1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <gtest/gtest.h>
17 
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud_db_sync_utils_test.h"
20 #include "db_base64_utils.h"
21 #include "distributeddb_data_generate_unit_test.h"
22 #include "distributeddb_tools_unit_test.h"
23 #include "kv_virtual_device.h"
24 #include "kv_store_nb_delegate.h"
25 #include "kvdb_manager.h"
26 #include "platform_specific.h"
27 #include "process_system_api_adapter_impl.h"
28 #include "sqlite_cloud_kv_executor_utils.h"
29 #include "virtual_communicator_aggregator.h"
30 #include "virtual_cloud_db.h"
31 
32 using namespace testing::ext;
33 using namespace DistributedDB;
34 using namespace DistributedDBUnitTest;
35 using namespace std;
36 
37 namespace {
38 static std::string HWM_HEAD = "naturalbase_cloud_meta_sync_data_";
39 string g_testDir;
40 KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
41 CloudSyncOption g_CloudSyncoption;
42 const std::string USER_ID_2 = "user2";
43 const std::string USER_ID_3 = "user3";
44 const int64_t WAIT_TIME = 5;
45 class DistributedDBCloudKvSyncerTest : public testing::Test {
46 public:
47     static void SetUpTestCase();
48     static void TearDownTestCase();
49     void SetUp();
50     void TearDown();
51 protected:
52     DBStatus GetKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId, KvStoreNbDelegate::Option option,
53         bool invalidSchema = false);
54     void CloseKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId);
55     void BlockSync(KvStoreNbDelegate *delegate, DBStatus expectDBStatus, CloudSyncOption option,
56         DBStatus expectSyncResult = OK);
57     static DataBaseSchema GetDataBaseSchema(bool invalidSchema);
58     void GetSingleStore();
59     void ReleaseSingleStore();
60     void BlockCompensatedSync(int &actSyncCnt, int expSyncCnt);
61     void CheckUploadAbnormal(OpType opType, int64_t expCnt, bool isCompensated = false);
62     std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
63     std::shared_ptr<VirtualCloudDb> virtualCloudDb2_ = nullptr;
64     KvStoreConfig config_;
65     KvStoreNbDelegate* kvDelegatePtrS1_ = nullptr;
66     KvStoreNbDelegate* kvDelegatePtrS2_ = nullptr;
67     SyncProcess lastProcess_;
68     VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
69     KvVirtualDevice *deviceB_ = nullptr;
70     SQLiteSingleVerNaturalStore *singleStore_ = nullptr;
71     std::mutex comSyncMutex;
72     std::condition_variable comSyncCv;
73 };
74 
SetUpTestCase()75 void DistributedDBCloudKvSyncerTest::SetUpTestCase()
76 {
77     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
78     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
79         LOGE("rm test db files error!");
80     }
81     g_CloudSyncoption.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
82     g_CloudSyncoption.users.push_back(USER_ID);
83     g_CloudSyncoption.devices.push_back("cloud");
84 
85     string dir = g_testDir + "/single_ver";
86     DIR* dirTmp = opendir(dir.c_str());
87     if (dirTmp == nullptr) {
88         OS::MakeDBDirectory(dir);
89     } else {
90         closedir(dirTmp);
91     }
92 }
93 
TearDownTestCase()94 void DistributedDBCloudKvSyncerTest::TearDownTestCase()
95 {
96     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
97         LOGE("rm test db files error!");
98     }
99 }
100 
SetUp()101 void DistributedDBCloudKvSyncerTest::SetUp()
102 {
103     DistributedDBToolsUnitTest::PrintTestCaseInfo();
104     config_.dataDir = g_testDir;
105     /**
106      * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
107      */
108     virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
109     virtualCloudDb2_ = std::make_shared<VirtualCloudDb>();
110     g_mgr.SetKvStoreConfig(config_);
111     KvStoreNbDelegate::Option option1;
112     ASSERT_EQ(GetKvStore(kvDelegatePtrS1_, STORE_ID_1, option1), OK);
113     // set aggregator after get store1, only store2 can sync with p2p
114     communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
115     ASSERT_TRUE(communicatorAggregator_ != nullptr);
116     RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
117     KvStoreNbDelegate::Option option2;
118     ASSERT_EQ(GetKvStore(kvDelegatePtrS2_, STORE_ID_2, option2), OK);
119 
120     deviceB_ = new (std::nothrow) KvVirtualDevice("DEVICE_B");
121     ASSERT_TRUE(deviceB_ != nullptr);
122     auto syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
123     ASSERT_TRUE(syncInterfaceB != nullptr);
124     ASSERT_EQ(deviceB_->Initialize(communicatorAggregator_, syncInterfaceB), E_OK);
125     GetSingleStore();
126 }
127 
TearDown()128 void DistributedDBCloudKvSyncerTest::TearDown()
129 {
130     ReleaseSingleStore();
131     CloseKvStore(kvDelegatePtrS1_, STORE_ID_1);
132     CloseKvStore(kvDelegatePtrS2_, STORE_ID_2);
133     virtualCloudDb_ = nullptr;
134     virtualCloudDb2_ = nullptr;
135     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
136         LOGE("rm test db files error!");
137     }
138 
139     if (deviceB_ != nullptr) {
140         delete deviceB_;
141         deviceB_ = nullptr;
142     }
143 
144     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
145     communicatorAggregator_ = nullptr;
146     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
147 }
148 
BlockSync(KvStoreNbDelegate * delegate,DBStatus expectDBStatus,CloudSyncOption option,DBStatus expectSyncResult)149 void DistributedDBCloudKvSyncerTest::BlockSync(KvStoreNbDelegate *delegate, DBStatus expectDBStatus,
150     CloudSyncOption option, DBStatus expectSyncResult)
151 {
152     if (delegate == nullptr) {
153         return;
154     }
155     std::mutex dataMutex;
156     std::condition_variable cv;
157     bool finish = false;
158     SyncProcess last;
159     auto callback = [expectDBStatus, &last, &cv, &dataMutex, &finish, &option](const std::map<std::string,
160         SyncProcess> &process) {
161         size_t notifyCnt = 0;
162         for (const auto &item: process) {
163             LOGD("user = %s, status = %d, errCode = %d", item.first.c_str(), item.second.process, item.second.errCode);
164             if (item.second.process != DistributedDB::FINISHED) {
165                 continue;
166             }
167             EXPECT_EQ(item.second.errCode, expectDBStatus);
168             {
169                 std::lock_guard<std::mutex> autoLock(dataMutex);
170                 notifyCnt++;
171                 std::set<std::string> userSet(option.users.begin(), option.users.end());
172                 if (notifyCnt == userSet.size()) {
173                     finish = true;
174                     last = item.second;
175                     cv.notify_one();
176                 }
177             }
178         }
179     };
180     auto actualRet = delegate->Sync(option, callback);
181     EXPECT_EQ(actualRet, expectSyncResult);
182     if (actualRet == OK) {
183         std::unique_lock<std::mutex> uniqueLock(dataMutex);
184         cv.wait(uniqueLock, [&finish]() {
185             return finish;
186         });
187     }
188     lastProcess_ = last;
189 }
190 
GetDataBaseSchema(bool invalidSchema)191 DataBaseSchema DistributedDBCloudKvSyncerTest::GetDataBaseSchema(bool invalidSchema)
192 {
193     DataBaseSchema schema;
194     TableSchema tableSchema;
195     tableSchema.name = invalidSchema ? "invalid_schema_name" : CloudDbConstant::CLOUD_KV_TABLE_NAME;
196     Field field;
197     field.colName = CloudDbConstant::CLOUD_KV_FIELD_KEY;
198     field.type = TYPE_INDEX<std::string>;
199     field.primary = true;
200     tableSchema.fields.push_back(field);
201     field.colName = CloudDbConstant::CLOUD_KV_FIELD_DEVICE;
202     field.primary = false;
203     tableSchema.fields.push_back(field);
204     field.colName = CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE;
205     tableSchema.fields.push_back(field);
206     field.colName = CloudDbConstant::CLOUD_KV_FIELD_VALUE;
207     tableSchema.fields.push_back(field);
208     field.colName = CloudDbConstant::CLOUD_KV_FIELD_DEVICE_CREATE_TIME;
209     field.type = TYPE_INDEX<int64_t>;
210     tableSchema.fields.push_back(field);
211     schema.tables.push_back(tableSchema);
212     return schema;
213 }
214 
GetSingleStore()215 void DistributedDBCloudKvSyncerTest::GetSingleStore()
216 {
217     KvDBProperties prop;
218     prop.SetStringProp(KvDBProperties::USER_ID, USER_ID);
219     prop.SetStringProp(KvDBProperties::APP_ID, APP_ID);
220     prop.SetStringProp(KvDBProperties::STORE_ID, STORE_ID_1);
221 
222     std::string hashIdentifier = DBCommon::TransferHashString(
223         DBCommon::GenerateIdentifierId(STORE_ID_1, APP_ID, USER_ID, "", 0));
224     prop.SetStringProp(DBProperties::IDENTIFIER_DATA, hashIdentifier);
225     prop.SetIntProp(KvDBProperties::DATABASE_TYPE, KvDBProperties::SINGLE_VER_TYPE_SQLITE);
226     int errCode = E_OK;
227     singleStore_ = static_cast<SQLiteSingleVerNaturalStore *>(KvDBManager::OpenDatabase(prop, errCode));
228     ASSERT_NE(singleStore_, nullptr);
229 }
230 
ReleaseSingleStore()231 void DistributedDBCloudKvSyncerTest::ReleaseSingleStore()
232 {
233     RefObject::DecObjRef(singleStore_);
234     singleStore_ = nullptr;
235 }
236 
BlockCompensatedSync(int & actSyncCnt,int expSyncCnt)237 void DistributedDBCloudKvSyncerTest::BlockCompensatedSync(int &actSyncCnt, int expSyncCnt)
238 {
239     {
240         std::unique_lock<std::mutex> lock(comSyncMutex);
241         bool result = comSyncCv.wait_for(lock, std::chrono::seconds(WAIT_TIME),
242             [&actSyncCnt, expSyncCnt]() { return actSyncCnt == expSyncCnt; });
243         ASSERT_EQ(result, true);
244     }
245 }
246 
247 
GetKvStore(KvStoreNbDelegate * & delegate,const std::string & storeId,KvStoreNbDelegate::Option option,bool invalidSchema)248 DBStatus DistributedDBCloudKvSyncerTest::GetKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId,
249     KvStoreNbDelegate::Option option, bool invalidSchema)
250 {
251     DBStatus openRet = OK;
252     g_mgr.GetKvStore(storeId, option, [&openRet, &delegate](DBStatus status, KvStoreNbDelegate *openDelegate) {
253         openRet = status;
254         delegate = openDelegate;
255     });
256     EXPECT_EQ(openRet, OK);
257     EXPECT_NE(delegate, nullptr);
258 
259     std::map<std::string, std::shared_ptr<ICloudDb>> cloudDbs;
260     cloudDbs[USER_ID] = virtualCloudDb_;
261     cloudDbs[USER_ID_2] = virtualCloudDb2_;
262     delegate->SetCloudDB(cloudDbs);
263     std::map<std::string, DataBaseSchema> schemas;
264     schemas[USER_ID] = GetDataBaseSchema(invalidSchema);
265     schemas[USER_ID_2] = GetDataBaseSchema(invalidSchema);
266     return delegate->SetCloudDbSchema(schemas);
267 }
268 
CloseKvStore(KvStoreNbDelegate * & delegate,const std::string & storeId)269 void DistributedDBCloudKvSyncerTest::CloseKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId)
270 {
271     if (delegate != nullptr) {
272         ASSERT_EQ(g_mgr.CloseKvStore(delegate), OK);
273         delegate = nullptr;
274         DBStatus status = g_mgr.DeleteKvStore(storeId);
275         LOGD("delete kv store status %d store %s", status, storeId.c_str());
276         ASSERT_EQ(status, OK);
277     }
278 }
279 
CheckUploadAbnormal(OpType opType,int64_t expCnt,bool isCompensated)280 void DistributedDBCloudKvSyncerTest::CheckUploadAbnormal(OpType opType, int64_t expCnt, bool isCompensated)
281 {
282     sqlite3 *db_;
283     uint64_t flag = SQLITE_OPEN_URI | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE;
284     std::string fileUrl = g_testDir + "/" \
285         "2d23c8a0ffadafcaa03507a4ec2290c83babddcab07c0e2945fbba93efc7eec0/single_ver/main/gen_natural_store.db";
286     EXPECT_EQ(sqlite3_open_v2(fileUrl.c_str(), &db_, flag, nullptr), SQLITE_OK);
287 
288     std::string sql = "SELECT count(*) FROM naturalbase_kv_aux_sync_data_log WHERE ";
289     switch (opType) {
290         case OpType::INSERT:
291             sql += isCompensated ? " cloud_gid != '' AND version !='' AND cloud_flag&0x10=0" :
292                 " cloud_gid != '' AND version !='' AND cloud_flag=cloud_flag|0x10";
293             break;
294         case OpType::UPDATE:
295             sql += isCompensated ? " cloud_gid != '' AND version !='' AND cloud_flag&0x10=0" :
296                 " cloud_gid == '' AND version =='' AND cloud_flag=cloud_flag|0x10";
297             break;
298         case OpType::DELETE:
299             sql += " cloud_gid == '' AND version ==''";
300             break;
301         default:
302             break;
303     }
304     EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
305         reinterpret_cast<void *>(expCnt), nullptr), SQLITE_OK);
306     sqlite3_close_v2(db_);
307 }
308 
309 /**
310  * @tc.name: UploadAbnormalSync001
311  * @tc.desc: Test upload update record, cloud returned record not found.
312  * @tc.type: FUNC
313  * @tc.require:
314  * @tc.author: bty
315  */
316 HWTEST_F(DistributedDBCloudKvSyncerTest, UploadAbnormalSync001, TestSize.Level0)
317 {
318     auto cloudHook = (ICloudSyncStorageHook *) singleStore_->GetCloudKvStore();
319     ASSERT_NE(cloudHook, nullptr);
320 
321     /**
322      * @tc.steps:step1. Device A inserts data and synchronizes
323      * @tc.expected: step1 OK.
324      */
325     Key key = {'k'};
326     Value value = {'v'};
327     ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
328     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
329 
330     /**
331      * @tc.steps:step2. Device A update data and synchronizes, cloud returned record not found
332      * @tc.expected: step2 OK.
333      */
334     Value value2 = {'x'};
335     ASSERT_EQ(kvDelegatePtrS1_->Put(key, value2), OK);
336     int upIdx = 0;
__anon644cd9970602(const std::string &tableName, VBucket &extend) 337     virtualCloudDb_->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
338         LOGD("cloud db upload index:%d", ++upIdx);
339         if (upIdx == 1) { // 1 is index
340             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_RECORD_NOT_FOUND);
341         }
342     });
343     int syncCnt = 0;
__anon644cd9970702null344     cloudHook->SetSyncFinishHook([&syncCnt, this] {
345         LOGD("sync finish times:%d", ++syncCnt);
346         if (syncCnt == 1) { // 1 is the first sync
347             CheckUploadAbnormal(OpType::UPDATE, 1L); // 1 is expected count
348         } else {
349             CheckUploadAbnormal(OpType::UPDATE, 1L, true); // 1 is expected count
350         }
351         comSyncCv.notify_all();
352     });
353     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
354     BlockCompensatedSync(syncCnt, 2); // 2 is sync times
355     virtualCloudDb_->ForkUpload(nullptr);
356     cloudHook->SetSyncFinishHook(nullptr);
357 }
358 
359 /**
360  * @tc.name: UploadAbnormalSync002
361  * @tc.desc: Test upload insert record, cloud returned record already existed.
362  * @tc.type: FUNC
363  * @tc.require:
364  * @tc.author: bty
365  */
366 HWTEST_F(DistributedDBCloudKvSyncerTest, UploadAbnormalSync002, TestSize.Level0)
367 {
368     auto cloudHook = (ICloudSyncStorageHook *) singleStore_->GetCloudKvStore();
369     ASSERT_NE(cloudHook, nullptr);
370 
371     /**
372      * @tc.steps:step1. Device A inserts k-v and synchronizes
373      * @tc.expected: step1 OK.
374      */
375     Key key = {'k'};
376     Value value = {'v'};
377     ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
378     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
379 
380     /**
381      * @tc.steps:step2. Device A insert k2-v2 and synchronizes, Device B insert k2-v2 and sync before A upload
382      * @tc.expected: step2 OK.
383      */
384     Key key2 = {'x'};
385     Value value2 = {'y'};
386     ASSERT_EQ(kvDelegatePtrS1_->Put(key2, value2), OK);
387     int upIdx = 0;
__anon644cd9970802(const std::string &tableName, VBucket &extend) 388     virtualCloudDb_->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
389         LOGD("cloud db upload index:%d", ++upIdx);
390         if (upIdx == 2) { // 2 is index
391             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_RECORD_ALREADY_EXISTED);
392         }
393     });
394     int doUpIdx = 0;
__anon644cd9970902null395     cloudHook->SetDoUploadHook([&doUpIdx, key2, value2, this] {
396         LOGD("begin upload index:%d", ++doUpIdx);
397         ASSERT_EQ(kvDelegatePtrS2_->Put(key2, value2), OK);
398         BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
399     });
400     int syncCnt = 0;
__anon644cd9970a02null401     cloudHook->SetSyncFinishHook([&syncCnt, this] {
402         LOGD("sync finish times:%d", ++syncCnt);
403         if (syncCnt == 1) { // 1 is the normal sync
404             CheckUploadAbnormal(OpType::INSERT, 1L); // 1 is expected count
405         } else {
406             CheckUploadAbnormal(OpType::INSERT, 2L, true); // 2 is expected count
407         }
408         comSyncCv.notify_all();
409     });
410     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
411     BlockCompensatedSync(syncCnt, 2); // 2 is sync times
412     virtualCloudDb_->ForkUpload(nullptr);
413     cloudHook->SetSyncFinishHook(nullptr);
414     cloudHook->SetDoUploadHook(nullptr);
415 }
416 
417 /**
418  * @tc.name: UploadAbnormalSync003
419  * @tc.desc: Test upload delete record, cloud returned record not found.
420  * @tc.type: FUNC
421  * @tc.require:
422  * @tc.author: bty
423  */
424 HWTEST_F(DistributedDBCloudKvSyncerTest, UploadAbnormalSync003, TestSize.Level0)
425 {
426     auto cloudHook = (ICloudSyncStorageHook *) singleStore_->GetCloudKvStore();
427     ASSERT_NE(cloudHook, nullptr);
428 
429     /**
430      * @tc.steps:step1. Device A inserts data and synchronizes
431      * @tc.expected: step1 OK.
432      */
433     Key key = {'k'};
434     Value value = {'v'};
435     ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
436     BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
437     BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
438 
439     /**
440      * @tc.steps:step2. Device A delete data and synchronizes, Device B delete data and sync before A upload
441      * @tc.expected: step2 OK.
442      */
443     ASSERT_EQ(kvDelegatePtrS1_->Delete(key), OK);
444     int upIdx = 0;
__anon644cd9970b02(const std::string &tableName, VBucket &extend) 445     virtualCloudDb_->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
446         LOGD("cloud db upload index:%d", ++upIdx);
447         if (upIdx == 2) { // 2 is index
448             extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_RECORD_NOT_FOUND);
449         }
450     });
451     int doUpIdx = 0;
__anon644cd9970c02null452     cloudHook->SetDoUploadHook([&doUpIdx, key, this] {
453         LOGD("begin upload index:%d", ++doUpIdx);
454         ASSERT_EQ(kvDelegatePtrS2_->Delete(key), OK);
455         BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
456     });
457     int syncCnt = 0;
__anon644cd9970d02null458     cloudHook->SetSyncFinishHook([&syncCnt, this] {
459         LOGD("sync finish times:%d", ++syncCnt);
460         if (syncCnt == 1) { // 1 is the normal sync
461             CheckUploadAbnormal(OpType::DELETE, 1L); // 1 is expected count
462         } else {
463             CheckUploadAbnormal(OpType::DELETE, 1L, true); // 1 is expected count
464         }
465         comSyncCv.notify_all();
466     });
467     BlockSync(kvDelegatePtrS1_, CLOUD_ERROR, g_CloudSyncoption);
468     BlockCompensatedSync(syncCnt, 1); // 1 is sync times
469     virtualCloudDb_->ForkUpload(nullptr);
470     cloudHook->SetSyncFinishHook(nullptr);
471     cloudHook->SetDoUploadHook(nullptr);
472 }
473 
474 /**
475  * @tc.name: QueryParsingProcessTest001
476  * @tc.desc: Test Query parsing process.
477  * @tc.type: FUNC
478  * @tc.require:
479  * @tc.author: luoguo
480  */
481 HWTEST_F(DistributedDBCloudKvSyncerTest, QueryParsingProcessTest001, TestSize.Level0)
482 {
483     auto cloudHook = (ICloudSyncStorageHook *) singleStore_->GetCloudKvStore();
484     ASSERT_NE(cloudHook, nullptr);
485 
486     /**
487      * @tc.steps:step1. Device A inserts data and synchronizes
488      * @tc.expected: step1 OK.
489      */
490     Key key = {'k'};
491     Value value = {'v'};
492     ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
493 
494     /**
495      * @tc.steps:step2. Test Query parsing Process
496      * @tc.expected: step2 OK.
497      */
498     QuerySyncObject syncObject;
499     std::vector<VBucket> syncDataPk;
500     VBucket bucket;
501     bucket.insert_or_assign(std::string("k"), std::string("k"));
502     syncDataPk.push_back(bucket);
503     std::string tableName = "sync_data";
504     ASSERT_EQ(CloudStorageUtils::GetSyncQueryByPk(tableName, syncDataPk, true, syncObject), E_OK);
505 
506     Bytes bytes;
507     bytes.resize(syncObject.CalculateParcelLen(SOFTWARE_VERSION_CURRENT));
508     Parcel parcel(bytes.data(), bytes.size());
509     ASSERT_EQ(syncObject.SerializeData(parcel, SOFTWARE_VERSION_CURRENT), E_OK);
510 
511     /**
512      * @tc.steps:step3. Check Node's type is QueryNodeType::IN.
513      * @tc.expected: step3 OK.
514      */
515     std::vector<QueryNode> queryNodes;
516     syncObject.ParserQueryNodes(bytes, queryNodes);
517     ASSERT_EQ(queryNodes[0].type, QueryNodeType::IN);
518 }
519 
520 /**
521  * @tc.name: SyncWithMultipleUsers001.
522  * @tc.desc: Test sync data with multiple users.
523  * @tc.type: FUNC
524  * @tc.require:
525  * @tc.author: liufuchenxing
526  */
527 HWTEST_F(DistributedDBCloudKvSyncerTest, SyncWithMultipleUsers001, TestSize.Level0)
528 {
529     Key key = {'k'};
530     Value value = {'v'};
531     ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
532     CloudSyncOption syncOption;
533     syncOption.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
534     syncOption.users.push_back(USER_ID);
535     syncOption.users.push_back(USER_ID_2);
536     syncOption.devices.push_back("cloud");
537     BlockSync(kvDelegatePtrS1_, OK, syncOption);
538     BlockSync(kvDelegatePtrS2_, OK, syncOption);
539     Value actualValue;
540     // cloud download [k,v]
541     EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue), OK);
542     EXPECT_EQ(actualValue, value);
543 }
544 
545 /**
546  * @tc.name: SyncWithMultipleUsers002.
547  * @tc.desc: test whether upload to the cloud after delete local data that does not have a gid.
548  * @tc.type: FUNC
549  * @tc.require:
550  * @tc.author: luoguo
551  */
552 HWTEST_F(DistributedDBCloudKvSyncerTest, SyncWithMultipleUsers002, TestSize.Level0)
553 {
554     /**
555      * @tc.steps: step1. kvDelegatePtrS1_ put 200 data and sync to cloud.
556      * @tc.expected: step1. return ok.
557      */
558     std::vector<Entry> entries;
559     for (int i = 0; i < 200; i++) {
560         std::string keyStr = "k_" + std::to_string(i);
561         std::string valueStr = "v_" + std::to_string(i);
562         Key key(keyStr.begin(), keyStr.end());
563         Value value(valueStr.begin(), valueStr.end());
564         Entry entry;
565         entry.key = key;
566         entry.value = value;
567         entries.push_back(entry);
568     }
569 
570     ASSERT_EQ(kvDelegatePtrS1_->PutBatch(entries), OK);
571     CloudSyncOption syncOption;
572     syncOption.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
573     syncOption.users.push_back(USER_ID);
574     syncOption.users.push_back(USER_ID_2);
575     syncOption.devices.push_back("cloud");
576     BlockSync(kvDelegatePtrS1_, OK, syncOption);
577 
578     /**
579      * @tc.steps: step2. kvDelegatePtrS2_ only sync user0 from cloud.
580      * @tc.expected: step2. return ok.
581      */
582     syncOption.users.clear();
583     syncOption.users.push_back(USER_ID);
584     BlockSync(kvDelegatePtrS2_, OK, syncOption);
585 
586     /**
587      * @tc.steps: step3. kvDelegatePtrS2_ delete 100 data.
588      * @tc.expected: step3. return ok.
589      */
590     std::vector<Key> keys;
591     for (int i = 0; i < 100; i++) {
592         std::string keyStr = "k_" + std::to_string(i);
593         Key key(keyStr.begin(), keyStr.end());
594         keys.push_back(key);
595     }
596     ASSERT_EQ(kvDelegatePtrS2_->DeleteBatch(keys), OK);
597 
598     /**
599      * @tc.steps: step4. kvDelegatePtrS2_ sync to cloud with user0 user2.
600      * @tc.expected: step4. return ok.
601      */
602     syncOption.users.clear();
603     syncOption.users.push_back(USER_ID);
604     syncOption.users.push_back(USER_ID_2);
605 
606     std::mutex dataMutex;
607     std::condition_variable cv;
608     bool finish = false;
609     uint32_t insertCount = 0;
610     auto callback = [&dataMutex, &cv, &finish, &insertCount, &syncOption](const std::map<std::string,
__anon644cd9970e02(const std::map<std::string, SyncProcess> &process) 611         SyncProcess> &process) {
612         size_t notifyCnt = 0;
613         for (const auto &item : process) {
614             LOGD("user = %s, status = %d, errCode=%d", item.first.c_str(), item.second.process, item.second.errCode);
615             if (item.second.process != DistributedDB::FINISHED) {
616                 continue;
617             }
618             EXPECT_EQ(item.second.errCode, OK);
619             {
620                 std::lock_guard<std::mutex> autoLock(dataMutex);
621                 notifyCnt++;
622                 std::set<std::string> userSet(syncOption.users.begin(), syncOption.users.end());
623                 if (notifyCnt == userSet.size()) {
624                     finish = true;
625                     std::map<std::string, TableProcessInfo> tableProcess(item.second.tableProcess);
626                     insertCount = tableProcess["sync_data"].downLoadInfo.insertCount;
627                     cv.notify_one();
628                 }
629             }
630         }
631     };
632     auto actualRet = kvDelegatePtrS2_->Sync(syncOption, callback);
633     EXPECT_EQ(actualRet, OK);
634     if (actualRet == OK) {
635         std::unique_lock<std::mutex> uniqueLock(dataMutex);
__anon644cd9970f02() 636         cv.wait(uniqueLock, [&finish]() { return finish; });
637     }
638     /**
639      * @tc.steps: step5. check process info, user2's insertCount should be 0.
640      * @tc.expected: step5. return ok.
641      */
642     EXPECT_EQ(insertCount, 0u);
643 }
644 
645 /**
646  * @tc.name: DeviceCollaborationTest001
647  * @tc.desc: Check force override data
648  * @tc.type: FUNC
649  * @tc.require:
650  * @tc.author: zqq
651  */
652 HWTEST_F(DistributedDBCloudKvSyncerTest, DeviceCollaborationTest001, TestSize.Level0)
653 {
654     /**
655      * @tc.steps: step1. open db with DEVICE_COLLABORATION.
656      * @tc.expected: step1. return E_OK.
657      */
658     KvStoreNbDelegate* kvDelegatePtrS3 = nullptr;
659     KvStoreNbDelegate::Option option;
660     option.conflictResolvePolicy = ConflictResolvePolicy::DEVICE_COLLABORATION;
661     ASSERT_EQ(GetKvStore(kvDelegatePtrS3, STORE_ID_3, option), OK);
662     ASSERT_NE(kvDelegatePtrS3, nullptr);
663     KvStoreNbDelegate* kvDelegatePtrS4 = nullptr;
664     ASSERT_EQ(GetKvStore(kvDelegatePtrS4, STORE_ID_4, option), OK);
665     ASSERT_NE(kvDelegatePtrS4, nullptr);
666     /**
667      * @tc.steps: step2. db3 put (k1,v1) sync to db4.
668      * @tc.expected: step2. db4 get (k1,v1).
669      */
670     Key key = {'k'};
671     Value value = {'v'};
672     EXPECT_EQ(kvDelegatePtrS3->Put(key, value), OK);
673     communicatorAggregator_->SetLocalDeviceId("DB3");
674     BlockSync(kvDelegatePtrS3, OK, g_CloudSyncoption);
675     communicatorAggregator_->SetLocalDeviceId("DB4");
676     BlockSync(kvDelegatePtrS4, OK, g_CloudSyncoption);
677     Value actualValue;
678     EXPECT_EQ(kvDelegatePtrS4->Get(key, actualValue), OK);
679     EXPECT_EQ(actualValue, value);
680     /**
681      * @tc.steps: step3. db4 delete (k1,v1) db3 sync again to db4.
682      * @tc.expected: step3. db4 get (k1,v1).
683      */
684     EXPECT_EQ(kvDelegatePtrS4->Delete(key), OK);
685     communicatorAggregator_->SetLocalDeviceId("DB3");
686     EXPECT_EQ(kvDelegatePtrS3->RemoveDeviceData("", ClearMode::FLAG_AND_DATA), OK);
687     BlockSync(kvDelegatePtrS3, OK, g_CloudSyncoption);
688     communicatorAggregator_->SetLocalDeviceId("DB4");
689     BlockSync(kvDelegatePtrS4, OK, g_CloudSyncoption);
690     EXPECT_EQ(kvDelegatePtrS4->Get(key, actualValue), OK);
691     EXPECT_EQ(actualValue, value);
692     CloseKvStore(kvDelegatePtrS3, STORE_ID_3);
693     CloseKvStore(kvDelegatePtrS4, STORE_ID_4);
694 }
695 }