/
/
/
1"""HEOS Player implementation."""
2
3from __future__ import annotations
4
5from copy import copy
6from typing import TYPE_CHECKING, cast
7
8from music_assistant_models.enums import MediaType, PlaybackState, PlayerFeature, PlayerType
9from music_assistant_models.errors import SetupFailedError
10from music_assistant_models.player import DeviceInfo, PlayerSource
11from pyheos import Heos, const
12
13from music_assistant.constants import create_sample_rates_config_entry
14from music_assistant.models.player import Player, PlayerMedia
15from music_assistant.providers.heos.helpers import media_uri_from_now_playing_media
16
17from .constants import HEOS_MEDIA_TYPE_TO_MEDIA_TYPE, HEOS_PLAY_STATE_TO_PLAYBACK_STATE
18
19if TYPE_CHECKING:
20 from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
21 from pyheos import HeosPlayer as PyHeosPlayer
22
23 from .provider import HeosPlayerProvider
24
25
26PLAYER_FEATURES = {
27 PlayerFeature.VOLUME_SET,
28 PlayerFeature.VOLUME_MUTE,
29 PlayerFeature.PAUSE,
30 PlayerFeature.NEXT_PREVIOUS,
31 PlayerFeature.SELECT_SOURCE,
32 PlayerFeature.SET_MEMBERS,
33 PlayerFeature.PLAY_MEDIA,
34}
35
36
37class HeosPlayer(Player):
38 """HeosPlayer in Music Assistant."""
39
40 _heos: Heos
41 _device: PyHeosPlayer
42
43 @property
44 def requires_flow_mode(self) -> bool:
45 """Return if the player requires flow mode."""
46 return True
47
48 def __init__(self, provider: HeosPlayerProvider, device: PyHeosPlayer) -> None:
49 """Initialize the Player."""
50 super().__init__(provider, str(device.player_id))
51
52 self._device: PyHeosPlayer = device
53
54 if self._device.heos is None:
55 raise SetupFailedError("HEOS device has no controller assigned")
56
57 # Keep internal reference so we don't need to check None on each call
58 self._heos = self._device.heos
59
60 self._attr_type = PlayerType.PLAYER
61 self._attr_supported_features = PLAYER_FEATURES
62 self._attr_can_group_with = {self.provider.instance_id}
63
64 async def setup(self) -> None:
65 """Set up the player."""
66 self.set_device_info()
67 self.set_dynamic_attributes(update_media=True)
68
69 await self.mass.players.register_or_update(self)
70
71 self._on_unload_callbacks.append(
72 self._device.add_on_player_event(self._player_event_received)
73 )
74
75 await self.build_group_list()
76 await self.build_source_list()
77
78 def set_device_info(self) -> None:
79 """Set all device info attributes."""
80 # Extract manufacturer and model from device model string, if available
81 model_parts = self._device.model.split(maxsplit=1)
82 manufacturer = model_parts[0] if len(model_parts) == 2 else "HEOS"
83 model = model_parts[1] if len(model_parts) == 2 else self._device.model
84
85 _device_info = DeviceInfo(
86 model=model,
87 software_version=self._device.version,
88 manufacturer=manufacturer,
89 )
90 _device_info.ip_address = self._device.ip_address
91 self._attr_device_info = _device_info
92 self._attr_available = self._device.available
93 self._attr_name = self._device.name
94
95 async def build_group_list(self) -> None:
96 """Build group list based on group info from controller."""
97 # Group IDs are the player ID of the leader
98 if self._device.group_id is not None and str(self._device.group_id) == self.player_id:
99 group_info = await self._heos.get_group_info(self._device.group_id)
100 self._attr_group_members = [
101 str(group_info.lead_player_id),
102 *(str(member) for member in group_info.member_player_ids),
103 ]
104 else:
105 self._attr_group_members.clear()
106
107 self.update_state()
108
109 async def build_source_list(self) -> None:
110 """Build source list based on music source list, combined with player specific inputs."""
111 prov = cast("HeosPlayerProvider", self.provider)
112 self._attr_source_list = prov.music_source_list[:] # copy so we can modify
113
114 for input_source in prov.input_source_list:
115 # Only add input sources that belong to this player
116 if str(input_source.source_id) != self.player_id or input_source.media_id is None:
117 continue
118
119 self._attr_source_list.append(
120 PlayerSource(
121 id=input_source.media_id,
122 name=input_source.name,
123 can_play_pause=True,
124 )
125 )
126
127 self.update_state()
128
129 async def _player_event_received(self, event: str) -> None:
130 """Handle player device events."""
131 self.logger.debug("[%s] Event received: %s", self._device.name, event)
132
133 match event:
134 case const.EVENT_PLAYER_STATE_CHANGED:
135 self._update_player_state()
136
137 case const.EVENT_PLAYER_NOW_PLAYING_CHANGED:
138 self._update_player_current_media()
139 self._update_player_playing_progress()
140
141 case const.EVENT_PLAYER_NOW_PLAYING_PROGRESS:
142 self._update_player_playing_progress()
143
144 case const.EVENT_PLAYER_VOLUME_CHANGED:
145 self._update_player_volume()
146
147 case const.EVENT_PLAYER_PLAYBACK_ERROR:
148 self.logger.error(
149 "[%s] Playback error: %s", self._device.name, self._device.playback_error
150 )
151 self.set_dynamic_attributes()
152
153 case _:
154 # Update everything on other events
155 self.set_dynamic_attributes()
156
157 self.update_state()
158
159 def _update_player_volume(self) -> None:
160 """Update volume properties."""
161 self._attr_volume_level = self._device.volume
162 self._attr_volume_muted = self._device.is_muted
163
164 def _update_player_state(self) -> None:
165 """Update playback state."""
166 self._attr_playback_state = HEOS_PLAY_STATE_TO_PLAYBACK_STATE.get(
167 self._device.state, PlaybackState.UNKNOWN
168 )
169
170 def _update_player_current_media(self) -> None:
171 """Update current media properties."""
172 now_playing = self._device.now_playing_media
173
174 # Only update if we're not playing from our queue
175 # HEOS does not make a distinction on source ID when playing from a DLNA server, USB stick,
176 # generic URL (like MA), or other local source.
177 # We can only know we're playing from MA if we started this session.
178 if (now_playing.source_id != const.MUSIC_SOURCE_LOCAL_MUSIC) or (
179 self._attr_active_source != self.player_id
180 ):
181 self.logger.debug(
182 "[%s] Now playing changed externally: %s", self._device.name, now_playing
183 )
184
185 if now_playing.source_id == const.MUSIC_SOURCE_AUX_INPUT:
186 self._attr_active_source = str(now_playing.media_id)
187 else:
188 self._attr_active_source = str(now_playing.source_id)
189
190 self._attr_current_media = PlayerMedia(
191 uri=now_playing.media_id or media_uri_from_now_playing_media(now_playing),
192 media_type=HEOS_MEDIA_TYPE_TO_MEDIA_TYPE.get(
193 now_playing.type,
194 MediaType.UNKNOWN,
195 ),
196 title=now_playing.song,
197 artist=now_playing.artist,
198 album=now_playing.album,
199 image_url=now_playing.image_url,
200 duration=now_playing.duration,
201 source_id=str(now_playing.source_id),
202 elapsed_time=now_playing.current_position,
203 elapsed_time_last_updated=(
204 now_playing.current_position_updated.timestamp()
205 if now_playing.current_position_updated
206 else None
207 ),
208 # TODO: We can use custom_data to set the IDs
209 )
210
211 def _update_player_playing_progress(self) -> None:
212 """Update current media progress properties."""
213 now_playing = self._device.now_playing_media
214
215 self._attr_elapsed_time = (
216 now_playing.current_position / 1000 if now_playing.current_position else None
217 )
218 self._attr_elapsed_time_last_updated = (
219 now_playing.current_position_updated.timestamp()
220 if now_playing.current_position_updated
221 else None
222 )
223
224 def set_dynamic_attributes(self, update_media: bool = False) -> None:
225 """Update all player dynamic attributes."""
226 self._update_player_volume()
227 self._update_player_state()
228
229 if update_media:
230 self._update_player_current_media()
231
232 self._update_player_playing_progress()
233
234 async def volume_set(self, volume_level: int) -> None:
235 """Handle VOLUME_SET command on the player."""
236 await self._device.set_volume(volume_level)
237
238 async def volume_mute(self, muted: bool) -> None:
239 """Handle VOLUME MUTE command on the player."""
240 if muted:
241 await self._device.mute()
242 else:
243 await self._device.unmute()
244
245 async def play(self) -> None:
246 """Handle PLAY command on the player."""
247 await self._device.play()
248
249 async def stop(self) -> None:
250 """Handle STOP command on the player."""
251 await self._device.stop()
252
253 async def pause(self) -> None:
254 """Handle PAUSE command on the player."""
255 await self._device.pause()
256
257 async def next_track(self) -> None:
258 """Handle NEXT_TRACK command on the player."""
259 await self._device.play_next()
260
261 async def previous_track(self) -> None:
262 """Handle PREVIOUS_TRACK command on the player."""
263 await self._device.play_previous()
264
265 async def play_media(self, media: PlayerMedia) -> None:
266 """Handle PLAY MEDIA command on given player."""
267 await self._device.clear_queue()
268
269 url = await self.provider.mass.streams.resolve_stream_url(self.player_id, media)
270 await self._device.play_url(url)
271
272 self._attr_current_media = media
273 self._attr_active_source = self.player_id
274
275 self.update_state()
276
277 async def set_members(
278 self,
279 player_ids_to_add: list[str] | None = None,
280 player_ids_to_remove: list[str] | None = None,
281 ) -> None:
282 """Handle SET MEMBERS command on player."""
283 if player_ids_to_add is None and player_ids_to_remove is None:
284 return
285
286 members: list[str] = copy(self._attr_group_members)
287
288 # Make sure we are always in the group
289 if self.player_id not in members:
290 members = [self.player_id, *members]
291
292 for added_player_id in player_ids_to_add or []:
293 members.append(added_player_id)
294
295 for removed_player_id in player_ids_to_remove or []:
296 members.remove(removed_player_id)
297
298 if len(members) <= 1:
299 await self._heos.remove_group(self._device.player_id)
300 else:
301 await self._heos.set_group([int(player) for player in members])
302 # group_members will be updated when group_changed event is handled
303
304 async def get_config_entries(
305 self,
306 action: str | None = None,
307 values: dict[str, ConfigValueType] | None = None,
308 ) -> list[ConfigEntry]:
309 """Return all (provider/player specific) Config Entries for the player."""
310 return [
311 # Gen 1 devices, like HEOS Link, only support up to 48kHz/16bit
312 create_sample_rates_config_entry(
313 max_sample_rate=192000,
314 safe_max_sample_rate=48000,
315 max_bit_depth=24,
316 safe_max_bit_depth=16,
317 ),
318 ]
319