/
/
/
1"""Helpers to deal with Cast devices."""
2
3from __future__ import annotations
4
5import urllib.error
6from dataclasses import asdict, dataclass
7from typing import TYPE_CHECKING
8from uuid import UUID
9
10from pychromecast import dial
11from pychromecast.const import CAST_TYPE_GROUP
12
13from music_assistant.constants import VERBOSE_LOG_LEVEL
14
15if TYPE_CHECKING:
16 from pychromecast.controllers.media import MediaStatus
17 from pychromecast.controllers.multizone import MultizoneManager
18 from pychromecast.controllers.receiver import CastStatus
19 from pychromecast.models import CastInfo
20 from pychromecast.socket_client import ConnectionStatus
21 from zeroconf import ServiceInfo, Zeroconf
22
23 from .player import ChromecastPlayer
24
25DEFAULT_PORT = 8009
26
27
28@dataclass
29class ChromecastInfo:
30 """Class to hold all data about a chromecast for creating connections.
31
32 This also has the same attributes as the mDNS fields by zeroconf.
33 """
34
35 services: set
36 uuid: UUID
37 model_name: str
38 friendly_name: str
39 host: str
40 port: int
41 cast_type: str | None = None
42 manufacturer: str | None = None
43 is_dynamic_group: bool | None = None
44 is_multichannel_group: bool = False # group created for e.g. stereo pair
45 is_multichannel_child: bool = False # speaker that is part of multichannel setup
46 mac_address: str | None = None # MAC address from eureka_info API
47
48 @property
49 def is_audio_group(self) -> bool:
50 """Return if the cast is an audio group."""
51 return self.cast_type == CAST_TYPE_GROUP
52
53 @classmethod
54 def from_cast_info(cls, cast_info: CastInfo) -> ChromecastInfo:
55 """Instantiate ChromecastInfo from CastInfo."""
56 return cls(**asdict(cast_info))
57
58 def update(self, cast_info: CastInfo) -> None:
59 """Update ChromecastInfo from CastInfo."""
60 for key, value in asdict(cast_info).items():
61 if not value:
62 continue
63 setattr(self, key, value)
64
65 def fill_out_missing_chromecast_info(self, zconf: Zeroconf) -> None:
66 """
67 Return a new ChromecastInfo object with missing attributes filled in.
68
69 Uses blocking HTTP / HTTPS.
70 """
71 if self.cast_type is None or self.manufacturer is None:
72 # Manufacturer and cast type is not available in mDNS data,
73 # get it over HTTP
74 cast_info = dial.get_cast_type(
75 self,
76 zconf=zconf,
77 )
78 self.cast_type = cast_info.cast_type
79 self.manufacturer = cast_info.manufacturer
80
81 # Fill out missing group information via HTTP API.
82 dynamic_groups, multichannel_groups = get_multizone_info(self.services, zconf)
83 self.is_dynamic_group = self.uuid in dynamic_groups
84 if self.uuid in multichannel_groups:
85 self.is_multichannel_group = True
86 elif (
87 multichannel_groups
88 # Prevent a multichannel group being marked as a multichannel child
89 # if not in UUID list
90 and self.cast_type != "group"
91 and self.model_name != "Google Cast Group"
92 ):
93 self.is_multichannel_child = True
94
95 # Get MAC address for device matching (not available for groups)
96 if self.mac_address is None and self.cast_type != "group":
97 self.mac_address = get_mac_address(self.services, zconf)
98
99
100def get_multizone_info(services: list[ServiceInfo], zconf: Zeroconf, timeout=30):
101 """Get multizone info from eureka endpoint."""
102 dynamic_groups: set[str] = set()
103 multichannel_groups: set[str] = set()
104 try:
105 _, status = dial._get_status(
106 services,
107 zconf,
108 "/setup/eureka_info?params=multizone",
109 True,
110 timeout,
111 None,
112 )
113 if "multizone" in status and "dynamic_groups" in status["multizone"]:
114 for group in status["multizone"]["dynamic_groups"]:
115 if udn := group.get("uuid"):
116 uuid = UUID(udn.replace("-", ""))
117 dynamic_groups.add(uuid)
118
119 if "multizone" in status and "groups" in status["multizone"]:
120 for group in status["multizone"]["groups"]:
121 if "multichannel_group" not in group:
122 continue
123 if group["multichannel_group"] and (udn := group.get("uuid")):
124 uuid = UUID(udn.replace("-", ""))
125 multichannel_groups.add(uuid)
126 except (urllib.error.HTTPError, urllib.error.URLError, OSError, KeyError, ValueError):
127 pass
128 return (dynamic_groups, multichannel_groups)
129
130
131def get_mac_address(services: list[ServiceInfo], zconf: Zeroconf, timeout: int = 10) -> str | None:
132 """Get MAC address from Chromecast eureka_info API.
133
134 :param services: List of zeroconf service info.
135 :param zconf: Zeroconf instance.
136 :param timeout: Request timeout in seconds.
137 :return: MAC address string or None if not available.
138 """
139 try:
140 _, status = dial._get_status(
141 services,
142 zconf,
143 "/setup/eureka_info?options=detail",
144 True,
145 timeout,
146 None,
147 )
148 if mac_address := status.get("mac_address"):
149 # Normalize to uppercase with colons
150 mac = mac_address.upper().replace("-", ":")
151 # Ensure proper format
152 if ":" not in mac and len(mac) == 12:
153 mac = ":".join(mac[i : i + 2] for i in range(0, 12, 2))
154 return mac
155 except (urllib.error.HTTPError, urllib.error.URLError, OSError, KeyError, ValueError):
156 pass
157 return None
158
159
160class CastStatusListener:
161 """
162 Helper class to handle pychromecast status callbacks.
163
164 Necessary because a CastDevice entity can create a new socket client
165 and therefore callbacks from multiple chromecast connections can
166 potentially arrive. This class allows invalidating past chromecast objects.
167 """
168
169 def __init__(
170 self,
171 castplayer: ChromecastPlayer,
172 mz_mgr: MultizoneManager,
173 mz_only=False,
174 ) -> None:
175 """Initialize the status listener."""
176 self.castplayer = castplayer
177 self._uuid = castplayer.cc.uuid
178 self._valid = True
179 self._mz_mgr = mz_mgr
180 if self.castplayer.cast_info.is_audio_group:
181 self._mz_mgr.add_multizone(castplayer.cc)
182 if mz_only:
183 return
184 castplayer.cc.register_status_listener(self)
185 castplayer.cc.socket_client.media_controller.register_status_listener(self)
186 castplayer.cc.register_connection_listener(self)
187 if not self.castplayer.cast_info.is_audio_group:
188 self._mz_mgr.register_listener(castplayer.cc.uuid, self)
189
190 def new_cast_status(self, status: CastStatus) -> None:
191 """Handle updated CastStatus."""
192 if not self._valid:
193 return
194 self.castplayer.on_new_cast_status(status)
195
196 def new_media_status(self, status: MediaStatus) -> None:
197 """Handle updated MediaStatus."""
198 if not self._valid:
199 return
200 self.castplayer.on_new_media_status(status)
201
202 def new_connection_status(self, status: ConnectionStatus) -> None:
203 """Handle updated ConnectionStatus."""
204 if not self._valid:
205 return
206 self.castplayer.on_new_connection_status(status)
207
208 def added_to_multizone(self, group_uuid) -> None:
209 """Handle the cast added to a group."""
210 self.castplayer.logger.debug(
211 "%s is added to multizone: %s", self.castplayer.display_name, group_uuid
212 )
213 self.new_cast_status(self.castplayer.cc.status)
214
215 def removed_from_multizone(self, group_uuid) -> None:
216 """Handle the cast removed from a group."""
217 if not self._valid:
218 return
219 if group_uuid == self.castplayer.active_source:
220 mass = self.castplayer.mass
221 mass.loop.call_soon_threadsafe(self.castplayer.update_state)
222 self.castplayer.logger.debug(
223 "%s is removed from multizone: %s", self.castplayer.display_name, group_uuid
224 )
225 self.new_cast_status(self.castplayer.cc.status)
226
227 def multizone_new_cast_status(self, group_uuid, cast_status) -> None:
228 """Handle reception of a new CastStatus for a group."""
229 mass = self.castplayer.mass
230 if group_player := mass.players.get_player(group_uuid):
231 if TYPE_CHECKING:
232 assert isinstance(group_player, ChromecastPlayer)
233 if group_player.cc.media_controller.is_active:
234 self.castplayer.active_cast_group = group_uuid
235 elif group_uuid == self.castplayer.active_cast_group:
236 self.castplayer.active_cast_group = None
237
238 self.castplayer.logger.log(
239 VERBOSE_LOG_LEVEL,
240 "%s got new cast status for group: %s",
241 self.castplayer.display_name,
242 group_uuid,
243 )
244 self.new_cast_status(self.castplayer.cc.status)
245
246 def multizone_new_media_status(self, group_uuid, media_status) -> None:
247 """Handle reception of a new MediaStatus for a group."""
248 if not self._valid:
249 return
250 self.castplayer.logger.log(
251 VERBOSE_LOG_LEVEL,
252 "%s got new media_status for group: %s",
253 self.castplayer.display_name,
254 group_uuid,
255 )
256 self.castplayer.on_new_media_status(media_status)
257
258 def load_media_failed(self, queue_item_id, error_code) -> None:
259 """Call when media failed to load."""
260 self.castplayer.logger.warning(
261 "Load media failed: %s - error code: %s", queue_item_id, error_code
262 )
263
264 def invalidate(self) -> None:
265 """
266 Invalidate this status listener.
267
268 All following callbacks won't be forwarded.
269 """
270 if self.castplayer.cast_info.is_audio_group:
271 self._mz_mgr.remove_multizone(self._uuid)
272 else:
273 self._mz_mgr.deregister_listener(self._uuid, self)
274 self._valid = False
275