music-assistant-server

25.3 KBPY
__init__.py
25.3 KB606 lines • python
1"""
2Home Assistant Plugin for Music Assistant.
3
4The plugin is the core of all communication to/from Home Assistant and
5responsible for maintaining the WebSocket API connection to HA.
6Also, the Music Assistant integration within HA will relay its own api
7communication over the HA api for more flexibility as well as security.
8"""
9
10from __future__ import annotations
11
12import asyncio
13import logging
14import os
15from functools import partial
16from typing import TYPE_CHECKING, cast
17
18import shortuuid
19from hass_client import HomeAssistantClient
20from hass_client.exceptions import BaseHassClientError
21from hass_client.utils import (
22    base_url,
23    get_auth_url,
24    get_long_lived_token,
25    get_token,
26    get_websocket_url,
27)
28from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
29from music_assistant_models.enums import ConfigEntryType, ProviderFeature
30from music_assistant_models.errors import LoginFailed, SetupFailedError
31from music_assistant_models.player_control import PlayerControl
32
33from music_assistant.constants import MASS_LOGO_ONLINE, VERBOSE_LOG_LEVEL
34from music_assistant.helpers.auth import AuthenticationHelper
35from music_assistant.helpers.util import try_parse_int
36from music_assistant.models.plugin import PluginProvider
37
38from .constants import OFF_STATES, MediaPlayerEntityFeature
39
40if TYPE_CHECKING:
41    from hass_client.models import CompressedState, Device, EntityStateEvent
42    from music_assistant_models.config_entries import ProviderConfig
43    from music_assistant_models.provider import ProviderManifest
44
45    from music_assistant.mass import MusicAssistant
46    from music_assistant.models import ProviderInstanceType
47
DOMAIN = "hass"

# keys of the connection related config entries
CONF_URL = "url"
CONF_AUTH_TOKEN = "token"
CONF_ACTION_AUTH = "auth"
CONF_VERIFY_SSL = "verify_ssl"

# keys of the player control related config entries
CONF_POWER_CONTROLS = "power_controls"
CONF_MUTE_CONTROLS = "mute_controls"
CONF_VOLUME_CONTROLS = "volume_controls"

