1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "published_data_subscriber_manager.h"
17
18 #include <cinttypes>
19
20 #include "data_proxy_observer_stub.h"
21 #include "datashare_log.h"
22 #include "datashare_string_utils.h"
23
24 namespace OHOS {
25 namespace DataShare {
AddObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId,const PublishedDataCallback & callback)26 std::vector<OperationResult> PublishedDataSubscriberManager::AddObservers(void *subscriber,
27 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId,
28 const PublishedDataCallback &callback)
29 {
30 if (proxy == nullptr) {
31 LOG_ERROR("proxy is nullptr");
32 return std::vector<OperationResult>();
33 }
34 std::vector<Key> keys;
35 std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
36 keys.emplace_back(uri, subscriberId);
37 });
38 return BaseCallbacks::AddObservers(
39 keys, subscriber, std::make_shared<Observer>(callback),
40 [this](const std::vector<Key> &localRegisterKeys, const std::shared_ptr<Observer> observer) {
41 Emit(localRegisterKeys, observer);
42 },
43 [&proxy, subscriber, &subscriberId, this](const std::vector<Key> &firstAddKeys,
44 const std::shared_ptr<Observer> observer, std::vector<OperationResult> &opResult) {
45 std::vector<std::string> firstAddUris;
46 std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
47 firstAddUris.emplace_back(result);
48 });
49 if (firstAddUris.empty()) {
50 return;
51 }
52
53 Init();
54 auto subResults = proxy->SubscribePublishedData(firstAddUris, subscriberId, serviceCallback_);
55 std::vector<Key> failedKeys;
56 for (auto &subResult : subResults) {
57 opResult.emplace_back(subResult);
58 if (subResult.errCode_ != E_OK) {
59 failedKeys.emplace_back(subResult.key_, subscriberId);
60 LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
61 }
62 }
63 if (failedKeys.size() > 0) {
64 BaseCallbacks::DelObservers(failedKeys, subscriber);
65 }
66 Destroy();
67 });
68 }
69
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy)70 std::vector<OperationResult> PublishedDataSubscriberManager::DelObservers(void *subscriber,
71 std::shared_ptr<DataShareServiceProxy> proxy)
72 {
73 if (proxy == nullptr) {
74 LOG_ERROR("proxy is nullptr");
75 return std::vector<OperationResult>();
76 }
77 return BaseCallbacks::DelObservers(subscriber,
78 [&proxy, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
79 // delete all obs by subscriber
80 std::map<int64_t, std::vector<std::string>> keysMap;
81 for (auto const &key : lastDelKeys) {
82 lastChangeNodeMap_.Erase(key);
83 keysMap[key.subscriberId_].emplace_back(key.uri_);
84 }
85 for (auto const &[subscriberId, uris] : keysMap) {
86 auto results = proxy->UnSubscribePublishedData(uris, subscriberId);
87 opResult.insert(opResult.end(), results.begin(), results.end());
88 }
89 Destroy();
90 });
91 }
92
DelObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId)93 std::vector<OperationResult> PublishedDataSubscriberManager::DelObservers(void *subscriber,
94 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId)
95 {
96 if (proxy == nullptr) {
97 LOG_ERROR("proxy is nullptr");
98 return std::vector<OperationResult>();
99 }
100 if (uris.empty()) {
101 return DelObservers(subscriber, proxy);
102 }
103
104 std::vector<Key> keys;
105 std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
106 keys.emplace_back(uri, subscriberId);
107 });
108 return BaseCallbacks::DelObservers(keys, subscriber,
109 [&proxy, &subscriberId, this](const std::vector<Key> &lastDelKeys, std::vector<OperationResult> &opResult) {
110 std::vector<std::string> lastDelUris;
111 std::for_each(lastDelKeys.begin(), lastDelKeys.end(), [&lastDelUris, this](auto &result) {
112 lastChangeNodeMap_.Erase(result);
113 lastDelUris.emplace_back(result);
114 });
115 if (lastDelUris.empty()) {
116 return;
117 }
118 auto unsubResult = proxy->UnSubscribePublishedData(lastDelUris, subscriberId);
119 opResult.insert(opResult.end(), unsubResult.begin(), unsubResult.end());
120 Destroy();
121 });
122 }
123
EnableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId)124 std::vector<OperationResult> PublishedDataSubscriberManager::EnableObservers(void *subscriber,
125 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId)
126 {
127 if (proxy == nullptr) {
128 LOG_ERROR("proxy is nullptr");
129 return std::vector<OperationResult>();
130 }
131 std::vector<Key> keys;
132 std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
133 keys.emplace_back(uri, subscriberId);
134 });
135 return BaseCallbacks::EnableObservers(
136 keys, subscriber, [this](std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap) {
137 EmitOnEnable(obsMap);
138 },
139 [&proxy, &subscriberId, subscriber, this](const std::vector<Key> &firstAddKeys,
140 std::vector<OperationResult> &opResult) {
141 std::vector<std::string> firstAddUris;
142 std::for_each(firstAddKeys.begin(), firstAddKeys.end(), [&firstAddUris](auto &result) {
143 firstAddUris.emplace_back(result);
144 });
145 if (firstAddUris.empty()) {
146 return;
147 }
148 auto subResults = proxy->EnableSubscribePublishedData(firstAddUris, subscriberId);
149 std::vector<Key> failedKeys;
150 for (auto &subResult : subResults) {
151 opResult.emplace_back(subResult);
152 if (subResult.errCode_ != E_OK) {
153 failedKeys.emplace_back(subResult.key_, subscriberId);
154 LOG_WARN("registered failed, uri is %{public}s", subResult.key_.c_str());
155 }
156 }
157 if (failedKeys.size() > 0) {
158 BaseCallbacks::DisableObservers(failedKeys, subscriber);
159 }
160 });
161 }
162
DisableObservers(void * subscriber,std::shared_ptr<DataShareServiceProxy> proxy,const std::vector<std::string> & uris,int64_t subscriberId)163 std::vector<OperationResult> PublishedDataSubscriberManager::DisableObservers(void *subscriber,
164 std::shared_ptr<DataShareServiceProxy> proxy, const std::vector<std::string> &uris, int64_t subscriberId)
165 {
166 if (proxy == nullptr) {
167 LOG_ERROR("proxy is nullptr");
168 return std::vector<OperationResult>();
169 }
170 std::vector<Key> keys;
171 std::for_each(uris.begin(), uris.end(), [&keys, &subscriberId](auto &uri) {
172 keys.emplace_back(uri, subscriberId);
173 });
174 return BaseCallbacks::DisableObservers(keys, subscriber,
175 [&proxy, &subscriberId, this](const std::vector<Key> &lastDisabledKeys,
176 std::vector<OperationResult> &opResult) {
177 std::vector<std::string> lastDisabledUris;
178 std::for_each(lastDisabledKeys.begin(), lastDisabledKeys.end(), [&lastDisabledUris](auto &result) {
179 lastDisabledUris.emplace_back(result);
180 });
181 if (lastDisabledUris.empty()) {
182 return;
183 }
184
185 auto results = proxy->DisableSubscribePublishedData(lastDisabledUris, subscriberId);
186 opResult.insert(opResult.end(), results.begin(), results.end());
187 });
188 }
189
RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)190 void PublishedDataSubscriberManager::RecoverObservers(std::shared_ptr<DataShareServiceProxy> proxy)
191 {
192 if (proxy == nullptr) {
193 LOG_ERROR("proxy is nullptr");
194 return;
195 }
196
197 std::map<int64_t, std::vector<std::string>> keysMap;
198 std::vector<Key> keys = CallbacksManager::GetKeys();
199 for (const auto& key : keys) {
200 keysMap[key.subscriberId_].emplace_back(key.uri_);
201 }
202 for (const auto &[subscriberId, uris] : keysMap) {
203 auto results = proxy->SubscribePublishedData(uris, subscriberId, serviceCallback_);
204 for (const auto& result : results) {
205 if (result.errCode_ != E_OK) {
206 LOG_WARN("RecoverObservers failed, uri is %{public}s, errCode is %{public}d",
207 result.key_.c_str(), result.errCode_);
208 }
209 }
210 }
211 }
212
Emit(PublishedDataChangeNode & changeNode)213 void PublishedDataSubscriberManager::Emit(PublishedDataChangeNode &changeNode)
214 {
215 for (auto &data : changeNode.datas_) {
216 Key key(data.key_, data.subscriberId_);
217 lastChangeNodeMap_.Compute(key, [](const Key &, PublishedDataChangeNode &value) {
218 value.datas_.clear();
219 return true;
220 });
221 }
222 std::map<std::shared_ptr<Observer>, PublishedDataChangeNode> results;
223 for (auto &data : changeNode.datas_) {
224 PublishedObserverMapKey key(data.key_, data.subscriberId_);
225 auto callbacks = BaseCallbacks::GetEnabledObservers(key);
226 if (callbacks.empty()) {
227 LOG_WARN("%{private}s nobody subscribe, but still notify", data.key_.c_str());
228 continue;
229 }
230 lastChangeNodeMap_.Compute(key, [&data, &changeNode](const Key &, PublishedDataChangeNode &value) {
231 value.datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
232 value.ownerBundleName_ = changeNode.ownerBundleName_;
233 return true;
234 });
235 for (auto const &obs : callbacks) {
236 results[obs].datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
237 }
238 BaseCallbacks::SetObserversNotifiedOnEnabled(key);
239 }
240 for (auto &[callback, node] : results) {
241 node.ownerBundleName_ = changeNode.ownerBundleName_;
242 callback->OnChange(node);
243 }
244 }
245
Emit(const std::vector<Key> & keys,const std::shared_ptr<Observer> & observer)246 void PublishedDataSubscriberManager::Emit(const std::vector<Key> &keys, const std::shared_ptr<Observer> &observer)
247 {
248 PublishedDataChangeNode node;
249 for (auto &key : keys) {
250 lastChangeNodeMap_.ComputeIfPresent(key, [&node](const Key &, PublishedDataChangeNode &value) {
251 for (auto &data : value.datas_) {
252 node.datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
253 }
254 node.ownerBundleName_ = value.ownerBundleName_;
255 return true;
256 });
257 }
258 observer->OnChange(node);
259 }
260
EmitOnEnable(std::map<Key,std::vector<ObserverNodeOnEnabled>> & obsMap)261 void PublishedDataSubscriberManager::EmitOnEnable(std::map<Key, std::vector<ObserverNodeOnEnabled>> &obsMap)
262 {
263 std::map<std::shared_ptr<Observer>, PublishedDataChangeNode> results;
264 for (auto &[key, obsVector] : obsMap) {
265 uint32_t num = 0;
266 lastChangeNodeMap_.ComputeIfPresent(key, [obsVector = obsVector, &results, &num](const Key &,
267 PublishedDataChangeNode &value) {
268 for (auto &data : value.datas_) {
269 PublishedObserverMapKey mapKey(data.key_, data.subscriberId_);
270 for (auto &obs : obsVector) {
271 if (obs.isNotifyOnEnabled_) {
272 num++;
273 results[obs.observer_].datas_.emplace_back(data.key_, data.subscriberId_, data.GetData());
274 results[obs.observer_].ownerBundleName_ = value.ownerBundleName_;
275 }
276 }
277 }
278 return true;
279 });
280 if (num > 0) {
281 LOG_INFO("%{public}u will refresh, total %{public}zu, uri %{public}s, subscribeId %{public}" PRId64,
282 num, obsVector.size(), DataShareStringUtils::Anonymous(key.uri_).c_str(), key.subscriberId_);
283 }
284 }
285 for (auto &[callback, node] : results) {
286 callback->OnChange(node);
287 }
288 }
289
Init()290 bool PublishedDataSubscriberManager::Init()
291 {
292 if (serviceCallback_ == nullptr) {
293 LOG_DEBUG("callback init");
294 serviceCallback_ = new PublishedDataObserverStub([this](PublishedDataChangeNode &changeNode) {
295 Emit(changeNode);
296 });
297 }
298 return true;
299 }
300
Destroy()301 void PublishedDataSubscriberManager::Destroy()
302 {
303 if (BaseCallbacks::GetAllSubscriberSize() == 0) {
304 if (serviceCallback_ != nullptr) {
305 serviceCallback_->ClearCallback();
306 }
307 LOG_INFO("no valid subscriber, delete callback");
308 serviceCallback_ = nullptr;
309 }
310 }
311
GetInstance()312 PublishedDataSubscriberManager &PublishedDataSubscriberManager::GetInstance()
313 {
314 static PublishedDataSubscriberManager manager;
315 return manager;
316 }
317
PublishedDataSubscriberManager()318 PublishedDataSubscriberManager::PublishedDataSubscriberManager()
319 {
320 serviceCallback_ = nullptr;
321 }
322
PublishedDataObserver(const PublishedDataCallback & callback)323 PublishedDataObserver::PublishedDataObserver(const PublishedDataCallback &callback) : callback_(callback) {}
324
OnChange(PublishedDataChangeNode & changeNode)325 void PublishedDataObserver::OnChange(PublishedDataChangeNode &changeNode)
326 {
327 callback_(changeNode);
328 }
329
operator ==(const PublishedDataObserver & rhs) const330 bool PublishedDataObserver::operator==(const PublishedDataObserver &rhs) const
331 {
332 return false;
333 }
334
operator !=(const PublishedDataObserver & rhs) const335 bool PublishedDataObserver::operator!=(const PublishedDataObserver &rhs) const
336 {
337 return !(rhs == *this);
338 }
339 } // namespace DataShare
340 } // namespace OHOS
341