1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "kv_virtual_device.h"
16
17 #include "log_print.h"
18 #include "virtual_multi_ver_sync_db_interface.h"
19
20 namespace DistributedDB {
KvVirtualDevice(const std::string & deviceId)21 KvVirtualDevice::KvVirtualDevice(const std::string &deviceId) : GenericVirtualDevice(deviceId)
22 {
23 }
24
~KvVirtualDevice()25 KvVirtualDevice::~KvVirtualDevice()
26 {
27 }
28
GetData(const Key & key,VirtualDataItem & item)29 int KvVirtualDevice::GetData(const Key &key, VirtualDataItem &item)
30 {
31 VirtualSingleVerSyncDBInterface *syncAble = static_cast<VirtualSingleVerSyncDBInterface *>(storage_);
32 return syncAble->GetSyncData(key, item);
33 }
34
PutData(const Key & key,const Value & value,const Timestamp & time,int flag)35 int KvVirtualDevice::PutData(const Key &key, const Value &value, const Timestamp &time, int flag)
36 {
37 VirtualSingleVerSyncDBInterface *syncAble = static_cast<VirtualSingleVerSyncDBInterface *>(storage_);
38 LOGI("dev %s put data time %" PRIu64, deviceId_.c_str(), time);
39 return syncAble->PutData(key, value, time, flag);
40 }
41
SetSaveDataDelayTime(uint64_t milliDelayTime)42 void KvVirtualDevice::SetSaveDataDelayTime(uint64_t milliDelayTime)
43 {
44 VirtualSingleVerSyncDBInterface *syncInterface = static_cast<VirtualSingleVerSyncDBInterface *>(storage_);
45 syncInterface->SetSaveDataDelayTime(milliDelayTime);
46 }
47
DelayGetSyncData(uint64_t milliDelayTime)48 void KvVirtualDevice::DelayGetSyncData(uint64_t milliDelayTime)
49 {
50 VirtualSingleVerSyncDBInterface *syncInterface = static_cast<VirtualSingleVerSyncDBInterface *>(storage_);
51 syncInterface->DelayGetSyncData(milliDelayTime);
52 }
53
SetGetDataErrCode(int whichTime,int errCode,bool isGetDataControl)54 void KvVirtualDevice::SetGetDataErrCode(int whichTime, int errCode, bool isGetDataControl)
55 {
56 VirtualSingleVerSyncDBInterface *syncInterface = static_cast<VirtualSingleVerSyncDBInterface *>(storage_);
57 syncInterface->SetGetDataErrCode(whichTime, errCode, isGetDataControl);
58 }
59
ResetDataControl()60 void KvVirtualDevice::ResetDataControl()
61 {
62 VirtualSingleVerSyncDBInterface *syncInterface = static_cast<VirtualSingleVerSyncDBInterface *>(storage_);
63 syncInterface->ResetDataControl();
64 }
65
66 #ifndef OMIT_MULTI_VER
GetData(const Key & key,Value & value)67 int KvVirtualDevice::GetData(const Key &key, Value &value)
68 {
69 VirtualMultiVerSyncDBInterface *syncInterface = static_cast<VirtualMultiVerSyncDBInterface *>(storage_);
70 return syncInterface->GetData(key, value);
71 }
72
PutData(const Key & key,const Value & value)73 int KvVirtualDevice::PutData(const Key &key, const Value &value)
74 {
75 VirtualMultiVerSyncDBInterface *syncInterface = static_cast<VirtualMultiVerSyncDBInterface *>(storage_);
76 return syncInterface->PutData(key, value);
77 }
78
DeleteData(const Key & key)79 int KvVirtualDevice::DeleteData(const Key &key)
80 {
81 VirtualMultiVerSyncDBInterface *syncInterface = static_cast<VirtualMultiVerSyncDBInterface *>(storage_);
82 return syncInterface->DeleteData(key);
83 }
84
StartTransaction()85 int KvVirtualDevice::StartTransaction()
86 {
87 VirtualMultiVerSyncDBInterface *syncInterface = static_cast<VirtualMultiVerSyncDBInterface *>(storage_);
88 return syncInterface->StartTransaction();
89 }
90
Commit()91 int KvVirtualDevice::Commit()
92 {
93 VirtualMultiVerSyncDBInterface *syncInterface = static_cast<VirtualMultiVerSyncDBInterface *>(storage_);
94 return syncInterface->Commit();
95 }
96 #endif // OMIT_MULTI_VER
97
Subscribe(QuerySyncObject query,bool wait,int id)98 int KvVirtualDevice::Subscribe(QuerySyncObject query, bool wait, int id)
99 {
100 auto operation = new (std::nothrow) SyncOperation(id, {remoteDeviceId_}, SUBSCRIBE_QUERY, nullptr, wait);
101 if (operation == nullptr) {
102 return -E_OUT_OF_MEMORY;
103 }
104 operation->Initialize();
105 operation->SetOnSyncFinished([operation](int id) {
106 operation->NotifyIfNeed();
107 });
108 operation->SetQuery(query);
109 context_->AddSyncOperation(operation);
110 operation->WaitIfNeed();
111 RefObject::KillAndDecObjRef(operation);
112 return E_OK;
113 }
114
UnSubscribe(QuerySyncObject query,bool wait,int id)115 int KvVirtualDevice::UnSubscribe(QuerySyncObject query, bool wait, int id)
116 {
117 return UnSubscribe(query, wait, id, nullptr);
118 }
119
UnSubscribe(const QuerySyncObject & query,bool wait,int id,const SyncOperation::UserCallback & callback)120 int KvVirtualDevice::UnSubscribe(const QuerySyncObject &query, bool wait, int id,
121 const SyncOperation::UserCallback &callback)
122 {
123 auto operation = new (std::nothrow) SyncOperation(id, {remoteDeviceId_}, UNSUBSCRIBE_QUERY, callback, wait);
124 if (operation == nullptr) {
125 return -E_OUT_OF_MEMORY;
126 }
127 operation->Initialize();
128 operation->SetOnSyncFinished([operation](int id) {
129 operation->NotifyIfNeed();
130 });
131 operation->SetQuery(query);
132 context_->AddSyncOperation(operation);
133 operation->WaitIfNeed();
134 RefObject::KillAndDecObjRef(operation);
135 return E_OK;
136 }
137
SetSaveDataCallback(const std::function<void ()> & callback)138 void KvVirtualDevice::SetSaveDataCallback(const std::function<void()> &callback)
139 {
140 auto *syncAble = static_cast<VirtualSingleVerSyncDBInterface *>(storage_);
141 syncAble->SetSaveDataCallback(callback);
142 }
143
EraseWaterMark(const std::string & dev)144 void KvVirtualDevice::EraseWaterMark(const std::string &dev)
145 {
146 metadata_->EraseDeviceWaterMark(dev, true);
147 }
148
SetPushNotifier(const std::function<void (const std::string &)> & pushNotifier)149 void KvVirtualDevice::SetPushNotifier(const std::function<void(const std::string &)> &pushNotifier)
150 {
151 auto *syncAble = static_cast<VirtualSingleVerSyncDBInterface *>(storage_);
152 syncAble->SetPushNotifier(pushNotifier);
153 }
154 } // namespace DistributedDB