1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "rd_single_ver_natural_store_connection.h"
16 #include "rd_single_ver_result_set.h"
17
18 #include <algorithm>
19
20 #include "db_common.h"
21 #include "db_constant.h"
22 #include "db_dfx_adapter.h"
23 #include "db_errno.h"
24 #include "get_query_info.h"
25 #include "kvdb_observer_handle.h"
26 #include "kvdb_pragma.h"
27 #include "log_print.h"
28 #include "store_types.h"
29 #include "sqlite_single_ver_storage_engine.h"
30
31 namespace DistributedDB {
32
RdSingleVerNaturalStoreConnection(RdSingleVerNaturalStore * kvDB)33 RdSingleVerNaturalStoreConnection::RdSingleVerNaturalStoreConnection(RdSingleVerNaturalStore *kvDB)
34 : SingleVerNaturalStoreConnection(kvDB), committedData_(nullptr), writeHandle_(nullptr)
35 {
36 LOGD("RdSingleVerNaturalStoreConnection Created");
37 }
38
~RdSingleVerNaturalStoreConnection()39 RdSingleVerNaturalStoreConnection::~RdSingleVerNaturalStoreConnection()
40 {
41 }
42
GetEntries(const IOption & option,const Key & keyPrefix,std::vector<Entry> & entries) const43 int RdSingleVerNaturalStoreConnection::GetEntries(const IOption &option, const Key &keyPrefix,
44 std::vector<Entry> &entries) const
45 {
46 if (option.dataType != IOption::SYNC_DATA) {
47 LOGE("[RdSingleVerStorageExecutor][GetEntries] IOption only support SYNC_DATA type.");
48 return -E_NOT_SUPPORT;
49 }
50 return GetEntriesInner(true, option, keyPrefix, entries);
51 }
52
CheckOption(const IOption & option,SingleVerDataType & type)53 int RdSingleVerNaturalStoreConnection::CheckOption(const IOption &option, SingleVerDataType &type)
54 {
55 if (option.dataType == IOption::SYNC_DATA) {
56 type = SingleVerDataType::SYNC_TYPE;
57 } else {
58 LOGE("IOption only support SYNC_DATA type.");
59 return -E_NOT_SUPPORT;
60 }
61 return E_OK;
62 }
63
GetEntriesInner(bool isGetValue,const IOption & option,const Key & keyPrefix,std::vector<Entry> & entries) const64 int RdSingleVerNaturalStoreConnection::GetEntriesInner(bool isGetValue, const IOption &option,
65 const Key &keyPrefix, std::vector<Entry> &entries) const
66 {
67 if (keyPrefix.size() > DBConstant::MAX_KEY_SIZE) {
68 return -E_INVALID_ARGS;
69 }
70
71 SingleVerDataType type;
72 int errCode = CheckOption(option, type);
73 if (errCode != E_OK) {
74 return errCode;
75 }
76
77 DBDfxAdapter::StartTracing();
78 {
79 std::lock_guard<std::mutex> lock(transactionMutex_);
80 if (writeHandle_ != nullptr) {
81 LOGD("[RdSingleVerNaturalStoreConnection] Transaction started already.");
82 errCode = writeHandle_->GetEntries(KV_SCAN_PREFIX, std::pair<Key, Key>(keyPrefix, {}), entries);
83 DBDfxAdapter::FinishTracing();
84 return errCode;
85 }
86 }
87 RdSingleVerStorageExecutor *handle = GetExecutor(false, errCode);
88 if (handle == nullptr) {
89 LOGE("[RdSingleVerNaturalStoreConnection]::[GetEntries] Get executor failed, errCode = [%d]", errCode);
90 DBDfxAdapter::FinishTracing();
91 return errCode;
92 }
93 errCode = handle->GetEntries(KV_SCAN_PREFIX, std::pair<Key, Key>(keyPrefix, {}), entries);
94 ReleaseExecutor(handle);
95 DBDfxAdapter::FinishTracing();
96 return errCode;
97 }
98
GetResultSet(const IOption & option,const Key & keyPrefix,IKvDBResultSet * & resultSet) const99 int RdSingleVerNaturalStoreConnection::GetResultSet(const IOption &option, const Key &keyPrefix,
100 IKvDBResultSet *&resultSet) const
101 {
102 // maximum of result set size is 4
103 std::lock_guard<std::mutex> lock(kvDbResultSetsMutex_);
104 if (kvDbResultSets_.size() >= MAX_RESULTSET_SIZE) {
105 LOGE("Over max result set size");
106 return -E_MAX_LIMITS;
107 }
108
109 RdSingleVerNaturalStore *naturalStore = GetDB<RdSingleVerNaturalStore>();
110 if (naturalStore == nullptr) {
111 return -E_INVALID_DB;
112 }
113
114 RdSingleVerResultSet *tmpResultSet = new (std::nothrow) RdSingleVerResultSet(naturalStore, keyPrefix);
115 if (tmpResultSet == nullptr) {
116 LOGE("Create single version result set failed.");
117 return -E_OUT_OF_MEMORY;
118 }
119 int errCode = tmpResultSet->Open(false);
120 if (errCode != E_OK) {
121 delete tmpResultSet;
122 resultSet = nullptr;
123 tmpResultSet = nullptr;
124 LOGE("Open result set failed.");
125 return errCode;
126 }
127 resultSet = (IKvDBResultSet *)tmpResultSet;
128 kvDbResultSets_.insert(resultSet);
129 return E_OK;
130 }
131
PrintResultsetKeys(const QueryExpression & queryExpression)132 static void PrintResultsetKeys(const QueryExpression &queryExpression)
133 {
134 std::vector<uint8_t> beginKeyVec = queryExpression.GetBeginKey();
135 std::vector<uint8_t> endKeyVec = queryExpression.GetEndKey();
136 std::string beginKey;
137 std::string endKey;
138 if (beginKeyVec.size() == 0) {
139 beginKey = "NULL";
140 } else {
141 beginKey.assign(beginKeyVec.begin(), beginKeyVec.end());
142 }
143 if (endKeyVec.size() == 0) {
144 endKey = "NULL";
145 } else {
146 endKey.assign(endKeyVec.begin(), endKeyVec.end());
147 }
148
149 LOGD("begin key: %s, end key: %s", beginKey.c_str(), endKey.c_str());
150 }
151
GetResultSet(const IOption & option,const Query & query,IKvDBResultSet * & resultSet) const152 int RdSingleVerNaturalStoreConnection::GetResultSet(const IOption &option,
153 const Query &query, IKvDBResultSet *&resultSet) const
154 {
155 // maximum of result set size is 4
156 std::lock_guard<std::mutex> lock(kvDbResultSetsMutex_);
157 if (kvDbResultSets_.size() >= MAX_RESULTSET_SIZE) {
158 LOGE("Over max result set size");
159 return -E_MAX_LIMITS;
160 }
161
162 RdSingleVerNaturalStore *naturalStore = GetDB<RdSingleVerNaturalStore>();
163 if (naturalStore == nullptr) {
164 return -E_INVALID_DB;
165 }
166 QueryExpression queryExpression = GetQueryInfo::GetQueryExpression(query);
167 int errCode = GetQueryInfo::GetQueryExpression(query).RangeParamCheck();
168 if (errCode != E_OK) {
169 return errCode;
170 }
171 RdSingleVerResultSet *tmpResultSet = new (std::nothrow) RdSingleVerResultSet(naturalStore,
172 queryExpression.GetBeginKey(), queryExpression.GetEndKey(), KV_SCAN_RANGE);
173 if (tmpResultSet == nullptr) {
174 LOGE("Create single version result set failed.");
175 return -E_OUT_OF_MEMORY;
176 }
177 errCode = tmpResultSet->Open(false);
178 if (errCode != E_OK) {
179 delete tmpResultSet;
180 resultSet = nullptr;
181 tmpResultSet = nullptr;
182 LOGE("Open result set failed.");
183 return errCode;
184 }
185 resultSet = (IKvDBResultSet *)tmpResultSet;
186 kvDbResultSets_.insert(resultSet);
187 return E_OK;
188 }
189
190
ReleaseResultSet(IKvDBResultSet * & resultSet)191 void RdSingleVerNaturalStoreConnection::ReleaseResultSet(IKvDBResultSet *&resultSet)
192 {
193 if (resultSet == nullptr) {
194 return;
195 }
196 RdSingleVerResultSet *tmpResultSet = (RdSingleVerResultSet *)resultSet;
197 int errCode = tmpResultSet->Close();
198 if (errCode != E_OK) {
199 LOGE("Open result set failed.");
200 return;
201 }
202 std::lock_guard<std::mutex> lock(kvDbResultSetsMutex_);
203 kvDbResultSets_.erase(resultSet);
204 delete resultSet;
205 resultSet = nullptr;
206 return;
207 }
208
Pragma(int cmd,void * parameter)209 int RdSingleVerNaturalStoreConnection::Pragma(int cmd, void *parameter)
210 {
211 switch (cmd) {
212 case PRAGMA_EXEC_CHECKPOINT:
213 return ForceCheckPoint();
214 default:
215 break;
216 }
217 LOGD("Rd Pragma only support check point for now:%d", cmd);
218 return -E_NOT_SUPPORT;
219 }
220
TranslateObserverModeToEventTypes(unsigned mode,std::list<int> & eventTypes) const221 int RdSingleVerNaturalStoreConnection::TranslateObserverModeToEventTypes(
222 unsigned mode, std::list<int> &eventTypes) const
223 {
224 int errCode = E_OK;
225 switch (mode) {
226 case static_cast<unsigned>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_PUT_EVENT):
227 eventTypes.push_back(static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_PUT_EVENT));
228 break;
229 default:
230 errCode = -E_NOT_SUPPORT;
231 break;
232 }
233 return errCode;
234 }
235
ForceCheckPoint() const236 int RdSingleVerNaturalStoreConnection::ForceCheckPoint() const
237 {
238 int errCode = E_OK;
239 RdSingleVerStorageExecutor *handle = GetExecutor(false, errCode);
240 if (handle == nullptr) {
241 LOGE("[Connection]::[GetEntries] Get executor failed, errCode = [%d]", errCode);
242 return errCode;
243 }
244
245 errCode = handle->ForceCheckPoint();
246 ReleaseExecutor(handle);
247 return errCode;
248 }
249
CommitAndReleaseNotifyData(SingleVerNaturalStoreCommitNotifyData * & committedData,bool isNeedCommit,int eventType)250 void RdSingleVerNaturalStoreConnection::CommitAndReleaseNotifyData(
251 SingleVerNaturalStoreCommitNotifyData *&committedData, bool isNeedCommit, int eventType)
252 {
253 RdSingleVerNaturalStore *naturalStore = GetDB<RdSingleVerNaturalStore>();
254 if ((naturalStore != nullptr) && (committedData != nullptr)) {
255 if (isNeedCommit) {
256 if (!committedData->IsChangedDataEmpty()) {
257 naturalStore->CommitNotify(eventType, committedData);
258 }
259 }
260 }
261 ReleaseCommitData(committedData);
262 }
263
Get(const IOption & option,const Key & key,Value & value) const264 int RdSingleVerNaturalStoreConnection::Get(const IOption &option, const Key &key, Value &value) const
265 {
266 SingleVerDataType dataType;
267 int errCode = CheckOption(option, dataType);
268 if (errCode != E_OK) {
269 return errCode;
270 }
271
272 if (key.size() > DBConstant::MAX_KEY_SIZE || key.empty()) {
273 return -E_INVALID_ARGS;
274 }
275
276 errCode = CheckReadDataControlled();
277 if (errCode != E_OK) {
278 LOGE("[Get] Do not allowed to read data with errCode = [%d]!", errCode);
279 return errCode;
280 }
281
282 DBDfxAdapter::StartTracing();
283 {
284 // need to check if the transaction started
285 std::lock_guard<std::mutex> lock(transactionMutex_);
286 if (writeHandle_ != nullptr) {
287 Timestamp recordTimestamp;
288 errCode = writeHandle_->GetKvData(dataType, key, value, recordTimestamp);
289 if (errCode != E_OK) {
290 LOGE("[RdSingleVerStorageExecutor][Get] Cannot get the data %d", errCode);
291 }
292 DBDfxAdapter::FinishTracing();
293 return errCode;
294 }
295 }
296 RdSingleVerStorageExecutor *handle = GetExecutor(false, errCode);
297 if (handle == nullptr) {
298 DBDfxAdapter::FinishTracing();
299 return errCode;
300 }
301
302 Timestamp timestamp;
303 errCode = handle->GetKvData(dataType, key, value, timestamp);
304 if (errCode != E_OK) {
305 LOGE("[RdSingleVerStorageExecutor][Get] Cannot get the data %d", errCode);
306 }
307 ReleaseExecutor(handle);
308 DBDfxAdapter::FinishTracing();
309 return errCode;
310 }
311
312 // Clear all the data from the database
Clear(const IOption & option)313 int RdSingleVerNaturalStoreConnection::Clear(const IOption &option)
314 {
315 return -E_NOT_SUPPORT;
316 }
317
GetEntriesInner(const IOption & option,const Query & query,std::vector<Entry> & entries) const318 int RdSingleVerNaturalStoreConnection::GetEntriesInner(const IOption &option, const Query &query,
319 std::vector<Entry> &entries) const
320 {
321 QueryExpression queryExpression = GetQueryInfo::GetQueryExpression(query);
322 int errCode = GetQueryInfo::GetQueryExpression(query).RangeParamCheck();
323 if (errCode != E_OK) {
324 return errCode;
325 }
326 DBDfxAdapter::StartTracing();
327 {
328 std::lock_guard<std::mutex> lock(transactionMutex_);
329 if (writeHandle_ != nullptr) {
330 LOGD("[RdSingleVerNaturalStoreConnection] Transaction started already.");
331 errCode = writeHandle_->GetEntries(KV_SCAN_RANGE, std::pair<Key, Key>(queryExpression.GetBeginKey(),
332 queryExpression.GetEndKey()), entries);
333 DBDfxAdapter::FinishTracing();
334 return errCode;
335 }
336 }
337 RdSingleVerStorageExecutor *handle = GetExecutor(false, errCode);
338 if (handle == nullptr) {
339 LOGE("[RdSingleVerNaturalStoreConnection]::[GetEntries] Get executor failed, errCode = [%d]", errCode);
340 DBDfxAdapter::FinishTracing();
341 return errCode;
342 }
343
344 errCode = handle->GetEntries(KV_SCAN_RANGE, std::pair<Key, Key>(queryExpression.GetBeginKey(),
345 queryExpression.GetEndKey()), entries);
346 ReleaseExecutor(handle);
347 DBDfxAdapter::FinishTracing();
348 if (errCode != -E_NOT_FOUND) {
349 PrintResultsetKeys(queryExpression);
350 }
351 return errCode;
352 }
GetEntries(const IOption & option,const Query & query,std::vector<Entry> & entries) const353 int RdSingleVerNaturalStoreConnection::GetEntries(const IOption &option, const Query &query,
354 std::vector<Entry> &entries) const
355 {
356 if (option.dataType != IOption::SYNC_DATA) {
357 LOGE("[RdSingleVerStorageExecutor][GetEntries]unsupported data type");
358 return -E_INVALID_ARGS;
359 }
360 return GetEntriesInner(option, query, entries);
361 }
362
GetCount(const IOption & option,const Query & query,int & count) const363 int RdSingleVerNaturalStoreConnection::GetCount(const IOption &option, const Query &query, int &count) const
364 {
365 return -E_NOT_SUPPORT;
366 }
367
GetSnapshot(IKvDBSnapshot * & snapshot) const368 int RdSingleVerNaturalStoreConnection::GetSnapshot(IKvDBSnapshot *&snapshot) const
369 {
370 return -E_NOT_SUPPORT;
371 }
372
ReleaseSnapshot(IKvDBSnapshot * & snapshot)373 void RdSingleVerNaturalStoreConnection::ReleaseSnapshot(IKvDBSnapshot *&snapshot)
374 {
375 return;
376 }
377
StartTransaction()378 int RdSingleVerNaturalStoreConnection::StartTransaction()
379 {
380 return -E_NOT_SUPPORT;
381 }
382
Commit()383 int RdSingleVerNaturalStoreConnection::Commit()
384 {
385 return -E_NOT_SUPPORT;
386 }
387
RollBack()388 int RdSingleVerNaturalStoreConnection::RollBack()
389 {
390 return -E_NOT_SUPPORT;
391 }
392
IsTransactionStarted() const393 bool RdSingleVerNaturalStoreConnection::IsTransactionStarted() const
394 {
395 return false;
396 }
397
Rekey(const CipherPassword & passwd)398 int RdSingleVerNaturalStoreConnection::Rekey(const CipherPassword &passwd)
399 {
400 return -E_NOT_SUPPORT;
401 }
402
Export(const std::string & filePath,const CipherPassword & passwd)403 int RdSingleVerNaturalStoreConnection::Export(const std::string &filePath, const CipherPassword &passwd)
404 {
405 if (kvDB_ == nullptr) {
406 return -E_INVALID_DB;
407 }
408
409 // not support passwd
410 if (passwd.GetSize() != 0) {
411 LOGE("[RdSingleVerNaturalStoreConnection][Export]unsupport passwd.");
412 return -E_NOT_SUPPORT;
413 }
414
415 return kvDB_->Export(filePath, passwd);
416 }
417
Import(const std::string & filePath,const CipherPassword & passwd)418 int RdSingleVerNaturalStoreConnection::Import(const std::string &filePath, const CipherPassword &passwd)
419 {
420 // not support passwd
421 if (passwd.GetSize() != 0) {
422 LOGE("[RdSingleVerNaturalStoreConnection][Export]unsupport passwd.");
423 return -E_NOT_SUPPORT;
424 }
425
426 std::lock_guard<std::mutex> lock(importMutex_);
427 int errCode = CheckRdMonoStatus(OperatePerm::IMPORT_MONOPOLIZE_PERM);
428 if (errCode != E_OK) {
429 return errCode;
430 }
431 errCode = kvDB_->Import(filePath, passwd);
432 if ((errCode == -E_INVALID_PASSWD_OR_CORRUPTED_DB) || (errCode == -E_UNEXPECTED_DATA)) {
433 errCode = -E_INVALID_FILE; // import damaged file or txt file, return -E_INVALID_FILE
434 }
435 GenericKvDBConnection::ResetExclusiveStatus();
436 kvDB_->ReEnableConnection(OperatePerm::IMPORT_MONOPOLIZE_PERM);
437 if (errCode == E_OK) {
438 kvDB_->ResetSyncStatus();
439 }
440 return errCode;
441 }
442
CheckRdMonoStatus(OperatePerm perm)443 int RdSingleVerNaturalStoreConnection::CheckRdMonoStatus(OperatePerm perm)
444 {
445 if (kvDB_ == nullptr) {
446 return -E_INVALID_DB;
447 }
448
449 // check if result set closed
450 {
451 std::lock_guard<std::mutex> kvDbResultLock(kvDbResultSetsMutex_);
452 if (kvDbResultSets_.size() > 0) {
453 LOGE("Active result set exist.");
454 return -E_BUSY;
455 }
456 }
457 // 1. Get the connection number, and get the right to do the rekey operation.
458 int errCode = kvDB_->TryToDisableConnection(perm);
459 if (errCode != E_OK) {
460 // If precheck failed, it means that there are more than one connection.
461 // No need reset the condition for the scene.
462 LOGE("More than one connection");
463 return errCode;
464 }
465 // 2. Check the observer list.
466 errCode = GenericKvDBConnection::PreCheckExclusiveStatus();
467 if (errCode != E_OK) {
468 kvDB_->ReEnableConnection(perm);
469 LOGE("Observer prevents.");
470 return errCode;
471 }
472
473 // 3. Check the conflict notifier.
474 {
475 GenericKvDBConnection::ResetExclusiveStatus();
476 kvDB_->ReEnableConnection(perm);
477 EnableManualSync();
478 }
479 return E_OK;
480 }
481
RegisterLifeCycleCallback(const DatabaseLifeCycleNotifier & notifier)482 int RdSingleVerNaturalStoreConnection::RegisterLifeCycleCallback(const DatabaseLifeCycleNotifier ¬ifier)
483 {
484 return -E_NOT_SUPPORT;
485 }
486
487 // Called when Close and delete the connection.
PreClose()488 int RdSingleVerNaturalStoreConnection::PreClose()
489 {
490 // check if result set closed
491 std::lock_guard<std::mutex> lock(kvDbResultSetsMutex_);
492 if (kvDbResultSets_.size() > 0) {
493 LOGE("The rd connection have [%zu] active result set, can not close.", kvDbResultSets_.size());
494 return -E_BUSY;
495 }
496 // check if transaction closed
497 {
498 std::lock_guard<std::mutex> transactionLock(transactionMutex_);
499 if (writeHandle_ != nullptr) {
500 LOGW("Transaction started, rollback before close.");
501 int errCode = RollbackInner();
502 if (errCode != E_OK) {
503 LOGE("cannot rollback %d.", errCode);
504 }
505 ReleaseExecutor(writeHandle_);
506 }
507 }
508 return E_OK;
509 }
510
CheckIntegrity() const511 int RdSingleVerNaturalStoreConnection::CheckIntegrity() const
512 {
513 RdSingleVerNaturalStore *naturalStore = GetDB<RdSingleVerNaturalStore>();
514 if (naturalStore == nullptr) {
515 LOGE("[SingleVerConnection] the store is null");
516 return -E_NOT_INIT;
517 }
518 return naturalStore->CheckIntegrity();
519 }
520
GetKeys(const IOption & option,const Key & keyPrefix,std::vector<Key> & keys) const521 int RdSingleVerNaturalStoreConnection::GetKeys(const IOption &option, const Key &keyPrefix,
522 std::vector<Key> &keys) const
523 {
524 return -E_NOT_SUPPORT;
525 }
526
UpdateKey(const UpdateKeyCallback & callback)527 int RdSingleVerNaturalStoreConnection::UpdateKey(const UpdateKeyCallback &callback)
528 {
529 return -E_NOT_SUPPORT;
530 }
531
CheckSyncEntriesValid(const std::vector<Entry> & entries) const532 int RdSingleVerNaturalStoreConnection::CheckSyncEntriesValid(const std::vector<Entry> &entries) const
533 {
534 if (entries.size() > DBConstant::MAX_BATCH_SIZE) {
535 return -E_INVALID_ARGS;
536 }
537
538 RdSingleVerNaturalStore *naturalStore = GetDB<RdSingleVerNaturalStore>();
539 if (naturalStore == nullptr) {
540 return -E_INVALID_DB;
541 }
542
543 for (const auto &entry : entries) {
544 int errCode = naturalStore->CheckDataStatus(entry.key, entry.value, false);
545 if (errCode != E_OK) {
546 return errCode;
547 }
548 }
549 return E_OK;
550 }
551
PutBatchInner(const IOption & option,const std::vector<Entry> & entries)552 int RdSingleVerNaturalStoreConnection::PutBatchInner(const IOption &option, const std::vector<Entry> &entries)
553 {
554 LOGD("PutBatchInner");
555 std::lock_guard<std::mutex> lock(transactionMutex_);
556 bool isAuto = false;
557 int errCode = E_OK;
558 if (option.dataType != IOption::SYNC_DATA) {
559 LOGE("LOCAL_DATA TYPE NOT SUPPORT in RD executor");
560 return -E_NOT_SUPPORT;
561 }
562 if (writeHandle_ == nullptr) {
563 isAuto = true;
564 errCode = StartTransactionInner(TransactType::IMMEDIATE);
565 if (errCode != E_OK) {
566 return errCode;
567 }
568 }
569
570 errCode = SaveSyncEntries(entries, false);
571
572 if (isAuto) {
573 if (errCode == E_OK) {
574 errCode = CommitInner();
575 } else {
576 int innerCode = RollbackInner();
577 errCode = (innerCode != E_OK) ? innerCode : errCode;
578 }
579 }
580 return errCode;
581 }
582
SaveSyncEntries(const std::vector<Entry> & entries,bool isDelete)583 int RdSingleVerNaturalStoreConnection::SaveSyncEntries(const std::vector<Entry> &entries, bool isDelete)
584 {
585 if (IsSinglePutOrDelete(entries)) {
586 return SaveEntry(entries[0], isDelete);
587 }
588 return writeHandle_->BatchSaveEntries(entries, isDelete, committedData_);
589 }
590
591 // This function currently only be called in local procedure to change sync_data table, do not use in sync procedure.
592 // It will check and amend value when need if it is a schema database. return error if some value disagree with the
593 // schema. But in sync procedure, we just neglect the value that disagree with schema.
SaveEntry(const Entry & entry,bool isDelete,Timestamp timestamp)594 int RdSingleVerNaturalStoreConnection::SaveEntry(const Entry &entry, bool isDelete, Timestamp timestamp)
595 {
596 LOGD("Saving Entry");
597 RdSingleVerNaturalStore *naturalStore = GetDB<RdSingleVerNaturalStore>();
598 if (naturalStore == nullptr) {
599 LOGE("[RdSingleVerNaturalStoreConnection][SaveEntry] the store is null");
600 return -E_INVALID_DB;
601 }
602
603 if (IsExtendedCacheDBMode()) {
604 return -E_NOT_SUPPORT;
605 } else {
606 return SaveEntryNormally(entry, isDelete);
607 }
608 }
609
SaveEntryNormally(const Entry & entry,bool isDelete)610 int RdSingleVerNaturalStoreConnection::SaveEntryNormally(const Entry &entry, bool isDelete)
611 {
612 int errCode = writeHandle_->SaveSyncDataItem(entry, committedData_, isDelete);
613 if (errCode != E_OK) {
614 LOGE("Save entry failed, err:%d", errCode);
615 }
616 return errCode;
617 }
618
GetExecutor(bool isWrite,int & errCode) const619 RdSingleVerStorageExecutor *RdSingleVerNaturalStoreConnection::GetExecutor(bool isWrite, int &errCode) const
620 {
621 LOGD("[RdSingleVerNaturalStoreConnection] Getting Executor ");
622 RdSingleVerNaturalStore *naturalStore = GetDB<RdSingleVerNaturalStore>();
623 if (naturalStore == nullptr) {
624 errCode = -E_NOT_INIT;
625 LOGE("[SingleVerConnection] the store is null");
626 return nullptr;
627 }
628 return naturalStore->GetHandle(isWrite, errCode);
629 }
630
ReleaseExecutor(RdSingleVerStorageExecutor * & executor) const631 void RdSingleVerNaturalStoreConnection::ReleaseExecutor(RdSingleVerStorageExecutor *&executor) const
632 {
633 kvDB_->ReEnableConnection(OperatePerm::NORMAL_WRITE);
634 RdSingleVerNaturalStore *naturalStore = GetDB<RdSingleVerNaturalStore>();
635 if (naturalStore != nullptr) {
636 naturalStore->ReleaseHandle(executor);
637 }
638 }
639
ReleaseCommitData(SingleVerNaturalStoreCommitNotifyData * & committedData)640 void RdSingleVerNaturalStoreConnection::ReleaseCommitData(SingleVerNaturalStoreCommitNotifyData *&committedData)
641 {
642 if (committedData != nullptr) {
643 committedData->DecObjRef(committedData);
644 committedData = nullptr;
645 }
646 }
647
StartTransactionInner(TransactType transType)648 int RdSingleVerNaturalStoreConnection::StartTransactionInner(TransactType transType)
649 {
650 if (IsExtendedCacheDBMode()) {
651 return -E_NOT_SUPPORT;
652 } else {
653 return StartTransactionNormally(transType);
654 }
655 }
656
StartTransactionNormally(TransactType transType)657 int RdSingleVerNaturalStoreConnection::StartTransactionNormally(TransactType transType)
658 {
659 int errCode = E_OK;
660 RdSingleVerStorageExecutor *handle = GetExecutor(true, errCode);
661 if (handle == nullptr) {
662 return errCode;
663 }
664
665 errCode = kvDB_->TryToDisableConnection(OperatePerm::NORMAL_WRITE);
666 if (errCode != E_OK) {
667 ReleaseExecutor(handle);
668 LOGE("Start transaction failed, %d", errCode);
669 return errCode;
670 }
671
672 if (committedData_ == nullptr) {
673 committedData_ = new (std::nothrow) SingleVerNaturalStoreCommitNotifyData;
674 if (committedData_ == nullptr) {
675 ReleaseExecutor(handle);
676 return -E_OUT_OF_MEMORY;
677 }
678 }
679
680 errCode = handle->StartTransaction(transType);
681 if (errCode != E_OK) {
682 ReleaseExecutor(handle);
683 ReleaseCommitData(committedData_);
684 return errCode;
685 }
686
687 writeHandle_ = handle;
688 return E_OK;
689 }
690
CommitInner()691 int RdSingleVerNaturalStoreConnection::CommitInner()
692 {
693 int errCode = writeHandle_->Commit();
694 ReleaseExecutor(writeHandle_);
695 writeHandle_ = nullptr;
696
697 CommitAndReleaseNotifyData(committedData_, true,
698 static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_PUT_EVENT));
699 return errCode;
700 }
701
RollbackInner()702 int RdSingleVerNaturalStoreConnection::RollbackInner()
703 {
704 int errCode = writeHandle_->Rollback();
705 ReleaseCommitData(committedData_);
706 ReleaseExecutor(writeHandle_);
707 writeHandle_ = nullptr;
708 return errCode;
709 }
710
CheckReadDataControlled() const711 int RdSingleVerNaturalStoreConnection::CheckReadDataControlled() const
712 {
713 RdSingleVerNaturalStore *naturalStore = GetDB<RdSingleVerNaturalStore>();
714 if (naturalStore == nullptr) {
715 LOGE("[SingleVerConnection] natural store is nullptr in CheckReadDataControlled.");
716 return E_OK;
717 }
718 return naturalStore->CheckReadDataControlled();
719 }
720
CheckSyncKeysValid(const std::vector<Key> & keys) const721 int RdSingleVerNaturalStoreConnection::CheckSyncKeysValid(const std::vector<Key> &keys) const
722 {
723 if (keys.size() > DBConstant::MAX_BATCH_SIZE) {
724 return -E_INVALID_ARGS;
725 }
726
727 RdSingleVerNaturalStore *naturalStore = GetDB<RdSingleVerNaturalStore>();
728 if (naturalStore == nullptr) {
729 return -E_INVALID_DB;
730 }
731
732 for (const auto &key : keys) {
733 int errCode = naturalStore->CheckDataStatus(key, {}, true);
734 if (errCode != E_OK) {
735 return errCode;
736 }
737 }
738 return E_OK;
739 }
740
DeleteBatchInner(const IOption & option,const std::vector<Key> & keys)741 int RdSingleVerNaturalStoreConnection::DeleteBatchInner(const IOption &option, const std::vector<Key> &keys)
742 {
743 DBDfxAdapter::StartTracing();
744 bool isAuto = false;
745 int errCode = E_OK;
746 if (option.dataType != IOption::SYNC_DATA) {
747 LOGE("LOCAL_DATA TYPE NOT SUPPORT in RD executor");
748 DBDfxAdapter::FinishTracing();
749 return -E_NOT_SUPPORT;
750 }
751 std::lock_guard<std::mutex> lock(transactionMutex_);
752 if (writeHandle_ == nullptr) {
753 isAuto = true;
754 errCode = StartTransactionInner(TransactType::IMMEDIATE);
755 if (errCode != E_OK) {
756 DBDfxAdapter::FinishTracing();
757 return errCode;
758 }
759 }
760
761 errCode = DeleteSyncEntries(keys);
762
763 if (isAuto) {
764 if (errCode == E_OK) {
765 errCode = CommitInner();
766 } else {
767 int innerCode = RollbackInner();
768 errCode = (innerCode != E_OK) ? innerCode : errCode;
769 }
770 }
771 DBDfxAdapter::FinishTracing();
772 return errCode;
773 }
774
DeleteSyncEntries(const std::vector<Key> & keys)775 int RdSingleVerNaturalStoreConnection::DeleteSyncEntries(const std::vector<Key> &keys)
776 {
777 std::vector<Entry> entries;
778 for (const auto &key : keys) {
779 Entry entry;
780 entry.key = std::move(key);
781 entries.emplace_back(std::move(entry));
782 }
783
784 int errCode = SaveSyncEntries(entries, true);
785 if ((errCode != E_OK) && (errCode != -E_NOT_FOUND)) {
786 LOGE("[DeleteSyncEntries] Delete data err:%d", errCode);
787 }
788 return (errCode == -E_NOT_FOUND) ? E_OK : errCode;
789 }
790
GetSyncDataSize(const std::string & device,size_t & size) const791 int RdSingleVerNaturalStoreConnection::GetSyncDataSize(const std::string &device, size_t &size) const
792 {
793 return -E_NOT_SUPPORT;
794 }
795
Sync(const CloudSyncOption & option,const SyncProcessCallback & onProcess)796 int RdSingleVerNaturalStoreConnection::Sync(const CloudSyncOption &option, const SyncProcessCallback &onProcess)
797 {
798 return -E_NOT_SUPPORT;
799 }
800
SetCloudDB(const std::map<std::string,std::shared_ptr<ICloudDb>> & cloudDBs)801 int RdSingleVerNaturalStoreConnection::SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs)
802 {
803 return -E_NOT_SUPPORT;
804 }
805
SetGenCloudVersionCallback(const GenerateCloudVersionCallback & callback)806 void RdSingleVerNaturalStoreConnection::SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback)
807 {
808 }
809 }
810 // namespace DistributedDB
811