1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "single_ver_serialize_manager.h"
17
18 #include "db_common.h"
19 #include "generic_single_ver_kv_entry.h"
20 #include "icommunicator.h"
21 #include "log_print.h"
22 #include "message_transform.h"
23 #include "parcel.h"
24 #include "remote_executor_packet.h"
25 #include "sync_types.h"
26 #include "version.h"
27
28 namespace DistributedDB {
29 std::mutex SingleVerSerializeManager::handlesLock_;
30 std::map<uint32_t, TransformFunc> SingleVerSerializeManager::messageHandles_;
Serialization(uint8_t * buffer,uint32_t length,const Message * inMsg)31 int SingleVerSerializeManager::Serialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
32 {
33 if ((buffer == nullptr) || !(IsPacketValid(inMsg))) {
34 return -E_MESSAGE_ID_ERROR;
35 }
36 SerializeFunc serializeFunc = nullptr;
37 {
38 std::lock_guard<std::mutex> autoLock(handlesLock_);
39 if (messageHandles_.find(inMsg->GetMessageId()) != messageHandles_.end()) {
40 serializeFunc = messageHandles_.at(inMsg->GetMessageId()).serializeFunc;
41 }
42 }
43 if (serializeFunc != nullptr) {
44 return serializeFunc(buffer, length, inMsg);
45 }
46
47 if (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
48 return ControlSerialization(buffer, length, inMsg);
49 }
50 return DataSerialization(buffer, length, inMsg);
51 }
52
DataSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)53 int SingleVerSerializeManager::DataSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
54 {
55 switch (inMsg->GetMessageType()) {
56 case TYPE_REQUEST:
57 return DataPacketSerialization(buffer, length, inMsg);
58 case TYPE_RESPONSE:
59 case TYPE_NOTIFY:
60 return AckPacketSerialization(buffer, length, inMsg);
61 default:
62 return -E_MESSAGE_TYPE_ERROR;
63 }
64 }
65
ControlSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)66 int SingleVerSerializeManager::ControlSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
67 {
68 switch (inMsg->GetMessageType()) {
69 case TYPE_REQUEST:
70 return ControlPacketSerialization(buffer, length, inMsg);
71 case TYPE_RESPONSE:
72 return AckControlPacketSerialization(buffer, length, inMsg);
73 default:
74 return -E_MESSAGE_TYPE_ERROR;
75 }
76 }
77
DeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)78 int SingleVerSerializeManager::DeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
79 {
80 if ((buffer == nullptr) || !(IsPacketValid(inMsg))) {
81 return -E_MESSAGE_ID_ERROR;
82 }
83 DeserializeFunc deserializeFunc = nullptr;
84 {
85 std::lock_guard<std::mutex> autoLock(handlesLock_);
86 if (messageHandles_.find(inMsg->GetMessageId()) != messageHandles_.end()) {
87 deserializeFunc = messageHandles_.at(inMsg->GetMessageId()).deserializeFunc;
88 }
89 }
90 if (deserializeFunc != nullptr) {
91 return deserializeFunc(buffer, length, inMsg);
92 }
93 if (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
94 return ControlDeSerialization(buffer, length, inMsg);
95 }
96 return DataDeSerialization(buffer, length, inMsg);
97 }
98
DataDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)99 int SingleVerSerializeManager::DataDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
100 {
101 switch (inMsg->GetMessageType()) {
102 case TYPE_REQUEST:
103 return DataPacketDeSerialization(buffer, length, inMsg);
104 case TYPE_RESPONSE:
105 case TYPE_NOTIFY:
106 return AckPacketDeSerialization(buffer, length, inMsg);
107 default:
108 return -E_MESSAGE_TYPE_ERROR;
109 }
110 }
111
ControlDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)112 int SingleVerSerializeManager::ControlDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
113 {
114 switch (inMsg->GetMessageType()) {
115 case TYPE_REQUEST:
116 return ControlPacketDeSerialization(buffer, length, inMsg);
117 case TYPE_RESPONSE:
118 return AckControlPacketDeSerialization(buffer, length, inMsg);
119 default:
120 return -E_MESSAGE_TYPE_ERROR;
121 }
122 }
123
CalculateLen(const Message * inMsg)124 uint32_t SingleVerSerializeManager::CalculateLen(const Message *inMsg)
125 {
126 if (!(IsPacketValid(inMsg))) {
127 return 0;
128 }
129 ComputeLengthFunc computeFunc = nullptr;
130 {
131 std::lock_guard<std::mutex> autoLock(handlesLock_);
132 if (messageHandles_.find(inMsg->GetMessageId()) != messageHandles_.end()) {
133 computeFunc = messageHandles_.at(inMsg->GetMessageId()).computeFunc;
134 }
135 }
136 if (computeFunc != nullptr) {
137 return computeFunc(inMsg);
138 }
139 if (inMsg->GetMessageId() == CONTROL_SYNC_MESSAGE) {
140 return CalculateControlLen(inMsg);
141 }
142 return CalculateDataLen(inMsg);
143 }
144
CalculateDataLen(const Message * inMsg)145 uint32_t SingleVerSerializeManager::CalculateDataLen(const Message *inMsg)
146 {
147 uint32_t len = 0;
148 int errCode;
149 switch (inMsg->GetMessageType()) {
150 case TYPE_REQUEST:
151 errCode = DataPacketCalculateLen(inMsg, len);
152 if (errCode != E_OK) {
153 LOGE("[CalculateDataLen] calculate data request packet len failed, errCode=%d", errCode);
154 return 0;
155 }
156 return len;
157 case TYPE_RESPONSE:
158 case TYPE_NOTIFY:
159 errCode = AckPacketCalculateLen(inMsg, len);
160 if (errCode != E_OK) {
161 LOGE("[CalculateDataLen] calculate data notify packet len failed errCode=%d", errCode);
162 return 0;
163 }
164 return len;
165 default:
166 return 0;
167 }
168 }
169
CalculateControlLen(const Message * inMsg)170 uint32_t SingleVerSerializeManager::CalculateControlLen(const Message *inMsg)
171 {
172 uint32_t len = 0;
173 int errCode;
174 switch (inMsg->GetMessageType()) {
175 case TYPE_REQUEST:
176 errCode = ControlPacketCalculateLen(inMsg, len);
177 if (errCode != E_OK) {
178 LOGE("[CalculateControlLen] calculate control request packet len failed, errCode=%d", errCode);
179 return 0;
180 }
181 return len;
182 case TYPE_RESPONSE:
183 case TYPE_NOTIFY:
184 errCode = AckControlPacketCalculateLen(inMsg, len);
185 if (errCode != E_OK) {
186 LOGE("[CalculateControlLen] calculate control request packet len failed, errCode=%d", errCode);
187 return 0;
188 }
189 return len;
190 default:
191 return 0;
192 }
193 }
194
RegisterTransformFunc()195 int SingleVerSerializeManager::RegisterTransformFunc()
196 {
197 RegisterInnerTransformFunc();
198 return RegisterCommunicatorTransformFunc();
199 }
200
DataPacketSyncerPartSerialization(Parcel & parcel,const DataRequestPacket * packet)201 int SingleVerSerializeManager::DataPacketSyncerPartSerialization(Parcel &parcel, const DataRequestPacket *packet)
202 {
203 parcel.WriteUInt64(packet->GetEndWaterMark());
204 parcel.WriteUInt64(packet->GetLocalWaterMark());
205 parcel.WriteUInt64(packet->GetPeerWaterMark());
206 parcel.WriteInt(packet->GetSendCode());
207 parcel.WriteInt(packet->GetMode());
208 parcel.WriteUInt32(packet->GetSessionId());
209 parcel.WriteVector<uint64_t>(packet->GetReserved());
210 if (parcel.IsError()) {
211 return -E_PARSE_FAIL;
212 }
213 if (packet->GetVersion() > SOFTWARE_VERSION_RELEASE_2_0) {
214 parcel.WriteUInt32(packet->GetFlag());
215 }
216 parcel.EightByteAlign();
217 if (parcel.IsError()) {
218 return -E_PARSE_FAIL;
219 }
220 return E_OK;
221 }
222
DataPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)223 int SingleVerSerializeManager::DataPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
224 {
225 auto packet = inMsg->GetObject<DataRequestPacket>();
226 if (packet == nullptr) {
227 return -E_INVALID_ARGS;
228 }
229 Parcel parcel(buffer, length);
230
231 // version
232 int errCode = parcel.WriteUInt32(packet->GetVersion());
233 if (errCode != E_OK) {
234 LOGE("[DataPacketSerialization] Serialize version failed");
235 return errCode;
236 }
237 // sendDataItems
238 errCode = GenericSingleVerKvEntry::SerializeDatas(
239 (packet->IsCompressData() ? std::vector<SendDataItem> {} : packet->GetData()), parcel, packet->GetVersion());
240 if (errCode != E_OK) {
241 LOGE("[DataPacketSerialization] Serialize Data failed");
242 return errCode;
243 }
244
245 // data sync
246 errCode = DataPacketSyncerPartSerialization(parcel, packet);
247 if (errCode != E_OK) {
248 LOGE("[DataPacketSerialization] Serialize Data failed");
249 return errCode;
250 }
251 if (inMsg->GetMessageId() == QUERY_SYNC_MESSAGE) {
252 errCode = DataPacketQuerySyncSerialization(parcel, packet); // for query sync
253 if (errCode != E_OK) {
254 return errCode;
255 }
256 }
257 if (packet->IsCompressData()) {
258 // serialize compress data
259 errCode = GenericSingleVerKvEntry::SerializeCompressedDatas(packet->GetData(), packet->GetCompressData(),
260 parcel, packet->GetVersion(), packet->GetCompressAlgo());
261 if (errCode != E_OK) {
262 LOGE("[DataPacketSerialization] Serialize compress Data failed");
263 return errCode;
264 }
265 }
266 return DataPacketInnerSerialization(packet, parcel);
267 }
268
DataPacketQuerySyncSerialization(Parcel & parcel,const DataRequestPacket * packet)269 int SingleVerSerializeManager::DataPacketQuerySyncSerialization(Parcel &parcel, const DataRequestPacket *packet)
270 {
271 // deleted record send watermark
272 int errCode = parcel.WriteUInt64(packet->GetDeletedWaterMark());
273 if (errCode != E_OK) {
274 LOGE("[QuerySerialization] Serialize deleted record send watermark failed!");
275 return errCode;
276 }
277
278 // query identify
279 QuerySyncObject queryObj = packet->GetQuery();
280 errCode = parcel.WriteString(packet->GetQueryId());
281 if (errCode != E_OK) {
282 LOGE("[QuerySerialization] Serialize query id failed!");
283 return errCode;
284 }
285 if ((packet->GetVersion() > SOFTWARE_VERSION_RELEASE_4_0) || packet->GetMode() != QUERY_PUSH) {
286 // need to check.
287 errCode = queryObj.SerializeData(parcel, SOFTWARE_VERSION_CURRENT);
288 }
289 return errCode;
290 }
291
DataPacketCalculateLen(const Message * inMsg,uint32_t & len)292 int SingleVerSerializeManager::DataPacketCalculateLen(const Message *inMsg, uint32_t &len)
293 {
294 const DataRequestPacket *packet = inMsg->GetObject<DataRequestPacket>();
295 if (packet == nullptr) {
296 return -E_INVALID_ARGS;
297 }
298
299 len = packet->CalculateLen(inMsg->GetMessageId());
300 return E_OK;
301 }
302
AckPacketCalculateLen(const Message * inMsg,uint32_t & len)303 int SingleVerSerializeManager::AckPacketCalculateLen(const Message *inMsg, uint32_t &len)
304 {
305 const DataAckPacket *packet = inMsg->GetObject<DataAckPacket>();
306 if (packet == nullptr) {
307 return -E_INVALID_ARGS;
308 }
309
310 len = packet->CalculateLen();
311 return E_OK;
312 }
313
IsPacketValid(const Message * inMsg)314 bool SingleVerSerializeManager::IsPacketValid(const Message *inMsg)
315 {
316 if (inMsg == nullptr) {
317 return false;
318 }
319
320 int msgType = inMsg->GetMessageType();
321 if (msgType != TYPE_REQUEST && msgType != TYPE_RESPONSE && msgType != TYPE_NOTIFY) {
322 LOGE("[DataSync][IsPacketValid] Message type ERROR! message type=%d", msgType);
323 return false;
324 }
325 return true;
326 }
327
AckPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)328 int SingleVerSerializeManager::AckPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
329 {
330 const DataAckPacket *packet = inMsg->GetObject<DataAckPacket>();
331 if (packet == nullptr) {
332 return -E_INVALID_ARGS;
333 }
334
335 Parcel parcel(buffer, length);
336 parcel.WriteUInt32(packet->GetVersion());
337 if (parcel.IsError()) {
338 return -E_PARSE_FAIL;
339 }
340 // now V1 compatible for softWareVersion :{101, 102}
341 return AckPacketSyncerPartSerializationV1(parcel, packet);
342 }
343
AckPacketSyncerPartSerializationV1(Parcel & parcel,const DataAckPacket * packet)344 int SingleVerSerializeManager::AckPacketSyncerPartSerializationV1(Parcel &parcel, const DataAckPacket *packet)
345 {
346 parcel.WriteUInt64(packet->GetData());
347 parcel.WriteInt(packet->GetRecvCode());
348 parcel.WriteVector<uint64_t>(packet->GetReserved());
349 parcel.EightByteAlign();
350 if (parcel.IsError()) {
351 return -E_PARSE_FAIL;
352 }
353 return E_OK;
354 }
355
DataPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)356 int SingleVerSerializeManager::DataPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
357 {
358 std::vector<SendDataItem> dataItems;
359 uint32_t version;
360 Parcel parcel(const_cast<uint8_t *>(buffer), length);
361 uint32_t packLen = parcel.ReadUInt32(version);
362 if (parcel.IsError()) {
363 return -E_PARSE_FAIL;
364 }
365
366 if (version > SOFTWARE_VERSION_CURRENT) {
367 return -E_VERSION_NOT_SUPPORT;
368 }
369
370 packLen += static_cast<uint32_t>(GenericSingleVerKvEntry::DeSerializeDatas(dataItems, parcel));
371 if (parcel.IsError()) {
372 return -E_PARSE_FAIL;
373 }
374
375 auto packet = new (std::nothrow) DataRequestPacket();
376 if (packet == nullptr) {
377 return -E_OUT_OF_MEMORY;
378 }
379
380 packet->SetVersion(version);
381 packet->SetData(dataItems);
382 int errCode = DataPacketSyncerPartDeSerialization(parcel, packet, packLen, length, version);
383 if (errCode != E_OK) {
384 goto ERROR;
385 }
386 if (inMsg->GetMessageId() == QUERY_SYNC_MESSAGE) {
387 errCode = DataPacketQuerySyncDeSerialization(parcel, packet);
388 if (errCode != E_OK) {
389 goto ERROR;
390 }
391 }
392 errCode = DataPacketInnerDeSerialization(packet, parcel);
393 if (errCode != E_OK) {
394 goto ERROR;
395 }
396 errCode = inMsg->SetExternalObject<>(packet);
397 if (errCode == E_OK) {
398 return errCode;
399 }
400
401 ERROR:
402 delete packet;
403 packet = nullptr;
404 return errCode;
405 }
406
DataPacketQuerySyncDeSerialization(Parcel & parcel,DataRequestPacket * packet)407 int SingleVerSerializeManager::DataPacketQuerySyncDeSerialization(Parcel &parcel, DataRequestPacket *packet)
408 {
409 WaterMark deletedWatermark = 0;
410 parcel.ReadUInt64(deletedWatermark);
411 std::string queryId;
412 parcel.ReadString(queryId);
413 if (parcel.IsError()) {
414 return -E_PARSE_FAIL;
415 }
416 // query identify
417 QuerySyncObject querySyncObj;
418 int errCode = E_OK;
419 // for version 105, query is always sent.
420 if ((packet->GetVersion() > SOFTWARE_VERSION_RELEASE_4_0) || packet->GetMode() != QUERY_PUSH) {
421 // need to check.
422 errCode = QuerySyncObject::DeSerializeData(parcel, querySyncObj);
423 }
424 if (errCode != E_OK) {
425 LOGI("[SingleVerSerializeManager] DeSerializeData object failed.");
426 return errCode;
427 }
428 packet->SetDeletedWaterMark(deletedWatermark);
429 packet->SetQueryId(queryId);
430 if ((packet->GetVersion() > SOFTWARE_VERSION_RELEASE_4_0) || packet->GetMode() != QUERY_PUSH) {
431 packet->SetQuery(querySyncObj);
432 }
433 return E_OK;
434 }
435
DataPacketCompressDataDeSerialization(Parcel & parcel,DataRequestPacket * packet)436 int SingleVerSerializeManager::DataPacketCompressDataDeSerialization(Parcel &parcel, DataRequestPacket *packet)
437 {
438 std::vector<SendDataItem> originalData;
439 int errCode = GenericSingleVerKvEntry::DeSerializeCompressedDatas(originalData, parcel);
440 if (errCode != E_OK) {
441 LOGE("[SingleVerSerializeManager] DeSerializeComptressData failed, errCode=%d", errCode);
442 return errCode;
443 }
444 packet->SetData(originalData);
445 return E_OK;
446 }
447
DataPacketSyncerPartDeSerialization(Parcel & parcel,DataRequestPacket * packet,uint32_t packLen,uint32_t length,uint32_t version)448 int SingleVerSerializeManager::DataPacketSyncerPartDeSerialization(Parcel &parcel, DataRequestPacket *packet,
449 uint32_t packLen, uint32_t length, uint32_t version)
450 {
451 WaterMark waterMark;
452 WaterMark localWaterMark;
453 WaterMark peerWaterMark;
454 int32_t sendCode;
455 int32_t mode;
456 uint32_t sessionId;
457 std::vector<uint64_t> reserved;
458
459 uint64_t totPacketLen = packLen;
460 totPacketLen += parcel.ReadUInt64(waterMark);
461 totPacketLen += parcel.ReadUInt64(localWaterMark);
462 totPacketLen += parcel.ReadUInt64(peerWaterMark);
463 totPacketLen += parcel.ReadInt(sendCode);
464 totPacketLen += parcel.ReadInt(mode);
465 totPacketLen += parcel.ReadUInt32(sessionId);
466 totPacketLen += parcel.ReadVector<uint64_t>(reserved);
467 if (version > SOFTWARE_VERSION_RELEASE_2_0) {
468 uint32_t flag = 0u;
469 totPacketLen += parcel.ReadUInt32(flag);
470 packet->SetFlag(flag);
471 }
472 if (totPacketLen > INT32_MAX) {
473 LOGE("[DataSync][DataPacketDeSerialization] deserialize failed! input totPackLen=%" PRIu64 " is over limit.",
474 totPacketLen);
475 return -E_LENGTH_ERROR;
476 }
477 parcel.EightByteAlign();
478 totPacketLen = Parcel::GetEightByteAlign(totPacketLen);
479 if (parcel.IsError()) {
480 LOGE("[DataSync][DataPacketDeSerialization] deserialize failed! input len=%" PRIu32 ", totPackLen=%" PRIu64,
481 length, totPacketLen);
482 return -E_LENGTH_ERROR;
483 }
484 packet->SetEndWaterMark(waterMark);
485 packet->SetLocalWaterMark(localWaterMark);
486 packet->SetPeerWaterMark(peerWaterMark);
487 packet->SetSendCode(sendCode);
488 packet->SetMode(mode);
489 packet->SetSessionId(sessionId);
490 packet->SetReserved(reserved);
491 return E_OK;
492 }
493
AckPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)494 int SingleVerSerializeManager::AckPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
495 {
496 DataAckPacket packet;
497 Parcel parcel(const_cast<uint8_t *>(buffer), length);
498 uint32_t version;
499
500 parcel.ReadUInt32(version);
501 if (parcel.IsError()) {
502 return -E_INVALID_ARGS;
503 }
504 if (version > SOFTWARE_VERSION_CURRENT) {
505 packet.SetVersion(version);
506 packet.SetRecvCode(-E_VERSION_NOT_SUPPORT);
507 return inMsg->SetCopiedObject<>(packet);
508 }
509 packet.SetVersion(version);
510 // now V1 compatible for softWareVersion :{101, 102}
511 int errCode = AckPacketSyncerPartDeSerializationV1(parcel, packet);
512 if (errCode != E_OK) {
513 return errCode;
514 }
515
516 return inMsg->SetCopiedObject<>(packet);
517 }
518
AckPacketSyncerPartDeSerializationV1(Parcel & parcel,DataAckPacket & packet)519 int SingleVerSerializeManager::AckPacketSyncerPartDeSerializationV1(Parcel &parcel, DataAckPacket &packet)
520 {
521 WaterMark mark;
522 int32_t errCode;
523 std::vector<uint64_t> reserved;
524
525 parcel.ReadUInt64(mark);
526 parcel.ReadInt(errCode);
527 parcel.ReadVector<uint64_t>(reserved);
528 if (parcel.IsError()) {
529 LOGE("[AckPacketSyncerPartDeSerializationV1] DeSerialization failed");
530 return -E_INVALID_ARGS;
531 }
532 packet.SetData(mark);
533 packet.SetRecvCode(errCode);
534 packet.SetReserved(reserved);
535 return E_OK;
536 }
537
ControlPacketCalculateLen(const Message * inMsg,uint32_t & len)538 int SingleVerSerializeManager::ControlPacketCalculateLen(const Message *inMsg, uint32_t &len)
539 {
540 auto packet = inMsg->GetObject<ControlRequestPacket>();
541 if (packet == nullptr || packet->GetcontrolCmdType() >= INVALID_CONTROL_CMD) {
542 LOGE("[ControlPacketSerialization] invalid control cmd");
543 return -E_INVALID_ARGS;
544 }
545 if (packet->GetcontrolCmdType() == SUBSCRIBE_QUERY_CMD || packet->GetcontrolCmdType() == UNSUBSCRIBE_QUERY_CMD) {
546 return SingleVerSerializeManager::SubscribeCalculateLen(inMsg, len);
547 }
548 return E_OK;
549 }
550
ControlPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)551 int SingleVerSerializeManager::ControlPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
552 {
553 auto packet = inMsg->GetObject<ControlRequestPacket>();
554 if (packet == nullptr || packet->GetcontrolCmdType() >= INVALID_CONTROL_CMD) {
555 LOGE("[ControlPacketSerialization] invalid control cmd");
556 return -E_INVALID_ARGS;
557 }
558 if (packet->GetcontrolCmdType() == SUBSCRIBE_QUERY_CMD || packet->GetcontrolCmdType() == UNSUBSCRIBE_QUERY_CMD) {
559 return SingleVerSerializeManager::SubscribeSerialization(buffer, length, inMsg);
560 }
561 return E_OK;
562 }
563
ControlPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)564 int SingleVerSerializeManager::ControlPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
565 {
566 Parcel parcel(const_cast<uint8_t *>(buffer), length);
567 ControlRequestPacket packet;
568 int errCode = ControlRequestDeSerialization(parcel, packet);
569 if (errCode != E_OK) {
570 return errCode;
571 }
572 if (packet.GetcontrolCmdType() == SUBSCRIBE_QUERY_CMD || packet.GetcontrolCmdType() == UNSUBSCRIBE_QUERY_CMD) {
573 errCode = SubscribeDeSerialization(parcel, inMsg, packet);
574 }
575 return errCode;
576 }
577
AckControlPacketCalculateLen(const Message * inMsg,uint32_t & len)578 int SingleVerSerializeManager::AckControlPacketCalculateLen(const Message *inMsg, uint32_t &len)
579 {
580 auto packet = inMsg->GetObject<ControlAckPacket>();
581 if (packet == nullptr) {
582 LOGE("[AckControlPacketCalculateLen] invalid control cmd");
583 return -E_INVALID_ARGS;
584 }
585 len = packet->CalculateLen();
586 return E_OK;
587 }
588
AckControlPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)589 int SingleVerSerializeManager::AckControlPacketSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
590 {
591 auto packet = inMsg->GetObject<ControlAckPacket>();
592 if (packet == nullptr) {
593 return -E_INVALID_ARGS;
594 }
595 Parcel parcel(buffer, length);
596 parcel.WriteUInt32(packet->GetVersion());
597 parcel.WriteInt(packet->GetRecvCode());
598 parcel.WriteUInt32(packet->GetcontrolCmdType());
599 parcel.WriteUInt32(packet->GetFlag());
600 if (parcel.IsError()) {
601 LOGE("[AckControlPacketSerialization] Serialization failed");
602 return -E_INVALID_ARGS;
603 }
604 parcel.EightByteAlign();
605 return E_OK;
606 }
607
AckControlPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)608 int SingleVerSerializeManager::AckControlPacketDeSerialization(const uint8_t *buffer, uint32_t length, Message *inMsg)
609 {
610 auto packet = new (std::nothrow) ControlAckPacket();
611 if (packet == nullptr) {
612 return -E_OUT_OF_MEMORY;
613 }
614 Parcel parcel(const_cast<uint8_t *>(buffer), length);
615 int32_t recvCode = 0;
616 uint32_t version = 0;
617 uint32_t controlCmdType = 0;
618 uint32_t flag = 0;
619 parcel.ReadUInt32(version);
620 parcel.ReadInt(recvCode);
621 parcel.ReadUInt32(controlCmdType);
622 parcel.ReadUInt32(flag);
623 int errCode;
624 if (parcel.IsError()) {
625 LOGE("[AckControlPacketDeSerialization] DeSerialization failed");
626 errCode = -E_INVALID_ARGS;
627 goto ERROR;
628 }
629 packet->SetPacketHead(recvCode, version, static_cast<int32_t>(controlCmdType), flag);
630 errCode = inMsg->SetExternalObject<>(packet);
631 if (errCode != E_OK) {
632 goto ERROR;
633 }
634 return errCode;
635 ERROR:
636 delete packet;
637 packet = nullptr;
638 return errCode;
639 }
640
ControlRequestSerialization(Parcel & parcel,const Message * inMsg)641 int SingleVerSerializeManager::ControlRequestSerialization(Parcel &parcel, const Message *inMsg)
642 {
643 auto packet = inMsg->GetObject<ControlRequestPacket>();
644 if (packet == nullptr) {
645 return -E_INVALID_ARGS;
646 }
647 parcel.WriteUInt32(packet->GetVersion());
648 parcel.WriteInt(packet->GetSendCode());
649 parcel.WriteUInt32(packet->GetcontrolCmdType());
650 parcel.WriteUInt32(packet->GetFlag());
651 parcel.EightByteAlign();
652 if (parcel.IsError()) {
653 LOGE("[ControlRequestSerialization] Serialization failed");
654 return -E_INVALID_ARGS;
655 }
656 return E_OK;
657 }
658
ControlRequestDeSerialization(Parcel & parcel,ControlRequestPacket & packet)659 int SingleVerSerializeManager::ControlRequestDeSerialization(Parcel &parcel, ControlRequestPacket &packet)
660 {
661 uint32_t version = 0;
662 int32_t sendCode = 0;
663 uint32_t controlCmdType = 0;
664 uint32_t flag = 0;
665 parcel.ReadUInt32(version);
666 if (version > SOFTWARE_VERSION_CURRENT) {
667 return -E_VERSION_NOT_SUPPORT;
668 }
669 parcel.ReadInt(sendCode);
670 parcel.ReadUInt32(controlCmdType);
671 parcel.ReadUInt32(flag);
672 if (parcel.IsError()) {
673 LOGE("[ControlRequestDeSerialization] deserialize failed!");
674 return -E_LENGTH_ERROR;
675 }
676 packet.SetPacketHead(sendCode, version, static_cast<int32_t>(controlCmdType), flag);
677 return E_OK;
678 }
679
SubscribeCalculateLen(const Message * inMsg,uint32_t & len)680 int SingleVerSerializeManager::SubscribeCalculateLen(const Message *inMsg, uint32_t &len)
681 {
682 auto packet = inMsg->GetObject<SubscribeRequest>();
683 if (packet == nullptr) {
684 return -E_INVALID_ARGS;
685 }
686 len = packet->CalculateLen();
687 return E_OK;
688 }
689
SubscribeSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)690 int SingleVerSerializeManager::SubscribeSerialization(uint8_t *buffer, uint32_t length, const Message *inMsg)
691 {
692 auto packet = inMsg->GetObject<SubscribeRequest>();
693 if (packet == nullptr) {
694 return -E_INVALID_ARGS;
695 }
696 Parcel parcel(buffer, length);
697 int errCode = ControlRequestSerialization(parcel, inMsg);
698 if (errCode != E_OK) {
699 LOGE("[SubscribeSerialization] ControlRequestPacket Serialization failed, errCode=%d", errCode);
700 return errCode;
701 }
702 QuerySyncObject queryObj = packet->GetQuery();
703 errCode = queryObj.SerializeData(parcel, SOFTWARE_VERSION_CURRENT);
704 if (errCode != E_OK) {
705 LOGE("[SubscribeSerialization] query object Serialization failed, errCode=%d", errCode);
706 return errCode;
707 }
708 return E_OK;
709 }
710
SubscribeDeSerialization(Parcel & parcel,Message * inMsg,ControlRequestPacket & controlPacket)711 int SingleVerSerializeManager::SubscribeDeSerialization(Parcel &parcel, Message *inMsg,
712 ControlRequestPacket &controlPacket)
713 {
714 auto packet = new (std::nothrow) SubscribeRequest();
715 if (packet == nullptr) {
716 return -E_OUT_OF_MEMORY;
717 }
718 QuerySyncObject querySyncObj;
719 int errCode = QuerySyncObject::DeSerializeData(parcel, querySyncObj);
720 if (errCode != E_OK) {
721 goto ERROR;
722 }
723 packet->SetPacketHead(controlPacket.GetSendCode(), controlPacket.GetVersion(),
724 static_cast<int32_t>(controlPacket.GetcontrolCmdType()), controlPacket.GetFlag());
725 packet->SetQuery(querySyncObj);
726 errCode = inMsg->SetExternalObject<>(packet);
727 if (errCode != E_OK) {
728 goto ERROR;
729 }
730 return errCode;
731 ERROR:
732 delete packet;
733 packet = nullptr;
734 return errCode;
735 }
736
RegisterCommunicatorTransformFunc()737 int SingleVerSerializeManager::RegisterCommunicatorTransformFunc()
738 {
739 TransformFunc func;
740 func.computeFunc = [](const Message *inMsg) { return CalculateLen(inMsg); };
741 func.serializeFunc = [](uint8_t *buffer, uint32_t length, const Message *inMsg) {
742 return Serialization(buffer, length, inMsg);
743 };
744 func.deserializeFunc = [](const uint8_t *buffer, uint32_t length, Message *inMsg) {
745 return DeSerialization(buffer, length, inMsg);
746 };
747
748 static std::vector<MessageId> messageIds = {
749 QUERY_SYNC_MESSAGE, DATA_SYNC_MESSAGE, CONTROL_SYNC_MESSAGE, REMOTE_EXECUTE_MESSAGE
750 };
751 int errCode = E_OK;
752 for (auto &id : messageIds) {
753 int retCode = MessageTransform::RegTransformFunction(static_cast<uint32_t>(id), func);
754 if (retCode != E_OK) {
755 LOGE("[SingleVerSerializeManager][RegisterTransformFunc] regist messageId %u failed %d",
756 static_cast<uint32_t>(id), retCode);
757 errCode = retCode;
758 }
759 }
760 return errCode;
761 }
762
RegisterInnerTransformFunc()763 void SingleVerSerializeManager::RegisterInnerTransformFunc()
764 {
765 TransformFunc func;
766 func.computeFunc = [](const Message *inMsg) { return ISyncPacketCalculateLen(inMsg); };
767 func.serializeFunc = [](uint8_t *buffer, uint32_t length, const Message *inMsg) {
768 return ISyncPacketSerialization(buffer, length, inMsg);
769 };
770 func.deserializeFunc = [](const uint8_t *buffer, uint32_t length, Message *inMsg) {
771 return ISyncPacketDeSerialization(buffer, length, inMsg);
772 };
773 std::lock_guard<std::mutex> autoLock(handlesLock_);
774 messageHandles_.emplace(static_cast<uint32_t>(REMOTE_EXECUTE_MESSAGE), func);
775 }
776
ISyncPacketCalculateLen(const Message * inMsg)777 uint32_t SingleVerSerializeManager::ISyncPacketCalculateLen(const Message *inMsg)
778 {
779 if (inMsg == nullptr) {
780 return 0u;
781 }
782 uint32_t len = 0u;
783 const auto packet = inMsg->GetObject<ISyncPacket>();
784 if (packet != nullptr) {
785 len = packet->CalculateLen();
786 }
787 return len;
788 }
789
ISyncPacketSerialization(uint8_t * buffer,uint32_t length,const Message * inMsg)790 int SingleVerSerializeManager::ISyncPacketSerialization(uint8_t *buffer, uint32_t length,
791 const Message *inMsg)
792 {
793 if (inMsg == nullptr) {
794 return -E_INVALID_ARGS;
795 }
796 int errCode = E_OK;
797 Parcel parcel(buffer, length);
798 auto packet = inMsg->GetObject<ISyncPacket>();
799 if (packet != nullptr) {
800 errCode = packet->Serialization(parcel);
801 }
802 return errCode;
803 }
804
ISyncPacketDeSerialization(const uint8_t * buffer,uint32_t length,Message * inMsg)805 int SingleVerSerializeManager::ISyncPacketDeSerialization(const uint8_t *buffer, uint32_t length,
806 Message *inMsg)
807 {
808 if (inMsg == nullptr) {
809 return -E_INVALID_ARGS;
810 }
811 ISyncPacket *packet = nullptr;
812 int errCode = BuildISyncPacket(inMsg, packet);
813 if (errCode != E_OK) {
814 return errCode;
815 }
816 Parcel parcel(const_cast<uint8_t *>(buffer), length);
817 do {
818 errCode = packet->DeSerialization(parcel);
819 if (errCode != E_OK) {
820 break;
821 }
822 errCode = inMsg->SetExternalObject(packet);
823 } while (false);
824 if (errCode != E_OK) {
825 delete packet;
826 packet = nullptr;
827 }
828 return E_OK;
829 }
830
BuildISyncPacket(Message * inMsg,ISyncPacket * & packet)831 int SingleVerSerializeManager::BuildISyncPacket(Message *inMsg, ISyncPacket *&packet)
832 {
833 uint32_t messageId = inMsg->GetMessageId();
834 if (messageId != static_cast<uint32_t>(REMOTE_EXECUTE_MESSAGE)) {
835 return -E_INVALID_ARGS;
836 }
837 switch (inMsg->GetMessageType()) {
838 case TYPE_REQUEST:
839 packet = new(std::nothrow) RemoteExecutorRequestPacket();
840 break;
841 case TYPE_RESPONSE:
842 packet = new(std::nothrow) RemoteExecutorAckPacket();
843 break;
844 default:
845 packet = nullptr;
846 break;
847 }
848 if (packet == nullptr) {
849 return -E_OUT_OF_MEMORY;
850 }
851 return E_OK;
852 }
853
DataPacketExtraConditionsSerialization(Parcel & parcel,const DataRequestPacket * packet)854 int SingleVerSerializeManager::DataPacketExtraConditionsSerialization(Parcel &parcel, const DataRequestPacket *packet)
855 {
856 std::map<std::string, std::string> extraConditions = packet->GetExtraConditions();
857 if (extraConditions.size() > DBConstant::MAX_CONDITION_COUNT) {
858 return -E_INVALID_ARGS;
859 }
860 parcel.WriteUInt32(static_cast<uint32_t>(extraConditions.size()));
861 for (const auto &entry : extraConditions) {
862 if (entry.first.length() > DBConstant::MAX_CONDITION_KEY_LEN ||
863 entry.second.length() > DBConstant::MAX_CONDITION_VALUE_LEN) {
864 return -E_INVALID_ARGS;
865 }
866 parcel.WriteString(entry.first);
867 parcel.WriteString(entry.second);
868 }
869 parcel.EightByteAlign();
870 if (parcel.IsError()) {
871 return -E_PARSE_FAIL;
872 }
873 return E_OK;
874 }
875
DataPacketExtraConditionsDeserialization(Parcel & parcel,DataRequestPacket * packet)876 int SingleVerSerializeManager::DataPacketExtraConditionsDeserialization(Parcel &parcel, DataRequestPacket *packet)
877 {
878 if (!packet->IsExtraConditionData()) {
879 return E_OK;
880 }
881 uint32_t conditionSize = 0u;
882 (void) parcel.ReadUInt32(conditionSize);
883 if (conditionSize > DBConstant::MAX_CONDITION_COUNT) {
884 return -E_INVALID_ARGS;
885 }
886 std::map<std::string, std::string> extraConditions;
887 for (uint32_t i = 0; i < conditionSize; i++) {
888 std::string conditionKey;
889 std::string conditionVal;
890 (void) parcel.ReadString(conditionKey);
891 (void) parcel.ReadString(conditionVal);
892 if (conditionKey.length() > DBConstant::MAX_CONDITION_KEY_LEN ||
893 conditionVal.length() > DBConstant::MAX_CONDITION_VALUE_LEN) {
894 return -E_INVALID_ARGS;
895 }
896 extraConditions[conditionKey] = conditionVal;
897 }
898 parcel.EightByteAlign();
899 if (parcel.IsError()) {
900 return -E_PARSE_FAIL;
901 }
902 packet->SetExtraConditions(extraConditions);
903 return E_OK;
904 }
905
DataPacketInnerDeSerialization(DataRequestPacket * packet,Parcel & parcel)906 int SingleVerSerializeManager::DataPacketInnerDeSerialization(DataRequestPacket *packet, Parcel &parcel)
907 {
908 int errCode = E_OK;
909 if (packet->IsCompressData()) {
910 errCode = DataPacketCompressDataDeSerialization(parcel, packet);
911 if (errCode != E_OK) {
912 return errCode;
913 }
914 }
915 errCode = DataPacketExtraConditionsDeserialization(parcel, packet);
916 if (errCode != E_OK) {
917 return errCode;
918 }
919 if (packet->GetVersion() >= SOFTWARE_VERSION_RELEASE_9_0) {
920 uint64_t schemaVersion = 0u;
921 parcel.ReadUInt64(schemaVersion);
922 int64_t systemTimeOffset = 0u;
923 parcel.ReadInt64(systemTimeOffset);
924 int64_t senderTimeOffset = 0u;
925 parcel.ReadInt64(senderTimeOffset);
926 if (parcel.IsError()) {
927 LOGE("[SingleVerSerializeManager] parse schema version or time offset failed");
928 return -E_PARSE_FAIL;
929 }
930 packet->SetSchemaVersion(schemaVersion);
931 packet->SetSystemTimeOffset(systemTimeOffset);
932 packet->SetSenderTimeOffset(senderTimeOffset);
933 if (!parcel.IsContinueRead()) {
934 return errCode;
935 }
936 SecurityOption option;
937 parcel.ReadInt(option.securityLabel);
938 parcel.ReadInt(option.securityFlag);
939 packet->SetSecurityOption(option);
940 }
941 return errCode;
942 }
943
DataPacketInnerSerialization(const DataRequestPacket * packet,Parcel & parcel)944 int SingleVerSerializeManager::DataPacketInnerSerialization(const DataRequestPacket *packet, Parcel &parcel)
945 {
946 // flag mask add in 103
947 if (packet->GetVersion() < SOFTWARE_VERSION_RELEASE_3_0) {
948 return E_OK;
949 }
950 if (packet->IsExtraConditionData()) {
951 int errCode = DataPacketExtraConditionsSerialization(parcel, packet);
952 if (errCode != E_OK) {
953 LOGE("[SingleVerSerializeManager] Serialize extra condition failed %d", errCode);
954 return errCode;
955 }
956 }
957 if (packet->GetVersion() >= SOFTWARE_VERSION_RELEASE_9_0) {
958 parcel.WriteUInt64(packet->GetSchemaVersion());
959 parcel.WriteInt64(packet->GetSystemTimeOffset());
960 parcel.WriteInt64(packet->GetSenderTimeOffset());
961 if (parcel.IsError()) {
962 LOGE("[SingleVerSerializeManager] Serialize schema version or time offset failed");
963 return -E_PARSE_FAIL;
964 }
965 auto option = packet->GetSecurityOption();
966 parcel.WriteInt(option.securityLabel);
967 parcel.WriteInt(option.securityFlag);
968 if (parcel.IsError()) {
969 LOGE("[SingleVerSerializeManager] Serialize security option failed");
970 return -E_PARSE_FAIL;
971 }
972 }
973 return E_OK;
974 }
975 } // namespace DistributedDB