1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifdef RELATIONAL_STORE
#include <gtest/gtest.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include "cloud/cloud_db_constant.h"
#include "cloud/cloud_db_types.h"
#include "cloud_db_sync_utils_test.h"
#include "db_common.h"
#include "distributeddb_data_generate_unit_test.h"
#include "log_print.h"
#include "relational_store_delegate.h"
#include "relational_store_instance.h"
#include "relational_store_manager.h"
#include "relational_sync_able_storage.h"
#include "runtime_config.h"
#include "time_helper.h"
#include "virtual_asset_loader.h"
#include "virtual_cloud_data_translate.h"
#include "virtual_cloud_db.h"
#include "virtual_communicator_aggregator.h"
34
35 using namespace testing::ext;
36 using namespace DistributedDB;
37 using namespace DistributedDBUnitTest;
38
39 namespace {
// Suffix appended to the store id to form the database file name.
constexpr const char *DB_SUFFIX = ".db";
// Identifier constant of the relational store.
constexpr const char *STORE_ID = "Relational_Store_ID";

// The four worker tables share one column layout and differ only in the table name.
// Adjacent string literals are concatenated by the compiler, so each statement is one string.
constexpr const char *CREATE_TABLE_A_SQL =
    "CREATE TABLE IF NOT EXISTS worker_a("
    "id TEXT PRIMARY KEY,"
    "name TEXT,"
    "height REAL ,"
    "photo BLOB,"
    "age INT);";
constexpr const char *CREATE_TABLE_B_SQL =
    "CREATE TABLE IF NOT EXISTS worker_b("
    "id TEXT PRIMARY KEY,"
    "name TEXT,"
    "height REAL ,"
    "photo BLOB,"
    "age INT);";
constexpr const char *CREATE_TABLE_C_SQL =
    "CREATE TABLE IF NOT EXISTS worker_c("
    "id TEXT PRIMARY KEY,"
    "name TEXT,"
    "height REAL ,"
    "photo BLOB,"
    "age INT);";
constexpr const char *CREATE_TABLE_D_SQL =
    "CREATE TABLE IF NOT EXISTS worker_d("
    "id TEXT PRIMARY KEY,"
    "name TEXT,"
    "height REAL ,"
    "photo BLOB,"
    "age INT);";

// Value assigned to CloudSyncOption::waitTime by PrepareOption.
const int64_t SYNC_WAIT_TIME = 60;
71
CreateUserDBAndTable(sqlite3 * & db)72 void CreateUserDBAndTable(sqlite3 *&db)
73 {
74 EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
75 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_TABLE_A_SQL), SQLITE_OK);
76 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_TABLE_B_SQL), SQLITE_OK);
77 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_TABLE_C_SQL), SQLITE_OK);
78 EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_TABLE_D_SQL), SQLITE_OK);
79 }
80
PrepareOption(CloudSyncOption & option,const Query & query,bool merge=false)81 void PrepareOption(CloudSyncOption &option, const Query &query, bool merge = false)
82 {
83 option.devices = { "CLOUD" };
84 option.mode = SYNC_MODE_CLOUD_MERGE;
85 option.query = query;
86 option.waitTime = SYNC_WAIT_TIME;
87 option.priorityTask = false;
88 option.compensatedSyncOnly = false;
89 option.merge = merge;
90 }
91
92 class DistributedDBCloudTaskMergeTest : public testing::Test {
93 public:
94 static void SetUpTestCase();
95 static void TearDownTestCase();
96 void SetUp() override;
97 void TearDown() override;
98 protected:
99 void InitTestDir();
100 DataBaseSchema GetSchema();
101 void CloseDb();
102 void InsertUserTableRecord(const std::string &tableName, int64_t recordCounts, int64_t begin = 0);
103 void CheckCloudTableCount(const std::vector<std::string> &tableName, int64_t expectCount);
104 void SetForkQueryForCloudMergeSyncTest001(std::atomic<int> &count);
105 std::string testDir_;
106 std::string storePath_;
107 sqlite3 *db_ = nullptr;
108 RelationalStoreDelegate *delegate_ = nullptr;
109 std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
110 std::shared_ptr<VirtualAssetLoader> virtualAssetLoader_ = nullptr;
111 std::shared_ptr<RelationalStoreManager> mgr_ = nullptr;
112 std::string tableNameA_ = "worker_a";
113 std::string tableNameB_ = "worker_b";
114 std::string tableNameC_ = "worker_c";
115 std::string tableNameD_ = "worker_d";
116 std::vector<std::string> tables_ = { tableNameA_, tableNameB_, tableNameC_, tableNameD_ };
117 VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
118 };
119
SetUpTestCase()120 void DistributedDBCloudTaskMergeTest::SetUpTestCase()
121 {
122 }
123
TearDownTestCase()124 void DistributedDBCloudTaskMergeTest::TearDownTestCase()
125 {
126 }
127
SetUp()128 void DistributedDBCloudTaskMergeTest::SetUp()
129 {
130 DistributedDBToolsUnitTest::PrintTestCaseInfo();
131 InitTestDir();
132 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != 0) {
133 LOGE("rm test db files error.");
134 }
135 DistributedDBToolsUnitTest::PrintTestCaseInfo();
136 LOGD("Test dir is %s", testDir_.c_str());
137 db_ = RelationalTestUtils::CreateDataBase(storePath_);
138 ASSERT_NE(db_, nullptr);
139 CreateUserDBAndTable(db_);
140 mgr_ = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
141 RelationalStoreDelegate::Option option;
142 ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
143 ASSERT_NE(delegate_, nullptr);
144 for (const auto &table : tables_) {
145 ASSERT_EQ(delegate_->CreateDistributedTable(table, CLOUD_COOPERATION), DBStatus::OK);
146 }
147 virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
148 virtualAssetLoader_ = std::make_shared<VirtualAssetLoader>();
149 ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
150 ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
151 DataBaseSchema dataBaseSchema = GetSchema();
152 ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
153 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
154 ASSERT_TRUE(communicatorAggregator_ != nullptr);
155 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
156 }
157
TearDown()158 void DistributedDBCloudTaskMergeTest::TearDown()
159 {
160 virtualCloudDb_->ForkQuery(nullptr);
161 virtualCloudDb_->SetCloudError(false);
162 CloseDb();
163 EXPECT_EQ(sqlite3_close_v2(db_), SQLITE_OK);
164 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != E_OK) {
165 LOGE("rm test db files error.");
166 }
167 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
168 communicatorAggregator_ = nullptr;
169 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
170 }
171
InitTestDir()172 void DistributedDBCloudTaskMergeTest::InitTestDir()
173 {
174 if (!testDir_.empty()) {
175 return;
176 }
177 DistributedDBToolsUnitTest::TestDirInit(testDir_);
178 storePath_ = testDir_ + "/" + STORE_ID_1 + ".db";
179 LOGI("The test db is:%s", testDir_.c_str());
180 }
181
GetSchema()182 DataBaseSchema DistributedDBCloudTaskMergeTest::GetSchema()
183 {
184 DataBaseSchema schema;
185 for (const auto &table : tables_) {
186 TableSchema tableSchema;
187 tableSchema.name = table;
188 tableSchema.fields = {
189 {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
190 {"photo", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
191 };
192 schema.tables.push_back(tableSchema);
193 }
194 return schema;
195 }
196
CloseDb()197 void DistributedDBCloudTaskMergeTest::CloseDb()
198 {
199 virtualCloudDb_ = nullptr;
200 if (mgr_ != nullptr) {
201 EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
202 delegate_ = nullptr;
203 mgr_ = nullptr;
204 }
205 }
206
InsertUserTableRecord(const std::string & tableName,int64_t recordCounts,int64_t begin)207 void DistributedDBCloudTaskMergeTest::InsertUserTableRecord(const std::string &tableName,
208 int64_t recordCounts, int64_t begin)
209 {
210 ASSERT_NE(db_, nullptr);
211 for (int64_t i = begin; i < begin + recordCounts; ++i) {
212 string sql = "INSERT OR REPLACE INTO " + tableName +
213 " (id, name, height, photo, age) VALUES ('" + std::to_string(i) + "', 'Local" +
214 std::to_string(i) + "', '155.10', 'text', '21');";
215 ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db_, sql), E_OK);
216 }
217 }
218
CheckCloudTableCount(const std::vector<std::string> & tableNames,int64_t expectCount)219 void DistributedDBCloudTaskMergeTest::CheckCloudTableCount(const std::vector<std::string> &tableNames,
220 int64_t expectCount)
221 {
222 for (const auto &tableName : tableNames) {
223 VBucket extend;
224 extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
225 int64_t realCount = 0;
226 std::vector<VBucket> data;
227 virtualCloudDb_->Query(tableName, extend, data);
228 for (size_t j = 0; j < data.size(); ++j) {
229 auto entry = data[j].find(CloudDbConstant::DELETE_FIELD);
230 if (entry != data[j].end() && std::get<bool>(entry->second)) {
231 continue;
232 }
233 realCount++;
234 }
235 LOGI("check table %s", tableName.c_str());
236 EXPECT_EQ(realCount, expectCount); // ExpectCount represents the total amount of cloud data.
237 }
238 }
239
SetForkQueryForCloudMergeSyncTest001(std::atomic<int> & count)240 void DistributedDBCloudTaskMergeTest::SetForkQueryForCloudMergeSyncTest001(std::atomic<int> &count)
241 {
242 virtualCloudDb_->ForkQuery([&count](const std::string &, VBucket &) {
243 count++;
244 if (count == 1) { // taskid1
245 std::this_thread::sleep_for(std::chrono::seconds(1));
246 }
247 });
248 }
249
250 /**
251 * @tc.name: CloudSyncMergeTaskTest001
252 * @tc.desc: test merge sync task
253 * @tc.type: FUNC
254 * @tc.require:
255 * @tc.author: chenchaohao
256 */
257 HWTEST_F(DistributedDBCloudTaskMergeTest, CloudSyncMergeTaskTest001, TestSize.Level0)
258 {
259 /**
260 * @tc.steps:step1. insert user table record.
261 * @tc.expected: step1. ok.
262 */
263 const int actualCount = 10; // 10 is count of records
264 InsertUserTableRecord(tableNameA_, actualCount);
265 InsertUserTableRecord(tableNameB_, actualCount);
266 /**
267 * @tc.steps:step2. set callback to check during sync.
268 * @tc.expected: step2. ok.
269 */
270 std::atomic<int> count = 0;
271 SetForkQueryForCloudMergeSyncTest001(count);
272
273 Query normalQuery1 = Query::Select().FromTable({ tableNameA_ });
274 CloudSyncOption option;
275 PrepareOption(option, normalQuery1, false);
276 ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
277
278 std::mutex callbackMutex;
279 std::condition_variable callbackCv;
280 size_t finishCount = 0u;
__anonb94ae3c60302(const std::map<std::string, SyncProcess> &process) 281 auto callback1 = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
282 for (const auto &item: process) {
283 if (item.second.process == DistributedDB::FINISHED) {
284 {
285 std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
286 finishCount++;
287 }
288 LOGW("current finish %zu", finishCount);
289 callbackCv.notify_one();
290 }
291 }
292 };
293
294 Query normalQuery2 = Query::Select().FromTable({ tableNameB_ });
295 PrepareOption(option, normalQuery2, true);
296 ASSERT_EQ(delegate_->Sync(option, callback1), OK);
297
298 InsertUserTableRecord(tableNameC_, actualCount);
299 InsertUserTableRecord(tableNameD_, actualCount);
300
301 Query normalQuery3 = Query::Select().FromTable({ tableNameC_, tableNameD_ });
302 PrepareOption(option, normalQuery3, true);
303 ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
304
305 Query normalQuery4 = Query::Select().FromTable({ tableNameB_, tableNameC_, tableNameD_ });
306 PrepareOption(option, normalQuery4, true);
307 ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
308 std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anonb94ae3c60402() 309 callbackCv.wait(callbackLock, [&finishCount]() {
310 return (finishCount >= 1u);
311 });
312 CheckCloudTableCount({ tableNameB_, tableNameC_, tableNameD_ }, actualCount);
313 }
314
315 /**
316 * @tc.name: CloudSyncMergeTaskTest002
317 * @tc.desc: test merge sync task with different mode.
318 * @tc.type: FUNC
319 * @tc.require:
320 * @tc.author: liaoyonghuang
321 */
322 HWTEST_F(DistributedDBCloudTaskMergeTest, CloudSyncMergeTaskTest002, TestSize.Level1)
323 {
324 /**
325 * @tc.steps:step1. insert user table record.
326 * @tc.expected: step1. ok.
327 */
328 const int actualCount = 10; // 10 is count of records
329 InsertUserTableRecord(tableNameA_, actualCount);
330 Query normalQuery1 = Query::Select().FromTable({ tableNameA_ });
331 CloudSyncOption option;
332 PrepareOption(option, normalQuery1, true);
333 /**
334 * @tc.steps:step2. set 2s block time for sync task 1st, and start sync task 2nd.
335 * @tc.expected: step2. ok.
336 */
337 virtualCloudDb_->SetBlockTime(2000); // block 1st sync task 2s.
__anonb94ae3c60502() 338 std::thread syncThread1([&]() {
339 ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
340 });
341 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
__anonb94ae3c60602() 342 std::thread syncThread2([&]() {
343 ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
344 });
345 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
346
347 /**
348 * @tc.steps:step3. start sync task 3rd.
349 * @tc.expected: task CLOUD_SYNC_TASK_MERGED because it was merged into Task 2.
350 */
__anonb94ae3c60702(const std::map<std::string, SyncProcess> &process) 351 auto callback3 = [](const std::map<std::string, SyncProcess> &process) {
352 for (const auto &item: process) {
353 ASSERT_EQ(item.second.errCode, CLOUD_SYNC_TASK_MERGED);
354 }
355 };
__anonb94ae3c60802() 356 std::thread syncThread3([&]() {
357 ASSERT_EQ(delegate_->Sync(option, callback3), OK);
358 });
359 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
360
361 /**
362 * @tc.steps:step4. start sync task 4th.
363 * @tc.expected: task was not merged because the mode is not SYNC_MODE_CLOUD_MERGE.
364 */
__anonb94ae3c60902(const std::map<std::string, SyncProcess> &process) 365 auto callback4 = [](const std::map<std::string, SyncProcess> &process) {
366 for (const auto &item: process) {
367 ASSERT_EQ(item.second.errCode, OK);
368 }
369 };
__anonb94ae3c60a02() 370 std::thread syncThread4([&]() {
371 option.mode = SYNC_MODE_CLOUD_FORCE_PUSH;
372 ASSERT_EQ(delegate_->Sync(option, callback4), OK);
373 });
374 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
375
376 /**
377 * @tc.steps:step5. start sync task 5th.
378 * @tc.expected: task CLOUD_SYNC_TASK_MERGED because it was merged into Task 2.
379 */
__anonb94ae3c60b02(const std::map<std::string, SyncProcess> &process) 380 auto callback5 = [](const std::map<std::string, SyncProcess> &process) {
381 for (const auto &item: process) {
382 ASSERT_EQ(item.second.errCode, CLOUD_SYNC_TASK_MERGED);
383 }
384 };
__anonb94ae3c60c02() 385 std::thread syncThread5([&]() {
386 option.mode = SYNC_MODE_CLOUD_MERGE;
387 ASSERT_EQ(delegate_->Sync(option, callback5), OK);
388 });
389
390 syncThread1.join();
391 syncThread2.join();
392 syncThread3.join();
393 syncThread4.join();
394 syncThread5.join();
395 }
396
397 /**
398 * @tc.name: CloudSyncMergeTaskTest003
399 * @tc.desc: test merge sync task which merge is false.
400 * @tc.type: FUNC
401 * @tc.require:
402 * @tc.author: liaoyonghuang
403 */
404 HWTEST_F(DistributedDBCloudTaskMergeTest, CloudSyncMergeTaskTest003, TestSize.Level1)
405 {
406 /**
407 * @tc.steps:step1. insert user table record.
408 * @tc.expected: step1. ok.
409 */
410 const int actualCount = 10; // 10 is count of records
411 InsertUserTableRecord(tableNameA_, actualCount);
412 Query normalQuery1 = Query::Select().FromTable({ tableNameA_ });
413 CloudSyncOption option;
414 PrepareOption(option, normalQuery1, true);
415 /**
416 * @tc.steps:step2. set 2s block time for sync task 1st, and start sync task 2nd.
417 * @tc.expected: step2. ok.
418 */
419 virtualCloudDb_->SetBlockTime(2000); // block 1st sync task 2s.
__anonb94ae3c60d02() 420 std::thread syncThread1([&]() {
421 ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
422 });
423 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
__anonb94ae3c60e02() 424 std::thread syncThread2([&]() {
425 ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
426 });
427 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
428 /**
429 * @tc.steps:step3. start sync task 3rd.
430 * @tc.expected: task CLOUD_SYNC_TASK_MERGED because it was merged into Task 2.
431 */
__anonb94ae3c60f02(const std::map<std::string, SyncProcess> &process) 432 auto callback3 = [](const std::map<std::string, SyncProcess> &process) {
433 for (const auto &item: process) {
434 ASSERT_EQ(item.second.errCode, CLOUD_SYNC_TASK_MERGED);
435 EXPECT_EQ(item.second.tableProcess.size(), 1u);
436 for (const auto &table : item.second.tableProcess) {
437 EXPECT_EQ(table.second.process, ProcessStatus::FINISHED);
438 }
439 }
440 };
__anonb94ae3c61002() 441 std::thread syncThread3([&]() {
442 ASSERT_EQ(delegate_->Sync(option, callback3), OK);
443 });
444 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
445 /**
446 * @tc.steps:step4. start sync task 4th.
447 * @tc.expected: task OK because it cannot be merged.
448 */
__anonb94ae3c61102(const std::map<std::string, SyncProcess> &process) 449 auto callback4 = [](const std::map<std::string, SyncProcess> &process) {
450 for (const auto &item: process) {
451 ASSERT_EQ(item.second.errCode, OK);
452 }
453 };
__anonb94ae3c61202() 454 std::thread syncThread4([&]() {
455 option.merge = false;
456 ASSERT_EQ(delegate_->Sync(option, callback4), OK);
457 });
458
459 syncThread1.join();
460 syncThread2.join();
461 syncThread3.join();
462 syncThread4.join();
463 }
464 }
465 #endif