1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define LOG_TAG "JsKVStore"
16 #include "js_kv_store.h"
17 #include "js_util.h"
18 #include "js_kv_store_resultset.h"
19 #include "log_print.h"
20 #include "napi_queue.h"
21 #include "datashare_values_bucket.h"
22 #include "datashare_predicates.h"
23 #include "single_kvstore.h"
24 #include "kv_utils.h"
25 #include "kvstore_datashare_bridge.h"
26 
27 using namespace OHOS::DistributedKv;
28 using namespace OHOS::DataShare;
29 
30 namespace OHOS::DistributedData {
31 inline static uint8_t UNVALID_SUBSCRIBE_TYPE = 255;
32 std::map<std::string, JsKVStore::Exec> JsKVStore::onEventHandlers_ = {
33     { "dataChange", JsKVStore::OnDataChange },
34     { "syncComplete", JsKVStore::OnSyncComplete }
35 };
36 
37 std::map<std::string, JsKVStore::Exec> JsKVStore::offEventHandlers_ = {
38     { "dataChange", JsKVStore::OffDataChange },
39     { "syncComplete", JsKVStore::OffSyncComplete }
40 };
41 
ValidSubscribeType(uint8_t type)42 static bool ValidSubscribeType(uint8_t type)
43 {
44     return (SUBSCRIBE_LOCAL <= type) && (type <= SUBSCRIBE_LOCAL_REMOTE);
45 }
46 
ToSubscribeType(uint8_t type)47 static SubscribeType ToSubscribeType(uint8_t type)
48 {
49     switch (type) {
50         case 0:  // 0 means SUBSCRIBE_TYPE_LOCAL
51             return SubscribeType::SUBSCRIBE_TYPE_LOCAL;
52         case 1:  // 1 means SUBSCRIBE_TYPE_REMOTE
53             return SubscribeType::SUBSCRIBE_TYPE_REMOTE;
54         case 2:  // 2 means SUBSCRIBE_TYPE_ALL
55             return SubscribeType::SUBSCRIBE_TYPE_ALL;
56         default:
57             return static_cast<SubscribeType>(UNVALID_SUBSCRIBE_TYPE);
58     }
59 }
60 
JsKVStore(const std::string & storeId)61 JsKVStore::JsKVStore(const std::string& storeId)
62     : storeId_(storeId)
63 {
64 }
65 
~JsKVStore()66 JsKVStore::~JsKVStore()
67 {
68     ZLOGD("no memory leak for JsKVStore");
69     if (kvStore_ == nullptr) {
70         return;
71     }
72 
73     std::lock_guard<std::mutex> lck(listMutex_);
74     for (uint8_t type = SUBSCRIBE_LOCAL; type < SUBSCRIBE_COUNT; type++) {
75         for (auto& observer : dataObserver_[type]) {
76             auto subscribeType = ToSubscribeType(type);
77             kvStore_->UnSubscribeKvStore(subscribeType, observer);
78             observer->Clear();
79         }
80         dataObserver_[type].clear();
81     }
82 
83     kvStore_->UnRegisterSyncCallback();
84     for (auto &syncObserver : syncObservers_) {
85         syncObserver->Clear();
86     }
87     syncObservers_.clear();
88 }
89 
SetNative(std::shared_ptr<SingleKvStore> & kvStore)90 void JsKVStore::SetNative(std::shared_ptr<SingleKvStore>& kvStore)
91 {
92     kvStore_ = kvStore;
93 }
94 
GetNative()95 std::shared_ptr<SingleKvStore>& JsKVStore::GetNative()
96 {
97     return kvStore_;
98 }
99 
SetContextParam(std::shared_ptr<ContextParam> param)100 void JsKVStore::SetContextParam(std::shared_ptr<ContextParam> param)
101 {
102     param_ = param;
103 }
104 
IsInstanceOf(napi_env env,napi_value obj,const std::string & storeId,napi_value constructor)105 bool JsKVStore::IsInstanceOf(napi_env env, napi_value obj, const std::string& storeId, napi_value constructor)
106 {
107     bool result = false;
108     napi_status status = napi_instanceof(env, obj, constructor, &result);
109     CHECK_RETURN((status == napi_ok) && (result != false), "is not instance of JsKVStore!", false);
110 
111     JsKVStore* kvStore = nullptr;
112     status = napi_unwrap(env, obj, reinterpret_cast<void**>(&kvStore));
113     CHECK_RETURN((status == napi_ok) && (kvStore != nullptr), "can not unwrap to JsKVStore!", false);
114     return kvStore->storeId_ == storeId;
115 }
116 
117 /*
118  * [JS API Prototype]
119  * [AsyncCallback]
120  *      put(key:string, value:Uint8Array | string | boolean | number, callback: AsyncCallback<void>):void;
121  * [Promise]
122  *      put(key:string, value:Uint8Array | string | boolean | number):Promise<void>;
123  */
Put(napi_env env,napi_callback_info info)124 napi_value JsKVStore::Put(napi_env env, napi_callback_info info)
125 {
126     ZLOGD("KVStore::Put()");
127     struct PutContext : public ContextBase {
128         std::string key;
129         JSUtil::KvStoreVariant value;
130     };
131     auto ctxt = std::make_shared<PutContext>();
132 
133     ctxt->GetCbInfo(env, info, [env, ctxt](size_t argc, napi_value* argv) {
134         // required 2 arguments :: <key> <value>
135         CHECK_ARGS_RETURN_VOID(ctxt, argc == 2, "invalid arguments!");
136         ctxt->status = JSUtil::GetValue(env, argv[0], ctxt->key);
137         CHECK_STATUS_RETURN_VOID(ctxt, "invalid arg[0], i.e. invalid key!");
138         ctxt->status = JSUtil::GetValue(env, argv[1], ctxt->value);
139         CHECK_STATUS_RETURN_VOID(ctxt, "invalid arg[1], i.e. invalid value!");
140     });
141 
142     auto execute = [ctxt]() {
143         DistributedKv::Key key(ctxt->key);
144         bool isSchemaStore = reinterpret_cast<JsKVStore *>(ctxt->native)->IsSchemaStore();
145         auto &kvStore = reinterpret_cast<JsKVStore *>(ctxt->native)->kvStore_;
146         DistributedKv::Value value = isSchemaStore ? DistributedKv::Blob(std::get<std::string>(ctxt->value))
147                                                    : JSUtil::VariantValue2Blob(ctxt->value);
148         Status status = kvStore->Put(key, value);
149         ZLOGD("kvStore->Put return %{public}d", status);
150         ctxt->status = (status == Status::SUCCESS) ? napi_ok : napi_generic_failure;
151         CHECK_STATUS_RETURN_VOID(ctxt, "kvStore->Put() failed!");
152     };
153     return NapiQueue::AsyncWork(env, ctxt, std::string(__FUNCTION__), execute);
154 }
155 
156 /*
157  * [JS API Prototype]
158  * [AsyncCallback]
159  *      delete(key: string, callback: AsyncCallback<void>): void;
160  * [Promise]
161  *      delete(key: string): Promise<void>;
162  */
Delete(napi_env env,napi_callback_info info)163 napi_value JsKVStore::Delete(napi_env env, napi_callback_info info)
164 {
165     ZLOGD("KVStore::Delete()");
166     struct DeleteContext : public ContextBase {
167         std::string key;
168     };
169     auto ctxt = std::make_shared<DeleteContext>();
170 
171     ctxt->GetCbInfo(env, info, [env, ctxt](size_t argc, napi_value* argv) {
172         // required 1 arguments :: <key>
173         CHECK_ARGS_RETURN_VOID(ctxt, argc == 1, "invalid arguments!");
174         ctxt->status = JSUtil::GetValue(env, argv[0], ctxt->key);
175         ZLOGD("kvStore->Delete %{public}.6s  status:%{public}d", ctxt->key.c_str(), ctxt->status);
176         CHECK_STATUS_RETURN_VOID(ctxt, "invalid arg[0], i.e. invalid key!");
177     });
178 
179     return NapiQueue::AsyncWork(env, ctxt, std::string(__FUNCTION__), [ctxt]() {
180         OHOS::DistributedKv::Key key(ctxt->key);
181         auto& kvStore = reinterpret_cast<JsKVStore*>(ctxt->native)->kvStore_;
182         Status status = kvStore->Delete(key);
183         ZLOGD("kvStore->Delete %{public}.6s status:%{public}d", ctxt->key.c_str(), status);
184         ctxt->status = (status == Status::SUCCESS) ? napi_ok : napi_generic_failure;
185         CHECK_STATUS_RETURN_VOID(ctxt, "kvStore->Delete() failed!");
186     });
187 }
188 
189 /*
190  * [JS API Prototype]
191  * [Callback]
192  *      on(event:'syncComplete',syncCallback: Callback<Array<[string, number]>>):void;
193  *      on(event:'dataChange', subType: SubscribeType, observer: Callback<ChangeNotification>): void;
194  */
OnEvent(napi_env env,napi_callback_info info)195 napi_value JsKVStore::OnEvent(napi_env env, napi_callback_info info)
196 {
197     ZLOGD("in");
198     auto ctxt = std::make_shared<ContextBase>();
199     auto input = [env, ctxt](size_t argc, napi_value* argv) {
200         // required 2 arguments :: <event> [...] <callback>
201         CHECK_ARGS_RETURN_VOID(ctxt, argc >= 2, "invalid arguments!");
202         std::string event;
203         ctxt->status = JSUtil::GetValue(env, argv[0], event);
204         ZLOGI("subscribe to event:%{public}s", event.c_str());
205         auto handle = onEventHandlers_.find(event);
206         CHECK_ARGS_RETURN_VOID(ctxt, handle != onEventHandlers_.end(), "invalid arg[0], i.e. unsupported event");
207         // shift 1 argument, for JsKVStore::Exec.
208         handle->second(env, argc - 1, &argv[1], ctxt);
209     };
210     ctxt->GetCbInfoSync(env, info, input);
211     NAPI_ASSERT(env, ctxt->status == napi_ok, "invalid arguments!");
212     return nullptr;
213 }
214 
215 /*
216  * [JS API Prototype]
217  * [Callback]
218  *      off(event:'syncComplete',syncCallback: Callback<Array<[string, number]>>):void;
219  *      off(event:'dataChange', subType: SubscribeType, observer: Callback<ChangeNotification>): void;
220  */
OffEvent(napi_env env,napi_callback_info info)221 napi_value JsKVStore::OffEvent(napi_env env, napi_callback_info info)
222 {
223     ZLOGD("in");
224     auto ctxt = std::make_shared<ContextBase>();
225     auto input = [env, ctxt](size_t argc, napi_value* argv) {
226         // required 1 arguments :: <event> [callback]
227         CHECK_ARGS_RETURN_VOID(ctxt, argc >= 1, "invalid arguments!");
228         std::string event;
229         ctxt->status = JSUtil::GetValue(env, argv[0], event);
230         ZLOGI("unsubscribe to event:%{public}s", event.c_str());
231         auto handle = offEventHandlers_.find(event);
232         CHECK_ARGS_RETURN_VOID(ctxt, handle != offEventHandlers_.end(), "invalid arg[0], i.e. unsupported event");
233         // shift 1 argument, for JsKVStore::Exec.
234         handle->second(env, argc - 1, &argv[1], ctxt);
235     };
236     ctxt->GetCbInfoSync(env, info, input);
237     NAPI_ASSERT(env, ctxt->status == napi_ok, "invalid arguments!");
238     return nullptr;
239 }
240 
241 /*
242  * [JS API Prototype]
243  * [AsyncCallback]
244  *      putBatch(entries: Entry[], callback: AsyncCallback<void>):void;
245  * [Promise]
246  *      putBatch(entries: Entry[]):Promise<void>;
247  */
PutBatch(napi_env env,napi_callback_info info)248 napi_value JsKVStore::PutBatch(napi_env env, napi_callback_info info)
249 {
250     struct PutBatchContext : public ContextBase {
251         std::vector<Entry> entries;
252     };
253     auto ctxt = std::make_shared<PutBatchContext>();
254 
255     ctxt->GetCbInfo(env, info, [env, ctxt](size_t argc, napi_value* argv) {
256         // required 1 arguments :: <entries>
257         CHECK_ARGS_RETURN_VOID(ctxt, argc == 1, "invalid arguments!");
258         auto isSchemaStore = reinterpret_cast<JsKVStore*>(ctxt->native)->IsSchemaStore();
259         ctxt->status = JSUtil::GetValue(env, argv[0], ctxt->entries, isSchemaStore);
260         CHECK_STATUS_RETURN_VOID(ctxt, "invalid arg[0], i.e. invalid entries!");
261     });
262 
263     auto execute = [ctxt]() {
264         auto& kvStore = reinterpret_cast<JsKVStore*>(ctxt->native)->kvStore_;
265         Status status = kvStore->PutBatch(ctxt->entries);
266         ZLOGD("kvStore->DeleteBatch return %{public}d", status);
267         ctxt->status = (status == Status::SUCCESS) ? napi_ok : napi_generic_failure;
268         CHECK_STATUS_RETURN_VOID(ctxt, "kvStore->PutBatch() failed!");
269     };
270     return NapiQueue::AsyncWork(env, ctxt, std::string(__FUNCTION__), execute);
271 }
272 
273 /*
274  * [JS API Prototype]
275  * [AsyncCallback]
276  *      deleteBatch(keys: string[], callback: AsyncCallback<void>):void;
277  * [Promise]
278  *      deleteBatch(keys: string[]):Promise<void>;
279  */
DeleteBatch(napi_env env,napi_callback_info info)280 napi_value JsKVStore::DeleteBatch(napi_env env, napi_callback_info info)
281 {
282     struct DeleteBatchContext : public ContextBase {
283         std::vector<std::string> keys;
284     };
285     auto ctxt = std::make_shared<DeleteBatchContext>();
286     auto input = [env, ctxt](size_t argc, napi_value* argv) {
287         // required 1 arguments :: <keys>
288         CHECK_ARGS_RETURN_VOID(ctxt, argc == 1, "invalid arguments!");
289         JSUtil::GetValue(env, argv[0], ctxt->keys);
290         CHECK_STATUS_RETURN_VOID(ctxt, "invalid arg[0], i.e. invalid keys!");
291     };
292     ctxt->GetCbInfo(env, info, input);
293 
294     auto execute = [ctxt]() {
295         std::vector<DistributedKv::Key> keys;
296         for (auto it : ctxt->keys) {
297             DistributedKv::Key key(it);
298             keys.push_back(key);
299         }
300         auto& kvStore = reinterpret_cast<JsKVStore*>(ctxt->native)->kvStore_;
301         Status status = kvStore->DeleteBatch(keys);
302         ZLOGD("kvStore->DeleteBatch return %{public}d", status);
303         ctxt->status = (status == Status::SUCCESS) ? napi_ok : napi_generic_failure;
304         CHECK_STATUS_RETURN_VOID(ctxt, "kvStore->DeleteBatch failed!");
305     };
306     return NapiQueue::AsyncWork(env, ctxt, std::string(__FUNCTION__), execute);
307 }
308 
309 /*
310  * [JS API Prototype]
311  * [AsyncCallback]
312  *      startTransaction(callback: AsyncCallback<void>):void;
313  * [Promise]
314  *      startTransaction() : Promise<void>;
315  */
StartTransaction(napi_env env,napi_callback_info info)316 napi_value JsKVStore::StartTransaction(napi_env env, napi_callback_info info)
317 {
318     auto ctxt = std::make_shared<ContextBase>();
319     ctxt->GetCbInfo(env, info);
320 
321     auto execute = [ctxt]() {
322         auto& kvStore = reinterpret_cast<JsKVStore*>(ctxt->native)->kvStore_;
323         Status status = kvStore->StartTransaction();
324         ZLOGD("kvStore->StartTransaction return %{public}d", status);
325         ctxt->status = (status == Status::SUCCESS) ? napi_ok : napi_generic_failure;
326         CHECK_STATUS_RETURN_VOID(ctxt, "kvStore->StartTransaction() failed!");
327     };
328     return NapiQueue::AsyncWork(env, ctxt, std::string(__FUNCTION__), execute);
329 }
330 
331 /*
332  * [JS API Prototype]
333  * [AsyncCallback]
334  *      commit(callback: AsyncCallback<void>):void;
335  * [Promise]
336  *      commit() : Promise<void>;
337  */
Commit(napi_env env,napi_callback_info info)338 napi_value JsKVStore::Commit(napi_env env, napi_callback_info info)
339 {
340     auto ctxt = std::make_shared<ContextBase>();
341     ctxt->GetCbInfo(env, info);
342 
343     auto execute = [ctxt]() {
344         auto& kvStore = reinterpret_cast<JsKVStore*>(ctxt->native)->kvStore_;
345         Status status = kvStore->Commit();
346         ZLOGD("kvStore->Commit return %{public}d", status);
347         ctxt->status = (status == Status::SUCCESS) ? napi_ok : napi_generic_failure;
348         CHECK_STATUS_RETURN_VOID(ctxt, "kvStore->Commit() failed!");
349     };
350     return NapiQueue::AsyncWork(env, ctxt, std::string(__FUNCTION__), execute);
351 }
352 
353 /*
354  * [JS API Prototype]
355  * [AsyncCallback]
356  *      rollback(callback: AsyncCallback<void>):void;
357  * [Promise]
358  *      rollback() : Promise<void>;
359  */
Rollback(napi_env env,napi_callback_info info)360 napi_value JsKVStore::Rollback(napi_env env, napi_callback_info info)
361 {
362     auto ctxt = std::make_shared<ContextBase>();
363     ctxt->GetCbInfo(env, info);
364 
365     auto execute = [ctxt]() {
366         auto& kvStore = reinterpret_cast<JsKVStore*>(ctxt->native)->kvStore_;
367         Status status = kvStore->Rollback();
368         ZLOGD("kvStore->Commit return %{public}d", status);
369         ctxt->status = (status == Status::SUCCESS) ? napi_ok : napi_generic_failure;
370         CHECK_STATUS_RETURN_VOID(ctxt, "kvStore->Rollback() failed!");
371     };
372     return NapiQueue::AsyncWork(env, ctxt, std::string(__FUNCTION__), execute);
373 }
374 
375 /*
376  * [JS API Prototype]
377  * [AsyncCallback]
378  *      enableSync(enabled:boolean, callback: AsyncCallback<void>):void;
379  * [Promise]
380  *      enableSync(enabled:boolean) : Promise<void>;
381  */
EnableSync(napi_env env,napi_callback_info info)382 napi_value JsKVStore::EnableSync(napi_env env, napi_callback_info info)
383 {
384     struct EnableSyncContext : public ContextBase {
385         bool enable = false;
386     };
387     auto ctxt = std::make_shared<EnableSyncContext>();
388     auto input = [env, ctxt](size_t argc, napi_value* argv) {
389         // required 1 arguments :: <enable>
390         CHECK_ARGS_RETURN_VOID(ctxt, argc == 1, "invalid arguments!");
391         ctxt->status = napi_get_value_bool(env, argv[0], &ctxt->enable);
392         CHECK_STATUS_RETURN_VOID(ctxt, "invalid arg[0], i.e. invalid enabled!");
393     };
394     ctxt->GetCbInfo(env, info, input);
395 
396     auto execute = [ctxt]() {
397         auto& kvStore = reinterpret_cast<JsKVStore*>(ctxt->native)->kvStore_;
398         Status status = kvStore->SetCapabilityEnabled(ctxt->enable);
399         ZLOGD("kvStore->SetCapabilityEnabled return %{public}d", status);
400         ctxt->status = (status == Status::SUCCESS) ? napi_ok : napi_generic_failure;
401         CHECK_STATUS_RETURN_VOID(ctxt, "kvStore->SetCapabilityEnabled() failed!");
402     };
403     return NapiQueue::AsyncWork(env, ctxt, std::string(__FUNCTION__), execute);
404 }
405 
406 /*
407  * [JS API Prototype]
408  * [AsyncCallback]
409  *      setSyncRange(localLabels:string[], remoteSupportLabels:string[], callback: AsyncCallback<void>):void;
410  * [Promise]
411  *      setSyncRange(localLabels:string[], remoteSupportLabels:string[]) : Promise<void>;
412  */
SetSyncRange(napi_env env,napi_callback_info info)413 napi_value JsKVStore::SetSyncRange(napi_env env, napi_callback_info info)
414 {
415     struct SyncRangeContext : public ContextBase {
416         std::vector<std::string> localLabels;
417         std::vector<std::string> remoteSupportLabels;
418     };
419     auto ctxt = std::make_shared<SyncRangeContext>();
420     auto input = [env, ctxt](size_t argc, napi_value* argv) {
421         // required 2 arguments :: <localLabels> <remoteSupportLabels>
422         CHECK_ARGS_RETURN_VOID(ctxt, argc == 2, "invalid arguments!");
423         ctxt->status = JSUtil::GetValue(env, argv[0], ctxt->localLabels);
424         CHECK_STATUS_RETURN_VOID(ctxt, "invalid arg[0], i.e. invalid localLabels!");
425         ctxt->status = JSUtil::GetValue(env, argv[1], ctxt->remoteSupportLabels);
426         CHECK_STATUS_RETURN_VOID(ctxt, "invalid arg[1], i.e. invalid remoteSupportLabels!");
427     };
428     ctxt->GetCbInfo(env, info, input);
429 
430     auto execute = [ctxt]() {
431         auto& kvStore = reinterpret_cast<JsKVStore*>(ctxt->native)->kvStore_;
432         Status status = kvStore->SetCapabilityRange(ctxt->localLabels, ctxt->remoteSupportLabels);
433         ZLOGD("kvStore->SetCapabilityRange return %{public}d", status);
434         ctxt->status = (status == Status::SUCCESS) ? napi_ok : napi_generic_failure;
435         CHECK_STATUS_RETURN_VOID(ctxt, "kvStore->SetCapabilityRange() failed!");
436     };
437     return NapiQueue::AsyncWork(env, ctxt, std::string(__FUNCTION__), execute);
438 }
439 
440 /*
441  * [JS API Prototype] JsKVStore::OnDataChange is private non-static.
442  * [Callback]
443  *      on(event:'dataChange', subType: SubscribeType, observer: Callback<ChangeNotification>): void;
444  */
OnDataChange(napi_env env,size_t argc,napi_value * argv,std::shared_ptr<ContextBase> ctxt)445 void JsKVStore::OnDataChange(napi_env env, size_t argc, napi_value* argv, std::shared_ptr<ContextBase> ctxt)
446 {
447     // required 2 arguments :: <SubscribeType> <observer>
448     CHECK_ARGS_RETURN_VOID(ctxt, argc == 2, "invalid arguments on dataChange!");
449 
450     int32_t type = SUBSCRIBE_COUNT;
451     ctxt->status = napi_get_value_int32(env, argv[0], &type);
452     CHECK_STATUS_RETURN_VOID(ctxt, "napi_get_value_int32 failed!");
453     CHECK_ARGS_RETURN_VOID(ctxt, ValidSubscribeType(type), "invalid arg[1], i.e. invalid subscribeType");
454 
455     napi_valuetype valueType = napi_undefined;
456     ctxt->status = napi_typeof(env, argv[1], &valueType);
457     CHECK_STATUS_RETURN_VOID(ctxt, "napi_typeof failed!");
458     CHECK_ARGS_RETURN_VOID(ctxt, valueType == napi_function, "invalid arg[2], i.e. invalid callback");
459 
460     ZLOGI("subscribe dataChange, type: %{public}d", type);
461     auto proxy = reinterpret_cast<JsKVStore*>(ctxt->native);
462     std::lock_guard<std::mutex> lck(proxy->listMutex_);
463     for (auto& it : proxy->dataObserver_[type]) {
464         if (JSUtil::Equals(env, argv[1], it->GetCallback())) {
465             ZLOGI("function is already subscribe type");
466             return;
467         }
468     }
469 
470     ctxt->status =
471         proxy->Subscribe(type, std::make_shared<DataObserver>(proxy->uvQueue_, argv[1], proxy->IsSchemaStore()));
472     CHECK_STATUS_RETURN_VOID(ctxt, "Subscribe failed!");
473 }
474 
475 /*
476  * [JS API Prototype] JsKVStore::OffDataChange is private non-static.
477  * [Callback]
478  *      on(event:'dataChange', subType: SubscribeType, observer: Callback<ChangeNotification>): void;
479  * [NOTES!!!]  no SubscribeType while off...
480  *      off(event:'dataChange', observer: Callback<ChangeNotification>): void;
481  */
OffDataChange(napi_env env,size_t argc,napi_value * argv,std::shared_ptr<ContextBase> ctxt)482 void JsKVStore::OffDataChange(napi_env env, size_t argc, napi_value* argv, std::shared_ptr<ContextBase> ctxt)
483 {
484     // required 1 arguments :: [callback]
485     CHECK_ARGS_RETURN_VOID(ctxt, argc <= 1, "invalid arguments off dataChange!");
486     // have 1 arguments :: have the callback
487     if (argc == 1) {
488         napi_valuetype valueType = napi_undefined;
489         ctxt->status = napi_typeof(env, argv[0], &valueType);
490         CHECK_STATUS_RETURN_VOID(ctxt, "napi_typeof failed!");
491         CHECK_ARGS_RETURN_VOID(ctxt, valueType == napi_function, "invalid arg[1], i.e. invalid callback");
492     }
493     ZLOGI("unsubscribe dataChange, %{public}s specified observer.", (argc == 0) ? "without": "with");
494 
495     auto proxy = reinterpret_cast<JsKVStore*>(ctxt->native);
496     bool found = false;
497     napi_status status = napi_ok;
498     auto traverseType = [argc, argv, proxy, env, &found, &status](uint8_t type, auto& observers) {
499         auto it = observers.begin();
500         while (it != observers.end()) {
501             if ((argc == 1) && !JSUtil::Equals(env, argv[0], (*it)->GetCallback())) {
502                 ++it;
503                 continue; // specified observer and not current iterator
504             }
505             found = true;
506             status = proxy->UnSubscribe(type, *it);
507             if (status != napi_ok) {
508                 break; // stop on fail.
509             }
510             it = observers.erase(it);
511         }
512     };
513 
514     std::lock_guard<std::mutex> lck(proxy->listMutex_);
515     for (uint8_t type = SUBSCRIBE_LOCAL; type < SUBSCRIBE_COUNT; type++) {
516         traverseType(type, proxy->dataObserver_[type]);
517         if (status != napi_ok) {
518             break; // stop on fail.
519         }
520     }
521     CHECK_ARGS_RETURN_VOID(ctxt, found || (argc == 0), "not Subscribed!");
522 }
523 
524 /*
525  * [JS API Prototype] JsKVStore::OnSyncComplete is private non-static.
526  * [Callback]
527  *      on(event:'syncComplete',syncCallback: Callback<Array<[string, number]>>):void;
528  */
OnSyncComplete(napi_env env,size_t argc,napi_value * argv,std::shared_ptr<ContextBase> ctxt)529 void JsKVStore::OnSyncComplete(napi_env env, size_t argc, napi_value* argv, std::shared_ptr<ContextBase> ctxt)
530 {
531     // required 1 arguments :: <callback>
532     CHECK_ARGS_RETURN_VOID(ctxt, argc == 1, "invalid arguments on syncComplete!");
533     napi_valuetype valueType = napi_undefined;
534     ctxt->status = napi_typeof(env, argv[0], &valueType);
535     CHECK_STATUS_RETURN_VOID(ctxt, "napi_typeof failed!");
536     CHECK_ARGS_RETURN_VOID(ctxt, valueType == napi_function, "invalid arg[1], i.e. invalid callback");
537 
538     auto proxy = reinterpret_cast<JsKVStore*>(ctxt->native);
539     ctxt->status = proxy->RegisterSyncCallback(std::make_shared<SyncObserver>(proxy->uvQueue_, argv[0]));
540     CHECK_STATUS_RETURN_VOID(ctxt, "RegisterSyncCallback failed!");
541 }
542 
543 /*
544  * [JS API Prototype] JsKVStore::OffSyncComplete is private non-static.
545  * [Callback]
546  *      off(event:'syncComplete',syncCallback: Callback<Array<[string, number]>>):void;
547  */
OffSyncComplete(napi_env env,size_t argc,napi_value * argv,std::shared_ptr<ContextBase> ctxt)548 void JsKVStore::OffSyncComplete(napi_env env, size_t argc, napi_value* argv, std::shared_ptr<ContextBase> ctxt)
549 {
550     // required 1 arguments :: [callback]
551     CHECK_ARGS_RETURN_VOID(ctxt, argc <= 1, "invalid arguments off syncComplete!");
552     auto proxy = reinterpret_cast<JsKVStore*>(ctxt->native);
553     // have 1 arguments :: have the callback
554     if (argc == 1) {
555         napi_valuetype valueType = napi_undefined;
556         ctxt->status = napi_typeof(env, argv[0], &valueType);
557         CHECK_STATUS_RETURN_VOID(ctxt, "napi_typeof failed!");
558         CHECK_ARGS_RETURN_VOID(ctxt, valueType == napi_function, "invalid arg[1], i.e. invalid callback");
559         std::lock_guard<std::mutex> lck(proxy->listMutex_);
560         auto it = proxy->syncObservers_.begin();
561         while (it != proxy->syncObservers_.end()) {
562             if (JSUtil::Equals(env, argv[0], (*it)->GetCallback())) {
563                 (*it)->Clear();
564                 proxy->syncObservers_.erase(it);
565                 break;
566             }
567         }
568         ctxt->status = napi_ok;
569     }
570     ZLOGI("unsubscribe syncComplete, %{public}s specified observer.", (argc == 0) ? "without": "with");
571     if (argc == 0 || proxy->syncObservers_.empty()) {
572         ctxt->status = proxy->UnRegisterSyncCallback();
573     }
574     CHECK_STATUS_RETURN_VOID(ctxt, "UnRegisterSyncCallback failed!");
575 }
576 
577 /*
578  * [Internal private non-static]
579  */
RegisterSyncCallback(std::shared_ptr<SyncObserver> callback)580 napi_status JsKVStore::RegisterSyncCallback(std::shared_ptr<SyncObserver> callback)
581 {
582     Status status = kvStore_->RegisterSyncCallback(callback);
583     if (status != Status::SUCCESS) {
584         callback->Clear();
585         return napi_generic_failure;
586     }
587     std::lock_guard<std::mutex> lck(listMutex_);
588     syncObservers_.push_back(callback);
589     return napi_ok;
590 }
591 
UnRegisterSyncCallback()592 napi_status JsKVStore::UnRegisterSyncCallback()
593 {
594     Status status = kvStore_->UnRegisterSyncCallback();
595     if (status != Status::SUCCESS) {
596         return napi_generic_failure;
597     }
598     std::lock_guard<std::mutex> lck(listMutex_);
599     for (auto &syncObserver : syncObservers_) {
600         syncObserver->Clear();
601     }
602     syncObservers_.clear();
603     return napi_ok;
604 }
605 
Subscribe(uint8_t type,std::shared_ptr<DataObserver> observer)606 napi_status JsKVStore::Subscribe(uint8_t type, std::shared_ptr<DataObserver> observer)
607 {
608     auto subscribeType = ToSubscribeType(type);
609     Status status = kvStore_->SubscribeKvStore(subscribeType, observer);
610     ZLOGD("kvStore_->SubscribeKvStore(%{public}d) return %{public}d", type, status);
611     if (status != Status::SUCCESS) {
612         observer->Clear();
613         return napi_generic_failure;
614     }
615     dataObserver_[type].push_back(observer);
616     return napi_ok;
617 }
618 
UnSubscribe(uint8_t type,std::shared_ptr<DataObserver> observer)619 napi_status JsKVStore::UnSubscribe(uint8_t type, std::shared_ptr<DataObserver> observer)
620 {
621     auto subscribeType = ToSubscribeType(type);
622     Status status = kvStore_->UnSubscribeKvStore(subscribeType, observer);
623     ZLOGD("kvStore_->UnSubscribeKvStore(%{public}d) return %{public}d", type, status);
624     if (status == Status::SUCCESS) {
625         observer->Clear();
626         return napi_ok;
627     }
628     return napi_generic_failure;
629 }
630 
SetUvQueue(std::shared_ptr<UvQueue> uvQueue)631 void JsKVStore::SetUvQueue(std::shared_ptr<UvQueue> uvQueue)
632 {
633     uvQueue_ = uvQueue;
634 }
635 
IsSchemaStore() const636 bool JsKVStore::IsSchemaStore() const
637 {
638     return isSchemaStore_;
639 }
640 
SetSchemaInfo(bool isSchemaStore)641 void JsKVStore::SetSchemaInfo(bool isSchemaStore)
642 {
643     isSchemaStore_ = isSchemaStore;
644 }
645 
OnChange(const ChangeNotification & notification)646 void JsKVStore::DataObserver::OnChange(const ChangeNotification& notification)
647 {
648     ZLOGD("data change insert:%{public}zu, update:%{public}zu, delete:%{public}zu",
649         notification.GetInsertEntries().size(), notification.GetUpdateEntries().size(),
650         notification.GetDeleteEntries().size());
651     KvStoreObserver::OnChange(notification);
652 
653     auto args = [notification, isSchema = isSchema_](napi_env env, int& argc, napi_value* argv) {
654         // generate 1 arguments for callback function.
655         argc = 1;
656         JSUtil::SetValue(env, notification, argv[0], isSchema);
657     };
658     AsyncCall(args);
659 }
660 
SyncCompleted(const std::map<std::string,DistributedKv::Status> & results)661 void JsKVStore::SyncObserver::SyncCompleted(const std::map<std::string, DistributedKv::Status>& results)
662 {
663     auto args = [results](napi_env env, int& argc, napi_value* argv) {
664         // generate 1 arguments for callback function.
665         argc = 1;
666         JSUtil::SetValue(env, results, argv[0]);
667     };
668     AsyncCall(args);
669 }
670 } // namespace OHOS::DistributedData
671