1 /*
2  * Copyright (c) 2021, The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "TelemetryServer.h"
18 
19 #include "CarTelemetryImpl.h"
20 #include "RingBuffer.h"
21 
22 #include <aidl/android/automotive/telemetry/internal/CarDataInternal.h>
23 #include <android-base/logging.h>
24 
25 #include <inttypes.h>  // for PRIu64 and friends
26 
27 #include <memory>
28 
29 namespace android {
30 namespace automotive {
31 namespace telemetry {
32 
33 namespace {
34 
35 using ::aidl::android::automotive::telemetry::internal::CarDataInternal;
36 using ::aidl::android::automotive::telemetry::internal::ICarDataListener;
37 using ::aidl::android::frameworks::automotive::telemetry::CarData;
38 using ::android::base::Error;
39 using ::android::base::Result;
40 
41 enum {
42     MSG_PUSH_CAR_DATA_TO_LISTENER = 1,
43 };
44 
45 // If ICarDataListener cannot accept data, the next push should be delayed little bit to allow
46 // the listener to recover.
47 constexpr const std::chrono::seconds kPushCarDataFailureDelaySeconds = 1s;
48 }  // namespace
49 
TelemetryServer(LooperWrapper * looper,const std::chrono::nanoseconds & pushCarDataDelayNs,const int maxBufferSize)50 TelemetryServer::TelemetryServer(LooperWrapper* looper,
51                                  const std::chrono::nanoseconds& pushCarDataDelayNs,
52                                  const int maxBufferSize) :
53       mLooper(looper),
54       mPushCarDataDelayNs(pushCarDataDelayNs),
55       mRingBuffer(maxBufferSize),
56       mMessageHandler(new MessageHandlerImpl(this)) {}
57 
setListener(const std::shared_ptr<ICarDataListener> & listener)58 Result<void> TelemetryServer::setListener(const std::shared_ptr<ICarDataListener>& listener) {
59     const std::scoped_lock<std::mutex> lock(mMutex);
60     if (mCarDataListener != nullptr) {
61         return Error(::EX_ILLEGAL_STATE) << "ICarDataListener is already set";
62     }
63     mCarDataListener = listener;
64     mLooper->sendMessageDelayed(mPushCarDataDelayNs.count(), mMessageHandler,
65                                 MSG_PUSH_CAR_DATA_TO_LISTENER);
66     return {};
67 }
68 
clearListener()69 void TelemetryServer::clearListener() {
70     const std::scoped_lock<std::mutex> lock(mMutex);
71     if (mCarDataListener == nullptr) {
72         return;
73     }
74     mCarDataListener = nullptr;
75     mLooper->removeMessages(mMessageHandler, MSG_PUSH_CAR_DATA_TO_LISTENER);
76 }
77 
getListener()78 std::shared_ptr<ICarDataListener> TelemetryServer::getListener() {
79     const std::scoped_lock<std::mutex> lock(mMutex);
80     return mCarDataListener;
81 }
82 
dump(int fd)83 void TelemetryServer::dump(int fd) {
84     const std::scoped_lock<std::mutex> lock(mMutex);
85     dprintf(fd, "  TelemetryServer:\n");
86     mRingBuffer.dump(fd);
87 }
88 
89 // TODO(b/174608802): Add 10kb size check for the `dataList`, see the AIDL for the limits
writeCarData(const std::vector<CarData> & dataList,uid_t publisherUid)90 void TelemetryServer::writeCarData(const std::vector<CarData>& dataList, uid_t publisherUid) {
91     const std::scoped_lock<std::mutex> lock(mMutex);
92     bool bufferWasEmptyBefore = mRingBuffer.size() == 0;
93     for (auto&& data : dataList) {
94         mRingBuffer.push({.mId = data.id,
95                           .mContent = std::move(data.content),
96                           .mPublisherUid = publisherUid});
97     }
98     // If the mRingBuffer was not empty, the message is already scheduled. It prevents scheduling
99     // too many unnecessary idendical messages in the looper.
100     if (mCarDataListener != nullptr && bufferWasEmptyBefore && mRingBuffer.size() > 0) {
101         mLooper->sendMessageDelayed(mPushCarDataDelayNs.count(), mMessageHandler,
102                                     MSG_PUSH_CAR_DATA_TO_LISTENER);
103     }
104 }
105 
106 // Runs on the main thread.
pushCarDataToListeners()107 void TelemetryServer::pushCarDataToListeners() {
108     std::shared_ptr<ICarDataListener> listener;
109     std::vector<CarDataInternal> pendingCarDataInternals;
110     {
111         const std::scoped_lock<std::mutex> lock(mMutex);
112         // Remove extra messages.
113         mLooper->removeMessages(mMessageHandler, MSG_PUSH_CAR_DATA_TO_LISTENER);
114         if (mCarDataListener == nullptr || mRingBuffer.size() == 0) {
115             return;
116         }
117         listener = mCarDataListener;
118         // Push elements to pendingCarDataInternals in reverse order so we can send data
119         // from the back of the pendingCarDataInternals vector.
120         while (mRingBuffer.size() > 0) {
121             auto carData = std::move(mRingBuffer.popBack());
122             CarDataInternal data;
123             data.id = carData.mId;
124             data.content = std::move(carData.mContent);
125             pendingCarDataInternals.push_back(data);
126         }
127     }
128 
129     // Now the mutex is unlocked, we can do the heavy work.
130 
131     // TODO(b/186477983): send data in batch to improve performance, but careful sending too
132     //                    many data at once, as it could clog the Binder - it has <1MB limit.
133     while (!pendingCarDataInternals.empty()) {
134         auto status = listener->onCarDataReceived({pendingCarDataInternals.back()});
135         if (!status.isOk()) {
136             LOG(WARNING) << "Failed to push CarDataInternal, will try again: "
137                          << status.getMessage();
138             sleep(kPushCarDataFailureDelaySeconds.count());
139         } else {
140             pendingCarDataInternals.pop_back();
141         }
142     }
143 }
144 
MessageHandlerImpl(TelemetryServer * server)145 TelemetryServer::MessageHandlerImpl::MessageHandlerImpl(TelemetryServer* server) :
146       mTelemetryServer(server) {}
147 
handleMessage(const Message & message)148 void TelemetryServer::MessageHandlerImpl::handleMessage(const Message& message) {
149     switch (message.what) {
150         case MSG_PUSH_CAR_DATA_TO_LISTENER:
151             mTelemetryServer->pushCarDataToListeners();
152             break;
153         default:
154             LOG(WARNING) << "Unknown message: " << message.what;
155     }
156 }
157 
158 }  // namespace telemetry
159 }  // namespace automotive
160 }  // namespace android
161