1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifdef HAVE_CONFIG_H
17 #include <config.h>
18 #endif
19
20 #undef LOG_TAG
21 #define LOG_TAG "ModuleSplitStreamSink"
22
23 #include <stdlib.h>
24 #include <stdio.h>
25 #include <errno.h>
26 #include <unistd.h>
27 #include <string.h>
28 #include "securec.h"
29
30 #include <pulse/rtclock.h>
31 #include <pulse/timeval.h>
32 #include <pulse/util.h>
33 #include <pulse/xmalloc.h>
34
35 #include <pulsecore/i18n.h>
36 #include <pulsecore/macro.h>
37 #include <pulsecore/sink.h>
38 #include <pulsecore/module.h>
39 #include <pulsecore/core-util.h>
40 #include <pulsecore/modargs.h>
41 #include <pulsecore/log.h>
42 #include <pulsecore/thread.h>
43 #include <pulsecore/thread-mq.h>
44 #include <pulsecore/rtpoll.h>
45 #include <pulsecore/mix.h>
46 #include <pulsecore/memblockq.h>
47 #include <pulsecore/memblock.h>
48
49 #include <pthread.h>
50 #include "audio_log.h"
51 #include "audio_schedule.h"
52 #include "audio_utils_c.h"
53 #include "audio_hdiadapter_info.h"
54 #include "renderer_sink_adapter.h"
55
56 #define DEFAULT_SINK_NAME "hdi_output"
57 #define DEFAULT_DEVICE_CLASS "primary"
58 #define DEFAULT_DEVICE_NETWORKID "LocalDevice"
59 #define DEFAULT_BUFFER_SIZE 8192
60 #define MAX_SINK_VOLUME_LEVEL 1.0
61 #define MIX_BUFFER_LENGTH (pa_page_size())
62 #define MAX_REWIND (7000 * PA_USEC_PER_MSEC)
63 #define USEC_PER_SEC 1000000
64 #define SCENE_TYPE_NUM 7
65 #define PA_ERR (-1)
66 #define MAX_PARTS 10
67
68 #define STREAM_TYPE_MEDIA "1"
69 #define STREAM_TYPE_COMMUNICATION "2"
70 #define STREAM_TYPE_NAVIGATION "13"
71
72 char *g_splitArr[MAX_PARTS];
73 int g_splitNums = 0;
74 const char *SPLIT_MODE;
75 const uint32_t SPLIT_ONE_STREAM = 1;
76 const uint32_t SPLIT_TWO_STREAM = 2;
77 const uint32_t SPLIT_THREE_STREAM = 3;
78
79 PA_MODULE_AUTHOR("OpenHarmony");
80 PA_MODULE_DESCRIPTION(_("Split Stream Sink"));
81 PA_MODULE_VERSION(PACKAGE_VERSION);
82 PA_MODULE_LOAD_ONCE(false);
83 PA_MODULE_USAGE(
84 "sink_name=<name of sink> "
85 "sink_properties=<properties for the sink> "
86 "format=<sample format> "
87 "rate=<sample rate> "
88 "channels=<number of channels> "
89 "channel_map=<channel map>"
90 "buffer_size=<custom buffer size>"
91 "formats=<semi-colon separated sink formats>");
92
93 static ssize_t SplitRenderWrite(struct RendererSinkAdapter *sinkAdapter, pa_memchunk *pchunk, char *streamType);
94
95 struct userdata {
96 pa_core *core;
97 pa_module *module;
98 pa_sink *sink;
99 pa_thread *thread;
100 pa_thread_mq thread_mq;
101 pa_rtpoll *rtpoll;
102 uint32_t buffer_size;
103 pa_usec_t block_usec;
104 pa_usec_t timestamp;
105 pa_idxset *formats;
106 pa_thread *thread_split_hdi;
107 bool isHDISinkStarted;
108 struct RendererSinkAdapter *sinkAdapter;
109 pa_asyncmsgq *dq;
110 pa_atomic_t dflag;
111 pa_usec_t writeTime;
112 pa_usec_t prewrite;
113 pa_sink_state_t previousState;
114 pa_usec_t timestampLastLog;
115 const char *deviceNetworkId;
116 const char *adapterName;
117 uint32_t open_mic_speaker;
118 pa_sample_spec ss;
119 pa_channel_map map;
120 int32_t deviceType;
121 size_t bytesDropped;
122 uint32_t writeCount;
123 uint32_t renderCount;
124 uint32_t fixed_latency;
125 pa_usec_t lastProcessDataTime;
126 uint32_t renderInIdleState;
127 };
128
129 static const char * const VALID_MODARGS[] = {
130 "sink_name",
131 "device_class",
132 "sink_properties",
133 "format",
134 "rate",
135 "channels",
136 "channel_map",
137 "buffer_size",
138 "file_path",
139 "adapter_name",
140 "fixed_latency",
141 "sink_latency",
142 "render_in_idle_state",
143 "open_mic_speaker",
144 "test_mode_on",
145 "network_id",
146 "device_type",
147 "offload_enable",
148 "split_mode",
149 NULL
150 };
151
152 char *const SCENE_TYPE_SET[SCENE_TYPE_NUM] = {"SCENE_MUSIC", "SCENE_GAME", "SCENE_MOVIE", "SCENE_SPEECH", "SCENE_RING",
153 "SCENE_OTHERS", "EFFECT_NONE"};
154
155 enum {
156 HDI_INIT,
157 HDI_DEINIT,
158 HDI_START,
159 HDI_STOP,
160 HDI_RENDER,
161 QUIT,
162 HDI_RENDER_MEDIA,
163 HDI_RENDER_NAVIGATION,
164 HDI_RENDER_COMMUNICATION
165 };
166
ConvertPaToHdiAdapterFormat(pa_sample_format_t format)167 static enum HdiAdapterFormat ConvertPaToHdiAdapterFormat(pa_sample_format_t format)
168 {
169 enum HdiAdapterFormat adapterFormat;
170 switch (format) {
171 case PA_SAMPLE_U8:
172 adapterFormat = SAMPLE_U8;
173 break;
174 case PA_SAMPLE_S16LE:
175 adapterFormat = SAMPLE_S16;
176 break;
177 case PA_SAMPLE_S24LE:
178 adapterFormat = SAMPLE_S24;
179 break;
180 case PA_SAMPLE_S32LE:
181 adapterFormat = SAMPLE_S32;
182 break;
183 default:
184 adapterFormat = INVALID_WIDTH;
185 break;
186 }
187
188 return adapterFormat;
189 }
190
ConvertToSplitArr(const char * str)191 static void ConvertToSplitArr(const char *str)
192 {
193 for (int i = 0; i < MAX_PARTS; ++i) {
194 g_splitArr[i] = NULL;
195 }
196
197 char *token;
198 char *copy = strdup(str);
199 int count = 0;
200
201 token = strtok(copy, ":");
202 while (token != NULL && count < MAX_PARTS) {
203 g_splitArr[count] = (char *)malloc(strlen(token) + 1);
204 if (g_splitArr[count] != NULL) {
205 if (strcpy_s(g_splitArr[count], strlen(token) + 1, token) != 0) {
206 AUDIO_ERR_LOG("strcpy_s failed.");
207 };
208 count++;
209 } else {
210 AUDIO_ERR_LOG("Memory allocation failed.\n");
211 break;
212 }
213 token = strtok(NULL, ":");
214 }
215 g_splitNums = count;
216 free(copy);
217 }
218
SinkProcessMsg(pa_msgobject * o,int code,void * data,int64_t offset,pa_memchunk * chunk)219 static int SinkProcessMsg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk)
220 {
221 switch (code) {
222 case PA_SINK_MESSAGE_GET_LATENCY:
223 *((int64_t*) data) = 0;
224 return 0;
225 default:
226 break;
227 }
228
229 return pa_sink_process_msg(o, code, data, offset, chunk);
230 }
231
232 /* Called from the IO thread. */
SinkSetStateInIoThreadCb(pa_sink * s,pa_sink_state_t new_state,pa_suspend_cause_t new_suspend_cause)233 static int SinkSetStateInIoThreadCb(pa_sink *s, pa_sink_state_t new_state, pa_suspend_cause_t new_suspend_cause)
234 {
235 struct userdata *u;
236
237 pa_assert(s);
238 pa_assert_se(u = s->userdata);
239
240 if (s->thread_info.state == PA_SINK_SUSPENDED || s->thread_info.state == PA_SINK_INIT) {
241 if (PA_SINK_IS_OPENED(new_state)) {
242 u->timestamp = pa_rtclock_now();
243 }
244 }
245
246 return 0;
247 }
248
SinkUpdateRequestedLatencyCb(pa_sink * s)249 static void SinkUpdateRequestedLatencyCb(pa_sink *s)
250 {
251 struct userdata *u;
252 size_t nbytes;
253
254 pa_sink_assert_ref(s);
255 pa_assert_se(u = s->userdata);
256
257 u->block_usec = pa_sink_get_requested_latency_within_thread(s);
258
259 if (u->block_usec == (pa_usec_t) -1) {
260 u->block_usec = s->thread_info.max_latency;
261 }
262
263 nbytes = pa_usec_to_bytes(u->block_usec, &s->sample_spec);
264 pa_sink_set_max_rewind_within_thread(s, nbytes);
265 pa_sink_set_max_request_within_thread(s, nbytes);
266 }
267
SinkReconfigureCb(pa_sink * s,pa_sample_spec * spec,bool passthrough)268 static void SinkReconfigureCb(pa_sink *s, pa_sample_spec *spec, bool passthrough)
269 {
270 s->sample_spec = *spec;
271 }
272
SinkSetFormatsCb(pa_sink * s,pa_idxset * formats)273 static bool SinkSetFormatsCb(pa_sink *s, pa_idxset *formats)
274 {
275 struct userdata *u = s->userdata;
276
277 pa_assert(u);
278
279 pa_idxset_free(u->formats, (pa_free_cb_t) pa_format_info_free);
280 u->formats = pa_idxset_copy(formats, (pa_copy_func_t) pa_format_info_copy);
281
282 return true;
283 }
284
SinkGetFormatsCb(pa_sink * s)285 static pa_idxset* SinkGetFormatsCb(pa_sink *s)
286 {
287 struct userdata *u = s->userdata;
288
289 pa_assert(u);
290
291 return pa_idxset_copy(u->formats, (pa_copy_func_t) pa_format_info_copy);
292 }
293
ProcessRewind(struct userdata * u,pa_usec_t now)294 static void ProcessRewind(struct userdata *u, pa_usec_t now)
295 {
296 size_t rewindNbytes;
297 size_t inBuffer;
298 pa_usec_t delay;
299
300 pa_assert(u);
301
302 rewindNbytes = u->sink->thread_info.rewind_nbytes;
303 if (!PA_SINK_IS_OPENED(u->sink->thread_info.state) || rewindNbytes == 0) {
304 goto do_nothing;
305 }
306 AUDIO_DEBUG_LOG("Requested to rewind %lu bytes.", (unsigned long) rewindNbytes);
307
308 if (u->timestamp <= now) {
309 goto do_nothing;
310 }
311
312 delay = u->timestamp - now;
313 inBuffer = pa_usec_to_bytes(delay, &u->sink->sample_spec);
314 if (inBuffer == 0) {
315 goto do_nothing;
316 }
317
318 if (rewindNbytes > inBuffer) {
319 rewindNbytes = inBuffer;
320 }
321
322 pa_sink_process_rewind(u->sink, rewindNbytes);
323 u->timestamp -= pa_bytes_to_usec(rewindNbytes, &u->sink->sample_spec);
324
325 AUDIO_DEBUG_LOG("Rewound %lu bytes.", (unsigned long) rewindNbytes);
326 return;
327
328 do_nothing:
329 pa_sink_process_rewind(u->sink, 0);
330 }
331
StartSplitStreamHdiIfRunning(struct userdata * u)332 static void StartSplitStreamHdiIfRunning(struct userdata *u)
333 {
334 AUTO_CTRACE("split_stream_sink::StartPrimaryHdiIfRunning");
335 if (u->isHDISinkStarted) {
336 return;
337 }
338
339 if (u->sinkAdapter->RendererSinkStart(u->sinkAdapter)) {
340 AUDIO_ERR_LOG("split_stream_sink,audiorenderer control start failed!");
341 u->sinkAdapter->RendererSinkDeInit(u->sinkAdapter);
342 } else {
343 u->isHDISinkStarted = true;
344 u->writeCount = 0;
345 u->renderCount = 0;
346 AUDIO_INFO_LOG("StartPrimaryHdiIfRunning, Successfully restarted HDI renderer");
347 u->renderInIdleState = 1;
348 }
349 }
350
SplitSinkRenderInputsDrop(pa_sink * si,pa_mix_info * infoIn,unsigned n,pa_memchunk * chunkIn)351 static void SplitSinkRenderInputsDrop(pa_sink *si, pa_mix_info *infoIn, unsigned n, pa_memchunk *chunkIn)
352 {
353 AUTO_CTRACE("split_stream_sink::SplitSinkRenderInputsDrop:%u:len:%zu", n, chunkIn->length);
354 pa_sink_assert_ref(si);
355 pa_sink_assert_io_context(si);
356 pa_assert(chunkIn);
357 pa_assert(chunkIn->memblock);
358 pa_assert(chunkIn->length > 0);
359
360 /* We optimize for the case where the order of the inputs has not changed */
361
362 pa_mix_info *infoCur = NULL;
363 pa_sink_input *sinkInput = NULL;
364 for (uint32_t k = 0; k < n; k++) {
365 sinkInput = infoIn[k].userdata;
366 pa_sink_input_assert_ref(sinkInput);
367 AUTO_CTRACE("hdi_sink::InnerCap:pa_sink_input_drop:%u:len:%zu", sinkInput->index, chunkIn->length);
368 pa_sink_input_drop(sinkInput, chunkIn->length);
369
370 infoCur = infoIn + k;
371 if (infoCur) {
372 if (infoCur->chunk.memblock) {
373 pa_memblock_unref(infoCur->chunk.memblock);
374 pa_memchunk_reset(&infoCur->chunk);
375 }
376
377 pa_sink_input_unref(infoCur->userdata);
378 }
379 }
380 }
381
IsPeekCurrentSinkInput(char * streamType,const char * usageStr)382 static int IsPeekCurrentSinkInput(char *streamType, const char *usageStr)
383 {
384 int flag = 0;
385 if (g_splitNums == SPLIT_ONE_STREAM) {
386 flag = 1;
387 }
388
389 if (g_splitNums == SPLIT_TWO_STREAM) {
390 if (strcmp(usageStr, STREAM_TYPE_NAVIGATION) && !strcmp(streamType, STREAM_TYPE_MEDIA)) {
391 flag = 1;
392 } else if (!strcmp(usageStr, STREAM_TYPE_NAVIGATION) && !strcmp(streamType, STREAM_TYPE_NAVIGATION)) {
393 flag = 1;
394 }
395 }
396
397 if (g_splitNums == SPLIT_THREE_STREAM) {
398 if (strcmp(usageStr, STREAM_TYPE_NAVIGATION) && strcmp(usageStr, STREAM_TYPE_COMMUNICATION) &&
399 !strcmp(streamType, STREAM_TYPE_MEDIA)) {
400 flag = 1;
401 } else if (!strcmp(usageStr, STREAM_TYPE_NAVIGATION) && !strcmp(streamType, STREAM_TYPE_NAVIGATION)) {
402 flag = 1;
403 } else if (!strcmp(usageStr, STREAM_TYPE_COMMUNICATION) && !strcmp(streamType, STREAM_TYPE_COMMUNICATION)) {
404 flag = 1;
405 }
406 }
407
408 return flag;
409 }
410
SplitFillMixInfo(pa_sink * s,size_t * length,pa_mix_info * info,unsigned maxInfo,char * streamType)411 static unsigned SplitFillMixInfo(pa_sink *s, size_t *length, pa_mix_info *info, unsigned maxInfo, char *streamType)
412 {
413 AUTO_CTRACE("split_stream_sink::SplitFillMixInfo:len:%zu", *length);
414 pa_sink_input *i;
415 unsigned n = 0;
416 void *state = NULL;
417 size_t mixlength = *length;
418
419 pa_sink_assert_ref(s);
420 pa_sink_assert_io_context(s);
421 pa_assert(info);
422
423 while ((i = pa_hashmap_iterate(s->thread_info.inputs, &state, NULL)) && maxInfo > 0) {
424 const char *usageStr = pa_proplist_gets(i->proplist, "stream.usage");
425 AUDIO_DEBUG_LOG("splitFillMixInfo usageStr = %{public}s, streamType = %{public}s", usageStr, streamType);
426 if (IsPeekCurrentSinkInput(streamType, usageStr)) {
427 pa_sink_input_assert_ref(i);
428
429 AUTO_CTRACE("module_split_stream_sink::splitFillMixInfo::pa_sink_input_peek:%u len:%zu", i->index, *length);
430 pa_sink_input_peek(i, *length, &info->chunk, &info->volume);
431
432 if (mixlength == 0 || info->chunk.length < mixlength)
433 mixlength = info->chunk.length;
434
435 if (pa_memblock_is_silence(info->chunk.memblock)) {
436 pa_memblock_unref(info->chunk.memblock);
437 continue;
438 }
439
440 info->userdata = pa_sink_input_ref(i);
441 pa_assert(info->chunk.memblock);
442 pa_assert(info->chunk.length > 0);
443
444 info++;
445 n++;
446 maxInfo--;
447
448 if (mixlength > 0) {
449 *length = mixlength;
450 }
451 }
452 }
453 return n;
454 }
455
SplitSinkRenderMix(pa_sink * s,size_t length,pa_mix_info * info,unsigned n,pa_memchunk * result)456 static void SplitSinkRenderMix(pa_sink *s, size_t length, pa_mix_info *info, unsigned n, pa_memchunk *result)
457 {
458 if (n == 0) {
459 *result = s->silence;
460 pa_memblock_ref(result->memblock);
461
462 if (result->length > length)
463 result->length = length;
464 } else if (n == 1) {
465 pa_cvolume volume;
466
467 *result = info[0].chunk;
468 pa_memblock_ref(result->memblock);
469
470 if (result->length > length)
471 result->length = length;
472
473 pa_sw_cvolume_multiply(&volume, &s->thread_info.soft_volume, &info[0].volume);
474
475 if (s->thread_info.soft_muted || pa_cvolume_is_muted(&volume)) {
476 pa_memblock_unref(result->memblock);
477 pa_silence_memchunk_get(
478 &s->core->silence_cache, s->core->mempool, result, &s->sample_spec, result->length);
479 } else if (!pa_cvolume_is_norm(&volume)) {
480 pa_memchunk_make_writable(result, 0);
481 pa_volume_memchunk(result, &s->sample_spec, &volume);
482 }
483 } else {
484 void *ptr;
485 result->memblock = pa_memblock_new(s->core->mempool, length);
486
487 ptr = pa_memblock_acquire(result->memblock);
488 result->length =
489 pa_mix(info, n, ptr, length, &s->sample_spec, &s->thread_info.soft_volume, s->thread_info.soft_muted);
490 pa_memblock_release(result->memblock);
491
492 result->index = 0;
493 }
494 }
495
SplitPaSinkRender(pa_sink * s,size_t length,pa_memchunk * result,char * streamType)496 static unsigned SplitPaSinkRender(pa_sink *s, size_t length, pa_memchunk *result, char *streamType)
497 {
498 AUTO_CTRACE("module_split_stream_sink::SplitPaSinkRender:len:%zu", length);
499 unsigned streamCount = 0;
500 pa_mix_info info[MAX_MIX_CHANNELS];
501 unsigned n;
502 size_t blockSizeMax;
503
504 pa_sink_assert_ref(s);
505 pa_sink_assert_io_context(s);
506 pa_assert(PA_SINK_IS_LINKED(s->thread_info.state));
507 pa_assert(pa_frame_aligned(length, &s->sample_spec));
508 pa_assert(result);
509
510 pa_assert(!s->thread_info.rewind_requested);
511 pa_assert(s->thread_info.rewind_nbytes == 0);
512
513 if (s->thread_info.state == PA_SINK_SUSPENDED) {
514 result->memblock = pa_memblock_ref(s->silence.memblock);
515 result->index = s->silence.index;
516 result->length = PA_MIN(s->silence.length, length);
517 return 0;
518 }
519
520 pa_sink_ref(s);
521
522 AUDIO_DEBUG_LOG("module_split_stream_sink, splitSinkRender in length = %{public}zu", length);
523 if (length == 0) {
524 length = pa_frame_align(MIX_BUFFER_LENGTH, &s->sample_spec);
525 }
526
527 blockSizeMax = pa_mempool_block_size_max(s->core->mempool);
528 if (length > blockSizeMax)
529 length = pa_frame_align(blockSizeMax, &s->sample_spec);
530
531 pa_assert(length > 0);
532
533 n = SplitFillMixInfo(s, &length, info, MAX_MIX_CHANNELS, streamType);
534 streamCount = n;
535 SplitSinkRenderMix(s, length, info, n, result);
536
537 SplitSinkRenderInputsDrop(s, info, n, result);
538
539 pa_sink_unref(s);
540 return streamCount;
541 }
542
SplitSinkRenderIntoMix(pa_sink * s,size_t length,pa_mix_info * info,unsigned n,pa_memchunk * target)543 static void SplitSinkRenderIntoMix(pa_sink *s, size_t length, pa_mix_info *info, unsigned n, pa_memchunk *target)
544 {
545 if (n == 0) {
546 if (target->length > length)
547 target->length = length;
548
549 pa_silence_memchunk(target, &s->sample_spec);
550 } else if (n == 1) {
551 pa_cvolume volume;
552
553 if (target->length > length)
554 target->length = length;
555
556 pa_sw_cvolume_multiply(&volume, &s->thread_info.soft_volume, &info[0].volume);
557
558 if (s->thread_info.soft_muted || pa_cvolume_is_muted(&volume)) {
559 pa_silence_memchunk(target, &s->sample_spec);
560 } else {
561 pa_memchunk vChunk;
562
563 vChunk = info[0].chunk;
564 pa_memblock_ref(vChunk.memblock);
565
566 if (vChunk.length > length)
567 vChunk.length = length;
568
569 if (!pa_cvolume_is_norm(&volume)) {
570 pa_memchunk_make_writable(&vChunk, 0);
571 pa_volume_memchunk(&vChunk, &s->sample_spec, &volume);
572 }
573
574 pa_memchunk_memcpy(target, &vChunk);
575 pa_memblock_unref(vChunk.memblock);
576 }
577 } else {
578 void *ptr;
579
580 ptr = pa_memblock_acquire(target->memblock);
581
582 target->length = pa_mix(info, n,
583 (uint8_t*) ptr + target->index, length,
584 &s->sample_spec,
585 &s->thread_info.soft_volume,
586 s->thread_info.soft_muted);
587
588 pa_memblock_release(target->memblock);
589 }
590 }
591
SplitPaSinkRenderInto(pa_sink * s,pa_memchunk * target,char * streamType)592 static void SplitPaSinkRenderInto(pa_sink *s, pa_memchunk *target, char *streamType)
593 {
594 pa_mix_info info[MAX_MIX_CHANNELS];
595 unsigned n;
596 size_t length, blockSizeMax;
597
598 pa_sink_assert_ref(s);
599 pa_sink_assert_io_context(s);
600 pa_assert(PA_SINK_IS_LINKED(s->thread_info.state));
601 pa_assert(pa_frame_aligned(target->length, &s->sample_spec));
602
603 pa_assert(!s->thread_info.rewind_requested);
604 pa_assert(s->thread_info.rewind_nbytes == 0);
605
606 if (s->thread_info.state == PA_SINK_SUSPENDED) {
607 pa_silence_memchunk(target, &s->sample_spec);
608 return;
609 }
610
611 pa_sink_ref(s);
612
613 length = target->length;
614 blockSizeMax = pa_mempool_block_size_max(s->core->mempool);
615 if (length > blockSizeMax)
616 length = pa_frame_align(blockSizeMax, &s->sample_spec);
617
618 pa_assert(length > 0);
619
620 n = SplitFillMixInfo(s, &length, info, MAX_MIX_CHANNELS, streamType);
621 SplitSinkRenderIntoMix(s, length, info, n, target);
622
623 SplitSinkRenderInputsDrop(s, info, n, target);
624
625 pa_sink_unref(s);
626 }
627
SplitPaSinkRenderIntoFull(pa_sink * s,pa_memchunk * target,char * streamType)628 static void SplitPaSinkRenderIntoFull(pa_sink *s, pa_memchunk *target, char *streamType)
629 {
630 pa_memchunk chunk;
631 size_t l, d;
632
633 pa_sink_assert_ref(s);
634 pa_sink_assert_io_context(s);
635 pa_assert(target);
636 pa_assert(target->length > 0);
637 pa_assert(target->memblock);
638 pa_assert(pa_frame_aligned(target->length, &s->sample_spec));
639
640 if (s->thread_info.state == PA_SINK_SUSPENDED) {
641 pa_silence_memchunk(target, &s->sample_spec);
642 return;
643 }
644
645 pa_sink_ref(s);
646
647 l = target->length;
648 d = 0;
649 while (l > 0) {
650 chunk = *target;
651 chunk.index += d;
652 chunk.length -=d;
653
654 SplitPaSinkRenderInto(s, &chunk, streamType);
655
656 d += chunk.length;
657 l -= chunk.length;
658 }
659
660 pa_sink_unref(s);
661 }
662
SplitPaSinkRenderFull(pa_sink * s,size_t length,pa_memchunk * result,char * streamType)663 static unsigned SplitPaSinkRenderFull(pa_sink *s, size_t length, pa_memchunk *result, char *streamType)
664 {
665 unsigned nSink;
666 pa_sink_assert_ref(s);
667 pa_sink_assert_io_context(s);
668 pa_assert(PA_SINK_IS_LINKED(s->thread_info.state));
669 pa_assert(length > 0);
670 pa_assert(pa_frame_aligned(length, &s->sample_spec));
671 pa_assert(result);
672
673 pa_assert(!s->thread_info.rewind_requested);
674 pa_assert(s->thread_info.rewind_nbytes == 0);
675
676 if (s->thread_info.state == PA_SINK_SUSPENDED) {
677 result->memblock = pa_memblock_ref(s->silence.memblock);
678 result->index = s->silence.index;
679 result->length = PA_MIN(s->silence.length, length);
680 return 0;
681 }
682
683 pa_sink_ref(s);
684
685 AUDIO_DEBUG_LOG("module_split_stream_sink, splitSinkRender in length = %{public}zu", length);
686 nSink = SplitPaSinkRender(s, length, result, streamType);
687 if (nSink == 0) {
688 return nSink;
689 }
690
691 if (result->length < length) {
692 pa_memchunk chunk;
693
694 pa_memchunk_make_writable(result, length);
695
696 chunk.memblock = result->memblock;
697 chunk.index = result->index + result->length;
698 chunk.length = length - result->length;
699
700 SplitPaSinkRenderIntoFull(s, &chunk, streamType);
701
702 result->length = length;
703 }
704
705 pa_sink_unref(s);
706 return nSink;
707 }
708
ProcessRender(struct userdata * u,pa_usec_t now)709 static void ProcessRender(struct userdata *u, pa_usec_t now)
710 {
711 AUTO_CTRACE("module_split_stream_sink: ProcessRender");
712
713 pa_assert(u);
714
715 /* Fill the buffer up the latency size */
716 for (int i = 0; i < g_splitNums; i++) {
717 AUTO_CTRACE("module_split_stream_sink::ProcessRender:streamType:%s", g_splitArr[i]);
718 AUDIO_DEBUG_LOG("module_split_stream_sink: ProcessRender:streamType:%{public}s", g_splitArr[i]);
719
720 pa_memchunk chunk;
721 unsigned chunkIsNull = 0;
722 chunkIsNull = SplitPaSinkRenderFull(u->sink, u->sink->thread_info.max_request, &chunk, g_splitArr[i]);
723 if (chunkIsNull == 0) {
724 continue;
725 }
726
727 // start hdi
728 StartSplitStreamHdiIfRunning(u);
729
730 AUDIO_DEBUG_LOG("module_split_stream_sink: ProcessRender send msg, chunk length = %{public}zu", chunk.length);
731 // send msg post data
732 if (!strcmp(g_splitArr[i], STREAM_TYPE_NAVIGATION)) {
733 pa_asyncmsgq_post(u->dq, NULL, HDI_RENDER_NAVIGATION, NULL, 0, &chunk, NULL);
734 } else if (!strcmp(g_splitArr[i], STREAM_TYPE_COMMUNICATION)) {
735 pa_asyncmsgq_post(u->dq, NULL, HDI_RENDER_COMMUNICATION, NULL, 0, &chunk, NULL);
736 } else {
737 pa_asyncmsgq_post(u->dq, NULL, HDI_RENDER_MEDIA, NULL, 0, &chunk, NULL);
738 }
739 }
740 u->timestamp += pa_bytes_to_usec(u->sink->thread_info.max_request, &u->sink->sample_spec);
741 }
742
ThreadFunc(void * userdata)743 static void ThreadFunc(void *userdata)
744 {
745 ScheduleReportData(getpid(), gettid(), "audio_server");
746 struct userdata *u = userdata;
747 pa_assert(u);
748 AUDIO_DEBUG_LOG("Thread starting up");
749 if (u->core->realtime_scheduling) {
750 pa_thread_make_realtime(u->core->realtime_priority);
751 }
752 pa_thread_mq_install(&u->thread_mq);
753 u->timestamp = pa_rtclock_now();
754 for (;;) {
755 pa_usec_t now = 0;
756 int ret;
757
758 if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
759 now = pa_rtclock_now();
760 }
761 if (PA_UNLIKELY(u->sink->thread_info.rewind_requested)) {
762 ProcessRewind(u, now);
763 }
764 /* Render some data and drop it immediately */
765 if (PA_SINK_IS_OPENED(u->sink->thread_info.state)) {
766 if (u->timestamp <= now) {
767 ProcessRender(u, now);
768 }
769 pa_rtpoll_set_timer_absolute(u->rtpoll, u->timestamp);
770 } else {
771 pa_rtpoll_set_timer_disabled(u->rtpoll);
772 }
773 /* Hmm, nothing to do. Let's sleep */
774 if ((ret = pa_rtpoll_run(u->rtpoll)) < 0) {
775 goto fail;
776 }
777 if (ret == 0) {
778 goto finish;
779 }
780 }
781
782 fail:
783 /* If this was no regular exit from the loop we have to continue
784 * processing messages until we received PA_MESSAGE_SHUTDOWN */
785 pa_asyncmsgq_post(u->thread_mq.outq, PA_MSGOBJECT(u->core),
786 PA_CORE_MESSAGE_UNLOAD_MODULE, u->module, 0, NULL, NULL);
787 pa_asyncmsgq_wait_for(u->thread_mq.inq, PA_MESSAGE_SHUTDOWN);
788
789 finish:
790 AUDIO_DEBUG_LOG("Thread shutting down");
791 }
792
ProcessSplitHdiRender(struct userdata * u,pa_memchunk * chunk,char * streamType)793 static void ProcessSplitHdiRender(struct userdata *u, pa_memchunk *chunk, char *streamType)
794 {
795 pa_usec_t now = pa_rtclock_now();
796 if (!u->isHDISinkStarted && now - u->timestampLastLog > USEC_PER_SEC) {
797 u->timestampLastLog = now;
798 const char *deviceClass = GetDeviceClass(u->sinkAdapter->deviceClass);
799 AUDIO_DEBUG_LOG("HDI not started, skip RenderWrite, wait sink[%s] suspend", deviceClass);
800 pa_memblock_unref(chunk->memblock);
801 } else if (!u->isHDISinkStarted) {
802 pa_memblock_unref(chunk->memblock);
803 } else if (SplitRenderWrite(u->sinkAdapter, chunk, streamType) < 0) {
804 u->bytesDropped += chunk->length;
805 AUDIO_ERR_LOG("RenderWrite failed");
806 }
807 if (pa_atomic_load(&u->dflag) == 1) {
808 pa_atomic_sub(&u->dflag, 1);
809 }
810 u->writeTime = pa_rtclock_now() - now;
811 }
812
ThreadFuncWriteHDI(void * userdata)813 static void ThreadFuncWriteHDI(void *userdata)
814 {
815 // set audio thread priority
816 ScheduleReportData(getpid(), gettid(), "pulseaudio");
817
818 struct userdata *u = userdata;
819 pa_assert(u);
820
821 int32_t quit = 0;
822
823 do {
824 int32_t code = 0;
825 pa_memchunk chunk;
826
827 pa_assert_se(pa_asyncmsgq_get(u->dq, NULL, &code, NULL, NULL, &chunk, 1) == 0);
828
829 switch (code) {
830 case HDI_RENDER_MEDIA: {
831 ProcessSplitHdiRender(u, &chunk, STREAM_TYPE_MEDIA);
832 break;
833 }
834 case HDI_RENDER_COMMUNICATION: {
835 ProcessSplitHdiRender(u, &chunk, STREAM_TYPE_COMMUNICATION);
836 break;
837 }
838 case HDI_RENDER_NAVIGATION: {
839 ProcessSplitHdiRender(u, &chunk, STREAM_TYPE_NAVIGATION);
840 break;
841 }
842 case QUIT:
843 quit = 1;
844 break;
845 default:
846 break;
847 }
848 pa_asyncmsgq_done(u->dq, 0);
849 } while (!quit);
850 }
851
SplitRenderWrite(struct RendererSinkAdapter * sinkAdapter,pa_memchunk * pchunk,char * streamType)852 static ssize_t SplitRenderWrite(struct RendererSinkAdapter *sinkAdapter, pa_memchunk *pchunk, char *streamType)
853 {
854 size_t index;
855 size_t length;
856 ssize_t count = 0;
857 void *p = NULL;
858
859 pa_assert(pchunk);
860
861 index = pchunk->index;
862 length = pchunk->length;
863 p = pa_memblock_acquire(pchunk->memblock);
864 pa_assert(p);
865
866 while (true) {
867 uint64_t writeLen = 0;
868
869 int32_t ret = sinkAdapter->RendererSplitRenderFrame(sinkAdapter, ((char*)p + index),
870 (uint64_t)length, &writeLen, streamType);
871 if (writeLen > length) {
872 AUDIO_ERR_LOG("Error writeLen > actual bytes. Length: %zu, Written: %" PRIu64 " bytes, %d ret",
873 length, writeLen, ret);
874 count = -1 - count;
875 break;
876 }
877 if (writeLen == 0) {
878 AUDIO_ERR_LOG("Failed to render Length: %{public}zu, Written: %{public}" PRIu64 " bytes, %{public}d ret",
879 length, writeLen, ret);
880 count = -1 - count;
881 break;
882 } else {
883 count += (ssize_t)writeLen;
884 index += writeLen;
885 length -= writeLen;
886 if (length == 0) {
887 break;
888 }
889 }
890 }
891 pa_memblock_release(pchunk->memblock);
892 pa_memblock_unref(pchunk->memblock);
893
894 return count;
895 }
896
InitFailed(pa_module * m,pa_modargs * ma)897 static int InitFailed(pa_module *m, pa_modargs *ma)
898 {
899 AUDIO_ERR_LOG("Split Stream Sink Init Failed");
900 if (ma)
901 pa_modargs_free(ma);
902
903 pa__done(m);
904
905 return PA_ERR;
906 }
907
CreateSink(pa_module * m,pa_modargs * ma,struct userdata * u)908 static int CreateSink(pa_module *m, pa_modargs *ma, struct userdata *u)
909 {
910 pa_sample_spec ss;
911 pa_channel_map map;
912 pa_sink_new_data data;
913 pa_format_info *format;
914
915 pa_assert(m);
916
917 ss = m->core->default_sample_spec;
918 map = m->core->default_channel_map;
919 if (pa_modargs_get_sample_spec_and_channel_map(ma, &ss, &map, PA_CHANNEL_MAP_DEFAULT) < 0) {
920 AUDIO_ERR_LOG("Invalid sample format specification or channel map");
921 return PA_ERR;
922 }
923
924 pa_sink_new_data_init(&data);
925 data.driver = __FILE__;
926 data.module = m;
927 pa_sink_new_data_set_name(&data, pa_modargs_get_value(ma, "sink_name", DEFAULT_SINK_NAME));
928 pa_sink_new_data_set_sample_spec(&data, &ss);
929 pa_sink_new_data_set_channel_map(&data, &map);
930 pa_proplist_sets(data.proplist, PA_PROP_DEVICE_DESCRIPTION, _("Split Stream Output"));
931 pa_proplist_sets(data.proplist, PA_PROP_DEVICE_CLASS, "renderer");
932 pa_proplist_sets(data.proplist, PA_PROP_DEVICE_STRING, "splitStream");
933
934 u->formats = pa_idxset_new(NULL, NULL);
935 format = pa_format_info_new();
936 format->encoding = PA_ENCODING_PCM;
937 pa_idxset_put(u->formats, format, NULL);
938
939 if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
940 AUDIO_ERR_LOG("Invalid properties");
941 pa_sink_new_data_done(&data);
942 return PA_ERR;
943 }
944
945 u->sink = pa_sink_new(m->core, &data, PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY);
946 pa_sink_new_data_done(&data);
947
948 if (!u->sink) {
949 AUDIO_ERR_LOG("Failed to create sink.");
950 return PA_ERR;
951 }
952
953 u->sink->parent.process_msg = SinkProcessMsg;
954 u->sink->set_state_in_io_thread = SinkSetStateInIoThreadCb;
955 u->sink->update_requested_latency = SinkUpdateRequestedLatencyCb;
956 u->sink->reconfigure = SinkReconfigureCb;
957 u->sink->get_formats = SinkGetFormatsCb;
958 u->sink->set_formats = SinkSetFormatsCb;
959 u->sink->userdata = u;
960
961 return 0;
962 }
963
InitRemoteSink(struct userdata * u,const char * filePath)964 static int32_t InitRemoteSink(struct userdata *u, const char *filePath)
965 {
966 SinkAttr sample_attrs;
967 int32_t ret;
968
969 sample_attrs.format = ConvertPaToHdiAdapterFormat(u->ss.format);
970 sample_attrs.adapterName = u->adapterName;
971 sample_attrs.openMicSpeaker = u->open_mic_speaker;
972 sample_attrs.sampleRate = (uint32_t) u->ss.rate;
973 sample_attrs.channel = u->ss.channels;
974 sample_attrs.volume = MAX_SINK_VOLUME_LEVEL;
975 sample_attrs.filePath = filePath;
976 sample_attrs.deviceNetworkId = u->deviceNetworkId;
977 sample_attrs.deviceType = u->deviceType;
978 sample_attrs.aux = SPLIT_MODE;
979
980 ret = u->sinkAdapter->RendererSinkInit(u->sinkAdapter, &sample_attrs);
981 if (ret != 0) {
982 AUDIO_ERR_LOG("audiorenderer Init failed!");
983 u->sinkAdapter->RendererSinkDeInit(u->sinkAdapter);
984 return -1;
985 }
986
987 return 0;
988 }
989
PaHdiSinkNewInit(pa_module * m,pa_modargs * ma,struct userdata * u)990 static int32_t PaHdiSinkNewInit(pa_module *m, pa_modargs *ma, struct userdata *u)
991 {
992 size_t nbytes;
993 int mg;
994 pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
995 pa_sink_set_rtpoll(u->sink, u->rtpoll);
996
997 u->buffer_size = DEFAULT_BUFFER_SIZE;
998
999 mg = pa_modargs_get_value_u32(ma, "buffer_size", &u->buffer_size);
1000 CHECK_AND_RETURN_RET_LOG(mg >= 0, InitFailed(m, ma),
1001 "Failed to parse buffer_size arg in capturer sink");
1002
1003 u->block_usec = pa_bytes_to_usec(u->buffer_size, &u->sink->sample_spec);
1004
1005 nbytes = pa_usec_to_bytes(u->block_usec, &u->sink->sample_spec);
1006 pa_sink_set_max_rewind(u->sink, nbytes);
1007 pa_sink_set_max_request(u->sink, u->buffer_size);
1008
1009 if (u->fixed_latency) {
1010 pa_sink_set_fixed_latency(u->sink, u->block_usec);
1011 } else {
1012 pa_sink_set_latency_range(u->sink, 0, u->block_usec);
1013 }
1014
1015 int32_t ret = LoadSinkAdapter(pa_modargs_get_value(ma, "device_class", DEFAULT_DEVICE_CLASS),
1016 pa_modargs_get_value(ma, "network_id", DEFAULT_DEVICE_NETWORKID), &u->sinkAdapter);
1017 if (ret) {
1018 AUDIO_ERR_LOG("Load adapter failed");
1019 return -1;
1020 }
1021
1022 if (pa_modargs_get_value_s32(ma, "device_type", &u->deviceType) < 0) {
1023 AUDIO_ERR_LOG("Failed to parse deviceType argument.");
1024 return -1;
1025 }
1026
1027 u->adapterName = pa_modargs_get_value(ma, "adapter_name", DEFAULT_DEVICE_CLASS);
1028 u->deviceNetworkId = pa_modargs_get_value(ma, "network_id", DEFAULT_DEVICE_NETWORKID);
1029
1030 u->ss = m->core->default_sample_spec;
1031 u->map = m->core->default_channel_map;
1032 if (pa_modargs_get_sample_spec_and_channel_map(ma, &u->ss, &u->map, PA_CHANNEL_MAP_DEFAULT) < 0) {
1033 AUDIO_ERR_LOG("Failed to parse sample specification and channel map");
1034 return -1;
1035 }
1036
1037 if (InitRemoteSink(u, pa_modargs_get_value(ma, "file_path", "")) < 0) {
1038 AUDIO_ERR_LOG("Failed to init remote audio render sink.");
1039 return -1;
1040 }
1041
1042 return 0;
1043 }
1044
pa__init(pa_module * m)1045 int pa__init(pa_module *m)
1046 {
1047 AUDIO_INFO_LOG("module_split_stream_sink pa__init start");
1048 struct userdata *u = NULL;
1049 pa_modargs *ma = NULL;
1050 int mq;
1051
1052 pa_assert(m);
1053
1054 ma = pa_modargs_new(m->argument, VALID_MODARGS);
1055 CHECK_AND_RETURN_RET_LOG(ma != NULL, InitFailed(m, ma), "Failed to parse module arguments:%{public}s", m->argument);
1056
1057 SPLIT_MODE = pa_modargs_get_value(ma, "split_mode", "1");
1058 AUDIO_INFO_LOG("module_split_stream_sink pa__init splitMode is %{public}s", SPLIT_MODE);
1059 ConvertToSplitArr(SPLIT_MODE);
1060
1061 m->userdata = u = pa_xnew0(struct userdata, 1);
1062 u->core = m->core;
1063 u->module = m;
1064 u->rtpoll = pa_rtpoll_new();
1065
1066 mq = pa_thread_mq_init(&u->thread_mq, m->core->mainloop, u->rtpoll);
1067 CHECK_AND_RETURN_RET_LOG(mq >=0, InitFailed(m, ma), "pa_thread_mq_init() failed.");
1068
1069 if (CreateSink(m, ma, u) != 0) {
1070 return InitFailed(m, ma);
1071 }
1072
1073 if (PaHdiSinkNewInit(m, ma, u) < 0) {
1074 AUDIO_ERR_LOG("PaHdiSinkNewInit failed");
1075 return -1;
1076 }
1077
1078 u->dq = pa_asyncmsgq_new(0);
1079
1080 if (!(u->thread = pa_thread_new("OS_SplitStream", ThreadFunc, u))) {
1081 AUDIO_ERR_LOG("Failed to create thread.");
1082 return InitFailed(m, ma);
1083 }
1084
1085 if (!(u->thread_split_hdi = pa_thread_new("OS_splitToHdi", ThreadFuncWriteHDI, u))) {
1086 AUDIO_ERR_LOG("Failed to create OS_splitToHdi.");
1087 return InitFailed(m, ma);
1088 }
1089
1090 pa_sink_put(u->sink);
1091
1092 pa_modargs_free(ma);
1093
1094 return 0;
1095 }
1096
pa__get_n_used(pa_module * m)1097 int pa__get_n_used(pa_module *m)
1098 {
1099 struct userdata *u;
1100
1101 pa_assert(m);
1102 pa_assert_se(u = m->userdata);
1103
1104 return pa_sink_linked_by(u->sink);
1105 }
1106
pa__done(pa_module * m)1107 void pa__done(pa_module*m)
1108 {
1109 struct userdata *u;
1110
1111 pa_assert(m);
1112
1113 if (!(u = m->userdata)) {
1114 return;
1115 }
1116
1117 if (u->sink) {
1118 pa_sink_unlink(u->sink);
1119 }
1120
1121 if (u->thread) {
1122 pa_asyncmsgq_send(u->thread_mq.inq, NULL, PA_MESSAGE_SHUTDOWN, NULL, 0, NULL);
1123 pa_thread_free(u->thread);
1124 }
1125
1126 pa_thread_mq_done(&u->thread_mq);
1127
1128 if (u->sink) {
1129 pa_sink_unref(u->sink);
1130 }
1131
1132 if (u->rtpoll) {
1133 pa_rtpoll_free(u->rtpoll);
1134 }
1135
1136 if (u->formats) {
1137 pa_idxset_free(u->formats, (pa_free_cb_t)pa_format_info_free);
1138 }
1139
1140 pa_xfree(u);
1141 }
1142