1 /*
2  * Copyright (c) 2021-2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef HISTREAMER_FOUNDATION_BUFFER_POOL_H
17 #define HISTREAMER_FOUNDATION_BUFFER_POOL_H
18 
19 #include <atomic>
20 #include <list>
21 #include <memory>
22 #include "foundation/osal/thread/condition_variable.h"
23 #include "foundation/osal/thread/mutex.h"
24 #include "foundation/osal/thread/scoped_lock.h"
25 #include "foundation/utils/constants.h"
26 #include "plugin/common/plugin_buffer.h"
27 
28 namespace OHOS {
29 namespace Media {
30 template <typename T>
31 class BufferPool : public std::enable_shared_from_this<BufferPool<T>> {
32 public:
33     explicit BufferPool<T>(size_t poolSize) : poolSize_(poolSize), isActive_(true), allocInProgress(false)
34     {
35     }
36 
37     BufferPool<T>(const BufferPool<T>&) = delete;
38 
39     BufferPool<T>& operator=(const BufferPool<T>&) = delete;
40 
~BufferPool()41     ~BufferPool<T>()
42     {
43         isActive_ = false;
44         {
45             OSAL::ScopedLock lock(mutex_);
46             cv_.NotifyOne();
47         }
48         FinishAllocInProgress();
49     }
50 
Create(size_t poolSize)51     static std::shared_ptr<BufferPool<T>> Create(size_t poolSize)
52     {
53         return std::make_shared<BufferPool<T>>(poolSize);
54     }
55 
56     void Init(size_t msgSize = DEFAULT_FRAME_SIZE, Plugin::BufferMetaType type = Plugin::BufferMetaType::AUDIO,
57               size_t align = 0)
58     {
59         if (msgSize_ == msgSize && align == align_) {
60             return;
61         }
62         metaType_ = type;
63         msgSize_ = msgSize;
64         align_ = align;
65         freeBuffers_.clear();
66         for (size_t i = 0; i < poolSize_; ++i) {
67             auto buf = new Plugin::Buffer(type);
68             buf->AllocMemory(nullptr, msgSize);
69             freeBuffers_.emplace_back(std::unique_ptr<T>(buf));
70         }
71     }
72 
Append(std::unique_ptr<T> buffer)73     bool Append(std::unique_ptr<T> buffer)
74     {
75         OSAL::ScopedLock lock(mutex_);
76         if (!isActive_ || freeBuffers_.size() >= poolSize_) {
77             return false;
78         }
79         freeBuffers_.emplace_back(std::move(buffer));
80         cv_.NotifyOne();
81         return true;
82     }
83 
SetActive(bool active)84     void SetActive(bool active)
85     {
86         OSAL::ScopedLock lock(mutex_);
87         if (!active) {
88             cv_.NotifyOne();
89         }
90         isActive_ = active;
91     }
92 
RecycleBuffer(std::unique_ptr<T> buffer)93     void RecycleBuffer(std::unique_ptr<T> buffer) const
94     {
95         OSAL::ScopedLock lock(mutex_);
96         freeBuffers_.emplace_back(std::move(buffer));
97         cv_.NotifyOne();
98     }
99 
AllocateBuffer()100     std::shared_ptr<T> AllocateBuffer()
101     {
102         OSAL::ScopedLock lock(mutex_);
103         allocInProgress = true;
104         if (freeBuffers_.empty() && isActive_) {
105             cv_.Wait(lock, [this] { return !isActive_ || !freeBuffers_.empty(); });
106         }
107         std::shared_ptr<T> buffer;
108         if (isActive_) {
109             buffer = AllocateBufferUnprotected();
110         }
111         allocInProgress = false;
112         cvFinishAlloc_.NotifyOne();
113         return buffer;
114     }
115 
AllocateBufferNonBlocking()116     std::shared_ptr<T> AllocateBufferNonBlocking()
117     {
118         OSAL::ScopedLock lock(mutex_);
119         if (freeBuffers_.empty() || !isActive_) {
120             return nullptr;
121         }
122         return AllocateBufferUnprotected();
123     }
124 
AllocateAppendBufferNonBlocking()125     std::shared_ptr<T> AllocateAppendBufferNonBlocking()
126     {
127         OSAL::ScopedLock lock(mutex_);
128         if (!isActive_) {
129             return nullptr;
130         }
131         if (freeBuffers_.empty()) {
132             poolSize_++;
133             auto buf = new Plugin::Buffer(metaType_);
134             buf->AllocMemory(nullptr, msgSize_);
135             freeBuffers_.emplace_back(std::unique_ptr<T>(buf));
136         }
137         return AllocateBufferUnprotected();
138     }
139 
Size()140     size_t Size() const
141     {
142         OSAL::ScopedLock lock(mutex_);
143         return freeBuffers_.size();
144     }
145 
Capacity()146     size_t Capacity() const
147     {
148         return poolSize_;
149     }
150 
Empty()151     bool Empty() const
152     {
153         OSAL::ScopedLock lock(mutex_);
154         return freeBuffers_.empty();
155     }
156 
157 private:
AllocateBufferUnprotected()158     std::shared_ptr<T> AllocateBufferUnprotected()
159     {
160         std::weak_ptr<BufferPool<T>> weakRef(this->shared_from_this());
161         std::shared_ptr<T> sptr(freeBuffers_.front().release(), [weakRef](T* ptr) {
162             auto pool = weakRef.lock();
163             if (pool) {
164                 pool->RecycleBuffer(std::unique_ptr<T>(ptr));
165             } else {
166                 delete ptr;
167             }
168         });
169         freeBuffers_.pop_front();
170         return sptr;
171     }
172 
FinishAllocInProgress()173     void FinishAllocInProgress()
174     {
175         OSAL::ScopedLock lock(mutex_);
176         if (allocInProgress.load()) {
177             cvFinishAlloc_.Wait(lock, [this] { return !allocInProgress.load(); });
178         }
179     }
180 
181     mutable OSAL::Mutex mutex_;
182     mutable OSAL::ConditionVariable cv_;
183     mutable OSAL::ConditionVariable cvFinishAlloc_;
184     mutable std::list<std::unique_ptr<T>> freeBuffers_;
185     size_t poolSize_ {0};
186     size_t msgSize_ {0};
187     size_t align_ {0}; // 0: use default alignment.
188     std::atomic<bool> isActive_;
189     std::atomic<bool> allocInProgress;
190     Plugin::BufferMetaType metaType_ = Plugin::BufferMetaType::AUDIO;
191 };
192 } // namespace Media
193 } // namespace OHOS
194 #endif // HISTREAMER_FOUNDATION_BUFFER_POOL_H
195