/*
 * Copyright (C) 2010 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>     // close(), read(), dup()
#include <sys/epoll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <time.h>
#include <arpa/inet.h>
#include <netinet/in.h>

// #define LOG_NDEBUG 0
#define LOG_TAG "AudioGroup"
#include <cutils/atomic.h>
#include <cutils/properties.h>
#include <utils/Log.h>
#include <utils/Errors.h>
#include <utils/RefBase.h>
#include <utils/threads.h>
#include <utils/SystemClock.h>
#include <media/AudioRecord.h>
#include <media/AudioTrack.h>
#include <media/AudioEffect.h>
#include <system/audio_effects/effect_aec.h>
#include <system/audio.h>

#include <nativehelper/ScopedUtfChars.h>
#include <android/content/AttributionSourceState.h>
#include <android_os_Parcel.h>

#include "jni.h"
#include <nativehelper/JNIHelp.h>

#include "AudioCodec.h"
#include "EchoSuppressor.h"

extern int parse(JNIEnv *env, jstring jAddress, int port, sockaddr_storage *ss);

namespace {

using namespace android;

using android::content::AttributionSourceState;

int gRandom = -1;

// We use a circular array to implement the jitter buffer. The simplest way is
// to do a modulo operation on the index when accessing the array. However,
// modulo can be expensive on some platforms, such as ARM. Thus we round the
// size of the array up to the nearest power of 2 and then use bitwise-and
// instead of modulo. Currently we make it 2048ms long and assume the packet
// interval is 50ms or less. The first 100ms is where samples get mixed. The
// rest is the real jitter buffer. For a stream at 8000Hz it takes 32 kilobytes.
// These numbers were chosen by experiment and each of them can be adjusted as
// needed.
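//
// As an illustrative sketch only (not used by this file), the power-of-2 trick
// replaces "index % size" with "index & (size - 1)", which is equivalent only
// when size is a power of 2:
//
//     int size = 16384;                      // e.g. the 8000Hz case below
//     int mask = size - 1;                   // 0x3FFF
//     int16_t s = buffer[index & mask];      // same as buffer[index % size]
//
// This mirrors how mBufferMask is derived in AudioStream::set().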

// Originally a stream did not send packets when it was receive-only or there
// was nothing to mix. However, this causes problems with certain firewalls and
// proxies. A firewall might remove a port mapping when there is no outgoing
// packet for a period of time, and a proxy might wait for incoming packets from
// both sides before it starts forwarding. To solve these problems, we send out
// a silence packet on the stream once every second. It should be good enough
// to keep the stream alive with relatively little overhead.

// Other notes:
// + We use elapsedRealtime() to get the time. Since we use 32-bit variables
//   instead of 64-bit ones, comparisons must be done by subtraction.
// + The sampling rate must be a multiple of 1000Hz, and the packet length must
//   be a whole number of milliseconds. No floating point.
// + If we cannot get enough CPU, we drop samples and simulate packet loss.
// + Resampling is not done yet, so streams in one group must use the same rate.
//   For the first release only 8000Hz is supported.
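//
// A minimal sketch of the subtraction idiom from the notes above, assuming the
// usual two's complement wraparound of 32-bit millisecond ticks; a direct
// "now >= deadline" comparison would break once the timer wraps, while the
// signed difference keeps working across the wrap:
//
//     int now = elapsedRealtime();           // truncated to 32 bits
//     int deadline = mTick;                  // some earlier 32-bit deadline
//     if (now - deadline >= 0) {
//         // "now" has reached or passed "deadline", even across a wrap.
//     }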
#define BUFFER_SIZE     2048    // jitter buffer length in milliseconds
#define HISTORY_SIZE    100     // mixing window in milliseconds
#define MEASURE_BASE    100     // latency threshold in milliseconds
#define MEASURE_PERIOD  5000    // latency measurement period in milliseconds
#define DTMF_PERIOD     200     // maximum duration of a DTMF event in milliseconds

class AudioStream
{
public:
    AudioStream();
    ~AudioStream();
    bool set(int mode, int socket, sockaddr_storage *remote,
        AudioCodec *codec, int sampleRate, int sampleCount,
        int codecType, int dtmfType);

    void sendDtmf(int event);
    bool mix(int32_t *output, int head, int tail, int sampleRate);
    void encode(int tick, AudioStream *chain);
    void decode(int tick);

private:
    enum {
        NORMAL = 0,
        SEND_ONLY = 1,
        RECEIVE_ONLY = 2,
        LAST_MODE = 2,
    };

    int mMode;
    int mSocket;
    sockaddr_storage mRemote;
    AudioCodec *mCodec;
    uint32_t mCodecMagic;
    uint32_t mDtmfMagic;
    bool mFixRemote;

    int mTick;
    int mSampleRate;
    int mSampleCount;
    int mInterval;
    int mKeepAlive;

    int16_t *mBuffer;
    int mBufferMask;
    int mBufferHead;
    int mBufferTail;
    int mLatencyTimer;
    int mLatencyScore;

    uint16_t mSequence;
    uint32_t mTimestamp;
    uint32_t mSsrc;

    int mDtmfEvent;
    int mDtmfStart;

    AudioStream *mNext;

    friend class AudioGroup;
};

AudioStream::AudioStream()
{
    mSocket = -1;
    mCodec = NULL;
    mBuffer = NULL;
    mNext = NULL;
}

AudioStream::~AudioStream()
{
    close(mSocket);
    delete mCodec;
    delete [] mBuffer;
    ALOGD("stream[%d] is dead", mSocket);
}

bool AudioStream::set(int mode, int socket, sockaddr_storage *remote,
    AudioCodec *codec, int sampleRate, int sampleCount,
    int codecType, int dtmfType)
{
    if (mode < 0 || mode > LAST_MODE) {
        return false;
    }
    mMode = mode;

    mCodecMagic = (0x8000 | codecType) << 16;
    mDtmfMagic = (dtmfType == -1) ? 0 : (0x8000 | dtmfType) << 16;

    mTick = elapsedRealtime();
    mSampleRate = sampleRate / 1000;
    mSampleCount = sampleCount;
    mInterval = mSampleCount / mSampleRate;

    // Allocate jitter buffer.
    for (mBufferMask = 8; mBufferMask < mSampleRate; mBufferMask <<= 1);
    mBufferMask *= BUFFER_SIZE;
    mBuffer = new int16_t[mBufferMask];
    --mBufferMask;
    mBufferHead = 0;
    mBufferTail = 0;
    mLatencyTimer = 0;
    mLatencyScore = 0;

    // Initialize random bits.
    read(gRandom, &mSequence, sizeof(mSequence));
    read(gRandom, &mTimestamp, sizeof(mTimestamp));
    read(gRandom, &mSsrc, sizeof(mSsrc));

    mDtmfEvent = -1;
    mDtmfStart = 0;

    // Only take ownership of the socket and codec once success is guaranteed.
    mSocket = socket;
    if (codec) {
        mRemote = *remote;
        mCodec = codec;

        // Here we should never get a private address, but some buggy proxy
        // servers do give us one. To work around this, we replace the address
        // the first time we successfully decode an incoming packet.
        mFixRemote = false;
        if (remote->ss_family == AF_INET) {
            unsigned char *address =
                (unsigned char *)&((sockaddr_in *)remote)->sin_addr;
            if (address[0] == 10 ||
                (address[0] == 172 && (address[1] >> 4) == 1) ||
                (address[0] == 192 && address[1] == 168)) {
                mFixRemote = true;
            }
        }
    }

    ALOGD("stream[%d] is configured as %s %dkHz %dms mode %d", mSocket,
        (codec ? codec->name : "RAW"), mSampleRate, mInterval, mMode);
    return true;
}

void AudioStream::sendDtmf(int event)
{
    if (mDtmfMagic != 0) {
        mDtmfEvent = event << 24;
        mDtmfStart = mTimestamp + mSampleCount;
    }
}

bool AudioStream::mix(int32_t *output, int head, int tail, int sampleRate)
{
    if (mMode == SEND_ONLY) {
        return false;
    }

    if (head - mBufferHead < 0) {
        head = mBufferHead;
    }
    if (tail - mBufferTail > 0) {
        tail = mBufferTail;
    }
    if (tail - head <= 0) {
        return false;
    }

    head *= mSampleRate;
    tail *= mSampleRate;

    if (sampleRate == mSampleRate) {
        for (int i = head; i - tail < 0; ++i) {
            output[i - head] += mBuffer[i & mBufferMask];
        }
    } else {
        // TODO: implement resampling.
        return false;
    }
    return true;
}

void AudioStream::encode(int tick, AudioStream *chain)
{
    if (tick - mTick >= mInterval) {
        // We just missed the train. Pretend that packets in between are lost.
        int skipped = (tick - mTick) / mInterval;
        mTick += skipped * mInterval;
        mSequence += skipped;
        mTimestamp += skipped * mSampleCount;
        ALOGV("stream[%d] skips %d packets", mSocket, skipped);
    }

    tick = mTick;
    mTick += mInterval;
    ++mSequence;
    mTimestamp += mSampleCount;

    // If there is an ongoing DTMF event, send it now.
    if (mMode != RECEIVE_ONLY && mDtmfEvent != -1) {
        int duration = mTimestamp - mDtmfStart;
        // Make sure the duration is reasonable.
        if (duration >= 0 && duration < mSampleRate * DTMF_PERIOD) {
            duration += mSampleCount;
            int32_t buffer[4] = {
                static_cast<int32_t>(htonl(mDtmfMagic | mSequence)),
                static_cast<int32_t>(htonl(mDtmfStart)),
                static_cast<int32_t>(mSsrc),
                static_cast<int32_t>(htonl(mDtmfEvent | duration)),
            };
            if (duration >= mSampleRate * DTMF_PERIOD) {
                buffer[3] |= htonl(1 << 23);
                mDtmfEvent = -1;
            }
            sendto(mSocket, buffer, sizeof(buffer), MSG_DONTWAIT,
                (sockaddr *)&mRemote, sizeof(mRemote));
            return;
        }
        mDtmfEvent = -1;
    }

    int32_t buffer[mSampleCount + 3];
    bool data = false;
    if (mMode != RECEIVE_ONLY) {
        // Mix all other streams.
        memset(buffer, 0, sizeof(buffer));
        while (chain) {
            if (chain != this) {
                data |= chain->mix(buffer, tick - mInterval, tick, mSampleRate);
            }
            chain = chain->mNext;
        }
    }

    int16_t samples[mSampleCount];
    if (data) {
        // Saturate into 16 bits.
        for (int i = 0; i < mSampleCount; ++i) {
            int32_t sample = buffer[i];
            if (sample < -32768) {
                sample = -32768;
            }
            if (sample > 32767) {
                sample = 32767;
            }
            samples[i] = sample;
        }
    } else {
        // Nothing to mix. Send a keep-alive silence packet roughly once per
        // second: skip this tick if the previous keep-alive was sent in the
        // same 1024ms window.
        if ((mTick ^ mKeepAlive) >> 10 == 0) {
            return;
        }
        mKeepAlive = mTick;
        memset(samples, 0, sizeof(samples));

        if (mMode != RECEIVE_ONLY) {
            ALOGV("stream[%d] no data", mSocket);
        }
    }

    if (!mCodec) {
        // Special case for the device stream.
        send(mSocket, samples, sizeof(samples), MSG_DONTWAIT);
        return;
    }

    // Cook the packet and send it out.
    buffer[0] = htonl(mCodecMagic | mSequence);
    buffer[1] = htonl(mTimestamp);
    buffer[2] = mSsrc;
    int length = mCodec->encode(&buffer[3], samples);
    if (length <= 0) {
        ALOGV("stream[%d] encoder error", mSocket);
        return;
    }
    sendto(mSocket, buffer, length + 12, MSG_DONTWAIT, (sockaddr *)&mRemote,
        sizeof(mRemote));
}

void AudioStream::decode(int tick)
{
    char c;
    if (mMode == SEND_ONLY) {
        recv(mSocket, &c, 1, MSG_DONTWAIT);
        return;
    }

    // Make sure mBufferHead and mBufferTail are reasonable.
    if ((unsigned int)(tick + BUFFER_SIZE - mBufferHead) > BUFFER_SIZE * 2) {
        mBufferHead = tick - HISTORY_SIZE;
        mBufferTail = mBufferHead;
    }

    if (tick - mBufferHead > HISTORY_SIZE) {
        // Throw away outdated samples.
        mBufferHead = tick - HISTORY_SIZE;
        if (mBufferTail - mBufferHead < 0) {
            mBufferTail = mBufferHead;
        }
    }

    // Shrink the jitter buffer if the latency stays above the threshold for
    // the whole measurement period.
    int score = mBufferTail - tick - MEASURE_BASE;
    if (mLatencyScore > score || mLatencyScore <= 0) {
        mLatencyScore = score;
        mLatencyTimer = tick;
    } else if (tick - mLatencyTimer >= MEASURE_PERIOD) {
        ALOGV("stream[%d] reduces latency of %dms", mSocket, mLatencyScore);
        mBufferTail -= mLatencyScore;
        mLatencyScore = -1;
    }

    int count = (BUFFER_SIZE - (mBufferTail - mBufferHead)) * mSampleRate;
    if (count < mSampleCount) {
        // Buffer overflow. Drop the packet.
        ALOGV("stream[%d] buffer overflow", mSocket);
        recv(mSocket, &c, 1, MSG_DONTWAIT);
        return;
    }

    // Receive the packet and decode it.
    int16_t samples[count];
    if (!mCodec) {
        // Special case for the device stream.
        count = recv(mSocket, samples, sizeof(samples),
            MSG_TRUNC | MSG_DONTWAIT) >> 1;
    } else {
        __attribute__((aligned(4))) uint8_t buffer[2048];
        sockaddr_storage remote;
        socklen_t addrlen = sizeof(remote);

        int bufferSize = sizeof(buffer);
        int length = recvfrom(mSocket, buffer, bufferSize,
            MSG_TRUNC | MSG_DONTWAIT, (sockaddr *)&remote, &addrlen);

        // Do we need to check the SSRC, sequence, and timestamp? They are not
        // reliable, but at least they could be used to identify duplicates.
        if (length < 12 || length > bufferSize ||
            (ntohl(*(uint32_t *)buffer) & 0xC07F0000) != mCodecMagic) {
            ALOGV("stream[%d] malformed packet", mSocket);
            return;
        }
        // Skip the fixed 12-byte RTP header plus any CSRC entries (4 bytes each).
        int offset = 12 + ((buffer[0] & 0x0F) << 2);
        if (offset + 2 >= bufferSize) {
            ALOGV("invalid buffer offset: %d", offset + 2);
            return;
        }
        // Skip the RTP header extension if the X bit is set.
        if ((buffer[0] & 0x10) != 0) {
            offset += 4 + (ntohs(*(uint16_t *)&buffer[offset + 2]) << 2);
        }
        if (offset >= bufferSize) {
            ALOGV("invalid buffer offset: %d", offset);
            return;
        }
        // Strip trailing padding if the P bit is set.
        if ((buffer[0] & 0x20) != 0) {
            length -= buffer[length - 1];
        }
        length -= offset;
        if (length >= 0) {
            length = mCodec->decode(samples, count, &buffer[offset], length);
        }
        if (length > 0 && mFixRemote) {
            mRemote = remote;
            mFixRemote = false;
        }
        count = length;
    }
    if (count <= 0) {
        ALOGV("stream[%d] decoder error", mSocket);
        return;
    }

    if (tick - mBufferTail > 0) {
        // Buffer underrun. Reset the jitter buffer.
        ALOGV("stream[%d] buffer underrun", mSocket);
        if (mBufferTail - mBufferHead <= 0) {
            mBufferHead = tick + mInterval;
            mBufferTail = mBufferHead;
        } else {
            int tail = (tick + mInterval) * mSampleRate;
            for (int i = mBufferTail * mSampleRate; i - tail < 0; ++i) {
                mBuffer[i & mBufferMask] = 0;
            }
            mBufferTail = tick + mInterval;
        }
    }

    // Append to the jitter buffer.
    int tail = mBufferTail * mSampleRate;
    for (int i = 0; i < count; ++i) {
        mBuffer[tail & mBufferMask] = samples[i];
        ++tail;
    }
    mBufferTail += mInterval;
}

//------------------------------------------------------------------------------

class AudioGroup
{
public:
    explicit AudioGroup(const AttributionSourceState &attributionSource);
    ~AudioGroup();
    bool set(int sampleRate, int sampleCount);

    bool setMode(int mode);
    bool sendDtmf(int event);
    bool add(AudioStream *stream);
    bool remove(AudioStream *stream);
    bool platformHasAec() { return mPlatformHasAec; }

private:
    enum {
        ON_HOLD = 0,
        MUTED = 1,
        NORMAL = 2,
        ECHO_SUPPRESSION = 3,
        LAST_MODE = 3,
    };

    bool checkPlatformAec();

    AudioStream *mChain;
    int mEventQueue;
    volatile int mDtmfEvent;

    const AttributionSourceState mAttributionSource;

    int mMode;
    int mSampleRate;
    size_t mSampleCount;
    int mDeviceSocket;
    bool mPlatformHasAec;

    class NetworkThread : public Thread
    {
    public:
        explicit NetworkThread(AudioGroup *group) : Thread(false), mGroup(group) {}

        bool start()
        {
            if (run("Network", ANDROID_PRIORITY_AUDIO) != NO_ERROR) {
                ALOGE("cannot start network thread");
                return false;
            }
            return true;
        }

    private:
        AudioGroup *mGroup;
        bool threadLoop();
    };
    sp<NetworkThread> mNetworkThread;

    class DeviceThread : public Thread
    {
    public:
        explicit DeviceThread(AudioGroup *group) : Thread(false), mGroup(group) {}

        bool start()
        {
            if (run("Device", ANDROID_PRIORITY_AUDIO) != NO_ERROR) {
                ALOGE("cannot start device thread");
                return false;
            }
            return true;
        }

    private:
        AudioGroup *mGroup;
        bool threadLoop();
    };
    sp<DeviceThread> mDeviceThread;
};

AudioGroup::AudioGroup(const AttributionSourceState &attributionSource)
        : mAttributionSource(attributionSource)
{
    mMode = ON_HOLD;
    mChain = NULL;
    mEventQueue = -1;
    mDtmfEvent = -1;
    mDeviceSocket = -1;
    mNetworkThread = new NetworkThread(this);
    mDeviceThread = new DeviceThread(this);
    mPlatformHasAec = checkPlatformAec();
}

AudioGroup::~AudioGroup()
{
    mNetworkThread->requestExitAndWait();
    mDeviceThread->requestExitAndWait();
    close(mEventQueue);
    close(mDeviceSocket);
    while (mChain) {
        AudioStream *next = mChain->mNext;
        delete mChain;
        mChain = next;
    }
    ALOGD("group[%d] is dead", mDeviceSocket);
}

bool AudioGroup::set(int sampleRate, int sampleCount)
{
    mEventQueue = epoll_create1(EPOLL_CLOEXEC);
    if (mEventQueue == -1) {
        ALOGE("epoll_create1: %s", strerror(errno));
        return false;
    }

    mSampleRate = sampleRate;
    mSampleCount = sampleCount;

    // Create device socket.
    int pair[2];
    if (socketpair(AF_UNIX, SOCK_DGRAM, 0, pair)) {
        ALOGE("socketpair: %s", strerror(errno));
        return false;
    }
    mDeviceSocket = pair[0];

    // Create device stream.
    mChain = new AudioStream;
    if (!mChain->set(AudioStream::NORMAL, pair[1], NULL, NULL,
        sampleRate, sampleCount, -1, -1)) {
        close(pair[1]);
        ALOGE("cannot initialize device stream");
        return false;
    }

    // Give device socket a reasonable timeout.
    timeval tv;
    tv.tv_sec = 0;
    tv.tv_usec = 1000 * sampleCount / sampleRate * 500;
    if (setsockopt(pair[0], SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) {
        ALOGE("setsockopt: %s", strerror(errno));
        return false;
    }

    // Add device stream into event queue.
    epoll_event event;
    event.events = EPOLLIN;
    event.data.ptr = mChain;
    if (epoll_ctl(mEventQueue, EPOLL_CTL_ADD, pair[1], &event)) {
        ALOGE("epoll_ctl: %s", strerror(errno));
        return false;
    }

    // Anything else?
    ALOGD("stream[%d] joins group[%d]", pair[1], pair[0]);
    return true;
}

bool AudioGroup::setMode(int mode)
{
    if (mode < 0 || mode > LAST_MODE) {
        return false;
    }
    // FIXME: temporary code to overcome echo and mic gain issues on herring and tuna boards.
    // Must be modified/removed when the root cause of the issue is fixed in the hardware or
    // driver
    char value[PROPERTY_VALUE_MAX];
    property_get("ro.product.board", value, "");
    if (mode == NORMAL &&
            (!strcmp(value, "herring") || !strcmp(value, "tuna"))) {
        mode = ECHO_SUPPRESSION;
    }
    if (mMode == mode) {
        return true;
    }

    mDeviceThread->requestExitAndWait();
    ALOGD("group[%d] switches from mode %d to %d", mDeviceSocket, mMode, mode);
    mMode = mode;
    return (mode == ON_HOLD) || mDeviceThread->start();
}

bool AudioGroup::sendDtmf(int event)
{
    if (event < 0 || event > 15) {
        return false;
    }

    // DTMF is rarely used, so we try to make it as lightweight as possible.
    // Using volatile might be dodgy, but using a pipe or pthread primitives
    // or stop-set-restart threads seems too heavy. Will investigate later.
    timespec ts;
    ts.tv_sec = 0;
    ts.tv_nsec = 100000000;
    for (int i = 0; mDtmfEvent != -1 && i < 20; ++i) {
        nanosleep(&ts, NULL);
    }
    if (mDtmfEvent != -1) {
        return false;
    }
    mDtmfEvent = event;
    nanosleep(&ts, NULL);
    return true;
}

bool AudioGroup::add(AudioStream *stream)
{
    mNetworkThread->requestExitAndWait();

    epoll_event event;
    event.events = EPOLLIN;
    event.data.ptr = stream;
    if (epoll_ctl(mEventQueue, EPOLL_CTL_ADD, stream->mSocket, &event)) {
        ALOGE("epoll_ctl: %s", strerror(errno));
        return false;
    }

    stream->mNext = mChain->mNext;
    mChain->mNext = stream;
    if (!mNetworkThread->start()) {
        // Only take ownership of the stream if the thread started successfully.
        mChain->mNext = stream->mNext;
        return false;
    }

    ALOGD("stream[%d] joins group[%d]", stream->mSocket, mDeviceSocket);
    return true;
}

bool AudioGroup::remove(AudioStream *stream)
{
    mNetworkThread->requestExitAndWait();

    for (AudioStream *chain = mChain; chain->mNext; chain = chain->mNext) {
        if (chain->mNext == stream) {
            if (epoll_ctl(mEventQueue, EPOLL_CTL_DEL, stream->mSocket, NULL)) {
                ALOGE("epoll_ctl: %s", strerror(errno));
                return false;
            }
            chain->mNext = stream->mNext;
            ALOGD("stream[%d] leaves group[%d]", stream->mSocket, mDeviceSocket);
            delete stream;
            break;
        }
    }

    // Do not start network thread if there is only one stream.
    if (!mChain->mNext || !mNetworkThread->start()) {
        return false;
    }
    return true;
}

bool AudioGroup::NetworkThread::threadLoop()
{
    AudioStream *chain = mGroup->mChain;
    int tick = elapsedRealtime();
    int deadline = tick + 10;
    int count = 0;

    for (AudioStream *stream = chain; stream; stream = stream->mNext) {
        if (tick - stream->mTick >= 0) {
            stream->encode(tick, chain);
        }
        if (deadline - stream->mTick > 0) {
            deadline = stream->mTick;
        }
        ++count;
    }

    int event = mGroup->mDtmfEvent;
    if (event != -1) {
        for (AudioStream *stream = chain; stream; stream = stream->mNext) {
            stream->sendDtmf(event);
        }
        mGroup->mDtmfEvent = -1;
    }

    deadline -= tick;
    if (deadline < 1) {
        deadline = 1;
    }

    epoll_event events[count];
    count = epoll_wait(mGroup->mEventQueue, events, count, deadline);
    if (count == -1) {
        ALOGE("epoll_wait: %s", strerror(errno));
        return false;
    }
    for (int i = 0; i < count; ++i) {
        ((AudioStream *)events[i].data.ptr)->decode(tick);
    }

    return true;
}

bool AudioGroup::checkPlatformAec()
{
    effect_descriptor_t fxDesc;
    uint32_t numFx;

    if (AudioEffect::queryNumberEffects(&numFx) != NO_ERROR) {
        return false;
    }
    for (uint32_t i = 0; i < numFx; i++) {
        if (AudioEffect::queryEffect(i, &fxDesc) != NO_ERROR) {
            continue;
        }
        if (memcmp(&fxDesc.type, FX_IID_AEC, sizeof(effect_uuid_t)) == 0) {
            return true;
        }
    }
    return false;
}

bool AudioGroup::DeviceThread::threadLoop()
{
    int mode = mGroup->mMode;
    int sampleRate = mGroup->mSampleRate;
    size_t sampleCount = mGroup->mSampleCount;
    int deviceSocket = mGroup->mDeviceSocket;

    // Find out the frame count for AudioTrack and AudioRecord.
    size_t output = 0;
    size_t input = 0;
    if (AudioTrack::getMinFrameCount(&output, AUDIO_STREAM_VOICE_CALL,
        sampleRate) != NO_ERROR || output <= 0 ||
        AudioRecord::getMinFrameCount(&input, sampleRate,
        AUDIO_FORMAT_PCM_16_BIT, AUDIO_CHANNEL_IN_MONO) != NO_ERROR || input <= 0) {
        ALOGE("cannot compute frame count");
        return false;
    }
    ALOGD("reported frame count: output %zu, input %zu", output, input);

    if (output < sampleCount * 2) {
        output = sampleCount * 2;
    }
    if (input < sampleCount * 2) {
        input = sampleCount * 2;
    }
    ALOGD("adjusted frame count: output %zu, input %zu", output, input);

    // Initialize AudioTrack and AudioRecord.
    sp<AudioTrack> track = new AudioTrack();
    sp<AudioRecord> record = new AudioRecord(mGroup->mAttributionSource);
    // Set the caller name so it can be logged in the destructor.
    // MediaMetricsConstants.h: AMEDIAMETRICS_PROP_CALLERNAME_VALUE_RTP
    track->setCallerName("rtp");
    record->setCallerName("rtp");
    if (track->set(AUDIO_STREAM_VOICE_CALL, sampleRate, AUDIO_FORMAT_PCM_16_BIT,
                AUDIO_CHANNEL_OUT_MONO, output, AUDIO_OUTPUT_FLAG_NONE, NULL /*callback_t*/,
                NULL /*user*/, 0 /*notificationFrames*/, 0 /*sharedBuffer*/,
                false /*threadCanCallJava*/, AUDIO_SESSION_ALLOCATE,
                AudioTrack::TRANSFER_OBTAIN) != NO_ERROR ||
            record->set(AUDIO_SOURCE_VOICE_COMMUNICATION, sampleRate, AUDIO_FORMAT_PCM_16_BIT,
                AUDIO_CHANNEL_IN_MONO, input, NULL /*callback_t*/, NULL /*user*/,
                0 /*notificationFrames*/, false /*threadCanCallJava*/, AUDIO_SESSION_ALLOCATE,
                AudioRecord::TRANSFER_OBTAIN) != NO_ERROR) {
        ALOGE("cannot initialize audio device");
        return false;
    }
    ALOGD("latency: output %d, input %d", track->latency(), record->latency());

    // Give the device socket a reasonable buffer size.
    setsockopt(deviceSocket, SOL_SOCKET, SO_RCVBUF, &output, sizeof(output));
    setsockopt(deviceSocket, SOL_SOCKET, SO_SNDBUF, &output, sizeof(output));

    // Drain the device socket.
    char c;
    while (recv(deviceSocket, &c, 1, MSG_DONTWAIT) == 1);

    // Check whether the platform supports echo cancellation; if it does, do not
    // activate the local echo suppressor.
    EchoSuppressor *echo = NULL;
    sp<AudioEffect> aec;
    if (mode == ECHO_SUPPRESSION) {
        if (mGroup->platformHasAec()) {
            aec = new AudioEffect(mGroup->mAttributionSource);
            aec->set(FX_IID_AEC,
                     NULL,
                     0,
                     0,
                     0,
                     record->getSessionId(),
                     AUDIO_IO_HANDLE_NONE); // record sessionId is sufficient.
            status_t status = aec->initCheck();
            if (status == NO_ERROR || status == ALREADY_EXISTS) {
                aec->setEnabled(true);
            } else {
                aec.clear();
            }
        }
        // Create a local echo suppressor if the platform AEC cannot be used.
        if (aec == 0) {
             echo = new EchoSuppressor(sampleCount,
                                       (track->latency() + record->latency()) * sampleRate / 1000);
        }
    }
    // Start AudioRecord before AudioTrack. This prevents AudioTrack from being
    // disabled due to buffer underrun while waiting for AudioRecord.
    if (mode != MUTED) {
        record->start();
        int16_t one;
        // FIXME this may not work any more
        record->read(&one, sizeof(one));
    }
    track->start();

    while (!exitPending()) {
        int16_t output[sampleCount];
        if (recv(deviceSocket, output, sizeof(output), 0) <= 0) {
            memset(output, 0, sizeof(output));
        }

        int16_t input[sampleCount];
        int toWrite = sampleCount;
        int toRead = (mode == MUTED) ? 0 : sampleCount;
        int chances = 100;

        while (--chances > 0 && (toWrite > 0 || toRead > 0)) {
            if (toWrite > 0) {
                AudioTrack::Buffer buffer;
                buffer.frameCount = toWrite;

                status_t status = track->obtainBuffer(&buffer, 1);
                if (status == NO_ERROR) {
                    int offset = sampleCount - toWrite;
                    memcpy(buffer.i8, &output[offset], buffer.size);
                    toWrite -= buffer.frameCount;
                    track->releaseBuffer(&buffer);
                } else if (status != TIMED_OUT && status != WOULD_BLOCK) {
                    ALOGE("cannot write to AudioTrack");
                    goto exit;
                }
            }

            if (toRead > 0) {
                AudioRecord::Buffer buffer;
                buffer.frameCount = toRead;

                status_t status = record->obtainBuffer(&buffer, 1);
                if (status == NO_ERROR) {
                    int offset = sampleCount - toRead;
                    memcpy(&input[offset], buffer.i8, buffer.size);
                    toRead -= buffer.frameCount;
                    record->releaseBuffer(&buffer);
                } else if (status != TIMED_OUT && status != WOULD_BLOCK) {
                    ALOGE("cannot read from AudioRecord");
                    goto exit;
                }
            }
        }

        if (chances <= 0) {
            ALOGW("device loop timeout");
            while (recv(deviceSocket, &c, 1, MSG_DONTWAIT) == 1);
        }

        if (mode != MUTED) {
            if (echo != NULL) {
                ALOGV("echo->run()");
                echo->run(output, input);
            }
            send(deviceSocket, input, sizeof(input), MSG_DONTWAIT);
        }
    }

exit:
    delete echo;
    return true;
}

//------------------------------------------------------------------------------

static jfieldID gNative;
static jfieldID gMode;

jlong add(JNIEnv *env, jobject thiz, jint mode,
    jint socket, jstring jRemoteAddress, jint remotePort,
    jstring jCodecSpec, jint dtmfType, jobject jAttributionSource)
{
    AudioCodec *codec = NULL;
    AudioStream *stream = NULL;
    AudioGroup *group = NULL;

    // Sanity check.
    sockaddr_storage remote;
    if (parse(env, jRemoteAddress, remotePort, &remote) < 0) {
        // Exception already thrown.
        return 0;
    }
    if (!jCodecSpec) {
        jniThrowNullPointerException(env, "codecSpec");
        return 0;
    }
    const char *codecSpec = env->GetStringUTFChars(jCodecSpec, NULL);
    if (!codecSpec) {
        // Exception already thrown.
        return 0;
    }
    socket = dup(socket);
    if (socket == -1) {
        jniThrowException(env, "java/lang/IllegalStateException",
            "cannot get stream socket");
        return 0;
    }

    Parcel* parcel = parcelForJavaObject(env, jAttributionSource);
    AttributionSourceState attributionSource;
    attributionSource.readFromParcel(parcel);

    // Create audio codec.
    int codecType = -1;
    char codecName[16];
    int sampleRate = -1;
    sscanf(codecSpec, "%d %15[^/]%*c%d", &codecType, codecName, &sampleRate);
    codec = newAudioCodec(codecName);
    int sampleCount = (codec ? codec->set(sampleRate, codecSpec) : -1);
    env->ReleaseStringUTFChars(jCodecSpec, codecSpec);
    if (sampleCount <= 0) {
        jniThrowException(env, "java/lang/IllegalStateException",
            "cannot initialize audio codec");
        goto error;
    }

    // Create audio stream.
    stream = new AudioStream;
    if (!stream->set(mode, socket, &remote, codec, sampleRate, sampleCount,
        codecType, dtmfType)) {
        jniThrowException(env, "java/lang/IllegalStateException",
            "cannot initialize audio stream");
        goto error;
    }
    socket = -1;
    codec = NULL;

    // Create audio group.
    group = (AudioGroup *)env->GetLongField(thiz, gNative);
    if (!group) {
        int mode = env->GetIntField(thiz, gMode);
        group = new AudioGroup(attributionSource);
        if (!group->set(8000, 256) || !group->setMode(mode)) {
            jniThrowException(env, "java/lang/IllegalStateException",
                "cannot initialize audio group");
            goto error;
        }
    }

    // Add audio stream into audio group.
    if (!group->add(stream)) {
        jniThrowException(env, "java/lang/IllegalStateException",
            "cannot add audio stream");
        goto error;
    }

    // Succeed.
    env->SetLongField(thiz, gNative, (jlong)group);
    return (jlong)stream;

error:
    delete group;
    delete stream;
    delete codec;
    close(socket);
    env->SetLongField(thiz, gNative, 0);
    return 0;
}

void remove(JNIEnv *env, jobject thiz, jlong stream)
{
    AudioGroup *group = (AudioGroup *)env->GetLongField(thiz, gNative);
    if (group) {
        if (!stream || !group->remove((AudioStream *)stream)) {
            delete group;
            env->SetLongField(thiz, gNative, 0);
        }
    }
}

void setMode(JNIEnv *env, jobject thiz, jint mode)
{
    AudioGroup *group = (AudioGroup *)env->GetLongField(thiz, gNative);
    if (group && !group->setMode(mode)) {
        jniThrowException(env, "java/lang/IllegalArgumentException", NULL);
    }
}

void sendDtmf(JNIEnv *env, jobject thiz, jint event)
{
    AudioGroup *group = (AudioGroup *)env->GetLongField(thiz, gNative);
    if (group && !group->sendDtmf(event)) {
        jniThrowException(env, "java/lang/IllegalArgumentException", NULL);
    }
}

JNINativeMethod gMethods[] = {
    {"nativeAdd", "(IILjava/lang/String;ILjava/lang/String;ILandroid/os/Parcel;)J", (void *)add},
    {"nativeRemove", "(J)V", (void *)remove},
    {"nativeSetMode", "(I)V", (void *)setMode},
    {"nativeSendDtmf", "(I)V", (void *)sendDtmf},
};

} // namespace

int registerAudioGroup(JNIEnv *env)
{
    gRandom = open("/dev/urandom", O_RDONLY | O_CLOEXEC);
    if (gRandom == -1) {
        ALOGE("urandom: %s", strerror(errno));
        return -1;
    }

    jclass clazz;
    if ((clazz = env->FindClass("android/net/rtp/AudioGroup")) == NULL ||
        (gNative = env->GetFieldID(clazz, "mNative", "J")) == NULL ||
        (gMode = env->GetFieldID(clazz, "mMode", "I")) == NULL ||
        env->RegisterNatives(clazz, gMethods, NELEM(gMethods)) < 0) {
        ALOGE("JNI registration failed");
        return -1;
    }

    return 0;
}