1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifndef OMIT_MULTI_VER
17 #include "multi_ver_data_sync.h"
18
19 #include "parcel.h"
20 #include "log_print.h"
21 #include "sync_types.h"
22 #include "message_transform.h"
23 #include "performance_analysis.h"
24 #include "db_constant.h"
25
26 namespace DistributedDB {
27 // Class MultiVerRequestPacket
CalculateLen() const28 uint32_t MultiVerRequestPacket::CalculateLen() const
29 {
30 uint64_t len = Parcel::GetIntLen();
31 len = Parcel::GetEightByteAlign(len);
32 len += Parcel::GetMultiVerCommitLen(commit_);
33 if (len > INT32_MAX) {
34 return 0;
35 }
36 return len;
37 }
38
SetCommit(MultiVerCommitNode & commit)39 void MultiVerRequestPacket::SetCommit(MultiVerCommitNode &commit)
40 {
41 commit_ = std::move(commit);
42 }
43
GetCommit(MultiVerCommitNode & commit) const44 void MultiVerRequestPacket::GetCommit(MultiVerCommitNode &commit) const
45 {
46 commit = commit_;
47 }
48
SetErrCode(int32_t errCode)49 void MultiVerRequestPacket::SetErrCode(int32_t errCode)
50 {
51 errCode_ = errCode;
52 }
53
GetErrCode() const54 int32_t MultiVerRequestPacket::GetErrCode() const
55 {
56 return errCode_;
57 }
58
59 // Class MultiVerAckPacket
CalculateLen() const60 uint32_t MultiVerAckPacket::CalculateLen() const
61 {
62 uint64_t len = Parcel::GetIntLen();
63 len = Parcel::GetEightByteAlign(len);
64 for (const auto &iter : entries_) {
65 len += Parcel::GetVectorCharLen(iter);
66 if (len > INT32_MAX) {
67 return 0;
68 }
69 }
70 return len;
71 }
72
SetData(std::vector<std::vector<uint8_t>> & data)73 void MultiVerAckPacket::SetData(std::vector<std::vector<uint8_t>> &data)
74 {
75 entries_ = std::move(data);
76 }
77
GetData(std::vector<std::vector<uint8_t>> & data) const78 void MultiVerAckPacket::GetData(std::vector<std::vector<uint8_t>> &data) const
79 {
80 data = entries_;
81 }
82
SetErrorCode(int32_t errCode)83 void MultiVerAckPacket::SetErrorCode(int32_t errCode)
84 {
85 errorCode_ = errCode;
86 }
87
GetErrorCode(int32_t & errCode) const88 void MultiVerAckPacket::GetErrorCode(int32_t &errCode) const
89 {
90 errCode = errorCode_;
91 }
92
93 // Class MultiVerDataSync
~MultiVerDataSync()94 MultiVerDataSync::~MultiVerDataSync()
95 {
96 storagePtr_ = nullptr;
97 communicateHandle_ = nullptr;
98 }
99
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)100 int MultiVerDataSync::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
101 {
102 if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
103 return -E_INVALID_ARGS;
104 }
105
106 switch (inMsg->GetMessageType()) {
107 case TYPE_REQUEST:
108 return RequestPacketSerialization(buffer, length, inMsg);
109 case TYPE_RESPONSE:
110 return AckPacketSerialization(buffer, length, inMsg);
111 default:
112 return -E_MESSAGE_TYPE_ERROR;
113 }
114 }
115
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)116 int MultiVerDataSync::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
117 {
118 if ((buffer == nullptr) || !(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
119 return -E_MESSAGE_ID_ERROR;
120 }
121
122 switch (inMsg->GetMessageType()) {
123 case TYPE_REQUEST:
124 return RequestPacketDeSerialization(buffer, length, inMsg);
125 case TYPE_RESPONSE:
126 return AckPacketDeSerialization(buffer, length, inMsg);
127 default:
128 return -E_MESSAGE_TYPE_ERROR;
129 }
130 }
131
CalculateLen(const Message * inMsg)132 uint32_t MultiVerDataSync::CalculateLen(const Message *inMsg)
133 {
134 if (!(IsPacketValid(inMsg, TYPE_RESPONSE) || IsPacketValid(inMsg, TYPE_REQUEST))) {
135 return 0;
136 }
137
138 uint32_t len = 0;
139 int errCode = E_OK;
140 switch (inMsg->GetMessageType()) {
141 case TYPE_REQUEST:
142 errCode = RequestPacketCalculateLen(inMsg, len);
143 break;
144 case TYPE_RESPONSE:
145 errCode = AckPacketCalculateLen(inMsg, len);
146 break;
147 default:
148 return 0;
149 }
150 if (errCode != E_OK) {
151 return 0;
152 }
153 return len;
154 }
155
RegisterTransformFunc()156 int MultiVerDataSync::RegisterTransformFunc()
157 {
158 TransformFunc func;
159 func.computeFunc = [](const Message *inMsg) { return CalculateLen(inMsg); };
160 func.serializeFunc = [](uint8_t *buffer, uint32_t length, const Message *inMsg) {
161 return Serialization(buffer, length, inMsg);
162 };
163 func.deserializeFunc = [](const uint8_t *buffer, uint32_t length, Message *inMsg) {
164 return DeSerialization(buffer, length, inMsg);
165 };
166 return MessageTransform::RegTransformFunction(MULTI_VER_DATA_SYNC_MESSAGE, func);
167 }
168
Initialize(MultiVerKvDBSyncInterface * storagePtr,ICommunicator * communicateHandle)169 int MultiVerDataSync::Initialize(MultiVerKvDBSyncInterface *storagePtr, ICommunicator *communicateHandle)
170 {
171 if ((storagePtr == nullptr) || (communicateHandle == nullptr)) {
172 return -E_INVALID_ARGS;
173 }
174 storagePtr_ = storagePtr;
175 communicateHandle_ = communicateHandle;
176 return E_OK;
177 }
178
TimeOutCallback(MultiVerSyncTaskContext * context,const Message * message) const179 void MultiVerDataSync::TimeOutCallback(MultiVerSyncTaskContext *context, const Message *message) const
180 {
181 return;
182 }
183
SyncStart(MultiVerSyncTaskContext * context)184 int MultiVerDataSync::SyncStart(MultiVerSyncTaskContext *context)
185 {
186 if (context == nullptr) {
187 return -E_INVALID_ARGS;
188 }
189 LOGD("MultiVerDataSync::SyncStart dst=%s{private}, begin", context->GetDeviceId().c_str());
190 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
191 if (performance != nullptr) {
192 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_DATA_GET_VALID_COMMIT);
193 }
194 MultiVerCommitNode commit;
195 int errCode = GetValidCommit(context, commit);
196 if (performance != nullptr) {
197 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_DATA_GET_VALID_COMMIT);
198 }
199 if (errCode != E_OK) {
200 // sync don't need start
201 SendFinishedRequest(context);
202 return errCode;
203 }
204
205 errCode = SendRequestPacket(context, commit);
206 LOGD("MultiVerDataSync::SyncStart dst=%s{private}, end", context->GetDeviceId().c_str());
207 return errCode;
208 }
209
RequestRecvCallback(const MultiVerSyncTaskContext * context,const Message * message)210 int MultiVerDataSync::RequestRecvCallback(const MultiVerSyncTaskContext *context, const Message *message)
211 {
212 if (message == nullptr || context == nullptr) {
213 return -E_INVALID_ARGS;
214 }
215
216 if (!IsPacketValid(message, TYPE_REQUEST)) {
217 return -E_INVALID_ARGS;
218 }
219
220 const MultiVerRequestPacket *packet = message->GetObject<MultiVerRequestPacket>();
221 if (packet == nullptr) {
222 return -E_INVALID_ARGS;
223 }
224 if (packet->GetErrCode() == -E_LAST_SYNC_FRAME) {
225 return -E_LAST_SYNC_FRAME;
226 }
227 MultiVerCommitNode commit;
228 packet->GetCommit(commit);
229 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
230 if (performance != nullptr) {
231 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_GET_COMMIT_DATA);
232 }
233 std::vector<MultiVerKvEntry *> dataEntries;
234 int errCode = GetCommitData(commit, dataEntries);
235 if (performance != nullptr) {
236 performance->StepTimeRecordEnd(MV_TEST_RECORDS::RECORD_GET_COMMIT_DATA);
237 }
238 if (errCode != E_OK) {
239 LOGE("MultiVerDataSync::RequestRecvCallback : GetCommitData ERR, errno = %d", errCode);
240 }
241
242 errCode = SendAckPacket(context, dataEntries, errCode, message);
243 for (auto &iter : dataEntries) {
244 ReleaseKvEntry(iter);
245 iter = nullptr;
246 }
247 LOGD("MultiVerDataSync::RequestRecvCallback : SendAckPacket, errno = %d, dst = %s{private}",
248 errCode, context->GetDeviceId().c_str());
249 return errCode;
250 }
251
AckRecvCallback(MultiVerSyncTaskContext * context,const Message * message)252 int MultiVerDataSync::AckRecvCallback(MultiVerSyncTaskContext *context, const Message *message)
253 {
254 if (message == nullptr) {
255 return -E_INVALID_ARGS;
256 }
257 if (!IsPacketValid(message, TYPE_RESPONSE) || (context == nullptr)) {
258 return -E_INVALID_ARGS;
259 }
260
261 const MultiVerAckPacket *packet = message->GetObject<MultiVerAckPacket>();
262 if (packet == nullptr) {
263 return -E_INVALID_ARGS;
264 }
265 int32_t errCode = E_OK;
266 packet->GetErrorCode(errCode);
267 if (errCode != E_OK) {
268 return errCode;
269 }
270 std::vector<std::vector<uint8_t>> dataEntries;
271 std::vector<MultiVerKvEntry *> entries;
272 std::vector<ValueSliceHash> valueHashes;
273 MultiVerKvEntry *entry = nullptr;
274
275 packet->GetData(dataEntries);
276 for (const auto &iter : dataEntries) {
277 MultiVerKvEntry *item = CreateKvEntry(iter);
278 entries.push_back(item);
279 }
280 context->ReleaseEntries();
281 context->SetEntries(entries);
282 context->SetEntriesIndex(0);
283 context->SetEntriesSize(static_cast<int>(entries.size()));
284 LOGD("MultiVerDataSync::AckRecvCallback src=%s{private}, entries num = %zu",
285 context->GetDeviceId().c_str(), entries.size());
286
287 if (entries.size() > 0) {
288 entry = entries[0];
289 errCode = entry->GetValueHash(valueHashes);
290 if (errCode != E_OK) {
291 return errCode;
292 }
293 }
294 context->SetValueSliceHashNodes(valueHashes);
295 context->SetValueSlicesIndex(0);
296 context->SetValueSlicesSize(valueHashes.size());
297 LOGD("MultiVerDataSync::AckRecvCallback src=%s{private}, ValueSlicesSize num = %zu",
298 context->GetDeviceId().c_str(), valueHashes.size());
299 return errCode;
300 }
301
PutCommitData(const MultiVerCommitNode & commit,const std::vector<MultiVerKvEntry * > & entries,const std::string & deviceName)302 int MultiVerDataSync::PutCommitData(const MultiVerCommitNode &commit, const std::vector<MultiVerKvEntry *> &entries,
303 const std::string &deviceName)
304 {
305 return storagePtr_->PutCommitData(commit, entries, deviceName);
306 }
307
MergeSyncCommit(const MultiVerCommitNode & commit,const std::vector<MultiVerCommitNode> & commits)308 int MultiVerDataSync::MergeSyncCommit(const MultiVerCommitNode &commit, const std::vector<MultiVerCommitNode> &commits)
309 {
310 return storagePtr_->MergeSyncCommit(commit, commits);
311 }
312
ReleaseKvEntry(const MultiVerKvEntry * entry)313 void MultiVerDataSync::ReleaseKvEntry(const MultiVerKvEntry *entry)
314 {
315 return storagePtr_->ReleaseKvEntry(entry);
316 }
317
SendFinishedRequest(const MultiVerSyncTaskContext * context)318 void MultiVerDataSync::SendFinishedRequest(const MultiVerSyncTaskContext *context)
319 {
320 if (context == nullptr) {
321 return;
322 }
323 MultiVerRequestPacket *packet = new (std::nothrow) MultiVerRequestPacket();
324 if (packet == nullptr) {
325 LOGE("MultiVerRequestPacket::SendRequestPacket : new packet error");
326 return;
327 }
328 packet->SetErrCode(-E_LAST_SYNC_FRAME);
329 Message *message = new (std::nothrow) Message(MULTI_VER_DATA_SYNC_MESSAGE);
330 if (message == nullptr) {
331 delete packet;
332 packet = nullptr;
333 LOGE("MultiVerDataSync::SendRequestPacket : new message error");
334 return;
335 }
336 message->SetMessageType(TYPE_REQUEST);
337 message->SetTarget(context->GetDeviceId());
338 int errCode = message->SetExternalObject(packet);
339 if (errCode != E_OK) {
340 delete packet;
341 packet = nullptr;
342 delete message;
343 message = nullptr;
344 LOGE("[MultiVerDataSync][SendFinishedRequest] : SetExternalObject failed errCode:%d", errCode);
345 return;
346 }
347 message->SetSessionId(context->GetRequestSessionId());
348 message->SetSequenceId(context->GetSequenceId());
349
350 errCode = Send(message->GetTarget(), message);
351 if (errCode != E_OK) {
352 delete message;
353 message = nullptr;
354 LOGE("[MultiVerDataSync][SendFinishedRequest] SendFinishedRequest failed, err %d", errCode);
355 }
356 LOGI("[MultiVerDataSync][SendFinishedRequest] SendFinishedRequest dst=%s{private}", context->GetDeviceId().c_str());
357 }
358
RequestPacketCalculateLen(const Message * inMsg,uint32_t & len)359 int MultiVerDataSync::RequestPacketCalculateLen(const Message *inMsg, uint32_t &len)
360 {
361 if ((inMsg == nullptr) || !IsPacketValid(inMsg, TYPE_REQUEST)) {
362 return -E_INVALID_ARGS;
363 }
364 const MultiVerRequestPacket *packet = inMsg->GetObject<MultiVerRequestPacket>();
365 if (packet == nullptr) {
366 return -E_INVALID_ARGS;
367 }
368
369 len = packet->CalculateLen();
370 return E_OK;
371 }
372
RequestPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)373 int MultiVerDataSync::RequestPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
374 {
375 if ((buffer == nullptr) || !IsPacketValid(inMsg, TYPE_REQUEST)) {
376 return -E_INVALID_ARGS;
377 }
378 const MultiVerRequestPacket *packet = inMsg->GetObject<MultiVerRequestPacket>();
379 if ((packet == nullptr) || (length != packet->CalculateLen())) {
380 return -E_INVALID_ARGS;
381 }
382
383 MultiVerCommitNode commit;
384 packet->GetCommit(commit);
385 int32_t ackCode = packet->GetErrCode();
386
387 Parcel parcel(buffer, length);
388 int errCode = parcel.WriteInt(ackCode);
389 if (errCode != E_OK) {
390 return -E_SECUREC_ERROR;
391 }
392 parcel.EightByteAlign();
393
394 // commitMap Serialization
395 errCode = parcel.WriteMultiVerCommit(commit);
396 if (errCode != E_OK) {
397 return -E_SECUREC_ERROR;
398 }
399
400 return errCode;
401 }
402
RequestPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)403 int MultiVerDataSync::RequestPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
404 {
405 if ((buffer == nullptr) || !IsPacketValid(inMsg, TYPE_REQUEST)) {
406 return -E_INVALID_ARGS;
407 }
408
409 MultiVerCommitNode commit;
410 Parcel parcel(const_cast<uint8_t *>(buffer), length);
411 int32_t pktErrCode;
412 uint64_t packLen = parcel.ReadInt(pktErrCode);
413 parcel.EightByteAlign();
414 if (parcel.IsError()) {
415 return -E_INVALID_ARGS;
416 }
417 packLen = Parcel::GetEightByteAlign(packLen);
418 // commit DeSerialization
419 packLen += parcel.ReadMultiVerCommit(commit);
420 if (packLen != length || parcel.IsError()) {
421 return -E_INVALID_ARGS;
422 }
423 MultiVerRequestPacket *packet = new (std::nothrow) MultiVerRequestPacket();
424 if (packet == nullptr) {
425 LOGE("MultiVerDataSync::RequestPacketDeSerialization : new packet error");
426 return -E_OUT_OF_MEMORY;
427 }
428 packet->SetCommit(commit);
429 packet->SetErrCode(pktErrCode);
430 int errCode = inMsg->SetExternalObject<>(packet);
431 if (errCode != E_OK) {
432 delete packet;
433 packet = nullptr;
434 }
435 return errCode;
436 }
437
AckPacketCalculateLen(const Message * inMsg,uint32_t & len)438 int MultiVerDataSync::AckPacketCalculateLen(const Message *inMsg, uint32_t &len)
439 {
440 if (!IsPacketValid(inMsg, TYPE_RESPONSE)) {
441 return -E_INVALID_ARGS;
442 }
443
444 const MultiVerAckPacket *packet = inMsg->GetObject<MultiVerAckPacket>();
445 if (packet == nullptr) {
446 return -E_INVALID_ARGS;
447 }
448 len = packet->CalculateLen();
449 return E_OK;
450 }
451
AckPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)452 int MultiVerDataSync::AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
453 {
454 if ((buffer == nullptr) || !IsPacketValid(inMsg, TYPE_RESPONSE)) {
455 return -E_INVALID_ARGS;
456 }
457 const MultiVerAckPacket *packet = inMsg->GetObject<MultiVerAckPacket>();
458 if ((packet == nullptr) || (length != packet->CalculateLen())) {
459 return -E_INVALID_ARGS;
460 }
461
462 Parcel parcel(buffer, length);
463 std::vector<std::vector<uint8_t>> entries;
464
465 packet->GetData(entries);
466 int32_t errCode = E_OK;
467 packet->GetErrorCode(errCode);
468 // errCode Serialization
469 errCode = parcel.WriteInt(errCode);
470 if (errCode != E_OK) {
471 return -E_SECUREC_ERROR;
472 }
473 parcel.EightByteAlign();
474
475 // commits vector Serialization
476 for (const auto &iter : entries) {
477 errCode = parcel.WriteVectorChar(iter);
478 if (errCode != E_OK) {
479 return -E_SECUREC_ERROR;
480 }
481 }
482
483 return errCode;
484 }
485
AckPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)486 int MultiVerDataSync::AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
487 {
488 if ((buffer == nullptr) || !IsPacketValid(inMsg, TYPE_RESPONSE)) {
489 return -E_INVALID_ARGS;
490 }
491
492 Parcel parcel(const_cast<uint8_t *>(buffer), length);
493 int32_t pktErrCode;
494
495 // errCode DeSerialization
496 uint32_t packLen = parcel.ReadInt(pktErrCode);
497 if (parcel.IsError()) {
498 return -E_INVALID_ARGS;
499 }
500 parcel.EightByteAlign();
501 packLen = Parcel::GetEightByteAlign(packLen);
502
503 // commits vector DeSerialization
504 std::vector<std::vector<uint8_t>> entries;
505 while (packLen < length) {
506 std::vector<uint8_t> data;
507 packLen += parcel.ReadVectorChar(data);
508 // A valid dataItem got, Save to storage
509 entries.push_back(data);
510 if (parcel.IsError()) {
511 return -E_INVALID_ARGS;
512 }
513 }
514 MultiVerAckPacket *packet = new (std::nothrow) MultiVerAckPacket();
515 if (packet == nullptr) {
516 LOGE("MultiVerDataSync::AckPacketDeSerialization : new packet error");
517 return -E_OUT_OF_MEMORY;
518 }
519 packet->SetData(entries);
520 packet->SetErrorCode(pktErrCode);
521 int errCode = inMsg->SetExternalObject<>(packet);
522 if (errCode != E_OK) {
523 delete packet;
524 packet = nullptr;
525 }
526 return errCode;
527 }
528
IsPacketValid(const Message * inMsg,uint16_t messageType)529 bool MultiVerDataSync::IsPacketValid(const Message *inMsg, uint16_t messageType)
530 {
531 if ((inMsg == nullptr) || (inMsg->GetMessageId() != MULTI_VER_DATA_SYNC_MESSAGE)) {
532 return false;
533 }
534 if (messageType != inMsg->GetMessageType()) {
535 return false;
536 }
537 return true;
538 }
539
GetValidCommit(MultiVerSyncTaskContext * context,MultiVerCommitNode & commit)540 int MultiVerDataSync::GetValidCommit(MultiVerSyncTaskContext *context, MultiVerCommitNode &commit)
541 {
542 int commitsSize = context->GetCommitsSize();
543 if (commitsSize > DBConstant::MAX_COMMIT_SIZE) {
544 LOGE("MultiVerDataSync::GetValidCommit failed, to large!");
545 return -E_LENGTH_ERROR;
546 }
547 int index = context->GetCommitIndex();
548 if (context->GetRetryStatus() == SyncTaskContext::NEED_RETRY) {
549 context->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
550 index--;
551 }
552 index = (index < 0) ? 0 : index;
553 LOGD("MultiVerDataSync::GetValidCommit begin, dst=%s{private}, index = %d", context->GetDeviceId().c_str(), index);
554 while (index < commitsSize) {
555 MultiVerCommitNode commitItem;
556 context->GetCommit(index, commitItem);
557 LOGD("MultiVerDataSync::GetValidCommit , dst=%s{private}, index = %d, commitsSize = %d",
558 context->GetDeviceId().c_str(), index, commitsSize);
559
560 index++;
561 context->SetCommitIndex(index);
562 if (IsCommitExisted(commitItem)) {
563 continue;
564 }
565 commit = commitItem;
566 LOGD("MultiVerDataSync::GetValidCommit ok, dst=%s{private}, commit index = %d",
567 context->GetDeviceId().c_str(), index);
568 return E_OK;
569 }
570 LOGD("MultiVerDataSync::GetValidCommit not found, dst=%s{private}", context->GetDeviceId().c_str());
571 return -E_NOT_FOUND;
572 }
573
IsCommitExisted(const MultiVerCommitNode & commit)574 bool MultiVerDataSync::IsCommitExisted(const MultiVerCommitNode &commit)
575 {
576 return storagePtr_->IsCommitExisted(commit);
577 }
578
Send(const DeviceID & deviceId,const Message * inMsg)579 int MultiVerDataSync::Send(const DeviceID &deviceId, const Message *inMsg)
580 {
581 SendConfig conf = {false, false, SEND_TIME_OUT, {}};
582 int errCode = communicateHandle_->SendMessage(deviceId, inMsg, conf);
583 if (errCode != E_OK) {
584 LOGE("MultiVerDataSync::Send ERR! ERR = %d", errCode);
585 }
586 return errCode;
587 }
588
SendRequestPacket(const MultiVerSyncTaskContext * context,MultiVerCommitNode & commit)589 int MultiVerDataSync::SendRequestPacket(const MultiVerSyncTaskContext *context, MultiVerCommitNode &commit)
590 {
591 MultiVerRequestPacket *packet = new (std::nothrow) MultiVerRequestPacket();
592 if (packet == nullptr) {
593 LOGE("MultiVerRequestPacket::SendRequestPacket : new packet error");
594 return -E_OUT_OF_MEMORY;
595 }
596 packet->SetCommit(commit);
597 Message *message = new (std::nothrow) Message(MULTI_VER_DATA_SYNC_MESSAGE);
598 if (message == nullptr) {
599 delete packet;
600 packet = nullptr;
601 LOGE("MultiVerDataSync::SendRequestPacket : new message error");
602 return -E_OUT_OF_MEMORY;
603 }
604 message->SetMessageType(TYPE_REQUEST);
605 message->SetTarget(context->GetDeviceId());
606 int errCode = message->SetExternalObject(packet);
607 if (errCode != E_OK) {
608 delete packet;
609 packet = nullptr;
610 delete message;
611 message = nullptr;
612 LOGE("[MultiVerDataSync][SendRequestPacket] : SetExternalObject failed errCode:%d", errCode);
613 return errCode;
614 }
615 message->SetSessionId(context->GetRequestSessionId());
616 message->SetSequenceId(context->GetSequenceId());
617
618 PerformanceAnalysis *performance = PerformanceAnalysis::GetInstance();
619 if (performance != nullptr) {
620 performance->StepTimeRecordStart(MV_TEST_RECORDS::RECORD_DATA_ENTRY_SEND_REQUEST_TO_ACK_RECV);
621 }
622 errCode = Send(message->GetTarget(), message);
623 if (errCode != E_OK) {
624 delete message;
625 message = nullptr;
626 }
627 LOGD("MultiVerDataSync::SendRequestPacket end");
628 return errCode;
629 }
630
SendAckPacket(const MultiVerSyncTaskContext * context,const std::vector<MultiVerKvEntry * > & dataItems,int retCode,const Message * message)631 int MultiVerDataSync::SendAckPacket(const MultiVerSyncTaskContext *context,
632 const std::vector<MultiVerKvEntry *> &dataItems, int retCode, const Message *message)
633 {
634 if (message == nullptr) {
635 LOGE("MultiVerDataSync::SendAckPacket : message is nullptr");
636 return -E_INVALID_ARGS;
637 }
638
639 MultiVerAckPacket *packet = new (std::nothrow) MultiVerAckPacket();
640 if (packet == nullptr) {
641 LOGE("MultiVerDataSync::SendAckPack et : packet is nullptr");
642 return -E_OUT_OF_MEMORY;
643 }
644 Message *ackMessage = new (std::nothrow) Message(MULTI_VER_DATA_SYNC_MESSAGE);
645 if (ackMessage == nullptr) {
646 delete packet;
647 packet = nullptr;
648 LOGE("MultiVerDataSync::SendAckPacket : new message error");
649 return -E_OUT_OF_MEMORY;
650 }
651
652 std::vector<std::vector<uint8_t>> entries;
653 for (const auto &iter : dataItems) {
654 std::vector<uint8_t> item;
655 iter->GetSerialData(item);
656 entries.push_back(item);
657 }
658 packet->SetData(entries);
659 packet->SetErrorCode(static_cast<int32_t>(retCode));
660
661 ackMessage->SetMessageType(TYPE_RESPONSE);
662 ackMessage->SetTarget(context->GetDeviceId());
663 int errCode = ackMessage->SetExternalObject(packet);
664 if (errCode != E_OK) {
665 delete packet;
666 packet = nullptr;
667 delete ackMessage;
668 ackMessage = nullptr;
669 LOGE("[MultiVerDataSync][SendAckPacket] : SetExternalObject failed errCode:%d", errCode);
670 return errCode;
671 }
672 ackMessage->SetSequenceId(message->GetSequenceId());
673 ackMessage->SetSessionId(message->GetSessionId());
674 errCode = Send(ackMessage->GetTarget(), ackMessage);
675 if (errCode != E_OK) {
676 delete ackMessage;
677 ackMessage = nullptr;
678 }
679 LOGD("MultiVerDataSync::SendAckPacket end, dst=%s{private}, errCode = %d", context->GetDeviceId().c_str(), errCode);
680 return errCode;
681 }
682
GetCommitData(const MultiVerCommitNode & commit,std::vector<MultiVerKvEntry * > & entries)683 int MultiVerDataSync::GetCommitData(const MultiVerCommitNode &commit, std::vector<MultiVerKvEntry *> &entries)
684 {
685 return storagePtr_->GetCommitData(commit, entries);
686 }
687
CreateKvEntry(const std::vector<uint8_t> & entry)688 MultiVerKvEntry *MultiVerDataSync::CreateKvEntry(const std::vector<uint8_t> &entry)
689 {
690 return storagePtr_->CreateKvEntry(entry);
691 }
692 }
693 #endif