1 /*
2 * Copyright (c) 2022-2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "wrapper_listener.h"
17
18 #include <cerrno>
19 #include <cstdlib>
20 #include <fcntl.h>
21 #include <memory>
22 #include <poll.h>
23 #include <sys/socket.h>
24 #include <unistd.h>
25 #include <vector>
26 #include <pthread.h>
27
28 #include "ffrt.h"
29 #include "ffrt_inner.h"
30 #include "netlink_define.h"
31 #include "netnative_log_wrapper.h"
32
33 namespace OHOS::nmd {
34 namespace {
35 constexpr int32_t PRET_SIZE = 2;
36 } // namespace
37
WrapperListener(int32_t socket,std::function<void (int32_t)> recvFunc)38 WrapperListener::WrapperListener(int32_t socket, std::function<void(int32_t)> recvFunc)
39 {
40 socket_ = socket;
41 startReceiveFunc_ = recvFunc;
42 }
43
~WrapperListener()44 WrapperListener::~WrapperListener()
45 {
46 Stop();
47 }
48
Start()49 int32_t WrapperListener::Start()
50 {
51 if (socket_ < 0) {
52 NETNATIVE_LOGE("listener socket_ < 0 %{public}d", socket_);
53 return NetlinkResult::ERROR;
54 }
55
56 int pipeRet = pipe2(pipe_, O_CLOEXEC);
57 if (pipeRet != 0) {
58 NETNATIVE_LOGE("pipeRes = %{public}d, pipe create failed errno = %{public}d, %{public}s", pipeRet, errno,
59 strerror(errno));
60 return NetlinkResult::ERROR;
61 }
62 ffrt::submit([this]() { WrapperListener::ListenThread(this); }, {}, {}, ffrt::task_attr().name("WrapListen"));
63 return NetlinkResult::OK;
64 }
65
Stop()66 int32_t WrapperListener::Stop()
67 {
68 NETNATIVE_LOGI("WrapperListener: Stop");
69 char pipe = PIPE_SHUTDOWN;
70 if (TEMP_FAILURE_RETRY(write(pipe_[1], &pipe, sizeof(pipe))) != 1) {
71 NETNATIVE_LOGE("write pipe failed errno = %{public}d, %{public}s", errno, strerror(errno));
72 return NetlinkResult::ERROR;
73 }
74
75 for (auto &pi : pipe_) {
76 if (pi > 0) {
77 close(pi);
78 }
79 }
80
81 if (socket_ > -1) {
82 close(socket_);
83 }
84
85 return NetlinkResult::OK;
86 }
87
ListenThread(WrapperListener * listener)88 void WrapperListener::ListenThread(WrapperListener *listener)
89 {
90 listener->Listen();
91 }
92
Listen()93 void WrapperListener::Listen()
94 {
95 if (startReceiveFunc_ == nullptr) {
96 NETNATIVE_LOGE("startReceiveFunc_ is nullptr start listen failed");
97 return;
98 }
99 while (true) {
100 std::vector<pollfd> pollFds;
101 std::lock_guard<ffrt::mutex> lock(clientsLock_);
102 pollFds.reserve(PRET_SIZE + 1);
103 pollfd polfd;
104 polfd.fd = pipe_[0];
105 polfd.events = POLLIN;
106 pollFds.emplace_back(polfd);
107 polfd.fd = socket_;
108 polfd.events = POLLIN;
109 pollFds.emplace_back(polfd);
110 ffrt::sync_io(socket_);
111 int32_t ret = TEMP_FAILURE_RETRY(poll(pollFds.data(), pollFds.size(), -1));
112 if (ret < 0) {
113 ffrt::this_task::sleep_for(std::chrono::seconds(1));
114 }
115
116 if (pollFds[0].revents & (POLLIN | POLLERR)) {
117 char ctlp = PIPE_SHUTDOWN;
118 TEMP_FAILURE_RETRY(read(pipe_[0], &ctlp, sizeof(ctlp)));
119 if (ctlp == PIPE_SHUTDOWN) {
120 break;
121 }
122 continue;
123 }
124 startReceiveFunc_(socket_);
125 }
126 }
127 } // namespace OHOS::nmd
128