/
/
/
1"""Sync Group Player implementation."""
2
3from __future__ import annotations
4
5import asyncio
6from typing import TYPE_CHECKING, cast
7
8from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
9from music_assistant_models.enums import ConfigEntryType, PlaybackState, PlayerFeature, PlayerType
10from music_assistant_models.errors import UnsupportedFeaturedException
11from propcache import under_cached_property as cached_property
12
13from music_assistant.constants import (
14 APPLICATION_NAME,
15 CONF_DYNAMIC_GROUP_MEMBERS,
16 CONF_GROUP_MEMBERS,
17)
18from music_assistant.models.player import DeviceInfo, Player, PlayerMedia
19
20from .constants import CONF_ENTRY_SGP_NOTE, EXTRA_FEATURES_FROM_MEMBERS
21
22if TYPE_CHECKING:
23 from .provider import SyncGroupProvider
24
25
26class SyncGroupPlayer(Player):
27 """Sync Group Player implementation."""
28
29 _attr_type: PlayerType = PlayerType.GROUP
30 sync_leader: Player | None = None
31 """The active sync leader player for this syncgroup."""
32
33 def __init__(
34 self,
35 provider: SyncGroupProvider,
36 player_id: str,
37 ) -> None:
38 """Initialize SyncGroupPlayer instance."""
39 super().__init__(provider, player_id)
40 self._attr_name = self.config.name or self.config.default_name or f"SyncGroup {player_id}"
41 self._attr_available = True
42 self._attr_device_info = DeviceInfo(model=provider.name, manufacturer=APPLICATION_NAME)
43 # Allow grouping with any player that supports syncing
44 # The actual compatibility is checked via can_group_with on each player
45 self._attr_can_group_with = set()
46
47 @cached_property
48 def is_dynamic(self) -> bool:
49 """Return if the player is a dynamic group player."""
50 return bool(self.config.get_value(CONF_DYNAMIC_GROUP_MEMBERS, False))
51
52 @cached_property
53 def synced_to(self) -> str | None:
54 """Return the id of the player this player is synced to (sync leader)."""
55 # groups can't be synced
56 return None
57
58 async def on_config_updated(self) -> None:
59 """Handle logic when the PlayerConfig is first loaded or updated."""
60 # Config is only available after the player was registered
61 self._cache.clear() # clear to prevent loading old is_dynamic
62 default_members = cast("list[str]", self.config.get_value(CONF_GROUP_MEMBERS, []))
63 if self.is_dynamic:
64 self._attr_static_group_members = []
65 self._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
66 else:
67 self._attr_static_group_members = default_members.copy()
68 self._attr_supported_features.discard(PlayerFeature.SET_MEMBERS)
69 self._attr_group_members = default_members.copy()
70
71 @cached_property
72 def supported_features(self) -> set[PlayerFeature]:
73 """Return the supported features of the player."""
74 # by default we don't have any features, except play_media
75 # but we can gain some features based on the capabilities of the sync leader
76 # set_members is only supported if it's a dynamic group
77 base_features: set[PlayerFeature] = {PlayerFeature.PLAY_MEDIA}
78 if self.is_dynamic:
79 base_features.add(PlayerFeature.SET_MEMBERS)
80 if not self.sync_leader:
81 return base_features
82 # add features supported by the sync leader
83 for feature in EXTRA_FEATURES_FROM_MEMBERS:
84 if feature in self.sync_leader.state.supported_features:
85 base_features.add(feature)
86 return base_features
87
88 @property
89 def playback_state(self) -> PlaybackState:
90 """Return the current playback state of the player."""
91 return self.sync_leader.state.playback_state if self.sync_leader else PlaybackState.IDLE
92
93 @property
94 def requires_flow_mode(self) -> bool:
95 """Return if the player needs flow mode."""
96 if leader := self.sync_leader:
97 return leader.flow_mode
98 return False
99
100 @property
101 def elapsed_time(self) -> float | None:
102 """Return the elapsed time in (fractional) seconds of the current track (if any)."""
103 return self.sync_leader.state.elapsed_time if self.sync_leader else None
104
105 @property
106 def elapsed_time_last_updated(self) -> float | None:
107 """Return when the elapsed time was last updated."""
108 return self.sync_leader.state.elapsed_time_last_updated if self.sync_leader else None
109
110 @property
111 def can_group_with(self) -> set[str]:
112 """Return the id's of players this player can group with."""
113 if not self.is_dynamic:
114 # in case of static members,
115 # we can only group with the players defined in the config, so we return those directly
116 return set(self._attr_static_group_members)
117 # if we already have a sync leader, we use its can_group_with as reference
118 if self.sync_leader:
119 return {self.sync_leader.player_id, *self.sync_leader.state.can_group_with}
120 # If we have no members, but we do have default members in the config,
121 # we can group with players that are compatible with those
122 default_members = cast("list[str]", self.config.get_value(CONF_GROUP_MEMBERS, []))
123 for member_id in default_members:
124 member_player = self.mass.players.get_player(member_id)
125 if member_player and member_player.state.available:
126 return {*default_members, *member_player.state.can_group_with}
127 # Dynamic groups can potentially group with any compatible players
128 # Actual compatibility is validated when adding members
129 temp_can_group_with = set()
130 for player in self.mass.players.all_players(return_unavailable=False):
131 if not player.available or player.type == PlayerType.GROUP:
132 # let's avoid showing group players as options to group with
133 continue
134 if (
135 PlayerFeature.SET_MEMBERS in player.state.supported_features
136 and player.state.can_group_with
137 and not player.state.active_group
138 ):
139 temp_can_group_with.add(player.player_id)
140 return temp_can_group_with
141
142 async def get_config_entries(
143 self,
144 action: str | None = None,
145 values: dict[str, ConfigValueType] | None = None,
146 ) -> list[ConfigEntry]:
147 """Return all (provider/player specific) Config Entries for the given player (if any)."""
148 entries: list[ConfigEntry] = [
149 # syncgroup specific entries
150 CONF_ENTRY_SGP_NOTE,
151 ConfigEntry(
152 key=CONF_GROUP_MEMBERS,
153 type=ConfigEntryType.STRING,
154 multi_value=True,
155 label="Group members",
156 default_value=[],
157 description="Select all players you want to be part of this sync group. "
158 "Only compatible players (based on their sync protocol) can be grouped together.",
159 required=False, # needed for dynamic members (which allows empty members list)
160 options=[
161 ConfigValueOption(x.display_name, x.player_id)
162 for x in self.mass.players.all_players(True, False)
163 if x.type != PlayerType.GROUP
164 ],
165 ),
166 ConfigEntry(
167 key=CONF_DYNAMIC_GROUP_MEMBERS,
168 type=ConfigEntryType.BOOLEAN,
169 label="Enable dynamic members",
170 description="Allow (un)joining members dynamically, so the group more or less "
171 "behaves the same like manually syncing players together, "
172 "with the main difference being that the group player will hold the queue.",
173 default_value=False,
174 required=False,
175 ),
176 ]
177 return entries
178
179 async def stop(self) -> None:
180 """Send STOP command to given player."""
181 self._attr_current_media = None
182 if sync_leader := self.sync_leader:
183 # Use internal handler to bypass group redirect logic and avoid infinite loop
184 # (sync_leader is part of this group, so redirect would loop back here)
185 await self.mass.players._handle_cmd_stop(sync_leader.player_id)
186 # dissolve the sync group since we stopped playback
187 self.mass.call_later(
188 5, self._dissolve_syncgroup, task_id=f"syncgroup_dissolve_{self.player_id}"
189 )
190
191 async def play(self) -> None:
192 """Send PLAY (unpause) command to given player."""
193 await self.mass.players.cmd_resume(
194 self.player_id, self._attr_active_source, self._attr_current_media
195 )
196
197 async def play_media(self, media: PlayerMedia) -> None:
198 """Handle PLAY MEDIA on given player."""
199 self._attr_current_media = media
200 self._attr_active_source = media.source_id if media.source_id else None
201 await self._form_syncgroup()
202 # simply forward the command to the sync leader
203 if sync_leader := self.sync_leader:
204 # Use internal handler to bypass group redirect logic and preserve protocol selection
205 await self.mass.players._handle_play_media(sync_leader.player_id, media)
206 self.update_state()
207 else:
208 raise RuntimeError("An empty group cannot play media, consider adding members first")
209
210 async def enqueue_next_media(self, media: PlayerMedia) -> None:
211 """Handle enqueuing of a next media item on the player."""
212 if sync_leader := self.sync_leader:
213 if PlayerFeature.ENQUEUE not in sync_leader.state.supported_features:
214 # this may happen in race conditions where we just switched sync leaders
215 # and the new leader doesn't support enqueueing next media.
216 return
217 # Use internal handler to bypass group redirect logic and avoid infinite loop
218 await self.mass.players._handle_enqueue_next_media(sync_leader.player_id, media)
219
220 async def set_members(
221 self,
222 player_ids_to_add: list[str] | None = None,
223 player_ids_to_remove: list[str] | None = None,
224 ) -> None:
225 """Handle SET_MEMBERS command on the player."""
226 if not self.is_dynamic:
227 raise UnsupportedFeaturedException(
228 f"Group {self.display_name} does not allow dynamically adding/removing members!"
229 )
230 prev_leader = self.sync_leader
231 was_playing = self.playback_state == PlaybackState.PLAYING
232 needs_restart = False
233 if prev_leader and prev_leader.player_id in (player_ids_to_remove or []):
234 # We're removing the current sync leader while the group is active
235 # We need to select a new leader before we can handle the member changes
236 self.logger.debug(
237 "Removing current sync leader %s from group %s while it is active, "
238 "selecting a new leader and dissolving the current syncgroup",
239 prev_leader.display_name,
240 self.display_name,
241 )
242 if was_playing:
243 await self.mass.players._handle_cmd_stop(prev_leader.player_id)
244 await asyncio.sleep(1)
245 await self._dissolve_syncgroup()
246 await asyncio.sleep(2)
247 needs_restart = was_playing
248
249 cur_leader = self._select_sync_leader(new_members=player_ids_to_add)
250 # handle additions
251 final_players_to_add: list[str] = []
252 can_group_with = cur_leader.state.can_group_with.copy() if cur_leader else set()
253 for member_id in player_ids_to_add or []:
254 if member_id == self.player_id:
255 continue # can not add self as member
256 member = self.mass.players.get_player(member_id)
257 if member is None or not member.available:
258 continue
259 if member_id not in self._attr_group_members:
260 self._attr_group_members.append(member_id)
261 if not cur_leader:
262 continue
263 if member_id != cur_leader.player_id and member_id not in can_group_with:
264 self.logger.debug(
265 f"Cannot add {member.display_name} to group {self.display_name} since it's "
266 f"not compatible with the current sync leader"
267 )
268 continue
269 if member_id != cur_leader.player_id:
270 final_players_to_add.append(member_id)
271
272 # handle removals
273 final_players_to_remove: list[str] = []
274 for member_id in player_ids_to_remove or []:
275 if member_id not in self._attr_group_members:
276 continue
277 if member_id == self.player_id:
278 raise UnsupportedFeaturedException(
279 f"Cannot remove {self.display_name} from itself as a member!"
280 )
281 self._attr_group_members.remove(member_id)
282 final_players_to_remove.append(member_id)
283 self.update_state()
284 if needs_restart:
285 await self.play()
286 return
287 if not was_playing:
288 # Don't need to do anything else if the group is not active
289 # The syncing will be done once playback starts
290 return
291 if cur_leader:
292 await self.mass.players.cmd_set_members(
293 cur_leader.player_id,
294 player_ids_to_add=final_players_to_add,
295 player_ids_to_remove=final_players_to_remove,
296 )
297
298 async def _form_syncgroup(self) -> None:
299 """Form syncgroup by syncing all (possible) members."""
300 self.mass.cancel_timer(f"syncgroup_dissolve_{self.player_id}")
301 if not self.sync_leader:
302 self.sync_leader = self._select_sync_leader()
303
304 if not self.sync_leader:
305 # we have no members in the group, so we can't form a syncgroup
306 return
307
308 # ensure the sync leader is first in the list
309 self._attr_group_members = [
310 self.sync_leader.player_id,
311 *[x for x in self._attr_group_members if x != self.sync_leader.player_id],
312 ]
313 members_to_sync = [
314 x
315 for x in self._attr_group_members
316 if x != self.sync_leader.player_id and x not in self.sync_leader.state.group_members
317 ]
318 if members_to_sync:
319 await self.mass.players.cmd_set_members(self.sync_leader.player_id, members_to_sync)
320
321 async def _dissolve_syncgroup(self) -> None:
322 """Dissolve the current syncgroup by ungrouping all members."""
323 if sync_leader := self.sync_leader:
324 # dissolve the temporary syncgroup from the sync leader
325 sync_children = [
326 x for x in sync_leader.state.group_members if x != sync_leader.player_id
327 ]
328 if sync_children:
329 await self.mass.players.cmd_set_members(sync_leader.player_id, [], sync_children)
330 self.sync_leader = None
331 self.update_state()
332
333 def _select_sync_leader(self, new_members: list[str] | None = None) -> Player | None:
334 """Select a (new) sync leader."""
335 if self.group_members and self.sync_leader and self.sync_leader.state.available:
336 # current leader is still available, no need to select a new one
337 return self.sync_leader
338 default_members = cast("list[str]", self.config.get_value(CONF_GROUP_MEMBERS, []))
339 group_members = self.group_members or default_members or new_members or []
340 for member_id in group_members:
341 member_player = self.mass.players.get_player(member_id)
342 if member_player and member_player.state.available:
343 self.logger.debug(
344 f"Auto-selected {member_player.display_name} as sync leader for "
345 f"group {self.display_name}"
346 )
347 return member_player
348 return None
349