1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifndef OMIT_MULTI_VER
17 #include "multi_ver_vacuum.h"
18
19 #include <string>
20 #include <vector>
21 #include <cstdint>
22
23 #include "db_errno.h"
24 #include "db_common.h"
25 #include "log_print.h"
26 #include "macro_utils.h"
27 #include "runtime_context.h"
28
29 namespace DistributedDB {
30 std::atomic<bool> MultiVerVacuum::enabled_{true};
31
Enable(bool isEnable)32 void MultiVerVacuum::Enable(bool isEnable)
33 {
34 enabled_ = isEnable;
35 }
36
Launch(const std::string & dbIdentifier,MultiVerVacuumExecutor * dbHandle)37 int MultiVerVacuum::Launch(const std::string &dbIdentifier, MultiVerVacuumExecutor *dbHandle)
38 {
39 if (!enabled_) {
40 LOGW("[Vacuum][Launch] Functionality Not Enabled!");
41 return E_OK;
42 }
43 if (dbIdentifier.empty() || dbHandle == nullptr) {
44 return -E_INVALID_ARGS;
45 }
46
47 std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
48 if (dbMapVacuumTask_.count(dbIdentifier) == 0) {
49 dbMapVacuumTask_[dbIdentifier].runWaitOrder = incRunWaitOrder_++;
50 dbMapVacuumTask_[dbIdentifier].databaseHandle = dbHandle;
51 } else if (dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::ABORT_DONE ||
52 dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::FINISH) {
53 // Reset vacuum task
54 dbMapVacuumTask_[dbIdentifier].status = VacuumTaskStatus::RUN_WAIT;
55 dbMapVacuumTask_[dbIdentifier].launchErrorHappen = false;
56 dbMapVacuumTask_[dbIdentifier].autoRelaunchOnce = false;
57 dbMapVacuumTask_[dbIdentifier].immediatelyRelaunchable = true;
58 dbMapVacuumTask_[dbIdentifier].runWaitOrder = incRunWaitOrder_++;
59 dbMapVacuumTask_[dbIdentifier].pauseNeedCount = 0;
60 dbMapVacuumTask_[dbIdentifier].databaseHandle = dbHandle;
61 } else {
62 dbMapVacuumTask_[dbIdentifier].launchErrorHappen = true;
63 LOGE("[Vacuum][Launch] Unexpected pre-status=%d!", static_cast<int>(dbMapVacuumTask_[dbIdentifier].status));
64 return -E_NOT_PERMIT;
65 }
66 ActivateBackgroundVacuumTaskExecution();
67 return E_OK;
68 }
69
Pause(const std::string & dbIdentifier)70 int MultiVerVacuum::Pause(const std::string &dbIdentifier)
71 {
72 if (!enabled_) {
73 return E_OK;
74 }
75 if (dbIdentifier.empty()) {
76 return -E_INVALID_ARGS;
77 }
78
79 std::unique_lock<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
80 if (dbMapVacuumTask_.count(dbIdentifier) == 0) {
81 return -E_NOT_FOUND;
82 } else if (dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::RUN_WAIT ||
83 dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::PAUSE_DONE) {
84 dbMapVacuumTask_[dbIdentifier].status = VacuumTaskStatus::PAUSE_DONE;
85 dbMapVacuumTask_[dbIdentifier].immediatelyRelaunchable = false;
86 IncPauseNeedCount(dbMapVacuumTask_[dbIdentifier]);
87 } else if (dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::RUN_NING ||
88 dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::PAUSE_WAIT) {
89 dbMapVacuumTask_[dbIdentifier].status = VacuumTaskStatus::PAUSE_WAIT;
90 dbMapVacuumTask_[dbIdentifier].immediatelyRelaunchable = false;
91 IncPauseNeedCount(dbMapVacuumTask_[dbIdentifier]);
92 vacuumTaskCv_.wait(vacuumTaskLockGuard, [this, &dbIdentifier] {
93 // In concurrency scenario that executor is about to finish this task, the final status may be FINISH.
94 // Even more, in case Abort be called immediately after task finished, the final status may be ABORT_DONE.
95 return dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::PAUSE_DONE ||
96 dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::ABORT_DONE ||
97 dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::FINISH;
98 });
99 } else if (dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::FINISH) {
100 dbMapVacuumTask_[dbIdentifier].immediatelyRelaunchable = false;
101 IncPauseNeedCount(dbMapVacuumTask_[dbIdentifier]);
102 } else {
103 LOGE("[Vacuum][Pause] Unexpected pre-status=%d!", static_cast<int>(dbMapVacuumTask_[dbIdentifier].status));
104 return -E_NOT_PERMIT;
105 }
106 return E_OK;
107 }
108
Continue(const std::string & dbIdentifier,bool autoRelaunchOnce)109 int MultiVerVacuum::Continue(const std::string &dbIdentifier, bool autoRelaunchOnce)
110 {
111 if (!enabled_) {
112 return E_OK;
113 }
114 if (dbIdentifier.empty()) {
115 return -E_INVALID_ARGS;
116 }
117
118 std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
119 if (dbMapVacuumTask_.count(dbIdentifier) == 0) {
120 return -E_NOT_FOUND;
121 } else if (dbMapVacuumTask_[dbIdentifier].launchErrorHappen) {
122 LOGE("[Vacuum][Continue] LaunchErrorHappen detected, pre-status=%d!",
123 static_cast<int>(dbMapVacuumTask_[dbIdentifier].status));
124 return -E_NOT_PERMIT;
125 } else if (dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::PAUSE_DONE) {
126 DecPauseNeedCount(dbMapVacuumTask_[dbIdentifier]);
127 bool relaunchFlag = (dbMapVacuumTask_[dbIdentifier].autoRelaunchOnce || autoRelaunchOnce);
128 dbMapVacuumTask_[dbIdentifier].autoRelaunchOnce = relaunchFlag;
129 // Truly continue this task only when all pause had been counteracted
130 if (IsPauseNotNeed(dbMapVacuumTask_[dbIdentifier])) {
131 dbMapVacuumTask_[dbIdentifier].status = VacuumTaskStatus::RUN_WAIT;
132 dbMapVacuumTask_[dbIdentifier].runWaitOrder = incRunWaitOrder_++;
133 dbMapVacuumTask_[dbIdentifier].immediatelyRelaunchable = true;
134 ActivateBackgroundVacuumTaskExecution();
135 }
136 } else if (dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::FINISH) {
137 // Update relaunch flag first
138 DecPauseNeedCount(dbMapVacuumTask_[dbIdentifier]);
139 bool relaunchFlag = (dbMapVacuumTask_[dbIdentifier].autoRelaunchOnce || autoRelaunchOnce);
140 dbMapVacuumTask_[dbIdentifier].autoRelaunchOnce = relaunchFlag;
141 // All pause had been counteracted, so this task is immediatelyRelaunchable, but not necessarily relaunch now.
142 if (IsPauseNotNeed(dbMapVacuumTask_[dbIdentifier])) {
143 dbMapVacuumTask_[dbIdentifier].immediatelyRelaunchable = true;
144 // Do autoRelaunch if need
145 if (dbMapVacuumTask_[dbIdentifier].autoRelaunchOnce) {
146 dbMapVacuumTask_[dbIdentifier].status = VacuumTaskStatus::RUN_WAIT;
147 dbMapVacuumTask_[dbIdentifier].runWaitOrder = incRunWaitOrder_++;
148 dbMapVacuumTask_[dbIdentifier].autoRelaunchOnce = false;
149 ActivateBackgroundVacuumTaskExecution();
150 }
151 }
152 } else {
153 LOGE("[Vacuum][Continue] Unexpected pre-status=%d!", static_cast<int>(dbMapVacuumTask_[dbIdentifier].status));
154 return -E_NOT_PERMIT;
155 }
156 return E_OK;
157 }
158
Abort(const std::string & dbIdentifier)159 int MultiVerVacuum::Abort(const std::string &dbIdentifier)
160 {
161 if (!enabled_) {
162 return E_OK;
163 }
164 if (dbIdentifier.empty()) {
165 return -E_INVALID_ARGS;
166 }
167
168 // The pauseNeedCount must be zero in RUN_WAIT and RUN_NING case, but not always zero in FINISH case.
169 // If pause is called more than continue, status may be PAUSE_WAIT, PAUSE_DONE, which is not expected.
170 // The pauseNeedCount, runWaitOrder and autoRelaunchOnce will be reset when launch(Not Auto) if abort normally
171 std::unique_lock<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
172 if (dbMapVacuumTask_.count(dbIdentifier) == 0) {
173 return -E_NOT_FOUND;
174 } else if (dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::RUN_WAIT ||
175 dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::PAUSE_DONE ||
176 dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::FINISH) {
177 dbMapVacuumTask_[dbIdentifier].status = VacuumTaskStatus::ABORT_DONE;
178 dbMapVacuumTask_[dbIdentifier].launchErrorHappen = false;
179 dbMapVacuumTask_[dbIdentifier].immediatelyRelaunchable = false;
180 // In this place, the background will not access information of this vacuum task
181 dbMapVacuumTask_[dbIdentifier].databaseHandle = nullptr;
182 ResetNodeAndRecordContextInfo(dbMapVacuumTask_[dbIdentifier]);
183 } else if (dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::RUN_NING ||
184 dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::PAUSE_WAIT) {
185 dbMapVacuumTask_[dbIdentifier].status = VacuumTaskStatus::ABORT_WAIT;
186 dbMapVacuumTask_[dbIdentifier].immediatelyRelaunchable = false;
187 vacuumTaskCv_.wait(vacuumTaskLockGuard, [this, &dbIdentifier] {
188 // In concurrency scenario that executor is about to finish this task, the final status may be FINISH
189 return dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::ABORT_DONE ||
190 dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::FINISH;
191 });
192 // Resource is cleaned by background task, still set ABORT_DONE and reset launchErrorHappen and databaseHandle.
193 dbMapVacuumTask_[dbIdentifier].status = VacuumTaskStatus::ABORT_DONE;
194 dbMapVacuumTask_[dbIdentifier].launchErrorHappen = false;
195 dbMapVacuumTask_[dbIdentifier].databaseHandle = nullptr;
196 } else {
197 LOGE("[Vacuum][Abort] Unexpected pre-status=%d!", static_cast<int>(dbMapVacuumTask_[dbIdentifier].status));
198 return -E_NOT_PERMIT;
199 }
200 return E_OK;
201 }
202
AutoRelaunchOnce(const std::string & dbIdentifier)203 int MultiVerVacuum::AutoRelaunchOnce(const std::string &dbIdentifier)
204 {
205 if (!enabled_) {
206 return E_OK;
207 }
208 if (dbIdentifier.empty()) {
209 return -E_INVALID_ARGS;
210 }
211
212 std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
213 if (dbMapVacuumTask_.count(dbIdentifier) == 0) {
214 return -E_NOT_FOUND;
215 } else if (dbMapVacuumTask_[dbIdentifier].launchErrorHappen) {
216 LOGE("[Vacuum][AutoRelaunch] LaunchErrorHappen detected, pre-status=%d!",
217 static_cast<int>(dbMapVacuumTask_[dbIdentifier].status));
218 return -E_NOT_PERMIT;
219 } else if (dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::FINISH &&
220 dbMapVacuumTask_[dbIdentifier].immediatelyRelaunchable) {
221 // Relaunch this task immediately
222 dbMapVacuumTask_[dbIdentifier].status = VacuumTaskStatus::RUN_WAIT;
223 dbMapVacuumTask_[dbIdentifier].autoRelaunchOnce = false;
224 dbMapVacuumTask_[dbIdentifier].runWaitOrder = incRunWaitOrder_++;
225 } else {
226 // Set flag true in order to Relaunch this task once when it finish
227 dbMapVacuumTask_[dbIdentifier].autoRelaunchOnce = true;
228 }
229 ActivateBackgroundVacuumTaskExecution();
230 return E_OK;
231 }
232
QueryStatus(const std::string & dbIdentifier,VacuumTaskStatus & outStatus) const233 int MultiVerVacuum::QueryStatus(const std::string &dbIdentifier, VacuumTaskStatus &outStatus) const
234 {
235 if (dbIdentifier.empty()) {
236 return -E_INVALID_ARGS;
237 }
238
239 std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
240 if (dbMapVacuumTask_.count(dbIdentifier) == 0) {
241 return -E_NOT_FOUND;
242 }
243
244 outStatus = dbMapVacuumTask_.at(dbIdentifier).status;
245 return E_OK;
246 }
247
~MultiVerVacuum()248 MultiVerVacuum::~MultiVerVacuum()
249 {
250 // Mainly for stop the background task, resources automatically clean by this deconstruction
251 std::unique_lock<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
252 for (auto &each : dbMapVacuumTask_) {
253 if (each.second.status == VacuumTaskStatus::RUN_WAIT || each.second.status == VacuumTaskStatus::PAUSE_DONE) {
254 // For RUN_WAIT and PAUSE_DONE, change to ABORT_DONE
255 each.second.status = VacuumTaskStatus::ABORT_DONE;
256 } else if (each.second.status == VacuumTaskStatus::RUN_NING ||
257 each.second.status == VacuumTaskStatus::PAUSE_WAIT) {
258 // For RUN_NING and PAUSE_WAIT, change to ABORT_WAIT
259 each.second.status = VacuumTaskStatus::ABORT_WAIT;
260 }
261 // For ABORT_WAIT, ABORT_DONE and FINISH, remain as it is.
262 }
263 // Wait for background task to quit
264 vacuumTaskCv_.wait(vacuumTaskLockGuard, [this] {
265 return !isBackgroundVacuumTaskInExecution_;
266 });
267 }
268
VacuumTaskExecutor()269 void MultiVerVacuum::VacuumTaskExecutor()
270 {
271 // Endless loop until nothing to do
272 while (true) {
273 std::string nextDatabase;
274 {
275 std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
276 int errCode = SearchVacuumTaskToExecute(nextDatabase);
277 if (errCode != E_OK) {
278 LOGI("[Vacuum][Executor] No available task to execute, about to quit.");
279 isBackgroundVacuumTaskInExecution_ = false;
280 // Awake the deconstruction that background thread is about to quit
281 vacuumTaskCv_.notify_all();
282 return;
283 }
284 }
285 // No thread will remove entry from dbMapVacuumTask_, so here is concurrency safe.
286 LOGI("[Vacuum][Executor] Execute vacuum task for database=%s.", nextDatabase.c_str());
287 ExecuteSpecificVacuumTask(dbMapVacuumTask_[nextDatabase]);
288 // Awake foreground thread at this task switch point
289 vacuumTaskCv_.notify_all();
290 }
291 }
292
ExecuteSpecificVacuumTask(VacuumTaskContext & inTask)293 void MultiVerVacuum::ExecuteSpecificVacuumTask(VacuumTaskContext &inTask)
294 {
295 // No other thread will access handle, node and record field of a RUN_NING, PAUSE_WAIT, ABORT_WAIT status task
296 // So it is concurrency safe to access or change these field without protection of lockguard
297 if (inTask.leftBranchCommits.empty() && inTask.rightBranchCommits.empty()) {
298 // Newly launched task
299 int errCode = inTask.databaseHandle->GetVacuumAbleCommits(inTask.leftBranchCommits, inTask.rightBranchCommits);
300 if (errCode != E_OK) {
301 LOGE("[Vacuum][Execute] GetVacuumAbleCommits fail, errCode=%d.", errCode);
302 std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
303 FinishVaccumTask(inTask);
304 return;
305 }
306 }
307
308 // Vacuum left branch first, since record of left branch will be synced out, more urgently
309 while (!inTask.leftBranchCommits.empty()) {
310 int errCode = DealWithLeftBranchCommit(inTask);
311 if (errCode != E_OK) {
312 return;
313 }
314 }
315 LOGD("[Vacuum][Execute] All vacuum able commits of left branch have been dealt with for this database!");
316
317 // Vacuum right branch later, since record of right branch will not be synced out, not so urgent
318 while (!inTask.rightBranchCommits.empty()) {
319 int errCode = DealWithRightBranchCommit(inTask);
320 if (errCode != E_OK) {
321 return;
322 }
323 }
324 LOGD("[Vacuum][Execute] All vacuum able commits of right branch have been dealt with for this database!");
325
326 // Commit changes before finish this task, if fail, just finish it(commit fail auto rollback)
327 int errCode = CommitTransactionIfNeed(inTask);
328 if (errCode != E_OK) {
329 std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
330 FinishVaccumTask(inTask);
331 return;
332 }
333
334 // Every commit of this task has been treated, consider finish or relaunch the task
335 std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
336 if (inTask.status == VacuumTaskStatus::RUN_NING && inTask.autoRelaunchOnce) {
337 RelaunchVacuumTask(inTask);
338 } else {
339 // If in PAUSE_WAIT or ABORT_WAIT status, shall not relaunch it, just finish it to make sure it be unactive
340 // The autoRelaunchOnce will be set false, if need relaunch, the continue operation will set it true again
341 FinishVaccumTask(inTask);
342 }
343 }
344
DealWithLeftBranchCommit(VacuumTaskContext & inTask)345 int MultiVerVacuum::DealWithLeftBranchCommit(VacuumTaskContext &inTask)
346 {
347 return DoDealCommitOfLeftOrRight(inTask, inTask.leftBranchCommits, true);
348 }
349
DealWithLeftBranchVacuumNeedRecord(VacuumTaskContext & inTask)350 int MultiVerVacuum::DealWithLeftBranchVacuumNeedRecord(VacuumTaskContext &inTask)
351 {
352 int errCode = DoCommitAndQuitIfWaitStatusObserved(inTask);
353 if (errCode != E_OK) {
354 return errCode;
355 }
356 // No other thread will access handle, node and record field of a RUN_NING, PAUSE_WAIT, ABORT_WAIT status task
357 // So it is concurrency safe to access or change these field without protection of lockguard
358 const MultiVerRecordInfo &record = inTask.vacuumNeedRecords.front();
359 LOGD("[Vacuum][DealLeftRecord] Type=%" PRIu32 ", Version=%" PRIu64 ", HashKey=%s.",
360 static_cast<uint32_t>(record.type), record.version, VEC_TO_STR(record.hashKey));
361 if (inTask.shadowRecords.empty()) {
362 if (record.type == RecordType::CLEAR) {
363 errCode = inTask.databaseHandle->GetShadowRecordsOfClearTypeRecord(record.version, record.hashKey,
364 inTask.shadowRecords);
365 } else {
366 errCode = inTask.databaseHandle->GetShadowRecordsOfNonClearTypeRecord(record.version, record.hashKey,
367 inTask.shadowRecords);
368 }
369 if (errCode != E_OK) {
370 LOGE("[Vacuum][DealLeftRecord] GetShadowRecords fail, Type=%d, Version=%llu, HashKey=%s, errCode=%d.",
371 static_cast<int>(record.type), ULL(record.version), VEC_TO_STR(record.hashKey), errCode);
372 DoRollBackAndFinish(inTask);
373 return errCode;
374 }
375 }
376
377 while (!inTask.shadowRecords.empty()) {
378 errCode = DealWithLeftBranchShadowRecord(inTask);
379 if (errCode != E_OK) {
380 return errCode;
381 }
382 }
383
384 // Every shadowRecords of this vacuumNeedRecord has been treated, mark this vacuumNeedRecord as vacuum done
385 errCode = StartTransactionIfNotYet(inTask);
386 if (errCode != E_OK) {
387 std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
388 FinishVaccumTask(inTask);
389 return errCode;
390 }
391 errCode = inTask.databaseHandle->MarkRecordAsVacuumDone(record.version, record.hashKey);
392 if (errCode != E_OK) {
393 LOGE("[Vacuum][DealLeftRecord] MarkRecordAsVacuumDone fail, Type=%d, Version=%llu, HashKey=%s, errCode=%d.",
394 static_cast<int>(record.type), ULL(record.version), VEC_TO_STR(record.hashKey), errCode);
395 DoRollBackAndFinish(inTask);
396 return errCode;
397 }
398 // Pop out this vacuumNeedRecord
399 inTask.vacuumNeedRecords.pop_front();
400 return E_OK;
401 }
402
DealWithLeftBranchShadowRecord(VacuumTaskContext & inTask)403 int MultiVerVacuum::DealWithLeftBranchShadowRecord(VacuumTaskContext &inTask)
404 {
405 return DoDeleteRecordOfLeftShadowOrRightVacuumNeed(inTask, inTask.shadowRecords);
406 }
407
DealWithRightBranchCommit(VacuumTaskContext & inTask)408 int MultiVerVacuum::DealWithRightBranchCommit(VacuumTaskContext &inTask)
409 {
410 return DoDealCommitOfLeftOrRight(inTask, inTask.rightBranchCommits, false);
411 }
412
DealWithRightBranchVacuumNeedRecord(VacuumTaskContext & inTask)413 int MultiVerVacuum::DealWithRightBranchVacuumNeedRecord(VacuumTaskContext &inTask)
414 {
415 return DoDeleteRecordOfLeftShadowOrRightVacuumNeed(inTask, inTask.vacuumNeedRecords);
416 }
417
DoDealCommitOfLeftOrRight(VacuumTaskContext & inTask,std::list<MultiVerCommitInfo> & commitList,bool isLeft)418 int MultiVerVacuum::DoDealCommitOfLeftOrRight(VacuumTaskContext &inTask, std::list<MultiVerCommitInfo> &commitList,
419 bool isLeft)
420 {
421 int errCode = DoCommitAndQuitIfWaitStatusObserved(inTask);
422 if (errCode != E_OK) {
423 return errCode;
424 }
425 // No other thread will access handle, node and record field of a RUN_NING, PAUSE_WAIT, ABORT_WAIT status task
426 // So it is concurrency safe to access or change these field without protection of lockguard
427 const MultiVerCommitInfo &commit = commitList.front();
428 LOGD("[Vacuum][DoDealCommit] Version=%llu, CommitId=%s, isLeft=%d.", ULL(commit.version),
429 VEC_TO_STR(commit.commitId), isLeft);
430 if (inTask.vacuumNeedRecords.empty()) {
431 errCode = inTask.databaseHandle->GetVacuumNeedRecordsByVersion(commit.version, inTask.vacuumNeedRecords);
432 if (errCode != E_OK) {
433 LOGE("[Vacuum][DoDealCommit] GetVacuumNeedRecordsByVersion fail, Version=%llu, CommitId=%s, isLeft=%d, "
434 "errCode=%d.", ULL(commit.version), VEC_TO_STR(commit.commitId), isLeft, errCode);
435 DoRollBackAndFinish(inTask);
436 return errCode;
437 }
438 }
439
440 while (!inTask.vacuumNeedRecords.empty()) {
441 if (isLeft) {
442 errCode = DealWithLeftBranchVacuumNeedRecord(inTask);
443 } else {
444 errCode = DealWithRightBranchVacuumNeedRecord(inTask);
445 }
446 if (errCode != E_OK) {
447 return errCode;
448 }
449 }
450
451 // Every vacuumNeedRecords of this commit has been treated, mark this commit as vacuum done
452 errCode = StartTransactionIfNotYet(inTask);
453 if (errCode != E_OK) {
454 std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
455 FinishVaccumTask(inTask);
456 return errCode;
457 }
458 errCode = inTask.databaseHandle->MarkCommitAsVacuumDone(commit.commitId);
459 if (errCode != E_OK) {
460 LOGE("[Vacuum][DoDealCommit] MarkCommitAsVacuumDone fail, Version=%llu, CommitId=%s, isLeft=%d, errCode=%d.",
461 ULL(commit.version), VEC_TO_STR(commit.commitId), isLeft, errCode);
462 DoRollBackAndFinish(inTask);
463 return errCode;
464 }
465 // Pop out this commit
466 commitList.pop_front();
467 return E_OK;
468 }
469
DoDeleteRecordOfLeftShadowOrRightVacuumNeed(VacuumTaskContext & inTask,std::list<MultiVerRecordInfo> & recordList)470 int MultiVerVacuum::DoDeleteRecordOfLeftShadowOrRightVacuumNeed(VacuumTaskContext &inTask,
471 std::list<MultiVerRecordInfo> &recordList)
472 {
473 int errCode = DoCommitAndQuitIfWaitStatusObserved(inTask);
474 if (errCode != E_OK) {
475 return errCode;
476 }
477 // No other thread will access handle, node and record field of a RUN_NING, PAUSE_WAIT, ABORT_WAIT status task
478 // So it is concurrency safe to access or change these field without protection of lockguard
479 const MultiVerRecordInfo &record = recordList.front();
480 LOGD("[Vacuum][DoDeleteRecord] Type=%u, Version=%llu, HashKey=%s.", static_cast<unsigned>(record.type),
481 ULL(record.version), VEC_TO_STR(record.hashKey));
482 errCode = StartTransactionIfNotYet(inTask);
483 if (errCode != E_OK) {
484 std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
485 FinishVaccumTask(inTask);
486 return errCode;
487 }
488 errCode = inTask.databaseHandle->DeleteRecordTotally(record.version, record.hashKey);
489 if (errCode != E_OK) {
490 LOGE("[Vacuum][DoDeleteRecord] DeleteRecordTotally fail, Type=%u, Version=%llu, HashKey=%s, errCode=%d.",
491 static_cast<unsigned>(record.type), ULL(record.version), VEC_TO_STR(record.hashKey), errCode);
492 DoRollBackAndFinish(inTask);
493 return errCode;
494 }
495 // Pop out this shadowRecord or vacuumNeedRecord
496 recordList.pop_front();
497 return E_OK;
498 }
499
DoRollBackAndFinish(VacuumTaskContext & inTask)500 void MultiVerVacuum::DoRollBackAndFinish(VacuumTaskContext &inTask)
501 {
502 RollBackTransactionIfNeed(inTask);
503 std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
504 FinishVaccumTask(inTask);
505 }
506
DoCommitAndQuitIfWaitStatusObserved(VacuumTaskContext & inTask)507 int MultiVerVacuum::DoCommitAndQuitIfWaitStatusObserved(VacuumTaskContext &inTask)
508 {
509 bool waitStatusObserved = false;
510 {
511 std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
512 if (inTask.status == VacuumTaskStatus::PAUSE_WAIT || inTask.status == VacuumTaskStatus::ABORT_WAIT) {
513 waitStatusObserved = true;
514 }
515 }
516 // Only this TaskThread will change a PAUSE_WAIT or ABORT_WAIT status to other status
517 // So here during the gap of miss-lockguard-protection, the status of this inTask will not change
518 if (waitStatusObserved) {
519 // CommitTransactionIfNeed may be an time cost operation, should not be called within the range of lockguard
520 int errCode = CommitTransactionIfNeed(inTask);
521 // Change status operation should be protected within the lockguard
522 std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
523 if (errCode != E_OK) {
524 // If commit fail, just finish this task(commit fail auto rollback)
525 FinishVaccumTask(inTask);
526 return errCode;
527 }
528 if (inTask.status == VacuumTaskStatus::ABORT_WAIT) {
529 AbortVacuumTask(inTask);
530 return -E_TASK_BREAK_OFF;
531 }
532 // Nor commit fail, nor Abort_wait case, here is Pause_wait Case, just set status to Pause_done
533 inTask.status = VacuumTaskStatus::PAUSE_DONE;
534 return -E_TASK_BREAK_OFF;
535 }
536 return E_OK;
537 }
538
StartTransactionIfNotYet(VacuumTaskContext & inTask)539 int MultiVerVacuum::StartTransactionIfNotYet(VacuumTaskContext &inTask)
540 {
541 if (!inTask.isTransactionStarted) {
542 int errCode = inTask.databaseHandle->StartTransactionForVacuum();
543 if (errCode != E_OK) {
544 LOGE("[Vacuum][StartTransact] StartTransactionForVacuum fail, errCode=%d.", errCode);
545 return errCode;
546 }
547 inTask.isTransactionStarted = true;
548 }
549 return E_OK;
550 }
551
CommitTransactionIfNeed(VacuumTaskContext & inTask)552 int MultiVerVacuum::CommitTransactionIfNeed(VacuumTaskContext &inTask)
553 {
554 if (inTask.isTransactionStarted) {
555 // Whether CommitTransactionForVacuum fail or not, the transaction is ended.
556 inTask.isTransactionStarted = false;
557 int errCode = inTask.databaseHandle->CommitTransactionForVacuum();
558 if (errCode != E_OK) {
559 LOGE("[Vacuum][CommitTransact] CommitTransactionForVacuum fail, errCode=%d.", errCode);
560 return errCode;
561 }
562 }
563 return E_OK;
564 }
565
RollBackTransactionIfNeed(VacuumTaskContext & inTask)566 void MultiVerVacuum::RollBackTransactionIfNeed(VacuumTaskContext &inTask)
567 {
568 if (inTask.isTransactionStarted) {
569 // Whether RollBackTransactionForVacuum fail or not, the transaction is ended.
570 inTask.isTransactionStarted = false;
571 int errCode = inTask.databaseHandle->RollBackTransactionForVacuum();
572 if (errCode != E_OK) {
573 LOGE("[Vacuum][RollBackTransact] RollBackTransactionForVacuum fail, errCode=%d.", errCode);
574 }
575 }
576 }
577
FinishVaccumTask(VacuumTaskContext & inTask)578 void MultiVerVacuum::FinishVaccumTask(VacuumTaskContext &inTask)
579 {
580 inTask.status = VacuumTaskStatus::FINISH;
581 // It is OK to reset the autoRelaunchOnce. Since this is called when this task is RUN_NING status, all pause to
582 // this task will block and wait, and all continue to this task happens after we reset the autoRelaunchOnce
583 inTask.autoRelaunchOnce = false;
584 // Do not reset the databaseHandle while finish a task, because it will be reused after autoRelaunch
585 ResetNodeAndRecordContextInfo(inTask);
586 }
587
RelaunchVacuumTask(VacuumTaskContext & inTask)588 void MultiVerVacuum::RelaunchVacuumTask(VacuumTaskContext &inTask)
589 {
590 inTask.status = VacuumTaskStatus::RUN_WAIT;
591 inTask.runWaitOrder = incRunWaitOrder_++; // Queue at the back
592 inTask.autoRelaunchOnce = false;
593 // Obviously can not reset the databaseHandle while relaunch a task
594 ResetNodeAndRecordContextInfo(inTask);
595 }
596
AbortVacuumTask(VacuumTaskContext & inTask)597 void MultiVerVacuum::AbortVacuumTask(VacuumTaskContext &inTask)
598 {
599 inTask.status = VacuumTaskStatus::ABORT_DONE;
600 inTask.autoRelaunchOnce = false;
601 inTask.databaseHandle = nullptr; // reset handle in abort case
602 ResetNodeAndRecordContextInfo(inTask);
603 }
604
ResetNodeAndRecordContextInfo(VacuumTaskContext & inTask)605 void MultiVerVacuum::ResetNodeAndRecordContextInfo(VacuumTaskContext &inTask)
606 {
607 inTask.leftBranchCommits.clear();
608 inTask.rightBranchCommits.clear();
609 inTask.vacuumNeedRecords.clear();
610 inTask.shadowRecords.clear();
611 inTask.isTransactionStarted = false;
612 }
613
SearchVacuumTaskToExecute(std::string & outDbIdentifier)614 int MultiVerVacuum::SearchVacuumTaskToExecute(std::string &outDbIdentifier)
615 {
616 // Find a vacuum task with the smallest runWaitOrder among tasks that is in RUN_WAIT Status(Except In Error).
617 uint64_t minRunWaitOrder = UINT64_MAX;
618 for (auto &eachTask : dbMapVacuumTask_) {
619 LOGD("[Vacuum][Search] db=%s, status=%d, error=%d, relaunch=%d, immediate=%d, runWait=%llu, pauseCount=%llu.",
620 eachTask.first.c_str(), static_cast<int>(eachTask.second.status), eachTask.second.launchErrorHappen,
621 eachTask.second.autoRelaunchOnce, eachTask.second.immediatelyRelaunchable,
622 ULL(eachTask.second.runWaitOrder), ULL(eachTask.second.pauseNeedCount));
623 if (eachTask.second.status == VacuumTaskStatus::RUN_WAIT && !eachTask.second.launchErrorHappen) {
624 if (eachTask.second.runWaitOrder < minRunWaitOrder) {
625 minRunWaitOrder = eachTask.second.runWaitOrder;
626 outDbIdentifier = eachTask.first;
627 }
628 }
629 }
630 if (!outDbIdentifier.empty()) {
631 dbMapVacuumTask_[outDbIdentifier].status = VacuumTaskStatus::RUN_NING;
632 return E_OK;
633 } else {
634 return -E_NOT_FOUND;
635 }
636 }
637
ActivateBackgroundVacuumTaskExecution()638 void MultiVerVacuum::ActivateBackgroundVacuumTaskExecution()
639 {
640 if (!isBackgroundVacuumTaskInExecution_) {
641 TaskAction backgroundTask = [this]() {
642 LOGI("[Vacuum][Activate] Begin Background Execution.");
643 VacuumTaskExecutor();
644 LOGI("[Vacuum][Activate] End Background Execution.");
645 };
646 int errCode = RuntimeContext::GetInstance()->ScheduleTask(backgroundTask);
647 if (errCode != E_OK) {
648 LOGE("[Vacuum][Activate] ScheduleTask failed, errCode = %d.", errCode);
649 return;
650 }
651 isBackgroundVacuumTaskInExecution_ = true;
652 }
653 }
654
IncPauseNeedCount(VacuumTaskContext & inTask)655 void MultiVerVacuum::IncPauseNeedCount(VacuumTaskContext &inTask)
656 {
657 inTask.pauseNeedCount++;
658 }
659
DecPauseNeedCount(VacuumTaskContext & inTask)660 void MultiVerVacuum::DecPauseNeedCount(VacuumTaskContext &inTask)
661 {
662 if (inTask.pauseNeedCount == 0) {
663 LOGE("[Vacuum][DecPause] PauseNeedCount Zero Before Decrease.");
664 return;
665 }
666 inTask.pauseNeedCount--;
667 }
668
IsPauseNotNeed(VacuumTaskContext & inTask)669 bool MultiVerVacuum::IsPauseNotNeed(VacuumTaskContext &inTask)
670 {
671 return inTask.pauseNeedCount == 0;
672 }
673 } // namespace DistributedDB
674 #endif
675