/
/
/
1"""Various helpers/utilities for the AirPlay provider."""
2
3from __future__ import annotations
4
5import logging
6import os
7import platform
8import time
9from typing import TYPE_CHECKING
10
11from zeroconf import IPVersion
12
13from music_assistant.helpers.process import check_output
14from music_assistant.providers.airplay.constants import (
15 AIRPLAY_2_DEFAULT_MODELS,
16 BROKEN_AIRPLAY_MODELS,
17 StreamingProtocol,
18)
19
20if TYPE_CHECKING:
21 from zeroconf.asyncio import AsyncServiceInfo
22
23_LOGGER = logging.getLogger(__name__)
24
25# NTP epoch delta: difference between Unix epoch (1970) and NTP epoch (1900)
26NTP_EPOCH_DELTA = 0x83AA7E80 # 2208988800 seconds
27
28
29def convert_airplay_volume(value: float) -> int:
30 """Remap AirPlay Volume to 0..100 scale."""
31 airplay_min = -30
32 airplay_max = 0
33 normal_min = 0
34 normal_max = 100
35 portion = (value - airplay_min) * (normal_max - normal_min) / (airplay_max - airplay_min)
36 return int(portion + normal_min)
37
38
39def get_model_info(info: AsyncServiceInfo) -> tuple[str, str]: # noqa: PLR0911
40 """Return Manufacturer and Model name from mdns info."""
41 manufacturer = info.decoded_properties.get("manufacturer")
42 model = info.decoded_properties.get("model")
43 if manufacturer and model:
44 return (manufacturer, model)
45 # try parse from am property
46 if am_property := info.decoded_properties.get("am"):
47 model = am_property
48
49 if not model:
50 model = "Unknown"
51
52 # parse apple model names
53 if model == "AudioAccessory6,1":
54 return ("Apple", "HomePod 2")
55 if model in ("AudioAccessory5,1", "AudioAccessorySingle5,1"):
56 return ("Apple", "HomePod Mini")
57 if model == "AppleTV1,1":
58 return ("Apple", "Apple TV Gen1")
59 if model == "AppleTV2,1":
60 return ("Apple", "Apple TV Gen2")
61 if model in ("AppleTV3,1", "AppleTV3,2"):
62 return ("Apple", "Apple TV Gen3")
63 if model == "AppleTV5,3":
64 return ("Apple", "Apple TV Gen4")
65 if model == "AppleTV6,2":
66 return ("Apple", "Apple TV 4K")
67 if model == "AppleTV11,1":
68 return ("Apple", "Apple TV 4K Gen2")
69 if model == "AppleTV14,1":
70 return ("Apple", "Apple TV 4K Gen3")
71 if model == "UPL-AMP":
72 return ("Ubiquiti Inc.", "UPL-AMP")
73 if "AirPort" in model:
74 return ("Apple", "AirPort Express")
75 if "AudioAccessory" in model:
76 return ("Apple", "HomePod")
77 if "AppleTV" in model:
78 model = "Apple TV"
79 manufacturer = "Apple"
80 # Detect Mac devices (Mac mini, MacBook, iMac, etc.)
81 # Model identifiers like: Mac16,11, MacBookPro18,3, iMac21,1
82 if model.startswith(("Mac", "iMac")):
83 # Parse Mac model to friendly name
84 if model.startswith("MacBookPro"):
85 return ("Apple", f"MacBook Pro ({model})")
86 if model.startswith("MacBookAir"):
87 return ("Apple", f"MacBook Air ({model})")
88 if model.startswith("MacBook"):
89 return ("Apple", f"MacBook ({model})")
90 if model.startswith("iMac"):
91 return ("Apple", f"iMac ({model})")
92 if model.startswith("Macmini"):
93 return ("Apple", f"Mac mini ({model})")
94 if model.startswith("MacPro"):
95 return ("Apple", f"Mac Pro ({model})")
96 if model.startswith("MacStudio"):
97 return ("Apple", f"Mac Studio ({model})")
98 # Generic Mac device (e.g. Mac16,11 for Mac mini M4)
99 return ("Apple", f"Mac ({model})")
100
101 return (manufacturer or "AirPlay", model)
102
103
104def get_primary_ip_address_from_zeroconf(discovery_info: AsyncServiceInfo) -> str | None:
105 """Get primary IP address from zeroconf discovery info."""
106 for address in discovery_info.parsed_addresses(IPVersion.V4Only):
107 if address.startswith("127"):
108 # filter out loopback address
109 continue
110 if address.startswith("169.254"):
111 # filter out APIPA address
112 continue
113 return address
114 return None
115
116
117def is_broken_airplay_model(manufacturer: str, model: str) -> bool:
118 """Check if a model is known to have broken AirPlay support."""
119 for broken_manufacturer, broken_model in BROKEN_AIRPLAY_MODELS:
120 if broken_manufacturer in (manufacturer, "*") and broken_model in (model, "*"):
121 return True
122 return False
123
124
125def is_airplay2_preferred_model(manufacturer: str, model: str) -> bool:
126 """Check if a model is known to work better with AirPlay 2 protocol."""
127 for ap2_manufacturer, ap2_model in AIRPLAY_2_DEFAULT_MODELS:
128 if ap2_manufacturer in (manufacturer, "*") and ap2_model in (model, "*"):
129 return True
130 return False
131
132
133async def get_cli_binary(protocol: StreamingProtocol) -> str:
134 """Find the correct raop/airplay binary belonging to the platform.
135
136 Args:
137 protocol: The streaming protocol (RAOP or AIRPLAY2)
138
139 Returns:
140 Path to the CLI binary
141
142 Raises:
143 RuntimeError: If the binary cannot be found
144 """
145
146 async def check_binary(cli_path: str) -> str | None:
147 try:
148 if protocol == StreamingProtocol.RAOP:
149 args = [
150 cli_path,
151 "-check",
152 ]
153 passing_output = "cliraop check"
154 else:
155 args = [
156 cli_path,
157 "--testrun",
158 ]
159 passing_output = "cliap2 check"
160
161 returncode, output = await check_output(*args)
162 _LOGGER.debug("%s returned %d with output: %s", cli_path, int(returncode), str(output))
163 if returncode == 0 and output.strip().decode() == passing_output:
164 return cli_path
165 except OSError:
166 pass
167 return None
168
169 base_path = os.path.join(os.path.dirname(__file__), "bin")
170 system = platform.system().lower().replace("darwin", "macos")
171 architecture = platform.machine().lower()
172
173 if protocol == StreamingProtocol.RAOP:
174 package = "cliraop"
175 elif protocol == StreamingProtocol.AIRPLAY2:
176 package = "cliap2"
177 else:
178 raise RuntimeError(f"Unsupported streaming protocol requested: {protocol}")
179
180 if bridge_binary := await check_binary(
181 os.path.join(base_path, f"{package}-{system}-{architecture}")
182 ):
183 return bridge_binary
184
185 msg = (
186 f"Unable to locate {protocol.name} CLI stream binary {package} for {system}/{architecture}"
187 )
188 raise RuntimeError(msg)
189
190
191def get_ntp_timestamp() -> int:
192 """
193 Get current NTP timestamp (64-bit).
194
195 Returns:
196 int: 64-bit NTP timestamp (upper 32 bits = seconds, lower 32 bits = fraction)
197 """
198 # Get current Unix timestamp with microsecond precision
199 current_time = time.time()
200
201 # Split into seconds and microseconds
202 seconds = int(current_time)
203 microseconds = int((current_time - seconds) * 1_000_000)
204
205 # Convert to NTP epoch (add offset from 1970 to 1900)
206 ntp_seconds = seconds + NTP_EPOCH_DELTA
207
208 # Convert microseconds to NTP fraction (2^32 parts per second)
209 # fraction = (microseconds * 2^32) / 1_000_000
210 ntp_fraction = int((microseconds << 32) / 1_000_000)
211
212 # Combine into 64-bit value
213 return (ntp_seconds << 32) | ntp_fraction
214
215
216def ntp_to_seconds_fraction(ntp_timestamp: int) -> tuple[int, int]:
217 """
218 Split NTP timestamp into seconds and fraction components.
219
220 Args:
221 ntp_timestamp: 64-bit NTP timestamp
222
223 Returns:
224 tuple: (seconds, fraction)
225 """
226 seconds = ntp_timestamp >> 32
227 fraction = ntp_timestamp & 0xFFFFFFFF
228 return seconds, fraction
229
230
231def ntp_to_unix_time(ntp_timestamp: int) -> float:
232 """
233 Convert NTP timestamp to Unix timestamp (float).
234
235 Args:
236 ntp_timestamp: 64-bit NTP timestamp
237
238 Returns:
239 float: Unix timestamp (seconds since 1970-01-01)
240 """
241 seconds = ntp_timestamp >> 32
242 fraction = ntp_timestamp & 0xFFFFFFFF
243
244 # Convert back to Unix epoch
245 unix_seconds = seconds - NTP_EPOCH_DELTA
246
247 # Convert fraction to microseconds
248 microseconds = (fraction * 1_000_000) >> 32
249
250 return unix_seconds + (microseconds / 1_000_000)
251
252
253def unix_time_to_ntp(unix_timestamp: float) -> int:
254 """
255 Convert Unix timestamp (float) to NTP timestamp.
256
257 Args:
258 unix_timestamp: Unix timestamp (seconds since 1970-01-01)
259
260 Returns:
261 int: 64-bit NTP timestamp
262 """
263 seconds = int(unix_timestamp)
264 microseconds = int((unix_timestamp - seconds) * 1_000_000)
265
266 # Convert to NTP epoch
267 ntp_seconds = seconds + NTP_EPOCH_DELTA
268
269 # Convert microseconds to NTP fraction
270 ntp_fraction = int((microseconds << 32) / 1_000_000)
271
272 return (ntp_seconds << 32) | ntp_fraction
273
274
275def player_id_to_mac_address(player_id: str) -> str:
276 """Convert a player_id to a MAC address-like string."""
277 # the player_id is the mac address prefixed with "ap"
278 hex_str = player_id.replace("ap", "").upper()
279 return ":".join(hex_str[i : i + 2] for i in range(0, 12, 2))
280
281
282def generate_active_remote_id(mac_address: str) -> str:
283 """
284 Generate an Active-Remote ID for DACP communication.
285
286 The Active-Remote ID is used to match DACP callbacks from devices to the
287 correct stream. This function generates a consistent ID based on the
288 player_id (=macaddress, =device id), converted to uint32).
289
290 :return: Active-Remote ID as decimal string.
291 """
292 # Convert MAC address format to uint32
293 # Remove colons: "AA:BB:CC:DD:EE:FF" -> "AABBCCDDEEFF"
294 hex_str = mac_address.replace(":", "").upper()
295 # Parse as uint64 and truncate to uint32 (lower 32 bits)
296 device_id_u64 = int(hex_str, 16)
297 device_id_u32 = device_id_u64 & 0xFFFFFFFF
298 return str(device_id_u32)
299
300
301def add_seconds_to_ntp(ntp_timestamp: int, seconds: float) -> int:
302 """
303 Add seconds to an NTP timestamp.
304
305 Args:
306 ntp_timestamp: 64-bit NTP timestamp
307 seconds: Number of seconds to add (can be fractional)
308
309 Returns:
310 int: New NTP timestamp with seconds added
311 """
312 # Extract whole seconds and fraction
313 whole_seconds = int(seconds)
314 fraction = seconds - whole_seconds
315
316 # Convert to NTP format (upper 32 bits = seconds, lower 32 bits = fraction)
317 ntp_seconds = whole_seconds << 32
318 ntp_fraction = int(fraction * (1 << 32))
319
320 return ntp_timestamp + ntp_seconds + ntp_fraction
321