1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <gtest/gtest.h>
17 #include <gmock/gmock.h>
18 #include <new>
19 #include <thread>
20 #include "db_errno.h"
21 #include "distributeddb_communicator_common.h"
22 #include "distributeddb_tools_unit_test.h"
23 #include "log_print.h"
24 #include "network_adapter.h"
25 #include "message.h"
26 #include "mock_process_communicator.h"
27 #include "serial_buffer.h"
28
29 using namespace std;
30 using namespace testing::ext;
31 using namespace DistributedDB;
32
33 namespace {
34 EnvHandle g_envDeviceA;
35 EnvHandle g_envDeviceB;
36 EnvHandle g_envDeviceC;
37 ICommunicator *g_commAA = nullptr;
38 ICommunicator *g_commAB = nullptr;
39 ICommunicator *g_commBB = nullptr;
40 ICommunicator *g_commBC = nullptr;
41 ICommunicator *g_commCC = nullptr;
42 ICommunicator *g_commCA = nullptr;
43 }
44
45 class DistributedDBCommunicatorDeepTest : public testing::Test {
46 public:
47 static void SetUpTestCase(void);
48 static void TearDownTestCase(void);
49 void SetUp();
50 void TearDown();
51 };
52
SetUpTestCase(void)53 void DistributedDBCommunicatorDeepTest::SetUpTestCase(void)
54 {
55 /**
56 * @tc.setup: Create and init CommunicatorAggregator and AdapterStub
57 */
58 LOGI("[UT][DeepTest][SetUpTestCase] Enter.");
59 bool errCode = SetUpEnv(g_envDeviceA, DEVICE_NAME_A);
60 ASSERT_EQ(errCode, true);
61 errCode = SetUpEnv(g_envDeviceB, DEVICE_NAME_B);
62 ASSERT_EQ(errCode, true);
63 errCode = SetUpEnv(g_envDeviceC, DEVICE_NAME_C);
64 ASSERT_EQ(errCode, true);
65 DoRegTransformFunction();
66 CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(false);
67 }
68
TearDownTestCase(void)69 void DistributedDBCommunicatorDeepTest::TearDownTestCase(void)
70 {
71 /**
72 * @tc.teardown: Finalize and release CommunicatorAggregator and AdapterStub
73 */
74 LOGI("[UT][DeepTest][TearDownTestCase] Enter.");
75 std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure all thread quiet and memory released
76 TearDownEnv(g_envDeviceA);
77 TearDownEnv(g_envDeviceB);
78 TearDownEnv(g_envDeviceC);
79 CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(true);
80 }
81
82 namespace {
AllocAllCommunicator()83 void AllocAllCommunicator()
84 {
85 int errorNo = E_OK;
86 g_commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
87 ASSERT_NOT_NULL_AND_ACTIVATE(g_commAA);
88 g_commAB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
89 ASSERT_NOT_NULL_AND_ACTIVATE(g_commAB);
90 g_commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
91 ASSERT_NOT_NULL_AND_ACTIVATE(g_commBB);
92 g_commBC = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_C, errorNo);
93 ASSERT_NOT_NULL_AND_ACTIVATE(g_commBC);
94 g_commCC = g_envDeviceC.commAggrHandle->AllocCommunicator(LABEL_C, errorNo);
95 ASSERT_NOT_NULL_AND_ACTIVATE(g_commCC);
96 g_commCA = g_envDeviceC.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
97 ASSERT_NOT_NULL_AND_ACTIVATE(g_commCA);
98 }
99
ReleaseAllCommunicator()100 void ReleaseAllCommunicator()
101 {
102 g_envDeviceA.commAggrHandle->ReleaseCommunicator(g_commAA);
103 g_commAA = nullptr;
104 g_envDeviceA.commAggrHandle->ReleaseCommunicator(g_commAB);
105 g_commAB = nullptr;
106 g_envDeviceB.commAggrHandle->ReleaseCommunicator(g_commBB);
107 g_commBB = nullptr;
108 g_envDeviceB.commAggrHandle->ReleaseCommunicator(g_commBC);
109 g_commBC = nullptr;
110 g_envDeviceC.commAggrHandle->ReleaseCommunicator(g_commCC);
111 g_commCC = nullptr;
112 g_envDeviceC.commAggrHandle->ReleaseCommunicator(g_commCA);
113 g_commCA = nullptr;
114 }
115 }
116
SetUp()117 void DistributedDBCommunicatorDeepTest::SetUp()
118 {
119 DistributedDBUnitTest::DistributedDBToolsUnitTest::PrintTestCaseInfo();
120 /**
121 * @tc.setup: Alloc communicator AA, AB, BB, BC, CC, CA
122 */
123 AllocAllCommunicator();
124 }
125
TearDown()126 void DistributedDBCommunicatorDeepTest::TearDown()
127 {
128 /**
129 * @tc.teardown: Release communicator AA, AB, BB, BC, CC, CA
130 */
131 ReleaseAllCommunicator();
132 std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Wait 200 ms to make sure all thread quiet
133 }
134
135 /**
136 * @tc.name: WaitAndRetrySend 001
137 * @tc.desc: Test send retry semantic
138 * @tc.type: FUNC
139 * @tc.require: AR000BVDGI AR000CQE0M
140 * @tc.author: xiaozhenjian
141 */
142 HWTEST_F(DistributedDBCommunicatorDeepTest, WaitAndRetrySend001, TestSize.Level2)
143 {
144 // Preset
145 Message *msgForBB = nullptr;
__anonb9037cf20302(const std::string &srcTarget, Message *inMsg) 146 g_commBB->RegOnMessageCallback([&msgForBB](const std::string &srcTarget, Message *inMsg) {
147 msgForBB = inMsg;
148 }, nullptr);
149 Message *msgForCA = nullptr;
__anonb9037cf20402(const std::string &srcTarget, Message *inMsg) 150 g_commCA->RegOnMessageCallback([&msgForCA](const std::string &srcTarget, Message *inMsg) {
151 msgForCA = inMsg;
152 }, nullptr);
153
154 /**
155 * @tc.steps: step1. connect device A with device B
156 */
157 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
158 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceC.adapterHandle);
159 std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Wait 200 ms to make sure quiet
160
161 /**
162 * @tc.steps: step2. device A simulate send retry
163 */
164 g_envDeviceA.adapterHandle->SimulateSendRetry(DEVICE_NAME_B);
165
166 /**
167 * @tc.steps: step3. device A send message to device B using communicator AB
168 * @tc.expected: step3. communicator BB received no message
169 */
170 Message *msgForAB = BuildRegedTinyMessage();
171 ASSERT_NE(msgForAB, nullptr);
172 SendConfig conf = {true, false, 0};
173 int errCode = g_commAB->SendMessage(DEVICE_NAME_B, msgForAB, conf);
174 EXPECT_EQ(errCode, E_OK);
175
176 Message *msgForAA = BuildRegedTinyMessage();
177 ASSERT_NE(msgForAA, nullptr);
178 errCode = g_commAA->SendMessage(DEVICE_NAME_C, msgForAA, conf);
179 EXPECT_EQ(errCode, E_OK);
180 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms
181 EXPECT_EQ(msgForBB, nullptr);
182 EXPECT_NE(msgForCA, nullptr);
183 delete msgForCA;
184 msgForCA = nullptr;
185
186 /**
187 * @tc.steps: step4. device A simulate sendable feedback
188 * @tc.expected: step4. communicator BB received the message
189 */
190 g_envDeviceA.adapterHandle->SimulateSendRetryClear(DEVICE_NAME_B);
191 std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms
192 EXPECT_NE(msgForBB, nullptr);
193 delete msgForBB;
194 msgForBB = nullptr;
195
196 // CleanUp
197 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
198 }
199
CreateBufferThenAddIntoScheduler(SendTaskScheduler & scheduler,const std::string & dstTarget,Priority inPrio)200 static int CreateBufferThenAddIntoScheduler(SendTaskScheduler &scheduler, const std::string &dstTarget, Priority inPrio)
201 {
202 SerialBuffer *eachBuff = new (std::nothrow) SerialBuffer();
203 if (eachBuff == nullptr) {
204 return -E_OUT_OF_MEMORY;
205 }
206 int errCode = eachBuff->AllocBufferByTotalLength(100, 0); // 100 totallen without header
207 if (errCode != E_OK) {
208 delete eachBuff;
209 eachBuff = nullptr;
210 return errCode;
211 }
212 SendTask task{eachBuff, dstTarget, nullptr, 0u};
213 errCode = scheduler.AddSendTaskIntoSchedule(task, inPrio);
214 if (errCode != E_OK) {
215 delete eachBuff;
216 eachBuff = nullptr;
217 return errCode;
218 }
219 return E_OK;
220 }
221
222 /**
223 * @tc.name: SendSchedule 001
224 * @tc.desc: Test schedule in Priority order than in send order
225 * @tc.type: FUNC
226 * @tc.require: AR000BVDGI AR000CQE0M
227 * @tc.author: xiaozhenjian
228 */
229 HWTEST_F(DistributedDBCommunicatorDeepTest, SendSchedule001, TestSize.Level2)
230 {
231 // Preset
232 SendTaskScheduler scheduler;
233 scheduler.Initialize();
234
235 /**
236 * @tc.steps: step1. Add low priority target A buffer to schecduler
237 */
238 int errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_A, Priority::LOW);
239 EXPECT_EQ(errCode, E_OK);
240
241 /**
242 * @tc.steps: step2. Add low priority target B buffer to schecduler
243 */
244 errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_B, Priority::LOW);
245 EXPECT_EQ(errCode, E_OK);
246
247 /**
248 * @tc.steps: step3. Add normal priority target B buffer to schecduler
249 */
250 errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_B, Priority::NORMAL);
251 EXPECT_EQ(errCode, E_OK);
252
253 /**
254 * @tc.steps: step4. Add normal priority target C buffer to schecduler
255 */
256 errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_C, Priority::NORMAL);
257 EXPECT_EQ(errCode, E_OK);
258
259 /**
260 * @tc.steps: step5. Add high priority target C buffer to schecduler
261 */
262 errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_C, Priority::HIGH);
263 EXPECT_EQ(errCode, E_OK);
264
265 /**
266 * @tc.steps: step6. Add high priority target A buffer to schecduler
267 */
268 errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_A, Priority::HIGH);
269 EXPECT_EQ(errCode, E_OK);
270
271 /**
272 * @tc.steps: step7. schedule out buffers one by one
273 * @tc.expected: step7. the order is: high priority target C
274 * high priority target A
275 * normal priority target B
276 * normal priority target C
277 * low priority target A
278 * low priority target B
279 */
280 SendTask outTask;
281 SendTaskInfo outTaskInfo;
282 uint32_t totalLength = 0;
283 // high priority target C
284 errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo, totalLength);
285 ASSERT_EQ(errCode, E_OK);
286 EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_C);
287 EXPECT_EQ(outTaskInfo.taskPrio, Priority::HIGH);
288 scheduler.FinalizeLastScheduleTask();
289 // high priority target A
290 errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo, totalLength);
291 ASSERT_EQ(errCode, E_OK);
292 EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_A);
293 EXPECT_EQ(outTaskInfo.taskPrio, Priority::HIGH);
294 scheduler.FinalizeLastScheduleTask();
295 // normal priority target B
296 errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo, totalLength);
297 ASSERT_EQ(errCode, E_OK);
298 EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_B);
299 EXPECT_EQ(outTaskInfo.taskPrio, Priority::NORMAL);
300 scheduler.FinalizeLastScheduleTask();
301 // normal priority target C
302 errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo, totalLength);
303 ASSERT_EQ(errCode, E_OK);
304 EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_C);
305 EXPECT_EQ(outTaskInfo.taskPrio, Priority::NORMAL);
306 scheduler.FinalizeLastScheduleTask();
307 // low priority target A
308 errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo, totalLength);
309 ASSERT_EQ(errCode, E_OK);
310 EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_A);
311 EXPECT_EQ(outTaskInfo.taskPrio, Priority::LOW);
312 scheduler.FinalizeLastScheduleTask();
313 // low priority target B
314 errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo, totalLength);
315 ASSERT_EQ(errCode, E_OK);
316 EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_B);
317 EXPECT_EQ(outTaskInfo.taskPrio, Priority::LOW);
318 scheduler.FinalizeLastScheduleTask();
319 }
320
321 /**
322 * @tc.name: Fragment 001
323 * @tc.desc: Test fragmentation in send and receive
324 * @tc.type: FUNC
325 * @tc.require: AR000BVDGI AR000CQE0M
326 * @tc.author: xiaozhenjian
327 */
328 HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment001, TestSize.Level2)
329 {
330 // Preset
331 Message *recvMsgForBB = nullptr;
__anonb9037cf20502(const std::string &srcTarget, Message *inMsg) 332 g_commBB->RegOnMessageCallback([&recvMsgForBB](const std::string &srcTarget, Message *inMsg) {
333 recvMsgForBB = inMsg;
334 }, nullptr);
335
336 /**
337 * @tc.steps: step1. connect device A with device B
338 */
339 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
340
341 /**
342 * @tc.steps: step2. device A send message(registered and giant) to device B using communicator AB
343 * @tc.expected: step2. communicator BB received the message
344 */
345 const uint32_t dataLength = 13 * 1024 * 1024; // 13 MB, 1024 is scale
346 Message *sendMsgForAB = BuildRegedGiantMessage(dataLength);
347 ASSERT_NE(sendMsgForAB, nullptr);
348 SendConfig conf = {false, false, 0};
349 int errCode = g_commAB->SendMessage(DEVICE_NAME_B, sendMsgForAB, conf);
350 EXPECT_EQ(errCode, E_OK);
351 std::this_thread::sleep_for(std::chrono::milliseconds(2600)); // Wait 2600 ms to make sure send done
352 ASSERT_NE(recvMsgForBB, nullptr);
353 ASSERT_EQ(recvMsgForBB->GetMessageId(), REGED_GIANT_MSG_ID);
354
355 /**
356 * @tc.steps: step3. Compare received data with send data
357 * @tc.expected: step3. equal
358 */
359 Message *oriMsgForAB = BuildRegedGiantMessage(dataLength);
360 ASSERT_NE(oriMsgForAB, nullptr);
361 const RegedGiantObject *oriObjForAB = oriMsgForAB->GetObject<RegedGiantObject>();
362 ASSERT_NE(oriObjForAB, nullptr);
363 const RegedGiantObject *recvObjForBB = recvMsgForBB->GetObject<RegedGiantObject>();
364 ASSERT_NE(recvObjForBB, nullptr);
365 bool isEqual = RegedGiantObject::CheckEqual(*oriObjForAB, *recvObjForBB);
366 EXPECT_EQ(isEqual, true);
367
368 // CleanUp
369 delete oriMsgForAB;
370 oriMsgForAB = nullptr;
371 delete recvMsgForBB;
372 recvMsgForBB = nullptr;
373 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
374 }
375
376 /**
377 * @tc.name: Fragment 002
378 * @tc.desc: Test fragmentation in partial loss
379 * @tc.type: FUNC
380 * @tc.require: AR000BVDGI AR000CQE0M
381 * @tc.author: xiaozhenjian
382 */
383 HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment002, TestSize.Level2)
384 {
385 // Preset
386 Message *recvMsgForCC = nullptr;
__anonb9037cf20602(const std::string &srcTarget, Message *inMsg) 387 g_commCC->RegOnMessageCallback([&recvMsgForCC](const std::string &srcTarget, Message *inMsg) {
388 recvMsgForCC = inMsg;
389 }, nullptr);
390
391 /**
392 * @tc.steps: step1. connect device B with device C
393 */
394 AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
395 std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Wait 200 ms to make sure quiet
396
397 /**
398 * @tc.steps: step2. device B simulate partial loss
399 */
400 g_envDeviceB.adapterHandle->SimulateSendPartialLoss();
401
402 /**
403 * @tc.steps: step3. device B send message(registered and giant) to device C using communicator BC
404 * @tc.expected: step3. communicator CC not receive the message
405 */
406 uint32_t dataLength = 13 * 1024 * 1024; // 13 MB, 1024 is scale
407 Message *sendMsgForBC = BuildRegedGiantMessage(dataLength);
408 ASSERT_NE(sendMsgForBC, nullptr);
409 SendConfig conf = {false, false, 0};
410 int errCode = g_commBC->SendMessage(DEVICE_NAME_C, sendMsgForBC, conf);
411 EXPECT_EQ(errCode, E_OK);
412 std::this_thread::sleep_for(std::chrono::milliseconds(2600)); // Wait 2600 ms to make sure send done
413 EXPECT_EQ(recvMsgForCC, nullptr);
414
415 /**
416 * @tc.steps: step4. device B not simulate partial loss
417 */
418 g_envDeviceB.adapterHandle->SimulateSendPartialLossClear();
419
420 /**
421 * @tc.steps: step5. device B send message(registered and giant) to device C using communicator BC
422 * @tc.expected: step5. communicator CC received the message, the length equal to the one that is second send
423 */
424 dataLength = 17 * 1024 * 1024; // 17 MB, 1024 is scale
425 Message *resendMsgForBC = BuildRegedGiantMessage(dataLength);
426 ASSERT_NE(resendMsgForBC, nullptr);
427 errCode = g_commBC->SendMessage(DEVICE_NAME_C, resendMsgForBC, conf);
428 EXPECT_EQ(errCode, E_OK);
429 std::this_thread::sleep_for(std::chrono::milliseconds(3400)); // Wait 3400 ms to make sure send done
430 ASSERT_NE(recvMsgForCC, nullptr);
431 ASSERT_EQ(recvMsgForCC->GetMessageId(), REGED_GIANT_MSG_ID);
432 const RegedGiantObject *recvObjForCC = recvMsgForCC->GetObject<RegedGiantObject>();
433 ASSERT_NE(recvObjForCC, nullptr);
434 EXPECT_EQ(dataLength, recvObjForCC->rawData_.size());
435
436 // CleanUp
437 delete recvMsgForCC;
438 recvMsgForCC = nullptr;
439 AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
440 }
441
442 /**
443 * @tc.name: Fragment 003
444 * @tc.desc: Test fragmentation simultaneously
445 * @tc.type: FUNC
446 * @tc.require: AR000BVDGI AR000CQE0M
447 * @tc.author: xiaozhenjian
448 */
449 HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment003, TestSize.Level3)
450 {
451 // Preset
452 std::atomic<int> count {0};
__anonb9037cf20702(const std::string &srcTarget, Message *inMsg) 453 OnMessageCallback callback = [&count](const std::string &srcTarget, Message *inMsg) {
454 delete inMsg;
455 inMsg = nullptr;
456 count.fetch_add(1, std::memory_order_seq_cst);
457 };
458 g_commBB->RegOnMessageCallback(callback, nullptr);
459 g_commBC->RegOnMessageCallback(callback, nullptr);
460
461 /**
462 * @tc.steps: step1. connect device A with device B, then device B with device C
463 */
464 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
465 AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
466 std::this_thread::sleep_for(std::chrono::milliseconds(400)); // Wait 400 ms to make sure quiet
467
468 /**
469 * @tc.steps: step2. device A and device C simulate send block
470 */
471 g_envDeviceA.adapterHandle->SimulateSendBlock();
472 g_envDeviceC.adapterHandle->SimulateSendBlock();
473
474 /**
475 * @tc.steps: step3. device A send message(registered and giant) to device B using communicator AB
476 */
477 uint32_t dataLength = 23 * 1024 * 1024; // 23 MB, 1024 is scale
478 Message *sendMsgForAB = BuildRegedGiantMessage(dataLength);
479 ASSERT_NE(sendMsgForAB, nullptr);
480 SendConfig conf = {false, false, 0};
481 int errCode = g_commAB->SendMessage(DEVICE_NAME_B, sendMsgForAB, conf);
482 EXPECT_EQ(errCode, E_OK);
483
484 /**
485 * @tc.steps: step4. device C send message(registered and giant) to device B using communicator CC
486 */
487 Message *sendMsgForCC = BuildRegedGiantMessage(dataLength);
488 ASSERT_NE(sendMsgForCC, nullptr);
489 errCode = g_commCC->SendMessage(DEVICE_NAME_B, sendMsgForCC, conf);
490 EXPECT_EQ(errCode, E_OK);
491
492 /**
493 * @tc.steps: step5. device A and device C not simulate send block
494 * @tc.expected: step5. communicator BB and BV received the message
495 */
496 g_envDeviceA.adapterHandle->SimulateSendBlockClear();
497 g_envDeviceC.adapterHandle->SimulateSendBlockClear();
498 std::this_thread::sleep_for(std::chrono::milliseconds(9200)); // Wait 9200 ms to make sure send done
499 EXPECT_EQ(count, 2); // 2 combined message received
500
501 // CleanUp
502 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
503 AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
504 }
505
506 /**
507 * @tc.name: Fragment 004
508 * @tc.desc: Test fragmentation in send and receive when rate limit
509 * @tc.type: FUNC
510 * @tc.require: AR000BVDGI AR000CQE0M
511 * @tc.author: zhangqiquan
512 */
513 HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment004, TestSize.Level2)
514 {
515 /**
516 * @tc.steps: step1. connect device A with device B
517 */
518 Message *recvMsgForBB = nullptr;
__anonb9037cf20802(const std::string &srcTarget, Message *inMsg) 519 g_commBB->RegOnMessageCallback([&recvMsgForBB](const std::string &srcTarget, Message *inMsg) {
520 recvMsgForBB = inMsg;
521 }, nullptr);
522 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
523 std::atomic<int> count = 0;
__anonb9037cf20902() 524 g_envDeviceA.adapterHandle->ForkSendBytes([&count]() {
525 count++;
526 if (count % 3 == 0) { // retry each 3 packet
527 return -E_WAIT_RETRY;
528 }
529 return E_OK;
530 });
531 /**
532 * @tc.steps: step2. device A send message(registered and giant) to device B using communicator AB
533 * @tc.expected: step2. communicator BB received the message
534 */
535 const uint32_t dataLength = 13 * 1024 * 1024; // 13 MB, 1024 is scale
536 Message *sendMsg = BuildRegedGiantMessage(dataLength);
537 ASSERT_NE(sendMsg, nullptr);
538 SendConfig conf = {false, false, 0};
539 int errCode = g_commAB->SendMessage(DEVICE_NAME_B, sendMsg, conf);
540 EXPECT_EQ(errCode, E_OK);
541 std::this_thread::sleep_for(std::chrono::seconds(1)); // Wait 1s to make sure send done
542 g_envDeviceA.adapterHandle->SimulateSendRetry(DEVICE_NAME_B);
543 g_envDeviceA.adapterHandle->SimulateSendRetryClear(DEVICE_NAME_B);
544 int reTryTimes = 5;
545 while (recvMsgForBB == nullptr && reTryTimes > 0) {
546 std::this_thread::sleep_for(std::chrono::seconds(3));
547 reTryTimes--;
548 }
549 ASSERT_NE(recvMsgForBB, nullptr);
550 ASSERT_EQ(recvMsgForBB->GetMessageId(), REGED_GIANT_MSG_ID);
551 /**
552 * @tc.steps: step3. Compare received data with send data
553 * @tc.expected: step3. equal
554 */
555 Message *oriMsgForAB = BuildRegedGiantMessage(dataLength);
556 ASSERT_NE(oriMsgForAB, nullptr);
557 auto *recvObjForBB = recvMsgForBB->GetObject<RegedGiantObject>();
558 ASSERT_NE(recvObjForBB, nullptr);
559 auto *oriObjForAB = oriMsgForAB->GetObject<RegedGiantObject>();
560 ASSERT_NE(oriObjForAB, nullptr);
561 bool isEqual = RegedGiantObject::CheckEqual(*oriObjForAB, *recvObjForBB);
562 EXPECT_EQ(isEqual, true);
563 g_envDeviceA.adapterHandle->ForkSendBytes(nullptr);
564
565 // CleanUp
566 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
567 delete oriMsgForAB;
568 oriMsgForAB = nullptr;
569 delete recvMsgForBB;
570 recvMsgForBB = nullptr;
571 }
572
573 namespace {
ClearPreviousTestCaseInfluence()574 void ClearPreviousTestCaseInfluence()
575 {
576 ReleaseAllCommunicator();
577 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
578 AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
579 AdapterStub::ConnectAdapterStub(g_envDeviceC.adapterHandle, g_envDeviceA.adapterHandle);
580 std::this_thread::sleep_for(std::chrono::seconds(10)); // Wait 10 s to make sure all thread quiet
581 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
582 AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
583 AdapterStub::DisconnectAdapterStub(g_envDeviceC.adapterHandle, g_envDeviceA.adapterHandle);
584 AllocAllCommunicator();
585 }
586 }
587
588 /**
589 * @tc.name: ReliableOnline 001
590 * @tc.desc: Test device online reliability
591 * @tc.type: FUNC
592 * @tc.require: AR000BVDGJ AR000CQE0N
593 * @tc.author: xiaozhenjian
594 */
595 HWTEST_F(DistributedDBCommunicatorDeepTest, ReliableOnline001, TestSize.Level2)
596 {
597 // Preset
598 ClearPreviousTestCaseInfluence();
599 std::atomic<int> count {0};
__anonb9037cf20b02(const std::string &target, bool isConnect) 600 OnConnectCallback callback = [&count](const std::string &target, bool isConnect) {
601 if (isConnect) {
602 count.fetch_add(1, std::memory_order_seq_cst);
603 }
604 };
605 g_commAA->RegOnConnectCallback(callback, nullptr);
606 g_commAB->RegOnConnectCallback(callback, nullptr);
607 g_commBB->RegOnConnectCallback(callback, nullptr);
608 g_commBC->RegOnConnectCallback(callback, nullptr);
609 g_commCC->RegOnConnectCallback(callback, nullptr);
610 g_commCA->RegOnConnectCallback(callback, nullptr);
611
612 /**
613 * @tc.steps: step1. device A and device B and device C simulate send total loss
614 */
615 g_envDeviceA.adapterHandle->SimulateSendTotalLoss();
616 g_envDeviceB.adapterHandle->SimulateSendTotalLoss();
617 g_envDeviceC.adapterHandle->SimulateSendTotalLoss();
618
619 /**
620 * @tc.steps: step2. connect device A with device B, device B with device C, device C with device A
621 */
622 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
623 AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
624 AdapterStub::ConnectAdapterStub(g_envDeviceC.adapterHandle, g_envDeviceA.adapterHandle);
625
626 /**
627 * @tc.steps: step3. wait a long time
628 * @tc.expected: step3. no communicator received the online callback
629 */
630 std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure quiet
631 EXPECT_EQ(count, 0); // no online callback received
632
633 /**
634 * @tc.steps: step4. device A and device B and device C not simulate send total loss
635 */
636 g_envDeviceA.adapterHandle->SimulateSendTotalLossClear();
637 g_envDeviceB.adapterHandle->SimulateSendTotalLossClear();
638 g_envDeviceC.adapterHandle->SimulateSendTotalLossClear();
639 std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure send done
640 EXPECT_EQ(count, 6); // 6 online callback received in total
641
642 // CleanUp
643 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
644 AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
645 AdapterStub::DisconnectAdapterStub(g_envDeviceC.adapterHandle, g_envDeviceA.adapterHandle);
646 }
647
648 /**
649 * @tc.name: NetworkAdapter001
650 * @tc.desc: Test networkAdapter start func
651 * @tc.type: FUNC
652 * @tc.require: AR000BVDGJ
653 * @tc.author: zhangqiquan
654 */
655 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter001, TestSize.Level1)
656 {
657 auto processCommunicator = std::make_shared<MockProcessCommunicator>();
658 EXPECT_CALL(*processCommunicator, Stop()).WillRepeatedly(testing::Return(OK));
659 /**
660 * @tc.steps: step1. adapter start with empty label
661 * @tc.expected: step1. start failed
662 */
663 auto adapter = std::make_shared<NetworkAdapter>("");
664 EXPECT_EQ(adapter->StartAdapter(), -E_INVALID_ARGS);
665 /**
666 * @tc.steps: step2. adapter start with not empty label but processCommunicator is null
667 * @tc.expected: step2. start failed
668 */
669 adapter = std::make_shared<NetworkAdapter>("label");
670 EXPECT_EQ(adapter->StartAdapter(), -E_INVALID_ARGS);
671 /**
672 * @tc.steps: step3. processCommunicator start not ok
673 * @tc.expected: step3. start failed
674 */
675 adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
676 EXPECT_CALL(*processCommunicator, Start).WillRepeatedly(testing::Return(DB_ERROR));
677 EXPECT_EQ(adapter->StartAdapter(), -E_PERIPHERAL_INTERFACE_FAIL);
678 /**
679 * @tc.steps: step4. processCommunicator reg not ok
680 * @tc.expected: step4. start failed
681 */
682 EXPECT_CALL(*processCommunicator, Start).WillRepeatedly(testing::Return(OK));
683 EXPECT_CALL(*processCommunicator, RegOnDataReceive).WillRepeatedly(testing::Return(DB_ERROR));
684 EXPECT_EQ(adapter->StartAdapter(), -E_PERIPHERAL_INTERFACE_FAIL);
685 EXPECT_CALL(*processCommunicator, RegOnDataReceive).WillRepeatedly(testing::Return(OK));
686 EXPECT_CALL(*processCommunicator, RegOnDeviceChange).WillRepeatedly(testing::Return(DB_ERROR));
687 EXPECT_EQ(adapter->StartAdapter(), -E_PERIPHERAL_INTERFACE_FAIL);
688 /**
689 * @tc.steps: step5. processCommunicator reg ok
690 * @tc.expected: step5. start success
691 */
692 EXPECT_CALL(*processCommunicator, RegOnDeviceChange).WillRepeatedly(testing::Return(OK));
__anonb9037cf20c02() 693 EXPECT_CALL(*processCommunicator, GetLocalDeviceInfos).WillRepeatedly([]() {
694 DeviceInfos deviceInfos;
695 deviceInfos.identifier = "DEVICES_A"; // local is deviceA
696 return deviceInfos;
697 });
__anonb9037cf20d02() 698 EXPECT_CALL(*processCommunicator, GetRemoteOnlineDeviceInfosList).WillRepeatedly([]() {
699 std::vector<DeviceInfos> res;
700 DeviceInfos deviceInfos;
701 deviceInfos.identifier = "DEVICES_A"; // search local is deviceA
702 res.push_back(deviceInfos);
703 deviceInfos.identifier = "DEVICES_B"; // search remote is deviceB
704 res.push_back(deviceInfos);
705 return res;
706 });
__anonb9037cf20e02(const DeviceInfos &) 707 EXPECT_CALL(*processCommunicator, IsSameProcessLabelStartedOnPeerDevice).WillRepeatedly([](const DeviceInfos &) {
708 return false;
709 });
710 EXPECT_EQ(adapter->StartAdapter(), E_OK);
711 RuntimeContext::GetInstance()->StopTaskPool();
712 }
713
714 /**
715 * @tc.name: NetworkAdapter002
716 * @tc.desc: Test networkAdapter get mtu func
717 * @tc.type: FUNC
718 * @tc.require: AR000BVDGJ
719 * @tc.author: zhangqiquan
720 */
721 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter002, TestSize.Level1)
722 {
723 auto processCommunicator = std::make_shared<MockProcessCommunicator>();
724 auto adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
725 /**
726 * @tc.steps: step1. processCommunicator return 0 mtu
727 * @tc.expected: step1. adapter will adjust to min mtu
728 */
__anonb9037cf20f02() 729 EXPECT_CALL(*processCommunicator, GetMtuSize).WillRepeatedly([]() {
730 return 0u;
731 });
732 EXPECT_EQ(adapter->GetMtuSize(), DBConstant::MIN_MTU_SIZE);
733 /**
734 * @tc.steps: step2. processCommunicator return 2 max mtu
735 * @tc.expected: step2. adapter will return min mtu util re make
736 */
__anonb9037cf21002() 737 EXPECT_CALL(*processCommunicator, GetMtuSize).WillRepeatedly([]() {
738 return 2 * DBConstant::MAX_MTU_SIZE;
739 });
740 EXPECT_EQ(adapter->GetMtuSize(), DBConstant::MIN_MTU_SIZE);
741 adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
742 EXPECT_EQ(adapter->GetMtuSize(), DBConstant::MAX_MTU_SIZE);
743 }
744
745 /**
746 * @tc.name: NetworkAdapter003
747 * @tc.desc: Test networkAdapter get timeout func
748 * @tc.type: FUNC
749 * @tc.require: AR000BVDGJ
750 * @tc.author: zhangqiquan
751 */
752 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter003, TestSize.Level1)
753 {
754 auto processCommunicator = std::make_shared<MockProcessCommunicator>();
755 auto adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
756 /**
757 * @tc.steps: step1. processCommunicator return 0 timeout
758 * @tc.expected: step1. adapter will adjust to min timeout
759 */
__anonb9037cf21102() 760 EXPECT_CALL(*processCommunicator, GetTimeout).WillRepeatedly([]() {
761 return 0u;
762 });
763 EXPECT_EQ(adapter->GetTimeout(), DBConstant::MIN_TIMEOUT);
764 /**
765 * @tc.steps: step2. processCommunicator return 2 max timeout
766 * @tc.expected: step2. adapter will adjust to max timeout
767 */
__anonb9037cf21202() 768 EXPECT_CALL(*processCommunicator, GetTimeout).WillRepeatedly([]() {
769 return 2 * DBConstant::MAX_TIMEOUT;
770 });
771 EXPECT_EQ(adapter->GetTimeout(), DBConstant::MAX_TIMEOUT);
772 }
773
774 /**
775 * @tc.name: NetworkAdapter004
776 * @tc.desc: Test networkAdapter send bytes func
777 * @tc.type: FUNC
778 * @tc.require: AR000BVDGJ
779 * @tc.author: zhangqiquan
780 */
781 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter004, TestSize.Level1)
782 {
783 auto processCommunicator = std::make_shared<MockProcessCommunicator>();
784 auto adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
785
__anonb9037cf21302(const DeviceInfos &, const uint8_t *, uint32_t) 786 EXPECT_CALL(*processCommunicator, SendData).WillRepeatedly([](const DeviceInfos &, const uint8_t *, uint32_t) {
787 return OK;
788 });
789 /**
790 * @tc.steps: step1. adapter send data with error param
791 * @tc.expected: step1. adapter send failed
792 */
793 auto data = std::make_shared<uint8_t>(1u);
794 EXPECT_EQ(adapter->SendBytes("DEVICES_B", nullptr, 1, 0), -E_INVALID_ARGS);
795 EXPECT_EQ(adapter->SendBytes("DEVICES_B", data.get(), 0, 0), -E_INVALID_ARGS);
796 /**
797 * @tc.steps: step2. adapter send data with right param
798 * @tc.expected: step2. adapter send ok
799 */
800 EXPECT_EQ(adapter->SendBytes("DEVICES_B", data.get(), 1, 0), E_OK);
801 RuntimeContext::GetInstance()->StopTaskPool();
802 }
803
804 namespace {
InitAdapter(const std::shared_ptr<NetworkAdapter> & adapter,const std::shared_ptr<MockProcessCommunicator> & processCommunicator,OnDataReceive & onDataReceive,OnDeviceChange & onDataChange)805 void InitAdapter(const std::shared_ptr<NetworkAdapter> &adapter,
806 const std::shared_ptr<MockProcessCommunicator> &processCommunicator,
807 OnDataReceive &onDataReceive, OnDeviceChange &onDataChange)
808 {
809 EXPECT_CALL(*processCommunicator, Stop).WillRepeatedly([]() {
810 return OK;
811 });
812 EXPECT_CALL(*processCommunicator, Start).WillRepeatedly([](const std::string &) {
813 return OK;
814 });
815 EXPECT_CALL(*processCommunicator, RegOnDataReceive).WillRepeatedly(
816 [&onDataReceive](const OnDataReceive &callback) {
817 onDataReceive = callback;
818 return OK;
819 });
820 EXPECT_CALL(*processCommunicator, RegOnDeviceChange).WillRepeatedly(
821 [&onDataChange](const OnDeviceChange &callback) {
822 onDataChange = callback;
823 return OK;
824 });
825 EXPECT_CALL(*processCommunicator, GetRemoteOnlineDeviceInfosList).WillRepeatedly([]() {
826 std::vector<DeviceInfos> res;
827 return res;
828 });
829 EXPECT_CALL(*processCommunicator, IsSameProcessLabelStartedOnPeerDevice).WillRepeatedly([](const DeviceInfos &) {
830 return false;
831 });
832 EXPECT_EQ(adapter->StartAdapter(), E_OK);
833 }
834 }
835 /**
836 * @tc.name: NetworkAdapter005
837 * @tc.desc: Test networkAdapter receive data func
838 * @tc.type: FUNC
839 * @tc.require: AR000BVDGJ
840 * @tc.author: zhangqiquan
841 */
842 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter005, TestSize.Level1)
843 {
844 auto processCommunicator = std::make_shared<MockProcessCommunicator>();
845 auto adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
846 OnDataReceive onDataReceive;
847 OnDeviceChange onDeviceChange;
848 InitAdapter(adapter, processCommunicator, onDataReceive, onDeviceChange);
849 ASSERT_NE(onDataReceive, nullptr);
850 /**
851 * @tc.steps: step1. adapter recv data with error param
852 */
853 auto data = std::make_shared<uint8_t>(1);
854 DeviceInfos deviceInfos;
855 onDataReceive(deviceInfos, nullptr, 1);
856 onDataReceive(deviceInfos, data.get(), 0);
857 /**
858 * @tc.steps: step2. adapter recv data with no permission
859 */
860 EXPECT_CALL(*processCommunicator, CheckAndGetDataHeadInfo).WillRepeatedly(
__anonb9037cf21b02(const uint8_t *, uint32_t, uint32_t &, std::vector<std::string> &) 861 [](const uint8_t *, uint32_t, uint32_t &, std::vector<std::string> &) {
862 return NO_PERMISSION;
863 });
864 onDataReceive(deviceInfos, data.get(), 1);
865 EXPECT_CALL(*processCommunicator, CheckAndGetDataHeadInfo).WillRepeatedly(
__anonb9037cf21c02(const uint8_t *, uint32_t, uint32_t &, std::vector<std::string> &userIds) 866 [](const uint8_t *, uint32_t, uint32_t &, std::vector<std::string> &userIds) {
867 userIds.emplace_back("1");
868 return OK;
869 });
870 /**
871 * @tc.steps: step3. adapter recv data with no callback
872 */
873 onDataReceive(deviceInfos, data.get(), 1);
__anonb9037cf21d02(const std::string &, const uint8_t *, uint32_t, const std::string &) 874 adapter->RegBytesReceiveCallback([](const std::string &, const uint8_t *, uint32_t, const std::string &) {
875 }, nullptr);
876 onDataReceive(deviceInfos, data.get(), 1);
877 RuntimeContext::GetInstance()->StopTaskPool();
878 }
879
880 /**
881 * @tc.name: NetworkAdapter006
882 * @tc.desc: Test networkAdapter device change func
883 * @tc.type: FUNC
884 * @tc.require: AR000BVDGJ
885 * @tc.author: zhangqiquan
886 */
887 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter006, TestSize.Level1)
888 {
889 auto processCommunicator = std::make_shared<MockProcessCommunicator>();
890 auto adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
891 OnDataReceive onDataReceive;
892 OnDeviceChange onDeviceChange;
893 InitAdapter(adapter, processCommunicator, onDataReceive, onDeviceChange);
894 ASSERT_NE(onDeviceChange, nullptr);
895 DeviceInfos deviceInfos;
896 /**
897 * @tc.steps: step1. onDeviceChange with no same process
898 */
899 onDeviceChange(deviceInfos, true);
900 /**
901 * @tc.steps: step2. onDeviceChange with same process
902 */
__anonb9037cf21e02(const DeviceInfos &) 903 EXPECT_CALL(*processCommunicator, IsSameProcessLabelStartedOnPeerDevice).WillRepeatedly([](const DeviceInfos &) {
904 return true;
905 });
906 onDeviceChange(deviceInfos, true);
__anonb9037cf21f02(const std::string &, bool) 907 adapter->RegTargetChangeCallback([](const std::string &, bool) {
908 }, nullptr);
909 onDeviceChange(deviceInfos, false);
910 /**
911 * @tc.steps: step3. adapter send data with db_error
912 * @tc.expected: step3. adapter send failed
913 */
914 onDeviceChange(deviceInfos, true);
__anonb9037cf22002(const DeviceInfos &, const uint8_t *, uint32_t) 915 EXPECT_CALL(*processCommunicator, SendData).WillRepeatedly([](const DeviceInfos &, const uint8_t *, uint32_t) {
916 return DB_ERROR;
917 });
__anonb9037cf22102(const DeviceInfos &) 918 EXPECT_CALL(*processCommunicator, IsSameProcessLabelStartedOnPeerDevice).WillRepeatedly([](const DeviceInfos &) {
919 return false;
920 });
921 auto data = std::make_shared<uint8_t>(1);
922 EXPECT_EQ(adapter->SendBytes("", data.get(), 1, 0), static_cast<int>(DB_ERROR));
923 RuntimeContext::GetInstance()->StopTaskPool();
924 EXPECT_EQ(adapter->IsDeviceOnline(""), false);
925 ExtendInfo info;
926 EXPECT_EQ(adapter->GetExtendHeaderHandle(info), nullptr);
927 }
928
929 /**
930 * @tc.name: NetworkAdapter007
931 * @tc.desc: Test networkAdapter recv invalid head length
932 * @tc.type: FUNC
933 * @tc.require: AR000BVDGJ
934 * @tc.author: zhangqiquan
935 */
936 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter007, TestSize.Level1)
937 {
938 auto processCommunicator = std::make_shared<MockProcessCommunicator>();
939 auto adapter = std::make_shared<NetworkAdapter>("NetworkAdapter007", processCommunicator);
940 OnDataReceive onDataReceive;
941 OnDeviceChange onDeviceChange;
942 InitAdapter(adapter, processCommunicator, onDataReceive, onDeviceChange);
943 ASSERT_NE(onDeviceChange, nullptr);
944 /**
945 * @tc.steps: step1. CheckAndGetDataHeadInfo return invalid headLen
946 * @tc.expected: step1. adapter check this len
947 */
948 EXPECT_CALL(*processCommunicator, CheckAndGetDataHeadInfo).WillOnce([](const uint8_t *, uint32_t, uint32_t &headLen,
__anonb9037cf22202(const uint8_t *, uint32_t, uint32_t &headLen, std::vector<std::string> &) 949 std::vector<std::string> &) {
950 headLen = UINT32_MAX;
951 return OK;
952 });
953 /**
954 * @tc.steps: step2. Adapter ignore data because len is too large
955 * @tc.expected: step2. BytesReceive never call
956 */
957 int callByteReceiveCount = 0;
958 int res = adapter->RegBytesReceiveCallback([&callByteReceiveCount](const std::string &, const uint8_t *, uint32_t,
__anonb9037cf22302(const std::string &, const uint8_t *, uint32_t, const std::string &) 959 const std::string &) {
960 callByteReceiveCount++;
961 }, nullptr);
962 EXPECT_EQ(res, E_OK);
963 std::vector<uint8_t> data = { 1u };
964 DeviceInfos deviceInfos;
965 onDataReceive(deviceInfos, data.data(), 1u);
966 EXPECT_EQ(callByteReceiveCount, 0);
967 }
968
969 /**
970 * @tc.name: RetrySendExceededLimit001
971 * @tc.desc: Test send result when the number of retry times exceeds the limit
972 * @tc.type: FUNC
973 * @tc.require:
974 * @tc.author: suyue
975 */
976 HWTEST_F(DistributedDBCommunicatorDeepTest, RetrySendExceededLimit001, TestSize.Level2)
977 {
978 /**
979 * @tc.steps: step1. connect device A with device B and fork SendBytes
980 * @tc.expected: step1. operation OK
981 */
982 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
983 std::atomic<int> count = 0;
__anonb9037cf22402() 984 g_envDeviceA.adapterHandle->ForkSendBytes([&count]() {
985 count++;
986 return -E_WAIT_RETRY;
987 });
988
989 /**
990 * @tc.steps: step2. the number of retry times for device A to send a message exceeds the limit
991 * @tc.expected: step2. sendResult fail
992 */
993 std::vector<std::pair<int, bool>> sendResult;
__anonb9037cf22502(int result, bool isDirectEnd) 994 auto sendResultNotifier = [&sendResult](int result, bool isDirectEnd) {
995 sendResult.push_back(std::pair<int, bool>(result, isDirectEnd));
996 };
997 const uint32_t dataLength = 13 * 1024 * 1024; // 13 MB, 1024 is scale
998 Message *sendMsg = BuildRegedGiantMessage(dataLength);
999 ASSERT_NE(sendMsg, nullptr);
1000 SendConfig conf = {false, false, 0};
1001 int errCode = g_commAB->SendMessage(DEVICE_NAME_B, sendMsg, conf, sendResultNotifier);
1002 EXPECT_EQ(errCode, E_OK);
1003 std::this_thread::sleep_for(std::chrono::seconds(1)); // Wait 1s to make sure send done
1004 g_envDeviceA.adapterHandle->SimulateSendRetry(DEVICE_NAME_B);
1005 g_envDeviceA.adapterHandle->SimulateSendRetryClear(DEVICE_NAME_B, -E_BASE);
1006 int reTryTimes = 5;
1007 while ((count < 4) && (reTryTimes > 0)) { // Wait to make sure retry exceeds the limit
1008 std::this_thread::sleep_for(std::chrono::seconds(3));
1009 reTryTimes--;
1010 }
1011 ASSERT_EQ(sendResult.size(), static_cast<size_t>(1)); // only one callback result notification
1012 EXPECT_EQ(sendResult[0].first, -E_BASE); // index 0 retry fail
1013 EXPECT_EQ(sendResult[0].second, false);
1014
1015 g_envDeviceA.adapterHandle->ForkSendBytes(nullptr);
1016 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
1017 }
1018
1019 /**
1020 * @tc.name: RetrySendExceededLimit002
1021 * @tc.desc: Test multi thread call SendableCallback when the number of retry times exceeds the limit
1022 * @tc.type: FUNC
1023 * @tc.require:
1024 * @tc.author: suyue
1025 */
1026 HWTEST_F(DistributedDBCommunicatorDeepTest, RetrySendExceededLimit002, TestSize.Level2)
1027 {
1028 /**
1029 * @tc.steps: step1. DeviceA send SendMessage and set SendBytes interface return -E_WAIT_RETRY
1030 * @tc.expected: step1. Send ok
1031 */
1032 AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
1033 std::atomic<int> count = 0;
__anonb9037cf22602() 1034 g_envDeviceA.adapterHandle->ForkSendBytes([&count]() {
1035 count++;
1036 return -E_WAIT_RETRY;
1037 });
1038 std::vector<std::pair<int, bool>> sendResult;
__anonb9037cf22702(int result, bool isDirectEnd) 1039 auto sendResultNotifier = [&sendResult](int result, bool isDirectEnd) {
1040 sendResult.push_back(std::pair<int, bool>(result, isDirectEnd));
1041 };
1042 const uint32_t dataLength = 13 * 1024 * 1024; // 13 MB, 1024 is scale
1043 Message *sendMsg = BuildRegedGiantMessage(dataLength);
1044 ASSERT_NE(sendMsg, nullptr);
1045 SendConfig conf = {false, false, 0};
1046 EXPECT_EQ(g_commAB->SendMessage(DEVICE_NAME_B, sendMsg, conf, sendResultNotifier), E_OK);
1047 std::this_thread::sleep_for(std::chrono::seconds(1)); // Wait 1s to make sure send done
1048
1049 /**
1050 * @tc.steps: step2. Triggering multi thread call SendableCallback interface and set errorCode
1051 * @tc.expected: step2. Callback success
1052 */
1053 std::vector<std::thread> threads;
1054 int threadNum = 3;
1055 threads.reserve(threadNum);
1056 for (int n = 0; n < threadNum; n++) {
__anonb9037cf22802() 1057 threads.emplace_back([&]() {
1058 g_envDeviceA.adapterHandle->SimulateTriggerSendableCallback(DEVICE_NAME_B, -E_BASE);
1059 });
1060 }
1061 for (std::thread &t : threads) {
1062 t.join();
1063 }
1064
1065 /**
1066 * @tc.steps: step3. Make The number of messages sent by device A exceed the limit
1067 * @tc.expected: step3. SendResult is the errorCode set by SendableCallback interface
1068 */
1069 int reTryTimes = 5;
1070 while ((count < 4) && (reTryTimes > 0)) {
1071 std::this_thread::sleep_for(std::chrono::seconds(3));
1072 reTryTimes--;
1073 }
1074 ASSERT_EQ(sendResult.size(), static_cast<size_t>(1));
1075 EXPECT_EQ(sendResult[0].first, -E_BASE);
1076 EXPECT_EQ(sendResult[0].second, false);
1077 g_envDeviceA.adapterHandle->ForkSendBytes(nullptr);
1078 AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
1079 }
1080