1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "rd_single_ver_natural_store_connection.h"
16 #include "rd_single_ver_result_set.h"
17 
18 #include <algorithm>
19 
20 #include "db_common.h"
21 #include "db_constant.h"
22 #include "db_dfx_adapter.h"
23 #include "db_errno.h"
24 #include "get_query_info.h"
25 #include "kvdb_observer_handle.h"
26 #include "kvdb_pragma.h"
27 #include "log_print.h"
28 #include "store_types.h"
29 #include "sqlite_single_ver_storage_engine.h"
30 
31 namespace DistributedDB {
32 
RdSingleVerNaturalStoreConnection(RdSingleVerNaturalStore * kvDB)33 RdSingleVerNaturalStoreConnection::RdSingleVerNaturalStoreConnection(RdSingleVerNaturalStore *kvDB)
34     : SingleVerNaturalStoreConnection(kvDB), committedData_(nullptr), writeHandle_(nullptr)
35 {
36     LOGD("RdSingleVerNaturalStoreConnection Created");
37 }
38 
~RdSingleVerNaturalStoreConnection()39 RdSingleVerNaturalStoreConnection::~RdSingleVerNaturalStoreConnection()
40 {
41 }
42 
GetEntries(const IOption & option,const Key & keyPrefix,std::vector<Entry> & entries) const43 int RdSingleVerNaturalStoreConnection::GetEntries(const IOption &option, const Key &keyPrefix,
44     std::vector<Entry> &entries) const
45 {
46     if (option.dataType != IOption::SYNC_DATA) {
47         LOGE("[RdSingleVerStorageExecutor][GetEntries] IOption only support SYNC_DATA type.");
48         return -E_NOT_SUPPORT;
49     }
50     return GetEntriesInner(true, option, keyPrefix, entries);
51 }
52 
CheckOption(const IOption & option,SingleVerDataType & type)53 int RdSingleVerNaturalStoreConnection::CheckOption(const IOption &option, SingleVerDataType &type)
54 {
55     if (option.dataType == IOption::SYNC_DATA) {
56         type = SingleVerDataType::SYNC_TYPE;
57     } else {
58         LOGE("IOption only support SYNC_DATA type.");
59         return -E_NOT_SUPPORT;
60     }
61     return E_OK;
62 }
63 
GetEntriesInner(bool isGetValue,const IOption & option,const Key & keyPrefix,std::vector<Entry> & entries) const64 int RdSingleVerNaturalStoreConnection::GetEntriesInner(bool isGetValue, const IOption &option,
65     const Key &keyPrefix, std::vector<Entry> &entries) const
66 {
67     if (keyPrefix.size() > DBConstant::MAX_KEY_SIZE) {
68         return -E_INVALID_ARGS;
69     }
70 
71     SingleVerDataType type;
72     int errCode = CheckOption(option, type);
73     if (errCode != E_OK) {
74         return errCode;
75     }
76 
77     DBDfxAdapter::StartTracing();
78     {
79         std::lock_guard<std::mutex> lock(transactionMutex_);
80         if (writeHandle_ != nullptr) {
81             LOGD("[RdSingleVerNaturalStoreConnection] Transaction started already.");
82             errCode = writeHandle_->GetEntries(KV_SCAN_PREFIX, std::pair<Key, Key>(keyPrefix, {}), entries);
83             DBDfxAdapter::FinishTracing();
84             return errCode;
85         }
86     }
87     RdSingleVerStorageExecutor *handle = GetExecutor(false, errCode);
88     if (handle == nullptr) {
89         LOGE("[RdSingleVerNaturalStoreConnection]::[GetEntries] Get executor failed, errCode = [%d]", errCode);
90         DBDfxAdapter::FinishTracing();
91         return errCode;
92     }
93     errCode = handle->GetEntries(KV_SCAN_PREFIX, std::pair<Key, Key>(keyPrefix, {}), entries);
94     ReleaseExecutor(handle);
95     DBDfxAdapter::FinishTracing();
96     return errCode;
97 }
98 
GetResultSet(const IOption & option,const Key & keyPrefix,IKvDBResultSet * & resultSet) const99 int RdSingleVerNaturalStoreConnection::GetResultSet(const IOption &option, const Key &keyPrefix,
100     IKvDBResultSet *&resultSet) const
101 {
102     // maximum of result set size is 4
103     std::lock_guard<std::mutex> lock(kvDbResultSetsMutex_);
104     if (kvDbResultSets_.size() >= MAX_RESULTSET_SIZE) {
105         LOGE("Over max result set size");
106         return -E_MAX_LIMITS;
107     }
108 
109     RdSingleVerNaturalStore *naturalStore = GetDB<RdSingleVerNaturalStore>();
110     if (naturalStore == nullptr) {
111         return -E_INVALID_DB;
112     }
113 
114     RdSingleVerResultSet *tmpResultSet = new (std::nothrow) RdSingleVerResultSet(naturalStore, keyPrefix);
115     if (tmpResultSet == nullptr) {
116         LOGE("Create single version result set failed.");
117         return -E_OUT_OF_MEMORY;
118     }
119     int errCode = tmpResultSet->Open(false);
120     if (errCode != E_OK) {
121         delete tmpResultSet;
122         resultSet = nullptr;
123         tmpResultSet = nullptr;
124         LOGE("Open result set failed.");
125         return errCode;
126     }
127     resultSet = (IKvDBResultSet *)tmpResultSet;
128     kvDbResultSets_.insert(resultSet);
129     return E_OK;
130 }
131 
PrintResultsetKeys(const QueryExpression & queryExpression)132 static void PrintResultsetKeys(const QueryExpression &queryExpression)
133 {
134     std::vector<uint8_t> beginKeyVec = queryExpression.GetBeginKey();
135     std::vector<uint8_t> endKeyVec = queryExpression.GetEndKey();
136     std::string beginKey;
137     std::string endKey;
138     if (beginKeyVec.size() == 0) {
139         beginKey = "NULL";
140     } else {
141         beginKey.assign(beginKeyVec.begin(), beginKeyVec.end());
142     }
143     if (endKeyVec.size() == 0) {
144         endKey = "NULL";
145     } else {
146         endKey.assign(endKeyVec.begin(), endKeyVec.end());
147     }
148 
149     LOGD("begin key: %s, end key: %s", beginKey.c_str(), endKey.c_str());
150 }
151 
GetResultSet(const IOption & option,const Query & query,IKvDBResultSet * & resultSet) const152 int RdSingleVerNaturalStoreConnection::GetResultSet(const IOption &option,
153     const Query &query, IKvDBResultSet *&resultSet) const
154 {
155     // maximum of result set size is 4
156     std::lock_guard<std::mutex> lock(kvDbResultSetsMutex_);
157     if (kvDbResultSets_.size() >= MAX_RESULTSET_SIZE) {
158         LOGE("Over max result set size");
159         return -E_MAX_LIMITS;
160     }
161 
162     RdSingleVerNaturalStore *naturalStore = GetDB<RdSingleVerNaturalStore>();
163     if (naturalStore == nullptr) {
164         return -E_INVALID_DB;
165     }
166     QueryExpression queryExpression = GetQueryInfo::GetQueryExpression(query);
167     int errCode = GetQueryInfo::GetQueryExpression(query).RangeParamCheck();
168     if (errCode != E_OK) {
169         return errCode;
170     }
171     RdSingleVerResultSet *tmpResultSet = new (std::nothrow) RdSingleVerResultSet(naturalStore,
172         queryExpression.GetBeginKey(), queryExpression.GetEndKey(), KV_SCAN_RANGE);
173     if (tmpResultSet == nullptr) {
174         LOGE("Create single version result set failed.");
175         return -E_OUT_OF_MEMORY;
176     }
177     errCode = tmpResultSet->Open(false);
178     if (errCode != E_OK) {
179         delete tmpResultSet;
180         resultSet = nullptr;
181         tmpResultSet = nullptr;
182         LOGE("Open result set failed.");
183         return errCode;
184     }
185     resultSet = (IKvDBResultSet *)tmpResultSet;
186     kvDbResultSets_.insert(resultSet);
187     return E_OK;
188 }
189 
190 
ReleaseResultSet(IKvDBResultSet * & resultSet)191 void RdSingleVerNaturalStoreConnection::ReleaseResultSet(IKvDBResultSet *&resultSet)
192 {
193     if (resultSet == nullptr) {
194         return;
195     }
196     RdSingleVerResultSet *tmpResultSet = (RdSingleVerResultSet *)resultSet;
197     int errCode = tmpResultSet->Close();
198     if (errCode != E_OK) {
199         LOGE("Open result set failed.");
200         return;
201     }
202     std::lock_guard<std::mutex> lock(kvDbResultSetsMutex_);
203     kvDbResultSets_.erase(resultSet);
204     delete resultSet;
205     resultSet = nullptr;
206     return;
207 }
208 
Pragma(int cmd,void * parameter)209 int RdSingleVerNaturalStoreConnection::Pragma(int cmd, void *parameter)
210 {
211     switch (cmd) {
212         case PRAGMA_EXEC_CHECKPOINT:
213             return ForceCheckPoint();
214         default:
215             break;
216     }
217     LOGD("Rd Pragma only support check point for now:%d", cmd);
218     return -E_NOT_SUPPORT;
219 }
220 
TranslateObserverModeToEventTypes(unsigned mode,std::list<int> & eventTypes) const221 int RdSingleVerNaturalStoreConnection::TranslateObserverModeToEventTypes(
222     unsigned mode, std::list<int> &eventTypes) const
223 {
224     int errCode = E_OK;
225     switch (mode) {
226         case static_cast<unsigned>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_PUT_EVENT):
227             eventTypes.push_back(static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_PUT_EVENT));
228             break;
229         default:
230             errCode = -E_NOT_SUPPORT;
231             break;
232     }
233     return errCode;
234 }
235 
ForceCheckPoint() const236 int RdSingleVerNaturalStoreConnection::ForceCheckPoint() const
237 {
238     int errCode = E_OK;
239     RdSingleVerStorageExecutor *handle = GetExecutor(false, errCode);
240     if (handle == nullptr) {
241         LOGE("[Connection]::[GetEntries] Get executor failed, errCode = [%d]", errCode);
242         return errCode;
243     }
244 
245     errCode = handle->ForceCheckPoint();
246     ReleaseExecutor(handle);
247     return errCode;
248 }
249 
CommitAndReleaseNotifyData(SingleVerNaturalStoreCommitNotifyData * & committedData,bool isNeedCommit,int eventType)250 void RdSingleVerNaturalStoreConnection::CommitAndReleaseNotifyData(
251     SingleVerNaturalStoreCommitNotifyData *&committedData, bool isNeedCommit, int eventType)
252 {
253     RdSingleVerNaturalStore *naturalStore = GetDB<RdSingleVerNaturalStore>();
254     if ((naturalStore != nullptr) && (committedData != nullptr)) {
255         if (isNeedCommit) {
256             if (!committedData->IsChangedDataEmpty()) {
257                 naturalStore->CommitNotify(eventType, committedData);
258             }
259         }
260     }
261     ReleaseCommitData(committedData);
262 }
263 
Get(const IOption & option,const Key & key,Value & value) const264 int RdSingleVerNaturalStoreConnection::Get(const IOption &option, const Key &key, Value &value) const
265 {
266     SingleVerDataType dataType;
267     int errCode = CheckOption(option, dataType);
268     if (errCode != E_OK) {
269         return errCode;
270     }
271 
272     if (key.size() > DBConstant::MAX_KEY_SIZE || key.empty()) {
273         return -E_INVALID_ARGS;
274     }
275 
276     errCode = CheckReadDataControlled();
277     if (errCode != E_OK) {
278         LOGE("[Get] Do not allowed to read data with errCode = [%d]!", errCode);
279         return errCode;
280     }
281 
282     DBDfxAdapter::StartTracing();
283     {
284         // need to check if the transaction started
285         std::lock_guard<std::mutex> lock(transactionMutex_);
286         if (writeHandle_ != nullptr) {
287             Timestamp recordTimestamp;
288             errCode = writeHandle_->GetKvData(dataType, key, value, recordTimestamp);
289             if (errCode != E_OK) {
290                 LOGE("[RdSingleVerStorageExecutor][Get] Cannot get the data %d", errCode);
291             }
292             DBDfxAdapter::FinishTracing();
293             return errCode;
294         }
295     }
296     RdSingleVerStorageExecutor *handle = GetExecutor(false, errCode);
297     if (handle == nullptr) {
298         DBDfxAdapter::FinishTracing();
299         return errCode;
300     }
301 
302     Timestamp timestamp;
303     errCode = handle->GetKvData(dataType, key, value, timestamp);
304     if (errCode != E_OK) {
305         LOGE("[RdSingleVerStorageExecutor][Get] Cannot get the data %d", errCode);
306     }
307     ReleaseExecutor(handle);
308     DBDfxAdapter::FinishTracing();
309     return errCode;
310 }
311 
312 // Clear all the data from the database
Clear(const IOption & option)313 int RdSingleVerNaturalStoreConnection::Clear(const IOption &option)
314 {
315     return -E_NOT_SUPPORT;
316 }
317 
GetEntriesInner(const IOption & option,const Query & query,std::vector<Entry> & entries) const318 int RdSingleVerNaturalStoreConnection::GetEntriesInner(const IOption &option, const Query &query,
319     std::vector<Entry> &entries) const
320 {
321     QueryExpression queryExpression = GetQueryInfo::GetQueryExpression(query);
322     int errCode = GetQueryInfo::GetQueryExpression(query).RangeParamCheck();
323     if (errCode != E_OK) {
324         return errCode;
325     }
326     DBDfxAdapter::StartTracing();
327     {
328         std::lock_guard<std::mutex> lock(transactionMutex_);
329         if (writeHandle_ != nullptr) {
330             LOGD("[RdSingleVerNaturalStoreConnection] Transaction started already.");
331             errCode = writeHandle_->GetEntries(KV_SCAN_RANGE, std::pair<Key, Key>(queryExpression.GetBeginKey(),
332                 queryExpression.GetEndKey()), entries);
333             DBDfxAdapter::FinishTracing();
334             return errCode;
335         }
336     }
337     RdSingleVerStorageExecutor *handle = GetExecutor(false, errCode);
338     if (handle == nullptr) {
339         LOGE("[RdSingleVerNaturalStoreConnection]::[GetEntries] Get executor failed, errCode = [%d]", errCode);
340         DBDfxAdapter::FinishTracing();
341         return errCode;
342     }
343 
344     errCode = handle->GetEntries(KV_SCAN_RANGE, std::pair<Key, Key>(queryExpression.GetBeginKey(),
345         queryExpression.GetEndKey()), entries);
346     ReleaseExecutor(handle);
347     DBDfxAdapter::FinishTracing();
348     if (errCode != -E_NOT_FOUND) {
349         PrintResultsetKeys(queryExpression);
350     }
351     return errCode;
352 }
GetEntries(const IOption & option,const Query & query,std::vector<Entry> & entries) const353 int RdSingleVerNaturalStoreConnection::GetEntries(const IOption &option, const Query &query,
354     std::vector<Entry> &entries) const
355 {
356     if (option.dataType != IOption::SYNC_DATA) {
357         LOGE("[RdSingleVerStorageExecutor][GetEntries]unsupported data type");
358         return -E_INVALID_ARGS;
359     }
360     return GetEntriesInner(option, query, entries);
361 }
362 
GetCount(const IOption & option,const Query & query,int & count) const363 int RdSingleVerNaturalStoreConnection::GetCount(const IOption &option, const Query &query, int &count) const
364 {
365     return -E_NOT_SUPPORT;
366 }
367 
GetSnapshot(IKvDBSnapshot * & snapshot) const368 int RdSingleVerNaturalStoreConnection::GetSnapshot(IKvDBSnapshot *&snapshot) const
369 {
370     return -E_NOT_SUPPORT;
371 }
372 
ReleaseSnapshot(IKvDBSnapshot * & snapshot)373 void RdSingleVerNaturalStoreConnection::ReleaseSnapshot(IKvDBSnapshot *&snapshot)
374 {
375     return;
376 }
377 
StartTransaction()378 int RdSingleVerNaturalStoreConnection::StartTransaction()
379 {
380     return -E_NOT_SUPPORT;
381 }
382 
Commit()383 int RdSingleVerNaturalStoreConnection::Commit()
384 {
385     return -E_NOT_SUPPORT;
386 }
387 
RollBack()388 int RdSingleVerNaturalStoreConnection::RollBack()
389 {
390     return -E_NOT_SUPPORT;
391 }
392 
IsTransactionStarted() const393 bool RdSingleVerNaturalStoreConnection::IsTransactionStarted() const
394 {
395     return false;
396 }
397 
Rekey(const CipherPassword & passwd)398 int RdSingleVerNaturalStoreConnection::Rekey(const CipherPassword &passwd)
399 {
400     return -E_NOT_SUPPORT;
401 }
402 
Export(const std::string & filePath,const CipherPassword & passwd)403 int RdSingleVerNaturalStoreConnection::Export(const std::string &filePath, const CipherPassword &passwd)
404 {
405     if (kvDB_ == nullptr) {
406         return -E_INVALID_DB;
407     }
408 
409     // not support passwd
410     if (passwd.GetSize() != 0) {
411         LOGE("[RdSingleVerNaturalStoreConnection][Export]unsupport passwd.");
412         return -E_NOT_SUPPORT;
413     }
414 
415     return kvDB_->Export(filePath, passwd);
416 }
417 
Import(const std::string & filePath,const CipherPassword & passwd)418 int RdSingleVerNaturalStoreConnection::Import(const std::string &filePath, const CipherPassword &passwd)
419 {
420     // not support passwd
421     if (passwd.GetSize() != 0) {
422         LOGE("[RdSingleVerNaturalStoreConnection][Export]unsupport passwd.");
423         return -E_NOT_SUPPORT;
424     }
425 
426     std::lock_guard<std::mutex> lock(importMutex_);
427     int errCode = CheckRdMonoStatus(OperatePerm::IMPORT_MONOPOLIZE_PERM);
428     if (errCode != E_OK) {
429         return errCode;
430     }
431     errCode = kvDB_->Import(filePath, passwd);
432     if ((errCode == -E_INVALID_PASSWD_OR_CORRUPTED_DB) || (errCode == -E_UNEXPECTED_DATA)) {
433         errCode = -E_INVALID_FILE; // import damaged file or txt file, return -E_INVALID_FILE
434     }
435     GenericKvDBConnection::ResetExclusiveStatus();
436     kvDB_->ReEnableConnection(OperatePerm::IMPORT_MONOPOLIZE_PERM);
437     if (errCode == E_OK) {
438         kvDB_->ResetSyncStatus();
439     }
440     return errCode;
441 }
442 
CheckRdMonoStatus(OperatePerm perm)443 int RdSingleVerNaturalStoreConnection::CheckRdMonoStatus(OperatePerm perm)
444 {
445     if (kvDB_ == nullptr) {
446         return -E_INVALID_DB;
447     }
448 
449     // check if result set closed
450     {
451         std::lock_guard<std::mutex> kvDbResultLock(kvDbResultSetsMutex_);
452         if (kvDbResultSets_.size() > 0) {
453             LOGE("Active result set exist.");
454             return -E_BUSY;
455         }
456     }
457     // 1. Get the connection number, and get the right to do the rekey operation.
458     int errCode = kvDB_->TryToDisableConnection(perm);
459     if (errCode != E_OK) {
460         // If precheck failed, it means that there are more than one connection.
461         // No need reset the condition for the scene.
462         LOGE("More than one connection");
463         return errCode;
464     }
465     // 2. Check the observer list.
466     errCode = GenericKvDBConnection::PreCheckExclusiveStatus();
467     if (errCode != E_OK) {
468         kvDB_->ReEnableConnection(perm);
469         LOGE("Observer prevents.");
470         return errCode;
471     }
472 
473     // 3. Check the conflict notifier.
474     {
475         GenericKvDBConnection::ResetExclusiveStatus();
476         kvDB_->ReEnableConnection(perm);
477         EnableManualSync();
478     }
479     return E_OK;
480 }
481 
RegisterLifeCycleCallback(const DatabaseLifeCycleNotifier & notifier)482 int RdSingleVerNaturalStoreConnection::RegisterLifeCycleCallback(const DatabaseLifeCycleNotifier &notifier)
483 {
484     return -E_NOT_SUPPORT;
485 }
486 
487 // Called when Close and delete the connection.
PreClose()488 int RdSingleVerNaturalStoreConnection::PreClose()
489 {
490     // check if result set closed
491     std::lock_guard<std::mutex> lock(kvDbResultSetsMutex_);
492     if (kvDbResultSets_.size() > 0) {
493         LOGE("The rd connection have [%zu] active result set, can not close.", kvDbResultSets_.size());
494         return -E_BUSY;
495     }
496     // check if transaction closed
497     {
498         std::lock_guard<std::mutex> transactionLock(transactionMutex_);
499         if (writeHandle_ != nullptr) {
500             LOGW("Transaction started, rollback before close.");
501             int errCode = RollbackInner();
502             if (errCode != E_OK) {
503                 LOGE("cannot rollback %d.", errCode);
504             }
505             ReleaseExecutor(writeHandle_);
506         }
507     }
508     return E_OK;
509 }
510 
CheckIntegrity() const511 int RdSingleVerNaturalStoreConnection::CheckIntegrity() const
512 {
513     RdSingleVerNaturalStore *naturalStore = GetDB<RdSingleVerNaturalStore>();
514     if (naturalStore == nullptr) {
515         LOGE("[SingleVerConnection] the store is null");
516         return -E_NOT_INIT;
517     }
518     return naturalStore->CheckIntegrity();
519 }
520 
GetKeys(const IOption & option,const Key & keyPrefix,std::vector<Key> & keys) const521 int RdSingleVerNaturalStoreConnection::GetKeys(const IOption &option, const Key &keyPrefix,
522     std::vector<Key> &keys) const
523 {
524     return -E_NOT_SUPPORT;
525 }
526 
UpdateKey(const UpdateKeyCallback & callback)527 int RdSingleVerNaturalStoreConnection::UpdateKey(const UpdateKeyCallback &callback)
528 {
529     return -E_NOT_SUPPORT;
530 }
531 
CheckSyncEntriesValid(const std::vector<Entry> & entries) const532 int RdSingleVerNaturalStoreConnection::CheckSyncEntriesValid(const std::vector<Entry> &entries) const
533 {
534     if (entries.size() > DBConstant::MAX_BATCH_SIZE) {
535         return -E_INVALID_ARGS;
536     }
537 
538     RdSingleVerNaturalStore *naturalStore = GetDB<RdSingleVerNaturalStore>();
539     if (naturalStore == nullptr) {
540         return -E_INVALID_DB;
541     }
542 
543     for (const auto &entry : entries) {
544         int errCode = naturalStore->CheckDataStatus(entry.key, entry.value, false);
545         if (errCode != E_OK) {
546             return errCode;
547         }
548     }
549     return E_OK;
550 }
551 
PutBatchInner(const IOption & option,const std::vector<Entry> & entries)552 int RdSingleVerNaturalStoreConnection::PutBatchInner(const IOption &option, const std::vector<Entry> &entries)
553 {
554     LOGD("PutBatchInner");
555     std::lock_guard<std::mutex> lock(transactionMutex_);
556     bool isAuto = false;
557     int errCode = E_OK;
558     if (option.dataType != IOption::SYNC_DATA) {
559         LOGE("LOCAL_DATA TYPE NOT SUPPORT in RD executor");
560         return -E_NOT_SUPPORT;
561     }
562     if (writeHandle_ == nullptr) {
563         isAuto = true;
564         errCode = StartTransactionInner(TransactType::IMMEDIATE);
565         if (errCode != E_OK) {
566             return errCode;
567         }
568     }
569 
570     errCode = SaveSyncEntries(entries, false);
571 
572     if (isAuto) {
573         if (errCode == E_OK) {
574             errCode = CommitInner();
575         } else {
576             int innerCode = RollbackInner();
577             errCode = (innerCode != E_OK) ? innerCode : errCode;
578         }
579     }
580     return errCode;
581 }
582 
SaveSyncEntries(const std::vector<Entry> & entries,bool isDelete)583 int RdSingleVerNaturalStoreConnection::SaveSyncEntries(const std::vector<Entry> &entries, bool isDelete)
584 {
585     if (IsSinglePutOrDelete(entries)) {
586         return SaveEntry(entries[0], isDelete);
587     }
588     return writeHandle_->BatchSaveEntries(entries, isDelete, committedData_);
589 }
590 
591 // This function currently only be called in local procedure to change sync_data table, do not use in sync procedure.
592 // It will check and amend value when need if it is a schema database. return error if some value disagree with the
593 // schema. But in sync procedure, we just neglect the value that disagree with schema.
SaveEntry(const Entry & entry,bool isDelete,Timestamp timestamp)594 int RdSingleVerNaturalStoreConnection::SaveEntry(const Entry &entry, bool isDelete, Timestamp timestamp)
595 {
596     LOGD("Saving Entry");
597     RdSingleVerNaturalStore *naturalStore = GetDB<RdSingleVerNaturalStore>();
598     if (naturalStore == nullptr) {
599         LOGE("[RdSingleVerNaturalStoreConnection][SaveEntry] the store is null");
600         return -E_INVALID_DB;
601     }
602 
603     if (IsExtendedCacheDBMode()) {
604         return -E_NOT_SUPPORT;
605     } else {
606         return SaveEntryNormally(entry, isDelete);
607     }
608 }
609 
SaveEntryNormally(const Entry & entry,bool isDelete)610 int RdSingleVerNaturalStoreConnection::SaveEntryNormally(const Entry &entry, bool isDelete)
611 {
612     int errCode = writeHandle_->SaveSyncDataItem(entry, committedData_, isDelete);
613     if (errCode != E_OK) {
614         LOGE("Save entry failed, err:%d", errCode);
615     }
616     return errCode;
617 }
618 
GetExecutor(bool isWrite,int & errCode) const619 RdSingleVerStorageExecutor *RdSingleVerNaturalStoreConnection::GetExecutor(bool isWrite, int &errCode) const
620 {
621     LOGD("[RdSingleVerNaturalStoreConnection] Getting Executor ");
622     RdSingleVerNaturalStore *naturalStore = GetDB<RdSingleVerNaturalStore>();
623     if (naturalStore == nullptr) {
624         errCode = -E_NOT_INIT;
625         LOGE("[SingleVerConnection] the store is null");
626         return nullptr;
627     }
628     return naturalStore->GetHandle(isWrite, errCode);
629 }
630 
ReleaseExecutor(RdSingleVerStorageExecutor * & executor) const631 void RdSingleVerNaturalStoreConnection::ReleaseExecutor(RdSingleVerStorageExecutor *&executor) const
632 {
633     kvDB_->ReEnableConnection(OperatePerm::NORMAL_WRITE);
634     RdSingleVerNaturalStore *naturalStore = GetDB<RdSingleVerNaturalStore>();
635     if (naturalStore != nullptr) {
636         naturalStore->ReleaseHandle(executor);
637     }
638 }
639 
ReleaseCommitData(SingleVerNaturalStoreCommitNotifyData * & committedData)640 void RdSingleVerNaturalStoreConnection::ReleaseCommitData(SingleVerNaturalStoreCommitNotifyData *&committedData)
641 {
642     if (committedData != nullptr) {
643         committedData->DecObjRef(committedData);
644         committedData = nullptr;
645     }
646 }
647 
StartTransactionInner(TransactType transType)648 int RdSingleVerNaturalStoreConnection::StartTransactionInner(TransactType transType)
649 {
650     if (IsExtendedCacheDBMode()) {
651         return -E_NOT_SUPPORT;
652     } else {
653         return StartTransactionNormally(transType);
654     }
655 }
656 
StartTransactionNormally(TransactType transType)657 int RdSingleVerNaturalStoreConnection::StartTransactionNormally(TransactType transType)
658 {
659     int errCode = E_OK;
660     RdSingleVerStorageExecutor *handle = GetExecutor(true, errCode);
661     if (handle == nullptr) {
662         return errCode;
663     }
664 
665     errCode = kvDB_->TryToDisableConnection(OperatePerm::NORMAL_WRITE);
666     if (errCode != E_OK) {
667         ReleaseExecutor(handle);
668         LOGE("Start transaction failed, %d", errCode);
669         return errCode;
670     }
671 
672     if (committedData_ == nullptr) {
673         committedData_ = new (std::nothrow) SingleVerNaturalStoreCommitNotifyData;
674         if (committedData_ == nullptr) {
675             ReleaseExecutor(handle);
676             return -E_OUT_OF_MEMORY;
677         }
678     }
679 
680     errCode = handle->StartTransaction(transType);
681     if (errCode != E_OK) {
682         ReleaseExecutor(handle);
683         ReleaseCommitData(committedData_);
684         return errCode;
685     }
686 
687     writeHandle_ = handle;
688     return E_OK;
689 }
690 
CommitInner()691 int RdSingleVerNaturalStoreConnection::CommitInner()
692 {
693     int errCode = writeHandle_->Commit();
694     ReleaseExecutor(writeHandle_);
695     writeHandle_ = nullptr;
696 
697     CommitAndReleaseNotifyData(committedData_, true,
698         static_cast<int>(SQLiteGeneralNSNotificationEventType::SQLITE_GENERAL_NS_PUT_EVENT));
699     return errCode;
700 }
701 
RollbackInner()702 int RdSingleVerNaturalStoreConnection::RollbackInner()
703 {
704     int errCode = writeHandle_->Rollback();
705     ReleaseCommitData(committedData_);
706     ReleaseExecutor(writeHandle_);
707     writeHandle_ = nullptr;
708     return errCode;
709 }
710 
CheckReadDataControlled() const711 int RdSingleVerNaturalStoreConnection::CheckReadDataControlled() const
712 {
713     RdSingleVerNaturalStore *naturalStore = GetDB<RdSingleVerNaturalStore>();
714     if (naturalStore == nullptr) {
715         LOGE("[SingleVerConnection] natural store is nullptr in CheckReadDataControlled.");
716         return E_OK;
717     }
718     return naturalStore->CheckReadDataControlled();
719 }
720 
CheckSyncKeysValid(const std::vector<Key> & keys) const721 int RdSingleVerNaturalStoreConnection::CheckSyncKeysValid(const std::vector<Key> &keys) const
722 {
723     if (keys.size() > DBConstant::MAX_BATCH_SIZE) {
724         return -E_INVALID_ARGS;
725     }
726 
727     RdSingleVerNaturalStore *naturalStore = GetDB<RdSingleVerNaturalStore>();
728     if (naturalStore == nullptr) {
729         return -E_INVALID_DB;
730     }
731 
732     for (const auto &key : keys) {
733         int errCode = naturalStore->CheckDataStatus(key, {}, true);
734         if (errCode != E_OK) {
735             return errCode;
736         }
737     }
738     return E_OK;
739 }
740 
DeleteBatchInner(const IOption & option,const std::vector<Key> & keys)741 int RdSingleVerNaturalStoreConnection::DeleteBatchInner(const IOption &option, const std::vector<Key> &keys)
742 {
743     DBDfxAdapter::StartTracing();
744     bool isAuto = false;
745     int errCode = E_OK;
746     if (option.dataType != IOption::SYNC_DATA) {
747         LOGE("LOCAL_DATA TYPE NOT SUPPORT in RD executor");
748         DBDfxAdapter::FinishTracing();
749         return -E_NOT_SUPPORT;
750     }
751     std::lock_guard<std::mutex> lock(transactionMutex_);
752     if (writeHandle_ == nullptr) {
753         isAuto = true;
754         errCode = StartTransactionInner(TransactType::IMMEDIATE);
755         if (errCode != E_OK) {
756             DBDfxAdapter::FinishTracing();
757             return errCode;
758         }
759     }
760 
761     errCode = DeleteSyncEntries(keys);
762 
763     if (isAuto) {
764         if (errCode == E_OK) {
765             errCode = CommitInner();
766         } else {
767             int innerCode = RollbackInner();
768             errCode = (innerCode != E_OK) ? innerCode : errCode;
769         }
770     }
771     DBDfxAdapter::FinishTracing();
772     return errCode;
773 }
774 
DeleteSyncEntries(const std::vector<Key> & keys)775 int RdSingleVerNaturalStoreConnection::DeleteSyncEntries(const std::vector<Key> &keys)
776 {
777     std::vector<Entry> entries;
778     for (const auto &key : keys) {
779         Entry entry;
780         entry.key = std::move(key);
781         entries.emplace_back(std::move(entry));
782     }
783 
784     int errCode = SaveSyncEntries(entries, true);
785     if ((errCode != E_OK) && (errCode != -E_NOT_FOUND)) {
786         LOGE("[DeleteSyncEntries] Delete data err:%d", errCode);
787     }
788     return (errCode == -E_NOT_FOUND) ? E_OK : errCode;
789 }
790 
GetSyncDataSize(const std::string & device,size_t & size) const791 int RdSingleVerNaturalStoreConnection::GetSyncDataSize(const std::string &device, size_t &size) const
792 {
793     return -E_NOT_SUPPORT;
794 }
795 
Sync(const CloudSyncOption & option,const SyncProcessCallback & onProcess)796 int RdSingleVerNaturalStoreConnection::Sync(const CloudSyncOption &option, const SyncProcessCallback &onProcess)
797 {
798     return -E_NOT_SUPPORT;
799 }
800 
SetCloudDB(const std::map<std::string,std::shared_ptr<ICloudDb>> & cloudDBs)801 int RdSingleVerNaturalStoreConnection::SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs)
802 {
803     return -E_NOT_SUPPORT;
804 }
805 
SetGenCloudVersionCallback(const GenerateCloudVersionCallback & callback)806 void RdSingleVerNaturalStoreConnection::SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback)
807 {
808 }
809 }
810 // namespace DistributedDB
811