music-assistant-server

25.8 KBPY
sync_groups.py
25.8 KB610 lines • python
"""
Controller for (provider specific) SyncGroup players.

A SyncGroup player is a virtual player that automatically groups multiple players
together in a sync group, where one player is the sync leader
and the other players are synced to that leader.
"""
8
9from __future__ import annotations
10
11import asyncio
12from copy import deepcopy
13from typing import TYPE_CHECKING, cast
14
15import shortuuid
16from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
17from music_assistant_models.constants import PLAYER_CONTROL_NONE
18from music_assistant_models.enums import (
19    ConfigEntryType,
20    PlaybackState,
21    PlayerFeature,
22    PlayerType,
23    ProviderFeature,
24)
25from music_assistant_models.errors import UnsupportedFeaturedException
26from music_assistant_models.player import DeviceInfo, PlayerMedia, PlayerSource
27from propcache import under_cached_property as cached_property
28
29from music_assistant.constants import (
30    CONF_CROSSFADE_DURATION,
31    CONF_DYNAMIC_GROUP_MEMBERS,
32    CONF_ENABLE_ICY_METADATA,
33    CONF_FLOW_MODE,
34    CONF_GROUP_MEMBERS,
35    CONF_HTTP_PROFILE,
36    CONF_OUTPUT_CODEC,
37    CONF_SAMPLE_RATES,
38    CONF_SMART_FADES_MODE,
39    SYNCGROUP_PREFIX,
40)
41from music_assistant.models.player import GroupPlayer, Player
42
43if TYPE_CHECKING:
44    from music_assistant.models.player_provider import PlayerProvider
45
46    from .player_controller import PlayerController
47
48
# Provider domains for which a leader change is handled without stopping playback
# (checked against the provider's domain when the sync leader of a group changes).
SUPPORT_DYNAMIC_LEADER = {
    # providers that support dynamic leader selection in a syncgroup
    # meaning that if you would remove the current leader from the group,
    # the provider will automatically select a new leader from the remaining members
    # and the music keeps playing uninterrupted.
    "airplay",
    "squeezelite",
    # TODO: Get this working with Sonos as well (need to handle range requests)
}

