1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "network_adapter.h"
17 #include "db_constant.h"
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "log_print.h"
21 #include "runtime_context.h"
22 
23 namespace DistributedDB {
24 namespace {
25 const std::string DEFAULT_PROCESS_LABEL = "Distributeddb_Anonymous_Process";
26 const std::string SCHEDULE_QUEUE_TAG = "NetworkAdapter";
27 }
28 
NetworkAdapter()29 NetworkAdapter::NetworkAdapter()
30     : processLabel_(DEFAULT_PROCESS_LABEL), processCommunicator_(nullptr)
31 {
32 }
33 
NetworkAdapter(const std::string & inProcessLabel)34 NetworkAdapter::NetworkAdapter(const std::string &inProcessLabel)
35     : processLabel_(inProcessLabel), processCommunicator_(nullptr)
36 {
37 }
38 
NetworkAdapter(const std::string & inProcessLabel,const std::shared_ptr<IProcessCommunicator> & inCommunicator)39 NetworkAdapter::NetworkAdapter(const std::string &inProcessLabel,
40     const std::shared_ptr<IProcessCommunicator> &inCommunicator)
41     : processLabel_(inProcessLabel), processCommunicator_(inCommunicator)
42 {
43 }
44 
~NetworkAdapter()45 NetworkAdapter::~NetworkAdapter()
46 {
47 }
48 
StartAdapter()49 int NetworkAdapter::StartAdapter()
50 {
51     LOGI("[NAdapt][Start] Enter, ProcessLabel=%s.", STR_MASK(DBCommon::TransferStringToHex(processLabel_)));
52     if (processLabel_.empty()) {
53         return -E_INVALID_ARGS;
54     }
55     if (!processCommunicator_) {
56         LOGE("[NAdapt][Start] ProcessCommunicator not be designated yet.");
57         return -E_INVALID_ARGS;
58     }
59     DBStatus errCode = processCommunicator_->Start(processLabel_);
60     if (errCode != DBStatus::OK) {
61         LOGE("[NAdapt][Start] Start Fail, errCode=%d.", static_cast<int>(errCode));
62         return -E_PERIPHERAL_INTERFACE_FAIL;
63     }
64     errCode = processCommunicator_->RegOnDataReceive(
65         [this](const DeviceInfos &srcDevInfo, const uint8_t *data, uint32_t length) {
66             OnDataReceiveHandler(srcDevInfo, data, length);
67         });
68     if (errCode != DBStatus::OK) {
69         LOGE("[NAdapt][Start] RegOnDataReceive Fail, errCode=%d.", static_cast<int>(errCode));
70         // DO ROLLBACK
71         errCode = processCommunicator_->Stop();
72         LOGI("[NAdapt][Start] ROLLBACK: Stop errCode=%d.", static_cast<int>(errCode));
73         return -E_PERIPHERAL_INTERFACE_FAIL;
74     }
75     errCode = processCommunicator_->RegOnDeviceChange([this](const DeviceInfos &devInfo, bool isOnline) {
76         OnDeviceChangeHandler(devInfo, isOnline);
77     });
78     if (errCode != DBStatus::OK) {
79         LOGE("[NAdapt][Start] RegOnDeviceChange Fail, errCode=%d.", static_cast<int>(errCode));
80         // DO ROLLBACK
81         errCode = processCommunicator_->RegOnDataReceive(nullptr);
82         LOGI("[NAdapt][Start] ROLLBACK: UnRegOnDataReceive errCode=%d.", static_cast<int>(errCode));
83         errCode = processCommunicator_->Stop();
84         LOGI("[NAdapt][Start] ROLLBACK: Stop errCode=%d.", static_cast<int>(errCode));
85         return -E_PERIPHERAL_INTERFACE_FAIL;
86     }
87     processCommunicator_->RegOnSendAble([this](const DeviceInfos &devInfo, int softBusErrCode) {
88         if (onSendableHandle_ != nullptr) {
89             onSendableHandle_(devInfo.identifier, softBusErrCode);
90         }
91     });
92     // These code is compensation for the probable defect of IProcessCommunicator implementation.
93     // As described in the agreement, for the missed online situation, we search for the online devices at beginning.
94     // OnDeviceChangeHandler is reused to check the existence of peer process.
95     // Since at this point, the CommunicatorAggregator had not been fully initialized,
96     // We need an async task which bring about dependency on the lifecycle of this NetworkAdapter Object.
97     SearchOnlineRemoteDeviceAtStartup();
98     LOGI("[NAdapt][Start] Exit.");
99     return E_OK;
100 }
101 
102 // StartAdapter and StopAdapter are all innerly called by ICommunicatorAggregator
103 // If StopAdapter is called, the StartAdapter must have been called successfully before,
104 // so processCommunicator_ won't be null
StopAdapter()105 void NetworkAdapter::StopAdapter()
106 {
107     LOGI("[NAdapt][Stop] Enter, ProcessLabel=%s.", processLabel_.c_str());
108     processCommunicator_->RegOnSendAble(nullptr);
109     DBStatus errCode = processCommunicator_->RegOnDeviceChange(nullptr);
110     if (errCode != DBStatus::OK) {
111         LOGE("[NAdapt][Stop] UnRegOnDeviceChange Fail, errCode=%d.", static_cast<int>(errCode));
112     }
113     errCode = processCommunicator_->RegOnDataReceive(nullptr);
114     if (errCode != DBStatus::OK) {
115         LOGE("[NAdapt][Stop] UnRegOnDataReceive Fail, errCode=%d.", static_cast<int>(errCode));
116     }
117     errCode = processCommunicator_->Stop();
118     if (errCode != DBStatus::OK) {
119         LOGE("[NAdapt][Stop] Stop Fail, errCode=%d.", static_cast<int>(errCode));
120     }
121     // We don't reset the shared_ptr of commProvider here, the release of commProvider is done by deconstruct of adapter
122     // In this way, the adapter can be start again after stop it, since it still hold the an valid commProvider
123     // The async task is dependent on this Object. we have to wait until all async task finished.
124     LOGI("[NAdapt][Stop] Wait all async task done.");
125     std::unique_lock<std::mutex> asyncTaskDoneLock(asyncTaskDoneMutex_);
126     asyncTaskDoneCv_.wait(asyncTaskDoneLock, [this] { return pendingAsyncTaskCount_ <= 0; });
127     LOGI("[NAdapt][Stop] Exit.");
128 }
129 
130 namespace {
CheckAndAdjustMtuSize(uint32_t inMtuSize)131 uint32_t CheckAndAdjustMtuSize(uint32_t inMtuSize)
132 {
133     if (inMtuSize < DBConstant::MIN_MTU_SIZE) {
134         return DBConstant::MIN_MTU_SIZE;
135     } else if (inMtuSize > DBConstant::MAX_MTU_SIZE) {
136         return DBConstant::MAX_MTU_SIZE;
137     } else {
138         return (inMtuSize - (inMtuSize % sizeof(uint64_t))); // Octet alignment
139     }
140 }
141 
CheckAndAdjustTimeout(uint32_t inTimeout)142 uint32_t CheckAndAdjustTimeout(uint32_t inTimeout)
143 {
144     if (inTimeout < DBConstant::MIN_TIMEOUT) {
145         return DBConstant::MIN_TIMEOUT;
146     } else if (inTimeout > DBConstant::MAX_TIMEOUT) {
147         return DBConstant::MAX_TIMEOUT;
148     } else {
149         return inTimeout;
150     }
151 }
152 }
153 
GetMtuSize()154 uint32_t NetworkAdapter::GetMtuSize()
155 {
156     std::lock_guard<std::mutex> mtuSizeLockGuard(mtuSizeMutex_);
157     if (!isMtuSizeValid_) {
158         mtuSize_ = processCommunicator_->GetMtuSize();
159         LOGD("[NAdapt][GetMtu] mtuSize=%" PRIu32 ".", mtuSize_);
160         mtuSize_ = CheckAndAdjustMtuSize(mtuSize_);
161         isMtuSizeValid_ = true;
162     }
163     return mtuSize_;
164 }
165 
GetMtuSize(const std::string & target)166 uint32_t NetworkAdapter::GetMtuSize(const std::string &target)
167 {
168 #ifndef OMIT_MTU_CACHE
169     DeviceInfos devInfo;
170     devInfo.identifier = target;
171     uint32_t oriMtuSize = processCommunicator_->GetMtuSize(devInfo);
172     return CheckAndAdjustMtuSize(oriMtuSize);
173 #else
174     std::lock_guard<std::mutex> mtuSizeLockGuard(mtuSizeMutex_);
175     if (devMapMtuSize_.count(target) == 0) {
176         DeviceInfos devInfo;
177         devInfo.identifier = target;
178         uint32_t oriMtuSize = processCommunicator_->GetMtuSize(devInfo);
179         LOGD("[NAdapt][GetMtu] mtuSize=%" PRIu32 " of target=%s{private}.", oriMtuSize, target.c_str());
180         devMapMtuSize_[target] = CheckAndAdjustMtuSize(oriMtuSize);
181     }
182     return devMapMtuSize_[target];
183 #endif
184 }
185 
GetTimeout()186 uint32_t NetworkAdapter::GetTimeout()
187 {
188     uint32_t timeout = processCommunicator_->GetTimeout();
189     LOGD("[NAdapt][GetTimeout] timeout_=%" PRIu32 " ms.", timeout);
190     return CheckAndAdjustTimeout(timeout);
191 }
192 
GetTimeout(const std::string & target)193 uint32_t NetworkAdapter::GetTimeout(const std::string &target)
194 {
195     DeviceInfos devInfos;
196     devInfos.identifier = target;
197     uint32_t timeout = processCommunicator_->GetTimeout(devInfos);
198     LOGD("[NAdapt][GetTimeout] timeout=%" PRIu32 " ms of target=%s{private}.", timeout, target.c_str());
199     return CheckAndAdjustTimeout(timeout);
200 }
201 
GetLocalIdentity(std::string & outTarget)202 int NetworkAdapter::GetLocalIdentity(std::string &outTarget)
203 {
204     std::lock_guard<std::mutex> identityLockGuard(identityMutex_);
205     DeviceInfos devInfo = processCommunicator_->GetLocalDeviceInfos();
206     if (devInfo.identifier.empty()) {
207         LOGE("[NetworkAdapter] Get empty local id");
208         return -E_PERIPHERAL_INTERFACE_FAIL;
209     }
210     if (devInfo.identifier != localIdentity_) {
211         LOGI("[NAdapt][GetLocal] localIdentity=%s{private}.", devInfo.identifier.c_str());
212     }
213     localIdentity_ = devInfo.identifier;
214     outTarget = localIdentity_;
215     return E_OK;
216 }
217 
SendBytes(const std::string & dstTarget,const uint8_t * bytes,uint32_t length,uint32_t totalLength)218 int NetworkAdapter::SendBytes(const std::string &dstTarget, const uint8_t *bytes, uint32_t length, uint32_t totalLength)
219 {
220     if (bytes == nullptr || length == 0) {
221         return -E_INVALID_ARGS;
222     }
223     LOGI("[NAdapt][SendBytes] Enter, to=%s{private}, length=%u, totalLength=%u", dstTarget.c_str(), length,
224         totalLength);
225     DeviceInfos dstDevInfo;
226     dstDevInfo.identifier = dstTarget;
227     DBStatus errCode = processCommunicator_->SendData(dstDevInfo, bytes, length,
228         totalLength > length ? totalLength : length);
229     if (errCode == DBStatus::RATE_LIMIT) {
230         LOGD("[NAdapt][SendBytes] rate limit!");
231         return -E_WAIT_RETRY;
232     }
233     if (errCode != DBStatus::OK) {
234         LOGE("[NAdapt][SendBytes] SendData Fail, errCode=%d.", static_cast<int>(errCode));
235         // These code is compensation for the probable defect of IProcessCommunicator implementation.
236         // As described in the agreement, for the missed offline situation, we check if still online at send fail.
237         // OnDeviceChangeHandler is reused but check the existence of peer process is done outerly.
238         // Since this thread is the sending_thread of the CommunicatorAggregator,
239         // We need an async task which bring about dependency on the lifecycle of this NetworkAdapter Object.
240         CheckDeviceOfflineAfterSendFail(dstDevInfo);
241         return static_cast<int>(errCode);
242     }
243     return E_OK;
244 }
245 
RegBytesReceiveCallback(const BytesReceiveCallback & onReceive,const Finalizer & inOper)246 int NetworkAdapter::RegBytesReceiveCallback(const BytesReceiveCallback &onReceive, const Finalizer &inOper)
247 {
248     std::lock_guard<std::mutex> onReceiveLockGard(onReceiveMutex_);
249     return RegCallBack(onReceive, onReceiveHandle_, inOper, onReceiveFinalizer_);
250 }
251 
RegTargetChangeCallback(const TargetChangeCallback & onChange,const Finalizer & inOper)252 int NetworkAdapter::RegTargetChangeCallback(const TargetChangeCallback &onChange, const Finalizer &inOper)
253 {
254     std::lock_guard<std::mutex> onChangeLockGard(onChangeMutex_);
255     return RegCallBack(onChange, onChangeHandle_, inOper, onChangeFinalizer_);
256 }
257 
RegSendableCallback(const SendableCallback & onSendable,const Finalizer & inOper)258 int NetworkAdapter::RegSendableCallback(const SendableCallback &onSendable, const Finalizer &inOper)
259 {
260     std::lock_guard<std::mutex> onSendableLockGard(onSendableMutex_);
261     return RegCallBack(onSendable, onSendableHandle_, inOper, onSendableFinalizer_);
262 }
263 
OnDataReceiveHandler(const DeviceInfos & srcDevInfo,const uint8_t * data,uint32_t length)264 void NetworkAdapter::OnDataReceiveHandler(const DeviceInfos &srcDevInfo, const uint8_t *data, uint32_t length)
265 {
266     if (data == nullptr || length == 0) {
267         LOGE("[NAdapt][OnDataRecv] data nullptr or length = %u.", length);
268         return;
269     }
270     uint32_t headLength = 0;
271     std::vector<std::string> userId;
272     DBStatus errCode = processCommunicator_->CheckAndGetDataHeadInfo(data, length, headLength, userId);
273     LOGI("[NAdapt][OnDataRecv] Enter, from=%s{private}, extendHeadLength=%u, totalLength=%u",
274         srcDevInfo.identifier.c_str(), headLength, length);
275     if (errCode == NO_PERMISSION) {
276         LOGI("[NAdapt][OnDataRecv] userId dismatched, drop packet");
277         return;
278     }
279     if (headLength >= length) {
280         LOGW("[NAdapt][OnDataRecv] head len is too big, drop packet");
281         return;
282     }
283     {
284         std::lock_guard<std::mutex> onReceiveLockGard(onReceiveMutex_);
285         if (!onReceiveHandle_) {
286             LOGE("[NAdapt][OnDataRecv] onReceiveHandle invalid.");
287             return;
288         }
289         std::string currentUserId;
290         if (userId.size() >= 1) {
291             currentUserId = userId[0];
292         }
293         onReceiveHandle_(srcDevInfo.identifier, data + headLength, length - headLength, currentUserId);
294     }
295     // These code is compensation for the probable defect of IProcessCommunicator implementation.
296     // As described in the agreement, for the missed online situation, we check the source dev when received.
297     // OnDeviceChangeHandler is reused to check the existence of peer process.
298     // Since this thread is the callback_thread of IProcessCommunicator, we do this check task directly in this thread.
299     CheckDeviceOnlineAfterReception(srcDevInfo);
300 }
301 
OnDeviceChangeHandler(const DeviceInfos & devInfo,bool isOnline)302 void NetworkAdapter::OnDeviceChangeHandler(const DeviceInfos &devInfo, bool isOnline)
303 {
304     LOGI("[NAdapt][OnDeviceChange] Enter, dev=%s{private}, isOnline=%d", devInfo.identifier.c_str(), isOnline);
305     // These code is compensation for the probable defect of IProcessCommunicator implementation.
306     // As described in the agreement, for the mistake online situation, we check the existence of peer process.
307     // The IProcessCommunicator implementation guarantee that no mistake offline will happen.
308     if (isOnline) {
309         if (!processCommunicator_->IsSameProcessLabelStartedOnPeerDevice(devInfo)) {
310             LOGI("[NAdapt][OnDeviceChange] Detect Not Really Online.");
311             std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
312             onlineRemoteDev_.erase(devInfo.identifier);
313             return;
314         }
315         std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
316         onlineRemoteDev_.insert(devInfo.identifier);
317     } else {
318         std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
319         onlineRemoteDev_.erase(devInfo.identifier);
320     }
321     // End compensation, do callback.
322     std::lock_guard<std::mutex> onChangeLockGard(onChangeMutex_);
323     if (!onChangeHandle_) {
324         LOGE("[NAdapt][OnDeviceChange] onChangeHandle_ invalid.");
325         return;
326     }
327     onChangeHandle_(devInfo.identifier, isOnline);
328 }
329 
SearchOnlineRemoteDeviceAtStartup()330 void NetworkAdapter::SearchOnlineRemoteDeviceAtStartup()
331 {
332     std::vector<DeviceInfos> onlineDev = processCommunicator_->GetRemoteOnlineDeviceInfosList();
333     LOGI("[NAdapt][SearchOnline] onlineDev count = %zu.", onlineDev.size());
334     if (onlineDev.empty()) {
335         return;
336     }
337     pendingAsyncTaskCount_.fetch_add(1);
338     // Note: onlineDev should be captured by value (must not by reference)
339     TaskAction callbackTask = [onlineDev, this]() {
340         LOGI("[NAdapt][SearchOnline] Begin Callback In Async Task.");
341         std::string localIdentity;
342         GetLocalIdentity(localIdentity); // It doesn't matter if getlocal fail and localIdentity be an empty string
343         for (auto &entry : onlineDev) {
344             if (entry.identifier == localIdentity) {
345                 LOGW("[NAdapt][SearchOnline] Detect Local Device in Remote Device List.");
346                 continue;
347             }
348             OnDeviceChangeHandler(entry, true);
349         }
350         pendingAsyncTaskCount_.fetch_sub(1);
351         asyncTaskDoneCv_.notify_all();
352         LOGI("[NAdapt][SearchOnline] End Callback In Async Task.");
353     };
354     // Use ScheduleQueuedTask to keep order
355     int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(SCHEDULE_QUEUE_TAG, callbackTask);
356     if (errCode != E_OK) {
357         LOGE("[NAdapt][SearchOnline] ScheduleQueuedTask failed, errCode = %d.", errCode);
358         pendingAsyncTaskCount_.fetch_sub(1);
359         asyncTaskDoneCv_.notify_all();
360     }
361 }
362 
CheckDeviceOnlineAfterReception(const DeviceInfos & devInfo)363 void NetworkAdapter::CheckDeviceOnlineAfterReception(const DeviceInfos &devInfo)
364 {
365     bool isAlreadyOnline = true;
366     {
367         std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
368         if (onlineRemoteDev_.count(devInfo.identifier) == 0) {
369             isAlreadyOnline = false;
370         }
371     }
372 
373     // Seem offline but receive data from it, let OnDeviceChangeHandler check whether it is really online
374     if (!isAlreadyOnline) {
375         OnDeviceChangeHandler(devInfo, true);
376     }
377 }
378 
CheckDeviceOfflineAfterSendFail(const DeviceInfos & devInfo)379 void NetworkAdapter::CheckDeviceOfflineAfterSendFail(const DeviceInfos &devInfo)
380 {
381     // Note: only the identifier field of devInfo is valid, enough to call IsSameProcessLabelStartedOnPeerDevice
382     bool isAlreadyOffline = true;
383     {
384         std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
385         if (onlineRemoteDev_.count(devInfo.identifier) != 0) {
386             isAlreadyOffline = false;
387         }
388     }
389 
390     // Seem online but send fail, we have to check whether still online
391     if (!isAlreadyOffline) {
392         if (!processCommunicator_->IsSameProcessLabelStartedOnPeerDevice(devInfo)) {
393             LOGW("[NAdapt][CheckAfterSend] Missed Offline Detected.");
394             {
395                 // Mark this device not online immediately to avoid repeatedly miss-offline detect when send continually
396                 std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
397                 onlineRemoteDev_.erase(devInfo.identifier);
398             }
399             pendingAsyncTaskCount_.fetch_add(1);
400             // Note: devInfo should be captured by value (must not by reference)
401             TaskAction callbackTask = [devInfo, this]() {
402                 LOGI("[NAdapt][CheckAfterSend] In Async Task, devInfo=%s{private}.", devInfo.identifier.c_str());
403                 OnDeviceChangeHandler(devInfo, false);
404                 pendingAsyncTaskCount_.fetch_sub(1);
405                 asyncTaskDoneCv_.notify_all();
406             };
407             // Use ScheduleQueuedTask to keep order
408             int errCode = RuntimeContext::GetInstance()->ScheduleQueuedTask(SCHEDULE_QUEUE_TAG, callbackTask);
409             if (errCode != E_OK) {
410                 LOGE("[NAdapt][CheckAfterSend] ScheduleQueuedTask failed, errCode = %d.", errCode);
411                 pendingAsyncTaskCount_.fetch_sub(1);
412                 asyncTaskDoneCv_.notify_all();
413             }
414         }
415     }
416 }
417 
IsDeviceOnline(const std::string & device)418 bool NetworkAdapter::IsDeviceOnline(const std::string &device)
419 {
420     std::lock_guard<std::mutex> onlineRemoteDevLockGuard(onlineRemoteDevMutex_);
421     return (onlineRemoteDev_.find(device) != onlineRemoteDev_.end());
422 }
423 
GetExtendHeaderHandle(const ExtendInfo & paramInfo)424 std::shared_ptr<ExtendHeaderHandle> NetworkAdapter::GetExtendHeaderHandle(const ExtendInfo &paramInfo)
425 {
426     return processCommunicator_->GetExtendHeaderHandle(paramInfo);
427 }
428 } // namespace DistributedDB
429