/
/
/
1"""Various helpers/utilities for the AirPlay provider."""
2
3from __future__ import annotations
4
5import logging
6import os
7import platform
8import time
9from typing import TYPE_CHECKING
10
11from zeroconf import IPVersion
12
13from music_assistant.helpers.process import check_output
14from music_assistant.providers.airplay.constants import (
15 AIRPLAY_2_DEFAULT_MODELS,
16 BROKEN_AIRPLAY_MODELS,
17 StreamingProtocol,
18)
19
20if TYPE_CHECKING:
21 from zeroconf.asyncio import AsyncServiceInfo
22
23_LOGGER = logging.getLogger(__name__)
24
25# NTP epoch delta: difference between Unix epoch (1970) and NTP epoch (1900)
26NTP_EPOCH_DELTA = 0x83AA7E80 # 2208988800 seconds
27
28
29def convert_airplay_volume(value: float) -> int:
30 """Remap AirPlay Volume to 0..100 scale."""
31 airplay_min = -30
32 airplay_max = 0
33 normal_min = 0
34 normal_max = 100
35 portion = (value - airplay_min) * (normal_max - normal_min) / (airplay_max - airplay_min)
36 return int(portion + normal_min)
37
38
39def get_model_info(info: AsyncServiceInfo) -> tuple[str, str]: # noqa: PLR0911
40 """Return Manufacturer and Model name from mdns info."""
41 manufacturer = info.decoded_properties.get("manufacturer")
42 model = info.decoded_properties.get("model")
43 if manufacturer and model:
44 return (manufacturer, model)
45 # try parse from am property
46 if am_property := info.decoded_properties.get("am"):
47 model = am_property
48
49 if not model:
50 model = "Unknown"
51
52 # parse apple model names
53 if model == "AudioAccessory6,1":
54 return ("Apple", "HomePod 2")
55 if model in ("AudioAccessory5,1", "AudioAccessorySingle5,1"):
56 return ("Apple", "HomePod Mini")
57 if model == "AppleTV1,1":
58 return ("Apple", "Apple TV Gen1")
59 if model == "AppleTV2,1":
60 return ("Apple", "Apple TV Gen2")
61 if model in ("AppleTV3,1", "AppleTV3,2"):
62 return ("Apple", "Apple TV Gen3")
63 if model == "AppleTV5,3":
64 return ("Apple", "Apple TV Gen4")
65 if model == "AppleTV6,2":
66 return ("Apple", "Apple TV 4K")
67 if model == "AppleTV11,1":
68 return ("Apple", "Apple TV 4K Gen2")
69 if model == "AppleTV14,1":
70 return ("Apple", "Apple TV 4K Gen3")
71 if model == "UPL-AMP":
72 return ("Ubiquiti Inc.", "UPL-AMP")
73 if "AirPort" in model:
74 return ("Apple", "AirPort Express")
75 if "AudioAccessory" in model:
76 return ("Apple", "HomePod")
77 if "AppleTV" in model:
78 model = "Apple TV"
79 manufacturer = "Apple"
80 # Detect Mac devices (Mac mini, MacBook, iMac, etc.)
81 # Model identifiers like: Mac16,11, MacBookPro18,3, iMac21,1
82 if model.startswith(("Mac", "iMac")):
83 # Parse Mac model to friendly name
84 if model.startswith("MacBookPro"):
85 return ("Apple", f"MacBook Pro ({model})")
86 if model.startswith("MacBookAir"):
87 return ("Apple", f"MacBook Air ({model})")
88 if model.startswith("MacBook"):
89 return ("Apple", f"MacBook ({model})")
90 if model.startswith("iMac"):
91 return ("Apple", f"iMac ({model})")
92 if model.startswith("Macmini"):
93 return ("Apple", f"Mac mini ({model})")
94 if model.startswith("MacPro"):
95 return ("Apple", f"Mac Pro ({model})")
96 if model.startswith("MacStudio"):
97 return ("Apple", f"Mac Studio ({model})")
98 # Generic Mac device (e.g. Mac16,11 for Mac mini M4)
99 return ("Apple", f"Mac ({model})")
100
101 return (manufacturer or "AirPlay", model)
102
103
104def get_primary_ip_address_from_zeroconf(discovery_info: AsyncServiceInfo) -> str | None:
105 """Get primary IP address from zeroconf discovery info."""
106 for address in discovery_info.parsed_addresses(IPVersion.V4Only):
107 if address.startswith("127"):
108 # filter out loopback address
109 continue
110 if address.startswith("169.254"):
111 # filter out APIPA address
112 continue
113 return address
114 return None
115
116
117def is_broken_airplay_model(manufacturer: str, model: str) -> bool:
118 """Check if a model is known to have broken AirPlay support."""
119 for broken_manufacturer, broken_model in BROKEN_AIRPLAY_MODELS:
120 if broken_manufacturer in (manufacturer, "*") and broken_model in (model, "*"):
121 return True
122 return False
123
124
125def is_airplay2_preferred_model(manufacturer: str, model: str) -> bool:
126 """Check if a model is known to work better with AirPlay 2 protocol."""
127 for ap2_manufacturer, ap2_model in AIRPLAY_2_DEFAULT_MODELS:
128 if ap2_manufacturer in (manufacturer, "*") and ap2_model in (model, "*"):
129 return True
130 return False
131
132
133def is_apple_device(manufacturer: str) -> bool:
134 """Check if a device is an Apple device with native AirPlay support.
135
136 Apple devices (HomePod, Apple TV, Mac, etc.) have native AirPlay support
137 and should be exposed as PlayerType.PLAYER. Non-Apple devices with AirPlay
138 support should be exposed as PlayerType.PROTOCOL.
139 """
140 return manufacturer.lower().startswith("apple")
141
142
143async def get_cli_binary(protocol: StreamingProtocol) -> str:
144 """Find the correct raop/airplay binary belonging to the platform.
145
146 Args:
147 protocol: The streaming protocol (RAOP or AIRPLAY2)
148
149 Returns:
150 Path to the CLI binary
151
152 Raises:
153 RuntimeError: If the binary cannot be found
154 """
155
156 async def check_binary(cli_path: str) -> str | None:
157 try:
158 if protocol == StreamingProtocol.RAOP:
159 args = [
160 cli_path,
161 "-check",
162 ]
163 passing_output = "cliraop check"
164 else:
165 args = [
166 cli_path,
167 "--testrun",
168 ]
169 passing_output = "cliap2 check"
170
171 returncode, output = await check_output(*args)
172 _LOGGER.debug("%s returned %d with output: %s", cli_path, int(returncode), str(output))
173 if returncode == 0 and output.strip().decode() == passing_output:
174 return cli_path
175 except OSError:
176 pass
177 return None
178
179 base_path = os.path.join(os.path.dirname(__file__), "bin")
180 system = platform.system().lower().replace("darwin", "macos")
181 architecture = platform.machine().lower()
182
183 if protocol == StreamingProtocol.RAOP:
184 package = "cliraop"
185 elif protocol == StreamingProtocol.AIRPLAY2:
186 package = "cliap2"
187 else:
188 raise RuntimeError(f"Unsupported streaming protocol requested: {protocol}")
189
190 if bridge_binary := await check_binary(
191 os.path.join(base_path, f"{package}-{system}-{architecture}")
192 ):
193 return bridge_binary
194
195 msg = (
196 f"Unable to locate {protocol.name} CLI stream binary {package} for {system}/{architecture}"
197 )
198 raise RuntimeError(msg)
199
200
201def get_ntp_timestamp() -> int:
202 """
203 Get current NTP timestamp (64-bit).
204
205 Returns:
206 int: 64-bit NTP timestamp (upper 32 bits = seconds, lower 32 bits = fraction)
207 """
208 # Get current Unix timestamp with microsecond precision
209 current_time = time.time()
210
211 # Split into seconds and microseconds
212 seconds = int(current_time)
213 microseconds = int((current_time - seconds) * 1_000_000)
214
215 # Convert to NTP epoch (add offset from 1970 to 1900)
216 ntp_seconds = seconds + NTP_EPOCH_DELTA
217
218 # Convert microseconds to NTP fraction (2^32 parts per second)
219 # fraction = (microseconds * 2^32) / 1_000_000
220 ntp_fraction = int((microseconds << 32) / 1_000_000)
221
222 # Combine into 64-bit value
223 return (ntp_seconds << 32) | ntp_fraction
224
225
226def ntp_to_seconds_fraction(ntp_timestamp: int) -> tuple[int, int]:
227 """
228 Split NTP timestamp into seconds and fraction components.
229
230 Args:
231 ntp_timestamp: 64-bit NTP timestamp
232
233 Returns:
234 tuple: (seconds, fraction)
235 """
236 seconds = ntp_timestamp >> 32
237 fraction = ntp_timestamp & 0xFFFFFFFF
238 return seconds, fraction
239
240
241def ntp_to_unix_time(ntp_timestamp: int) -> float:
242 """
243 Convert NTP timestamp to Unix timestamp (float).
244
245 Args:
246 ntp_timestamp: 64-bit NTP timestamp
247
248 Returns:
249 float: Unix timestamp (seconds since 1970-01-01)
250 """
251 seconds = ntp_timestamp >> 32
252 fraction = ntp_timestamp & 0xFFFFFFFF
253
254 # Convert back to Unix epoch
255 unix_seconds = seconds - NTP_EPOCH_DELTA
256
257 # Convert fraction to microseconds
258 microseconds = (fraction * 1_000_000) >> 32
259
260 return unix_seconds + (microseconds / 1_000_000)
261
262
263def unix_time_to_ntp(unix_timestamp: float) -> int:
264 """
265 Convert Unix timestamp (float) to NTP timestamp.
266
267 Args:
268 unix_timestamp: Unix timestamp (seconds since 1970-01-01)
269
270 Returns:
271 int: 64-bit NTP timestamp
272 """
273 seconds = int(unix_timestamp)
274 microseconds = int((unix_timestamp - seconds) * 1_000_000)
275
276 # Convert to NTP epoch
277 ntp_seconds = seconds + NTP_EPOCH_DELTA
278
279 # Convert microseconds to NTP fraction
280 ntp_fraction = int((microseconds << 32) / 1_000_000)
281
282 return (ntp_seconds << 32) | ntp_fraction
283
284
285def player_id_to_mac_address(player_id: str) -> str:
286 """Convert a player_id to a MAC address-like string."""
287 # the player_id is the mac address prefixed with "ap"
288 hex_str = player_id.replace("ap", "").upper()
289 return ":".join(hex_str[i : i + 2] for i in range(0, 12, 2))
290
291
292def generate_active_remote_id(mac_address: str) -> str:
293 """
294 Generate an Active-Remote ID for DACP communication.
295
296 The Active-Remote ID is used to match DACP callbacks from devices to the
297 correct stream. This function generates a consistent ID based on the
298 player_id (=macaddress, =device id), converted to uint32).
299
300 :return: Active-Remote ID as decimal string.
301 """
302 # Convert MAC address format to uint32
303 # Remove colons: "AA:BB:CC:DD:EE:FF" -> "AABBCCDDEEFF"
304 hex_str = mac_address.replace(":", "").upper()
305 # Parse as uint64 and truncate to uint32 (lower 32 bits)
306 device_id_u64 = int(hex_str, 16)
307 device_id_u32 = device_id_u64 & 0xFFFFFFFF
308 return str(device_id_u32)
309
310
311def add_seconds_to_ntp(ntp_timestamp: int, seconds: float) -> int:
312 """
313 Add seconds to an NTP timestamp.
314
315 Args:
316 ntp_timestamp: 64-bit NTP timestamp
317 seconds: Number of seconds to add (can be fractional)
318
319 Returns:
320 int: New NTP timestamp with seconds added
321 """
322 # Extract whole seconds and fraction
323 whole_seconds = int(seconds)
324 fraction = seconds - whole_seconds
325
326 # Convert to NTP format (upper 32 bits = seconds, lower 32 bits = fraction)
327 ntp_seconds = whole_seconds << 32
328 ntp_fraction = int(fraction * (1 << 32))
329
330 return ntp_timestamp + ntp_seconds + ntp_fraction
331