1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifndef CLOUDSYNCER_TEST_H
16 #define CLOUDSYNCER_TEST_H
17 
18 #include "cloud_merge_strategy.h"
19 #include "cloud_syncer.h"
20 #include "cloud_syncer_test.h"
21 #include "cloud_sync_utils.h"
22 #include "mock_iclouddb.h"
23 
24 namespace DistributedDB {
25 
26 const Asset ASSET_COPY = { .version = 1,
27     .name = "Phone",
28     .assetId = "0",
29     .subpath = "/local/sync",
30     .uri = "/local/sync",
31     .modifyTime = "123456",
32     .createTime = "",
33     .size = "256",
34     .hash = "ASE" };
35 
36 class TestStorageProxy : public StorageProxy {
37 public:
TestStorageProxy(ICloudSyncStorageInterface * iCloud)38     explicit TestStorageProxy(ICloudSyncStorageInterface *iCloud) : StorageProxy(iCloud)
39     {
40         Init();
41     }
42 };
43 
44 class TestCloudSyncer : public CloudSyncer {
45 public:
TestCloudSyncer(std::shared_ptr<DistributedDB::TestStorageProxy> storageProxy)46     explicit TestCloudSyncer(std::shared_ptr<DistributedDB::TestStorageProxy> storageProxy) : CloudSyncer(storageProxy)
47     {
48     }
49     ~TestCloudSyncer() override = default;
50     DISABLE_COPY_ASSIGN_MOVE(TestCloudSyncer);
51 
InitCloudSyncer(TaskId taskId,SyncMode mode)52     void InitCloudSyncer(TaskId taskId, SyncMode mode)
53     {
54         cloudTaskInfos_.insert(std::pair<TaskId, CloudTaskInfo>{taskId, CloudTaskInfo()});
55         cloudTaskInfos_[taskId].mode = mode;
56         cloudTaskInfos_[taskId].taskId = taskId;
57         currentContext_.tableName = "TestTable" + std::to_string(taskId);
58         cloudTaskInfos_[taskId].table.push_back(currentContext_.tableName);
59         currentContext_.currentTaskId = taskId;
60         currentContext_.notifier = std::make_shared<ProcessNotifier>(this);
61         currentContext_.processRecorder = std::make_shared<ProcessRecorder>();
62         currentContext_.notifier->Init({currentContext_.tableName}, { "cloud" }, cloudTaskInfos_[taskId].users);
63         currentContext_.strategy = std::make_shared<CloudMergeStrategy>();
64         currentContext_.strategy->SetIsKvScene(isKvScene_);
65         closed_ = false;
66         cloudTaskInfos_[taskId].callback = [this, taskId](const std::map<std::string, SyncProcess> &process) {
67             if (process.size() >= 1u) {
68                 process_[taskId] = process.begin()->second;
69             } else {
70                 SyncProcess tmpProcess;
71                 process_[taskId] = tmpProcess;
72             }
73         };
74     }
75 
SetCurrentCloudTaskInfos(std::vector<std::string> tables,const SyncProcessCallback & onProcess)76     void SetCurrentCloudTaskInfos(std::vector<std::string> tables, const SyncProcessCallback &onProcess)
77     {
78         cloudTaskInfos_[currentContext_.currentTaskId].table = tables;
79         cloudTaskInfos_[currentContext_.currentTaskId].callback = onProcess;
80     }
81 
GetCurrentCloudTaskInfos()82     CloudTaskInfo GetCurrentCloudTaskInfos()
83     {
84         return cloudTaskInfos_[currentContext_.currentTaskId];
85     }
86 
CreateCloudTaskInfoAndCallTryToAddSync(SyncMode mode,const std::vector<std::string> & tables,const SyncProcessCallback & onProcess,int64_t waitTime)87     int CreateCloudTaskInfoAndCallTryToAddSync(SyncMode mode, const std::vector<std::string> &tables,
88         const SyncProcessCallback &onProcess, int64_t waitTime)
89     {
90         CloudTaskInfo taskInfo;
91         taskInfo.mode = mode;
92         taskInfo.table = tables;
93         taskInfo.callback = onProcess;
94         taskInfo.timeout = waitTime;
95         return TryToAddSyncTask(std::move(taskInfo));
96     }
97 
CallClose()98     void CallClose()
99     {
100         currentContext_.currentTaskId = 0u;
101         currentContext_.strategy = nullptr;
102         currentContext_.notifier = nullptr;
103         Close();
104     }
SetTimeOut(TaskId taskId,int64_t timeout)105     void SetTimeOut(TaskId taskId, int64_t timeout)
106     {
107         this->cloudTaskInfos_[taskId].timeout = timeout;
108     }
109 
InitCloudSyncerForSync()110     void InitCloudSyncerForSync()
111     {
112         this->closed_ = false;
113         this->cloudTaskInfos_[this->lastTaskId_].callback = [this](
114             const std::map<std::string, SyncProcess> &process) {
115             if (process.size() == 1u) {
116                 process_[this->lastTaskId_] = process.begin()->second;
117             } else {
118                 SyncProcess tmpProcess;
119                 process_[this->lastTaskId_] = tmpProcess;
120             }
121         };
122     }
123 
CallDoSyncInner(const CloudTaskInfo & taskInfo)124     int CallDoSyncInner(const CloudTaskInfo &taskInfo)
125     {
126         return DoSyncInner(taskInfo);
127     }
128 
getCallback(TaskId taskId)129     SyncProcessCallback getCallback(TaskId taskId)
130     {
131         return cloudTaskInfos_[taskId].callback;
132     }
133 
getCurrentTaskId()134     TaskId getCurrentTaskId()
135     {
136         return currentContext_.currentTaskId;
137     }
138 
139     int CallDoUpload(TaskId taskId, bool lastTable = false, LockAction lockAction = LockAction::INSERT)
140     {
141         storageProxy_->StartTransaction();
142         int ret = CloudSyncer::DoUpload(taskId, lastTable, lockAction);
143         storageProxy_->Commit();
144         return ret;
145     }
146 
CallDoDownload(TaskId taskId)147     int CallDoDownload(TaskId taskId)
148     {
149         return CloudSyncer::DoDownload(taskId, true);
150     }
151 
GetCurrentContextTableName()152     std::string GetCurrentContextTableName()
153     {
154         return this->currentContext_.tableName;
155     }
156 
SetCurrentContextTableName(std::string name)157     void SetCurrentContextTableName(std::string name)
158     {
159         this->currentContext_.tableName = name;
160     }
161 
CallClearCloudSyncData(CloudSyncData & uploadData)162     void CallClearCloudSyncData(CloudSyncData& uploadData)
163     {
164         CloudSyncUtils::ClearCloudSyncData(uploadData);
165     }
166 
GetUploadSuccessCount(TaskId taskId)167     int32_t GetUploadSuccessCount(TaskId taskId)
168     {
169         return this->process_[taskId].tableProcess[this->GetCurrentContextTableName()].upLoadInfo.successCount;
170     }
171 
GetUploadFailCount(TaskId taskId)172     int32_t GetUploadFailCount(TaskId taskId)
173     {
174         return this->process_[taskId].tableProcess[this->GetCurrentContextTableName()].upLoadInfo.failCount;
175     }
176 
SetMockICloudDB(MockICloudDB * icloudDB)177     void SetMockICloudDB(MockICloudDB *icloudDB)
178     {
179         this->cloudDB_.SetCloudDB(std::shared_ptr<MockICloudDB>(icloudDB));
180     }
181 
SetMockICloudDB(std::shared_ptr<MockICloudDB> & icloudDB)182     void SetMockICloudDB(std::shared_ptr<MockICloudDB> &icloudDB)
183     {
184         this->cloudDB_.SetCloudDB(icloudDB);
185     }
186 
SetAndGetCloudTaskInfo(SyncMode mode,std::vector<std::string> table,SyncProcessCallback callback,int64_t timeout)187     CloudTaskInfo SetAndGetCloudTaskInfo(SyncMode mode, std::vector<std::string> table,
188         SyncProcessCallback callback, int64_t timeout)
189     {
190         CloudTaskInfo taskInfo;
191         taskInfo.mode = mode;
192         taskInfo.table = table;
193         taskInfo.callback = callback;
194         taskInfo.timeout = timeout;
195         return taskInfo;
196     }
197 
initFullCloudSyncData(CloudSyncData & uploadData,int size)198     void initFullCloudSyncData(CloudSyncData &uploadData, int size)
199     {
200         VBucket tmp = { std::pair<std::string, int64_t>(CloudDbConstant::MODIFY_FIELD, 1),
201                         std::pair<std::string, int64_t>(CloudDbConstant::CREATE_FIELD, 1),
202                         std::pair<std::string, std::string>(CloudDbConstant::GID_FIELD, "0"),
203                         std::pair<std::string, Asset>(CloudDbConstant::ASSET, ASSET_COPY) };
204         VBucket asset = { std::pair<std::string, Asset>(CloudDbConstant::ASSET, ASSET_COPY) };
205         uploadData.insData.record = std::vector<VBucket>(size, tmp);
206         uploadData.insData.extend = std::vector<VBucket>(size, tmp);
207         uploadData.insData.assets = std::vector<VBucket>(size, asset);
208         uploadData.updData.record = std::vector<VBucket>(size, tmp);
209         uploadData.updData.extend = std::vector<VBucket>(size, tmp);
210         uploadData.updData.assets = std::vector<VBucket>(size, asset);
211         uploadData.delData.record = std::vector<VBucket>(size, tmp);
212         uploadData.delData.extend = std::vector<VBucket>(size, tmp);
213     }
214 
CallTryToAddSyncTask(CloudTaskInfo && taskInfo)215     int CallTryToAddSyncTask(CloudTaskInfo &&taskInfo)
216     {
217         return TryToAddSyncTask(std::move(taskInfo));
218     }
219 
PopTaskQueue()220     void PopTaskQueue()
221     {
222         taskQueue_.pop_back();
223     }
224 
CallPrepareSync(TaskId taskId)225     int CallPrepareSync(TaskId taskId)
226     {
227         return PrepareSync(taskId);
228     }
229 
CallNotify()230     void CallNotify()
231     {
232         auto info = cloudTaskInfos_[currentContext_.currentTaskId];
233         currentContext_.notifier->NotifyProcess(info, {});
234     }
235 
SetAssetFields(const TableName & tableName,const std::vector<Field> & assetFields)236     void SetAssetFields(const TableName &tableName, const std::vector<Field> &assetFields)
237     {
238         currentContext_.tableName = tableName;
239         currentContext_.assetFields[currentContext_.tableName] = assetFields;
240     }
241 
242     std::map<std::string, Assets> TestTagAssetsInSingleRecord(
243         VBucket &coveredData, VBucket &beCoveredData, bool setNormalStatus = false)
244     {
245         int ret = E_OK;
246         return TagAssetsInSingleRecord(coveredData, beCoveredData, setNormalStatus, ret);
247     }
248 
TestIsDataContainDuplicateAsset(std::vector<Field> & assetFields,VBucket & data)249     bool TestIsDataContainDuplicateAsset(std::vector<Field> &assetFields, VBucket &data)
250     {
251         return IsDataContainDuplicateAsset(assetFields, data);
252     }
253 
SetCloudWaterMarks(const TableName & tableName,const std::string & mark)254     void SetCloudWaterMarks(const TableName &tableName, const std::string &mark)
255     {
256         currentContext_.tableName = tableName;
257         currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][tableName] = mark;
258     }
259 
CallDownloadAssets()260     int CallDownloadAssets()
261     {
262         InnerProcessInfo info;
263         std::vector<std::string> pKColNames;
264         std::set<Key> dupHashKeySet;
265         ChangedData changedAssets;
266         return CloudSyncer::DownloadAssets(info, pKColNames, dupHashKeySet, changedAssets);
267     }
268 
SetCurrentContext(TaskId taskId)269     void SetCurrentContext(TaskId taskId)
270     {
271         currentContext_.currentTaskId = taskId;
272     }
273 
SetLastTaskId(TaskId taskId)274     void SetLastTaskId(TaskId taskId)
275     {
276         lastTaskId_ = taskId;
277     }
278 
SetCurrentTaskPause()279     void SetCurrentTaskPause()
280     {
281         cloudTaskInfos_[currentContext_.currentTaskId].pause = true;
282     }
283 
SetAssetDownloadList(int downloadCount)284     void SetAssetDownloadList(int downloadCount)
285     {
286         for (int i = 0; i < downloadCount; ++i) {
287             currentContext_.assetDownloadList.push_back({});
288         }
289     }
290 
SetQuerySyncObject(TaskId taskId,const QuerySyncObject & query)291     void SetQuerySyncObject(TaskId taskId, const QuerySyncObject &query)
292     {
293         std::vector<QuerySyncObject> queryList;
294         queryList.push_back(query);
295         cloudTaskInfos_[taskId].queryList = queryList;
296     }
297 
CallGetQuerySyncObject(const std::string & tableName)298     QuerySyncObject CallGetQuerySyncObject(const std::string &tableName)
299     {
300         return CloudSyncer::GetQuerySyncObject(tableName);
301     }
302 
CallReloadWaterMarkIfNeed(TaskId taskId,WaterMark & waterMark)303     void CallReloadWaterMarkIfNeed(TaskId taskId, WaterMark &waterMark)
304     {
305         CloudSyncer::ReloadWaterMarkIfNeed(taskId, waterMark);
306     }
307 
CallRecordWaterMark(TaskId taskId,Timestamp waterMark)308     void CallRecordWaterMark(TaskId taskId, Timestamp waterMark)
309     {
310         CloudSyncer::RecordWaterMark(taskId, waterMark);
311     }
312 
SetResumeSyncParam(TaskId taskId,const SyncParam & syncParam)313     void SetResumeSyncParam(TaskId taskId, const SyncParam &syncParam)
314     {
315         resumeTaskInfos_[taskId].syncParam = syncParam;
316         resumeTaskInfos_[taskId].context.tableName = syncParam.tableName;
317     }
318 
ClearResumeTaskInfo(TaskId taskId)319     void ClearResumeTaskInfo(TaskId taskId)
320     {
321         resumeTaskInfos_.erase(taskId);
322     }
323 
SetTaskResume(TaskId taskId,bool resume)324     void SetTaskResume(TaskId taskId, bool resume)
325     {
326         cloudTaskInfos_[taskId].resume = resume;
327     }
328 
CallGetSyncParamForDownload(TaskId taskId,SyncParam & param)329     int CallGetSyncParamForDownload(TaskId taskId, SyncParam &param)
330     {
331         return CloudSyncer::GetSyncParamForDownload(taskId, param);
332     }
333 
SetResumeTaskUpload(TaskId taskId,bool upload)334     void SetResumeTaskUpload(TaskId taskId, bool upload)
335     {
336         resumeTaskInfos_[taskId].upload = upload;
337     }
338 
IsResumeTaskUpload(TaskId taskId)339     bool IsResumeTaskUpload(TaskId taskId)
340     {
341         return resumeTaskInfos_[taskId].upload;
342     }
343 
CallHandleTagAssets(const Key & hashKey,const DataInfo & dataInfo,size_t idx,SyncParam & param,VBucket & localAssetInfo)344     int CallHandleTagAssets(const Key &hashKey, const DataInfo &dataInfo, size_t idx, SyncParam &param,
345         VBucket &localAssetInfo)
346     {
347         return CloudSyncer::HandleTagAssets(hashKey, dataInfo, idx, param, localAssetInfo);
348     }
349 
GetProcessRecorder()350     std::shared_ptr<ProcessRecorder> GetProcessRecorder()
351     {
352         std::lock_guard<std::mutex> autoLock(dataLock_);
353         return currentContext_.processRecorder;
354     }
355 
CallDoDownloadInNeed(bool needUpload,bool isFirstDownload)356     int CallDoDownloadInNeed(bool needUpload, bool isFirstDownload)
357     {
358         CloudTaskInfo taskInfo;
359         {
360             std::lock_guard<std::mutex> autoLock(dataLock_);
361             taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
362         }
363         return DoDownloadInNeed(taskInfo, needUpload, isFirstDownload);
364     }
365 
CallDoUploadInNeed()366     int CallDoUploadInNeed()
367     {
368         CloudTaskInfo taskInfo;
369         {
370             std::lock_guard<std::mutex> autoLock(dataLock_);
371             taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
372         }
373         return DoUploadInNeed(taskInfo, true);
374     }
375     CloudTaskInfo taskInfo_;
376 private:
377     std::map<TaskId, SyncProcess> process_;
378 };
379 
380 }
#endif // CLOUDSYNCER_TEST_H