music-assistant-server

28.9 KBPY
provider.py
28.9 KB699 lines • python
1"""SnapCastProvider."""
2
3from __future__ import annotations
4
5import asyncio
6import hashlib
7import logging
8import re
9import shutil
10import socket
11from contextlib import suppress
12from pathlib import Path
13from typing import TYPE_CHECKING, cast
14
15from bidict import bidict
16from music_assistant_models.enums import MediaType, PlaybackState
17from music_assistant_models.errors import SetupFailedError
18from snapcast.control.server import CONTROL_PORT, Snapserver
19from zeroconf import NonUniqueNameException
20from zeroconf.asyncio import AsyncServiceInfo
21
22from music_assistant.helpers.compare import create_safe_string
23from music_assistant.helpers.process import AsyncProcess
24from music_assistant.helpers.util import get_ip_pton
25from music_assistant.models.player_provider import PlayerProvider
26from music_assistant.providers.snapcast.constants import (
27    CONF_SERVER_BUFFER_SIZE,
28    CONF_SERVER_CHUNK_MS,
29    CONF_SERVER_CONTROL_PORT,
30    CONF_SERVER_HOST,
31    CONF_SERVER_INITIAL_VOLUME,
32    CONF_SERVER_SEND_AUDIO_TO_MUTED,
33    CONF_SERVER_TRANSPORT_CODEC,
34    CONF_STREAM_IDLE_THRESHOLD,
35    CONF_USE_EXTERNAL_SERVER,
36    CONTROL_SCRIPT,
37    DEFAULT_SNAPSERVER_CONFIG_FILE,
38    DEFAULT_SNAPSERVER_PLUGIN_DIR,
39    DEFAULT_SNAPSERVER_PORT,
40    MASS_ANNOUNCEMENT_POSTFIX,
41    MASS_STREAM_PREFIX,
42    SHIPPED_SNAPSERVER_CONFIG_FILE,
43    SNAPWEB_DIR,
44)
45from music_assistant.providers.snapcast.ma_stream import SnapcastMAStream
46from music_assistant.providers.snapcast.player import SnapCastPlayer
47from music_assistant.providers.universal_group.constants import UGP_PREFIX
48
49if TYPE_CHECKING:
50    from music_assistant_models.player import PlayerMedia
51
52    from .snap_cntrl_proto import SnapclientProto, SnapgroupProto, SnapserverProto
53
54
async def _create_cntrl_server(
    loop: asyncio.AbstractEventLoop,
    host: str,
    port: int = CONTROL_PORT,
    reconnect: bool = False,
) -> SnapserverProto:
    """Create a Snapcast control connection and start it.

    :param loop: Event loop handed to the ``Snapserver`` control client.
    :param host: Host of the Snapserver to connect to.
    :param port: Control port of the Snapserver.
    :param reconnect: Passed through to ``Snapserver`` as its reconnect flag.
    :return: The started control client, typed as ``SnapserverProto``.
    """
    control = Snapserver(loop, host, port, reconnect)
    await control.start()
    # the cast only narrows the type for the checker, it is a no-op at runtime
    typed_control = cast("SnapserverProto", control)
    return typed_control
65
66
class SnapCastProvider(PlayerProvider):
    """Player provider that exposes Snapcast clients as Music Assistant players.

    Depending on the provider config it either starts and manages a built-in
    Snapserver process or connects to an external Snapserver.
    """

    # control connection to the Snapserver (created in handle_async_init)
    _snapserver: SnapserverProto
    # task running the built-in snapserver process (None for an external server)
    _snapserver_runner: asyncio.Task[None] | None
    # set once the built-in snapserver reported its startup (None for an external server)
    _snapserver_started: asyncio.Event | None
    # host and control port used for the control connection
    _snapcast_server_host: str
    _snapcast_server_control_port: int
    _ids_map: bidict[str, str]  # ma_id / snapclient_id
    # True when this provider runs its own snapserver process
    _use_builtin_server: bool
    # set by unload() so a disconnect does not trigger a provider reload
    _stop_called: bool
    # True while the control script is installed for the running built-in server
    _controlscript_available: bool
    # tracked MA streams, keyed by Snapcast stream name
    _snapcast_ma_streams: dict[str, SnapcastMAStream]
    # guards additions to / removals from _snapcast_ma_streams
    _snapcast_ma_streams_lock: asyncio.Lock