# Features a SyncGroup player only advertises when its reference player
# (the sync leader or, lacking one, the first group member) supports them.
OPTIONAL_FEATURES = {
    PlayerFeature.ENQUEUE,
    PlayerFeature.GAPLESS_PLAYBACK,
    PlayerFeature.NEXT_PREVIOUS,
    PlayerFeature.PAUSE,
    PlayerFeature.PLAY_ANNOUNCEMENT,
    PlayerFeature.SEEK,
    PlayerFeature.SELECT_SOURCE,
    PlayerFeature.VOLUME_MUTE,
    PlayerFeature.MULTI_DEVICE_DSP,
}
70
71
72class SyncGroupPlayer(GroupPlayer):
73    """Helper class for a (provider specific) SyncGroup player."""
74
75    _attr_type: PlayerType = PlayerType.GROUP
76    sync_leader: Player | None = None
77    """The active sync leader player for this syncgroup."""
78
79    @cached_property
80    def is_dynamic(self) -> bool:
81        """Return if the player is a dynamic group player."""
82        return bool(self.config.get_value(CONF_DYNAMIC_GROUP_MEMBERS, False))
83
84    def __init__(
85        self,
86        provider: PlayerProvider,
87        player_id: str,
88    ) -> None:
89        """Initialize GroupPlayer instance."""
90        super().__init__(provider, player_id)
91        self._attr_name = self.config.name or self.config.default_name or f"SyncGroup {player_id}"
92        self._attr_available = True
93        self._attr_powered = False  # group players are always powered off by default
94        self._attr_device_info = DeviceInfo(model="Sync Group", manufacturer=provider.name)
95        self._attr_supported_features = {
96            PlayerFeature.POWER,
97            PlayerFeature.VOLUME_SET,
98        }
99
100    async def on_config_updated(self) -> None:
101        """Handle logic when the player is loaded or updated."""
102        # Config is only available after the player was registered
103        self._cache.clear()  # clear to prevent loading old is_dynamic
104        static_members = cast("list[str]", self.config.get_value(CONF_GROUP_MEMBERS, []))
105        if self.is_dynamic:
106            self._attr_static_group_members = []
107            self._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
108        else:
109            self._attr_static_group_members = static_members.copy()
110            self._attr_supported_features.discard(PlayerFeature.SET_MEMBERS)
111        if not self.powered:
112            self._attr_group_members = static_members.copy()
113
114    @property
115    def supported_features(self) -> set[PlayerFeature]:
116        """Return the supported features of the player."""
117        members = self.group_members
118        reference_player: Player | None = self.sync_leader or (
119            self.mass.players.get(members[0]) if members else None
120        )
121        if reference_player:
122            base_features = self._attr_supported_features.copy()
123            # add features supported by the sync leader
124            for feature in OPTIONAL_FEATURES:
125                if feature in reference_player.supported_features:
126                    base_features.add(feature)
127            return base_features
128        return self._attr_supported_features
129
130    @property
131    def playback_state(self) -> PlaybackState:
132        """Return the current playback state of the player."""
133        if self.powered:
134            return self.sync_leader.playback_state if self.sync_leader else PlaybackState.IDLE
135        return PlaybackState.IDLE
136
137    @property
138    def requires_flow_mode(self) -> bool:
139        """Return if the player needs flow mode."""
140        if leader := self.sync_leader:
141            return leader.requires_flow_mode
142        return False
143
144    @property
145    def elapsed_time(self) -> float | None:
146        """Return the elapsed time in (fractional) seconds of the current track (if any)."""
147        return self.sync_leader.elapsed_time if self.sync_leader else None
148
149    @property
150    def elapsed_time_last_updated(self) -> float | None:
151        """Return when the elapsed time was last updated."""
152        return self.sync_leader.elapsed_time_last_updated if self.sync_leader else None
153
154    @property
155    def _current_media(self) -> PlayerMedia | None:
156        """Return the current media item (if any) loaded in the player."""
157        return self.sync_leader._current_media if self.sync_leader else self._attr_current_media
158
159    @property
160    def _active_source(self) -> str | None:
161        """Return the active source id (if any) of the player."""
162        return self.sync_leader._active_source if self.sync_leader else self._attr_active_source
163
164    @property
165    def _source_list(self) -> list[PlayerSource]:
166        """Return list of available (native) sources for this player."""
167        if self.sync_leader:
168            return self.sync_leader._source_list
169        return []
170
171    @property
172    def can_group_with(self) -> set[str]:
173        """
174        Return the id's of players this player can group with.
175
176        This should return set of player_id's this player can group/sync with
177        or just the provider's instance_id if all players can group with each other.
178        """
179        if self.is_dynamic and (leader := self.sync_leader):
180            return leader.can_group_with
181        if self.is_dynamic:
182            return {self.provider.instance_id}
183        return set()
184
185    async def get_config_entries(
186        self,
187        action: str | None = None,
188        values: dict[str, ConfigValueType] | None = None,
189    ) -> list[ConfigEntry]:
190        """Return all (provider/player specific) Config Entries for the given player (if any)."""
191        entries: list[ConfigEntry] = [
192            # syncgroup specific entries
193            ConfigEntry(
194                key=CONF_GROUP_MEMBERS,
195                type=ConfigEntryType.STRING,
196                multi_value=True,
197                label="Group members",
198                default_value=[],
199                description="Select all players you want to be part of this group",
200                required=False,  # needed for dynamic members (which allows empty members list)
201                options=[
202                    ConfigValueOption(x.display_name, x.player_id)
203                    for x in self.provider.players
204                    if x.type != PlayerType.GROUP
205                ],
206            ),
207            ConfigEntry(
208                key="dynamic_members",
209                type=ConfigEntryType.BOOLEAN,
210                label="Enable dynamic members",
211                description="Allow (un)joining members dynamically, so the group more or less "
212                "behaves the same like manually syncing players together, "
213                "with the main difference being that the group player will hold the queue.",
214                default_value=False,
215                required=False,
216            ),
217        ]
218        # combine base group entries with (base) player entries for this player type
219        child_player = next((x for x in self.provider.players if x.type == PlayerType.PLAYER), None)
220        if child_player:
221            allowed_conf_entries = (
222                CONF_HTTP_PROFILE,
223                CONF_ENABLE_ICY_METADATA,
224                CONF_CROSSFADE_DURATION,
225                CONF_OUTPUT_CODEC,
226                CONF_FLOW_MODE,
227                CONF_SAMPLE_RATES,
228                CONF_SMART_FADES_MODE,
229            )
230            child_config_entries = await child_player.get_config_entries()
231            entries.extend(
232                [entry for entry in child_config_entries if entry.key in allowed_conf_entries]
233            )
234        return entries
235
236    async def stop(self) -> None:
237        """Send STOP command to given player."""
238        if sync_leader := self.sync_leader:
239            await sync_leader.stop()
240
241    async def play(self) -> None:
242        """Send PLAY command to given player."""
243        if sync_leader := self.sync_leader:
244            await sync_leader.play()
245
246    async def pause(self) -> None:
247        """Send PAUSE command to given player."""
248        if sync_leader := self.sync_leader:
249            await sync_leader.pause()
250
    async def power(self, powered: bool) -> None:
        """
        Handle POWER command to group player.

        Powering on refreshes the static members, powers on the members, and (re)forms
        the syncgroup around the selected sync leader. Powering off stops playback,
        dissolves the syncgroup, powers off the members and clears the sync leader.

        :param powered: True to power the group on, False to power it off.
        """
        # remember the previous state, it decides between a re-sync and a full transition
        prev_power = self._attr_powered

        # always stop at power off
        if not powered and self.playback_state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
            await self.stop()
            self._attr_current_media = None

        # optimistically set the group state
        self._attr_powered = powered
        if prev_power != powered:
            self.update_state()

        if powered:
            # ensure static members are present when powering on
            for static_group_member in self._attr_static_group_members:
                member_player = self.mass.players.get(static_group_member)
                if not member_player or not member_player.available or not member_player.enabled:
                    # unknown, unavailable or disabled static members are dropped (for now)
                    if static_group_member in self._attr_group_members:
                        self._attr_group_members.remove(static_group_member)
                    continue
                if static_group_member not in self._attr_group_members:
                    # Always add static members when power(true) is called,
                    # this will ensure that static members that just became available are added
                    self._attr_group_members.append(static_group_member)
            # Select sync leader and handle turn on
            new_leader = self._select_sync_leader()
            # handle TURN_ON of the group player by turning on all members
            for member in self.mass.players.iter_group_members(
                self, only_powered=False, active_only=False
            ):
                # make the member leave other groups/sync targets before it joins this one
                await self._handle_member_collisions(member)
                # members without power control (PLAYER_CONTROL_NONE) are left untouched
                if not member.powered and member.power_control != PLAYER_CONTROL_NONE:
                    await self.mass.players._handle_cmd_power(member.player_id, True)
            # Set up the sync group with the new leader
            if prev_power and new_leader == self.sync_leader:
                # Already powered on with same leader, just re-sync members without full transition
                await self._form_syncgroup()
            else:
                await self._handle_leader_transition(new_leader)
        elif prev_power and not powered:
            # handle TURN_OFF of the group player by dissolving group and turning off all members
            await self._dissolve_syncgroup()
            # turn off all group members
            for member in self.mass.players.iter_group_members(
                self, only_powered=True, active_only=True
            ):
                # members without power control (PLAYER_CONTROL_NONE) are left untouched
                if member.powered and member.power_control != PLAYER_CONTROL_NONE:
                    await self.mass.players._handle_cmd_power(member.player_id, False)

        if not powered:
            # Reset to unfiltered static members list when powered off
            # (the frontend will hide unavailable members)
            # NOTE(review): on_config_updated keeps the static list empty for dynamic
            # groups, so this empties the member list of a dynamic group - confirm intended
            self._attr_group_members = self._attr_static_group_members.copy()
            # and clear the sync leader
            self.sync_leader = None
        self.update_state()
