1 /*
2  * Copyright 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "Codec2-FilterWrapper"
19 #include <android-base/logging.h>
20 
21 #include <set>
22 
23 #include <dlfcn.h>
24 
25 #include <C2Config.h>
26 #include <C2Debug.h>
27 #include <C2ParamInternal.h>
28 
29 #include <codec2/hidl/plugin/FilterPlugin.h>
30 
31 #include <FilterWrapper.h>
32 
33 namespace android {
34 
35 namespace {
36 
37 // Indices that the last filter in the chain should consume.
38 static constexpr uint32_t kTypesForLastFilter[] = {
39     // In case we have an output surface, we want to use the block pool
40     // backed by the output surface for the output buffer going to the client.
41     C2PortBlockPoolsTuning::output::PARAM_TYPE,
42 };
43 
44 class WrappedDecoderInterface : public C2ComponentInterface {
45 public:
WrappedDecoderInterface(std::shared_ptr<C2ComponentInterface> intf,std::vector<FilterWrapper::Component> && filters,std::weak_ptr<FilterWrapper> filterWrapper)46     WrappedDecoderInterface(
47             std::shared_ptr<C2ComponentInterface> intf,
48             std::vector<FilterWrapper::Component> &&filters,
49             std::weak_ptr<FilterWrapper> filterWrapper)
50         : mIntf(intf), mFilterWrapper(filterWrapper) {
51         takeFilters(std::move(filters));
52     }
53 
54     ~WrappedDecoderInterface() override = default;
55 
takeFilters(std::vector<FilterWrapper::Component> && filters)56     void takeFilters(std::vector<FilterWrapper::Component> &&filters) {
57         std::unique_lock lock(mMutex);
58         std::vector<std::unique_ptr<C2Param>> lastFilterParams;
59         if (!mFilters.empty()) {
60             std::vector<C2Param::Index> indices;
61             std::vector<std::shared_ptr<C2ParamDescriptor>> paramDescs;
62             c2_status_t err = mFilters.back().intf->querySupportedParams_nb(&paramDescs);
63             if (err != C2_OK) {
64                 LOG(WARNING) << "WrappedDecoderInterface: " << mFilters.back().traits.name
65                         << " returned error for querySupportedParams_nb; err=" << err;
66                 paramDescs.clear();
67             }
68             for (const std::shared_ptr<C2ParamDescriptor> &paramDesc : paramDescs) {
69                 C2Param::Index index = paramDesc->index();
70                 if (std::count(
71                             std::begin(kTypesForLastFilter),
72                             std::end(kTypesForLastFilter),
73                             index.type()) != 0) {
74                     if (index.forStream()) {
75                         // querySupportedParams does not return per-stream params.
76                         // We only support stream-0 for now.
77                         index = index.withStream(0u);
78                     }
79                     indices.push_back(index);
80                 }
81             }
82             if (!indices.empty()) {
83                 mFilters.back().intf->query_vb({}, indices, C2_MAY_BLOCK, &lastFilterParams);
84             }
85         }
86 
87         // TODO: documentation
88         mFilters = std::move(filters);
89         mTypeToIndexForQuery.clear();
90         mTypeToIndexForConfig.clear();
91         for (size_t i = 0; i < mFilters.size(); ++i) {
92             if (i == 0) {
93                 transferParams_l(mIntf, mFilters[0].intf, C2_MAY_BLOCK);
94             } else {
95                 transferParams_l(mFilters[i - 1].intf, mFilters[i].intf, C2_MAY_BLOCK);
96             }
97             for (C2Param::Type type : mFilters[i].desc.controlParams) {
98                 mTypeToIndexForQuery[type.type()] = i;
99                 mTypeToIndexForConfig[type.type() & ~C2Param::CoreIndex::IS_REQUEST_FLAG] = i;
100             }
101             for (C2Param::Type type : mFilters[i].desc.affectedParams) {
102                 mTypeToIndexForQuery[type.type()] = i;
103             }
104         }
105         for (size_t i = mFilters.size(); i > 0; --i) {
106             if (i == 1) {
107                 backPropagateParams_l(mIntf, mFilters[0].intf, C2_MAY_BLOCK);
108             } else {
109                 backPropagateParams_l(mFilters[i - 2].intf, mFilters[i - 1].intf, C2_MAY_BLOCK);
110             }
111         }
112         if (!mFilters.empty()) {
113             for (uint32_t type : kTypesForLastFilter) {
114                 mTypeToIndexForQuery[type] = mFilters.size() - 1;
115                 mTypeToIndexForConfig[type & ~C2Param::CoreIndex::IS_REQUEST_FLAG] =
116                     mFilters.size() - 1;
117             }
118             if (!lastFilterParams.empty()) {
119                 std::vector<C2Param *> paramPtrs(lastFilterParams.size());
120                 std::transform(
121                         lastFilterParams.begin(),
122                         lastFilterParams.end(),
123                         paramPtrs.begin(),
124                         [](const std::unique_ptr<C2Param> &param) {
125                             return param.get();
126                         });
127                 std::vector<std::unique_ptr<C2SettingResult>> failures;
128                 mFilters.back().intf->config_vb(paramPtrs, C2_MAY_BLOCK, &failures);
129             }
130         }
131     }
132 
getName() const133     C2String getName() const override { return mIntf->getName(); }
134 
getId() const135     c2_node_id_t getId() const override { return mIntf->getId(); }
136 
query_vb(const std::vector<C2Param * > & stackParams,const std::vector<C2Param::Index> & heapParamIndices,c2_blocking_t mayBlock,std::vector<std::unique_ptr<C2Param>> * const heapParams) const137     c2_status_t query_vb(
138             const std::vector<C2Param *> &stackParams,
139             const std::vector<C2Param::Index> &heapParamIndices,
140             c2_blocking_t mayBlock,
141             std::vector<std::unique_ptr<C2Param>>* const heapParams) const override {
142         std::unique_lock lock(mMutex);
143         std::list<C2Param *> stackParamsList(stackParams.size());
144         std::copy_n(stackParams.begin(), stackParams.size(), stackParamsList.begin());
145         heapParams->clear();
146         c2_status_t result = C2_OK;
147         // TODO: loop optimization
148         for (size_t i = 0; i < mFilters.size(); ++i) {
149             // Filter stack params according to mTypeToIndexForQuery
150             std::vector<C2Param *> stackParamsForFilter;
151             for (auto it = stackParamsList.begin(); it != stackParamsList.end(); ) {
152                 C2Param *param = *it;
153                 uint32_t type = param->type().type();
154                 auto it2 = mTypeToIndexForQuery.find(type);
155                 if (it2 == mTypeToIndexForQuery.end() || it2->second != i) {
156                     ++it;
157                     continue;
158                 }
159                 stackParamsForFilter.push_back(param);
160                 it = stackParamsList.erase(it);
161             }
162             // Filter heap params according to mTypeToIndexForQuery
163             std::vector<C2Param::Index> heapParamIndicesForFilter;
164             for (size_t j = 0; j < heapParamIndices.size(); ++j) {
165                 uint32_t type = heapParamIndices[j].type();
166                 auto it = mTypeToIndexForQuery.find(type);
167                 if (it == mTypeToIndexForQuery.end() || it->second != i) {
168                     continue;
169                 }
170                 heapParamIndicesForFilter.push_back(heapParamIndices[j]);
171             }
172             std::vector<std::unique_ptr<C2Param>> heapParamsForFilter;
173             const std::shared_ptr<C2ComponentInterface> &filter = mFilters[i].intf;
174             c2_status_t err = filter->query_vb(
175                     stackParamsForFilter, heapParamIndicesForFilter, mayBlock,
176                     &heapParamsForFilter);
177             if (err != C2_OK && err != C2_BAD_INDEX) {
178                 LOG(WARNING) << "WrappedDecoderInterface: " << filter->getName()
179                         << " returned error for query_vb; err=" << err;
180                 result = err;
181                 continue;
182             }
183             heapParams->insert(
184                     heapParams->end(),
185                     std::make_move_iterator(heapParamsForFilter.begin()),
186                     std::make_move_iterator(heapParamsForFilter.end()));
187         }
188 
189         std::vector<C2Param *> stackParamsForIntf;
190         std::copy_n(stackParamsList.begin(), stackParamsList.size(), stackParamsForIntf.begin());
191 
192         // Gather heap params that did not get queried from the filter interfaces above.
193         // These need to be queried from the decoder interface.
194         std::vector<C2Param::Index> heapParamIndicesForIntf;
195         for (size_t j = 0; j < heapParamIndices.size(); ++j) {
196             uint32_t type = heapParamIndices[j].type();
197             if (mTypeToIndexForQuery.find(type) != mTypeToIndexForQuery.end()) {
198                 continue;
199             }
200             heapParamIndicesForIntf.push_back(heapParamIndices[j]);
201         }
202 
203         std::vector<std::unique_ptr<C2Param>> heapParamsForIntf;
204         c2_status_t err = mIntf->query_vb(
205                 stackParamsForIntf, heapParamIndicesForIntf, mayBlock, &heapParamsForIntf);
206         if (err != C2_OK) {
207             LOG(err == C2_BAD_INDEX ? VERBOSE : WARNING)
208                     << "WrappedDecoderInterface: " << mIntf->getName()
209                     << " returned error for query_vb; err=" << err;
210             result = err;
211         }
212 
213         // TODO: params needs to preserve the order
214         heapParams->insert(
215                 heapParams->end(),
216                 std::make_move_iterator(heapParamsForIntf.begin()),
217                 std::make_move_iterator(heapParamsForIntf.end()));
218 
219         return result;
220     }
221 
config_vb(const std::vector<C2Param * > & params,c2_blocking_t mayBlock,std::vector<std::unique_ptr<C2SettingResult>> * const failures)222     c2_status_t config_vb(
223             const std::vector<C2Param *> &params,
224             c2_blocking_t mayBlock,
225             std::vector<std::unique_ptr<C2SettingResult>>* const failures) override {
226         std::unique_lock lock(mMutex);
227         c2_status_t result = C2_OK;
228         std::vector<C2Param *> paramsForIntf;
229         for (C2Param* param : params) {
230             auto it = mTypeToIndexForConfig.find(param->type().type());
231             if (it != mTypeToIndexForConfig.end()) {
232                 continue;
233             }
234             paramsForIntf.push_back(param);
235         }
236         c2_status_t err = mIntf->config_vb(paramsForIntf, mayBlock, failures);
237         if (err != C2_OK) {
238             LOG(err == C2_BAD_INDEX ? VERBOSE : WARNING)
239                     << "WrappedDecoderInterface: " << mIntf->getName()
240                     << " returned error for config_vb; err=" << err;
241             result = err;
242         }
243         for (size_t i = 0; i < mFilters.size(); ++i) {
244             if (i == 0) {
245                 transferParams_l(mIntf, mFilters[0].intf, mayBlock);
246             } else {
247                 transferParams_l(mFilters[i - 1].intf, mFilters[i].intf, mayBlock);
248             }
249             const std::shared_ptr<C2ComponentInterface> &filter = mFilters[i].intf;
250             std::vector<std::unique_ptr<C2SettingResult>> filterFailures;
251             std::vector<C2Param *> paramsForFilter;
252             for (C2Param* param : params) {
253                 auto it = mTypeToIndexForConfig.find(param->type().type());
254                 if (it != mTypeToIndexForConfig.end() && it->second != i) {
255                     continue;
256                 }
257                 paramsForFilter.push_back(param);
258             }
259             c2_status_t err = filter->config_vb(paramsForFilter, mayBlock, &filterFailures);
260             if (err != C2_OK) {
261                 LOG(err == C2_BAD_INDEX ? VERBOSE : WARNING)
262                         << "WrappedDecoderInterface: " << filter->getName()
263                         << " returned error for config_vb; err=" << err;
264                 result = err;
265             }
266         }
267         for (size_t i = mFilters.size(); i > 0; --i) {
268             if (i == 1) {
269                 backPropagateParams_l(mIntf, mFilters[0].intf, mayBlock);
270             } else {
271                 backPropagateParams_l(mFilters[i - 2].intf, mFilters[i - 1].intf, mayBlock);
272             }
273         }
274 
275         return result;
276     }
277 
createTunnel_sm(c2_node_id_t)278     c2_status_t createTunnel_sm(c2_node_id_t) override { return C2_OMITTED; }
releaseTunnel_sm(c2_node_id_t)279     c2_status_t releaseTunnel_sm(c2_node_id_t) override { return C2_OMITTED; }
280 
querySupportedParams_nb(std::vector<std::shared_ptr<C2ParamDescriptor>> * const params) const281     c2_status_t querySupportedParams_nb(
282             std::vector<std::shared_ptr<C2ParamDescriptor>> * const params) const override {
283         std::unique_lock lock(mMutex);
284         c2_status_t result = mIntf->querySupportedParams_nb(params);
285         if (result != C2_OK) {
286             LOG(WARNING) << "WrappedDecoderInterface: " << mIntf->getName()
287                     << " returned error for querySupportedParams_nb; err=" << result;
288             return result;
289         }
290         // TODO: optimization idea --- pre-compute at takeFilter().
291         for (const FilterWrapper::Component &filter : mFilters) {
292             std::vector<std::shared_ptr<C2ParamDescriptor>> filterParams;
293             c2_status_t err = filter.intf->querySupportedParams_nb(&filterParams);
294             if (err != C2_OK) {
295                 LOG(WARNING) << "WrappedDecoderInterface: " << filter.intf->getName()
296                         << " returned error for querySupportedParams_nb; err=" << result;
297                 result = err;
298                 continue;
299             }
300             for (const std::shared_ptr<C2ParamDescriptor> &paramDesc : filterParams) {
301                 if (std::count(
302                         filter.desc.controlParams.begin(),
303                         filter.desc.controlParams.end(),
304                         paramDesc->index().type()) == 0) {
305                     continue;
306                 }
307                 params->push_back(paramDesc);
308             }
309         }
310         return result;
311     }
312 
querySupportedValues_vb(std::vector<C2FieldSupportedValuesQuery> & fields,c2_blocking_t mayBlock) const313     c2_status_t querySupportedValues_vb(
314             std::vector<C2FieldSupportedValuesQuery> &fields,
315             c2_blocking_t mayBlock) const override {
316         std::unique_lock lock(mMutex);
317         c2_status_t result = mIntf->querySupportedValues_vb(fields, mayBlock);
318         if (result != C2_OK && result != C2_BAD_INDEX) {
319             LOG(WARNING) << "WrappedDecoderInterface: " << mIntf->getName()
320                     << " returned error for querySupportedParams_nb; err=" << result;
321             return result;
322         }
323         for (const FilterWrapper::Component &filter : mFilters) {
324             std::vector<C2FieldSupportedValuesQuery> filterFields;
325             std::vector<size_t> indices;
326             for (size_t i = 0; i < fields.size(); ++i) {
327                 const C2FieldSupportedValuesQuery &field = fields[i];
328                 uint32_t type = C2Param::Index(_C2ParamInspector::GetIndex(field.field())).type();
329                 if (std::count(
330                         filter.desc.controlParams.begin(),
331                         filter.desc.controlParams.end(),
332                         type) == 0) {
333                     continue;
334                 }
335                 filterFields.push_back(field);
336                 indices.push_back(i);
337             }
338             c2_status_t err = filter.intf->querySupportedValues_vb(filterFields, mayBlock);
339             if (err != C2_OK && err != C2_BAD_INDEX) {
340                 LOG(WARNING) << "WrappedDecoderInterface: " << filter.intf->getName()
341                         << " returned error for querySupportedParams_nb; err=" << result;
342                 result = err;
343                 continue;
344             }
345             for (size_t i = 0; i < filterFields.size(); ++i) {
346                 fields[indices[i]] = filterFields[i];
347             }
348         }
349         return result;
350     }
351 
352 private:
353     mutable std::mutex mMutex;
354     std::shared_ptr<C2ComponentInterface> mIntf;
355     std::vector<FilterWrapper::Component> mFilters;
356     std::weak_ptr<FilterWrapper> mFilterWrapper;
357     std::map<uint32_t, size_t> mTypeToIndexForQuery;
358     std::map<uint32_t, size_t> mTypeToIndexForConfig;
359 
transferParams_l(const std::shared_ptr<C2ComponentInterface> & curr,const std::shared_ptr<C2ComponentInterface> & next,c2_blocking_t mayBlock)360     c2_status_t transferParams_l(
361             const std::shared_ptr<C2ComponentInterface> &curr,
362             const std::shared_ptr<C2ComponentInterface> &next,
363             c2_blocking_t mayBlock) {
364         // NOTE: this implementation is preliminary --- it could change once
365         // we define what parameters needs to be propagated in component chaining.
366         std::vector<std::shared_ptr<C2ParamDescriptor>> paramDescs;
367         c2_status_t err = next->querySupportedParams_nb(&paramDescs);
368         if (err != C2_OK) {
369             LOG(DEBUG) << "WrappedDecoderInterface: " << next->getName()
370                     << " returned error for querySupportedParams_nb; err=" << err;
371             return err;
372         }
373         // Find supported input params from the next interface and flip direction
374         // so they become output params.
375         std::vector<C2Param::Index> indices;
376         for (const std::shared_ptr<C2ParamDescriptor> &paramDesc : paramDescs) {
377             C2Param::Index index = paramDesc->index();
378             if (!index.forInput() || paramDesc->isReadOnly()) {
379                 continue;
380             }
381             if (index.forStream()) {
382                 uint32_t stream = index.stream();
383                 index = index.withPort(true /* output */).withStream(stream);
384             } else {
385                 index = index.withPort(true /* output */);
386             }
387             indices.push_back(index);
388         }
389         // Query those output params from the current interface
390         std::vector<std::unique_ptr<C2Param>> heapParams;
391         err = curr->query_vb({}, indices, mayBlock, &heapParams);
392         if (err != C2_OK && err != C2_BAD_INDEX) {
393             LOG(DEBUG) << "WrappedDecoderInterface: " << curr->getName()
394                     << " returned error for query_vb; err=" << err;
395             return err;
396         }
397         // Flip the direction of the queried params, so they become input parameters.
398         // Configure the next interface with the params.
399         std::vector<C2Param *> configParams;
400         for (size_t i = 0; i < heapParams.size(); ++i) {
401             if (!heapParams[i]) {
402                 continue;
403             }
404             if (heapParams[i]->forStream()) {
405                 heapParams[i] = C2Param::CopyAsStream(
406                         *heapParams[i], false /* output */, heapParams[i]->stream());
407             } else {
408                 heapParams[i] = C2Param::CopyAsPort(*heapParams[i], false /* output */);
409             }
410             configParams.push_back(heapParams[i].get());
411         }
412         std::vector<std::unique_ptr<C2SettingResult>> failures;
413         err = next->config_vb(configParams, mayBlock, &failures);
414         if (err != C2_OK && err != C2_BAD_INDEX) {
415             LOG(DEBUG) << "WrappedDecoderInterface: " << next->getName()
416                     << " returned error for config_vb; err=" << err;
417             return err;
418         }
419         return C2_OK;
420     }
421 
backPropagateParams_l(const std::shared_ptr<C2ComponentInterface> & curr,const std::shared_ptr<C2ComponentInterface> & next,c2_blocking_t mayBlock)422     c2_status_t backPropagateParams_l(
423             const std::shared_ptr<C2ComponentInterface> &curr,
424             const std::shared_ptr<C2ComponentInterface> &next,
425             c2_blocking_t mayBlock) {
426         // NOTE: this implementation is preliminary --- it could change once
427         // we define what parameters needs to be propagated in component chaining.
428         std::shared_ptr<FilterWrapper> filterWrapper = mFilterWrapper.lock();
429         if (!filterWrapper) {
430             LOG(DEBUG) << "WrappedDecoderInterface: FilterWrapper not found";
431             return C2_OK;
432         }
433         std::vector<std::unique_ptr<C2Param>> params;
434         c2_status_t err = filterWrapper->queryParamsForPreviousComponent(next, &params);
435         if (err != C2_OK) {
436             LOG(DEBUG) << "WrappedDecoderInterface: FilterWrapper returned error for "
437                 << "queryParamsForPreviousComponent; intf=" << next->getName() << " err=" << err;
438             return C2_OK;
439         }
440         std::vector<C2Param *> configParams;
441         for (size_t i = 0; i < params.size(); ++i) {
442             if (!params[i]) {
443                 continue;
444             }
445             configParams.push_back(params[i].get());
446         }
447         std::vector<std::unique_ptr<C2SettingResult>> failures;
448         curr->config_vb(configParams, mayBlock, &failures);
449         if (err != C2_OK && err != C2_BAD_INDEX) {
450             LOG(DEBUG) << "WrappedDecoderInterface: " << next->getName()
451                     << " returned error for config_vb; err=" << err;
452             return err;
453         }
454         return C2_OK;
455     }
456 };
457 
458 class WrappedDecoder : public C2Component, public std::enable_shared_from_this<WrappedDecoder> {
459 public:
WrappedDecoder(std::shared_ptr<C2Component> comp,std::vector<FilterWrapper::Component> && filters,std::weak_ptr<FilterWrapper> filterWrapper)460     WrappedDecoder(
461             std::shared_ptr<C2Component> comp,
462             std::vector<FilterWrapper::Component> &&filters,
463             std::weak_ptr<FilterWrapper> filterWrapper)
464         : mComp(comp), mFilters(std::move(filters)), mFilterWrapper(filterWrapper) {
465         std::vector<FilterWrapper::Component> filtersDup(mFilters);
466         mIntf = std::make_shared<WrappedDecoderInterface>(
467                 comp->intf(), std::move(filtersDup), filterWrapper);
468     }
469 
470     ~WrappedDecoder() override = default;
471 
intf()472     std::shared_ptr<C2ComponentInterface> intf() override { return mIntf; }
473 
setListener_vb(const std::shared_ptr<Listener> & listener,c2_blocking_t mayBlock)474     c2_status_t setListener_vb(
475             const std::shared_ptr<Listener> &listener, c2_blocking_t mayBlock) override {
476         if (listener) {
477             setListenerInternal(mFilters, listener, mayBlock);
478         } else {
479             mComp->setListener_vb(nullptr, mayBlock);
480             for (FilterWrapper::Component &filter : mFilters) {
481                 filter.comp->setListener_vb(nullptr, mayBlock);
482             }
483         }
484         mListener = listener;
485         return C2_OK;
486     }
487 
queue_nb(std::list<std::unique_ptr<C2Work>> * const items)488     c2_status_t queue_nb(std::list<std::unique_ptr<C2Work>>* const items) override {
489         return mComp->queue_nb(items);
490     }
491 
announce_nb(const std::vector<C2WorkOutline> &)492     c2_status_t announce_nb(const std::vector<C2WorkOutline> &) override {
493         return C2_OMITTED;
494     }
495 
flush_sm(flush_mode_t mode,std::list<std::unique_ptr<C2Work>> * const flushedWork)496     c2_status_t flush_sm(
497             flush_mode_t mode, std::list<std::unique_ptr<C2Work>>* const flushedWork) override {
498         c2_status_t result = mComp->flush_sm(mode, flushedWork);
499         std::list<std::unique_ptr<C2Work>> filterFlushedWork;
500         for (FilterWrapper::Component filter : mRunningFilters) {
501             c2_status_t err = filter.comp->flush_sm(mode, &filterFlushedWork);
502             if (err != C2_OK) {
503                 result = err;
504             }
505             flushedWork->splice(flushedWork->end(), filterFlushedWork);
506         }
507         return result;
508     }
509 
drain_nb(drain_mode_t mode)510     c2_status_t drain_nb(drain_mode_t mode) override {
511         // TODO: simplify using comp->drain_nb(mode)
512         switch (mode) {
513         case DRAIN_COMPONENT_WITH_EOS: {
514             std::unique_ptr<C2Work> eosWork{new C2Work};
515             eosWork->input.flags = C2FrameData::FLAG_END_OF_STREAM;
516             eosWork->worklets.push_back(std::make_unique<C2Worklet>());
517             std::list<std::unique_ptr<C2Work>> items;
518             items.push_back(std::move(eosWork));
519             mComp->queue_nb(&items);
520             return C2_OK;
521         }
522         case DRAIN_COMPONENT_NO_EOS:
523         case DRAIN_CHAIN:
524         default:
525             return C2_BAD_VALUE;
526         }
527     }
528 
start()529     c2_status_t start() override {
530         std::vector<FilterWrapper::Component> filters;
531         if (std::shared_ptr<FilterWrapper> filterWrapper = mFilterWrapper.lock()) {
532             // Let's check if we have filters that we can skip
533             for (FilterWrapper::Component &filter : mFilters) {
534                 if (!filterWrapper->isFilteringEnabled(filter.intf)) {
535                     LOG(VERBOSE) << "filtering disabled for " << filter.traits.name;
536                     continue;
537                 }
538                 LOG(VERBOSE) << "filtering enabled for " << filter.traits.name;
539                 filters.push_back(filter);
540             }
541             if (filters.size() < mFilters.size()) {
542                 LOG(VERBOSE) << (mFilters.size() - filters.size()) << " filter(s) skipped";
543                 setListenerInternal(filters, mListener, C2_MAY_BLOCK);
544                 std::vector filtersCopy(filters);
545                 mIntf->takeFilters(std::move(filtersCopy));
546             }
547         }
548 
549         c2_status_t err = mComp->start();
550         if (err != C2_OK) {
551             return err;
552         }
553         for (FilterWrapper::Component &filter : filters) {
554             c2_status_t err = filter.comp->start();
555             if (err != C2_OK) {
556                 // Previous components are already started successfully;
557                 // we ended up in an incoherent state.
558                 return C2_CORRUPTED;
559             }
560         }
561         mRunningFilters = std::move(filters);
562         return C2_OK;
563     }
564 
stop()565     c2_status_t stop() override {
566         c2_status_t err = mComp->stop();
567         if (err != C2_OK) {
568             return err;
569         }
570         for (FilterWrapper::Component filter : mRunningFilters) {
571             c2_status_t err = filter.comp->stop();
572             if (err != C2_OK) {
573                 // Previous components are already stopped successfully;
574                 // we ended up in an incoherent state.
575                 return C2_CORRUPTED;
576             }
577         }
578         mRunningFilters.clear();
579         return C2_OK;
580     }
581 
reset()582     c2_status_t reset() override {
583         c2_status_t result = mComp->reset();
584         if (result != C2_OK) {
585             result = C2_CORRUPTED;
586         }
587         for (FilterWrapper::Component filter : mFilters) {
588             c2_status_t err = filter.comp->reset();
589             if (err != C2_OK) {
590                 // Previous components are already reset successfully;
591                 // we ended up in an incoherent state.
592                 result = C2_CORRUPTED;
593                 // continue for the rest of the chain
594             }
595         }
596         mRunningFilters.clear();
597         return result;
598     }
599 
release()600     c2_status_t release() override {
601         c2_status_t result = mComp->release();
602         if (result != C2_OK) {
603             result = C2_CORRUPTED;
604         }
605         for (FilterWrapper::Component filter : mFilters) {
606             c2_status_t err = filter.comp->release();
607             if (err != C2_OK) {
608                 // Previous components are already released successfully;
609                 // we ended up in an incoherent state.
610                 result = C2_CORRUPTED;
611                 // continue for the rest of the chain
612             }
613         }
614         mRunningFilters.clear();
615         return result;
616     }
617 
618 private:
619     class PassingListener : public Listener {
620     public:
PassingListener(std::shared_ptr<C2Component> wrappedComponent,const std::shared_ptr<Listener> & wrappedComponentListener,std::shared_ptr<C2Component> nextComponent)621         PassingListener(
622                 std::shared_ptr<C2Component> wrappedComponent,
623                 const std::shared_ptr<Listener> &wrappedComponentListener,
624                 std::shared_ptr<C2Component> nextComponent)
625             : mWrappedComponent(wrappedComponent),
626               mWrappedComponentListener(wrappedComponentListener),
627               mNextComponent(nextComponent) {
628         }
629 
onWorkDone_nb(std::weak_ptr<C2Component>,std::list<std::unique_ptr<C2Work>> workItems)630         void onWorkDone_nb(
631                 std::weak_ptr<C2Component>,
632                 std::list<std::unique_ptr<C2Work>> workItems) override {
633             std::shared_ptr<C2Component> nextComponent = mNextComponent.lock();
634             std::list<std::unique_ptr<C2Work>> failedWorkItems;
635             if (!nextComponent) {
636                 for (std::unique_ptr<C2Work> &work : workItems) {
637                     // Next component unexpectedly released while the work is
638                     // in-flight. Report C2_CORRUPTED to the client.
639                     work->result = C2_CORRUPTED;
640                     failedWorkItems.push_back(std::move(work));
641                 }
642                 workItems.clear();
643             } else {
644                 for (auto it = workItems.begin(); it != workItems.end(); ) {
645                     const std::unique_ptr<C2Work> &work = *it;
646                     if (work->result != C2_OK
647                             || work->worklets.size() != 1) {
648                         failedWorkItems.push_back(std::move(*it));
649                         it = workItems.erase(it);
650                         continue;
651                     }
652                     C2FrameData &output = work->worklets.front()->output;
653                     c2_cntr64_t customOrdinal = work->input.ordinal.customOrdinal;
654                     work->input = std::move(output);
655                     work->input.ordinal.customOrdinal = customOrdinal;
656                     output.flags = C2FrameData::flags_t(0);
657                     output.buffers.clear();
658                     output.configUpdate.clear();
659                     output.infoBuffers.clear();
660                     ++it;
661                 }
662             }
663             if (!failedWorkItems.empty()) {
664                 for (const std::unique_ptr<C2Work> &work : failedWorkItems) {
665                     LOG(VERBOSE) << "work #" << work->input.ordinal.frameIndex.peek()
666                             << " failed: err=" << work->result
667                             << " worklets.size()=" << work->worklets.size();
668                 }
669                 if (std::shared_ptr<Listener> wrappedComponentListener =
670                         mWrappedComponentListener.lock()) {
671                     wrappedComponentListener->onWorkDone_nb(
672                             mWrappedComponent, std::move(failedWorkItems));
673                 }
674             }
675             if (!workItems.empty()) {
676                 nextComponent->queue_nb(&workItems);
677             }
678         }
679 
onTripped_nb(std::weak_ptr<C2Component>,std::vector<std::shared_ptr<C2SettingResult>>)680         void onTripped_nb(
681                 std::weak_ptr<C2Component>,
682                 std::vector<std::shared_ptr<C2SettingResult>>) override {
683             // Trip not supported
684         }
685 
onError_nb(std::weak_ptr<C2Component>,uint32_t errorCode)686         void onError_nb(std::weak_ptr<C2Component>, uint32_t errorCode) {
687             if (std::shared_ptr<Listener> wrappedComponentListener =
688                     mWrappedComponentListener.lock()) {
689                 wrappedComponentListener->onError_nb(mWrappedComponent, errorCode);
690             }
691         }
692 
693     private:
694         std::weak_ptr<C2Component> mWrappedComponent;
695         std::weak_ptr<Listener> mWrappedComponentListener;
696         std::weak_ptr<C2Component> mNextComponent;
697     };
698 
699     class LastListener : public Listener {
700     public:
LastListener(std::shared_ptr<C2Component> wrappedComponent,const std::shared_ptr<Listener> & wrappedComponentListener)701         LastListener(
702                 std::shared_ptr<C2Component> wrappedComponent,
703                 const std::shared_ptr<Listener> &wrappedComponentListener)
704             : mWrappedComponent(wrappedComponent),
705               mWrappedComponentListener(wrappedComponentListener) {
706         }
707 
onWorkDone_nb(std::weak_ptr<C2Component>,std::list<std::unique_ptr<C2Work>> workItems)708         void onWorkDone_nb(
709                 std::weak_ptr<C2Component>,
710                 std::list<std::unique_ptr<C2Work>> workItems) override {
711             if (mWrappedComponent.expired()) {
712                 return;
713             }
714             if (std::shared_ptr<Listener> wrappedComponentListener =
715                     mWrappedComponentListener.lock()) {
716                 wrappedComponentListener->onWorkDone_nb(
717                         mWrappedComponent, std::move(workItems));
718             }
719         }
720 
onTripped_nb(std::weak_ptr<C2Component>,std::vector<std::shared_ptr<C2SettingResult>>)721         void onTripped_nb(
722                 std::weak_ptr<C2Component>,
723                 std::vector<std::shared_ptr<C2SettingResult>>) override {
724             // Trip not supported
725         }
726 
onError_nb(std::weak_ptr<C2Component>,uint32_t errorCode)727         void onError_nb(std::weak_ptr<C2Component>, uint32_t errorCode) {
728             if (mWrappedComponent.expired()) {
729                 return;
730             }
731             if (std::shared_ptr<Listener> wrappedComponentListener =
732                     mWrappedComponentListener.lock()) {
733                 wrappedComponentListener->onError_nb(mWrappedComponent, errorCode);
734             }
735         }
736 
737     private:
738         std::weak_ptr<C2Component> mWrappedComponent;
739         std::weak_ptr<Listener> mWrappedComponentListener;
740     };
741 
742     std::shared_ptr<C2Component> mComp;
743     std::shared_ptr<WrappedDecoderInterface> mIntf;
744     std::vector<FilterWrapper::Component> mFilters;
745     std::vector<FilterWrapper::Component> mRunningFilters;
746     std::weak_ptr<FilterWrapper> mFilterWrapper;
747     std::shared_ptr<Listener> mListener;
748 #if defined(LOG_NDEBUG) && !LOG_NDEBUG
749     base::ScopedLogSeverity mScopedLogSeverity{base::VERBOSE};
750 #endif
751 
setListenerInternal(const std::vector<FilterWrapper::Component> & filters,const std::shared_ptr<Listener> & listener,c2_blocking_t mayBlock)752     c2_status_t setListenerInternal(
753             const std::vector<FilterWrapper::Component> &filters,
754             const std::shared_ptr<Listener> &listener,
755             c2_blocking_t mayBlock) {
756         if (filters.empty()) {
757             return mComp->setListener_vb(listener, mayBlock);
758         }
759         std::shared_ptr passingListener = std::make_shared<PassingListener>(
760                 shared_from_this(),
761                 listener,
762                 filters.front().comp);
763         mComp->setListener_vb(passingListener, mayBlock);
764         for (size_t i = 0; i < filters.size() - 1; ++i) {
765             filters[i].comp->setListener_vb(
766                     std::make_shared<PassingListener>(
767                             shared_from_this(),
768                             listener,
769                             filters[i + 1].comp),
770                     mayBlock);
771         }
772         filters.back().comp->setListener_vb(
773                 std::make_shared<LastListener>(shared_from_this(), listener), mayBlock);
774         return C2_OK;
775     }
776 };
777 
778 }  // anonymous namespace
779 
FilterWrapper(std::unique_ptr<Plugin> && plugin)780 FilterWrapper::FilterWrapper(std::unique_ptr<Plugin> &&plugin)
781     : mInit(NO_INIT),
782       mPlugin(std::move(plugin)) {
783     if (mPlugin->status() != OK) {
784         LOG(ERROR) << "plugin not OK: " << mPlugin->status();
785         mPlugin.reset();
786         return;
787     }
788     mStore = mPlugin->getStore();
789     if (!mStore) {
790         LOG(ERROR) << "no store";
791         mPlugin.reset();
792         return;
793     }
794     std::vector<std::shared_ptr<const C2Component::Traits>> traits =
795         mStore->listComponents();
796     std::sort(
797             traits.begin(),
798             traits.end(),
799             [](std::shared_ptr<const C2Component::Traits> &a,
800                     std::shared_ptr<const C2Component::Traits> &b) {
801                 return a->rank < b->rank;
802             });
803     for (size_t i = 0; i < traits.size(); ++i) {
804         const std::shared_ptr<const C2Component::Traits> &trait = traits[i];
805         if (trait->domain == C2Component::DOMAIN_OTHER
806                 || trait->domain == C2Component::DOMAIN_AUDIO
807                 || trait->kind != C2Component::KIND_OTHER) {
808             LOG(DEBUG) << trait->name << " is ignored because of domain/kind: "
809                 << trait->domain << "/" << trait->kind;
810             continue;
811         }
812         Descriptor desc;
813         if (!mPlugin->describe(trait->name, &desc)) {
814             LOG(DEBUG) << trait->name << " is ignored because describe() failed";
815             continue;
816         }
817         mComponents.push_back({nullptr, nullptr, *trait, desc});
818     }
819     if (mComponents.empty()) {
820         LOG(DEBUG) << "FilterWrapper: no filter component found";
821         mPlugin.reset();
822         return;
823     }
824     mInit = OK;
825 }
826 
~FilterWrapper()827 FilterWrapper::~FilterWrapper() {
828 }
829 
createFilters()830 std::vector<FilterWrapper::Component> FilterWrapper::createFilters() {
831     std::vector<FilterWrapper::Component> filters;
832     for (const FilterWrapper::Component &filter : mComponents) {
833         std::shared_ptr<C2Component> comp;
834         std::shared_ptr<C2ComponentInterface> intf;
835         if (C2_OK != mStore->createComponent(filter.traits.name, &comp)) {
836             return {};
837         }
838         filters.push_back({comp, comp->intf(), filter.traits, filter.desc});
839     }
840     return filters;
841 }
842 
getTraits(const std::shared_ptr<C2ComponentInterface> & intf)843 C2Component::Traits FilterWrapper::getTraits(
844         const std::shared_ptr<C2ComponentInterface> &intf) {
845     {
846         std::unique_lock lock(mCacheMutex);
847         if (mCachedTraits.count(intf->getName())) {
848             return mCachedTraits.at(intf->getName());
849         }
850     }
851     C2ComponentDomainSetting domain;
852     C2ComponentKindSetting kind;
853     c2_status_t err = intf->query_vb({&domain, &kind}, {}, C2_MAY_BLOCK, nullptr);
854     C2Component::Traits traits = {
855         "query failed",  // name
856         C2Component::DOMAIN_OTHER,
857         C2Component::KIND_OTHER,
858         0,   // rank, unused
859         "",  // media type, unused
860         "",  // owner, unused
861         {},  // aliases, unused
862     };
863     if (err == C2_OK) {
864         traits = {
865             intf->getName(),
866             domain.value,
867             kind.value,
868             0,   // rank, unused
869             "",  // media type, unused
870             "",  // owner, unused
871             {},  // aliases, unused
872         };
873         std::unique_lock lock(mCacheMutex);
874         mCachedTraits[traits.name] = traits;
875     }
876     return traits;
877 }
878 
maybeWrapInterface(const std::shared_ptr<C2ComponentInterface> intf)879 std::shared_ptr<C2ComponentInterface> FilterWrapper::maybeWrapInterface(
880         const std::shared_ptr<C2ComponentInterface> intf) {
881     if (mInit != OK) {
882         LOG(VERBOSE) << "maybeWrapInterface: Wrapper not initialized: "
883                 << intf->getName() << " is not wrapped.";
884         return intf;
885     }
886     C2Component::Traits traits = getTraits(intf);
887     if (traits.name != intf->getName()) {
888         LOG(INFO) << "maybeWrapInterface: Querying traits from " << intf->getName()
889                 << " failed; not wrapping the interface";
890         return intf;
891     }
892     if ((traits.domain != C2Component::DOMAIN_VIDEO && traits.domain != C2Component::DOMAIN_IMAGE)
893             || traits.kind != C2Component::KIND_DECODER) {
894         LOG(VERBOSE) << "maybeWrapInterface: " << traits.name
895                 << " is not video/image decoder; not wrapping the interface";
896         return intf;
897     }
898     return std::make_shared<WrappedDecoderInterface>(
899             intf, createFilters(), weak_from_this());
900 }
901 
maybeWrapComponent(const std::shared_ptr<C2Component> comp)902 std::shared_ptr<C2Component> FilterWrapper::maybeWrapComponent(
903         const std::shared_ptr<C2Component> comp) {
904     if (mInit != OK) {
905         LOG(VERBOSE) << "maybeWrapComponent: Wrapper not initialized: "
906                 << comp->intf()->getName() << " is not wrapped.";
907         return comp;
908     }
909     C2Component::Traits traits = getTraits(comp->intf());
910     if (traits.name != comp->intf()->getName()) {
911         LOG(INFO) << "maybeWrapComponent: Querying traits from " << comp->intf()->getName()
912                 << " failed; not wrapping the component";
913         return comp;
914     }
915     if ((traits.domain != C2Component::DOMAIN_VIDEO && traits.domain != C2Component::DOMAIN_IMAGE)
916             || traits.kind != C2Component::KIND_DECODER) {
917         LOG(VERBOSE) << "maybeWrapComponent: " << traits.name
918                 << " is not video/image decoder; not wrapping the component";
919         return comp;
920     }
921     std::vector<Component> filters = createFilters();
922     std::shared_ptr wrapped = std::make_shared<WrappedDecoder>(
923             comp, std::vector(filters), weak_from_this());
924     {
925         std::unique_lock lock(mWrappedComponentsMutex);
926         std::vector<std::weak_ptr<const C2Component>> &components =
927             mWrappedComponents.emplace_back();
928         components.push_back(wrapped);
929         components.push_back(comp);
930         for (const Component &filter : filters) {
931             components.push_back(filter.comp);
932         }
933     }
934     return wrapped;
935 }
936 
isFilteringEnabled(const std::shared_ptr<C2ComponentInterface> & intf)937 bool FilterWrapper::isFilteringEnabled(const std::shared_ptr<C2ComponentInterface> &intf) {
938     if (mInit != OK) {
939         LOG(WARNING) << "isFilteringEnabled: Wrapper not initialized: ";
940         return false;
941     }
942     return mPlugin->isFilteringEnabled(intf);
943 }
944 
createBlockPool(C2PlatformAllocatorStore::id_t allocatorId,std::shared_ptr<const C2Component> component,std::shared_ptr<C2BlockPool> * pool)945 c2_status_t FilterWrapper::createBlockPool(
946         C2PlatformAllocatorStore::id_t allocatorId,
947         std::shared_ptr<const C2Component> component,
948         std::shared_ptr<C2BlockPool> *pool) {
949     std::unique_lock lock(mWrappedComponentsMutex);
950     for (auto it = mWrappedComponents.begin(); it != mWrappedComponents.end(); ) {
951         std::shared_ptr<const C2Component> comp = it->front().lock();
952         if (!comp) {
953             it = mWrappedComponents.erase(it);
954             continue;
955         }
956         if (component == comp) {
957             std::vector<std::shared_ptr<const C2Component>> components(it->size());
958             std::transform(
959                     it->begin(), it->end(), components.begin(),
960                     [](const std::weak_ptr<const C2Component> &el) {
961                         return el.lock();
962                     });
963             if (C2_OK == CreateCodec2BlockPool(allocatorId, components, pool)) {
964                 return C2_OK;
965             }
966         }
967         ++it;
968     }
969     return CreateCodec2BlockPool(allocatorId, component, pool);
970 }
971 
queryParamsForPreviousComponent(const std::shared_ptr<C2ComponentInterface> & intf,std::vector<std::unique_ptr<C2Param>> * params)972 c2_status_t FilterWrapper::queryParamsForPreviousComponent(
973         const std::shared_ptr<C2ComponentInterface> &intf,
974         std::vector<std::unique_ptr<C2Param>> *params) {
975     if (mInit != OK) {
976         LOG(WARNING) << "queryParamsForPreviousComponent: Wrapper not initialized: ";
977         return C2_NO_INIT;
978     }
979     return mPlugin->queryParamsForPreviousComponent(intf, params);
980 }
981 
982 }  // namespace android
983