1 /*
2 * Copyright (c) 2021, The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "TelemetryServer.h"
18
19 #include "CarTelemetryImpl.h"
20 #include "RingBuffer.h"
21
22 #include <aidl/android/automotive/telemetry/internal/CarDataInternal.h>
23 #include <android-base/logging.h>
24
25 #include <inttypes.h> // for PRIu64 and friends
26
27 #include <memory>
28
29 namespace android {
30 namespace automotive {
31 namespace telemetry {
32
33 namespace {
34
35 using ::aidl::android::automotive::telemetry::internal::CarDataInternal;
36 using ::aidl::android::automotive::telemetry::internal::ICarDataListener;
37 using ::aidl::android::frameworks::automotive::telemetry::CarData;
38 using ::android::base::Error;
39 using ::android::base::Result;
40
// Message codes used with the looper; the value travels in Message::what and is dispatched by
// MessageHandlerImpl::handleMessage().
enum { MSG_PUSH_CAR_DATA_TO_LISTENER = 1 };
44
// If ICarDataListener cannot accept data, the next push should be delayed a little bit to allow
// the listener to recover.
//
// Spelled with an explicit std::chrono::seconds constructor rather than the `1s` literal so this
// file does not depend on a std::chrono_literals using-directive being in scope.
constexpr std::chrono::seconds kPushCarDataFailureDelaySeconds{1};
48 } // namespace
49
TelemetryServer(LooperWrapper * looper,const std::chrono::nanoseconds & pushCarDataDelayNs,const int maxBufferSize)50 TelemetryServer::TelemetryServer(LooperWrapper* looper,
51 const std::chrono::nanoseconds& pushCarDataDelayNs,
52 const int maxBufferSize) :
53 mLooper(looper),
54 mPushCarDataDelayNs(pushCarDataDelayNs),
55 mRingBuffer(maxBufferSize),
56 mMessageHandler(new MessageHandlerImpl(this)) {}
57
setListener(const std::shared_ptr<ICarDataListener> & listener)58 Result<void> TelemetryServer::setListener(const std::shared_ptr<ICarDataListener>& listener) {
59 const std::scoped_lock<std::mutex> lock(mMutex);
60 if (mCarDataListener != nullptr) {
61 return Error(::EX_ILLEGAL_STATE) << "ICarDataListener is already set";
62 }
63 mCarDataListener = listener;
64 mLooper->sendMessageDelayed(mPushCarDataDelayNs.count(), mMessageHandler,
65 MSG_PUSH_CAR_DATA_TO_LISTENER);
66 return {};
67 }
68
clearListener()69 void TelemetryServer::clearListener() {
70 const std::scoped_lock<std::mutex> lock(mMutex);
71 if (mCarDataListener == nullptr) {
72 return;
73 }
74 mCarDataListener = nullptr;
75 mLooper->removeMessages(mMessageHandler, MSG_PUSH_CAR_DATA_TO_LISTENER);
76 }
77
getListener()78 std::shared_ptr<ICarDataListener> TelemetryServer::getListener() {
79 const std::scoped_lock<std::mutex> lock(mMutex);
80 return mCarDataListener;
81 }
82
dump(int fd)83 void TelemetryServer::dump(int fd) {
84 const std::scoped_lock<std::mutex> lock(mMutex);
85 dprintf(fd, " TelemetryServer:\n");
86 mRingBuffer.dump(fd);
87 }
88
89 // TODO(b/174608802): Add 10kb size check for the `dataList`, see the AIDL for the limits
writeCarData(const std::vector<CarData> & dataList,uid_t publisherUid)90 void TelemetryServer::writeCarData(const std::vector<CarData>& dataList, uid_t publisherUid) {
91 const std::scoped_lock<std::mutex> lock(mMutex);
92 bool bufferWasEmptyBefore = mRingBuffer.size() == 0;
93 for (auto&& data : dataList) {
94 mRingBuffer.push({.mId = data.id,
95 .mContent = std::move(data.content),
96 .mPublisherUid = publisherUid});
97 }
98 // If the mRingBuffer was not empty, the message is already scheduled. It prevents scheduling
99 // too many unnecessary idendical messages in the looper.
100 if (mCarDataListener != nullptr && bufferWasEmptyBefore && mRingBuffer.size() > 0) {
101 mLooper->sendMessageDelayed(mPushCarDataDelayNs.count(), mMessageHandler,
102 MSG_PUSH_CAR_DATA_TO_LISTENER);
103 }
104 }
105
106 // Runs on the main thread.
pushCarDataToListeners()107 void TelemetryServer::pushCarDataToListeners() {
108 std::shared_ptr<ICarDataListener> listener;
109 std::vector<CarDataInternal> pendingCarDataInternals;
110 {
111 const std::scoped_lock<std::mutex> lock(mMutex);
112 // Remove extra messages.
113 mLooper->removeMessages(mMessageHandler, MSG_PUSH_CAR_DATA_TO_LISTENER);
114 if (mCarDataListener == nullptr || mRingBuffer.size() == 0) {
115 return;
116 }
117 listener = mCarDataListener;
118 // Push elements to pendingCarDataInternals in reverse order so we can send data
119 // from the back of the pendingCarDataInternals vector.
120 while (mRingBuffer.size() > 0) {
121 auto carData = std::move(mRingBuffer.popBack());
122 CarDataInternal data;
123 data.id = carData.mId;
124 data.content = std::move(carData.mContent);
125 pendingCarDataInternals.push_back(data);
126 }
127 }
128
129 // Now the mutex is unlocked, we can do the heavy work.
130
131 // TODO(b/186477983): send data in batch to improve performance, but careful sending too
132 // many data at once, as it could clog the Binder - it has <1MB limit.
133 while (!pendingCarDataInternals.empty()) {
134 auto status = listener->onCarDataReceived({pendingCarDataInternals.back()});
135 if (!status.isOk()) {
136 LOG(WARNING) << "Failed to push CarDataInternal, will try again: "
137 << status.getMessage();
138 sleep(kPushCarDataFailureDelaySeconds.count());
139 } else {
140 pendingCarDataInternals.pop_back();
141 }
142 }
143 }
144
MessageHandlerImpl(TelemetryServer * server)145 TelemetryServer::MessageHandlerImpl::MessageHandlerImpl(TelemetryServer* server) :
146 mTelemetryServer(server) {}
147
handleMessage(const Message & message)148 void TelemetryServer::MessageHandlerImpl::handleMessage(const Message& message) {
149 switch (message.what) {
150 case MSG_PUSH_CAR_DATA_TO_LISTENER:
151 mTelemetryServer->pushCarDataToListeners();
152 break;
153 default:
154 LOG(WARNING) << "Unknown message: " << message.what;
155 }
156 }
157
158 } // namespace telemetry
159 } // namespace automotive
160 } // namespace android
161