"""Music Assistant Snapcast source stream.

This module implements a Music Assistant-managed Snapcast stream that is exposed to the
Snapcast server as a TCP source. The stream is produced by running an FFmpeg pipeline
which pulls audio from Music Assistant and pushes it to the Snapcast source URI.

Optionally, a Unix socket server can be started to provide a control channel for a
Snapcast control script (used by the built-in Snapcast server integration).
"""
10
11from __future__ import annotations
12
13import asyncio
14import random
15import time
16import urllib.parse
17from contextlib import suppress
18from typing import TYPE_CHECKING, cast
19
20from music_assistant.helpers.audio import get_player_filter_params
21from music_assistant.helpers.ffmpeg import FFMpeg
22from music_assistant.providers.snapcast.socket_server import SnapcastSocketServer
23
24from .constants import (
25 CONTROL_SOCKET_PATH_TEMPLATE,
26 DEFAULT_SNAPCAST_FORMAT,
27)
28
29if TYPE_CHECKING:
30 from music_assistant_models.player import PlayerMedia
31
32 from .provider import SnapCastProvider
33 from .snap_cntrl_proto import SnapstreamProto
34
35
class SnapcastMAStream:
    """A Music Assistant-managed Snapcast stream.

    The stream lifecycle is:
    - setup: ensure required server resources exist (Snapcast source, optional socket server)
    - start_stream: start the FFmpeg streaming task
    - request_stop_stream / wait_for_stopped: stop streaming and await termination
    - destroy: stop streaming, remove Snapcast source, and stop ancillary services

    If `use_cntrl_script` is set, `source_id` doubles as the control queue id and a Unix
    socket server is started to allow a Snapcast control script to communicate with
    Music Assistant.
    """

    def __init__(
        self,
        provider: SnapCastProvider,
        media: PlayerMedia,
        stream_name: str,
        source_id: str | None = None,
        filter_settings_owner: str | None = None,
        use_cntrl_script: bool = False,
        destroy_on_stop: bool = False,
    ) -> None:
        """Initialize the stream.

        Args:
            provider: The Snapcast provider instance.
            media: The media item to stream.
            stream_name: Name used to register the stream on the Snapcast server.
            source_id: Id of the source this stream is created for. When
                `use_cntrl_script` is set it is also used as the control queue id.
            filter_settings_owner: Player/entity id used to fetch DSP/filter parameters.
            use_cntrl_script: If true, enables the control socket server used by the
                control script (only effective when `source_id` is given).
            destroy_on_stop: If true, delete this MA stream once streaming stops.
        """
        self.media = media
        self.stream_name = stream_name
        self.snap_stream: SnapstreamProto | None = None

        self._provider = provider
        self._logger = provider.logger
        self._mass = provider.mass
        self._source_id = source_id
        self._use_cntrl_script = use_cntrl_script
        self._cntrl_queue_id = source_id if use_cntrl_script else None
        self._filter_settings_owner = filter_settings_owner
        self._destroy_on_stop = destroy_on_stop

        # Serializes setup / start / restart / destroy.
        self._lifecycle_lock = asyncio.Lock()
        self._destroyed = False
        self._setup_done = False
        self._is_streaming = False
        self._restart_requested: bool = False
        self._stop_requested: bool = False
        self._streaming_started_at: float | None = None

        self._socket_server: SnapcastSocketServer | None = None
        self._socket_path: str | None = None
        self._streamer_task: asyncio.Task[None] | None = None
        self._stop_streamer_evt = asyncio.Event()
        self._streamer_started_evt = asyncio.Event()
        self._stop_timer: asyncio.TimerHandle | None = None
        self._stop_timer_started_at: float | None = None
        self._filter_settings: list[str] | None = None

    @property
    def source_id(self) -> str | None:
        """Return the source id this stream was created for."""
        return self._source_id

    @property
    def stream_id(self) -> str | None:
        """Return the Snapcast stream identifier, if registered."""
        if self.snap_stream:
            return self.snap_stream.identifier
        return None

    @property
    def is_streaming(self) -> bool:
        """Return True if the FFmpeg streaming task is currently running."""
        return self._is_streaming

    @property
    def playback_started_at(self) -> float | None:
        """Return when the playback started at the clients.

        Returns:
            The `time.time()` timestamp at which playback started on the clients, or
            None if not streaming or (with the built-in server) the server buffer
            time has not elapsed yet.
        """
        if self._streaming_started_at is None:
            return None
        if self._provider._use_builtin_server:
            # With the built-in server the clients start playing one buffer later.
            buffer_sec = self._provider._snapcast_server_buffer_size / 1000.0
            if time.time() - self._streaming_started_at < buffer_sec:
                return None
            return self._streaming_started_at + buffer_sec
        return self._streaming_started_at

    async def setup(self) -> None:
        """Prepare the Snapcast stream resources.

        Ensures a Snapcast source exists on the server. If the control script is
        enabled, also starts the Unix socket server used by the control script.
        Calling it again after a successful setup is a no-op.

        Raises:
            RuntimeError: If the stream was destroyed, the Snapserver is not set up,
                or no Snapcast source could be created.
        """
        async with self._lifecycle_lock:
            if self._destroyed:
                raise RuntimeError("Session is destroyed")
            if self._setup_done:
                return
            if self._provider._snapserver is None:
                raise RuntimeError("Snapserver needs to be setup first")

            # The socket path must exist before the source is registered, because it
            # is passed to the control script through the source URI.
            if self._cntrl_queue_id:
                await self._start_socket_server()

            await self._register_tcp_server_source()
            self._setup_done = True

    async def destroy(self) -> None:
        """Stop streaming and tear down all resources.

        This stops the streamer task (if running), removes the Snapcast source,
        and stops the optional control socket server. Subsequent calls are no-ops.
        """
        async with self._lifecycle_lock:
            if self._destroyed:
                return
            self._destroyed = True

            self.request_stop_stream()
            await self.wait_for_stopped()
            await self._remove_snap_source()
            await self._stop_socket_server()

    async def start_stream(self, allow_restart: bool = False) -> None:
        """Start streaming the configured media to the Snapcast source.

        Args:
            allow_restart: If true and the streamer is already running, request a
                restart instead of raising.

        Raises:
            RuntimeError: If the streamer task is already running and `allow_restart`
                is false, or if setup fails.
        """
        await self.setup()
        async with self._lifecycle_lock:
            if self._streamer_task and not self._streamer_task.done():
                if not allow_restart:
                    raise RuntimeError("streamer already running")
                self._restart_if_running()
                return

            self._spawn_streamer()

    async def wait_for_started(self, timeout_sec: float | None = None) -> None:
        """Wait until the streamer task signals it has started.

        A timeout is only logged; nothing is cancelled and no exception is raised.

        Args:
            timeout_sec: Optional timeout in seconds.
        """
        try:
            await asyncio.wait_for(self._streamer_started_evt.wait(), timeout_sec)
        except TimeoutError:
            self._logger.warning(
                "Timeout waiting for stream %s to start",
                self.stream_name,
            )

    def update_media(self, media: PlayerMedia) -> None:
        """Update the media to play and restart the stream if required."""
        if media != self.media:
            self.media = media
            self._restart_if_running()

    def update_filter_settings(self, from_player: str | None = None) -> None:
        """Re-read the filter settings and restart a running stream if they changed.

        Args:
            from_player: Player to read the settings from. If given, it also becomes
                the new filter settings owner; otherwise the current owner is used.

        Raises:
            RuntimeError: If neither `from_player` nor a filter settings owner is set.
        """
        take_from = from_player or self._filter_settings_owner
        if not take_from:
            raise RuntimeError("No player provided to read filter settings from.")
        new_settings = get_player_filter_params(
            self._mass,
            take_from,
            DEFAULT_SNAPCAST_FORMAT,
            DEFAULT_SNAPCAST_FORMAT,
        )
        if from_player:
            self._filter_settings_owner = from_player
        # The new settings are picked up by the streamer task when it (re)starts.
        if new_settings != self._filter_settings:
            self._restart_if_running()

    def request_stop_stream(self) -> None:
        """Request the streamer task to stop.

        This is cooperative: the streamer task will stop when it observes the stop event.
        Any pending inactivity stop timer is canceled.
        """
        self._stop_requested = True
        self._restart_requested = False  # explicit stop cancels any pending restart
        self._stop_streamer_evt.set()
        self._cancel_stop_timer()

    def set_in_use(self, in_use: bool) -> None:
        """Mark the stream as in-use or idle.

        When marked idle, a delayed stop is scheduled. When marked in-use, any pending
        delayed stop is canceled.
        """
        if in_use:
            self._cancel_stop_timer()
        elif self._stop_timer_started_at is None:
            # Only arm the timer once per idle period.
            self._stop_timer_started_at = self._mass.loop.time()
            self._stop_timer = self._mass.loop.call_later(60.0, self.request_stop_stream)

    async def wait_for_stopped(self, timeout_sec: float | None = None) -> None:
        """Wait for the streamer task to finish.

        If the task does not finish within the timeout, it is canceled and awaited.
        A failure of the streamer task is not re-raised here (it is logged by the
        task's done callback), so teardown code calling this can always continue.
        Cancellation of the caller propagates normally.

        Args:
            timeout_sec: Optional timeout in seconds.
        """
        curr_task = self._streamer_task
        if not curr_task:
            return

        # asyncio.wait neither cancels the task on timeout nor raises its exception.
        done, _ = await asyncio.wait({curr_task}, timeout=timeout_sec)
        if not done:
            self._logger.warning(
                "Timeout waiting for stream %s to finish; Canceling...",
                self.stream_name,
            )
            curr_task.cancel()
            await asyncio.gather(curr_task, return_exceptions=True)
            return

        if curr_task.cancelled():
            self._logger.warning("Streamer task got canceled")

    async def _streamer_task_impl(self) -> None:
        """Streamer task implementation.

        Runs FFmpeg to push audio to the Snapcast TCP source until FFmpeg exits or a stop
        request is received. After exit, waits briefly for the Snapcast stream to report
        an idle state.

        Raises:
            RuntimeError: If no Snapcast source is registered to stream to.
        """
        stream_path = self._snap_get_stream_path()
        if stream_path is None:
            raise RuntimeError("The path to stream to is not set")

        self._logger.debug("Start streaming to %s", stream_path)
        # NOTE: the stop/started events are reset by whoever spawns this task. They must
        # not be cleared here, otherwise a stop requested between task creation and the
        # first run of this coroutine would be lost.
        if self._filter_settings_owner:
            self._filter_settings = get_player_filter_params(
                self._mass,
                self._filter_settings_owner,
                DEFAULT_SNAPCAST_FORMAT,
                DEFAULT_SNAPCAST_FORMAT,
            )
        audio_source = self._mass.streams.get_stream(self.media, DEFAULT_SNAPCAST_FORMAT)
        try:
            async with FFMpeg(
                audio_input=audio_source,
                input_format=DEFAULT_SNAPCAST_FORMAT,
                output_format=DEFAULT_SNAPCAST_FORMAT,
                filter_params=self._filter_settings or [],
                audio_output=stream_path,
                extra_input_args=["-y", "-re"],
            ) as ffmpeg_proc:
                wait_ffmpeg = self._mass.create_task(ffmpeg_proc.wait())
                wait_stop = self._mass.create_task(self._stop_streamer_evt.wait())
                try:
                    self._streaming_started_at = time.time()
                    self._streamer_started_evt.set()
                    self._is_streaming = True

                    done, _ = await asyncio.wait(
                        {wait_ffmpeg, wait_stop},
                        return_when=asyncio.FIRST_COMPLETED,
                    )

                    if wait_ffmpeg in done:
                        # FFmpeg ended on its own; surface a failure of the wait, if any.
                        await wait_ffmpeg
                    else:
                        self._logger.debug("Stopping stream %s requested.", self.stream_name)
                finally:
                    # Never leave the helper tasks behind, also not when this task
                    # itself gets cancelled while waiting.
                    for helper in (wait_ffmpeg, wait_stop):
                        if not helper.done():
                            helper.cancel()
                    await asyncio.gather(wait_ffmpeg, wait_stop, return_exceptions=True)

        finally:
            self._is_streaming = False
            self._logger.debug("Finished streaming to %s", stream_path)
            # Wait a bit for snap stream to become idle
            try:
                await asyncio.wait_for(self._wait_until_snap_stream_idle(), timeout=10.0)
            except TimeoutError:
                self._logger.warning(
                    "Timeout waiting for stream %s to become idle",
                    self.stream_name,
                )
            finally:
                self._streaming_started_at = None

    async def _wait_until_snap_stream_idle(self) -> None:
        """Poll the Snapserver until this stream reports idle or MA is closing."""
        while True:
            stream_is_idle = False
            # The stream may be unknown to the server representation (KeyError).
            with suppress(KeyError):
                snap_stream = self._provider._snapserver.stream(self.stream_name)
                stream_is_idle = snap_stream.status == "idle"
            if self._mass.closing or stream_is_idle:
                break
            await asyncio.sleep(0.25)

    def _on_streamer_done(self, t: asyncio.Task[None]) -> None:
        """Handle streamer task completion and optional cleanup.

        Logs the outcome of the task, resets the per-run state and then either
        schedules a restart (if one was requested) or, when `destroy_on_stop` is set,
        asks the provider to delete this stream.
        """
        restart = False
        try:
            t.result()
        except asyncio.CancelledError:
            self._logger.debug("Streamer task cancelled: %s", self.stream_name)
        except Exception:
            self._logger.exception("Streamer task failed")
        finally:
            # Capture the restart wish before the flags are reset below.
            restart = self._restart_requested and not self._destroyed

            if self._streamer_task is t:
                self._streamer_task = None

            # reset per-run state
            self._restart_requested = False
            self._stop_requested = False
            self._stop_streamer_evt.clear()
            self._streamer_started_evt.clear()

            if restart:
                self._mass.create_task(self._restart_stream_locked())
            elif self._destroy_on_stop:
                self._mass.create_task(self._provider.delete_ma_stream(self.stream_name))

    def _cancel_stop_timer(self) -> None:
        """Cancel a pending inactivity stop timer, if any."""
        self._stop_timer_started_at = None
        if self._stop_timer:
            self._stop_timer.cancel()
            self._stop_timer = None

    def _spawn_streamer(self) -> None:
        """Reset the per-run state and launch a fresh streamer task.

        Expected to be called with the lifecycle lock held and no streamer running.
        """
        self._stop_requested = False
        self._restart_requested = False
        self._stop_streamer_evt.clear()
        self._streamer_started_evt.clear()
        self._streamer_task = self._mass.create_task(self._streamer_task_impl())
        self._streamer_task.add_done_callback(self._on_streamer_done)

    def _restart_if_running(self) -> None:
        """Request a running stream to restart.

        Does nothing if no streamer is running or a stop/restart is already pending.
        The actual restart is scheduled by the done callback of the streamer task.
        """
        t = self._streamer_task
        if not t or t.done():
            return

        if self._stop_requested or self._stop_streamer_evt.is_set():
            return

        self._restart_requested = True
        self._stop_requested = True
        self._stop_streamer_evt.set()
        self._cancel_stop_timer()

    async def _restart_stream_locked(self) -> None:
        """Restart the streamer under the lifecycle lock."""
        async with self._lifecycle_lock:
            if self._destroyed:
                return
            if self._streamer_task and not self._streamer_task.done():
                return

            # reset state and start a fresh run
            self._spawn_streamer()

    async def _register_tcp_server_source(self) -> None:
        """Create a Snapcast TCP source for this stream (or reuse an existing one).

        Tries up to 50 random ports. If none succeeds, a started socket server is
        stopped again.

        Raises:
            RuntimeError: If the control script is enabled but no socket path is set,
                or if no source could be created.
        """
        # prefer to reuse existing stream if possible
        if self.snap_stream:
            return

        # The control script is used only for music streams in the builtin server
        extra_args = ""
        if (cntrl_queue_id := self._cntrl_queue_id) is not None:
            # The control script talks to MA through the socket server's socket.
            socket_path = self._socket_path
            if socket_path is None:
                raise RuntimeError("socket_path needs to be set if cntrl_queue_id is set")
            extra_args = (
                f"&controlscript={urllib.parse.quote_plus('control.py')}"
                f"&controlscriptparams=--queueid={urllib.parse.quote_plus(cntrl_queue_id)}%20"
                f"--socket={urllib.parse.quote_plus(socket_path)}%20"
                f"--streamserver-ip={self._mass.streams.publish_ip}%20"
                f"--streamserver-port={self._mass.streams.publish_port}"
            )

        for _attempt in range(50):
            # pick a random port
            port = random.randint(4953, 4953 + 200)
            # TODO: Do we need to add a time out here?
            result = await self._provider._snapserver.stream_add_stream(
                # NOTE: setting the sampleformat to something else
                # (like 24 bits bit depth) does not seem to work at all!
                f"tcp://0.0.0.0:{port}?sampleformat=48000:16:2"
                f"&idle_threshold={self._provider._snapcast_stream_idle_threshold}"
                f"{extra_args}&name={self.stream_name}"
            )
            if result is None or "id" not in result:
                # if the port is already taken, the result will be an error
                self._logger.warning(result)
                continue
            # TODO: Do we need to synchronize the snapserver repr first?
            self.snap_stream = self._provider._snapserver.stream(result["id"])
            self.snap_stream.set_callback(self._snap_on_stream_update)
            return

        if self._socket_server:
            await self._stop_socket_server()

        msg = "Unable to create stream - No free port found?"
        raise RuntimeError(msg)

    async def _remove_snap_source(self) -> None:
        """Remove the Snapcast source created for this stream and detach groups.

        Groups currently playing this stream are switched to the "default" stream
        first. Does nothing while MA is closing or if no source was registered.
        """
        if self._mass.closing or self.snap_stream is None:
            return

        for snap_group in self._provider._snapserver.groups:
            if snap_group.stream != self.snap_stream.identifier:
                continue
            self._logger.debug("Set stream of group %s to default.", snap_group.name)
            await snap_group.set_stream("default")

        # Best effort: the stream may already be gone on the server side.
        with suppress(KeyError, AttributeError):
            snap_stream = self._provider._snapserver.stream(self.stream_name)
            await self._provider._snapserver.stream_remove_stream(snap_stream.identifier)

        if self._socket_server:
            await self._stop_socket_server()
        self._snap_on_stream_update()

    def _snap_get_stream_path(self) -> str | None:
        """Return the Snapcast TCP URI to stream to, or None if no source is registered.

        The wildcard address 0.0.0.0 is replaced by the configured Snapcast server host.
        """
        if self.snap_stream is None:
            return None

        uri = self.snap_stream._stream.get("uri", {})
        uri_host = uri.get("host", "")
        stream_path = self.snap_stream.path or f"tcp://{uri_host}"
        return stream_path.replace("0.0.0.0", self._provider._snapcast_server_host)

    def _snap_on_stream_update(self, stream: SnapstreamProto | None = None) -> None:
        """Handle Snapcast stream updates and trigger group member refresh.

        Args:
            stream: The updated stream as passed by the Snapcast callback (unused).
        """
        if self.snap_stream is None:
            return

        for snap_group in self._provider._snapserver.groups:
            if snap_group.stream != self.snap_stream.identifier:
                continue
            self._provider.poke_group_members(snap_group)

    async def _start_socket_server(self) -> str:
        """Get or create the socket server for the control queue.

        Returns:
            The path to the Unix socket.

        Raises:
            RuntimeError: If no control queue id is set.
        """
        if self._socket_server:
            return self._socket_server.socket_path

        if self._cntrl_queue_id is None:
            raise RuntimeError("Socket server require _cntrl_queue_id to be set")

        socket_path = CONTROL_SOCKET_PATH_TEMPLATE.format(queue_id=self._cntrl_queue_id)
        socket_server = SnapcastSocketServer(
            mass=self._mass,
            queue_id=self._cntrl_queue_id,
            socket_path=socket_path,
            streamserver_ip=str(self._mass.streams.publish_ip),
            streamserver_port=cast("int", self._mass.streams.publish_port),
        )
        await socket_server.start()
        # Only remember the server once it started successfully.
        self._socket_server = socket_server
        self._socket_path = socket_path
        self._logger.debug(
            "Created socket server for queue %s at %s", self._cntrl_queue_id, socket_path
        )
        return socket_path

    async def _stop_socket_server(self) -> None:
        """Stop and remove the socket server for the control queue, if running."""
        if not self._socket_server:
            return

        await self._socket_server.stop()
        self._socket_server = None
        self._logger.debug("Stopped socket server for queue %s", self._cntrl_queue_id)
539