music-assistant-server

7.3 KBPY
airplay2.py
7.3 KB196 lines • python
1"""Logic for AirPlay 2 audio streaming to AirPlay devices."""
2
3from __future__ import annotations
4
5import asyncio
6import logging
7from typing import TYPE_CHECKING, cast
8
9from music_assistant_models.enums import PlaybackState
10
11from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
12from music_assistant.helpers.process import AsyncProcess
13from music_assistant.providers.airplay.constants import (
14    AIRPLAY2_MIN_LOG_LEVEL,
15    AIRPLAY_OUTPUT_BUFFER_MIN_DURATION_MS,
16    CONF_AIRPLAY_CREDENTIALS,
17    CONF_AIRPLAY_LATENCY,
18)
19from music_assistant.providers.airplay.helpers import get_cli_binary
20
21from ._protocol import AirPlayProtocol
22
23if TYPE_CHECKING:
24    from music_assistant.providers.airplay.provider import AirPlayProvider
25
26
27class AirPlay2Stream(AirPlayProtocol):
28    """
29    AirPlay 2 Audio Streamer.
30
31    Uses cliap2 (C executable based on owntone) for timestamped playback.
32    Audio is fed via stdin, commands via a named pipe.
33    """
34
35    @property
36    def _cli_loglevel(self) -> int:
37        """
38        Return a cliap2 aligned loglevel.
39
40        Ensures that minimum level required for required cliap2 stderr output is respected.
41        """
42        mass_level: int = 0
43        match self.prov.logger.level:
44            case logging.CRITICAL:
45                mass_level = 0
46            case logging.ERROR:
47                mass_level = 1
48            case logging.WARNING:
49                mass_level = 2
50            case logging.INFO:
51                mass_level = 3
52            case logging.DEBUG:
53                mass_level = 4
54        if self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
55            mass_level = 5
56        return max(mass_level, AIRPLAY2_MIN_LOG_LEVEL)
57
58    async def start(self, start_ntp: int) -> None:
59        """Start cliap2 process."""
60        assert self.player.airplay_discovery_info is not None
61        cli_binary = await get_cli_binary(self.player.protocol)
62        player_id = self.player.player_id
63        sync_adjust = self.player.config.get_value(CONF_SYNC_ADJUST)
64        assert isinstance(sync_adjust, int)
65        latency = self.player.config.get_value(
66            CONF_AIRPLAY_LATENCY, AIRPLAY_OUTPUT_BUFFER_MIN_DURATION_MS
67        )
68        assert isinstance(latency, int)
69
70        txt_kv: str = ""
71        for key, value in self.player.airplay_discovery_info.decoded_properties.items():
72            txt_kv += f'"{key}={value}" '
73
74        # cliap2 is the binary that handles the actual streaming to the player
75        # this binary leverages from the AirPlay2 support in OwnTone
76        # https://github.com/music-assistant/cliairplay
77
78        # Get AirPlay credentials if available (for Apple devices that require pairing)
79        airplay_credentials: str | None = None
80        if creds := self.player.config.get_value(CONF_AIRPLAY_CREDENTIALS):
81            airplay_credentials = str(creds)
82
83        # Get the provider's DACP ID for remote control callbacks
84        prov = cast("AirPlayProvider", self.prov)
85
86        cli_args = [
87            cli_binary,
88            "--name",
89            self.player.display_name,
90            "--hostname",
91            str(self.player.airplay_discovery_info.server),
92            "--address",
93            str(self.player.address),
94            "--port",
95            str(self.player.airplay_discovery_info.port),
96            "--txt",
97            txt_kv,
98            "--ntpstart",
99            str(start_ntp),
100            "--volume",
101            str(self.player.volume_level),
102            "--loglevel",
103            str(self._cli_loglevel),
104            "--dacp_id",
105            prov.dacp_id,
106            "--pipe",
107            "-",  # Use stdin for audio input
108            "--command_pipe",
109            self.commands_pipe.path,
110            "--latency",
111            str(latency),
112        ]
113
114        # Add credentials for authenticated AirPlay devices (Apple TV, HomePod, etc.)
115        # Native HAP pairing format: 192 hex chars = client_private_key(128) + server_public_key(64)
116        if airplay_credentials:
117            if len(airplay_credentials) == 192:
118                cli_args += ["--auth", airplay_credentials]
119            else:
120                self.player.logger.warning(
121                    "Invalid credentials length: %d (expected 192)",
122                    len(airplay_credentials),
123                )
124
125        self.player.logger.debug(
126            "Starting cliap2 process for player %s with args: %s",
127            player_id,
128            cli_args,
129        )
130        self._cli_proc = AsyncProcess(cli_args, stdin=True, stderr=True, name="cliap2")
131        await self._cli_proc.start()
132        # start reading the stderr of the cliap2 process from another task
133        self._cli_proc.attach_stderr_reader(self.mass.create_task(self._stderr_reader()))
134
135    async def _stderr_reader(self) -> None:
136        """Monitor stderr for the running CLIap2 process."""
137        player = self.player
138        logger = player.logger
139        if not self._cli_proc:
140            return
141        async for line in self._cli_proc.iter_stderr():
142            if self._stopped:
143                break
144            if "player: event_play_start()" in line:
145                # successfully connected
146                self._connected.set()
147            if "Pause at" in line:
148                player.set_state_from_stream(state=PlaybackState.PAUSED, stream=self)
149            elif "Restarted at" in line:
150                player.set_state_from_stream(state=PlaybackState.PLAYING, stream=self)
151            elif "Starting at" in line:
152                # streaming has started
153                player.set_state_from_stream(
154                    state=PlaybackState.PLAYING, elapsed_time=0, stream=self
155                )
156            if "put delay detected" in line:
157                if "resetting all outputs" in line:
158                    logger.error(
159                        "Repeated output buffer low level detected, restarting playback..."
160                    )
161                    logger.info(
162                        "Recommended to increase 'Milliseconds of data to buffer' in player "
163                        "advanced settings"
164                    )
165                    self.mass.create_task(self.mass.players.cmd_resume(self.player.player_id))
166                else:
167                    logger.warning("Output buffer low level detected!")
168            if "end of stream reached" in line:
169                logger.debug("End of stream reached")
170                break
171
172            # log cli stderr output in alignment with mass logging level
173            if "[FATAL]" in line:
174                logger.critical(line)
175            elif "[  LOG]" in line:
176                logger.error(line)
177            elif "[ INFO]" in line:
178                logger.info(line)
179            elif "[ WARN]" in line:
180                logger.warning(line)
181            elif "[DEBUG]" in line and "mass_timer_cb" in line:
182                # mass_timer_cb is very spammy, reduce it to verbose
183                logger.log(VERBOSE_LOG_LEVEL, line)
184            elif "[DEBUG]" in line:
185                logger.debug(line)
186            elif "[ SPAM]" in line:
187                logger.log(VERBOSE_LOG_LEVEL, line)
188            else:  # for now, log unknown lines as error
189                logger.error(line)
190            await asyncio.sleep(0)  # Yield to event loop
191
192        # ensure we're cleaned up afterwards (this also logs the returncode)
193        if not self._stopped:
194            self._stopped = True
195            self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0, stream=self)
196