1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "cloud/asset_operation_utils.h"
17 #include "cloud/cloud_storage_utils.h"
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud_db_sync_utils_test.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "process_system_api_adapter_impl.h"
23 #include "relational_store_client.h"
24 #include "relational_store_instance.h"
25 #include "relational_store_manager.h"
26 #include "runtime_config.h"
27 #include "sqlite_relational_store.h"
28 #include "sqlite_relational_utils.h"
29 #include "time_helper.h"
30 #include "virtual_asset_loader.h"
31 #include "virtual_cloud_data_translate.h"
32 #include "virtual_cloud_db.h"
33 #include "virtual_communicator_aggregator.h"
34 #include <gtest/gtest.h>
35 #include <iostream>
36
37 using namespace testing::ext;
38 using namespace DistributedDB;
39 using namespace DistributedDBUnitTest;
40 using namespace std;
41
42 namespace {
43 const string STORE_ID = "Relational_Store_Lock_Sync";
44 const string DB_SUFFIX = ".db";
45 const string ASSETS_TABLE_NAME = "student";
46 const string ASSETS_TABLE_NAME_SHARED = "student_shared";
47 const string DEVICE_CLOUD = "cloud_dev";
48 const string COL_ID = "id";
49 const string COL_NAME = "name";
50 const string COL_HEIGHT = "height";
51 const string COL_ASSET = "asset";
52 const string COL_ASSETS = "assets";
53 const string COL_AGE = "age";
54 const int64_t WAIT_TIME = 5;
55 const std::vector<Field> CLOUD_FIELDS = {{COL_ID, TYPE_INDEX<int64_t>, true}, {COL_NAME, TYPE_INDEX<std::string>},
56 {COL_ASSET, TYPE_INDEX<Asset>}, {COL_ASSETS, TYPE_INDEX<Assets>}};
57 const string CREATE_SINGLE_PRIMARY_KEY_TABLE = "CREATE TABLE IF NOT EXISTS " + ASSETS_TABLE_NAME + "(" + COL_ID +
58 " INTEGER PRIMARY KEY," + COL_NAME + " TEXT ," + COL_ASSET + " ASSET," + COL_ASSETS + " ASSETS" + ");";
59 const Asset ASSET_COPY = {.version = 1,
60 .name = "Phone",
61 .assetId = "0",
62 .subpath = "/local/sync",
63 .uri = "/local/sync",
64 .modifyTime = "123456",
65 .createTime = "",
66 .size = "256",
67 .hash = "ASE"};
68 const Assets ASSETS_COPY1 = { ASSET_COPY };
69 const string ASSET_SUFFIX = "_copy";
70
71 string g_storePath;
72 string g_testDir;
73 RelationalStoreObserverUnitTest *g_observer = nullptr;
74 DistributedDB::RelationalStoreManager g_mgr(APP_ID, USER_ID);
75 RelationalStoreDelegate *g_delegate = nullptr;
76 std::shared_ptr<VirtualCloudDb> g_virtualCloudDb;
77 std::shared_ptr<VirtualAssetLoader> g_virtualAssetLoader;
78 std::shared_ptr<VirtualCloudDataTranslate> g_virtualCloudDataTranslate;
79 SyncProcess g_syncProcess;
80 std::condition_variable g_processCondition;
81 std::mutex g_processMutex;
82 IRelationalStore *g_store = nullptr;
83 ICloudSyncStorageHook *g_cloudStoreHook = nullptr;
84 int64_t g_nameId;
85 using CloudSyncStatusCallback = std::function<void(const std::map<std::string, SyncProcess> &onProcess)>;
86
GetCloudDbSchema(DataBaseSchema & dataBaseSchema)87 void GetCloudDbSchema(DataBaseSchema &dataBaseSchema)
88 {
89 TableSchema assetsTableSchema = {.name = ASSETS_TABLE_NAME, .sharedTableName = ASSETS_TABLE_NAME_SHARED,
90 .fields = CLOUD_FIELDS};
91 dataBaseSchema.tables.push_back(assetsTableSchema);
92 }
93
CloseDb()94 void CloseDb()
95 {
96 delete g_observer;
97 g_virtualCloudDb = nullptr;
98 if (g_delegate != nullptr) {
99 EXPECT_EQ(g_mgr.CloseStore(g_delegate), DBStatus::OK);
100 g_delegate = nullptr;
101 }
102 }
103
104 class DistributedDBCloudSyncerLockTest : public testing::Test {
105 public:
106 static void SetUpTestCase(void);
107 static void TearDownTestCase(void);
108 void SetUp();
109 void TearDown();
110
111 protected:
112 void Init();
113 const RelationalSyncAbleStorage *GetRelationalStore();
114 void InsertLocalData(int64_t begin, int64_t count, const std::string &tableName, bool isAssetNull = true);
115 void GenerateDataRecords(
116 int64_t begin, int64_t count, int64_t gidStart, std::vector<VBucket> &record, std::vector<VBucket> &extend);
117 void InsertCloudDBData(int64_t begin, int64_t count, int64_t gidStart, const std::string &tableName);
118 void UpdateCloudDBData(int64_t begin, int64_t count, int64_t gidStart, int64_t versionStart,
119 const std::string &tableName);
120 void DeleteCloudDBData(int64_t beginGid, int64_t count, const std::string &tableName);
121 void CallSync(const CloudSyncOption &option, DBStatus expectResult = OK);
122
123 void TestConflictSync001(bool isUpdate);
124 void CheckAssetStatusNormal();
125 void UpdateCloudAssets(Asset &asset, Assets &assets, const std::string &version);
126 void CheckUploadAbnormal(OpType opType, int64_t expCnt, bool isCompensated = false);
127 sqlite3 *db = nullptr;
128 VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
129 };
130
SetUpTestCase(void)131 void DistributedDBCloudSyncerLockTest::SetUpTestCase(void)
132 {
133 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
134 g_storePath = g_testDir + "/" + STORE_ID + DB_SUFFIX;
135 LOGI("The test db is:%s", g_storePath.c_str());
136 g_virtualCloudDataTranslate = std::make_shared<VirtualCloudDataTranslate>();
137 RuntimeConfig::SetCloudTranslate(g_virtualCloudDataTranslate);
138 }
139
TearDownTestCase(void)140 void DistributedDBCloudSyncerLockTest::TearDownTestCase(void) {}
141
SetUp(void)142 void DistributedDBCloudSyncerLockTest::SetUp(void)
143 {
144 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
145 LOGE("rm test db files error.");
146 }
147 DistributedDBToolsUnitTest::PrintTestCaseInfo();
148 LOGD("Test dir is %s", g_testDir.c_str());
149 Init();
150 g_cloudStoreHook = (ICloudSyncStorageHook *) GetRelationalStore();
151 ASSERT_NE(g_cloudStoreHook, nullptr);
152 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
153 ASSERT_TRUE(communicatorAggregator_ != nullptr);
154 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
155 }
156
TearDown(void)157 void DistributedDBCloudSyncerLockTest::TearDown(void)
158 {
159 RefObject::DecObjRef(g_store);
160 g_virtualCloudDb->ForkUpload(nullptr);
161 CloseDb();
162 EXPECT_EQ(sqlite3_close_v2(db), SQLITE_OK);
163 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
164 LOGE("rm test db files error.");
165 }
166 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
167 communicatorAggregator_ = nullptr;
168 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
169 }
170
Init()171 void DistributedDBCloudSyncerLockTest::Init()
172 {
173 db = RelationalTestUtils::CreateDataBase(g_storePath);
174 ASSERT_NE(db, nullptr);
175 EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
176 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_SINGLE_PRIMARY_KEY_TABLE), SQLITE_OK);
177 g_observer = new (std::nothrow) RelationalStoreObserverUnitTest();
178 ASSERT_NE(g_observer, nullptr);
179 ASSERT_EQ(g_mgr.OpenStore(g_storePath, STORE_ID, RelationalStoreDelegate::Option{.observer = g_observer},
180 g_delegate), DBStatus::OK);
181 ASSERT_NE(g_delegate, nullptr);
182 ASSERT_EQ(g_delegate->CreateDistributedTable(ASSETS_TABLE_NAME, CLOUD_COOPERATION), DBStatus::OK);
183 g_virtualCloudDb = make_shared<VirtualCloudDb>();
184 g_virtualAssetLoader = make_shared<VirtualAssetLoader>();
185 g_syncProcess = {};
186 ASSERT_EQ(g_delegate->SetCloudDB(g_virtualCloudDb), DBStatus::OK);
187 ASSERT_EQ(g_delegate->SetIAssetLoader(g_virtualAssetLoader), DBStatus::OK);
188 DataBaseSchema dataBaseSchema;
189 GetCloudDbSchema(dataBaseSchema);
190 ASSERT_EQ(g_delegate->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
191 g_nameId = 0;
192 }
193
GetRelationalStore()194 const RelationalSyncAbleStorage* DistributedDBCloudSyncerLockTest::GetRelationalStore()
195 {
196 RelationalDBProperties properties;
197 CloudDBSyncUtilsTest::InitStoreProp(g_storePath, APP_ID, USER_ID, STORE_ID, properties);
198 int errCode = E_OK;
199 g_store = RelationalStoreInstance::GetDataBase(properties, errCode);
200 if (g_store == nullptr) {
201 return nullptr;
202 }
203 return static_cast<SQLiteRelationalStore *>(g_store)->GetStorageEngine();
204 }
205
206
GenerateDataRecords(int64_t begin,int64_t count,int64_t gidStart,std::vector<VBucket> & record,std::vector<VBucket> & extend)207 void DistributedDBCloudSyncerLockTest::GenerateDataRecords(
208 int64_t begin, int64_t count, int64_t gidStart, std::vector<VBucket> &record, std::vector<VBucket> &extend)
209 {
210 for (int64_t i = begin; i < begin + count; i++) {
211 Assets assets;
212 Asset asset = ASSET_COPY;
213 asset.name = ASSET_COPY.name + std::to_string(i);
214 assets.emplace_back(asset);
215 VBucket data;
216 data.insert_or_assign(COL_ASSET, asset);
217 asset.name = ASSET_COPY.name + std::to_string(i) + "_copy";
218 assets.emplace_back(asset);
219 data.insert_or_assign(COL_ID, i);
220 data.insert_or_assign(COL_NAME, "name" + std::to_string(g_nameId++));
221 data.insert_or_assign(COL_ASSETS, assets);
222 record.push_back(data);
223
224 VBucket log;
225 Timestamp now = TimeHelper::GetSysCurrentTime();
226 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
227 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
228 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
229 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(i + gidStart));
230 extend.push_back(log);
231 }
232 }
233
InsertLocalData(int64_t begin,int64_t count,const std::string & tableName,bool isAssetNull)234 void DistributedDBCloudSyncerLockTest::InsertLocalData(int64_t begin, int64_t count,
235 const std::string &tableName, bool isAssetNull)
236 {
237 int errCode;
238 std::vector<VBucket> record;
239 std::vector<VBucket> extend;
240 GenerateDataRecords(begin, count, 0, record, extend);
241 const string sql = "insert or replace into " + tableName + " values (?,?,?,?);";
242 for (VBucket vBucket : record) {
243 sqlite3_stmt *stmt = nullptr;
244 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
245 ASSERT_EQ(SQLiteUtils::BindInt64ToStatement(stmt, 1, std::get<int64_t>(vBucket[COL_ID])), E_OK); // 1 is id
246 ASSERT_EQ(SQLiteUtils::BindTextToStatement(stmt, 2, std::get<string>(vBucket[COL_NAME])), E_OK); // 2 is name
247 if (isAssetNull) {
248 ASSERT_EQ(sqlite3_bind_null(stmt, 3), SQLITE_OK); // 3 is asset
249 } else {
250 std::vector<uint8_t> assetBlob = g_virtualCloudDataTranslate->AssetToBlob(ASSET_COPY);
251 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 3, assetBlob, false), E_OK); // 3 is asset
252 }
253 std::vector<uint8_t> assetsBlob = g_virtualCloudDataTranslate->AssetsToBlob(
254 std::get<Assets>(vBucket[COL_ASSETS]));
255 ASSERT_EQ(SQLiteUtils::BindBlobToStatement(stmt, 4, assetsBlob, false), E_OK); // 4 is assets
256 EXPECT_EQ(SQLiteUtils::StepWithRetry(stmt), SQLiteUtils::MapSQLiteErrno(SQLITE_DONE));
257 SQLiteUtils::ResetStatement(stmt, true, errCode);
258 }
259 }
260
InsertCloudDBData(int64_t begin,int64_t count,int64_t gidStart,const std::string & tableName)261 void DistributedDBCloudSyncerLockTest::InsertCloudDBData(int64_t begin, int64_t count, int64_t gidStart,
262 const std::string &tableName)
263 {
264 std::this_thread::sleep_for(std::chrono::milliseconds(1));
265 std::vector<VBucket> record;
266 std::vector<VBucket> extend;
267 GenerateDataRecords(begin, count, gidStart, record, extend);
268 ASSERT_EQ(g_virtualCloudDb->BatchInsertWithGid(tableName, std::move(record), extend), DBStatus::OK);
269 std::this_thread::sleep_for(std::chrono::milliseconds(1));
270 }
271
UpdateCloudDBData(int64_t begin,int64_t count,int64_t gidStart,int64_t versionStart,const std::string & tableName)272 void DistributedDBCloudSyncerLockTest::UpdateCloudDBData(int64_t begin, int64_t count, int64_t gidStart,
273 int64_t versionStart, const std::string &tableName)
274 {
275 std::this_thread::sleep_for(std::chrono::milliseconds(1));
276 std::vector<VBucket> record;
277 std::vector<VBucket> extend;
278 GenerateDataRecords(begin, count, gidStart, record, extend);
279 for (auto &entry: extend) {
280 entry[CloudDbConstant::VERSION_FIELD] = std::to_string(versionStart++);
281 }
282 ASSERT_EQ(g_virtualCloudDb->BatchUpdate(tableName, std::move(record), extend), DBStatus::OK);
283 std::this_thread::sleep_for(std::chrono::milliseconds(1));
284 }
285
DeleteCloudDBData(int64_t beginGid,int64_t count,const std::string & tableName)286 void DistributedDBCloudSyncerLockTest::DeleteCloudDBData(int64_t beginGid, int64_t count,
287 const std::string &tableName)
288 {
289 Timestamp now = TimeHelper::GetSysCurrentTime();
290 std::vector<VBucket> extend;
291 for (int64_t i = 0; i < count; ++i) {
292 VBucket log;
293 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
294 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND + i);
295 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(beginGid + i));
296 extend.push_back(log);
297 }
298 ASSERT_EQ(g_virtualCloudDb->BatchDelete(tableName, extend), DBStatus::OK);
299 std::this_thread::sleep_for(std::chrono::milliseconds(count));
300 }
301
PrepareOption(const Query & query,LockAction action,bool isPriorityTask=false,bool isCompensatedSyncOnly=false)302 CloudSyncOption PrepareOption(const Query &query, LockAction action, bool isPriorityTask = false,
303 bool isCompensatedSyncOnly = false)
304 {
305 CloudSyncOption option;
306 option.devices = { "CLOUD" };
307 option.mode = SYNC_MODE_CLOUD_MERGE;
308 option.query = query;
309 option.waitTime = WAIT_TIME;
310 option.priorityTask = isPriorityTask;
311 option.compensatedSyncOnly = isCompensatedSyncOnly;
312 option.lockAction = action;
313 return option;
314 }
315
CallSync(const CloudSyncOption & option,DBStatus expectResult)316 void DistributedDBCloudSyncerLockTest::CallSync(const CloudSyncOption &option, DBStatus expectResult)
317 {
318 std::mutex dataMutex;
319 std::condition_variable cv;
320 bool finish = false;
321 SyncProcess last;
322 auto callback = [&last, &cv, &dataMutex, &finish](const std::map<std::string, SyncProcess> &process) {
323 for (const auto &item: process) {
324 if (item.second.process == DistributedDB::FINISHED) {
325 {
326 std::lock_guard<std::mutex> autoLock(dataMutex);
327 finish = true;
328 last = item.second;
329 }
330 cv.notify_one();
331 }
332 }
333 };
334 ASSERT_EQ(g_delegate->Sync(option, callback), expectResult);
335 if (expectResult == OK) {
336 std::unique_lock<std::mutex> uniqueLock(dataMutex);
337 cv.wait(uniqueLock, [&finish]() {
338 return finish;
339 });
340 }
341 g_syncProcess = last;
342 }
343
TestConflictSync001(bool isUpdate)344 void DistributedDBCloudSyncerLockTest::TestConflictSync001(bool isUpdate)
345 {
346 /**
347 * @tc.steps:step1. init data and sync
348 * @tc.expected: step1. return ok.
349 */
350 int cloudCount = 20;
351 int localCount = 10;
352 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
353 InsertLocalData(0, localCount, ASSETS_TABLE_NAME, true);
354 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
355 CallSync(option);
356
357 /**
358 * @tc.steps:step2. update local data to upload, and set hook before upload, operator cloud data which id is 1
359 * @tc.expected: step2. return ok.
360 */
361 std::string sql;
362 if (isUpdate) {
363 sql = "update " + ASSETS_TABLE_NAME + " set name = 'xxx' where id = 1;";
364 } else {
365 sql = "delete from " + ASSETS_TABLE_NAME + " where id = 1;";
366 }
367 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
368 int index = 0;
369 g_cloudStoreHook->SetDoUploadHook([&index, this]() {
370 if (++index == 1) {
371 UpdateCloudDBData(1, 1, 0, 21, ASSETS_TABLE_NAME); // 21 is version
372 }
373 });
374
375 /**
376 * @tc.steps:step3. sync and check local data
377 * @tc.expected: step3. return ok.
378 */
379 CallSync(option);
380 sql = "select count(*) from " + ASSETS_TABLE_NAME + " where name = 'name30' AND id = '1';";
381 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
382 reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
383 }
384
CheckAssetStatusNormal()385 void DistributedDBCloudSyncerLockTest::CheckAssetStatusNormal()
386 {
387 std::string sql = "SELECT asset, assets FROM " + ASSETS_TABLE_NAME + ";";
388 sqlite3_stmt *stmt = nullptr;
389 ASSERT_EQ(SQLiteUtils::GetStatement(db, sql, stmt), E_OK);
390 while (SQLiteUtils::StepWithRetry(stmt) != SQLiteUtils::MapSQLiteErrno(SQLITE_DONE)) {
391 ASSERT_EQ(sqlite3_column_type(stmt, 0), SQLITE_BLOB);
392 ASSERT_EQ(sqlite3_column_type(stmt, 1), SQLITE_BLOB);
393 Type assetBlob;
394 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Asset>, 0, assetBlob), E_OK);
395 Asset asset = g_virtualCloudDataTranslate->BlobToAsset(std::get<Bytes>(assetBlob));
396 EXPECT_EQ(asset.status, static_cast<uint32_t>(AssetStatus::NORMAL));
397 Type assetsBlob;
398 ASSERT_EQ(SQLiteRelationalUtils::GetCloudValueByType(stmt, TYPE_INDEX<Assets>, 0, assetsBlob), E_OK);
399 Assets assets = g_virtualCloudDataTranslate->BlobToAssets(std::get<Bytes>(assetsBlob));
400 for (const auto &as : assets) {
401 EXPECT_EQ(as.status, static_cast<uint32_t>(AssetStatus::NORMAL));
402 }
403 }
404 int errCode = E_OK;
405 SQLiteUtils::ResetStatement(stmt, true, errCode);
406 }
407
UpdateCloudAssets(Asset & asset,Assets & assets,const std::string & version)408 void DistributedDBCloudSyncerLockTest::UpdateCloudAssets(Asset &asset, Assets &assets, const std::string &version)
409 {
410 std::this_thread::sleep_for(std::chrono::milliseconds(1));
411 VBucket data;
412 std::vector<VBucket> record;
413 std::vector<VBucket> extend;
414 asset.name.empty() ? data.insert_or_assign(COL_ASSET, Nil()) : data.insert_or_assign(COL_ASSET, asset);
415 data.insert_or_assign(COL_ID, 0L);
416 data.insert_or_assign(COL_NAME, "name" + std::to_string(g_nameId++));
417 assets.empty() ? data.insert_or_assign(COL_ASSETS, Nil()) : data.insert_or_assign(COL_ASSETS, assets);
418 record.push_back(data);
419 VBucket log;
420 Timestamp now = TimeHelper::GetSysCurrentTime();
421 log.insert_or_assign(CloudDbConstant::CREATE_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
422 log.insert_or_assign(CloudDbConstant::MODIFY_FIELD, (int64_t)now / CloudDbConstant::TEN_THOUSAND);
423 log.insert_or_assign(CloudDbConstant::DELETE_FIELD, false);
424 log.insert_or_assign(CloudDbConstant::GID_FIELD, std::to_string(0));
425 log.insert_or_assign(CloudDbConstant::VERSION_FIELD, version);
426 extend.push_back(log);
427 ASSERT_EQ(g_virtualCloudDb->BatchUpdate(ASSETS_TABLE_NAME, std::move(record), extend), DBStatus::OK);
428 std::this_thread::sleep_for(std::chrono::milliseconds(1));
429 }
430
CheckUploadAbnormal(OpType opType,int64_t expCnt,bool isCompensated)431 void DistributedDBCloudSyncerLockTest::CheckUploadAbnormal(OpType opType, int64_t expCnt, bool isCompensated)
432 {
433 std::string sql = "SELECT count(*) FROM " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) + " WHERE ";
434 switch (opType) {
435 case OpType::INSERT:
436 sql += isCompensated ? " cloud_gid != '' AND version !='' AND flag&0x10=0" :
437 " cloud_gid != '' AND version !='' AND flag=flag|0x10";
438 break;
439 case OpType::UPDATE:
440 sql += isCompensated ? " cloud_gid != '' AND version !='' AND flag&0x10=0" :
441 " cloud_gid == '' AND version =='' AND flag=flag|0x10";
442 break;
443 case OpType::DELETE:
444 sql += " cloud_gid == '' AND version ==''";
445 break;
446 default:
447 break;
448 }
449 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
450 reinterpret_cast<void *>(expCnt), nullptr), SQLITE_OK);
451 }
452
453 /**
454 * @tc.name: RDBUnlockCloudSync001
455 * @tc.desc: Test sync with no lock
456 * @tc.type: FUNC
457 * @tc.require:
458 * @tc.author: bty
459 */
460 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBUnlockCloudSync001, TestSize.Level0)
461 {
462 /**
463 * @tc.steps:step1. init data and sync with none lock
464 * @tc.expected: step1. return ok.
465 */
466 int cloudCount = 20;
467 int localCount = 10;
468 InsertLocalData(0, cloudCount, ASSETS_TABLE_NAME, true);
469 InsertCloudDBData(0, localCount, 0, ASSETS_TABLE_NAME);
470 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::NONE);
471 CallSync(option);
472
473 /**
474 * @tc.steps:step2. insert or replace, check version
475 * @tc.expected: step2. return ok.
476 */
477 std::string sql = "INSERT OR REPLACE INTO " + ASSETS_TABLE_NAME + " VALUES('0', 'XX', '', '');";
478 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
479 sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
480 " where version != '' and version is not null;";
481 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
482 reinterpret_cast<void *>(cloudCount), nullptr), SQLITE_OK);
483 }
484
485 /**
486 * @tc.name: RDBConflictCloudSync001
487 * @tc.desc: Both cloud and local are available, local version is empty, with cloud updates before upload
488 * @tc.type: FUNC
489 * @tc.require:
490 * @tc.author: bty
491 */
492 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBConflictCloudSync001, TestSize.Level0)
493 {
494 /**
495 * @tc.steps:step1. init data and set hook before upload, update cloud data which gid is 1
496 * @tc.expected: step1. return ok.
497 */
498 int cloudCount = 20;
499 int localCount = 10;
500 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
501 InsertLocalData(0, localCount, ASSETS_TABLE_NAME, true);
502 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
503 int index = 0;
__anon6ad9bb3f0502() 504 g_cloudStoreHook->SetDoUploadHook([&index, this]() {
505 if (++index == 1) {
506 UpdateCloudDBData(1, 1, 0, 1, ASSETS_TABLE_NAME);
507 }
508 });
509
510 /**
511 * @tc.steps:step2. sync and check local data
512 * @tc.expected: step2. return ok.
513 */
514 CallSync(option);
515 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
516 " where flag&0x02=0 AND version='20' AND cloud_gid = '1';";
517 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
518 reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
519 }
520
521 /**
522 * @tc.name: RDBConflictCloudSync002
523 * @tc.desc: Both cloud and local are available, with cloud updates before upload
524 * @tc.type: FUNC
525 * @tc.require:
526 * @tc.author: bty
527 */
528 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBConflictCloudSync002, TestSize.Level0)
529 {
530 TestConflictSync001(true);
531 }
532
533 /**
534 * @tc.name: RDBConflictCloudSync003
535 * @tc.desc: Both cloud and local are available, with cloud deletes before upload
536 * @tc.type: FUNC
537 * @tc.require:
538 * @tc.author: bty
539 */
540 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBConflictCloudSync003, TestSize.Level0)
541 {
542 TestConflictSync001(false);
543 }
544
545 /**
546 * @tc.name: RDBConflictCloudSync003
547 * @tc.desc: Both cloud and local are available, with cloud inserts before upload
548 * @tc.type: FUNC
549 * @tc.require:
550 * @tc.author: bty
551 */
552 HWTEST_F(DistributedDBCloudSyncerLockTest, RDBConflictCloudSync004, TestSize.Level0)
553 {
554 /**
555 * @tc.steps:step1. init data and sync
556 * @tc.expected: step1. return ok.
557 */
558 int cloudCount = 20;
559 int localCount = 10;
560 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
561 InsertLocalData(0, localCount, ASSETS_TABLE_NAME, true);
562 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
563 CallSync(option);
564
565 /**
566 * @tc.steps:step2. insert local data and set hook before upload, insert cloud data which id is 20
567 * @tc.expected: step2. return ok.
568 */
569 std::string sql = "INSERT INTO " + ASSETS_TABLE_NAME + " VALUES('20', 'XXX', NULL, NULL);";
570 EXPECT_EQ(RelationalTestUtils::ExecSql(db, sql.c_str()), SQLITE_OK);
571 int index = 0;
__anon6ad9bb3f0602() 572 g_cloudStoreHook->SetDoUploadHook([&index, cloudCount, this]() {
573 if (++index == 1) {
574 InsertCloudDBData(cloudCount, 1, cloudCount, ASSETS_TABLE_NAME);
575 }
576 });
577
578 /**
579 * @tc.steps:step3. set hook for batch insert, return CLOUD_VERSION_CONFLICT err
580 * @tc.expected: step3. return ok.
581 */
582 g_virtualCloudDb->ForkInsertConflict([](const std::string &tableName, VBucket &extend, VBucket &record,
__anon6ad9bb3f0702(const std::string &tableName, VBucket &extend, VBucket &record, std::vector<VirtualCloudDb::CloudData> &cloudDataVec) 583 std::vector<VirtualCloudDb::CloudData> &cloudDataVec) {
584 for (auto &[cloudRecord, cloudExtend]: cloudDataVec) {
585 int64_t cloudPk;
586 CloudStorageUtils::GetValueFromVBucket<int64_t>(COL_ID, record, cloudPk);
587 int64_t localPk;
588 CloudStorageUtils::GetValueFromVBucket<int64_t>(COL_ID, cloudRecord, localPk);
589 if (cloudPk != localPk) {
590 continue;
591 }
592 std::string localVersion;
593 CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD, extend, localVersion);
594 std::string cloudVersion;
595 CloudStorageUtils::GetValueFromVBucket<std::string>(CloudDbConstant::VERSION_FIELD, cloudExtend,
596 cloudVersion);
597 if (localVersion != cloudVersion) {
598 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_VERSION_CONFLICT);
599 return CLOUD_VERSION_CONFLICT;
600 }
601 }
602 return OK;
603 });
604
605 /**
606 * @tc.steps:step3. sync and check local data
607 * @tc.expected: step3. return ok.
608 */
609 CallSync(option);
610 sql = "select count(*) from " + ASSETS_TABLE_NAME + " where name = 'name30' AND id = '20';";
611 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
612 reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
613 for (const auto &table : g_syncProcess.tableProcess) {
614 EXPECT_EQ(table.second.upLoadInfo.failCount, 0u);
615 }
616 }
617
618 /**
619 * @tc.name: QueryCursorTest001
620 * @tc.desc: Test cursor after querying no data
621 * @tc.type: FUNC
622 * @tc.require:
623 * @tc.author: bty
624 */
625 HWTEST_F(DistributedDBCloudSyncerLockTest, QueryCursorTest001, TestSize.Level0)
626 {
627 /**
628 * @tc.steps:step1. init data and Query with cursor tha exceeds range
629 * @tc.expected: step1. return ok.
630 */
631 int cloudCount = 20;
632 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
633 VBucket extend;
634 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(30);
635 std::vector<VBucket> data;
636
637 /**
638 * @tc.steps:step2. check cursor output param
639 * @tc.expected: step2. return QUERY_END.
640 */
641 EXPECT_EQ(g_virtualCloudDb->Query(ASSETS_TABLE_NAME, extend, data), QUERY_END);
642 EXPECT_EQ(std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]), std::to_string(cloudCount));
643 }
644
645 /**
646 * @tc.name: QueryCursorTest002
647 * @tc.desc: Test cursor in conditional query sync
648 * @tc.type: FUNC
649 * @tc.require:
650 * @tc.author: bty
651 */
652 HWTEST_F(DistributedDBCloudSyncerLockTest, QueryCursorTest002, TestSize.Level0)
653 {
654 /**
655 * @tc.steps:step1. init data
656 * @tc.expected: step1. return ok.
657 */
658 int count = 10;
659 InsertCloudDBData(0, count, 0, ASSETS_TABLE_NAME);
660 InsertLocalData(0, count, ASSETS_TABLE_NAME, true);
661 std::vector<int> idVec = {2, 3};
662 CloudSyncOption option = PrepareOption(Query::Select().From(ASSETS_TABLE_NAME).In("id", idVec),
663 LockAction::DOWNLOAD, true);
664 int index = 0;
665
666 /**
667 * @tc.steps:step2. sync and check cursor
668 * @tc.expected: step2. return ok.
669 */
__anon6ad9bb3f0802(const std::string &, VBucket &extend) 670 g_virtualCloudDb->ForkQuery([&index](const std::string &, VBucket &extend) {
671 if (index == 1) {
672 std::string cursor;
673 CloudStorageUtils::GetValueFromVBucket(CloudDbConstant::CURSOR_FIELD, extend, cursor);
674 EXPECT_EQ(cursor, std::string(""));
675 }
676 index++;
677 });
678 CallSync(option);
679 }
680
681 /**
682 * @tc.name: RecordConflictTest001
683 * @tc.desc: Test the asset input param after download return CLOUD_RECORD_EXIST_CONFLICT
684 * @tc.type: FUNC
685 * @tc.require:
686 * @tc.author: bty
687 */
688 HWTEST_F(DistributedDBCloudSyncerLockTest, RecordConflictTest001, TestSize.Level0)
689 {
690 /**
691 * @tc.steps:step1. init data and sync
692 * @tc.expected: step1. return ok.
693 */
694 int count = 10;
695 InsertCloudDBData(0, count, 0, ASSETS_TABLE_NAME);
696 g_virtualAssetLoader->SetDownloadStatus(DBStatus::CLOUD_RECORD_EXIST_CONFLICT);
697 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
698 int callCount = 0;
__anon6ad9bb3f0902() 699 g_cloudStoreHook->SetSyncFinishHook([&callCount]() {
700 callCount++;
701 g_processCondition.notify_all();
702 });
703 CallSync(option);
704 {
705 std::unique_lock<std::mutex> lock(g_processMutex);
706 bool result = g_processCondition.wait_for(lock, std::chrono::seconds(WAIT_TIME),
__anon6ad9bb3f0a02() 707 [&callCount]() { return callCount == 2; }); // 2 is compensated sync
708 ASSERT_EQ(result, true);
709 }
710
711 /**
712 * @tc.steps:step2. sync again and check asset
713 * @tc.expected: step2. return ok.
714 */
715 g_virtualAssetLoader->SetDownloadStatus(DBStatus::OK);
__anon6ad9bb3f0b02(std::map<std::string, Assets> &assets) 716 g_virtualAssetLoader->ForkDownload([](std::map<std::string, Assets> &assets) {
717 EXPECT_EQ(assets.find(COL_ASSET) != assets.end(), true);
718 });
719 CallSync(option);
720 {
721 std::unique_lock<std::mutex> lock(g_processMutex);
722 bool result = g_processCondition.wait_for(lock, std::chrono::seconds(WAIT_TIME),
__anon6ad9bb3f0c02() 723 [&callCount]() { return callCount == 4; }); // 4 is compensated sync
724 ASSERT_EQ(result, true);
725 }
726 g_cloudStoreHook->SetSyncFinishHook(nullptr);
727 }
728
729 /**
730 * @tc.name: QueryCursorTest003
731 * @tc.desc: Test whether cursor fallback
732 * @tc.type: FUNC
733 * @tc.require:
734 * @tc.author: bty
735 */
736 HWTEST_F(DistributedDBCloudSyncerLockTest, QueryCursorTest003, TestSize.Level0)
737 {
738 /**
739 * @tc.steps:step1. init cloud data and sync
740 * @tc.expected: step1. return ok.
741 */
742 int cloudCount = 10;
743 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
744 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
745 CallSync(option);
746
747 /**
748 * @tc.steps:step2. delete cloud data and sync
749 * @tc.expected: step2. return ok.
750 */
751 DeleteCloudDBData(0, cloudCount, ASSETS_TABLE_NAME);
752 CallSync(option);
753
754 /**
755 * @tc.steps:step3. remove data
756 * @tc.expected: step3. return ok.
757 */
758 std::string device = "";
759 ASSERT_EQ(g_delegate->RemoveDeviceData(device, DistributedDB::FLAG_ONLY), DBStatus::OK);
760
761 /**
762 * @tc.steps:step4. insert local and check cursor
763 * @tc.expected: step4. return ok.
764 */
765 InsertLocalData(0, 1, ASSETS_TABLE_NAME, true);
766 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
767 " where cursor='31';";
768 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
769 reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
770 }
771
772 /**
773 * @tc.name: QueryCursorTest004
774 * @tc.desc: Test temp trigger under concurrency
775 * @tc.type: FUNC
776 * @tc.require:
777 * @tc.author: bty
778 */
779 HWTEST_F(DistributedDBCloudSyncerLockTest, QueryCursorTest004, TestSize.Level0)
780 {
781 /**
782 * @tc.steps:step1. init cloud data
783 * @tc.expected: step1. return ok.
784 */
785 int cloudCount = 10;
786 InsertLocalData(0, cloudCount, ASSETS_TABLE_NAME, true);
787 InsertCloudDBData(0, cloudCount, 0, ASSETS_TABLE_NAME);
788
789 /**
790 * @tc.steps:step2. set tracker table before saving cloud data
791 * @tc.expected: step2. return ok.
792 */
__anon6ad9bb3f0d02(const std::string &table, VBucket &) 793 g_virtualCloudDb->ForkQuery([](const std::string &table, VBucket &) {
794 TrackerSchema schema = {
795 .tableName = ASSETS_TABLE_NAME, .extendColName = COL_NAME, .trackerColNames = { COL_ID }
796 };
797 EXPECT_EQ(g_delegate->SetTrackerTable(schema), WITH_INVENTORY_DATA);
798 });
799 CloudSyncOption option = PrepareOption(Query::Select().FromTable({ ASSETS_TABLE_NAME }), LockAction::INSERT);
800 CallSync(option);
801
802 /**
803 * @tc.steps:step3. check extend_field and cursor
804 * @tc.expected: step3. return ok.
805 */
806 std::string sql = "select count(*) from " + DBCommon::GetLogTableName(ASSETS_TABLE_NAME) +
807 " where data_key='0' and extend_field='name10' and cursor='32';";
808 EXPECT_EQ(sqlite3_exec(db, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
809 reinterpret_cast<void *>(1), nullptr), SQLITE_OK);
810 }
811 } // namespace
812 #endif // RELATIONAL_STORE