/
/
/
1"""Unified AirPlay/RAOP stream session logic for AirPlay devices."""
2
3from __future__ import annotations
4
5import asyncio
6import time
7from collections import deque
8from collections.abc import AsyncGenerator
9from contextlib import suppress
10from typing import TYPE_CHECKING
11
12from music_assistant_models.enums import PlaybackState
13from music_assistant_models.errors import PlayerCommandFailed
14
15from music_assistant.constants import CONF_SYNC_ADJUST
16from music_assistant.helpers.audio import get_player_filter_params
17from music_assistant.helpers.ffmpeg import FFMpeg
18from music_assistant.providers.airplay.helpers import ntp_to_unix_time, unix_time_to_ntp
19
20from .constants import (
21 AIRPLAY2_CONNECT_TIME_MS,
22 CONF_ENABLE_LATE_JOIN,
23 ENABLE_LATE_JOIN_DEFAULT,
24 RAOP_CONNECT_TIME_MS,
25 StreamingProtocol,
26)
27from .protocols.airplay2 import AirPlay2Stream
28from .protocols.raop import RaopStream
29
30if TYPE_CHECKING:
31 from music_assistant_models.media_items import AudioFormat
32
33 from .player import AirPlayPlayer
34 from .provider import AirPlayProvider
35
36
class AirPlayStreamSession:
    """Stream session (RAOP or AirPlay2) to one or more players.

    A session fans a single PCM audio source out to every player in
    ``sync_clients``. Each player gets its own ffmpeg process (tracked in
    ``_player_ffmpeg``) whose output is written to the stdin pipe of that
    player's streaming CLI process. Recently sent chunks are kept in a small
    ring buffer so a player that joins later can be primed with them.
    """

    def __init__(
        self,
        airplay_provider: AirPlayProvider,
        sync_clients: list[AirPlayPlayer],
        pcm_format: AudioFormat,
    ) -> None:
        """Initialize AirPlayStreamSession.

        :param airplay_provider: The AirPlay provider instance.
        :param sync_clients: List of AirPlay players to stream to.
        :param pcm_format: PCM format of the input stream.
        """
        assert sync_clients
        self.prov = airplay_provider
        self.mass = airplay_provider.mass
        self.pcm_format = pcm_format
        self.sync_clients = sync_clients
        self._audio_source_task: asyncio.Task[None] | None = None
        # One ffmpeg process per player, keyed by player_id.
        self._player_ffmpeg: dict[str, FFMpeg] = {}
        # Guards sync_clients membership changes and chunk writes.
        self._lock = asyncio.Lock()
        self.start_ntp: int = 0
        self.start_time: float = 0.0
        self.wait_start: float = 0.0
        # Seconds of audio (including silence padding) handed to the players so far.
        self.seconds_streamed: float = 0.0
        self._first_chunk_received = asyncio.Event()
        # Ring buffer for late joiners: stores (chunk_data, seconds_offset) tuples
        # Chunks from streams controller are ~1 second each (pcm_sample_size bytes)
        # Keep 8 seconds of buffer for late joiners (maxlen=10 for safety with variable sizes)
        self._chunk_buffer: deque[tuple[bytes, float]] = deque(maxlen=10)

    async def start(self, audio_source: AsyncGenerator[bytes, None]) -> None:
        """Initialize stream session for all players.

        Picks a shared start time in the near future, starts every client with
        the corresponding NTP timestamp, launches the audio streamer task and
        then waits until all clients report a connection.

        :param audio_source: Async generator yielding PCM chunks in ``pcm_format``.
        :raises PlayerCommandFailed: If waiting for the client connections fails.
        """
        cur_time = time.time()
        has_airplay2_client = any(
            p.protocol == StreamingProtocol.AIRPLAY2 for p in self.sync_clients
        )
        # The connect-time budget depends on whether any AirPlay2 client is present.
        wait_start = AIRPLAY2_CONNECT_TIME_MS if has_airplay2_client else RAOP_CONNECT_TIME_MS
        wait_start_seconds = wait_start / 1000
        self.wait_start = wait_start_seconds
        self.start_time = cur_time + wait_start_seconds
        self.start_ntp = unix_time_to_ntp(self.start_time)
        await asyncio.gather(*[self._start_client(p, self.start_ntp) for p in self.sync_clients])
        self._audio_source_task = asyncio.create_task(self._audio_streamer(audio_source))
        try:
            await asyncio.gather(
                *[p.stream.wait_for_connection() for p in self.sync_clients if p.stream]
            )
        except Exception as err:
            # playback failed to start, cleanup
            await self.stop()
            # Chain the original error so the root cause stays visible in tracebacks.
            raise PlayerCommandFailed("Playback failed to start") from err

    async def stop(self) -> None:
        """Stop playback and cleanup.

        Cancels the audio streamer task (if still running) and removes every
        remaining client from the session.
        """
        if self._audio_source_task and not self._audio_source_task.done():
            self._audio_source_task.cancel()
            with suppress(asyncio.CancelledError):
                await self._audio_source_task
        # The coroutine list is built before any of them runs, so the removals
        # mutating sync_clients do not disturb this iteration.
        await asyncio.gather(
            *[self.remove_client(x) for x in self.sync_clients],
        )

    async def remove_client(self, airplay_player: AirPlayPlayer) -> None:
        """Remove a sync client from the session.

        No-op when the player is not part of this session. When the last
        client is removed, the whole session is stopped.

        :param airplay_player: The player to remove.
        """
        # Only the membership change happens under the lock; stopping the
        # client (and possibly the session) is done after releasing it.
        async with self._lock:
            if airplay_player not in self.sync_clients:
                return
            self.sync_clients.remove(airplay_player)
        await self.stop_client(airplay_player)
        airplay_player.set_state_from_stream(PlaybackState.IDLE)
        # If this was the last client, stop the session
        if not self.sync_clients:
            await self.stop()

    async def stop_client(self, airplay_player: AirPlayPlayer) -> None:
        """Stop a client's stream and ffmpeg.

        :param airplay_player: The player to stop.
        """
        ffmpeg = self._player_ffmpeg.pop(airplay_player.player_id, None)
        # note that we use kill instead of graceful close here,
        # because otherwise it can take a very long time for the process to exit.
        if ffmpeg and not ffmpeg.closed:
            await ffmpeg.kill()
        # Only stop the stream if it still belongs to this session.
        if airplay_player.stream and airplay_player.stream.session == self:
            await airplay_player.stream.stop(force=True)

    async def add_client(self, airplay_player: AirPlayPlayer) -> None:
        """Add a sync client to the session as a late joiner.

        The late joiner will:
        1. Start with NTP timestamp accounting for buffered chunks we'll send
        2. Receive buffered chunks immediately to prime the ffmpeg/CLI pipeline
        3. Join the real-time stream in perfect sync with other players

        Does nothing when the session has no clients or its first client is
        not streaming. When late join is disabled in the provider config, the
        session is stopped instead and a resume of the first client is
        scheduled (if it has current media).

        :param airplay_player: The player to add.
        """
        if not self.sync_clients:
            # Session has no clients left, so there is no running stream to join.
            return
        sync_leader = self.sync_clients[0]
        if not sync_leader.stream or not sync_leader.stream.running:
            return

        allow_late_join = self.prov.config.get_value(
            CONF_ENABLE_LATE_JOIN, ENABLE_LATE_JOIN_DEFAULT
        )
        if not allow_late_join:
            await self.stop()
            if sync_leader.state.current_media:
                self.mass.call_later(
                    0.5,
                    self.mass.players.cmd_resume(sync_leader.player_id),
                    task_id=f"resync_session_{sync_leader.player_id}",
                )
            return

        async with self._lock:
            # Get all buffered chunks to send
            buffered_chunks = list(self._chunk_buffer)

            if buffered_chunks:
                # Calculate how much buffer we're sending
                first_chunk_position = buffered_chunks[0][1]
                buffer_duration = self.seconds_streamed - first_chunk_position

                # Set start NTP to account for the buffer we're about to send:
                # the device starts at the position of the oldest buffered chunk
                # (current_position - buffer_duration) and catches up from there.
                start_at = self.start_time + first_chunk_position

                self.prov.logger.debug(
                    "Late joiner %s: sending %.2fs of buffered audio, start at %.2fs",
                    airplay_player.player_id,
                    buffer_duration,
                    first_chunk_position,
                )
            else:
                # No buffer available, start from current position
                start_at = self.start_time + self.seconds_streamed
                self.prov.logger.debug(
                    "Late joiner %s: no buffered chunks available, starting at %.2fs",
                    airplay_player.player_id,
                    self.seconds_streamed,
                )

            start_ntp = unix_time_to_ntp(start_at)

            if airplay_player not in self.sync_clients:
                self.sync_clients.append(airplay_player)

            await self._start_client(airplay_player, start_ntp)
            if airplay_player.stream:
                await airplay_player.stream.wait_for_connection()

            # Feed buffered chunks INSIDE the lock to prevent race conditions
            # This ensures we don't send a new real-time chunk while feeding the buffer
            if buffered_chunks:
                await self._feed_buffered_chunks(airplay_player, buffered_chunks)

    async def _audio_streamer(self, audio_source: AsyncGenerator[bytes, None]) -> None:
        """Stream audio to all players.

        Runs a silence watchdog until the first chunk arrives, then forwards
        every chunk to all players while tracking the streamed duration.
        Stops early when no clients are left.

        :param audio_source: Async generator yielding PCM chunks.
        """
        # Used as bytes-per-second here: len(chunk) / pcm_sample_size is added
        # to the seconds_streamed counter.
        pcm_sample_size = self.pcm_format.pcm_sample_size
        watchdog_task = asyncio.create_task(self._silence_watchdog(pcm_sample_size))
        try:
            async for chunk in audio_source:
                if not self._first_chunk_received.is_set():
                    # Real audio arrived: stop the silence padding before writing it.
                    watchdog_task.cancel()
                    with suppress(asyncio.CancelledError):
                        await watchdog_task
                    self._first_chunk_received.set()

                if not self.sync_clients:
                    break

                await self._write_chunk_to_all_players(chunk)
                self.seconds_streamed += len(chunk) / pcm_sample_size
        finally:
            if not watchdog_task.done():
                watchdog_task.cancel()
                with suppress(asyncio.CancelledError):
                    await watchdog_task
        # Only reached when the loop above ended without raising (source
        # exhausted or no clients left): signal EOF to the remaining players.
        async with self._lock:
            await asyncio.gather(
                *[
                    self._write_eof_to_player(x)
                    for x in self.sync_clients
                    if x.stream and x.stream.running
                ],
                return_exceptions=True,
            )

    async def _silence_watchdog(self, pcm_sample_size: int) -> None:
        """Insert silence if audio source is slow to deliver first chunk.

        After a short grace period, writes 0.1s chunks of zero bytes to all
        players until the first real chunk arrives or the padding limit is hit.

        :param pcm_sample_size: Number of PCM bytes per second of audio.
        """
        grace_period = 0.2
        max_silence_padding = 5.0
        silence_inserted = 0.0
        silence_duration = 0.1
        # The silence chunk is identical on every iteration, so build it once.
        silence_chunk = bytes(int(pcm_sample_size * silence_duration))

        await asyncio.sleep(grace_period)
        while not self._first_chunk_received.is_set() and silence_inserted < max_silence_padding:
            await self._write_chunk_to_all_players(silence_chunk)
            self.seconds_streamed += silence_duration
            silence_inserted += silence_duration
            await asyncio.sleep(0.05)

        if silence_inserted > 0:
            self.prov.logger.warning(
                "Inserted %.1fs silence padding while waiting for audio source",
                silence_inserted,
            )

    async def _write_chunk_to_all_players(self, chunk: bytes) -> None:
        """Write a chunk to all connected players.

        The chunk is also stored in the late-join ring buffer. Players whose
        write fails or times out are scheduled for removal from the session.

        :param chunk: PCM data to send.
        """
        async with self._lock:
            active_clients = [x for x in self.sync_clients if x.stream and x.stream.running]
            if not active_clients:
                return

            # Add chunk to ring buffer for late joiners; seconds_streamed has not
            # been advanced yet, so it is this chunk's start position.
            self._chunk_buffer.append((chunk, self.seconds_streamed))

            # Write chunk to all players; exceptions are returned, not raised,
            # so one failing player cannot interrupt the others.
            results = await asyncio.gather(
                *[self._write_chunk_to_player(x, chunk) for x in active_clients],
                return_exceptions=True,
            )

            # gather() preserves argument order, so results pair up with players.
            for player, result in zip(active_clients, results):
                if isinstance(result, TimeoutError):
                    self.prov.logger.warning(
                        "Removing player %s from session: stopped reading data (write timeout)",
                        player.player_id,
                    )
                elif isinstance(result, Exception):
                    self.prov.logger.warning(
                        "Removing player %s from session due to write error: %s",
                        player.player_id,
                        result,
                    )
                else:
                    continue
                # remove_client() takes the lock itself, so it must run as a
                # separate task rather than being awaited while we hold it.
                self.mass.create_task(self.remove_client(player))

    async def _write_chunk_to_player(self, airplay_player: AirPlayPlayer, chunk: bytes) -> None:
        """Write audio chunk to a player's ffmpeg process.

        Silently does nothing when the player has no ffmpeg process or the
        process is already closed.

        :param airplay_player: The target player.
        :param chunk: PCM data to write.
        :raises TimeoutError: If the write does not complete within 35 seconds.
        """
        ffmpeg = self._player_ffmpeg.get(airplay_player.player_id)
        if not ffmpeg or ffmpeg.closed:
            return
        await asyncio.wait_for(ffmpeg.write(chunk), timeout=35.0)

    async def _feed_buffered_chunks(
        self,
        airplay_player: AirPlayPlayer,
        buffered_chunks: list[tuple[bytes, float]],
    ) -> None:
        """Feed buffered chunks to a late joiner to prime the ffmpeg pipeline.

        On any write failure a warning is logged and removal of the player is
        scheduled; the error is not propagated to the caller.

        :param airplay_player: The late joiner player.
        :param buffered_chunks: List of (chunk_data, position) tuples to send.
        """
        try:
            for chunk, _position in buffered_chunks:
                await self._write_chunk_to_player(airplay_player, chunk)
        except Exception as err:
            self.prov.logger.warning(
                "Failed to feed buffered chunks to late joiner %s: %s",
                airplay_player.player_id,
                err,
            )
            # Remove the client if feeding buffered chunks fails
            self.mass.create_task(self.remove_client(airplay_player))

    async def _write_eof_to_player(self, airplay_player: AirPlayPlayer) -> None:
        """Write EOF to a specific player.

        Closes the input of the player's ffmpeg process, waits (up to 30s) for
        it to finish, then writes EOF to the player's CLI process.

        :param airplay_player: The player to signal end-of-stream to.
        """
        if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
            await ffmpeg.write_eof()
            await ffmpeg.wait_with_timeout(30)
        if airplay_player.stream and airplay_player.stream._cli_proc:
            await airplay_player.stream._cli_proc.write_eof()

    async def _start_client(self, airplay_player: AirPlayPlayer, start_ntp: int) -> None:
        """Start CLI process and ffmpeg for a single client.

        :param airplay_player: The player to start.
        :param start_ntp: NTP timestamp at which playback should begin; shifted
            by the player's CONF_SYNC_ADJUST value (milliseconds) when non-zero.
        """
        # Replace any stream that is still running for this player.
        if airplay_player.stream and airplay_player.stream.running:
            await airplay_player.stream.stop()
        if airplay_player.protocol == StreamingProtocol.AIRPLAY2:
            airplay_player.stream = AirPlay2Stream(airplay_player)
        else:
            airplay_player.stream = RaopStream(airplay_player)
        airplay_player.stream.session = self
        sync_adjust = airplay_player.config.get_value(CONF_SYNC_ADJUST, 0)
        assert isinstance(sync_adjust, int)
        if sync_adjust != 0:
            start_ntp = unix_time_to_ntp(ntp_to_unix_time(start_ntp) + (sync_adjust / 1000))
        await airplay_player.stream.start(start_ntp)
        # Start ffmpeg to feed audio to CLI stdin
        if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
            await ffmpeg.close()
        filter_params = get_player_filter_params(
            self.mass,
            airplay_player.player_id,
            self.pcm_format,
            airplay_player.stream.pcm_format,
        )
        cli_proc = airplay_player.stream._cli_proc
        assert cli_proc
        assert cli_proc.proc
        assert cli_proc.proc.stdin
        # ffmpeg writes straight to the file descriptor of the CLI's stdin pipe.
        stdin_transport = cli_proc.proc.stdin.transport
        audio_output: str | int = stdin_transport.get_extra_info("pipe").fileno()
        ffmpeg = FFMpeg(
            audio_input="-",
            input_format=self.pcm_format,
            output_format=airplay_player.stream.pcm_format,
            filter_params=filter_params,
            audio_output=audio_output,
        )
        await ffmpeg.start()
        self._player_ffmpeg[airplay_player.player_id] = ffmpeg
366