1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef OMIT_MULTI_VER
17 #include "multi_ver_vacuum.h"
18 
19 #include <string>
20 #include <vector>
21 #include <cstdint>
22 
23 #include "db_errno.h"
24 #include "db_common.h"
25 #include "log_print.h"
26 #include "macro_utils.h"
27 #include "runtime_context.h"
28 
29 namespace DistributedDB {
30 std::atomic<bool> MultiVerVacuum::enabled_{true};
31 
Enable(bool isEnable)32 void MultiVerVacuum::Enable(bool isEnable)
33 {
34     enabled_ = isEnable;
35 }
36 
Launch(const std::string & dbIdentifier,MultiVerVacuumExecutor * dbHandle)37 int MultiVerVacuum::Launch(const std::string &dbIdentifier, MultiVerVacuumExecutor *dbHandle)
38 {
39     if (!enabled_) {
40         LOGW("[Vacuum][Launch] Functionality Not Enabled!");
41         return E_OK;
42     }
43     if (dbIdentifier.empty() || dbHandle == nullptr) {
44         return -E_INVALID_ARGS;
45     }
46 
47     std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
48     if (dbMapVacuumTask_.count(dbIdentifier) == 0) {
49         dbMapVacuumTask_[dbIdentifier].runWaitOrder = incRunWaitOrder_++;
50         dbMapVacuumTask_[dbIdentifier].databaseHandle = dbHandle;
51     } else if (dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::ABORT_DONE ||
52         dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::FINISH) {
53         // Reset vacuum task
54         dbMapVacuumTask_[dbIdentifier].status = VacuumTaskStatus::RUN_WAIT;
55         dbMapVacuumTask_[dbIdentifier].launchErrorHappen = false;
56         dbMapVacuumTask_[dbIdentifier].autoRelaunchOnce = false;
57         dbMapVacuumTask_[dbIdentifier].immediatelyRelaunchable = true;
58         dbMapVacuumTask_[dbIdentifier].runWaitOrder = incRunWaitOrder_++;
59         dbMapVacuumTask_[dbIdentifier].pauseNeedCount = 0;
60         dbMapVacuumTask_[dbIdentifier].databaseHandle = dbHandle;
61     } else {
62         dbMapVacuumTask_[dbIdentifier].launchErrorHappen = true;
63         LOGE("[Vacuum][Launch] Unexpected pre-status=%d!", static_cast<int>(dbMapVacuumTask_[dbIdentifier].status));
64         return -E_NOT_PERMIT;
65     }
66     ActivateBackgroundVacuumTaskExecution();
67     return E_OK;
68 }
69 
Pause(const std::string & dbIdentifier)70 int MultiVerVacuum::Pause(const std::string &dbIdentifier)
71 {
72     if (!enabled_) {
73         return E_OK;
74     }
75     if (dbIdentifier.empty()) {
76         return -E_INVALID_ARGS;
77     }
78 
79     std::unique_lock<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
80     if (dbMapVacuumTask_.count(dbIdentifier) == 0) {
81         return -E_NOT_FOUND;
82     } else if (dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::RUN_WAIT ||
83         dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::PAUSE_DONE) {
84         dbMapVacuumTask_[dbIdentifier].status = VacuumTaskStatus::PAUSE_DONE;
85         dbMapVacuumTask_[dbIdentifier].immediatelyRelaunchable = false;
86         IncPauseNeedCount(dbMapVacuumTask_[dbIdentifier]);
87     } else if (dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::RUN_NING ||
88         dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::PAUSE_WAIT) {
89         dbMapVacuumTask_[dbIdentifier].status = VacuumTaskStatus::PAUSE_WAIT;
90         dbMapVacuumTask_[dbIdentifier].immediatelyRelaunchable = false;
91         IncPauseNeedCount(dbMapVacuumTask_[dbIdentifier]);
92         vacuumTaskCv_.wait(vacuumTaskLockGuard, [this, &dbIdentifier] {
93             // In concurrency scenario that executor is about to finish this task, the final status may be FINISH.
94             // Even more, in case Abort be called immediately after task finished, the final status may be ABORT_DONE.
95             return dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::PAUSE_DONE ||
96                 dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::ABORT_DONE ||
97                 dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::FINISH;
98         });
99     } else if (dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::FINISH) {
100         dbMapVacuumTask_[dbIdentifier].immediatelyRelaunchable = false;
101         IncPauseNeedCount(dbMapVacuumTask_[dbIdentifier]);
102     } else {
103         LOGE("[Vacuum][Pause] Unexpected pre-status=%d!", static_cast<int>(dbMapVacuumTask_[dbIdentifier].status));
104         return -E_NOT_PERMIT;
105     }
106     return E_OK;
107 }
108 
Continue(const std::string & dbIdentifier,bool autoRelaunchOnce)109 int MultiVerVacuum::Continue(const std::string &dbIdentifier, bool autoRelaunchOnce)
110 {
111     if (!enabled_) {
112         return E_OK;
113     }
114     if (dbIdentifier.empty()) {
115         return -E_INVALID_ARGS;
116     }
117 
118     std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
119     if (dbMapVacuumTask_.count(dbIdentifier) == 0) {
120         return -E_NOT_FOUND;
121     } else if (dbMapVacuumTask_[dbIdentifier].launchErrorHappen) {
122         LOGE("[Vacuum][Continue] LaunchErrorHappen detected, pre-status=%d!",
123             static_cast<int>(dbMapVacuumTask_[dbIdentifier].status));
124         return -E_NOT_PERMIT;
125     } else if (dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::PAUSE_DONE) {
126         DecPauseNeedCount(dbMapVacuumTask_[dbIdentifier]);
127         bool relaunchFlag = (dbMapVacuumTask_[dbIdentifier].autoRelaunchOnce || autoRelaunchOnce);
128         dbMapVacuumTask_[dbIdentifier].autoRelaunchOnce = relaunchFlag;
129         // Truly continue this task only when all pause had been counteracted
130         if (IsPauseNotNeed(dbMapVacuumTask_[dbIdentifier])) {
131             dbMapVacuumTask_[dbIdentifier].status = VacuumTaskStatus::RUN_WAIT;
132             dbMapVacuumTask_[dbIdentifier].runWaitOrder = incRunWaitOrder_++;
133             dbMapVacuumTask_[dbIdentifier].immediatelyRelaunchable = true;
134             ActivateBackgroundVacuumTaskExecution();
135         }
136     } else if (dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::FINISH) {
137         // Update relaunch flag first
138         DecPauseNeedCount(dbMapVacuumTask_[dbIdentifier]);
139         bool relaunchFlag = (dbMapVacuumTask_[dbIdentifier].autoRelaunchOnce || autoRelaunchOnce);
140         dbMapVacuumTask_[dbIdentifier].autoRelaunchOnce = relaunchFlag;
141         // All pause had been counteracted, so this task is immediatelyRelaunchable, but not necessarily relaunch now.
142         if (IsPauseNotNeed(dbMapVacuumTask_[dbIdentifier])) {
143             dbMapVacuumTask_[dbIdentifier].immediatelyRelaunchable = true;
144             // Do autoRelaunch if need
145             if (dbMapVacuumTask_[dbIdentifier].autoRelaunchOnce) {
146                 dbMapVacuumTask_[dbIdentifier].status = VacuumTaskStatus::RUN_WAIT;
147                 dbMapVacuumTask_[dbIdentifier].runWaitOrder = incRunWaitOrder_++;
148                 dbMapVacuumTask_[dbIdentifier].autoRelaunchOnce = false;
149                 ActivateBackgroundVacuumTaskExecution();
150             }
151         }
152     } else {
153         LOGE("[Vacuum][Continue] Unexpected pre-status=%d!", static_cast<int>(dbMapVacuumTask_[dbIdentifier].status));
154         return -E_NOT_PERMIT;
155     }
156     return E_OK;
157 }
158 
Abort(const std::string & dbIdentifier)159 int MultiVerVacuum::Abort(const std::string &dbIdentifier)
160 {
161     if (!enabled_) {
162         return E_OK;
163     }
164     if (dbIdentifier.empty()) {
165         return -E_INVALID_ARGS;
166     }
167 
168     // The pauseNeedCount must be zero in RUN_WAIT and RUN_NING case, but not always zero in FINISH case.
169     // If pause is called more than continue, status may be PAUSE_WAIT, PAUSE_DONE, which is not expected.
170     // The pauseNeedCount, runWaitOrder and autoRelaunchOnce will be reset when launch(Not Auto) if abort normally
171     std::unique_lock<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
172     if (dbMapVacuumTask_.count(dbIdentifier) == 0) {
173         return -E_NOT_FOUND;
174     } else if (dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::RUN_WAIT ||
175         dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::PAUSE_DONE ||
176         dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::FINISH) {
177         dbMapVacuumTask_[dbIdentifier].status = VacuumTaskStatus::ABORT_DONE;
178         dbMapVacuumTask_[dbIdentifier].launchErrorHappen = false;
179         dbMapVacuumTask_[dbIdentifier].immediatelyRelaunchable = false;
180         // In this place, the background will not access information of this vacuum task
181         dbMapVacuumTask_[dbIdentifier].databaseHandle = nullptr;
182         ResetNodeAndRecordContextInfo(dbMapVacuumTask_[dbIdentifier]);
183     } else if (dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::RUN_NING ||
184         dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::PAUSE_WAIT) {
185         dbMapVacuumTask_[dbIdentifier].status = VacuumTaskStatus::ABORT_WAIT;
186         dbMapVacuumTask_[dbIdentifier].immediatelyRelaunchable = false;
187         vacuumTaskCv_.wait(vacuumTaskLockGuard, [this, &dbIdentifier] {
188             // In concurrency scenario that executor is about to finish this task, the final status may be FINISH
189             return dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::ABORT_DONE ||
190                 dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::FINISH;
191         });
192         // Resource is cleaned by background task, still set ABORT_DONE and reset launchErrorHappen and databaseHandle.
193         dbMapVacuumTask_[dbIdentifier].status = VacuumTaskStatus::ABORT_DONE;
194         dbMapVacuumTask_[dbIdentifier].launchErrorHappen = false;
195         dbMapVacuumTask_[dbIdentifier].databaseHandle = nullptr;
196     } else {
197         LOGE("[Vacuum][Abort] Unexpected pre-status=%d!", static_cast<int>(dbMapVacuumTask_[dbIdentifier].status));
198         return -E_NOT_PERMIT;
199     }
200     return E_OK;
201 }
202 
AutoRelaunchOnce(const std::string & dbIdentifier)203 int MultiVerVacuum::AutoRelaunchOnce(const std::string &dbIdentifier)
204 {
205     if (!enabled_) {
206         return E_OK;
207     }
208     if (dbIdentifier.empty()) {
209         return -E_INVALID_ARGS;
210     }
211 
212     std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
213     if (dbMapVacuumTask_.count(dbIdentifier) == 0) {
214         return -E_NOT_FOUND;
215     } else if (dbMapVacuumTask_[dbIdentifier].launchErrorHappen) {
216         LOGE("[Vacuum][AutoRelaunch] LaunchErrorHappen detected, pre-status=%d!",
217             static_cast<int>(dbMapVacuumTask_[dbIdentifier].status));
218         return -E_NOT_PERMIT;
219     } else if (dbMapVacuumTask_[dbIdentifier].status == VacuumTaskStatus::FINISH &&
220         dbMapVacuumTask_[dbIdentifier].immediatelyRelaunchable) {
221         // Relaunch this task immediately
222         dbMapVacuumTask_[dbIdentifier].status = VacuumTaskStatus::RUN_WAIT;
223         dbMapVacuumTask_[dbIdentifier].autoRelaunchOnce = false;
224         dbMapVacuumTask_[dbIdentifier].runWaitOrder = incRunWaitOrder_++;
225     } else {
226         // Set flag true in order to Relaunch this task once when it finish
227         dbMapVacuumTask_[dbIdentifier].autoRelaunchOnce = true;
228     }
229     ActivateBackgroundVacuumTaskExecution();
230     return E_OK;
231 }
232 
QueryStatus(const std::string & dbIdentifier,VacuumTaskStatus & outStatus) const233 int MultiVerVacuum::QueryStatus(const std::string &dbIdentifier, VacuumTaskStatus &outStatus) const
234 {
235     if (dbIdentifier.empty()) {
236         return -E_INVALID_ARGS;
237     }
238 
239     std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
240     if (dbMapVacuumTask_.count(dbIdentifier) == 0) {
241         return -E_NOT_FOUND;
242     }
243 
244     outStatus = dbMapVacuumTask_.at(dbIdentifier).status;
245     return E_OK;
246 }
247 
~MultiVerVacuum()248 MultiVerVacuum::~MultiVerVacuum()
249 {
250     // Mainly for stop the background task, resources automatically clean by this deconstruction
251     std::unique_lock<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
252     for (auto &each : dbMapVacuumTask_) {
253         if (each.second.status == VacuumTaskStatus::RUN_WAIT || each.second.status == VacuumTaskStatus::PAUSE_DONE) {
254             // For RUN_WAIT and PAUSE_DONE, change to ABORT_DONE
255             each.second.status = VacuumTaskStatus::ABORT_DONE;
256         } else if (each.second.status == VacuumTaskStatus::RUN_NING ||
257             each.second.status == VacuumTaskStatus::PAUSE_WAIT) {
258             // For RUN_NING and PAUSE_WAIT, change to ABORT_WAIT
259             each.second.status = VacuumTaskStatus::ABORT_WAIT;
260         }
261         // For ABORT_WAIT, ABORT_DONE and FINISH, remain as it is.
262     }
263     // Wait for background task to quit
264     vacuumTaskCv_.wait(vacuumTaskLockGuard, [this] {
265         return !isBackgroundVacuumTaskInExecution_;
266     });
267 }
268 
VacuumTaskExecutor()269 void MultiVerVacuum::VacuumTaskExecutor()
270 {
271     // Endless loop until nothing to do
272     while (true) {
273         std::string nextDatabase;
274         {
275             std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
276             int errCode = SearchVacuumTaskToExecute(nextDatabase);
277             if (errCode != E_OK) {
278                 LOGI("[Vacuum][Executor] No available task to execute, about to quit.");
279                 isBackgroundVacuumTaskInExecution_ = false;
280                 // Awake the deconstruction that background thread is about to quit
281                 vacuumTaskCv_.notify_all();
282                 return;
283             }
284         }
285         // No thread will remove entry from dbMapVacuumTask_, so here is concurrency safe.
286         LOGI("[Vacuum][Executor] Execute vacuum task for database=%s.", nextDatabase.c_str());
287         ExecuteSpecificVacuumTask(dbMapVacuumTask_[nextDatabase]);
288         // Awake foreground thread at this task switch point
289         vacuumTaskCv_.notify_all();
290     }
291 }
292 
ExecuteSpecificVacuumTask(VacuumTaskContext & inTask)293 void MultiVerVacuum::ExecuteSpecificVacuumTask(VacuumTaskContext &inTask)
294 {
295     // No other thread will access handle, node and record field of a RUN_NING, PAUSE_WAIT, ABORT_WAIT status task
296     // So it is concurrency safe to access or change these field without protection of lockguard
297     if (inTask.leftBranchCommits.empty() && inTask.rightBranchCommits.empty()) {
298         // Newly launched task
299         int errCode = inTask.databaseHandle->GetVacuumAbleCommits(inTask.leftBranchCommits, inTask.rightBranchCommits);
300         if (errCode != E_OK) {
301             LOGE("[Vacuum][Execute] GetVacuumAbleCommits fail, errCode=%d.", errCode);
302             std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
303             FinishVaccumTask(inTask);
304             return;
305         }
306     }
307 
308     // Vacuum left branch first, since record of left branch will be synced out, more urgently
309     while (!inTask.leftBranchCommits.empty()) {
310         int errCode = DealWithLeftBranchCommit(inTask);
311         if (errCode != E_OK) {
312             return;
313         }
314     }
315     LOGD("[Vacuum][Execute] All vacuum able commits of left branch have been dealt with for this database!");
316 
317     // Vacuum right branch later, since record of right branch will not be synced out, not so urgent
318     while (!inTask.rightBranchCommits.empty()) {
319         int errCode = DealWithRightBranchCommit(inTask);
320         if (errCode != E_OK) {
321             return;
322         }
323     }
324     LOGD("[Vacuum][Execute] All vacuum able commits of right branch have been dealt with for this database!");
325 
326     // Commit changes before finish this task, if fail, just finish it(commit fail auto rollback)
327     int errCode = CommitTransactionIfNeed(inTask);
328     if (errCode != E_OK) {
329         std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
330         FinishVaccumTask(inTask);
331         return;
332     }
333 
334     // Every commit of this task has been treated, consider finish or relaunch the task
335     std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
336     if (inTask.status == VacuumTaskStatus::RUN_NING && inTask.autoRelaunchOnce) {
337         RelaunchVacuumTask(inTask);
338     } else {
339         // If in PAUSE_WAIT or ABORT_WAIT status, shall not relaunch it, just finish it to make sure it be unactive
340         // The autoRelaunchOnce will be set false, if need relaunch, the continue operation will set it true again
341         FinishVaccumTask(inTask);
342     }
343 }
344 
DealWithLeftBranchCommit(VacuumTaskContext & inTask)345 int MultiVerVacuum::DealWithLeftBranchCommit(VacuumTaskContext &inTask)
346 {
347     return DoDealCommitOfLeftOrRight(inTask, inTask.leftBranchCommits, true);
348 }
349 
DealWithLeftBranchVacuumNeedRecord(VacuumTaskContext & inTask)350 int MultiVerVacuum::DealWithLeftBranchVacuumNeedRecord(VacuumTaskContext &inTask)
351 {
352     int errCode = DoCommitAndQuitIfWaitStatusObserved(inTask);
353     if (errCode != E_OK) {
354         return errCode;
355     }
356     // No other thread will access handle, node and record field of a RUN_NING, PAUSE_WAIT, ABORT_WAIT status task
357     // So it is concurrency safe to access or change these field without protection of lockguard
358     const MultiVerRecordInfo &record = inTask.vacuumNeedRecords.front();
359     LOGD("[Vacuum][DealLeftRecord] Type=%" PRIu32 ", Version=%" PRIu64 ", HashKey=%s.",
360         static_cast<uint32_t>(record.type), record.version, VEC_TO_STR(record.hashKey));
361     if (inTask.shadowRecords.empty()) {
362         if (record.type == RecordType::CLEAR) {
363             errCode = inTask.databaseHandle->GetShadowRecordsOfClearTypeRecord(record.version, record.hashKey,
364                 inTask.shadowRecords);
365         } else {
366             errCode = inTask.databaseHandle->GetShadowRecordsOfNonClearTypeRecord(record.version, record.hashKey,
367                 inTask.shadowRecords);
368         }
369         if (errCode != E_OK) {
370             LOGE("[Vacuum][DealLeftRecord] GetShadowRecords fail, Type=%d, Version=%llu, HashKey=%s, errCode=%d.",
371                 static_cast<int>(record.type), ULL(record.version), VEC_TO_STR(record.hashKey), errCode);
372             DoRollBackAndFinish(inTask);
373             return errCode;
374         }
375     }
376 
377     while (!inTask.shadowRecords.empty()) {
378         errCode = DealWithLeftBranchShadowRecord(inTask);
379         if (errCode != E_OK) {
380             return errCode;
381         }
382     }
383 
384     // Every shadowRecords of this vacuumNeedRecord has been treated, mark this vacuumNeedRecord as vacuum done
385     errCode = StartTransactionIfNotYet(inTask);
386     if (errCode != E_OK) {
387         std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
388         FinishVaccumTask(inTask);
389         return errCode;
390     }
391     errCode = inTask.databaseHandle->MarkRecordAsVacuumDone(record.version, record.hashKey);
392     if (errCode != E_OK) {
393         LOGE("[Vacuum][DealLeftRecord] MarkRecordAsVacuumDone fail, Type=%d, Version=%llu, HashKey=%s, errCode=%d.",
394             static_cast<int>(record.type), ULL(record.version), VEC_TO_STR(record.hashKey), errCode);
395         DoRollBackAndFinish(inTask);
396         return errCode;
397     }
398     // Pop out this vacuumNeedRecord
399     inTask.vacuumNeedRecords.pop_front();
400     return E_OK;
401 }
402 
DealWithLeftBranchShadowRecord(VacuumTaskContext & inTask)403 int MultiVerVacuum::DealWithLeftBranchShadowRecord(VacuumTaskContext &inTask)
404 {
405     return DoDeleteRecordOfLeftShadowOrRightVacuumNeed(inTask, inTask.shadowRecords);
406 }
407 
DealWithRightBranchCommit(VacuumTaskContext & inTask)408 int MultiVerVacuum::DealWithRightBranchCommit(VacuumTaskContext &inTask)
409 {
410     return DoDealCommitOfLeftOrRight(inTask, inTask.rightBranchCommits, false);
411 }
412 
DealWithRightBranchVacuumNeedRecord(VacuumTaskContext & inTask)413 int MultiVerVacuum::DealWithRightBranchVacuumNeedRecord(VacuumTaskContext &inTask)
414 {
415     return DoDeleteRecordOfLeftShadowOrRightVacuumNeed(inTask, inTask.vacuumNeedRecords);
416 }
417 
DoDealCommitOfLeftOrRight(VacuumTaskContext & inTask,std::list<MultiVerCommitInfo> & commitList,bool isLeft)418 int MultiVerVacuum::DoDealCommitOfLeftOrRight(VacuumTaskContext &inTask, std::list<MultiVerCommitInfo> &commitList,
419     bool isLeft)
420 {
421     int errCode = DoCommitAndQuitIfWaitStatusObserved(inTask);
422     if (errCode != E_OK) {
423         return errCode;
424     }
425     // No other thread will access handle, node and record field of a RUN_NING, PAUSE_WAIT, ABORT_WAIT status task
426     // So it is concurrency safe to access or change these field without protection of lockguard
427     const MultiVerCommitInfo &commit = commitList.front();
428     LOGD("[Vacuum][DoDealCommit] Version=%llu, CommitId=%s, isLeft=%d.", ULL(commit.version),
429         VEC_TO_STR(commit.commitId), isLeft);
430     if (inTask.vacuumNeedRecords.empty()) {
431         errCode = inTask.databaseHandle->GetVacuumNeedRecordsByVersion(commit.version, inTask.vacuumNeedRecords);
432         if (errCode != E_OK) {
433             LOGE("[Vacuum][DoDealCommit] GetVacuumNeedRecordsByVersion fail, Version=%llu, CommitId=%s, isLeft=%d, "
434                 "errCode=%d.", ULL(commit.version), VEC_TO_STR(commit.commitId), isLeft, errCode);
435             DoRollBackAndFinish(inTask);
436             return errCode;
437         }
438     }
439 
440     while (!inTask.vacuumNeedRecords.empty()) {
441         if (isLeft) {
442             errCode = DealWithLeftBranchVacuumNeedRecord(inTask);
443         } else {
444             errCode = DealWithRightBranchVacuumNeedRecord(inTask);
445         }
446         if (errCode != E_OK) {
447             return errCode;
448         }
449     }
450 
451     // Every vacuumNeedRecords of this commit has been treated, mark this commit as vacuum done
452     errCode = StartTransactionIfNotYet(inTask);
453     if (errCode != E_OK) {
454         std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
455         FinishVaccumTask(inTask);
456         return errCode;
457     }
458     errCode = inTask.databaseHandle->MarkCommitAsVacuumDone(commit.commitId);
459     if (errCode != E_OK) {
460         LOGE("[Vacuum][DoDealCommit] MarkCommitAsVacuumDone fail, Version=%llu, CommitId=%s, isLeft=%d, errCode=%d.",
461             ULL(commit.version), VEC_TO_STR(commit.commitId), isLeft, errCode);
462         DoRollBackAndFinish(inTask);
463         return errCode;
464     }
465     // Pop out this commit
466     commitList.pop_front();
467     return E_OK;
468 }
469 
DoDeleteRecordOfLeftShadowOrRightVacuumNeed(VacuumTaskContext & inTask,std::list<MultiVerRecordInfo> & recordList)470 int MultiVerVacuum::DoDeleteRecordOfLeftShadowOrRightVacuumNeed(VacuumTaskContext &inTask,
471     std::list<MultiVerRecordInfo> &recordList)
472 {
473     int errCode = DoCommitAndQuitIfWaitStatusObserved(inTask);
474     if (errCode != E_OK) {
475         return errCode;
476     }
477     // No other thread will access handle, node and record field of a RUN_NING, PAUSE_WAIT, ABORT_WAIT status task
478     // So it is concurrency safe to access or change these field without protection of lockguard
479     const MultiVerRecordInfo &record = recordList.front();
480     LOGD("[Vacuum][DoDeleteRecord] Type=%u, Version=%llu, HashKey=%s.", static_cast<unsigned>(record.type),
481         ULL(record.version), VEC_TO_STR(record.hashKey));
482     errCode = StartTransactionIfNotYet(inTask);
483     if (errCode != E_OK) {
484         std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
485         FinishVaccumTask(inTask);
486         return errCode;
487     }
488     errCode = inTask.databaseHandle->DeleteRecordTotally(record.version, record.hashKey);
489     if (errCode != E_OK) {
490         LOGE("[Vacuum][DoDeleteRecord] DeleteRecordTotally fail, Type=%u, Version=%llu, HashKey=%s, errCode=%d.",
491             static_cast<unsigned>(record.type), ULL(record.version), VEC_TO_STR(record.hashKey), errCode);
492         DoRollBackAndFinish(inTask);
493         return errCode;
494     }
495     // Pop out this shadowRecord or vacuumNeedRecord
496     recordList.pop_front();
497     return E_OK;
498 }
499 
DoRollBackAndFinish(VacuumTaskContext & inTask)500 void MultiVerVacuum::DoRollBackAndFinish(VacuumTaskContext &inTask)
501 {
502     RollBackTransactionIfNeed(inTask);
503     std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
504     FinishVaccumTask(inTask);
505 }
506 
DoCommitAndQuitIfWaitStatusObserved(VacuumTaskContext & inTask)507 int MultiVerVacuum::DoCommitAndQuitIfWaitStatusObserved(VacuumTaskContext &inTask)
508 {
509     bool waitStatusObserved = false;
510     {
511         std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
512         if (inTask.status == VacuumTaskStatus::PAUSE_WAIT || inTask.status == VacuumTaskStatus::ABORT_WAIT) {
513             waitStatusObserved = true;
514         }
515     }
516     // Only this TaskThread will change a PAUSE_WAIT or ABORT_WAIT status to other status
517     // So here during the gap of miss-lockguard-protection, the status of this inTask will not change
518     if (waitStatusObserved) {
519         // CommitTransactionIfNeed may be an time cost operation, should not be called within the range of lockguard
520         int errCode = CommitTransactionIfNeed(inTask);
521         // Change status operation should be protected within the lockguard
522         std::lock_guard<std::mutex> vacuumTaskLockGuard(vacuumTaskMutex_);
523         if (errCode != E_OK) {
524             // If commit fail, just finish this task(commit fail auto rollback)
525             FinishVaccumTask(inTask);
526             return errCode;
527         }
528         if (inTask.status == VacuumTaskStatus::ABORT_WAIT) {
529             AbortVacuumTask(inTask);
530             return -E_TASK_BREAK_OFF;
531         }
532         // Nor commit fail, nor Abort_wait case, here is Pause_wait Case, just set status to Pause_done
533         inTask.status = VacuumTaskStatus::PAUSE_DONE;
534         return -E_TASK_BREAK_OFF;
535     }
536     return E_OK;
537 }
538 
StartTransactionIfNotYet(VacuumTaskContext & inTask)539 int MultiVerVacuum::StartTransactionIfNotYet(VacuumTaskContext &inTask)
540 {
541     if (!inTask.isTransactionStarted) {
542         int errCode = inTask.databaseHandle->StartTransactionForVacuum();
543         if (errCode != E_OK) {
544             LOGE("[Vacuum][StartTransact] StartTransactionForVacuum fail, errCode=%d.", errCode);
545             return errCode;
546         }
547         inTask.isTransactionStarted = true;
548     }
549     return E_OK;
550 }
551 
CommitTransactionIfNeed(VacuumTaskContext & inTask)552 int MultiVerVacuum::CommitTransactionIfNeed(VacuumTaskContext &inTask)
553 {
554     if (inTask.isTransactionStarted) {
555         // Whether CommitTransactionForVacuum fail or not, the transaction is ended.
556         inTask.isTransactionStarted = false;
557         int errCode = inTask.databaseHandle->CommitTransactionForVacuum();
558         if (errCode != E_OK) {
559             LOGE("[Vacuum][CommitTransact] CommitTransactionForVacuum fail, errCode=%d.", errCode);
560             return errCode;
561         }
562     }
563     return E_OK;
564 }
565 
RollBackTransactionIfNeed(VacuumTaskContext & inTask)566 void MultiVerVacuum::RollBackTransactionIfNeed(VacuumTaskContext &inTask)
567 {
568     if (inTask.isTransactionStarted) {
569         // Whether RollBackTransactionForVacuum fail or not, the transaction is ended.
570         inTask.isTransactionStarted = false;
571         int errCode = inTask.databaseHandle->RollBackTransactionForVacuum();
572         if (errCode != E_OK) {
573             LOGE("[Vacuum][RollBackTransact] RollBackTransactionForVacuum fail, errCode=%d.", errCode);
574         }
575     }
576 }
577 
FinishVaccumTask(VacuumTaskContext & inTask)578 void MultiVerVacuum::FinishVaccumTask(VacuumTaskContext &inTask)
579 {
580     inTask.status = VacuumTaskStatus::FINISH;
581     // It is OK to reset the autoRelaunchOnce. Since this is called when this task is RUN_NING status, all pause to
582     // this task will block and wait, and all continue to this task happens after we reset the autoRelaunchOnce
583     inTask.autoRelaunchOnce = false;
584     // Do not reset the databaseHandle while finish a task, because it will be reused after autoRelaunch
585     ResetNodeAndRecordContextInfo(inTask);
586 }
587 
RelaunchVacuumTask(VacuumTaskContext & inTask)588 void MultiVerVacuum::RelaunchVacuumTask(VacuumTaskContext &inTask)
589 {
590     inTask.status = VacuumTaskStatus::RUN_WAIT;
591     inTask.runWaitOrder = incRunWaitOrder_++; // Queue at the back
592     inTask.autoRelaunchOnce = false;
593     // Obviously can not reset the databaseHandle while relaunch a task
594     ResetNodeAndRecordContextInfo(inTask);
595 }
596 
AbortVacuumTask(VacuumTaskContext & inTask)597 void MultiVerVacuum::AbortVacuumTask(VacuumTaskContext &inTask)
598 {
599     inTask.status = VacuumTaskStatus::ABORT_DONE;
600     inTask.autoRelaunchOnce = false;
601     inTask.databaseHandle = nullptr; // reset handle in abort case
602     ResetNodeAndRecordContextInfo(inTask);
603 }
604 
ResetNodeAndRecordContextInfo(VacuumTaskContext & inTask)605 void MultiVerVacuum::ResetNodeAndRecordContextInfo(VacuumTaskContext &inTask)
606 {
607     inTask.leftBranchCommits.clear();
608     inTask.rightBranchCommits.clear();
609     inTask.vacuumNeedRecords.clear();
610     inTask.shadowRecords.clear();
611     inTask.isTransactionStarted = false;
612 }
613 
SearchVacuumTaskToExecute(std::string & outDbIdentifier)614 int MultiVerVacuum::SearchVacuumTaskToExecute(std::string &outDbIdentifier)
615 {
616     // Find a vacuum task with the smallest runWaitOrder among tasks that is in RUN_WAIT Status(Except In Error).
617     uint64_t minRunWaitOrder = UINT64_MAX;
618     for (auto &eachTask : dbMapVacuumTask_) {
619         LOGD("[Vacuum][Search] db=%s, status=%d, error=%d, relaunch=%d, immediate=%d, runWait=%llu, pauseCount=%llu.",
620             eachTask.first.c_str(), static_cast<int>(eachTask.second.status), eachTask.second.launchErrorHappen,
621             eachTask.second.autoRelaunchOnce, eachTask.second.immediatelyRelaunchable,
622             ULL(eachTask.second.runWaitOrder), ULL(eachTask.second.pauseNeedCount));
623         if (eachTask.second.status == VacuumTaskStatus::RUN_WAIT && !eachTask.second.launchErrorHappen) {
624             if (eachTask.second.runWaitOrder < minRunWaitOrder) {
625                 minRunWaitOrder = eachTask.second.runWaitOrder;
626                 outDbIdentifier = eachTask.first;
627             }
628         }
629     }
630     if (!outDbIdentifier.empty()) {
631         dbMapVacuumTask_[outDbIdentifier].status = VacuumTaskStatus::RUN_NING;
632         return E_OK;
633     } else {
634         return -E_NOT_FOUND;
635     }
636 }
637 
ActivateBackgroundVacuumTaskExecution()638 void MultiVerVacuum::ActivateBackgroundVacuumTaskExecution()
639 {
640     if (!isBackgroundVacuumTaskInExecution_) {
641         TaskAction backgroundTask = [this]() {
642             LOGI("[Vacuum][Activate] Begin Background Execution.");
643             VacuumTaskExecutor();
644             LOGI("[Vacuum][Activate] End Background Execution.");
645         };
646         int errCode = RuntimeContext::GetInstance()->ScheduleTask(backgroundTask);
647         if (errCode != E_OK) {
648             LOGE("[Vacuum][Activate] ScheduleTask failed, errCode = %d.", errCode);
649             return;
650         }
651         isBackgroundVacuumTaskInExecution_ = true;
652     }
653 }
654 
IncPauseNeedCount(VacuumTaskContext & inTask)655 void MultiVerVacuum::IncPauseNeedCount(VacuumTaskContext &inTask)
656 {
657     inTask.pauseNeedCount++;
658 }
659 
DecPauseNeedCount(VacuumTaskContext & inTask)660 void MultiVerVacuum::DecPauseNeedCount(VacuumTaskContext &inTask)
661 {
662     if (inTask.pauseNeedCount == 0) {
663         LOGE("[Vacuum][DecPause] PauseNeedCount Zero Before Decrease.");
664         return;
665     }
666     inTask.pauseNeedCount--;
667 }
668 
IsPauseNotNeed(VacuumTaskContext & inTask)669 bool MultiVerVacuum::IsPauseNotNeed(VacuumTaskContext &inTask)
670 {
671     return inTask.pauseNeedCount == 0;
672 }
673 } // namespace DistributedDB
674 #endif
675