309
    async def volume_set(self, volume_level: int) -> None:
        """
        Send VOLUME_SET command to given player.

        Intentionally does nothing for a syncgroup player.

        :param volume_level: The requested volume level (ignored here).
        """
        # group volume is already handled in the player manager
313
314    async def play_media(self, media: PlayerMedia) -> None:
315        """Handle PLAY MEDIA on given player."""
316        # power on (which will also resync and add static members if needed)
317        await self.power(True)
318        # simply forward the command to the sync leader
319        if sync_leader := self.sync_leader:
320            await sync_leader.play_media(media)
321            self._attr_current_media = deepcopy(media)
322            self.update_state()
323        else:
324            raise RuntimeError("an empty group cannot play media, consider adding members first")
325
326    async def enqueue_next_media(self, media: PlayerMedia) -> None:
327        """Handle enqueuing of a next media item on the player."""
328        if sync_leader := self.sync_leader:
329            await sync_leader.enqueue_next_media(media)
330
331    async def select_source(self, source: str) -> None:
332        """
333        Handle SELECT SOURCE command on the player.
334
335        Will only be called if the PlayerFeature.SELECT_SOURCE is supported.
336
337        :param source: The source(id) to select, as defined in the source_list.
338        """
339        if sync_leader := self.sync_leader:
340            await sync_leader.select_source(source)
341            self.update_state()
342
343    async def set_members(
344        self,
345        player_ids_to_add: list[str] | None = None,
346        player_ids_to_remove: list[str] | None = None,
347    ) -> None:
348        """Handle SET_MEMBERS command on the player."""
349        if not self.is_dynamic:
350            raise UnsupportedFeaturedException(
351                f"Group {self.display_name} does not allow dynamically adding/removing members!"
352            )
353        # handle additions
354        final_players_to_add: list[str] = []
355        for player_id in player_ids_to_add or []:
356            if player_id in self._attr_group_members:
357                continue
358            if player_id == self.player_id:
359                raise UnsupportedFeaturedException(
360                    f"Cannot add {self.display_name} to itself as a member!"
361                )
362            self._attr_group_members.append(player_id)
363            final_players_to_add.append(player_id)
364        # handle removals
365        final_players_to_remove: list[str] = []
366        for player_id in player_ids_to_remove or []:
367            if player_id not in self._attr_group_members:
368                continue
369            if player_id == self.player_id:
370                raise UnsupportedFeaturedException(
371                    f"Cannot remove {self.display_name} from itself as a member!"
372                )
373            self._attr_group_members.remove(player_id)
374            final_players_to_remove.append(player_id)
375        self.update_state()
376        if not self.powered:
377            # Don't need to do anything else if the group is powered off
378            # The syncing will be done once powered on
379            return
380        next_leader = self._select_sync_leader()
381        prev_leader = self.sync_leader
382
383        if prev_leader and next_leader is None:
384            # Edge case: we no longer have any members in the group (and thus no leader)
385            await self._handle_leader_transition(None)
386        elif prev_leader != next_leader:
387            # Edge case: we had changed the leader (or just got one)
388            await self._handle_leader_transition(next_leader)
389        elif self.sync_leader and (player_ids_to_add or player_ids_to_remove):
390            # if the group still has the same leader, we need to (re)sync the members
391            # Handle collisions for newly added players
392            for player_id in final_players_to_add:
393                if player := self.mass.players.get(player_id):
394                    await self._handle_member_collisions(player)
395
396            await self.sync_leader.set_members(
397                player_ids_to_add=final_players_to_add,
398                player_ids_to_remove=final_players_to_remove,
399            )
400
    async def _form_syncgroup(self) -> None:
        """
        Form syncgroup by syncing all (possible) members.

        Puts the sync leader first in the member list, resolves collisions of every
        member and then asks the sync leader to add the members and to drop the
        players that are in the leader's group but not (or no longer) in this one.
        """
        if self.sync_leader is None:
            # This is an empty group, leader will be selected once a member is added
            self._attr_group_members = []
            self.update_state()
            return
        # ensure the sync leader is first in the list
        self._attr_group_members = [
            self.sync_leader.player_id,
            *[x for x in self._attr_group_members if x != self.sync_leader.player_id],
        ]
        self.update_state()
        # ids to hand to the leader: members to (keep) sync(ed) and leftovers to remove
        members_to_sync: list[str] = []
        members_to_remove: list[str] = []
        for member in self.mass.players.iter_group_members(self, active_only=False):
            # Handle collisions before attempting to sync
            await self._handle_member_collisions(member)

            if member.synced_to and member.synced_to != self.sync_leader.player_id:
                # still synced to a player that is not our leader: ungroup first
                await member.ungroup()
            if member.player_id == self.sync_leader.player_id:
                # skip sync leader
                continue
            # Always add to members_to_sync to prevent them from being removed below
            members_to_sync.append(member.player_id)
        # everything in the leader's own group that we did not just collect has to go
        for former_members in self.sync_leader.group_members:
            if (
                former_members not in members_to_sync
            ) and former_members != self.sync_leader.player_id:
                members_to_remove.append(former_members)
        if members_to_sync or members_to_remove:
            await self.sync_leader.set_members(members_to_sync, members_to_remove)
