1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "network_adapter.h"
17 #include "db_constant.h"
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "log_print.h"
21 #include "runtime_context.h"
22
23 namespace DistributedDB {
namespace {
// Process label used when the adapter is default-constructed without an explicit label.
const std::string DEFAULT_PROCESS_LABEL{"Distributeddb_Anonymous_Process"};
// Queue tag handed to RuntimeContext::ScheduleQueuedTask so this adapter's async tasks share one queue.
const std::string SCHEDULE_QUEUE_TAG{"NetworkAdapter"};
} // anonymous namespace
28
NetworkAdapter()29 NetworkAdapter::NetworkAdapter()
30 : processLabel_(DEFAULT_PROCESS_LABEL), processCommunicator_(nullptr)
31 {
32 }
33
NetworkAdapter(const std::string & inProcessLabel)34 NetworkAdapter::NetworkAdapter(const std::string &inProcessLabel)
35 : processLabel_(inProcessLabel), processCommunicator_(nullptr)
36 {
37 }
38
NetworkAdapter(const std::string & inProcessLabel,const std::shared_ptr<IProcessCommunicator> & inCommunicator)39 NetworkAdapter::NetworkAdapter(const std::string &inProcessLabel,
40 const std::shared_ptr<IProcessCommunicator> &inCommunicator)
41 : processLabel_(inProcessLabel), processCommunicator_(inCommunicator)
42 {
43 }
44
~NetworkAdapter()45 NetworkAdapter::~NetworkAdapter()
46 {
47 }
48
StartAdapter()49 int NetworkAdapter::StartAdapter()
50 {
51 LOGI("[NAdapt][Start] Enter, ProcessLabel=%s.", STR_MASK(DBCommon::TransferStringToHex(processLabel_)));
52 if (processLabel_.empty()) {
53 return -E_INVALID_ARGS;
54 }
55 if (!processCommunicator_) {
56 LOGE("[NAdapt][Start] ProcessCommunicator not be designated yet.");
57 return -E_INVALID_ARGS;
58 }
59 DBStatus errCode = processCommunicator_->Start(processLabel_);
60 if (errCode != DBStatus::OK) {
61 LOGE("[NAdapt][Start] Start Fail, errCode=%d.", static_cast<int>(errCode));
62 return -E_PERIPHERAL_INTERFACE_FAIL;
63 }
64 errCode = processCommunicator_->RegOnDataReceive(
65 [this](const DeviceInfos &srcDevInfo, const uint8_t *data, uint32_t length) {
66 OnDataReceiveHandler(srcDevInfo, data, length);
67 });
68 if (errCode != DBStatus::OK) {
69 LOGE("[NAdapt][Start] RegOnDataReceive Fail, errCode=%d.", static_cast<int>(errCode));
70 // DO ROLLBACK
71 errCode = processCommunicator_->Stop();
72 LOGI("[NAdapt][Start] ROLLBACK: Stop errCode=%d.", static_cast<int>(errCode));
73 return -E_PERIPHERAL_INTERFACE_FAIL;
74 }
75 errCode = processCommunicator_->RegOnDeviceChange([this](const DeviceInfos &devInfo, bool isOnline) {
76 OnDeviceChangeHandler(devInfo, isOnline);
77 });
78 if (errCode != DBStatus::OK) {
79 LOGE("[NAdapt][Start] RegOnDeviceChange Fail, errCode=%d.", static_cast<int>(errCode));
80 // DO ROLLBACK
81 errCode = processCommunicator_->RegOnDataReceive(nullptr);
82 LOGI("[NAdapt][Start] ROLLBACK: UnRegOnDataReceive errCode=%d.", static_cast<int>(errCode));
83 errCode = processCommunicator_->Stop();
84 LOGI("[NAdapt][Start] ROLLBACK: Stop errCode=%d.", static_cast<int>(errCode));
85 return -E_PERIPHERAL_INTERFACE_FAIL;
86 }
87 processCommunicator_->RegOnSendAble([this](const DeviceInfos &devInfo, int softBusErrCode) {
88 if (onSendableHandle_ != nullptr) {
89 onSendableHandle_(devInfo.identifier, softBusErrCode);
90 }
91 });
92 // These code is compensation for the probable defect of IProcessCommunicator implementation.
93 // As described in the agreement, for the missed online situation, we search for the online devices at beginning.
94 // OnDeviceChangeHandler is reused to check the existence of peer process.
95 // Since at this point, the CommunicatorAggregator had not been fully initialized,
96 // We need an async task which bring about dependency on the lifecycle of this NetworkAdapter Object.
97 SearchOnlineRemoteDeviceAtStartup();
98 LOGI("[NAdapt][Start] Exit.");
99 return E_OK;
100 }
101
// StartAdapter and StopAdapter are both called internally by ICommunicatorAggregator.
// If StopAdapter is called, StartAdapter must already have been called successfully,
// so processCommunicator_ will not be null.
StopAdapter()105 void NetworkAdapter::StopAdapter()
106 {
107 LOGI("[NAdapt][Stop] Enter, ProcessLabel=%s.", processLabel_.c_str());
108 processCommunicator_->RegOnSendAble(nullptr);
109 DBStatus errCode = processCommunicator_->RegOnDeviceChange(nullptr);
110 if (errCode != DBStatus::OK) {
111 LOGE("[NAdapt][Stop] UnRegOnDeviceChange Fail, errCode=%d.", static_cast<int>(errCode));
112 }
113 errCode = processCommunicator_->RegOnDataReceive(nullptr);
114 if (errCode != DBStatus::OK) {
115 LOGE("[NAdapt][Stop] UnRegOnDataReceive Fail, errCode=%d.", static_cast<int>(errCode));
116 }
117 errCode = processCommunicator_->Stop();
118 if (errCode != DBStatus::OK) {
119 LOGE("[NAdapt][Stop] Stop Fail, errCode=%d.", static_cast<int>(errCode));
120 }
121 // We don't reset the shared_ptr of commProvider here, the release of commProvider is done by deconstruct of adapter
122 // In this way, the adapter can be start again after stop it, since it still hold the an valid commProvider
123 // The async task is dependent on this Object. we have to wait until all async task finished.
124 LOGI("[NAdapt][Stop] Wait all async task done.");
125 std::unique_lock<std::mutex> asyncTaskDoneLock(asyncTaskDoneMutex_);
126 asyncTaskDoneCv_.wait(asyncTaskDoneLock, [this] { return pendingAsyncTaskCount_ <= 0; });
127 LOGI("[NAdapt][Stop] Exit.");
128 }
129
130 namespace {
CheckAndAdjustMtuSize(uint32_t inMtuSize)131 uint32_t CheckAndAdjustMtuSize(uint32_t inMtuSize)
132 {
133 if (inMtuSize < DBConstant::MIN_MTU_SIZE) {
134 return DBConstant::MIN_MTU_SIZE;
135 } else if (inMtuSize > DBConstant::MAX_MTU_SIZE) {
136 return DBConstant::MAX_MTU_SIZE;
137 } else {
138 return (inMtuSize - (inMtuSize % sizeof(uint64_t))); // Octet alignment
139 }
140 }
141
CheckAndAdjustTimeout(uint32_t inTimeout)142 uint32_t CheckAndAdjustTimeout(uint32_t inTimeout)
143 {
144 if (inTimeout < DBConstant::MIN_TIMEOUT) {
145 return DBConstant::MIN_TIMEOUT;
146 } else if (inTimeout > DBConstant::MAX_TIMEOUT) {
147 return DBConstant::MAX_TIMEOUT;
148 } else {
149 return inTimeout;
150 }
151 }
152 }
153
GetMtuSize()154 uint32_t NetworkAdapter::GetMtuSize()
155 {
156 std::lock_guard<std::mutex> mtuSizeLockGuard(mtuSizeMutex_);
157 if (!isMtuSizeValid_) {
158 mtuSize_ = processCommunicator_->GetMtuSize();
159 LOGD("[NAdapt][GetMtu] mtuSize=%" PRIu32 ".", mtuSize_);
160 mtuSize_ = CheckAndAdjustMtuSize(mtuSize_);
161 isMtuSizeValid_ = true;
162 }
163 return mtuSize_;
164 }
165
GetMtuSize(const std::string & target)166 uint32_t NetworkAdapter::GetMtuSize(const std::string &target)
167 {
168 #ifndef OMIT_MTU_CACHE
169 DeviceInfos devInfo;
170 devInfo.identifier = target;
171 uint32_t oriMtuSize = processCommunicator_->GetMtuSize(devInfo);
172 return CheckAndAdjustMtuSize(oriMtuSize);
173 #else
174 std::lock_guard<std::mutex> mtuSizeLockGuard(mtuSizeMutex_);
175 if (devMapMtuSize_.count(target) == 0) {
176 DeviceInfos devInfo;
177 devInfo.identifier = target;
178 uint32_t oriMtuSize = processCommunicator_->GetMtuSize(devInfo);
179 LOGD("[NAdapt][GetMtu] mtuSize=%" PRIu32 " of target=%s{private}.", oriMtuSize, target.c_str());
180 devMapMtuSize_[target] = CheckAndAdjustMtuSize(oriMtuSize);
181 }
182 return devMapMtuSize_[target];
183 #endif
184 }
185
GetTimeout()186 uint32_t NetworkAdapter::GetTimeout()
187 {
188 uint32_t timeout = processCommunicator_->GetTimeout();
189 LOGD("[NAdapt][GetTimeout] timeout_=%" PRIu32 " ms.", timeout);
190 return CheckAndAdjustTimeout(timeout);
191 }
192
GetTimeout(const std::string & target)193 uint32_t NetworkAdapter::GetTimeout(const std::string &target)
194 {
195 DeviceInfos devInfos;
196 devInfos.identifier = target;
197 uint32_t timeout = processCommunicator_->GetTimeout(devInfos);
198 LOGD("[NAdapt][GetTimeout] timeout=%" PRIu32 " ms of target=%s{private}.", timeout, target.c_str());
199 return CheckAndAdjustTimeout(timeout);
200 }
201
GetLocalIdentity(std::string & outTarget)202 int NetworkAdapter::GetLocalIdentity(std::string &outTarget)
203 {
204 std::lock_guard<std::mutex> identityLockGuard(identityMutex_);
205 DeviceInfos devInfo = processCommunicator_->GetLocalDeviceInfos();
206 if (devInfo.identifier.empty()) {
207 LOGE("[NetworkAdapter] Get empty local id");
208 return -E_PERIPHERAL_INTERFACE_FAIL;
209 }
210 if (devInfo.identifier != localIdentity_) {
211 LOGI("[NAdapt][GetLocal] localIdentity=%s{private}.", devInfo.identifier.c_str());
212 }
213 localIdentity_ = devInfo.identifier;
214 outTarget = localIdentity_;
215 return E_OK;
216 }
217
SendBytes(const std::string & dstTarget,const uint8_t * bytes,uint32_t length,uint32_t totalLength)218 int NetworkAdapter::SendBytes(const std::string &dstTarget, const uint8_t *bytes, uint32_t length, uint32_t totalLength)
219 {
220 if (bytes == nullptr || length == 0) {
221 return -E_INVALID_ARGS;
222 }
223 LOGI("[NAdapt][SendBytes] Enter, to=%s{private}, length=%u, totalLength=%u", dstTarget.c_str(), length,
224 totalLength);
225 DeviceInfos dstDevInfo;
226 dstDevInfo.identifier = dstTarget;
227 DBStatus errCode = processCommunicator_->SendData(dstDevInfo, bytes, length,
228 totalLength > length ? totalLength : length);
229 if (errCode == DBStatus::RATE_LIMIT) {
230 LOGD("[NAdapt][SendBytes] rate limit!");
231 return -E_WAIT_RETRY;
232 }
233 if (errCode != DBStatus::OK) {
234 LOGE("[NAdapt][SendBytes] SendData Fail, errCode=%d.", static_cast<int>(errCode));
235 // These code is compensation for the probable defect of IProcessCommunicator implementation.
236 // As described in the agreement, for the missed offline situation, we check if still online at send fail.
237 // OnDeviceChangeHandler is reused but check the existence of peer process is done outerly.
238 // Since this thread is the sending_thread of the CommunicatorAggregator,
239 // We need an async task which bring about dependency on the lifecycle of this NetworkAdapter Object.
240 CheckDeviceOfflineAfterSendFail(dstDevInfo);
241 return static_cast<int>(errCode);
242 }
243 return E_OK;
244 }
245
RegBytesReceiveCallback(const BytesReceiveCallback & onReceive,const Finalizer & inOper)246 int NetworkAdapter::RegBytesReceiveCallback(const BytesReceiveCallback &onReceive, const Finalizer &inOper)
247 {
248 std::lock_guard<std::mutex> onReceiveLockGard(onReceiveMutex_);
249 return RegCallBack(onReceive, onReceiveHandle_, inOper, onReceiveFinalizer_);
250 }
251
RegTargetChangeCallback(const TargetChangeCallback & onChange,const Finalizer & inOper)252 int NetworkAdapter::RegTargetChangeCallback(const TargetChangeCallback &onChange, const Finalizer &inOper)
253 {
254 std::lock_guard<std::mutex> onChangeLockGard(onChangeMutex_);
255 return RegCallBack(onChange, onChangeHandle_, inOper, onChangeFinalizer_);
256 }
257
RegSendableCallback(const SendableCallback & onSendable,const Finalizer & inOper)258 int NetworkAdapter::RegSendableCallback(const SendableCallback &onSendable, const Finalizer &inOper)
259 {
260 std::lock_guard<std::mutex> onSendableLockGard(onSendableMutex_);
261 return RegCallBack(onSendable, onSendableHandle_, inOper, onSendableFinalizer_);
262 }
263
OnDataReceiveHandler(const DeviceInfos & srcDevInfo,const uint8_t * data,uint32_t length)264 void NetworkAdapter::OnDataReceiveHandler(const DeviceInfos &srcDevInfo, const uint8_t *data, uint32_t length)
265 {
266 if (data == nullptr || length == 0) {
267 LOGE("[NAdapt][OnDataRecv] data nullptr or length = %u.", length);
268 return;
269 }
270 uint32_t headLength = 0;
271 std::vector<std::string> userId;
272 DBStatus errCode = processCommunicator_->CheckAndGetDataHeadInfo(data, length, headLength, userId);
273 LOGI("[NAdapt][OnDataRecv] Enter, from=%s{private}, extendHeadLength=%u, totalLength=%u",
274 srcDevInfo.identifier.c_str(), headLength, length);
275 if (errCode == NO_PERMISSION) {
276 LOGI("[NAdapt][OnDataRecv] userId dismatched, drop packet");
277 return;
278 }
279 if (headLength >= length) {
280 LOGW("[NAdapt][OnDataRecv] head len is too big, drop packet");
281 return;
282 }
283 {
284 std::lock_guard<std::mutex> onReceiveLockGard(onReceiveMutex_);
285 if (!onReceiveHandle_) {
286 LOGE("[NAdapt][OnDataRecv] onReceiveHandle invalid.");
287 return;
288 }
289 std::string currentUserId;
290 if (userId.size() >= 1) {
291 currentUserId = userId[0];
292 }
293 onReceiveHandle_(srcDevInfo.identifier, data + headLength, length - headLength, currentUserId);
294 }
295 // These code is compensation for the probable defect of IProcessCommunicator implementation.
296 // As described in the agreement, for the missed online situation, we check the source dev when received.
297 // OnDeviceChangeHandler is reused to check the existence of peer process.
298 // Since this thread is the callback_thread of IProcessCommunicator, we do this check task directly in this thread.
299 CheckDeviceOnlineAfterReception(srcDevInfo);
300 }
301
OnDeviceChangeHandler(const DeviceInfos & devInfo,bool isOnline)302 void NetworkAdapter::OnDeviceChangeHandler(const DeviceInfos &devInfo, bool isOnline)
303 {
304 LOGI("[NAdapt][OnDeviceChange] Enter, dev=%s{private}, isOnline=%d", devInfo.identifier.c_str(), isOnline);
305 // These code is compensation for the probable defect of IProcessCommunicator implementation.
306 // As described in the agreement, for the mistake online situation, we check the existence of peer process.
307 // The IProcessCommunicator implementation guarantee that no mistake offline will happen.
308 if (isOnline) {
309 if (!processCommunicator_->IsSameProcessLabelStartedOnPeerDevice(devInfo)) {
310 LOGI("[NAdapt][OnDeviceChange] Detect Not Really Online.");
311 std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
312 onlineRemoteDev_.erase(devInfo.identifier);
313 return;
314 }
315 std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
316 onlineRemoteDev_.insert(devInfo.identifier);
317 } else {
318 std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
319 onlineRemoteDev_.erase(devInfo.identifier);
320 }
321 // End compensation, do callback.
322 std::lock_guard<std::mutex> onChangeLockGard(onChangeMutex_);
323 if (!onChangeHandle_) {
324 LOGE("[NAdapt][OnDeviceChange] onChangeHandle_ invalid.");
325 return;
326 }
327 onChangeHandle_(devInfo.identifier, isOnline);
328 }
329
SearchOnlineRemoteDeviceAtStartup()330 void NetworkAdapter::SearchOnlineRemoteDeviceAtStartup()
331 {
332 std::vector<DeviceInfos> onlineDev = processCommunicator_->GetRemoteOnlineDeviceInfosList();
333 LOGI("[NAdapt][SearchOnline] onlineDev count = %zu.", onlineDev.size());
334 if (onlineDev.empty()) {
335 return;
336 }
337 pendingAsyncTaskCount_.fetch_add(1);
338 // Note: onlineDev should be captured by value (must not by reference)
339 TaskAction callbackTask = [onlineDev, this]() {
340 LOGI("[NAdapt][SearchOnline] Begin Callback In Async Task.");
341 std::string localIdentity;
342 GetLocalIdentity(localIdentity); // It doesn't matter if getlocal fail and localIdentity be an empty string
343 for (auto &entry : onlineDev) {
344 if (entry.identifier == localIdentity) {
345 LOGW("[NAdapt][SearchOnline] Detect Local Device in Remote Device List.");
346 continue;
347 }
348 OnDeviceChangeHandler(entry, true);
349 }
350 pendingAsyncTaskCount_.fetch_sub(1);
351 asyncTaskDoneCv_.notify_all();
352 LOGI("[NAdapt][SearchOnline] End Callback In Async Task.");
353 };
354 // Use ScheduleQueuedTask to keep order
355 int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(SCHEDULE_QUEUE_TAG, callbackTask);
356 if (errCode != E_OK) {
357 LOGE("[NAdapt][SearchOnline] ScheduleQueuedTask failed, errCode = %d.", errCode);
358 pendingAsyncTaskCount_.fetch_sub(1);
359 asyncTaskDoneCv_.notify_all();
360 }
361 }
362
CheckDeviceOnlineAfterReception(const DeviceInfos & devInfo)363 void NetworkAdapter::CheckDeviceOnlineAfterReception(const DeviceInfos &devInfo)
364 {
365 bool isAlreadyOnline = true;
366 {
367 std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
368 if (onlineRemoteDev_.count(devInfo.identifier) == 0) {
369 isAlreadyOnline = false;
370 }
371 }
372
373 // Seem offline but receive data from it, let OnDeviceChangeHandler check whether it is really online
374 if (!isAlreadyOnline) {
375 OnDeviceChangeHandler(devInfo, true);
376 }
377 }
378
CheckDeviceOfflineAfterSendFail(const DeviceInfos & devInfo)379 void NetworkAdapter::CheckDeviceOfflineAfterSendFail(const DeviceInfos &devInfo)
380 {
381 // Note: only the identifier field of devInfo is valid, enough to call IsSameProcessLabelStartedOnPeerDevice
382 bool isAlreadyOffline = true;
383 {
384 std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
385 if (onlineRemoteDev_.count(devInfo.identifier) != 0) {
386 isAlreadyOffline = false;
387 }
388 }
389
390 // Seem online but send fail, we have to check whether still online
391 if (!isAlreadyOffline) {
392 if (!processCommunicator_->IsSameProcessLabelStartedOnPeerDevice(devInfo)) {
393 LOGW("[NAdapt][CheckAfterSend] Missed Offline Detected.");
394 {
395 // Mark this device not online immediately to avoid repeatedly miss-offline detect when send continually
396 std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
397 onlineRemoteDev_.erase(devInfo.identifier);
398 }
399 pendingAsyncTaskCount_.fetch_add(1);
400 // Note: devInfo should be captured by value (must not by reference)
401 TaskAction callbackTask = [devInfo, this]() {
402 LOGI("[NAdapt][CheckAfterSend] In Async Task, devInfo=%s{private}.", devInfo.identifier.c_str());
403 OnDeviceChangeHandler(devInfo, false);
404 pendingAsyncTaskCount_.fetch_sub(1);
405 asyncTaskDoneCv_.notify_all();
406 };
407 // Use ScheduleQueuedTask to keep order
408 int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(SCHEDULE_QUEUE_TAG, callbackTask);
409 if (errCode != E_OK) {
410 LOGE("[NAdapt][CheckAfterSend] ScheduleQueuedTask failed, errCode = %d.", errCode);
411 pendingAsyncTaskCount_.fetch_sub(1);
412 asyncTaskDoneCv_.notify_all();
413 }
414 }
415 }
416 }
417
IsDeviceOnline(const std::string & device)418 bool NetworkAdapter::IsDeviceOnline(const std::string &device)
419 {
420 std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
421 return (onlineRemoteDev_.find(device) != onlineRemoteDev_.end());
422 }
423
GetExtendHeaderHandle(const ExtendInfo & paramInfo)424 std::shared_ptr<ExtendHeaderHandle> NetworkAdapter::GetExtendHeaderHandle(const ExtendInfo ¶mInfo)
425 {
426 return processCommunicator_->GetExtendHeaderHandle(paramInfo);
427 }
428 } // namespace DistributedDB
429