/
/
/
1"""SnapCastProvider."""
2
3from __future__ import annotations
4
5import asyncio
6import hashlib
7import logging
8import re
9import shutil
10import socket
11from contextlib import suppress
12from pathlib import Path
13from typing import TYPE_CHECKING, cast
14
15from bidict import bidict
16from music_assistant_models.enums import MediaType, PlaybackState
17from music_assistant_models.errors import SetupFailedError
18from snapcast.control.server import CONTROL_PORT, Snapserver
19from zeroconf import NonUniqueNameException
20from zeroconf.asyncio import AsyncServiceInfo
21
22from music_assistant.helpers.compare import create_safe_string
23from music_assistant.helpers.process import AsyncProcess
24from music_assistant.helpers.util import get_ip_pton
25from music_assistant.models.player_provider import PlayerProvider
26from music_assistant.providers.snapcast.constants import (
27 CONF_SERVER_BUFFER_SIZE,
28 CONF_SERVER_CHUNK_MS,
29 CONF_SERVER_CONTROL_PORT,
30 CONF_SERVER_HOST,
31 CONF_SERVER_INITIAL_VOLUME,
32 CONF_SERVER_SEND_AUDIO_TO_MUTED,
33 CONF_SERVER_TRANSPORT_CODEC,
34 CONF_STREAM_IDLE_THRESHOLD,
35 CONF_USE_EXTERNAL_SERVER,
36 CONTROL_SCRIPT,
37 DEFAULT_SNAPSERVER_PORT,
38 MASS_ANNOUNCEMENT_POSTFIX,
39 MASS_STREAM_PREFIX,
40 SNAPWEB_DIR,
41)
42from music_assistant.providers.snapcast.ma_stream import SnapcastMAStream
43from music_assistant.providers.snapcast.player import SnapCastPlayer
44from music_assistant.providers.universal_group.constants import UGP_PREFIX
45
46if TYPE_CHECKING:
47 from music_assistant_models.player import PlayerMedia
48
49 from .snap_cntrl_proto import SnapclientProto, SnapgroupProto, SnapserverProto
50
51
async def _create_cntrl_server(
    loop: asyncio.AbstractEventLoop,
    host: str,
    port: int = CONTROL_PORT,
    reconnect: bool = False,
) -> SnapserverProto:
    """Build a ``Snapserver`` control connection and start it.

    :param loop: Event loop handed to the ``Snapserver`` instance.
    :param host: Host of the Snapserver control interface.
    :param port: Control port, defaults to ``CONTROL_PORT``.
    :param reconnect: Forwarded to ``Snapserver`` as its reconnect flag.
    :return: The started server, typed as ``SnapserverProto``.
    """
    control_server = Snapserver(loop, host, port, reconnect)
    await control_server.start()
    return cast("SnapserverProto", control_server)
62
63
class SnapCastProvider(PlayerProvider):
    """Player provider that exposes Snapcast clients as Music Assistant players.

    The provider talks to a Snapserver over its control connection. Depending
    on ``CONF_USE_EXTERNAL_SERVER`` that server is either an external one or a
    built-in ``snapserver`` process started by this provider.
    """

    # control connection to the Snapserver, created in handle_async_init
    _snapserver: SnapserverProto
    # task running _builtin_server_runner; None when an external server is used
    _snapserver_runner: asyncio.Task[None] | None
    # set (with a short delay) once the built-in server printed its version banner
    _snapserver_started: asyncio.Event | None
    _snapcast_server_host: str
    _snapcast_server_control_port: int
    _ids_map: bidict[str, str]  # ma_id / snapclient_id
    _use_builtin_server: bool
    # set in unload(); checked by _handle_disconnect to suppress the provider reload
    _stop_called: bool
    # result of _setup_controlscript(); exposed through use_queue_control
    _controlscript_available: bool
    # tracked MA streams keyed by Snapcast stream name
    _snapcast_ma_streams: dict[str, SnapcastMAStream]
    # guards lookups/creation/removal in _snapcast_ma_streams
    _snapcast_ma_streams_lock: asyncio.Lock
