music-assistant-server

20.6 KBPY
player.py
20.6 KB572 lines • python
1"""Snapcast Player."""
2
3from __future__ import annotations
4
5import asyncio
6from contextlib import suppress
7from typing import TYPE_CHECKING, TypedDict, cast
8
9from music_assistant_models.enums import MediaType, PlaybackState, PlayerFeature
10from music_assistant_models.player import DeviceInfo, PlayerMedia
11from propcache import under_cached_property as cached_property
12
13from music_assistant.constants import (
14    ATTR_ANNOUNCEMENT_IN_PROGRESS,
15    CONF_ENTRY_HTTP_PROFILE_HIDDEN,
16    SYNCGROUP_PREFIX,
17)
18from music_assistant.models.player import Player
19from music_assistant.providers.snapcast.constants import CONF_ENTRY_SAMPLE_RATES_SNAPCAST
20from music_assistant.providers.snapcast.ma_stream import SnapcastMAStream
21
22if TYPE_CHECKING:
23    from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
24
25    from music_assistant.providers.snapcast.provider import SnapCastProvider
26    from music_assistant.providers.snapcast.snap_cntrl_proto import (
27        SnapclientProto,
28        SnapstreamProto,
29    )
30
31
class TrackedPlayerState(TypedDict, total=False):
    """Tracked state for the Snapcast MA player.

    It is used for change detection and state synchronization, and may be
    partially populated depending on which information is
    currently available.

    Keys prefixed with ``_attr_`` are exposed as player attributes, while the
    remaining keys represent internal Snapcast grouping and connection state.
    """

    # Player attribute fields: when one of these changes, the value is copied
    # onto the player object under the very same attribute name.
    _attr_name: str  # friendly name reported by the snapclient
    _attr_volume_level: float  # volume reported by the snapclient
    _attr_volume_muted: bool  # mute flag reported by the snapclient
    _attr_available: bool  # mirrors the snapclient's connected flag

    # Snapcast client/group fields: only compared against the previous
    # snapshot, never written to the player object.
    connected: bool  # snapclient connection flag
    stream_id: str  # id of the stream assigned to the client's group
    stream_status: str | None  # status of that stream, None if it could not be looked up
    grp_name: str  # name of the client's Snapcast group
    grp_member_ids: list[str]  # snapshot of the client ids in the group
    grp_member_avail: list[bool]  # availability of members that map to a known MA player
