1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <gtest/gtest.h>
17 #include <gmock/gmock.h>
18 #include <new>
19 #include <thread>
20 #include "db_errno.h"
21 #include "distributeddb_communicator_common.h"
22 #include "distributeddb_tools_unit_test.h"
23 #include "log_print.h"
24 #include "network_adapter.h"
25 #include "message.h"
26 #include "mock_process_communicator.h"
27 #include "serial_buffer.h"
28 
29 using namespace std;
30 using namespace testing::ext;
31 using namespace DistributedDB;
32 
33 namespace {
34     EnvHandle g_envDeviceA;
35     EnvHandle g_envDeviceB;
36     EnvHandle g_envDeviceC;
37     ICommunicator *g_commAA = nullptr;
38     ICommunicator *g_commAB = nullptr;
39     ICommunicator *g_commBB = nullptr;
40     ICommunicator *g_commBC = nullptr;
41     ICommunicator *g_commCC = nullptr;
42     ICommunicator *g_commCA = nullptr;
43 }
44 
45 class DistributedDBCommunicatorDeepTest : public testing::Test {
46 public:
47     static void SetUpTestCase(void);
48     static void TearDownTestCase(void);
49     void SetUp();
50     void TearDown();
51 };
52 
SetUpTestCase(void)53 void DistributedDBCommunicatorDeepTest::SetUpTestCase(void)
54 {
55     /**
56      * @tc.setup: Create and init CommunicatorAggregator and AdapterStub
57      */
58     LOGI("[UT][DeepTest][SetUpTestCase] Enter.");
59     bool errCode = SetUpEnv(g_envDeviceA, DEVICE_NAME_A);
60     ASSERT_EQ(errCode, true);
61     errCode = SetUpEnv(g_envDeviceB, DEVICE_NAME_B);
62     ASSERT_EQ(errCode, true);
63     errCode = SetUpEnv(g_envDeviceC, DEVICE_NAME_C);
64     ASSERT_EQ(errCode, true);
65     DoRegTransformFunction();
66     CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(false);
67 }
68 
TearDownTestCase(void)69 void DistributedDBCommunicatorDeepTest::TearDownTestCase(void)
70 {
71     /**
72      * @tc.teardown: Finalize and release CommunicatorAggregator and AdapterStub
73      */
74     LOGI("[UT][DeepTest][TearDownTestCase] Enter.");
75     std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure all thread quiet and memory released
76     TearDownEnv(g_envDeviceA);
77     TearDownEnv(g_envDeviceB);
78     TearDownEnv(g_envDeviceC);
79     CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(true);
80 }
81 
82 namespace {
AllocAllCommunicator()83 void AllocAllCommunicator()
84 {
85     int errorNo = E_OK;
86     g_commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
87     ASSERT_NOT_NULL_AND_ACTIVATE(g_commAA);
88     g_commAB = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
89     ASSERT_NOT_NULL_AND_ACTIVATE(g_commAB);
90     g_commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
91     ASSERT_NOT_NULL_AND_ACTIVATE(g_commBB);
92     g_commBC = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_C, errorNo);
93     ASSERT_NOT_NULL_AND_ACTIVATE(g_commBC);
94     g_commCC = g_envDeviceC.commAggrHandle->AllocCommunicator(LABEL_C, errorNo);
95     ASSERT_NOT_NULL_AND_ACTIVATE(g_commCC);
96     g_commCA = g_envDeviceC.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
97     ASSERT_NOT_NULL_AND_ACTIVATE(g_commCA);
98 }
99 
ReleaseAllCommunicator()100 void ReleaseAllCommunicator()
101 {
102     g_envDeviceA.commAggrHandle->ReleaseCommunicator(g_commAA);
103     g_commAA = nullptr;
104     g_envDeviceA.commAggrHandle->ReleaseCommunicator(g_commAB);
105     g_commAB = nullptr;
106     g_envDeviceB.commAggrHandle->ReleaseCommunicator(g_commBB);
107     g_commBB = nullptr;
108     g_envDeviceB.commAggrHandle->ReleaseCommunicator(g_commBC);
109     g_commBC = nullptr;
110     g_envDeviceC.commAggrHandle->ReleaseCommunicator(g_commCC);
111     g_commCC = nullptr;
112     g_envDeviceC.commAggrHandle->ReleaseCommunicator(g_commCA);
113     g_commCA = nullptr;
114 }
115 }
116 
SetUp()117 void DistributedDBCommunicatorDeepTest::SetUp()
118 {
119     DistributedDBUnitTest::DistributedDBToolsUnitTest::PrintTestCaseInfo();
120     /**
121      * @tc.setup: Alloc communicator AA, AB, BB, BC, CC, CA
122      */
123     AllocAllCommunicator();
124 }
125 
TearDown()126 void DistributedDBCommunicatorDeepTest::TearDown()
127 {
128     /**
129      * @tc.teardown: Release communicator AA, AB, BB, BC, CC, CA
130      */
131     ReleaseAllCommunicator();
132     std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Wait 200 ms to make sure all thread quiet
133 }
134 
135 /**
136  * @tc.name: WaitAndRetrySend 001
137  * @tc.desc: Test send retry semantic
138  * @tc.type: FUNC
139  * @tc.require: AR000BVDGI AR000CQE0M
140  * @tc.author: xiaozhenjian
141  */
142 HWTEST_F(DistributedDBCommunicatorDeepTest, WaitAndRetrySend001, TestSize.Level2)
143 {
144     // Preset
145     Message *msgForBB = nullptr;
__anonb9037cf20302(const std::string &srcTarget, Message *inMsg) 146     g_commBB->RegOnMessageCallback([&msgForBB](const std::string &srcTarget, Message *inMsg) {
147         msgForBB = inMsg;
148     }, nullptr);
149     Message *msgForCA = nullptr;
__anonb9037cf20402(const std::string &srcTarget, Message *inMsg) 150     g_commCA->RegOnMessageCallback([&msgForCA](const std::string &srcTarget, Message *inMsg) {
151         msgForCA = inMsg;
152     }, nullptr);
153 
154     /**
155      * @tc.steps: step1. connect device A with device B
156      */
157     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
158     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceC.adapterHandle);
159     std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Wait 200 ms to make sure quiet
160 
161     /**
162      * @tc.steps: step2. device A simulate send retry
163      */
164     g_envDeviceA.adapterHandle->SimulateSendRetry(DEVICE_NAME_B);
165 
166     /**
167      * @tc.steps: step3. device A send message to device B using communicator AB
168      * @tc.expected: step3. communicator BB received no message
169      */
170     Message *msgForAB = BuildRegedTinyMessage();
171     ASSERT_NE(msgForAB, nullptr);
172     SendConfig conf = {true, false, 0};
173     int errCode = g_commAB->SendMessage(DEVICE_NAME_B, msgForAB, conf);
174     EXPECT_EQ(errCode, E_OK);
175 
176     Message *msgForAA = BuildRegedTinyMessage();
177     ASSERT_NE(msgForAA, nullptr);
178     errCode = g_commAA->SendMessage(DEVICE_NAME_C, msgForAA, conf);
179     EXPECT_EQ(errCode, E_OK);
180     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms
181     EXPECT_EQ(msgForBB, nullptr);
182     EXPECT_NE(msgForCA, nullptr);
183     delete msgForCA;
184     msgForCA = nullptr;
185 
186     /**
187      * @tc.steps: step4. device A simulate sendable feedback
188      * @tc.expected: step4. communicator BB received the message
189      */
190     g_envDeviceA.adapterHandle->SimulateSendRetryClear(DEVICE_NAME_B);
191     std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms
192     EXPECT_NE(msgForBB, nullptr);
193     delete msgForBB;
194     msgForBB = nullptr;
195 
196     // CleanUp
197     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
198 }
199 
CreateBufferThenAddIntoScheduler(SendTaskScheduler & scheduler,const std::string & dstTarget,Priority inPrio)200 static int CreateBufferThenAddIntoScheduler(SendTaskScheduler &scheduler, const std::string &dstTarget, Priority inPrio)
201 {
202     SerialBuffer *eachBuff = new (std::nothrow) SerialBuffer();
203     if (eachBuff == nullptr) {
204         return -E_OUT_OF_MEMORY;
205     }
206     int errCode = eachBuff->AllocBufferByTotalLength(100, 0); // 100 totallen without header
207     if (errCode != E_OK) {
208         delete eachBuff;
209         eachBuff = nullptr;
210         return errCode;
211     }
212     SendTask task{eachBuff, dstTarget, nullptr, 0u};
213     errCode = scheduler.AddSendTaskIntoSchedule(task, inPrio);
214     if (errCode != E_OK) {
215         delete eachBuff;
216         eachBuff = nullptr;
217         return errCode;
218     }
219     return E_OK;
220 }
221 
222 /**
223  * @tc.name: SendSchedule 001
224  * @tc.desc: Test schedule in Priority order than in send order
225  * @tc.type: FUNC
226  * @tc.require: AR000BVDGI AR000CQE0M
227  * @tc.author: xiaozhenjian
228  */
229 HWTEST_F(DistributedDBCommunicatorDeepTest, SendSchedule001, TestSize.Level2)
230 {
231     // Preset
232     SendTaskScheduler scheduler;
233     scheduler.Initialize();
234 
235     /**
236      * @tc.steps: step1. Add low priority target A buffer to schecduler
237      */
238     int errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_A, Priority::LOW);
239     EXPECT_EQ(errCode, E_OK);
240 
241     /**
242      * @tc.steps: step2. Add low priority target B buffer to schecduler
243      */
244     errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_B, Priority::LOW);
245     EXPECT_EQ(errCode, E_OK);
246 
247     /**
248      * @tc.steps: step3. Add normal priority target B buffer to schecduler
249      */
250     errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_B, Priority::NORMAL);
251     EXPECT_EQ(errCode, E_OK);
252 
253     /**
254      * @tc.steps: step4. Add normal priority target C buffer to schecduler
255      */
256     errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_C, Priority::NORMAL);
257     EXPECT_EQ(errCode, E_OK);
258 
259     /**
260      * @tc.steps: step5. Add high priority target C buffer to schecduler
261      */
262     errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_C, Priority::HIGH);
263     EXPECT_EQ(errCode, E_OK);
264 
265     /**
266      * @tc.steps: step6. Add high priority target A buffer to schecduler
267      */
268     errCode = CreateBufferThenAddIntoScheduler(scheduler, DEVICE_NAME_A, Priority::HIGH);
269     EXPECT_EQ(errCode, E_OK);
270 
271     /**
272      * @tc.steps: step7. schedule out buffers one by one
273      * @tc.expected: step7. the order is: high priority target C
274      *                                    high priority target A
275      *                                    normal priority target B
276      *                                    normal priority target C
277      *                                    low priority target A
278      *                                    low priority target B
279      */
280     SendTask outTask;
281     SendTaskInfo outTaskInfo;
282     uint32_t totalLength = 0;
283     // high priority target C
284     errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo, totalLength);
285     ASSERT_EQ(errCode, E_OK);
286     EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_C);
287     EXPECT_EQ(outTaskInfo.taskPrio, Priority::HIGH);
288     scheduler.FinalizeLastScheduleTask();
289     // high priority target A
290     errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo, totalLength);
291     ASSERT_EQ(errCode, E_OK);
292     EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_A);
293     EXPECT_EQ(outTaskInfo.taskPrio, Priority::HIGH);
294     scheduler.FinalizeLastScheduleTask();
295     // normal priority target B
296     errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo, totalLength);
297     ASSERT_EQ(errCode, E_OK);
298     EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_B);
299     EXPECT_EQ(outTaskInfo.taskPrio, Priority::NORMAL);
300     scheduler.FinalizeLastScheduleTask();
301     // normal priority target C
302     errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo, totalLength);
303     ASSERT_EQ(errCode, E_OK);
304     EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_C);
305     EXPECT_EQ(outTaskInfo.taskPrio, Priority::NORMAL);
306     scheduler.FinalizeLastScheduleTask();
307     // low priority target A
308     errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo, totalLength);
309     ASSERT_EQ(errCode, E_OK);
310     EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_A);
311     EXPECT_EQ(outTaskInfo.taskPrio, Priority::LOW);
312     scheduler.FinalizeLastScheduleTask();
313     // low priority target B
314     errCode = scheduler.ScheduleOutSendTask(outTask, outTaskInfo, totalLength);
315     ASSERT_EQ(errCode, E_OK);
316     EXPECT_EQ(outTask.dstTarget, DEVICE_NAME_B);
317     EXPECT_EQ(outTaskInfo.taskPrio, Priority::LOW);
318     scheduler.FinalizeLastScheduleTask();
319 }
320 
321 /**
322  * @tc.name: Fragment 001
323  * @tc.desc: Test fragmentation in send and receive
324  * @tc.type: FUNC
325  * @tc.require: AR000BVDGI AR000CQE0M
326  * @tc.author: xiaozhenjian
327  */
328 HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment001, TestSize.Level2)
329 {
330     // Preset
331     Message *recvMsgForBB = nullptr;
__anonb9037cf20502(const std::string &srcTarget, Message *inMsg) 332     g_commBB->RegOnMessageCallback([&recvMsgForBB](const std::string &srcTarget, Message *inMsg) {
333         recvMsgForBB = inMsg;
334     }, nullptr);
335 
336     /**
337      * @tc.steps: step1. connect device A with device B
338      */
339     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
340 
341     /**
342      * @tc.steps: step2. device A send message(registered and giant) to device B using communicator AB
343      * @tc.expected: step2. communicator BB received the message
344      */
345     const uint32_t dataLength = 13 * 1024 * 1024; // 13 MB, 1024 is scale
346     Message *sendMsgForAB = BuildRegedGiantMessage(dataLength);
347     ASSERT_NE(sendMsgForAB, nullptr);
348     SendConfig conf = {false, false, 0};
349     int errCode = g_commAB->SendMessage(DEVICE_NAME_B, sendMsgForAB, conf);
350     EXPECT_EQ(errCode, E_OK);
351     std::this_thread::sleep_for(std::chrono::milliseconds(2600)); // Wait 2600 ms to make sure send done
352     ASSERT_NE(recvMsgForBB, nullptr);
353     ASSERT_EQ(recvMsgForBB->GetMessageId(), REGED_GIANT_MSG_ID);
354 
355     /**
356      * @tc.steps: step3. Compare received data with send data
357      * @tc.expected: step3. equal
358      */
359     Message *oriMsgForAB = BuildRegedGiantMessage(dataLength);
360     ASSERT_NE(oriMsgForAB, nullptr);
361     const RegedGiantObject *oriObjForAB = oriMsgForAB->GetObject<RegedGiantObject>();
362     ASSERT_NE(oriObjForAB, nullptr);
363     const RegedGiantObject *recvObjForBB = recvMsgForBB->GetObject<RegedGiantObject>();
364     ASSERT_NE(recvObjForBB, nullptr);
365     bool isEqual = RegedGiantObject::CheckEqual(*oriObjForAB, *recvObjForBB);
366     EXPECT_EQ(isEqual, true);
367 
368     // CleanUp
369     delete oriMsgForAB;
370     oriMsgForAB = nullptr;
371     delete recvMsgForBB;
372     recvMsgForBB = nullptr;
373     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
374 }
375 
376 /**
377  * @tc.name: Fragment 002
378  * @tc.desc: Test fragmentation in partial loss
379  * @tc.type: FUNC
380  * @tc.require: AR000BVDGI AR000CQE0M
381  * @tc.author: xiaozhenjian
382  */
383 HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment002, TestSize.Level2)
384 {
385     // Preset
386     Message *recvMsgForCC = nullptr;
__anonb9037cf20602(const std::string &srcTarget, Message *inMsg) 387     g_commCC->RegOnMessageCallback([&recvMsgForCC](const std::string &srcTarget, Message *inMsg) {
388         recvMsgForCC = inMsg;
389     }, nullptr);
390 
391     /**
392      * @tc.steps: step1. connect device B with device C
393      */
394     AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
395     std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Wait 200 ms to make sure quiet
396 
397     /**
398      * @tc.steps: step2. device B simulate partial loss
399      */
400     g_envDeviceB.adapterHandle->SimulateSendPartialLoss();
401 
402     /**
403      * @tc.steps: step3. device B send message(registered and giant) to device C using communicator BC
404      * @tc.expected: step3. communicator CC not receive the message
405      */
406     uint32_t dataLength = 13 * 1024 * 1024; // 13 MB, 1024 is scale
407     Message *sendMsgForBC = BuildRegedGiantMessage(dataLength);
408     ASSERT_NE(sendMsgForBC, nullptr);
409     SendConfig conf = {false, false, 0};
410     int errCode = g_commBC->SendMessage(DEVICE_NAME_C, sendMsgForBC, conf);
411     EXPECT_EQ(errCode, E_OK);
412     std::this_thread::sleep_for(std::chrono::milliseconds(2600)); // Wait 2600 ms to make sure send done
413     EXPECT_EQ(recvMsgForCC, nullptr);
414 
415     /**
416      * @tc.steps: step4. device B not simulate partial loss
417      */
418     g_envDeviceB.adapterHandle->SimulateSendPartialLossClear();
419 
420     /**
421      * @tc.steps: step5. device B send message(registered and giant) to device C using communicator BC
422      * @tc.expected: step5. communicator CC received the message, the length equal to the one that is second send
423      */
424     dataLength = 17 * 1024 * 1024; // 17 MB, 1024 is scale
425     Message *resendMsgForBC = BuildRegedGiantMessage(dataLength);
426     ASSERT_NE(resendMsgForBC, nullptr);
427     errCode = g_commBC->SendMessage(DEVICE_NAME_C, resendMsgForBC, conf);
428     EXPECT_EQ(errCode, E_OK);
429     std::this_thread::sleep_for(std::chrono::milliseconds(3400)); // Wait 3400 ms to make sure send done
430     ASSERT_NE(recvMsgForCC, nullptr);
431     ASSERT_EQ(recvMsgForCC->GetMessageId(), REGED_GIANT_MSG_ID);
432     const RegedGiantObject *recvObjForCC = recvMsgForCC->GetObject<RegedGiantObject>();
433     ASSERT_NE(recvObjForCC, nullptr);
434     EXPECT_EQ(dataLength, recvObjForCC->rawData_.size());
435 
436     // CleanUp
437     delete recvMsgForCC;
438     recvMsgForCC = nullptr;
439     AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
440 }
441 
442 /**
443  * @tc.name: Fragment 003
444  * @tc.desc: Test fragmentation simultaneously
445  * @tc.type: FUNC
446  * @tc.require: AR000BVDGI AR000CQE0M
447  * @tc.author: xiaozhenjian
448  */
449 HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment003, TestSize.Level3)
450 {
451     // Preset
452     std::atomic<int> count {0};
__anonb9037cf20702(const std::string &srcTarget, Message *inMsg) 453     OnMessageCallback callback = [&count](const std::string &srcTarget, Message *inMsg) {
454         delete inMsg;
455         inMsg = nullptr;
456         count.fetch_add(1, std::memory_order_seq_cst);
457     };
458     g_commBB->RegOnMessageCallback(callback, nullptr);
459     g_commBC->RegOnMessageCallback(callback, nullptr);
460 
461     /**
462      * @tc.steps: step1. connect device A with device B, then device B with device C
463      */
464     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
465     AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
466     std::this_thread::sleep_for(std::chrono::milliseconds(400)); // Wait 400 ms to make sure quiet
467 
468     /**
469      * @tc.steps: step2. device A and device C simulate send block
470      */
471     g_envDeviceA.adapterHandle->SimulateSendBlock();
472     g_envDeviceC.adapterHandle->SimulateSendBlock();
473 
474     /**
475      * @tc.steps: step3. device A send message(registered and giant) to device B using communicator AB
476      */
477     uint32_t dataLength = 23 * 1024 * 1024; // 23 MB, 1024 is scale
478     Message *sendMsgForAB = BuildRegedGiantMessage(dataLength);
479     ASSERT_NE(sendMsgForAB, nullptr);
480     SendConfig conf = {false, false, 0};
481     int errCode = g_commAB->SendMessage(DEVICE_NAME_B, sendMsgForAB, conf);
482     EXPECT_EQ(errCode, E_OK);
483 
484     /**
485      * @tc.steps: step4. device C send message(registered and giant) to device B using communicator CC
486      */
487     Message *sendMsgForCC = BuildRegedGiantMessage(dataLength);
488     ASSERT_NE(sendMsgForCC, nullptr);
489     errCode = g_commCC->SendMessage(DEVICE_NAME_B, sendMsgForCC, conf);
490     EXPECT_EQ(errCode, E_OK);
491 
492     /**
493      * @tc.steps: step5. device A and device C not simulate send block
494      * @tc.expected: step5. communicator BB and BV received the message
495      */
496     g_envDeviceA.adapterHandle->SimulateSendBlockClear();
497     g_envDeviceC.adapterHandle->SimulateSendBlockClear();
498     std::this_thread::sleep_for(std::chrono::milliseconds(9200)); // Wait 9200 ms to make sure send done
499     EXPECT_EQ(count, 2); // 2 combined message received
500 
501     // CleanUp
502     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
503     AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
504 }
505 
506 /**
507  * @tc.name: Fragment 004
508  * @tc.desc: Test fragmentation in send and receive when rate limit
509  * @tc.type: FUNC
510  * @tc.require: AR000BVDGI AR000CQE0M
511  * @tc.author: zhangqiquan
512  */
513 HWTEST_F(DistributedDBCommunicatorDeepTest, Fragment004, TestSize.Level2)
514 {
515     /**
516      * @tc.steps: step1. connect device A with device B
517      */
518     Message *recvMsgForBB = nullptr;
__anonb9037cf20802(const std::string &srcTarget, Message *inMsg) 519     g_commBB->RegOnMessageCallback([&recvMsgForBB](const std::string &srcTarget, Message *inMsg) {
520         recvMsgForBB = inMsg;
521     }, nullptr);
522     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
523     std::atomic<int> count = 0;
__anonb9037cf20902() 524     g_envDeviceA.adapterHandle->ForkSendBytes([&count]() {
525         count++;
526         if (count % 3 == 0) { // retry each 3 packet
527             return -E_WAIT_RETRY;
528         }
529         return E_OK;
530     });
531     /**
532      * @tc.steps: step2. device A send message(registered and giant) to device B using communicator AB
533      * @tc.expected: step2. communicator BB received the message
534      */
535     const uint32_t dataLength = 13 * 1024 * 1024; // 13 MB, 1024 is scale
536     Message *sendMsg = BuildRegedGiantMessage(dataLength);
537     ASSERT_NE(sendMsg, nullptr);
538     SendConfig conf = {false, false, 0};
539     int errCode = g_commAB->SendMessage(DEVICE_NAME_B, sendMsg, conf);
540     EXPECT_EQ(errCode, E_OK);
541     std::this_thread::sleep_for(std::chrono::seconds(1)); // Wait 1s to make sure send done
542     g_envDeviceA.adapterHandle->SimulateSendRetry(DEVICE_NAME_B);
543     g_envDeviceA.adapterHandle->SimulateSendRetryClear(DEVICE_NAME_B);
544     int reTryTimes = 5;
545     while (recvMsgForBB == nullptr && reTryTimes > 0) {
546         std::this_thread::sleep_for(std::chrono::seconds(3));
547         reTryTimes--;
548     }
549     ASSERT_NE(recvMsgForBB, nullptr);
550     ASSERT_EQ(recvMsgForBB->GetMessageId(), REGED_GIANT_MSG_ID);
551     /**
552      * @tc.steps: step3. Compare received data with send data
553      * @tc.expected: step3. equal
554      */
555     Message *oriMsgForAB = BuildRegedGiantMessage(dataLength);
556     ASSERT_NE(oriMsgForAB, nullptr);
557     auto *recvObjForBB = recvMsgForBB->GetObject<RegedGiantObject>();
558     ASSERT_NE(recvObjForBB, nullptr);
559     auto *oriObjForAB = oriMsgForAB->GetObject<RegedGiantObject>();
560     ASSERT_NE(oriObjForAB, nullptr);
561     bool isEqual = RegedGiantObject::CheckEqual(*oriObjForAB, *recvObjForBB);
562     EXPECT_EQ(isEqual, true);
563     g_envDeviceA.adapterHandle->ForkSendBytes(nullptr);
564 
565     // CleanUp
566     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
567     delete oriMsgForAB;
568     oriMsgForAB = nullptr;
569     delete recvMsgForBB;
570     recvMsgForBB = nullptr;
571 }
572 
573 namespace {
ClearPreviousTestCaseInfluence()574 void ClearPreviousTestCaseInfluence()
575 {
576     ReleaseAllCommunicator();
577     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
578     AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
579     AdapterStub::ConnectAdapterStub(g_envDeviceC.adapterHandle, g_envDeviceA.adapterHandle);
580     std::this_thread::sleep_for(std::chrono::seconds(10)); // Wait 10 s to make sure all thread quiet
581     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
582     AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
583     AdapterStub::DisconnectAdapterStub(g_envDeviceC.adapterHandle, g_envDeviceA.adapterHandle);
584     AllocAllCommunicator();
585 }
586 }
587 
588 /**
589  * @tc.name: ReliableOnline 001
590  * @tc.desc: Test device online reliability
591  * @tc.type: FUNC
592  * @tc.require: AR000BVDGJ AR000CQE0N
593  * @tc.author: xiaozhenjian
594  */
595 HWTEST_F(DistributedDBCommunicatorDeepTest, ReliableOnline001, TestSize.Level2)
596 {
597     // Preset
598     ClearPreviousTestCaseInfluence();
599     std::atomic<int> count {0};
__anonb9037cf20b02(const std::string &target, bool isConnect) 600     OnConnectCallback callback = [&count](const std::string &target, bool isConnect) {
601         if (isConnect) {
602             count.fetch_add(1, std::memory_order_seq_cst);
603         }
604     };
605     g_commAA->RegOnConnectCallback(callback, nullptr);
606     g_commAB->RegOnConnectCallback(callback, nullptr);
607     g_commBB->RegOnConnectCallback(callback, nullptr);
608     g_commBC->RegOnConnectCallback(callback, nullptr);
609     g_commCC->RegOnConnectCallback(callback, nullptr);
610     g_commCA->RegOnConnectCallback(callback, nullptr);
611 
612     /**
613      * @tc.steps: step1. device A and device B and device C simulate send total loss
614      */
615     g_envDeviceA.adapterHandle->SimulateSendTotalLoss();
616     g_envDeviceB.adapterHandle->SimulateSendTotalLoss();
617     g_envDeviceC.adapterHandle->SimulateSendTotalLoss();
618 
619     /**
620      * @tc.steps: step2. connect device A with device B, device B with device C, device C with device A
621      */
622     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
623     AdapterStub::ConnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
624     AdapterStub::ConnectAdapterStub(g_envDeviceC.adapterHandle, g_envDeviceA.adapterHandle);
625 
626     /**
627      * @tc.steps: step3. wait a long time
628      * @tc.expected: step3. no communicator received the online callback
629      */
630     std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure quiet
631     EXPECT_EQ(count, 0); // no online callback received
632 
633     /**
634      * @tc.steps: step4. device A and device B and device C not simulate send total loss
635      */
636     g_envDeviceA.adapterHandle->SimulateSendTotalLossClear();
637     g_envDeviceB.adapterHandle->SimulateSendTotalLossClear();
638     g_envDeviceC.adapterHandle->SimulateSendTotalLossClear();
639     std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure send done
640     EXPECT_EQ(count, 6); // 6 online callback received in total
641 
642     // CleanUp
643     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
644     AdapterStub::DisconnectAdapterStub(g_envDeviceB.adapterHandle, g_envDeviceC.adapterHandle);
645     AdapterStub::DisconnectAdapterStub(g_envDeviceC.adapterHandle, g_envDeviceA.adapterHandle);
646 }
647 
648 /**
649  * @tc.name: NetworkAdapter001
650  * @tc.desc: Test networkAdapter start func
651  * @tc.type: FUNC
652  * @tc.require: AR000BVDGJ
653  * @tc.author: zhangqiquan
654  */
655 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter001, TestSize.Level1)
656 {
657     auto processCommunicator = std::make_shared<MockProcessCommunicator>();
658     EXPECT_CALL(*processCommunicator, Stop()).WillRepeatedly(testing::Return(OK));
659     /**
660      * @tc.steps: step1. adapter start with empty label
661      * @tc.expected: step1. start failed
662      */
663     auto adapter = std::make_shared<NetworkAdapter>("");
664     EXPECT_EQ(adapter->StartAdapter(), -E_INVALID_ARGS);
665     /**
666      * @tc.steps: step2. adapter start with not empty label but processCommunicator is null
667      * @tc.expected: step2. start failed
668      */
669     adapter = std::make_shared<NetworkAdapter>("label");
670     EXPECT_EQ(adapter->StartAdapter(), -E_INVALID_ARGS);
671     /**
672      * @tc.steps: step3. processCommunicator start not ok
673      * @tc.expected: step3. start failed
674      */
675     adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
676     EXPECT_CALL(*processCommunicator, Start).WillRepeatedly(testing::Return(DB_ERROR));
677     EXPECT_EQ(adapter->StartAdapter(), -E_PERIPHERAL_INTERFACE_FAIL);
678     /**
679      * @tc.steps: step4. processCommunicator reg not ok
680      * @tc.expected: step4. start failed
681      */
682     EXPECT_CALL(*processCommunicator, Start).WillRepeatedly(testing::Return(OK));
683     EXPECT_CALL(*processCommunicator, RegOnDataReceive).WillRepeatedly(testing::Return(DB_ERROR));
684     EXPECT_EQ(adapter->StartAdapter(), -E_PERIPHERAL_INTERFACE_FAIL);
685     EXPECT_CALL(*processCommunicator, RegOnDataReceive).WillRepeatedly(testing::Return(OK));
686     EXPECT_CALL(*processCommunicator, RegOnDeviceChange).WillRepeatedly(testing::Return(DB_ERROR));
687     EXPECT_EQ(adapter->StartAdapter(), -E_PERIPHERAL_INTERFACE_FAIL);
688     /**
689      * @tc.steps: step5. processCommunicator reg ok
690      * @tc.expected: step5. start success
691      */
692     EXPECT_CALL(*processCommunicator, RegOnDeviceChange).WillRepeatedly(testing::Return(OK));
__anonb9037cf20c02() 693     EXPECT_CALL(*processCommunicator, GetLocalDeviceInfos).WillRepeatedly([]() {
694         DeviceInfos deviceInfos;
695         deviceInfos.identifier = "DEVICES_A"; // local is deviceA
696         return deviceInfos;
697     });
__anonb9037cf20d02() 698     EXPECT_CALL(*processCommunicator, GetRemoteOnlineDeviceInfosList).WillRepeatedly([]() {
699         std::vector<DeviceInfos> res;
700         DeviceInfos deviceInfos;
701         deviceInfos.identifier = "DEVICES_A"; // search local is deviceA
702         res.push_back(deviceInfos);
703         deviceInfos.identifier = "DEVICES_B"; // search remote is deviceB
704         res.push_back(deviceInfos);
705         return res;
706     });
__anonb9037cf20e02(const DeviceInfos &) 707     EXPECT_CALL(*processCommunicator, IsSameProcessLabelStartedOnPeerDevice).WillRepeatedly([](const DeviceInfos &) {
708         return false;
709     });
710     EXPECT_EQ(adapter->StartAdapter(), E_OK);
711     RuntimeContext::GetInstance()->StopTaskPool();
712 }
713 
714 /**
715  * @tc.name: NetworkAdapter002
716  * @tc.desc: Test networkAdapter get mtu func
717  * @tc.type: FUNC
718  * @tc.require: AR000BVDGJ
719  * @tc.author: zhangqiquan
720  */
721 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter002, TestSize.Level1)
722 {
723     auto processCommunicator = std::make_shared<MockProcessCommunicator>();
724     auto adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
725     /**
726      * @tc.steps: step1. processCommunicator return 0 mtu
727      * @tc.expected: step1. adapter will adjust to min mtu
728      */
__anonb9037cf20f02() 729     EXPECT_CALL(*processCommunicator, GetMtuSize).WillRepeatedly([]() {
730         return 0u;
731     });
732     EXPECT_EQ(adapter->GetMtuSize(), DBConstant::MIN_MTU_SIZE);
733     /**
734      * @tc.steps: step2. processCommunicator return 2 max mtu
735      * @tc.expected: step2. adapter will return min mtu util re make
736      */
__anonb9037cf21002() 737     EXPECT_CALL(*processCommunicator, GetMtuSize).WillRepeatedly([]() {
738         return 2 * DBConstant::MAX_MTU_SIZE;
739     });
740     EXPECT_EQ(adapter->GetMtuSize(), DBConstant::MIN_MTU_SIZE);
741     adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
742     EXPECT_EQ(adapter->GetMtuSize(), DBConstant::MAX_MTU_SIZE);
743 }
744 
745 /**
746  * @tc.name: NetworkAdapter003
747  * @tc.desc: Test networkAdapter get timeout func
748  * @tc.type: FUNC
749  * @tc.require: AR000BVDGJ
750  * @tc.author: zhangqiquan
751  */
752 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter003, TestSize.Level1)
753 {
754     auto processCommunicator = std::make_shared<MockProcessCommunicator>();
755     auto adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
756     /**
757      * @tc.steps: step1. processCommunicator return 0 timeout
758      * @tc.expected: step1. adapter will adjust to min timeout
759      */
__anonb9037cf21102() 760     EXPECT_CALL(*processCommunicator, GetTimeout).WillRepeatedly([]() {
761         return 0u;
762     });
763     EXPECT_EQ(adapter->GetTimeout(), DBConstant::MIN_TIMEOUT);
764     /**
765      * @tc.steps: step2. processCommunicator return 2 max timeout
766      * @tc.expected: step2. adapter will adjust to max timeout
767      */
__anonb9037cf21202() 768     EXPECT_CALL(*processCommunicator, GetTimeout).WillRepeatedly([]() {
769         return 2 * DBConstant::MAX_TIMEOUT;
770     });
771     EXPECT_EQ(adapter->GetTimeout(), DBConstant::MAX_TIMEOUT);
772 }
773 
774 /**
775  * @tc.name: NetworkAdapter004
776  * @tc.desc: Test networkAdapter send bytes func
777  * @tc.type: FUNC
778  * @tc.require: AR000BVDGJ
779  * @tc.author: zhangqiquan
780  */
781 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter004, TestSize.Level1)
782 {
783     auto processCommunicator = std::make_shared<MockProcessCommunicator>();
784     auto adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
785 
__anonb9037cf21302(const DeviceInfos &, const uint8_t *, uint32_t) 786     EXPECT_CALL(*processCommunicator, SendData).WillRepeatedly([](const DeviceInfos &, const uint8_t *, uint32_t) {
787         return OK;
788     });
789     /**
790      * @tc.steps: step1. adapter send data with error param
791      * @tc.expected: step1. adapter send failed
792      */
793     auto data = std::make_shared<uint8_t>(1u);
794     EXPECT_EQ(adapter->SendBytes("DEVICES_B", nullptr, 1, 0), -E_INVALID_ARGS);
795     EXPECT_EQ(adapter->SendBytes("DEVICES_B", data.get(), 0, 0), -E_INVALID_ARGS);
796     /**
797      * @tc.steps: step2. adapter send data with right param
798      * @tc.expected: step2. adapter send ok
799      */
800     EXPECT_EQ(adapter->SendBytes("DEVICES_B", data.get(), 1, 0), E_OK);
801     RuntimeContext::GetInstance()->StopTaskPool();
802 }
803 
804 namespace {
InitAdapter(const std::shared_ptr<NetworkAdapter> & adapter,const std::shared_ptr<MockProcessCommunicator> & processCommunicator,OnDataReceive & onDataReceive,OnDeviceChange & onDataChange)805 void InitAdapter(const std::shared_ptr<NetworkAdapter> &adapter,
806     const std::shared_ptr<MockProcessCommunicator> &processCommunicator,
807     OnDataReceive &onDataReceive, OnDeviceChange &onDataChange)
808 {
809     EXPECT_CALL(*processCommunicator, Stop).WillRepeatedly([]() {
810         return OK;
811     });
812     EXPECT_CALL(*processCommunicator, Start).WillRepeatedly([](const std::string &) {
813         return OK;
814     });
815     EXPECT_CALL(*processCommunicator, RegOnDataReceive).WillRepeatedly(
816         [&onDataReceive](const OnDataReceive &callback) {
817             onDataReceive = callback;
818             return OK;
819     });
820     EXPECT_CALL(*processCommunicator, RegOnDeviceChange).WillRepeatedly(
821         [&onDataChange](const OnDeviceChange &callback) {
822             onDataChange = callback;
823             return OK;
824     });
825     EXPECT_CALL(*processCommunicator, GetRemoteOnlineDeviceInfosList).WillRepeatedly([]() {
826         std::vector<DeviceInfos> res;
827         return res;
828     });
829     EXPECT_CALL(*processCommunicator, IsSameProcessLabelStartedOnPeerDevice).WillRepeatedly([](const DeviceInfos &) {
830         return false;
831     });
832     EXPECT_EQ(adapter->StartAdapter(), E_OK);
833 }
834 }
835 /**
836  * @tc.name: NetworkAdapter005
837  * @tc.desc: Test networkAdapter receive data func
838  * @tc.type: FUNC
839  * @tc.require: AR000BVDGJ
840  * @tc.author: zhangqiquan
841  */
842 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter005, TestSize.Level1)
843 {
844     auto processCommunicator = std::make_shared<MockProcessCommunicator>();
845     auto adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
846     OnDataReceive onDataReceive;
847     OnDeviceChange onDeviceChange;
848     InitAdapter(adapter, processCommunicator, onDataReceive, onDeviceChange);
849     ASSERT_NE(onDataReceive, nullptr);
850     /**
851      * @tc.steps: step1. adapter recv data with error param
852      */
853     auto data = std::make_shared<uint8_t>(1);
854     DeviceInfos deviceInfos;
855     onDataReceive(deviceInfos, nullptr, 1);
856     onDataReceive(deviceInfos, data.get(), 0);
857     /**
858      * @tc.steps: step2. adapter recv data with no permission
859      */
860     EXPECT_CALL(*processCommunicator, CheckAndGetDataHeadInfo).WillRepeatedly(
__anonb9037cf21b02(const uint8_t *, uint32_t, uint32_t &, std::vector<std::string> &) 861         [](const uint8_t *, uint32_t, uint32_t &, std::vector<std::string> &) {
862         return NO_PERMISSION;
863     });
864     onDataReceive(deviceInfos, data.get(), 1);
865     EXPECT_CALL(*processCommunicator, CheckAndGetDataHeadInfo).WillRepeatedly(
__anonb9037cf21c02(const uint8_t *, uint32_t, uint32_t &, std::vector<std::string> &userIds) 866         [](const uint8_t *, uint32_t, uint32_t &, std::vector<std::string> &userIds) {
867             userIds.emplace_back("1");
868             return OK;
869     });
870     /**
871      * @tc.steps: step3. adapter recv data with no callback
872      */
873     onDataReceive(deviceInfos, data.get(), 1);
__anonb9037cf21d02(const std::string &, const uint8_t *, uint32_t, const std::string &) 874     adapter->RegBytesReceiveCallback([](const std::string &, const uint8_t *, uint32_t, const std::string &) {
875     }, nullptr);
876     onDataReceive(deviceInfos, data.get(), 1);
877     RuntimeContext::GetInstance()->StopTaskPool();
878 }
879 
880 /**
881  * @tc.name: NetworkAdapter006
882  * @tc.desc: Test networkAdapter device change func
883  * @tc.type: FUNC
884  * @tc.require: AR000BVDGJ
885  * @tc.author: zhangqiquan
886  */
887 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter006, TestSize.Level1)
888 {
889     auto processCommunicator = std::make_shared<MockProcessCommunicator>();
890     auto adapter = std::make_shared<NetworkAdapter>("label", processCommunicator);
891     OnDataReceive onDataReceive;
892     OnDeviceChange onDeviceChange;
893     InitAdapter(adapter, processCommunicator, onDataReceive, onDeviceChange);
894     ASSERT_NE(onDeviceChange, nullptr);
895     DeviceInfos deviceInfos;
896     /**
897      * @tc.steps: step1. onDeviceChange with no same process
898      */
899     onDeviceChange(deviceInfos, true);
900     /**
901      * @tc.steps: step2. onDeviceChange with same process
902      */
__anonb9037cf21e02(const DeviceInfos &) 903     EXPECT_CALL(*processCommunicator, IsSameProcessLabelStartedOnPeerDevice).WillRepeatedly([](const DeviceInfos &) {
904         return true;
905     });
906     onDeviceChange(deviceInfos, true);
__anonb9037cf21f02(const std::string &, bool) 907     adapter->RegTargetChangeCallback([](const std::string &, bool) {
908     }, nullptr);
909     onDeviceChange(deviceInfos, false);
910     /**
911      * @tc.steps: step3. adapter send data with db_error
912      * @tc.expected: step3. adapter send failed
913      */
914     onDeviceChange(deviceInfos, true);
__anonb9037cf22002(const DeviceInfos &, const uint8_t *, uint32_t) 915     EXPECT_CALL(*processCommunicator, SendData).WillRepeatedly([](const DeviceInfos &, const uint8_t *, uint32_t) {
916         return DB_ERROR;
917     });
__anonb9037cf22102(const DeviceInfos &) 918     EXPECT_CALL(*processCommunicator, IsSameProcessLabelStartedOnPeerDevice).WillRepeatedly([](const DeviceInfos &) {
919         return false;
920     });
921     auto data = std::make_shared<uint8_t>(1);
922     EXPECT_EQ(adapter->SendBytes("", data.get(), 1, 0), static_cast<int>(DB_ERROR));
923     RuntimeContext::GetInstance()->StopTaskPool();
924     EXPECT_EQ(adapter->IsDeviceOnline(""), false);
925     ExtendInfo info;
926     EXPECT_EQ(adapter->GetExtendHeaderHandle(info), nullptr);
927 }
928 
929 /**
930  * @tc.name: NetworkAdapter007
931  * @tc.desc: Test networkAdapter recv invalid head length
932  * @tc.type: FUNC
933  * @tc.require: AR000BVDGJ
934  * @tc.author: zhangqiquan
935  */
936 HWTEST_F(DistributedDBCommunicatorDeepTest, NetworkAdapter007, TestSize.Level1)
937 {
938     auto processCommunicator = std::make_shared<MockProcessCommunicator>();
939     auto adapter = std::make_shared<NetworkAdapter>("NetworkAdapter007", processCommunicator);
940     OnDataReceive onDataReceive;
941     OnDeviceChange onDeviceChange;
942     InitAdapter(adapter, processCommunicator, onDataReceive, onDeviceChange);
943     ASSERT_NE(onDeviceChange, nullptr);
944     /**
945      * @tc.steps: step1. CheckAndGetDataHeadInfo return invalid headLen
946      * @tc.expected: step1. adapter check this len
947      */
948     EXPECT_CALL(*processCommunicator, CheckAndGetDataHeadInfo).WillOnce([](const uint8_t *, uint32_t, uint32_t &headLen,
__anonb9037cf22202(const uint8_t *, uint32_t, uint32_t &headLen, std::vector<std::string> &) 949         std::vector<std::string> &) {
950         headLen = UINT32_MAX;
951         return OK;
952     });
953     /**
954      * @tc.steps: step2. Adapter ignore data because len is too large
955      * @tc.expected: step2. BytesReceive never call
956      */
957     int callByteReceiveCount = 0;
958     int res = adapter->RegBytesReceiveCallback([&callByteReceiveCount](const std::string &, const uint8_t *, uint32_t,
__anonb9037cf22302(const std::string &, const uint8_t *, uint32_t, const std::string &) 959         const std::string &) {
960         callByteReceiveCount++;
961     }, nullptr);
962     EXPECT_EQ(res, E_OK);
963     std::vector<uint8_t> data = { 1u };
964     DeviceInfos deviceInfos;
965     onDataReceive(deviceInfos, data.data(), 1u);
966     EXPECT_EQ(callByteReceiveCount, 0);
967 }
968 
969 /**
970  * @tc.name: RetrySendExceededLimit001
971  * @tc.desc: Test send result when the number of retry times exceeds the limit
972  * @tc.type: FUNC
973  * @tc.require:
974  * @tc.author: suyue
975  */
976 HWTEST_F(DistributedDBCommunicatorDeepTest, RetrySendExceededLimit001, TestSize.Level2)
977 {
978     /**
979      * @tc.steps: step1. connect device A with device B and fork SendBytes
980      * @tc.expected: step1. operation OK
981      */
982     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
983     std::atomic<int> count = 0;
__anonb9037cf22402() 984     g_envDeviceA.adapterHandle->ForkSendBytes([&count]() {
985         count++;
986         return -E_WAIT_RETRY;
987     });
988 
989     /**
990      * @tc.steps: step2. the number of retry times for device A to send a message exceeds the limit
991      * @tc.expected: step2. sendResult fail
992      */
993     std::vector<std::pair<int, bool>> sendResult;
__anonb9037cf22502(int result, bool isDirectEnd) 994     auto sendResultNotifier = [&sendResult](int result, bool isDirectEnd) {
995         sendResult.push_back(std::pair<int, bool>(result, isDirectEnd));
996     };
997     const uint32_t dataLength = 13 * 1024 * 1024; // 13 MB, 1024 is scale
998     Message *sendMsg = BuildRegedGiantMessage(dataLength);
999     ASSERT_NE(sendMsg, nullptr);
1000     SendConfig conf = {false, false, 0};
1001     int errCode = g_commAB->SendMessage(DEVICE_NAME_B, sendMsg, conf, sendResultNotifier);
1002     EXPECT_EQ(errCode, E_OK);
1003     std::this_thread::sleep_for(std::chrono::seconds(1)); // Wait 1s to make sure send done
1004     g_envDeviceA.adapterHandle->SimulateSendRetry(DEVICE_NAME_B);
1005     g_envDeviceA.adapterHandle->SimulateSendRetryClear(DEVICE_NAME_B, -E_BASE);
1006     int reTryTimes = 5;
1007     while ((count < 4) && (reTryTimes > 0)) { // Wait to make sure retry exceeds the limit
1008         std::this_thread::sleep_for(std::chrono::seconds(3));
1009         reTryTimes--;
1010     }
1011     ASSERT_EQ(sendResult.size(), static_cast<size_t>(1)); // only one callback result notification
1012     EXPECT_EQ(sendResult[0].first, -E_BASE); // index 0 retry fail
1013     EXPECT_EQ(sendResult[0].second, false);
1014 
1015     g_envDeviceA.adapterHandle->ForkSendBytes(nullptr);
1016     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
1017 }
1018 
1019 /**
1020  * @tc.name: RetrySendExceededLimit002
1021  * @tc.desc: Test multi thread call SendableCallback when the number of retry times exceeds the limit
1022  * @tc.type: FUNC
1023  * @tc.require:
1024  * @tc.author: suyue
1025  */
1026 HWTEST_F(DistributedDBCommunicatorDeepTest, RetrySendExceededLimit002, TestSize.Level2)
1027 {
1028     /**
1029      * @tc.steps: step1. DeviceA send SendMessage and set SendBytes interface return -E_WAIT_RETRY
1030      * @tc.expected: step1. Send ok
1031      */
1032     AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
1033     std::atomic<int> count = 0;
__anonb9037cf22602() 1034     g_envDeviceA.adapterHandle->ForkSendBytes([&count]() {
1035         count++;
1036         return -E_WAIT_RETRY;
1037     });
1038     std::vector<std::pair<int, bool>> sendResult;
__anonb9037cf22702(int result, bool isDirectEnd) 1039     auto sendResultNotifier = [&sendResult](int result, bool isDirectEnd) {
1040         sendResult.push_back(std::pair<int, bool>(result, isDirectEnd));
1041     };
1042     const uint32_t dataLength = 13 * 1024 * 1024; // 13 MB, 1024 is scale
1043     Message *sendMsg = BuildRegedGiantMessage(dataLength);
1044     ASSERT_NE(sendMsg, nullptr);
1045     SendConfig conf = {false, false, 0};
1046     EXPECT_EQ(g_commAB->SendMessage(DEVICE_NAME_B, sendMsg, conf, sendResultNotifier), E_OK);
1047     std::this_thread::sleep_for(std::chrono::seconds(1)); // Wait 1s to make sure send done
1048 
1049     /**
1050      * @tc.steps: step2. Triggering multi thread call SendableCallback interface and set errorCode
1051      * @tc.expected: step2. Callback success
1052      */
1053     std::vector<std::thread> threads;
1054     int threadNum = 3;
1055     threads.reserve(threadNum);
1056     for (int n = 0; n < threadNum; n++) {
__anonb9037cf22802() 1057         threads.emplace_back([&]() {
1058             g_envDeviceA.adapterHandle->SimulateTriggerSendableCallback(DEVICE_NAME_B, -E_BASE);
1059         });
1060     }
1061     for (std::thread &t : threads) {
1062         t.join();
1063     }
1064 
1065     /**
1066      * @tc.steps: step3. Make The number of messages sent by device A exceed the limit
1067      * @tc.expected: step3. SendResult is the errorCode set by SendableCallback interface
1068      */
1069     int reTryTimes = 5;
1070     while ((count < 4) && (reTryTimes > 0)) {
1071         std::this_thread::sleep_for(std::chrono::seconds(3));
1072         reTryTimes--;
1073     }
1074     ASSERT_EQ(sendResult.size(), static_cast<size_t>(1));
1075     EXPECT_EQ(sendResult[0].first, -E_BASE);
1076     EXPECT_EQ(sendResult[0].second, false);
1077     g_envDeviceA.adapterHandle->ForkSendBytes(nullptr);
1078     AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
1079 }
1080