78
@property
def use_queue_control(self) -> bool:
    """Tell whether the Snapcast control script is available.

    Mirrors ``_controlscript_available``: the flag is reset during provider
    init and later set to the result of ``_setup_controlscript``. Queue-backed
    streams only use the control script when this is true.
    """
    script_ready = self._controlscript_available
    return script_ready
87
async def handle_async_init(self) -> None:
    """Read the provider configuration and connect to the Snapserver.

    With the built-in server the control connection targets ``127.0.0.1`` on
    ``DEFAULT_SNAPSERVER_PORT`` and the server process is started first;
    otherwise host and control port come from the configuration.

    :raises SetupFailedError: when opening the control connection fails with ``OSError``.
    """
    read_conf = self.config.get_value
    # let the snapcast library log at the same level as this provider
    logging.getLogger("snapcast").setLevel(self.logger.level)
    self._use_builtin_server = not read_conf(CONF_USE_EXTERNAL_SERVER)
    self._stop_called = False
    self._controlscript_available = False
    if not self._use_builtin_server:
        self._snapcast_server_host = str(read_conf(CONF_SERVER_HOST))
        self._snapcast_server_control_port = int(str(read_conf(CONF_SERVER_CONTROL_PORT)))
    else:
        self._snapcast_server_host = "127.0.0.1"
        self._snapcast_server_control_port = DEFAULT_SNAPSERVER_PORT
        self._snapcast_server_buffer_size = cast("int", read_conf(CONF_SERVER_BUFFER_SIZE))
        self._snapcast_server_chunk_ms = read_conf(CONF_SERVER_CHUNK_MS)
        self._snapcast_server_initial_volume = read_conf(CONF_SERVER_INITIAL_VOLUME)
        self._snapcast_server_send_to_muted = read_conf(CONF_SERVER_SEND_AUDIO_TO_MUTED)
        self._snapcast_server_transport_codec = read_conf(CONF_SERVER_TRANSPORT_CODEC)
    self._snapcast_stream_idle_threshold = read_conf(CONF_STREAM_IDLE_THRESHOLD)
    self._ids_map = bidict({})

    self._snapcast_ma_streams = {}
    self._snapcast_ma_streams_lock = asyncio.Lock()

    if not self._use_builtin_server:
        # no local process to supervise
        self._snapserver_runner = None
        self._snapserver_started = None
    else:
        await self._start_builtin_server()

    try:
        self._snapserver = await _create_cntrl_server(
            self.mass.loop,
            self._snapcast_server_host,
            port=self._snapcast_server_control_port,
            reconnect=True,
        )
        self._snapserver.set_on_update_callback(self._handle_update)
        server_address = f"{self._snapcast_server_host}:{self._snapcast_server_control_port}"
        self.logger.info("Started connection to Snapserver %s", server_address)
        # get notified when the connection to the snapserver is lost
        self._snapserver.set_on_disconnect_callback(self._handle_disconnect)
    except OSError as err:
        raise SetupFailedError("Unable to start the Snapserver connection ?") from err
143
async def loaded_in_mass(self) -> None:
    """Finish loading: run the base-class hook, then sync players once."""
    await super().loaded_in_mass()
    # register players for the clients the server already reports
    self._handle_update()
149
async def unload(self, is_removed: bool = False) -> None:
    """Handle close/cleanup of the provider.

    Stops every playing Snapcast player, destroys all tracked MA streams,
    closes the control connection and finally stops the built-in server.

    :param is_removed: Not used by this implementation.
    """
    self._stop_called = True

    for snap_client in self._snapserver.clients:
        # Clients that were never registered (e.g. those without an identifier,
        # which _handle_update skips) have no MA id. Skip them instead of letting
        # the lookup assertion abort the remaining cleanup.
        player_id = self._ids_map.inverse.get(snap_client.identifier)
        if player_id is None:
            continue
        if not (player := self.mass.players.get(player_id, raise_unavailable=False)):
            continue
        if player.playback_state != PlaybackState.PLAYING:
            continue
        await player.stop()

    # iterate over a snapshot: delete_ma_stream mutates the dict
    for stream_name in list(self._snapcast_ma_streams):
        await self.delete_ma_stream(stream_name)

    self._snapserver.stop()
    await self._stop_builtin_server()
