1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "frame_retainer.h"
17 #include "db_common.h"
18 #include "log_print.h"
19 #include "serial_buffer.h"
20
21 namespace DistributedDB {
22 namespace {
// Limits applied by the frame retainer. All of them are private to this translation unit.
constexpr uint32_t MAX_RETAIN_CAPACITY = 64U * 1024U * 1024U; // Upper bound of all retained bytes: 64 M bytes
constexpr uint32_t MAX_RETAIN_TIME = 10; // Initial remainTime of a retained frame: 10 s
constexpr uint32_t MAX_RETAIN_FRAME_SIZE = 32U * 1024U * 1024U; // Upper bound of one frame: 32 M bytes
constexpr uint32_t MAX_RETAIN_FRAME_PER_LABEL_PER_TARGET = 5; // At most 5 frames per communicator per source target
constexpr int RETAIN_SURVAIL_PERIOD_IN_MILLISECOND = 1000; // Surveillance timer period: 1 s
LogRetainInfo(const std::string & logPrefix,const LabelType & label,const std::string & target,uint64_t order,const RetainWork & work)28 inline void LogRetainInfo(const std::string &logPrefix, const LabelType &label, const std::string &target,
29 uint64_t order, const RetainWork &work)
30 {
31 LOGI("%s : Label=%.3s, target=%s{private}, retainOrder=%" PRIu64 ", frameId=%" PRIu32 ", remainTime=%" PRIu32
32 ", frameSize=%" PRIu32 ".", logPrefix.c_str(), VEC_TO_STR(label), target.c_str(), ULL(order),
33 work.frameId, work.remainTime, work.buffer->GetSize());
34 }
35 }
36
Initialize()37 void FrameRetainer::Initialize()
38 {
39 RuntimeContext *context = RuntimeContext::GetInstance();
40 if (context == nullptr) {
41 return; // Never gonna happen, context always be valid.
42 }
43 TimerAction action = [this](TimerId inTimerId)->int {
44 PeriodicalSurveillance();
45 return E_OK;
46 };
47 int errCode = context->SetTimer(RETAIN_SURVAIL_PERIOD_IN_MILLISECOND, action, nullptr, timerId_);
48 if (errCode != E_OK) {
49 LOGE("[Retainer][Init] Set timer fail, errCode=%d.", errCode);
50 return;
51 }
52 isTimerWork_ = true;
53 }
54
Finalize()55 void FrameRetainer::Finalize()
56 {
57 RuntimeContext *context = RuntimeContext::GetInstance();
58 if (context == nullptr) {
59 return; // Never gonna happen, context always be valid.
60 }
61 // First: Stop the timer
62 if (isTimerWork_) {
63 // After return, the timer rely no more on retainer.
64 context->RemoveTimer(timerId_, true);
65 isTimerWork_ = false;
66 }
67 // Second: Clear the retainWorkPool_
68 for (auto &eachLabel : retainWorkPool_) {
69 for (auto &eachTarget : eachLabel.second) {
70 for (auto &eachFrame : eachTarget.second) {
71 LogRetainInfo("[Retainer][Final] DISCARD", eachLabel.first, eachTarget.first, eachFrame.first,
72 eachFrame.second);
73 delete eachFrame.second.buffer;
74 eachFrame.second.buffer = nullptr;
75 }
76 }
77 }
78 retainWorkPool_.clear();
79 totalSizeByByte_ = 0;
80 totalRetainFrames_ = 0;
81 }
82
RetainFrame(const FrameInfo & inFrame)83 void FrameRetainer::RetainFrame(const FrameInfo &inFrame)
84 {
85 if (inFrame.buffer == nullptr) {
86 return; // Never gonna happen
87 }
88 RetainWork work{inFrame.buffer, inFrame.frameId, MAX_RETAIN_TIME};
89 if (work.buffer->GetSize() > MAX_RETAIN_FRAME_SIZE) {
90 LOGE("[Retainer][Retain] Frame size=%u over limit=%u.", work.buffer->GetSize(), MAX_RETAIN_FRAME_SIZE);
91 delete work.buffer;
92 work.buffer = nullptr;
93 return;
94 }
95 int errCode = work.buffer->ConvertForCrossThread();
96 if (errCode != E_OK) {
97 LOGE("[Retainer][Retain] ConvertForCrossThread fail, errCode=%d.", errCode);
98 delete work.buffer;
99 work.buffer = nullptr;
100 return;
101 }
102
103 std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
104 std::map<uint64_t, RetainWork> &perLabelPerTarget = retainWorkPool_[inFrame.commLabel][inFrame.srcTarget];
105 if (perLabelPerTarget.size() >= MAX_RETAIN_FRAME_PER_LABEL_PER_TARGET) {
106 // Discard the oldest and obsolete one, update the statistics, free the buffer and remove from the map
107 auto iter = perLabelPerTarget.begin();
108 LogRetainInfo("[Retainer][Retain] DISCARD", inFrame.commLabel, inFrame.srcTarget, iter->first, iter->second);
109 totalSizeByByte_ -= iter->second.buffer->GetSize();
110 totalRetainFrames_--;
111 delete iter->second.buffer;
112 iter->second.buffer = nullptr;
113 perLabelPerTarget.erase(iter);
114 }
115 // Retain the new frame, update the statistics
116 perLabelPerTarget[incRetainOrder_++] = work;
117 totalSizeByByte_ += inFrame.buffer->GetSize();
118 totalRetainFrames_++;
119 // Discard obsolete frames until totalSize under capacity.
120 DiscardObsoleteFramesIfNeed();
121 // Display the final statistics
122 LOGI("[Retainer][Retain] Order=%" PRIu64 ". Statistics: TOTAL_BYTE=%" PRIu32 ", TOTAL_FRAME=%" PRIu32 ".",
123 ULL(incRetainOrder_ - 1), totalSizeByByte_, totalRetainFrames_);
124 }
125
FetchFramesForSpecificCommunicator(const LabelType & inCommLabel)126 std::list<FrameInfo> FrameRetainer::FetchFramesForSpecificCommunicator(const LabelType &inCommLabel)
127 {
128 std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
129 std::list<FrameInfo> outFrameList;
130 if (retainWorkPool_.count(inCommLabel) == 0) {
131 return outFrameList;
132 }
133 auto &perLabel = retainWorkPool_[inCommLabel];
134 std::map<uint64_t, std::string> fetchOrder;
135 for (const auto &eachTarget : perLabel) {
136 for (const auto &eachFrame : eachTarget.second) {
137 fetchOrder[eachFrame.first] = eachTarget.first;
138 }
139 }
140 for (auto &entry : fetchOrder) {
141 RetainWork &work = perLabel[entry.second][entry.first];
142 LogRetainInfo("[Retainer][Fetch] FETCH-OUT", inCommLabel, entry.second, entry.first, work);
143 outFrameList.emplace_back(FrameInfo{work.buffer, entry.second, inCommLabel, work.frameId});
144 // Update statistics
145 totalSizeByByte_ -= work.buffer->GetSize();
146 totalRetainFrames_--;
147 }
148 retainWorkPool_.erase(inCommLabel);
149 return outFrameList;
150 }
151
DecreaseRemainTimeAndDiscard(const LabelType & label,std::pair<const std::string,std::map<uint64_t,RetainWork>> & eachTarget,std::set<uint64_t> & frameToDiscard)152 void FrameRetainer::DecreaseRemainTimeAndDiscard(const LabelType &label,
153 std::pair<const std::string, std::map<uint64_t, RetainWork>> &eachTarget, std::set<uint64_t> &frameToDiscard)
154 {
155 for (auto &eachFrame : eachTarget.second) {
156 // Decrease remainTime and discard if need. The remainTime will not be zero before decrease.
157 eachFrame.second.remainTime--;
158 if (eachFrame.second.remainTime != 0) {
159 continue;
160 }
161 LogRetainInfo("[Retainer][Surveil] DISCARD", label, eachTarget.first, eachFrame.first,
162 eachFrame.second);
163 totalSizeByByte_ -= eachFrame.second.buffer->GetSize();
164 totalRetainFrames_--;
165 // Free this retain work first
166 delete eachFrame.second.buffer;
167 eachFrame.second.buffer = nullptr;
168 // Record this frame in discard list
169 frameToDiscard.insert(eachFrame.first);
170 }
171 }
172
PeriodicalSurveillance()173 void FrameRetainer::PeriodicalSurveillance()
174 {
175 std::lock_guard<std::mutex> overallLockGuard(overallMutex_);
176 // First: Discard overtime frames.
177 for (auto &eachLabel : retainWorkPool_) {
178 for (auto &eachTarget : eachLabel.second) {
179 std::set<uint64_t> frameToDiscard;
180 DecreaseRemainTimeAndDiscard(eachLabel.first, eachTarget, frameToDiscard);
181 // Remove the retain work from frameMap.
182 for (auto &entry : frameToDiscard) {
183 eachTarget.second.erase(entry);
184 }
185 }
186 }
187 // Second: Shrink the retainWorkPool_
188 ShrinkRetainWorkPool();
189 }
190
DiscardObsoleteFramesIfNeed()191 void FrameRetainer::DiscardObsoleteFramesIfNeed()
192 {
193 if (totalSizeByByte_ <= MAX_RETAIN_CAPACITY) {
194 return;
195 }
196 std::map<uint64_t, std::pair<LabelType, std::string>> discardOrder;
197 // Sort all the frames by their retain order ascendingly
198 for (const auto &eachLabel : retainWorkPool_) {
199 for (const auto &eachTarget : eachLabel.second) {
200 for (const auto &eachFrame : eachTarget.second) {
201 discardOrder[eachFrame.first] = {eachLabel.first, eachTarget.first};
202 }
203 }
204 }
205 // Discard obsolete frames until totalSize under capacity.
206 while (totalSizeByByte_ > MAX_RETAIN_CAPACITY) {
207 if (discardOrder.empty()) { // Unlikely to happen
208 LOGE("[Retainer][Discard] Internal Error: Byte=%" PRIu32 ", Frames=%" PRIu32 ".",
209 totalSizeByByte_, totalRetainFrames_);
210 return;
211 }
212 auto iter = discardOrder.begin();
213 RetainWork &workRef = retainWorkPool_[iter->second.first][iter->second.second][iter->first];
214 LogRetainInfo("[Retainer][Discard] DISCARD", iter->second.first, iter->second.second, iter->first, workRef);
215 // Discard the oldest and obsolete one, update the statistics, free the buffer and remove from the map
216 totalSizeByByte_ -= workRef.buffer->GetSize();
217 totalRetainFrames_--;
218 delete workRef.buffer;
219 workRef.buffer = nullptr;
220 retainWorkPool_[iter->second.first][iter->second.second].erase(iter->first);
221 // Remove from the discardOrder
222 discardOrder.erase(iter);
223 }
224 // Shrink the retainWorkPool_ to remove out empty node on the map
225 ShrinkRetainWorkPool();
226 }
227
ShrinkRetainWorkPool()228 void FrameRetainer::ShrinkRetainWorkPool()
229 {
230 std::set<LabelType> emptyLabel;
231 for (auto &eachLabel : retainWorkPool_) {
232 std::set<std::string> emptyTarget;
233 for (auto &eachTarget : eachLabel.second) {
234 // Record corresponding target if its frameMap empty.
235 if (eachTarget.second.empty()) {
236 emptyTarget.insert(eachTarget.first);
237 }
238 }
239 // Remove the empty frameMap from the targetMap. Record corresponding label if its targetMap empty.
240 for (auto &entry : emptyTarget) {
241 eachLabel.second.erase(entry);
242 }
243 if (eachLabel.second.empty()) {
244 emptyLabel.insert(eachLabel.first);
245 }
246 }
247 // Remove the empty targetMap from retainWorkPool_
248 for (auto &entry : emptyLabel) {
249 retainWorkPool_.erase(entry);
250 }
251 }
252 } // namespace DistributedDB