1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define LOG_TAG "JsKVStore"
16 #include "js_kv_store.h"
17 #include "js_util.h"
18 #include "js_kv_store_resultset.h"
19 #include "log_print.h"
20 #include "napi_queue.h"
21 #include "datashare_values_bucket.h"
22 #include "datashare_predicates.h"
23 #include "single_kvstore.h"
24 #include "kv_utils.h"
25 #include "kvstore_datashare_bridge.h"
26
27 using namespace OHOS::DistributedKv;
28 using namespace OHOS::DataShare;
29
30 namespace OHOS::DistributedData {
31 inline static uint8_t UNVALID_SUBSCRIBE_TYPE = 255;
32 std::map<std::string, JsKVStore::Exec> JsKVStore::onEventHandlers_ = {
33 { "dataChange", JsKVStore::OnDataChange },
34 { "syncComplete", JsKVStore::OnSyncComplete }
35 };
36
37 std::map<std::string, JsKVStore::Exec> JsKVStore::offEventHandlers_ = {
38 { "dataChange", JsKVStore::OffDataChange },
39 { "syncComplete", JsKVStore::OffSyncComplete }
40 };
41
ValidSubscribeType(uint8_t type)42 static bool ValidSubscribeType(uint8_t type)
43 {
44 return (SUBSCRIBE_LOCAL <= type) && (type <= SUBSCRIBE_LOCAL_REMOTE);
45 }
46
ToSubscribeType(uint8_t type)47 static SubscribeType ToSubscribeType(uint8_t type)
48 {
49 switch (type) {
50 case 0: // 0 means SUBSCRIBE_TYPE_LOCAL
51 return SubscribeType::SUBSCRIBE_TYPE_LOCAL;
52 case 1: // 1 means SUBSCRIBE_TYPE_REMOTE
53 return SubscribeType::SUBSCRIBE_TYPE_REMOTE;
54 case 2: // 2 means SUBSCRIBE_TYPE_ALL
55 return SubscribeType::SUBSCRIBE_TYPE_ALL;
56 default:
57 return static_cast<SubscribeType>(UNVALID_SUBSCRIBE_TYPE);
58 }
59 }
60
JsKVStore(const std::string & storeId)61 JsKVStore::JsKVStore(const std::string& storeId)
62 : storeId_(storeId)
63 {
64 }
65
~JsKVStore()66 JsKVStore::~JsKVStore()
67 {
68 ZLOGD("no memory leak for JsKVStore");
69 if (kvStore_ == nullptr) {
70 return;
71 }
72
73 std::lock_guard<std::mutex> lck(listMutex_);
74 for (uint8_t type = SUBSCRIBE_LOCAL; type < SUBSCRIBE_COUNT; type++) {
75 for (auto& observer : dataObserver_[type]) {
76 auto subscribeType = ToSubscribeType(type);
77 kvStore_->UnSubscribeKvStore(subscribeType, observer);
78 observer->Clear();
79 }
80 dataObserver_[type].clear();
81 }
82
83 kvStore_->UnRegisterSyncCallback();
84 for (auto &syncObserver : syncObservers_) {
85 syncObserver->Clear();
86 }
87 syncObservers_.clear();
88 }
89
SetNative(std::shared_ptr<SingleKvStore> & kvStore)90 void JsKVStore::SetNative(std::shared_ptr<SingleKvStore>& kvStore)
91 {
92 kvStore_ = kvStore;
93 }
94
GetNative()95 std::shared_ptr<SingleKvStore>& JsKVStore::GetNative()
96 {
97 return kvStore_;
98 }
99
SetContextParam(std::shared_ptr<ContextParam> param)100 void JsKVStore::SetContextParam(std::shared_ptr<ContextParam> param)
101 {
102 param_ = param;
103 }
104
IsInstanceOf(napi_env env,napi_value obj,const std::string & storeId,napi_value constructor)105 bool JsKVStore::IsInstanceOf(napi_env env, napi_value obj, const std::string& storeId, napi_value constructor)
106 {
107 bool result = false;
108 napi_status status = napi_instanceof(env, obj, constructor, &result);
109 CHECK_RETURN((status == napi_ok) && (result != false), "is not instance of JsKVStore!", false);
110
111 JsKVStore* kvStore = nullptr;
112 status = napi_unwrap(env, obj, reinterpret_cast<void**>(&kvStore));
113 CHECK_RETURN((status == napi_ok) && (kvStore != nullptr), "can not unwrap to JsKVStore!", false);
114 return kvStore->storeId_ == storeId;
115 }
116
117 /*
118 * [JS API Prototype]
119 * [AsyncCallback]
120 * put(key:string, value:Uint8Array | string | boolean | number, callback: AsyncCallback<void>):void;
121 * [Promise]
122 * put(key:string, value:Uint8Array | string | boolean | number):Promise<void>;
123 */
Put(napi_env env,napi_callback_info info)124 napi_value JsKVStore::Put(napi_env env, napi_callback_info info)
125 {
126 ZLOGD("KVStore::Put()");
127 struct PutContext : public ContextBase {
128 std::string key;
129 JSUtil::KvStoreVariant value;
130 };
131 auto ctxt = std::make_shared<PutContext>();
132
133 ctxt->GetCbInfo(env, info, [env, ctxt](size_t argc, napi_value* argv) {
134 // required 2 arguments :: <key> <value>
135 CHECK_ARGS_RETURN_VOID(ctxt, argc == 2, "invalid arguments!");
136 ctxt->status = JSUtil::GetValue(env, argv[0], ctxt->key);
137 CHECK_STATUS_RETURN_VOID(ctxt, "invalid arg[0], i.e. invalid key!");
138 ctxt->status = JSUtil::GetValue(env, argv[1], ctxt->value);
139 CHECK_STATUS_RETURN_VOID(ctxt, "invalid arg[1], i.e. invalid value!");
140 });
141
142 auto execute = [ctxt]() {
143 DistributedKv::Key key(ctxt->key);
144 bool isSchemaStore = reinterpret_cast<JsKVStore *>(ctxt->native)->IsSchemaStore();
145 auto &kvStore = reinterpret_cast<JsKVStore *>(ctxt->native)->kvStore_;
146 DistributedKv::Value value = isSchemaStore ? DistributedKv::Blob(std::get<std::string>(ctxt->value))
147 : JSUtil::VariantValue2Blob(ctxt->value);
148 Status status = kvStore->Put(key, value);
149 ZLOGD("kvStore->Put return %{public}d", status);
150 ctxt->status = (status == Status::SUCCESS) ? napi_ok : napi_generic_failure;
151 CHECK_STATUS_RETURN_VOID(ctxt, "kvStore->Put() failed!");
152 };
153 return NapiQueue::AsyncWork(env, ctxt, std::string(__FUNCTION__), execute);
154 }
155
156 /*
157 * [JS API Prototype]
158 * [AsyncCallback]
159 * delete(key: string, callback: AsyncCallback<void>): void;
160 * [Promise]
161 * delete(key: string): Promise<void>;
162 */
Delete(napi_env env,napi_callback_info info)163 napi_value JsKVStore::Delete(napi_env env, napi_callback_info info)
164 {
165 ZLOGD("KVStore::Delete()");
166 struct DeleteContext : public ContextBase {
167 std::string key;
168 };
169 auto ctxt = std::make_shared<DeleteContext>();
170
171 ctxt->GetCbInfo(env, info, [env, ctxt](size_t argc, napi_value* argv) {
172 // required 1 arguments :: <key>
173 CHECK_ARGS_RETURN_VOID(ctxt, argc == 1, "invalid arguments!");
174 ctxt->status = JSUtil::GetValue(env, argv[0], ctxt->key);
175 ZLOGD("kvStore->Delete %{public}.6s status:%{public}d", ctxt->key.c_str(), ctxt->status);
176 CHECK_STATUS_RETURN_VOID(ctxt, "invalid arg[0], i.e. invalid key!");
177 });
178
179 return NapiQueue::AsyncWork(env, ctxt, std::string(__FUNCTION__), [ctxt]() {
180 OHOS::DistributedKv::Key key(ctxt->key);
181 auto& kvStore = reinterpret_cast<JsKVStore*>(ctxt->native)->kvStore_;
182 Status status = kvStore->Delete(key);
183 ZLOGD("kvStore->Delete %{public}.6s status:%{public}d", ctxt->key.c_str(), status);
184 ctxt->status = (status == Status::SUCCESS) ? napi_ok : napi_generic_failure;
185 CHECK_STATUS_RETURN_VOID(ctxt, "kvStore->Delete() failed!");
186 });
187 }
188
189 /*
190 * [JS API Prototype]
191 * [Callback]
192 * on(event:'syncComplete',syncCallback: Callback<Array<[string, number]>>):void;
193 * on(event:'dataChange', subType: SubscribeType, observer: Callback<ChangeNotification>): void;
194 */
OnEvent(napi_env env,napi_callback_info info)195 napi_value JsKVStore::OnEvent(napi_env env, napi_callback_info info)
196 {
197 ZLOGD("in");
198 auto ctxt = std::make_shared<ContextBase>();
199 auto input = [env, ctxt](size_t argc, napi_value* argv) {
200 // required 2 arguments :: <event> [...] <callback>
201 CHECK_ARGS_RETURN_VOID(ctxt, argc >= 2, "invalid arguments!");
202 std::string event;
203 ctxt->status = JSUtil::GetValue(env, argv[0], event);
204 ZLOGI("subscribe to event:%{public}s", event.c_str());
205 auto handle = onEventHandlers_.find(event);
206 CHECK_ARGS_RETURN_VOID(ctxt, handle != onEventHandlers_.end(), "invalid arg[0], i.e. unsupported event");
207 // shift 1 argument, for JsKVStore::Exec.
208 handle->second(env, argc - 1, &argv[1], ctxt);
209 };
210 ctxt->GetCbInfoSync(env, info, input);
211 NAPI_ASSERT(env, ctxt->status == napi_ok, "invalid arguments!");
212 return nullptr;
213 }
214
215 /*
216 * [JS API Prototype]
217 * [Callback]
218 * off(event:'syncComplete',syncCallback: Callback<Array<[string, number]>>):void;
219 * off(event:'dataChange', subType: SubscribeType, observer: Callback<ChangeNotification>): void;
220 */
OffEvent(napi_env env,napi_callback_info info)221 napi_value JsKVStore::OffEvent(napi_env env, napi_callback_info info)
222 {
223 ZLOGD("in");
224 auto ctxt = std::make_shared<ContextBase>();
225 auto input = [env, ctxt](size_t argc, napi_value* argv) {
226 // required 1 arguments :: <event> [callback]
227 CHECK_ARGS_RETURN_VOID(ctxt, argc >= 1, "invalid arguments!");
228 std::string event;
229 ctxt->status = JSUtil::GetValue(env, argv[0], event);
230 ZLOGI("unsubscribe to event:%{public}s", event.c_str());
231 auto handle = offEventHandlers_.find(event);
232 CHECK_ARGS_RETURN_VOID(ctxt, handle != offEventHandlers_.end(), "invalid arg[0], i.e. unsupported event");
233 // shift 1 argument, for JsKVStore::Exec.
234 handle->second(env, argc - 1, &argv[1], ctxt);
235 };
236 ctxt->GetCbInfoSync(env, info, input);
237 NAPI_ASSERT(env, ctxt->status == napi_ok, "invalid arguments!");
238 return nullptr;
239 }
240
241 /*
242 * [JS API Prototype]
243 * [AsyncCallback]
244 * putBatch(entries: Entry[], callback: AsyncCallback<void>):void;
245 * [Promise]
246 * putBatch(entries: Entry[]):Promise<void>;
247 */
PutBatch(napi_env env,napi_callback_info info)248 napi_value JsKVStore::PutBatch(napi_env env, napi_callback_info info)
249 {
250 struct PutBatchContext : public ContextBase {
251 std::vector<Entry> entries;
252 };
253 auto ctxt = std::make_shared<PutBatchContext>();
254
255 ctxt->GetCbInfo(env, info, [env, ctxt](size_t argc, napi_value* argv) {
256 // required 1 arguments :: <entries>
257 CHECK_ARGS_RETURN_VOID(ctxt, argc == 1, "invalid arguments!");
258 auto isSchemaStore = reinterpret_cast<JsKVStore*>(ctxt->native)->IsSchemaStore();
259 ctxt->status = JSUtil::GetValue(env, argv[0], ctxt->entries, isSchemaStore);
260 CHECK_STATUS_RETURN_VOID(ctxt, "invalid arg[0], i.e. invalid entries!");
261 });
262
263 auto execute = [ctxt]() {
264 auto& kvStore = reinterpret_cast<JsKVStore*>(ctxt->native)->kvStore_;
265 Status status = kvStore->PutBatch(ctxt->entries);
266 ZLOGD("kvStore->DeleteBatch return %{public}d", status);
267 ctxt->status = (status == Status::SUCCESS) ? napi_ok : napi_generic_failure;
268 CHECK_STATUS_RETURN_VOID(ctxt, "kvStore->PutBatch() failed!");
269 };
270 return NapiQueue::AsyncWork(env, ctxt, std::string(__FUNCTION__), execute);
271 }
272
273 /*
274 * [JS API Prototype]
275 * [AsyncCallback]
276 * deleteBatch(keys: string[], callback: AsyncCallback<void>):void;
277 * [Promise]
278 * deleteBatch(keys: string[]):Promise<void>;
279 */
DeleteBatch(napi_env env,napi_callback_info info)280 napi_value JsKVStore::DeleteBatch(napi_env env, napi_callback_info info)
281 {
282 struct DeleteBatchContext : public ContextBase {
283 std::vector<std::string> keys;
284 };
285 auto ctxt = std::make_shared<DeleteBatchContext>();
286 auto input = [env, ctxt](size_t argc, napi_value* argv) {
287 // required 1 arguments :: <keys>
288 CHECK_ARGS_RETURN_VOID(ctxt, argc == 1, "invalid arguments!");
289 JSUtil::GetValue(env, argv[0], ctxt->keys);
290 CHECK_STATUS_RETURN_VOID(ctxt, "invalid arg[0], i.e. invalid keys!");
291 };
292 ctxt->GetCbInfo(env, info, input);
293
294 auto execute = [ctxt]() {
295 std::vector<DistributedKv::Key> keys;
296 for (auto it : ctxt->keys) {
297 DistributedKv::Key key(it);
298 keys.push_back(key);
299 }
300 auto& kvStore = reinterpret_cast<JsKVStore*>(ctxt->native)->kvStore_;
301 Status status = kvStore->DeleteBatch(keys);
302 ZLOGD("kvStore->DeleteBatch return %{public}d", status);
303 ctxt->status = (status == Status::SUCCESS) ? napi_ok : napi_generic_failure;
304 CHECK_STATUS_RETURN_VOID(ctxt, "kvStore->DeleteBatch failed!");
305 };
306 return NapiQueue::AsyncWork(env, ctxt, std::string(__FUNCTION__), execute);
307 }
308
309 /*
310 * [JS API Prototype]
311 * [AsyncCallback]
312 * startTransaction(callback: AsyncCallback<void>):void;
313 * [Promise]
314 * startTransaction() : Promise<void>;
315 */
StartTransaction(napi_env env,napi_callback_info info)316 napi_value JsKVStore::StartTransaction(napi_env env, napi_callback_info info)
317 {
318 auto ctxt = std::make_shared<ContextBase>();
319 ctxt->GetCbInfo(env, info);
320
321 auto execute = [ctxt]() {
322 auto& kvStore = reinterpret_cast<JsKVStore*>(ctxt->native)->kvStore_;
323 Status status = kvStore->StartTransaction();
324 ZLOGD("kvStore->StartTransaction return %{public}d", status);
325 ctxt->status = (status == Status::SUCCESS) ? napi_ok : napi_generic_failure;
326 CHECK_STATUS_RETURN_VOID(ctxt, "kvStore->StartTransaction() failed!");
327 };
328 return NapiQueue::AsyncWork(env, ctxt, std::string(__FUNCTION__), execute);
329 }
330
331 /*
332 * [JS API Prototype]
333 * [AsyncCallback]
334 * commit(callback: AsyncCallback<void>):void;
335 * [Promise]
336 * commit() : Promise<void>;
337 */
Commit(napi_env env,napi_callback_info info)338 napi_value JsKVStore::Commit(napi_env env, napi_callback_info info)
339 {
340 auto ctxt = std::make_shared<ContextBase>();
341 ctxt->GetCbInfo(env, info);
342
343 auto execute = [ctxt]() {
344 auto& kvStore = reinterpret_cast<JsKVStore*>(ctxt->native)->kvStore_;
345 Status status = kvStore->Commit();
346 ZLOGD("kvStore->Commit return %{public}d", status);
347 ctxt->status = (status == Status::SUCCESS) ? napi_ok : napi_generic_failure;
348 CHECK_STATUS_RETURN_VOID(ctxt, "kvStore->Commit() failed!");
349 };
350 return NapiQueue::AsyncWork(env, ctxt, std::string(__FUNCTION__), execute);
351 }
352
353 /*
354 * [JS API Prototype]
355 * [AsyncCallback]
356 * rollback(callback: AsyncCallback<void>):void;
357 * [Promise]
358 * rollback() : Promise<void>;
359 */
Rollback(napi_env env,napi_callback_info info)360 napi_value JsKVStore::Rollback(napi_env env, napi_callback_info info)
361 {
362 auto ctxt = std::make_shared<ContextBase>();
363 ctxt->GetCbInfo(env, info);
364
365 auto execute = [ctxt]() {
366 auto& kvStore = reinterpret_cast<JsKVStore*>(ctxt->native)->kvStore_;
367 Status status = kvStore->Rollback();
368 ZLOGD("kvStore->Commit return %{public}d", status);
369 ctxt->status = (status == Status::SUCCESS) ? napi_ok : napi_generic_failure;
370 CHECK_STATUS_RETURN_VOID(ctxt, "kvStore->Rollback() failed!");
371 };
372 return NapiQueue::AsyncWork(env, ctxt, std::string(__FUNCTION__), execute);
373 }
374
375 /*
376 * [JS API Prototype]
377 * [AsyncCallback]
378 * enableSync(enabled:boolean, callback: AsyncCallback<void>):void;
379 * [Promise]
380 * enableSync(enabled:boolean) : Promise<void>;
381 */
EnableSync(napi_env env,napi_callback_info info)382 napi_value JsKVStore::EnableSync(napi_env env, napi_callback_info info)
383 {
384 struct EnableSyncContext : public ContextBase {
385 bool enable = false;
386 };
387 auto ctxt = std::make_shared<EnableSyncContext>();
388 auto input = [env, ctxt](size_t argc, napi_value* argv) {
389 // required 1 arguments :: <enable>
390 CHECK_ARGS_RETURN_VOID(ctxt, argc == 1, "invalid arguments!");
391 ctxt->status = napi_get_value_bool(env, argv[0], &ctxt->enable);
392 CHECK_STATUS_RETURN_VOID(ctxt, "invalid arg[0], i.e. invalid enabled!");
393 };
394 ctxt->GetCbInfo(env, info, input);
395
396 auto execute = [ctxt]() {
397 auto& kvStore = reinterpret_cast<JsKVStore*>(ctxt->native)->kvStore_;
398 Status status = kvStore->SetCapabilityEnabled(ctxt->enable);
399 ZLOGD("kvStore->SetCapabilityEnabled return %{public}d", status);
400 ctxt->status = (status == Status::SUCCESS) ? napi_ok : napi_generic_failure;
401 CHECK_STATUS_RETURN_VOID(ctxt, "kvStore->SetCapabilityEnabled() failed!");
402 };
403 return NapiQueue::AsyncWork(env, ctxt, std::string(__FUNCTION__), execute);
404 }
405
406 /*
407 * [JS API Prototype]
408 * [AsyncCallback]
409 * setSyncRange(localLabels:string[], remoteSupportLabels:string[], callback: AsyncCallback<void>):void;
410 * [Promise]
411 * setSyncRange(localLabels:string[], remoteSupportLabels:string[]) : Promise<void>;
412 */
SetSyncRange(napi_env env,napi_callback_info info)413 napi_value JsKVStore::SetSyncRange(napi_env env, napi_callback_info info)
414 {
415 struct SyncRangeContext : public ContextBase {
416 std::vector<std::string> localLabels;
417 std::vector<std::string> remoteSupportLabels;
418 };
419 auto ctxt = std::make_shared<SyncRangeContext>();
420 auto input = [env, ctxt](size_t argc, napi_value* argv) {
421 // required 2 arguments :: <localLabels> <remoteSupportLabels>
422 CHECK_ARGS_RETURN_VOID(ctxt, argc == 2, "invalid arguments!");
423 ctxt->status = JSUtil::GetValue(env, argv[0], ctxt->localLabels);
424 CHECK_STATUS_RETURN_VOID(ctxt, "invalid arg[0], i.e. invalid localLabels!");
425 ctxt->status = JSUtil::GetValue(env, argv[1], ctxt->remoteSupportLabels);
426 CHECK_STATUS_RETURN_VOID(ctxt, "invalid arg[1], i.e. invalid remoteSupportLabels!");
427 };
428 ctxt->GetCbInfo(env, info, input);
429
430 auto execute = [ctxt]() {
431 auto& kvStore = reinterpret_cast<JsKVStore*>(ctxt->native)->kvStore_;
432 Status status = kvStore->SetCapabilityRange(ctxt->localLabels, ctxt->remoteSupportLabels);
433 ZLOGD("kvStore->SetCapabilityRange return %{public}d", status);
434 ctxt->status = (status == Status::SUCCESS) ? napi_ok : napi_generic_failure;
435 CHECK_STATUS_RETURN_VOID(ctxt, "kvStore->SetCapabilityRange() failed!");
436 };
437 return NapiQueue::AsyncWork(env, ctxt, std::string(__FUNCTION__), execute);
438 }
439
440 /*
441 * [JS API Prototype] JsKVStore::OnDataChange is private non-static.
442 * [Callback]
443 * on(event:'dataChange', subType: SubscribeType, observer: Callback<ChangeNotification>): void;
444 */
OnDataChange(napi_env env,size_t argc,napi_value * argv,std::shared_ptr<ContextBase> ctxt)445 void JsKVStore::OnDataChange(napi_env env, size_t argc, napi_value* argv, std::shared_ptr<ContextBase> ctxt)
446 {
447 // required 2 arguments :: <SubscribeType> <observer>
448 CHECK_ARGS_RETURN_VOID(ctxt, argc == 2, "invalid arguments on dataChange!");
449
450 int32_t type = SUBSCRIBE_COUNT;
451 ctxt->status = napi_get_value_int32(env, argv[0], &type);
452 CHECK_STATUS_RETURN_VOID(ctxt, "napi_get_value_int32 failed!");
453 CHECK_ARGS_RETURN_VOID(ctxt, ValidSubscribeType(type), "invalid arg[1], i.e. invalid subscribeType");
454
455 napi_valuetype valueType = napi_undefined;
456 ctxt->status = napi_typeof(env, argv[1], &valueType);
457 CHECK_STATUS_RETURN_VOID(ctxt, "napi_typeof failed!");
458 CHECK_ARGS_RETURN_VOID(ctxt, valueType == napi_function, "invalid arg[2], i.e. invalid callback");
459
460 ZLOGI("subscribe dataChange, type: %{public}d", type);
461 auto proxy = reinterpret_cast<JsKVStore*>(ctxt->native);
462 std::lock_guard<std::mutex> lck(proxy->listMutex_);
463 for (auto& it : proxy->dataObserver_[type]) {
464 if (JSUtil::Equals(env, argv[1], it->GetCallback())) {
465 ZLOGI("function is already subscribe type");
466 return;
467 }
468 }
469
470 ctxt->status =
471 proxy->Subscribe(type, std::make_shared<DataObserver>(proxy->uvQueue_, argv[1], proxy->IsSchemaStore()));
472 CHECK_STATUS_RETURN_VOID(ctxt, "Subscribe failed!");
473 }
474
475 /*
476 * [JS API Prototype] JsKVStore::OffDataChange is private non-static.
477 * [Callback]
478 * on(event:'dataChange', subType: SubscribeType, observer: Callback<ChangeNotification>): void;
479 * [NOTES!!!] no SubscribeType while off...
480 * off(event:'dataChange', observer: Callback<ChangeNotification>): void;
481 */
OffDataChange(napi_env env,size_t argc,napi_value * argv,std::shared_ptr<ContextBase> ctxt)482 void JsKVStore::OffDataChange(napi_env env, size_t argc, napi_value* argv, std::shared_ptr<ContextBase> ctxt)
483 {
484 // required 1 arguments :: [callback]
485 CHECK_ARGS_RETURN_VOID(ctxt, argc <= 1, "invalid arguments off dataChange!");
486 // have 1 arguments :: have the callback
487 if (argc == 1) {
488 napi_valuetype valueType = napi_undefined;
489 ctxt->status = napi_typeof(env, argv[0], &valueType);
490 CHECK_STATUS_RETURN_VOID(ctxt, "napi_typeof failed!");
491 CHECK_ARGS_RETURN_VOID(ctxt, valueType == napi_function, "invalid arg[1], i.e. invalid callback");
492 }
493 ZLOGI("unsubscribe dataChange, %{public}s specified observer.", (argc == 0) ? "without": "with");
494
495 auto proxy = reinterpret_cast<JsKVStore*>(ctxt->native);
496 bool found = false;
497 napi_status status = napi_ok;
498 auto traverseType = [argc, argv, proxy, env, &found, &status](uint8_t type, auto& observers) {
499 auto it = observers.begin();
500 while (it != observers.end()) {
501 if ((argc == 1) && !JSUtil::Equals(env, argv[0], (*it)->GetCallback())) {
502 ++it;
503 continue; // specified observer and not current iterator
504 }
505 found = true;
506 status = proxy->UnSubscribe(type, *it);
507 if (status != napi_ok) {
508 break; // stop on fail.
509 }
510 it = observers.erase(it);
511 }
512 };
513
514 std::lock_guard<std::mutex> lck(proxy->listMutex_);
515 for (uint8_t type = SUBSCRIBE_LOCAL; type < SUBSCRIBE_COUNT; type++) {
516 traverseType(type, proxy->dataObserver_[type]);
517 if (status != napi_ok) {
518 break; // stop on fail.
519 }
520 }
521 CHECK_ARGS_RETURN_VOID(ctxt, found || (argc == 0), "not Subscribed!");
522 }
523
524 /*
525 * [JS API Prototype] JsKVStore::OnSyncComplete is private non-static.
526 * [Callback]
527 * on(event:'syncComplete',syncCallback: Callback<Array<[string, number]>>):void;
528 */
OnSyncComplete(napi_env env,size_t argc,napi_value * argv,std::shared_ptr<ContextBase> ctxt)529 void JsKVStore::OnSyncComplete(napi_env env, size_t argc, napi_value* argv, std::shared_ptr<ContextBase> ctxt)
530 {
531 // required 1 arguments :: <callback>
532 CHECK_ARGS_RETURN_VOID(ctxt, argc == 1, "invalid arguments on syncComplete!");
533 napi_valuetype valueType = napi_undefined;
534 ctxt->status = napi_typeof(env, argv[0], &valueType);
535 CHECK_STATUS_RETURN_VOID(ctxt, "napi_typeof failed!");
536 CHECK_ARGS_RETURN_VOID(ctxt, valueType == napi_function, "invalid arg[1], i.e. invalid callback");
537
538 auto proxy = reinterpret_cast<JsKVStore*>(ctxt->native);
539 ctxt->status = proxy->RegisterSyncCallback(std::make_shared<SyncObserver>(proxy->uvQueue_, argv[0]));
540 CHECK_STATUS_RETURN_VOID(ctxt, "RegisterSyncCallback failed!");
541 }
542
543 /*
544 * [JS API Prototype] JsKVStore::OffSyncComplete is private non-static.
545 * [Callback]
546 * off(event:'syncComplete',syncCallback: Callback<Array<[string, number]>>):void;
547 */
OffSyncComplete(napi_env env,size_t argc,napi_value * argv,std::shared_ptr<ContextBase> ctxt)548 void JsKVStore::OffSyncComplete(napi_env env, size_t argc, napi_value* argv, std::shared_ptr<ContextBase> ctxt)
549 {
550 // required 1 arguments :: [callback]
551 CHECK_ARGS_RETURN_VOID(ctxt, argc <= 1, "invalid arguments off syncComplete!");
552 auto proxy = reinterpret_cast<JsKVStore*>(ctxt->native);
553 // have 1 arguments :: have the callback
554 if (argc == 1) {
555 napi_valuetype valueType = napi_undefined;
556 ctxt->status = napi_typeof(env, argv[0], &valueType);
557 CHECK_STATUS_RETURN_VOID(ctxt, "napi_typeof failed!");
558 CHECK_ARGS_RETURN_VOID(ctxt, valueType == napi_function, "invalid arg[1], i.e. invalid callback");
559 std::lock_guard<std::mutex> lck(proxy->listMutex_);
560 auto it = proxy->syncObservers_.begin();
561 while (it != proxy->syncObservers_.end()) {
562 if (JSUtil::Equals(env, argv[0], (*it)->GetCallback())) {
563 (*it)->Clear();
564 proxy->syncObservers_.erase(it);
565 break;
566 }
567 }
568 ctxt->status = napi_ok;
569 }
570 ZLOGI("unsubscribe syncComplete, %{public}s specified observer.", (argc == 0) ? "without": "with");
571 if (argc == 0 || proxy->syncObservers_.empty()) {
572 ctxt->status = proxy->UnRegisterSyncCallback();
573 }
574 CHECK_STATUS_RETURN_VOID(ctxt, "UnRegisterSyncCallback failed!");
575 }
576
577 /*
578 * [Internal private non-static]
579 */
RegisterSyncCallback(std::shared_ptr<SyncObserver> callback)580 napi_status JsKVStore::RegisterSyncCallback(std::shared_ptr<SyncObserver> callback)
581 {
582 Status status = kvStore_->RegisterSyncCallback(callback);
583 if (status != Status::SUCCESS) {
584 callback->Clear();
585 return napi_generic_failure;
586 }
587 std::lock_guard<std::mutex> lck(listMutex_);
588 syncObservers_.push_back(callback);
589 return napi_ok;
590 }
591
UnRegisterSyncCallback()592 napi_status JsKVStore::UnRegisterSyncCallback()
593 {
594 Status status = kvStore_->UnRegisterSyncCallback();
595 if (status != Status::SUCCESS) {
596 return napi_generic_failure;
597 }
598 std::lock_guard<std::mutex> lck(listMutex_);
599 for (auto &syncObserver : syncObservers_) {
600 syncObserver->Clear();
601 }
602 syncObservers_.clear();
603 return napi_ok;
604 }
605
Subscribe(uint8_t type,std::shared_ptr<DataObserver> observer)606 napi_status JsKVStore::Subscribe(uint8_t type, std::shared_ptr<DataObserver> observer)
607 {
608 auto subscribeType = ToSubscribeType(type);
609 Status status = kvStore_->SubscribeKvStore(subscribeType, observer);
610 ZLOGD("kvStore_->SubscribeKvStore(%{public}d) return %{public}d", type, status);
611 if (status != Status::SUCCESS) {
612 observer->Clear();
613 return napi_generic_failure;
614 }
615 dataObserver_[type].push_back(observer);
616 return napi_ok;
617 }
618
UnSubscribe(uint8_t type,std::shared_ptr<DataObserver> observer)619 napi_status JsKVStore::UnSubscribe(uint8_t type, std::shared_ptr<DataObserver> observer)
620 {
621 auto subscribeType = ToSubscribeType(type);
622 Status status = kvStore_->UnSubscribeKvStore(subscribeType, observer);
623 ZLOGD("kvStore_->UnSubscribeKvStore(%{public}d) return %{public}d", type, status);
624 if (status == Status::SUCCESS) {
625 observer->Clear();
626 return napi_ok;
627 }
628 return napi_generic_failure;
629 }
630
SetUvQueue(std::shared_ptr<UvQueue> uvQueue)631 void JsKVStore::SetUvQueue(std::shared_ptr<UvQueue> uvQueue)
632 {
633 uvQueue_ = uvQueue;
634 }
635
IsSchemaStore() const636 bool JsKVStore::IsSchemaStore() const
637 {
638 return isSchemaStore_;
639 }
640
SetSchemaInfo(bool isSchemaStore)641 void JsKVStore::SetSchemaInfo(bool isSchemaStore)
642 {
643 isSchemaStore_ = isSchemaStore;
644 }
645
OnChange(const ChangeNotification & notification)646 void JsKVStore::DataObserver::OnChange(const ChangeNotification& notification)
647 {
648 ZLOGD("data change insert:%{public}zu, update:%{public}zu, delete:%{public}zu",
649 notification.GetInsertEntries().size(), notification.GetUpdateEntries().size(),
650 notification.GetDeleteEntries().size());
651 KvStoreObserver::OnChange(notification);
652
653 auto args = [notification, isSchema = isSchema_](napi_env env, int& argc, napi_value* argv) {
654 // generate 1 arguments for callback function.
655 argc = 1;
656 JSUtil::SetValue(env, notification, argv[0], isSchema);
657 };
658 AsyncCall(args);
659 }
660
SyncCompleted(const std::map<std::string,DistributedKv::Status> & results)661 void JsKVStore::SyncObserver::SyncCompleted(const std::map<std::string, DistributedKv::Status>& results)
662 {
663 auto args = [results](napi_env env, int& argc, napi_value* argv) {
664 // generate 1 arguments for callback function.
665 argc = 1;
666 JSUtil::SetValue(env, results, argv[0]);
667 };
668 AsyncCall(args);
669 }
670 } // namespace OHOS::DistributedData
671