/
/
/
1"""Snapcast Player."""
2
3from __future__ import annotations
4
5import asyncio
6from contextlib import suppress
7from typing import TYPE_CHECKING, TypedDict, cast
8
9from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
10from music_assistant_models.enums import (
11 IdentifierType,
12 MediaType,
13 PlaybackState,
14 PlayerFeature,
15 PlayerType,
16)
17from music_assistant_models.player import DeviceInfo, PlayerMedia
18from propcache import under_cached_property as cached_property
19
20from music_assistant.constants import ATTR_ANNOUNCEMENT_IN_PROGRESS, CONF_ENTRY_HTTP_PROFILE_HIDDEN
21from music_assistant.helpers.util import is_valid_mac_address
22from music_assistant.models.player import Player
23from music_assistant.providers.snapcast.constants import CONF_ENTRY_SAMPLE_RATES_SNAPCAST
24from music_assistant.providers.snapcast.ma_stream import SnapcastMAStream
25from music_assistant.providers.sync_group.constants import SGP_PREFIX
26
27if TYPE_CHECKING:
28 from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
29
30 from music_assistant.providers.snapcast.provider import SnapCastProvider
31 from music_assistant.providers.snapcast.snap_cntrl_proto import SnapclientProto, SnapstreamProto
32
33
34class TrackedPlayerState(TypedDict, total=False):
35 """Tracked state for the Snapcast MA player.
36
37 It is used for change detection and state synchronization, and may be
38 partially populated depending on which information is
39 currently available.
40
41 Keys prefixed with ``_attr_`` are exposed as player attributes, while the
42 remaining keys represent internal Snapcast grouping and connection state.
43 """
44
45 # Player attribute fields
46 _attr_name: str
47 _attr_volume_level: float
48 _attr_volume_muted: bool
49 _attr_available: bool
50
51 # snapclient fields
52 connected: bool
53 stream_id: str
54 stream_status: str | None
55 grp_name: str
56 grp_member_ids: list[str]
57 grp_member_avail: list[bool]
58
59
60class SnapCastPlayer(Player):
61 """SnapCastPlayer."""
62
63 _attr_type = PlayerType.PROTOCOL
64
65 def __init__(
66 self,
67 provider: SnapCastProvider,
68 player_id: str,
69 snap_client: SnapclientProto,
70 ) -> None:
71 """Init."""
72 self.snap_client = snap_client
73 super().__init__(provider, player_id)
74
75 self._snap_ma_stream: SnapcastMAStream | None = None
76
77 self._update_worker: asyncio.Task[None] | None = None
78 self._poke_evt = asyncio.Event()
79 self._state_update_lock = asyncio.Lock()
80 self._last_tracked_state: TrackedPlayerState | None = None
81
82 @property
83 def snap_provider(self) -> SnapCastProvider:
84 """Return the Snapcast provider instance."""
85 return cast("SnapCastProvider", self.provider)
86
87 @property
88 def requires_flow_mode(self) -> bool:
89 """Return if the player requires flow mode."""
90 return True
91
92 @cached_property
93 def synced_to(self) -> str | None:
94 """Return the id of the player this player is synced to (sync leader)."""
95 grp_name = self.snap_group_name
96 if grp_name == self.player_id:
97 # is group leader
98 return None
99
100 grp_player_ids = self._get_player_ids_of_curr_group()
101 if len(grp_player_ids) < 2 or grp_name not in grp_player_ids:
102 return None
103
104 if leader_player := self.mass.players.get_player(grp_name):
105 return grp_name if leader_player.available else None
106
107 return None
108
109 @cached_property
110 def group_members(self) -> list[str]:
111 """Return the group members of the player."""
112 if not self._attr_available:
113 return []
114
115 grp_name = self.snap_group_name
116 if grp_name != self.player_id:
117 # only group leaders can have members
118 return []
119
120 player_ids = self._get_player_ids_of_curr_group()
121 if self.player_id not in player_ids:
122 # should not happen, unless the current
123 # state repr is invalid
124 return []
125
126 player_ids.remove(self.player_id)
127 connected = [
128 player_id
129 for player_id in player_ids
130 if (client := self.snap_provider.get_snap_client(player_id=player_id))
131 and client.connected
132 ]
133 if connected:
134 return [self.player_id, *connected]
135
136 return []
137
138 @property
139 def playback_state(self) -> PlaybackState:
140 """Return the current playback state of the player."""
141 snap_stream = self._get_active_snapstream()
142 if snap_stream is None:
143 return PlaybackState.IDLE
144
145 if snap_stream.identifier == "default" or snap_stream.status == "idle":
146 return PlaybackState.IDLE
147
148 return PlaybackState.PLAYING
149
150 @property
151 def elapsed_time(self) -> float | None:
152 """Return the elapsed time in (fractional) seconds of the current track (if any)."""
153 # using flow-mode, elapsed time will be estimated upstream from 'elapsed_time_last_updated'
154 return 0 if self.active_snap_ma_stream else None
155
156 @property
157 def elapsed_time_last_updated(self) -> float | None:
158 """
159 Return when the elapsed time was last updated.
160
161 return: The (UTC) timestamp when the elapsed time was last updated,
162 or None if it was never updated (or unknown).
163 """
164 # we only update on playback starts
165 if snap_ma_stream := self.active_snap_ma_stream:
166 return snap_ma_stream.playback_started_at
167 return None
168
169 def setup(self) -> None:
170 """Set up player."""
171 self._attr_name = self.snap_client.friendly_name
172 self._attr_available = self.snap_client.connected
173
174 host_dict = self.snap_client._client.get("host", {})
175 os, arch, ip, mac = (host_dict.get(key, "") for key in ["os", "arch", "ip", "mac"])
176 self._attr_device_info = DeviceInfo(
177 model=os,
178 manufacturer=arch,
179 )
180 if ip and (host := self.snap_client._client.get("host")):
181 self._attr_device_info.add_identifier(IdentifierType.IP_ADDRESS, host.get("ip"))
182 # Only add MAC address if it's valid (not 00:00:00:00:00:00)
183 if mac and is_valid_mac_address(mac):
184 self._attr_device_info.add_identifier(IdentifierType.MAC_ADDRESS, mac)
185 self._attr_supported_features = {
186 PlayerFeature.PLAY_MEDIA,
187 PlayerFeature.SET_MEMBERS,
188 PlayerFeature.VOLUME_SET,
189 PlayerFeature.VOLUME_MUTE,
190 PlayerFeature.PLAY_ANNOUNCEMENT,
191 }
192 self._attr_can_group_with = {self.snap_provider.instance_id}
193 if not self._update_worker:
194 self._update_worker = self.mass.create_task(self._player_update_worker)
195
196 async def volume_set(self, volume_level: int) -> None:
197 """Send VOLUME_SET command to given player."""
198 # Use optimistic server state for now
199 # not guaranteed that the client respects it
200 await self.snap_client.set_volume(volume_level)
201
202 async def stop(self) -> None:
203 """Send STOP command to given player."""
204 if ma_stream := self.active_snap_ma_stream:
205 ma_stream.request_stop_stream()
206 return
207
208 self.poke_player_update()
209
210 async def volume_mute(self, muted: bool) -> None:
211 """Send MUTE command to given player."""
212 # Use optimistic server state for now
213 # not guaranteed that the client respects it
214 # TODO: move this to the snapcast python library
215 vol = self.snap_client._client["config"]["volume"]
216 vol["muted"] = muted
217 res = await self.snap_provider._snapserver.client_volume(self.snap_client.identifier, vol)
218 if res and "muted" in res:
219 self.snap_client._client["config"]["volume"] = res
220 self.snap_client.callback()
221
222 async def set_members(
223 self,
224 player_ids_to_add: list[str] | None = None,
225 player_ids_to_remove: list[str] | None = None,
226 ) -> None:
227 """Handle SET_MEMBERS command on the player."""
228 # get the group owned by this player (identified by the group name)
229 player_group = await self.snap_provider.ensure_player_owned_group(self.player_id)
230
231 if player_group is None:
232 return
233
234 player_group.set_callback(None)
235
236 curr_ma_player_ids = [
237 ma_id
238 for cli_id in player_group.clients
239 if (ma_id := self.snap_provider._get_ma_id(cli_id))
240 ]
241
242 curr_stream_id = player_group.stream
243 if curr_ma_stream := self.snap_provider.get_snap_ma_stream(curr_stream_id):
244 media = curr_ma_stream.media
245 if media.media_type == MediaType.PLUGIN_SOURCE:
246 custom_data = media.custom_data or {}
247 assigned_player = custom_data.get("player_id", "")
248 if assigned_player.startswith(SGP_PREFIX):
249 sync_group_player = self.mass.players.get_player(assigned_player)
250 else:
251 media_src_id = media.source_id or ""
252 if media_src_id.startswith(SGP_PREFIX):
253 sync_group_player = self.mass.players.get_player(media_src_id)
254 if sync_group_player and self.player_id in (player_ids_to_remove or []):
255 # players in sync_group_player.group_members will be rejoined
256 # remove others first
257 for id_to_remove in player_ids_to_remove or []:
258 if id_to_remove == self.player_id:
259 continue
260 if (
261 id_to_remove in curr_ma_player_ids
262 and id_to_remove not in sync_group_player.group_members
263 ):
264 await self.snap_provider.isolate_player_to_dedicated_group(
265 id_to_remove, target_stream_id="default"
266 )
267
268 # split remaining group into individual groups,
269 # keeps the current stream, set this group to default stream
270 await self.snap_provider.isolate_player_to_dedicated_group(
271 target_player_id=self.player_id,
272 target_stream_id="default",
273 others_stream_id=curr_stream_id,
274 )
275 else:
276 for player_id in player_ids_to_remove or []:
277 if player_id not in curr_ma_player_ids:
278 continue
279 await self.snap_provider.isolate_player_to_dedicated_group(
280 player_id, target_stream_id="default"
281 )
282 curr_ma_player_ids.remove(player_id)
283
284 for ma_id in player_ids_to_add or []:
285 if (
286 snap_id := self.snap_provider._get_snapclient_id(ma_id)
287 ) and ma_id not in curr_ma_player_ids:
288 await player_group.add_client(snap_id)
289
290 # some caller require instant state updates before returning
291 async with self._state_update_lock:
292 if await self._process_snapcast_client_state():
293 self.update_state()
294
295 self.snap_provider._update_group_callbacks(poke=True)
296
297 async def play_media(self, media: PlayerMedia) -> None:
298 """Handle PLAY MEDIA on given player."""
299 if self.synced_to:
300 msg = "A synced player cannot receive play commands directly"
301 raise RuntimeError(msg)
302
303 ma_stream = await self.snap_provider.get_snapcast_media_stream(
304 media, filter_settings_owner=self.player_id
305 )
306
307 if ma_stream is None or ma_stream.stream_id is None:
308 return
309
310 self._snap_ma_stream = ma_stream
311
312 # e.g. DSP settings require a restart
313 await self._snap_ma_stream.start_stream(allow_restart=True)
314
315 # if no announcement is playing we activate the stream now, otherwise it
316 # will be activated by play_announcement when the announcement is over.
317 if not self.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
318 player_group = await self.snap_provider.ensure_player_owned_group(self.player_id)
319 assert player_group is not None # for type checking
320 await player_group.set_stream(ma_stream.stream_id)
321
322 self.poke_player_update()
323
324 async def play_announcement(
325 self, announcement: PlayerMedia, volume_level: int | None = None
326 ) -> None:
327 """Handle (provider native) playback of an announcement on given player."""
328 was_synced_to: str | None = self.synced_to
329 orig_volume_level: int | None = self.volume_level
330
331 prev_stream = self.active_snap_ma_stream
332
333 ma_stream = await self.snap_provider.get_snapcast_media_stream(
334 announcement, filter_settings_owner=self.player_id
335 )
336 player_group = await self.snap_provider.ensure_player_owned_group(self.player_id)
337
338 if ma_stream is None or ma_stream.stream_id is None or player_group is None:
339 return
340
341 await player_group.set_stream(ma_stream.stream_id)
342
343 if self.snap_provider._use_builtin_server:
344 await asyncio.sleep(self.snap_provider._snapcast_server_buffer_size / 1000.0)
345
346 if volume_level is not None:
347 await self.volume_set(volume_level)
348
349 await ma_stream.start_stream()
350 await ma_stream.wait_for_stopped()
351
352 if self.volume_level == volume_level and orig_volume_level is not None:
353 await self.volume_set(orig_volume_level)
354
355 if was_synced_to:
356 if (
357 leader_group := await self.snap_provider.ensure_player_owned_group(was_synced_to)
358 ) is None:
359 return
360 await leader_group.add_client(self.snap_client.identifier)
361 else:
362 await player_group.set_stream(
363 prev_stream.stream_id
364 if prev_stream and prev_stream.stream_id is not None
365 else "default"
366 )
367
368 async def get_config_entries(
369 self,
370 action: str | None = None,
371 values: dict[str, ConfigValueType] | None = None,
372 ) -> list[ConfigEntry]:
373 """Player config."""
374 return [
375 CONF_ENTRY_SAMPLE_RATES_SNAPCAST,
376 # we don't use the http server for streaming
377 CONF_ENTRY_HTTP_PROFILE_HIDDEN,
378 ]
379
380 def _handle_player_update(self, snap_client: SnapclientProto) -> None:
381 """Forward snap_client updates."""
382 self.poke_player_update()
383
384 def poke_player_update(self) -> None:
385 """Signal that a player state update should be processed."""
386 self._poke_evt.set()
387
388 async def _player_update_worker(self) -> None:
389 """Aggregate and process player state update requests."""
390 while True:
391 await self._poke_evt.wait()
392 self._poke_evt.clear()
393 while True:
394 call_update: bool = False
395 async with self._state_update_lock:
396 call_update = await self._process_snapcast_client_state()
397 if call_update:
398 self.update_state()
399 if self._poke_evt.is_set():
400 self._poke_evt.clear()
401 continue
402 break
403
404 async def _process_snapcast_client_state(self) -> bool:
405 """Process the latest Snapcast client state and apply changes to this player.
406
407 Returns:
408 True if changes were applied and a state update should be emitted via
409 ``update_state()``; False if no update is necessary (or if required data
410 is temporarily unavailable and the update should be retried later).
411 """
412 snap_group = self.snap_client.group
413 if snap_group is None:
414 # some data syncing error, a client is always a group member
415 # retry again later, don't call update now
416 return False
417
418 stream_id = snap_group.stream
419 snap_stream: SnapstreamProto | None = None
420 with suppress(KeyError):
421 snap_stream = self.snap_provider._snapserver.stream(stream_id)
422
423 members = list(snap_group.clients) # snapshot
424
425 curr_state: TrackedPlayerState = {
426 "_attr_name": self.snap_client.friendly_name,
427 "_attr_volume_level": self.snap_client.volume,
428 "_attr_volume_muted": self.snap_client.muted,
429 "_attr_available": self.snap_client.connected,
430 "connected": self.snap_client.connected,
431 "stream_id": snap_group.stream,
432 "stream_status": snap_stream.status if snap_stream is not None else None,
433 "grp_name": snap_group.name,
434 "grp_member_ids": members,
435 "grp_member_avail": [
436 pl.available
437 for cl_id in members
438 if (pl_id := self.snap_provider._get_ma_id(cl_id))
439 and (pl := self.mass.players.get_player(pl_id))
440 ],
441 }
442
443 prev_state: TrackedPlayerState = (
444 self._last_tracked_state if self._last_tracked_state is not None else {}
445 )
446 self._last_tracked_state = curr_state
447
448 # change detection for simple attrs
449 changed_attrs = {
450 k: v for k, v in curr_state.items() if k.startswith("_attr_") and prev_state.get(k) != v
451 }
452
453 prev_connected = prev_state.get("connected", False)
454 now_connected = curr_state.get("connected", False)
455 connection_changed = prev_connected != now_connected
456
457 prev_stream_id = prev_state.get("stream_id")
458 curr_stream_id = curr_state["stream_id"]
459 prev_stream_status = prev_state.get("stream_status")
460 curr_stream_status = curr_state.get("stream_status")
461
462 stream_changed = (
463 prev_stream_id != curr_stream_id or prev_stream_status != curr_stream_status
464 )
465
466 grouping_changed = any(
467 prev_state.get(k) != curr_state.get(k)
468 for k in ("grp_name", "grp_member_ids", "grp_member_avail")
469 )
470
471 needs_processing = bool(
472 changed_attrs or grouping_changed or stream_changed or connection_changed
473 )
474 if not needs_processing:
475 return False
476
477 if connection_changed or grouping_changed:
478 self.snap_provider.poke_group_members(snap_group)
479
480 # help cleaning up unused streams
481 if curr_stream_id == "default" or (
482 (my_stream := self._snap_ma_stream)
483 and my_stream.stream_id in {prev_stream_id, curr_stream_id}
484 ):
485 self.snap_provider.update_stream_usage()
486
487 # apply changed attrs
488 for key, value in changed_attrs.items():
489 setattr(self, key, value)
490
491 # finally notify state update once
492 return True
493
494 @property
495 def active_snap_ma_stream(self) -> SnapcastMAStream | None:
496 """Return the MA stream source of the active group."""
497 grp = self.snap_client.group
498 if grp is None or grp.stream is None:
499 return None
500
501 if grp.stream == "default":
502 return None
503
504 return self.snap_provider.get_snap_ma_stream(grp.stream)
505
506 @property
507 def snap_group_name(self) -> str:
508 """Return the name of the active group."""
509 snap_group = self.snap_client.group
510 if snap_group is None:
511 return ""
512 return snap_group.name
513
514 @property
515 def current_media(self) -> PlayerMedia | None:
516 """Return the current media being played by the player."""
517 if snap_ma_stream := self.active_snap_ma_stream:
518 return snap_ma_stream.media
519 return None
520
521 @property
522 def active_source(self) -> str | None:
523 """Return the (id of) the active source of the player."""
524 grp = self.snap_client.group
525 if grp is None or grp.stream is None:
526 return None
527
528 if grp.stream == "default":
529 return None
530
531 if ma_stream := self.snap_provider.get_snap_ma_stream(grp.stream):
532 return ma_stream.source_id
533
534 # external snapcast stream
535 return grp.stream or None
536
537 def _get_active_snapstream(self) -> SnapstreamProto | None:
538 """Get active stream for given player_id."""
539 if group := self.snap_client.group:
540 with suppress(KeyError):
541 return self.snap_provider._snapserver.stream(group.stream)
542 return None
543
544 def _get_player_ids_of_curr_group(self) -> list[str]:
545 snap_group = self.snap_client.group
546 if snap_group is None:
547 return []
548 return [
549 ma_id
550 for client_id in snap_group.clients
551 if (ma_id := self.snap_provider._get_ma_id(client_id))
552 ]
553
554 def _get_players_of_curr_group(self) -> list[Player]:
555 return [
556 ma_player
557 for ma_id in self._get_player_ids_of_curr_group()
558 if (ma_player := self.mass.players.get_player(ma_id))
559 ]
560