56
57
58class SnapCastPlayer(Player):
59    """SnapCastPlayer."""
60
61    def __init__(
62        self,
63        provider: SnapCastProvider,
64        player_id: str,
65        snap_client: SnapclientProto,
66    ) -> None:
67        """Init."""
68        self.snap_client = snap_client
69        super().__init__(provider, player_id)
70
71        self._snap_ma_stream: SnapcastMAStream | None = None
72
73        self._update_worker: asyncio.Task[None] | None = None
74        self._poke_evt = asyncio.Event()
75        self._state_update_lock = asyncio.Lock()
76        self._last_tracked_state: TrackedPlayerState | None = None
77
78    @property
79    def snap_provider(self) -> SnapCastProvider:
80        """Return the Snapcast provider instance."""
81        return cast("SnapCastProvider", self.provider)
82
83    @property
84    def requires_flow_mode(self) -> bool:
85        """Return if the player requires flow mode."""
86        return True
87
88    @cached_property
89    def synced_to(self) -> str | None:
90        """Return the id of the player this player is synced to (sync leader)."""
91        grp_name = self.snap_group_name
92        if grp_name == self.player_id:
93            # is group leader
94            return None
95
96        grp_player_ids = self._get_player_ids_of_curr_group()
97        if len(grp_player_ids) < 2 or grp_name not in grp_player_ids:
98            return None
99
100        if leader_player := self.mass.players.get(grp_name):
101            return grp_name if leader_player.available else None
102
103        return None
104
105    @cached_property
106    def group_members(self) -> list[str]:
107        """Return the group members of the player."""
108        if not self._attr_available:
109            return []
110
111        grp_name = self.snap_group_name
112        if grp_name != self.player_id:
113            # only group leaders can have members
114            return []
115
116        player_ids = self._get_player_ids_of_curr_group()
117        if self.player_id not in player_ids:
118            # should not happen, unless the current
119            # state repr is invalid
120            return []
121
122        player_ids.remove(self.player_id)
123        connected = [
124            player_id
125            for player_id in player_ids
126            if (client := self.snap_provider.get_snap_client(player_id=player_id))
127            and client.connected
128        ]
129        if connected:
130            return [self.player_id, *connected]
131
132        return []
133
134    @property
135    def playback_state(self) -> PlaybackState:
136        """Return the current playback state of the player."""
137        snap_stream = self._get_active_snapstream()
138        if snap_stream is None:
139            return PlaybackState.IDLE
140
141        if snap_stream.identifier == "default" or snap_stream.status == "idle":
142            return PlaybackState.IDLE
143
144        return PlaybackState.PLAYING
145
146    @property
147    def elapsed_time(self) -> float | None:
148        """Return the elapsed time in (fractional) seconds of the current track (if any)."""
149        # using flow-mode, elapsed time will be estimated upstream from 'elapsed_time_last_updated'
150        return 0 if self.active_snap_ma_stream else None
151
152    @property
153    def elapsed_time_last_updated(self) -> float | None:
154        """
155        Return when the elapsed time was last updated.
156
157        return: The (UTC) timestamp when the elapsed time was last updated,
158        or None if it was never updated (or unknown).
159        """
160        # we only update on playback starts
161        if snap_ma_stream := self.active_snap_ma_stream:
162            return snap_ma_stream.playback_started_at
163        return None
164
165    def setup(self) -> None:
166        """Set up player."""
167        self._attr_name = self.snap_client.friendly_name
168        self._attr_available = self.snap_client.connected
169
170        host_dict = self.snap_client._client.get("host", {})
171        os, arch, ip = (host_dict.get(key, "") for key in ["os", "arch", "ip"])
172        self._attr_device_info = DeviceInfo(
173            model=os,
174            manufacturer=arch,
175        )
176        self._attr_device_info.ip_address = ip
177        self._attr_supported_features = {
178            PlayerFeature.SET_MEMBERS,
179            PlayerFeature.VOLUME_SET,
180            PlayerFeature.VOLUME_MUTE,
181            PlayerFeature.PLAY_ANNOUNCEMENT,
182        }
183        self._attr_can_group_with = {self.snap_provider.instance_id}
184        if not self._update_worker:
185            self._update_worker = self.mass.create_task(self._player_update_worker)
186
187    async def volume_set(self, volume_level: int) -> None:
188        """Send VOLUME_SET command to given player."""
189        # Use optimistic server state for now
190        # not guaranteed that the client respects it
191        await self.snap_client.set_volume(volume_level)
192
193    async def stop(self) -> None:
194        """Send STOP command to given player."""
195        if ma_stream := self.active_snap_ma_stream:
196            ma_stream.request_stop_stream()
197            return
198
199        self.poke_player_update()
200
201    async def volume_mute(self, muted: bool) -> None:
202        """Send MUTE command to given player."""
203        # Use optimistic server state for now
204        # not guaranteed that the client respects it
205        # TODO: move this to the snapcast python library
206        vol = self.snap_client._client["config"]["volume"]
207        vol["muted"] = muted
208        res = await self.snap_provider._snapserver.client_volume(self.snap_client.identifier, vol)
209        if res and "muted" in res:
210            self.snap_client._client["config"]["volume"] = res
211            self.snap_client.callback()
212
    async def set_members(
        self,
        player_ids_to_add: list[str] | None = None,
        player_ids_to_remove: list[str] | None = None,
    ) -> None:
        """Handle SET_MEMBERS command on the player.

        Removals are processed before additions. Removed players are isolated
        into a dedicated group on the "default" stream. Before returning, the
        player state is processed once so callers see the result immediately.

        :param player_ids_to_add: MA player ids to add to this player's group.
        :param player_ids_to_remove: MA player ids to remove from this player's group.
        """
        # get the group owned by this player (identified by the group name)
        player_group = await self.snap_provider.ensure_player_owned_group(self.player_id)

        if player_group is None:
            return

        # NOTE(review): the group callback is unset here and presumably
        # re-installed by _update_group_callbacks() at the end. If anything in
        # between raises, that call is skipped — confirm this is acceptable.
        player_group.set_callback(None)

        # MA ids of the clients currently in the group
        # (clients that do not resolve to an MA id are skipped)
        curr_ma_player_ids = [
            ma_id
            for cli_id in player_group.clients
            if (ma_id := self.snap_provider._get_ma_id(cli_id))
        ]

        # find out whether the group's current stream belongs to a sync group:
        # for plugin sources the id is read from custom_data["player_id"],
        # otherwise from the media's source_id
        curr_stream_id = player_group.stream
        sync_group_player = None
        if curr_ma_stream := self.snap_provider.get_snap_ma_stream(curr_stream_id):
            media = curr_ma_stream.media
            if media.media_type == MediaType.PLUGIN_SOURCE:
                custom_data = media.custom_data or {}
                assigned_player = custom_data.get("player_id", "")
                if assigned_player.startswith(SYNCGROUP_PREFIX):
                    sync_group_player = self.mass.players.get(assigned_player)
            else:
                media_src_id = media.source_id or ""
                if media_src_id.startswith(SYNCGROUP_PREFIX):
                    sync_group_player = self.mass.players.get(media_src_id)

        if sync_group_player and self.player_id in (player_ids_to_remove or []):
            # this player itself is removed while a sync group stream is playing
            # players in sync_group_player.group_members will be rejoined
            # remove others first
            for id_to_remove in player_ids_to_remove or []:
                if id_to_remove == self.player_id:
                    continue
                if (
                    id_to_remove in curr_ma_player_ids
                    and id_to_remove not in sync_group_player.group_members
                ):
                    await self.snap_provider.isolate_player_to_dedicated_group(
                        id_to_remove, target_stream_id="default"
                    )

            # split remaining group into individual groups,
            # keeps the current stream, set this group to default stream
            await self.snap_provider.isolate_player_to_dedicated_group(
                target_player_id=self.player_id,
                target_stream_id="default",
                others_stream_id=curr_stream_id,
            )
            # NOTE(review): unlike the branch below, curr_ma_player_ids is not
            # pruned here, so an id that is removed and re-added in the same
            # call would be skipped by the add loop — confirm this is intended.
        else:
            # regular removal: isolate every listed player that is in the group
            for player_id in player_ids_to_remove or []:
                if player_id not in curr_ma_player_ids:
                    continue
                await self.snap_provider.isolate_player_to_dedicated_group(
                    player_id, target_stream_id="default"
                )
                curr_ma_player_ids.remove(player_id)

        # add players that resolve to a snapclient and are not yet in the group
        for ma_id in player_ids_to_add or []:
            if (
                snap_id := self.snap_provider._get_snapclient_id(ma_id)
            ) and ma_id not in curr_ma_player_ids:
                await player_group.add_client(snap_id)

        # some callers require instant state updates before returning
        async with self._state_update_lock:
            if await self._process_snapcast_client_state():
                self.update_state()

        self.snap_provider._update_group_callbacks(poke=True)
