1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "published_data_subscriber_manager.h"
17 
18 #include <cinttypes>
19 
20 #include "data_proxy_observer_stub.h"
21 #include "datashare_log.h"
22 #include "datashare_string_utils.h"
23 
24 namespace OHOS {
25 namespace DataShare {
AddObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId,const PublishedDataCallback & callback)26 std::vector<OperationResult> PublishedDataSubscriberManager::AddObservers(void *subscriber,
27     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId,
28     const PublishedDataCallback &callback)
29 {
30     if (proxy == nullptr) {
31         LOG_ERROR("proxy is nullptr");
32         return std::vector<OperationResult>();
33     }
34     std::vector<Key> keys;
35     std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
36         keys.emplace_back(uri, subscriberId);
37     });
38     return BaseCallbacks::AddObservers(
39         keys, subscriber, std::make_shared<Observer>(callback),
40         [this](const std::vector<Key> &localRegisterKeys, const std::shared_ptr<Observer> observer) {
41             Emit(localRegisterKeys, observer);
42         },
43         [&proxy, subscriber, &subscriberId, this](const std::vector<Key> &firstAddKeys,
44             const std::shared_ptr<Observer> observer, std::vector<OperationResult> &opResult) {
45             std::vector<std::string> firstAddUris;
46             std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
47                 firstAddUris.emplace_back(result);
48             });
49             if (firstAddUris.empty()) {
50                 return;
51             }
52 
53             Init();
54             auto subResults = proxy->SubscribePublishedData(firstAddUris, subscriberId, serviceCallback_);
55             std::vector<Key> failedKeys;
56             for (auto &subResult : subResults) {
57                 opResult.emplace_back(subResult);
58                 if (subResult.errCode_ != E_OK) {
59                     failedKeys.emplace_back(subResult.key_, subscriberId);
60                     LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
61                 }
62             }
63             if (failedKeys.size() > 0) {
64                 BaseCallbacks::DelObservers(failedKeys, subscriber);
65             }
66             Destroy();
67         });
68 }
69 
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy)70 std::vector<OperationResult> PublishedDataSubscriberManager::DelObservers(void *subscriber,
71     std::shared_ptr<DataShareServiceProxy> proxy)
72 {
73     if (proxy == nullptr) {
74         LOG_ERROR("proxy is nullptr");
75         return std::vector<OperationResult>();
76     }
77     return BaseCallbacks::DelObservers(subscriber,
78         [&proxy, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
79             // delete all obs by subscriber
80             std::map<int64_t, std::vector<std::string>> keysMap;
81             for (auto const &key : lastDelKeys) {
82                 lastChangeNodeMap_.Erase(key);
83                 keysMap[key.subscriberId_].emplace_back(key.uri_);
84             }
85             for (auto const &[subscriberId, uris] : keysMap) {
86                 auto results = proxy->UnSubscribePublishedData(uris, subscriberId);
87                 opResult.insert(opResult.end(), results.begin(), results.end());
88             }
89             Destroy();
90         });
91 }
92 
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId)93 std::vector<OperationResult> PublishedDataSubscriberManager::DelObservers(void *subscriber,
94     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId)
95 {
96     if (proxy == nullptr) {
97         LOG_ERROR("proxy is nullptr");
98         return std::vector<OperationResult>();
99     }
100     if (uris.empty()) {
101         return DelObservers(subscriber, proxy);
102     }
103 
104     std::vector<Key> keys;
105     std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
106         keys.emplace_back(uri, subscriberId);
107     });
108     return BaseCallbacks::DelObservers(keys, subscriber,
109         [&proxy, &subscriberId, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
110             std::vector<std::string> lastDelUris;
111             std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [&lastDelUris, this](auto &result) {
112                 lastChangeNodeMap_.Erase(result);
113                 lastDelUris.emplace_back(result);
114             });
115             if (lastDelUris.empty()) {
116                 return;
117             }
118             auto unsubResult = proxy->UnSubscribePublishedData(lastDelUris, subscriberId);
119             opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
120             Destroy();
121         });
122 }
123 
EnableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId)124 std::vector<OperationResult> PublishedDataSubscriberManager::EnableObservers(void *subscriber,
125     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId)
126 {
127     if (proxy == nullptr) {
128         LOG_ERROR("proxy is nullptr");
129         return std::vector<OperationResult>();
130     }
131     std::vector<Key> keys;
132     std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
133         keys.emplace_back(uri, subscriberId);
134     });
135     return BaseCallbacks::EnableObservers(
136         keys, subscriber, [this](std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap) {
137             EmitOnEnable(obsMap);
138         },
139         [&proxy, &subscriberId, subscriber, this](const std::vector<Key> &firstAddKeys,
140         std::vector<OperationResult> &opResult) {
141             std::vector<std::string> firstAddUris;
142             std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
143                 firstAddUris.emplace_back(result);
144             });
145             if (firstAddUris.empty()) {
146                 return;
147             }
148             auto subResults = proxy->EnableSubscribePublishedData(firstAddUris, subscriberId);
149             std::vector<Key> failedKeys;
150             for (auto &subResult : subResults) {
151                 opResult.emplace_back(subResult);
152                 if (subResult.errCode_ != E_OK) {
153                     failedKeys.emplace_back(subResult.key_, subscriberId);
154                     LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
155                 }
156             }
157             if (failedKeys.size() > 0) {
158                 BaseCallbacks::DisableObservers(failedKeys, subscriber);
159             }
160         });
161 }
162 
DisableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId)163 std::vector<OperationResult> PublishedDataSubscriberManager::DisableObservers(void *subscriber,
164     std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId)
165 {
166     if (proxy == nullptr) {
167         LOG_ERROR("proxy is nullptr");
168         return std::vector<OperationResult>();
169     }
170     std::vector<Key> keys;
171     std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
172         keys.emplace_back(uri, subscriberId);
173     });
174     return BaseCallbacks::DisableObservers(keys, subscriber,
175         [&proxy, &subscriberId, this](const std::vector<Key> &lastDisabledKeys,
176         std::vector<OperationResult> &opResult) {
177             std::vector<std::string> lastDisabledUris;
178             std::for_each(lastDisabledKeys.begin(), lastDisabledKeys.end(), [&lastDisabledUris](auto &result) {
179                 lastDisabledUris.emplace_back(result);
180             });
181             if (lastDisabledUris.empty()) {
182                 return;
183             }
184 
185             auto results = proxy->DisableSubscribePublishedData(lastDisabledUris, subscriberId);
186             opResult.insert(opResult.end(), results.begin(), results.end());
187         });
188 }
189 
RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)190 void PublishedDataSubscriberManager::RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)
191 {
192     if (proxy == nullptr) {
193         LOG_ERROR("proxy is nullptr");
194         return;
195     }
196 
197     std::map<int64_t, std::vector<std::string>> keysMap;
198     std::vector<Key> keys = CallbacksManager::GetKeys();
199     for (const auto& key : keys) {
200         keysMap[key.subscriberId_].emplace_back(key.uri_);
201     }
202     for (const auto &[subscriberId, uris] : keysMap) {
203         auto results = proxy->SubscribePublishedData(uris, subscriberId, serviceCallback_);
204         for (const auto& result : results) {
205             if (result.errCode_ != E_OK) {
206                 LOG_WARN("RecoverObservers failed, uri is %{public}s, errCode is %{public}d",
207                          result.key_.c_str(), result.errCode_);
208             }
209         }
210     }
211 }
212 
Emit(PublishedDataChangeNode & changeNode)213 void PublishedDataSubscriberManager::Emit(PublishedDataChangeNode &changeNode)
214 {
215     for (auto &data : changeNode.datas_) {
216         Key key(data.key_, data.subscriberId_);
217         lastChangeNodeMap_.Compute(key, [](const Key &, PublishedDataChangeNode &value) {
218             value.datas_.clear();
219             return true;
220         });
221     }
222     std::map<std::shared_ptr<Observer>, PublishedDataChangeNode> results;
223     for (auto &data : changeNode.datas_) {
224         PublishedObserverMapKey key(data.key_, data.subscriberId_);
225         auto callbacks = BaseCallbacks::GetEnabledObservers(key);
226         if (callbacks.empty()) {
227             LOG_WARN("%{private}s nobody subscribe, but still notify", data.key_.c_str());
228             continue;
229         }
230         lastChangeNodeMap_.Compute(key, [&data, &changeNode](const Key &, PublishedDataChangeNode &value) {
231             value.datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
232             value.ownerBundleName_ = changeNode.ownerBundleName_;
233             return true;
234         });
235         for (auto const &obs : callbacks) {
236             results[obs].datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
237         }
238         BaseCallbacks::SetObserversNotifiedOnEnabled(key);
239     }
240     for (auto &[callback, node] : results) {
241         node.ownerBundleName_ = changeNode.ownerBundleName_;
242         callback->OnChange(node);
243     }
244 }
245 
Emit(const std::vector<Key> & keys,const std::shared_ptr<Observer> & observer)246 void PublishedDataSubscriberManager::Emit(const std::vector<Key> &keys, const std::shared_ptr<Observer> &observer)
247 {
248     PublishedDataChangeNode node;
249     for (auto &key : keys) {
250         lastChangeNodeMap_.ComputeIfPresent(key, [&node](const Key &, PublishedDataChangeNode &value) {
251             for (auto &data : value.datas_) {
252                 node.datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
253             }
254             node.ownerBundleName_ = value.ownerBundleName_;
255             return true;
256         });
257     }
258     observer->OnChange(node);
259 }
260 
EmitOnEnable(std::map<Key,std::vector<ObserverNodeOnEnabled>> & obsMap)261 void PublishedDataSubscriberManager::EmitOnEnable(std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap)
262 {
263     std::map<std::shared_ptr<Observer>, PublishedDataChangeNode> results;
264     for (auto &[key, obsVector] : obsMap) {
265         uint32_t num = 0;
266         lastChangeNodeMap_.ComputeIfPresent(key, [obsVector = obsVector, &results, &num](const Key &,
267             PublishedDataChangeNode &value) {
268             for (auto &data : value.datas_) {
269                 PublishedObserverMapKey mapKey(data.key_, data.subscriberId_);
270                 for (auto &obs : obsVector) {
271                     if (obs.isNotifyOnEnabled_) {
272                         num++;
273                         results[obs.observer_].datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
274                         results[obs.observer_].ownerBundleName_ = value.ownerBundleName_;
275                     }
276                 }
277             }
278             return true;
279         });
280         if (num > 0) {
281             LOG_INFO("%{public}u will refresh, total %{public}zu, uri %{public}s, subscribeId %{public}" PRId64,
282                 num, obsVector.size(), DataShareStringUtils::Anonymous(key.uri_).c_str(), key.subscriberId_);
283         }
284     }
285     for (auto &[callback, node] : results) {
286         callback->OnChange(node);
287     }
288 }
289 
Init()290 bool PublishedDataSubscriberManager::Init()
291 {
292     if (serviceCallback_ == nullptr) {
293         LOG_DEBUG("callback init");
294         serviceCallback_ = new PublishedDataObserverStub([this](PublishedDataChangeNode &changeNode) {
295             Emit(changeNode);
296         });
297     }
298     return true;
299 }
300 
Destroy()301 void PublishedDataSubscriberManager::Destroy()
302 {
303     if (BaseCallbacks::GetAllSubscriberSize() == 0) {
304         if (serviceCallback_ != nullptr) {
305             serviceCallback_->ClearCallback();
306         }
307         LOG_INFO("no valid subscriber, delete callback");
308         serviceCallback_ = nullptr;
309     }
310 }
311 
GetInstance()312 PublishedDataSubscriberManager &PublishedDataSubscriberManager::GetInstance()
313 {
314     static PublishedDataSubscriberManager manager;
315     return manager;
316 }
317 
PublishedDataSubscriberManager()318 PublishedDataSubscriberManager::PublishedDataSubscriberManager()
319 {
320     serviceCallback_ = nullptr;
321 }
322 
PublishedDataObserver(const PublishedDataCallback & callback)323 PublishedDataObserver::PublishedDataObserver(const PublishedDataCallback &callback) : callback_(callback) {}
324 
OnChange(PublishedDataChangeNode & changeNode)325 void PublishedDataObserver::OnChange(PublishedDataChangeNode &changeNode)
326 {
327     callback_(changeNode);
328 }
329 
operator ==(const PublishedDataObserver & rhs) const330 bool PublishedDataObserver::operator==(const PublishedDataObserver &rhs) const
331 {
332     return false;
333 }
334 
operator !=(const PublishedDataObserver & rhs) const335 bool PublishedDataObserver::operator!=(const PublishedDataObserver &rhs) const
336 {
337     return !(rhs == *this);
338 }
339 } // namespace DataShare
340 } // namespace OHOS
341