/* * Copyright (C) 2021 Huawei Device Co., Ltd. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef RECORDER_TASK_QUEUE_H #define RECORDER_TASK_QUEUE_H #include #include #include #include #include #include #include #include #include "media_errors.h" #include "nocopyable.h" #include "qos.h" namespace OHOS { namespace Media { /** * Simple Generalized Task Queues for Easier Implementation of Asynchronous Programming Models * * You can refer to following examples to use this utility. * * Example 1: * TaskQueue taskQ("your_task_queue_name"); * taskQ.Start(); * auto handler1 = std::make_shared>([]() { * // your job's detail code; * }); * taskQ.EnqueueTask(handler1); * auto result = handler1->GetResult(); * if (result.HasResult()) { * MEDIA_LOGI("handler1 executed, result: %{public}d", result.Value()); * } else { * MEDIA_LOGI("handler1 not executed"); * } * * Example 2: * TaskQueue taskQ("your_task_queue_name"); * taskQ.Start(); * auto handler2 = std::make_shared>([]() { * // your job's detail code; * }); * taskQ.EnqueueTask(handler2); * auto result = handler2->GetResult(); * if (result.HasResult()) { * MEDIA_LOGI("handler2 executed"); * } else { * MEDIA_LOGI("handler2 not executed"); * } */ class TaskQueue; template class TaskHandler; template struct TaskResult { bool HasResult() { return val.has_value(); } T Value() { return val.value(); } private: friend class TaskHandler; std::optional val; }; template <> struct TaskResult { bool HasResult() { return executed; } private: friend class TaskHandler; bool executed = false; }; class ITaskHandler { public: struct Attribute { // periodic execute time, UINT64_MAX is not need to execute periodic. uint64_t periodicTimeUs_ { UINT64_MAX }; }; virtual ~ITaskHandler() = default; virtual void Execute() = 0; virtual void Cancel() = 0; virtual bool IsCanceled() = 0; virtual Attribute GetAttribute() const = 0; private: // clear the internel executed or canceled state. virtual void Clear() = 0; friend class TaskQueue; }; template class TaskHandler : public ITaskHandler, public NoCopyable { public: TaskHandler(std::function task, ITaskHandler::Attribute attr = {}) : task_(task), attribute_(attr) {} ~TaskHandler() = default; void Execute() override { { std::unique_lock lock(mutex_); if (state_ != TaskState::IDLE) { return; } state_ = TaskState::RUNNING; } if constexpr (std::is_void_v) { task_(); std::unique_lock lock(mutex_); state_ = TaskState::FINISHED; result_.executed = true; } else { T result = task_(); std::unique_lock lock(mutex_); state_ = TaskState::FINISHED; result_.val = result; } cond_.notify_all(); } /* * After the GetResult called, the last execute result will be clear */ TaskResult GetResult() { std::unique_lock lock(mutex_); while ((state_ != TaskState::FINISHED) && (state_ != TaskState::CANCELED)) { cond_.wait(lock); } return ClearResult(); } TaskResult GetResultWithTimeLimit(uint32_t milliseconds) { std::unique_lock lock(mutex_); auto isFinished = cond_.wait_for(lock, std::chrono::milliseconds(milliseconds), [this] { return state_ == TaskState::FINISHED || state_ == TaskState::CANCELED; }); if (isFinished) { auto res = ClearResult(); if (state_ == TaskState::IDLE) { return res; } } TaskResult tmp; return tmp; } void Cancel() override { std::unique_lock lock(mutex_); if (state_ != RUNNING) { state_ = TaskState::CANCELED; cond_.notify_all(); } } bool IsCanceled() override { std::unique_lock lock(mutex_); return state_ == TaskState::CANCELED; } ITaskHandler::Attribute GetAttribute() const override { return attribute_; } private: TaskResult ClearResult() { if (state_ == TaskState::FINISHED) { state_ = TaskState::IDLE; TaskResult tmp; if constexpr (std::is_void_v) { std::swap(tmp.executed, result_.executed); } else { result_.val.swap(tmp.val); } return tmp; } return result_; } void Clear() override { std::unique_lock lock(mutex_); (void)ClearResult(); } enum TaskState { IDLE = 0, RUNNING = 1, CANCELED = 2, FINISHED = 3, }; TaskState state_ = TaskState::IDLE; std::mutex mutex_; std::condition_variable cond_; std::function task_; TaskResult result_; ITaskHandler::Attribute attribute_; // task execute attribute. }; class __attribute__((visibility("default"))) TaskQueue : public NoCopyable { public: explicit TaskQueue(const std::string &name) : name_(name) {} ~TaskQueue(); int32_t Start(); int32_t Stop() noexcept; void SetQos(const OHOS::QOS::QosLevel level); void ResetQos(); bool IsTaskExecuting(); // delayUs cannot be gt 10000000ULL. __attribute__((no_sanitize("cfi"))) int32_t EnqueueTask(const std::shared_ptr &task, bool cancelNotExecuted = false, uint64_t delayUs = 0ULL); private: struct TaskHandlerItem { std::shared_ptr task_ { nullptr }; uint64_t executeTimeNs_ { 0ULL }; }; __attribute__((no_sanitize("cfi"))) void TaskProcessor(); __attribute__((no_sanitize("cfi"))) void CancelNotExecutedTaskLocked(); bool isExit_ = true; std::unique_ptr thread_; std::list taskList_; std::mutex mutex_; std::condition_variable cond_; std::string name_; pid_t tid_ = -1; bool isTaskExecuting_ = false; }; } // namespace Media } // namespace OHOS #endif