1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifdef HAVE_CONFIG_H
17 #include <config.h>
18 #endif
19 
20 #undef LOG_TAG
21 #define LOG_TAG "ModuleSplitStreamSink"
22 
23 #include <stdlib.h>
24 #include <stdio.h>
25 #include <errno.h>
26 #include <unistd.h>
27 #include <string.h>
28 #include "securec.h"
29 
30 #include <pulse/rtclock.h>
31 #include <pulse/timeval.h>
32 #include <pulse/util.h>
33 #include <pulse/xmalloc.h>
34 
35 #include <pulsecore/i18n.h>
36 #include <pulsecore/macro.h>
37 #include <pulsecore/sink.h>
38 #include <pulsecore/module.h>
39 #include <pulsecore/core-util.h>
40 #include <pulsecore/modargs.h>
41 #include <pulsecore/log.h>
42 #include <pulsecore/thread.h>
43 #include <pulsecore/thread-mq.h>
44 #include <pulsecore/rtpoll.h>
45 #include <pulsecore/mix.h>
46 #include <pulsecore/memblockq.h>
47 #include <pulsecore/memblock.h>
48 
49 #include <pthread.h>
50 #include "audio_log.h"
51 #include "audio_schedule.h"
52 #include "audio_utils_c.h"
53 #include "audio_hdiadapter_info.h"
54 #include "renderer_sink_adapter.h"
55 
/* Sink name used when the "sink_name" module argument is absent. */
#define DEFAULT_SINK_NAME "hdi_output"
#define DEFAULT_DEVICE_CLASS "primary"
#define DEFAULT_DEVICE_NETWORKID "LocalDevice"
#define DEFAULT_BUFFER_SIZE 8192
#define MAX_SINK_VOLUME_LEVEL 1.0
/* Render length substituted when a render request asks for 0 bytes. */
#define MIX_BUFFER_LENGTH (pa_page_size())
#define MAX_REWIND (7000 * PA_USEC_PER_MSEC)
/* Microseconds per second; used to rate-limit the "HDI not started" log. */
#define USEC_PER_SEC 1000000
/* Number of entries in SCENE_TYPE_SET. */
#define SCENE_TYPE_NUM 7
/* Failure return code used by the init helpers of this module. */
#define PA_ERR (-1)
/* Capacity of g_splitArr, i.e. the maximum number of split tokens kept. */
#define MAX_PARTS 10

/* Stream-type identifiers; compared against the "stream.usage" property of sink inputs. */
#define STREAM_TYPE_MEDIA "1"
#define STREAM_TYPE_COMMUNICATION "2"
#define STREAM_TYPE_NAVIGATION "13"
71 
/* Stream-type tokens filled by ConvertToSplitArr(); entries [0, g_splitNums) are heap-allocated strings. */
char *g_splitArr[MAX_PARTS];
/* Number of valid entries in g_splitArr. */
int g_splitNums = 0;
/* NOTE(review): not referenced in the visible part of the file; presumably the raw split_mode argument - confirm. */
const char *SPLIT_MODE;
/* Values of g_splitNums that IsPeekCurrentSinkInput() distinguishes. */
const uint32_t SPLIT_ONE_STREAM = 1;
const uint32_t SPLIT_TWO_STREAM = 2;
const uint32_t SPLIT_THREE_STREAM = 3;
78 
79 PA_MODULE_AUTHOR("OpenHarmony");
80 PA_MODULE_DESCRIPTION(_("Split Stream Sink"));
81 PA_MODULE_VERSION(PACKAGE_VERSION);
82 PA_MODULE_LOAD_ONCE(false);
83 PA_MODULE_USAGE(
84         "sink_name=<name of sink> "
85         "sink_properties=<properties for the sink> "
86         "format=<sample format> "
87         "rate=<sample rate> "
88         "channels=<number of channels> "
89         "channel_map=<channel map>"
90         "buffer_size=<custom buffer size>"
91         "formats=<semi-colon separated sink formats>");
92 
/* Forward declaration: the definition follows its caller, ProcessSplitHdiRender(). */
static ssize_t SplitRenderWrite(struct RendererSinkAdapter *sinkAdapter, pa_memchunk *pchunk, char *streamType);
94 
/* Per-module state shared by the sink callbacks, the render thread and the HDI write thread. */
struct userdata {
    pa_core *core;
    pa_module *module;
    pa_sink *sink;
    pa_thread *thread;
    pa_thread_mq thread_mq;              /* installed by ThreadFunc() */
    pa_rtpoll *rtpoll;                   /* poll loop run by ThreadFunc() */
    uint32_t buffer_size;
    pa_usec_t block_usec;                /* set by SinkUpdateRequestedLatencyCb() */
    pa_usec_t timestamp;                 /* next render deadline; advanced by ProcessRender() */
    pa_idxset *formats;                  /* set of pa_format_info owned by this module */
    pa_thread *thread_split_hdi;
    bool isHDISinkStarted;               /* set once RendererSinkStart() has succeeded */
    struct RendererSinkAdapter *sinkAdapter;
    pa_asyncmsgq *dq;                    /* carries rendered chunks from ProcessRender() to ThreadFuncWriteHDI() */
    pa_atomic_t dflag;                   /* decremented by ProcessSplitHdiRender() when it equals 1 */
    pa_usec_t writeTime;                 /* duration of the last ProcessSplitHdiRender() call */
    pa_usec_t prewrite;
    pa_sink_state_t previousState;
    pa_usec_t timestampLastLog;          /* last time the "HDI not started" message was logged */
    const char *deviceNetworkId;
    const char *adapterName;
    uint32_t open_mic_speaker;
    pa_sample_spec ss;
    pa_channel_map map;
    int32_t deviceType;
    size_t bytesDropped;                 /* total length of chunks whose HDI write failed */
    uint32_t writeCount;                 /* reset to 0 when the HDI renderer is started */
    uint32_t renderCount;                /* reset to 0 when the HDI renderer is started */
    uint32_t fixed_latency;
    pa_usec_t lastProcessDataTime;
    uint32_t renderInIdleState;          /* set to 1 when the HDI renderer is started */
};
128 
/*
 * NULL-terminated list of module argument names.
 * NOTE(review): presumably handed to pa_modargs_new() by the module init code,
 * which is outside the visible part of the file - confirm.
 */
static const char * const VALID_MODARGS[] = {
    "sink_name",
    "device_class",
    "sink_properties",
    "format",
    "rate",
    "channels",
    "channel_map",
    "buffer_size",
    "file_path",
    "adapter_name",
    "fixed_latency",
    "sink_latency",
    "render_in_idle_state",
    "open_mic_speaker",
    "test_mode_on",
    "network_id",
    "device_type",
    "offload_enable",
    "split_mode",
    NULL
};
151 
/* Scene-type names, SCENE_TYPE_NUM entries. Not referenced in the visible part of this file. */
char *const SCENE_TYPE_SET[SCENE_TYPE_NUM] = {"SCENE_MUSIC", "SCENE_GAME", "SCENE_MOVIE", "SCENE_SPEECH", "SCENE_RING",
    "SCENE_OTHERS", "EFFECT_NONE"};
