1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RELATIONAL_STORE
16 #include "relational_store_delegate_impl.h"
17
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "cloud/cloud_db_constant.h"
21 #include "kv_store_errno.h"
22 #include "log_print.h"
23 #include "param_check_utils.h"
24 #include "relational_store_changed_data_impl.h"
25 #include "relational_store_instance.h"
26 #include "sync_operation.h"
27
28 namespace DistributedDB {
RelationalStoreDelegateImpl(RelationalStoreConnection * conn,const std::string & path)29 RelationalStoreDelegateImpl::RelationalStoreDelegateImpl(RelationalStoreConnection *conn, const std::string &path)
30 : conn_(conn),
31 storePath_(path)
32 {}
33
~RelationalStoreDelegateImpl()34 RelationalStoreDelegateImpl::~RelationalStoreDelegateImpl()
35 {
36 if (!releaseFlag_) {
37 LOGF("[RelationalStore Delegate] Can't release directly");
38 return;
39 }
40
41 conn_ = nullptr;
42 }
43
RemoveDeviceDataInner(const std::string & device,ClearMode mode)44 DBStatus RelationalStoreDelegateImpl::RemoveDeviceDataInner(const std::string &device, ClearMode mode)
45 {
46 if (mode >= BUTT || mode < 0) {
47 LOGE("Invalid mode for Remove device data, %d.", INVALID_ARGS);
48 return INVALID_ARGS;
49 }
50 if (mode == DEFAULT) {
51 return RemoveDeviceData(device, "");
52 }
53 if (conn_ == nullptr) {
54 LOGE("[RelationalStore Delegate] Invalid connection for operation!");
55 return DB_ERROR;
56 }
57
58 int errCode = conn_->DoClean(mode);
59 if (errCode != E_OK) {
60 LOGE("[RelationalStore Delegate] remove device cloud data failed:%d", errCode);
61 return TransferDBErrno(errCode);
62 }
63 return OK;
64 }
65
GetCloudSyncTaskCount()66 int32_t RelationalStoreDelegateImpl::GetCloudSyncTaskCount()
67 {
68 if (conn_ == nullptr) {
69 LOGE("[RelationalStore Delegate] Invalid connection for operation!");
70 return -1;
71 }
72 int32_t count = conn_->GetCloudSyncTaskCount();
73 if (count == -1) {
74 LOGE("[RelationalStore Delegate] Failed to get cloud sync task count.");
75 }
76 return count;
77 }
78
CreateDistributedTableInner(const std::string & tableName,TableSyncType type)79 DBStatus RelationalStoreDelegateImpl::CreateDistributedTableInner(const std::string &tableName, TableSyncType type)
80 {
81 if (!ParamCheckUtils::CheckRelationalTableName(tableName)) {
82 LOGE("[RelationalStore Delegate] Invalid table name.");
83 return INVALID_ARGS;
84 }
85
86 if (!(type == DEVICE_COOPERATION || type == CLOUD_COOPERATION)) {
87 LOGE("[RelationalStore Delegate] Invalid table sync type.");
88 return INVALID_ARGS;
89 }
90
91 if (conn_ == nullptr) {
92 LOGE("[RelationalStore Delegate] Invalid connection for operation!");
93 return DB_ERROR;
94 }
95
96 int errCode = conn_->CreateDistributedTable(tableName, type);
97 if (errCode != E_OK) {
98 LOGE("[RelationalStore Delegate] Create Distributed table failed:%d", errCode);
99 return TransferDBErrno(errCode);
100 }
101 return OK;
102 }
103
Sync(const std::vector<std::string> & devices,SyncMode mode,const Query & query,const SyncStatusCallback & onComplete,bool wait)104 DBStatus RelationalStoreDelegateImpl::Sync(const std::vector<std::string> &devices, SyncMode mode,
105 const Query &query, const SyncStatusCallback &onComplete, bool wait)
106 {
107 if (conn_ == nullptr) {
108 LOGE("Invalid connection for operation!");
109 return DB_ERROR;
110 }
111
112 if (mode > SYNC_MODE_PUSH_PULL) {
113 LOGE("not support other mode");
114 return NOT_SUPPORT;
115 }
116
117 if (!DBCommon::CheckQueryWithoutMultiTable(query)) {
118 LOGE("not support query with tables");
119 return NOT_SUPPORT;
120 }
121 RelationalStoreConnection::SyncInfo syncInfo{devices, mode,
122 [this, onComplete](const std::map<std::string, std::vector<TableStatus>> &devicesStatus) {
123 OnSyncComplete(devicesStatus, onComplete);
124 }, query, wait};
125 int errCode = conn_->SyncToDevice(syncInfo);
126 if (errCode != E_OK) {
127 LOGW("[RelationalStore Delegate] sync data to device failed:%d", errCode);
128 return TransferDBErrno(errCode);
129 }
130 return OK;
131 }
132
RemoveDeviceData(const std::string & device,const std::string & tableName)133 DBStatus RelationalStoreDelegateImpl::RemoveDeviceData(const std::string &device, const std::string &tableName)
134 {
135 if (conn_ == nullptr) {
136 LOGE("Invalid connection for operation!");
137 return DB_ERROR;
138 }
139
140 if (device.empty() || device.length() > DBConstant::MAX_DEV_LENGTH ||
141 !ParamCheckUtils::CheckRelationalTableName(tableName)) {
142 LOGE("[RelationalStore Delegate] Remove device data with invalid device name or table name.");
143 return INVALID_ARGS;
144 }
145
146 int errCode = conn_->RemoveDeviceData(device, tableName);
147 if (errCode != E_OK) {
148 LOGW("[RelationalStore Delegate] remove device data failed:%d", errCode);
149 return TransferDBErrno(errCode);
150 }
151 return OK;
152 }
153
Close()154 DBStatus RelationalStoreDelegateImpl::Close()
155 {
156 if (conn_ == nullptr) {
157 return OK;
158 }
159
160 int errCode = RelationalStoreInstance::ReleaseDataBaseConnection(conn_);
161 if (errCode == -E_BUSY) {
162 LOGW("[RelationalStore Delegate] busy for close");
163 return BUSY;
164 }
165 if (errCode != E_OK) {
166 LOGE("Release db connection error:%d", errCode);
167 return TransferDBErrno(errCode);
168 }
169
170 LOGI("[RelationalStore Delegate] Close");
171 conn_ = nullptr;
172 return OK;
173 }
174
SetReleaseFlag(bool flag)175 void RelationalStoreDelegateImpl::SetReleaseFlag(bool flag)
176 {
177 releaseFlag_ = flag;
178 }
179
OnSyncComplete(const std::map<std::string,std::vector<TableStatus>> & devicesStatus,const SyncStatusCallback & onComplete)180 void RelationalStoreDelegateImpl::OnSyncComplete(const std::map<std::string, std::vector<TableStatus>> &devicesStatus,
181 const SyncStatusCallback &onComplete)
182 {
183 std::map<std::string, std::vector<TableStatus>> res;
184 for (const auto &[device, tablesStatus] : devicesStatus) {
185 for (const auto &tableStatus : tablesStatus) {
186 TableStatus table;
187 table.tableName = tableStatus.tableName;
188 table.status = SyncOperation::DBStatusTrans(tableStatus.status);
189 res[device].push_back(table);
190 }
191 }
192 if (onComplete) {
193 onComplete(res);
194 }
195 }
196
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,std::shared_ptr<ResultSet> & result)197 DBStatus RelationalStoreDelegateImpl::RemoteQuery(const std::string &device, const RemoteCondition &condition,
198 uint64_t timeout, std::shared_ptr<ResultSet> &result)
199 {
200 if (conn_ == nullptr) {
201 LOGE("Invalid connection for operation!");
202 return DB_ERROR;
203 }
204 int errCode = conn_->RemoteQuery(device, condition, timeout, result);
205 if (errCode != E_OK) {
206 LOGW("[RelationalStore Delegate] remote query failed:%d", errCode);
207 result = nullptr;
208 return TransferDBErrno(errCode);
209 }
210 return OK;
211 }
212
RemoveDeviceData()213 DBStatus RelationalStoreDelegateImpl::RemoveDeviceData()
214 {
215 if (conn_ == nullptr) {
216 LOGE("Invalid connection for operation!");
217 return DB_ERROR;
218 }
219
220 int errCode = conn_->RemoveDeviceData();
221 if (errCode != E_OK) {
222 LOGW("[RelationalStore Delegate] remove device data failed:%d", errCode);
223 return TransferDBErrno(errCode);
224 }
225 return OK;
226 }
227
Sync(const std::vector<std::string> & devices,SyncMode mode,const Query & query,const SyncProcessCallback & onProcess,int64_t waitTime)228 DBStatus RelationalStoreDelegateImpl::Sync(const std::vector<std::string> &devices, SyncMode mode, const Query &query,
229 const SyncProcessCallback &onProcess, int64_t waitTime)
230 {
231 CloudSyncOption option;
232 option.devices = devices;
233 option.mode = mode;
234 option.query = query;
235 option.waitTime = waitTime;
236 return Sync(option, onProcess);
237 }
238
SetCloudDB(const std::shared_ptr<ICloudDb> & cloudDb)239 DBStatus RelationalStoreDelegateImpl::SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDb)
240 {
241 if (conn_ == nullptr || conn_->SetCloudDB(cloudDb) != E_OK) {
242 return DB_ERROR;
243 }
244 return OK;
245 }
246
SetCloudDbSchema(const DataBaseSchema & schema)247 DBStatus RelationalStoreDelegateImpl::SetCloudDbSchema(const DataBaseSchema &schema)
248 {
249 DataBaseSchema cloudSchema = schema;
250 if (!ParamCheckUtils::CheckSharedTableName(cloudSchema)) {
251 LOGE("[RelationalStore Delegate] SharedTableName check failed!");
252 return INVALID_ARGS;
253 }
254 if (conn_ == nullptr) {
255 return DB_ERROR;
256 }
257 // create shared table and set cloud db schema
258 int errorCode = conn_->PrepareAndSetCloudDbSchema(cloudSchema);
259 if (errorCode != E_OK) {
260 LOGE("[RelationalStore Delegate] set cloud schema failed!");
261 }
262 return TransferDBErrno(errorCode);
263 }
264
RegisterObserver(StoreObserver * observer)265 DBStatus RelationalStoreDelegateImpl::RegisterObserver(StoreObserver *observer)
266 {
267 if (observer == nullptr) {
268 return INVALID_ARGS;
269 }
270 if (conn_ == nullptr) {
271 return DB_ERROR;
272 }
273 std::string userId;
274 std::string appId;
275 std::string storeId;
276 int errCode = conn_->GetStoreInfo(userId, appId, storeId);
277 if (errCode != E_OK) {
278 return DB_ERROR;
279 }
280 errCode = conn_->RegisterObserverAction(observer, [observer, userId, appId, storeId](
281 const std::string &changedDevice, ChangedData &&changedData, bool isChangedData) {
282 if (isChangedData && observer != nullptr) {
283 observer->OnChange(Origin::ORIGIN_CLOUD, changedDevice, std::move(changedData));
284 LOGD("begin to observer on changed data");
285 return;
286 }
287 RelationalStoreChangedDataImpl data(changedDevice);
288 data.SetStoreProperty({userId, appId, storeId});
289 if (observer != nullptr) {
290 LOGD("begin to observer on changed, changedDevice=%s", STR_MASK(changedDevice));
291 observer->OnChange(data);
292 }
293 });
294 return TransferDBErrno(errCode);
295 }
296
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)297 DBStatus RelationalStoreDelegateImpl::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
298 {
299 if (conn_ == nullptr || conn_->SetIAssetLoader(loader) != E_OK) {
300 return DB_ERROR;
301 }
302 return OK;
303 }
304
UnRegisterObserver()305 DBStatus RelationalStoreDelegateImpl::UnRegisterObserver()
306 {
307 if (conn_ == nullptr) {
308 return DB_ERROR;
309 }
310 // unregister all observer of this delegate
311 return TransferDBErrno(conn_->UnRegisterObserverAction(nullptr));
312 }
313
UnRegisterObserver(StoreObserver * observer)314 DBStatus RelationalStoreDelegateImpl::UnRegisterObserver(StoreObserver *observer)
315 {
316 if (observer == nullptr) {
317 return INVALID_ARGS;
318 }
319 if (conn_ == nullptr) {
320 return DB_ERROR;
321 }
322 return TransferDBErrno(conn_->UnRegisterObserverAction(observer));
323 }
324
Sync(const CloudSyncOption & option,const SyncProcessCallback & onProcess)325 DBStatus RelationalStoreDelegateImpl::Sync(const CloudSyncOption &option, const SyncProcessCallback &onProcess)
326 {
327 uint64_t taskId = 0;
328 return Sync(option, onProcess, taskId);
329 }
330
SetTrackerTable(const TrackerSchema & schema)331 DBStatus RelationalStoreDelegateImpl::SetTrackerTable(const TrackerSchema &schema)
332 {
333 if (conn_ == nullptr) {
334 LOGE("[RelationalStore Delegate] Invalid connection for operation!");
335 return DB_ERROR;
336 }
337 if (schema.tableName.empty()) {
338 LOGE("[RelationalStore Delegate] tracker table is empty.");
339 return INVALID_ARGS;
340 }
341 if (!ParamCheckUtils::CheckRelationalTableName(schema.tableName)) {
342 LOGE("[RelationalStore Delegate] Invalid tracker table name.");
343 return INVALID_ARGS;
344 }
345 int errCode = conn_->SetTrackerTable(schema);
346 if (errCode != E_OK) {
347 if (errCode == -E_WITH_INVENTORY_DATA) {
348 LOGI("[RelationalStore Delegate] create tracker table for the first time.");
349 } else {
350 LOGE("[RelationalStore Delegate] Set Subscribe table failed:%d", errCode);
351 }
352 return TransferDBErrno(errCode);
353 }
354 return OK;
355 }
356
ExecuteSql(const SqlCondition & condition,std::vector<VBucket> & records)357 DBStatus RelationalStoreDelegateImpl::ExecuteSql(const SqlCondition &condition, std::vector<VBucket> &records)
358 {
359 if (conn_ == nullptr) {
360 LOGE("[RelationalStore Delegate] Invalid connection for operation!");
361 return DB_ERROR;
362 }
363 int errCode = conn_->ExecuteSql(condition, records);
364 if (errCode != E_OK) {
365 LOGE("[RelationalStore Delegate] execute sql failed:%d", errCode);
366 return TransferDBErrno(errCode);
367 }
368 return OK;
369 }
370
SetReference(const std::vector<TableReferenceProperty> & tableReferenceProperty)371 DBStatus RelationalStoreDelegateImpl::SetReference(const std::vector<TableReferenceProperty> &tableReferenceProperty)
372 {
373 if (conn_ == nullptr) {
374 LOGE("[RelationalStore SetReference] Invalid connection for operation!");
375 return DB_ERROR;
376 }
377 if (!ParamCheckUtils::CheckTableReference(tableReferenceProperty)) {
378 return INVALID_ARGS;
379 }
380 int errCode = conn_->SetReference(tableReferenceProperty);
381 if (errCode != E_OK) {
382 if (errCode != -E_TABLE_REFERENCE_CHANGED) {
383 LOGE("[RelationalStore] SetReference failed:%d", errCode);
384 } else {
385 LOGI("[RelationalStore] reference changed");
386 }
387 return TransferDBErrno(errCode);
388 }
389 LOGI("[RelationalStore Delegate] SetReference success");
390 return OK;
391 }
392
CleanTrackerData(const std::string & tableName,int64_t cursor)393 DBStatus RelationalStoreDelegateImpl::CleanTrackerData(const std::string &tableName, int64_t cursor)
394 {
395 if (conn_ == nullptr) {
396 LOGE("[RelationalStore Delegate] Invalid connection for operation!");
397 return DB_ERROR;
398 }
399 int errCode = conn_->CleanTrackerData(tableName, cursor);
400 if (errCode != E_OK) {
401 LOGE("[RelationalStore Delegate] clean tracker data failed:%d", errCode);
402 return TransferDBErrno(errCode);
403 }
404 LOGI("[RelationalStore Delegate] CleanTrackerData success");
405 return OK;
406 }
407
Pragma(PragmaCmd cmd,PragmaData & pragmaData)408 DBStatus RelationalStoreDelegateImpl::Pragma(PragmaCmd cmd, PragmaData &pragmaData)
409 {
410 if (cmd != PragmaCmd::LOGIC_DELETE_SYNC_DATA) {
411 return NOT_SUPPORT;
412 }
413 if (conn_ == nullptr) {
414 LOGE("[RelationalStore Delegate] Invalid connection for operation!");
415 return DB_ERROR;
416 }
417 int errCode = conn_->Pragma(cmd, pragmaData);
418 if (errCode != E_OK) {
419 LOGE("[RelationalStore Delegate] Pragma failed:%d", errCode);
420 return TransferDBErrno(errCode);
421 }
422 LOGI("[RelationalStore Delegate] Pragma success");
423 return OK;
424 }
425
UpsertData(const std::string & tableName,const std::vector<VBucket> & records,RecordStatus status)426 DBStatus RelationalStoreDelegateImpl::UpsertData(const std::string &tableName, const std::vector<VBucket> &records,
427 RecordStatus status)
428 {
429 if (conn_ == nullptr) {
430 LOGE("[RelationalStore Delegate] Invalid connection for operation!");
431 return DB_ERROR;
432 }
433 int errCode = conn_->UpsertData(status, tableName, records);
434 if (errCode != E_OK) {
435 LOGE("[RelationalStore Delegate] Upsert data failed:%d", errCode);
436 return TransferDBErrno(errCode);
437 }
438 LOGI("[RelationalStore Delegate] Upsert data success");
439 return OK;
440 }
441
SetCloudSyncConfig(const CloudSyncConfig & config)442 DBStatus RelationalStoreDelegateImpl::SetCloudSyncConfig(const CloudSyncConfig &config)
443 {
444 if (conn_ == nullptr) {
445 LOGE("[RelationalStore Delegate] Invalid connection for SetCloudSyncConfig!");
446 return DB_ERROR;
447 }
448 if (!DBCommon::CheckCloudSyncConfigValid(config)) {
449 return INVALID_ARGS;
450 }
451 int errCode = conn_->SetCloudSyncConfig(config);
452 if (errCode != E_OK) {
453 LOGE("[RelationalStore Delegate] SetCloudSyncConfig failed:%d", errCode);
454 return TransferDBErrno(errCode);
455 }
456 LOGI("[RelationalStore Delegate] SetCloudSyncConfig success");
457 return OK;
458 }
459
Sync(const CloudSyncOption & option,const SyncProcessCallback & onProcess,uint64_t taskId)460 DBStatus RelationalStoreDelegateImpl::Sync(const CloudSyncOption &option, const SyncProcessCallback &onProcess,
461 uint64_t taskId)
462 {
463 if (conn_ == nullptr) {
464 LOGE("[RelationalStore Delegate] Invalid connection for sync!");
465 return DB_ERROR;
466 }
467 int errCode = conn_->Sync(option, onProcess, taskId);
468 if (errCode != E_OK) {
469 LOGE("[RelationalStore Delegate] Cloud sync failed:%d", errCode);
470 return TransferDBErrno(errCode);
471 }
472 return OK;
473 }
474
GetCloudTaskStatus(uint64_t taskId)475 SyncProcess RelationalStoreDelegateImpl::GetCloudTaskStatus(uint64_t taskId)
476 {
477 SyncProcess syncProcess;
478 if (conn_ == nullptr) {
479 LOGE("[RelationalStore Delegate] Invalid connection for getting cloud task status!");
480 syncProcess.errCode = DB_ERROR;
481 return syncProcess;
482 }
483 return conn_->GetCloudTaskStatus(taskId);
484 }
485 } // namespace DistributedDB
486 #endif