1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef OMIT_MULTI_VER
17 #include "multi_ver_data_sync.h"
18 
19 #include "parcel.h"
20 #include "log_print.h"
21 #include "sync_types.h"
22 #include "message_transform.h"
23 #include "performance_analysis.h"
24 #include "db_constant.h"
25 
26 namespace DistributedDB {
27 // Class MultiVerRequestPacket
CalculateLen() const28 uint32_t MultiVerRequestPacket::CalculateLen() const
29 {
30     uint64_t len = Parcel::GetIntLen();
31     len = Parcel::GetEightByteAlign(len);
32     len += Parcel::GetMultiVerCommitLen(commit_);
33     if (len > INT32_MAX) {
34         return 0;
35     }
36     return len;
37 }
38 
SetCommit(MultiVerCommitNode & commit)39 void MultiVerRequestPacket::SetCommit(MultiVerCommitNode &commit)
40 {
41     commit_ = std::move(commit);
42 }
43 
GetCommit(MultiVerCommitNode & commit) const44 void MultiVerRequestPacket::GetCommit(MultiVerCommitNode &commit) const
45 {
46     commit = commit_;
47 }
48 
SetErrCode(int32_t errCode)49 void MultiVerRequestPacket::SetErrCode(int32_t errCode)
50 {
51     errCode_ = errCode;
52 }
53 
GetErrCode() const54 int32_t MultiVerRequestPacket::GetErrCode() const
55 {
56     return errCode_;
57 }
58 
59 // Class MultiVerAckPacket
CalculateLen() const60 uint32_t MultiVerAckPacket::CalculateLen() const
61 {
62     uint64_t len = Parcel::GetIntLen();
63     len = Parcel::GetEightByteAlign(len);
64     for (const auto &iter : entries_) {
65         len += Parcel::GetVectorCharLen(iter);
66         if (len > INT32_MAX) {
67             return 0;
68         }
69     }
70     return len;
71 }
72 
SetData(std::vector<std::vector<uint8_t>> & data)73 void MultiVerAckPacket::SetData(std::vector<std::vector<uint8_t>> &data)
74 {
75     entries_ = std::move(data);
76 }
77 
GetData(std::vector<std::vector<uint8_t>> & data) const78 void MultiVerAckPacket::GetData(std::vector<std::vector<uint8_t>> &data) const
79 {
80     data = entries_;
81 }
82 
SetErrorCode(int32_t errCode)83 void MultiVerAckPacket::SetErrorCode(int32_t errCode)
84 {
85     errorCode_ = errCode;
86 }
87 
GetErrorCode(int32_t & errCode) const88 void MultiVerAckPacket::GetErrorCode(int32_t &errCode) const
89 {
90     errCode = errorCode_;
91 }
92 
93 // Class MultiVerDataSync
~MultiVerDataSync()94 MultiVerDataSync::~MultiVerDataSync()
95 {
96     storagePtr_ = nullptr;
97     communicateHandle_ = nullptr;
98 }
99 
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)100 int MultiVerDataSync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
101 {
102     if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
103         return -E_INVALID_ARGS;
104     }
105 
106     switch (inMsg->GetMessageType()) {
107         case TYPE_REQUEST:
108             return RequestPacketSerialization(buffer, length, inMsg);
109         case TYPE_RESPONSE:
110             return AckPacketSerialization(buffer, length, inMsg);
111         default:
112             return -E_MESSAGE_TYPE_ERROR;
113     }
114 }
115 
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)116 int MultiVerDataSync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
117 {
118     if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
119         return -E_MESSAGE_ID_ERROR;
120     }
121 
122     switch (inMsg->GetMessageType()) {
123         case TYPE_REQUEST:
124             return RequestPacketDeSerialization(buffer, length, inMsg);
125         case TYPE_RESPONSE:
126             return AckPacketDeSerialization(buffer, length, inMsg);
127         default:
128             return -E_MESSAGE_TYPE_ERROR;
129     }
130 }
131 
CalculateLen(const Message * inMsg)132 uint32_t MultiVerDataSync::CalculateLen(const Message *inMsg)
133 {
134     if (!(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
135         return 0;
136     }
137 
138     uint32_t len = 0;
139     int errCode = E_OK;
140     switch (inMsg->GetMessageType()) {
141         case TYPE_REQUEST:
142             errCode = RequestPacketCalculateLen(inMsg, len);
143             break;
144         case TYPE_RESPONSE:
145             errCode = AckPacketCalculateLen(inMsg, len);
146             break;
147         default:
148             return 0;
149     }
150     if (errCode != E_OK) {
151         return 0;
152     }
153     return len;
154 }
155 
RegisterTransformFunc()156 int MultiVerDataSync::RegisterTransformFunc()
157 {
158     TransformFunc func;
159     func.computeFunc = [](const Message *inMsg) { return CalculateLen(inMsg); };
160     func.serializeFunc = [](uint8_t *buffer, uint32_t length, const Message *inMsg) {
161         return Serialization(buffer, length, inMsg);
162     };
163     func.deserializeFunc = [](const uint8_t *buffer, uint32_t length, Message *inMsg) {
164         return DeSerialization(buffer, length, inMsg);
165     };
166     return MessageTransform::RegTransformFunction(MULTI_VER_DATA_SYNC_MESSAGE, func);
167 }
168 
Initialize(MultiVerKvDBSyncInterface * storagePtr,ICommunicator * communicateHandle)169 int MultiVerDataSync::Initialize(MultiVerKvDBSyncInterface *storagePtr, ICommunicator *communicateHandle)
170 {
171     if ((storagePtr == nullptr) || (communicateHandle == nullptr)) {
172         return -E_INVALID_ARGS;
173     }
174     storagePtr_ = storagePtr;
175     communicateHandle_ = communicateHandle;
176     return E_OK;
177 }
178 
TimeOutCallback(MultiVerSyncTaskContext * context,const Message * message) const179 void MultiVerDataSync::TimeOutCallback(MultiVerSyncTaskContext *context, const Message *message) const
180 {
181     return;
182 }
183 
SyncStart(MultiVerSyncTaskContext * context)184 int MultiVerDataSync::SyncStart(MultiVerSyncTaskContext *context)
185 {
186     if (context == nullptr) {
187         return -E_INVALID_ARGS;
188     }
189     LOGD("MultiVerDataSync::SyncStart dst=%s{private}, begin", context->GetDeviceId().c_str());
190     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
191     if (performance != nullptr) {
192         performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_DATA_GET_VALID_COMMIT);
193     }
194     MultiVerCommitNode commit;
195     int errCode = GetValidCommit(context, commit);
196     if (performance != nullptr) {
197         performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_DATA_GET_VALID_COMMIT);
198     }
199     if (errCode != E_OK) {
200         // sync don't need start
201         SendFinishedRequest(context);
202         return errCode;
203     }
204 
205     errCode = SendRequestPacket(context, commit);
206     LOGD("MultiVerDataSync::SyncStart dst=%s{private}, end", context->GetDeviceId().c_str());
207     return errCode;
208 }
209 
RequestRecvCallback(const MultiVerSyncTaskContext * context,const Message * message)210 int MultiVerDataSync::RequestRecvCallback(const MultiVerSyncTaskContext *context, const Message *message)
211 {
212     if (message == nullptr || context == nullptr) {
213         return -E_INVALID_ARGS;
214     }
215 
216     if (!IsPacketValid(message, TYPE_REQUEST)) {
217         return -E_INVALID_ARGS;
218     }
219 
220     const MultiVerRequestPacket *packet = message->GetObject<MultiVerRequestPacket>();
221     if (packet == nullptr) {
222         return -E_INVALID_ARGS;
223     }
224     if (packet->GetErrCode() == -E_LAST_SYNC_FRAME) {
225         return -E_LAST_SYNC_FRAME;
226     }
227     MultiVerCommitNode commit;
228     packet->GetCommit(commit);
229     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
230     if (performance != nullptr) {
231         performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_GET_COMMIT_DATA);
232     }
233     std::vector<MultiVerKvEntry *> dataEntries;
234     int errCode = GetCommitData(commit, dataEntries);
235     if (performance != nullptr) {
236         performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_GET_COMMIT_DATA);
237     }
238     if (errCode != E_OK) {
239         LOGE("MultiVerDataSync::RequestRecvCallback : GetCommitData ERR, errno = %d", errCode);
240     }
241 
242     errCode = SendAckPacket(context, dataEntries, errCode, message);
243     for (auto &iter : dataEntries) {
244         ReleaseKvEntry(iter);
245         iter = nullptr;
246     }
247     LOGD("MultiVerDataSync::RequestRecvCallback : SendAckPacket, errno = %d, dst = %s{private}",
248          errCode, context->GetDeviceId().c_str());
249     return errCode;
250 }
251 
AckRecvCallback(MultiVerSyncTaskContext * context,const Message * message)252 int MultiVerDataSync::AckRecvCallback(MultiVerSyncTaskContext *context, const Message *message)
253 {
254     if (message == nullptr) {
255         return -E_INVALID_ARGS;
256     }
257     if (!IsPacketValid(message, TYPE_RESPONSE) || (context == nullptr)) {
258         return -E_INVALID_ARGS;
259     }
260 
261     const MultiVerAckPacket *packet = message->GetObject<MultiVerAckPacket>();
262     if (packet == nullptr) {
263         return -E_INVALID_ARGS;
264     }
265     int32_t errCode = E_OK;
266     packet->GetErrorCode(errCode);
267     if (errCode != E_OK) {
268         return errCode;
269     }
270     std::vector<std::vector<uint8_t>> dataEntries;
271     std::vector<MultiVerKvEntry *> entries;
272     std::vector<ValueSliceHash> valueHashes;
273     MultiVerKvEntry *entry = nullptr;
274 
275     packet->GetData(dataEntries);
276     for (const auto &iter : dataEntries) {
277         MultiVerKvEntry *item = CreateKvEntry(iter);
278         entries.push_back(item);
279     }
280     context->ReleaseEntries();
281     context->SetEntries(entries);
282     context->SetEntriesIndex(0);
283     context->SetEntriesSize(static_cast<int>(entries.size()));
284     LOGD("MultiVerDataSync::AckRecvCallback src=%s{private}, entries num = %zu",
285         context->GetDeviceId().c_str(), entries.size());
286 
287     if (entries.size() > 0) {
288         entry = entries[0];
289         errCode = entry->GetValueHash(valueHashes);
290         if (errCode != E_OK) {
291             return errCode;
292         }
293     }
294     context->SetValueSliceHashNodes(valueHashes);
295     context->SetValueSlicesIndex(0);
296     context->SetValueSlicesSize(valueHashes.size());
297     LOGD("MultiVerDataSync::AckRecvCallback src=%s{private}, ValueSlicesSize num = %zu",
298         context->GetDeviceId().c_str(), valueHashes.size());
299     return errCode;
300 }
301 
PutCommitData(const MultiVerCommitNode & commit,const std::vector<MultiVerKvEntry * > & entries,const std::string & deviceName)302 int MultiVerDataSync::PutCommitData(const MultiVerCommitNode &commit, const std::vector<MultiVerKvEntry *> &entries,
303     const std::string &deviceName)
304 {
305     return storagePtr_->PutCommitData(commit, entries, deviceName);
306 }
307 
MergeSyncCommit(const MultiVerCommitNode & commit,const std::vector<MultiVerCommitNode> & commits)308 int MultiVerDataSync::MergeSyncCommit(const MultiVerCommitNode &commit, const std::vector<MultiVerCommitNode> &commits)
309 {
310     return storagePtr_->MergeSyncCommit(commit, commits);
311 }
312 
ReleaseKvEntry(const MultiVerKvEntry * entry)313 void MultiVerDataSync::ReleaseKvEntry(const MultiVerKvEntry *entry)
314 {
315     return storagePtr_->ReleaseKvEntry(entry);
316 }
317 
SendFinishedRequest(const MultiVerSyncTaskContext * context)318 void MultiVerDataSync::SendFinishedRequest(const MultiVerSyncTaskContext *context)
319 {
320     if (context == nullptr) {
321         return;
322     }
323     MultiVerRequestPacket *packet = new (std::nothrow) MultiVerRequestPacket();
324     if (packet == nullptr) {
325         LOGE("MultiVerRequestPacket::SendRequestPacket : new packet error");
326         return;
327     }
328     packet->SetErrCode(-E_LAST_SYNC_FRAME);
329     Message *message = new (std::nothrow) Message(MULTI_VER_DATA_SYNC_MESSAGE);
330     if (message == nullptr) {
331         delete packet;
332         packet = nullptr;
333         LOGE("MultiVerDataSync::SendRequestPacket : new message error");
334         return;
335     }
336     message->SetMessageType(TYPE_REQUEST);
337     message->SetTarget(context->GetDeviceId());
338     int errCode = message->SetExternalObject(packet);
339     if (errCode != E_OK) {
340         delete packet;
341         packet = nullptr;
342         delete message;
343         message = nullptr;
344         LOGE("[MultiVerDataSync][SendFinishedRequest] : SetExternalObject failed errCode:%d", errCode);
345         return;
346     }
347     message->SetSessionId(context->GetRequestSessionId());
348     message->SetSequenceId(context->GetSequenceId());
349 
350     errCode = Send(message->GetTarget(), message);
351     if (errCode != E_OK) {
352         delete message;
353         message = nullptr;
354         LOGE("[MultiVerDataSync][SendFinishedRequest] SendFinishedRequest failed, err %d", errCode);
355     }
356     LOGI("[MultiVerDataSync][SendFinishedRequest] SendFinishedRequest dst=%s{private}", context->GetDeviceId().c_str());
357 }
358 
RequestPacketCalculateLen(const Message * inMsg,uint32_t & len)359 int MultiVerDataSync::RequestPacketCalculateLen(const Message *inMsg, uint32_t &len)
360 {
361     if ((inMsg == nullptr) || !IsPacketValid(inMsg, TYPE_REQUEST)) {
362         return -E_INVALID_ARGS;
363     }
364     const MultiVerRequestPacket *packet = inMsg->GetObject<MultiVerRequestPacket>();
365     if (packet == nullptr) {
366         return -E_INVALID_ARGS;
367     }
368 
369     len = packet->CalculateLen();
370     return E_OK;
371 }
372 
RequestPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)373 int MultiVerDataSync::RequestPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
374 {
375     if ((buffer == nullptr) || !IsPacketValid(inMsg, TYPE_REQUEST)) {
376         return -E_INVALID_ARGS;
377     }
378     const MultiVerRequestPacket *packet = inMsg->GetObject<MultiVerRequestPacket>();
379     if ((packet == nullptr) || (length != packet->CalculateLen())) {
380         return -E_INVALID_ARGS;
381     }
382 
383     MultiVerCommitNode commit;
384     packet->GetCommit(commit);
385     int32_t ackCode = packet->GetErrCode();
386 
387     Parcel parcel(buffer, length);
388     int errCode = parcel.WriteInt(ackCode);
389     if (errCode != E_OK) {
390         return -E_SECUREC_ERROR;
391     }
392     parcel.EightByteAlign();
393 
394     // commitMap Serialization
395     errCode = parcel.WriteMultiVerCommit(commit);
396     if (errCode != E_OK) {
397         return -E_SECUREC_ERROR;
398     }
399 
400     return errCode;
401 }
402 
RequestPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)403 int MultiVerDataSync::RequestPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
404 {
405     if ((buffer == nullptr) || !IsPacketValid(inMsg, TYPE_REQUEST)) {
406         return -E_INVALID_ARGS;
407     }
408 
409     MultiVerCommitNode commit;
410     Parcel parcel(const_cast<uint8_t *>(buffer), length);
411     int32_t pktErrCode;
412     uint64_t packLen = parcel.ReadInt(pktErrCode);
413     parcel.EightByteAlign();
414     if (parcel.IsError()) {
415         return -E_INVALID_ARGS;
416     }
417     packLen = Parcel::GetEightByteAlign(packLen);
418     // commit DeSerialization
419     packLen += parcel.ReadMultiVerCommit(commit);
420     if (packLen != length || parcel.IsError()) {
421         return -E_INVALID_ARGS;
422     }
423     MultiVerRequestPacket *packet = new (std::nothrow) MultiVerRequestPacket();
424     if (packet == nullptr) {
425         LOGE("MultiVerDataSync::RequestPacketDeSerialization : new packet error");
426         return -E_OUT_OF_MEMORY;
427     }
428     packet->SetCommit(commit);
429     packet->SetErrCode(pktErrCode);
430     int errCode = inMsg->SetExternalObject<>(packet);
431     if (errCode != E_OK) {
432         delete packet;
433         packet = nullptr;
434     }
435     return errCode;
436 }
437 
AckPacketCalculateLen(const Message * inMsg,uint32_t & len)438 int MultiVerDataSync::AckPacketCalculateLen(const Message *inMsg, uint32_t &len)
439 {
440     if (!IsPacketValid(inMsg, TYPE_RESPONSE)) {
441         return -E_INVALID_ARGS;
442     }
443 
444     const MultiVerAckPacket *packet = inMsg->GetObject<MultiVerAckPacket>();
445     if (packet == nullptr) {
446         return -E_INVALID_ARGS;
447     }
448     len = packet->CalculateLen();
449     return E_OK;
450 }
451 
AckPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)452 int MultiVerDataSync::AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
453 {
454     if ((buffer == nullptr) || !IsPacketValid(inMsg, TYPE_RESPONSE)) {
455         return -E_INVALID_ARGS;
456     }
457     const MultiVerAckPacket *packet = inMsg->GetObject<MultiVerAckPacket>();
458     if ((packet == nullptr) || (length != packet->CalculateLen())) {
459         return -E_INVALID_ARGS;
460     }
461 
462     Parcel parcel(buffer, length);
463     std::vector<std::vector<uint8_t>> entries;
464 
465     packet->GetData(entries);
466     int32_t errCode = E_OK;
467     packet->GetErrorCode(errCode);
468     // errCode Serialization
469     errCode = parcel.WriteInt(errCode);
470     if (errCode != E_OK) {
471         return -E_SECUREC_ERROR;
472     }
473     parcel.EightByteAlign();
474 
475     // commits vector Serialization
476     for (const auto &iter : entries) {
477         errCode = parcel.WriteVectorChar(iter);
478         if (errCode != E_OK) {
479             return -E_SECUREC_ERROR;
480         }
481     }
482 
483     return errCode;
484 }
485 
AckPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)486 int MultiVerDataSync::AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
487 {
488     if ((buffer == nullptr) || !IsPacketValid(inMsg, TYPE_RESPONSE)) {
489         return -E_INVALID_ARGS;
490     }
491 
492     Parcel parcel(const_cast<uint8_t *>(buffer), length);
493     int32_t pktErrCode;
494 
495     // errCode DeSerialization
496     uint32_t packLen = parcel.ReadInt(pktErrCode);
497     if (parcel.IsError()) {
498         return -E_INVALID_ARGS;
499     }
500     parcel.EightByteAlign();
501     packLen = Parcel::GetEightByteAlign(packLen);
502 
503     // commits vector DeSerialization
504     std::vector<std::vector<uint8_t>> entries;
505     while (packLen < length) {
506         std::vector<uint8_t> data;
507         packLen += parcel.ReadVectorChar(data);
508         // A valid dataItem got, Save to storage
509         entries.push_back(data);
510         if (parcel.IsError()) {
511             return -E_INVALID_ARGS;
512         }
513     }
514     MultiVerAckPacket *packet = new (std::nothrow) MultiVerAckPacket();
515     if (packet == nullptr) {
516         LOGE("MultiVerDataSync::AckPacketDeSerialization : new packet error");
517         return -E_OUT_OF_MEMORY;
518     }
519     packet->SetData(entries);
520     packet->SetErrorCode(pktErrCode);
521     int errCode = inMsg->SetExternalObject<>(packet);
522     if (errCode != E_OK) {
523         delete packet;
524         packet = nullptr;
525     }
526     return errCode;
527 }
528 
IsPacketValid(const Message * inMsg,uint16_t messageType)529 bool MultiVerDataSync::IsPacketValid(const Message *inMsg, uint16_t messageType)
530 {
531     if ((inMsg == nullptr) || (inMsg->GetMessageId() != MULTI_VER_DATA_SYNC_MESSAGE)) {
532         return false;
533     }
534     if (messageType != inMsg->GetMessageType()) {
535         return false;
536     }
537     return true;
538 }
539 
GetValidCommit(MultiVerSyncTaskContext * context,MultiVerCommitNode & commit)540 int MultiVerDataSync::GetValidCommit(MultiVerSyncTaskContext *context, MultiVerCommitNode &commit)
541 {
542     int commitsSize = context->GetCommitsSize();
543     if (commitsSize > DBConstant::MAX_COMMIT_SIZE) {
544         LOGE("MultiVerDataSync::GetValidCommit failed, to large!");
545         return -E_LENGTH_ERROR;
546     }
547     int index = context->GetCommitIndex();
548     if (context->GetRetryStatus() == SyncTaskContext::NEED_RETRY) {
549         context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
550         index--;
551     }
552     index = (index < 0) ? 0 : index;
553     LOGD("MultiVerDataSync::GetValidCommit begin, dst=%s{private}, index = %d", context->GetDeviceId().c_str(), index);
554     while (index < commitsSize) {
555         MultiVerCommitNode commitItem;
556         context->GetCommit(index, commitItem);
557         LOGD("MultiVerDataSync::GetValidCommit , dst=%s{private}, index = %d, commitsSize = %d",
558             context->GetDeviceId().c_str(), index, commitsSize);
559 
560         index++;
561         context->SetCommitIndex(index);
562         if (IsCommitExisted(commitItem)) {
563             continue;
564         }
565         commit = commitItem;
566         LOGD("MultiVerDataSync::GetValidCommit ok, dst=%s{private}, commit index = %d",
567              context->GetDeviceId().c_str(), index);
568         return E_OK;
569     }
570     LOGD("MultiVerDataSync::GetValidCommit not found, dst=%s{private}", context->GetDeviceId().c_str());
571     return -E_NOT_FOUND;
572 }
573 
IsCommitExisted(const MultiVerCommitNode & commit)574 bool MultiVerDataSync::IsCommitExisted(const MultiVerCommitNode &commit)
575 {
576     return storagePtr_->IsCommitExisted(commit);
577 }
578 
Send(const DeviceID & deviceId,const Message * inMsg)579 int MultiVerDataSync::Send(const DeviceID &deviceId, const Message *inMsg)
580 {
581     SendConfig conf = {false, false, SEND_TIME_OUT, {}};
582     int errCode = communicateHandle_->SendMessage(deviceId, inMsg, conf);
583     if (errCode != E_OK) {
584         LOGE("MultiVerDataSync::Send ERR! ERR = %d", errCode);
585     }
586     return errCode;
587 }
588 
SendRequestPacket(const MultiVerSyncTaskContext * context,MultiVerCommitNode & commit)589 int MultiVerDataSync::SendRequestPacket(const MultiVerSyncTaskContext *context, MultiVerCommitNode &commit)
590 {
591     MultiVerRequestPacket *packet = new (std::nothrow) MultiVerRequestPacket();
592     if (packet == nullptr) {
593         LOGE("MultiVerRequestPacket::SendRequestPacket : new packet error");
594         return -E_OUT_OF_MEMORY;
595     }
596     packet->SetCommit(commit);
597     Message *message = new (std::nothrow) Message(MULTI_VER_DATA_SYNC_MESSAGE);
598     if (message == nullptr) {
599         delete packet;
600         packet = nullptr;
601         LOGE("MultiVerDataSync::SendRequestPacket : new message error");
602         return -E_OUT_OF_MEMORY;
603     }
604     message->SetMessageType(TYPE_REQUEST);
605     message->SetTarget(context->GetDeviceId());
606     int errCode = message->SetExternalObject(packet);
607     if (errCode != E_OK) {
608         delete packet;
609         packet = nullptr;
610         delete message;
611         message = nullptr;
612         LOGE("[MultiVerDataSync][SendRequestPacket] : SetExternalObject failed errCode:%d", errCode);
613         return errCode;
614     }
615     message->SetSessionId(context->GetRequestSessionId());
616     message->SetSequenceId(context->GetSequenceId());
617 
618     PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
619     if (performance != nullptr) {
620         performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_DATA_ENTRY_SEND_REQUEST_TO_ACK_RECV);
621     }
622     errCode = Send(message->GetTarget(), message);
623     if (errCode != E_OK) {
624         delete message;
625         message = nullptr;
626     }
627     LOGD("MultiVerDataSync::SendRequestPacket end");
628     return errCode;
629 }
630 
SendAckPacket(const MultiVerSyncTaskContext * context,const std::vector<MultiVerKvEntry * > & dataItems,int retCode,const Message * message)631 int MultiVerDataSync::SendAckPacket(const MultiVerSyncTaskContext *context,
632     const std::vector<MultiVerKvEntry *> &dataItems, int retCode, const Message *message)
633 {
634     if (message == nullptr) {
635         LOGE("MultiVerDataSync::SendAckPacket : message is nullptr");
636         return -E_INVALID_ARGS;
637     }
638 
639     MultiVerAckPacket *packet = new (std::nothrow) MultiVerAckPacket();
640     if (packet == nullptr) {
641         LOGE("MultiVerDataSync::SendAckPack et : packet is nullptr");
642         return -E_OUT_OF_MEMORY;
643     }
644     Message *ackMessage = new (std::nothrow) Message(MULTI_VER_DATA_SYNC_MESSAGE);
645     if (ackMessage == nullptr) {
646         delete packet;
647         packet = nullptr;
648         LOGE("MultiVerDataSync::SendAckPacket : new message error");
649         return -E_OUT_OF_MEMORY;
650     }
651 
652     std::vector<std::vector<uint8_t>> entries;
653     for (const auto &iter : dataItems) {
654         std::vector<uint8_t> item;
655         iter->GetSerialData(item);
656         entries.push_back(item);
657     }
658     packet->SetData(entries);
659     packet->SetErrorCode(static_cast<int32_t>(retCode));
660 
661     ackMessage->SetMessageType(TYPE_RESPONSE);
662     ackMessage->SetTarget(context->GetDeviceId());
663     int errCode = ackMessage->SetExternalObject(packet);
664     if (errCode != E_OK) {
665         delete packet;
666         packet = nullptr;
667         delete ackMessage;
668         ackMessage = nullptr;
669         LOGE("[MultiVerDataSync][SendAckPacket] : SetExternalObject failed errCode:%d", errCode);
670         return errCode;
671     }
672     ackMessage->SetSequenceId(message->GetSequenceId());
673     ackMessage->SetSessionId(message->GetSessionId());
674     errCode = Send(ackMessage->GetTarget(), ackMessage);
675     if (errCode != E_OK) {
676         delete ackMessage;
677         ackMessage = nullptr;
678     }
679     LOGD("MultiVerDataSync::SendAckPacket end, dst=%s{private}, errCode = %d", context->GetDeviceId().c_str(), errCode);
680     return errCode;
681 }
682 
GetCommitData(const MultiVerCommitNode & commit,std::vector<MultiVerKvEntry * > & entries)683 int MultiVerDataSync::GetCommitData(const MultiVerCommitNode &commit, std::vector<MultiVerKvEntry *> &entries)
684 {
685     return storagePtr_->GetCommitData(commit, entries);
686 }
687 
CreateKvEntry(const std::vector<uint8_t> & entry)688 MultiVerKvEntry *MultiVerDataSync::CreateKvEntry(const std::vector<uint8_t> &entry)
689 {
690     return storagePtr_->CreateKvEntry(entry);
691 }
692 }
693 #endif