1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "frame_retainer.h"
17 #include "db_common.h"
18 #include "log_print.h"
19 #include "serial_buffer.h"
20 
21 namespace DistributedDB {
22 namespace {
// Upper bound on the total bytes of all retained frames: 64 MiB
constexpr uint32_t MAX_RETAIN_CAPACITY = 64 * 1024 * 1024;
// Initial remainTime given to each retained frame: 10 s
constexpr uint32_t MAX_RETAIN_TIME = 10;
// Largest single frame that is accepted for retaining: 32 MiB
constexpr uint32_t MAX_RETAIN_FRAME_SIZE = 32 * 1024 * 1024;
// At most 5 frames are retained per communicator label per source target
constexpr uint32_t MAX_RETAIN_FRAME_PER_LABEL_PER_TARGET = 5;
// Period of the surveillance timer, in milliseconds: 1 s
constexpr int RETAIN_SURVAIL_PERIOD_IN_MILLISECOND = 1000;
LogRetainInfo(const std::string & logPrefix,const LabelType & label,const std::string & target,uint64_t order,const RetainWork & work)28 inline void LogRetainInfo(const std::string &logPrefix, const LabelType &label, const std::string &target,
29     uint64_t order, const RetainWork &work)
30 {
31     LOGI("%s : Label=%.3s, target=%s{private}, retainOrder=%" PRIu64 ", frameId=%" PRIu32 ", remainTime=%" PRIu32
32         ", frameSize=%" PRIu32 ".", logPrefix.c_str(), VEC_TO_STR(label), target.c_str(), ULL(order),
33         work.frameId, work.remainTime, work.buffer->GetSize());
34 }
35 }
36 
Initialize()37 void FrameRetainer::Initialize()
38 {
39     RuntimeContext *context = RuntimeContext::GetInstance();
40     if (context == nullptr) {
41         return; // Never gonna happen, context always be valid.
42     }
43     TimerAction action = [this](TimerId inTimerId)->int {
44         PeriodicalSurveillance();
45         return E_OK;
46     };
47     int errCode = context->SetTimer(RETAIN_SURVAIL_PERIOD_IN_MILLISECOND, action, nullptr, timerId_);
48     if (errCode != E_OK) {
49         LOGE("[Retainer][Init] Set timer fail, errCode=%d.", errCode);
50         return;
51     }
52     isTimerWork_ = true;
53 }
54 
Finalize()55 void FrameRetainer::Finalize()
56 {
57     RuntimeContext *context = RuntimeContext::GetInstance();
58     if (context == nullptr) {
59         return; // Never gonna happen, context always be valid.
60     }
61     // First: Stop the timer
62     if (isTimerWork_) {
63         // After return, the timer rely no more on retainer.
64         context->RemoveTimer(timerId_, true);
65         isTimerWork_ = false;
66     }
67     // Second: Clear the retainWorkPool_
68     for (auto &eachLabel : retainWorkPool_) {
69         for (auto &eachTarget : eachLabel.second) {
70             for (auto &eachFrame : eachTarget.second) {
71                 LogRetainInfo("[Retainer][Final] DISCARD", eachLabel.first, eachTarget.first, eachFrame.first,
72                     eachFrame.second);
73                 delete eachFrame.second.buffer;
74                 eachFrame.second.buffer = nullptr;
75             }
76         }
77     }
78     retainWorkPool_.clear();
79     totalSizeByByte_ = 0;
80     totalRetainFrames_ = 0;
81 }
82 
RetainFrame(const FrameInfo & inFrame)83 void FrameRetainer::RetainFrame(const FrameInfo &inFrame)
84 {
85     if (inFrame.buffer == nullptr) {
86         return; // Never gonna happen
87     }
88     RetainWork work{inFrame.buffer, inFrame.frameId, MAX_RETAIN_TIME};
89     if (work.buffer->GetSize() > MAX_RETAIN_FRAME_SIZE) {
90         LOGE("[Retainer][Retain] Frame size=%u over limit=%u.", work.buffer->GetSize(), MAX_RETAIN_FRAME_SIZE);
91         delete work.buffer;
92         work.buffer = nullptr;
93         return;
94     }
95     int errCode = work.buffer->ConvertForCrossThread();
96     if (errCode != E_OK) {
97         LOGE("[Retainer][Retain] ConvertForCrossThread fail, errCode=%d.", errCode);
98         delete work.buffer;
99         work.buffer = nullptr;
100         return;
101     }
102 
103     std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
104     std::map<uint64_t, RetainWork> &perLabelPerTarget = retainWorkPool_[inFrame.commLabel][inFrame.srcTarget];
105     if (perLabelPerTarget.size() >= MAX_RETAIN_FRAME_PER_LABEL_PER_TARGET) {
106         // Discard the oldest and obsolete one, update the statistics, free the buffer and remove from the map
107         auto iter = perLabelPerTarget.begin();
108         LogRetainInfo("[Retainer][Retain] DISCARD", inFrame.commLabel, inFrame.srcTarget, iter->first, iter->second);
109         totalSizeByByte_ -= iter->second.buffer->GetSize();
110         totalRetainFrames_--;
111         delete iter->second.buffer;
112         iter->second.buffer = nullptr;
113         perLabelPerTarget.erase(iter);
114     }
115     // Retain the new frame, update the statistics
116     perLabelPerTarget[incRetainOrder_++] = work;
117     totalSizeByByte_ += inFrame.buffer->GetSize();
118     totalRetainFrames_++;
119     // Discard obsolete frames until totalSize under capacity.
120     DiscardObsoleteFramesIfNeed();
121     // Display the final statistics
122     LOGI("[Retainer][Retain] Order=%" PRIu64 ". Statistics: TOTAL_BYTE=%" PRIu32 ", TOTAL_FRAME=%" PRIu32 ".",
123         ULL(incRetainOrder_ - 1), totalSizeByByte_, totalRetainFrames_);
124 }
125 
FetchFramesForSpecificCommunicator(const LabelType & inCommLabel)126 std::list<FrameInfo> FrameRetainer::FetchFramesForSpecificCommunicator(const LabelType &inCommLabel)
127 {
128     std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
129     std::list<FrameInfo> outFrameList;
130     if (retainWorkPool_.count(inCommLabel) == 0) {
131         return outFrameList;
132     }
133     auto &perLabel = retainWorkPool_[inCommLabel];
134     std::map<uint64_t, std::string> fetchOrder;
135     for (const auto &eachTarget : perLabel) {
136         for (const auto &eachFrame : eachTarget.second) {
137             fetchOrder[eachFrame.first] = eachTarget.first;
138         }
139     }
140     for (auto &entry : fetchOrder) {
141         RetainWork &work = perLabel[entry.second][entry.first];
142         LogRetainInfo("[Retainer][Fetch] FETCH-OUT", inCommLabel, entry.second, entry.first, work);
143         outFrameList.emplace_back(FrameInfo{work.buffer, entry.second, inCommLabel, work.frameId});
144         // Update statistics
145         totalSizeByByte_ -= work.buffer->GetSize();
146         totalRetainFrames_--;
147     }
148     retainWorkPool_.erase(inCommLabel);
149     return outFrameList;
150 }
151 
DecreaseRemainTimeAndDiscard(const LabelType & label,std::pair<const std::string,std::map<uint64_t,RetainWork>> & eachTarget,std::set<uint64_t> & frameToDiscard)152 void FrameRetainer::DecreaseRemainTimeAndDiscard(const LabelType &label,
153     std::pair<const std::string, std::map<uint64_t, RetainWork>> &eachTarget, std::set<uint64_t> &frameToDiscard)
154 {
155     for (auto &eachFrame : eachTarget.second) {
156         // Decrease remainTime and discard if need. The remainTime will not be zero before decrease.
157         eachFrame.second.remainTime--;
158         if (eachFrame.second.remainTime != 0) {
159             continue;
160         }
161         LogRetainInfo("[Retainer][Surveil] DISCARD", label, eachTarget.first, eachFrame.first,
162             eachFrame.second);
163         totalSizeByByte_ -= eachFrame.second.buffer->GetSize();
164         totalRetainFrames_--;
165         // Free this retain work first
166         delete eachFrame.second.buffer;
167         eachFrame.second.buffer = nullptr;
168         // Record this frame in discard list
169         frameToDiscard.insert(eachFrame.first);
170     }
171 }
172 
PeriodicalSurveillance()173 void FrameRetainer::PeriodicalSurveillance()
174 {
175     std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
176     // First: Discard overtime frames.
177     for (auto &eachLabel : retainWorkPool_) {
178         for (auto &eachTarget : eachLabel.second) {
179             std::set<uint64_t> frameToDiscard;
180             DecreaseRemainTimeAndDiscard(eachLabel.first, eachTarget, frameToDiscard);
181             // Remove the retain work from frameMap.
182             for (auto &entry : frameToDiscard) {
183                 eachTarget.second.erase(entry);
184             }
185         }
186     }
187     // Second: Shrink the retainWorkPool_
188     ShrinkRetainWorkPool();
189 }
190 
DiscardObsoleteFramesIfNeed()191 void FrameRetainer::DiscardObsoleteFramesIfNeed()
192 {
193     if (totalSizeByByte_ <= MAX_RETAIN_CAPACITY) {
194         return;
195     }
196     std::map<uint64_t, std::pair<LabelType, std::string>> discardOrder;
197     // Sort all the frames by their retain order ascendingly
198     for (const auto &eachLabel : retainWorkPool_) {
199         for (const auto &eachTarget : eachLabel.second) {
200             for (const auto &eachFrame : eachTarget.second) {
201                 discardOrder[eachFrame.first] = {eachLabel.first, eachTarget.first};
202             }
203         }
204     }
205     // Discard obsolete frames until totalSize under capacity.
206     while (totalSizeByByte_ > MAX_RETAIN_CAPACITY) {
207         if (discardOrder.empty()) { // Unlikely to happen
208             LOGE("[Retainer][Discard] Internal Error: Byte=%" PRIu32 ", Frames=%" PRIu32 ".",
209                 totalSizeByByte_, totalRetainFrames_);
210             return;
211         }
212         auto iter = discardOrder.begin();
213         RetainWork &workRef = retainWorkPool_[iter->second.first][iter->second.second][iter->first];
214         LogRetainInfo("[Retainer][Discard] DISCARD", iter->second.first, iter->second.second, iter->first, workRef);
215         // Discard the oldest and obsolete one, update the statistics, free the buffer and remove from the map
216         totalSizeByByte_ -= workRef.buffer->GetSize();
217         totalRetainFrames_--;
218         delete workRef.buffer;
219         workRef.buffer = nullptr;
220         retainWorkPool_[iter->second.first][iter->second.second].erase(iter->first);
221         // Remove from the discardOrder
222         discardOrder.erase(iter);
223     }
224     // Shrink the retainWorkPool_ to remove out empty node on the map
225     ShrinkRetainWorkPool();
226 }
227 
ShrinkRetainWorkPool()228 void FrameRetainer::ShrinkRetainWorkPool()
229 {
230     std::set<LabelType> emptyLabel;
231     for (auto &eachLabel : retainWorkPool_) {
232         std::set<std::string> emptyTarget;
233         for (auto &eachTarget : eachLabel.second) {
234             // Record corresponding target if its frameMap empty.
235             if (eachTarget.second.empty()) {
236                 emptyTarget.insert(eachTarget.first);
237             }
238         }
239         // Remove the empty frameMap from the targetMap. Record corresponding label if its targetMap empty.
240         for (auto &entry : emptyTarget) {
241             eachLabel.second.erase(entry);
242         }
243         if (eachLabel.second.empty()) {
244             emptyLabel.insert(eachLabel.first);
245         }
246     }
247     // Remove the empty targetMap from retainWorkPool_
248     for (auto &entry : emptyLabel) {
249         retainWorkPool_.erase(entry);
250     }
251 }
252 } // namespace DistributedDB