1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 * http://www.apache.org/licenses/LICENSE-2.0
7 * Unless required by applicable law or agreed to in writing, software
8 * distributed under the License is distributed on an "AS IS" BASIS,
9 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 * See the License for the specific language governing permissions and
11 * limitations under the License.
12 */
13
14 #include "source_node.h"
15 #include <unistd.h>
16
17 namespace OHOS::Camera {
SourceNode(const std::string & name,const std::string & type,const std::string & cameraId)18 SourceNode::SourceNode(const std::string& name, const std::string& type, const std::string &cameraId)
19 : NodeBase(name, type, cameraId)
20 {
21 name_ = name;
22 type_ = type;
23 cameraIds_ = cameraId;
24 CAMERA_LOGV("%{public}s enter, type(%{public}s)\n", name_.c_str(), type_.c_str());
25 }
26
~SourceNode()27 SourceNode::~SourceNode()
28 {
29 CAMERA_LOGV("%{public}s, source node dtor.", __FUNCTION__);
30 }
31
Init(const int32_t streamId)32 RetCode SourceNode::Init(const int32_t streamId)
33 {
34 (void)streamId;
35 return RC_OK;
36 }
37
Start(const int32_t streamId)38 RetCode SourceNode::Start(const int32_t streamId)
39 {
40 CAMERA_LOGI("SourceNode::Start [%{public}d] start", streamId);
41 std::shared_ptr<IPort> port = nullptr;
42 auto outPorts = GetOutPorts();
43 for (auto& p : outPorts) {
44 PortFormat format = {};
45 p->GetFormat(format);
46 if (streamId == format.streamId_) {
47 port = p;
48 break;
49 }
50 }
51 if (port == nullptr) {
52 return RC_ERROR;
53 }
54
55 {
56 std::lock_guard<std::mutex> l(hndl_);
57 if (handler_.count(streamId) > 0) {
58 CAMERA_LOGI("stream [%{public}d] start again, skip", streamId);
59 return RC_OK;
60 }
61 }
62
63 SetBufferCallback();
64 std::shared_ptr<PortHandler> ph = std::make_shared<PortHandler>(port, isAdjust_);
65 CHECK_IF_PTR_NULL_RETURN_VALUE(ph, RC_ERROR);
66 ph->setWideAndHigh(wide_, high_);
67 {
68 std::lock_guard<std::mutex> l(hndl_);
69 handler_[streamId] = ph;
70 }
71 RetCode rc = handler_[streamId]->StartCollectBuffers();
72 CHECK_IF_NOT_EQUAL_RETURN_VALUE(rc, RC_OK, RC_ERROR);
73
74 rc = handler_[streamId]->StartDistributeBuffers();
75 CHECK_IF_NOT_EQUAL_RETURN_VALUE(rc, RC_OK, RC_ERROR);
76
77 return RC_OK;
78 }
79
Flush(const int32_t streamId)80 RetCode SourceNode::Flush(const int32_t streamId)
81 {
82 CHECK_IF_NOT_EQUAL_RETURN_VALUE(handler_.count(streamId) > 0, true, RC_ERROR);
83 handler_[streamId]->StopCollectBuffers();
84 return RC_OK;
85 }
86
Stop(const int32_t streamId)87 RetCode SourceNode::Stop(const int32_t streamId)
88 {
89 CHECK_IF_NOT_EQUAL_RETURN_VALUE(handler_.count(streamId) > 0, true, RC_ERROR);
90 handler_[streamId]->StopDistributeBuffers();
91
92 {
93 std::lock_guard<std::mutex> l(hndl_);
94 auto it = handler_.find(streamId);
95 if (it != handler_.end()) {
96 handler_.erase(it);
97 }
98 }
99 return RC_OK;
100 }
101
Config(const int32_t streamId,const CaptureMeta & meta)102 RetCode SourceNode::Config(const int32_t streamId, const CaptureMeta& meta)
103 {
104 CHECK_IF_NOT_EQUAL_RETURN_VALUE(handler_.count(streamId) > 0, true, RC_ERROR);
105 (void)meta;
106 return RC_OK;
107 }
108
DeliverBuffer(std::shared_ptr<IBuffer> & buffer)109 void SourceNode::DeliverBuffer(std::shared_ptr<IBuffer>& buffer)
110 {
111 CHECK_IF_PTR_NULL_RETURN_VOID(buffer);
112 int32_t id = buffer->GetStreamId();
113 {
114 std::lock_guard<std::mutex> l(requestLock_);
115 CAMERA_LOGV("deliver a buffer to stream id:%{public}d", id);
116 if (captureRequests_.count(id) == 0) {
117 CAMERA_LOGV("queue size: 0");
118 buffer->SetBufferStatus(CAMERA_BUFFER_STATUS_INVALID);
119 } else if (captureRequests_[id].empty()) {
120 buffer->SetBufferStatus(CAMERA_BUFFER_STATUS_INVALID);
121 } else {
122 CAMERA_LOGV("queue size:%{public}u", captureRequests_[id].size());
123 buffer->SetCaptureId(captureRequests_[id].front());
124 captureRequests_[id].pop_front();
125 }
126 }
127 buffer->SetIsValidDataInSurfaceBuffer(false);
128 NodeBase::DeliverBuffer(buffer);
129 }
130
OnPackBuffer(std::shared_ptr<FrameSpec> frameSpec)131 void SourceNode::OnPackBuffer(std::shared_ptr<FrameSpec> frameSpec)
132 {
133 CAMERA_LOGI("SourceNode::OnPackBuffer enter");
134
135 CHECK_IF_PTR_NULL_RETURN_VOID(frameSpec);
136 auto buffer = frameSpec->buffer_;
137 CHECK_IF_PTR_NULL_RETURN_VOID(buffer);
138 handler_[buffer->GetStreamId()]->OnBuffer(buffer);
139
140 CAMERA_LOGI("SourceNode::OnPackBuffer exit");
141 return;
142 }
143
SetBufferCallback()144 void SourceNode::SetBufferCallback()
145 {
146 return;
147 }
148
ProvideBuffers(std::shared_ptr<FrameSpec> frameSpec)149 RetCode SourceNode::ProvideBuffers(std::shared_ptr<FrameSpec> frameSpec)
150 {
151 (void)frameSpec;
152 return RC_OK;
153 }
154
Capture(const int32_t streamId,const int32_t captureId)155 RetCode SourceNode::Capture(const int32_t streamId, const int32_t captureId)
156 {
157 std::lock_guard<std::mutex> l(requestLock_);
158 if (captureRequests_.count(streamId) == 0) {
159 captureRequests_[streamId] = {captureId};
160 } else {
161 captureRequests_[streamId].emplace_back(captureId);
162 }
163 CAMERA_LOGV("received a request from stream [id:%{public}d], queue size:%{public}u",
164 streamId, captureRequests_[streamId].size());
165 return RC_OK;
166 }
167
CancelCapture(const int32_t streamId)168 RetCode SourceNode::CancelCapture(const int32_t streamId)
169 {
170 (void)streamId;
171 return RC_OK;
172 }
173
PortHandler(std::shared_ptr<IPort> & p,bool isResize)174 SourceNode::PortHandler::PortHandler(std::shared_ptr<IPort>& p, bool isResize) : port(p), isResize_(isResize)
175 {
176 }
177
~PortHandler()178 SourceNode::PortHandler::~PortHandler()
179 {
180 CAMERA_LOGV("%{public}s, source node port handler dtor.", __FUNCTION__);
181 CollectorJoin();
182 DistributorJoin();
183 }
184
StartCollectBuffers()185 RetCode SourceNode::PortHandler::StartCollectBuffers()
186 {
187 CHECK_IF_PTR_NULL_RETURN_VALUE(port, RC_ERROR);
188 PortFormat format = {};
189 port->GetFormat(format);
190 uint32_t streamId = format.streamId_;
191
192 pool = BufferManager::GetInstance()->GetBufferPool(format.bufferPoolId_);
193 CHECK_IF_PTR_NULL_RETURN_VALUE(pool, RC_ERROR);
194 pool->NotifyStart();
195 CAMERA_LOGI("SourceNode::PortHandler::StartCollectBuffers");
196
197 {
198 std::unique_lock<std::mutex> l(cltLock);
199 cltRun = true;
200 }
201
202 collector = std::make_unique<std::thread>([this, &streamId] {
203 std::string name = "collect#" + std::to_string(streamId);
204 prctl(PR_SET_NAME, name.c_str());
205 CAMERA_LOGI("StartCollectBuffers thread start, name = %{public}s", name.c_str());
206 while (true) {
207 {
208 std::unique_lock<std::mutex> l(cltLock);
209 if (cltRun == false) {
210 CAMERA_LOGD("collect buffer thread break");
211 break;
212 }
213 }
214 CollectBuffers();
215 }
216 CAMERA_LOGI("StartCollectBuffers thread end, name = %{public}s", name.c_str());
217 });
218
219 return RC_OK;
220 }
221
CollectorJoin()222 RetCode SourceNode::PortHandler::CollectorJoin()
223 {
224 CHECK_IF_PTR_NULL_RETURN_VALUE(pool, RC_ERROR);
225 CAMERA_LOGI("SourceNode::PortHandler::CollectorJoin enter");
226 {
227 std::unique_lock<std::mutex> l(cltLock);
228 cltRun = false;
229 }
230 pool->NotifyStop();
231 if (collector != nullptr) {
232 collector->join();
233 collector.reset(nullptr);
234 }
235 CAMERA_LOGI("SourceNode::PortHandler::collector::join exit");
236 return RC_OK;
237 }
238
StopCollectBuffers()239 RetCode SourceNode::PortHandler::StopCollectBuffers()
240 {
241 RetCode rc = CollectorJoin();
242 CHECK_IF_NOT_EQUAL_RETURN_VALUE(rc, RC_OK, RC_ERROR);
243
244 auto node = port->GetNode();
245 if (node != nullptr) {
246 uint32_t n = pool->GetIdleBufferCount();
247 for (uint32_t i = 0; i < n; i++) {
248 auto buffer = pool->AcquireBuffer(-1);
249 node->DeliverBuffer(buffer);
250 }
251 }
252 CAMERA_LOGI("SourceNode::PortHandler::StopCollectBuffers exit");
253 return RC_OK;
254 }
255
CollectBuffers()256 void SourceNode::PortHandler::CollectBuffers()
257 {
258 CAMERA_LOGV("SourceNode::PortHandler::CollectBuffers");
259 CHECK_IF_PTR_NULL_RETURN_VOID(pool);
260 std::shared_ptr<IBuffer> buffer = pool->AcquireBuffer(-1);
261 CHECK_IF_PTR_NULL_RETURN_VOID(buffer);
262
263 PortFormat format = {};
264 port->GetFormat(format);
265 std::shared_ptr<FrameSpec> frameSpec = std::make_shared<FrameSpec>();
266 frameSpec->bufferPoolId_ = format.bufferPoolId_;
267 frameSpec->bufferCount_ = format.bufferCount_;
268 constexpr uint32_t NewBufferBytePrePiex = 4;
269 uint32_t bufferSize = maxWide_ * maxHigh_ * NewBufferBytePrePiex;
270 CAMERA_LOGI("streamId[%{public}d], bufferIndex[%{public}d], Size %{public}d => %{public}d",
271 buffer->GetStreamId(), buffer->GetIndex(), buffer->GetSize(), bufferSize);
272
273 if (buffer->GetVirAddress() == buffer->GetSuffaceBufferAddr()) {
274 CAMERA_LOGI("CollectBuffers begin malloc buffer");
275 auto bufferAddr = malloc(bufferSize);
276 if (bufferAddr != nullptr) {
277 buffer->SetVirAddress(bufferAddr);
278 buffer->SetSize(bufferSize);
279 } else {
280 CAMERA_LOGE("CollectBuffers malloc buffer fail");
281 }
282 }
283 frameSpec->buffer_ = buffer;
284 auto node = port->GetNode();
285 CHECK_IF_PTR_NULL_RETURN_VOID(node);
286 RetCode rc = node->ProvideBuffers(frameSpec);
287 if (rc == RC_ERROR) {
288 CAMERA_LOGE("provide buffer failed.");
289 }
290 }
291
StartDistributeBuffers()292 RetCode SourceNode::PortHandler::StartDistributeBuffers()
293 {
294 {
295 std::unique_lock<std::mutex> l(rblock);
296 dbtRun = true;
297 }
298
299 distributor = std::make_unique<std::thread>([this] {
300 PortFormat format = {};
301 port->GetFormat(format);
302 int id = format.streamId_;
303 std::string name = "distribute#" + std::to_string(id);
304 prctl(PR_SET_NAME, name.c_str());
305 CAMERA_LOGI("StartDistributeBuffers thread start, name = %{public}s", name.c_str());
306
307 while (true) {
308 {
309 std::unique_lock<std::mutex> l(rblock);
310 if (dbtRun == false) {
311 CAMERA_LOGD("distribute buffers thread break");
312 break;
313 }
314 }
315 DistributeBuffers();
316 }
317 CAMERA_LOGI("StartDistributeBuffers thread end, name = %{public}s", name.c_str());
318 });
319
320 return RC_OK;
321 }
322
DistributorJoin()323 RetCode SourceNode::PortHandler::DistributorJoin()
324 {
325 CAMERA_LOGV("SourceNode::PortHandler::DistributorJoin enter");
326 {
327 std::unique_lock<std::mutex> l(rblock);
328 dbtRun = false;
329 rbcv.notify_one();
330 }
331 if (distributor != nullptr) {
332 distributor->join();
333 distributor.reset(nullptr);
334 }
335 CAMERA_LOGV("SourceNode::PortHandler::DistributorJoin exit");
336 return RC_OK;
337 }
338
StopDistributeBuffers()339 RetCode SourceNode::PortHandler::StopDistributeBuffers()
340 {
341 RetCode rc = DistributorJoin();
342 CHECK_IF_NOT_EQUAL_RETURN_VALUE(rc, RC_OK, RC_ERROR);
343 FlushBuffers(); // flush buffers after stopping distributor
344 if (isResize_ == true) {
345 for (auto iter : cBuffer) {
346 free(iter.second);
347 }
348 cBuffer.clear();
349 }
350 CAMERA_LOGV("SourceNode::PortHandler::StopDistributeBuffers exit");
351 return RC_OK;
352 }
353
DistributeBuffers()354 void SourceNode::PortHandler::DistributeBuffers()
355 {
356 std::shared_ptr<IBuffer> buffer = nullptr;
357 {
358 std::unique_lock<std::mutex> l(rblock);
359 auto timeout = std::chrono::system_clock::now() + std::chrono::milliseconds(500); // 500ms
360 if (!rbcv.wait_until(l, timeout, [this] {
361 return (!dbtRun || !respondBufferList.empty());
362 })) {
363 CAMERA_LOGE("DistributeBuffers timeout, dbtRun=%{public}d, respondBufferList size=%{public}d",
364 dbtRun.load(std::memory_order_acquire), respondBufferList.size());
365 }
366
367 if (!dbtRun || respondBufferList.empty()) {
368 loopErrorCount_++;
369 if (loopErrorCount_ == LOOP_MAX_COUNT) {
370 CAMERA_LOGE("DistributeBuffers timeout 5s update dbtRun = false");
371 dbtRun = false;
372 }
373 return;
374 }
375 loopErrorCount_ = 0;
376 buffer = respondBufferList.front();
377 respondBufferList.pop_front();
378 }
379
380 auto node = port->GetNode();
381 CHECK_IF_PTR_NULL_RETURN_VOID(node);
382 CAMERA_LOGE("DistributeBuffers Loop, start deliverBuffer, streamId = %{public}d", buffer->GetStreamId());
383 node->DeliverBuffer(buffer);
384
385 return;
386 }
387
OnBuffer(std::shared_ptr<IBuffer> & buffer)388 void SourceNode::PortHandler::OnBuffer(std::shared_ptr<IBuffer>& buffer)
389 {
390 CAMERA_LOGV("SourceNode::PortHandler::OnBuffer enter");
391 {
392 std::unique_lock<std::mutex> l(rblock);
393 respondBufferList.emplace_back(buffer);
394 rbcv.notify_one();
395 }
396
397 CAMERA_LOGV("SourceNode::PortHandler::OnBuffer exit");
398
399 return;
400 }
401
FlushBuffers()402 void SourceNode::PortHandler::FlushBuffers()
403 {
404 CAMERA_LOGV("SourceNode::PortHandler::FlushBuffers enter");
405 if (respondBufferList.empty()) {
406 CAMERA_LOGV("SourceNode::PortHandler::FlushBuffers respondBufferList is empty");
407 return;
408 }
409
410 auto node = port->GetNode();
411 CHECK_IF_PTR_NULL_RETURN_VOID(node);
412 std::unique_lock<std::mutex> l(rblock);
413 while (!respondBufferList.empty()) {
414 auto buffer = respondBufferList.front();
415 node->DeliverBuffer(buffer);
416 respondBufferList.pop_front();
417 }
418 CAMERA_LOGV("SourceNode::PortHandler::FlushBuffers exit");
419
420 return;
421 }
422
setWideAndHigh(int32_t wide,int32_t high)423 void SourceNode::PortHandler::setWideAndHigh(int32_t wide, int32_t high)
424 {
425 maxWide_ = wide;
426 maxHigh_ = high;
427 }
428
429 REGISTERNODE(SourceNode, {"source"})
430 } // namespace OHOS::Camera
431