/
/
/
1"""HEOS Player implementation."""
2
3from __future__ import annotations
4
5from copy import copy
6from typing import TYPE_CHECKING, cast
7
8from music_assistant_models.enums import MediaType, PlaybackState, PlayerFeature, PlayerType
9from music_assistant_models.errors import SetupFailedError
10from music_assistant_models.player import DeviceInfo, PlayerSource
11from pyheos import Heos, const
12
13from music_assistant.constants import create_sample_rates_config_entry
14from music_assistant.models.player import Player, PlayerMedia
15from music_assistant.providers.heos.helpers import media_uri_from_now_playing_media
16
17from .constants import HEOS_MEDIA_TYPE_TO_MEDIA_TYPE, HEOS_PLAY_STATE_TO_PLAYBACK_STATE
18
19if TYPE_CHECKING:
20 from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
21 from pyheos import HeosPlayer as PyHeosPlayer
22
23 from .provider import HeosPlayerProvider
24
25
# Capabilities every HEOS player advertises to Music Assistant.
# The set is unordered; entries are grouped by purpose for readability only.
PLAYER_FEATURES = {
    # Playback control
    PlayerFeature.PLAY_MEDIA,
    PlayerFeature.PAUSE,
    PlayerFeature.NEXT_PREVIOUS,
    # Volume control
    PlayerFeature.VOLUME_SET,
    PlayerFeature.VOLUME_MUTE,
    # Source selection and grouping
    PlayerFeature.SELECT_SOURCE,
    PlayerFeature.SET_MEMBERS,
}
35
36
37class HeosPlayer(Player):
38 """HeosPlayer in Music Assistant."""
39
40 _heos: Heos
41 _device: PyHeosPlayer
42
43 @property
44 def requires_flow_mode(self) -> bool:
45 """Return if the player requires flow mode."""
46 return True
47
48 def __init__(self, provider: HeosPlayerProvider, device: PyHeosPlayer) -> None:
49 """Initialize the Player."""
50 super().__init__(provider, str(device.player_id))
51
52 self._device: PyHeosPlayer = device
53
54 if self._device.heos is None:
55 raise SetupFailedError("HEOS device has no controller assigned")
56
57 # Keep internal reference so we don't need to check None on each call
58 self._heos = self._device.heos
59
60 self._attr_type = PlayerType.PLAYER
61 self._attr_supported_features = PLAYER_FEATURES
62 self._attr_can_group_with = {self.provider.instance_id}
63
64 async def setup(self) -> None:
65 """Set up the player."""
66 self.set_device_info()
67 self.set_dynamic_attributes()
68
69 await self.mass.players.register_or_update(self)
70
71 self._on_unload_callbacks.append(
72 self._device.add_on_player_event(self._player_event_received)
73 )
74
75 await self.build_group_list()
76 await self.build_source_list()
77
78 def set_device_info(self) -> None:
79 """Set all device info attributes."""
80 # Extract manufacturer and model from device model string, if available
81 model_parts = self._device.model.split(maxsplit=1)
82 manufacturer = model_parts[0] if len(model_parts) == 2 else "HEOS"
83 model = model_parts[1] if len(model_parts) == 2 else self._device.model
84
85 _device_info = DeviceInfo(
86 model=model,
87 software_version=self._device.version,
88 manufacturer=manufacturer,
89 )
90 _device_info.ip_address = self._device.ip_address
91 self._attr_device_info = _device_info
92 self._attr_available = self._device.available
93 self._attr_name = self._device.name
94
95 async def build_group_list(self) -> None:
96 """Build group list based on group info from controller."""
97 # Group IDs are the player ID of the leader
98 if self._device.group_id is not None and str(self._device.group_id) == self.player_id:
99 group_info = await self._heos.get_group_info(self._device.group_id)
100 self._attr_group_members = [
101 str(group_info.lead_player_id),
102 *(str(member) for member in group_info.member_player_ids),
103 ]
104 else:
105 self._attr_group_members.clear()
106
107 self.update_state()
108
109 async def build_source_list(self) -> None:
110 """Build source list based on music source list, combined with player specific inputs."""
111 prov = cast("HeosPlayerProvider", self.provider)
112 self._attr_source_list = prov.music_source_list[:] # copy so we can modify
113
114 for input_source in prov.input_source_list:
115 # Only add input sources that belong to this player
116 if str(input_source.source_id) != self.player_id or input_source.media_id is None:
117 continue
118
119 self._attr_source_list.append(
120 PlayerSource(
121 id=input_source.media_id,
122 name=input_source.name,
123 can_play_pause=True,
124 )
125 )
126
127 self.update_state()
128
129 async def _player_event_received(self, event: str) -> None:
130 """Handle player device events."""
131 self.logger.debug("[%s] Event received: %s", self._device.name, event)
132
133 match event:
134 case const.EVENT_PLAYER_STATE_CHANGED:
135 self._update_player_state()
136
137 case const.EVENT_PLAYER_NOW_PLAYING_CHANGED:
138 self._update_player_current_media()
139 self._update_player_playing_progress()
140
141 case const.EVENT_PLAYER_NOW_PLAYING_PROGRESS:
142 self._update_player_playing_progress()
143
144 case const.EVENT_PLAYER_VOLUME_CHANGED:
145 self._update_player_volume()
146
147 case _:
148 # Update everything on other events
149 self.set_dynamic_attributes()
150
151 self.update_state()
152
153 def _update_player_volume(self) -> None:
154 """Update volume properties."""
155 self._attr_volume_level = self._device.volume
156 self._attr_volume_muted = self._device.is_muted
157
158 def _update_player_state(self) -> None:
159 """Update playback state."""
160 self._attr_playback_state = HEOS_PLAY_STATE_TO_PLAYBACK_STATE.get(
161 self._device.state, PlaybackState.UNKNOWN
162 )
163
164 def _update_player_current_media(self) -> None:
165 """Update current media properties."""
166 now_playing = self._device.now_playing_media
167
168 # Only update if we're not playing from our queue
169 # HEOS does not make a distinction on source ID when playing from a DLNA server, USB stick,
170 # generic URL (like MA), or other local source.
171 # We can only know we're playing from MA if we started this session.
172 if (now_playing.source_id != const.MUSIC_SOURCE_LOCAL_MUSIC) or (
173 self._attr_active_source != self.player_id
174 ):
175 self.logger.debug(
176 "[%s] Now playing changed externally: %s", self._device.name, now_playing
177 )
178
179 if now_playing.source_id == const.MUSIC_SOURCE_AUX_INPUT:
180 self._attr_active_source = str(now_playing.media_id)
181 else:
182 self._attr_active_source = str(now_playing.source_id)
183
184 self._attr_current_media = PlayerMedia(
185 uri=now_playing.media_id or media_uri_from_now_playing_media(now_playing),
186 media_type=HEOS_MEDIA_TYPE_TO_MEDIA_TYPE.get(
187 now_playing.type,
188 MediaType.UNKNOWN,
189 ),
190 title=now_playing.song,
191 artist=now_playing.artist,
192 album=now_playing.album,
193 image_url=now_playing.image_url,
194 duration=now_playing.duration,
195 source_id=str(now_playing.source_id),
196 elapsed_time=now_playing.current_position,
197 elapsed_time_last_updated=(
198 now_playing.current_position_updated.timestamp()
199 if now_playing.current_position_updated
200 else None
201 ),
202 # TODO: We can use custom_data to set the IDs
203 )
204
205 def _update_player_playing_progress(self) -> None:
206 """Update current media progress properties."""
207 now_playing = self._device.now_playing_media
208
209 self._attr_elapsed_time = (
210 now_playing.current_position / 1000 if now_playing.current_position else None
211 )
212 self._attr_elapsed_time_last_updated = (
213 now_playing.current_position_updated.timestamp()
214 if now_playing.current_position_updated
215 else None
216 )
217
218 def set_dynamic_attributes(self) -> None:
219 """Update all player dynamic attributes."""
220 self._update_player_volume()
221 self._update_player_state()
222 self._update_player_current_media()
223 self._update_player_playing_progress()
224
225 async def volume_set(self, volume_level: int) -> None:
226 """Handle VOLUME_SET command on the player."""
227 await self._device.set_volume(volume_level)
228
229 async def volume_mute(self, muted: bool) -> None:
230 """Handle VOLUME MUTE command on the player."""
231 if muted:
232 await self._device.mute()
233 else:
234 await self._device.unmute()
235
236 async def play(self) -> None:
237 """Handle PLAY command on the player."""
238 await self._device.play()
239
240 async def stop(self) -> None:
241 """Handle STOP command on the player."""
242 await self._device.stop()
243
244 async def pause(self) -> None:
245 """Handle PAUSE command on the player."""
246 await self._device.pause()
247
248 async def next_track(self) -> None:
249 """Handle NEXT_TRACK command on the player."""
250 await self._device.play_next()
251
252 async def previous_track(self) -> None:
253 """Handle PREVIOUS_TRACK command on the player."""
254 await self._device.play_previous()
255
256 async def play_media(self, media: PlayerMedia) -> None:
257 """Handle PLAY MEDIA command on given player."""
258 url = await self.provider.mass.streams.resolve_stream_url(self.player_id, media)
259 await self._device.play_url(url)
260
261 self._attr_current_media = media
262 self._attr_active_source = self.player_id
263
264 self.update_state()
265
266 async def set_members(
267 self,
268 player_ids_to_add: list[str] | None = None,
269 player_ids_to_remove: list[str] | None = None,
270 ) -> None:
271 """Handle SET MEMBERS command on player."""
272 if player_ids_to_add is None and player_ids_to_remove is None:
273 return
274
275 members: list[str] = copy(self._attr_group_members)
276
277 # Make sure we are always in the group
278 if self.player_id not in members:
279 members = [self.player_id, *members]
280
281 for added_player_id in player_ids_to_add or []:
282 members.append(added_player_id)
283
284 for removed_player_id in player_ids_to_remove or []:
285 members.remove(removed_player_id)
286
287 if len(members) <= 1:
288 await self._heos.remove_group(self._device.player_id)
289 else:
290 await self._heos.set_group([int(player) for player in members])
291 # group_members will be updated when group_changed event is handled
292
293 async def get_config_entries(
294 self,
295 action: str | None = None,
296 values: dict[str, ConfigValueType] | None = None,
297 ) -> list[ConfigEntry]:
298 """Return all (provider/player specific) Config Entries for the player."""
299 return [
300 # Gen 1 devices, like HEOS Link, only support up to 48kHz/16bit
301 create_sample_rates_config_entry(
302 max_sample_rate=192000,
303 safe_max_sample_rate=48000,
304 max_bit_depth=24,
305 safe_max_bit_depth=16,
306 ),
307 ]
308