154 
/*
 * Message codes. ProcessRender() posts the HDI_RENDER_* stream codes on
 * userdata.dq and ThreadFuncWriteHDI() dispatches on them; QUIT ends that
 * thread's loop. The remaining codes are not handled in the visible code.
 */
enum {
    HDI_INIT,
    HDI_DEINIT,
    HDI_START,
    HDI_STOP,
    HDI_RENDER,
    QUIT,
    HDI_RENDER_MEDIA,          /* chunk rendered for STREAM_TYPE_MEDIA */
    HDI_RENDER_NAVIGATION,     /* chunk rendered for STREAM_TYPE_NAVIGATION */
    HDI_RENDER_COMMUNICATION   /* chunk rendered for STREAM_TYPE_COMMUNICATION */
};
166 
/*
 * Translate a PulseAudio sample format into the HDI adapter format.
 *
 * Returns SAMPLE_U8, SAMPLE_S16, SAMPLE_S24 or SAMPLE_S32 for PA_SAMPLE_U8,
 * PA_SAMPLE_S16LE, PA_SAMPLE_S24LE and PA_SAMPLE_S32LE respectively, and
 * INVALID_WIDTH for every other format.
 */
static enum HdiAdapterFormat ConvertPaToHdiAdapterFormat(pa_sample_format_t format)
{
    switch (format) {
        case PA_SAMPLE_U8:
            return SAMPLE_U8;
        case PA_SAMPLE_S16LE:
            return SAMPLE_S16;
        case PA_SAMPLE_S24LE:
            return SAMPLE_S24;
        case PA_SAMPLE_S32LE:
            return SAMPLE_S32;
        default:
            return INVALID_WIDTH;
    }
}
190 
ConvertToSplitArr(const char * str)191 static void ConvertToSplitArr(const char *str)
192 {
193     for (int i = 0; i < MAX_PARTS; ++i) {
194         g_splitArr[i] = NULL;
195     }
196 
197     char *token;
198     char *copy = strdup(str);
199     int count = 0;
200 
201     token = strtok(copy, ":");
202     while (token != NULL && count < MAX_PARTS) {
203         g_splitArr[count] = (char *)malloc(strlen(token) + 1);
204         if (g_splitArr[count] != NULL) {
205             if (strcpy_s(g_splitArr[count], strlen(token) + 1, token) != 0) {
206                 AUDIO_ERR_LOG("strcpy_s failed.");
207             };
208             count++;
209         } else {
210             AUDIO_ERR_LOG("Memory allocation failed.\n");
211             break;
212         }
213         token = strtok(NULL, ":");
214     }
215     g_splitNums = count;
216     free(copy);
217 }
218 
/*
 * Sink message handler. PA_SINK_MESSAGE_GET_LATENCY is answered directly with
 * a latency of 0 written through `data`; every other code is forwarded to
 * pa_sink_process_msg() and its result is returned.
 */
static int SinkProcessMsg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk)
{
    if (code == PA_SINK_MESSAGE_GET_LATENCY) {
        int64_t *latency = (int64_t *)data;
        *latency = 0;
        return 0;
    }

    return pa_sink_process_msg(o, code, data, offset, chunk);
}
231 
232 /* Called from the IO thread. */
SinkSetStateInIoThreadCb(pa_sink * s,pa_sink_state_t new_state,pa_suspend_cause_t new_suspend_cause)233 static int SinkSetStateInIoThreadCb(pa_sink *s, pa_sink_state_t new_state, pa_suspend_cause_t new_suspend_cause)
234 {
235     struct userdata *u;
236 
237     pa_assert(s);
238     pa_assert_se(u = s->userdata);
239 
240     if (s->thread_info.state == PA_SINK_SUSPENDED || s->thread_info.state == PA_SINK_INIT) {
241         if (PA_SINK_IS_OPENED(new_state)) {
242             u->timestamp = pa_rtclock_now();
243         }
244     }
245 
246     return 0;
247 }
248 
/*
 * Recompute the block duration from the latency requested for the sink.
 *
 * A requested latency of (pa_usec_t) -1 is replaced by the sink's maximum
 * latency. The resulting duration, converted to bytes, becomes both the
 * maximum rewind and the maximum request size of the sink.
 */
static void SinkUpdateRequestedLatencyCb(pa_sink *s)
{
    struct userdata *u;

    pa_sink_assert_ref(s);
    pa_assert_se(u = s->userdata);

    pa_usec_t latency = pa_sink_get_requested_latency_within_thread(s);
    if (latency == (pa_usec_t) -1) {
        latency = s->thread_info.max_latency;
    }
    u->block_usec = latency;

    const size_t blockBytes = pa_usec_to_bytes(u->block_usec, &s->sample_spec);
    pa_sink_set_max_rewind_within_thread(s, blockBytes);
    pa_sink_set_max_request_within_thread(s, blockBytes);
}
267 
/*
 * Reconfigure callback: adopt `spec` as the sink's sample spec.
 *
 * `passthrough` is not used. Both pointers are validated with pa_assert(),
 * in line with the other sink callbacks in this file, so a bad argument is
 * reported by the assertion instead of faulting on the dereference.
 */
static void SinkReconfigureCb(pa_sink *s, pa_sample_spec *spec, bool passthrough)
{
    pa_assert(s);
    pa_assert(spec);

    s->sample_spec = *spec;
}
272 
/*
 * Replace the module's format set with a copy of `formats`.
 *
 * The previous set is released together with its pa_format_info entries
 * before the copy is made. Always returns true.
 */
static bool SinkSetFormatsCb(pa_sink *s, pa_idxset *formats)
{
    struct userdata *u = s->userdata;

    pa_assert(u);

    pa_idxset *previous = u->formats;
    pa_idxset_free(previous, (pa_free_cb_t) pa_format_info_free);

    pa_idxset *replacement = pa_idxset_copy(formats, (pa_copy_func_t) pa_format_info_copy);
    u->formats = replacement;

    return true;
}
284 
SinkGetFormatsCb(pa_sink * s)285 static pa_idxset* SinkGetFormatsCb(pa_sink *s)
286 {
287     struct userdata *u = s->userdata;
288 
289     pa_assert(u);
290 
291     return pa_idxset_copy(u->formats, (pa_copy_func_t) pa_format_info_copy);
292 }
293 
/*
 * Handle a pending rewind request of the sink.
 *
 * The number of bytes actually rewound is the requested amount, limited to
 * the data that is still "in the buffer", i.e. the span between `now` and
 * the render timestamp. If the sink is not opened, nothing was requested, or
 * the timestamp is not ahead of `now`, a rewind of 0 bytes is processed.
 * After a real rewind the timestamp is moved back by the rewound duration.
 */
