/
/
/
1"""Group Player implementation."""
2
3from __future__ import annotations
4
5import asyncio
6from copy import deepcopy
7from time import time
8from typing import TYPE_CHECKING, cast
9
10from aiohttp import web
11from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
12from music_assistant_models.constants import PLAYER_CONTROL_NONE
13from music_assistant_models.enums import (
14 ConfigEntryType,
15 ContentType,
16 MediaType,
17 PlaybackState,
18 PlayerFeature,
19 PlayerType,
20)
21from music_assistant_models.errors import UnsupportedFeaturedException
22from music_assistant_models.media_items import AudioFormat
23from propcache import under_cached_property as cached_property
24
25from music_assistant.constants import (
26 CONF_DYNAMIC_GROUP_MEMBERS,
27 CONF_GROUP_MEMBERS,
28 CONF_HTTP_PROFILE,
29 DEFAULT_STREAM_HEADERS,
30)
31from music_assistant.helpers.audio import get_player_filter_params
32from music_assistant.helpers.util import TaskManager
33from music_assistant.models.player import DeviceInfo, Player, PlayerMedia
34from music_assistant.providers.universal_group.constants import UGP_FORMAT
35
36from .constants import CONF_ENTRY_SAMPLE_RATES_UGP, CONFIG_ENTRY_UGP_NOTE
37from .ugp_stream import UGPStream
38
39if TYPE_CHECKING:
40 from .provider import UniversalGroupProvider
41
# Features shared by every universal group player; __init__ copies this set
# per instance before any feature is added or removed.
BASE_FEATURES = {
    PlayerFeature.MULTI_DEVICE_DSP,
    PlayerFeature.PLAY_MEDIA,
    PlayerFeature.POWER,
    PlayerFeature.VOLUME_SET,
}
48
49
50class UniversalGroupPlayer(Player):
51 """Universal Group Player implementation."""
52
53 _attr_type: PlayerType = PlayerType.GROUP
54
def __init__(
    self,
    provider: UniversalGroupProvider,
    player_id: str,
) -> None:
    """Set up the initial group player state and register its stream routes.

    :param provider: Provider instance, passed on to the base Player.
    :param player_id: Id of this group player, passed on to the base Player.
    """
    super().__init__(provider, player_id)
    self.stream: UGPStream | None = None
    self._attr_name = self.config.name or f"Universal Group {player_id}"
    self._attr_available = True
    # group players are always powered off by default
    self._attr_powered = False
    self._attr_device_info = DeviceInfo(model="Universal Group", manufacturer=provider.name)
    self._attr_supported_features = set(BASE_FEATURES)
    self._attr_needs_poll = True
    self._attr_poll_interval = 30
    # register one dynamic route per file extension for the ugp stream;
    # whatever each registration returns is queued in the unload callbacks
    for extension in ("flac", "mp3", "aac"):
        unregister = self.mass.streams.register_dynamic_route(
            f"/ugp/{self.player_id}.{extension}", self._serve_ugp_stream
        )
        self._on_unload_callbacks.append(unregister)
    self._set_attributes()
87
@property
def requires_flow_mode(self) -> bool:
    """Tell whether this player requires flow mode (always True here)."""
    # the answer is unconditional for this player
    return True
92
@cached_property
def synced_to(self) -> str | None:
    """Return the id of the sync leader, which is always None for this player."""
    # groups can't be synced
    return None
98
@property
def can_group_with(self) -> set[str]:
    """Return the ids this group player can be grouped with.

    A dynamic group yields the instance id of every player provider except
    its own; a static group yields the member ids defined in its config.
    """
    if self.is_dynamic:
        # allow grouping with all providers, except the ugp provider itself
        return {
            prov.instance_id
            for prov in self.mass.players.providers
            if prov.instance_id != self.provider.instance_id
        }
    # static members: only the players defined in the config qualify
    return set(self._attr_static_group_members)
112
async def on_config_updated(self) -> None:
    """Sync the member lists and the SET_MEMBERS feature with the player config."""
    configured_members = cast("list[str]", self.config.get_value(CONF_GROUP_MEMBERS, []))
    self._attr_static_group_members = configured_members.copy()
    if not self.powered:
        # the active member list is only replaced while the group is powered off
        self._attr_group_members = configured_members.copy()
    # dynamic groups advertise SET_MEMBERS, static groups do not
    if self.is_dynamic:
        self._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
    else:
        self._attr_supported_features.discard(PlayerFeature.SET_MEMBERS)