# we don't have any special supported features (yet)
SUPPORTED_FEATURES: set[ProviderFeature] = set()
60
61
async def setup(
    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
    """Create the Home Assistant plugin provider (instance) for the given configuration."""
    provider = HomeAssistantProvider(mass, manifest, config, SUPPORTED_FEATURES)
    return provider
67
68
async def _create_long_lived_token(mass: MusicAssistant, values: dict[str, ConfigValueType]) -> str:
    """Run the interactive auth flow against HA and return a freshly created long lived token."""
    hass_url = values[CONF_URL]
    session_id = values["session_id"]
    async with AuthenticationHelper(mass, str(session_id)) as auth_helper:
        client_id = base_url(auth_helper.callback_url)
        auth_url = get_auth_url(
            hass_url,
            auth_helper.callback_url,
            client_id=client_id,
            state=session_id,
        )
        result = await auth_helper.authenticate(auth_url)
    if result["state"] != session_id:
        msg = "session id mismatch"
        raise LoginFailed(msg)
    # get access token after auth was a success
    token_details = await get_token(hass_url, result["code"], client_id=client_id)
    # register for a long lived token
    return await get_long_lived_token(
        hass_url,
        token_details["access_token"],
        client_name=f"Music Assistant {shortuuid.random(6)}",
        client_icon=MASS_LOGO_ONLINE,
        lifespan=365 * 2,
    )


def _supervisor_base_entries() -> tuple[ConfigEntry, ...]:
    """Return the (hidden) connection entries used when running as Home Assistant add-on."""
    # on supervisor, we use the internal url
    # token set to None for auto retrieval
    return (
        ConfigEntry(
            key=CONF_URL,
            type=ConfigEntryType.STRING,
            label=CONF_URL,
            required=True,
            default_value="http://supervisor/core/api",
            value="http://supervisor/core/api",
            hidden=True,
        ),
        ConfigEntry(
            key=CONF_AUTH_TOKEN,
            type=ConfigEntryType.STRING,
            label=CONF_AUTH_TOKEN,
            required=False,
            default_value=None,
            value=None,
            hidden=True,
        ),
        ConfigEntry(
            key=CONF_VERIFY_SSL,
            type=ConfigEntryType.BOOLEAN,
            label=CONF_VERIFY_SSL,
            required=False,
            default_value=False,
            hidden=True,
        ),
    )


def _manual_base_entries(values: dict[str, ConfigValueType] | None) -> tuple[ConfigEntry, ...]:
    """Return the connection entries for a manually configured Home Assistant instance."""
    return (
        ConfigEntry(
            key=CONF_URL,
            type=ConfigEntryType.STRING,
            label="URL",
            required=True,
            description="URL to your Home Assistant instance (e.g. http://192.168.1.1:8123)",
            value=cast("str", values.get(CONF_URL)) if values else None,
        ),
        ConfigEntry(
            key=CONF_ACTION_AUTH,
            type=ConfigEntryType.ACTION,
            label="(re)Authenticate Home Assistant",
            description="Authenticate to your home assistant "
            "instance and generate the long lived token.",
            action=CONF_ACTION_AUTH,
            depends_on=CONF_URL,
            required=False,
        ),
        ConfigEntry(
            key=CONF_AUTH_TOKEN,
            type=ConfigEntryType.SECURE_STRING,
            label="Authentication token for HomeAssistant",
            description="You can either paste a Long Lived Token here manually or use the "
            "'authenticate' button to generate a token for you with logging in.",
            depends_on=CONF_URL,
            value=cast("str", values.get(CONF_AUTH_TOKEN)) if values else None,
            advanced=True,
        ),
        ConfigEntry(
            key=CONF_VERIFY_SSL,
            type=ConfigEntryType.BOOLEAN,
            label="Verify SSL",
            required=False,
            description="Whether or not to verify the certificate of SSL/TLS connections.",
            advanced=True,
            default_value=True,
        ),
    )


def _placeholder_control_entries() -> tuple[ConfigEntry, ...]:
    """Return option-less player control entries (used when HA can not be queried)."""
    return tuple(
        ConfigEntry(
            key=key,
            type=ConfigEntryType.STRING,
            multi_value=True,
            label=key,
            default_value=[],
        )
        for key in (CONF_POWER_CONTROLS, CONF_VOLUME_CONTROLS, CONF_MUTE_CONTROLS)
    )


async def get_config_entries(
    mass: MusicAssistant,
    instance_id: str | None = None,
    action: str | None = None,
    values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
    """
    Return Config entries to setup this provider.

    The result always contains the connection entries followed by the three
    player control entries (power, volume, mute). The control entries carry
    selectable options when the provider instance is available and its client
    returns them; otherwise option-less placeholders are returned so the
    control keys are never missing from the result.

    instance_id: id of an existing provider instance (None if new instance setup).
    action: [optional] action key called from config entries UI.
    values: the (intermediate) raw values for config entries sent with the action.
    """
    # config flow auth action/step (authenticate button clicked)
    if action == CONF_ACTION_AUTH and values:
        # set the retrieved token on the values object to pass along
        values[CONF_AUTH_TOKEN] = await _create_long_lived_token(mass, values)

    base_entries: tuple[ConfigEntry, ...]
    if mass.running_as_hass_addon:
        base_entries = _supervisor_base_entries()
    else:
        base_entries = _manual_base_entries(values)

    # append player controls entries (if we have an active instance)
    if instance_id and (hass_prov := mass.get_provider(instance_id)) and hass_prov.available:
        hass_prov = cast("HomeAssistantProvider", hass_prov)
        control_entries = await _get_player_control_config_entries(hass_prov.hass)
        # an empty result means the client was not connected: fall through to
        # the placeholders instead of returning no control entries at all
        if control_entries:
            return (*base_entries, *control_entries)

    return (*base_entries, *_placeholder_control_entries())
213
214
async def _get_player_control_config_entries(hass: HomeAssistantClient) -> tuple[ConfigEntry, ...]:
    """
    Return the player control config entries, with options built from the HA states.

    All current Home Assistant states are inspected to collect the entities that
    can act as power, volume and/or mute control:
    - switch / input_boolean: power and mute control
    - number / input_number: volume control
    - media_player (without a 'mass_player_type' attribute): depending on its
      supported features

    :param hass: the Home Assistant client to retrieve the states from.
    :return: Tuple with the power, volume and mute control entries (in that order),
        or an empty tuple when the client is not connected.
    """
    if not hass.connected:
        return ()
    power_options: list[ConfigValueOption] = []
    mute_options: list[ConfigValueOption] = []
    volume_options: list[ConfigValueOption] = []
    # collect all entities that are usable for player controls
    for state in await hass.get_states():
        entity_id = state["entity_id"]
        attributes = state["attributes"]
        entity_platform = entity_id.split(".")[0]
        if "friendly_name" not in attributes:
            name = entity_id
        else:
            name = f"{attributes['friendly_name']} ({entity_id})"

        if entity_platform in ("switch", "input_boolean"):
            # simple on/off controls are suitable as power and mute controls
            power_options.append(ConfigValueOption(name, entity_id))
            mute_options.append(ConfigValueOption(name, entity_id))
            continue
        if entity_platform in ("number", "input_number"):
            # number and input_number are very similar, both are suitable for volume control
            volume_options.append(ConfigValueOption(name, entity_id))
            continue

        # media player can be used as control, depending on features
        if entity_platform != "media_player":
            continue
        if "mass_player_type" in attributes:
            # filter out mass players
            continue
        # a state without (usable) supported_features is treated as "no features"
        # so that a single incomplete entity does not abort the whole listing
        supported_features = MediaPlayerEntityFeature(attributes.get("supported_features") or 0)
        if MediaPlayerEntityFeature.VOLUME_MUTE in supported_features:
            mute_options.append(ConfigValueOption(name, entity_id))
        if MediaPlayerEntityFeature.VOLUME_SET in supported_features:
            volume_options.append(ConfigValueOption(name, entity_id))
        if (
            MediaPlayerEntityFeature.TURN_ON in supported_features
            and MediaPlayerEntityFeature.TURN_OFF in supported_features
        ):
            power_options.append(ConfigValueOption(name, entity_id))

    def _control_entry(key: str, kind: str, options: list[ConfigValueOption]) -> ConfigEntry:
        """Build the multi-value entry for one kind of control, options sorted by title."""
        return ConfigEntry(
            key=key,
            type=ConfigEntryType.STRING,
            multi_value=True,
            label=f"Player {kind} Control entities",
            required=True,
            options=sorted(options, key=lambda option: option.title),
            default_value=[],
            description="Specify which Home Assistant entities you "
            f"like to import as player {kind} controls in Music Assistant.",
            category="player_controls",
        )

    return (
        _control_entry(CONF_POWER_CONTROLS, "Power", power_options),
        _control_entry(CONF_VOLUME_CONTROLS, "Volume", volume_options),
        _control_entry(CONF_MUTE_CONTROLS, "Mute", mute_options),
    )
297
298
299class HomeAssistantProvider(PluginProvider):
300    """Home Assistant Plugin for Music Assistant."""
301
302    hass: HomeAssistantClient
303    _listen_task: asyncio.Task[None] | None = None
304    _player_controls: dict[str, PlayerControl] | None = None
305
306    async def handle_async_init(self) -> None:
307        """Handle async initialization of the plugin."""
308        self._player_controls = {}
309        url = get_websocket_url(self.config.get_value(CONF_URL))
310        token = self.config.get_value(CONF_AUTH_TOKEN)
311        logging.getLogger("hass_client").setLevel(self.logger.level + 10)
312        ssl = bool(self.config.get_value(CONF_VERIFY_SSL))
313        http_session = self.mass.http_session if ssl else self.mass.http_session_no_ssl
314        self.hass = HomeAssistantClient(url, token, http_session)
315        try:
316            await self.hass.connect()
317        except BaseHassClientError as err:
318            err_msg = str(err) or err.__class__.__name__
319            raise SetupFailedError(err_msg) from err
320        self._listen_task = self.mass.create_task(self._hass_listener())
321
322    async def loaded_in_mass(self) -> None:
323        """Call after the provider has been loaded."""
324        await self._register_player_controls()
325
326    async def unload(self, is_removed: bool = False) -> None:
327        """
328        Handle unload/close of the provider.
329
330        Called when provider is deregistered (e.g. MA exiting or config reloading).
331        """
332        # unregister all player controls
333        if self._player_controls:
334            for entity_id in self._player_controls:
335                self.mass.players.remove_player_control(entity_id)
336        if self._listen_task and not self._listen_task.done():
337            self._listen_task.cancel()
338        await self.hass.disconnect()
339
340    async def _hass_listener(self) -> None:
341        """Start listening on the HA websockets."""
342        try:
343            # start listening will block until the connection is lost/closed
344            await self.hass.start_listening()
345        except BaseHassClientError as err:
346            self.logger.warning("Connection to HA lost due to error: %s", err)
347        self.logger.info("Connection to HA lost. Connection will be automatically retried later.")
348        # schedule a reload of the provider
349        self.available = False
350        self.mass.call_later(5, self.mass.load_provider, self.instance_id, allow_retry=True)
351
352    def _on_entity_state_update(self, event: EntityStateEvent) -> None:
353        """Handle Entity State event."""
354        if entity_additions := event.get("a"):
355            for entity_id, state in entity_additions.items():
356                self._update_control_from_state_msg(entity_id, state)
357        if entity_changes := event.get("c"):
358            for entity_id, state_diff in entity_changes.items():
359                if "+" not in state_diff:
360                    continue
361                self._update_control_from_state_msg(entity_id, state_diff["+"])
362
363    async def _register_player_controls(self) -> None:
364        """Register all player controls."""
365        power_controls = cast("list[str]", self.config.get_value(CONF_POWER_CONTROLS))
366        mute_controls = cast("list[str]", self.config.get_value(CONF_MUTE_CONTROLS))
367        volume_controls = cast("list[str]", self.config.get_value(CONF_VOLUME_CONTROLS))
368        control_entity_ids: set[str] = {
369            *power_controls,
370            *mute_controls,
371            *volume_controls,
372        }
373        hass_states = {
374            state["entity_id"]: state
375            for state in await self.hass.get_states()
376            if state["entity_id"] in control_entity_ids
377        }
378        assert self._player_controls is not None  # for type checking
379        for entity_id in control_entity_ids:
380            entity_platform = entity_id.split(".")[0]
381            hass_state = hass_states.get(entity_id)
382            if hass_state and (friendly_name := hass_state["attributes"].get("friendly_name")):
383                name = f"{friendly_name} ({entity_id})"
384            else:
385                name = entity_id
386            control = PlayerControl(
387                id=entity_id,
388                provider=self.instance_id,
389                name=name,
390            )
391            if entity_id in power_controls:
392                control.supports_power = True
393                control.power_state = hass_state["state"] not in OFF_STATES if hass_state else False
394                control.power_on = partial(self._handle_player_control_power_on, entity_id)
395                control.power_off = partial(self._handle_player_control_power_off, entity_id)
396            if entity_id in volume_controls:
397                control.supports_volume = True
398                if not hass_state:
399                    control.volume_level = 0
400                elif entity_platform == "media_player":
401                    control.volume_level = int(
402                        hass_state["attributes"].get("volume_level", 0) * 100
403                    )
404                else:
405                    control.volume_level = try_parse_int(hass_state["state"]) or 0
406                control.volume_set = partial(self._handle_player_control_volume_set, entity_id)
407            if entity_id in mute_controls:
408                control.supports_mute = True
409                if not hass_state:
410                    control.volume_muted = False
411                elif entity_platform == "media_player":
412                    control.volume_muted = hass_state["attributes"].get("volume_muted")
413                elif hass_state:
414                    control.volume_muted = hass_state["state"] not in OFF_STATES
415                else:
416                    control.volume_muted = False
417                control.mute_set = partial(self._handle_player_control_mute_set, entity_id)
418            self._player_controls[entity_id] = control
419            await self.mass.players.register_player_control(control)
420        # register for entity state updates
421        await self.hass.subscribe_entities(self._on_entity_state_update, list(control_entity_ids))
422
423    async def _handle_player_control_power_on(self, entity_id: str) -> None:
424        """Handle powering on the playercontrol."""
425        await self.hass.call_service(
426            domain="homeassistant",
427            service="turn_on",
428            target={"entity_id": entity_id},
429        )
430
431    async def _handle_player_control_power_off(self, entity_id: str) -> None:
432        """Handle powering off the playercontrol."""
433        await self.hass.call_service(
434            domain="homeassistant",
435            service="turn_off",
436            target={"entity_id": entity_id},
437        )
438
439    async def _handle_player_control_mute_set(self, entity_id: str, muted: bool) -> None:
440        """Handle muting the playercontrol."""
441        if entity_id.startswith("media_player."):
442            await self.hass.call_service(
443                domain="media_player",
444                service="volume_mute",
445                service_data={"is_volume_muted": muted},
446                target={"entity_id": entity_id},
447            )
448        else:
449            await self.hass.call_service(
450                domain="homeassistant",
451                service="turn_off" if muted else "turn_on",
452                target={"entity_id": entity_id},
453            )
454
455    async def _handle_player_control_volume_set(self, entity_id: str, volume_level: int) -> None:
456        """Handle setting volume on the playercontrol."""
457        domain = entity_id.split(".", 1)[0]
458
459        if domain == "media_player":
460            await self.hass.call_service(
461                domain=domain,
462                service="volume_set",
463                service_data={"volume_level": volume_level / 100},
464                target={"entity_id": entity_id},
465            )
466            return
467
468        # At this point, `set_value` will work for both `number` or `input_number`
469        await self.hass.call_service(
470            domain=domain,
471            service="set_value",
472            target={"entity_id": entity_id},
473            service_data={"value": volume_level},
474        )
475
476    async def get_device_by_connection(
477        self,
478        connection_value: str,
479        connection_type: str = "mac",
480    ) -> Device | None:
481        """
482        Get device details from Home Assistant by connection type and value.
483
484        :param connection_value: The connection value (e.g. MAC address).
485        :param connection_type: The connection type (default: 'mac').
486        """
487        devices = await self.hass.get_device_registry()
488        for device in devices:
489            for connection in device.get("connections", []):
490                if (
491                    len(connection) == 2
492                    and connection[0] == connection_type
493                    and connection[1].lower() == connection_value.lower()
494                ):
495                    return device
496        return None
497
498    def _update_control_from_state_msg(self, entity_id: str, state: CompressedState) -> None:
499        """Update PlayerControl from state(update) message."""
500        if self._player_controls is None:
501            return
502        if not (player_control := self._player_controls.get(entity_id)):
503            return
504        entity_platform = entity_id.split(".")[0]
505        if "s" in state:
506            # state changed
507            if player_control.supports_power:
508                player_control.power_state = state["s"] not in OFF_STATES
509            if player_control.supports_mute and entity_platform != "media_player":
510                player_control.volume_muted = state["s"] not in OFF_STATES
511            if player_control.supports_volume and entity_platform != "media_player":
512                player_control.volume_level = try_parse_int(state["s"]) or 0
513        if "a" in state and (attributes := state["a"]):
514            if player_control.supports_volume:
515                if entity_platform == "media_player":
516                    player_control.volume_level = int(attributes.get("volume_level", 0) * 100)
517                else:
518                    player_control.volume_level = try_parse_int(attributes.get("value")) or 0
519            if player_control.supports_mute and entity_platform == "media_player":
520                player_control.volume_muted = attributes.get("volume_muted")
521        self.mass.players.update_player_control(entity_id)
522
523    async def get_user_details(self, ha_user_id: str) -> tuple[str | None, str | None, str | None]:
524        """
525        Get user username, display name and avatar URL from Home Assistant.
526
527        Looks up the user in config/auth/list for username, and the person entity
528        for display name and picture URL.
529
530        :param ha_user_id: Home Assistant user ID.
531        :return: Tuple of (username, display_name, avatar_url) or all None if not found.
532        """
533        try:
534            username: str | None = None
535            display_name: str | None = None
536            avatar_url: str | None = None
537
538            # Get username from config/auth/list (admin endpoint, we have admin access)
539            try:
540                users = await self.hass.send_command("config/auth/list")
541                for user in users or []:
542                    if user.get("id") == ha_user_id:
543                        username = user.get("username")
544                        # Also get name as fallback display name
545                        if not display_name:
546                            display_name = user.get("name")
547                        break
548            except Exception as err:
549                self.logger.log(VERBOSE_LOG_LEVEL, "Failed to get HA user list: %s", err)
550
551            # Get external URL for building avatar URL
552            ha_url: str | None = None
553            try:
554                network_urls = await self.hass.send_command("network/url")
555                if network_urls:
556                    ha_url = network_urls.get("external") or network_urls.get("internal")
557            except Exception as err:
558                self.logger.log(VERBOSE_LOG_LEVEL, "Failed to get HA network URLs: %s", err)
559
560            # Find person linked to this HA user ID for display name and avatar
561            try:
562                persons = await self.hass.send_command("person/list")
563                # person/list returns {storage: [...], config: [...]}
564                all_persons = (persons.get("storage") or []) + (persons.get("config") or [])
565                for person in all_persons:
566                    if person.get("user_id") == ha_user_id:
567                        # Person name takes priority for display name
568                        if person_name := person.get("name"):
569                            display_name = person_name
570                        if (person_picture := person.get("picture")) and ha_url:
571                            avatar_url = f"{ha_url.rstrip('/')}{person_picture}"
572                        break
573            except Exception as err:
574                self.logger.log(VERBOSE_LOG_LEVEL, "Failed to get HA person details: %s", err)
575
576            self.logger.log(
577                VERBOSE_LOG_LEVEL,
578                "get_user_details for %s: username=%s, display_name=%s, avatar_url=%s",
579                ha_user_id,
580                username,
581                display_name,
582                avatar_url,
583            )
584            return username, display_name, avatar_url
585        except Exception as err:
586            self.logger.warning("Failed to get HA user details: %s", err)
587            return None, None, None
588
589    async def resolve_image(self, path: str) -> bytes:
590        """Resolve an image from an image path."""
591        ha_url = cast("str", self.config.get_value(CONF_URL)).rstrip("/")
592        if ha_url.endswith("/api") and path.startswith("/api/"):
593            url = f"{ha_url}{path[4:]}"
594        else:
595            url = f"{ha_url}{path}"
596
597        # Use HASSIO_TOKEN when running as addon (token config is None)
598        token = self.config.get_value(CONF_AUTH_TOKEN) or os.environ.get("HASSIO_TOKEN")
599        headers = {"Authorization": f"Bearer {token}"} if token else {}
600
601        ssl = bool(self.config.get_value(CONF_VERIFY_SSL))
602        http_session = self.mass.http_session if ssl else self.mass.http_session_no_ssl
603        async with http_session.get(url, headers=headers) as response:
604            response.raise_for_status()
605            return await response.read()
606