1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <condition_variable>
17 #include <gtest/gtest.h>
18 #include <thread>
19
20 #include "db_constant.h"
21 #include "db_common.h"
22 #include "distributeddb_data_generate_unit_test.h"
23 #include "distributeddb_tools_unit_test.h"
24 #include "kv_store_nb_delegate.h"
25 #include "kv_virtual_device.h"
26 #include "mock_sync_task_context.h"
27 #include "platform_specific.h"
28 #include "single_ver_data_sync.h"
29 #include "single_ver_kv_sync_task_context.h"
30
31 using namespace testing::ext;
32 using namespace DistributedDB;
33 using namespace DistributedDBUnitTest;
34 using namespace std;
35
36 namespace {
37 class TestSingleVerKvSyncTaskContext : public SingleVerKvSyncTaskContext {
38 public:
39 TestSingleVerKvSyncTaskContext() = default;
40 };
41 string g_testDir;
42 const string STORE_ID = "kv_stroe_complex_sync_test";
43 const int WAIT_TIME = 1000;
44 const std::string DEVICE_A = "real_device";
45 const std::string DEVICE_B = "deviceB";
46 const std::string DEVICE_C = "deviceC";
47 const std::string CREATE_SYNC_TABLE_SQL =
48 "CREATE TABLE IF NOT EXISTS sync_data(" \
49 "key BLOB NOT NULL," \
50 "value BLOB," \
51 "timestamp INT NOT NULL," \
52 "flag INT NOT NULL," \
53 "device BLOB," \
54 "ori_device BLOB," \
55 "hash_key BLOB PRIMARY KEY NOT NULL," \
56 "w_timestamp INT," \
57 "modify_time INT," \
58 "create_time INT" \
59 ");";
60
61 KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
62 KvStoreConfig g_config;
63 DistributedDBToolsUnitTest g_tool;
64 DBStatus g_kvDelegateStatus = INVALID_ARGS;
65 KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
66 VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
67 KvVirtualDevice *g_deviceB = nullptr;
68 KvVirtualDevice *g_deviceC = nullptr;
69
70 // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
71 auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
72 placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
73
PullSyncTest()74 void PullSyncTest()
75 {
76 DBStatus status = OK;
77 std::vector<std::string> devices;
78 devices.push_back(g_deviceB->GetDeviceId());
79
80 Key key = {'1'};
81 Key key2 = {'2'};
82 Value value = {'1'};
83 g_deviceB->PutData(key, value, 0, 0);
84 g_deviceB->PutData(key2, value, 1, 0);
85
86 std::map<std::string, DBStatus> result;
87 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
88 ASSERT_TRUE(status == OK);
89
90 ASSERT_TRUE(result.size() == devices.size());
91 for (const auto &pair : result) {
92 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
93 EXPECT_TRUE(pair.second == OK);
94 }
95 Value value3;
96 EXPECT_EQ(g_kvDelegatePtr->Get(key, value3), OK);
97 EXPECT_EQ(value3, value);
98 EXPECT_EQ(g_kvDelegatePtr->Get(key2, value3), OK);
99 EXPECT_EQ(value3, value);
100 }
101
CrudTest()102 void CrudTest()
103 {
104 vector<Entry> entries;
105 int totalSize = 10;
106 for (int i = 0; i < totalSize; i++) {
107 Entry entry;
108 entry.key.push_back(i);
109 entry.value.push_back('2');
110 entries.push_back(entry);
111 }
112 EXPECT_TRUE(g_kvDelegatePtr->PutBatch(entries) == OK);
113 for (const auto &entry : entries) {
114 Value resultvalue;
115 EXPECT_TRUE(g_kvDelegatePtr->Get(entry.key, resultvalue) == OK);
116 EXPECT_TRUE(resultvalue == entry.value);
117 }
118 for (int i = 0; i < totalSize / 2; i++) { // 2: Half of the total
119 g_kvDelegatePtr->Delete(entries[i].key);
120 Value resultvalue;
121 EXPECT_TRUE(g_kvDelegatePtr->Get(entries[i].key, resultvalue) == NOT_FOUND);
122 }
123 for (int i = totalSize / 2; i < totalSize; i++) {
124 Value value = entries[i].value;
125 value.push_back('x');
126 EXPECT_TRUE(g_kvDelegatePtr->Put(entries[i].key, value) == OK);
127 Value resultvalue;
128 EXPECT_TRUE(g_kvDelegatePtr->Get(entries[i].key, resultvalue) == OK);
129 EXPECT_TRUE(resultvalue == value);
130 }
131 }
132
DataSync005()133 void DataSync005()
134 {
135 ASSERT_NE(g_communicatorAggregator, nullptr);
136 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
137 ASSERT_TRUE(dataSync != nullptr);
138 dataSync->SendSaveDataNotifyPacket(nullptr, 0, 0, 0, TIME_SYNC_MESSAGE);
139 EXPECT_EQ(g_communicatorAggregator->GetOnlineDevices().size(), 3u); // 3 online dev
140 delete dataSync;
141 }
142
DataSync008()143 void DataSync008()
144 {
145 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
146 ASSERT_TRUE(dataSync != nullptr);
147 auto context = new (std::nothrow) MockSyncTaskContext();
148 dataSync->PutDataMsg(nullptr);
149 bool isNeedHandle = false;
150 bool isContinue = false;
151 EXPECT_EQ(dataSync->MoveNextDataMsg(context, isNeedHandle, isContinue), nullptr);
152 EXPECT_EQ(isNeedHandle, false);
153 EXPECT_EQ(isContinue, false);
154 delete dataSync;
155 delete context;
156 }
157
ReSetWaterDogTest001()158 void ReSetWaterDogTest001()
159 {
160 /**
161 * @tc.steps: step1. put 10 key/value
162 * @tc.expected: step1, put return OK.
163 */
164 for (int i = 0; i < 5; i++) { // put 5 key
165 Key key = DistributedDBToolsUnitTest::GetRandPrefixKey({'a', 'b'}, 1024); // rand num 1024 for test
166 Value value;
167 DistributedDBToolsUnitTest::GetRandomKeyValue(value, 10 * 50 * 1024u); // 10 * 50 * 1024 = 500k
168 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
169 }
170 /**
171 * @tc.steps: step2. SetDeviceMtuSize
172 * @tc.expected: step2, return OK.
173 */
174 g_communicatorAggregator->SetDeviceMtuSize(DEVICE_A, 50 * 1024u); // 50 * 1024u = 50k
175 g_communicatorAggregator->SetDeviceMtuSize(DEVICE_B, 50 * 1024u); // 50 * 1024u = 50k
176 /**
177 * @tc.steps: step3. deviceA,deviceB sync to each other at same time
178 * @tc.expected: step3. sync should return OK.
179 */
180 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, true), E_OK);
181 g_communicatorAggregator->SetDeviceMtuSize(DEVICE_A, 5 * 1024u * 1024u); // 5 * 1024u * 1024u = 5m
182 g_communicatorAggregator->SetDeviceMtuSize(DEVICE_B, 5 * 1024u * 1024u); // 5 * 1024u * 1024u = 5m
183 }
184 }
185
186 class DistributedDBSingleVerP2PComplexSyncTest : public testing::Test {
187 public:
188 static void SetUpTestCase(void);
189 static void TearDownTestCase(void);
190 void SetUp();
191 void TearDown();
192 };
193
SetUpTestCase(void)194 void DistributedDBSingleVerP2PComplexSyncTest::SetUpTestCase(void)
195 {
196 /**
197 * @tc.setup: Init datadir and Virtual Communicator.
198 */
199 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
200 g_config.dataDir = g_testDir;
201 g_mgr.SetKvStoreConfig(g_config);
202
203 string dir = g_testDir + "/single_ver";
204 DIR* dirTmp = opendir(dir.c_str());
205 if (dirTmp == nullptr) {
206 OS::MakeDBDirectory(dir);
207 } else {
208 closedir(dirTmp);
209 }
210
211 g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
212 ASSERT_TRUE(g_communicatorAggregator != nullptr);
213 RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
214 }
215
TearDownTestCase(void)216 void DistributedDBSingleVerP2PComplexSyncTest::TearDownTestCase(void)
217 {
218 /**
219 * @tc.teardown: Release virtual Communicator and clear data dir.
220 */
221 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
222 LOGE("rm test db files error!");
223 }
224 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
225 }
226
SetUp(void)227 void DistributedDBSingleVerP2PComplexSyncTest::SetUp(void)
228 {
229 DistributedDBToolsUnitTest::PrintTestCaseInfo();
230 /**
231 * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
232 */
233 KvStoreNbDelegate::Option option;
234 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
235 ASSERT_TRUE(g_kvDelegateStatus == OK);
236 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
237 g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
238 ASSERT_TRUE(g_deviceB != nullptr);
239 VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
240 ASSERT_TRUE(syncInterfaceB != nullptr);
241 ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
242
243 g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
244 ASSERT_TRUE(g_deviceC != nullptr);
245 VirtualSingleVerSyncDBInterface *syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
246 ASSERT_TRUE(syncInterfaceC != nullptr);
247 ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, syncInterfaceC), E_OK);
248
249 auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
250 const std::string &deviceId, uint8_t flag) -> bool {
251 return true;
252 };
253 EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
254 }
255
TearDown(void)256 void DistributedDBSingleVerP2PComplexSyncTest::TearDown(void)
257 {
258 /**
259 * @tc.teardown: Release device A, B, C
260 */
261 if (g_kvDelegatePtr != nullptr) {
262 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
263 g_kvDelegatePtr = nullptr;
264 DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
265 LOGD("delete kv store status %d", status);
266 ASSERT_TRUE(status == OK);
267 }
268 if (g_deviceB != nullptr) {
269 delete g_deviceB;
270 g_deviceB = nullptr;
271 }
272 if (g_deviceC != nullptr) {
273 delete g_deviceC;
274 g_deviceC = nullptr;
275 }
276 PermissionCheckCallbackV2 nullCallback;
277 EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
278 }
279
280 /**
281 * @tc.name: SaveDataNotify001
282 * @tc.desc: Test SaveDataNotify function, delay < 30s should sync ok, > 36 should timeout
283 * @tc.type: FUNC
284 * @tc.require: AR000D4876
285 * @tc.author: xushaohua
286 */
287 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SaveDataNotify001, TestSize.Level3)
288 {
289 DBStatus status = OK;
290 const int waitFiveSeconds = 5000;
291 const int waitThirtySeconds = 30000;
292 const int waitThirtySixSeconds = 36000;
293 std::vector<std::string> devices;
294 devices.push_back(g_deviceB->GetDeviceId());
295
296 /**
297 * @tc.steps: step1. deviceA put {k1, v1}
298 */
299 Key key = {'1'};
300 Value value = {'1'};
301 status = g_kvDelegatePtr->Put(key, value);
302 ASSERT_TRUE(status == OK);
303
304 /**
305 * @tc.steps: step2. deviceB set sava data dely 5s
306 */
307 g_deviceB->SetSaveDataDelayTime(waitFiveSeconds);
308
309 /**
310 * @tc.steps: step3. deviceA call sync and wait
311 * @tc.expected: step3. sync should return OK. onComplete should be called, deviceB sync success.
312 */
313 std::map<std::string, DBStatus> result;
314 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
315 ASSERT_TRUE(status == OK);
316 ASSERT_TRUE(result.size() == devices.size());
317 ASSERT_TRUE(result[DEVICE_B] == OK);
318
319 /**
320 * @tc.steps: step4. deviceB set sava data dely 30s and put {k1, v1}
321 */
322 g_deviceB->SetSaveDataDelayTime(waitThirtySeconds);
323 status = g_kvDelegatePtr->Put(key, value);
324 ASSERT_TRUE(status == OK);
325 /**
326 * @tc.steps: step3. deviceA call sync and wait
327 * @tc.expected: step3. sync should return OK. onComplete should be called, deviceB sync success.
328 */
329 result.clear();
330 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
331 ASSERT_TRUE(status == OK);
332 ASSERT_TRUE(result.size() == devices.size());
333 ASSERT_TRUE(result[DEVICE_B] == OK);
334
335 /**
336 * @tc.steps: step4. deviceB set sava data dely 36s and put {k1, v1}
337 */
338 g_deviceB->SetSaveDataDelayTime(waitThirtySixSeconds);
339 status = g_kvDelegatePtr->Put(key, value);
340 ASSERT_TRUE(status == OK);
341 /**
342 * @tc.steps: step5. deviceA call sync and wait
343 * @tc.expected: step5. sync should return OK. onComplete should be called, deviceB sync TIME_OUT.
344 */
345 result.clear();
346 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
347 ASSERT_TRUE(status == OK);
348 ASSERT_TRUE(result.size() == devices.size());
349 ASSERT_TRUE(result[DEVICE_B] == TIME_OUT);
350 }
351
352 /**
353 * @tc.name: SametimeSync001
354 * @tc.desc: Test 2 device sync with each other
355 * @tc.type: FUNC
356 * @tc.require: AR000CCPOM
357 * @tc.author: zhangqiquan
358 */
359 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SametimeSync001, TestSize.Level3)
360 {
361 DBStatus status = OK;
362 std::vector<std::string> devices;
363 devices.push_back(g_deviceB->GetDeviceId());
364
365 int responseCount = 0;
366 int requestCount = 0;
367 Key key = {'1'};
368 Value value = {'1'};
369 /**
370 * @tc.steps: step1. make sure deviceB send pull firstly and response_pull secondly
371 * @tc.expected: step1. deviceA put data when finish push task. put data should return OK.
372 */
373 g_communicatorAggregator->RegOnDispatch([&responseCount, &requestCount, &key, &value](
__anonb4f3bed90302( const std::string &target, DistributedDB::Message *msg) 374 const std::string &target, DistributedDB::Message *msg) {
375 if (target != "real_device" || msg->GetMessageId() != DATA_SYNC_MESSAGE) {
376 return;
377 }
378
379 if (msg->GetMessageType() == TYPE_RESPONSE) {
380 responseCount++;
381 if (responseCount == 1) { // 1 is the ack which B response A's push task
382 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), DBStatus::OK);
383 std::this_thread::sleep_for(std::chrono::seconds(1));
384 } else if (responseCount == 2) { // 2 is the ack which B response A's response_pull task
385 msg->SetErrorNo(E_FEEDBACK_COMMUNICATOR_NOT_FOUND);
386 }
387 } else if (msg->GetMessageType() == TYPE_REQUEST) {
388 requestCount++;
389 if (requestCount == 1) { // 1 is A push task
390 std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2 sec
391 }
392 }
393 });
394 /**
395 * @tc.steps: step2. deviceA,deviceB sync to each other at same time
396 * @tc.expected: step2. sync should return OK.
397 */
398 std::map<std::string, DBStatus> result;
__anonb4f3bed90402null399 std::thread subThread([]{
400 g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, true);
401 });
402 status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_PULL, result);
403 subThread.join();
404 g_communicatorAggregator->RegOnDispatch(nullptr);
405
406 EXPECT_TRUE(status == OK);
407 ASSERT_TRUE(result.size() == devices.size());
408 EXPECT_TRUE(result[DEVICE_B] == OK);
409 Value actualValue;
410 g_kvDelegatePtr->Get(key, actualValue);
411 EXPECT_EQ(actualValue, value);
412 }
413
414 /**
415 * @tc.name: SametimeSync002
416 * @tc.desc: Test 2 device sync with each other with water error
417 * @tc.type: FUNC
418 * @tc.require: AR000CCPOM
419 * @tc.author: zhangqiquan
420 */
421 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SametimeSync002, TestSize.Level3)
422 {
423 DBStatus status = OK;
424 std::vector<std::string> devices;
425 devices.push_back(g_deviceB->GetDeviceId());
426 g_kvDelegatePtr->Put({'k', '1'}, {'v', '1'});
427 /**
428 * @tc.steps: step1. make sure deviceA push data failed and increase water mark
429 * @tc.expected: step1. deviceA push failed with timeout
430 */
__anonb4f3bed90502(const std::string &target, DistributedDB::Message *msg) 431 g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
432 ASSERT_NE(msg, nullptr);
433 if (target == DEVICE_B && msg->GetMessageId() == QUERY_SYNC_MESSAGE) {
434 msg->SetMessageId(INVALID_MESSAGE_ID);
435 }
436 });
437 std::map<std::string, DBStatus> result;
__anonb4f3bed90602(const std::map<std::string, DBStatus> &map) 438 auto callback = [&result](const std::map<std::string, DBStatus> &map) {
439 result = map;
440 };
441 Query query = Query::Select().PrefixKey({'k', '1'});
442 EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
443 ASSERT_TRUE(result.size() == devices.size());
444 EXPECT_TRUE(result[DEVICE_B] == TIME_OUT);
445 /**
446 * @tc.steps: step2. A push to B with query2, sleep 1s for waiting step3
447 * @tc.expected: step2. sync should return OK.
448 */
__anonb4f3bed90702(const std::string &target, DistributedDB::Message *msg) 449 g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
450 ASSERT_NE(msg, nullptr);
451 if (target == DEVICE_B && msg->GetMessageId() == QUERY_SYNC_MESSAGE) {
452 std::this_thread::sleep_for(std::chrono::seconds(1));
453 }
454 });
__anonb4f3bed90802null455 std::thread subThread([&devices] {
456 std::map<std::string, DBStatus> result;
457 auto callback = [&result](const std::map<std::string, DBStatus> &map) {
458 result = map;
459 };
460 Query query = Query::Select().PrefixKey({'k', '2'});
461 LOGD("Begin PUSH");
462 EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
463 ASSERT_TRUE(result.size() == devices.size());
464 EXPECT_TRUE(result[DEVICE_A] == OK);
465 });
466 /**
467 * @tc.steps: step3. B pull to A when A is in push task
468 * @tc.expected: step3. sync should return OP_FINISHED_ALL.
469 */
470 std::this_thread::sleep_for(std::chrono::milliseconds(100));
471 std::map<std::string, int> virtualResult;
472 g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, query,
__anonb4f3bed90a02(const std::map<std::string, int> &map) 473 [&virtualResult](const std::map<std::string, int> &map) {
474 virtualResult = map;
475 }, true);
476 EXPECT_TRUE(status == OK);
477 ASSERT_EQ(virtualResult.size(), devices.size());
478 EXPECT_EQ(virtualResult[DEVICE_A], SyncOperation::OP_FINISHED_ALL);
479 g_communicatorAggregator->RegOnDispatch(nullptr);
480 subThread.join();
481 }
482
483 /**
484 * @tc.name: DatabaseOnlineCallback001
485 * @tc.desc: check database status notify online callback
486 * @tc.type: FUNC
487 * @tc.require: AR000CQS3S SR000CQE0B
488 * @tc.author: zhuwentao
489 */
490 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DatabaseOnlineCallback001, TestSize.Level1)
491 {
492 /**
493 * @tc.steps: step1. SetStoreStatusNotifier
494 * @tc.expected: step1. SetStoreStatusNotifier ok
495 */
496 std::string targetDev = "DEVICE_X";
497 bool isCheckOk = false;
498 auto databaseStatusNotifyCallback = [targetDev, &isCheckOk] (const std::string &userId,
__anonb4f3bed90b02(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) 499 const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) -> void {
500 if (userId == USER_ID && appId == APP_ID && storeId == STORE_ID && deviceId == targetDev &&
501 onlineStatus == true) {
502 isCheckOk = true;
503 }};
504 g_mgr.SetStoreStatusNotifier(databaseStatusNotifyCallback);
505 /**
506 * @tc.steps: step2. trigger device online
507 * @tc.expected: step2. check callback ok
508 */
509 g_communicatorAggregator->OnlineDevice(targetDev);
510 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME / 20));
511 EXPECT_EQ(isCheckOk, true);
512 StoreStatusNotifier nullCallback;
513 g_mgr.SetStoreStatusNotifier(nullCallback);
514 }
515
516 /**
517 * @tc.name: DatabaseOfflineCallback001
518 * @tc.desc: check database status notify online callback
519 * @tc.type: FUNC
520 * @tc.require: AR000CQS3S SR000CQE0B
521 * @tc.author: zhuwentao
522 */
523 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DatabaseOfflineCallback001, TestSize.Level1)
524 {
525 /**
526 * @tc.steps: step1. SetStoreStatusNotifier
527 * @tc.expected: step1. SetStoreStatusNotifier ok
528 */
529 std::string targetDev = "DEVICE_X";
530 bool isCheckOk = false;
531 auto databaseStatusNotifyCallback = [targetDev, &isCheckOk] (const std::string &userId,
__anonb4f3bed90c02(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) 532 const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) -> void {
533 if (userId == USER_ID && appId == APP_ID && storeId == STORE_ID && deviceId == targetDev &&
534 onlineStatus == false) {
535 isCheckOk = true;
536 }};
537 g_mgr.SetStoreStatusNotifier(databaseStatusNotifyCallback);
538 /**
539 * @tc.steps: step2. trigger device offline
540 * @tc.expected: step2. check callback ok
541 */
542 g_communicatorAggregator->OfflineDevice(targetDev);
543 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME / 20));
544 EXPECT_EQ(isCheckOk, true);
545 StoreStatusNotifier nullCallback;
546 g_mgr.SetStoreStatusNotifier(nullCallback);
547 }
548
549 /**
550 * @tc.name: CloseSync001
551 * @tc.desc: Test 2 delegate close when sync
552 * @tc.type: FUNC
553 * @tc.require: AR000CCPOM
554 * @tc.author: zhangqiquan
555 */
556 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, CloseSync001, TestSize.Level3)
557 {
558 DBStatus status = OK;
559 std::vector<std::string> devices;
560 devices.push_back(g_deviceB->GetDeviceId());
561
562 /**
563 * @tc.steps: step1. make sure A sync start
564 */
565 bool sleep = false;
__anonb4f3bed90d02(const std::string &target, DistributedDB::Message *msg) 566 g_communicatorAggregator->RegOnDispatch([&sleep](const std::string &target, DistributedDB::Message *msg) {
567 if (!sleep) {
568 sleep = true;
569 std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s for waiting close db
570 }
571 });
572
573 KvStoreNbDelegate* kvDelegatePtrA = nullptr;
574 KvStoreNbDelegate::Option option;
__anonb4f3bed90e02(DBStatus s, KvStoreNbDelegate *delegate) 575 g_mgr.GetKvStore(STORE_ID, option, [&status, &kvDelegatePtrA](DBStatus s, KvStoreNbDelegate *delegate) {
576 status = s;
577 kvDelegatePtrA = delegate;
578 });
579 EXPECT_EQ(status, OK);
580 EXPECT_NE(kvDelegatePtrA, nullptr);
581
582 Key key = {'k'};
583 Value value = {'v'};
584 kvDelegatePtrA->Put(key, value);
585 std::map<std::string, DBStatus> result;
__anonb4f3bed90f02(const std::map<std::string, DBStatus>& statusMap) 586 auto callback = [&result](const std::map<std::string, DBStatus>& statusMap) {
587 result = statusMap;
588 };
589 /**
590 * @tc.steps: step2. deviceA sync and then close
591 * @tc.expected: step2. sync should abort and data don't exist in B
592 */
__anonb4f3bed91002() 593 std::thread closeThread([&kvDelegatePtrA]() {
594 std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1s for waiting sync start
595 EXPECT_EQ(g_mgr.CloseKvStore(kvDelegatePtrA), OK);
596 });
597 EXPECT_EQ(kvDelegatePtrA->Sync(devices, SYNC_MODE_PUSH_ONLY, callback, false), OK);
598 LOGD("Sync finish");
599 closeThread.join();
600 std::this_thread::sleep_for(std::chrono::seconds(5)); // sleep 5s for waiting sync finish
601 EXPECT_EQ(result.size(), 0u);
602 VirtualDataItem actualValue;
603 EXPECT_EQ(g_deviceB->GetData(key, actualValue), -E_NOT_FOUND);
604 g_communicatorAggregator->RegOnDispatch(nullptr);
605 }
606
607 /**
608 * @tc.name: CloseSync002
609 * @tc.desc: Test 1 delegate close when in time sync
610 * @tc.type: FUNC
611 * @tc.require: AR000CCPOM
612 * @tc.author: zhangqiquan
613 */
614 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, CloseSync002, TestSize.Level3)
615 {
616 /**
617 * @tc.steps: step1. invalid time sync packet from A
618 */
__anonb4f3bed91102(const std::string &target, DistributedDB::Message *msg) 619 g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
620 ASSERT_NE(msg, nullptr);
621 if (target == DEVICE_B && msg->GetMessageId() == TIME_SYNC_MESSAGE && msg->GetMessageType() == TYPE_REQUEST) {
622 msg->SetMessageId(INVALID_MESSAGE_ID);
623 LOGD("Message is invalid");
624 }
625 });
626 Timestamp currentTime;
627 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
628 g_deviceB->PutData({'k'}, {'v'}, currentTime, 0);
629
630 /**
631 * @tc.steps: step2. B PUSH to A and A close after 1s
632 * @tc.expected: step2. A closing time cost letter than 4s
633 */
__anonb4f3bed91202() 634 std::thread closingThread([]() {
635 std::this_thread::sleep_for(std::chrono::seconds(1));
636 LOGD("Begin Close");
637 Timestamp beginTime;
638 (void)OS::GetCurrentSysTimeInMicrosecond(beginTime);
639 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
640 Timestamp endTime;
641 (void)OS::GetCurrentSysTimeInMicrosecond(endTime);
642 EXPECT_LE(static_cast<int>(endTime - beginTime), 4 * 1000 * 1000); // waiting 4 * 1000 * 1000 us
643 LOGD("End Close");
644 });
645 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
646 closingThread.join();
647
648 /**
649 * @tc.steps: step3. remove db
650 * @tc.expected: step3. remove ok
651 */
652 g_kvDelegatePtr = nullptr;
653 DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
654 LOGD("delete kv store status %d", status);
655 ASSERT_TRUE(status == OK);
656 g_communicatorAggregator->RegOnDispatch(nullptr);
657 }
658
659 /**
660 * @tc.name: OrderbyWriteTimeSync001
661 * @tc.desc: sync query with order by writeTime
662 * @tc.type: FUNC
663 * @tc.require: AR000H5VLO
664 * @tc.author: zhuwentao
665 */
666 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, OrderbyWriteTimeSync001, TestSize.Level0)
667 {
668 /**
669 * @tc.steps: step1. deviceA subscribe query with order by write time
670 * * @tc.expected: step1. interface return not support
671 */
672 std::vector<std::string> devices;
673 devices.push_back(g_deviceB->GetDeviceId());
674 Query query = Query::Select().PrefixKey({'k'}).OrderByWriteTime(true);
675 EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, nullptr, query, true), NOT_SUPPORT);
676 }
677
678
679 /**
680 * @tc.name: Device Offline Sync 001
681 * @tc.desc: Test push sync when device offline
682 * @tc.type: FUNC
683 * @tc.require: AR000CCPOM
684 * @tc.author: xushaohua
685 */
686 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSync001, TestSize.Level1)
687 {
688 std::vector<std::string> devices;
689 devices.push_back(g_deviceB->GetDeviceId());
690 devices.push_back(g_deviceC->GetDeviceId());
691
692 /**
693 * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2}, {k3 delete}, {k4,v2}
694 */
695 Key key1 = {'1'};
696 Value value1 = {'1'};
697 ASSERT_TRUE(g_kvDelegatePtr->Put(key1, value1) == OK);
698
699 Key key2 = {'2'};
700 Value value2 = {'2'};
701 ASSERT_TRUE(g_kvDelegatePtr->Put(key2, value2) == OK);
702
703 Key key3 = {'3'};
704 Value value3 = {'3'};
705 ASSERT_TRUE(g_kvDelegatePtr->Put(key3, value3) == OK);
706 ASSERT_TRUE(g_kvDelegatePtr->Delete(key3) == OK);
707
708 Key key4 = {'4'};
709 Value value4 = {'4'};
710 ASSERT_TRUE(g_kvDelegatePtr->Put(key4, value4) == OK);
711
712 /**
713 * @tc.steps: step2. deviceB offline
714 */
715 g_deviceB->Offline();
716
717 /**
718 * @tc.steps: step3. deviceA call pull sync
719 * @tc.expected: step3. sync should return OK.
720 */
721 std::map<std::string, DBStatus> result;
722 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
723 ASSERT_TRUE(status == OK);
724
725 /**
726 * @tc.expected: step3. onComplete should be called, DeviceB status is timeout
727 * deviceC has {k1, v1}, {k2, v2}, {k3 delete}, {k4,v4}
728 */
729 for (const auto &pair : result) {
730 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
731 if (pair.first == DEVICE_B) {
732 // If syncTaskContext of deviceB is scheduled to be executed first, ClearAllSyncTask is
733 // invoked when OfflineHandleByDevice is triggered, and SyncOperation::Finished() is triggered in advance.
734 // The returned status is COMM_FAILURE
735 EXPECT_TRUE((pair.second == static_cast<DBStatus>(-E_PERIPHERAL_INTERFACE_FAIL)) ||
736 (pair.second == COMM_FAILURE));
737 } else {
738 EXPECT_EQ(pair.second, OK);
739 }
740 }
741 VirtualDataItem item;
742 g_deviceC->GetData(key1, item);
743 EXPECT_TRUE(item.value == value1);
744 item.value.clear();
745 g_deviceC->GetData(key2, item);
746 EXPECT_TRUE(item.value == value2);
747 item.value.clear();
748 Key hashKey;
749 DistributedDBToolsUnitTest::CalcHash(key3, hashKey);
750 EXPECT_TRUE(g_deviceC->GetData(hashKey, item) == -E_NOT_FOUND);
751 item.value.clear();
752 g_deviceC->GetData(key4, item);
753 EXPECT_TRUE(item.value == value4);
754 }
755
756 /**
757 * @tc.name: Device Offline Sync 002
758 * @tc.desc: Test pull sync when device offline
759 * @tc.type: FUNC
760 * @tc.require: AR000CCPOM
761 * @tc.author: xushaohua
762 */
763 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSync002, TestSize.Level1)
764 {
765 std::vector<std::string> devices;
766 devices.push_back(g_deviceB->GetDeviceId());
767 devices.push_back(g_deviceC->GetDeviceId());
768
769 /**
770 * @tc.steps: step1. deviceB put {k1, v1}
771 */
772 Key key1 = {'1'};
773 Value value1 = {'1'};
774 g_deviceB->PutData(key1, value1, 0, 0);
775
776 /**
777 * @tc.steps: step2. deviceB offline
778 */
779 g_deviceB->Offline();
780
781 /**
782 * @tc.steps: step3. deviceC put {k2, v2}, {k3, delete}, {k4, v4}
783 */
784 Key key2 = {'2'};
785 Value value2 = {'2'};
786 g_deviceC->PutData(key2, value2, 0, 0);
787
788 Key key3 = {'3'};
789 Value value3 = {'3'};
790 g_deviceC->PutData(key3, value3, 0, 1);
791
792 Key key4 = {'4'};
793 Value value4 = {'4'};
794 g_deviceC->PutData(key4, value4, 0, 0);
795
796 /**
797 * @tc.steps: step2. deviceA call pull sync
798 * @tc.expected: step2. sync should return OK.
799 */
800 std::map<std::string, DBStatus> result;
801 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
802 ASSERT_TRUE(status == OK);
803
804 /**
805 * @tc.expected: step3. onComplete should be called, DeviceB status is timeout
806 * deviceA has {k2, v2}, {k3 delete}, {k4,v4}
807 */
808 for (const auto &pair : result) {
809 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
810 if (pair.first == DEVICE_B) {
811 // If syncTaskContext of deviceB is scheduled to be executed first, ClearAllSyncTask is
812 // invoked when OfflineHandleByDevice is triggered, and SyncOperation::Finished() is triggered in advance.
813 // The returned status is COMM_FAILURE
814 EXPECT_TRUE((pair.second == static_cast<DBStatus>(-E_PERIPHERAL_INTERFACE_FAIL)) ||
815 (pair.second == COMM_FAILURE));
816 } else {
817 EXPECT_EQ(pair.second, OK);
818 }
819 }
820
821 Value value5;
822 EXPECT_TRUE(g_kvDelegatePtr->Get(key1, value5) != OK);
823 g_kvDelegatePtr->Get(key2, value5);
824 EXPECT_EQ(value5, value2);
825 EXPECT_TRUE(g_kvDelegatePtr->Get(key3, value5) != OK);
826 g_kvDelegatePtr->Get(key4, value5);
827 EXPECT_EQ(value5, value4);
828 }
829
830 /**
831 * @tc.name: EncryptedAlgoUpgrade001
832 * @tc.desc: Test upgrade encrypted db can sync normally
833 * @tc.type: FUNC
834 * @tc.require: AR000HI2JS
835 * @tc.author: zhuwentao
836 */
837 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, EncryptedAlgoUpgrade001, TestSize.Level3)
838 {
839 /**
840 * @tc.steps: step1. clear db
841 * * @tc.expected: step1. interface return ok
842 */
843 if (g_kvDelegatePtr != nullptr) {
844 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
845 g_kvDelegatePtr = nullptr;
846 DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
847 LOGD("delete kv store status %d", status);
848 ASSERT_TRUE(status == OK);
849 }
850
851 CipherPassword passwd;
852 std::vector<uint8_t> passwdVect = {'p', 's', 'd', '1'};
853 passwd.SetValue(passwdVect.data(), passwdVect.size());
854 /**
855 * @tc.steps: step2. open old db by sql
856 * * @tc.expected: step2. interface return ok
857 */
858 std::string identifier = DBCommon::GenerateIdentifierId(STORE_ID, APP_ID, USER_ID);
859 std::string hashDir = DBCommon::TransferHashString(identifier);
860 std::string hexHashDir = DBCommon::TransferStringToHex(hashDir);
861 std::string dbPath = g_testDir + "/" + hexHashDir + "/single_ver";
862 ASSERT_TRUE(DBCommon::CreateDirectory(g_testDir + "/" + hexHashDir) == OK);
863 ASSERT_TRUE(DBCommon::CreateDirectory(dbPath) == OK);
864 std::vector<std::string> dbDir {DBConstant::MAINDB_DIR, DBConstant::METADB_DIR, DBConstant::CACHEDB_DIR};
865 for (const auto &item : dbDir) {
866 ASSERT_TRUE(DBCommon::CreateDirectory(dbPath + "/" + item) == OK);
867 }
868 uint64_t flag = SQLITE_OPEN_URI | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE;
869 sqlite3 *db;
870 std::string fileUrl = dbPath + "/" + DBConstant::MAINDB_DIR + "/" + DBConstant::SINGLE_VER_DATA_STORE + ".db";
871 ASSERT_TRUE(sqlite3_open_v2(fileUrl.c_str(), &db, flag, nullptr) == SQLITE_OK);
872 SQLiteUtils::SetKeyInner(db, CipherType::AES_256_GCM, passwd, DBConstant::DEFAULT_ITER_TIMES);
873 /**
874 * @tc.steps: step3. create table and close
875 * * @tc.expected: step3. interface return ok
876 */
877 ASSERT_TRUE(SQLiteUtils::ExecuteRawSQL(db, CREATE_SYNC_TABLE_SQL) == E_OK);
878 sqlite3_close_v2(db);
879 db = nullptr;
880 LOGI("create old db success");
881 /**
882 * @tc.steps: step4. get kvstore
883 * * @tc.expected: step4. interface return ok
884 */
885 KvStoreNbDelegate::Option option;
886 option.isEncryptedDb = true;
887 option.cipher = CipherType::AES_256_GCM;
888 option.passwd = passwd;
889 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
890 ASSERT_TRUE(g_kvDelegateStatus == OK);
891 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
892 /**
893 * @tc.steps: step5. sync ok
894 * * @tc.expected: step5. interface return ok
895 */
896 PullSyncTest();
897 /**
898 * @tc.steps: step5. crud ok
899 * * @tc.expected: step5. interface return ok
900 */
901 CrudTest();
902 }
903
904 /**
905 * @tc.name: RemoveDeviceData002
906 * @tc.desc: test remove device data before sync
907 * @tc.type: FUNC
908 * @tc.require:
909 * @tc.author: zhuwentao
910 */
911 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RemoveDeviceData002, TestSize.Level1)
912 {
913 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
914 /**
915 * @tc.steps: step1. sync deviceB data to A and check data
916 * * @tc.expected: step1. interface return ok
917 */
918 Key key1 = {'1'};
919 Key key2 = {'2'};
920 Value value = {'1'};
921 Timestamp currentTime;
922 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
923 EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
924 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
925 EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK);
926 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
927 Value actualValue;
928 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
929 EXPECT_EQ(actualValue, value);
930 actualValue.clear();
931 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
932 EXPECT_EQ(actualValue, value);
933 /**
934 * @tc.steps: step2. call RemoveDeviceData
935 * * @tc.expected: step2. interface return ok
936 */
937 g_kvDelegatePtr->RemoveDeviceData(g_deviceB->GetDeviceId());
938 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
939 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), NOT_FOUND);
940 /**
941 * @tc.steps: step3. sync to device A again and check data
942 * * @tc.expected: step3. sync ok
943 */
944 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
945 actualValue.clear();
946 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
947 EXPECT_EQ(actualValue, value);
948 actualValue.clear();
949 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
950 EXPECT_EQ(actualValue, value);
951 }
952
953 /**
954 * @tc.name: DataSync001
955 * @tc.desc: Test Data Sync when Initialize
956 * @tc.type: FUNC
957 * @tc.require: AR000HI2JS
958 * @tc.author: zhuwentao
959 */
960 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync001, TestSize.Level1)
961 {
962 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
963 ASSERT_TRUE(dataSync != nullptr);
964 std::shared_ptr<Metadata> inMetadata = nullptr;
965 std::string deviceId;
966 Message message;
967 VirtualSingleVerSyncDBInterface tmpInterface;
968 VirtualCommunicator tmpCommunicator(deviceId, g_communicatorAggregator);
969 EXPECT_EQ(dataSync->Initialize(nullptr, nullptr, inMetadata, deviceId), -E_INVALID_ARGS);
970 EXPECT_EQ(dataSync->Initialize(&tmpInterface, nullptr, inMetadata, deviceId), -E_INVALID_ARGS);
971 EXPECT_EQ(dataSync->Initialize(&tmpInterface, &tmpCommunicator, inMetadata, deviceId), -E_INVALID_ARGS);
972 delete dataSync;
973 }
974
975 /**
976 * @tc.name: DataSync002
977 * @tc.desc: Test active sync with invalid param in DataSync Class
978 * @tc.type: FUNC
979 * @tc.require: AR000HI2JS
980 * @tc.author: zhuwentao
981 */
982 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync002, TestSize.Level1)
983 {
984 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
985 ASSERT_TRUE(dataSync != nullptr);
986 Message message;
987 EXPECT_EQ(dataSync->TryContinueSync(nullptr, &message), -E_INVALID_ARGS);
988 EXPECT_EQ(dataSync->TryContinueSync(nullptr, nullptr), -E_INVALID_ARGS);
989 EXPECT_EQ(dataSync->PushStart(nullptr), -E_INVALID_ARGS);
990 EXPECT_EQ(dataSync->PushPullStart(nullptr), -E_INVALID_ARGS);
991 EXPECT_EQ(dataSync->PullRequestStart(nullptr), -E_INVALID_ARGS);
992 EXPECT_EQ(dataSync->PullResponseStart(nullptr), -E_INVALID_ARGS);
993 delete dataSync;
994 }
995
996 /**
997 * @tc.name: DataSync003
998 * @tc.desc: Test receive invalid request data packet in DataSync Class
999 * @tc.type: FUNC
1000 * @tc.require: AR000HI2JS
1001 * @tc.author: zhuwentao
1002 */
1003 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync003, TestSize.Level1)
1004 {
1005 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1006 ASSERT_TRUE(dataSync != nullptr);
1007 uint64_t tmpMark = 0;
1008 Message message;
1009 EXPECT_EQ(dataSync->DataRequestRecv(nullptr, nullptr, tmpMark), -E_INVALID_ARGS);
1010 EXPECT_EQ(dataSync->DataRequestRecv(nullptr, &message, tmpMark), -E_INVALID_ARGS);
1011 delete dataSync;
1012 }
1013
1014 /**
1015 * @tc.name: DataSync004
1016 * @tc.desc: Test receive invalid ack packet in DataSync Class
1017 * @tc.type: FUNC
1018 * @tc.require: AR000HI2JS
1019 * @tc.author: zhuwentao
1020 */
1021 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync004, TestSize.Level1)
1022 {
1023 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1024 ASSERT_TRUE(dataSync != nullptr);
1025 Message message;
1026 TestSingleVerKvSyncTaskContext tmpContext;
1027 EXPECT_EQ(dataSync->AckPacketIdCheck(nullptr), false);
1028 EXPECT_EQ(dataSync->AckPacketIdCheck(&message), false);
1029 EXPECT_EQ(dataSync->AckRecv(&tmpContext, nullptr), -E_INVALID_ARGS);
1030 EXPECT_EQ(dataSync->AckRecv(nullptr, nullptr), -E_INVALID_ARGS);
1031 EXPECT_EQ(dataSync->AckRecv(nullptr, &message), -E_INVALID_ARGS);
1032 delete dataSync;
1033 }
1034
1035 /**
1036 * @tc.name: DataSync005
1037 * @tc.desc: Test receive invalid notify packet in DataSync Class
1038 * @tc.type: FUNC
1039 * @tc.require: AR000HI2JS
1040 * @tc.author: zhuwentao
1041 */
1042 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync005, TestSize.Level1)
1043 {
1044 ASSERT_NO_FATAL_FAILURE(DataSync005());
1045 }
1046
1047 /**
1048 * @tc.name: DataSync006
1049 * @tc.desc: Test control start with invalid param in DataSync Class
1050 * @tc.type: FUNC
1051 * @tc.require: AR000HI2JS
1052 * @tc.author: zhuwentao
1053 */
1054 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync006, TestSize.Level1)
1055 {
1056 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1057 ASSERT_TRUE(dataSync != nullptr);
1058 TestSingleVerKvSyncTaskContext tmpContext;
1059 EXPECT_EQ(dataSync->ControlCmdStart(nullptr), -E_INVALID_ARGS);
1060 EXPECT_EQ(dataSync->ControlCmdStart(&tmpContext), -E_INVALID_ARGS);
1061 std::shared_ptr<SubscribeManager> subManager = std::make_shared<SubscribeManager>();
1062 tmpContext.SetSubscribeManager(subManager);
1063 tmpContext.SetMode(SyncModeType::INVALID_MODE);
1064 EXPECT_EQ(dataSync->ControlCmdStart(&tmpContext), -E_INVALID_ARGS);
1065 std::set<Key> Keys = {{'a'}, {'b'}};
1066 Query query = Query::Select().InKeys(Keys);
1067 QuerySyncObject innerQuery(query);
1068 tmpContext.SetQuery(innerQuery);
1069 tmpContext.SetMode(SyncModeType::SUBSCRIBE_QUERY);
1070 EXPECT_EQ(dataSync->ControlCmdStart(&tmpContext), -E_NOT_SUPPORT);
1071 delete dataSync;
1072 subManager = nullptr;
1073 }
1074
1075 /**
1076 * @tc.name: DataSync007
1077 * @tc.desc: Test receive invalid control packet in DataSync Class
1078 * @tc.type: FUNC
1079 * @tc.require: AR000HI2JS
1080 * @tc.author: zhuwentao
1081 */
1082 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync007, TestSize.Level1)
1083 {
1084 SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1085 ASSERT_TRUE(dataSync != nullptr);
1086 Message message;
1087 ControlRequestPacket packet;
1088 TestSingleVerKvSyncTaskContext tmpContext;
1089 EXPECT_EQ(dataSync->ControlCmdRequestRecv(nullptr, &message), -E_INVALID_ARGS);
1090 message.SetCopiedObject(packet);
1091 EXPECT_EQ(dataSync->ControlCmdRequestRecv(nullptr, &message), -E_INVALID_ARGS);
1092 delete dataSync;
1093 }
1094
1095 /**
1096 * @tc.name: DataSync008
1097 * @tc.desc: Test pull null msg in dataQueue in DataSync Class
1098 * @tc.type: FUNC
1099 * @tc.require: AR000HI2JS
1100 * @tc.author: zhuwentao
1101 */
1102 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync008, TestSize.Level1)
1103 {
1104 ASSERT_NO_FATAL_FAILURE(DataSync008());
1105 }
1106
1107 /**
1108 * @tc.name: SyncRetry001
1109 * @tc.desc: use sync retry sync use push
1110 * @tc.type: FUNC
1111 * @tc.require: AR000CKRTD AR000CQE0E
1112 * @tc.author: zhuwentao
1113 */
1114 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry001, TestSize.Level3)
1115 {
1116 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1117 std::vector<std::string> devices;
1118 devices.push_back(g_deviceB->GetDeviceId());
1119
1120 /**
1121 * @tc.steps: step1. set sync retry
1122 * @tc.expected: step1, Pragma return OK.
1123 */
1124 int pragmaData = 1;
1125 PragmaData input = static_cast<PragmaData>(&pragmaData);
1126 EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1127
1128 /**
1129 * @tc.steps: step2. deviceA put {k1, v1}, {k2, v2}
1130 */
1131 ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_1, VALUE_1) == OK);
1132
1133 /**
1134 * @tc.steps: step3. deviceA call sync and wait
1135 * @tc.expected: step3. sync should return OK.
1136 */
1137 std::map<std::string, DBStatus> result;
1138 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1139
1140 /**
1141 * @tc.expected: step4. onComplete should be called, and status is time_out
1142 */
1143 ASSERT_TRUE(result.size() == devices.size());
1144 for (const auto &pair : result) {
1145 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1146 EXPECT_TRUE(pair.second == OK);
1147 }
1148 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1149 }
1150
1151 /**
1152 * @tc.name: SyncRetry002
1153 * @tc.desc: use sync retry sync use pull
1154 * @tc.type: FUNC
1155 * @tc.require: AR000CKRTD AR000CQE0E
1156 * @tc.author: zhuwentao
1157 */
1158 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry002, TestSize.Level3)
1159 {
1160 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE, 4u);
1161 std::vector<std::string> devices;
1162 devices.push_back(g_deviceB->GetDeviceId());
1163
1164 /**
1165 * @tc.steps: step1. set sync retry
1166 * @tc.expected: step1, Pragma return OK.
1167 */
1168 int pragmaData = 1;
1169 PragmaData input = static_cast<PragmaData>(&pragmaData);
1170 EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1171
1172 /**
1173 * @tc.steps: step2. deviceA call sync and wait
1174 * @tc.expected: step2. sync should return OK.
1175 */
1176 std::map<std::string, DBStatus> result;
1177 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
1178
1179 /**
1180 * @tc.expected: step3. onComplete should be called, and status is time_out
1181 */
1182 ASSERT_TRUE(result.size() == devices.size());
1183 for (const auto &pair : result) {
1184 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1185 EXPECT_TRUE(pair.second == TIME_OUT);
1186 }
1187 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1188 }
1189
1190 /**
1191 * @tc.name: SyncRetry003
1192 * @tc.desc: use sync retry sync use push by compress
1193 * @tc.type: FUNC
1194 * @tc.require: AR000CKRTD AR000CQE0E
1195 * @tc.author: zhuwentao
1196 */
1197 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry003, TestSize.Level3)
1198 {
1199 if (g_kvDelegatePtr != nullptr) {
1200 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
1201 g_kvDelegatePtr = nullptr;
1202 }
1203 /**
1204 * @tc.steps: step1. open db use Compress
1205 * @tc.expected: step1, Pragma return OK.
1206 */
1207 KvStoreNbDelegate::Option option;
1208 option.isNeedCompressOnSync = true;
1209 option.compressionRate = 70;
1210 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1211 ASSERT_TRUE(g_kvDelegateStatus == OK);
1212 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1213
1214 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1215 std::vector<std::string> devices;
1216 devices.push_back(g_deviceB->GetDeviceId());
1217
1218 /**
1219 * @tc.steps: step2. set sync retry
1220 * @tc.expected: step2, Pragma return OK.
1221 */
1222 int pragmaData = 1;
1223 PragmaData input = static_cast<PragmaData>(&pragmaData);
1224 EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1225
1226 /**
1227 * @tc.steps: step3. deviceA put {k1, v1}, {k2, v2}
1228 */
1229 ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_1, VALUE_1) == OK);
1230
1231 /**
1232 * @tc.steps: step4. deviceA call sync and wait
1233 * @tc.expected: step4. sync should return OK.
1234 */
1235 std::map<std::string, DBStatus> result;
1236 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1237
1238 /**
1239 * @tc.expected: step5. onComplete should be called, and status is time_out
1240 */
1241 ASSERT_TRUE(result.size() == devices.size());
1242 for (const auto &pair : result) {
1243 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1244 EXPECT_TRUE(pair.second == OK);
1245 }
1246 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1247 }
1248
1249 /**
1250 * @tc.name: SyncRetry004
1251 * @tc.desc: use query sync retry sync use push
1252 * @tc.type: FUNC
1253 * @tc.require: AR000CKRTD AR000CQE0E
1254 * @tc.author: zhuwentao
1255 */
1256 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry004, TestSize.Level3)
1257 {
1258 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1259 std::vector<std::string> devices;
1260 devices.push_back(g_deviceB->GetDeviceId());
1261
1262 /**
1263 * @tc.steps: step1. set sync retry
1264 * @tc.expected: step1, Pragma return OK.
1265 */
1266 int pragmaData = 1;
1267 PragmaData input = static_cast<PragmaData>(&pragmaData);
1268 EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1269
1270 /**
1271 * @tc.steps: step2. deviceA put {k1, v1}, {k2, v2}
1272 */
1273 for (int i = 0; i < 5; i++) {
1274 Key key = DistributedDBToolsUnitTest::GetRandPrefixKey({'a', 'b'}, 128); // rand num 1024 for test
1275 Value value;
1276 DistributedDBToolsUnitTest::GetRandomKeyValue(value, 256u);
1277 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
1278 }
1279
1280 /**
1281 * @tc.steps: step3. deviceA call sync and wait
1282 * @tc.expected: step3. sync should return OK.
1283 */
1284 std::map<std::string, DBStatus> result;
1285 std::vector<uint8_t> prefixKey({'a', 'b'});
1286 Query query = Query::Select().PrefixKey(prefixKey);
1287 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query) == OK);
1288
1289 /**
1290 * @tc.expected: step4. onComplete should be called, and status is time_out
1291 */
1292 ASSERT_TRUE(result.size() == devices.size());
1293 for (const auto &pair : result) {
1294 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1295 EXPECT_TRUE(pair.second == OK);
1296 }
1297 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1298 }
1299
1300 /**
1301 * @tc.name: SyncRetry005
1302 * @tc.desc: use sync retry sync use pull by compress
1303 * @tc.type: FUNC
1304 * @tc.require: AR000CKRTD AR000CQE0E
1305 * @tc.author: zhangqiquan
1306 */
1307 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry005, TestSize.Level3)
1308 {
1309 if (g_kvDelegatePtr != nullptr) {
1310 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
1311 g_kvDelegatePtr = nullptr;
1312 }
1313 /**
1314 * @tc.steps: step1. open db use Compress
1315 * @tc.expected: step1, Pragma return OK.
1316 */
1317 KvStoreNbDelegate::Option option;
1318 option.isNeedCompressOnSync = true;
1319 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1320 ASSERT_TRUE(g_kvDelegateStatus == OK);
1321 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1322
1323 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1324 std::vector<std::string> devices;
1325 devices.push_back(g_deviceB->GetDeviceId());
1326
1327 /**
1328 * @tc.steps: step2. set sync retry
1329 * @tc.expected: step2, Pragma return OK.
1330 */
1331 int pragmaData = 1;
1332 PragmaData input = static_cast<PragmaData>(&pragmaData);
1333 EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1334
1335 /**
1336 * @tc.steps: step3. deviceA call sync and wait
1337 * @tc.expected: step3. sync should return OK.
1338 */
1339 std::map<std::string, DBStatus> result;
1340 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
1341
1342 /**
1343 * @tc.expected: step4. onComplete should be called, and status is time_out
1344 */
1345 ASSERT_TRUE(result.size() == devices.size());
1346 for (const auto &pair : result) {
1347 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1348 EXPECT_EQ(pair.second, OK);
1349 }
1350 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1351 }
1352
1353 /**
1354 * @tc.name: ReSetWatchDogTest001
1355 * @tc.desc: trigger resetWatchDog while pull
1356 * @tc.type: FUNC
1357 * @tc.require: AR000CKRTD AR000CQE0E
1358 * @tc.author: zhuwentao
1359 */
1360 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, ReSetWaterDogTest001, TestSize.Level3)
1361 {
1362 ASSERT_NO_FATAL_FAILURE(ReSetWaterDogTest001());
1363 }
1364
1365 /**
1366 * @tc.name: RebuildSync001
1367 * @tc.desc: rebuild db and sync again
1368 * @tc.type: FUNC
1369 * @tc.require:
1370 * @tc.author: zhuwentao
1371 */
1372 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync001, TestSize.Level3)
1373 {
1374 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1375 /**
1376 * @tc.steps: step1. sync deviceB data to A and check data
1377 * * @tc.expected: step1. interface return ok
1378 */
1379 Key key1 = {'1'};
1380 Key key2 = {'2'};
1381 Value value = {'1'};
1382 Timestamp currentTime;
1383 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1384 EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
1385 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1386 EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK);
1387 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1388
1389 Value actualValue;
1390 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1391 EXPECT_EQ(actualValue, value);
1392 actualValue.clear();
1393 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1394 EXPECT_EQ(actualValue, value);
1395 /**
1396 * @tc.steps: step2. delete db and rebuild
1397 * * @tc.expected: step2. interface return ok
1398 */
1399 g_mgr.CloseKvStore(g_kvDelegatePtr);
1400 g_kvDelegatePtr = nullptr;
1401 ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1402 KvStoreNbDelegate::Option option;
1403 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1404 ASSERT_TRUE(g_kvDelegateStatus == OK);
1405 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1406 /**
1407 * @tc.steps: step3. sync to device A again
1408 * * @tc.expected: step3. sync ok
1409 */
1410 value = {'2'};
1411 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1412 EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
1413 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1414 /**
1415 * @tc.steps: step4. check data in device A
1416 * * @tc.expected: step4. check ok
1417 */
1418 actualValue.clear();
1419 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1420 EXPECT_EQ(actualValue, value);
1421 }
1422
1423 /**
1424 * @tc.name: RebuildSync002
1425 * @tc.desc: test clear remote data when receive data
1426 * @tc.type: FUNC
1427 * @tc.require:
1428 * @tc.author: zhuwentao
1429 */
1430 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync002, TestSize.Level1)
1431 {
1432 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1433 std::vector<std::string> devices;
1434 devices.push_back(g_deviceB->GetDeviceId());
1435 /**
1436 * @tc.steps: step1. device A SET_WIPE_POLICY
1437 * * @tc.expected: step1. interface return ok
1438 */
1439 int pragmaData = 2; // 2 means enable
1440 PragmaData input = static_cast<PragmaData>(&pragmaData);
1441 EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_WIPE_POLICY, input) == OK);
1442 /**
1443 * @tc.steps: step2. sync deviceB data to A and check data
1444 * * @tc.expected: step2. interface return ok
1445 */
1446 Key key1 = {'1'};
1447 Key key2 = {'2'};
1448 Key key3 = {'3'};
1449 Key key4 = {'4'};
1450 Value value = {'1'};
1451 Timestamp currentTime;
1452 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1453 EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
1454 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1455 EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK);
1456 EXPECT_EQ(g_kvDelegatePtr->Put(key3, value), OK);
1457 /**
1458 * @tc.steps: step3. deviceA call pull sync
1459 * @tc.expected: step3. sync should return OK.
1460 */
1461 std::map<std::string, DBStatus> result;
1462 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result) == OK);
1463
1464 /**
1465 * @tc.expected: step4. onComplete should be called, check data
1466 */
1467 ASSERT_TRUE(result.size() == devices.size());
1468 for (const auto &pair : result) {
1469 EXPECT_TRUE(pair.second == OK);
1470 }
1471 Value actualValue;
1472 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1473 EXPECT_EQ(actualValue, value);
1474 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1475 EXPECT_EQ(actualValue, value);
1476 /**
1477 * @tc.steps: step5. device B rebuild and put some data
1478 * * @tc.expected: step5. rebuild ok
1479 */
1480 if (g_deviceB != nullptr) {
1481 delete g_deviceB;
1482 g_deviceB = nullptr;
1483 }
1484 g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
1485 ASSERT_TRUE(g_deviceB != nullptr);
1486 VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
1487 ASSERT_TRUE(syncInterfaceB != nullptr);
1488 ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
1489 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1490 EXPECT_EQ(g_deviceB->PutData(key3, value, currentTime, 0), E_OK);
1491 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1492 EXPECT_EQ(g_deviceB->PutData(key4, value, currentTime, 0), E_OK);
1493 /**
1494 * @tc.steps: step6. sync to device A again and check data
1495 * * @tc.expected: step6. sync ok
1496 */
1497 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1498 EXPECT_EQ(g_kvDelegatePtr->Get(key3, actualValue), OK);
1499 EXPECT_EQ(actualValue, value);
1500 EXPECT_EQ(g_kvDelegatePtr->Get(key4, actualValue), OK);
1501 EXPECT_EQ(actualValue, value);
1502 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
1503 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), NOT_FOUND);
1504 }
1505
1506 /**
1507 * @tc.name: RebuildSync003
1508 * @tc.desc: test clear history data when receive ack
1509 * @tc.type: FUNC
1510 * @tc.require:
1511 * @tc.author: zhuwentao
1512 */
1513 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync003, TestSize.Level1)
1514 {
1515 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1516 /**
1517 * @tc.steps: step1. sync deviceB data to A and check data
1518 * * @tc.expected: step1. interface return ok
1519 */
1520 Key key1 = {'1'};
1521 Key key2 = {'2'};
1522 Key key3 = {'3'};
1523 Key key4 = {'4'};
1524 Value value = {'1'};
1525 EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp
1526 EXPECT_EQ(g_deviceB->PutData(key2, value, 2u, 0), E_OK); // 2: timestamp
1527 EXPECT_EQ(g_kvDelegatePtr->Put(key3, value), OK);
1528 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1529 Value actualValue;
1530 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1531 EXPECT_EQ(actualValue, value);
1532 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1533 EXPECT_EQ(actualValue, value);
1534 VirtualDataItem item;
1535 EXPECT_EQ(g_deviceB->GetData(key3, item), E_OK);
1536 EXPECT_EQ(item.value, value);
1537 /**
1538 * @tc.steps: step2. device B sync to device A,but make it failed
1539 * * @tc.expected: step2. interface return ok
1540 */
1541 EXPECT_EQ(g_deviceB->PutData(key4, value, 3u, 0), E_OK); // 3: timestamp
1542 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_A, DATA_SYNC_MESSAGE);
1543 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1544 /**
1545 * @tc.steps: step3. device B set delay send time
1546 * * @tc.expected: step3. interface return ok
1547 */
1548 std::set<std::string> delayDevice = {DEVICE_B};
1549 g_communicatorAggregator->SetSendDelayInfo(3000u, DATA_SYNC_MESSAGE, 1u, 0u, delayDevice); // delay 3000ms one time
1550 /**
1551 * @tc.steps: step4. device A rebuilt, device B push data to A and set clear remote data mark into context after 1s
1552 * * @tc.expected: step4. interface return ok
1553 */
1554 g_deviceB->SetClearRemoteStaleData(true);
1555 g_mgr.CloseKvStore(g_kvDelegatePtr);
1556 g_kvDelegatePtr = nullptr;
1557 ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1558 KvStoreNbDelegate::Option option;
1559 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1560 ASSERT_TRUE(g_kvDelegateStatus == OK);
1561 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1562 std::map<std::string, DBStatus> result;
1563 std::vector<std::string> devices = {g_deviceB->GetDeviceId()};
1564 g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1565 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1566 /**
1567 * @tc.steps: step5. device B sync to A, make it clear history data and check data
1568 * * @tc.expected: step5. interface return ok
1569 */
1570 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1571 EXPECT_EQ(g_deviceB->GetData(key3, item), -E_NOT_FOUND);
1572 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1573 EXPECT_EQ(actualValue, value);
1574 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1575 EXPECT_EQ(actualValue, value);
1576 g_communicatorAggregator->ResetSendDelayInfo();
1577 }
1578
1579 /**
1580 * @tc.name: RebuildSync004
1581 * @tc.desc: test WIPE_STALE_DATA mode when peers rebuilt db
1582 * @tc.type: FUNC
1583 * @tc.require:
1584 * @tc.author: zhangtao
1585 */
1586 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync004, TestSize.Level1)
1587 {
1588 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1589 /**
1590 * @tc.steps: step1. sync deviceB data to A and check data
1591 * * @tc.expected: step1. interface return ok
1592 */
1593 Key key1 = {'1'};
1594 Key key2 = {'2'};
1595 Key key3 = {'3'};
1596 Key key4 = {'4'};
1597 Value value = {'1'};
1598 EXPECT_EQ(g_kvDelegatePtr->Put(key1, value), OK);
1599 EXPECT_EQ(g_kvDelegatePtr->Put(key2, value), OK);
1600 EXPECT_EQ(g_kvDelegatePtr->Put(key3, value), OK);
1601 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1602 Value actualValue;
1603 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1604 EXPECT_EQ(actualValue, value);
1605 EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1606 EXPECT_EQ(actualValue, value);
1607 EXPECT_EQ(g_kvDelegatePtr->Get(key3, actualValue), OK);
1608 EXPECT_EQ(actualValue, value);
1609 VirtualDataItem item;
1610 EXPECT_EQ(g_deviceB->GetData(key1, item), E_OK);
1611 EXPECT_EQ(item.value, value);
1612 EXPECT_EQ(g_deviceB->GetData(key2, item), E_OK);
1613 EXPECT_EQ(item.value, value);
1614 EXPECT_EQ(g_deviceB->GetData(key3, item), E_OK);
1615 EXPECT_EQ(item.value, value);
1616
1617 /**
1618 * @tc.steps: step2. device A rebuilt, device B push data to A and set clear remote data mark into context after 1s
1619 * * @tc.expected: step2. interface return ok
1620 */
1621 g_deviceB->SetClearRemoteStaleData(true);
1622 EXPECT_EQ(g_deviceB->PutData(key4, value, 3u, 2), E_OK); // 3: timestamp
1623
1624 VirtualDataItem item2;
1625 EXPECT_EQ(g_deviceB->GetData(key4, item2), E_OK);
1626 EXPECT_EQ(item2.value, value);
1627 g_mgr.CloseKvStore(g_kvDelegatePtr);
1628 g_kvDelegatePtr = nullptr;
1629 ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1630 KvStoreNbDelegate::Option option;
1631 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1632 ASSERT_TRUE(g_kvDelegateStatus == OK);
1633 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1634
1635 /**
1636 * @tc.steps: step3. device B sync to A, make it clear history data and check data
1637 * * @tc.expected: step3. interface return ok
1638 */
1639 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1640 EXPECT_EQ(g_deviceB->GetData(key2, item), -E_NOT_FOUND);
1641 EXPECT_EQ(g_deviceB->GetData(key3, item), -E_NOT_FOUND);
1642 EXPECT_EQ(g_deviceB->GetData(key4, item2), E_OK);
1643 EXPECT_EQ(item2.value, value);
1644 EXPECT_EQ(g_kvDelegatePtr->Get(key4, actualValue), OK);
1645 EXPECT_EQ(actualValue, value);
1646 }
1647
1648 /**
1649 * @tc.name: RemoveDeviceData001
1650 * @tc.desc: call rekey and removeDeviceData Concurrently
1651 * @tc.type: FUNC
1652 * @tc.require: AR000D487B
1653 * @tc.author: zhuwentao
1654 */
1655 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RemoveDeviceData001, TestSize.Level1)
1656 {
1657 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1658 /**
1659 * @tc.steps: step1. sync deviceB data to A
1660 * * @tc.expected: step1. interface return ok
1661 */
1662 Key key1 = {'1'};
1663 Key key2 = {'2'};
1664 Value value = {'1'};
1665 g_deviceB->PutData(key1, value, 1, 0);
1666 g_deviceB->PutData(key2, value, 2, 0);
1667 g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true);
1668
1669 Value actualValue;
1670 g_kvDelegatePtr->Get(key1, actualValue);
1671 EXPECT_EQ(actualValue, value);
1672 actualValue.clear();
1673 g_kvDelegatePtr->Get(key2, actualValue);
1674 EXPECT_EQ(actualValue, value);
1675 /**
1676 * @tc.steps: step2. call Rekey and RemoveDeviceData Concurrently
1677 * * @tc.expected: step2. interface return ok
1678 */
__anonb4f3bed91302() 1679 std::thread thread1([]() {
1680 CipherPassword passwd3;
1681 std::vector<uint8_t> passwdVect = {'p', 's', 'd', 'z'};
1682 passwd3.SetValue(passwdVect.data(), passwdVect.size());
1683 g_kvDelegatePtr->Rekey(passwd3);
1684 });
__anonb4f3bed91402() 1685 std::thread thread2([]() {
1686 g_kvDelegatePtr->RemoveDeviceData(g_deviceB->GetDeviceId());
1687 });
1688 thread1.join();
1689 thread2.join();
1690 }
1691
1692 /**
1693 * @tc.name: DeviceOfflineSyncTask001
1694 * @tc.desc: Test sync task when device offline and close db Concurrently
1695 * @tc.type: FUNC
1696 * @tc.require: AR000HI2JS
1697 * @tc.author: zhuwentao
1698 */
1699 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSyncTask001, TestSize.Level3)
1700 {
1701 DBStatus status = OK;
1702 std::vector<std::string> devices;
1703 devices.push_back(g_deviceB->GetDeviceId());
1704
1705 /**
1706 * @tc.steps: step1. deviceA put {k1, v1}
1707 */
1708 Key key = {'1'};
1709 Value value = {'1'};
1710 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1711
1712 /**
1713 * @tc.steps: step2. deviceA set auto sync and put some key/value
1714 * @tc.expected: step2. interface should return OK.
1715 */
1716 bool autoSync = true;
1717 PragmaData data = static_cast<PragmaData>(&autoSync);
1718 status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
1719 ASSERT_EQ(status, OK);
1720
1721 Key key1 = {'2'};
1722 Key key2 = {'3'};
1723 Key key3 = {'4'};
1724 Key key4 = {'5'};
1725 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1726 ASSERT_TRUE(g_kvDelegatePtr->Put(key1, value) == OK);
1727 ASSERT_TRUE(g_kvDelegatePtr->Put(key2, value) == OK);
1728 ASSERT_TRUE(g_kvDelegatePtr->Put(key3, value) == OK);
1729 ASSERT_TRUE(g_kvDelegatePtr->Put(key4, value) == OK);
1730 /**
1731 * @tc.steps: step3. device offline and close db Concurrently
1732 * @tc.expected: step3. interface should return OK.
1733 */
__anonb4f3bed91502() 1734 std::thread thread1([]() {
1735 g_mgr.CloseKvStore(g_kvDelegatePtr);
1736 g_kvDelegatePtr = nullptr;
1737 });
__anonb4f3bed91602() 1738 std::thread thread2([]() {
1739 g_deviceB->Offline();
1740 });
1741 thread1.join();
1742 thread2.join();
1743 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1744 ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1745 }
1746
1747 /**
1748 * @tc.name: DeviceOfflineSyncTask002
1749 * @tc.desc: Test sync task when autoSync and close db Concurrently
1750 * @tc.type: FUNC
1751 * @tc.require:
1752 * @tc.author: zhuwentao
1753 */
1754 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSyncTask002, TestSize.Level3)
1755 {
1756 DBStatus status = OK;
1757 g_deviceC->Offline();
1758
1759 /**
1760 * @tc.steps: step1. deviceA put {k1, v1}
1761 */
1762 Key key = {'1'};
1763 Value value = {'1'};
1764 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1765
1766 /**
1767 * @tc.steps: step2. deviceA set auto sync and put some key/value
1768 * @tc.expected: step2. interface should return OK.
1769 */
1770 bool autoSync = true;
1771 PragmaData data = static_cast<PragmaData>(&autoSync);
1772 status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
1773 ASSERT_EQ(status, OK);
1774 std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME * 2));
1775
1776 Key key1 = {'2'};
1777 Key key2 = {'3'};
1778 Key key3 = {'4'};
1779 ASSERT_TRUE(g_kvDelegatePtr->Put(key1, value) == OK);
1780 ASSERT_TRUE(g_kvDelegatePtr->Put(key2, value) == OK);
1781 ASSERT_TRUE(g_kvDelegatePtr->Put(key3, value) == OK);
1782 /**
1783 * @tc.steps: step3. close db
1784 * @tc.expected: step3. interface should return OK.
1785 */
1786 g_mgr.CloseKvStore(g_kvDelegatePtr);
1787 g_kvDelegatePtr = nullptr;
1788 ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1789 }
1790
1791 /**
1792 * @tc.name: DeviceOfflineSyncTask003
1793 * @tc.desc: Test sync task when device offline after call sync
1794 * @tc.type: FUNC
1795 * @tc.require:
1796 * @tc.author: zhuwentao
1797 */
1798 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSyncTask003, TestSize.Level3)
1799 {
1800 std::vector<std::string> devices;
1801 devices.push_back(g_deviceB->GetDeviceId());
1802
1803 /**
1804 * @tc.steps: step1. deviceA put {k1, v1}
1805 */
1806 Key key = {'1'};
1807 Value value = {'1'};
1808 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1809 /**
1810 * @tc.steps: step2. device offline after call sync
1811 * @tc.expected: step2. interface should return OK.
1812 */
1813 Query query = Query::Select().PrefixKey(key);
1814 ASSERT_TRUE(g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, query, false) == OK);
1815 std::this_thread::sleep_for(std::chrono::milliseconds(15)); // wait for 15ms
1816 g_deviceB->Offline();
1817 }
1818
1819 /**
1820 * @tc.name: GetSyncDataFail001
1821 * @tc.desc: test get sync data failed when sync
1822 * @tc.type: FUNC
1823 * @tc.require:
1824 * @tc.author: zhuwentao
1825 */
1826 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail001, TestSize.Level1)
1827 {
1828 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1829 /**
1830 * @tc.steps: step1. device B set get data errCode control and put some data
1831 * * @tc.expected: step1. interface return ok
1832 */
1833 g_deviceB->SetGetDataErrCode(1, -E_BUSY, true);
1834 Key key1 = {'1'};
1835 Value value = {'1'};
1836 EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp
1837 /**
1838 * @tc.steps: step2. device B sync to device A and check data
1839 * * @tc.expected: step2. interface return ok
1840 */
1841 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1842 Value actualValue;
1843 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
1844 g_deviceB->ResetDataControl();
1845 }
1846
1847 /**
1848 * @tc.name: GetSyncDataFail002
1849 * @tc.desc: test get sync data failed when sync with large data
1850 * @tc.type: FUNC
1851 * @tc.require: AR000D487B
1852 * @tc.author: zhuwentao
1853 */
1854 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail002, TestSize.Level1)
1855 {
1856 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1857 /**
1858 * @tc.steps: step1. device B set get data errCode control and put some data
1859 * * @tc.expected: step1. interface return ok
1860 */
1861 g_deviceB->SetGetDataErrCode(2, -E_BUSY, true);
1862 int totalSize = 4000u;
1863 std::vector<Entry> entries;
1864 std::vector<Key> keys;
1865 const int keyLen = 10; // 20 Bytes
1866 const int valueLen = 10; // 20 Bytes
1867 DistributedDBUnitTest::GenerateRecords(totalSize, entries, keys, keyLen, valueLen);
1868 uint32_t i = 1u;
1869 for (const auto &entry : entries) {
1870 EXPECT_EQ(g_deviceB->PutData(entry.key, entry.value, i, 0), E_OK);
1871 i++;
1872 }
1873 /**
1874 * @tc.steps: step2. device B sync to device A and check data
1875 * * @tc.expected: step2. interface return ok
1876 */
1877 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1878 std::this_thread::sleep_for(std::chrono::seconds(1));
1879 Value actualValue;
1880 for (int j = 1u; j <= totalSize; j++) {
1881 if (j > totalSize / 2) {
1882 EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), NOT_FOUND);
1883 } else {
1884 EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), OK);
1885 }
1886 }
1887 g_deviceB->ResetDataControl();
1888 }
1889
1890 /**
1891 * @tc.name: GetSyncDataFail003
1892 * @tc.desc: test get sync data E_EKEYREVOKED failed in push_and_pull sync
1893 * @tc.type: FUNC
1894 * @tc.require:
1895 * @tc.author: zhuwentao
1896 */
1897 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail003, TestSize.Level1)
1898 {
1899 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1900 /**
1901 * @tc.steps: step1. device B set get data errCode control and put some data
1902 * * @tc.expected: step1. interface return ok
1903 */
1904 g_deviceB->SetGetDataErrCode(1, -E_EKEYREVOKED, true);
1905 Key key1 = {'1'};
1906 Key key2 = {'3'};
1907 Value value = {'1'};
1908 EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp
1909 EXPECT_EQ(g_kvDelegatePtr->Put(key2, value), OK);
1910 /**
1911 * @tc.steps: step2. device B sync to device A and check data
1912 * * @tc.expected: step2. interface return ok
1913 */
1914 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1915 Value actualValue;
1916 EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
1917 VirtualDataItem item;
1918 EXPECT_EQ(g_deviceB->GetData(key2, item), E_OK);
1919 g_deviceB->ResetDataControl();
1920 }
1921
1922 /**
1923 * @tc.name: GetSyncDataFail004
1924 * @tc.desc: test get sync data E_EKEYREVOKED failed in push_and_pull sync
1925 * @tc.type: FUNC
1926 * @tc.require:
1927 * @tc.author: zhuwentao
1928 */
1929 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail004, TestSize.Level1)
1930 {
1931 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1932 /**
1933 * @tc.steps: step1. device B set get data errCode control and put some data
1934 * * @tc.expected: step1. interface return ok
1935 */
1936 g_deviceB->SetGetDataErrCode(2, -E_EKEYREVOKED, true);
1937 int totalSize = 4000u;
1938 std::vector<Entry> entries;
1939 std::vector<Key> keys;
1940 const int keyLen = 10; // 20 Bytes
1941 const int valueLen = 10; // 20 Bytes
1942 DistributedDBUnitTest::GenerateRecords(totalSize, entries, keys, keyLen, valueLen);
1943 uint32_t i = 1u;
1944 for (const auto &entry : entries) {
1945 EXPECT_EQ(g_deviceB->PutData(entry.key, entry.value, i, 0), E_OK);
1946 i++;
1947 }
1948 Key key = {'a', 'b', 'c'};
1949 Value value = {'1'};
1950 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
1951 /**
1952 * @tc.steps: step2. device B sync to device A and check data
1953 * * @tc.expected: step2. interface return ok
1954 */
1955 EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1956 std::this_thread::sleep_for(std::chrono::seconds(1));
1957 Value actualValue;
1958 for (int j = 1u; j <= totalSize; j++) {
1959 if (j > totalSize / 2) {
1960 EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), NOT_FOUND);
1961 } else {
1962 EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), OK);
1963 }
1964 }
1965 VirtualDataItem item;
1966 EXPECT_EQ(g_deviceB->GetData(key, item), E_OK);
1967 g_deviceB->ResetDataControl();
1968 }
1969
1970 /**
1971 * @tc.name: InterceptDataFail001
1972 * @tc.desc: test intercept data failed when sync
1973 * @tc.type: FUNC
1974 * @tc.require:
1975 * @tc.author: zhuwentao
1976 */
1977 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, InterceptDataFail001, TestSize.Level1)
1978 {
1979 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1980 /**
1981 * @tc.steps: step1. device A set intercept data errCode and put some data
1982 * * @tc.expected: step1. interface return ok
1983 */
1984 g_kvDelegatePtr->SetPushDataInterceptor(
__anonb4f3bed91702(InterceptedData &data, const std::string &sourceID, const std::string &targetID) 1985 [](InterceptedData &data, const std::string &sourceID, const std::string &targetID) {
1986 int errCode = OK;
1987 auto entries = data.GetEntries();
1988 LOGD("====here111,size=%d", entries.size());
1989 for (size_t i = 0; i < entries.size(); i++) {
1990 Key newKey;
1991 errCode = data.ModifyKey(i, newKey);
1992 if (errCode != OK) {
1993 break;
1994 }
1995 }
1996 return errCode;
1997 }
1998 );
1999 Key key = {'1'};
2000 Value value = {'1'};
2001 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
2002 /**
2003 * @tc.steps: step2. device A sync to device B and check data
2004 * * @tc.expected: step2. interface return ok
2005 */
2006 std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
2007 std::map<std::string, DBStatus> result;
2008 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
2009 ASSERT_TRUE(result.size() == devices.size());
2010 for (const auto &pair : result) {
2011 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2012 EXPECT_TRUE(pair.second == INTERCEPT_DATA_FAIL);
2013 }
2014 VirtualDataItem item;
2015 EXPECT_EQ(g_deviceB->GetData(key, item), -E_NOT_FOUND);
2016 }
2017
2018 /**
2019 * @tc.name: InterceptDataFail002
2020 * @tc.desc: test intercept data failed when sync
2021 * @tc.type: FUNC
2022 * @tc.require:
2023 * @tc.author: zhangqiquan
2024 */
2025 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, InterceptDataFail002, TestSize.Level0)
2026 {
2027 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
2028 /**
2029 * @tc.steps: step1. device A set intercept data errCode and B put some data
2030 * @tc.expected: step1. interface return ok
2031 */
2032 g_kvDelegatePtr->SetReceiveDataInterceptor(
__anonb4f3bed91802(InterceptedData &data, const std::string &sourceID, const std::string &targetID) 2033 [](InterceptedData &data, const std::string &sourceID, const std::string &targetID) {
2034 auto entries = data.GetEntries();
2035 LOGD("====on receive,size=%d", entries.size());
2036 for (size_t i = 0; i < entries.size(); i++) {
2037 Key newKey;
2038 int errCode = data.ModifyKey(i, newKey);
2039 if (errCode != OK) {
2040 return errCode;
2041 }
2042 }
2043 return E_OK;
2044 }
2045 );
2046 Key key = {'1'};
2047 Value value = {'1'};
2048 g_deviceB->PutData(key, value, 1u, 0); // 1 is timestamp
2049 /**
2050 * @tc.steps: step2. device A sync to device B and check data
2051 * @tc.expected: step2. interface return ok
2052 */
2053 std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
2054 std::map<std::string, DBStatus> result;
2055 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
2056 ASSERT_TRUE(result.size() == devices.size());
2057 for (const auto &pair : result) {
2058 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2059 EXPECT_EQ(pair.second, INTERCEPT_DATA_FAIL);
2060 }
2061 Value actualValue;
2062 EXPECT_EQ(g_kvDelegatePtr->Get(key, actualValue), NOT_FOUND);
2063 }
2064
2065 /**
2066 * @tc.name: InterceptData001
2067 * @tc.desc: test intercept receive data when sync
2068 * @tc.type: FUNC
2069 * @tc.require:
2070 * @tc.author: zhangqiquan
2071 */
2072 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, InterceptData001, TestSize.Level0)
2073 {
2074 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
2075 /**
2076 * @tc.steps: step1. device A set intercept data errCode and B put some data
2077 * @tc.expected: step1. interface return ok
2078 */
2079 g_kvDelegatePtr->SetReceiveDataInterceptor(
__anonb4f3bed91902(InterceptedData &data, const std::string &sourceID, const std::string &targetID) 2080 [](InterceptedData &data, const std::string &sourceID, const std::string &targetID) {
2081 auto entries = data.GetEntries();
2082 LOGD("====on receive,size=%d", entries.size());
2083 for (size_t i = 0; i < entries.size(); i++) {
2084 Key newKey = {'2'};
2085 int errCode = data.ModifyKey(i, newKey);
2086 if (errCode != OK) {
2087 return errCode;
2088 }
2089 Value newValue = {'3'};
2090 errCode = data.ModifyValue(i, newValue);
2091 if (errCode != OK) {
2092 return errCode;
2093 }
2094 }
2095 return E_OK;
2096 }
2097 );
2098 Key key = {'1'};
2099 Value value = {'1'};
2100 g_deviceB->PutData(key, value, 1u, 0); // 1 is timestamp
2101 /**
2102 * @tc.steps: step2. device A sync to device B and check data
2103 * @tc.expected: step2. interface return ok
2104 */
2105 std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
2106 std::map<std::string, DBStatus> result;
2107 ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
2108 ASSERT_TRUE(result.size() == devices.size());
2109 for (const auto &pair : result) {
2110 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2111 EXPECT_EQ(pair.second, OK);
2112 }
2113 Value actualValue;
2114 EXPECT_EQ(g_kvDelegatePtr->Get(key, actualValue), NOT_FOUND);
2115 key = {'2'};
2116 EXPECT_EQ(g_kvDelegatePtr->Get(key, actualValue), OK);
2117 value = {'3'};
2118 EXPECT_EQ(actualValue, value);
2119 }
2120
2121 /**
2122 * @tc.name: UpdateKey001
2123 * @tc.desc: test update key can effect local data and sync data, without delete data
2124 * @tc.type: FUNC
2125 * @tc.require:
2126 * @tc.author: zhangqiquan
2127 */
2128 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, UpdateKey001, TestSize.Level1)
2129 {
2130 /**
2131 * @tc.steps: step1. device A set sync data (k1, v1) local data (k2, v2) (k3, v3) and delete (k4, v4)
2132 * @tc.expected: step1. put data return ok
2133 */
2134 Key k1 = {'k', '1'};
2135 Value v1 = {'v', '1'};
2136 g_deviceB->PutData(k1, v1, 1, 0);
2137 ASSERT_EQ(g_deviceB->Sync(SyncMode::SYNC_MODE_PUSH_ONLY, true), E_OK);
2138 Value actualValue;
2139 EXPECT_EQ(g_kvDelegatePtr->Get(k1, actualValue), OK);
2140 EXPECT_EQ(v1, actualValue);
2141 Key k2 = {'k', '2'};
2142 Value v2 = {'v', '2'};
2143 Key k3 = {'k', '3'};
2144 Value v3 = {'v', '3'};
2145 Key k4 = {'k', '4'};
2146 Value v4 = {'v', '4'};
2147 EXPECT_EQ(g_kvDelegatePtr->Put(k2, v2), OK);
2148 EXPECT_EQ(g_kvDelegatePtr->Put(k3, v3), OK);
2149 EXPECT_EQ(g_kvDelegatePtr->Put(k4, v4), OK);
2150 EXPECT_EQ(g_kvDelegatePtr->Delete(k4), OK);
2151 /**
2152 * @tc.steps: step2. device A update key and set
2153 * @tc.expected: step2. put data return ok
2154 */
__anonb4f3bed91a02(const Key &originKey, Key &newKey) 2155 DBStatus status = g_kvDelegatePtr->UpdateKey([](const Key &originKey, Key &newKey) {
2156 newKey = originKey;
2157 newKey.push_back('0');
2158 });
2159 EXPECT_EQ(status, OK);
2160 k1.push_back('0');
2161 k2.push_back('0');
2162 k3.push_back('0');
2163 EXPECT_EQ(g_kvDelegatePtr->Get(k1, actualValue), OK);
2164 EXPECT_EQ(v1, actualValue);
2165 EXPECT_EQ(g_kvDelegatePtr->Get(k2, actualValue), OK);
2166 EXPECT_EQ(v2, actualValue);
2167 EXPECT_EQ(g_kvDelegatePtr->Get(k3, actualValue), OK);
2168 EXPECT_EQ(v3, actualValue);
2169 }
2170
2171 /**
2172 * @tc.name: MetaBusy001
2173 * @tc.desc: test sync normal when update water mark busy
2174 * @tc.type: FUNC
2175 * @tc.require:
2176 * @tc.author: zhangqiquan
2177 */
2178 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, MetaBusy001, TestSize.Level1)
2179 {
2180 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
2181 Key key = {'1'};
2182 Value value = {'1'};
2183 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
2184 std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
2185 std::map<std::string, DBStatus> result;
2186 ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result), OK);
2187 ASSERT_EQ(result.size(), devices.size());
2188 for (const auto &pair : result) {
2189 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2190 EXPECT_TRUE(pair.second == OK);
2191 }
2192 value = {'2'};
2193 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
__anonb4f3bed91b02() 2194 g_deviceB->SetSaveDataCallback([] () {
2195 RuntimeContext::GetInstance()->ScheduleTask([]() {
2196 g_deviceB->EraseWaterMark("real_device");
2197 });
2198 std::this_thread::sleep_for(std::chrono::seconds(1));
2199 });
2200 EXPECT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result), OK);
2201 EXPECT_EQ(result.size(), devices.size());
2202 for (const auto &pair : result) {
2203 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2204 EXPECT_TRUE(pair.second == OK);
2205 }
2206 g_deviceB->SetSaveDataCallback(nullptr);
2207 RuntimeContext::GetInstance()->StopTaskPool();
2208 }
2209
2210 /**
2211 * @tc.name: TestErrCodePassthrough001
2212 * @tc.desc: Test ErrCode Passthrough when sync comm fail
2213 * @tc.type: FUNC
2214 * @tc.require:
2215 * @tc.author: suyue
2216 */
2217 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, TestErrCodePassthrough001, TestSize.Level1)
2218 {
2219 /**
2220 * @tc.steps: step1. device put data.
2221 * @tc.expected: step1. sync return OK.
2222 */
2223 std::vector<std::string> devices;
2224 devices.push_back(g_deviceB->GetDeviceId());
2225 devices.push_back(g_deviceC->GetDeviceId());
2226 Key key1 = {'1'};
2227 Value value1 = {'1'};
2228 ASSERT_EQ(g_kvDelegatePtr->Put(key1, value1), OK);
2229
2230 /**
2231 * @tc.steps: step2. call sync and mock commErrCode is E_BASE(positive number).
2232 * @tc.expected: step2. return COMM_FAILURE.
2233 */
2234 g_communicatorAggregator->MockCommErrCode(E_BASE);
2235 std::map<std::string, DBStatus> result;
2236 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
2237 ASSERT_EQ(status, OK);
2238 for (const auto &pair : result) {
2239 LOGD("dev %s, status %d, expectStatus %d", pair.first.c_str(), pair.second, E_BASE);
2240 EXPECT_EQ(pair.second, COMM_FAILURE);
2241 }
2242
2243 /**
2244 * @tc.steps: step3. call sync and mock commErrCode is -E_BASE(negative number).
2245 * @tc.expected: step3. return -E_BASE.
2246 */
2247 g_communicatorAggregator->MockCommErrCode(-E_BASE);
2248 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
2249 ASSERT_EQ(status, OK);
2250 for (const auto &pair : result) {
2251 LOGD("dev %s, status %d, expectStatus %d", pair.first.c_str(), pair.second, COMM_FAILURE);
2252 EXPECT_EQ(pair.second, static_cast<DBStatus>(-E_BASE));
2253 }
2254
2255 /**
2256 * @tc.steps: step4. call sync and mock commErrCode is INT_MAX.
2257 * @tc.expected: step4. return COMM_FAILURE.
2258 */
2259 g_communicatorAggregator->MockCommErrCode(INT_MAX);
2260 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
2261 ASSERT_EQ(status, OK);
2262 for (const auto &pair : result) {
2263 LOGD("dev %s, status %d, expectStatus %d", pair.first.c_str(), pair.second, INT_MAX);
2264 EXPECT_EQ(pair.second, COMM_FAILURE);
2265 }
2266
2267 /**
2268 * @tc.steps: step5. call sync and mock commErrCode is -INT_MAX.
2269 * @tc.expected: step5. return -INT_MAX.
2270 */
2271 g_communicatorAggregator->MockCommErrCode(-INT_MAX);
2272 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
2273 ASSERT_EQ(status, OK);
2274 for (const auto &pair : result) {
2275 LOGD("dev %s, status %d, expectStatus %d", pair.first.c_str(), pair.second, -INT_MAX);
2276 EXPECT_EQ(pair.second, -INT_MAX);
2277 }
2278 g_communicatorAggregator->MockCommErrCode(E_OK);
2279 }
2280
2281 /**
2282 * @tc.name: TestErrCodePassthrough002
2283 * @tc.desc: Test ErrCode Passthrough when sync time out and isDirectEnd is false
2284 * @tc.type: FUNC
2285 * @tc.require:
2286 * @tc.author: suyue
2287 */
2288 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, TestErrCodePassthrough002, TestSize.Level3)
2289 {
2290 /**
2291 * @tc.steps: step1. device put data.
2292 * @tc.expected: step1. sync return OK.
2293 */
2294 std::vector<std::string> devices;
2295 devices.push_back(g_deviceB->GetDeviceId());
2296 ASSERT_EQ(g_kvDelegatePtr->Put({'k', '1'}, {'v', '1'}), OK);
2297
2298 /**
2299 * @tc.steps: step2. set messageId invalid and isDirectEnd is false
2300 * @tc.expected: step2. make sure deviceA push data failed due to timeout
2301 */
__anonb4f3bed91d02(const std::string &target, DistributedDB::Message *msg) 2302 g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
2303 ASSERT_NE(msg, nullptr);
2304 if (target == DEVICE_B && msg->GetMessageId() == QUERY_SYNC_MESSAGE) {
2305 msg->SetMessageId(INVALID_MESSAGE_ID);
2306 }
2307 });
2308 g_communicatorAggregator->MockDirectEndFlag(false);
2309
2310 /**
2311 * @tc.steps: step3. call sync and mock errCode is E_BASE(positive number).
2312 * @tc.expected: step3. return TIME_OUT.
2313 */
2314 std::map<std::string, DBStatus> result;
__anonb4f3bed91e02(const std::map<std::string, DBStatus> &map) 2315 auto callback = [&result](const std::map<std::string, DBStatus> &map) {
2316 result = map;
2317 };
2318 Query query = Query::Select().PrefixKey({'k', '1'});
2319 g_communicatorAggregator->MockCommErrCode(E_BASE);
2320 EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
2321 EXPECT_EQ(result.size(), devices.size());
2322 EXPECT_EQ(result[DEVICE_B], TIME_OUT);
2323
2324 /**
2325 * @tc.steps: step4. call sync and mock errCode is -E_BASE(negative number).
2326 * @tc.expected: step4. return -E_BASE.
2327 */
2328 g_communicatorAggregator->MockCommErrCode(-E_BASE);
2329 EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
2330 EXPECT_EQ(result.size(), devices.size());
2331 EXPECT_EQ(result[DEVICE_B], -E_BASE);
2332
2333 /**
2334 * @tc.steps: step5. call sync and mock errCode is E_OK(0).
2335 * @tc.expected: step5. return TIME_OUT.
2336 */
2337 g_communicatorAggregator->MockCommErrCode(E_OK);
2338 EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
2339 EXPECT_EQ(result.size(), devices.size());
2340 EXPECT_EQ(result[DEVICE_B], TIME_OUT);
2341
2342 /**
2343 * @tc.steps: step6. call sync and mock errCode is -INT_MAX.
2344 * @tc.expected: step6. return - INT_MAX.
2345 */
2346 g_communicatorAggregator->MockCommErrCode(-INT_MAX);
2347 EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
2348 EXPECT_EQ(result.size(), devices.size());
2349 EXPECT_EQ(result[DEVICE_B], -INT_MAX);
2350
2351 g_communicatorAggregator->RegOnDispatch(nullptr);
2352 g_communicatorAggregator->MockCommErrCode(E_OK);
2353 g_communicatorAggregator->MockDirectEndFlag(true);
2354 }
2355