static void ProcessRewind(struct userdata *u, pa_usec_t now)
{
    pa_assert(u);

    const size_t requested = u->sink->thread_info.rewind_nbytes;
    size_t granted = 0;

    if (PA_SINK_IS_OPENED(u->sink->thread_info.state) && requested != 0) {
        AUDIO_DEBUG_LOG("Requested to rewind %lu bytes.", (unsigned long) requested);

        if (u->timestamp > now) {
            const pa_usec_t pending = u->timestamp - now;
            const size_t buffered = pa_usec_to_bytes(pending, &u->sink->sample_spec);
            granted = (requested > buffered) ? buffered : requested;
        }
    }

    if (granted == 0) {
        pa_sink_process_rewind(u->sink, 0);
        return;
    }

    pa_sink_process_rewind(u->sink, granted);
    u->timestamp -= pa_bytes_to_usec(granted, &u->sink->sample_spec);

    AUDIO_DEBUG_LOG("Rewound %lu bytes.", (unsigned long) granted);
}
331 
StartSplitStreamHdiIfRunning(struct userdata * u)332 static void StartSplitStreamHdiIfRunning(struct userdata *u)
333 {
334     AUTO_CTRACE("split_stream_sink::StartPrimaryHdiIfRunning");
335     if (u->isHDISinkStarted) {
336         return;
337     }
338 
339     if (u->sinkAdapter->RendererSinkStart(u->sinkAdapter)) {
340         AUDIO_ERR_LOG("split_stream_sink,audiorenderer control start failed!");
341         u->sinkAdapter->RendererSinkDeInit(u->sinkAdapter);
342     } else {
343         u->isHDISinkStarted = true;
344         u->writeCount = 0;
345         u->renderCount = 0;
346         AUDIO_INFO_LOG("StartPrimaryHdiIfRunning, Successfully restarted HDI renderer");
347         u->renderInIdleState = 1;
348     }
349 }
350 
/*
 * Finish a render pass for the inputs collected in `infoIn`.
 *
 * For each of the `n` entries, chunkIn->length bytes are dropped from the
 * sink input, the entry's chunk reference (if any) is released and reset,
 * and the reference taken on the sink input is given back.
 */
static void SplitSinkRenderInputsDrop(pa_sink *si, pa_mix_info *infoIn, unsigned n, pa_memchunk *chunkIn)
{
    AUTO_CTRACE("split_stream_sink::SplitSinkRenderInputsDrop:%u:len:%zu", n, chunkIn->length);
    pa_sink_assert_ref(si);
    pa_sink_assert_io_context(si);
    pa_assert(chunkIn);
    pa_assert(chunkIn->memblock);
    pa_assert(chunkIn->length > 0);

    for (unsigned idx = 0; idx < n; idx++) {
        pa_mix_info *entry = &infoIn[idx];
        pa_sink_input *input = entry->userdata;

        pa_sink_input_assert_ref(input);
        AUTO_CTRACE("hdi_sink::InnerCap:pa_sink_input_drop:%u:len:%zu", input->index, chunkIn->length);
        pa_sink_input_drop(input, chunkIn->length);

        if (entry->chunk.memblock) {
            pa_memblock_unref(entry->chunk.memblock);
            pa_memchunk_reset(&entry->chunk);
        }

        pa_sink_input_unref(entry->userdata);
    }
}
381 
IsPeekCurrentSinkInput(char * streamType,const char * usageStr)382 static int IsPeekCurrentSinkInput(char *streamType, const char *usageStr)
383 {
384     int flag = 0;
385     if (g_splitNums == SPLIT_ONE_STREAM) {
386         flag = 1;
387     }
388 
389     if (g_splitNums == SPLIT_TWO_STREAM) {
390         if (strcmp(usageStr, STREAM_TYPE_NAVIGATION) && !strcmp(streamType, STREAM_TYPE_MEDIA)) {
391             flag = 1;
392         } else if (!strcmp(usageStr, STREAM_TYPE_NAVIGATION) && !strcmp(streamType, STREAM_TYPE_NAVIGATION)) {
393             flag = 1;
394         }
395     }
396 
397     if (g_splitNums == SPLIT_THREE_STREAM) {
398         if (strcmp(usageStr, STREAM_TYPE_NAVIGATION) && strcmp(usageStr, STREAM_TYPE_COMMUNICATION) &&
399             !strcmp(streamType, STREAM_TYPE_MEDIA)) {
400             flag = 1;
401         } else if (!strcmp(usageStr, STREAM_TYPE_NAVIGATION) && !strcmp(streamType, STREAM_TYPE_NAVIGATION)) {
402             flag = 1;
403         } else if (!strcmp(usageStr, STREAM_TYPE_COMMUNICATION) && !strcmp(streamType, STREAM_TYPE_COMMUNICATION)) {
404             flag = 1;
405         }
406     }
407 
408     return flag;
409 }
410 
/*
 * Collect the sink inputs that belong to `streamType` into `info`.
 *
 * s:          sink whose IO-thread inputs are scanned.
 * length:     in: number of bytes to peek from each input; out: reduced to
 *             the shortest peeked chunk each time an input is accepted.
 * info:       output array with room for `maxInfo` entries.
 * streamType: stream type of the current render pass.
 *
 * Inputs whose peeked block is silence are released again and not added.
 * Every accepted entry holds a reference on its sink input and on its chunk.
 * Returns the number of entries written to `info`.
 */
