1 /*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "ARTPConnection"
19 #include <utils/Log.h>
20
21 #include "ARTPConnection.h"
22 #include "ARTPSource.h"
23 #include "ASessionDescription.h"
24
25 #include <media/stagefright/foundation/ABuffer.h>
26 #include <media/stagefright/foundation/ADebug.h>
27 #include <media/stagefright/foundation/AMessage.h>
28 #include <media/stagefright/foundation/AString.h>
29 #include <media/stagefright/foundation/hexdump.h>
30
31 #include <android/multinetwork.h>
32
33 #include <arpa/inet.h>
34 #include <sys/socket.h>
35
36 namespace android {
37
// Capacity, in bytes, of the scratch buffers this file allocates for the
// RTCP packets it sends.
static constexpr size_t kMaxUDPSize = 1500;
39
// Reads the two bytes at |data| as one big-endian 16-bit value.
static uint16_t u16at(const uint8_t *data) {
    const int high = data[0] << 8;
    const int low = data[1];
    return high | low;
}
43
// Reads the four bytes at |data| as one big-endian 32-bit value.
//
// Every byte is widened to uint32_t before it is shifted. Shifting a promoted
// (signed) int left by 16 or more, as a 16-bit helper result would be, is
// signed overflow whenever the most significant byte has its top bit set.
static uint32_t u32at(const uint8_t *data) {
    return static_cast<uint32_t>(data[0]) << 24
            | static_cast<uint32_t>(data[1]) << 16
            | static_cast<uint32_t>(data[2]) << 8
            | static_cast<uint32_t>(data[3]);
}
47
u64at(const uint8_t * data)48 static uint64_t u64at(const uint8_t *data) {
49 return (uint64_t)(u32at(data)) << 32 | u32at(&data[4]);
50 }
51
// static
// Timeout, in microseconds, given to select() each time onPollStreams()
// checks the stream sockets for readable data.
const int64_t ARTPConnection::kSelectTimeoutUs = 1000LL;
54
// Bookkeeping kept for each stream registered through addStream(): its two
// sockets, the session description, the notification target and the sources
// seen on it.
struct ARTPConnection::StreamInfo {
    // Chooses between mRemoteRTCPAddr6 and mRemoteRTCPAddr in receive() and
    // send(). NOTE(review): no assignment to this field is visible in this
    // file -- confirm whether it is ever set to true.
    bool isIPv6;
    // Descriptors taken from the "rtp-socket" / "rtcp-socket" entries of the
    // add-stream message. They also identify the stream in onRemoveStream()
    // and onInjectPacket().
    int mRTPSocket;
    int mRTCPSocket;
    // Session description, track index and notification template; all three
    // are handed to every ARTPSource created for this stream in findSource().
    sp<ASessionDescription> mSessionDesc;
    size_t mIndex;
    sp<AMessage> mNotifyMsg;
    // Sources seen on this stream, keyed by the source id passed to
    // findSource().
    KeyedVector<uint32_t, sp<ARTPSource> > mSources;

    // Packet counters; the 0 -> 1 transition of each triggers the "first
    // packet" notification in parseRTCP() / parseRTP().
    int64_t mNumRTCPPacketsReceived;
    int64_t mNumRTPPacketsReceived;
    // Peer address captured by recvfrom() for the first RTCP packet and used
    // as the sendto() destination in send() (IPv4 and IPv6 variants).
    struct sockaddr_in mRemoteRTCPAddr;
    struct sockaddr_in6 mRemoteRTCPAddr6;

    // When true the sockets are never polled; packets are parsed only when
    // delivered through injectPacket().
    bool mIsInjected;

    // A place to save time when it polls
    int64_t mLastPollTimeUs;
    // RTCP Extension for CVO
    int mCVOExtMap; // will be set to 0 if cvo is not negotiated in sdp
};
76
// Stores the behaviour |flags| and sets both report timestamps and the
// pending target bitrate to -1.
//
// NOTE(review): mCumulativeBytes, mLastEarlyNotifyTimeUs and mSelfID are read
// elsewhere in this file but are not initialized here -- confirm that the
// class declaration (not visible from this file) gives them initial values.
ARTPConnection::ARTPConnection(uint32_t flags)
    : mFlags(flags),
      mPollEventPending(false),
      mLastReceiverReportTimeUs(-1),
      mLastBitrateReportTimeUs(-1),
      mTargetBitrate(-1),
      mStaticJitterTimeMs(kStaticJitterTimeMs) {
}
85
~ARTPConnection()86 ARTPConnection::~ARTPConnection() {
87 }
88
addStream(int rtpSocket,int rtcpSocket,const sp<ASessionDescription> & sessionDesc,size_t index,const sp<AMessage> & notify,bool injected)89 void ARTPConnection::addStream(
90 int rtpSocket, int rtcpSocket,
91 const sp<ASessionDescription> &sessionDesc,
92 size_t index,
93 const sp<AMessage> ¬ify,
94 bool injected) {
95 sp<AMessage> msg = new AMessage(kWhatAddStream, this);
96 msg->setInt32("rtp-socket", rtpSocket);
97 msg->setInt32("rtcp-socket", rtcpSocket);
98 msg->setObject("session-desc", sessionDesc);
99 msg->setSize("index", index);
100 msg->setMessage("notify", notify);
101 msg->setInt32("injected", injected);
102 msg->post();
103 }
104
seekStream()105 void ARTPConnection::seekStream() {
106 sp<AMessage> msg = new AMessage(kWhatSeekStream, this);
107 msg->post();
108 }
109
removeStream(int rtpSocket,int rtcpSocket)110 void ARTPConnection::removeStream(int rtpSocket, int rtcpSocket) {
111 sp<AMessage> msg = new AMessage(kWhatRemoveStream, this);
112 msg->setInt32("rtp-socket", rtpSocket);
113 msg->setInt32("rtcp-socket", rtcpSocket);
114 msg->post();
115 }
116
bumpSocketBufferSize(int s)117 static void bumpSocketBufferSize(int s) {
118 int size = 256 * 1024;
119 CHECK_EQ(setsockopt(s, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)), 0);
120 }
121
122 // static
MakePortPair(int * rtpSocket,int * rtcpSocket,unsigned * rtpPort)123 void ARTPConnection::MakePortPair(
124 int *rtpSocket, int *rtcpSocket, unsigned *rtpPort) {
125 *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
126 CHECK_GE(*rtpSocket, 0);
127
128 bumpSocketBufferSize(*rtpSocket);
129
130 *rtcpSocket = socket(AF_INET, SOCK_DGRAM, 0);
131 CHECK_GE(*rtcpSocket, 0);
132
133 bumpSocketBufferSize(*rtcpSocket);
134
135 /* rand() * 1000 may overflow int type, use long long */
136 unsigned start = (unsigned)((rand()* 1000LL)/RAND_MAX) + 15550;
137 start &= ~1;
138
139 for (unsigned port = start; port < 65535; port += 2) {
140 struct sockaddr_in addr;
141 memset(addr.sin_zero, 0, sizeof(addr.sin_zero));
142 addr.sin_family = AF_INET;
143 addr.sin_addr.s_addr = htonl(INADDR_ANY);
144 addr.sin_port = htons(port);
145
146 if (bind(*rtpSocket,
147 (const struct sockaddr *)&addr, sizeof(addr)) < 0) {
148 continue;
149 }
150
151 addr.sin_port = htons(port + 1);
152
153 if (bind(*rtcpSocket,
154 (const struct sockaddr *)&addr, sizeof(addr)) == 0) {
155 *rtpPort = port;
156 return;
157 } else {
158 // we should recreate a RTP socket to avoid bind other port in same RTP socket
159 close(*rtpSocket);
160
161 *rtpSocket = socket(AF_INET, SOCK_DGRAM, 0);
162 CHECK_GE(*rtpSocket, 0);
163 bumpSocketBufferSize(*rtpSocket);
164 }
165 }
166
167 TRESPASS();
168 }
169
170 // static
MakeRTPSocketPair(int * rtpSocket,int * rtcpSocket,const char * localIp,const char * remoteIp,unsigned localPort,unsigned remotePort,int64_t socketNetwork)171 void ARTPConnection::MakeRTPSocketPair(
172 int *rtpSocket, int *rtcpSocket, const char *localIp, const char *remoteIp,
173 unsigned localPort, unsigned remotePort, int64_t socketNetwork) {
174 bool isIPv6 = false;
175 if (strchr(localIp, ':') != NULL)
176 isIPv6 = true;
177
178 *rtpSocket = socket(isIPv6 ? AF_INET6 : AF_INET, SOCK_DGRAM, 0);
179 CHECK_GE(*rtpSocket, 0);
180
181 bumpSocketBufferSize(*rtpSocket);
182
183 *rtcpSocket = socket(isIPv6 ? AF_INET6 : AF_INET, SOCK_DGRAM, 0);
184 CHECK_GE(*rtcpSocket, 0);
185
186 if (socketNetwork != 0) {
187 ALOGD("trying to bind rtp socket(%d) to network(%llu).",
188 *rtpSocket, (unsigned long long)socketNetwork);
189
190 int result = android_setsocknetwork((net_handle_t)socketNetwork, *rtpSocket);
191 if (result != 0) {
192 ALOGW("failed(%d) to bind rtp socket(%d) to network(%llu)",
193 result, *rtpSocket, (unsigned long long)socketNetwork);
194 }
195 result = android_setsocknetwork((net_handle_t)socketNetwork, *rtcpSocket);
196 if (result != 0) {
197 ALOGW("failed(%d) to bind rtcp socket(%d) to network(%llu)",
198 result, *rtcpSocket, (unsigned long long)socketNetwork);
199 }
200 }
201
202 bumpSocketBufferSize(*rtcpSocket);
203
204 struct sockaddr *addr;
205 struct sockaddr_in addr4;
206 struct sockaddr_in6 addr6;
207
208 if (isIPv6) {
209 addr = (struct sockaddr *)&addr6;
210 memset(&addr6, 0, sizeof(addr6));
211 addr6.sin6_family = AF_INET6;
212 inet_pton(AF_INET6, localIp, &addr6.sin6_addr);
213 addr6.sin6_port = htons((uint16_t)localPort);
214 } else {
215 addr = (struct sockaddr *)&addr4;
216 memset(&addr4, 0, sizeof(addr4));
217 addr4.sin_family = AF_INET;
218 addr4.sin_addr.s_addr = inet_addr(localIp);
219 addr4.sin_port = htons((uint16_t)localPort);
220 }
221
222 int sockopt = 1;
223 setsockopt(*rtpSocket, SOL_SOCKET, SO_REUSEADDR, (int *)&sockopt, sizeof(sockopt));
224 setsockopt(*rtcpSocket, SOL_SOCKET, SO_REUSEADDR, (int *)&sockopt, sizeof(sockopt));
225
226 int sizeSockSt = isIPv6 ? sizeof(addr6) : sizeof(addr4);
227
228 if (bind(*rtpSocket, addr, sizeSockSt) == 0) {
229 ALOGI("rtp socket successfully binded. addr=%s:%d", localIp, localPort);
230 } else {
231 ALOGE("failed to bind rtp socket addr=%s:%d err=%s", localIp, localPort, strerror(errno));
232 return;
233 }
234
235 if (isIPv6)
236 addr6.sin6_port = htons(localPort + 1);
237 else
238 addr4.sin_port = htons(localPort + 1);
239
240 if (bind(*rtcpSocket, addr, sizeSockSt) == 0) {
241 ALOGI("rtcp socket successfully binded. addr=%s:%d", localIp, localPort + 1);
242 } else {
243 ALOGE("failed to bind rtcp socket addr=%s:%d err=%s", localIp,
244 localPort + 1, strerror(errno));
245 }
246
247 // Re uses addr variable as remote addr.
248 if (isIPv6) {
249 memset(&addr6, 0, sizeof(addr6));
250 addr6.sin6_family = AF_INET6;
251 inet_pton(AF_INET6, remoteIp, &addr6.sin6_addr);
252 addr6.sin6_port = htons((uint16_t)remotePort);
253 } else {
254 memset(&addr4, 0, sizeof(addr4));
255 addr4.sin_family = AF_INET;
256 addr4.sin_addr.s_addr = inet_addr(remoteIp);
257 addr4.sin_port = htons((uint16_t)remotePort);
258 }
259 if (connect(*rtpSocket, addr, sizeSockSt) == 0) {
260 ALOGI("rtp socket successfully connected to remote=%s:%d", remoteIp, remotePort);
261 } else {
262 ALOGE("failed to connect rtp socket to remote addr=%s:%d err=%s", remoteIp,
263 remotePort, strerror(errno));
264 return;
265 }
266
267 if (isIPv6)
268 addr6.sin6_port = htons(remotePort + 1);
269 else
270 addr4.sin_port = htons(remotePort + 1);
271
272 if (connect(*rtcpSocket, addr, sizeSockSt) == 0) {
273 ALOGI("rtcp socket successfully connected to remote=%s:%d", remoteIp, remotePort + 1);
274 } else {
275 ALOGE("failed to connect rtcp socket addr=%s:%d err=%s", remoteIp,
276 remotePort + 1, strerror(errno));
277 return;
278 }
279 }
280
onMessageReceived(const sp<AMessage> & msg)281 void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
282 switch (msg->what()) {
283 case kWhatAddStream:
284 {
285 onAddStream(msg);
286 break;
287 }
288
289 case kWhatSeekStream:
290 {
291 onSeekStream(msg);
292 break;
293 }
294
295 case kWhatRemoveStream:
296 {
297 onRemoveStream(msg);
298 break;
299 }
300
301 case kWhatPollStreams:
302 {
303 onPollStreams();
304 break;
305 }
306
307 case kWhatAlarmStream:
308 {
309 onAlarmStream(msg);
310 break;
311 }
312
313 case kWhatInjectPacket:
314 {
315 onInjectPacket(msg);
316 break;
317 }
318
319 default:
320 {
321 TRESPASS();
322 break;
323 }
324 }
325 }
326
onAddStream(const sp<AMessage> & msg)327 void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
328 mStreams.push_back(StreamInfo());
329 StreamInfo *info = &*--mStreams.end();
330
331 int32_t s;
332 CHECK(msg->findInt32("rtp-socket", &s));
333 info->mRTPSocket = s;
334 CHECK(msg->findInt32("rtcp-socket", &s));
335 info->mRTCPSocket = s;
336
337 int32_t injected;
338 CHECK(msg->findInt32("injected", &injected));
339
340 info->mIsInjected = injected;
341
342 sp<RefBase> obj;
343 CHECK(msg->findObject("session-desc", &obj));
344 info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
345
346 CHECK(msg->findSize("index", &info->mIndex));
347 CHECK(msg->findMessage("notify", &info->mNotifyMsg));
348
349 info->mNumRTCPPacketsReceived = 0;
350 info->mNumRTPPacketsReceived = 0;
351 memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));
352 memset(&info->mRemoteRTCPAddr6, 0, sizeof(info->mRemoteRTCPAddr6));
353
354 sp<ASessionDescription> sessionDesc = info->mSessionDesc;
355 info->mCVOExtMap = 0;
356 for (size_t i = 1; i < sessionDesc->countTracks(); ++i) {
357 int32_t cvoExtMap;
358 if (sessionDesc->getCvoExtMap(i, &cvoExtMap)) {
359 info->mCVOExtMap = cvoExtMap;
360 ALOGI("urn:3gpp:video-orientation(cvo) found as extmap:%d", info->mCVOExtMap);
361 } else {
362 ALOGI("urn:3gpp:video-orientation(cvo) not found :%d", info->mCVOExtMap);
363 }
364 }
365
366 if (!injected) {
367 postPollEvent();
368 }
369 }
370
onSeekStream(const sp<AMessage> & msg)371 void ARTPConnection::onSeekStream(const sp<AMessage> &msg) {
372 (void)msg; // unused param as of now.
373 List<StreamInfo>::iterator it = mStreams.begin();
374 while (it != mStreams.end()) {
375 for (size_t i = 0; i < it->mSources.size(); ++i) {
376 sp<ARTPSource> source = it->mSources.valueAt(i);
377 source->timeReset();
378 }
379 ++it;
380 }
381 }
382
onRemoveStream(const sp<AMessage> & msg)383 void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
384 int32_t rtpSocket, rtcpSocket;
385 CHECK(msg->findInt32("rtp-socket", &rtpSocket));
386 CHECK(msg->findInt32("rtcp-socket", &rtcpSocket));
387
388 List<StreamInfo>::iterator it = mStreams.begin();
389 while (it != mStreams.end()
390 && (it->mRTPSocket != rtpSocket || it->mRTCPSocket != rtcpSocket)) {
391 ++it;
392 }
393
394 if (it == mStreams.end()) {
395 return;
396 }
397
398 mStreams.erase(it);
399 }
400
postPollEvent()401 void ARTPConnection::postPollEvent() {
402 if (mPollEventPending) {
403 return;
404 }
405
406 sp<AMessage> msg = new AMessage(kWhatPollStreams, this);
407 msg->post();
408
409 mPollEventPending = true;
410 }
411
// Handles kWhatPollStreams. One pass does the following:
//   1. select() on the RTP and RTCP sockets of all non-injected streams,
//      waiting at most kSelectTimeoutUs;
//   2. reads and parses whatever is readable, dropping a stream whose
//      receive() reports -ECONNRESET;
//   3. sends any NACK, FIR and TMMBR feedback the sources produce;
//   4. calls checkRxBitrate();
//   5. sends receiver reports when none has been sent yet or the last one is
//      at least five seconds old;
//   6. re-posts itself while streams remain.
void ARTPConnection::onPollStreams() {
    mPollEventPending = false;

    if (mStreams.empty()) {
        return;
    }

    struct timeval tv;
    tv.tv_sec = 0;
    tv.tv_usec = kSelectTimeoutUs;

    fd_set rs;
    FD_ZERO(&rs);

    // Collect the sockets of every stream that is read from the network.
    int maxSocket = -1;
    for (List<StreamInfo>::iterator it = mStreams.begin();
         it != mStreams.end(); ++it) {
        if ((*it).mIsInjected) {
            continue;
        }

        FD_SET(it->mRTPSocket, &rs);
        FD_SET(it->mRTCPSocket, &rs);

        if (it->mRTPSocket > maxSocket) {
            maxSocket = it->mRTPSocket;
        }
        if (it->mRTCPSocket > maxSocket) {
            maxSocket = it->mRTCPSocket;
        }
    }

    // Only injected streams exist: nothing to poll, and no further poll event
    // is posted from here.
    if (maxSocket == -1) {
        return;
    }

    int64_t nowUs = ALooper::GetNowUs();
    int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);

    if (res > 0) {
        List<StreamInfo>::iterator it = mStreams.begin();
        while (it != mStreams.end()) {
            if ((*it).mIsInjected) {
                ++it;
                continue;
            }
            it->mLastPollTimeUs = nowUs;

            // RTP is read first; RTCP is read only if the RTP read, when
            // there was one, returned OK.
            status_t err = OK;
            if (FD_ISSET(it->mRTPSocket, &rs)) {
                err = receive(&*it, true);
            }
            if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) {
                err = receive(&*it, false);
            }

            if (err == -ECONNRESET) {
                // socket failure, this stream is dead, Jim.
                // One notification is posted per known source before the
                // stream is removed.
                for (size_t i = 0; i < it->mSources.size(); ++i) {
                    sp<AMessage> notify = it->mNotifyMsg->dup();
                    notify->setInt32("rtcp-event", 1);
                    notify->setInt32("payload-type", 400);
                    notify->setInt32("feedback-type", 1);
                    notify->setInt32("sender", it->mSources.valueAt(i)->getSelfID());
                    notify->post();

                    ALOGW("failed to receive RTP/RTCP datagram.");
                }
                // erase() returns the iterator that follows the removed
                // stream, so the loop must not advance again.
                it = mStreams.erase(it);
                continue;
            }

            // add NACK and FIR that needs to be sent immediately.
            // A single scratch buffer is reset and reused for each message.
            sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
            for (size_t i = 0; i < it->mSources.size(); ++i) {
                buffer->setRange(0, 0);
                int cnt = it->mSources.valueAt(i)->addNACK(buffer);
                if (cnt > 0) {
                    ALOGV("Send NACK for lost %d Packets", cnt);
                    send(&*it, buffer);
                }

                buffer->setRange(0, 0);
                it->mSources.valueAt(i)->addFIR(buffer);
                if (buffer->size() > 0) {
                    ALOGD("Send FIR immediately for lost Packets");
                    send(&*it, buffer);
                }

                buffer->setRange(0, 0);
                it->mSources.valueAt(i)->addTMMBR(buffer, mTargetBitrate);
                // The pending target bitrate is cleared after the first
                // source has been offered it.
                mTargetBitrate = -1;
                if (buffer->size() > 0) {
                    ALOGV("Sending TMMBR...");
                    ssize_t n = send(&*it, buffer);

                    if (n != (ssize_t)buffer->size()) {
                        ALOGW("failed to send RTCP TMMBR (%s).",
                                n >= 0 ? "connection gone" : strerror(errno));
                        // Continues with the next source of this stream.
                        continue;
                    }
                }
            }

            ++it;
        }
    }

    checkRxBitrate(nowUs);

    // Receiver reports: sent when none has been sent yet or the last one is
    // at least 5 seconds old. mLastReceiverReportTimeUs is one timestamp
    // shared by all streams.
    if (mLastReceiverReportTimeUs <= 0
            || mLastReceiverReportTimeUs + 5000000LL <= nowUs) {
        sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
        List<StreamInfo>::iterator it = mStreams.begin();
        while (it != mStreams.end()) {
            StreamInfo *s = &*it;

            if (s->mIsInjected) {
                ++it;
                continue;
            }

            if (s->mNumRTCPPacketsReceived == 0) {
                // We have never received any RTCP packets on this stream,
                // we don't even know where to send a report.
                ++it;
                continue;
            }

            buffer->setRange(0, 0);

            // All sources of the stream append to the same buffer, which is
            // sent as a single datagram.
            for (size_t i = 0; i < s->mSources.size(); ++i) {
                sp<ARTPSource> source = s->mSources.valueAt(i);

                source->addReceiverReport(buffer);

                if (mFlags & kRegularlyRequestFIR) {
                    source->addFIR(buffer);
                }
            }

            if (buffer->size() > 0) {
                ALOGV("Sending RR...");

                ssize_t n = send(s, buffer);

                if (n != (ssize_t)buffer->size()) {
                    ALOGW("failed to send RTCP receiver report (%s).",
                            n >= 0 ? "connection gone" : strerror(errno));
                    // The timestamp is left untouched after a failed send.
                    ++it;
                    continue;
                }

                mLastReceiverReportTimeUs = nowUs;
            }

            ++it;
        }
    }

    // Keep polling for as long as at least one stream is registered.
    if (!mStreams.empty()) {
        postPollEvent();
    }
}
576
onAlarmStream(const sp<AMessage> msg)577 void ARTPConnection::onAlarmStream(const sp<AMessage> msg) {
578 sp<ARTPSource> source = nullptr;
579 if (msg->findObject("source", (sp<android::RefBase>*)&source)) {
580 source->processRTPPacket();
581 }
582 }
583
receive(StreamInfo * s,bool receiveRTP)584 status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
585 ALOGV("receiving %s", receiveRTP ? "RTP" : "RTCP");
586
587 CHECK(!s->mIsInjected);
588
589 sp<ABuffer> buffer = new ABuffer(65536);
590
591 struct sockaddr *pRemoteRTCPAddr;
592 int sizeSockSt;
593 if (s->isIPv6) {
594 pRemoteRTCPAddr = (struct sockaddr *)&s->mRemoteRTCPAddr6;
595 sizeSockSt = sizeof(struct sockaddr_in6);
596 } else {
597 pRemoteRTCPAddr = (struct sockaddr *)&s->mRemoteRTCPAddr;
598 sizeSockSt = sizeof(struct sockaddr_in);
599 }
600 socklen_t remoteAddrLen =
601 (!receiveRTP && s->mNumRTCPPacketsReceived == 0)
602 ? sizeSockSt : 0;
603
604 if (mFlags & kViLTEConnection) {
605 remoteAddrLen = 0;
606 }
607
608 ssize_t nbytes;
609 do {
610 nbytes = recvfrom(
611 receiveRTP ? s->mRTPSocket : s->mRTCPSocket,
612 buffer->data(),
613 buffer->capacity(),
614 0,
615 remoteAddrLen > 0 ? pRemoteRTCPAddr : NULL,
616 remoteAddrLen > 0 ? &remoteAddrLen : NULL);
617 mCumulativeBytes += nbytes;
618 } while (nbytes < 0 && errno == EINTR);
619
620 if (nbytes <= 0) {
621 ALOGW("failed to recv rtp packet. cause=%s", strerror(errno));
622 // ECONNREFUSED may happen in next recvfrom() calling if one of
623 // outgoing packet can not be delivered to remote by using sendto()
624 if (errno == ECONNREFUSED) {
625 return -ECONNREFUSED;
626 } else {
627 return -ECONNRESET;
628 }
629 }
630
631 buffer->setRange(0, nbytes);
632
633 // ALOGI("received %d bytes.", buffer->size());
634
635 status_t err;
636 if (receiveRTP) {
637 err = parseRTP(s, buffer);
638 } else {
639 err = parseRTCP(s, buffer);
640 }
641
642 return err;
643 }
644
send(const StreamInfo * info,const sp<ABuffer> buffer)645 ssize_t ARTPConnection::send(const StreamInfo *info, const sp<ABuffer> buffer) {
646 struct sockaddr* pRemoteRTCPAddr;
647 int sizeSockSt;
648
649 /* It seems this isIPv6 variable is useless.
650 * We should remove it to prevent confusion */
651 if (info->isIPv6) {
652 pRemoteRTCPAddr = (struct sockaddr *)&info->mRemoteRTCPAddr6;
653 sizeSockSt = sizeof(struct sockaddr_in6);
654 } else {
655 pRemoteRTCPAddr = (struct sockaddr *)&info->mRemoteRTCPAddr;
656 sizeSockSt = sizeof(struct sockaddr_in);
657 }
658
659 if (mFlags & kViLTEConnection) {
660 ALOGV("ViLTE RTCP");
661 pRemoteRTCPAddr = NULL;
662 sizeSockSt = 0;
663 }
664
665 ssize_t n;
666 do {
667 n = sendto(
668 info->mRTCPSocket, buffer->data(), buffer->size(), 0,
669 pRemoteRTCPAddr, sizeSockSt);
670 } while (n < 0 && errno == EINTR);
671
672 if (n < 0) {
673 ALOGW("failed to send rtcp packet. cause=%s", strerror(errno));
674 }
675
676 return n;
677 }
678
parseRTP(StreamInfo * s,const sp<ABuffer> & buffer)679 status_t ARTPConnection::parseRTP(StreamInfo *s, const sp<ABuffer> &buffer) {
680 size_t size = buffer->size();
681
682 if (size < 12) {
683 // Too short to be a valid RTP header.
684 return -1;
685 }
686
687 const uint8_t *data = buffer->data();
688
689 if ((data[0] >> 6) != 2) {
690 // Unsupported version.
691 return -1;
692 }
693
694 if ((data[1] & 0x7f) == 20 /* decimal */) {
695 // Unassigned payload type
696 return -1;
697 }
698
699 if (data[0] & 0x20) {
700 // Padding present.
701
702 size_t paddingLength = data[size - 1];
703
704 if (paddingLength + 12 > size) {
705 // If we removed this much padding we'd end up with something
706 // that's too short to be a valid RTP header.
707 return -1;
708 }
709
710 size -= paddingLength;
711 }
712
713 int numCSRCs = data[0] & 0x0f;
714
715 size_t payloadOffset = 12 + 4 * numCSRCs;
716
717 if (size < payloadOffset) {
718 // Not enough data to fit the basic header and all the CSRC entries.
719 return -1;
720 }
721
722 int32_t cvoDegrees = -1;
723 if (data[0] & 0x10) {
724 // Header eXtension present.
725
726 if (size < payloadOffset + 4) {
727 // Not enough data to fit the basic header, all CSRC entries
728 // and the first 4 bytes of the extension header.
729
730 return -1;
731 }
732
733 const uint8_t *extensionData = &data[payloadOffset];
734
735 size_t extensionLength =
736 (4 * (extensionData[2] << 8 | extensionData[3])) + 4;
737
738 if (size < payloadOffset + extensionLength) {
739 return -1;
740 }
741
742 parseRTPExt(s, (const uint8_t *)extensionData, extensionLength, &cvoDegrees);
743 payloadOffset += extensionLength;
744 }
745
746 uint32_t srcId = u32at(&data[8]);
747
748 sp<ARTPSource> source = findSource(s, srcId);
749
750 uint32_t rtpTime = u32at(&data[4]);
751
752 sp<AMessage> meta = buffer->meta();
753 meta->setInt32("ssrc", srcId);
754 meta->setInt32("rtp-time", rtpTime);
755 meta->setInt32("PT", data[1] & 0x7f);
756 meta->setInt32("M", data[1] >> 7);
757 if (cvoDegrees >= 0) {
758 meta->setInt32("cvo", cvoDegrees);
759 }
760
761 int32_t seq = u16at(&data[2]);
762 buffer->setInt32Data(seq);
763 buffer->setRange(payloadOffset, size - payloadOffset);
764
765 if (s->mNumRTPPacketsReceived++ == 0) {
766 sp<AMessage> notify = s->mNotifyMsg->dup();
767 notify->setInt32("first-rtp", true);
768 notify->setInt32("rtcp-event", 1);
769 notify->setInt32("payload-type", ARTPSource::RTP_FIRST_PACKET);
770 notify->setInt32("rtp-time", (int32_t)rtpTime);
771 notify->setInt32("rtp-seq-num", seq);
772 notify->setInt64("recv-time-us", ALooper::GetNowUs());
773 notify->post();
774
775 ALOGD("send first-rtp event to upper layer");
776 }
777
778 source->processRTPPacket(buffer);
779
780 return OK;
781 }
782
parseRTPExt(StreamInfo * s,const uint8_t * extHeader,size_t extLen,int32_t * cvoDegrees)783 status_t ARTPConnection::parseRTPExt(StreamInfo *s,
784 const uint8_t *extHeader, size_t extLen, int32_t *cvoDegrees) {
785 if (extLen < 4)
786 return -1;
787
788 uint16_t header = (extHeader[0] << 8) | (extHeader[1]);
789 bool isOnebyteHeader = false;
790
791 if (header == 0xBEDE) {
792 isOnebyteHeader = true;
793 } else if (header == 0x1000) {
794 ALOGW("parseRTPExt: two-byte header is not implemented yet");
795 return -1;
796 } else {
797 ALOGW("parseRTPExt: can not recognize header");
798 return -1;
799 }
800
801 const uint8_t *extPayload = extHeader + 4;
802 extLen -= 4;
803 size_t offset = 0; //start from first payload of rtp extension.
804 // one-byte header parser
805 while (isOnebyteHeader && offset < extLen) {
806 uint8_t extmapId = extPayload[offset] >> 4;
807 uint8_t length = (extPayload[offset] & 0xF) + 1;
808 offset++;
809
810 // padding case
811 if (extmapId == 0)
812 continue;
813
814 uint8_t data[16]; // maximum length value
815 for (uint8_t j = 0; offset + j <= extLen && j < length; j++) {
816 data[j] = extPayload[offset + j];
817 }
818
819 offset += length;
820
821 if (extmapId == s->mCVOExtMap) {
822 *cvoDegrees = (int32_t)data[0];
823 return OK;
824 }
825 }
826
827 return BAD_VALUE;
828 }
829
parseRTCP(StreamInfo * s,const sp<ABuffer> & buffer)830 status_t ARTPConnection::parseRTCP(StreamInfo *s, const sp<ABuffer> &buffer) {
831 if (s->mNumRTCPPacketsReceived++ == 0) {
832 sp<AMessage> notify = s->mNotifyMsg->dup();
833 notify->setInt32("first-rtcp", true);
834 notify->setInt32("rtcp-event", 1);
835 notify->setInt32("payload-type", ARTPSource::RTCP_FIRST_PACKET);
836 notify->setInt64("recv-time-us", ALooper::GetNowUs());
837 notify->post();
838
839 ALOGD("send first-rtcp event to upper layer");
840 }
841
842 const uint8_t *data = buffer->data();
843 size_t size = buffer->size();
844
845 while (size > 0) {
846 if (size < 8) {
847 // Too short to be a valid RTCP header
848 return -1;
849 }
850
851 if ((data[0] >> 6) != 2) {
852 // Unsupported version.
853 return -1;
854 }
855
856 if (data[0] & 0x20) {
857 // Padding present.
858
859 size_t paddingLength = data[size - 1];
860
861 if (paddingLength + 12 > size) {
862 // If we removed this much padding we'd end up with something
863 // that's too short to be a valid RTP header.
864 return -1;
865 }
866
867 size -= paddingLength;
868 }
869
870 size_t headerLength = 4 * (data[2] << 8 | data[3]) + 4;
871
872 if (size < headerLength) {
873 // Only received a partial packet?
874 return -1;
875 }
876
877 switch (data[1]) {
878 case 200:
879 {
880 parseSR(s, data, headerLength);
881 break;
882 }
883
884 case 201: // RR
885 case 202: // SDES
886 case 204: // APP
887 break;
888
889 case 205: // TSFB (transport layer specific feedback)
890 parseTSFB(s, data, headerLength);
891 break;
892 case 206: // PSFB (payload specific feedback)
893 // hexdump(data, headerLength);
894 parsePSFB(s, data, headerLength);
895 ALOGI("RTCP packet type %u of size %zu", (unsigned)data[1], headerLength);
896 break;
897
898 case 203:
899 {
900 parseBYE(s, data, headerLength);
901 break;
902 }
903
904 default:
905 {
906 ALOGW("Unknown RTCP packet type %u of size %zu",
907 (unsigned)data[1], headerLength);
908 break;
909 }
910 }
911
912 data += headerLength;
913 size -= headerLength;
914 }
915
916 return OK;
917 }
918
parseBYE(StreamInfo * s,const uint8_t * data,size_t size)919 status_t ARTPConnection::parseBYE(
920 StreamInfo *s, const uint8_t *data, size_t size) {
921 size_t SC = data[0] & 0x3f;
922
923 if (SC == 0 || size < (4 + SC * 4)) {
924 // Packet too short for the minimal BYE header.
925 return -1;
926 }
927
928 uint32_t id = u32at(&data[4]);
929
930 sp<ARTPSource> source = findSource(s, id);
931
932 // Report a final stastics to be used for rtp data usage.
933 int64_t nowUs = ALooper::GetNowUs();
934 int32_t timeDiff = (nowUs - mLastBitrateReportTimeUs) / 1000000ll;
935 int32_t bitrate = mCumulativeBytes * 8 / timeDiff;
936 source->notifyPktInfo(bitrate, nowUs, true /* isRegular */);
937
938 source->byeReceived();
939
940 return OK;
941 }
942
parseSR(StreamInfo * s,const uint8_t * data,size_t size)943 status_t ARTPConnection::parseSR(
944 StreamInfo *s, const uint8_t *data, size_t size) {
945 size_t RC = data[0] & 0x1f;
946
947 if (size < (7 + RC * 6) * 4) {
948 // Packet too short for the minimal SR header.
949 return -1;
950 }
951
952 uint32_t id = u32at(&data[4]);
953 uint64_t ntpTime = u64at(&data[8]);
954 uint32_t rtpTime = u32at(&data[16]);
955
956 #if 0
957 ALOGI("XXX timeUpdate: ssrc=0x%08x, rtpTime %u == ntpTime %.3f",
958 id,
959 rtpTime,
960 (ntpTime >> 32) + (double)(ntpTime & 0xffffffff) / (1ll << 32));
961 #endif
962
963 sp<ARTPSource> source = findSource(s, id);
964
965 source->timeUpdate(rtpTime, ntpTime);
966
967 return 0;
968 }
969
parseTSFB(StreamInfo * s,const uint8_t * data,size_t size)970 status_t ARTPConnection::parseTSFB(
971 StreamInfo *s, const uint8_t *data, size_t size) {
972 if (size < 12) {
973 // broken packet
974 return -1;
975 }
976
977 uint8_t msgType = data[0] & 0x1f;
978 uint32_t id = u32at(&data[4]);
979
980 const uint8_t *ptr = &data[12];
981 size -= 12;
982
983 using namespace std;
984 size_t FCISize;
985 switch(msgType) {
986 case 1: // Generic NACK
987 {
988 FCISize = 4;
989 while (size >= FCISize) {
990 uint16_t PID = u16at(&ptr[0]); // lost packet RTP number
991 uint16_t BLP = u16at(&ptr[2]); // Bitmask of following Lost Packets
992
993 size -= FCISize;
994 ptr += FCISize;
995
996 AString list_of_losts;
997 list_of_losts.append(PID);
998 for (int i=0 ; i<16 ; i++) {
999 bool is_lost = BLP & (0x1 << i);
1000 if (is_lost) {
1001 list_of_losts.append(", ");
1002 list_of_losts.append(PID + i);
1003 }
1004 }
1005 ALOGI("Opponent losts packet of RTP %s", list_of_losts.c_str());
1006 }
1007 break;
1008 }
1009 case 3: // TMMBR
1010 case 4: // TMMBN
1011 {
1012 FCISize = 8;
1013 while (size >= FCISize) {
1014 uint32_t MxTBR = u32at(&ptr[4]);
1015 uint32_t MxTBRExp = MxTBR >> 26;
1016 uint32_t MxTBRMantissa = (MxTBR >> 9) & 0x01FFFF;
1017 uint32_t overhead = MxTBR & 0x01FF;
1018
1019 size -= FCISize;
1020 ptr += FCISize;
1021
1022 uint32_t bitRate = (1 << MxTBRExp) * MxTBRMantissa;
1023
1024 if (msgType == 3)
1025 ALOGI("Op -> UE Req Tx bitrate : %d X 2^%d = %d",
1026 MxTBRMantissa, MxTBRExp, bitRate);
1027 else if (msgType == 4)
1028 ALOGI("OP -> UE Noti Rx bitrate : %d X 2^%d = %d",
1029 MxTBRMantissa, MxTBRExp, bitRate);
1030
1031 sp<AMessage> notify = s->mNotifyMsg->dup();
1032 notify->setInt32("rtcp-event", 1);
1033 notify->setInt32("payload-type", 205);
1034 notify->setInt32("feedback-type", msgType);
1035 notify->setInt32("sender", id);
1036 notify->setInt32("bit-rate", bitRate);
1037 notify->post();
1038 ALOGI("overhead : %d", overhead);
1039 }
1040 break;
1041 }
1042 default:
1043 {
1044 ALOGI("Not supported TSFB type %d", msgType);
1045 break;
1046 }
1047 }
1048
1049 return 0;
1050 }
1051
parsePSFB(StreamInfo * s,const uint8_t * data,size_t size)1052 status_t ARTPConnection::parsePSFB(
1053 StreamInfo *s, const uint8_t *data, size_t size) {
1054 if (size < 12) {
1055 // broken packet
1056 return -1;
1057 }
1058
1059 uint8_t msgType = data[0] & 0x1f;
1060 uint32_t id = u32at(&data[4]);
1061
1062 const uint8_t *ptr = &data[12];
1063 size -= 12;
1064
1065 using namespace std;
1066 switch(msgType) {
1067 case 1: // Picture Loss Indication (PLI)
1068 {
1069 if (size > 0) {
1070 // PLI does not need parameters
1071 break;
1072 };
1073 sp<AMessage> notify = s->mNotifyMsg->dup();
1074 notify->setInt32("rtcp-event", 1);
1075 notify->setInt32("payload-type", 206);
1076 notify->setInt32("feedback-type", msgType);
1077 notify->setInt32("sender", id);
1078 notify->post();
1079 ALOGI("PLI detected.");
1080 break;
1081 }
1082 case 4: // Full Intra Request (FIR)
1083 {
1084 if (size < 4) {
1085 break;
1086 }
1087 uint32_t requestedId = u32at(&ptr[0]);
1088 if (requestedId == (uint32_t)mSelfID) {
1089 sp<AMessage> notify = s->mNotifyMsg->dup();
1090 notify->setInt32("rtcp-event", 1);
1091 notify->setInt32("payload-type", 206);
1092 notify->setInt32("feedback-type", msgType);
1093 notify->setInt32("sender", id);
1094 notify->post();
1095 ALOGI("FIR detected.");
1096 }
1097 break;
1098 }
1099 default:
1100 {
1101 ALOGI("Not supported PSFB type %d", msgType);
1102 break;
1103 }
1104 }
1105
1106 return 0;
1107 }
findSource(StreamInfo * info,uint32_t srcId)1108 sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
1109 sp<ARTPSource> source;
1110 ssize_t index = info->mSources.indexOfKey(srcId);
1111 if (index < 0) {
1112 index = info->mSources.size();
1113
1114 source = new ARTPSource(
1115 srcId, info->mSessionDesc, info->mIndex, info->mNotifyMsg);
1116
1117 if (mFlags & kViLTEConnection) {
1118 setStaticJitterTimeMs(50);
1119 source->setPeriodicFIR(false);
1120 }
1121
1122 source->setSelfID(mSelfID);
1123 source->setStaticJitterTimeMs(mStaticJitterTimeMs);
1124 sp<AMessage> timer = new AMessage(kWhatAlarmStream, this);
1125 source->setJbTimer(timer);
1126 info->mSources.add(srcId, source);
1127 } else {
1128 source = info->mSources.valueAt(index);
1129 }
1130
1131 return source;
1132 }
1133
injectPacket(int index,const sp<ABuffer> & buffer)1134 void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) {
1135 sp<AMessage> msg = new AMessage(kWhatInjectPacket, this);
1136 msg->setInt32("index", index);
1137 msg->setBuffer("buffer", buffer);
1138 msg->post();
1139 }
1140
setSelfID(const uint32_t selfID)1141 void ARTPConnection::setSelfID(const uint32_t selfID) {
1142 mSelfID = selfID;
1143 }
1144
setStaticJitterTimeMs(const uint32_t jbTimeMs)1145 void ARTPConnection::setStaticJitterTimeMs(const uint32_t jbTimeMs) {
1146 mStaticJitterTimeMs = jbTimeMs;
1147 }
1148
setTargetBitrate(int32_t targetBitrate)1149 void ARTPConnection::setTargetBitrate(int32_t targetBitrate) {
1150 mTargetBitrate = targetBitrate;
1151 }
1152
checkRxBitrate(int64_t nowUs)1153 void ARTPConnection::checkRxBitrate(int64_t nowUs) {
1154 if (mLastBitrateReportTimeUs <= 0) {
1155 mCumulativeBytes = 0;
1156 mLastBitrateReportTimeUs = nowUs;
1157 }
1158 else if (mLastEarlyNotifyTimeUs + 100000ll <= nowUs) {
1159 int32_t timeDiff = (nowUs - mLastBitrateReportTimeUs) / 1000000ll;
1160 int32_t bitrate = mCumulativeBytes * 8 / timeDiff;
1161 mLastEarlyNotifyTimeUs = nowUs;
1162
1163 List<StreamInfo>::iterator it = mStreams.begin();
1164 while (it != mStreams.end()) {
1165 StreamInfo *s = &*it;
1166 if (s->mIsInjected) {
1167 ++it;
1168 continue;
1169 }
1170 for (size_t i = 0; i < s->mSources.size(); ++i) {
1171 sp<ARTPSource> source = s->mSources.valueAt(i);
1172 if (source->isNeedToEarlyNotify()) {
1173 source->notifyPktInfo(bitrate, nowUs, false /* isRegular */);
1174 mLastEarlyNotifyTimeUs = nowUs + (1000000ll * 3600 * 24); // after 1 day
1175 }
1176 }
1177 ++it;
1178 }
1179 }
1180 else if (mLastBitrateReportTimeUs + 1000000ll <= nowUs) {
1181 int32_t timeDiff = (nowUs - mLastBitrateReportTimeUs) / 1000000ll;
1182 int32_t bitrate = mCumulativeBytes * 8 / timeDiff;
1183 ALOGI("Actual Rx bitrate : %d bits/sec", bitrate);
1184
1185 sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
1186 List<StreamInfo>::iterator it = mStreams.begin();
1187 while (it != mStreams.end()) {
1188 StreamInfo *s = &*it;
1189 if (s->mIsInjected) {
1190 ++it;
1191 continue;
1192 }
1193
1194 if (s->mNumRTCPPacketsReceived == 0) {
1195 // We have never received any RTCP packets on this stream,
1196 // we don't even know where to send a report.
1197 ++it;
1198 continue;
1199 }
1200
1201 buffer->setRange(0, 0);
1202 for (size_t i = 0; i < s->mSources.size(); ++i) {
1203 sp<ARTPSource> source = s->mSources.valueAt(i);
1204 source->notifyPktInfo(bitrate, nowUs, true /* isRegular */);
1205 }
1206 ++it;
1207 }
1208 mCumulativeBytes = 0;
1209 mLastBitrateReportTimeUs = nowUs;
1210 mLastEarlyNotifyTimeUs = nowUs;
1211 }
1212 }
onInjectPacket(const sp<AMessage> & msg)1213 void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) {
1214 int32_t index;
1215 CHECK(msg->findInt32("index", &index));
1216
1217 sp<ABuffer> buffer;
1218 CHECK(msg->findBuffer("buffer", &buffer));
1219
1220 List<StreamInfo>::iterator it = mStreams.begin();
1221 while (it != mStreams.end()
1222 && it->mRTPSocket != index && it->mRTCPSocket != index) {
1223 ++it;
1224 }
1225
1226 if (it == mStreams.end()) {
1227 TRESPASS();
1228 }
1229
1230 StreamInfo *s = &*it;
1231
1232 if (it->mRTPSocket == index) {
1233 parseRTP(s, buffer);
1234 } else {
1235 parseRTCP(s, buffer);
1236 }
1237 }
1238
1239 } // namespace android
1240