435
436    async def _dissolve_syncgroup(self) -> None:
437        """Dissolve the current syncgroup by ungrouping all members and restoring leader queue."""
438        if sync_leader := self.sync_leader:
439            # dissolve the temporary syncgroup from the sync leader
440            sync_children = [x for x in sync_leader.group_members if x != sync_leader.player_id]
441            if sync_children:
442                await sync_leader.set_members(player_ids_to_remove=sync_children)
443            # Reset the leaders queue since it is no longer part of this group
444            sync_leader.update_state()
445
    async def _handle_leader_transition(self, new_leader: Player | None) -> None:
        """
        Handle transition from current leader to new leader.

        When the provider's domain is listed in SUPPORT_DYNAMIC_LEADER and both the
        previous and the new leader exist, the previous leader is only ungrouped and the
        group is re-formed. Otherwise playback is stopped, the group is dissolved and
        re-formed around the new leader, and playback is resumed if it was playing.

        :param new_leader: The player that becomes sync leader, None if there is none.
        """
        prev_leader = self.sync_leader
        was_playing = False

        if (
            prev_leader
            and new_leader
            and prev_leader != new_leader
            and self.provider.domain in SUPPORT_DYNAMIC_LEADER
        ):
            # provider supports dynamic leader selection, so just remove/add members
            await prev_leader.ungroup()
            self.sync_leader = new_leader
            # allow some time to propagate the changes before resyncing
            await asyncio.sleep(2)
            await self._form_syncgroup()
            return

        if prev_leader:
            # Remember whether the group was playing, so playback can be resumed afterwards
            was_playing = self.playback_state == PlaybackState.PLAYING
            # Stop current playback and dissolve existing group
            await self.stop()
            await self._dissolve_syncgroup()
            # allow some time to propagate the changes before resyncing
            await asyncio.sleep(2)

        # Set new leader
        self.sync_leader = new_leader

        if new_leader:
            # form a syncgroup with the new leader
            await self._form_syncgroup()

            # Resume playback if the group was playing before the transition
            if was_playing:
                await self.mass.players._handle_cmd_resume(self.player_id)
        else:
            # We have no leader anymore, send update since we stopped playback
            self.update_state()
