1 /*
2  * Copyright (C) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef RECORDER_TASK_QUEUE_H
17 #define RECORDER_TASK_QUEUE_H
18 
#include <condition_variable>
#include <functional>
#include <list>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include "media_errors.h"
#include "nocopyable.h"
#include "qos.h"
30 
31 namespace OHOS {
32 namespace Media {
33 /**
34  * Simple Generalized Task Queues for Easier Implementation of Asynchronous Programming Models
35  *
 * You can refer to the following examples to use this utility.
37  *
38  * Example 1:
39  * TaskQueue taskQ("your_task_queue_name");
40  * taskQ.Start();
41  * auto handler1 = std::make_shared<TaskHandler<int32_t>>([]() {
42  *     // your job's detail code;
43  * });
44  * taskQ.EnqueueTask(handler1);
45  * auto result = handler1->GetResult();
46  * if (result.HasResult()) {
47  *     MEDIA_LOGI("handler1 executed, result: %{public}d", result.Value());
48  * } else {
49  *     MEDIA_LOGI("handler1 not executed");
50  * }
51  *
52  * Example 2:
53  * TaskQueue taskQ("your_task_queue_name");
54  * taskQ.Start();
55  * auto handler2 = std::make_shared<TaskHandler<void>>([]() {
56  *     // your job's detail code;
57  * });
58  * taskQ.EnqueueTask(handler2);
59  * auto result = handler2->GetResult();
60  * if (result.HasResult()) {
61  *     MEDIA_LOGI("handler2 executed");
62  * } else {
63  *     MEDIA_LOGI("handler2 not executed");
64  * }
65  */
66 
class TaskQueue;
template <typename T>
class TaskHandler;

/**
 * Outcome of a task that produces a value of type T.
 *
 * The stored value is private; only TaskHandler<T> (declared a friend below) can write it.
 * A default-constructed TaskResult holds no value.
 */
template <typename T>
struct TaskResult {
    /**
     * @return true when a value is stored, false otherwise.
     */
    bool HasResult() const
    {
        return val.has_value();
    }

    /**
     * @return a copy of the stored value.
     *
     * Forwards to std::optional::value(), so call it only after HasResult() returned true.
     */
    T Value() const
    {
        return val.value();
    }

private:
    friend class TaskHandler<T>;
    std::optional<T> val;
};

/**
 * Outcome of a task that produces no value: it only records whether the task was executed.
 *
 * The flag is private; only TaskHandler<void> (declared a friend below) can write it.
 */
template <>
struct TaskResult<void> {
    /**
     * @return true when the executed flag is set, false otherwise (the default).
     */
    bool HasResult() const
    {
        return executed;
    }

private:
    friend class TaskHandler<void>;
    bool executed = false;
};
96 
/**
 * Abstract task interface: every member function is pure virtual except the defaulted destructor.
 */
class ITaskHandler {
public:
    /**
     * Execution attributes carried by a task.
     */
    struct Attribute {
        // Periodic execution time (presumably microseconds, going by the "Us" suffix).
        // UINT64_MAX, the default, means the task does not need to be executed periodically.
        uint64_t periodicTimeUs_ = UINT64_MAX;
    };

    virtual ~ITaskHandler() = default;

    // Execute the task.
    virtual void Execute() = 0;

    // Cancel the task.
    virtual void Cancel() = 0;

    // Query whether the task has been canceled.
    virtual bool IsCanceled() = 0;

