1 /*
2  * Copyright (c) 2020-2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "message_looper.h"
17 
18 #include <cstdio>
19 #include <cstdlib>
20 #include <unistd.h>
21 #include <cstring>
22 #include <iterator>
23 #include <sys/prctl.h>
24 #include <cerrno>
25 #include "media_log.h"
26 #include "fsm_common.h"
27 
28 namespace OHOS {
#if (defined(__LITE__))
/* Stack size passed to pthread_attr_setstacksize() for the looper thread on __LITE__ builds. */
constexpr uint32_t MSGLOOPER_LITEOS_STACK_SIZE = 0x10000;
#endif
/* Timed-wait duration used by the looper thread while its queue is empty: 200 ms, expressed in microseconds. */
constexpr uint32_t MSGLOOPER_EMPTY_WAIT_TIME_US = 200 * 1000;
33 
MessageLooper(std::string name)34 MessageLooper::MessageLooper(std::string name) : m_name(name)
35 {
36     m_eventQueue.clear();
37 }
38 
~MessageLooper()39 MessageLooper::~MessageLooper()
40 {
41     (void)Deinit();
42 }
43 
Deinit()44 int32_t MessageLooper::Deinit()
45 {
46     int32_t ret = HI_SUCCESS;
47     if (m_isInited) {
48         ret = Stop();
49         if (ret != HI_SUCCESS) {
50             MEDIA_ERR_LOG("MessageLooper Stop error");
51         }
52         m_eventQueue.clear();
53         delete m_msgPool;
54         m_msgPool = nullptr;
55         (void)pthread_mutex_destroy(&m_handlerMutex);
56         (void)pthread_cond_destroy(&m_queueCondition);
57         (void)pthread_mutex_destroy(&m_queueLock);
58         m_isInited = false;
59     }
60     return ret;
61 }
62 
Init(uint32_t maxQueueSize,uint32_t msgPayloadLen,std::string name)63 int32_t MessageLooper::Init(uint32_t maxQueueSize, uint32_t msgPayloadLen, std::string name)
64 {
65     if (m_isInited) {
66         return HI_SUCCESS;
67     }
68     int32_t ret = pthread_mutex_init(&m_handlerMutex, nullptr);
69     if (ret != HI_SUCCESS) {
70         MEDIA_ERR_LOG("pthread_mutex_init m_handlerMutex error");
71         return HI_FAILURE;
72     }
73     ret = pthread_mutex_init(&m_queueLock, nullptr);
74     if (ret != HI_SUCCESS) {
75         MEDIA_ERR_LOG("pthread_mutex_init m_queueLock error");
76         goto MUTEX_DEL;
77     }
78     FsmCondInitRelative(m_queueCondition);
79     m_msgPool = new (std::nothrow) MMessagePool(name);
80     if (m_msgPool == nullptr) {
81         MEDIA_ERR_LOG("create m_msgPool faileld");
82         goto COND_DEL;
83     }
84     if (m_msgPool->Init(maxQueueSize, msgPayloadLen) != HI_SUCCESS) {
85         goto DEL_MSGPOOL;
86     }
87     m_isInited = true;
88     return HI_SUCCESS;
89 
90 DEL_MSGPOOL:
91     delete m_msgPool;
92     m_msgPool = nullptr;
93 COND_DEL:
94     (void)pthread_cond_destroy(&m_queueCondition);
95     (void)pthread_mutex_destroy(&m_queueLock);
96 MUTEX_DEL:
97     (void)pthread_mutex_destroy(&m_handlerMutex);
98     return HI_FAILURE;
99 }
100 
RegisterHandler(MessageHandler & handler)101 int32_t MessageLooper::RegisterHandler(MessageHandler &handler)
102 {
103     if (m_isThreadRunning) {
104         MEDIA_ERR_LOG("looper be in running, do not support reg handlr");
105         return HI_FAILURE;
106     }
107     m_msgHandlr = &handler;
108     return HI_SUCCESS;
109 }
110 
Looper(void * args)111 void *MessageLooper::Looper(void *args)
112 {
113     MessageLooper *looper = static_cast<MessageLooper *>(args);
114     if (looper == nullptr) {
115         MEDIA_ERR_LOG("looper is null");
116         return nullptr;
117     }
118     looper->QueueHandlr();
119     return nullptr;
120 }
121 
Start()122 int32_t MessageLooper::Start()
123 {
124     if (m_isThreadRunning) {
125         MEDIA_ERR_LOG("looper already be running");
126         return HI_SUCCESS;
127     }
128 
129     if (m_msgHandlr == nullptr) {
130         MEDIA_ERR_LOG("no message handlr be registered");
131         return HI_FAILURE;
132     }
133 
134     m_isThreadRunning = true;
135 #if (defined(__LITE__))
136     pthread_attr_t attr;
137 
138     pthread_attr_init(&attr);
139     pthread_attr_setstacksize(&attr, MSGLOOPER_LITEOS_STACK_SIZE);
140     int32_t ret = pthread_create(&m_loopThrd, &attr, Looper, this);
141 #else
142     int32_t ret = pthread_create(&m_loopThrd, nullptr, Looper, this);
143 #endif
144     if (ret != 0) {
145         MEDIA_ERR_LOG("pthread_create failed %d", ret);
146 #if (defined(__LITE__))
147         pthread_attr_destroy(&attr);
148 #endif
149         m_isThreadRunning = false;
150         return HI_FAILURE;
151     }
152 
153 #if (defined(__LITE__))
154     pthread_attr_destroy(&attr);
155 #endif
156 
157     return HI_SUCCESS;
158 }
159 
Stop()160 int32_t MessageLooper::Stop()
161 {
162     int32_t ret = HI_SUCCESS;
163     if (m_isThreadRunning) {
164         FSM_LOCK(m_queueLock);
165         m_isThreadRunning = false;
166         (void)pthread_cond_broadcast(&m_queueCondition);
167         FSM_UNLOCK(m_queueLock);
168         pthread_join(m_loopThrd, nullptr);
169         FSM_LOCK(m_queueLock);
170         if (!m_eventQueue.empty()) {
171             MEDIA_ERR_LOG("%s have msg in queue, could not stop error", m_name.c_str());
172             ret = HI_FAILURE;
173         }
174         FSM_UNLOCK(m_queueLock);
175     }
176     if (m_msgPool != nullptr) {
177         m_msgPool->Dump();
178     }
179     return ret;
180 }
181 
QueueHandlr()182 void MessageLooper::QueueHandlr()
183 {
184     int32_t ret = HI_SUCCESS;
185 
186     prctl(PR_SET_NAME, m_name.c_str(), 0, 0, 0);
187 
188     while (m_isThreadRunning) {
189         FSM_LOCK(m_queueLock);
190         if (!m_isThreadRunning) { // must check value again inside lock
191             MEDIA_INFO_LOG("%s will stop", m_name.c_str());
192             FSM_UNLOCK(m_queueLock);
193             break;
194         }
195         if (m_eventQueue.empty()) {
196             FsmCondTimewait(m_queueCondition, m_queueLock, MSGLOOPER_EMPTY_WAIT_TIME_US);
197             FSM_UNLOCK(m_queueLock);
198             continue;
199         }
200 
201         uint64_t whenUs = (*m_eventQueue.begin()).whenUs;
202         uint64_t nowUs = FsmGetCurTimeUs();
203         if (whenUs > nowUs) {
204             uint32_t delayUs = (uint32_t)(whenUs - nowUs);
205             ret = FsmCondTimewait(m_queueCondition, m_queueLock, delayUs);
206             if (ret != ETIMEDOUT && ret != HI_SUCCESS) {
207                 MEDIA_INFO_LOG("FSM_Cond_Timewait err: %d", ret);
208             }
209             FSM_UNLOCK(m_queueLock);
210             continue;
211         }
212 
213         MsgEvent event = *m_eventQueue.begin();
214         m_eventQueue.erase(m_eventQueue.begin());
215 
216         FSM_UNLOCK(m_queueLock);
217 
218         LockHandler();
219         if (m_msgHandlr == nullptr || m_msgPool == nullptr) {
220             MEDIA_INFO_LOG("msghandler or m_msgPool is null");
221             UnlockHandler();
222             break;
223         }
224         m_msgHandlr->OnMessageReceived(*(event.msg));
225         UnlockHandler();
226 
227         // statemachine.post() set isNeedReply=false, for there is no requirment by now.
228         if (event.msg->isNeedReply && event.msg->msgCallback) {
229             event.msg->msgCallback(event.msg->privDate);
230         }
231 
232         m_msgPool->PutMsg(*(event.msg));
233     }
234 }
235 
Send(const MsgInfo & msg)236 int32_t MessageLooper::Send(const MsgInfo &msg)
237 {
238     LockHandler();
239     if (m_msgHandlr == nullptr) {
240         MEDIA_INFO_LOG("no message handlr be registered");
241         UnlockHandler();
242         return HI_FAILURE;
243     }
244     int32_t ret = m_msgHandlr->OnMessageReceived(msg);
245     UnlockHandler();
246 
247     return ret;
248 }
249 
Post(const MsgInfo & msg,uint64_t delayUs)250 int32_t MessageLooper::Post(const MsgInfo &msg, uint64_t delayUs)
251 {
252     if (!m_isThreadRunning) {
253         MEDIA_ERR_LOG("post failed, looper is stoped");
254         return HI_FAILURE;
255     }
256     if (m_msgPool == nullptr) {
257         MEDIA_ERR_LOG("msgPool is null");
258         return HI_FAILURE;
259     }
260     MsgInfo *msgInfo = m_msgPool->GetMsg(msg);
261     if (msgInfo == nullptr) {
262         MEDIA_ERR_LOG("getMsg err");
263         return HI_FAILURE;
264     }
265 
266     msgInfo->isNeedReply = false;
267     InsertQueue(*msgInfo, delayUs);
268 
269     return HI_SUCCESS;
270 }
271 
InsertQueue(MsgInfo & msg,uint64_t delayUs)272 void MessageLooper::InsertQueue(MsgInfo &msg, uint64_t delayUs)
273 {
274     uint64_t curUs = FsmGetCurTimeUs();
275     uint64_t whenUs = (delayUs > 0) ? (curUs + delayUs) : curUs;
276 
277     FSM_LOCK(m_queueLock);
278 
279     MsgEvent event{&msg, whenUs};
280 
281     if (m_eventQueue.empty()) {
282         m_eventQueue.push_back(event);
283         FSM_COND_SIGNAL(m_queueCondition);
284     } else {
285         std::list<MsgEvent>::iterator it = m_eventQueue.begin();
286         while (it != m_eventQueue.end() && (*it).whenUs <= whenUs) {
287             ++it;
288         }
289 
290         m_eventQueue.insert(it, event);
291 
292         if (it == m_eventQueue.begin()) {
293             FSM_COND_SIGNAL(m_queueCondition);
294         }
295     }
296 
297     FSM_UNLOCK(m_queueLock);
298 }
299 
HasMessage(const MsgInfo & msg)300 bool MessageLooper::HasMessage(const MsgInfo &msg)
301 {
302     bool isHasMsg = false;
303 
304     FSM_LOCK(m_queueLock);
305 
306     std::list<MsgEvent>::iterator it = m_eventQueue.begin();
307     while (it != m_eventQueue.end()) {
308         if (it->msg->what == msg.what) {
309             isHasMsg = true;
310             break;
311         }
312         ++it;
313     }
314 
315     FSM_UNLOCK(m_queueLock);
316     return isHasMsg;
317 }
318 
RemoveMessage(const MsgInfo & msg)319 void MessageLooper::RemoveMessage(const MsgInfo &msg)
320 {
321     MsgEvent event = { nullptr, 0 };
322     bool isFound = false;
323 
324     FSM_LOCK(m_queueLock);
325 
326     std::list<MsgEvent>::iterator it = m_eventQueue.begin();
327     while (it != m_eventQueue.end()) {
328         if (it->msg->what == msg.what) {
329             event = *it;
330             m_eventQueue.erase(it);
331             isFound = true;
332             break;
333         }
334         ++it;
335     }
336 
337     FSM_UNLOCK(m_queueLock);
338 
339     if (isFound && (m_msgPool != nullptr)) {
340         MEDIA_INFO_LOG("%s remove msg: %d", m_name.c_str(), event.msg->what);
341         m_msgPool->PutMsg(*(event.msg));
342     }
343 }
344 
Dump()345 void MessageLooper::Dump()
346 {
347     FSM_LOCK(m_queueLock);
348 
349     MEDIA_INFO_LOG(
350                   "looper[%s], msg cnt: %zu", m_name.c_str(), m_eventQueue.size());
351     std::list<MsgEvent>::iterator it = m_eventQueue.begin();
352     while (it != m_eventQueue.end()) {
353         MEDIA_INFO_LOG("msg id[%d], handleWhen[%llu]", it->msg->what, it->whenUs);
354         ++it;
355     }
356     FSM_UNLOCK(m_queueLock);
357     if (m_msgPool != nullptr) {
358         m_msgPool->Dump();
359     }
360 }
361 
LockHandler()362 void MessageLooper::LockHandler()
363 {
364     pthread_mutex_lock(&m_handlerMutex);
365 }
366 
UnlockHandler()367 void MessageLooper::UnlockHandler()
368 {
369     pthread_mutex_unlock(&m_handlerMutex);
370 }
371 };
372