1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 #ifndef CLOUDSYNCER_TEST_H 16 #define CLOUDSYNCER_TEST_H 17 18 #include "cloud_merge_strategy.h" 19 #include "cloud_syncer.h" 20 #include "cloud_syncer_test.h" 21 #include "cloud_sync_utils.h" 22 #include "mock_iclouddb.h" 23 24 namespace DistributedDB { 25 26 const Asset ASSET_COPY = { .version = 1, 27 .name = "Phone", 28 .assetId = "0", 29 .subpath = "/local/sync", 30 .uri = "/local/sync", 31 .modifyTime = "123456", 32 .createTime = "", 33 .size = "256", 34 .hash = "ASE" }; 35 36 class TestStorageProxy : public StorageProxy { 37 public: TestStorageProxy(ICloudSyncStorageInterface * iCloud)38 explicit TestStorageProxy(ICloudSyncStorageInterface *iCloud) : StorageProxy(iCloud) 39 { 40 Init(); 41 } 42 }; 43 44 class TestCloudSyncer : public CloudSyncer { 45 public: TestCloudSyncer(std::shared_ptr<DistributedDB::TestStorageProxy> storageProxy)46 explicit TestCloudSyncer(std::shared_ptr<DistributedDB::TestStorageProxy> storageProxy) : CloudSyncer(storageProxy) 47 { 48 } 49 ~TestCloudSyncer() override = default; 50 DISABLE_COPY_ASSIGN_MOVE(TestCloudSyncer); 51 InitCloudSyncer(TaskId taskId,SyncMode mode)52 void InitCloudSyncer(TaskId taskId, SyncMode mode) 53 { 54 cloudTaskInfos_.insert(std::pair<TaskId, CloudTaskInfo>{taskId, CloudTaskInfo()}); 55 cloudTaskInfos_[taskId].mode = mode; 56 cloudTaskInfos_[taskId].taskId = taskId; 57 currentContext_.tableName = "TestTable" + std::to_string(taskId); 58 cloudTaskInfos_[taskId].table.push_back(currentContext_.tableName); 59 currentContext_.currentTaskId = taskId; 60 currentContext_.notifier = std::make_shared<ProcessNotifier>(this); 61 currentContext_.processRecorder = std::make_shared<ProcessRecorder>(); 62 currentContext_.notifier->Init({currentContext_.tableName}, { "cloud" }, cloudTaskInfos_[taskId].users); 63 currentContext_.strategy = std::make_shared<CloudMergeStrategy>(); 64 currentContext_.strategy->SetIsKvScene(isKvScene_); 65 closed_ = false; 66 cloudTaskInfos_[taskId].callback = [this, taskId](const std::map<std::string, SyncProcess> &process) { 67 if (process.size() >= 1u) { 68 process_[taskId] = process.begin()->second; 69 } else { 70 SyncProcess tmpProcess; 71 process_[taskId] = tmpProcess; 72 } 73 }; 74 } 75 SetCurrentCloudTaskInfos(std::vector<std::string> tables,const SyncProcessCallback & onProcess)76 void SetCurrentCloudTaskInfos(std::vector<std::string> tables, const SyncProcessCallback &onProcess) 77 { 78 cloudTaskInfos_[currentContext_.currentTaskId].table = tables; 79 cloudTaskInfos_[currentContext_.currentTaskId].callback = onProcess; 80 } 81 GetCurrentCloudTaskInfos()82 CloudTaskInfo GetCurrentCloudTaskInfos() 83 { 84 return cloudTaskInfos_[currentContext_.currentTaskId]; 85 } 86 CreateCloudTaskInfoAndCallTryToAddSync(SyncMode mode,const std::vector<std::string> & tables,const SyncProcessCallback & onProcess,int64_t waitTime)87 int CreateCloudTaskInfoAndCallTryToAddSync(SyncMode mode, const std::vector<std::string> &tables, 88 const SyncProcessCallback &onProcess, int64_t waitTime) 89 { 90 CloudTaskInfo taskInfo; 91 taskInfo.mode = mode; 92 taskInfo.table = tables; 93 taskInfo.callback = onProcess; 94 taskInfo.timeout = waitTime; 95 return TryToAddSyncTask(std::move(taskInfo)); 96 } 97 CallClose()98 void CallClose() 99 { 100 currentContext_.currentTaskId = 0u; 101 currentContext_.strategy = nullptr; 102 currentContext_.notifier = nullptr; 103 Close(); 104 } SetTimeOut(TaskId taskId,int64_t timeout)105 void SetTimeOut(TaskId taskId, int64_t timeout) 106 { 107 this->cloudTaskInfos_[taskId].timeout = timeout; 108 } 109 InitCloudSyncerForSync()110 void InitCloudSyncerForSync() 111 { 112 this->closed_ = false; 113 this->cloudTaskInfos_[this->lastTaskId_].callback = [this]( 114 const std::map<std::string, SyncProcess> &process) { 115 if (process.size() == 1u) { 116 process_[this->lastTaskId_] = process.begin()->second; 117 } else { 118 SyncProcess tmpProcess; 119 process_[this->lastTaskId_] = tmpProcess; 120 } 121 }; 122 } 123 CallDoSyncInner(const CloudTaskInfo & taskInfo)124 int CallDoSyncInner(const CloudTaskInfo &taskInfo) 125 { 126 return DoSyncInner(taskInfo); 127 } 128 getCallback(TaskId taskId)129 SyncProcessCallback getCallback(TaskId taskId) 130 { 131 return cloudTaskInfos_[taskId].callback; 132 } 133 getCurrentTaskId()134 TaskId getCurrentTaskId() 135 { 136 return currentContext_.currentTaskId; 137 } 138 139 int CallDoUpload(TaskId taskId, bool lastTable = false, LockAction lockAction = LockAction::INSERT) 140 { 141 storageProxy_->StartTransaction(); 142 int ret = CloudSyncer::DoUpload(taskId, lastTable, lockAction); 143 storageProxy_->Commit(); 144 return ret; 145 } 146 CallDoDownload(TaskId taskId)147 int CallDoDownload(TaskId taskId) 148 { 149 return CloudSyncer::DoDownload(taskId, true); 150 } 151 GetCurrentContextTableName()152 std::string GetCurrentContextTableName() 153 { 154 return this->currentContext_.tableName; 155 } 156 SetCurrentContextTableName(std::string name)157 void SetCurrentContextTableName(std::string name) 158 { 159 this->currentContext_.tableName = name; 160 } 161 CallClearCloudSyncData(CloudSyncData & uploadData)162 void CallClearCloudSyncData(CloudSyncData& uploadData) 163 { 164 CloudSyncUtils::ClearCloudSyncData(uploadData); 165 } 166 GetUploadSuccessCount(TaskId taskId)167 int32_t GetUploadSuccessCount(TaskId taskId) 168 { 169 return this->process_[taskId].tableProcess[this->GetCurrentContextTableName()].upLoadInfo.successCount; 170 } 171 GetUploadFailCount(TaskId taskId)172 int32_t GetUploadFailCount(TaskId taskId) 173 { 174 return this->process_[taskId].tableProcess[this->GetCurrentContextTableName()].upLoadInfo.failCount; 175 } 176 SetMockICloudDB(MockICloudDB * icloudDB)177 void SetMockICloudDB(MockICloudDB *icloudDB) 178 { 179 this->cloudDB_.SetCloudDB(std::shared_ptr<MockICloudDB>(icloudDB)); 180 } 181 SetMockICloudDB(std::shared_ptr<MockICloudDB> & icloudDB)182 void SetMockICloudDB(std::shared_ptr<MockICloudDB> &icloudDB) 183 { 184 this->cloudDB_.SetCloudDB(icloudDB); 185 } 186 SetAndGetCloudTaskInfo(SyncMode mode,std::vector<std::string> table,SyncProcessCallback callback,int64_t timeout)187 CloudTaskInfo SetAndGetCloudTaskInfo(SyncMode mode, std::vector<std::string> table, 188 SyncProcessCallback callback, int64_t timeout) 189 { 190 CloudTaskInfo taskInfo; 191 taskInfo.mode = mode; 192 taskInfo.table = table; 193 taskInfo.callback = callback; 194 taskInfo.timeout = timeout; 195 return taskInfo; 196 } 197 initFullCloudSyncData(CloudSyncData & uploadData,int size)198 void initFullCloudSyncData(CloudSyncData &uploadData, int size) 199 { 200 VBucket tmp = { std::pair<std::string, int64_t>(CloudDbConstant::MODIFY_FIELD, 1), 201 std::pair<std::string, int64_t>(CloudDbConstant::CREATE_FIELD, 1), 202 std::pair<std::string, std::string>(CloudDbConstant::GID_FIELD, "0"), 203 std::pair<std::string, Asset>(CloudDbConstant::ASSET, ASSET_COPY) }; 204 VBucket asset = { std::pair<std::string, Asset>(CloudDbConstant::ASSET, ASSET_COPY) }; 205 uploadData.insData.record = std::vector<VBucket>(size, tmp); 206 uploadData.insData.extend = std::vector<VBucket>(size, tmp); 207 uploadData.insData.assets = std::vector<VBucket>(size, asset); 208 uploadData.updData.record = std::vector<VBucket>(size, tmp); 209 uploadData.updData.extend = std::vector<VBucket>(size, tmp); 210 uploadData.updData.assets = std::vector<VBucket>(size, asset); 211 uploadData.delData.record = std::vector<VBucket>(size, tmp); 212 uploadData.delData.extend = std::vector<VBucket>(size, tmp); 213 } 214 CallTryToAddSyncTask(CloudTaskInfo && taskInfo)215 int CallTryToAddSyncTask(CloudTaskInfo &&taskInfo) 216 { 217 return TryToAddSyncTask(std::move(taskInfo)); 218 } 219 PopTaskQueue()220 void PopTaskQueue() 221 { 222 taskQueue_.pop_back(); 223 } 224 CallPrepareSync(TaskId taskId)225 int CallPrepareSync(TaskId taskId) 226 { 227 return PrepareSync(taskId); 228 } 229 CallNotify()230 void CallNotify() 231 { 232 auto info = cloudTaskInfos_[currentContext_.currentTaskId]; 233 currentContext_.notifier->NotifyProcess(info, {}); 234 } 235 SetAssetFields(const TableName & tableName,const std::vector<Field> & assetFields)236 void SetAssetFields(const TableName &tableName, const std::vector<Field> &assetFields) 237 { 238 currentContext_.tableName = tableName; 239 currentContext_.assetFields[currentContext_.tableName] = assetFields; 240 } 241 242 std::map<std::string, Assets> TestTagAssetsInSingleRecord( 243 VBucket &coveredData, VBucket &beCoveredData, bool setNormalStatus = false) 244 { 245 int ret = E_OK; 246 return TagAssetsInSingleRecord(coveredData, beCoveredData, setNormalStatus, ret); 247 } 248 TestIsDataContainDuplicateAsset(std::vector<Field> & assetFields,VBucket & data)249 bool TestIsDataContainDuplicateAsset(std::vector<Field> &assetFields, VBucket &data) 250 { 251 return IsDataContainDuplicateAsset(assetFields, data); 252 } 253 SetCloudWaterMarks(const TableName & tableName,const std::string & mark)254 void SetCloudWaterMarks(const TableName &tableName, const std::string &mark) 255 { 256 currentContext_.tableName = tableName; 257 currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][tableName] = mark; 258 } 259 CallDownloadAssets()260 int CallDownloadAssets() 261 { 262 InnerProcessInfo info; 263 std::vector<std::string> pKColNames; 264 std::set<Key> dupHashKeySet; 265 ChangedData changedAssets; 266 return CloudSyncer::DownloadAssets(info, pKColNames, dupHashKeySet, changedAssets); 267 } 268 SetCurrentContext(TaskId taskId)269 void SetCurrentContext(TaskId taskId) 270 { 271 currentContext_.currentTaskId = taskId; 272 } 273 SetLastTaskId(TaskId taskId)274 void SetLastTaskId(TaskId taskId) 275 { 276 lastTaskId_ = taskId; 277 } 278 SetCurrentTaskPause()279 void SetCurrentTaskPause() 280 { 281 cloudTaskInfos_[currentContext_.currentTaskId].pause = true; 282 } 283 SetAssetDownloadList(int downloadCount)284 void SetAssetDownloadList(int downloadCount) 285 { 286 for (int i = 0; i < downloadCount; ++i) { 287 currentContext_.assetDownloadList.push_back({}); 288 } 289 } 290 SetQuerySyncObject(TaskId taskId,const QuerySyncObject & query)291 void SetQuerySyncObject(TaskId taskId, const QuerySyncObject &query) 292 { 293 std::vector<QuerySyncObject> queryList; 294 queryList.push_back(query); 295 cloudTaskInfos_[taskId].queryList = queryList; 296 } 297 CallGetQuerySyncObject(const std::string & tableName)298 QuerySyncObject CallGetQuerySyncObject(const std::string &tableName) 299 { 300 return CloudSyncer::GetQuerySyncObject(tableName); 301 } 302 CallReloadWaterMarkIfNeed(TaskId taskId,WaterMark & waterMark)303 void CallReloadWaterMarkIfNeed(TaskId taskId, WaterMark &waterMark) 304 { 305 CloudSyncer::ReloadWaterMarkIfNeed(taskId, waterMark); 306 } 307 CallRecordWaterMark(TaskId taskId,Timestamp waterMark)308 void CallRecordWaterMark(TaskId taskId, Timestamp waterMark) 309 { 310 CloudSyncer::RecordWaterMark(taskId, waterMark); 311 } 312 SetResumeSyncParam(TaskId taskId,const SyncParam & syncParam)313 void SetResumeSyncParam(TaskId taskId, const SyncParam &syncParam) 314 { 315 resumeTaskInfos_[taskId].syncParam = syncParam; 316 resumeTaskInfos_[taskId].context.tableName = syncParam.tableName; 317 } 318 ClearResumeTaskInfo(TaskId taskId)319 void ClearResumeTaskInfo(TaskId taskId) 320 { 321 resumeTaskInfos_.erase(taskId); 322 } 323 SetTaskResume(TaskId taskId,bool resume)324 void SetTaskResume(TaskId taskId, bool resume) 325 { 326 cloudTaskInfos_[taskId].resume = resume; 327 } 328 CallGetSyncParamForDownload(TaskId taskId,SyncParam & param)329 int CallGetSyncParamForDownload(TaskId taskId, SyncParam ¶m) 330 { 331 return CloudSyncer::GetSyncParamForDownload(taskId, param); 332 } 333 SetResumeTaskUpload(TaskId taskId,bool upload)334 void SetResumeTaskUpload(TaskId taskId, bool upload) 335 { 336 resumeTaskInfos_[taskId].upload = upload; 337 } 338 IsResumeTaskUpload(TaskId taskId)339 bool IsResumeTaskUpload(TaskId taskId) 340 { 341 return resumeTaskInfos_[taskId].upload; 342 } 343 CallHandleTagAssets(const Key & hashKey,const DataInfo & dataInfo,size_t idx,SyncParam & param,VBucket & localAssetInfo)344 int CallHandleTagAssets(const Key &hashKey, const DataInfo &dataInfo, size_t idx, SyncParam ¶m, 345 VBucket &localAssetInfo) 346 { 347 return CloudSyncer::HandleTagAssets(hashKey, dataInfo, idx, param, localAssetInfo); 348 } 349 GetProcessRecorder()350 std::shared_ptr<ProcessRecorder> GetProcessRecorder() 351 { 352 std::lock_guard<std::mutex> autoLock(dataLock_); 353 return currentContext_.processRecorder; 354 } 355 CallDoDownloadInNeed(bool needUpload,bool isFirstDownload)356 int CallDoDownloadInNeed(bool needUpload, bool isFirstDownload) 357 { 358 CloudTaskInfo taskInfo; 359 { 360 std::lock_guard<std::mutex> autoLock(dataLock_); 361 taskInfo = cloudTaskInfos_[currentContext_.currentTaskId]; 362 } 363 return DoDownloadInNeed(taskInfo, needUpload, isFirstDownload); 364 } 365 CallDoUploadInNeed()366 int CallDoUploadInNeed() 367 { 368 CloudTaskInfo taskInfo; 369 { 370 std::lock_guard<std::mutex> autoLock(dataLock_); 371 taskInfo = cloudTaskInfos_[currentContext_.currentTaskId]; 372 } 373 return DoUploadInNeed(taskInfo, true); 374 } 375 CloudTaskInfo taskInfo_; 376 private: 377 std::map<TaskId, SyncProcess> process_; 378 }; 379 380 } 381 #endif // #define CLOUDSYNCER_TEST_H