123
@cached_property
def is_dynamic(self) -> bool:
    """Return True when the dynamic-group-members config value is truthy."""
    dynamic_members_enabled = self.config.get_value(CONF_DYNAMIC_GROUP_MEMBERS, False)
    return bool(dynamic_members_enabled)
128
async def get_config_entries(
    self,
    action: str | None = None,
    values: dict[str, ConfigValueType] | None = None,
) -> list[ConfigEntry]:
    """Build the config entries for this universal group player.

    Neither `action` nor `values` is consulted by this implementation.
    """
    # candidates come from all_players(True, False); group-type players are left out
    member_options = [
        ConfigValueOption(candidate.display_name, candidate.player_id)
        for candidate in self.mass.players.all_players(True, False)
        if candidate.type != PlayerType.GROUP
    ]
    members_entry = ConfigEntry(
        key=CONF_GROUP_MEMBERS,
        type=ConfigEntryType.STRING,
        multi_value=True,
        label="Group members",
        default_value=[],
        description="Select all players you want to be part of this group",
        # not required: dynamic groups are allowed an empty members list
        required=False,
        options=member_options,
    )
    dynamic_entry = ConfigEntry(
        key=CONF_DYNAMIC_GROUP_MEMBERS,
        type=ConfigEntryType.BOOLEAN,
        label="Enable dynamic members",
        description="Allow members to (temporary) join/leave the group dynamically.",
        default_value=False,
        required=False,
    )
    # universal group specific entries, in display order
    return [
        CONFIG_ENTRY_UGP_NOTE,
        members_entry,
        dynamic_entry,
        CONF_ENTRY_SAMPLE_RATES_UGP,
    ]
162
async def stop(self) -> None:
    """Stop all active members, abort the group stream and refresh the state."""
    async with TaskManager(self.mass) as task_group:
        # Use internal handler to get protocol selection and avoid redirect
        for active_member in self.mass.players.iter_group_members(self, active_only=True):
            stop_cmd = self.mass.players._handle_cmd_stop(active_member.player_id)
            task_group.create_task(stop_cmd)
    # abort the stream session (only a stream that is still running is torn down)
    running_stream = self.stream
    if running_stream and not running_stream.done:
        await running_stream.stop()
        self.stream = None
    self._set_attributes()
174
async def power(self, powered: bool) -> None:
    """Power the group player on or off and propagate that to its members.

    Powering on rebuilds the member list from the available, enabled static
    members, stops conflicting playback, resolves clashes with other active
    groups and sync groups, and powers the members on. Powering off stops
    playback first, powers off the active members (if the group was on) and
    restores the static member list.

    :param powered: True to power the group on, False to power it off.
    """
    busy_states = (PlaybackState.PLAYING, PlaybackState.PAUSED)
    players = self.mass.players

    # always stop at power off
    if not powered and self._attr_playback_state in busy_states:
        await self.stop()

    # optimistically set the group state
    was_powered = self._attr_powered
    self._attr_powered = powered

    if powered:
        # reset the group members to the available static members when powering on
        self._attr_group_members = [
            member_id
            for member_id in self._attr_static_group_members
            if (candidate := players.get_player(member_id))
            and candidate.available
            and candidate.enabled
        ]
        # handle TURN_ON of the group player by turning on all members
        for member in players.iter_group_members(self, only_powered=False, active_only=False):
            plays_other_content = (
                member.playback_state in busy_states
                and member.active_source != self.active_source
            )
            if plays_other_content:
                # stop playing existing content on member if we start the group player
                # Use internal handler to get protocol selection and avoid redirect
                await players._handle_cmd_stop(member.player_id)
            other_group_id = member.state.active_group
            if other_group_id is not None and other_group_id != self.player_id:
                # collision: child player is part of multiple groups
                # and another group already active !
                # solve this by trying to leave the group first
                if other_group := players.get_player(other_group_id):
                    can_leave = (
                        other_group.supports_feature(PlayerFeature.SET_MEMBERS)
                        and member.player_id not in other_group.static_group_members
                    )
                    if can_leave:
                        await other_group.set_members(player_ids_to_remove=[member.player_id])
                    else:
                        # if the other group does not support SET_MEMBERS or it is a static
                        # member, we need to power it off to leave the group
                        await other_group.power(False)
                        await asyncio.sleep(1)
                await asyncio.sleep(1)
            if member.synced_to:
                # edge case: the member is part of a syncgroup - ungroup it first
                await member.ungroup()
            if not member.powered and member.power_control != PLAYER_CONTROL_NONE:
                await players.cmd_power(member.player_id, True)
    elif was_powered:
        # handle TURN_OFF of the group player by turning off all members
        for member in players.iter_group_members(self, only_powered=True, active_only=True):
            if member.powered and member.power_control != PLAYER_CONTROL_NONE:
                await players.cmd_power(member.player_id, False)

    if not powered:
        # reset the original group members when powered off
        self._attr_group_members = self._attr_static_group_members.copy()
    self.update_state()
