/
/
/
1"""Group Player implementation."""
2
3from __future__ import annotations
4
5import asyncio
6from copy import deepcopy
7from time import time
8from typing import TYPE_CHECKING, cast
9
10from aiohttp import web
11from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
12from music_assistant_models.constants import PLAYER_CONTROL_NONE
13from music_assistant_models.enums import (
14 ConfigEntryType,
15 ContentType,
16 MediaType,
17 PlaybackState,
18 PlayerFeature,
19 PlayerType,
20)
21from music_assistant_models.errors import UnsupportedFeaturedException
22from music_assistant_models.media_items import AudioFormat
23from propcache import under_cached_property as cached_property
24
25from music_assistant.constants import (
26 CONF_DYNAMIC_GROUP_MEMBERS,
27 CONF_GROUP_MEMBERS,
28 CONF_HTTP_PROFILE,
29 DEFAULT_STREAM_HEADERS,
30)
31from music_assistant.helpers.audio import get_player_filter_params
32from music_assistant.helpers.util import TaskManager
33from music_assistant.models.player import DeviceInfo, GroupPlayer, PlayerMedia
34from music_assistant.providers.universal_group.constants import UGP_FORMAT
35
36from .constants import CONF_ENTRY_SAMPLE_RATES_UGP, CONFIG_ENTRY_UGP_NOTE
37from .ugp_stream import UGPStream
38
39if TYPE_CHECKING:
40 from .provider import UniversalGroupProvider
41
# Features every universal group player starts with; the player's constructor
# copies this set so per-player changes (e.g. SET_MEMBERS) never leak back here.
BASE_FEATURES = {
    PlayerFeature.POWER,
    PlayerFeature.VOLUME_SET,
    PlayerFeature.MULTI_DEVICE_DSP,
}
43
44
class UniversalGroupPlayer(GroupPlayer):
    """Universal Group Player implementation.

    Forwards playback commands to its member players and serves them one shared
    audio stream through the dynamic ``/ugp/<player_id>.<ext>`` HTTP routes that
    are registered in the constructor.
    """

    def __init__(
        self,
        provider: UniversalGroupProvider,
        player_id: str,
    ) -> None:
        """Initialize GroupPlayer instance."""
        super().__init__(provider, player_id)
        self.stream: UGPStream | None = None
        self._attr_name = self.config.name or f"Universal Group {player_id}"
        self._attr_available = True
        self._attr_powered = False  # group players are always powered off by default
        self._attr_device_info = DeviceInfo(model="Universal Group", manufacturer=provider.name)
        self._attr_supported_features = {*BASE_FEATURES}
        self._attr_needs_poll = True
        self._attr_poll_interval = 30
        # register a dynamic route for the ugp stream, one per served extension;
        # whatever the registration returns is stored with the unload callbacks
        for extension in ("flac", "mp3", "aac"):
            self._on_unload_callbacks.append(
                self.mass.streams.register_dynamic_route(
                    f"/ugp/{self.player_id}.{extension}", self._serve_ugp_stream
                )
            )
        # allow grouping with all providers, except the ugp provider itself
        self._attr_can_group_with = {
            x.instance_id
            for x in self.mass.players.providers
            if x.instance_id != self.provider.instance_id
        }
        self._set_attributes()

    @property
    def requires_flow_mode(self) -> bool:
        """Return if the player requires flow mode."""
        return True

    async def on_config_updated(self) -> None:
        """Handle logic when the player is loaded or updated.

        Re-reads the configured (static) members and re-evaluates whether the
        SET_MEMBERS feature must be advertised.
        """
        static_members = cast("list[str]", self.config.get_value(CONF_GROUP_MEMBERS, []))
        self._attr_static_group_members = static_members.copy()
        if not self.powered:
            # only touch the active member list while the group is off
            self._attr_group_members = static_members.copy()
        self._sync_set_members_feature()

    @cached_property
    def is_dynamic(self) -> bool:
        """Return if the player is a dynamic group player."""
        return bool(self.config.get_value(CONF_DYNAMIC_GROUP_MEMBERS, False))

    async def get_config_entries(
        self,
        action: str | None = None,
        values: dict[str, ConfigValueType] | None = None,
    ) -> list[ConfigEntry]:
        """Return all (provider/player specific) Config Entries for the given player (if any)."""
        return [
            # add universal group specific entries
            CONFIG_ENTRY_UGP_NOTE,
            ConfigEntry(
                key=CONF_GROUP_MEMBERS,
                type=ConfigEntryType.STRING,
                multi_value=True,
                label="Group members",
                default_value=[],
                description="Select all players you want to be part of this group",
                required=False,  # needed for dynamic members (which allows empty members list)
                options=[
                    ConfigValueOption(x.display_name, x.player_id)
                    for x in self.mass.players.all(True, False)
                    if x.type != PlayerType.GROUP
                ],
            ),
            ConfigEntry(
                key=CONF_DYNAMIC_GROUP_MEMBERS,
                type=ConfigEntryType.BOOLEAN,
                label="Enable dynamic members",
                description="Allow members to (temporary) join/leave the group dynamically.",
                default_value=False,
                required=False,
            ),
            CONF_ENTRY_SAMPLE_RATES_UGP,
        ]

    async def stop(self) -> None:
        """Handle STOP command."""
        # stop all active members concurrently
        async with TaskManager(self.mass) as tg:
            for member in self.mass.players.iter_group_members(self, active_only=True):
                tg.create_task(member.stop())
        # abort the stream session
        if self.stream and not self.stream.done:
            await self.stream.stop()
            self.stream = None
        self._set_attributes()

    async def power(self, powered: bool) -> None:
        """Handle POWER command to group player.

        Powering on rebuilds the member list from the available static members
        and prepares every member (stop foreign playback, leave other groups,
        ungroup, power on). Powering off a previously powered group powers off
        its active members and restores the static member list.
        """
        # always stop at power off
        if not powered and self.playback_state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
            await self.stop()

        # optimistically set the group state
        prev_power = self._attr_powered
        self._attr_powered = powered

        if powered:
            # reset the group members to the available static members when powering on
            self._attr_group_members = [
                static_group_member
                for static_group_member in self._attr_static_group_members
                if (member_player := self.mass.players.get(static_group_member))
                and member_player.available
                and member_player.enabled
            ]
            # handle TURN_ON of the group player by turning on all members
            for member in self.mass.players.iter_group_members(
                self, only_powered=False, active_only=False
            ):
                if (
                    member.playback_state in (PlaybackState.PLAYING, PlaybackState.PAUSED)
                    and member.active_source != self.active_source
                ):
                    # stop playing existing content on member if we start the group player
                    await member.stop()
                if member.active_group is not None and member.active_group != self.player_id:
                    # collision: child player is part of multiple groups
                    # and another group already active !
                    # solve this by trying to leave the group first
                    if other_group := self.mass.players.get(member.active_group):
                        if (
                            other_group.supports_feature(PlayerFeature.SET_MEMBERS)
                            and member.player_id not in other_group.static_group_members
                        ):
                            await other_group.set_members(player_ids_to_remove=[member.player_id])
                        else:
                            # if the other group does not support SET_MEMBERS or it is a static
                            # member, we need to power it off to leave the group
                            await other_group.power(False)
                            await asyncio.sleep(1)
                        await asyncio.sleep(1)
                if member.synced_to:
                    # edge case: the member is part of a syncgroup - ungroup it first
                    await member.ungroup()
                if not member.powered and member.power_control != PLAYER_CONTROL_NONE:
                    await self.mass.players.cmd_power(member.player_id, True)
        elif prev_power:
            # handle TURN_OFF of the group player by turning off all members
            for member in self.mass.players.iter_group_members(
                self, only_powered=True, active_only=True
            ):
                if member.powered and member.power_control != PLAYER_CONTROL_NONE:
                    await self.mass.players.cmd_power(member.player_id, False)

        if not powered:
            # reset the original group members when powered off
            self._attr_group_members = self._attr_static_group_members.copy()
        self.update_state()

    async def volume_set(self, volume_level: int) -> None:
        """Send VOLUME_SET command to given player."""
        # group volume is already handled in the player manager

    async def play_media(self, media: PlayerMedia) -> None:
        """Handle PLAY MEDIA on given player.

        Powers the group on, replaces any running stream with a new one for
        ``media`` and asks every powered, active member to play the group stream.
        """
        await self.power(True)

        if self.stream and not self.stream.done:
            # stop any existing stream first
            await self.stream.stop()

        # select audio source
        audio_source = self.mass.streams.get_stream(media, UGP_FORMAT)

        # start the stream task
        self.stream = UGPStream(
            audio_source=audio_source, audio_format=UGP_FORMAT, base_pcm_format=UGP_FORMAT
        )

        # set the state optimistically
        self._attr_current_media = deepcopy(media)
        self._attr_elapsed_time = 0
        self._attr_elapsed_time_last_updated = time() - 1
        self._attr_playback_state = PlaybackState.PLAYING
        self.update_state()

        # forward to downstream play_media commands
        async with TaskManager(self.mass) as tg:
            for member in self.mass.players.iter_group_members(
                self, only_powered=True, active_only=True
            ):
                tg.create_task(member.play_media(self._member_stream_media(member.player_id)))

    async def set_members(
        self,
        player_ids_to_add: list[str] | None = None,
        player_ids_to_remove: list[str] | None = None,
    ) -> None:
        """Handle SET_MEMBERS command on the player.

        :param player_ids_to_add: ids of players that should join the group.
        :param player_ids_to_remove: ids of players that should leave the group.
        :raises UnsupportedFeaturedException: if the group is not dynamic, or when
            the group's own id is passed as a member.
        """
        if not self.is_dynamic:
            raise UnsupportedFeaturedException(
                f"Group {self.display_name} does not allow dynamically adding/removing members!"
            )
        # handle additions
        for player_id in player_ids_to_add or []:
            if player_id in self._attr_group_members:
                continue
            if player_id == self.player_id:
                raise UnsupportedFeaturedException(
                    f"Cannot add {self.display_name} to itself as a member!"
                )
            child_player = self.mass.players.get(player_id, True)
            assert child_player  # for type checking
            if child_player.synced_to:
                # this player is part of a syncgroup - ungroup it first
                await child_player.ungroup()
            self._attr_group_members.append(player_id)
            # let the newly added member join the stream if we're playing
            if self.stream and not self.stream.done and self.powered:
                await child_player.play_media(media=self._member_stream_media(player_id))
        # handle removals
        for player_id in player_ids_to_remove or []:
            if player_id not in self._attr_group_members:
                continue
            if player_id == self.player_id:
                raise UnsupportedFeaturedException(
                    f"Cannot remove {self.display_name} from itself as a member!"
                )
            self._attr_group_members.remove(player_id)
            child_player = self.mass.players.get(player_id, True)
            assert child_player is not None  # for type checking
            if child_player.playback_state in (
                PlaybackState.PLAYING,
                PlaybackState.PAUSED,
            ):
                # if the child player is playing the group stream, stop it
                await child_player.stop()
        self.update_state()

    async def poll(self) -> None:
        """Poll player for state updates."""
        self._set_attributes()

    async def on_unload(self) -> None:
        """Handle logic when the player is unloaded from the Player controller."""
        await super().on_unload()
        if self.powered:
            # edge case: the group player is powered and being unloaded
            # make sure to turn it off first (which will also ungroup a syncgroup)
            await self.power(False)

    def _member_stream_media(self, member_id: str) -> PlayerMedia:
        """Build the flow-stream media item that points a member at this group's stream.

        The group's own player id is used as ``source_id`` for every member so
        that members joining later are tagged exactly like the initial ones.
        """
        base_url = f"{self.mass.streams.base_url}/ugp/{self.player_id}.flac"
        return PlayerMedia(
            uri=f"{base_url}?player_id={member_id}",
            media_type=MediaType.FLOW_STREAM,
            title=self.display_name,
            source_id=self.player_id,
        )

    def _sync_set_members_feature(self) -> None:
        """Advertise SET_MEMBERS only while the group is configured as dynamic."""
        if self.is_dynamic:
            # dynamic group players should support SET_MEMBERS feature
            self._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
        else:
            # static group players should not support SET_MEMBERS feature
            self._attr_supported_features.discard(PlayerFeature.SET_MEMBERS)

    def _set_attributes(self) -> None:
        """Set attributes of the group player."""
        self._sync_set_members_feature()
        # grab current media and state from one of the active players:
        # only the first active member is inspected (note the unconditional break),
        # the else-branch runs when there is no active member at all
        for child_player in self.mass.players.iter_group_members(self, active_only=True):
            self._attr_playback_state = child_player.playback_state
            if child_player.elapsed_time:
                self._attr_elapsed_time = child_player.elapsed_time
                self._attr_elapsed_time_last_updated = child_player.elapsed_time_last_updated
            break
        else:
            self._attr_playback_state = PlaybackState.IDLE
        self.update_state()

    async def _serve_ugp_stream(self, request: web.Request) -> web.StreamResponse:
        """Serve the UGP (multi-client) flow stream audio to a player.

        :param request: request on one of the ``/ugp/<player_id>.<ext>`` routes,
            with an optional ``player_id`` query parameter naming the child player.
        :raises web.HTTPNotFound: if the group player is unknown or has no active stream.
        """
        # split on the LAST dot only, so a dot inside the player id is preserved
        path_parts = request.path.rsplit(".", 1)
        ugp_player_id = path_parts[0].rsplit("/", 1)[-1]
        output_format_str = path_parts[-1]
        child_player_id = request.query.get("player_id")  # optional!

        # requests without a (known) child player are always served chunked
        http_profile = "chunked"
        if child_player_id and (child_player := self.mass.players.get(child_player_id)):
            # Use the preferred output format of the child player
            output_format = await self.mass.streams.get_output_format(
                output_format_str=output_format_str,
                player=child_player,
                content_sample_rate=UGP_FORMAT.sample_rate,
                content_bit_depth=UGP_FORMAT.bit_depth,
            )
            http_profile = await self.mass.config.get_player_config_value(
                child_player_id, CONF_HTTP_PROFILE, return_type=str
            )
        elif output_format_str == "flac":
            output_format = AudioFormat(content_type=ContentType.FLAC)
        else:
            # NOTE(review): every non-flac extension (including .aac) is encoded as MP3
            # here, while the Content-Type header below echoes the requested extension
            output_format = AudioFormat(content_type=ContentType.MP3)

        if not (ugp_player := self.mass.players.get(ugp_player_id)):
            raise web.HTTPNotFound(reason=f"Unknown UGP player: {ugp_player_id}")

        if not self.stream or self.stream.done:
            raise web.HTTPNotFound(text=f"There is no active UGP stream for {ugp_player_id}!")

        headers = {
            **DEFAULT_STREAM_HEADERS,
            "Content-Type": f"audio/{output_format_str}",
            "Accept-Ranges": "none",
            "Cache-Control": "no-cache",
            "Connection": "close",
        }

        resp = web.StreamResponse(status=200, reason="OK", headers=headers)
        if http_profile == "forced_content_length":
            resp.content_length = 4294967296
        elif http_profile == "chunked":
            resp.enable_chunked_encoding()

        await resp.prepare(request)

        # return early if this is not a GET request
        if request.method != "GET":
            return resp

        # stop() resets self.stream to None and may have run while we were awaiting
        # above, so re-read it once and keep working on that local reference
        stream = self.stream
        if stream is None:
            return resp

        # all checks passed, start streaming!
        self.logger.debug(
            "Start serving UGP flow audio stream for UGP-player %s to %s",
            ugp_player.display_name,
            child_player_id or request.remote,
        )

        # Generate filter params for the player specific DSP settings
        filter_params = None
        if child_player_id:
            filter_params = get_player_filter_params(
                self.mass, child_player_id, stream.input_format, output_format
            )

        async for chunk in stream.get_stream(
            output_format,
            filter_params=filter_params,
        ):
            try:
                await resp.write(chunk)
            except ConnectionError:
                # client went away (ConnectionResetError is a ConnectionError subclass)
                break

        return resp
420