1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <gtest/gtest.h>
17
18 #include "ability_sync.h"
19 #include "distributeddb_data_generate_unit_test.h"
20 #include "distributeddb_tools_unit_test.h"
21 #include "kv_store_nb_delegate_impl.h"
22 #include "kv_virtual_device.h"
23 #include "platform_specific.h"
24 #include "process_system_api_adapter_impl.h"
25 #include "single_ver_data_packet.h"
26 #include "virtual_communicator_aggregator.h"
27
28 using namespace testing::ext;
29 using namespace DistributedDB;
30 using namespace DistributedDBUnitTest;
31 using namespace std;
32
33 namespace {
// Root directory for this suite's database files; filled in by SetUpTestCase.
string g_testDir;
// Store id shared by every test in this file.
const string STORE_ID = "kv_stroe_sync_check_test";
// Device ids of the two virtual peers created in SetUp.
const std::string DEVICE_B = "deviceB";
const std::string DEVICE_C = "deviceC";
// Ack receive-code value that RegOnDispatchWithOffline watches for.
const int LOCAL_WATER_MARK_NOT_INIT = 0xaa;
// Used as a microsecond sleep duration in RegOnDispatchWithOffline.
const int EIGHT_HUNDRED = 800;
const int NORMAL_SYNC_SEND_REQUEST_CNT = 3;
const int TWO_CNT = 2;
const int SLEEP_MILLISECONDS = 500;
const int TEN_SECONDS = 10;
const int THREE_HUNDRED = 300;
// NOTE(review): the values below look like millisecond durations - confirm at the use sites.
const int WAIT_30_SECONDS = 30000;
const int WAIT_40_SECONDS = 40000;
const int TIMEOUT_6_SECONDS = 6000;

// Delegate manager and configuration for the local store (deviceA).
KvStoreDelegateManager g_mgr(APP_ID, USER_ID);
KvStoreConfig g_config;
DistributedDBToolsUnitTest g_tool;
// Outputs written by g_kvDelegateCallback when GetKvStore completes.
DBStatus g_kvDelegateStatus = INVALID_ARGS;
KvStoreNbDelegate* g_kvDelegatePtr = nullptr;
VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
// Virtual peers and their storage interfaces; (re)created in SetUp for every test.
KvVirtualDevice* g_deviceB = nullptr;
KvVirtualDevice* g_deviceC = nullptr;
VirtualSingleVerSyncDBInterface *g_syncInterfaceB = nullptr;
VirtualSingleVerSyncDBInterface *g_syncInterfaceC = nullptr;

// g_kvDelegateCallback is callable as void(DBStatus, KvStoreNbDelegate*); the two bound
// references receive the status and the delegate pointer.
auto g_kvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
    placeholders::_1, placeholders::_2, std::ref(g_kvDelegateStatus), std::ref(g_kvDelegatePtr));