static unsigned SplitFillMixInfo(pa_sink *s, size_t *length, pa_mix_info *info, unsigned maxInfo, char *streamType)
{
    AUTO_CTRACE("split_stream_sink::SplitFillMixInfo:len:%zu", *length);
    pa_sink_input *i;
    unsigned n = 0;
    void *state = NULL;
    size_t mixlength = *length;

    pa_sink_assert_ref(s);
    pa_sink_assert_io_context(s);
    pa_assert(info);

    while ((i = pa_hashmap_iterate(s->thread_info.inputs, &state, NULL)) && maxInfo > 0) {
        const char *usageStr = pa_proplist_gets(i->proplist, "stream.usage");
        if (usageStr == NULL) {
            /* Property missing: avoid passing NULL to "%s" and to strcmp() below. */
            usageStr = "";
        }
        AUDIO_DEBUG_LOG("splitFillMixInfo usageStr = %{public}s, streamType = %{public}s", usageStr, streamType);
        if (!IsPeekCurrentSinkInput(streamType, usageStr)) {
            continue;
        }

        pa_sink_input_assert_ref(i);

        AUTO_CTRACE("module_split_stream_sink::splitFillMixInfo::pa_sink_input_peek:%u len:%zu", i->index, *length);
        pa_sink_input_peek(i, *length, &info->chunk, &info->volume);

        if (mixlength == 0 || info->chunk.length < mixlength) {
            mixlength = info->chunk.length;
        }

        if (pa_memblock_is_silence(info->chunk.memblock)) {
            /* Silent input: give the block back and reuse this info slot. */
            pa_memblock_unref(info->chunk.memblock);
            continue;
        }

        info->userdata = pa_sink_input_ref(i);
        pa_assert(info->chunk.memblock);
        pa_assert(info->chunk.length > 0);

        info++;
        n++;
        maxInfo--;

        if (mixlength > 0) {
            *length = mixlength;
        }
    }
    return n;
}
455 
/*
 * Produce the rendered chunk for `n` collected inputs into `result`.
 *
 *  - n == 0: a reference to the sink's silence chunk, at most `length` bytes.
 *  - n == 1: a reference to the input's chunk, at most `length` bytes, with
 *            the combined soft/input volume applied; replaced by silence
 *            when the sink is soft-muted or that volume is muted.
 *  - n >= 2: a new memblock of `length` bytes filled by pa_mix().
 */
static void SplitSinkRenderMix(pa_sink *s, size_t length, pa_mix_info *info, unsigned n, pa_memchunk *result)
{
    if (n == 0) {
        *result = s->silence;
        pa_memblock_ref(result->memblock);

        if (result->length > length) {
            result->length = length;
        }
        return;
    }

    if (n > 1) {
        result->memblock = pa_memblock_new(s->core->mempool, length);

        void *dest = pa_memblock_acquire(result->memblock);
        result->length =
            pa_mix(info, n, dest, length, &s->sample_spec, &s->thread_info.soft_volume, s->thread_info.soft_muted);
        pa_memblock_release(result->memblock);

        result->index = 0;
        return;
    }

    /* Exactly one input: pass its chunk through, adjusting the volume if needed. */
    pa_cvolume volume;

    *result = info[0].chunk;
    pa_memblock_ref(result->memblock);

    if (result->length > length) {
        result->length = length;
    }

    pa_sw_cvolume_multiply(&volume, &s->thread_info.soft_volume, &info[0].volume);

    if (s->thread_info.soft_muted || pa_cvolume_is_muted(&volume)) {
        pa_memblock_unref(result->memblock);
        pa_silence_memchunk_get(
            &s->core->silence_cache, s->core->mempool, result, &s->sample_spec, result->length);
        return;
    }

    if (!pa_cvolume_is_norm(&volume)) {
        pa_memchunk_make_writable(result, 0);
        pa_volume_memchunk(result, &s->sample_spec, &volume);
    }
}
495 
/*
 * Render up to `length` bytes of the inputs that belong to `streamType`.
 *
 * A `length` of 0 is replaced by the frame-aligned MIX_BUFFER_LENGTH, and the
 * length is capped to the frame-aligned maximum block size of the mempool.
 * `result` always receives a referenced chunk, which may be shorter than
 * requested. On a suspended sink the result is the sink's silence chunk.
 *
 * Returns the number of inputs that were mixed (0 when suspended or when no
 * input matched).
 */
static unsigned SplitPaSinkRender(pa_sink *s, size_t length, pa_memchunk *result, char *streamType)
{
    AUTO_CTRACE("module_split_stream_sink::SplitPaSinkRender:len:%zu", length);
    pa_mix_info info[MAX_MIX_CHANNELS];

    pa_sink_assert_ref(s);
    pa_sink_assert_io_context(s);
    pa_assert(PA_SINK_IS_LINKED(s->thread_info.state));
    pa_assert(pa_frame_aligned(length, &s->sample_spec));
    pa_assert(result);

    pa_assert(!s->thread_info.rewind_requested);
    pa_assert(s->thread_info.rewind_nbytes == 0);

    if (s->thread_info.state == PA_SINK_SUSPENDED) {
        result->memblock = pa_memblock_ref(s->silence.memblock);
        result->index = s->silence.index;
        result->length = PA_MIN(s->silence.length, length);
        return 0;
    }

    pa_sink_ref(s);

    AUDIO_DEBUG_LOG("module_split_stream_sink, splitSinkRender in  length = %{public}zu", length);
    if (length == 0) {
        length = pa_frame_align(MIX_BUFFER_LENGTH, &s->sample_spec);
    }

    const size_t poolBlockMax = pa_mempool_block_size_max(s->core->mempool);
    if (length > poolBlockMax) {
        length = pa_frame_align(poolBlockMax, &s->sample_spec);
    }

    pa_assert(length > 0);

    const unsigned mixed = SplitFillMixInfo(s, &length, info, MAX_MIX_CHANNELS, streamType);
    SplitSinkRenderMix(s, length, info, mixed, result);
    SplitSinkRenderInputsDrop(s, info, mixed, result);

    pa_sink_unref(s);
    return mixed;
}
542 
/*
 * Write the rendered data of `n` collected inputs into the existing `target`.
 *
 *  - n == 0: target is shortened to at most `length` and filled with silence.
 *  - n == 1: target is shortened to at most `length`; it receives silence if
 *            the sink is soft-muted or the combined volume is muted, and
 *            otherwise a volume-adjusted copy of the input's chunk.
 *  - n >= 2: pa_mix() writes into target at target->index and its return
 *            value becomes target->length.
 */
static void SplitSinkRenderIntoMix(pa_sink *s, size_t length, pa_mix_info *info, unsigned n, pa_memchunk *target)
{
    if (n >= 2) {
        void *base = pa_memblock_acquire(target->memblock);

        target->length = pa_mix(info, n,
                                (uint8_t*) base + target->index, length,
                                &s->sample_spec,
                                &s->thread_info.soft_volume,
                                s->thread_info.soft_muted);

        pa_memblock_release(target->memblock);
        return;
    }

    /* Zero or one input: the target never exceeds the requested length. */
    if (target->length > length) {
        target->length = length;
    }

    if (n == 0) {
        pa_silence_memchunk(target, &s->sample_spec);
        return;
    }

    pa_cvolume volume;
    pa_sw_cvolume_multiply(&volume, &s->thread_info.soft_volume, &info[0].volume);

    if (s->thread_info.soft_muted || pa_cvolume_is_muted(&volume)) {
        pa_silence_memchunk(target, &s->sample_spec);
        return;
    }

    pa_memchunk vChunk = info[0].chunk;
    pa_memblock_ref(vChunk.memblock);

    if (vChunk.length > length) {
        vChunk.length = length;
    }

    if (!pa_cvolume_is_norm(&volume)) {
        pa_memchunk_make_writable(&vChunk, 0);
        pa_volume_memchunk(&vChunk, &s->sample_spec, &volume);
    }

    pa_memchunk_memcpy(target, &vChunk);
    pa_memblock_unref(vChunk.memblock);
}
591 
/*
 * Render the inputs that belong to `streamType` into the existing `target`.
 *
 * The amount rendered is target->length, capped to the frame-aligned maximum
 * block size of the mempool; the mixing step may shorten target->length. On
 * a suspended sink the target is filled with silence.
 */
