/
/
/
1"""Group Player implementation."""
2
3from __future__ import annotations
4
5import asyncio
6from copy import deepcopy
7from time import time
8from typing import TYPE_CHECKING, cast
9
10from aiohttp import web
11from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
12from music_assistant_models.constants import PLAYER_CONTROL_NONE
13from music_assistant_models.enums import (
14 ConfigEntryType,
15 ContentType,
16 MediaType,
17 PlaybackState,
18 PlayerFeature,
19 PlayerType,
20)
21from music_assistant_models.errors import UnsupportedFeaturedException
22from music_assistant_models.media_items import AudioFormat
23from propcache import under_cached_property as cached_property
24
25from music_assistant.constants import (
26 CONF_DYNAMIC_GROUP_MEMBERS,
27 CONF_GROUP_MEMBERS,
28 CONF_HTTP_PROFILE,
29 DEFAULT_STREAM_HEADERS,
30)
31from music_assistant.helpers.audio import get_player_filter_params
32from music_assistant.helpers.util import TaskManager
33from music_assistant.models.player import DeviceInfo, GroupPlayer, PlayerMedia
34from music_assistant.providers.universal_group.constants import UGP_FORMAT
35
36from .constants import CONF_ENTRY_SAMPLE_RATES_UGP, CONFIG_ENTRY_UGP_NOTE
37from .ugp_stream import UGPStream
38
39if TYPE_CHECKING:
40 from .provider import UniversalGroupProvider
41
42BASE_FEATURES = {
43 PlayerFeature.PLAY_MEDIA,
44 PlayerFeature.POWER,
45 PlayerFeature.VOLUME_SET,
46 PlayerFeature.MULTI_DEVICE_DSP,
47}
48
49
50class UniversalGroupPlayer(GroupPlayer):
51 """Universal Group Player implementation."""
52
53 def __init__(
54 self,
55 provider: UniversalGroupProvider,
56 player_id: str,
57 ) -> None:
58 """Initialize GroupPlayer instance."""
59 super().__init__(provider, player_id)
60 self.stream: UGPStream | None = None
61 self._attr_name = self.config.name or f"Universal Group {player_id}"
62 self._attr_available = True
63 self._attr_powered = False # group players are always powered off by default
64 self._attr_device_info = DeviceInfo(model="Universal Group", manufacturer=provider.name)
65 self._attr_supported_features = {*BASE_FEATURES}
66 self._attr_needs_poll = True
67 self._attr_poll_interval = 30
68 # register dynamic route for the ugp stream
69 self._on_unload_callbacks.append(
70 self.mass.streams.register_dynamic_route(
71 f"/ugp/{self.player_id}.flac", self._serve_ugp_stream
72 )
73 )
74 self._on_unload_callbacks.append(
75 self.mass.streams.register_dynamic_route(
76 f"/ugp/{self.player_id}.mp3", self._serve_ugp_stream
77 )
78 )
79 self._on_unload_callbacks.append(
80 self.mass.streams.register_dynamic_route(
81 f"/ugp/{self.player_id}.aac", self._serve_ugp_stream
82 )
83 )
84 self._set_attributes()
85
86 @property
87 def requires_flow_mode(self) -> bool:
88 """Return if the player requires flow mode."""
89 return True
90
91 @property
92 def can_group_with(self) -> set[str]:
93 """Return the id's of players this player can group with."""
94 if not self.is_dynamic:
95 # in case of static members,
96 # we can only group with the players defined in the config, so we return those directly
97 return set(self._attr_static_group_members)
98 # allow grouping with all providers, except the ugp provider itself
99 return {
100 x.instance_id
101 for x in self.mass.players.providers
102 if x.instance_id != self.provider.instance_id
103 }
104
105 async def on_config_updated(self) -> None:
106 """Handle logic when the player is loaded or updated."""
107 static_members = cast("list[str]", self.config.get_value(CONF_GROUP_MEMBERS, []))
108 self._attr_static_group_members = static_members.copy()
109 if not self.powered:
110 self._attr_group_members = static_members.copy()
111 if self.is_dynamic:
112 self._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
113 elif PlayerFeature.SET_MEMBERS in self._attr_supported_features:
114 self._attr_supported_features.remove(PlayerFeature.SET_MEMBERS)
115
116 @cached_property
117 def is_dynamic(self) -> bool:
118 """Return if the player is a dynamic group player."""
119 return bool(self.config.get_value(CONF_DYNAMIC_GROUP_MEMBERS, False))
120
121 async def get_config_entries(
122 self,
123 action: str | None = None,
124 values: dict[str, ConfigValueType] | None = None,
125 ) -> list[ConfigEntry]:
126 """Return all (provider/player specific) Config Entries for the given player (if any)."""
127 return [
128 # add universal group specific entries
129 CONFIG_ENTRY_UGP_NOTE,
130 ConfigEntry(
131 key=CONF_GROUP_MEMBERS,
132 type=ConfigEntryType.STRING,
133 multi_value=True,
134 label="Group members",
135 default_value=[],
136 description="Select all players you want to be part of this group",
137 required=False, # needed for dynamic members (which allows empty members list)
138 options=[
139 ConfigValueOption(x.display_name, x.player_id)
140 for x in self.mass.players.all_players(True, False)
141 if x.type != PlayerType.GROUP
142 ],
143 ),
144 ConfigEntry(
145 key=CONF_DYNAMIC_GROUP_MEMBERS,
146 type=ConfigEntryType.BOOLEAN,
147 label="Enable dynamic members",
148 description="Allow members to (temporary) join/leave the group dynamically.",
149 default_value=False,
150 required=False,
151 ),
152 CONF_ENTRY_SAMPLE_RATES_UGP,
153 ]
154
155 async def stop(self) -> None:
156 """Handle STOP command."""
157 async with TaskManager(self.mass) as tg:
158 for member in self.mass.players.iter_group_members(self, active_only=True):
159 # Use internal handler to get protocol selection and avoid redirect
160 tg.create_task(self.mass.players._handle_cmd_stop(member.player_id))
161 # abort the stream session
162 if self.stream and not self.stream.done:
163 await self.stream.stop()
164 self.stream = None
165 self._set_attributes()
166
167 async def power(self, powered: bool) -> None:
168 """Handle POWER command to group player."""
169 # always stop at power off
170 if not powered and self._attr_playback_state in (
171 PlaybackState.PLAYING,
172 PlaybackState.PAUSED,
173 ):
174 await self.stop()
175
176 # optimistically set the group state
177 prev_power = self._attr_powered
178 self._attr_powered = powered
179
180 if powered:
181 # reset the group members to the available static members when powering on
182 self._attr_group_members = []
183 for static_group_member in self._attr_static_group_members:
184 if (
185 (member_player := self.mass.players.get_player(static_group_member))
186 and member_player.available
187 and member_player.enabled
188 ):
189 self._attr_group_members.append(static_group_member)
190 # handle TURN_ON of the group player by turning on all members
191 for member in self.mass.players.iter_group_members(
192 self, only_powered=False, active_only=False
193 ):
194 if (
195 member.playback_state in (PlaybackState.PLAYING, PlaybackState.PAUSED)
196 and member.active_source != self.active_source
197 ):
198 # stop playing existing content on member if we start the group player
199 # Use internal handler to get protocol selection and avoid redirect
200 await self.mass.players._handle_cmd_stop(member.player_id)
201 if (
202 member.state.active_group is not None
203 and member.state.active_group != self.player_id
204 ):
205 # collision: child player is part of multiple groups
206 # and another group already active !
207 # solve this by trying to leave the group first
208 if other_group := self.mass.players.get_player(member.state.active_group):
209 if (
210 other_group.supports_feature(PlayerFeature.SET_MEMBERS)
211 and member.player_id not in other_group.static_group_members
212 ):
213 await other_group.set_members(player_ids_to_remove=[member.player_id])
214 else:
215 # if the other group does not support SET_MEMBERS or it is a static
216 # member, we need to power it off to leave the group
217 await other_group.power(False)
218 await asyncio.sleep(1)
219 await asyncio.sleep(1)
220 if member.synced_to:
221 # edge case: the member is part of a syncgroup - ungroup it first
222 await member.ungroup()
223 if not member.powered and member.power_control != PLAYER_CONTROL_NONE:
224 await self.mass.players.cmd_power(member.player_id, True)
225 elif prev_power:
226 # handle TURN_OFF of the group player by turning off all members
227 for member in self.mass.players.iter_group_members(
228 self, only_powered=True, active_only=True
229 ):
230 # handle TURN_OFF of the group player by turning off all members
231 if member.powered and member.power_control != PLAYER_CONTROL_NONE:
232 await self.mass.players.cmd_power(member.player_id, False)
233
234 if not powered:
235 # reset the original group members when powered off
236 self._attr_group_members = self._attr_static_group_members.copy()
237 self.update_state()
238
239 async def volume_set(self, volume_level: int) -> None:
240 """Send VOLUME_SET command to given player."""
241 # group volume is already handled in the player manager
242
243 async def play_media(self, media: PlayerMedia) -> None:
244 """Handle PLAY MEDIA on given player."""
245 await self.power(True)
246
247 if self.stream and not self.stream.done:
248 # stop any existing stream first
249 await self.stream.stop()
250
251 # select audio source
252 audio_source = self.mass.streams.get_stream(media, UGP_FORMAT)
253
254 # start the stream task
255 self.stream = UGPStream(
256 audio_source=audio_source, audio_format=UGP_FORMAT, base_pcm_format=UGP_FORMAT
257 )
258 base_url = f"{self.mass.streams.base_url}/ugp/{self.player_id}.flac"
259
260 # set the state optimistically
261 self._attr_current_media = deepcopy(media)
262 self._attr_elapsed_time = 0
263 self._attr_elapsed_time_last_updated = time() - 1
264 self._attr_playback_state = PlaybackState.PLAYING
265 self.update_state()
266
267 # forward to downstream play_media commands
268 async with TaskManager(self.mass) as tg:
269 for member in self.mass.players.iter_group_members(
270 self, only_powered=True, active_only=True
271 ):
272 # Use internal handler to get protocol selection and avoid redirect
273 tg.create_task(
274 self.mass.players._handle_play_media(
275 member.player_id,
276 PlayerMedia(
277 uri=f"{base_url}?player_id={member.player_id}",
278 media_type=MediaType.FLOW_STREAM,
279 title=self.display_name,
280 source_id=self.player_id,
281 ),
282 )
283 )
284
285 async def set_members(
286 self,
287 player_ids_to_add: list[str] | None = None,
288 player_ids_to_remove: list[str] | None = None,
289 ) -> None:
290 """Handle SET_MEMBERS command on the player."""
291 if not self.is_dynamic:
292 raise UnsupportedFeaturedException(
293 f"Group {self.display_name} does not allow dynamically adding/removing members!"
294 )
295 # handle additions
296 for player_id in player_ids_to_add or []:
297 if player_id in self._attr_group_members:
298 continue
299 if player_id == self.player_id:
300 raise UnsupportedFeaturedException(
301 f"Cannot add {self.display_name} to itself as a member!"
302 )
303 child_player = self.mass.players.get_player(player_id, True)
304 assert child_player # for type checking
305 if child_player.synced_to:
306 # This is player is part of a syncgroup - ungroup it first
307 await child_player.ungroup()
308 self._attr_group_members.append(player_id)
309 # let the newly add member join the stream if we're playing
310 if self.stream and not self.stream.done and self.powered:
311 base_url = f"{self.mass.streams.base_url}/ugp/{self.player_id}.flac"
312 # Use internal handler to get protocol selection and avoid redirect
313 await self.mass.players._handle_play_media(
314 player_id,
315 PlayerMedia(
316 uri=f"{base_url}?player_id={player_id}",
317 media_type=MediaType.FLOW_STREAM,
318 title=self.display_name,
319 source_id=player_id,
320 ),
321 )
322 # handle removals
323 for player_id in player_ids_to_remove or []:
324 if player_id not in self._attr_group_members:
325 continue
326 if player_id == self.player_id:
327 raise UnsupportedFeaturedException(
328 f"Cannot remove {self.display_name} from itself as a member!"
329 )
330 self._attr_group_members.remove(player_id)
331 child_player = self.mass.players.get_player(player_id, True)
332 assert child_player is not None # for type checking
333 if child_player.playback_state in (
334 PlaybackState.PLAYING,
335 PlaybackState.PAUSED,
336 ):
337 # if the child player is playing the group stream, stop it
338 # Use internal handler to get protocol selection and avoid redirect
339 await self.mass.players._handle_cmd_stop(player_id)
340 self.update_state()
341
342 async def poll(self) -> None:
343 """Poll player for state updates."""
344 self._set_attributes()
345
346 async def on_unload(self) -> None:
347 """Handle logic when the player is unloaded from the Player controller."""
348 await super().on_unload()
349 if self.powered:
350 # edge case: the group player is powered and being unloaded
351 # make sure to turn it off first (which will also ungroup a syncgroup)
352 await self.power(False)
353
354 def _set_attributes(self) -> None:
355 """Set attributes of the group player."""
356 if self.is_dynamic and PlayerFeature.SET_MEMBERS not in self.supported_features:
357 # dynamic group players should support SET_MEMBERS feature
358 self._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
359 elif not self.is_dynamic and PlayerFeature.SET_MEMBERS in self.supported_features:
360 # static group players should not support SET_MEMBERS feature
361 self._attr_supported_features.discard(PlayerFeature.SET_MEMBERS)
362 # grab current media and state from one of the active players
363 for child_player in self.mass.players.iter_group_members(self, active_only=True):
364 self._attr_playback_state = child_player.playback_state
365 if child_player.elapsed_time:
366 self._attr_elapsed_time = child_player.elapsed_time
367 self._attr_elapsed_time_last_updated = child_player.elapsed_time_last_updated
368 break
369 else:
370 self._attr_playback_state = PlaybackState.IDLE
371 self.update_state()
372
373 async def _serve_ugp_stream(self, request: web.Request) -> web.StreamResponse:
374 """Serve the UGP (multi-client) flow stream audio to a player."""
375 ugp_player_id = request.path.rsplit(".")[0].rsplit("/")[-1]
376 child_player_id = request.query.get("player_id") # optional!
377 output_format_str = request.path.rsplit(".")[-1]
378
379 if child_player_id and (child_player := self.mass.players.get_player(child_player_id)):
380 # Use the preferred output format of the child player
381 output_format = await self.mass.streams.get_output_format(
382 output_format_str=output_format_str,
383 player=child_player,
384 content_sample_rate=UGP_FORMAT.sample_rate,
385 content_bit_depth=UGP_FORMAT.bit_depth,
386 )
387 http_profile = await self.mass.config.get_player_config_value(
388 child_player_id, CONF_HTTP_PROFILE, return_type=str
389 )
390 elif output_format_str == "flac":
391 output_format = AudioFormat(content_type=ContentType.FLAC)
392 else:
393 output_format = AudioFormat(content_type=ContentType.MP3)
394 http_profile = "chunked"
395
396 if not (ugp_player := self.mass.players.get_player(ugp_player_id)):
397 raise web.HTTPNotFound(reason=f"Unknown UGP player: {ugp_player_id}")
398
399 if not self.stream or self.stream.done:
400 raise web.HTTPNotFound(body=f"There is no active UGP stream for {ugp_player_id}!")
401
402 headers = {
403 **DEFAULT_STREAM_HEADERS,
404 "Content-Type": f"audio/{output_format_str}",
405 "Accept-Ranges": "none",
406 "Cache-Control": "no-cache",
407 "Connection": "close",
408 }
409
410 resp = web.StreamResponse(status=200, reason="OK", headers=headers)
411 if http_profile == "forced_content_length":
412 resp.content_length = 4294967296
413 elif http_profile == "chunked":
414 resp.enable_chunked_encoding()
415
416 await resp.prepare(request)
417
418 # return early if this is not a GET request
419 if request.method != "GET":
420 return resp
421
422 # all checks passed, start streaming!
423 self.logger.debug(
424 "Start serving UGP flow audio stream for UGP-player %s to %s",
425 ugp_player.display_name,
426 child_player_id or request.remote,
427 )
428
429 # Generate filter params for the player specific DSP settings
430 filter_params = None
431 if child_player_id:
432 filter_params = get_player_filter_params(
433 self.mass, child_player_id, self.stream.input_format, output_format
434 )
435
436 async for chunk in self.stream.get_stream(
437 output_format,
438 filter_params=filter_params,
439 ):
440 try:
441 await resp.write(chunk)
442 except (ConnectionError, ConnectionResetError):
443 break
444
445 return resp
446