/
/
/
1"""Logic for AirPlay 2 audio streaming to AirPlay devices."""
2
3from __future__ import annotations
4
5import asyncio
6import logging
7from typing import TYPE_CHECKING, cast
8
9from music_assistant_models.enums import PlaybackState
10
11from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
12from music_assistant.helpers.process import AsyncProcess
13from music_assistant.providers.airplay.constants import (
14 AIRPLAY2_MIN_LOG_LEVEL,
15 CONF_AIRPLAY_CREDENTIALS,
16)
17from music_assistant.providers.airplay.helpers import get_cli_binary
18
19from ._protocol import AirPlayProtocol
20
21if TYPE_CHECKING:
22 from music_assistant.providers.airplay.provider import AirPlayProvider
23
24
class AirPlay2Stream(AirPlayProtocol):
    """
    AirPlay 2 Audio Streamer.

    Uses cliap2 (C executable based on owntone) for timestamped playback.
    Audio is fed via stdin, commands via a named pipe.
    """

    @property
    def _cli_loglevel(self) -> int:
        """
        Return a cliap2 aligned loglevel.

        Maps the provider logger's effective level onto the 0 (critical) .. 5 (verbose)
        scale used here for cliap2, and never returns less than AIRPLAY2_MIN_LOG_LEVEL
        so that the stderr output this class parses is always emitted.
        """
        logger = self.prov.logger
        if logger.isEnabledFor(VERBOSE_LOG_LEVEL):
            mass_level = 5
        else:
            # Use the effective level (not the raw ``level`` attribute) so a logger
            # that inherits its level from a parent is mapped correctly, and compare
            # by threshold so non-standard levels fall into the nearest bucket.
            effective_level = logger.getEffectiveLevel()
            if effective_level <= logging.DEBUG:
                mass_level = 4
            elif effective_level <= logging.INFO:
                mass_level = 3
            elif effective_level <= logging.WARNING:
                mass_level = 2
            elif effective_level <= logging.ERROR:
                mass_level = 1
            else:
                mass_level = 0
        return max(mass_level, AIRPLAY2_MIN_LOG_LEVEL)

    async def start(self, start_ntp: int) -> None:
        """
        Start cliap2 process.

        Builds the cliap2 command line from the player's discovery info and config,
        spawns the process with stdin (audio) and stderr (status/log output) attached
        and schedules the stderr reader task.

        :param start_ntp: NTP timestamp handed to cliap2 via ``--ntpstart``.
        """
        cli_binary = await get_cli_binary(self.player.protocol)
        discovery_info = self.player.airplay_discovery_info
        assert discovery_info is not None

        player_id = self.player.player_id
        # NOTE(review): sync_adjust is fetched and type-checked but not forwarded
        # to cliap2 in this method - confirm whether that is intentional.
        sync_adjust = self.mass.config.get_raw_player_config_value(player_id, CONF_SYNC_ADJUST, 0)
        assert isinstance(sync_adjust, int)

        # Flatten the mDNS TXT properties into one string of quoted key=value pairs,
        # each followed by a single space.
        txt_kv: str = "".join(
            f'"{key}={value}" ' for key, value in discovery_info.decoded_properties.items()
        )

        # cliap2 is the binary that handles the actual streaming to the player
        # this binary builds on the AirPlay 2 support from owntone
        # https://github.com/music-assistant/cliairplay

        # Get AirPlay credentials if available (for Apple devices that require pairing)
        airplay_credentials: str | None = None
        if creds := self.player.config.get_value(CONF_AIRPLAY_CREDENTIALS):
            airplay_credentials = str(creds)

        # Get the provider's DACP ID for remote control callbacks
        prov = cast("AirPlayProvider", self.prov)

        cli_args = [
            cli_binary,
            "--name",
            self.player.display_name,
            "--hostname",
            str(discovery_info.server),
            "--address",
            str(self.player.address),
            "--port",
            str(discovery_info.port),
            "--txt",
            txt_kv,
            "--ntpstart",
            str(start_ntp),
            "--volume",
            str(self.player.volume_level),
            "--loglevel",
            str(self._cli_loglevel),
            "--dacp_id",
            prov.dacp_id,
            "--active_remote",
            self.active_remote_id,
            "--pipe",
            "-",  # Use stdin for audio input
            "--command_pipe",
            self.commands_pipe.path,
        ]
        # Separate copy used only for logging, so the pairing secret never
        # ends up in the log output.
        loggable_args = list(cli_args)

        # Add credentials for authenticated AirPlay devices (Apple TV, HomePod, etc.)
        # Native HAP pairing format: 192 hex chars = client_private_key(128) + server_public_key(64)
        if airplay_credentials:
            if len(airplay_credentials) == 192:
                cli_args += ["--auth", airplay_credentials]
                loggable_args += ["--auth", "<redacted>"]
            else:
                self.player.logger.warning(
                    "Invalid credentials length: %d (expected 192)",
                    len(airplay_credentials),
                )

        self.player.logger.debug(
            "Starting cliap2 process for player %s with args: %s",
            player_id,
            loggable_args,
        )
        self._cli_proc = AsyncProcess(cli_args, stdin=True, stderr=True, name="cliap2")
        await self._cli_proc.start()
        # start reading the stderr of the cliap2 process from another task
        self._cli_proc.attach_stderr_reader(self.mass.create_task(self._stderr_reader()))

    async def _stderr_reader(self) -> None:
        """
        Monitor stderr for the running cliap2 process.

        Translates known status lines into player state updates, reacts to packet
        loss reports and forwards every line to the player logger at a level derived
        from the cliap2 severity tag. When the loop ends without the stream having
        been stopped, the stream is flagged as stopped and the player is set to IDLE.
        """
        player = self.player
        logger = player.logger
        if not self._cli_proc:
            return
        async for line in self._cli_proc.iter_stderr():
            if self._stopped:
                break
            if "player: event_play_start()" in line:
                # successfully connected
                self._connected.set()
            if "Pause at" in line:
                player.set_state_from_stream(state=PlaybackState.PAUSED, stream=self)
            elif "Restarted at" in line:
                player.set_state_from_stream(state=PlaybackState.PLAYING, stream=self)
            elif "Starting at" in line:
                # streaming has started
                player.set_state_from_stream(
                    state=PlaybackState.PLAYING, elapsed_time=0, stream=self
                )
            if "put delay detected" in line:
                if "resetting all outputs" in line:
                    logger.error("High packet loss detected, restarting playback...")
                    self.mass.create_task(self.mass.players.cmd_resume(self.player.player_id))
                else:
                    logger.warning("Packet loss detected!")
            if "end of stream reached" in line:
                # this line itself is not forwarded to the logger below
                logger.debug("End of stream reached")
                break

            # log cli stderr output in alignment with mass logging level
            if "[FATAL]" in line:
                logger.critical(line)
            elif "[  LOG]" in line:
                logger.error(line)
            elif "[ INFO]" in line:
                logger.info(line)
            elif "[ WARN]" in line:
                logger.warning(line)
            elif "[DEBUG]" in line and "mass_timer_cb" in line:
                # mass_timer_cb is very spammy, reduce it to verbose
                logger.log(VERBOSE_LOG_LEVEL, line)
            elif "[DEBUG]" in line:
                logger.debug(line)
            elif "[ SPAM]" in line:
                logger.log(VERBOSE_LOG_LEVEL, line)
            else:  # for now, log unknown lines as error
                logger.error(line)
            await asyncio.sleep(0)  # Yield to event loop

        # stderr ended (or end of stream was reported) without an explicit stop:
        # flag the stream as stopped and report the player as idle
        if not self._stopped:
            self._stopped = True
            self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0, stream=self)
185