1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <gtest/gtest.h>
17 #include <thread>
18 
19 #include "db_constant.h"
20 #include "distributeddb_data_generate_unit_test.h"
21 #include "distributeddb_tools_unit_test.h"
22 #include "kv_store_nb_delegate.h"
23 #include "kv_virtual_device.h"
24 #include "platform_specific.h"
25 #include "query.h"
26 #include "query_sync_object.h"
27 #include "runtime_config.h"
28 #include "single_ver_data_sync.h"
29 #include "single_ver_serialize_manager.h"
30 #include "subscribe_manager.h"
31 #include "subscribe_recorder.h"
32 #include "sync_types.h"
33 
34 using namespace testing::ext;
35 using namespace DistributedDB;
36 using namespace DistributedDBUnitTest;
37 using namespace std;
38 
39 namespace {
40     string g_testDir;
41     const string SCHEMA_STORE_ID = "kv_store_sync_schema_test";
42     const std::string DEVICE_A = "deviceA";
43     const std::string DEVICE_B = "deviceB";
44     const std::string DEVICE_C = "deviceC";
45 
46     KvStoreDelegateManager g_schemaMgr(SCHEMA_APP_ID, USER_ID);
47     KvStoreConfig g_config;
48     DistributedDBToolsUnitTest g_tool;
49     DBStatus g_schemaKvDelegateStatus = INVALID_ARGS;
50     KvStoreNbDelegate* g_schemaKvDelegatePtr = nullptr;
51     VirtualCommunicatorAggregator* g_communicatorAggregator = nullptr;
52     KvVirtualDevice* g_deviceB = nullptr;
53     KvVirtualDevice* g_deviceC = nullptr;
54 
55     // the type of g_kvDelegateCallback is function<void(DBStatus, KvStoreDelegate*)>
56     auto g_schemaKvDelegateCallback = bind(&DistributedDBToolsUnitTest::KvStoreNbDelegateCallback,
57         placeholders::_1, placeholders::_2, std::ref(g_schemaKvDelegateStatus), std::ref(g_schemaKvDelegatePtr));
58     const string SCHEMA_STRING =
59     "{\"SCHEMA_VERSION\":\"1.0\","
60     "\"SCHEMA_MODE\":\"STRICT\","
61     "\"SCHEMA_DEFINE\":{"
62     "\"field_name1\":\"BOOL\","
63     "\"field_name2\":\"BOOL\","
64     "\"field_name3\":\"INTEGER, NOT NULL\","
65     "\"field_name4\":\"LONG, DEFAULT 100\","
66     "\"field_name5\":\"DOUBLE, NOT NULL, DEFAULT 3.14\","
67     "\"field_name6\":\"STRING, NOT NULL, DEFAULT '3.1415'\","
68     "\"field_name7\":\"LONG, DEFAULT 100\","
69     "\"field_name8\":\"LONG, DEFAULT 100\","
70     "\"field_name9\":\"LONG, DEFAULT 100\","
71     "\"field_name10\":\"LONG, DEFAULT 100\""
72     "},"
73     "\"SCHEMA_INDEXES\":[\"$.field_name1\", \"$.field_name2\"]}";
74 
75     const std::string SCHEMA_VALUE1 =
76     "{\"field_name1\":true,"
77     "\"field_name2\":false,"
78     "\"field_name3\":10,"
79     "\"field_name4\":20,"
80     "\"field_name5\":3.14,"
81     "\"field_name6\":\"3.1415\","
82     "\"field_name7\":100,"
83     "\"field_name8\":100,"
84     "\"field_name9\":100,"
85     "\"field_name10\":100}";
86 
87     const std::string SCHEMA_VALUE2 =
88     "{\"field_name1\":false,"
89     "\"field_name2\":true,"
90     "\"field_name3\":100,"
91     "\"field_name4\":200,"
92     "\"field_name5\":3.14,"
93     "\"field_name6\":\"3.1415\","
94     "\"field_name7\":100,"
95     "\"field_name8\":100,"
96     "\"field_name9\":100,"
97     "\"field_name10\":100}";
98 
99 class DistributedDBSingleVerP2PSubscribeSyncTest : public testing::Test {
100 public:
101     static void SetUpTestCase(void);
102     static void TearDownTestCase(void);
103     void SetUp();
104     void TearDown();
105 protected:
106     static void WaitUntilNotify(KvVirtualDevice &virtualDevice);
107 };
108 
SetUpTestCase(void)109 void DistributedDBSingleVerP2PSubscribeSyncTest::SetUpTestCase(void)
110 {
111     /**
112      * @tc.setup: Init datadir and Virtual Communicator.
113      */
114     DistributedDBToolsUnitTest::TestDirInit(g_testDir);
115     g_config.dataDir = g_testDir;
116     g_schemaMgr.SetKvStoreConfig(g_config);
117 
118     string dir = g_testDir + "/single_ver";
119     DIR* dirTmp = opendir(dir.c_str());
120     if (dirTmp == nullptr) {
121         OS::MakeDBDirectory(dir);
122     } else {
123         closedir(dirTmp);
124     }
125 
126     g_communicatorAggregator = new (std::nothrow) VirtualCommunicatorAggregator();
127     ASSERT_TRUE(g_communicatorAggregator != nullptr);
128     RuntimeContext::GetInstance()->SetCommunicatorAggregator(g_communicatorAggregator);
129 }
130 
TearDownTestCase(void)131 void DistributedDBSingleVerP2PSubscribeSyncTest::TearDownTestCase(void)
132 {
133     /**
134      * @tc.teardown: Release virtual Communicator and clear data dir.
135      */
136     if (DistributedDBToolsUnitTest::RemoveTestDbFiles(g_testDir) != 0) {
137         LOGE("rm test db files error!");
138     }
139     RuntimeContext::GetInstance()->SetCommunicatorAggregator(nullptr);
140 }
141 
SetUp(void)142 void DistributedDBSingleVerP2PSubscribeSyncTest::SetUp(void)
143 {
144     DistributedDBToolsUnitTest::PrintTestCaseInfo();
145     /**
146      * @tc.setup: create virtual device B and get a KvStoreNbDelegate as deviceA
147      */
148     g_deviceB = new (std::nothrow) KvVirtualDevice(DEVICE_B);
149     ASSERT_TRUE(g_deviceB != nullptr);
150     VirtualSingleVerSyncDBInterface *syncInterfaceB = new (std::nothrow) VirtualSingleVerSyncDBInterface();
151     ASSERT_TRUE(syncInterfaceB != nullptr);
152     ASSERT_EQ(g_deviceB->Initialize(g_communicatorAggregator, syncInterfaceB), E_OK);
153     g_deviceC = new (std::nothrow) KvVirtualDevice(DEVICE_C);
154     ASSERT_TRUE(g_deviceC != nullptr);
155     VirtualSingleVerSyncDBInterface *syncInterfaceC = new (std::nothrow) VirtualSingleVerSyncDBInterface();
156     ASSERT_TRUE(syncInterfaceC != nullptr);
157     ASSERT_EQ(g_deviceC->Initialize(g_communicatorAggregator, syncInterfaceC), E_OK);
158 }
159 
TearDown(void)160 void DistributedDBSingleVerP2PSubscribeSyncTest::TearDown(void)
161 {
162     /**
163      * @tc.teardown: Release device A, B
164      */
165     if (g_schemaKvDelegatePtr != nullptr) {
166         ASSERT_EQ(g_schemaMgr.CloseKvStore(g_schemaKvDelegatePtr), OK);
167         g_schemaKvDelegatePtr = nullptr;
168         DBStatus status = g_schemaMgr.DeleteKvStore(SCHEMA_STORE_ID);
169         LOGD("delete kv store status %d", status);
170         ASSERT_TRUE(status == OK);
171     }
172     if (g_deviceB != nullptr) {
173         delete g_deviceB;
174         g_deviceB = nullptr;
175     }
176     if (g_deviceC != nullptr) {
177         delete g_deviceC;
178         g_deviceC = nullptr;
179     }
180     PermissionCheckCallbackV2 nullCallback;
181     EXPECT_EQ(g_schemaMgr.SetPermissionCheckCallback(nullCallback), OK);
182 }
183 
WaitUntilNotify(KvVirtualDevice & virtualDevice)184 void DistributedDBSingleVerP2PSubscribeSyncTest::WaitUntilNotify(KvVirtualDevice &virtualDevice)
185 {
186     bool notify = false;
187     std::condition_variable cv;
188     std::mutex notifyMutex;
189     virtualDevice.SetPushNotifier([&notify, &cv, &notifyMutex](const std::string &) {
190         {
191             std::lock_guard<std::mutex> autoLock(notifyMutex);
192             notify = true;
193         }
194         cv.notify_all();
195     });
196     {
197         LOGI("Begin wait notify");
198         std::unique_lock<std::mutex> uniqueLock(notifyMutex);
199         (void)cv.wait_for(uniqueLock, std::chrono::milliseconds(DBConstant::MIN_TIMEOUT), [&notify]() {
200             return notify;
201         });
202         LOGI("End wait notify");
203     }
204     virtualDevice.SetPushNotifier(nullptr);
205     std::this_thread::sleep_for(std::chrono::seconds(1));
206 }
207 
InitSubSchemaDb()208 void InitSubSchemaDb()
209 {
210     g_config.dataDir = g_testDir;
211     g_schemaMgr.SetKvStoreConfig(g_config);
212     KvStoreNbDelegate::Option option;
213     option.schema = SCHEMA_STRING;
214     g_schemaMgr.GetKvStore(SCHEMA_STORE_ID, option, g_schemaKvDelegateCallback);
215     ASSERT_TRUE(g_schemaKvDelegateStatus == OK);
216     ASSERT_TRUE(g_schemaKvDelegatePtr != nullptr);
217 }
218 
CheckUnFinishedMap(uint32_t sizeA,uint32_t sizeB,std::vector<std::string> & deviceAQueies,std::vector<std::string> & deviceBQueies,SubscribeManager & subManager)219 void CheckUnFinishedMap(uint32_t sizeA, uint32_t sizeB, std::vector<std::string> &deviceAQueies,
220     std::vector<std::string> &deviceBQueies, SubscribeManager &subManager)
221 {
222     std::map<std::string, std::vector<QuerySyncObject>> allSyncQueries;
223     subManager.GetAllUnFinishSubQueries(allSyncQueries);
224     ASSERT_TRUE(allSyncQueries[DEVICE_A].size() == sizeA);
225     ASSERT_TRUE(allSyncQueries[DEVICE_B].size() == sizeB);
226     for (auto &item : allSyncQueries[DEVICE_A]) {
227         std::string queryId = item.GetIdentify();
228         ASSERT_TRUE(std::find(deviceAQueies.begin(), deviceAQueies.end(), queryId) != deviceAQueies.end());
229     }
230     for (auto &item : allSyncQueries[DEVICE_B]) {
231         std::string queryId = item.GetIdentify();
232         ASSERT_TRUE(std::find(deviceBQueies.begin(), deviceBQueies.end(), queryId) != deviceBQueies.end());
233     }
234 }
235 
InitLocalSubscribeMap(QuerySyncObject & queryCommonObj,std::map<std::string,QuerySyncObject> & queryMap,std::vector<std::string> & deviceAQueies,std::vector<std::string> & deviceBQueies,SubscribeManager & subManager)236 void InitLocalSubscribeMap(QuerySyncObject &queryCommonObj, std::map<std::string, QuerySyncObject> &queryMap,
237     std::vector<std::string> &deviceAQueies, std::vector<std::string> &deviceBQueies, SubscribeManager &subManager)
238 {
239     ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(DEVICE_A, queryCommonObj) == E_OK);
240     ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(DEVICE_A, queryCommonObj) == E_OK);
241     ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(DEVICE_B, queryCommonObj) == E_OK);
242     ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(DEVICE_B, queryCommonObj) == E_OK);
243     queryMap[queryCommonObj.GetIdentify()] = queryCommonObj;
244     deviceAQueies.push_back(queryCommonObj.GetIdentify());
245     deviceBQueies.push_back(queryCommonObj.GetIdentify());
246     for (int i = 0; i < 3; i++) { // 3 subscribe
247         QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
248         deviceAQueies.push_back(querySyncObj.GetIdentify());
249         queryMap[querySyncObj.GetIdentify()] = querySyncObj;
250         ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(DEVICE_A, querySyncObj) == E_OK);
251         ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(DEVICE_A, querySyncObj) == E_OK);
252     }
253     for (int i = 0; i < 1; i++) {
254         QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('b' + i)}));
255         deviceBQueies.push_back(querySyncObj.GetIdentify());
256         queryMap[querySyncObj.GetIdentify()] = querySyncObj;
257         ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(DEVICE_B, querySyncObj) == E_OK);
258         ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(DEVICE_B, querySyncObj) == E_OK);
259     }
260 }
261 /**
262  * @tc.name: SubscribeRequestTest001
263  * @tc.desc: test Serialize/DoSerialize SubscribeRequest
264  * @tc.type: FUNC
265  * @tc.require: AR000FN6G9
266  * @tc.author: zhuwentao
267  */
268 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeRequestTest001, TestSize.Level1)
269 {
270     /**
271      * @tc.steps: step1. prepare a SubscribeRequest.
272      */
273     auto packet = new (std::nothrow) SubscribeRequest;
274     ASSERT_TRUE(packet != nullptr);
275     packet->SetPacketHead(100, SOFTWARE_VERSION_CURRENT, SUBSCRIBE_QUERY_CMD, 1);
276     Query query = Query::Select().EqualTo("$.field_name1", 1);
277     QuerySyncObject syncQuery(query);
278     packet->SetQuery(syncQuery);
279 
280     /**
281      * @tc.steps: step2. put the SubscribeRequest Packet into a message.
282      */
283     Message msg;
284     msg.SetExternalObject(packet);
285     msg.SetMessageId(CONTROL_SYNC_MESSAGE);
286     msg.SetMessageType(TYPE_REQUEST);
287 
288     /**
289      * @tc.steps: step3. Serialization the message to a buffer.
290      */
291     int len = static_cast<int>(SingleVerSerializeManager::CalculateLen(&msg));
292     LOGE("test leng = %d", len);
293     uint8_t *buffer = new (nothrow) uint8_t[len];
294     ASSERT_TRUE(buffer != nullptr);
295     ASSERT_EQ(SingleVerSerializeManager::Serialization(buffer, len, &msg), E_OK);
296 
297     /**
298      * @tc.steps: step4. DeSerialization the buffer to a message.
299      */
300     Message outMsg;
301     outMsg.SetMessageId(CONTROL_SYNC_MESSAGE);
302     outMsg.SetMessageType(TYPE_REQUEST);
303     ASSERT_EQ(SingleVerSerializeManager::DeSerialization(buffer, len, &outMsg), E_OK);
304 
305     /**
306      * @tc.steps: step5. checkout the outMsg.
307      * @tc.expected: step5. outMsg equal the the in msg
308      */
309     auto outPacket = outMsg.GetObject<SubscribeRequest>();
310     EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
311     EXPECT_EQ(outPacket->GetSendCode(), 100);
312     EXPECT_EQ(outPacket->GetcontrolCmdType(), SUBSCRIBE_QUERY_CMD);
313     EXPECT_EQ(outPacket->GetFlag(), 1u);
314     EXPECT_EQ(outPacket->GetQuery().GetIdentify(), syncQuery.GetIdentify());
315     delete[] buffer;
316 }
317 
318 /**
319  * @tc.name: ControlAckTest001
320  * @tc.desc: test Serialize/DoSerialize ControlAckPacket
321  * @tc.type: FUNC
322  * @tc.require: AR000FN6G9
323  * @tc.author: zhuwentao
324  */
325 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, ControlAckTest001, TestSize.Level1)
326 {
327     /**
328      * @tc.steps: step1. prepare a ControlAckPacket.
329      */
330     ControlAckPacket packet;
331     packet.SetPacketHead(-E_NOT_SUPPORT, SOFTWARE_VERSION_CURRENT, SUBSCRIBE_QUERY_CMD, 1);
332 
333     /**
334      * @tc.steps: step2. put the QuerySyncAckPacket into a message.
335      */
336     Message msg;
337     msg.SetCopiedObject(packet);
338     msg.SetMessageId(CONTROL_SYNC_MESSAGE);
339     msg.SetMessageType(TYPE_RESPONSE);
340 
341     /**
342      * @tc.steps: step3. Serialization the message to a buffer.
343      */
344     int len = static_cast<int>(SingleVerSerializeManager::CalculateLen(&msg));
345     LOGE("test leng = %d", len);
346     uint8_t *buffer = new (nothrow) uint8_t[len];
347     ASSERT_TRUE(buffer != nullptr);
348     int errCode = SingleVerSerializeManager::Serialization(buffer, len, &msg);
349     ASSERT_EQ(errCode, E_OK);
350 
351     /**
352      * @tc.steps: step4. DeSerialization the buffer to a message.
353      */
354     Message outMsg;
355     outMsg.SetMessageId(CONTROL_SYNC_MESSAGE);
356     outMsg.SetMessageType(TYPE_RESPONSE);
357     errCode = SingleVerSerializeManager::DeSerialization(buffer, len, &outMsg);
358     ASSERT_EQ(errCode, E_OK);
359 
360     /**
361      * @tc.steps: step5. checkout the outMsg.
362      * @tc.expected: step5. outMsg equal the the in msg
363      */
364     auto outPacket = outMsg.GetObject<ControlAckPacket>();
365     EXPECT_EQ(outPacket->GetVersion(), SOFTWARE_VERSION_CURRENT);
366     EXPECT_EQ(outPacket->GetRecvCode(), -E_NOT_SUPPORT);
367     EXPECT_EQ(outPacket->GetcontrolCmdType(), SUBSCRIBE_QUERY_CMD);
368     EXPECT_EQ(outPacket->GetFlag(), 1u);
369     delete[] buffer;
370 }
371 
372 /**
373  * @tc.name: subscribeManager001
374  * @tc.desc: test subscribe class subscribe local function with one device
375  * @tc.type: FUNC
376  * @tc.require: AR000FN6G9
377  * @tc.author: zhuwentao
378  */
379 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeManager001, TestSize.Level1)
380 {
381     SubscribeManager subManager;
382     std::string device = "device_A";
383     /**
384      * @tc.steps: step1. test one device limit four subscribe queries in local map
385      */
386     LOGI("============step 1============");
387     for (int i = 0; i < 4; i++) {
388         QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
389         ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device, querySyncObj) == E_OK);
390         ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device, querySyncObj) == E_OK);
391     }
392     std::vector<QuerySyncObject> subscribeQueries;
393     subManager.GetLocalSubscribeQueries(device, subscribeQueries);
394     ASSERT_TRUE(subscribeQueries.size() == 4);
395     subscribeQueries.clear();
396     QuerySyncObject querySyncObj1(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 4)}));
397     int errCode = subManager.ReserveLocalSubscribeQuery(device, querySyncObj1);
398     ASSERT_TRUE(errCode != E_OK);
399     /**
400      * @tc.steps: step2. allow to subscribe existed query
401      */
402     LOGI("============step 2============");
403     QuerySyncObject querySyncObj2(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 3)}));
404     ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
405     ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
406     subManager.GetLocalSubscribeQueries(device, subscribeQueries);
407     ASSERT_TRUE(subscribeQueries.size() == 4);
408     subscribeQueries.clear();
409     /**
410      * @tc.steps: step3. unsubscribe no existed queries
411      */
412     LOGI("============step 3============");
413     subManager.RemoveLocalSubscribeQuery(device, querySyncObj1);
414     subManager.GetLocalSubscribeQueries(device, subscribeQueries);
415     ASSERT_TRUE(subscribeQueries.size() == 4);
416     subscribeQueries.clear();
417     /**
418      * @tc.steps: step4. unsubscribe queries
419      */
420     LOGI("============step 4============");
421     for (int i = 0; i < 4; i++) {
422         QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
423         subManager.RemoveLocalSubscribeQuery(device, querySyncObj);
424     }
425     subManager.GetLocalSubscribeQueries(device, subscribeQueries);
426     ASSERT_TRUE(subscribeQueries.size() == 0);
427 
428     /**
429      * @tc.steps: step5. reserve twice while subscribe queries
430      */
431     LOGI("============step 5============");
432     ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
433     ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
434     ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device, querySyncObj2) == E_OK);
435     subManager.GetLocalSubscribeQueries(device, subscribeQueries);
436     ASSERT_TRUE(subscribeQueries.size() == 1);
437     subscribeQueries.clear();
438     subManager.RemoveLocalSubscribeQuery(device, querySyncObj2);
439     subManager.GetLocalSubscribeQueries(device, subscribeQueries);
440     ASSERT_TRUE(subscribeQueries.size() == 0);
441 }
442 
443 /**
444  * @tc.name: subscribeManager002
445  * @tc.desc: test subscribe class subscribe remote function with one device
446  * @tc.type: FUNC
447  * @tc.require: AR000FN6G9
448  * @tc.author: zhuwentao
449  */
450 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeManager002, TestSize.Level1)
451 {
452     SubscribeManager subManager;
453     std::string device = "device_A";
454     /**
455      * @tc.steps: step1. test one device limit four subscribe queries in remote map
456      */
457     LOGI("============step 1============");
458     for (int i = 0; i < 4; i++) {
459         QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
460         ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device, querySyncObj) == E_OK);
461         ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(device, querySyncObj) == E_OK);
462     }
463     QuerySyncObject querySyncObj1(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 4)}));
464     ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device, querySyncObj1) != E_OK);
465     std::vector<std::string> subscribeQueryId;
466     subManager.GetRemoteSubscribeQueryIds(device, subscribeQueryId);
467 ASSERT_TRUE(subscribeQueryId.size() == 4);
468     subscribeQueryId.clear();
469     /**
470      * @tc.steps: step2. allow to subscribe existed query
471      */
472     LOGI("============step 2============");
473     QuerySyncObject querySyncObj2(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 3)}));
474     ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device, querySyncObj2) == E_OK);
475     ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(device, querySyncObj2) == E_OK);
476     subManager.GetRemoteSubscribeQueryIds(device, subscribeQueryId);
477     ASSERT_TRUE(subscribeQueryId.size() == 4);
478     subscribeQueryId.clear();
479     /**
480      * @tc.steps: step3. unsubscribe no existed queries
481      */
482     LOGI("============step 3============");
483     subManager.RemoveRemoteSubscribeQuery(device, querySyncObj1);
484     subManager.GetRemoteSubscribeQueryIds(device, subscribeQueryId);
485     ASSERT_TRUE(subscribeQueryId.size() == 4);
486     subscribeQueryId.clear();
487     /**
488      * @tc.steps: step4. unsubscribe queries
489      */
490     LOGI("============step 4============");
491     for (int i = 0; i < 4; i++) {
492         QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
493         subManager.RemoveRemoteSubscribeQuery(device, querySyncObj);
494     }
495     subManager.GetRemoteSubscribeQueryIds(device, subscribeQueryId);
496     ASSERT_TRUE(subscribeQueryId.size() == 0);
497 }
498 
499 /**
500  * @tc.name: subscribeManager003
501  * @tc.desc: test subscribe class subscribe remote function with multi device
502  * @tc.type: FUNC
503  * @tc.require: AR000FN6G9
504  * @tc.author: zhuwentao
505  */
506 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeManager003, TestSize.Level1)
507 {
508     SubscribeManager subManager;
509     std::string device = "device_";
510     std::vector<QuerySyncObject> subscribeQueries;
511     /**
512      * @tc.steps: step1. test mutil device limit 32 devices in remote map and check each device has one subscribe
513      */
514     LOGI("============step 1============");
515     QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 1)}));
516     for (int i = 0; i < 32; i++) {
517         ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device + std::to_string(i), querySyncObj) == E_OK);
518         ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device + std::to_string(i), querySyncObj) == E_OK);
519     }
520     ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device + std::to_string(33), querySyncObj) != E_OK);
521     for (int i = 0; i < 32; i++) {
522         subManager.GetLocalSubscribeQueries(device + std::to_string(i), subscribeQueries);
523         ASSERT_TRUE(subscribeQueries.size() == 1);
524         subscribeQueries.clear();
525     }
526     /**
527      * @tc.steps: step2. clear remote subscribe query map and check each device has no subscribe
528      */
529     LOGI("============step 2============");
530     for (int i = 0; i < 32; i++) {
531         subManager.ClearLocalSubscribeQuery(device + std::to_string(i));
532         subManager.GetLocalSubscribeQueries(device + std::to_string(i), subscribeQueries);
533         ASSERT_TRUE(subscribeQueries.size() == 0);
534         subscribeQueries.clear();
535     }
536     /**
537      * @tc.steps: step3. test mutil device limit 8 queries in db and check each device has one subscribe
538      */
539     LOGI("============step 3============");
540     for (int i = 0; i < 8; i++) {
541         QuerySyncObject querySyncObj2(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
542         ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device + std::to_string(i), querySyncObj2) == E_OK);
543         ASSERT_TRUE(subManager.ActiveLocalSubscribeQuery(device + std::to_string(i), querySyncObj2) == E_OK);
544     }
545     QuerySyncObject querySyncObj1(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 8)}));
546     ASSERT_TRUE(subManager.ReserveLocalSubscribeQuery(device + std::to_string(8), querySyncObj1) != E_OK);
547 }
548 
549 /**
550  * @tc.name: subscribeManager004
551  * @tc.desc: test subscribe class subscribe remote function with multi device
552  * @tc.type: FUNC
553  * @tc.require: AR000FN6G9
554  * @tc.author: zhuwentao
555  */
556 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeManager004, TestSize.Level1)
557 {
558     SubscribeManager subManager;
559     std::string device = "device_";
560     std::vector<std::string> subscribeQueryId;
561     /**
562      * @tc.steps: step1. test mutil device limit 32 devices in remote map and check each device has one subscribe
563      */
564     LOGI("============step 1============");
565     QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 1)}));
566     for (int i = 0; i < 32; i++) {
567         ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device + std::to_string(i), querySyncObj) == E_OK);
568         ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(device + std::to_string(i), querySyncObj) == E_OK);
569     }
570     ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device + std::to_string(33), querySyncObj) != E_OK);
571     for (int i = 0; i < 32; i++) {
572         subManager.GetRemoteSubscribeQueryIds(device + std::to_string(i), subscribeQueryId);
573         ASSERT_TRUE(subscribeQueryId.size() == 1);
574         subscribeQueryId.clear();
575     }
576     /**
577      * @tc.steps: step2. clear remote subscribe query map and check each device has no subscribe
578      */
579     LOGI("============step 2============");
580     for (int i = 0; i < 32; i++) {
581         subManager.ClearRemoteSubscribeQuery(device + std::to_string(i));
582         subManager.GetRemoteSubscribeQueryIds(device + std::to_string(i), subscribeQueryId);
583         ASSERT_TRUE(subscribeQueryId.size() == 0);
584         subscribeQueryId.clear();
585     }
586     subManager.ClearRemoteSubscribeQuery(device);
587     /**
588      * @tc.steps: step3. test mutil device limit 8 queries in db and check each device has one subscribe
589      */
590     LOGI("============step 3============");
591     for (int i = 0; i < 8; i++) {
592         QuerySyncObject querySyncObj2(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
593         ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device + std::to_string(i), querySyncObj2) == E_OK);
594         ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(device + std::to_string(i), querySyncObj2) == E_OK);
595     }
596     QuerySyncObject querySyncObj1(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + 8)}));
597     ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(device + std::to_string(8), querySyncObj1) != E_OK);
598 }
599 
600 /**
601  * @tc.name: subscribeManager005
602  * @tc.desc: test subscribe class subscribe remote function with put into unfinished map
603  * @tc.type: FUNC
604  * @tc.require: AR000FN6G9
605  * @tc.author: zhuwentao
606  */
607 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeManager005, TestSize.Level1)
608 {
609     SubscribeManager subManager;
610     std::vector<QuerySyncObject> subscribeQueries;
611     std::map<std::string, QuerySyncObject> queryMap;
612     std::vector<std::string> deviceAQueies;
613     std::vector<std::string> deviceBQueies;
614     QuerySyncObject queryCommonObj(Query::Select().PrefixKey({'a'}));
615     /**
616      * @tc.steps: step1. test one devices has 4 subscribes and another has 2 in local map, put into unfinished map
617      */
618     LOGI("============step 1============");
619     InitLocalSubscribeMap(queryCommonObj, queryMap, deviceAQueies, deviceBQueies, subManager);
620     /**
621      * @tc.steps: step2. check all device unFinished subscribe queries and put into unfinished map
622      */
623     LOGI("============step 2============");
624     subManager.GetLocalSubscribeQueries(DEVICE_A, subscribeQueries);
625     ASSERT_TRUE(subscribeQueries.size() == 4);
626     subManager.PutLocalUnFinishedSubQueries(DEVICE_A, subscribeQueries);
627     subscribeQueries.clear();
628     subManager.GetLocalSubscribeQueries(DEVICE_B, subscribeQueries);
629     ASSERT_TRUE(subscribeQueries.size() == 2);
630     subManager.PutLocalUnFinishedSubQueries(DEVICE_B, subscribeQueries);
631     subscribeQueries.clear();
632     /**
633      * @tc.steps: step3. get all device unFinished subscribe queries and check
634      */
635     LOGI("============step 3============");
636     CheckUnFinishedMap(4, 2, deviceAQueies, deviceBQueies, subManager);
637     /**
638      * @tc.steps: step4. active some subscribe queries
639      */
640     LOGI("============step 4============");
641     subManager.ActiveLocalSubscribeQuery(DEVICE_A, queryCommonObj);
642     subManager.ActiveLocalSubscribeQuery(DEVICE_A, queryMap[deviceAQueies[3]]);
643     subManager.ActiveLocalSubscribeQuery(DEVICE_B, queryMap[deviceBQueies[1]]);
644     deviceAQueies.erase(deviceAQueies.begin() + 3);
645     deviceAQueies.erase(deviceAQueies.begin());
646     queryMap.erase(queryMap[deviceBQueies[1]].GetIdentify());
647     deviceBQueies.erase(deviceBQueies.begin() + 1);
648     /**
649      * @tc.steps: step5. get all device unFinished subscribe queries and check
650      */
651     LOGI("============step 5============");
652     CheckUnFinishedMap(2, 1, deviceAQueies, deviceBQueies, subManager);
653     /**
654      * @tc.steps: step6. remove left subscribe queries
655      */
656     LOGI("============step 6============");
657     for (int i = 0; i < 2; i++) {
658         QuerySyncObject querySyncObj(Query::Select().PrefixKey({'a', static_cast<uint8_t>('a' + i)}));
659         subManager.RemoveLocalSubscribeQuery(DEVICE_A, querySyncObj);
660     }
661     subManager.RemoveLocalSubscribeQuery(DEVICE_A, queryCommonObj);
662     subManager.RemoveLocalSubscribeQuery(DEVICE_B, queryCommonObj);
663     /**
664      * @tc.steps: step7. get all device unFinished subscribe queries and check
665      */
666     LOGI("============step 7============");
667     CheckUnFinishedMap(0, 0, deviceAQueies, deviceBQueies, subManager);
668 }
669 
670 /**
671  * @tc.name: subscribeManager006
672  * @tc.desc: test exception branch of subscribe manager
673  * @tc.type: FUNC
674  * @tc.require:
675  * @tc.author: zhangshijie
676  */
677 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeManager006, TestSize.Level1)
678 {
679     /**
680      * @tc.steps: step1. active a query sync object which is not in local subscribe map
681      * @tc.expected:step1 return -E_INTERNAL_ERROR
682      */
683     SubscribeManager subManager;
684     QuerySyncObject queryCommonObj(Query::Select().PrefixKey({'a'}));
685     EXPECT_EQ(subManager.ActiveLocalSubscribeQuery(DEVICE_A, queryCommonObj), -E_INTERNAL_ERROR);
686     subManager.DeleteLocalSubscribeQuery(DEVICE_A, queryCommonObj);
687     subManager.RemoveLocalSubscribeQuery(DEVICE_A, queryCommonObj);
688     std::vector<QuerySyncObject> subscribeQueries;
689     subManager.PutLocalUnFinishedSubQueries(DEVICE_A, subscribeQueries);
690     std::map<std::string, std::vector<QuerySyncObject>> allSyncQueries;
691     subManager.GetAllUnFinishSubQueries(allSyncQueries);
692 
693     /**
694      * @tc.steps: step2. call IsLastRemoteContainSubscribe with a device not in remote subscribe map
695      * @tc.expected: step2 return false
696      */
697     std::string queryId = "queryId";
698     EXPECT_EQ(subManager.IsLastRemoteContainSubscribe(DEVICE_A, queryId), false);
699 
700     /**
701      * @tc.steps: step3. active local subscribe with a device which is not in local subscribe map and
702      * a query sync object which is in local subscribe map
703      * @tc.expected: step3 return -E_INTERNAL_ERROR
704      */
705     std::vector<std::string> deviceAQueies;
706     std::vector<std::string> deviceBQueies;
707     std::map<std::string, QuerySyncObject> queryMap;
708     InitLocalSubscribeMap(queryCommonObj, queryMap, deviceAQueies, deviceBQueies, subManager);
709     ASSERT_TRUE(queryMap.size() > 0);
710     std::string devNotExists = "device_not_exists";
711     EXPECT_EQ(subManager.ActiveLocalSubscribeQuery(devNotExists, queryMap.begin()->second), -E_INTERNAL_ERROR);
712     QuerySyncObject queryObj(Query::Select().PrefixKey({'b'}));
713     EXPECT_EQ(subManager.ReserveLocalSubscribeQuery("test_dev", queryObj), E_OK);
714     subManager.DeleteLocalSubscribeQuery(DEVICE_A, queryObj);
715 
716     EXPECT_EQ(subManager.ActiveLocalSubscribeQuery(DEVICE_B, queryObj), -E_INTERNAL_ERROR);
717     subManager.DeleteLocalSubscribeQuery(DEVICE_A, queryCommonObj);
718     ASSERT_TRUE(subManager.ReserveRemoteSubscribeQuery(DEVICE_A, queryCommonObj) == E_OK);
719     ASSERT_TRUE(subManager.ActiveRemoteSubscribeQuery(DEVICE_A, queryCommonObj) == E_OK);
720     EXPECT_EQ(subManager.IsLastRemoteContainSubscribe(DEVICE_A, queryId), false);
721     deviceAQueies.push_back(DEVICE_A);
722     EXPECT_EQ(subManager.LocalSubscribeLimitCheck(deviceAQueies, queryCommonObj), E_OK);
723 
724     /**
725      * @tc.steps: step4. add MAX_DEVICES_NUM device, then call LocalSubscribeLimitCheck
726      * @tc.expected: step4 return -E_MAX_LIMITS
727      */
728     for (size_t i = 0 ; i < MAX_DEVICES_NUM; i++) {
729         deviceAQueies.push_back("device_" + std::to_string(i));
730     }
731     EXPECT_EQ(subManager.LocalSubscribeLimitCheck(deviceAQueies, queryCommonObj), -E_MAX_LIMITS);
732 }
733 
734 /**
735  * @tc.name: subscribeSync001
736  * @tc.desc: test subscribe normal sync
737  * @tc.type: FUNC
738  * @tc.require: AR000FN6G9
739  * @tc.author: zhuwentao
740  */
741 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync001, TestSize.Level1)
742 {
743     /**
744      * @tc.steps: step1. InitSchemaDb
745     */
746     LOGI("============step 1============");
747     InitSubSchemaDb();
748     DBStatus status = OK;
749     std::vector<std::string> devices;
750     devices.push_back(g_deviceB->GetDeviceId());
751     Query query = Query::Select().EqualTo("$.field_name1", 1);
752     QuerySyncObject querySyncObj(query);
753 
754     /**
755      * @tc.steps: step2. deviceB subscribe query to deviceA
756     */
757     LOGI("============step 2============");
758     g_deviceB->Subscribe(querySyncObj, true, 1);
759 
760     /**
761      * @tc.steps: step3. deviceA put {key1, SCHEMA_VALUE1} and wait 1s
762     */
763     LOGI("============step 3============");
764     Value value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
765     Key key = {'1'};
766     status = g_schemaKvDelegatePtr->Put(key, value);
767     EXPECT_EQ(status, OK);
768     WaitUntilNotify(*g_deviceB);
769     /**
770      * @tc.steps: step4. deviceB has {key11, SCHEMA_VALUE1}
771     */
772     LOGI("============step 4============");
773     VirtualDataItem item;
774     g_deviceB->GetData(key, item);
775     EXPECT_TRUE(item.value == value);
776 
777     /**
778      * @tc.steps: step5. deviceB unsubscribe query to deviceA
779     */
780     g_deviceB->UnSubscribe(querySyncObj, true, 2);
781 
782     /**
783      * @tc.steps: step5. deviceA put {key2, SCHEMA_VALUE1} and wait 1s
784     */
785     LOGI("============step 5============");
786     Value value2(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end());
787     Key key2 = {'2'};
788     status = g_schemaKvDelegatePtr->Put(key2, value2);
789     EXPECT_EQ(status, OK);
790     WaitUntilNotify(*g_deviceB);
791     /**
792      * @tc.steps: step6. deviceB don't has {key2, SCHEMA_VALUE1}
793     */
794     LOGI("============step 6============");
795     VirtualDataItem item2;
796     EXPECT_TRUE(g_deviceB->GetData(key2, item2) != E_OK);
797 }
798 
799 /**
800  * @tc.name: subscribeSync002
801  * @tc.desc: test subscribe sync over 32 devices,limit,orderBy
802  * @tc.type: FUNC
803  * @tc.require: AR000FN6G9
804  * @tc.author: zhuwentao
805  */
806 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync002, TestSize.Level1)
807 {
808     /**
809      * @tc.steps: step1. InitSchemaDb
810     */
811     LOGI("============step 1============");
812     InitSubSchemaDb();
813     std::vector<std::string> devices;
814     std::string device = "device_";
815     Query query = Query::Select().EqualTo("$.field_name1", 1);
816 
817     /**
818      * @tc.steps: step2. deviceA subscribe query to 33 devices, and return overlimit
819     */
820     LOGI("============step 2============");
821     for (int i = 0; i < 33; i++) {
822         devices.push_back(device + std::to_string(i));
823     }
824     EXPECT_TRUE(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query, true) == OVER_MAX_LIMITS);
825 
826     /**
827      * @tc.steps: step3. deviceA subscribe query with limit
828     */
829     LOGI("============step 3============");
830     devices.clear();
831     devices.push_back("device_B");
832     Query query2 = Query::Select().EqualTo("$.field_name1", 1).Limit(20, 0);
833     EXPECT_TRUE(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query2, true) == NOT_SUPPORT);
834 
835     /**
836      * @tc.steps: step4. deviceA subscribe query with orderBy
837     */
838     LOGI("============step 4============");
839     Query query3 = Query::Select().EqualTo("$.field_name1", 1).OrderBy("$.field_name7");
840     EXPECT_TRUE(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query3, true) == NOT_SUPPORT);
841 }
842 
843 /**
844  * @tc.name: subscribeSync003
845  * @tc.desc: test subscribe sync with inkeys query
846  * @tc.type: FUNC
847  * @tc.require: AR000GOHO7
848  * @tc.author: lidongwei
849  */
850 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync003, TestSize.Level1)
851 {
852     /**
853      * @tc.steps: step1. InitSchemaDb
854      */
855     LOGI("============step 1============");
856     InitSubSchemaDb();
857     std::vector<std::string> devices;
858     devices.push_back(g_deviceB->GetDeviceId());
859     g_deviceB->Online();
860 
861     /**
862      * @tc.steps: step2. deviceB subscribe inkeys(k2k4) query to deviceA
863      */
864     LOGI("============step 2============");
865     Query query = Query::Select().InKeys({KEY_2, KEY_4});
866     g_deviceB->Subscribe(QuerySyncObject(query), true, 1);
867 
868     /**
869      * @tc.steps: step3. deviceA put k1-k5 and wait
870      */
871     LOGI("============step 3============");
872     EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
873         {KEY_1, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
874         {KEY_2, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
875         {KEY_3, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
876         {KEY_4, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
877         {KEY_5, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
878     }));
879     WaitUntilNotify(*g_deviceB);
880 
881     /**
882      * @tc.steps: step4. deviceB has k2k4, has no k1k3k5
883      */
884     LOGI("============step 4============");
885     VirtualDataItem item;
886     EXPECT_EQ(g_deviceB->GetData(KEY_2, item), E_OK);
887     EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
888     EXPECT_EQ(g_deviceB->GetData(KEY_4, item), E_OK);
889     EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
890     EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
891     EXPECT_EQ(g_deviceB->GetData(KEY_3, item), -E_NOT_FOUND);
892     EXPECT_EQ(g_deviceB->GetData(KEY_5, item), -E_NOT_FOUND);
893 }
894 
895 /**
896  * @tc.name: subscribeSync004
897  * @tc.desc: test subscribe sync with inkeys query
898  * @tc.type: FUNC
899  * @tc.require: AR000GOHO7
900  * @tc.author: lidongwei
901  */
902 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync004, TestSize.Level1)
903 {
904     /**
905      * @tc.steps: step1. InitSchemaDb
906      */
907     LOGI("============step 1============");
908     InitSubSchemaDb();
909     std::vector<std::string> devices;
910     devices.push_back(g_deviceB->GetDeviceId());
911 
912     /**
913      * @tc.steps: step2. deviceB subscribe inkeys(k3k5) and equal to query to deviceA
914      */
915     LOGI("============step 2============");
916     Query query = Query::Select().InKeys({KEY_3, KEY_5}).EqualTo("$.field_name3", 100); // 100 for test.
917     g_deviceB->Subscribe(QuerySyncObject(query), true, 2);
918 
919     /**
920      * @tc.steps: step3. deviceA put k1v2,k3v2,k5v1 and wait
921      */
922     LOGI("============step 3============");
923     EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
924         {KEY_1, Value(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end())},
925         {KEY_3, Value(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end())},
926         {KEY_5, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
927     }));
928     WaitUntilNotify(*g_deviceB);
929 
930     /**
931      * @tc.steps: step4. deviceB has k3, has no k1k5
932      */
933     LOGI("============step 4============");
934     VirtualDataItem item;
935     EXPECT_EQ(g_deviceB->GetData(KEY_3, item), E_OK);
936     EXPECT_EQ(item.value, Value(SCHEMA_VALUE2.begin(), SCHEMA_VALUE2.end()));
937     EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
938     EXPECT_EQ(g_deviceB->GetData(KEY_5, item), -E_NOT_FOUND);
939 }
940 
941 /**
942  * @tc.name: subscribeSync005
943  * @tc.desc: test subscribe sync with inkeys query
944  * @tc.type: FUNC
945  * @tc.require: AR000GOHO7
946  * @tc.author: lidongwei
947  */
948 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync005, TestSize.Level1)
949 {
950     /**
951      * @tc.steps: step1. InitSchemaDb
952      */
953     LOGI("============step 1============");
954     InitSubSchemaDb();
955     std::vector<std::string> devices;
956     devices.push_back(g_deviceB->GetDeviceId());
957 
958     /**
959      * @tc.steps: step2. deviceB subscribe inkeys(k1, key6) and prefix key "k" query to deviceA
960      */
961     LOGI("============step 2============");
962     Key key6 { 'k', '6' };
963     Query query = Query::Select().InKeys({KEY_1, key6}).PrefixKey({ 'k' });
964     g_deviceB->Subscribe(QuerySyncObject(query), true, 3);
965 
966     /**
967      * @tc.steps: step3. deviceA put k1,key6 and wait
968      */
969     LOGI("============step 3============");
970     EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
971         {key6, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
972         {KEY_1, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
973     }));
974     WaitUntilNotify(*g_deviceB);
975 
976     /**
977      * @tc.steps: step4. deviceB has key6, has no k1
978      */
979     LOGI("============step 4============");
980     VirtualDataItem item;
981     EXPECT_EQ(g_deviceB->GetData(key6, item), E_OK);
982     EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
983     EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
984 }
985 
986 
987 /**
988  * @tc.name: subscribeSync006
989  * @tc.desc: test one device unsubscribe no effect other device
990  * @tc.type: FUNC
991  * @tc.require: AR000GOHO7
992  * @tc.author: zhangqiquan
993  */
994 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync006, TestSize.Level1)
995 {
996     /**
997      * @tc.steps: step1. InitSchemaDb
998      */
999     LOGI("============step 1============");
1000     InitSubSchemaDb();
1001     std::vector<std::string> devices;
1002     devices.push_back(g_deviceB->GetDeviceId());
1003     devices.push_back(g_deviceC->GetDeviceId());
1004 
1005     /**
1006      * @tc.steps: step2. deviceB unsubscribe inkeys(k1, key6) and prefix key "k" query to deviceA
1007      */
1008     LOGI("============step 2============");
1009     Key key6 { 'k', '6' };
1010     Query query = Query::Select().InKeys({KEY_1, key6}).PrefixKey({ 'k' });
1011     g_deviceB->Online();
1012     g_deviceC->Online();
1013     g_deviceB->Subscribe(QuerySyncObject(query), true, 3);
1014     g_deviceC->Subscribe(QuerySyncObject(query), true, 3);
1015 
1016     /**
1017      * @tc.steps: step3. deviceC unsubscribe
1018      */
1019     LOGI("============step 3============");
1020     g_deviceC->UnSubscribe(QuerySyncObject(query), true, 3);
1021 
1022     /**
1023      * @tc.steps: step4. deviceA put k1,key6 and wait
1024      */
1025     LOGI("============step 4============");
1026     EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
1027         {key6, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
1028         {KEY_1, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
1029     }));
1030     WaitUntilNotify(*g_deviceB);
1031 
1032     /**
1033      * @tc.steps: step5. deviceB has key6, has no k1
1034      */
1035     LOGI("============step 5============");
1036     VirtualDataItem item;
1037     EXPECT_EQ(g_deviceB->GetData(key6, item), E_OK);
1038     EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
1039     EXPECT_EQ(g_deviceB->GetData(KEY_1, item), -E_NOT_FOUND);
1040 }
1041 
1042 /**
1043  * @tc.name: subscribeSync007
1044  * @tc.desc: test subscribe query with order by write time
1045  * @tc.type: FUNC
1046  * @tc.require: AR000H5VLO
1047  * @tc.author: zhuwentao
1048  */
1049 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync007, TestSize.Level1)
1050 {
1051     /**
1052      * @tc.steps: step1. InitSchemaDb
1053     */
1054     LOGI("============step 1============");
1055     InitSubSchemaDb();
1056     std::vector<std::string> devices = {"DEVICE_B"};
1057 
1058     /**
1059      * @tc.steps: step2. deviceA subscribe query with order by write time
1060      * * @tc.expected: step2. interface return not support
1061     */
1062     Query query = Query::Select().EqualTo("$.field_name1", 1).OrderByWriteTime(false);
1063     EXPECT_TRUE(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query, true) == NOT_SUPPORT);
1064     EXPECT_TRUE(g_schemaKvDelegatePtr->UnSubscribeRemoteQuery(devices, nullptr, query, true) == NOT_SUPPORT);
1065 }
1066 
1067 /**
1068  * @tc.name: SubscribeSync008
1069  * @tc.desc: test subscribe with reopen db
1070  * @tc.type: FUNC
1071  * @tc.require: AR000HGD0B
1072  * @tc.author: zhangqiquan
1073  */
1074 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync008, TestSize.Level1)
1075 {
1076     /**
1077      * @tc.steps: step1. InitSchemaDb
1078      */
1079     std::shared_ptr<DBInfoHandleTest> handleTest = std::make_shared<DBInfoHandleTest>();
1080     RuntimeConfig::SetDBInfoHandle(handleTest);
1081 
1082     LOGI("============step 1============");
1083     InitSubSchemaDb();
1084     std::vector<std::string> devices;
1085     devices.push_back(g_deviceB->GetDeviceId());
1086 
1087     /**
1088      * @tc.steps: step2. deviceB subscribe query to deviceA
1089      */
1090     LOGI("============step 2============");
1091     Key key6 { 'k', '6' };
1092     Query query = Query::Select();
1093     g_deviceB->Subscribe(QuerySyncObject(query), true, 3);
1094 
1095     /**
1096      * @tc.steps: step3. deviceA put k1,key6 and wait
1097      */
1098     LOGI("============step 3============");
1099     EXPECT_EQ(OK, g_schemaKvDelegatePtr->PutBatch({
1100         {key6, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())},
1101     }));
1102     EXPECT_EQ(g_schemaMgr.CloseKvStore(g_schemaKvDelegatePtr), OK);
1103     g_schemaKvDelegatePtr = nullptr;
1104     InitSubSchemaDb();
1105     g_deviceB->Online();
1106     WaitUntilNotify(*g_deviceB);
1107 
1108     /**
1109      * @tc.steps: step4. deviceB has key6
1110      */
1111     LOGI("============step 4============");
1112     VirtualDataItem item;
1113     if (g_deviceB->GetData(key6, item) == E_OK) {
1114         EXPECT_EQ(item.value, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end()));
1115     }
1116     RuntimeConfig::SetDBInfoHandle(nullptr);
1117 }
1118 
1119 namespace {
CreateKvVirtualDevice(const std::string & deviceName)1120 KvVirtualDevice *CreateKvVirtualDevice(const std::string &deviceName)
1121 {
1122     KvVirtualDevice *device = nullptr;
1123     do {
1124         if (g_communicatorAggregator == nullptr) {
1125             break;
1126         }
1127         device = new (std::nothrow) KvVirtualDevice(deviceName);
1128         if (device == nullptr) {
1129             break;
1130         }
1131         auto interface = new (std::nothrow) VirtualSingleVerSyncDBInterface();
1132         if (interface == nullptr) {
1133             delete device;
1134             device = nullptr;
1135             break;
1136         }
1137         EXPECT_EQ(device->Initialize(g_communicatorAggregator, interface), E_OK);
1138     } while (false);
1139     return device;
1140 }
1141 }
1142 
1143 /**
1144  * @tc.name: SubscribeSync009
1145  * @tc.desc: test subscribe query with 33 device
1146  * @tc.type: FUNC
1147  * @tc.require: AR000H5VLO
1148  * @tc.author: zhangqiquan
1149  */
1150 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync009, TestSize.Level3)
1151 {
1152     /**
1153      * @tc.steps: step1. InitSchemaDb
1154      */
1155     LOGI("============step 1============");
1156     InitSubSchemaDb();
1157     const int maxDeviceCount = 32;
1158     std::vector<KvVirtualDevice *> devices;
1159     for (int i = 0; i < maxDeviceCount; ++i) {
1160         std::string deviceName = "D_" + std::to_string(i);
1161         auto device = CreateKvVirtualDevice(deviceName);
1162         EXPECT_NE(device, nullptr);
1163         if (device == nullptr) {
1164             continue;
1165         }
1166         devices.push_back(device);
1167     }
1168 
1169     /**
1170      * @tc.steps: step2. 33 device subscribe
1171      */
1172     LOGI("============step 2============");
1173     Query query = Query::Select();
1174     for (const auto &dev: devices) {
1175         dev->Online();
1176         dev->Subscribe(QuerySyncObject(query), true, 1); // sync id is 1
1177     }
1178     g_deviceB->Subscribe(QuerySyncObject(query), true, 1); // sync id is 1
1179     /**
1180      * @tc.steps: step3. 32 unsubscribe
1181      */
1182     LOGI("============step 3============");
__anonf239d4230502(std::map<std::string, int> res) 1183     SyncOperation::UserCallback callback = [](std::map<std::string, int> res) {
1184         ASSERT_EQ(res.size(), 1u);
1185         EXPECT_EQ(res["real_device"], SyncOperation::OP_FINISHED_ALL);
1186     };
1187     for (const auto &dev: devices) {
1188         dev->UnSubscribe(QuerySyncObject(query), true, 1, callback); // sync id is 1
1189         delete dev;
1190     }
1191 }
1192 
1193 /*
1194  * @tc.name: SubscribeSync010
1195  * @tc.desc: test subscribe query cache
1196  * @tc.type: FUNC
1197  * @tc.require: AR000H5VLO
1198  * @tc.author: zhangqiquan
1199  */
1200 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync010, TestSize.Level1)
1201 {
1202     SubscribeRecorder recorder;
1203     DBInfo dbInfo = {
1204         USER_ID,
1205         APP_ID,
1206         STORE_ID_1,
1207         false,
1208         true
1209     };
1210     Query query = Query::Select();
1211     QuerySyncObject querySyncObject(query);
1212     /**
1213      * @tc.steps: step1. Insert one record twice and remove
1214      */
1215     recorder.RecordSubscribe(dbInfo, DEVICE_A, querySyncObject);
1216     recorder.RecordSubscribe(dbInfo, DEVICE_A, querySyncObject);
1217     recorder.RemoveRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1218     std::map<std::string, std::vector<QuerySyncObject>> subscribeQuery;
1219     recorder.GetSubscribeQuery(dbInfo, subscribeQuery);
1220     for (const auto &entry: subscribeQuery) {
1221         EXPECT_EQ(entry.second.size(), 0u);
1222     }
1223     /**
1224      * @tc.steps: step2. Remove no exist data
1225      */
1226     recorder.RemoveRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1227     recorder.GetSubscribeQuery(dbInfo, subscribeQuery);
1228     for (const auto &entry: subscribeQuery) {
1229         EXPECT_EQ(entry.second.size(), 0u);
1230     }
1231     /**
1232      * @tc.steps: step3. insert two data and remove one data
1233      */
1234     recorder.RecordSubscribe(dbInfo, DEVICE_A, querySyncObject);
1235     Query query2 = Query::Select().EqualTo("test", "test");
1236     recorder.RecordSubscribe(dbInfo, DEVICE_A, QuerySyncObject(query2));
1237     recorder.RemoveRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1238     recorder.GetSubscribeQuery(dbInfo, subscribeQuery);
1239     for (const auto &entry: subscribeQuery) {
1240         EXPECT_EQ(entry.second.size(), 1u);
1241     }
1242     /**
1243      * @tc.steps: step4. remove no exist data
1244      */
1245     dbInfo.storeId = STORE_ID_2;
1246     recorder.RemoveRemoteSubscribe(dbInfo, DEVICE_A);
1247     dbInfo.storeId = STORE_ID_1;
1248     recorder.GetSubscribeQuery(dbInfo, subscribeQuery);
1249     for (const auto &entry: subscribeQuery) {
1250         EXPECT_EQ(entry.second.size(), 1u);
1251     }
1252 }
1253 
1254 /*
1255  * @tc.name: SubscribeSync011
1256  * @tc.desc: test subscribe query cache
1257  * @tc.type: FUNC
1258  * @tc.require: AR000H5VLO
1259  * @tc.author: zhangqiquan
1260  */
1261 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync011, TestSize.Level1)
1262 {
1263     SubscribeRecorder recorder;
1264     DBInfo dbInfo = {
1265             USER_ID,
1266             APP_ID,
1267             STORE_ID_1,
1268             false,
1269             true
1270     };
1271     /**
1272      * @tc.steps: step1. Insert 2 record in db1 and 1 record in db2
1273      */
1274     Query query = Query::Select();
1275     QuerySyncObject querySyncObject(query);
1276     recorder.RecordSubscribe(dbInfo, DEVICE_A, querySyncObject);
1277     recorder.RecordSubscribe(dbInfo, DEVICE_B, querySyncObject);
1278     DBInfo dbInfo2 = dbInfo;
1279     dbInfo2.storeId = STORE_ID_2;
1280     recorder.RecordSubscribe(dbInfo2, DEVICE_B, querySyncObject);
1281     /**
1282      * @tc.steps: step2. Insert 2 record in db1
1283      */
1284     recorder.RemoveRemoteSubscribe(dbInfo);
1285     std::map<std::string, std::vector<QuerySyncObject>> subscribeQuery;
1286     recorder.GetSubscribeQuery(dbInfo, subscribeQuery);
1287     EXPECT_EQ(subscribeQuery.size(), 0u);
1288     recorder.GetSubscribeQuery(dbInfo2, subscribeQuery);
1289     EXPECT_EQ(subscribeQuery.size(), 1u);
1290     /**
1291      * @tc.steps: step3. Insert 1 record in db2
1292      */
1293     recorder.RemoveRemoteSubscribe(dbInfo2);
1294     subscribeQuery.clear();
1295     recorder.GetSubscribeQuery(dbInfo2, subscribeQuery);
1296     EXPECT_EQ(subscribeQuery.size(), 0u);
1297 }
1298 
1299 /**
1300  * @tc.name: subscribeSync012
1301  * @tc.desc: test one device unsubscribe no effect other device
1302  * @tc.type: FUNC
1303  * @tc.require: AR000GOHO7
1304  * @tc.author: zhangqiquan
1305  */
1306 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync012, TestSize.Level1)
1307 {
1308     /**
1309      * @tc.steps: step1. InitSchemaDb
1310      */
1311     LOGI("============step 1============");
1312     InitSubSchemaDb();
1313     std::vector<std::string> devices;
1314     devices.push_back(g_deviceB->GetDeviceId());
1315     devices.push_back(g_deviceC->GetDeviceId());
1316 
1317     /**
1318      * @tc.steps: step2. deviceB unsubscribe inkeys(k1, key6) and prefix key "k" query to deviceA
1319      */
1320     LOGI("============step 2============");
1321     Key key6 { 'k', '6' };
1322     Query query = Query::Select();
1323     g_deviceB->Online();
1324     g_deviceC->Online();
1325     g_deviceB->Subscribe(QuerySyncObject(query), true, 3);
1326     g_deviceC->Subscribe(QuerySyncObject(query), true, 3);
1327 
1328     /**
1329      * @tc.steps: step3. deviceC unsubscribe
1330      */
1331     LOGI("============step 3============");
1332     g_deviceC->Offline();
1333 
1334     /**
1335      * @tc.steps: step4. deviceA put k1,key6 and wait
1336      */
1337     LOGI("============step 4============");
1338     const uint8_t putItemCount = 10u;
1339     std::vector<Key> dataKeys;
1340     for (uint8_t i = 0u; i < putItemCount; ++i) {
1341         Key key = { i };
1342         dataKeys.push_back(key);
1343         EXPECT_EQ(g_schemaKvDelegatePtr->Put(key, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())), OK);
1344         WaitUntilNotify(*g_deviceB);
1345     }
1346     /**
1347      * @tc.steps: step5. deviceB has key6, has no k1
1348      */
1349     LOGI("============step 5============");
1350     for (const auto &key: dataKeys) {
1351         VirtualDataItem item;
1352         EXPECT_EQ(g_deviceB->GetData(key, item), E_OK);
1353         EXPECT_EQ(g_deviceC->GetData(key, item), -E_NOT_FOUND);
1354     }
1355 }
1356 
1357 /*
1358  * @tc.name: SubscribeSync013
1359  * @tc.desc: test subscribe query cache with remote support false
1360  * @tc.type: FUNC
1361  * @tc.require: AR000H5VLO
1362  * @tc.author: zhangqiquan
1363  */
1364 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync013, TestSize.Level1)
1365 {
1366     std::shared_ptr<DBInfoHandleTest> handle = std::make_shared<DBInfoHandleTest>();
1367     RuntimeConfig::SetDBInfoHandle(handle);
1368     handle->SetLocalIsSupport(true);
1369     DBInfo dbInfo = {
1370         USER_ID,
1371         APP_ID,
1372         STORE_ID_1,
1373         false,
1374         true
1375     };
1376     RuntimeContext::GetInstance()->SetRemoteOptimizeCommunication(DEVICE_A, false);
1377     Query query = Query::Select();
1378     QuerySyncObject querySyncObject(query);
1379     /**
1380      * @tc.steps: step1. Insert one record
1381      */
1382     RuntimeContext::GetInstance()->RecordRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1383     std::map<std::string, std::vector<QuerySyncObject>> subscribeQuery;
1384     RuntimeContext::GetInstance()->GetSubscribeQuery(dbInfo, subscribeQuery);
1385     EXPECT_EQ(subscribeQuery.size(), 1u);
1386     /**
1387      * @tc.steps: step2. Remove one record
1388      */
1389     RuntimeContext::GetInstance()->RemoveRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1390     RuntimeContext::GetInstance()->GetSubscribeQuery(dbInfo, subscribeQuery);
1391     EXPECT_EQ(subscribeQuery[DEVICE_A].size(), 0u);
1392     /**
1393      * @tc.steps: step3. Record again and remove by dbInfo and device
1394      */
1395     RuntimeContext::GetInstance()->RecordRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1396     RuntimeContext::GetInstance()->RemoveRemoteSubscribe(dbInfo, DEVICE_A);
1397     RuntimeContext::GetInstance()->GetSubscribeQuery(dbInfo, subscribeQuery);
1398     EXPECT_EQ(subscribeQuery.size(), 0u);
1399     /**
1400      * @tc.steps: step4. Record again and remove by device
1401      */
1402     RuntimeContext::GetInstance()->RecordRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1403     RuntimeContext::GetInstance()->RemoveRemoteSubscribe(DEVICE_A);
1404     RuntimeContext::GetInstance()->GetSubscribeQuery(dbInfo, subscribeQuery);
1405     EXPECT_EQ(subscribeQuery.size(), 0u);
1406     /**
1407      * @tc.steps: step5. Record again and remove by dbInfo
1408      */
1409     RuntimeContext::GetInstance()->RecordRemoteSubscribe(dbInfo, DEVICE_A, querySyncObject);
1410     RuntimeContext::GetInstance()->RemoveRemoteSubscribe(dbInfo);
1411     RuntimeContext::GetInstance()->GetSubscribeQuery(dbInfo, subscribeQuery);
1412     EXPECT_EQ(subscribeQuery.size(), 0u);
1413     RuntimeConfig::SetDBInfoHandle(nullptr);
1414 }
1415 /**
1416  * @tc.name: SubscribeSync014
1417  * @tc.desc: test device subscribe with put a lot of times
1418  * @tc.type: FUNC
1419  * @tc.require: AR000GOHO7
1420  * @tc.author: zhangqiquan
1421  */
1422 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync014, TestSize.Level3)
1423 {
1424     /**
1425     * @tc.steps: step1. InitSchemaDb
1426     */
1427     LOGI("============step 1============");
1428     InitSubSchemaDb();
1429     std::vector<std::string> devices;
1430     devices.push_back(g_deviceB->GetDeviceId());
1431     /**
1432      * @tc.steps: step2. deviceB unsubscribe inkeys(k1, key6) and prefix key "k" query to deviceA
1433      */
1434     LOGI("============step 2============");
1435     Key key6 { 'k', '6' };
1436     Query query = Query::Select();
1437     g_deviceB->Online();
1438     g_deviceB->Subscribe(QuerySyncObject(query), true, 1);
1439     /**
1440      * @tc.steps: step3. deviceA put a lot of time
1441      * @tc.expected: step3 put performance was not effected by subscribe
1442      */
1443     LOGI("============step 4============");
1444     std::vector<Key> dataKeys;
1445     const uint64_t PUT_LIMIT_30S = 30 * 1000000; // 30s = 30 * 1000000us
1446     LOGD("BEGIN PUT");
1447     for (uint8_t i = 0u; i < 10u; ++i) { // loop 10 times
1448         Key key = { i };
1449         dataKeys.push_back(key);
1450         uint64_t curTime = 0;
1451         uint64_t lastTime = 0;
1452         EXPECT_EQ(OS::GetCurrentSysTimeInMicrosecond(curTime), E_OK);
1453         lastTime = curTime;
1454         EXPECT_EQ(g_schemaKvDelegatePtr->Put(key, Value(SCHEMA_VALUE1.begin(), SCHEMA_VALUE1.end())), OK);
1455         EXPECT_EQ(OS::GetCurrentSysTimeInMicrosecond(curTime), E_OK);
1456         EXPECT_LE(curTime - lastTime, PUT_LIMIT_30S);
1457     }
1458     LOGD("END PUT");
1459 }
1460 
1461 /**
1462  * @tc.name: subscribeSync015
1463  * @tc.desc: test subscribe query with range
1464  * @tc.type: FUNC
1465  * @tc.require: DTS2023112110763
1466  * @tc.author: mazhao
1467  */
1468 HWTEST_F(DistributedDBSingleVerP2PSubscribeSyncTest, SubscribeSync015, TestSize.Level1)
1469 {
1470     /**
1471      * @tc.steps: step1. InitSchemaDb
1472     */
1473     LOGI("============step 1============");
1474     InitSubSchemaDb();
1475     std::vector<std::string> devices = {"DEVICE_B"};
1476 
1477     /**
1478      * @tc.steps: step2. deviceA subscribe query by Range.
1479      * * @tc.expected: step2. interface return not support
1480     */
1481     Query query = Query::Select().Range({}, {});
1482     EXPECT_EQ(g_schemaKvDelegatePtr->SubscribeRemoteQuery(devices, nullptr, query, true), NOT_SUPPORT);
1483     EXPECT_EQ(g_schemaKvDelegatePtr->UnSubscribeRemoteQuery(devices, nullptr, query, true), NOT_SUPPORT);
1484 }
1485 } // namespace