1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <condition_variable>
17 #include <gtest/gtest.h>
18 #include <thread>
19 
20 #include "db_constant.h"
21 #include "db_common.h"
22 #include "distributeddb_data_generate_unit_test.h"
23 #include "distributeddb_tools_unit_test.h"
24 #include "kv_store_nb_delegate.h"
25 #include "kv_virtual_device.h"
26 #include "mock_sync_task_context.h"
27 #include "platform_specific.h"
28 #include "single_ver_data_sync.h"
29 #include "single_ver_kv_sync_task_context.h"
30 
31 using namespace testing::ext;
32 using namespace DistributedDB;
33 using namespace DistributedDBUnitTest;
34 using namespace std;
35 
36 namespace {
37     class TestSingleVerKvSyncTaskContext : public SingleVerKvSyncTaskContext {
38     public:
39         TestSingleVerKvSyncTaskContext() = default;
40     };
41     string g_testDir;
42     const string STORE_ID = "kv_stroe_complex_sync_test";
43     const int WAIT_TIME = 1000;
44     const std::string DEVICE_A = "real_device";
45     const std::string DEVICE_B = "deviceB";
46     const std::string DEVICE_C = "deviceC";
47     const std::string CREATE_SYNC_TABLE_SQL =
48     "CREATE TABLE IF NOT EXISTS sync_data(" \
49         "key         BLOB NOT NULL," \
50         "value       BLOB," \
51         "timestamp   INT  NOT NULL," \
52         "flag        INT  NOT NULL," \
53         "device      BLOB," \
54         "ori_device  BLOB," \
55         "hash_key    BLOB PRIMARY KEY NOT NULL," \
56         "w_timestamp INT," \
57         "modify_time INT," \
58         "create_time INT" \
59         ");";
60 
61     KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
62     KvStoreConfig g_config;
63     DistributedDBToolsUnitTest g_tool;
64     DBStatus g_kvDelegateStatus = INVALID_ARGS;
65     KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
66     VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
67     KvVirtualDevice *g_deviceB = nullptr;
68     KvVirtualDevice *g_deviceC = nullptr;
69 
70     // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
71     auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
72         placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
73 
PullSyncTest()74     void PullSyncTest()
75     {
76         DBStatus status = OK;
77         std::vector<std::string> devices;
78         devices.push_back(g_deviceB->GetDeviceId());
79 
80         Key key = {'1'};
81         Key key2 = {'2'};
82         Value value = {'1'};
83         g_deviceB->PutData(key, value, 0, 0);
84         g_deviceB->PutData(key2, value, 1, 0);
85 
86         std::map<std::string, DBStatus> result;
87         status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
88         ASSERT_TRUE(status == OK);
89 
90         ASSERT_TRUE(result.size() == devices.size());
91         for (const auto &pair : result) {
92             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
93             EXPECT_TRUE(pair.second == OK);
94         }
95         Value value3;
96         EXPECT_EQ(g_kvDelegatePtr->Get(key, value3), OK);
97         EXPECT_EQ(value3, value);
98         EXPECT_EQ(g_kvDelegatePtr->Get(key2, value3), OK);
99         EXPECT_EQ(value3, value);
100     }
101 
CrudTest()102     void CrudTest()
103     {
104         vector<Entry> entries;
105         int totalSize = 10;
106         for (int i = 0; i < totalSize; i++) {
107             Entry entry;
108             entry.key.push_back(i);
109             entry.value.push_back('2');
110             entries.push_back(entry);
111         }
112         EXPECT_TRUE(g_kvDelegatePtr->PutBatch(entries) == OK);
113         for (const auto &entry : entries) {
114             Value resultvalue;
115             EXPECT_TRUE(g_kvDelegatePtr->Get(entry.key, resultvalue) == OK);
116             EXPECT_TRUE(resultvalue == entry.value);
117         }
118         for (int i = 0; i < totalSize / 2; i++) { // 2: Half of the total
119             g_kvDelegatePtr->Delete(entries[i].key);
120             Value resultvalue;
121             EXPECT_TRUE(g_kvDelegatePtr->Get(entries[i].key, resultvalue) == NOT_FOUND);
122         }
123         for (int i = totalSize / 2; i < totalSize; i++) {
124             Value value = entries[i].value;
125             value.push_back('x');
126             EXPECT_TRUE(g_kvDelegatePtr->Put(entries[i].key, value) == OK);
127             Value resultvalue;
128             EXPECT_TRUE(g_kvDelegatePtr->Get(entries[i].key, resultvalue) == OK);
129             EXPECT_TRUE(resultvalue == value);
130         }
131     }
132 
DataSync005()133     void DataSync005()
134     {
135         ASSERT_NE(g_communicatorAggregator, nullptr);
136         SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
137         ASSERT_TRUE(dataSync != nullptr);
138         dataSync->SendSaveDataNotifyPacket(nullptr, 0, 0, 0, TIME_SYNC_MESSAGE);
139         EXPECT_EQ(g_communicatorAggregator->GetOnlineDevices().size(), 3u); // 3 online dev
140         delete dataSync;
141     }
142 
DataSync008()143     void DataSync008()
144     {
145         SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
146         ASSERT_TRUE(dataSync != nullptr);
147         auto context = new (std::nothrow) MockSyncTaskContext();
148         dataSync->PutDataMsg(nullptr);
149         bool isNeedHandle = false;
150         bool isContinue = false;
151         EXPECT_EQ(dataSync->MoveNextDataMsg(context, isNeedHandle, isContinue), nullptr);
152         EXPECT_EQ(isNeedHandle, false);
153         EXPECT_EQ(isContinue, false);
154         delete dataSync;
155         delete context;
156     }
157 
ReSetWaterDogTest001()158     void ReSetWaterDogTest001()
159     {
160         /**
161          * @tc.steps: step1. put 10 key/value
162          * @tc.expected: step1, put return OK.
163          */
164         for (int i = 0; i < 5; i++) { // put 5 key
165             Key key = DistributedDBToolsUnitTest::GetRandPrefixKey({'a', 'b'}, 1024); // rand num 1024 for test
166             Value value;
167             DistributedDBToolsUnitTest::GetRandomKeyValue(value, 10 * 50 * 1024u); // 10 * 50 * 1024 = 500k
168             EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
169         }
170         /**
171          * @tc.steps: step2. SetDeviceMtuSize
172          * @tc.expected: step2, return OK.
173          */
174         g_communicatorAggregator->SetDeviceMtuSize(DEVICE_A, 50 * 1024u); // 50 * 1024u = 50k
175         g_communicatorAggregator->SetDeviceMtuSize(DEVICE_B, 50 * 1024u); // 50 * 1024u = 50k
176         /**
177          * @tc.steps: step3. deviceA,deviceB sync to each other at same time
178          * @tc.expected: step3. sync should return OK.
179          */
180         EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, true), E_OK);
181         g_communicatorAggregator->SetDeviceMtuSize(DEVICE_A, 5 * 1024u * 1024u); // 5 * 1024u * 1024u = 5m
182         g_communicatorAggregator->SetDeviceMtuSize(DEVICE_B, 5 * 1024u * 1024u); // 5 * 1024u * 1024u = 5m
183     }
184 }
185 
186 class DistributedDBSingleVerP2PComplexSyncTest : public testing::Test {
187 public:
188     static void SetUpTestCase(void);
189     static void TearDownTestCase(void);
190     void SetUp();
191     void TearDown();
192 };
193 
SetUpTestCase(void)194 void DistributedDBSingleVerP2PComplexSyncTest::SetUpTestCase(void)
195 {
196     /**
197      * @tc.setup: Init datadir and Virtual Communicator.
198      */
199     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
200     g_config.dataDir = g_testDir;
201     g_mgr.SetKvStoreConfig(g_config);
202 
203     string dir = g_testDir + "/single_ver";
204     DIR* dirTmp = opendir(dir.c_str());
205     if (dirTmp == nullptr) {
206         OS::MakeDBDirectory(dir);
207     } else {
208         closedir(dirTmp);
209     }
210 
211     g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
212     ASSERT_TRUE(g_communicatorAggregator != nullptr);
213     RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
214 }
215 
TearDownTestCase(void)216 void DistributedDBSingleVerP2PComplexSyncTest::TearDownTestCase(void)
217 {
218     /**
219      * @tc.teardown: Release virtual Communicator and clear data dir.
220      */
221     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
222         LOGE("rm test db files error!");
223     }
224     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
225 }
226 
SetUp(void)227 void DistributedDBSingleVerP2PComplexSyncTest::SetUp(void)
228 {
229     DistributedDBToolsUnitTest::PrintTestCaseInfo();
230     /**
231      * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
232      */
233     KvStoreNbDelegate::Option option;
234     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
235     ASSERT_TRUE(g_kvDelegateStatus == OK);
236     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
237     g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
238     ASSERT_TRUE(g_deviceB != nullptr);
239     VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
240     ASSERT_TRUE(syncInterfaceB != nullptr);
241     ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
242 
243     g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
244     ASSERT_TRUE(g_deviceC != nullptr);
245     VirtualSingleVerSyncDBInterface *syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
246     ASSERT_TRUE(syncInterfaceC != nullptr);
247     ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, syncInterfaceC), E_OK);
248 
249     auto permissionCheckCallback = [] (const std::string &userId, const std::string &appId, const std::string &storeId,
250         const std::string &deviceId, uint8_t flag) -> bool {
251             return true;
252         };
253     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(permissionCheckCallback), OK);
254 }
255 
TearDown(void)256 void DistributedDBSingleVerP2PComplexSyncTest::TearDown(void)
257 {
258     /**
259      * @tc.teardown: Release device A, B, C
260      */
261     if (g_kvDelegatePtr != nullptr) {
262         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
263         g_kvDelegatePtr = nullptr;
264         DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
265         LOGD("delete kv store status %d", status);
266         ASSERT_TRUE(status == OK);
267     }
268     if (g_deviceB != nullptr) {
269         delete g_deviceB;
270         g_deviceB = nullptr;
271     }
272     if (g_deviceC != nullptr) {
273         delete g_deviceC;
274         g_deviceC = nullptr;
275     }
276     PermissionCheckCallbackV2 nullCallback;
277     EXPECT_EQ(g_mgr.SetPermissionCheckCallback(nullCallback), OK);
278 }
279 
280 /**
281   * @tc.name: SaveDataNotify001
282   * @tc.desc: Test SaveDataNotify function, delay < 30s should sync ok, > 36 should timeout
283   * @tc.type: FUNC
284   * @tc.require: AR000D4876
285   * @tc.author: xushaohua
286   */
287 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SaveDataNotify001, TestSize.Level3)
288 {
289     DBStatus status = OK;
290     const int waitFiveSeconds = 5000;
291     const int waitThirtySeconds = 30000;
292     const int waitThirtySixSeconds = 36000;
293     std::vector<std::string> devices;
294     devices.push_back(g_deviceB->GetDeviceId());
295 
296     /**
297      * @tc.steps: step1. deviceA put {k1, v1}
298      */
299     Key key = {'1'};
300     Value value = {'1'};
301     status = g_kvDelegatePtr->Put(key, value);
302     ASSERT_TRUE(status == OK);
303 
304     /**
305      * @tc.steps: step2. deviceB set sava data dely 5s
306      */
307     g_deviceB->SetSaveDataDelayTime(waitFiveSeconds);
308 
309     /**
310      * @tc.steps: step3. deviceA call sync and wait
311      * @tc.expected: step3. sync should return OK. onComplete should be called, deviceB sync success.
312      */
313     std::map<std::string, DBStatus> result;
314     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
315     ASSERT_TRUE(status == OK);
316     ASSERT_TRUE(result.size() == devices.size());
317     ASSERT_TRUE(result[DEVICE_B] == OK);
318 
319     /**
320      * @tc.steps: step4. deviceB set sava data dely 30s and put {k1, v1}
321      */
322     g_deviceB->SetSaveDataDelayTime(waitThirtySeconds);
323     status = g_kvDelegatePtr->Put(key, value);
324     ASSERT_TRUE(status == OK);
325      /**
326      * @tc.steps: step3. deviceA call sync and wait
327      * @tc.expected: step3. sync should return OK. onComplete should be called, deviceB sync success.
328      */
329     result.clear();
330     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
331     ASSERT_TRUE(status == OK);
332     ASSERT_TRUE(result.size() == devices.size());
333     ASSERT_TRUE(result[DEVICE_B] == OK);
334 
335     /**
336      * @tc.steps: step4. deviceB set sava data dely 36s and put {k1, v1}
337      */
338     g_deviceB->SetSaveDataDelayTime(waitThirtySixSeconds);
339     status = g_kvDelegatePtr->Put(key, value);
340     ASSERT_TRUE(status == OK);
341     /**
342      * @tc.steps: step5. deviceA call sync and wait
343      * @tc.expected: step5. sync should return OK. onComplete should be called, deviceB sync TIME_OUT.
344      */
345     result.clear();
346     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
347     ASSERT_TRUE(status == OK);
348     ASSERT_TRUE(result.size() == devices.size());
349     ASSERT_TRUE(result[DEVICE_B] == TIME_OUT);
350 }
351 
352 /**
353   * @tc.name: SametimeSync001
354   * @tc.desc: Test 2 device sync with each other
355   * @tc.type: FUNC
356   * @tc.require: AR000CCPOM
357   * @tc.author: zhangqiquan
358   */
359 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SametimeSync001, TestSize.Level3)
360 {
361     DBStatus status = OK;
362     std::vector<std::string> devices;
363     devices.push_back(g_deviceB->GetDeviceId());
364 
365     int responseCount = 0;
366     int requestCount = 0;
367     Key key = {'1'};
368     Value value = {'1'};
369     /**
370      * @tc.steps: step1. make sure deviceB send pull firstly and response_pull secondly
371      * @tc.expected: step1. deviceA put data when finish push task. put data should return OK.
372      */
373     g_communicatorAggregator->RegOnDispatch([&responseCount, &requestCount, &key, &value](
__anonb4f3bed90302( const std::string &target, DistributedDB::Message *msg) 374         const std::string &target, DistributedDB::Message *msg) {
375         if (target != "real_device" || msg->GetMessageId() != DATA_SYNC_MESSAGE) {
376             return;
377         }
378 
379         if (msg->GetMessageType() == TYPE_RESPONSE) {
380             responseCount++;
381             if (responseCount == 1) { // 1 is the ack which B response A's push task
382                 EXPECT_EQ(g_kvDelegatePtr->Put(key, value), DBStatus::OK);
383                 std::this_thread::sleep_for(std::chrono::seconds(1));
384             } else if (responseCount == 2) { // 2 is the ack which B response A's response_pull task
385                 msg->SetErrorNo(E_FEEDBACK_COMMUNICATOR_NOT_FOUND);
386             }
387         } else if (msg->GetMessageType() == TYPE_REQUEST) {
388             requestCount++;
389             if (requestCount == 1) { // 1 is A push task
390                 std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2 sec
391             }
392         }
393     });
394     /**
395      * @tc.steps: step2. deviceA,deviceB sync to each other at same time
396      * @tc.expected: step2. sync should return OK.
397      */
398     std::map<std::string, DBStatus> result;
__anonb4f3bed90402null399     std::thread subThread([]{
400         g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, true);
401     });
402     status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_PULL, result);
403     subThread.join();
404     g_communicatorAggregator->RegOnDispatch(nullptr);
405 
406     EXPECT_TRUE(status == OK);
407     ASSERT_TRUE(result.size() == devices.size());
408     EXPECT_TRUE(result[DEVICE_B] == OK);
409     Value actualValue;
410     g_kvDelegatePtr->Get(key, actualValue);
411     EXPECT_EQ(actualValue, value);
412 }
413 
414 /**
415   * @tc.name: SametimeSync002
416   * @tc.desc: Test 2 device sync with each other with water error
417   * @tc.type: FUNC
418   * @tc.require: AR000CCPOM
419   * @tc.author: zhangqiquan
420   */
421 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SametimeSync002, TestSize.Level3)
422 {
423     DBStatus status = OK;
424     std::vector<std::string> devices;
425     devices.push_back(g_deviceB->GetDeviceId());
426     g_kvDelegatePtr->Put({'k', '1'}, {'v', '1'});
427     /**
428      * @tc.steps: step1. make sure deviceA push data failed and increase water mark
429      * @tc.expected: step1. deviceA push failed with timeout
430      */
__anonb4f3bed90502(const std::string &target, DistributedDB::Message *msg) 431     g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
432         ASSERT_NE(msg, nullptr);
433         if (target == DEVICE_B && msg->GetMessageId() == QUERY_SYNC_MESSAGE) {
434             msg->SetMessageId(INVALID_MESSAGE_ID);
435         }
436     });
437     std::map<std::string, DBStatus> result;
__anonb4f3bed90602(const std::map<std::string, DBStatus> &map) 438     auto callback = [&result](const std::map<std::string, DBStatus> &map) {
439         result = map;
440     };
441     Query query = Query::Select().PrefixKey({'k', '1'});
442     EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
443     ASSERT_TRUE(result.size() == devices.size());
444     EXPECT_TRUE(result[DEVICE_B] == TIME_OUT);
445     /**
446      * @tc.steps: step2. A push to B with query2, sleep 1s for waiting step3
447      * @tc.expected: step2. sync should return OK.
448      */
__anonb4f3bed90702(const std::string &target, DistributedDB::Message *msg) 449     g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
450         ASSERT_NE(msg, nullptr);
451         if (target == DEVICE_B && msg->GetMessageId() == QUERY_SYNC_MESSAGE) {
452             std::this_thread::sleep_for(std::chrono::seconds(1));
453         }
454     });
__anonb4f3bed90802null455     std::thread subThread([&devices] {
456         std::map<std::string, DBStatus> result;
457         auto callback = [&result](const std::map<std::string, DBStatus> &map) {
458             result = map;
459         };
460         Query query = Query::Select().PrefixKey({'k', '2'});
461         LOGD("Begin PUSH");
462         EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
463         ASSERT_TRUE(result.size() == devices.size());
464         EXPECT_TRUE(result[DEVICE_A] == OK);
465     });
466     /**
467      * @tc.steps: step3. B pull to A when A is in push task
468      * @tc.expected: step3. sync should return OP_FINISHED_ALL.
469      */
470     std::this_thread::sleep_for(std::chrono::milliseconds(100));
471     std::map<std::string, int> virtualResult;
472     g_deviceB->Sync(DistributedDB::SYNC_MODE_PULL_ONLY, query,
__anonb4f3bed90a02(const std::map<std::string, int> &map) 473         [&virtualResult](const std::map<std::string, int> &map) {
474             virtualResult = map;
475         }, true);
476     EXPECT_TRUE(status == OK);
477     ASSERT_EQ(virtualResult.size(), devices.size());
478     EXPECT_EQ(virtualResult[DEVICE_A], SyncOperation::OP_FINISHED_ALL);
479     g_communicatorAggregator->RegOnDispatch(nullptr);
480     subThread.join();
481 }
482 
483 /**
484  * @tc.name: DatabaseOnlineCallback001
485  * @tc.desc: check database status notify online callback
486  * @tc.type: FUNC
487  * @tc.require: AR000CQS3S SR000CQE0B
488  * @tc.author: zhuwentao
489  */
490 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DatabaseOnlineCallback001, TestSize.Level1)
491 {
492     /**
493      * @tc.steps: step1. SetStoreStatusNotifier
494      * @tc.expected: step1. SetStoreStatusNotifier ok
495      */
496     std::string targetDev = "DEVICE_X";
497     bool isCheckOk = false;
498     auto databaseStatusNotifyCallback = [targetDev, &isCheckOk] (const std::string &userId,
__anonb4f3bed90b02(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) 499         const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) -> void {
500         if (userId == USER_ID && appId == APP_ID && storeId == STORE_ID && deviceId == targetDev &&
501             onlineStatus == true) {
502             isCheckOk = true;
503         }};
504     g_mgr.SetStoreStatusNotifier(databaseStatusNotifyCallback);
505     /**
506      * @tc.steps: step2. trigger device online
507      * @tc.expected: step2. check callback ok
508      */
509     g_communicatorAggregator->OnlineDevice(targetDev);
510     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME / 20));
511     EXPECT_EQ(isCheckOk, true);
512     StoreStatusNotifier nullCallback;
513     g_mgr.SetStoreStatusNotifier(nullCallback);
514 }
515 
516 /**
517  * @tc.name: DatabaseOfflineCallback001
518  * @tc.desc: check database status notify online callback
519  * @tc.type: FUNC
520  * @tc.require: AR000CQS3S SR000CQE0B
521  * @tc.author: zhuwentao
522  */
523 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DatabaseOfflineCallback001, TestSize.Level1)
524 {
525     /**
526      * @tc.steps: step1. SetStoreStatusNotifier
527      * @tc.expected: step1. SetStoreStatusNotifier ok
528      */
529     std::string targetDev = "DEVICE_X";
530     bool isCheckOk = false;
531     auto databaseStatusNotifyCallback = [targetDev, &isCheckOk] (const std::string &userId,
__anonb4f3bed90c02(const std::string &userId, const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) 532         const std::string &appId, const std::string &storeId, const std::string &deviceId, bool onlineStatus) -> void {
533         if (userId == USER_ID && appId == APP_ID && storeId == STORE_ID && deviceId == targetDev &&
534             onlineStatus == false) {
535             isCheckOk = true;
536         }};
537     g_mgr.SetStoreStatusNotifier(databaseStatusNotifyCallback);
538     /**
539      * @tc.steps: step2. trigger device offline
540      * @tc.expected: step2. check callback ok
541      */
542     g_communicatorAggregator->OfflineDevice(targetDev);
543     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME / 20));
544     EXPECT_EQ(isCheckOk, true);
545     StoreStatusNotifier nullCallback;
546     g_mgr.SetStoreStatusNotifier(nullCallback);
547 }
548 
549 /**
550   * @tc.name: CloseSync001
551   * @tc.desc: Test 2 delegate close when sync
552   * @tc.type: FUNC
553   * @tc.require: AR000CCPOM
554   * @tc.author: zhangqiquan
555   */
556 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, CloseSync001, TestSize.Level3)
557 {
558     DBStatus status = OK;
559     std::vector<std::string> devices;
560     devices.push_back(g_deviceB->GetDeviceId());
561 
562     /**
563      * @tc.steps: step1. make sure A sync start
564      */
565     bool sleep = false;
__anonb4f3bed90d02(const std::string &target, DistributedDB::Message *msg) 566     g_communicatorAggregator->RegOnDispatch([&sleep](const std::string &target, DistributedDB::Message *msg) {
567         if (!sleep) {
568             sleep = true;
569             std::this_thread::sleep_for(std::chrono::seconds(2)); // sleep 2s for waiting close db
570         }
571     });
572 
573     KvStoreNbDelegate* kvDelegatePtrA = nullptr;
574     KvStoreNbDelegate::Option option;
__anonb4f3bed90e02(DBStatus s, KvStoreNbDelegate *delegate) 575     g_mgr.GetKvStore(STORE_ID, option, [&status, &kvDelegatePtrA](DBStatus s, KvStoreNbDelegate *delegate) {
576         status = s;
577         kvDelegatePtrA = delegate;
578     });
579     EXPECT_EQ(status, OK);
580     EXPECT_NE(kvDelegatePtrA, nullptr);
581 
582     Key key = {'k'};
583     Value value = {'v'};
584     kvDelegatePtrA->Put(key, value);
585     std::map<std::string, DBStatus> result;
__anonb4f3bed90f02(const std::map<std::string, DBStatus>& statusMap) 586     auto callback = [&result](const std::map<std::string, DBStatus>& statusMap) {
587         result = statusMap;
588     };
589     /**
590      * @tc.steps: step2. deviceA sync and then close
591      * @tc.expected: step2. sync should abort and data don't exist in B
592      */
__anonb4f3bed91002() 593     std::thread closeThread([&kvDelegatePtrA]() {
594         std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1s for waiting sync start
595         EXPECT_EQ(g_mgr.CloseKvStore(kvDelegatePtrA), OK);
596     });
597     EXPECT_EQ(kvDelegatePtrA->Sync(devices, SYNC_MODE_PUSH_ONLY, callback, false), OK);
598     LOGD("Sync finish");
599     closeThread.join();
600     std::this_thread::sleep_for(std::chrono::seconds(5)); // sleep 5s for waiting sync finish
601     EXPECT_EQ(result.size(), 0u);
602     VirtualDataItem actualValue;
603     EXPECT_EQ(g_deviceB->GetData(key, actualValue), -E_NOT_FOUND);
604     g_communicatorAggregator->RegOnDispatch(nullptr);
605 }
606 
607 /**
608   * @tc.name: CloseSync002
609   * @tc.desc: Test 1 delegate close when in time sync
610   * @tc.type: FUNC
611   * @tc.require: AR000CCPOM
612   * @tc.author: zhangqiquan
613   */
614 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, CloseSync002, TestSize.Level3)
615 {
616     /**
617      * @tc.steps: step1. invalid time sync packet from A
618      */
__anonb4f3bed91102(const std::string &target, DistributedDB::Message *msg) 619     g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
620         ASSERT_NE(msg, nullptr);
621         if (target == DEVICE_B && msg->GetMessageId() == TIME_SYNC_MESSAGE && msg->GetMessageType() == TYPE_REQUEST) {
622             msg->SetMessageId(INVALID_MESSAGE_ID);
623             LOGD("Message is invalid");
624         }
625     });
626     Timestamp currentTime;
627     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
628     g_deviceB->PutData({'k'}, {'v'}, currentTime, 0);
629 
630     /**
631      * @tc.steps: step2. B PUSH to A and A close after 1s
632      * @tc.expected: step2. A closing time cost letter than 4s
633      */
__anonb4f3bed91202() 634     std::thread closingThread([]() {
635         std::this_thread::sleep_for(std::chrono::seconds(1));
636         LOGD("Begin Close");
637         Timestamp beginTime;
638         (void)OS::GetCurrentSysTimeInMicrosecond(beginTime);
639         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
640         Timestamp endTime;
641         (void)OS::GetCurrentSysTimeInMicrosecond(endTime);
642         EXPECT_LE(static_cast<int>(endTime - beginTime), 4 * 1000 * 1000); // waiting 4 * 1000 * 1000 us
643         LOGD("End Close");
644     });
645     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
646     closingThread.join();
647 
648     /**
649      * @tc.steps: step3. remove db
650      * @tc.expected: step3. remove ok
651      */
652     g_kvDelegatePtr = nullptr;
653     DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
654     LOGD("delete kv store status %d", status);
655     ASSERT_TRUE(status == OK);
656     g_communicatorAggregator->RegOnDispatch(nullptr);
657 }
658 
659 /**
660   * @tc.name: OrderbyWriteTimeSync001
661   * @tc.desc: sync query with order by writeTime
662   * @tc.type: FUNC
663   * @tc.require: AR000H5VLO
664   * @tc.author: zhuwentao
665   */
666 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, OrderbyWriteTimeSync001, TestSize.Level0)
667 {
668     /**
669      * @tc.steps: step1. deviceA subscribe query with order by write time
670      * * @tc.expected: step1. interface return not support
671     */
672     std::vector<std::string> devices;
673     devices.push_back(g_deviceB->GetDeviceId());
674     Query query = Query::Select().PrefixKey({'k'}).OrderByWriteTime(true);
675     EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, nullptr, query, true), NOT_SUPPORT);
676 }
677 
678 
679 /**
680  * @tc.name: Device Offline Sync 001
681  * @tc.desc: Test push sync when device offline
682  * @tc.type: FUNC
683  * @tc.require: AR000CCPOM
684  * @tc.author: xushaohua
685  */
686 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSync001, TestSize.Level1)
687 {
688     std::vector<std::string> devices;
689     devices.push_back(g_deviceB->GetDeviceId());
690     devices.push_back(g_deviceC->GetDeviceId());
691 
692     /**
693      * @tc.steps: step1. deviceA put {k1, v1}, {k2, v2}, {k3 delete}, {k4,v2}
694      */
695     Key key1 = {'1'};
696     Value value1 = {'1'};
697     ASSERT_TRUE(g_kvDelegatePtr->Put(key1, value1) == OK);
698 
699     Key key2 = {'2'};
700     Value value2 = {'2'};
701     ASSERT_TRUE(g_kvDelegatePtr->Put(key2, value2) == OK);
702 
703     Key key3 = {'3'};
704     Value value3 = {'3'};
705     ASSERT_TRUE(g_kvDelegatePtr->Put(key3, value3) == OK);
706     ASSERT_TRUE(g_kvDelegatePtr->Delete(key3) == OK);
707 
708     Key key4 = {'4'};
709     Value value4 = {'4'};
710     ASSERT_TRUE(g_kvDelegatePtr->Put(key4, value4) == OK);
711 
712     /**
713      * @tc.steps: step2. deviceB offline
714      */
715     g_deviceB->Offline();
716 
717     /**
718      * @tc.steps: step3. deviceA call pull sync
719      * @tc.expected: step3. sync should return OK.
720      */
721     std::map<std::string, DBStatus> result;
722     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
723     ASSERT_TRUE(status == OK);
724 
725     /**
726      * @tc.expected: step3. onComplete should be called, DeviceB status is timeout
727      *     deviceC has {k1, v1}, {k2, v2}, {k3 delete}, {k4,v4}
728      */
729     for (const auto &pair : result) {
730         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
731         if (pair.first == DEVICE_B) {
732             // If syncTaskContext of deviceB is scheduled to be executed first, ClearAllSyncTask is
733             // invoked when OfflineHandleByDevice is triggered, and SyncOperation::Finished() is triggered in advance.
734             // The returned status is COMM_FAILURE
735             EXPECT_TRUE((pair.second == static_cast<DBStatus>(-E_PERIPHERAL_INTERFACE_FAIL)) ||
736                 (pair.second == COMM_FAILURE));
737         } else {
738             EXPECT_EQ(pair.second, OK);
739         }
740     }
741     VirtualDataItem item;
742     g_deviceC->GetData(key1, item);
743     EXPECT_TRUE(item.value == value1);
744     item.value.clear();
745     g_deviceC->GetData(key2, item);
746     EXPECT_TRUE(item.value == value2);
747     item.value.clear();
748     Key hashKey;
749     DistributedDBToolsUnitTest::CalcHash(key3, hashKey);
750     EXPECT_TRUE(g_deviceC->GetData(hashKey, item) == -E_NOT_FOUND);
751     item.value.clear();
752     g_deviceC->GetData(key4, item);
753     EXPECT_TRUE(item.value == value4);
754 }
755 
756 /**
757  * @tc.name: Device Offline Sync 002
758  * @tc.desc: Test pull sync when device offline
759  * @tc.type: FUNC
760  * @tc.require: AR000CCPOM
761  * @tc.author: xushaohua
762  */
763 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSync002, TestSize.Level1)
764 {
765     std::vector<std::string> devices;
766     devices.push_back(g_deviceB->GetDeviceId());
767     devices.push_back(g_deviceC->GetDeviceId());
768 
769     /**
770      * @tc.steps: step1. deviceB put {k1, v1}
771      */
772     Key key1 = {'1'};
773     Value value1 = {'1'};
774     g_deviceB->PutData(key1, value1, 0, 0);
775 
776     /**
777      * @tc.steps: step2. deviceB offline
778      */
779     g_deviceB->Offline();
780 
781     /**
782      * @tc.steps: step3. deviceC put {k2, v2}, {k3, delete}, {k4, v4}
783      */
784     Key key2 = {'2'};
785     Value value2 = {'2'};
786     g_deviceC->PutData(key2, value2, 0, 0);
787 
788     Key key3 = {'3'};
789     Value value3 = {'3'};
790     g_deviceC->PutData(key3, value3, 0, 1);
791 
792     Key key4 = {'4'};
793     Value value4 = {'4'};
794     g_deviceC->PutData(key4, value4, 0, 0);
795 
796     /**
797      * @tc.steps: step2. deviceA call pull sync
798      * @tc.expected: step2. sync should return OK.
799      */
800     std::map<std::string, DBStatus> result;
801     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
802     ASSERT_TRUE(status == OK);
803 
804     /**
805      * @tc.expected: step3. onComplete should be called, DeviceB status is timeout
806      *     deviceA has {k2, v2}, {k3 delete}, {k4,v4}
807      */
808     for (const auto &pair : result) {
809         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
810         if (pair.first == DEVICE_B) {
811             // If syncTaskContext of deviceB is scheduled to be executed first, ClearAllSyncTask is
812             // invoked when OfflineHandleByDevice is triggered, and SyncOperation::Finished() is triggered in advance.
813             // The returned status is COMM_FAILURE
814             EXPECT_TRUE((pair.second == static_cast<DBStatus>(-E_PERIPHERAL_INTERFACE_FAIL)) ||
815                 (pair.second == COMM_FAILURE));
816         } else {
817             EXPECT_EQ(pair.second, OK);
818         }
819     }
820 
821     Value value5;
822     EXPECT_TRUE(g_kvDelegatePtr->Get(key1, value5) != OK);
823     g_kvDelegatePtr->Get(key2, value5);
824     EXPECT_EQ(value5, value2);
825     EXPECT_TRUE(g_kvDelegatePtr->Get(key3, value5) != OK);
826     g_kvDelegatePtr->Get(key4, value5);
827     EXPECT_EQ(value5, value4);
828 }
829 
830 /**
831   * @tc.name: EncryptedAlgoUpgrade001
832   * @tc.desc: Test upgrade encrypted db can sync normally
833   * @tc.type: FUNC
834   * @tc.require: AR000HI2JS
835   * @tc.author: zhuwentao
836   */
837 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, EncryptedAlgoUpgrade001, TestSize.Level3)
838 {
839     /**
840      * @tc.steps: step1. clear db
841      * * @tc.expected: step1. interface return ok
842     */
843     if (g_kvDelegatePtr != nullptr) {
844         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
845         g_kvDelegatePtr = nullptr;
846         DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
847         LOGD("delete kv store status %d", status);
848         ASSERT_TRUE(status == OK);
849     }
850 
851     CipherPassword passwd;
852     std::vector<uint8_t> passwdVect = {'p', 's', 'd', '1'};
853     passwd.SetValue(passwdVect.data(), passwdVect.size());
854     /**
855      * @tc.steps: step2. open old db by sql
856      * * @tc.expected: step2. interface return ok
857     */
858     std::string identifier = DBCommon::GenerateIdentifierId(STORE_ID, APP_ID, USER_ID);
859     std::string hashDir = DBCommon::TransferHashString(identifier);
860     std::string hexHashDir = DBCommon::TransferStringToHex(hashDir);
861     std::string dbPath = g_testDir + "/" + hexHashDir + "/single_ver";
862     ASSERT_TRUE(DBCommon::CreateDirectory(g_testDir + "/" + hexHashDir) == OK);
863     ASSERT_TRUE(DBCommon::CreateDirectory(dbPath) == OK);
864     std::vector<std::string> dbDir {DBConstant::MAINDB_DIR, DBConstant::METADB_DIR, DBConstant::CACHEDB_DIR};
865     for (const auto &item : dbDir) {
866         ASSERT_TRUE(DBCommon::CreateDirectory(dbPath + "/" + item) == OK);
867     }
868     uint64_t flag = SQLITE_OPEN_URI | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE;
869     sqlite3 *db;
870     std::string fileUrl = dbPath + "/" + DBConstant::MAINDB_DIR + "/" + DBConstant::SINGLE_VER_DATA_STORE + ".db";
871     ASSERT_TRUE(sqlite3_open_v2(fileUrl.c_str(), &db, flag, nullptr) == SQLITE_OK);
872     SQLiteUtils::SetKeyInner(db, CipherType::AES_256_GCM, passwd, DBConstant::DEFAULT_ITER_TIMES);
873     /**
874      * @tc.steps: step3. create table and close
875      * * @tc.expected: step3. interface return ok
876     */
877     ASSERT_TRUE(SQLiteUtils::ExecuteRawSQL(db, CREATE_SYNC_TABLE_SQL) == E_OK);
878     sqlite3_close_v2(db);
879     db = nullptr;
880     LOGI("create old db success");
881     /**
882      * @tc.steps: step4. get kvstore
883      * * @tc.expected: step4. interface return ok
884     */
885     KvStoreNbDelegate::Option option;
886     option.isEncryptedDb = true;
887     option.cipher = CipherType::AES_256_GCM;
888     option.passwd = passwd;
889     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
890     ASSERT_TRUE(g_kvDelegateStatus == OK);
891     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
892     /**
893      * @tc.steps: step5. sync ok
894      * * @tc.expected: step5. interface return ok
895     */
896     PullSyncTest();
897     /**
898      * @tc.steps: step5. crud ok
899      * * @tc.expected: step5. interface return ok
900     */
901     CrudTest();
902 }
903 
904 /**
905   * @tc.name: RemoveDeviceData002
906   * @tc.desc: test remove device data before sync
907   * @tc.type: FUNC
908   * @tc.require:
909   * @tc.author: zhuwentao
910   */
911 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RemoveDeviceData002, TestSize.Level1)
912 {
913     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
914     /**
915      * @tc.steps: step1. sync deviceB data to A and check data
916      * * @tc.expected: step1. interface return ok
917     */
918     Key key1 = {'1'};
919     Key key2 = {'2'};
920     Value value = {'1'};
921     Timestamp currentTime;
922     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
923     EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
924     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
925     EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK);
926     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
927     Value actualValue;
928     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
929     EXPECT_EQ(actualValue, value);
930     actualValue.clear();
931     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
932     EXPECT_EQ(actualValue, value);
933     /**
934      * @tc.steps: step2. call RemoveDeviceData
935      * * @tc.expected: step2. interface return ok
936     */
937     g_kvDelegatePtr->RemoveDeviceData(g_deviceB->GetDeviceId());
938     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
939     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), NOT_FOUND);
940     /**
941      * @tc.steps: step3. sync to device A again and check data
942      * * @tc.expected: step3. sync ok
943     */
944     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
945     actualValue.clear();
946     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
947     EXPECT_EQ(actualValue, value);
948     actualValue.clear();
949     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
950     EXPECT_EQ(actualValue, value);
951 }
952 
953 /**
954   * @tc.name: DataSync001
955   * @tc.desc: Test Data Sync when Initialize
956   * @tc.type: FUNC
957   * @tc.require: AR000HI2JS
958   * @tc.author: zhuwentao
959   */
960 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync001, TestSize.Level1)
961 {
962     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
963     ASSERT_TRUE(dataSync != nullptr);
964     std::shared_ptr<Metadata> inMetadata = nullptr;
965     std::string deviceId;
966     Message message;
967     VirtualSingleVerSyncDBInterface tmpInterface;
968     VirtualCommunicator tmpCommunicator(deviceId, g_communicatorAggregator);
969     EXPECT_EQ(dataSync->Initialize(nullptr, nullptr, inMetadata, deviceId), -E_INVALID_ARGS);
970     EXPECT_EQ(dataSync->Initialize(&tmpInterface, nullptr, inMetadata, deviceId), -E_INVALID_ARGS);
971     EXPECT_EQ(dataSync->Initialize(&tmpInterface, &tmpCommunicator, inMetadata, deviceId), -E_INVALID_ARGS);
972     delete dataSync;
973 }
974 
975 /**
976   * @tc.name: DataSync002
977   * @tc.desc: Test active sync with invalid param in DataSync Class
978   * @tc.type: FUNC
979   * @tc.require: AR000HI2JS
980   * @tc.author: zhuwentao
981   */
982 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync002, TestSize.Level1)
983 {
984     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
985     ASSERT_TRUE(dataSync != nullptr);
986     Message message;
987     EXPECT_EQ(dataSync->TryContinueSync(nullptr, &message), -E_INVALID_ARGS);
988     EXPECT_EQ(dataSync->TryContinueSync(nullptr, nullptr), -E_INVALID_ARGS);
989     EXPECT_EQ(dataSync->PushStart(nullptr), -E_INVALID_ARGS);
990     EXPECT_EQ(dataSync->PushPullStart(nullptr), -E_INVALID_ARGS);
991     EXPECT_EQ(dataSync->PullRequestStart(nullptr), -E_INVALID_ARGS);
992     EXPECT_EQ(dataSync->PullResponseStart(nullptr), -E_INVALID_ARGS);
993     delete dataSync;
994 }
995 
996 /**
997   * @tc.name: DataSync003
998   * @tc.desc: Test receive invalid request data packet in DataSync Class
999   * @tc.type: FUNC
1000   * @tc.require: AR000HI2JS
1001   * @tc.author: zhuwentao
1002   */
1003 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync003, TestSize.Level1)
1004 {
1005     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1006     ASSERT_TRUE(dataSync != nullptr);
1007     uint64_t tmpMark = 0;
1008     Message message;
1009     EXPECT_EQ(dataSync->DataRequestRecv(nullptr, nullptr, tmpMark), -E_INVALID_ARGS);
1010     EXPECT_EQ(dataSync->DataRequestRecv(nullptr, &message, tmpMark), -E_INVALID_ARGS);
1011     delete dataSync;
1012 }
1013 
1014 /**
1015   * @tc.name: DataSync004
1016   * @tc.desc: Test receive invalid ack packet in DataSync Class
1017   * @tc.type: FUNC
1018   * @tc.require: AR000HI2JS
1019   * @tc.author: zhuwentao
1020   */
1021 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync004, TestSize.Level1)
1022 {
1023     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1024     ASSERT_TRUE(dataSync != nullptr);
1025     Message message;
1026     TestSingleVerKvSyncTaskContext tmpContext;
1027     EXPECT_EQ(dataSync->AckPacketIdCheck(nullptr), false);
1028     EXPECT_EQ(dataSync->AckPacketIdCheck(&message), false);
1029     EXPECT_EQ(dataSync->AckRecv(&tmpContext, nullptr), -E_INVALID_ARGS);
1030     EXPECT_EQ(dataSync->AckRecv(nullptr, nullptr), -E_INVALID_ARGS);
1031     EXPECT_EQ(dataSync->AckRecv(nullptr, &message), -E_INVALID_ARGS);
1032     delete dataSync;
1033 }
1034 
1035 /**
1036   * @tc.name: DataSync005
1037   * @tc.desc: Test receive invalid notify packet in DataSync Class
1038   * @tc.type: FUNC
1039   * @tc.require: AR000HI2JS
1040   * @tc.author: zhuwentao
1041   */
1042 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync005, TestSize.Level1)
1043 {
1044     ASSERT_NO_FATAL_FAILURE(DataSync005());
1045 }
1046 
1047 /**
1048   * @tc.name: DataSync006
1049   * @tc.desc: Test control start with invalid param in DataSync Class
1050   * @tc.type: FUNC
1051   * @tc.require: AR000HI2JS
1052   * @tc.author: zhuwentao
1053   */
1054 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync006, TestSize.Level1)
1055 {
1056     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1057     ASSERT_TRUE(dataSync != nullptr);
1058     TestSingleVerKvSyncTaskContext tmpContext;
1059     EXPECT_EQ(dataSync->ControlCmdStart(nullptr), -E_INVALID_ARGS);
1060     EXPECT_EQ(dataSync->ControlCmdStart(&tmpContext), -E_INVALID_ARGS);
1061     std::shared_ptr<SubscribeManager> subManager = std::make_shared<SubscribeManager>();
1062     tmpContext.SetSubscribeManager(subManager);
1063     tmpContext.SetMode(SyncModeType::INVALID_MODE);
1064     EXPECT_EQ(dataSync->ControlCmdStart(&tmpContext), -E_INVALID_ARGS);
1065     std::set<Key> Keys = {{'a'}, {'b'}};
1066     Query query = Query::Select().InKeys(Keys);
1067     QuerySyncObject innerQuery(query);
1068     tmpContext.SetQuery(innerQuery);
1069     tmpContext.SetMode(SyncModeType::SUBSCRIBE_QUERY);
1070     EXPECT_EQ(dataSync->ControlCmdStart(&tmpContext), -E_NOT_SUPPORT);
1071     delete dataSync;
1072     subManager = nullptr;
1073 }
1074 
1075 /**
1076   * @tc.name: DataSync007
1077   * @tc.desc: Test receive invalid control packet in DataSync Class
1078   * @tc.type: FUNC
1079   * @tc.require: AR000HI2JS
1080   * @tc.author: zhuwentao
1081   */
1082 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync007, TestSize.Level1)
1083 {
1084     SingleVerDataSync *dataSync = new (std::nothrow) SingleVerDataSync();
1085     ASSERT_TRUE(dataSync != nullptr);
1086     Message message;
1087     ControlRequestPacket packet;
1088     TestSingleVerKvSyncTaskContext tmpContext;
1089     EXPECT_EQ(dataSync->ControlCmdRequestRecv(nullptr, &message), -E_INVALID_ARGS);
1090     message.SetCopiedObject(packet);
1091     EXPECT_EQ(dataSync->ControlCmdRequestRecv(nullptr, &message), -E_INVALID_ARGS);
1092     delete dataSync;
1093 }
1094 
1095 /**
1096   * @tc.name: DataSync008
1097   * @tc.desc: Test pull null msg in dataQueue in DataSync Class
1098   * @tc.type: FUNC
1099   * @tc.require: AR000HI2JS
1100   * @tc.author: zhuwentao
1101   */
1102 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DataSync008, TestSize.Level1)
1103 {
1104     ASSERT_NO_FATAL_FAILURE(DataSync008());
1105 }
1106 
1107 /**
1108  * @tc.name: SyncRetry001
1109  * @tc.desc: use sync retry sync use push
1110  * @tc.type: FUNC
1111  * @tc.require: AR000CKRTD AR000CQE0E
1112  * @tc.author: zhuwentao
1113  */
1114 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry001, TestSize.Level3)
1115 {
1116     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1117     std::vector<std::string> devices;
1118     devices.push_back(g_deviceB->GetDeviceId());
1119 
1120     /**
1121      * @tc.steps: step1. set sync retry
1122      * @tc.expected: step1, Pragma return OK.
1123      */
1124     int pragmaData = 1;
1125     PragmaData input = static_cast<PragmaData>(&pragmaData);
1126     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1127 
1128     /**
1129      * @tc.steps: step2. deviceA put {k1, v1}, {k2, v2}
1130      */
1131     ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_1, VALUE_1) == OK);
1132 
1133     /**
1134      * @tc.steps: step3. deviceA call sync and wait
1135      * @tc.expected: step3. sync should return OK.
1136      */
1137     std::map<std::string, DBStatus> result;
1138     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1139 
1140     /**
1141      * @tc.expected: step4. onComplete should be called, and status is time_out
1142      */
1143     ASSERT_TRUE(result.size() == devices.size());
1144     for (const auto &pair : result) {
1145         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1146         EXPECT_TRUE(pair.second == OK);
1147     }
1148     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1149 }
1150 
1151 /**
1152  * @tc.name: SyncRetry002
1153  * @tc.desc: use sync retry sync use pull
1154  * @tc.type: FUNC
1155  * @tc.require: AR000CKRTD AR000CQE0E
1156  * @tc.author: zhuwentao
1157  */
1158 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry002, TestSize.Level3)
1159 {
1160     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE, 4u);
1161     std::vector<std::string> devices;
1162     devices.push_back(g_deviceB->GetDeviceId());
1163 
1164     /**
1165      * @tc.steps: step1. set sync retry
1166      * @tc.expected: step1, Pragma return OK.
1167      */
1168     int pragmaData = 1;
1169     PragmaData input = static_cast<PragmaData>(&pragmaData);
1170     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1171 
1172     /**
1173      * @tc.steps: step2. deviceA call sync and wait
1174      * @tc.expected: step2. sync should return OK.
1175      */
1176     std::map<std::string, DBStatus> result;
1177     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
1178 
1179     /**
1180      * @tc.expected: step3. onComplete should be called, and status is time_out
1181      */
1182     ASSERT_TRUE(result.size() == devices.size());
1183     for (const auto &pair : result) {
1184         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1185         EXPECT_TRUE(pair.second == TIME_OUT);
1186     }
1187     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1188 }
1189 
1190 /**
1191  * @tc.name: SyncRetry003
1192  * @tc.desc: use sync retry sync use push by compress
1193  * @tc.type: FUNC
1194  * @tc.require: AR000CKRTD AR000CQE0E
1195  * @tc.author: zhuwentao
1196  */
1197 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry003, TestSize.Level3)
1198 {
1199     if (g_kvDelegatePtr != nullptr) {
1200         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
1201         g_kvDelegatePtr = nullptr;
1202     }
1203     /**
1204      * @tc.steps: step1. open db use Compress
1205      * @tc.expected: step1, Pragma return OK.
1206      */
1207     KvStoreNbDelegate::Option option;
1208     option.isNeedCompressOnSync = true;
1209     option.compressionRate = 70;
1210     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1211     ASSERT_TRUE(g_kvDelegateStatus == OK);
1212     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1213 
1214     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1215     std::vector<std::string> devices;
1216     devices.push_back(g_deviceB->GetDeviceId());
1217 
1218     /**
1219      * @tc.steps: step2. set sync retry
1220      * @tc.expected: step2, Pragma return OK.
1221      */
1222     int pragmaData = 1;
1223     PragmaData input = static_cast<PragmaData>(&pragmaData);
1224     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1225 
1226     /**
1227      * @tc.steps: step3. deviceA put {k1, v1}, {k2, v2}
1228      */
1229     ASSERT_TRUE(g_kvDelegatePtr->Put(KEY_1, VALUE_1) == OK);
1230 
1231     /**
1232      * @tc.steps: step4. deviceA call sync and wait
1233      * @tc.expected: step4. sync should return OK.
1234      */
1235     std::map<std::string, DBStatus> result;
1236     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1237 
1238     /**
1239      * @tc.expected: step5. onComplete should be called, and status is time_out
1240      */
1241     ASSERT_TRUE(result.size() == devices.size());
1242     for (const auto &pair : result) {
1243         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1244         EXPECT_TRUE(pair.second == OK);
1245     }
1246     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1247 }
1248 
1249 /**
1250  * @tc.name: SyncRetry004
1251  * @tc.desc: use query sync retry sync use push
1252  * @tc.type: FUNC
1253  * @tc.require: AR000CKRTD AR000CQE0E
1254  * @tc.author: zhuwentao
1255  */
1256 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry004, TestSize.Level3)
1257 {
1258     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1259     std::vector<std::string> devices;
1260     devices.push_back(g_deviceB->GetDeviceId());
1261 
1262     /**
1263      * @tc.steps: step1. set sync retry
1264      * @tc.expected: step1, Pragma return OK.
1265      */
1266     int pragmaData = 1;
1267     PragmaData input = static_cast<PragmaData>(&pragmaData);
1268     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1269 
1270     /**
1271      * @tc.steps: step2. deviceA put {k1, v1}, {k2, v2}
1272      */
1273     for (int i = 0; i < 5; i++) {
1274         Key key = DistributedDBToolsUnitTest::GetRandPrefixKey({'a', 'b'}, 128); // rand num 1024 for test
1275         Value value;
1276         DistributedDBToolsUnitTest::GetRandomKeyValue(value, 256u);
1277         EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
1278     }
1279 
1280     /**
1281      * @tc.steps: step3. deviceA call sync and wait
1282      * @tc.expected: step3. sync should return OK.
1283      */
1284     std::map<std::string, DBStatus> result;
1285     std::vector<uint8_t> prefixKey({'a', 'b'});
1286     Query query = Query::Select().PrefixKey(prefixKey);
1287     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, query) == OK);
1288 
1289     /**
1290      * @tc.expected: step4. onComplete should be called, and status is time_out
1291      */
1292     ASSERT_TRUE(result.size() == devices.size());
1293     for (const auto &pair : result) {
1294         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1295         EXPECT_TRUE(pair.second == OK);
1296     }
1297     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1298 }
1299 
1300 /**
1301  * @tc.name: SyncRetry005
1302  * @tc.desc: use sync retry sync use pull by compress
1303  * @tc.type: FUNC
1304  * @tc.require: AR000CKRTD AR000CQE0E
1305  * @tc.author: zhangqiquan
1306  */
1307 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, SyncRetry005, TestSize.Level3)
1308 {
1309     if (g_kvDelegatePtr != nullptr) {
1310         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
1311         g_kvDelegatePtr = nullptr;
1312     }
1313     /**
1314      * @tc.steps: step1. open db use Compress
1315      * @tc.expected: step1, Pragma return OK.
1316      */
1317     KvStoreNbDelegate::Option option;
1318     option.isNeedCompressOnSync = true;
1319     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1320     ASSERT_TRUE(g_kvDelegateStatus == OK);
1321     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1322 
1323     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1324     std::vector<std::string> devices;
1325     devices.push_back(g_deviceB->GetDeviceId());
1326 
1327     /**
1328      * @tc.steps: step2. set sync retry
1329      * @tc.expected: step2, Pragma return OK.
1330      */
1331     int pragmaData = 1;
1332     PragmaData input = static_cast<PragmaData>(&pragmaData);
1333     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_SYNC_RETRY, input) == OK);
1334 
1335     /**
1336      * @tc.steps: step3. deviceA call sync and wait
1337      * @tc.expected: step3. sync should return OK.
1338      */
1339     std::map<std::string, DBStatus> result;
1340     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
1341 
1342     /**
1343      * @tc.expected: step4. onComplete should be called, and status is time_out
1344      */
1345     ASSERT_TRUE(result.size() == devices.size());
1346     for (const auto &pair : result) {
1347         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1348         EXPECT_EQ(pair.second, OK);
1349     }
1350     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, UNKNOW_MESSAGE);
1351 }
1352 
1353 /**
1354  * @tc.name: ReSetWatchDogTest001
1355  * @tc.desc: trigger resetWatchDog while pull
1356  * @tc.type: FUNC
1357  * @tc.require: AR000CKRTD AR000CQE0E
1358  * @tc.author: zhuwentao
1359  */
1360 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, ReSetWaterDogTest001, TestSize.Level3)
1361 {
1362     ASSERT_NO_FATAL_FAILURE(ReSetWaterDogTest001());
1363 }
1364 
1365 /**
1366   * @tc.name: RebuildSync001
1367   * @tc.desc: rebuild db and sync again
1368   * @tc.type: FUNC
1369   * @tc.require:
1370   * @tc.author: zhuwentao
1371   */
1372 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync001, TestSize.Level3)
1373 {
1374     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1375     /**
1376      * @tc.steps: step1. sync deviceB data to A and check data
1377      * * @tc.expected: step1. interface return ok
1378     */
1379     Key key1 = {'1'};
1380     Key key2 = {'2'};
1381     Value value = {'1'};
1382     Timestamp currentTime;
1383     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1384     EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
1385     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1386     EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK);
1387     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1388 
1389     Value actualValue;
1390     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1391     EXPECT_EQ(actualValue, value);
1392     actualValue.clear();
1393     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1394     EXPECT_EQ(actualValue, value);
1395     /**
1396      * @tc.steps: step2. delete db and rebuild
1397      * * @tc.expected: step2. interface return ok
1398     */
1399     g_mgr.CloseKvStore(g_kvDelegatePtr);
1400     g_kvDelegatePtr = nullptr;
1401     ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1402     KvStoreNbDelegate::Option option;
1403     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1404     ASSERT_TRUE(g_kvDelegateStatus == OK);
1405     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1406     /**
1407      * @tc.steps: step3. sync to device A again
1408      * * @tc.expected: step3. sync ok
1409     */
1410     value = {'2'};
1411     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1412     EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
1413     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1414     /**
1415      * @tc.steps: step4. check data in device A
1416      * * @tc.expected: step4. check ok
1417     */
1418     actualValue.clear();
1419     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1420     EXPECT_EQ(actualValue, value);
1421 }
1422 
1423 /**
1424   * @tc.name: RebuildSync002
1425   * @tc.desc: test clear remote data when receive data
1426   * @tc.type: FUNC
1427   * @tc.require:
1428   * @tc.author: zhuwentao
1429   */
1430 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync002, TestSize.Level1)
1431 {
1432     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1433     std::vector<std::string> devices;
1434     devices.push_back(g_deviceB->GetDeviceId());
1435     /**
1436      * @tc.steps: step1. device A SET_WIPE_POLICY
1437      * * @tc.expected: step1. interface return ok
1438     */
1439     int pragmaData = 2; // 2 means enable
1440     PragmaData input = static_cast<PragmaData>(&pragmaData);
1441     EXPECT_TRUE(g_kvDelegatePtr->Pragma(SET_WIPE_POLICY, input) == OK);
1442     /**
1443      * @tc.steps: step2. sync deviceB data to A and check data
1444      * * @tc.expected: step2. interface return ok
1445     */
1446     Key key1 = {'1'};
1447     Key key2 = {'2'};
1448     Key key3 = {'3'};
1449     Key key4 = {'4'};
1450     Value value = {'1'};
1451     Timestamp currentTime;
1452     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1453     EXPECT_EQ(g_deviceB->PutData(key1, value, currentTime, 0), E_OK);
1454     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1455     EXPECT_EQ(g_deviceB->PutData(key2, value, currentTime, 0), E_OK);
1456     EXPECT_EQ(g_kvDelegatePtr->Put(key3, value), OK);
1457     /**
1458      * @tc.steps: step3. deviceA call pull sync
1459      * @tc.expected: step3. sync should return OK.
1460      */
1461     std::map<std::string, DBStatus> result;
1462     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result) == OK);
1463 
1464     /**
1465      * @tc.expected: step4. onComplete should be called, check data
1466      */
1467     ASSERT_TRUE(result.size() == devices.size());
1468     for (const auto &pair : result) {
1469         EXPECT_TRUE(pair.second == OK);
1470     }
1471     Value actualValue;
1472     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1473     EXPECT_EQ(actualValue, value);
1474     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1475     EXPECT_EQ(actualValue, value);
1476     /**
1477      * @tc.steps: step5. device B rebuild and put some data
1478      * * @tc.expected: step5. rebuild ok
1479     */
1480     if (g_deviceB != nullptr) {
1481         delete g_deviceB;
1482         g_deviceB = nullptr;
1483     }
1484     g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
1485     ASSERT_TRUE(g_deviceB != nullptr);
1486     VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
1487     ASSERT_TRUE(syncInterfaceB != nullptr);
1488     ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
1489     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1490     EXPECT_EQ(g_deviceB->PutData(key3, value, currentTime, 0), E_OK);
1491     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
1492     EXPECT_EQ(g_deviceB->PutData(key4, value, currentTime, 0), E_OK);
1493     /**
1494      * @tc.steps: step6. sync to device A again and check data
1495      * * @tc.expected: step6. sync ok
1496     */
1497     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1498     EXPECT_EQ(g_kvDelegatePtr->Get(key3, actualValue), OK);
1499     EXPECT_EQ(actualValue, value);
1500     EXPECT_EQ(g_kvDelegatePtr->Get(key4, actualValue), OK);
1501     EXPECT_EQ(actualValue, value);
1502     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
1503     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), NOT_FOUND);
1504 }
1505 
1506 /**
1507   * @tc.name: RebuildSync003
1508   * @tc.desc: test clear history data when receive ack
1509   * @tc.type: FUNC
1510   * @tc.require:
1511   * @tc.author: zhuwentao
1512   */
1513 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync003, TestSize.Level1)
1514 {
1515     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1516     /**
1517      * @tc.steps: step1. sync deviceB data to A and check data
1518      * * @tc.expected: step1. interface return ok
1519     */
1520     Key key1 = {'1'};
1521     Key key2 = {'2'};
1522     Key key3 = {'3'};
1523     Key key4 = {'4'};
1524     Value value = {'1'};
1525     EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp
1526     EXPECT_EQ(g_deviceB->PutData(key2, value, 2u, 0), E_OK); // 2: timestamp
1527     EXPECT_EQ(g_kvDelegatePtr->Put(key3, value), OK);
1528     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1529     Value actualValue;
1530     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1531     EXPECT_EQ(actualValue, value);
1532     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1533     EXPECT_EQ(actualValue, value);
1534     VirtualDataItem item;
1535     EXPECT_EQ(g_deviceB->GetData(key3, item), E_OK);
1536     EXPECT_EQ(item.value, value);
1537     /**
1538      * @tc.steps: step2. device B sync to device A,but make it failed
1539      * * @tc.expected: step2. interface return ok
1540     */
1541     EXPECT_EQ(g_deviceB->PutData(key4, value, 3u, 0), E_OK); // 3: timestamp
1542     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_A, DATA_SYNC_MESSAGE);
1543     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1544     /**
1545      * @tc.steps: step3. device B set delay send time
1546      * * @tc.expected: step3. interface return ok
1547     */
1548     std::set<std::string> delayDevice = {DEVICE_B};
1549     g_communicatorAggregator->SetSendDelayInfo(3000u, DATA_SYNC_MESSAGE, 1u, 0u, delayDevice); // delay 3000ms one time
1550     /**
1551      * @tc.steps: step4. device A rebuilt, device B push data to A and set clear remote data mark into context after 1s
1552      * * @tc.expected: step4. interface return ok
1553     */
1554     g_deviceB->SetClearRemoteStaleData(true);
1555     g_mgr.CloseKvStore(g_kvDelegatePtr);
1556     g_kvDelegatePtr = nullptr;
1557     ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1558     KvStoreNbDelegate::Option option;
1559     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1560     ASSERT_TRUE(g_kvDelegateStatus == OK);
1561     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1562     std::map<std::string, DBStatus> result;
1563     std::vector<std::string> devices = {g_deviceB->GetDeviceId()};
1564     g_communicatorAggregator->SetDropMessageTypeByDevice(DEVICE_B, DATA_SYNC_MESSAGE);
1565     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
1566     /**
1567      * @tc.steps: step5. device B sync to A, make it clear history data and check data
1568      * * @tc.expected: step5. interface return ok
1569     */
1570     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1571     EXPECT_EQ(g_deviceB->GetData(key3, item), -E_NOT_FOUND);
1572     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1573     EXPECT_EQ(actualValue, value);
1574     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1575     EXPECT_EQ(actualValue, value);
1576     g_communicatorAggregator->ResetSendDelayInfo();
1577 }
1578 
1579 /**
1580   * @tc.name: RebuildSync004
1581   * @tc.desc: test WIPE_STALE_DATA mode when peers rebuilt db
1582   * @tc.type: FUNC
1583   * @tc.require:
1584   * @tc.author: zhangtao
1585   */
1586 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RebuildSync004, TestSize.Level1)
1587 {
1588     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1589     /**
1590      * @tc.steps: step1. sync deviceB data to A and check data
1591      * * @tc.expected: step1. interface return ok
1592     */
1593     Key key1 = {'1'};
1594     Key key2 = {'2'};
1595     Key key3 = {'3'};
1596     Key key4 = {'4'};
1597     Value value = {'1'};
1598     EXPECT_EQ(g_kvDelegatePtr->Put(key1, value), OK);
1599     EXPECT_EQ(g_kvDelegatePtr->Put(key2, value), OK);
1600     EXPECT_EQ(g_kvDelegatePtr->Put(key3, value), OK);
1601     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1602     Value actualValue;
1603     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), OK);
1604     EXPECT_EQ(actualValue, value);
1605     EXPECT_EQ(g_kvDelegatePtr->Get(key2, actualValue), OK);
1606     EXPECT_EQ(actualValue, value);
1607     EXPECT_EQ(g_kvDelegatePtr->Get(key3, actualValue), OK);
1608     EXPECT_EQ(actualValue, value);
1609     VirtualDataItem item;
1610     EXPECT_EQ(g_deviceB->GetData(key1, item), E_OK);
1611     EXPECT_EQ(item.value, value);
1612     EXPECT_EQ(g_deviceB->GetData(key2, item), E_OK);
1613     EXPECT_EQ(item.value, value);
1614     EXPECT_EQ(g_deviceB->GetData(key3, item), E_OK);
1615     EXPECT_EQ(item.value, value);
1616 
1617     /**
1618      * @tc.steps: step2. device A rebuilt, device B push data to A and set clear remote data mark into context after 1s
1619      * * @tc.expected: step2. interface return ok
1620     */
1621     g_deviceB->SetClearRemoteStaleData(true);
1622     EXPECT_EQ(g_deviceB->PutData(key4, value, 3u, 2), E_OK); // 3: timestamp
1623 
1624     VirtualDataItem item2;
1625     EXPECT_EQ(g_deviceB->GetData(key4, item2), E_OK);
1626     EXPECT_EQ(item2.value, value);
1627     g_mgr.CloseKvStore(g_kvDelegatePtr);
1628     g_kvDelegatePtr = nullptr;
1629     ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1630     KvStoreNbDelegate::Option option;
1631     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
1632     ASSERT_TRUE(g_kvDelegateStatus == OK);
1633     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1634 
1635     /**
1636      * @tc.steps: step3. device B sync to A, make it clear history data and check data
1637      * * @tc.expected: step3. interface return ok
1638     */
1639     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1640     EXPECT_EQ(g_deviceB->GetData(key2, item), -E_NOT_FOUND);
1641     EXPECT_EQ(g_deviceB->GetData(key3, item), -E_NOT_FOUND);
1642     EXPECT_EQ(g_deviceB->GetData(key4, item2), E_OK);
1643     EXPECT_EQ(item2.value, value);
1644     EXPECT_EQ(g_kvDelegatePtr->Get(key4, actualValue), OK);
1645     EXPECT_EQ(actualValue, value);
1646 }
1647 
1648 /**
1649   * @tc.name: RemoveDeviceData001
1650   * @tc.desc: call rekey and removeDeviceData Concurrently
1651   * @tc.type: FUNC
1652   * @tc.require: AR000D487B
1653   * @tc.author: zhuwentao
1654   */
1655 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, RemoveDeviceData001, TestSize.Level1)
1656 {
1657     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1658     /**
1659      * @tc.steps: step1. sync deviceB data to A
1660      * * @tc.expected: step1. interface return ok
1661     */
1662     Key key1 = {'1'};
1663     Key key2 = {'2'};
1664     Value value = {'1'};
1665     g_deviceB->PutData(key1, value, 1, 0);
1666     g_deviceB->PutData(key2, value, 2, 0);
1667     g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true);
1668 
1669     Value actualValue;
1670     g_kvDelegatePtr->Get(key1, actualValue);
1671     EXPECT_EQ(actualValue, value);
1672     actualValue.clear();
1673     g_kvDelegatePtr->Get(key2, actualValue);
1674     EXPECT_EQ(actualValue, value);
1675     /**
1676      * @tc.steps: step2. call Rekey and RemoveDeviceData Concurrently
1677      * * @tc.expected: step2. interface return ok
1678     */
__anonb4f3bed91302() 1679     std::thread thread1([]() {
1680         CipherPassword passwd3;
1681         std::vector<uint8_t> passwdVect = {'p', 's', 'd', 'z'};
1682         passwd3.SetValue(passwdVect.data(), passwdVect.size());
1683         g_kvDelegatePtr->Rekey(passwd3);
1684     });
__anonb4f3bed91402() 1685     std::thread thread2([]() {
1686         g_kvDelegatePtr->RemoveDeviceData(g_deviceB->GetDeviceId());
1687     });
1688     thread1.join();
1689     thread2.join();
1690 }
1691 
1692 /**
1693   * @tc.name: DeviceOfflineSyncTask001
1694   * @tc.desc: Test sync task when device offline and close db Concurrently
1695   * @tc.type: FUNC
1696   * @tc.require: AR000HI2JS
1697   * @tc.author: zhuwentao
1698   */
1699 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSyncTask001, TestSize.Level3)
1700 {
1701     DBStatus status = OK;
1702     std::vector<std::string> devices;
1703     devices.push_back(g_deviceB->GetDeviceId());
1704 
1705     /**
1706      * @tc.steps: step1. deviceA put {k1, v1}
1707      */
1708     Key key = {'1'};
1709     Value value = {'1'};
1710     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1711 
1712     /**
1713      * @tc.steps: step2. deviceA set auto sync and put some key/value
1714      * @tc.expected: step2. interface should return OK.
1715      */
1716     bool autoSync = true;
1717     PragmaData data = static_cast<PragmaData>(&autoSync);
1718     status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
1719     ASSERT_EQ(status, OK);
1720 
1721     Key key1 = {'2'};
1722     Key key2 = {'3'};
1723     Key key3 = {'4'};
1724     Key key4 = {'5'};
1725     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1726     ASSERT_TRUE(g_kvDelegatePtr->Put(key1, value) == OK);
1727     ASSERT_TRUE(g_kvDelegatePtr->Put(key2, value) == OK);
1728     ASSERT_TRUE(g_kvDelegatePtr->Put(key3, value) == OK);
1729     ASSERT_TRUE(g_kvDelegatePtr->Put(key4, value) == OK);
1730     /**
1731      * @tc.steps: step3. device offline and close db Concurrently
1732      * @tc.expected: step3. interface should return OK.
1733      */
__anonb4f3bed91502() 1734     std::thread thread1([]() {
1735         g_mgr.CloseKvStore(g_kvDelegatePtr);
1736         g_kvDelegatePtr = nullptr;
1737     });
__anonb4f3bed91602() 1738     std::thread thread2([]() {
1739         g_deviceB->Offline();
1740     });
1741     thread1.join();
1742     thread2.join();
1743     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME));
1744     ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1745 }
1746 
1747 /**
1748   * @tc.name: DeviceOfflineSyncTask002
1749   * @tc.desc: Test sync task when autoSync and close db Concurrently
1750   * @tc.type: FUNC
1751   * @tc.require:
1752   * @tc.author: zhuwentao
1753   */
1754 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSyncTask002, TestSize.Level3)
1755 {
1756     DBStatus status = OK;
1757     g_deviceC->Offline();
1758 
1759     /**
1760      * @tc.steps: step1. deviceA put {k1, v1}
1761      */
1762     Key key = {'1'};
1763     Value value = {'1'};
1764     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1765 
1766     /**
1767      * @tc.steps: step2. deviceA set auto sync and put some key/value
1768      * @tc.expected: step2. interface should return OK.
1769      */
1770     bool autoSync = true;
1771     PragmaData data = static_cast<PragmaData>(&autoSync);
1772     status = g_kvDelegatePtr->Pragma(AUTO_SYNC, data);
1773     ASSERT_EQ(status, OK);
1774     std::this_thread::sleep_for(std::chrono::milliseconds(WAIT_TIME * 2));
1775 
1776     Key key1 = {'2'};
1777     Key key2 = {'3'};
1778     Key key3 = {'4'};
1779     ASSERT_TRUE(g_kvDelegatePtr->Put(key1, value) == OK);
1780     ASSERT_TRUE(g_kvDelegatePtr->Put(key2, value) == OK);
1781     ASSERT_TRUE(g_kvDelegatePtr->Put(key3, value) == OK);
1782     /**
1783      * @tc.steps: step3. close db
1784      * @tc.expected: step3. interface should return OK.
1785      */
1786     g_mgr.CloseKvStore(g_kvDelegatePtr);
1787     g_kvDelegatePtr = nullptr;
1788     ASSERT_TRUE(g_mgr.DeleteKvStore(STORE_ID) == OK);
1789 }
1790 
1791 /**
1792   * @tc.name: DeviceOfflineSyncTask003
1793   * @tc.desc: Test sync task when device offline after call sync
1794   * @tc.type: FUNC
1795   * @tc.require:
1796   * @tc.author: zhuwentao
1797   */
1798 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, DeviceOfflineSyncTask003, TestSize.Level3)
1799 {
1800     std::vector<std::string> devices;
1801     devices.push_back(g_deviceB->GetDeviceId());
1802 
1803     /**
1804      * @tc.steps: step1. deviceA put {k1, v1}
1805      */
1806     Key key = {'1'};
1807     Value value = {'1'};
1808     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
1809     /**
1810      * @tc.steps: step2. device offline after call sync
1811      * @tc.expected: step2. interface should return OK.
1812      */
1813     Query query = Query::Select().PrefixKey(key);
1814     ASSERT_TRUE(g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, nullptr, query, false) == OK);
1815     std::this_thread::sleep_for(std::chrono::milliseconds(15)); // wait for 15ms
1816     g_deviceB->Offline();
1817 }
1818 
1819 /**
1820   * @tc.name: GetSyncDataFail001
1821   * @tc.desc: test get sync data failed when sync
1822   * @tc.type: FUNC
1823   * @tc.require:
1824   * @tc.author: zhuwentao
1825   */
1826 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail001, TestSize.Level1)
1827 {
1828     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1829     /**
1830      * @tc.steps: step1. device B set get data errCode control and put some data
1831      * * @tc.expected: step1. interface return ok
1832     */
1833     g_deviceB->SetGetDataErrCode(1, -E_BUSY, true);
1834     Key key1 = {'1'};
1835     Value value = {'1'};
1836     EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp
1837     /**
1838      * @tc.steps: step2. device B sync to device A and check data
1839      * * @tc.expected: step2. interface return ok
1840     */
1841     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1842     Value actualValue;
1843     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
1844     g_deviceB->ResetDataControl();
1845 }
1846 
1847 /**
1848   * @tc.name: GetSyncDataFail002
1849   * @tc.desc: test get sync data failed when sync with large data
1850   * @tc.type: FUNC
1851   * @tc.require: AR000D487B
1852   * @tc.author: zhuwentao
1853   */
1854 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail002, TestSize.Level1)
1855 {
1856     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1857     /**
1858      * @tc.steps: step1. device B set get data errCode control and put some data
1859      * * @tc.expected: step1. interface return ok
1860     */
1861     g_deviceB->SetGetDataErrCode(2, -E_BUSY, true);
1862     int totalSize = 4000u;
1863     std::vector<Entry> entries;
1864     std::vector<Key> keys;
1865     const int keyLen = 10; // 20 Bytes
1866     const int valueLen = 10; // 20 Bytes
1867     DistributedDBUnitTest::GenerateRecords(totalSize, entries, keys, keyLen, valueLen);
1868     uint32_t i = 1u;
1869     for (const auto &entry : entries) {
1870         EXPECT_EQ(g_deviceB->PutData(entry.key, entry.value, i, 0), E_OK);
1871         i++;
1872     }
1873     /**
1874      * @tc.steps: step2. device B sync to device A and check data
1875      * * @tc.expected: step2. interface return ok
1876     */
1877     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_ONLY, true), E_OK);
1878     std::this_thread::sleep_for(std::chrono::seconds(1));
1879     Value actualValue;
1880     for (int j = 1u; j <= totalSize; j++) {
1881         if (j > totalSize / 2) {
1882             EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), NOT_FOUND);
1883         } else {
1884             EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), OK);
1885         }
1886     }
1887     g_deviceB->ResetDataControl();
1888 }
1889 
1890 /**
1891   * @tc.name: GetSyncDataFail003
1892   * @tc.desc: test get sync data E_EKEYREVOKED failed in push_and_pull sync
1893   * @tc.type: FUNC
1894   * @tc.require:
1895   * @tc.author: zhuwentao
1896   */
1897 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail003, TestSize.Level1)
1898 {
1899     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1900     /**
1901      * @tc.steps: step1. device B set get data errCode control and put some data
1902      * * @tc.expected: step1. interface return ok
1903     */
1904     g_deviceB->SetGetDataErrCode(1, -E_EKEYREVOKED, true);
1905     Key key1 = {'1'};
1906     Key key2 = {'3'};
1907     Value value = {'1'};
1908     EXPECT_EQ(g_deviceB->PutData(key1, value, 1u, 0), E_OK); // 1: timestamp
1909     EXPECT_EQ(g_kvDelegatePtr->Put(key2, value), OK);
1910     /**
1911      * @tc.steps: step2. device B sync to device A and check data
1912      * * @tc.expected: step2. interface return ok
1913     */
1914     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1915     Value actualValue;
1916     EXPECT_EQ(g_kvDelegatePtr->Get(key1, actualValue), NOT_FOUND);
1917     VirtualDataItem item;
1918     EXPECT_EQ(g_deviceB->GetData(key2, item), E_OK);
1919     g_deviceB->ResetDataControl();
1920 }
1921 
1922 /**
1923   * @tc.name: GetSyncDataFail004
1924   * @tc.desc: test get sync data E_EKEYREVOKED failed in push_and_pull sync
1925   * @tc.type: FUNC
1926   * @tc.require:
1927   * @tc.author: zhuwentao
1928   */
1929 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, GetSyncDataFail004, TestSize.Level1)
1930 {
1931     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1932     /**
1933      * @tc.steps: step1. device B set get data errCode control and put some data
1934      * * @tc.expected: step1. interface return ok
1935     */
1936     g_deviceB->SetGetDataErrCode(2, -E_EKEYREVOKED, true);
1937     int totalSize = 4000u;
1938     std::vector<Entry> entries;
1939     std::vector<Key> keys;
1940     const int keyLen = 10; // 20 Bytes
1941     const int valueLen = 10; // 20 Bytes
1942     DistributedDBUnitTest::GenerateRecords(totalSize, entries, keys, keyLen, valueLen);
1943     uint32_t i = 1u;
1944     for (const auto &entry : entries) {
1945         EXPECT_EQ(g_deviceB->PutData(entry.key, entry.value, i, 0), E_OK);
1946         i++;
1947     }
1948     Key key = {'a', 'b', 'c'};
1949     Value value = {'1'};
1950     EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
1951     /**
1952      * @tc.steps: step2. device B sync to device A and check data
1953      * * @tc.expected: step2. interface return ok
1954     */
1955     EXPECT_EQ(g_deviceB->Sync(DistributedDB::SYNC_MODE_PUSH_PULL, true), E_OK);
1956     std::this_thread::sleep_for(std::chrono::seconds(1));
1957     Value actualValue;
1958     for (int j = 1u; j <= totalSize; j++) {
1959         if (j > totalSize / 2) {
1960             EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), NOT_FOUND);
1961         } else {
1962             EXPECT_EQ(g_kvDelegatePtr->Get(entries[j - 1].key, actualValue), OK);
1963         }
1964     }
1965     VirtualDataItem item;
1966     EXPECT_EQ(g_deviceB->GetData(key, item), E_OK);
1967     g_deviceB->ResetDataControl();
1968 }
1969 
1970 /**
1971   * @tc.name: InterceptDataFail001
1972   * @tc.desc: test intercept data failed when sync
1973   * @tc.type: FUNC
1974   * @tc.require:
1975   * @tc.author: zhuwentao
1976   */
1977 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, InterceptDataFail001, TestSize.Level1)
1978 {
1979     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
1980     /**
1981      * @tc.steps: step1. device A set intercept data errCode and put some data
1982      * * @tc.expected: step1. interface return ok
1983     */
1984     g_kvDelegatePtr->SetPushDataInterceptor(
__anonb4f3bed91702(InterceptedData &data, const std::string &sourceID, const std::string &targetID) 1985         [](InterceptedData &data, const std::string &sourceID, const std::string &targetID) {
1986             int errCode = OK;
1987             auto entries = data.GetEntries();
1988             LOGD("====here111,size=%d", entries.size());
1989             for (size_t i = 0; i < entries.size(); i++) {
1990                 Key newKey;
1991                 errCode = data.ModifyKey(i, newKey);
1992                 if (errCode != OK) {
1993                     break;
1994                 }
1995             }
1996             return errCode;
1997         }
1998     );
1999     Key key = {'1'};
2000     Value value = {'1'};
2001     EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
2002     /**
2003      * @tc.steps: step2. device A sync to device B and check data
2004      * * @tc.expected: step2. interface return ok
2005     */
2006     std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
2007     std::map<std::string, DBStatus> result;
2008     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result) == OK);
2009     ASSERT_TRUE(result.size() == devices.size());
2010     for (const auto &pair : result) {
2011         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2012         EXPECT_TRUE(pair.second == INTERCEPT_DATA_FAIL);
2013     }
2014     VirtualDataItem item;
2015     EXPECT_EQ(g_deviceB->GetData(key, item), -E_NOT_FOUND);
2016 }
2017 
2018 /**
2019   * @tc.name: InterceptDataFail002
2020   * @tc.desc: test intercept data failed when sync
2021   * @tc.type: FUNC
2022   * @tc.require:
2023   * @tc.author: zhangqiquan
2024   */
2025 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, InterceptDataFail002, TestSize.Level0)
2026 {
2027     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
2028     /**
2029      * @tc.steps: step1. device A set intercept data errCode and B put some data
2030      * @tc.expected: step1. interface return ok
2031      */
2032     g_kvDelegatePtr->SetReceiveDataInterceptor(
__anonb4f3bed91802(InterceptedData &data, const std::string &sourceID, const std::string &targetID) 2033         [](InterceptedData &data, const std::string &sourceID, const std::string &targetID) {
2034             auto entries = data.GetEntries();
2035             LOGD("====on receive,size=%d", entries.size());
2036             for (size_t i = 0; i < entries.size(); i++) {
2037                 Key newKey;
2038                 int errCode = data.ModifyKey(i, newKey);
2039                 if (errCode != OK) {
2040                     return errCode;
2041                 }
2042             }
2043             return E_OK;
2044         }
2045     );
2046     Key key = {'1'};
2047     Value value = {'1'};
2048     g_deviceB->PutData(key, value, 1u, 0); // 1 is timestamp
2049     /**
2050      * @tc.steps: step2. device A sync to device B and check data
2051      * @tc.expected: step2. interface return ok
2052      */
2053     std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
2054     std::map<std::string, DBStatus> result;
2055     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
2056     ASSERT_TRUE(result.size() == devices.size());
2057     for (const auto &pair : result) {
2058         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2059         EXPECT_EQ(pair.second, INTERCEPT_DATA_FAIL);
2060     }
2061     Value actualValue;
2062     EXPECT_EQ(g_kvDelegatePtr->Get(key, actualValue), NOT_FOUND);
2063 }
2064 
2065 /**
2066   * @tc.name: InterceptData001
2067   * @tc.desc: test intercept receive data when sync
2068   * @tc.type: FUNC
2069   * @tc.require:
2070   * @tc.author: zhangqiquan
2071   */
2072 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, InterceptData001, TestSize.Level0)
2073 {
2074     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
2075     /**
2076      * @tc.steps: step1. device A set intercept data errCode and B put some data
2077      * @tc.expected: step1. interface return ok
2078      */
2079     g_kvDelegatePtr->SetReceiveDataInterceptor(
__anonb4f3bed91902(InterceptedData &data, const std::string &sourceID, const std::string &targetID) 2080         [](InterceptedData &data, const std::string &sourceID, const std::string &targetID) {
2081             auto entries = data.GetEntries();
2082             LOGD("====on receive,size=%d", entries.size());
2083             for (size_t i = 0; i < entries.size(); i++) {
2084                 Key newKey = {'2'};
2085                 int errCode = data.ModifyKey(i, newKey);
2086                 if (errCode != OK) {
2087                     return errCode;
2088                 }
2089                 Value newValue = {'3'};
2090                 errCode = data.ModifyValue(i, newValue);
2091                 if (errCode != OK) {
2092                     return errCode;
2093                 }
2094             }
2095             return E_OK;
2096         }
2097     );
2098     Key key = {'1'};
2099     Value value = {'1'};
2100     g_deviceB->PutData(key, value, 1u, 0); // 1 is timestamp
2101     /**
2102      * @tc.steps: step2. device A sync to device B and check data
2103      * @tc.expected: step2. interface return ok
2104      */
2105     std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
2106     std::map<std::string, DBStatus> result;
2107     ASSERT_TRUE(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result) == OK);
2108     ASSERT_TRUE(result.size() == devices.size());
2109     for (const auto &pair : result) {
2110         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2111         EXPECT_EQ(pair.second, OK);
2112     }
2113     Value actualValue;
2114     EXPECT_EQ(g_kvDelegatePtr->Get(key, actualValue), NOT_FOUND);
2115     key = {'2'};
2116     EXPECT_EQ(g_kvDelegatePtr->Get(key, actualValue), OK);
2117     value = {'3'};
2118     EXPECT_EQ(actualValue, value);
2119 }
2120 
2121 /**
2122   * @tc.name: UpdateKey001
2123   * @tc.desc: test update key can effect local data and sync data, without delete data
2124   * @tc.type: FUNC
2125   * @tc.require:
2126   * @tc.author: zhangqiquan
2127   */
2128 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, UpdateKey001, TestSize.Level1)
2129 {
2130     /**
2131      * @tc.steps: step1. device A set sync data (k1, v1) local data (k2, v2) (k3, v3) and delete (k4, v4)
2132      * @tc.expected: step1. put data return ok
2133      */
2134     Key k1 = {'k', '1'};
2135     Value v1 = {'v', '1'};
2136     g_deviceB->PutData(k1, v1, 1, 0);
2137     ASSERT_EQ(g_deviceB->Sync(SyncMode::SYNC_MODE_PUSH_ONLY, true), E_OK);
2138     Value actualValue;
2139     EXPECT_EQ(g_kvDelegatePtr->Get(k1, actualValue), OK);
2140     EXPECT_EQ(v1, actualValue);
2141     Key k2 = {'k', '2'};
2142     Value v2 = {'v', '2'};
2143     Key k3 = {'k', '3'};
2144     Value v3 = {'v', '3'};
2145     Key k4 = {'k', '4'};
2146     Value v4 = {'v', '4'};
2147     EXPECT_EQ(g_kvDelegatePtr->Put(k2, v2), OK);
2148     EXPECT_EQ(g_kvDelegatePtr->Put(k3, v3), OK);
2149     EXPECT_EQ(g_kvDelegatePtr->Put(k4, v4), OK);
2150     EXPECT_EQ(g_kvDelegatePtr->Delete(k4), OK);
2151     /**
2152      * @tc.steps: step2. device A update key and set
2153      * @tc.expected: step2. put data return ok
2154      */
__anonb4f3bed91a02(const Key &originKey, Key &newKey) 2155     DBStatus status = g_kvDelegatePtr->UpdateKey([](const Key &originKey, Key &newKey) {
2156         newKey = originKey;
2157         newKey.push_back('0');
2158     });
2159     EXPECT_EQ(status, OK);
2160     k1.push_back('0');
2161     k2.push_back('0');
2162     k3.push_back('0');
2163     EXPECT_EQ(g_kvDelegatePtr->Get(k1, actualValue), OK);
2164     EXPECT_EQ(v1, actualValue);
2165     EXPECT_EQ(g_kvDelegatePtr->Get(k2, actualValue), OK);
2166     EXPECT_EQ(v2, actualValue);
2167     EXPECT_EQ(g_kvDelegatePtr->Get(k3, actualValue), OK);
2168     EXPECT_EQ(v3, actualValue);
2169 }
2170 
2171 /**
2172   * @tc.name: MetaBusy001
2173   * @tc.desc: test sync normal when update water mark busy
2174   * @tc.type: FUNC
2175   * @tc.require:
2176   * @tc.author: zhangqiquan
2177   */
2178 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, MetaBusy001, TestSize.Level1)
2179 {
2180     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
2181     Key key = {'1'};
2182     Value value = {'1'};
2183     EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
2184     std::vector<std::string> devices = { g_deviceB->GetDeviceId() };
2185     std::map<std::string, DBStatus> result;
2186     ASSERT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result), OK);
2187     ASSERT_EQ(result.size(), devices.size());
2188     for (const auto &pair : result) {
2189         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2190         EXPECT_TRUE(pair.second == OK);
2191     }
2192     value = {'2'};
2193     EXPECT_EQ(g_kvDelegatePtr->Put(key, value), OK);
__anonb4f3bed91b02() 2194     g_deviceB->SetSaveDataCallback([] () {
2195         RuntimeContext::GetInstance()->ScheduleTask([]() {
2196             g_deviceB->EraseWaterMark("real_device");
2197         });
2198         std::this_thread::sleep_for(std::chrono::seconds(1));
2199     });
2200     EXPECT_EQ(g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result), OK);
2201     EXPECT_EQ(result.size(), devices.size());
2202     for (const auto &pair : result) {
2203         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
2204         EXPECT_TRUE(pair.second == OK);
2205     }
2206     g_deviceB->SetSaveDataCallback(nullptr);
2207     RuntimeContext::GetInstance()->StopTaskPool();
2208 }
2209 
2210 /**
2211  * @tc.name: TestErrCodePassthrough001
2212  * @tc.desc: Test ErrCode Passthrough when sync comm fail
2213  * @tc.type: FUNC
2214  * @tc.require:
2215  * @tc.author: suyue
2216  */
2217 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, TestErrCodePassthrough001, TestSize.Level1)
2218 {
2219     /**
2220      * @tc.steps: step1. device put data.
2221      * @tc.expected: step1. sync return OK.
2222      */
2223     std::vector<std::string> devices;
2224     devices.push_back(g_deviceB->GetDeviceId());
2225     devices.push_back(g_deviceC->GetDeviceId());
2226     Key key1 = {'1'};
2227     Value value1 = {'1'};
2228     ASSERT_EQ(g_kvDelegatePtr->Put(key1, value1), OK);
2229 
2230     /**
2231      * @tc.steps: step2. call sync and mock commErrCode is E_BASE(positive number).
2232      * @tc.expected: step2. return COMM_FAILURE.
2233      */
2234     g_communicatorAggregator->MockCommErrCode(E_BASE);
2235     std::map<std::string, DBStatus> result;
2236     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
2237     ASSERT_EQ(status, OK);
2238     for (const auto &pair : result) {
2239         LOGD("dev %s, status %d, expectStatus %d", pair.first.c_str(), pair.second, E_BASE);
2240         EXPECT_EQ(pair.second, COMM_FAILURE);
2241     }
2242 
2243     /**
2244      * @tc.steps: step3. call sync and mock commErrCode is -E_BASE(negative number).
2245      * @tc.expected: step3. return -E_BASE.
2246      */
2247     g_communicatorAggregator->MockCommErrCode(-E_BASE);
2248     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
2249     ASSERT_EQ(status, OK);
2250     for (const auto &pair : result) {
2251         LOGD("dev %s, status %d, expectStatus %d", pair.first.c_str(), pair.second, COMM_FAILURE);
2252         EXPECT_EQ(pair.second, static_cast<DBStatus>(-E_BASE));
2253     }
2254 
2255     /**
2256      * @tc.steps: step4. call sync and mock commErrCode is INT_MAX.
2257      * @tc.expected: step4. return COMM_FAILURE.
2258      */
2259     g_communicatorAggregator->MockCommErrCode(INT_MAX);
2260     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
2261     ASSERT_EQ(status, OK);
2262     for (const auto &pair : result) {
2263         LOGD("dev %s, status %d, expectStatus %d", pair.first.c_str(), pair.second, INT_MAX);
2264         EXPECT_EQ(pair.second, COMM_FAILURE);
2265     }
2266 
2267     /**
2268      * @tc.steps: step5. call sync and mock commErrCode is -INT_MAX.
2269      * @tc.expected: step5. return -INT_MAX.
2270      */
2271     g_communicatorAggregator->MockCommErrCode(-INT_MAX);
2272     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
2273     ASSERT_EQ(status, OK);
2274     for (const auto &pair : result) {
2275         LOGD("dev %s, status %d, expectStatus %d", pair.first.c_str(), pair.second, -INT_MAX);
2276         EXPECT_EQ(pair.second, -INT_MAX);
2277     }
2278     g_communicatorAggregator->MockCommErrCode(E_OK);
2279 }
2280 
2281 /**
2282   * @tc.name: TestErrCodePassthrough002
2283   * @tc.desc: Test ErrCode Passthrough when sync time out and isDirectEnd is false
2284   * @tc.type: FUNC
2285   * @tc.require:
2286   * @tc.author: suyue
2287   */
2288 HWTEST_F(DistributedDBSingleVerP2PComplexSyncTest, TestErrCodePassthrough002, TestSize.Level3)
2289 {
2290     /**
2291      * @tc.steps: step1. device put data.
2292      * @tc.expected: step1. sync return OK.
2293      */
2294     std::vector<std::string> devices;
2295     devices.push_back(g_deviceB->GetDeviceId());
2296     ASSERT_EQ(g_kvDelegatePtr->Put({'k', '1'}, {'v', '1'}), OK);
2297 
2298     /**
2299      * @tc.steps: step2. set messageId invalid and isDirectEnd is false
2300      * @tc.expected: step2. make sure deviceA push data failed due to timeout
2301      */
__anonb4f3bed91d02(const std::string &target, DistributedDB::Message *msg) 2302     g_communicatorAggregator->RegOnDispatch([](const std::string &target, DistributedDB::Message *msg) {
2303         ASSERT_NE(msg, nullptr);
2304         if (target == DEVICE_B && msg->GetMessageId() == QUERY_SYNC_MESSAGE) {
2305             msg->SetMessageId(INVALID_MESSAGE_ID);
2306         }
2307     });
2308     g_communicatorAggregator->MockDirectEndFlag(false);
2309 
2310     /**
2311      * @tc.steps: step3. call sync and mock errCode is E_BASE(positive number).
2312      * @tc.expected: step3. return TIME_OUT.
2313      */
2314     std::map<std::string, DBStatus> result;
__anonb4f3bed91e02(const std::map<std::string, DBStatus> &map) 2315     auto callback = [&result](const std::map<std::string, DBStatus> &map) {
2316         result = map;
2317     };
2318     Query query = Query::Select().PrefixKey({'k', '1'});
2319     g_communicatorAggregator->MockCommErrCode(E_BASE);
2320     EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
2321     EXPECT_EQ(result.size(), devices.size());
2322     EXPECT_EQ(result[DEVICE_B], TIME_OUT);
2323 
2324     /**
2325      * @tc.steps: step4. call sync and mock errCode is -E_BASE(negative number).
2326      * @tc.expected: step4. return -E_BASE.
2327      */
2328     g_communicatorAggregator->MockCommErrCode(-E_BASE);
2329     EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
2330     EXPECT_EQ(result.size(), devices.size());
2331     EXPECT_EQ(result[DEVICE_B], -E_BASE);
2332 
2333     /**
2334      * @tc.steps: step5. call sync and mock errCode is E_OK(0).
2335      * @tc.expected: step5. return TIME_OUT.
2336      */
2337     g_communicatorAggregator->MockCommErrCode(E_OK);
2338     EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
2339     EXPECT_EQ(result.size(), devices.size());
2340     EXPECT_EQ(result[DEVICE_B], TIME_OUT);
2341 
2342     /**
2343      * @tc.steps: step6. call sync and mock errCode is -INT_MAX.
2344      * @tc.expected: step6. return - INT_MAX.
2345      */
2346     g_communicatorAggregator->MockCommErrCode(-INT_MAX);
2347     EXPECT_EQ(g_kvDelegatePtr->Sync(devices, DistributedDB::SYNC_MODE_PUSH_ONLY, callback, query, true), OK);
2348     EXPECT_EQ(result.size(), devices.size());
2349     EXPECT_EQ(result[DEVICE_B], -INT_MAX);
2350 
2351     g_communicatorAggregator->RegOnDispatch(nullptr);
2352     g_communicatorAggregator->MockCommErrCode(E_OK);
2353     g_communicatorAggregator->MockDirectEndFlag(true);
2354 }
2355