1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "subscribe_recorder.h"
16 
17 namespace DistributedDB {
RecordSubscribe(const DistributedDB::DBInfo & dbInfo,const DistributedDB::DeviceID & deviceId,const DistributedDB::QuerySyncObject & query)18 void SubscribeRecorder::RecordSubscribe(const DistributedDB::DBInfo &dbInfo, const DistributedDB::DeviceID &deviceId,
19     const DistributedDB::QuerySyncObject &query)
20 {
21     std::lock_guard<std::mutex> autoLock(subscribeMutex_);
22     const auto &findSubscribe = std::find_if(subscribeCache_.begin(), subscribeCache_.end(),
23     [&dbInfo](const SubscribeEntry &entry) {
24         return entry.dbInfo.userId == dbInfo.userId && entry.dbInfo.appId == dbInfo.appId &&
25             entry.dbInfo.storeId == dbInfo.storeId;
26     });
27     if (findSubscribe != subscribeCache_.end()) {
28         findSubscribe->subscribeQuery[deviceId].push_back(query);
29         return;
30     }
31     SubscribeEntry entry;
32     entry.dbInfo = dbInfo;
33     entry.subscribeQuery[deviceId].push_back(query);
34     subscribeCache_.push_back(entry);
35 }
36 
RemoveAllSubscribe()37 void SubscribeRecorder::RemoveAllSubscribe()
38 {
39     std::lock_guard<std::mutex> autoLock(subscribeMutex_);
40     subscribeCache_.clear();
41 }
42 
RemoveRemoteSubscribe(const DistributedDB::DeviceID & deviceId)43 void SubscribeRecorder::RemoveRemoteSubscribe(const DistributedDB::DeviceID &deviceId)
44 {
45     std::lock_guard<std::mutex> autoLock(subscribeMutex_);
46     for (auto &entry: subscribeCache_) {
47         entry.subscribeQuery.erase(deviceId);
48     }
49 }
50 
RemoveRemoteSubscribe(const DBInfo & dbInfo)51 void SubscribeRecorder::RemoveRemoteSubscribe(const DBInfo &dbInfo)
52 {
53     const auto &findSubscribe = std::find_if(subscribeCache_.begin(), subscribeCache_.end(),
54         [&dbInfo](const SubscribeEntry &entry) {
55             return CheckSameDBInfo(dbInfo, entry.dbInfo);
56         });
57     if (findSubscribe == subscribeCache_.end()) {
58         return;
59     }
60     subscribeCache_.erase(findSubscribe);
61 }
62 
RemoveRemoteSubscribe(const DistributedDB::DBInfo & dbInfo,const DistributedDB::DeviceID & deviceId)63 void SubscribeRecorder::RemoveRemoteSubscribe(const DistributedDB::DBInfo &dbInfo,
64     const DistributedDB::DeviceID &deviceId)
65 {
66     std::lock_guard<std::mutex> autoLock(subscribeMutex_);
67     RemoveSubscribeQueries(dbInfo, deviceId);
68 }
69 
RemoveRemoteSubscribe(const DistributedDB::DBInfo & dbInfo,const DistributedDB::DeviceID & deviceId,const DistributedDB::QuerySyncObject & query)70 void SubscribeRecorder::RemoveRemoteSubscribe(const DistributedDB::DBInfo &dbInfo,
71     const DistributedDB::DeviceID &deviceId, const DistributedDB::QuerySyncObject &query)
72 {
73     std::lock_guard<std::mutex> autoLock(subscribeMutex_);
74     RemoveSubscribeQuery(dbInfo, deviceId, query.GetIdentify());
75 }
76 
GetSubscribeQuery(const DistributedDB::DBInfo & dbInfo,std::map<std::string,std::vector<QuerySyncObject>> & subscribeQuery) const77 void SubscribeRecorder::GetSubscribeQuery(const DistributedDB::DBInfo &dbInfo,
78     std::map<std::string, std::vector<QuerySyncObject>> &subscribeQuery) const
79 {
80     std::lock_guard<std::mutex> autoLock(subscribeMutex_);
81     const auto &findSubscribe = std::find_if(subscribeCache_.begin(), subscribeCache_.end(),
82     [&dbInfo](const SubscribeEntry &entry) {
83         return entry.dbInfo.userId == dbInfo.userId && entry.dbInfo.appId == dbInfo.appId &&
84                entry.dbInfo.storeId == dbInfo.storeId;
85     });
86     if (findSubscribe != subscribeCache_.end()) {
87         subscribeQuery = findSubscribe->subscribeQuery;
88     }
89 }
90 
RemoveSubscribeQueries(const DBInfo & dbInfo,const DeviceID & deviceId)91 void SubscribeRecorder::RemoveSubscribeQueries(const DBInfo &dbInfo, const DeviceID &deviceId)
92 {
93     const auto &findSubscribe = std::find_if(subscribeCache_.begin(), subscribeCache_.end(),
94         [&dbInfo](const SubscribeEntry &entry) {
95             return CheckSameDBInfo(dbInfo, entry.dbInfo);
96         });
97     if (findSubscribe == subscribeCache_.end()) {
98         return;
99     }
100     findSubscribe->subscribeQuery.erase(deviceId);
101 }
102 
RemoveSubscribeQuery(const DBInfo & dbInfo,const DeviceID & deviceId,const std::string & queryId)103 void SubscribeRecorder::RemoveSubscribeQuery(const DBInfo &dbInfo, const DeviceID &deviceId,
104     const std::string &queryId)
105 {
106     const auto &findSubscribe = std::find_if(subscribeCache_.begin(), subscribeCache_.end(),
107         [&dbInfo](const SubscribeEntry &entry) {
108             return CheckSameDBInfo(dbInfo, entry.dbInfo);
109         });
110     if (findSubscribe == subscribeCache_.end()) {
111         return;
112     }
113     const auto &queryEntry = findSubscribe->subscribeQuery.find(deviceId);
114     if (queryEntry == findSubscribe->subscribeQuery.end()) {
115         return;
116     }
117     auto removeQuery = std::remove_if(
118         queryEntry->second.begin(), queryEntry->second.end(), [&queryId](const QuerySyncObject &checkQuery) {
119         return checkQuery.GetIdentify() == queryId;
120     });
121     queryEntry->second.erase(removeQuery, queryEntry->second.end());
122 }
123 
CheckSameDBInfo(const DBInfo & srcDbInfo,const DBInfo & dtsDbInfo)124 bool SubscribeRecorder::CheckSameDBInfo(const DBInfo &srcDbInfo, const DBInfo &dtsDbInfo)
125 {
126     return srcDbInfo.userId == dtsDbInfo.userId && srcDbInfo.appId == dtsDbInfo.appId &&
127         srcDbInfo.storeId == dtsDbInfo.storeId;
128 }
129 }