music-assistant-server

20.8 KBPY
ma_stream.py
20.8 KB539 lines • python
1"""Music Assistant Snapcast source stream.
2
3This module implements a Music Assistant-managed Snapcast stream that is exposed to the
4Snapcast server as a TCP source. The stream is produced by running an FFmpeg pipeline
5which pulls audio from Music Assistant and pushes it to the Snapcast source URI.
6
7Optionally, a Unix socket server can be started to provide a control channel for a
8Snapcast control script (used by the built-in Snapcast server integration).
9"""
10
11from __future__ import annotations
12
13import asyncio
14import random
15import time
16import urllib.parse
17from contextlib import suppress
18from typing import TYPE_CHECKING, cast
19
20from music_assistant.helpers.audio import get_player_filter_params
21from music_assistant.helpers.ffmpeg import FFMpeg
22from music_assistant.providers.snapcast.socket_server import SnapcastSocketServer
23
24from .constants import (
25    CONTROL_SOCKET_PATH_TEMPLATE,
26    DEFAULT_SNAPCAST_FORMAT,
27)
28
29if TYPE_CHECKING:
30    from music_assistant_models.player import PlayerMedia
31
32    from .provider import SnapCastProvider
33    from .snap_cntrl_proto import SnapstreamProto
34
35
36class SnapcastMAStream:
37    """A Music Assistant-managed Snapcast stream.
38
39    The stream lifecycle is:
40    - setup: ensure required server resources exist (Snapcast source, optional socket server)
41    - start_stream: start the FFmpeg streaming task
42    - request_stop_stream / wait_for_stopped: stop streaming and await termination
43    - destroy: stop streaming, remove Snapcast source, and stop ancillary services
44
    If `use_cntrl_script` is set (and a `source_id` is given), a Unix socket server is started
    to allow a Snapcast control script to communicate with Music Assistant.
47    """
48
49    def __init__(
50        self,
51        provider: SnapCastProvider,
52        media: PlayerMedia,
53        stream_name: str,
54        source_id: str | None = None,
55        filter_settings_owner: str | None = None,
56        use_cntrl_script: bool = False,
57        destroy_on_stop: bool = False,
58    ) -> None:
59        """Initialize the stream.
60
61        Args:
62            provider: The Snapcast provider instance.
63            media: The media item to stream.
64            stream_name: Name used to register the stream on the Snapcast server.
65            cntrl_queue_id: If set, enables the control socket server used by the control script.
66            filter_settings_owner: Player/entity id used to fetch DSP/filter parameters.
67            destroy_on_stop: If true, delete this MA stream once streaming stops.
68        """
69        self.media = media
70        self.stream_name = stream_name
71        self.snap_stream: SnapstreamProto | None = None
72
73        self._provider = provider
74        self._logger = provider.logger
75        self._mass = provider.mass
76        self._source_id = source_id
77        self._use_cntrl_script = use_cntrl_script
78        self._cntrl_queue_id = source_id if use_cntrl_script else None
79        self._filter_settings_owner = filter_settings_owner
80        self._destroy_on_stop = destroy_on_stop
81
82        self._lifecycle_lock = asyncio.Lock()
83        self._destroyed = False
84        self._setup_done = False
85        self._is_streaming = False
86        self._restart_requested: bool = False
87        self._stop_requested: bool = False
88        self._streaming_started_at: float | None = None
89
90        self._socket_server: SnapcastSocketServer | None = None
91        self._socket_path: str | None = None
92        self._streamer_task: asyncio.Task[None] | None = None
93        self._stop_streamer_evt = asyncio.Event()
94        self._streamer_started_evt = asyncio.Event()
95        self._stop_timer: asyncio.Handle | None = None
96        self._stop_timer_started_at: float | None = None
97        self._filter_settings: list[str] | None = None
98
99    @property
100    def source_id(self) -> str | None:
101        """Return the source id this stream was created for."""
102        return self._source_id
103
104    @property
105    def stream_id(self) -> str | None:
106        """Return the Snapcast stream identifier, if registered."""
107        if self.snap_stream:
108            return self.snap_stream.identifier
109        return None
110
111    @property
112    def is_streaming(self) -> bool:
113        """Return True if the FFmpeg streaming task is currently running."""
114        return self._is_streaming
115
116    @property
117    def playback_started_at(self) -> float | None:
118        """Return when the playback started at the clients.
119
120        return The (UTC) timestamp when the playback was started on the client
121        or None if not started yet or not streaming.
122        """
123        if self._streaming_started_at is None:
124            return None
125        if self._provider._use_builtin_server:
126            buffer_ms = self._provider._snapcast_server_buffer_size
127            if time.time() - self._streaming_started_at < buffer_ms / 1000.0:
128                return None
129            return self._streaming_started_at + buffer_ms / 1000.0
130        return self._streaming_started_at
131
132    async def setup(self) -> None:
133        """Prepare the Snapcast stream resources.
134
135        Ensures a Snapcast source exists on the server. If `cntrl_queue_id` is set,
136        also starts the Unix socket server used by the control script.
137        """
138        async with self._lifecycle_lock:
139            if self._destroyed:
140                raise RuntimeError("Session is destroyed")
141            if self._setup_done:
142                return
143            if self._provider._snapserver is None:
144                raise RuntimeError("Snapserver needs to be setup first")
145
146            if self._cntrl_queue_id:
147                await self._start_socket_server()
148
149            await self._register_tcp_server_source()
150            self._setup_done = True
151
152    async def destroy(self) -> None:
153        """Stop streaming and tear down all resources.
154
155        This stops the streamer task (if running), removes the Snapcast source,
156        and stops the optional control socket server.
157        """
158        async with self._lifecycle_lock:
159            if self._destroyed:
160                return
161            self._destroyed = True
162
163        self.request_stop_stream()
164        await self.wait_for_stopped()
165        await self._remove_snap_source()
166        await self._stop_socket_server()
167
168    async def start_stream(self, allow_restart: bool = False) -> None:
169        """Start streaming the configured media to the Snapcast source.
170
171        Raises:
172            RuntimeError: If the streamer task is already running.
173        """
174        await self.setup()
175        async with self._lifecycle_lock:
176            if self._streamer_task and not self._streamer_task.done():
177                if not allow_restart:
178                    raise RuntimeError("streamer already running")
179                self._restart_if_running()
180                return
181
182            self._stop_requested = False
183            self._restart_requested = False
184            self._stop_streamer_evt.clear()
185            self._streamer_started_evt.clear()
186            self._streamer_task = self._mass.create_task(self._streamer_task_impl())
187            self._streamer_task.add_done_callback(self._on_streamer_done)
188
189    async def wait_for_started(self, timeout_sec: float | None = None) -> None:
190        """Wait until the streamer task signals it has started.
191
192        Args:
193            timeout_sec: Optional timeout in seconds.
194        """
195        try:
196            await asyncio.wait_for(self._streamer_started_evt.wait(), timeout_sec)
197        except TimeoutError:
198            self._logger.warning(
199                "Timeout waiting for stream %s to start; Canceling...",
200                self.stream_name,
201            )
202
203    def update_media(self, media: PlayerMedia) -> None:
204        """Update the media to play and restart the stream if required."""
205        if media != self.media:
206            self.media = media
207            self._restart_if_running()
208
209    def update_filter_settings(self, from_player: str | None = None) -> None:
210        """Update the filter setting."""
211        take_from = from_player or self._filter_settings_owner
212        if not take_from:
213            raise RuntimeError("No player provided to read filter settings from.")
214        new_settings = get_player_filter_params(
215            self._mass,
216            take_from,
217            DEFAULT_SNAPCAST_FORMAT,
218            DEFAULT_SNAPCAST_FORMAT,
219        )
220        if from_player:
221            self._filter_settings_owner = from_player
222        if new_settings != self._filter_settings:
223            self._restart_if_running()
224
225    def request_stop_stream(self) -> None:
226        """Request the streamer task to stop.
227
228        This is cooperative: the streamer task will stop when it observes the stop event.
229        Any pending inactivity stop timer is canceled.
230        """
231        self._stop_requested = True
232        self._restart_requested = False  # explicit stop cancels any pending restart
233        self._stop_streamer_evt.set()
234
235        self._stop_timer_started_at = None
236        if self._stop_timer:
237            self._stop_timer.cancel()
238
239    def set_in_use(self, in_use: bool) -> None:
240        """Mark the stream as in-use or idle.
241
242        When marked idle, a delayed stop is scheduled. When marked in-use, any pending
243        delayed stop is canceled.
244        """
245        if in_use:
246            self._stop_timer_started_at = None
247            if self._stop_timer:
248                self._stop_timer.cancel()
249        elif self._stop_timer_started_at is None:
250            self._stop_timer_started_at = self._mass.loop.time()
251            self._stop_timer = self._mass.loop.call_later(60.0, self.request_stop_stream)
252
253    async def wait_for_stopped(self, timeout_sec: float | None = None) -> None:
254        """Wait for the streamer task to finish.
255
256        If the task does not finish within the timeout, it is canceled and awaited.
257
258        Args:
259            timeout_sec: Optional timeout in seconds.
260        """
261        curr_task = self._streamer_task
262        if not curr_task:
263            return
264        try:
265            await asyncio.wait_for(curr_task, timeout_sec)
266        except asyncio.CancelledError:
267            self._logger.warning("Streamer task got canceled")
268        except TimeoutError:
269            self._logger.warning(
270                "Timeout waiting for stream %s to finish; Canceling...",
271                self.stream_name,
272            )
273            curr_task.cancel()
274            await asyncio.gather(curr_task, return_exceptions=True)
275
276    async def _streamer_task_impl(self) -> None:
277        """Streamer task implementation.
278
279        Runs FFmpeg to push audio to the Snapcast TCP source until FFmpeg exits or a stop
280        request is received. After exit, waits briefly for the Snapcast stream to report
281        an idle state.
282        """
283        stream_path = self._snap_get_stream_path()
284        if stream_path is None:
285            raise RuntimeError("The path to stream to is not set")
286
287        self._logger.debug("Start streaming to %s", stream_path)
288        self._stop_streamer_evt.clear()
289        self._streamer_started_evt.clear()
290        if self._filter_settings_owner:
291            self._filter_settings = get_player_filter_params(
292                self._mass,
293                self._filter_settings_owner,
294                DEFAULT_SNAPCAST_FORMAT,
295                DEFAULT_SNAPCAST_FORMAT,
296            )
297        audio_source = self._mass.streams.get_stream(self.media, DEFAULT_SNAPCAST_FORMAT)
298        try:
299            async with FFMpeg(
300                audio_input=audio_source,
301                input_format=DEFAULT_SNAPCAST_FORMAT,
302                output_format=DEFAULT_SNAPCAST_FORMAT,
303                filter_params=self._filter_settings or [],
304                audio_output=stream_path,
305                extra_input_args=["-y", "-re"],
306            ) as ffmpeg_proc:
307                wait_ffmpeg = self._mass.create_task(ffmpeg_proc.wait())
308                wait_stop = self._mass.create_task(self._stop_streamer_evt.wait())
309                self._streaming_started_at = time.time()
310                self._streamer_started_evt.set()
311                self._is_streaming = True
312
313                done, pending = await asyncio.wait(
314                    {wait_ffmpeg, wait_stop},
315                    return_when=asyncio.FIRST_COMPLETED,
316                )
317
318                if wait_stop in done and wait_ffmpeg not in done:
319                    self._logger.debug("Stopping stream %s requested.", self.stream_name)
320                    wait_ffmpeg.cancel()
321                    await asyncio.gather(wait_ffmpeg, return_exceptions=True)
322                    return
323
324                await wait_ffmpeg
325                for t in pending:
326                    t.cancel()
327                await asyncio.gather(*pending, return_exceptions=True)
328
329        finally:
330            self._is_streaming = False
331            self._logger.debug("Finished streaming to %s", stream_path)
332            # Wait a bit for snap stream to become idle
333            try:
334
335                async def wait_until_idle() -> None:
336                    while True:
337                        stream_is_idle = False
338                        with suppress(KeyError):
339                            snap_stream = self._provider._snapserver.stream(self.stream_name)
340                            stream_is_idle = snap_stream.status == "idle"
341                        if self._mass.closing or stream_is_idle:
342                            break
343                        await asyncio.sleep(0.25)
344
345                await asyncio.wait_for(wait_until_idle(), timeout=10.0)
346            except TimeoutError:
347                self._logger.warning(
348                    "Timeout waiting for stream %s to become idle",
349                    self.stream_name,
350                )
351            finally:
352                self._streaming_started_at = None
353
    def _on_streamer_done(self, t: asyncio.Task[None]) -> None:
        """Handle streamer task completion and optional cleanup.

        Registered as the done-callback of the streamer task. Logs how the task ended,
        resets the per-run flags and events, and then either schedules a restart (when
        one was requested and the stream is not destroyed) or, if `destroy_on_stop` is
        set, asks the provider to delete this stream.

        Args:
            t: The streamer task that just finished.
        """
        restart = False
        try:
            # Fetch the result only to surface (and log) how the task ended.
            t.result()
        except asyncio.CancelledError:
            self._logger.debug("Streamer task cancelled: %s", self.stream_name)
        except Exception:
            self._logger.exception("Streamer task failed")
        finally:
            # Capture the restart decision before the flags are reset below.
            restart = self._restart_requested and not self._destroyed

            # Only forget the task if it is still the one stored on the instance.
            if self._streamer_task is t:
                self._streamer_task = None

            # reset per-run state
            self._restart_requested = False
            self._stop_requested = False
            self._stop_streamer_evt.clear()
            self._streamer_started_evt.clear()

        if restart:
            # This callback is synchronous, so the restart (which takes the lifecycle
            # lock) is scheduled as a separate task.
            self._mass.create_task(self._restart_stream_locked())
        elif self._destroy_on_stop:
            self._mass.create_task(self._provider.delete_ma_stream(self.stream_name))
