/
/
/
1"""
2Implementation of a Stream for the Universal Group Player.
3
4Stream handler for Universal Groups, managing audio distribution to group members.
5Essentially, it multicasts an audio source to multiple client streams, allowing individual
6filter_params for each client.
7"""
8
9from __future__ import annotations
10
11import asyncio
12from collections.abc import AsyncGenerator, Awaitable, Callable
13from contextlib import suppress
14from typing import TYPE_CHECKING
15
16if TYPE_CHECKING:
17 from music_assistant_models.media_items import AudioFormat
18
19from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
20from music_assistant.helpers.util import empty_queue
21
22
class UGPStream:
    """
    Implementation of a Stream for the Universal Group Player.

    Stream handler for Universal Groups, managing audio distribution to group members.
    Essentially, it multicasts an audio source to multiple client streams, allowing individual
    filter_params for each client.

    Lifecycle: the background runner is started lazily by the first subscriber, pushes every
    chunk to all registered subscribers and finishes with an empty chunk (end-of-stream marker).
    Once the stream finished, failed or was stopped, it is done and will not produce audio again.
    """

    # Upper bound (in seconds) for handing the end-of-stream marker to subscribers when the
    # runner is cancelled or fails, so a subscriber with a full queue can not block `stop()`.
    _EOS_TIMEOUT = 1.0

    def __init__(
        self,
        audio_source: AsyncGenerator[bytes, None],
        audio_format: AudioFormat,
        base_pcm_format: AudioFormat,
    ) -> None:
        """
        Initialize UGP Stream.

        :param audio_source: async generator yielding the source audio.
        :param audio_format: format of the data yielded by `audio_source`.
        :param base_pcm_format: format the source is converted to before it is
            distributed to the subscribers.
        """
        self.audio_source = audio_source
        self.input_format = audio_format
        self.base_pcm_format = base_pcm_format
        # callbacks that get awaited with every chunk (and finally with an empty chunk)
        self.subscribers: list[Callable[[bytes], Awaitable[None]]] = []
        # background task running `_runner`, created by the first subscriber
        self._task: asyncio.Task[None] | None = None
        # set once the runner ended (for whatever reason) or `stop()` was called
        self._done: asyncio.Event = asyncio.Event()

    @property
    def done(self) -> bool:
        """
        Return if this stream is already done.

        A stream is done when it has been stopped or its runner ended and no runner
        task is active anymore. This includes a stream that was stopped before the
        first subscriber ever started the runner.
        """
        if not self._done.is_set():
            return False
        return self._task is None or self._task.done()

    async def stop(self) -> None:
        """
        Stop/cancel the stream.

        Cancels the runner (if it is active) and waits for it to wind down.
        Calling this on a stream that is already done is a no-op.
        """
        if self._done.is_set():
            return
        if self._task and not self._task.done():
            self._task.cancel()
            with suppress(asyncio.CancelledError):
                await self._task
        self._done.set()

    async def subscribe_raw(self) -> AsyncGenerator[bytes, None]:
        """
        Subscribe to the raw/unaltered audio stream.

        The returned stream has the format `self.base_pcm_format`.
        Iteration ends when the end-of-stream marker is received or when the stream
        is done and everything that was still queued has been consumed. Subscribing
        to a stream that is already done yields nothing.
        """
        # a stream that is done never delivers data (or an end marker) again,
        # so waiting for it would block forever
        if self._done.is_set():
            return
        # start the runner as soon as the (first) client connects
        if not self._task:
            self._task = asyncio.create_task(self._runner())
        queue: asyncio.Queue[bytes] = asyncio.Queue(10)
        self.subscribers.append(queue.put)
        try:
            # keep draining what is queued, but never wait on an empty queue once the
            # stream is done: the end marker may not have reached this subscriber
            while not (self._done.is_set() and queue.empty()):
                chunk = await queue.get()
                if not chunk:
                    break
                yield chunk
        finally:
            self.subscribers.remove(queue.put)
            empty_queue(queue)

    async def get_stream(
        self, output_format: AudioFormat, filter_params: list[str] | None = None
    ) -> AsyncGenerator[bytes, None]:
        """
        Subscribe to the client specific audio stream.

        :param output_format: format the raw stream is converted to for this client.
        :param filter_params: optional filter parameters passed on to ffmpeg.
        """
        # the raw subscription (which starts the runner on first use) is only
        # created here; it becomes active once ffmpeg starts consuming it
        async for chunk in get_ffmpeg_stream(
            audio_input=self.subscribe_raw(),
            input_format=self.base_pcm_format,
            output_format=output_format,
            filter_params=filter_params,
        ):
            yield chunk

    async def _broadcast(self, chunk: bytes) -> None:
        """Hand `chunk` to all current subscribers and wait until each of them took it."""
        await asyncio.gather(
            *[sub(chunk) for sub in self.subscribers],
            return_exceptions=True,
        )

    async def _runner(self) -> None:
        """
        Run the stream for the given audio source.

        Always marks the stream as done when it ends. Subscribers receive the empty
        end-of-stream chunk on regular completion and, on a best-effort basis, also
        when the runner is cancelled or fails, so they do not keep waiting for data.
        """
        try:
            await asyncio.sleep(0.25)  # small delay to allow subscribers to connect
            async for chunk in get_ffmpeg_stream(
                audio_input=self.audio_source,
                input_format=self.input_format,
                output_format=self.base_pcm_format,
                # we don't allow the player to buffer too much ahead so we use readrate limiting
                extra_input_args=["-readrate", "1.1", "-readrate_initial_burst", "10"],
            ):
                await self._broadcast(chunk)
            # empty chunk when done
            await self._broadcast(b"")
        except (Exception, asyncio.CancelledError):
            # cancelled or failed: still release waiting subscribers, but time-bound it
            # because a subscriber with a full queue would block this indefinitely
            with suppress(asyncio.TimeoutError):
                await asyncio.wait_for(self._broadcast(b""), timeout=self._EOS_TIMEOUT)
            raise
        finally:
            self._done.set()
113