1 /*
2  * Copyright (c) 2023-2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #ifdef RECORDER_SUPPORT
16 
17 #define HST_LOG_TAG "MuxerFilter"
18 
19 #include "pipeline/filters/muxer/muxer_filter.h"
20 #include "foundation/log.h"
21 #include "pipeline/factory/filter_factory.h"
22 #include "pipeline/filters/common/plugin_settings.h"
23 #include "pipeline/filters/common/plugin_utils.h"
24 #include "pipeline/filters/muxer/data_spliter.h"
25 #include "plugin/common/plugin_attr_desc.h"
26 
27 namespace OHOS {
28 namespace Media {
29 namespace Pipeline {
30 namespace {
Intersections(const std::vector<std::shared_ptr<Plugin::PluginInfo>> & caps1,const std::vector<std::pair<std::shared_ptr<Plugin::PluginInfo>,Plugin::Capability>> & caps2)31 std::vector<std::shared_ptr<Plugin::PluginInfo>> Intersections(
32     const std::vector<std::shared_ptr<Plugin::PluginInfo>>& caps1,
33     const std::vector<std::pair<std::shared_ptr<Plugin::PluginInfo>, Plugin::Capability>>& caps2)
34 {
35     std::vector<std::shared_ptr<Plugin::PluginInfo>> intersections;
36     for (const auto& cap1 : caps1) {
37         for (const auto& cap2 : caps2) {
38             if (cap1->name == cap2.first->name) {
39                 intersections.emplace_back(cap1);
40             }
41         }
42     }
43     return intersections;
44 }
45 }
46 static AutoRegisterFilter<MuxerFilter> g_registerFilterHelper("builtin.recorder.muxer");
47 
MuxerFilter(std::string name)48 MuxerFilter::MuxerFilter(std::string name) : FilterBase(std::move(name)),
49     muxerDataSink_(std::make_shared<MuxerDataSink>())
50 {
51     filterType_ = FilterType::MUXER;
52 }
53 
~MuxerFilter()54 MuxerFilter::~MuxerFilter() {}
Init(EventReceiver * receiver,FilterCallback * callback)55 void MuxerFilter::Init(EventReceiver* receiver, FilterCallback* callback)
56 {
57     this->eventReceiver_ = receiver;
58     this->callback_ = callback;
59     inPorts_.clear();
60     outPorts_.clear();
61     outPorts_.emplace_back(std::make_shared<Pipeline::OutPort>(this, PORT_NAME_DEFAULT));
62     muxerDataSink_->muxerFilter_ = this;
63     state_ = FilterState::INITIALIZED;
64 }
UpdateAndInitPluginByInfo(const std::shared_ptr<Plugin::PluginInfo> & selectedPluginInfo)65 bool MuxerFilter::UpdateAndInitPluginByInfo(const std::shared_ptr<Plugin::PluginInfo>& selectedPluginInfo)
66 {
67     if (selectedPluginInfo == nullptr) {
68         MEDIA_LOG_W("no available info to update plugin");
69         return false;
70     }
71     if (plugin_ != nullptr) {
72         if (targetPluginInfo_ != nullptr && targetPluginInfo_->name == selectedPluginInfo->name) {
73             if (plugin_->Reset() == Plugin::Status::OK) {
74                 return true;
75             }
76             MEDIA_LOG_W("reuse previous plugin " PUBLIC_LOG_S " failed, will create new plugin",
77                         targetPluginInfo_->name.c_str());
78         }
79         plugin_->Deinit();
80     }
81 
82     plugin_ = Plugin::PluginManager::Instance().CreateMuxerPlugin(selectedPluginInfo->name);
83     if (plugin_ == nullptr) {
84         MEDIA_LOG_E("cannot create plugin " PUBLIC_LOG_S, selectedPluginInfo->name.c_str());
85         return false;
86     }
87     auto err = TranslatePluginStatus(plugin_->Init());
88     if (err != ErrorCode::SUCCESS) {
89         MEDIA_LOG_E("muxer plugin init error");
90         return false;
91     }
92     plugin_->SetCallback(this);
93     targetPluginInfo_ = selectedPluginInfo;
94     return true;
95 }
96 
Negotiate(const std::string & inPort,const std::shared_ptr<const Plugin::Capability> & upstreamCap,Plugin::Capability & negotiatedCap,const Plugin::Meta & upstreamParams,Plugin::Meta & downstreamParams)97 bool MuxerFilter::Negotiate(const std::string& inPort,
98                             const std::shared_ptr<const Plugin::Capability>& upstreamCap,
99                             Plugin::Capability& negotiatedCap,
100                             const Plugin::Meta& upstreamParams,
101                             Plugin::Meta& downstreamParams)
102 {
103     if (state_ != FilterState::PREPARING) {
104         MEDIA_LOG_W("muxer filter is not in preparing when negotiate");
105         return false;
106     }
107     hasWriteHeader_ = false;
108     capabilityCache_.emplace_back(std::make_pair(inPort, *upstreamCap));
109     if (capabilityCache_.size() < inPorts_.size()) {
110         return true;
111     }
112     MEDIA_LOG_I("all track caps has been received, start negotiating downstream");
113     auto candidate = FindAvailablePluginsByOutputMime(containerMime_, Plugin::PluginType::MUXER);
114     for (const auto& cache: capabilityCache_) {
115         auto tmp = FindAvailablePlugins(cache.second, Plugin::PluginType::MUXER);
116         candidate = Intersections(candidate, tmp);
117         if (candidate.empty()) {
118             break;
119         }
120     }
121     if (candidate.empty()) {
122         MEDIA_LOG_E("cannot find any available plugins");
123         return false;
124     }
125     auto muxerCap = std::make_shared<Capability>(containerMime_);
126     Capability downCap;
127     if (!outPorts_[0]->Negotiate(muxerCap, downCap, upstreamParams, downstreamParams)) {
128         MEDIA_LOG_E("downstream of muxer filter negotiate failed");
129         return false;
130     }
131     // always use the first candidate plugin info
132     return UpdateAndInitPluginByInfo(candidate[0]);
133 }
AddTrackThenConfigure(const std::pair<std::string,Plugin::Meta> & metaPair)134 ErrorCode MuxerFilter::AddTrackThenConfigure(const std::pair<std::string, Plugin::Meta>& metaPair)
135 {
136     uint32_t trackId = 0;
137     ErrorCode isTranSuccess = TranslatePluginStatus(plugin_->AddTrack(trackId));
138     if (isTranSuccess != ErrorCode::SUCCESS) {
139         MEDIA_LOG_E("muxer plugin add track failed");
140         return isTranSuccess;
141     }
142     trackInfos_.emplace_back(TrackInfo{static_cast<int32_t>(trackId), metaPair.first, false});
143     auto parameterMap = PluginParameterTable::GetInstance().FindAllowedParameterMap(filterType_);
144     for (const auto& keyPair : parameterMap) {
145         Plugin::ValueType outValue;
146         auto isGetSuccess = metaPair.second.GetData(static_cast<Plugin::Tag>(keyPair.first), outValue);
147         if (isGetSuccess &&
148             (keyPair.second.second & PARAM_SET) &&
149             keyPair.second.first(keyPair.first, outValue)) {
150             plugin_->SetTrackParameter(trackId, keyPair.first, outValue);
151         } else {
152             if (!Plugin::HasTagInfo(keyPair.first)) {
153                 MEDIA_LOG_W("tag " PUBLIC_LOG_D32 " is not in map, may be update it?", keyPair.first);
154             } else {
155                 MEDIA_LOG_W("parameter " PUBLIC_LOG_S " in meta is not found or type mismatch",
156                     Plugin::GetTagStrName(keyPair.first));
157             }
158         }
159     }
160     return ErrorCode::SUCCESS;
161 }
162 
ConfigureToStart()163 ErrorCode MuxerFilter::ConfigureToStart()
164 {
165     ErrorCode ret;
166     for (const auto& cache: metaCache_) {
167         ret = AddTrackThenConfigure(cache);
168         if (ret != ErrorCode::SUCCESS) {
169             MEDIA_LOG_E("add and configure for track from inPort " PUBLIC_LOG_S " failed", cache.first.c_str());
170             return ret;
171         }
172     }
173     // todo add other global meta
174 
175     ret = TranslatePluginStatus(plugin_->Prepare());
176     if (ret != ErrorCode::SUCCESS) {
177         MEDIA_LOG_E("muxer plugin prepare failed");
178         return ret;
179     }
180     ret = TranslatePluginStatus(plugin_->Start());
181     if (ret != ErrorCode::SUCCESS) {
182         MEDIA_LOG_E("muxer plugin start failed");
183     }
184     return ret;
185 }
Configure(const std::string & inPort,const std::shared_ptr<const Plugin::Meta> & upstreamMeta,Plugin::Meta & upstreamParams,Plugin::Meta & downstreamParams)186 bool MuxerFilter::Configure(const std::string& inPort, const std::shared_ptr<const Plugin::Meta>& upstreamMeta,
187                             Plugin::Meta& upstreamParams, Plugin::Meta& downstreamParams)
188 {
189     std::string tmp;
190     if (!upstreamMeta->Get<Plugin::Tag::MIME>(tmp))  {
191         MEDIA_LOG_E("stream meta must contain mime, which is not found in current stream from port " PUBLIC_LOG_S,
192                     inPort.c_str());
193         return false;
194     }
195     metaCache_.emplace_back(std::make_pair(inPort, *upstreamMeta));
196     if (metaCache_.size() < inPorts_.size()) {
197         return true;
198     }
199     if (plugin_ == nullptr) {
200         MEDIA_LOG_E("cannot configure when no plugin available");
201         return false;
202     }
203 
204     auto meta = std::make_shared<Plugin::Meta>();
205     FALSE_LOG(meta->Set<Plugin::Tag::MIME>(containerMime_));
206     if (!outPorts_[0]->Configure(meta, upstreamParams, downstreamParams)) {
207         MEDIA_LOG_E("downstream of muxer filter configure failed");
208         return false;
209     }
210     plugin_->SetDataSink(muxerDataSink_);
211     auto ret = ConfigureToStart();
212     if (ret != ErrorCode::SUCCESS) {
213         MEDIA_LOG_E("muxer filter configure and start error");
214         OnEvent({name_, EventType::EVENT_ERROR, ret});
215         return false;
216     }
217     state_ = FilterState::READY;
218     OnEvent({name_, EventType::EVENT_READY});
219     MEDIA_LOG_I("muxer send EVENT_READY");
220     return true;
221 }
222 
SetOutputFormat(std::string containerMime)223 ErrorCode MuxerFilter::SetOutputFormat(std::string containerMime)
224 {
225     containerMime_ = std::move(containerMime);
226     return ErrorCode::SUCCESS;
227 }
228 
AddTrack(std::shared_ptr<InPort> & trackPort)229 ErrorCode MuxerFilter::AddTrack(std::shared_ptr<InPort> &trackPort)
230 {
231     if (state_ != FilterState::INITIALIZED) {
232         return ErrorCode::ERROR_INVALID_OPERATION;
233     }
234     trackPort = std::make_shared<InPort>(this, std::string(PORT_NAME_DEFAULT) + std::to_string(inPorts_.size()));
235     inPorts_.emplace_back(trackPort);
236     return ErrorCode::SUCCESS;
237 }
238 
SetMaxDuration(uint64_t maxDuration)239 ErrorCode MuxerFilter::SetMaxDuration(uint64_t maxDuration)
240 {
241     return ErrorCode::SUCCESS;
242 }
243 
SetMaxSize(uint64_t maxSize)244 ErrorCode MuxerFilter::SetMaxSize(uint64_t maxSize)
245 {
246     return ErrorCode::SUCCESS;
247 }
248 
StartNextSegment()249 ErrorCode MuxerFilter::StartNextSegment()
250 {
251     return ErrorCode::SUCCESS;
252 }
253 
SendEos()254 ErrorCode MuxerFilter::SendEos()
255 {
256     OSAL::ScopedLock lock(pushDataMutex_);
257     MEDIA_LOG_I("SendEos entered.");
258     eos_ = true;
259     if (hasWriteHeader_ && plugin_) {
260         plugin_->WriteTrailer();
261     }
262     hasWriteHeader_ = false;
263     auto buf = std::make_shared<AVBuffer>();
264     buf->flag |= BUFFER_FLAG_EOS;
265     outPorts_[0]->PushData(buf, -1);
266     metaCache_.clear();
267     return ErrorCode::SUCCESS;
268 }
269 
AllTracksEos()270 bool MuxerFilter::AllTracksEos()
271 {
272     return static_cast<size_t>(eosTrackCnt.load()) == trackInfos_.size();
273 }
UpdateEosState(const std::string & inPort)274 void MuxerFilter::UpdateEosState(const std::string& inPort)
275 {
276     int32_t eosCnt = 0;
277     for (auto& item : trackInfos_) {
278         if (item.inPort == inPort) {
279             item.eos = true;
280         }
281         if (item.eos) {
282             eosCnt++;
283         }
284     }
285     eosTrackCnt = eosCnt;
286 }
287 
PushData(const std::string & inPort,const AVBufferPtr & buffer,int64_t offset)288 ErrorCode MuxerFilter::PushData(const std::string& inPort, const AVBufferPtr& buffer, int64_t offset)
289 {
290     {
291         OSAL::ScopedLock lock(pushDataMutex_);
292         if (state_ != FilterState::READY && state_ != FilterState::PAUSED && state_ != FilterState::RUNNING) {
293             MEDIA_LOG_W("pushing data to muxer when state is " PUBLIC_LOG_D32, static_cast<int>(state_.load()));
294             return ErrorCode::ERROR_INVALID_OPERATION;
295         }
296         if (eos_) {
297             MEDIA_LOG_D("SendEos exit");
298             return ErrorCode::SUCCESS;
299         }
300         // todo we should consider more tracks
301         if (!hasWriteHeader_) {
302             plugin_->WriteHeader();
303             hasWriteHeader_ = true;
304         }
305         if (buffer->GetMemory()->GetSize() != 0) {
306             plugin_->WriteFrame(buffer);
307         }
308 
309         if (buffer->flag & BUFFER_FLAG_EOS) {
310             MEDIA_LOG_I("It is EOS buffer");
311             UpdateEosState(inPort);
312         }
313     }
314     if (AllTracksEos()) {
315         SendEos();
316     }
317     return ErrorCode::SUCCESS;
318 }
319 
WriteAt(int64_t offset,const std::shared_ptr<Plugin::Buffer> & buffer)320 Plugin::Status MuxerFilter::MuxerDataSink::WriteAt(int64_t offset, const std::shared_ptr<Plugin::Buffer> &buffer)
321 {
322     if (muxerFilter_ != nullptr) {
323         muxerFilter_->outPorts_[0]->PushData(buffer, offset);
324     }
325     return Plugin::Status::OK;
326 }
327 
Start()328 ErrorCode MuxerFilter::Start()
329 {
330     eos_ = false;
331     return FilterBase::Start();
332 }
333 } // Pipeline
334 } // Media
335 } // OHOS
336 #endif // RECORDER_SUPPORT