music-assistant-server

21.3 KBPY
player.py
21.3 KB514 lines • python
1"""Home Assistant Player implementation."""
2
3from __future__ import annotations
4
5import asyncio
6import time
7from typing import TYPE_CHECKING, Any, cast
8
9from hass_client.exceptions import FailedCommand
10from music_assistant_models.enums import (
11    IdentifierType,
12    ImageType,
13    MediaType,
14    PlaybackState,
15    PlayerFeature,
16)
17from music_assistant_models.media_items import MediaItemImage
18
19from music_assistant.constants import (
20    CONF_ENTRY_ENABLE_ICY_METADATA_HIDDEN,
21    CONF_ENTRY_HTTP_PROFILE_FORCED_2,
22    CONF_ENTRY_OUTPUT_CODEC_DEFAULT_MP3,
23    HIDDEN_ANNOUNCE_VOLUME_CONFIG_ENTRIES,
24    create_output_codec_config_entry,
25    create_sample_rates_config_entry,
26)
27from music_assistant.helpers.datetime import from_iso_string
28from music_assistant.helpers.tags import async_parse_tags
29from music_assistant.models.player import DeviceInfo, Player, PlayerMedia, PlayerSource
30from music_assistant.models.player_provider import PlayerProvider
31from music_assistant.providers.hass.constants import (
32    OFF_STATES,
33    UNAVAILABLE_STATES,
34    MediaPlayerEntityFeature,
35    StateMap,
36)
37
38from .constants import CONF_ENTRY_WARN_HASS_INTEGRATION, WARN_HASS_INTEGRATIONS
39from .helpers import ESPHomeSupportedAudioFormat
40
41if TYPE_CHECKING:
42    from hass_client import HomeAssistantClient
43    from hass_client.models import CompressedState
44    from hass_client.models import Entity as HassEntity
45    from hass_client.models import State as HassState
46    from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
47
48    from .provider import HomeAssistantPlayerProvider
49
50
51DEFAULT_PLAYER_CONFIG_ENTRIES = (CONF_ENTRY_OUTPUT_CODEC_DEFAULT_MP3,)
52
53
54class HomeAssistantPlayer(Player):
55    """Home Assistant Player implementation."""
56
57    def __init__(
58        self,
59        provider: PlayerProvider,
60        hass: HomeAssistantClient,
61        player_id: str,
62        hass_state: HassState,
63        dev_info: dict[str, Any],
64        extra_player_data: dict[str, Any],
65        entity_registry: dict[str, HassEntity],
66    ) -> None:
67        """Initialize the Home Assistant Player."""
68        super().__init__(provider, player_id)
69        self.hass = hass
70        self.hass_state = hass_state
71        self._extra_data = extra_player_data
72        # Set base attributes from Home Assistant state
73        self._attr_available = hass_state["state"] not in UNAVAILABLE_STATES
74        self._attr_device_info = DeviceInfo(
75            model=dev_info.get("model", ""),
76            manufacturer=dev_info.get("manufacturer", ""),
77            software_version=dev_info.get("software_version"),
78        )
79        if mac_address := dev_info.get("mac_address"):
80            self._attr_device_info.add_identifier(IdentifierType.MAC_ADDRESS, mac_address)
81        self._attr_playback_state = StateMap.get(hass_state["state"], PlaybackState.IDLE)
82        # Work out supported features
83        self._attr_supported_features = {PlayerFeature.PLAY_MEDIA}
84        hass_supported_features = MediaPlayerEntityFeature(
85            hass_state["attributes"]["supported_features"]
86        )
87        if MediaPlayerEntityFeature.VOLUME_SET in hass_supported_features:
88            self._attr_supported_features.add(PlayerFeature.VOLUME_SET)
89        if MediaPlayerEntityFeature.VOLUME_MUTE in hass_supported_features:
90            self._attr_supported_features.add(PlayerFeature.VOLUME_MUTE)
91        if MediaPlayerEntityFeature.MEDIA_ANNOUNCE in hass_supported_features:
92            self._attr_supported_features.add(PlayerFeature.PLAY_ANNOUNCEMENT)
93        hass_domain = extra_player_data.get("hass_domain")
94        if hass_domain and MediaPlayerEntityFeature.GROUPING in hass_supported_features:
95            self._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
96            self._attr_can_group_with = {
97                x["entity_id"]
98                for x in entity_registry.values()
99                if x["entity_id"].startswith("media_player") and x["platform"] == hass_domain
100            }
101        if (
102            MediaPlayerEntityFeature.TURN_ON in hass_supported_features
103            and MediaPlayerEntityFeature.TURN_OFF in hass_supported_features
104        ):
105            self._attr_supported_features.add(PlayerFeature.POWER)
106            self._attr_powered = hass_state["state"] not in OFF_STATES
107
108        self.extra_data["hass_supported_features"] = hass_supported_features
109        self._hass_attributes: dict[str, Any] = {}
110
111        # Add External source to support next/prev commands when playing external content
112        self._attr_source_list.append(
113            PlayerSource(
114                id="External",
115                name="External Source",
116                passive=True,
117            )
118        )
119        # Set dynamic features (PAUSE, NEXT_PREVIOUS, SEEK) via shared helper
120        self._update_hass_features(hass_supported_features)
121
122        self._update_attributes(hass_state["attributes"])
123
124    @property
125    def requires_flow_mode(self) -> bool:
126        """Return if the player requires flow mode."""
127        # hass media players are a hot mess so play it safe and always use flow mode
128        return True
129
130    async def get_config_entries(
131        self,
132        action: str | None = None,
133        values: dict[str, ConfigValueType] | None = None,
134    ) -> list[ConfigEntry]:
135        """Return all (provider/player specific) Config Entries for the player."""
136        base_entries = [*DEFAULT_PLAYER_CONFIG_ENTRIES]
137        if self.extra_data.get("esphome_supported_audio_formats"):
138            # optimized config for new ESPHome mediaplayer
139            supported_sample_rates: list[int] = []
140            supported_bit_depths: list[int] = []
141            codec: str | None = None
142            supported_formats: list[ESPHomeSupportedAudioFormat] = self.extra_data[
143                "esphome_supported_audio_formats"
144            ]
145            # sort on purpose field, so we prefer the media pipeline
146            # but allows fallback to announcements pipeline if no media pipeline is available
147            supported_formats.sort(key=lambda x: x["purpose"])
148            for supported_format in supported_formats:
149                codec = supported_format["format"]
150                if supported_format["sample_rate"] not in supported_sample_rates:
151                    supported_sample_rates.append(supported_format["sample_rate"])
152                bit_depth = (supported_format["sample_bytes"] or 2) * 8
153                if bit_depth not in supported_bit_depths:
154                    supported_bit_depths.append(bit_depth)
155            if not supported_sample_rates or not supported_bit_depths:
156                # esphome device with no media pipeline configured
157                # simply use the default config of the media pipeline
158                supported_sample_rates = [48000]
159                supported_bit_depths = [16]
160
161            config_entries = [
162                *base_entries,
163                # New ESPHome mediaplayer (used in Voice PE) uses FLAC 48khz/16 bits
164                CONF_ENTRY_HTTP_PROFILE_FORCED_2,
165            ]
166
167            if codec is not None:
168                config_entries.append(create_output_codec_config_entry(True, codec))
169
170            config_entries.extend(
171                [
172                    CONF_ENTRY_ENABLE_ICY_METADATA_HIDDEN,
173                    create_sample_rates_config_entry(
174                        supported_sample_rates=supported_sample_rates,
175                        supported_bit_depths=supported_bit_depths,
176                        hidden=True,
177                    ),
178                    # although the Voice PE supports announcements,
179                    # it does not support volume for announcements
180                    *HIDDEN_ANNOUNCE_VOLUME_CONFIG_ENTRIES,
181                ]
182            )
183
184            return config_entries
185
186        # add alert if player is a known player type that has a native provider in MA
187        if self.extra_data.get("hass_domain") in WARN_HASS_INTEGRATIONS:
188            base_entries = [CONF_ENTRY_WARN_HASS_INTEGRATION, *base_entries]
189
190        return base_entries
191
192    async def play(self) -> None:
193        """Handle PLAY command on the player."""
194        await self.hass.call_service(
195            domain="media_player",
196            service="media_play",
197            target={"entity_id": self.player_id},
198        )
199
200    async def pause(self) -> None:
201        """Handle PAUSE command on the player."""
202        await self.hass.call_service(
203            domain="media_player",
204            service="media_pause",
205            target={"entity_id": self.player_id},
206        )
207
208    async def stop(self) -> None:
209        """Send STOP command to player."""
210        try:
211            await self.hass.call_service(
212                domain="media_player",
213                service="media_stop",
214                target={"entity_id": self.player_id},
215            )
216        except FailedCommand as exc:
217            # some HA players do not support STOP
218            if "does not support" not in str(exc):
219                raise
220            if PlayerFeature.PAUSE in self.supported_features:
221                await self.pause()
222        finally:
223            self._attr_current_media = None
224            self.update_state()
225
226    async def volume_set(self, volume_level: int) -> None:
227        """Handle VOLUME_SET command on the player."""
228        await self.hass.call_service(
229            domain="media_player",
230            service="volume_set",
231            target={"entity_id": self.player_id},
232            service_data={"volume_level": volume_level / 100},
233        )
234
235    async def volume_mute(self, muted: bool) -> None:
236        """Handle VOLUME MUTE command on the player."""
237        await self.hass.call_service(
238            domain="media_player",
239            service="volume_mute",
240            target={"entity_id": self.player_id},
241            service_data={"is_volume_muted": muted},
242        )
243
244    async def power(self, powered: bool) -> None:
245        """Handle POWER command on the player."""
246        await self.hass.call_service(
247            domain="media_player",
248            service="turn_on" if powered else "turn_off",
249            target={"entity_id": self.player_id},
250        )
251
252    async def next_track(self) -> None:
253        """Handle NEXT_TRACK command on the player."""
254        await self.hass.call_service(
255            domain="media_player",
256            service="media_next_track",
257            target={"entity_id": self.player_id},
258        )
259
260    async def previous_track(self) -> None:
261        """Handle PREVIOUS_TRACK command on the player."""
262        await self.hass.call_service(
263            domain="media_player",
264            service="media_previous_track",
265            target={"entity_id": self.player_id},
266        )
267
268    async def play_media(self, media: PlayerMedia) -> None:
269        """Handle PLAY MEDIA on given player."""
270        url = await self.provider.mass.streams.resolve_stream_url(self.player_id, media)
271        extra_data: dict[str, Any] = {
272            # passing metadata to the player
273            # so far only supported by google cast, but maybe others can follow
274            "metadata": {
275                "title": media.title,
276                "artist": media.artist,
277                "metadataType": 3,
278                "album": media.album,
279                "albumName": media.album,
280                "images": [{"url": media.image_url}] if media.image_url else None,
281                "imageUrl": media.image_url,
282                "duration": media.duration,
283            },
284        }
285        if self.extra_data.get("hass_domain") == "esphome":
286            # tell esphome mediaproxy to bypass the proxy,
287            # as MA already delivers an optimized stream
288            extra_data["bypass_proxy"] = True
289
290        # stop the player if it is already playing
291        if self._attr_playback_state == PlaybackState.PLAYING:
292            await self.stop()
293
294        await self.hass.call_service(
295            domain="media_player",
296            service="play_media",
297            target={"entity_id": self.player_id},
298            service_data={
299                "media_content_id": url,
300                "media_content_type": "music",
301                "enqueue": "replace",
302                "extra": extra_data,
303            },
304        )
305
306        # Optimistically update state
307        self._attr_current_media = media
308        self._attr_elapsed_time = 0
309        self._attr_elapsed_time_last_updated = time.time()
310        self._attr_playback_state = PlaybackState.PLAYING
311        self.update_state()
312
313    async def play_announcement(
314        self, announcement: PlayerMedia, volume_level: int | None = None
315    ) -> None:
316        """Handle (provider native) playback of an announcement on given player."""
317        self.logger.info(
318            "Playing announcement %s on %s",
319            announcement.uri,
320            self.display_name,
321        )
322        if volume_level is not None:
323            self.logger.warning(
324                "Announcement volume level is not supported for player %s",
325                self.display_name,
326            )
327        await self.hass.call_service(
328            domain="media_player",
329            service="play_media",
330            service_data={
331                "media_content_id": announcement.uri,
332                "media_content_type": "music",
333                "announce": True,
334            },
335            target={"entity_id": self.player_id},
336        )
337        # Wait until the announcement is finished playing
338        # This is helpful for people who want to play announcements in a sequence
339        media_info = await async_parse_tags(announcement.uri, require_duration=True)
340        duration = media_info.duration or 5
341        await asyncio.sleep(duration)
342        self.logger.debug(
343            "Playing announcement on %s completed",
344            self.display_name,
345        )
346
347    async def set_members(
348        self,
349        player_ids_to_add: list[str] | None = None,
350        player_ids_to_remove: list[str] | None = None,
351    ) -> None:
352        """
353        Handle SET_MEMBERS command on the player.
354
355        Group or ungroup the given child player(s) to/from this player.
356        Will only be called if the PlayerFeature.SET_MEMBERS is supported.
357
358        :param player_ids_to_add: List of player_id's to add to the group.
359        :param player_ids_to_remove: List of player_id's to remove from the group.
360        """
361        for player_id_to_remove in player_ids_to_remove or []:
362            await self.hass.call_service(
363                domain="media_player",
364                service="unjoin",
365                target={"entity_id": player_id_to_remove},
366            )
367        if player_ids_to_add:
368            await self.hass.call_service(
369                domain="media_player",
370                service="join",
371                service_data={"group_members": player_ids_to_add},
372                target={"entity_id": self.player_id},
373            )
374
375    def update_from_compressed_state(self, state: CompressedState) -> None:
376        """Handle updating the player with updated info in a HA CompressedState."""
377        if "s" in state:
378            self._attr_playback_state = StateMap.get(state["s"], PlaybackState.IDLE)
379            self._attr_available = state["s"] not in UNAVAILABLE_STATES
380            if PlayerFeature.POWER in self.supported_features:
381                self._attr_powered = state["s"] not in OFF_STATES
382        if "a" in state:
383            self._update_attributes(state["a"])
384        self.update_state()
385
386    def _update_hass_features(self, hass_supported_features: MediaPlayerEntityFeature) -> None:
387        """Update player and External source features based on HA supported features."""
388        # Update player supported features for PAUSE and NEXT_PREVIOUS
389        if MediaPlayerEntityFeature.PAUSE in hass_supported_features:
390            self._attr_supported_features.add(PlayerFeature.PAUSE)
391        else:
392            self._attr_supported_features.discard(PlayerFeature.PAUSE)
393
394        has_next_prev = (
395            MediaPlayerEntityFeature.NEXT_TRACK in hass_supported_features
396            or MediaPlayerEntityFeature.PREVIOUS_TRACK in hass_supported_features
397        )
398        if has_next_prev:
399            self._attr_supported_features.add(PlayerFeature.NEXT_PREVIOUS)
400        else:
401            self._attr_supported_features.discard(PlayerFeature.NEXT_PREVIOUS)
402
403        # Update the External source capabilities
404        for source in self._attr_source_list:
405            if source.id == "External":
406                source.can_play_pause = MediaPlayerEntityFeature.PAUSE in hass_supported_features
407                source.can_next_previous = has_next_prev
408                source.can_seek = MediaPlayerEntityFeature.SEEK in hass_supported_features
409                break
410
411    def _update_attributes(self, attributes: dict[str, Any]) -> None:
412        """Update Player attributes from HA state attributes."""
413        self._hass_attributes.update(attributes)
414
415        # process optional attributes - these may not be present in all states
416        for key, value in attributes.items():
417            if key == "friendly_name":
418                self._attr_name = value
419            elif key == "media_position":
420                self._attr_elapsed_time = value
421            elif key == "media_position_updated_at":
422                self._attr_elapsed_time_last_updated = from_iso_string(value).timestamp()
423            elif key == "volume_level":
424                self._attr_volume_level = int(value * 100)
425            elif key == "is_volume_muted":
426                self._attr_volume_muted = value
427            elif key == "group_members":
428                group_members: list[str] = (
429                    [
430                        # ignore integrations that incorrectly set the group members attribute
431                        # (e.g. linkplay)
432                        x
433                        for x in value
434                        if x.startswith("media_player.")
435                    ]
436                    if value
437                    else []
438                )
439                if group_members and group_members[0] == self.player_id:
440                    # first in the list is the group leader
441                    self._attr_group_members = group_members
442                elif group_members and group_members[0] != self.player_id:
443                    # this player is not the group leader
444                    self._attr_group_members.clear()
445                else:
446                    self._attr_group_members.clear()
447            elif key == "supported_features":
448                # Update supported features dynamically via shared helper
449                hass_supported_features = MediaPlayerEntityFeature(value)
450                self.extra_data["hass_supported_features"] = hass_supported_features
451                self._update_hass_features(hass_supported_features)
452
453        # Check for external playback (not from Music Assistant).
454        # Without media_content_id we cannot reliably determine the source,
455        # so we later only react to state updates that include it.
456        media_content_id = self._hass_attributes.get("media_content_id", "")
457        is_ma_playback = media_content_id.startswith(self.mass.streams.base_url)
458        media_title = self._hass_attributes.get("media_title")
459
460        if media_content_id and is_ma_playback:
461            # MA playback - ensure active_source points to player_id for queue lookup.
462            # The actual current_media will be set by MA's queue controller.
463            self._attr_active_source = None
464        elif (
465            media_content_id
466            and media_title
467            and self.playback_state in (PlaybackState.PLAYING, PlaybackState.PAUSED)
468        ):
469            # External playback detected - set current_media from HA attributes
470            ha_content_type = self._hass_attributes.get("media_content_type", "")
471            media_type = MediaType.RADIO if ha_content_type == "radio" else MediaType.UNKNOWN
472            current_media = PlayerMedia(
473                uri=media_content_id,
474                media_type=media_type,
475                title=media_title,
476                artist=self._hass_attributes.get("media_artist"),
477                album=self._hass_attributes.get("media_album_name"),
478                image_url=self._get_image_url(self._hass_attributes),
479                duration=int(self._hass_attributes.get("media_duration", 0) or 0) or None,
480            )
481            self._attr_current_media = current_media
482            self._attr_active_source = "External"
483
484        elif self.playback_state == PlaybackState.IDLE:
485            # Clear external media if it was set
486            if self._attr_active_source and self._attr_active_source not in (
487                self.player_id,
488                None,
489            ):
490                self._attr_current_media = None
491                self._attr_active_source = None
492
493    def _get_image_url(self, attributes: dict[str, Any]) -> str | None:
494        """Get the image URL from the attributes."""
495        if entity_picture := attributes.get("entity_picture"):
496            entity_picture = str(entity_picture)
497            if entity_picture.startswith("http"):
498                return entity_picture
499
500            # Access via provider -> hass_prov
501            prov = cast("HomeAssistantPlayerProvider", self.provider)
502
503            # Use proxy for internal HA images
504            # We create a MediaItemImage with the hass provider as source
505            # This will trigger resolve_image on the hass provider when requested
506            image = MediaItemImage(
507                type=ImageType.THUMB,
508                path=entity_picture,
509                provider=prov.hass_prov.instance_id,
510                remotely_accessible=False,
511            )
512            return self.mass.metadata.get_image_url(image)
513        return None
514