1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifndef OMIT_MULTI_VER
17 #include "multi_ver_vacuum_executor_impl.h"
18 #include "db_errno.h"
19 #include "log_print.h"
20 #include "multi_ver_storage_executor.h"
21
22 namespace DistributedDB {
namespace {
// Record-type values carried in a record's OperFlag; they are compared after masking with MASK_FLAG.
constexpr uint64_t DEL_FLAG = 0x02; // Del type record flag in OperFlag
constexpr uint64_t CLEAR_FLAG = 0x03; // Clear type record flag in OperFlag
constexpr uint64_t MASK_FLAG = 0x07; // Mask applied to OperFlag before comparing against the type flags
}
28
MultiVerVacuumExecutorImpl(MultiVerNaturalStore * multiKvDB)29 MultiVerVacuumExecutorImpl::MultiVerVacuumExecutorImpl(MultiVerNaturalStore *multiKvDB)
30 : multiKvDB_(multiKvDB), writeHandle_(nullptr)
31 {
32 }
33
~MultiVerVacuumExecutorImpl()34 MultiVerVacuumExecutorImpl::~MultiVerVacuumExecutorImpl()
35 {
36 // In abnormal case that transaction not commit or rollback
37 if (multiKvDB_ != nullptr && writeHandle_ != nullptr) {
38 multiKvDB_->ReleaseHandle(writeHandle_, true);
39 }
40 }
41
42 // Call this always beyond transaction
GetVacuumAbleCommits(std::list<MultiVerCommitInfo> & leftBranchCommits,std::list<MultiVerCommitInfo> & rightBranchCommits) const43 int MultiVerVacuumExecutorImpl::GetVacuumAbleCommits(std::list<MultiVerCommitInfo> &leftBranchCommits,
44 std::list<MultiVerCommitInfo> &rightBranchCommits) const
45 {
46 if (multiKvDB_ == nullptr) {
47 return -E_INVALID_DB;
48 }
49 if (writeHandle_ != nullptr) {
50 LOGE("[VacuumExec][GetCommit] Mis-Called Within Transaction");
51 return -E_NOT_PERMIT;
52 }
53
54 // It will return at least zero, it's ok. return at most UINT64_MAX to means that all left commit are vacuumable.
55 uint64_t maxVersionOfVacuumAbleLeftCommit = multiKvDB_->GetMaxTrimmableVersion();
56
57 int errCode = E_OK;
58 MultiVerStorageExecutor *readHandle = multiKvDB_->GetHandle(false, errCode, true);
59 if (errCode != E_OK || readHandle == nullptr) {
60 LOGE("[VacuumExec][GetCommit] GetHandle fail, errCode=%d", errCode);
61 return errCode;
62 }
63
64 std::list<MultiVerCommitNode> commitsInTree;
65 errCode = readHandle->GetAllCommitsInTree(commitsInTree);
66 if (errCode != E_OK) {
67 LOGE("[VacuumExec][GetCommit] GetAllCommitsInTree fail, errCode=%d", errCode);
68 multiKvDB_->ReleaseHandle(readHandle, true);
69 return errCode;
70 }
71
72 // As discussed and agreed, the commit in commitsInTree had already be sorted in descending order by version
73 for (auto &eachCommit : commitsInTree) {
74 if (eachCommit.isLocal) {
75 if (eachCommit.version > maxVersionOfVacuumAbleLeftCommit) {
76 continue;
77 }
78 leftBranchCommits.emplace_back(MultiVerCommitInfo{eachCommit.version, eachCommit.commitId});
79 } else {
80 rightBranchCommits.emplace_back(MultiVerCommitInfo{eachCommit.version, eachCommit.commitId});
81 }
82 }
83
84 multiKvDB_->ReleaseHandle(readHandle, true);
85 return E_OK;
86 }
87
88 // Call this within or beyond transaction
GetVacuumNeedRecordsByVersion(uint64_t version,std::list<MultiVerRecordInfo> & vacuumNeedRecords)89 int MultiVerVacuumExecutorImpl::GetVacuumNeedRecordsByVersion(uint64_t version,
90 std::list<MultiVerRecordInfo> &vacuumNeedRecords)
91 {
92 if (multiKvDB_ == nullptr) {
93 return -E_INVALID_DB;
94 }
95 MultiVerStorageExecutor *handle = GetCorrectHandleForUse();
96 if (handle == nullptr) {
97 return -E_NO_RESOURCE_FOR_USE;
98 }
99
100 std::list<MultiVerTrimedVersionData> recordsInCommit;
101 int errCode = handle->GetEntriesByVersion(version, recordsInCommit);
102 if (errCode != E_OK) {
103 LOGE("[VacuumExec][GetVacuumNeed] GetEntriesByVersion fail, errCode=%d", errCode);
104 ReleaseHandleIfNeed(handle);
105 return errCode;
106 }
107
108 for (auto &eachRecord : recordsInCommit) {
109 vacuumNeedRecords.emplace_back(MultiVerRecordInfo{GetRecordType(eachRecord), eachRecord.version,
110 eachRecord.key});
111 }
112
113 ReleaseHandleIfNeed(handle);
114 return E_OK;
115 }
116
117 // Call this within or beyond transaction
GetShadowRecordsOfClearTypeRecord(uint64_t version,const std::vector<uint8_t> & hashKey,std::list<MultiVerRecordInfo> & shadowRecords)118 int MultiVerVacuumExecutorImpl::GetShadowRecordsOfClearTypeRecord(uint64_t version,
119 const std::vector<uint8_t> &hashKey, std::list<MultiVerRecordInfo> &shadowRecords)
120 {
121 if (multiKvDB_ == nullptr) {
122 return -E_INVALID_DB;
123 }
124 MultiVerStorageExecutor *handle = GetCorrectHandleForUse();
125 if (handle == nullptr) {
126 return -E_NO_RESOURCE_FOR_USE;
127 }
128
129 std::list<MultiVerTrimedVersionData> clearShadowRecords;
130 int errCode = handle->GetOverwrittenClearTypeEntries(version, clearShadowRecords);
131 if (errCode != E_OK) {
132 LOGE("[VacuumExec][GetShadowClear] GetOverwrittenClearTypeEntries:%zu fail, err=%d", hashKey.size(), errCode);
133 ReleaseHandleIfNeed(handle);
134 return errCode;
135 }
136
137 for (auto &eachRecord : clearShadowRecords) {
138 shadowRecords.emplace_back(MultiVerRecordInfo{GetRecordType(eachRecord), eachRecord.version, eachRecord.key});
139 }
140
141 ReleaseHandleIfNeed(handle);
142 return E_OK;
143 }
144
145 // Call this within or beyond transaction
GetShadowRecordsOfNonClearTypeRecord(uint64_t version,const std::vector<uint8_t> & hashKey,std::list<MultiVerRecordInfo> & shadowRecords)146 int MultiVerVacuumExecutorImpl::GetShadowRecordsOfNonClearTypeRecord(uint64_t version,
147 const std::vector<uint8_t> &hashKey, std::list<MultiVerRecordInfo> &shadowRecords)
148 {
149 if (multiKvDB_ == nullptr) {
150 return -E_INVALID_DB;
151 }
152 MultiVerStorageExecutor *handle = GetCorrectHandleForUse();
153 if (handle == nullptr) {
154 return -E_NO_RESOURCE_FOR_USE;
155 }
156
157 std::list<MultiVerTrimedVersionData> nonClearShadowRecords;
158 int errCode = handle->GetOverwrittenNonClearTypeEntries(version, hashKey, nonClearShadowRecords);
159 if (errCode != E_OK) {
160 LOGE("[VacuumExec][GetShadowNonClear] GetOverwrittenNonClearTypeEntries fail, errCode=%d", errCode);
161 ReleaseHandleIfNeed(handle);
162 return errCode;
163 }
164
165 for (auto &eachRecord : nonClearShadowRecords) {
166 shadowRecords.emplace_back(MultiVerRecordInfo{GetRecordType(eachRecord), eachRecord.version, eachRecord.key});
167 }
168
169 ReleaseHandleIfNeed(handle);
170 return E_OK;
171 }
172
173 // Call this before change the database
StartTransactionForVacuum()174 int MultiVerVacuumExecutorImpl::StartTransactionForVacuum()
175 {
176 if (multiKvDB_ == nullptr) {
177 return -E_INVALID_DB;
178 }
179 if (writeHandle_ != nullptr) {
180 LOGE("[VacuumExec][Start] Transaction Already Started.");
181 return -E_NOT_PERMIT;
182 }
183
184 int errCode = E_OK;
185 writeHandle_ = multiKvDB_->GetHandle(true, errCode, true);
186 if (errCode != E_OK || writeHandle_ == nullptr) {
187 LOGE("[VacuumExec][Start] GetHandle fail, errCode=%d", errCode);
188 return errCode;
189 }
190
191 errCode = writeHandle_->StartTransaction(MultiTransactionType::ALL_DATA);
192 if (errCode != E_OK) {
193 LOGE("[VacuumExec][Start] StartTransaction fail, errCode=%d", errCode);
194 multiKvDB_->ReleaseHandle(writeHandle_, true);
195 writeHandle_ = nullptr;
196 return errCode;
197 }
198 return E_OK;
199 }
200
201 // Call this if nothing error happened, if this itself failed, do not need to call rollback
CommitTransactionForVacuum()202 int MultiVerVacuumExecutorImpl::CommitTransactionForVacuum()
203 {
204 if (multiKvDB_ == nullptr) {
205 return -E_INVALID_DB;
206 }
207 if (writeHandle_ == nullptr) {
208 LOGE("[VacuumExec][Commit] Transaction Had Not Been Started.");
209 return -E_NOT_PERMIT;
210 }
211
212 int errCode = writeHandle_->CommitTransaction(MultiTransactionType::ALL_DATA);
213 if (errCode != E_OK) {
214 // Commit fail do not need to call rollback which is automatically
215 LOGE("[VacuumExec][Commit] CommitTransaction fail, errCode=%d", errCode);
216 }
217 multiKvDB_->ReleaseHandle(writeHandle_, true);
218 writeHandle_ = nullptr;
219 return errCode;
220 }
221
222 // Call this if anything wrong happened after start transaction except commit fail
RollBackTransactionForVacuum()223 int MultiVerVacuumExecutorImpl::RollBackTransactionForVacuum()
224 {
225 if (multiKvDB_ == nullptr) {
226 return -E_INVALID_DB;
227 }
228 if (writeHandle_ == nullptr) {
229 LOGE("[VacuumExec][RollBack] Transaction Had Not Been Started.");
230 return -E_NOT_PERMIT;
231 }
232
233 int errCode = writeHandle_->RollBackTransaction(MultiTransactionType::ALL_DATA);
234 if (errCode != E_OK) {
235 LOGE("[VacuumExec][RollBack] RollBackTransaction fail, errCode=%d", errCode);
236 }
237 multiKvDB_->ReleaseHandle(writeHandle_, true);
238 writeHandle_ = nullptr;
239 return errCode;
240 }
241
242 // Call this always within transaction
DeleteRecordTotally(uint64_t version,const std::vector<uint8_t> & hashKey)243 int MultiVerVacuumExecutorImpl::DeleteRecordTotally(uint64_t version, const std::vector<uint8_t> &hashKey)
244 {
245 if (multiKvDB_ == nullptr) {
246 return -E_INVALID_DB;
247 }
248 if (writeHandle_ == nullptr) {
249 LOGE("[VacuumExec][Delete] Transaction Had Not Been Started.");
250 return -E_NOT_PERMIT;
251 }
252
253 int errCode = writeHandle_->DeleteEntriesByHashKey(version, hashKey);
254 if (errCode != E_OK) {
255 LOGE("[VacuumExec][Delete] DeleteEntriesByHashKey fail, errCode=%d", errCode);
256 }
257 return errCode;
258 }
259
260 // Call this always within transaction
MarkRecordAsVacuumDone(uint64_t version,const std::vector<uint8_t> & hashKey)261 int MultiVerVacuumExecutorImpl::MarkRecordAsVacuumDone(uint64_t version, const std::vector<uint8_t> &hashKey)
262 {
263 if (multiKvDB_ == nullptr) {
264 return -E_INVALID_DB;
265 }
266 if (writeHandle_ == nullptr) {
267 LOGE("[VacuumExec][MarkRecord] Transaction Had Not Been Started.");
268 return -E_NOT_PERMIT;
269 }
270
271 int errCode = writeHandle_->UpdateTrimedFlag(version, hashKey);
272 if (errCode != E_OK) {
273 LOGE("[VacuumExec][MarkRecord] UpdateTrimedFlag fail, errCode=%d", errCode);
274 }
275 return errCode;
276 }
277
278 // Call this always within transaction
MarkCommitAsVacuumDone(const std::vector<uint8_t> & commitId)279 int MultiVerVacuumExecutorImpl::MarkCommitAsVacuumDone(const std::vector<uint8_t> &commitId)
280 {
281 if (multiKvDB_ == nullptr) {
282 return -E_INVALID_DB;
283 }
284 if (writeHandle_ == nullptr) {
285 LOGE("[VacuumExec][MarkCommit] Transaction Had Not Been Started.");
286 return -E_NOT_PERMIT;
287 }
288
289 int errCode = writeHandle_->UpdateTrimedFlag(commitId);
290 if (errCode != E_OK) {
291 LOGE("[VacuumExec][MarkCommit] UpdateTrimedFlag fail, errCode=%d", errCode);
292 }
293 return errCode;
294 }
295
GetCorrectHandleForUse() const296 MultiVerStorageExecutor *MultiVerVacuumExecutorImpl::GetCorrectHandleForUse() const
297 {
298 if (writeHandle_ != nullptr) {
299 return writeHandle_;
300 }
301 int errCode = E_OK;
302 MultiVerStorageExecutor *handle = multiKvDB_->GetHandle(false, errCode, true);
303 if (errCode != E_OK || handle == nullptr) {
304 LOGE("[VacuumExec][GetHandle] GetHandle fail, errCode=%d", errCode);
305 return nullptr;
306 }
307 return handle;
308 }
309
ReleaseHandleIfNeed(MultiVerStorageExecutor * inHandle)310 void MultiVerVacuumExecutorImpl::ReleaseHandleIfNeed(MultiVerStorageExecutor *inHandle)
311 {
312 if (inHandle != writeHandle_) {
313 multiKvDB_->ReleaseHandle(inHandle, true);
314 }
315 }
316
GetRecordType(const MultiVerTrimedVersionData & inRecord) const317 RecordType MultiVerVacuumExecutorImpl::GetRecordType(const MultiVerTrimedVersionData &inRecord) const
318 {
319 if ((inRecord.operFlag & MASK_FLAG) == CLEAR_FLAG) {
320 return RecordType::CLEAR;
321 } else if ((inRecord.operFlag & MASK_FLAG) == DEL_FLAG) {
322 return RecordType::DELETE;
323 } else {
324 return RecordType::VALID;
325 }
326 }
327 } // namespace DistributedDB
328 #endif