1 /*
2  * Copyright (c) 2022-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #define HST_LOG_TAG "AsyncMode"
17 
18 #include "pipeline/filters/codec/async_mode.h"
19 #include "foundation/osal/utils/util.h"
20 #include "foundation/utils/dump_buffer.h"
21 #include "pipeline/filters/common/plugin_utils.h"
22 #if !defined(OHOS_LITE) && defined(VIDEO_SUPPORT)
23 #include "plugin/common/surface_memory.h"
24 #endif
25 
26 namespace {
27 constexpr uint32_t DEFAULT_TRY_DECODE_TIME = 20;
28 }
29 
30 namespace OHOS {
31 namespace Media {
32 namespace Pipeline {
AsyncMode(std::string name)33 AsyncMode::AsyncMode(std::string name) : CodecMode(std::move(name))
34 {
35     MEDIA_LOG_I(PUBLIC_LOG_S " ThreadMode: ASYNC", codecName_.c_str());
36     isNeedQueueInputBuffer_ = true;
37 }
38 
~AsyncMode()39 AsyncMode::~AsyncMode()
40 {
41     MEDIA_LOG_D("Async mode dtor called");
42 }
43 
Release()44 ErrorCode AsyncMode::Release()
45 {
46     MEDIA_LOG_I("AsyncMode Release start.");
47     stopped_ = true;
48 
49     // 先停止线程 然后释放bufferQ 如果顺序反过来 可能导致线程访问已经释放的锁
50     if (inBufQue_) {
51         inBufQue_->SetActive(false);
52         inBufQue_.reset();
53     }
54     if (!isNeedQueueInputBuffer_) {
55         OSAL::ScopedLock lock(mutex_);
56         isNeedQueueInputBuffer_ = true;
57         cv_.NotifyOne();
58     }
59     if (handleFrameTask_) {
60         handleFrameTask_->Stop();
61         handleFrameTask_.reset();
62     }
63     if (outBufPool_) {
64         outBufPool_->SetActive(false);
65     }
66     if (decodeFrameTask_) {
67         decodeFrameTask_->Stop();
68         decodeFrameTask_.reset();
69     }
70     if (pushTask_ != nullptr) {
71         pushTask_->Stop();
72         pushTask_.reset();
73     }
74     {
75         OSAL::ScopedLock l(renderMutex_);
76         while (!outBufQue_.empty()) {
77             outBufQue_.pop();
78         }
79     }
80     MEDIA_LOG_I("AsyncMode Release end.");
81     return ErrorCode::SUCCESS;
82 }
83 
Configure()84 ErrorCode AsyncMode::Configure()
85 {
86     stopped_ = false;
87     FALSE_LOG_MSG_W(QueueAllBufferInPoolToPluginLocked() == ErrorCode::SUCCESS,
88                     "Can not configure all output buffers to plugin before start.");
89     FAIL_RETURN(CodecMode::Configure());
90     if (!isNeedQueueInputBuffer_) {
91         OSAL::ScopedLock lock(mutex_);
92         isNeedQueueInputBuffer_ = true;
93         cv_.NotifyOne();
94     }
95     if (handleFrameTask_) {
96         handleFrameTask_->Start();
97     }
98     if (decodeFrameTask_) {
99         decodeFrameTask_->Start();
100     }
101     if (pushTask_) {
102         pushTask_->Start();
103     }
104     return ErrorCode::SUCCESS;
105 }
106 
PushData(const std::string & inPort,const AVBufferPtr & buffer,int64_t offset)107 ErrorCode AsyncMode::PushData(const std::string &inPort, const AVBufferPtr& buffer, int64_t offset)
108 {
109     DUMP_BUFFER2LOG("AsyncMode in", buffer, offset);
110     MEDIA_LOG_DD("PushData called, inBufQue_->Size(): " PUBLIC_LOG_ZU ", capacity: " PUBLIC_LOG_ZU,
111                  inBufQue_->Size(), inBufQue_->Capacity());
112     if (buffer != nullptr && !stopped_) {
113         inBufQue_->Push(buffer);
114     } else {
115         MEDIA_LOG_DD("PushData buffer = nullptr.");
116         OSAL::SleepFor(DEFAULT_TRY_DECODE_TIME);
117     }
118     return ErrorCode::SUCCESS;
119 }
120 
Stop()121 ErrorCode AsyncMode::Stop()
122 {
123     MEDIA_LOG_I("AsyncMode stop start.");
124     stopped_ = true;
125     if (outBufPool_) {
126         outBufPool_->SetActive(false);
127     }
128     if (decodeFrameTask_) {
129         decodeFrameTask_->Stop();
130     }
131     if (pushTask_) {
132         pushTask_->Stop();
133     }
134     inBufQue_->SetActive(false);
135     {
136         OSAL::ScopedLock l(renderMutex_);
137         while (!outBufQue_.empty()) {
138             outBufQue_.pop();
139         }
140     }
141     if (!isNeedQueueInputBuffer_) {
142         OSAL::ScopedLock lock(mutex_);
143         isNeedQueueInputBuffer_ = true;
144         cv_.NotifyOne();
145     }
146     if (handleFrameTask_) {
147         handleFrameTask_->Stop();
148     }
149     outBufPool_.reset();
150     MEDIA_LOG_I("AsyncMode stop end.");
151     return ErrorCode::SUCCESS;
152 }
153 
FlushStart()154 void AsyncMode::FlushStart()
155 {
156     MEDIA_LOG_I("AsyncMode FlushStart entered.");
157     stopped_ = true; // thread will pause, should not enter endless loop
158     if (inBufQue_) {
159         inBufQue_->SetActive(false);
160     }
161     if (!isNeedQueueInputBuffer_) {
162         OSAL::ScopedLock lock(mutex_);
163         isNeedQueueInputBuffer_ = true;
164         cv_.NotifyOne();
165     }
166     if (handleFrameTask_) {
167         handleFrameTask_->Pause();
168     }
169     if (outBufPool_) {
170         outBufPool_->SetActive(false);
171     }
172     if (decodeFrameTask_) {
173         decodeFrameTask_->Pause();
174     }
175     if (pushTask_) {
176         pushTask_->Pause();
177     }
178     while (!outBufQue_.empty()) {
179         outBufQue_.pop();
180     }
181     MEDIA_LOG_I("AsyncMode FlushStart exit.");
182 }
183 
FlushEnd()184 void AsyncMode::FlushEnd()
185 {
186     MEDIA_LOG_I("AsyncMode FlushEnd entered");
187     stopped_ = false;
188     if (inBufQue_) {
189         inBufQue_->SetActive(true);
190     }
191     if (!isNeedQueueInputBuffer_) {
192         OSAL::ScopedLock lock(mutex_);
193         isNeedQueueInputBuffer_ = true;
194         cv_.NotifyOne();
195     }
196     if (handleFrameTask_) {
197         handleFrameTask_->Start();
198     }
199     if (outBufPool_) {
200         outBufPool_->SetActive(true);
201     }
202     if (decodeFrameTask_) {
203         decodeFrameTask_->Start();
204     }
205     if (pushTask_) {
206         pushTask_->Start();
207     }
208     MEDIA_LOG_I("AsyncMode FlushEnd exit");
209 }
210 
HandleFrame()211 ErrorCode AsyncMode::HandleFrame()
212 {
213     MEDIA_LOG_DD("AsyncMode handle frame called, inBufQue_->Size(): " PUBLIC_LOG_ZU, inBufQue_->Size());
214     auto oneBuffer = inBufQue_->Pop();
215     if (oneBuffer == nullptr) {
216         MEDIA_LOG_DD("decoder find nullptr in esBufferQ");
217         return ErrorCode::ERROR_INVALID_PARAMETER_VALUE;
218     }
219     Plugin::Status status = Plugin::Status::OK;
220     do {
221         DUMP_BUFFER2LOG("AsyncMode QueueInput to Plugin", oneBuffer, -1);
222         status = plugin_->QueueInputBuffer(oneBuffer, 0);
223         if (status == Plugin::Status::OK || status == Plugin::Status::END_OF_STREAM
224             || status != Plugin::Status::ERROR_AGAIN || stopped_) {
225             if (oneBuffer->flag & BUFFER_FLAG_EOS) {
226                 MEDIA_LOG_D("Handle frame receive EOS, pause async.");
227                 handleFrameTask_->PauseAsync();
228             }
229             break;
230         }
231         MEDIA_LOG_DD("Send data to plugin error: " PUBLIC_LOG_D32 ", currently, input buffer cannot be inputted, "
232             "send data can only continue after reading the output from ffmpeg.", static_cast<int32_t>(status));
233         OSAL::ScopedLock lock(mutex_);
234         isNeedQueueInputBuffer_ = false;
235         cv_.Wait(lock);
236     } while (true);
237     MEDIA_LOG_DD("Async handle frame finished");
238     return TranslatePluginStatus(status);
239 }
240 
DecodeFrame()241 ErrorCode AsyncMode::DecodeFrame()
242 {
243     MEDIA_LOG_DD("AsyncMode decode frame called, outBufPool_->Size(): " PUBLIC_LOG_ZU, outBufPool_->Size());
244     Plugin::Status status = Plugin::Status::OK;
245     auto newOutBuffer = outBufPool_->AllocateBuffer();
246     if (CheckBufferValidity(newOutBuffer) == ErrorCode::SUCCESS) {
247         newOutBuffer->Reset();
248         status = plugin_->QueueOutputBuffer(newOutBuffer, 0);
249         if (status == Plugin::Status::ERROR_NOT_ENOUGH_DATA) {
250             MEDIA_LOG_DD("QueueOutputBuffer failed, cause no enough data.");
251             if (!isNeedQueueInputBuffer_) {
252                 OSAL::ScopedLock lock(mutex_);
253                 cv_.NotifyOne();
254             }
255         }
256     } else {
257         MEDIA_LOG_DD("Invalid buffer.");
258     }
259     MEDIA_LOG_DD("Async decode frame finished");
260     return ErrorCode::SUCCESS;
261 }
262 
FinishFrame()263 ErrorCode AsyncMode::FinishFrame()
264 {
265     MEDIA_LOG_DD("FinishFrame begin, outBufQue size: " PUBLIC_LOG_ZU, outBufQue_.size());
266     std::shared_ptr<AVBuffer> frameBuffer = nullptr;
267     {
268         OSAL::ScopedLock l(renderMutex_);
269         if (!outBufQue_.empty()) {
270             frameBuffer = outBufQue_.front();
271             outBufQue_.pop();
272         }
273     }
274     if (frameBuffer != nullptr) {
275         auto oPort = outPorts_[0];
276         if (oPort->GetWorkMode() == WorkMode::PUSH) {
277             DUMP_BUFFER2LOG("AsyncMode PushData to Sink", frameBuffer, -1);
278             oPort->PushData(frameBuffer, -1);
279         } else {
280             MEDIA_LOG_W("decoder out port works in pull mode");
281             return ErrorCode::ERROR_INVALID_OPERATION;
282         }
283         if (frameBuffer->flag & BUFFER_FLAG_EOS) {
284             MEDIA_LOG_D("Finish frame receive EOS, pause async.");
285             pushTask_->PauseAsync();
286         }
287         frameBuffer.reset();
288     }
289     MEDIA_LOG_DD("AsyncMode finish frame success");
290     return ErrorCode::SUCCESS;
291 }
292 
OnOutputBufferDone(const std::shared_ptr<Plugin::Buffer> & buffer)293 void AsyncMode::OnOutputBufferDone(const std::shared_ptr<Plugin::Buffer>& buffer)
294 {
295     if (buffer == nullptr) {
296         MEDIA_LOG_E("Out put buffer is null.");
297         return;
298     }
299     if (buffer->flag & BUFFER_FLAG_EOS) {
300         if (outBufPool_) {
301             outBufPool_->SetActive(false);
302         }
303         MEDIA_LOG_D("Decode frame receive EOS, pause async.");
304         if (decodeFrameTask_ == nullptr) {
305             MEDIA_LOG_E("Decode frame task is closed.");
306             return;
307         }
308         decodeFrameTask_->PauseAsync();
309     }
310     {
311         OSAL::ScopedLock l(renderMutex_);
312         outBufQue_.push(buffer);
313     }
314     if (!isNeedQueueInputBuffer_) {
315         OSAL::ScopedLock lock(mutex_);
316         isNeedQueueInputBuffer_ = true;
317         cv_.NotifyOne();
318     }
319 }
320 
Prepare()321 ErrorCode AsyncMode::Prepare()
322 {
323     MEDIA_LOG_I("AsyncMode prepare called.");
324     if (!inBufQue_) {
325         inBufQue_ = std::make_shared<BlockingQueue<AVBufferPtr>>("asyncFilterInBufQue", GetInBufferPoolSize());
326     } else {
327         inBufQue_->SetActive(true);
328     }
329     if (!handleFrameTask_) {
330         handleFrameTask_ = std::make_shared<OSAL::Task>(codecName_ + "AsyncHandleFrame");
331         handleFrameTask_->RegisterHandler([this] { (void)HandleFrame(); });
332     }
333     if (!decodeFrameTask_) {
334         decodeFrameTask_ = std::make_shared<OSAL::Task>(codecName_ + "AsyncDecodeFrame");
335         decodeFrameTask_->RegisterHandler([this] { (void)DecodeFrame(); });
336     }
337     if (!pushTask_) {
338         pushTask_ = std::make_shared<OSAL::Task>(codecName_ + "AsyncPush");
339         pushTask_->RegisterHandler([this] { (void)FinishFrame(); });
340     }
341     return ErrorCode::SUCCESS;
342 }
343 
344 /**
345  * CheckBufferValidity 返回失败的原因是 SurfaceBuffer 申请失败。
346  * 这种情况,SurfaceBuffer 对应的 AVBuffer 仍然要回到 outBufPool_。继续循环,就会使得这个循环无法退出。
347  * 实际上是 SurfaceBuffer 申请成功才能退出。可能遇到了特殊情况,永远申请不到 SurfaceBuffer。
348  */
QueueAllBufferInPoolToPluginLocked()349 ErrorCode AsyncMode::QueueAllBufferInPoolToPluginLocked()
350 {
351     ErrorCode err = ErrorCode::SUCCESS;
352     while (!outBufPool_->Empty()) {
353         auto buf = outBufPool_->AllocateBuffer();
354         if (CheckBufferValidity(buf) != ErrorCode::SUCCESS) {
355             MEDIA_LOG_W("cannot allocate buffer in buffer pool");
356             break;
357         }
358         buf->Reset();
359         err = TranslatePluginStatus(plugin_->QueueOutputBuffer(buf, -1));
360         if (err != ErrorCode::SUCCESS) {
361             MEDIA_LOG_W("Queue output buffer error, plugin doesn't support queue all out buffers.");
362             break;
363         }
364     }
365     return err;
366 }
367 
CheckBufferValidity(std::shared_ptr<AVBuffer> & buffer)368 ErrorCode AsyncMode::CheckBufferValidity(std::shared_ptr<AVBuffer>& buffer)
369 {
370     if (buffer == nullptr) {
371         return ErrorCode::ERROR_INVALID_PARAMETER_VALUE;
372     }
373     auto memory = buffer->GetMemory();
374     if (memory == nullptr) {
375         return ErrorCode::ERROR_INVALID_PARAMETER_VALUE;
376     }
377 #if !defined(OHOS_LITE) && defined(VIDEO_SUPPORT)
378     if (memory->GetMemoryType() == Plugin::MemoryType::SURFACE_BUFFER) {
379         std::shared_ptr<Plugin::SurfaceMemory> surfaceMemory =
380             Plugin::ReinterpretPointerCast<Plugin::SurfaceMemory>(memory);
381 
382         // trigger surface memory to request surface buffer again when it is surface buffer
383         if (surfaceMemory->GetSurfaceBuffer() == nullptr) {
384             // Surface often obtain buffer failed, but doesn't cause any problem.
385             MEDIA_LOG_DD("Get surface buffer fail.");
386             return ErrorCode::ERROR_NO_MEMORY;
387         }
388     }
389 #endif
390     return ErrorCode::SUCCESS;
391 }
392 } // namespace Pipeline
393 } // namespace Media
394 } // namespace OHOS
395