/
/
/
1"""Snapcast Player."""
2
3from __future__ import annotations
4
5import asyncio
6from contextlib import suppress
7from typing import TYPE_CHECKING, TypedDict, cast
8
9from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
10from music_assistant_models.enums import (
11 IdentifierType,
12 MediaType,
13 PlaybackState,
14 PlayerFeature,
15 PlayerType,
16)
17from music_assistant_models.player import DeviceInfo, PlayerMedia
18from propcache import under_cached_property as cached_property
19
20from music_assistant.constants import ATTR_ANNOUNCEMENT_IN_PROGRESS, CONF_ENTRY_HTTP_PROFILE_HIDDEN
21from music_assistant.models.player import Player
22from music_assistant.providers.snapcast.constants import CONF_ENTRY_SAMPLE_RATES_SNAPCAST
23from music_assistant.providers.snapcast.ma_stream import SnapcastMAStream
24from music_assistant.providers.sync_group.constants import SGP_PREFIX
25
26if TYPE_CHECKING:
27 from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
28
29 from music_assistant.providers.snapcast.provider import SnapCastProvider
30 from music_assistant.providers.snapcast.snap_cntrl_proto import SnapclientProto, SnapstreamProto
31
32
33class TrackedPlayerState(TypedDict, total=False):
34 """Tracked state for the Snapcast MA player.
35
36 It is used for change detection and state synchronization, and may be
37 partially populated depending on which information is
38 currently available.
39
40 Keys prefixed with ``_attr_`` are exposed as player attributes, while the
41 remaining keys represent internal Snapcast grouping and connection state.
42 """
43
44 # Player attribute fields
45 _attr_name: str
46 _attr_volume_level: float
47 _attr_volume_muted: bool
48 _attr_available: bool
49
50 # snapclient fields
51 connected: bool
52 stream_id: str
53 stream_status: str | None
54 grp_name: str
55 grp_member_ids: list[str]
56 grp_member_avail: list[bool]
57
58
59class SnapCastPlayer(Player):
60 """SnapCastPlayer."""
61
62 _attr_type = PlayerType.PROTOCOL
63
64 def __init__(
65 self,
66 provider: SnapCastProvider,
67 player_id: str,
68 snap_client: SnapclientProto,
69 ) -> None:
70 """Init."""
71 self.snap_client = snap_client
72 super().__init__(provider, player_id)
73
74 self._snap_ma_stream: SnapcastMAStream | None = None
75
76 self._update_worker: asyncio.Task[None] | None = None
77 self._poke_evt = asyncio.Event()
78 self._state_update_lock = asyncio.Lock()
79 self._last_tracked_state: TrackedPlayerState | None = None
80
81 @property
82 def snap_provider(self) -> SnapCastProvider:
83 """Return the Snapcast provider instance."""
84 return cast("SnapCastProvider", self.provider)
85
86 @property
87 def requires_flow_mode(self) -> bool:
88 """Return if the player requires flow mode."""
89 return True
90
91 @cached_property
92 def synced_to(self) -> str | None:
93 """Return the id of the player this player is synced to (sync leader)."""
94 grp_name = self.snap_group_name
95 if grp_name == self.player_id:
96 # is group leader
97 return None
98
99 grp_player_ids = self._get_player_ids_of_curr_group()
100 if len(grp_player_ids) < 2 or grp_name not in grp_player_ids:
101 return None
102
103 if leader_player := self.mass.players.get_player(grp_name):
104 return grp_name if leader_player.available else None
105
106 return None
107
108 @cached_property
109 def group_members(self) -> list[str]:
110 """Return the group members of the player."""
111 if not self._attr_available:
112 return []
113
114 grp_name = self.snap_group_name
115 if grp_name != self.player_id:
116 # only group leaders can have members
117 return []
118
119 player_ids = self._get_player_ids_of_curr_group()
120 if self.player_id not in player_ids:
121 # should not happen, unless the current
122 # state repr is invalid
123 return []
124
125 player_ids.remove(self.player_id)
126 connected = [
127 player_id
128 for player_id in player_ids
129 if (client := self.snap_provider.get_snap_client(player_id=player_id))
130 and client.connected
131 ]
132 if connected:
133 return [self.player_id, *connected]
134
135 return []
136
137 @property
138 def playback_state(self) -> PlaybackState:
139 """Return the current playback state of the player."""
140 snap_stream = self._get_active_snapstream()
141 if snap_stream is None:
142 return PlaybackState.IDLE
143
144 if snap_stream.identifier == "default" or snap_stream.status == "idle":
145 return PlaybackState.IDLE
146
147 return PlaybackState.PLAYING
148
149 @property
150 def elapsed_time(self) -> float | None:
151 """Return the elapsed time in (fractional) seconds of the current track (if any)."""
152 # using flow-mode, elapsed time will be estimated upstream from 'elapsed_time_last_updated'
153 return 0 if self.active_snap_ma_stream else None
154
155 @property
156 def elapsed_time_last_updated(self) -> float | None:
157 """
158 Return when the elapsed time was last updated.
159
160 return: The (UTC) timestamp when the elapsed time was last updated,
161 or None if it was never updated (or unknown).
162 """
163 # we only update on playback starts
164 if snap_ma_stream := self.active_snap_ma_stream:
165 return snap_ma_stream.playback_started_at
166 return None
167
168 def setup(self) -> None:
169 """Set up player."""
170 self._attr_name = self.snap_client.friendly_name
171 self._attr_available = self.snap_client.connected
172
173 host_dict = self.snap_client._client.get("host", {})
174 os, arch, ip, mac = (host_dict.get(key, "") for key in ["os", "arch", "ip", "mac"])
175 self._attr_device_info = DeviceInfo(
176 model=os,
177 manufacturer=arch,
178 )
179 if ip and (host := self.snap_client._client.get("host")):
180 self._attr_device_info.add_identifier(IdentifierType.IP_ADDRESS, host.get("ip"))
181 if mac:
182 self._attr_device_info.add_identifier(IdentifierType.MAC_ADDRESS, mac)
183 self._attr_supported_features = {
184 PlayerFeature.PLAY_MEDIA,
185 PlayerFeature.SET_MEMBERS,
186 PlayerFeature.VOLUME_SET,
187 PlayerFeature.VOLUME_MUTE,
188 PlayerFeature.PLAY_ANNOUNCEMENT,
189 }
190 self._attr_can_group_with = {self.snap_provider.instance_id}
191 if not self._update_worker:
192 self._update_worker = self.mass.create_task(self._player_update_worker)
193
194 async def volume_set(self, volume_level: int) -> None:
195 """Send VOLUME_SET command to given player."""
196 # Use optimistic server state for now
197 # not guaranteed that the client respects it
198 await self.snap_client.set_volume(volume_level)
199
200 async def stop(self) -> None:
201 """Send STOP command to given player."""
202 if ma_stream := self.active_snap_ma_stream:
203 ma_stream.request_stop_stream()
204 return
205
206 self.poke_player_update()
207
208 async def volume_mute(self, muted: bool) -> None:
209 """Send MUTE command to given player."""
210 # Use optimistic server state for now
211 # not guaranteed that the client respects it
212 # TODO: move this to the snapcast python library
213 vol = self.snap_client._client["config"]["volume"]
214 vol["muted"] = muted
215 res = await self.snap_provider._snapserver.client_volume(self.snap_client.identifier, vol)
216 if res and "muted" in res:
217 self.snap_client._client["config"]["volume"] = res
218 self.snap_client.callback()
219
220 async def set_members(
221 self,
222 player_ids_to_add: list[str] | None = None,
223 player_ids_to_remove: list[str] | None = None,
224 ) -> None:
225 """Handle SET_MEMBERS command on the player."""
226 # get the group owned by this player (identified by the group name)
227 player_group = await self.snap_provider.ensure_player_owned_group(self.player_id)
228
229 if player_group is None:
230 return
231
232 player_group.set_callback(None)
233
234 curr_ma_player_ids = [
235 ma_id
236 for cli_id in player_group.clients
237 if (ma_id := self.snap_provider._get_ma_id(cli_id))
238 ]
239
240 curr_stream_id = player_group.stream
241 if curr_ma_stream := self.snap_provider.get_snap_ma_stream(curr_stream_id):
242 media = curr_ma_stream.media
243 if media.media_type == MediaType.PLUGIN_SOURCE:
244 custom_data = media.custom_data or {}
245 assigned_player = custom_data.get("player_id", "")
246 if assigned_player.startswith(SGP_PREFIX):
247 sync_group_player = self.mass.players.get_player(assigned_player)
248 else:
249 media_src_id = media.source_id or ""
250 if media_src_id.startswith(SGP_PREFIX):
251 sync_group_player = self.mass.players.get_player(media_src_id)
252 if sync_group_player and self.player_id in (player_ids_to_remove or []):
253 # players in sync_group_player.group_members will be rejoined
254 # remove others first
255 for id_to_remove in player_ids_to_remove or []:
256 if id_to_remove == self.player_id:
257 continue
258 if (
259 id_to_remove in curr_ma_player_ids
260 and id_to_remove not in sync_group_player.group_members
261 ):
262 await self.snap_provider.isolate_player_to_dedicated_group(
263 id_to_remove, target_stream_id="default"
264 )
265
266 # split remaining group into individual groups,
267 # keeps the current stream, set this group to default stream
268 await self.snap_provider.isolate_player_to_dedicated_group(
269 target_player_id=self.player_id,
270 target_stream_id="default",
271 others_stream_id=curr_stream_id,
272 )
273 else:
274 for player_id in player_ids_to_remove or []:
275 if player_id not in curr_ma_player_ids:
276 continue
277 await self.snap_provider.isolate_player_to_dedicated_group(
278 player_id, target_stream_id="default"
279 )
280 curr_ma_player_ids.remove(player_id)
281
282 for ma_id in player_ids_to_add or []:
283 if (
284 snap_id := self.snap_provider._get_snapclient_id(ma_id)
285 ) and ma_id not in curr_ma_player_ids:
286 await player_group.add_client(snap_id)
287
288 # some caller require instant state updates before returning
289 async with self._state_update_lock:
290 if await self._process_snapcast_client_state():
291 self.update_state()
292
293 self.snap_provider._update_group_callbacks(poke=True)
294
295 async def play_media(self, media: PlayerMedia) -> None:
296 """Handle PLAY MEDIA on given player."""
297 if self.synced_to:
298 msg = "A synced player cannot receive play commands directly"
299 raise RuntimeError(msg)
300
301 ma_stream = await self.snap_provider.get_snapcast_media_stream(
302 media, filter_settings_owner=self.player_id
303 )
304
305 if ma_stream is None or ma_stream.stream_id is None:
306 return
307
308 self._snap_ma_stream = ma_stream
309
310 # e.g. DSP settings require a restart
311 await self._snap_ma_stream.start_stream(allow_restart=True)
312
313 # if no announcement is playing we activate the stream now, otherwise it
314 # will be activated by play_announcement when the announcement is over.
315 if not self.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
316 player_group = await self.snap_provider.ensure_player_owned_group(self.player_id)
317 assert player_group is not None # for type checking
318 await player_group.set_stream(ma_stream.stream_id)
319
320 self.poke_player_update()
321
322 async def play_announcement(
323 self, announcement: PlayerMedia, volume_level: int | None = None
324 ) -> None:
325 """Handle (provider native) playback of an announcement on given player."""
326 was_synced_to: str | None = self.synced_to
327 orig_volume_level: int | None = self.volume_level
328
329 prev_stream = self.active_snap_ma_stream
330
331 ma_stream = await self.snap_provider.get_snapcast_media_stream(
332 announcement, filter_settings_owner=self.player_id
333 )
334 player_group = await self.snap_provider.ensure_player_owned_group(self.player_id)
335
336 if ma_stream is None or ma_stream.stream_id is None or player_group is None:
337 return
338
339 await player_group.set_stream(ma_stream.stream_id)
340
341 if self.snap_provider._use_builtin_server:
342 await asyncio.sleep(self.snap_provider._snapcast_server_buffer_size / 1000.0)
343
344 if volume_level is not None:
345 await self.volume_set(volume_level)
346
347 await ma_stream.start_stream()
348 await ma_stream.wait_for_stopped()
349
350 if self.volume_level == volume_level and orig_volume_level is not None:
351 await self.volume_set(orig_volume_level)
352
353 if was_synced_to:
354 if (
355 leader_group := await self.snap_provider.ensure_player_owned_group(was_synced_to)
356 ) is None:
357 return
358 await leader_group.add_client(self.snap_client.identifier)
359 else:
360 await player_group.set_stream(
361 prev_stream.stream_id
362 if prev_stream and prev_stream.stream_id is not None
363 else "default"
364 )
365
366 async def get_config_entries(
367 self,
368 action: str | None = None,
369 values: dict[str, ConfigValueType] | None = None,
370 ) -> list[ConfigEntry]:
371 """Player config."""
372 return [
373 CONF_ENTRY_SAMPLE_RATES_SNAPCAST,
374 # we don't use the http server for streaming
375 CONF_ENTRY_HTTP_PROFILE_HIDDEN,
376 ]
377
378 def _handle_player_update(self, snap_client: SnapclientProto) -> None:
379 """Forward snap_client updates."""
380 self.poke_player_update()
381
382 def poke_player_update(self) -> None:
383 """Signal that a player state update should be processed."""
384 self._poke_evt.set()
385
386 async def _player_update_worker(self) -> None:
387 """Aggregate and process player state update requests."""
388 while True:
389 await self._poke_evt.wait()
390 self._poke_evt.clear()
391 while True:
392 call_update: bool = False
393 async with self._state_update_lock:
394 call_update = await self._process_snapcast_client_state()
395 if call_update:
396 self.update_state()
397 if self._poke_evt.is_set():
398 self._poke_evt.clear()
399 continue
400 break
401
402 async def _process_snapcast_client_state(self) -> bool:
403 """Process the latest Snapcast client state and apply changes to this player.
404
405 Returns:
406 True if changes were applied and a state update should be emitted via
407 ``update_state()``; False if no update is necessary (or if required data
408 is temporarily unavailable and the update should be retried later).
409 """
410 snap_group = self.snap_client.group
411 if snap_group is None:
412 # some data syncing error, a client is always a group member
413 # retry again later, don't call update now
414 return False
415
416 stream_id = snap_group.stream
417 snap_stream: SnapstreamProto | None = None
418 with suppress(KeyError):
419 snap_stream = self.snap_provider._snapserver.stream(stream_id)
420
421 members = list(snap_group.clients) # snapshot
422
423 curr_state: TrackedPlayerState = {
424 "_attr_name": self.snap_client.friendly_name,
425 "_attr_volume_level": self.snap_client.volume,
426 "_attr_volume_muted": self.snap_client.muted,
427 "_attr_available": self.snap_client.connected,
428 "connected": self.snap_client.connected,
429 "stream_id": snap_group.stream,
430 "stream_status": snap_stream.status if snap_stream is not None else None,
431 "grp_name": snap_group.name,
432 "grp_member_ids": members,
433 "grp_member_avail": [
434 pl.available
435 for cl_id in members
436 if (pl_id := self.snap_provider._get_ma_id(cl_id))
437 and (pl := self.mass.players.get_player(pl_id))
438 ],
439 }
440
441 prev_state: TrackedPlayerState = (
442 self._last_tracked_state if self._last_tracked_state is not None else {}
443 )
444 self._last_tracked_state = curr_state
445
446 # change detection for simple attrs
447 changed_attrs = {
448 k: v for k, v in curr_state.items() if k.startswith("_attr_") and prev_state.get(k) != v
449 }
450
451 prev_connected = prev_state.get("connected", False)
452 now_connected = curr_state.get("connected", False)
453 connection_changed = prev_connected != now_connected
454
455 prev_stream_id = prev_state.get("stream_id")
456 curr_stream_id = curr_state["stream_id"]
457 prev_stream_status = prev_state.get("stream_status")
458 curr_stream_status = curr_state.get("stream_status")
459
460 stream_changed = (
461 prev_stream_id != curr_stream_id or prev_stream_status != curr_stream_status
462 )
463
464 grouping_changed = any(
465 prev_state.get(k) != curr_state.get(k)
466 for k in ("grp_name", "grp_member_ids", "grp_member_avail")
467 )
468
469 needs_processing = bool(
470 changed_attrs or grouping_changed or stream_changed or connection_changed
471 )
472 if not needs_processing:
473 return False
474
475 if connection_changed or grouping_changed:
476 self.snap_provider.poke_group_members(snap_group)
477
478 # help cleaning up unused streams
479 if curr_stream_id == "default" or (
480 (my_stream := self._snap_ma_stream)
481 and my_stream.stream_id in {prev_stream_id, curr_stream_id}
482 ):
483 self.snap_provider.update_stream_usage()
484
485 # apply changed attrs
486 for key, value in changed_attrs.items():
487 setattr(self, key, value)
488
489 # finally notify state update once
490 return True
491
492 @property
493 def active_snap_ma_stream(self) -> SnapcastMAStream | None:
494 """Return the MA stream source of the active group."""
495 grp = self.snap_client.group
496 if grp is None or grp.stream is None:
497 return None
498
499 if grp.stream == "default":
500 return None
501
502 return self.snap_provider.get_snap_ma_stream(grp.stream)
503
504 @property
505 def snap_group_name(self) -> str:
506 """Return the name of the active group."""
507 snap_group = self.snap_client.group
508 if snap_group is None:
509 return ""
510 return snap_group.name
511
512 @property
513 def current_media(self) -> PlayerMedia | None:
514 """Return the current media being played by the player."""
515 if snap_ma_stream := self.active_snap_ma_stream:
516 return snap_ma_stream.media
517 return None
518
519 @property
520 def active_source(self) -> str | None:
521 """Return the (id of) the active source of the player."""
522 grp = self.snap_client.group
523 if grp is None or grp.stream is None:
524 return None
525
526 if grp.stream == "default":
527 return None
528
529 if ma_stream := self.snap_provider.get_snap_ma_stream(grp.stream):
530 return ma_stream.source_id
531
532 # external snapcast stream
533 return grp.stream or None
534
535 def _get_active_snapstream(self) -> SnapstreamProto | None:
536 """Get active stream for given player_id."""
537 if group := self.snap_client.group:
538 with suppress(KeyError):
539 return self.snap_provider._snapserver.stream(group.stream)
540 return None
541
542 def _get_player_ids_of_curr_group(self) -> list[str]:
543 snap_group = self.snap_client.group
544 if snap_group is None:
545 return []
546 return [
547 ma_id
548 for client_id in snap_group.clients
549 if (ma_id := self.snap_provider._get_ma_id(client_id))
550 ]
551
552 def _get_players_of_curr_group(self) -> list[Player]:
553 return [
554 ma_player
555 for ma_id in self._get_player_ids_of_curr_group()
556 if (ma_player := self.mass.players.get_player(ma_id))
557 ]
558