1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #include "thread_pool.h" 17 #include "errors.h" 18 #include "utils_log.h" 19 20 #include <memory> 21 #include <pthread.h> 22 23 namespace OHOS { 24 ThreadPool(const std::string & name)25ThreadPool::ThreadPool(const std::string& name) 26 : myName_(name), maxTaskNum_(0), running_(false) 27 { 28 } 29 ~ThreadPool()30ThreadPool::~ThreadPool() 31 { 32 if (running_) { 33 Stop(); 34 } 35 } 36 Start(int numThreads)37uint32_t ThreadPool::Start(int numThreads) 38 { 39 if (!threads_.empty()) { 40 return ERR_INVALID_OPERATION; 41 } 42 43 if (numThreads <= 0) { 44 return ERR_INVALID_VALUE; 45 } 46 running_ = true; 47 threads_.reserve(numThreads); 48 49 for (int i = 0; i < numThreads; ++i) { 50 std::thread t([this] { this->WorkInThread(); }); 51 // Give the name of ThreadPool to threads created by the ThreadPool. 52 int err = pthread_setname_np(t.native_handle(), (myName_ + std::to_string(i)).c_str()); 53 if (err != 0) { 54 UTILS_LOGD("Failed to set name to thread. %{public}s", strerror(err)); 55 } 56 threads_.push_back(std::move(t)); 57 } 58 return ERR_OK; 59 } 60 Stop()61void ThreadPool::Stop() 62 { 63 { 64 std::unique_lock<std::mutex> lock(mutex_); 65 running_ = false; 66 hasTaskToDo_.notify_all(); 67 } 68 69 for (auto& e : threads_) { 70 e.join(); 71 } 72 } 73 AddTask(const Task & f)74void ThreadPool::AddTask(const Task &f) 75 { 76 if (threads_.empty()) { 77 f(); 78 } else { 79 std::unique_lock<std::mutex> lock(mutex_); 80 while (Overloaded()) { 81 acceptNewTask_.wait(lock); 82 } 83 84 tasks_.push_back(f); 85 hasTaskToDo_.notify_one(); 86 } 87 } 88 GetCurTaskNum()89size_t ThreadPool::GetCurTaskNum() 90 { 91 std::unique_lock<std::mutex> lock(mutex_); 92 return tasks_.size(); 93 } 94 95 ScheduleTask()96ThreadPool::Task ThreadPool::ScheduleTask() 97 { 98 std::unique_lock<std::mutex> lock(mutex_); 99 while (tasks_.empty() && running_) { 100 hasTaskToDo_.wait(lock); 101 } 102 103 Task task; 104 if (!tasks_.empty()) { 105 task = tasks_.front(); 106 tasks_.pop_front(); 107 108 if (maxTaskNum_ > 0) { 109 acceptNewTask_.notify_one(); 110 } 111 } 112 return task; 113 } 114 Overloaded() const115bool ThreadPool::Overloaded() const 116 { 117 return (maxTaskNum_ > 0) && (tasks_.size() >= maxTaskNum_); 118 } 119 WorkInThread()120void ThreadPool::WorkInThread() 121 { 122 while (running_) { 123 Task task = ScheduleTask(); 124 if (task) { 125 task(); 126 } 127 } 128 } 129 130 } // namespace OHOS 131