1 /*
2 * Copyright (c) 2022-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define HST_LOG_TAG "AsyncMode"
17
18 #include "pipeline/filters/codec/async_mode.h"
19 #include "foundation/osal/utils/util.h"
20 #include "foundation/utils/dump_buffer.h"
21 #include "pipeline/filters/common/plugin_utils.h"
22 #if !defined(OHOS_LITE) && defined(VIDEO_SUPPORT)
23 #include "plugin/common/surface_memory.h"
24 #endif
25
namespace {
// Delay handed to OSAL::SleepFor() by PushData() when an incoming buffer is not queued.
// NOTE(review): the unit is presumably milliseconds - confirm against OSAL::SleepFor.
constexpr uint32_t DEFAULT_TRY_DECODE_TIME {20};
} // namespace
29
30 namespace OHOS {
31 namespace Media {
32 namespace Pipeline {
AsyncMode(std::string name)33 AsyncMode::AsyncMode(std::string name) : CodecMode(std::move(name))
34 {
35 MEDIA_LOG_I(PUBLIC_LOG_S " ThreadMode: ASYNC", codecName_.c_str());
36 isNeedQueueInputBuffer_ = true;
37 }
38
~AsyncMode()39 AsyncMode::~AsyncMode()
40 {
41 MEDIA_LOG_D("Async mode dtor called");
42 }
43
Release()44 ErrorCode AsyncMode::Release()
45 {
46 MEDIA_LOG_I("AsyncMode Release start.");
47 stopped_ = true;
48
49 // 先停止线程 然后释放bufferQ 如果顺序反过来 可能导致线程访问已经释放的锁
50 if (inBufQue_) {
51 inBufQue_->SetActive(false);
52 inBufQue_.reset();
53 }
54 if (!isNeedQueueInputBuffer_) {
55 OSAL::ScopedLock lock(mutex_);
56 isNeedQueueInputBuffer_ = true;
57 cv_.NotifyOne();
58 }
59 if (handleFrameTask_) {
60 handleFrameTask_->Stop();
61 handleFrameTask_.reset();
62 }
63 if (outBufPool_) {
64 outBufPool_->SetActive(false);
65 }
66 if (decodeFrameTask_) {
67 decodeFrameTask_->Stop();
68 decodeFrameTask_.reset();
69 }
70 if (pushTask_ != nullptr) {
71 pushTask_->Stop();
72 pushTask_.reset();
73 }
74 {
75 OSAL::ScopedLock l(renderMutex_);
76 while (!outBufQue_.empty()) {
77 outBufQue_.pop();
78 }
79 }
80 MEDIA_LOG_I("AsyncMode Release end.");
81 return ErrorCode::SUCCESS;
82 }
83
Configure()84 ErrorCode AsyncMode::Configure()
85 {
86 stopped_ = false;
87 FALSE_LOG_MSG_W(QueueAllBufferInPoolToPluginLocked() == ErrorCode::SUCCESS,
88 "Can not configure all output buffers to plugin before start.");
89 FAIL_RETURN(CodecMode::Configure());
90 if (!isNeedQueueInputBuffer_) {
91 OSAL::ScopedLock lock(mutex_);
92 isNeedQueueInputBuffer_ = true;
93 cv_.NotifyOne();
94 }
95 if (handleFrameTask_) {
96 handleFrameTask_->Start();
97 }
98 if (decodeFrameTask_) {
99 decodeFrameTask_->Start();
100 }
101 if (pushTask_) {
102 pushTask_->Start();
103 }
104 return ErrorCode::SUCCESS;
105 }
106
PushData(const std::string & inPort,const AVBufferPtr & buffer,int64_t offset)107 ErrorCode AsyncMode::PushData(const std::string &inPort, const AVBufferPtr& buffer, int64_t offset)
108 {
109 DUMP_BUFFER2LOG("AsyncMode in", buffer, offset);
110 MEDIA_LOG_DD("PushData called, inBufQue_->Size(): " PUBLIC_LOG_ZU ", capacity: " PUBLIC_LOG_ZU,
111 inBufQue_->Size(), inBufQue_->Capacity());
112 if (buffer != nullptr && !stopped_) {
113 inBufQue_->Push(buffer);
114 } else {
115 MEDIA_LOG_DD("PushData buffer = nullptr.");
116 OSAL::SleepFor(DEFAULT_TRY_DECODE_TIME);
117 }
118 return ErrorCode::SUCCESS;
119 }
120
Stop()121 ErrorCode AsyncMode::Stop()
122 {
123 MEDIA_LOG_I("AsyncMode stop start.");
124 stopped_ = true;
125 if (outBufPool_) {
126 outBufPool_->SetActive(false);
127 }
128 if (decodeFrameTask_) {
129 decodeFrameTask_->Stop();
130 }
131 if (pushTask_) {
132 pushTask_->Stop();
133 }
134 inBufQue_->SetActive(false);
135 {
136 OSAL::ScopedLock l(renderMutex_);
137 while (!outBufQue_.empty()) {
138 outBufQue_.pop();
139 }
140 }
141 if (!isNeedQueueInputBuffer_) {
142 OSAL::ScopedLock lock(mutex_);
143 isNeedQueueInputBuffer_ = true;
144 cv_.NotifyOne();
145 }
146 if (handleFrameTask_) {
147 handleFrameTask_->Stop();
148 }
149 outBufPool_.reset();
150 MEDIA_LOG_I("AsyncMode stop end.");
151 return ErrorCode::SUCCESS;
152 }
153
FlushStart()154 void AsyncMode::FlushStart()
155 {
156 MEDIA_LOG_I("AsyncMode FlushStart entered.");
157 stopped_ = true; // thread will pause, should not enter endless loop
158 if (inBufQue_) {
159 inBufQue_->SetActive(false);
160 }
161 if (!isNeedQueueInputBuffer_) {
162 OSAL::ScopedLock lock(mutex_);
163 isNeedQueueInputBuffer_ = true;
164 cv_.NotifyOne();
165 }
166 if (handleFrameTask_) {
167 handleFrameTask_->Pause();
168 }
169 if (outBufPool_) {
170 outBufPool_->SetActive(false);
171 }
172 if (decodeFrameTask_) {
173 decodeFrameTask_->Pause();
174 }
175 if (pushTask_) {
176 pushTask_->Pause();
177 }
178 while (!outBufQue_.empty()) {
179 outBufQue_.pop();
180 }
181 MEDIA_LOG_I("AsyncMode FlushStart exit.");
182 }
183
FlushEnd()184 void AsyncMode::FlushEnd()
185 {
186 MEDIA_LOG_I("AsyncMode FlushEnd entered");
187 stopped_ = false;
188 if (inBufQue_) {
189 inBufQue_->SetActive(true);
190 }
191 if (!isNeedQueueInputBuffer_) {
192 OSAL::ScopedLock lock(mutex_);
193 isNeedQueueInputBuffer_ = true;
194 cv_.NotifyOne();
195 }
196 if (handleFrameTask_) {
197 handleFrameTask_->Start();
198 }
199 if (outBufPool_) {
200 outBufPool_->SetActive(true);
201 }
202 if (decodeFrameTask_) {
203 decodeFrameTask_->Start();
204 }
205 if (pushTask_) {
206 pushTask_->Start();
207 }
208 MEDIA_LOG_I("AsyncMode FlushEnd exit");
209 }
210
HandleFrame()211 ErrorCode AsyncMode::HandleFrame()
212 {
213 MEDIA_LOG_DD("AsyncMode handle frame called, inBufQue_->Size(): " PUBLIC_LOG_ZU, inBufQue_->Size());
214 auto oneBuffer = inBufQue_->Pop();
215 if (oneBuffer == nullptr) {
216 MEDIA_LOG_DD("decoder find nullptr in esBufferQ");
217 return ErrorCode::ERROR_INVALID_PARAMETER_VALUE;
218 }
219 Plugin::Status status = Plugin::Status::OK;
220 do {
221 DUMP_BUFFER2LOG("AsyncMode QueueInput to Plugin", oneBuffer, -1);
222 status = plugin_->QueueInputBuffer(oneBuffer, 0);
223 if (status == Plugin::Status::OK || status == Plugin::Status::END_OF_STREAM
224 || status != Plugin::Status::ERROR_AGAIN || stopped_) {
225 if (oneBuffer->flag & BUFFER_FLAG_EOS) {
226 MEDIA_LOG_D("Handle frame receive EOS, pause async.");
227 handleFrameTask_->PauseAsync();
228 }
229 break;
230 }
231 MEDIA_LOG_DD("Send data to plugin error: " PUBLIC_LOG_D32 ", currently, input buffer cannot be inputted, "
232 "send data can only continue after reading the output from ffmpeg.", static_cast<int32_t>(status));
233 OSAL::ScopedLock lock(mutex_);
234 isNeedQueueInputBuffer_ = false;
235 cv_.Wait(lock);
236 } while (true);
237 MEDIA_LOG_DD("Async handle frame finished");
238 return TranslatePluginStatus(status);
239 }
240
DecodeFrame()241 ErrorCode AsyncMode::DecodeFrame()
242 {
243 MEDIA_LOG_DD("AsyncMode decode frame called, outBufPool_->Size(): " PUBLIC_LOG_ZU, outBufPool_->Size());
244 Plugin::Status status = Plugin::Status::OK;
245 auto newOutBuffer = outBufPool_->AllocateBuffer();
246 if (CheckBufferValidity(newOutBuffer) == ErrorCode::SUCCESS) {
247 newOutBuffer->Reset();
248 status = plugin_->QueueOutputBuffer(newOutBuffer, 0);
249 if (status == Plugin::Status::ERROR_NOT_ENOUGH_DATA) {
250 MEDIA_LOG_DD("QueueOutputBuffer failed, cause no enough data.");
251 if (!isNeedQueueInputBuffer_) {
252 OSAL::ScopedLock lock(mutex_);
253 cv_.NotifyOne();
254 }
255 }
256 } else {
257 MEDIA_LOG_DD("Invalid buffer.");
258 }
259 MEDIA_LOG_DD("Async decode frame finished");
260 return ErrorCode::SUCCESS;
261 }
262
FinishFrame()263 ErrorCode AsyncMode::FinishFrame()
264 {
265 MEDIA_LOG_DD("FinishFrame begin, outBufQue size: " PUBLIC_LOG_ZU, outBufQue_.size());
266 std::shared_ptr<AVBuffer> frameBuffer = nullptr;
267 {
268 OSAL::ScopedLock l(renderMutex_);
269 if (!outBufQue_.empty()) {
270 frameBuffer = outBufQue_.front();
271 outBufQue_.pop();
272 }
273 }
274 if (frameBuffer != nullptr) {
275 auto oPort = outPorts_[0];
276 if (oPort->GetWorkMode() == WorkMode::PUSH) {
277 DUMP_BUFFER2LOG("AsyncMode PushData to Sink", frameBuffer, -1);
278 oPort->PushData(frameBuffer, -1);
279 } else {
280 MEDIA_LOG_W("decoder out port works in pull mode");
281 return ErrorCode::ERROR_INVALID_OPERATION;
282 }
283 if (frameBuffer->flag & BUFFER_FLAG_EOS) {
284 MEDIA_LOG_D("Finish frame receive EOS, pause async.");
285 pushTask_->PauseAsync();
286 }
287 frameBuffer.reset();
288 }
289 MEDIA_LOG_DD("AsyncMode finish frame success");
290 return ErrorCode::SUCCESS;
291 }
292
OnOutputBufferDone(const std::shared_ptr<Plugin::Buffer> & buffer)293 void AsyncMode::OnOutputBufferDone(const std::shared_ptr<Plugin::Buffer>& buffer)
294 {
295 if (buffer == nullptr) {
296 MEDIA_LOG_E("Out put buffer is null.");
297 return;
298 }
299 if (buffer->flag & BUFFER_FLAG_EOS) {
300 if (outBufPool_) {
301 outBufPool_->SetActive(false);
302 }
303 MEDIA_LOG_D("Decode frame receive EOS, pause async.");
304 if (decodeFrameTask_ == nullptr) {
305 MEDIA_LOG_E("Decode frame task is closed.");
306 return;
307 }
308 decodeFrameTask_->PauseAsync();
309 }
310 {
311 OSAL::ScopedLock l(renderMutex_);
312 outBufQue_.push(buffer);
313 }
314 if (!isNeedQueueInputBuffer_) {
315 OSAL::ScopedLock lock(mutex_);
316 isNeedQueueInputBuffer_ = true;
317 cv_.NotifyOne();
318 }
319 }
320
Prepare()321 ErrorCode AsyncMode::Prepare()
322 {
323 MEDIA_LOG_I("AsyncMode prepare called.");
324 if (!inBufQue_) {
325 inBufQue_ = std::make_shared<BlockingQueue<AVBufferPtr>>("asyncFilterInBufQue", GetInBufferPoolSize());
326 } else {
327 inBufQue_->SetActive(true);
328 }
329 if (!handleFrameTask_) {
330 handleFrameTask_ = std::make_shared<OSAL::Task>(codecName_ + "AsyncHandleFrame");
331 handleFrameTask_->RegisterHandler([this] { (void)HandleFrame(); });
332 }
333 if (!decodeFrameTask_) {
334 decodeFrameTask_ = std::make_shared<OSAL::Task>(codecName_ + "AsyncDecodeFrame");
335 decodeFrameTask_->RegisterHandler([this] { (void)DecodeFrame(); });
336 }
337 if (!pushTask_) {
338 pushTask_ = std::make_shared<OSAL::Task>(codecName_ + "AsyncPush");
339 pushTask_->RegisterHandler([this] { (void)FinishFrame(); });
340 }
341 return ErrorCode::SUCCESS;
342 }
343
344 /**
345 * CheckBufferValidity 返回失败的原因是 SurfaceBuffer 申请失败。
346 * 这种情况,SurfaceBuffer 对应的 AVBuffer 仍然要回到 outBufPool_。继续循环,就会使得这个循环无法退出。
347 * 实际上是 SurfaceBuffer 申请成功才能退出。可能遇到了特殊情况,永远申请不到 SurfaceBuffer。
348 */
QueueAllBufferInPoolToPluginLocked()349 ErrorCode AsyncMode::QueueAllBufferInPoolToPluginLocked()
350 {
351 ErrorCode err = ErrorCode::SUCCESS;
352 while (!outBufPool_->Empty()) {
353 auto buf = outBufPool_->AllocateBuffer();
354 if (CheckBufferValidity(buf) != ErrorCode::SUCCESS) {
355 MEDIA_LOG_W("cannot allocate buffer in buffer pool");
356 break;
357 }
358 buf->Reset();
359 err = TranslatePluginStatus(plugin_->QueueOutputBuffer(buf, -1));
360 if (err != ErrorCode::SUCCESS) {
361 MEDIA_LOG_W("Queue output buffer error, plugin doesn't support queue all out buffers.");
362 break;
363 }
364 }
365 return err;
366 }
367
CheckBufferValidity(std::shared_ptr<AVBuffer> & buffer)368 ErrorCode AsyncMode::CheckBufferValidity(std::shared_ptr<AVBuffer>& buffer)
369 {
370 if (buffer == nullptr) {
371 return ErrorCode::ERROR_INVALID_PARAMETER_VALUE;
372 }
373 auto memory = buffer->GetMemory();
374 if (memory == nullptr) {
375 return ErrorCode::ERROR_INVALID_PARAMETER_VALUE;
376 }
377 #if !defined(OHOS_LITE) && defined(VIDEO_SUPPORT)
378 if (memory->GetMemoryType() == Plugin::MemoryType::SURFACE_BUFFER) {
379 std::shared_ptr<Plugin::SurfaceMemory> surfaceMemory =
380 Plugin::ReinterpretPointerCast<Plugin::SurfaceMemory>(memory);
381
382 // trigger surface memory to request surface buffer again when it is surface buffer
383 if (surfaceMemory->GetSurfaceBuffer() == nullptr) {
384 // Surface often obtain buffer failed, but doesn't cause any problem.
385 MEDIA_LOG_DD("Get surface buffer fail.");
386 return ErrorCode::ERROR_NO_MEMORY;
387 }
388 }
389 #endif
390 return ErrorCode::SUCCESS;
391 }
392 } // namespace Pipeline
393 } // namespace Media
394 } // namespace OHOS
395