/
/
/
1"""Snapcast Player."""
2
3from __future__ import annotations
4
5import asyncio
6from contextlib import suppress
7from typing import TYPE_CHECKING, TypedDict, cast
8
9from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
10from music_assistant_models.enums import (
11 IdentifierType,
12 MediaType,
13 PlaybackState,
14 PlayerFeature,
15 PlayerType,
16)
17from music_assistant_models.player import DeviceInfo, PlayerMedia
18from propcache import under_cached_property as cached_property
19
20from music_assistant.constants import ATTR_ANNOUNCEMENT_IN_PROGRESS, CONF_ENTRY_HTTP_PROFILE_HIDDEN
21from music_assistant.helpers.util import is_valid_mac_address
22from music_assistant.models.player import Player
23from music_assistant.providers.snapcast.constants import CONF_ENTRY_SAMPLE_RATES_SNAPCAST
24from music_assistant.providers.snapcast.ma_stream import SnapcastMAStream
25from music_assistant.providers.sync_group.constants import SGP_PREFIX
26
27if TYPE_CHECKING:
28 from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
29
30 from music_assistant.providers.snapcast.provider import SnapCastProvider
31 from music_assistant.providers.snapcast.snap_cntrl_proto import SnapclientProto, SnapstreamProto
32
33
34class TrackedPlayerState(TypedDict, total=False):
35 """Tracked state for the Snapcast MA player.
36
37 It is used for change detection and state synchronization, and may be
38 partially populated depending on which information is
39 currently available.
40
41 Keys prefixed with ``_attr_`` are exposed as player attributes, while the
42 remaining keys represent internal Snapcast grouping and connection state.
43 """
44
45 # Player attribute fields
46 _attr_name: str
47 _attr_volume_level: float
48 _attr_volume_muted: bool
49 _attr_available: bool
50
51 # snapclient fields
52 connected: bool
53 stream_id: str
54 stream_status: str | None
55 grp_name: str
56 grp_member_ids: list[str]
57 grp_member_avail: list[bool]
58
59
class SnapCastPlayer(Player):
    """Music Assistant player that wraps a single Snapcast client."""

    # NOTE(review): presumably registers this player as a protocol-level
    # player type - confirm against the PlayerType enum documentation.
    _attr_type = PlayerType.PROTOCOL