static void  SplitPaSinkRenderInto(pa_sink *s, pa_memchunk *target, char *streamType)
{
    pa_mix_info info[MAX_MIX_CHANNELS];

    pa_sink_assert_ref(s);
    pa_sink_assert_io_context(s);
    pa_assert(PA_SINK_IS_LINKED(s->thread_info.state));
    pa_assert(pa_frame_aligned(target->length, &s->sample_spec));

    pa_assert(!s->thread_info.rewind_requested);
    pa_assert(s->thread_info.rewind_nbytes == 0);

    if (s->thread_info.state == PA_SINK_SUSPENDED) {
        pa_silence_memchunk(target, &s->sample_spec);
        return;
    }

    pa_sink_ref(s);

    size_t length = target->length;
    const size_t poolBlockMax = pa_mempool_block_size_max(s->core->mempool);
    if (length > poolBlockMax) {
        length = pa_frame_align(poolBlockMax, &s->sample_spec);
    }

    pa_assert(length > 0);

    const unsigned mixed = SplitFillMixInfo(s, &length, info, MAX_MIX_CHANNELS, streamType);
    SplitSinkRenderIntoMix(s, length, info, mixed, target);
    SplitSinkRenderInputsDrop(s, info, mixed, target);

    pa_sink_unref(s);
}
627 
/*
 * Fill the whole of `target` with rendered data for `streamType`.
 *
 * SplitPaSinkRenderInto() may deliver less than asked for, so it is called
 * repeatedly on the part of the target that is still unfilled until nothing
 * remains. On a suspended sink the target is filled with silence instead.
 */
static void SplitPaSinkRenderIntoFull(pa_sink *s, pa_memchunk *target, char *streamType)
{
    pa_sink_assert_ref(s);
    pa_sink_assert_io_context(s);
    pa_assert(target);
    pa_assert(target->length > 0);
    pa_assert(target->memblock);
    pa_assert(pa_frame_aligned(target->length, &s->sample_spec));

    if (s->thread_info.state == PA_SINK_SUSPENDED) {
        pa_silence_memchunk(target, &s->sample_spec);
        return;
    }

    pa_sink_ref(s);

    size_t remaining = target->length;
    size_t filled = 0;
    while (remaining > 0) {
        /* View onto the still-unfilled tail of the target. */
        pa_memchunk piece = *target;
        piece.index += filled;
        piece.length -= filled;

        SplitPaSinkRenderInto(s, &piece, streamType);

        filled += piece.length;
        remaining -= piece.length;
    }

    pa_sink_unref(s);
}
662 
/*
 * Render exactly `length` bytes for `streamType` when at least one matching
 * input exists.
 *
 * s:          sink to render from (IO thread context).
 * length:     number of bytes wanted; must be > 0 and frame aligned.
 * result:     receives a referenced chunk on every path. The caller owns
 *             that reference, including when 0 is returned; in that case the
 *             chunk holds silence and may be shorter than `length`.
 * streamType: stream type of the current render pass.
 *
 * Returns the number of inputs mixed by the first render step; 0 when the
 * sink is suspended or no input matched.
 *
 * The sink reference taken here is released on every path. Previously the
 * "no input" return skipped pa_sink_unref(), leaking one reference per call.
 */
static unsigned SplitPaSinkRenderFull(pa_sink *s, size_t length, pa_memchunk *result, char *streamType)
{
    unsigned nSink;
    pa_sink_assert_ref(s);
    pa_sink_assert_io_context(s);
    pa_assert(PA_SINK_IS_LINKED(s->thread_info.state));
    pa_assert(length > 0);
    pa_assert(pa_frame_aligned(length, &s->sample_spec));
    pa_assert(result);

    pa_assert(!s->thread_info.rewind_requested);
    pa_assert(s->thread_info.rewind_nbytes == 0);

    if (s->thread_info.state == PA_SINK_SUSPENDED) {
        result->memblock = pa_memblock_ref(s->silence.memblock);
        result->index = s->silence.index;
        result->length = PA_MIN(s->silence.length, length);
        return 0;
    }

    pa_sink_ref(s);

    AUDIO_DEBUG_LOG("module_split_stream_sink, splitSinkRender in  length = %{public}zu", length);
    nSink = SplitPaSinkRender(s, length, result, streamType);

    /* Only top up a short chunk when real data was rendered. */
    if (nSink != 0 && result->length < length) {
        pa_memchunk chunk;

        pa_memchunk_make_writable(result, length);

        chunk.memblock = result->memblock;
        chunk.index = result->index + result->length;
        chunk.length = length - result->length;

        SplitPaSinkRenderIntoFull(s, &chunk, streamType);

        result->length = length;
    }

    pa_sink_unref(s);
    return nSink;
}
708 
/*
 * Render one block for every configured split stream type and hand the
 * result to the HDI write thread.
 *
 * For each entry of g_splitArr a block of thread_info.max_request bytes is
 * rendered. When no input of that type was mixed, the returned chunk is
 * released and nothing is posted. Otherwise the HDI renderer is started if
 * needed and the chunk is posted on u->dq with the message code of its
 * stream type (navigation, communication, or media for anything else).
 * Finally the render timestamp advances by the duration of one block.
 *
 * `now` is not used by this function.
 */