289
290    async def play_media(self, media: PlayerMedia) -> None:
291        """Handle PLAY MEDIA on given player."""
292        if self.synced_to:
293            msg = "A synced player cannot receive play commands directly"
294            raise RuntimeError(msg)
295
296        ma_stream = await self.snap_provider.get_snapcast_media_stream(
297            media, filter_settings_owner=self.player_id
298        )
299
300        if ma_stream is None or ma_stream.stream_id is None:
301            return
302
303        self._snap_ma_stream = ma_stream
304
305        # e.g. DSP settings require a restart
306        await self._snap_ma_stream.start_stream(allow_restart=True)
307
308        # if no announcement is playing we activate the stream now, otherwise it
309        # will be activated by play_announcement when the announcement is over.
310        if not self.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
311            player_group = await self.snap_provider.ensure_player_owned_group(self.player_id)
312            assert player_group is not None  # for type checking
313            await player_group.set_stream(ma_stream.stream_id)
314
315        self.poke_player_update()
316
    async def play_announcement(
        self, announcement: PlayerMedia, volume_level: int | None = None
    ) -> None:
        """Handle (provider native) playback of an announcement on given player.

        Switches this player's own group to a stream for the announcement,
        waits until that stream has stopped and then restores the previous
        situation (volume, sync leader or previous stream).

        :param announcement: The announcement media to play.
        :param volume_level: Optional volume to apply for the announcement.
        """
        # remember the state that has to be restored afterwards
        was_synced_to: str | None = self.synced_to
        orig_volume_level: int | None = self.volume_level

        prev_stream = self.active_snap_ma_stream

        ma_stream = await self.snap_provider.get_snapcast_media_stream(
            announcement, filter_settings_owner=self.player_id
        )
        player_group = await self.snap_provider.ensure_player_owned_group(self.player_id)

        if ma_stream is None or ma_stream.stream_id is None or player_group is None:
            return

        await player_group.set_stream(ma_stream.stream_id)

        # with the builtin server, wait for the configured buffer size first
        # (value divided by 1000, presumably ms -> seconds — TODO confirm)
        if self.snap_provider._use_builtin_server:
            await asyncio.sleep(self.snap_provider._snapcast_server_buffer_size / 1000.0)

        if volume_level is not None:
            await self.volume_set(volume_level)

        await ma_stream.start_stream()
        await ma_stream.wait_for_stopped()

        # only restore the volume if it still equals the announcement volume
        if self.volume_level == volume_level and orig_volume_level is not None:
            await self.volume_set(orig_volume_level)

        if was_synced_to:
            # rejoin the group of the former sync leader
            # NOTE(review): when no leader group is available we return with
            # the announcement stream still assigned to this player's group.
            if (
                leader_group := await self.snap_provider.ensure_player_owned_group(was_synced_to)
            ) is None:
                return
            await leader_group.add_client(self.snap_client.identifier)
        else:
            # switch back to the previous stream, or to "default" if there was none
            await player_group.set_stream(
                prev_stream.stream_id
                if prev_stream and prev_stream.stream_id is not None
                else "default"
            )