    // Fetch the execution attributes of the task.
    virtual Attribute GetAttribute() const = 0;

private:
    // Clear the internal executed or canceled state.
    // Private: besides implementers, only the friend class TaskQueue can call it.
    virtual void Clear() = 0;
    friend class TaskQueue;
};
114 
115 template <typename T>
116 class TaskHandler : public ITaskHandler, public NoCopyable {
117 public:
118     TaskHandler(std::function<T(void)> task, ITaskHandler::Attribute attr = {}) : task_(task), attribute_(attr) {}
119     ~TaskHandler() = default;
120 
121     void Execute() override
122     {
123         {
124             std::unique_lock<std::mutex> lock(mutex_);
125             if (state_ != TaskState::IDLE) {
126                 return;
127             }
128             state_ = TaskState::RUNNING;
129         }
130 
131         if constexpr (std::is_void_v<T>) {
132             task_();
133             std::unique_lock<std::mutex> lock(mutex_);
134             state_ = TaskState::FINISHED;
135             result_.executed = true;
136         } else {
137             T result = task_();
138             std::unique_lock<std::mutex> lock(mutex_);
139             state_ = TaskState::FINISHED;
140             result_.val = result;
141         }
142         cond_.notify_all();
143     }
144 
145     /*
146      * After the GetResult called, the last execute result will be clear
147      */
148     TaskResult<T> GetResult()
149     {
150         std::unique_lock<std::mutex> lock(mutex_);
151         while ((state_ != TaskState::FINISHED) && (state_ != TaskState::CANCELED)) {
152             cond_.wait(lock);
153         }
154 
155         return ClearResult();
156     }
157 
158     TaskResult<T> GetResultWithTimeLimit(uint32_t milliseconds)
159     {
160         std::unique_lock<std::mutex> lock(mutex_);
161         auto isFinished = cond_.wait_for(lock, std::chrono::milliseconds(milliseconds),
162             [this] { return state_ == TaskState::FINISHED || state_ == TaskState::CANCELED; });
163         if (isFinished) {
164             auto res = ClearResult();
165             if (state_ == TaskState::IDLE) {
166                 return res;
167             }
168         }
169         TaskResult<T> tmp;
170         return tmp;
171     }
172 
173     void Cancel() override
174     {
175         std::unique_lock<std::mutex> lock(mutex_);
176         if (state_ != RUNNING) {
177             state_ = TaskState::CANCELED;
178             cond_.notify_all();
179         }
180     }
181 
182     bool IsCanceled() override
183     {
184         std::unique_lock<std::mutex> lock(mutex_);
185         return state_ == TaskState::CANCELED;
186     }
187 
188     ITaskHandler::Attribute GetAttribute() const override
189     {
190         return attribute_;
191     }
192 
193 private:
194     TaskResult<T> ClearResult()
195     {
196         if (state_ == TaskState::FINISHED) {
197             state_ = TaskState::IDLE;
198             TaskResult<T> tmp;
199             if constexpr (std::is_void_v<T>) {
200                 std::swap(tmp.executed, result_.executed);
201             } else {
202                 result_.val.swap(tmp.val);
203             }
204             return tmp;
205         }
206         return result_;
207     }
208 
209     void Clear() override
210     {
211         std::unique_lock<std::mutex> lock(mutex_);
212         (void)ClearResult();
213     }
214 
215     enum TaskState {
216         IDLE = 0,
217         RUNNING = 1,
218         CANCELED = 2,
219         FINISHED = 3,
220     };
221 
222     TaskState state_ = TaskState::IDLE;
223     std::mutex mutex_;
224     std::condition_variable cond_;
225     std::function<T(void)> task_;
226     TaskResult<T> result_;
227     ITaskHandler::Attribute attribute_; // task execute attribute.
228 };
229 
230 class __attribute__((visibility("default"))) TaskQueue : public NoCopyable {
231 public:
232     explicit TaskQueue(const std::string &name) : name_(name) {}
233     ~TaskQueue();
234 
235     int32_t Start();
236     int32_t Stop() noexcept;
237     void SetQos(const OHOS::QOS::QosLevel level);
238     void ResetQos();
239     bool IsTaskExecuting();
240 
241     // delayUs cannot be gt 10000000ULL.
242     __attribute__((no_sanitize("cfi"))) int32_t EnqueueTask(const std::shared_ptr<ITaskHandler> &task,
243         bool cancelNotExecuted = false, uint64_t delayUs = 0ULL);
244 
245 private:
246     struct TaskHandlerItem {
247         std::shared_ptr<ITaskHandler> task_ { nullptr };
248         uint64_t executeTimeNs_ { 0ULL };
249     };
250     __attribute__((no_sanitize("cfi"))) void TaskProcessor();
251     __attribute__((no_sanitize("cfi"))) void CancelNotExecutedTaskLocked();
252 
253     bool isExit_ = true;
254     std::unique_ptr<std::thread> thread_;
255     std::list<TaskHandlerItem> taskList_;
256     std::mutex mutex_;
257     std::condition_variable cond_;
258     std::string name_;
259     pid_t tid_ = -1;
260     bool isTaskExecuting_ = false;
261 };
262 } // namespace Media
263 } // namespace OHOS
264 #endif
265