/
/
/
1"""Unified AirPlay/RAOP stream session logic for AirPlay devices."""
2
3from __future__ import annotations
4
5import asyncio
6import time
7from collections.abc import AsyncGenerator
8from contextlib import suppress
9from typing import TYPE_CHECKING
10
11from music_assistant_models.errors import PlayerCommandFailed
12
13from music_assistant.constants import CONF_SYNC_ADJUST
14from music_assistant.helpers.audio import get_player_filter_params
15from music_assistant.helpers.ffmpeg import FFMpeg
16from music_assistant.providers.airplay.helpers import ntp_to_unix_time, unix_time_to_ntp
17
18from .constants import (
19 AIRPLAY2_CONNECT_TIME_MS,
20 CONF_ENABLE_LATE_JOIN,
21 ENABLE_LATE_JOIN_DEFAULT,
22 RAOP_CONNECT_TIME_MS,
23 StreamingProtocol,
24)
25from .protocols.airplay2 import AirPlay2Stream
26from .protocols.raop import RaopStream
27
28if TYPE_CHECKING:
29 from music_assistant_models.media_items import AudioFormat
30
31 from .player import AirPlayPlayer
32 from .provider import AirPlayProvider
33
34
35class AirPlayStreamSession:
36 """Stream session (RAOP or AirPlay2) to one or more players."""
37
38 def __init__(
39 self,
40 airplay_provider: AirPlayProvider,
41 sync_clients: list[AirPlayPlayer],
42 pcm_format: AudioFormat,
43 ) -> None:
44 """Initialize AirPlayStreamSession.
45
46 :param airplay_provider: The AirPlay provider instance.
47 :param sync_clients: List of AirPlay players to stream to.
48 :param pcm_format: PCM format of the input stream.
49 """
50 assert sync_clients
51 self.prov = airplay_provider
52 self.mass = airplay_provider.mass
53 self.pcm_format = pcm_format
54 self.sync_clients = sync_clients
55 self._audio_source_task: asyncio.Task[None] | None = None
56 self._player_ffmpeg: dict[str, FFMpeg] = {}
57 self._lock = asyncio.Lock()
58 self.start_ntp: int = 0
59 self.start_time: float = 0.0
60 self.wait_start: float = 0.0
61 self.seconds_streamed: float = 0
62 self._first_chunk_received = asyncio.Event()
63
64 async def start(self, audio_source: AsyncGenerator[bytes, None]) -> None:
65 """Initialize stream session for all players."""
66 cur_time = time.time()
67 has_airplay2_client = any(
68 p.protocol == StreamingProtocol.AIRPLAY2 for p in self.sync_clients
69 )
70 wait_start = AIRPLAY2_CONNECT_TIME_MS if has_airplay2_client else RAOP_CONNECT_TIME_MS
71 wait_start_seconds = wait_start / 1000
72 self.wait_start = wait_start_seconds
73 self.start_time = cur_time + wait_start_seconds
74 self.start_ntp = unix_time_to_ntp(self.start_time)
75 await asyncio.gather(*[self._start_client(p, self.start_ntp) for p in self.sync_clients])
76 self._audio_source_task = asyncio.create_task(self._audio_streamer(audio_source))
77 try:
78 await asyncio.gather(
79 *[p.stream.wait_for_connection() for p in self.sync_clients if p.stream]
80 )
81 except Exception:
82 # playback failed to start, cleanup
83 await self.stop()
84 raise PlayerCommandFailed("Playback failed to start")
85
86 async def stop(self, force: bool = False) -> None:
87 """Stop playback and cleanup."""
88 if self._audio_source_task and not self._audio_source_task.done():
89 self._audio_source_task.cancel()
90 with suppress(asyncio.CancelledError):
91 await self._audio_source_task
92 if force:
93 await asyncio.gather(
94 *[self.stop_client(x, force=True) for x in self.sync_clients],
95 )
96 self.sync_clients = []
97 else:
98 await asyncio.gather(
99 *[self.remove_client(x) for x in self.sync_clients],
100 )
101
102 async def remove_client(self, airplay_player: AirPlayPlayer) -> None:
103 """Remove a sync client from the session."""
104 async with self._lock:
105 if airplay_player not in self.sync_clients:
106 return
107 self.sync_clients.remove(airplay_player)
108 await self.stop_client(airplay_player)
109 # If this was the last client, stop the session
110 if not self.sync_clients:
111 await self.stop()
112 return
113
114 async def stop_client(self, airplay_player: AirPlayPlayer, force: bool = False) -> None:
115 """
116 Stop a client's stream and ffmpeg.
117
118 :param airplay_player: The player to stop.
119 :param force: If True, kill CLI process immediately.
120 """
121 ffmpeg = self._player_ffmpeg.pop(airplay_player.player_id, None)
122 if force:
123 if ffmpeg and not ffmpeg.closed:
124 await ffmpeg.kill()
125 if airplay_player.stream and airplay_player.stream.session == self:
126 await airplay_player.stream.stop(force=True)
127 else:
128 if ffmpeg and not ffmpeg.closed:
129 await ffmpeg.close()
130 if airplay_player.stream and airplay_player.stream.session == self:
131 await airplay_player.stream.stop()
132
133 async def add_client(self, airplay_player: AirPlayPlayer) -> None:
134 """Add a sync client to the session as a late joiner.
135
136 The late joiner will:
137 1. Start playing at a compensated NTP timestamp (start_ntp + offset)
138 2. Receive silence calculated dynamically based on how much audio has been sent
139 3. Then receive real audio chunks in sync with other players
140 """
141 sync_leader = self.sync_clients[0]
142 if not sync_leader.stream or not sync_leader.stream.running:
143 return
144
145 allow_late_join = self.prov.config.get_value(
146 CONF_ENABLE_LATE_JOIN, ENABLE_LATE_JOIN_DEFAULT
147 )
148 if not allow_late_join:
149 await self.stop()
150 if sync_leader.state.current_media:
151 self.mass.call_later(
152 0.5,
153 self.mass.players.cmd_resume(sync_leader.player_id),
154 task_id=f"resync_session_{sync_leader.player_id}",
155 )
156 return
157
158 async with self._lock:
159 skip_seconds = self.seconds_streamed
160 start_at = self.start_time + skip_seconds
161 start_ntp = unix_time_to_ntp(start_at)
162 if airplay_player not in self.sync_clients:
163 self.sync_clients.append(airplay_player)
164
165 await self._start_client(airplay_player, start_ntp)
166 if airplay_player.stream:
167 await airplay_player.stream.wait_for_connection()
168
169 async def _audio_streamer(self, audio_source: AsyncGenerator[bytes, None]) -> None:
170 """Stream audio to all players."""
171 pcm_sample_size = self.pcm_format.pcm_sample_size
172 watchdog_task = asyncio.create_task(self._silence_watchdog(pcm_sample_size))
173 try:
174 async for chunk in audio_source:
175 if not self._first_chunk_received.is_set():
176 watchdog_task.cancel()
177 with suppress(asyncio.CancelledError):
178 await watchdog_task
179 self._first_chunk_received.set()
180
181 if not self.sync_clients:
182 break
183
184 await self._write_chunk_to_all_players(chunk)
185 self.seconds_streamed += len(chunk) / pcm_sample_size
186 finally:
187 if not watchdog_task.done():
188 watchdog_task.cancel()
189 with suppress(asyncio.CancelledError):
190 await watchdog_task
191 async with self._lock:
192 await asyncio.gather(
193 *[
194 self._write_eof_to_player(x)
195 for x in self.sync_clients
196 if x.stream and x.stream.running
197 ],
198 return_exceptions=True,
199 )
200
201 async def _silence_watchdog(self, pcm_sample_size: int) -> None:
202 """Insert silence if audio source is slow to deliver first chunk."""
203 grace_period = 0.2
204 max_silence_padding = 5.0
205 silence_inserted = 0.0
206
207 await asyncio.sleep(grace_period)
208 while not self._first_chunk_received.is_set() and silence_inserted < max_silence_padding:
209 silence_duration = 0.1
210 silence_bytes = int(pcm_sample_size * silence_duration)
211 silence_chunk = bytes(silence_bytes)
212 await self._write_chunk_to_all_players(silence_chunk)
213 self.seconds_streamed += silence_duration
214 silence_inserted += silence_duration
215 await asyncio.sleep(0.05)
216
217 if silence_inserted > 0:
218 self.prov.logger.warning(
219 "Inserted %.1fs silence padding while waiting for audio source",
220 silence_inserted,
221 )
222
223 async def _write_chunk_to_all_players(self, chunk: bytes) -> None:
224 """Write a chunk to all connected players."""
225 async with self._lock:
226 sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running]
227 if not sync_clients:
228 return
229
230 # Write chunk to all players
231 write_tasks = [self._write_chunk_to_player(x, chunk) for x in sync_clients if x.stream]
232 results = await asyncio.gather(*write_tasks, return_exceptions=True)
233
234 # Check for write errors or timeouts
235 players_to_remove: list[AirPlayPlayer] = []
236 for i, result in enumerate(results):
237 if i >= len(sync_clients):
238 continue
239 player = sync_clients[i]
240
241 if isinstance(result, TimeoutError):
242 self.prov.logger.warning(
243 "Removing player %s from session: stopped reading data (write timeout)",
244 player.player_id,
245 )
246 players_to_remove.append(player)
247 elif isinstance(result, Exception):
248 self.prov.logger.warning(
249 "Removing player %s from session due to write error: %s",
250 player.player_id,
251 result,
252 )
253 players_to_remove.append(player)
254
255 for player in players_to_remove:
256 self.mass.create_task(self.remove_client(player))
257
258 async def _write_chunk_to_player(self, airplay_player: AirPlayPlayer, chunk: bytes) -> None:
259 """Write audio chunk to a player's ffmpeg process."""
260 player_id = airplay_player.player_id
261 if ffmpeg := self._player_ffmpeg.get(player_id):
262 if ffmpeg.closed:
263 return
264 await asyncio.wait_for(ffmpeg.write(chunk), timeout=35.0)
265
266 async def _write_eof_to_player(self, airplay_player: AirPlayPlayer) -> None:
267 """Write EOF to a specific player."""
268 if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
269 await ffmpeg.write_eof()
270 await ffmpeg.wait_with_timeout(30)
271 if airplay_player.stream and airplay_player.stream._cli_proc:
272 await airplay_player.stream._cli_proc.write_eof()
273
274 async def _start_client(self, airplay_player: AirPlayPlayer, start_ntp: int) -> None:
275 """Start CLI process and ffmpeg for a single client."""
276 if airplay_player.stream and airplay_player.stream.running:
277 await airplay_player.stream.stop()
278 if airplay_player.protocol == StreamingProtocol.AIRPLAY2:
279 airplay_player.stream = AirPlay2Stream(airplay_player)
280 else:
281 airplay_player.stream = RaopStream(airplay_player)
282 airplay_player.stream.session = self
283 sync_adjust = airplay_player.config.get_value(CONF_SYNC_ADJUST, 0)
284 assert isinstance(sync_adjust, int)
285 if sync_adjust != 0:
286 start_ntp = unix_time_to_ntp(ntp_to_unix_time(start_ntp) + (sync_adjust / 1000))
287 await airplay_player.stream.start(start_ntp)
288 # Start ffmpeg to feed audio to CLI stdin
289 if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
290 await ffmpeg.close()
291 filter_params = get_player_filter_params(
292 self.mass,
293 airplay_player.player_id,
294 self.pcm_format,
295 airplay_player.stream.pcm_format,
296 )
297 cli_proc = airplay_player.stream._cli_proc
298 assert cli_proc
299 assert cli_proc.proc
300 assert cli_proc.proc.stdin
301 stdin_transport = cli_proc.proc.stdin.transport
302 audio_output: str | int = stdin_transport.get_extra_info("pipe").fileno()
303 ffmpeg = FFMpeg(
304 audio_input="-",
305 input_format=self.pcm_format,
306 output_format=airplay_player.stream.pcm_format,
307 filter_params=filter_params,
308 audio_output=audio_output,
309 )
310 await ffmpeg.start()
311 self._player_ffmpeg[airplay_player.player_id] = ffmpeg
312