music-assistant-server

10.5 KBPY
helpers.py
10.5 KB331 lines • python
1"""Various helpers/utilities for the AirPlay provider."""
2
3from __future__ import annotations
4
5import logging
6import os
7import platform
8import time
9from typing import TYPE_CHECKING
10
11from zeroconf import IPVersion
12
13from music_assistant.helpers.process import check_output
14from music_assistant.providers.airplay.constants import (
15    AIRPLAY_2_DEFAULT_MODELS,
16    BROKEN_AIRPLAY_MODELS,
17    StreamingProtocol,
18)
19
20if TYPE_CHECKING:
21    from zeroconf.asyncio import AsyncServiceInfo
22
23_LOGGER = logging.getLogger(__name__)
24
25# NTP epoch delta: difference between Unix epoch (1970) and NTP epoch (1900)
26NTP_EPOCH_DELTA = 0x83AA7E80  # 2208988800 seconds
27
28
29def convert_airplay_volume(value: float) -> int:
30    """Remap AirPlay Volume to 0..100 scale."""
31    airplay_min = -30
32    airplay_max = 0
33    normal_min = 0
34    normal_max = 100
35    portion = (value - airplay_min) * (normal_max - normal_min) / (airplay_max - airplay_min)
36    return int(portion + normal_min)
37
38
39def get_model_info(info: AsyncServiceInfo) -> tuple[str, str]:  # noqa: PLR0911
40    """Return Manufacturer and Model name from mdns info."""
41    manufacturer = info.decoded_properties.get("manufacturer")
42    model = info.decoded_properties.get("model")
43    if manufacturer and model:
44        return (manufacturer, model)
45    # try parse from am property
46    if am_property := info.decoded_properties.get("am"):
47        model = am_property
48
49    if not model:
50        model = "Unknown"
51
52    # parse apple model names
53    if model == "AudioAccessory6,1":
54        return ("Apple", "HomePod 2")
55    if model in ("AudioAccessory5,1", "AudioAccessorySingle5,1"):
56        return ("Apple", "HomePod Mini")
57    if model == "AppleTV1,1":
58        return ("Apple", "Apple TV Gen1")
59    if model == "AppleTV2,1":
60        return ("Apple", "Apple TV Gen2")
61    if model in ("AppleTV3,1", "AppleTV3,2"):
62        return ("Apple", "Apple TV Gen3")
63    if model == "AppleTV5,3":
64        return ("Apple", "Apple TV Gen4")
65    if model == "AppleTV6,2":
66        return ("Apple", "Apple TV 4K")
67    if model == "AppleTV11,1":
68        return ("Apple", "Apple TV 4K Gen2")
69    if model == "AppleTV14,1":
70        return ("Apple", "Apple TV 4K Gen3")
71    if model == "UPL-AMP":
72        return ("Ubiquiti Inc.", "UPL-AMP")
73    if "AirPort" in model:
74        return ("Apple", "AirPort Express")
75    if "AudioAccessory" in model:
76        return ("Apple", "HomePod")
77    if "AppleTV" in model:
78        model = "Apple TV"
79        manufacturer = "Apple"
80    # Detect Mac devices (Mac mini, MacBook, iMac, etc.)
81    # Model identifiers like: Mac16,11, MacBookPro18,3, iMac21,1
82    if model.startswith(("Mac", "iMac")):
83        # Parse Mac model to friendly name
84        if model.startswith("MacBookPro"):
85            return ("Apple", f"MacBook Pro ({model})")
86        if model.startswith("MacBookAir"):
87            return ("Apple", f"MacBook Air ({model})")
88        if model.startswith("MacBook"):
89            return ("Apple", f"MacBook ({model})")
90        if model.startswith("iMac"):
91            return ("Apple", f"iMac ({model})")
92        if model.startswith("Macmini"):
93            return ("Apple", f"Mac mini ({model})")
94        if model.startswith("MacPro"):
95            return ("Apple", f"Mac Pro ({model})")
96        if model.startswith("MacStudio"):
97            return ("Apple", f"Mac Studio ({model})")
98        # Generic Mac device (e.g. Mac16,11 for Mac mini M4)
99        return ("Apple", f"Mac ({model})")
100
101    return (manufacturer or "AirPlay", model)
102
103
104def get_primary_ip_address_from_zeroconf(discovery_info: AsyncServiceInfo) -> str | None:
105    """Get primary IP address from zeroconf discovery info."""
106    for address in discovery_info.parsed_addresses(IPVersion.V4Only):
107        if address.startswith("127"):
108            # filter out loopback address
109            continue
110        if address.startswith("169.254"):
111            # filter out APIPA address
112            continue
113        return address
114    return None
115
116
117def is_broken_airplay_model(manufacturer: str, model: str) -> bool:
118    """Check if a model is known to have broken AirPlay support."""
119    for broken_manufacturer, broken_model in BROKEN_AIRPLAY_MODELS:
120        if broken_manufacturer in (manufacturer, "*") and broken_model in (model, "*"):
121            return True
122    return False
123
124
125def is_airplay2_preferred_model(manufacturer: str, model: str) -> bool:
126    """Check if a model is known to work better with AirPlay 2 protocol."""
127    for ap2_manufacturer, ap2_model in AIRPLAY_2_DEFAULT_MODELS:
128        if ap2_manufacturer in (manufacturer, "*") and ap2_model in (model, "*"):
129            return True
130    return False
131
132
133def is_apple_device(manufacturer: str) -> bool:
134    """Check if a device is an Apple device with native AirPlay support.
135
136    Apple devices (HomePod, Apple TV, Mac, etc.) have native AirPlay support
137    and should be exposed as PlayerType.PLAYER. Non-Apple devices with AirPlay
138    support should be exposed as PlayerType.PROTOCOL.
139    """
140    return manufacturer.lower().startswith("apple")
141
142
143async def get_cli_binary(protocol: StreamingProtocol) -> str:
144    """Find the correct raop/airplay binary belonging to the platform.
145
146    Args:
147        protocol: The streaming protocol (RAOP or AIRPLAY2)
148
149    Returns:
150        Path to the CLI binary
151
152    Raises:
153        RuntimeError: If the binary cannot be found
154    """
155
156    async def check_binary(cli_path: str) -> str | None:
157        try:
158            if protocol == StreamingProtocol.RAOP:
159                args = [
160                    cli_path,
161                    "-check",
162                ]
163                passing_output = "cliraop check"
164            else:
165                args = [
166                    cli_path,
167                    "--testrun",
168                ]
169                passing_output = "cliap2 check"
170
171            returncode, output = await check_output(*args)
172            _LOGGER.debug("%s returned %d with output: %s", cli_path, int(returncode), str(output))
173            if returncode == 0 and output.strip().decode() == passing_output:
174                return cli_path
175        except OSError:
176            pass
177        return None
178
179    base_path = os.path.join(os.path.dirname(__file__), "bin")
180    system = platform.system().lower().replace("darwin", "macos")
181    architecture = platform.machine().lower()
182
183    if protocol == StreamingProtocol.RAOP:
184        package = "cliraop"
185    elif protocol == StreamingProtocol.AIRPLAY2:
186        package = "cliap2"
187    else:
188        raise RuntimeError(f"Unsupported streaming protocol requested: {protocol}")
189
190    if bridge_binary := await check_binary(
191        os.path.join(base_path, f"{package}-{system}-{architecture}")
192    ):
193        return bridge_binary
194
195    msg = (
196        f"Unable to locate {protocol.name} CLI stream binary {package} for {system}/{architecture}"
197    )
198    raise RuntimeError(msg)
199
200
201def get_ntp_timestamp() -> int:
202    """
203    Get current NTP timestamp (64-bit).
204
205    Returns:
206        int: 64-bit NTP timestamp (upper 32 bits = seconds, lower 32 bits = fraction)
207    """
208    # Get current Unix timestamp with microsecond precision
209    current_time = time.time()
210
211    # Split into seconds and microseconds
212    seconds = int(current_time)
213    microseconds = int((current_time - seconds) * 1_000_000)
214
215    # Convert to NTP epoch (add offset from 1970 to 1900)
216    ntp_seconds = seconds + NTP_EPOCH_DELTA
217
218    # Convert microseconds to NTP fraction (2^32 parts per second)
219    # fraction = (microseconds * 2^32) / 1_000_000
220    ntp_fraction = int((microseconds << 32) / 1_000_000)
221
222    # Combine into 64-bit value
223    return (ntp_seconds << 32) | ntp_fraction
224
225
226def ntp_to_seconds_fraction(ntp_timestamp: int) -> tuple[int, int]:
227    """
228    Split NTP timestamp into seconds and fraction components.
229
230    Args:
231        ntp_timestamp: 64-bit NTP timestamp
232
233    Returns:
234        tuple: (seconds, fraction)
235    """
236    seconds = ntp_timestamp >> 32
237    fraction = ntp_timestamp & 0xFFFFFFFF
238    return seconds, fraction
239
240
241def ntp_to_unix_time(ntp_timestamp: int) -> float:
242    """
243    Convert NTP timestamp to Unix timestamp (float).
244
245    Args:
246        ntp_timestamp: 64-bit NTP timestamp
247
248    Returns:
249        float: Unix timestamp (seconds since 1970-01-01)
250    """
251    seconds = ntp_timestamp >> 32
252    fraction = ntp_timestamp & 0xFFFFFFFF
253
254    # Convert back to Unix epoch
255    unix_seconds = seconds - NTP_EPOCH_DELTA
256
257    # Convert fraction to microseconds
258    microseconds = (fraction * 1_000_000) >> 32
259
260    return unix_seconds + (microseconds / 1_000_000)
261
262
263def unix_time_to_ntp(unix_timestamp: float) -> int:
264    """
265    Convert Unix timestamp (float) to NTP timestamp.
266
267    Args:
268        unix_timestamp: Unix timestamp (seconds since 1970-01-01)
269
270    Returns:
271        int: 64-bit NTP timestamp
272    """
273    seconds = int(unix_timestamp)
274    microseconds = int((unix_timestamp - seconds) * 1_000_000)
275
276    # Convert to NTP epoch
277    ntp_seconds = seconds + NTP_EPOCH_DELTA
278
279    # Convert microseconds to NTP fraction
280    ntp_fraction = int((microseconds << 32) / 1_000_000)
281
282    return (ntp_seconds << 32) | ntp_fraction
283
284
285def player_id_to_mac_address(player_id: str) -> str:
286    """Convert a player_id to a MAC address-like string."""
287    # the player_id is the mac address prefixed with "ap"
288    hex_str = player_id.replace("ap", "").upper()
289    return ":".join(hex_str[i : i + 2] for i in range(0, 12, 2))
290
291
292def generate_active_remote_id(mac_address: str) -> str:
293    """
294    Generate an Active-Remote ID for DACP communication.
295
296    The Active-Remote ID is used to match DACP callbacks from devices to the
297    correct stream. This function generates a consistent ID based on the
298    player_id (=macaddress, =device id), converted to uint32).
299
300    :return: Active-Remote ID as decimal string.
301    """
302    # Convert MAC address format to uint32
303    # Remove colons: "AA:BB:CC:DD:EE:FF" -> "AABBCCDDEEFF"
304    hex_str = mac_address.replace(":", "").upper()
305    # Parse as uint64 and truncate to uint32 (lower 32 bits)
306    device_id_u64 = int(hex_str, 16)
307    device_id_u32 = device_id_u64 & 0xFFFFFFFF
308    return str(device_id_u32)
309
310
311def add_seconds_to_ntp(ntp_timestamp: int, seconds: float) -> int:
312    """
313    Add seconds to an NTP timestamp.
314
315    Args:
316        ntp_timestamp: 64-bit NTP timestamp
317        seconds: Number of seconds to add (can be fractional)
318
319    Returns:
320        int: New NTP timestamp with seconds added
321    """
322    # Extract whole seconds and fraction
323    whole_seconds = int(seconds)
324    fraction = seconds - whole_seconds
325
326    # Convert to NTP format (upper 32 bits = seconds, lower 32 bits = fraction)
327    ntp_seconds = whole_seconds << 32
328    ntp_fraction = int(fraction * (1 << 32))
329
330    return ntp_timestamp + ntp_seconds + ntp_fraction
331