1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef PROCESS_COMMUNICATOR_IMPL_H
17 #define PROCESS_COMMUNICATOR_IMPL_H
18 
19 #include <mutex>
20 
21 #include "communication_provider.h"
22 #include "iprocess_communicator.h"
23 #include "route_head_handler.h"
24 
25 namespace OHOS {
26 namespace AppDistributedKv {
27 class API_EXPORT ProcessCommunicatorImpl : public DistributedDB::IProcessCommunicator, public AppDataChangeListener,
28                                            public AppDeviceChangeListener {
29 
30 public:
31     using DBStatus = DistributedDB::DBStatus;
32     using OnDeviceChange = DistributedDB::OnDeviceChange;
33     using OnDataReceive = DistributedDB::OnDataReceive;
34     using OnSendAble = DistributedDB::OnSendAble;
35     using DeviceInfos = DistributedDB::DeviceInfos;
36     using RouteHeadHandlerCreator =
37         std::function<std::shared_ptr<DistributedData::RouteHeadHandler>(const DistributedDB::ExtendInfo &info)>;
38 
39     API_EXPORT ProcessCommunicatorImpl();
40     API_EXPORT explicit ProcessCommunicatorImpl(RouteHeadHandlerCreator handlerCreator);
41     API_EXPORT ~ProcessCommunicatorImpl() override;
42 
43     DBStatus Start(const std::string &processLabel) override;
44     DBStatus Stop() override;
45 
46     DBStatus RegOnDeviceChange(const OnDeviceChange &callback) override;
47     DBStatus RegOnDataReceive(const OnDataReceive &callback) override;
48     void RegOnSendAble(const OnSendAble &sendAbleCallback) override;
49 
50     DBStatus SendData(const DeviceInfos &dstDevInfo, const uint8_t *data, uint32_t length) override;
51     DBStatus SendData(const DeviceInfos &dstDevInfo, const uint8_t *data, uint32_t length,
52         uint32_t totalLength) override;
53     uint32_t GetMtuSize() override;
54     uint32_t GetMtuSize(const DeviceInfos &devInfo) override;
55     uint32_t GetTimeout(const DeviceInfos &devInfo) override;
56     DeviceInfos GetLocalDeviceInfos() override;
57     std::vector<DeviceInfos> GetRemoteOnlineDeviceInfosList() override;
58     bool IsSameProcessLabelStartedOnPeerDevice(const DeviceInfos &peerDevInfo) override;
59     void OnDeviceChanged(const DeviceInfo &info, const DeviceChangeType &type) const override;
60     void OnSessionReady(const DeviceInfo &info, int32_t errCode) const override;
61 
62     API_EXPORT std::shared_ptr<DistributedDB::ExtendHeaderHandle> GetExtendHeaderHandle(
63         const DistributedDB::ExtendInfo &info) override;
64     API_EXPORT DBStatus CheckAndGetDataHeadInfo(
65         const uint8_t *data, uint32_t dataLen, uint32_t &headLen, std::vector<std::string> &users) override;
66 
67 private:
68     void OnMessage(const DeviceInfo &info, const uint8_t *ptr, const int size,
69                    const PipeInfo &pipeInfo) const override;
70 
71     std::string thisProcessLabel_;
72     OnDeviceChange onDeviceChangeHandler_;
73     OnDataReceive onDataReceiveHandler_;
74     OnSendAble sessionListener_;
75     RouteHeadHandlerCreator routeHeadHandlerCreator_; // route header handler creator
76 
77     mutable std::mutex onDeviceChangeMutex_;
78     mutable std::mutex onDataReceiveMutex_;
79     mutable std::mutex sessionMutex_;
80 
81     static constexpr uint32_t MTU_SIZE = 4194304; // the max transmission unit size(4M - 80B)
82     static constexpr uint32_t MTU_SIZE_WATCH = 81920; // the max transmission unit size(80K)
83 };
84 }  // namespace AppDistributedKv
85 }  // namespace OHOS
86 #endif // PROCESS_COMMUNICATOR_IMPL_H