167
168 async def _start_builtin_server(self) -> None:
169 """Start the built-in Snapserver."""
170 if self._use_builtin_server:
171 self._snapserver_started = asyncio.Event()
172 self._snapserver_runner = self.mass.create_task(self._builtin_server_runner())
173 await asyncio.wait_for(self._snapserver_started.wait(), 10)
174
175 async def _stop_builtin_server(self) -> None:
176 """Stop the built-in Snapserver."""
177 self.logger.info("Stopping, built-in Snapserver")
178 if self._snapserver_runner and not self._snapserver_runner.done():
179 self._snapserver_runner.cancel()
180 if self._snapserver_started is not None:
181 self._snapserver_started.clear()
182
def _setup_controlscript(self) -> bool:
    """Install the control script into the snapserver plug-in directory (blocking I/O).

    Any previously installed copy is removed first, also when the source
    script turns out to be missing.

    :return: True if the script was copied and made executable, False otherwise.
    """
    log = self.logger.getChild("snapserver")
    target_dir = Path("/usr/share/snapserver/plug-ins")
    target_file = target_dir / "control.py"
    try:
        target_dir.mkdir(parents=True, exist_ok=True)
        # drop a stale copy before installing the current one
        target_file.unlink(missing_ok=True)
        if CONTROL_SCRIPT.exists():
            shutil.copy2(CONTROL_SCRIPT, target_file)
            # snapserver has to be able to execute the script
            target_file.chmod(0o755)
            log.debug("Copied controlscript to: %s", target_file)
            return True
        log.warning("Control script does not exist: %s", CONTROL_SCRIPT)
        return False
    except OSError as err:  # PermissionError is a subclass of OSError
        log.warning("Could not copy controlscript (metadata/control disabled): %s", err)
        return False
210
async def _builtin_server_runner(self) -> None:
    """Run the built-in snapserver process and relay its output.

    Steps, in order:
    - announce the Snapcast mDNS services (failures are logged, never fatal);
    - spawn ``snapserver`` with the configured server/stream options;
    - forward its output to the ``snapserver`` child logger and, once the
      version banner is seen, schedule the started-event and install the
      control script.

    On cancellation all tracked MA streams are destroyed before the
    cancellation is re-raised.

    :raises RuntimeError: if the started-event is already set.
    """
    assert self._snapserver_started is not None  # for type checking
    if self._snapserver_started.is_set():
        raise RuntimeError("Snapserver is already started!")
    logger = self.logger.getChild("snapserver")
    logger.info("Starting builtin Snapserver...")
    # register the snapcast mdns services
    for name, port in (
        ("-http", 1780),
        ("-jsonrpc", 1705),
        ("-stream", 1704),
        ("-tcp", 1705),
        ("", 1704),
    ):
        zeroconf_type = f"_snapcast{name}._tcp.local."
        try:
            info = AsyncServiceInfo(
                zeroconf_type,
                name=f"Snapcast.{zeroconf_type}",
                properties={"is_mass": "true"},
                addresses=[await get_ip_pton(str(self.mass.streams.publish_ip))],
                port=port,
                server=f"{socket.gethostname()}.local",
            )
            # remember per service whether it was registered before, so a
            # later run updates the record instead of registering it again
            attr_name = f"zc_service_set{name}"
            if getattr(self, attr_name, None):
                await self.mass.aiozc.async_update_service(info)
            else:
                await self.mass.aiozc.async_register_service(info, strict=False)
            setattr(self, attr_name, True)
        except NonUniqueNameException:
            self.logger.debug(
                "Could not register mdns record for %s as its already in use",
                zeroconf_type,
            )
        except Exception as err:
            # best effort: a failed announcement must not prevent the server start
            self.logger.exception(
                "Could not register mdns record for %s: %s", zeroconf_type, str(err)
            )

    args = [
        "snapserver",
        # config settings taken from
        # https://raw.githubusercontent.com/badaix/snapcast/86cd4b2b63e750a72e0dfe6a46d47caf01426c8d/server/etc/snapserver.conf
        f"--server.datadir={self.mass.storage_path}",
        "--http.enabled=true",
        "--http.port=1780",
        f"--http.doc_root={SNAPWEB_DIR}",
        "--tcp.enabled=true",
        f"--tcp.port={self._snapcast_server_control_port}",
        "--stream.sampleformat=48000:16:2",
        f"--stream.buffer={self._snapcast_server_buffer_size}",
        f"--stream.chunk_ms={self._snapcast_server_chunk_ms}",
        f"--stream.codec={self._snapcast_server_transport_codec}",
        f"--stream.send_to_muted={str(self._snapcast_server_send_to_muted).lower()}",
        f"--streaming_client.initial_volume={self._snapcast_server_initial_volume}",
    ]
    async with AsyncProcess(args, stdout=True, name="snapserver") as snapserver_proc:
        try:
            # keep reading from stdout until exit
            async for raw_data in snapserver_proc.iter_any():
                # Decode leniently: with the default strict handler a single
                # invalid byte sequence in the server output would raise
                # UnicodeDecodeError and end this reader loop.
                text = raw_data.decode(errors="replace").strip()
                for line in text.split("\n"):
                    logger.debug(line)
                    if "(Snapserver) Version 0." in line:
                        # delay init a small bit to prevent race conditions
                        # where we try to connect too soon
                        self.mass.loop.call_later(2, self._snapserver_started.set)
                        # Copy control script after snapserver starts
                        # (run in executor to avoid blocking)
                        loop = asyncio.get_running_loop()
                        self._controlscript_available = await loop.run_in_executor(
                            None, self._setup_controlscript
                        )
        except asyncio.CancelledError:
            # Currently, MA doesn't guarantee a defined shutdown order;
            # Make sure to close socket servers before
            # shutting down the snapcast server.
            #
            # The snapserver doesn't always cleanup the control script processes
            # properly. We do it explicitly when closing a socket server.
            # Should be fixed on the server side, though.
            for stream_name in list(self._snapcast_ma_streams):
                await self.delete_ma_stream(stream_name)
            self._snapcast_ma_streams.clear()
            raise