#ifndef LOW_LEVEL_MEM_DEV
const int KEY_LEN = 20; // key length in bytes passed to GenerateRecords
const int VALUE_LEN = 4 * 1024 * 1024; // value length: 4MB
const int ENTRY_NUM = 2; // number of big entries generated per BigDataSync test
#endif
68
69 class DistributedDBSingleVerP2PSyncCheckTest : public testing::Test {
70 public:
71 static void SetUpTestCase(void);
72 static void TearDownTestCase(void);
73 void SetUp();
74 void TearDown();
75 };
76
SetUpTestCase(void)77 void DistributedDBSingleVerP2PSyncCheckTest::SetUpTestCase(void)
78 {
79 /**
80 * @tc.setup: Init datadir and Virtual Communicator.
81 */
82 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
83 g_config.dataDir = g_testDir;
84 g_mgr.SetKvStoreConfig(g_config);
85
86 string dir = g_testDir + "/single_ver";
87 DIR* dirTmp = opendir(dir.c_str());
88 if (dirTmp == nullptr) {
89 OS::MakeDBDirectory(dir);
90 } else {
91 closedir(dirTmp);
92 }
93
94 g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
95 ASSERT_TRUE(g_communicatorAggregator != nullptr);
96 RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
97
98 std::shared_ptr<ProcessSystemApiAdapterImpl> g_adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
99 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(g_adapter);
100 }
101
TearDownTestCase(void)102 void DistributedDBSingleVerP2PSyncCheckTest::TearDownTestCase(void)
103 {
104 /**
105 * @tc.teardown: Release virtual Communicator and clear data dir.
106 */
107 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
108 LOGE("rm test db files error!");
109 }
110 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
111 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
112 }
113
SetUp(void)114 void DistributedDBSingleVerP2PSyncCheckTest::SetUp(void)
115 {
116 DistributedDBToolsUnitTest::PrintTestCaseInfo();
117 /**
118 * @tc.setup: create virtual device B and C, and get a KvStoreNbDelegate as deviceA
119 */
120 KvStoreNbDelegate::Option option;
121 option.secOption.securityLabel = SecurityLabel::S3;
122 option.secOption.securityFlag = SecurityFlag::SECE;
123 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
124 ASSERT_TRUE(g_kvDelegateStatus == OK);
125 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
126 g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
127 ASSERT_TRUE(g_deviceB != nullptr);
128 g_syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
129 ASSERT_TRUE(g_syncInterfaceB != nullptr);
130 ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, g_syncInterfaceB), E_OK);
131 SecurityOption virtualOption;
132 virtualOption.securityLabel = option.secOption.securityLabel;
133 virtualOption.securityFlag = option.secOption.securityFlag;
134 g_syncInterfaceB->SetSecurityOption(virtualOption);
135
136 g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
137 ASSERT_TRUE(g_deviceC != nullptr);
138 g_syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
139 ASSERT_TRUE(g_syncInterfaceC != nullptr);
140 ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, g_syncInterfaceC), E_OK);
141 g_syncInterfaceC->SetSecurityOption(virtualOption);
142 RuntimeContext::GetInstance()->ClearAllDeviceTimeInfo();
143 }
144
TearDown(void)145 void DistributedDBSingleVerP2PSyncCheckTest::TearDown(void)
146 {
147 /**
148 * @tc.teardown: Release device A, B, C
149 */
150 if (g_kvDelegatePtr != nullptr) {
151 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
152 g_kvDelegatePtr = nullptr;
153 DBStatus status = g_mgr.DeleteKvStore(STORE_ID);
154 LOGD("delete kv store status %d", status);
155 ASSERT_TRUE(status == OK);
156 }
157 if (g_deviceB != nullptr) {
158 delete g_deviceB;
159 g_deviceB = nullptr;
160 }
161 if (g_deviceC != nullptr) {
162 delete g_deviceC;
163 g_deviceC = nullptr;
164 }
165 if (g_communicatorAggregator != nullptr) {
166 g_communicatorAggregator->RegOnDispatch(nullptr);
167 }
168 }
169
170 /**
171 * @tc.name: sec option check Sync 001
172 * @tc.desc: if sec option not equal, forbid sync
173 * @tc.type: FUNC
174 * @tc.require: AR000EV1G6
175 * @tc.author: wangchuanqing
176 */
177 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck001, TestSize.Level1)
178 {
179 DBStatus status = OK;
180 std::vector<std::string> devices;
181 devices.push_back(g_deviceB->GetDeviceId());
182 devices.push_back(g_deviceC->GetDeviceId());
183
184 /**
185 * @tc.steps: step1. deviceA put {k1, v1}
186 */
187 Key key = {'1'};
188 Value value = {'1'};
189 status = g_kvDelegatePtr->Put(key, value);
190 ASSERT_TRUE(status == OK);
191
192 ASSERT_TRUE(g_syncInterfaceB != nullptr);
193 ASSERT_TRUE(g_syncInterfaceC != nullptr);
194 SecurityOption secOption{SecurityLabel::S4, SecurityFlag::ECE};
195 g_syncInterfaceB->SetSecurityOption(secOption);
196 g_syncInterfaceC->SetSecurityOption(secOption);
197
198 /**
199 * @tc.steps: step2. deviceA call sync and wait
200 * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
201 */
202 std::map<std::string, DBStatus> result;
203 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
204 ASSERT_TRUE(status == OK);
205
206 ASSERT_TRUE(result.size() == devices.size());
207 for (const auto &pair : result) {
208 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
209 EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
210 }
211 VirtualDataItem item;
212 g_deviceB->GetData(key, item);
213 EXPECT_TRUE(item.value.empty());
214 g_deviceC->GetData(key, item);
215 EXPECT_TRUE(item.value.empty());
216 }
217
218 /**
219 * @tc.name: sec option check Sync 002
220 * @tc.desc: if sec option not equal, forbid sync
221 * @tc.type: FUNC
222 * @tc.require: AR000EV1G6
223 * @tc.author: wangchuanqing
224 */
225 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck002, TestSize.Level1)
226 {
227 DBStatus status = OK;
228 std::vector<std::string> devices;
229 devices.push_back(g_deviceB->GetDeviceId());
230 devices.push_back(g_deviceC->GetDeviceId());
231
232 /**
233 * @tc.steps: step1. deviceA put {k1, v1}
234 */
235 Key key = {'1'};
236 Value value = {'1'};
237 status = g_kvDelegatePtr->Put(key, value);
238 ASSERT_TRUE(status == OK);
239
240 ASSERT_TRUE(g_syncInterfaceC != nullptr);
241 SecurityOption secOption{SecurityLabel::S4, SecurityFlag::ECE};
242 g_syncInterfaceC->SetSecurityOption(secOption);
243 secOption.securityLabel = SecurityLabel::S3;
244 secOption.securityFlag = SecurityFlag::SECE;
245 g_syncInterfaceB->SetSecurityOption(secOption);
246
247 /**
248 * @tc.steps: step2. deviceA call sync and wait
249 * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
250 */
251 std::map<std::string, DBStatus> result;
252 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
253 ASSERT_TRUE(status == OK);
254
255 ASSERT_TRUE(result.size() == devices.size());
256 for (const auto &pair : result) {
257 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
258 if (pair.first == DEVICE_B) {
259 EXPECT_TRUE(pair.second == OK);
260 } else {
261 EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
262 }
263 }
264 VirtualDataItem item;
265 g_deviceC->GetData(key, item);
266 EXPECT_TRUE(item.value.empty());
267 g_deviceB->GetData(key, item);
268 EXPECT_TRUE(item.value == value);
269 }
270
271 /**
272 * @tc.name: sec option check Sync 003
273 * @tc.desc: if sec option equal, check not pass, forbid sync
274 * @tc.type: FUNC
275 * @tc.require: AR000EV1G6
276 * @tc.author: zhangqiquan
277 */
278 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck003, TestSize.Level1)
279 {
280 auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
281 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anon6e997c5f0202(const std::string &, const SecurityOption &) 282 adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &) {
283 return false;
284 });
285 /**
286 * @tc.steps: step1. record packet
287 * @tc.expected: step1. sync should failed in source.
288 */
289 std::atomic<int> messageCount = 0;
__anon6e997c5f0302(const std::string &, Message *) 290 g_communicatorAggregator->RegOnDispatch([&messageCount](const std::string &, Message *) {
291 messageCount++;
292 });
293 /**
294 * @tc.steps: step2. deviceA call sync and wait
295 * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
296 */
297 DBStatus status = OK;
298 std::vector<std::string> devices;
299 devices.push_back(g_deviceB->GetDeviceId());
300 std::map<std::string, DBStatus> result;
301 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
302 EXPECT_EQ(status, OK);
303 EXPECT_EQ(messageCount, 4); // 4 = 2 time sync + 2 ability sync
304 for (const auto &pair : result) {
305 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
306 EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
307 }
308 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
309 g_communicatorAggregator->RegOnDispatch(nullptr);
310 }
311
312 /**
313 * @tc.name: sec option check Sync 004
314 * @tc.desc: memory db not check device security
315 * @tc.type: FUNC
316 * @tc.require:
317 * @tc.author: zhangqiquan
318 */
319 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck004, TestSize.Level1)
320 {
321 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
322 g_kvDelegatePtr = nullptr;
323 KvStoreNbDelegate::Option option;
324 option.secOption.securityLabel = SecurityLabel::NOT_SET;
325 option.isMemoryDb = true;
326 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
327 ASSERT_TRUE(g_kvDelegateStatus == OK);
328 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
329
330 auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
331 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anon6e997c5f0402(const std::string &, const SecurityOption &) 332 adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &) {
333 return false;
334 });
__anon6e997c5f0502(const std::string &, SecurityOption &securityOption) 335 adapter->ForkGetSecurityOption([](const std::string &, SecurityOption &securityOption) {
336 securityOption.securityLabel = NOT_SET;
337 return OK;
338 });
__anon6e997c5f0602(SecurityOption &) 339 g_syncInterfaceB->ForkGetSecurityOption([](SecurityOption &) {
340 return -E_NOT_SUPPORT;
341 });
342
343 std::vector<std::string> devices;
344 devices.push_back(g_deviceB->GetDeviceId());
345 std::map<std::string, DBStatus> result;
346 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
347 EXPECT_EQ(status, OK);
348 for (const auto &pair : result) {
349 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
350 EXPECT_TRUE(pair.second == OK);
351 }
352
353 adapter->ForkCheckDeviceSecurityAbility(nullptr);
354 adapter->ForkGetSecurityOption(nullptr);
355 g_syncInterfaceB->ForkGetSecurityOption(nullptr);
356 }
357
358 /**
359 * @tc.name: sec option check Sync 005
360 * @tc.desc: if sec option equal, check not pass, forbid sync
361 * @tc.type: FUNC
362 * @tc.require:
363 * @tc.author: zhangqiquan
364 */
365 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck005, TestSize.Level1)
366 {
367 auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
368 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anon6e997c5f0702(SecurityOption &option) 369 g_syncInterfaceB->ForkGetSecurityOption([](SecurityOption &option) {
370 option.securityLabel = NOT_SET;
371 return E_OK;
372 });
__anon6e997c5f0802(const std::string &, SecurityOption &securityOption) 373 adapter->ForkGetSecurityOption([](const std::string &, SecurityOption &securityOption) {
374 securityOption.securityLabel = NOT_SET;
375 return OK;
376 });
377
378 std::vector<std::string> devices;
379 devices.push_back(g_deviceB->GetDeviceId());
380 std::map<std::string, DBStatus> result;
381 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
382 EXPECT_EQ(status, OK);
383 for (const auto &pair : result) {
384 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
385 EXPECT_TRUE(pair.second == SECURITY_OPTION_CHECK_ERROR);
386 }
387
388 adapter->ForkCheckDeviceSecurityAbility(nullptr);
389 adapter->ForkGetSecurityOption(nullptr);
390 g_syncInterfaceB->ForkGetSecurityOption(nullptr);
391 }
392
393 /**
394 * @tc.name: sec option check Sync 006
395 * @tc.desc: memory db not check device security
396 * @tc.type: FUNC
397 * @tc.require:
398 * @tc.author: zhangqiquan
399 */
400 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck006, TestSize.Level0)
401 {
402 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
403 ASSERT_EQ(g_mgr.DeleteKvStore(STORE_ID), OK);
404 g_kvDelegatePtr = nullptr;
405 KvStoreNbDelegate::Option option;
406 option.secOption.securityLabel = SecurityLabel::S1;
407 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
408 ASSERT_TRUE(g_kvDelegateStatus == OK);
409 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
410
411 auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
412 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anon6e997c5f0902(const std::string &, const SecurityOption &) 413 adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &) {
414 return true;
415 });
__anon6e997c5f0a02(const std::string &, SecurityOption &securityOption) 416 adapter->ForkGetSecurityOption([](const std::string &, SecurityOption &securityOption) {
417 securityOption.securityLabel = S1;
418 return OK;
419 });
__anon6e997c5f0b02(SecurityOption &option) 420 g_syncInterfaceB->ForkGetSecurityOption([](SecurityOption &option) {
421 option.securityLabel = SecurityLabel::S0;
422 return E_OK;
423 });
424
425 std::vector<std::string> devices;
426 devices.push_back(g_deviceB->GetDeviceId());
427 std::map<std::string, DBStatus> result;
428 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
429 EXPECT_EQ(status, OK);
430 for (const auto &pair : result) {
431 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
432 EXPECT_TRUE(pair.second == OK);
433 }
434
435 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(std::make_shared<ProcessSystemApiAdapterImpl>());
436 g_syncInterfaceB->ForkGetSecurityOption(nullptr);
437 }
438
439 /**
440 * @tc.name: sec option check Sync 007
441 * @tc.desc: sync should send security option
442 * @tc.type: FUNC
443 * @tc.require:
444 * @tc.author: zhangqiquan
445 */
446 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck007, TestSize.Level0)
447 {
448 /**
449 * @tc.steps: step1. fork check device security ability
450 * @tc.expected: step1. check param option should be S3 SECE.
451 */
452 auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
453 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
__anon6e997c5f0c02(const std::string &, const SecurityOption &option) 454 adapter->ForkCheckDeviceSecurityAbility([](const std::string &, const SecurityOption &option) {
455 EXPECT_EQ(option.securityLabel, SecurityLabel::S3);
456 EXPECT_EQ(option.securityFlag, SecurityFlag::SECE);
457 return true;
458 });
459 /**
460 * @tc.steps: step2. sync twice
461 * @tc.expected: step2. sync success.
462 */
463 std::vector<std::string> devices;
464 devices.push_back(g_deviceB->GetDeviceId());
465 std::map<std::string, DBStatus> result;
466 g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
467 auto status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
468 ASSERT_TRUE(status == OK);
469 ASSERT_TRUE(result.size() == devices.size());
470 for (const auto &pair : result) {
471 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
472 EXPECT_TRUE(pair.second == OK);
473 }
474 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(nullptr);
475 }
476
477 /**
478 * @tc.name: SecOptionCheck008
479 * @tc.desc: pull compress sync when check device ability fail
480 * @tc.type: FUNC
481 * @tc.require:
482 * @tc.author: zhangqiquan
483 */
484 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SecOptionCheck008, TestSize.Level0)
485 {
486 auto adapter = std::make_shared<ProcessSystemApiAdapterImpl>();
487 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(adapter);
488 auto deviceB = g_deviceB->GetDeviceId();
__anon6e997c5f0d02(const std::string &dev, const SecurityOption &) 489 adapter->ForkCheckDeviceSecurityAbility([deviceB](const std::string &dev, const SecurityOption &) {
490 if (dev != "real_device") {
491 return true;
492 }
493 return false;
494 });
__anon6e997c5f0e02(SecurityOption &option) 495 g_syncInterfaceB->ForkGetSecurityOption([](SecurityOption &option) {
496 option.securityLabel = SecurityLabel::S3;
497 option.securityFlag = SecurityFlag::SECE;
498 return E_OK;
499 });
500 g_syncInterfaceB->SetCompressSync(true);
501 std::vector<std::string> devices;
502 devices.push_back(deviceB);
503 std::map<std::string, DBStatus> result;
504 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
505 EXPECT_EQ(status, OK);
506 for (const auto &pair : result) {
507 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
508 EXPECT_EQ(pair.second, SECURITY_OPTION_CHECK_ERROR);
509 }
510
511 RuntimeContext::GetInstance()->SetProcessSystemApiAdapter(std::make_shared<ProcessSystemApiAdapterImpl>());
512 g_syncInterfaceB->ForkGetSecurityOption(nullptr);
513 g_syncInterfaceB->SetCompressSync(false);
514 }
515
516 #ifndef LOW_LEVEL_MEM_DEV
517 /**
518 * @tc.name: BigDataSync001
519 * @tc.desc: big data sync push mode.
520 * @tc.type: FUNC
521 * @tc.require: AR000F3OOU
522 * @tc.author: wangchuanqing
523 */
524 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync001, TestSize.Level1)
525 {
526 DBStatus status = OK;
527 std::vector<std::string> devices;
528 devices.push_back(g_deviceB->GetDeviceId());
529 devices.push_back(g_deviceC->GetDeviceId());
530
531 /**
532 * @tc.steps: step1. deviceA put 16 bigData
533 */
534 std::vector<Entry> entries;
535 std::vector<Key> keys;
536 DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
537 for (const auto &entry : entries) {
538 status = g_kvDelegatePtr->Put(entry.key, entry.value);
539 ASSERT_TRUE(status == OK);
540 }
541
542 /**
543 * @tc.steps: step2. deviceA call sync and wait
544 * @tc.expected: step2. sync should return OK.
545 */
546 std::map<std::string, DBStatus> result;
547 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
548 ASSERT_TRUE(status == OK);
549
550 /**
551 * @tc.expected: step2. onComplete should be called, DeviceB,C have {k1,v1}
552 */
553 ASSERT_TRUE(result.size() == devices.size());
554 for (const auto &pair : result) {
555 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
556 EXPECT_TRUE(pair.second == OK);
557 }
558 VirtualDataItem item;
559 for (const auto &entry : entries) {
560 item.value.clear();
561 g_deviceB->GetData(entry.key, item);
562 EXPECT_TRUE(item.value == entry.value);
563 item.value.clear();
564 g_deviceC->GetData(entry.key, item);
565 EXPECT_TRUE(item.value == entry.value);
566 }
567 }
568
569 /**
570 * @tc.name: BigDataSync002
571 * @tc.desc: big data sync pull mode.
572 * @tc.type: FUNC
573 * @tc.require: AR000F3OOU
574 * @tc.author: wangchuanqing
575 */
576 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync002, TestSize.Level1)
577 {
578 DBStatus status = OK;
579 std::vector<std::string> devices;
580 devices.push_back(g_deviceB->GetDeviceId());
581 devices.push_back(g_deviceC->GetDeviceId());
582
583 /**
584 * @tc.steps: step1. deviceA deviceB put bigData
585 */
586 std::vector<Entry> entries;
587 std::vector<Key> keys;
588 DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
589
590 for (uint32_t i = 0; i < entries.size(); i++) {
591 if (i % 2 == 0) {
592 g_deviceB->PutData(entries[i].key, entries[i].value, 0, 0);
593 } else {
594 g_deviceC->PutData(entries[i].key, entries[i].value, 0, 0);
595 }
596 }
597
598 /**
599 * @tc.steps: step3. deviceA call pull sync
600 * @tc.expected: step3. sync should return OK.
601 */
602 std::map<std::string, DBStatus> result;
603 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result);
604 ASSERT_TRUE(status == OK);
605
606 /**
607 * @tc.expected: step3. onComplete should be called, DeviceA have all bigData
608 */
609 ASSERT_TRUE(result.size() == devices.size());
610 for (const auto &pair : result) {
611 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
612 EXPECT_TRUE(pair.second == OK);
613 }
614 for (const auto &entry : entries) {
615 Value value;
616 EXPECT_EQ(g_kvDelegatePtr->Get(entry.key, value), OK);
617 EXPECT_EQ(value, entry.value);
618 }
619 }
620
621 /**
622 * @tc.name: BigDataSync003
623 * @tc.desc: big data sync pushAndPull mode.
624 * @tc.type: FUNC
625 * @tc.require: AR000F3OOV
626 * @tc.author: wangchuanqing
627 */
628 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, BigDataSync003, TestSize.Level1)
629 {
630 DBStatus status = OK;
631 std::vector<std::string> devices;
632 devices.push_back(g_deviceB->GetDeviceId());
633 devices.push_back(g_deviceC->GetDeviceId());
634
635 /**
636 * @tc.steps: step1. deviceA deviceB put bigData
637 */
638 std::vector<Entry> entries;
639 std::vector<Key> keys;
640 DistributedDBUnitTest::GenerateRecords(ENTRY_NUM, entries, keys, KEY_LEN, VALUE_LEN);
641
642 for (uint32_t i = 0; i < entries.size(); i++) {
643 if (i % 3 == 0) { // 0 3 6 9 12 15 for deivec B
644 g_deviceB->PutData(entries[i].key, entries[i].value, 0, 0);
645 } else if (i % 3 == 1) { // 1 4 7 10 13 16 for device C
646 g_deviceC->PutData(entries[i].key, entries[i].value, 0, 0);
647 } else { // 2 5 8 11 14 for device A
648 status = g_kvDelegatePtr->Put(entries[i].key, entries[i].value);
649 ASSERT_TRUE(status == OK);
650 }
651 }
652
653 /**
654 * @tc.steps: step3. deviceA call pushAndpull sync
655 * @tc.expected: step3. sync should return OK.
656 */
657 std::map<std::string, DBStatus> result;
658 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
659 ASSERT_TRUE(status == OK);
660
661 /**
662 * @tc.expected: step3. onComplete should be called, DeviceA have all bigData
663 * deviceB and deviceC has deviceA data
664 */
665 ASSERT_TRUE(result.size() == devices.size());
666 for (const auto &pair : result) {
667 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
668 EXPECT_TRUE(pair.second == OK);
669 }
670
671 VirtualDataItem item;
672 for (uint32_t i = 0; i < entries.size(); i++) {
673 Value value;
674 EXPECT_EQ(g_kvDelegatePtr->Get(entries[i].key, value), OK);
675 EXPECT_EQ(value, entries[i].value);
676
677 if (i % 3 == 2) { // 2 5 8 11 14 for device A
678 item.value.clear();
679 g_deviceB->GetData(entries[i].key, item);
680 EXPECT_TRUE(item.value == entries[i].value);
681 item.value.clear();
682 g_deviceC->GetData(entries[i].key, item);
683 EXPECT_TRUE(item.value == entries[i].value);
684 }
685 }
686 }
687 #endif
688
689 /**
690 * @tc.name: PushFinishedNotify 001
691 * @tc.desc: Test remote device push finished notify function.
692 * @tc.type: FUNC
693 * @tc.require: AR000CQS3S
694 * @tc.author: xushaohua
695 */
696 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, PushFinishedNotify001, TestSize.Level1)
697 {
698 std::vector<std::string> devices;
699 devices.push_back(g_deviceB->GetDeviceId());
700
701 /**
702 * @tc.steps: step1. deviceA call SetRemotePushFinishedNotify
703 * @tc.expected: step1. set should return OK.
704 */
705 int pushfinishedFlag = 0;
706 DBStatus status = g_kvDelegatePtr->SetRemotePushFinishedNotify(
__anon6e997c5f0f02(const RemotePushNotifyInfo &info) 707 [&pushfinishedFlag](const RemotePushNotifyInfo &info) {
708 EXPECT_TRUE(info.deviceId == DEVICE_B);
709 pushfinishedFlag = 1;
710 });
711 ASSERT_EQ(status, OK);
712
713 /**
714 * @tc.steps: step2. deviceB put k2, v2, and deviceA pull from deviceB
715 * @tc.expected: step2. deviceA can not receive push finished notify
716 */
717 EXPECT_EQ(g_kvDelegatePtr->Put(KEY_2, VALUE_2), OK);
718 std::map<std::string, DBStatus> result;
719 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
720 EXPECT_TRUE(status == OK);
721 EXPECT_EQ(pushfinishedFlag, 0);
722 pushfinishedFlag = 0;
723
724 /**
725 * @tc.steps: step3. deviceB put k3, v3, and deviceA push and pull to deviceB
726 * @tc.expected: step3. deviceA can not receive push finished notify
727 */
728 EXPECT_EQ(g_kvDelegatePtr->Put(KEY_3, VALUE_3), OK);
729 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_PULL, result);
730 EXPECT_TRUE(status == OK);
731 EXPECT_EQ(pushfinishedFlag, 0);
732 pushfinishedFlag = 0;
733
734 /**
735 * @tc.steps: step4. deviceA call SetRemotePushFinishedNotify to reset notify
736 * @tc.expected: step4. set should return OK.
737 */
__anon6e997c5f1002(const RemotePushNotifyInfo &info) 738 status = g_kvDelegatePtr->SetRemotePushFinishedNotify([&pushfinishedFlag](const RemotePushNotifyInfo &info) {
739 EXPECT_TRUE(info.deviceId == DEVICE_B);
740 pushfinishedFlag = 2;
741 });
742 ASSERT_EQ(status, OK);
743
744 /**
745 * @tc.steps: step5. deviceA call SetRemotePushFinishedNotify set null to unregist
746 * @tc.expected: step5. set should return OK.
747 */
748 status = g_kvDelegatePtr->SetRemotePushFinishedNotify(nullptr);
749 ASSERT_EQ(status, OK);
750 }
751
752 namespace {
RegOnDispatchWithDelayAck(bool & errCodeAck,bool & afterErrAck)753 void RegOnDispatchWithDelayAck(bool &errCodeAck, bool &afterErrAck)
754 {
755 // just delay the busy ack
756 g_communicatorAggregator->RegOnDispatch([&errCodeAck, &afterErrAck](const std::string &dev, Message *inMsg) {
757 if (dev != g_deviceB->GetDeviceId()) {
758 return;
759 }
760 auto *packet = inMsg->GetObject<DataAckPacket>();
761 if (packet != nullptr && packet->GetRecvCode() == -E_BUSY) {
762 errCodeAck = true;
763 while (!afterErrAck) {
764 }
765 LOGW("NOW SEND BUSY ACK");
766 } else if (errCodeAck) {
767 afterErrAck = true;
768 std::this_thread::sleep_for(std::chrono::seconds(1));
769 }
770 });
771 }
772
RegOnDispatchWithOffline(bool & offlineFlag,bool & invalid,condition_variable & conditionOffline)773 void RegOnDispatchWithOffline(bool &offlineFlag, bool &invalid, condition_variable &conditionOffline)
774 {
775 g_communicatorAggregator->RegOnDispatch([&offlineFlag, &invalid, &conditionOffline](
776 const std::string &dev, Message *inMsg) {
777 auto *packet = inMsg->GetObject<DataAckPacket>();
778 if (dev != DEVICE_B) {
779 if (packet != nullptr && (packet->GetRecvCode() == LOCAL_WATER_MARK_NOT_INIT)) {
780 offlineFlag = true;
781 conditionOffline.notify_all();
782 LOGW("[Dispatch] NOTIFY OFFLINE");
783 std::this_thread::sleep_for(std::chrono::microseconds(EIGHT_HUNDRED));
784 }
785 } else if (!invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
786 LOGW("[Dispatch] NOW INVALID THIS MSG");
787 inMsg->SetMessageType(TYPE_INVALID);
788 inMsg->SetMessageId(INVALID_MESSAGE_ID);
789 invalid = true;
790 }
791 });
792 }
793
RegOnDispatchWithInvalidMsg(bool & invalid)794 void RegOnDispatchWithInvalidMsg(bool &invalid)
795 {
796 g_communicatorAggregator->RegOnDispatch([&invalid](
797 const std::string &dev, Message *inMsg) {
798 if (dev == DEVICE_B && !invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
799 LOGW("[Dispatch] NOW INVALID THIS MSG");
800 inMsg->SetMessageType(TYPE_INVALID);
801 inMsg->SetMessageId(INVALID_MESSAGE_ID);
802 invalid = true;
803 }
804 });
805 }
806
PrepareEnv(vector<std::string> & devices,Key & key,Query & query)807 void PrepareEnv(vector<std::string> &devices, Key &key, Query &query)
808 {
809 /**
810 * @tc.steps: step1. ensure the watermark is no zero and finish timeSync and abilitySync
811 * @tc.expected: step1. should return OK.
812 */
813 Value value = {'1'};
814 std::map<std::string, DBStatus> result;
815 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
816
817 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_ONLY, result, query);
818 EXPECT_TRUE(status == OK);
819 ASSERT_TRUE(result[g_deviceB->GetDeviceId()] == OK);
820 }
821
Sync(KvStoreNbDelegate * kvDelegatePtr,vector<std::string> & devices,const DBStatus & targetStatus)822 void Sync(KvStoreNbDelegate *kvDelegatePtr, vector<std::string> &devices, const DBStatus &targetStatus)
823 {
824 std::map<std::string, DBStatus> result;
825 DBStatus status = g_tool.SyncTest(kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_ONLY, result);
826 EXPECT_TRUE(status == OK);
827 for (const auto &deviceId : devices) {
828 ASSERT_TRUE(result[deviceId] == targetStatus);
829 }
830 }
831
Sync(vector<std::string> & devices,const DBStatus & targetStatus)832 void Sync(vector<std::string> &devices, const DBStatus &targetStatus)
833 {
834 Sync(g_kvDelegatePtr, devices, targetStatus);
835 }
836
SyncWithQuery(vector<std::string> & devices,const Query & query,const SyncMode & mode,const DBStatus & targetStatus)837 void SyncWithQuery(vector<std::string> &devices, const Query &query, const SyncMode &mode,
838 const DBStatus &targetStatus)
839 {
840 std::map<std::string, DBStatus> result;
841 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, mode, result, query);
842 EXPECT_TRUE(status == OK);
843 for (const auto &deviceId : devices) {
844 if (targetStatus == COMM_FAILURE) {
845 // If syncTaskContext of deviceB is scheduled to be executed first, ClearAllSyncTask is
846 // invoked when OfflineHandleByDevice is triggered, and SyncOperation::Finished() is triggered in advance.
847 // The returned status is COMM_FAILURE.
848 // If syncTaskContext of deviceB is not executed first, the error code is transparently transmitted.
849 EXPECT_TRUE((result[deviceId] == static_cast<DBStatus>(-E_PERIPHERAL_INTERFACE_FAIL)) ||
850 (result[deviceId] == COMM_FAILURE));
851 } else {
852 ASSERT_EQ(result[deviceId], targetStatus);
853 }
854 }
855 }
856
SyncWithQuery(vector<std::string> & devices,const Query & query,const DBStatus & targetStatus)857 void SyncWithQuery(vector<std::string> &devices, const Query &query, const DBStatus &targetStatus)
858 {
859 SyncWithQuery(devices, query, DistributedDB::SYNC_MODE_PUSH_ONLY, targetStatus);
860 }
861
SyncWithDeviceOffline(vector<std::string> & devices,Key & key,const Query & query)862 void SyncWithDeviceOffline(vector<std::string> &devices, Key &key, const Query &query)
863 {
864 Value value = {'2'};
865 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
866
867 /**
868 * @tc.steps: step2. invalid the sync msg
869 * @tc.expected: step2. should return TIME_OUT.
870 */
871 SyncWithQuery(devices, query, TIME_OUT);
872
873 /**
874 * @tc.steps: step3. device offline when sync
875 * @tc.expected: step3. should return COMM_FAILURE.
876 */
877 // If syncTaskContext of deviceB is scheduled to be executed first, ClearAllSyncTask is
878 // invoked when OfflineHandleByDevice is triggered, and SyncOperation::Finished() is triggered in advance.
879 // The returned status is COMM_FAILURE
880 SyncWithQuery(devices, query, COMM_FAILURE);
881 }
882
PrepareWaterMarkError(std::vector<std::string> & devices,Query & query)883 void PrepareWaterMarkError(std::vector<std::string> &devices, Query &query)
884 {
885 /**
886 * @tc.steps: step1. prepare data
887 */
888 devices.push_back(g_deviceB->GetDeviceId());
889 g_deviceB->Online();
890
891 Key key = {'1'};
892 query = Query::Select().PrefixKey(key);
893 PrepareEnv(devices, key, query);
894
895 /**
896 * @tc.steps: step2. query sync and set queryWaterMark
897 * @tc.expected: step2. should return OK.
898 */
899 Value value = {'2'};
900 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
901 SyncWithQuery(devices, query, OK);
902
903 /**
904 * @tc.steps: step3. sync and invalid msg for set local device waterMark
905 * @tc.expected: step3. should return TIME_OUT.
906 */
907 bool invalidMsg = false;
908 RegOnDispatchWithInvalidMsg(invalidMsg);
909 value = {'3'};
910 ASSERT_TRUE(g_kvDelegatePtr->Put(key, value) == OK);
911 Sync(devices, TIME_OUT);
912 g_communicatorAggregator->RegOnDispatch(nullptr);
913 }
914
RegOnDispatchWithoutDataPacket(std::atomic<int> & messageCount,bool calResponse=false)915 void RegOnDispatchWithoutDataPacket(std::atomic<int> &messageCount, bool calResponse = false)
916 {
917 g_communicatorAggregator->RegOnDispatch([calResponse, &messageCount](const std::string &dev, Message *msg) {
918 if (msg->GetMessageId() != TIME_SYNC_MESSAGE && msg->GetMessageId() != ABILITY_SYNC_MESSAGE) {
919 return;
920 }
921 if (dev != DEVICE_B || (!calResponse && msg->GetMessageType() != TYPE_REQUEST)) {
922 return;
923 }
924 messageCount++;
925 });
926 }
927
ReOpenDB()928 void ReOpenDB()
929 {
930 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
931 g_kvDelegatePtr = nullptr;
932 KvStoreNbDelegate::Option option;
933 option.secOption.securityLabel = SecurityLabel::S3;
934 option.secOption.securityFlag = SecurityFlag::SECE;
935 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
936 ASSERT_TRUE(g_kvDelegateStatus == OK);
937 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
938 }
939 }
940
941 /**
942 * @tc.name: AckSessionCheck 001
943 * @tc.desc: Test ack session check function.
944 * @tc.type: FUNC
945 * @tc.require: AR000F3OOV
946 * @tc.author: zhangqiquan
947 */
948 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, AckSessionCheck001, TestSize.Level3)
949 {
950 std::vector<std::string> devices;
951 devices.push_back(g_deviceB->GetDeviceId());
952
953 /**
954 * @tc.steps: step1. deviceB sync to deviceA just for timeSync and abilitySync
955 * @tc.expected: step1. should return OK.
956 */
957 ASSERT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == OK);
958
959 /**
960 * @tc.steps: step2. deviceA StartTransaction for prevent other sync action deviceB sync will fail
961 * @tc.expected: step2. should return OK.
962 */
963 ASSERT_TRUE(g_kvDelegatePtr->StartTransaction() == OK);
964
965 bool errCodeAck = false;
966 bool afterErrAck = false;
967 RegOnDispatchWithDelayAck(errCodeAck, afterErrAck);
968
969 Key key = {'1'};
970 Value value = {'1'};
971 Timestamp currentTime;
972 (void)OS::GetCurrentSysTimeInMicrosecond(currentTime);
973 EXPECT_TRUE(g_deviceB->PutData(key, value, currentTime, 0) == E_OK);
974 EXPECT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == OK);
975
976 Value outValue;
977 EXPECT_TRUE(g_kvDelegatePtr->Get(key, outValue) == NOT_FOUND);
978
979 /**
980 * @tc.steps: step3. release the writeHandle and try again, sync success
981 * @tc.expected: step3. should return OK.
982 */
983 EXPECT_TRUE(g_kvDelegatePtr->Commit() == OK);
984 EXPECT_TRUE(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true) == OK);
985
986 EXPECT_TRUE(g_kvDelegatePtr->Get(key, outValue) == E_OK);
987 EXPECT_EQ(outValue, value);
988 }
989
990 /**
991 * @tc.name: AckSafeCheck001
992 * @tc.desc: Test ack session check filter all bad ack in device offline scene.
993 * @tc.type: FUNC
994 * @tc.require: AR000F3OOV
995 * @tc.author: zhangqiquan
996 */
997 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, AckSafeCheck001, TestSize.Level3)
998 {
999 std::vector<std::string> devices;
1000 devices.push_back(g_deviceB->GetDeviceId());
1001 g_deviceB->Online();
1002
1003 Key key = {'1'};
1004 Query query = Query::Select().PrefixKey(key);
1005 PrepareEnv(devices, key, query);
1006
1007 std::condition_variable conditionOnline;
1008 std::condition_variable conditionOffline;
1009 bool onlineFlag = false;
1010 bool invalid = false;
1011 bool offlineFlag = false;
__anon6e997c5f1602() 1012 thread subThread([&onlineFlag, &conditionOnline, &offlineFlag, &conditionOffline]() {
1013 LOGW("[Dispatch] NOW DEVICES IS OFFLINE");
1014 std::mutex offlineMtx;
1015 std::unique_lock<std::mutex> lck(offlineMtx);
1016 conditionOffline.wait(lck, [&offlineFlag]{ return offlineFlag; });
1017 g_deviceB->Offline();
1018 std::this_thread::sleep_for(std::chrono::seconds(1));
1019 g_deviceB->Online();
1020 onlineFlag = true;
1021 conditionOnline.notify_all();
1022 LOGW("[Dispatch] NOW DEVICES IS ONLINE");
1023 });
1024 subThread.detach();
1025
1026 RegOnDispatchWithOffline(offlineFlag, invalid, conditionOffline);
1027
1028 SyncWithDeviceOffline(devices, key, query);
1029
1030 std::mutex onlineMtx;
1031 std::unique_lock<std::mutex> lck(onlineMtx);
__anon6e997c5f1802null1032 conditionOnline.wait(lck, [&onlineFlag]{ return onlineFlag; });
1033
1034 /**
1035 * @tc.steps: step4. sync again if has problem it will sync never end
1036 * @tc.expected: step4. should return OK.
1037 */
1038 SyncWithQuery(devices, query, OK);
1039 }
1040
1041 /**
1042 * @tc.name: WaterMarkCheck001
1043 * @tc.desc: Test waterMark work correct in lost package scene.
1044 * @tc.type: FUNC
1045 * @tc.require: AR000F3OOV
1046 * @tc.author: zhangqiquan
1047 */
1048 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, WaterMarkCheck001, TestSize.Level1)
1049 {
1050 std::vector<std::string> devices;
1051 Query query = Query::Select();
1052 PrepareWaterMarkError(devices, query);
1053
1054 /**
1055 * @tc.steps: step4. sync again see it work correct
1056 * @tc.expected: step4. should return OK.
1057 */
1058 SyncWithQuery(devices, query, OK);
1059 }
1060
1061 /**
1062 * @tc.name: WaterMarkCheck002
1063 * @tc.desc: Test pull work correct in error waterMark scene.
1064 * @tc.type: FUNC
1065 * @tc.require: AR000F3OOV
1066 * @tc.author: zhangqiquan
1067 */
1068 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, WaterMarkCheck002, TestSize.Level1)
1069 {
1070 std::vector<std::string> devices;
1071 Query query = Query::Select();
1072 PrepareWaterMarkError(devices, query);
1073
1074 /**
1075 * @tc.steps: step4. sync again see it work correct
1076 * @tc.expected: step4. should return OK.
1077 */
1078 Key key = {'2'};
1079 ASSERT_TRUE(g_kvDelegatePtr->Put(key, {}) == OK);
1080 query = Query::Select();
1081 SyncWithQuery(devices, query, DistributedDB::SYNC_MODE_PULL_ONLY, OK);
1082
1083 VirtualDataItem item;
1084 EXPECT_EQ(g_deviceB->GetData(key, item), -E_NOT_FOUND);
1085 }
1086
RegOnDispatchToGetSyncCount(int & sendRequestCount,int sleepMs=0)1087 void RegOnDispatchToGetSyncCount(int &sendRequestCount, int sleepMs = 0)
1088 {
1089 g_communicatorAggregator->RegOnDispatch([sleepMs, &sendRequestCount](
1090 const std::string &dev, Message *inMsg) {
1091 if (dev == DEVICE_B && inMsg->GetMessageType() == TYPE_REQUEST) {
1092 std::this_thread::sleep_for(std::chrono::milliseconds(sleepMs));
1093 sendRequestCount++;
1094 LOGD("sendRequestCount++...");
1095 }
1096 });
1097 }
1098
TestDifferentSyncMode(SyncMode mode)1099 void TestDifferentSyncMode(SyncMode mode)
1100 {
1101 std::vector<std::string> devices;
1102 devices.push_back(g_deviceB->GetDeviceId());
1103
1104 /**
1105 * @tc.steps: step1. deviceA put {k1, v1}
1106 */
1107 Key key = {'1'};
1108 Value value = {'1'};
1109 DBStatus status = g_kvDelegatePtr->Put(key, value);
1110 ASSERT_TRUE(status == OK);
1111
1112 int sendRequestCount = 0;
1113 RegOnDispatchToGetSyncCount(sendRequestCount);
1114
1115 /**
1116 * @tc.steps: step2. deviceA call sync and wait
1117 * @tc.expected: step2. sync should return OK.
1118 */
1119 std::map<std::string, DBStatus> result;
1120 status = g_tool.SyncTest(g_kvDelegatePtr, devices, mode, result);
1121 ASSERT_TRUE(status == OK);
1122
1123 /**
1124 * @tc.expected: step2. onComplete should be called, DeviceB have {k1,v1}, send request message 3 times
1125 */
1126 ASSERT_TRUE(result.size() == devices.size());
1127 for (const auto &pair : result) {
1128 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1129 EXPECT_TRUE(pair.second == OK);
1130 }
1131 VirtualDataItem item;
1132 g_deviceB->GetData(key, item);
1133 EXPECT_TRUE(item.value == value);
1134
1135 EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1136
1137 /**
1138 * @tc.steps: step3. reset sendRequestCount to 0, deviceA call sync and wait again without any change in db
1139 * @tc.expected: step3. sync should return OK, and sendRequestCount should be 1, because this merge can not
1140 * be skipped
1141 */
1142 sendRequestCount = 0;
1143 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
1144 ASSERT_TRUE(status == OK);
1145 EXPECT_EQ(sendRequestCount, 1);
1146 }
1147
1148 /**
1149 * @tc.name: PushSyncMergeCheck001
1150 * @tc.desc: Test push sync task merge, task can not be merged when the two sync task is not in the queue
1151 * at the same time.
1152 * @tc.type: FUNC
1153 * @tc.require: AR000F3OOV
1154 * @tc.author: zhangshijie
1155 */
1156 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck001, TestSize.Level1)
1157 {
1158 TestDifferentSyncMode(SYNC_MODE_PUSH_ONLY);
1159 }
1160
1161 /**
1162 * @tc.name: PushSyncMergeCheck002
1163 * @tc.desc: Test push_pull sync task merge, task can not be merged when the two sync task is not in the queue
1164 * at the same time.
1165 * @tc.type: FUNC
1166 * @tc.require: AR000F3OOV
1167 * @tc.author: zhangshijie
1168 */
1169 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck002, TestSize.Level1)
1170 {
1171 TestDifferentSyncMode(SYNC_MODE_PUSH_PULL);
1172 }
1173
PrepareForSyncMergeTest(std::vector<std::string> & devices,int & sendRequestCount)1174 void PrepareForSyncMergeTest(std::vector<std::string> &devices, int &sendRequestCount)
1175 {
1176 /**
1177 * @tc.steps: step1. deviceA put {k1, v1}
1178 */
1179 Key key = {'1'};
1180 Value value = {'1'};
1181 DBStatus status = g_kvDelegatePtr->Put(key, value);
1182 ASSERT_TRUE(status == OK);
1183
1184 RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
1185
1186 /**
1187 * @tc.steps: step2. deviceA call sync and don't wait
1188 * @tc.expected: step2. sync should return OK.
1189 */
1190 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
1191 [&sendRequestCount, devices, key, value](const std::map<std::string, DBStatus>& statusMap) {
1192 ASSERT_TRUE(statusMap.size() == devices.size());
1193 for (const auto &pair : statusMap) {
1194 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1195 EXPECT_TRUE(pair.second == OK);
1196 }
1197 VirtualDataItem item;
1198 g_deviceB->GetData(key, item);
1199 EXPECT_EQ(item.value, value);
1200 EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1201
1202 // reset sendRequestCount to 0
1203 sendRequestCount = 0;
1204 });
1205 ASSERT_TRUE(status == OK);
1206 }
1207
1208 /**
1209 * @tc.name: PushSyncMergeCheck003
1210 * @tc.desc: Test push sync task merge, task can not be merged when there is change in db since last push sync
1211 * @tc.type: FUNC
1212 * @tc.require: AR000F3OOV
1213 * @tc.author: zhangshijie
1214 */
1215 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck003, TestSize.Level3)
1216 {
1217 DBStatus status = OK;
1218 std::vector<std::string> devices;
1219 devices.push_back(g_deviceB->GetDeviceId());
1220
1221 int sendRequestCount = 0;
1222 PrepareForSyncMergeTest(devices, sendRequestCount);
1223
1224 /**
1225 * @tc.steps: step3. deviceA call sync and don't wait
1226 * @tc.expected: step3. sync should return OK.
1227 */
1228 Key key = {'1'};
1229 Value value = {'2'};
1230 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon6e997c5f1b02(const std::map<std::string, DBStatus>& statusMap) 1231 [&sendRequestCount, devices, key, value, this](const std::map<std::string, DBStatus>& statusMap) {
1232 /**
1233 * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1234 * skipped, but it is no need to do time sync and ability sync, only need to do data sync
1235 */
1236 ASSERT_TRUE(statusMap.size() == devices.size());
1237 for (const auto &pair : statusMap) {
1238 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1239 EXPECT_TRUE(pair.second == OK);
1240 }
1241 VirtualDataItem item;
1242 g_deviceB->GetData(key, item);
1243 EXPECT_EQ(item.value, value);
1244 });
1245 ASSERT_TRUE(status == OK);
1246
1247 /**
1248 * @tc.steps: step4. deviceA put {k1, v2}
1249 */
1250 while (sendRequestCount < TWO_CNT) {
1251 std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1252 }
1253 status = g_kvDelegatePtr->Put(key, value);
1254 ASSERT_TRUE(status == OK);
1255 // wait for the second sync task finish
1256 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1257 EXPECT_EQ(sendRequestCount, 1);
1258 }
1259
1260 /**
1261 * @tc.name: PushSyncMergeCheck004
1262 * @tc.desc: Test push sync task merge, task can be merged when there is no change in db since last push sync
1263 * @tc.type: FUNC
1264 * @tc.require: AR000F3OOV
1265 * @tc.author: zhangshijie
1266 */
1267 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck004, TestSize.Level3)
1268 {
1269 DBStatus status = OK;
1270 std::vector<std::string> devices;
1271 devices.push_back(g_deviceB->GetDeviceId());
1272
1273 int sendRequestCount = 0;
1274 PrepareForSyncMergeTest(devices, sendRequestCount);
1275
1276 /**
1277 * @tc.steps: step3. deviceA call sync and don't wait
1278 * @tc.expected: step3. sync should return OK.
1279 */
1280 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon6e997c5f1c02(const std::map<std::string, DBStatus>& statusMap) 1281 [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1282 /**
1283 * @tc.expected: when the second sync task return, sendRequestCount should be 0, because this merge can be
1284 * skipped
1285 */
1286 ASSERT_TRUE(statusMap.size() == devices.size());
1287 for (const auto &pair : statusMap) {
1288 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1289 EXPECT_TRUE(pair.second == OK);
1290 }
1291 });
1292 ASSERT_TRUE(status == OK);
1293 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1294 EXPECT_EQ(sendRequestCount, 0);
1295 }
1296
RegOnDispatchWithInvalidMsgAndCnt(int & sendRequestCount,int sleepMs,bool & invalid)1297 void RegOnDispatchWithInvalidMsgAndCnt(int &sendRequestCount, int sleepMs, bool &invalid)
1298 {
1299 g_communicatorAggregator->RegOnDispatch([&sendRequestCount, sleepMs, &invalid](
1300 const std::string &dev, Message *inMsg) {
1301 if (dev == DEVICE_B && !invalid && inMsg->GetMessageType() == TYPE_REQUEST) {
1302 inMsg->SetMessageType(TYPE_INVALID);
1303 inMsg->SetMessageId(INVALID_MESSAGE_ID);
1304 sendRequestCount++;
1305 invalid = true;
1306 LOGW("[Dispatch]invalid THIS MSG, sendRequestCount = %d", sendRequestCount);
1307 std::this_thread::sleep_for(std::chrono::milliseconds(sleepMs));
1308 }
1309 });
1310 }
1311
1312 /**
1313 * @tc.name: PushSyncMergeCheck005
1314 * @tc.desc: Test push sync task merge, task cannot be merged when the last push sync is failed
1315 * @tc.type: FUNC
1316 * @tc.require: AR000F3OOV
1317 * @tc.author: zhangshijie
1318 */
1319 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, SyncMergeCheck005, TestSize.Level3)
1320 {
1321 DBStatus status = OK;
1322 std::vector<std::string> devices;
1323 devices.push_back(g_deviceB->GetDeviceId());
1324
1325 /**
1326 * @tc.steps: step1. deviceA put {k1, v1}
1327 */
1328 Key key = {'1'};
1329 Value value = {'1'};
1330 status = g_kvDelegatePtr->Put(key, value);
1331 ASSERT_TRUE(status == OK);
1332
1333 int sendRequestCount = 0;
1334 bool invalid = false;
1335 RegOnDispatchWithInvalidMsgAndCnt(sendRequestCount, SLEEP_MILLISECONDS, invalid);
1336
1337 /**
1338 * @tc.steps: step2. deviceA call sync and don't wait
1339 * @tc.expected: step2. sync should return TIME_OUT.
1340 */
1341 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon6e997c5f1e02(const std::map<std::string, DBStatus>& statusMap) 1342 [&sendRequestCount, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1343 ASSERT_TRUE(statusMap.size() == devices.size());
1344 for (const auto &deviceId : devices) {
1345 ASSERT_EQ(statusMap.at(deviceId), TIME_OUT);
1346 }
1347 });
1348 EXPECT_TRUE(status == OK);
1349
1350 /**
1351 * @tc.steps: step3. deviceA call sync and don't wait
1352 * @tc.expected: step3. sync should return OK.
1353 */
1354 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon6e997c5f1f02(const std::map<std::string, DBStatus>& statusMap) 1355 [key, value, &sendRequestCount, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1356 /**
1357 * @tc.expected: when the second sync task return, sendRequestCount should be 3, because this merge can not be
1358 * skipped, deviceB should have {k1, v1}.
1359 */
1360 ASSERT_TRUE(statusMap.size() == devices.size());
1361 for (const auto &pair : statusMap) {
1362 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1363 EXPECT_EQ(pair.second, OK);
1364 }
1365 VirtualDataItem item;
1366 g_deviceB->GetData(key, item);
1367 EXPECT_EQ(item.value, value);
1368 });
1369 ASSERT_TRUE(status == OK);
1370 while (sendRequestCount < 1) {
1371 std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1372 }
1373 sendRequestCount = 0;
1374 RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
1375
1376 // wait for the second sync task finish
1377 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1378 EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1379 }
1380
PrePareForQuerySyncMergeTest(bool isQuerySync,std::vector<std::string> & devices,Key & key,Value & value,int & sendRequestCount)1381 void PrePareForQuerySyncMergeTest(bool isQuerySync, std::vector<std::string> &devices,
1382 Key &key, Value &value, int &sendRequestCount)
1383 {
1384 DBStatus status = OK;
1385 /**
1386 * @tc.steps: step1. deviceA put {k1, v1}...{k10, v10}
1387 */
1388 Query query = Query::Select().PrefixKey(key);
1389 const int dataSize = 10;
1390 for (int i = 0; i < dataSize; i++) {
1391 key.push_back(i);
1392 value.push_back(i);
1393 status = g_kvDelegatePtr->Put(key, value);
1394 ASSERT_TRUE(status == OK);
1395 key.pop_back();
1396 value.pop_back();
1397 }
1398
1399 RegOnDispatchToGetSyncCount(sendRequestCount, SLEEP_MILLISECONDS);
1400 /**
1401 * @tc.steps: step2. deviceA call query sync and don't wait
1402 * @tc.expected: step2. sync should return OK.
1403 */
1404 auto completeCallBack = [&sendRequestCount, &key, &value, dataSize, devices]
1405 (const std::map<std::string, DBStatus>& statusMap) {
1406 ASSERT_TRUE(statusMap.size() == devices.size());
1407 for (const auto &pair : statusMap) {
1408 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1409 EXPECT_EQ(pair.second, OK);
1410 }
1411 // when first sync finish, DeviceB have {k1,v1}, {k3,v3}, {k5,v5} .. send request message 3 times
1412 VirtualDataItem item;
1413 for (int i = 0; i < dataSize; i++) {
1414 key.push_back(i);
1415 value.push_back(i);
1416 g_deviceB->GetData(key, item);
1417 EXPECT_EQ(item.value, value);
1418 key.pop_back();
1419 value.pop_back();
1420 }
1421 EXPECT_EQ(sendRequestCount, NORMAL_SYNC_SEND_REQUEST_CNT);
1422 // reset sendRequestCount to 0
1423 sendRequestCount = 0;
1424 };
1425 if (isQuerySync) {
1426 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, completeCallBack, query, false);
1427 } else {
1428 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY, completeCallBack);
1429 }
1430 ASSERT_TRUE(status == OK);
1431 }
1432
1433 /**
1434 * @tc.name: QuerySyncMergeCheck001
1435 * @tc.desc: Test query push sync task merge, task can be merged when there is no change in db since last query sync
1436 * @tc.type: FUNC
1437 * @tc.require: AR000F3OOV
1438 * @tc.author: zhangshijie
1439 */
1440 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck001, TestSize.Level3)
1441 {
1442 std::vector<std::string> devices;
1443 int sendRequestCount = 0;
1444 devices.push_back(g_deviceB->GetDeviceId());
1445
1446 Key key {'1'};
1447 Value value {'1'};
1448 Query query = Query::Select().PrefixKey(key);
1449 PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1450
1451 /**
1452 * @tc.steps: step3. deviceA call query sync and don't wait
1453 * @tc.expected: step3. sync should return OK.
1454 */
1455 DBStatus status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon6e997c5f2102(const std::map<std::string, DBStatus>& statusMap) 1456 [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1457 /**
1458 * @tc.expected: when the second sync task return, sendRequestCount should be 0, because this merge can be
1459 * skipped because there is no change in db since last query sync
1460 */
1461 ASSERT_TRUE(statusMap.size() == devices.size());
1462 for (const auto &pair : statusMap) {
1463 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1464 EXPECT_TRUE(pair.second == OK);
1465 }
1466 }, query, false);
1467 ASSERT_TRUE(status == OK);
1468 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1469 EXPECT_EQ(sendRequestCount, 0);
1470 }
1471
1472 /**
1473 * @tc.name: QuerySyncMergeCheck002
1474 * @tc.desc: Test query push sync task merge, task can not be merged when there is change in db since last sync
1475 * @tc.type: FUNC
1476 * @tc.require: AR000F3OOV
1477 * @tc.author: zhangshijie
1478 */
1479 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck002, TestSize.Level3)
1480 {
1481 std::vector<std::string> devices;
1482 int sendRequestCount = 0;
1483 devices.push_back(g_deviceB->GetDeviceId());
1484
1485 Key key {'1'};
1486 Value value {'1'};
1487 Query query = Query::Select().PrefixKey(key);
1488 PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1489
1490 /**
1491 * @tc.steps: step3. deviceA call query sync and don't wait
1492 * @tc.expected: step3. sync should return OK.
1493 */
1494 Value value3{'3'};
1495 DBStatus status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon6e997c5f2202(const std::map<std::string, DBStatus>& statusMap) 1496 [&sendRequestCount, devices, key, value3, this](const std::map<std::string, DBStatus>& statusMap) {
1497 /**
1498 * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1499 * skipped when there is change in db since last query sync, deviceB have {k1, v1'}
1500 */
1501 ASSERT_TRUE(statusMap.size() == devices.size());
1502 for (const auto &pair : statusMap) {
1503 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1504 EXPECT_TRUE(pair.second == OK);
1505 }
1506 VirtualDataItem item;
1507 g_deviceB->GetData(key, item);
1508 EXPECT_TRUE(item.value == value3);
1509 EXPECT_EQ(sendRequestCount, 1);
1510 }, query, false);
1511 ASSERT_TRUE(status == OK);
1512
1513 /**
1514 * @tc.steps: step4. deviceA put {k1, v1'}
1515 * @tc.steps: step4. reset sendRequestCount to 0, deviceA call sync and wait
1516 * @tc.expected: step4. sync should return OK, and sendRequestCount should be 1, because this merge can not
1517 * be skipped
1518 */
1519 while (sendRequestCount < TWO_CNT) {
1520 std::this_thread::sleep_for(std::chrono::milliseconds(THREE_HUNDRED));
1521 }
1522 g_kvDelegatePtr->Put(key, value3);
1523 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1524 }
1525
1526 /**
1527 * @tc.name: QuerySyncMergeCheck003
1528 * @tc.desc: Test query push sync task merge, task can not be merged when then query id is different
1529 * @tc.type: FUNC
1530 * @tc.require: AR000F3OOV
1531 * @tc.author: zhangshijie
1532 */
1533 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck003, TestSize.Level3)
1534 {
1535 std::vector<std::string> devices;
1536 int sendRequestCount = 0;
1537 devices.push_back(g_deviceB->GetDeviceId());
1538
1539 Key key {'1'};
1540 Value value {'1'};
1541 PrePareForQuerySyncMergeTest(true, devices, key, value, sendRequestCount);
1542
1543 /**
1544 * @tc.steps: step3. deviceA call another query sync
1545 * @tc.expected: step3. sync should return OK.
1546 */
1547 Key key2 = {'2'};
1548 Value value2 = {'2'};
1549 DBStatus status = g_kvDelegatePtr->Put(key2, value2);
1550 ASSERT_TRUE(status == OK);
1551 Query query2 = Query::Select().PrefixKey(key2);
1552 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon6e997c5f2302(const std::map<std::string, DBStatus>& statusMap) 1553 [&sendRequestCount, key2, value2, devices, this](const std::map<std::string, DBStatus>& statusMap) {
1554 /**
1555 * @tc.expected: when the second sync task return, sendRequestCount should be 1, because this merge can not be
1556 * skipped, deviceB have {k2,v2}
1557 */
1558 ASSERT_TRUE(statusMap.size() == devices.size());
1559 for (const auto &pair : statusMap) {
1560 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1561 EXPECT_TRUE(pair.second == OK);
1562 }
1563 VirtualDataItem item;
1564 g_deviceB->GetData(key2, item);
1565 EXPECT_TRUE(item.value == value2);
1566 EXPECT_EQ(sendRequestCount, 1);
1567 }, query2, false);
1568 ASSERT_TRUE(status == OK);
1569 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1570 }
1571
1572 /**
1573 * @tc.name: QuerySyncMergeCheck004
1574 * @tc.desc: Test query push sync task merge, task can be merged when there is no change in db since last push sync
1575 * @tc.type: FUNC
1576 * @tc.require: AR000F3OOV
1577 * @tc.author: zhangshijie
1578 */
1579 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, QuerySyncMergeCheck004, TestSize.Level3)
1580 {
1581 DBStatus status = OK;
1582 std::vector<std::string> devices;
1583 devices.push_back(g_deviceB->GetDeviceId());
1584
1585 Key key {'1'};
1586 Value value {'1'};
1587 int sendRequestCount = 0;
1588 PrePareForQuerySyncMergeTest(false, devices, key, value, sendRequestCount);
1589
1590 /**
1591 * @tc.steps: step3. deviceA call query sync without any change in db
1592 * @tc.expected: step3. sync should return OK, and sendRequestCount should be 0, because this merge can be skipped
1593 */
1594 Query query = Query::Select().PrefixKey(key);
1595 status = g_kvDelegatePtr->Sync(devices, SYNC_MODE_PUSH_ONLY,
__anon6e997c5f2402(const std::map<std::string, DBStatus>& statusMap) 1596 [devices, this](const std::map<std::string, DBStatus>& statusMap) {
1597 /**
1598 * @tc.expected step3: when the second sync task return, sendRequestCount should be 0, because this merge
1599 * can be skipped because there is no change in db since last push sync
1600 */
1601 ASSERT_TRUE(statusMap.size() == devices.size());
1602 for (const auto &pair : statusMap) {
1603 LOGD("dev %s, status %d", pair.first.c_str(), pair.second);
1604 EXPECT_TRUE(pair.second == OK);
1605 }
1606 }, query, false);
1607 ASSERT_TRUE(status == OK);
1608 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1609 EXPECT_EQ(sendRequestCount, 0);
1610 }
1611
1612 /**
1613 * @tc.name: GetDataNotify001
1614 * @tc.desc: Test GetDataNotify function, delay < 30s should sync ok, > 36 should timeout
1615 * @tc.type: FUNC
1616 * @tc.require: AR000D4876
1617 * @tc.author: zhangqiquan
1618 */
1619 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, GetDataNotify001, TestSize.Level3)
1620 {
1621 ASSERT_NE(g_kvDelegatePtr, nullptr);
1622 DBStatus status = OK;
1623 std::vector<std::string> devices;
1624 devices.push_back(g_deviceB->GetDeviceId());
1625 const std::string DEVICE_A = "real_device";
1626 /**
1627 * @tc.steps: step1. deviceB set get data delay 40s
1628 */
1629 g_deviceB->DelayGetSyncData(WAIT_40_SECONDS);
1630 g_communicatorAggregator->SetTimeout(DEVICE_A, TIMEOUT_6_SECONDS);
1631
1632 /**
1633 * @tc.steps: step2. deviceA call sync and wait
1634 * @tc.expected: step2. sync should return OK. onComplete should be called, deviceB sync TIME_OUT.
1635 */
1636 std::map<std::string, DBStatus> result;
1637 std::map<std::string, int> virtualRes;
1638 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1639 EXPECT_EQ(status, OK);
1640 EXPECT_EQ(result.size(), devices.size());
1641 EXPECT_EQ(result[DEVICE_B], TIME_OUT);
1642 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1643 Query query = Query::Select();
__anon6e997c5f2502(std::map<std::string, int> resMap) 1644 g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
1645 virtualRes = std::move(resMap);
1646 }, true);
1647 EXPECT_EQ(virtualRes.size(), devices.size());
1648 EXPECT_EQ(virtualRes[DEVICE_A], static_cast<int>(SyncOperation::OP_TIMEOUT));
1649 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1650
1651 /**
1652 * @tc.steps: step3. deviceB set get data delay 30s
1653 */
1654 g_deviceB->DelayGetSyncData(WAIT_30_SECONDS);
1655 /**
1656 * @tc.steps: step4. deviceA call sync and wait
1657 * @tc.expected: step4. sync should return OK. onComplete should be called, deviceB sync OK.
1658 */
1659 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1660 EXPECT_EQ(status, OK);
1661 EXPECT_EQ(result.size(), devices.size());
1662 EXPECT_EQ(result[DEVICE_B], OK);
1663 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
__anon6e997c5f2602(std::map<std::string, int> resMap) 1664 g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
1665 virtualRes = std::move(resMap);
1666 }, true);
1667 EXPECT_EQ(virtualRes.size(), devices.size());
1668 EXPECT_EQ(virtualRes[DEVICE_A], static_cast<int>(SyncOperation::OP_FINISHED_ALL));
1669 g_deviceB->DelayGetSyncData(0);
1670 }
1671
1672 /**
1673 * @tc.name: GetDataNotify002
1674 * @tc.desc: Test GetDataNotify function, two device sync each other at same time
1675 * @tc.type: FUNC
1676 * @tc.require: AR000D4876
1677 * @tc.author: zhangqiquan
1678 */
1679 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, GetDataNotify002, TestSize.Level3)
1680 {
1681 ASSERT_NE(g_kvDelegatePtr, nullptr);
1682 DBStatus status = OK;
1683 std::vector<std::string> devices;
1684 devices.push_back(g_deviceB->GetDeviceId());
1685 const std::string DEVICE_A = "real_device";
1686
1687 /**
1688 * @tc.steps: step1. deviceA sync first to finish time sync and ability sync
1689 */
1690 std::map<std::string, DBStatus> result;
1691 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
1692 EXPECT_EQ(status, OK);
1693 EXPECT_EQ(result.size(), devices.size());
1694 EXPECT_EQ(result[DEVICE_B], OK);
1695 /**
1696 * @tc.steps: step2. deviceB set get data delay 30s
1697 */
1698 g_deviceB->DelayGetSyncData(WAIT_30_SECONDS);
1699
1700 /**
1701 * @tc.steps: step3. deviceB call sync and wait
1702 */
__anon6e997c5f2702() 1703 std::thread asyncThread([]() {
1704 std::map<std::string, int> virtualRes;
1705 Query query = Query::Select();
1706 g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, query, [&virtualRes](std::map<std::string, int> resMap) {
1707 virtualRes = std::move(resMap);
1708 }, true);
1709 });
1710
1711 /**
1712 * @tc.steps: step4. deviceA call sync and wait
1713 * @tc.expected: step4. sync should return OK. because notify timer trigger (30s - 1s)/2s => 15times
1714 */
1715 std::this_thread::sleep_for(std::chrono::seconds(1));
1716 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result, true);
1717 EXPECT_EQ(status, OK);
1718 EXPECT_EQ(result.size(), devices.size());
1719 EXPECT_EQ(result[DEVICE_B], OK);
1720 asyncThread.join();
1721 std::this_thread::sleep_for(std::chrono::seconds(TEN_SECONDS));
1722 }
1723
1724 /**
1725 * @tc.name: DelaySync001
1726 * @tc.desc: Test delay first packet will not effect data conflict
1727 * @tc.type: FUNC
1728 * @tc.require:
1729 * @tc.author: zqq
1730 */
1731 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, DelaySync001, TestSize.Level3)
1732 {
1733 // B put (k, b) after A put (k, a)
1734 Key key = {'k'};
1735 Value aValue = {'a'};
1736 g_kvDelegatePtr->Put(key, aValue);
1737 std::this_thread::sleep_for(std::chrono::seconds(1)); // sleep 1s for data conflict
1738 Timestamp currentTime = TimeHelper::GetSysCurrentTime() + TimeHelper::BASE_OFFSET;
1739 Value bValue = {'b'};
1740 EXPECT_EQ(g_deviceB->PutData(key, bValue, currentTime, 0), E_OK);
1741
1742 // delay time sync message, delay time/2 should greater than put sleep time
1743 g_communicatorAggregator->SetTimeout(DEVICE_B, DBConstant::MAX_TIMEOUT);
1744 g_communicatorAggregator->SetTimeout("real_device", DBConstant::MAX_TIMEOUT);
__anon6e997c5f2902(const std::string &dstTarget, const Message *msg) 1745 g_communicatorAggregator->RegBeforeDispatch([](const std::string &dstTarget, const Message *msg) {
1746 if (dstTarget == DEVICE_B && msg->GetMessageId() == MessageId::TIME_SYNC_MESSAGE) {
1747 std::this_thread::sleep_for(std::chrono::seconds(3)); // sleep for 3s
1748 }
1749 });
1750
1751 // A call sync and (k, b) in A
1752 std::vector<std::string> devices;
1753 devices.push_back(g_deviceB->GetDeviceId());
1754 std::map<std::string, DBStatus> result;
1755 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PULL_ONLY, result, true);
1756 EXPECT_EQ(status, OK);
1757 EXPECT_EQ(result.size(), devices.size());
1758 EXPECT_EQ(result[DEVICE_B], OK);
1759
1760 Value actualValue;
1761 g_kvDelegatePtr->Get(key, actualValue);
1762 EXPECT_EQ(actualValue, bValue);
1763 g_communicatorAggregator->RegBeforeDispatch(nullptr);
1764 }
1765
1766 /**
1767 * @tc.name: KVAbilitySyncOpt001
1768 * @tc.desc: check ability sync 2 packet
1769 * @tc.type: FUNC
1770 * @tc.require:
1771 * @tc.author: zhangqiquan
1772 */
1773 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVAbilitySyncOpt001, TestSize.Level0)
1774 {
1775 /**
1776 * @tc.steps: step1. record packet
1777 * @tc.expected: step1. sync should failed in source.
1778 */
1779 std::atomic<int> messageCount = 0;
__anon6e997c5f2a02(const std::string &dev, Message *msg) 1780 g_communicatorAggregator->RegOnDispatch([&messageCount](const std::string &dev, Message *msg) {
1781 if (msg->GetMessageId() != ABILITY_SYNC_MESSAGE) {
1782 return;
1783 }
1784 messageCount++;
1785 EXPECT_GE(g_kvDelegatePtr->GetTaskCount(), 1);
1786 });
1787 /**
1788 * @tc.steps: step2. deviceA call sync and wait
1789 * @tc.expected: step2. sync should return SECURITY_OPTION_CHECK_ERROR.
1790 */
1791 DBStatus status = OK;
1792 std::vector<std::string> devices;
1793 devices.push_back(g_deviceB->GetDeviceId());
1794 std::map<std::string, DBStatus> result;
1795 status = g_tool.SyncTest(g_kvDelegatePtr, devices, SYNC_MODE_PUSH_ONLY, result);
1796 EXPECT_EQ(status, OK);
1797 EXPECT_EQ(messageCount, 2); // 2 ability sync
1798 for (const auto &pair : result) {
1799 EXPECT_EQ(pair.second, OK);
1800 }
1801 }
1802
1803 /**
1804 * @tc.name: KVAbilitySyncOpt002
1805 * @tc.desc: check get task count while conn is nullptr.
1806 * @tc.type: FUNC
1807 * @tc.require:
1808 * @tc.author: caihaoting
1809 */
1810 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVAbilitySyncOpt002, TestSize.Level0)
1811 {
1812 /**
1813 * @tc.steps: step1. record packet while conn is nullptr.
1814 * @tc.expected: step1. sync should failed in source and get task count return DB_ERROR.
1815 */
1816 auto kvStoreImpl = static_cast<KvStoreNbDelegateImpl *>(g_kvDelegatePtr);
1817 EXPECT_EQ(kvStoreImpl->Close(), OK);
1818 std::atomic<int> messageCount = 0;
__anon6e997c5f2b02(const std::string &dev, Message *msg) 1819 g_communicatorAggregator->RegOnDispatch([&messageCount](const std::string &dev, Message *msg) {
1820 if (msg->GetMessageId() != ABILITY_SYNC_MESSAGE) {
1821 return;
1822 }
1823 messageCount++;
1824 EXPECT_EQ(g_kvDelegatePtr->GetTaskCount(), DB_ERROR);
1825 });
1826 }
1827
1828 /**
1829 * @tc.name: KVSyncOpt001
1830 * @tc.desc: check time sync and ability sync once
1831 * @tc.type: FUNC
1832 * @tc.require:
1833 * @tc.author: zhangqiquan
1834 */
1835 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt001, TestSize.Level0)
1836 {
1837 /**
1838 * @tc.steps: step1. record packet which send to B
1839 */
1840 std::atomic<int> messageCount = 0;
1841 RegOnDispatchWithoutDataPacket(messageCount);
1842 /**
1843 * @tc.steps: step2. deviceA call sync and wait
1844 * @tc.expected: step2. sync should return OK.
1845 */
1846 std::vector<std::string> devices;
1847 devices.push_back(g_deviceB->GetDeviceId());
1848 Sync(devices, OK);
1849 EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
1850 /**
1851 * @tc.steps: step3. reopen kv store
1852 * @tc.expected: step3. reopen OK.
1853 */
1854 ReOpenDB();
1855 /**
1856 * @tc.steps: step4. reopen kv store and sync again
1857 * @tc.expected: step4. reopen OK and sync success, no negotiation packet.
1858 */
1859 messageCount = 0;
1860 Sync(devices, OK);
1861 EXPECT_EQ(messageCount, 0);
1862 g_communicatorAggregator->RegOnDispatch(nullptr);
1863 }
1864
1865 /**
1866 * @tc.name: KVSyncOpt002
1867 * @tc.desc: check device time sync once
1868 * @tc.type: FUNC
1869 * @tc.require:
1870 * @tc.author: zhangqiquan
1871 */
1872 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt002, TestSize.Level0)
1873 {
1874 /**
1875 * @tc.steps: step1. record packet which send to B
1876 */
1877 std::atomic<int> messageCount = 0;
1878 RegOnDispatchWithoutDataPacket(messageCount);
1879 /**
1880 * @tc.steps: step2. deviceA call sync and wait
1881 * @tc.expected: step2. sync should return OK.
1882 */
1883 std::vector<std::string> devices;
1884 devices.push_back(g_deviceB->GetDeviceId());
1885 Sync(devices, OK);
1886 EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
1887 // close kv store avoid packet dispatch error
1888 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
1889 g_kvDelegatePtr = nullptr;
1890 ASSERT_EQ(g_mgr.DeleteKvStore(STORE_ID), OK);
1891 EXPECT_TRUE(RuntimeContext::GetInstance()->IsTimeTickMonitorValid());
1892 /**
1893 * @tc.steps: step3. open new kv store
1894 * @tc.expected: step3. open OK.
1895 */
1896 KvStoreNbDelegate::Option option;
1897 option.secOption.securityLabel = SecurityLabel::S3;
1898 option.secOption.securityFlag = SecurityFlag::SECE;
1899 KvStoreNbDelegate *delegate2 = nullptr;
__anon6e997c5f2c02(DBStatus status, KvStoreNbDelegate *delegate) 1900 g_mgr.GetKvStore(STORE_ID_2, option, [&delegate2](DBStatus status, KvStoreNbDelegate *delegate) {
1901 delegate2 = delegate;
1902 EXPECT_EQ(status, OK);
1903 });
1904 /**
1905 * @tc.steps: step4. sync again
1906 * @tc.expected: step4. sync success, only ability sync packet.
1907 */
1908 messageCount = 0;
1909 Sync(delegate2, devices, OK);
1910 EXPECT_EQ(messageCount, 1); // 1 contain ability sync packet
1911 EXPECT_EQ(g_mgr.CloseKvStore(delegate2), OK);
1912 EXPECT_EQ(g_mgr.DeleteKvStore(STORE_ID_2), OK);
1913 g_communicatorAggregator->RegOnDispatch(nullptr);
1914 }
1915
1916 /**
1917 * @tc.name: KVSyncOpt003
1918 * @tc.desc: check time sync and ability sync once
1919 * @tc.type: FUNC
1920 * @tc.require:
1921 * @tc.author: zhangqiquan
1922 */
1923 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt003, TestSize.Level0)
1924 {
1925 /**
1926 * @tc.steps: step1. record packet which send to B
1927 */
1928 std::atomic<int> messageCount = 0;
1929 RegOnDispatchWithoutDataPacket(messageCount);
1930 /**
1931 * @tc.steps: step2. deviceA call sync and wait
1932 * @tc.expected: step2. sync should return OK.
1933 */
1934 std::vector<std::string> devices;
1935 devices.push_back(g_deviceB->GetDeviceId());
1936 Sync(devices, OK);
1937 EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
1938 /**
1939 * @tc.steps: step3. reopen kv store
1940 * @tc.expected: step3. reopen OK.
1941 */
1942 ReOpenDB();
1943 /**
1944 * @tc.steps: step4. reopen kv store and sync again
1945 * @tc.expected: step4. reopen OK and sync success, no negotiation packet.
1946 */
1947 messageCount = 0;
1948 EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
1949 EXPECT_EQ(messageCount, 0);
1950 g_communicatorAggregator->RegOnDispatch(nullptr);
1951 }
1952
1953 /**
1954 * @tc.name: KVSyncOpt004
1955 * @tc.desc: check sync in keys after reopen db
1956 * @tc.type: FUNC
1957 * @tc.require:
1958 * @tc.author: zhangqiquan
1959 */
1960 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt004, TestSize.Level0)
1961 {
1962 /**
1963 * @tc.steps: step1. deviceA call sync and wait
1964 * @tc.expected: step1. sync should return OK.
1965 */
1966 std::vector<std::string> devices;
1967 devices.push_back(g_deviceB->GetDeviceId());
1968 Sync(devices, OK);
1969 /**
1970 * @tc.steps: step2. reopen kv store
1971 * @tc.expected: step2. reopen OK.
1972 */
1973 ReOpenDB();
1974 /**
1975 * @tc.steps: step3. sync with in keys
1976 * @tc.expected: step3. sync OK.
1977 */
1978 std::map<std::string, DBStatus> result;
1979 std::set<Key> condition;
1980 condition.insert({'k'});
1981 Query query = Query::Select().InKeys(condition);
1982 DBStatus status = g_tool.SyncTest(g_kvDelegatePtr, devices, DistributedDB::SYNC_MODE_PUSH_ONLY, result, query);
1983 EXPECT_EQ(status, OK);
1984 for (const auto &deviceId : devices) {
1985 EXPECT_EQ(result[deviceId], OK);
1986 }
1987 }
1988
1989 /**
1990 * @tc.name: KVSyncOpt005
1991 * @tc.desc: check record ability finish after receive ability sync
1992 * @tc.type: FUNC
1993 * @tc.require:
1994 * @tc.author: zhangqiquan
1995 */
1996 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt005, TestSize.Level0)
1997 {
1998 /**
1999 * @tc.steps: step1. record packet which send to B
2000 */
2001 std::atomic<int> messageCount = 0;
2002 RegOnDispatchWithoutDataPacket(messageCount, true);
2003 /**
2004 * @tc.steps: step2. deviceB call sync and wait
2005 * @tc.expected: step2. sync should return OK.
2006 */
2007 EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2008 EXPECT_EQ(messageCount, 2); // DEV_A send negotiation 2 ack packet.
2009 /**
2010 * @tc.steps: step3. reopen kv store
2011 * @tc.expected: step3. reopen OK.
2012 */
2013 ReOpenDB();
2014 /**
2015 * @tc.steps: step4. reopen kv store and sync again
2016 * @tc.expected: step4. reopen OK and sync success, no negotiation packet.
2017 */
2018 messageCount = 0;
2019 EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2020 EXPECT_EQ(messageCount, 0);
2021 g_communicatorAggregator->RegOnDispatch(nullptr);
2022 }
2023
2024 /**
2025 * @tc.name: KVSyncOpt006
2026 * @tc.desc: check time sync and ability sync once after rebuild
2027 * @tc.type: FUNC
2028 * @tc.require:
2029 * @tc.author: zhangqiquan
2030 */
2031 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt006, TestSize.Level0)
2032 {
2033 /**
2034 * @tc.steps: step1. record packet which send to B
2035 */
2036 std::atomic<int> messageCount = 0;
2037 RegOnDispatchWithoutDataPacket(messageCount, true);
2038 /**
2039 * @tc.steps: step2. deviceA call sync and wait
2040 * @tc.expected: step2. sync should return OK.
2041 */
2042 std::vector<std::string> devices;
2043 devices.push_back(g_deviceB->GetDeviceId());
2044 EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2045 EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
2046 /**
2047 * @tc.steps: step3. rebuild kv store
2048 * @tc.expected: step3. rebuild OK.
2049 */
2050 ASSERT_EQ(g_mgr.CloseKvStore(g_kvDelegatePtr), OK);
2051 g_kvDelegatePtr = nullptr;
2052 g_mgr.DeleteKvStore(STORE_ID);
2053 KvStoreNbDelegate::Option option;
2054 option.secOption.securityLabel = SecurityLabel::S3;
2055 option.secOption.securityFlag = SecurityFlag::SECE;
2056 g_mgr.GetKvStore(STORE_ID, option, g_kvDelegateCallback);
2057 ASSERT_TRUE(g_kvDelegateStatus == OK);
2058 ASSERT_TRUE(g_kvDelegatePtr != nullptr);
2059 /**
2060 * @tc.steps: step4. rebuild kv store and sync again
2061 * @tc.expected: step4. rebuild OK and sync success, re ability sync.
2062 */
2063 messageCount = 0;
2064 EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2065 EXPECT_EQ(messageCount, 1);
2066 g_communicatorAggregator->RegOnDispatch(nullptr);
2067 }
2068
2069 /**
2070 * @tc.name: KVSyncOpt007
2071 * @tc.desc: check re ability sync after import
2072 * @tc.type: FUNC
2073 * @tc.require:
2074 * @tc.author: zhangqiquan
2075 */
2076 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVSyncOpt007, TestSize.Level0)
2077 {
2078 /**
2079 * @tc.steps: step1. record packet which send to B
2080 */
2081 std::atomic<int> messageCount = 0;
2082 RegOnDispatchWithoutDataPacket(messageCount, true);
2083 /**
2084 * @tc.steps: step2. deviceB call sync and wait
2085 * @tc.expected: step2. sync should return OK.
2086 */
2087 EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2088 EXPECT_EQ(messageCount, 2); // DEV_A send negotiation 2 ack packet.
2089 /**
2090 * @tc.steps: step3. export and import
2091 * @tc.expected: step3. export and import OK.
2092 */
2093 std::string singleExportFileName = g_testDir + "/KVSyncOpt007.$$";
2094 CipherPassword passwd;
2095 EXPECT_EQ(g_kvDelegatePtr->Export(singleExportFileName, passwd), OK);
2096 EXPECT_EQ(g_kvDelegatePtr->Import(singleExportFileName, passwd), OK);
2097 /**
2098 * @tc.steps: step4. reopen kv store and sync again
2099 * @tc.expected: step4. reopen OK and sync success, no negotiation packet.
2100 */
2101 messageCount = 0;
2102 EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2103 EXPECT_EQ(messageCount, 1); // DEV_A send negotiation 1 ack packet.
2104 g_communicatorAggregator->RegOnDispatch(nullptr);
2105 }
2106
2107 /**
2108 * @tc.name: KVTimeChange001
2109 * @tc.desc: check time sync and ability sync once
2110 * @tc.type: FUNC
2111 * @tc.require:
2112 * @tc.author: zhangqiquan
2113 */
2114 HWTEST_F(DistributedDBSingleVerP2PSyncCheckTest, KVTimeChange001, TestSize.Level0)
2115 {
2116 /**
2117 * @tc.steps: step1. record packet which send to B
2118 */
2119 std::atomic<int> messageCount = 0;
2120 RegOnDispatchWithoutDataPacket(messageCount);
2121 /**
2122 * @tc.steps: step2. deviceA call sync and wait
2123 * @tc.expected: step2. sync should return OK.
2124 */
2125 std::vector<std::string> devices;
2126 devices.push_back(g_deviceB->GetDeviceId());
2127 Sync(devices, OK);
2128 EXPECT_EQ(messageCount, 2); // 2 contain time sync request packet and ability sync packet
2129 /**
2130 * @tc.steps: step3. sync again
2131 * @tc.expected: step3. sync success, no negotiation packet.
2132 */
2133 messageCount = 0;
2134 Sync(devices, OK);
2135 EXPECT_EQ(messageCount, 0);
2136 /**
2137 * @tc.steps: step4. modify time offset and sync again
2138 * @tc.expected: step4. sync success, only time sync packet.
2139 */
2140 RuntimeContext::GetInstance()->NotifyTimestampChanged(100);
2141 RuntimeContext::GetInstance()->RecordAllTimeChange();
2142 RuntimeContext::GetInstance()->ClearAllDeviceTimeInfo();
2143 messageCount = 0;
2144 Sync(devices, OK);
2145 EXPECT_EQ(messageCount, 1); // 1 contain time sync request packet
2146 messageCount = 0;
2147 EXPECT_EQ(g_deviceB->Sync(SYNC_MODE_PUSH_ONLY, true), E_OK);
2148 EXPECT_EQ(messageCount, 0);
2149 g_communicatorAggregator->RegOnDispatch(nullptr);
2150 }
2151 }