1 
2 /*
3  * Copyright (c) 2021-2023 Huawei Device Co., Ltd.
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "event_queue_ffrt.h"
18 
19 #include "ffrt_inner.h"
20 #include "event_logger.h"
21 #include "event_handler.h"
22 
23 namespace OHOS {
24 namespace AppExecFwk {
25 namespace {
26 
27 DEFINE_EH_HILOG_LABEL("EventQueueFFRT");
28 constexpr static uint32_t MAX_DUMP_INFO_LENGTH = 120000;
29 constexpr static uint32_t MILLI_TO_MICRO = 1000;
30 static constexpr int FFRT_REMOVE_SUCC = 0;
TransferInnerPriority(EventQueue::Priority priority)31 ffrt_inner_queue_priority_t TransferInnerPriority(EventQueue::Priority priority)
32 {
33     ffrt_inner_queue_priority_t innerPriority = ffrt_inner_queue_priority_t::ffrt_inner_queue_priority_low;
34     switch (priority) {
35         case EventQueue::Priority::VIP:
36             innerPriority = ffrt_inner_queue_priority_t::ffrt_inner_queue_priority_vip;
37             break;
38         case EventQueue::Priority::IMMEDIATE:
39             innerPriority = ffrt_inner_queue_priority_t::ffrt_inner_queue_priority_immediate;
40             break;
41         case EventQueue::Priority::HIGH:
42             innerPriority = ffrt_inner_queue_priority_t::ffrt_inner_queue_priority_high;
43             break;
44         case EventQueue::Priority::LOW:
45             innerPriority = ffrt_inner_queue_priority_t::ffrt_inner_queue_priority_low;
46             break;
47         case EventQueue::Priority::IDLE:
48             innerPriority = ffrt_inner_queue_priority_t::ffrt_inner_queue_priority_idle;
49             break;
50         default:
51             break;
52     }
53     return innerPriority;
54 }
55 
TransferQueuePtr(std::shared_ptr<ffrt::queue> queue)56 inline ffrt_queue_t* TransferQueuePtr(std::shared_ptr<ffrt::queue> queue)
57 {
58     if (queue) {
59         return reinterpret_cast<ffrt_queue_t*>(queue.get());
60     }
61     return nullptr;
62 }
63 }  // unnamed namespace
64 
EventQueueFFRT()65 EventQueueFFRT::EventQueueFFRT() : EventQueue()
66 {
67     // Destructed queue in the ffrt task needs to be switched to asynchronous to avoid parsing deadlocks
68     ffrtQueue_ = std::shared_ptr<ffrt::queue>(new ffrt::queue(static_cast<ffrt::queue_type>(
69         ffrt_inner_queue_type_t::ffrt_queue_eventhandler_adapter), "EventHandler_QUEUE"), [](ffrt::queue* ptr) {
70         if (ffrt_this_task_get_id()) {
71             ffrt::submit([ptr]() { delete ptr; });
72         } else {
73             delete ptr;
74         }
75     });
76     HILOGD("Event queue ffrt");
77 }
78 
EventQueueFFRT(const std::shared_ptr<IoWaiter> & ioWaiter)79 EventQueueFFRT::EventQueueFFRT(const std::shared_ptr<IoWaiter> &ioWaiter): EventQueue(ioWaiter)
80 {
81     // Destructed queue in the ffrt task needs to be switched to asynchronous to avoid parsing deadlocks
82     ffrtQueue_ = std::shared_ptr<ffrt::queue>(new ffrt::queue(static_cast<ffrt::queue_type>(
83         ffrt_inner_queue_type_t::ffrt_queue_eventhandler_adapter), "EventHandler_QUEUE"), [](ffrt::queue* ptr) {
84         if (ffrt_this_task_get_id()) {
85             ffrt::submit([ptr]() { delete ptr; });
86         } else {
87             delete ptr;
88         }
89     });
90     HILOGD("Event queue ffrt");
91 }
92 
~EventQueueFFRT()93 EventQueueFFRT::~EventQueueFFRT()
94 {
95     std::lock_guard<std::mutex> lock(queueLock_);
96     usable_.store(false);
97     ioWaiter_ = nullptr;
98     EH_LOGI_LIMIT("EventQueueFFRT is unavailable hence");
99 }
100 
Insert(InnerEvent::Pointer & event,Priority priority,EventInsertType insertType)101 void EventQueueFFRT::Insert(InnerEvent::Pointer &event, Priority priority, EventInsertType insertType)
102 {
103     InsertEvent(event, priority, false, insertType);
104 }
105 
RemoveOrphanByHandlerId(const std::string & handlerId)106 void EventQueueFFRT::RemoveOrphanByHandlerId(const std::string& handlerId)
107 {
108     // taskname: handler Id | has task | inner event id | param | task name
109     std::string regular = handlerId + "\\|.*";
110     ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
111     if (queue == nullptr) {
112         HILOGW("Remove is unavailable.");
113         return;
114     }
115     ffrt_queue_cancel_by_name(*queue, regular.c_str());
116     std::lock_guard<std::mutex> lock(queueLock_);
117     if (!usable_.load()) {
118         HILOGW("EventQueueFFRT is unavailable.");
119         return;
120     }
121     RemoveInvalidFileDescriptor();
122 }
123 
124 
RemoveAll()125 void EventQueueFFRT::RemoveAll()
126 {
127     HILOGD("RemoveAll");
128     std::lock_guard<std::mutex> lock(queueLock_);
129     if (!usable_.load()) {
130         HILOGW("EventQueueFFRT is unavailable.");
131         return;
132     }
133     ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
134     if (queue == nullptr) {
135         HILOGW("RemoveAll is unavailable.");
136         return;
137     }
138     ffrt_queue_cancel_all(*queue);
139 }
140 
Remove(const std::shared_ptr<EventHandler> & owner)141 void EventQueueFFRT::Remove(const std::shared_ptr<EventHandler> &owner)
142 {
143     HILOGD("Remove");
144     if (!owner) {
145         HILOGE("Invalid owner");
146         return;
147     }
148 
149     std::lock_guard<std::mutex> lock(queueLock_);
150     if (!usable_.load()) {
151         HILOGW("EventQueueFFRT is unavailable.");
152         return;
153     }
154 
155     // taskname: handler Id | has task | inner event id | param | task name
156     std::string regular = owner->GetHandlerId() + "\\|.*";
157     ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
158     if (queue == nullptr) {
159         HILOGW("Remove is unavailable.");
160         return;
161     }
162     ffrt_queue_cancel_by_name(*queue, regular.c_str());
163 }
164 
Remove(const std::shared_ptr<EventHandler> & owner,uint32_t innerEventId)165 void EventQueueFFRT::Remove(const std::shared_ptr<EventHandler> &owner, uint32_t innerEventId)
166 {
167     HILOGD("Remove");
168     if (!owner) {
169         HILOGE("Invalid owner");
170         return;
171     }
172 
173     std::lock_guard<std::mutex> lock(queueLock_);
174     if (!usable_.load()) {
175         HILOGW("EventQueueFFRT is unavailable.");
176         return;
177     }
178 
179     // taskname: handler Id | has task | inner event id | param | task name
180     std::string regular = owner->GetHandlerId() + "\\|0\\|" + std::to_string(innerEventId) + "\\|.*";
181     ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
182     if (queue == nullptr) {
183         HILOGW("Remove is unavailable.");
184         return;
185     }
186     ffrt_queue_cancel_by_name(*queue, regular.c_str());
187 }
188 
Remove(const std::shared_ptr<EventHandler> & owner,uint32_t innerEventId,int64_t param)189 void EventQueueFFRT::Remove(const std::shared_ptr<EventHandler> &owner, uint32_t innerEventId, int64_t param)
190 {
191     HILOGD("Remove");
192     if (!owner) {
193         HILOGE("Invalid owner");
194         return;
195     }
196 
197     std::lock_guard<std::mutex> lock(queueLock_);
198     if (!usable_.load()) {
199         HILOGW("EventQueueFFRT is unavailable.");
200         return;
201     }
202 
203     // taskname: handler Id | has task | inner event id | param | task name
204     std::string regular = owner->GetHandlerId() + "\\|0\\|" + std::to_string(innerEventId) + "\\|" +
205         std::to_string(param) + "\\|.*";
206     ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
207     if (queue == nullptr) {
208         HILOGW("Remove is unavailable.");
209         return;
210     }
211     ffrt_queue_cancel_by_name(*queue, regular.c_str());
212 }
213 
Remove(const std::shared_ptr<EventHandler> & owner,const std::string & name)214 bool EventQueueFFRT::Remove(const std::shared_ptr<EventHandler> &owner, const std::string &name)
215 {
216     HILOGD("Remove");
217     if (!owner) {
218         HILOGE("Invalid owner");
219         return false;
220     }
221 
222     std::lock_guard<std::mutex> lock(queueLock_);
223     if (!usable_.load()) {
224         HILOGW("EventQueueFFRT is unavailable.");
225         return false;
226     }
227 
228     // taskname: handler Id | has task | inner event id | param | task name
229     std::string regular = owner->GetHandlerId() + "\\|1\\|" + ".*\\|" + name;
230     ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
231     if (queue == nullptr) {
232         HILOGW("Remove is unavailable.");
233         return false;
234     }
235     int ret = ffrt_queue_cancel_by_name(*queue, regular.c_str());
236     return ret == FFRT_REMOVE_SUCC ? true : false;
237 }
238 
HasInnerEvent(const std::shared_ptr<EventHandler> & owner,uint32_t innerEventId)239 bool EventQueueFFRT::HasInnerEvent(const std::shared_ptr<EventHandler> &owner, uint32_t innerEventId)
240 {
241     if (!owner) {
242         HILOGE("Invalid owner");
243         return false;
244     }
245     std::lock_guard<std::mutex> lock(queueLock_);
246     if (!usable_.load()) {
247         HILOGW("EventQueueFFRT is unavailable.");
248         return false;
249     }
250 
251     // taskname: handler Id | has task | inner event id | param | task name
252     std::string regular = owner->GetHandlerId() + "\\|0\\|" + std::to_string(innerEventId) + "\\|.*";
253     ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
254     if (queue == nullptr) {
255         HILOGW("Remove is unavailable.");
256         return false;
257     }
258     return ffrt_queue_has_task(*queue, regular.c_str());
259 }
260 
HasInnerEvent(const std::shared_ptr<EventHandler> & owner,int64_t param)261 bool EventQueueFFRT::HasInnerEvent(const std::shared_ptr<EventHandler> &owner, int64_t param)
262 {
263     if (!owner) {
264         HILOGE("Invalid owner");
265         return false;
266     }
267     std::lock_guard<std::mutex> lock(queueLock_);
268     if (!usable_.load()) {
269         HILOGW("EventQueueFFRT is unavailable.");
270         return false;
271     }
272 
273     // taskname: handler Id | has task | inner event id | param | task name
274     std::string regular = owner->GetHandlerId() + "\\|0\\|.*" + std::to_string(param) + "\\|.*";
275     ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
276     if (queue == nullptr) {
277         HILOGW("Remove is unavailable.");
278         return false;
279     }
280     return ffrt_queue_has_task(*queue, regular.c_str());
281 }
282 
Dump(Dumper & dumper)283 void EventQueueFFRT::Dump(Dumper &dumper)
284 {
285     std::lock_guard<std::mutex> lock(queueLock_);
286     if (!usable_.load()) {
287         HILOGW("EventQueue is unavailable.");
288         return;
289     }
290 
291     std::unique_ptr<char[]> chars = std::make_unique<char[]>(MAX_DUMP_INFO_LENGTH);
292     if (chars == nullptr) {
293         return;
294     }
295     ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
296     if (queue == nullptr) {
297         HILOGW("Dump is unavailable.");
298         return;
299     }
300     int ret = ffrt_queue_dump(*queue, dumper.GetTag().c_str(), chars.get(), MAX_DUMP_INFO_LENGTH, true);
301     if (ret > 0) {
302         dumper.Dump(chars.get());
303     }
304 }
305 
DumpQueueInfo(std::string & queueInfo)306 void EventQueueFFRT::DumpQueueInfo(std::string& queueInfo)
307 {
308     std::lock_guard<std::mutex> lock(queueLock_);
309     if (!usable_.load()) {
310         HILOGW("EventQueue is unavailable.");
311         return;
312     }
313 
314     std::unique_ptr<char[]> chars = std::make_unique<char[]>(MAX_DUMP_INFO_LENGTH);
315     if (chars == nullptr) {
316         return;
317     }
318     ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
319     if (queue == nullptr) {
320         HILOGW("DumpQueueInfo is unavailable.");
321         return;
322     }
323     int ret = ffrt_queue_dump(*queue, "", chars.get(), MAX_DUMP_INFO_LENGTH, false);
324     if (ret > 0) {
325         queueInfo.append(chars.get());
326     }
327 }
328 
IsIdle()329 bool EventQueueFFRT::IsIdle()
330 {
331     std::lock_guard<std::mutex> lock(queueLock_);
332     if (!usable_.load()) {
333         HILOGW("EventQueue is unavailable.");
334         return false;
335     }
336 
337     ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
338     if (queue == nullptr) {
339         HILOGW("IsIdle is unavailable.");
340         return false;
341     }
342     return ffrt_queue_is_idle(*queue);
343 }
344 
IsQueueEmpty()345 bool EventQueueFFRT::IsQueueEmpty()
346 {
347     std::lock_guard<std::mutex> lock(queueLock_);
348     if (!usable_.load()) {
349         HILOGW("EventQueue is unavailable.");
350         return false;
351     }
352 
353     ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
354     if (queue == nullptr) {
355         HILOGW("IsIdle is unavailable.");
356         return false;
357     }
358     uint32_t queueNum = static_cast<uint32_t>(Priority::IDLE);
359     for (uint32_t i = 0; i < queueNum; i++) {
360         Priority priority = static_cast<Priority>(i);
361         ffrt_inner_queue_priority_t innerPriority = TransferInnerPriority(priority);
362         int size = ffrt_queue_size_dump(*queue, innerPriority);
363         if (size > 0) {
364             return false;
365         }
366     }
367     return true;
368 }
369 
DumpCurrentQueueSize()370 std::string EventQueueFFRT::DumpCurrentQueueSize()
371 {
372     ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
373     if (queue == nullptr) {
374         HILOGW("DumpCurrentQueueSize is unavailable.");
375         return "";
376     }
377 
378     std::string dumpInfo = "Current queue size: ";
379     std::string prioritys[] = {"VIP = ", ", IMMEDIATE = ", ", HIGH =", ", LOW = ", ", IDLE = "};
380     uint32_t queueNum = static_cast<uint32_t>(Priority::IDLE);
381     for (uint32_t i = 0; i < queueNum; i++) {
382         dumpInfo += prioritys[i];
383         Priority priority = static_cast<Priority>(i);
384         ffrt_inner_queue_priority_t innerPriority = TransferInnerPriority(priority);
385         dumpInfo += std::to_string(ffrt_queue_size_dump(*queue, innerPriority));
386     }
387     dumpInfo += " ; ";
388     return dumpInfo;
389 }
390 
HasPreferEvent(int basePrio)391 bool EventQueueFFRT::HasPreferEvent(int basePrio)
392 {
393     return false;
394 }
395 
QueryPendingTaskInfo(int32_t fileDescriptor)396 PendingTaskInfo EventQueueFFRT::QueryPendingTaskInfo(int32_t fileDescriptor)
397 {
398     HILOGW("FFRT queue is not support.");
399     return PendingTaskInfo();
400 }
401 
CancelAndWait()402 void EventQueueFFRT::CancelAndWait()
403 {
404     HILOGD("FFRT CancelAndWait enter.");
405     if (!usable_.load()) {
406         HILOGW("CancelAndWait - EventQueue is unavailable.");
407         return;
408     }
409     ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
410     if (queue == nullptr) {
411         HILOGW("CancelAndWait - queue is unavailable.");
412         return;
413     }
414     ffrt_queue_cancel_and_wait(*queue);
415 }
416 
GetFfrtQueue()417 void* EventQueueFFRT::GetFfrtQueue()
418 {
419     if (ffrtQueue_) {
420         return reinterpret_cast<void*>(ffrtQueue_.get());
421     }
422     return nullptr;
423 }
424 
InsertSyncEvent(InnerEvent::Pointer & event,Priority priority,EventInsertType insertType)425 void EventQueueFFRT::InsertSyncEvent(InnerEvent::Pointer &event, Priority priority, EventInsertType insertType)
426 {
427     InsertEvent(event, priority, true, insertType);
428 }
429 
InsertEvent(InnerEvent::Pointer & event,Priority priority,bool syncWait,EventInsertType insertType)430 void EventQueueFFRT::InsertEvent(InnerEvent::Pointer &event, Priority priority, bool syncWait,
431     EventInsertType insertType)
432 {
433     if (!event) {
434         HILOGE("Could not insert an invalid event");
435         return;
436     }
437     std::unique_lock<std::mutex> lock(queueLock_);
438     if (!usable_.load()) {
439         HILOGW("EventQueueFFRT is unavailable.");
440         return;
441     }
442 
443     // taskname: handler Id | has task | inner event id | param | task name
444     std::string taskName = event->GetOwnerId() + "|" + (event->HasTask() ? "1" : "0") + "|" +
445         std::to_string(event->GetInnerEventId()) + "|" + std::to_string(event->GetParam()) +
446         "|" + event->GetTaskName();
447     HILOGD("Submit task %{public}s, %{public}d, %{public}d, %{public}d.", taskName.c_str(), priority,
448         insertType, syncWait);
449     if (insertType == EventInsertType::AT_FRONT) {
450         SubmitEventAtFront(event, priority, syncWait, taskName, lock);
451     } else {
452         SubmitEventAtEnd(event, priority, syncWait, taskName, lock);
453     }
454 }
455 
456 // helper for move unique_ptr from lambda object to std::function object
457 template<class F>
MakeCopyableFunction(F && f)458 auto MakeCopyableFunction(F&& f)
459 {
460     using FType = std::decay_t<F>;
461     auto wrapper = std::make_shared<FType>(std::forward<F>(f));
462     return [wrapper]() { (*wrapper)(); };
463 }
464 
SubmitEventAtEnd(InnerEvent::Pointer & event,Priority priority,bool syncWait,const std::string & taskName,std::unique_lock<std::mutex> & lock)465 void EventQueueFFRT::SubmitEventAtEnd(InnerEvent::Pointer &event, Priority priority, bool syncWait,
466     const std::string &taskName, std::unique_lock<std::mutex> &lock)
467 {
468     uint64_t time = event->GetDelayTime();
469     ffrt_queue_priority_t queuePriority = static_cast<ffrt_queue_priority_t>(TransferInnerPriority(priority));
470     std::function<void()> task = MakeCopyableFunction([ffrtEvent = std::move(event)]() {
471         auto handler = new (std::nothrow) std::shared_ptr<EventHandler>(ffrtEvent->GetOwner());
472         if (handler && (*handler)) {
473             ffrt_queue_t* queue = reinterpret_cast<ffrt_queue_t*>(
474                 (*handler)->GetEventRunner()->GetEventQueue()->GetFfrtQueue());
475             if (queue != nullptr) {
476                 ffrt_queue_set_eventhandler(*queue, (void*)handler);
477             }
478             (*handler)->DistributeEvent(ffrtEvent);
479             if (queue != nullptr) {
480                 ffrt_queue_set_eventhandler(*queue, nullptr);
481             }
482         }
483         delete handler;
484     });
485 
486     if (syncWait) {
487         ffrt::task_handle handle = ffrtQueue_->submit_h(task, ffrt::task_attr().name(taskName.c_str())
488             .delay(time * MILLI_TO_MICRO).priority(queuePriority));
489         lock.unlock();
490         ffrtQueue_->wait(handle);
491     } else {
492         ffrtQueue_->submit(task, ffrt::task_attr().name(taskName.c_str()).delay(time * MILLI_TO_MICRO).
493             priority(queuePriority));
494     }
495 }
496 
SubmitEventAtFront(InnerEvent::Pointer & event,Priority priority,bool syncWait,const std::string & taskName,std::unique_lock<std::mutex> & lock)497 void EventQueueFFRT::SubmitEventAtFront(InnerEvent::Pointer &event, Priority priority, bool syncWait,
498     const std::string &taskName, std::unique_lock<std::mutex> &lock)
499 {
500     uint64_t time = event->GetDelayTime();
501     ffrt_queue_priority_t queuePriority = static_cast<ffrt_queue_priority_t>(TransferInnerPriority(priority));
502     ffrt_task_attr_t attribute;
503     (void)ffrt_task_attr_init(&attribute);
504     ffrt_task_attr_set_name(&attribute, taskName.c_str());
505     ffrt_task_attr_set_delay(&attribute, time * MILLI_TO_MICRO);
506     ffrt_task_attr_set_queue_priority(&attribute, queuePriority);
507 
508     std::function<void()> task = MakeCopyableFunction([ffrtEvent = std::move(event)]() {
509         auto handler = new (std::nothrow) std::shared_ptr<EventHandler>(ffrtEvent->GetOwner());
510         if (handler && (*handler)) {
511             ffrt_queue_t* queue = reinterpret_cast<ffrt_queue_t*>(
512                 (*handler)->GetEventRunner()->GetEventQueue()->GetFfrtQueue());
513             if (queue != nullptr) {
514                 ffrt_queue_set_eventhandler(*queue, (void*)handler);
515             }
516             (*handler)->DistributeEvent(ffrtEvent);
517             if (queue != nullptr) {
518                 ffrt_queue_set_eventhandler(*queue, nullptr);
519             }
520         }
521         delete handler;
522     });
523 
524     ffrt_queue_t* queue = TransferQueuePtr(ffrtQueue_);
525     if (queue == nullptr) {
526         HILOGW("SubmitEventAtFront is unavailable.");
527         return;
528     }
529     ffrt_function_header_t* header = ffrt::create_function_wrapper(task, ffrt_function_kind_queue);
530     if (syncWait) {
531         ffrt::task_handle handle = ffrt_queue_submit_head_h(*queue, header, &attribute);
532         lock.unlock();
533         ffrtQueue_->wait(handle);
534     } else {
535         ffrt_queue_submit_head(*queue, header, &attribute);
536     }
537 }
538 
539 }  // namespace AppExecFwk
540 }  // namespace OHOS
541