/
/
/
1"""SnapCastProvider."""
2
3from __future__ import annotations
4
5import asyncio
6import hashlib
7import logging
8import re
9import shutil
10import socket
11from contextlib import suppress
12from pathlib import Path
13from typing import TYPE_CHECKING, cast
14
15from bidict import bidict
16from music_assistant_models.enums import MediaType, PlaybackState
17from music_assistant_models.errors import SetupFailedError
18from snapcast.control.server import CONTROL_PORT, Snapserver
19from zeroconf import NonUniqueNameException
20from zeroconf.asyncio import AsyncServiceInfo
21
22from music_assistant.helpers.compare import create_safe_string
23from music_assistant.helpers.process import AsyncProcess
24from music_assistant.helpers.util import get_ip_pton
25from music_assistant.models.player_provider import PlayerProvider
26from music_assistant.providers.snapcast.constants import (
27 CONF_SERVER_BUFFER_SIZE,
28 CONF_SERVER_CHUNK_MS,
29 CONF_SERVER_CONTROL_PORT,
30 CONF_SERVER_HOST,
31 CONF_SERVER_INITIAL_VOLUME,
32 CONF_SERVER_SEND_AUDIO_TO_MUTED,
33 CONF_SERVER_TRANSPORT_CODEC,
34 CONF_STREAM_IDLE_THRESHOLD,
35 CONF_USE_EXTERNAL_SERVER,
36 CONTROL_SCRIPT,
37 DEFAULT_SNAPSERVER_CONFIG_FILE,
38 DEFAULT_SNAPSERVER_PLUGIN_DIR,
39 DEFAULT_SNAPSERVER_PORT,
40 MASS_ANNOUNCEMENT_POSTFIX,
41 MASS_STREAM_PREFIX,
42 SHIPPED_SNAPSERVER_CONFIG_FILE,
43 SNAPWEB_DIR,
44)
45from music_assistant.providers.snapcast.ma_stream import SnapcastMAStream
46from music_assistant.providers.snapcast.player import SnapCastPlayer
47from music_assistant.providers.universal_group.constants import UGP_PREFIX
48
49if TYPE_CHECKING:
50 from music_assistant_models.player import PlayerMedia
51
52 from .snap_cntrl_proto import SnapclientProto, SnapgroupProto, SnapserverProto
53
54
55async def _create_cntrl_server(
56 loop: asyncio.AbstractEventLoop,
57 host: str,
58 port: int = CONTROL_PORT,
59 reconnect: bool = False,
60) -> SnapserverProto:
61 """Server factory."""
62 server = Snapserver(loop, host, port, reconnect)
63 await server.start()
64 return cast("SnapserverProto", server)
65
66
67class SnapCastProvider(PlayerProvider):
68 """SnapCastProvider."""
69
70 _snapserver: SnapserverProto
71 _snapserver_runner: asyncio.Task[None] | None
72 _snapserver_started: asyncio.Event | None
73 _snapcast_server_host: str
74 _snapcast_server_control_port: int
75 _ids_map: bidict[str, str] # ma_id / snapclient_id
76 _use_builtin_server: bool
77 _stop_called: bool
78 _controlscript_available: bool
79 _snapcast_ma_streams: dict[str, SnapcastMAStream]
80 _snapcast_ma_streams_lock: asyncio.Lock
81
82 @property
83 def queue_control_available(self) -> bool:
84 """Return whether queue-based control scripts are available.
85
86 Indicates if the Snapcast control script has been successfully initialized
87 and can be used to control playback via a queue-specific control channel.
88 """
89 return (
90 self._use_builtin_server
91 and self._controlscript_available
92 and self._snapserver_started is not None
93 and self._snapserver_started.is_set()
94 )
95
96 async def handle_async_init(self) -> None:
97 """Handle async initialization of the provider."""
98 # set snapcast logging
99 logging.getLogger("snapcast").setLevel(self.logger.level)
100 self._use_builtin_server = not self.config.get_value(CONF_USE_EXTERNAL_SERVER)
101 self._stop_called = False
102 self._controlscript_available = False
103 if self._use_builtin_server:
104 if Path(DEFAULT_SNAPSERVER_CONFIG_FILE).exists():
105 self._snapcast_server_config_file = DEFAULT_SNAPSERVER_CONFIG_FILE
106 else:
107 # Fallback for dev environments without a Snapserver config file.
108 # If the file is missing, Snapserver silently ignores all command-line arguments.
109 self._snapcast_server_config_file = str(SHIPPED_SNAPSERVER_CONFIG_FILE)
110
111 self._snapcast_server_host = "127.0.0.1"
112 self._snapcast_server_control_port = DEFAULT_SNAPSERVER_PORT
113 self._snapcast_server_buffer_size = cast(
114 "int", self.config.get_value(CONF_SERVER_BUFFER_SIZE)
115 )
116 self._snapcast_server_chunk_ms = self.config.get_value(CONF_SERVER_CHUNK_MS)
117 self._snapcast_server_initial_volume = self.config.get_value(CONF_SERVER_INITIAL_VOLUME)
118 self._snapcast_server_send_to_muted = self.config.get_value(
119 CONF_SERVER_SEND_AUDIO_TO_MUTED
120 )
121 self._snapcast_server_transport_codec = self.config.get_value(
122 CONF_SERVER_TRANSPORT_CODEC
123 )
124 else:
125 self._snapcast_server_host = str(self.config.get_value(CONF_SERVER_HOST))
126 self._snapcast_server_control_port = int(
127 str(self.config.get_value(CONF_SERVER_CONTROL_PORT))
128 )
129 self._snapcast_stream_idle_threshold = self.config.get_value(CONF_STREAM_IDLE_THRESHOLD)
130 self._ids_map = bidict({})
131
132 self._snapcast_ma_streams = {}
133 self._snapcast_ma_streams_lock = asyncio.Lock()
134
135 if self._use_builtin_server:
136 await self._start_builtin_server()
137 else:
138 self._snapserver_runner = None
139 self._snapserver_started = None
140 try:
141 self._snapserver = await _create_cntrl_server(
142 self.mass.loop,
143 self._snapcast_server_host,
144 port=self._snapcast_server_control_port,
145 reconnect=True,
146 )
147 self._snapserver.set_on_update_callback(self._handle_update)
148 self.logger.info(
149 "Started connection to Snapserver %s",
150 f"{self._snapcast_server_host}:{self._snapcast_server_control_port}",
151 )
152 # register callback for when the connection gets lost to the snapserver
153 self._snapserver.set_on_disconnect_callback(self._handle_disconnect)
154
155 except OSError as err:
156 msg = "Unable to start the Snapserver connection ?"
157 raise SetupFailedError(msg) from err
158
159 async def loaded_in_mass(self) -> None:
160 """Call after the provider has been loaded."""
161 await super().loaded_in_mass()
162 # initial load of players
163 self._handle_update()
164
165 async def unload(self, is_removed: bool = False) -> None:
166 """Handle close/cleanup of the provider."""
167 self._stop_called = True
168
169 for snap_client in self._snapserver.clients:
170 player_id = self._get_ma_id(snap_client.identifier)
171 if not (player := self.mass.players.get_player(player_id, raise_unavailable=False)):
172 continue
173 if player.playback_state != PlaybackState.PLAYING:
174 continue
175 await player.stop()
176
177 for stream_name in list(self._snapcast_ma_streams):
178 await self.delete_ma_stream(stream_name)
179
180 self._snapserver.stop()
181 await self._stop_builtin_server()
182
183 async def _start_builtin_server(self) -> None:
184 """Start the built-in Snapserver."""
185 if self._use_builtin_server:
186 self._snapserver_started = asyncio.Event()
187 self._snapserver_runner = self.mass.create_task(self._builtin_server_runner())
188 await asyncio.wait_for(self._snapserver_started.wait(), 10)
189
190 async def _stop_builtin_server(self) -> None:
191 """Stop the built-in Snapserver."""
192 self.logger.info("Stopping, built-in Snapserver")
193 if self._snapserver_runner and not self._snapserver_runner.done():
194 self._snapserver_runner.cancel()
195
196 def _setup_controlscript(self) -> str | None:
197 """Copy control script to plugin directory (blocking I/O).
198
199 :return: plugin dir if successful, None otherwise.
200 """
201 logger = self.logger.getChild("snapserver")
202 if not CONTROL_SCRIPT.exists():
203 logger.warning("Control script does not exist: %s", CONTROL_SCRIPT)
204 return None
205
206 candidates = (
207 Path(DEFAULT_SNAPSERVER_PLUGIN_DIR),
208 # fallback directory for dev environments
209 Path(self.mass.storage_path) / "snapcast" / "plugins",
210 )
211 for plugin_dir in candidates:
212 control_dest = plugin_dir / "control.py"
213 try:
214 plugin_dir.mkdir(parents=True, exist_ok=True)
215 # Clean up existing file
216 control_dest.unlink(missing_ok=True)
217
218 # Copy the control script to the plugin directory
219 shutil.copy2(CONTROL_SCRIPT, control_dest)
220 # Ensure it's executable
221 control_dest.chmod(0o755)
222 logger.debug("Copied controlscript to: %s", control_dest)
223 return str(plugin_dir)
224 except (OSError, PermissionError) as err:
225 logger.debug("Could not copy controlscript to %s : %s", plugin_dir, err)
226 logger.warning("Could not copy controlscript (metadata/control disabled)")
227 return None
228
229 async def _builtin_server_runner(self) -> None:
230 """Start running the builtin snapserver."""
231 assert self._snapserver_started is not None # for type checking
232 if self._snapserver_started.is_set():
233 raise RuntimeError("Snapserver is already started!")
234 logger = self.logger.getChild("snapserver")
235 logger.info("Starting builtin Snapserver...")
236 # register the snapcast mdns services
237 for name, port in (
238 ("-http", 1780),
239 ("-jsonrpc", 1705),
240 ("-stream", 1704),
241 ("-tcp", 1705),
242 ("", 1704),
243 ):
244 zeroconf_type = f"_snapcast{name}._tcp.local."
245 try:
246 info = AsyncServiceInfo(
247 zeroconf_type,
248 name=f"Snapcast.{zeroconf_type}",
249 properties={"is_mass": "true"},
250 addresses=[await get_ip_pton(str(self.mass.streams.publish_ip))],
251 port=port,
252 server=f"{socket.gethostname()}.local",
253 )
254 attr_name = f"zc_service_set{name}"
255 if getattr(self, attr_name, None):
256 await self.mass.aiozc.async_update_service(info)
257 else:
258 await self.mass.aiozc.async_register_service(info, strict=False)
259 setattr(self, attr_name, True)
260 except NonUniqueNameException:
261 self.logger.debug(
262 "Could not register mdns record for %s as its already in use",
263 zeroconf_type,
264 )
265 except Exception as err:
266 self.logger.exception(
267 "Could not register mdns record for %s: %s", zeroconf_type, str(err)
268 )
269
270 args = [
271 "snapserver",
272 # config settings taken from
273 # https://raw.githubusercontent.com/badaix/snapcast/86cd4b2b63e750a72e0dfe6a46d47caf01426c8d/server/etc/snapserver.conf
274 f"--config={self._snapcast_server_config_file}",
275 f"--server.datadir={self.mass.storage_path}",
276 "--http.enabled=true",
277 "--http.port=1780",
278 f"--http.doc_root={SNAPWEB_DIR}",
279 "--tcp-control.enabled=true",
280 f"--tcp-control.port={self._snapcast_server_control_port}",
281 "--stream.sampleformat=48000:16:2",
282 f"--stream.buffer={self._snapcast_server_buffer_size}",
283 f"--stream.chunk_ms={self._snapcast_server_chunk_ms}",
284 f"--stream.codec={self._snapcast_server_transport_codec}",
285 f"--stream.send_to_muted={str(self._snapcast_server_send_to_muted).lower()}",
286 f"--streaming_client.initial_volume={self._snapcast_server_initial_volume}",
287 ]
288 loop = asyncio.get_running_loop()
289 plugin_dir = await loop.run_in_executor(None, self._setup_controlscript)
290 if plugin_dir is not None:
291 args.append(f"--stream.plugin_dir={plugin_dir}")
292 self._controlscript_available = True
293
294 started_handle: asyncio.Handle | None = None
295 async with AsyncProcess(args, stdout=True, name="snapserver") as snapserver_proc:
296 try:
297 # keep reading from stdout until exit
298 async for raw_data in snapserver_proc.iter_any():
299 text = raw_data.decode().strip()
300 for line in text.split("\n"):
301 logger.debug(line)
302 if "(Snapserver) Version 0." in line:
303 # delay init a small bit to prevent race conditions
304 # where we try to connect too soon
305 if started_handle is None:
306 started_handle = self.mass.loop.call_later(
307 2, self._snapserver_started.set
308 )
309
310 except asyncio.CancelledError:
311 # Currently, MA doesn't guarantee a defined shutdown order;
312 # Make sure to close socket servers before
313 # shutting down the snapcast server.
314 #
315 # The snapserver doesn't always cleanup the control script processes
316 # properly. We do it explicitly when closing a socket server.
317 # Should be fixed on the server side, though.
318 for stream_name in list(self._snapcast_ma_streams):
319 await self.delete_ma_stream(stream_name)
320 self._snapcast_ma_streams.clear()
321 raise
322
323 finally:
324 if started_handle is not None:
325 started_handle.cancel()
326 if self._snapserver_started is not None:
327 self._snapserver_started.clear()
328 self._controlscript_available = False
329
330 def _get_ma_id(self, snap_client_id: str) -> str:
331 search_dict = self._ids_map.inverse
332 ma_id = search_dict.get(snap_client_id)
333 assert ma_id is not None # for type checking
334 return ma_id
335
336 def _get_snapclient_id(self, player_id: str) -> str:
337 search_dict = self._ids_map
338 snap_id = search_dict.get(player_id)
339 assert snap_id is not None # for type checking
340 return snap_id
341
342 def _generate_and_register_id(self, snap_client_id: str) -> str:
343 search_dict = self._ids_map.inverse
344 if snap_client_id not in search_dict:
345 new_id = "ma_" + str(re.sub(r"\W+", "", snap_client_id))
346 self._ids_map[new_id] = snap_client_id
347 return new_id
348 return self._get_ma_id(snap_client_id)
349
350 def _handle_player_init(self, snap_client: SnapclientProto) -> SnapCastPlayer:
351 """Process Snapcast add to Player controller."""
352 player_id = self._generate_and_register_id(snap_client.identifier)
353 player = self.mass.players.get_player(player_id, raise_unavailable=False)
354 if not player:
355 snap_client = self._snapserver.client(self._get_snapclient_id(player_id))
356 player = SnapCastPlayer(
357 provider=self,
358 player_id=player_id,
359 snap_client=snap_client,
360 )
361 player.setup()
362 else:
363 player = cast("SnapCastPlayer", player) # for type checking
364 asyncio.run_coroutine_threadsafe(
365 self.mass.players.register_or_update(player), loop=self.mass.loop
366 )
367 return player
368
369 def _handle_update(self) -> None:
370 """Process Snapcast init Player/Group and set callback ."""
371 for snap_client in self._snapserver.clients:
372 if not snap_client.identifier:
373 self.logger.warning(
374 "Detected Snapclient %s without identifier, skipping", snap_client.friendly_name
375 )
376 continue
377 if ma_player := self._handle_player_init(snap_client):
378 snap_client.set_callback(ma_player._handle_player_update)
379 for snap_client in self._snapserver.clients:
380 if player := self.get_snap_player(client_id=snap_client.identifier):
381 snap_client.set_callback(player._handle_player_update)
382 self._update_group_callbacks()
383
384 def poke_group_members(self, snap_group: SnapgroupProto) -> None:
385 """Process Snapcast group callback."""
386 for snap_client_id in snap_group.clients:
387 if ma_player := self.get_snap_player(client_id=snap_client_id):
388 ma_player.poke_player_update()
389
390 def _handle_disconnect(self, exc: Exception) -> None:
391 """Handle disconnect callback from snapserver."""
392 if self._stop_called or self.mass.closing:
393 # prevent auto-reconnecting of snapcast controller
394 self._snapserver.stop()
395 # we're instructed to stop/exit, so no need to restart the connection
396 return
397 self.logger.info(
398 "Connection to SnapServer lost, reason: %s. Reloading provider in 5 seconds.",
399 str(exc),
400 )
401 # schedule a reload of the provider
402 self.mass.call_later(5, self.mass.load_provider, self.instance_id, allow_retry=True)
403
404 async def remove_player(self, player_id: str) -> None:
405 """Remove the client from the snapserver when it is deleted."""
406 success, error_msg = await self._snapserver.delete_client(
407 self._get_snapclient_id(player_id)
408 )
409 if success:
410 self.logger.debug("Snapclient removed %s", player_id)
411 else:
412 self.logger.warning("Unable to remove snapclient %s: %s", player_id, error_msg)
413
414 def _update_group_callbacks(self, poke: bool = False) -> None:
415 for grp in self._snapserver.groups:
416 grp.set_callback(self.poke_group_members)
417 if poke:
418 self.poke_group_members(grp)
419
420 async def ensure_player_owned_group(
421 self, ma_player_id: str, set_stream_id: str | None = None
422 ) -> SnapgroupProto | None:
423 """Ensure a Snapcast group is owned by the given player.
424
425 This method guarantees that the returned Snapcast group is *owned* by the
426 specified Music Assistant player, meaning the group name equals the
427 player's ID and the player is the group leader.
428
429 Behavior:
430 - If the player is already the leader of its current group, that group is
431 returned unchanged.
432 - If the player is a member of another group (but not the leader), the
433 player is removed from that group, which causes Snapcast to create a new
434 single-client group for the player.
435 - The resulting group is renamed to the player's ID.
436
437 If `set_stream_id` is provided and a new group is created, the group's
438 stream is updated accordingly.
439
440 Args:
441 ma_player_id: Music Assistant player ID.
442 set_stream_id: Optional Snapcast stream ID to assign to the player's group.
443
444 Returns:
445 The Snapcast group owned by the player, or ``None`` if the player is not
446 currently part of any group.
447 """
448 player_client = self.get_snap_client(player_id=ma_player_id)
449 if player_client is None:
450 return None
451
452 curr_group = player_client.group
453
454 if curr_group is None:
455 return None
456
457 if curr_group.name == ma_player_id:
458 return curr_group
459
460 group_members = list(curr_group.clients)
461 if len(group_members) > 1 and curr_group.name:
462 # player is member of other player group, remove it, which results in a new group
463 group_members.remove(player_client.identifier)
464 res = await self._snapserver.group_clients(curr_group.identifier, group_members)
465 if not (isinstance(res, dict) and "server" in res):
466 raise RuntimeError("Couldn't remove client from group")
467 self._snapserver.synchronize(res)
468 curr_group = player_client.group
469 if curr_group is None:
470 return None
471 if set_stream_id:
472 await curr_group.set_stream(set_stream_id)
473
474 await curr_group.set_name(ma_player_id)
475 return curr_group
476
477 async def isolate_player_to_dedicated_group(
478 self,
479 target_player_id: str,
480 target_stream_id: str | None = None,
481 others_stream_id: str | None = "default",
482 ) -> None:
483 """Isolate a player into a dedicated Snapcast group.
484
485 Ensures that the target player ends up in a group where it is the sole
486 member and group leader.
487
488 Behavior:
489 - The target player is first ensured to own its group.
490 - All other members of that group are removed.
491 - Each removed player is placed into its own dedicated group.
492 - Removed players' groups are optionally assigned `others_stream_id`.
493 - The target group is optionally assigned `target_stream_id`.
494
495 Callbacks for affected clients and groups are temporarily disabled during
496 the operation to avoid intermediate state updates.
497
498 Args:
499 target_player_id: Music Assistant player ID to isolate.
500 target_stream_id: Optional stream ID to assign to the target player's group.
501 others_stream_id: Stream ID assigned to newly created groups for removed players.
502 """
503 this_client_id = self._get_snapclient_id(target_player_id)
504 target_group = await self.ensure_player_owned_group(
505 target_player_id, set_stream_id=target_stream_id
506 )
507
508 if target_group is None:
509 return
510
511 target_group.set_callback(None)
512 group_members = list(target_group.clients)
513 group_members.remove(this_client_id)
514 for client_id in group_members:
515 client = self._snapserver.client(client_id)
516 client.set_callback(None)
517 if group_members:
518 res = await self._snapserver.group_clients(target_group.identifier, [this_client_id])
519 if not (isinstance(res, dict) and "server" in res):
520 raise RuntimeError("Couldn't remove client from group")
521 self._snapserver.synchronize(res)
522 for client_id in group_members:
523 ma_player_id = self._get_ma_id(client_id)
524 if ma_player := cast("SnapCastPlayer", self.mass.players.get_player(ma_player_id)):
525 client = self._snapserver.client(client_id)
526 if client is not None:
527 if client.group is not None:
528 await client.group.set_name(ma_player_id)
529 if others_stream_id:
530 await client.group.set_stream(others_stream_id)
531 client.set_callback(ma_player._handle_player_update)
532
533 if target_stream_id is not None:
534 await target_group.set_stream(target_stream_id)
535
536 async def get_snapcast_media_stream(
537 self,
538 media: PlayerMedia,
539 filter_settings_owner: str | None = None,
540 existing_only: bool = False,
541 ) -> SnapcastMAStream | None:
542 """Get or create a Snapcast Music Assistant stream for the given media.
543
544 Determines a deterministic Snapcast stream name based on the media type
545 and source, and either returns an existing stream or creates a new one.
546
547 Behavior:
548 - Announcement and generic media streams use a hashed name.
549 - Plugin and queue-backed sources reuse a stable stream name.
550 - Queue-backed streams may persist across playback sessions.
551 - If `existing_only` is True, no new stream will be created.
552
553 Newly created streams are registered with the Snapcast server and fully
554 set up before being returned.
555
556 Args:
557 media: Media item to stream.
558 filter_settings_owner: Optional player/entity ID used to resolve DSP filters.
559 existing_only: If True, only return an existing stream.
560
561 Returns:
562 A ``SnapcastMAStream`` instance, or ``None`` if no stream exists and
563 `existing_only` is True.
564 """
565 stream_name: str = ""
566 name_suffix: str = ""
567 queue_id: str | None = None
568 source_id: str | None = None
569 destroy_on_stop = True
570
571 if media.media_type == MediaType.ANNOUNCEMENT:
572 stream_name += hashlib.md5(media.uri.encode()).hexdigest()[:6]
573 name_suffix = MASS_ANNOUNCEMENT_POSTFIX
574 elif media.media_type == MediaType.PLUGIN_SOURCE:
575 custom_data = media.custom_data or {}
576 plugin: str = media.title or custom_data.get("provider") or ""
577 player: str = f" {custom_data.get('player_id', '')}"
578 stream_name += f"{plugin} {player}"
579 source_id = custom_data.get("source_id")
580 elif media.source_id and media.source_id.startswith(UGP_PREFIX):
581 stream_name += media.source_id
582 elif media.source_id and media.queue_item_id:
583 stream_name += media.source_id
584 queue_id = media.source_id
585 source_id = media.source_id
586 destroy_on_stop = False
587 else:
588 stream_name += hashlib.md5(media.uri.encode()).hexdigest()[:6]
589
590 stream_name = create_safe_string(stream_name, lowercase=False)
591 stream_name = f"{MASS_STREAM_PREFIX}{stream_name}{name_suffix}"
592 async with self._snapcast_ma_streams_lock:
593 if not (stream := self._snapcast_ma_streams.get(stream_name)):
594 if existing_only:
595 return None
596
597 stream = SnapcastMAStream(
598 provider=self,
599 media=media,
600 stream_name=stream_name,
601 filter_settings_owner=filter_settings_owner,
602 source_id=source_id,
603 use_cntrl_script=bool(queue_id) and self.queue_control_available,
604 destroy_on_stop=destroy_on_stop,
605 )
606 self._snapcast_ma_streams[stream_name] = stream
607 else:
608 stream.update_media(media)
609 await stream.setup()
610 return stream
611
612 def get_snap_ma_stream(self, stream_name: str) -> SnapcastMAStream | None:
613 """Return an existing Music Assistant Snapcast stream by name.
614
615 Args:
616 stream_name: Snapcast stream name.
617
618 Returns:
619 The corresponding ``SnapcastMAStream`` instance, or ``None`` if not found.
620 """
621 return self._snapcast_ma_streams.get(stream_name)
622
623 async def delete_ma_stream(self, stream_name: str) -> None:
624 """Remove and destroy a Music Assistant Snapcast stream.
625
626 The stream is removed from internal tracking and its resources are
627 destroyed asynchronously. Errors during destruction are logged but
628 otherwise ignored.
629
630 Args:
631 stream_name: Snapcast stream name to delete.
632 """
633 async with self._snapcast_ma_streams_lock:
634 stream = self._snapcast_ma_streams.pop(stream_name, None)
635
636 if not stream:
637 return
638
639 try:
640 await stream.destroy()
641 except Exception:
642 self.logger.exception("Failed to destroy stream session %s", stream_name)
643
644 def update_stream_usage(self) -> None:
645 """Update usage state for all tracked Snapcast streams.
646
647 Marks streams as "in use" if they are currently assigned to any Snapcast
648 group, and schedules unused streams for delayed shutdown.
649
650 This method should be called whenever group or stream assignments change
651 on the Snapcast server.
652 """
653 unused_streams = set(self._snapcast_ma_streams.keys())
654 for grp in self._snapserver.groups:
655 stream_id = grp.stream
656 if stream_id in self._snapcast_ma_streams:
657 ma_stream = self._snapcast_ma_streams[stream_id]
658 ma_stream.set_in_use(True)
659 unused_streams.discard(stream_id)
660
661 if not unused_streams:
662 break
663
664 for stream_id in unused_streams:
665 self._snapcast_ma_streams[stream_id].set_in_use(False)
666
667 def get_snap_client(
668 self, *, client_id: str | None = None, player_id: str | None = None
669 ) -> SnapclientProto | None:
670 """Return the snapclient for either given client_id or player_id."""
671 if player_id is not None:
672 if client_id is not None and client_id != self._get_snapclient_id(client_id):
673 raise ValueError("provided client_id and player_id do not match")
674 client_id = self._get_snapclient_id(player_id)
675
676 if client_id:
677 with suppress(KeyError):
678 return self._snapserver.client(client_id)
679
680 return None
681
682 def get_snap_player(
683 self, *, client_id: str | None = None, player_id: str | None = None
684 ) -> SnapCastPlayer | None:
685 """Return the MA SnapCastPlayer for either given client_id or player_id."""
686 if client_id is not None:
687 if player_id is not None and player_id != self._get_ma_id(client_id):
688 raise ValueError("provided client_id and player_id do not match")
689 player_id = self._get_ma_id(client_id)
690
691 if player_id is None:
692 return None
693
694 if ma_player := self.mass.players.get_player(player_id):
695 assert isinstance(ma_player, SnapCastPlayer) # for type checking
696 return ma_player
697
698 return None
699