379
380    def _restart_if_running(self) -> None:
381        """Request a running stream to restart."""
382        t = self._streamer_task
383        if not t or t.done():
384            return
385
386        if self._stop_requested or self._stop_streamer_evt.is_set():
387            return
388
389        self._restart_requested = True
390        self._stop_requested = True
391        self._stop_streamer_evt.set()
392
393        self._stop_timer_started_at = None
394        if self._stop_timer:
395            self._stop_timer.cancel()
396
397    async def _restart_stream_locked(self) -> None:
398        """Restart the streamer under the lifecycle lock."""
399        async with self._lifecycle_lock:
400            if self._destroyed:
401                return
402            if self._streamer_task and not self._streamer_task.done():
403                return
404
405            # reset state and start a fresh run
406            self._stop_requested = False
407            self._restart_requested = False
408            self._stop_streamer_evt.clear()
409            self._streamer_started_evt.clear()
410
411            self._streamer_task = self._mass.create_task(self._streamer_task_impl())
412            self._streamer_task.add_done_callback(self._on_streamer_done)
413
414    async def _register_tcp_server_source(self) -> None:
415        """Create a Snapcast TCP source for this stream (or reuse an existing one)."""
416        # prefer to reuse existing stream if possible
417        if self.snap_stream:
418            return
419
420        # The control script is used only for music streams in the builtin server
421        extra_args = ""
422        if (cntrl_queue_id := self._cntrl_queue_id) is not None:
423            # Create socket server for control script communication
424            socket_path = self._socket_path
425            if socket_path is None:
426                raise RuntimeError("socket_path needs to be set if cntrl_queue_id is set")
427            extra_args = (
428                f"&controlscript={urllib.parse.quote_plus('control.py')}"
429                f"&controlscriptparams=--queueid={urllib.parse.quote_plus(cntrl_queue_id)}%20"
430                f"--socket={urllib.parse.quote_plus(socket_path)}%20"
431                f"--streamserver-ip={self._mass.streams.publish_ip}%20"
432                f"--streamserver-port={self._mass.streams.publish_port}"
433            )
434
435        attempts = 50
436        while attempts:
437            attempts -= 1
438            # pick a random port
439            port = random.randint(4953, 4953 + 200)
440            ## Do we need to add a time out here?
441            result = await self._provider._snapserver.stream_add_stream(
442                # NOTE: setting the sampleformat to something else
443                # (like 24 bits bit depth) does not seem to work at all!
444                f"tcp://0.0.0.0:{port}?sampleformat=48000:16:2"
445                f"&idle_threshold={self._provider._snapcast_stream_idle_threshold}"
446                f"{extra_args}&name={self.stream_name}"
447            )
448            if result is None or "id" not in result:
449                # if the port is already taken, the result will be an error
450                self._logger.warning(result)
451                continue
452            ## Do we need to synchronize the snapserver repr first?
453            self.snap_stream = self._provider._snapserver.stream(result["id"])
454            self.snap_stream.set_callback(self._snap_on_stream_update)
455            return
456
457        if self._socket_server:
458            await self._stop_socket_server()
459
460        msg = "Unable to create stream - No free port found?"
461        raise RuntimeError(msg)
462
463    async def _remove_snap_source(self) -> None:
464        """Remove the Snapcast source created for this stream and detach groups."""
465        if self._mass.closing or self.snap_stream is None:
466            return
467
468        for snap_group in self._provider._snapserver.groups:
469            if snap_group.stream != self.snap_stream.identifier:
470                continue
471            self._logger.debug(f"Set stream of group {snap_group.name} to default.")
472            await snap_group.set_stream("default")
473
474        with suppress(KeyError, AttributeError):
475            snap_stream = self._provider._snapserver.stream(self.stream_name)
476            await self._provider._snapserver.stream_remove_stream(snap_stream.identifier)
477
478        if self._socket_server:
479            await self._stop_socket_server()
480        self._snap_on_stream_update()
481
482        return
483
484    def _snap_get_stream_path(self) -> str | None:
485        """Return the Snapcast TCP URI to stream to."""
486        if self.snap_stream is None:
487            return None
488
489        uri = self.snap_stream._stream.get("uri", {})
490        uri_host = uri.get("host", "")
491        stream_path = self.snap_stream.path or f"tcp://{uri_host}"
492        return stream_path.replace("0.0.0.0", self._provider._snapcast_server_host)
493
494    def _snap_on_stream_update(self, stream: SnapstreamProto | None = None) -> None:
495        """Handle Snapcast stream updates and trigger group member refresh."""
496        if self.snap_stream is None:
497            return
498
499        for snap_group in self._provider._snapserver.groups:
500            if snap_group.stream != self.snap_stream.identifier:
501                continue
502            self._provider.poke_group_members(snap_group)
503
504    async def _start_socket_server(self) -> str:
505        """Get or create a socket server for the given queue.
506
507        :return: The path to the Unix socket.
508        """
509        if self._socket_server:
510            return self._socket_server.socket_path
511
512        if self._cntrl_queue_id is None:
513            raise RuntimeError("Socket server require _cntrl_queue_id to be set")
514
515        socket_path = CONTROL_SOCKET_PATH_TEMPLATE.format(queue_id=self._cntrl_queue_id)
516        socket_server = SnapcastSocketServer(
517            mass=self._mass,
518            queue_id=self._cntrl_queue_id,
519            socket_path=socket_path,
520            streamserver_ip=str(self._mass.streams.publish_ip),
521            streamserver_port=cast("int", self._mass.streams.publish_port),
522        )
523        await socket_server.start()
524        self._socket_server = socket_server
525        self._socket_path = socket_path
526        self._logger.debug(
527            "Created socket server for queue %s at %s", self._cntrl_queue_id, socket_path
528        )
529        return socket_path
530
531    async def _stop_socket_server(self) -> None:
532        """Stop and remove the socket server for the given queue."""
533        if not self._socket_server:
534            return
535
536        await self._socket_server.stop()
537        self._socket_server = None
538        self._logger.debug("Stopped socket server for queue %s", self._cntrl_queue_id)
539