music-assistant-server

9.9 KBPY
helpers.py
9.9 KB275 lines • python
1"""Helpers to deal with Cast devices."""
2
3from __future__ import annotations
4
5import urllib.error
6from dataclasses import asdict, dataclass
7from typing import TYPE_CHECKING
8from uuid import UUID
9
10from pychromecast import dial
11from pychromecast.const import CAST_TYPE_GROUP
12
13from music_assistant.constants import VERBOSE_LOG_LEVEL
14
15if TYPE_CHECKING:
16    from pychromecast.controllers.media import MediaStatus
17    from pychromecast.controllers.multizone import MultizoneManager
18    from pychromecast.controllers.receiver import CastStatus
19    from pychromecast.models import CastInfo
20    from pychromecast.socket_client import ConnectionStatus
21    from zeroconf import ServiceInfo, Zeroconf
22
23    from .player import ChromecastPlayer
24
# Default port for Cast connections. Not referenced by the code visible in this
# module; presumably imported by other parts of the provider (TODO confirm).
DEFAULT_PORT = 8009
26
27
@dataclass
class ChromecastInfo:
    """Class to hold all data about a chromecast for creating connections.

    This also has the same attributes as the mDNS fields by zeroconf.
    Instances are normally built from a pychromecast ``CastInfo`` through
    :meth:`from_cast_info` and completed afterwards, in place, by
    :meth:`fill_out_missing_chromecast_info`.
    """

    services: set
    uuid: UUID
    model_name: str
    friendly_name: str
    host: str
    port: int
    cast_type: str | None = None
    manufacturer: str | None = None
    is_dynamic_group: bool | None = None  # None until the multizone lookup ran
    is_multichannel_group: bool = False  # group created for e.g. stereo pair
    is_multichannel_child: bool = False  # speaker that is part of multichannel setup
    mac_address: str | None = None  # MAC address from eureka_info API

    @property
    def is_audio_group(self) -> bool:
        """Return if the cast is an audio group."""
        return self.cast_type == CAST_TYPE_GROUP

    @classmethod
    def from_cast_info(cls, cast_info: CastInfo) -> ChromecastInfo:
        """Instantiate ChromecastInfo from CastInfo.

        Every field of ``cast_info`` is forwarded as a keyword argument, so its
        field names must all exist on this class.

        :param cast_info: Dataclass instance to copy the fields from.
        :return: A new ChromecastInfo; fields not present on ``cast_info`` keep
            their defaults.
        """
        return cls(**asdict(cast_info))

    def update(self, cast_info: CastInfo) -> None:
        """Update ChromecastInfo from CastInfo.

        :param cast_info: Dataclass instance holding the (possibly partial) new data.
        """
        for key, value in asdict(cast_info).items():
            # Falsy values (None, empty string/set, 0) are treated as "not
            # provided" and never overwrite what is already stored.
            if value:
                setattr(self, key, value)

    def fill_out_missing_chromecast_info(self, zconf: Zeroconf) -> None:
        """
        Fill in the attributes that are not available from mDNS, in place.

        This mutates the instance and returns nothing.

        Uses blocking HTTP / HTTPS.

        :param zconf: Zeroconf instance handed to the lookups.
        """
        if self.cast_type is None or self.manufacturer is None:
            # Manufacturer and cast type is not available in mDNS data,
            # get it over HTTP
            cast_info = dial.get_cast_type(
                self,
                zconf=zconf,
            )
            self.cast_type = cast_info.cast_type
            self.manufacturer = cast_info.manufacturer

        # Fill out missing group information via HTTP API.
        dynamic_groups, multichannel_groups = get_multizone_info(self.services, zconf)
        self.is_dynamic_group = self.uuid in dynamic_groups
        if self.uuid in multichannel_groups:
            self.is_multichannel_group = True
        elif (
            multichannel_groups
            # Prevent a multichannel group being marked as a multichannel child
            # if not in UUID list
            and not self.is_audio_group
            and self.model_name != "Google Cast Group"
        ):
            self.is_multichannel_child = True

        # Get MAC address for device matching (not available for groups)
        if self.mac_address is None and not self.is_audio_group:
            self.mac_address = get_mac_address(self.services, zconf)
