1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef OHOS_DISTRIBUTED_DATA_DATAMGR_SERVICE_RDB_GENERAL_STORE_H 17 #define OHOS_DISTRIBUTED_DATA_DATAMGR_SERVICE_RDB_GENERAL_STORE_H 18 #include <atomic> 19 #include <functional> 20 #include <shared_mutex> 21 22 #include "concurrent_map.h" 23 #include "metadata/store_meta_data.h" 24 #include "rdb_asset_loader.h" 25 #include "rdb_cloud.h" 26 #include "rdb_store.h" 27 #include "relational_store_delegate.h" 28 #include "relational_store_manager.h" 29 #include "store/general_store.h" 30 #include "store/general_value.h" 31 #include "snapshot/snapshot.h" 32 namespace OHOS::DistributedRdb { 33 class RdbGeneralStore : public DistributedData::GeneralStore { 34 public: 35 using Cursor = DistributedData::Cursor; 36 using GenQuery = DistributedData::GenQuery; 37 using VBucket = DistributedData::VBucket; 38 using VBuckets = DistributedData::VBuckets; 39 using Value = DistributedData::Value; 40 using Values = DistributedData::Values; 41 using StoreMetaData = DistributedData::StoreMetaData; 42 using Database = DistributedData::Database; 43 using GenErr = DistributedData::GeneralError; 44 using RdbStore = OHOS::NativeRdb::RdbStore; 45 using Reference = DistributedData::Reference; 46 using Snapshot = DistributedData::Snapshot; 47 using BindAssets = DistributedData::BindAssets; 48 49 explicit RdbGeneralStore(const StoreMetaData &meta); 50 ~RdbGeneralStore(); 51 52 static void OnSyncStart(const DistributedData::StoreInfo &storeInfo, uint32_t flag, uint32_t syncMode, 53 uint32_t traceId, uint32_t syncCount); 54 static void OnSyncFinish(const DistributedData::StoreInfo &storeInfo, uint32_t flag, uint32_t syncMode, 55 uint32_t traceId); 56 void SetExecutor(std::shared_ptr<Executor> executor) override; 57 int32_t Bind(Database &database, const std::map<uint32_t, BindInfo> &bindInfos, 58 const CloudConfig &config) override; 59 bool IsBound() override; 60 bool IsValid(); 61 int32_t Execute(const std::string &table, const std::string &sql) override; 62 int32_t SetDistributedTables(const std::vector<std::string> &tables, int32_t type, 63 const std::vector<Reference> &references) override; 64 int32_t SetTrackerTable(const std::string& tableName, const std::set<std::string>& trackerColNames, 65 const std::string& extendColName, bool isForceUpgrade = false) override; 66 int32_t Insert(const std::string &table, VBuckets &&values) override; 67 int32_t Update(const std::string &table, const std::string &setSql, Values &&values, const std::string &whereSql, 68 Values &&conditions) override; 69 int32_t Replace(const std::string &table, VBucket &&value) override; 70 int32_t Delete(const std::string &table, const std::string &sql, Values &&args) override; 71 std::pair<int32_t, std::shared_ptr<Cursor>> Query(const std::string &table, const std::string &sql, 72 Values &&args) override; 73 std::pair<int32_t, std::shared_ptr<Cursor>> Query(const std::string &table, GenQuery &query) override; 74 std::pair<int32_t, int32_t> Sync(const Devices &devices, GenQuery &query, DetailAsync async, 75 const DistributedData::SyncParam &syncParam) override; 76 std::pair<int32_t, std::shared_ptr<Cursor>> PreSharing(GenQuery &query) override; 77 int32_t Clean(const std::vector<std::string> &devices, int32_t mode, const std::string &tableName) override; 78 int32_t Watch(int32_t origin, Watcher &watcher) override; 79 int32_t Unwatch(int32_t origin, Watcher &watcher) override; 80 int32_t RegisterDetailProgressObserver(DetailAsync async) override; 81 int32_t UnregisterDetailProgressObserver() override; 82 int32_t Close(bool isForce = false) override; 83 int32_t AddRef() override; 84 int32_t Release() override; 85 int32_t BindSnapshots(std::shared_ptr<std::map<std::string, std::shared_ptr<Snapshot>>> bindAssets) override; 86 int32_t MergeMigratedData(const std::string &tableName, VBuckets&& values) override; 87 int32_t CleanTrackerData(const std::string &tableName, int64_t cursor) override; 88 std::vector<std::string> GetWaterVersion(const std::string &deviceId) override; 89 std::pair<int32_t, uint32_t> LockCloudDB() override; 90 int32_t UnLockCloudDB() override; 91 92 private: 93 RdbGeneralStore(const RdbGeneralStore& rdbGeneralStore); 94 RdbGeneralStore& operator=(const RdbGeneralStore& rdbGeneralStore); 95 using RdbDelegate = DistributedDB::RelationalStoreDelegate; 96 using RdbManager = DistributedDB::RelationalStoreManager; 97 using SyncProcess = DistributedDB::SyncProcess; 98 using DBBriefCB = DistributedDB::SyncStatusCallback; 99 using DBProcessCB = std::function<void(const std::map<std::string, SyncProcess> &processes)>; 100 using TaskId = ExecutorPool::TaskId; 101 using Time = std::chrono::steady_clock::time_point; 102 using SyncId = uint64_t; 103 static GenErr ConvertStatus(DistributedDB::DBStatus status); 104 void InitStoreInfo(const StoreMetaData &meta); 105 // GetIntersection and return results in the order of collecter1 106 static std::vector<std::string> GetIntersection(std::vector<std::string> &&syncTables, 107 const std::set<std::string> &localTables); 108 static constexpr inline uint64_t REMOTE_QUERY_TIME_OUT = 30 * 1000; 109 static constexpr int64_t INTERVAL = 1; 110 static constexpr const char* CLOUD_GID = "cloud_gid"; 111 static constexpr const char* DATE_KEY = "data_key"; 112 static constexpr const char* QUERY_TABLES_SQL = "select name from sqlite_master where type = 'table';"; 113 static constexpr uint32_t ITER_V0 = 10000; 114 static constexpr uint32_t ITER_V1 = 5000; 115 static constexpr uint32_t ITERS[] = {ITER_V0, ITER_V1}; 116 static constexpr uint32_t ITERS_COUNT = sizeof(ITERS) / sizeof(ITERS[0]); 117 class ObserverProxy : public DistributedDB::StoreObserver { 118 public: 119 using DBChangedIF = DistributedDB::StoreChangedData; 120 using DBChangedData = DistributedDB::ChangedData; 121 using DBOrigin = DistributedDB::Origin; 122 using GenOrigin = Watcher::Origin; 123 void OnChange(const DistributedDB::StoreChangedData &data) override; 124 void OnChange(DBOrigin origin, const std::string &originalId, DBChangedData &&data) override; HasWatcher()125 bool HasWatcher() const 126 { 127 return watcher_ != nullptr; 128 } 129 private: 130 enum ChangeType { 131 CLOUD_DATA_CHANGE = 0, 132 CLOUD_DATA_CLEAN 133 }; 134 void PostDataChange(const StoreMetaData &meta, const std::vector<std::string> &tables, ChangeType type); 135 friend RdbGeneralStore; 136 Watcher *watcher_ = nullptr; 137 std::string storeId_; 138 StoreMetaData meta_; 139 }; 140 DBBriefCB GetDBBriefCB(DetailAsync async); 141 DBProcessCB GetCB(SyncId syncId); 142 DBProcessCB GetDBProcessCB(DetailAsync async, uint32_t syncMode, SyncId syncId, 143 uint32_t highMode = AUTO_SYNC_MODE); 144 Executor::Task GetFinishTask(SyncId syncId); 145 std::shared_ptr<Cursor> RemoteQuery(const std::string &device, 146 const DistributedDB::RemoteCondition &remoteCondition); 147 std::string BuildSql(const std::string& table, const std::string& statement, 148 const std::vector<std::string>& columns) const; 149 std::pair<int32_t, VBuckets> QuerySql(const std::string& sql, Values &&args); 150 std::set<std::string> GetTables(); 151 VBuckets ExtractExtend(VBuckets& values) const; 152 size_t SqlConcatenate(VBucket &value, std::string &strColumnSql, std::string &strRowValueSql); 153 bool IsPrintLog(DistributedDB::DBStatus status); 154 std::shared_ptr<RdbCloud> GetRdbCloud() const; 155 bool IsFinished(uint64_t syncId) const; 156 void RemoveTasks(); 157 158 ObserverProxy observer_; 159 RdbManager manager_; 160 RdbDelegate *delegate_ = nullptr; 161 DetailAsync async_ = nullptr; 162 std::shared_ptr<RdbCloud> rdbCloud_ {}; 163 std::shared_ptr<RdbAssetLoader> rdbLoader_ {}; 164 BindInfo bindInfo_; 165 std::atomic<bool> isBound_ = false; 166 std::mutex mutex_; 167 int32_t ref_ = 1; 168 mutable std::shared_timed_mutex rwMutex_; 169 170 BindAssets snapshots_; 171 DistributedData::StoreInfo storeInfo_; 172 173 DistributedDB::DBStatus lastError_ = DistributedDB::DBStatus::OK; 174 static constexpr uint32_t PRINT_ERROR_CNT = 150; 175 uint32_t lastErrCnt_ = 0; 176 uint32_t syncNotifyFlag_ = 0; 177 std::atomic<SyncId> syncTaskId_ = 0; 178 std::shared_mutex asyncMutex_ {}; 179 mutable std::shared_mutex rdbCloudMutex_; 180 struct FinishTask { 181 TaskId taskId = Executor::INVALID_TASK_ID; 182 DBProcessCB cb = nullptr; 183 }; 184 std::shared_ptr<Executor> executor_ = nullptr; 185 std::shared_ptr<ConcurrentMap<SyncId, FinishTask>> tasks_; 186 }; 187 } // namespace OHOS::DistributedRdb 188 #endif // OHOS_DISTRIBUTED_DATA_DATAMGR_SERVICE_RDB_GENERAL_STORE_H