/
/
/
1"""Snapcast Player."""
2
3from __future__ import annotations
4
5import asyncio
6from contextlib import suppress
7from typing import TYPE_CHECKING, TypedDict, cast
8
9from music_assistant_models.enums import MediaType, PlaybackState, PlayerFeature
10from music_assistant_models.player import DeviceInfo, PlayerMedia
11from propcache import under_cached_property as cached_property
12
13from music_assistant.constants import (
14 ATTR_ANNOUNCEMENT_IN_PROGRESS,
15 CONF_ENTRY_HTTP_PROFILE_HIDDEN,
16 SYNCGROUP_PREFIX,
17)
18from music_assistant.models.player import Player
19from music_assistant.providers.snapcast.constants import CONF_ENTRY_SAMPLE_RATES_SNAPCAST
20from music_assistant.providers.snapcast.ma_stream import SnapcastMAStream
21
22if TYPE_CHECKING:
23 from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
24
25 from music_assistant.providers.snapcast.provider import SnapCastProvider
26 from music_assistant.providers.snapcast.snap_cntrl_proto import (
27 SnapclientProto,
28 SnapstreamProto,
29 )
30
31
32class TrackedPlayerState(TypedDict, total=False):
33 """Tracked state for the Snapcast MA player.
34
35 It is used for change detection and state synchronization, and may be
36 partially populated depending on which information is
37 currently available.
38
39 Keys prefixed with ``_attr_`` are exposed as player attributes, while the
40 remaining keys represent internal Snapcast grouping and connection state.
41 """
42
43 # Player attribute fields
44 _attr_name: str
45 _attr_volume_level: float
46 _attr_volume_muted: bool
47 _attr_available: bool
48
49 # snapclient fields
50 connected: bool
51 stream_id: str
52 stream_status: str | None
53 grp_name: str
54 grp_member_ids: list[str]
55 grp_member_avail: list[bool]
56
57
58class SnapCastPlayer(Player):
59 """SnapCastPlayer."""
60
61 def __init__(
62 self,
63 provider: SnapCastProvider,
64 player_id: str,
65 snap_client: SnapclientProto,
66 ) -> None:
67 """Init."""
68 self.snap_client = snap_client
69 super().__init__(provider, player_id)
70
71 self._snap_ma_stream: SnapcastMAStream | None = None
72
73 self._update_worker: asyncio.Task[None] | None = None
74 self._poke_evt = asyncio.Event()
75 self._state_update_lock = asyncio.Lock()
76 self._last_tracked_state: TrackedPlayerState | None = None
77
78 @property
79 def snap_provider(self) -> SnapCastProvider:
80 """Return the Snapcast provider instance."""
81 return cast("SnapCastProvider", self.provider)
82
83 @property
84 def requires_flow_mode(self) -> bool:
85 """Return if the player requires flow mode."""
86 return True
87
88 @cached_property
89 def synced_to(self) -> str | None:
90 """Return the id of the player this player is synced to (sync leader)."""
91 grp_name = self.snap_group_name
92 if grp_name == self.player_id:
93 # is group leader
94 return None
95
96 grp_player_ids = self._get_player_ids_of_curr_group()
97 if len(grp_player_ids) < 2 or grp_name not in grp_player_ids:
98 return None
99
100 if leader_player := self.mass.players.get(grp_name):
101 return grp_name if leader_player.available else None
102
103 return None
104
105 @cached_property
106 def group_members(self) -> list[str]:
107 """Return the group members of the player."""
108 if not self._attr_available:
109 return []
110
111 grp_name = self.snap_group_name
112 if grp_name != self.player_id:
113 # only group leaders can have members
114 return []
115
116 player_ids = self._get_player_ids_of_curr_group()
117 if self.player_id not in player_ids:
118 # should not happen, unless the current
119 # state repr is invalid
120 return []
121
122 player_ids.remove(self.player_id)
123 connected = [
124 player_id
125 for player_id in player_ids
126 if (client := self.snap_provider.get_snap_client(player_id=player_id))
127 and client.connected
128 ]
129 if connected:
130 return [self.player_id, *connected]
131
132 return []
133
134 @property
135 def playback_state(self) -> PlaybackState:
136 """Return the current playback state of the player."""
137 snap_stream = self._get_active_snapstream()
138 if snap_stream is None:
139 return PlaybackState.IDLE
140
141 if snap_stream.identifier == "default" or snap_stream.status == "idle":
142 return PlaybackState.IDLE
143
144 return PlaybackState.PLAYING
145
146 def setup(self) -> None:
147 """Set up player."""
148 self._attr_name = self.snap_client.friendly_name
149 self._attr_available = self.snap_client.connected
150
151 host_dict = self.snap_client._client.get("host", {})
152 os, arch, ip = (host_dict.get(key, "") for key in ["os", "arch", "ip"])
153 self._attr_device_info = DeviceInfo(
154 model=os,
155 manufacturer=arch,
156 )
157 self._attr_device_info.ip_address = ip
158 self._attr_supported_features = {
159 PlayerFeature.SET_MEMBERS,
160 PlayerFeature.VOLUME_SET,
161 PlayerFeature.VOLUME_MUTE,
162 PlayerFeature.PLAY_ANNOUNCEMENT,
163 }
164 self._attr_can_group_with = {self.snap_provider.instance_id}
165 if not self._update_worker:
166 self._update_worker = self.mass.create_task(self._player_update_worker)
167
168 async def volume_set(self, volume_level: int) -> None:
169 """Send VOLUME_SET command to given player."""
170 # Use optimistic server state for now
171 # not guaranteed that the client respects it
172 await self.snap_client.set_volume(volume_level)
173
174 async def stop(self) -> None:
175 """Send STOP command to given player."""
176 if ma_stream := self.active_snap_ma_stream:
177 ma_stream.request_stop_stream()
178 return
179
180 self.poke_player_update()
181
182 async def volume_mute(self, muted: bool) -> None:
183 """Send MUTE command to given player."""
184 # Use optimistic server state for now
185 # not guaranteed that the client respects it
186 # TODO: move this to the snapcast python library
187 vol = self.snap_client._client["config"]["volume"]
188 vol["muted"] = muted
189 res = await self.snap_provider._snapserver.client_volume(self.snap_client.identifier, vol)
190 if res and "muted" in res:
191 self.snap_client._client["config"]["volume"] = res
192 self.snap_client.callback()
193
194 async def set_members(
195 self,
196 player_ids_to_add: list[str] | None = None,
197 player_ids_to_remove: list[str] | None = None,
198 ) -> None:
199 """Handle SET_MEMBERS command on the player."""
200 # get the group owned by this player (identified by the group name)
201 player_group = await self.snap_provider.ensure_player_owned_group(self.player_id)
202
203 if player_group is None:
204 return
205
206 player_group.set_callback(None)
207
208 curr_ma_player_ids = [
209 ma_id
210 for cli_id in player_group.clients
211 if (ma_id := self.snap_provider._get_ma_id(cli_id))
212 ]
213
214 curr_stream_id = player_group.stream
215 sync_group_player = None
216 if curr_ma_stream := self.snap_provider.get_snap_ma_stream(curr_stream_id):
217 media = curr_ma_stream.media
218 if media.media_type == MediaType.PLUGIN_SOURCE:
219 custom_data = media.custom_data or {}
220 assigned_player = custom_data.get("player_id", "")
221 if assigned_player.startswith(SYNCGROUP_PREFIX):
222 sync_group_player = self.mass.players.get(assigned_player)
223 else:
224 media_src_id = media.source_id or ""
225 if media_src_id.startswith(SYNCGROUP_PREFIX):
226 sync_group_player = self.mass.players.get(media_src_id)
227
228 if sync_group_player and self.player_id in (player_ids_to_remove or []):
229 # players in sync_group_player.group_members will be rejoined
230 # remove others first
231 for id_to_remove in player_ids_to_remove or []:
232 if id_to_remove == self.player_id:
233 continue
234 if (
235 id_to_remove in curr_ma_player_ids
236 and id_to_remove not in sync_group_player.group_members
237 ):
238 await self.snap_provider.isolate_player_to_dedicated_group(
239 id_to_remove, target_stream_id="default"
240 )
241
242 # split remaining group into individual groups,
243 # keeps the current stream, set this group to default stream
244 await self.snap_provider.isolate_player_to_dedicated_group(
245 target_player_id=self.player_id,
246 target_stream_id="default",
247 others_stream_id=curr_stream_id,
248 )
249 else:
250 for player_id in player_ids_to_remove or []:
251 if player_id not in curr_ma_player_ids:
252 continue
253 await self.snap_provider.isolate_player_to_dedicated_group(
254 player_id, target_stream_id="default"
255 )
256 curr_ma_player_ids.remove(player_id)
257
258 for ma_id in player_ids_to_add or []:
259 if (
260 snap_id := self.snap_provider._get_snapclient_id(ma_id)
261 ) and ma_id not in curr_ma_player_ids:
262 await player_group.add_client(snap_id)
263
264 # some caller require instant state updates before returning
265 async with self._state_update_lock:
266 if await self._process_snapcast_client_state():
267 self.update_state()
268
269 self.snap_provider._update_group_callbacks(poke=True)
270
271 async def play_media(self, media: PlayerMedia) -> None:
272 """Handle PLAY MEDIA on given player."""
273 if self.synced_to:
274 msg = "A synced player cannot receive play commands directly"
275 raise RuntimeError(msg)
276
277 ma_stream = await self.snap_provider.get_snapcast_media_stream(
278 media, filter_settings_owner=self.player_id
279 )
280
281 if ma_stream is None or ma_stream.stream_id is None:
282 return
283
284 self._snap_ma_stream = ma_stream
285
286 # e.g. DSP settings require a restart
287 await self._snap_ma_stream.start_stream(allow_restart=True)
288
289 # if no announcement is playing we activate the stream now, otherwise it
290 # will be activated by play_announcement when the announcement is over.
291 if not self.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
292 player_group = await self.snap_provider.ensure_player_owned_group(self.player_id)
293 assert player_group is not None # for type checking
294 await player_group.set_stream(ma_stream.stream_id)
295
296 self.poke_player_update()
297
298 async def play_announcement(
299 self, announcement: PlayerMedia, volume_level: int | None = None
300 ) -> None:
301 """Handle (provider native) playback of an announcement on given player."""
302 was_synced_to: str | None = self.synced_to
303 orig_volume_level: int | None = self.volume_level
304
305 prev_stream = self.active_snap_ma_stream
306
307 ma_stream = await self.snap_provider.get_snapcast_media_stream(
308 announcement, filter_settings_owner=self.player_id
309 )
310 player_group = await self.snap_provider.ensure_player_owned_group(self.player_id)
311
312 if ma_stream is None or ma_stream.stream_id is None or player_group is None:
313 return
314
315 await player_group.set_stream(ma_stream.stream_id)
316
317 if self.snap_provider._use_builtin_server:
318 await asyncio.sleep(self.snap_provider._snapcast_server_buffer_size / 1000.0)
319
320 if volume_level is not None:
321 await self.volume_set(volume_level)
322
323 await ma_stream.start_stream()
324 await ma_stream.wait_for_stopped()
325
326 if self.volume_level == volume_level and orig_volume_level is not None:
327 await self.volume_set(orig_volume_level)
328
329 if was_synced_to:
330 if (
331 leader_group := await self.snap_provider.ensure_player_owned_group(was_synced_to)
332 ) is None:
333 return
334 await leader_group.add_client(self.snap_client.identifier)
335 else:
336 await player_group.set_stream(
337 prev_stream.stream_id
338 if prev_stream and prev_stream.stream_id is not None
339 else "default"
340 )
341
342 async def get_config_entries(
343 self,
344 action: str | None = None,
345 values: dict[str, ConfigValueType] | None = None,
346 ) -> list[ConfigEntry]:
347 """Player config."""
348 return [
349 CONF_ENTRY_SAMPLE_RATES_SNAPCAST,
350 # we don't use the http server for streaming
351 CONF_ENTRY_HTTP_PROFILE_HIDDEN,
352 ]
353
354 def _handle_player_update(self, snap_client: SnapclientProto) -> None:
355 """Forward snap_client updates."""
356 self.poke_player_update()
357
358 def poke_player_update(self) -> None:
359 """Signal that a player state update should be processed."""
360 self._poke_evt.set()
361
362 async def _player_update_worker(self) -> None:
363 """Aggregate and process player state update requests."""
364 while True:
365 await self._poke_evt.wait()
366 self._poke_evt.clear()
367 while True:
368 call_update: bool = False
369 async with self._state_update_lock:
370 call_update = await self._process_snapcast_client_state()
371 if call_update:
372 self.update_state()
373 if self._poke_evt.is_set():
374 self._poke_evt.clear()
375 continue
376 break
377
378 async def _process_snapcast_client_state(self) -> bool:
379 """Process the latest Snapcast client state and apply changes to this player.
380
381 Returns:
382 True if changes were applied and a state update should be emitted via
383 ``update_state()``; False if no update is necessary (or if required data
384 is temporarily unavailable and the update should be retried later).
385 """
386 snap_group = self.snap_client.group
387 if snap_group is None:
388 # some data syncing error, a client is always a group member
389 # retry again later, don't call update now
390 return False
391
392 stream_id = snap_group.stream
393 snap_stream: SnapstreamProto | None = None
394 with suppress(KeyError):
395 snap_stream = self.snap_provider._snapserver.stream(stream_id)
396
397 members = list(snap_group.clients) # snapshot
398
399 curr_state: TrackedPlayerState = {
400 "_attr_name": self.snap_client.friendly_name,
401 "_attr_volume_level": self.snap_client.volume,
402 "_attr_volume_muted": self.snap_client.muted,
403 "_attr_available": self.snap_client.connected,
404 "connected": self.snap_client.connected,
405 "stream_id": snap_group.stream,
406 "stream_status": snap_stream.status if snap_stream is not None else None,
407 "grp_name": snap_group.name,
408 "grp_member_ids": members,
409 "grp_member_avail": [
410 pl.available
411 for cl_id in members
412 if (pl_id := self.snap_provider._get_ma_id(cl_id))
413 and (pl := self.mass.players.get(pl_id))
414 ],
415 }
416
417 prev_state: TrackedPlayerState = (
418 self._last_tracked_state if self._last_tracked_state is not None else {}
419 )
420 self._last_tracked_state = curr_state
421
422 # change detection for simple attrs
423 changed_attrs = {
424 k: v for k, v in curr_state.items() if k.startswith("_attr_") and prev_state.get(k) != v
425 }
426
427 prev_connected = prev_state.get("connected", False)
428 now_connected = curr_state.get("connected", False)
429 connection_changed = prev_connected != now_connected
430
431 prev_stream_id = prev_state.get("stream_id")
432 curr_stream_id = curr_state["stream_id"]
433 prev_stream_status = prev_state.get("stream_status")
434 curr_stream_status = curr_state.get("stream_status")
435
436 stream_changed = (
437 prev_stream_id != curr_stream_id or prev_stream_status != curr_stream_status
438 )
439
440 grouping_changed = any(
441 prev_state.get(k) != curr_state.get(k)
442 for k in ("grp_name", "grp_member_ids", "grp_member_avail")
443 )
444
445 needs_processing = bool(
446 changed_attrs or grouping_changed or stream_changed or connection_changed
447 )
448 if not needs_processing:
449 return False
450
451 if connection_changed or grouping_changed:
452 self.snap_provider.poke_group_members(snap_group)
453
454 # help cleaning up unused streams
455 if curr_stream_id == "default" or (
456 (my_stream := self._snap_ma_stream)
457 and my_stream.stream_id in {prev_stream_id, curr_stream_id}
458 ):
459 self.snap_provider.update_stream_usage()
460
461 # apply changed attrs
462 for key, value in changed_attrs.items():
463 setattr(self, key, value)
464
465 # finally notify state update once
466 return True
467
468 @property
469 def active_snap_ma_stream(self) -> SnapcastMAStream | None:
470 """Return the MA stream source of the active group."""
471 grp = self.snap_client.group
472 if grp is None or grp.stream is None:
473 return None
474
475 if grp.stream == "default":
476 return None
477
478 return self.snap_provider.get_snap_ma_stream(grp.stream)
479
480 @property
481 def snap_group_name(self) -> str:
482 """Return the name of the active group."""
483 snap_group = self.snap_client.group
484 if snap_group is None:
485 return ""
486 return snap_group.name
487
488 @cached_property
489 def _current_media(self) -> PlayerMedia | None:
490 """
491 Return the current media being played by the player.
492
493 Note that this is NOT the final current media of the player,
494 as it may be overridden by a active group/sync membership.
495 Hence it's marked as a private property.
496 The final current media can be retrieved by using the 'current_media' property.
497 """
498 if snap_ma_stream := self.active_snap_ma_stream:
499 return snap_ma_stream.media
500 return None
501
502 @property
503 def _active_source(self) -> str | None:
504 """
505 Return the (id of) the active source of the player.
506
507 Only required if the player supports PlayerFeature.SELECT_SOURCE.
508
509 Set to None if the player is not currently playing a source or
510 the player_id if the player is currently playing a MA queue.
511
512 Note that this is NOT the final active source of the player,
513 as it may be overridden by a active group/sync membership.
514 Hence it's marked as a private property.
515 The final active source can be retrieved by using the 'active_source' property.
516 """
517 grp = self.snap_client.group
518 if grp is None or grp.stream is None:
519 return None
520
521 if grp.stream == "default":
522 return None
523
524 if ma_stream := self.snap_provider.get_snap_ma_stream(grp.stream):
525 return ma_stream.source_id
526
527 # external snapcast stream
528 return grp.stream or None
529
530 def _get_active_snapstream(self) -> SnapstreamProto | None:
531 """Get active stream for given player_id."""
532 if group := self.snap_client.group:
533 with suppress(KeyError):
534 return self.snap_provider._snapserver.stream(group.stream)
535 return None
536
537 def _get_player_ids_of_curr_group(self) -> list[str]:
538 snap_group = self.snap_client.group
539 if snap_group is None:
540 return []
541 return [
542 ma_id
543 for client_id in snap_group.clients
544 if (ma_id := self.snap_provider._get_ma_id(client_id))
545 ]
546
547 def _get_players_of_curr_group(self) -> list[Player]:
548 return [
549 ma_player
550 for ma_id in self._get_player_ids_of_curr_group()
551 if (ma_player := self.mass.players.get(ma_id))
552 ]
553