1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include <gtest/gtest.h>
17 #include "cloud/cloud_db_constant.h"
18 #include "cloud/cloud_db_types.h"
19 #include "cloud/cloud_sync_utils.h"
20 #include "cloud_db_sync_utils_test.h"
21 #include "cloud_syncer.h"
22 #include "db_common.h"
23 #include "distributeddb_data_generate_unit_test.h"
24 #include "log_print.h"
25 #include "relational_store_client.h"
26 #include "relational_store_delegate.h"
27 #include "relational_store_instance.h"
28 #include "relational_store_manager.h"
29 #include "relational_sync_able_storage.h"
30 #include "runtime_config.h"
31 #include "time_helper.h"
32 #include "virtual_asset_loader.h"
33 #include "virtual_cloud_data_translate.h"
34 #include "virtual_cloud_db.h"
35 #include "virtual_communicator_aggregator.h"
36
37 namespace {
38 using namespace testing::ext;
39 using namespace DistributedDB;
40 using namespace DistributedDBUnitTest;
41 const char *g_createSQL =
42 "CREATE TABLE IF NOT EXISTS DistributedDBCloudCheckSyncTest(" \
43 "id TEXT PRIMARY KEY," \
44 "name TEXT," \
45 "height REAL ," \
46 "photo BLOB," \
47 "age INT);";
48 const char *g_createNonPrimaryKeySQL =
49 "CREATE TABLE IF NOT EXISTS NonPrimaryKeyTable(" \
50 "id TEXT," \
51 "name TEXT," \
52 "height REAL ," \
53 "photo BLOB," \
54 "age INT);";
55 const int64_t g_syncWaitTime = 60;
56
57 const Asset g_cloudAsset = {
58 .version = 2, .name = "Phone", .assetId = "0", .subpath = "/local/sync", .uri = "/cloud/sync",
59 .modifyTime = "123456", .createTime = "0", .size = "1024", .hash = "DEC"
60 };
61
62 std::vector<DBStatus> g_actualDBStatus;
63 std::map<std::string, SyncProcess> lastProcess_;
64
CreateUserDBAndTable(sqlite3 * & db)65 void CreateUserDBAndTable(sqlite3 *&db)
66 {
67 EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
68 EXPECT_EQ(RelationalTestUtils::ExecSql(db, g_createSQL), SQLITE_OK);
69 EXPECT_EQ(RelationalTestUtils::ExecSql(db, g_createNonPrimaryKeySQL), SQLITE_OK);
70 }
71
PrepareOption(CloudSyncOption & option,const Query & query,bool isPriorityTask,bool isCompensatedSyncOnly=false)72 void PrepareOption(CloudSyncOption &option, const Query &query, bool isPriorityTask, bool isCompensatedSyncOnly = false)
73 {
74 option.devices = { "CLOUD" };
75 option.mode = SYNC_MODE_CLOUD_MERGE;
76 option.query = query;
77 option.waitTime = g_syncWaitTime;
78 option.priorityTask = isPriorityTask;
79 option.compensatedSyncOnly = isCompensatedSyncOnly;
80 }
81
BlockSync(const Query & query,RelationalStoreDelegate * delegate,std::vector<DBStatus> & actualDBStatus,bool prioritySync=false)82 void BlockSync(const Query &query, RelationalStoreDelegate *delegate, std::vector<DBStatus> &actualDBStatus,
83 bool prioritySync = false)
84 {
85 std::mutex dataMutex;
86 std::condition_variable cv;
87 bool finish = false;
88 auto callback = [&actualDBStatus, &cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
89 for (const auto &item: process) {
90 actualDBStatus.push_back(item.second.errCode);
91 if (item.second.process == DistributedDB::FINISHED) {
92 {
93 std::lock_guard<std::mutex> autoLock(dataMutex);
94 finish = true;
95 }
96 cv.notify_one();
97 }
98 }
99 };
100 CloudSyncOption option;
101 option.devices = { "CLOUD" };
102 option.mode = SYNC_MODE_CLOUD_MERGE;
103 option.query = query;
104 option.waitTime = g_syncWaitTime;
105 option.priorityTask = prioritySync;
106 ASSERT_EQ(delegate->Sync(option, callback), OK);
107 std::unique_lock<std::mutex> uniqueLock(dataMutex);
108 cv.wait(uniqueLock, [&finish]() {
109 return finish;
110 });
111 }
112
BlockPrioritySync(const Query & query,RelationalStoreDelegate * delegate,bool isPriority,DBStatus expectResult,bool isCompensatedSyncOnly=false)113 void BlockPrioritySync(const Query &query, RelationalStoreDelegate *delegate, bool isPriority, DBStatus expectResult,
114 bool isCompensatedSyncOnly = false)
115 {
116 std::mutex dataMutex;
117 std::condition_variable cv;
118 bool finish = false;
119 std::map<std::string, SyncProcess> last;
120 auto callback = [&last, &cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
121 for (const auto &item: process) {
122 if (item.second.process == DistributedDB::FINISHED) {
123 {
124 std::lock_guard<std::mutex> autoLock(dataMutex);
125 finish = true;
126 }
127 cv.notify_one();
128 }
129 }
130 last = process;
131 };
132 CloudSyncOption option;
133 PrepareOption(option, query, isPriority, isCompensatedSyncOnly);
134 ASSERT_EQ(delegate->Sync(option, callback), expectResult);
135 if (expectResult == OK) {
136 std::unique_lock<std::mutex> uniqueLock(dataMutex);
137 cv.wait(uniqueLock, [&finish]() {
138 return finish;
139 });
140 }
141 lastProcess_ = last;
142 }
143
QueryCountCallback(void * data,int count,char ** colValue,char ** colName)144 int QueryCountCallback(void *data, int count, char **colValue, char **colName)
145 {
146 if (count != 1) {
147 return 0;
148 }
149 auto expectCount = reinterpret_cast<int64_t>(data);
150 EXPECT_EQ(strtol(colValue[0], nullptr, 10), expectCount); // 10: decimal
151 return 0;
152 }
153
CheckUserTableResult(sqlite3 * & db,const std::string & tableName,int64_t expectCount)154 void CheckUserTableResult(sqlite3 *&db, const std::string &tableName, int64_t expectCount)
155 {
156 string query = "select count(*) from " + tableName + ";";
157 EXPECT_EQ(sqlite3_exec(db, query.c_str(), QueryCountCallback,
158 reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
159 }
160
161 class DistributedDBCloudCheckSyncTest : public testing::Test {
162 public:
163 static void SetUpTestCase();
164 static void TearDownTestCase();
165 void SetUp() override;
166 void TearDown() override;
167 protected:
168 void InitTestDir();
169 DataBaseSchema GetSchema();
170 void CloseDb();
171 void InsertUserTableRecord(const std::string &tableName, int64_t recordCounts, int64_t begin = 0);
172 void InsertCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize, bool assetIsNull);
173 void InsertCloudTableRecord(const std::string &tableName, int64_t begin, int64_t count, int64_t photoSize,
174 bool assetIsNull);
175 void DeleteUserTableRecord(int64_t id);
176 void DeleteUserTableRecord(int64_t begin, int64_t end);
177 void DeleteCloudTableRecord(int64_t gid);
178 void CheckCloudTableCount(const std::string &tableName, int64_t expectCount);
179 bool CheckSyncCount(const Info actualInfo, const Info expectInfo);
180 bool CheckSyncProcess(std::vector<std::map<std::string, SyncProcess>> &actualSyncProcess,
181 vector<SyncProcess> &expectSyncProcessV);
182 void PriorityAndNormalSync(const Query &normalQuery, const Query &priorityQuery,
183 RelationalStoreDelegate *delegate, std::vector<std::map<std::string, SyncProcess>> &prioritySyncProcess,
184 bool isCheckProcess);
185 void DeleteCloudDBData(int64_t begin, int64_t count);
186 void SetForkQueryForCloudPrioritySyncTest007(std::atomic<int> &count);
187 void SetForkQueryForCloudPrioritySyncTest008(std::atomic<int> &count);
188 void InitLogicDeleteDataEnv(int64_t dataCount, bool prioritySync = false);
189 void CheckLocalCount(int64_t expectCount);
190 void CheckLogCleaned(int64_t expectCount);
191 void SyncDataStatusTest(bool isCompensatedSyncOnly);
192 std::string testDir_;
193 std::string storePath_;
194 sqlite3 *db_ = nullptr;
195 RelationalStoreDelegate *delegate_ = nullptr;
196 std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
197 std::shared_ptr<VirtualAssetLoader> virtualAssetLoader_ = nullptr;
198 std::shared_ptr<RelationalStoreManager> mgr_ = nullptr;
199 std::string tableName_ = "DistributedDBCloudCheckSyncTest";
200 std::string tableNameShared_ = "DistributedDBCloudCheckSyncTest_shared";
201 std::string tableWithoutPrimaryName_ = "NonPrimaryKeyTable";
202 std::string tableWithoutPrimaryNameShared_ = "NonPrimaryKeyTable_shared";
203 std::string lowerTableName_ = "distributeddbCloudCheckSyncTest";
204 VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
205 };
206
SetUpTestCase()207 void DistributedDBCloudCheckSyncTest::SetUpTestCase()
208 {
209 RuntimeConfig::SetCloudTranslate(std::make_shared<VirtualCloudDataTranslate>());
210 }
211
TearDownTestCase()212 void DistributedDBCloudCheckSyncTest::TearDownTestCase()
213 {}
214
SetUp()215 void DistributedDBCloudCheckSyncTest::SetUp()
216 {
217 DistributedDBToolsUnitTest::PrintTestCaseInfo();
218 InitTestDir();
219 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != 0) {
220 LOGE("rm test db files error.");
221 }
222 DistributedDBToolsUnitTest::PrintTestCaseInfo();
223 LOGD("Test dir is %s", testDir_.c_str());
224 db_ = RelationalTestUtils::CreateDataBase(storePath_);
225 ASSERT_NE(db_, nullptr);
226 CreateUserDBAndTable(db_);
227 mgr_ = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
228 RelationalStoreDelegate::Option option;
229 ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
230 ASSERT_NE(delegate_, nullptr);
231 ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
232 ASSERT_EQ(delegate_->CreateDistributedTable(tableWithoutPrimaryName_, CLOUD_COOPERATION), DBStatus::OK);
233 virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
234 virtualAssetLoader_ = std::make_shared<VirtualAssetLoader>();
235 ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
236 ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
237 DataBaseSchema dataBaseSchema = GetSchema();
238 ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
239 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
240 ASSERT_TRUE(communicatorAggregator_ != nullptr);
241 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
242 }
243
TearDown()244 void DistributedDBCloudCheckSyncTest::TearDown()
245 {
246 virtualCloudDb_->ForkQuery(nullptr);
247 virtualCloudDb_->SetCloudError(false);
248 CloseDb();
249 EXPECT_EQ(sqlite3_close_v2(db_), SQLITE_OK);
250 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != E_OK) {
251 LOGE("rm test db files error.");
252 }
253 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
254 communicatorAggregator_ = nullptr;
255 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
256 }
257
InitTestDir()258 void DistributedDBCloudCheckSyncTest::InitTestDir()
259 {
260 if (!testDir_.empty()) {
261 return;
262 }
263 DistributedDBToolsUnitTest::TestDirInit(testDir_);
264 storePath_ = testDir_ + "/" + STORE_ID_1 + ".db";
265 LOGI("The test db is:%s", testDir_.c_str());
266 }
267
GetSchema()268 DataBaseSchema DistributedDBCloudCheckSyncTest::GetSchema()
269 {
270 DataBaseSchema schema;
271 TableSchema tableSchema;
272 tableSchema.name = tableName_;
273 tableSchema.sharedTableName = tableName_ + "_shared";
274 tableSchema.fields = {
275 {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
276 {"photo", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
277 };
278 TableSchema tableWithoutPrimaryKeySchema;
279 tableWithoutPrimaryKeySchema.name = tableWithoutPrimaryName_;
280 tableWithoutPrimaryKeySchema.sharedTableName = tableWithoutPrimaryNameShared_;
281 tableWithoutPrimaryKeySchema.fields = {
282 {"id", TYPE_INDEX<std::string>}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
283 {"photo", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
284 };
285 schema.tables.push_back(tableSchema);
286 schema.tables.push_back(tableWithoutPrimaryKeySchema);
287 return schema;
288 }
289
CloseDb()290 void DistributedDBCloudCheckSyncTest::CloseDb()
291 {
292 virtualCloudDb_ = nullptr;
293 if (mgr_ != nullptr) {
294 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
295 delegate_ = nullptr;
296 mgr_ = nullptr;
297 }
298 }
299
InsertUserTableRecord(const std::string & tableName,int64_t recordCounts,int64_t begin)300 void DistributedDBCloudCheckSyncTest::InsertUserTableRecord(const std::string &tableName,
301 int64_t recordCounts, int64_t begin)
302 {
303 ASSERT_NE(db_, nullptr);
304 for (int64_t i = begin; i < begin + recordCounts; ++i) {
305 string sql = "INSERT OR REPLACE INTO " + tableName
306 + " (id, name, height, photo, age) VALUES ('" + std::to_string(i) + "', 'Local"
307 + std::to_string(i) + "', '155.10', 'text', '21');";
308 ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db_, sql), E_OK);
309 }
310 }
311
InsertCloudTableRecord(int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)312 void DistributedDBCloudCheckSyncTest::InsertCloudTableRecord(int64_t begin, int64_t count, int64_t photoSize,
313 bool assetIsNull)
314 {
315 InsertCloudTableRecord(tableName_, begin, count, photoSize, assetIsNull);
316 }
317
InsertCloudTableRecord(const std::string & tableName,int64_t begin,int64_t count,int64_t photoSize,bool assetIsNull)318 void DistributedDBCloudCheckSyncTest::InsertCloudTableRecord(const std::string &tableName, int64_t begin, int64_t count,
319 int64_t photoSize, bool assetIsNull)
320 {
321 std::vector<uint8_t> photo(photoSize, 'v');
322 std::vector<VBucket> record1;
323 std::vector<VBucket> extend1;
324 std::vector<VBucket> record2;
325 std::vector<VBucket> extend2;
326 Timestamp now = TimeHelper::GetSysCurrentTime();
327 for (int64_t i = begin; i < begin + count; ++i) {
328 VBucket data;
329 data.insert_or_assign("id", std::to_string(i));
330 data.insert_or_assign("name", "Cloud" + std::to_string(i));
331 data.insert_or_assign("height", 166.0); // 166.0 is random double value
332 data.insert_or_assign("married", false);
333 data.insert_or_assign("photo", photo);
334 data.insert_or_assign("age", static_cast<int64_t>(13L)); // 13 is random age
335 Asset asset = g_cloudAsset;
336 asset.name = asset.name + std::to_string(i);
337 assetIsNull ? data.insert_or_assign("assert", Nil()) : data.insert_or_assign("assert", asset);
338 record1.push_back(data);
339 VBucket log;
340 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
341 now / CloudDbConstant::TEN_THOUSAND + i));
342 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
343 now / CloudDbConstant::TEN_THOUSAND + i));
344 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
345 extend1.push_back(log);
346
347 std::vector<Asset> assets;
348 data.insert_or_assign("height", 180.3); // 180.3 is random double value
349 for (int64_t j = i; j <= i + 2; j++) { // 2 extra num
350 asset.name = g_cloudAsset.name + std::to_string(j);
351 assets.push_back(asset);
352 }
353 data.erase("assert");
354 data.erase("married");
355 assetIsNull ? data.insert_or_assign("asserts", Nil()) : data.insert_or_assign("asserts", assets);
356 record2.push_back(data);
357 extend2.push_back(log);
358 }
359 ASSERT_EQ(virtualCloudDb_->BatchInsert(tableName, std::move(record1), extend1), DBStatus::OK);
360 std::this_thread::sleep_for(std::chrono::milliseconds(count));
361 }
362
DeleteUserTableRecord(int64_t id)363 void DistributedDBCloudCheckSyncTest::DeleteUserTableRecord(int64_t id)
364 {
365 ASSERT_NE(db_, nullptr);
366 string sql = "DELETE FROM " + tableName_ + " WHERE id ='" + std::to_string(id) + "';";
367 ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db_, sql), E_OK);
368 }
369
DeleteUserTableRecord(int64_t begin,int64_t end)370 void DistributedDBCloudCheckSyncTest::DeleteUserTableRecord(int64_t begin, int64_t end)
371 {
372 ASSERT_NE(db_, nullptr);
373 std::string sql = "DELETE FROM " + tableName_ + " WHERE id IN (";
374 for (int64_t i = begin; i <= end; ++i) {
375 sql += "'" + std::to_string(i) + "',";
376 }
377 if (sql.back() == ',') {
378 sql.pop_back();
379 }
380 sql += ");";
381 ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db_, sql), E_OK);
382 }
383
DeleteCloudTableRecord(int64_t gid)384 void DistributedDBCloudCheckSyncTest::DeleteCloudTableRecord(int64_t gid)
385 {
386 VBucket idMap;
387 idMap.insert_or_assign("#_gid", std::to_string(gid));
388 ASSERT_EQ(virtualCloudDb_->DeleteByGid(tableName_, idMap), DBStatus::OK);
389 }
390
CheckCloudTableCount(const std::string & tableName,int64_t expectCount)391 void DistributedDBCloudCheckSyncTest::CheckCloudTableCount(const std::string &tableName, int64_t expectCount)
392 {
393 VBucket extend;
394 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
395 int64_t realCount = 0;
396 std::vector<VBucket> data;
397 virtualCloudDb_->Query(tableName, extend, data);
398 for (size_t j = 0; j < data.size(); ++j) {
399 auto entry = data[j].find(CloudDbConstant::DELETE_FIELD);
400 if (entry != data[j].end() && std::get<bool>(entry->second)) {
401 continue;
402 }
403 realCount++;
404 }
405 EXPECT_EQ(realCount, expectCount); // ExpectCount represents the total amount of cloud data.
406 }
407
CheckSyncCount(const Info actualInfo,const Info expectInfo)408 bool DistributedDBCloudCheckSyncTest::CheckSyncCount(const Info actualInfo, const Info expectInfo)
409 {
410 if (actualInfo.batchIndex != expectInfo.batchIndex) {
411 return false;
412 }
413 if (actualInfo.total != expectInfo.total) {
414 return false;
415 }
416 if (actualInfo.successCount != expectInfo.successCount) {
417 return false;
418 }
419 if (actualInfo.failCount != expectInfo.failCount) {
420 return false;
421 }
422 return true;
423 }
424
CheckSyncProcess(std::vector<std::map<std::string,SyncProcess>> & actualSyncProcess,vector<SyncProcess> & expectSyncProcessV)425 bool DistributedDBCloudCheckSyncTest::CheckSyncProcess(
426 std::vector<std::map<std::string, SyncProcess>> &actualSyncProcess, vector<SyncProcess> &expectSyncProcessV)
427 {
428 vector<map<string, SyncProcess>> expectSyncProcess;
429 for (auto syncProcess : expectSyncProcessV) {
430 map<string, SyncProcess> expectSyncProcessMap = {{"CLOUD", syncProcess}};
431 expectSyncProcess.emplace_back(expectSyncProcessMap);
432 }
433 for (int i = 0; i < (int) actualSyncProcess.size(); i++) {
434 map<string, SyncProcess> actualSyncProcessMap = actualSyncProcess[i];
435 map<string, SyncProcess> expectSyncProcessMap = expectSyncProcess[i];
436 for (auto &it : actualSyncProcessMap) {
437 string mapKey = it.first;
438 if (expectSyncProcessMap.find(mapKey) == expectSyncProcessMap.end()) {
439 return false;
440 }
441 SyncProcess actualSyncProcess = it.second;
442 SyncProcess expectSyncProcess = expectSyncProcessMap.find(mapKey)->second;
443 for (const auto &itInner : actualSyncProcess.tableProcess) {
444 string tableName = itInner.first;
445 if (expectSyncProcess.tableProcess.find(tableName) == expectSyncProcess.tableProcess.end()) {
446 return false;
447 }
448 TableProcessInfo actualTableProcessInfo = itInner.second;
449 TableProcessInfo expectTableProcessInfo = expectSyncProcess.tableProcess.find(tableName)->second;
450 if (!CheckSyncCount(actualTableProcessInfo.downLoadInfo, expectTableProcessInfo.downLoadInfo)) {
451 return false;
452 }
453 if (!CheckSyncCount(actualTableProcessInfo.upLoadInfo, expectTableProcessInfo.upLoadInfo)) {
454 return false;
455 }
456 }
457 }
458 }
459 return true;
460 }
461
PriorityAndNormalSync(const Query & normalQuery,const Query & priorityQuery,RelationalStoreDelegate * delegate,std::vector<std::map<std::string,SyncProcess>> & prioritySyncProcess,bool isCheckProcess)462 void DistributedDBCloudCheckSyncTest::PriorityAndNormalSync(const Query &normalQuery, const Query &priorityQuery,
463 RelationalStoreDelegate *delegate, std::vector<std::map<std::string, SyncProcess>> &prioritySyncProcess,
464 bool isCheckProcess)
465 {
466 std::mutex dataMutex;
467 std::condition_variable cv;
468 bool normalFinish = false;
469 bool priorityFinish = false;
470 auto normalCallback = [&cv, &dataMutex, &normalFinish, &priorityFinish, &prioritySyncProcess, &isCheckProcess](
471 const std::map<std::string, SyncProcess> &process) {
472 for (const auto &item: process) {
473 if (item.second.process == DistributedDB::FINISHED) {
474 normalFinish = true;
475 if (isCheckProcess) {
476 ASSERT_EQ(priorityFinish, true);
477 }
478 cv.notify_one();
479 }
480 }
481 prioritySyncProcess.emplace_back(process);
482 };
483 auto priorityCallback = [&cv, &priorityFinish, &prioritySyncProcess](
484 const std::map<std::string, SyncProcess> &process) {
485 for (const auto &item: process) {
486 if (item.second.process == DistributedDB::FINISHED) {
487 priorityFinish = true;
488 cv.notify_one();
489 }
490 }
491 prioritySyncProcess.emplace_back(process);
492 };
493 CloudSyncOption option;
494 PrepareOption(option, normalQuery, false);
495 virtualCloudDb_->SetBlockTime(500); // 500 ms
496 ASSERT_EQ(delegate->Sync(option, normalCallback), OK);
497 PrepareOption(option, priorityQuery, true);
498 std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 50 ms
499 ASSERT_EQ(delegate->Sync(option, priorityCallback), OK);
500 std::unique_lock<std::mutex> uniqueLock(dataMutex);
501 cv.wait(uniqueLock, [&normalFinish]() {
502 return normalFinish;
503 });
504 }
505
DeleteCloudDBData(int64_t begin,int64_t count)506 void DistributedDBCloudCheckSyncTest::DeleteCloudDBData(int64_t begin, int64_t count)
507 {
508 for (int64_t i = begin; i < begin + count; i++) {
509 VBucket idMap;
510 idMap.insert_or_assign("#_gid", std::to_string(i));
511 ASSERT_EQ(virtualCloudDb_->DeleteByGid(tableName_, idMap), DBStatus::OK);
512 }
513 }
514
SetForkQueryForCloudPrioritySyncTest007(std::atomic<int> & count)515 void DistributedDBCloudCheckSyncTest::SetForkQueryForCloudPrioritySyncTest007(std::atomic<int> &count)
516 {
517 virtualCloudDb_->ForkQuery([this, &count](const std::string &, VBucket &) {
518 count++;
519 if (count == 1) { // taskid1
520 std::this_thread::sleep_for(std::chrono::seconds(1));
521 }
522 if (count == 3) { // 3 means taskid3 because CheckCloudTableCount will query then count++
523 CheckCloudTableCount(tableName_, 1); // 1 is count of cloud records after last sync
524 }
525 if (count == 6) { // 6 means taskid2 because CheckCloudTableCount will query then count++
526 CheckCloudTableCount(tableName_, 2); // 2 is count of cloud records after last sync
527 }
528 if (count == 9) { // 9 means taskid4 because CheckCloudTableCount will query then count++
529 CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records after last sync
530 }
531 });
532 }
533
SetForkQueryForCloudPrioritySyncTest008(std::atomic<int> & count)534 void DistributedDBCloudCheckSyncTest::SetForkQueryForCloudPrioritySyncTest008(std::atomic<int> &count)
535 {
536 virtualCloudDb_->ForkQuery([this, &count](const std::string &, VBucket &) {
537 count++;
538 if (count == 1) { // taskid1
539 std::this_thread::sleep_for(std::chrono::seconds(1));
540 }
541 if (count == 3) { // 3 means taskid3 because CheckCloudTableCount will query then count++
542 CheckCloudTableCount(tableName_, 1); // 1 is count of cloud records after last sync
543 }
544 if (count == 6) { // 6 means taskid2 because CheckCloudTableCount will query then count++
545 CheckCloudTableCount(tableName_, 1); // 1 is count of cloud records after last sync
546 }
547 if (count == 9) { // 9 means taskid4 because CheckCloudTableCount will query then count++
548 CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records after last sync
549 }
550 });
551 }
552
InitLogicDeleteDataEnv(int64_t dataCount,bool prioritySync)553 void DistributedDBCloudCheckSyncTest::InitLogicDeleteDataEnv(int64_t dataCount, bool prioritySync)
554 {
555 // prepare data
556 InsertUserTableRecord(tableName_, dataCount);
557 // sync
558 Query query = Query::Select().FromTable({ tableName_ });
559 BlockSync(query, delegate_, g_actualDBStatus);
560 // delete cloud data
561 for (int i = 0; i < dataCount; ++i) {
562 DeleteCloudTableRecord(i);
563 }
564 // sync again
565 BlockSync(query, delegate_, g_actualDBStatus);
566 }
567
CheckLocalCount(int64_t expectCount)568 void DistributedDBCloudCheckSyncTest::CheckLocalCount(int64_t expectCount)
569 {
570 // check local data
571 int dataCnt = -1;
572 std::string checkLogSql = "SELECT count(*) FROM " + tableName_;
573 RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
574 dataCnt = sqlite3_column_int(stmt, 0);
575 return E_OK;
576 });
577 EXPECT_EQ(dataCnt, expectCount);
578 }
579
CheckLogCleaned(int64_t expectCount)580 void DistributedDBCloudCheckSyncTest::CheckLogCleaned(int64_t expectCount)
581 {
582 std::string sql1 = "select count(*) from " + DBCommon::GetLogTableName(tableName_) +
583 " where device = 'cloud';";
584 EXPECT_EQ(sqlite3_exec(db_, sql1.c_str(), QueryCountCallback,
585 reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
586 std::string sql2 = "select count(*) from " + DBCommon::GetLogTableName(tableName_) + " where cloud_gid "
587 " is not null and cloud_gid != '';";
588 EXPECT_EQ(sqlite3_exec(db_, sql2.c_str(), QueryCountCallback,
589 reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
590 std::string sql3 = "select count(*) from " + DBCommon::GetLogTableName(tableName_) +
591 " where flag & 0x02 != 0;";
592 EXPECT_EQ(sqlite3_exec(db_, sql3.c_str(), QueryCountCallback,
593 reinterpret_cast<void *>(expectCount), nullptr), SQLITE_OK);
594 }
595
596 /**
597 * @tc.name: CloudSyncTest001
598 * @tc.desc: sync with device sync query
599 * @tc.type: FUNC
600 * @tc.require:
601 * @tc.author: zhangqiquan
602 */
603 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest001, TestSize.Level0)
604 {
605 // prepare data
606 const int actualCount = 10;
607 InsertUserTableRecord(tableName_, actualCount);
608 // sync twice
609 Query query = Query::Select().FromTable({ tableName_ });
610 BlockSync(query, delegate_, g_actualDBStatus);
611 BlockSync(query, delegate_, g_actualDBStatus);
612 // remove cloud data
613 delegate_->RemoveDeviceData("CLOUD", ClearMode::FLAG_AND_DATA);
614 // check local data
615 int dataCnt = -1;
616 std::string checkLogSql = "SELECT count(*) FROM " + tableName_;
__anonc8f560be0c02(sqlite3_stmt *stmt) 617 RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
618 dataCnt = sqlite3_column_int(stmt, 0);
619 return E_OK;
620 });
621 EXPECT_EQ(dataCnt, 0);
622 }
623
624 /**
625 * @tc.name: CloudSyncTest002
626 * @tc.desc: sync with same data in one batch
627 * @tc.type: FUNC
628 * @tc.require:
629 * @tc.author: zhangqiquan
630 */
631 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest002, TestSize.Level0)
632 {
633 // prepare data
634 const int actualCount = 1;
635 InsertUserTableRecord(tableName_, actualCount);
636 // sync twice
637 Query query = Query::Select().FromTable({ tableName_ });
638 BlockSync(query, delegate_, g_actualDBStatus);
639 // cloud delete id=0 and insert id=0 but its gid is 1
640 // local delete id=0
641 DeleteCloudTableRecord(0); // cloud gid is 0
642 InsertCloudTableRecord(0, actualCount, 0, false); // 0 is id
643 DeleteUserTableRecord(0); // 0 is id
644 BlockSync(query, delegate_, g_actualDBStatus);
645 bool deleteStatus = true;
646 EXPECT_EQ(virtualCloudDb_->GetDataStatus("1", deleteStatus), OK);
647 EXPECT_EQ(deleteStatus, false);
648 }
649
650 /**
651 * @tc.name: CloudSyncTest003
652 * @tc.desc: local data is delete before sync, then sync, cloud data will insert into local
653 * @tc.type: FUNC
654 * @tc.require:
655 * @tc.author: zhangshijie
656 */
657 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest003, TestSize.Level0)
658 {
659 // prepare data
660 const int actualCount = 1;
661 InsertUserTableRecord(tableName_, actualCount);
662
663 InsertCloudTableRecord(0, actualCount, 0, false);
664 // delete local data
665 DeleteUserTableRecord(0);
666 Query query = Query::Select().FromTable({ tableName_ });
667 BlockSync(query, delegate_, g_actualDBStatus);
668
669 // check local data, cloud date will insert into local
670 int dataCnt = -1;
671 std::string checkLogSql = "SELECT count(*) FROM " + tableName_;
__anonc8f560be0d02(sqlite3_stmt *stmt) 672 RelationalTestUtils::ExecSql(db_, checkLogSql, nullptr, [&dataCnt](sqlite3_stmt *stmt) {
673 dataCnt = sqlite3_column_int(stmt, 0);
674 return E_OK;
675 });
676 EXPECT_EQ(dataCnt, actualCount);
677 }
678
679 /**
680 * @tc.name: CloudSyncTest004
681 * @tc.desc: sync after insert failed
682 * @tc.type: FUNC
683 * @tc.require:
684 * @tc.author: zhangqiquan
685 */
686 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest004, TestSize.Level0)
687 {
688 // prepare data
689 const int actualCount = 1;
690 InsertUserTableRecord(tableName_, actualCount);
691 // sync twice
692 Query query = Query::Select().FromTable({ tableName_ });
693 LOGW("Block Sync");
694 virtualCloudDb_->SetInsertFailed(1);
695 BlockSync(query, delegate_, g_actualDBStatus);
696 // delete local data
697 DeleteUserTableRecord(0); // 0 is id
698 LOGW("Block Sync");
699 // sync again and this record with be synced to cloud
700 BlockSync(query, delegate_, g_actualDBStatus);
701 bool deleteStatus = true;
702 EXPECT_EQ(virtualCloudDb_->GetDataStatus("0", deleteStatus), OK);
703 EXPECT_EQ(deleteStatus, true);
704 }
705
706 /**
707 * @tc.name: CloudSyncTest006
708 * @tc.desc: check redownload when common sync pasue.
709 * @tc.type: FUNC
710 * @tc.require:
711 * @tc.author: luoguo
712 */
713 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest006, TestSize.Level0)
714 {
715 /**
716 * @tc.steps:step1. init data and sync
717 * @tc.expected: step1. ok.
718 */
719 const int localCount = 120; // 120 is count of local
720 const int cloudCount = 100; // 100 is count of cloud
721 InsertUserTableRecord(tableName_, localCount, 0);
722 InsertUserTableRecord(tableWithoutPrimaryName_, cloudCount, 0);
723 InsertCloudTableRecord(tableWithoutPrimaryName_, 80, cloudCount, 0, false);
724
725 /**
726 * @tc.steps:step2. common sync will pasue
727 * @tc.expected: step2. ok.
728 */
729 std::vector<std::string> tableNames = {tableName_, tableWithoutPrimaryName_};
730 Query normalQuery = Query::Select().FromTable({tableNames});
731 std::vector<std::string> idValue = {"0", "1", "2"};
732 Query priorityQuery = Query::Select().From(tableName_).In("id", idValue);
733 CloudSyncOption option;
734 CloudSyncOption priorityOption;
735 PrepareOption(option, normalQuery, false);
736 PrepareOption(priorityOption, priorityQuery, true);
737 bool isUpload = false;
738 uint32_t blockTime = 2000;
__anonc8f560be0e02(const std::string &tableName, VBucket &extend) 739 virtualCloudDb_->ForkUpload([&isUpload, &blockTime](const std::string &tableName, VBucket &extend) {
740 if (isUpload == false) {
741 isUpload = true;
742 std::this_thread::sleep_for(std::chrono::milliseconds(blockTime));
743 }
744 });
745 bool isFinsh = false;
746 bool priorityFinish = false;
__anonc8f560be0f02(const std::map<std::string, SyncProcess> &process) 747 auto normalCallback = [&isFinsh, &priorityFinish](const std::map<std::string, SyncProcess> &process) {
748 for (const auto &item : process) {
749 if (item.second.process == DistributedDB::FINISHED) {
750 isFinsh = true;
751 ASSERT_EQ(priorityFinish, true);
752 }
753 }
754 };
755 ASSERT_EQ(delegate_->Sync(option, normalCallback), OK);
756
757 /**
758 * @tc.steps:step3. wait common upload and make priority sync.
759 * @tc.expected: step3. ok.
760 */
761 while (isUpload == false) {
762 std::this_thread::sleep_for(std::chrono::milliseconds(50));
763 }
__anonc8f560be1002(const std::map<std::string, SyncProcess> &process) 764 auto priorityCallback = [&priorityFinish](const std::map<std::string, SyncProcess> &process) {
765 for (const auto &item : process) {
766 if (item.second.process == DistributedDB::FINISHED) {
767 priorityFinish = true;
768 }
769 }
770 };
771 ASSERT_EQ(delegate_->Sync(priorityOption, priorityCallback), OK);
772 while (isFinsh == false || priorityFinish == false) {
773 std::this_thread::sleep_for(std::chrono::milliseconds(50));
774 }
775
776 /**
777 * @tc.steps:step4. wait common sync and make priority sync finish, check query Times.
778 * @tc.expected: step4. ok.
779 */
780 uint32_t times = virtualCloudDb_->GetQueryTimes(tableName_);
781 ASSERT_EQ(times, 3u);
782 virtualCloudDb_->ForkUpload(nullptr);
783 }
784
785 /**
786 * @tc.name: CloudSyncTest007
 * @tc.desc: check process info when a version conflict occurs during the sync process.
788 * @tc.type: FUNC
789 * @tc.require:
790 * @tc.author: luoguo
791 */
792 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest007, TestSize.Level0)
793 {
794 /**
795 * @tc.steps:step1. init data and sync
796 * @tc.expected: step1. ok.
797 */
798 const int localCount = 60; // 120 is count of local
799 InsertUserTableRecord(tableName_, localCount, 0);
800 Query query = Query::Select().FromTable({tableName_});
801 BlockSync(query, delegate_, g_actualDBStatus);
802
803 /**
804 * @tc.steps:step2. delete 30 - 59 records in user table, and set callback func.
805 * @tc.expected: step2. ok.
806 */
807 DeleteUserTableRecord(30, 59);
808 bool isUpload = false;
__anonc8f560be1102(const std::string &tableName, VBucket &extend) 809 virtualCloudDb_->ForkUpload([&isUpload](const std::string &tableName, VBucket &extend) {
810 if (isUpload == false) {
811 isUpload = true;
812 std::this_thread::sleep_for(std::chrono::milliseconds(2000));
813 }
814 });
815 bool isFinsh = false;
816 std::map<std::string, TableProcessInfo> retSyncProcess;
__anonc8f560be1202(const std::map<std::string, SyncProcess> &process) 817 auto normalCallback = [&isFinsh, &retSyncProcess](const std::map<std::string, SyncProcess> &process) {
818 for (const auto &item : process) {
819 if (item.second.process == DistributedDB::FINISHED) {
820 isFinsh = true;
821 ASSERT_EQ(process.empty(), false);
822 auto lastProcess = process.rbegin();
823 retSyncProcess = lastProcess->second.tableProcess;
824 }
825 }
826 };
827
828 /**
829 * @tc.steps:step3. sync.
830 * @tc.expected: step3. ok.
831 */
832 std::vector<std::string> tableNames = {tableName_};
833 Query normalQuery = Query::Select().FromTable({tableNames});
834 CloudSyncOption option;
835 PrepareOption(option, normalQuery, false);
836 ASSERT_EQ(delegate_->Sync(option, normalCallback), OK);
837
838 /**
839 * @tc.steps:step4. wait upload processs and delete 30 record in cloud table.
840 * @tc.expected: step4. ok.
841 */
842 while (isUpload == false) {
843 std::this_thread::sleep_for(std::chrono::milliseconds(50));
844 }
845 DeleteCloudTableRecord(30);
846
847 /**
848 * @tc.steps:step5. wait sync processs end and check data.
849 * @tc.expected: step5. ok.
850 */
851 while (isFinsh == false) {
852 std::this_thread::sleep_for(std::chrono::milliseconds(50));
853 }
854 ASSERT_EQ(retSyncProcess.empty(), false);
855 auto taskInfo = retSyncProcess.rbegin();
856 ASSERT_EQ(taskInfo->second.upLoadInfo.total, 30u);
857 virtualCloudDb_->ForkUpload(nullptr);
858 }
859
860 /**
861 * @tc.name: CloudSyncTest009
862 * @tc.desc: reopen database and sync
863 * @tc.type: FUNC
864 * @tc.require:
865 * @tc.author: wangxiangdong
866 */
867 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest009, TestSize.Level0)
868 {
869 /**
870 * @tc.steps: step1. insert 1 record to user table
871 * @tc.expected: step1. OK.
872 */
873 const int actualCount = 1;
874 InsertUserTableRecord(tableName_, actualCount);
875 /**
876 * @tc.steps: step2. sync data to cloud
877 * @tc.expected: step2. OK.
878 */
879 Query query = Query::Select().FromTable({ tableName_ });
880 BlockSync(query, delegate_, g_actualDBStatus);
881 CheckCloudTableCount(tableName_, 1);
882 /**
883 * @tc.steps: step3. drop data table then close db
884 * @tc.expected: step3. OK.
885 */
886 std::string deleteSql = "DROP TABLE IF EXISTS " + tableName_ + ";";
887 EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, deleteSql), DBStatus::OK);
888 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
889 delegate_ = nullptr;
890 /**
891 * @tc.steps: step4. recreate data table and reopen database
892 * @tc.expected: step4. OK.
893 */
894 EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, g_createSQL), DBStatus::OK);
895 RelationalStoreDelegate::Option option;
896 ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
897 ASSERT_NE(delegate_, nullptr);
898 ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::OK);
899 ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
900 ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
901 DataBaseSchema dataBaseSchema = GetSchema();
902 ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
903 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
904 ASSERT_TRUE(communicatorAggregator_ != nullptr);
905 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
906 /**
907 * @tc.steps: step5. sync and cloud data should be deleted
908 * @tc.expected: step5. OK.
909 */
910 BlockSync(query, delegate_, g_actualDBStatus);
911 CheckCloudTableCount(tableName_, 0);
912 }
913
914 /**
915 * @tc.name: CloudSyncTest010
916 * @tc.desc: reopen database, recreate table with less columns and sync
917 * @tc.type: FUNC
918 * @tc.require:
919 * @tc.author: wangxiangdong
920 */
921 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncTest010, TestSize.Level0)
922 {
923 /**
924 * @tc.steps: step1. insert 1 record to user table
925 * @tc.expected: step1. OK.
926 */
927 const int actualCount = 1;
928 InsertUserTableRecord(tableName_, actualCount);
929 /**
930 * @tc.steps: step2. sync data to cloud
931 * @tc.expected: step2. OK.
932 */
933 Query query = Query::Select().FromTable({ tableName_ });
934 BlockSync(query, delegate_, g_actualDBStatus);
935 CheckCloudTableCount(tableName_, 1);
936 /**
937 * @tc.steps: step3. drop data table then close db
938 * @tc.expected: step3. OK.
939 */
940 std::string deleteSql = "DROP TABLE IF EXISTS " + tableName_ + ";";
941 EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, deleteSql), DBStatus::OK);
942 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
943 delegate_ = nullptr;
944 /**
945 * @tc.steps: step4. recreate data table and reopen database
946 * @tc.expected: step4. OK.
947 */
948 std::string createSql = "CREATE TABLE IF NOT EXISTS DistributedDBCloudCheckSyncTest(id INT PRIMARY KEY);";
949 EXPECT_EQ(SQLiteUtils::ExecuteRawSQL(db_, createSql), DBStatus::OK);
950 RelationalStoreDelegate::Option option;
951 ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
952 ASSERT_NE(delegate_, nullptr);
953 ASSERT_EQ(delegate_->CreateDistributedTable(tableName_, CLOUD_COOPERATION), DBStatus::SCHEMA_MISMATCH);
954 ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
955 ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
956 DataBaseSchema dataBaseSchema = GetSchema();
957 ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
958 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
959 ASSERT_TRUE(communicatorAggregator_ != nullptr);
960 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
961 /**
962 * @tc.steps: step5. sync failed with SCHEMA_MISMATCH
963 * @tc.expected: step5. OK.
964 */
965 BlockPrioritySync(query, delegate_, false, DBStatus::SCHEMA_MISMATCH);
966 CheckCloudTableCount(tableName_, 1);
967 }
968
969 /**
970 * @tc.name: CloudSyncObserverTest001
971 * @tc.desc: test cloud sync multi observer
972 * @tc.type: FUNC
973 * @tc.require:
974 * @tc.author: zhangshijie
975 */
976 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudSyncObserverTest001, TestSize.Level0)
977 {
978 // prepare data
979 const int actualCount = 10;
980 InsertUserTableRecord(tableName_, actualCount);
981
982 /**
983 * @tc.steps:step1. open two delegate with two observer.
984 * @tc.expected: step1. ok.
985 */
986 RelationalStoreDelegate::Option option;
987 auto observer1 = new (std::nothrow) RelationalStoreObserverUnitTest();
988 ASSERT_NE(observer1, nullptr);
989 option.observer = observer1;
990 RelationalStoreDelegate *delegate1 = nullptr;
991 EXPECT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate1), DBStatus::OK);
992 ASSERT_NE(delegate1, nullptr);
993
994 auto observer2 = new (std::nothrow) RelationalStoreObserverUnitTest();
995 ASSERT_NE(observer2, nullptr);
996 option.observer = observer2;
997 RelationalStoreDelegate *delegate2 = nullptr;
998 EXPECT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate2), DBStatus::OK);
999 ASSERT_NE(delegate2, nullptr);
1000
1001 /**
1002 * @tc.steps:step2. insert 1-10 cloud data, start.
1003 * @tc.expected: step2. ok.
1004 */
1005 InsertCloudTableRecord(0, actualCount, actualCount, false);
1006 Query query = Query::Select().FromTable({ tableName_ });
1007 BlockSync(query, delegate_, g_actualDBStatus);
1008
1009 /**
1010 * @tc.steps:step3. check observer.
1011 * @tc.expected: step3. ok.
1012 */
1013 EXPECT_EQ(observer1->GetCloudCallCount(), 1u);
1014 EXPECT_EQ(observer2->GetCloudCallCount(), 1u);
1015
1016 /**
1017 * @tc.steps:step4. insert 11-20 cloud data, start.
1018 * @tc.expected: step4. ok.
1019 */
1020 delegate2->UnRegisterObserver();
1021 observer2->ResetCloudSyncToZero();
1022 int64_t begin = 11;
1023 InsertCloudTableRecord(begin, actualCount, actualCount, false);
1024 BlockSync(query, delegate_, g_actualDBStatus);
1025
1026 /**
1027 * @tc.steps:step5. check observer.
1028 * @tc.expected: step5. ok.
1029 */
1030 EXPECT_EQ(observer1->GetCloudCallCount(), 2u); // 2 is observer1 triggered times
1031 EXPECT_EQ(observer2->GetCloudCallCount(), 0u);
1032
1033 delete observer1;
1034 observer1 = nullptr;
1035 EXPECT_EQ(mgr_->CloseStore(delegate1), DBStatus::OK);
1036
1037 delete observer2;
1038 observer2 = nullptr;
1039 EXPECT_EQ(mgr_->CloseStore(delegate2), DBStatus::OK);
1040 }
1041
1042 /**
1043 * @tc.name: CloudPrioritySyncTest001
1044 * @tc.desc: use priority sync interface when query in or from table
1045 * @tc.type: FUNC
1046 * @tc.require:
1047 * @tc.author: chenchaohao
1048 */
1049 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest001, TestSize.Level0)
1050 {
1051 /**
1052 * @tc.steps:step1. insert user table record and query in 3 records, then priority sync.
1053 * @tc.expected: step1. ok.
1054 */
1055 const int actualCount = 10; // 10 is count of records
1056 InsertUserTableRecord(tableName_, actualCount);
1057 std::vector<std::string> idValue = {"0", "1", "2"};
1058 Query query = Query::Select().From(tableName_).In("id", idValue);
1059
1060 /**
1061 * @tc.steps:step2. check ParserQueryNodes
1062 * @tc.expected: step2. ok.
1063 */
__anonc8f560be1302(const std::string &tableName, VBucket &extend) 1064 virtualCloudDb_->ForkQuery([this, &idValue](const std::string &tableName, VBucket &extend) {
1065 EXPECT_EQ(tableName_, tableName);
1066 if (extend.find(CloudDbConstant::QUERY_FIELD) == extend.end()) {
1067 return;
1068 }
1069 Bytes bytes = std::get<Bytes>(extend[CloudDbConstant::QUERY_FIELD]);
1070 DBStatus status = OK;
1071 auto queryNodes = RelationalStoreManager::ParserQueryNodes(bytes, status);
1072 EXPECT_EQ(status, OK);
1073 ASSERT_EQ(queryNodes.size(), 1u);
1074 EXPECT_EQ(queryNodes[0].type, QueryNodeType::IN);
1075 EXPECT_EQ(queryNodes[0].fieldName, "id");
1076 ASSERT_EQ(queryNodes[0].fieldValue.size(), idValue.size());
1077 for (size_t i = 0u; i < idValue.size(); i++) {
1078 std::string val = std::get<std::string>(queryNodes[0].fieldValue[i]);
1079 EXPECT_EQ(val, idValue[i]);
1080 }
1081 });
1082 BlockPrioritySync(query, delegate_, true, OK);
1083 virtualCloudDb_->ForkQuery(nullptr);
1084 CheckCloudTableCount(tableName_, 3); // 3 is count of cloud records
1085
1086 /**
1087 * @tc.steps:step3. use priority sync interface but not priority.
1088 * @tc.expected: step3. ok.
1089 */
1090 query = Query::Select().FromTable({ tableName_ });
1091 BlockPrioritySync(query, delegate_, false, OK);
1092 CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records
1093
1094 /**
1095 * @tc.steps:step4. insert user table record and query from table, then priority sync.
1096 * @tc.expected: step4. ok.
1097 */
1098 InsertUserTableRecord(tableName_, actualCount, actualCount);
1099 BlockPrioritySync(query, delegate_, true, OK);
1100 CheckCloudTableCount(tableName_, 20); // 20 is count of cloud records
1101 }
1102
1103
1104 /**
1105 * @tc.name: CloudPrioritySyncTest002
1106 * @tc.desc: priority sync in some abnormal query situations
1107 * @tc.type: FUNC
1108 * @tc.require:
1109 * @tc.author: chenchaohao
1110 */
1111 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest002, TestSize.Level0)
1112 {
1113 /**
1114 * @tc.steps:step1. insert user table record.
1115 * @tc.expected: step1. ok.
1116 */
1117 const int actualCount = 1; // 1 is count of records
1118 InsertUserTableRecord(tableName_, actualCount);
1119
1120 /**
1121 * @tc.steps:step2. query select tablename then priority sync.
1122 * @tc.expected: step2. invalid.
1123 */
1124 Query query = Query::Select(tableName_);
1125 BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1126 CheckCloudTableCount(tableName_, 0);
1127
1128 /**
1129 * @tc.steps:step3. query select without from then priority sync.
1130 * @tc.expected: step3. invalid.
1131 */
1132 query = Query::Select();
1133 BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1134 CheckCloudTableCount(tableName_, 0);
1135
1136 /**
1137 * @tc.steps:step4. query select and from without in then priority sync.
1138 * @tc.expected: step4. invalid.
1139 */
1140 query = Query::Select().From(tableName_);
1141 BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1142 CheckCloudTableCount(tableName_, 0);
1143
1144 /**
1145 * @tc.steps:step5. query select and fromtable then priority sync.
1146 * @tc.expected: step5. not support.
1147 */
1148 query = Query::Select().From(tableName_).FromTable({tableName_});
1149 BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1150 CheckCloudTableCount(tableName_, 0);
1151
1152 /**
1153 * @tc.steps:step6. query select and from with other predicates then priority sync.
1154 * @tc.expected: step6. not support.
1155 */
1156 query = Query::Select().From(tableName_).IsNotNull("id");
1157 BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1158 CheckCloudTableCount(tableName_, 0);
1159
1160 /**
1161 * @tc.steps:step7. query select and from with in and other predicates then priority sync.
1162 * @tc.expected: step7 not support.
1163 */
1164 std::vector<std::string> idValue = {"0"};
1165 query = Query::Select().From(tableName_).IsNotNull("id").In("id", idValue);
1166 BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1167 CheckCloudTableCount(tableName_, 0);
1168
1169 /**
1170 * @tc.steps:step8. query select and from with in non-primary key then priority sync.
1171 * @tc.expected: step8. not support.
1172 */
1173 std::vector<std::string> heightValue = {"155.10"};
1174 query = Query::Select().From(tableName_).In("height", heightValue);
1175 BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1176 CheckCloudTableCount(tableName_, 0);
1177
1178 /**
1179 * @tc.steps:step9. query in count greater than 100.
1180 * @tc.expected: step9. over max limits.
1181 */
1182 idValue.resize(101); // 101 > 100
1183 query = Query::Select().From(tableName_).In("id", idValue);
1184 BlockPrioritySync(query, delegate_, true, OVER_MAX_LIMITS);
1185 CheckCloudTableCount(tableName_, 0);
1186 }
1187
1188 /**
1189 * @tc.name: CloudPrioritySyncTest003
1190 * @tc.desc: priority sync when normal syncing
1191 * @tc.type: FUNC
1192 * @tc.require:
1193 * @tc.author: chenchaohao
1194 */
1195 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest003, TestSize.Level0)
1196 {
1197 /**
1198 * @tc.steps:step1. insert user table record.
1199 * @tc.expected: step1. ok.
1200 */
1201 const int actualCount = 10; // 10 is count of records
1202 InsertUserTableRecord(tableName_, actualCount);
1203
1204 /**
1205 * @tc.steps:step2. begin normal sync and priority sync.
1206 * @tc.expected: step2. ok.
1207 */
1208 Query normalQuery = Query::Select().FromTable({tableName_});
1209 std::vector<std::string> idValue = {"0", "1", "2"};
1210 Query priorityQuery = Query::Select().From(tableName_).In("id", idValue);
1211 std::vector<std::map<std::string, SyncProcess>> prioritySyncProcess;
1212 PriorityAndNormalSync(normalQuery, priorityQuery, delegate_, prioritySyncProcess, true);
1213 EXPECT_EQ(virtualCloudDb_->GetLockCount(), 2);
1214 virtualCloudDb_->Reset();
1215 EXPECT_EQ(virtualCloudDb_->GetLockCount(), 0);
1216 CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records
1217 }
1218
1219 /**
1220 * @tc.name: CloudPrioritySyncTest004
1221 * @tc.desc: non-primarykey table priority sync
1222 * @tc.type: FUNC
1223 * @tc.require:
1224 * @tc.author: chenchaohao
1225 */
1226 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest004, TestSize.Level0)
1227 {
1228 /**
1229 * @tc.steps:step1. insert user non-primarykey table record.
1230 * @tc.expected: step1. ok.
1231 */
1232 const int actualCount = 10; // 10 is count of records
1233 InsertUserTableRecord(tableWithoutPrimaryName_, actualCount);
1234
1235 /**
1236 * @tc.steps:step2. begin priority sync.
1237 * @tc.expected: step2. not support.
1238 */
1239 std::vector<std::string> idValue = {"0", "1", "2"};
1240 Query query = Query::Select().From(tableWithoutPrimaryName_).In("id", idValue);
1241 BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1242 CheckCloudTableCount(tableWithoutPrimaryName_, 0);
1243
1244 /**
1245 * @tc.steps:step3. begin priority sync when in rowid.
1246 * @tc.expected: step3. invalid.
1247 */
1248 std::vector<int64_t> rowidValue = {0, 1, 2}; // 0,1,2 are rowid value
1249 query = Query::Select().From(tableWithoutPrimaryName_).In("rowid", rowidValue);
1250 BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1251 CheckCloudTableCount(tableWithoutPrimaryName_, 0);
1252 }
1253
1254 /**
1255 * @tc.name: CloudPrioritySyncTest005
1256 * @tc.desc: priority sync but don't have records
1257 * @tc.type: FUNC
1258 * @tc.require:
1259 * @tc.author: chenchaohao
1260 */
1261 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest005, TestSize.Level0)
1262 {
1263 /**
1264 * @tc.steps:step1. insert user non-primarykey table record.
1265 * @tc.expected: step1. ok.
1266 */
1267 const int actualCount = 10; // 10 is count of records
1268 InsertUserTableRecord(tableWithoutPrimaryName_, actualCount);
1269
1270 /**
1271 * @tc.steps:step2. begin DistributedDBCloudCheckSyncTest priority sync and check records.
1272 * @tc.expected: step2. ok.
1273 */
1274 std::vector<std::string> idValue = {"0", "1", "2"};
1275 Query query = Query::Select().From(tableName_).In("id", idValue);
1276 BlockPrioritySync(query, delegate_, true, OK);
1277 CheckCloudTableCount(tableWithoutPrimaryName_, 0);
1278 CheckCloudTableCount(tableName_, 0);
1279 }
1280
1281 /**
1282 * @tc.name: CloudPrioritySyncTest006
1283 * @tc.desc: priority sync tasks greater than limit
1284 * @tc.type: FUNC
1285 * @tc.require:
1286 * @tc.author: chenchaohao
1287 */
1288 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest006, TestSize.Level0)
1289 {
1290 /**
1291 * @tc.steps:step1. insert user table record.
1292 * @tc.expected: step1. ok.
1293 */
1294 const int actualCount = 10; // 10 is count of records
1295 InsertUserTableRecord(tableName_, actualCount);
1296
1297 /**
1298 * @tc.steps:step2. begin 32 priority sync tasks and then begin 1 priority sync task.
1299 * @tc.expected: step2. ok.
1300 */
1301 std::vector<std::string> idValue = {"0", "1", "2"};
1302 Query query = Query::Select().From(tableName_).In("id", idValue);
1303 std::mutex dataMutex;
1304 std::condition_variable cv;
1305 std::mutex callbackMutex;
1306 std::condition_variable callbackCv;
1307 bool finish = false;
1308 size_t finishCount = 0u;
__anonc8f560be1402(const std::string &tableName, VBucket &extend) 1309 virtualCloudDb_->ForkQuery([&cv, &finish, &dataMutex](const std::string &tableName, VBucket &extend) {
1310 std::unique_lock<std::mutex> uniqueLock(dataMutex);
1311 cv.wait(uniqueLock, [&finish]() {
1312 return finish;
1313 });
1314 });
__anonc8f560be1602(const std::map<std::string, SyncProcess> &process) 1315 auto callback = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
1316 for (const auto &item: process) {
1317 if (item.second.process == DistributedDB::FINISHED) {
1318 {
1319 std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
1320 finishCount++;
1321 }
1322 callbackCv.notify_one();
1323 }
1324 }
1325 };
1326 CloudSyncOption option;
1327 PrepareOption(option, query, true);
1328 for (int i = 0; i < 32; i++) { // 32 is count of sync tasks
1329 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1330 }
1331 ASSERT_EQ(delegate_->Sync(option, nullptr), BUSY);
1332 {
1333 std::lock_guard<std::mutex> autoLock(dataMutex);
1334 finish = true;
1335 }
1336 cv.notify_all();
1337 virtualCloudDb_->ForkQuery(nullptr);
1338 std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anonc8f560be1702() 1339 callbackCv.wait(callbackLock, [&finishCount]() {
1340 return (finishCount == 32u); // 32 is count of finished sync tasks
1341 });
1342 CheckCloudTableCount(tableName_, 3); // 3 is count of cloud records
1343 }
1344
1345 /**
1346 * @tc.name: CloudPrioritySyncTest007
1347 * @tc.desc: priority normal priority normal when different query
1348 * @tc.type: FUNC
1349 * @tc.require:
1350 * @tc.author: chenchaohao
1351 */
1352 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest007, TestSize.Level0)
1353 {
1354 /**
1355 * @tc.steps:step1. insert user table record.
1356 * @tc.expected: step1. ok.
1357 */
1358 const int actualCount = 10; // 10 is count of records
1359 InsertUserTableRecord(tableName_, actualCount);
1360
1361 /**
1362 * @tc.steps:step2. set callback to check during sync.
1363 * @tc.expected: step2. ok.
1364 */
1365 std::atomic<int> count = 0;
1366 SetForkQueryForCloudPrioritySyncTest007(count);
1367
1368 /**
1369 * @tc.steps:step3. perform priority normal priority normal sync.
1370 * @tc.expected: step3. ok.
1371 */
1372 std::vector<std::string> idValue = {"0"};
1373 Query priorytyQuery = Query::Select().From(tableName_).In("id", idValue);
1374 CloudSyncOption option;
1375 PrepareOption(option, priorytyQuery, true);
1376 option.lockAction = static_cast<LockAction>(0xff); // lock all
1377 std::mutex callbackMutex;
1378 std::condition_variable callbackCv;
1379 size_t finishCount = 0u;
__anonc8f560be1802(const std::map<std::string, SyncProcess> &process) 1380 auto callback = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
1381 for (const auto &item: process) {
1382 if (item.second.process == DistributedDB::FINISHED) {
1383 {
1384 std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
1385 finishCount++;
1386 }
1387 callbackCv.notify_one();
1388 }
1389 }
1390 };
1391 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1392 Query normalQuery = Query::Select().FromTable({tableName_});
1393 PrepareOption(option, normalQuery, false);
1394 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1395 idValue = {"1"};
1396 priorytyQuery = Query::Select().From(tableName_).In("id", idValue);
1397 PrepareOption(option, priorytyQuery, true);
1398 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1399 PrepareOption(option, normalQuery, false);
1400 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1401 std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anonc8f560be1902() 1402 callbackCv.wait(callbackLock, [&finishCount]() {
1403 return (finishCount == 4u); // 4 is count of finished sync tasks
1404 });
1405 CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records
1406 }
1407
1408 /**
1409 * @tc.name: CloudPrioritySyncTest008
 * @tc.desc: priority normal priority normal when the priority tasks use the same query
1411 * @tc.type: FUNC
1412 * @tc.require:
1413 * @tc.author: chenchaohao
1414 */
1415 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest008, TestSize.Level0)
1416 {
1417 /**
1418 * @tc.steps:step1. insert user table record.
1419 * @tc.expected: step1. ok.
1420 */
1421 const int actualCount = 10; // 10 is count of records
1422 InsertUserTableRecord(tableName_, actualCount);
1423
1424 /**
1425 * @tc.steps:step2. set callback to check during sync.
1426 * @tc.expected: step2. ok.
1427 */
1428 std::atomic<int> count = 0;
1429 SetForkQueryForCloudPrioritySyncTest008(count);
1430
1431 /**
1432 * @tc.steps:step3. perform priority normal priority normal sync.
1433 * @tc.expected: step3. ok.
1434 */
1435 std::vector<std::string> idValue = {"0"};
1436 Query priorytyQuery = Query::Select().From(tableName_).In("id", idValue);
1437 CloudSyncOption option;
1438 option.lockAction = static_cast<LockAction>(0xff); // lock all
1439 PrepareOption(option, priorytyQuery, true);
1440 std::mutex callbackMutex;
1441 std::condition_variable callbackCv;
1442 size_t finishCount = 0u;
__anonc8f560be1a02(const std::map<std::string, SyncProcess> &process) 1443 auto callback = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
1444 for (const auto &item: process) {
1445 if (item.second.process == DistributedDB::FINISHED) {
1446 {
1447 std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
1448 finishCount++;
1449 }
1450 callbackCv.notify_one();
1451 }
1452 }
1453 };
1454 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1455 Query normalQuery = Query::Select().FromTable({tableName_});
1456 PrepareOption(option, normalQuery, false);
1457 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1458 priorytyQuery = Query::Select().From(tableName_).In("id", idValue);
1459 PrepareOption(option, priorytyQuery, true);
1460 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1461 PrepareOption(option, normalQuery, false);
1462 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1463 std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anonc8f560be1b02() 1464 callbackCv.wait(callbackLock, [&finishCount]() {
1465 return (finishCount == 4u); // 4 is count of finished sync tasks
1466 });
1467 CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records
1468 }
1469
1470 /**
1471 * @tc.name: CloudPrioritySyncTest009
1472 * @tc.desc: use priority sync interface when query equal to from table
1473 * @tc.type: FUNC
1474 * @tc.require:
1475 * @tc.author: zhangqiquan
1476 */
1477 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest009, TestSize.Level0)
1478 {
1479 /**
1480 * @tc.steps:step1. insert user table record and query in 3 records, then priority sync.
1481 * @tc.expected: step1. ok.
1482 */
1483 const int actualCount = 5; // 5 is count of records
1484 InsertUserTableRecord(tableName_, actualCount);
1485 Query query = Query::Select().From(tableName_).BeginGroup().EqualTo("id", "0").Or().EqualTo("id", "1").EndGroup();
1486
1487 /**
1488 * @tc.steps:step2. check ParserQueryNodes
1489 * @tc.expected: step2. ok.
1490 */
__anonc8f560be1c02(const std::string &tableName, VBucket &extend) 1491 virtualCloudDb_->ForkQuery([this](const std::string &tableName, VBucket &extend) {
1492 EXPECT_EQ(tableName_, tableName);
1493 Bytes bytes = std::get<Bytes>(extend[CloudDbConstant::QUERY_FIELD]);
1494 DBStatus status = OK;
1495 auto queryNodes = RelationalStoreManager::ParserQueryNodes(bytes, status);
1496 EXPECT_EQ(status, OK);
1497 ASSERT_EQ(queryNodes.size(), 5u); // 5 is query nodes count
1498 });
1499 BlockPrioritySync(query, delegate_, true, OK);
1500 virtualCloudDb_->ForkQuery(nullptr);
1501 CheckCloudTableCount(tableName_, 2); // 2 is count of cloud records
1502 }
1503
1504 /**
1505 * @tc.name: CloudPrioritySyncTest010
1506 * @tc.desc: priority sync after cloud delete
1507 * @tc.type: FUNC
1508 * @tc.require:
1509 * @tc.author: chenchaohao
1510 */
1511 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest010, TestSize.Level0)
1512 {
1513 /**
1514 * @tc.steps:step1. insert user table record.
1515 * @tc.expected: step1. ok.
1516 */
1517 const int actualCount = 10; // 10 is count of records
1518 InsertUserTableRecord(tableName_, actualCount);
1519
1520 /**
1521 * @tc.steps:step2. normal sync and then delete cloud records.
1522 * @tc.expected: step2. ok.
1523 */
1524 Query query = Query::Select().FromTable({tableName_});
1525 BlockSync(query, delegate_, g_actualDBStatus);
1526 CheckCloudTableCount(tableName_, 10); // 10 is count of cloud records after sync
1527 DeleteCloudDBData(0, 3); // delete 0 1 2 record in cloud
1528 CheckCloudTableCount(tableName_, 7); // 7 is count of cloud records after delete
1529 CheckUserTableResult(db_, tableName_, 10); // 10 is count of user records
1530
1531 /**
1532 * @tc.steps:step3. priory sync and set query then check user table records.
1533 * @tc.expected: step3. ok.
1534 */
1535 std::vector<std::string> idValue = {"3", "4", "5"};
1536 query = Query::Select().From(tableName_).In("id", idValue);
1537 BlockPrioritySync(query, delegate_, true, OK);
1538 CheckUserTableResult(db_, tableName_, 10); // 10 is count of user records after sync
1539 idValue = {"0", "1", "2"};
1540 query = Query::Select().From(tableName_).In("id", idValue);
1541 BlockPrioritySync(query, delegate_, true, OK);
1542 CheckUserTableResult(db_, tableName_, 7); // 7 is count of user records after sync
1543 }
1544
1545 /**
1546 * @tc.name: CloudPrioritySyncTest011
1547 * @tc.desc: priority sync after cloud insert
1548 * @tc.type: FUNC
1549 * @tc.require:
1550 * @tc.author: chenchaohao
1551 */
1552 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest011, TestSize.Level0)
1553 {
1554 /**
1555 * @tc.steps:step1. insert cloud table record.
1556 * @tc.expected: step1. ok.
1557 */
1558 const int actualCount = 10; // 10 is count of records
1559 InsertCloudTableRecord(0, actualCount, actualCount, false);
1560 std::vector<std::string> idValue = {"0", "1", "2"};
1561 Query query = Query::Select().From(tableName_).In("id", idValue);
1562 std::atomic<int> count = 0;
1563
1564 /**
1565 * @tc.steps:step2. check user records when query.
1566 * @tc.expected: step1. ok.
1567 */
__anonc8f560be1d02(const std::string &, VBucket &) 1568 virtualCloudDb_->ForkQuery([this, &count](const std::string &, VBucket &) {
1569 count++;
1570 if (count == 1) { // taskid1
1571 std::this_thread::sleep_for(std::chrono::seconds(1));
1572 }
1573 if (count == 2) { // taskid2
1574 CheckUserTableResult(db_, tableName_, 3); // 3 is count of user records after first sync
1575 }
1576 });
1577 CloudSyncOption option;
1578 PrepareOption(option, query, true);
1579 std::mutex callbackMutex;
1580 std::condition_variable callbackCv;
1581 size_t finishCount = 0u;
__anonc8f560be1e02(const std::map<std::string, SyncProcess> &process) 1582 auto callback = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
1583 for (const auto &item: process) {
1584 if (item.second.process == DistributedDB::FINISHED) {
1585 {
1586 std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
1587 finishCount++;
1588 }
1589 callbackCv.notify_one();
1590 }
1591 }
1592 };
1593
1594 /**
1595 * @tc.steps:step3. begin sync and check user record.
1596 * @tc.expected: step3. ok.
1597 */
1598 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1599 idValue = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
1600 query = Query::Select().From(tableName_).In("id", idValue);
1601 PrepareOption(option, query, true);
1602 ASSERT_EQ(delegate_->Sync(option, callback), OK);
1603 std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anonc8f560be1f02() 1604 callbackCv.wait(callbackLock, [&finishCount]() {
1605 return (finishCount == 2u); // 2 is count of finished sync tasks
1606 });
1607 CheckUserTableResult(db_, tableName_, 10); // 10 is count of user records
1608 }
1609
1610 /**
1611 * @tc.name: CloudPrioritySyncTest012
1612 * @tc.desc: priority or normal sync when waittime > 300s or < -1
1613 * @tc.type: FUNC
1614 * @tc.require:
1615 * @tc.author: chenchaohao
1616 */
1617 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest012, TestSize.Level0)
1618 {
1619 /**
1620 * @tc.steps:step1. insert cloud table record.
1621 * @tc.expected: step1. ok.
1622 */
1623 const int actualCount = 10; // 10 is count of records
1624 InsertCloudTableRecord(0, actualCount, actualCount, false);
1625 std::vector<std::string> idValue = {"0", "1", "2"};
1626 Query query = Query::Select().From(tableName_).In("id", idValue);
1627
1628 /**
1629 * @tc.steps:step2. set waittime < -1 then begin sync.
1630 * @tc.expected: step2. invalid.
1631 */
1632 CloudSyncOption option;
1633 PrepareOption(option, query, true);
1634 option.waitTime = -2; // -2 < -1;
1635 ASSERT_EQ(delegate_->Sync(option, nullptr), INVALID_ARGS);
1636 CheckUserTableResult(db_, tableName_, 0); // 0 is count of user records
1637
1638 /**
1639 * @tc.steps:step3. set waittime > 300s then begin sync.
1640 * @tc.expected: step3. invalid.
1641 */
1642
1643 option.waitTime = 300001; // 300001 > 300s
1644 ASSERT_EQ(delegate_->Sync(option, nullptr), INVALID_ARGS);
1645 CheckUserTableResult(db_, tableName_, 0); // 0 is count of user records
1646 }
1647
1648 /**
1649 * @tc.name: CloudPrioritySyncTest013
1650 * @tc.desc: priority sync in some abnormal composite pk query situations
1651 * @tc.type: FUNC
1652 * @tc.require:
1653 * @tc.author: chenchaohao
1654 */
1655 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest013, TestSize.Level0)
1656 {
1657 /**
1658 * @tc.steps:step1. insert user table record.
1659 * @tc.expected: step1. ok.
1660 */
1661 const int actualCount = 1; // 1 is count of records
1662 InsertUserTableRecord(tableName_, actualCount);
1663
1664 /**
1665 * @tc.steps:step2. query only begingroup then priority sync.
1666 * @tc.expected: step2. invalid.
1667 */
1668 Query query = Query::Select().From(tableName_).BeginGroup();
1669 BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1670 CheckCloudTableCount(tableName_, 0);
1671
1672 /**
1673 * @tc.steps:step3. query only endgroup then priority sync.
1674 * @tc.expected: step3. invalid.
1675 */
1676 query = Query::Select().From(tableName_).EndGroup();
1677 BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1678 CheckCloudTableCount(tableName_, 0);
1679
1680 /**
1681 * @tc.steps:step4. query only begingroup and endgroup then priority sync.
1682 * @tc.expected: step4. invalid.
1683 */
1684 query = Query::Select().From(tableName_).BeginGroup().EndGroup();
1685 BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1686 CheckCloudTableCount(tableName_, 0);
1687
1688 /**
1689 * @tc.steps:step5. query and from table then priority sync.
1690 * @tc.expected: step5. invalid.
1691 */
1692 query = Query::Select().And().From(tableName_);
1693 BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1694 CheckCloudTableCount(tableName_, 0);
1695
1696 /**
1697 * @tc.steps:step6. query or from table then priority sync.
1698 * @tc.expected: step6. invalid.
1699 */
1700 query = Query::Select().Or().From(tableName_);
1701 BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1702 CheckCloudTableCount(tableName_, 0);
1703
1704 /**
1705 * @tc.steps:step7. query begingroup from table then priority sync.
1706 * @tc.expected: step7 invalid.
1707 */
1708 query = Query::Select().BeginGroup().From(tableName_);
1709 BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1710 CheckCloudTableCount(tableName_, 0);
1711
1712 /**
1713 * @tc.steps:step8. query endgroup from table then priority sync.
1714 * @tc.expected: step8 invalid.
1715 */
1716 query = Query::Select().EndGroup().From(tableName_);
1717 BlockPrioritySync(query, delegate_, true, NOT_SUPPORT);
1718 CheckCloudTableCount(tableName_, 0);
1719
1720 /**
1721 * @tc.steps:step9. query and in then priority sync.
1722 * @tc.expected: step9. invalid.
1723 */
1724 std::vector<std::string> idValue = {"0"};
1725 query = Query::Select().From(tableName_).And().In("id", idValue);
1726 BlockPrioritySync(query, delegate_, true, INVALID_ARGS);
1727 CheckCloudTableCount(tableName_, 0);
1728
1729 /**
1730 * @tc.steps:step10. query when the table name does not exit then priority sync.
1731 * @tc.expected: step10. schema mismatch.
1732 */
1733 query = Query::Select().From("tableName").And().In("id", idValue);
1734 BlockPrioritySync(query, delegate_, true, SCHEMA_MISMATCH);
1735 CheckCloudTableCount(tableName_, 0);
1736
1737 /**
1738 * @tc.steps:step11. query when the table name does not exit then priority sync.
1739 * @tc.expected: step11. schema mismatch.
1740 */
1741 query = Query::Select().From("tableName").In("id", idValue);
1742 BlockPrioritySync(query, delegate_, true, SCHEMA_MISMATCH);
1743 CheckCloudTableCount(tableName_, 0);
1744
1745 /**
1746 * @tc.steps:step12. query when the table name does not exit then sync.
1747 * @tc.expected: step12. schema mismatch.
1748 */
1749 query = Query::Select().FromTable({"tableName"});
1750 BlockPrioritySync(query, delegate_, false, SCHEMA_MISMATCH);
1751 CheckCloudTableCount(tableName_, 0);
1752 }
1753
1754 /**
1755 * @tc.name: CloudPrioritySyncTest016
1756 * @tc.desc: priority sync when normal syncing
1757 * @tc.type: FUNC
1758 * @tc.require:
1759 * @tc.author: wangxiangdong
1760 */
1761 HWTEST_F(DistributedDBCloudCheckSyncTest, CloudPrioritySyncTest016, TestSize.Level0)
1762 {
1763 /**
1764 * @tc.steps:step1. insert cloud table record.
1765 * @tc.expected: step1. ok.
1766 */
1767 const int actualCount = 60; // 60 is count of records
1768 InsertCloudTableRecord(0, actualCount, 0, false);
1769 InsertUserTableRecord(tableName_, 10);
1770
1771 /**
1772 * @tc.steps:step2. begin normal sync and priority sync.
1773 * @tc.expected: step2. ok.
1774 */
1775 Query normalQuery = Query::Select().FromTable({tableName_});
1776 std::vector<std::string> idValue = {"0", "1", "2"};
1777 Query priorityQuery = Query::Select().From(tableName_).In("id", idValue);
1778 std::vector<std::map<std::string, SyncProcess>> prioritySyncProcess;
1779 PriorityAndNormalSync(normalQuery, priorityQuery, delegate_, prioritySyncProcess, false);
1780 virtualCloudDb_->Reset();
1781 CheckCloudTableCount(tableName_, 60); // 10 is count of cloud records
1782 /**
1783 * @tc.steps:step3. check sync process result.
1784 * @tc.expected: step3. ok.
1785 */
1786 std::vector<DistributedDB::SyncProcess> expectSyncResult = {
1787 {PROCESSING, OK, {{tableName_, {PROCESSING, {1, 60, 60, 0, 50, 0, 0}, {0, 0, 0, 0, 0, 0, 0}}}}},
1788 {PROCESSING, OK, {{tableName_, {PROCESSING, {1, 3, 3, 0, 0, 0, 0}, {0, 0, 0, 0, 0, 0, 0}}}}},
1789 {FINISHED, OK, {{tableName_, {FINISHED, {1, 3, 3, 0, 0, 0, 0}, {1, 3, 3, 0, 0, 3, 0}}}}},
1790 {PROCESSING, OK, {{tableName_, {PROCESSING, {2, 63, 63, 0, 50, 0, 0}, {0, 0, 0, 0, 0, 0, 0}}}}},
1791 {FINISHED, OK, {{tableName_, {FINISHED, {2, 63, 63, 0, 50, 0, 0}, {1, 7, 7, 0, 0, 7, 0}}}}}
1792 };
1793 EXPECT_EQ(CheckSyncProcess(prioritySyncProcess, expectSyncResult), true);
1794 }
1795
1796 /**
1797 * @tc.name: LogicDeleteSyncTest001
1798 * @tc.desc: sync with logic delete
1799 * @tc.type: FUNC
1800 * @tc.require:
1801 * @tc.author: zhangqiquan
1802 */
1803 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest001, TestSize.Level0)
1804 {
1805 bool logicDelete = true;
1806 auto data = static_cast<PragmaData>(&logicDelete);
1807 delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
1808 int actualCount = 10;
1809 InitLogicDeleteDataEnv(actualCount, true);
1810 CheckLocalCount(actualCount);
1811 std::string device = "";
1812 ASSERT_EQ(delegate_->RemoveDeviceData(device, DistributedDB::FLAG_AND_DATA), DBStatus::OK);
1813 CheckLocalCount(actualCount);
1814 }
1815
1816 /**
1817 * @tc.name: LogicDeleteSyncTest002
1818 * @tc.desc: sync without logic delete
1819 * @tc.type: FUNC
1820 * @tc.require:
1821 * @tc.author: zhangqiquan
1822 */
1823 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest002, TestSize.Level0)
1824 {
1825 bool logicDelete = false;
1826 auto data = static_cast<PragmaData>(&logicDelete);
1827 delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
1828 int actualCount = 10;
1829 InitLogicDeleteDataEnv(actualCount);
1830 CheckLocalCount(0);
1831 }
1832
1833 /**
1834 * @tc.name: LogicDeleteSyncTest003
1835 * @tc.desc: sync with logic delete and check observer
1836 * @tc.type: FUNC
1837 * @tc.require:
1838 * @tc.author: bty
1839 */
1840 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest003, TestSize.Level0)
1841 {
1842 /**
1843 * @tc.steps:step1. register observer.
1844 * @tc.expected: step1. ok.
1845 */
1846 RelationalStoreDelegate::Option option;
1847 auto observer = new (std::nothrow) RelationalStoreObserverUnitTest();
1848 ASSERT_NE(observer, nullptr);
1849 observer->SetCallbackDetailsType(static_cast<uint32_t>(CallbackDetailsType::DETAILED));
1850 EXPECT_EQ(delegate_->RegisterObserver(observer), OK);
1851 ChangedData expectData;
1852 expectData.tableName = tableName_;
1853 expectData.type = ChangedDataType::DATA;
1854 expectData.field.push_back(std::string("id"));
1855 const int count = 10;
1856 for (int64_t i = 0; i < count; ++i) {
1857 expectData.primaryData[ChangeType::OP_DELETE].push_back({std::to_string(i)});
1858 }
1859 expectData.properties = { .isTrackedDataChange = true };
1860 observer->SetExpectedResult(expectData);
1861
1862 /**
1863 * @tc.steps:step2. set tracker table
1864 * @tc.expected: step2. ok.
1865 */
1866 TrackerSchema trackerSchema;
1867 trackerSchema.tableName = tableName_;
1868 trackerSchema.trackerColNames = { "id" };
1869 EXPECT_EQ(delegate_->SetTrackerTable(trackerSchema), OK);
1870
1871 /**
1872 * @tc.steps:step3. set logic delete and sync
1873 * @tc.expected: step3. ok.
1874 */
1875 bool logicDelete = true;
1876 auto data = static_cast<PragmaData>(&logicDelete);
1877 delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
1878 int actualCount = 10;
1879 InitLogicDeleteDataEnv(actualCount);
1880 CheckLocalCount(actualCount);
1881 EXPECT_EQ(observer->IsAllChangedDataEq(), true);
1882 observer->ClearChangedData();
1883
1884 /**
1885 * @tc.steps:step4. unSetTrackerTable and sync
1886 * @tc.expected: step4. ok.
1887 */
1888 expectData.properties = { .isTrackedDataChange = false };
1889 observer->SetExpectedResult(expectData);
1890 trackerSchema.trackerColNames = {};
1891 EXPECT_EQ(delegate_->SetTrackerTable(trackerSchema), OK);
1892 InsertUserTableRecord(tableName_, actualCount);
1893 BlockSync(Query::Select().FromTable({ tableName_ }), delegate_, g_actualDBStatus);
1894 for (int i = 0; i < actualCount + actualCount; ++i) {
1895 DeleteCloudTableRecord(i);
1896 }
1897 BlockSync(Query::Select().FromTable({ tableName_ }), delegate_, g_actualDBStatus);
1898 EXPECT_EQ(observer->IsAllChangedDataEq(), true);
1899
1900 EXPECT_EQ(delegate_->UnRegisterObserver(observer), OK);
1901 delete observer;
1902 observer = nullptr;
1903 }
1904
1905 /**
1906 * @tc.name: LogicDeleteSyncTest004
1907 * @tc.desc: test removedevicedata in mode FLAG_ONLY when sync with logic delete
1908 * @tc.type: FUNC
1909 * @tc.require:
1910 * @tc.author: chenchaohao
1911 */
1912 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest004, TestSize.Level0)
1913 {
1914 /**
1915 * @tc.steps:step1. set logic delete
1916 * @tc.expected: step1. ok.
1917 */
1918 bool logicDelete = true;
1919 auto data = static_cast<PragmaData>(&logicDelete);
1920 delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
1921
1922 /**
1923 * @tc.steps:step2. cloud delete data then sync, check removedevicedata
1924 * @tc.expected: step2. ok.
1925 */
1926 int actualCount = 10;
1927 InitLogicDeleteDataEnv(actualCount);
1928 CheckLocalCount(actualCount);
1929 std::string device = "";
1930 ASSERT_EQ(delegate_->RemoveDeviceData(device, DistributedDB::FLAG_ONLY), DBStatus::OK);
1931 CheckLocalCount(actualCount);
1932 CheckLogCleaned(0);
1933 }
1934
1935 /**
1936 * @tc.name: LogicDeleteSyncTest005
1937 * @tc.desc: test pragma when set cmd is not logic delete
1938 * @tc.type: FUNC
1939 * @tc.require:
1940 * @tc.author: chenchaohao
1941 */
1942 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest005, TestSize.Level0)
1943 {
1944 /**
1945 * @tc.steps:step1. set cmd is auto sync
1946 * @tc.expected: step1. ok.
1947 */
1948 bool logicDelete = true;
1949 auto data = static_cast<PragmaData>(&logicDelete);
1950 EXPECT_EQ(delegate_->Pragma(AUTO_SYNC, data), DBStatus::NOT_SUPPORT);
1951 }
1952
1953 /**
1954 * @tc.name: LogicDeleteSyncTest006
1955 * @tc.desc: sync with logic delete after lock table.
1956 * @tc.type: FUNC
1957 * @tc.require:
1958 * @tc.author: liaoyonghuang
1959 */
1960 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest006, TestSize.Level0)
1961 {
1962 /**
1963 * @tc.steps:step1. set logic delete
1964 * @tc.expected: step1. ok.
1965 */
1966 bool logicDelete = true;
1967 auto data = static_cast<PragmaData>(&logicDelete);
1968 delegate_->Pragma(LOGIC_DELETE_SYNC_DATA, data);
1969
1970 /**
1971 * @tc.steps:step2. insert user table record and sync.
1972 * @tc.expected: step2. ok.
1973 */
1974 int dataCount = 10;
1975 InsertUserTableRecord(tableName_, dataCount);
1976 Query query = Query::Select().FromTable({ tableName_ });
1977 BlockSync(query, delegate_, g_actualDBStatus);
1978
1979 /**
1980 * @tc.steps:step3. Lock log table, and delete data from cloud table.
1981 * @tc.expected: step3. ok.
1982 */
1983 std::vector<std::vector<uint8_t>> hashKey;
1984 CloudDBSyncUtilsTest::GetHashKey(tableName_, " 1=1 ", db_, hashKey);
1985 Lock(tableName_, hashKey, db_);
1986 for (int i = 0; i < dataCount; ++i) {
1987 DeleteCloudTableRecord(i);
1988 }
1989 /**
1990 * @tc.steps:step4. sync.
1991 * @tc.expected: step4. ok.
1992 */
1993 std::vector<DBStatus> actualDBStatus;
1994 BlockSync(query, delegate_, actualDBStatus);
1995 for (auto status : actualDBStatus) {
1996 EXPECT_EQ(status, OK);
1997 }
1998 }
1999
2000 /**
2001 * @tc.name: LogicDeleteSyncTest008
2002 * @tc.desc: Test sync when data with flag 0x800 locally but there is updated data on the cloud.
2003 * @tc.type: FUNC
2004 * @tc.require:
2005 * @tc.author: liaoyonghuang
2006 */
2007 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicDeleteSyncTest008, TestSize.Level0)
2008 {
2009 /**
2010 * @tc.steps:step1. Insert user table record with flag 0x800. Insert cloud table record.
2011 * @tc.expected: step1. ok.
2012 */
2013 int dataCount = 10;
2014 uint32_t logicDeleteCount = 4;
2015 InsertUserTableRecord(tableName_, dataCount);
2016 std::string sql = "update " + DBCommon::GetLogTableName(tableName_) +
2017 " set flag = flag | 0x800 where data_key <= " + std::to_string(logicDeleteCount);
2018 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2019 InsertCloudTableRecord(0, dataCount, 0, false);
2020 sql = "select count(*) from " + DBCommon::GetLogTableName(tableName_) + " where flag & 0x800=0x800";
2021 EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), QueryCountCallback,
2022 reinterpret_cast<void *>(logicDeleteCount), nullptr), SQLITE_OK);
2023 /**
2024 * @tc.steps:step2. Do sync.
2025 * @tc.expected: step2. ok.
2026 */
2027 Query query = Query::Select().FromTable({ tableName_ });
2028 BlockSync(query, delegate_, g_actualDBStatus);
2029 /**
2030 * @tc.steps:step3. Check data flag in local DB.
2031 * @tc.expected: step3. No data flag is 0x800.
2032 */
2033 EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), QueryCountCallback,
2034 reinterpret_cast<void *>(0), nullptr), SQLITE_OK);
2035 }
2036
2037 /**
2038 * @tc.name: LockActionTest001
2039 * @tc.desc: InitCompensatedSyncTaskInfo and check lockAction.
2040 * @tc.type: FUNC
2041 * @tc.require:
2042 * @tc.author: wangxiangdong
2043 */
2044 HWTEST_F(DistributedDBCloudCheckSyncTest, LockActionTest001, TestSize.Level0)
2045 {
2046 /**
2047 * @tc.steps:step1. InitCompensatedSyncTaskInfo and check.
2048 * @tc.expected: step1. ok.
2049 */
2050 CloudSyncOption option;
2051 option.devices = { "CLOUD" };
2052 option.mode = SYNC_MODE_CLOUD_MERGE;
2053 option.query = Query::Select().FromTable({ tableName_ });
2054 option.waitTime = g_syncWaitTime;
2055 auto action = static_cast<uint32_t>(LockAction::INSERT) | static_cast<uint32_t>(LockAction::UPDATE)
2056 | static_cast<uint32_t>(LockAction::DELETE);
2057 option.lockAction = static_cast<LockAction>(action);
2058 option.priorityTask = true;
2059 option.compensatedSyncOnly = true;
2060 const SyncProcessCallback onProcess;
2061 CloudSyncer::CloudTaskInfo taskInfo = CloudSyncUtils::InitCompensatedSyncTaskInfo(option, onProcess);
2062 EXPECT_EQ(taskInfo.lockAction, option.lockAction);
2063 }
2064
2065 /**
2066 * @tc.name: LogicCreateRepeatedTableNameTest001
2067 * @tc.desc: test create repeated table name with different cases
2068 * @tc.type: FUNC
2069 * @tc.require:DTS2023120705927
2070 * @tc.author: wangxiangdong
2071 */
2072 HWTEST_F(DistributedDBCloudCheckSyncTest, LogicCreateRepeatedTableNameTest001, TestSize.Level0)
2073 {
2074 /**
2075 * @tc.steps:step1. CreateDistributedTable with same name but different cases.
2076 * @tc.expected: step1. operate successfully.
2077 */
2078 DBStatus createStatus = delegate_->CreateDistributedTable(lowerTableName_, CLOUD_COOPERATION);
2079 ASSERT_EQ(createStatus, DBStatus::OK);
2080 }
2081
2082 /**
2083 * @tc.name: SaveCursorTest001
2084 * @tc.desc: test whether cloud cursor is saved when first sync
2085 * @tc.type: FUNC
2086 * @tc.require:
2087 * @tc.author: chenchaohao
2088 */
2089 HWTEST_F(DistributedDBCloudCheckSyncTest, SaveCursorTest001, TestSize.Level0)
2090 {
2091 /**
2092 * @tc.steps:step1. insert cloud records
2093 * @tc.expected: step1. OK
2094 */
2095 const int actualCount = 10;
2096 InsertCloudTableRecord(0, actualCount, 0, false);
2097
2098 /**
2099 * @tc.steps:step2. check cursor when first sync
2100 * @tc.expected: step2. OK
2101 */
__anonc8f560be2002(const std::string &tableName, VBucket &extend) 2102 virtualCloudDb_->ForkQuery([this](const std::string &tableName, VBucket &extend) {
2103 EXPECT_EQ(tableName_, tableName);
2104 auto cursor = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
2105 EXPECT_EQ(cursor, "0");
2106 });
2107 Query query = Query::Select().FromTable({ tableName_ });
2108 BlockSync(query, delegate_, g_actualDBStatus);
2109 CheckLocalCount(actualCount);
2110 }
2111
2112 /**
2113 * @tc.name: SaveCursorTest002
2114 * @tc.desc: test whether cloud cursor is saved when first download failed
2115 * @tc.type: FUNC
2116 * @tc.require:
2117 * @tc.author: chenchaohao
2118 */
2119 HWTEST_F(DistributedDBCloudCheckSyncTest, SaveCursorTest002, TestSize.Level0)
2120 {
2121 /**
2122 * @tc.steps:step1. insert cloud records
2123 * @tc.expected: step1. OK
2124 */
2125 const int actualCount = 10;
2126 InsertCloudTableRecord(0, actualCount, 0, false);
2127
2128 /**
2129 * @tc.steps:step2. set download failed
2130 * @tc.expected: step2. OK
2131 */
2132 virtualCloudDb_->SetCloudError(true);
2133 Query query = Query::Select().FromTable({ tableName_ });
2134 BlockPrioritySync(query, delegate_, false, OK);
2135 CheckLocalCount(0);
2136
2137 /**
2138 * @tc.steps:step3. check cursor when query
2139 * @tc.expected: step3. OK
2140 */
2141 virtualCloudDb_->SetCloudError(false);
__anonc8f560be2102(const std::string &tableName, VBucket &extend) 2142 virtualCloudDb_->ForkQuery([this](const std::string &tableName, VBucket &extend) {
2143 EXPECT_EQ(tableName_, tableName);
2144 auto cursor = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
2145 EXPECT_EQ(cursor, "0");
2146 });
2147 BlockSync(query, delegate_, g_actualDBStatus);
2148 CheckLocalCount(actualCount);
2149 }
2150
2151 /**
2152 * @tc.name: SaveCursorTest003
2153 * @tc.desc: test whether cloud cursor is saved when first upload failed
2154 * @tc.type: FUNC
2155 * @tc.require:
2156 * @tc.author: chenchaohao
2157 */
2158 HWTEST_F(DistributedDBCloudCheckSyncTest, SaveCursorTest003, TestSize.Level0)
2159 {
2160 /**
2161 * @tc.steps:step1. insert local records
2162 * @tc.expected: step1. OK
2163 */
2164 const int actualCount = 10;
2165 InsertUserTableRecord(tableName_, actualCount);
2166
2167 /**
2168 * @tc.steps:step2. set upload failed
2169 * @tc.expected: step2. OK
2170 */
2171 virtualCloudDb_->SetCloudError(true);
2172 Query query = Query::Select().FromTable({ tableName_ });
2173 BlockPrioritySync(query, delegate_, false, OK);
2174 CheckCloudTableCount(tableName_, 0);
2175
2176 /**
2177 * @tc.steps:step3. check cursor when query
2178 * @tc.expected: step3. OK
2179 */
2180 virtualCloudDb_->SetCloudError(false);
__anonc8f560be2202(const std::string &tableName, VBucket &extend) 2181 virtualCloudDb_->ForkQuery([this](const std::string &tableName, VBucket &extend) {
2182 EXPECT_EQ(tableName_, tableName);
2183 auto cursor = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
2184 EXPECT_EQ(cursor, "0");
2185 });
2186 BlockSync(query, delegate_, g_actualDBStatus);
2187 CheckCloudTableCount(tableName_, actualCount);
2188 }
2189
2190 /**
2191 * @tc.name: RangeQuerySyncTest001
2192 * @tc.desc: Test sync that has option parameter with range query.
2193 * @tc.type: FUNC
2194 * @tc.require:
2195 * @tc.author: chenchaohao
2196 */
2197 HWTEST_F(DistributedDBCloudCheckSyncTest, RangeQuerySyncTest001, TestSize.Level0)
2198 {
2199 /**
2200 * @tc.steps:step1. insert user table record.
2201 * @tc.expected: step1. ok.
2202 */
2203 CloudSyncOption option;
2204 option.devices = { "CLOUD" };
2205 option.mode = SYNC_MODE_CLOUD_MERGE;
2206 option.waitTime = g_syncWaitTime;
2207 Query query = Query::Select().From(tableName_).Range({}, {});
2208 option.query = query;
2209
2210 /**
2211 * @tc.steps:step2. test normal sync with range query.
2212 * @tc.expected: step2. not support.
2213 */
2214 option.priorityTask = false;
2215 ASSERT_EQ(delegate_->Sync(option, nullptr), NOT_SUPPORT);
2216
2217 /**
2218 * @tc.steps:step3. test Priority sync with range query.
2219 * @tc.expected: step3. not support.
2220 */
2221 option.priorityTask = true;
2222 ASSERT_EQ(delegate_->Sync(option, nullptr), NOT_SUPPORT);
2223 }
2224
2225 /*
2226 * @tc.name: RangeQuerySyncTest002
2227 * @tc.desc: Test sync that has not option parameter with range query.
2228 * @tc.type: FUNC
2229 * @tc.require:
2230 * @tc.author: mazhao
2231 */
2232 HWTEST_F(DistributedDBCloudCheckSyncTest, RangeQuerySyncTest002, TestSize.Level1)
2233 {
2234 Query query = Query::Select().FromTable({ tableName_ }).Range({}, {});
2235 ASSERT_EQ(delegate_->Sync({"CLOUD"}, SYNC_MODE_CLOUD_FORCE_PULL, query, nullptr, g_syncWaitTime),
2236 DBStatus::NOT_SUPPORT);
2237 }
2238
2239 /*
2240 * @tc.name: SameDataSync001
2241 * @tc.desc: Test query same data in one batch.
2242 * @tc.type: FUNC
2243 * @tc.require:
2244 * @tc.author: zqq
2245 */
2246 HWTEST_F(DistributedDBCloudCheckSyncTest, SameDataSync001, TestSize.Level0)
2247 {
2248 /**
2249 * @tc.steps:step1. insert cloud records, cloud has two batch id:0-4
2250 * @tc.expected: step1. OK
2251 */
2252 const int actualCount = 5;
2253 InsertCloudTableRecord(0, actualCount, 0, false);
2254 InsertCloudTableRecord(0, actualCount, 0, false);
2255 /**
2256 * @tc.steps:step2. call sync, local has one batch id:0-4
2257 * @tc.expected: step2. OK
2258 */
2259 Query query = Query::Select().FromTable({ tableName_ });
2260 BlockSync(query, delegate_, g_actualDBStatus);
2261 CheckLocalCount(actualCount);
2262 }
2263
2264 /*
2265 * @tc.name: SameDataSync002
2266 * @tc.desc: Test sync when there are two data with the same primary key on the cloud.
2267 * @tc.type: FUNC
2268 * @tc.require:
2269 * @tc.author: liaoyonghuang
2270 */
2271 HWTEST_F(DistributedDBCloudCheckSyncTest, SameDataSync002, TestSize.Level1)
2272 {
2273 /**
2274 * @tc.steps:step1. insert local 1 record and sync to cloud.
2275 * @tc.expected: step1. OK
2276 */
2277 const int actualCount = 1;
2278 InsertUserTableRecord(tableName_, actualCount);
2279 Query query = Query::Select().FromTable({ tableName_ });
2280 BlockSync(query, delegate_, g_actualDBStatus);
2281
2282 /**
2283 * @tc.steps:step2. insert 2 records with the same primary key.
2284 * @tc.expected: step2. OK
2285 */
2286 std::vector<VBucket> record;
2287 std::vector<VBucket> extend;
2288 Timestamp now = TimeHelper::GetSysCurrentTime();
2289 VBucket data;
2290 std::vector<uint8_t> photo(0, 'v');
2291 data.insert_or_assign("id", std::string("0"));
2292 data.insert_or_assign("name", std::string("Cloud"));
2293 data.insert_or_assign("height", 166.0); // 166.0 is random double value
2294 data.insert_or_assign("married", false);
2295 data.insert_or_assign("photo", photo);
2296 data.insert_or_assign("age", static_cast<int64_t>(13L)); // 13 is random age
2297 record.push_back(data);
2298 data.insert_or_assign("age", static_cast<int64_t>(14L)); // 14 is random age
2299 record.push_back(data);
2300 VBucket log;
2301 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, static_cast<int64_t>(
2302 now / CloudDbConstant::TEN_THOUSAND));
2303 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, static_cast<int64_t>(
2304 now / CloudDbConstant::TEN_THOUSAND));
2305 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
2306 log.insert_or_assign(CloudDbConstant::VERSION_FIELD, std::string("1"));
2307 extend.push_back(log);
2308 log.insert_or_assign(CloudDbConstant::VERSION_FIELD, std::string("2"));
2309 extend.push_back(log);
2310 ASSERT_EQ(virtualCloudDb_->BatchInsert(tableName_, std::move(record), extend), DBStatus::OK);
2311
2312 /**
2313 * @tc.steps:step3. sync from cloud and check record.
2314 * @tc.expected: step3. The record with age of 14 has been updated locally.
2315 */
2316 BlockSync(query, delegate_, g_actualDBStatus);
2317 std::string sql = "SELECT age FROM " + tableName_ + " where id=0;";
2318 int64_t actualAge = 0;
2319 int64_t expectAge = 14L;
__anonc8f560be2302(sqlite3_stmt *stmt) 2320 RelationalTestUtils::ExecSql(db_, sql, nullptr, [&actualAge](sqlite3_stmt *stmt) {
2321 actualAge = sqlite3_column_int(stmt, 0);
2322 return E_OK;
2323 });
2324 EXPECT_EQ(actualAge, expectAge);
2325 }
2326
2327 /*
2328 * @tc.name: CreateDistributedTable001
2329 * @tc.desc: Test create distributed table when table not empty.
2330 * @tc.type: FUNC
2331 * @tc.require:
2332 * @tc.author: zqq
2333 */
2334 HWTEST_F(DistributedDBCloudCheckSyncTest, CreateDistributedTable001, TestSize.Level0)
2335 {
2336 const std::string table = "CreateDistributedTable001";
2337 const std::string createSQL =
2338 "CREATE TABLE IF NOT EXISTS " + table + "(" \
2339 "id TEXT PRIMARY KEY," \
2340 "name TEXT," \
2341 "height REAL ," \
2342 "photo BLOB," \
2343 "age INT);";
2344 ASSERT_EQ(RelationalTestUtils::ExecSql(db_, createSQL), SQLITE_OK);
2345 int actualCount = 10;
2346 InsertUserTableRecord(table, actualCount);
2347 InsertCloudTableRecord(table, 0, actualCount, 0, true);
2348 ASSERT_EQ(delegate_->CreateDistributedTable(table, CLOUD_COOPERATION), DBStatus::OK);
2349 DataBaseSchema dataBaseSchema = GetSchema();
2350 TableSchema schema = dataBaseSchema.tables.at(0);
2351 schema.name = table;
2352 schema.sharedTableName = "";
2353 dataBaseSchema.tables.push_back(schema);
2354 ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
2355 /**
2356 * @tc.steps:step2. call sync, local has one batch id:0-4
2357 * @tc.expected: step2. OK
2358 */
2359 Query query = Query::Select().FromTable({ table });
2360 BlockSync(query, delegate_, g_actualDBStatus);
2361 CheckCloudTableCount(table, actualCount);
2362 }
2363
2364 /*
2365 * @tc.name: CloseDbTest001
2366 * @tc.desc: Test process of db close during sync
2367 * @tc.type: FUNC
2368 * @tc.require:
2369 * @tc.author: bty
2370 */
2371 HWTEST_F(DistributedDBCloudCheckSyncTest, CloseDbTest001, TestSize.Level1)
2372 {
2373 /**
2374 * @tc.steps:step1. insert user table record.
2375 * @tc.expected: step1. ok.
2376 */
2377 const int actualCount = 10; // 10 is count of records
2378 InsertUserTableRecord(tableName_, actualCount);
2379
2380 /**
2381 * @tc.steps:step2. wait for 2 seconds during the query to close the database.
2382 * @tc.expected: step2. ok.
2383 */
2384 std::mutex callMutex;
2385 int callCount = 0;
__anonc8f560be2402(const std::string &, VBucket &) 2386 virtualCloudDb_->ForkQuery([](const std::string &, VBucket &) {
2387 std::this_thread::sleep_for(std::chrono::seconds(2)); // block notify 2s
2388 });
2389 const auto callback = [&callCount, &callMutex](
__anonc8f560be2502( const std::map<std::string, SyncProcess> &) 2390 const std::map<std::string, SyncProcess> &) {
2391 {
2392 std::lock_guard<std::mutex> autoLock(callMutex);
2393 callCount++;
2394 }
2395 };
2396 Query query = Query::Select().FromTable({ tableName_ });
2397 ASSERT_EQ(delegate_->Sync({ "CLOUD" }, SYNC_MODE_CLOUD_MERGE, query, callback, g_syncWaitTime), OK);
2398 std::this_thread::sleep_for(std::chrono::seconds(1)); // block notify 1s
2399 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
2400 delegate_ = nullptr;
2401 mgr_ = nullptr;
2402
2403 /**
2404 * @tc.steps:step3. wait for 2 seconds to check the process call count.
2405 * @tc.expected: step3. ok.
2406 */
2407 std::this_thread::sleep_for(std::chrono::seconds(2)); // block notify 2s
2408 EXPECT_EQ(callCount, 0L);
2409 }
2410
2411 /*
2412 * @tc.name: ConsistentFlagTest001
2413 * @tc.desc: Test the consistency flag of no asset table
2414 * @tc.type: FUNC
2415 * @tc.require:
2416 * @tc.author: bty
2417 */
2418 HWTEST_F(DistributedDBCloudCheckSyncTest, ConsistentFlagTest001, TestSize.Level1)
2419 {
2420 /**
2421 * @tc.steps:step1. init data and sync
2422 * @tc.expected: step1. ok.
2423 */
2424 const int localCount = 20; // 20 is count of local
2425 const int cloudCount = 10; // 10 is count of cloud
2426 InsertUserTableRecord(tableName_, localCount);
2427 InsertCloudTableRecord(tableName_, 0, cloudCount, 0, false);
2428 Query query = Query::Select().FromTable({ tableName_ });
2429 BlockSync(query, delegate_, g_actualDBStatus);
2430
2431 /**
2432 * @tc.steps:step2. check the 0x20 bit of flag after sync
2433 * @tc.expected: step2. ok.
2434 */
2435 std::string querySql = "select count(*) from " + DBCommon::GetLogTableName(tableName_) +
2436 " where flag&0x20=0;";
2437 EXPECT_EQ(sqlite3_exec(db_, querySql.c_str(), QueryCountCallback,
2438 reinterpret_cast<void *>(localCount), nullptr), SQLITE_OK);
2439
2440 /**
2441 * @tc.steps:step3. delete local data and check
2442 * @tc.expected: step3. ok.
2443 */
2444 std::string sql = "delete from " + tableName_ + " where id = '1';";
2445 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2446 EXPECT_EQ(sqlite3_exec(db_, querySql.c_str(), QueryCountCallback,
2447 reinterpret_cast<void *>(localCount - 1), nullptr), SQLITE_OK);
2448
2449 /**
2450 * @tc.steps:step4. check the 0x20 bit of flag after sync
2451 * @tc.expected: step4. ok.
2452 */
2453 BlockSync(query, delegate_, g_actualDBStatus);
2454 EXPECT_EQ(sqlite3_exec(db_, querySql.c_str(), QueryCountCallback,
2455 reinterpret_cast<void *>(localCount), nullptr), SQLITE_OK);
2456 }
2457
SyncDataStatusTest(bool isCompensatedSyncOnly)2458 void DistributedDBCloudCheckSyncTest::SyncDataStatusTest(bool isCompensatedSyncOnly)
2459 {
2460 /**
2461 * @tc.steps:step1. init data and sync
2462 * @tc.expected: step1. ok.
2463 */
2464 const int localCount = 20; // 20 is count of local
2465 const int cloudCount = 10; // 10 is count of cloud
2466 InsertUserTableRecord(tableName_, localCount);
2467 std::string sql = "update " + DBCommon::GetLogTableName(tableName_) + " SET status = 1 where data_key in (1,11);";
2468 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2469 sql = "update " + DBCommon::GetLogTableName(tableName_) + " SET status = 2 where data_key in (2,12);";
2470 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2471 sql = "update " + DBCommon::GetLogTableName(tableName_) + " SET status = 3 where data_key in (3,13);";
2472 EXPECT_EQ(RelationalTestUtils::ExecSql(db_, sql), E_OK);
2473 std::this_thread::sleep_for(std::chrono::milliseconds(1));
2474 InsertCloudTableRecord(tableName_, 0, cloudCount, 0, false);
2475 Query query = Query::Select().FromTable({tableName_});
2476
2477 /**
2478 * @tc.steps:step2. check count
2479 * @tc.expected: step2. ok.
2480 */
2481 int64_t syncCount = 2;
2482 BlockPrioritySync(query, delegate_, false, OK, isCompensatedSyncOnly);
2483 if (!isCompensatedSyncOnly) {
2484 std::this_thread::sleep_for(std::chrono::seconds(1)); // wait compensated sync finish
2485 }
2486 std::string preSql = "select count(*) from " + DBCommon::GetLogTableName(tableName_);
2487 std::string querySql = preSql + " where status=0 and data_key in (1,11) and cloud_gid !='';";
2488 CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
2489 if (isCompensatedSyncOnly) {
2490 querySql = preSql + " where status=2 and data_key in (2,12) and cloud_gid ='';";
2491 CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
2492 querySql = preSql + " where status=3 and data_key in (3,13) and cloud_gid ='';";
2493 CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
2494 querySql = preSql + " where status=0 and cloud_gid ='';";
2495 int unSyncCount = 14; // 14 is the num of unSync data with status 0
2496 CloudDBSyncUtilsTest::CheckCount(db_, querySql, unSyncCount);
2497 } else {
2498 // gid 12、13 are upload insert, lock to lock_change
2499 querySql = preSql + " where status=3 and data_key in (2,12) and cloud_gid !='';";
2500 CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
2501 querySql = preSql + " where status=3 and data_key in (3,13) and cloud_gid !='';";
2502 CloudDBSyncUtilsTest::CheckCount(db_, querySql, syncCount);
2503 querySql = preSql + " where status=0 and cloud_gid !='';";
2504 int unSyncCount = 16; // 16 is the num of sync finish
2505 CloudDBSyncUtilsTest::CheckCount(db_, querySql, unSyncCount);
2506 }
2507 }
2508
2509 /*
2510 * @tc.name: SyncDataStatusTest001
2511 * @tc.desc: Test the status after compensated sync the no asset table
2512 * @tc.type: FUNC
2513 * @tc.require:
2514 * @tc.author: bty
2515 */
2516 HWTEST_F(DistributedDBCloudCheckSyncTest, SyncDataStatusTest001, TestSize.Level1)
2517 {
2518 SyncDataStatusTest(true);
2519 }
2520
2521 /*
2522 * @tc.name: SyncDataStatusTest002
2523 * @tc.desc: Test the status after normal sync the no asset table
2524 * @tc.type: FUNC
2525 * @tc.require:
2526 * @tc.author: bty
2527 */
2528 HWTEST_F(DistributedDBCloudCheckSyncTest, SyncDataStatusTest002, TestSize.Level1)
2529 {
2530 SyncDataStatusTest(false);
2531 }
2532 }
2533 #endif
2534