487
    def _select_sync_leader(self) -> Player | None:
        """
        Select the active sync leader player for a syncgroup.

        :return: The current leader if it still is a group member, otherwise the first
            eligible member (preferring one that is not synced to another player),
            or None if no member is eligible.
        """
        if self.sync_leader and self.sync_leader.player_id in self.group_members:
            # Don't change the sync leader if we already have one
            return self.sync_leader
        # two passes: first only members that are not synced to another player,
        # then (as fallback) any member
        for prefer_sync_leader in (True, False):
            for child_player in self.mass.players.iter_group_members(self):
                if prefer_sync_leader and child_player.synced_to:
                    # first pass: skip players that are currently synced to another player
                    continue
                if child_player.active_group not in (
                    None,
                    self.player_id,
                    child_player.player_id,
                ):
                    # this should not happen (because its already handled in the power on logic),
                    # but guard it just in case bad things happen
                    continue
                return child_player
        return None
508
    async def _handle_member_collisions(self, member: Player) -> None:
        """
        Handle collisions when adding a member to the sync group.

        Makes the member leave every other active group and, if it is still synced to a
        player other than our sync leader, removes it from that player's group.

        :param member: The (future) member of this sync group to check.
        """
        active_groups = member.active_groups
        for group in active_groups:
            if group == self.player_id:
                # being active in this group is not a collision
                continue
            # collision: child player is part another group that is already active !
            # solve this by trying to leave the group first
            if other_group := self.mass.players.get(group):
                if (
                    other_group.supports_feature(PlayerFeature.SET_MEMBERS)
                    and member.player_id not in other_group.static_group_members
                ):
                    await other_group.set_members(player_ids_to_remove=[member.player_id])
                else:
                    # if the other group does not support SET_MEMBERS or it is a static
                    # member, we need to power it off to leave the group
                    await other_group.power(False)
        if (
            member.synced_to is not None
            and self.sync_leader
            and member.synced_to != self.sync_leader.player_id
            and (synced_to_player := self.mass.players.get(member.synced_to))
            and member.player_id in synced_to_player.group_members
        ):
            # collision: child player is synced to another player and still in that group
            # ungroup it first
            await synced_to_player.set_members(player_ids_to_remove=[member.player_id])
