/
/
/
1"""Music Assistant Snapcast source stream.
2
3This module implements a Music Assistant-managed Snapcast stream that is exposed to the
4Snapcast server as a TCP source. The stream is produced by running an FFmpeg pipeline
5which pulls audio from Music Assistant and pushes it to the Snapcast source URI.
6
7Optionally, a Unix socket server can be started to provide a control channel for a
8Snapcast control script (used by the built-in Snapcast server integration).
9"""
10
11from __future__ import annotations
12
13import asyncio
14import random
15import time
16import urllib.parse
17from contextlib import suppress
18from typing import TYPE_CHECKING, cast
19
20from music_assistant.helpers.audio import get_player_filter_params
21from music_assistant.helpers.ffmpeg import FFMpeg
22from music_assistant.providers.snapcast.socket_server import SnapcastSocketServer
23
24from .constants import (
25 CONTROL_SOCKET_PATH_TEMPLATE,
26 DEFAULT_SNAPCAST_FORMAT,
27)
28
29if TYPE_CHECKING:
30 from music_assistant_models.player import PlayerMedia
31
32 from .provider import SnapCastProvider
33 from .snap_cntrl_proto import SnapstreamProto
34
35
36class SnapcastMAStream:
37 """A Music Assistant-managed Snapcast stream.
38
39 The stream lifecycle is:
40 - setup: ensure required server resources exist (Snapcast source, optional socket server)
41 - start_stream: start the FFmpeg streaming task
42 - request_stop_stream / wait_for_stopped: stop streaming and await termination
43 - destroy: stop streaming, remove Snapcast source, and stop ancillary services
44
45 If `cntrl_queue_id` is provided, a Unix socket server is started to allow a Snapcast
46 control script to communicate with Music Assistant.
47 """
48
49 def __init__(
50 self,
51 provider: SnapCastProvider,
52 media: PlayerMedia,
53 stream_name: str,
54 source_id: str | None = None,
55 filter_settings_owner: str | None = None,
56 use_cntrl_script: bool = False,
57 destroy_on_stop: bool = False,
58 ) -> None:
59 """Initialize the stream.
60
61 Args:
62 provider: The Snapcast provider instance.
63 media: The media item to stream.
64 stream_name: Name used to register the stream on the Snapcast server.
65 cntrl_queue_id: If set, enables the control socket server used by the control script.
66 filter_settings_owner: Player/entity id used to fetch DSP/filter parameters.
67 destroy_on_stop: If true, delete this MA stream once streaming stops.
68 """
69 self.media = media
70 self.stream_name = stream_name
71 self.snap_stream: SnapstreamProto | None = None
72
73 self._provider = provider
74 self._logger = provider.logger
75 self._mass = provider.mass
76 self._source_id = source_id
77 self._use_cntrl_script = use_cntrl_script
78 self._cntrl_queue_id = source_id if use_cntrl_script else None
79 self._filter_settings_owner = filter_settings_owner
80 self._destroy_on_stop = destroy_on_stop
81
82 self._lifecycle_lock = asyncio.Lock()
83 self._destroyed = False
84 self._setup_done = False
85 self._is_streaming = False
86 self._restart_requested: bool = False
87 self._stop_requested: bool = False
88 self._streaming_started_at: float | None = None
89
90 self._socket_server: SnapcastSocketServer | None = None
91 self._socket_path: str | None = None
92 self._streamer_task: asyncio.Task[None] | None = None
93 self._stop_streamer_evt = asyncio.Event()
94 self._streamer_started_evt = asyncio.Event()
95 self._stop_timer: asyncio.Handle | None = None
96 self._stop_timer_started_at: float | None = None
97 self._filter_settings: list[str] | None = None
98
99 @property
100 def source_id(self) -> str | None:
101 """Return the source id this stream was created for."""
102 return self._source_id
103
104 @property
105 def stream_id(self) -> str | None:
106 """Return the Snapcast stream identifier, if registered."""
107 if self.snap_stream:
108 return self.snap_stream.identifier
109 return None
110
111 @property
112 def is_streaming(self) -> bool:
113 """Return True if the FFmpeg streaming task is currently running."""
114 return self._is_streaming
115
116 @property
117 def playback_started_at(self) -> float | None:
118 """Return when the playback started at the clients.
119
120 return The (UTC) timestamp when the playback was started on the client
121 or None if not started yet or not streaming.
122 """
123 if self._streaming_started_at is None:
124 return None
125 if self._provider._use_builtin_server:
126 buffer_ms = self._provider._snapcast_server_buffer_size
127 if time.time() - self._streaming_started_at < buffer_ms / 1000.0:
128 return None
129 return self._streaming_started_at + buffer_ms / 1000.0
130 return self._streaming_started_at
131
132 async def setup(self) -> None:
133 """Prepare the Snapcast stream resources.
134
135 Ensures a Snapcast source exists on the server. If `cntrl_queue_id` is set,
136 also starts the Unix socket server used by the control script.
137 """
138 async with self._lifecycle_lock:
139 if self._destroyed:
140 raise RuntimeError("Session is destroyed")
141 if self._setup_done:
142 return
143 if self._provider._snapserver is None:
144 raise RuntimeError("Snapserver needs to be setup first")
145
146 if self._cntrl_queue_id:
147 await self._start_socket_server()
148
149 await self._register_tcp_server_source()
150 self._setup_done = True
151
152 async def destroy(self) -> None:
153 """Stop streaming and tear down all resources.
154
155 This stops the streamer task (if running), removes the Snapcast source,
156 and stops the optional control socket server.
157 """
158 async with self._lifecycle_lock:
159 if self._destroyed:
160 return
161 self._destroyed = True
162
163 self.request_stop_stream()
164 await self.wait_for_stopped()
165 await self._remove_snap_source()
166 await self._stop_socket_server()
167
168 async def start_stream(self, allow_restart: bool = False) -> None:
169 """Start streaming the configured media to the Snapcast source.
170
171 Raises:
172 RuntimeError: If the streamer task is already running.
173 """
174 await self.setup()
175 async with self._lifecycle_lock:
176 if self._streamer_task and not self._streamer_task.done():
177 if not allow_restart:
178 raise RuntimeError("streamer already running")
179 self._restart_if_running()
180 return
181
182 self._stop_requested = False
183 self._restart_requested = False
184 self._stop_streamer_evt.clear()
185 self._streamer_started_evt.clear()
186 self._streamer_task = self._mass.create_task(self._streamer_task_impl())
187 self._streamer_task.add_done_callback(self._on_streamer_done)
188
189 async def wait_for_started(self, timeout_sec: float | None = None) -> None:
190 """Wait until the streamer task signals it has started.
191
192 Args:
193 timeout_sec: Optional timeout in seconds.
194 """
195 try:
196 await asyncio.wait_for(self._streamer_started_evt.wait(), timeout_sec)
197 except TimeoutError:
198 self._logger.warning(
199 "Timeout waiting for stream %s to start; Canceling...",
200 self.stream_name,
201 )
202
203 def update_media(self, media: PlayerMedia) -> None:
204 """Update the media to play and restart the stream if required."""
205 if media != self.media:
206 self.media = media
207 self._restart_if_running()
208
209 def update_filter_settings(self, from_player: str | None = None) -> None:
210 """Update the filter setting."""
211 take_from = from_player or self._filter_settings_owner
212 if not take_from:
213 raise RuntimeError("No player provided to read filter settings from.")
214 new_settings = get_player_filter_params(
215 self._mass,
216 take_from,
217 DEFAULT_SNAPCAST_FORMAT,
218 DEFAULT_SNAPCAST_FORMAT,
219 )
220 if from_player:
221 self._filter_settings_owner = from_player
222 if new_settings != self._filter_settings:
223 self._restart_if_running()
224
225 def request_stop_stream(self) -> None:
226 """Request the streamer task to stop.
227
228 This is cooperative: the streamer task will stop when it observes the stop event.
229 Any pending inactivity stop timer is canceled.
230 """
231 self._stop_requested = True
232 self._restart_requested = False # explicit stop cancels any pending restart
233 self._stop_streamer_evt.set()
234
235 self._stop_timer_started_at = None
236 if self._stop_timer:
237 self._stop_timer.cancel()
238
239 def set_in_use(self, in_use: bool) -> None:
240 """Mark the stream as in-use or idle.
241
242 When marked idle, a delayed stop is scheduled. When marked in-use, any pending
243 delayed stop is canceled.
244 """
245 if in_use:
246 self._stop_timer_started_at = None
247 if self._stop_timer:
248 self._stop_timer.cancel()
249 elif self._stop_timer_started_at is None:
250 self._stop_timer_started_at = self._mass.loop.time()
251 self._stop_timer = self._mass.loop.call_later(60.0, self.request_stop_stream)
252
253 async def wait_for_stopped(self, timeout_sec: float | None = None) -> None:
254 """Wait for the streamer task to finish.
255
256 If the task does not finish within the timeout, it is canceled and awaited.
257
258 Args:
259 timeout_sec: Optional timeout in seconds.
260 """
261 curr_task = self._streamer_task
262 if not curr_task:
263 return
264 try:
265 await asyncio.wait_for(curr_task, timeout_sec)
266 except asyncio.CancelledError:
267 self._logger.warning("Streamer task got canceled")
268 except TimeoutError:
269 self._logger.warning(
270 "Timeout waiting for stream %s to finish; Canceling...",
271 self.stream_name,
272 )
273 curr_task.cancel()
274 await asyncio.gather(curr_task, return_exceptions=True)
275
276 async def _streamer_task_impl(self) -> None:
277 """Streamer task implementation.
278
279 Runs FFmpeg to push audio to the Snapcast TCP source until FFmpeg exits or a stop
280 request is received. After exit, waits briefly for the Snapcast stream to report
281 an idle state.
282 """
283 stream_path = self._snap_get_stream_path()
284 if stream_path is None:
285 raise RuntimeError("The path to stream to is not set")
286
287 self._logger.debug("Start streaming to %s", stream_path)
288 self._stop_streamer_evt.clear()
289 self._streamer_started_evt.clear()
290 if self._filter_settings_owner:
291 self._filter_settings = get_player_filter_params(
292 self._mass,
293 self._filter_settings_owner,
294 DEFAULT_SNAPCAST_FORMAT,
295 DEFAULT_SNAPCAST_FORMAT,
296 )
297 audio_source = self._mass.streams.get_stream(self.media, DEFAULT_SNAPCAST_FORMAT)
298 try:
299 async with FFMpeg(
300 audio_input=audio_source,
301 input_format=DEFAULT_SNAPCAST_FORMAT,
302 output_format=DEFAULT_SNAPCAST_FORMAT,
303 filter_params=self._filter_settings or [],
304 audio_output=stream_path,
305 extra_input_args=["-y", "-re"],
306 ) as ffmpeg_proc:
307 wait_ffmpeg = self._mass.create_task(ffmpeg_proc.wait())
308 wait_stop = self._mass.create_task(self._stop_streamer_evt.wait())
309 self._streaming_started_at = time.time()
310 self._streamer_started_evt.set()
311 self._is_streaming = True
312
313 done, pending = await asyncio.wait(
314 {wait_ffmpeg, wait_stop},
315 return_when=asyncio.FIRST_COMPLETED,
316 )
317
318 if wait_stop in done and wait_ffmpeg not in done:
319 self._logger.debug("Stopping stream %s requested.", self.stream_name)
320 wait_ffmpeg.cancel()
321 await asyncio.gather(wait_ffmpeg, return_exceptions=True)
322 return
323
324 await wait_ffmpeg
325 for t in pending:
326 t.cancel()
327 await asyncio.gather(*pending, return_exceptions=True)
328 except asyncio.CancelledError:
329 self._logger.debug("Snapcast stream %s cancelled", self.stream_name)
330 raise
331 except Exception as err:
332 self._logger.error("Snapcast stream %s error: %s", self.stream_name, err, exc_info=err)
333 raise
334 finally:
335 self._is_streaming = False
336 self._logger.debug("Finished streaming to %s", stream_path)
337 await self._wait_stream_idle()
338
339 async def _wait_stream_idle(self) -> None:
340 """Wait for the Snapcast stream to become idle after streaming ends."""
341 try:
342
343 async def wait_until_idle() -> None:
344 while True:
345 stream_is_idle = False
346 with suppress(KeyError):
347 snap_stream = self._provider._snapserver.stream(self.stream_name)
348 stream_is_idle = snap_stream.status == "idle"
349 if self._mass.closing or stream_is_idle:
350 break
351 await asyncio.sleep(0.25)
352
353 await asyncio.wait_for(wait_until_idle(), timeout=10.0)
354 except TimeoutError:
355 self._logger.warning(
356 "Timeout waiting for stream %s to become idle",
357 self.stream_name,
358 )
359 finally:
360 self._streaming_started_at = None
361
362 def _on_streamer_done(self, t: asyncio.Task[None]) -> None:
363 """Handle streamer task completion and optional cleanup."""
364 restart = False
365 try:
366 t.result()
367 except asyncio.CancelledError:
368 self._logger.debug("Streamer task cancelled: %s", self.stream_name)
369 except Exception:
370 self._logger.exception("Streamer task failed")
371 finally:
372 restart = self._restart_requested and not self._destroyed
373
374 if self._streamer_task is t:
375 self._streamer_task = None
376
377 # reset per-run state
378 self._restart_requested = False
379 self._stop_requested = False
380 self._stop_streamer_evt.clear()
381 self._streamer_started_evt.clear()
382
383 if restart:
384 self._mass.create_task(self._restart_stream_locked())
385 elif self._destroy_on_stop:
386 self._mass.create_task(self._provider.delete_ma_stream(self.stream_name))
387
388 def _restart_if_running(self) -> None:
389 """Request a running stream to restart."""
390 t = self._streamer_task
391 if not t or t.done():
392 return
393
394 if self._stop_requested or self._stop_streamer_evt.is_set():
395 return
396
397 self._restart_requested = True
398 self._stop_requested = True
399 self._stop_streamer_evt.set()
400
401 self._stop_timer_started_at = None
402 if self._stop_timer:
403 self._stop_timer.cancel()
404
405 async def _restart_stream_locked(self) -> None:
406 """Restart the streamer under the lifecycle lock."""
407 async with self._lifecycle_lock:
408 if self._destroyed:
409 return
410 if self._streamer_task and not self._streamer_task.done():
411 return
412
413 # reset state and start a fresh run
414 self._stop_requested = False
415 self._restart_requested = False
416 self._stop_streamer_evt.clear()
417 self._streamer_started_evt.clear()
418
419 self._streamer_task = self._mass.create_task(self._streamer_task_impl())
420 self._streamer_task.add_done_callback(self._on_streamer_done)
421
422 async def _register_tcp_server_source(self) -> None:
423 """Create a Snapcast TCP source for this stream (or reuse an existing one)."""
424 # prefer to reuse existing stream if possible
425 if self.snap_stream:
426 return
427
428 # The control script is used only for music streams in the builtin server
429 extra_args = ""
430 if (cntrl_queue_id := self._cntrl_queue_id) is not None:
431 # Create socket server for control script communication
432 socket_path = self._socket_path
433 if socket_path is None:
434 raise RuntimeError("socket_path needs to be set if cntrl_queue_id is set")
435 extra_args = (
436 f"&controlscript={urllib.parse.quote_plus('control.py')}"
437 f"&controlscriptparams=--queueid={urllib.parse.quote_plus(cntrl_queue_id)}%20"
438 f"--socket={urllib.parse.quote_plus(socket_path)}%20"
439 f"--streamserver-ip={self._mass.streams.publish_ip}%20"
440 f"--streamserver-port={self._mass.streams.publish_port}"
441 )
442
443 attempts = 50
444 while attempts:
445 attempts -= 1
446 # pick a random port
447 port = random.randint(4953, 4953 + 200)
448 ## Do we need to add a time out here?
449 result = await self._provider._snapserver.stream_add_stream(
450 # NOTE: setting the sampleformat to something else
451 # (like 24 bits bit depth) does not seem to work at all!
452 f"tcp://0.0.0.0:{port}?sampleformat=48000:16:2"
453 f"&idle_threshold={self._provider._snapcast_stream_idle_threshold}"
454 f"{extra_args}&name={self.stream_name}"
455 )
456 if result is None or "id" not in result:
457 # if the port is already taken, the result will be an error
458 self._logger.warning(result)
459 continue
460 ## Do we need to synchronize the snapserver repr first?
461 self.snap_stream = self._provider._snapserver.stream(result["id"])
462 self.snap_stream.set_callback(self._snap_on_stream_update)
463 return
464
465 if self._socket_server:
466 await self._stop_socket_server()
467
468 msg = "Unable to create stream - No free port found?"
469 raise RuntimeError(msg)
470
471 async def _remove_snap_source(self) -> None:
472 """Remove the Snapcast source created for this stream and detach groups."""
473 if self._mass.closing or self.snap_stream is None:
474 return
475
476 for snap_group in self._provider._snapserver.groups:
477 if snap_group.stream != self.snap_stream.identifier:
478 continue
479 self._logger.debug(f"Set stream of group {snap_group.name} to default.")
480 await snap_group.set_stream("default")
481
482 with suppress(KeyError, AttributeError):
483 snap_stream = self._provider._snapserver.stream(self.stream_name)
484 await self._provider._snapserver.stream_remove_stream(snap_stream.identifier)
485
486 if self._socket_server:
487 await self._stop_socket_server()
488 self._snap_on_stream_update()
489
490 return
491
492 def _snap_get_stream_path(self) -> str | None:
493 """Return the Snapcast TCP URI to stream to."""
494 if self.snap_stream is None:
495 return None
496
497 uri = self.snap_stream._stream.get("uri", {})
498 uri_host = uri.get("host", "")
499 stream_path = self.snap_stream.path or f"tcp://{uri_host}"
500 return stream_path.replace("0.0.0.0", self._provider._snapcast_server_host)
501
502 def _snap_on_stream_update(self, stream: SnapstreamProto | None = None) -> None:
503 """Handle Snapcast stream updates and trigger group member refresh."""
504 if self.snap_stream is None:
505 return
506
507 for snap_group in self._provider._snapserver.groups:
508 if snap_group.stream != self.snap_stream.identifier:
509 continue
510 self._provider.poke_group_members(snap_group)
511
512 async def _start_socket_server(self) -> str:
513 """Get or create a socket server for the given queue.
514
515 :return: The path to the Unix socket.
516 """
517 if self._socket_server:
518 return self._socket_server.socket_path
519
520 if self._cntrl_queue_id is None:
521 raise RuntimeError("Socket server require _cntrl_queue_id to be set")
522
523 socket_path = CONTROL_SOCKET_PATH_TEMPLATE.format(queue_id=self._cntrl_queue_id)
524 socket_server = SnapcastSocketServer(
525 mass=self._mass,
526 queue_id=self._cntrl_queue_id,
527 socket_path=socket_path,
528 streamserver_ip=str(self._mass.streams.publish_ip),
529 streamserver_port=cast("int", self._mass.streams.publish_port),
530 )
531 await socket_server.start()
532 self._socket_server = socket_server
533 self._socket_path = socket_path
534 self._logger.debug(
535 "Created socket server for queue %s at %s", self._cntrl_queue_id, socket_path
536 )
537 return socket_path
538
539 async def _stop_socket_server(self) -> None:
540 """Stop and remove the socket server for the given queue."""
541 if not self._socket_server:
542 return
543
544 await self._socket_server.stop()
545 self._socket_server = None
546 self._logger.debug("Stopped socket server for queue %s", self._cntrl_queue_id)
547