/
/
/
1"""Sync Group Player implementation."""
2
3from __future__ import annotations
4
5import asyncio
6from typing import TYPE_CHECKING, cast
7
8from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
9from music_assistant_models.enums import ConfigEntryType, PlaybackState, PlayerFeature, PlayerType
10from music_assistant_models.errors import UnsupportedFeaturedException
11from propcache import under_cached_property as cached_property
12
13from music_assistant.constants import (
14 APPLICATION_NAME,
15 CONF_DYNAMIC_GROUP_MEMBERS,
16 CONF_GROUP_MEMBERS,
17)
18from music_assistant.models.player import DeviceInfo, Player, PlayerMedia
19
20from .constants import CONF_ENTRY_SGP_NOTE, EXTRA_FEATURES_FROM_MEMBERS
21
22if TYPE_CHECKING:
23 from .provider import SyncGroupProvider
24
25
class SyncGroupPlayer(Player):
    """Sync Group Player implementation.

    A player of type GROUP whose playback commands are forwarded to one of its
    members, the sync leader; the other members are (un)synced to that leader
    through the players controller. Playback state, elapsed time and a subset
    of the supported features are mirrored from the current sync leader.
    """

    _attr_type: PlayerType = PlayerType.GROUP
    sync_leader: Player | None = None
    """The active sync leader player for this syncgroup."""

    def __init__(
        self,
        provider: SyncGroupProvider,
        player_id: str,
    ) -> None:
        """Initialize SyncGroupPlayer instance.

        :param provider: The provider instance that owns this group player.
        :param player_id: The id of this group player.
        """
        super().__init__(provider, player_id)
        self._attr_name = self.config.name or self.config.default_name or f"SyncGroup {player_id}"
        self._attr_available = True
        self._attr_device_info = DeviceInfo(model=provider.name, manufacturer=APPLICATION_NAME)
        # Start with an empty set: the `can_group_with` property of this class
        # computes the actual candidates on every access.
        self._attr_can_group_with = set()

    @cached_property
    def is_dynamic(self) -> bool:
        """Return if the player is a dynamic group player."""
        return bool(self.config.get_value(CONF_DYNAMIC_GROUP_MEMBERS, False))

    @cached_property
    def synced_to(self) -> str | None:
        """Return the id of the player this player is synced to (sync leader)."""
        # groups can't be synced
        return None

    async def on_config_updated(self) -> None:
        """Handle logic when the PlayerConfig is first loaded or updated.

        Re-reads the configured group members and toggles the SET_MEMBERS
        feature so it matches the "dynamic members" setting.
        """
        # Config is only available after the player was registered
        self._cache.clear()  # clear to prevent loading old is_dynamic
        static_members = cast("list[str]", self.config.get_value(CONF_GROUP_MEMBERS, []))
        self._attr_static_group_members = static_members.copy()
        if self.is_dynamic:
            self._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
        else:
            self._attr_supported_features.discard(PlayerFeature.SET_MEMBERS)
        # (re)start from the members defined in the config
        self._attr_group_members = static_members.copy()

    @cached_property
    def supported_features(self) -> set[PlayerFeature]:
        """Return the supported features of the player.

        NOTE(review): the result depends on `sync_leader` but is cached; this
        presumably relies on the property cache being cleared whenever the
        leader changes - confirm against the base class.
        """
        # by default we don't have any features, except play_media
        # but we can gain some features based on the capabilities of the sync leader
        # set_members is only supported if it's a dynamic group
        base_features: set[PlayerFeature] = {PlayerFeature.PLAY_MEDIA}
        if self.is_dynamic:
            base_features.add(PlayerFeature.SET_MEMBERS)
        if not self.sync_leader:
            return base_features
        # add features supported by the sync leader
        for feature in EXTRA_FEATURES_FROM_MEMBERS:
            if feature in self.sync_leader.state.supported_features:
                base_features.add(feature)
        return base_features

    @property
    def playback_state(self) -> PlaybackState:
        """Return the sync leader's playback state, or IDLE when there is no leader."""
        return self.sync_leader.state.playback_state if self.sync_leader else PlaybackState.IDLE

    @property
    def requires_flow_mode(self) -> bool:
        """Return the sync leader's flow mode, or False when there is no leader."""
        if leader := self.sync_leader:
            return leader.flow_mode
        return False

    @property
    def elapsed_time(self) -> float | None:
        """Return the elapsed time in (fractional) seconds of the current track (if any)."""
        return self.sync_leader.state.elapsed_time if self.sync_leader else None

    @property
    def elapsed_time_last_updated(self) -> float | None:
        """Return when the elapsed time was last updated."""
        return self.sync_leader.state.elapsed_time_last_updated if self.sync_leader else None

    @property
    def can_group_with(self) -> set[str]:
        """Return the id's of players this player can group with.

        Resolution order: the configured members for a static group, otherwise
        the sync leader's options, otherwise those of the first available
        member, otherwise every available non-group player that can sync.
        """
        if not self.is_dynamic:
            # in case of static members,
            # we can only group with the players defined in the config, so we return those directly
            return set(self._attr_static_group_members)
        # if we already have a sync leader, we use its can_group_with as reference
        if self.sync_leader:
            return {self.sync_leader.player_id, *self.sync_leader.state.can_group_with}
        # If we have no syncleader, but we do have group members
        # grab 'can_group_with' from the first available member
        for member_id in self._attr_group_members:
            member_player = self.mass.players.get_player(member_id)
            if member_player and member_player.state.available:
                return {*self._attr_group_members, *member_player.state.can_group_with}
        # Dynamic groups can potentially group with any compatible players
        # Actual compatibility is validated when adding members
        temp_can_group_with = set()
        for player in self.mass.players.all_players(return_unavailable=False):
            if not player.available or player.type == PlayerType.GROUP:
                # let's avoid showing group players as options to group with
                continue
            if (
                PlayerFeature.SET_MEMBERS in player.state.supported_features
                and player.state.can_group_with
                and not player.state.active_group
            ):
                temp_can_group_with.add(player.player_id)
        return temp_can_group_with

    async def get_config_entries(
        self,
        action: str | None = None,
        values: dict[str, ConfigValueType] | None = None,
    ) -> list[ConfigEntry]:
        """Return all (provider/player specific) Config Entries for the given player (if any).

        `action` and `values` are accepted for interface compatibility but are
        not used by this implementation.
        """
        entries: list[ConfigEntry] = [
            # syncgroup specific entries
            CONF_ENTRY_SGP_NOTE,
            ConfigEntry(
                key=CONF_GROUP_MEMBERS,
                type=ConfigEntryType.STRING,
                multi_value=True,
                label="Group members",
                default_value=[],
                description="Select all players you want to be part of this sync group. "
                "Only compatible players (based on their sync protocol) can be grouped together.",
                required=False,  # needed for dynamic members (which allows empty members list)
                options=[
                    # offer every non-group player known to the players controller
                    ConfigValueOption(x.display_name, x.player_id)
                    for x in self.mass.players.all_players(True, False)
                    if x.type != PlayerType.GROUP
                ],
            ),
            ConfigEntry(
                key=CONF_DYNAMIC_GROUP_MEMBERS,
                type=ConfigEntryType.BOOLEAN,
                label="Enable dynamic members",
                description="Allow (un)joining members dynamically, so the group more or less "
                "behaves the same like manually syncing players together, "
                "with the main difference being that the group player will hold the queue.",
                default_value=False,
                required=False,
            ),
        ]
        return entries

    async def stop(self) -> None:
        """Send STOP command to given player.

        Clears the current media, stops the sync leader (if any) and schedules
        a delayed dissolve of the syncgroup.
        """
        self._attr_current_media = None
        if sync_leader := self.sync_leader:
            # Use internal handler to bypass group redirect logic and avoid infinite loop
            # (sync_leader is part of this group, so redirect would loop back here)
            await self.mass.players._handle_cmd_stop(sync_leader.player_id)
        # dissolve the sync group since we stopped playback
        # (the timer is cancelled again by _form_syncgroup when playback restarts in time)
        self.mass.call_later(
            5, self._dissolve_syncgroup, task_id=f"syncgroup_dissolve_{self.player_id}"
        )

    async def play(self) -> None:
        """Send PLAY (unpause) command to given player."""
        await self.mass.players.cmd_resume(
            self.player_id, self._attr_active_source, self._attr_current_media
        )

    async def play_media(self, media: PlayerMedia) -> None:
        """Handle PLAY MEDIA on given player.

        :param media: The media to play; it is forwarded to the sync leader.
        :raises RuntimeError: If no sync leader could be selected (empty group).
        """
        self._attr_current_media = media
        self._attr_active_source = media.source_id or None
        await self._form_syncgroup()
        # simply forward the command to the sync leader
        if sync_leader := self.sync_leader:
            # Use internal handler to bypass group redirect logic and preserve protocol selection
            await self.mass.players._handle_play_media(sync_leader.player_id, media)
            self.update_state()
        else:
            raise RuntimeError("An empty group cannot play media, consider adding members first")

    async def enqueue_next_media(self, media: PlayerMedia) -> None:
        """Handle enqueuing of a next media item on the player.

        Does nothing when there is no sync leader or the leader lacks ENQUEUE.
        """
        if sync_leader := self.sync_leader:
            if PlayerFeature.ENQUEUE not in sync_leader.state.supported_features:
                # this may happen in race conditions where we just switched sync leaders
                # and the new leader doesn't support enqueueing next media.
                return
            # Use internal handler to bypass group redirect logic and avoid infinite loop
            await self.mass.players._handle_enqueue_next_media(sync_leader.player_id, media)

    async def set_members(
        self,
        player_ids_to_add: list[str] | None = None,
        player_ids_to_remove: list[str] | None = None,
    ) -> None:
        """Handle SET_MEMBERS command on the player.

        :param player_ids_to_add: Ids of players to add to the group.
        :param player_ids_to_remove: Ids of players to remove from the group.
        :raises UnsupportedFeaturedException: If the group is not dynamic, or
            when the group itself is listed as a member to remove.
        """
        if not self.is_dynamic:
            raise UnsupportedFeaturedException(
                f"Group {self.display_name} does not allow dynamically adding/removing members!"
            )
        prev_leader = self.sync_leader
        was_playing = self.playback_state == PlaybackState.PLAYING
        needs_restart = False
        if prev_leader and prev_leader.player_id in (player_ids_to_remove or []):
            # We're removing the current sync leader while the group is active
            # We need to select a new leader before we can handle the member changes
            self.logger.info(
                "Removing current sync leader %s from group %s while it is active, "
                "dissolving the current syncgroup and will re-form it with a new leader",
                prev_leader.display_name,
                self.display_name,
            )
            if was_playing:
                await self.mass.players._handle_cmd_stop(prev_leader.player_id)
                await asyncio.sleep(1)
            await self._dissolve_syncgroup()
            await asyncio.sleep(2)
            needs_restart = was_playing

        cur_leader = self._select_sync_leader(new_members=player_ids_to_add)
        # handle additions
        final_players_to_add: list[str] = []
        can_group_with = cur_leader.state.can_group_with.copy() if cur_leader else set()
        for member_id in player_ids_to_add or []:
            if member_id == self.player_id:
                continue  # can not add self as member
            member = self.mass.players.get_player(member_id)
            if member is None or not member.available:
                continue
            if member_id not in self._attr_group_members:
                self._attr_group_members.append(member_id)
            if not cur_leader:
                continue
            if member_id != cur_leader.player_id and member_id not in can_group_with:
                self.logger.debug(
                    "Cannot add %s to group %s since it's "
                    "not compatible with the current sync leader",
                    member.display_name,
                    self.display_name,
                )
                continue
            if member_id != cur_leader.player_id:
                final_players_to_add.append(member_id)

        # handle removals
        final_players_to_remove: list[str] = []
        for member_id in player_ids_to_remove or []:
            if member_id not in self._attr_group_members:
                continue
            if member_id == self.player_id:
                raise UnsupportedFeaturedException(
                    f"Cannot remove {self.display_name} from itself as a member!"
                )
            self._attr_group_members.remove(member_id)
            final_players_to_remove.append(member_id)
        self.update_state()
        if needs_restart:
            if self._attr_group_members:
                await self.play()
            else:
                # The removed leader was the last member: play_media refuses to
                # play on an empty group, so there is nothing to restart.
                self.logger.debug(
                    "Group %s has no members left, not restarting playback",
                    self.display_name,
                )
            return
        if not was_playing or not cur_leader:
            # Don't need to do anything else if the group is not active
            # The syncing will be done once playback starts
            return
        await self.mass.players.cmd_set_members(
            cur_leader.player_id,
            player_ids_to_add=final_players_to_add,
            player_ids_to_remove=final_players_to_remove,
        )

    async def _form_syncgroup(self) -> None:
        """Form syncgroup by syncing all (possible) members."""
        # a pending dissolve (scheduled by stop) must not tear down the group we form now
        self.mass.cancel_timer(f"syncgroup_dissolve_{self.player_id}")
        if not self.sync_leader:
            self.sync_leader = self._select_sync_leader()

        leader = self.sync_leader
        if not leader:
            # we have no members in the group, so we can't form a syncgroup
            return

        # ensure the sync leader is first in the list
        self._attr_group_members = [
            leader.player_id,
            *[x for x in self._attr_group_members if x != leader.player_id],
        ]
        # only sync the members that are not already grouped to the leader
        members_to_sync = [
            x
            for x in self._attr_group_members
            if x != leader.player_id and x not in leader.state.group_members
        ]
        if members_to_sync:
            # If the sync leader is playing something independently, stop it first
            # to prevent protocol switching from trying to resume the previous playback
            # (we're about to start new playback on the syncgroup)
            if leader.state.playback_state == PlaybackState.PLAYING:
                await self.mass.players._handle_cmd_stop(leader.player_id)
            await self.mass.players.cmd_set_members(leader.player_id, members_to_sync)

    async def _dissolve_syncgroup(self) -> None:
        """Dissolve the current syncgroup by ungrouping all members."""
        if sync_leader := self.sync_leader:
            # dissolve the temporary syncgroup from the sync leader
            sync_children = [
                x for x in sync_leader.state.group_members if x != sync_leader.player_id
            ]
            if sync_children:
                await self.mass.players.cmd_set_members(sync_leader.player_id, [], sync_children)
        self.sync_leader = None
        self.update_state()

    def _select_sync_leader(self, new_members: list[str] | None = None) -> Player | None:
        """Select a (new) sync leader.

        :param new_members: Candidate member ids, used only when the group
            currently has no members.
        :return: The current leader if it is still available, otherwise the
            first available candidate, or None if there is none.
        """
        if self.group_members and self.sync_leader and self.sync_leader.state.available:
            # current leader is still available, no need to select a new one
            return self.sync_leader
        group_members = self.group_members or new_members or []
        for member_id in group_members:
            member_player = self.mass.players.get_player(member_id)
            if member_player and member_player.state.available:
                self.logger.debug(
                    "Auto-selected %s as sync leader for group %s",
                    member_player.display_name,
                    self.display_name,
                )
                return member_player
        return None
350