298
299 def _get_ma_id(self, snap_client_id: str) -> str:
300 search_dict = self._ids_map.inverse
301 ma_id = search_dict.get(snap_client_id)
302 assert ma_id is not None # for type checking
303 return ma_id
304
305 def _get_snapclient_id(self, player_id: str) -> str:
306 search_dict = self._ids_map
307 snap_id = search_dict.get(player_id)
308 assert snap_id is not None # for type checking
309 return snap_id
310
311 def _generate_and_register_id(self, snap_client_id: str) -> str:
312 search_dict = self._ids_map.inverse
313 if snap_client_id not in search_dict:
314 new_id = "ma_" + str(re.sub(r"\W+", "", snap_client_id))
315 self._ids_map[new_id] = snap_client_id
316 return new_id
317 return self._get_ma_id(snap_client_id)
318
319 def _handle_player_init(self, snap_client: SnapclientProto) -> SnapCastPlayer:
320 """Process Snapcast add to Player controller."""
321 player_id = self._generate_and_register_id(snap_client.identifier)
322 player = self.mass.players.get(player_id, raise_unavailable=False)
323 if not player:
324 snap_client = self._snapserver.client(self._get_snapclient_id(player_id))
325 player = SnapCastPlayer(
326 provider=self,
327 player_id=player_id,
328 snap_client=snap_client,
329 )
330 player.setup()
331 else:
332 player = cast("SnapCastPlayer", player) # for type checking
333 asyncio.run_coroutine_threadsafe(
334 self.mass.players.register_or_update(player), loop=self.mass.loop
335 )
336 return player
337
338 def _handle_update(self) -> None:
339 """Process Snapcast init Player/Group and set callback ."""
340 for snap_client in self._snapserver.clients:
341 if not snap_client.identifier:
342 self.logger.warning(
343 "Detected Snapclient %s without identifier, skipping", snap_client.friendly_name
344 )
345 continue
346 if ma_player := self._handle_player_init(snap_client):
347 snap_client.set_callback(ma_player._handle_player_update)
348 for snap_client in self._snapserver.clients:
349 if player := self.get_snap_player(client_id=snap_client.identifier):
350 snap_client.set_callback(player._handle_player_update)
351 self._update_group_callbacks()
352
353 def poke_group_members(self, snap_group: SnapgroupProto) -> None:
354 """Process Snapcast group callback."""
355 for snap_client_id in snap_group.clients:
356 if ma_player := self.get_snap_player(client_id=snap_client_id):
357 ma_player.poke_player_update()
358
359 def _handle_disconnect(self, exc: Exception) -> None:
360 """Handle disconnect callback from snapserver."""
361 if self._stop_called or self.mass.closing:
362 # prevent auto-reconnecting of snapcast controller
363 self._snapserver.stop()
364 # we're instructed to stop/exit, so no need to restart the connection
365 return
366 self.logger.info(
367 "Connection to SnapServer lost, reason: %s. Reloading provider in 5 seconds.",
368 str(exc),
369 )
370 # schedule a reload of the provider
371 self.mass.call_later(5, self.mass.load_provider, self.instance_id, allow_retry=True)
372
async def remove_player(self, player_id: str) -> None:
    """Delete the Snapcast client behind a removed player from the snapserver.

    The outcome is only logged; a failed removal does not raise.
    """
    snap_id = self._get_snapclient_id(player_id)
    success, error_msg = await self._snapserver.delete_client(snap_id)
    if not success:
        self.logger.warning("Unable to remove snapclient %s: %s", player_id, error_msg)
        return
    self.logger.debug("Snapclient removed %s", player_id)