static void ProcessRender(struct userdata *u, pa_usec_t now)
{
    AUTO_CTRACE("module_split_stream_sink: ProcessRender");

    pa_assert(u);

    /* Fill the buffer up the latency size */
    for (int i = 0; i < g_splitNums; i++) {
        AUTO_CTRACE("module_split_stream_sink::ProcessRender:streamType:%s", g_splitArr[i]);
        AUDIO_DEBUG_LOG("module_split_stream_sink: ProcessRender:streamType:%{public}s", g_splitArr[i]);

        pa_memchunk chunk;
        pa_memchunk_reset(&chunk);

        unsigned streamCount =
            SplitPaSinkRenderFull(u->sink, u->sink->thread_info.max_request, &chunk, g_splitArr[i]);
        if (streamCount == 0) {
            /*
             * The render call hands back a referenced silence chunk even when
             * nothing was mixed; release it, otherwise the reference leaks on
             * every cycle without a matching stream.
             */
            if (chunk.memblock != NULL) {
                pa_memblock_unref(chunk.memblock);
            }
            continue;
        }

        // start hdi
        StartSplitStreamHdiIfRunning(u);

        AUDIO_DEBUG_LOG("module_split_stream_sink: ProcessRender send msg, chunk length = %{public}zu", chunk.length);
        // send msg post data
        int code = HDI_RENDER_MEDIA;
        if (!strcmp(g_splitArr[i], STREAM_TYPE_NAVIGATION)) {
            code = HDI_RENDER_NAVIGATION;
        } else if (!strcmp(g_splitArr[i], STREAM_TYPE_COMMUNICATION)) {
            code = HDI_RENDER_COMMUNICATION;
        }
        /* The chunk reference travels with the message; the HDI write thread releases it. */
        pa_asyncmsgq_post(u->dq, NULL, code, NULL, 0, &chunk, NULL);
    }
    u->timestamp += pa_bytes_to_usec(u->sink->thread_info.max_request, &u->sink->sample_spec);
}
742 
ThreadFunc(void * userdata)743 static void ThreadFunc(void *userdata)
744 {
745     ScheduleReportData(getpid(), gettid(), "audio_server");
746     struct userdata *u = userdata;
747     pa_assert(u);
748     AUDIO_DEBUG_LOG("Thread starting up");
749     if (u->core->realtime_scheduling) {
750         pa_thread_make_realtime(u->core->realtime_priority);
751     }
752     pa_thread_mq_install(&u->thread_mq);
753     u->timestamp = pa_rtclock_now();
754     for (;;) {
755         pa_usec_t now = 0;
756         int ret;
757 
758         if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
759             now = pa_rtclock_now();
760         }
761         if (PA_UNLIKELY(u->sink->thread_info.rewind_requested)) {
762             ProcessRewind(u, now);
763         }
764         /* Render some data and drop it immediately */
765         if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
766             if (u->timestamp <= now) {
767                 ProcessRender(u, now);
768             }
769             pa_rtpoll_set_timer_absolute(u->rtpoll, u->timestamp);
770         } else {
771             pa_rtpoll_set_timer_disabled(u->rtpoll);
772         }
773         /* Hmm, nothing to do. Let's sleep */
774         if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) {
775             goto fail;
776         }
777         if (ret == 0) {
778             goto finish;
779         }
780     }
781 
782 fail:
783     /* If this was no regular exit from the loop we have to continue
784      * processing messages until we received PA_MESSAGE_SHUTDOWN */
785     pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core),
786         PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
787     pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
788 
789 finish:
790     AUDIO_DEBUG_LOG("Thread shutting down");
791 }
792 
/*
 * Consume one rendered chunk on the HDI write thread.
 *
 * While the HDI sink is not started the chunk is released without being
 * written, and a debug message is logged once the last such message is more
 * than USEC_PER_SEC microseconds old. Otherwise the chunk is written with
 * SplitRenderWrite(); on failure its length is added to bytesDropped.
 * Afterwards dflag is decremented if it equals 1 and the time spent in this
 * call is stored in writeTime.
 */
static void ProcessSplitHdiRender(struct userdata *u, pa_memchunk *chunk, char *streamType)
{
    const pa_usec_t startTime = pa_rtclock_now();

    if (!u->isHDISinkStarted) {
        if (startTime - u->timestampLastLog > USEC_PER_SEC) {
            u->timestampLastLog = startTime;
            const char *deviceClass = GetDeviceClass(u->sinkAdapter->deviceClass);
            AUDIO_DEBUG_LOG("HDI not started, skip RenderWrite, wait sink[%s] suspend", deviceClass);
        }
        pa_memblock_unref(chunk->memblock);
    } else if (SplitRenderWrite(u->sinkAdapter, chunk, streamType) < 0) {
        u->bytesDropped += chunk->length;
        AUDIO_ERR_LOG("RenderWrite failed");
    }

    if (pa_atomic_load(&u->dflag) == 1) {
        pa_atomic_sub(&u->dflag, 1);
    }

    u->writeTime = pa_rtclock_now() - startTime;
}
812 
ThreadFuncWriteHDI(void * userdata)813 static void ThreadFuncWriteHDI(void *userdata)
814 {
815     // set audio thread priority
816     ScheduleReportData(getpid(), gettid(), "pulseaudio");
817 
818     struct userdata *u = userdata;
819     pa_assert(u);
820 
821     int32_t quit = 0;
822 
823     do {
824         int32_t code = 0;
825         pa_memchunk chunk;
826 
827         pa_assert_se(pa_asyncmsgq_get(u->dq, NULL, &code, NULL, NULL, &chunk, 1) == 0);
828 
829         switch (code) {
830             case HDI_RENDER_MEDIA: {
831                 ProcessSplitHdiRender(u, &chunk, STREAM_TYPE_MEDIA);
832                 break;
833             }
834             case HDI_RENDER_COMMUNICATION: {
835                 ProcessSplitHdiRender(u, &chunk, STREAM_TYPE_COMMUNICATION);
836                 break;
837             }
838             case HDI_RENDER_NAVIGATION: {
839                 ProcessSplitHdiRender(u, &chunk, STREAM_TYPE_NAVIGATION);
840                 break;
841             }
842             case QUIT:
843                 quit = 1;
844                 break;
845             default:
846                 break;
847         }
848         pa_asyncmsgq_done(u->dq, 0);
849     } while (!quit);
850 }
851 
/*
 * Write a whole chunk to the HDI renderer for the given stream type.
 *
 * RendererSplitRenderFrame() is called repeatedly on the unwritten part of
 * the chunk until everything has been written. If a call reports 0 bytes or
 * more bytes than were offered, the loop stops with an error.
 *
 * The memblock is released and unreferenced before returning, on success as
 * well as on failure.
 *
 * Returns the number of bytes written on success; on error, -1 minus the
 * number of bytes written before the failure.
 */
static ssize_t SplitRenderWrite(struct RendererSinkAdapter *sinkAdapter, pa_memchunk *pchunk, char *streamType)
{
    pa_assert(pchunk);

    size_t offset = pchunk->index;
    size_t remaining = pchunk->length;
    ssize_t written = 0;

    void *p = pa_memblock_acquire(pchunk->memblock);
    pa_assert(p);

    do {
        uint64_t writeLen = 0;

        int32_t ret = sinkAdapter->RendererSplitRenderFrame(sinkAdapter, ((char*)p + offset),
            (uint64_t)remaining, &writeLen, streamType);
        if (writeLen > remaining) {
            AUDIO_ERR_LOG("Error writeLen > actual bytes. Length: %zu, Written: %" PRIu64 " bytes, %d ret",
                         remaining, writeLen, ret);
            written = -1 - written;
            break;
        }
        if (writeLen == 0) {
            AUDIO_ERR_LOG("Failed to render Length: %{public}zu, Written: %{public}" PRIu64 " bytes, %{public}d ret",
                remaining, writeLen, ret);
            written = -1 - written;
            break;
        }

        written += (ssize_t)writeLen;
        offset += writeLen;
        remaining -= writeLen;
    } while (remaining != 0);

    pa_memblock_release(pchunk->memblock);
    pa_memblock_unref(pchunk->memblock);

    return written;
}
896 
/*
 * Common failure path of module initialisation: logs the failure, frees the
 * module arguments if there are any, runs pa__done() on the module and
 * returns PA_ERR.
 */
