1 /* 2 * Copyright (c) 2021-2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef HISTREAMER_FOUNDATION_BUFFER_POOL_H 17 #define HISTREAMER_FOUNDATION_BUFFER_POOL_H 18 19 #include <atomic> 20 #include <list> 21 #include <memory> 22 #include "foundation/osal/thread/condition_variable.h" 23 #include "foundation/osal/thread/mutex.h" 24 #include "foundation/osal/thread/scoped_lock.h" 25 #include "foundation/utils/constants.h" 26 #include "plugin/common/plugin_buffer.h" 27 28 namespace OHOS { 29 namespace Media { 30 template <typename T> 31 class BufferPool : public std::enable_shared_from_this<BufferPool<T>> { 32 public: 33 explicit BufferPool<T>(size_t poolSize) : poolSize_(poolSize), isActive_(true), allocInProgress(false) 34 { 35 } 36 37 BufferPool<T>(const BufferPool<T>&) = delete; 38 39 BufferPool<T>& operator=(const BufferPool<T>&) = delete; 40 ~BufferPool()41 ~BufferPool<T>() 42 { 43 isActive_ = false; 44 { 45 OSAL::ScopedLock lock(mutex_); 46 cv_.NotifyOne(); 47 } 48 FinishAllocInProgress(); 49 } 50 Create(size_t poolSize)51 static std::shared_ptr<BufferPool<T>> Create(size_t poolSize) 52 { 53 return std::make_shared<BufferPool<T>>(poolSize); 54 } 55 56 void Init(size_t msgSize = DEFAULT_FRAME_SIZE, Plugin::BufferMetaType type = Plugin::BufferMetaType::AUDIO, 57 size_t align = 0) 58 { 59 if (msgSize_ == msgSize && align == align_) { 60 return; 61 } 62 metaType_ = type; 63 msgSize_ = msgSize; 64 align_ = align; 65 freeBuffers_.clear(); 66 for (size_t i = 0; i < poolSize_; ++i) { 67 auto buf = new Plugin::Buffer(type); 68 buf->AllocMemory(nullptr, msgSize); 69 freeBuffers_.emplace_back(std::unique_ptr<T>(buf)); 70 } 71 } 72 Append(std::unique_ptr<T> buffer)73 bool Append(std::unique_ptr<T> buffer) 74 { 75 OSAL::ScopedLock lock(mutex_); 76 if (!isActive_ || freeBuffers_.size() >= poolSize_) { 77 return false; 78 } 79 freeBuffers_.emplace_back(std::move(buffer)); 80 cv_.NotifyOne(); 81 return true; 82 } 83 SetActive(bool active)84 void SetActive(bool active) 85 { 86 OSAL::ScopedLock lock(mutex_); 87 if (!active) { 88 cv_.NotifyOne(); 89 } 90 isActive_ = active; 91 } 92 RecycleBuffer(std::unique_ptr<T> buffer)93 void RecycleBuffer(std::unique_ptr<T> buffer) const 94 { 95 OSAL::ScopedLock lock(mutex_); 96 freeBuffers_.emplace_back(std::move(buffer)); 97 cv_.NotifyOne(); 98 } 99 AllocateBuffer()100 std::shared_ptr<T> AllocateBuffer() 101 { 102 OSAL::ScopedLock lock(mutex_); 103 allocInProgress = true; 104 if (freeBuffers_.empty() && isActive_) { 105 cv_.Wait(lock, [this] { return !isActive_ || !freeBuffers_.empty(); }); 106 } 107 std::shared_ptr<T> buffer; 108 if (isActive_) { 109 buffer = AllocateBufferUnprotected(); 110 } 111 allocInProgress = false; 112 cvFinishAlloc_.NotifyOne(); 113 return buffer; 114 } 115 AllocateBufferNonBlocking()116 std::shared_ptr<T> AllocateBufferNonBlocking() 117 { 118 OSAL::ScopedLock lock(mutex_); 119 if (freeBuffers_.empty() || !isActive_) { 120 return nullptr; 121 } 122 return AllocateBufferUnprotected(); 123 } 124 AllocateAppendBufferNonBlocking()125 std::shared_ptr<T> AllocateAppendBufferNonBlocking() 126 { 127 OSAL::ScopedLock lock(mutex_); 128 if (!isActive_) { 129 return nullptr; 130 } 131 if (freeBuffers_.empty()) { 132 poolSize_++; 133 auto buf = new Plugin::Buffer(metaType_); 134 buf->AllocMemory(nullptr, msgSize_); 135 freeBuffers_.emplace_back(std::unique_ptr<T>(buf)); 136 } 137 return AllocateBufferUnprotected(); 138 } 139 Size()140 size_t Size() const 141 { 142 OSAL::ScopedLock lock(mutex_); 143 return freeBuffers_.size(); 144 } 145 Capacity()146 size_t Capacity() const 147 { 148 return poolSize_; 149 } 150 Empty()151 bool Empty() const 152 { 153 OSAL::ScopedLock lock(mutex_); 154 return freeBuffers_.empty(); 155 } 156 157 private: AllocateBufferUnprotected()158 std::shared_ptr<T> AllocateBufferUnprotected() 159 { 160 std::weak_ptr<BufferPool<T>> weakRef(this->shared_from_this()); 161 std::shared_ptr<T> sptr(freeBuffers_.front().release(), [weakRef](T* ptr) { 162 auto pool = weakRef.lock(); 163 if (pool) { 164 pool->RecycleBuffer(std::unique_ptr<T>(ptr)); 165 } else { 166 delete ptr; 167 } 168 }); 169 freeBuffers_.pop_front(); 170 return sptr; 171 } 172 FinishAllocInProgress()173 void FinishAllocInProgress() 174 { 175 OSAL::ScopedLock lock(mutex_); 176 if (allocInProgress.load()) { 177 cvFinishAlloc_.Wait(lock, [this] { return !allocInProgress.load(); }); 178 } 179 } 180 181 mutable OSAL::Mutex mutex_; 182 mutable OSAL::ConditionVariable cv_; 183 mutable OSAL::ConditionVariable cvFinishAlloc_; 184 mutable std::list<std::unique_ptr<T>> freeBuffers_; 185 size_t poolSize_ {0}; 186 size_t msgSize_ {0}; 187 size_t align_ {0}; // 0: use default alignment. 188 std::atomic<bool> isActive_; 189 std::atomic<bool> allocInProgress; 190 Plugin::BufferMetaType metaType_ = Plugin::BufferMetaType::AUDIO; 191 }; 192 } // namespace Media 193 } // namespace OHOS 194 #endif // HISTREAMER_FOUNDATION_BUFFER_POOL_H 195