98
99
def _parse_group_uuid(group: dict) -> UUID | None:
    """Return the UUID of one eureka group entry, or None if absent/malformed."""
    udn = group.get("uuid")
    if not udn:
        return None
    try:
        return UUID(udn.replace("-", ""))
    except ValueError:
        # A single bad entry must not discard the remaining groups.
        return None


def get_multizone_info(
    services: list[ServiceInfo], zconf: Zeroconf, timeout: int = 30
) -> tuple[set[UUID], set[UUID]]:
    """Get multizone info from eureka endpoint.

    This is best-effort: when the request fails, both returned sets are empty.

    :param services: Services of the cast device, passed on to pychromecast.
    :param zconf: Zeroconf instance, passed on to pychromecast.
    :param timeout: Request timeout, passed on to pychromecast.
    :return: Tuple ``(dynamic_groups, multichannel_groups)`` holding the UUIDs
        of the dynamic groups and of the groups flagged ``multichannel_group``.
    """
    dynamic_groups: set[UUID] = set()
    multichannel_groups: set[UUID] = set()
    try:
        _, status = dial._get_status(
            services,
            zconf,
            "/setup/eureka_info?params=multizone",
            True,
            timeout,
            None,
        )
        multizone = status.get("multizone") if isinstance(status, dict) else None
        if not isinstance(multizone, dict):
            # No (usable) multizone section in the response.
            return (dynamic_groups, multichannel_groups)

        for group in multizone.get("dynamic_groups") or ():
            if (group_uuid := _parse_group_uuid(group)) is not None:
                dynamic_groups.add(group_uuid)

        for group in multizone.get("groups") or ():
            # Only groups explicitly flagged as multichannel are of interest.
            if not group.get("multichannel_group"):
                continue
            if (group_uuid := _parse_group_uuid(group)) is not None:
                multichannel_groups.add(group_uuid)
    except (urllib.error.HTTPError, urllib.error.URLError, OSError, KeyError, ValueError):
        # Deliberately swallowed: group info is optional, return what we have.
        pass
    return (dynamic_groups, multichannel_groups)
129
130
def get_mac_address(services: list[ServiceInfo], zconf: Zeroconf, timeout: int = 10) -> str | None:
    """Look up the MAC address of a Chromecast through its eureka_info API.

    The value is returned in upper case with ``:`` as separator; a bare
    12-character value is split into colon-separated pairs.

    :param services: List of zeroconf service info.
    :param zconf: Zeroconf instance.
    :param timeout: Request timeout in seconds.
    :return: MAC address string or None if not available.
    """
    try:
        _, eureka_info = dial._get_status(
            services,
            zconf,
            "/setup/eureka_info?options=detail",
            True,
            timeout,
            None,
        )
        raw_mac = eureka_info.get("mac_address")
    except (urllib.error.HTTPError, urllib.error.URLError, OSError, KeyError, ValueError):
        # Best effort only: a failed request simply means "no MAC address".
        return None

    if not raw_mac:
        return None

    # Normalize to upper case, using colons instead of dashes.
    normalized = raw_mac.replace("-", ":").upper()
    if len(normalized) == 12 and ":" not in normalized:
        # No separators at all: insert a colon after every two characters.
        pairs = [normalized[start : start + 2] for start in range(0, 12, 2)]
        normalized = ":".join(pairs)
    return normalized
