1 /*
2 * Copyright (c) 2023-2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #ifdef RECORDER_SUPPORT
16
17 #define HST_LOG_TAG "MuxerFilter"
18
19 #include "pipeline/filters/muxer/muxer_filter.h"
20 #include "foundation/log.h"
21 #include "pipeline/factory/filter_factory.h"
22 #include "pipeline/filters/common/plugin_settings.h"
23 #include "pipeline/filters/common/plugin_utils.h"
24 #include "pipeline/filters/muxer/data_spliter.h"
25 #include "plugin/common/plugin_attr_desc.h"
26
27 namespace OHOS {
28 namespace Media {
29 namespace Pipeline {
30 namespace {
Intersections(const std::vector<std::shared_ptr<Plugin::PluginInfo>> & caps1,const std::vector<std::pair<std::shared_ptr<Plugin::PluginInfo>,Plugin::Capability>> & caps2)31 std::vector<std::shared_ptr<Plugin::PluginInfo>> Intersections(
32 const std::vector<std::shared_ptr<Plugin::PluginInfo>>& caps1,
33 const std::vector<std::pair<std::shared_ptr<Plugin::PluginInfo>, Plugin::Capability>>& caps2)
34 {
35 std::vector<std::shared_ptr<Plugin::PluginInfo>> intersections;
36 for (const auto& cap1 : caps1) {
37 for (const auto& cap2 : caps2) {
38 if (cap1->name == cap2.first->name) {
39 intersections.emplace_back(cap1);
40 }
41 }
42 }
43 return intersections;
44 }
45 }
46 static AutoRegisterFilter<MuxerFilter> g_registerFilterHelper("builtin.recorder.muxer");
47
MuxerFilter(std::string name)48 MuxerFilter::MuxerFilter(std::string name) : FilterBase(std::move(name)),
49 muxerDataSink_(std::make_shared<MuxerDataSink>())
50 {
51 filterType_ = FilterType::MUXER;
52 }
53
~MuxerFilter()54 MuxerFilter::~MuxerFilter() {}
Init(EventReceiver * receiver,FilterCallback * callback)55 void MuxerFilter::Init(EventReceiver* receiver, FilterCallback* callback)
56 {
57 this->eventReceiver_ = receiver;
58 this->callback_ = callback;
59 inPorts_.clear();
60 outPorts_.clear();
61 outPorts_.emplace_back(std::make_shared<Pipeline::OutPort>(this, PORT_NAME_DEFAULT));
62 muxerDataSink_->muxerFilter_ = this;
63 state_ = FilterState::INITIALIZED;
64 }
UpdateAndInitPluginByInfo(const std::shared_ptr<Plugin::PluginInfo> & selectedPluginInfo)65 bool MuxerFilter::UpdateAndInitPluginByInfo(const std::shared_ptr<Plugin::PluginInfo>& selectedPluginInfo)
66 {
67 if (selectedPluginInfo == nullptr) {
68 MEDIA_LOG_W("no available info to update plugin");
69 return false;
70 }
71 if (plugin_ != nullptr) {
72 if (targetPluginInfo_ != nullptr && targetPluginInfo_->name == selectedPluginInfo->name) {
73 if (plugin_->Reset() == Plugin::Status::OK) {
74 return true;
75 }
76 MEDIA_LOG_W("reuse previous plugin " PUBLIC_LOG_S " failed, will create new plugin",
77 targetPluginInfo_->name.c_str());
78 }
79 plugin_->Deinit();
80 }
81
82 plugin_ = Plugin::PluginManager::Instance().CreateMuxerPlugin(selectedPluginInfo->name);
83 if (plugin_ == nullptr) {
84 MEDIA_LOG_E("cannot create plugin " PUBLIC_LOG_S, selectedPluginInfo->name.c_str());
85 return false;
86 }
87 auto err = TranslatePluginStatus(plugin_->Init());
88 if (err != ErrorCode::SUCCESS) {
89 MEDIA_LOG_E("muxer plugin init error");
90 return false;
91 }
92 plugin_->SetCallback(this);
93 targetPluginInfo_ = selectedPluginInfo;
94 return true;
95 }
96
Negotiate(const std::string & inPort,const std::shared_ptr<const Plugin::Capability> & upstreamCap,Plugin::Capability & negotiatedCap,const Plugin::Meta & upstreamParams,Plugin::Meta & downstreamParams)97 bool MuxerFilter::Negotiate(const std::string& inPort,
98 const std::shared_ptr<const Plugin::Capability>& upstreamCap,
99 Plugin::Capability& negotiatedCap,
100 const Plugin::Meta& upstreamParams,
101 Plugin::Meta& downstreamParams)
102 {
103 if (state_ != FilterState::PREPARING) {
104 MEDIA_LOG_W("muxer filter is not in preparing when negotiate");
105 return false;
106 }
107 hasWriteHeader_ = false;
108 capabilityCache_.emplace_back(std::make_pair(inPort, *upstreamCap));
109 if (capabilityCache_.size() < inPorts_.size()) {
110 return true;
111 }
112 MEDIA_LOG_I("all track caps has been received, start negotiating downstream");
113 auto candidate = FindAvailablePluginsByOutputMime(containerMime_, Plugin::PluginType::MUXER);
114 for (const auto& cache: capabilityCache_) {
115 auto tmp = FindAvailablePlugins(cache.second, Plugin::PluginType::MUXER);
116 candidate = Intersections(candidate, tmp);
117 if (candidate.empty()) {
118 break;
119 }
120 }
121 if (candidate.empty()) {
122 MEDIA_LOG_E("cannot find any available plugins");
123 return false;
124 }
125 auto muxerCap = std::make_shared<Capability>(containerMime_);
126 Capability downCap;
127 if (!outPorts_[0]->Negotiate(muxerCap, downCap, upstreamParams, downstreamParams)) {
128 MEDIA_LOG_E("downstream of muxer filter negotiate failed");
129 return false;
130 }
131 // always use the first candidate plugin info
132 return UpdateAndInitPluginByInfo(candidate[0]);
133 }
AddTrackThenConfigure(const std::pair<std::string,Plugin::Meta> & metaPair)134 ErrorCode MuxerFilter::AddTrackThenConfigure(const std::pair<std::string, Plugin::Meta>& metaPair)
135 {
136 uint32_t trackId = 0;
137 ErrorCode isTranSuccess = TranslatePluginStatus(plugin_->AddTrack(trackId));
138 if (isTranSuccess != ErrorCode::SUCCESS) {
139 MEDIA_LOG_E("muxer plugin add track failed");
140 return isTranSuccess;
141 }
142 trackInfos_.emplace_back(TrackInfo{static_cast<int32_t>(trackId), metaPair.first, false});
143 auto parameterMap = PluginParameterTable::GetInstance().FindAllowedParameterMap(filterType_);
144 for (const auto& keyPair : parameterMap) {
145 Plugin::ValueType outValue;
146 auto isGetSuccess = metaPair.second.GetData(static_cast<Plugin::Tag>(keyPair.first), outValue);
147 if (isGetSuccess &&
148 (keyPair.second.second & PARAM_SET) &&
149 keyPair.second.first(keyPair.first, outValue)) {
150 plugin_->SetTrackParameter(trackId, keyPair.first, outValue);
151 } else {
152 if (!Plugin::HasTagInfo(keyPair.first)) {
153 MEDIA_LOG_W("tag " PUBLIC_LOG_D32 " is not in map, may be update it?", keyPair.first);
154 } else {
155 MEDIA_LOG_W("parameter " PUBLIC_LOG_S " in meta is not found or type mismatch",
156 Plugin::GetTagStrName(keyPair.first));
157 }
158 }
159 }
160 return ErrorCode::SUCCESS;
161 }
162
ConfigureToStart()163 ErrorCode MuxerFilter::ConfigureToStart()
164 {
165 ErrorCode ret;
166 for (const auto& cache: metaCache_) {
167 ret = AddTrackThenConfigure(cache);
168 if (ret != ErrorCode::SUCCESS) {
169 MEDIA_LOG_E("add and configure for track from inPort " PUBLIC_LOG_S " failed", cache.first.c_str());
170 return ret;
171 }
172 }
173 // todo add other global meta
174
175 ret = TranslatePluginStatus(plugin_->Prepare());
176 if (ret != ErrorCode::SUCCESS) {
177 MEDIA_LOG_E("muxer plugin prepare failed");
178 return ret;
179 }
180 ret = TranslatePluginStatus(plugin_->Start());
181 if (ret != ErrorCode::SUCCESS) {
182 MEDIA_LOG_E("muxer plugin start failed");
183 }
184 return ret;
185 }
Configure(const std::string & inPort,const std::shared_ptr<const Plugin::Meta> & upstreamMeta,Plugin::Meta & upstreamParams,Plugin::Meta & downstreamParams)186 bool MuxerFilter::Configure(const std::string& inPort, const std::shared_ptr<const Plugin::Meta>& upstreamMeta,
187 Plugin::Meta& upstreamParams, Plugin::Meta& downstreamParams)
188 {
189 std::string tmp;
190 if (!upstreamMeta->Get<Plugin::Tag::MIME>(tmp)) {
191 MEDIA_LOG_E("stream meta must contain mime, which is not found in current stream from port " PUBLIC_LOG_S,
192 inPort.c_str());
193 return false;
194 }
195 metaCache_.emplace_back(std::make_pair(inPort, *upstreamMeta));
196 if (metaCache_.size() < inPorts_.size()) {
197 return true;
198 }
199 if (plugin_ == nullptr) {
200 MEDIA_LOG_E("cannot configure when no plugin available");
201 return false;
202 }
203
204 auto meta = std::make_shared<Plugin::Meta>();
205 FALSE_LOG(meta->Set<Plugin::Tag::MIME>(containerMime_));
206 if (!outPorts_[0]->Configure(meta, upstreamParams, downstreamParams)) {
207 MEDIA_LOG_E("downstream of muxer filter configure failed");
208 return false;
209 }
210 plugin_->SetDataSink(muxerDataSink_);
211 auto ret = ConfigureToStart();
212 if (ret != ErrorCode::SUCCESS) {
213 MEDIA_LOG_E("muxer filter configure and start error");
214 OnEvent({name_, EventType::EVENT_ERROR, ret});
215 return false;
216 }
217 state_ = FilterState::READY;
218 OnEvent({name_, EventType::EVENT_READY});
219 MEDIA_LOG_I("muxer send EVENT_READY");
220 return true;
221 }
222
SetOutputFormat(std::string containerMime)223 ErrorCode MuxerFilter::SetOutputFormat(std::string containerMime)
224 {
225 containerMime_ = std::move(containerMime);
226 return ErrorCode::SUCCESS;
227 }
228
AddTrack(std::shared_ptr<InPort> & trackPort)229 ErrorCode MuxerFilter::AddTrack(std::shared_ptr<InPort> &trackPort)
230 {
231 if (state_ != FilterState::INITIALIZED) {
232 return ErrorCode::ERROR_INVALID_OPERATION;
233 }
234 trackPort = std::make_shared<InPort>(this, std::string(PORT_NAME_DEFAULT) + std::to_string(inPorts_.size()));
235 inPorts_.emplace_back(trackPort);
236 return ErrorCode::SUCCESS;
237 }
238
SetMaxDuration(uint64_t maxDuration)239 ErrorCode MuxerFilter::SetMaxDuration(uint64_t maxDuration)
240 {
241 return ErrorCode::SUCCESS;
242 }
243
SetMaxSize(uint64_t maxSize)244 ErrorCode MuxerFilter::SetMaxSize(uint64_t maxSize)
245 {
246 return ErrorCode::SUCCESS;
247 }
248
StartNextSegment()249 ErrorCode MuxerFilter::StartNextSegment()
250 {
251 return ErrorCode::SUCCESS;
252 }
253
SendEos()254 ErrorCode MuxerFilter::SendEos()
255 {
256 OSAL::ScopedLock lock(pushDataMutex_);
257 MEDIA_LOG_I("SendEos entered.");
258 eos_ = true;
259 if (hasWriteHeader_ && plugin_) {
260 plugin_->WriteTrailer();
261 }
262 hasWriteHeader_ = false;
263 auto buf = std::make_shared<AVBuffer>();
264 buf->flag |= BUFFER_FLAG_EOS;
265 outPorts_[0]->PushData(buf, -1);
266 metaCache_.clear();
267 return ErrorCode::SUCCESS;
268 }
269
AllTracksEos()270 bool MuxerFilter::AllTracksEos()
271 {
272 return static_cast<size_t>(eosTrackCnt.load()) == trackInfos_.size();
273 }
UpdateEosState(const std::string & inPort)274 void MuxerFilter::UpdateEosState(const std::string& inPort)
275 {
276 int32_t eosCnt = 0;
277 for (auto& item : trackInfos_) {
278 if (item.inPort == inPort) {
279 item.eos = true;
280 }
281 if (item.eos) {
282 eosCnt++;
283 }
284 }
285 eosTrackCnt = eosCnt;
286 }
287
PushData(const std::string & inPort,const AVBufferPtr & buffer,int64_t offset)288 ErrorCode MuxerFilter::PushData(const std::string& inPort, const AVBufferPtr& buffer, int64_t offset)
289 {
290 {
291 OSAL::ScopedLock lock(pushDataMutex_);
292 if (state_ != FilterState::READY && state_ != FilterState::PAUSED && state_ != FilterState::RUNNING) {
293 MEDIA_LOG_W("pushing data to muxer when state is " PUBLIC_LOG_D32, static_cast<int>(state_.load()));
294 return ErrorCode::ERROR_INVALID_OPERATION;
295 }
296 if (eos_) {
297 MEDIA_LOG_D("SendEos exit");
298 return ErrorCode::SUCCESS;
299 }
300 // todo we should consider more tracks
301 if (!hasWriteHeader_) {
302 plugin_->WriteHeader();
303 hasWriteHeader_ = true;
304 }
305 if (buffer->GetMemory()->GetSize() != 0) {
306 plugin_->WriteFrame(buffer);
307 }
308
309 if (buffer->flag & BUFFER_FLAG_EOS) {
310 MEDIA_LOG_I("It is EOS buffer");
311 UpdateEosState(inPort);
312 }
313 }
314 if (AllTracksEos()) {
315 SendEos();
316 }
317 return ErrorCode::SUCCESS;
318 }
319
WriteAt(int64_t offset,const std::shared_ptr<Plugin::Buffer> & buffer)320 Plugin::Status MuxerFilter::MuxerDataSink::WriteAt(int64_t offset, const std::shared_ptr<Plugin::Buffer> &buffer)
321 {
322 if (muxerFilter_ != nullptr) {
323 muxerFilter_->outPorts_[0]->PushData(buffer, offset);
324 }
325 return Plugin::Status::OK;
326 }
327
Start()328 ErrorCode MuxerFilter::Start()
329 {
330 eos_ = false;
331 return FilterBase::Start();
332 }
333 } // Pipeline
334 } // Media
335 } // OHOS
336 #endif // RECORDER_SUPPORT