81
82    @property
83    def queue_control_available(self) -> bool:
84        """Return whether queue-based control scripts are available.
85
86        Indicates if the Snapcast control script has been successfully initialized
87        and can be used to control playback via a queue-specific control channel.
88        """
89        return (
90            self._use_builtin_server
91            and self._controlscript_available
92            and self._snapserver_started is not None
93            and self._snapserver_started.is_set()
94        )
95
96    async def handle_async_init(self) -> None:
97        """Handle async initialization of the provider."""
98        # set snapcast logging
99        logging.getLogger("snapcast").setLevel(self.logger.level)
100        self._use_builtin_server = not self.config.get_value(CONF_USE_EXTERNAL_SERVER)
101        self._stop_called = False
102        self._controlscript_available = False
103        if self._use_builtin_server:
104            if Path(DEFAULT_SNAPSERVER_CONFIG_FILE).exists():
105                self._snapcast_server_config_file = DEFAULT_SNAPSERVER_CONFIG_FILE
106            else:
107                # Fallback for dev environments without a Snapserver config file.
108                # If the file is missing, Snapserver silently ignores all command-line arguments.
109                self._snapcast_server_config_file = str(SHIPPED_SNAPSERVER_CONFIG_FILE)
110
111            self._snapcast_server_host = "127.0.0.1"
112            self._snapcast_server_control_port = DEFAULT_SNAPSERVER_PORT
113            self._snapcast_server_buffer_size = cast(
114                "int", self.config.get_value(CONF_SERVER_BUFFER_SIZE)
115            )
116            self._snapcast_server_chunk_ms = self.config.get_value(CONF_SERVER_CHUNK_MS)
117            self._snapcast_server_initial_volume = self.config.get_value(CONF_SERVER_INITIAL_VOLUME)
118            self._snapcast_server_send_to_muted = self.config.get_value(
119                CONF_SERVER_SEND_AUDIO_TO_MUTED
120            )
121            self._snapcast_server_transport_codec = self.config.get_value(
122                CONF_SERVER_TRANSPORT_CODEC
123            )
124        else:
125            self._snapcast_server_host = str(self.config.get_value(CONF_SERVER_HOST))
126            self._snapcast_server_control_port = int(
127                str(self.config.get_value(CONF_SERVER_CONTROL_PORT))
128            )
129        self._snapcast_stream_idle_threshold = self.config.get_value(CONF_STREAM_IDLE_THRESHOLD)
130        self._ids_map = bidict({})
131
132        self._snapcast_ma_streams = {}
133        self._snapcast_ma_streams_lock = asyncio.Lock()
134
135        if self._use_builtin_server:
136            await self._start_builtin_server()
137        else:
138            self._snapserver_runner = None
139            self._snapserver_started = None
140        try:
141            self._snapserver = await _create_cntrl_server(
142                self.mass.loop,
143                self._snapcast_server_host,
144                port=self._snapcast_server_control_port,
145                reconnect=True,
146            )
147            self._snapserver.set_on_update_callback(self._handle_update)
148            self.logger.info(
149                "Started connection to Snapserver %s",
150                f"{self._snapcast_server_host}:{self._snapcast_server_control_port}",
151            )
152            # register callback for when the connection gets lost to the snapserver
153            self._snapserver.set_on_disconnect_callback(self._handle_disconnect)
154
155        except OSError as err:
156            msg = "Unable to start the Snapserver connection ?"
157            raise SetupFailedError(msg) from err
158
159    async def loaded_in_mass(self) -> None:
160        """Call after the provider has been loaded."""
161        await super().loaded_in_mass()
162        # initial load of players
163        self._handle_update()
164
165    async def unload(self, is_removed: bool = False) -> None:
166        """Handle close/cleanup of the provider."""
167        self._stop_called = True
168
169        for snap_client in self._snapserver.clients:
170            player_id = self._get_ma_id(snap_client.identifier)
171            if not (player := self.mass.players.get(player_id, raise_unavailable=False)):
172                continue
173            if player.playback_state != PlaybackState.PLAYING:
174                continue
175            await player.stop()
176
177        for stream_name in list(self._snapcast_ma_streams):
178            await self.delete_ma_stream(stream_name)
179
180        self._snapserver.stop()
181        await self._stop_builtin_server()
182
183    async def _start_builtin_server(self) -> None:
184        """Start the built-in Snapserver."""
185        if self._use_builtin_server:
186            self._snapserver_started = asyncio.Event()
187            self._snapserver_runner = self.mass.create_task(self._builtin_server_runner())
188            await asyncio.wait_for(self._snapserver_started.wait(), 10)
189
190    async def _stop_builtin_server(self) -> None:
191        """Stop the built-in Snapserver."""
192        self.logger.info("Stopping, built-in Snapserver")
193        if self._snapserver_runner and not self._snapserver_runner.done():
194            self._snapserver_runner.cancel()
195
196    def _setup_controlscript(self) -> str | None:
197        """Copy control script to plugin directory (blocking I/O).
198
199        :return: plugin dir if successful, None otherwise.
200        """
201        logger = self.logger.getChild("snapserver")
202        if not CONTROL_SCRIPT.exists():
203            logger.warning("Control script does not exist: %s", CONTROL_SCRIPT)
204            return None
205
206        candidates = (
207            Path(DEFAULT_SNAPSERVER_PLUGIN_DIR),
208            # fallback directory for dev environments
209            Path(self.mass.storage_path) / "snapcast" / "plugins",
210        )
211        for plugin_dir in candidates:
212            control_dest = plugin_dir / "control.py"
213            try:
214                plugin_dir.mkdir(parents=True, exist_ok=True)
215                # Clean up existing file
216                control_dest.unlink(missing_ok=True)
217
218                # Copy the control script to the plugin directory
219                shutil.copy2(CONTROL_SCRIPT, control_dest)
220                # Ensure it's executable
221                control_dest.chmod(0o755)
222                logger.debug("Copied controlscript to: %s", control_dest)
223                return str(plugin_dir)
224            except (OSError, PermissionError) as err:
225                logger.debug("Could not copy controlscript to %s : %s", plugin_dir, err)
226        logger.warning("Could not copy controlscript (metadata/control disabled)")
227        return None
228
229    async def _builtin_server_runner(self) -> None:
230        """Start running the builtin snapserver."""
231        assert self._snapserver_started is not None  # for type checking
232        if self._snapserver_started.is_set():
233            raise RuntimeError("Snapserver is already started!")
234        logger = self.logger.getChild("snapserver")
235        logger.info("Starting builtin Snapserver...")
236        # register the snapcast mdns services
237        for name, port in (
238            ("-http", 1780),
239            ("-jsonrpc", 1705),
240            ("-stream", 1704),
241            ("-tcp", 1705),
242            ("", 1704),
243        ):
244            zeroconf_type = f"_snapcast{name}._tcp.local."
245            try:
246                info = AsyncServiceInfo(
247                    zeroconf_type,
248                    name=f"Snapcast.{zeroconf_type}",
249                    properties={"is_mass": "true"},
250                    addresses=[await get_ip_pton(str(self.mass.streams.publish_ip))],
251                    port=port,
252                    server=f"{socket.gethostname()}.local",
253                )
254                attr_name = f"zc_service_set{name}"
255                if getattr(self, attr_name, None):
256                    await self.mass.aiozc.async_update_service(info)
257                else:
258                    await self.mass.aiozc.async_register_service(info, strict=False)
259                setattr(self, attr_name, True)
260            except NonUniqueNameException:
261                self.logger.debug(
262                    "Could not register mdns record for %s as its already in use",
263                    zeroconf_type,
264                )
265            except Exception as err:
266                self.logger.exception(
267                    "Could not register mdns record for %s: %s", zeroconf_type, str(err)
268                )
269
270        args = [
271            "snapserver",
272            # config settings taken from
273            # https://raw.githubusercontent.com/badaix/snapcast/86cd4b2b63e750a72e0dfe6a46d47caf01426c8d/server/etc/snapserver.conf
274            f"--config={self._snapcast_server_config_file}",
275            f"--server.datadir={self.mass.storage_path}",
276            "--http.enabled=true",
277            "--http.port=1780",
278            f"--http.doc_root={SNAPWEB_DIR}",
279            "--tcp-control.enabled=true",
280            f"--tcp-control.port={self._snapcast_server_control_port}",
281            "--stream.sampleformat=48000:16:2",
282            f"--stream.buffer={self._snapcast_server_buffer_size}",
283            f"--stream.chunk_ms={self._snapcast_server_chunk_ms}",
284            f"--stream.codec={self._snapcast_server_transport_codec}",
285            f"--stream.send_to_muted={str(self._snapcast_server_send_to_muted).lower()}",
286            f"--streaming_client.initial_volume={self._snapcast_server_initial_volume}",
287        ]
288        loop = asyncio.get_running_loop()
289        plugin_dir = await loop.run_in_executor(None, self._setup_controlscript)
290        if plugin_dir is not None:
291            args.append(f"--stream.plugin_dir={plugin_dir}")
292            self._controlscript_available = True
293
294        started_handle: asyncio.Handle | None = None
295        async with AsyncProcess(args, stdout=True, name="snapserver") as snapserver_proc:
296            try:
297                # keep reading from stdout until exit
298                async for raw_data in snapserver_proc.iter_any():
299                    text = raw_data.decode().strip()
300                    for line in text.split("\n"):
301                        logger.debug(line)
302                        if "(Snapserver) Version 0." in line:
303                            # delay init a small bit to prevent race conditions
304                            # where we try to connect too soon
305                            if started_handle is None:
306                                started_handle = self.mass.loop.call_later(
307                                    2, self._snapserver_started.set
308                                )
309
310            except asyncio.CancelledError:
311                # Currently, MA doesn't guarantee a defined shutdown order;
312                # Make sure to close socket servers before
313                # shutting down the snapcast server.
314                #
315                # The snapserver doesn't always cleanup the control script processes
316                # properly. We do it explicitly when closing a socket server.
317                # Should be fixed on the server side, though.
318                for stream_name in list(self._snapcast_ma_streams):
319                    await self.delete_ma_stream(stream_name)
320                self._snapcast_ma_streams.clear()
321                raise
322
323            finally:
324                if started_handle is not None:
325                    started_handle.cancel()
326                if self._snapserver_started is not None:
327                    self._snapserver_started.clear()
328                self._controlscript_available = False
329
330    def _get_ma_id(self, snap_client_id: str) -> str:
331        search_dict = self._ids_map.inverse
332        ma_id = search_dict.get(snap_client_id)
333        assert ma_id is not None  # for type checking
334        return ma_id
335
336    def _get_snapclient_id(self, player_id: str) -> str:
337        search_dict = self._ids_map
338        snap_id = search_dict.get(player_id)
339        assert snap_id is not None  # for type checking
340        return snap_id
341
342    def _generate_and_register_id(self, snap_client_id: str) -> str:
343        search_dict = self._ids_map.inverse
344        if snap_client_id not in search_dict:
345            new_id = "ma_" + str(re.sub(r"\W+", "", snap_client_id))
346            self._ids_map[new_id] = snap_client_id
347            return new_id
348        return self._get_ma_id(snap_client_id)
349
350    def _handle_player_init(self, snap_client: SnapclientProto) -> SnapCastPlayer:
351        """Process Snapcast add to Player controller."""
352        player_id = self._generate_and_register_id(snap_client.identifier)
353        player = self.mass.players.get(player_id, raise_unavailable=False)
354        if not player:
355            snap_client = self._snapserver.client(self._get_snapclient_id(player_id))
356            player = SnapCastPlayer(
357                provider=self,
358                player_id=player_id,
359                snap_client=snap_client,
360            )
361            player.setup()
362        else:
363            player = cast("SnapCastPlayer", player)  # for type checking
364        asyncio.run_coroutine_threadsafe(
365            self.mass.players.register_or_update(player), loop=self.mass.loop
366        )
367        return player
368
369    def _handle_update(self) -> None:
370        """Process Snapcast init Player/Group and set callback ."""
371        for snap_client in self._snapserver.clients:
372            if not snap_client.identifier:
373                self.logger.warning(
374                    "Detected Snapclient %s without identifier, skipping", snap_client.friendly_name
375                )
376                continue
377            if ma_player := self._handle_player_init(snap_client):
378                snap_client.set_callback(ma_player._handle_player_update)
379        for snap_client in self._snapserver.clients:
380            if player := self.get_snap_player(client_id=snap_client.identifier):
381                snap_client.set_callback(player._handle_player_update)
382        self._update_group_callbacks()
383
384    def poke_group_members(self, snap_group: SnapgroupProto) -> None:
385        """Process Snapcast group callback."""
386        for snap_client_id in snap_group.clients:
387            if ma_player := self.get_snap_player(client_id=snap_client_id):
388                ma_player.poke_player_update()
389
390    def _handle_disconnect(self, exc: Exception) -> None:
391        """Handle disconnect callback from snapserver."""
392        if self._stop_called or self.mass.closing:
393            # prevent auto-reconnecting of snapcast controller
394            self._snapserver.stop()
395            # we're instructed to stop/exit, so no need to restart the connection
396            return
397        self.logger.info(
398            "Connection to SnapServer lost, reason: %s. Reloading provider in 5 seconds.",
399            str(exc),
400        )
401        # schedule a reload of the provider
402        self.mass.call_later(5, self.mass.load_provider, self.instance_id, allow_retry=True)
403
404    async def remove_player(self, player_id: str) -> None:
405        """Remove the client from the snapserver when it is deleted."""
406        success, error_msg = await self._snapserver.delete_client(
407            self._get_snapclient_id(player_id)
408        )
409        if success:
410            self.logger.debug("Snapclient removed %s", player_id)
411        else:
412            self.logger.warning("Unable to remove snapclient %s: %s", player_id, error_msg)
413
414    def _update_group_callbacks(self, poke: bool = False) -> None:
415        for grp in self._snapserver.groups:
416            grp.set_callback(self.poke_group_members)
417            if poke:
418                self.poke_group_members(grp)
419
420    async def ensure_player_owned_group(
421        self, ma_player_id: str, set_stream_id: str | None = None
422    ) -> SnapgroupProto | None:
423        """Ensure a Snapcast group is owned by the given player.
424
425        This method guarantees that the returned Snapcast group is *owned* by the
426        specified Music Assistant player, meaning the group name equals the
427        player's ID and the player is the group leader.
428
429        Behavior:
430        - If the player is already the leader of its current group, that group is
431        returned unchanged.
432        - If the player is a member of another group (but not the leader), the
433        player is removed from that group, which causes Snapcast to create a new
434        single-client group for the player.
435        - The resulting group is renamed to the player's ID.
436
437        If `set_stream_id` is provided and a new group is created, the group's
438        stream is updated accordingly.
439
440        Args:
441            ma_player_id: Music Assistant player ID.
442            set_stream_id: Optional Snapcast stream ID to assign to the player's group.
443
444        Returns:
445            The Snapcast group owned by the player, or ``None`` if the player is not
446            currently part of any group.
447        """
448        player_client = self.get_snap_client(player_id=ma_player_id)
449        if player_client is None:
450            return None
451
452        curr_group = player_client.group
453
454        if curr_group is None:
455            return None
456
457        if curr_group.name == ma_player_id:
458            return curr_group
459
460        group_members = list(curr_group.clients)
461        if len(group_members) > 1 and curr_group.name:
462            # player is member of other player group, remove it, which results in a new group
463            group_members.remove(player_client.identifier)
464            res = await self._snapserver.group_clients(curr_group.identifier, group_members)
465            if not (isinstance(res, dict) and "server" in res):
466                raise RuntimeError("Couldn't remove client from group")
467            self._snapserver.synchronize(res)
468            curr_group = player_client.group
469            if curr_group is None:
470                return None
471            if set_stream_id:
472                await curr_group.set_stream(set_stream_id)
473
474        await curr_group.set_name(ma_player_id)
475        return curr_group
476
    async def isolate_player_to_dedicated_group(
        self,
        target_player_id: str,
        target_stream_id: str | None = None,
        others_stream_id: str | None = "default",
    ) -> None:
        """Isolate a player into a dedicated Snapcast group.

        Ensures that the target player ends up in a group where it is the sole
        member and group leader.

        Behavior:
        - The target player is first ensured to own its group.
        - All other members of that group are removed.
        - Each removed player is placed into its own dedicated group.
        - Removed players' groups are optionally assigned `others_stream_id`.
        - The target group is optionally assigned `target_stream_id`.

        Callbacks of the target group and of the other member clients are cleared
        before the regrouping. The client callbacks are set again afterwards for
        members that have an MA player; the group callback is not restored here.

        Args:
            target_player_id: Music Assistant player ID to isolate.
            target_stream_id: Optional stream ID to assign to the target player's group.
            others_stream_id: Stream ID assigned to newly created groups for removed players.

        Raises:
            RuntimeError: If the snapserver does not confirm the regrouping.
        """
        this_client_id = self._get_snapclient_id(target_player_id)
        target_group = await self.ensure_player_owned_group(
            target_player_id, set_stream_id=target_stream_id
        )

        if target_group is None:
            # the player has no Snapcast client or group, nothing to isolate
            return

        # silence callbacks of the group and its other members while regrouping
        # NOTE(review): the group callback is not re-attached in this method;
        # presumably _update_group_callbacks does that on the next update - confirm.
        target_group.set_callback(None)
        group_members = list(target_group.clients)
        # from here on group_members only holds the *other* clients of the group
        group_members.remove(this_client_id)
        for client_id in group_members:
            client = self._snapserver.client(client_id)
            client.set_callback(None)
        if group_members:
            # reduce the group to the target client only
            res = await self._snapserver.group_clients(target_group.identifier, [this_client_id])
            if not (isinstance(res, dict) and "server" in res):
                raise RuntimeError("Couldn't remove client from group")
            self._snapserver.synchronize(res)
            for client_id in group_members:
                ma_player_id = self._get_ma_id(client_id)
                if ma_player := cast("SnapCastPlayer", self.mass.players.get(ma_player_id)):
                    client = self._snapserver.client(client_id)
                    if client is not None:
                        if client.group is not None:
                            # name the removed client's group after its own player
                            await client.group.set_name(ma_player_id)
                            if others_stream_id:
                                await client.group.set_stream(others_stream_id)
                        # re-enable the player update callback cleared above
                        client.set_callback(ma_player._handle_player_update)

        if target_stream_id is not None:
            await target_group.set_stream(target_stream_id)
