1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "kv_virtual_device.h"
16 
17 #include "log_print.h"
18 #include "virtual_multi_ver_sync_db_interface.h"
19 
20 namespace DistributedDB {
KvVirtualDevice(const std::string & deviceId)21 KvVirtualDevice::KvVirtualDevice(const std::string &deviceId) : GenericVirtualDevice(deviceId)
22 {
23 }
24 
~KvVirtualDevice()25 KvVirtualDevice::~KvVirtualDevice()
26 {
27 }
28 
GetData(const Key & key,VirtualDataItem & item)29 int KvVirtualDevice::GetData(const Key &key, VirtualDataItem &item)
30 {
31     VirtualSingleVerSyncDBInterface *syncAble = static_cast<VirtualSingleVerSyncDBInterface *>(storage_);
32     return syncAble->GetSyncData(key, item);
33 }
34 
PutData(const Key & key,const Value & value,const Timestamp & time,int flag)35 int KvVirtualDevice::PutData(const Key &key, const Value &value, const Timestamp &time, int flag)
36 {
37     VirtualSingleVerSyncDBInterface *syncAble = static_cast<VirtualSingleVerSyncDBInterface *>(storage_);
38     LOGI("dev %s put data time %" PRIu64, deviceId_.c_str(), time);
39     return syncAble->PutData(key, value, time, flag);
40 }
41 
SetSaveDataDelayTime(uint64_t milliDelayTime)42 void KvVirtualDevice::SetSaveDataDelayTime(uint64_t milliDelayTime)
43 {
44     VirtualSingleVerSyncDBInterface *syncInterface = static_cast<VirtualSingleVerSyncDBInterface *>(storage_);
45     syncInterface->SetSaveDataDelayTime(milliDelayTime);
46 }
47 
DelayGetSyncData(uint64_t milliDelayTime)48 void KvVirtualDevice::DelayGetSyncData(uint64_t milliDelayTime)
49 {
50     VirtualSingleVerSyncDBInterface *syncInterface = static_cast<VirtualSingleVerSyncDBInterface *>(storage_);
51     syncInterface->DelayGetSyncData(milliDelayTime);
52 }
53 
SetGetDataErrCode(int whichTime,int errCode,bool isGetDataControl)54 void KvVirtualDevice::SetGetDataErrCode(int whichTime, int errCode, bool isGetDataControl)
55 {
56     VirtualSingleVerSyncDBInterface *syncInterface = static_cast<VirtualSingleVerSyncDBInterface *>(storage_);
57     syncInterface->SetGetDataErrCode(whichTime, errCode, isGetDataControl);
58 }
59 
ResetDataControl()60 void KvVirtualDevice::ResetDataControl()
61 {
62     VirtualSingleVerSyncDBInterface *syncInterface = static_cast<VirtualSingleVerSyncDBInterface *>(storage_);
63     syncInterface->ResetDataControl();
64 }
65 
66 #ifndef OMIT_MULTI_VER
GetData(const Key & key,Value & value)67 int KvVirtualDevice::GetData(const Key &key, Value &value)
68 {
69     VirtualMultiVerSyncDBInterface *syncInterface = static_cast<VirtualMultiVerSyncDBInterface *>(storage_);
70     return syncInterface->GetData(key, value);
71 }
72 
PutData(const Key & key,const Value & value)73 int KvVirtualDevice::PutData(const Key &key, const Value &value)
74 {
75     VirtualMultiVerSyncDBInterface *syncInterface = static_cast<VirtualMultiVerSyncDBInterface *>(storage_);
76     return syncInterface->PutData(key, value);
77 }
78 
DeleteData(const Key & key)79 int KvVirtualDevice::DeleteData(const Key &key)
80 {
81     VirtualMultiVerSyncDBInterface *syncInterface = static_cast<VirtualMultiVerSyncDBInterface *>(storage_);
82     return syncInterface->DeleteData(key);
83 }
84 
StartTransaction()85 int KvVirtualDevice::StartTransaction()
86 {
87     VirtualMultiVerSyncDBInterface *syncInterface = static_cast<VirtualMultiVerSyncDBInterface *>(storage_);
88     return syncInterface->StartTransaction();
89 }
90 
Commit()91 int KvVirtualDevice::Commit()
92 {
93     VirtualMultiVerSyncDBInterface *syncInterface = static_cast<VirtualMultiVerSyncDBInterface *>(storage_);
94     return syncInterface->Commit();
95 }
96 #endif // OMIT_MULTI_VER
97 
Subscribe(QuerySyncObject query,bool wait,int id)98 int KvVirtualDevice::Subscribe(QuerySyncObject query, bool wait, int id)
99 {
100     auto operation = new (std::nothrow) SyncOperation(id, {remoteDeviceId_}, SUBSCRIBE_QUERY, nullptr, wait);
101     if (operation == nullptr) {
102         return -E_OUT_OF_MEMORY;
103     }
104     operation->Initialize();
105     operation->SetOnSyncFinished([operation](int id) {
106         operation->NotifyIfNeed();
107     });
108     operation->SetQuery(query);
109     context_->AddSyncOperation(operation);
110     operation->WaitIfNeed();
111     RefObject::KillAndDecObjRef(operation);
112     return E_OK;
113 }
114 
UnSubscribe(QuerySyncObject query,bool wait,int id)115 int KvVirtualDevice::UnSubscribe(QuerySyncObject query, bool wait, int id)
116 {
117     return UnSubscribe(query, wait, id, nullptr);
118 }
119 
UnSubscribe(const QuerySyncObject & query,bool wait,int id,const SyncOperation::UserCallback & callback)120 int KvVirtualDevice::UnSubscribe(const QuerySyncObject &query, bool wait, int id,
121     const SyncOperation::UserCallback &callback)
122 {
123     auto operation = new (std::nothrow) SyncOperation(id, {remoteDeviceId_}, UNSUBSCRIBE_QUERY, callback, wait);
124     if (operation == nullptr) {
125         return -E_OUT_OF_MEMORY;
126     }
127     operation->Initialize();
128     operation->SetOnSyncFinished([operation](int id) {
129         operation->NotifyIfNeed();
130     });
131     operation->SetQuery(query);
132     context_->AddSyncOperation(operation);
133     operation->WaitIfNeed();
134     RefObject::KillAndDecObjRef(operation);
135     return E_OK;
136 }
137 
SetSaveDataCallback(const std::function<void ()> & callback)138 void KvVirtualDevice::SetSaveDataCallback(const std::function<void()> &callback)
139 {
140     auto *syncAble = static_cast<VirtualSingleVerSyncDBInterface *>(storage_);
141     syncAble->SetSaveDataCallback(callback);
142 }
143 
EraseWaterMark(const std::string & dev)144 void KvVirtualDevice::EraseWaterMark(const std::string &dev)
145 {
146     metadata_->EraseDeviceWaterMark(dev, true);
147 }
148 
SetPushNotifier(const std::function<void (const std::string &)> & pushNotifier)149 void KvVirtualDevice::SetPushNotifier(const std::function<void(const std::string &)> &pushNotifier)
150 {
151     auto *syncAble = static_cast<VirtualSingleVerSyncDBInterface *>(storage_);
152     syncAble->SetPushNotifier(pushNotifier);
153 }
154 } // namespace DistributedDB