246
async def volume_set(self, volume_level: int) -> None:
    """Accept a VOLUME_SET command without acting on it.

    Group volume is already handled in the player manager, so there is
    nothing to do here.
    """
    return None
250
async def play_media(self, media: PlayerMedia) -> None:
    """Start a new group stream for `media` and hand it to the active members.

    The group is powered on first and any stream that is still running is
    stopped. The group state is then set optimistically and every powered,
    active member is told to play the group's flac stream url.
    """
    await self.power(True)

    # stop any existing stream first
    previous_stream = self.stream
    if previous_stream and not previous_stream.done:
        await previous_stream.stop()

    # select the audio source and start the stream task
    self.stream = UGPStream(
        audio_source=self.mass.streams.get_stream(media, UGP_FORMAT),
        audio_format=UGP_FORMAT,
        base_pcm_format=UGP_FORMAT,
    )
    base_url = f"{self.mass.streams.base_url}/ugp/{self.player_id}.flac"

    # set the state optimistically
    self._attr_current_media = deepcopy(media)
    self._attr_elapsed_time = 0
    self._attr_elapsed_time_last_updated = time() - 1
    self._attr_playback_state = PlaybackState.PLAYING
    self.update_state()

    # forward to downstream play_media commands
    async with TaskManager(self.mass) as task_group:
        for member in self.mass.players.iter_group_members(
            self, only_powered=True, active_only=True
        ):
            member_media = PlayerMedia(
                uri=f"{base_url}?player_id={member.player_id}",
                media_type=MediaType.FLOW_STREAM,
                title=self.display_name,
                source_id=self.player_id,
            )
            # Use internal handler to get protocol selection and avoid redirect
            task_group.create_task(
                self.mass.players._handle_play_media(member.player_id, member_media)
            )
292
293 async def set_members(
294 self,
295 player_ids_to_add: list[str] | None = None,
296 player_ids_to_remove: list[str] | None = None,
297 ) -> None:
298 """Handle SET_MEMBERS command on the player."""
299 if not self.is_dynamic:
300 raise UnsupportedFeaturedException(
301 f"Group {self.display_name} does not allow dynamically adding/removing members!"
302 )
303 # handle additions
304 for player_id in player_ids_to_add or []:
305 if player_id in self._attr_group_members:
306 continue
307 if player_id == self.player_id:
308 raise UnsupportedFeaturedException(
309 f"Cannot add {self.display_name} to itself as a member!"
310 )
311 child_player = self.mass.players.get_player(player_id, True)
312 assert child_player # for type checking
313 if child_player.synced_to:
314 # This is player is part of a syncgroup - ungroup it first
315 await child_player.ungroup()
316 self._attr_group_members.append(player_id)
317 # let the newly add member join the stream if we're playing
318 if self.stream and not self.stream.done and self.powered:
319 base_url = f"{self.mass.streams.base_url}/ugp/{self.player_id}.flac"
320 # Use internal handler to get protocol selection and avoid redirect
321 await self.mass.players._handle_play_media(
322 player_id,
323 PlayerMedia(
324 uri=f"{base_url}?player_id={player_id}",
325 media_type=MediaType.FLOW_STREAM,
326 title=self.display_name,
327 source_id=player_id,
328 ),
329 )
330 # handle removals
331 for player_id in player_ids_to_remove or []:
332 if player_id not in self._attr_group_members:
333 continue
334 if player_id == self.player_id:
335 raise UnsupportedFeaturedException(
336 f"Cannot remove {self.display_name} from itself as a member!"
337 )
338 self._attr_group_members.remove(player_id)
339 child_player = self.mass.players.get_player(player_id, True)
340 assert child_player is not None # for type checking
341 if child_player.playback_state in (
342 PlaybackState.PLAYING,
343 PlaybackState.PAUSED,
344 ):
345 # if the child player is playing the group stream, stop it
346 # Use internal handler to get protocol selection and avoid redirect
347 await self.mass.players._handle_cmd_stop(player_id)
348 self.update_state()
349
async def poll(self) -> None:
    """Refresh the group player attributes on a poll."""
    # polling only re-derives the attributes via the shared helper
    self._set_attributes()
353
async def on_unload(self) -> None:
    """Run the base unload logic, then power the group off if it is still on."""
    await super().on_unload()
    if not self.powered:
        return
    # edge case: the group player is powered and being unloaded
    # make sure to turn it off first (which will also ungroup a syncgroup)
    await self.power(False)
