1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef OHOS_DISTRIBUTED_DATA_DATAMGR_SERVICE_RDB_GENERAL_STORE_H
17 #define OHOS_DISTRIBUTED_DATA_DATAMGR_SERVICE_RDB_GENERAL_STORE_H
18 #include <atomic>
19 #include <functional>
20 #include <shared_mutex>
21 
22 #include "concurrent_map.h"
23 #include "metadata/store_meta_data.h"
24 #include "rdb_asset_loader.h"
25 #include "rdb_cloud.h"
26 #include "rdb_store.h"
27 #include "relational_store_delegate.h"
28 #include "relational_store_manager.h"
29 #include "store/general_store.h"
30 #include "store/general_value.h"
31 #include "snapshot/snapshot.h"
32 namespace OHOS::DistributedRdb {
33 class RdbGeneralStore : public DistributedData::GeneralStore {
34 public:
35     using Cursor = DistributedData::Cursor;
36     using GenQuery = DistributedData::GenQuery;
37     using VBucket = DistributedData::VBucket;
38     using VBuckets = DistributedData::VBuckets;
39     using Value = DistributedData::Value;
40     using Values = DistributedData::Values;
41     using StoreMetaData = DistributedData::StoreMetaData;
42     using Database = DistributedData::Database;
43     using GenErr = DistributedData::GeneralError;
44     using RdbStore = OHOS::NativeRdb::RdbStore;
45     using Reference = DistributedData::Reference;
46     using Snapshot = DistributedData::Snapshot;
47     using BindAssets = DistributedData::BindAssets;
48 
49     explicit RdbGeneralStore(const StoreMetaData &meta);
50     ~RdbGeneralStore();
51 
52     static void OnSyncStart(const DistributedData::StoreInfo &storeInfo, uint32_t flag, uint32_t syncMode,
53         uint32_t traceId, uint32_t syncCount);
54     static void OnSyncFinish(const DistributedData::StoreInfo &storeInfo, uint32_t flag, uint32_t syncMode,
55         uint32_t traceId);
56     void SetExecutor(std::shared_ptr<Executor> executor) override;
57     int32_t Bind(Database &database, const std::map<uint32_t, BindInfo> &bindInfos,
58         const CloudConfig &config) override;
59     bool IsBound() override;
60     bool IsValid();
61     int32_t Execute(const std::string &table, const std::string &sql) override;
62     int32_t SetDistributedTables(const std::vector<std::string> &tables, int32_t type,
63 	    const std::vector<Reference> &references) override;
64     int32_t SetTrackerTable(const std::string& tableName, const std::set<std::string>& trackerColNames,
65         const std::string& extendColName, bool isForceUpgrade = false) override;
66     int32_t Insert(const std::string &table, VBuckets &&values) override;
67     int32_t Update(const std::string &table, const std::string &setSql, Values &&values, const std::string &whereSql,
68         Values &&conditions) override;
69     int32_t Replace(const std::string &table, VBucket &&value) override;
70     int32_t Delete(const std::string &table, const std::string &sql, Values &&args) override;
71     std::pair<int32_t, std::shared_ptr<Cursor>> Query(const std::string &table, const std::string &sql,
72         Values &&args) override;
73     std::pair<int32_t, std::shared_ptr<Cursor>> Query(const std::string &table, GenQuery &query) override;
74     std::pair<int32_t, int32_t> Sync(const Devices &devices, GenQuery &query, DetailAsync async,
75         const DistributedData::SyncParam &syncParam) override;
76     std::pair<int32_t, std::shared_ptr<Cursor>> PreSharing(GenQuery &query) override;
77     int32_t Clean(const std::vector<std::string> &devices, int32_t mode, const std::string &tableName) override;
78     int32_t Watch(int32_t origin, Watcher &watcher) override;
79     int32_t Unwatch(int32_t origin, Watcher &watcher) override;
80     int32_t RegisterDetailProgressObserver(DetailAsync async) override;
81     int32_t UnregisterDetailProgressObserver() override;
82     int32_t Close(bool isForce = false) override;
83     int32_t AddRef() override;
84     int32_t Release() override;
85     int32_t BindSnapshots(std::shared_ptr<std::map<std::string, std::shared_ptr<Snapshot>>> bindAssets) override;
86     int32_t MergeMigratedData(const std::string &tableName, VBuckets&& values) override;
87     int32_t CleanTrackerData(const std::string &tableName, int64_t cursor) override;
88     std::vector<std::string> GetWaterVersion(const std::string &deviceId) override;
89     std::pair<int32_t, uint32_t> LockCloudDB() override;
90     int32_t UnLockCloudDB() override;
91 
92 private:
93     RdbGeneralStore(const RdbGeneralStore& rdbGeneralStore);
94     RdbGeneralStore& operator=(const RdbGeneralStore& rdbGeneralStore);
95     using RdbDelegate = DistributedDB::RelationalStoreDelegate;
96     using RdbManager = DistributedDB::RelationalStoreManager;
97     using SyncProcess = DistributedDB::SyncProcess;
98     using DBBriefCB = DistributedDB::SyncStatusCallback;
99     using DBProcessCB = std::function<void(const std::map<std::string, SyncProcess> &processes)>;
100     using TaskId = ExecutorPool::TaskId;
101     using Time = std::chrono::steady_clock::time_point;
102     using SyncId = uint64_t;
103     static GenErr ConvertStatus(DistributedDB::DBStatus status);
104     void InitStoreInfo(const StoreMetaData &meta);
105     // GetIntersection and return results in the order of collecter1
106     static std::vector<std::string> GetIntersection(std::vector<std::string> &&syncTables,
107         const std::set<std::string> &localTables);
108     static constexpr inline uint64_t REMOTE_QUERY_TIME_OUT = 30 * 1000;
109     static constexpr int64_t INTERVAL = 1;
110     static constexpr const char* CLOUD_GID = "cloud_gid";
111     static constexpr const char* DATE_KEY = "data_key";
112     static constexpr const char* QUERY_TABLES_SQL = "select name from sqlite_master where type = 'table';";
113     static constexpr uint32_t ITER_V0 = 10000;
114     static constexpr uint32_t ITER_V1 = 5000;
115     static constexpr uint32_t ITERS[] = {ITER_V0, ITER_V1};
116     static constexpr uint32_t ITERS_COUNT = sizeof(ITERS) / sizeof(ITERS[0]);
117     class ObserverProxy : public DistributedDB::StoreObserver {
118     public:
119         using DBChangedIF = DistributedDB::StoreChangedData;
120         using DBChangedData = DistributedDB::ChangedData;
121         using DBOrigin = DistributedDB::Origin;
122         using GenOrigin = Watcher::Origin;
123         void OnChange(const DistributedDB::StoreChangedData &data) override;
124         void OnChange(DBOrigin origin, const std::string &originalId, DBChangedData &&data) override;
HasWatcher()125         bool HasWatcher() const
126         {
127             return watcher_ != nullptr;
128         }
129     private:
130         enum ChangeType {
131             CLOUD_DATA_CHANGE = 0,
132             CLOUD_DATA_CLEAN
133         };
134         void PostDataChange(const StoreMetaData &meta, const std::vector<std::string> &tables, ChangeType type);
135         friend RdbGeneralStore;
136         Watcher *watcher_ = nullptr;
137         std::string storeId_;
138         StoreMetaData meta_;
139     };
140     DBBriefCB GetDBBriefCB(DetailAsync async);
141     DBProcessCB GetCB(SyncId syncId);
142     DBProcessCB GetDBProcessCB(DetailAsync async, uint32_t syncMode, SyncId syncId,
143         uint32_t highMode = AUTO_SYNC_MODE);
144     Executor::Task GetFinishTask(SyncId syncId);
145     std::shared_ptr<Cursor> RemoteQuery(const std::string &device,
146         const DistributedDB::RemoteCondition &remoteCondition);
147     std::string BuildSql(const std::string& table, const std::string& statement,
148         const std::vector<std::string>& columns) const;
149     std::pair<int32_t, VBuckets> QuerySql(const std::string& sql, Values &&args);
150     std::set<std::string> GetTables();
151     VBuckets ExtractExtend(VBuckets& values) const;
152     size_t SqlConcatenate(VBucket &value, std::string &strColumnSql, std::string &strRowValueSql);
153     bool IsPrintLog(DistributedDB::DBStatus status);
154     std::shared_ptr<RdbCloud> GetRdbCloud() const;
155     bool IsFinished(uint64_t syncId) const;
156     void RemoveTasks();
157 
158     ObserverProxy observer_;
159     RdbManager manager_;
160     RdbDelegate *delegate_ = nullptr;
161     DetailAsync async_ = nullptr;
162     std::shared_ptr<RdbCloud> rdbCloud_ {};
163     std::shared_ptr<RdbAssetLoader> rdbLoader_ {};
164     BindInfo bindInfo_;
165     std::atomic<bool> isBound_ = false;
166     std::mutex mutex_;
167     int32_t ref_ = 1;
168     mutable std::shared_timed_mutex rwMutex_;
169 
170     BindAssets snapshots_;
171     DistributedData::StoreInfo storeInfo_;
172 
173     DistributedDB::DBStatus lastError_ = DistributedDB::DBStatus::OK;
174     static constexpr uint32_t PRINT_ERROR_CNT = 150;
175     uint32_t lastErrCnt_ = 0;
176     uint32_t syncNotifyFlag_ = 0;
177     std::atomic<SyncId> syncTaskId_ = 0;
178     std::shared_mutex asyncMutex_ {};
179     mutable std::shared_mutex rdbCloudMutex_;
180     struct FinishTask {
181         TaskId taskId = Executor::INVALID_TASK_ID;
182         DBProcessCB cb = nullptr;
183     };
184     std::shared_ptr<Executor> executor_ = nullptr;
185     std::shared_ptr<ConcurrentMap<SyncId, FinishTask>> tasks_;
186 };
187 } // namespace OHOS::DistributedRdb
188 #endif // OHOS_DISTRIBUTED_DATA_DATAMGR_SERVICE_RDB_GENERAL_STORE_H