360
361    async def get_config_entries(
362        self,
363        action: str | None = None,
364        values: dict[str, ConfigValueType] | None = None,
365    ) -> list[ConfigEntry]:
366        """Player config."""
367        return [
368            CONF_ENTRY_SAMPLE_RATES_SNAPCAST,
369            # we don't use the http server for streaming
370            CONF_ENTRY_HTTP_PROFILE_HIDDEN,
371        ]
372
373    def _handle_player_update(self, snap_client: SnapclientProto) -> None:
374        """Forward snap_client updates."""
375        self.poke_player_update()
376
377    def poke_player_update(self) -> None:
378        """Signal that a player state update should be processed."""
379        self._poke_evt.set()
380
381    async def _player_update_worker(self) -> None:
382        """Aggregate and process player state update requests."""
383        while True:
384            await self._poke_evt.wait()
385            self._poke_evt.clear()
386            while True:
387                call_update: bool = False
388                async with self._state_update_lock:
389                    call_update = await self._process_snapcast_client_state()
390                if call_update:
391                    self.update_state()
392                if self._poke_evt.is_set():
393                    self._poke_evt.clear()
394                    continue
395                break
396
397    async def _process_snapcast_client_state(self) -> bool:
398        """Process the latest Snapcast client state and apply changes to this player.
399
400        Returns:
401        True if changes were applied and a state update should be emitted via
402        ``update_state()``; False if no update is necessary (or if required data
403        is temporarily unavailable and the update should be retried later).
404        """
405        snap_group = self.snap_client.group
406        if snap_group is None:
407            # some data syncing error, a client is always a group member
408            # retry again later, don't call update now
409            return False
410
411        stream_id = snap_group.stream
412        snap_stream: SnapstreamProto | None = None
413        with suppress(KeyError):
414            snap_stream = self.snap_provider._snapserver.stream(stream_id)
415
416        members = list(snap_group.clients)  # snapshot
417
418        curr_state: TrackedPlayerState = {
419            "_attr_name": self.snap_client.friendly_name,
420            "_attr_volume_level": self.snap_client.volume,
421            "_attr_volume_muted": self.snap_client.muted,
422            "_attr_available": self.snap_client.connected,
423            "connected": self.snap_client.connected,
424            "stream_id": snap_group.stream,
425            "stream_status": snap_stream.status if snap_stream is not None else None,
426            "grp_name": snap_group.name,
427            "grp_member_ids": members,
428            "grp_member_avail": [
429                pl.available
430                for cl_id in members
431                if (pl_id := self.snap_provider._get_ma_id(cl_id))
432                and (pl := self.mass.players.get(pl_id))
433            ],
434        }
435
436        prev_state: TrackedPlayerState = (
437            self._last_tracked_state if self._last_tracked_state is not None else {}
438        )
439        self._last_tracked_state = curr_state
440
441        # change detection for simple attrs
442        changed_attrs = {
443            k: v for k, v in curr_state.items() if k.startswith("_attr_") and prev_state.get(k) != v
444        }
445
446        prev_connected = prev_state.get("connected", False)
447        now_connected = curr_state.get("connected", False)
448        connection_changed = prev_connected != now_connected
449
450        prev_stream_id = prev_state.get("stream_id")
451        curr_stream_id = curr_state["stream_id"]
452        prev_stream_status = prev_state.get("stream_status")
453        curr_stream_status = curr_state.get("stream_status")
454
455        stream_changed = (
456            prev_stream_id != curr_stream_id or prev_stream_status != curr_stream_status
457        )
458
459        grouping_changed = any(
460            prev_state.get(k) != curr_state.get(k)
461            for k in ("grp_name", "grp_member_ids", "grp_member_avail")
462        )
463
464        needs_processing = bool(
465            changed_attrs or grouping_changed or stream_changed or connection_changed
466        )
467        if not needs_processing:
468            return False
469
470        if connection_changed or grouping_changed:
471            self.snap_provider.poke_group_members(snap_group)
472
473        # help cleaning up unused streams
474        if curr_stream_id == "default" or (
475            (my_stream := self._snap_ma_stream)
476            and my_stream.stream_id in {prev_stream_id, curr_stream_id}
477        ):
478            self.snap_provider.update_stream_usage()
479
480        # apply changed attrs
481        for key, value in changed_attrs.items():
482            setattr(self, key, value)
483
484        # finally notify state update once
485        return True
486
487    @property
488    def active_snap_ma_stream(self) -> SnapcastMAStream | None:
489        """Return the MA stream source of the active group."""
490        grp = self.snap_client.group
491        if grp is None or grp.stream is None:
492            return None
493
494        if grp.stream == "default":
495            return None
496
497        return self.snap_provider.get_snap_ma_stream(grp.stream)
498
499    @property
500    def snap_group_name(self) -> str:
501        """Return the name of the active group."""
502        snap_group = self.snap_client.group
503        if snap_group is None:
504            return ""
505        return snap_group.name
506
507    @cached_property
508    def _current_media(self) -> PlayerMedia | None:
509        """
510        Return the current media being played by the player.
511
512        Note that this is NOT the final current media of the player,
513        as it may be overridden by a active group/sync membership.
514        Hence it's marked as a private property.
515        The final current media can be retrieved by using the 'current_media' property.
516        """
517        if snap_ma_stream := self.active_snap_ma_stream:
518            return snap_ma_stream.media
519        return None
520
521    @property
522    def _active_source(self) -> str | None:
523        """
524        Return the (id of) the active source of the player.
525
526        Only required if the player supports PlayerFeature.SELECT_SOURCE.
527
528        Set to None if the player is not currently playing a source or
529        the player_id if the player is currently playing a MA queue.
530
531        Note that this is NOT the final active source of the player,
532        as it may be overridden by a active group/sync membership.
533        Hence it's marked as a private property.
534        The final active source can be retrieved by using the 'active_source' property.
535        """
536        grp = self.snap_client.group
537        if grp is None or grp.stream is None:
538            return None
539
540        if grp.stream == "default":
541            return None
542
543        if ma_stream := self.snap_provider.get_snap_ma_stream(grp.stream):
544            return ma_stream.source_id
545
546        # external snapcast stream
547        return grp.stream or None
548
549    def _get_active_snapstream(self) -> SnapstreamProto | None:
550        """Get active stream for given player_id."""
551        if group := self.snap_client.group:
552            with suppress(KeyError):
553                return self.snap_provider._snapserver.stream(group.stream)
554        return None
555
556    def _get_player_ids_of_curr_group(self) -> list[str]:
557        snap_group = self.snap_client.group
558        if snap_group is None:
559            return []
560        return [
561            ma_id
562            for client_id in snap_group.clients
563            if (ma_id := self.snap_provider._get_ma_id(client_id))
564        ]
565
566    def _get_players_of_curr_group(self) -> list[Player]:
567        return [
568            ma_player
569            for ma_id in self._get_player_ids_of_curr_group()
570            if (ma_player := self.mass.players.get(ma_id))
571        ]
572