1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <gtest/gtest.h>
17 #include <thread>
18
19 #include "db_constant.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "kv_store_nb_delegate.h"
23 #include "kv_virtual_device.h"
24 #include "platform_specific.h"
25 #include "query.h"
26 #include "query_sync_object.h"
27 #include "runtime_config.h"
28 #include "single_ver_data_sync.h"
29 #include "single_ver_serialize_manager.h"
30 #include "subscribe_manager.h"
31 #include "subscribe_recorder.h"
32 #include "sync_types.h"
33
34 using namespace testing::ext;
35 using namespace DistributedDB;
36 using namespace DistributedDBUnitTest;
37 using namespace std;
38
39 namespace {
40 string g_testDir;
41 const string SCHEMA_STORE_ID = "kv_store_sync_schema_test";
42 const std::string DEVICE_A = "deviceA";
43 const std::string DEVICE_B = "deviceB";
44 const std::string DEVICE_C = "deviceC";
45
46 KvStoreDelegateManager g_schemaMgr(SCHEMA_APP_ID, USER_ID);
47 KvStoreConfig g_config;
48 DistributedDBToolsUnitTest g_tool;
49 DBStatus g_schemaKvDelegateStatus = INVALID_ARGS;
50 KvStoreNbDelegate* g_schemaKvDelegatePtr = nullptr;
51 VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
52 KvVirtualDevice* g_deviceB = nullptr;
53 KvVirtualDevice* g_deviceC = nullptr;
54
55 // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
56 auto g_schemaKvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
57 placeholders::_1, placeholders::_2, std::ref(g_schemaKvDelegateStatus), std::ref(g_schemaKvDelegatePtr));
58 const string SCHEMA_STRING =
59 "{\"SCHEMA_VERSION\":\"1.0\","
60 "\"SCHEMA_MODE\":\"STRICT\","
61 "\"SCHEMA_DEFINE\":{"
62 "\"field_name1\":\"BOOL\","
63 "\"field_name2\":\"BOOL\","
64 "\"field_name3\":\"INTEGER, NOT NULL\","
65 "\"field_name4\":\"LONG, DEFAULT 100\","
66 "\"field_name5\":\"DOUBLE, NOT NULL, DEFAULT 3.14\","
67 "\"field_name6\":\"STRING, NOT NULL, DEFAULT '3.1415'\","
68 "\"field_name7\":\"LONG, DEFAULT 100\","
69 "\"field_name8\":\"LONG, DEFAULT 100\","
70 "\"field_name9\":\"LONG, DEFAULT 100\","
71 "\"field_name10\":\"LONG, DEFAULT 100\""
72 "},"
73 "\"SCHEMA_INDEXES\":[\"$.field_name1\", \"$.field_name2\"]}";
74
75 const std::string SCHEMA_VALUE1 =
76 "{\"field_name1\":true,"
77 "\"field_name2\":false,"
78 "\"field_name3\":10,"
79 "\"field_name4\":20,"
80 "\"field_name5\":3.14,"
81 "\"field_name6\":\"3.1415\","
82 "\"field_name7\":100,"
83 "\"field_name8\":100,"
84 "\"field_name9\":100,"
85 "\"field_name10\":100}";
86
87 const std::string SCHEMA_VALUE2 =
88 "{\"field_name1\":false,"
89 "\"field_name2\":true,"
90 "\"field_name3\":100,"
91 "\"field_name4\":200,"
92 "\"field_name5\":3.14,"
93 "\"field_name6\":\"3.1415\","
94 "\"field_name7\":100,"
95 "\"field_name8\":100,"
96 "\"field_name9\":100,"
97 "\"field_name10\":100}";
98
99 class DistributedDBSingleVerP2PSubscribeSyncTest : public testing::Test {
100 public:
101 static void SetUpTestCase(void);
102 static void TearDownTestCase(void);
103 void SetUp();
104 void TearDown();
105 protected:
106 static void WaitUntilNotify(KvVirtualDevice &virtualDevice);
107 };
108
SetUpTestCase(void)109 void DistributedDBSingleVerP2PSubscribeSyncTest::SetUpTestCase(void)
110 {
111 /**
112 * @tc.setup: Init datadir and Virtual Communicator.
113 */
114 DistributedDBToolsUnitTest::TestDirInit(g_testDir);
115 g_config.dataDir = g_testDir;
116 g_schemaMgr.SetKvStoreConfig(g_config);
117
118 string dir = g_testDir + "/single_ver";
119 DIR* dirTmp = opendir(dir.c_str());
120 if (dirTmp == nullptr) {
121 OS::MakeDBDirectory(dir);
122 } else {
123 closedir(dirTmp);
124 }
125
126 g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
127 ASSERT_TRUE(g_communicatorAggregator != nullptr);
128 RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
129 }
130
TearDownTestCase(void)131 void DistributedDBSingleVerP2PSubscribeSyncTest::TearDownTestCase(void)
132 {
133 /**
134 * @tc.teardown: Release virtual Communicator and clear data dir.
135 */
136 if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
137 LOGE("rm test db files error!");
138 }
139 RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
140 }
141
SetUp(void)142 void DistributedDBSingleVerP2PSubscribeSyncTest::SetUp(void)
143 {
144 DistributedDBToolsUnitTest::PrintTestCaseInfo();
145 /**
146 * @tc.setup: create virtual device B and get a KvStoreNbDelegate as deviceA
147 */
148 g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
149 ASSERT_TRUE(g_deviceB != nullptr);
150 VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
151 ASSERT_TRUE(syncInterfaceB != nullptr);
152 ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
153 g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
154 ASSERT_TRUE(g_deviceC != nullptr);
155 VirtualSingleVerSyncDBInterface *syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
156 ASSERT_TRUE(syncInterfaceC != nullptr);
157 ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, syncInterfaceC), E_OK);
158 }
159
TearDown(void)160 void DistributedDBSingleVerP2PSubscribeSyncTest::TearDown(void)
161 {
162 /**
163 * @tc.teardown: Release device A, B
164 */
165 if (g_schemaKvDelegatePtr != nullptr) {
166 ASSERT_EQ(g_schemaMgr.CloseKvStore(g_schemaKvDelegatePtr), OK);
167 g_schemaKvDelegatePtr = nullptr;
168 DBStatus status = g_schemaMgr.DeleteKvStore(SCHEMA_STORE_ID);
169 LOGD("delete kv store status %d", status);
170 ASSERT_TRUE(status == OK);
171 }
172 if (g_deviceB != nullptr) {
173 delete g_deviceB;
174 g_deviceB = nullptr;
175 }
176 if (g_deviceC != nullptr) {
177 delete g_deviceC;
178 g_deviceC = nullptr;
179 }
180 PermissionCheckCallbackV2 nullCallback;
181 EXPECT_EQ(g_schemaMgr.SetPermissionCheckCallback(nullCallback), OK);
182 }
183
WaitUntilNotify(KvVirtualDevice & virtualDevice)184 void DistributedDBSingleVerP2PSubscribeSyncTest::WaitUntilNotify(KvVirtualDevice &virtualDevice)
185 {
186 bool notify = false;
187 std::condition_variable cv;
188 std::mutex notifyMutex;
189 virtualDevice.SetPushNotifier([¬ify, &cv, ¬ifyMutex](const std::string &) {
190 {
191 std::lock_guard<std::mutex> autoLock(notifyMutex);
192 notify = true;
193 }
194 cv.notify_all();
195 });
196 {
197 LOGI("Begin wait notify");
198 std::unique_lock<std::mutex> uniqueLock(notifyMutex);
199 (void)cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [¬ify]() {
200 return notify;
201 });
202 LOGI("End wait notify");
203 }
204 virtualDevice.SetPushNotifier(nullptr);
205 std::this_thread::sleep_for(std::chrono::seconds(1));
206 }
207
InitSubSchemaDb()208 void InitSubSchemaDb()
209 {
210 g_config.dataDir = g_testDir;
211 g_schemaMgr.SetKvStoreConfig(g_config);
212 KvStoreNbDelegate::Option option;
213 option.schema = SCHEMA_STRING;
214 g_schemaMgr.GetKvStore(SCHEMA_STORE_ID, option, g_schemaKvDelegateCallback);
215 ASSERT_TRUE(g_schemaKvDelegateStatus == OK);
216 ASSERT_TRUE(g_schemaKvDelegatePtr != nullptr);
217 }
218
CheckUnFinishedMap(uint32_t sizeA,uint32_t sizeB,std::vector<std::string> & deviceAQueies,std::vector<std::string> & deviceBQueies,SubscribeManager & subManager)219 void CheckUnFinishedMap(uint32_t sizeA, uint32_t sizeB, std::vector<std::string> &deviceAQueies,
220 std::vector<std::string> &deviceBQueies, SubscribeManager &subManager)
221 {
222 std::map<std::string, std::vector<QuerySyncObject>> allSyncQueries;
223 subManager.GetAllUnFinishSubQueries(allSyncQueries);
224 ASSERT_TRUE(allSyncQueries[DEVICE_A].size() == sizeA);
225 ASSERT_TRUE(allSyncQueries[DEVICE_B].size() == sizeB);
226 for (auto &item : allSyncQueries[DEVICE_A]) {
227 std::string queryId = item.GetIdentify();
228 ASSERT_TRUE(std::find(deviceAQueies.begin(), deviceAQueies.end(), queryId) != deviceAQueies.end());
229 }
230 for (auto &item : allSyncQueries[DEVICE_B]) {
231 std::string queryId = item.GetIdentify();
232 ASSERT_TRUE(std::find(deviceBQueies.begin(), deviceBQueies.end(), queryId) != deviceBQueies.end());
233 }
234 }
235
InitLocalSubscribeMap(QuerySyncObject & queryCommonObj,std::map<std::string,QuerySyncObject> & queryMap,std::vector<std::string> & deviceAQueies,std::vector<std::string> & deviceBQueies,SubscribeManager & subManager)236 void InitLocalSubscribeMap(QuerySyncObject &queryCommonObj, std::map<std::string, QuerySyncObject> &queryMap,
237 std::vector<std::string> &deviceAQueies, std::vector<std::string> &deviceBQueies, SubscribeManager &subManager)
238 {
239 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(DEVICE_A, queryCommonObj) == E_OK);
240 ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(DEVICE_A, queryCommonObj) == E_OK);
241 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(DEVICE_B, queryCommonObj) == E_OK);
242 ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(DEVICE_B, queryCommonObj) == E_OK);
243 queryMap[queryCommonObj.GetIdentify()] = queryCommonObj;
244 deviceAQueies.push_back(queryCommonObj.GetIdentify());
245 deviceBQueies.push_back(queryCommonObj.GetIdentify());
246 for (int i = 0; i < 3; i++) { // 3 subscribe
247 QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
248 deviceAQueies.push_back(querySyncObj.GetIdentify());
249 queryMap[querySyncObj.GetIdentify()] = querySyncObj;
250 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(DEVICE_A, querySyncObj) == E_OK);
251 ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(DEVICE_A, querySyncObj) == E_OK);
252 }
253 for (int i = 0; i < 1; i++) {
254 QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('b' + i)}));
255 deviceBQueies.push_back(querySyncObj.GetIdentify());
256 queryMap[querySyncObj.GetIdentify()] = querySyncObj;
257 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(DEVICE_B, querySyncObj) == E_OK);
258 ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(DEVICE_B, querySyncObj) == E_OK);
259 }
260 }
261 /**
262 * @tc.name: SubscribeRequestTest001
263 * @tc.desc: test Serialize/DoSerialize SubscribeRequest
264 * @tc.type: FUNC
265 * @tc.require: AR000FN6G9
266 * @tc.author: zhuwentao
267 */
268 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeRequestTest001, TestSize.Level1)
269 {
270 /**
271 * @tc.steps: step1. prepare a SubscribeRequest.
272 */
273 auto packet = new (std::nothrow) SubscribeRequest;
274 ASSERT_TRUE(packet != nullptr);
275 packet->SetPacketHead(100, SOFTWARE_VERSION_CURRENT, SUBSCRIBE_QUERY_CMD, 1);
276 Query query = Query::Select().EqualTo("$.field_name1", 1);
277 QuerySyncObject syncQuery(query);
278 packet->SetQuery(syncQuery);
279
280 /**
281 * @tc.steps: step2. put the SubscribeRequest Packet into a message.
282 */
283 Message msg;
284 msg.SetExternalObject(packet);
285 msg.SetMessageId(CONTROL_SYNC_MESSAGE);
286 msg.SetMessageType(TYPE_REQUEST);
287
288 /**
289 * @tc.steps: step3. Serialization the message to a buffer.
290 */
291 int len = static_cast<int>(SingleVerSerializeManager::CalculateLen(&msg));
292 LOGE("test leng = %d", len);
293 uint8_t *buffer = new (nothrow) uint8_t[len];
294 ASSERT_TRUE(buffer != nullptr);
295 ASSERT_EQ(SingleVerSerializeManager::Serialization(buffer, len, &msg), E_OK);
296
297 /**
298 * @tc.steps: step4. DeSerialization the buffer to a message.
299 */
300 Message outMsg;
301 outMsg.SetMessageId(CONTROL_SYNC_MESSAGE);
302 outMsg.SetMessageType(TYPE_REQUEST);
303 ASSERT_EQ(SingleVerSerializeManager::DeSerialization(buffer, len, &outMsg), E_OK);
304
305 /**
306 * @tc.steps: step5. checkout the outMsg.
307 * @tc.expected: step5. outMsg equal the the in msg
308 */
309 auto outPacket = outMsg.GetObject<SubscribeRequest>();
310 EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
311 EXPECT_EQ(outPacket->GetSendCode(), 100);
312 EXPECT_EQ(outPacket->GetcontrolCmdType(), SUBSCRIBE_QUERY_CMD);
313 EXPECT_EQ(outPacket->GetFlag(), 1u);
314 EXPECT_EQ(outPacket->GetQuery().GetIdentify(), syncQuery.GetIdentify());
315 delete[] buffer;
316 }
317
318 /**
319 * @tc.name: ControlAckTest001
320 * @tc.desc: test Serialize/DoSerialize ControlAckPacket
321 * @tc.type: FUNC
322 * @tc.require: AR000FN6G9
323 * @tc.author: zhuwentao
324 */
325 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, ControlAckTest001, TestSize.Level1)
326 {
327 /**
328 * @tc.steps: step1. prepare a ControlAckPacket.
329 */
330 ControlAckPacket packet;
331 packet.SetPacketHead(-E_NOT_SUPPORT, SOFTWARE_VERSION_CURRENT, SUBSCRIBE_QUERY_CMD, 1);
332
333 /**
334 * @tc.steps: step2. put the QuerySyncAckPacket into a message.
335 */
336 Message msg;
337 msg.SetCopiedObject(packet);
338 msg.SetMessageId(CONTROL_SYNC_MESSAGE);
339 msg.SetMessageType(TYPE_RESPONSE);
340
341 /**
342 * @tc.steps: step3. Serialization the message to a buffer.
343 */
344 int len = static_cast<int>(SingleVerSerializeManager::CalculateLen(&msg));
345 LOGE("test leng = %d", len);
346 uint8_t *buffer = new (nothrow) uint8_t[len];
347 ASSERT_TRUE(buffer != nullptr);
348 int errCode = SingleVerSerializeManager::Serialization(buffer, len, &msg);
349 ASSERT_EQ(errCode, E_OK);
350
351 /**
352 * @tc.steps: step4. DeSerialization the buffer to a message.
353 */
354 Message outMsg;
355 outMsg.SetMessageId(CONTROL_SYNC_MESSAGE);
356 outMsg.SetMessageType(TYPE_RESPONSE);
357 errCode = SingleVerSerializeManager::DeSerialization(buffer, len, &outMsg);
358 ASSERT_EQ(errCode, E_OK);
359
360 /**
361 * @tc.steps: step5. checkout the outMsg.
362 * @tc.expected: step5. outMsg equal the the in msg
363 */
364 auto outPacket = outMsg.GetObject<ControlAckPacket>();
365 EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
366 EXPECT_EQ(outPacket->GetRecvCode(), -E_NOT_SUPPORT);
367 EXPECT_EQ(outPacket->GetcontrolCmdType(), SUBSCRIBE_QUERY_CMD);
368 EXPECT_EQ(outPacket->GetFlag(), 1u);
369 delete[] buffer;
370 }
371
372 /**
373 * @tc.name: subscribeManager001
374 * @tc.desc: test subscribe class subscribe local function with one device
375 * @tc.type: FUNC
376 * @tc.require: AR000FN6G9
377 * @tc.author: zhuwentao
378 */
379 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeManager001, TestSize.Level1)
380 {
381 SubscribeManager subManager;
382 std::string device = "device_A";
383 /**
384 * @tc.steps: step1. test one device limit four subscribe queries in local map
385 */
386 LOGI("============step 1============");
387 for (int i = 0; i < 4; i++) {
388 QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
389 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device, querySyncObj) == E_OK);
390 ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device, querySyncObj) == E_OK);
391 }
392 std::vector<QuerySyncObject> subscribeQueries;
393 subManager.GetLocalSubscribeQueries(device, subscribeQueries);
394 ASSERT_TRUE(subscribeQueries.size() == 4);
395 subscribeQueries.clear();
396 QuerySyncObject querySyncObj1(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 4)}));
397 int errCode = subManager.ReserveLocalSubscribeQuery(device, querySyncObj1);
398 ASSERT_TRUE(errCode != E_OK);
399 /**
400 * @tc.steps: step2. allow to subscribe existed query
401 */
402 LOGI("============step 2============");
403 QuerySyncObject querySyncObj2(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 3)}));
404 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
405 ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
406 subManager.GetLocalSubscribeQueries(device, subscribeQueries);
407 ASSERT_TRUE(subscribeQueries.size() == 4);
408 subscribeQueries.clear();
409 /**
410 * @tc.steps: step3. unsubscribe no existed queries
411 */
412 LOGI("============step 3============");
413 subManager.RemoveLocalSubscribeQuery(device, querySyncObj1);
414 subManager.GetLocalSubscribeQueries(device, subscribeQueries);
415 ASSERT_TRUE(subscribeQueries.size() == 4);
416 subscribeQueries.clear();
417 /**
418 * @tc.steps: step4. unsubscribe queries
419 */
420 LOGI("============step 4============");
421 for (int i = 0; i < 4; i++) {
422 QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
423 subManager.RemoveLocalSubscribeQuery(device, querySyncObj);
424 }
425 subManager.GetLocalSubscribeQueries(device, subscribeQueries);
426 ASSERT_TRUE(subscribeQueries.size() == 0);
427
428 /**
429 * @tc.steps: step5. reserve twice while subscribe queries
430 */
431 LOGI("============step 5============");
432 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
433 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
434 ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
435 subManager.GetLocalSubscribeQueries(device, subscribeQueries);
436 ASSERT_TRUE(subscribeQueries.size() == 1);
437 subscribeQueries.clear();
438 subManager.RemoveLocalSubscribeQuery(device, querySyncObj2);
439 subManager.GetLocalSubscribeQueries(device, subscribeQueries);
440 ASSERT_TRUE(subscribeQueries.size() == 0);
441 }
442
443 /**
444 * @tc.name: subscribeManager002
445 * @tc.desc: test subscribe class subscribe remote function with one device
446 * @tc.type: FUNC
447 * @tc.require: AR000FN6G9
448 * @tc.author: zhuwentao
449 */
450 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeManager002, TestSize.Level1)
451 {
452 SubscribeManager subManager;
453 std::string device = "device_A";
454 /**
455 * @tc.steps: step1. test one device limit four subscribe queries in remote map
456 */
457 LOGI("============step 1============");
458 for (int i = 0; i < 4; i++) {
459 QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
460 ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device, querySyncObj) == E_OK);
461 ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(device, querySyncObj) == E_OK);
462 }
463 QuerySyncObject querySyncObj1(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 4)}));
464 ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device, querySyncObj1) != E_OK);
465 std::vector<std::string> subscribeQueryId;
466 subManager.GetRemoteSubscribeQueryIds(device, subscribeQueryId);
467 ASSERT_TRUE(subscribeQueryId.size() == 4);
468 subscribeQueryId.clear();
469 /**
470 * @tc.steps: step2. allow to subscribe existed query
471 */
472 LOGI("============step 2============");
473 QuerySyncObject querySyncObj2(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 3)}));
474 ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device, querySyncObj2) == E_OK);
475 ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(device, querySyncObj2) == E_OK);
476 subManager.GetRemoteSubscribeQueryIds(device, subscribeQueryId);
477 ASSERT_TRUE(subscribeQueryId.size() == 4);
478 subscribeQueryId.clear();
479 /**
480 * @tc.steps: step3. unsubscribe no existed queries
481 */
482 LOGI("============step 3============");
483 subManager.RemoveRemoteSubscribeQuery(device, querySyncObj1);
484 subManager.GetRemoteSubscribeQueryIds(device, subscribeQueryId);
485 ASSERT_TRUE(subscribeQueryId.size() == 4);
486 subscribeQueryId.clear();
487 /**
488 * @tc.steps: step4. unsubscribe queries
489 */
490 LOGI("============step 4============");
491 for (int i = 0; i < 4; i++) {
492 QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
493 subManager.RemoveRemoteSubscribeQuery(device, querySyncObj);
494 }
495 subManager.GetRemoteSubscribeQueryIds(device, subscribeQueryId);
496 ASSERT_TRUE(subscribeQueryId.size() == 0);
497 }
498
499 /**
500 * @tc.name: subscribeManager003
501 * @tc.desc: test subscribe class subscribe remote function with multi device
502 * @tc.type: FUNC
503 * @tc.require: AR000FN6G9
504 * @tc.author: zhuwentao
505 */
506 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeManager003, TestSize.Level1)
507 {
508 SubscribeManager subManager;
509 std::string device = "device_";
510 std::vector<QuerySyncObject> subscribeQueries;
511 /**
512 * @tc.steps: step1. test mutil device limit 32 devices in remote map and check each device has one subscribe
513 */
514 LOGI("============step 1============");
515 QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 1)}));
516 for (int i = 0; i < 32; i++) {
517 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device + std::to_string(i), querySyncObj) == E_OK);
518 ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device + std::to_string(i), querySyncObj) == E_OK);
519 }
520 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device + std::to_string(33), querySyncObj) != E_OK);
521 for (int i = 0; i < 32; i++) {
522 subManager.GetLocalSubscribeQueries(device + std::to_string(i), subscribeQueries);
523 ASSERT_TRUE(subscribeQueries.size() == 1);
524 subscribeQueries.clear();
525 }
526 /**
527 * @tc.steps: step2. clear remote subscribe query map and check each device has no subscribe
528 */
529 LOGI("============step 2============");
530 for (int i = 0; i < 32; i++) {
531 subManager.ClearLocalSubscribeQuery(device + std::to_string(i));
532 subManager.GetLocalSubscribeQueries(device + std::to_string(i), subscribeQueries);
533 ASSERT_TRUE(subscribeQueries.size() == 0);
534 subscribeQueries.clear();
535 }
536 /**
537 * @tc.steps: step3. test mutil device limit 8 queries in db and check each device has one subscribe
538 */
539 LOGI("============step 3============");
540 for (int i = 0; i < 8; i++) {
541 QuerySyncObject querySyncObj2(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
542 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device + std::to_string(i), querySyncObj2) == E_OK);
543 ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device + std::to_string(i), querySyncObj2) == E_OK);
544 }
545 QuerySyncObject querySyncObj1(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 8)}));
546 ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device + std::to_string(8), querySyncObj1) != E_OK);
547 }
548
549 /**
550 * @tc.name: subscribeManager004
551 * @tc.desc: test subscribe class subscribe remote function with multi device
552 * @tc.type: FUNC
553 * @tc.require: AR000FN6G9
554 * @tc.author: zhuwentao
555 */
556 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeManager004, TestSize.Level1)
557 {
558 SubscribeManager subManager;
559 std::string device = "device_";
560 std::vector<std::string> subscribeQueryId;
561 /**
562 * @tc.steps: step1. test mutil device limit 32 devices in remote map and check each device has one subscribe
563 */
564 LOGI("============step 1============");
565 QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 1)}));
566 for (int i = 0; i < 32; i++) {
567 ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device + std::to_string(i), querySyncObj) == E_OK);
568 ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(device + std::to_string(i), querySyncObj) == E_OK);
569 }
570 ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device + std::to_string(33), querySyncObj) != E_OK);
571 for (int i = 0; i < 32; i++) {
572 subManager.GetRemoteSubscribeQueryIds(device + std::to_string(i), subscribeQueryId);
573 ASSERT_TRUE(subscribeQueryId.size() == 1);
574 subscribeQueryId.clear();
575 }
576 /**
577 * @tc.steps: step2. clear remote subscribe query map and check each device has no subscribe
578 */
579 LOGI("============step 2============");
580 for (int i = 0; i < 32; i++) {
581 subManager.ClearRemoteSubscribeQuery(device + std::to_string(i));
582 subManager.GetRemoteSubscribeQueryIds(device + std::to_string(i), subscribeQueryId);
583 ASSERT_TRUE(subscribeQueryId.size() == 0);
584 subscribeQueryId.clear();
585 }
586 subManager.ClearRemoteSubscribeQuery(device);
587 /**
588 * @tc.steps: step3. test mutil device limit 8 queries in db and check each device has one subscribe
589 */
590 LOGI("============step 3============");
591 for (int i = 0; i < 8; i++) {
592 QuerySyncObject querySyncObj2(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
593 ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device + std::to_string(i), querySyncObj2) == E_OK);
594 ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(device + std::to_string(i), querySyncObj2) == E_OK);
595 }
596 QuerySyncObject querySyncObj1(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 8)}));
597 ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device + std::to_string(8), querySyncObj1) != E_OK);
598 }
599
600 /**
601 * @tc.name: subscribeManager005
602 * @tc.desc: test subscribe class subscribe remote function with put into unfinished map
603 * @tc.type: FUNC
604 * @tc.require: AR000FN6G9
605 * @tc.author: zhuwentao
606 */
607 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeManager005, TestSize.Level1)
608 {
609 SubscribeManager subManager;
610 std::vector<QuerySyncObject> subscribeQueries;
611 std::map<std::string, QuerySyncObject> queryMap;
612 std::vector<std::string> deviceAQueies;
613 std::vector<std::string> deviceBQueies;
614 QuerySyncObject queryCommonObj(Query::Select().PrefixKey({'a'}));
615 /**
616 * @tc.steps: step1. test one devices has 4 subscribes and another has 2 in local map, put into unfinished map
617 */
618 LOGI("============step 1============");
619 InitLocalSubscribeMap(queryCommonObj, queryMap, deviceAQueies, deviceBQueies, subManager);
620 /**
621 * @tc.steps: step2. check all device unFinished subscribe queries and put into unfinished map
622 */
623 LOGI("============step 2============");
624 subManager.GetLocalSubscribeQueries(DEVICE_A, subscribeQueries);
625 ASSERT_TRUE(subscribeQueries.size() == 4);
626 subManager.PutLocalUnFinishedSubQueries(DEVICE_A, subscribeQueries);
627 subscribeQueries.clear();
628 subManager.GetLocalSubscribeQueries(DEVICE_B, subscribeQueries);
629 ASSERT_TRUE(subscribeQueries.size() == 2);
630 subManager.PutLocalUnFinishedSubQueries(DEVICE_B, subscribeQueries);
631 subscribeQueries.clear();
632 /**
633 * @tc.steps: step3. get all device unFinished subscribe queries and check
634 */
635 LOGI("============step 3============");
636 CheckUnFinishedMap(4, 2, deviceAQueies, deviceBQueies, subManager);
637 /**
638 * @tc.steps: step4. active some subscribe queries
639 */
640 LOGI("============step 4============");
641 subManager.ActiveLocalSubscribeQuery(DEVICE_A, queryCommonObj);
642 subManager.ActiveLocalSubscribeQuery(DEVICE_A, queryMap[deviceAQueies[3]]);
643 subManager.ActiveLocalSubscribeQuery(DEVICE_B, queryMap[deviceBQueies[1]]);
644 deviceAQueies.erase(deviceAQueies.begin() + 3);
645 deviceAQueies.erase(deviceAQueies.begin());
646 queryMap.erase(queryMap[deviceBQueies[1]].GetIdentify());
647 deviceBQueies.erase(deviceBQueies.begin() + 1);
648 /**
649 * @tc.steps: step5. get all device unFinished subscribe queries and check
650 */
651 LOGI("============step 5============");
652 CheckUnFinishedMap(2, 1, deviceAQueies, deviceBQueies, subManager);
653 /**
654 * @tc.steps: step6. remove left subscribe queries
655 */
656 LOGI("============step 6============");
657 for (int i = 0; i < 2; i++) {
658 QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
659 subManager.RemoveLocalSubscribeQuery(DEVICE_A, querySyncObj);
660 }
661 subManager.RemoveLocalSubscribeQuery(DEVICE_A, queryCommonObj);
662 subManager.RemoveLocalSubscribeQuery(DEVICE_B, queryCommonObj);
663 /**
664 * @tc.steps: step7. get all device unFinished subscribe queries and check
665 */
666 LOGI("============step 7============");
667 CheckUnFinishedMap(0, 0, deviceAQueies, deviceBQueies, subManager);
668 }
669
670 /**
671 * @tc.name: subscribeManager006
672 * @tc.desc: test exception branch of subscribe manager
673 * @tc.type: FUNC
674 * @tc.require:
675 * @tc.author: zhangshijie
676 */
677 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeManager006, TestSize.Level1)
678 {
679 /**
680 * @tc.steps: step1. active a query sync object which is not in local subscribe map
681 * @tc.expected:step1 return -E_INTERNAL_ERROR
682 */
683 SubscribeManager subManager;
684 QuerySyncObject queryCommonObj(Query::Select().PrefixKey({'a'}));
685 EXPECT_EQ(subManager.ActiveLocalSubscribeQuery(DEVICE_A, queryCommonObj), -E_INTERNAL_ERROR);
686 subManager.DeleteLocalSubscribeQuery(DEVICE_A, queryCommonObj);
687 subManager.RemoveLocalSubscribeQuery(DEVICE_A, queryCommonObj);
688 std::vector<QuerySyncObject> subscribeQueries;
689 subManager.PutLocalUnFinishedSubQueries(DEVICE_A, subscribeQueries);
690 std::map<std::string, std::vector<QuerySyncObject>> allSyncQueries;
691 subManager.GetAllUnFinishSubQueries(allSyncQueries);
692
693 /**
694 * @tc.steps: step2. call IsLastRemoteContainSubscribe with a device not in remote subscribe map
695 * @tc.expected: step2 return false
696 */
697 std::string queryId = "queryId";
698 EXPECT_EQ(subManager.IsLastRemoteContainSubscribe(DEVICE_A, queryId), false);
699
700 /**
701 * @tc.steps: step3. active local subscribe with a device which is not in local subscribe map and
702 * a query sync object which is in local subscribe map
703 * @tc.expected: step3 return -E_INTERNAL_ERROR
704 */
705 std::vector<std::string> deviceAQueies;
706 std::vector<std::string> deviceBQueies;
707 std::map<std::string, QuerySyncObject> queryMap;
708 InitLocalSubscribeMap(queryCommonObj, queryMap, deviceAQueies, deviceBQueies, subManager);
709 ASSERT_TRUE(queryMap.size() > 0);
710 std::string devNotExists = "device_not_exists";
711 EXPECT_EQ(subManager.ActiveLocalSubscribeQuery(devNotExists, queryMap.begin()->second), -E_INTERNAL_ERROR);
712 QuerySyncObject queryObj(Query::Select().PrefixKey({'b'}));
713 EXPECT_EQ(subManager.ReserveLocalSubscribeQuery("test_dev", queryObj), E_OK);
714 subManager.DeleteLocalSubscribeQuery(DEVICE_A, queryObj);
715
716 EXPECT_EQ(subManager.ActiveLocalSubscribeQuery(DEVICE_B, queryObj), -E_INTERNAL_ERROR);
717 subManager.DeleteLocalSubscribeQuery(DEVICE_A, queryCommonObj);
718 ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(DEVICE_A, queryCommonObj) == E_OK);
719 ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(DEVICE_A, queryCommonObj) == E_OK);
720 EXPECT_EQ(subManager.IsLastRemoteContainSubscribe(DEVICE_A, queryId), false);
721 deviceAQueies.push_back(DEVICE_A);
722 EXPECT_EQ(subManager.LocalSubscribeLimitCheck(deviceAQueies, queryCommonObj), E_OK);
723
724 /**
725 * @tc.steps: step4. add MAX_DEVICES_NUM device, then call LocalSubscribeLimitCheck
726 * @tc.expected: step4 return -E_MAX_LIMITS
727 */
728 for (size_t i = 0 ; i < MAX_DEVICES_NUM; i++) {
729 deviceAQueies.push_back("device_" + std::to_string(i));
730 }
731 EXPECT_EQ(subManager.LocalSubscribeLimitCheck(deviceAQueies, queryCommonObj), -E_MAX_LIMITS);
732 }
733
734 /**
735 * @tc.name: subscribeSync001
736 * @tc.desc: test subscribe normal sync
737 * @tc.type: FUNC
738 * @tc.require: AR000FN6G9
739 * @tc.author: zhuwentao
740 */
741 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync001, TestSize.Level1)
742 {
743 /**
744 * @tc.steps: step1. InitSchemaDb
745 */
746 LOGI("============step 1============");
747 InitSubSchemaDb();
748 DBStatus status = OK;
749 std::vector<std::string> devices;
750 devices.push_back(g_deviceB->GetDeviceId());
751 Query query = Query::Select().EqualTo("$.field_name1", 1);
752 QuerySyncObject querySyncObj(query);
753
754 /**
755 * @tc.steps: step2. deviceB subscribe query to deviceA
756 */
757 LOGI("============step 2============");
758 g_deviceB->Subscribe(querySyncObj, true, 1);
759
760 /**
761 * @tc.steps: step3. deviceA put {key1, SCHEMA_VALUE1} and wait 1s
762 */
763 LOGI("============step 3============");
764 Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
765 Key key = {'1'};
766 status = g_schemaKvDelegatePtr->Put(key, value);
767 EXPECT_EQ(status, OK);
768 WaitUntilNotify(*g_deviceB);
769 /**
770 * @tc.steps: step4. deviceB has {key11, SCHEMA_VALUE1}
771 */
772 LOGI("============step 4============");
773 VirtualDataItem item;
774 g_deviceB->GetData(key, item);
775 EXPECT_TRUE(item.value == value);
776
777 /**
778 * @tc.steps: step5. deviceB unsubscribe query to deviceA
779 */
780 g_deviceB->UnSubscribe(querySyncObj, true, 2);
781
782 /**
783 * @tc.steps: step5. deviceA put {key2, SCHEMA_VALUE1} and wait 1s
784 */
785 LOGI("============step 5============");
786 Value value2(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
787 Key key2 = {'2'};
788 status = g_schemaKvDelegatePtr->Put(key2, value2);
789 EXPECT_EQ(status, OK);
790 WaitUntilNotify(*g_deviceB);
791 /**
792 * @tc.steps: step6. deviceB don't has {key2, SCHEMA_VALUE1}
793 */
794 LOGI("============step 6============");
795 VirtualDataItem item2;
796 EXPECT_TRUE(g_deviceB->GetData(key2, item2) != E_OK);
797 }
798
799 /**
800 * @tc.name: subscribeSync002
801 * @tc.desc: test subscribe sync over 32 devices,limit,orderBy
802 * @tc.type: FUNC
803 * @tc.require: AR000FN6G9
804 * @tc.author: zhuwentao
805 */
806 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync002, TestSize.Level1)
807 {
808 /**
809 * @tc.steps: step1. InitSchemaDb
810 */
811 LOGI("============step 1============");
812 InitSubSchemaDb();
813 std::vector<std::string> devices;
814 std::string device = "device_";
815 Query query = Query::Select().EqualTo("$.field_name1", 1);
816
817 /**
818 * @tc.steps: step2. deviceA subscribe query to 33 devices, and return overlimit
819 */
820 LOGI("============step 2============");
821 for (int i = 0; i < 33; i++) {
822 devices.push_back(device + std::to_string(i));
823 }
824 EXPECT_TRUE(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query, true) == OVER_MAX_LIMITS);
825
826 /**
827 * @tc.steps: step3. deviceA subscribe query with limit
828 */
829 LOGI("============step 3============");
830 devices.clear();
831 devices.push_back("device_B");
832 Query query2 = Query::Select().EqualTo("$.field_name1", 1).Limit(20, 0);
833 EXPECT_TRUE(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query2, true) == NOT_SUPPORT);
834
835 /**
836 * @tc.steps: step4. deviceA subscribe query with orderBy
837 */
838 LOGI("============step 4============");
839 Query query3 = Query::Select().EqualTo("$.field_name1", 1).OrderBy("$.field_name7");
840 EXPECT_TRUE(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query3, true) == NOT_SUPPORT);
841 }
842
843 /**
844 * @tc.name: subscribeSync003
845 * @tc.desc: test subscribe sync with inkeys query
846 * @tc.type: FUNC
847 * @tc.require: AR000GOHO7
848 * @tc.author: lidongwei
849 */
850 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync003, TestSize.Level1)
851 {
852 /**
853 * @tc.steps: step1. InitSchemaDb
854 */
855 LOGI("============step 1============");
856 InitSubSchemaDb();
857 std::vector<std::string> devices;
858 devices.push_back(g_deviceB->GetDeviceId());
859 g_deviceB->Online();
860
861 /**
862 * @tc.steps: step2. deviceB subscribe inkeys(k2k4) query to deviceA
863 */
864 LOGI("============step 2============");
865 Query query = Query::Select().InKeys({KEY_2, KEY_4});
866 g_deviceB->Subscribe(QuerySyncObject(query), true, 1);
867
868 /**
869 * @tc.steps: step3. deviceA put k1-k5 and wait
870 */
871 LOGI("============step 3============");
872 EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
873 {KEY_1, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
874 {KEY_2, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
875 {KEY_3, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
876 {KEY_4, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
877 {KEY_5, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
878 }));
879 WaitUntilNotify(*g_deviceB);
880
881 /**
882 * @tc.steps: step4. deviceB has k2k4, has no k1k3k5
883 */
884 LOGI("============step 4============");
885 VirtualDataItem item;
886 EXPECT_EQ(g_deviceB->GetData(KEY_2, item), E_OK);
887 EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
888 EXPECT_EQ(g_deviceB->GetData(KEY_4, item), E_OK);
889 EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
890 EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
891 EXPECT_EQ(g_deviceB->GetData(KEY_3, item), -E_NOT_FOUND);
892 EXPECT_EQ(g_deviceB->GetData(KEY_5, item), -E_NOT_FOUND);
893 }
894
895 /**
896 * @tc.name: subscribeSync004
897 * @tc.desc: test subscribe sync with inkeys query
898 * @tc.type: FUNC
899 * @tc.require: AR000GOHO7
900 * @tc.author: lidongwei
901 */
902 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync004, TestSize.Level1)
903 {
904 /**
905 * @tc.steps: step1. InitSchemaDb
906 */
907 LOGI("============step 1============");
908 InitSubSchemaDb();
909 std::vector<std::string> devices;
910 devices.push_back(g_deviceB->GetDeviceId());
911
912 /**
913 * @tc.steps: step2. deviceB subscribe inkeys(k3k5) and equal to query to deviceA
914 */
915 LOGI("============step 2============");
916 Query query = Query::Select().InKeys({KEY_3, KEY_5}).EqualTo("$.field_name3", 100); // 100 for test.
917 g_deviceB->Subscribe(QuerySyncObject(query), true, 2);
918
919 /**
920 * @tc.steps: step3. deviceA put k1v2,k3v2,k5v1 and wait
921 */
922 LOGI("============step 3============");
923 EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
924 {KEY_1, Value(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end())},
925 {KEY_3, Value(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end())},
926 {KEY_5, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
927 }));
928 WaitUntilNotify(*g_deviceB);
929
930 /**
931 * @tc.steps: step4. deviceB has k3, has no k1k5
932 */
933 LOGI("============step 4============");
934 VirtualDataItem item;
935 EXPECT_EQ(g_deviceB->GetData(KEY_3, item), E_OK);
936 EXPECT_EQ(item.value, Value(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end()));
937 EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
938 EXPECT_EQ(g_deviceB->GetData(KEY_5, item), -E_NOT_FOUND);
939 }
940
941 /**
942 * @tc.name: subscribeSync005
943 * @tc.desc: test subscribe sync with inkeys query
944 * @tc.type: FUNC
945 * @tc.require: AR000GOHO7
946 * @tc.author: lidongwei
947 */
948 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync005, TestSize.Level1)
949 {
950 /**
951 * @tc.steps: step1. InitSchemaDb
952 */
953 LOGI("============step 1============");
954 InitSubSchemaDb();
955 std::vector<std::string> devices;
956 devices.push_back(g_deviceB->GetDeviceId());
957
958 /**
959 * @tc.steps: step2. deviceB subscribe inkeys(k1, key6) and prefix key "k" query to deviceA
960 */
961 LOGI("============step 2============");
962 Key key6 { 'k', '6' };
963 Query query = Query::Select().InKeys({KEY_1, key6}).PrefixKey({ 'k' });
964 g_deviceB->Subscribe(QuerySyncObject(query), true, 3);
965
966 /**
967 * @tc.steps: step3. deviceA put k1,key6 and wait
968 */
969 LOGI("============step 3============");
970 EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
971 {key6, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
972 {KEY_1, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
973 }));
974 WaitUntilNotify(*g_deviceB);
975
976 /**
977 * @tc.steps: step4. deviceB has key6, has no k1
978 */
979 LOGI("============step 4============");
980 VirtualDataItem item;
981 EXPECT_EQ(g_deviceB->GetData(key6, item), E_OK);
982 EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
983 EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
984 }
985
986
987 /**
988 * @tc.name: subscribeSync006
989 * @tc.desc: test one device unsubscribe no effect other device
990 * @tc.type: FUNC
991 * @tc.require: AR000GOHO7
992 * @tc.author: zhangqiquan
993 */
994 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync006, TestSize.Level1)
995 {
996 /**
997 * @tc.steps: step1. InitSchemaDb
998 */
999 LOGI("============step 1============");
1000 InitSubSchemaDb();
1001 std::vector<std::string> devices;
1002 devices.push_back(g_deviceB->GetDeviceId());
1003 devices.push_back(g_deviceC->GetDeviceId());
1004
1005 /**
1006 * @tc.steps: step2. deviceB unsubscribe inkeys(k1, key6) and prefix key "k" query to deviceA
1007 */
1008 LOGI("============step 2============");
1009 Key key6 { 'k', '6' };
1010 Query query = Query::Select().InKeys({KEY_1, key6}).PrefixKey({ 'k' });
1011 g_deviceB->Online();
1012 g_deviceC->Online();
1013 g_deviceB->Subscribe(QuerySyncObject(query), true, 3);
1014 g_deviceC->Subscribe(QuerySyncObject(query), true, 3);
1015
1016 /**
1017 * @tc.steps: step3. deviceC unsubscribe
1018 */
1019 LOGI("============step 3============");
1020 g_deviceC->UnSubscribe(QuerySyncObject(query), true, 3);
1021
1022 /**
1023 * @tc.steps: step4. deviceA put k1,key6 and wait
1024 */
1025 LOGI("============step 4============");
1026 EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
1027 {key6, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
1028 {KEY_1, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
1029 }));
1030 WaitUntilNotify(*g_deviceB);
1031
1032 /**
1033 * @tc.steps: step5. deviceB has key6, has no k1
1034 */
1035 LOGI("============step 5============");
1036 VirtualDataItem item;
1037 EXPECT_EQ(g_deviceB->GetData(key6, item), E_OK);
1038 EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
1039 EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
1040 }
1041
1042 /**
1043 * @tc.name: subscribeSync007
1044 * @tc.desc: test subscribe query with order by write time
1045 * @tc.type: FUNC
1046 * @tc.require: AR000H5VLO
1047 * @tc.author: zhuwentao
1048 */
1049 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync007, TestSize.Level1)
1050 {
1051 /**
1052 * @tc.steps: step1. InitSchemaDb
1053 */
1054 LOGI("============step 1============");
1055 InitSubSchemaDb();
1056 std::vector<std::string> devices = {"DEVICE_B"};
1057
1058 /**
1059 * @tc.steps: step2. deviceA subscribe query with order by write time
1060 * * @tc.expected: step2. interface return not support
1061 */
1062 Query query = Query::Select().EqualTo("$.field_name1", 1).OrderByWriteTime(false);
1063 EXPECT_TRUE(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query, true) == NOT_SUPPORT);
1064 EXPECT_TRUE(g_schemaKvDelegatePtr->UnSubscribeRemoteQuery(devices, nullptr, query, true) == NOT_SUPPORT);
1065 }
1066
1067 /**
1068 * @tc.name: SubscribeSync008
1069 * @tc.desc: test subscribe with reopen db
1070 * @tc.type: FUNC
1071 * @tc.require: AR000HGD0B
1072 * @tc.author: zhangqiquan
1073 */
1074 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync008, TestSize.Level1)
1075 {
1076 /**
1077 * @tc.steps: step1. InitSchemaDb
1078 */
1079 std::shared_ptr<DBInfoHandleTest> handleTest = std::make_shared<DBInfoHandleTest>();
1080 RuntimeConfig::SetDBInfoHandle(handleTest);
1081
1082 LOGI("============step 1============");
1083 InitSubSchemaDb();
1084 std::vector<std::string> devices;
1085 devices.push_back(g_deviceB->GetDeviceId());
1086
1087 /**
1088 * @tc.steps: step2. deviceB subscribe query to deviceA
1089 */
1090 LOGI("============step 2============");
1091 Key key6 { 'k', '6' };
1092 Query query = Query::Select();
1093 g_deviceB->Subscribe(QuerySyncObject(query), true, 3);
1094
1095 /**
1096 * @tc.steps: step3. deviceA put k1,key6 and wait
1097 */
1098 LOGI("============step 3============");
1099 EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
1100 {key6, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
1101 }));
1102 EXPECT_EQ(g_schemaMgr.CloseKvStore(g_schemaKvDelegatePtr), OK);
1103 g_schemaKvDelegatePtr = nullptr;
1104 InitSubSchemaDb();
1105 g_deviceB->Online();
1106 WaitUntilNotify(*g_deviceB);
1107
1108 /**
1109 * @tc.steps: step4. deviceB has key6
1110 */
1111 LOGI("============step 4============");
1112 VirtualDataItem item;
1113 if (g_deviceB->GetData(key6, item) == E_OK) {
1114 EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
1115 }
1116 RuntimeConfig::SetDBInfoHandle(nullptr);
1117 }
1118
1119 namespace {
CreateKvVirtualDevice(const std::string & deviceName)1120 KvVirtualDevice *CreateKvVirtualDevice(const std::string &deviceName)
1121 {
1122 KvVirtualDevice *device = nullptr;
1123 do {
1124 if (g_communicatorAggregator == nullptr) {
1125 break;
1126 }
1127 device = new (std::nothrow) KvVirtualDevice(deviceName);
1128 if (device == nullptr) {
1129 break;
1130 }
1131 auto interface = new (std::nothrow) VirtualSingleVerSyncDBInterface();
1132 if (interface == nullptr) {
1133 delete device;
1134 device = nullptr;
1135 break;
1136 }
1137 EXPECT_EQ(device->Initialize(g_communicatorAggregator, interface), E_OK);
1138 } while (false);
1139 return device;
1140 }
1141 }
1142
1143 /**
1144 * @tc.name: SubscribeSync009
1145 * @tc.desc: test subscribe query with 33 device
1146 * @tc.type: FUNC
1147 * @tc.require: AR000H5VLO
1148 * @tc.author: zhangqiquan
1149 */
1150 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync009, TestSize.Level3)
1151 {
1152 /**
1153 * @tc.steps: step1. InitSchemaDb
1154 */
1155 LOGI("============step 1============");
1156 InitSubSchemaDb();
1157 const int maxDeviceCount = 32;
1158 std::vector<KvVirtualDevice *> devices;
1159 for (int i = 0; i < maxDeviceCount; ++i) {
1160 std::string deviceName = "D_" + std::to_string(i);
1161 auto device = CreateKvVirtualDevice(deviceName);
1162 EXPECT_NE(device, nullptr);
1163 if (device == nullptr) {
1164 continue;
1165 }
1166 devices.push_back(device);
1167 }
1168
1169 /**
1170 * @tc.steps: step2. 33 device subscribe
1171 */
1172 LOGI("============step 2============");
1173 Query query = Query::Select();
1174 for (const auto &dev: devices) {
1175 dev->Online();
1176 dev->Subscribe(QuerySyncObject(query), true, 1); // sync id is 1
1177 }
1178 g_deviceB->Subscribe(QuerySyncObject(query), true, 1); // sync id is 1
1179 /**
1180 * @tc.steps: step3. 32 unsubscribe
1181 */
1182 LOGI("============step 3============");
__anonf239d4230502(std::map<std::string, int> res) 1183 SyncOperation::UserCallback callback = [](std::map<std::string, int> res) {
1184 ASSERT_EQ(res.size(), 1u);
1185 EXPECT_EQ(res["real_device"], SyncOperation::OP_FINISHED_ALL);
1186 };
1187 for (const auto &dev: devices) {
1188 dev->UnSubscribe(QuerySyncObject(query), true, 1, callback); // sync id is 1
1189 delete dev;
1190 }
1191 }
1192
1193 /*
1194 * @tc.name: SubscribeSync010
1195 * @tc.desc: test subscribe query cache
1196 * @tc.type: FUNC
1197 * @tc.require: AR000H5VLO
1198 * @tc.author: zhangqiquan
1199 */
1200 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync010, TestSize.Level1)
1201 {
1202 SubscribeRecorder recorder;
1203 DBInfo dbInfo = {
1204 USER_ID,
1205 APP_ID,
1206 STORE_ID_1,
1207 false,
1208 true
1209 };
1210 Query query = Query::Select();
1211 QuerySyncObject querySyncObject(query);
1212 /**
1213 * @tc.steps: step1. Insert one record twice and remove
1214 */
1215 recorder.RecordSubscribe(dbInfo, DEVICE_A, querySyncObject);
1216 recorder.RecordSubscribe(dbInfo, DEVICE_A, querySyncObject);
1217 recorder.RemoveRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1218 std::map<std::string, std::vector<QuerySyncObject>> subscribeQuery;
1219 recorder.GetSubscribeQuery(dbInfo, subscribeQuery);
1220 for (const auto &entry: subscribeQuery) {
1221 EXPECT_EQ(entry.second.size(), 0u);
1222 }
1223 /**
1224 * @tc.steps: step2. Remove no exist data
1225 */
1226 recorder.RemoveRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1227 recorder.GetSubscribeQuery(dbInfo, subscribeQuery);
1228 for (const auto &entry: subscribeQuery) {
1229 EXPECT_EQ(entry.second.size(), 0u);
1230 }
1231 /**
1232 * @tc.steps: step3. insert two data and remove one data
1233 */
1234 recorder.RecordSubscribe(dbInfo, DEVICE_A, querySyncObject);
1235 Query query2 = Query::Select().EqualTo("test", "test");
1236 recorder.RecordSubscribe(dbInfo, DEVICE_A, QuerySyncObject(query2));
1237 recorder.RemoveRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1238 recorder.GetSubscribeQuery(dbInfo, subscribeQuery);
1239 for (const auto &entry: subscribeQuery) {
1240 EXPECT_EQ(entry.second.size(), 1u);
1241 }
1242 /**
1243 * @tc.steps: step4. remove no exist data
1244 */
1245 dbInfo.storeId = STORE_ID_2;
1246 recorder.RemoveRemoteSubscribe(dbInfo, DEVICE_A);
1247 dbInfo.storeId = STORE_ID_1;
1248 recorder.GetSubscribeQuery(dbInfo, subscribeQuery);
1249 for (const auto &entry: subscribeQuery) {
1250 EXPECT_EQ(entry.second.size(), 1u);
1251 }
1252 }
1253
1254 /*
1255 * @tc.name: SubscribeSync011
1256 * @tc.desc: test subscribe query cache
1257 * @tc.type: FUNC
1258 * @tc.require: AR000H5VLO
1259 * @tc.author: zhangqiquan
1260 */
1261 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync011, TestSize.Level1)
1262 {
1263 SubscribeRecorder recorder;
1264 DBInfo dbInfo = {
1265 USER_ID,
1266 APP_ID,
1267 STORE_ID_1,
1268 false,
1269 true
1270 };
1271 /**
1272 * @tc.steps: step1. Insert 2 record in db1 and 1 record in db2
1273 */
1274 Query query = Query::Select();
1275 QuerySyncObject querySyncObject(query);
1276 recorder.RecordSubscribe(dbInfo, DEVICE_A, querySyncObject);
1277 recorder.RecordSubscribe(dbInfo, DEVICE_B, querySyncObject);
1278 DBInfo dbInfo2 = dbInfo;
1279 dbInfo2.storeId = STORE_ID_2;
1280 recorder.RecordSubscribe(dbInfo2, DEVICE_B, querySyncObject);
1281 /**
1282 * @tc.steps: step2. Insert 2 record in db1
1283 */
1284 recorder.RemoveRemoteSubscribe(dbInfo);
1285 std::map<std::string, std::vector<QuerySyncObject>> subscribeQuery;
1286 recorder.GetSubscribeQuery(dbInfo, subscribeQuery);
1287 EXPECT_EQ(subscribeQuery.size(), 0u);
1288 recorder.GetSubscribeQuery(dbInfo2, subscribeQuery);
1289 EXPECT_EQ(subscribeQuery.size(), 1u);
1290 /**
1291 * @tc.steps: step3. Insert 1 record in db2
1292 */
1293 recorder.RemoveRemoteSubscribe(dbInfo2);
1294 subscribeQuery.clear();
1295 recorder.GetSubscribeQuery(dbInfo2, subscribeQuery);
1296 EXPECT_EQ(subscribeQuery.size(), 0u);
1297 }
1298
1299 /**
1300 * @tc.name: subscribeSync012
1301 * @tc.desc: test one device unsubscribe no effect other device
1302 * @tc.type: FUNC
1303 * @tc.require: AR000GOHO7
1304 * @tc.author: zhangqiquan
1305 */
1306 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync012, TestSize.Level1)
1307 {
1308 /**
1309 * @tc.steps: step1. InitSchemaDb
1310 */
1311 LOGI("============step 1============");
1312 InitSubSchemaDb();
1313 std::vector<std::string> devices;
1314 devices.push_back(g_deviceB->GetDeviceId());
1315 devices.push_back(g_deviceC->GetDeviceId());
1316
1317 /**
1318 * @tc.steps: step2. deviceB unsubscribe inkeys(k1, key6) and prefix key "k" query to deviceA
1319 */
1320 LOGI("============step 2============");
1321 Key key6 { 'k', '6' };
1322 Query query = Query::Select();
1323 g_deviceB->Online();
1324 g_deviceC->Online();
1325 g_deviceB->Subscribe(QuerySyncObject(query), true, 3);
1326 g_deviceC->Subscribe(QuerySyncObject(query), true, 3);
1327
1328 /**
1329 * @tc.steps: step3. deviceC unsubscribe
1330 */
1331 LOGI("============step 3============");
1332 g_deviceC->Offline();
1333
1334 /**
1335 * @tc.steps: step4. deviceA put k1,key6 and wait
1336 */
1337 LOGI("============step 4============");
1338 const uint8_t putItemCount = 10u;
1339 std::vector<Key> dataKeys;
1340 for (uint8_t i = 0u; i < putItemCount; ++i) {
1341 Key key = { i };
1342 dataKeys.push_back(key);
1343 EXPECT_EQ(g_schemaKvDelegatePtr->Put(key, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())), OK);
1344 WaitUntilNotify(*g_deviceB);
1345 }
1346 /**
1347 * @tc.steps: step5. deviceB has key6, has no k1
1348 */
1349 LOGI("============step 5============");
1350 for (const auto &key: dataKeys) {
1351 VirtualDataItem item;
1352 EXPECT_EQ(g_deviceB->GetData(key, item), E_OK);
1353 EXPECT_EQ(g_deviceC->GetData(key, item), -E_NOT_FOUND);
1354 }
1355 }
1356
1357 /*
1358 * @tc.name: SubscribeSync013
1359 * @tc.desc: test subscribe query cache with remote support false
1360 * @tc.type: FUNC
1361 * @tc.require: AR000H5VLO
1362 * @tc.author: zhangqiquan
1363 */
1364 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync013, TestSize.Level1)
1365 {
1366 std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1367 RuntimeConfig::SetDBInfoHandle(handle);
1368 handle->SetLocalIsSupport(true);
1369 DBInfo dbInfo = {
1370 USER_ID,
1371 APP_ID,
1372 STORE_ID_1,
1373 false,
1374 true
1375 };
1376 RuntimeContext::GetInstance()->SetRemoteOptimizeCommunication(DEVICE_A, false);
1377 Query query = Query::Select();
1378 QuerySyncObject querySyncObject(query);
1379 /**
1380 * @tc.steps: step1. Insert one record
1381 */
1382 RuntimeContext::GetInstance()->RecordRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1383 std::map<std::string, std::vector<QuerySyncObject>> subscribeQuery;
1384 RuntimeContext::GetInstance()->GetSubscribeQuery(dbInfo, subscribeQuery);
1385 EXPECT_EQ(subscribeQuery.size(), 1u);
1386 /**
1387 * @tc.steps: step2. Remove one record
1388 */
1389 RuntimeContext::GetInstance()->RemoveRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1390 RuntimeContext::GetInstance()->GetSubscribeQuery(dbInfo, subscribeQuery);
1391 EXPECT_EQ(subscribeQuery[DEVICE_A].size(), 0u);
1392 /**
1393 * @tc.steps: step3. Record again and remove by dbInfo and device
1394 */
1395 RuntimeContext::GetInstance()->RecordRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1396 RuntimeContext::GetInstance()->RemoveRemoteSubscribe(dbInfo, DEVICE_A);
1397 RuntimeContext::GetInstance()->GetSubscribeQuery(dbInfo, subscribeQuery);
1398 EXPECT_EQ(subscribeQuery.size(), 0u);
1399 /**
1400 * @tc.steps: step4. Record again and remove by device
1401 */
1402 RuntimeContext::GetInstance()->RecordRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1403 RuntimeContext::GetInstance()->RemoveRemoteSubscribe(DEVICE_A);
1404 RuntimeContext::GetInstance()->GetSubscribeQuery(dbInfo, subscribeQuery);
1405 EXPECT_EQ(subscribeQuery.size(), 0u);
1406 /**
1407 * @tc.steps: step5. Record again and remove by dbInfo
1408 */
1409 RuntimeContext::GetInstance()->RecordRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1410 RuntimeContext::GetInstance()->RemoveRemoteSubscribe(dbInfo);
1411 RuntimeContext::GetInstance()->GetSubscribeQuery(dbInfo, subscribeQuery);
1412 EXPECT_EQ(subscribeQuery.size(), 0u);
1413 RuntimeConfig::SetDBInfoHandle(nullptr);
1414 }
1415 /**
1416 * @tc.name: SubscribeSync014
1417 * @tc.desc: test device subscribe with put a lot of times
1418 * @tc.type: FUNC
1419 * @tc.require: AR000GOHO7
1420 * @tc.author: zhangqiquan
1421 */
1422 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync014, TestSize.Level3)
1423 {
1424 /**
1425 * @tc.steps: step1. InitSchemaDb
1426 */
1427 LOGI("============step 1============");
1428 InitSubSchemaDb();
1429 std::vector<std::string> devices;
1430 devices.push_back(g_deviceB->GetDeviceId());
1431 /**
1432 * @tc.steps: step2. deviceB unsubscribe inkeys(k1, key6) and prefix key "k" query to deviceA
1433 */
1434 LOGI("============step 2============");
1435 Key key6 { 'k', '6' };
1436 Query query = Query::Select();
1437 g_deviceB->Online();
1438 g_deviceB->Subscribe(QuerySyncObject(query), true, 1);
1439 /**
1440 * @tc.steps: step3. deviceA put a lot of time
1441 * @tc.expected: step3 put performance was not effected by subscribe
1442 */
1443 LOGI("============step 4============");
1444 std::vector<Key> dataKeys;
1445 const uint64_t PUT_LIMIT_30S = 30 * 1000000; // 30s = 30 * 1000000us
1446 LOGD("BEGIN PUT");
1447 for (uint8_t i = 0u; i < 10u; ++i) { // loop 10 times
1448 Key key = { i };
1449 dataKeys.push_back(key);
1450 uint64_t curTime = 0;
1451 uint64_t lastTime = 0;
1452 EXPECT_EQ(OS::GetCurrentSysTimeInMicrosecond(curTime), E_OK);
1453 lastTime = curTime;
1454 EXPECT_EQ(g_schemaKvDelegatePtr->Put(key, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())), OK);
1455 EXPECT_EQ(OS::GetCurrentSysTimeInMicrosecond(curTime), E_OK);
1456 EXPECT_LE(curTime - lastTime, PUT_LIMIT_30S);
1457 }
1458 LOGD("END PUT");
1459 }
1460
1461 /**
1462 * @tc.name: subscribeSync015
1463 * @tc.desc: test subscribe query with range
1464 * @tc.type: FUNC
1465 * @tc.require: DTS2023112110763
1466 * @tc.author: mazhao
1467 */
1468 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync015, TestSize.Level1)
1469 {
1470 /**
1471 * @tc.steps: step1. InitSchemaDb
1472 */
1473 LOGI("============step 1============");
1474 InitSubSchemaDb();
1475 std::vector<std::string> devices = {"DEVICE_B"};
1476
1477 /**
1478 * @tc.steps: step2. deviceA subscribe query by Range.
1479 * * @tc.expected: step2. interface return not support
1480 */
1481 Query query = Query::Select().Range({}, {});
1482 EXPECT_EQ(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query, true), NOT_SUPPORT);
1483 EXPECT_EQ(g_schemaKvDelegatePtr->UnSubscribeRemoteQuery(devices, nullptr, query, true), NOT_SUPPORT);
1484 }
1485 } // namespace