158
159
class CastStatusListener:
    """
    Helper class to handle pychromecast status callbacks.

    Necessary because a CastDevice entity can create a new socket client
    and therefore callbacks from multiple chromecast connections can
    potentially arrive. This class allows invalidating past chromecast objects.

    Once :meth:`invalidate` has been called, no callback touches the player
    any more.
    """

    def __init__(
        self,
        castplayer: ChromecastPlayer,
        mz_mgr: MultizoneManager,
        mz_only: bool = False,
    ) -> None:
        """Initialize the status listener.

        :param castplayer: Player that receives the forwarded callbacks.
        :param mz_mgr: Multizone manager to register the cast with.
        :param mz_only: When True, only the multizone registration of an audio
            group is done; no status/connection listeners are registered.
        """
        self.castplayer = castplayer
        self._uuid = castplayer.cc.uuid
        self._valid = True
        self._mz_mgr = mz_mgr
        if self.castplayer.cast_info.is_audio_group:
            self._mz_mgr.add_multizone(castplayer.cc)
        if mz_only:
            return
        castplayer.cc.register_status_listener(self)
        castplayer.cc.socket_client.media_controller.register_status_listener(self)
        castplayer.cc.register_connection_listener(self)
        if not self.castplayer.cast_info.is_audio_group:
            self._mz_mgr.register_listener(castplayer.cc.uuid, self)

    def new_cast_status(self, status: CastStatus) -> None:
        """Handle updated CastStatus."""
        if not self._valid:
            return
        self.castplayer.on_new_cast_status(status)

    def new_media_status(self, status: MediaStatus) -> None:
        """Handle updated MediaStatus."""
        if not self._valid:
            return
        self.castplayer.on_new_media_status(status)

    def new_connection_status(self, status: ConnectionStatus) -> None:
        """Handle updated ConnectionStatus."""
        if not self._valid:
            return
        self.castplayer.on_new_connection_status(status)

    def added_to_multizone(self, group_uuid) -> None:
        """Handle the cast added to a group."""
        # Same guard as the other callbacks: an invalidated listener stays silent.
        if not self._valid:
            return
        self.castplayer.logger.debug(
            "%s is added to multizone: %s", self.castplayer.display_name, group_uuid
        )
        self.new_cast_status(self.castplayer.cc.status)

    def removed_from_multizone(self, group_uuid) -> None:
        """Handle the cast removed from a group."""
        if not self._valid:
            return
        if group_uuid == self.castplayer.active_source:
            # The group we left was the active source: schedule a state update
            # on the event loop (call_soon_threadsafe is used for the hand-off).
            mass = self.castplayer.mass
            mass.loop.call_soon_threadsafe(self.castplayer.update_state)
        self.castplayer.logger.debug(
            "%s is removed from multizone: %s", self.castplayer.display_name, group_uuid
        )
        self.new_cast_status(self.castplayer.cc.status)

    def multizone_new_cast_status(self, group_uuid, cast_status) -> None:
        """Handle reception of a new CastStatus for a group."""
        # Without this guard an invalidated listener would still change the
        # player's active_cast_group below.
        if not self._valid:
            return
        mass = self.castplayer.mass
        if group_player := mass.players.get_player(group_uuid):
            if TYPE_CHECKING:
                assert isinstance(group_player, ChromecastPlayer)
            if group_player.cc.media_controller.is_active:
                self.castplayer.active_cast_group = group_uuid
            elif group_uuid == self.castplayer.active_cast_group:
                # The previously active group is no longer active.
                self.castplayer.active_cast_group = None

        self.castplayer.logger.log(
            VERBOSE_LOG_LEVEL,
            "%s got new cast status for group: %s",
            self.castplayer.display_name,
            group_uuid,
        )
        self.new_cast_status(self.castplayer.cc.status)

    def multizone_new_media_status(self, group_uuid, media_status) -> None:
        """Handle reception of a new MediaStatus for a group."""
        if not self._valid:
            return
        self.castplayer.logger.log(
            VERBOSE_LOG_LEVEL,
            "%s got new media_status for group: %s",
            self.castplayer.display_name,
            group_uuid,
        )
        self.castplayer.on_new_media_status(media_status)

    def load_media_failed(self, queue_item_id, error_code) -> None:
        """Call when media failed to load."""
        # Only logs; nothing is forwarded to the player.
        self.castplayer.logger.warning(
            "Load media failed: %s - error code: %s", queue_item_id, error_code
        )

    def invalidate(self) -> None:
        """
        Invalidate this status listener.

        All following callbacks won't be forwarded.
        """
        # Undo the multizone registration that __init__ made for this cast.
        if self.castplayer.cast_info.is_audio_group:
            self._mz_mgr.remove_multizone(self._uuid)
        else:
            self._mz_mgr.deregister_listener(self._uuid, self)
        self._valid = False
275