64
65 def __init__(
66 self,
67 provider: SnapCastProvider,
68 player_id: str,
69 snap_client: SnapclientProto,
70 ) -> None:
71 """Init."""
72 self.snap_client = snap_client
73 super().__init__(provider, player_id)
74
75 self._snap_ma_stream: SnapcastMAStream | None = None
76
77 self._update_worker: asyncio.Task[None] | None = None
78 self._poke_evt = asyncio.Event()
79 self._state_update_lock = asyncio.Lock()
80 self._last_tracked_state: TrackedPlayerState | None = None
81
82 @property
83 def snap_provider(self) -> SnapCastProvider:
84 """Return the Snapcast provider instance."""
85 return cast("SnapCastProvider", self.provider)
86
87 @property
88 def requires_flow_mode(self) -> bool:
89 """Return if the player requires flow mode."""
90 return True
91
92 @cached_property
93 def synced_to(self) -> str | None:
94 """Return the id of the player this player is synced to (sync leader)."""
95 grp_name = self.snap_group_name
96 if grp_name == self.player_id:
97 # is group leader
98 return None
99
100 grp_player_ids = self._get_player_ids_of_curr_group()
101 if len(grp_player_ids) < 2 or grp_name not in grp_player_ids:
102 return None
103
104 if leader_player := self.mass.players.get_player(grp_name):
105 return grp_name if leader_player.available else None
106
107 return None
108
109 @cached_property
110 def group_members(self) -> list[str]:
111 """Return the group members of the player."""
112 if not self._attr_available:
113 return []
114
115 grp_name = self.snap_group_name
116 if grp_name != self.player_id:
117 # only group leaders can have members
118 return []
119
120 player_ids = self._get_player_ids_of_curr_group()
121 if self.player_id not in player_ids:
122 # should not happen, unless the current
123 # state repr is invalid
124 return []
125
126 player_ids.remove(self.player_id)
127 connected = [
128 player_id
129 for player_id in player_ids
130 if (client := self.snap_provider.get_snap_client(player_id=player_id))
131 and client.connected
132 ]
133 if connected:
134 return [self.player_id, *connected]
135
136 return []
137
138 @property
139 def playback_state(self) -> PlaybackState:
140 """Return the current playback state of the player."""
141 snap_stream = self._get_active_snapstream()
142 if snap_stream is None:
143 return PlaybackState.IDLE
144
145 if snap_stream.identifier == "default" or snap_stream.status == "idle":
146 return PlaybackState.IDLE
147
148 return PlaybackState.PLAYING
149
150 @property
151 def elapsed_time(self) -> float | None:
152 """Return the elapsed time in (fractional) seconds of the current track (if any)."""
153 # using flow-mode, elapsed time will be estimated upstream from 'elapsed_time_last_updated'
154 return 0 if self.active_snap_ma_stream else None
155
156 @property
157 def elapsed_time_last_updated(self) -> float | None:
158 """
159 Return when the elapsed time was last updated.
160
161 return: The (UTC) timestamp when the elapsed time was last updated,
162 or None if it was never updated (or unknown).
163 """
164 # we only update on playback starts
165 if snap_ma_stream := self.active_snap_ma_stream:
166 return snap_ma_stream.playback_started_at
167 return None
168
169 def setup(self) -> None:
170 """Set up player."""
171 self._attr_name = self.snap_client.friendly_name
172 self._attr_available = self.snap_client.connected
173
174 host_dict = self.snap_client._client.get("host", {})
175 os, arch, ip, mac = (host_dict.get(key, "") for key in ["os", "arch", "ip", "mac"])
176 self._attr_device_info = DeviceInfo(
177 model=os,
178 manufacturer=arch,
179 )
180 if ip and (host := self.snap_client._client.get("host")):
181 self._attr_device_info.add_identifier(IdentifierType.IP_ADDRESS, host.get("ip"))
182 # Only add MAC address if it's valid (not 00:00:00:00:00:00)
183 if mac and is_valid_mac_address(mac):
184 self._attr_device_info.add_identifier(IdentifierType.MAC_ADDRESS, mac)
185 self._attr_supported_features = {
186 PlayerFeature.PLAY_MEDIA,
187 PlayerFeature.SET_MEMBERS,
188 PlayerFeature.VOLUME_SET,
189 PlayerFeature.VOLUME_MUTE,
190 PlayerFeature.PLAY_ANNOUNCEMENT,
191 }
192 self._attr_can_group_with = {self.snap_provider.instance_id}
193 if not self._update_worker:
194 self._update_worker = self.mass.create_task(self._player_update_worker)
195
196 async def volume_set(self, volume_level: int) -> None:
197 """Send VOLUME_SET command to given player."""
198 # Use optimistic server state for now
199 # not guaranteed that the client respects it
200 await self.snap_client.set_volume(volume_level)
201
202 async def stop(self) -> None:
203 """Send STOP command to given player."""
204 player_group = await self.snap_provider.ensure_player_owned_group(self.player_id)
205 assert player_group is not None # for type checking
206 await player_group.set_stream("default")
207 if ma_stream := self.active_snap_ma_stream:
208 ma_stream.request_stop_stream()
209 return
210
211 self.poke_player_update()
212
213 async def volume_mute(self, muted: bool) -> None:
214 """Send MUTE command to given player."""
215 # Use optimistic server state for now
216 # not guaranteed that the client respects it
217 # TODO: move this to the snapcast python library
218 vol = self.snap_client._client["config"]["volume"]
219 vol["muted"] = muted
220 res = await self.snap_provider._snapserver.client_volume(self.snap_client.identifier, vol)
221 if res and "muted" in res:
222 self.snap_client._client["config"]["volume"] = res
223 self.snap_client.callback()
224
225 async def set_members(
226 self,
227 player_ids_to_add: list[str] | None = None,
228 player_ids_to_remove: list[str] | None = None,
229 ) -> None:
230 """Handle SET_MEMBERS command on the player."""
231 # get the group owned by this player (identified by the group name)
232 player_group = await self.snap_provider.ensure_player_owned_group(self.player_id)
233
234 if player_group is None:
235 return
236
237 player_group.set_callback(None)
238
239 curr_ma_player_ids = [
240 ma_id
241 for cli_id in player_group.clients
242 if (ma_id := self.snap_provider._get_ma_id(cli_id))
243 ]
244
245 curr_stream_id = player_group.stream
246 sync_group_player: Player | None = None
247 if curr_ma_stream := self.snap_provider.get_snap_ma_stream(curr_stream_id):
248 media = curr_ma_stream.media
249 if media.media_type == MediaType.PLUGIN_SOURCE:
250 custom_data = media.custom_data or {}
251 assigned_player = custom_data.get("player_id", "")
252 if assigned_player.startswith(SGP_PREFIX):
253 sync_group_player = self.mass.players.get_player(assigned_player)
254 else:
255 media_src_id = media.source_id or ""
256 if media_src_id.startswith(SGP_PREFIX):
257 sync_group_player = self.mass.players.get_player(media_src_id)
258 if sync_group_player and self.player_id in (player_ids_to_remove or []):
259 # players in sync_group_player.group_members will be rejoined
260 # remove others first
261 for id_to_remove in player_ids_to_remove or []:
262 if id_to_remove == self.player_id:
263 continue
264 if (
265 id_to_remove in curr_ma_player_ids
266 and id_to_remove not in sync_group_player.group_members
267 ):
268 await self.snap_provider.isolate_player_to_dedicated_group(
269 id_to_remove, target_stream_id="default"
270 )
271
272 # split remaining group into individual groups,
273 # keeps the current stream, set this group to default stream
274 await self.snap_provider.isolate_player_to_dedicated_group(
275 target_player_id=self.player_id,
276 target_stream_id="default",
277 others_stream_id=curr_stream_id,
278 )
279 else:
280 for player_id in player_ids_to_remove or []:
281 if player_id not in curr_ma_player_ids:
282 continue
283 await self.snap_provider.isolate_player_to_dedicated_group(
284 player_id, target_stream_id="default"
285 )
286 curr_ma_player_ids.remove(player_id)
287
288 for ma_id in player_ids_to_add or []:
289 if (
290 snap_id := self.snap_provider._get_snapclient_id(ma_id)
291 ) and ma_id not in curr_ma_player_ids:
292 await player_group.add_client(snap_id)
293
294 # some caller require instant state updates before returning
295 async with self._state_update_lock:
296 if await self._process_snapcast_client_state():
297 self.update_state()
298
299 self.snap_provider._update_group_callbacks(poke=True)
300
301 async def play_media(self, media: PlayerMedia) -> None:
302 """Handle PLAY MEDIA on given player."""
303 if self.synced_to:
304 msg = "A synced player cannot receive play commands directly"
305 raise RuntimeError(msg)
306
307 ma_stream = await self.snap_provider.get_snapcast_media_stream(
308 media, filter_settings_owner=self.player_id
309 )
310
311 if ma_stream is None or ma_stream.stream_id is None:
312 return
313
314 self._snap_ma_stream = ma_stream
315
316 # e.g. DSP settings require a restart
317 await self._snap_ma_stream.start_stream(allow_restart=True)
318
319 # if no announcement is playing we activate the stream now, otherwise it
320 # will be activated by play_announcement when the announcement is over.
321 if not self.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
322 player_group = await self.snap_provider.ensure_player_owned_group(self.player_id)
323 assert player_group is not None # for type checking
324 await player_group.set_stream(ma_stream.stream_id)
325
326 self.poke_player_update()
327
328 async def play_announcement(
329 self, announcement: PlayerMedia, volume_level: int | None = None
330 ) -> None:
331 """Handle (provider native) playback of an announcement on given player."""
332 was_synced_to: str | None = self.synced_to
333 orig_volume_level: int | None = self.volume_level
334
335 prev_stream = self.active_snap_ma_stream
336
337 ma_stream = await self.snap_provider.get_snapcast_media_stream(
338 announcement, filter_settings_owner=self.player_id
339 )
340 player_group = await self.snap_provider.ensure_player_owned_group(self.player_id)
341
342 if ma_stream is None or ma_stream.stream_id is None or player_group is None:
343 return
344
345 await player_group.set_stream(ma_stream.stream_id)
346
347 if self.snap_provider._use_builtin_server:
348 await asyncio.sleep(self.snap_provider._snapcast_server_buffer_size / 1000.0)
349
350 if volume_level is not None:
351 await self.volume_set(volume_level)
352
353 await ma_stream.start_stream()
354 await ma_stream.wait_for_stopped()
355
356 if self.volume_level == volume_level and orig_volume_level is not None:
357 await self.volume_set(orig_volume_level)
358
359 if was_synced_to:
360 if (
361 leader_group := await self.snap_provider.ensure_player_owned_group(was_synced_to)
362 ) is None:
363 return
364 await leader_group.add_client(self.snap_client.identifier)
365 else:
366 await player_group.set_stream(
367 prev_stream.stream_id
368 if prev_stream and prev_stream.stream_id is not None
369 else "default"
370 )
371
372 async def get_config_entries(
373 self,
374 action: str | None = None,
375 values: dict[str, ConfigValueType] | None = None,
376 ) -> list[ConfigEntry]:
377 """Player config."""
378 return [
379 CONF_ENTRY_SAMPLE_RATES_SNAPCAST,
380 # we don't use the http server for streaming
381 CONF_ENTRY_HTTP_PROFILE_HIDDEN,
382 ]
383
384 def _handle_player_update(self, snap_client: SnapclientProto) -> None:
385 """Forward snap_client updates."""
386 self.poke_player_update()
387
388 def poke_player_update(self) -> None:
389 """Signal that a player state update should be processed."""
390 self._poke_evt.set()
391
392 async def _player_update_worker(self) -> None:
393 """Aggregate and process player state update requests."""
394 while True:
395 await self._poke_evt.wait()
396 self._poke_evt.clear()
397 while True:
398 call_update: bool = False
399 async with self._state_update_lock:
400 call_update = await self._process_snapcast_client_state()
401 if call_update:
402 self.update_state()
403 if self._poke_evt.is_set():
404 self._poke_evt.clear()
405 continue
406 break
407
408 async def _process_snapcast_client_state(self) -> bool:
409 """Process the latest Snapcast client state and apply changes to this player.
410
411 Returns:
412 True if changes were applied and a state update should be emitted via
413 ``update_state()``; False if no update is necessary (or if required data
414 is temporarily unavailable and the update should be retried later).
415 """
416 snap_group = self.snap_client.group
417 if snap_group is None:
418 # some data syncing error, a client is always a group member
419 # retry again later, don't call update now
420 return False
421
422 stream_id = snap_group.stream
423 snap_stream: SnapstreamProto | None = None
424 with suppress(KeyError):
425 snap_stream = self.snap_provider._snapserver.stream(stream_id)
426
427 members = list(snap_group.clients) # snapshot
428
429 curr_state: TrackedPlayerState = {
430 "_attr_name": self.snap_client.friendly_name,
431 "_attr_volume_level": self.snap_client.volume,
432 "_attr_volume_muted": self.snap_client.muted,
433 "_attr_available": self.snap_client.connected,
434 "connected": self.snap_client.connected,
435 "stream_id": snap_group.stream,
436 "stream_status": snap_stream.status if snap_stream is not None else None,
437 "grp_name": snap_group.name,
438 "grp_member_ids": members,
439 "grp_member_avail": [
440 pl.available
441 for cl_id in members
442 if (pl_id := self.snap_provider._get_ma_id(cl_id))
443 and (pl := self.mass.players.get_player(pl_id))
444 ],
445 }
446
447 prev_state: TrackedPlayerState = (
448 self._last_tracked_state if self._last_tracked_state is not None else {}
449 )
450 self._last_tracked_state = curr_state
451
452 # change detection for simple attrs
453 changed_attrs = {
454 k: v for k, v in curr_state.items() if k.startswith("_attr_") and prev_state.get(k) != v
455 }
456
457 prev_connected = prev_state.get("connected", False)
458 now_connected = curr_state.get("connected", False)
459 connection_changed = prev_connected != now_connected
460
461 prev_stream_id = prev_state.get("stream_id")
462 curr_stream_id = curr_state["stream_id"]
463 prev_stream_status = prev_state.get("stream_status")
464 curr_stream_status = curr_state.get("stream_status")
465
466 stream_changed = (
467 prev_stream_id != curr_stream_id or prev_stream_status != curr_stream_status
468 )
469
470 grouping_changed = any(
471 prev_state.get(k) != curr_state.get(k)
472 for k in ("grp_name", "grp_member_ids", "grp_member_avail")
473 )
474
475 needs_processing = bool(
476 changed_attrs or grouping_changed or stream_changed or connection_changed
477 )
478 if not needs_processing:
479 return False
480
481 if connection_changed or grouping_changed:
482 self.snap_provider.poke_group_members(snap_group)
483
484 # help cleaning up unused streams
485 if curr_stream_id == "default" or (
486 (my_stream := self._snap_ma_stream)
487 and my_stream.stream_id in {prev_stream_id, curr_stream_id}
488 ):
489 self.snap_provider.update_stream_usage()
490
491 # apply changed attrs
492 for key, value in changed_attrs.items():
493 setattr(self, key, value)
494
495 # finally notify state update once
496 return True
497
498 @property
499 def active_snap_ma_stream(self) -> SnapcastMAStream | None:
500 """Return the MA stream source of the active group."""
501 grp = self.snap_client.group
502 if grp is None or grp.stream is None:
503 return None
504
505 if grp.stream == "default":
506 return None
507
508 return self.snap_provider.get_snap_ma_stream(grp.stream)
509
510 @property
511 def snap_group_name(self) -> str:
512 """Return the name of the active group."""
513 snap_group = self.snap_client.group
514 if snap_group is None:
515 return ""
516 return snap_group.name
517
518 @property
519 def current_media(self) -> PlayerMedia | None:
520 """Return the current media being played by the player."""
521 if snap_ma_stream := self.active_snap_ma_stream:
522 return snap_ma_stream.media
523 return None
524
525 @property
526 def active_source(self) -> str | None:
527 """Return the (id of) the active source of the player."""
528 grp = self.snap_client.group
529 if grp is None or grp.stream is None:
530 return None
531
532 if grp.stream == "default":
533 return None
534
535 if ma_stream := self.snap_provider.get_snap_ma_stream(grp.stream):
536 return ma_stream.source_id
537
538 # external snapcast stream
539 return grp.stream or None
540
541 def _get_active_snapstream(self) -> SnapstreamProto | None:
542 """Get active stream for given player_id."""
543 if group := self.snap_client.group:
544 with suppress(KeyError):
545 return self.snap_provider._snapserver.stream(group.stream)
546 return None
547
548 def _get_player_ids_of_curr_group(self) -> list[str]:
549 snap_group = self.snap_client.group
550 if snap_group is None:
551 return []
552 return [
553 ma_id
554 for client_id in snap_group.clients
555 if (ma_id := self.snap_provider._get_ma_id(client_id))
556 ]
557
558 def _get_players_of_curr_group(self) -> list[Player]:
559 return [
560 ma_player
561 for ma_id in self._get_player_ids_of_curr_group()
562 if (ma_player := self.mass.players.get_player(ma_id))
563 ]
564