1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "single_ver_serialize_manager.h"
17 
18 #include "db_common.h"
19 #include "generic_single_ver_kv_entry.h"
20 #include "icommunicator.h"
21 #include "log_print.h"
22 #include "message_transform.h"
23 #include "parcel.h"
24 #include "remote_executor_packet.h"
25 #include "sync_types.h"
26 #include "version.h"
27 
28 namespace DistributedDB {
29 std::mutex SingleVerSerializeManager::handlesLock_;
30 std::map<uint32_t, TransformFunc> SingleVerSerializeManager::messageHandles_;
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)31 int SingleVerSerializeManager::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
32 {
33     if ((buffer == nullptr) || !(IsPacketValid(inMsg))) {
34         return -E_MESSAGE_ID_ERROR;
35     }
36     SerializeFunc serializeFunc = nullptr;
37     {
38         std::lock_guard<std::mutex> autoLock(handlesLock_);
39         if (messageHandles_.find(inMsg->GetMessageId()) != messageHandles_.end()) {
40             serializeFunc = messageHandles_.at(inMsg->GetMessageId()).serializeFunc;
41         }
42     }
43     if (serializeFunc != nullptr) {
44         return serializeFunc(buffer, length, inMsg);
45     }
46 
47     if (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
48         return ControlSerialization(buffer, length, inMsg);
49     }
50     return DataSerialization(buffer, length, inMsg);
51 }
52 
DataSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)53 int SingleVerSerializeManager::DataSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
54 {
55     switch (inMsg->GetMessageType()) {
56         case TYPE_REQUEST:
57             return DataPacketSerialization(buffer, length, inMsg);
58         case TYPE_RESPONSE:
59         case TYPE_NOTIFY:
60             return AckPacketSerialization(buffer, length, inMsg);
61         default:
62             return -E_MESSAGE_TYPE_ERROR;
63     }
64 }
65 
ControlSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)66 int SingleVerSerializeManager::ControlSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
67 {
68     switch (inMsg->GetMessageType()) {
69         case TYPE_REQUEST:
70             return ControlPacketSerialization(buffer, length, inMsg);
71         case TYPE_RESPONSE:
72             return AckControlPacketSerialization(buffer, length, inMsg);
73         default:
74             return -E_MESSAGE_TYPE_ERROR;
75     }
76 }
77 
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)78 int SingleVerSerializeManager::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
79 {
80     if ((buffer == nullptr) || !(IsPacketValid(inMsg))) {
81         return -E_MESSAGE_ID_ERROR;
82     }
83     DeserializeFunc deserializeFunc = nullptr;
84     {
85         std::lock_guard<std::mutex> autoLock(handlesLock_);
86         if (messageHandles_.find(inMsg->GetMessageId()) != messageHandles_.end()) {
87             deserializeFunc = messageHandles_.at(inMsg->GetMessageId()).deserializeFunc;
88         }
89     }
90     if (deserializeFunc != nullptr) {
91         return deserializeFunc(buffer, length, inMsg);
92     }
93     if (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
94         return ControlDeSerialization(buffer, length, inMsg);
95     }
96     return DataDeSerialization(buffer, length, inMsg);
97 }
98 
DataDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)99 int SingleVerSerializeManager::DataDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
100 {
101     switch (inMsg->GetMessageType()) {
102         case TYPE_REQUEST:
103             return DataPacketDeSerialization(buffer, length, inMsg);
104         case TYPE_RESPONSE:
105         case TYPE_NOTIFY:
106             return AckPacketDeSerialization(buffer, length, inMsg);
107         default:
108             return -E_MESSAGE_TYPE_ERROR;
109     }
110 }
111 
ControlDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)112 int SingleVerSerializeManager::ControlDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
113 {
114     switch (inMsg->GetMessageType()) {
115         case TYPE_REQUEST:
116             return ControlPacketDeSerialization(buffer, length, inMsg);
117         case TYPE_RESPONSE:
118             return AckControlPacketDeSerialization(buffer, length, inMsg);
119         default:
120             return -E_MESSAGE_TYPE_ERROR;
121     }
122 }
123 
CalculateLen(const Message * inMsg)124 uint32_t SingleVerSerializeManager::CalculateLen(const Message *inMsg)
125 {
126     if (!(IsPacketValid(inMsg))) {
127         return 0;
128     }
129     ComputeLengthFunc computeFunc = nullptr;
130     {
131         std::lock_guard<std::mutex> autoLock(handlesLock_);
132         if (messageHandles_.find(inMsg->GetMessageId()) != messageHandles_.end()) {
133             computeFunc = messageHandles_.at(inMsg->GetMessageId()).computeFunc;
134         }
135     }
136     if (computeFunc != nullptr) {
137         return computeFunc(inMsg);
138     }
139     if (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
140         return CalculateControlLen(inMsg);
141     }
142     return CalculateDataLen(inMsg);
143 }
144 
CalculateDataLen(const Message * inMsg)145 uint32_t SingleVerSerializeManager::CalculateDataLen(const Message *inMsg)
146 {
147     uint32_t len = 0;
148     int errCode;
149     switch (inMsg->GetMessageType()) {
150         case TYPE_REQUEST:
151             errCode = DataPacketCalculateLen(inMsg, len);
152             if (errCode != E_OK) {
153                 LOGE("[CalculateDataLen] calculate data request packet len failed, errCode=%d", errCode);
154                 return 0;
155             }
156             return len;
157         case TYPE_RESPONSE:
158         case TYPE_NOTIFY:
159             errCode = AckPacketCalculateLen(inMsg, len);
160             if (errCode != E_OK) {
161                 LOGE("[CalculateDataLen] calculate data notify packet len failed errCode=%d", errCode);
162                 return 0;
163             }
164             return len;
165         default:
166             return 0;
167     }
168 }
169 
CalculateControlLen(const Message * inMsg)170 uint32_t SingleVerSerializeManager::CalculateControlLen(const Message *inMsg)
171 {
172     uint32_t len = 0;
173     int errCode;
174     switch (inMsg->GetMessageType()) {
175         case TYPE_REQUEST:
176             errCode = ControlPacketCalculateLen(inMsg, len);
177             if (errCode != E_OK) {
178                 LOGE("[CalculateControlLen] calculate control request packet len failed, errCode=%d", errCode);
179                 return 0;
180             }
181             return len;
182         case TYPE_RESPONSE:
183         case TYPE_NOTIFY:
184             errCode = AckControlPacketCalculateLen(inMsg, len);
185             if (errCode != E_OK) {
186                 LOGE("[CalculateControlLen] calculate control request packet len failed, errCode=%d", errCode);
187                 return 0;
188             }
189             return len;
190         default:
191             return 0;
192     }
193 }
194 
RegisterTransformFunc()195 int SingleVerSerializeManager::RegisterTransformFunc()
196 {
197     RegisterInnerTransformFunc();
198     return RegisterCommunicatorTransformFunc();
199 }
200 
DataPacketSyncerPartSerialization(Parcel & parcel,const DataRequestPacket * packet)201 int SingleVerSerializeManager::DataPacketSyncerPartSerialization(Parcel &parcel, const DataRequestPacket *packet)
202 {
203     parcel.WriteUInt64(packet->GetEndWaterMark());
204     parcel.WriteUInt64(packet->GetLocalWaterMark());
205     parcel.WriteUInt64(packet->GetPeerWaterMark());
206     parcel.WriteInt(packet->GetSendCode());
207     parcel.WriteInt(packet->GetMode());
208     parcel.WriteUInt32(packet->GetSessionId());
209     parcel.WriteVector<uint64_t>(packet->GetReserved());
210     if (parcel.IsError()) {
211         return -E_PARSE_FAIL;
212     }
213     if (packet->GetVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
214         parcel.WriteUInt32(packet->GetFlag());
215     }
216     parcel.EightByteAlign();
217     if (parcel.IsError()) {
218         return -E_PARSE_FAIL;
219     }
220     return E_OK;
221 }
222 
DataPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)223 int SingleVerSerializeManager::DataPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
224 {
225     auto packet = inMsg->GetObject<DataRequestPacket>();
226     if (packet == nullptr) {
227         return -E_INVALID_ARGS;
228     }
229     Parcel parcel(buffer, length);
230 
231     // version
232     int errCode = parcel.WriteUInt32(packet->GetVersion());
233     if (errCode != E_OK) {
234         LOGE("[DataPacketSerialization] Serialize version failed");
235         return errCode;
236     }
237     // sendDataItems
238     errCode = GenericSingleVerKvEntry::SerializeDatas(
239         (packet->IsCompressData() ? std::vector<SendDataItem> {} : packet->GetData()), parcel, packet->GetVersion());
240     if (errCode != E_OK) {
241         LOGE("[DataPacketSerialization] Serialize Data failed");
242         return errCode;
243     }
244 
245     // data sync
246     errCode = DataPacketSyncerPartSerialization(parcel, packet);
247     if (errCode != E_OK) {
248         LOGE("[DataPacketSerialization] Serialize Data failed");
249         return errCode;
250     }
251     if (inMsg->GetMessageId() == QUERY_SYNC_MESSAGE) {
252         errCode = DataPacketQuerySyncSerialization(parcel, packet); // for query sync
253         if (errCode != E_OK) {
254             return errCode;
255         }
256     }
257     if (packet->IsCompressData()) {
258         // serialize compress data
259         errCode = GenericSingleVerKvEntry::SerializeCompressedDatas(packet->GetData(), packet->GetCompressData(),
260             parcel, packet->GetVersion(), packet->GetCompressAlgo());
261         if (errCode != E_OK) {
262             LOGE("[DataPacketSerialization] Serialize compress Data failed");
263             return errCode;
264         }
265     }
266     return DataPacketInnerSerialization(packet, parcel);
267 }
268 
DataPacketQuerySyncSerialization(Parcel & parcel,const DataRequestPacket * packet)269 int SingleVerSerializeManager::DataPacketQuerySyncSerialization(Parcel &parcel, const DataRequestPacket *packet)
270 {
271     // deleted record send watermark
272     int errCode = parcel.WriteUInt64(packet->GetDeletedWaterMark());
273     if (errCode != E_OK) {
274         LOGE("[QuerySerialization] Serialize deleted record send watermark failed!");
275         return errCode;
276     }
277 
278     // query identify
279     QuerySyncObject queryObj = packet->GetQuery();
280     errCode = parcel.WriteString(packet->GetQueryId());
281     if (errCode != E_OK) {
282         LOGE("[QuerySerialization] Serialize query id failed!");
283         return errCode;
284     }
285     if ((packet->GetVersion() > SOFTWARE_VERSION_RELEASE_4_0) || packet->GetMode() != QUERY_PUSH) {
286         // need to check.
287         errCode = queryObj.SerializeData(parcel, SOFTWARE_VERSION_CURRENT);
288     }
289     return errCode;
290 }
291 
DataPacketCalculateLen(const Message * inMsg,uint32_t & len)292 int SingleVerSerializeManager::DataPacketCalculateLen(const Message *inMsg, uint32_t &len)
293 {
294     const DataRequestPacket *packet = inMsg->GetObject<DataRequestPacket>();
295     if (packet == nullptr) {
296         return -E_INVALID_ARGS;
297     }
298 
299     len = packet->CalculateLen(inMsg->GetMessageId());
300     return E_OK;
301 }
302 
AckPacketCalculateLen(const Message * inMsg,uint32_t & len)303 int SingleVerSerializeManager::AckPacketCalculateLen(const Message *inMsg, uint32_t &len)
304 {
305     const DataAckPacket *packet = inMsg->GetObject<DataAckPacket>();
306     if (packet == nullptr) {
307         return -E_INVALID_ARGS;
308     }
309 
310     len = packet->CalculateLen();
311     return E_OK;
312 }
313 
IsPacketValid(const Message * inMsg)314 bool SingleVerSerializeManager::IsPacketValid(const Message *inMsg)
315 {
316     if (inMsg == nullptr) {
317         return false;
318     }
319 
320     int msgType = inMsg->GetMessageType();
321     if (msgType != TYPE_REQUEST && msgType != TYPE_RESPONSE && msgType != TYPE_NOTIFY) {
322         LOGE("[DataSync][IsPacketValid] Message type ERROR! message type=%d", msgType);
323         return false;
324     }
325     return true;
326 }
327 
AckPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)328 int SingleVerSerializeManager::AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
329 {
330     const DataAckPacket *packet = inMsg->GetObject<DataAckPacket>();
331     if (packet == nullptr) {
332         return -E_INVALID_ARGS;
333     }
334 
335     Parcel parcel(buffer, length);
336     parcel.WriteUInt32(packet->GetVersion());
337     if (parcel.IsError()) {
338         return -E_PARSE_FAIL;
339     }
340     // now V1 compatible for softWareVersion :{101, 102}
341     return AckPacketSyncerPartSerializationV1(parcel, packet);
342 }
343 
AckPacketSyncerPartSerializationV1(Parcel & parcel,const DataAckPacket * packet)344 int SingleVerSerializeManager::AckPacketSyncerPartSerializationV1(Parcel &parcel, const DataAckPacket *packet)
345 {
346     parcel.WriteUInt64(packet->GetData());
347     parcel.WriteInt(packet->GetRecvCode());
348     parcel.WriteVector<uint64_t>(packet->GetReserved());
349     parcel.EightByteAlign();
350     if (parcel.IsError()) {
351         return -E_PARSE_FAIL;
352     }
353     return E_OK;
354 }
355 
DataPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)356 int SingleVerSerializeManager::DataPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
357 {
358     std::vector<SendDataItem> dataItems;
359     uint32_t version;
360     Parcel parcel(const_cast<uint8_t *>(buffer), length);
361     uint32_t packLen = parcel.ReadUInt32(version);
362     if (parcel.IsError()) {
363         return -E_PARSE_FAIL;
364     }
365 
366     if (version > SOFTWARE_VERSION_CURRENT) {
367         return -E_VERSION_NOT_SUPPORT;
368     }
369 
370     packLen += static_cast<uint32_t>(GenericSingleVerKvEntry::DeSerializeDatas(dataItems, parcel));
371     if (parcel.IsError()) {
372         return -E_PARSE_FAIL;
373     }
374 
375     auto packet = new (std::nothrow) DataRequestPacket();
376     if (packet == nullptr) {
377         return -E_OUT_OF_MEMORY;
378     }
379 
380     packet->SetVersion(version);
381     packet->SetData(dataItems);
382     int errCode = DataPacketSyncerPartDeSerialization(parcel, packet, packLen, length, version);
383     if (errCode != E_OK) {
384         goto ERROR;
385     }
386     if (inMsg->GetMessageId() == QUERY_SYNC_MESSAGE) {
387         errCode = DataPacketQuerySyncDeSerialization(parcel, packet);
388         if (errCode != E_OK) {
389             goto ERROR;
390         }
391     }
392     errCode = DataPacketInnerDeSerialization(packet, parcel);
393     if (errCode != E_OK) {
394         goto ERROR;
395     }
396     errCode = inMsg->SetExternalObject<>(packet);
397     if (errCode == E_OK) {
398         return errCode;
399     }
400 
401 ERROR:
402     delete packet;
403     packet = nullptr;
404     return errCode;
405 }
406 
DataPacketQuerySyncDeSerialization(Parcel & parcel,DataRequestPacket * packet)407 int SingleVerSerializeManager::DataPacketQuerySyncDeSerialization(Parcel &parcel, DataRequestPacket *packet)
408 {
409     WaterMark deletedWatermark = 0;
410     parcel.ReadUInt64(deletedWatermark);
411     std::string queryId;
412     parcel.ReadString(queryId);
413     if (parcel.IsError()) {
414         return -E_PARSE_FAIL;
415     }
416     // query identify
417     QuerySyncObject querySyncObj;
418     int errCode = E_OK;
419     // for version 105, query is always sent.
420     if ((packet->GetVersion() > SOFTWARE_VERSION_RELEASE_4_0) || packet->GetMode() != QUERY_PUSH) {
421         // need to check.
422         errCode = QuerySyncObject::DeSerializeData(parcel, querySyncObj);
423     }
424     if (errCode != E_OK) {
425         LOGI("[SingleVerSerializeManager] DeSerializeData object failed.");
426         return errCode;
427     }
428     packet->SetDeletedWaterMark(deletedWatermark);
429     packet->SetQueryId(queryId);
430     if ((packet->GetVersion() > SOFTWARE_VERSION_RELEASE_4_0) || packet->GetMode() != QUERY_PUSH) {
431         packet->SetQuery(querySyncObj);
432     }
433     return E_OK;
434 }
435 
DataPacketCompressDataDeSerialization(Parcel & parcel,DataRequestPacket * packet)436 int SingleVerSerializeManager::DataPacketCompressDataDeSerialization(Parcel &parcel, DataRequestPacket *packet)
437 {
438     std::vector<SendDataItem> originalData;
439     int errCode = GenericSingleVerKvEntry::DeSerializeCompressedDatas(originalData, parcel);
440     if (errCode != E_OK) {
441         LOGE("[SingleVerSerializeManager] DeSerializeComptressData failed, errCode=%d", errCode);
442         return errCode;
443     }
444     packet->SetData(originalData);
445     return E_OK;
446 }
447 
DataPacketSyncerPartDeSerialization(Parcel & parcel,DataRequestPacket * packet,uint32_t packLen,uint32_t length,uint32_t version)448 int SingleVerSerializeManager::DataPacketSyncerPartDeSerialization(Parcel &parcel, DataRequestPacket *packet,
449     uint32_t packLen, uint32_t length, uint32_t version)
450 {
451     WaterMark waterMark;
452     WaterMark localWaterMark;
453     WaterMark peerWaterMark;
454     int32_t sendCode;
455     int32_t mode;
456     uint32_t sessionId;
457     std::vector<uint64_t> reserved;
458 
459     uint64_t totPacketLen = packLen;
460     totPacketLen += parcel.ReadUInt64(waterMark);
461     totPacketLen += parcel.ReadUInt64(localWaterMark);
462     totPacketLen += parcel.ReadUInt64(peerWaterMark);
463     totPacketLen += parcel.ReadInt(sendCode);
464     totPacketLen += parcel.ReadInt(mode);
465     totPacketLen += parcel.ReadUInt32(sessionId);
466     totPacketLen += parcel.ReadVector<uint64_t>(reserved);
467     if (version > SOFTWARE_VERSION_RELEASE_2_0) {
468         uint32_t flag = 0u;
469         totPacketLen += parcel.ReadUInt32(flag);
470         packet->SetFlag(flag);
471     }
472     if (totPacketLen > INT32_MAX) {
473         LOGE("[DataSync][DataPacketDeSerialization] deserialize failed! input totPackLen=%" PRIu64 " is over limit.",
474              totPacketLen);
475         return -E_LENGTH_ERROR;
476     }
477     parcel.EightByteAlign();
478     totPacketLen = Parcel::GetEightByteAlign(totPacketLen);
479     if (parcel.IsError()) {
480         LOGE("[DataSync][DataPacketDeSerialization] deserialize failed! input len=%" PRIu32 ", totPackLen=%" PRIu64,
481             length, totPacketLen);
482         return -E_LENGTH_ERROR;
483     }
484     packet->SetEndWaterMark(waterMark);
485     packet->SetLocalWaterMark(localWaterMark);
486     packet->SetPeerWaterMark(peerWaterMark);
487     packet->SetSendCode(sendCode);
488     packet->SetMode(mode);
489     packet->SetSessionId(sessionId);
490     packet->SetReserved(reserved);
491     return E_OK;
492 }
493 
AckPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)494 int SingleVerSerializeManager::AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
495 {
496     DataAckPacket packet;
497     Parcel parcel(const_cast<uint8_t *>(buffer), length);
498     uint32_t version;
499 
500     parcel.ReadUInt32(version);
501     if (parcel.IsError()) {
502         return -E_INVALID_ARGS;
503     }
504     if (version > SOFTWARE_VERSION_CURRENT) {
505         packet.SetVersion(version);
506         packet.SetRecvCode(-E_VERSION_NOT_SUPPORT);
507         return inMsg->SetCopiedObject<>(packet);
508     }
509     packet.SetVersion(version);
510     // now V1 compatible for softWareVersion :{101, 102}
511     int errCode = AckPacketSyncerPartDeSerializationV1(parcel, packet);
512     if (errCode != E_OK) {
513         return errCode;
514     }
515 
516     return inMsg->SetCopiedObject<>(packet);
517 }
518 
AckPacketSyncerPartDeSerializationV1(Parcel & parcel,DataAckPacket & packet)519 int SingleVerSerializeManager::AckPacketSyncerPartDeSerializationV1(Parcel &parcel, DataAckPacket &packet)
520 {
521     WaterMark mark;
522     int32_t errCode;
523     std::vector<uint64_t> reserved;
524 
525     parcel.ReadUInt64(mark);
526     parcel.ReadInt(errCode);
527     parcel.ReadVector<uint64_t>(reserved);
528     if (parcel.IsError()) {
529         LOGE("[AckPacketSyncerPartDeSerializationV1] DeSerialization failed");
530         return -E_INVALID_ARGS;
531     }
532     packet.SetData(mark);
533     packet.SetRecvCode(errCode);
534     packet.SetReserved(reserved);
535     return E_OK;
536 }
537 
ControlPacketCalculateLen(const Message * inMsg,uint32_t & len)538 int SingleVerSerializeManager::ControlPacketCalculateLen(const Message *inMsg, uint32_t &len)
539 {
540     auto packet = inMsg->GetObject<ControlRequestPacket>();
541     if (packet == nullptr || packet->GetcontrolCmdType() >= INVALID_CONTROL_CMD) {
542         LOGE("[ControlPacketSerialization] invalid control cmd");
543         return -E_INVALID_ARGS;
544     }
545     if (packet->GetcontrolCmdType() == SUBSCRIBE_QUERY_CMD || packet->GetcontrolCmdType() == UNSUBSCRIBE_QUERY_CMD) {
546         return SingleVerSerializeManager::SubscribeCalculateLen(inMsg, len);
547     }
548     return E_OK;
549 }
550 
ControlPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)551 int SingleVerSerializeManager::ControlPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
552 {
553     auto packet = inMsg->GetObject<ControlRequestPacket>();
554     if (packet == nullptr || packet->GetcontrolCmdType() >= INVALID_CONTROL_CMD) {
555         LOGE("[ControlPacketSerialization] invalid control cmd");
556         return -E_INVALID_ARGS;
557     }
558     if (packet->GetcontrolCmdType() == SUBSCRIBE_QUERY_CMD || packet->GetcontrolCmdType() == UNSUBSCRIBE_QUERY_CMD) {
559         return SingleVerSerializeManager::SubscribeSerialization(buffer, length, inMsg);
560     }
561     return E_OK;
562 }
563 
ControlPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)564 int SingleVerSerializeManager::ControlPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
565 {
566     Parcel parcel(const_cast<uint8_t *>(buffer), length);
567     ControlRequestPacket packet;
568     int errCode = ControlRequestDeSerialization(parcel, packet);
569     if (errCode != E_OK) {
570         return errCode;
571     }
572     if (packet.GetcontrolCmdType() == SUBSCRIBE_QUERY_CMD || packet.GetcontrolCmdType() == UNSUBSCRIBE_QUERY_CMD) {
573         errCode = SubscribeDeSerialization(parcel, inMsg, packet);
574     }
575     return errCode;
576 }
577 
AckControlPacketCalculateLen(const Message * inMsg,uint32_t & len)578 int SingleVerSerializeManager::AckControlPacketCalculateLen(const Message *inMsg, uint32_t &len)
579 {
580     auto packet = inMsg->GetObject<ControlAckPacket>();
581     if (packet == nullptr) {
582         LOGE("[AckControlPacketCalculateLen] invalid control cmd");
583         return -E_INVALID_ARGS;
584     }
585     len = packet->CalculateLen();
586     return E_OK;
587 }
588 
AckControlPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)589 int SingleVerSerializeManager::AckControlPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
590 {
591     auto packet = inMsg->GetObject<ControlAckPacket>();
592     if (packet == nullptr) {
593         return -E_INVALID_ARGS;
594     }
595     Parcel parcel(buffer, length);
596     parcel.WriteUInt32(packet->GetVersion());
597     parcel.WriteInt(packet->GetRecvCode());
598     parcel.WriteUInt32(packet->GetcontrolCmdType());
599     parcel.WriteUInt32(packet->GetFlag());
600     if (parcel.IsError()) {
601         LOGE("[AckControlPacketSerialization] Serialization failed");
602         return -E_INVALID_ARGS;
603     }
604     parcel.EightByteAlign();
605     return E_OK;
606 }
607 
AckControlPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)608 int SingleVerSerializeManager::AckControlPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
609 {
610     auto packet = new (std::nothrow) ControlAckPacket();
611     if (packet == nullptr) {
612         return -E_OUT_OF_MEMORY;
613     }
614     Parcel parcel(const_cast<uint8_t *>(buffer), length);
615     int32_t recvCode = 0;
616     uint32_t version = 0;
617     uint32_t controlCmdType = 0;
618     uint32_t flag = 0;
619     parcel.ReadUInt32(version);
620     parcel.ReadInt(recvCode);
621     parcel.ReadUInt32(controlCmdType);
622     parcel.ReadUInt32(flag);
623     int errCode;
624     if (parcel.IsError()) {
625         LOGE("[AckControlPacketDeSerialization] DeSerialization failed");
626         errCode = -E_INVALID_ARGS;
627         goto ERROR;
628     }
629     packet->SetPacketHead(recvCode, version, static_cast<int32_t>(controlCmdType), flag);
630     errCode = inMsg->SetExternalObject<>(packet);
631     if (errCode != E_OK) {
632         goto ERROR;
633     }
634     return errCode;
635 ERROR:
636     delete packet;
637     packet = nullptr;
638     return errCode;
639 }
640 
ControlRequestSerialization(Parcel & parcel,const Message * inMsg)641 int SingleVerSerializeManager::ControlRequestSerialization(Parcel &parcel, const Message *inMsg)
642 {
643     auto packet = inMsg->GetObject<ControlRequestPacket>();
644     if (packet == nullptr) {
645         return -E_INVALID_ARGS;
646     }
647     parcel.WriteUInt32(packet->GetVersion());
648     parcel.WriteInt(packet->GetSendCode());
649     parcel.WriteUInt32(packet->GetcontrolCmdType());
650     parcel.WriteUInt32(packet->GetFlag());
651     parcel.EightByteAlign();
652     if (parcel.IsError()) {
653         LOGE("[ControlRequestSerialization] Serialization failed");
654         return -E_INVALID_ARGS;
655     }
656     return E_OK;
657 }
658 
ControlRequestDeSerialization(Parcel & parcel,ControlRequestPacket & packet)659 int SingleVerSerializeManager::ControlRequestDeSerialization(Parcel &parcel, ControlRequestPacket &packet)
660 {
661     uint32_t version = 0;
662     int32_t sendCode = 0;
663     uint32_t controlCmdType = 0;
664     uint32_t flag = 0;
665     parcel.ReadUInt32(version);
666     if (version > SOFTWARE_VERSION_CURRENT) {
667         return -E_VERSION_NOT_SUPPORT;
668     }
669     parcel.ReadInt(sendCode);
670     parcel.ReadUInt32(controlCmdType);
671     parcel.ReadUInt32(flag);
672     if (parcel.IsError()) {
673         LOGE("[ControlRequestDeSerialization] deserialize failed!");
674         return -E_LENGTH_ERROR;
675     }
676     packet.SetPacketHead(sendCode, version, static_cast<int32_t>(controlCmdType), flag);
677     return E_OK;
678 }
679 
SubscribeCalculateLen(const Message * inMsg,uint32_t & len)680 int SingleVerSerializeManager::SubscribeCalculateLen(const Message *inMsg, uint32_t &len)
681 {
682     auto packet = inMsg->GetObject<SubscribeRequest>();
683     if (packet == nullptr) {
684         return -E_INVALID_ARGS;
685     }
686     len = packet->CalculateLen();
687     return E_OK;
688 }
689 
SubscribeSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)690 int SingleVerSerializeManager::SubscribeSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
691 {
692     auto packet = inMsg->GetObject<SubscribeRequest>();
693     if (packet == nullptr) {
694         return -E_INVALID_ARGS;
695     }
696     Parcel parcel(buffer, length);
697     int errCode = ControlRequestSerialization(parcel, inMsg);
698     if (errCode != E_OK) {
699         LOGE("[SubscribeSerialization] ControlRequestPacket Serialization failed, errCode=%d", errCode);
700         return errCode;
701     }
702     QuerySyncObject queryObj = packet->GetQuery();
703     errCode = queryObj.SerializeData(parcel, SOFTWARE_VERSION_CURRENT);
704     if (errCode != E_OK) {
705         LOGE("[SubscribeSerialization] query object Serialization failed, errCode=%d", errCode);
706         return errCode;
707     }
708     return E_OK;
709 }
710 
SubscribeDeSerialization(Parcel & parcel,Message * inMsg,ControlRequestPacket & controlPacket)711 int SingleVerSerializeManager::SubscribeDeSerialization(Parcel &parcel, Message *inMsg,
712     ControlRequestPacket &controlPacket)
713 {
714     auto packet = new (std::nothrow) SubscribeRequest();
715     if (packet == nullptr) {
716         return -E_OUT_OF_MEMORY;
717     }
718     QuerySyncObject querySyncObj;
719     int errCode = QuerySyncObject::DeSerializeData(parcel, querySyncObj);
720     if (errCode != E_OK) {
721         goto ERROR;
722     }
723     packet->SetPacketHead(controlPacket.GetSendCode(), controlPacket.GetVersion(),
724         static_cast<int32_t>(controlPacket.GetcontrolCmdType()), controlPacket.GetFlag());
725     packet->SetQuery(querySyncObj);
726     errCode = inMsg->SetExternalObject<>(packet);
727     if (errCode != E_OK) {
728         goto ERROR;
729     }
730     return errCode;
731 ERROR:
732     delete packet;
733     packet = nullptr;
734     return errCode;
735 }
736 
RegisterCommunicatorTransformFunc()737 int SingleVerSerializeManager::RegisterCommunicatorTransformFunc()
738 {
739     TransformFunc func;
740     func.computeFunc = [](const Message *inMsg) { return CalculateLen(inMsg); };
741     func.serializeFunc = [](uint8_t *buffer, uint32_t length, const Message *inMsg) {
742         return Serialization(buffer, length, inMsg);
743     };
744     func.deserializeFunc = [](const uint8_t *buffer, uint32_t length, Message *inMsg) {
745         return DeSerialization(buffer, length, inMsg);
746     };
747 
748     static std::vector<MessageId> messageIds = {
749         QUERY_SYNC_MESSAGE, DATA_SYNC_MESSAGE, CONTROL_SYNC_MESSAGE, REMOTE_EXECUTE_MESSAGE
750     };
751     int errCode = E_OK;
752     for (auto &id : messageIds) {
753         int retCode = MessageTransform::RegTransformFunction(static_cast<uint32_t>(id), func);
754         if (retCode != E_OK) {
755             LOGE("[SingleVerSerializeManager][RegisterTransformFunc] regist messageId %u failed %d",
756                 static_cast<uint32_t>(id), retCode);
757             errCode = retCode;
758         }
759     }
760     return errCode;
761 }
762 
RegisterInnerTransformFunc()763 void SingleVerSerializeManager::RegisterInnerTransformFunc()
764 {
765     TransformFunc func;
766     func.computeFunc = [](const Message *inMsg) { return ISyncPacketCalculateLen(inMsg); };
767     func.serializeFunc = [](uint8_t *buffer, uint32_t length, const Message *inMsg) {
768         return ISyncPacketSerialization(buffer, length, inMsg);
769     };
770     func.deserializeFunc = [](const uint8_t *buffer, uint32_t length, Message *inMsg) {
771         return ISyncPacketDeSerialization(buffer, length, inMsg);
772     };
773     std::lock_guard<std::mutex> autoLock(handlesLock_);
774     messageHandles_.emplace(static_cast<uint32_t>(REMOTE_EXECUTE_MESSAGE), func);
775 }
776 
ISyncPacketCalculateLen(const Message * inMsg)777 uint32_t SingleVerSerializeManager::ISyncPacketCalculateLen(const Message *inMsg)
778 {
779     if (inMsg == nullptr) {
780         return 0u;
781     }
782     uint32_t len = 0u;
783     const auto packet = inMsg->GetObject<ISyncPacket>();
784     if (packet != nullptr) {
785         len = packet->CalculateLen();
786     }
787     return len;
788 }
789 
ISyncPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)790 int SingleVerSerializeManager::ISyncPacketSerialization(uint8_t *buffer, uint32_t length,
791     const Message *inMsg)
792 {
793     if (inMsg == nullptr) {
794         return -E_INVALID_ARGS;
795     }
796     int errCode = E_OK;
797     Parcel parcel(buffer, length);
798     auto packet = inMsg->GetObject<ISyncPacket>();
799     if (packet != nullptr) {
800         errCode = packet->Serialization(parcel);
801     }
802     return errCode;
803 }
804 
ISyncPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)805 int SingleVerSerializeManager::ISyncPacketDeSerialization(const uint8_t *buffer, uint32_t length,
806     Message *inMsg)
807 {
808     if (inMsg == nullptr) {
809         return -E_INVALID_ARGS;
810     }
811     ISyncPacket *packet = nullptr;
812     int errCode = BuildISyncPacket(inMsg, packet);
813     if (errCode != E_OK) {
814         return errCode;
815     }
816     Parcel parcel(const_cast<uint8_t *>(buffer), length);
817     do {
818         errCode = packet->DeSerialization(parcel);
819         if (errCode != E_OK) {
820             break;
821         }
822         errCode = inMsg->SetExternalObject(packet);
823     } while (false);
824     if (errCode != E_OK) {
825         delete packet;
826         packet = nullptr;
827     }
828     return E_OK;
829 }
830 
BuildISyncPacket(Message * inMsg,ISyncPacket * & packet)831 int SingleVerSerializeManager::BuildISyncPacket(Message *inMsg, ISyncPacket *&packet)
832 {
833     uint32_t messageId = inMsg->GetMessageId();
834     if (messageId != static_cast<uint32_t>(REMOTE_EXECUTE_MESSAGE)) {
835         return -E_INVALID_ARGS;
836     }
837     switch (inMsg->GetMessageType()) {
838         case TYPE_REQUEST:
839             packet = new(std::nothrow) RemoteExecutorRequestPacket();
840             break;
841         case TYPE_RESPONSE:
842             packet = new(std::nothrow) RemoteExecutorAckPacket();
843             break;
844         default:
845             packet = nullptr;
846             break;
847     }
848     if (packet == nullptr) {
849         return -E_OUT_OF_MEMORY;
850     }
851     return E_OK;
852 }
853 
DataPacketExtraConditionsSerialization(Parcel & parcel,const DataRequestPacket * packet)854 int SingleVerSerializeManager::DataPacketExtraConditionsSerialization(Parcel &parcel, const DataRequestPacket *packet)
855 {
856     std::map<std::string, std::string> extraConditions = packet->GetExtraConditions();
857     if (extraConditions.size() > DBConstant::MAX_CONDITION_COUNT) {
858         return -E_INVALID_ARGS;
859     }
860     parcel.WriteUInt32(static_cast<uint32_t>(extraConditions.size()));
861     for (const auto &entry : extraConditions) {
862         if (entry.first.length() > DBConstant::MAX_CONDITION_KEY_LEN ||
863             entry.second.length() > DBConstant::MAX_CONDITION_VALUE_LEN) {
864             return -E_INVALID_ARGS;
865         }
866         parcel.WriteString(entry.first);
867         parcel.WriteString(entry.second);
868     }
869     parcel.EightByteAlign();
870     if (parcel.IsError()) {
871         return -E_PARSE_FAIL;
872     }
873     return E_OK;
874 }
875 
DataPacketExtraConditionsDeserialization(Parcel & parcel,DataRequestPacket * packet)876 int SingleVerSerializeManager::DataPacketExtraConditionsDeserialization(Parcel &parcel, DataRequestPacket *packet)
877 {
878     if (!packet->IsExtraConditionData()) {
879         return E_OK;
880     }
881     uint32_t conditionSize = 0u;
882     (void) parcel.ReadUInt32(conditionSize);
883     if (conditionSize > DBConstant::MAX_CONDITION_COUNT) {
884         return -E_INVALID_ARGS;
885     }
886     std::map<std::string, std::string> extraConditions;
887     for (uint32_t i = 0; i < conditionSize; i++) {
888         std::string conditionKey;
889         std::string conditionVal;
890         (void) parcel.ReadString(conditionKey);
891         (void) parcel.ReadString(conditionVal);
892         if (conditionKey.length() > DBConstant::MAX_CONDITION_KEY_LEN ||
893             conditionVal.length() > DBConstant::MAX_CONDITION_VALUE_LEN) {
894             return -E_INVALID_ARGS;
895         }
896         extraConditions[conditionKey] = conditionVal;
897     }
898     parcel.EightByteAlign();
899     if (parcel.IsError()) {
900         return -E_PARSE_FAIL;
901     }
902     packet->SetExtraConditions(extraConditions);
903     return E_OK;
904 }
905 
DataPacketInnerDeSerialization(DataRequestPacket * packet,Parcel & parcel)906 int SingleVerSerializeManager::DataPacketInnerDeSerialization(DataRequestPacket *packet, Parcel &parcel)
907 {
908     int errCode = E_OK;
909     if (packet->IsCompressData()) {
910         errCode = DataPacketCompressDataDeSerialization(parcel, packet);
911         if (errCode != E_OK) {
912             return errCode;
913         }
914     }
915     errCode = DataPacketExtraConditionsDeserialization(parcel, packet);
916     if (errCode != E_OK) {
917         return errCode;
918     }
919     if (packet->GetVersion() >= SOFTWARE_VERSION_RELEASE_9_0) {
920         uint64_t schemaVersion = 0u;
921         parcel.ReadUInt64(schemaVersion);
922         int64_t systemTimeOffset = 0u;
923         parcel.ReadInt64(systemTimeOffset);
924         int64_t senderTimeOffset = 0u;
925         parcel.ReadInt64(senderTimeOffset);
926         if (parcel.IsError()) {
927             LOGE("[SingleVerSerializeManager] parse schema version or time offset failed");
928             return -E_PARSE_FAIL;
929         }
930         packet->SetSchemaVersion(schemaVersion);
931         packet->SetSystemTimeOffset(systemTimeOffset);
932         packet->SetSenderTimeOffset(senderTimeOffset);
933         if (!parcel.IsContinueRead()) {
934             return errCode;
935         }
936         SecurityOption option;
937         parcel.ReadInt(option.securityLabel);
938         parcel.ReadInt(option.securityFlag);
939         packet->SetSecurityOption(option);
940     }
941     return errCode;
942 }
943 
DataPacketInnerSerialization(const DataRequestPacket * packet,Parcel & parcel)944 int SingleVerSerializeManager::DataPacketInnerSerialization(const DataRequestPacket *packet, Parcel &parcel)
945 {
946     // flag mask add in 103
947     if (packet->GetVersion() < SOFTWARE_VERSION_RELEASE_3_0) {
948         return E_OK;
949     }
950     if (packet->IsExtraConditionData()) {
951         int errCode = DataPacketExtraConditionsSerialization(parcel, packet);
952         if (errCode != E_OK) {
953             LOGE("[SingleVerSerializeManager] Serialize extra condition failed %d", errCode);
954             return errCode;
955         }
956     }
957     if (packet->GetVersion() >= SOFTWARE_VERSION_RELEASE_9_0) {
958         parcel.WriteUInt64(packet->GetSchemaVersion());
959         parcel.WriteInt64(packet->GetSystemTimeOffset());
960         parcel.WriteInt64(packet->GetSenderTimeOffset());
961         if (parcel.IsError()) {
962             LOGE("[SingleVerSerializeManager] Serialize schema version or time offset failed");
963             return -E_PARSE_FAIL;
964         }
965         auto option = packet->GetSecurityOption();
966         parcel.WriteInt(option.securityLabel);
967         parcel.WriteInt(option.securityFlag);
968         if (parcel.IsError()) {
969             LOGE("[SingleVerSerializeManager] Serialize security option failed");
970             return -E_PARSE_FAIL;
971         }
972     }
973     return E_OK;
974 }
975 }  // namespace DistributedDB