382
383 def _update_group_callbacks(self, poke: bool = False) -> None:
384 for grp in self._snapserver.groups:
385 grp.set_callback(self.poke_group_members)
386 if poke:
387 self.poke_group_members(grp)
388
389 async def ensure_player_owned_group(
390 self, ma_player_id: str, set_stream_id: str | None = None
391 ) -> SnapgroupProto | None:
392 """Ensure a Snapcast group is owned by the given player.
393
394 This method guarantees that the returned Snapcast group is *owned* by the
395 specified Music Assistant player, meaning the group name equals the
396 player's ID and the player is the group leader.
397
398 Behavior:
399 - If the player is already the leader of its current group, that group is
400 returned unchanged.
401 - If the player is a member of another group (but not the leader), the
402 player is removed from that group, which causes Snapcast to create a new
403 single-client group for the player.
404 - The resulting group is renamed to the player's ID.
405
406 If `set_stream_id` is provided and a new group is created, the group's
407 stream is updated accordingly.
408
409 Args:
410 ma_player_id: Music Assistant player ID.
411 set_stream_id: Optional Snapcast stream ID to assign to the player's group.
412
413 Returns:
414 The Snapcast group owned by the player, or ``None`` if the player is not
415 currently part of any group.
416 """
417 player_client = self.get_snap_client(player_id=ma_player_id)
418 if player_client is None:
419 return None
420
421 curr_group = player_client.group
422
423 if curr_group is None:
424 return None
425
426 if curr_group.name == ma_player_id:
427 return curr_group
428
429 group_members = list(curr_group.clients)
430 if len(group_members) > 1 and curr_group.name:
431 # player is member of other player group, remove it, which results in a new group
432 group_members.remove(player_client.identifier)
433 res = await self._snapserver.group_clients(curr_group.identifier, group_members)
434 if not (isinstance(res, dict) and "server" in res):
435 raise RuntimeError("Couldn't remove client from group")
436 self._snapserver.synchronize(res)
437 curr_group = player_client.group
438 if curr_group is None:
439 return None
440 if set_stream_id:
441 await curr_group.set_stream(set_stream_id)
442
443 await curr_group.set_name(ma_player_id)
444 return curr_group
445
async def isolate_player_to_dedicated_group(
    self,
    target_player_id: str,
    target_stream_id: str | None = None,
    others_stream_id: str | None = "default",
) -> None:
    """Isolate a player into a dedicated Snapcast group.

    Ensures that the target player ends up in a group where it is the sole
    member and group leader.

    Behavior:
    - The target player is first ensured to own its group.
    - All other members of that group are removed.
    - Each removed player is placed into its own dedicated group.
    - Removed players' groups are optionally assigned `others_stream_id`.
    - The target group is optionally assigned `target_stream_id`.

    Callbacks for affected clients and groups are temporarily disabled during
    the operation to avoid intermediate state updates.

    Args:
        target_player_id: Music Assistant player ID to isolate.
        target_stream_id: Optional stream ID to assign to the target player's group.
        others_stream_id: Stream ID assigned to newly created groups for removed players.

    Raises:
        RuntimeError: If the server response to the regrouping request is not
            a dict containing a ``"server"`` entry.
    """
    this_client_id = self._get_snapclient_id(target_player_id)
    target_group = await self.ensure_player_owned_group(
        target_player_id, set_stream_id=target_stream_id
    )

    # nothing to isolate when the player has no client/group
    if target_group is None:
        return

    # NOTE(review): the target group's callback is cleared here and not
    # re-installed within this method; presumably a later
    # `_update_group_callbacks()` restores it. Confirm.
    target_group.set_callback(None)
    group_members = list(target_group.clients)
    group_members.remove(this_client_id)
    # mute the callbacks of the clients that are about to be moved
    for client_id in group_members:
        client = self._snapserver.client(client_id)
        client.set_callback(None)
    if group_members:
        # shrink the group to the target client only
        res = await self._snapserver.group_clients(target_group.identifier, [this_client_id])
        if not (isinstance(res, dict) and "server" in res):
            raise RuntimeError("Couldn't remove client from group")
        # apply the returned server state to the local model
        self._snapserver.synchronize(res)
        # NOTE(review): the nesting of the statements below was reconstructed
        # from a whitespace-stripped source. Confirm that the callback is meant
        # to be restored for every existing client, not only for grouped ones.
        for client_id in group_members:
            ma_player_id = self._get_ma_id(client_id)
            if ma_player := cast("SnapCastPlayer", self.mass.players.get(ma_player_id)):
                client = self._snapserver.client(client_id)
                if client is not None:
                    if client.group is not None:
                        # the removed client now owns the group it landed in
                        await client.group.set_name(ma_player_id)
                        if others_stream_id:
                            await client.group.set_stream(others_stream_id)
                    client.set_callback(ma_player._handle_player_update)

    if target_stream_id is not None:
        await target_group.set_stream(target_stream_id)
