/
/
/
1"""Music Assistant Snapcast source stream.
2
3This module implements a Music Assistant-managed Snapcast stream that is exposed to the
4Snapcast server as a TCP source. The stream is produced by running an FFmpeg pipeline
5which pulls audio from Music Assistant and pushes it to the Snapcast source URI.
6
7Optionally, a Unix socket server can be started to provide a control channel for a
8Snapcast control script (used by the built-in Snapcast server integration).
9"""
10
11from __future__ import annotations
12
13import asyncio
14import random
15import urllib.parse
16from contextlib import suppress
17from typing import TYPE_CHECKING, cast
18
19from music_assistant.helpers.audio import get_player_filter_params
20from music_assistant.helpers.ffmpeg import FFMpeg
21from music_assistant.providers.snapcast.socket_server import SnapcastSocketServer
22
23from .constants import (
24 CONTROL_SOCKET_PATH_TEMPLATE,
25 DEFAULT_SNAPCAST_FORMAT,
26)
27
28if TYPE_CHECKING:
29 from music_assistant_models.player import PlayerMedia
30
31 from .provider import SnapCastProvider
32 from .snap_cntrl_proto import SnapstreamProto
33
34
35class SnapcastMAStream:
36 """A Music Assistant-managed Snapcast stream.
37
38 The stream lifecycle is:
39 - setup: ensure required server resources exist (Snapcast source, optional socket server)
40 - start_stream: start the FFmpeg streaming task
41 - request_stop_stream / wait_for_stopped: stop streaming and await termination
42 - destroy: stop streaming, remove Snapcast source, and stop ancillary services
43
44 If `cntrl_queue_id` is provided, a Unix socket server is started to allow a Snapcast
45 control script to communicate with Music Assistant.
46 """
47
48 def __init__(
49 self,
50 provider: SnapCastProvider,
51 media: PlayerMedia,
52 stream_name: str,
53 source_id: str | None = None,
54 filter_settings_owner: str | None = None,
55 use_cntrl_script: bool = False,
56 destroy_on_stop: bool = False,
57 ) -> None:
58 """Initialize the stream.
59
60 Args:
61 provider: The Snapcast provider instance.
62 media: The media item to stream.
63 stream_name: Name used to register the stream on the Snapcast server.
64 cntrl_queue_id: If set, enables the control socket server used by the control script.
65 filter_settings_owner: Player/entity id used to fetch DSP/filter parameters.
66 destroy_on_stop: If true, delete this MA stream once streaming stops.
67 """
68 self.media = media
69 self.stream_name = stream_name
70 self.snap_stream: SnapstreamProto | None = None
71
72 self._provider = provider
73 self._logger = provider.logger
74 self._mass = provider.mass
75 self._source_id = source_id
76 self._use_cntrl_script = use_cntrl_script
77 self._cntrl_queue_id = source_id if use_cntrl_script else None
78 self._filter_settings_owner = filter_settings_owner
79 self._destroy_on_stop = destroy_on_stop
80
81 self._lifecycle_lock = asyncio.Lock()
82 self._destroyed = False
83 self._setup_done = False
84 self._is_streaming = False
85 self._restart_requested: bool = False
86 self._stop_requested: bool = False
87
88 self._socket_server: SnapcastSocketServer | None = None
89 self._socket_path: str | None = None
90 self._streamer_task: asyncio.Task[None] | None = None
91 self._stop_streamer_evt = asyncio.Event()
92 self._streamer_started_evt = asyncio.Event()
93 self._stop_timer: asyncio.Handle | None = None
94 self._stop_timer_started_at: float | None = None
95 self._filter_settings: list[str] | None = None
96
97 @property
98 def source_id(self) -> str | None:
99 """Return the source id this stream was created for."""
100 return self._source_id
101
102 @property
103 def stream_id(self) -> str | None:
104 """Return the Snapcast stream identifier, if registered."""
105 if self.snap_stream:
106 return self.snap_stream.identifier
107 return None
108
109 @property
110 def is_streaming(self) -> bool:
111 """Return True if the FFmpeg streaming task is currently running."""
112 return self._is_streaming
113
114 async def setup(self) -> None:
115 """Prepare the Snapcast stream resources.
116
117 Ensures a Snapcast source exists on the server. If `cntrl_queue_id` is set,
118 also starts the Unix socket server used by the control script.
119 """
120 async with self._lifecycle_lock:
121 if self._destroyed:
122 raise RuntimeError("Session is destroyed")
123 if self._setup_done:
124 return
125 if self._provider._snapserver is None:
126 raise RuntimeError("Snapserver needs to be setup first")
127
128 if self._cntrl_queue_id:
129 await self._start_socket_server()
130
131 await self._register_tcp_server_source()
132 self._setup_done = True
133
134 async def destroy(self) -> None:
135 """Stop streaming and tear down all resources.
136
137 This stops the streamer task (if running), removes the Snapcast source,
138 and stops the optional control socket server.
139 """
140 async with self._lifecycle_lock:
141 if self._destroyed:
142 return
143 self._destroyed = True
144
145 self.request_stop_stream()
146 await self.wait_for_stopped()
147 await self._remove_snap_source()
148 await self._stop_socket_server()
149
150 async def start_stream(self, allow_restart: bool = False) -> None:
151 """Start streaming the configured media to the Snapcast source.
152
153 Raises:
154 RuntimeError: If the streamer task is already running.
155 """
156 await self.setup()
157 async with self._lifecycle_lock:
158 if self._streamer_task and not self._streamer_task.done():
159 if not allow_restart:
160 raise RuntimeError("streamer already running")
161 self._restart_if_running()
162 return
163
164 self._stop_requested = False
165 self._restart_requested = False
166 self._stop_streamer_evt.clear()
167 self._streamer_started_evt.clear()
168 self._streamer_task = self._mass.create_task(self._streamer_task_impl())
169 self._streamer_task.add_done_callback(self._on_streamer_done)
170
171 async def wait_for_started(self, timeout_sec: float | None = None) -> None:
172 """Wait until the streamer task signals it has started.
173
174 Args:
175 timeout_sec: Optional timeout in seconds.
176 """
177 try:
178 await asyncio.wait_for(self._streamer_started_evt.wait(), timeout_sec)
179 except TimeoutError:
180 self._logger.warning(
181 "Timeout waiting for stream %s to start; Canceling...",
182 self.stream_name,
183 )
184
185 def update_media(self, media: PlayerMedia) -> None:
186 """Update the media to play and restart the stream if required."""
187 if media != self.media:
188 self.media = media
189 self._restart_if_running()
190
191 def update_filter_settings(self, from_player: str | None = None) -> None:
192 """Update the filter setting."""
193 take_from = from_player or self._filter_settings_owner
194 if not take_from:
195 raise RuntimeError("No player provided to read filter settings from.")
196 new_settings = get_player_filter_params(
197 self._mass,
198 take_from,
199 DEFAULT_SNAPCAST_FORMAT,
200 DEFAULT_SNAPCAST_FORMAT,
201 )
202 if from_player:
203 self._filter_settings_owner = from_player
204 if new_settings != self._filter_settings:
205 self._restart_if_running()
206
207 def request_stop_stream(self) -> None:
208 """Request the streamer task to stop.
209
210 This is cooperative: the streamer task will stop when it observes the stop event.
211 Any pending inactivity stop timer is canceled.
212 """
213 self._stop_requested = True
214 self._restart_requested = False # explicit stop cancels any pending restart
215 self._stop_streamer_evt.set()
216
217 self._stop_timer_started_at = None
218 if self._stop_timer:
219 self._stop_timer.cancel()
220
221 def set_in_use(self, in_use: bool) -> None:
222 """Mark the stream as in-use or idle.
223
224 When marked idle, a delayed stop is scheduled. When marked in-use, any pending
225 delayed stop is canceled.
226 """
227 if in_use:
228 self._stop_timer_started_at = None
229 if self._stop_timer:
230 self._stop_timer.cancel()
231 elif self._stop_timer_started_at is None:
232 self._stop_timer_started_at = self._mass.loop.time()
233 self._stop_timer = self._mass.loop.call_later(60.0, self.request_stop_stream)
234
235 async def wait_for_stopped(self, timeout_sec: float | None = None) -> None:
236 """Wait for the streamer task to finish.
237
238 If the task does not finish within the timeout, it is canceled and awaited.
239
240 Args:
241 timeout_sec: Optional timeout in seconds.
242 """
243 curr_task = self._streamer_task
244 if not curr_task:
245 return
246 try:
247 await asyncio.wait_for(curr_task, timeout_sec)
248 except asyncio.CancelledError:
249 self._logger.warning("Streamer task got canceled")
250 except TimeoutError:
251 self._logger.warning(
252 "Timeout waiting for stream %s to finish; Canceling...",
253 self.stream_name,
254 )
255 curr_task.cancel()
256 await asyncio.gather(curr_task, return_exceptions=True)
257
258 async def _streamer_task_impl(self) -> None:
259 """Streamer task implementation.
260
261 Runs FFmpeg to push audio to the Snapcast TCP source until FFmpeg exits or a stop
262 request is received. After exit, waits briefly for the Snapcast stream to report
263 an idle state.
264 """
265 stream_path = self._snap_get_stream_path()
266 if stream_path is None:
267 raise RuntimeError("The path to stream to is not set")
268
269 self._logger.debug("Start streaming to %s", stream_path)
270 self._stop_streamer_evt.clear()
271 self._streamer_started_evt.clear()
272 if self._filter_settings_owner:
273 self._filter_settings = get_player_filter_params(
274 self._mass,
275 self._filter_settings_owner,
276 DEFAULT_SNAPCAST_FORMAT,
277 DEFAULT_SNAPCAST_FORMAT,
278 )
279 audio_source = self._mass.streams.get_stream(self.media, DEFAULT_SNAPCAST_FORMAT)
280 try:
281 async with FFMpeg(
282 audio_input=audio_source,
283 input_format=DEFAULT_SNAPCAST_FORMAT,
284 output_format=DEFAULT_SNAPCAST_FORMAT,
285 filter_params=self._filter_settings or [],
286 audio_output=stream_path,
287 extra_input_args=["-y", "-re"],
288 ) as ffmpeg_proc:
289 wait_ffmpeg = self._mass.create_task(ffmpeg_proc.wait())
290 wait_stop = self._mass.create_task(self._stop_streamer_evt.wait())
291 self._streamer_started_evt.set()
292 self._is_streaming = True
293
294 done, pending = await asyncio.wait(
295 {wait_ffmpeg, wait_stop},
296 return_when=asyncio.FIRST_COMPLETED,
297 )
298
299 if wait_stop in done and wait_ffmpeg not in done:
300 self._logger.debug("Stopping stream %s requested.", self.stream_name)
301 wait_ffmpeg.cancel()
302 await asyncio.gather(wait_ffmpeg, return_exceptions=True)
303 return
304
305 await wait_ffmpeg
306 for t in pending:
307 t.cancel()
308 await asyncio.gather(*pending, return_exceptions=True)
309
310 finally:
311 self._is_streaming = False
312 self._logger.debug("Finished streaming to %s", stream_path)
313 # Wait a bit for snap stream to become idle
314 try:
315
316 async def wait_until_idle() -> None:
317 while True:
318 stream_is_idle = False
319 with suppress(KeyError):
320 snap_stream = self._provider._snapserver.stream(self.stream_name)
321 stream_is_idle = snap_stream.status == "idle"
322 if self._mass.closing or stream_is_idle:
323 break
324 await asyncio.sleep(0.25)
325
326 await asyncio.wait_for(wait_until_idle(), timeout=10.0)
327
328 except TimeoutError:
329 self._logger.warning(
330 "Timeout waiting for stream %s to become idle",
331 self.stream_name,
332 )
333
334 def _on_streamer_done(self, t: asyncio.Task[None]) -> None:
335 """Handle streamer task completion and optional cleanup."""
336 restart = False
337 try:
338 t.result()
339 except asyncio.CancelledError:
340 self._logger.debug("Streamer task cancelled: %s", self.stream_name)
341 except Exception:
342 self._logger.exception("Streamer task failed")
343 finally:
344 restart = self._restart_requested and not self._destroyed
345
346 if self._streamer_task is t:
347 self._streamer_task = None
348
349 # reset per-run state
350 self._restart_requested = False
351 self._stop_requested = False
352 self._stop_streamer_evt.clear()
353 self._streamer_started_evt.clear()
354
355 if restart:
356 self._mass.create_task(self._restart_stream_locked())
357 elif self._destroy_on_stop:
358 self._mass.create_task(self._provider.delete_ma_stream(self.stream_name))
359
360 def _restart_if_running(self) -> None:
361 """Request a running stream to restart."""
362 t = self._streamer_task
363 if not t or t.done():
364 return
365
366 if self._stop_requested or self._stop_streamer_evt.is_set():
367 return
368
369 self._restart_requested = True
370 self._stop_requested = True
371 self._stop_streamer_evt.set()
372
373 self._stop_timer_started_at = None
374 if self._stop_timer:
375 self._stop_timer.cancel()
376
377 async def _restart_stream_locked(self) -> None:
378 """Restart the streamer under the lifecycle lock."""
379 async with self._lifecycle_lock:
380 if self._destroyed:
381 return
382 if self._streamer_task and not self._streamer_task.done():
383 return
384
385 # reset state and start a fresh run
386 self._stop_requested = False
387 self._restart_requested = False
388 self._stop_streamer_evt.clear()
389 self._streamer_started_evt.clear()
390
391 self._streamer_task = self._mass.create_task(self._streamer_task_impl())
392 self._streamer_task.add_done_callback(self._on_streamer_done)
393
394 async def _register_tcp_server_source(self) -> None:
395 """Create a Snapcast TCP source for this stream (or reuse an existing one)."""
396 # prefer to reuse existing stream if possible
397 if self.snap_stream:
398 return
399
400 # The control script is used only for music streams in the builtin server
401 extra_args = ""
402 if (cntrl_queue_id := self._cntrl_queue_id) is not None:
403 # Create socket server for control script communication
404 socket_path = self._socket_path
405 if socket_path is None:
406 raise RuntimeError("socket_path needs to be set if cntrl_queue_id is set")
407 extra_args = (
408 f"&controlscript={urllib.parse.quote_plus('control.py')}"
409 f"&controlscriptparams=--queueid={urllib.parse.quote_plus(cntrl_queue_id)}%20"
410 f"--socket={urllib.parse.quote_plus(socket_path)}%20"
411 f"--streamserver-ip={self._mass.streams.publish_ip}%20"
412 f"--streamserver-port={self._mass.streams.publish_port}"
413 )
414
415 attempts = 50
416 while attempts:
417 attempts -= 1
418 # pick a random port
419 port = random.randint(4953, 4953 + 200)
420 ## Do we need to add a time out here?
421 result = await self._provider._snapserver.stream_add_stream(
422 # NOTE: setting the sampleformat to something else
423 # (like 24 bits bit depth) does not seem to work at all!
424 f"tcp://0.0.0.0:{port}?sampleformat=48000:16:2"
425 f"&idle_threshold={self._provider._snapcast_stream_idle_threshold}"
426 f"{extra_args}&name={self.stream_name}"
427 )
428 if result is None or "id" not in result:
429 # if the port is already taken, the result will be an error
430 self._logger.warning(result)
431 continue
432 ## Do we need to synchronize the snapserver repr first?
433 self.snap_stream = self._provider._snapserver.stream(result["id"])
434 self.snap_stream.set_callback(self._snap_on_stream_update)
435 return
436
437 if self._socket_server:
438 await self._stop_socket_server()
439
440 msg = "Unable to create stream - No free port found?"
441 raise RuntimeError(msg)
442
443 async def _remove_snap_source(self) -> None:
444 """Remove the Snapcast source created for this stream and detach groups."""
445 if self._mass.closing or self.snap_stream is None:
446 return
447
448 for snap_group in self._provider._snapserver.groups:
449 if snap_group.stream != self.snap_stream.identifier:
450 continue
451 self._logger.debug(f"Set stream of group {snap_group.name} to default.")
452 await snap_group.set_stream("default")
453
454 with suppress(KeyError, AttributeError):
455 snap_stream = self._provider._snapserver.stream(self.stream_name)
456 await self._provider._snapserver.stream_remove_stream(snap_stream.identifier)
457
458 if self._socket_server:
459 await self._stop_socket_server()
460 self._snap_on_stream_update()
461
462 return
463
464 def _snap_get_stream_path(self) -> str | None:
465 """Return the Snapcast TCP URI to stream to."""
466 if self.snap_stream is None:
467 return None
468
469 uri = self.snap_stream._stream.get("uri", {})
470 uri_host = uri.get("host", "")
471 stream_path = self.snap_stream.path or f"tcp://{uri_host}"
472 return stream_path.replace("0.0.0.0", self._provider._snapcast_server_host)
473
474 def _snap_on_stream_update(self, stream: SnapstreamProto | None = None) -> None:
475 """Handle Snapcast stream updates and trigger group member refresh."""
476 if self.snap_stream is None:
477 return
478
479 for snap_group in self._provider._snapserver.groups:
480 if snap_group.stream != self.snap_stream.identifier:
481 continue
482 self._provider.poke_group_members(snap_group)
483
484 async def _start_socket_server(self) -> str:
485 """Get or create a socket server for the given queue.
486
487 :return: The path to the Unix socket.
488 """
489 if self._socket_server:
490 return self._socket_server.socket_path
491
492 if self._cntrl_queue_id is None:
493 raise RuntimeError("Socket server require _cntrl_queue_id to be set")
494
495 socket_path = CONTROL_SOCKET_PATH_TEMPLATE.format(queue_id=self._cntrl_queue_id)
496 socket_server = SnapcastSocketServer(
497 mass=self._mass,
498 queue_id=self._cntrl_queue_id,
499 socket_path=socket_path,
500 streamserver_ip=str(self._mass.streams.publish_ip),
501 streamserver_port=cast("int", self._mass.streams.publish_port),
502 )
503 await socket_server.start()
504 self._socket_server = socket_server
505 self._socket_path = socket_path
506 self._logger.debug(
507 "Created socket server for queue %s at %s", self._cntrl_queue_id, socket_path
508 )
509 return socket_path
510
511 async def _stop_socket_server(self) -> None:
512 """Stop and remove the socket server for the given queue."""
513 if not self._socket_server:
514 return
515
516 await self._socket_server.stop()
517 self._socket_server = None
518 self._logger.debug("Stopped socket server for queue %s", self._cntrl_queue_id)
519