/
/
/
1"""
2Implementation of a Stream for the Universal Group Player.
3
4Stream handler for Universal Groups, managing audio distribution to group members.
5Essentially, it multicasts an audio source to multiple client streams, allowing individual
6filter_params for each client.
7"""
8
9from __future__ import annotations
10
11import asyncio
12import logging
13from collections.abc import AsyncGenerator, Awaitable, Callable
14from contextlib import suppress
15from typing import TYPE_CHECKING
16
17if TYPE_CHECKING:
18 from music_assistant_models.media_items import AudioFormat
19
20from music_assistant.constants import MASS_LOGGER_NAME
21from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
22from music_assistant.helpers.util import empty_queue
23
24LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.providers.ugp_stream")
25
26
27class UGPStream:
28 """
29 Implementation of a Stream for the Universal Group Player.
30
31 Stream handler for Universal Groups, managing audio distribution to group members.
32 Essentially, it multicasts an audio source to multiple client streams, allowing individual
33 filter_params for each client.
34 """
35
36 def __init__(
37 self,
38 audio_source: AsyncGenerator[bytes, None],
39 audio_format: AudioFormat,
40 base_pcm_format: AudioFormat,
41 ) -> None:
42 """Initialize UGP Stream."""
43 self.audio_source = audio_source
44 self.input_format = audio_format
45 self.base_pcm_format = base_pcm_format
46 self.subscribers: list[Callable[[bytes], Awaitable[None]]] = []
47 self._task: asyncio.Task[None] | None = None
48 self._done: asyncio.Event = asyncio.Event()
49
50 @property
51 def done(self) -> bool:
52 """Return if this stream is already done."""
53 return self._done.is_set() and self._task is not None and self._task.done()
54
55 async def stop(self) -> None:
56 """Stop/cancel the stream."""
57 if self._done.is_set():
58 return
59 if self._task and not self._task.done():
60 self._task.cancel()
61 with suppress(asyncio.CancelledError):
62 await self._task
63 self._done.set()
64
65 async def subscribe_raw(self) -> AsyncGenerator[bytes, None]:
66 """
67 Subscribe to the raw/unaltered audio stream.
68
69 The returned stream has the format `self.base_pcm_format`.
70 """
71 # start the runner as soon as the (first) client connects
72 if not self._task:
73 self._task = asyncio.create_task(self._runner())
74 queue: asyncio.Queue[bytes] = asyncio.Queue(10)
75 try:
76 self.subscribers.append(queue.put)
77 while True:
78 chunk = await queue.get()
79 if not chunk:
80 break
81 yield chunk
82 finally:
83 self.subscribers.remove(queue.put)
84 empty_queue(queue)
85 del queue
86
87 async def get_stream(
88 self, output_format: AudioFormat, filter_params: list[str] | None = None
89 ) -> AsyncGenerator[bytes, None]:
90 """Subscribe to the client specific audio stream."""
91 # start the runner as soon as the (first) client connects
92 async for chunk in get_ffmpeg_stream(
93 audio_input=self.subscribe_raw(),
94 input_format=self.base_pcm_format,
95 output_format=output_format,
96 filter_params=filter_params,
97 ):
98 yield chunk
99
100 async def _runner(self) -> None:
101 """Run the stream for the given audio source."""
102 await asyncio.sleep(0.25) # small delay to allow subscribers to connect
103 try:
104 async for chunk in get_ffmpeg_stream(
105 audio_input=self.audio_source,
106 input_format=self.input_format,
107 output_format=self.base_pcm_format,
108 # we don't allow the player to buffer too much ahead so we use readrate limiting
109 extra_input_args=["-readrate", "1.1", "-readrate_initial_burst", "10"],
110 ):
111 await asyncio.gather(
112 *[sub(chunk) for sub in self.subscribers],
113 return_exceptions=True,
114 )
115 except asyncio.CancelledError:
116 LOGGER.debug("UGP stream runner cancelled")
117 raise
118 except Exception as err:
119 LOGGER.error("UGP stream runner error: %s", err, exc_info=err)
120 finally:
121 # empty chunk when done
122 await asyncio.gather(*[sub(b"") for sub in self.subscribers], return_exceptions=True)
123 self._done.set()
124