535
536    async def get_snapcast_media_stream(
537        self,
538        media: PlayerMedia,
539        filter_settings_owner: str | None = None,
540        existing_only: bool = False,
541    ) -> SnapcastMAStream | None:
542        """Get or create a Snapcast Music Assistant stream for the given media.
543
544        Determines a deterministic Snapcast stream name based on the media type
545        and source, and either returns an existing stream or creates a new one.
546
547        Behavior:
548        - Announcement and generic media streams use a hashed name.
549        - Plugin and queue-backed sources reuse a stable stream name.
550        - Queue-backed streams may persist across playback sessions.
551        - If `existing_only` is True, no new stream will be created.
552
553        Newly created streams are registered with the Snapcast server and fully
554        set up before being returned.
555
556        Args:
557            media: Media item to stream.
558            filter_settings_owner: Optional player/entity ID used to resolve DSP filters.
559            existing_only: If True, only return an existing stream.
560
561        Returns:
562            A ``SnapcastMAStream`` instance, or ``None`` if no stream exists and
563            `existing_only` is True.
564        """
565        stream_name: str = ""
566        name_suffix: str = ""
567        queue_id: str | None = None
568        source_id: str | None = None
569        destroy_on_stop = True
570
571        if media.media_type == MediaType.ANNOUNCEMENT:
572            stream_name += hashlib.md5(media.uri.encode()).hexdigest()[:6]
573            name_suffix = MASS_ANNOUNCEMENT_POSTFIX
574        elif media.media_type == MediaType.PLUGIN_SOURCE:
575            custom_data = media.custom_data or {}
576            plugin: str = media.title or custom_data.get("provider") or ""
577            player: str = f" {custom_data.get('player_id', '')}"
578            stream_name += f"{plugin} {player}"
579            source_id = custom_data.get("source_id")
580        elif media.source_id and media.source_id.startswith(UGP_PREFIX):
581            stream_name += media.source_id
582        elif media.source_id and media.queue_item_id:
583            stream_name += media.source_id
584            queue_id = media.source_id
585            source_id = media.source_id
586            destroy_on_stop = False
587        else:
588            stream_name += hashlib.md5(media.uri.encode()).hexdigest()[:6]
589
590        stream_name = create_safe_string(stream_name, lowercase=False)
591        stream_name = f"{MASS_STREAM_PREFIX}{stream_name}{name_suffix}"
592        async with self._snapcast_ma_streams_lock:
593            if not (stream := self._snapcast_ma_streams.get(stream_name)):
594                if existing_only:
595                    return None
596
597                stream = SnapcastMAStream(
598                    provider=self,
599                    media=media,
600                    stream_name=stream_name,
601                    filter_settings_owner=filter_settings_owner,
602                    source_id=source_id,
603                    use_cntrl_script=bool(queue_id) and self.queue_control_available,
604                    destroy_on_stop=destroy_on_stop,
605                )
606                self._snapcast_ma_streams[stream_name] = stream
607            else:
608                stream.update_media(media)
609        await stream.setup()
610        return stream
611
612    def get_snap_ma_stream(self, stream_name: str) -> SnapcastMAStream | None:
613        """Return an existing Music Assistant Snapcast stream by name.
614
615        Args:
616            stream_name: Snapcast stream name.
617
618        Returns:
619            The corresponding ``SnapcastMAStream`` instance, or ``None`` if not found.
620        """
621        return self._snapcast_ma_streams.get(stream_name)
622
623    async def delete_ma_stream(self, stream_name: str) -> None:
624        """Remove and destroy a Music Assistant Snapcast stream.
625
626        The stream is removed from internal tracking and its resources are
627        destroyed asynchronously. Errors during destruction are logged but
628        otherwise ignored.
629
630        Args:
631            stream_name: Snapcast stream name to delete.
632        """
633        async with self._snapcast_ma_streams_lock:
634            stream = self._snapcast_ma_streams.pop(stream_name, None)
635
636        if not stream:
637            return
638
639        try:
640            await stream.destroy()
641        except Exception:
642            self.logger.exception("Failed to destroy stream session %s", stream_name)
643
644    def update_stream_usage(self) -> None:
645        """Update usage state for all tracked Snapcast streams.
646
647        Marks streams as "in use" if they are currently assigned to any Snapcast
648        group, and schedules unused streams for delayed shutdown.
649
650        This method should be called whenever group or stream assignments change
651        on the Snapcast server.
652        """
653        unused_streams = set(self._snapcast_ma_streams.keys())
654        for grp in self._snapserver.groups:
655            stream_id = grp.stream
656            if stream_id in self._snapcast_ma_streams:
657                ma_stream = self._snapcast_ma_streams[stream_id]
658                ma_stream.set_in_use(True)
659                unused_streams.discard(stream_id)
660
661            if not unused_streams:
662                break
663
664        for stream_id in unused_streams:
665            self._snapcast_ma_streams[stream_id].set_in_use(False)
666
667    def get_snap_client(
668        self, *, client_id: str | None = None, player_id: str | None = None
669    ) -> SnapclientProto | None:
670        """Return the snapclient for either given client_id or player_id."""
671        if player_id is not None:
672            if client_id is not None and client_id != self._get_snapclient_id(client_id):
673                raise ValueError("provided client_id and player_id do not match")
674            client_id = self._get_snapclient_id(player_id)
675
676        if client_id:
677            with suppress(KeyError):
678                return self._snapserver.client(client_id)
679
680        return None
681
682    def get_snap_player(
683        self, *, client_id: str | None = None, player_id: str | None = None
684    ) -> SnapCastPlayer | None:
685        """Return the MA SnapCastPlayer for either given client_id or player_id."""
686        if client_id is not None:
687            if player_id is not None and player_id != self._get_ma_id(client_id):
688                raise ValueError("provided client_id and player_id do not match")
689            player_id = self._get_ma_id(client_id)
690
691        if player_id is None:
692            return None
693
694        if ma_player := self.mass.players.get(player_id):
695            assert isinstance(ma_player, SnapCastPlayer)  # for type checking
696            return ma_player
697
698        return None
699