static int InitFailed(pa_module *m, pa_modargs *ma)
{
    AUDIO_ERR_LOG("Split Stream Sink Init Failed");

    if (ma != NULL) {
        pa_modargs_free(ma);
    }

    pa__done(m);

    return PA_ERR;
}
907 
/**
 * Create the PulseAudio sink object for this module and hook up its callbacks.
 *
 * The sample spec and channel map start from the core defaults and may be
 * overridden by module arguments. u->formats is populated with a single PCM
 * format entry, and u->sink receives the new sink on success.
 *
 * @param m  owning module
 * @param ma parsed module arguments
 * @param u  module userdata; u->formats and u->sink are written
 * @return 0 on success, PA_ERR on failure
 */
static int CreateSink(pa_module *m, pa_modargs *ma, struct userdata *u)
{
    pa_sink_new_data sinkData;
    pa_sample_spec spec;
    pa_channel_map chMap;
    pa_format_info *pcmFormat;
    const char *sinkName;

    pa_assert(m);

    /* Core defaults first; the module arguments may replace them. */
    spec = m->core->default_sample_spec;
    chMap = m->core->default_channel_map;
    if (pa_modargs_get_sample_spec_and_channel_map(ma, &spec, &chMap, PA_CHANNEL_MAP_DEFAULT) < 0) {
        AUDIO_ERR_LOG("Invalid sample format specification or channel map");
        return PA_ERR;
    }

    /* Describe the sink that is about to be created. */
    pa_sink_new_data_init(&sinkData);
    sinkData.driver = __FILE__;
    sinkData.module = m;
    sinkName = pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME);
    pa_sink_new_data_set_name(&sinkData, sinkName);
    pa_sink_new_data_set_sample_spec(&sinkData, &spec);
    pa_sink_new_data_set_channel_map(&sinkData, &chMap);
    pa_proplist_sets(sinkData.proplist, PA_PROP_DEVICE_DESCRIPTION, _("Split Stream Output"));
    pa_proplist_sets(sinkData.proplist, PA_PROP_DEVICE_CLASS, "renderer");
    pa_proplist_sets(sinkData.proplist, PA_PROP_DEVICE_STRING, "splitStream");

    /* The format set holds exactly one entry: PCM. */
    u->formats = pa_idxset_new(NULL, NULL);
    pcmFormat = pa_format_info_new();
    pcmFormat->encoding = PA_ENCODING_PCM;
    pa_idxset_put(u->formats, pcmFormat, NULL);

    /* User supplied properties replace the ones set above. */
    if (pa_modargs_get_proplist(ma, "sink_properties", sinkData.proplist, PA_UPDATE_REPLACE) < 0) {
        AUDIO_ERR_LOG("Invalid properties");
        pa_sink_new_data_done(&sinkData);
        return PA_ERR;
    }

    u->sink = pa_sink_new(m->core, &sinkData, PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY);
    pa_sink_new_data_done(&sinkData);
    if (u->sink == NULL) {
        AUDIO_ERR_LOG("Failed to create sink.");
        return PA_ERR;
    }

    /* Install this module's handlers on the new sink. */
    u->sink->userdata = u;
    u->sink->parent.process_msg = SinkProcessMsg;
    u->sink->set_state_in_io_thread = SinkSetStateInIoThreadCb;
    u->sink->update_requested_latency = SinkUpdateRequestedLatencyCb;
    u->sink->reconfigure = SinkReconfigureCb;
    u->sink->get_formats = SinkGetFormatsCb;
    u->sink->set_formats = SinkSetFormatsCb;

    return 0;
}
963 
InitRemoteSink(struct userdata * u,const char * filePath)964 static int32_t InitRemoteSink(struct userdata *u, const char *filePath)
965 {
966     SinkAttr sample_attrs;
967     int32_t ret;
968 
969     sample_attrs.format = ConvertPaToHdiAdapterFormat(u->ss.format);
970     sample_attrs.adapterName = u->adapterName;
971     sample_attrs.openMicSpeaker = u->open_mic_speaker;
972     sample_attrs.sampleRate = (uint32_t) u->ss.rate;
973     sample_attrs.channel = u->ss.channels;
974     sample_attrs.volume = MAX_SINK_VOLUME_LEVEL;
975     sample_attrs.filePath = filePath;
976     sample_attrs.deviceNetworkId = u->deviceNetworkId;
977     sample_attrs.deviceType =  u->deviceType;
978     sample_attrs.aux =  SPLIT_MODE;
979 
980     ret = u->sinkAdapter->RendererSinkInit(u->sinkAdapter, &sample_attrs);
981     if (ret != 0) {
982         AUDIO_ERR_LOG("audiorenderer Init failed!");
983         u->sinkAdapter->RendererSinkDeInit(u->sinkAdapter);
984         return -1;
985     }
986 
987     return 0;
988 }
989 
/**
 * Finish configuring the sink created by CreateSink() and bring up the
 * renderer adapter.
 *
 * Steps, in order: attach the sink to the IO thread's message queue and
 * rtpoll; read "buffer_size" (default DEFAULT_BUFFER_SIZE) and derive
 * block_usec, max rewind, max request and the latency settings from it; load
 * the sink adapter; read "device_type", "adapter_name" and "network_id";
 * parse the sample spec / channel map into u->ss / u->map; initialise the
 * adapter through InitRemoteSink().
 *
 * This function performs NO cleanup on failure. The caller owns `ma` and the
 * module state and is expected to tear them down (pa__init does so through
 * InitFailed()).
 *
 * @param m  owning module
 * @param ma parsed module arguments (not freed here)
 * @param u  module userdata with u->sink, u->rtpoll and u->thread_mq set up
 * @return 0 on success, -1 on any failure
 */
