1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef FRAME_COMBINER_H
17 #define FRAME_COMBINER_H
18 
19 #include <cstdint>
20 #include <map>
21 #include <mutex>
22 #include "combine_status.h"
23 #include "macro_utils.h"
24 #include "parse_result.h"
25 #include "runtime_context.h"
26 #include "semaphore_utils.h"
27 
28 namespace DistributedDB {
29 class SerialBuffer; // Forward Declarations
30 
31 struct CombineWork {
32     SerialBuffer *buffer = nullptr;
33     CombineStatus status;
34     ParseResult frameInfo;
35 };
36 
37 class FrameCombiner {
38 public:
39     FrameCombiner() = default; // Default constructor must be explicitly provided due to DISABLE_COPY_ASSIGN_MOVE
40     ~FrameCombiner() = default; // Since constructor must be provided, codedex demand deconstructor be provided as well
41     DISABLE_COPY_ASSIGN_MOVE(FrameCombiner);
42 
43     // Start the timer to supervise the progress
44     void Initialize();
45 
46     // Clear the CombineWorkPool and stop the timer
47     void Finalize();
48 
49     // outErrorNo is set E_OK if nothing error happened.
50     // Return nullptr if error happened or no combination is done.
51     // Return a valid buffer as well as a valid outFrameResult if combination done.
52     // The caller is responsible for release the buffer.
53     SerialBuffer *AssembleFrameFragment(const uint8_t *bytes, uint32_t length, const ParseResult &inPacketInfo,
54         ParseResult &outFrameInfo, int &outErrorNo);
55 
56 private:
57     // This methed called from timer, it has overallMutex_ protect itself inside the method
58     void PeriodicalSurveillance();
59 
60     // Following method should be called under protection of overallMutex_ outside the method
61     int ContinueExistCombineWork(const uint8_t *bytes, uint32_t length, const ParseResult &inPacketInfo);
62     int CreateNewCombineWork(const uint8_t *bytes, uint32_t length, const ParseResult &inPacketInfo);
63     void AbortCombineWorkBySource(uint64_t inSourceId);
64 
65     bool CheckPacketWithOriWork(const ParseResult &inPacketInfo, const CombineWork &inWork);
66     SerialBuffer *CreateNewFrameBuffer(const ParseResult &inInfo);
67 
68     mutable std::mutex overallMutex_;
69 
70     TimerId timerId_ = 0; // 0 is invalid timerId
71     bool isTimerWork_ = false;
72     SemaphoreUtils timerRemovedIndicator_ {0};
73     uint64_t incProgressId_ = 0;
74     uint64_t totalSizeByByte_ = 0;
75     std::map<uint64_t, std::map<uint32_t, CombineWork>> combineWorkPool_;
76 };
77 } // namespace DistributedDB
78 
79 #endif // FRAME_COMBINER_H
80