1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "cloud/asset_operation_utils.h"
17 #include "cloud/cloud_storage_utils.h"
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud_db_sync_utils_test.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "process_system_api_adapter_impl.h"
23 #include "relational_store_client.h"
24 #include "relational_store_instance.h"
25 #include "relational_store_manager.h"
26 #include "runtime_config.h"
27 #include "sqlite_relational_store.h"
28 #include "sqlite_relational_utils.h"
29 #include "time_helper.h"
30 #include "virtual_asset_loader.h"
31 #include "virtual_cloud_data_translate.h"
32 #include "virtual_cloud_db.h"
33 #include "virtual_communicator_aggregator.h"
34 #include <gtest/gtest.h>
35 #include <iostream>
36 
37 using namespace testing::ext;
38 using namespace DistributedDB;
39 using namespace DistributedDBUnitTest;
40 using namespace std;
41 
42 namespace {
43 const string STORE_ID = "Relational_Store_Lock_Sync";
44 const string DB_SUFFIX = ".db";
45 const string ASSETS_TABLE_NAME = "student";
46 const string ASSETS_TABLE_NAME_SHARED = "student_shared";
47 const string DEVICE_CLOUD = "cloud_dev";
48 const string COL_ID = "id";
49 const string COL_NAME = "name";
50 const string COL_HEIGHT = "height";
51 const string COL_ASSET = "asset";
52 const string COL_ASSETS = "assets";
53 const string COL_AGE = "age";
54 const int64_t WAIT_TIME = 5;
55 const std::vector<Field> CLOUD_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>, true}, {COL_NAME, TYPE_INDEX<std::string>},
56     {COL_ASSET, TYPE_INDEX<Asset>}, {COL_ASSETS, TYPE_INDEX<Assets>}};
57 const string CREATE_SINGLE_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + ASSETS_TABLE_NAME + "(" + COL_ID +
58     " INTEGER PRIMARY KEY," + COL_NAME + " TEXT ," + COL_ASSET + " ASSET," + COL_ASSETS + " ASSETS" + ");";
59 const Asset ASSET_COPY = {.version = 1,
60     .name = "Phone",
61     .assetId = "0",
62     .subpath = "/local/sync",
63     .uri = "/local/sync",
64     .modifyTime = "123456",
65     .createTime = "",
66     .size = "256",
67     .hash = "ASE"};
68 const Assets ASSETS_COPY1 = { ASSET_COPY };
69 const string ASSET_SUFFIX = "_copy";
70 
71 string g_storePath;
72 string g_testDir;
73 RelationalStoreObserverUnitTest *g_observer = nullptr;
74 DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID);
75 RelationalStoreDelegate *g_delegate = nullptr;
76 std::shared_ptr<VirtualCloudDb> g_virtualCloudDb;
77 std::shared_ptr<VirtualAssetLoader> g_virtualAssetLoader;
78 std::shared_ptr<VirtualCloudDataTranslate> g_virtualCloudDataTranslate;
79 SyncProcess g_syncProcess;
80 std::condition_variable g_processCondition;
81 std::mutex g_processMutex;
82 IRelationalStore *g_store = nullptr;
83 ICloudSyncStorageHook *g_cloudStoreHook = nullptr;
84 int64_t g_nameId;
85 using CloudSyncStatusCallback = std::function<void(const std::map<std::string, SyncProcess> &onProcess)>;
86 
GetCloudDbSchema(DataBaseSchema & dataBaseSchema)87 void GetCloudDbSchema(DataBaseSchema &dataBaseSchema)
88 {
89     TableSchema assetsTableSchema = {.name = ASSETS_TABLE_NAME, .sharedTableName = ASSETS_TABLE_NAME_SHARED,
90         .fields = CLOUD_FIELDS};
91     dataBaseSchema.tables.push_back(assetsTableSchema);
92 }
93 
CloseDb()94 void CloseDb()
95 {
96     delete g_observer;
97     g_virtualCloudDb = nullptr;
98     if (g_delegate != nullptr) {
99         EXPECT_EQ(g_mgr.CloseStore(g_delegate), DBStatus::OK);
100         g_delegate = nullptr;
101     }
102 }
103 
104 class DistributedDBCloudSyncerLockTest : public testing::Test {
105 public:
106     static void SetUpTestCase(void);
107     static void TearDownTestCase(void);
108     void SetUp();
109     void TearDown();
110 
111 protected:
112     void Init();
113     const RelationalSyncAbleStorage *GetRelationalStore();
114     void InsertLocalData(int64_t begin, int64_t count, const std::string &tableName, bool isAssetNull = true);
115     void GenerateDataRecords(
116         int64_t begin, int64_t count, int64_t gidStart, std::vector<VBucket> &record, std::vector<VBucket> &extend);
117     void InsertCloudDBData(int64_t begin, int64_t count, int64_t gidStart, const std::string &tableName);
118     void UpdateCloudDBData(int64_t begin, int64_t count, int64_t gidStart, int64_t versionStart,
119         const std::string &tableName);
120     void DeleteCloudDBData(int64_t beginGid, int64_t count, const std::string &tableName);
121     void CallSync(const CloudSyncOption &option, DBStatus expectResult = OK);
122 
123     void TestConflictSync001(bool isUpdate);
124     void CheckAssetStatusNormal();
125     void UpdateCloudAssets(Asset &asset, Assets &assets, const std::string &version);
126     void CheckUploadAbnormal(OpType opType, int64_t expCnt, bool isCompensated = false);
127     sqlite3 *db = nullptr;
128     VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
129 };
130 
SetUpTestCase(void)131 void DistributedDBCloudSyncerLockTest::SetUpTestCase(void)
132 {
133     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
134     g_storePath = g_testDir + "/" + STORE_ID + DB_SUFFIX;
135     LOGI("The test db is:%s", g_storePath.c_str());
136     g_virtualCloudDataTranslate = std::make_shared<VirtualCloudDataTranslate>();
137     RuntimeConfig::SetCloudTranslate(g_virtualCloudDataTranslate);
138 }
139 
TearDownTestCase(void)140 void DistributedDBCloudSyncerLockTest::TearDownTestCase(void) {}
141 
SetUp(void)142 void DistributedDBCloudSyncerLockTest::SetUp(void)
143 {
144     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
145         LOGE("rm test db files error.");
146     }
147     DistributedDBToolsUnitTest::PrintTestCaseInfo();
148     LOGD("Test dir is %s", g_testDir.c_str());
149     Init();
150     g_cloudStoreHook = (ICloudSyncStorageHook *) GetRelationalStore();
151     ASSERT_NE(g_cloudStoreHook, nullptr);
152     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
153     ASSERT_TRUE(communicatorAggregator_ != nullptr);
154     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
155 }
156 
TearDown(void)157 void DistributedDBCloudSyncerLockTest::TearDown(void)
158 {
159     RefObject::DecObjRef(g_store);
160     g_virtualCloudDb->ForkUpload(nullptr);
161     CloseDb();
162     EXPECT_EQ(sqlite3_close_v2(db), SQLITE_OK);
163     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
164         LOGE("rm test db files error.");
165     }
166     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
167     communicatorAggregator_ = nullptr;
168     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
169 }
170 
Init()171 void DistributedDBCloudSyncerLockTest::Init()
172 {
173     db = RelationalTestUtils::CreateDataBase(g_storePath);
174     ASSERT_NE(db, nullptr);
175     EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
176     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_SINGLE_PRIMARY_KEY_TABLE), SQLITE_OK);
177     g_observer = new (std::nothrow) RelationalStoreObserverUnitTest();
178     ASSERT_NE(g_observer, nullptr);
179     ASSERT_EQ(g_mgr.OpenStore(g_storePath, STORE_ID, RelationalStoreDelegate::Option{.observer = g_observer},
180         g_delegate), DBStatus::OK);
181     ASSERT_NE(g_delegate, nullptr);
182     ASSERT_EQ(g_delegate->CreateDistributedTable(ASSETS_TABLE_NAME, CLOUD_COOPERATION), DBStatus::OK);
183     g_virtualCloudDb = make_shared<VirtualCloudDb>();
184     g_virtualAssetLoader = make_shared<VirtualAssetLoader>();
185     g_syncProcess = {};
186     ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
187     ASSERT_EQ(g_delegate->SetIAssetLoader(g_virtualAssetLoader), DBStatus::OK);
188     DataBaseSchema dataBaseSchema;
189     GetCloudDbSchema(dataBaseSchema);
190     ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
191     g_nameId = 0;
192 }
193 
GetRelationalStore()194 const RelationalSyncAbleStorage* DistributedDBCloudSyncerLockTest::GetRelationalStore()
195 {
196     RelationalDBProperties properties;
197     CloudDBSyncUtilsTest::InitStoreProp(g_storePath, APP_ID, USER_ID, STORE_ID, properties);
198     int errCode = E_OK;
199     g_store = RelationalStoreInstance::GetDataBase(properties, errCode);
200     if (g_store == nullptr) {
201         return nullptr;
202     }
203     return static_cast<SQLiteRelationalStore *>(g_store)->GetStorageEngine();
204 }
205 
206 
GenerateDataRecords(int64_t begin,int64_t count,int64_t gidStart,std::vector<VBucket> & record,std::vector<VBucket> & extend)207 void DistributedDBCloudSyncerLockTest::GenerateDataRecords(
208     int64_t begin, int64_t count, int64_t gidStart, std::vector<VBucket> &record, std::vector<VBucket> &extend)
209 {
210     for (int64_t i = begin; i < begin + count; i++) {
211         Assets assets;
212         Asset asset = ASSET_COPY;
213         asset.name = ASSET_COPY.name + std::to_string(i);
214         assets.emplace_back(asset);
215         VBucket data;
216         data.insert_or_assign(COL_ASSET, asset);
217         asset.name = ASSET_COPY.name + std::to_string(i) + "_copy";
218         assets.emplace_back(asset);
219         data.insert_or_assign(COL_ID, i);
220         data.insert_or_assign(COL_NAME, "name" + std::to_string(g_nameId++));
221         data.insert_or_assign(COL_ASSETS, assets);
222         record.push_back(data);
223 
224         VBucket log;
225         Timestamp now = TimeHelper::GetSysCurrentTime();
226         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
227         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
228         log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
229         log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i + gidStart));
230         extend.push_back(log);
231     }
232 }
233 
InsertLocalData(int64_t begin,int64_t count,const std::string & tableName,bool isAssetNull)234 void DistributedDBCloudSyncerLockTest::InsertLocalData(int64_t begin, int64_t count,
235     const std::string &tableName, bool isAssetNull)
236 {
237     int errCode;
238     std::vector<VBucket> record;
239     std::vector<VBucket> extend;
240     GenerateDataRecords(begin, count, 0, record, extend);
241     const string sql = "insert or replace into " + tableName + " values (?,?,?,?);";
242     for (VBucket vBucket : record) {
243         sqlite3_stmt *stmt = nullptr;
244         ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
245         ASSERT_EQ(SQLiteUtils::BindInt64ToStatement(stmt, 1, std::get<int64_t>(vBucket[COL_ID])), E_OK); // 1 is id
246         ASSERT_EQ(SQLiteUtils::BindTextToStatement(stmt, 2, std::get<string>(vBucket[COL_NAME])), E_OK); // 2 is name
247         if (isAssetNull) {
248             ASSERT_EQ(sqlite3_bind_null(stmt, 3), SQLITE_OK); // 3 is asset
249         } else {
250             std::vector<uint8_t> assetBlob = g_virtualCloudDataTranslate->AssetToBlob(ASSET_COPY);
251             ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 3, assetBlob, false), E_OK); // 3 is asset
252         }
253         std::vector<uint8_t> assetsBlob = g_virtualCloudDataTranslate->AssetsToBlob(
254             std::get<Assets>(vBucket[COL_ASSETS]));
255         ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 4, assetsBlob, false), E_OK); // 4 is assets
256         EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
257         SQLiteUtils::ResetStatement(stmt, true, errCode);
258     }
259 }
260 
InsertCloudDBData(int64_t begin,int64_t count,int64_t gidStart,const std::string & tableName)261 void DistributedDBCloudSyncerLockTest::InsertCloudDBData(int64_t begin, int64_t count, int64_t gidStart,
262     const std::string &tableName)
263 {
264     std::this_thread::sleep_for(std::chrono::milliseconds(1));
265     std::vector<VBucket> record;
266     std::vector<VBucket> extend;
267     GenerateDataRecords(begin, count, gidStart, record, extend);
268     ASSERT_EQ(g_virtualCloudDb->BatchInsertWithGid(tableName, std::move(record), extend), DBStatus::OK);
269     std::this_thread::sleep_for(std::chrono::milliseconds(1));
270 }
271 
UpdateCloudDBData(int64_t begin,int64_t count,int64_t gidStart,int64_t versionStart,const std::string & tableName)272 void DistributedDBCloudSyncerLockTest::UpdateCloudDBData(int64_t begin, int64_t count, int64_t gidStart,
273     int64_t versionStart, const std::string &tableName)
274 {
275     std::this_thread::sleep_for(std::chrono::milliseconds(1));
276     std::vector<VBucket> record;
277     std::vector<VBucket> extend;
278     GenerateDataRecords(begin, count, gidStart, record, extend);
279     for (auto &entry: extend) {
280         entry[CloudDbConstant::VERSION_FIELD] = std::to_string(versionStart++);
281     }
282     ASSERT_EQ(g_virtualCloudDb->BatchUpdate(tableName, std::move(record), extend), DBStatus::OK);
283     std::this_thread::sleep_for(std::chrono::milliseconds(1));
284 }
285 
DeleteCloudDBData(int64_t beginGid,int64_t count,const std::string & tableName)286 void DistributedDBCloudSyncerLockTest::DeleteCloudDBData(int64_t beginGid, int64_t count,
287     const std::string &tableName)
288 {
289     Timestamp now = TimeHelper::GetSysCurrentTime();
290     std::vector<VBucket> extend;
291     for (int64_t i = 0; i < count; ++i) {
292         VBucket log;
293         log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
294         log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
295         log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(beginGid + i));
296         extend.push_back(log);
297     }
298     ASSERT_EQ(g_virtualCloudDb->BatchDelete(tableName, extend), DBStatus::OK);
299     std::this_thread::sleep_for(std::chrono::milliseconds(count));
300 }
301 
PrepareOption(const Query & query,LockAction action,bool isPriorityTask=false,bool isCompensatedSyncOnly=false)302 CloudSyncOption PrepareOption(const Query &query, LockAction action, bool isPriorityTask = false,
303     bool isCompensatedSyncOnly = false)
304 {
305     CloudSyncOption option;
306     option.devices = { "CLOUD" };
307     option.mode = SYNC_MODE_CLOUD_MERGE;
308     option.query = query;
309     option.waitTime = WAIT_TIME;
310     option.priorityTask = isPriorityTask;
311     option.compensatedSyncOnly = isCompensatedSyncOnly;
312     option.lockAction = action;
313     return option;
314 }
315 
CallSync(const CloudSyncOption & option,DBStatus expectResult)316 void DistributedDBCloudSyncerLockTest::CallSync(const CloudSyncOption &option, DBStatus expectResult)
317 {
318     std::mutex dataMutex;
319     std::condition_variable cv;
320     bool finish = false;
321     SyncProcess last;
322     auto callback = [&last, &cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
323         for (const auto &item: process) {
324             if (item.second.process == DistributedDB::FINISHED) {
325                 {
326                     std::lock_guard<std::mutex> autoLock(dataMutex);
327                     finish = true;
328                     last = item.second;
329                 }
330                 cv.notify_one();
331             }
332         }
333     };
334     ASSERT_EQ(g_delegate->Sync(option, callback), expectResult);
335     if (expectResult == OK) {
336         std::unique_lock<std::mutex> uniqueLock(dataMutex);
337         cv.wait(uniqueLock, [&finish]() {
338             return finish;
339         });
340     }
341     g_syncProcess = last;
342 }
343 
TestConflictSync001(bool isUpdate)344 void DistributedDBCloudSyncerLockTest::TestConflictSync001(bool isUpdate)
345 {
346     /**
347      * @tc.steps:step1. init data and sync
348      * @tc.expected: step1. return ok.
349      */
350     int cloudCount = 20;
351     int localCount = 10;
352     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
353     InsertLocalData(0, localCount, ASSETS_TABLE_NAME, true);
354     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
355     CallSync(option);
356 
357     /**
358      * @tc.steps:step2. update local data to upload, and set hook before upload, operator cloud data which id is 1
359      * @tc.expected: step2. return ok.
360      */
361     std::string sql;
362     if (isUpdate) {
363         sql = "update " + ASSETS_TABLE_NAME + " set name = 'xxx' where id = 1;";
364     } else {
365         sql = "delete from " + ASSETS_TABLE_NAME + " where id = 1;";
366     }
367     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
368     int index = 0;
369     g_cloudStoreHook->SetDoUploadHook([&index, this]() {
370         if (++index == 1) {
371             UpdateCloudDBData(1, 1, 0, 21, ASSETS_TABLE_NAME); // 21 is version
372         }
373     });
374 
375     /**
376      * @tc.steps:step3. sync and check local data
377      * @tc.expected: step3. return ok.
378      */
379     CallSync(option);
380     sql = "select count(*) from " + ASSETS_TABLE_NAME + " where name = 'name30' AND id = '1';";
381     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
382         reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
383 }
384 
CheckAssetStatusNormal()385 void DistributedDBCloudSyncerLockTest::CheckAssetStatusNormal()
386 {
387     std::string sql = "SELECT asset, assets FROM " + ASSETS_TABLE_NAME + ";";
388     sqlite3_stmt *stmt = nullptr;
389     ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
390     while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
391         ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
392         ASSERT_EQ(sqlite3_column_type(stmt, 1), SQLITE_BLOB);
393         Type assetBlob;
394         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Asset>, 0, assetBlob), E_OK);
395         Asset asset = g_virtualCloudDataTranslate->BlobToAsset(std::get<Bytes>(assetBlob));
396         EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
397         Type assetsBlob;
398         ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, assetsBlob), E_OK);
399         Assets assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(assetsBlob));
400         for (const auto &as : assets) {
401             EXPECT_EQ(as.status, static_cast<uint32_t>(AssetStatus::NORMAL));
402         }
403     }
404     int errCode = E_OK;
405     SQLiteUtils::ResetStatement(stmt, true, errCode);
406 }
407 
UpdateCloudAssets(Asset & asset,Assets & assets,const std::string & version)408 void DistributedDBCloudSyncerLockTest::UpdateCloudAssets(Asset &asset, Assets &assets, const std::string &version)
409 {
410     std::this_thread::sleep_for(std::chrono::milliseconds(1));
411     VBucket data;
412     std::vector<VBucket> record;
413     std::vector<VBucket> extend;
414     asset.name.empty() ? data.insert_or_assign(COL_ASSET, Nil()) : data.insert_or_assign(COL_ASSET, asset);
415     data.insert_or_assign(COL_ID, 0L);
416     data.insert_or_assign(COL_NAME, "name" + std::to_string(g_nameId++));
417     assets.empty() ? data.insert_or_assign(COL_ASSETS, Nil()) : data.insert_or_assign(COL_ASSETS, assets);
418     record.push_back(data);
419     VBucket log;
420     Timestamp now = TimeHelper::GetSysCurrentTime();
421     log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
422     log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
423     log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
424     log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(0));
425     log.insert_or_assign(CloudDbConstant::VERSION_FIELD, version);
426     extend.push_back(log);
427     ASSERT_EQ(g_virtualCloudDb->BatchUpdate(ASSETS_TABLE_NAME, std::move(record), extend), DBStatus::OK);
428     std::this_thread::sleep_for(std::chrono::milliseconds(1));
429 }
430 
CheckUploadAbnormal(OpType opType,int64_t expCnt,bool isCompensated)431 void DistributedDBCloudSyncerLockTest::CheckUploadAbnormal(OpType opType, int64_t expCnt, bool isCompensated)
432 {
433     std::string sql = "SELECT count(*) FROM " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE ";
434     switch (opType) {
435         case OpType::INSERT:
436             sql += isCompensated ? " cloud_gid != '' AND version !='' AND flag&0x10=0" :
437                    " cloud_gid != '' AND version !='' AND flag=flag|0x10";
438             break;
439         case OpType::UPDATE:
440             sql += isCompensated ? " cloud_gid != '' AND version !='' AND flag&0x10=0" :
441                    " cloud_gid == '' AND version =='' AND flag=flag|0x10";
442             break;
443         case OpType::DELETE:
444             sql += " cloud_gid == '' AND version ==''";
445             break;
446         default:
447             break;
448     }
449     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
450         reinterpret_cast<void *>(expCnt), nullptr), SQLITE_OK);
451 }
452 
453 /**
454  * @tc.name: RDBUnlockCloudSync001
455  * @tc.desc: Test sync with no lock
456  * @tc.type: FUNC
457  * @tc.require:
458  * @tc.author: bty
459  */
460 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBUnlockCloudSync001, TestSize.Level0)
461 {
462     /**
463      * @tc.steps:step1. init data and sync with none lock
464      * @tc.expected: step1. return ok.
465      */
466     int cloudCount = 20;
467     int localCount = 10;
468     InsertLocalData(0, cloudCount, ASSETS_TABLE_NAME, true);
469     InsertCloudDBData(0, localCount, 0, ASSETS_TABLE_NAME);
470     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::NONE);
471     CallSync(option);
472 
473     /**
474      * @tc.steps:step2. insert or replace, check version
475      * @tc.expected: step2. return ok.
476      */
477     std::string sql = "INSERT OR REPLACE INTO " + ASSETS_TABLE_NAME + " VALUES('0', 'XX', '', '');";
478     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
479     sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
480         " where version != '' and version is not null;";
481     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
482         reinterpret_cast<void *>(cloudCount), nullptr), SQLITE_OK);
483 }
484 
485 /**
486  * @tc.name: RDBConflictCloudSync001
487  * @tc.desc: Both cloud and local are available, local version is empty, with cloud updates before upload
488  * @tc.type: FUNC
489  * @tc.require:
490  * @tc.author: bty
491  */
492 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBConflictCloudSync001, TestSize.Level0)
493 {
494     /**
495      * @tc.steps:step1. init data and set hook before upload, update cloud data which gid is 1
496      * @tc.expected: step1. return ok.
497      */
498     int cloudCount = 20;
499     int localCount = 10;
500     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
501     InsertLocalData(0, localCount, ASSETS_TABLE_NAME, true);
502     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
503     int index = 0;
__anon6ad9bb3f0502() 504     g_cloudStoreHook->SetDoUploadHook([&index, this]() {
505         if (++index == 1) {
506             UpdateCloudDBData(1, 1, 0, 1, ASSETS_TABLE_NAME);
507         }
508     });
509 
510     /**
511      * @tc.steps:step2. sync and check local data
512      * @tc.expected: step2. return ok.
513      */
514     CallSync(option);
515     std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
516         " where flag&0x02=0 AND version='20' AND cloud_gid = '1';";
517     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
518         reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
519 }
520 
521 /**
522  * @tc.name: RDBConflictCloudSync002
523  * @tc.desc: Both cloud and local are available, with cloud updates before upload
524  * @tc.type: FUNC
525  * @tc.require:
526  * @tc.author: bty
527  */
528 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBConflictCloudSync002, TestSize.Level0)
529 {
530     TestConflictSync001(true);
531 }
532 
533 /**
534  * @tc.name: RDBConflictCloudSync003
535  * @tc.desc: Both cloud and local are available, with cloud deletes before upload
536  * @tc.type: FUNC
537  * @tc.require:
538  * @tc.author: bty
539  */
540 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBConflictCloudSync003, TestSize.Level0)
541 {
542     TestConflictSync001(false);
543 }
544 
545 /**
546  * @tc.name: RDBConflictCloudSync003
547  * @tc.desc: Both cloud and local are available, with cloud inserts before upload
548  * @tc.type: FUNC
549  * @tc.require:
550  * @tc.author: bty
551  */
552 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBConflictCloudSync004, TestSize.Level0)
553 {
554     /**
555      * @tc.steps:step1. init data and sync
556      * @tc.expected: step1. return ok.
557      */
558     int cloudCount = 20;
559     int localCount = 10;
560     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
561     InsertLocalData(0, localCount, ASSETS_TABLE_NAME, true);
562     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
563     CallSync(option);
564 
565     /**
566      * @tc.steps:step2. insert local data and set hook before upload, insert cloud data which id is 20
567      * @tc.expected: step2. return ok.
568      */
569     std::string sql = "INSERT INTO " + ASSETS_TABLE_NAME + " VALUES('20', 'XXX', NULL, NULL);";
570     EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
571     int index = 0;
__anon6ad9bb3f0602() 572     g_cloudStoreHook->SetDoUploadHook([&index, cloudCount, this]() {
573         if (++index == 1) {
574             InsertCloudDBData(cloudCount, 1, cloudCount, ASSETS_TABLE_NAME);
575         }
576     });
577 
578     /**
579      * @tc.steps:step3. set hook for batch insert, return CLOUD_VERSION_CONFLICT err
580      * @tc.expected: step3. return ok.
581      */
582     g_virtualCloudDb->ForkInsertConflict([](const std::string &tableName, VBucket &extend, VBucket &record,
__anon6ad9bb3f0702(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 583         std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
584         for (auto &[cloudRecord, cloudExtend]: cloudDataVec) {
585             int64_t cloudPk;
586             CloudStorageUtils::GetValueFromVBucket<int64_t>(COL_ID, record, cloudPk);
587             int64_t localPk;
588             CloudStorageUtils::GetValueFromVBucket<int64_t>(COL_ID, cloudRecord, localPk);
589             if (cloudPk != localPk) {
590                 continue;
591             }
592             std::string localVersion;
593             CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD, extend, localVersion);
594             std::string cloudVersion;
595             CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD, cloudExtend,
596                 cloudVersion);
597             if (localVersion != cloudVersion) {
598                 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_VERSION_CONFLICT);
599                 return CLOUD_VERSION_CONFLICT;
600             }
601         }
602         return OK;
603     });
604 
605     /**
606      * @tc.steps:step3. sync and check local data
607      * @tc.expected: step3. return ok.
608      */
609     CallSync(option);
610     sql = "select count(*) from " + ASSETS_TABLE_NAME + " where name = 'name30' AND id = '20';";
611     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
612         reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
613     for (const auto &table : g_syncProcess.tableProcess) {
614         EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
615     }
616 }
617 
618 /**
619  * @tc.name: QueryCursorTest001
620  * @tc.desc: Test cursor after querying no data
621  * @tc.type: FUNC
622  * @tc.require:
623  * @tc.author: bty
624  */
625 HWTEST_F(DistributedDBCloudSyncerLockTest, QueryCursorTest001, TestSize.Level0)
626 {
627     /**
628      * @tc.steps:step1. init data and Query with cursor tha exceeds range
629      * @tc.expected: step1. return ok.
630      */
631     int cloudCount = 20;
632     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
633     VBucket extend;
634     extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(30);
635     std::vector<VBucket> data;
636 
637     /**
638      * @tc.steps:step2. check cursor output param
639      * @tc.expected: step2. return QUERY_END.
640      */
641     EXPECT_EQ(g_virtualCloudDb->Query(ASSETS_TABLE_NAME, extend, data), QUERY_END);
642     EXPECT_EQ(std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]), std::to_string(cloudCount));
643 }
644 
645 /**
646  * @tc.name: QueryCursorTest002
647  * @tc.desc: Test cursor in conditional query sync
648  * @tc.type: FUNC
649  * @tc.require:
650  * @tc.author: bty
651  */
652 HWTEST_F(DistributedDBCloudSyncerLockTest, QueryCursorTest002, TestSize.Level0)
653 {
654     /**
655      * @tc.steps:step1. init data
656      * @tc.expected: step1. return ok.
657      */
658     int count = 10;
659     InsertCloudDBData(0, count, 0, ASSETS_TABLE_NAME);
660     InsertLocalData(0, count, ASSETS_TABLE_NAME, true);
661     std::vector<int> idVec = {2, 3};
662     CloudSyncOption option = PrepareOption(Query::Select().From(ASSETS_TABLE_NAME).In("id", idVec),
663         LockAction::DOWNLOAD, true);
664     int index = 0;
665 
666     /**
667      * @tc.steps:step2. sync and check cursor
668      * @tc.expected: step2. return ok.
669      */
__anon6ad9bb3f0802(const std::string &, VBucket &extend) 670     g_virtualCloudDb->ForkQuery([&index](const std::string &, VBucket &extend) {
671         if (index == 1) {
672             std::string cursor;
673             CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::CURSOR_FIELD, extend, cursor);
674             EXPECT_EQ(cursor, std::string(""));
675         }
676         index++;
677     });
678     CallSync(option);
679 }
680 
681 /**
682  * @tc.name: RecordConflictTest001
683  * @tc.desc: Test the asset input param after download return CLOUD_RECORD_EXIST_CONFLICT
684  * @tc.type: FUNC
685  * @tc.require:
686  * @tc.author: bty
687  */
688 HWTEST_F(DistributedDBCloudSyncerLockTest, RecordConflictTest001, TestSize.Level0)
689 {
690     /**
691      * @tc.steps:step1. init data and sync
692      * @tc.expected: step1. return ok.
693      */
694     int count = 10;
695     InsertCloudDBData(0, count, 0, ASSETS_TABLE_NAME);
696     g_virtualAssetLoader->SetDownloadStatus(DBStatus::CLOUD_RECORD_EXIST_CONFLICT);
697     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
698     int callCount = 0;
__anon6ad9bb3f0902() 699     g_cloudStoreHook->SetSyncFinishHook([&callCount]() {
700         callCount++;
701         g_processCondition.notify_all();
702     });
703     CallSync(option);
704     {
705         std::unique_lock<std::mutex> lock(g_processMutex);
706         bool result = g_processCondition.wait_for(lock, std::chrono::seconds(WAIT_TIME),
__anon6ad9bb3f0a02() 707             [&callCount]() { return callCount == 2; }); // 2 is compensated sync
708         ASSERT_EQ(result, true);
709     }
710 
711     /**
712      * @tc.steps:step2. sync again and check asset
713      * @tc.expected: step2. return ok.
714      */
715     g_virtualAssetLoader->SetDownloadStatus(DBStatus::OK);
__anon6ad9bb3f0b02(std::map<std::string, Assets> &assets) 716     g_virtualAssetLoader->ForkDownload([](std::map<std::string, Assets> &assets) {
717         EXPECT_EQ(assets.find(COL_ASSET) != assets.end(), true);
718     });
719     CallSync(option);
720     {
721         std::unique_lock<std::mutex> lock(g_processMutex);
722         bool result = g_processCondition.wait_for(lock, std::chrono::seconds(WAIT_TIME),
__anon6ad9bb3f0c02() 723             [&callCount]() { return callCount == 4; }); // 4 is compensated sync
724         ASSERT_EQ(result, true);
725     }
726     g_cloudStoreHook->SetSyncFinishHook(nullptr);
727 }
728 
729 /**
730  * @tc.name: QueryCursorTest003
731  * @tc.desc: Test whether cursor fallback
732  * @tc.type: FUNC
733  * @tc.require:
734  * @tc.author: bty
735  */
736 HWTEST_F(DistributedDBCloudSyncerLockTest, QueryCursorTest003, TestSize.Level0)
737 {
738     /**
739      * @tc.steps:step1. init cloud data and sync
740      * @tc.expected: step1. return ok.
741      */
742     int cloudCount = 10;
743     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
744     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
745     CallSync(option);
746 
747     /**
748      * @tc.steps:step2. delete cloud data and sync
749      * @tc.expected: step2. return ok.
750      */
751     DeleteCloudDBData(0, cloudCount, ASSETS_TABLE_NAME);
752     CallSync(option);
753 
754     /**
755      * @tc.steps:step3. remove data
756      * @tc.expected: step3. return ok.
757      */
758     std::string device = "";
759     ASSERT_EQ(g_delegate->RemoveDeviceData(device, DistributedDB::FLAG_ONLY), DBStatus::OK);
760 
761     /**
762      * @tc.steps:step4. insert local and check cursor
763      * @tc.expected: step4. return ok.
764      */
765     InsertLocalData(0, 1, ASSETS_TABLE_NAME, true);
766     std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
767         " where cursor='31';";
768     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
769         reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
770 }
771 
772 /**
773  * @tc.name: QueryCursorTest004
774  * @tc.desc: Test temp trigger under concurrency
775  * @tc.type: FUNC
776  * @tc.require:
777  * @tc.author: bty
778  */
779 HWTEST_F(DistributedDBCloudSyncerLockTest, QueryCursorTest004, TestSize.Level0)
780 {
781     /**
782      * @tc.steps:step1. init cloud data
783      * @tc.expected: step1. return ok.
784      */
785     int cloudCount = 10;
786     InsertLocalData(0, cloudCount, ASSETS_TABLE_NAME, true);
787     InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
788 
789     /**
790      * @tc.steps:step2. set tracker table before saving cloud data
791      * @tc.expected: step2. return ok.
792      */
__anon6ad9bb3f0d02(const std::string &table, VBucket &) 793     g_virtualCloudDb->ForkQuery([](const std::string &table, VBucket &) {
794         TrackerSchema schema = {
795             .tableName = ASSETS_TABLE_NAME, .extendColName = COL_NAME, .trackerColNames = { COL_ID }
796         };
797         EXPECT_EQ(g_delegate->SetTrackerTable(schema), WITH_INVENTORY_DATA);
798     });
799     CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
800     CallSync(option);
801 
802     /**
803      * @tc.steps:step3. check extend_field and cursor
804      * @tc.expected: step3. return ok.
805      */
806     std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
807         " where data_key='0' and extend_field='name10' and cursor='32';";
808     EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
809         reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
810 }
811 } // namespace
812 #endif // RELATIONAL_STORE