1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef OMIT_MULTI_VER
17 #include "multi_ver_vacuum_executor_impl.h"
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "multi_ver_storage_executor.h"
21 
22 namespace DistributedDB {
namespace {
    // Record-type encoding inside MultiVerTrimedVersionData::operFlag, as decoded by GetRecordType():
    // the flag is first masked with MASK_FLAG and the result is compared with the type values below.
    // constexpr guarantees these are compile-time constants (usable in case labels / static_assert).
    constexpr uint64_t DEL_FLAG = 0x02; // Del type record flag in OperFlag
    constexpr uint64_t CLEAR_FLAG = 0x03; // Clear type record flag in OperFlag
    constexpr uint64_t MASK_FLAG = 0x07; // Mask selecting the record-type bits (low three bits) of OperFlag
}
28 
MultiVerVacuumExecutorImpl(MultiVerNaturalStore * multiKvDB)29 MultiVerVacuumExecutorImpl::MultiVerVacuumExecutorImpl(MultiVerNaturalStore *multiKvDB)
30     : multiKvDB_(multiKvDB), writeHandle_(nullptr)
31 {
32 }
33 
~MultiVerVacuumExecutorImpl()34 MultiVerVacuumExecutorImpl::~MultiVerVacuumExecutorImpl()
35 {
36     // In abnormal case that transaction not commit or rollback
37     if (multiKvDB_ != nullptr && writeHandle_ != nullptr) {
38         multiKvDB_->ReleaseHandle(writeHandle_, true);
39     }
40 }
41 
42 // Call this always beyond transaction
GetVacuumAbleCommits(std::list<MultiVerCommitInfo> & leftBranchCommits,std::list<MultiVerCommitInfo> & rightBranchCommits) const43 int MultiVerVacuumExecutorImpl::GetVacuumAbleCommits(std::list<MultiVerCommitInfo> &leftBranchCommits,
44     std::list<MultiVerCommitInfo> &rightBranchCommits) const
45 {
46     if (multiKvDB_ == nullptr) {
47         return -E_INVALID_DB;
48     }
49     if (writeHandle_ != nullptr) {
50         LOGE("[VacuumExec][GetCommit] Mis-Called Within Transaction");
51         return -E_NOT_PERMIT;
52     }
53 
54     // It will return at least zero, it's ok. return at most UINT64_MAX to means that all left commit are vacuumable.
55     uint64_t maxVersionOfVacuumAbleLeftCommit = multiKvDB_->GetMaxTrimmableVersion();
56 
57     int errCode = E_OK;
58     MultiVerStorageExecutor *readHandle = multiKvDB_->GetHandle(false, errCode, true);
59     if (errCode != E_OK || readHandle == nullptr) {
60         LOGE("[VacuumExec][GetCommit] GetHandle fail, errCode=%d", errCode);
61         return errCode;
62     }
63 
64     std::list<MultiVerCommitNode> commitsInTree;
65     errCode = readHandle->GetAllCommitsInTree(commitsInTree);
66     if (errCode != E_OK) {
67         LOGE("[VacuumExec][GetCommit] GetAllCommitsInTree fail, errCode=%d", errCode);
68         multiKvDB_->ReleaseHandle(readHandle, true);
69         return errCode;
70     }
71 
72     // As discussed and agreed, the commit in commitsInTree had already be sorted in descending order by version
73     for (auto &eachCommit : commitsInTree) {
74         if (eachCommit.isLocal) {
75             if (eachCommit.version > maxVersionOfVacuumAbleLeftCommit) {
76                 continue;
77             }
78             leftBranchCommits.emplace_back(MultiVerCommitInfo{eachCommit.version, eachCommit.commitId});
79         } else {
80             rightBranchCommits.emplace_back(MultiVerCommitInfo{eachCommit.version, eachCommit.commitId});
81         }
82     }
83 
84     multiKvDB_->ReleaseHandle(readHandle, true);
85     return E_OK;
86 }
87 
88 // Call this within or beyond transaction
GetVacuumNeedRecordsByVersion(uint64_t version,std::list<MultiVerRecordInfo> & vacuumNeedRecords)89 int MultiVerVacuumExecutorImpl::GetVacuumNeedRecordsByVersion(uint64_t version,
90     std::list<MultiVerRecordInfo> &vacuumNeedRecords)
91 {
92     if (multiKvDB_ == nullptr) {
93         return -E_INVALID_DB;
94     }
95     MultiVerStorageExecutor *handle = GetCorrectHandleForUse();
96     if (handle == nullptr) {
97         return -E_NO_RESOURCE_FOR_USE;
98     }
99 
100     std::list<MultiVerTrimedVersionData> recordsInCommit;
101     int errCode = handle->GetEntriesByVersion(version, recordsInCommit);
102     if (errCode != E_OK) {
103         LOGE("[VacuumExec][GetVacuumNeed] GetEntriesByVersion fail, errCode=%d", errCode);
104         ReleaseHandleIfNeed(handle);
105         return errCode;
106     }
107 
108     for (auto &eachRecord : recordsInCommit) {
109         vacuumNeedRecords.emplace_back(MultiVerRecordInfo{GetRecordType(eachRecord), eachRecord.version,
110             eachRecord.key});
111     }
112 
113     ReleaseHandleIfNeed(handle);
114     return E_OK;
115 }
116 
117 // Call this within or beyond transaction
GetShadowRecordsOfClearTypeRecord(uint64_t version,const std::vector<uint8_t> & hashKey,std::list<MultiVerRecordInfo> & shadowRecords)118 int MultiVerVacuumExecutorImpl::GetShadowRecordsOfClearTypeRecord(uint64_t version,
119     const std::vector<uint8_t> &hashKey, std::list<MultiVerRecordInfo> &shadowRecords)
120 {
121     if (multiKvDB_ == nullptr) {
122         return -E_INVALID_DB;
123     }
124     MultiVerStorageExecutor *handle = GetCorrectHandleForUse();
125     if (handle == nullptr) {
126         return -E_NO_RESOURCE_FOR_USE;
127     }
128 
129     std::list<MultiVerTrimedVersionData> clearShadowRecords;
130     int errCode = handle->GetOverwrittenClearTypeEntries(version, clearShadowRecords);
131     if (errCode != E_OK) {
132         LOGE("[VacuumExec][GetShadowClear] GetOverwrittenClearTypeEntries:%zu fail, err=%d", hashKey.size(), errCode);
133         ReleaseHandleIfNeed(handle);
134         return errCode;
135     }
136 
137     for (auto &eachRecord : clearShadowRecords) {
138         shadowRecords.emplace_back(MultiVerRecordInfo{GetRecordType(eachRecord), eachRecord.version, eachRecord.key});
139     }
140 
141     ReleaseHandleIfNeed(handle);
142     return E_OK;
143 }
144 
145 // Call this within or beyond transaction
GetShadowRecordsOfNonClearTypeRecord(uint64_t version,const std::vector<uint8_t> & hashKey,std::list<MultiVerRecordInfo> & shadowRecords)146 int MultiVerVacuumExecutorImpl::GetShadowRecordsOfNonClearTypeRecord(uint64_t version,
147     const std::vector<uint8_t> &hashKey, std::list<MultiVerRecordInfo> &shadowRecords)
148 {
149     if (multiKvDB_ == nullptr) {
150         return -E_INVALID_DB;
151     }
152     MultiVerStorageExecutor *handle = GetCorrectHandleForUse();
153     if (handle == nullptr) {
154         return -E_NO_RESOURCE_FOR_USE;
155     }
156 
157     std::list<MultiVerTrimedVersionData> nonClearShadowRecords;
158     int errCode = handle->GetOverwrittenNonClearTypeEntries(version, hashKey, nonClearShadowRecords);
159     if (errCode != E_OK) {
160         LOGE("[VacuumExec][GetShadowNonClear] GetOverwrittenNonClearTypeEntries fail, errCode=%d", errCode);
161         ReleaseHandleIfNeed(handle);
162         return errCode;
163     }
164 
165     for (auto &eachRecord : nonClearShadowRecords) {
166         shadowRecords.emplace_back(MultiVerRecordInfo{GetRecordType(eachRecord), eachRecord.version, eachRecord.key});
167     }
168 
169     ReleaseHandleIfNeed(handle);
170     return E_OK;
171 }
172 
173 // Call this before change the database
StartTransactionForVacuum()174 int MultiVerVacuumExecutorImpl::StartTransactionForVacuum()
175 {
176     if (multiKvDB_ == nullptr) {
177         return -E_INVALID_DB;
178     }
179     if (writeHandle_ != nullptr) {
180         LOGE("[VacuumExec][Start] Transaction Already Started.");
181         return -E_NOT_PERMIT;
182     }
183 
184     int errCode = E_OK;
185     writeHandle_ = multiKvDB_->GetHandle(true, errCode, true);
186     if (errCode != E_OK || writeHandle_ == nullptr) {
187         LOGE("[VacuumExec][Start] GetHandle fail, errCode=%d", errCode);
188         return errCode;
189     }
190 
191     errCode = writeHandle_->StartTransaction(MultiTransactionType::ALL_DATA);
192     if (errCode != E_OK) {
193         LOGE("[VacuumExec][Start] StartTransaction fail, errCode=%d", errCode);
194         multiKvDB_->ReleaseHandle(writeHandle_, true);
195         writeHandle_ = nullptr;
196         return errCode;
197     }
198     return E_OK;
199 }
200 
201 // Call this if nothing error happened, if this itself failed, do not need to call rollback
CommitTransactionForVacuum()202 int MultiVerVacuumExecutorImpl::CommitTransactionForVacuum()
203 {
204     if (multiKvDB_ == nullptr) {
205         return -E_INVALID_DB;
206     }
207     if (writeHandle_ == nullptr) {
208         LOGE("[VacuumExec][Commit] Transaction Had Not Been Started.");
209         return -E_NOT_PERMIT;
210     }
211 
212     int errCode = writeHandle_->CommitTransaction(MultiTransactionType::ALL_DATA);
213     if (errCode != E_OK) {
214         // Commit fail do not need to call rollback which is automatically
215         LOGE("[VacuumExec][Commit] CommitTransaction fail, errCode=%d", errCode);
216     }
217     multiKvDB_->ReleaseHandle(writeHandle_, true);
218     writeHandle_ = nullptr;
219     return errCode;
220 }
221 
222 // Call this if anything wrong happened after start transaction except commit fail
RollBackTransactionForVacuum()223 int MultiVerVacuumExecutorImpl::RollBackTransactionForVacuum()
224 {
225     if (multiKvDB_ == nullptr) {
226         return -E_INVALID_DB;
227     }
228     if (writeHandle_ == nullptr) {
229         LOGE("[VacuumExec][RollBack] Transaction Had Not Been Started.");
230         return -E_NOT_PERMIT;
231     }
232 
233     int errCode = writeHandle_->RollBackTransaction(MultiTransactionType::ALL_DATA);
234     if (errCode != E_OK) {
235         LOGE("[VacuumExec][RollBack] RollBackTransaction fail, errCode=%d", errCode);
236     }
237     multiKvDB_->ReleaseHandle(writeHandle_, true);
238     writeHandle_ = nullptr;
239     return errCode;
240 }
241 
242 // Call this always within transaction
DeleteRecordTotally(uint64_t version,const std::vector<uint8_t> & hashKey)243 int MultiVerVacuumExecutorImpl::DeleteRecordTotally(uint64_t version, const std::vector<uint8_t> &hashKey)
244 {
245     if (multiKvDB_ == nullptr) {
246         return -E_INVALID_DB;
247     }
248     if (writeHandle_ == nullptr) {
249         LOGE("[VacuumExec][Delete] Transaction Had Not Been Started.");
250         return -E_NOT_PERMIT;
251     }
252 
253     int errCode = writeHandle_->DeleteEntriesByHashKey(version, hashKey);
254     if (errCode != E_OK) {
255         LOGE("[VacuumExec][Delete] DeleteEntriesByHashKey fail, errCode=%d", errCode);
256     }
257     return errCode;
258 }
259 
260 // Call this always within transaction
MarkRecordAsVacuumDone(uint64_t version,const std::vector<uint8_t> & hashKey)261 int MultiVerVacuumExecutorImpl::MarkRecordAsVacuumDone(uint64_t version, const std::vector<uint8_t> &hashKey)
262 {
263     if (multiKvDB_ == nullptr) {
264         return -E_INVALID_DB;
265     }
266     if (writeHandle_ == nullptr) {
267         LOGE("[VacuumExec][MarkRecord] Transaction Had Not Been Started.");
268         return -E_NOT_PERMIT;
269     }
270 
271     int errCode = writeHandle_->UpdateTrimedFlag(version, hashKey);
272     if (errCode != E_OK) {
273         LOGE("[VacuumExec][MarkRecord] UpdateTrimedFlag fail, errCode=%d", errCode);
274     }
275     return errCode;
276 }
277 
278 // Call this always within transaction
MarkCommitAsVacuumDone(const std::vector<uint8_t> & commitId)279 int MultiVerVacuumExecutorImpl::MarkCommitAsVacuumDone(const std::vector<uint8_t> &commitId)
280 {
281     if (multiKvDB_ == nullptr) {
282         return -E_INVALID_DB;
283     }
284     if (writeHandle_ == nullptr) {
285         LOGE("[VacuumExec][MarkCommit] Transaction Had Not Been Started.");
286         return -E_NOT_PERMIT;
287     }
288 
289     int errCode = writeHandle_->UpdateTrimedFlag(commitId);
290     if (errCode != E_OK) {
291         LOGE("[VacuumExec][MarkCommit] UpdateTrimedFlag fail, errCode=%d", errCode);
292     }
293     return errCode;
294 }
295 
GetCorrectHandleForUse() const296 MultiVerStorageExecutor *MultiVerVacuumExecutorImpl::GetCorrectHandleForUse() const
297 {
298     if (writeHandle_ != nullptr) {
299         return writeHandle_;
300     }
301     int errCode = E_OK;
302     MultiVerStorageExecutor *handle = multiKvDB_->GetHandle(false, errCode, true);
303     if (errCode != E_OK || handle == nullptr) {
304         LOGE("[VacuumExec][GetHandle] GetHandle fail, errCode=%d", errCode);
305         return nullptr;
306     }
307     return handle;
308 }
309 
ReleaseHandleIfNeed(MultiVerStorageExecutor * inHandle)310 void MultiVerVacuumExecutorImpl::ReleaseHandleIfNeed(MultiVerStorageExecutor *inHandle)
311 {
312     if (inHandle != writeHandle_) {
313         multiKvDB_->ReleaseHandle(inHandle, true);
314     }
315 }
316 
GetRecordType(const MultiVerTrimedVersionData & inRecord) const317 RecordType MultiVerVacuumExecutorImpl::GetRecordType(const MultiVerTrimedVersionData &inRecord) const
318 {
319     if ((inRecord.operFlag & MASK_FLAG) == CLEAR_FLAG) {
320         return RecordType::CLEAR;
321     } else if ((inRecord.operFlag & MASK_FLAG) == DEL_FLAG) {
322         return RecordType::DELETE;
323     } else {
324         return RecordType::VALID;
325     }
326 }
327 } // namespace DistributedDB
328 #endif