361
def _set_attributes(self) -> None:
    """Align the SET_MEMBERS feature and playback state, then publish the state."""
    dynamic = self.is_dynamic
    advertises_set_members = PlayerFeature.SET_MEMBERS in self.supported_features
    if dynamic and not advertises_set_members:
        # dynamic group players should support SET_MEMBERS feature
        self._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
    elif not dynamic and advertises_set_members:
        # static group players should not support SET_MEMBERS feature
        self._attr_supported_features.discard(PlayerFeature.SET_MEMBERS)
    # grab current state from the first active member (if there is one)
    active_members = self.mass.players.iter_group_members(self, active_only=True)
    first_active = next(iter(active_members), None)
    if first_active is None:
        self._attr_playback_state = PlaybackState.IDLE
    else:
        self._attr_playback_state = first_active.playback_state
        if first_active.elapsed_time:
            self._attr_elapsed_time = first_active.elapsed_time
            self._attr_elapsed_time_last_updated = first_active.elapsed_time_last_updated
    self.update_state()
380
async def _serve_ugp_stream(self, request: web.Request) -> web.StreamResponse:
    """Serve the UGP (multi-client) flow stream audio to a player.

    The group player id and the requested file extension are parsed from the
    request path. The optional `player_id` query parameter names the child
    player that is requesting the stream; when it resolves to a known player,
    that player's output format, HTTP profile and DSP filter params are used.

    :param request: Incoming request on one of the `/ugp/<player_id>.<ext>` routes.
    :returns: The prepared stream response (no body is written for non-GET requests).
    :raises web.HTTPNotFound: If the group player is unknown or has no active stream.
    """
    # split on the LAST dot only, so a player id containing a dot stays intact
    path_stem, output_format_str = request.path.rsplit(".", 1)[0], request.path.rsplit(".", 1)[-1]
    ugp_player_id = path_stem.rsplit("/", 1)[-1]
    child_player_id = request.query.get("player_id")  # optional!

    # default profile for anonymous clients; always bound so no branch below
    # can leave it undefined, and only a known child player overrides it
    http_profile = "chunked"
    if child_player_id and (child_player := self.mass.players.get_player(child_player_id)):
        # Use the preferred output format of the child player
        output_format = await self.mass.streams.get_output_format(
            output_format_str=output_format_str,
            player=child_player,
            content_sample_rate=UGP_FORMAT.sample_rate,
            content_bit_depth=UGP_FORMAT.bit_depth,
        )
        http_profile = await self.mass.config.get_player_config_value(
            child_player_id, CONF_HTTP_PROFILE, return_type=str
        )
    elif output_format_str == "flac":
        output_format = AudioFormat(content_type=ContentType.FLAC)
    else:
        # NOTE(review): every non-flac extension falls back to MP3 here while the
        # Content-Type header below still echoes the requested extension - confirm
        output_format = AudioFormat(content_type=ContentType.MP3)

    if not (ugp_player := self.mass.players.get_player(ugp_player_id)):
        raise web.HTTPNotFound(reason=f"Unknown UGP player: {ugp_player_id}")

    # keep a local reference: self.stream may be cleared or replaced while we
    # are suspended on one of the awaits below
    stream = self.stream
    if not stream or stream.done:
        raise web.HTTPNotFound(text=f"There is no active UGP stream for {ugp_player_id}!")

    headers = {
        **DEFAULT_STREAM_HEADERS,
        "Content-Type": f"audio/{output_format_str}",
        "Accept-Ranges": "none",
        "Cache-Control": "no-cache",
        "Connection": "close",
    }

    resp = web.StreamResponse(status=200, reason="OK", headers=headers)
    if http_profile == "forced_content_length":
        resp.content_length = 4294967296
    elif http_profile == "chunked":
        resp.enable_chunked_encoding()

    await resp.prepare(request)

    # return early if this is not a GET request
    if request.method != "GET":
        return resp

    # all checks passed, start streaming!
    self.logger.debug(
        "Start serving UGP flow audio stream for UGP-player %s to %s",
        ugp_player.display_name,
        child_player_id or request.remote,
    )

    # Generate filter params for the player specific DSP settings
    filter_params = None
    if child_player_id:
        filter_params = get_player_filter_params(
            self.mass, child_player_id, stream.input_format, output_format
        )

    async for chunk in stream.get_stream(
        output_format,
        filter_params=filter_params,
    ):
        try:
            await resp.write(chunk)
        except ConnectionError:
            # client went away (ConnectionResetError is a ConnectionError subclass)
            break

    return resp
454