static int32_t PaHdiSinkNewInit(pa_module *m, pa_modargs *ma, struct userdata *u)
{
    size_t nbytes;
    int mg;

    pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
    pa_sink_set_rtpoll(u->sink, u->rtpoll);

    u->buffer_size = DEFAULT_BUFFER_SIZE;

    mg = pa_modargs_get_value_u32(ma, "buffer_size", &u->buffer_size);
    /* Return a plain error: cleanup must happen exactly once, in the caller. */
    CHECK_AND_RETURN_RET_LOG(mg >= 0, -1, "Failed to parse buffer_size arg in capturer sink");

    u->block_usec = pa_bytes_to_usec(u->buffer_size, &u->sink->sample_spec);

    nbytes = pa_usec_to_bytes(u->block_usec, &u->sink->sample_spec);
    pa_sink_set_max_rewind(u->sink, nbytes);
    pa_sink_set_max_request(u->sink, u->buffer_size);

    if (u->fixed_latency) {
        pa_sink_set_fixed_latency(u->sink, u->block_usec);
    } else {
        pa_sink_set_latency_range(u->sink, 0, u->block_usec);
    }

    int32_t ret = LoadSinkAdapter(pa_modargs_get_value(ma, "device_class", DEFAULT_DEVICE_CLASS),
        pa_modargs_get_value(ma, "network_id", DEFAULT_DEVICE_NETWORKID), &u->sinkAdapter);
    if (ret) {
        AUDIO_ERR_LOG("Load adapter failed");
        return -1;
    }

    if (pa_modargs_get_value_s32(ma, "device_type", &u->deviceType) < 0) {
        AUDIO_ERR_LOG("Failed to parse deviceType argument.");
        return -1;
    }

    /*
     * NOTE(review): these strings appear to be owned by `ma` (or are the
     * literal defaults). pa__init frees `ma` before returning - confirm that
     * nothing reads adapterName/deviceNetworkId after initialisation.
     */
    u->adapterName = pa_modargs_get_value(ma, "adapter_name", DEFAULT_DEVICE_CLASS);
    u->deviceNetworkId = pa_modargs_get_value(ma, "network_id", DEFAULT_DEVICE_NETWORKID);

    u->ss = m->core->default_sample_spec;
    u->map = m->core->default_channel_map;
    if (pa_modargs_get_sample_spec_and_channel_map(ma, &u->ss, &u->map, PA_CHANNEL_MAP_DEFAULT) < 0) {
        AUDIO_ERR_LOG("Failed to parse sample specification and channel map");
        return -1;
    }

    if (InitRemoteSink(u, pa_modargs_get_value(ma, "file_path", "")) < 0) {
        AUDIO_ERR_LOG("Failed to init remote audio render sink.");
        return -1;
    }

    return 0;
}

/**
 * Module entry point: parse the arguments, create the sink, initialise the
 * renderer adapter and start the two worker threads.
 *
 * Every failure after argument parsing leaves through InitFailed(), which
 * frees `ma` and runs pa__done() on the partially initialised module.
 *
 * @param m module being loaded
 * @return 0 on success, a negative value on failure
 */
int pa__init(pa_module *m)
{
    AUDIO_INFO_LOG("module_split_stream_sink pa__init start");
    struct userdata *u = NULL;
    pa_modargs *ma = NULL;
    int mq;

    pa_assert(m);

    ma = pa_modargs_new(m->argument, VALID_MODARGS);
    CHECK_AND_RETURN_RET_LOG(ma != NULL, InitFailed(m, ma), "Failed to parse module arguments:%{public}s", m->argument);

    /*
     * NOTE(review): SPLIT_MODE is assigned a string obtained from `ma`, and
     * `ma` is freed at the end of this function - confirm SPLIT_MODE is not
     * read after pa__init returns.
     */
    SPLIT_MODE = pa_modargs_get_value(ma, "split_mode", "1");
    AUDIO_INFO_LOG("module_split_stream_sink pa__init splitMode is %{public}s", SPLIT_MODE);
    ConvertToSplitArr(SPLIT_MODE);

    m->userdata = u = pa_xnew0(struct userdata, 1);
    u->core = m->core;
    u->module = m;
    u->rtpoll = pa_rtpoll_new();

    mq = pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
    CHECK_AND_RETURN_RET_LOG(mq >= 0, InitFailed(m, ma), "pa_thread_mq_init() failed.");

    if (CreateSink(m, ma, u) != 0) {
        return InitFailed(m, ma);
    }

    if (PaHdiSinkNewInit(m, ma, u) < 0) {
        AUDIO_ERR_LOG("PaHdiSinkNewInit failed");
        /* Tear down like every other failure path instead of leaking ma/u/sink. */
        return InitFailed(m, ma);
    }

    u->dq = pa_asyncmsgq_new(0);
    if (u->dq == NULL) {
        AUDIO_ERR_LOG("Failed to create async message queue.");
        return InitFailed(m, ma);
    }

    if (!(u->thread = pa_thread_new("OS_SplitStream", ThreadFunc, u))) {
        AUDIO_ERR_LOG("Failed to create thread.");
        return InitFailed(m, ma);
    }

    if (!(u->thread_split_hdi = pa_thread_new("OS_splitToHdi", ThreadFuncWriteHDI, u))) {
        AUDIO_ERR_LOG("Failed to create OS_splitToHdi.");
        return InitFailed(m, ma);
    }

    pa_sink_put(u->sink);

    pa_modargs_free(ma);

    return 0;
}
1096 
/**
 * Module usage query.
 *
 * @param m loaded module; its userdata must be set
 * @return the value pa_sink_linked_by() reports for this module's sink
 */
int pa__get_n_used(pa_module *m)
{
    struct userdata *u = NULL;
    int linkedCount;

    pa_assert(m);
    pa_assert_se(u = m->userdata);

    linkedCount = pa_sink_linked_by(u->sink);
    return linkedCount;
}
1106 
/**
 * Module teardown. Also used by InitFailed() on a partially initialised
 * module, so every member is checked before it is released.
 *
 * Order: unlink the sink, shut down and join the IO thread, tear down the
 * thread message queues, then drop the sink, rtpoll, format set and finally
 * the userdata itself. m->userdata is cleared afterwards so that a repeated
 * call returns early instead of touching freed memory.
 *
 * NOTE(review): u->thread_split_hdi, u->dq and u->sinkAdapter are created in
 * pa__init / PaHdiSinkNewInit but are not stopped or released here. Verify
 * whether the HDI writer thread can still be running when `u` is freed.
 *
 * @param m module being unloaded
 */
void pa__done(pa_module *m)
{
    struct userdata *u;

    pa_assert(m);

    if (!(u = m->userdata)) {
        return;
    }

    if (u->sink) {
        pa_sink_unlink(u->sink);
    }

    if (u->thread) {
        /* Synchronous shutdown request, then join the IO thread. */
        pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
        pa_thread_free(u->thread);
    }

    pa_thread_mq_done(&u->thread_mq);

    if (u->sink) {
        pa_sink_unref(u->sink);
    }

    if (u->rtpoll) {
        pa_rtpoll_free(u->rtpoll);
    }

    if (u->formats) {
        pa_idxset_free(u->formats, (pa_free_cb_t)pa_format_info_free);
    }

    pa_xfree(u);
    /* Do not leave a dangling pointer behind on the module. */
    m->userdata = NULL;
}
1142