/
/
/
1"""Home Assistant Player implementation."""
2
3from __future__ import annotations
4
5import asyncio
6import time
7from typing import TYPE_CHECKING, Any
8
9from hass_client.exceptions import FailedCommand
10from music_assistant_models.enums import PlaybackState, PlayerFeature, PlayerType
11
12from music_assistant.constants import (
13 CONF_ENTRY_ENABLE_ICY_METADATA_HIDDEN,
14 CONF_ENTRY_HTTP_PROFILE_FORCED_2,
15 CONF_ENTRY_OUTPUT_CODEC_DEFAULT_MP3,
16 HIDDEN_ANNOUNCE_VOLUME_CONFIG_ENTRIES,
17 create_output_codec_config_entry,
18 create_sample_rates_config_entry,
19)
20from music_assistant.helpers.datetime import from_iso_string
21from music_assistant.helpers.tags import async_parse_tags
22from music_assistant.models.player import DeviceInfo, Player, PlayerMedia
23from music_assistant.models.player_provider import PlayerProvider
24from music_assistant.providers.hass.constants import (
25 OFF_STATES,
26 UNAVAILABLE_STATES,
27 MediaPlayerEntityFeature,
28 StateMap,
29)
30
31from .constants import CONF_ENTRY_WARN_HASS_INTEGRATION, WARN_HASS_INTEGRATIONS
32from .helpers import ESPHomeSupportedAudioFormat
33
34if TYPE_CHECKING:
35 from hass_client import HomeAssistantClient
36 from hass_client.models import CompressedState
37 from hass_client.models import Entity as HassEntity
38 from hass_client.models import State as HassState
39 from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
40
41
42DEFAULT_PLAYER_CONFIG_ENTRIES = (CONF_ENTRY_OUTPUT_CODEC_DEFAULT_MP3,)
43
44
45class HomeAssistantPlayer(Player):
46 """Home Assistant Player implementation."""
47
48 _attr_type = PlayerType.PLAYER
49
50 def __init__(
51 self,
52 provider: PlayerProvider,
53 hass: HomeAssistantClient,
54 player_id: str,
55 hass_state: HassState,
56 dev_info: dict[str, Any],
57 extra_player_data: dict[str, Any],
58 entity_registry: dict[str, HassEntity],
59 ) -> None:
60 """Initialize the Home Assistant Player."""
61 super().__init__(provider, player_id)
62 self.hass = hass
63 self.hass_state = hass_state
64 self._extra_data = extra_player_data
65 # Set base attributes from Home Assistant state
66 self._attr_available = hass_state["state"] not in UNAVAILABLE_STATES
67 self._attr_device_info = DeviceInfo.from_dict(dev_info)
68 self._attr_playback_state = StateMap.get(hass_state["state"], PlaybackState.IDLE)
69 # Work out supported features
70 self._attr_supported_features = set()
71 hass_supported_features = MediaPlayerEntityFeature(
72 hass_state["attributes"]["supported_features"]
73 )
74 if MediaPlayerEntityFeature.PAUSE in hass_supported_features:
75 self._attr_supported_features.add(PlayerFeature.PAUSE)
76 if MediaPlayerEntityFeature.VOLUME_SET in hass_supported_features:
77 self._attr_supported_features.add(PlayerFeature.VOLUME_SET)
78 if MediaPlayerEntityFeature.VOLUME_MUTE in hass_supported_features:
79 self._attr_supported_features.add(PlayerFeature.VOLUME_MUTE)
80 if MediaPlayerEntityFeature.MEDIA_ANNOUNCE in hass_supported_features:
81 self._attr_supported_features.add(PlayerFeature.PLAY_ANNOUNCEMENT)
82 hass_domain = extra_player_data.get("hass_domain")
83 if hass_domain and MediaPlayerEntityFeature.GROUPING in hass_supported_features:
84 self._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
85 self._attr_can_group_with = {
86 x["entity_id"]
87 for x in entity_registry.values()
88 if x["entity_id"].startswith("media_player") and x["platform"] == hass_domain
89 }
90 if (
91 MediaPlayerEntityFeature.TURN_ON in hass_supported_features
92 and MediaPlayerEntityFeature.TURN_OFF in hass_supported_features
93 ):
94 self._attr_supported_features.add(PlayerFeature.POWER)
95 self._attr_powered = hass_state["state"] not in OFF_STATES
96
97 self.extra_data["hass_supported_features"] = hass_supported_features
98 self._update_attributes(hass_state["attributes"])
99
100 @property
101 def requires_flow_mode(self) -> bool:
102 """Return if the player requires flow mode."""
103 # hass media players are a hot mess so play it safe and always use flow mode
104 return True
105
106 async def get_config_entries(
107 self,
108 action: str | None = None,
109 values: dict[str, ConfigValueType] | None = None,
110 ) -> list[ConfigEntry]:
111 """Return all (provider/player specific) Config Entries for the player."""
112 base_entries = [*DEFAULT_PLAYER_CONFIG_ENTRIES]
113 if self.extra_data.get("esphome_supported_audio_formats"):
114 # optimized config for new ESPHome mediaplayer
115 supported_sample_rates: list[int] = []
116 supported_bit_depths: list[int] = []
117 codec: str | None = None
118 supported_formats: list[ESPHomeSupportedAudioFormat] = self.extra_data[
119 "esphome_supported_audio_formats"
120 ]
121 # sort on purpose field, so we prefer the media pipeline
122 # but allows fallback to announcements pipeline if no media pipeline is available
123 supported_formats.sort(key=lambda x: x["purpose"])
124 for supported_format in supported_formats:
125 codec = supported_format["format"]
126 if supported_format["sample_rate"] not in supported_sample_rates:
127 supported_sample_rates.append(supported_format["sample_rate"])
128 bit_depth = (supported_format["sample_bytes"] or 2) * 8
129 if bit_depth not in supported_bit_depths:
130 supported_bit_depths.append(bit_depth)
131 if not supported_sample_rates or not supported_bit_depths:
132 # esphome device with no media pipeline configured
133 # simply use the default config of the media pipeline
134 supported_sample_rates = [48000]
135 supported_bit_depths = [16]
136
137 config_entries = [
138 *base_entries,
139 # New ESPHome mediaplayer (used in Voice PE) uses FLAC 48khz/16 bits
140 CONF_ENTRY_HTTP_PROFILE_FORCED_2,
141 ]
142
143 if codec is not None:
144 config_entries.append(create_output_codec_config_entry(True, codec))
145
146 config_entries.extend(
147 [
148 CONF_ENTRY_ENABLE_ICY_METADATA_HIDDEN,
149 create_sample_rates_config_entry(
150 supported_sample_rates=supported_sample_rates,
151 supported_bit_depths=supported_bit_depths,
152 hidden=True,
153 ),
154 # although the Voice PE supports announcements,
155 # it does not support volume for announcements
156 *HIDDEN_ANNOUNCE_VOLUME_CONFIG_ENTRIES,
157 ]
158 )
159
160 return config_entries
161
162 # add alert if player is a known player type that has a native provider in MA
163 if self.extra_data.get("hass_domain") in WARN_HASS_INTEGRATIONS:
164 base_entries = [CONF_ENTRY_WARN_HASS_INTEGRATION, *base_entries]
165
166 return base_entries
167
168 async def play(self) -> None:
169 """Handle PLAY command on the player."""
170 await self.hass.call_service(
171 domain="media_player",
172 service="media_play",
173 target={"entity_id": self.player_id},
174 )
175
176 async def pause(self) -> None:
177 """Handle PAUSE command on the player."""
178 await self.hass.call_service(
179 domain="media_player",
180 service="media_pause",
181 target={"entity_id": self.player_id},
182 )
183
184 async def stop(self) -> None:
185 """Send STOP command to player."""
186 try:
187 await self.hass.call_service(
188 domain="media_player",
189 service="media_stop",
190 target={"entity_id": self.player_id},
191 )
192 except FailedCommand as exc:
193 # some HA players do not support STOP
194 if "does not support" not in str(exc):
195 raise
196 if PlayerFeature.PAUSE in self.supported_features:
197 await self.pause()
198 finally:
199 self._attr_current_media = None
200 self.update_state()
201
202 async def volume_set(self, volume_level: int) -> None:
203 """Handle VOLUME_SET command on the player."""
204 await self.hass.call_service(
205 domain="media_player",
206 service="volume_set",
207 target={"entity_id": self.player_id},
208 service_data={"volume_level": volume_level / 100},
209 )
210
211 async def volume_mute(self, muted: bool) -> None:
212 """Handle VOLUME MUTE command on the player."""
213 await self.hass.call_service(
214 domain="media_player",
215 service="volume_mute",
216 target={"entity_id": self.player_id},
217 service_data={"is_volume_muted": muted},
218 )
219
220 async def power(self, powered: bool) -> None:
221 """Handle POWER command on the player."""
222 await self.hass.call_service(
223 domain="media_player",
224 service="turn_on" if powered else "turn_off",
225 target={"entity_id": self.player_id},
226 )
227
228 async def play_media(self, media: PlayerMedia) -> None:
229 """Handle PLAY MEDIA on given player."""
230 extra_data: dict[str, Any] = {
231 # passing metadata to the player
232 # so far only supported by google cast, but maybe others can follow
233 "metadata": {
234 "title": media.title,
235 "artist": media.artist,
236 "metadataType": 3,
237 "album": media.album,
238 "albumName": media.album,
239 "images": [{"url": media.image_url}] if media.image_url else None,
240 "imageUrl": media.image_url,
241 "duration": media.duration,
242 },
243 }
244 if self.extra_data.get("hass_domain") == "esphome":
245 # tell esphome mediaproxy to bypass the proxy,
246 # as MA already delivers an optimized stream
247 extra_data["bypass_proxy"] = True
248
249 # stop the player if it is already playing
250 if self.playback_state == PlaybackState.PLAYING:
251 await self.stop()
252
253 await self.hass.call_service(
254 domain="media_player",
255 service="play_media",
256 target={"entity_id": self.player_id},
257 service_data={
258 "media_content_id": media.uri,
259 "media_content_type": "music",
260 "enqueue": "replace",
261 "extra": extra_data,
262 },
263 )
264
265 # Optimistically update state
266 self._attr_current_media = media
267 self._attr_elapsed_time = 0
268 self._attr_elapsed_time_last_updated = time.time()
269 self._attr_playback_state = PlaybackState.PLAYING
270 self.update_state()
271
272 async def play_announcement(
273 self, announcement: PlayerMedia, volume_level: int | None = None
274 ) -> None:
275 """Handle (provider native) playback of an announcement on given player."""
276 self.logger.info(
277 "Playing announcement %s on %s",
278 announcement.uri,
279 self.display_name,
280 )
281 if volume_level is not None:
282 self.logger.warning(
283 "Announcement volume level is not supported for player %s",
284 self.display_name,
285 )
286 await self.hass.call_service(
287 domain="media_player",
288 service="play_media",
289 service_data={
290 "media_content_id": announcement.uri,
291 "media_content_type": "music",
292 "announce": True,
293 },
294 target={"entity_id": self.player_id},
295 )
296 # Wait until the announcement is finished playing
297 # This is helpful for people who want to play announcements in a sequence
298 media_info = await async_parse_tags(announcement.uri, require_duration=True)
299 duration = media_info.duration or 5
300 await asyncio.sleep(duration)
301 self.logger.debug(
302 "Playing announcement on %s completed",
303 self.display_name,
304 )
305
306 async def set_members(
307 self,
308 player_ids_to_add: list[str] | None = None,
309 player_ids_to_remove: list[str] | None = None,
310 ) -> None:
311 """
312 Handle SET_MEMBERS command on the player.
313
314 Group or ungroup the given child player(s) to/from this player.
315 Will only be called if the PlayerFeature.SET_MEMBERS is supported.
316
317 :param player_ids_to_add: List of player_id's to add to the group.
318 :param player_ids_to_remove: List of player_id's to remove from the group.
319 """
320 for player_id_to_remove in player_ids_to_remove or []:
321 await self.hass.call_service(
322 domain="media_player",
323 service="unjoin",
324 target={"entity_id": player_id_to_remove},
325 )
326 if player_ids_to_add:
327 await self.hass.call_service(
328 domain="media_player",
329 service="join",
330 service_data={"group_members": player_ids_to_add},
331 target={"entity_id": self.player_id},
332 )
333
334 def update_from_compressed_state(self, state: CompressedState) -> None:
335 """Handle updating the player with updated info in a HA CompressedState."""
336 if "s" in state:
337 self._attr_playback_state = StateMap.get(state["s"], PlaybackState.IDLE)
338 self._attr_available = state["s"] not in UNAVAILABLE_STATES
339 if PlayerFeature.POWER in self.supported_features:
340 self._attr_powered = state["s"] not in OFF_STATES
341 if "a" in state:
342 self._update_attributes(state["a"])
343 self.update_state()
344
345 def _update_attributes(self, attributes: dict[str, Any]) -> None:
346 """Update Player attributes from HA state attributes."""
347 # process optional attributes - these may not be present in all states
348 for key, value in attributes.items():
349 if key == "friendly_name":
350 self._attr_name = value
351 elif key == "media_position":
352 self._attr_elapsed_time = value
353 elif key == "media_position_updated_at":
354 self._attr_elapsed_time_last_updated = from_iso_string(value).timestamp()
355 elif key == "volume_level":
356 self._attr_volume_level = int(value * 100)
357 elif key == "is_volume_muted":
358 self._attr_volume_muted = value
359 elif key == "group_members":
360 group_members: list[str] = (
361 [
362 # ignore integrations that incorrectly set the group members attribute
363 # (e.g. linkplay)
364 x
365 for x in value
366 if x.startswith("media_player.")
367 ]
368 if value
369 else []
370 )
371 if group_members and group_members[0] == self.player_id:
372 # first in the list is the group leader
373 self._attr_group_members = group_members
374 elif group_members and group_members[0] != self.player_id:
375 # this player is not the group leader
376 self._attr_group_members.clear()
377 else:
378 self._attr_group_members.clear()
379