1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifdef RELATIONAL_STORE
17 #include <gtest/gtest.h>
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud/cloud_db_types.h"
20 #include "cloud_db_sync_utils_test.h"
21 #include "db_common.h"
22 #include "distributeddb_data_generate_unit_test.h"
23 #include "log_print.h"
24 #include "relational_store_delegate.h"
25 #include "relational_store_instance.h"
26 #include "relational_store_manager.h"
27 #include "relational_sync_able_storage.h"
28 #include "runtime_config.h"
29 #include "time_helper.h"
30 #include "virtual_asset_loader.h"
31 #include "virtual_cloud_data_translate.h"
32 #include "virtual_cloud_db.h"
33 #include "virtual_communicator_aggregator.h"
34 
35 using namespace testing::ext;
36 using namespace DistributedDB;
37 using namespace DistributedDBUnitTest;
38 
39 namespace {
// File suffix of a database file.
constexpr const char *DB_SUFFIX = ".db";
// Store identifier string.
constexpr const char *STORE_ID = "Relational_Store_ID";

// DDL for the four user tables; they differ only in the table name (worker_a .. worker_d).
constexpr const char *CREATE_TABLE_A_SQL =
    "CREATE TABLE IF NOT EXISTS worker_a("
    "id TEXT PRIMARY KEY,"
    "name TEXT,"
    "height REAL ,"
    "photo BLOB,"
    "age INT);";

constexpr const char *CREATE_TABLE_B_SQL =
    "CREATE TABLE IF NOT EXISTS worker_b("
    "id TEXT PRIMARY KEY,"
    "name TEXT,"
    "height REAL ,"
    "photo BLOB,"
    "age INT);";

constexpr const char *CREATE_TABLE_C_SQL =
    "CREATE TABLE IF NOT EXISTS worker_c("
    "id TEXT PRIMARY KEY,"
    "name TEXT,"
    "height REAL ,"
    "photo BLOB,"
    "age INT);";

constexpr const char *CREATE_TABLE_D_SQL =
    "CREATE TABLE IF NOT EXISTS worker_d("
    "id TEXT PRIMARY KEY,"
    "name TEXT,"
    "height REAL ,"
    "photo BLOB,"
    "age INT);";

// Value assigned to CloudSyncOption::waitTime by PrepareOption().
constexpr int64_t SYNC_WAIT_TIME = 60;
71 
CreateUserDBAndTable(sqlite3 * & db)72 void CreateUserDBAndTable(sqlite3 *&db)
73 {
74     EXPECT_EQ(RelationalTestUtils::ExecSql(db, "PRAGMA journal_mode=WAL;"), SQLITE_OK);
75     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_TABLE_A_SQL), SQLITE_OK);
76     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_TABLE_B_SQL), SQLITE_OK);
77     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_TABLE_C_SQL), SQLITE_OK);
78     EXPECT_EQ(RelationalTestUtils::ExecSql(db, CREATE_TABLE_D_SQL), SQLITE_OK);
79 }
80 
PrepareOption(CloudSyncOption & option,const Query & query,bool merge=false)81 void PrepareOption(CloudSyncOption &option, const Query &query, bool merge = false)
82 {
83     option.devices = { "CLOUD" };
84     option.mode = SYNC_MODE_CLOUD_MERGE;
85     option.query = query;
86     option.waitTime = SYNC_WAIT_TIME;
87     option.priorityTask = false;
88     option.compensatedSyncOnly = false;
89     option.merge = merge;
90 }
91 
92 class DistributedDBCloudTaskMergeTest : public testing::Test {
93 public:
94     static void SetUpTestCase();
95     static void TearDownTestCase();
96     void SetUp() override;
97     void TearDown() override;
98 protected:
99     void InitTestDir();
100     DataBaseSchema GetSchema();
101     void CloseDb();
102     void InsertUserTableRecord(const std::string &tableName, int64_t recordCounts, int64_t begin = 0);
103     void CheckCloudTableCount(const std::vector<std::string> &tableName, int64_t expectCount);
104     void SetForkQueryForCloudMergeSyncTest001(std::atomic<int> &count);
105     std::string testDir_;
106     std::string storePath_;
107     sqlite3 *db_ = nullptr;
108     RelationalStoreDelegate *delegate_ = nullptr;
109     std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
110     std::shared_ptr<VirtualAssetLoader> virtualAssetLoader_ = nullptr;
111     std::shared_ptr<RelationalStoreManager> mgr_ = nullptr;
112     std::string tableNameA_ = "worker_a";
113     std::string tableNameB_ = "worker_b";
114     std::string tableNameC_ = "worker_c";
115     std::string tableNameD_ = "worker_d";
116     std::vector<std::string> tables_ = { tableNameA_, tableNameB_, tableNameC_, tableNameD_ };
117     VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
118 };
119 
// Suite-level setup hook: performs no work; all state is built per test in SetUp().
void DistributedDBCloudTaskMergeTest::SetUpTestCase()
{
}
123 
// Suite-level teardown hook: performs no work; all state is released per test in TearDown().
void DistributedDBCloudTaskMergeTest::TearDownTestCase()
{
}
127 
SetUp()128 void DistributedDBCloudTaskMergeTest::SetUp()
129 {
130     DistributedDBToolsUnitTest::PrintTestCaseInfo();
131     InitTestDir();
132     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != 0) {
133         LOGE("rm test db files error.");
134     }
135     DistributedDBToolsUnitTest::PrintTestCaseInfo();
136     LOGD("Test dir is %s", testDir_.c_str());
137     db_ = RelationalTestUtils::CreateDataBase(storePath_);
138     ASSERT_NE(db_, nullptr);
139     CreateUserDBAndTable(db_);
140     mgr_ = std::make_shared<RelationalStoreManager>(APP_ID, USER_ID);
141     RelationalStoreDelegate::Option option;
142     ASSERT_EQ(mgr_->OpenStore(storePath_, STORE_ID_1, option, delegate_), DBStatus::OK);
143     ASSERT_NE(delegate_, nullptr);
144     for (const auto &table : tables_) {
145         ASSERT_EQ(delegate_->CreateDistributedTable(table, CLOUD_COOPERATION), DBStatus::OK);
146     }
147     virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
148     virtualAssetLoader_ = std::make_shared<VirtualAssetLoader>();
149     ASSERT_EQ(delegate_->SetCloudDB(virtualCloudDb_), DBStatus::OK);
150     ASSERT_EQ(delegate_->SetIAssetLoader(virtualAssetLoader_), DBStatus::OK);
151     DataBaseSchema dataBaseSchema = GetSchema();
152     ASSERT_EQ(delegate_->SetCloudDbSchema(dataBaseSchema), DBStatus::OK);
153     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
154     ASSERT_TRUE(communicatorAggregator_ != nullptr);
155     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
156 }
157 
TearDown()158 void DistributedDBCloudTaskMergeTest::TearDown()
159 {
160     virtualCloudDb_->ForkQuery(nullptr);
161     virtualCloudDb_->SetCloudError(false);
162     CloseDb();
163     EXPECT_EQ(sqlite3_close_v2(db_), SQLITE_OK);
164     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(testDir_) != E_OK) {
165         LOGE("rm test db files error.");
166     }
167     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
168     communicatorAggregator_ = nullptr;
169     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
170 }
171 
InitTestDir()172 void DistributedDBCloudTaskMergeTest::InitTestDir()
173 {
174     if (!testDir_.empty()) {
175         return;
176     }
177     DistributedDBToolsUnitTest::TestDirInit(testDir_);
178     storePath_ = testDir_ + "/" + STORE_ID_1 + ".db";
179     LOGI("The test db is:%s", testDir_.c_str());
180 }
181 
GetSchema()182 DataBaseSchema DistributedDBCloudTaskMergeTest::GetSchema()
183 {
184     DataBaseSchema schema;
185     for (const auto &table : tables_) {
186         TableSchema tableSchema;
187         tableSchema.name = table;
188         tableSchema.fields = {
189             {"id", TYPE_INDEX<std::string>, true}, {"name", TYPE_INDEX<std::string>}, {"height", TYPE_INDEX<double>},
190             {"photo", TYPE_INDEX<Bytes>}, {"age", TYPE_INDEX<int64_t>}
191         };
192         schema.tables.push_back(tableSchema);
193     }
194     return schema;
195 }
196 
CloseDb()197 void DistributedDBCloudTaskMergeTest::CloseDb()
198 {
199     virtualCloudDb_ = nullptr;
200     if (mgr_ != nullptr) {
201         EXPECT_EQ(mgr_->CloseStore(delegate_), DBStatus::OK);
202         delegate_ = nullptr;
203         mgr_ = nullptr;
204     }
205 }
206 
InsertUserTableRecord(const std::string & tableName,int64_t recordCounts,int64_t begin)207 void DistributedDBCloudTaskMergeTest::InsertUserTableRecord(const std::string &tableName,
208     int64_t recordCounts, int64_t begin)
209 {
210     ASSERT_NE(db_, nullptr);
211     for (int64_t i = begin; i < begin + recordCounts; ++i) {
212         string sql = "INSERT OR REPLACE INTO " + tableName +
213             " (id, name, height, photo, age) VALUES ('" + std::to_string(i) + "', 'Local" +
214             std::to_string(i) + "', '155.10',  'text', '21');";
215         ASSERT_EQ(SQLiteUtils::ExecuteRawSQL(db_, sql), E_OK);
216     }
217 }
218 
CheckCloudTableCount(const std::vector<std::string> & tableNames,int64_t expectCount)219 void DistributedDBCloudTaskMergeTest::CheckCloudTableCount(const std::vector<std::string> &tableNames,
220     int64_t expectCount)
221 {
222     for (const auto &tableName : tableNames) {
223         VBucket extend;
224         extend[CloudDbConstant::CURSOR_FIELD] = std::to_string(0);
225         int64_t realCount = 0;
226         std::vector<VBucket> data;
227         virtualCloudDb_->Query(tableName, extend, data);
228         for (size_t j = 0; j < data.size(); ++j) {
229             auto entry = data[j].find(CloudDbConstant::DELETE_FIELD);
230             if (entry != data[j].end() && std::get<bool>(entry->second)) {
231                 continue;
232             }
233             realCount++;
234         }
235         LOGI("check table %s", tableName.c_str());
236         EXPECT_EQ(realCount, expectCount); // ExpectCount represents the total amount of cloud data.
237     }
238 }
239 
SetForkQueryForCloudMergeSyncTest001(std::atomic<int> & count)240 void DistributedDBCloudTaskMergeTest::SetForkQueryForCloudMergeSyncTest001(std::atomic<int> &count)
241 {
242     virtualCloudDb_->ForkQuery([&count](const std::string &, VBucket &) {
243         count++;
244         if (count == 1) { // taskid1
245             std::this_thread::sleep_for(std::chrono::seconds(1));
246         }
247     });
248 }
249 
250 /**
251  * @tc.name: CloudSyncMergeTaskTest001
252  * @tc.desc: test merge sync task
253  * @tc.type: FUNC
254  * @tc.require:
255  * @tc.author: chenchaohao
256  */
257 HWTEST_F(DistributedDBCloudTaskMergeTest, CloudSyncMergeTaskTest001, TestSize.Level0)
258 {
259     /**
260      * @tc.steps:step1. insert user table record.
261      * @tc.expected: step1. ok.
262      */
263     const int actualCount = 10; // 10 is count of records
264     InsertUserTableRecord(tableNameA_, actualCount);
265     InsertUserTableRecord(tableNameB_, actualCount);
266     /**
267      * @tc.steps:step2. set callback to check during sync.
268      * @tc.expected: step2. ok.
269      */
270     std::atomic<int> count = 0;
271     SetForkQueryForCloudMergeSyncTest001(count);
272 
273     Query normalQuery1 = Query::Select().FromTable({ tableNameA_ });
274     CloudSyncOption option;
275     PrepareOption(option, normalQuery1, false);
276     ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
277 
278     std::mutex callbackMutex;
279     std::condition_variable callbackCv;
280     size_t finishCount = 0u;
__anonb94ae3c60302(const std::map<std::string, SyncProcess> &process) 281     auto callback1 = [&callbackCv, &callbackMutex, &finishCount](const std::map<std::string, SyncProcess> &process) {
282         for (const auto &item: process) {
283             if (item.second.process == DistributedDB::FINISHED) {
284                 {
285                     std::lock_guard<std::mutex> callbackAutoLock(callbackMutex);
286                     finishCount++;
287                 }
288                 LOGW("current finish %zu", finishCount);
289                 callbackCv.notify_one();
290             }
291         }
292     };
293 
294     Query normalQuery2 = Query::Select().FromTable({ tableNameB_ });
295     PrepareOption(option, normalQuery2, true);
296     ASSERT_EQ(delegate_->Sync(option, callback1), OK);
297 
298     InsertUserTableRecord(tableNameC_, actualCount);
299     InsertUserTableRecord(tableNameD_, actualCount);
300 
301     Query normalQuery3 = Query::Select().FromTable({ tableNameC_, tableNameD_ });
302     PrepareOption(option, normalQuery3, true);
303     ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
304 
305     Query normalQuery4 = Query::Select().FromTable({ tableNameB_, tableNameC_, tableNameD_ });
306     PrepareOption(option, normalQuery4, true);
307     ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
308     std::unique_lock<std::mutex> callbackLock(callbackMutex);
__anonb94ae3c60402() 309     callbackCv.wait(callbackLock, [&finishCount]() {
310         return (finishCount >= 1u);
311     });
312     CheckCloudTableCount({ tableNameB_, tableNameC_, tableNameD_ }, actualCount);
313 }
314 
315 /**
316  * @tc.name: CloudSyncMergeTaskTest002
317  * @tc.desc: test merge sync task with different mode.
318  * @tc.type: FUNC
319  * @tc.require:
320  * @tc.author: liaoyonghuang
321  */
322 HWTEST_F(DistributedDBCloudTaskMergeTest, CloudSyncMergeTaskTest002, TestSize.Level1)
323 {
324     /**
325      * @tc.steps:step1. insert user table record.
326      * @tc.expected: step1. ok.
327      */
328     const int actualCount = 10; // 10 is count of records
329     InsertUserTableRecord(tableNameA_, actualCount);
330     Query normalQuery1 = Query::Select().FromTable({ tableNameA_ });
331     CloudSyncOption option;
332     PrepareOption(option, normalQuery1, true);
333     /**
334      * @tc.steps:step2. set 2s block time for sync task 1st, and start sync task 2nd.
335      * @tc.expected: step2. ok.
336      */
337     virtualCloudDb_->SetBlockTime(2000); // block 1st sync task 2s.
__anonb94ae3c60502() 338     std::thread syncThread1([&]() {
339         ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
340     });
341     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
__anonb94ae3c60602() 342     std::thread syncThread2([&]() {
343         ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
344     });
345     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
346 
347     /**
348      * @tc.steps:step3. start sync task 3rd.
349      * @tc.expected: task CLOUD_SYNC_TASK_MERGED because it was merged into Task 2.
350      */
__anonb94ae3c60702(const std::map<std::string, SyncProcess> &process) 351     auto callback3 = [](const std::map<std::string, SyncProcess> &process) {
352         for (const auto &item: process) {
353             ASSERT_EQ(item.second.errCode, CLOUD_SYNC_TASK_MERGED);
354         }
355     };
__anonb94ae3c60802() 356     std::thread syncThread3([&]() {
357         ASSERT_EQ(delegate_->Sync(option, callback3), OK);
358     });
359     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
360 
361     /**
362      * @tc.steps:step4. start sync task 4th.
363      * @tc.expected: task was not merged because the mode is not SYNC_MODE_CLOUD_MERGE.
364      */
__anonb94ae3c60902(const std::map<std::string, SyncProcess> &process) 365     auto callback4 = [](const std::map<std::string, SyncProcess> &process) {
366         for (const auto &item: process) {
367             ASSERT_EQ(item.second.errCode, OK);
368         }
369     };
__anonb94ae3c60a02() 370     std::thread syncThread4([&]() {
371         option.mode = SYNC_MODE_CLOUD_FORCE_PUSH;
372         ASSERT_EQ(delegate_->Sync(option, callback4), OK);
373     });
374     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
375 
376     /**
377      * @tc.steps:step5. start sync task 5th.
378      * @tc.expected: task CLOUD_SYNC_TASK_MERGED because it was merged into Task 2.
379      */
__anonb94ae3c60b02(const std::map<std::string, SyncProcess> &process) 380     auto callback5 = [](const std::map<std::string, SyncProcess> &process) {
381         for (const auto &item: process) {
382             ASSERT_EQ(item.second.errCode, CLOUD_SYNC_TASK_MERGED);
383         }
384     };
__anonb94ae3c60c02() 385     std::thread syncThread5([&]() {
386         option.mode = SYNC_MODE_CLOUD_MERGE;
387         ASSERT_EQ(delegate_->Sync(option, callback5), OK);
388     });
389 
390     syncThread1.join();
391     syncThread2.join();
392     syncThread3.join();
393     syncThread4.join();
394     syncThread5.join();
395 }
396 
397 /**
398  * @tc.name: CloudSyncMergeTaskTest003
 * @tc.desc: test that a sync task whose merge option is false is not merged.
400  * @tc.type: FUNC
401  * @tc.require:
402  * @tc.author: liaoyonghuang
403  */
404 HWTEST_F(DistributedDBCloudTaskMergeTest, CloudSyncMergeTaskTest003, TestSize.Level1)
405 {
406     /**
407      * @tc.steps:step1. insert user table record.
408      * @tc.expected: step1. ok.
409      */
410     const int actualCount = 10; // 10 is count of records
411     InsertUserTableRecord(tableNameA_, actualCount);
412     Query normalQuery1 = Query::Select().FromTable({ tableNameA_ });
413     CloudSyncOption option;
414     PrepareOption(option, normalQuery1, true);
415     /**
416      * @tc.steps:step2. set 2s block time for sync task 1st, and start sync task 2nd.
417      * @tc.expected: step2. ok.
418      */
419     virtualCloudDb_->SetBlockTime(2000); // block 1st sync task 2s.
__anonb94ae3c60d02() 420     std::thread syncThread1([&]() {
421         ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
422     });
423     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
__anonb94ae3c60e02() 424     std::thread syncThread2([&]() {
425         ASSERT_EQ(delegate_->Sync(option, nullptr), OK);
426     });
427     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
428     /**
429      * @tc.steps:step3. start sync task 3rd.
430      * @tc.expected: task CLOUD_SYNC_TASK_MERGED because it was merged into Task 2.
431      */
__anonb94ae3c60f02(const std::map<std::string, SyncProcess> &process) 432     auto callback3 = [](const std::map<std::string, SyncProcess> &process) {
433         for (const auto &item: process) {
434             ASSERT_EQ(item.second.errCode, CLOUD_SYNC_TASK_MERGED);
435             EXPECT_EQ(item.second.tableProcess.size(), 1u);
436             for (const auto &table : item.second.tableProcess) {
437                 EXPECT_EQ(table.second.process, ProcessStatus::FINISHED);
438             }
439         }
440     };
__anonb94ae3c61002() 441     std::thread syncThread3([&]() {
442         ASSERT_EQ(delegate_->Sync(option, callback3), OK);
443     });
444     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // sleep 100ms
445     /**
446      * @tc.steps:step4. start sync task 4th.
447      * @tc.expected: task OK because it cannot be merged.
448      */
__anonb94ae3c61102(const std::map<std::string, SyncProcess> &process) 449     auto callback4 = [](const std::map<std::string, SyncProcess> &process) {
450         for (const auto &item: process) {
451             ASSERT_EQ(item.second.errCode, OK);
452         }
453     };
__anonb94ae3c61202() 454     std::thread syncThread4([&]() {
455         option.merge = false;
456         ASSERT_EQ(delegate_->Sync(option, callback4), OK);
457     });
458 
459     syncThread1.join();
460     syncThread2.join();
461     syncThread3.join();
462     syncThread4.join();
463 }
464 }
465 #endif