1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <gtest/gtest.h>
17
18 #include "cloud/cloud_db_constant.h"
19 #include "cloud_db_sync_utils_test.h"
20 #include "db_base64_utils.h"
21 #include "distributeddb_data_generate_unit_test.h"
22 #include "distributeddb_tools_unit_test.h"
23 #include "kv_virtual_device.h"
24 #include "kv_store_nb_delegate.h"
25 #include "kvdb_manager.h"
26 #include "platform_specific.h"
27 #include "process_system_api_adapter_impl.h"
28 #include "sqlite_cloud_kv_executor_utils.h"
29 #include "virtual_communicator_aggregator.h"
30 #include "virtual_cloud_db.h"
31
32 using namespace testing::ext;
33 using namespace DistributedDB;
34 using namespace DistributedDBUnitTest;
35 using namespace std;
36
37 namespace {
// Prefix string for a cloud meta key; it is not referenced by the code visible in this file.
static std::string HWM_HEAD = "naturalbase_cloud_meta_sync_data_";
// Root directory of the test databases, filled in by SetUpTestCase().
string g_testDir;
// Delegate manager shared by all cases to open, close and delete the stores.
KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
// Default cloud sync option; its mode, user and device are populated in SetUpTestCase().
CloudSyncOption g_CloudSyncoption;
// Second user id; GetKvStore() registers a dedicated virtual cloud db and schema for it.
const std::string USER_ID_2 = "user2";
// Third user id; not referenced by the code visible in this file.
const std::string USER_ID_3 = "user3";
// Timeout in seconds used by BlockCompensatedSync() while waiting for sync-finish notifications.
const int64_t WAIT_TIME = 5;
45 class DistributedDBCloudKvSyncerTest : public testing::Test {
46 public:
47 static void SetUpTestCase();
48 static void TearDownTestCase();
49 void SetUp();
50 void TearDown();
51 protected:
52 DBStatus GetKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId, KvStoreNbDelegate::Option option,
53 bool invalidSchema = false);
54 void CloseKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId);
55 void BlockSync(KvStoreNbDelegate *delegate, DBStatus expectDBStatus, CloudSyncOption option,
56 DBStatus expectSyncResult = OK);
57 static DataBaseSchema GetDataBaseSchema(bool invalidSchema);
58 void GetSingleStore();
59 void ReleaseSingleStore();
60 void BlockCompensatedSync(int &actSyncCnt, int expSyncCnt);
61 void CheckUploadAbnormal(OpType opType, int64_t expCnt, bool isCompensated = false);
62 std::shared_ptr<VirtualCloudDb> virtualCloudDb_ = nullptr;
63 std::shared_ptr<VirtualCloudDb> virtualCloudDb2_ = nullptr;
64 KvStoreConfig config_;
65 KvStoreNbDelegate* kvDelegatePtrS1_ = nullptr;
66 KvStoreNbDelegate* kvDelegatePtrS2_ = nullptr;
67 SyncProcess lastProcess_;
68 VirtualCommunicatorAggregator *communicatorAggregator_ = nullptr;
69 KvVirtualDevice *deviceB_ = nullptr;
70 SQLiteSingleVerNaturalStore *singleStore_ = nullptr;
71 std::mutex comSyncMutex;
72 std::condition_variable comSyncCv;
73 };
74
SetUpTestCase()75 void DistributedDBCloudKvSyncerTest::SetUpTestCase()
76 {
77 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
78 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
79 LOGE("rm test db files error!");
80 }
81 g_CloudSyncoption.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
82 g_CloudSyncoption.users.push_back(USER_ID);
83 g_CloudSyncoption.devices.push_back("cloud");
84
85 string dir = g_testDir + "/single_ver";
86 DIR* dirTmp = opendir(dir.c_str());
87 if (dirTmp == nullptr) {
88 OS::MakeDBDirectory(dir);
89 } else {
90 closedir(dirTmp);
91 }
92 }
93
TearDownTestCase()94 void DistributedDBCloudKvSyncerTest::TearDownTestCase()
95 {
96 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
97 LOGE("rm test db files error!");
98 }
99 }
100
SetUp()101 void DistributedDBCloudKvSyncerTest::SetUp()
102 {
103 DistributedDBToolsUnitTest::PrintTestCaseInfo();
104 config_.dataDir = g_testDir;
105 /**
106 * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
107 */
108 virtualCloudDb_ = std::make_shared<VirtualCloudDb>();
109 virtualCloudDb2_ = std::make_shared<VirtualCloudDb>();
110 g_mgr.SetKvStoreConfig(config_);
111 KvStoreNbDelegate::Option option1;
112 ASSERT_EQ(GetKvStore(kvDelegatePtrS1_, STORE_ID_1, option1), OK);
113 // set aggregator after get store1, only store2 can sync with p2p
114 communicatorAggregator_ = new (std::nothrow) VirtualCommunicatorAggregator();
115 ASSERT_TRUE(communicatorAggregator_ != nullptr);
116 RuntimeContext::GetInstance()->SetCommunicatorAggregator(communicatorAggregator_);
117 KvStoreNbDelegate::Option option2;
118 ASSERT_EQ(GetKvStore(kvDelegatePtrS2_, STORE_ID_2, option2), OK);
119
120 deviceB_ = new (std::nothrow) KvVirtualDevice("DEVICE_B");
121 ASSERT_TRUE(deviceB_ != nullptr);
122 auto syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
123 ASSERT_TRUE(syncInterfaceB != nullptr);
124 ASSERT_EQ(deviceB_->Initialize(communicatorAggregator_, syncInterfaceB), E_OK);
125 GetSingleStore();
126 }
127
TearDown()128 void DistributedDBCloudKvSyncerTest::TearDown()
129 {
130 ReleaseSingleStore();
131 CloseKvStore(kvDelegatePtrS1_, STORE_ID_1);
132 CloseKvStore(kvDelegatePtrS2_, STORE_ID_2);
133 virtualCloudDb_ = nullptr;
134 virtualCloudDb2_ = nullptr;
135 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
136 LOGE("rm test db files error!");
137 }
138
139 if (deviceB_ != nullptr) {
140 delete deviceB_;
141 deviceB_ = nullptr;
142 }
143
144 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
145 communicatorAggregator_ = nullptr;
146 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
147 }
148
BlockSync(KvStoreNbDelegate * delegate,DBStatus expectDBStatus,CloudSyncOption option,DBStatus expectSyncResult)149 void DistributedDBCloudKvSyncerTest::BlockSync(KvStoreNbDelegate *delegate, DBStatus expectDBStatus,
150 CloudSyncOption option, DBStatus expectSyncResult)
151 {
152 if (delegate == nullptr) {
153 return;
154 }
155 std::mutex dataMutex;
156 std::condition_variable cv;
157 bool finish = false;
158 SyncProcess last;
159 auto callback = [expectDBStatus, &last, &cv, &dataMutex, &finish, &option](const std::map<std::string,
160 SyncProcess> &process) {
161 size_t notifyCnt = 0;
162 for (const auto &item: process) {
163 LOGD("user = %s, status = %d, errCode = %d", item.first.c_str(), item.second.process, item.second.errCode);
164 if (item.second.process != DistributedDB::FINISHED) {
165 continue;
166 }
167 EXPECT_EQ(item.second.errCode, expectDBStatus);
168 {
169 std::lock_guard<std::mutex> autoLock(dataMutex);
170 notifyCnt++;
171 std::set<std::string> userSet(option.users.begin(), option.users.end());
172 if (notifyCnt == userSet.size()) {
173 finish = true;
174 last = item.second;
175 cv.notify_one();
176 }
177 }
178 }
179 };
180 auto actualRet = delegate->Sync(option, callback);
181 EXPECT_EQ(actualRet, expectSyncResult);
182 if (actualRet == OK) {
183 std::unique_lock<std::mutex> uniqueLock(dataMutex);
184 cv.wait(uniqueLock, [&finish]() {
185 return finish;
186 });
187 }
188 lastProcess_ = last;
189 }
190
GetDataBaseSchema(bool invalidSchema)191 DataBaseSchema DistributedDBCloudKvSyncerTest::GetDataBaseSchema(bool invalidSchema)
192 {
193 DataBaseSchema schema;
194 TableSchema tableSchema;
195 tableSchema.name = invalidSchema ? "invalid_schema_name" : CloudDbConstant::CLOUD_KV_TABLE_NAME;
196 Field field;
197 field.colName = CloudDbConstant::CLOUD_KV_FIELD_KEY;
198 field.type = TYPE_INDEX<std::string>;
199 field.primary = true;
200 tableSchema.fields.push_back(field);
201 field.colName = CloudDbConstant::CLOUD_KV_FIELD_DEVICE;
202 field.primary = false;
203 tableSchema.fields.push_back(field);
204 field.colName = CloudDbConstant::CLOUD_KV_FIELD_ORI_DEVICE;
205 tableSchema.fields.push_back(field);
206 field.colName = CloudDbConstant::CLOUD_KV_FIELD_VALUE;
207 tableSchema.fields.push_back(field);
208 field.colName = CloudDbConstant::CLOUD_KV_FIELD_DEVICE_CREATE_TIME;
209 field.type = TYPE_INDEX<int64_t>;
210 tableSchema.fields.push_back(field);
211 schema.tables.push_back(tableSchema);
212 return schema;
213 }
214
GetSingleStore()215 void DistributedDBCloudKvSyncerTest::GetSingleStore()
216 {
217 KvDBProperties prop;
218 prop.SetStringProp(KvDBProperties::USER_ID, USER_ID);
219 prop.SetStringProp(KvDBProperties::APP_ID, APP_ID);
220 prop.SetStringProp(KvDBProperties::STORE_ID, STORE_ID_1);
221
222 std::string hashIdentifier = DBCommon::TransferHashString(
223 DBCommon::GenerateIdentifierId(STORE_ID_1, APP_ID, USER_ID, "", 0));
224 prop.SetStringProp(DBProperties::IDENTIFIER_DATA, hashIdentifier);
225 prop.SetIntProp(KvDBProperties::DATABASE_TYPE, KvDBProperties::SINGLE_VER_TYPE_SQLITE);
226 int errCode = E_OK;
227 singleStore_ = static_cast<SQLiteSingleVerNaturalStore *>(KvDBManager::OpenDatabase(prop, errCode));
228 ASSERT_NE(singleStore_, nullptr);
229 }
230
ReleaseSingleStore()231 void DistributedDBCloudKvSyncerTest::ReleaseSingleStore()
232 {
233 RefObject::DecObjRef(singleStore_);
234 singleStore_ = nullptr;
235 }
236
BlockCompensatedSync(int & actSyncCnt,int expSyncCnt)237 void DistributedDBCloudKvSyncerTest::BlockCompensatedSync(int &actSyncCnt, int expSyncCnt)
238 {
239 {
240 std::unique_lock<std::mutex> lock(comSyncMutex);
241 bool result = comSyncCv.wait_for(lock, std::chrono::seconds(WAIT_TIME),
242 [&actSyncCnt, expSyncCnt]() { return actSyncCnt == expSyncCnt; });
243 ASSERT_EQ(result, true);
244 }
245 }
246
247
GetKvStore(KvStoreNbDelegate * & delegate,const std::string & storeId,KvStoreNbDelegate::Option option,bool invalidSchema)248 DBStatus DistributedDBCloudKvSyncerTest::GetKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId,
249 KvStoreNbDelegate::Option option, bool invalidSchema)
250 {
251 DBStatus openRet = OK;
252 g_mgr.GetKvStore(storeId, option, [&openRet, &delegate](DBStatus status, KvStoreNbDelegate *openDelegate) {
253 openRet = status;
254 delegate = openDelegate;
255 });
256 EXPECT_EQ(openRet, OK);
257 EXPECT_NE(delegate, nullptr);
258
259 std::map<std::string, std::shared_ptr<ICloudDb>> cloudDbs;
260 cloudDbs[USER_ID] = virtualCloudDb_;
261 cloudDbs[USER_ID_2] = virtualCloudDb2_;
262 delegate->SetCloudDB(cloudDbs);
263 std::map<std::string, DataBaseSchema> schemas;
264 schemas[USER_ID] = GetDataBaseSchema(invalidSchema);
265 schemas[USER_ID_2] = GetDataBaseSchema(invalidSchema);
266 return delegate->SetCloudDbSchema(schemas);
267 }
268
CloseKvStore(KvStoreNbDelegate * & delegate,const std::string & storeId)269 void DistributedDBCloudKvSyncerTest::CloseKvStore(KvStoreNbDelegate *&delegate, const std::string &storeId)
270 {
271 if (delegate != nullptr) {
272 ASSERT_EQ(g_mgr.CloseKvStore(delegate), OK);
273 delegate = nullptr;
274 DBStatus status = g_mgr.DeleteKvStore(storeId);
275 LOGD("delete kv store status %d store %s", status, storeId.c_str());
276 ASSERT_EQ(status, OK);
277 }
278 }
279
CheckUploadAbnormal(OpType opType,int64_t expCnt,bool isCompensated)280 void DistributedDBCloudKvSyncerTest::CheckUploadAbnormal(OpType opType, int64_t expCnt, bool isCompensated)
281 {
282 sqlite3 *db_;
283 uint64_t flag = SQLITE_OPEN_URI | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE;
284 std::string fileUrl = g_testDir + "/" \
285 "2d23c8a0ffadafcaa03507a4ec2290c83babddcab07c0e2945fbba93efc7eec0/single_ver/main/gen_natural_store.db";
286 EXPECT_EQ(sqlite3_open_v2(fileUrl.c_str(), &db_, flag, nullptr), SQLITE_OK);
287
288 std::string sql = "SELECT count(*) FROM naturalbase_kv_aux_sync_data_log WHERE ";
289 switch (opType) {
290 case OpType::INSERT:
291 sql += isCompensated ? " cloud_gid != '' AND version !='' AND cloud_flag&0x10=0" :
292 " cloud_gid != '' AND version !='' AND cloud_flag=cloud_flag|0x10";
293 break;
294 case OpType::UPDATE:
295 sql += isCompensated ? " cloud_gid != '' AND version !='' AND cloud_flag&0x10=0" :
296 " cloud_gid == '' AND version =='' AND cloud_flag=cloud_flag|0x10";
297 break;
298 case OpType::DELETE:
299 sql += " cloud_gid == '' AND version ==''";
300 break;
301 default:
302 break;
303 }
304 EXPECT_EQ(sqlite3_exec(db_, sql.c_str(), CloudDBSyncUtilsTest::QueryCountCallback,
305 reinterpret_cast<void *>(expCnt), nullptr), SQLITE_OK);
306 sqlite3_close_v2(db_);
307 }
308
309 /**
310 * @tc.name: UploadAbnormalSync001
311 * @tc.desc: Test upload update record, cloud returned record not found.
312 * @tc.type: FUNC
313 * @tc.require:
314 * @tc.author: bty
315 */
316 HWTEST_F(DistributedDBCloudKvSyncerTest, UploadAbnormalSync001, TestSize.Level0)
317 {
318 auto cloudHook = (ICloudSyncStorageHook *) singleStore_->GetCloudKvStore();
319 ASSERT_NE(cloudHook, nullptr);
320
321 /**
322 * @tc.steps:step1. Device A inserts data and synchronizes
323 * @tc.expected: step1 OK.
324 */
325 Key key = {'k'};
326 Value value = {'v'};
327 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
328 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
329
330 /**
331 * @tc.steps:step2. Device A update data and synchronizes, cloud returned record not found
332 * @tc.expected: step2 OK.
333 */
334 Value value2 = {'x'};
335 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value2), OK);
336 int upIdx = 0;
__anon644cd9970602(const std::string &tableName, VBucket &extend) 337 virtualCloudDb_->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
338 LOGD("cloud db upload index:%d", ++upIdx);
339 if (upIdx == 1) { // 1 is index
340 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_RECORD_NOT_FOUND);
341 }
342 });
343 int syncCnt = 0;
__anon644cd9970702null344 cloudHook->SetSyncFinishHook([&syncCnt, this] {
345 LOGD("sync finish times:%d", ++syncCnt);
346 if (syncCnt == 1) { // 1 is the first sync
347 CheckUploadAbnormal(OpType::UPDATE, 1L); // 1 is expected count
348 } else {
349 CheckUploadAbnormal(OpType::UPDATE, 1L, true); // 1 is expected count
350 }
351 comSyncCv.notify_all();
352 });
353 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
354 BlockCompensatedSync(syncCnt, 2); // 2 is sync times
355 virtualCloudDb_->ForkUpload(nullptr);
356 cloudHook->SetSyncFinishHook(nullptr);
357 }
358
359 /**
360 * @tc.name: UploadAbnormalSync002
361 * @tc.desc: Test upload insert record, cloud returned record already existed.
362 * @tc.type: FUNC
363 * @tc.require:
364 * @tc.author: bty
365 */
366 HWTEST_F(DistributedDBCloudKvSyncerTest, UploadAbnormalSync002, TestSize.Level0)
367 {
368 auto cloudHook = (ICloudSyncStorageHook *) singleStore_->GetCloudKvStore();
369 ASSERT_NE(cloudHook, nullptr);
370
371 /**
372 * @tc.steps:step1. Device A inserts k-v and synchronizes
373 * @tc.expected: step1 OK.
374 */
375 Key key = {'k'};
376 Value value = {'v'};
377 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
378 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
379
380 /**
381 * @tc.steps:step2. Device A insert k2-v2 and synchronizes, Device B insert k2-v2 and sync before A upload
382 * @tc.expected: step2 OK.
383 */
384 Key key2 = {'x'};
385 Value value2 = {'y'};
386 ASSERT_EQ(kvDelegatePtrS1_->Put(key2, value2), OK);
387 int upIdx = 0;
__anon644cd9970802(const std::string &tableName, VBucket &extend) 388 virtualCloudDb_->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
389 LOGD("cloud db upload index:%d", ++upIdx);
390 if (upIdx == 2) { // 2 is index
391 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_RECORD_ALREADY_EXISTED);
392 }
393 });
394 int doUpIdx = 0;
__anon644cd9970902null395 cloudHook->SetDoUploadHook([&doUpIdx, key2, value2, this] {
396 LOGD("begin upload index:%d", ++doUpIdx);
397 ASSERT_EQ(kvDelegatePtrS2_->Put(key2, value2), OK);
398 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
399 });
400 int syncCnt = 0;
__anon644cd9970a02null401 cloudHook->SetSyncFinishHook([&syncCnt, this] {
402 LOGD("sync finish times:%d", ++syncCnt);
403 if (syncCnt == 1) { // 1 is the normal sync
404 CheckUploadAbnormal(OpType::INSERT, 1L); // 1 is expected count
405 } else {
406 CheckUploadAbnormal(OpType::INSERT, 2L, true); // 2 is expected count
407 }
408 comSyncCv.notify_all();
409 });
410 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
411 BlockCompensatedSync(syncCnt, 2); // 2 is sync times
412 virtualCloudDb_->ForkUpload(nullptr);
413 cloudHook->SetSyncFinishHook(nullptr);
414 cloudHook->SetDoUploadHook(nullptr);
415 }
416
417 /**
418 * @tc.name: UploadAbnormalSync003
419 * @tc.desc: Test upload delete record, cloud returned record not found.
420 * @tc.type: FUNC
421 * @tc.require:
422 * @tc.author: bty
423 */
424 HWTEST_F(DistributedDBCloudKvSyncerTest, UploadAbnormalSync003, TestSize.Level0)
425 {
426 auto cloudHook = (ICloudSyncStorageHook *) singleStore_->GetCloudKvStore();
427 ASSERT_NE(cloudHook, nullptr);
428
429 /**
430 * @tc.steps:step1. Device A inserts data and synchronizes
431 * @tc.expected: step1 OK.
432 */
433 Key key = {'k'};
434 Value value = {'v'};
435 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
436 BlockSync(kvDelegatePtrS1_, OK, g_CloudSyncoption);
437 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
438
439 /**
440 * @tc.steps:step2. Device A delete data and synchronizes, Device B delete data and sync before A upload
441 * @tc.expected: step2 OK.
442 */
443 ASSERT_EQ(kvDelegatePtrS1_->Delete(key), OK);
444 int upIdx = 0;
__anon644cd9970b02(const std::string &tableName, VBucket &extend) 445 virtualCloudDb_->ForkUpload([&upIdx](const std::string &tableName, VBucket &extend) {
446 LOGD("cloud db upload index:%d", ++upIdx);
447 if (upIdx == 2) { // 2 is index
448 extend[CloudDbConstant::ERROR_FIELD] = static_cast<int64_t>(DBStatus::CLOUD_RECORD_NOT_FOUND);
449 }
450 });
451 int doUpIdx = 0;
__anon644cd9970c02null452 cloudHook->SetDoUploadHook([&doUpIdx, key, this] {
453 LOGD("begin upload index:%d", ++doUpIdx);
454 ASSERT_EQ(kvDelegatePtrS2_->Delete(key), OK);
455 BlockSync(kvDelegatePtrS2_, OK, g_CloudSyncoption);
456 });
457 int syncCnt = 0;
__anon644cd9970d02null458 cloudHook->SetSyncFinishHook([&syncCnt, this] {
459 LOGD("sync finish times:%d", ++syncCnt);
460 if (syncCnt == 1) { // 1 is the normal sync
461 CheckUploadAbnormal(OpType::DELETE, 1L); // 1 is expected count
462 } else {
463 CheckUploadAbnormal(OpType::DELETE, 1L, true); // 1 is expected count
464 }
465 comSyncCv.notify_all();
466 });
467 BlockSync(kvDelegatePtrS1_, CLOUD_ERROR, g_CloudSyncoption);
468 BlockCompensatedSync(syncCnt, 1); // 1 is sync times
469 virtualCloudDb_->ForkUpload(nullptr);
470 cloudHook->SetSyncFinishHook(nullptr);
471 cloudHook->SetDoUploadHook(nullptr);
472 }
473
474 /**
475 * @tc.name: QueryParsingProcessTest001
476 * @tc.desc: Test Query parsing process.
477 * @tc.type: FUNC
478 * @tc.require:
479 * @tc.author: luoguo
480 */
481 HWTEST_F(DistributedDBCloudKvSyncerTest, QueryParsingProcessTest001, TestSize.Level0)
482 {
483 auto cloudHook = (ICloudSyncStorageHook *) singleStore_->GetCloudKvStore();
484 ASSERT_NE(cloudHook, nullptr);
485
486 /**
487 * @tc.steps:step1. Device A inserts data and synchronizes
488 * @tc.expected: step1 OK.
489 */
490 Key key = {'k'};
491 Value value = {'v'};
492 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
493
494 /**
495 * @tc.steps:step2. Test Query parsing Process
496 * @tc.expected: step2 OK.
497 */
498 QuerySyncObject syncObject;
499 std::vector<VBucket> syncDataPk;
500 VBucket bucket;
501 bucket.insert_or_assign(std::string("k"), std::string("k"));
502 syncDataPk.push_back(bucket);
503 std::string tableName = "sync_data";
504 ASSERT_EQ(CloudStorageUtils::GetSyncQueryByPk(tableName, syncDataPk, true, syncObject), E_OK);
505
506 Bytes bytes;
507 bytes.resize(syncObject.CalculateParcelLen(SOFTWARE_VERSION_CURRENT));
508 Parcel parcel(bytes.data(), bytes.size());
509 ASSERT_EQ(syncObject.SerializeData(parcel, SOFTWARE_VERSION_CURRENT), E_OK);
510
511 /**
512 * @tc.steps:step3. Check Node's type is QueryNodeType::IN.
513 * @tc.expected: step3 OK.
514 */
515 std::vector<QueryNode> queryNodes;
516 syncObject.ParserQueryNodes(bytes, queryNodes);
517 ASSERT_EQ(queryNodes[0].type, QueryNodeType::IN);
518 }
519
520 /**
521 * @tc.name: SyncWithMultipleUsers001.
522 * @tc.desc: Test sync data with multiple users.
523 * @tc.type: FUNC
524 * @tc.require:
525 * @tc.author: liufuchenxing
526 */
527 HWTEST_F(DistributedDBCloudKvSyncerTest, SyncWithMultipleUsers001, TestSize.Level0)
528 {
529 Key key = {'k'};
530 Value value = {'v'};
531 ASSERT_EQ(kvDelegatePtrS1_->Put(key, value), OK);
532 CloudSyncOption syncOption;
533 syncOption.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
534 syncOption.users.push_back(USER_ID);
535 syncOption.users.push_back(USER_ID_2);
536 syncOption.devices.push_back("cloud");
537 BlockSync(kvDelegatePtrS1_, OK, syncOption);
538 BlockSync(kvDelegatePtrS2_, OK, syncOption);
539 Value actualValue;
540 // cloud download [k,v]
541 EXPECT_EQ(kvDelegatePtrS2_->Get(key, actualValue), OK);
542 EXPECT_EQ(actualValue, value);
543 }
544
545 /**
546 * @tc.name: SyncWithMultipleUsers002.
547 * @tc.desc: test whether upload to the cloud after delete local data that does not have a gid.
548 * @tc.type: FUNC
549 * @tc.require:
550 * @tc.author: luoguo
551 */
552 HWTEST_F(DistributedDBCloudKvSyncerTest, SyncWithMultipleUsers002, TestSize.Level0)
553 {
554 /**
555 * @tc.steps: step1. kvDelegatePtrS1_ put 200 data and sync to cloud.
556 * @tc.expected: step1. return ok.
557 */
558 std::vector<Entry> entries;
559 for (int i = 0; i < 200; i++) {
560 std::string keyStr = "k_" + std::to_string(i);
561 std::string valueStr = "v_" + std::to_string(i);
562 Key key(keyStr.begin(), keyStr.end());
563 Value value(valueStr.begin(), valueStr.end());
564 Entry entry;
565 entry.key = key;
566 entry.value = value;
567 entries.push_back(entry);
568 }
569
570 ASSERT_EQ(kvDelegatePtrS1_->PutBatch(entries), OK);
571 CloudSyncOption syncOption;
572 syncOption.mode = SyncMode::SYNC_MODE_CLOUD_MERGE;
573 syncOption.users.push_back(USER_ID);
574 syncOption.users.push_back(USER_ID_2);
575 syncOption.devices.push_back("cloud");
576 BlockSync(kvDelegatePtrS1_, OK, syncOption);
577
578 /**
579 * @tc.steps: step2. kvDelegatePtrS2_ only sync user0 from cloud.
580 * @tc.expected: step2. return ok.
581 */
582 syncOption.users.clear();
583 syncOption.users.push_back(USER_ID);
584 BlockSync(kvDelegatePtrS2_, OK, syncOption);
585
586 /**
587 * @tc.steps: step3. kvDelegatePtrS2_ delete 100 data.
588 * @tc.expected: step3. return ok.
589 */
590 std::vector<Key> keys;
591 for (int i = 0; i < 100; i++) {
592 std::string keyStr = "k_" + std::to_string(i);
593 Key key(keyStr.begin(), keyStr.end());
594 keys.push_back(key);
595 }
596 ASSERT_EQ(kvDelegatePtrS2_->DeleteBatch(keys), OK);
597
598 /**
599 * @tc.steps: step4. kvDelegatePtrS2_ sync to cloud with user0 user2.
600 * @tc.expected: step4. return ok.
601 */
602 syncOption.users.clear();
603 syncOption.users.push_back(USER_ID);
604 syncOption.users.push_back(USER_ID_2);
605
606 std::mutex dataMutex;
607 std::condition_variable cv;
608 bool finish = false;
609 uint32_t insertCount = 0;
610 auto callback = [&dataMutex, &cv, &finish, &insertCount, &syncOption](const std::map<std::string,
__anon644cd9970e02(const std::map<std::string, SyncProcess> &process) 611 SyncProcess> &process) {
612 size_t notifyCnt = 0;
613 for (const auto &item : process) {
614 LOGD("user = %s, status = %d, errCode=%d", item.first.c_str(), item.second.process, item.second.errCode);
615 if (item.second.process != DistributedDB::FINISHED) {
616 continue;
617 }
618 EXPECT_EQ(item.second.errCode, OK);
619 {
620 std::lock_guard<std::mutex> autoLock(dataMutex);
621 notifyCnt++;
622 std::set<std::string> userSet(syncOption.users.begin(), syncOption.users.end());
623 if (notifyCnt == userSet.size()) {
624 finish = true;
625 std::map<std::string, TableProcessInfo> tableProcess(item.second.tableProcess);
626 insertCount = tableProcess["sync_data"].downLoadInfo.insertCount;
627 cv.notify_one();
628 }
629 }
630 }
631 };
632 auto actualRet = kvDelegatePtrS2_->Sync(syncOption, callback);
633 EXPECT_EQ(actualRet, OK);
634 if (actualRet == OK) {
635 std::unique_lock<std::mutex> uniqueLock(dataMutex);
__anon644cd9970f02() 636 cv.wait(uniqueLock, [&finish]() { return finish; });
637 }
638 /**
639 * @tc.steps: step5. check process info, user2's insertCount should be 0.
640 * @tc.expected: step5. return ok.
641 */
642 EXPECT_EQ(insertCount, 0u);
643 }
644
645 /**
646 * @tc.name: DeviceCollaborationTest001
647 * @tc.desc: Check force override data
648 * @tc.type: FUNC
649 * @tc.require:
650 * @tc.author: zqq
651 */
652 HWTEST_F(DistributedDBCloudKvSyncerTest, DeviceCollaborationTest001, TestSize.Level0)
653 {
654 /**
655 * @tc.steps: step1. open db with DEVICE_COLLABORATION.
656 * @tc.expected: step1. return E_OK.
657 */
658 KvStoreNbDelegate* kvDelegatePtrS3 = nullptr;
659 KvStoreNbDelegate::Option option;
660 option.conflictResolvePolicy = ConflictResolvePolicy::DEVICE_COLLABORATION;
661 ASSERT_EQ(GetKvStore(kvDelegatePtrS3, STORE_ID_3, option), OK);
662 ASSERT_NE(kvDelegatePtrS3, nullptr);
663 KvStoreNbDelegate* kvDelegatePtrS4 = nullptr;
664 ASSERT_EQ(GetKvStore(kvDelegatePtrS4, STORE_ID_4, option), OK);
665 ASSERT_NE(kvDelegatePtrS4, nullptr);
666 /**
667 * @tc.steps: step2. db3 put (k1,v1) sync to db4.
668 * @tc.expected: step2. db4 get (k1,v1).
669 */
670 Key key = {'k'};
671 Value value = {'v'};
672 EXPECT_EQ(kvDelegatePtrS3->Put(key, value), OK);
673 communicatorAggregator_->SetLocalDeviceId("DB3");
674 BlockSync(kvDelegatePtrS3, OK, g_CloudSyncoption);
675 communicatorAggregator_->SetLocalDeviceId("DB4");
676 BlockSync(kvDelegatePtrS4, OK, g_CloudSyncoption);
677 Value actualValue;
678 EXPECT_EQ(kvDelegatePtrS4->Get(key, actualValue), OK);
679 EXPECT_EQ(actualValue, value);
680 /**
681 * @tc.steps: step3. db4 delete (k1,v1) db3 sync again to db4.
682 * @tc.expected: step3. db4 get (k1,v1).
683 */
684 EXPECT_EQ(kvDelegatePtrS4->Delete(key), OK);
685 communicatorAggregator_->SetLocalDeviceId("DB3");
686 EXPECT_EQ(kvDelegatePtrS3->RemoveDeviceData("", ClearMode::FLAG_AND_DATA), OK);
687 BlockSync(kvDelegatePtrS3, OK, g_CloudSyncoption);
688 communicatorAggregator_->SetLocalDeviceId("DB4");
689 BlockSync(kvDelegatePtrS4, OK, g_CloudSyncoption);
690 EXPECT_EQ(kvDelegatePtrS4->Get(key, actualValue), OK);
691 EXPECT_EQ(actualValue, value);
692 CloseKvStore(kvDelegatePtrS3, STORE_ID_3);
693 CloseKvStore(kvDelegatePtrS4, STORE_ID_4);
694 }
695 }