/
/
/
1"""HEOS Player implementation."""
2
3from __future__ import annotations
4
5from copy import copy
6from typing import TYPE_CHECKING, cast
7
8from music_assistant_models.enums import MediaType, PlaybackState, PlayerFeature, PlayerType
9from music_assistant_models.errors import SetupFailedError
10from music_assistant_models.player import DeviceInfo, PlayerSource
11from pyheos import Heos, const
12
13from music_assistant.constants import create_sample_rates_config_entry
14from music_assistant.models.player import Player, PlayerMedia
15from music_assistant.providers.heos.helpers import media_uri_from_now_playing_media
16
17from .constants import HEOS_MEDIA_TYPE_TO_MEDIA_TYPE, HEOS_PLAY_STATE_TO_PLAYBACK_STATE
18
19if TYPE_CHECKING:
20 from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
21 from pyheos import HeosPlayer as PyHeosPlayer
22
23 from .provider import HeosPlayerProvider
24
25
26PLAYER_FEATURES = {
27 PlayerFeature.VOLUME_SET,
28 PlayerFeature.VOLUME_MUTE,
29 PlayerFeature.PAUSE,
30 PlayerFeature.NEXT_PREVIOUS,
31 PlayerFeature.SELECT_SOURCE,
32 PlayerFeature.SET_MEMBERS,
33}
34
35
36class HeosPlayer(Player):
37 """HeosPlayer in Music Assistant."""
38
39 _heos: Heos
40 _device: PyHeosPlayer
41
42 @property
43 def requires_flow_mode(self) -> bool:
44 """Return if the player requires flow mode."""
45 return True
46
47 def __init__(self, provider: HeosPlayerProvider, device: PyHeosPlayer) -> None:
48 """Initialize the Player."""
49 super().__init__(provider, str(device.player_id))
50
51 self._device: PyHeosPlayer = device
52
53 if self._device.heos is None:
54 raise SetupFailedError("HEOS device has no controller assigned")
55
56 # Keep internal reference so we don't need to check None on each call
57 self._heos = self._device.heos
58
59 self._attr_type = PlayerType.PLAYER
60 self._attr_supported_features = PLAYER_FEATURES
61 self._attr_can_group_with = {self.provider.instance_id}
62
63 async def setup(self) -> None:
64 """Set up the player."""
65 self.set_device_info()
66 self.set_dynamic_attributes()
67
68 await self.mass.players.register_or_update(self)
69
70 self._on_unload_callbacks.append(
71 self._device.add_on_player_event(self._player_event_received)
72 )
73
74 await self.build_group_list()
75 await self.build_source_list()
76
77 def set_device_info(self) -> None:
78 """Set all device info attributes."""
79 # Extract manufacturer and model from device model string, if available
80 model_parts = self._device.model.split(maxsplit=1)
81 manufacturer = model_parts[0] if len(model_parts) == 2 else "HEOS"
82 model = model_parts[1] if len(model_parts) == 2 else self._device.model
83
84 _device_info = DeviceInfo(
85 model=model,
86 software_version=self._device.version,
87 manufacturer=manufacturer,
88 )
89 _device_info.ip_address = self._device.ip_address
90 self._attr_device_info = _device_info
91 self._attr_available = self._device.available
92 self._attr_name = self._device.name
93
94 async def build_group_list(self) -> None:
95 """Build group list based on group info from controller."""
96 # Group IDs are the player ID of the leader
97 if self._device.group_id is not None and str(self._device.group_id) == self.player_id:
98 group_info = await self._heos.get_group_info(self._device.group_id)
99 self._attr_group_members = [
100 str(group_info.lead_player_id),
101 *(str(member) for member in group_info.member_player_ids),
102 ]
103 else:
104 self._attr_group_members.clear()
105
106 self.update_state()
107
108 async def build_source_list(self) -> None:
109 """Build source list based on music source list, combined with player specific inputs."""
110 prov = cast("HeosPlayerProvider", self.provider)
111 self._attr_source_list = prov.music_source_list[:] # copy so we can modify
112
113 for input_source in prov.input_source_list:
114 # Only add input sources that belong to this player
115 if str(input_source.source_id) != self.player_id or input_source.media_id is None:
116 continue
117
118 self._attr_source_list.append(
119 PlayerSource(
120 id=input_source.media_id,
121 name=input_source.name,
122 can_play_pause=True,
123 )
124 )
125
126 self.update_state()
127
128 async def _player_event_received(self, event: str) -> None:
129 """Handle player device events."""
130 self.logger.debug("[%s] Event received: %s", self._device.name, event)
131
132 match event:
133 case const.EVENT_PLAYER_STATE_CHANGED:
134 self._update_player_state()
135
136 case const.EVENT_PLAYER_NOW_PLAYING_CHANGED:
137 self._update_player_current_media()
138 self._update_player_playing_progress()
139
140 case const.EVENT_PLAYER_NOW_PLAYING_PROGRESS:
141 self._update_player_playing_progress()
142
143 case const.EVENT_PLAYER_VOLUME_CHANGED:
144 self._update_player_volume()
145
146 case _:
147 # Update everything on other events
148 self.set_dynamic_attributes()
149
150 self.update_state()
151
152 def _update_player_volume(self) -> None:
153 """Update volume properties."""
154 self._attr_volume_level = self._device.volume
155 self._attr_volume_muted = self._device.is_muted
156
157 def _update_player_state(self) -> None:
158 """Update playback state."""
159 self._attr_playback_state = HEOS_PLAY_STATE_TO_PLAYBACK_STATE.get(
160 self._device.state, PlaybackState.UNKNOWN
161 )
162
163 def _update_player_current_media(self) -> None:
164 """Update current media properties."""
165 now_playing = self._device.now_playing_media
166
167 # Only update if we're not playing from our queue
168 # HEOS does not make a distinction on source ID when playing from a DLNA server, USB stick,
169 # generic URL (like MA), or other local source.
170 # We can only know we're playing from MA if we started this session.
171 if (now_playing.source_id != const.MUSIC_SOURCE_LOCAL_MUSIC) or (
172 self._attr_active_source != self.player_id
173 ):
174 self.logger.debug(
175 "[%s] Now playing changed externally: %s", self._device.name, now_playing
176 )
177
178 if now_playing.source_id == const.MUSIC_SOURCE_AUX_INPUT:
179 self._attr_active_source = str(now_playing.media_id)
180 else:
181 self._attr_active_source = str(now_playing.source_id)
182
183 self._attr_current_media = PlayerMedia(
184 uri=now_playing.media_id or media_uri_from_now_playing_media(now_playing),
185 media_type=HEOS_MEDIA_TYPE_TO_MEDIA_TYPE.get(
186 now_playing.type,
187 MediaType.UNKNOWN,
188 ),
189 title=now_playing.song,
190 artist=now_playing.artist,
191 album=now_playing.album,
192 image_url=now_playing.image_url,
193 duration=now_playing.duration,
194 source_id=str(now_playing.source_id),
195 elapsed_time=now_playing.current_position,
196 elapsed_time_last_updated=(
197 now_playing.current_position_updated.timestamp()
198 if now_playing.current_position_updated
199 else None
200 ),
201 # TODO: We can use custom_data to set the IDs
202 )
203
204 def _update_player_playing_progress(self) -> None:
205 """Update current media progress properties."""
206 now_playing = self._device.now_playing_media
207
208 self._attr_elapsed_time = (
209 now_playing.current_position / 1000 if now_playing.current_position else None
210 )
211 self._attr_elapsed_time_last_updated = (
212 now_playing.current_position_updated.timestamp()
213 if now_playing.current_position_updated
214 else None
215 )
216
217 def set_dynamic_attributes(self) -> None:
218 """Update all player dynamic attributes."""
219 self._update_player_volume()
220 self._update_player_state()
221 self._update_player_current_media()
222 self._update_player_playing_progress()
223
224 async def volume_set(self, volume_level: int) -> None:
225 """Handle VOLUME_SET command on the player."""
226 await self._device.set_volume(volume_level)
227
228 async def volume_mute(self, muted: bool) -> None:
229 """Handle VOLUME MUTE command on the player."""
230 if muted:
231 await self._device.mute()
232 else:
233 await self._device.unmute()
234
235 async def play(self) -> None:
236 """Handle PLAY command on the player."""
237 await self._device.play()
238
239 async def stop(self) -> None:
240 """Handle STOP command on the player."""
241 await self._device.stop()
242
243 async def pause(self) -> None:
244 """Handle PAUSE command on the player."""
245 await self._device.pause()
246
247 async def next_track(self) -> None:
248 """Handle NEXT_TRACK command on the player."""
249 await self._device.play_next()
250
251 async def previous_track(self) -> None:
252 """Handle PREVIOUS_TRACK command on the player."""
253 await self._device.play_previous()
254
255 async def play_media(self, media: PlayerMedia) -> None:
256 """Handle PLAY MEDIA command on given player."""
257 url = await self.provider.mass.streams.resolve_stream_url(self.player_id, media)
258 await self._device.play_url(url)
259
260 self._attr_current_media = media
261 self._attr_active_source = self.player_id
262
263 self.update_state()
264
265 async def set_members(
266 self,
267 player_ids_to_add: list[str] | None = None,
268 player_ids_to_remove: list[str] | None = None,
269 ) -> None:
270 """Handle SET MEMBERS command on player."""
271 if player_ids_to_add is None and player_ids_to_remove is None:
272 return
273
274 members: list[str] = copy(self._attr_group_members)
275
276 # Make sure we are always in the group
277 if self.player_id not in members:
278 members = [self.player_id, *members]
279
280 for added_player_id in player_ids_to_add or []:
281 members.append(added_player_id)
282
283 for removed_player_id in player_ids_to_remove or []:
284 members.remove(removed_player_id)
285
286 if len(members) <= 1:
287 await self._heos.remove_group(self._device.player_id)
288 else:
289 await self._heos.set_group([int(player) for player in members])
290 # group_members will be updated when group_changed event is handled
291
292 async def get_config_entries(
293 self,
294 action: str | None = None,
295 values: dict[str, ConfigValueType] | None = None,
296 ) -> list[ConfigEntry]:
297 """Return all (provider/player specific) Config Entries for the player."""
298 return [
299 # Gen 1 devices, like HEOS Link, only support up to 48kHz/16bit
300 create_sample_rates_config_entry(
301 max_sample_rate=192000,
302 safe_max_sample_rate=48000,
303 max_bit_depth=24,
304 safe_max_bit_depth=16,
305 ),
306 ]
307