/
/
/
1"""Music Assistant Snapcast source stream.
2
3This module implements a Music Assistant-managed Snapcast stream that is exposed to the
4Snapcast server as a TCP source. The stream is produced by running an FFmpeg pipeline
5which pulls audio from Music Assistant and pushes it to the Snapcast source URI.
6
7Optionally, a Unix socket server can be started to provide a control channel for a
8Snapcast control script (used by the built-in Snapcast server integration).
9"""
10
11from __future__ import annotations
12
13import asyncio
14import random
15import time
16import urllib.parse
17from contextlib import suppress
18from typing import TYPE_CHECKING, cast
19
20from music_assistant.helpers.audio import get_player_filter_params
21from music_assistant.helpers.ffmpeg import FFMpeg
22from music_assistant.providers.snapcast.socket_server import SnapcastSocketServer
23
24from .constants import (
25 CONTROL_SOCKET_PATH_TEMPLATE,
26 DEFAULT_SNAPCAST_FORMAT,
27)
28
29if TYPE_CHECKING:
30 from music_assistant_models.player import PlayerMedia
31
32 from .provider import SnapCastProvider
33 from .snap_cntrl_proto import SnapstreamProto
34
35
class SnapcastMAStream:
    """A Music Assistant-managed Snapcast stream.

    The stream lifecycle is:
    - setup: ensure required server resources exist (Snapcast source, optional socket server)
    - start_stream: start the FFmpeg streaming task
    - request_stop_stream / wait_for_stopped: stop streaming and await termination
    - destroy: stop streaming, remove Snapcast source, and stop ancillary services

    If `use_cntrl_script` is set (and a `source_id` is given), a Unix socket server is
    started to allow a Snapcast control script to communicate with Music Assistant.
    """

    def __init__(
        self,
        provider: SnapCastProvider,
        media: PlayerMedia,
        stream_name: str,
        source_id: str | None = None,
        filter_settings_owner: str | None = None,
        use_cntrl_script: bool = False,
        destroy_on_stop: bool = False,
    ) -> None:
        """Initialize the stream.

        Args:
            provider: The Snapcast provider instance.
            media: The media item to stream.
            stream_name: Name used to register the stream on the Snapcast server.
            source_id: Id of the source this stream is created for. When
                `use_cntrl_script` is true it is also used as the queue id handed to
                the control script and the control socket server.
            filter_settings_owner: Player/entity id used to fetch DSP/filter parameters.
            use_cntrl_script: If true, enables the control socket server used by the
                control script.
            destroy_on_stop: If true, delete this MA stream once streaming stops.
        """
        self.media = media
        self.stream_name = stream_name
        self.snap_stream: SnapstreamProto | None = None

        self._provider = provider
        self._logger = provider.logger
        self._mass = provider.mass
        self._source_id = source_id
        self._use_cntrl_script = use_cntrl_script
        # The control channel is keyed by the source id; None disables it.
        self._cntrl_queue_id = source_id if use_cntrl_script else None
        self._filter_settings_owner = filter_settings_owner
        self._destroy_on_stop = destroy_on_stop

        self._lifecycle_lock = asyncio.Lock()
        self._destroyed = False
        self._setup_done = False
        self._is_streaming = False
        self._restart_requested: bool = False
        self._stop_requested: bool = False
        self._streaming_started_at: float | None = None

        self._socket_server: SnapcastSocketServer | None = None
        self._socket_path: str | None = None
        self._streamer_task: asyncio.Task[None] | None = None
        self._stop_streamer_evt = asyncio.Event()
        self._streamer_started_evt = asyncio.Event()
        self._stop_timer: asyncio.TimerHandle | None = None
        self._stop_timer_started_at: float | None = None
        self._filter_settings: list[str] | None = None

    @property
    def source_id(self) -> str | None:
        """Return the source id this stream was created for."""
        return self._source_id

    @property
    def stream_id(self) -> str | None:
        """Return the Snapcast stream identifier, if registered."""
        if self.snap_stream:
            return self.snap_stream.identifier
        return None

    @property
    def is_streaming(self) -> bool:
        """Return True if the FFmpeg streaming task is currently running."""
        return self._is_streaming

    @property
    def playback_started_at(self) -> float | None:
        """Return when the playback started at the clients.

        Returns:
            The `time.time()` timestamp at which playback started on the clients, or
            None if streaming has not started (or, with the builtin server, if the
            server buffer time has not elapsed yet).
        """
        if self._streaming_started_at is None:
            return None
        if self._provider._use_builtin_server:
            # Clients only start playing once the server buffer has been filled.
            buffer_sec = self._provider._snapcast_server_buffer_size / 1000.0
            if time.time() - self._streaming_started_at < buffer_sec:
                return None
            return self._streaming_started_at + buffer_sec
        return self._streaming_started_at

    async def setup(self) -> None:
        """Prepare the Snapcast stream resources.

        Ensures a Snapcast source exists on the server. If the control script is
        enabled, also starts the Unix socket server used by the control script.
        Calling this more than once is a no-op after the first success.

        Raises:
            RuntimeError: If the stream was destroyed or the Snapserver is not set up.
        """
        async with self._lifecycle_lock:
            if self._destroyed:
                raise RuntimeError("Session is destroyed")
            if self._setup_done:
                return
            if self._provider._snapserver is None:
                raise RuntimeError("Snapserver needs to be setup first")

            # The socket must exist first: its path is embedded in the source URI.
            if self._cntrl_queue_id:
                await self._start_socket_server()

            await self._register_tcp_server_source()
            self._setup_done = True

    async def destroy(self) -> None:
        """Stop streaming and tear down all resources.

        This stops the streamer task (if running), removes the Snapcast source,
        and stops the optional control socket server.
        """
        async with self._lifecycle_lock:
            if self._destroyed:
                return
            self._destroyed = True

            self.request_stop_stream()
            await self.wait_for_stopped()
            await self._remove_snap_source()
            await self._stop_socket_server()

    async def start_stream(self, allow_restart: bool = False) -> None:
        """Start streaming the configured media to the Snapcast source.

        Args:
            allow_restart: If true and a streamer task is still alive, queue a restart
                instead of raising. The restart happens once the current run finished.

        Raises:
            RuntimeError: If the streamer task is already running and `allow_restart`
                is false, or if the stream has been destroyed.
        """
        await self.setup()
        async with self._lifecycle_lock:
            # destroy() may have run between setup() and acquiring the lock again.
            if self._destroyed:
                raise RuntimeError("Session is destroyed")
            if self._streamer_task and not self._streamer_task.done():
                if not allow_restart:
                    raise RuntimeError("streamer already running")
                # An explicit start must win over a stop that is still winding down,
                # otherwise the request would be silently dropped.
                self._request_restart()
                return

            self._spawn_streamer_task()

    async def wait_for_started(self, timeout_sec: float | None = None) -> None:
        """Wait until the streamer task signals it has started.

        A timeout is only logged; nothing is cancelled and no exception is raised.

        Args:
            timeout_sec: Optional timeout in seconds.
        """
        try:
            await asyncio.wait_for(self._streamer_started_evt.wait(), timeout_sec)
        except TimeoutError:
            self._logger.warning(
                "Timeout waiting for stream %s to start",
                self.stream_name,
            )

    def update_media(self, media: PlayerMedia) -> None:
        """Update the media to play and restart the stream if required."""
        if media != self.media:
            self.media = media
            self._restart_if_running()

    def update_filter_settings(self, from_player: str | None = None) -> None:
        """Re-read the filter settings and restart a running stream if they changed.

        Args:
            from_player: Player to read the settings from. If given, it becomes the
                new filter settings owner.

        Raises:
            RuntimeError: If neither `from_player` nor a stored owner is available.
        """
        take_from = from_player or self._filter_settings_owner
        if not take_from:
            raise RuntimeError("No player provided to read filter settings from.")
        new_settings = get_player_filter_params(
            self._mass,
            take_from,
            DEFAULT_SNAPCAST_FORMAT,
            DEFAULT_SNAPCAST_FORMAT,
        )
        if from_player:
            self._filter_settings_owner = from_player
        # The streamer re-reads the settings on start, so a restart applies them.
        if new_settings != self._filter_settings:
            self._restart_if_running()

    def request_stop_stream(self) -> None:
        """Request the streamer task to stop.

        This is cooperative: the streamer task will stop when it observes the stop event.
        Any pending inactivity stop timer is canceled.
        """
        self._stop_requested = True
        self._restart_requested = False  # explicit stop cancels any pending restart
        self._stop_streamer_evt.set()
        self._cancel_stop_timer()

    def set_in_use(self, in_use: bool) -> None:
        """Mark the stream as in-use or idle.

        When marked idle, a delayed stop is scheduled. When marked in-use, any pending
        delayed stop is canceled.
        """
        if in_use:
            self._cancel_stop_timer()
        elif self._stop_timer_started_at is None:
            self._stop_timer_started_at = self._mass.loop.time()
            self._stop_timer = self._mass.loop.call_later(60.0, self.request_stop_stream)

    async def wait_for_stopped(self, timeout_sec: float | None = None) -> None:
        """Wait for the streamer task to finish.

        If the task does not finish within the timeout, it is canceled and awaited.

        Args:
            timeout_sec: Optional timeout in seconds.
        """
        curr_task = self._streamer_task
        if not curr_task:
            return
        try:
            await asyncio.wait_for(curr_task, timeout_sec)
        except asyncio.CancelledError:
            self._logger.warning("Streamer task got canceled")
        except TimeoutError:
            self._logger.warning(
                "Timeout waiting for stream %s to finish; Canceling...",
                self.stream_name,
            )
            curr_task.cancel()
            await asyncio.gather(curr_task, return_exceptions=True)

    def _cancel_stop_timer(self) -> None:
        """Cancel a pending inactivity stop timer and forget its handle."""
        self._stop_timer_started_at = None
        if self._stop_timer:
            self._stop_timer.cancel()
            self._stop_timer = None

    def _spawn_streamer_task(self) -> None:
        """Reset the per-run state and create a fresh streamer task.

        Must be called with the lifecycle lock held and no streamer task alive.
        """
        self._stop_requested = False
        self._restart_requested = False
        self._stop_streamer_evt.clear()
        self._streamer_started_evt.clear()
        self._streamer_task = self._mass.create_task(self._streamer_task_impl())
        self._streamer_task.add_done_callback(self._on_streamer_done)

    async def _streamer_task_impl(self) -> None:
        """Streamer task implementation.

        Runs FFmpeg to push audio to the Snapcast TCP source until FFmpeg exits or a stop
        request is received. After exit, waits briefly for the Snapcast stream to report
        an idle state.

        Raises:
            RuntimeError: If no Snapcast source has been registered yet.
        """
        stream_path = self._snap_get_stream_path()
        if stream_path is None:
            raise RuntimeError("The path to stream to is not set")

        self._logger.debug("Start streaming to %s", stream_path)
        # NOTE: the stop event is deliberately NOT cleared here. It is cleared right
        # before this task is created; clearing it again would drop a stop request
        # issued between task creation and the first run of this coroutine.
        self._streamer_started_evt.clear()
        if self._filter_settings_owner:
            self._filter_settings = get_player_filter_params(
                self._mass,
                self._filter_settings_owner,
                DEFAULT_SNAPCAST_FORMAT,
                DEFAULT_SNAPCAST_FORMAT,
            )
        audio_source = self._mass.streams.get_stream(
            self.media, DEFAULT_SNAPCAST_FORMAT, self._filter_settings_owner
        )
        try:
            async with FFMpeg(
                audio_input=audio_source,
                input_format=DEFAULT_SNAPCAST_FORMAT,
                output_format=DEFAULT_SNAPCAST_FORMAT,
                filter_params=self._filter_settings or [],
                audio_output=stream_path,
                extra_input_args=["-y", "-re"],
            ) as ffmpeg_proc:
                wait_ffmpeg = self._mass.create_task(ffmpeg_proc.wait())
                wait_stop = self._mass.create_task(self._stop_streamer_evt.wait())
                try:
                    self._streaming_started_at = time.time()
                    self._streamer_started_evt.set()
                    self._is_streaming = True

                    done, _ = await asyncio.wait(
                        {wait_ffmpeg, wait_stop},
                        return_when=asyncio.FIRST_COMPLETED,
                    )

                    if wait_ffmpeg not in done:
                        # Only the stop event fired; FFmpeg is still running.
                        self._logger.debug("Stopping stream %s requested.", self.stream_name)
                        return

                    # Surface any error raised while waiting for FFmpeg.
                    await wait_ffmpeg
                finally:
                    # Never leave the helper tasks behind, whatever the exit path
                    # (stop request, FFmpeg exit, error or cancellation).
                    for helper in (wait_ffmpeg, wait_stop):
                        if not helper.done():
                            helper.cancel()
                    await asyncio.gather(wait_ffmpeg, wait_stop, return_exceptions=True)
        except asyncio.CancelledError:
            self._logger.debug("Snapcast stream %s cancelled", self.stream_name)
            raise
        except Exception as err:
            self._logger.error("Snapcast stream %s error: %s", self.stream_name, err, exc_info=err)
            raise
        finally:
            self._is_streaming = False
            self._logger.debug("Finished streaming to %s", stream_path)
            await self._wait_stream_idle()

    async def _wait_stream_idle(self) -> None:
        """Wait (at most 10 seconds) for the Snapcast stream to become idle.

        Always resets the streaming start timestamp when done.
        """

        async def wait_until_idle() -> None:
            while True:
                stream_is_idle = False
                # The stream may be unknown to the server (yet/anymore).
                with suppress(KeyError):
                    snap_stream = self._provider._snapserver.stream(self.stream_name)
                    stream_is_idle = snap_stream.status == "idle"
                if self._mass.closing or stream_is_idle:
                    break
                await asyncio.sleep(0.25)

        try:
            await asyncio.wait_for(wait_until_idle(), timeout=10.0)
        except TimeoutError:
            self._logger.warning(
                "Timeout waiting for stream %s to become idle",
                self.stream_name,
            )
        finally:
            self._streaming_started_at = None

    def _on_streamer_done(self, t: asyncio.Task[None]) -> None:
        """Handle streamer task completion and optional cleanup.

        Logs the outcome, resets the per-run state and then either restarts the
        stream (if a restart was requested) or deletes it (if `destroy_on_stop`).
        """
        try:
            t.result()
        except asyncio.CancelledError:
            self._logger.debug("Streamer task cancelled: %s", self.stream_name)
        except Exception:
            self._logger.exception("Streamer task failed")
        finally:
            # Capture the restart wish before the per-run state is reset below.
            restart = self._restart_requested and not self._destroyed

            if self._streamer_task is t:
                self._streamer_task = None

            # reset per-run state
            self._restart_requested = False
            self._stop_requested = False
            self._stop_streamer_evt.clear()
            self._streamer_started_evt.clear()

            if restart:
                self._mass.create_task(self._restart_stream_locked())
            elif self._destroy_on_stop:
                self._mass.create_task(self._provider.delete_ma_stream(self.stream_name))

    def _request_restart(self) -> None:
        """Ask the running streamer to stop and mark it for a restart afterwards."""
        self._restart_requested = True
        self._stop_requested = True
        self._stop_streamer_evt.set()
        self._cancel_stop_timer()

    def _restart_if_running(self) -> None:
        """Request a running stream to restart.

        Does nothing if no streamer is running or a stop/restart is already pending.
        """
        task = self._streamer_task
        if not task or task.done():
            return

        if self._stop_requested or self._stop_streamer_evt.is_set():
            return

        self._request_restart()

    async def _restart_stream_locked(self) -> None:
        """Restart the streamer under the lifecycle lock."""
        async with self._lifecycle_lock:
            if self._destroyed:
                return
            if self._streamer_task and not self._streamer_task.done():
                return

            # reset state and start a fresh run
            self._spawn_streamer_task()

    async def _register_tcp_server_source(self) -> None:
        """Create a Snapcast TCP source for this stream (or reuse an existing one).

        Tries up to 50 random ports. On failure the control socket server (if any)
        is stopped again.

        Raises:
            RuntimeError: If the socket path is missing although the control script is
                enabled, or if the stream could not be created on any tried port.
        """
        # prefer to reuse existing stream if possible
        if self.snap_stream:
            return

        # The control script is used only for music streams in the builtin server
        extra_args = ""
        if (cntrl_queue_id := self._cntrl_queue_id) is not None:
            # The control script talks to Music Assistant over the socket server.
            socket_path = self._socket_path
            if socket_path is None:
                raise RuntimeError("socket_path needs to be set if cntrl_queue_id is set")
            extra_args = (
                f"&controlscript={urllib.parse.quote_plus('control.py')}"
                f"&controlscriptparams=--queueid={urllib.parse.quote_plus(cntrl_queue_id)}%20"
                f"--socket={urllib.parse.quote_plus(socket_path)}%20"
                f"--streamserver-ip={self._mass.streams.publish_ip}%20"
                f"--streamserver-port={self._mass.streams.publish_port}"
            )

        for _ in range(50):
            # pick a random port
            port = random.randint(4953, 4953 + 200)
            # TODO: do we need to add a timeout here?
            result = await self._provider._snapserver.stream_add_stream(
                # NOTE: setting the sampleformat to something else
                # (like 24 bits bit depth) does not seem to work at all!
                f"tcp://0.0.0.0:{port}?sampleformat=48000:16:2"
                f"&idle_threshold={self._provider._snapcast_stream_idle_threshold}"
                f"{extra_args}&name={self.stream_name}"
            )
            if result is None or "id" not in result:
                # if the port is already taken, the result will be an error
                self._logger.warning(result)
                continue
            # TODO: do we need to synchronize the snapserver representation first?
            self.snap_stream = self._provider._snapserver.stream(result["id"])
            self.snap_stream.set_callback(self._snap_on_stream_update)
            return

        if self._socket_server:
            await self._stop_socket_server()

        msg = "Unable to create stream - No free port found?"
        raise RuntimeError(msg)

    async def _remove_snap_source(self) -> None:
        """Remove the Snapcast source created for this stream and detach groups.

        Does nothing while Music Assistant is closing or if no source was registered.
        """
        if self._mass.closing or self.snap_stream is None:
            return

        # Move every group still playing this stream back to the default stream.
        for snap_group in self._provider._snapserver.groups:
            if snap_group.stream != self.snap_stream.identifier:
                continue
            self._logger.debug("Set stream of group %s to default.", snap_group.name)
            await snap_group.set_stream("default")

        # Best effort: the stream may already be gone on the server side.
        with suppress(KeyError, AttributeError):
            snap_stream = self._provider._snapserver.stream(self.stream_name)
            await self._provider._snapserver.stream_remove_stream(snap_stream.identifier)

        if self._socket_server:
            await self._stop_socket_server()
        self._snap_on_stream_update()

    def _snap_get_stream_path(self) -> str | None:
        """Return the Snapcast TCP URI to stream to, or None if no source is registered."""
        if self.snap_stream is None:
            return None

        uri = self.snap_stream._stream.get("uri", {})
        uri_host = uri.get("host", "")
        stream_path = self.snap_stream.path or f"tcp://{uri_host}"
        # The source listens on all interfaces; connect via the configured server host.
        return stream_path.replace("0.0.0.0", self._provider._snapcast_server_host)

    def _snap_on_stream_update(self, stream: SnapstreamProto | None = None) -> None:
        """Handle Snapcast stream updates and trigger group member refresh."""
        if self.snap_stream is None:
            return

        for snap_group in self._provider._snapserver.groups:
            if snap_group.stream != self.snap_stream.identifier:
                continue
            self._provider.poke_group_members(snap_group)

    async def _start_socket_server(self) -> str:
        """Get or create the control socket server for this stream's queue.

        :return: The path to the Unix socket.
        :raises RuntimeError: If the control script is not enabled for this stream.
        """
        if self._socket_server:
            return self._socket_server.socket_path

        if self._cntrl_queue_id is None:
            raise RuntimeError("Socket server require _cntrl_queue_id to be set")

        socket_path = CONTROL_SOCKET_PATH_TEMPLATE.format(queue_id=self._cntrl_queue_id)
        socket_server = SnapcastSocketServer(
            mass=self._mass,
            queue_id=self._cntrl_queue_id,
            socket_path=socket_path,
            streamserver_ip=str(self._mass.streams.publish_ip),
            streamserver_port=cast("int", self._mass.streams.publish_port),
        )
        await socket_server.start()
        self._socket_server = socket_server
        self._socket_path = socket_path
        self._logger.debug(
            "Created socket server for queue %s at %s", self._cntrl_queue_id, socket_path
        )
        return socket_path

    async def _stop_socket_server(self) -> None:
        """Stop the control socket server (if any) and forget its socket path."""
        if not self._socket_server:
            return

        await self._socket_server.stop()
        self._socket_server = None
        # The path is only valid while the server is running.
        self._socket_path = None
        self._logger.debug("Stopped socket server for queue %s", self._cntrl_queue_id)