"""
Controller for (provider-specific) SyncGroup players.

A SyncGroup player is a virtual player that automatically groups multiple players
together in a sync group, where one player is the sync leader
and the other players are synced to that leader.
"""
8
9from __future__ import annotations
10
11import asyncio
12from copy import deepcopy
13from typing import TYPE_CHECKING, cast
14
15import shortuuid
16from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
17from music_assistant_models.constants import PLAYER_CONTROL_NONE
18from music_assistant_models.enums import (
19 ConfigEntryType,
20 PlaybackState,
21 PlayerFeature,
22 PlayerType,
23 ProviderFeature,
24)
25from music_assistant_models.errors import UnsupportedFeaturedException
26from music_assistant_models.player import DeviceInfo, PlayerMedia, PlayerSource
27from propcache import under_cached_property as cached_property
28
29from music_assistant.constants import (
30 CONF_CROSSFADE_DURATION,
31 CONF_DYNAMIC_GROUP_MEMBERS,
32 CONF_ENABLE_ICY_METADATA,
33 CONF_FLOW_MODE,
34 CONF_GROUP_MEMBERS,
35 CONF_HTTP_PROFILE,
36 CONF_OUTPUT_CODEC,
37 CONF_SAMPLE_RATES,
38 CONF_SMART_FADES_MODE,
39 SYNCGROUP_PREFIX,
40)
41from music_assistant.models.player import GroupPlayer, Player
42
43if TYPE_CHECKING:
44 from music_assistant.models.player_provider import PlayerProvider
45
46 from .player_controller import PlayerController
47
48
# Provider domains that support dynamic leader selection in a syncgroup.
# For these providers, removing the current leader from the group makes the
# provider automatically pick a new leader from the remaining members,
# and the music keeps playing uninterrupted.
# TODO: Get this working with Sonos as well (need to handle range requests)
SUPPORT_DYNAMIC_LEADER = {"airplay", "snapcast", "squeezelite"}
59
# Features a SyncGroup player only advertises when its reference player
# (the sync leader, or otherwise the first group member) supports them.
OPTIONAL_FEATURES = {
    PlayerFeature.MULTI_DEVICE_DSP,
    PlayerFeature.VOLUME_MUTE,
    PlayerFeature.SELECT_SOURCE,
    PlayerFeature.SEEK,
    PlayerFeature.PLAY_ANNOUNCEMENT,
    PlayerFeature.PAUSE,
    PlayerFeature.NEXT_PREVIOUS,
    PlayerFeature.GAPLESS_PLAYBACK,
    PlayerFeature.ENQUEUE,
}
71
72
class SyncGroupPlayer(GroupPlayer):
    """
    Helper class for a (provider specific) SyncGroup player.

    Playback commands (stop/play/pause/play_media/enqueue/select_source) and most
    state properties are forwarded to ``sync_leader``, the member that currently
    leads the sync group. The leader is (re)selected when the group is powered on
    or when its members change, and cleared again when the group is powered off.
    """

    _attr_type: PlayerType = PlayerType.GROUP
    sync_leader: Player | None = None
    """The active sync leader player for this syncgroup."""

    @cached_property
    def is_dynamic(self) -> bool:
        """
        Return if the player is a dynamic group player.

        The value is read from the CONF_DYNAMIC_GROUP_MEMBERS config value and cached;
        ``on_config_updated`` clears the cache so a changed value is picked up.
        """
        return bool(self.config.get_value(CONF_DYNAMIC_GROUP_MEMBERS, False))

    def __init__(
        self,
        provider: PlayerProvider,
        player_id: str,
    ) -> None:
        """
        Initialize GroupPlayer instance.

        :param provider: The player provider this syncgroup belongs to.
        :param player_id: The id of this syncgroup player.
        """
        super().__init__(provider, player_id)
        self._attr_name = self.config.name or self.config.default_name or f"SyncGroup {player_id}"
        self._attr_available = True
        self._attr_powered = False  # group players are always powered off by default
        self._attr_device_info = DeviceInfo(model="Sync Group", manufacturer=provider.name)
        # base features; SET_MEMBERS and the optional (leader) features are added later
        self._attr_supported_features = {
            PlayerFeature.POWER,
            PlayerFeature.VOLUME_SET,
        }

    async def on_config_updated(self) -> None:
        """
        Handle logic when the player is loaded or updated.

        Refreshes the static members list and the SET_MEMBERS feature from the config.
        The active members list is only overwritten while the group is powered off.
        """
        # Config is only available after the player was registered
        self._cache.clear()  # clear to prevent loading old is_dynamic
        static_members = cast("list[str]", self.config.get_value(CONF_GROUP_MEMBERS, []))
        if self.is_dynamic:
            # dynamic groups have no static (fixed) members
            self._attr_static_group_members = []
            self._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
        else:
            self._attr_static_group_members = static_members.copy()
            self._attr_supported_features.discard(PlayerFeature.SET_MEMBERS)
        if not self.powered:
            self._attr_group_members = static_members.copy()

    @property
    def supported_features(self) -> set[PlayerFeature]:
        """
        Return the supported features of the player.

        The base features are extended with every feature from OPTIONAL_FEATURES that
        the reference player supports. The reference player is the sync leader or,
        when there is no leader (yet), the first group member.
        """
        members = self.group_members
        reference_player: Player | None = self.sync_leader or (
            self.mass.players.get(members[0]) if members else None
        )
        if reference_player:
            # work on a copy so the optional features do not stick to the base set
            base_features = self._attr_supported_features.copy()
            # add features supported by the sync leader
            for feature in OPTIONAL_FEATURES:
                if feature in reference_player.supported_features:
                    base_features.add(feature)
            return base_features
        return self._attr_supported_features

    @property
    def playback_state(self) -> PlaybackState:
        """
        Return the current playback state of the player.

        This is the leader's playback state while powered on with a leader, else IDLE.
        """
        if self.powered:
            return self.sync_leader.playback_state if self.sync_leader else PlaybackState.IDLE
        return PlaybackState.IDLE

    @property
    def requires_flow_mode(self) -> bool:
        """Return if the player needs flow mode (as reported by the leader, else False)."""
        if leader := self.sync_leader:
            return leader.requires_flow_mode
        return False

    @property
    def elapsed_time(self) -> float | None:
        """Return the elapsed time in (fractional) seconds of the current track (if any)."""
        return self.sync_leader.elapsed_time if self.sync_leader else None

    @property
    def elapsed_time_last_updated(self) -> float | None:
        """Return when the elapsed time was last updated."""
        return self.sync_leader.elapsed_time_last_updated if self.sync_leader else None

    @property
    def _current_media(self) -> PlayerMedia | None:
        """
        Return the current media item (if any) loaded in the player.

        Taken from the sync leader when there is one, else from the group's own attribute.
        """
        return self.sync_leader._current_media if self.sync_leader else self._attr_current_media

    @property
    def _active_source(self) -> str | None:
        """
        Return the active source id (if any) of the player.

        Taken from the sync leader when there is one, else from the group's own attribute.
        """
        return self.sync_leader._active_source if self.sync_leader else self._attr_active_source

    @property
    def _source_list(self) -> list[PlayerSource]:
        """Return list of available (native) sources for this player (empty without leader)."""
        if self.sync_leader:
            return self.sync_leader._source_list
        return []

    @property
    def can_group_with(self) -> set[str]:
        """
        Return the id's of players this player can group with.

        This should return set of player_id's this player can group/sync with
        or just the provider's instance_id if all players can group with each other.

        A non-dynamic group returns an empty set. A dynamic group returns the
        leader's value or, without a leader, the provider's instance_id.
        """
        if self.is_dynamic and (leader := self.sync_leader):
            return leader.can_group_with
        if self.is_dynamic:
            return {self.provider.instance_id}
        return set()

    async def get_config_entries(
        self,
        action: str | None = None,
        values: dict[str, ConfigValueType] | None = None,
    ) -> list[ConfigEntry]:
        """
        Return all (provider/player specific) Config Entries for the given player (if any).

        :param action: Not used by this implementation.
        :param values: Not used by this implementation.
        """
        entries: list[ConfigEntry] = [
            # syncgroup specific entries
            ConfigEntry(
                key=CONF_GROUP_MEMBERS,
                type=ConfigEntryType.STRING,
                multi_value=True,
                label="Group members",
                default_value=[],
                description="Select all players you want to be part of this group",
                required=False,  # needed for dynamic members (which allows empty members list)
                options=[
                    ConfigValueOption(x.display_name, x.player_id)
                    for x in self.provider.players
                    if x.type != PlayerType.GROUP
                ],
            ),
            ConfigEntry(
                # same key that is_dynamic reads, so the entry and the reader cannot drift
                key=CONF_DYNAMIC_GROUP_MEMBERS,
                type=ConfigEntryType.BOOLEAN,
                label="Enable dynamic members",
                description="Allow (un)joining members dynamically, so the group more or less "
                "behaves the same like manually syncing players together, "
                "with the main difference being that the group player will hold the queue.",
                default_value=False,
                required=False,
            ),
        ]
        # combine base group entries with (base) player entries for this player type
        child_player = next((x for x in self.provider.players if x.type == PlayerType.PLAYER), None)
        if child_player:
            # only this subset of the child player's entries is exposed on the group
            allowed_conf_entries = (
                CONF_HTTP_PROFILE,
                CONF_ENABLE_ICY_METADATA,
                CONF_CROSSFADE_DURATION,
                CONF_OUTPUT_CODEC,
                CONF_FLOW_MODE,
                CONF_SAMPLE_RATES,
                CONF_SMART_FADES_MODE,
            )
            child_config_entries = await child_player.get_config_entries()
            entries.extend(
                [entry for entry in child_config_entries if entry.key in allowed_conf_entries]
            )
        return entries

    async def stop(self) -> None:
        """Send STOP command to given player (forwarded to the sync leader, if any)."""
        if sync_leader := self.sync_leader:
            await sync_leader.stop()

    async def play(self) -> None:
        """Send PLAY command to given player (forwarded to the sync leader, if any)."""
        if sync_leader := self.sync_leader:
            await sync_leader.play()

    async def pause(self) -> None:
        """Send PAUSE command to given player (forwarded to the sync leader, if any)."""
        if sync_leader := self.sync_leader:
            await sync_leader.pause()

    async def power(self, powered: bool) -> None:
        """
        Handle POWER command to group player.

        Powering on refreshes the static members, selects a sync leader, powers on
        the members and forms the syncgroup. Powering off stops playback, dissolves
        the syncgroup, powers off the members and clears the sync leader.

        :param powered: True to power the group on, False to power it off.
        """
        prev_power = self._attr_powered

        # always stop at power off
        if not powered and self.playback_state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
            await self.stop()
            self._attr_current_media = None

        # optimistically set the group state
        self._attr_powered = powered
        if prev_power != powered:
            self.update_state()

        if powered:
            # ensure static members are present when powering on
            for static_group_member in self._attr_static_group_members:
                member_player = self.mass.players.get(static_group_member)
                if not member_player or not member_player.available or not member_player.enabled:
                    # drop members that can not be used right now
                    if static_group_member in self._attr_group_members:
                        self._attr_group_members.remove(static_group_member)
                    continue
                if static_group_member not in self._attr_group_members:
                    # Always add static members when power(true) is called,
                    # this will ensure that static members that just became available are added
                    self._attr_group_members.append(static_group_member)
            # Select sync leader and handle turn on
            new_leader = self._select_sync_leader()
            # handle TURN_ON of the group player by turning on all members
            for member in self.mass.players.iter_group_members(
                self, only_powered=False, active_only=False
            ):
                await self._handle_member_collisions(member)
                if not member.powered and member.power_control != PLAYER_CONTROL_NONE:
                    await self.mass.players._handle_cmd_power(member.player_id, True)
            # Set up the sync group with the new leader
            if prev_power and new_leader == self.sync_leader:
                # Already powered on with same leader, just re-sync members without full transition
                await self._form_syncgroup()
            else:
                await self._handle_leader_transition(new_leader)
        elif prev_power and not powered:
            # handle TURN_OFF of the group player by dissolving group and turning off all members
            await self._dissolve_syncgroup()
            # turn off all group members
            for member in self.mass.players.iter_group_members(
                self, only_powered=True, active_only=True
            ):
                if member.powered and member.power_control != PLAYER_CONTROL_NONE:
                    await self.mass.players._handle_cmd_power(member.player_id, False)

        if not powered:
            # Reset to unfiltered static members list when powered off
            # (the frontend will hide unavailable members)
            self._attr_group_members = self._attr_static_group_members.copy()
            # and clear the sync leader
            self.sync_leader = None
        self.update_state()

    async def volume_set(self, volume_level: int) -> None:
        """Send VOLUME_SET command to given player."""
        # group volume is already handled in the player manager

    async def play_media(self, media: PlayerMedia) -> None:
        """
        Handle PLAY MEDIA on given player.

        :param media: The media item to play; forwarded to the sync leader.
        :raises RuntimeError: If the group has no sync leader after powering on.
        """
        # power on (which will also resync and add static members if needed)
        await self.power(True)
        # simply forward the command to the sync leader
        if sync_leader := self.sync_leader:
            await sync_leader.play_media(media)
            # keep our own copy so later changes to the passed object do not leak in
            self._attr_current_media = deepcopy(media)
            self.update_state()
        else:
            raise RuntimeError("an empty group cannot play media, consider adding members first")

    async def enqueue_next_media(self, media: PlayerMedia) -> None:
        """Handle enqueuing of a next media item on the player (forwarded to the leader)."""
        if sync_leader := self.sync_leader:
            await sync_leader.enqueue_next_media(media)

    async def select_source(self, source: str) -> None:
        """
        Handle SELECT SOURCE command on the player.

        Will only be called if the PlayerFeature.SELECT_SOURCE is supported.

        :param source: The source(id) to select, as defined in the source_list.
        """
        if sync_leader := self.sync_leader:
            await sync_leader.select_source(source)
            self.update_state()

    async def set_members(
        self,
        player_ids_to_add: list[str] | None = None,
        player_ids_to_remove: list[str] | None = None,
    ) -> None:
        """
        Handle SET_MEMBERS command on the player.

        :param player_ids_to_add: Player ids to add; ids that are already a member are skipped.
        :param player_ids_to_remove: Player ids to remove; ids that are no member are skipped.
        :raises UnsupportedFeaturedException: If the group is not dynamic, or if the
            group's own player id is given as a member.
        """
        if not self.is_dynamic:
            raise UnsupportedFeaturedException(
                f"Group {self.display_name} does not allow dynamically adding/removing members!"
            )
        # handle additions
        final_players_to_add: list[str] = []
        for player_id in player_ids_to_add or []:
            if player_id in self._attr_group_members:
                continue
            if player_id == self.player_id:
                raise UnsupportedFeaturedException(
                    f"Cannot add {self.display_name} to itself as a member!"
                )
            self._attr_group_members.append(player_id)
            final_players_to_add.append(player_id)
        # handle removals
        final_players_to_remove: list[str] = []
        for player_id in player_ids_to_remove or []:
            if player_id not in self._attr_group_members:
                continue
            if player_id == self.player_id:
                raise UnsupportedFeaturedException(
                    f"Cannot remove {self.display_name} from itself as a member!"
                )
            self._attr_group_members.remove(player_id)
            final_players_to_remove.append(player_id)
        self.update_state()
        if not self.powered:
            # Don't need to do anything else if the group is powered off
            # The syncing will be done once powered on
            return
        next_leader = self._select_sync_leader()
        prev_leader = self.sync_leader

        if prev_leader and next_leader is None:
            # Edge case: we no longer have any members in the group (and thus no leader)
            await self._handle_leader_transition(None)
        elif prev_leader != next_leader:
            # Edge case: we had changed the leader (or just got one)
            await self._handle_leader_transition(next_leader)
        elif self.sync_leader and (final_players_to_add or final_players_to_remove):
            # if the group still has the same leader, we need to (re)sync the members.
            # Only do so when membership effectively changed, so the leader
            # is not called with two empty lists for a no-op request.
            # Handle collisions for newly added players
            for player_id in final_players_to_add:
                if player := self.mass.players.get(player_id):
                    await self._handle_member_collisions(player)

            await self.sync_leader.set_members(
                player_ids_to_add=final_players_to_add,
                player_ids_to_remove=final_players_to_remove,
            )

    async def _form_syncgroup(self) -> None:
        """
        Form syncgroup by syncing all (possible) members.

        Moves the leader to the front of the members list, resolves collisions for
        every member and then asks the leader to add the members and to remove
        players it still has grouped that are no member of this group.
        """
        if self.sync_leader is None:
            # This is an empty group, leader will be selected once a member is added
            self._attr_group_members = []
            self.update_state()
            return
        # ensure the sync leader is first in the list
        self._attr_group_members = [
            self.sync_leader.player_id,
            *[x for x in self._attr_group_members if x != self.sync_leader.player_id],
        ]
        self.update_state()
        members_to_sync: list[str] = []
        members_to_remove: list[str] = []
        for member in self.mass.players.iter_group_members(self, active_only=False):
            # Handle collisions before attempting to sync
            await self._handle_member_collisions(member)

            if member.synced_to and member.synced_to != self.sync_leader.player_id:
                # ungroup first
                await member.ungroup()
            if member.player_id == self.sync_leader.player_id:
                # skip sync leader
                continue
            # Always add to members_to_sync to prevent them from being removed below
            members_to_sync.append(member.player_id)
        for former_members in self.sync_leader.group_members:
            if (
                former_members not in members_to_sync
            ) and former_members != self.sync_leader.player_id:
                members_to_remove.append(former_members)
        if members_to_sync or members_to_remove:
            await self.sync_leader.set_members(members_to_sync, members_to_remove)

    async def _dissolve_syncgroup(self) -> None:
        """Dissolve the current syncgroup by ungrouping all members and restoring leader queue."""
        if sync_leader := self.sync_leader:
            # dissolve the temporary syncgroup from the sync leader
            sync_children = [x for x in sync_leader.group_members if x != sync_leader.player_id]
            if sync_children:
                await sync_leader.set_members(player_ids_to_remove=sync_children)
            # Reset the leaders queue since it is no longer part of this group
            sync_leader.update_state()

    async def _handle_leader_transition(self, new_leader: Player | None) -> None:
        """
        Handle transition from current leader to new leader.

        :param new_leader: The player that becomes the sync leader, or None when
            the group no longer has a leader.
        """
        prev_leader = self.sync_leader
        was_playing = False

        if (
            prev_leader
            and new_leader
            and prev_leader != new_leader
            and self.provider.domain in SUPPORT_DYNAMIC_LEADER
        ):
            # provider supports dynamic leader selection, so just remove/add members
            await prev_leader.ungroup()
            self.sync_leader = new_leader
            # allow some time to propagate the changes before resyncing
            await asyncio.sleep(2)
            await self._form_syncgroup()
            return

        if prev_leader:
            # Save current media and playback state for potential restart
            was_playing = self.playback_state == PlaybackState.PLAYING
            # Stop current playback and dissolve existing group
            await self.stop()
            await self._dissolve_syncgroup()
            # allow some time to propagate the changes before resyncing
            await asyncio.sleep(2)

        # Set new leader
        self.sync_leader = new_leader

        if new_leader:
            # form a syncgroup with the new leader
            await self._form_syncgroup()

            # Restart playback if requested and we have media to play
            if was_playing:
                await self.mass.players._handle_cmd_resume(self.player_id)
        else:
            # We have no leader anymore, send update since we stopped playback
            self.update_state()

    def _select_sync_leader(self) -> Player | None:
        """
        Select the active sync leader player for a syncgroup.

        The current leader is kept as long as it is still a group member. Otherwise
        the members are scanned twice: the first pass skips members that are synced
        to another player, the second pass accepts them as well. Members that are
        active in another group are skipped in both passes.

        :return: The selected leader, or None if no member qualifies.
        """
        if self.sync_leader and self.sync_leader.player_id in self.group_members:
            # Don't change the sync leader if we already have one
            return self.sync_leader
        for prefer_sync_leader in (True, False):
            for child_player in self.mass.players.iter_group_members(self):
                if prefer_sync_leader and child_player.synced_to:
                    # first pass: skip players that are themselves synced to another player
                    continue
                if child_player.active_group not in (
                    None,
                    self.player_id,
                    child_player.player_id,
                ):
                    # this should not happen (because its already handled in the power on logic),
                    # but guard it just in case bad things happen
                    continue
                return child_player
        return None

    async def _handle_member_collisions(self, member: Player) -> None:
        """
        Handle collisions when adding a member to the sync group.

        :param member: The (prospective) member to free from other groups/sync parents.
        """
        active_groups = member.active_groups
        for group in active_groups:
            if group == self.player_id:
                continue
            # collision: child player is part another group that is already active !
            # solve this by trying to leave the group first
            if other_group := self.mass.players.get(group):
                if (
                    other_group.supports_feature(PlayerFeature.SET_MEMBERS)
                    and member.player_id not in other_group.static_group_members
                ):
                    await other_group.set_members(player_ids_to_remove=[member.player_id])
                else:
                    # if the other group does not support SET_MEMBERS or it is a static
                    # member, we need to power it off to leave the group
                    await other_group.power(False)
        if (
            member.synced_to is not None
            and self.sync_leader
            and member.synced_to != self.sync_leader.player_id
            and (synced_to_player := self.mass.players.get(member.synced_to))
            and member.player_id in synced_to_player.group_members
        ):
            # collision: child player is synced to another player and still in that group
            # ungroup it first
            await synced_to_player.set_members(player_ids_to_remove=[member.player_id])
