1 /* 2 * Copyright (C) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef RECORDER_TASK_QUEUE_H 17 #define RECORDER_TASK_QUEUE_H 18 19 #include <thread> 20 #include <condition_variable> 21 #include <mutex> 22 #include <functional> 23 #include <list> 24 #include <string> 25 #include <optional> 26 #include <type_traits> 27 #include "media_errors.h" 28 #include "nocopyable.h" 29 #include "qos.h" 30 31 namespace OHOS { 32 namespace Media { 33 /** 34 * Simple Generalized Task Queues for Easier Implementation of Asynchronous Programming Models 35 * 36 * You can refer to following examples to use this utility. 37 * 38 * Example 1: 39 * TaskQueue taskQ("your_task_queue_name"); 40 * taskQ.Start(); 41 * auto handler1 = std::make_shared<TaskHandler<int32_t>>([]() { 42 * // your job's detail code; 43 * }); 44 * taskQ.EnqueueTask(handler1); 45 * auto result = handler1->GetResult(); 46 * if (result.HasResult()) { 47 * MEDIA_LOGI("handler1 executed, result: %{public}d", result.Value()); 48 * } else { 49 * MEDIA_LOGI("handler1 not executed"); 50 * } 51 * 52 * Example 2: 53 * TaskQueue taskQ("your_task_queue_name"); 54 * taskQ.Start(); 55 * auto handler2 = std::make_shared<TaskHandler<void>>([]() { 56 * // your job's detail code; 57 * }); 58 * taskQ.EnqueueTask(handler2); 59 * auto result = handler2->GetResult(); 60 * if (result.HasResult()) { 61 * MEDIA_LOGI("handler2 executed"); 62 * } else { 63 * MEDIA_LOGI("handler2 not executed"); 64 * } 65 */ 66 67 class TaskQueue; 68 template <typename T> 69 class TaskHandler; 70 71 template <typename T> 72 struct TaskResult { HasResultTaskResult73 bool HasResult() 74 { 75 return val.has_value(); 76 } ValueTaskResult77 T Value() 78 { 79 return val.value(); 80 } 81 private: 82 friend class TaskHandler<T>; 83 std::optional<T> val; 84 }; 85 86 template <> 87 struct TaskResult<void> { 88 bool HasResult() 89 { 90 return executed; 91 } 92 private: 93 friend class TaskHandler<void>; 94 bool executed = false; 95 }; 96 97 class ITaskHandler { 98 public: 99 struct Attribute { 100 // periodic execute time, UINT64_MAX is not need to execute periodic. 101 uint64_t periodicTimeUs_ { UINT64_MAX }; 102 }; 103 virtual ~ITaskHandler() = default; 104 virtual void Execute() = 0; 105 virtual void Cancel() = 0; 106 virtual bool IsCanceled() = 0; 107 virtual Attribute GetAttribute() const = 0; 108 109 private: 110 // clear the internel executed or canceled state. 111 virtual void Clear() = 0; 112 friend class TaskQueue; 113 }; 114 115 template <typename T> 116 class TaskHandler : public ITaskHandler, public NoCopyable { 117 public: 118 TaskHandler(std::function<T(void)> task, ITaskHandler::Attribute attr = {}) : task_(task), attribute_(attr) {} 119 ~TaskHandler() = default; 120 121 void Execute() override 122 { 123 { 124 std::unique_lock<std::mutex> lock(mutex_); 125 if (state_ != TaskState::IDLE) { 126 return; 127 } 128 state_ = TaskState::RUNNING; 129 } 130 131 if constexpr (std::is_void_v<T>) { 132 task_(); 133 std::unique_lock<std::mutex> lock(mutex_); 134 state_ = TaskState::FINISHED; 135 result_.executed = true; 136 } else { 137 T result = task_(); 138 std::unique_lock<std::mutex> lock(mutex_); 139 state_ = TaskState::FINISHED; 140 result_.val = result; 141 } 142 cond_.notify_all(); 143 } 144 145 /* 146 * After the GetResult called, the last execute result will be clear 147 */ 148 TaskResult<T> GetResult() 149 { 150 std::unique_lock<std::mutex> lock(mutex_); 151 while ((state_ != TaskState::FINISHED) && (state_ != TaskState::CANCELED)) { 152 cond_.wait(lock); 153 } 154 155 return ClearResult(); 156 } 157 158 TaskResult<T> GetResultWithTimeLimit(uint32_t milliseconds) 159 { 160 std::unique_lock<std::mutex> lock(mutex_); 161 auto isFinished = cond_.wait_for(lock, std::chrono::milliseconds(milliseconds), 162 [this] { return state_ == TaskState::FINISHED || state_ == TaskState::CANCELED; }); 163 if (isFinished) { 164 auto res = ClearResult(); 165 if (state_ == TaskState::IDLE) { 166 return res; 167 } 168 } 169 TaskResult<T> tmp; 170 return tmp; 171 } 172 173 void Cancel() override 174 { 175 std::unique_lock<std::mutex> lock(mutex_); 176 if (state_ != RUNNING) { 177 state_ = TaskState::CANCELED; 178 cond_.notify_all(); 179 } 180 } 181 182 bool IsCanceled() override 183 { 184 std::unique_lock<std::mutex> lock(mutex_); 185 return state_ == TaskState::CANCELED; 186 } 187 188 ITaskHandler::Attribute GetAttribute() const override 189 { 190 return attribute_; 191 } 192 193 private: 194 TaskResult<T> ClearResult() 195 { 196 if (state_ == TaskState::FINISHED) { 197 state_ = TaskState::IDLE; 198 TaskResult<T> tmp; 199 if constexpr (std::is_void_v<T>) { 200 std::swap(tmp.executed, result_.executed); 201 } else { 202 result_.val.swap(tmp.val); 203 } 204 return tmp; 205 } 206 return result_; 207 } 208 209 void Clear() override 210 { 211 std::unique_lock<std::mutex> lock(mutex_); 212 (void)ClearResult(); 213 } 214 215 enum TaskState { 216 IDLE = 0, 217 RUNNING = 1, 218 CANCELED = 2, 219 FINISHED = 3, 220 }; 221 222 TaskState state_ = TaskState::IDLE; 223 std::mutex mutex_; 224 std::condition_variable cond_; 225 std::function<T(void)> task_; 226 TaskResult<T> result_; 227 ITaskHandler::Attribute attribute_; // task execute attribute. 228 }; 229 230 class __attribute__((visibility("default"))) TaskQueue : public NoCopyable { 231 public: 232 explicit TaskQueue(const std::string &name) : name_(name) {} 233 ~TaskQueue(); 234 235 int32_t Start(); 236 int32_t Stop() noexcept; 237 void SetQos(const OHOS::QOS::QosLevel level); 238 void ResetQos(); 239 bool IsTaskExecuting(); 240 241 // delayUs cannot be gt 10000000ULL. 242 __attribute__((no_sanitize("cfi"))) int32_t EnqueueTask(const std::shared_ptr<ITaskHandler> &task, 243 bool cancelNotExecuted = false, uint64_t delayUs = 0ULL); 244 245 private: 246 struct TaskHandlerItem { 247 std::shared_ptr<ITaskHandler> task_ { nullptr }; 248 uint64_t executeTimeNs_ { 0ULL }; 249 }; 250 __attribute__((no_sanitize("cfi"))) void TaskProcessor(); 251 __attribute__((no_sanitize("cfi"))) void CancelNotExecutedTaskLocked(); 252 253 bool isExit_ = true; 254 std::unique_ptr<std::thread> thread_; 255 std::list<TaskHandlerItem> taskList_; 256 std::mutex mutex_; 257 std::condition_variable cond_; 258 std::string name_; 259 pid_t tid_ = -1; 260 bool isTaskExecuting_ = false; 261 }; 262 } // namespace Media 263 } // namespace OHOS 264 #endif 265