1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <gtest/gtest.h>
17 
18 #include "ability_sync.h"
19 #include "distributeddb_data_generate_unit_test.h"
20 #include "distributeddb_tools_unit_test.h"
21 #include "kv_store_nb_delegate_impl.h"
22 #include "kv_virtual_device.h"
23 #include "platform_specific.h"
24 #include "process_system_api_adapter_impl.h"
25 #include "single_ver_data_packet.h"
26 #include "virtual_communicator_aggregator.h"
27 
28 using namespace testing::ext;
29 using namespace DistributedDB;
30 using namespace DistributedDBUnitTest;
31 using namespace std;
32 
33 namespace {
34     string g_testDir;
35     const string STORE_ID = "kv_stroe_sync_check_test";
36     const std::string DEVICE_B = "deviceB";
37     const std::string DEVICE_C = "deviceC";
38     const int LOCAL_WATER_MARK_NOT_INIT = 0xaa;
39     const int EIGHT_HUNDRED = 800;
40     const int NORMAL_SYNC_SEND_REQUEST_CNT = 3;
41     const int TWO_CNT = 2;
42     const int SLEEP_MILLISECONDS = 500;
43     const int TEN_SECONDS = 10;
44     const int THREE_HUNDRED = 300;
45     const int WAIT_30_SECONDS = 30000;
46     const int WAIT_40_SECONDS = 40000;
47     const int TIMEOUT_6_SECONDS = 6000;
48 
49     KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
50     KvStoreConfig g_config;
51     DistributedDBToolsUnitTest g_tool;
52     DBStatus g_kvDelegateStatus = INVALID_ARGS;
53     KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
54     VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
55     KvVirtualDevice* g_deviceB = nullptr;
56     KvVirtualDevice* g_deviceC = nullptr;
57     VirtualSingleVerSyncDBInterface *g_syncInterfaceB = nullptr;
58     VirtualSingleVerSyncDBInterface *g_syncInterfaceC = nullptr;
59 
60     // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
61     auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
62         placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
63 #ifndef LOW_LEVEL_MEM_DEV
64     const int KEY_LEN = 20; // 20 Bytes
65     const int VALUE_LEN = 4 * 1024 * 1024; // 4MB
66     const int ENTRY_NUM = 2; // 16 entries
67 #endif
68 
69 class DistributedDBSingleVerP2PSyncCheckTest : public testing::Test {
70 public:
71     static void SetUpTestCase(void);
72     static void TearDownTestCase(void);
73     void SetUp();
74     void TearDown();
75 };
76 
SetUpTestCase(void)77 void DistributedDBSingleVerP2PSyncCheckTest::SetUpTestCase(void)
78 {
79     /**
80      * @tc.setup: Init datadir and Virtual Communicator.
81      */
82     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
83     g_config.dataDir = g_testDir;
84     g_mgr.SetKvStoreConfig(g_config);
85 
86     string dir = g_testDir + "/single_ver";
87     DIR* dirTmp = opendir(dir.c_str());
88     if (dirTmp == nullptr) {
89         OS::MakeDBDirectory(dir);
90     } else {
91         closedir(dirTmp);
92     }
93 
94     g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
95     ASSERT_TRUE(g_communicatorAggregator != nullptr);
96     RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
97 
98     std::shared_ptr<ProcessSystemApiAdapterImpl> g_adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
99     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(g_adapter);
100 }
101 
TearDownTestCase(void)102 void DistributedDBSingleVerP2PSyncCheckTest::TearDownTestCase(void)
103 {
104     /**
105      * @tc.teardown: Release virtual Communicator and clear data dir.
106      */
107     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
108         LOGE("rm test db files error!");
109     }
110     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
111     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
112 }
113 
SetUp(void)114 void DistributedDBSingleVerP2PSyncCheckTest::SetUp(void)
115 {
116     DistributedDBToolsUnitTest::PrintTestCaseInfo();
117     /**
118      * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
119      */
120     KvStoreNbDelegate::Option option;
121     option.secOption.securityLabel = SecurityLabel::S3;
122     option.secOption.securityFlag = SecurityFlag::SECE;
123     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
124     ASSERT_TRUE(g_kvDelegateStatus == OK);
125     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
126     g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
127     ASSERT_TRUE(g_deviceB != nullptr);
128     g_syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
129     ASSERT_TRUE(g_syncInterfaceB != nullptr);
130     ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, g_syncInterfaceB), E_OK);
131     SecurityOption virtualOption;
132     virtualOption.securityLabel = option.secOption.securityLabel;
133     virtualOption.securityFlag = option.secOption.securityFlag;
134     g_syncInterfaceB->SetSecurityOption(virtualOption);
135 
136     g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
137     ASSERT_TRUE(g_deviceC != nullptr);
138     g_syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
139     ASSERT_TRUE(g_syncInterfaceC != nullptr);
140     ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, g_syncInterfaceC), E_OK);
141     g_syncInterfaceC->SetSecurityOption(virtualOption);
142     RuntimeContext::GetInstance()->ClearAllDeviceTimeInfo();
143 }
144 
TearDown(void)145 void DistributedDBSingleVerP2PSyncCheckTest::TearDown(void)
146 {
147     /**
148      * @tc.teardown: Release device A, B, C
149      */
150     if (g_kvDelegatePtr != nullptr) {
151         ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
152         g_kvDelegatePtr = nullptr;
153         DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
154         LOGD("delete kv store status %d", status);
155         ASSERT_TRUE(status == OK);
156     }
157     if (g_deviceB != nullptr) {
158         delete g_deviceB;
159         g_deviceB = nullptr;
160     }
161     if (g_deviceC != nullptr) {
162         delete g_deviceC;
163         g_deviceC = nullptr;
164     }
165     if (g_communicatorAggregator != nullptr) {
166         g_communicatorAggregator->RegOnDispatch(nullptr);
167     }
168 }
169 
170 /**
171  * @tc.name: sec option check Sync 001
172  * @tc.desc: if sec option not equal, forbid sync
173  * @tc.type: FUNC
174  * @tc.require: AR000EV1G6
175  * @tc.author: wangchuanqing
176  */
177 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck001, TestSize.Level1)
178 {
179     DBStatus status = OK;
180     std::vector<std::string> devices;
181     devices.push_back(g_deviceB->GetDeviceId());
182     devices.push_back(g_deviceC->GetDeviceId());
183 
184     /**
185      * @tc.steps: step1. deviceA put {k1, v1}
186      */
187     Key key = {'1'};
188     Value value = {'1'};
189     status = g_kvDelegatePtr->Put(key, value);
190     ASSERT_TRUE(status == OK);
191 
192     ASSERT_TRUE(g_syncInterfaceB != nullptr);
193     ASSERT_TRUE(g_syncInterfaceC != nullptr);
194     SecurityOption secOption{SecurityLabel::S4, SecurityFlag::ECE};
195     g_syncInterfaceB->SetSecurityOption(secOption);
196     g_syncInterfaceC->SetSecurityOption(secOption);
197 
198     /**
199      * @tc.steps: step2. deviceA call sync and wait
200      * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
201      */
202     std::map<std::string, DBStatus> result;
203     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
204     ASSERT_TRUE(status == OK);
205 
206     ASSERT_TRUE(result.size() == devices.size());
207     for (const auto &pair : result) {
208         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
209         EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
210     }
211     VirtualDataItem item;
212     g_deviceB->GetData(key, item);
213     EXPECT_TRUE(item.value.empty());
214     g_deviceC->GetData(key, item);
215     EXPECT_TRUE(item.value.empty());
216 }
217 
218 /**
219  * @tc.name: sec option check Sync 002
220  * @tc.desc: if sec option not equal, forbid sync
221  * @tc.type: FUNC
222  * @tc.require: AR000EV1G6
223  * @tc.author: wangchuanqing
224  */
225 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck002, TestSize.Level1)
226 {
227     DBStatus status = OK;
228     std::vector<std::string> devices;
229     devices.push_back(g_deviceB->GetDeviceId());
230     devices.push_back(g_deviceC->GetDeviceId());
231 
232     /**
233      * @tc.steps: step1. deviceA put {k1, v1}
234      */
235     Key key = {'1'};
236     Value value = {'1'};
237     status = g_kvDelegatePtr->Put(key, value);
238     ASSERT_TRUE(status == OK);
239 
240     ASSERT_TRUE(g_syncInterfaceC != nullptr);
241     SecurityOption secOption{SecurityLabel::S4, SecurityFlag::ECE};
242     g_syncInterfaceC->SetSecurityOption(secOption);
243     secOption.securityLabel = SecurityLabel::S3;
244     secOption.securityFlag = SecurityFlag::SECE;
245     g_syncInterfaceB->SetSecurityOption(secOption);
246 
247     /**
248      * @tc.steps: step2. deviceA call sync and wait
249      * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
250      */
251     std::map<std::string, DBStatus> result;
252     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
253     ASSERT_TRUE(status == OK);
254 
255     ASSERT_TRUE(result.size() == devices.size());
256     for (const auto &pair : result) {
257         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
258         if (pair.first == DEVICE_B) {
259             EXPECT_TRUE(pair.second == OK);
260         } else {
261             EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
262         }
263     }
264     VirtualDataItem item;
265     g_deviceC->GetData(key, item);
266     EXPECT_TRUE(item.value.empty());
267     g_deviceB->GetData(key, item);
268     EXPECT_TRUE(item.value == value);
269 }
270 
271 /**
272  * @tc.name: sec option check Sync 003
273  * @tc.desc: if sec option equal, check not pass, forbid sync
274  * @tc.type: FUNC
275  * @tc.require: AR000EV1G6
276  * @tc.author: zhangqiquan
277  */
278 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck003, TestSize.Level1)
279 {
280     auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
281     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anon6e997c5f0202(const std::string &, const SecurityOption &) 282     adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &) {
283         return false;
284     });
285     /**
286      * @tc.steps: step1. record packet
287      * @tc.expected: step1. sync should failed in source.
288      */
289     std::atomic<int> messageCount = 0;
__anon6e997c5f0302(const std::string &, Message *) 290     g_communicatorAggregator->RegOnDispatch([&messageCount](const std::string &, Message *) {
291         messageCount++;
292     });
293     /**
294      * @tc.steps: step2. deviceA call sync and wait
295      * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
296      */
297     DBStatus status = OK;
298     std::vector<std::string> devices;
299     devices.push_back(g_deviceB->GetDeviceId());
300     std::map<std::string, DBStatus> result;
301     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
302     EXPECT_EQ(status, OK);
303     EXPECT_EQ(messageCount, 4); // 4 = 2 time sync + 2 ability sync
304     for (const auto &pair : result) {
305         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
306         EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
307     }
308     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
309     g_communicatorAggregator->RegOnDispatch(nullptr);
310 }
311 
312 /**
313  * @tc.name: sec option check Sync 004
314  * @tc.desc: memory db not check device security
315  * @tc.type: FUNC
316  * @tc.require:
317  * @tc.author: zhangqiquan
318  */
319 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck004, TestSize.Level1)
320 {
321     ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
322     g_kvDelegatePtr = nullptr;
323     KvStoreNbDelegate::Option option;
324     option.secOption.securityLabel = SecurityLabel::NOT_SET;
325     option.isMemoryDb = true;
326     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
327     ASSERT_TRUE(g_kvDelegateStatus == OK);
328     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
329 
330     auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
331     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anon6e997c5f0402(const std::string &, const SecurityOption &) 332     adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &) {
333         return false;
334     });
__anon6e997c5f0502(const std::string &, SecurityOption &securityOption) 335     adapter->ForkGetSecurityOption([](const std::string &, SecurityOption &securityOption) {
336         securityOption.securityLabel = NOT_SET;
337         return OK;
338     });
__anon6e997c5f0602(SecurityOption &) 339     g_syncInterfaceB->ForkGetSecurityOption([](SecurityOption &) {
340         return -E_NOT_SUPPORT;
341     });
342 
343     std::vector<std::string> devices;
344     devices.push_back(g_deviceB->GetDeviceId());
345     std::map<std::string, DBStatus> result;
346     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
347     EXPECT_EQ(status, OK);
348     for (const auto &pair : result) {
349         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
350         EXPECT_TRUE(pair.second == OK);
351     }
352 
353     adapter->ForkCheckDeviceSecurityAbility(nullptr);
354     adapter->ForkGetSecurityOption(nullptr);
355     g_syncInterfaceB->ForkGetSecurityOption(nullptr);
356 }
357 
358 /**
359  * @tc.name: sec option check Sync 005
360  * @tc.desc: if sec option equal, check not pass, forbid sync
361  * @tc.type: FUNC
362  * @tc.require:
363  * @tc.author: zhangqiquan
364  */
365 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck005, TestSize.Level1)
366 {
367     auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
368     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anon6e997c5f0702(SecurityOption &option) 369     g_syncInterfaceB->ForkGetSecurityOption([](SecurityOption &option) {
370         option.securityLabel = NOT_SET;
371         return E_OK;
372     });
__anon6e997c5f0802(const std::string &, SecurityOption &securityOption) 373     adapter->ForkGetSecurityOption([](const std::string &, SecurityOption &securityOption) {
374         securityOption.securityLabel = NOT_SET;
375         return OK;
376     });
377 
378     std::vector<std::string> devices;
379     devices.push_back(g_deviceB->GetDeviceId());
380     std::map<std::string, DBStatus> result;
381     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
382     EXPECT_EQ(status, OK);
383     for (const auto &pair : result) {
384         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
385         EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
386     }
387 
388     adapter->ForkCheckDeviceSecurityAbility(nullptr);
389     adapter->ForkGetSecurityOption(nullptr);
390     g_syncInterfaceB->ForkGetSecurityOption(nullptr);
391 }
392 
393 /**
394  * @tc.name: sec option check Sync 006
395  * @tc.desc: memory db not check device security
396  * @tc.type: FUNC
397  * @tc.require:
398  * @tc.author: zhangqiquan
399  */
400 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck006, TestSize.Level0)
401 {
402     ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
403     ASSERT_EQ(g_mgr.DeleteKvStore(STORE_ID), OK);
404     g_kvDelegatePtr = nullptr;
405     KvStoreNbDelegate::Option option;
406     option.secOption.securityLabel = SecurityLabel::S1;
407     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
408     ASSERT_TRUE(g_kvDelegateStatus == OK);
409     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
410 
411     auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
412     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anon6e997c5f0902(const std::string &, const SecurityOption &) 413     adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &) {
414         return true;
415     });
__anon6e997c5f0a02(const std::string &, SecurityOption &securityOption) 416     adapter->ForkGetSecurityOption([](const std::string &, SecurityOption &securityOption) {
417         securityOption.securityLabel = S1;
418         return OK;
419     });
__anon6e997c5f0b02(SecurityOption &option) 420     g_syncInterfaceB->ForkGetSecurityOption([](SecurityOption &option) {
421         option.securityLabel = SecurityLabel::S0;
422         return E_OK;
423     });
424 
425     std::vector<std::string> devices;
426     devices.push_back(g_deviceB->GetDeviceId());
427     std::map<std::string, DBStatus> result;
428     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
429     EXPECT_EQ(status, OK);
430     for (const auto &pair : result) {
431         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
432         EXPECT_TRUE(pair.second == OK);
433     }
434 
435     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(std::make_shared<ProcessSystemApiAdapterImpl>());
436     g_syncInterfaceB->ForkGetSecurityOption(nullptr);
437 }
438 
439 /**
440  * @tc.name: sec option check Sync 007
441  * @tc.desc: sync should send security option
442  * @tc.type: FUNC
443  * @tc.require:
444  * @tc.author: zhangqiquan
445  */
446 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck007, TestSize.Level0)
447 {
448     /**
449      * @tc.steps: step1. fork check device security ability
450      * @tc.expected: step1. check param option should be S3 SECE.
451      */
452     auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
453     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anon6e997c5f0c02(const std::string &, const SecurityOption &option) 454     adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &option) {
455         EXPECT_EQ(option.securityLabel, SecurityLabel::S3);
456         EXPECT_EQ(option.securityFlag, SecurityFlag::SECE);
457         return true;
458     });
459     /**
460      * @tc.steps: step2. sync twice
461      * @tc.expected: step2. sync success.
462      */
463     std::vector<std::string> devices;
464     devices.push_back(g_deviceB->GetDeviceId());
465     std::map<std::string, DBStatus> result;
466     g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
467     auto status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
468     ASSERT_TRUE(status == OK);
469     ASSERT_TRUE(result.size() == devices.size());
470     for (const auto &pair : result) {
471         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
472         EXPECT_TRUE(pair.second == OK);
473     }
474     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
475 }
476 
477 /**
478  * @tc.name: SecOptionCheck008
479  * @tc.desc: pull compress sync when check device ability fail
480  * @tc.type: FUNC
481  * @tc.require:
482  * @tc.author: zhangqiquan
483  */
484 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck008, TestSize.Level0)
485 {
486     auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
487     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
488     auto deviceB = g_deviceB->GetDeviceId();
__anon6e997c5f0d02(const std::string &dev, const SecurityOption &) 489     adapter->ForkCheckDeviceSecurityAbility([deviceB](const std::string &dev, const SecurityOption &) {
490         if (dev != "real_device") {
491             return true;
492         }
493         return false;
494     });
__anon6e997c5f0e02(SecurityOption &option) 495     g_syncInterfaceB->ForkGetSecurityOption([](SecurityOption &option) {
496         option.securityLabel = SecurityLabel::S3;
497         option.securityFlag = SecurityFlag::SECE;
498         return E_OK;
499     });
500     g_syncInterfaceB->SetCompressSync(true);
501     std::vector<std::string> devices;
502     devices.push_back(deviceB);
503     std::map<std::string, DBStatus> result;
504     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
505     EXPECT_EQ(status, OK);
506     for (const auto &pair : result) {
507         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
508         EXPECT_EQ(pair.second, SECURITY_OPTION_CHECK_ERROR);
509     }
510 
511     RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(std::make_shared<ProcessSystemApiAdapterImpl>());
512     g_syncInterfaceB->ForkGetSecurityOption(nullptr);
513     g_syncInterfaceB->SetCompressSync(false);
514 }
515 
516 #ifndef LOW_LEVEL_MEM_DEV
517 /**
518  * @tc.name: BigDataSync001
519  * @tc.desc: big data sync push mode.
520  * @tc.type: FUNC
521  * @tc.require: AR000F3OOU
522  * @tc.author: wangchuanqing
523  */
524 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync001, TestSize.Level1)
525 {
526     DBStatus status = OK;
527     std::vector<std::string> devices;
528     devices.push_back(g_deviceB->GetDeviceId());
529     devices.push_back(g_deviceC->GetDeviceId());
530 
531     /**
532      * @tc.steps: step1. deviceA put 16 bigData
533      */
534     std::vector<Entry> entries;
535     std::vector<Key> keys;
536     DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
537     for (const auto &entry : entries) {
538         status = g_kvDelegatePtr->Put(entry.key, entry.value);
539         ASSERT_TRUE(status == OK);
540     }
541 
542     /**
543      * @tc.steps: step2. deviceA call sync and wait
544      * @tc.expected: step2. sync should return OK.
545      */
546     std::map<std::string, DBStatus> result;
547     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
548     ASSERT_TRUE(status == OK);
549 
550     /**
551      * @tc.expected: step2. onComplete should be called, DeviceB,C have {k1,v1}
552      */
553     ASSERT_TRUE(result.size() == devices.size());
554     for (const auto &pair : result) {
555         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
556         EXPECT_TRUE(pair.second == OK);
557     }
558     VirtualDataItem item;
559     for (const auto &entry : entries) {
560         item.value.clear();
561         g_deviceB->GetData(entry.key, item);
562         EXPECT_TRUE(item.value == entry.value);
563         item.value.clear();
564         g_deviceC->GetData(entry.key, item);
565         EXPECT_TRUE(item.value == entry.value);
566     }
567 }
568 
569 /**
570  * @tc.name: BigDataSync002
571  * @tc.desc: big data sync pull mode.
572  * @tc.type: FUNC
573  * @tc.require: AR000F3OOU
574  * @tc.author: wangchuanqing
575  */
576 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync002, TestSize.Level1)
577 {
578     DBStatus status = OK;
579     std::vector<std::string> devices;
580     devices.push_back(g_deviceB->GetDeviceId());
581     devices.push_back(g_deviceC->GetDeviceId());
582 
583     /**
584      * @tc.steps: step1. deviceA deviceB put bigData
585      */
586     std::vector<Entry> entries;
587     std::vector<Key> keys;
588     DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
589 
590     for (uint32_t i = 0; i < entries.size(); i++) {
591         if (i % 2 == 0) {
592             g_deviceB->PutData(entries[i].key, entries[i].value, 0, 0);
593         } else {
594             g_deviceC->PutData(entries[i].key, entries[i].value, 0, 0);
595         }
596     }
597 
598     /**
599      * @tc.steps: step3. deviceA call pull sync
600      * @tc.expected: step3. sync should return OK.
601      */
602     std::map<std::string, DBStatus> result;
603     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
604     ASSERT_TRUE(status == OK);
605 
606     /**
607      * @tc.expected: step3. onComplete should be called, DeviceA have all bigData
608      */
609     ASSERT_TRUE(result.size() == devices.size());
610     for (const auto &pair : result) {
611         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
612         EXPECT_TRUE(pair.second == OK);
613     }
614     for (const auto &entry : entries) {
615         Value value;
616         EXPECT_EQ(g_kvDelegatePtr->Get(entry.key, value), OK);
617         EXPECT_EQ(value, entry.value);
618     }
619 }
620 
621 /**
622  * @tc.name: BigDataSync003
623  * @tc.desc: big data sync pushAndPull mode.
624  * @tc.type: FUNC
625  * @tc.require: AR000F3OOV
626  * @tc.author: wangchuanqing
627  */
628 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync003, TestSize.Level1)
629 {
630     DBStatus status = OK;
631     std::vector<std::string> devices;
632     devices.push_back(g_deviceB->GetDeviceId());
633     devices.push_back(g_deviceC->GetDeviceId());
634 
635     /**
636      * @tc.steps: step1. deviceA deviceB put bigData
637      */
638     std::vector<Entry> entries;
639     std::vector<Key> keys;
640     DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
641 
642     for (uint32_t i = 0; i < entries.size(); i++) {
643         if (i % 3 == 0) { // 0 3 6 9 12 15 for deivec B
644             g_deviceB->PutData(entries[i].key, entries[i].value, 0, 0);
645         } else if (i % 3 == 1) { // 1 4 7 10 13 16 for device C
646             g_deviceC->PutData(entries[i].key, entries[i].value, 0, 0);
647         } else { // 2 5 8 11 14 for device A
648             status = g_kvDelegatePtr->Put(entries[i].key, entries[i].value);
649             ASSERT_TRUE(status == OK);
650         }
651     }
652 
653     /**
654      * @tc.steps: step3. deviceA call pushAndpull sync
655      * @tc.expected: step3. sync should return OK.
656      */
657     std::map<std::string, DBStatus> result;
658     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
659     ASSERT_TRUE(status == OK);
660 
661     /**
662      * @tc.expected: step3. onComplete should be called, DeviceA have all bigData
663      * deviceB and deviceC has deviceA data
664      */
665     ASSERT_TRUE(result.size() == devices.size());
666     for (const auto &pair : result) {
667         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
668         EXPECT_TRUE(pair.second == OK);
669     }
670 
671     VirtualDataItem item;
672     for (uint32_t i = 0; i < entries.size(); i++) {
673         Value value;
674         EXPECT_EQ(g_kvDelegatePtr->Get(entries[i].key, value), OK);
675         EXPECT_EQ(value, entries[i].value);
676 
677         if (i % 3 == 2) { // 2 5 8 11 14 for device A
678         item.value.clear();
679         g_deviceB->GetData(entries[i].key, item);
680         EXPECT_TRUE(item.value == entries[i].value);
681         item.value.clear();
682         g_deviceC->GetData(entries[i].key, item);
683         EXPECT_TRUE(item.value == entries[i].value);
684         }
685     }
686 }
687 #endif
688 
689 /**
690  * @tc.name: PushFinishedNotify 001
691  * @tc.desc: Test remote device push finished notify function.
692  * @tc.type: FUNC
693  * @tc.require: AR000CQS3S
694  * @tc.author: xushaohua
695  */
696 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, PushFinishedNotify001, TestSize.Level1)
697 {
698     std::vector<std::string> devices;
699     devices.push_back(g_deviceB->GetDeviceId());
700 
701     /**
702      * @tc.steps: step1. deviceA call SetRemotePushFinishedNotify
703      * @tc.expected: step1. set should return OK.
704      */
705     int pushfinishedFlag = 0;
706     DBStatus status = g_kvDelegatePtr->SetRemotePushFinishedNotify(
__anon6e997c5f0f02(const RemotePushNotifyInfo &info) 707         [&pushfinishedFlag](const RemotePushNotifyInfo &info) {
708             EXPECT_TRUE(info.deviceId == DEVICE_B);
709             pushfinishedFlag = 1;
710     });
711     ASSERT_EQ(status, OK);
712 
713     /**
714      * @tc.steps: step2. deviceB put k2, v2, and deviceA pull from deviceB
715      * @tc.expected: step2. deviceA can not receive push finished notify
716      */
717     EXPECT_EQ(g_kvDelegatePtr->Put(KEY_2, VALUE_2), OK);
718     std::map<std::string, DBStatus> result;
719     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
720     EXPECT_TRUE(status == OK);
721     EXPECT_EQ(pushfinishedFlag, 0);
722     pushfinishedFlag = 0;
723 
724     /**
725      * @tc.steps: step3. deviceB put k3, v3, and deviceA push and pull to deviceB
726      * @tc.expected: step3. deviceA can not receive push finished notify
727      */
728     EXPECT_EQ(g_kvDelegatePtr->Put(KEY_3, VALUE_3), OK);
729     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
730     EXPECT_TRUE(status == OK);
731     EXPECT_EQ(pushfinishedFlag, 0);
732     pushfinishedFlag = 0;
733 
734     /**
735      * @tc.steps: step4. deviceA call SetRemotePushFinishedNotify to reset notify
736      * @tc.expected: step4. set should return OK.
737      */
__anon6e997c5f1002(const RemotePushNotifyInfo &info) 738     status = g_kvDelegatePtr->SetRemotePushFinishedNotify([&pushfinishedFlag](const RemotePushNotifyInfo &info) {
739         EXPECT_TRUE(info.deviceId == DEVICE_B);
740         pushfinishedFlag = 2;
741     });
742     ASSERT_EQ(status, OK);
743 
744     /**
745      * @tc.steps: step5. deviceA call SetRemotePushFinishedNotify set null to unregist
746      * @tc.expected: step5. set should return OK.
747      */
748     status = g_kvDelegatePtr->SetRemotePushFinishedNotify(nullptr);
749     ASSERT_EQ(status, OK);
750 }
751 
752 namespace {
RegOnDispatchWithDelayAck(bool & errCodeAck,bool & afterErrAck)753 void RegOnDispatchWithDelayAck(bool &errCodeAck, bool &afterErrAck)
754 {
755     // just delay the busy ack
756     g_communicatorAggregator->RegOnDispatch([&errCodeAck, &afterErrAck](const std::string &dev, Message *inMsg) {
757         if (dev != g_deviceB->GetDeviceId()) {
758             return;
759         }
760         auto *packet = inMsg->GetObject<DataAckPacket>();
761         if (packet != nullptr && packet->GetRecvCode() == -E_BUSY) {
762             errCodeAck = true;
763             while (!afterErrAck) {
764             }
765             LOGW("NOW SEND BUSY ACK");
766         } else if (errCodeAck) {
767             afterErrAck = true;
768             std::this_thread::sleep_for(std::chrono::seconds(1));
769         }
770     });
771 }
772 
RegOnDispatchWithOffline(bool & offlineFlag,bool & invalid,condition_variable & conditionOffline)773 void RegOnDispatchWithOffline(bool &offlineFlag, bool &invalid, condition_variable &conditionOffline)
774 {
775     g_communicatorAggregator->RegOnDispatch([&offlineFlag, &invalid, &conditionOffline](
776                                                 const std::string &dev, Message *inMsg) {
777         auto *packet = inMsg->GetObject<DataAckPacket>();
778         if (dev != DEVICE_B) {
779             if (packet != nullptr && (packet->GetRecvCode() == LOCAL_WATER_MARK_NOT_INIT)) {
780                 offlineFlag = true;
781                 conditionOffline.notify_all();
782                 LOGW("[Dispatch] NOTIFY OFFLINE");
783                 std::this_thread::sleep_for(std::chrono::microseconds(EIGHT_HUNDRED));
784             }
785         } else if (!invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
786             LOGW("[Dispatch] NOW INVALID THIS MSG");
787             inMsg->SetMessageType(TYPE_INVALID);
788             inMsg->SetMessageId(INVALID_MESSAGE_ID);
789             invalid = true;
790         }
791     });
792 }
793 
RegOnDispatchWithInvalidMsg(bool & invalid)794 void RegOnDispatchWithInvalidMsg(bool &invalid)
795 {
796     g_communicatorAggregator->RegOnDispatch([&invalid](
797         const std::string &dev, Message *inMsg) {
798         if (dev == DEVICE_B && !invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
799             LOGW("[Dispatch] NOW INVALID THIS MSG");
800             inMsg->SetMessageType(TYPE_INVALID);
801             inMsg->SetMessageId(INVALID_MESSAGE_ID);
802             invalid = true;
803         }
804     });
805 }
806 
PrepareEnv(vector<std::string> & devices,Key & key,Query & query)807 void PrepareEnv(vector<std::string> &devices, Key &key, Query &query)
808 {
809     /**
810      * @tc.steps: step1. ensure the watermark is no zero and finish timeSync and abilitySync
811      * @tc.expected: step1. should return OK.
812      */
813     Value value = {'1'};
814     std::map<std::string, DBStatus> result;
815     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
816 
817     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_ONLY, result, query);
818     EXPECT_TRUE(status == OK);
819     ASSERT_TRUE(result[g_deviceB->GetDeviceId()] == OK);
820 }
821 
Sync(KvStoreNbDelegate * kvDelegatePtr,vector<std::string> & devices,const DBStatus & targetStatus)822 void Sync(KvStoreNbDelegate *kvDelegatePtr, vector<std::string> &devices, const DBStatus &targetStatus)
823 {
824     std::map<std::string, DBStatus> result;
825     DBStatus status = g_tool.SyncTest(kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_ONLY, result);
826     EXPECT_TRUE(status == OK);
827     for (const auto &deviceId : devices) {
828         ASSERT_TRUE(result[deviceId] == targetStatus);
829     }
830 }
831 
Sync(vector<std::string> & devices,const DBStatus & targetStatus)832 void Sync(vector<std::string> &devices, const DBStatus &targetStatus)
833 {
834     Sync(g_kvDelegatePtr, devices, targetStatus);
835 }
836 
SyncWithQuery(vector<std::string> & devices,const Query & query,const SyncMode & mode,const DBStatus & targetStatus)837 void SyncWithQuery(vector<std::string> &devices, const Query &query, const SyncMode &mode,
838     const DBStatus &targetStatus)
839 {
840     std::map<std::string, DBStatus> result;
841     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, mode, result, query);
842     EXPECT_TRUE(status == OK);
843     for (const auto &deviceId : devices) {
844         if (targetStatus == COMM_FAILURE) {
845             // If syncTaskContext of deviceB is scheduled to be executed first, ClearAllSyncTask is
846             // invoked when OfflineHandleByDevice is triggered, and SyncOperation::Finished() is triggered in advance.
847             // The returned status is COMM_FAILURE.
848             // If syncTaskContext of deviceB is not executed first, the error code is transparently transmitted.
849             EXPECT_TRUE((result[deviceId] == static_cast<DBStatus>(-E_PERIPHERAL_INTERFACE_FAIL)) ||
850                 (result[deviceId] == COMM_FAILURE));
851         } else {
852             ASSERT_EQ(result[deviceId], targetStatus);
853         }
854     }
855 }
856 
SyncWithQuery(vector<std::string> & devices,const Query & query,const DBStatus & targetStatus)857 void SyncWithQuery(vector<std::string> &devices, const Query &query, const DBStatus &targetStatus)
858 {
859     SyncWithQuery(devices, query, DistributedDB::SYNC_MODE_PUSH_ONLY, targetStatus);
860 }
861 
SyncWithDeviceOffline(vector<std::string> & devices,Key & key,const Query & query)862 void SyncWithDeviceOffline(vector<std::string> &devices, Key &key, const Query &query)
863 {
864     Value value = {'2'};
865     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
866 
867     /**
868      * @tc.steps: step2. invalid the sync msg
869      * @tc.expected: step2. should return TIME_OUT.
870      */
871     SyncWithQuery(devices, query, TIME_OUT);
872 
873     /**
874      * @tc.steps: step3. device offline when sync
875      * @tc.expected: step3. should return COMM_FAILURE.
876      */
877     // If syncTaskContext of deviceB is scheduled to be executed first, ClearAllSyncTask is
878     // invoked when OfflineHandleByDevice is triggered, and SyncOperation::Finished() is triggered in advance.
879     // The returned status is COMM_FAILURE
880     SyncWithQuery(devices, query, COMM_FAILURE);
881 }
882 
PrepareWaterMarkError(std::vector<std::string> & devices,Query & query)883 void PrepareWaterMarkError(std::vector<std::string> &devices, Query &query)
884 {
885     /**
886      * @tc.steps: step1. prepare data
887      */
888     devices.push_back(g_deviceB->GetDeviceId());
889     g_deviceB->Online();
890 
891     Key key = {'1'};
892     query = Query::Select().PrefixKey(key);
893     PrepareEnv(devices, key, query);
894 
895     /**
896      * @tc.steps: step2. query sync and set queryWaterMark
897      * @tc.expected: step2. should return OK.
898      */
899     Value value = {'2'};
900     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
901     SyncWithQuery(devices, query, OK);
902 
903     /**
904      * @tc.steps: step3. sync and invalid msg for set local device waterMark
905      * @tc.expected: step3. should return TIME_OUT.
906      */
907     bool invalidMsg = false;
908     RegOnDispatchWithInvalidMsg(invalidMsg);
909     value = {'3'};
910     ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
911     Sync(devices, TIME_OUT);
912     g_communicatorAggregator->RegOnDispatch(nullptr);
913 }
914 
RegOnDispatchWithoutDataPacket(std::atomic<int> & messageCount,bool calResponse=false)915 void RegOnDispatchWithoutDataPacket(std::atomic<int> &messageCount, bool calResponse = false)
916 {
917     g_communicatorAggregator->RegOnDispatch([calResponse, &messageCount](const std::string &dev, Message *msg) {
918         if (msg->GetMessageId() != TIME_SYNC_MESSAGE && msg->GetMessageId() != ABILITY_SYNC_MESSAGE) {
919             return;
920         }
921         if (dev != DEVICE_B || (!calResponse && msg->GetMessageType() != TYPE_REQUEST)) {
922             return;
923         }
924         messageCount++;
925     });
926 }
927 
ReOpenDB()928 void ReOpenDB()
929 {
930     ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
931     g_kvDelegatePtr = nullptr;
932     KvStoreNbDelegate::Option option;
933     option.secOption.securityLabel = SecurityLabel::S3;
934     option.secOption.securityFlag = SecurityFlag::SECE;
935     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
936     ASSERT_TRUE(g_kvDelegateStatus == OK);
937     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
938 }
939 }
940 
941 /**
942  * @tc.name: AckSessionCheck 001
943  * @tc.desc: Test ack session check function.
944  * @tc.type: FUNC
945  * @tc.require: AR000F3OOV
946  * @tc.author: zhangqiquan
947  */
948 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, AckSessionCheck001, TestSize.Level3)
949 {
950     std::vector<std::string> devices;
951     devices.push_back(g_deviceB->GetDeviceId());
952 
953     /**
954      * @tc.steps: step1. deviceB sync to deviceA just for timeSync and abilitySync
955      * @tc.expected: step1. should return OK.
956      */
957     ASSERT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == OK);
958 
959     /**
960      * @tc.steps: step2. deviceA StartTransaction for prevent other sync action deviceB sync will fail
961      * @tc.expected: step2. should return OK.
962      */
963     ASSERT_TRUE(g_kvDelegatePtr->StartTransaction() == OK);
964 
965     bool errCodeAck = false;
966     bool afterErrAck = false;
967     RegOnDispatchWithDelayAck(errCodeAck, afterErrAck);
968 
969     Key key = {'1'};
970     Value value = {'1'};
971     Timestamp currentTime;
972     (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
973     EXPECT_TRUE(g_deviceB->PutData(key, value, currentTime, 0) == E_OK);
974     EXPECT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == OK);
975 
976     Value outValue;
977     EXPECT_TRUE(g_kvDelegatePtr->Get(key, outValue) == NOT_FOUND);
978 
979     /**
980      * @tc.steps: step3. release the writeHandle and try again, sync success
981      * @tc.expected: step3. should return OK.
982      */
983     EXPECT_TRUE(g_kvDelegatePtr->Commit() == OK);
984     EXPECT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == OK);
985 
986     EXPECT_TRUE(g_kvDelegatePtr->Get(key, outValue) == E_OK);
987     EXPECT_EQ(outValue, value);
988 }
989 
990 /**
991  * @tc.name: AckSafeCheck001
992  * @tc.desc: Test ack session check filter all bad ack in device offline scene.
993  * @tc.type: FUNC
994  * @tc.require: AR000F3OOV
995  * @tc.author: zhangqiquan
996  */
997 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, AckSafeCheck001, TestSize.Level3)
998 {
999     std::vector<std::string> devices;
1000     devices.push_back(g_deviceB->GetDeviceId());
1001     g_deviceB->Online();
1002 
1003     Key key = {'1'};
1004     Query query = Query::Select().PrefixKey(key);
1005     PrepareEnv(devices, key, query);
1006 
1007     std::condition_variable conditionOnline;
1008     std::condition_variable conditionOffline;
1009     bool onlineFlag = false;
1010     bool invalid = false;
1011     bool offlineFlag = false;
__anon6e997c5f1602() 1012     thread subThread([&onlineFlag, &conditionOnline, &offlineFlag, &conditionOffline]() {
1013         LOGW("[Dispatch] NOW DEVICES IS OFFLINE");
1014         std::mutex offlineMtx;
1015         std::unique_lock<std::mutex> lck(offlineMtx);
1016         conditionOffline.wait(lck, [&offlineFlag]{ return offlineFlag; });
1017         g_deviceB->Offline();
1018         std::this_thread::sleep_for(std::chrono::seconds(1));
1019         g_deviceB->Online();
1020         onlineFlag = true;
1021         conditionOnline.notify_all();
1022         LOGW("[Dispatch] NOW DEVICES IS ONLINE");
1023     });
1024     subThread.detach();
1025 
1026     RegOnDispatchWithOffline(offlineFlag, invalid, conditionOffline);
1027 
1028     SyncWithDeviceOffline(devices, key, query);
1029 
1030     std::mutex onlineMtx;
1031     std::unique_lock<std::mutex> lck(onlineMtx);
__anon6e997c5f1802null1032     conditionOnline.wait(lck, [&onlineFlag]{ return onlineFlag; });
1033 
1034     /**
1035      * @tc.steps: step4. sync again if has problem it will sync never end
1036      * @tc.expected: step4. should return OK.
1037      */
1038     SyncWithQuery(devices, query, OK);
1039 }
1040 
1041 /**
1042  * @tc.name: WaterMarkCheck001
1043  * @tc.desc: Test waterMark work correct in lost package scene.
1044  * @tc.type: FUNC
1045  * @tc.require: AR000F3OOV
1046  * @tc.author: zhangqiquan
1047  */
1048 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, WaterMarkCheck001, TestSize.Level1)
1049 {
1050     std::vector<std::string> devices;
1051     Query query = Query::Select();
1052     PrepareWaterMarkError(devices, query);
1053 
1054     /**
1055      * @tc.steps: step4. sync again see it work correct
1056      * @tc.expected: step4. should return OK.
1057      */
1058     SyncWithQuery(devices, query, OK);
1059 }
1060 
1061 /**
1062  * @tc.name: WaterMarkCheck002
1063  * @tc.desc: Test pull work correct in error waterMark scene.
1064  * @tc.type: FUNC
1065  * @tc.require: AR000F3OOV
1066  * @tc.author: zhangqiquan
1067  */
1068 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, WaterMarkCheck002, TestSize.Level1)
1069 {
1070     std::vector<std::string> devices;
1071     Query query = Query::Select();
1072     PrepareWaterMarkError(devices, query);
1073 
1074     /**
1075      * @tc.steps: step4. sync again see it work correct
1076      * @tc.expected: step4. should return OK.
1077      */
1078     Key key = {'2'};
1079     ASSERT_TRUE(g_kvDelegatePtr->Put(key, {}) == OK);
1080     query = Query::Select();
1081     SyncWithQuery(devices, query, DistributedDB::SYNC_MODE_PULL_ONLY, OK);
1082 
1083     VirtualDataItem item;
1084     EXPECT_EQ(g_deviceB->GetData(key, item), -E_NOT_FOUND);
1085 }
1086 
RegOnDispatchToGetSyncCount(int & sendRequestCount,int sleepMs=0)1087 void RegOnDispatchToGetSyncCount(int &sendRequestCount, int sleepMs = 0)
1088 {
1089     g_communicatorAggregator->RegOnDispatch([sleepMs, &sendRequestCount](
1090             const std::string &dev, Message *inMsg) {
1091         if (dev == DEVICE_B && inMsg->GetMessageType() == TYPE_REQUEST) {
1092             std::this_thread::sleep_for(std::chrono::milliseconds(sleepMs));
1093             sendRequestCount++;
1094             LOGD("sendRequestCount++...");
1095         }
1096     });
1097 }
1098 
TestDifferentSyncMode(SyncMode mode)1099 void TestDifferentSyncMode(SyncMode mode)
1100 {
1101     std::vector<std::string> devices;
1102     devices.push_back(g_deviceB->GetDeviceId());
1103 
1104     /**
1105      * @tc.steps: step1. deviceA put {k1, v1}
1106      */
1107     Key key = {'1'};
1108     Value value = {'1'};
1109     DBStatus status = g_kvDelegatePtr->Put(key, value);
1110     ASSERT_TRUE(status == OK);
1111 
1112     int sendRequestCount = 0;
1113     RegOnDispatchToGetSyncCount(sendRequestCount);
1114 
1115     /**
1116      * @tc.steps: step2. deviceA call sync and wait
1117      * @tc.expected: step2. sync should return OK.
1118      */
1119     std::map<std::string, DBStatus> result;
1120     status = g_tool.SyncTest(g_kvDelegatePtr, devices, mode, result);
1121     ASSERT_TRUE(status == OK);
1122 
1123     /**
1124      * @tc.expected: step2. onComplete should be called, DeviceB have {k1,v1}, send request message 3 times
1125      */
1126     ASSERT_TRUE(result.size() == devices.size());
1127     for (const auto &pair : result) {
1128         LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1129         EXPECT_TRUE(pair.second == OK);
1130     }
1131     VirtualDataItem item;
1132     g_deviceB->GetData(key, item);
1133     EXPECT_TRUE(item.value == value);
1134 
1135     EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1136 
1137     /**
1138      * @tc.steps: step3. reset sendRequestCount to 0, deviceA call sync and wait again without any change in db
1139      * @tc.expected: step3. sync should return OK, and sendRequestCount should be 1, because this merge can not
1140      * be skipped
1141      */
1142     sendRequestCount = 0;
1143     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
1144     ASSERT_TRUE(status == OK);
1145     EXPECT_EQ(sendRequestCount, 1);
1146 }
1147 
1148 /**
1149  * @tc.name: PushSyncMergeCheck001
1150  * @tc.desc: Test push sync task merge, task can not be merged when the two sync task is not in the queue
1151  * at the same time.
1152  * @tc.type: FUNC
1153  * @tc.require: AR000F3OOV
1154  * @tc.author: zhangshijie
1155  */
1156 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck001, TestSize.Level1)
1157 {
1158     TestDifferentSyncMode(SYNC_MODE_PUSH_ONLY);
1159 }
1160 
1161 /**
1162  * @tc.name: PushSyncMergeCheck002
1163  * @tc.desc: Test push_pull sync task merge, task can not be merged when the two sync task is not in the queue
1164  * at the same time.
1165  * @tc.type: FUNC
1166  * @tc.require: AR000F3OOV
1167  * @tc.author: zhangshijie
1168  */
1169 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck002, TestSize.Level1)
1170 {
1171     TestDifferentSyncMode(SYNC_MODE_PUSH_PULL);
1172 }
1173 
PrepareForSyncMergeTest(std::vector<std::string> & devices,int & sendRequestCount)1174 void PrepareForSyncMergeTest(std::vector<std::string> &devices, int &sendRequestCount)
1175 {
1176     /**
1177      * @tc.steps: step1. deviceA put {k1, v1}
1178      */
1179     Key key = {'1'};
1180     Value value = {'1'};
1181     DBStatus status = g_kvDelegatePtr->Put(key, value);
1182     ASSERT_TRUE(status == OK);
1183 
1184     RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
1185 
1186     /**
1187      * @tc.steps: step2. deviceA call sync and don't wait
1188      * @tc.expected: step2. sync should return OK.
1189      */
1190     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
1191         [&sendRequestCount, devices, key, value](const std::map<std::string, DBStatus>& statusMap) {
1192         ASSERT_TRUE(statusMap.size() == devices.size());
1193         for (const auto &pair : statusMap) {
1194             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1195             EXPECT_TRUE(pair.second == OK);
1196         }
1197         VirtualDataItem item;
1198         g_deviceB->GetData(key, item);
1199         EXPECT_EQ(item.value, value);
1200         EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1201 
1202         // reset sendRequestCount to 0
1203         sendRequestCount = 0;
1204     });
1205     ASSERT_TRUE(status == OK);
1206 }
1207 
1208 /**
1209  * @tc.name: PushSyncMergeCheck003
1210  * @tc.desc: Test push sync task merge, task can not be merged when there is change in db since last push sync
1211  * @tc.type: FUNC
1212  * @tc.require: AR000F3OOV
1213  * @tc.author: zhangshijie
1214  */
1215 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck003, TestSize.Level3)
1216 {
1217     DBStatus status = OK;
1218     std::vector<std::string> devices;
1219     devices.push_back(g_deviceB->GetDeviceId());
1220 
1221     int sendRequestCount = 0;
1222     PrepareForSyncMergeTest(devices, sendRequestCount);
1223 
1224     /**
1225      * @tc.steps: step3. deviceA call sync and don't wait
1226      * @tc.expected: step3. sync should return OK.
1227      */
1228     Key key = {'1'};
1229     Value value = {'2'};
1230     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon6e997c5f1b02(const std::map<std::string, DBStatus>& statusMap) 1231         [&sendRequestCount, devices, key, value, this](const std::map<std::string, DBStatus>& statusMap) {
1232         /**
1233          * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1234          * skipped, but it is no need to do time sync and ability sync, only need to do data sync
1235          */
1236         ASSERT_TRUE(statusMap.size() == devices.size());
1237         for (const auto &pair : statusMap) {
1238             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1239             EXPECT_TRUE(pair.second == OK);
1240         }
1241         VirtualDataItem item;
1242         g_deviceB->GetData(key, item);
1243         EXPECT_EQ(item.value, value);
1244     });
1245     ASSERT_TRUE(status == OK);
1246 
1247     /**
1248      * @tc.steps: step4. deviceA put {k1, v2}
1249      */
1250     while (sendRequestCount < TWO_CNT) {
1251         std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1252     }
1253     status = g_kvDelegatePtr->Put(key, value);
1254     ASSERT_TRUE(status == OK);
1255     // wait for the second sync task finish
1256     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1257     EXPECT_EQ(sendRequestCount, 1);
1258 }
1259 
1260 /**
1261  * @tc.name: PushSyncMergeCheck004
1262  * @tc.desc: Test push sync task merge, task can be merged when there is no change in db since last push sync
1263  * @tc.type: FUNC
1264  * @tc.require: AR000F3OOV
1265  * @tc.author: zhangshijie
1266  */
1267 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck004, TestSize.Level3)
1268 {
1269     DBStatus status = OK;
1270     std::vector<std::string> devices;
1271     devices.push_back(g_deviceB->GetDeviceId());
1272 
1273     int sendRequestCount = 0;
1274     PrepareForSyncMergeTest(devices, sendRequestCount);
1275 
1276     /**
1277      * @tc.steps: step3. deviceA call sync and don't wait
1278      * @tc.expected: step3. sync should return OK.
1279      */
1280     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon6e997c5f1c02(const std::map<std::string, DBStatus>& statusMap) 1281         [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1282         /**
1283          * @tc.expected: when the second sync task return, sendRequestCount should be 0, because this merge can  be
1284          * skipped
1285          */
1286         ASSERT_TRUE(statusMap.size() == devices.size());
1287         for (const auto &pair : statusMap) {
1288             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1289             EXPECT_TRUE(pair.second == OK);
1290         }
1291     });
1292     ASSERT_TRUE(status == OK);
1293     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1294     EXPECT_EQ(sendRequestCount, 0);
1295 }
1296 
RegOnDispatchWithInvalidMsgAndCnt(int & sendRequestCount,int sleepMs,bool & invalid)1297 void RegOnDispatchWithInvalidMsgAndCnt(int &sendRequestCount, int sleepMs, bool &invalid)
1298 {
1299     g_communicatorAggregator->RegOnDispatch([&sendRequestCount, sleepMs, &invalid](
1300         const std::string &dev, Message *inMsg) {
1301         if (dev == DEVICE_B && !invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
1302             inMsg->SetMessageType(TYPE_INVALID);
1303             inMsg->SetMessageId(INVALID_MESSAGE_ID);
1304             sendRequestCount++;
1305             invalid = true;
1306             LOGW("[Dispatch]invalid THIS MSG, sendRequestCount = %d", sendRequestCount);
1307             std::this_thread::sleep_for(std::chrono::milliseconds(sleepMs));
1308         }
1309     });
1310 }
1311 
1312 /**
1313  * @tc.name: PushSyncMergeCheck005
1314  * @tc.desc: Test push sync task merge, task cannot be merged when the last push sync is failed
1315  * @tc.type: FUNC
1316  * @tc.require: AR000F3OOV
1317  * @tc.author: zhangshijie
1318  */
1319 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck005, TestSize.Level3)
1320 {
1321     DBStatus status = OK;
1322     std::vector<std::string> devices;
1323     devices.push_back(g_deviceB->GetDeviceId());
1324 
1325     /**
1326      * @tc.steps: step1. deviceA put {k1, v1}
1327      */
1328     Key key = {'1'};
1329     Value value = {'1'};
1330     status = g_kvDelegatePtr->Put(key, value);
1331     ASSERT_TRUE(status == OK);
1332 
1333     int sendRequestCount = 0;
1334     bool invalid = false;
1335     RegOnDispatchWithInvalidMsgAndCnt(sendRequestCount, SLEEP_MILLISECONDS, invalid);
1336 
1337     /**
1338      * @tc.steps: step2. deviceA call sync and don't wait
1339      * @tc.expected: step2. sync should return TIME_OUT.
1340      */
1341     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon6e997c5f1e02(const std::map<std::string, DBStatus>& statusMap) 1342         [&sendRequestCount, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1343         ASSERT_TRUE(statusMap.size() == devices.size());
1344         for (const auto &deviceId : devices) {
1345             ASSERT_EQ(statusMap.at(deviceId), TIME_OUT);
1346         }
1347     });
1348     EXPECT_TRUE(status == OK);
1349 
1350     /**
1351      * @tc.steps: step3. deviceA call sync and don't wait
1352      * @tc.expected: step3. sync should return OK.
1353      */
1354     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon6e997c5f1f02(const std::map<std::string, DBStatus>& statusMap) 1355         [key, value, &sendRequestCount, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1356         /**
1357          * @tc.expected: when the second sync task return, sendRequestCount should be 3, because this merge can not be
1358          * skipped, deviceB should have {k1, v1}.
1359          */
1360         ASSERT_TRUE(statusMap.size() == devices.size());
1361         for (const auto &pair : statusMap) {
1362             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1363             EXPECT_EQ(pair.second, OK);
1364         }
1365         VirtualDataItem item;
1366         g_deviceB->GetData(key, item);
1367         EXPECT_EQ(item.value, value);
1368     });
1369     ASSERT_TRUE(status == OK);
1370     while (sendRequestCount < 1) {
1371         std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1372     }
1373     sendRequestCount = 0;
1374     RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
1375 
1376     // wait for the second sync task finish
1377     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1378     EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1379 }
1380 
PrePareForQuerySyncMergeTest(bool isQuerySync,std::vector<std::string> & devices,Key & key,Value & value,int & sendRequestCount)1381 void PrePareForQuerySyncMergeTest(bool isQuerySync, std::vector<std::string> &devices,
1382     Key &key, Value &value, int &sendRequestCount)
1383 {
1384     DBStatus status = OK;
1385     /**
1386      * @tc.steps: step1. deviceA put {k1, v1}...{k10, v10}
1387      */
1388     Query query = Query::Select().PrefixKey(key);
1389     const int dataSize = 10;
1390     for (int i = 0; i < dataSize; i++) {
1391         key.push_back(i);
1392         value.push_back(i);
1393         status = g_kvDelegatePtr->Put(key, value);
1394         ASSERT_TRUE(status == OK);
1395         key.pop_back();
1396         value.pop_back();
1397     }
1398 
1399     RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
1400     /**
1401      * @tc.steps: step2. deviceA call query sync and don't wait
1402      * @tc.expected: step2. sync should return OK.
1403      */
1404     auto completeCallBack = [&sendRequestCount, &key, &value, dataSize, devices]
1405         (const std::map<std::string, DBStatus>& statusMap) {
1406         ASSERT_TRUE(statusMap.size() == devices.size());
1407         for (const auto &pair : statusMap) {
1408             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1409             EXPECT_EQ(pair.second, OK);
1410         }
1411         // when first sync finish, DeviceB have {k1,v1}, {k3,v3}, {k5,v5} .. send request message 3 times
1412         VirtualDataItem item;
1413         for (int i = 0; i < dataSize; i++) {
1414             key.push_back(i);
1415             value.push_back(i);
1416             g_deviceB->GetData(key, item);
1417             EXPECT_EQ(item.value, value);
1418             key.pop_back();
1419             value.pop_back();
1420         }
1421         EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1422         // reset sendRequestCount to 0
1423         sendRequestCount = 0;
1424     };
1425     if (isQuerySync) {
1426         status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, completeCallBack, query, false);
1427     } else {
1428         status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, completeCallBack);
1429     }
1430     ASSERT_TRUE(status == OK);
1431 }
1432 
1433 /**
1434  * @tc.name: QuerySyncMergeCheck001
1435  * @tc.desc: Test query push sync task merge, task can be merged when there is no change in db since last query sync
1436  * @tc.type: FUNC
1437  * @tc.require: AR000F3OOV
1438  * @tc.author: zhangshijie
1439  */
1440 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck001, TestSize.Level3)
1441 {
1442     std::vector<std::string> devices;
1443     int sendRequestCount = 0;
1444     devices.push_back(g_deviceB->GetDeviceId());
1445 
1446     Key key {'1'};
1447     Value value {'1'};
1448     Query query = Query::Select().PrefixKey(key);
1449     PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1450 
1451     /**
1452      * @tc.steps: step3. deviceA call query sync and don't wait
1453      * @tc.expected: step3. sync should return OK.
1454      */
1455     DBStatus status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon6e997c5f2102(const std::map<std::string, DBStatus>& statusMap) 1456         [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1457         /**
1458          * @tc.expected: when the second sync task return, sendRequestCount should be 0, because this merge can be
1459          * skipped because there is no change in db since last query sync
1460          */
1461         ASSERT_TRUE(statusMap.size() == devices.size());
1462         for (const auto &pair : statusMap) {
1463             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1464             EXPECT_TRUE(pair.second == OK);
1465         }
1466     }, query, false);
1467     ASSERT_TRUE(status == OK);
1468     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1469     EXPECT_EQ(sendRequestCount, 0);
1470 }
1471 
1472 /**
1473  * @tc.name: QuerySyncMergeCheck002
1474  * @tc.desc: Test query push sync task merge, task can not be merged when there is change in db since last sync
1475  * @tc.type: FUNC
1476  * @tc.require: AR000F3OOV
1477  * @tc.author: zhangshijie
1478  */
1479 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck002, TestSize.Level3)
1480 {
1481     std::vector<std::string> devices;
1482     int sendRequestCount = 0;
1483     devices.push_back(g_deviceB->GetDeviceId());
1484 
1485     Key key {'1'};
1486     Value value {'1'};
1487     Query query = Query::Select().PrefixKey(key);
1488     PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1489 
1490     /**
1491      * @tc.steps: step3. deviceA call query sync and don't wait
1492      * @tc.expected: step3. sync should return OK.
1493      */
1494     Value value3{'3'};
1495     DBStatus status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon6e997c5f2202(const std::map<std::string, DBStatus>& statusMap) 1496         [&sendRequestCount, devices, key, value3, this](const std::map<std::string, DBStatus>& statusMap) {
1497         /**
1498          * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1499          * skipped when there is change in db since last query sync, deviceB have {k1, v1'}
1500          */
1501         ASSERT_TRUE(statusMap.size() == devices.size());
1502         for (const auto &pair : statusMap) {
1503             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1504             EXPECT_TRUE(pair.second == OK);
1505         }
1506         VirtualDataItem item;
1507         g_deviceB->GetData(key, item);
1508         EXPECT_TRUE(item.value == value3);
1509         EXPECT_EQ(sendRequestCount, 1);
1510         }, query, false);
1511     ASSERT_TRUE(status == OK);
1512 
1513     /**
1514      * @tc.steps: step4. deviceA put {k1, v1'}
1515      * @tc.steps: step4. reset sendRequestCount to 0, deviceA call sync and wait
1516      * @tc.expected: step4. sync should return OK, and sendRequestCount should be 1, because this merge can not
1517      * be skipped
1518      */
1519     while (sendRequestCount < TWO_CNT) {
1520         std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1521     }
1522     g_kvDelegatePtr->Put(key, value3);
1523     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1524 }
1525 
1526 /**
1527  * @tc.name: QuerySyncMergeCheck003
1528  * @tc.desc: Test query push sync task merge, task can not be merged when then query id is different
1529  * @tc.type: FUNC
1530  * @tc.require: AR000F3OOV
1531  * @tc.author: zhangshijie
1532  */
1533 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck003, TestSize.Level3)
1534 {
1535     std::vector<std::string> devices;
1536     int sendRequestCount = 0;
1537     devices.push_back(g_deviceB->GetDeviceId());
1538 
1539     Key key {'1'};
1540     Value value {'1'};
1541     PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1542 
1543     /**
1544      * @tc.steps: step3.  deviceA call another query sync
1545      * @tc.expected: step3. sync should return OK.
1546      */
1547     Key key2 = {'2'};
1548     Value value2 = {'2'};
1549     DBStatus status = g_kvDelegatePtr->Put(key2, value2);
1550     ASSERT_TRUE(status == OK);
1551     Query query2 = Query::Select().PrefixKey(key2);
1552     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon6e997c5f2302(const std::map<std::string, DBStatus>& statusMap) 1553         [&sendRequestCount, key2, value2, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1554         /**
1555          * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1556          * skipped, deviceB have {k2,v2}
1557          */
1558         ASSERT_TRUE(statusMap.size() == devices.size());
1559         for (const auto &pair : statusMap) {
1560             LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1561             EXPECT_TRUE(pair.second == OK);
1562         }
1563         VirtualDataItem item;
1564         g_deviceB->GetData(key2, item);
1565         EXPECT_TRUE(item.value == value2);
1566         EXPECT_EQ(sendRequestCount, 1);
1567         }, query2, false);
1568     ASSERT_TRUE(status == OK);
1569     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1570 }
1571 
1572 /**
1573 * @tc.name: QuerySyncMergeCheck004
1574 * @tc.desc: Test query push sync task merge, task can be merged when there is no change in db since last push sync
1575 * @tc.type: FUNC
1576 * @tc.require: AR000F3OOV
1577 * @tc.author: zhangshijie
1578 */
1579 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck004, TestSize.Level3)
1580 {
1581     DBStatus status = OK;
1582     std::vector<std::string> devices;
1583     devices.push_back(g_deviceB->GetDeviceId());
1584 
1585     Key key {'1'};
1586     Value value {'1'};
1587     int sendRequestCount = 0;
1588     PrePareForQuerySyncMergeTest(false, devices, key, value, sendRequestCount);
1589 
1590     /**
1591      * @tc.steps: step3. deviceA call query sync without any change in db
1592      * @tc.expected: step3. sync should return OK, and sendRequestCount should be 0, because this merge can be skipped
1593      */
1594     Query query = Query::Select().PrefixKey(key);
1595     status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon6e997c5f2402(const std::map<std::string, DBStatus>& statusMap) 1596         [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1597             /**
1598              * @tc.expected step3: when the second sync task return, sendRequestCount should be 0, because this merge
1599              * can be skipped because there is no change in db since last push sync
1600              */
1601             ASSERT_TRUE(statusMap.size() == devices.size());
1602             for (const auto &pair : statusMap) {
1603                 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1604                 EXPECT_TRUE(pair.second == OK);
1605             }
1606         }, query, false);
1607     ASSERT_TRUE(status == OK);
1608     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1609     EXPECT_EQ(sendRequestCount, 0);
1610 }
1611 
1612 /**
1613   * @tc.name: GetDataNotify001
1614   * @tc.desc: Test GetDataNotify function, delay < 30s should sync ok, > 36 should timeout
1615   * @tc.type: FUNC
1616   * @tc.require: AR000D4876
1617   * @tc.author: zhangqiquan
1618   */
1619 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, GetDataNotify001, TestSize.Level3)
1620 {
1621     ASSERT_NE(g_kvDelegatePtr, nullptr);
1622     DBStatus status = OK;
1623     std::vector<std::string> devices;
1624     devices.push_back(g_deviceB->GetDeviceId());
1625     const std::string DEVICE_A = "real_device";
1626     /**
1627      * @tc.steps: step1. deviceB set get data delay 40s
1628      */
1629     g_deviceB->DelayGetSyncData(WAIT_40_SECONDS);
1630     g_communicatorAggregator->SetTimeout(DEVICE_A, TIMEOUT_6_SECONDS);
1631 
1632     /**
1633      * @tc.steps: step2. deviceA call sync and wait
1634      * @tc.expected: step2. sync should return OK. onComplete should be called, deviceB sync TIME_OUT.
1635      */
1636     std::map<std::string, DBStatus> result;
1637     std::map<std::string, int> virtualRes;
1638     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1639     EXPECT_EQ(status, OK);
1640     EXPECT_EQ(result.size(), devices.size());
1641     EXPECT_EQ(result[DEVICE_B], TIME_OUT);
1642     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1643     Query query = Query::Select();
__anon6e997c5f2502(std::map<std::string, int> resMap) 1644     g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
1645         virtualRes = std::move(resMap);
1646     }, true);
1647     EXPECT_EQ(virtualRes.size(), devices.size());
1648     EXPECT_EQ(virtualRes[DEVICE_A], static_cast<int>(SyncOperation::OP_TIMEOUT));
1649     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1650 
1651     /**
1652      * @tc.steps: step3. deviceB set get data delay 30s
1653      */
1654     g_deviceB->DelayGetSyncData(WAIT_30_SECONDS);
1655     /**
1656      * @tc.steps: step4. deviceA call sync and wait
1657      * @tc.expected: step4. sync should return OK. onComplete should be called, deviceB sync OK.
1658      */
1659     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1660     EXPECT_EQ(status, OK);
1661     EXPECT_EQ(result.size(), devices.size());
1662     EXPECT_EQ(result[DEVICE_B], OK);
1663     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
__anon6e997c5f2602(std::map<std::string, int> resMap) 1664     g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
1665         virtualRes = std::move(resMap);
1666     }, true);
1667     EXPECT_EQ(virtualRes.size(), devices.size());
1668     EXPECT_EQ(virtualRes[DEVICE_A], static_cast<int>(SyncOperation::OP_FINISHED_ALL));
1669     g_deviceB->DelayGetSyncData(0);
1670 }
1671 
1672 /**
1673   * @tc.name: GetDataNotify002
1674   * @tc.desc: Test GetDataNotify function, two device sync each other at same time
1675   * @tc.type: FUNC
1676   * @tc.require: AR000D4876
1677   * @tc.author: zhangqiquan
1678   */
1679 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, GetDataNotify002, TestSize.Level3)
1680 {
1681     ASSERT_NE(g_kvDelegatePtr, nullptr);
1682     DBStatus status = OK;
1683     std::vector<std::string> devices;
1684     devices.push_back(g_deviceB->GetDeviceId());
1685     const std::string DEVICE_A = "real_device";
1686 
1687     /**
1688      * @tc.steps: step1. deviceA sync first to finish time sync and ability sync
1689      */
1690     std::map<std::string, DBStatus> result;
1691     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
1692     EXPECT_EQ(status, OK);
1693     EXPECT_EQ(result.size(), devices.size());
1694     EXPECT_EQ(result[DEVICE_B], OK);
1695     /**
1696      * @tc.steps: step2. deviceB set get data delay 30s
1697      */
1698     g_deviceB->DelayGetSyncData(WAIT_30_SECONDS);
1699 
1700     /**
1701      * @tc.steps: step3. deviceB call sync and wait
1702      */
__anon6e997c5f2702() 1703     std::thread asyncThread([]() {
1704         std::map<std::string, int> virtualRes;
1705         Query query = Query::Select();
1706         g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
1707                 virtualRes = std::move(resMap);
1708             }, true);
1709     });
1710 
1711     /**
1712      * @tc.steps: step4. deviceA call sync and wait
1713      * @tc.expected: step4. sync should return OK. because notify timer trigger (30s - 1s)/2s => 15times
1714      */
1715     std::this_thread::sleep_for(std::chrono::seconds(1));
1716     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
1717     EXPECT_EQ(status, OK);
1718     EXPECT_EQ(result.size(), devices.size());
1719     EXPECT_EQ(result[DEVICE_B], OK);
1720     asyncThread.join();
1721     std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1722 }
1723 
1724 /**
1725  * @tc.name: DelaySync001
1726  * @tc.desc: Test delay first packet will not effect data conflict
1727  * @tc.type: FUNC
1728  * @tc.require:
1729  * @tc.author: zqq
1730  */
1731 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, DelaySync001, TestSize.Level3)
1732 {
1733     // B put (k, b) after A put (k, a)
1734     Key key = {'k'};
1735     Value aValue = {'a'};
1736     g_kvDelegatePtr->Put(key, aValue);
1737     std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1s for data conflict
1738     Timestamp currentTime = TimeHelper::GetSysCurrentTime() + TimeHelper::BASE_OFFSET;
1739     Value bValue = {'b'};
1740     EXPECT_EQ(g_deviceB->PutData(key, bValue, currentTime, 0), E_OK);
1741 
1742     // delay time sync message, delay time/2 should greater than put sleep time
1743     g_communicatorAggregator->SetTimeout(DEVICE_B, DBConstant::MAX_TIMEOUT);
1744     g_communicatorAggregator->SetTimeout("real_device", DBConstant::MAX_TIMEOUT);
__anon6e997c5f2902(const std::string &dstTarget, const Message *msg) 1745     g_communicatorAggregator->RegBeforeDispatch([](const std::string &dstTarget, const Message *msg) {
1746         if (dstTarget == DEVICE_B && msg->GetMessageId() == MessageId::TIME_SYNC_MESSAGE) {
1747             std::this_thread::sleep_for(std::chrono::seconds(3)); // sleep for 3s
1748         }
1749     });
1750 
1751     // A call sync and (k, b) in A
1752     std::vector<std::string> devices;
1753     devices.push_back(g_deviceB->GetDeviceId());
1754     std::map<std::string, DBStatus> result;
1755     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1756     EXPECT_EQ(status, OK);
1757     EXPECT_EQ(result.size(), devices.size());
1758     EXPECT_EQ(result[DEVICE_B], OK);
1759 
1760     Value actualValue;
1761     g_kvDelegatePtr->Get(key, actualValue);
1762     EXPECT_EQ(actualValue, bValue);
1763     g_communicatorAggregator->RegBeforeDispatch(nullptr);
1764 }
1765 
1766 /**
1767  * @tc.name: KVAbilitySyncOpt001
1768  * @tc.desc: check ability sync 2 packet
1769  * @tc.type: FUNC
1770  * @tc.require:
1771  * @tc.author: zhangqiquan
1772  */
1773 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVAbilitySyncOpt001, TestSize.Level0)
1774 {
1775     /**
1776      * @tc.steps: step1. record packet
1777      * @tc.expected: step1. sync should failed in source.
1778      */
1779     std::atomic<int> messageCount = 0;
__anon6e997c5f2a02(const std::string &dev, Message *msg) 1780     g_communicatorAggregator->RegOnDispatch([&messageCount](const std::string &dev, Message *msg) {
1781         if (msg->GetMessageId() != ABILITY_SYNC_MESSAGE) {
1782             return;
1783         }
1784         messageCount++;
1785         EXPECT_GE(g_kvDelegatePtr->GetTaskCount(), 1);
1786     });
1787     /**
1788      * @tc.steps: step2. deviceA call sync and wait
1789      * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
1790      */
1791     DBStatus status = OK;
1792     std::vector<std::string> devices;
1793     devices.push_back(g_deviceB->GetDeviceId());
1794     std::map<std::string, DBStatus> result;
1795     status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
1796     EXPECT_EQ(status, OK);
1797     EXPECT_EQ(messageCount, 2); // 2 ability sync
1798     for (const auto &pair : result) {
1799         EXPECT_EQ(pair.second, OK);
1800     }
1801 }
1802 
1803 /**
1804  * @tc.name: KVAbilitySyncOpt002
1805  * @tc.desc: check get task count while conn is nullptr.
1806  * @tc.type: FUNC
1807  * @tc.require:
1808  * @tc.author: caihaoting
1809  */
1810 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVAbilitySyncOpt002, TestSize.Level0)
1811 {
1812     /**
1813      * @tc.steps: step1. record packet while conn is nullptr.
1814      * @tc.expected: step1. sync should failed in source and get task count return DB_ERROR.
1815      */
1816     auto kvStoreImpl = static_cast<KvStoreNbDelegateImpl *>(g_kvDelegatePtr);
1817     EXPECT_EQ(kvStoreImpl->Close(), OK);
1818     std::atomic<int> messageCount = 0;
__anon6e997c5f2b02(const std::string &dev, Message *msg) 1819     g_communicatorAggregator->RegOnDispatch([&messageCount](const std::string &dev, Message *msg) {
1820         if (msg->GetMessageId() != ABILITY_SYNC_MESSAGE) {
1821             return;
1822         }
1823         messageCount++;
1824         EXPECT_EQ(g_kvDelegatePtr->GetTaskCount(), DB_ERROR);
1825     });
1826 }
1827 
1828 /**
1829  * @tc.name: KVSyncOpt001
1830  * @tc.desc: check time sync and ability sync once
1831  * @tc.type: FUNC
1832  * @tc.require:
1833  * @tc.author: zhangqiquan
1834  */
1835 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt001, TestSize.Level0)
1836 {
1837     /**
1838      * @tc.steps: step1. record packet which send to B
1839      */
1840     std::atomic<int> messageCount = 0;
1841     RegOnDispatchWithoutDataPacket(messageCount);
1842     /**
1843      * @tc.steps: step2. deviceA call sync and wait
1844      * @tc.expected: step2. sync should return OK.
1845      */
1846     std::vector<std::string> devices;
1847     devices.push_back(g_deviceB->GetDeviceId());
1848     Sync(devices, OK);
1849     EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
1850     /**
1851      * @tc.steps: step3. reopen kv store
1852      * @tc.expected: step3. reopen OK.
1853      */
1854     ReOpenDB();
1855     /**
1856      * @tc.steps: step4. reopen kv store and sync again
1857      * @tc.expected: step4. reopen OK and sync success, no negotiation packet.
1858      */
1859     messageCount = 0;
1860     Sync(devices, OK);
1861     EXPECT_EQ(messageCount, 0);
1862     g_communicatorAggregator->RegOnDispatch(nullptr);
1863 }
1864 
1865 /**
1866  * @tc.name: KVSyncOpt002
1867  * @tc.desc: check device time sync once
1868  * @tc.type: FUNC
1869  * @tc.require:
1870  * @tc.author: zhangqiquan
1871  */
1872 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt002, TestSize.Level0)
1873 {
1874 /**
1875      * @tc.steps: step1. record packet which send to B
1876      */
1877     std::atomic<int> messageCount = 0;
1878     RegOnDispatchWithoutDataPacket(messageCount);
1879     /**
1880      * @tc.steps: step2. deviceA call sync and wait
1881      * @tc.expected: step2. sync should return OK.
1882      */
1883     std::vector<std::string> devices;
1884     devices.push_back(g_deviceB->GetDeviceId());
1885     Sync(devices, OK);
1886     EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
1887     // close kv store avoid packet dispatch error
1888     ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
1889     g_kvDelegatePtr = nullptr;
1890     ASSERT_EQ(g_mgr.DeleteKvStore(STORE_ID), OK);
1891     EXPECT_TRUE(RuntimeContext::GetInstance()->IsTimeTickMonitorValid());
1892     /**
1893      * @tc.steps: step3. open new kv store
1894      * @tc.expected: step3. open OK.
1895      */
1896     KvStoreNbDelegate::Option option;
1897     option.secOption.securityLabel = SecurityLabel::S3;
1898     option.secOption.securityFlag = SecurityFlag::SECE;
1899     KvStoreNbDelegate *delegate2 = nullptr;
__anon6e997c5f2c02(DBStatus status, KvStoreNbDelegate *delegate) 1900     g_mgr.GetKvStore(STORE_ID_2, option, [&delegate2](DBStatus status, KvStoreNbDelegate *delegate) {
1901         delegate2 = delegate;
1902         EXPECT_EQ(status, OK);
1903     });
1904     /**
1905      * @tc.steps: step4. sync again
1906      * @tc.expected: step4. sync success, only ability sync packet.
1907      */
1908     messageCount = 0;
1909     Sync(delegate2, devices, OK);
1910     EXPECT_EQ(messageCount, 1); // 1 contain ability sync packet
1911     EXPECT_EQ(g_mgr.CloseKvStore(delegate2), OK);
1912     EXPECT_EQ(g_mgr.DeleteKvStore(STORE_ID_2), OK);
1913     g_communicatorAggregator->RegOnDispatch(nullptr);
1914 }
1915 
1916 /**
1917  * @tc.name: KVSyncOpt003
1918  * @tc.desc: check time sync and ability sync once
1919  * @tc.type: FUNC
1920  * @tc.require:
1921  * @tc.author: zhangqiquan
1922  */
1923 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt003, TestSize.Level0)
1924 {
1925     /**
1926      * @tc.steps: step1. record packet which send to B
1927      */
1928     std::atomic<int> messageCount = 0;
1929     RegOnDispatchWithoutDataPacket(messageCount);
1930     /**
1931      * @tc.steps: step2. deviceA call sync and wait
1932      * @tc.expected: step2. sync should return OK.
1933      */
1934     std::vector<std::string> devices;
1935     devices.push_back(g_deviceB->GetDeviceId());
1936     Sync(devices, OK);
1937     EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
1938     /**
1939      * @tc.steps: step3. reopen kv store
1940      * @tc.expected: step3. reopen OK.
1941      */
1942     ReOpenDB();
1943     /**
1944      * @tc.steps: step4. reopen kv store and sync again
1945      * @tc.expected: step4. reopen OK and sync success, no negotiation packet.
1946      */
1947     messageCount = 0;
1948     EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
1949     EXPECT_EQ(messageCount, 0);
1950     g_communicatorAggregator->RegOnDispatch(nullptr);
1951 }
1952 
1953 /**
1954  * @tc.name: KVSyncOpt004
1955  * @tc.desc: check sync in keys after reopen db
1956  * @tc.type: FUNC
1957  * @tc.require:
1958  * @tc.author: zhangqiquan
1959  */
1960 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt004, TestSize.Level0)
1961 {
1962     /**
1963      * @tc.steps: step1. deviceA call sync and wait
1964      * @tc.expected: step1. sync should return OK.
1965      */
1966     std::vector<std::string> devices;
1967     devices.push_back(g_deviceB->GetDeviceId());
1968     Sync(devices, OK);
1969     /**
1970      * @tc.steps: step2. reopen kv store
1971      * @tc.expected: step2. reopen OK.
1972      */
1973     ReOpenDB();
1974     /**
1975      * @tc.steps: step3. sync with in keys
1976      * @tc.expected: step3. sync OK.
1977      */
1978     std::map<std::string, DBStatus> result;
1979     std::set<Key> condition;
1980     condition.insert({'k'});
1981     Query query = Query::Select().InKeys(condition);
1982     DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_ONLY, result, query);
1983     EXPECT_EQ(status, OK);
1984     for (const auto &deviceId : devices) {
1985         EXPECT_EQ(result[deviceId], OK);
1986     }
1987 }
1988 
1989 /**
1990  * @tc.name: KVSyncOpt005
1991  * @tc.desc: check record ability finish after receive ability sync
1992  * @tc.type: FUNC
1993  * @tc.require:
1994  * @tc.author: zhangqiquan
1995  */
1996 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt005, TestSize.Level0)
1997 {
1998     /**
1999      * @tc.steps: step1. record packet which send to B
2000      */
2001     std::atomic<int> messageCount = 0;
2002     RegOnDispatchWithoutDataPacket(messageCount, true);
2003     /**
2004      * @tc.steps: step2. deviceB call sync and wait
2005      * @tc.expected: step2. sync should return OK.
2006      */
2007     EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2008     EXPECT_EQ(messageCount, 2); // DEV_A send negotiation 2 ack packet.
2009     /**
2010      * @tc.steps: step3. reopen kv store
2011      * @tc.expected: step3. reopen OK.
2012      */
2013     ReOpenDB();
2014     /**
2015      * @tc.steps: step4. reopen kv store and sync again
2016      * @tc.expected: step4. reopen OK and sync success, no negotiation packet.
2017      */
2018     messageCount = 0;
2019     EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2020     EXPECT_EQ(messageCount, 0);
2021     g_communicatorAggregator->RegOnDispatch(nullptr);
2022 }
2023 
2024 /**
2025  * @tc.name: KVSyncOpt006
2026  * @tc.desc: check time sync and ability sync once after rebuild
2027  * @tc.type: FUNC
2028  * @tc.require:
2029  * @tc.author: zhangqiquan
2030  */
2031 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt006, TestSize.Level0)
2032 {
2033     /**
2034      * @tc.steps: step1. record packet which send to B
2035      */
2036     std::atomic<int> messageCount = 0;
2037     RegOnDispatchWithoutDataPacket(messageCount, true);
2038     /**
2039      * @tc.steps: step2. deviceA call sync and wait
2040      * @tc.expected: step2. sync should return OK.
2041      */
2042     std::vector<std::string> devices;
2043     devices.push_back(g_deviceB->GetDeviceId());
2044     EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2045     EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
2046     /**
2047      * @tc.steps: step3. rebuild kv store
2048      * @tc.expected: step3. rebuild OK.
2049      */
2050     ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
2051     g_kvDelegatePtr = nullptr;
2052     g_mgr.DeleteKvStore(STORE_ID);
2053     KvStoreNbDelegate::Option option;
2054     option.secOption.securityLabel = SecurityLabel::S3;
2055     option.secOption.securityFlag = SecurityFlag::SECE;
2056     g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
2057     ASSERT_TRUE(g_kvDelegateStatus == OK);
2058     ASSERT_TRUE(g_kvDelegatePtr != nullptr);
2059     /**
2060      * @tc.steps: step4. rebuild kv store and sync again
2061      * @tc.expected: step4. rebuild OK and sync success, re ability sync.
2062      */
2063     messageCount = 0;
2064     EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2065     EXPECT_EQ(messageCount, 1);
2066     g_communicatorAggregator->RegOnDispatch(nullptr);
2067 }
2068 
2069 /**
2070  * @tc.name: KVSyncOpt007
2071  * @tc.desc: check re ability sync after import
2072  * @tc.type: FUNC
2073  * @tc.require:
2074  * @tc.author: zhangqiquan
2075  */
2076 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt007, TestSize.Level0)
2077 {
2078     /**
2079      * @tc.steps: step1. record packet which send to B
2080      */
2081     std::atomic<int> messageCount = 0;
2082     RegOnDispatchWithoutDataPacket(messageCount, true);
2083     /**
2084      * @tc.steps: step2. deviceB call sync and wait
2085      * @tc.expected: step2. sync should return OK.
2086      */
2087     EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2088     EXPECT_EQ(messageCount, 2); // DEV_A send negotiation 2 ack packet.
2089     /**
2090      * @tc.steps: step3. export and import
2091      * @tc.expected: step3. export and import OK.
2092      */
2093     std::string singleExportFileName = g_testDir + "/KVSyncOpt007.$$";
2094     CipherPassword passwd;
2095     EXPECT_EQ(g_kvDelegatePtr->Export(singleExportFileName, passwd), OK);
2096     EXPECT_EQ(g_kvDelegatePtr->Import(singleExportFileName, passwd), OK);
2097     /**
2098      * @tc.steps: step4. reopen kv store and sync again
2099      * @tc.expected: step4. reopen OK and sync success, no negotiation packet.
2100      */
2101     messageCount = 0;
2102     EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2103     EXPECT_EQ(messageCount, 1); // DEV_A send negotiation 1 ack packet.
2104     g_communicatorAggregator->RegOnDispatch(nullptr);
2105 }
2106 
2107 /**
2108  * @tc.name: KVTimeChange001
2109  * @tc.desc: check time sync and ability sync once
2110  * @tc.type: FUNC
2111  * @tc.require:
2112  * @tc.author: zhangqiquan
2113  */
2114 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVTimeChange001, TestSize.Level0)
2115 {
2116     /**
2117      * @tc.steps: step1. record packet which send to B
2118      */
2119     std::atomic<int> messageCount = 0;
2120     RegOnDispatchWithoutDataPacket(messageCount);
2121     /**
2122      * @tc.steps: step2. deviceA call sync and wait
2123      * @tc.expected: step2. sync should return OK.
2124      */
2125     std::vector<std::string> devices;
2126     devices.push_back(g_deviceB->GetDeviceId());
2127     Sync(devices, OK);
2128     EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
2129     /**
2130      * @tc.steps: step3. sync again
2131      * @tc.expected: step3. sync success, no negotiation packet.
2132      */
2133     messageCount = 0;
2134     Sync(devices, OK);
2135     EXPECT_EQ(messageCount, 0);
2136     /**
2137      * @tc.steps: step4. modify time offset and sync again
2138      * @tc.expected: step4. sync success, only time sync packet.
2139      */
2140     RuntimeContext::GetInstance()->NotifyTimestampChanged(100);
2141     RuntimeContext::GetInstance()->RecordAllTimeChange();
2142     RuntimeContext::GetInstance()->ClearAllDeviceTimeInfo();
2143     messageCount = 0;
2144     Sync(devices, OK);
2145     EXPECT_EQ(messageCount, 1); // 1 contain time sync request packet
2146     messageCount = 0;
2147     EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2148     EXPECT_EQ(messageCount, 0);
2149     g_communicatorAggregator->RegOnDispatch(nullptr);
2150 }
2151 }