music-assistant-server

3.9 KBPY
helpers.py
3.9 KB93 lines • python
1"""Helpers and utilities for the Home Assistant PlayerProvider."""
2
3from __future__ import annotations
4
5import logging
6import os
7from typing import TYPE_CHECKING, TypedDict, cast
8
9from music_assistant_models.errors import InvalidDataError, LoginFailed
10
11from music_assistant.providers.hass.constants import MediaPlayerEntityFeature
12
13from .constants import BLOCKLISTED_HASS_INTEGRATIONS
14
15if TYPE_CHECKING:
16    from collections.abc import AsyncGenerator
17
18    from hass_client.models import State as HassState
19
20    from music_assistant.providers.hass import HomeAssistantProvider
21
22
async def get_hass_media_players(
    hass_prov: HomeAssistantProvider,
) -> AsyncGenerator[HassState, None]:
    """Yield the HA state objects of all (valid) media_player entities.

    An entity is skipped when it:

    - is not in the ``media_player`` domain,
    - carries a ``mass_player_type`` attribute (mass players),
    - has no ``friendly_name`` attribute (invalid/unavailable players),
    - does not advertise ``MediaPlayerEntityFeature.PLAY_MEDIA``,
    - is registered by a platform listed in ``BLOCKLISTED_HASS_INTEGRATIONS``.

    :param hass_prov: Provider whose ``hass`` client supplies the entity
        registry and the current states.
    """
    entity_registry = {x["entity_id"]: x for x in await hass_prov.hass.get_entity_registry()}
    for state in await hass_prov.hass.get_states():
        # match on "domain." so that only the exact media_player domain passes
        if not state["entity_id"].startswith("media_player."):
            continue
        attributes = state["attributes"]
        if "mass_player_type" in attributes:
            # filter out mass players
            continue
        if "friendly_name" not in attributes:
            # filter out invalid/unavailable players
            continue
        # A missing (or null) feature mask is treated as "no features" so a single
        # malformed entity is skipped instead of aborting the whole generator.
        supported_features = MediaPlayerEntityFeature(attributes.get("supported_features") or 0)
        if MediaPlayerEntityFeature.PLAY_MEDIA not in supported_features:
            continue
        if entity_registry_entry := entity_registry.get(state["entity_id"]):
            # entities without a registry entry cannot be blocklisted and pass through
            hass_domain = entity_registry_entry["platform"]
            if hass_domain in BLOCKLISTED_HASS_INTEGRATIONS:
                continue
        yield state
45
46
class ESPHomeSupportedAudioFormat(TypedDict):
    """Description of a single audio format supported by an ESPHome device."""

    # name of the format: flac, wav or mp3
    format: str
    # sample rate, e.g. 48000
    sample_rate: int
    # channel count: 1 for announcements, 2 for media
    num_channels: int
    # intended use: 0 for media, 1 for announcements
    purpose: int
    # bytes per sample: 1 for 8 bit, 2 for 16 bit, 4 for 32 bit
    sample_bytes: int
55
56
async def get_esphome_supported_audio_formats(
    hass_prov: HomeAssistantProvider, conf_entry_id: str
) -> list[ESPHomeSupportedAudioFormat]:
    """Collect the audio formats an ESPHome device reports as supported.

    The formats are read from the Home Assistant diagnostics of the given
    config entry. This is best-effort: any failure is logged as a warning and
    whatever was collected up to that point (possibly nothing) is returned.

    :param hass_prov: Provider giving access to the HA client, the shared
        HTTP session and the logger.
    :param conf_entry_id: ID of the config entry to fetch diagnostics for.
    """
    formats: list[ESPHomeSupportedAudioFormat] = []
    try:
        # TODO: expose this in the hass client lib instead of hacking around private vars
        base_url = hass_prov.hass._websocket_url or "ws://supervisor/core/websocket"
        # turn the websocket url into the matching http(s) base url
        for needle, replacement in (
            ("ws://", "http://"),
            ("wss://", "https://"),
            ("/api/websocket", ""),
            ("/websocket", ""),
        ):
            base_url = base_url.replace(needle, replacement)
        token = hass_prov.hass._token or os.environ.get("HASSIO_TOKEN")
        request_headers = {
            "Authorization": f"Bearer {token}",
            "content-type": "application/json",
        }
        diagnostics_url = f"{base_url}/api/diagnostics/config_entry/{conf_entry_id}"
        async with hass_prov.mass.http_session.get(
            diagnostics_url, headers=request_headers
        ) as response:
            if response.status != 200:
                raise LoginFailed("Unable to contact Home Assistant to retrieve diagnostics")
            payload = await response.json()
            if not ("data" in payload and "storage_data" in payload["data"]):
                # diagnostics carry no data/storage_data section: nothing to report
                return formats
            storage_data = payload["data"]["storage_data"]
            if "media_player" not in storage_data:
                raise InvalidDataError("Media player info not found in ESPHome diagnostics")
            for player_info in storage_data["media_player"]:
                if "supported_formats" in player_info:
                    formats.extend(
                        cast("ESPHomeSupportedAudioFormat", fmt)
                        for fmt in player_info["supported_formats"]
                    )
    except Exception as exc:
        # only attach the traceback when debug logging is enabled
        with_traceback = hass_prov.logger.isEnabledFor(logging.DEBUG)
        hass_prov.logger.warning(
            "Failed to fetch diagnostics for ESPHome player: %s",
            str(exc),
            exc_info=exc if with_traceback else None,
        )
    return formats
93