/
/
/
1"""Logic for AirPlay 2 audio streaming to AirPlay devices."""
2
3from __future__ import annotations
4
5import asyncio
6import logging
7from typing import TYPE_CHECKING, cast
8
9from music_assistant_models.enums import PlaybackState
10
11from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
12from music_assistant.helpers.process import AsyncProcess
13from music_assistant.providers.airplay.constants import (
14 AIRPLAY2_MIN_LOG_LEVEL,
15 AIRPLAY_OUTPUT_BUFFER_MIN_DURATION_MS,
16 CONF_AIRPLAY_CREDENTIALS,
17 CONF_AIRPLAY_LATENCY,
18)
19from music_assistant.providers.airplay.helpers import get_cli_binary
20
21from ._protocol import AirPlayProtocol
22
23if TYPE_CHECKING:
24 from music_assistant.providers.airplay.provider import AirPlayProvider
25
26
class AirPlay2Stream(AirPlayProtocol):
    """
    AirPlay 2 Audio Streamer.

    Uses cliap2 (C executable based on owntone) for timestamped playback.
    Audio is fed via stdin, commands via a named pipe.
    """

    @property
    def _cli_loglevel(self) -> int:
        """
        Return a cliap2 aligned loglevel.

        Maps the provider logger's level onto the 0 (critical) .. 5 (verbose) scale
        that is passed to cliap2 via ``--loglevel``. The result is never lower than
        AIRPLAY2_MIN_LOG_LEVEL, so the minimum level required for the needed cliap2
        stderr output is respected.
        """
        prov_logger = self.prov.logger
        mass_level: int
        if prov_logger.isEnabledFor(VERBOSE_LOG_LEVEL):
            mass_level = 5
        else:
            # Compare against the *effective* level using thresholds: a logger that
            # inherits its level (NOTSET) or uses a non-standard level would otherwise
            # match none of the exact constants and silently fall back to 0.
            effective_level = prov_logger.getEffectiveLevel()
            if effective_level <= logging.DEBUG:
                mass_level = 4
            elif effective_level <= logging.INFO:
                mass_level = 3
            elif effective_level <= logging.WARNING:
                mass_level = 2
            elif effective_level <= logging.ERROR:
                mass_level = 1
            else:
                mass_level = 0
        return max(mass_level, AIRPLAY2_MIN_LOG_LEVEL)

    async def start(self, start_ntp: int) -> None:
        """
        Start cliap2 process.

        Builds the cliap2 command line from the player's discovery info and config,
        spawns the process (audio goes in via stdin) and attaches a background task
        that reads its stderr.

        :param start_ntp: Value handed to cliap2 as ``--ntpstart``.
        :raises AssertionError: If the player has no AirPlay discovery info, or the
            sync adjust / latency config values are not integers.
        """
        assert self.player.airplay_discovery_info is not None
        cli_binary = await get_cli_binary(self.player.protocol)
        player_id = self.player.player_id
        sync_adjust = self.player.config.get_value(CONF_SYNC_ADJUST)
        assert isinstance(sync_adjust, int)
        latency = self.player.config.get_value(
            CONF_AIRPLAY_LATENCY, AIRPLAY_OUTPUT_BUFFER_MIN_DURATION_MS
        )
        assert isinstance(latency, int)

        # Serialize the discovery properties as space separated, quoted "key=value"
        # pairs (each pair keeps its trailing space).
        # NOTE(review): a property without a value would be rendered as "key=None" if
        # decoded_properties can contain None values - confirm against cliap2 parsing.
        txt_kv: str = "".join(
            f'"{key}={value}" '
            for key, value in self.player.airplay_discovery_info.decoded_properties.items()
        )

        # cliap2 is the binary that handles the actual streaming to the player
        # this binary leverages from the AirPlay2 support in OwnTone
        # https://github.com/music-assistant/cliairplay

        # Get AirPlay credentials if available (for Apple devices that require pairing)
        airplay_credentials: str | None = None
        if creds := self.player.config.get_value(CONF_AIRPLAY_CREDENTIALS):
            airplay_credentials = str(creds)

        # Get the provider's DACP ID for remote control callbacks
        prov = cast("AirPlayProvider", self.prov)

        cli_args = [
            cli_binary,
            "--name",
            self.player.display_name,
            "--hostname",
            str(self.player.airplay_discovery_info.server),
            "--address",
            str(self.player.address),
            "--port",
            str(self.player.airplay_discovery_info.port),
            "--txt",
            txt_kv,
            "--ntpstart",
            str(start_ntp),
            "--volume",
            str(self.player.volume_level),
            "--loglevel",
            str(self._cli_loglevel),
            "--dacp_id",
            prov.dacp_id,
            "--pipe",
            "-",  # Use stdin for audio input
            "--command_pipe",
            self.commands_pipe.path,
            "--latency",
            str(latency),
        ]

        # Add credentials for authenticated AirPlay devices (Apple TV, HomePod, etc.)
        # Native HAP pairing format: 192 hex chars = client_private_key(128) + server_public_key(64)
        auth_args: list[str] = []
        if airplay_credentials:
            if len(airplay_credentials) == 192:
                auth_args = ["--auth", airplay_credentials]
            else:
                self.player.logger.warning(
                    "Invalid credentials length: %d (expected 192)",
                    len(airplay_credentials),
                )

        # Never write the pairing credentials to the log: the logged argument list
        # carries a placeholder, only the spawned process gets the real value.
        logged_args = [*cli_args, "--auth", "<redacted>"] if auth_args else cli_args
        self.player.logger.debug(
            "Starting cliap2 process for player %s with args: %s",
            player_id,
            logged_args,
        )
        self._cli_proc = AsyncProcess(
            [*cli_args, *auth_args], stdin=True, stderr=True, name="cliap2"
        )
        await self._cli_proc.start()
        # start reading the stderr of the cliap2 process from another task
        self._cli_proc.attach_stderr_reader(self.mass.create_task(self._stderr_reader()))

    async def _stderr_reader(self) -> None:
        """
        Monitor stderr for the running CLIap2 process.

        Each stderr line is scanned for known markers to update the connection and
        playback state, then forwarded to the player logger at a level derived from
        the cliap2 severity tag. Reading stops when the stream is flagged as stopped,
        when cliap2 reports the end of the stream, or when stderr is exhausted.
        """
        player = self.player
        logger = player.logger
        if not self._cli_proc:
            return
        async for line in self._cli_proc.iter_stderr():
            if self._stopped:
                break
            if "player: event_play_start()" in line:
                # successfully connected
                self._connected.set()
            if "Pause at" in line:
                player.set_state_from_stream(state=PlaybackState.PAUSED, stream=self)
            elif "Restarted at" in line:
                player.set_state_from_stream(state=PlaybackState.PLAYING, stream=self)
            elif "Starting at" in line:
                # streaming has started
                player.set_state_from_stream(
                    state=PlaybackState.PLAYING, elapsed_time=0, stream=self
                )
            if "put delay detected" in line:
                if "resetting all outputs" in line:
                    logger.error(
                        "Repeated output buffer low level detected, restarting playback..."
                    )
                    logger.info(
                        "Recommended to increase 'Milliseconds of data to buffer' in player "
                        "advanced settings"
                    )
                    # schedule the resume as a separate task so reading stderr continues
                    self.mass.create_task(self.mass.players.cmd_resume(self.player.player_id))
                else:
                    logger.warning("Output buffer low level detected!")
            if "end of stream reached" in line:
                logger.debug("End of stream reached")
                break

            # log cli stderr output in alignment with mass logging level
            if "[FATAL]" in line:
                logger.critical(line)
            elif "[  LOG]" in line:
                logger.error(line)
            elif "[ INFO]" in line:
                logger.info(line)
            elif "[ WARN]" in line:
                logger.warning(line)
            elif "[DEBUG]" in line and "mass_timer_cb" in line:
                # mass_timer_cb is very spammy, reduce it to verbose
                logger.log(VERBOSE_LOG_LEVEL, line)
            elif "[DEBUG]" in line:
                logger.debug(line)
            elif "[ SPAM]" in line:
                logger.log(VERBOSE_LOG_LEVEL, line)
            else:  # for now, log unknown lines as error
                logger.error(line)
            await asyncio.sleep(0)  # Yield to event loop

        # The reader finished without the stream having been flagged as stopped:
        # flag it now and report the player as idle.
        if not self._stopped:
            self._stopped = True
            self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0, stream=self)
196