504
async def get_snapcast_media_stream(
    self,
    media: PlayerMedia,
    filter_settings_owner: str | None = None,
    existing_only: bool = False,
) -> SnapcastMAStream | None:
    """Return the Snapcast MA stream for the given media, creating it if needed.

    The stream name is derived deterministically from the media:
    - announcements: first 6 hex chars of the MD5 of the URI, plus the
      announcement postfix;
    - plugin sources: title (or the ``provider`` custom-data entry) and the
      ``player_id`` custom-data entry;
    - sources starting with ``UGP_PREFIX``: the source id;
    - queue items (source id and queue item id present): the source id;
      such streams are created with ``destroy_on_stop=False``;
    - anything else: first 6 hex chars of the MD5 of the URI.

    The name is passed through ``create_safe_string`` and prefixed with
    ``MASS_STREAM_PREFIX``. An existing stream gets its media updated; in
    both cases ``setup()`` is awaited before the stream is returned.

    Args:
        media: Media item to stream.
        filter_settings_owner: Optional player/entity ID used to resolve DSP filters.
        existing_only: If True, never create a stream.

    Returns:
        A ``SnapcastMAStream`` instance, or ``None`` if no stream exists and
        `existing_only` is True.
    """

    def uri_digest() -> str:
        # short, stable name for media without a reusable source
        return hashlib.md5(media.uri.encode()).hexdigest()[:6]

    name_suffix: str = ""
    queue_id: str | None = None
    source_id: str | None = None
    destroy_on_stop = True

    if media.media_type == MediaType.ANNOUNCEMENT:
        base_name = uri_digest()
        name_suffix = MASS_ANNOUNCEMENT_POSTFIX
    elif media.media_type == MediaType.PLUGIN_SOURCE:
        custom_data = media.custom_data or {}
        plugin: str = media.title or custom_data.get("provider") or ""
        player: str = f" {custom_data.get('player_id', '')}"
        base_name = f"{plugin} {player}"
        source_id = custom_data.get("source_id")
    elif media.source_id and media.source_id.startswith(UGP_PREFIX):
        base_name = media.source_id
    elif media.source_id and media.queue_item_id:
        base_name = media.source_id
        queue_id = source_id = media.source_id
        # queue streams are kept when playback stops
        destroy_on_stop = False
    else:
        base_name = uri_digest()

    safe_name = create_safe_string(base_name, lowercase=False)
    stream_name = f"{MASS_STREAM_PREFIX}{safe_name}{name_suffix}"
    async with self._snapcast_ma_streams_lock:
        stream = self._snapcast_ma_streams.get(stream_name)
        if stream:
            stream.update_media(media)
        elif existing_only:
            return None
        else:
            stream = SnapcastMAStream(
                provider=self,
                media=media,
                stream_name=stream_name,
                filter_settings_owner=filter_settings_owner,
                source_id=source_id,
                use_cntrl_script=bool(queue_id) and self.use_queue_control,
                destroy_on_stop=destroy_on_stop,
            )
            self._snapcast_ma_streams[stream_name] = stream
        await stream.setup()
        return stream
