music-assistant-server

21.1 KBPY
ma_stream.py
21.1 KB547 lines • python
1"""Music Assistant Snapcast source stream.
2
3This module implements a Music Assistant-managed Snapcast stream that is exposed to the
4Snapcast server as a TCP source. The stream is produced by running an FFmpeg pipeline
5which pulls audio from Music Assistant and pushes it to the Snapcast source URI.
6
7Optionally, a Unix socket server can be started to provide a control channel for a
8Snapcast control script (used by the built-in Snapcast server integration).
9"""
10
11from __future__ import annotations
12
13import asyncio
14import random
15import time
16import urllib.parse
17from contextlib import suppress
18from typing import TYPE_CHECKING, cast
19
20from music_assistant.helpers.audio import get_player_filter_params
21from music_assistant.helpers.ffmpeg import FFMpeg
22from music_assistant.providers.snapcast.socket_server import SnapcastSocketServer
23
24from .constants import (
25    CONTROL_SOCKET_PATH_TEMPLATE,
26    DEFAULT_SNAPCAST_FORMAT,
27)
28
29if TYPE_CHECKING:
30    from music_assistant_models.player import PlayerMedia
31
32    from .provider import SnapCastProvider
33    from .snap_cntrl_proto import SnapstreamProto
34
35
36class SnapcastMAStream:
37    """A Music Assistant-managed Snapcast stream.
38
39    The stream lifecycle is:
40    - setup: ensure required server resources exist (Snapcast source, optional socket server)
41    - start_stream: start the FFmpeg streaming task
42    - request_stop_stream / wait_for_stopped: stop streaming and await termination
43    - destroy: stop streaming, remove Snapcast source, and stop ancillary services
44
45    If `cntrl_queue_id` is provided, a Unix socket server is started to allow a Snapcast
46    control script to communicate with Music Assistant.
47    """
48
49    def __init__(
50        self,
51        provider: SnapCastProvider,
52        media: PlayerMedia,
53        stream_name: str,
54        source_id: str | None = None,
55        filter_settings_owner: str | None = None,
56        use_cntrl_script: bool = False,
57        destroy_on_stop: bool = False,
58    ) -> None:
59        """Initialize the stream.
60
61        Args:
62            provider: The Snapcast provider instance.
63            media: The media item to stream.
64            stream_name: Name used to register the stream on the Snapcast server.
65            cntrl_queue_id: If set, enables the control socket server used by the control script.
66            filter_settings_owner: Player/entity id used to fetch DSP/filter parameters.
67            destroy_on_stop: If true, delete this MA stream once streaming stops.
68        """
69        self.media = media
70        self.stream_name = stream_name
71        self.snap_stream: SnapstreamProto | None = None
72
73        self._provider = provider
74        self._logger = provider.logger
75        self._mass = provider.mass
76        self._source_id = source_id
77        self._use_cntrl_script = use_cntrl_script
78        self._cntrl_queue_id = source_id if use_cntrl_script else None
79        self._filter_settings_owner = filter_settings_owner
80        self._destroy_on_stop = destroy_on_stop
81
82        self._lifecycle_lock = asyncio.Lock()
83        self._destroyed = False
84        self._setup_done = False
85        self._is_streaming = False
86        self._restart_requested: bool = False
87        self._stop_requested: bool = False
88        self._streaming_started_at: float | None = None
89
90        self._socket_server: SnapcastSocketServer | None = None
91        self._socket_path: str | None = None
92        self._streamer_task: asyncio.Task[None] | None = None
93        self._stop_streamer_evt = asyncio.Event()
94        self._streamer_started_evt = asyncio.Event()
95        self._stop_timer: asyncio.Handle | None = None
96        self._stop_timer_started_at: float | None = None
97        self._filter_settings: list[str] | None = None
98
99    @property
100    def source_id(self) -> str | None:
101        """Return the source id this stream was created for."""
102        return self._source_id
103
104    @property
105    def stream_id(self) -> str | None:
106        """Return the Snapcast stream identifier, if registered."""
107        if self.snap_stream:
108            return self.snap_stream.identifier
109        return None
110
111    @property
112    def is_streaming(self) -> bool:
113        """Return True if the FFmpeg streaming task is currently running."""
114        return self._is_streaming
115
116    @property
117    def playback_started_at(self) -> float | None:
118        """Return when the playback started at the clients.
119
120        return The (UTC) timestamp when the playback was started on the client
121        or None if not started yet or not streaming.
122        """
123        if self._streaming_started_at is None:
124            return None
125        if self._provider._use_builtin_server:
126            buffer_ms = self._provider._snapcast_server_buffer_size
127            if time.time() - self._streaming_started_at < buffer_ms / 1000.0:
128                return None
129            return self._streaming_started_at + buffer_ms / 1000.0
130        return self._streaming_started_at
131
132    async def setup(self) -> None:
133        """Prepare the Snapcast stream resources.
134
135        Ensures a Snapcast source exists on the server. If `cntrl_queue_id` is set,
136        also starts the Unix socket server used by the control script.
137        """
138        async with self._lifecycle_lock:
139            if self._destroyed:
140                raise RuntimeError("Session is destroyed")
141            if self._setup_done:
142                return
143            if self._provider._snapserver is None:
144                raise RuntimeError("Snapserver needs to be setup first")
145
146            if self._cntrl_queue_id:
147                await self._start_socket_server()
148
149            await self._register_tcp_server_source()
150            self._setup_done = True
151
152    async def destroy(self) -> None:
153        """Stop streaming and tear down all resources.
154
155        This stops the streamer task (if running), removes the Snapcast source,
156        and stops the optional control socket server.
157        """
158        async with self._lifecycle_lock:
159            if self._destroyed:
160                return
161            self._destroyed = True
162
163        self.request_stop_stream()
164        await self.wait_for_stopped()
165        await self._remove_snap_source()
166        await self._stop_socket_server()
167
168    async def start_stream(self, allow_restart: bool = False) -> None:
169        """Start streaming the configured media to the Snapcast source.
170
171        Raises:
172            RuntimeError: If the streamer task is already running.
173        """
174        await self.setup()
175        async with self._lifecycle_lock:
176            if self._streamer_task and not self._streamer_task.done():
177                if not allow_restart:
178                    raise RuntimeError("streamer already running")
179                self._restart_if_running()
180                return
181
182            self._stop_requested = False
183            self._restart_requested = False
184            self._stop_streamer_evt.clear()
185            self._streamer_started_evt.clear()
186            self._streamer_task = self._mass.create_task(self._streamer_task_impl())
187            self._streamer_task.add_done_callback(self._on_streamer_done)
188
189    async def wait_for_started(self, timeout_sec: float | None = None) -> None:
190        """Wait until the streamer task signals it has started.
191
192        Args:
193            timeout_sec: Optional timeout in seconds.
194        """
195        try:
196            await asyncio.wait_for(self._streamer_started_evt.wait(), timeout_sec)
197        except TimeoutError:
198            self._logger.warning(
199                "Timeout waiting for stream %s to start; Canceling...",
200                self.stream_name,
201            )
202
203    def update_media(self, media: PlayerMedia) -> None:
204        """Update the media to play and restart the stream if required."""
205        if media != self.media:
206            self.media = media
207            self._restart_if_running()
208
209    def update_filter_settings(self, from_player: str | None = None) -> None:
210        """Update the filter setting."""
211        take_from = from_player or self._filter_settings_owner
212        if not take_from:
213            raise RuntimeError("No player provided to read filter settings from.")
214        new_settings = get_player_filter_params(
215            self._mass,
216            take_from,
217            DEFAULT_SNAPCAST_FORMAT,
218            DEFAULT_SNAPCAST_FORMAT,
219        )
220        if from_player:
221            self._filter_settings_owner = from_player
222        if new_settings != self._filter_settings:
223            self._restart_if_running()
224
225    def request_stop_stream(self) -> None:
226        """Request the streamer task to stop.
227
228        This is cooperative: the streamer task will stop when it observes the stop event.
229        Any pending inactivity stop timer is canceled.
230        """
231        self._stop_requested = True
232        self._restart_requested = False  # explicit stop cancels any pending restart
233        self._stop_streamer_evt.set()
234
235        self._stop_timer_started_at = None
236        if self._stop_timer:
237            self._stop_timer.cancel()
238
239    def set_in_use(self, in_use: bool) -> None:
240        """Mark the stream as in-use or idle.
241
242        When marked idle, a delayed stop is scheduled. When marked in-use, any pending
243        delayed stop is canceled.
244        """
245        if in_use:
246            self._stop_timer_started_at = None
247            if self._stop_timer:
248                self._stop_timer.cancel()
249        elif self._stop_timer_started_at is None:
250            self._stop_timer_started_at = self._mass.loop.time()
251            self._stop_timer = self._mass.loop.call_later(60.0, self.request_stop_stream)
252
253    async def wait_for_stopped(self, timeout_sec: float | None = None) -> None:
254        """Wait for the streamer task to finish.
255
256        If the task does not finish within the timeout, it is canceled and awaited.
257
258        Args:
259            timeout_sec: Optional timeout in seconds.
260        """
261        curr_task = self._streamer_task
262        if not curr_task:
263            return
264        try:
265            await asyncio.wait_for(curr_task, timeout_sec)
266        except asyncio.CancelledError:
267            self._logger.warning("Streamer task got canceled")
268        except TimeoutError:
269            self._logger.warning(
270                "Timeout waiting for stream %s to finish; Canceling...",
271                self.stream_name,
272            )
273            curr_task.cancel()
274            await asyncio.gather(curr_task, return_exceptions=True)
275
276    async def _streamer_task_impl(self) -> None:
277        """Streamer task implementation.
278
279        Runs FFmpeg to push audio to the Snapcast TCP source until FFmpeg exits or a stop
280        request is received. After exit, waits briefly for the Snapcast stream to report
281        an idle state.
282        """
283        stream_path = self._snap_get_stream_path()
284        if stream_path is None:
285            raise RuntimeError("The path to stream to is not set")
286
287        self._logger.debug("Start streaming to %s", stream_path)
288        self._stop_streamer_evt.clear()
289        self._streamer_started_evt.clear()
290        if self._filter_settings_owner:
291            self._filter_settings = get_player_filter_params(
292                self._mass,
293                self._filter_settings_owner,
294                DEFAULT_SNAPCAST_FORMAT,
295                DEFAULT_SNAPCAST_FORMAT,
296            )
297        audio_source = self._mass.streams.get_stream(self.media, DEFAULT_SNAPCAST_FORMAT)
298        try:
299            async with FFMpeg(
300                audio_input=audio_source,
301                input_format=DEFAULT_SNAPCAST_FORMAT,
302                output_format=DEFAULT_SNAPCAST_FORMAT,
303                filter_params=self._filter_settings or [],
304                audio_output=stream_path,
305                extra_input_args=["-y", "-re"],
306            ) as ffmpeg_proc:
307                wait_ffmpeg = self._mass.create_task(ffmpeg_proc.wait())
308                wait_stop = self._mass.create_task(self._stop_streamer_evt.wait())
309                self._streaming_started_at = time.time()
310                self._streamer_started_evt.set()
311                self._is_streaming = True
312
313                done, pending = await asyncio.wait(
314                    {wait_ffmpeg, wait_stop},
315                    return_when=asyncio.FIRST_COMPLETED,
316                )
317
318                if wait_stop in done and wait_ffmpeg not in done:
319                    self._logger.debug("Stopping stream %s requested.", self.stream_name)
320                    wait_ffmpeg.cancel()
321                    await asyncio.gather(wait_ffmpeg, return_exceptions=True)
322                    return
323
324                await wait_ffmpeg
325                for t in pending:
326                    t.cancel()
327                await asyncio.gather(*pending, return_exceptions=True)
328        except asyncio.CancelledError:
329            self._logger.debug("Snapcast stream %s cancelled", self.stream_name)
330            raise
331        except Exception as err:
332            self._logger.error("Snapcast stream %s error: %s", self.stream_name, err, exc_info=err)
333            raise
334        finally:
335            self._is_streaming = False
336            self._logger.debug("Finished streaming to %s", stream_path)
337            await self._wait_stream_idle()
338
339    async def _wait_stream_idle(self) -> None:
340        """Wait for the Snapcast stream to become idle after streaming ends."""
341        try:
342
343            async def wait_until_idle() -> None:
344                while True:
345                    stream_is_idle = False
346                    with suppress(KeyError):
347                        snap_stream = self._provider._snapserver.stream(self.stream_name)
348                        stream_is_idle = snap_stream.status == "idle"
349                    if self._mass.closing or stream_is_idle:
350                        break
351                    await asyncio.sleep(0.25)
352
353            await asyncio.wait_for(wait_until_idle(), timeout=10.0)
354        except TimeoutError:
355            self._logger.warning(
356                "Timeout waiting for stream %s to become idle",
357                self.stream_name,
358            )
359        finally:
360            self._streaming_started_at = None
361
362    def _on_streamer_done(self, t: asyncio.Task[None]) -> None:
363        """Handle streamer task completion and optional cleanup."""
364        restart = False
365        try:
366            t.result()
367        except asyncio.CancelledError:
368            self._logger.debug("Streamer task cancelled: %s", self.stream_name)
369        except Exception:
370            self._logger.exception("Streamer task failed")
371        finally:
372            restart = self._restart_requested and not self._destroyed
373
374            if self._streamer_task is t:
375                self._streamer_task = None
376
377            # reset per-run state
378            self._restart_requested = False
379            self._stop_requested = False
380            self._stop_streamer_evt.clear()
381            self._streamer_started_evt.clear()
382
383        if restart:
384            self._mass.create_task(self._restart_stream_locked())
385        elif self._destroy_on_stop:
386            self._mass.create_task(self._provider.delete_ma_stream(self.stream_name))
387
388    def _restart_if_running(self) -> None:
389        """Request a running stream to restart."""
390        t = self._streamer_task
391        if not t or t.done():
392            return
393
394        if self._stop_requested or self._stop_streamer_evt.is_set():
395            return
396
397        self._restart_requested = True
398        self._stop_requested = True
399        self._stop_streamer_evt.set()
400
401        self._stop_timer_started_at = None
402        if self._stop_timer:
403            self._stop_timer.cancel()
404
405    async def _restart_stream_locked(self) -> None:
406        """Restart the streamer under the lifecycle lock."""
407        async with self._lifecycle_lock:
408            if self._destroyed:
409                return
410            if self._streamer_task and not self._streamer_task.done():
411                return
412
413            # reset state and start a fresh run
414            self._stop_requested = False
415            self._restart_requested = False
416            self._stop_streamer_evt.clear()
417            self._streamer_started_evt.clear()
418
419            self._streamer_task = self._mass.create_task(self._streamer_task_impl())
420            self._streamer_task.add_done_callback(self._on_streamer_done)
421
422    async def _register_tcp_server_source(self) -> None:
423        """Create a Snapcast TCP source for this stream (or reuse an existing one)."""
424        # prefer to reuse existing stream if possible
425        if self.snap_stream:
426            return
427
428        # The control script is used only for music streams in the builtin server
429        extra_args = ""
430        if (cntrl_queue_id := self._cntrl_queue_id) is not None:
431            # Create socket server for control script communication
432            socket_path = self._socket_path
433            if socket_path is None:
434                raise RuntimeError("socket_path needs to be set if cntrl_queue_id is set")
435            extra_args = (
436                f"&controlscript={urllib.parse.quote_plus('control.py')}"
437                f"&controlscriptparams=--queueid={urllib.parse.quote_plus(cntrl_queue_id)}%20"
438                f"--socket={urllib.parse.quote_plus(socket_path)}%20"
439                f"--streamserver-ip={self._mass.streams.publish_ip}%20"
440                f"--streamserver-port={self._mass.streams.publish_port}"
441            )
442
443        attempts = 50
444        while attempts:
445            attempts -= 1
446            # pick a random port
447            port = random.randint(4953, 4953 + 200)
448            ## Do we need to add a time out here?
449            result = await self._provider._snapserver.stream_add_stream(
450                # NOTE: setting the sampleformat to something else
451                # (like 24 bits bit depth) does not seem to work at all!
452                f"tcp://0.0.0.0:{port}?sampleformat=48000:16:2"
453                f"&idle_threshold={self._provider._snapcast_stream_idle_threshold}"
454                f"{extra_args}&name={self.stream_name}"
455            )
456            if result is None or "id" not in result:
457                # if the port is already taken, the result will be an error
458                self._logger.warning(result)
459                continue
460            ## Do we need to synchronize the snapserver repr first?
461            self.snap_stream = self._provider._snapserver.stream(result["id"])
462            self.snap_stream.set_callback(self._snap_on_stream_update)
463            return
464
465        if self._socket_server:
466            await self._stop_socket_server()
467
468        msg = "Unable to create stream - No free port found?"
469        raise RuntimeError(msg)
470
471    async def _remove_snap_source(self) -> None:
472        """Remove the Snapcast source created for this stream and detach groups."""
473        if self._mass.closing or self.snap_stream is None:
474            return
475
476        for snap_group in self._provider._snapserver.groups:
477            if snap_group.stream != self.snap_stream.identifier:
478                continue
479            self._logger.debug(f"Set stream of group {snap_group.name} to default.")
480            await snap_group.set_stream("default")
481
482        with suppress(KeyError, AttributeError):
483            snap_stream = self._provider._snapserver.stream(self.stream_name)
484            await self._provider._snapserver.stream_remove_stream(snap_stream.identifier)
485
486        if self._socket_server:
487            await self._stop_socket_server()
488        self._snap_on_stream_update()
489
490        return
491
492    def _snap_get_stream_path(self) -> str | None:
493        """Return the Snapcast TCP URI to stream to."""
494        if self.snap_stream is None:
495            return None
496
497        uri = self.snap_stream._stream.get("uri", {})
498        uri_host = uri.get("host", "")
499        stream_path = self.snap_stream.path or f"tcp://{uri_host}"
500        return stream_path.replace("0.0.0.0", self._provider._snapcast_server_host)
501
502    def _snap_on_stream_update(self, stream: SnapstreamProto | None = None) -> None:
503        """Handle Snapcast stream updates and trigger group member refresh."""
504        if self.snap_stream is None:
505            return
506
507        for snap_group in self._provider._snapserver.groups:
508            if snap_group.stream != self.snap_stream.identifier:
509                continue
510            self._provider.poke_group_members(snap_group)
511
512    async def _start_socket_server(self) -> str:
513        """Get or create a socket server for the given queue.
514
515        :return: The path to the Unix socket.
516        """
517        if self._socket_server:
518            return self._socket_server.socket_path
519
520        if self._cntrl_queue_id is None:
521            raise RuntimeError("Socket server require _cntrl_queue_id to be set")
522
523        socket_path = CONTROL_SOCKET_PATH_TEMPLATE.format(queue_id=self._cntrl_queue_id)
524        socket_server = SnapcastSocketServer(
525            mass=self._mass,
526            queue_id=self._cntrl_queue_id,
527            socket_path=socket_path,
528            streamserver_ip=str(self._mass.streams.publish_ip),
529            streamserver_port=cast("int", self._mass.streams.publish_port),
530        )
531        await socket_server.start()
532        self._socket_server = socket_server
533        self._socket_path = socket_path
534        self._logger.debug(
535            "Created socket server for queue %s at %s", self._cntrl_queue_id, socket_path
536        )
537        return socket_path
538
539    async def _stop_socket_server(self) -> None:
540        """Stop and remove the socket server for the given queue."""
541        if not self._socket_server:
542            return
543
544        await self._socket_server.stop()
545        self._socket_server = None
546        self._logger.debug("Stopped socket server for queue %s", self._cntrl_queue_id)
547