1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *     http://www.apache.org/licenses/LICENSE-2.0
7  * Unless required by applicable law or agreed to in writing, software
8  * distributed under the License is distributed on an "AS IS" BASIS,
9  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10  * See the License for the specific language governing permissions and
11  * limitations under the License.
12  */
13 
14 #include "source_node.h"
15 #include <unistd.h>
16 
17 namespace OHOS::Camera {
SourceNode(const std::string & name,const std::string & type,const std::string & cameraId)18 SourceNode::SourceNode(const std::string& name, const std::string& type, const std::string &cameraId)
19     : NodeBase(name, type, cameraId)
20 {
21     name_ = name;
22     type_ = type;
23     cameraIds_ = cameraId;
24     CAMERA_LOGV("%{public}s enter, type(%{public}s)\n", name_.c_str(), type_.c_str());
25 }
26 
~SourceNode()27 SourceNode::~SourceNode()
28 {
29     CAMERA_LOGV("%{public}s, source node dtor.", __FUNCTION__);
30 }
31 
Init(const int32_t streamId)32 RetCode SourceNode::Init(const int32_t streamId)
33 {
34     (void)streamId;
35     return RC_OK;
36 }
37 
Start(const int32_t streamId)38 RetCode SourceNode::Start(const int32_t streamId)
39 {
40     CAMERA_LOGI("SourceNode::Start [%{public}d] start", streamId);
41     std::shared_ptr<IPort> port = nullptr;
42     auto outPorts = GetOutPorts();
43     for (auto& p : outPorts) {
44         PortFormat format = {};
45         p->GetFormat(format);
46         if (streamId == format.streamId_) {
47             port = p;
48             break;
49         }
50     }
51     if (port == nullptr) {
52         return RC_ERROR;
53     }
54 
55     {
56         std::lock_guard<std::mutex> l(hndl_);
57         if (handler_.count(streamId) > 0) {
58             CAMERA_LOGI("stream [%{public}d] start again, skip", streamId);
59             return RC_OK;
60         }
61     }
62 
63     SetBufferCallback();
64     std::shared_ptr<PortHandler> ph = std::make_shared<PortHandler>(port, isAdjust_);
65     CHECK_IF_PTR_NULL_RETURN_VALUE(ph, RC_ERROR);
66     ph->setWideAndHigh(wide_, high_);
67     {
68         std::lock_guard<std::mutex> l(hndl_);
69         handler_[streamId] = ph;
70     }
71     RetCode rc = handler_[streamId]->StartCollectBuffers();
72     CHECK_IF_NOT_EQUAL_RETURN_VALUE(rc, RC_OK, RC_ERROR);
73 
74     rc = handler_[streamId]->StartDistributeBuffers();
75     CHECK_IF_NOT_EQUAL_RETURN_VALUE(rc, RC_OK, RC_ERROR);
76 
77     return RC_OK;
78 }
79 
Flush(const int32_t streamId)80 RetCode SourceNode::Flush(const int32_t streamId)
81 {
82     CHECK_IF_NOT_EQUAL_RETURN_VALUE(handler_.count(streamId) > 0, true, RC_ERROR);
83     handler_[streamId]->StopCollectBuffers();
84     return RC_OK;
85 }
86 
Stop(const int32_t streamId)87 RetCode SourceNode::Stop(const int32_t streamId)
88 {
89     CHECK_IF_NOT_EQUAL_RETURN_VALUE(handler_.count(streamId) > 0, true, RC_ERROR);
90     handler_[streamId]->StopDistributeBuffers();
91 
92     {
93         std::lock_guard<std::mutex> l(hndl_);
94         auto it = handler_.find(streamId);
95         if (it != handler_.end()) {
96             handler_.erase(it);
97         }
98     }
99     return RC_OK;
100 }
101 
Config(const int32_t streamId,const CaptureMeta & meta)102 RetCode SourceNode::Config(const int32_t streamId, const CaptureMeta& meta)
103 {
104     CHECK_IF_NOT_EQUAL_RETURN_VALUE(handler_.count(streamId) > 0, true, RC_ERROR);
105     (void)meta;
106     return RC_OK;
107 }
108 
DeliverBuffer(std::shared_ptr<IBuffer> & buffer)109 void SourceNode::DeliverBuffer(std::shared_ptr<IBuffer>& buffer)
110 {
111     CHECK_IF_PTR_NULL_RETURN_VOID(buffer);
112     int32_t id = buffer->GetStreamId();
113     {
114         std::lock_guard<std::mutex> l(requestLock_);
115         CAMERA_LOGV("deliver a buffer to stream id:%{public}d", id);
116         if (captureRequests_.count(id) == 0) {
117             CAMERA_LOGV("queue size: 0");
118             buffer->SetBufferStatus(CAMERA_BUFFER_STATUS_INVALID);
119         } else if (captureRequests_[id].empty()) {
120             buffer->SetBufferStatus(CAMERA_BUFFER_STATUS_INVALID);
121         } else {
122             CAMERA_LOGV("queue size:%{public}u", captureRequests_[id].size());
123             buffer->SetCaptureId(captureRequests_[id].front());
124             captureRequests_[id].pop_front();
125         }
126     }
127     buffer->SetIsValidDataInSurfaceBuffer(false);
128     NodeBase::DeliverBuffer(buffer);
129 }
130 
OnPackBuffer(std::shared_ptr<FrameSpec> frameSpec)131 void SourceNode::OnPackBuffer(std::shared_ptr<FrameSpec> frameSpec)
132 {
133     CAMERA_LOGI("SourceNode::OnPackBuffer enter");
134 
135     CHECK_IF_PTR_NULL_RETURN_VOID(frameSpec);
136     auto buffer = frameSpec->buffer_;
137     CHECK_IF_PTR_NULL_RETURN_VOID(buffer);
138     handler_[buffer->GetStreamId()]->OnBuffer(buffer);
139 
140     CAMERA_LOGI("SourceNode::OnPackBuffer exit");
141     return;
142 }
143 
SetBufferCallback()144 void SourceNode::SetBufferCallback()
145 {
146     return;
147 }
148 
ProvideBuffers(std::shared_ptr<FrameSpec> frameSpec)149 RetCode SourceNode::ProvideBuffers(std::shared_ptr<FrameSpec> frameSpec)
150 {
151     (void)frameSpec;
152     return RC_OK;
153 }
154 
Capture(const int32_t streamId,const int32_t captureId)155 RetCode SourceNode::Capture(const int32_t streamId, const int32_t captureId)
156 {
157     std::lock_guard<std::mutex> l(requestLock_);
158     if (captureRequests_.count(streamId) == 0) {
159         captureRequests_[streamId] = {captureId};
160     } else {
161         captureRequests_[streamId].emplace_back(captureId);
162     }
163     CAMERA_LOGV("received a request from stream [id:%{public}d], queue size:%{public}u",
164         streamId, captureRequests_[streamId].size());
165     return RC_OK;
166 }
167 
CancelCapture(const int32_t streamId)168 RetCode SourceNode::CancelCapture(const int32_t streamId)
169 {
170     (void)streamId;
171     return RC_OK;
172 }
173 
PortHandler(std::shared_ptr<IPort> & p,bool isResize)174 SourceNode::PortHandler::PortHandler(std::shared_ptr<IPort>& p, bool isResize) : port(p), isResize_(isResize)
175 {
176 }
177 
~PortHandler()178 SourceNode::PortHandler::~PortHandler()
179 {
180     CAMERA_LOGV("%{public}s, source node port handler dtor.", __FUNCTION__);
181     CollectorJoin();
182     DistributorJoin();
183 }
184 
StartCollectBuffers()185 RetCode SourceNode::PortHandler::StartCollectBuffers()
186 {
187     CHECK_IF_PTR_NULL_RETURN_VALUE(port, RC_ERROR);
188     PortFormat format = {};
189     port->GetFormat(format);
190     uint32_t streamId = format.streamId_;
191 
192     pool = BufferManager::GetInstance()->GetBufferPool(format.bufferPoolId_);
193     CHECK_IF_PTR_NULL_RETURN_VALUE(pool, RC_ERROR);
194     pool->NotifyStart();
195     CAMERA_LOGI("SourceNode::PortHandler::StartCollectBuffers");
196 
197     {
198         std::unique_lock<std::mutex> l(cltLock);
199         cltRun = true;
200     }
201 
202     collector = std::make_unique<std::thread>([this, &streamId] {
203         std::string name = "collect#" + std::to_string(streamId);
204         prctl(PR_SET_NAME, name.c_str());
205         CAMERA_LOGI("StartCollectBuffers thread start, name = %{public}s", name.c_str());
206         while (true) {
207             {
208                 std::unique_lock<std::mutex> l(cltLock);
209                 if (cltRun == false) {
210                     CAMERA_LOGD("collect buffer thread break");
211                     break;
212                 }
213             }
214             CollectBuffers();
215         }
216         CAMERA_LOGI("StartCollectBuffers thread end, name = %{public}s", name.c_str());
217     });
218 
219     return RC_OK;
220 }
221 
CollectorJoin()222 RetCode SourceNode::PortHandler::CollectorJoin()
223 {
224     CHECK_IF_PTR_NULL_RETURN_VALUE(pool, RC_ERROR);
225     CAMERA_LOGI("SourceNode::PortHandler::CollectorJoin enter");
226     {
227         std::unique_lock<std::mutex> l(cltLock);
228         cltRun = false;
229     }
230     pool->NotifyStop();
231     if (collector != nullptr) {
232         collector->join();
233         collector.reset(nullptr);
234     }
235     CAMERA_LOGI("SourceNode::PortHandler::collector::join exit");
236     return RC_OK;
237 }
238 
StopCollectBuffers()239 RetCode SourceNode::PortHandler::StopCollectBuffers()
240 {
241     RetCode rc = CollectorJoin();
242     CHECK_IF_NOT_EQUAL_RETURN_VALUE(rc, RC_OK, RC_ERROR);
243 
244     auto node = port->GetNode();
245     if (node != nullptr) {
246         uint32_t n = pool->GetIdleBufferCount();
247         for (uint32_t i = 0; i < n; i++) {
248             auto buffer = pool->AcquireBuffer(-1);
249             node->DeliverBuffer(buffer);
250         }
251     }
252     CAMERA_LOGI("SourceNode::PortHandler::StopCollectBuffers exit");
253     return RC_OK;
254 }
255 
CollectBuffers()256 void SourceNode::PortHandler::CollectBuffers()
257 {
258     CAMERA_LOGV("SourceNode::PortHandler::CollectBuffers");
259     CHECK_IF_PTR_NULL_RETURN_VOID(pool);
260     std::shared_ptr<IBuffer> buffer = pool->AcquireBuffer(-1);
261     CHECK_IF_PTR_NULL_RETURN_VOID(buffer);
262 
263     PortFormat format = {};
264     port->GetFormat(format);
265     std::shared_ptr<FrameSpec> frameSpec = std::make_shared<FrameSpec>();
266     frameSpec->bufferPoolId_ = format.bufferPoolId_;
267     frameSpec->bufferCount_ = format.bufferCount_;
268     constexpr uint32_t NewBufferBytePrePiex = 4;
269     uint32_t bufferSize = maxWide_ * maxHigh_ * NewBufferBytePrePiex;
270     CAMERA_LOGI("streamId[%{public}d], bufferIndex[%{public}d], Size %{public}d => %{public}d",
271                 buffer->GetStreamId(), buffer->GetIndex(), buffer->GetSize(), bufferSize);
272 
273     if (buffer->GetVirAddress() == buffer->GetSuffaceBufferAddr()) {
274         CAMERA_LOGI("CollectBuffers begin malloc buffer");
275         auto bufferAddr = malloc(bufferSize);
276         if (bufferAddr != nullptr) {
277             buffer->SetVirAddress(bufferAddr);
278             buffer->SetSize(bufferSize);
279         } else {
280             CAMERA_LOGE("CollectBuffers malloc buffer fail");
281         }
282     }
283     frameSpec->buffer_ = buffer;
284     auto node = port->GetNode();
285     CHECK_IF_PTR_NULL_RETURN_VOID(node);
286     RetCode rc = node->ProvideBuffers(frameSpec);
287     if (rc == RC_ERROR) {
288         CAMERA_LOGE("provide buffer failed.");
289     }
290 }
291 
StartDistributeBuffers()292 RetCode SourceNode::PortHandler::StartDistributeBuffers()
293 {
294     {
295         std::unique_lock<std::mutex> l(rblock);
296         dbtRun = true;
297     }
298 
299     distributor = std::make_unique<std::thread>([this] {
300         PortFormat format = {};
301         port->GetFormat(format);
302         int id = format.streamId_;
303         std::string name = "distribute#" + std::to_string(id);
304         prctl(PR_SET_NAME, name.c_str());
305         CAMERA_LOGI("StartDistributeBuffers thread start, name = %{public}s", name.c_str());
306 
307         while (true) {
308             {
309                 std::unique_lock<std::mutex> l(rblock);
310                 if (dbtRun == false) {
311                     CAMERA_LOGD("distribute buffers thread break");
312                     break;
313                 }
314             }
315             DistributeBuffers();
316         }
317         CAMERA_LOGI("StartDistributeBuffers thread end, name = %{public}s", name.c_str());
318     });
319 
320     return RC_OK;
321 }
322 
DistributorJoin()323 RetCode SourceNode::PortHandler::DistributorJoin()
324 {
325     CAMERA_LOGV("SourceNode::PortHandler::DistributorJoin enter");
326     {
327         std::unique_lock<std::mutex> l(rblock);
328         dbtRun = false;
329         rbcv.notify_one();
330     }
331     if (distributor != nullptr) {
332         distributor->join();
333         distributor.reset(nullptr);
334     }
335     CAMERA_LOGV("SourceNode::PortHandler::DistributorJoin exit");
336     return RC_OK;
337 }
338 
StopDistributeBuffers()339 RetCode SourceNode::PortHandler::StopDistributeBuffers()
340 {
341     RetCode rc = DistributorJoin();
342     CHECK_IF_NOT_EQUAL_RETURN_VALUE(rc, RC_OK, RC_ERROR);
343     FlushBuffers(); // flush buffers after stopping distributor
344     if (isResize_ == true) {
345         for (auto iter : cBuffer) {
346             free(iter.second);
347         }
348         cBuffer.clear();
349     }
350     CAMERA_LOGV("SourceNode::PortHandler::StopDistributeBuffers exit");
351     return RC_OK;
352 }
353 
DistributeBuffers()354 void SourceNode::PortHandler::DistributeBuffers()
355 {
356     std::shared_ptr<IBuffer> buffer = nullptr;
357     {
358         std::unique_lock<std::mutex> l(rblock);
359         auto timeout = std::chrono::system_clock::now() + std::chrono::milliseconds(500); // 500ms
360         if (!rbcv.wait_until(l, timeout, [this] {
361             return (!dbtRun || !respondBufferList.empty());
362             })) {
363             CAMERA_LOGE("DistributeBuffers timeout, dbtRun=%{public}d, respondBufferList size=%{public}d",
364                 dbtRun.load(std::memory_order_acquire), respondBufferList.size());
365         }
366 
367         if (!dbtRun || respondBufferList.empty()) {
368             loopErrorCount_++;
369             if (loopErrorCount_ == LOOP_MAX_COUNT) {
370                 CAMERA_LOGE("DistributeBuffers timeout 5s update dbtRun = false");
371                 dbtRun = false;
372             }
373             return;
374         }
375         loopErrorCount_ = 0;
376         buffer = respondBufferList.front();
377         respondBufferList.pop_front();
378     }
379 
380     auto node = port->GetNode();
381     CHECK_IF_PTR_NULL_RETURN_VOID(node);
382     CAMERA_LOGE("DistributeBuffers Loop, start deliverBuffer, streamId = %{public}d", buffer->GetStreamId());
383     node->DeliverBuffer(buffer);
384 
385     return;
386 }
387 
OnBuffer(std::shared_ptr<IBuffer> & buffer)388 void SourceNode::PortHandler::OnBuffer(std::shared_ptr<IBuffer>& buffer)
389 {
390     CAMERA_LOGV("SourceNode::PortHandler::OnBuffer enter");
391     {
392         std::unique_lock<std::mutex> l(rblock);
393         respondBufferList.emplace_back(buffer);
394         rbcv.notify_one();
395     }
396 
397     CAMERA_LOGV("SourceNode::PortHandler::OnBuffer exit");
398 
399     return;
400 }
401 
FlushBuffers()402 void SourceNode::PortHandler::FlushBuffers()
403 {
404     CAMERA_LOGV("SourceNode::PortHandler::FlushBuffers enter");
405     if (respondBufferList.empty()) {
406         CAMERA_LOGV("SourceNode::PortHandler::FlushBuffers respondBufferList is empty");
407         return;
408     }
409 
410     auto node = port->GetNode();
411     CHECK_IF_PTR_NULL_RETURN_VOID(node);
412     std::unique_lock<std::mutex> l(rblock);
413     while (!respondBufferList.empty()) {
414         auto buffer = respondBufferList.front();
415         node->DeliverBuffer(buffer);
416         respondBufferList.pop_front();
417     }
418     CAMERA_LOGV("SourceNode::PortHandler::FlushBuffers exit");
419 
420     return;
421 }
422 
setWideAndHigh(int32_t wide,int32_t high)423 void SourceNode::PortHandler::setWideAndHigh(int32_t wide, int32_t high)
424 {
425     maxWide_ = wide;
426     maxHigh_ = high;
427 }
428 
429 REGISTERNODE(SourceNode, {"source"})
430 } // namespace OHOS::Camera
431