1 /*
2 * Copyright (c) 2020-2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "message_looper.h"
17
18 #include <cstdio>
19 #include <cstdlib>
20 #include <unistd.h>
21 #include <cstring>
22 #include <iterator>
23 #include <sys/prctl.h>
24 #include <cerrno>
25 #include "media_log.h"
26 #include "fsm_common.h"
27
28 namespace OHOS {
29 #if (defined(__LITE__))
30 const uint32_t MSGLOOPER_LITEOS_STACK_SIZE = 0x10000;
31 #endif
32 const uint32_t MSGLOOPER_EMPTY_WAIT_TIME_US = 200 * 1000; /* empty wait time is 200 ms, namely 200 * 1000 us */
33
MessageLooper(std::string name)34 MessageLooper::MessageLooper(std::string name) : m_name(name)
35 {
36 m_eventQueue.clear();
37 }
38
~MessageLooper()39 MessageLooper::~MessageLooper()
40 {
41 (void)Deinit();
42 }
43
Deinit()44 int32_t MessageLooper::Deinit()
45 {
46 int32_t ret = HI_SUCCESS;
47 if (m_isInited) {
48 ret = Stop();
49 if (ret != HI_SUCCESS) {
50 MEDIA_ERR_LOG("MessageLooper Stop error");
51 }
52 m_eventQueue.clear();
53 delete m_msgPool;
54 m_msgPool = nullptr;
55 (void)pthread_mutex_destroy(&m_handlerMutex);
56 (void)pthread_cond_destroy(&m_queueCondition);
57 (void)pthread_mutex_destroy(&m_queueLock);
58 m_isInited = false;
59 }
60 return ret;
61 }
62
Init(uint32_t maxQueueSize,uint32_t msgPayloadLen,std::string name)63 int32_t MessageLooper::Init(uint32_t maxQueueSize, uint32_t msgPayloadLen, std::string name)
64 {
65 if (m_isInited) {
66 return HI_SUCCESS;
67 }
68 int32_t ret = pthread_mutex_init(&m_handlerMutex, nullptr);
69 if (ret != HI_SUCCESS) {
70 MEDIA_ERR_LOG("pthread_mutex_init m_handlerMutex error");
71 return HI_FAILURE;
72 }
73 ret = pthread_mutex_init(&m_queueLock, nullptr);
74 if (ret != HI_SUCCESS) {
75 MEDIA_ERR_LOG("pthread_mutex_init m_queueLock error");
76 goto MUTEX_DEL;
77 }
78 FsmCondInitRelative(m_queueCondition);
79 m_msgPool = new (std::nothrow) MMessagePool(name);
80 if (m_msgPool == nullptr) {
81 MEDIA_ERR_LOG("create m_msgPool faileld");
82 goto COND_DEL;
83 }
84 if (m_msgPool->Init(maxQueueSize, msgPayloadLen) != HI_SUCCESS) {
85 goto DEL_MSGPOOL;
86 }
87 m_isInited = true;
88 return HI_SUCCESS;
89
90 DEL_MSGPOOL:
91 delete m_msgPool;
92 m_msgPool = nullptr;
93 COND_DEL:
94 (void)pthread_cond_destroy(&m_queueCondition);
95 (void)pthread_mutex_destroy(&m_queueLock);
96 MUTEX_DEL:
97 (void)pthread_mutex_destroy(&m_handlerMutex);
98 return HI_FAILURE;
99 }
100
RegisterHandler(MessageHandler & handler)101 int32_t MessageLooper::RegisterHandler(MessageHandler &handler)
102 {
103 if (m_isThreadRunning) {
104 MEDIA_ERR_LOG("looper be in running, do not support reg handlr");
105 return HI_FAILURE;
106 }
107 m_msgHandlr = &handler;
108 return HI_SUCCESS;
109 }
110
Looper(void * args)111 void *MessageLooper::Looper(void *args)
112 {
113 MessageLooper *looper = static_cast<MessageLooper *>(args);
114 if (looper == nullptr) {
115 MEDIA_ERR_LOG("looper is null");
116 return nullptr;
117 }
118 looper->QueueHandlr();
119 return nullptr;
120 }
121
Start()122 int32_t MessageLooper::Start()
123 {
124 if (m_isThreadRunning) {
125 MEDIA_ERR_LOG("looper already be running");
126 return HI_SUCCESS;
127 }
128
129 if (m_msgHandlr == nullptr) {
130 MEDIA_ERR_LOG("no message handlr be registered");
131 return HI_FAILURE;
132 }
133
134 m_isThreadRunning = true;
135 #if (defined(__LITE__))
136 pthread_attr_t attr;
137
138 pthread_attr_init(&attr);
139 pthread_attr_setstacksize(&attr, MSGLOOPER_LITEOS_STACK_SIZE);
140 int32_t ret = pthread_create(&m_loopThrd, &attr, Looper, this);
141 #else
142 int32_t ret = pthread_create(&m_loopThrd, nullptr, Looper, this);
143 #endif
144 if (ret != 0) {
145 MEDIA_ERR_LOG("pthread_create failed %d", ret);
146 #if (defined(__LITE__))
147 pthread_attr_destroy(&attr);
148 #endif
149 m_isThreadRunning = false;
150 return HI_FAILURE;
151 }
152
153 #if (defined(__LITE__))
154 pthread_attr_destroy(&attr);
155 #endif
156
157 return HI_SUCCESS;
158 }
159
Stop()160 int32_t MessageLooper::Stop()
161 {
162 int32_t ret = HI_SUCCESS;
163 if (m_isThreadRunning) {
164 FSM_LOCK(m_queueLock);
165 m_isThreadRunning = false;
166 (void)pthread_cond_broadcast(&m_queueCondition);
167 FSM_UNLOCK(m_queueLock);
168 pthread_join(m_loopThrd, nullptr);
169 FSM_LOCK(m_queueLock);
170 if (!m_eventQueue.empty()) {
171 MEDIA_ERR_LOG("%s have msg in queue, could not stop error", m_name.c_str());
172 ret = HI_FAILURE;
173 }
174 FSM_UNLOCK(m_queueLock);
175 }
176 if (m_msgPool != nullptr) {
177 m_msgPool->Dump();
178 }
179 return ret;
180 }
181
QueueHandlr()182 void MessageLooper::QueueHandlr()
183 {
184 int32_t ret = HI_SUCCESS;
185
186 prctl(PR_SET_NAME, m_name.c_str(), 0, 0, 0);
187
188 while (m_isThreadRunning) {
189 FSM_LOCK(m_queueLock);
190 if (!m_isThreadRunning) { // must check value again inside lock
191 MEDIA_INFO_LOG("%s will stop", m_name.c_str());
192 FSM_UNLOCK(m_queueLock);
193 break;
194 }
195 if (m_eventQueue.empty()) {
196 FsmCondTimewait(m_queueCondition, m_queueLock, MSGLOOPER_EMPTY_WAIT_TIME_US);
197 FSM_UNLOCK(m_queueLock);
198 continue;
199 }
200
201 uint64_t whenUs = (*m_eventQueue.begin()).whenUs;
202 uint64_t nowUs = FsmGetCurTimeUs();
203 if (whenUs > nowUs) {
204 uint32_t delayUs = (uint32_t)(whenUs - nowUs);
205 ret = FsmCondTimewait(m_queueCondition, m_queueLock, delayUs);
206 if (ret != ETIMEDOUT && ret != HI_SUCCESS) {
207 MEDIA_INFO_LOG("FSM_Cond_Timewait err: %d", ret);
208 }
209 FSM_UNLOCK(m_queueLock);
210 continue;
211 }
212
213 MsgEvent event = *m_eventQueue.begin();
214 m_eventQueue.erase(m_eventQueue.begin());
215
216 FSM_UNLOCK(m_queueLock);
217
218 LockHandler();
219 if (m_msgHandlr == nullptr || m_msgPool == nullptr) {
220 MEDIA_INFO_LOG("msghandler or m_msgPool is null");
221 UnlockHandler();
222 break;
223 }
224 m_msgHandlr->OnMessageReceived(*(event.msg));
225 UnlockHandler();
226
227 // statemachine.post() set isNeedReply=false, for there is no requirment by now.
228 if (event.msg->isNeedReply && event.msg->msgCallback) {
229 event.msg->msgCallback(event.msg->privDate);
230 }
231
232 m_msgPool->PutMsg(*(event.msg));
233 }
234 }
235
Send(const MsgInfo & msg)236 int32_t MessageLooper::Send(const MsgInfo &msg)
237 {
238 LockHandler();
239 if (m_msgHandlr == nullptr) {
240 MEDIA_INFO_LOG("no message handlr be registered");
241 UnlockHandler();
242 return HI_FAILURE;
243 }
244 int32_t ret = m_msgHandlr->OnMessageReceived(msg);
245 UnlockHandler();
246
247 return ret;
248 }
249
Post(const MsgInfo & msg,uint64_t delayUs)250 int32_t MessageLooper::Post(const MsgInfo &msg, uint64_t delayUs)
251 {
252 if (!m_isThreadRunning) {
253 MEDIA_ERR_LOG("post failed, looper is stoped");
254 return HI_FAILURE;
255 }
256 if (m_msgPool == nullptr) {
257 MEDIA_ERR_LOG("msgPool is null");
258 return HI_FAILURE;
259 }
260 MsgInfo *msgInfo = m_msgPool->GetMsg(msg);
261 if (msgInfo == nullptr) {
262 MEDIA_ERR_LOG("getMsg err");
263 return HI_FAILURE;
264 }
265
266 msgInfo->isNeedReply = false;
267 InsertQueue(*msgInfo, delayUs);
268
269 return HI_SUCCESS;
270 }
271
InsertQueue(MsgInfo & msg,uint64_t delayUs)272 void MessageLooper::InsertQueue(MsgInfo &msg, uint64_t delayUs)
273 {
274 uint64_t curUs = FsmGetCurTimeUs();
275 uint64_t whenUs = (delayUs > 0) ? (curUs + delayUs) : curUs;
276
277 FSM_LOCK(m_queueLock);
278
279 MsgEvent event{&msg, whenUs};
280
281 if (m_eventQueue.empty()) {
282 m_eventQueue.push_back(event);
283 FSM_COND_SIGNAL(m_queueCondition);
284 } else {
285 std::list<MsgEvent>::iterator it = m_eventQueue.begin();
286 while (it != m_eventQueue.end() && (*it).whenUs <= whenUs) {
287 ++it;
288 }
289
290 m_eventQueue.insert(it, event);
291
292 if (it == m_eventQueue.begin()) {
293 FSM_COND_SIGNAL(m_queueCondition);
294 }
295 }
296
297 FSM_UNLOCK(m_queueLock);
298 }
299
HasMessage(const MsgInfo & msg)300 bool MessageLooper::HasMessage(const MsgInfo &msg)
301 {
302 bool isHasMsg = false;
303
304 FSM_LOCK(m_queueLock);
305
306 std::list<MsgEvent>::iterator it = m_eventQueue.begin();
307 while (it != m_eventQueue.end()) {
308 if (it->msg->what == msg.what) {
309 isHasMsg = true;
310 break;
311 }
312 ++it;
313 }
314
315 FSM_UNLOCK(m_queueLock);
316 return isHasMsg;
317 }
318
RemoveMessage(const MsgInfo & msg)319 void MessageLooper::RemoveMessage(const MsgInfo &msg)
320 {
321 MsgEvent event = { nullptr, 0 };
322 bool isFound = false;
323
324 FSM_LOCK(m_queueLock);
325
326 std::list<MsgEvent>::iterator it = m_eventQueue.begin();
327 while (it != m_eventQueue.end()) {
328 if (it->msg->what == msg.what) {
329 event = *it;
330 m_eventQueue.erase(it);
331 isFound = true;
332 break;
333 }
334 ++it;
335 }
336
337 FSM_UNLOCK(m_queueLock);
338
339 if (isFound && (m_msgPool != nullptr)) {
340 MEDIA_INFO_LOG("%s remove msg: %d", m_name.c_str(), event.msg->what);
341 m_msgPool->PutMsg(*(event.msg));
342 }
343 }
344
Dump()345 void MessageLooper::Dump()
346 {
347 FSM_LOCK(m_queueLock);
348
349 MEDIA_INFO_LOG(
350 "looper[%s], msg cnt: %zu", m_name.c_str(), m_eventQueue.size());
351 std::list<MsgEvent>::iterator it = m_eventQueue.begin();
352 while (it != m_eventQueue.end()) {
353 MEDIA_INFO_LOG("msg id[%d], handleWhen[%llu]", it->msg->what, it->whenUs);
354 ++it;
355 }
356 FSM_UNLOCK(m_queueLock);
357 if (m_msgPool != nullptr) {
358 m_msgPool->Dump();
359 }
360 }
361
LockHandler()362 void MessageLooper::LockHandler()
363 {
364 pthread_mutex_lock(&m_handlerMutex);
365 }
366
UnlockHandler()367 void MessageLooper::UnlockHandler()
368 {
369 pthread_mutex_unlock(&m_handlerMutex);
370 }
371 };
372