1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #define LOG_TAG "BufferPoolManager"
17 //#define LOG_NDEBUG 0
18 
19 #include <bufferpool/ClientManager.h>
20 #include <hidl/HidlTransportSupport.h>
21 #include <sys/types.h>
22 #include <time.h>
23 #include <unistd.h>
24 #include <utils/Log.h>
25 #include "BufferPoolClient.h"
26 #include "Observer.h"
27 #include "Accessor.h"
28 
29 namespace android {
30 namespace hardware {
31 namespace media {
32 namespace bufferpool {
33 namespace V2_0 {
34 namespace implementation {
35 
// Longest time a sender registration waits for its turn to connect (0.5 sec).
static constexpr int64_t kRegisterTimeoutUs = 500 * 1000;
// Minimum interval between two periodic clean-ups (1 sec). TODO: tune
static constexpr int64_t kCleanUpDurationUs = 1000 * 1000;
// Time since the last transaction after which an inactive client is
// dropped by clean-up (5 secs). TODO: tune
static constexpr int64_t kClientTimeoutUs = 5 * 1000 * 1000;
39 
40 /**
41  * The holder of the cookie of remote IClientManager.
42  * The cookie is process locally unique for each IClientManager.
43  * (The cookie is used to notify death of clients to bufferpool process.)
44  */
45 class ClientManagerCookieHolder {
46 public:
47     /**
48      * Creates a cookie holder for remote IClientManager(s).
49      */
50     ClientManagerCookieHolder();
51 
52     /**
53      * Gets a cookie for a remote IClientManager.
54      *
55      * @param manager   the specified remote IClientManager.
56      * @param added     true when the specified remote IClientManager is added
57      *                  newly, false otherwise.
58      *
59      * @return the process locally unique cookie for the specified IClientManager.
60      */
61     uint64_t getCookie(const sp<IClientManager> &manager, bool *added);
62 
63 private:
64     uint64_t mSeqId;
65     std::mutex mLock;
66     std::list<std::pair<const wp<IClientManager>, uint64_t>> mManagers;
67 };
68 
ClientManagerCookieHolder()69 ClientManagerCookieHolder::ClientManagerCookieHolder() : mSeqId(0){}
70 
getCookie(const sp<IClientManager> & manager,bool * added)71 uint64_t ClientManagerCookieHolder::getCookie(
72         const sp<IClientManager> &manager,
73         bool *added) {
74     std::lock_guard<std::mutex> lock(mLock);
75     for (auto it = mManagers.begin(); it != mManagers.end();) {
76         const sp<IClientManager> key = it->first.promote();
77         if (key) {
78             if (interfacesEqual(key, manager)) {
79                 *added = false;
80                 return it->second;
81             }
82             ++it;
83         } else {
84             it = mManagers.erase(it);
85         }
86     }
87     uint64_t id = mSeqId++;
88     *added = true;
89     mManagers.push_back(std::make_pair(manager, id));
90     return id;
91 }
92 
93 class ClientManager::Impl {
94 public:
95     Impl();
96 
97     // BnRegisterSender
98     ResultStatus registerSender(const sp<IAccessor> &accessor,
99                                 ConnectionId *pConnectionId);
100 
101     // BpRegisterSender
102     ResultStatus registerSender(const sp<IClientManager> &receiver,
103                                 ConnectionId senderId,
104                                 ConnectionId *receiverId);
105 
106     ResultStatus create(const std::shared_ptr<BufferPoolAllocator> &allocator,
107                         ConnectionId *pConnectionId);
108 
109     ResultStatus close(ConnectionId connectionId);
110 
111     ResultStatus flush(ConnectionId connectionId);
112 
113     ResultStatus allocate(ConnectionId connectionId,
114                           const std::vector<uint8_t> &params,
115                           native_handle_t **handle,
116                           std::shared_ptr<BufferPoolData> *buffer);
117 
118     ResultStatus receive(ConnectionId connectionId,
119                          TransactionId transactionId,
120                          BufferId bufferId,
121                          int64_t timestampUs,
122                          native_handle_t **handle,
123                          std::shared_ptr<BufferPoolData> *buffer);
124 
125     ResultStatus postSend(ConnectionId receiverId,
126                           const std::shared_ptr<BufferPoolData> &buffer,
127                           TransactionId *transactionId,
128                           int64_t *timestampUs);
129 
130     ResultStatus getAccessor(ConnectionId connectionId,
131                              sp<IAccessor> *accessor);
132 
133     void cleanUp(bool clearCache = false);
134 
135 private:
136     // In order to prevent deadlock between multiple locks,
137     // always lock ClientCache.lock before locking ActiveClients.lock.
138     struct ClientCache {
139         // This lock is held for brief duration.
140         // Blocking operation is not performed while holding the lock.
141         std::mutex mMutex;
142         std::list<std::pair<const wp<IAccessor>, const std::weak_ptr<BufferPoolClient>>>
143                 mClients;
144         std::condition_variable mConnectCv;
145         bool mConnecting;
146         int64_t mLastCleanUpUs;
147 
ClientCacheandroid::hardware::media::bufferpool::V2_0::implementation::ClientManager::Impl::ClientCache148         ClientCache() : mConnecting(false), mLastCleanUpUs(getTimestampNow()) {}
149     } mCache;
150 
151     // Active clients which can be retrieved via ConnectionId
152     struct ActiveClients {
153         // This lock is held for brief duration.
154         // Blocking operation is not performed holding the lock.
155         std::mutex mMutex;
156         std::map<ConnectionId, const std::shared_ptr<BufferPoolClient>>
157                 mClients;
158     } mActive;
159 
160     sp<Observer> mObserver;
161 
162     ClientManagerCookieHolder mRemoteClientCookies;
163 };
164 
Impl()165 ClientManager::Impl::Impl()
166     : mObserver(new Observer()) {}
167 
registerSender(const sp<IAccessor> & accessor,ConnectionId * pConnectionId)168 ResultStatus ClientManager::Impl::registerSender(
169         const sp<IAccessor> &accessor, ConnectionId *pConnectionId) {
170     cleanUp();
171     int64_t timeoutUs = getTimestampNow() + kRegisterTimeoutUs;
172     do {
173         std::unique_lock<std::mutex> lock(mCache.mMutex);
174         for (auto it = mCache.mClients.begin(); it != mCache.mClients.end(); ++it) {
175             sp<IAccessor> sAccessor = it->first.promote();
176             if (sAccessor && interfacesEqual(sAccessor, accessor)) {
177                 const std::shared_ptr<BufferPoolClient> client = it->second.lock();
178                 if (client) {
179                     std::lock_guard<std::mutex> lock(mActive.mMutex);
180                     *pConnectionId = client->getConnectionId();
181                     if (mActive.mClients.find(*pConnectionId) != mActive.mClients.end()) {
182                         ALOGV("register existing connection %lld", (long long)*pConnectionId);
183                         return ResultStatus::ALREADY_EXISTS;
184                     }
185                 }
186                 mCache.mClients.erase(it);
187                 break;
188             }
189         }
190         if (!mCache.mConnecting) {
191             mCache.mConnecting = true;
192             lock.unlock();
193             ResultStatus result = ResultStatus::OK;
194             const std::shared_ptr<BufferPoolClient> client =
195                     std::make_shared<BufferPoolClient>(accessor, mObserver);
196             lock.lock();
197             if (!client) {
198                 result = ResultStatus::NO_MEMORY;
199             } else if (!client->isValid()) {
200                 result = ResultStatus::CRITICAL_ERROR;
201             }
202             if (result == ResultStatus::OK) {
203                 // TODO: handle insert fail. (malloc fail)
204                 const std::weak_ptr<BufferPoolClient> wclient = client;
205                 mCache.mClients.push_back(std::make_pair(accessor, wclient));
206                 ConnectionId conId = client->getConnectionId();
207                 mObserver->addClient(conId, wclient);
208                 {
209                     std::lock_guard<std::mutex> lock(mActive.mMutex);
210                     mActive.mClients.insert(std::make_pair(conId, client));
211                 }
212                 *pConnectionId = conId;
213                 ALOGV("register new connection %lld", (long long)*pConnectionId);
214             }
215             mCache.mConnecting = false;
216             lock.unlock();
217             mCache.mConnectCv.notify_all();
218             return result;
219         }
220         mCache.mConnectCv.wait_for(
221                 lock, std::chrono::microseconds(kRegisterTimeoutUs));
222     } while (getTimestampNow() < timeoutUs);
223     // TODO: return timeout error
224     return ResultStatus::CRITICAL_ERROR;
225 }
226 
registerSender(const sp<IClientManager> & receiver,ConnectionId senderId,ConnectionId * receiverId)227 ResultStatus ClientManager::Impl::registerSender(
228         const sp<IClientManager> &receiver,
229         ConnectionId senderId,
230         ConnectionId *receiverId) {
231     sp<IAccessor> accessor;
232     bool local = false;
233     {
234         std::lock_guard<std::mutex> lock(mActive.mMutex);
235         auto it = mActive.mClients.find(senderId);
236         if (it == mActive.mClients.end()) {
237             return ResultStatus::NOT_FOUND;
238         }
239         it->second->getAccessor(&accessor);
240         local = it->second->isLocal();
241     }
242     ResultStatus rs = ResultStatus::CRITICAL_ERROR;
243     if (accessor) {
244        Return<void> transResult = receiver->registerSender(
245                 accessor,
246                 [&rs, receiverId](
247                         ResultStatus status,
248                         int64_t connectionId) {
249                     rs = status;
250                     *receiverId = connectionId;
251                 });
252         if (!transResult.isOk()) {
253             return ResultStatus::CRITICAL_ERROR;
254         } else if (local && rs == ResultStatus::OK) {
255             sp<ConnectionDeathRecipient> recipient = Accessor::getConnectionDeathRecipient();
256             if (recipient)  {
257                 ALOGV("client death recipient registered %lld", (long long)*receiverId);
258                 bool added;
259                 uint64_t cookie = mRemoteClientCookies.getCookie(receiver, &added);
260                 recipient->addCookieToConnection(cookie, *receiverId);
261                 if (added) {
262                     Return<bool> transResult = receiver->linkToDeath(recipient, cookie);
263                 }
264             }
265         }
266     }
267     return rs;
268 }
269 
create(const std::shared_ptr<BufferPoolAllocator> & allocator,ConnectionId * pConnectionId)270 ResultStatus ClientManager::Impl::create(
271         const std::shared_ptr<BufferPoolAllocator> &allocator,
272         ConnectionId *pConnectionId) {
273     const sp<Accessor> accessor = new Accessor(allocator);
274     if (!accessor || !accessor->isValid()) {
275         return ResultStatus::CRITICAL_ERROR;
276     }
277     // TODO: observer is local. use direct call instead of hidl call.
278     std::shared_ptr<BufferPoolClient> client =
279             std::make_shared<BufferPoolClient>(accessor, mObserver);
280     if (!client || !client->isValid()) {
281         return ResultStatus::CRITICAL_ERROR;
282     }
283     // Since a new bufferpool is created, evict memories which are used by
284     // existing bufferpools and clients.
285     cleanUp(true);
286     {
287         // TODO: handle insert fail. (malloc fail)
288         std::lock_guard<std::mutex> lock(mCache.mMutex);
289         const std::weak_ptr<BufferPoolClient> wclient = client;
290         mCache.mClients.push_back(std::make_pair(accessor, wclient));
291         ConnectionId conId = client->getConnectionId();
292         mObserver->addClient(conId, wclient);
293         {
294             std::lock_guard<std::mutex> lock(mActive.mMutex);
295             mActive.mClients.insert(std::make_pair(conId, client));
296         }
297         *pConnectionId = conId;
298         ALOGV("create new connection %lld", (long long)*pConnectionId);
299     }
300     return ResultStatus::OK;
301 }
302 
close(ConnectionId connectionId)303 ResultStatus ClientManager::Impl::close(ConnectionId connectionId) {
304     std::unique_lock<std::mutex> lock1(mCache.mMutex);
305     std::unique_lock<std::mutex> lock2(mActive.mMutex);
306     auto it = mActive.mClients.find(connectionId);
307     if (it != mActive.mClients.end()) {
308         sp<IAccessor> accessor;
309         it->second->getAccessor(&accessor);
310         std::shared_ptr<BufferPoolClient> closing = it->second;
311         mActive.mClients.erase(connectionId);
312         for (auto cit = mCache.mClients.begin(); cit != mCache.mClients.end();) {
313             // clean up dead client caches
314             sp<IAccessor> cAccessor = cit->first.promote();
315             if (!cAccessor || (accessor && interfacesEqual(cAccessor, accessor))) {
316                 cit = mCache.mClients.erase(cit);
317             } else {
318                 cit++;
319             }
320         }
321         lock2.unlock();
322         lock1.unlock();
323         closing->flush();
324         return ResultStatus::OK;
325     }
326     return ResultStatus::NOT_FOUND;
327 }
328 
flush(ConnectionId connectionId)329 ResultStatus ClientManager::Impl::flush(ConnectionId connectionId) {
330     std::shared_ptr<BufferPoolClient> client;
331     {
332         std::lock_guard<std::mutex> lock(mActive.mMutex);
333         auto it = mActive.mClients.find(connectionId);
334         if (it == mActive.mClients.end()) {
335             return ResultStatus::NOT_FOUND;
336         }
337         client = it->second;
338     }
339     return client->flush();
340 }
341 
allocate(ConnectionId connectionId,const std::vector<uint8_t> & params,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)342 ResultStatus ClientManager::Impl::allocate(
343         ConnectionId connectionId, const std::vector<uint8_t> &params,
344         native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
345     std::shared_ptr<BufferPoolClient> client;
346     {
347         std::lock_guard<std::mutex> lock(mActive.mMutex);
348         auto it = mActive.mClients.find(connectionId);
349         if (it == mActive.mClients.end()) {
350             return ResultStatus::NOT_FOUND;
351         }
352         client = it->second;
353     }
354 #ifdef BUFFERPOOL_CLONE_HANDLES
355     native_handle_t *origHandle;
356     ResultStatus res = client->allocate(params, &origHandle, buffer);
357     if (res != ResultStatus::OK) {
358         return res;
359     }
360     *handle = native_handle_clone(origHandle);
361     if (handle == NULL) {
362         buffer->reset();
363         return ResultStatus::NO_MEMORY;
364     }
365     return ResultStatus::OK;
366 #else
367     return client->allocate(params, handle, buffer);
368 #endif
369 }
370 
receive(ConnectionId connectionId,TransactionId transactionId,BufferId bufferId,int64_t timestampUs,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)371 ResultStatus ClientManager::Impl::receive(
372         ConnectionId connectionId, TransactionId transactionId,
373         BufferId bufferId, int64_t timestampUs,
374         native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
375     std::shared_ptr<BufferPoolClient> client;
376     {
377         std::lock_guard<std::mutex> lock(mActive.mMutex);
378         auto it = mActive.mClients.find(connectionId);
379         if (it == mActive.mClients.end()) {
380             return ResultStatus::NOT_FOUND;
381         }
382         client = it->second;
383     }
384 #ifdef BUFFERPOOL_CLONE_HANDLES
385     native_handle_t *origHandle;
386     ResultStatus res = client->receive(
387             transactionId, bufferId, timestampUs, &origHandle, buffer);
388     if (res != ResultStatus::OK) {
389         return res;
390     }
391     *handle = native_handle_clone(origHandle);
392     if (handle == NULL) {
393         buffer->reset();
394         return ResultStatus::NO_MEMORY;
395     }
396     return ResultStatus::OK;
397 #else
398     return client->receive(transactionId, bufferId, timestampUs, handle, buffer);
399 #endif
400 }
401 
postSend(ConnectionId receiverId,const std::shared_ptr<BufferPoolData> & buffer,TransactionId * transactionId,int64_t * timestampUs)402 ResultStatus ClientManager::Impl::postSend(
403         ConnectionId receiverId, const std::shared_ptr<BufferPoolData> &buffer,
404         TransactionId *transactionId, int64_t *timestampUs) {
405     ConnectionId connectionId = buffer->mConnectionId;
406     std::shared_ptr<BufferPoolClient> client;
407     {
408         std::lock_guard<std::mutex> lock(mActive.mMutex);
409         auto it = mActive.mClients.find(connectionId);
410         if (it == mActive.mClients.end()) {
411             return ResultStatus::NOT_FOUND;
412         }
413         client = it->second;
414     }
415     return client->postSend(receiverId, buffer, transactionId, timestampUs);
416 }
417 
getAccessor(ConnectionId connectionId,sp<IAccessor> * accessor)418 ResultStatus ClientManager::Impl::getAccessor(
419         ConnectionId connectionId, sp<IAccessor> *accessor) {
420     std::shared_ptr<BufferPoolClient> client;
421     {
422         std::lock_guard<std::mutex> lock(mActive.mMutex);
423         auto it = mActive.mClients.find(connectionId);
424         if (it == mActive.mClients.end()) {
425             return ResultStatus::NOT_FOUND;
426         }
427         client = it->second;
428     }
429     return client->getAccessor(accessor);
430 }
431 
cleanUp(bool clearCache)432 void ClientManager::Impl::cleanUp(bool clearCache) {
433     int64_t now = getTimestampNow();
434     int64_t lastTransactionUs;
435     std::lock_guard<std::mutex> lock1(mCache.mMutex);
436     if (clearCache || mCache.mLastCleanUpUs + kCleanUpDurationUs < now) {
437         std::lock_guard<std::mutex> lock2(mActive.mMutex);
438         int cleaned = 0;
439         for (auto it = mActive.mClients.begin(); it != mActive.mClients.end();) {
440             if (!it->second->isActive(&lastTransactionUs, clearCache)) {
441                 if (lastTransactionUs + kClientTimeoutUs < now) {
442                     sp<IAccessor> accessor;
443                     it->second->getAccessor(&accessor);
444                     it = mActive.mClients.erase(it);
445                     ++cleaned;
446                     continue;
447                 }
448             }
449             ++it;
450         }
451         for (auto cit = mCache.mClients.begin(); cit != mCache.mClients.end();) {
452             // clean up dead client caches
453             sp<IAccessor> cAccessor = cit->first.promote();
454             if (!cAccessor) {
455                 cit = mCache.mClients.erase(cit);
456             } else {
457                 ++cit;
458             }
459         }
460         ALOGV("# of cleaned connections: %d", cleaned);
461         mCache.mLastCleanUpUs = now;
462     }
463 }
464 
465 // Methods from ::android::hardware::media::bufferpool::V2_0::IClientManager follow.
registerSender(const sp<::android::hardware::media::bufferpool::V2_0::IAccessor> & bufferPool,registerSender_cb _hidl_cb)466 Return<void> ClientManager::registerSender(const sp<::android::hardware::media::bufferpool::V2_0::IAccessor>& bufferPool, registerSender_cb _hidl_cb) {
467     if (mImpl) {
468         ConnectionId connectionId = -1;
469         ResultStatus status = mImpl->registerSender(bufferPool, &connectionId);
470         _hidl_cb(status, connectionId);
471     } else {
472         _hidl_cb(ResultStatus::CRITICAL_ERROR, -1);
473     }
474     return Void();
475 }
476 
477 // Methods for local use.
// The process-wide ClientManager instance; created lazily by getInstance().
sp<ClientManager> ClientManager::sInstance;
// Held by getInstance() while it checks and creates sInstance.
std::mutex ClientManager::sInstanceLock;
480 
getInstance()481 sp<ClientManager> ClientManager::getInstance() {
482     std::lock_guard<std::mutex> lock(sInstanceLock);
483     if (!sInstance) {
484         sInstance = new ClientManager();
485     }
486     Accessor::createInvalidator();
487     Accessor::createEvictor();
488     return sInstance;
489 }
490 
ClientManager()491 ClientManager::ClientManager() : mImpl(new Impl()) {}
492 
~ClientManager()493 ClientManager::~ClientManager() {
494 }
495 
create(const std::shared_ptr<BufferPoolAllocator> & allocator,ConnectionId * pConnectionId)496 ResultStatus ClientManager::create(
497         const std::shared_ptr<BufferPoolAllocator> &allocator,
498         ConnectionId *pConnectionId) {
499     if (mImpl) {
500         return mImpl->create(allocator, pConnectionId);
501     }
502     return ResultStatus::CRITICAL_ERROR;
503 }
504 
registerSender(const sp<IClientManager> & receiver,ConnectionId senderId,ConnectionId * receiverId)505 ResultStatus ClientManager::registerSender(
506         const sp<IClientManager> &receiver,
507         ConnectionId senderId,
508         ConnectionId *receiverId) {
509     if (mImpl) {
510         return mImpl->registerSender(receiver, senderId, receiverId);
511     }
512     return ResultStatus::CRITICAL_ERROR;
513 }
514 
close(ConnectionId connectionId)515 ResultStatus ClientManager::close(ConnectionId connectionId) {
516     if (mImpl) {
517         return mImpl->close(connectionId);
518     }
519     return ResultStatus::CRITICAL_ERROR;
520 }
521 
flush(ConnectionId connectionId)522 ResultStatus ClientManager::flush(ConnectionId connectionId) {
523     if (mImpl) {
524         return mImpl->flush(connectionId);
525     }
526     return ResultStatus::CRITICAL_ERROR;
527 }
528 
allocate(ConnectionId connectionId,const std::vector<uint8_t> & params,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)529 ResultStatus ClientManager::allocate(
530         ConnectionId connectionId, const std::vector<uint8_t> &params,
531         native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
532     if (mImpl) {
533         return mImpl->allocate(connectionId, params, handle, buffer);
534     }
535     return ResultStatus::CRITICAL_ERROR;
536 }
537 
receive(ConnectionId connectionId,TransactionId transactionId,BufferId bufferId,int64_t timestampUs,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)538 ResultStatus ClientManager::receive(
539         ConnectionId connectionId, TransactionId transactionId,
540         BufferId bufferId, int64_t timestampUs,
541         native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
542     if (mImpl) {
543         return mImpl->receive(connectionId, transactionId, bufferId,
544                               timestampUs, handle, buffer);
545     }
546     return ResultStatus::CRITICAL_ERROR;
547 }
548 
postSend(ConnectionId receiverId,const std::shared_ptr<BufferPoolData> & buffer,TransactionId * transactionId,int64_t * timestampUs)549 ResultStatus ClientManager::postSend(
550         ConnectionId receiverId, const std::shared_ptr<BufferPoolData> &buffer,
551         TransactionId *transactionId, int64_t* timestampUs) {
552     if (mImpl && buffer) {
553         return mImpl->postSend(receiverId, buffer, transactionId, timestampUs);
554     }
555     return ResultStatus::CRITICAL_ERROR;
556 }
557 
cleanUp()558 void ClientManager::cleanUp() {
559     if (mImpl) {
560         mImpl->cleanUp(true);
561     }
562 }
563 
564 }  // namespace implementation
565 }  // namespace V2_0
566 }  // namespace bufferpool
567 }  // namespace media
568 }  // namespace hardware
569 }  // namespace android
570