1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "subscribe_recorder.h"
16
17 namespace DistributedDB {
RecordSubscribe(const DistributedDB::DBInfo & dbInfo,const DistributedDB::DeviceID & deviceId,const DistributedDB::QuerySyncObject & query)18 void SubscribeRecorder::RecordSubscribe(const DistributedDB::DBInfo &dbInfo, const DistributedDB::DeviceID &deviceId,
19 const DistributedDB::QuerySyncObject &query)
20 {
21 std::lock_guard<std::mutex> autoLock(subscribeMutex_);
22 const auto &findSubscribe = std::find_if(subscribeCache_.begin(), subscribeCache_.end(),
23 [&dbInfo](const SubscribeEntry &entry) {
24 return entry.dbInfo.userId == dbInfo.userId && entry.dbInfo.appId == dbInfo.appId &&
25 entry.dbInfo.storeId == dbInfo.storeId;
26 });
27 if (findSubscribe != subscribeCache_.end()) {
28 findSubscribe->subscribeQuery[deviceId].push_back(query);
29 return;
30 }
31 SubscribeEntry entry;
32 entry.dbInfo = dbInfo;
33 entry.subscribeQuery[deviceId].push_back(query);
34 subscribeCache_.push_back(entry);
35 }
36
RemoveAllSubscribe()37 void SubscribeRecorder::RemoveAllSubscribe()
38 {
39 std::lock_guard<std::mutex> autoLock(subscribeMutex_);
40 subscribeCache_.clear();
41 }
42
RemoveRemoteSubscribe(const DistributedDB::DeviceID & deviceId)43 void SubscribeRecorder::RemoveRemoteSubscribe(const DistributedDB::DeviceID &deviceId)
44 {
45 std::lock_guard<std::mutex> autoLock(subscribeMutex_);
46 for (auto &entry: subscribeCache_) {
47 entry.subscribeQuery.erase(deviceId);
48 }
49 }
50
RemoveRemoteSubscribe(const DBInfo & dbInfo)51 void SubscribeRecorder::RemoveRemoteSubscribe(const DBInfo &dbInfo)
52 {
53 const auto &findSubscribe = std::find_if(subscribeCache_.begin(), subscribeCache_.end(),
54 [&dbInfo](const SubscribeEntry &entry) {
55 return CheckSameDBInfo(dbInfo, entry.dbInfo);
56 });
57 if (findSubscribe == subscribeCache_.end()) {
58 return;
59 }
60 subscribeCache_.erase(findSubscribe);
61 }
62
RemoveRemoteSubscribe(const DistributedDB::DBInfo & dbInfo,const DistributedDB::DeviceID & deviceId)63 void SubscribeRecorder::RemoveRemoteSubscribe(const DistributedDB::DBInfo &dbInfo,
64 const DistributedDB::DeviceID &deviceId)
65 {
66 std::lock_guard<std::mutex> autoLock(subscribeMutex_);
67 RemoveSubscribeQueries(dbInfo, deviceId);
68 }
69
RemoveRemoteSubscribe(const DistributedDB::DBInfo & dbInfo,const DistributedDB::DeviceID & deviceId,const DistributedDB::QuerySyncObject & query)70 void SubscribeRecorder::RemoveRemoteSubscribe(const DistributedDB::DBInfo &dbInfo,
71 const DistributedDB::DeviceID &deviceId, const DistributedDB::QuerySyncObject &query)
72 {
73 std::lock_guard<std::mutex> autoLock(subscribeMutex_);
74 RemoveSubscribeQuery(dbInfo, deviceId, query.GetIdentify());
75 }
76
GetSubscribeQuery(const DistributedDB::DBInfo & dbInfo,std::map<std::string,std::vector<QuerySyncObject>> & subscribeQuery) const77 void SubscribeRecorder::GetSubscribeQuery(const DistributedDB::DBInfo &dbInfo,
78 std::map<std::string, std::vector<QuerySyncObject>> &subscribeQuery) const
79 {
80 std::lock_guard<std::mutex> autoLock(subscribeMutex_);
81 const auto &findSubscribe = std::find_if(subscribeCache_.begin(), subscribeCache_.end(),
82 [&dbInfo](const SubscribeEntry &entry) {
83 return entry.dbInfo.userId == dbInfo.userId && entry.dbInfo.appId == dbInfo.appId &&
84 entry.dbInfo.storeId == dbInfo.storeId;
85 });
86 if (findSubscribe != subscribeCache_.end()) {
87 subscribeQuery = findSubscribe->subscribeQuery;
88 }
89 }
90
RemoveSubscribeQueries(const DBInfo & dbInfo,const DeviceID & deviceId)91 void SubscribeRecorder::RemoveSubscribeQueries(const DBInfo &dbInfo, const DeviceID &deviceId)
92 {
93 const auto &findSubscribe = std::find_if(subscribeCache_.begin(), subscribeCache_.end(),
94 [&dbInfo](const SubscribeEntry &entry) {
95 return CheckSameDBInfo(dbInfo, entry.dbInfo);
96 });
97 if (findSubscribe == subscribeCache_.end()) {
98 return;
99 }
100 findSubscribe->subscribeQuery.erase(deviceId);
101 }
102
RemoveSubscribeQuery(const DBInfo & dbInfo,const DeviceID & deviceId,const std::string & queryId)103 void SubscribeRecorder::RemoveSubscribeQuery(const DBInfo &dbInfo, const DeviceID &deviceId,
104 const std::string &queryId)
105 {
106 const auto &findSubscribe = std::find_if(subscribeCache_.begin(), subscribeCache_.end(),
107 [&dbInfo](const SubscribeEntry &entry) {
108 return CheckSameDBInfo(dbInfo, entry.dbInfo);
109 });
110 if (findSubscribe == subscribeCache_.end()) {
111 return;
112 }
113 const auto &queryEntry = findSubscribe->subscribeQuery.find(deviceId);
114 if (queryEntry == findSubscribe->subscribeQuery.end()) {
115 return;
116 }
117 auto removeQuery = std::remove_if(
118 queryEntry->second.begin(), queryEntry->second.end(), [&queryId](const QuerySyncObject &checkQuery) {
119 return checkQuery.GetIdentify() == queryId;
120 });
121 queryEntry->second.erase(removeQuery, queryEntry->second.end());
122 }
123
CheckSameDBInfo(const DBInfo & srcDbInfo,const DBInfo & dtsDbInfo)124 bool SubscribeRecorder::CheckSameDBInfo(const DBInfo &srcDbInfo, const DBInfo &dtsDbInfo)
125 {
126 return srcDbInfo.userId == dtsDbInfo.userId && srcDbInfo.appId == dtsDbInfo.appId &&
127 srcDbInfo.storeId == dtsDbInfo.storeId;
128 }
129 }