1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RELATIONAL_STORE
16 #include "relational_store_delegate_impl.h"
17 
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "cloud/cloud_db_constant.h"
21 #include "kv_store_errno.h"
22 #include "log_print.h"
23 #include "param_check_utils.h"
24 #include "relational_store_changed_data_impl.h"
25 #include "relational_store_instance.h"
26 #include "sync_operation.h"
27 
28 namespace DistributedDB {
RelationalStoreDelegateImpl(RelationalStoreConnection * conn,const std::string & path)29 RelationalStoreDelegateImpl::RelationalStoreDelegateImpl(RelationalStoreConnection *conn, const std::string &path)
30     : conn_(conn),
31       storePath_(path)
32 {}
33 
~RelationalStoreDelegateImpl()34 RelationalStoreDelegateImpl::~RelationalStoreDelegateImpl()
35 {
36     if (!releaseFlag_) {
37         LOGF("[RelationalStore Delegate] Can't release directly");
38         return;
39     }
40 
41     conn_ = nullptr;
42 }
43 
RemoveDeviceDataInner(const std::string & device,ClearMode mode)44 DBStatus RelationalStoreDelegateImpl::RemoveDeviceDataInner(const std::string &device, ClearMode mode)
45 {
46     if (mode >= BUTT || mode < 0) {
47         LOGE("Invalid mode for Remove device data, %d.", INVALID_ARGS);
48         return INVALID_ARGS;
49     }
50     if (mode == DEFAULT) {
51         return RemoveDeviceData(device, "");
52     }
53     if (conn_ == nullptr) {
54         LOGE("[RelationalStore Delegate] Invalid connection for operation!");
55         return DB_ERROR;
56     }
57 
58     int errCode = conn_->DoClean(mode);
59     if (errCode != E_OK) {
60         LOGE("[RelationalStore Delegate] remove device cloud data failed:%d", errCode);
61         return TransferDBErrno(errCode);
62     }
63     return OK;
64 }
65 
GetCloudSyncTaskCount()66 int32_t RelationalStoreDelegateImpl::GetCloudSyncTaskCount()
67 {
68     if (conn_ == nullptr) {
69         LOGE("[RelationalStore Delegate] Invalid connection for operation!");
70         return -1;
71     }
72     int32_t count = conn_->GetCloudSyncTaskCount();
73     if (count == -1) {
74         LOGE("[RelationalStore Delegate] Failed to get cloud sync task count.");
75     }
76     return count;
77 }
78 
CreateDistributedTableInner(const std::string & tableName,TableSyncType type)79 DBStatus RelationalStoreDelegateImpl::CreateDistributedTableInner(const std::string &tableName, TableSyncType type)
80 {
81     if (!ParamCheckUtils::CheckRelationalTableName(tableName)) {
82         LOGE("[RelationalStore Delegate] Invalid table name.");
83         return INVALID_ARGS;
84     }
85 
86     if (!(type == DEVICE_COOPERATION || type == CLOUD_COOPERATION)) {
87         LOGE("[RelationalStore Delegate] Invalid table sync type.");
88         return INVALID_ARGS;
89     }
90 
91     if (conn_ == nullptr) {
92         LOGE("[RelationalStore Delegate] Invalid connection for operation!");
93         return DB_ERROR;
94     }
95 
96     int errCode = conn_->CreateDistributedTable(tableName, type);
97     if (errCode != E_OK) {
98         LOGE("[RelationalStore Delegate] Create Distributed table failed:%d", errCode);
99         return TransferDBErrno(errCode);
100     }
101     return OK;
102 }
103 
Sync(const std::vector<std::string> & devices,SyncMode mode,const Query & query,const SyncStatusCallback & onComplete,bool wait)104 DBStatus RelationalStoreDelegateImpl::Sync(const std::vector<std::string> &devices, SyncMode mode,
105     const Query &query, const SyncStatusCallback &onComplete, bool wait)
106 {
107     if (conn_ == nullptr) {
108         LOGE("Invalid connection for operation!");
109         return DB_ERROR;
110     }
111 
112     if (mode > SYNC_MODE_PUSH_PULL) {
113         LOGE("not support other mode");
114         return NOT_SUPPORT;
115     }
116 
117     if (!DBCommon::CheckQueryWithoutMultiTable(query)) {
118         LOGE("not support query with tables");
119         return NOT_SUPPORT;
120     }
121     RelationalStoreConnection::SyncInfo syncInfo{devices, mode,
122         [this, onComplete](const std::map<std::string, std::vector<TableStatus>> &devicesStatus) {
123             OnSyncComplete(devicesStatus, onComplete);
124         }, query, wait};
125     int errCode = conn_->SyncToDevice(syncInfo);
126     if (errCode != E_OK) {
127         LOGW("[RelationalStore Delegate] sync data to device failed:%d", errCode);
128         return TransferDBErrno(errCode);
129     }
130     return OK;
131 }
132 
RemoveDeviceData(const std::string & device,const std::string & tableName)133 DBStatus RelationalStoreDelegateImpl::RemoveDeviceData(const std::string &device, const std::string &tableName)
134 {
135     if (conn_ == nullptr) {
136         LOGE("Invalid connection for operation!");
137         return DB_ERROR;
138     }
139 
140     if (device.empty() || device.length() > DBConstant::MAX_DEV_LENGTH ||
141         !ParamCheckUtils::CheckRelationalTableName(tableName)) {
142         LOGE("[RelationalStore Delegate] Remove device data with invalid device name or table name.");
143         return INVALID_ARGS;
144     }
145 
146     int errCode = conn_->RemoveDeviceData(device, tableName);
147     if (errCode != E_OK) {
148         LOGW("[RelationalStore Delegate] remove device data failed:%d", errCode);
149         return TransferDBErrno(errCode);
150     }
151     return OK;
152 }
153 
Close()154 DBStatus RelationalStoreDelegateImpl::Close()
155 {
156     if (conn_ == nullptr) {
157         return OK;
158     }
159 
160     int errCode = RelationalStoreInstance::ReleaseDataBaseConnection(conn_);
161     if (errCode == -E_BUSY) {
162         LOGW("[RelationalStore Delegate] busy for close");
163         return BUSY;
164     }
165     if (errCode != E_OK) {
166         LOGE("Release db connection error:%d", errCode);
167         return TransferDBErrno(errCode);
168     }
169 
170     LOGI("[RelationalStore Delegate] Close");
171     conn_ = nullptr;
172     return OK;
173 }
174 
SetReleaseFlag(bool flag)175 void RelationalStoreDelegateImpl::SetReleaseFlag(bool flag)
176 {
177     releaseFlag_ = flag;
178 }
179 
OnSyncComplete(const std::map<std::string,std::vector<TableStatus>> & devicesStatus,const SyncStatusCallback & onComplete)180 void RelationalStoreDelegateImpl::OnSyncComplete(const std::map<std::string, std::vector<TableStatus>> &devicesStatus,
181     const SyncStatusCallback &onComplete)
182 {
183     std::map<std::string, std::vector<TableStatus>> res;
184     for (const auto &[device, tablesStatus] : devicesStatus) {
185         for (const auto &tableStatus : tablesStatus) {
186             TableStatus table;
187             table.tableName = tableStatus.tableName;
188             table.status = SyncOperation::DBStatusTrans(tableStatus.status);
189             res[device].push_back(table);
190         }
191     }
192     if (onComplete) {
193         onComplete(res);
194     }
195 }
196 
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,std::shared_ptr<ResultSet> & result)197 DBStatus RelationalStoreDelegateImpl::RemoteQuery(const std::string &device, const RemoteCondition &condition,
198     uint64_t timeout, std::shared_ptr<ResultSet> &result)
199 {
200     if (conn_ == nullptr) {
201         LOGE("Invalid connection for operation!");
202         return DB_ERROR;
203     }
204     int errCode = conn_->RemoteQuery(device, condition, timeout, result);
205     if (errCode != E_OK) {
206         LOGW("[RelationalStore Delegate] remote query failed:%d", errCode);
207         result = nullptr;
208         return TransferDBErrno(errCode);
209     }
210     return OK;
211 }
212 
RemoveDeviceData()213 DBStatus RelationalStoreDelegateImpl::RemoveDeviceData()
214 {
215     if (conn_ == nullptr) {
216         LOGE("Invalid connection for operation!");
217         return DB_ERROR;
218     }
219 
220     int errCode = conn_->RemoveDeviceData();
221     if (errCode != E_OK) {
222         LOGW("[RelationalStore Delegate] remove device data failed:%d", errCode);
223         return TransferDBErrno(errCode);
224     }
225     return OK;
226 }
227 
Sync(const std::vector<std::string> & devices,SyncMode mode,const Query & query,const SyncProcessCallback & onProcess,int64_t waitTime)228 DBStatus RelationalStoreDelegateImpl::Sync(const std::vector<std::string> &devices, SyncMode mode, const Query &query,
229     const SyncProcessCallback &onProcess, int64_t waitTime)
230 {
231     CloudSyncOption option;
232     option.devices = devices;
233     option.mode = mode;
234     option.query = query;
235     option.waitTime = waitTime;
236     return Sync(option, onProcess);
237 }
238 
SetCloudDB(const std::shared_ptr<ICloudDb> & cloudDb)239 DBStatus RelationalStoreDelegateImpl::SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDb)
240 {
241     if (conn_ == nullptr || conn_->SetCloudDB(cloudDb) != E_OK) {
242         return DB_ERROR;
243     }
244     return OK;
245 }
246 
SetCloudDbSchema(const DataBaseSchema & schema)247 DBStatus RelationalStoreDelegateImpl::SetCloudDbSchema(const DataBaseSchema &schema)
248 {
249     DataBaseSchema cloudSchema = schema;
250     if (!ParamCheckUtils::CheckSharedTableName(cloudSchema)) {
251         LOGE("[RelationalStore Delegate] SharedTableName check failed!");
252         return INVALID_ARGS;
253     }
254     if (conn_ == nullptr) {
255         return DB_ERROR;
256     }
257     // create shared table and set cloud db schema
258     int errorCode = conn_->PrepareAndSetCloudDbSchema(cloudSchema);
259     if (errorCode != E_OK) {
260         LOGE("[RelationalStore Delegate] set cloud schema failed!");
261     }
262     return TransferDBErrno(errorCode);
263 }
264 
RegisterObserver(StoreObserver * observer)265 DBStatus RelationalStoreDelegateImpl::RegisterObserver(StoreObserver *observer)
266 {
267     if (observer == nullptr) {
268         return INVALID_ARGS;
269     }
270     if (conn_ == nullptr) {
271         return DB_ERROR;
272     }
273     std::string userId;
274     std::string appId;
275     std::string storeId;
276     int errCode = conn_->GetStoreInfo(userId, appId, storeId);
277     if (errCode != E_OK) {
278         return DB_ERROR;
279     }
280     errCode = conn_->RegisterObserverAction(observer, [observer, userId, appId, storeId](
281         const std::string &changedDevice, ChangedData &&changedData, bool isChangedData) {
282         if (isChangedData && observer != nullptr) {
283             observer->OnChange(Origin::ORIGIN_CLOUD, changedDevice, std::move(changedData));
284             LOGD("begin to observer on changed data");
285             return;
286         }
287         RelationalStoreChangedDataImpl data(changedDevice);
288         data.SetStoreProperty({userId, appId, storeId});
289         if (observer != nullptr) {
290             LOGD("begin to observer on changed, changedDevice=%s", STR_MASK(changedDevice));
291             observer->OnChange(data);
292         }
293     });
294     return TransferDBErrno(errCode);
295 }
296 
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)297 DBStatus RelationalStoreDelegateImpl::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
298 {
299     if (conn_ == nullptr || conn_->SetIAssetLoader(loader) != E_OK) {
300         return DB_ERROR;
301     }
302     return OK;
303 }
304 
UnRegisterObserver()305 DBStatus RelationalStoreDelegateImpl::UnRegisterObserver()
306 {
307     if (conn_ == nullptr) {
308         return DB_ERROR;
309     }
310     // unregister all observer of this delegate
311     return TransferDBErrno(conn_->UnRegisterObserverAction(nullptr));
312 }
313 
UnRegisterObserver(StoreObserver * observer)314 DBStatus RelationalStoreDelegateImpl::UnRegisterObserver(StoreObserver *observer)
315 {
316     if (observer == nullptr) {
317         return INVALID_ARGS;
318     }
319     if (conn_ == nullptr) {
320         return DB_ERROR;
321     }
322     return TransferDBErrno(conn_->UnRegisterObserverAction(observer));
323 }
324 
Sync(const CloudSyncOption & option,const SyncProcessCallback & onProcess)325 DBStatus RelationalStoreDelegateImpl::Sync(const CloudSyncOption &option, const SyncProcessCallback &onProcess)
326 {
327     uint64_t taskId = 0;
328     return Sync(option, onProcess, taskId);
329 }
330 
SetTrackerTable(const TrackerSchema & schema)331 DBStatus RelationalStoreDelegateImpl::SetTrackerTable(const TrackerSchema &schema)
332 {
333     if (conn_ == nullptr) {
334         LOGE("[RelationalStore Delegate] Invalid connection for operation!");
335         return DB_ERROR;
336     }
337     if (schema.tableName.empty()) {
338         LOGE("[RelationalStore Delegate] tracker table is empty.");
339         return INVALID_ARGS;
340     }
341     if (!ParamCheckUtils::CheckRelationalTableName(schema.tableName)) {
342         LOGE("[RelationalStore Delegate] Invalid tracker table name.");
343         return INVALID_ARGS;
344     }
345     int errCode = conn_->SetTrackerTable(schema);
346     if (errCode != E_OK) {
347         if (errCode == -E_WITH_INVENTORY_DATA) {
348             LOGI("[RelationalStore Delegate] create tracker table for the first time.");
349         } else {
350             LOGE("[RelationalStore Delegate] Set Subscribe table failed:%d", errCode);
351         }
352         return TransferDBErrno(errCode);
353     }
354     return OK;
355 }
356 
ExecuteSql(const SqlCondition & condition,std::vector<VBucket> & records)357 DBStatus RelationalStoreDelegateImpl::ExecuteSql(const SqlCondition &condition, std::vector<VBucket> &records)
358 {
359     if (conn_ == nullptr) {
360         LOGE("[RelationalStore Delegate] Invalid connection for operation!");
361         return DB_ERROR;
362     }
363     int errCode = conn_->ExecuteSql(condition, records);
364     if (errCode != E_OK) {
365         LOGE("[RelationalStore Delegate] execute sql failed:%d", errCode);
366         return TransferDBErrno(errCode);
367     }
368     return OK;
369 }
370 
SetReference(const std::vector<TableReferenceProperty> & tableReferenceProperty)371 DBStatus RelationalStoreDelegateImpl::SetReference(const std::vector<TableReferenceProperty> &tableReferenceProperty)
372 {
373     if (conn_ == nullptr) {
374         LOGE("[RelationalStore SetReference] Invalid connection for operation!");
375         return DB_ERROR;
376     }
377     if (!ParamCheckUtils::CheckTableReference(tableReferenceProperty)) {
378         return INVALID_ARGS;
379     }
380     int errCode = conn_->SetReference(tableReferenceProperty);
381     if (errCode != E_OK) {
382         if (errCode != -E_TABLE_REFERENCE_CHANGED) {
383             LOGE("[RelationalStore] SetReference failed:%d", errCode);
384         } else {
385             LOGI("[RelationalStore] reference changed");
386         }
387         return TransferDBErrno(errCode);
388     }
389     LOGI("[RelationalStore Delegate] SetReference success");
390     return OK;
391 }
392 
CleanTrackerData(const std::string & tableName,int64_t cursor)393 DBStatus RelationalStoreDelegateImpl::CleanTrackerData(const std::string &tableName, int64_t cursor)
394 {
395     if (conn_ == nullptr) {
396         LOGE("[RelationalStore Delegate] Invalid connection for operation!");
397         return DB_ERROR;
398     }
399     int errCode = conn_->CleanTrackerData(tableName, cursor);
400     if (errCode != E_OK) {
401         LOGE("[RelationalStore Delegate] clean tracker data failed:%d", errCode);
402         return TransferDBErrno(errCode);
403     }
404     LOGI("[RelationalStore Delegate] CleanTrackerData success");
405     return OK;
406 }
407 
Pragma(PragmaCmd cmd,PragmaData & pragmaData)408 DBStatus RelationalStoreDelegateImpl::Pragma(PragmaCmd cmd, PragmaData &pragmaData)
409 {
410     if (cmd != PragmaCmd::LOGIC_DELETE_SYNC_DATA) {
411         return NOT_SUPPORT;
412     }
413     if (conn_ == nullptr) {
414         LOGE("[RelationalStore Delegate] Invalid connection for operation!");
415         return DB_ERROR;
416     }
417     int errCode = conn_->Pragma(cmd, pragmaData);
418     if (errCode != E_OK) {
419         LOGE("[RelationalStore Delegate] Pragma failed:%d", errCode);
420         return TransferDBErrno(errCode);
421     }
422     LOGI("[RelationalStore Delegate] Pragma success");
423     return OK;
424 }
425 
UpsertData(const std::string & tableName,const std::vector<VBucket> & records,RecordStatus status)426 DBStatus RelationalStoreDelegateImpl::UpsertData(const std::string &tableName, const std::vector<VBucket> &records,
427     RecordStatus status)
428 {
429     if (conn_ == nullptr) {
430         LOGE("[RelationalStore Delegate] Invalid connection for operation!");
431         return DB_ERROR;
432     }
433     int errCode = conn_->UpsertData(status, tableName, records);
434     if (errCode != E_OK) {
435         LOGE("[RelationalStore Delegate] Upsert data failed:%d", errCode);
436         return TransferDBErrno(errCode);
437     }
438     LOGI("[RelationalStore Delegate] Upsert data success");
439     return OK;
440 }
441 
SetCloudSyncConfig(const CloudSyncConfig & config)442 DBStatus RelationalStoreDelegateImpl::SetCloudSyncConfig(const CloudSyncConfig &config)
443 {
444     if (conn_ == nullptr) {
445         LOGE("[RelationalStore Delegate] Invalid connection for SetCloudSyncConfig!");
446         return DB_ERROR;
447     }
448     if (!DBCommon::CheckCloudSyncConfigValid(config)) {
449         return INVALID_ARGS;
450     }
451     int errCode = conn_->SetCloudSyncConfig(config);
452     if (errCode != E_OK) {
453         LOGE("[RelationalStore Delegate] SetCloudSyncConfig failed:%d", errCode);
454         return TransferDBErrno(errCode);
455     }
456     LOGI("[RelationalStore Delegate] SetCloudSyncConfig success");
457     return OK;
458 }
459 
Sync(const CloudSyncOption & option,const SyncProcessCallback & onProcess,uint64_t taskId)460 DBStatus RelationalStoreDelegateImpl::Sync(const CloudSyncOption &option, const SyncProcessCallback &onProcess,
461     uint64_t taskId)
462 {
463     if (conn_ == nullptr) {
464         LOGE("[RelationalStore Delegate] Invalid connection for sync!");
465         return DB_ERROR;
466     }
467     int errCode = conn_->Sync(option, onProcess, taskId);
468     if (errCode != E_OK) {
469         LOGE("[RelationalStore Delegate] Cloud sync failed:%d", errCode);
470         return TransferDBErrno(errCode);
471     }
472     return OK;
473 }
474 
GetCloudTaskStatus(uint64_t taskId)475 SyncProcess RelationalStoreDelegateImpl::GetCloudTaskStatus(uint64_t taskId)
476 {
477     SyncProcess syncProcess;
478     if (conn_ == nullptr) {
479         LOGE("[RelationalStore Delegate] Invalid connection for getting cloud task status!");
480         syncProcess.errCode = DB_ERROR;
481         return syncProcess;
482     }
483     return conn_->GetCloudTaskStatus(taskId);
484 }
485 } // namespace DistributedDB
486 #endif