537
538
class SyncGroupController:
    """Controller managing SyncGroup players."""

    def __init__(self, player_controller: PlayerController) -> None:
        """
        Initialize SyncGroupController.

        :param player_controller: The player controller this controller works for.
        """
        self.player_controller = player_controller
        self.mass = player_controller.mass

    async def create_group_player(
        self, provider: PlayerProvider, name: str, members: list[str], dynamic: bool = True
    ) -> Player:
        """
        Create new SyncGroup Player.

        :param provider: The provider to create the group player for
        :param name: Name of the group player
        :param members: List of player ids to add to the group;
            ids that are not a player of the provider are silently dropped
        :param dynamic: Whether the group is dynamic (members can change)
        :return: The newly registered syncgroup player.
        :raises UnsupportedFeaturedException: If the provider does not support syncing players.
        """
        # default implementation for providers that support syncing players
        if ProviderFeature.SYNC_PLAYERS not in provider.supported_features:
            # the frontend should already prevent this, but just in case
            raise UnsupportedFeaturedException(
                f"Provider {provider.name} does not support player syncing!"
            )
        # Create a new syncgroup player with the given members.
        # Collect the provider's player ids once, instead of rebuilding that list
        # for every requested member.
        provider_player_ids = {player.player_id for player in provider.players}
        members = [x for x in members if x in provider_player_ids]
        player_id = f"{SYNCGROUP_PREFIX}{shortuuid.random(8).lower()}"
        self.mass.config.create_default_player_config(
            player_id=player_id,
            provider=provider.instance_id,
            player_type=PlayerType.GROUP,
            name=name,
            enabled=True,
            values={
                CONF_GROUP_MEMBERS: members,
                CONF_DYNAMIC_GROUP_MEMBERS: dynamic,
            },
        )
        return await self._register_syncgroup_player(player_id, provider)

    async def remove_group_player(self, player_id: str) -> None:
        """
        Remove a group player.

        :param player_id: ID of the group player to remove.
        """
        # we simply permanently unregister the syncgroup player and wipe its config
        await self.mass.players.unregister(player_id, True)

    async def _register_syncgroup_player(self, player_id: str, provider: PlayerProvider) -> Player:
        """
        Register a syncgroup player.

        :param player_id: ID of the syncgroup player to register.
        :param provider: The provider the syncgroup player belongs to.
        :return: The registered syncgroup player.
        """
        syncgroup = SyncGroupPlayer(provider, player_id)
        await self.mass.players.register_or_update(syncgroup)
        return syncgroup

    async def on_provider_loaded(self, provider: PlayerProvider) -> None:
        """
        Handle logic when a provider is loaded.

        :param provider: The provider that was loaded.
        """
        # register existing syncgroup players for this provider,
        # these are recognized by the prefix of their player id
        for player_conf in await self.mass.config.get_player_configs(provider.instance_id):
            if player_conf.player_id.startswith(SYNCGROUP_PREFIX):
                await self._register_syncgroup_player(player_conf.player_id, provider)

    async def on_provider_unload(self, provider: PlayerProvider) -> None:
        """
        Handle logic when a provider is (about to get) unloaded.

        :param provider: The provider that is (about to get) unloaded.
        """
        # unregister existing syncgroup players for this provider
        for player in self.mass.players.all(
            provider_filter=provider.instance_id, return_sync_groups=True
        ):
            if player.player_id.startswith(SYNCGROUP_PREFIX):
                await self.mass.players.unregister(player.player_id, False)
610