1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include <gtest/gtest.h>
17 #include "cloud/cloud_db_constant.h"
18 #include "cloud/cloud_db_types.h"
19 #include "cloud/cloud_sync_utils.h"
20 #include "cloud_db_sync_utils_test.h"
21 #include "cloud_syncer.h"
22 #include "db_common.h"
23 #include "distributeddb_data_generate_unit_test.h"
24 #include "log_print.h"
25 #include "relational_store_client.h"
26 #include "relational_store_delegate.h"
27 #include "relational_store_instance.h"
28 #include "relational_store_manager.h"
29 #include "relational_sync_able_storage.h"
30 #include "runtime_config.h"
31 #include "time_helper.h"
32 #include "virtual_asset_loader.h"
33 #include "virtual_cloud_data_translate.h"
34 #include "virtual_cloud_db.h"
35 #include "virtual_communicator_aggregator.h"
36 
37 namespace {
38 using namespace testing::ext;
39 using namespace DistributedDB;
40 using namespace DistributedDBUnitTest;
41 const char *g_createSQL =
42     "CREATE TABLE IF NOT EXISTS DistributedDBCloudCheckSyncTest(" \
43     "id TEXT PRIMARY KEY," \
44     "name TEXT," \
45     "height REAL ," \
46     "photo BLOB," \
47     "age INT);";
48 const char *g_createNonPrimaryKeySQL =
49     "CREATE TABLE IF NOT EXISTS NonPrimaryKeyTable(" \
50     "id TEXT," \
51     "name TEXT," \
52     "height REAL ," \
53     "photo BLOB," \
54     "age INT);";
55 const int64_t g_syncWaitTime = 60;
56 
57 const Asset g_cloudAsset = {
58     .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
59     .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
60 };
61 
62 std::vector<DBStatus> g_actualDBStatus;
63 std::map<std::string, SyncProcess> lastProcess_;
64 
CreateUserDBAndTable(sqlite3 * & db)65 void CreateUserDBAndTable(sqlite3 *&db)
66 {
67     EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
68     EXPECT_EQ(RelationalTestUtils::ExecSql(db, g_createSQL), SQLITE_OK);
69     EXPECT_EQ(RelationalTestUtils::ExecSql(db, g_createNonPrimaryKeySQL), SQLITE_OK);
70 }
71 
PrepareOption(CloudSyncOption & option,const Query & query,bool isPriorityTask,bool isCompensatedSyncOnly=false)72 void PrepareOption(CloudSyncOption &option, const Query &query, bool isPriorityTask, bool isCompensatedSyncOnly = false)
73 {
74     option.devices = { "CLOUD" };
75     option.mode = SYNC_MODE_CLOUD_MERGE;
76     option.query = query;
77     option.waitTime = g_syncWaitTime;
78     option.priorityTask = isPriorityTask;
79     option.compensatedSyncOnly = isCompensatedSyncOnly;
80 }
81 
BlockSync(const Query & query,RelationalStoreDelegate * delegate,std::vector<DBStatus> & actualDBStatus,bool prioritySync=false)82 void BlockSync(const Query &query, RelationalStoreDelegate *delegate, std::vector<DBStatus> &actualDBStatus,
83     bool prioritySync = false)
84 {
85     std::mutex dataMutex;
86     std::condition_variable cv;
87     bool finish = false;
88     auto callback = [&actualDBStatus, &cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
89         for (const auto &item: process) {
90             actualDBStatus.push_back(item.second.errCode);
91             if (item.second.process == DistributedDB::FINISHED) {
92                 {
93                     std::lock_guard<std::mutex> autoLock(dataMutex);
94                     finish = true;
95                 }
96                 cv.notify_one();
97             }
98         }
99     };
100     CloudSyncOption option;
101     option.devices = { "CLOUD" };
102     option.mode = SYNC_MODE_CLOUD_MERGE;
103     option.query = query;
104     option.waitTime = g_syncWaitTime;
105     option.priorityTask = prioritySync;
106     ASSERT_EQ(delegate->Sync(option, callback), OK);
107     std::unique_lock<std::mutex> uniqueLock(dataMutex);
108     cv.wait(uniqueLock, [&finish]() {
109         return finish;
110     });
111 }
112 
BlockPrioritySync(const Query & query,RelationalStoreDelegate * delegate,bool isPriority,DBStatus expectResult,bool isCompensatedSyncOnly=false)113 void BlockPrioritySync(const Query &query, RelationalStoreDelegate *delegate, bool isPriority, DBStatus expectResult,
114     bool isCompensatedSyncOnly = false)
115 {
116     std::mutex dataMutex;
117     std::condition_variable cv;
118     bool finish = false;
119     std::map<std::string, SyncProcess> last;
120     auto callback = [&last, &cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
121         for (const auto &item: process) {
122             if (item.second.process == DistributedDB::FINISHED) {
123                 {
124                     std::lock_guard<std::mutex> autoLock(dataMutex);
125                     finish = true;
126                 }
127                 cv.notify_one();
128             }
129         }
130         last = process;
131     };
132     CloudSyncOption option;
133     PrepareOption(option, query, isPriority, isCompensatedSyncOnly);
134     ASSERT_EQ(delegate->Sync(option, callback), expectResult);
135     if (expectResult == OK) {
136         std::unique_lock<std::mutex> uniqueLock(dataMutex);
137         cv.wait(uniqueLock, [&finish]() {
138             return finish;
139         });
140     }
141     lastProcess_ = last;
142 }
143 
QueryCountCallback(void * data,int count,char ** colValue,char ** colName)144 int QueryCountCallback(void *data, int count, char **colValue, char **colName)
145 {
146     if (count != 1) {
147         return 0;
148     }
149     auto expectCount = reinterpret_cast<int64_t>(data);
150     EXPECT_EQ(strtol(colValue[0], nullptr, 10), expectCount); // 10: decimal
151     return 0;
152 }
153 
CheckUserTableResult(sqlite3 * & db,const std::string & tableName,int64_t expectCount)154 void CheckUserTableResult(sqlite3 *&db, const std::string &tableName, int64_t expectCount)
155 {
156     string query = "select count(*) from " + tableName + ";";
157     EXPECT_EQ(sqlite3_exec(db, query.c_str(), QueryCountCallback,
158         reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
159 }
160 
/**
 * Test fixture for the cloud "check sync" cases: each test gets a fresh SQLite database with two
 * user tables, a relational store delegate opened on it, and a virtual cloud db / asset loader.
 */
class DistributedDBCloudCheckSyncTest : public testing::Test {
public:
    static void SetUpTestCase();
    static void TearDownTestCase();
    void SetUp() override;
    void TearDown() override;
protected:
    // Initializes testDir_ and storePath_ once per fixture instance.
    void InitTestDir();
    // Builds the cloud schema covering both user tables.
    DataBaseSchema GetSchema();
    // Releases the virtual cloud db and closes the store delegate.
    void CloseDb();
    // Inserts (or replaces) recordCounts local rows with ids starting at begin.
    void InsertUserTableRecord(const std::string &tableName, int64_t recordCounts, int64_t begin = 0);
    // Inserts count cloud records with ids starting at begin (first overload targets tableName_).
    void InsertCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull);
    void InsertCloudTableRecord(const std::string &tableName, int64_t begin, int64_t count, int64_t photoSize,
        bool assetIsNull);
    // Deletes one local row of tableName_ by id, or every id in [begin, end].
    void DeleteUserTableRecord(int64_t id);
    void DeleteUserTableRecord(int64_t begin, int64_t end);
    // Deletes one cloud record of tableName_ by gid.
    void DeleteCloudTableRecord(int64_t gid);
    // Expects the number of non-deleted cloud records of tableName to equal expectCount.
    void CheckCloudTableCount(const std::string &tableName, int64_t expectCount);
    // Field-by-field comparison of two Info values.
    bool CheckSyncCount(const Info actualInfo, const Info expectInfo);
    // Compares recorded process notifications against the expected sequence.
    bool CheckSyncProcess(std::vector<std::map<std::string, SyncProcess>> &actualSyncProcess,
        vector<SyncProcess> &expectSyncProcessV);
    // Starts a normal sync followed by a priority sync and waits for the normal one to finish.
    void PriorityAndNormalSync(const Query &normalQuery, const Query &priorityQuery,
        RelationalStoreDelegate *delegate, std::vector<std::map<std::string, SyncProcess>> &prioritySyncProcess,
        bool isCheckProcess);
    // Deletes count cloud records of tableName_ with gids starting at begin.
    void DeleteCloudDBData(int64_t begin, int64_t count);
    // Install query hooks on the virtual cloud db that count queries and check cloud row counts.
    void SetForkQueryForCloudPrioritySyncTest007(std::atomic<int> &count);
    void SetForkQueryForCloudPrioritySyncTest008(std::atomic<int> &count);
    // Inserts local data, syncs, deletes the cloud copies and syncs again.
    void InitLogicDeleteDataEnv(int64_t dataCount, bool prioritySync = false);
    // Expects tableName_ to hold expectCount local rows.
    void CheckLocalCount(int64_t expectCount);
    // Runs three count queries on the log table of tableName_, each expected to return expectCount.
    void CheckLogCleaned(int64_t expectCount);
    void SyncDataStatusTest(bool isCompensatedSyncOnly);
    std::string testDir_;
    std::string storePath_;
    sqlite3 *db_ = nullptr;
    RelationalStoreDelegate *delegate_ = nullptr;
    std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
    std::shared_ptr<VirtualAssetLoader> virtualAssetLoader_ = nullptr;
    std::shared_ptr<RelationalStoreManager> mgr_ = nullptr;
    // Names of the user tables and of their shared counterparts.
    std::string tableName_ = "DistributedDBCloudCheckSyncTest";
    std::string tableNameShared_ = "DistributedDBCloudCheckSyncTest_shared";
    std::string tableWithoutPrimaryName_ = "NonPrimaryKeyTable";
    std::string tableWithoutPrimaryNameShared_ = "NonPrimaryKeyTable_shared";
    // Same table name as tableName_ but with different letter case.
    std::string lowerTableName_ = "distributeddbCloudCheckSyncTest";
    VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
};
206 
SetUpTestCase()207 void DistributedDBCloudCheckSyncTest::SetUpTestCase()
208 {
209     RuntimeConfig::SetCloudTranslate(std::make_shared<VirtualCloudDataTranslate>());
210 }
211 
TearDownTestCase()212 void DistributedDBCloudCheckSyncTest::TearDownTestCase()
213 {}
214 
SetUp()215 void DistributedDBCloudCheckSyncTest::SetUp()
216 {
217     DistributedDBToolsUnitTest::PrintTestCaseInfo();
218     InitTestDir();
219     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != 0) {
220         LOGE("rm test db files error.");
221     }
222     DistributedDBToolsUnitTest::PrintTestCaseInfo();
223     LOGD("Test dir is %s", testDir_.c_str());
224     db_ = RelationalTestUtils::CreateDataBase(storePath_);
225     ASSERT_NE(db_, nullptr);
226     CreateUserDBAndTable(db_);
227     mgr_ = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
228     RelationalStoreDelegate::Option option;
229     ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
230     ASSERT_NE(delegate_, nullptr);
231     ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
232     ASSERT_EQ(delegate_->CreateDistributedTable(tableWithoutPrimaryName_, CLOUD_COOPERATION), DBStatus::OK);
233     virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
234     virtualAssetLoader_ = std::make_shared<VirtualAssetLoader>();
235     ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
236     ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
237     DataBaseSchema dataBaseSchema = GetSchema();
238     ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
239     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
240     ASSERT_TRUE(communicatorAggregator_ != nullptr);
241     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
242 }
243 
TearDown()244 void DistributedDBCloudCheckSyncTest::TearDown()
245 {
246     virtualCloudDb_->ForkQuery(nullptr);
247     virtualCloudDb_->SetCloudError(false);
248     CloseDb();
249     EXPECT_EQ(sqlite3_close_v2(db_), SQLITE_OK);
250     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != E_OK) {
251         LOGE("rm test db files error.");
252     }
253     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
254     communicatorAggregator_ = nullptr;
255     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
256 }
257 
InitTestDir()258 void DistributedDBCloudCheckSyncTest::InitTestDir()
259 {
260     if (!testDir_.empty()) {
261         return;
262     }
263     DistributedDBToolsUnitTest::TestDirInit(testDir_);
264     storePath_ = testDir_ + "/" + STORE_ID_1 + ".db";
265     LOGI("The test db is:%s", testDir_.c_str());
266 }
267 
GetSchema()268 DataBaseSchema DistributedDBCloudCheckSyncTest::GetSchema()
269 {
270     DataBaseSchema schema;
271     TableSchema tableSchema;
272     tableSchema.name = tableName_;
273     tableSchema.sharedTableName = tableName_ + "_shared";
274     tableSchema.fields = {
275         {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
276         {"photo", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
277     };
278     TableSchema tableWithoutPrimaryKeySchema;
279     tableWithoutPrimaryKeySchema.name = tableWithoutPrimaryName_;
280     tableWithoutPrimaryKeySchema.sharedTableName = tableWithoutPrimaryNameShared_;
281     tableWithoutPrimaryKeySchema.fields = {
282         {"id", TYPE_INDEX<std::string>}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
283         {"photo", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
284     };
285     schema.tables.push_back(tableSchema);
286     schema.tables.push_back(tableWithoutPrimaryKeySchema);
287     return schema;
288 }
289 
CloseDb()290 void DistributedDBCloudCheckSyncTest::CloseDb()
291 {
292     virtualCloudDb_ = nullptr;
293     if (mgr_ != nullptr) {
294         EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
295         delegate_ = nullptr;
296         mgr_ = nullptr;
297     }
298 }
299 
InsertUserTableRecord(const std::string & tableName,int64_t recordCounts,int64_t begin)300 void DistributedDBCloudCheckSyncTest::InsertUserTableRecord(const std::string &tableName,
301     int64_t recordCounts, int64_t begin)
302 {
303     ASSERT_NE(db_, nullptr);
304     for (int64_t i = begin; i < begin + recordCounts; ++i) {
305         string sql = "INSERT OR REPLACE INTO " + tableName
306             + " (id, name, height, photo, age) VALUES ('" + std::to_string(i) + "', 'Local"
307             + std::to_string(i) + "', '155.10',  'text', '21');";
308         ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db_, sql), E_OK);
309     }
310 }
311 
InsertCloudTableRecord(int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)312 void DistributedDBCloudCheckSyncTest::InsertCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize,
313     bool assetIsNull)
314 {
315     InsertCloudTableRecord(tableName_, begin, count, photoSize, assetIsNull);
316 }
317 
InsertCloudTableRecord(const std::string & tableName,int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)318 void DistributedDBCloudCheckSyncTest::InsertCloudTableRecord(const std::string &tableName, int64_t begin, int64_t count,
319     int64_t photoSize, bool assetIsNull)
320 {
321     std::vector<uint8_t> photo(photoSize, 'v');
322     std::vector<VBucket> record1;
323     std::vector<VBucket> extend1;
324     std::vector<VBucket> record2;
325     std::vector<VBucket> extend2;
326     Timestamp now = TimeHelper::GetSysCurrentTime();
327     for (int64_t i = begin; i < begin + count; ++i) {
328         VBucket data;
329         data.insert_or_assign("id", std::to_string(i));
330         data.insert_or_assign("name", "Cloud" + std::to_string(i));
331         data.insert_or_assign("height", 166.0); // 166.0 is random double value
332         data.insert_or_assign("married", false);
333         data.insert_or_assign("photo", photo);
334         data.insert_or_assign("age", static_cast<int64_t>(13L)); // 13 is random age
335         Asset asset = g_cloudAsset;
336         asset.name = asset.name + std::to_string(i);
337         assetIsNull ? data.insert_or_assign("assert", Nil()) : data.insert_or_assign("assert", asset);
338         record1.push_back(data);
339         VBucket log;
340         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
341             now / CloudDbConstant::TEN_THOUSAND + i));
342         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
343             now / CloudDbConstant::TEN_THOUSAND + i));
344         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
345         extend1.push_back(log);
346 
347         std::vector<Asset> assets;
348         data.insert_or_assign("height", 180.3); // 180.3 is random double value
349         for (int64_t j = i; j <= i + 2; j++) { // 2 extra num
350             asset.name = g_cloudAsset.name + std::to_string(j);
351             assets.push_back(asset);
352         }
353         data.erase("assert");
354         data.erase("married");
355         assetIsNull ? data.insert_or_assign("asserts", Nil()) : data.insert_or_assign("asserts", assets);
356         record2.push_back(data);
357         extend2.push_back(log);
358     }
359     ASSERT_EQ(virtualCloudDb_->BatchInsert(tableName, std::move(record1), extend1), DBStatus::OK);
360     std::this_thread::sleep_for(std::chrono::milliseconds(count));
361 }
362 
DeleteUserTableRecord(int64_t id)363 void DistributedDBCloudCheckSyncTest::DeleteUserTableRecord(int64_t id)
364 {
365     ASSERT_NE(db_, nullptr);
366     string sql = "DELETE FROM " + tableName_ + " WHERE id ='" + std::to_string(id) + "';";
367     ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db_, sql), E_OK);
368 }
369 
DeleteUserTableRecord(int64_t begin,int64_t end)370 void DistributedDBCloudCheckSyncTest::DeleteUserTableRecord(int64_t begin, int64_t end)
371 {
372     ASSERT_NE(db_, nullptr);
373     std::string sql = "DELETE FROM " + tableName_ + " WHERE id IN (";
374     for (int64_t i = begin; i <= end; ++i) {
375         sql += "'" + std::to_string(i) + "',";
376     }
377     if (sql.back() == ',') {
378         sql.pop_back();
379     }
380     sql += ");";
381     ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db_, sql), E_OK);
382 }
383 
DeleteCloudTableRecord(int64_t gid)384 void DistributedDBCloudCheckSyncTest::DeleteCloudTableRecord(int64_t gid)
385 {
386     VBucket idMap;
387     idMap.insert_or_assign("#_gid", std::to_string(gid));
388     ASSERT_EQ(virtualCloudDb_->DeleteByGid(tableName_, idMap), DBStatus::OK);
389 }
390 
CheckCloudTableCount(const std::string & tableName,int64_t expectCount)391 void DistributedDBCloudCheckSyncTest::CheckCloudTableCount(const std::string &tableName, int64_t expectCount)
392 {
393     VBucket extend;
394     extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
395     int64_t realCount = 0;
396     std::vector<VBucket> data;
397     virtualCloudDb_->Query(tableName, extend, data);
398     for (size_t j = 0; j < data.size(); ++j) {
399         auto entry = data[j].find(CloudDbConstant::DELETE_FIELD);
400         if (entry != data[j].end() && std::get<bool>(entry->second)) {
401             continue;
402         }
403         realCount++;
404     }
405     EXPECT_EQ(realCount, expectCount); // ExpectCount represents the total amount of cloud data.
406 }
407 
CheckSyncCount(const Info actualInfo,const Info expectInfo)408 bool DistributedDBCloudCheckSyncTest::CheckSyncCount(const Info actualInfo, const Info expectInfo)
409 {
410     if (actualInfo.batchIndex != expectInfo.batchIndex) {
411         return false;
412     }
413     if (actualInfo.total != expectInfo.total) {
414         return false;
415     }
416     if (actualInfo.successCount != expectInfo.successCount) {
417         return false;
418     }
419     if (actualInfo.failCount != expectInfo.failCount) {
420         return false;
421     }
422     return true;
423 }
424 
CheckSyncProcess(std::vector<std::map<std::string,SyncProcess>> & actualSyncProcess,vector<SyncProcess> & expectSyncProcessV)425 bool DistributedDBCloudCheckSyncTest::CheckSyncProcess(
426     std::vector<std::map<std::string, SyncProcess>> &actualSyncProcess, vector<SyncProcess> &expectSyncProcessV)
427 {
428     vector<map<string, SyncProcess>> expectSyncProcess;
429     for (auto syncProcess : expectSyncProcessV) {
430         map<string, SyncProcess> expectSyncProcessMap = {{"CLOUD", syncProcess}};
431         expectSyncProcess.emplace_back(expectSyncProcessMap);
432     }
433     for (int i = 0; i < (int) actualSyncProcess.size(); i++) {
434         map<string, SyncProcess> actualSyncProcessMap = actualSyncProcess[i];
435         map<string, SyncProcess> expectSyncProcessMap = expectSyncProcess[i];
436         for (auto &it : actualSyncProcessMap) {
437             string mapKey = it.first;
438             if (expectSyncProcessMap.find(mapKey) == expectSyncProcessMap.end()) {
439                 return false;
440             }
441             SyncProcess actualSyncProcess = it.second;
442             SyncProcess expectSyncProcess = expectSyncProcessMap.find(mapKey)->second;
443             for (const auto &itInner : actualSyncProcess.tableProcess) {
444                 string tableName = itInner.first;
445                 if (expectSyncProcess.tableProcess.find(tableName) == expectSyncProcess.tableProcess.end()) {
446                     return false;
447                 }
448                 TableProcessInfo actualTableProcessInfo = itInner.second;
449                 TableProcessInfo expectTableProcessInfo = expectSyncProcess.tableProcess.find(tableName)->second;
450                 if (!CheckSyncCount(actualTableProcessInfo.downLoadInfo, expectTableProcessInfo.downLoadInfo)) {
451                     return false;
452                 }
453                 if (!CheckSyncCount(actualTableProcessInfo.upLoadInfo, expectTableProcessInfo.upLoadInfo)) {
454                     return false;
455                 }
456             }
457         }
458     }
459     return true;
460 }
461 
PriorityAndNormalSync(const Query & normalQuery,const Query & priorityQuery,RelationalStoreDelegate * delegate,std::vector<std::map<std::string,SyncProcess>> & prioritySyncProcess,bool isCheckProcess)462 void DistributedDBCloudCheckSyncTest::PriorityAndNormalSync(const Query &normalQuery, const Query &priorityQuery,
463     RelationalStoreDelegate *delegate, std::vector<std::map<std::string, SyncProcess>> &prioritySyncProcess,
464     bool isCheckProcess)
465 {
466     std::mutex dataMutex;
467     std::condition_variable cv;
468     bool normalFinish = false;
469     bool priorityFinish = false;
470     auto normalCallback = [&cv, &dataMutex, &normalFinish, &priorityFinish, &prioritySyncProcess, &isCheckProcess](
471         const std::map<std::string, SyncProcess> &process) {
472         for (const auto &item: process) {
473             if (item.second.process == DistributedDB::FINISHED) {
474                 normalFinish = true;
475                 if (isCheckProcess) {
476                     ASSERT_EQ(priorityFinish, true);
477                 }
478                 cv.notify_one();
479             }
480         }
481         prioritySyncProcess.emplace_back(process);
482     };
483     auto priorityCallback = [&cv, &priorityFinish, &prioritySyncProcess](
484         const std::map<std::string, SyncProcess> &process) {
485         for (const auto &item: process) {
486             if (item.second.process == DistributedDB::FINISHED) {
487                 priorityFinish = true;
488                 cv.notify_one();
489             }
490         }
491         prioritySyncProcess.emplace_back(process);
492     };
493     CloudSyncOption option;
494     PrepareOption(option, normalQuery, false);
495     virtualCloudDb_->SetBlockTime(500); // 500 ms
496     ASSERT_EQ(delegate->Sync(option, normalCallback), OK);
497     PrepareOption(option, priorityQuery, true);
498     std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 50 ms
499     ASSERT_EQ(delegate->Sync(option, priorityCallback), OK);
500     std::unique_lock<std::mutex> uniqueLock(dataMutex);
501     cv.wait(uniqueLock, [&normalFinish]() {
502         return normalFinish;
503     });
504 }
505 
DeleteCloudDBData(int64_t begin,int64_t count)506 void DistributedDBCloudCheckSyncTest::DeleteCloudDBData(int64_t begin, int64_t count)
507 {
508     for (int64_t i = begin; i < begin + count; i++) {
509         VBucket idMap;
510         idMap.insert_or_assign("#_gid", std::to_string(i));
511         ASSERT_EQ(virtualCloudDb_->DeleteByGid(tableName_, idMap), DBStatus::OK);
512     }
513 }
514 
/**
 * Installs a query hook on the virtual cloud db for CloudPrioritySyncTest007.
 *
 * The hook increments `count` on every call, delays the first call by one second and, at the
 * counter values 3, 6 and 9, checks the number of cloud records of tableName_.
 * CheckCloudTableCount itself issues a query on the virtual cloud db, which (per the inline
 * comments) passes through this hook again and bumps the counter — hence the gaps between the
 * checked values. The statement order matters; do not reorder.
 *
 * @param count  query counter owned by the caller; captured by reference, so it must outlive the hook
 */
