/
/
/
1"""Home Assistant Player implementation."""
2
3from __future__ import annotations
4
5import asyncio
6import time
7from typing import TYPE_CHECKING, Any, cast
8
9from hass_client.exceptions import FailedCommand
10from music_assistant_models.enums import (
11 IdentifierType,
12 ImageType,
13 MediaType,
14 PlaybackState,
15 PlayerFeature,
16)
17from music_assistant_models.media_items import MediaItemImage
18
19from music_assistant.constants import (
20 CONF_ENTRY_ENABLE_ICY_METADATA_HIDDEN,
21 CONF_ENTRY_HTTP_PROFILE_FORCED_2,
22 CONF_ENTRY_OUTPUT_CODEC_DEFAULT_MP3,
23 HIDDEN_ANNOUNCE_VOLUME_CONFIG_ENTRIES,
24 create_output_codec_config_entry,
25 create_sample_rates_config_entry,
26)
27from music_assistant.helpers.datetime import from_iso_string
28from music_assistant.helpers.tags import async_parse_tags
29from music_assistant.models.player import DeviceInfo, Player, PlayerMedia, PlayerSource
30from music_assistant.models.player_provider import PlayerProvider
31from music_assistant.providers.hass.constants import (
32 OFF_STATES,
33 UNAVAILABLE_STATES,
34 MediaPlayerEntityFeature,
35 StateMap,
36)
37
38from .constants import CONF_ENTRY_WARN_HASS_INTEGRATION, WARN_HASS_INTEGRATIONS
39from .helpers import ESPHomeSupportedAudioFormat
40
41if TYPE_CHECKING:
42 from hass_client import HomeAssistantClient
43 from hass_client.models import CompressedState
44 from hass_client.models import Entity as HassEntity
45 from hass_client.models import State as HassState
46 from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
47
48 from .provider import HomeAssistantPlayerProvider
49
50
51DEFAULT_PLAYER_CONFIG_ENTRIES = (CONF_ENTRY_OUTPUT_CODEC_DEFAULT_MP3,)
52
53
54class HomeAssistantPlayer(Player):
55 """Home Assistant Player implementation."""
56
57 def __init__(
58 self,
59 provider: PlayerProvider,
60 hass: HomeAssistantClient,
61 player_id: str,
62 hass_state: HassState,
63 dev_info: dict[str, Any],
64 extra_player_data: dict[str, Any],
65 entity_registry: dict[str, HassEntity],
66 ) -> None:
67 """Initialize the Home Assistant Player."""
68 super().__init__(provider, player_id)
69 self.hass = hass
70 self.hass_state = hass_state
71 self._extra_data = extra_player_data
72 # Set base attributes from Home Assistant state
73 self._attr_available = hass_state["state"] not in UNAVAILABLE_STATES
74 self._attr_device_info = DeviceInfo(
75 model=dev_info.get("model", ""),
76 manufacturer=dev_info.get("manufacturer", ""),
77 software_version=dev_info.get("software_version"),
78 )
79 if mac_address := dev_info.get("mac_address"):
80 self._attr_device_info.add_identifier(IdentifierType.MAC_ADDRESS, mac_address)
81 self._attr_playback_state = StateMap.get(hass_state["state"], PlaybackState.IDLE)
82 # Work out supported features
83 self._attr_supported_features = {PlayerFeature.PLAY_MEDIA}
84 hass_supported_features = MediaPlayerEntityFeature(
85 hass_state["attributes"]["supported_features"]
86 )
87 if MediaPlayerEntityFeature.VOLUME_SET in hass_supported_features:
88 self._attr_supported_features.add(PlayerFeature.VOLUME_SET)
89 if MediaPlayerEntityFeature.VOLUME_MUTE in hass_supported_features:
90 self._attr_supported_features.add(PlayerFeature.VOLUME_MUTE)
91 if MediaPlayerEntityFeature.MEDIA_ANNOUNCE in hass_supported_features:
92 self._attr_supported_features.add(PlayerFeature.PLAY_ANNOUNCEMENT)
93 hass_domain = extra_player_data.get("hass_domain")
94 if hass_domain and MediaPlayerEntityFeature.GROUPING in hass_supported_features:
95 self._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
96 self._attr_can_group_with = {
97 x["entity_id"]
98 for x in entity_registry.values()
99 if x["entity_id"].startswith("media_player") and x["platform"] == hass_domain
100 }
101 if (
102 MediaPlayerEntityFeature.TURN_ON in hass_supported_features
103 and MediaPlayerEntityFeature.TURN_OFF in hass_supported_features
104 ):
105 self._attr_supported_features.add(PlayerFeature.POWER)
106 self._attr_powered = hass_state["state"] not in OFF_STATES
107
108 self.extra_data["hass_supported_features"] = hass_supported_features
109 self._hass_attributes: dict[str, Any] = {}
110
111 # Add External source to support next/prev commands when playing external content
112 self._attr_source_list.append(
113 PlayerSource(
114 id="External",
115 name="External Source",
116 passive=True,
117 )
118 )
119 # Set dynamic features (PAUSE, NEXT_PREVIOUS, SEEK) via shared helper
120 self._update_hass_features(hass_supported_features)
121
122 self._update_attributes(hass_state["attributes"])
123
124 @property
125 def requires_flow_mode(self) -> bool:
126 """Return if the player requires flow mode."""
127 # hass media players are a hot mess so play it safe and always use flow mode
128 return True
129
130 async def get_config_entries(
131 self,
132 action: str | None = None,
133 values: dict[str, ConfigValueType] | None = None,
134 ) -> list[ConfigEntry]:
135 """Return all (provider/player specific) Config Entries for the player."""
136 base_entries = [*DEFAULT_PLAYER_CONFIG_ENTRIES]
137 if self.extra_data.get("esphome_supported_audio_formats"):
138 # optimized config for new ESPHome mediaplayer
139 supported_sample_rates: list[int] = []
140 supported_bit_depths: list[int] = []
141 codec: str | None = None
142 supported_formats: list[ESPHomeSupportedAudioFormat] = self.extra_data[
143 "esphome_supported_audio_formats"
144 ]
145 # sort on purpose field, so we prefer the media pipeline
146 # but allows fallback to announcements pipeline if no media pipeline is available
147 supported_formats.sort(key=lambda x: x["purpose"])
148 for supported_format in supported_formats:
149 codec = supported_format["format"]
150 if supported_format["sample_rate"] not in supported_sample_rates:
151 supported_sample_rates.append(supported_format["sample_rate"])
152 bit_depth = (supported_format["sample_bytes"] or 2) * 8
153 if bit_depth not in supported_bit_depths:
154 supported_bit_depths.append(bit_depth)
155 if not supported_sample_rates or not supported_bit_depths:
156 # esphome device with no media pipeline configured
157 # simply use the default config of the media pipeline
158 supported_sample_rates = [48000]
159 supported_bit_depths = [16]
160
161 config_entries = [
162 *base_entries,
163 # New ESPHome mediaplayer (used in Voice PE) uses FLAC 48khz/16 bits
164 CONF_ENTRY_HTTP_PROFILE_FORCED_2,
165 ]
166
167 if codec is not None:
168 config_entries.append(create_output_codec_config_entry(True, codec))
169
170 config_entries.extend(
171 [
172 CONF_ENTRY_ENABLE_ICY_METADATA_HIDDEN,
173 create_sample_rates_config_entry(
174 supported_sample_rates=supported_sample_rates,
175 supported_bit_depths=supported_bit_depths,
176 hidden=True,
177 ),
178 # although the Voice PE supports announcements,
179 # it does not support volume for announcements
180 *HIDDEN_ANNOUNCE_VOLUME_CONFIG_ENTRIES,
181 ]
182 )
183
184 return config_entries
185
186 # add alert if player is a known player type that has a native provider in MA
187 if self.extra_data.get("hass_domain") in WARN_HASS_INTEGRATIONS:
188 base_entries = [CONF_ENTRY_WARN_HASS_INTEGRATION, *base_entries]
189
190 return base_entries
191
192 async def play(self) -> None:
193 """Handle PLAY command on the player."""
194 await self.hass.call_service(
195 domain="media_player",
196 service="media_play",
197 target={"entity_id": self.player_id},
198 )
199
200 async def pause(self) -> None:
201 """Handle PAUSE command on the player."""
202 await self.hass.call_service(
203 domain="media_player",
204 service="media_pause",
205 target={"entity_id": self.player_id},
206 )
207
208 async def stop(self) -> None:
209 """Send STOP command to player."""
210 try:
211 await self.hass.call_service(
212 domain="media_player",
213 service="media_stop",
214 target={"entity_id": self.player_id},
215 )
216 except FailedCommand as exc:
217 # some HA players do not support STOP
218 if "does not support" not in str(exc):
219 raise
220 if PlayerFeature.PAUSE in self.supported_features:
221 await self.pause()
222 finally:
223 self._attr_current_media = None
224 self.update_state()
225
226 async def volume_set(self, volume_level: int) -> None:
227 """Handle VOLUME_SET command on the player."""
228 await self.hass.call_service(
229 domain="media_player",
230 service="volume_set",
231 target={"entity_id": self.player_id},
232 service_data={"volume_level": volume_level / 100},
233 )
234
235 async def volume_mute(self, muted: bool) -> None:
236 """Handle VOLUME MUTE command on the player."""
237 await self.hass.call_service(
238 domain="media_player",
239 service="volume_mute",
240 target={"entity_id": self.player_id},
241 service_data={"is_volume_muted": muted},
242 )
243
244 async def power(self, powered: bool) -> None:
245 """Handle POWER command on the player."""
246 await self.hass.call_service(
247 domain="media_player",
248 service="turn_on" if powered else "turn_off",
249 target={"entity_id": self.player_id},
250 )
251
252 async def next_track(self) -> None:
253 """Handle NEXT_TRACK command on the player."""
254 await self.hass.call_service(
255 domain="media_player",
256 service="media_next_track",
257 target={"entity_id": self.player_id},
258 )
259
260 async def previous_track(self) -> None:
261 """Handle PREVIOUS_TRACK command on the player."""
262 await self.hass.call_service(
263 domain="media_player",
264 service="media_previous_track",
265 target={"entity_id": self.player_id},
266 )
267
268 async def play_media(self, media: PlayerMedia) -> None:
269 """Handle PLAY MEDIA on given player."""
270 url = await self.provider.mass.streams.resolve_stream_url(self.player_id, media)
271 extra_data: dict[str, Any] = {
272 # passing metadata to the player
273 # so far only supported by google cast, but maybe others can follow
274 "metadata": {
275 "title": media.title,
276 "artist": media.artist,
277 "metadataType": 3,
278 "album": media.album,
279 "albumName": media.album,
280 "images": [{"url": media.image_url}] if media.image_url else None,
281 "imageUrl": media.image_url,
282 "duration": media.duration,
283 },
284 }
285 if self.extra_data.get("hass_domain") == "esphome":
286 # tell esphome mediaproxy to bypass the proxy,
287 # as MA already delivers an optimized stream
288 extra_data["bypass_proxy"] = True
289
290 # stop the player if it is already playing
291 if self._attr_playback_state == PlaybackState.PLAYING:
292 await self.stop()
293
294 await self.hass.call_service(
295 domain="media_player",
296 service="play_media",
297 target={"entity_id": self.player_id},
298 service_data={
299 "media_content_id": url,
300 "media_content_type": "music",
301 "enqueue": "replace",
302 "extra": extra_data,
303 },
304 )
305
306 # Optimistically update state
307 self._attr_current_media = media
308 self._attr_elapsed_time = 0
309 self._attr_elapsed_time_last_updated = time.time()
310 self._attr_playback_state = PlaybackState.PLAYING
311 self.update_state()
312
313 async def play_announcement(
314 self, announcement: PlayerMedia, volume_level: int | None = None
315 ) -> None:
316 """Handle (provider native) playback of an announcement on given player."""
317 self.logger.info(
318 "Playing announcement %s on %s",
319 announcement.uri,
320 self.display_name,
321 )
322 if volume_level is not None:
323 self.logger.warning(
324 "Announcement volume level is not supported for player %s",
325 self.display_name,
326 )
327 await self.hass.call_service(
328 domain="media_player",
329 service="play_media",
330 service_data={
331 "media_content_id": announcement.uri,
332 "media_content_type": "music",
333 "announce": True,
334 },
335 target={"entity_id": self.player_id},
336 )
337 # Wait until the announcement is finished playing
338 # This is helpful for people who want to play announcements in a sequence
339 media_info = await async_parse_tags(announcement.uri, require_duration=True)
340 duration = media_info.duration or 5
341 await asyncio.sleep(duration)
342 self.logger.debug(
343 "Playing announcement on %s completed",
344 self.display_name,
345 )
346
347 async def set_members(
348 self,
349 player_ids_to_add: list[str] | None = None,
350 player_ids_to_remove: list[str] | None = None,
351 ) -> None:
352 """
353 Handle SET_MEMBERS command on the player.
354
355 Group or ungroup the given child player(s) to/from this player.
356 Will only be called if the PlayerFeature.SET_MEMBERS is supported.
357
358 :param player_ids_to_add: List of player_id's to add to the group.
359 :param player_ids_to_remove: List of player_id's to remove from the group.
360 """
361 for player_id_to_remove in player_ids_to_remove or []:
362 await self.hass.call_service(
363 domain="media_player",
364 service="unjoin",
365 target={"entity_id": player_id_to_remove},
366 )
367 if player_ids_to_add:
368 await self.hass.call_service(
369 domain="media_player",
370 service="join",
371 service_data={"group_members": player_ids_to_add},
372 target={"entity_id": self.player_id},
373 )
374
375 def update_from_compressed_state(self, state: CompressedState) -> None:
376 """Handle updating the player with updated info in a HA CompressedState."""
377 if "s" in state:
378 self._attr_playback_state = StateMap.get(state["s"], PlaybackState.IDLE)
379 self._attr_available = state["s"] not in UNAVAILABLE_STATES
380 if PlayerFeature.POWER in self.supported_features:
381 self._attr_powered = state["s"] not in OFF_STATES
382 if "a" in state:
383 self._update_attributes(state["a"])
384 self.update_state()
385
386 def _update_hass_features(self, hass_supported_features: MediaPlayerEntityFeature) -> None:
387 """Update player and External source features based on HA supported features."""
388 # Update player supported features for PAUSE and NEXT_PREVIOUS
389 if MediaPlayerEntityFeature.PAUSE in hass_supported_features:
390 self._attr_supported_features.add(PlayerFeature.PAUSE)
391 else:
392 self._attr_supported_features.discard(PlayerFeature.PAUSE)
393
394 has_next_prev = (
395 MediaPlayerEntityFeature.NEXT_TRACK in hass_supported_features
396 or MediaPlayerEntityFeature.PREVIOUS_TRACK in hass_supported_features
397 )
398 if has_next_prev:
399 self._attr_supported_features.add(PlayerFeature.NEXT_PREVIOUS)
400 else:
401 self._attr_supported_features.discard(PlayerFeature.NEXT_PREVIOUS)
402
403 # Update the External source capabilities
404 for source in self._attr_source_list:
405 if source.id == "External":
406 source.can_play_pause = MediaPlayerEntityFeature.PAUSE in hass_supported_features
407 source.can_next_previous = has_next_prev
408 source.can_seek = MediaPlayerEntityFeature.SEEK in hass_supported_features
409 break
410
411 def _update_attributes(self, attributes: dict[str, Any]) -> None:
412 """Update Player attributes from HA state attributes."""
413 self._hass_attributes.update(attributes)
414
415 # process optional attributes - these may not be present in all states
416 for key, value in attributes.items():
417 if key == "friendly_name":
418 self._attr_name = value
419 elif key == "media_position":
420 self._attr_elapsed_time = value
421 elif key == "media_position_updated_at":
422 self._attr_elapsed_time_last_updated = from_iso_string(value).timestamp()
423 elif key == "volume_level":
424 self._attr_volume_level = int(value * 100)
425 elif key == "is_volume_muted":
426 self._attr_volume_muted = value
427 elif key == "group_members":
428 group_members: list[str] = (
429 [
430 # ignore integrations that incorrectly set the group members attribute
431 # (e.g. linkplay)
432 x
433 for x in value
434 if x.startswith("media_player.")
435 ]
436 if value
437 else []
438 )
439 if group_members and group_members[0] == self.player_id:
440 # first in the list is the group leader
441 self._attr_group_members = group_members
442 elif group_members and group_members[0] != self.player_id:
443 # this player is not the group leader
444 self._attr_group_members.clear()
445 else:
446 self._attr_group_members.clear()
447 elif key == "supported_features":
448 # Update supported features dynamically via shared helper
449 hass_supported_features = MediaPlayerEntityFeature(value)
450 self.extra_data["hass_supported_features"] = hass_supported_features
451 self._update_hass_features(hass_supported_features)
452
453 # Check for external playback (not from Music Assistant).
454 # Without media_content_id we cannot reliably determine the source,
455 # so we later only react to state updates that include it.
456 media_content_id = self._hass_attributes.get("media_content_id", "")
457 is_ma_playback = media_content_id.startswith(self.mass.streams.base_url)
458 media_title = self._hass_attributes.get("media_title")
459
460 if media_content_id and is_ma_playback:
461 # MA playback - ensure active_source points to player_id for queue lookup.
462 # The actual current_media will be set by MA's queue controller.
463 self._attr_active_source = None
464 elif (
465 media_content_id
466 and media_title
467 and self.playback_state in (PlaybackState.PLAYING, PlaybackState.PAUSED)
468 ):
469 # External playback detected - set current_media from HA attributes
470 ha_content_type = self._hass_attributes.get("media_content_type", "")
471 media_type = MediaType.RADIO if ha_content_type == "radio" else MediaType.UNKNOWN
472 current_media = PlayerMedia(
473 uri=media_content_id,
474 media_type=media_type,
475 title=media_title,
476 artist=self._hass_attributes.get("media_artist"),
477 album=self._hass_attributes.get("media_album_name"),
478 image_url=self._get_image_url(self._hass_attributes),
479 duration=int(self._hass_attributes.get("media_duration", 0) or 0) or None,
480 )
481 self._attr_current_media = current_media
482 self._attr_active_source = "External"
483
484 elif self.playback_state == PlaybackState.IDLE:
485 # Clear external media if it was set
486 if self._attr_active_source and self._attr_active_source not in (
487 self.player_id,
488 None,
489 ):
490 self._attr_current_media = None
491 self._attr_active_source = None
492
493 def _get_image_url(self, attributes: dict[str, Any]) -> str | None:
494 """Get the image URL from the attributes."""
495 if entity_picture := attributes.get("entity_picture"):
496 entity_picture = str(entity_picture)
497 if entity_picture.startswith("http"):
498 return entity_picture
499
500 # Access via provider -> hass_prov
501 prov = cast("HomeAssistantPlayerProvider", self.provider)
502
503 # Use proxy for internal HA images
504 # We create a MediaItemImage with the hass provider as source
505 # This will trigger resolve_image on the hass provider when requested
506 image = MediaItemImage(
507 type=ImageType.THUMB,
508 path=entity_picture,
509 provider=prov.hass_prov.instance_id,
510 remotely_accessible=False,
511 )
512 return self.mass.metadata.get_image_url(image)
513 return None
514