580
581 def get_snap_ma_stream(self, stream_name: str) -> SnapcastMAStream | None:
582 """Return an existing Music Assistant Snapcast stream by name.
583
584 Args:
585 stream_name: Snapcast stream name.
586
587 Returns:
588 The corresponding ``SnapcastMAStream`` instance, or ``None`` if not found.
589 """
590 return self._snapcast_ma_streams.get(stream_name)
591
async def delete_ma_stream(self, stream_name: str) -> None:
    """Stop tracking a Music Assistant Snapcast stream and destroy it.

    The stream is taken out of the registry while holding the streams lock;
    its ``destroy()`` is awaited afterwards. Failures during destruction are
    logged and not propagated. Unknown names are ignored.

    Args:
        stream_name: Snapcast stream name to delete.
    """
    async with self._snapcast_ma_streams_lock:
        removed = self._snapcast_ma_streams.pop(stream_name, None)

    if removed:
        try:
            await removed.destroy()
        except Exception:
            self.logger.exception("Failed to destroy stream session %s", stream_name)
612
def update_stream_usage(self) -> None:
    """Refresh the in-use flag of every tracked Snapcast stream.

    A tracked stream that is assigned to at least one Snapcast group is
    flagged via ``set_in_use(True)``; every tracked stream not seen on any
    group gets ``set_in_use(False)``. Scanning the groups stops early once
    all tracked streams were found in use.
    """
    tracked = self._snapcast_ma_streams
    idle_names = set(tracked)
    for snap_group in self._snapserver.groups:
        assigned = snap_group.stream
        if assigned in tracked:
            tracked[assigned].set_in_use(True)
            idle_names.discard(assigned)

        if not idle_names:
            # every tracked stream is accounted for
            break

    for name in idle_names:
        tracked[name].set_in_use(False)
635
636 def get_snap_client(
637 self, *, client_id: str | None = None, player_id: str | None = None
638 ) -> SnapclientProto | None:
639 """Return the snapclient for either given client_id or player_id."""
640 if player_id is not None:
641 if client_id is not None and client_id != self._get_snapclient_id(client_id):
642 raise ValueError("provided client_id and player_id do not match")
643 client_id = self._get_snapclient_id(player_id)
644
645 if client_id:
646 with suppress(KeyError):
647 return self._snapserver.client(client_id)
648
649 return None
650
651 def get_snap_player(
652 self, *, client_id: str | None = None, player_id: str | None = None
653 ) -> SnapCastPlayer | None:
654 """Return the MA SnapCastPlayer for either given client_id or player_id."""
655 if client_id is not None:
656 if player_id is not None and player_id != self._get_ma_id(client_id):
657 raise ValueError("provided client_id and player_id do not match")
658 player_id = self._get_ma_id(client_id)
659
660 if player_id is None:
661 return None
662
663 if ma_player := self.mass.players.get(player_id):
664 assert isinstance(ma_player, SnapCastPlayer) # for type checking
665 return ma_player
666
667 return None
668