/
/
/
1"""Snapcast Player."""
2
3from __future__ import annotations
4
5import asyncio
6from contextlib import suppress
7from typing import TYPE_CHECKING, TypedDict, cast
8
9from music_assistant_models.enums import MediaType, PlaybackState, PlayerFeature
10from music_assistant_models.player import DeviceInfo, PlayerMedia
11from propcache import under_cached_property as cached_property
12
13from music_assistant.constants import (
14 ATTR_ANNOUNCEMENT_IN_PROGRESS,
15 CONF_ENTRY_HTTP_PROFILE_HIDDEN,
16 SYNCGROUP_PREFIX,
17)
18from music_assistant.models.player import Player
19from music_assistant.providers.snapcast.constants import CONF_ENTRY_SAMPLE_RATES_SNAPCAST
20from music_assistant.providers.snapcast.ma_stream import SnapcastMAStream
21
22if TYPE_CHECKING:
23 from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
24
25 from music_assistant.providers.snapcast.provider import SnapCastProvider
26 from music_assistant.providers.snapcast.snap_cntrl_proto import (
27 SnapclientProto,
28 SnapstreamProto,
29 )
30
31
32class TrackedPlayerState(TypedDict, total=False):
33 """Tracked state for the Snapcast MA player.
34
35 It is used for change detection and state synchronization, and may be
36 partially populated depending on which information is
37 currently available.
38
39 Keys prefixed with ``_attr_`` are exposed as player attributes, while the
40 remaining keys represent internal Snapcast grouping and connection state.
41 """
42
43 # Player attribute fields
44 _attr_name: str
45 _attr_volume_level: float
46 _attr_volume_muted: bool
47 _attr_available: bool
48
49 # snapclient fields
50 connected: bool
51 stream_id: str
52 stream_status: str | None
53 grp_name: str
54 grp_member_ids: list[str]
55 grp_member_avail: list[bool]
56
57
58class SnapCastPlayer(Player):
59 """SnapCastPlayer."""
60
61 def __init__(
62 self,
63 provider: SnapCastProvider,
64 player_id: str,
65 snap_client: SnapclientProto,
66 ) -> None:
67 """Init."""
68 self.snap_client = snap_client
69 super().__init__(provider, player_id)
70
71 self._snap_ma_stream: SnapcastMAStream | None = None
72
73 self._update_worker: asyncio.Task[None] | None = None
74 self._poke_evt = asyncio.Event()
75 self._state_update_lock = asyncio.Lock()
76 self._last_tracked_state: TrackedPlayerState | None = None
77
78 @property
79 def snap_provider(self) -> SnapCastProvider:
80 """Return the Snapcast provider instance."""
81 return cast("SnapCastProvider", self.provider)
82
83 @property
84 def requires_flow_mode(self) -> bool:
85 """Return if the player requires flow mode."""
86 return True
87
88 @cached_property
89 def synced_to(self) -> str | None:
90 """Return the id of the player this player is synced to (sync leader)."""
91 grp_name = self.snap_group_name
92 if grp_name == self.player_id:
93 # is group leader
94 return None
95
96 grp_player_ids = self._get_player_ids_of_curr_group()
97 if len(grp_player_ids) < 2 or grp_name not in grp_player_ids:
98 return None
99
100 if leader_player := self.mass.players.get(grp_name):
101 return grp_name if leader_player.available else None
102
103 return None
104
105 @cached_property
106 def group_members(self) -> list[str]:
107 """Return the group members of the player."""
108 if not self._attr_available:
109 return []
110
111 grp_name = self.snap_group_name
112 if grp_name != self.player_id:
113 # only group leaders can have members
114 return []
115
116 player_ids = self._get_player_ids_of_curr_group()
117 if self.player_id not in player_ids:
118 # should not happen, unless the current
119 # state repr is invalid
120 return []
121
122 player_ids.remove(self.player_id)
123 connected = [
124 player_id
125 for player_id in player_ids
126 if (client := self.snap_provider.get_snap_client(player_id=player_id))
127 and client.connected
128 ]
129 if connected:
130 return [self.player_id, *connected]
131
132 return []
133
134 @property
135 def playback_state(self) -> PlaybackState:
136 """Return the current playback state of the player."""
137 snap_stream = self._get_active_snapstream()
138 if snap_stream is None:
139 return PlaybackState.IDLE
140
141 if snap_stream.identifier == "default" or snap_stream.status == "idle":
142 return PlaybackState.IDLE
143
144 return PlaybackState.PLAYING
145
146 @property
147 def elapsed_time(self) -> float | None:
148 """Return the elapsed time in (fractional) seconds of the current track (if any)."""
149 # using flow-mode, elapsed time will be estimated upstream from 'elapsed_time_last_updated'
150 return 0 if self.active_snap_ma_stream else None
151
152 @property
153 def elapsed_time_last_updated(self) -> float | None:
154 """
155 Return when the elapsed time was last updated.
156
157 return: The (UTC) timestamp when the elapsed time was last updated,
158 or None if it was never updated (or unknown).
159 """
160 # we only update on playback starts
161 if snap_ma_stream := self.active_snap_ma_stream:
162 return snap_ma_stream.playback_started_at
163 return None
164
165 def setup(self) -> None:
166 """Set up player."""
167 self._attr_name = self.snap_client.friendly_name
168 self._attr_available = self.snap_client.connected
169
170 host_dict = self.snap_client._client.get("host", {})
171 os, arch, ip = (host_dict.get(key, "") for key in ["os", "arch", "ip"])
172 self._attr_device_info = DeviceInfo(
173 model=os,
174 manufacturer=arch,
175 )
176 self._attr_device_info.ip_address = ip
177 self._attr_supported_features = {
178 PlayerFeature.SET_MEMBERS,
179 PlayerFeature.VOLUME_SET,
180 PlayerFeature.VOLUME_MUTE,
181 PlayerFeature.PLAY_ANNOUNCEMENT,
182 }
183 self._attr_can_group_with = {self.snap_provider.instance_id}
184 if not self._update_worker:
185 self._update_worker = self.mass.create_task(self._player_update_worker)
186
187 async def volume_set(self, volume_level: int) -> None:
188 """Send VOLUME_SET command to given player."""
189 # Use optimistic server state for now
190 # not guaranteed that the client respects it
191 await self.snap_client.set_volume(volume_level)
192
193 async def stop(self) -> None:
194 """Send STOP command to given player."""
195 if ma_stream := self.active_snap_ma_stream:
196 ma_stream.request_stop_stream()
197 return
198
199 self.poke_player_update()
200
201 async def volume_mute(self, muted: bool) -> None:
202 """Send MUTE command to given player."""
203 # Use optimistic server state for now
204 # not guaranteed that the client respects it
205 # TODO: move this to the snapcast python library
206 vol = self.snap_client._client["config"]["volume"]
207 vol["muted"] = muted
208 res = await self.snap_provider._snapserver.client_volume(self.snap_client.identifier, vol)
209 if res and "muted" in res:
210 self.snap_client._client["config"]["volume"] = res
211 self.snap_client.callback()
212
213 async def set_members(
214 self,
215 player_ids_to_add: list[str] | None = None,
216 player_ids_to_remove: list[str] | None = None,
217 ) -> None:
218 """Handle SET_MEMBERS command on the player."""
219 # get the group owned by this player (identified by the group name)
220 player_group = await self.snap_provider.ensure_player_owned_group(self.player_id)
221
222 if player_group is None:
223 return
224
225 player_group.set_callback(None)
226
227 curr_ma_player_ids = [
228 ma_id
229 for cli_id in player_group.clients
230 if (ma_id := self.snap_provider._get_ma_id(cli_id))
231 ]
232
233 curr_stream_id = player_group.stream
234 sync_group_player = None
235 if curr_ma_stream := self.snap_provider.get_snap_ma_stream(curr_stream_id):
236 media = curr_ma_stream.media
237 if media.media_type == MediaType.PLUGIN_SOURCE:
238 custom_data = media.custom_data or {}
239 assigned_player = custom_data.get("player_id", "")
240 if assigned_player.startswith(SYNCGROUP_PREFIX):
241 sync_group_player = self.mass.players.get(assigned_player)
242 else:
243 media_src_id = media.source_id or ""
244 if media_src_id.startswith(SYNCGROUP_PREFIX):
245 sync_group_player = self.mass.players.get(media_src_id)
246
247 if sync_group_player and self.player_id in (player_ids_to_remove or []):
248 # players in sync_group_player.group_members will be rejoined
249 # remove others first
250 for id_to_remove in player_ids_to_remove or []:
251 if id_to_remove == self.player_id:
252 continue
253 if (
254 id_to_remove in curr_ma_player_ids
255 and id_to_remove not in sync_group_player.group_members
256 ):
257 await self.snap_provider.isolate_player_to_dedicated_group(
258 id_to_remove, target_stream_id="default"
259 )
260
261 # split remaining group into individual groups,
262 # keeps the current stream, set this group to default stream
263 await self.snap_provider.isolate_player_to_dedicated_group(
264 target_player_id=self.player_id,
265 target_stream_id="default",
266 others_stream_id=curr_stream_id,
267 )
268 else:
269 for player_id in player_ids_to_remove or []:
270 if player_id not in curr_ma_player_ids:
271 continue
272 await self.snap_provider.isolate_player_to_dedicated_group(
273 player_id, target_stream_id="default"
274 )
275 curr_ma_player_ids.remove(player_id)
276
277 for ma_id in player_ids_to_add or []:
278 if (
279 snap_id := self.snap_provider._get_snapclient_id(ma_id)
280 ) and ma_id not in curr_ma_player_ids:
281 await player_group.add_client(snap_id)
282
283 # some caller require instant state updates before returning
284 async with self._state_update_lock:
285 if await self._process_snapcast_client_state():
286 self.update_state()
287
288 self.snap_provider._update_group_callbacks(poke=True)
289
290 async def play_media(self, media: PlayerMedia) -> None:
291 """Handle PLAY MEDIA on given player."""
292 if self.synced_to:
293 msg = "A synced player cannot receive play commands directly"
294 raise RuntimeError(msg)
295
296 ma_stream = await self.snap_provider.get_snapcast_media_stream(
297 media, filter_settings_owner=self.player_id
298 )
299
300 if ma_stream is None or ma_stream.stream_id is None:
301 return
302
303 self._snap_ma_stream = ma_stream
304
305 # e.g. DSP settings require a restart
306 await self._snap_ma_stream.start_stream(allow_restart=True)
307
308 # if no announcement is playing we activate the stream now, otherwise it
309 # will be activated by play_announcement when the announcement is over.
310 if not self.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
311 player_group = await self.snap_provider.ensure_player_owned_group(self.player_id)
312 assert player_group is not None # for type checking
313 await player_group.set_stream(ma_stream.stream_id)
314
315 self.poke_player_update()
316
317 async def play_announcement(
318 self, announcement: PlayerMedia, volume_level: int | None = None
319 ) -> None:
320 """Handle (provider native) playback of an announcement on given player."""
321 was_synced_to: str | None = self.synced_to
322 orig_volume_level: int | None = self.volume_level
323
324 prev_stream = self.active_snap_ma_stream
325
326 ma_stream = await self.snap_provider.get_snapcast_media_stream(
327 announcement, filter_settings_owner=self.player_id
328 )
329 player_group = await self.snap_provider.ensure_player_owned_group(self.player_id)
330
331 if ma_stream is None or ma_stream.stream_id is None or player_group is None:
332 return
333
334 await player_group.set_stream(ma_stream.stream_id)
335
336 if self.snap_provider._use_builtin_server:
337 await asyncio.sleep(self.snap_provider._snapcast_server_buffer_size / 1000.0)
338
339 if volume_level is not None:
340 await self.volume_set(volume_level)
341
342 await ma_stream.start_stream()
343 await ma_stream.wait_for_stopped()
344
345 if self.volume_level == volume_level and orig_volume_level is not None:
346 await self.volume_set(orig_volume_level)
347
348 if was_synced_to:
349 if (
350 leader_group := await self.snap_provider.ensure_player_owned_group(was_synced_to)
351 ) is None:
352 return
353 await leader_group.add_client(self.snap_client.identifier)
354 else:
355 await player_group.set_stream(
356 prev_stream.stream_id
357 if prev_stream and prev_stream.stream_id is not None
358 else "default"
359 )
360
361 async def get_config_entries(
362 self,
363 action: str | None = None,
364 values: dict[str, ConfigValueType] | None = None,
365 ) -> list[ConfigEntry]:
366 """Player config."""
367 return [
368 CONF_ENTRY_SAMPLE_RATES_SNAPCAST,
369 # we don't use the http server for streaming
370 CONF_ENTRY_HTTP_PROFILE_HIDDEN,
371 ]
372
373 def _handle_player_update(self, snap_client: SnapclientProto) -> None:
374 """Forward snap_client updates."""
375 self.poke_player_update()
376
377 def poke_player_update(self) -> None:
378 """Signal that a player state update should be processed."""
379 self._poke_evt.set()
380
381 async def _player_update_worker(self) -> None:
382 """Aggregate and process player state update requests."""
383 while True:
384 await self._poke_evt.wait()
385 self._poke_evt.clear()
386 while True:
387 call_update: bool = False
388 async with self._state_update_lock:
389 call_update = await self._process_snapcast_client_state()
390 if call_update:
391 self.update_state()
392 if self._poke_evt.is_set():
393 self._poke_evt.clear()
394 continue
395 break
396
397 async def _process_snapcast_client_state(self) -> bool:
398 """Process the latest Snapcast client state and apply changes to this player.
399
400 Returns:
401 True if changes were applied and a state update should be emitted via
402 ``update_state()``; False if no update is necessary (or if required data
403 is temporarily unavailable and the update should be retried later).
404 """
405 snap_group = self.snap_client.group
406 if snap_group is None:
407 # some data syncing error, a client is always a group member
408 # retry again later, don't call update now
409 return False
410
411 stream_id = snap_group.stream
412 snap_stream: SnapstreamProto | None = None
413 with suppress(KeyError):
414 snap_stream = self.snap_provider._snapserver.stream(stream_id)
415
416 members = list(snap_group.clients) # snapshot
417
418 curr_state: TrackedPlayerState = {
419 "_attr_name": self.snap_client.friendly_name,
420 "_attr_volume_level": self.snap_client.volume,
421 "_attr_volume_muted": self.snap_client.muted,
422 "_attr_available": self.snap_client.connected,
423 "connected": self.snap_client.connected,
424 "stream_id": snap_group.stream,
425 "stream_status": snap_stream.status if snap_stream is not None else None,
426 "grp_name": snap_group.name,
427 "grp_member_ids": members,
428 "grp_member_avail": [
429 pl.available
430 for cl_id in members
431 if (pl_id := self.snap_provider._get_ma_id(cl_id))
432 and (pl := self.mass.players.get(pl_id))
433 ],
434 }
435
436 prev_state: TrackedPlayerState = (
437 self._last_tracked_state if self._last_tracked_state is not None else {}
438 )
439 self._last_tracked_state = curr_state
440
441 # change detection for simple attrs
442 changed_attrs = {
443 k: v for k, v in curr_state.items() if k.startswith("_attr_") and prev_state.get(k) != v
444 }
445
446 prev_connected = prev_state.get("connected", False)
447 now_connected = curr_state.get("connected", False)
448 connection_changed = prev_connected != now_connected
449
450 prev_stream_id = prev_state.get("stream_id")
451 curr_stream_id = curr_state["stream_id"]
452 prev_stream_status = prev_state.get("stream_status")
453 curr_stream_status = curr_state.get("stream_status")
454
455 stream_changed = (
456 prev_stream_id != curr_stream_id or prev_stream_status != curr_stream_status
457 )
458
459 grouping_changed = any(
460 prev_state.get(k) != curr_state.get(k)
461 for k in ("grp_name", "grp_member_ids", "grp_member_avail")
462 )
463
464 needs_processing = bool(
465 changed_attrs or grouping_changed or stream_changed or connection_changed
466 )
467 if not needs_processing:
468 return False
469
470 if connection_changed or grouping_changed:
471 self.snap_provider.poke_group_members(snap_group)
472
473 # help cleaning up unused streams
474 if curr_stream_id == "default" or (
475 (my_stream := self._snap_ma_stream)
476 and my_stream.stream_id in {prev_stream_id, curr_stream_id}
477 ):
478 self.snap_provider.update_stream_usage()
479
480 # apply changed attrs
481 for key, value in changed_attrs.items():
482 setattr(self, key, value)
483
484 # finally notify state update once
485 return True
486
487 @property
488 def active_snap_ma_stream(self) -> SnapcastMAStream | None:
489 """Return the MA stream source of the active group."""
490 grp = self.snap_client.group
491 if grp is None or grp.stream is None:
492 return None
493
494 if grp.stream == "default":
495 return None
496
497 return self.snap_provider.get_snap_ma_stream(grp.stream)
498
499 @property
500 def snap_group_name(self) -> str:
501 """Return the name of the active group."""
502 snap_group = self.snap_client.group
503 if snap_group is None:
504 return ""
505 return snap_group.name
506
507 @cached_property
508 def _current_media(self) -> PlayerMedia | None:
509 """
510 Return the current media being played by the player.
511
512 Note that this is NOT the final current media of the player,
513 as it may be overridden by a active group/sync membership.
514 Hence it's marked as a private property.
515 The final current media can be retrieved by using the 'current_media' property.
516 """
517 if snap_ma_stream := self.active_snap_ma_stream:
518 return snap_ma_stream.media
519 return None
520
521 @property
522 def _active_source(self) -> str | None:
523 """
524 Return the (id of) the active source of the player.
525
526 Only required if the player supports PlayerFeature.SELECT_SOURCE.
527
528 Set to None if the player is not currently playing a source or
529 the player_id if the player is currently playing a MA queue.
530
531 Note that this is NOT the final active source of the player,
532 as it may be overridden by a active group/sync membership.
533 Hence it's marked as a private property.
534 The final active source can be retrieved by using the 'active_source' property.
535 """
536 grp = self.snap_client.group
537 if grp is None or grp.stream is None:
538 return None
539
540 if grp.stream == "default":
541 return None
542
543 if ma_stream := self.snap_provider.get_snap_ma_stream(grp.stream):
544 return ma_stream.source_id
545
546 # external snapcast stream
547 return grp.stream or None
548
549 def _get_active_snapstream(self) -> SnapstreamProto | None:
550 """Get active stream for given player_id."""
551 if group := self.snap_client.group:
552 with suppress(KeyError):
553 return self.snap_provider._snapserver.stream(group.stream)
554 return None
555
556 def _get_player_ids_of_curr_group(self) -> list[str]:
557 snap_group = self.snap_client.group
558 if snap_group is None:
559 return []
560 return [
561 ma_id
562 for client_id in snap_group.clients
563 if (ma_id := self.snap_provider._get_ma_id(client_id))
564 ]
565
566 def _get_players_of_curr_group(self) -> list[Player]:
567 return [
568 ma_player
569 for ma_id in self._get_player_ids_of_curr_group()
570 if (ma_player := self.mass.players.get(ma_id))
571 ]
572