538
539
class SyncGroupController:
    """Controller managing SyncGroup players."""

    def __init__(self, player_controller: PlayerController) -> None:
        """
        Initialize SyncGroupController.

        :param player_controller: The player controller; its ``mass`` attribute is
            stored on this controller as well.
        """
        self.player_controller = player_controller
        self.mass = player_controller.mass

    async def create_group_player(
        self, provider: PlayerProvider, name: str, members: list[str], dynamic: bool = True
    ) -> Player:
        """
        Create new SyncGroup Player.

        :param provider: The provider to create the group player for
        :param name: Name of the group player
        :param members: List of player ids to add to the group
        :param dynamic: Whether the group is dynamic (members can change)
        :return: The newly registered syncgroup player.
        :raises UnsupportedFeaturedException: If the provider does not support syncing players.
        """
        # default implementation for providers that support syncing players
        if ProviderFeature.SYNC_PLAYERS not in provider.supported_features:
            # the frontend should already prevent this, but just in case
            raise UnsupportedFeaturedException(
                f"Provider {provider.name} does not support player syncing!"
            )
        # Create a new syncgroup player with the given members.
        # Only keep members that are players of this provider; build the lookup set
        # once instead of rebuilding the id list for every candidate member.
        provider_player_ids = {player.player_id for player in provider.players}
        members = [x for x in members if x in provider_player_ids]
        player_id = f"{SYNCGROUP_PREFIX}{shortuuid.random(8).lower()}"
        self.mass.config.create_default_player_config(
            player_id=player_id,
            provider=provider.instance_id,
            player_type=PlayerType.GROUP,
            name=name,
            enabled=True,
            values={
                CONF_GROUP_MEMBERS: members,
                CONF_DYNAMIC_GROUP_MEMBERS: dynamic,
            },
        )
        return await self._register_syncgroup_player(player_id, provider)

    async def remove_group_player(self, player_id: str) -> None:
        """
        Remove a group player.

        :param player_id: ID of the group player to remove.
        """
        # we simply permanently unregister the syncgroup player and wipe its config
        await self.mass.players.unregister(player_id, True)

    async def _register_syncgroup_player(self, player_id: str, provider: PlayerProvider) -> Player:
        """
        Register a syncgroup player.

        :param player_id: The id of the syncgroup player to create and register.
        :param provider: The provider the syncgroup player belongs to.
        :return: The registered SyncGroupPlayer instance.
        """
        syncgroup = SyncGroupPlayer(provider, player_id)
        await self.mass.players.register_or_update(syncgroup)
        return syncgroup

    async def on_provider_loaded(self, provider: PlayerProvider) -> None:
        """
        Handle logic when a provider is loaded.

        :param provider: The provider that was loaded.
        """
        # register existing syncgroup players for this provider
        for player_conf in await self.mass.config.get_player_configs(provider.instance_id):
            if player_conf.player_id.startswith(SYNCGROUP_PREFIX):
                await self._register_syncgroup_player(player_conf.player_id, provider)

    async def on_provider_unload(self, provider: PlayerProvider) -> None:
        """
        Handle logic when a provider is (about to get) unloaded.

        :param provider: The provider that is being unloaded.
        """
        # unregister existing syncgroup players for this provider
        # (second argument False, where remove_group_player passes True to wipe the config)
        for player in self.mass.players.all(
            provider_filter=provider.instance_id, return_sync_groups=True
        ):
            if player.player_id.startswith(SYNCGROUP_PREFIX):
                await self.mass.players.unregister(player.player_id, False)
611