void DistributedDBCloudCheckSyncTest::SetForkQueryForCloudPrioritySyncTest007(std::atomic<int> &count)
{
    virtualCloudDb_->ForkQuery([this, &count](const std::string &, VBucket &) {
        count++;
        if (count == 1) { // taskid1
            // hold the first query back for one second
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
        if (count == 3) { // 3 means taskid3 because CheckCloudTableCount will query then count++
            CheckCloudTableCount(tableName_, 1); // 1 is count of cloud records after last sync
        }
        if (count == 6) { // 6 means taskid2 because CheckCloudTableCount will query then count++
            CheckCloudTableCount(tableName_, 2); // 2 is count of cloud records after last sync
        }
        if (count == 9) { // 9 means taskid4 because CheckCloudTableCount will query then count++
            CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records after last sync
        }
    });
}
533 
/**
 * Installs a query hook on the virtual cloud db for CloudPrioritySyncTest008.
 *
 * Same structure as the Test007 hook: `count` is incremented on every call, the first call is
 * delayed by one second, and the cloud record count of tableName_ is checked at the counter
 * values 3, 6 and 9. The only difference is the expected count at value 6 (1 instead of 2).
 * CheckCloudTableCount issues its own query, which (per the inline comments) also increments the
 * counter. The statement order matters; do not reorder.
 *
 * @param count  query counter owned by the caller; captured by reference, so it must outlive the hook
 */
void DistributedDBCloudCheckSyncTest::SetForkQueryForCloudPrioritySyncTest008(std::atomic<int> &count)
{
    virtualCloudDb_->ForkQuery([this, &count](const std::string &, VBucket &) {
        count++;
        if (count == 1) { // taskid1
            // hold the first query back for one second
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
        if (count == 3) { // 3 means taskid3 because CheckCloudTableCount will query then count++
            CheckCloudTableCount(tableName_, 1); // 1 is count of cloud records after last sync
        }
        if (count == 6) { // 6 means taskid2 because CheckCloudTableCount will query then count++
            CheckCloudTableCount(tableName_, 1); // 1 is count of cloud records after last sync
        }
        if (count == 9) { // 9 means taskid4 because CheckCloudTableCount will query then count++
            CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records after last sync
        }
    });
}
552 
InitLogicDeleteDataEnv(int64_t dataCount,bool prioritySync)553 void DistributedDBCloudCheckSyncTest::InitLogicDeleteDataEnv(int64_t dataCount, bool prioritySync)
554 {
555     // prepare data
556     InsertUserTableRecord(tableName_, dataCount);
557     // sync
558     Query query = Query::Select().FromTable({ tableName_ });
559     BlockSync(query, delegate_, g_actualDBStatus);
560     // delete cloud data
561     for (int i = 0; i < dataCount; ++i) {
562         DeleteCloudTableRecord(i);
563     }
564     // sync again
565     BlockSync(query, delegate_, g_actualDBStatus);
566 }
567 
CheckLocalCount(int64_t expectCount)568 void DistributedDBCloudCheckSyncTest::CheckLocalCount(int64_t expectCount)
569 {
570     // check local data
571     int dataCnt = -1;
572     std::string checkLogSql = "SELECT count(*) FROM " + tableName_;
573     RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
574         dataCnt = sqlite3_column_int(stmt, 0);
575         return E_OK;
576     });
577     EXPECT_EQ(dataCnt, expectCount);
578 }
579 
CheckLogCleaned(int64_t expectCount)580 void DistributedDBCloudCheckSyncTest::CheckLogCleaned(int64_t expectCount)
581 {
582     std::string sql1 = "select count(*) from " + DBCommon::GetLogTableName(tableName_) +
583         " where device = 'cloud';";
584     EXPECT_EQ(sqlite3_exec(db_, sql1.c_str(), QueryCountCallback,
585         reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
586     std::string sql2 = "select count(*) from " + DBCommon::GetLogTableName(tableName_) + " where cloud_gid "
587         " is not null and cloud_gid != '';";
588     EXPECT_EQ(sqlite3_exec(db_, sql2.c_str(), QueryCountCallback,
589         reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
590     std::string sql3 = "select count(*) from " + DBCommon::GetLogTableName(tableName_) +
591         " where flag & 0x02 != 0;";
592     EXPECT_EQ(sqlite3_exec(db_, sql3.c_str(), QueryCountCallback,
593         reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
594 }
595 
596 /**
597  * @tc.name: CloudSyncTest001
598  * @tc.desc: sync with device sync query
599  * @tc.type: FUNC
600  * @tc.require:
601  * @tc.author: zhangqiquan
602  */
603 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest001, TestSize.Level0)
604 {
605     // prepare data
606     const int actualCount = 10;
607     InsertUserTableRecord(tableName_, actualCount);
608     // sync twice
609     Query query = Query::Select().FromTable({ tableName_ });
610     BlockSync(query, delegate_, g_actualDBStatus);
611     BlockSync(query, delegate_, g_actualDBStatus);
612     // remove cloud data
613     delegate_->RemoveDeviceData("CLOUD", ClearMode::FLAG_AND_DATA);
614     // check local data
615     int dataCnt = -1;
616     std::string checkLogSql = "SELECT count(*) FROM " + tableName_;
__anonc8f560be0c02(sqlite3_stmt *stmt) 617     RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
618         dataCnt = sqlite3_column_int(stmt, 0);
619         return E_OK;
620     });
621     EXPECT_EQ(dataCnt, 0);
622 }
623 
624 /**
625  * @tc.name: CloudSyncTest002
626  * @tc.desc: sync with same data in one batch
627  * @tc.type: FUNC
628  * @tc.require:
629  * @tc.author: zhangqiquan
630  */
631 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest002, TestSize.Level0)
632 {
633     // prepare data
634     const int actualCount = 1;
635     InsertUserTableRecord(tableName_, actualCount);
636     // sync twice
637     Query query = Query::Select().FromTable({ tableName_ });
638     BlockSync(query, delegate_, g_actualDBStatus);
639     // cloud delete id=0 and insert id=0 but its gid is 1
640     // local delete id=0
641     DeleteCloudTableRecord(0); // cloud gid is 0
642     InsertCloudTableRecord(0, actualCount, 0, false); // 0 is id
643     DeleteUserTableRecord(0); // 0 is id
644     BlockSync(query, delegate_, g_actualDBStatus);
645     bool deleteStatus = true;
646     EXPECT_EQ(virtualCloudDb_->GetDataStatus("1", deleteStatus), OK);
647     EXPECT_EQ(deleteStatus, false);
648 }
649 
650 /**
651  * @tc.name: CloudSyncTest003
652  * @tc.desc: local data is delete before sync, then sync, cloud data will insert into local
653  * @tc.type: FUNC
654  * @tc.require:
655  * @tc.author: zhangshijie
656  */
657 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest003, TestSize.Level0)
658 {
659     // prepare data
660     const int actualCount = 1;
661     InsertUserTableRecord(tableName_, actualCount);
662 
663     InsertCloudTableRecord(0, actualCount, 0, false);
664     // delete local data
665     DeleteUserTableRecord(0);
666     Query query = Query::Select().FromTable({ tableName_ });
667     BlockSync(query, delegate_, g_actualDBStatus);
668 
669     // check local data, cloud date will insert into local
670     int dataCnt = -1;
671     std::string checkLogSql = "SELECT count(*) FROM " + tableName_;
__anonc8f560be0d02(sqlite3_stmt *stmt) 672     RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
673         dataCnt = sqlite3_column_int(stmt, 0);
674         return E_OK;
675     });
676     EXPECT_EQ(dataCnt, actualCount);
677 }
678 
679 /**
680  * @tc.name: CloudSyncTest004
681  * @tc.desc: sync after insert failed
682  * @tc.type: FUNC
683  * @tc.require:
684  * @tc.author: zhangqiquan
685  */
686 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest004, TestSize.Level0)
687 {
688     // prepare data
689     const int actualCount = 1;
690     InsertUserTableRecord(tableName_, actualCount);
691     // sync twice
692     Query query = Query::Select().FromTable({ tableName_ });
693     LOGW("Block Sync");
694     virtualCloudDb_->SetInsertFailed(1);
695     BlockSync(query, delegate_, g_actualDBStatus);
696     // delete local data
697     DeleteUserTableRecord(0); // 0 is id
698     LOGW("Block Sync");
699     // sync again and this record with be synced to cloud
700     BlockSync(query, delegate_, g_actualDBStatus);
701     bool deleteStatus = true;
702     EXPECT_EQ(virtualCloudDb_->GetDataStatus("0", deleteStatus), OK);
703     EXPECT_EQ(deleteStatus, true);
704 }
705 
706 /**
707  * @tc.name: CloudSyncTest006
708  * @tc.desc: check redownload when common sync pasue.
709  * @tc.type: FUNC
710  * @tc.require:
711  * @tc.author: luoguo
712  */
713 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest006, TestSize.Level0)
714 {
715     /**
716      * @tc.steps:step1. init data and sync
717      * @tc.expected: step1. ok.
718      */
719     const int localCount = 120;  // 120 is count of local
720     const int cloudCount = 100;  // 100 is count of cloud
721     InsertUserTableRecord(tableName_, localCount, 0);
722     InsertUserTableRecord(tableWithoutPrimaryName_, cloudCount, 0);
723     InsertCloudTableRecord(tableWithoutPrimaryName_, 80, cloudCount, 0, false);
724 
725     /**
726      * @tc.steps:step2. common sync will pasue
727      * @tc.expected: step2. ok.
728      */
729     std::vector<std::string> tableNames = {tableName_, tableWithoutPrimaryName_};
730     Query normalQuery = Query::Select().FromTable({tableNames});
731     std::vector<std::string> idValue = {"0", "1", "2"};
732     Query priorityQuery = Query::Select().From(tableName_).In("id", idValue);
733     CloudSyncOption option;
734     CloudSyncOption priorityOption;
735     PrepareOption(option, normalQuery, false);
736     PrepareOption(priorityOption, priorityQuery, true);
737     bool isUpload = false;
738     uint32_t blockTime = 2000;
__anonc8f560be0e02(const std::string &tableName, VBucket &extend) 739     virtualCloudDb_->ForkUpload([&isUpload, &blockTime](const std::string &tableName, VBucket &extend) {
740         if (isUpload == false) {
741             isUpload = true;
742             std::this_thread::sleep_for(std::chrono::milliseconds(blockTime));
743         }
744     });
745     bool isFinsh = false;
746     bool priorityFinish = false;
__anonc8f560be0f02(const std::map<std::string, SyncProcess> &process) 747     auto normalCallback = [&isFinsh, &priorityFinish](const std::map<std::string, SyncProcess> &process) {
748         for (const auto &item : process) {
749             if (item.second.process == DistributedDB::FINISHED) {
750                 isFinsh = true;
751                 ASSERT_EQ(priorityFinish, true);
752             }
753         }
754     };
755     ASSERT_EQ(delegate_->Sync(option, normalCallback), OK);
756 
757     /**
758      * @tc.steps:step3. wait common upload and make priority sync.
759      * @tc.expected: step3. ok.
760      */
761     while (isUpload == false) {
762         std::this_thread::sleep_for(std::chrono::milliseconds(50));
763     }
__anonc8f560be1002(const std::map<std::string, SyncProcess> &process) 764     auto priorityCallback = [&priorityFinish](const std::map<std::string, SyncProcess> &process) {
765         for (const auto &item : process) {
766             if (item.second.process == DistributedDB::FINISHED) {
767                 priorityFinish = true;
768             }
769         }
770     };
771     ASSERT_EQ(delegate_->Sync(priorityOption, priorityCallback), OK);
772     while (isFinsh == false || priorityFinish == false) {
773         std::this_thread::sleep_for(std::chrono::milliseconds(50));
774     }
775 
776     /**
777      * @tc.steps:step4. wait common sync and make priority sync finish, check query Times.
778      * @tc.expected: step4. ok.
779      */
780     uint32_t times = virtualCloudDb_->GetQueryTimes(tableName_);
781     ASSERT_EQ(times, 3u);
782     virtualCloudDb_->ForkUpload(nullptr);
783 }
784 
785 /**
786  * @tc.name: CloudSyncTest007
787  * @tc.desc: check proess info when version conflict sync process.
788  * @tc.type: FUNC
789  * @tc.require:
790  * @tc.author: luoguo
791  */
792 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest007, TestSize.Level0)
793 {
794     /**
795      * @tc.steps:step1. init data and sync
796      * @tc.expected: step1. ok.
797      */
798     const int localCount = 60;  // 120 is count of local
799     InsertUserTableRecord(tableName_, localCount, 0);
800     Query query = Query::Select().FromTable({tableName_});
801     BlockSync(query, delegate_, g_actualDBStatus);
802 
803     /**
804      * @tc.steps:step2. delete 30 - 59 records in user table, and set callback func.
805      * @tc.expected: step2. ok.
806      */
807     DeleteUserTableRecord(30, 59);
808     bool isUpload = false;
__anonc8f560be1102(const std::string &tableName, VBucket &extend) 809     virtualCloudDb_->ForkUpload([&isUpload](const std::string &tableName, VBucket &extend) {
810         if (isUpload == false) {
811             isUpload = true;
812             std::this_thread::sleep_for(std::chrono::milliseconds(2000));
813         }
814     });
815     bool isFinsh = false;
816     std::map<std::string, TableProcessInfo> retSyncProcess;
__anonc8f560be1202(const std::map<std::string, SyncProcess> &process) 817     auto normalCallback = [&isFinsh, &retSyncProcess](const std::map<std::string, SyncProcess> &process) {
818         for (const auto &item : process) {
819             if (item.second.process == DistributedDB::FINISHED) {
820                 isFinsh = true;
821                 ASSERT_EQ(process.empty(), false);
822                 auto lastProcess = process.rbegin();
823                 retSyncProcess = lastProcess->second.tableProcess;
824             }
825         }
826     };
827 
828     /**
829      * @tc.steps:step3. sync.
830      * @tc.expected: step3. ok.
831      */
832     std::vector<std::string> tableNames = {tableName_};
833     Query normalQuery = Query::Select().FromTable({tableNames});
834     CloudSyncOption option;
835     PrepareOption(option, normalQuery, false);
836     ASSERT_EQ(delegate_->Sync(option, normalCallback), OK);
837 
838     /**
839      * @tc.steps:step4. wait upload processs and delete 30 record in cloud table.
840      * @tc.expected: step4. ok.
841      */
842     while (isUpload == false) {
843         std::this_thread::sleep_for(std::chrono::milliseconds(50));
844     }
845     DeleteCloudTableRecord(30);
846 
847     /**
848      * @tc.steps:step5. wait sync processs end and check data.
849      * @tc.expected: step5. ok.
850      */
851     while (isFinsh == false) {
852         std::this_thread::sleep_for(std::chrono::milliseconds(50));
853     }
854     ASSERT_EQ(retSyncProcess.empty(), false);
855     auto taskInfo = retSyncProcess.rbegin();
856     ASSERT_EQ(taskInfo->second.upLoadInfo.total, 30u);
857     virtualCloudDb_->ForkUpload(nullptr);
858 }
859 
860 /**
861  * @tc.name: CloudSyncTest009
862  * @tc.desc: reopen database and sync
863  * @tc.type: FUNC
864  * @tc.require:
865  * @tc.author: wangxiangdong
866  */
867 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest009, TestSize.Level0)
868 {
869     /**
870      * @tc.steps: step1. insert 1 record to user table
871      * @tc.expected: step1. OK.
872      */
873     const int actualCount = 1;
874     InsertUserTableRecord(tableName_, actualCount);
875     /**
876      * @tc.steps: step2. sync data to cloud
877      * @tc.expected: step2. OK.
878      */
879     Query query = Query::Select().FromTable({ tableName_ });
880     BlockSync(query, delegate_, g_actualDBStatus);
881     CheckCloudTableCount(tableName_, 1);
882     /**
883      * @tc.steps: step3. drop data table then close db
884      * @tc.expected: step3. OK.
885      */
886     std::string deleteSql = "DROP TABLE IF EXISTS " + tableName_ + ";";
887     EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, deleteSql), DBStatus::OK);
888     EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
889     delegate_ = nullptr;
890     /**
891      * @tc.steps: step4. recreate data table and reopen database
892      * @tc.expected: step4. OK.
893      */
894     EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, g_createSQL), DBStatus::OK);
895     RelationalStoreDelegate::Option option;
896     ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
897     ASSERT_NE(delegate_, nullptr);
898     ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
899     ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
900     ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
901     DataBaseSchema dataBaseSchema = GetSchema();
902     ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
903     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
904     ASSERT_TRUE(communicatorAggregator_ != nullptr);
905     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
906     /**
907      * @tc.steps: step5. sync and cloud data should be deleted
908      * @tc.expected: step5. OK.
909      */
910     BlockSync(query, delegate_, g_actualDBStatus);
911     CheckCloudTableCount(tableName_, 0);
912 }
913 
914 /**
915  * @tc.name: CloudSyncTest010
916  * @tc.desc: reopen database, recreate table with less columns and sync
917  * @tc.type: FUNC
918  * @tc.require:
919  * @tc.author: wangxiangdong
920  */
921 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest010, TestSize.Level0)
922 {
923     /**
924      * @tc.steps: step1. insert 1 record to user table
925      * @tc.expected: step1. OK.
926      */
927     const int actualCount = 1;
928     InsertUserTableRecord(tableName_, actualCount);
929     /**
930      * @tc.steps: step2. sync data to cloud
931      * @tc.expected: step2. OK.
932      */
933     Query query = Query::Select().FromTable({ tableName_ });
934     BlockSync(query, delegate_, g_actualDBStatus);
935     CheckCloudTableCount(tableName_, 1);
936     /**
937      * @tc.steps: step3. drop data table then close db
938      * @tc.expected: step3. OK.
939      */
940     std::string deleteSql = "DROP TABLE IF EXISTS " + tableName_ + ";";
941     EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, deleteSql), DBStatus::OK);
942     EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
943     delegate_ = nullptr;
944     /**
945      * @tc.steps: step4. recreate data table and reopen database
946      * @tc.expected: step4. OK.
947      */
948     std::string createSql = "CREATE TABLE IF NOT EXISTS DistributedDBCloudCheckSyncTest(id INT PRIMARY KEY);";
949     EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, createSql), DBStatus::OK);
950     RelationalStoreDelegate::Option option;
951     ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
952     ASSERT_NE(delegate_, nullptr);
953     ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::SCHEMA_MISMATCH);
954     ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
955     ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
956     DataBaseSchema dataBaseSchema = GetSchema();
957     ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
958     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
959     ASSERT_TRUE(communicatorAggregator_ != nullptr);
960     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
961     /**
962      * @tc.steps: step5. sync failed with SCHEMA_MISMATCH
963      * @tc.expected: step5. OK.
964      */
965     BlockPrioritySync(query, delegate_, false, DBStatus::SCHEMA_MISMATCH);
966     CheckCloudTableCount(tableName_, 1);
967 }
968 
969 /**
970  * @tc.name: CloudSyncObserverTest001
971  * @tc.desc: test cloud sync multi observer
972  * @tc.type: FUNC
973  * @tc.require:
974  * @tc.author: zhangshijie
975  */
976 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncObserverTest001, TestSize.Level0)
977 {
978     // prepare data
979     const int actualCount = 10;
980     InsertUserTableRecord(tableName_, actualCount);
981 
982     /**
983      * @tc.steps:step1. open two delegate with two observer.
984      * @tc.expected: step1. ok.
985      */
986     RelationalStoreDelegate::Option option;
987     auto observer1 = new (std::nothrow) RelationalStoreObserverUnitTest();
988     ASSERT_NE(observer1, nullptr);
989     option.observer = observer1;
990     RelationalStoreDelegate *delegate1 = nullptr;
991     EXPECT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate1), DBStatus::OK);
992     ASSERT_NE(delegate1, nullptr);
993 
994     auto observer2 = new (std::nothrow) RelationalStoreObserverUnitTest();
995     ASSERT_NE(observer2, nullptr);
996     option.observer = observer2;
997     RelationalStoreDelegate *delegate2 = nullptr;
998     EXPECT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate2), DBStatus::OK);
999     ASSERT_NE(delegate2, nullptr);
1000 
1001     /**
1002      * @tc.steps:step2. insert 1-10 cloud data, start.
1003      * @tc.expected: step2. ok.
1004      */
1005     InsertCloudTableRecord(0, actualCount, actualCount, false);
1006     Query query = Query::Select().FromTable({ tableName_ });
1007     BlockSync(query, delegate_, g_actualDBStatus);
1008 
1009     /**
1010      * @tc.steps:step3. check observer.
1011      * @tc.expected: step3. ok.
1012      */
1013     EXPECT_EQ(observer1->GetCloudCallCount(), 1u);
1014     EXPECT_EQ(observer2->GetCloudCallCount(), 1u);
1015 
1016     /**
1017      * @tc.steps:step4. insert 11-20 cloud data, start.
1018      * @tc.expected: step4. ok.
1019      */
1020     delegate2->UnRegisterObserver();
1021     observer2->ResetCloudSyncToZero();
1022     int64_t begin = 11;
1023     InsertCloudTableRecord(begin, actualCount, actualCount, false);
1024     BlockSync(query, delegate_, g_actualDBStatus);
1025 
1026     /**
1027      * @tc.steps:step5. check observer.
1028      * @tc.expected: step5. ok.
1029      */
1030     EXPECT_EQ(observer1->GetCloudCallCount(), 2u); // 2 is observer1 triggered times
1031     EXPECT_EQ(observer2->GetCloudCallCount(), 0u);
1032 
1033     delete observer1;
1034     observer1 = nullptr;
1035     EXPECT_EQ(mgr_->CloseStore(delegate1), DBStatus::OK);
1036 
1037     delete observer2;
1038     observer2 = nullptr;
1039     EXPECT_EQ(mgr_->CloseStore(delegate2), DBStatus::OK);
1040 }
1041 
1042 /**
1043  * @tc.name: CloudPrioritySyncTest001
1044  * @tc.desc: use priority sync interface when query in or from table
1045  * @tc.type: FUNC
1046  * @tc.require:
1047  * @tc.author: chenchaohao
1048  */
1049 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest001, TestSize.Level0)
1050 {
1051     /**
1052      * @tc.steps:step1. insert user table record and query in 3 records, then priority sync.
1053      * @tc.expected: step1. ok.
1054      */
1055     const int actualCount = 10; // 10 is count of records
1056     InsertUserTableRecord(tableName_, actualCount);
1057     std::vector<std::string> idValue = {"0", "1", "2"};
1058     Query query = Query::Select().From(tableName_).In("id", idValue);
1059 
1060     /**
1061      * @tc.steps:step2. check ParserQueryNodes
1062      * @tc.expected: step2. ok.
1063      */
__anonc8f560be1302(const std::string &tableName, VBucket &extend) 1064     virtualCloudDb_->ForkQuery([this, &idValue](const std::string &tableName, VBucket &extend) {
1065         EXPECT_EQ(tableName_, tableName);
1066         if (extend.find(CloudDbConstant::QUERY_FIELD) == extend.end()) {
1067             return;
1068         }
1069         Bytes bytes = std::get<Bytes>(extend[CloudDbConstant::QUERY_FIELD]);
1070         DBStatus status = OK;
1071         auto queryNodes = RelationalStoreManager::ParserQueryNodes(bytes, status);
1072         EXPECT_EQ(status, OK);
1073         ASSERT_EQ(queryNodes.size(), 1u);
1074         EXPECT_EQ(queryNodes[0].type, QueryNodeType::IN);
1075         EXPECT_EQ(queryNodes[0].fieldName, "id");
1076         ASSERT_EQ(queryNodes[0].fieldValue.size(), idValue.size());
1077         for (size_t i = 0u; i < idValue.size(); i++) {
1078             std::string val = std::get<std::string>(queryNodes[0].fieldValue[i]);
1079             EXPECT_EQ(val, idValue[i]);
1080         }
1081     });
1082     BlockPrioritySync(query, delegate_, true, OK);
1083     virtualCloudDb_->ForkQuery(nullptr);
1084     CheckCloudTableCount(tableName_, 3); // 3 is count of cloud records
1085 
1086     /**
1087      * @tc.steps:step3. use priority sync interface but not priority.
1088      * @tc.expected: step3. ok.
1089      */
1090     query = Query::Select().FromTable({ tableName_ });
1091     BlockPrioritySync(query, delegate_, false, OK);
1092     CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records
1093 
1094     /**
1095      * @tc.steps:step4. insert user table record and query from table, then priority sync.
1096      * @tc.expected: step4. ok.
1097      */
1098     InsertUserTableRecord(tableName_, actualCount, actualCount);
1099     BlockPrioritySync(query, delegate_, true, OK);
1100     CheckCloudTableCount(tableName_, 20); // 20 is count of cloud records
1101 }
1102 
1103 
1104 /**
1105  * @tc.name: CloudPrioritySyncTest002
1106  * @tc.desc: priority sync in some abnormal query situations
1107  * @tc.type: FUNC
1108  * @tc.require:
1109  * @tc.author: chenchaohao
1110  */
1111 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest002, TestSize.Level0)
1112 {
1113     /**
1114      * @tc.steps:step1. insert user table record.
1115      * @tc.expected: step1. ok.
1116      */
1117     const int actualCount = 1; // 1 is count of records
1118     InsertUserTableRecord(tableName_, actualCount);
1119 
1120     /**
1121      * @tc.steps:step2. query select tablename then priority sync.
1122      * @tc.expected: step2. invalid.
1123      */
1124     Query query = Query::Select(tableName_);
1125     BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1126     CheckCloudTableCount(tableName_, 0);
1127 
1128     /**
1129      * @tc.steps:step3. query select without from then priority sync.
1130      * @tc.expected: step3. invalid.
1131      */
1132     query = Query::Select();
1133     BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1134     CheckCloudTableCount(tableName_, 0);
1135 
1136     /**
1137      * @tc.steps:step4. query select and from without in then priority sync.
1138      * @tc.expected: step4. invalid.
1139      */
1140     query = Query::Select().From(tableName_);
1141     BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1142     CheckCloudTableCount(tableName_, 0);
1143 
1144     /**
1145      * @tc.steps:step5. query select and fromtable then priority sync.
1146      * @tc.expected: step5. not support.
1147      */
1148     query = Query::Select().From(tableName_).FromTable({tableName_});
1149     BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1150     CheckCloudTableCount(tableName_, 0);
1151 
1152     /**
1153      * @tc.steps:step6. query select and from with other predicates then priority sync.
1154      * @tc.expected: step6. not support.
1155      */
1156     query = Query::Select().From(tableName_).IsNotNull("id");
1157     BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1158     CheckCloudTableCount(tableName_, 0);
1159 
1160     /**
1161      * @tc.steps:step7. query select and from with in and other predicates then priority sync.
1162      * @tc.expected: step7 not support.
1163      */
1164     std::vector<std::string> idValue = {"0"};
1165     query = Query::Select().From(tableName_).IsNotNull("id").In("id", idValue);
1166     BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1167     CheckCloudTableCount(tableName_, 0);
1168 
1169     /**
1170      * @tc.steps:step8. query select and from with in non-primary key then priority sync.
1171      * @tc.expected: step8. not support.
1172      */
1173     std::vector<std::string> heightValue = {"155.10"};
1174     query = Query::Select().From(tableName_).In("height", heightValue);
1175     BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1176     CheckCloudTableCount(tableName_, 0);
1177 
1178     /**
1179      * @tc.steps:step9. query in count greater than 100.
1180      * @tc.expected: step9. over max limits.
1181      */
1182     idValue.resize(101); // 101 > 100
1183     query = Query::Select().From(tableName_).In("id", idValue);
1184     BlockPrioritySync(query, delegate_, true, OVER_MAX_LIMITS);
1185     CheckCloudTableCount(tableName_, 0);
1186 }
1187 
1188 /**
1189  * @tc.name: CloudPrioritySyncTest003
1190  * @tc.desc: priority sync when normal syncing
1191  * @tc.type: FUNC
1192  * @tc.require:
1193  * @tc.author: chenchaohao
1194  */
1195 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest003, TestSize.Level0)
1196 {
1197     /**
1198      * @tc.steps:step1. insert user table record.
1199      * @tc.expected: step1. ok.
1200      */
1201     const int actualCount = 10; // 10 is count of records
1202     InsertUserTableRecord(tableName_, actualCount);
1203 
1204     /**
1205      * @tc.steps:step2. begin normal sync and priority sync.
1206      * @tc.expected: step2. ok.
1207      */
1208     Query normalQuery = Query::Select().FromTable({tableName_});
1209     std::vector<std::string> idValue = {"0", "1", "2"};
1210     Query priorityQuery = Query::Select().From(tableName_).In("id", idValue);
1211     std::vector<std::map<std::string, SyncProcess>> prioritySyncProcess;
1212     PriorityAndNormalSync(normalQuery, priorityQuery, delegate_, prioritySyncProcess, true);
1213     EXPECT_EQ(virtualCloudDb_->GetLockCount(), 2);
1214     virtualCloudDb_->Reset();
1215     EXPECT_EQ(virtualCloudDb_->GetLockCount(), 0);
1216     CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records
1217 }
1218 
1219 /**
1220  * @tc.name: CloudPrioritySyncTest004
1221  * @tc.desc: non-primarykey table priority sync
1222  * @tc.type: FUNC
1223  * @tc.require:
1224  * @tc.author: chenchaohao
1225  */
1226 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest004, TestSize.Level0)
1227 {
1228     /**
1229      * @tc.steps:step1. insert user non-primarykey table record.
1230      * @tc.expected: step1. ok.
1231      */
1232     const int actualCount = 10; // 10 is count of records
1233     InsertUserTableRecord(tableWithoutPrimaryName_, actualCount);
1234 
1235     /**
1236      * @tc.steps:step2. begin priority sync.
1237      * @tc.expected: step2. not support.
1238      */
1239     std::vector<std::string> idValue = {"0", "1", "2"};
1240     Query query = Query::Select().From(tableWithoutPrimaryName_).In("id", idValue);
1241     BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1242     CheckCloudTableCount(tableWithoutPrimaryName_, 0);
1243 
1244     /**
1245      * @tc.steps:step3. begin priority sync when in rowid.
1246      * @tc.expected: step3. invalid.
1247      */
1248     std::vector<int64_t> rowidValue = {0, 1, 2}; // 0,1,2 are rowid value
1249     query = Query::Select().From(tableWithoutPrimaryName_).In("rowid", rowidValue);
1250     BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1251     CheckCloudTableCount(tableWithoutPrimaryName_, 0);
1252 }
1253 
1254 /**
1255  * @tc.name: CloudPrioritySyncTest005
1256  * @tc.desc: priority sync but don't have records
1257  * @tc.type: FUNC
1258  * @tc.require:
1259  * @tc.author: chenchaohao
1260  */
1261 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest005, TestSize.Level0)
1262 {
1263     /**
1264      * @tc.steps:step1. insert user non-primarykey table record.
1265      * @tc.expected: step1. ok.
1266      */
1267     const int actualCount = 10; // 10 is count of records
1268     InsertUserTableRecord(tableWithoutPrimaryName_, actualCount);
1269 
1270     /**
1271      * @tc.steps:step2. begin DistributedDBCloudCheckSyncTest priority sync and check records.
1272      * @tc.expected: step2. ok.
1273      */
1274     std::vector<std::string> idValue = {"0", "1", "2"};
1275     Query query = Query::Select().From(tableName_).In("id", idValue);
1276     BlockPrioritySync(query, delegate_, true, OK);
1277     CheckCloudTableCount(tableWithoutPrimaryName_, 0);
1278     CheckCloudTableCount(tableName_, 0);
1279 }
1280 
1281 /**
1282  * @tc.name: CloudPrioritySyncTest006
1283  * @tc.desc: priority sync tasks greater than limit
1284  * @tc.type: FUNC
1285  * @tc.require:
1286  * @tc.author: chenchaohao
1287  */
1288 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest006, TestSize.Level0)
1289 {
1290     /**
1291      * @tc.steps:step1. insert user table record.
1292      * @tc.expected: step1. ok.
1293      */
1294     const int actualCount = 10; // 10 is count of records
1295     InsertUserTableRecord(tableName_, actualCount);
1296 
1297     /**
1298      * @tc.steps:step2. begin 32 priority sync tasks and then begin 1 priority sync task.
1299      * @tc.expected: step2. ok.
1300      */
1301     std::vector<std::string> idValue = {"0", "1", "2"};
1302     Query query = Query::Select().From(tableName_).In("id", idValue);
1303     std::mutex dataMutex;
1304     std::condition_variable cv;
1305     std::mutex callbackMutex;
1306     std::condition_variable callbackCv;
1307     bool finish = false;
1308     size_t finishCount = 0u;
__anonc8f560be1402(const std::string &tableName, VBucket &extend) 1309     virtualCloudDb_->ForkQuery([&cv, &finish, &dataMutex](const std::string &tableName, VBucket &extend) {
1310         std::unique_lock<std::mutex> uniqueLock(dataMutex);
1311         cv.wait(uniqueLock, [&finish]() {
1312             return finish;
1313         });
1314     });
__anonc8f560be1602(const std::map<std::string, SyncProcess> &process) 1315     auto callback = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
1316         for (const auto &item: process) {
1317             if (item.second.process == DistributedDB::FINISHED) {
1318                 {
1319                     std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
1320                     finishCount++;
1321                 }
1322                 callbackCv.notify_one();
1323             }
1324         }
1325     };
1326     CloudSyncOption option;
1327     PrepareOption(option, query, true);
1328     for (int i = 0; i < 32; i++) { // 32 is count of sync tasks
1329         ASSERT_EQ(delegate_->Sync(option, callback), OK);
1330     }
1331     ASSERT_EQ(delegate_->Sync(option, nullptr), BUSY);
1332     {
1333         std::lock_guard<std::mutex> autoLock(dataMutex);
1334         finish = true;
1335     }
1336     cv.notify_all();
1337     virtualCloudDb_->ForkQuery(nullptr);
1338     std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anonc8f560be1702() 1339     callbackCv.wait(callbackLock, [&finishCount]() {
1340         return (finishCount == 32u); // 32 is count of finished sync tasks
1341     });
1342     CheckCloudTableCount(tableName_, 3); // 3 is count of cloud records
1343 }
1344 
1345 /**
1346  * @tc.name: CloudPrioritySyncTest007
1347  * @tc.desc: priority normal priority normal when different query
1348  * @tc.type: FUNC
1349  * @tc.require:
1350  * @tc.author: chenchaohao
1351  */
1352 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest007, TestSize.Level0)
1353 {
1354     /**
1355      * @tc.steps:step1. insert user table record.
1356      * @tc.expected: step1. ok.
1357      */
1358     const int actualCount = 10; // 10 is count of records
1359     InsertUserTableRecord(tableName_, actualCount);
1360 
1361     /**
1362      * @tc.steps:step2. set callback to check during sync.
1363      * @tc.expected: step2. ok.
1364      */
1365     std::atomic<int> count = 0;
1366     SetForkQueryForCloudPrioritySyncTest007(count);
1367 
1368     /**
1369      * @tc.steps:step3. perform priority normal priority normal sync.
1370      * @tc.expected: step3. ok.
1371      */
1372     std::vector<std::string> idValue = {"0"};
1373     Query priorytyQuery = Query::Select().From(tableName_).In("id", idValue);
1374     CloudSyncOption option;
1375     PrepareOption(option, priorytyQuery, true);
1376     option.lockAction = static_cast<LockAction>(0xff); // lock all
1377     std::mutex callbackMutex;
1378     std::condition_variable callbackCv;
1379     size_t finishCount = 0u;
__anonc8f560be1802(const std::map<std::string, SyncProcess> &process) 1380     auto callback = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
1381         for (const auto &item: process) {
1382             if (item.second.process == DistributedDB::FINISHED) {
1383                 {
1384                     std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
1385                     finishCount++;
1386                 }
1387                 callbackCv.notify_one();
1388             }
1389         }
1390     };
1391     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1392     Query normalQuery = Query::Select().FromTable({tableName_});
1393     PrepareOption(option, normalQuery, false);
1394     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1395     idValue = {"1"};
1396     priorytyQuery = Query::Select().From(tableName_).In("id", idValue);
1397     PrepareOption(option, priorytyQuery, true);
1398     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1399     PrepareOption(option, normalQuery, false);
1400     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1401     std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anonc8f560be1902() 1402     callbackCv.wait(callbackLock, [&finishCount]() {
1403         return (finishCount == 4u); // 4 is count of finished sync tasks
1404     });
1405     CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records
1406 }
1407 
1408 /**
1409  * @tc.name: CloudPrioritySyncTest008
1410  * @tc.desc: priority normal priority normal when different query
1411  * @tc.type: FUNC
1412  * @tc.require:
1413  * @tc.author: chenchaohao
1414  */
1415 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest008, TestSize.Level0)
1416 {
1417     /**
1418      * @tc.steps:step1. insert user table record.
1419      * @tc.expected: step1. ok.
1420      */
1421     const int actualCount = 10; // 10 is count of records
1422     InsertUserTableRecord(tableName_, actualCount);
1423 
1424     /**
1425      * @tc.steps:step2. set callback to check during sync.
1426      * @tc.expected: step2. ok.
1427      */
1428     std::atomic<int> count = 0;
1429     SetForkQueryForCloudPrioritySyncTest008(count);
1430 
1431     /**
1432      * @tc.steps:step3. perform priority normal priority normal sync.
1433      * @tc.expected: step3. ok.
1434      */
1435     std::vector<std::string> idValue = {"0"};
1436     Query priorytyQuery = Query::Select().From(tableName_).In("id", idValue);
1437     CloudSyncOption option;
1438     option.lockAction = static_cast<LockAction>(0xff); // lock all
1439     PrepareOption(option, priorytyQuery, true);
1440     std::mutex callbackMutex;
1441     std::condition_variable callbackCv;
1442     size_t finishCount = 0u;
__anonc8f560be1a02(const std::map<std::string, SyncProcess> &process) 1443     auto callback = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
1444         for (const auto &item: process) {
1445             if (item.second.process == DistributedDB::FINISHED) {
1446                 {
1447                     std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
1448                     finishCount++;
1449                 }
1450                 callbackCv.notify_one();
1451             }
1452         }
1453     };
1454     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1455     Query normalQuery = Query::Select().FromTable({tableName_});
1456     PrepareOption(option, normalQuery, false);
1457     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1458     priorytyQuery = Query::Select().From(tableName_).In("id", idValue);
1459     PrepareOption(option, priorytyQuery, true);
1460     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1461     PrepareOption(option, normalQuery, false);
1462     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1463     std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anonc8f560be1b02() 1464     callbackCv.wait(callbackLock, [&finishCount]() {
1465         return (finishCount == 4u); // 4 is count of finished sync tasks
1466     });
1467     CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records
1468 }
1469 
1470 /**
1471  * @tc.name: CloudPrioritySyncTest009
1472  * @tc.desc: use priority sync interface when query equal to from table
1473  * @tc.type: FUNC
1474  * @tc.require:
1475  * @tc.author: zhangqiquan
1476  */
1477 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest009, TestSize.Level0)
1478 {
1479     /**
1480      * @tc.steps:step1. insert user table record and query in 3 records, then priority sync.
1481      * @tc.expected: step1. ok.
1482      */
1483     const int actualCount = 5; // 5 is count of records
1484     InsertUserTableRecord(tableName_, actualCount);
1485     Query query = Query::Select().From(tableName_).BeginGroup().EqualTo("id", "0").Or().EqualTo("id", "1").EndGroup();
1486 
1487     /**
1488      * @tc.steps:step2. check ParserQueryNodes
1489      * @tc.expected: step2. ok.
1490      */
__anonc8f560be1c02(const std::string &tableName, VBucket &extend) 1491     virtualCloudDb_->ForkQuery([this](const std::string &tableName, VBucket &extend) {
1492         EXPECT_EQ(tableName_, tableName);
1493         Bytes bytes = std::get<Bytes>(extend[CloudDbConstant::QUERY_FIELD]);
1494         DBStatus status = OK;
1495         auto queryNodes = RelationalStoreManager::ParserQueryNodes(bytes, status);
1496         EXPECT_EQ(status, OK);
1497         ASSERT_EQ(queryNodes.size(), 5u); // 5 is query nodes count
1498     });
1499     BlockPrioritySync(query, delegate_, true, OK);
1500     virtualCloudDb_->ForkQuery(nullptr);
1501     CheckCloudTableCount(tableName_, 2); // 2 is count of cloud records
1502 }
1503 
1504 /**
1505  * @tc.name: CloudPrioritySyncTest010
1506  * @tc.desc: priority sync after cloud delete
1507  * @tc.type: FUNC
1508  * @tc.require:
1509  * @tc.author: chenchaohao
1510  */
1511 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest010, TestSize.Level0)
1512 {
1513     /**
1514      * @tc.steps:step1. insert user table record.
1515      * @tc.expected: step1. ok.
1516      */
1517     const int actualCount = 10; // 10 is count of records
1518     InsertUserTableRecord(tableName_, actualCount);
1519 
1520     /**
1521      * @tc.steps:step2. normal sync and then delete cloud records.
1522      * @tc.expected: step2. ok.
1523      */
1524     Query query = Query::Select().FromTable({tableName_});
1525     BlockSync(query, delegate_, g_actualDBStatus);
1526     CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records after sync
1527     DeleteCloudDBData(0, 3); // delete 0 1 2 record in cloud
1528     CheckCloudTableCount(tableName_, 7); // 7 is count of cloud records after delete
1529     CheckUserTableResult(db_, tableName_, 10); // 10 is count of user records
1530 
1531     /**
1532      * @tc.steps:step3. priory sync and set query then check user table records.
1533      * @tc.expected: step3. ok.
1534      */
1535     std::vector<std::string> idValue = {"3", "4", "5"};
1536     query = Query::Select().From(tableName_).In("id", idValue);
1537     BlockPrioritySync(query, delegate_, true, OK);
1538     CheckUserTableResult(db_, tableName_, 10); // 10 is count of user records after sync
1539     idValue = {"0", "1", "2"};
1540     query = Query::Select().From(tableName_).In("id", idValue);
1541     BlockPrioritySync(query, delegate_, true, OK);
1542     CheckUserTableResult(db_, tableName_, 7); // 7 is count of user records after sync
1543 }
1544 
1545 /**
1546  * @tc.name: CloudPrioritySyncTest011
1547  * @tc.desc: priority sync after cloud insert
1548  * @tc.type: FUNC
1549  * @tc.require:
1550  * @tc.author: chenchaohao
1551  */
1552 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest011, TestSize.Level0)
1553 {
1554     /**
1555      * @tc.steps:step1. insert cloud table record.
1556      * @tc.expected: step1. ok.
1557      */
1558     const int actualCount = 10; // 10 is count of records
1559     InsertCloudTableRecord(0, actualCount, actualCount, false);
1560     std::vector<std::string> idValue = {"0", "1", "2"};
1561     Query query = Query::Select().From(tableName_).In("id", idValue);
1562     std::atomic<int> count = 0;
1563 
1564     /**
1565      * @tc.steps:step2. check user records when query.
1566      * @tc.expected: step1. ok.
1567      */
__anonc8f560be1d02(const std::string &, VBucket &) 1568     virtualCloudDb_->ForkQuery([this, &count](const std::string &, VBucket &) {
1569         count++;
1570         if (count == 1) { // taskid1
1571             std::this_thread::sleep_for(std::chrono::seconds(1));
1572         }
1573         if (count == 2) { // taskid2
1574             CheckUserTableResult(db_, tableName_, 3); // 3 is count of user records after first sync
1575         }
1576     });
1577     CloudSyncOption option;
1578     PrepareOption(option, query, true);
1579     std::mutex callbackMutex;
1580     std::condition_variable callbackCv;
1581     size_t finishCount = 0u;
__anonc8f560be1e02(const std::map<std::string, SyncProcess> &process) 1582     auto callback = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
1583         for (const auto &item: process) {
1584             if (item.second.process == DistributedDB::FINISHED) {
1585                 {
1586                     std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
1587                     finishCount++;
1588                 }
1589                 callbackCv.notify_one();
1590             }
1591         }
1592     };
1593 
1594     /**
1595      * @tc.steps:step3. begin sync and check user record.
1596      * @tc.expected: step3. ok.
1597      */
1598     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1599     idValue = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
1600     query = Query::Select().From(tableName_).In("id", idValue);
1601     PrepareOption(option, query, true);
1602     ASSERT_EQ(delegate_->Sync(option, callback), OK);
1603     std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anonc8f560be1f02() 1604     callbackCv.wait(callbackLock, [&finishCount]() {
1605         return (finishCount == 2u); // 2 is count of finished sync tasks
1606     });
1607     CheckUserTableResult(db_, tableName_, 10); // 10 is count of user records
1608 }
1609 
1610 /**
1611  * @tc.name: CloudPrioritySyncTest012
1612  * @tc.desc: priority or normal sync when waittime > 300s or < -1
1613  * @tc.type: FUNC
1614  * @tc.require:
1615  * @tc.author: chenchaohao
1616  */
1617 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest012, TestSize.Level0)
1618 {
1619     /**
1620      * @tc.steps:step1. insert cloud table record.
1621      * @tc.expected: step1. ok.
1622      */
1623     const int actualCount = 10; // 10 is count of records
1624     InsertCloudTableRecord(0, actualCount, actualCount, false);
1625     std::vector<std::string> idValue = {"0", "1", "2"};
1626     Query query = Query::Select().From(tableName_).In("id", idValue);
1627 
1628     /**
1629      * @tc.steps:step2. set waittime < -1 then begin sync.
1630      * @tc.expected: step2. invalid.
1631      */
1632     CloudSyncOption option;
1633     PrepareOption(option, query, true);
1634     option.waitTime = -2; // -2 < -1;
1635     ASSERT_EQ(delegate_->Sync(option, nullptr), INVALID_ARGS);
1636     CheckUserTableResult(db_, tableName_, 0); // 0 is count of user records
1637 
1638     /**
1639      * @tc.steps:step3. set waittime > 300s then begin sync.
1640      * @tc.expected: step3. invalid.
1641      */
1642 
1643     option.waitTime = 300001; // 300001 > 300s
1644     ASSERT_EQ(delegate_->Sync(option, nullptr), INVALID_ARGS);
1645     CheckUserTableResult(db_, tableName_, 0); // 0 is count of user records
1646 }
1647 
1648 /**
1649  * @tc.name: CloudPrioritySyncTest013
1650  * @tc.desc: priority sync in some abnormal composite pk query situations
1651  * @tc.type: FUNC
1652  * @tc.require:
1653  * @tc.author: chenchaohao
1654  */
1655 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest013, TestSize.Level0)
1656 {
1657     /**
1658      * @tc.steps:step1. insert user table record.
1659      * @tc.expected: step1. ok.
1660      */
1661     const int actualCount = 1; // 1 is count of records
1662     InsertUserTableRecord(tableName_, actualCount);
1663 
1664     /**
1665      * @tc.steps:step2. query only begingroup then priority sync.
1666      * @tc.expected: step2. invalid.
1667      */
1668     Query query = Query::Select().From(tableName_).BeginGroup();
1669     BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1670     CheckCloudTableCount(tableName_, 0);
1671 
1672     /**
1673      * @tc.steps:step3. query only endgroup then priority sync.
1674      * @tc.expected: step3. invalid.
1675      */
1676     query = Query::Select().From(tableName_).EndGroup();
1677     BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1678     CheckCloudTableCount(tableName_, 0);
1679 
1680     /**
1681      * @tc.steps:step4. query only begingroup and endgroup then priority sync.
1682      * @tc.expected: step4. invalid.
1683      */
1684     query = Query::Select().From(tableName_).BeginGroup().EndGroup();
1685     BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1686     CheckCloudTableCount(tableName_, 0);
1687 
1688     /**
1689      * @tc.steps:step5. query and from table then priority sync.
1690      * @tc.expected: step5. invalid.
1691      */
1692     query = Query::Select().And().From(tableName_);
1693     BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1694     CheckCloudTableCount(tableName_, 0);
1695 
1696     /**
1697      * @tc.steps:step6. query or from table then priority sync.
1698      * @tc.expected: step6. invalid.
1699      */
1700     query = Query::Select().Or().From(tableName_);
1701     BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1702     CheckCloudTableCount(tableName_, 0);
1703 
1704     /**
1705      * @tc.steps:step7. query begingroup from table then priority sync.
1706      * @tc.expected: step7 invalid.
1707      */
1708     query = Query::Select().BeginGroup().From(tableName_);
1709     BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1710     CheckCloudTableCount(tableName_, 0);
1711 
1712     /**
1713      * @tc.steps:step8. query endgroup from table then priority sync.
1714      * @tc.expected: step8 invalid.
1715      */
1716     query = Query::Select().EndGroup().From(tableName_);
1717     BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1718     CheckCloudTableCount(tableName_, 0);
1719 
1720     /**
1721      * @tc.steps:step9. query and in then priority sync.
1722      * @tc.expected: step9. invalid.
1723      */
1724     std::vector<std::string> idValue = {"0"};
1725     query = Query::Select().From(tableName_).And().In("id", idValue);
1726     BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1727     CheckCloudTableCount(tableName_, 0);
1728 
1729     /**
1730      * @tc.steps:step10. query when the table name does not exit then priority sync.
1731      * @tc.expected: step10. schema mismatch.
1732      */
1733     query = Query::Select().From("tableName").And().In("id", idValue);
1734     BlockPrioritySync(query, delegate_, true, SCHEMA_MISMATCH);
1735     CheckCloudTableCount(tableName_, 0);
1736 
1737     /**
1738      * @tc.steps:step11. query when the table name does not exit then priority sync.
1739      * @tc.expected: step11. schema mismatch.
1740      */
1741     query = Query::Select().From("tableName").In("id", idValue);
1742     BlockPrioritySync(query, delegate_, true, SCHEMA_MISMATCH);
1743     CheckCloudTableCount(tableName_, 0);
1744 
1745     /**
1746      * @tc.steps:step12. query when the table name does not exit then sync.
1747      * @tc.expected: step12. schema mismatch.
1748      */
1749     query = Query::Select().FromTable({"tableName"});
1750     BlockPrioritySync(query, delegate_, false, SCHEMA_MISMATCH);
1751     CheckCloudTableCount(tableName_, 0);
1752 }
1753 
1754 /**
1755  * @tc.name: CloudPrioritySyncTest016
1756  * @tc.desc: priority sync when normal syncing
1757  * @tc.type: FUNC
1758  * @tc.require:
1759  * @tc.author: wangxiangdong
1760  */
1761 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest016, TestSize.Level0)
1762 {
1763     /**
1764      * @tc.steps:step1. insert cloud table record.
1765      * @tc.expected: step1. ok.
1766      */
1767     const int actualCount = 60; // 60 is count of records
1768     InsertCloudTableRecord(0, actualCount, 0, false);
1769     InsertUserTableRecord(tableName_, 10);
1770 
1771     /**
1772      * @tc.steps:step2. begin normal sync and priority sync.
1773      * @tc.expected: step2. ok.
1774      */
1775     Query normalQuery = Query::Select().FromTable({tableName_});
1776     std::vector<std::string> idValue = {"0", "1", "2"};
1777     Query priorityQuery = Query::Select().From(tableName_).In("id", idValue);
1778     std::vector<std::map<std::string, SyncProcess>> prioritySyncProcess;
1779     PriorityAndNormalSync(normalQuery, priorityQuery, delegate_, prioritySyncProcess, false);
1780     virtualCloudDb_->Reset();
1781     CheckCloudTableCount(tableName_, 60); // 10 is count of cloud records
1782     /**
1783      * @tc.steps:step3. check sync process result.
1784      * @tc.expected: step3. ok.
1785      */
1786     std::vector<DistributedDB::SyncProcess> expectSyncResult = {
1787                 {PROCESSING, OK, {{tableName_, {PROCESSING, {1, 60, 60, 0, 50, 0, 0}, {0, 0, 0, 0, 0, 0, 0}}}}},
1788                 {PROCESSING, OK, {{tableName_, {PROCESSING, {1, 3, 3, 0, 0, 0, 0}, {0, 0, 0, 0, 0, 0, 0}}}}},
1789                 {FINISHED, OK, {{tableName_, {FINISHED, {1, 3, 3, 0, 0, 0, 0}, {1, 3, 3, 0, 0, 3, 0}}}}},
1790                 {PROCESSING, OK, {{tableName_, {PROCESSING, {2, 63, 63, 0, 50, 0, 0}, {0, 0, 0, 0, 0, 0, 0}}}}},
1791                 {FINISHED, OK, {{tableName_, {FINISHED, {2, 63, 63, 0, 50, 0, 0}, {1, 7, 7, 0, 0, 7, 0}}}}}
1792         };
1793     EXPECT_EQ(CheckSyncProcess(prioritySyncProcess, expectSyncResult), true);
1794 }
1795 
1796 /**
1797  * @tc.name: LogicDeleteSyncTest001
1798  * @tc.desc: sync with logic delete
1799  * @tc.type: FUNC
1800  * @tc.require:
1801  * @tc.author: zhangqiquan
1802  */
1803 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest001, TestSize.Level0)
1804 {
1805     bool logicDelete = true;
1806     auto data = static_cast<PragmaData>(&logicDelete);
1807     delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
1808     int actualCount = 10;
1809     InitLogicDeleteDataEnv(actualCount, true);
1810     CheckLocalCount(actualCount);
1811     std::string device = "";
1812     ASSERT_EQ(delegate_->RemoveDeviceData(device, DistributedDB::FLAG_AND_DATA), DBStatus::OK);
1813     CheckLocalCount(actualCount);
1814 }
1815 
1816 /**
1817  * @tc.name: LogicDeleteSyncTest002
1818  * @tc.desc: sync without logic delete
1819  * @tc.type: FUNC
1820  * @tc.require:
1821  * @tc.author: zhangqiquan
1822  */
1823 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest002, TestSize.Level0)
1824 {
1825     bool logicDelete = false;
1826     auto data = static_cast<PragmaData>(&logicDelete);
1827     delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
1828     int actualCount = 10;
1829     InitLogicDeleteDataEnv(actualCount);
1830     CheckLocalCount(0);
1831 }
1832 
1833 /**
1834  * @tc.name: LogicDeleteSyncTest003
1835  * @tc.desc: sync with logic delete and check observer
1836  * @tc.type: FUNC
1837  * @tc.require:
1838  * @tc.author: bty
1839  */
1840 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest003, TestSize.Level0)
1841 {
1842     /**
1843      * @tc.steps:step1. register observer.
1844      * @tc.expected: step1. ok.
1845      */
1846     RelationalStoreDelegate::Option option;
1847     auto observer = new (std::nothrow) RelationalStoreObserverUnitTest();
1848     ASSERT_NE(observer, nullptr);
1849     observer->SetCallbackDetailsType(static_cast<uint32_t>(CallbackDetailsType::DETAILED));
1850     EXPECT_EQ(delegate_->RegisterObserver(observer), OK);
1851     ChangedData expectData;
1852     expectData.tableName = tableName_;
1853     expectData.type = ChangedDataType::DATA;
1854     expectData.field.push_back(std::string("id"));
1855     const int count = 10;
1856     for (int64_t i = 0; i < count; ++i) {
1857         expectData.primaryData[ChangeType::OP_DELETE].push_back({std::to_string(i)});
1858     }
1859     expectData.properties = { .isTrackedDataChange = true };
1860     observer->SetExpectedResult(expectData);
1861 
1862     /**
1863      * @tc.steps:step2. set tracker table
1864      * @tc.expected: step2. ok.
1865      */
1866     TrackerSchema trackerSchema;
1867     trackerSchema.tableName = tableName_;
1868     trackerSchema.trackerColNames = { "id" };
1869     EXPECT_EQ(delegate_->SetTrackerTable(trackerSchema), OK);
1870 
1871     /**
1872      * @tc.steps:step3. set logic delete and sync
1873      * @tc.expected: step3. ok.
1874      */
1875     bool logicDelete = true;
1876     auto data = static_cast<PragmaData>(&logicDelete);
1877     delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
1878     int actualCount = 10;
1879     InitLogicDeleteDataEnv(actualCount);
1880     CheckLocalCount(actualCount);
1881     EXPECT_EQ(observer->IsAllChangedDataEq(), true);
1882     observer->ClearChangedData();
1883 
1884     /**
1885      * @tc.steps:step4. unSetTrackerTable and sync
1886      * @tc.expected: step4. ok.
1887      */
1888     expectData.properties = { .isTrackedDataChange = false };
1889     observer->SetExpectedResult(expectData);
1890     trackerSchema.trackerColNames = {};
1891     EXPECT_EQ(delegate_->SetTrackerTable(trackerSchema), OK);
1892     InsertUserTableRecord(tableName_, actualCount);
1893     BlockSync(Query::Select().FromTable({ tableName_ }), delegate_, g_actualDBStatus);
1894     for (int i = 0; i < actualCount + actualCount; ++i) {
1895         DeleteCloudTableRecord(i);
1896     }
1897     BlockSync(Query::Select().FromTable({ tableName_ }), delegate_, g_actualDBStatus);
1898     EXPECT_EQ(observer->IsAllChangedDataEq(), true);
1899 
1900     EXPECT_EQ(delegate_->UnRegisterObserver(observer), OK);
1901     delete observer;
1902     observer = nullptr;
1903 }
1904 
1905 /**
1906  * @tc.name: LogicDeleteSyncTest004
1907  * @tc.desc: test removedevicedata in mode FLAG_ONLY when sync with logic delete
1908  * @tc.type: FUNC
1909  * @tc.require:
1910  * @tc.author: chenchaohao
1911  */
1912 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest004, TestSize.Level0)
1913 {
1914     /**
1915      * @tc.steps:step1. set logic delete
1916      * @tc.expected: step1. ok.
1917      */
1918     bool logicDelete = true;
1919     auto data = static_cast<PragmaData>(&logicDelete);
1920     delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
1921 
1922     /**
1923      * @tc.steps:step2. cloud delete data then sync, check removedevicedata
1924      * @tc.expected: step2. ok.
1925      */
1926     int actualCount = 10;
1927     InitLogicDeleteDataEnv(actualCount);
1928     CheckLocalCount(actualCount);
1929     std::string device = "";
1930     ASSERT_EQ(delegate_->RemoveDeviceData(device, DistributedDB::FLAG_ONLY), DBStatus::OK);
1931     CheckLocalCount(actualCount);
1932     CheckLogCleaned(0);
1933 }
1934 
1935 /**
1936  * @tc.name: LogicDeleteSyncTest005
1937  * @tc.desc: test pragma when set cmd is not logic delete
1938  * @tc.type: FUNC
1939  * @tc.require:
1940  * @tc.author: chenchaohao
1941  */
1942 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest005, TestSize.Level0)
1943 {
1944     /**
1945      * @tc.steps:step1. set cmd is auto sync
1946      * @tc.expected: step1. ok.
1947      */
1948     bool logicDelete = true;
1949     auto data = static_cast<PragmaData>(&logicDelete);
1950     EXPECT_EQ(delegate_->Pragma(AUTO_SYNC, data), DBStatus::NOT_SUPPORT);
1951 }
1952 
1953 /**
1954  * @tc.name: LogicDeleteSyncTest006
1955  * @tc.desc: sync with logic delete after lock table.
1956  * @tc.type: FUNC
1957  * @tc.require:
1958  * @tc.author: liaoyonghuang
1959  */
1960 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest006, TestSize.Level0)
1961 {
1962     /**
1963      * @tc.steps:step1. set logic delete
1964      * @tc.expected: step1. ok.
1965      */
1966     bool logicDelete = true;
1967     auto data = static_cast<PragmaData>(&logicDelete);
1968     delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
1969 
1970     /**
1971      * @tc.steps:step2. insert user table record and sync.
1972      * @tc.expected: step2. ok.
1973      */
1974     int dataCount = 10;
1975     InsertUserTableRecord(tableName_, dataCount);
1976     Query query = Query::Select().FromTable({ tableName_ });
1977     BlockSync(query, delegate_, g_actualDBStatus);
1978 
1979     /**
1980      * @tc.steps:step3. Lock log table, and delete data from cloud table.
1981      * @tc.expected: step3. ok.
1982      */
1983     std::vector<std::vector<uint8_t>> hashKey;
1984     CloudDBSyncUtilsTest::GetHashKey(tableName_, " 1=1 ", db_, hashKey);
1985     Lock(tableName_, hashKey, db_);
1986     for (int i = 0; i < dataCount; ++i) {
1987         DeleteCloudTableRecord(i);
1988     }
1989     /**
1990      * @tc.steps:step4. sync.
1991      * @tc.expected: step4. ok.
1992      */
1993     std::vector<DBStatus> actualDBStatus;
1994     BlockSync(query, delegate_, actualDBStatus);
1995     for (auto status : actualDBStatus) {
1996         EXPECT_EQ(status, OK);
1997     }
1998 }
1999 
2000 /**
2001  * @tc.name: LogicDeleteSyncTest008
2002  * @tc.desc: Test sync when data with flag 0x800 locally but there is updated data on the cloud.
2003  * @tc.type: FUNC
2004  * @tc.require:
2005  * @tc.author: liaoyonghuang
2006  */
2007 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest008, TestSize.Level0)
2008 {
2009     /**
2010      * @tc.steps:step1. Insert user table record with flag 0x800. Insert cloud table record.
2011      * @tc.expected: step1. ok.
2012      */
2013     int dataCount = 10;
2014     uint32_t logicDeleteCount = 4;
2015     InsertUserTableRecord(tableName_, dataCount);
2016     std::string sql = "update " + DBCommon::GetLogTableName(tableName_) +
2017         " set flag = flag | 0x800 where data_key <= " + std::to_string(logicDeleteCount);
2018     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2019     InsertCloudTableRecord(0, dataCount, 0, false);
2020     sql = "select count(*) from " + DBCommon::GetLogTableName(tableName_) + " where flag & 0x800=0x800";
2021     EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), QueryCountCallback,
2022         reinterpret_cast<void *>(logicDeleteCount), nullptr), SQLITE_OK);
2023     /**
2024      * @tc.steps:step2. Do sync.
2025      * @tc.expected: step2. ok.
2026      */
2027     Query query = Query::Select().FromTable({ tableName_ });
2028     BlockSync(query, delegate_, g_actualDBStatus);
2029     /**
2030      * @tc.steps:step3. Check data flag in local DB.
2031      * @tc.expected: step3. No data flag is 0x800.
2032      */
2033     EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), QueryCountCallback,
2034         reinterpret_cast<void *>(0), nullptr), SQLITE_OK);
2035 }
2036 
2037 /**
2038  * @tc.name: LockActionTest001
2039  * @tc.desc: InitCompensatedSyncTaskInfo and check lockAction.
2040  * @tc.type: FUNC
2041  * @tc.require:
2042  * @tc.author: wangxiangdong
2043  */
2044 HWTEST_F(DistributedDBCloudCheckSyncTest, LockActionTest001, TestSize.Level0)
2045 {
2046     /**
2047      * @tc.steps:step1. InitCompensatedSyncTaskInfo and check.
2048      * @tc.expected: step1. ok.
2049      */
2050     CloudSyncOption option;
2051     option.devices = { "CLOUD" };
2052     option.mode = SYNC_MODE_CLOUD_MERGE;
2053     option.query = Query::Select().FromTable({ tableName_ });
2054     option.waitTime = g_syncWaitTime;
2055     auto action = static_cast<uint32_t>(LockAction::INSERT) | static_cast<uint32_t>(LockAction::UPDATE)
2056                       | static_cast<uint32_t>(LockAction::DELETE);
2057     option.lockAction = static_cast<LockAction>(action);
2058     option.priorityTask = true;
2059     option.compensatedSyncOnly = true;
2060     const SyncProcessCallback onProcess;
2061     CloudSyncer::CloudTaskInfo taskInfo = CloudSyncUtils::InitCompensatedSyncTaskInfo(option, onProcess);
2062     EXPECT_EQ(taskInfo.lockAction, option.lockAction);
2063 }
2064 
2065 /**
2066  * @tc.name: LogicCreateRepeatedTableNameTest001
2067  * @tc.desc: test create repeated table name with different cases
2068  * @tc.type: FUNC
2069  * @tc.require:DTS2023120705927
2070  * @tc.author: wangxiangdong
2071  */
2072 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicCreateRepeatedTableNameTest001, TestSize.Level0)
2073 {
2074     /**
2075      * @tc.steps:step1. CreateDistributedTable with same name but different cases.
2076      * @tc.expected: step1. operate successfully.
2077      */
2078     DBStatus createStatus = delegate_->CreateDistributedTable(lowerTableName_, CLOUD_COOPERATION);
2079     ASSERT_EQ(createStatus, DBStatus::OK);
2080 }
2081 
2082 /**
2083  * @tc.name: SaveCursorTest001
2084  * @tc.desc: test whether cloud cursor is saved when first sync
2085  * @tc.type: FUNC
2086  * @tc.require:
2087  * @tc.author: chenchaohao
2088  */
2089 HWTEST_F(DistributedDBCloudCheckSyncTest, SaveCursorTest001, TestSize.Level0)
2090 {
2091     /**
2092      * @tc.steps:step1. insert cloud records
2093      * @tc.expected: step1. OK
2094      */
2095     const int actualCount = 10;
2096     InsertCloudTableRecord(0, actualCount, 0, false);
2097 
2098     /**
2099      * @tc.steps:step2. check cursor when first sync
2100      * @tc.expected: step2. OK
2101      */
__anonc8f560be2002(const std::string &tableName, VBucket &extend) 2102     virtualCloudDb_->ForkQuery([this](const std::string &tableName, VBucket &extend) {
2103         EXPECT_EQ(tableName_, tableName);
2104         auto cursor = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
2105         EXPECT_EQ(cursor, "0");
2106     });
2107     Query query = Query::Select().FromTable({ tableName_ });
2108     BlockSync(query, delegate_, g_actualDBStatus);
2109     CheckLocalCount(actualCount);
2110 }
2111 
2112 /**
2113  * @tc.name: SaveCursorTest002
2114  * @tc.desc: test whether cloud cursor is saved when first download failed
2115  * @tc.type: FUNC
2116  * @tc.require:
2117  * @tc.author: chenchaohao
2118  */
2119 HWTEST_F(DistributedDBCloudCheckSyncTest, SaveCursorTest002, TestSize.Level0)
2120 {
2121     /**
2122      * @tc.steps:step1. insert cloud records
2123      * @tc.expected: step1. OK
2124      */
2125     const int actualCount = 10;
2126     InsertCloudTableRecord(0, actualCount, 0, false);
2127 
2128     /**
2129      * @tc.steps:step2. set download failed
2130      * @tc.expected: step2. OK
2131      */
2132     virtualCloudDb_->SetCloudError(true);
2133     Query query = Query::Select().FromTable({ tableName_ });
2134     BlockPrioritySync(query, delegate_, false, OK);
2135     CheckLocalCount(0);
2136 
2137     /**
2138      * @tc.steps:step3. check cursor when query
2139      * @tc.expected: step3. OK
2140      */
2141     virtualCloudDb_->SetCloudError(false);
__anonc8f560be2102(const std::string &tableName, VBucket &extend) 2142     virtualCloudDb_->ForkQuery([this](const std::string &tableName, VBucket &extend) {
2143         EXPECT_EQ(tableName_, tableName);
2144         auto cursor = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
2145         EXPECT_EQ(cursor, "0");
2146     });
2147     BlockSync(query, delegate_, g_actualDBStatus);
2148     CheckLocalCount(actualCount);
2149 }
2150 
2151 /**
2152  * @tc.name: SaveCursorTest003
2153  * @tc.desc: test whether cloud cursor is saved when first upload failed
2154  * @tc.type: FUNC
2155  * @tc.require:
2156  * @tc.author: chenchaohao
2157  */
2158 HWTEST_F(DistributedDBCloudCheckSyncTest, SaveCursorTest003, TestSize.Level0)
2159 {
2160     /**
2161      * @tc.steps:step1. insert local records
2162      * @tc.expected: step1. OK
2163      */
2164     const int actualCount = 10;
2165     InsertUserTableRecord(tableName_, actualCount);
2166 
2167     /**
2168      * @tc.steps:step2. set upload failed
2169      * @tc.expected: step2. OK
2170      */
2171     virtualCloudDb_->SetCloudError(true);
2172     Query query = Query::Select().FromTable({ tableName_ });
2173     BlockPrioritySync(query, delegate_, false, OK);
2174     CheckCloudTableCount(tableName_, 0);
2175 
2176     /**
2177      * @tc.steps:step3. check cursor when query
2178      * @tc.expected: step3. OK
2179      */
2180     virtualCloudDb_->SetCloudError(false);
__anonc8f560be2202(const std::string &tableName, VBucket &extend) 2181     virtualCloudDb_->ForkQuery([this](const std::string &tableName, VBucket &extend) {
2182         EXPECT_EQ(tableName_, tableName);
2183         auto cursor = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
2184         EXPECT_EQ(cursor, "0");
2185     });
2186     BlockSync(query, delegate_, g_actualDBStatus);
2187     CheckCloudTableCount(tableName_, actualCount);
2188 }
2189 
2190 /**
2191  * @tc.name: RangeQuerySyncTest001
2192  * @tc.desc: Test sync that has option parameter with range query.
2193  * @tc.type: FUNC
2194  * @tc.require:
2195  * @tc.author: chenchaohao
2196  */
2197 HWTEST_F(DistributedDBCloudCheckSyncTest, RangeQuerySyncTest001, TestSize.Level0)
2198 {
2199     /**
2200      * @tc.steps:step1. insert user table record.
2201      * @tc.expected: step1. ok.
2202      */
2203     CloudSyncOption option;
2204     option.devices = { "CLOUD" };
2205     option.mode = SYNC_MODE_CLOUD_MERGE;
2206     option.waitTime = g_syncWaitTime;
2207     Query query = Query::Select().From(tableName_).Range({}, {});
2208     option.query = query;
2209 
2210     /**
2211      * @tc.steps:step2. test normal sync with range query.
2212      * @tc.expected: step2. not support.
2213      */
2214     option.priorityTask = false;
2215     ASSERT_EQ(delegate_->Sync(option, nullptr), NOT_SUPPORT);
2216 
2217     /**
2218      * @tc.steps:step3. test Priority sync with range query.
2219      * @tc.expected: step3. not support.
2220      */
2221     option.priorityTask = true;
2222     ASSERT_EQ(delegate_->Sync(option, nullptr), NOT_SUPPORT);
2223 }
2224 
/**
2226  * @tc.name: RangeQuerySyncTest002
2227  * @tc.desc: Test sync that has not option parameter with range query.
2228  * @tc.type: FUNC
2229  * @tc.require:
2230  * @tc.author: mazhao
2231  */
2232 HWTEST_F(DistributedDBCloudCheckSyncTest, RangeQuerySyncTest002, TestSize.Level1)
2233 {
2234     Query query = Query::Select().FromTable({ tableName_ }).Range({}, {});
2235     ASSERT_EQ(delegate_->Sync({"CLOUD"}, SYNC_MODE_CLOUD_FORCE_PULL, query, nullptr, g_syncWaitTime),
2236         DBStatus::NOT_SUPPORT);
2237 }
2238 
/**
2240  * @tc.name: SameDataSync001
2241  * @tc.desc: Test query same data in one batch.
2242  * @tc.type: FUNC
2243  * @tc.require:
2244  * @tc.author: zqq
2245  */
2246 HWTEST_F(DistributedDBCloudCheckSyncTest, SameDataSync001, TestSize.Level0)
2247 {
2248     /**
2249      * @tc.steps:step1. insert cloud records, cloud has two batch id:0-4
2250      * @tc.expected: step1. OK
2251      */
2252     const int actualCount = 5;
2253     InsertCloudTableRecord(0, actualCount, 0, false);
2254     InsertCloudTableRecord(0, actualCount, 0, false);
2255     /**
2256      * @tc.steps:step2. call sync, local has one batch id:0-4
2257      * @tc.expected: step2. OK
2258      */
2259     Query query = Query::Select().FromTable({ tableName_ });
2260     BlockSync(query, delegate_, g_actualDBStatus);
2261     CheckLocalCount(actualCount);
2262 }
2263 
2264 /*
2265  * @tc.name: SameDataSync002
2266  * @tc.desc: Test sync when there are two data with the same primary key on the cloud.
2267  * @tc.type: FUNC
2268  * @tc.require:
2269  * @tc.author: liaoyonghuang
2270  */
2271 HWTEST_F(DistributedDBCloudCheckSyncTest, SameDataSync002, TestSize.Level1)
2272 {
2273     /**
2274      * @tc.steps:step1. insert local 1 record and sync to cloud.
2275      * @tc.expected: step1. OK
2276      */
2277     const int actualCount = 1;
2278     InsertUserTableRecord(tableName_, actualCount);
2279     Query query = Query::Select().FromTable({ tableName_ });
2280     BlockSync(query, delegate_, g_actualDBStatus);
2281 
2282     /**
2283      * @tc.steps:step2. insert 2 records with the same primary key.
2284      * @tc.expected: step2. OK
2285      */
2286     std::vector<VBucket> record;
2287     std::vector<VBucket> extend;
2288     Timestamp now = TimeHelper::GetSysCurrentTime();
2289     VBucket data;
2290     std::vector<uint8_t> photo(0, 'v');
2291     data.insert_or_assign("id", std::string("0"));
2292     data.insert_or_assign("name", std::string("Cloud"));
2293     data.insert_or_assign("height", 166.0); // 166.0 is random double value
2294     data.insert_or_assign("married", false);
2295     data.insert_or_assign("photo", photo);
2296     data.insert_or_assign("age", static_cast<int64_t>(13L)); // 13 is random age
2297     record.push_back(data);
2298     data.insert_or_assign("age", static_cast<int64_t>(14L)); // 14 is random age
2299     record.push_back(data);
2300     VBucket log;
2301     log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
2302         now / CloudDbConstant::TEN_THOUSAND));
2303     log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
2304         now / CloudDbConstant::TEN_THOUSAND));
2305     log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
2306     log.insert_or_assign(CloudDbConstant::VERSION_FIELD, std::string("1"));
2307     extend.push_back(log);
2308     log.insert_or_assign(CloudDbConstant::VERSION_FIELD, std::string("2"));
2309     extend.push_back(log);
2310     ASSERT_EQ(virtualCloudDb_->BatchInsert(tableName_, std::move(record), extend), DBStatus::OK);
2311 
2312     /**
2313      * @tc.steps:step3. sync from cloud and check record.
2314      * @tc.expected: step3. The record with age of 14 has been updated locally.
2315      */
2316     BlockSync(query, delegate_, g_actualDBStatus);
2317     std::string sql = "SELECT age FROM " + tableName_ + " where id=0;";
2318     int64_t actualAge = 0;
2319     int64_t expectAge = 14L;
__anonc8f560be2302(sqlite3_stmt *stmt) 2320     RelationalTestUtils::ExecSql(db_, sql, nullptr, [&actualAge](sqlite3_stmt *stmt) {
2321         actualAge = sqlite3_column_int(stmt, 0);
2322         return E_OK;
2323     });
2324     EXPECT_EQ(actualAge, expectAge);
2325 }
2326 
2327 /*
2328  * @tc.name: CreateDistributedTable001
2329  * @tc.desc: Test create distributed table when table not empty.
2330  * @tc.type: FUNC
2331  * @tc.require:
2332  * @tc.author: zqq
2333  */
2334 HWTEST_F(DistributedDBCloudCheckSyncTest, CreateDistributedTable001, TestSize.Level0)
2335 {
2336     const std::string table = "CreateDistributedTable001";
2337     const std::string createSQL =
2338         "CREATE TABLE IF NOT EXISTS " + table + "(" \
2339         "id TEXT PRIMARY KEY," \
2340         "name TEXT," \
2341         "height REAL ," \
2342         "photo BLOB," \
2343         "age INT);";
2344     ASSERT_EQ(RelationalTestUtils::ExecSql(db_, createSQL), SQLITE_OK);
2345     int actualCount = 10;
2346     InsertUserTableRecord(table, actualCount);
2347     InsertCloudTableRecord(table, 0, actualCount, 0, true);
2348     ASSERT_EQ(delegate_->CreateDistributedTable(table, CLOUD_COOPERATION), DBStatus::OK);
2349     DataBaseSchema dataBaseSchema = GetSchema();
2350     TableSchema schema = dataBaseSchema.tables.at(0);
2351     schema.name = table;
2352     schema.sharedTableName = "";
2353     dataBaseSchema.tables.push_back(schema);
2354     ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
2355     /**
2356      * @tc.steps:step2. call sync, local has one batch id:0-4
2357      * @tc.expected: step2. OK
2358      */
2359     Query query = Query::Select().FromTable({ table });
2360     BlockSync(query, delegate_, g_actualDBStatus);
2361     CheckCloudTableCount(table, actualCount);
2362 }
2363 
2364 /*
2365  * @tc.name: CloseDbTest001
2366  * @tc.desc: Test process of db close during sync
2367  * @tc.type: FUNC
2368  * @tc.require:
2369  * @tc.author: bty
2370  */
2371 HWTEST_F(DistributedDBCloudCheckSyncTest, CloseDbTest001, TestSize.Level1)
2372 {
2373     /**
2374      * @tc.steps:step1. insert user table record.
2375      * @tc.expected: step1. ok.
2376      */
2377     const int actualCount = 10; // 10 is count of records
2378     InsertUserTableRecord(tableName_, actualCount);
2379 
2380     /**
2381      * @tc.steps:step2. wait for 2 seconds during the query to close the database.
2382      * @tc.expected: step2. ok.
2383      */
2384     std::mutex callMutex;
2385     int callCount = 0;
__anonc8f560be2402(const std::string &, VBucket &) 2386     virtualCloudDb_->ForkQuery([](const std::string &, VBucket &) {
2387         std::this_thread::sleep_for(std::chrono::seconds(2)); // block notify 2s
2388     });
2389     const auto callback = [&callCount, &callMutex](
__anonc8f560be2502( const std::map<std::string, SyncProcess> &) 2390         const std::map<std::string, SyncProcess> &) {
2391         {
2392             std::lock_guard<std::mutex> autoLock(callMutex);
2393             callCount++;
2394         }
2395     };
2396     Query query = Query::Select().FromTable({ tableName_ });
2397     ASSERT_EQ(delegate_->Sync({ "CLOUD" }, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), OK);
2398     std::this_thread::sleep_for(std::chrono::seconds(1)); // block notify 1s
2399     EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
2400     delegate_ = nullptr;
2401     mgr_ = nullptr;
2402 
2403     /**
2404      * @tc.steps:step3. wait for 2 seconds to check the process call count.
2405      * @tc.expected: step3. ok.
2406      */
2407     std::this_thread::sleep_for(std::chrono::seconds(2)); // block notify 2s
2408     EXPECT_EQ(callCount, 0L);
2409 }
2410 
2411 /*
2412  * @tc.name: ConsistentFlagTest001
2413  * @tc.desc: Test the consistency flag of no asset table
2414  * @tc.type: FUNC
2415  * @tc.require:
2416  * @tc.author: bty
2417  */
2418 HWTEST_F(DistributedDBCloudCheckSyncTest, ConsistentFlagTest001, TestSize.Level1)
2419 {
2420     /**
2421      * @tc.steps:step1. init data and sync
2422      * @tc.expected: step1. ok.
2423      */
2424     const int localCount = 20; // 20 is count of local
2425     const int cloudCount = 10; // 10 is count of cloud
2426     InsertUserTableRecord(tableName_, localCount);
2427     InsertCloudTableRecord(tableName_, 0, cloudCount, 0, false);
2428     Query query = Query::Select().FromTable({ tableName_ });
2429     BlockSync(query, delegate_, g_actualDBStatus);
2430 
2431     /**
2432      * @tc.steps:step2. check the 0x20 bit of flag after sync
2433      * @tc.expected: step2. ok.
2434      */
2435     std::string querySql = "select count(*) from " + DBCommon::GetLogTableName(tableName_) +
2436         " where flag&0x20=0;";
2437     EXPECT_EQ(sqlite3_exec(db_, querySql.c_str(), QueryCountCallback,
2438         reinterpret_cast<void *>(localCount), nullptr), SQLITE_OK);
2439 
2440     /**
2441      * @tc.steps:step3. delete local data and check
2442      * @tc.expected: step3. ok.
2443      */
2444     std::string sql = "delete from " + tableName_ + " where id = '1';";
2445     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2446     EXPECT_EQ(sqlite3_exec(db_, querySql.c_str(), QueryCountCallback,
2447         reinterpret_cast<void *>(localCount - 1), nullptr), SQLITE_OK);
2448 
2449     /**
2450      * @tc.steps:step4. check the 0x20 bit of flag after sync
2451      * @tc.expected: step4. ok.
2452      */
2453     BlockSync(query, delegate_, g_actualDBStatus);
2454     EXPECT_EQ(sqlite3_exec(db_, querySql.c_str(), QueryCountCallback,
2455         reinterpret_cast<void *>(localCount), nullptr), SQLITE_OK);
2456 }
2457 
SyncDataStatusTest(bool isCompensatedSyncOnly)2458 void DistributedDBCloudCheckSyncTest::SyncDataStatusTest(bool isCompensatedSyncOnly)
2459 {
2460     /**
2461      * @tc.steps:step1. init data and sync
2462      * @tc.expected: step1. ok.
2463      */
2464     const int localCount = 20; // 20 is count of local
2465     const int cloudCount = 10; // 10 is count of cloud
2466     InsertUserTableRecord(tableName_, localCount);
2467     std::string sql = "update " + DBCommon::GetLogTableName(tableName_) + " SET status = 1 where data_key in (1,11);";
2468     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2469     sql = "update " + DBCommon::GetLogTableName(tableName_) + " SET status = 2 where data_key in (2,12);";
2470     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2471     sql = "update " + DBCommon::GetLogTableName(tableName_) + " SET status = 3 where data_key in (3,13);";
2472     EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2473     std::this_thread::sleep_for(std::chrono::milliseconds(1));
2474     InsertCloudTableRecord(tableName_, 0, cloudCount, 0, false);
2475     Query query = Query::Select().FromTable({tableName_});
2476 
2477     /**
2478      * @tc.steps:step2. check count
2479      * @tc.expected: step2. ok.
2480      */
2481     int64_t syncCount = 2;
2482     BlockPrioritySync(query, delegate_, false, OK, isCompensatedSyncOnly);
2483     if (!isCompensatedSyncOnly) {
2484         std::this_thread::sleep_for(std::chrono::seconds(1)); // wait compensated sync finish
2485     }
2486     std::string preSql = "select count(*) from " + DBCommon::GetLogTableName(tableName_);
2487     std::string querySql = preSql + " where status=0 and data_key in (1,11) and cloud_gid !='';";
2488     CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
2489     if (isCompensatedSyncOnly) {
2490         querySql = preSql + " where status=2 and data_key in (2,12) and cloud_gid ='';";
2491         CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
2492         querySql = preSql + " where status=3 and data_key in (3,13) and cloud_gid ='';";
2493         CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
2494         querySql = preSql + " where status=0 and cloud_gid ='';";
2495         int unSyncCount = 14; // 14 is the num of unSync data with status 0
2496         CloudDBSyncUtilsTest::CheckCount(db_, querySql, unSyncCount);
2497     } else {
2498         // gid 12、13 are upload insert, lock to lock_change
2499         querySql = preSql + " where status=3 and data_key in (2,12) and cloud_gid !='';";
2500         CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
2501         querySql = preSql + " where status=3 and data_key in (3,13) and cloud_gid !='';";
2502         CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
2503         querySql = preSql + " where status=0 and cloud_gid !='';";
2504         int unSyncCount = 16; // 16 is the num of sync finish
2505         CloudDBSyncUtilsTest::CheckCount(db_, querySql, unSyncCount);
2506     }
2507 }
2508 
2509 /*
2510  * @tc.name: SyncDataStatusTest001
2511  * @tc.desc: Test the status after compensated sync the no asset table
2512  * @tc.type: FUNC
2513  * @tc.require:
2514  * @tc.author: bty
2515  */
2516 HWTEST_F(DistributedDBCloudCheckSyncTest, SyncDataStatusTest001, TestSize.Level1)
2517 {
2518     SyncDataStatusTest(true);
2519 }
2520 
2521 /*
2522  * @tc.name: SyncDataStatusTest002
2523  * @tc.desc: Test the status after normal sync the no asset table
2524  * @tc.type: FUNC
2525  * @tc.require:
2526  * @tc.author: bty
2527  */
2528 HWTEST_F(DistributedDBCloudCheckSyncTest, SyncDataStatusTest002, TestSize.Level1)
2529 {
2530     SyncDataStatusTest(false);
2531 }
2532 }
2533 #endif
2534