1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include <algorithm>
16 #include <cstring>
17 #include <iostream>
18 #include <memory>
19 #include <mutex>
20 #include <thread>
21 #include <sys/prctl.h>
22 #include <sys/stat.h>
23 #include <unistd.h>
24
25 #include <log_utils.h>
26 #include <seq_packet_socket_server.h>
27
28 #include "service_controller.h"
29 #include "cmd_executor.h"
30
31 namespace OHOS {
32 namespace HiviewDFX {
// Second constructor argument handed to the control SeqPacketSocketServer in MainLoop().
static constexpr int MAX_CLIENT_CONNECTIONS = 100;
34
~CmdExecutor()35 CmdExecutor::~CmdExecutor()
36 {
37 std::lock_guard<std::mutex> lg(m_clientAccess);
38 for (auto& client : m_clients) {
39 client->m_stopThread.store(true);
40 }
41 for (auto& client : m_clients) {
42 if (client->m_clientThread.joinable()) {
43 client->m_clientThread.join();
44 }
45 }
46 }
47
MainLoop(const std::string & socketName)48 void CmdExecutor::MainLoop(const std::string& socketName)
49 {
50 SeqPacketSocketServer cmdServer(socketName, MAX_CLIENT_CONNECTIONS);
51 if (cmdServer.Init() < 0) {
52 std::cerr << "Failed to init control socket ! \n";
53 return;
54 }
55 std::cout << "Server started to listen !\n";
56 using namespace std::chrono_literals;
57 cmdServer.StartAcceptingConnection(
58 [this] (std::unique_ptr<Socket> handler) {
59 OnAcceptedConnection(std::move(handler));
60 },
61 3000ms,
62 [this] () {
63 CleanFinishedClients();
64 });
65 }
66
OnAcceptedConnection(std::unique_ptr<Socket> handler)67 void CmdExecutor::OnAcceptedConnection(std::unique_ptr<Socket> handler)
68 {
69 std::lock_guard<std::mutex> lg(m_clientAccess);
70 auto newVal = std::make_unique<ClientThread>();
71 if (newVal != nullptr) {
72 newVal->m_stopThread.store(false);
73 newVal->m_clientThread = std::thread([this](std::unique_ptr<Socket> handler) {
74 ClientEventLoop(std::move(handler));
75 }, std::move(handler));
76 m_clients.push_back(std::move(newVal));
77 }
78 }
79
ClientEventLoop(std::unique_ptr<Socket> handler)80 void CmdExecutor::ClientEventLoop(std::unique_ptr<Socket> handler)
81 {
82 decltype(m_clients)::iterator clientInfoIt;
83 {
84 std::lock_guard<std::mutex> lg(m_clientAccess);
85 clientInfoIt = std::find_if(m_clients.begin(), m_clients.end(),
86 [](const std::unique_ptr<ClientThread>& ct) {
87 return ct->m_clientThread.get_id() == std::this_thread::get_id();
88 });
89 }
90 if (clientInfoIt == m_clients.end()) {
91 std::cerr << "Failed to find client\n";
92 return;
93 }
94
95 prctl(PR_SET_NAME, m_name.c_str());
96 ServiceController serviceCtrl(std::move(handler), m_logCollector, m_hilogBuffer, m_kmsgBuffer);
97 serviceCtrl.CommunicationLoop((*clientInfoIt)->m_stopThread, m_cmdList);
98
99 std::lock_guard<std::mutex> ul(m_finishedClientAccess);
100 m_finishedClients.push_back(std::this_thread::get_id());
101 }
102
CleanFinishedClients()103 void CmdExecutor::CleanFinishedClients()
104 {
105 std::list<std::thread> threadsToJoin;
106 {
107 // select clients to clean up - pick threads that we have to be sure are ended
108 std::scoped_lock sl(m_finishedClientAccess, m_clientAccess);
109 for (auto threadId : m_finishedClients) {
110 auto clientInfoIt = std::find_if(m_clients.begin(), m_clients.end(),
111 [&threadId](const std::unique_ptr<ClientThread>& ct) {
112 return ct->m_clientThread.get_id() == threadId;
113 });
114 if (clientInfoIt != m_clients.end()) {
115 threadsToJoin.push_back(std::move((*clientInfoIt)->m_clientThread));
116 m_clients.erase(clientInfoIt);
117 }
118 }
119 m_finishedClients.clear();
120 }
121 for (auto& thread : threadsToJoin) {
122 if (thread.joinable()) {
123 thread.join();
124 }
125 }
126 }
127 } // namespace HiviewDFX
128 } // namespace OHOS
129