1 /*
2  * Copyright (C) 2010 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "ARTPConnection"
19 #include <utils/Log.h>
20 
21 #include "ARTPConnection.h"
22 #include "ARTPSource.h"
23 #include "ASessionDescription.h"
24 
25 #include <media/stagefright/foundation/ABuffer.h>
26 #include <media/stagefright/foundation/ADebug.h>
27 #include <media/stagefright/foundation/AMessage.h>
28 #include <media/stagefright/foundation/AString.h>
29 #include <media/stagefright/foundation/hexdump.h>
30 
31 #include <android/multinetwork.h>
32 
33 #include <arpa/inet.h>
34 #include <sys/socket.h>
35 
36 namespace android {
37 
38 static const size_t kMaxUDPSize = 1500;
39 
u16at(const uint8_t * data)40 static uint16_t u16at(const uint8_t *data) {
41     return data[0] << 8 | data[1];
42 }
43 
u32at(const uint8_t * data)44 static uint32_t u32at(const uint8_t *data) {
45     return u16at(data) << 16 | u16at(&data[2]);
46 }
47 
u64at(const uint8_t * data)48 static uint64_t u64at(const uint8_t *data) {
49     return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]);
50 }
51 
52 // static
53 const int64_t ARTPConnection::kSelectTimeoutUs = 1000LL;
54 
55 struct ARTPConnection::StreamInfo {
56     bool isIPv6;
57     int mRTPSocket;
58     int mRTCPSocket;
59     sp<ASessionDescription> mSessionDesc;
60     size_t mIndex;
61     sp<AMessage> mNotifyMsg;
62     KeyedVector<uint32_t, sp<ARTPSource> > mSources;
63 
64     int64_t mNumRTCPPacketsReceived;
65     int64_t mNumRTPPacketsReceived;
66     struct sockaddr_in mRemoteRTCPAddr;
67     struct sockaddr_in6 mRemoteRTCPAddr6;
68 
69     bool mIsInjected;
70 
71     // A place to save time when it polls
72     int64_t mLastPollTimeUs;
73     // RTCP Extension for CVO
74     int mCVOExtMap; // will be set to 0 if cvo is not negotiated in sdp
75 };
76 
ARTPConnection(uint32_t flags)77 ARTPConnection::ARTPConnection(uint32_t flags)
78     : mFlags(flags),
79       mPollEventPending(false),
80       mLastReceiverReportTimeUs(-1),
81       mLastBitrateReportTimeUs(-1),
82       mTargetBitrate(-1),
83       mStaticJitterTimeMs(kStaticJitterTimeMs) {
84 }
85 
~ARTPConnection()86 ARTPConnection::~ARTPConnection() {
87 }
88 
addStream(int rtpSocket,int rtcpSocket,const sp<ASessionDescription> & sessionDesc,size_t index,const sp<AMessage> & notify,bool injected)89 void ARTPConnection::addStream(
90         int rtpSocket, int rtcpSocket,
91         const sp<ASessionDescription> &sessionDesc,
92         size_t index,
93         const sp<AMessage> &notify,
94         bool injected) {
95     sp<AMessage> msg = new AMessage(kWhatAddStream, this);
96     msg->setInt32("rtp-socket", rtpSocket);
97     msg->setInt32("rtcp-socket", rtcpSocket);
98     msg->setObject("session-desc", sessionDesc);
99     msg->setSize("index", index);
100     msg->setMessage("notify", notify);
101     msg->setInt32("injected", injected);
102     msg->post();
103 }
104 
seekStream()105 void ARTPConnection::seekStream() {
106     sp<AMessage> msg = new AMessage(kWhatSeekStream, this);
107     msg->post();
108 }
109 
removeStream(int rtpSocket,int rtcpSocket)110 void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) {
111     sp<AMessage> msg = new AMessage(kWhatRemoveStream, this);
112     msg->setInt32("rtp-socket", rtpSocket);
113     msg->setInt32("rtcp-socket", rtcpSocket);
114     msg->post();
115 }
116 
bumpSocketBufferSize(int s)117 static void bumpSocketBufferSize(int s) {
118     int size = 256 * 1024;
119     CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0);
120 }
121 
122 // static
MakePortPair(int * rtpSocket,int * rtcpSocket,unsigned * rtpPort)123 void ARTPConnection::MakePortPair(
124         int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {
125     *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
126     CHECK_GE(*rtpSocket, 0);
127 
128     bumpSocketBufferSize(*rtpSocket);
129 
130     *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0);
131     CHECK_GE(*rtcpSocket, 0);
132 
133     bumpSocketBufferSize(*rtcpSocket);
134 
135     /* rand() * 1000 may overflow int type, use long long */
136     unsigned start = (unsigned)((rand()* 1000LL)/RAND_MAX) + 15550;
137     start &= ~1;
138 
139     for (unsigned port = start; port < 65535; port += 2) {
140         struct sockaddr_in addr;
141         memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
142         addr.sin_family = AF_INET;
143         addr.sin_addr.s_addr = htonl(INADDR_ANY);
144         addr.sin_port = htons(port);
145 
146         if (bind(*rtpSocket,
147                  (const struct sockaddr *)&addr, sizeof(addr)) < 0) {
148             continue;
149         }
150 
151         addr.sin_port = htons(port + 1);
152 
153         if (bind(*rtcpSocket,
154                  (const struct sockaddr *)&addr, sizeof(addr)) == 0) {
155             *rtpPort = port;
156             return;
157         } else {
158             // we should recreate a RTP socket to avoid bind other port in same RTP socket
159             close(*rtpSocket);
160 
161             *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
162             CHECK_GE(*rtpSocket, 0);
163             bumpSocketBufferSize(*rtpSocket);
164         }
165     }
166 
167     TRESPASS();
168 }
169 
170 // static
MakeRTPSocketPair(int * rtpSocket,int * rtcpSocket,const char * localIp,const char * remoteIp,unsigned localPort,unsigned remotePort,int64_t socketNetwork)171 void ARTPConnection::MakeRTPSocketPair(
172         int *rtpSocket, int *rtcpSocket, const char *localIp, const char *remoteIp,
173         unsigned localPort, unsigned remotePort, int64_t socketNetwork) {
174     bool isIPv6 = false;
175     if (strchr(localIp, ':') != NULL)
176         isIPv6 = true;
177 
178     *rtpSocket = socket(isIPv6 ? AF_INET6 : AF_INET, SOCK_DGRAM, 0);
179     CHECK_GE(*rtpSocket, 0);
180 
181     bumpSocketBufferSize(*rtpSocket);
182 
183     *rtcpSocket = socket(isIPv6 ? AF_INET6 : AF_INET, SOCK_DGRAM, 0);
184     CHECK_GE(*rtcpSocket, 0);
185 
186     if (socketNetwork != 0) {
187         ALOGD("trying to bind rtp socket(%d) to network(%llu).",
188                 *rtpSocket, (unsigned long long)socketNetwork);
189 
190         int result = android_setsocknetwork((net_handle_t)socketNetwork, *rtpSocket);
191         if (result != 0) {
192             ALOGW("failed(%d) to bind rtp socket(%d) to network(%llu)",
193                     result, *rtpSocket, (unsigned long long)socketNetwork);
194         }
195         result = android_setsocknetwork((net_handle_t)socketNetwork, *rtcpSocket);
196         if (result != 0) {
197             ALOGW("failed(%d) to bind rtcp socket(%d) to network(%llu)",
198                     result, *rtcpSocket, (unsigned long long)socketNetwork);
199         }
200     }
201 
202     bumpSocketBufferSize(*rtcpSocket);
203 
204     struct sockaddr *addr;
205     struct sockaddr_in addr4;
206     struct sockaddr_in6 addr6;
207 
208     if (isIPv6) {
209         addr = (struct sockaddr *)&addr6;
210         memset(&addr6, 0, sizeof(addr6));
211         addr6.sin6_family = AF_INET6;
212         inet_pton(AF_INET6, localIp, &addr6.sin6_addr);
213         addr6.sin6_port = htons((uint16_t)localPort);
214     } else {
215         addr = (struct sockaddr *)&addr4;
216         memset(&addr4, 0, sizeof(addr4));
217         addr4.sin_family = AF_INET;
218         addr4.sin_addr.s_addr = inet_addr(localIp);
219         addr4.sin_port = htons((uint16_t)localPort);
220     }
221 
222     int sockopt = 1;
223     setsockopt(*rtpSocket, SOL_SOCKET, SO_REUSEADDR, (int *)&sockopt, sizeof(sockopt));
224     setsockopt(*rtcpSocket, SOL_SOCKET, SO_REUSEADDR, (int *)&sockopt, sizeof(sockopt));
225 
226     int sizeSockSt = isIPv6 ? sizeof(addr6) : sizeof(addr4);
227 
228     if (bind(*rtpSocket, addr, sizeSockSt) == 0) {
229         ALOGI("rtp socket successfully binded. addr=%s:%d", localIp, localPort);
230     } else {
231         ALOGE("failed to bind rtp socket addr=%s:%d err=%s", localIp, localPort, strerror(errno));
232         return;
233     }
234 
235     if (isIPv6)
236         addr6.sin6_port = htons(localPort + 1);
237     else
238         addr4.sin_port = htons(localPort + 1);
239 
240     if (bind(*rtcpSocket, addr, sizeSockSt) == 0) {
241         ALOGI("rtcp socket successfully binded. addr=%s:%d", localIp, localPort + 1);
242     } else {
243         ALOGE("failed to bind rtcp socket addr=%s:%d err=%s", localIp,
244                 localPort + 1, strerror(errno));
245     }
246 
247     // Re uses addr variable as remote addr.
248     if (isIPv6) {
249         memset(&addr6, 0, sizeof(addr6));
250         addr6.sin6_family = AF_INET6;
251         inet_pton(AF_INET6, remoteIp, &addr6.sin6_addr);
252         addr6.sin6_port = htons((uint16_t)remotePort);
253     } else {
254         memset(&addr4, 0, sizeof(addr4));
255         addr4.sin_family = AF_INET;
256         addr4.sin_addr.s_addr = inet_addr(remoteIp);
257         addr4.sin_port = htons((uint16_t)remotePort);
258     }
259     if (connect(*rtpSocket, addr, sizeSockSt) == 0) {
260         ALOGI("rtp socket successfully connected to remote=%s:%d", remoteIp, remotePort);
261     } else {
262         ALOGE("failed to connect rtp socket to remote addr=%s:%d err=%s", remoteIp,
263                 remotePort, strerror(errno));
264         return;
265     }
266 
267     if (isIPv6)
268         addr6.sin6_port = htons(remotePort + 1);
269     else
270         addr4.sin_port = htons(remotePort + 1);
271 
272     if (connect(*rtcpSocket, addr, sizeSockSt) == 0) {
273         ALOGI("rtcp socket successfully connected to remote=%s:%d", remoteIp, remotePort + 1);
274     } else {
275         ALOGE("failed to connect rtcp socket addr=%s:%d err=%s", remoteIp,
276                 remotePort + 1, strerror(errno));
277         return;
278     }
279 }
280 
onMessageReceived(const sp<AMessage> & msg)281 void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
282     switch (msg->what()) {
283         case kWhatAddStream:
284         {
285             onAddStream(msg);
286             break;
287         }
288 
289         case kWhatSeekStream:
290         {
291             onSeekStream(msg);
292             break;
293         }
294 
295         case kWhatRemoveStream:
296         {
297             onRemoveStream(msg);
298             break;
299         }
300 
301         case kWhatPollStreams:
302         {
303             onPollStreams();
304             break;
305         }
306 
307         case kWhatAlarmStream:
308         {
309             onAlarmStream(msg);
310             break;
311         }
312 
313         case kWhatInjectPacket:
314         {
315             onInjectPacket(msg);
316             break;
317         }
318 
319         default:
320         {
321             TRESPASS();
322             break;
323         }
324     }
325 }
326 
onAddStream(const sp<AMessage> & msg)327 void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
328     mStreams.push_back(StreamInfo());
329     StreamInfo *info = &*--mStreams.end();
330 
331     int32_t s;
332     CHECK(msg->findInt32("rtp-socket", &s));
333     info->mRTPSocket = s;
334     CHECK(msg->findInt32("rtcp-socket", &s));
335     info->mRTCPSocket = s;
336 
337     int32_t injected;
338     CHECK(msg->findInt32("injected", &injected));
339 
340     info->mIsInjected = injected;
341 
342     sp<RefBase> obj;
343     CHECK(msg->findObject("session-desc", &obj));
344     info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
345 
346     CHECK(msg->findSize("index", &info->mIndex));
347     CHECK(msg->findMessage("notify", &info->mNotifyMsg));
348 
349     info->mNumRTCPPacketsReceived = 0;
350     info->mNumRTPPacketsReceived = 0;
351     memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));
352     memset(&info->mRemoteRTCPAddr6, 0, sizeof(info->mRemoteRTCPAddr6));
353 
354     sp<ASessionDescription> sessionDesc = info->mSessionDesc;
355     info->mCVOExtMap = 0;
356     for (size_t i = 1; i < sessionDesc->countTracks(); ++i) {
357         int32_t cvoExtMap;
358         if (sessionDesc->getCvoExtMap(i, &cvoExtMap)) {
359             info->mCVOExtMap = cvoExtMap;
360             ALOGI("urn:3gpp:video-orientation(cvo) found as extmap:%d", info->mCVOExtMap);
361         } else {
362             ALOGI("urn:3gpp:video-orientation(cvo) not found :%d", info->mCVOExtMap);
363         }
364     }
365 
366     if (!injected) {
367         postPollEvent();
368     }
369 }
370 
onSeekStream(const sp<AMessage> & msg)371 void ARTPConnection::onSeekStream(const sp<AMessage> &msg) {
372     (void)msg; // unused param as of now.
373     List<StreamInfo>::iterator it = mStreams.begin();
374     while (it != mStreams.end()) {
375         for (size_t i = 0; i < it->mSources.size(); ++i) {
376             sp<ARTPSource> source = it->mSources.valueAt(i);
377             source->timeReset();
378         }
379         ++it;
380     }
381 }
382 
onRemoveStream(const sp<AMessage> & msg)383 void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
384     int32_t rtpSocket, rtcpSocket;
385     CHECK(msg->findInt32("rtp-socket", &rtpSocket));
386     CHECK(msg->findInt32("rtcp-socket", &rtcpSocket));
387 
388     List<StreamInfo>::iterator it = mStreams.begin();
389     while (it != mStreams.end()
390            && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) {
391         ++it;
392     }
393 
394     if (it == mStreams.end()) {
395         return;
396     }
397 
398     mStreams.erase(it);
399 }
400 
postPollEvent()401 void ARTPConnection::postPollEvent() {
402     if (mPollEventPending) {
403         return;
404     }
405 
406     sp<AMessage> msg = new AMessage(kWhatPollStreams, this);
407     msg->post();
408 
409     mPollEventPending = true;
410 }
411 
onPollStreams()412 void ARTPConnection::onPollStreams() {
413     mPollEventPending = false;
414 
415     if (mStreams.empty()) {
416         return;
417     }
418 
419     struct timeval tv;
420     tv.tv_sec = 0;
421     tv.tv_usec = kSelectTimeoutUs;
422 
423     fd_set rs;
424     FD_ZERO(&rs);
425 
426     int maxSocket = -1;
427     for (List<StreamInfo>::iterator it = mStreams.begin();
428          it != mStreams.end(); ++it) {
429         if ((*it).mIsInjected) {
430             continue;
431         }
432 
433         FD_SET(it->mRTPSocket, &rs);
434         FD_SET(it->mRTCPSocket, &rs);
435 
436         if (it->mRTPSocket > maxSocket) {
437             maxSocket = it->mRTPSocket;
438         }
439         if (it->mRTCPSocket > maxSocket) {
440             maxSocket = it->mRTCPSocket;
441         }
442     }
443 
444     if (maxSocket == -1) {
445         return;
446     }
447 
448     int64_t nowUs = ALooper::GetNowUs();
449     int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);
450 
451     if (res > 0) {
452         List<StreamInfo>::iterator it = mStreams.begin();
453         while (it != mStreams.end()) {
454             if ((*it).mIsInjected) {
455                 ++it;
456                 continue;
457             }
458             it->mLastPollTimeUs = nowUs;
459 
460             status_t err = OK;
461             if (FD_ISSET(it->mRTPSocket, &rs)) {
462                 err = receive(&*it, true);
463             }
464             if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) {
465                 err = receive(&*it, false);
466             }
467 
468             if (err == -ECONNRESET) {
469                 // socket failure, this stream is dead, Jim.
470                 for (size_t i = 0; i < it->mSources.size(); ++i) {
471                     sp<AMessage> notify = it->mNotifyMsg->dup();
472                     notify->setInt32("rtcp-event", 1);
473                     notify->setInt32("payload-type", 400);
474                     notify->setInt32("feedback-type", 1);
475                     notify->setInt32("sender", it->mSources.valueAt(i)->getSelfID());
476                     notify->post();
477 
478                     ALOGW("failed to receive RTP/RTCP datagram.");
479                 }
480                 it = mStreams.erase(it);
481                 continue;
482             }
483 
484             // add NACK and FIR that needs to be sent immediately.
485             sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
486             for (size_t i = 0; i < it->mSources.size(); ++i) {
487                 buffer->setRange(0, 0);
488                 int cnt = it->mSources.valueAt(i)->addNACK(buffer);
489                 if (cnt > 0) {
490                     ALOGV("Send NACK for lost %d Packets", cnt);
491                     send(&*it, buffer);
492                 }
493 
494                 buffer->setRange(0, 0);
495                 it->mSources.valueAt(i)->addFIR(buffer);
496                 if (buffer->size() > 0) {
497                     ALOGD("Send FIR immediately for lost Packets");
498                     send(&*it, buffer);
499                 }
500 
501                 buffer->setRange(0, 0);
502                 it->mSources.valueAt(i)->addTMMBR(buffer, mTargetBitrate);
503                 mTargetBitrate = -1;
504                 if (buffer->size() > 0) {
505                     ALOGV("Sending TMMBR...");
506                     ssize_t n = send(&*it, buffer);
507 
508                     if (n != (ssize_t)buffer->size()) {
509                         ALOGW("failed to send RTCP TMMBR (%s).",
510                                 n >= 0 ? "connection gone" : strerror(errno));
511                         continue;
512                     }
513                 }
514             }
515 
516             ++it;
517         }
518     }
519 
520     checkRxBitrate(nowUs);
521 
522     if (mLastReceiverReportTimeUs <= 0
523             || mLastReceiverReportTimeUs + 5000000LL <= nowUs) {
524         sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
525         List<StreamInfo>::iterator it = mStreams.begin();
526         while (it != mStreams.end()) {
527             StreamInfo *s = &*it;
528 
529             if (s->mIsInjected) {
530                 ++it;
531                 continue;
532             }
533 
534             if (s->mNumRTCPPacketsReceived == 0) {
535                 // We have never received any RTCP packets on this stream,
536                 // we don't even know where to send a report.
537                 ++it;
538                 continue;
539             }
540 
541             buffer->setRange(0, 0);
542 
543             for (size_t i = 0; i < s->mSources.size(); ++i) {
544                 sp<ARTPSource> source = s->mSources.valueAt(i);
545 
546                 source->addReceiverReport(buffer);
547 
548                 if (mFlags & kRegularlyRequestFIR) {
549                     source->addFIR(buffer);
550                 }
551             }
552 
553             if (buffer->size() > 0) {
554                 ALOGV("Sending RR...");
555 
556                 ssize_t n = send(s, buffer);
557 
558                 if (n != (ssize_t)buffer->size()) {
559                     ALOGW("failed to send RTCP receiver report (%s).",
560                             n >= 0 ? "connection gone" : strerror(errno));
561                     ++it;
562                     continue;
563                 }
564 
565                 mLastReceiverReportTimeUs = nowUs;
566             }
567 
568             ++it;
569         }
570     }
571 
572     if (!mStreams.empty()) {
573         postPollEvent();
574     }
575 }
576 
onAlarmStream(const sp<AMessage> msg)577 void ARTPConnection::onAlarmStream(const sp<AMessage> msg) {
578     sp<ARTPSource> source = nullptr;
579     if (msg->findObject("source", (sp<android::RefBase>*)&source)) {
580         source->processRTPPacket();
581     }
582 }
583 
receive(StreamInfo * s,bool receiveRTP)584 status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
585     ALOGV("receiving %s", receiveRTP ? "RTP" : "RTCP");
586 
587     CHECK(!s->mIsInjected);
588 
589     sp<ABuffer> buffer = new ABuffer(65536);
590 
591     struct sockaddr *pRemoteRTCPAddr;
592     int sizeSockSt;
593     if (s->isIPv6) {
594         pRemoteRTCPAddr = (struct sockaddr *)&s->mRemoteRTCPAddr6;
595         sizeSockSt = sizeof(struct sockaddr_in6);
596     } else {
597         pRemoteRTCPAddr = (struct sockaddr *)&s->mRemoteRTCPAddr;
598         sizeSockSt = sizeof(struct sockaddr_in);
599     }
600     socklen_t remoteAddrLen =
601         (!receiveRTP && s->mNumRTCPPacketsReceived == 0)
602             ? sizeSockSt : 0;
603 
604     if (mFlags & kViLTEConnection) {
605         remoteAddrLen = 0;
606     }
607 
608     ssize_t nbytes;
609     do {
610         nbytes = recvfrom(
611             receiveRTP ? s->mRTPSocket : s->mRTCPSocket,
612             buffer->data(),
613             buffer->capacity(),
614             0,
615             remoteAddrLen > 0 ? pRemoteRTCPAddr : NULL,
616             remoteAddrLen > 0 ? &remoteAddrLen : NULL);
617         mCumulativeBytes += nbytes;
618     } while (nbytes < 0 && errno == EINTR);
619 
620     if (nbytes <= 0) {
621         ALOGW("failed to recv rtp packet. cause=%s", strerror(errno));
622         // ECONNREFUSED may happen in next recvfrom() calling if one of
623         // outgoing packet can not be delivered to remote by using sendto()
624         if (errno == ECONNREFUSED) {
625             return -ECONNREFUSED;
626         } else {
627             return -ECONNRESET;
628         }
629     }
630 
631     buffer->setRange(0, nbytes);
632 
633     // ALOGI("received %d bytes.", buffer->size());
634 
635     status_t err;
636     if (receiveRTP) {
637         err = parseRTP(s, buffer);
638     } else {
639         err = parseRTCP(s, buffer);
640     }
641 
642     return err;
643 }
644 
send(const StreamInfo * info,const sp<ABuffer> buffer)645 ssize_t ARTPConnection::send(const StreamInfo *info, const sp<ABuffer> buffer) {
646         struct sockaddr* pRemoteRTCPAddr;
647         int sizeSockSt;
648 
649         /* It seems this isIPv6 variable is useless.
650          * We should remove it to prevent confusion */
651         if (info->isIPv6) {
652             pRemoteRTCPAddr = (struct sockaddr *)&info->mRemoteRTCPAddr6;
653             sizeSockSt = sizeof(struct sockaddr_in6);
654         } else {
655             pRemoteRTCPAddr = (struct sockaddr *)&info->mRemoteRTCPAddr;
656             sizeSockSt = sizeof(struct sockaddr_in);
657         }
658 
659         if (mFlags & kViLTEConnection) {
660             ALOGV("ViLTE RTCP");
661             pRemoteRTCPAddr = NULL;
662             sizeSockSt = 0;
663         }
664 
665         ssize_t n;
666         do {
667             n = sendto(
668                     info->mRTCPSocket, buffer->data(), buffer->size(), 0,
669                     pRemoteRTCPAddr, sizeSockSt);
670         } while (n < 0 && errno == EINTR);
671 
672         if (n < 0) {
673             ALOGW("failed to send rtcp packet. cause=%s", strerror(errno));
674         }
675 
676         return n;
677 }
678 
parseRTP(StreamInfo * s,const sp<ABuffer> & buffer)679 status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {
680     size_t size = buffer->size();
681 
682     if (size < 12) {
683         // Too short to be a valid RTP header.
684         return -1;
685     }
686 
687     const uint8_t *data = buffer->data();
688 
689     if ((data[0] >> 6) != 2) {
690         // Unsupported version.
691         return -1;
692     }
693 
694     if ((data[1] & 0x7f) == 20 /* decimal */) {
695         // Unassigned payload type
696         return -1;
697     }
698 
699     if (data[0] & 0x20) {
700         // Padding present.
701 
702         size_t paddingLength = data[size - 1];
703 
704         if (paddingLength + 12 > size) {
705             // If we removed this much padding we'd end up with something
706             // that's too short to be a valid RTP header.
707             return -1;
708         }
709 
710         size -= paddingLength;
711     }
712 
713     int numCSRCs = data[0] & 0x0f;
714 
715     size_t payloadOffset = 12 + 4 * numCSRCs;
716 
717     if (size < payloadOffset) {
718         // Not enough data to fit the basic header and all the CSRC entries.
719         return -1;
720     }
721 
722     int32_t cvoDegrees = -1;
723     if (data[0] & 0x10) {
724         // Header eXtension present.
725 
726         if (size < payloadOffset + 4) {
727             // Not enough data to fit the basic header, all CSRC entries
728             // and the first 4 bytes of the extension header.
729 
730             return -1;
731         }
732 
733         const uint8_t *extensionData = &data[payloadOffset];
734 
735         size_t extensionLength =
736             (4 * (extensionData[2] << 8 | extensionData[3])) + 4;
737 
738         if (size < payloadOffset + extensionLength) {
739             return -1;
740         }
741 
742         parseRTPExt(s, (const uint8_t *)extensionData, extensionLength, &cvoDegrees);
743         payloadOffset += extensionLength;
744     }
745 
746     uint32_t srcId = u32at(&data[8]);
747 
748     sp<ARTPSource> source = findSource(s, srcId);
749 
750     uint32_t rtpTime = u32at(&data[4]);
751 
752     sp<AMessage> meta = buffer->meta();
753     meta->setInt32("ssrc", srcId);
754     meta->setInt32("rtp-time", rtpTime);
755     meta->setInt32("PT", data[1] & 0x7f);
756     meta->setInt32("M", data[1] >> 7);
757     if (cvoDegrees >= 0) {
758         meta->setInt32("cvo", cvoDegrees);
759     }
760 
761     int32_t seq = u16at(&data[2]);
762     buffer->setInt32Data(seq);
763     buffer->setRange(payloadOffset, size - payloadOffset);
764 
765     if (s->mNumRTPPacketsReceived++ == 0) {
766         sp<AMessage> notify = s->mNotifyMsg->dup();
767         notify->setInt32("first-rtp", true);
768         notify->setInt32("rtcp-event", 1);
769         notify->setInt32("payload-type", ARTPSource::RTP_FIRST_PACKET);
770         notify->setInt32("rtp-time", (int32_t)rtpTime);
771         notify->setInt32("rtp-seq-num", seq);
772         notify->setInt64("recv-time-us", ALooper::GetNowUs());
773         notify->post();
774 
775         ALOGD("send first-rtp event to upper layer");
776     }
777 
778     source->processRTPPacket(buffer);
779 
780     return OK;
781 }
782 
parseRTPExt(StreamInfo * s,const uint8_t * extHeader,size_t extLen,int32_t * cvoDegrees)783 status_t ARTPConnection::parseRTPExt(StreamInfo *s,
784         const uint8_t *extHeader, size_t extLen, int32_t *cvoDegrees) {
785     if (extLen < 4)
786         return -1;
787 
788     uint16_t header = (extHeader[0] << 8) | (extHeader[1]);
789     bool isOnebyteHeader = false;
790 
791     if (header == 0xBEDE) {
792         isOnebyteHeader = true;
793     } else if (header == 0x1000) {
794         ALOGW("parseRTPExt: two-byte header is not implemented yet");
795         return -1;
796     } else {
797         ALOGW("parseRTPExt: can not recognize header");
798         return -1;
799     }
800 
801     const uint8_t *extPayload = extHeader + 4;
802     extLen -= 4;
803     size_t offset = 0; //start from first payload of rtp extension.
804     // one-byte header parser
805     while (isOnebyteHeader && offset < extLen) {
806         uint8_t extmapId = extPayload[offset] >> 4;
807         uint8_t length = (extPayload[offset] & 0xF) + 1;
808         offset++;
809 
810         // padding case
811         if (extmapId == 0)
812             continue;
813 
814         uint8_t data[16]; // maximum length value
815         for (uint8_t j = 0; offset + j <= extLen && j < length; j++) {
816             data[j] = extPayload[offset + j];
817         }
818 
819         offset += length;
820 
821         if (extmapId == s->mCVOExtMap) {
822             *cvoDegrees = (int32_t)data[0];
823             return OK;
824         }
825     }
826 
827     return BAD_VALUE;
828 }
829 
parseRTCP(StreamInfo * s,const sp<ABuffer> & buffer)830 status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) {
831     if (s->mNumRTCPPacketsReceived++ == 0) {
832         sp<AMessage> notify = s->mNotifyMsg->dup();
833         notify->setInt32("first-rtcp", true);
834         notify->setInt32("rtcp-event", 1);
835         notify->setInt32("payload-type", ARTPSource::RTCP_FIRST_PACKET);
836         notify->setInt64("recv-time-us", ALooper::GetNowUs());
837         notify->post();
838 
839         ALOGD("send first-rtcp event to upper layer");
840     }
841 
842     const uint8_t *data = buffer->data();
843     size_t size = buffer->size();
844 
845     while (size > 0) {
846         if (size < 8) {
847             // Too short to be a valid RTCP header
848             return -1;
849         }
850 
851         if ((data[0] >> 6) != 2) {
852             // Unsupported version.
853             return -1;
854         }
855 
856         if (data[0] & 0x20) {
857             // Padding present.
858 
859             size_t paddingLength = data[size - 1];
860 
861             if (paddingLength + 12 > size) {
862                 // If we removed this much padding we'd end up with something
863                 // that's too short to be a valid RTP header.
864                 return -1;
865             }
866 
867             size -= paddingLength;
868         }
869 
870         size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
871 
872         if (size < headerLength) {
873             // Only received a partial packet?
874             return -1;
875         }
876 
877         switch (data[1]) {
878             case 200:
879             {
880                 parseSR(s, data, headerLength);
881                 break;
882             }
883 
884             case 201:  // RR
885             case 202:  // SDES
886             case 204:  // APP
887                 break;
888 
889             case 205:  // TSFB (transport layer specific feedback)
890                 parseTSFB(s, data, headerLength);
891                 break;
892             case 206:  // PSFB (payload specific feedback)
893                 // hexdump(data, headerLength);
894                 parsePSFB(s, data, headerLength);
895                 ALOGI("RTCP packet type %u of size %zu", (unsigned)data[1], headerLength);
896                 break;
897 
898             case 203:
899             {
900                 parseBYE(s, data, headerLength);
901                 break;
902             }
903 
904             default:
905             {
906                 ALOGW("Unknown RTCP packet type %u of size %zu",
907                      (unsigned)data[1], headerLength);
908                 break;
909             }
910         }
911 
912         data += headerLength;
913         size -= headerLength;
914     }
915 
916     return OK;
917 }
918 
parseBYE(StreamInfo * s,const uint8_t * data,size_t size)919 status_t ARTPConnection::parseBYE(
920         StreamInfo *s, const uint8_t *data, size_t size) {
921     size_t SC = data[0] & 0x3f;
922 
923     if (SC == 0 || size < (4 + SC * 4)) {
924         // Packet too short for the minimal BYE header.
925         return -1;
926     }
927 
928     uint32_t id = u32at(&data[4]);
929 
930     sp<ARTPSource> source = findSource(s, id);
931 
932     // Report a final stastics to be used for rtp data usage.
933     int64_t nowUs = ALooper::GetNowUs();
934     int32_t timeDiff = (nowUs - mLastBitrateReportTimeUs) / 1000000ll;
935     int32_t bitrate = mCumulativeBytes * 8 / timeDiff;
936     source->notifyPktInfo(bitrate, nowUs, true /* isRegular */);
937 
938     source->byeReceived();
939 
940     return OK;
941 }
942 
parseSR(StreamInfo * s,const uint8_t * data,size_t size)943 status_t ARTPConnection::parseSR(
944         StreamInfo *s, const uint8_t *data, size_t size) {
945     size_t RC = data[0] & 0x1f;
946 
947     if (size < (7 + RC * 6) * 4) {
948         // Packet too short for the minimal SR header.
949         return -1;
950     }
951 
952     uint32_t id = u32at(&data[4]);
953     uint64_t ntpTime = u64at(&data[8]);
954     uint32_t rtpTime = u32at(&data[16]);
955 
956 #if 0
957     ALOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f",
958          id,
959          rtpTime,
960          (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32));
961 #endif
962 
963     sp<ARTPSource> source = findSource(s, id);
964 
965     source->timeUpdate(rtpTime, ntpTime);
966 
967     return 0;
968 }
969 
parseTSFB(StreamInfo * s,const uint8_t * data,size_t size)970 status_t ARTPConnection::parseTSFB(
971         StreamInfo *s, const uint8_t *data, size_t size) {
972     if (size < 12) {
973         // broken packet
974         return -1;
975     }
976 
977     uint8_t msgType = data[0] & 0x1f;
978     uint32_t id = u32at(&data[4]);
979 
980     const uint8_t *ptr = &data[12];
981     size -= 12;
982 
983     using namespace std;
984     size_t FCISize;
985     switch(msgType) {
986         case 1:     // Generic NACK
987         {
988             FCISize = 4;
989             while (size >= FCISize) {
990                 uint16_t PID = u16at(&ptr[0]);  // lost packet RTP number
991                 uint16_t BLP = u16at(&ptr[2]);  // Bitmask of following Lost Packets
992 
993                 size -= FCISize;
994                 ptr += FCISize;
995 
996                 AString list_of_losts;
997                 list_of_losts.append(PID);
998                 for (int i=0 ; i<16 ; i++) {
999                     bool is_lost = BLP & (0x1 << i);
1000                     if (is_lost) {
1001                         list_of_losts.append(", ");
1002                         list_of_losts.append(PID + i);
1003                     }
1004                 }
1005                 ALOGI("Opponent losts packet of RTP %s", list_of_losts.c_str());
1006             }
1007             break;
1008         }
1009         case 3:     // TMMBR
1010         case 4:     // TMMBN
1011         {
1012             FCISize = 8;
1013             while (size >= FCISize) {
1014                 uint32_t MxTBR = u32at(&ptr[4]);
1015                 uint32_t MxTBRExp = MxTBR >> 26;
1016                 uint32_t MxTBRMantissa = (MxTBR >> 9) & 0x01FFFF;
1017                 uint32_t overhead = MxTBR & 0x01FF;
1018 
1019                 size -= FCISize;
1020                 ptr += FCISize;
1021 
1022                 uint32_t bitRate = (1 << MxTBRExp) * MxTBRMantissa;
1023 
1024                 if (msgType == 3)
1025                     ALOGI("Op -> UE Req Tx bitrate : %d X 2^%d = %d",
1026                         MxTBRMantissa, MxTBRExp, bitRate);
1027                 else if (msgType == 4)
1028                     ALOGI("OP -> UE Noti Rx bitrate : %d X 2^%d = %d",
1029                         MxTBRMantissa, MxTBRExp, bitRate);
1030 
1031                 sp<AMessage> notify = s->mNotifyMsg->dup();
1032                 notify->setInt32("rtcp-event", 1);
1033                 notify->setInt32("payload-type", 205);
1034                 notify->setInt32("feedback-type", msgType);
1035                 notify->setInt32("sender", id);
1036                 notify->setInt32("bit-rate", bitRate);
1037                 notify->post();
1038                 ALOGI("overhead : %d", overhead);
1039             }
1040             break;
1041         }
1042         default:
1043         {
1044             ALOGI("Not supported TSFB type %d", msgType);
1045             break;
1046         }
1047     }
1048 
1049     return 0;
1050 }
1051 
parsePSFB(StreamInfo * s,const uint8_t * data,size_t size)1052 status_t ARTPConnection::parsePSFB(
1053         StreamInfo *s, const uint8_t *data, size_t size) {
1054     if (size < 12) {
1055         // broken packet
1056         return -1;
1057     }
1058 
1059     uint8_t msgType = data[0] & 0x1f;
1060     uint32_t id = u32at(&data[4]);
1061 
1062     const uint8_t *ptr = &data[12];
1063     size -= 12;
1064 
1065     using namespace std;
1066     switch(msgType) {
1067         case 1:     // Picture Loss Indication (PLI)
1068         {
1069             if (size > 0) {
1070                 // PLI does not need parameters
1071                 break;
1072             };
1073             sp<AMessage> notify = s->mNotifyMsg->dup();
1074             notify->setInt32("rtcp-event", 1);
1075             notify->setInt32("payload-type", 206);
1076             notify->setInt32("feedback-type", msgType);
1077             notify->setInt32("sender", id);
1078             notify->post();
1079             ALOGI("PLI detected.");
1080             break;
1081         }
1082         case 4:     // Full Intra Request (FIR)
1083         {
1084             if (size < 4) {
1085                 break;
1086             }
1087             uint32_t requestedId = u32at(&ptr[0]);
1088             if (requestedId == (uint32_t)mSelfID) {
1089                 sp<AMessage> notify = s->mNotifyMsg->dup();
1090                 notify->setInt32("rtcp-event", 1);
1091                 notify->setInt32("payload-type", 206);
1092                 notify->setInt32("feedback-type", msgType);
1093                 notify->setInt32("sender", id);
1094                 notify->post();
1095                 ALOGI("FIR detected.");
1096             }
1097             break;
1098         }
1099         default:
1100         {
1101             ALOGI("Not supported PSFB type %d", msgType);
1102             break;
1103         }
1104     }
1105 
1106     return 0;
1107 }
findSource(StreamInfo * info,uint32_t srcId)1108 sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
1109     sp<ARTPSource> source;
1110     ssize_t index = info->mSources.indexOfKey(srcId);
1111     if (index < 0) {
1112         index = info->mSources.size();
1113 
1114         source = new ARTPSource(
1115                 srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg);
1116 
1117         if (mFlags & kViLTEConnection) {
1118             setStaticJitterTimeMs(50);
1119             source->setPeriodicFIR(false);
1120         }
1121 
1122         source->setSelfID(mSelfID);
1123         source->setStaticJitterTimeMs(mStaticJitterTimeMs);
1124         sp<AMessage> timer = new AMessage(kWhatAlarmStream, this);
1125         source->setJbTimer(timer);
1126         info->mSources.add(srcId, source);
1127     } else {
1128         source = info->mSources.valueAt(index);
1129     }
1130 
1131     return source;
1132 }
1133 
injectPacket(int index,const sp<ABuffer> & buffer)1134 void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) {
1135     sp<AMessage> msg = new AMessage(kWhatInjectPacket, this);
1136     msg->setInt32("index", index);
1137     msg->setBuffer("buffer", buffer);
1138     msg->post();
1139 }
1140 
setSelfID(const uint32_t selfID)1141 void ARTPConnection::setSelfID(const uint32_t selfID) {
1142     mSelfID = selfID;
1143 }
1144 
setStaticJitterTimeMs(const uint32_t jbTimeMs)1145 void ARTPConnection::setStaticJitterTimeMs(const uint32_t jbTimeMs) {
1146     mStaticJitterTimeMs = jbTimeMs;
1147 }
1148 
setTargetBitrate(int32_t targetBitrate)1149 void ARTPConnection::setTargetBitrate(int32_t targetBitrate) {
1150     mTargetBitrate = targetBitrate;
1151 }
1152 
checkRxBitrate(int64_t nowUs)1153 void ARTPConnection::checkRxBitrate(int64_t nowUs) {
1154     if (mLastBitrateReportTimeUs <= 0) {
1155         mCumulativeBytes = 0;
1156         mLastBitrateReportTimeUs = nowUs;
1157     }
1158     else if (mLastEarlyNotifyTimeUs + 100000ll <= nowUs) {
1159         int32_t timeDiff = (nowUs - mLastBitrateReportTimeUs) / 1000000ll;
1160         int32_t bitrate = mCumulativeBytes * 8 / timeDiff;
1161         mLastEarlyNotifyTimeUs = nowUs;
1162 
1163         List<StreamInfo>::iterator it = mStreams.begin();
1164         while (it != mStreams.end()) {
1165             StreamInfo *s = &*it;
1166             if (s->mIsInjected) {
1167                 ++it;
1168                 continue;
1169             }
1170             for (size_t i = 0; i < s->mSources.size(); ++i) {
1171                 sp<ARTPSource> source = s->mSources.valueAt(i);
1172                 if (source->isNeedToEarlyNotify()) {
1173                     source->notifyPktInfo(bitrate, nowUs, false /* isRegular */);
1174                     mLastEarlyNotifyTimeUs = nowUs + (1000000ll * 3600 * 24); // after 1 day
1175                 }
1176             }
1177             ++it;
1178         }
1179     }
1180     else if (mLastBitrateReportTimeUs + 1000000ll <= nowUs) {
1181         int32_t timeDiff = (nowUs - mLastBitrateReportTimeUs) / 1000000ll;
1182         int32_t bitrate = mCumulativeBytes * 8 / timeDiff;
1183         ALOGI("Actual Rx bitrate : %d bits/sec", bitrate);
1184 
1185         sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
1186         List<StreamInfo>::iterator it = mStreams.begin();
1187         while (it != mStreams.end()) {
1188             StreamInfo *s = &*it;
1189             if (s->mIsInjected) {
1190                 ++it;
1191                 continue;
1192             }
1193 
1194             if (s->mNumRTCPPacketsReceived == 0) {
1195                 // We have never received any RTCP packets on this stream,
1196                 // we don't even know where to send a report.
1197                 ++it;
1198                 continue;
1199             }
1200 
1201             buffer->setRange(0, 0);
1202             for (size_t i = 0; i < s->mSources.size(); ++i) {
1203                 sp<ARTPSource> source = s->mSources.valueAt(i);
1204                 source->notifyPktInfo(bitrate, nowUs, true /* isRegular */);
1205             }
1206             ++it;
1207         }
1208         mCumulativeBytes = 0;
1209         mLastBitrateReportTimeUs = nowUs;
1210         mLastEarlyNotifyTimeUs = nowUs;
1211     }
1212 }
onInjectPacket(const sp<AMessage> & msg)1213 void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) {
1214     int32_t index;
1215     CHECK(msg->findInt32("index", &index));
1216 
1217     sp<ABuffer> buffer;
1218     CHECK(msg->findBuffer("buffer", &buffer));
1219 
1220     List<StreamInfo>::iterator it = mStreams.begin();
1221     while (it != mStreams.end()
1222            && it->mRTPSocket != index && it->mRTCPSocket != index) {
1223         ++it;
1224     }
1225 
1226     if (it == mStreams.end()) {
1227         TRESPASS();
1228     }
1229 
1230     StreamInfo *s = &*it;
1231 
1232     if (it->mRTPSocket == index) {
1233         parseRTP(s, buffer);
1234     } else {
1235         parseRTCP(s, buffer);
1236     }
1237 }
1238 
1239 }  // namespace android
1240