/
/
/
1"""Sync Group Player implementation."""
2
3from __future__ import annotations
4
5import asyncio
6from typing import TYPE_CHECKING, cast
7
8from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
9from music_assistant_models.enums import ConfigEntryType, PlaybackState, PlayerFeature, PlayerType
10from music_assistant_models.errors import UnsupportedFeaturedException
11from propcache import under_cached_property as cached_property
12
13from music_assistant.constants import (
14 APPLICATION_NAME,
15 CONF_DYNAMIC_GROUP_MEMBERS,
16 CONF_GROUP_MEMBERS,
17)
18from music_assistant.models.player import DeviceInfo, GroupPlayer, Player, PlayerMedia
19
20from .constants import CONF_ENTRY_SGP_NOTE, EXTRA_FEATURES_FROM_MEMBERS, SUPPORT_DYNAMIC_LEADER
21
22if TYPE_CHECKING:
23 from .provider import SyncGroupProvider
24
25
class SyncGroupPlayer(GroupPlayer):
    """Sync Group Player implementation.

    Playback commands sent to the group are forwarded to a single member, the
    sync leader; the remaining members are synced to that leader. Playback state
    and elapsed time reported by the group are read from the leader.
    """

    _attr_type: PlayerType = PlayerType.GROUP
    sync_leader: Player | None = None
    """The active sync leader player for this syncgroup."""

    def __init__(
        self,
        provider: SyncGroupProvider,
        player_id: str,
    ) -> None:
        """Initialize SyncGroupPlayer instance.

        :param provider: The sync group provider that owns this player.
        :param player_id: The id of this group player.
        """
        super().__init__(provider, player_id)
        self._attr_name = self.config.name or self.config.default_name or f"SyncGroup {player_id}"
        self._attr_available = True
        self._attr_device_info = DeviceInfo(model=provider.name, manufacturer=APPLICATION_NAME)
        # The backing attribute starts out empty: the effective set of groupable
        # players is computed on demand by the `can_group_with` property below.
        self._attr_can_group_with = set()

    @cached_property
    def is_dynamic(self) -> bool:
        """Return if the player is a dynamic group player."""
        return bool(self.config.get_value(CONF_DYNAMIC_GROUP_MEMBERS, False))

    async def on_config_updated(self) -> None:
        """Handle logic when the player is loaded or updated.

        Re-reads the dynamic flag and the configured members, then resets the
        member lists and toggles the SET_MEMBERS feature accordingly.
        """
        # Config is only available after the player was registered
        self._cache.clear()  # clear to prevent loading old is_dynamic
        default_members = cast("list[str]", self.config.get_value(CONF_GROUP_MEMBERS, []))
        if self.is_dynamic:
            self._attr_static_group_members = []
            self._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
        else:
            self._attr_static_group_members = default_members.copy()
            self._attr_supported_features.discard(PlayerFeature.SET_MEMBERS)
        self._attr_group_members = default_members.copy()

    @cached_property
    def supported_features(self) -> set[PlayerFeature]:
        """Return the supported features of the player."""
        # by default we don't have any features, except play_media
        # but we can gain some features based on the capabilities of the sync leader
        # set_members is only supported if it's a dynamic group
        base_features: set[PlayerFeature] = {PlayerFeature.PLAY_MEDIA}
        if self.is_dynamic:
            base_features.add(PlayerFeature.SET_MEMBERS)
        if not self.sync_leader:
            return base_features
        # add features supported by the sync leader
        leader_features = self.sync_leader.state.supported_features
        base_features.update(
            feature for feature in EXTRA_FEATURES_FROM_MEMBERS if feature in leader_features
        )
        return base_features

    @property
    def playback_state(self) -> PlaybackState:
        """Return the current playback state of the player (IDLE without a sync leader)."""
        return self.sync_leader.state.playback_state if self.sync_leader else PlaybackState.IDLE

    @property
    def requires_flow_mode(self) -> bool:
        """Return if the player needs flow mode (taken from the sync leader, if any)."""
        if leader := self.sync_leader:
            return leader.flow_mode
        return False

    @property
    def elapsed_time(self) -> float | None:
        """Return the elapsed time in (fractional) seconds of the current track (if any)."""
        return self.sync_leader.state.elapsed_time if self.sync_leader else None

    @property
    def elapsed_time_last_updated(self) -> float | None:
        """Return when the elapsed time was last updated."""
        return self.sync_leader.state.elapsed_time_last_updated if self.sync_leader else None

    @property
    def can_group_with(self) -> set[str]:
        """Return the id's of players this player can group with."""
        if not self.is_dynamic:
            # in case of static members,
            # we can only group with the players defined in the config, so we return those directly
            return set(self._attr_static_group_members)
        # if we already have a sync leader, we use its can_group_with as reference
        if self.sync_leader:
            return {self.sync_leader.player_id, *self.sync_leader.state.can_group_with}
        # If we have no members, but we do have default members in the config,
        # we can group with players that are compatible with those
        default_members = cast("list[str]", self.config.get_value(CONF_GROUP_MEMBERS, []))
        for member_id in default_members:
            member_player = self.mass.players.get_player(member_id)
            if member_player and member_player.state.available:
                return {*default_members, *member_player.state.can_group_with}
        # Dynamic groups can potentially group with any compatible players
        # Actual compatibility is validated when adding members
        temp_can_group_with: set[str] = set()
        for player in self.mass.players.all_players(return_unavailable=False):
            if not player.available or player.type == PlayerType.GROUP:
                # let's avoid showing group players as options to group with
                continue
            if (
                PlayerFeature.SET_MEMBERS in player.state.supported_features
                and player.state.can_group_with
            ):
                temp_can_group_with.add(player.player_id)
        return temp_can_group_with

    async def get_config_entries(
        self,
        action: str | None = None,
        values: dict[str, ConfigValueType] | None = None,
    ) -> list[ConfigEntry]:
        """Return all (provider/player specific) Config Entries for the given player (if any).

        :param action: Not used by this implementation.
        :param values: Not used by this implementation.
        """
        return [
            # syncgroup specific entries
            CONF_ENTRY_SGP_NOTE,
            ConfigEntry(
                key=CONF_GROUP_MEMBERS,
                type=ConfigEntryType.STRING,
                multi_value=True,
                label="Group members",
                default_value=[],
                description="Select all players you want to be part of this sync group. "
                "Only compatible players (based on their sync protocol) can be grouped together.",
                required=False,  # needed for dynamic members (which allows empty members list)
                options=[
                    ConfigValueOption(x.display_name, x.player_id)
                    for x in self.mass.players.all_players(True, False)
                    if x.type != PlayerType.GROUP
                ],
            ),
            ConfigEntry(
                key=CONF_DYNAMIC_GROUP_MEMBERS,
                type=ConfigEntryType.BOOLEAN,
                label="Enable dynamic members",
                description="Allow (un)joining members dynamically, so the group more or less "
                "behaves the same like manually syncing players together, "
                "with the main difference being that the group player will hold the queue.",
                default_value=False,
                required=False,
            ),
        ]

    async def stop(self) -> None:
        """Send STOP command to given player.

        Clears the group's current media/source, stops the sync leader (if any)
        and dissolves the syncgroup.
        """
        self._attr_current_media = None
        self._attr_active_source = None
        if sync_leader := self.sync_leader:
            # Use internal handler to bypass group redirect logic and avoid infinite loop
            # (sync_leader is part of this group, so redirect would loop back here)
            await self.mass.players._handle_cmd_stop(sync_leader.player_id)
            # dissolve the sync group since we stopped playback
            await self._dissolve_syncgroup()

    async def play(self) -> None:
        """Send PLAY (unpause) command to given player."""
        if sync_leader := self.sync_leader:
            # Use internal handler to bypass group redirect logic and avoid infinite loop
            await self.mass.players._handle_cmd_play(sync_leader.player_id)

    async def play_media(self, media: PlayerMedia) -> None:
        """Handle PLAY MEDIA on given player.

        Forms the syncgroup first when there is no sync leader yet, then forwards
        the media to the leader.

        :param media: The media to play.
        :raises RuntimeError: If no sync leader could be selected (empty group).
        """
        self._attr_current_media = media
        self._attr_active_source = media.source_id if media.source_id else None
        if not self.sync_leader:
            await self._form_syncgroup()
        sync_leader = self.sync_leader
        if not sync_leader:
            # Nothing will play: roll back the media/source we just stored so the
            # group does not keep advertising media it never started.
            self._attr_current_media = None
            self._attr_active_source = None
            raise RuntimeError("An empty group cannot play media, consider adding members first")
        # simply forward the command to the sync leader
        # Use internal handler to bypass group redirect logic and preserve protocol selection
        await self.mass.players._handle_play_media(sync_leader.player_id, media)
        self.update_state()

    async def enqueue_next_media(self, media: PlayerMedia) -> None:
        """Handle enqueuing of a next media item on the player."""
        if sync_leader := self.sync_leader:
            # Use internal handler to bypass group redirect logic and avoid infinite loop
            await self.mass.players._handle_enqueue_next_media(sync_leader.player_id, media)

    async def set_members(
        self,
        player_ids_to_add: list[str] | None = None,
        player_ids_to_remove: list[str] | None = None,
    ) -> None:
        """Handle SET_MEMBERS command on the player.

        :param player_ids_to_add: Ids of players to add to the group.
        :param player_ids_to_remove: Ids of players to remove from the group.
        :raises UnsupportedFeaturedException: If the group is not dynamic.
        """
        if not self.is_dynamic:
            raise UnsupportedFeaturedException(
                f"Group {self.display_name} does not allow dynamically adding/removing members!"
            )
        prev_leader = self.sync_leader
        cur_leader = self._select_sync_leader(new_members=player_ids_to_add)
        # handle additions
        final_players_to_add: list[str] = []
        can_group_with = cur_leader.state.can_group_with.copy() if cur_leader else set()
        for member_id in player_ids_to_add or []:
            if member_id == self.player_id:
                continue  # can not add self as member
            member = self.mass.players.get_player(member_id)
            if member is None or not member.available:
                continue
            if member_id not in self._attr_group_members:
                self._attr_group_members.append(member_id)
            if not cur_leader:
                continue
            if member_id != cur_leader.player_id and member_id not in can_group_with:
                self.logger.debug(
                    f"Cannot add {member.display_name} to group {self.display_name} since it's "
                    f"not compatible with the current sync leader"
                )
                continue
            if member_id != cur_leader.player_id:
                final_players_to_add.append(member_id)

        # handle removals
        final_players_to_remove: list[str] = []
        for member_id in player_ids_to_remove or []:
            if member_id not in self._attr_group_members:
                continue
            # NOTE(review): only reachable if the group ever lists itself as a member
            if member_id == self.player_id:
                raise UnsupportedFeaturedException(
                    f"Cannot remove {self.display_name} from itself as a member!"
                )
            self._attr_group_members.remove(member_id)
            final_players_to_remove.append(member_id)

        # The leader was selected before the removals were applied. If it was just
        # removed it can no longer lead, so pick a replacement from what is left
        # (None when no available member remains).
        if cur_leader and cur_leader.player_id in final_players_to_remove:
            cur_leader = self._first_available_member(self._attr_group_members)

        self.update_state()
        if self.playback_state != PlaybackState.PLAYING:
            # Don't need to do anything else if the group is not active
            # The syncing will be done once playback starts
            if prev_leader and prev_leader.player_id in final_players_to_remove:
                # ...except dropping a leader that is no longer a member, otherwise
                # later commands would still be forwarded to the removed player.
                await self.stop()
            return
        if prev_leader and cur_leader is None:
            # Edge case: we no longer have any members in the group (and thus no leader)
            await self._handle_leader_transition(None)
        elif prev_leader and prev_leader != cur_leader:
            # Edge case: the leader changed (it was removed or became unavailable)
            await self._handle_leader_transition(cur_leader)
        elif cur_leader and (player_ids_to_add or player_ids_to_remove):
            # if the group still has the same leader, we just need to (re)sync the members
            await self.mass.players.cmd_set_members(
                cur_leader.player_id,
                player_ids_to_add=final_players_to_add,
                player_ids_to_remove=final_players_to_remove,
            )

    async def _form_syncgroup(self) -> None:
        """Form syncgroup by syncing all (possible) members."""
        if not self.sync_leader:
            self.sync_leader = self._select_sync_leader()

        if not self.sync_leader:
            # we have no members in the group, so we can't form a syncgroup
            return

        leader_id = self.sync_leader.player_id
        # ensure the sync leader is first in the list
        self._attr_group_members = [
            leader_id,
            *[x for x in self._attr_group_members if x != leader_id],
        ]
        # only sync the members that are not already grouped to the leader
        members_to_sync = [
            x
            for x in self._attr_group_members
            if x != leader_id and x not in self.sync_leader.state.group_members
        ]
        if members_to_sync:
            await self.mass.players.cmd_set_members(leader_id, members_to_sync)

    async def _dissolve_syncgroup(self) -> None:
        """Dissolve the current syncgroup by ungrouping all members."""
        if sync_leader := self.sync_leader:
            # dissolve the temporary syncgroup from the sync leader
            sync_children = [
                x for x in sync_leader.state.group_members if x != sync_leader.player_id
            ]
            if sync_children:
                await self.mass.players.cmd_set_members(sync_leader.player_id, [], sync_children)
        self.sync_leader = None
        self.update_state()

    async def _handle_leader_transition(self, new_leader: Player | None) -> None:
        """Handle transition from current leader to new leader.

        Stops playback on the current leader (which dissolves the syncgroup),
        switches to the new leader and, if the group was playing, resumes.

        :param new_leader: The player to lead from now on, or None when the
            group has no leader anymore.
        """
        prev_leader = self.sync_leader
        was_playing = False
        if prev_leader and new_leader and prev_leader != new_leader:
            # Check if the provider(protocol) supports dynamic leader selection
            # For cross-provider sync groups, we need to check the provider domain
            provider_protocol = None
            if prev_leader.active_output_protocol and (
                proto_prov := self.mass.get_provider(prev_leader.active_output_protocol)
            ):
                provider_protocol = proto_prov.domain
            else:
                provider_protocol = prev_leader.provider.domain

            if provider_protocol and provider_protocol in SUPPORT_DYNAMIC_LEADER:
                # TODO: figure out how to handle dynamic leader transition without
                # stopping playback, which has become complicated due
                # to a player can support multiple protocols
                pass

        if prev_leader:
            # Save current media and playback state for potential restart
            was_playing = self.playback_state == PlaybackState.PLAYING
            # Stop current playback (which also dissolves the existing syncgroup)
            await self.stop()
            # allow some time to propagate the changes before resyncing
            await asyncio.sleep(2)

        # Set new leader
        self.sync_leader = new_leader

        if new_leader:
            # form a syncgroup with the new leader
            await self._form_syncgroup()
            # Restart playback if requested and we have media to play
            if was_playing:
                await self.mass.players._handle_cmd_resume(self.player_id)
        else:
            # We have no leader anymore, send update since we stopped playback
            self.update_state()

    def _select_sync_leader(self, new_members: list[str] | None = None) -> Player | None:
        """Select a (new) sync leader.

        :param new_members: Candidate ids to fall back to when the group has
            neither current members nor configured default members.
        :return: The current leader if it is still available, otherwise the
            first available candidate, or None if there is none.
        """
        if self.group_members and self.sync_leader and self.sync_leader.state.available:
            # current leader is still available, no need to select a new one
            return self.sync_leader
        default_members = cast("list[str]", self.config.get_value(CONF_GROUP_MEMBERS, []))
        group_members = self.group_members or default_members or new_members or []
        return self._first_available_member(group_members)

    def _first_available_member(self, member_ids: list[str]) -> Player | None:
        """Return the first known and available player from member_ids (or None)."""
        for member_id in member_ids:
            member_player = self.mass.players.get_player(member_id)
            if member_player and member_player.state.available:
                self.logger.debug(
                    f"Auto-selected {member_player.display_name} as sync leader for "
                    f"group {self.display_name}"
                )
                return member_player
        return None
365