1
2 /*
3 * Copyright (c) 2021-2023 Huawei Device Co., Ltd.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "event_queue_ffrt.h"
18
19 #include "ffrt_inner.h"
20 #include "event_logger.h"
21 #include "event_handler.h"
22
23 namespace OHOS {
24 namespace AppExecFwk {
25 namespace {
26
27 DEFINE_EH_HILOG_LABEL("EventQueueFFRT");
28 constexpr static uint32_t MAX_DUMP_INFO_LENGTH = 120000;
29 constexpr static uint32_t MILLI_TO_MICRO = 1000;
30 static constexpr int FFRT_REMOVE_SUCC = 0;
TransferInnerPriority(EventQueue::Priority priority)31 ffrt_inner_queue_priority_t TransferInnerPriority(EventQueue::Priority priority)
32 {
33 ffrt_inner_queue_priority_t innerPriority = ffrt_inner_queue_priority_t::ffrt_inner_queue_priority_low;
34 switch (priority) {
35 case EventQueue::Priority::VIP:
36 innerPriority = ffrt_inner_queue_priority_t::ffrt_inner_queue_priority_vip;
37 break;
38 case EventQueue::Priority::IMMEDIATE:
39 innerPriority = ffrt_inner_queue_priority_t::ffrt_inner_queue_priority_immediate;
40 break;
41 case EventQueue::Priority::HIGH:
42 innerPriority = ffrt_inner_queue_priority_t::ffrt_inner_queue_priority_high;
43 break;
44 case EventQueue::Priority::LOW:
45 innerPriority = ffrt_inner_queue_priority_t::ffrt_inner_queue_priority_low;
46 break;
47 case EventQueue::Priority::IDLE:
48 innerPriority = ffrt_inner_queue_priority_t::ffrt_inner_queue_priority_idle;
49 break;
50 default:
51 break;
52 }
53 return innerPriority;
54 }
55
TransferQueuePtr(std::shared_ptr<ffrt::queue> queue)56 inline ffrt_queue_t* TransferQueuePtr(std::shared_ptr<ffrt::queue> queue)
57 {
58 if (queue) {
59 return reinterpret_cast<ffrt_queue_t*>(queue.get());
60 }
61 return nullptr;
62 }
63 } // unnamed namespace
64
EventQueueFFRT()65 EventQueueFFRT::EventQueueFFRT() : EventQueue()
66 {
67 // Destructed queue in the ffrt task needs to be switched to asynchronous to avoid parsing deadlocks
68 ffrtQueue_ = std::shared_ptr<ffrt::queue>(new ffrt::queue(static_cast<ffrt::queue_type>(
69 ffrt_inner_queue_type_t::ffrt_queue_eventhandler_adapter), "EventHandler_QUEUE"), [](ffrt::queue* ptr) {
70 if (ffrt_this_task_get_id()) {
71 ffrt::submit([ptr]() { delete ptr; });
72 } else {
73 delete ptr;
74 }
75 });
76 HILOGD("Event queue ffrt");
77 }
78
EventQueueFFRT(const std::shared_ptr<IoWaiter> & ioWaiter)79 EventQueueFFRT::EventQueueFFRT(const std::shared_ptr<IoWaiter> &ioWaiter): EventQueue(ioWaiter)
80 {
81 // Destructed queue in the ffrt task needs to be switched to asynchronous to avoid parsing deadlocks
82 ffrtQueue_ = std::shared_ptr<ffrt::queue>(new ffrt::queue(static_cast<ffrt::queue_type>(
83 ffrt_inner_queue_type_t::ffrt_queue_eventhandler_adapter), "EventHandler_QUEUE"), [](ffrt::queue* ptr) {
84 if (ffrt_this_task_get_id()) {
85 ffrt::submit([ptr]() { delete ptr; });
86 } else {
87 delete ptr;
88 }
89 });
90 HILOGD("Event queue ffrt");
91 }
92
~EventQueueFFRT()93 EventQueueFFRT::~EventQueueFFRT()
94 {
95 std::lock_guard<std::mutex> lock(queueLock_);
96 usable_.store(false);
97 ioWaiter_ = nullptr;
98 EH_LOGI_LIMIT("EventQueueFFRT is unavailable hence");
99 }
100
Insert(InnerEvent::Pointer & event,Priority priority,EventInsertType insertType)101 void EventQueueFFRT::Insert(InnerEvent::Pointer &event, Priority priority, EventInsertType insertType)
102 {
103 InsertEvent(event, priority, false, insertType);
104 }
105
RemoveOrphanByHandlerId(const std::string & handlerId)106 void EventQueueFFRT::RemoveOrphanByHandlerId(const std::string& handlerId)
107 {
108 // taskname: handler Id | has task | inner event id | param | task name
109 std::string regular = handlerId + "\\|.*";
110 ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
111 if (queue == nullptr) {
112 HILOGW("Remove is unavailable.");
113 return;
114 }
115 ffrt_queue_cancel_by_name(*queue, regular.c_str());
116 std::lock_guard<std::mutex> lock(queueLock_);
117 if (!usable_.load()) {
118 HILOGW("EventQueueFFRT is unavailable.");
119 return;
120 }
121 RemoveInvalidFileDescriptor();
122 }
123
124
RemoveAll()125 void EventQueueFFRT::RemoveAll()
126 {
127 HILOGD("RemoveAll");
128 std::lock_guard<std::mutex> lock(queueLock_);
129 if (!usable_.load()) {
130 HILOGW("EventQueueFFRT is unavailable.");
131 return;
132 }
133 ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
134 if (queue == nullptr) {
135 HILOGW("RemoveAll is unavailable.");
136 return;
137 }
138 ffrt_queue_cancel_all(*queue);
139 }
140
Remove(const std::shared_ptr<EventHandler> & owner)141 void EventQueueFFRT::Remove(const std::shared_ptr<EventHandler> &owner)
142 {
143 HILOGD("Remove");
144 if (!owner) {
145 HILOGE("Invalid owner");
146 return;
147 }
148
149 std::lock_guard<std::mutex> lock(queueLock_);
150 if (!usable_.load()) {
151 HILOGW("EventQueueFFRT is unavailable.");
152 return;
153 }
154
155 // taskname: handler Id | has task | inner event id | param | task name
156 std::string regular = owner->GetHandlerId() + "\\|.*";
157 ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
158 if (queue == nullptr) {
159 HILOGW("Remove is unavailable.");
160 return;
161 }
162 ffrt_queue_cancel_by_name(*queue, regular.c_str());
163 }
164
Remove(const std::shared_ptr<EventHandler> & owner,uint32_t innerEventId)165 void EventQueueFFRT::Remove(const std::shared_ptr<EventHandler> &owner, uint32_t innerEventId)
166 {
167 HILOGD("Remove");
168 if (!owner) {
169 HILOGE("Invalid owner");
170 return;
171 }
172
173 std::lock_guard<std::mutex> lock(queueLock_);
174 if (!usable_.load()) {
175 HILOGW("EventQueueFFRT is unavailable.");
176 return;
177 }
178
179 // taskname: handler Id | has task | inner event id | param | task name
180 std::string regular = owner->GetHandlerId() + "\\|0\\|" + std::to_string(innerEventId) + "\\|.*";
181 ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
182 if (queue == nullptr) {
183 HILOGW("Remove is unavailable.");
184 return;
185 }
186 ffrt_queue_cancel_by_name(*queue, regular.c_str());
187 }
188
Remove(const std::shared_ptr<EventHandler> & owner,uint32_t innerEventId,int64_t param)189 void EventQueueFFRT::Remove(const std::shared_ptr<EventHandler> &owner, uint32_t innerEventId, int64_t param)
190 {
191 HILOGD("Remove");
192 if (!owner) {
193 HILOGE("Invalid owner");
194 return;
195 }
196
197 std::lock_guard<std::mutex> lock(queueLock_);
198 if (!usable_.load()) {
199 HILOGW("EventQueueFFRT is unavailable.");
200 return;
201 }
202
203 // taskname: handler Id | has task | inner event id | param | task name
204 std::string regular = owner->GetHandlerId() + "\\|0\\|" + std::to_string(innerEventId) + "\\|" +
205 std::to_string(param) + "\\|.*";
206 ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
207 if (queue == nullptr) {
208 HILOGW("Remove is unavailable.");
209 return;
210 }
211 ffrt_queue_cancel_by_name(*queue, regular.c_str());
212 }
213
Remove(const std::shared_ptr<EventHandler> & owner,const std::string & name)214 bool EventQueueFFRT::Remove(const std::shared_ptr<EventHandler> &owner, const std::string &name)
215 {
216 HILOGD("Remove");
217 if (!owner) {
218 HILOGE("Invalid owner");
219 return false;
220 }
221
222 std::lock_guard<std::mutex> lock(queueLock_);
223 if (!usable_.load()) {
224 HILOGW("EventQueueFFRT is unavailable.");
225 return false;
226 }
227
228 // taskname: handler Id | has task | inner event id | param | task name
229 std::string regular = owner->GetHandlerId() + "\\|1\\|" + ".*\\|" + name;
230 ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
231 if (queue == nullptr) {
232 HILOGW("Remove is unavailable.");
233 return false;
234 }
235 int ret = ffrt_queue_cancel_by_name(*queue, regular.c_str());
236 return ret == FFRT_REMOVE_SUCC ? true : false;
237 }
238
HasInnerEvent(const std::shared_ptr<EventHandler> & owner,uint32_t innerEventId)239 bool EventQueueFFRT::HasInnerEvent(const std::shared_ptr<EventHandler> &owner, uint32_t innerEventId)
240 {
241 if (!owner) {
242 HILOGE("Invalid owner");
243 return false;
244 }
245 std::lock_guard<std::mutex> lock(queueLock_);
246 if (!usable_.load()) {
247 HILOGW("EventQueueFFRT is unavailable.");
248 return false;
249 }
250
251 // taskname: handler Id | has task | inner event id | param | task name
252 std::string regular = owner->GetHandlerId() + "\\|0\\|" + std::to_string(innerEventId) + "\\|.*";
253 ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
254 if (queue == nullptr) {
255 HILOGW("Remove is unavailable.");
256 return false;
257 }
258 return ffrt_queue_has_task(*queue, regular.c_str());
259 }
260
HasInnerEvent(const std::shared_ptr<EventHandler> & owner,int64_t param)261 bool EventQueueFFRT::HasInnerEvent(const std::shared_ptr<EventHandler> &owner, int64_t param)
262 {
263 if (!owner) {
264 HILOGE("Invalid owner");
265 return false;
266 }
267 std::lock_guard<std::mutex> lock(queueLock_);
268 if (!usable_.load()) {
269 HILOGW("EventQueueFFRT is unavailable.");
270 return false;
271 }
272
273 // taskname: handler Id | has task | inner event id | param | task name
274 std::string regular = owner->GetHandlerId() + "\\|0\\|.*" + std::to_string(param) + "\\|.*";
275 ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
276 if (queue == nullptr) {
277 HILOGW("Remove is unavailable.");
278 return false;
279 }
280 return ffrt_queue_has_task(*queue, regular.c_str());
281 }
282
Dump(Dumper & dumper)283 void EventQueueFFRT::Dump(Dumper &dumper)
284 {
285 std::lock_guard<std::mutex> lock(queueLock_);
286 if (!usable_.load()) {
287 HILOGW("EventQueue is unavailable.");
288 return;
289 }
290
291 std::unique_ptr<char[]> chars = std::make_unique<char[]>(MAX_DUMP_INFO_LENGTH);
292 if (chars == nullptr) {
293 return;
294 }
295 ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
296 if (queue == nullptr) {
297 HILOGW("Dump is unavailable.");
298 return;
299 }
300 int ret = ffrt_queue_dump(*queue, dumper.GetTag().c_str(), chars.get(), MAX_DUMP_INFO_LENGTH, true);
301 if (ret > 0) {
302 dumper.Dump(chars.get());
303 }
304 }
305
DumpQueueInfo(std::string & queueInfo)306 void EventQueueFFRT::DumpQueueInfo(std::string& queueInfo)
307 {
308 std::lock_guard<std::mutex> lock(queueLock_);
309 if (!usable_.load()) {
310 HILOGW("EventQueue is unavailable.");
311 return;
312 }
313
314 std::unique_ptr<char[]> chars = std::make_unique<char[]>(MAX_DUMP_INFO_LENGTH);
315 if (chars == nullptr) {
316 return;
317 }
318 ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
319 if (queue == nullptr) {
320 HILOGW("DumpQueueInfo is unavailable.");
321 return;
322 }
323 int ret = ffrt_queue_dump(*queue, "", chars.get(), MAX_DUMP_INFO_LENGTH, false);
324 if (ret > 0) {
325 queueInfo.append(chars.get());
326 }
327 }
328
IsIdle()329 bool EventQueueFFRT::IsIdle()
330 {
331 std::lock_guard<std::mutex> lock(queueLock_);
332 if (!usable_.load()) {
333 HILOGW("EventQueue is unavailable.");
334 return false;
335 }
336
337 ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
338 if (queue == nullptr) {
339 HILOGW("IsIdle is unavailable.");
340 return false;
341 }
342 return ffrt_queue_is_idle(*queue);
343 }
344
IsQueueEmpty()345 bool EventQueueFFRT::IsQueueEmpty()
346 {
347 std::lock_guard<std::mutex> lock(queueLock_);
348 if (!usable_.load()) {
349 HILOGW("EventQueue is unavailable.");
350 return false;
351 }
352
353 ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
354 if (queue == nullptr) {
355 HILOGW("IsIdle is unavailable.");
356 return false;
357 }
358 uint32_t queueNum = static_cast<uint32_t>(Priority::IDLE);
359 for (uint32_t i = 0; i < queueNum; i++) {
360 Priority priority = static_cast<Priority>(i);
361 ffrt_inner_queue_priority_t innerPriority = TransferInnerPriority(priority);
362 int size = ffrt_queue_size_dump(*queue, innerPriority);
363 if (size > 0) {
364 return false;
365 }
366 }
367 return true;
368 }
369
DumpCurrentQueueSize()370 std::string EventQueueFFRT::DumpCurrentQueueSize()
371 {
372 ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
373 if (queue == nullptr) {
374 HILOGW("DumpCurrentQueueSize is unavailable.");
375 return "";
376 }
377
378 std::string dumpInfo = "Current queue size: ";
379 std::string prioritys[] = {"VIP = ", ", IMMEDIATE = ", ", HIGH =", ", LOW = ", ", IDLE = "};
380 uint32_t queueNum = static_cast<uint32_t>(Priority::IDLE);
381 for (uint32_t i = 0; i < queueNum; i++) {
382 dumpInfo += prioritys[i];
383 Priority priority = static_cast<Priority>(i);
384 ffrt_inner_queue_priority_t innerPriority = TransferInnerPriority(priority);
385 dumpInfo += std::to_string(ffrt_queue_size_dump(*queue, innerPriority));
386 }
387 dumpInfo += " ; ";
388 return dumpInfo;
389 }
390
HasPreferEvent(int basePrio)391 bool EventQueueFFRT::HasPreferEvent(int basePrio)
392 {
393 return false;
394 }
395
QueryPendingTaskInfo(int32_t fileDescriptor)396 PendingTaskInfo EventQueueFFRT::QueryPendingTaskInfo(int32_t fileDescriptor)
397 {
398 HILOGW("FFRT queue is not support.");
399 return PendingTaskInfo();
400 }
401
CancelAndWait()402 void EventQueueFFRT::CancelAndWait()
403 {
404 HILOGD("FFRT CancelAndWait enter.");
405 if (!usable_.load()) {
406 HILOGW("CancelAndWait - EventQueue is unavailable.");
407 return;
408 }
409 ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
410 if (queue == nullptr) {
411 HILOGW("CancelAndWait - queue is unavailable.");
412 return;
413 }
414 ffrt_queue_cancel_and_wait(*queue);
415 }
416
GetFfrtQueue()417 void* EventQueueFFRT::GetFfrtQueue()
418 {
419 if (ffrtQueue_) {
420 return reinterpret_cast<void*>(ffrtQueue_.get());
421 }
422 return nullptr;
423 }
424
InsertSyncEvent(InnerEvent::Pointer & event,Priority priority,EventInsertType insertType)425 void EventQueueFFRT::InsertSyncEvent(InnerEvent::Pointer &event, Priority priority, EventInsertType insertType)
426 {
427 InsertEvent(event, priority, true, insertType);
428 }
429
InsertEvent(InnerEvent::Pointer & event,Priority priority,bool syncWait,EventInsertType insertType)430 void EventQueueFFRT::InsertEvent(InnerEvent::Pointer &event, Priority priority, bool syncWait,
431 EventInsertType insertType)
432 {
433 if (!event) {
434 HILOGE("Could not insert an invalid event");
435 return;
436 }
437 std::unique_lock<std::mutex> lock(queueLock_);
438 if (!usable_.load()) {
439 HILOGW("EventQueueFFRT is unavailable.");
440 return;
441 }
442
443 // taskname: handler Id | has task | inner event id | param | task name
444 std::string taskName = event->GetOwnerId() + "|" + (event->HasTask() ? "1" : "0") + "|" +
445 std::to_string(event->GetInnerEventId()) + "|" + std::to_string(event->GetParam()) +
446 "|" + event->GetTaskName();
447 HILOGD("Submit task %{public}s, %{public}d, %{public}d, %{public}d.", taskName.c_str(), priority,
448 insertType, syncWait);
449 if (insertType == EventInsertType::AT_FRONT) {
450 SubmitEventAtFront(event, priority, syncWait, taskName, lock);
451 } else {
452 SubmitEventAtEnd(event, priority, syncWait, taskName, lock);
453 }
454 }
455
// Wraps a callable that may be move-only (for example a lambda that captured a unique_ptr)
// so that it can be stored in a std::function, which requires a copyable target.
// The callable is moved into shared storage; every copy of the returned closure invokes
// that same stored callable.
template<class F>
auto MakeCopyableFunction(F&& f)
{
    using Callable = std::decay_t<F>;
    return [callable = std::make_shared<Callable>(std::forward<F>(f))]() { (*callable)(); };
}
464
SubmitEventAtEnd(InnerEvent::Pointer & event,Priority priority,bool syncWait,const std::string & taskName,std::unique_lock<std::mutex> & lock)465 void EventQueueFFRT::SubmitEventAtEnd(InnerEvent::Pointer &event, Priority priority, bool syncWait,
466 const std::string &taskName, std::unique_lock<std::mutex> &lock)
467 {
468 uint64_t time = event->GetDelayTime();
469 ffrt_queue_priority_t queuePriority = static_cast<ffrt_queue_priority_t>(TransferInnerPriority(priority));
470 std::function<void()> task = MakeCopyableFunction([ffrtEvent = std::move(event)]() {
471 auto handler = new (std::nothrow) std::shared_ptr<EventHandler>(ffrtEvent->GetOwner());
472 if (handler && (*handler)) {
473 ffrt_queue_t* queue = reinterpret_cast<ffrt_queue_t*>(
474 (*handler)->GetEventRunner()->GetEventQueue()->GetFfrtQueue());
475 if (queue != nullptr) {
476 ffrt_queue_set_eventhandler(*queue, (void*)handler);
477 }
478 (*handler)->DistributeEvent(ffrtEvent);
479 if (queue != nullptr) {
480 ffrt_queue_set_eventhandler(*queue, nullptr);
481 }
482 }
483 delete handler;
484 });
485
486 if (syncWait) {
487 ffrt::task_handle handle = ffrtQueue_->submit_h(task, ffrt::task_attr().name(taskName.c_str())
488 .delay(time * MILLI_TO_MICRO).priority(queuePriority));
489 lock.unlock();
490 ffrtQueue_->wait(handle);
491 } else {
492 ffrtQueue_->submit(task, ffrt::task_attr().name(taskName.c_str()).delay(time * MILLI_TO_MICRO).
493 priority(queuePriority));
494 }
495 }
496
SubmitEventAtFront(InnerEvent::Pointer & event,Priority priority,bool syncWait,const std::string & taskName,std::unique_lock<std::mutex> & lock)497 void EventQueueFFRT::SubmitEventAtFront(InnerEvent::Pointer &event, Priority priority, bool syncWait,
498 const std::string &taskName, std::unique_lock<std::mutex> &lock)
499 {
500 uint64_t time = event->GetDelayTime();
501 ffrt_queue_priority_t queuePriority = static_cast<ffrt_queue_priority_t>(TransferInnerPriority(priority));
502 ffrt_task_attr_t attribute;
503 (void)ffrt_task_attr_init(&attribute);
504 ffrt_task_attr_set_name(&attribute, taskName.c_str());
505 ffrt_task_attr_set_delay(&attribute, time * MILLI_TO_MICRO);
506 ffrt_task_attr_set_queue_priority(&attribute, queuePriority);
507
508 std::function<void()> task = MakeCopyableFunction([ffrtEvent = std::move(event)]() {
509 auto handler = new (std::nothrow) std::shared_ptr<EventHandler>(ffrtEvent->GetOwner());
510 if (handler && (*handler)) {
511 ffrt_queue_t* queue = reinterpret_cast<ffrt_queue_t*>(
512 (*handler)->GetEventRunner()->GetEventQueue()->GetFfrtQueue());
513 if (queue != nullptr) {
514 ffrt_queue_set_eventhandler(*queue, (void*)handler);
515 }
516 (*handler)->DistributeEvent(ffrtEvent);
517 if (queue != nullptr) {
518 ffrt_queue_set_eventhandler(*queue, nullptr);
519 }
520 }
521 delete handler;
522 });
523
524 ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
525 if (queue == nullptr) {
526 HILOGW("SubmitEventAtFront is unavailable.");
527 return;
528 }
529 ffrt_function_header_t* header = ffrt::create_function_wrapper(task, ffrt_function_kind_queue);
530 if (syncWait) {
531 ffrt::task_handle handle = ffrt_queue_submit_head_h(*queue, header, &attribute);
532 lock.unlock();
533 ffrtQueue_->wait(handle);
534 } else {
535 ffrt_queue_submit_head(*queue, header, &attribute);
536 }
537 }
538
539 } // namespace AppExecFwk
540 } // namespace OHOS
541