/
/
/
1"""Unified AirPlay/RAOP stream session logic for AirPlay devices."""
2
3from __future__ import annotations
4
5import asyncio
6import time
7from collections import deque
8from collections.abc import AsyncGenerator
9from contextlib import suppress
10from typing import TYPE_CHECKING
11
12from music_assistant_models.enums import PlaybackState
13from music_assistant_models.errors import PlayerCommandFailed
14
15from music_assistant.constants import CONF_SYNC_ADJUST
16from music_assistant.helpers.audio import get_player_filter_params
17from music_assistant.helpers.ffmpeg import FFMpeg
18from music_assistant.providers.airplay.helpers import ntp_to_unix_time, unix_time_to_ntp
19
20from .constants import (
21 AIRPLAY2_CONNECT_TIME_MS,
22 CONF_ENABLE_LATE_JOIN,
23 ENABLE_LATE_JOIN_DEFAULT,
24 RAOP_CONNECT_TIME_MS,
25 StreamingProtocol,
26)
27from .protocols.airplay2 import AirPlay2Stream
28from .protocols.raop import RaopStream
29
30if TYPE_CHECKING:
31 from music_assistant_models.media_items import AudioFormat
32
33 from .player import AirPlayPlayer
34 from .provider import AirPlayProvider
35
36
37class AirPlayStreamSession:
38 """Stream session (RAOP or AirPlay2) to one or more players."""
39
40 def __init__(
41 self,
42 airplay_provider: AirPlayProvider,
43 sync_clients: list[AirPlayPlayer],
44 pcm_format: AudioFormat,
45 ) -> None:
46 """Initialize AirPlayStreamSession.
47
48 :param airplay_provider: The AirPlay provider instance.
49 :param sync_clients: List of AirPlay players to stream to.
50 :param pcm_format: PCM format of the input stream.
51 """
52 assert sync_clients
53 self.prov = airplay_provider
54 self.mass = airplay_provider.mass
55 self.pcm_format = pcm_format
56 self.sync_clients = sync_clients
57 self._audio_source_task: asyncio.Task[None] | None = None
58 self._player_ffmpeg: dict[str, FFMpeg] = {}
59 self._lock = asyncio.Lock()
60 self.start_ntp: int = 0
61 self.start_time: float = 0.0
62 self.wait_start: float = 0.0
63 self.seconds_streamed: float = 0
64 self._first_chunk_received = asyncio.Event()
65 # Ring buffer for late joiners: stores (chunk_data, seconds_offset) tuples
66 # Chunks from streams controller are ~1 second each (pcm_sample_size bytes)
67 # Keep 8 seconds of buffer for late joiners (maxlen=10 for safety with variable sizes)
68 self._chunk_buffer: deque[tuple[bytes, float]] = deque(maxlen=10)
69
70 async def start(self, audio_source: AsyncGenerator[bytes, None]) -> None:
71 """Initialize stream session for all players."""
72 cur_time = time.time()
73 has_airplay2_client = any(
74 p.protocol == StreamingProtocol.AIRPLAY2 for p in self.sync_clients
75 )
76 wait_start = AIRPLAY2_CONNECT_TIME_MS if has_airplay2_client else RAOP_CONNECT_TIME_MS
77 wait_start_seconds = wait_start / 1000
78 self.wait_start = wait_start_seconds
79 self.start_time = cur_time + wait_start_seconds
80 self.start_ntp = unix_time_to_ntp(self.start_time)
81 await asyncio.gather(*[self._start_client(p, self.start_ntp) for p in self.sync_clients])
82 self._audio_source_task = asyncio.create_task(self._audio_streamer(audio_source))
83 try:
84 await asyncio.gather(
85 *[p.stream.wait_for_connection() for p in self.sync_clients if p.stream]
86 )
87 except Exception:
88 # playback failed to start, cleanup
89 await self.stop()
90 raise PlayerCommandFailed("Playback failed to start")
91
92 async def stop(self) -> None:
93 """Stop playback and cleanup."""
94 if self._audio_source_task and not self._audio_source_task.done():
95 self._audio_source_task.cancel()
96 with suppress(asyncio.CancelledError):
97 await self._audio_source_task
98 await asyncio.gather(
99 *[self.remove_client(x) for x in self.sync_clients],
100 )
101
102 async def remove_client(self, airplay_player: AirPlayPlayer) -> None:
103 """Remove a sync client from the session."""
104 async with self._lock:
105 if airplay_player not in self.sync_clients:
106 return
107 self.sync_clients.remove(airplay_player)
108 await self.stop_client(airplay_player)
109 airplay_player.set_state_from_stream(PlaybackState.IDLE)
110 # If this was the last client, stop the session
111 if not self.sync_clients:
112 await self.stop()
113 return
114
115 async def stop_client(self, airplay_player: AirPlayPlayer) -> None:
116 """
117 Stop a client's stream and ffmpeg.
118
119 :param airplay_player: The player to stop.
120 :param force: If True, kill CLI process immediately.
121 """
122 ffmpeg = self._player_ffmpeg.pop(airplay_player.player_id, None)
123 # note that we use kill instead of graceful close here,
124 # because otherwise it can take a very long time for the process to exit.
125 if ffmpeg and not ffmpeg.closed:
126 await ffmpeg.kill()
127 if airplay_player.stream and airplay_player.stream.session == self:
128 await airplay_player.stream.stop(force=True)
129
130 async def add_client(self, airplay_player: AirPlayPlayer) -> None:
131 """Add a sync client to the session as a late joiner.
132
133 The late joiner will:
134 1. Start with NTP timestamp accounting for buffered chunks we'll send
135 2. Receive buffered chunks immediately to prime the ffmpeg/CLI pipeline
136 3. Join the real-time stream in perfect sync with other players
137 """
138 sync_leader = self.sync_clients[0]
139 if not sync_leader.stream or not sync_leader.stream.running:
140 return
141
142 allow_late_join = self.prov.config.get_value(
143 CONF_ENABLE_LATE_JOIN, ENABLE_LATE_JOIN_DEFAULT
144 )
145 if not allow_late_join:
146 await self.stop()
147 if sync_leader.state.current_media:
148 self.mass.call_later(
149 0.5,
150 self.mass.players.cmd_resume(sync_leader.player_id),
151 task_id=f"resync_session_{sync_leader.player_id}",
152 )
153 return
154
155 async with self._lock:
156 # Get buffered chunks to send, but limit to ~5 seconds to avoid
157 # blocking real-time streaming to other players (causes packet loss)
158 max_late_join_buffer_seconds = 5.0
159 all_buffered = list(self._chunk_buffer)
160
161 # Filter to only include chunks within the time limit
162 if all_buffered:
163 min_position = self.seconds_streamed - max_late_join_buffer_seconds
164 buffered_chunks = [
165 (chunk, pos) for chunk, pos in all_buffered if pos >= min_position
166 ]
167 else:
168 buffered_chunks = []
169
170 if buffered_chunks:
171 # Calculate how much buffer we're sending
172 first_chunk_position = buffered_chunks[0][1]
173 buffer_duration = self.seconds_streamed - first_chunk_position
174
175 # Set start NTP to account for the buffer we're about to send
176 # Device will start at (current_position - buffer_duration) and catch up
177 start_at = self.start_time + (self.seconds_streamed - buffer_duration)
178
179 self.prov.logger.debug(
180 "Late joiner %s: sending %.2fs of buffered audio, start at %.2fs",
181 airplay_player.player_id,
182 buffer_duration,
183 self.seconds_streamed - buffer_duration,
184 )
185 else:
186 # No buffer available, start from current position
187 start_at = self.start_time + self.seconds_streamed
188 self.prov.logger.debug(
189 "Late joiner %s: no buffered chunks available, starting at %.2fs",
190 airplay_player.player_id,
191 self.seconds_streamed,
192 )
193
194 start_ntp = unix_time_to_ntp(start_at)
195
196 if airplay_player not in self.sync_clients:
197 self.sync_clients.append(airplay_player)
198
199 await self._start_client(airplay_player, start_ntp)
200 if airplay_player.stream:
201 await airplay_player.stream.wait_for_connection()
202
203 # Feed buffered chunks INSIDE the lock to prevent race conditions
204 # This ensures we don't send a new real-time chunk while feeding the buffer
205 if buffered_chunks:
206 await self._feed_buffered_chunks(airplay_player, buffered_chunks)
207
208 async def _audio_streamer(self, audio_source: AsyncGenerator[bytes, None]) -> None:
209 """Stream audio to all players."""
210 pcm_sample_size = self.pcm_format.pcm_sample_size
211 watchdog_task = asyncio.create_task(self._silence_watchdog(pcm_sample_size))
212 try:
213 async for chunk in audio_source:
214 if not self._first_chunk_received.is_set():
215 watchdog_task.cancel()
216 with suppress(asyncio.CancelledError):
217 await watchdog_task
218 self._first_chunk_received.set()
219
220 if not self.sync_clients:
221 break
222
223 has_running_clients = await self._write_chunk_to_all_players(chunk)
224 if not has_running_clients:
225 self.prov.logger.debug("No running clients remaining, stopping audio streamer")
226 break
227 self.seconds_streamed += len(chunk) / pcm_sample_size
228 finally:
229 if not watchdog_task.done():
230 watchdog_task.cancel()
231 with suppress(asyncio.CancelledError):
232 await watchdog_task
233 async with self._lock:
234 await asyncio.gather(
235 *[
236 self._write_eof_to_player(x)
237 for x in self.sync_clients
238 if x.stream and x.stream.running
239 ],
240 return_exceptions=True,
241 )
242
243 async def _silence_watchdog(self, pcm_sample_size: int) -> None:
244 """Insert silence if audio source is slow to deliver first chunk."""
245 grace_period = 0.2
246 max_silence_padding = 5.0
247 silence_inserted = 0.0
248
249 await asyncio.sleep(grace_period)
250 while not self._first_chunk_received.is_set() and silence_inserted < max_silence_padding:
251 silence_duration = 0.1
252 silence_bytes = int(pcm_sample_size * silence_duration)
253 silence_chunk = bytes(silence_bytes)
254 has_running_clients = await self._write_chunk_to_all_players(silence_chunk)
255 if not has_running_clients:
256 break
257 self.seconds_streamed += silence_duration
258 silence_inserted += silence_duration
259 await asyncio.sleep(0.05)
260
261 if silence_inserted > 0:
262 self.prov.logger.warning(
263 "Inserted %.1fs silence padding while waiting for audio source",
264 silence_inserted,
265 )
266
267 async def _write_chunk_to_all_players(self, chunk: bytes) -> bool:
268 """Write a chunk to all connected players.
269
270 :return: True if there are still running clients, False otherwise.
271 """
272 async with self._lock:
273 sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running]
274 if not sync_clients:
275 return False
276
277 # Add chunk to ring buffer for late joiners (before seconds_streamed is updated)
278 chunk_position = self.seconds_streamed
279 self._chunk_buffer.append((chunk, chunk_position))
280
281 # Write chunk to all players
282 write_tasks = [self._write_chunk_to_player(x, chunk) for x in sync_clients if x.stream]
283 results = await asyncio.gather(*write_tasks, return_exceptions=True)
284
285 # Check for write errors or timeouts
286 players_to_remove: list[AirPlayPlayer] = []
287 for i, result in enumerate(results):
288 if i >= len(sync_clients):
289 continue
290 player = sync_clients[i]
291
292 if isinstance(result, TimeoutError):
293 self.prov.logger.warning(
294 "Removing player %s from session: stopped reading data (write timeout)",
295 player.player_id,
296 )
297 players_to_remove.append(player)
298 elif isinstance(result, Exception):
299 self.prov.logger.warning(
300 "Removing player %s from session due to write error: %s",
301 player.player_id,
302 result,
303 )
304 players_to_remove.append(player)
305
306 for player in players_to_remove:
307 self.mass.create_task(self.remove_client(player))
308
309 # Return False if all clients were removed (or scheduled for removal)
310 remaining_clients = len(sync_clients) - len(players_to_remove)
311 return remaining_clients > 0
312
313 async def _write_chunk_to_player(self, airplay_player: AirPlayPlayer, chunk: bytes) -> None:
314 """Write audio chunk to a player's ffmpeg process."""
315 player_id = airplay_player.player_id
316 if ffmpeg := self._player_ffmpeg.get(player_id):
317 if ffmpeg.closed:
318 return
319 await asyncio.wait_for(ffmpeg.write(chunk), timeout=35.0)
320
321 async def _feed_buffered_chunks(
322 self,
323 airplay_player: AirPlayPlayer,
324 buffered_chunks: list[tuple[bytes, float]],
325 ) -> None:
326 """Feed buffered chunks to a late joiner to prime the ffmpeg pipeline.
327
328 :param airplay_player: The late joiner player.
329 :param buffered_chunks: List of (chunk_data, position) tuples to send.
330 """
331 try:
332 for chunk, _position in buffered_chunks:
333 await self._write_chunk_to_player(airplay_player, chunk)
334 except Exception as err:
335 self.prov.logger.warning(
336 "Failed to feed buffered chunks to late joiner %s: %s",
337 airplay_player.player_id,
338 err,
339 )
340 # Remove the client if feeding buffered chunks fails
341 self.mass.create_task(self.remove_client(airplay_player))
342
343 async def _write_eof_to_player(self, airplay_player: AirPlayPlayer) -> None:
344 """Write EOF to a specific player."""
345 if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
346 await ffmpeg.write_eof()
347 await ffmpeg.wait_with_timeout(30)
348 if airplay_player.stream and airplay_player.stream._cli_proc:
349 await airplay_player.stream._cli_proc.write_eof()
350
351 async def _start_client(self, airplay_player: AirPlayPlayer, start_ntp: int) -> None:
352 """Start CLI process and ffmpeg for a single client."""
353 if airplay_player.stream and airplay_player.stream.running:
354 await airplay_player.stream.stop()
355 if airplay_player.protocol == StreamingProtocol.AIRPLAY2:
356 airplay_player.stream = AirPlay2Stream(airplay_player)
357 else:
358 airplay_player.stream = RaopStream(airplay_player)
359 airplay_player.stream.session = self
360 sync_adjust = airplay_player.config.get_value(CONF_SYNC_ADJUST, 0)
361 assert isinstance(sync_adjust, int)
362 if sync_adjust != 0:
363 start_ntp = unix_time_to_ntp(ntp_to_unix_time(start_ntp) + (sync_adjust / 1000))
364 await airplay_player.stream.start(start_ntp)
365 # Start ffmpeg to feed audio to CLI stdin
366 if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
367 await ffmpeg.close()
368 filter_params = get_player_filter_params(
369 self.mass,
370 airplay_player.player_id,
371 self.pcm_format,
372 airplay_player.stream.pcm_format,
373 )
374 cli_proc = airplay_player.stream._cli_proc
375 assert cli_proc
376 assert cli_proc.proc
377 assert cli_proc.proc.stdin
378 stdin_transport = cli_proc.proc.stdin.transport
379 audio_output: str | int = stdin_transport.get_extra_info("pipe").fileno()
380 ffmpeg = FFMpeg(
381 audio_input="-",
382 input_format=self.pcm_format,
383 output_format=airplay_player.stream.pcm_format,
384 filter_params=filter_params,
385 audio_output=audio_output,
386 )
387 await ffmpeg.start()
388 self._player_ffmpeg[airplay_player.player_id] = ffmpeg
389