/
/
/
1"""Unified AirPlay/RAOP stream session logic for AirPlay devices."""
2
3from __future__ import annotations
4
5import asyncio
6import time
7from collections import deque
8from collections.abc import AsyncGenerator
9from contextlib import suppress
10from typing import TYPE_CHECKING
11
12from music_assistant_models.errors import PlayerCommandFailed
13
14from music_assistant.constants import CONF_SYNC_ADJUST
15from music_assistant.helpers.audio import get_player_filter_params
16from music_assistant.helpers.ffmpeg import FFMpeg
17from music_assistant.providers.airplay.helpers import ntp_to_unix_time, unix_time_to_ntp
18
19from .constants import (
20 AIRPLAY2_CONNECT_TIME_MS,
21 CONF_ENABLE_LATE_JOIN,
22 ENABLE_LATE_JOIN_DEFAULT,
23 RAOP_CONNECT_TIME_MS,
24 StreamingProtocol,
25)
26from .protocols.airplay2 import AirPlay2Stream
27from .protocols.raop import RaopStream
28
29if TYPE_CHECKING:
30 from music_assistant_models.media_items import AudioFormat
31
32 from .player import AirPlayPlayer
33 from .provider import AirPlayProvider
34
35
36class AirPlayStreamSession:
37 """Stream session (RAOP or AirPlay2) to one or more players."""
38
39 def __init__(
40 self,
41 airplay_provider: AirPlayProvider,
42 sync_clients: list[AirPlayPlayer],
43 pcm_format: AudioFormat,
44 ) -> None:
45 """Initialize AirPlayStreamSession.
46
47 :param airplay_provider: The AirPlay provider instance.
48 :param sync_clients: List of AirPlay players to stream to.
49 :param pcm_format: PCM format of the input stream.
50 """
51 assert sync_clients
52 self.prov = airplay_provider
53 self.mass = airplay_provider.mass
54 self.pcm_format = pcm_format
55 self.sync_clients = sync_clients
56 self._audio_source_task: asyncio.Task[None] | None = None
57 self._player_ffmpeg: dict[str, FFMpeg] = {}
58 self._lock = asyncio.Lock()
59 self.start_ntp: int = 0
60 self.start_time: float = 0.0
61 self.wait_start: float = 0.0
62 self.seconds_streamed: float = 0
63 self._first_chunk_received = asyncio.Event()
64 # Ring buffer for late joiners: stores (chunk_data, seconds_offset) tuples
65 # Chunks from streams controller are ~1 second each (pcm_sample_size bytes)
66 # Keep 8 seconds of buffer for late joiners (maxlen=10 for safety with variable sizes)
67 self._chunk_buffer: deque[tuple[bytes, float]] = deque(maxlen=10)
68
69 async def start(self, audio_source: AsyncGenerator[bytes, None]) -> None:
70 """Initialize stream session for all players."""
71 cur_time = time.time()
72 has_airplay2_client = any(
73 p.protocol == StreamingProtocol.AIRPLAY2 for p in self.sync_clients
74 )
75 wait_start = AIRPLAY2_CONNECT_TIME_MS if has_airplay2_client else RAOP_CONNECT_TIME_MS
76 wait_start_seconds = wait_start / 1000
77 self.wait_start = wait_start_seconds
78 self.start_time = cur_time + wait_start_seconds
79 self.start_ntp = unix_time_to_ntp(self.start_time)
80 await asyncio.gather(*[self._start_client(p, self.start_ntp) for p in self.sync_clients])
81 self._audio_source_task = asyncio.create_task(self._audio_streamer(audio_source))
82 try:
83 await asyncio.gather(
84 *[p.stream.wait_for_connection() for p in self.sync_clients if p.stream]
85 )
86 except Exception:
87 # playback failed to start, cleanup
88 await self.stop()
89 raise PlayerCommandFailed("Playback failed to start")
90
91 async def stop(self) -> None:
92 """Stop playback and cleanup."""
93 if self._audio_source_task and not self._audio_source_task.done():
94 self._audio_source_task.cancel()
95 with suppress(asyncio.CancelledError):
96 await self._audio_source_task
97 await asyncio.gather(
98 *[self.remove_client(x) for x in self.sync_clients],
99 )
100
101 async def remove_client(self, airplay_player: AirPlayPlayer) -> None:
102 """Remove a sync client from the session."""
103 async with self._lock:
104 if airplay_player not in self.sync_clients:
105 return
106 self.sync_clients.remove(airplay_player)
107 await self.stop_client(airplay_player)
108 # If this was the last client, stop the session
109 if not self.sync_clients:
110 await self.stop()
111 return
112
113 async def stop_client(self, airplay_player: AirPlayPlayer) -> None:
114 """
115 Stop a client's stream and ffmpeg.
116
117 :param airplay_player: The player to stop.
118 :param force: If True, kill CLI process immediately.
119 """
120 ffmpeg = self._player_ffmpeg.pop(airplay_player.player_id, None)
121 # note that we use kill instead of graceful close here,
122 # because otherwise it can take a very long time for the process to exit.
123 if ffmpeg and not ffmpeg.closed:
124 await ffmpeg.kill()
125 if airplay_player.stream and airplay_player.stream.session == self:
126 await airplay_player.stream.stop(force=True)
127
128 async def add_client(self, airplay_player: AirPlayPlayer) -> None:
129 """Add a sync client to the session as a late joiner.
130
131 The late joiner will:
132 1. Start with NTP timestamp accounting for buffered chunks we'll send
133 2. Receive buffered chunks immediately to prime the ffmpeg/CLI pipeline
134 3. Join the real-time stream in perfect sync with other players
135 """
136 sync_leader = self.sync_clients[0]
137 if not sync_leader.stream or not sync_leader.stream.running:
138 return
139
140 allow_late_join = self.prov.config.get_value(
141 CONF_ENABLE_LATE_JOIN, ENABLE_LATE_JOIN_DEFAULT
142 )
143 if not allow_late_join:
144 await self.stop()
145 if sync_leader.state.current_media:
146 self.mass.call_later(
147 0.5,
148 self.mass.players.cmd_resume(sync_leader.player_id),
149 task_id=f"resync_session_{sync_leader.player_id}",
150 )
151 return
152
153 async with self._lock:
154 # Get all buffered chunks to send
155 buffered_chunks = list(self._chunk_buffer)
156
157 if buffered_chunks:
158 # Calculate how much buffer we're sending
159 first_chunk_position = buffered_chunks[0][1]
160 buffer_duration = self.seconds_streamed - first_chunk_position
161
162 # Set start NTP to account for the buffer we're about to send
163 # Device will start at (current_position - buffer_duration) and catch up
164 start_at = self.start_time + (self.seconds_streamed - buffer_duration)
165
166 self.prov.logger.debug(
167 "Late joiner %s: sending %.2fs of buffered audio, start at %.2fs",
168 airplay_player.player_id,
169 buffer_duration,
170 self.seconds_streamed - buffer_duration,
171 )
172 else:
173 # No buffer available, start from current position
174 start_at = self.start_time + self.seconds_streamed
175 self.prov.logger.debug(
176 "Late joiner %s: no buffered chunks available, starting at %.2fs",
177 airplay_player.player_id,
178 self.seconds_streamed,
179 )
180
181 start_ntp = unix_time_to_ntp(start_at)
182
183 if airplay_player not in self.sync_clients:
184 self.sync_clients.append(airplay_player)
185
186 await self._start_client(airplay_player, start_ntp)
187 if airplay_player.stream:
188 await airplay_player.stream.wait_for_connection()
189
190 # Feed buffered chunks INSIDE the lock to prevent race conditions
191 # This ensures we don't send a new real-time chunk while feeding the buffer
192 if buffered_chunks:
193 await self._feed_buffered_chunks(airplay_player, buffered_chunks)
194
195 async def _audio_streamer(self, audio_source: AsyncGenerator[bytes, None]) -> None:
196 """Stream audio to all players."""
197 pcm_sample_size = self.pcm_format.pcm_sample_size
198 watchdog_task = asyncio.create_task(self._silence_watchdog(pcm_sample_size))
199 try:
200 async for chunk in audio_source:
201 if not self._first_chunk_received.is_set():
202 watchdog_task.cancel()
203 with suppress(asyncio.CancelledError):
204 await watchdog_task
205 self._first_chunk_received.set()
206
207 if not self.sync_clients:
208 break
209
210 await self._write_chunk_to_all_players(chunk)
211 self.seconds_streamed += len(chunk) / pcm_sample_size
212 finally:
213 if not watchdog_task.done():
214 watchdog_task.cancel()
215 with suppress(asyncio.CancelledError):
216 await watchdog_task
217 async with self._lock:
218 await asyncio.gather(
219 *[
220 self._write_eof_to_player(x)
221 for x in self.sync_clients
222 if x.stream and x.stream.running
223 ],
224 return_exceptions=True,
225 )
226
227 async def _silence_watchdog(self, pcm_sample_size: int) -> None:
228 """Insert silence if audio source is slow to deliver first chunk."""
229 grace_period = 0.2
230 max_silence_padding = 5.0
231 silence_inserted = 0.0
232
233 await asyncio.sleep(grace_period)
234 while not self._first_chunk_received.is_set() and silence_inserted < max_silence_padding:
235 silence_duration = 0.1
236 silence_bytes = int(pcm_sample_size * silence_duration)
237 silence_chunk = bytes(silence_bytes)
238 await self._write_chunk_to_all_players(silence_chunk)
239 self.seconds_streamed += silence_duration
240 silence_inserted += silence_duration
241 await asyncio.sleep(0.05)
242
243 if silence_inserted > 0:
244 self.prov.logger.warning(
245 "Inserted %.1fs silence padding while waiting for audio source",
246 silence_inserted,
247 )
248
249 async def _write_chunk_to_all_players(self, chunk: bytes) -> None:
250 """Write a chunk to all connected players."""
251 async with self._lock:
252 sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running]
253 if not sync_clients:
254 return
255
256 # Add chunk to ring buffer for late joiners (before seconds_streamed is updated)
257 chunk_position = self.seconds_streamed
258 self._chunk_buffer.append((chunk, chunk_position))
259
260 # Write chunk to all players
261 write_tasks = [self._write_chunk_to_player(x, chunk) for x in sync_clients if x.stream]
262 results = await asyncio.gather(*write_tasks, return_exceptions=True)
263
264 # Check for write errors or timeouts
265 players_to_remove: list[AirPlayPlayer] = []
266 for i, result in enumerate(results):
267 if i >= len(sync_clients):
268 continue
269 player = sync_clients[i]
270
271 if isinstance(result, TimeoutError):
272 self.prov.logger.warning(
273 "Removing player %s from session: stopped reading data (write timeout)",
274 player.player_id,
275 )
276 players_to_remove.append(player)
277 elif isinstance(result, Exception):
278 self.prov.logger.warning(
279 "Removing player %s from session due to write error: %s",
280 player.player_id,
281 result,
282 )
283 players_to_remove.append(player)
284
285 for player in players_to_remove:
286 self.mass.create_task(self.remove_client(player))
287
288 async def _write_chunk_to_player(self, airplay_player: AirPlayPlayer, chunk: bytes) -> None:
289 """Write audio chunk to a player's ffmpeg process."""
290 player_id = airplay_player.player_id
291 if ffmpeg := self._player_ffmpeg.get(player_id):
292 if ffmpeg.closed:
293 return
294 await asyncio.wait_for(ffmpeg.write(chunk), timeout=35.0)
295
296 async def _feed_buffered_chunks(
297 self,
298 airplay_player: AirPlayPlayer,
299 buffered_chunks: list[tuple[bytes, float]],
300 ) -> None:
301 """Feed buffered chunks to a late joiner to prime the ffmpeg pipeline.
302
303 :param airplay_player: The late joiner player.
304 :param buffered_chunks: List of (chunk_data, position) tuples to send.
305 """
306 try:
307 for chunk, _position in buffered_chunks:
308 await self._write_chunk_to_player(airplay_player, chunk)
309 except Exception as err:
310 self.prov.logger.warning(
311 "Failed to feed buffered chunks to late joiner %s: %s",
312 airplay_player.player_id,
313 err,
314 )
315 # Remove the client if feeding buffered chunks fails
316 self.mass.create_task(self.remove_client(airplay_player))
317
318 async def _write_eof_to_player(self, airplay_player: AirPlayPlayer) -> None:
319 """Write EOF to a specific player."""
320 if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
321 await ffmpeg.write_eof()
322 await ffmpeg.wait_with_timeout(30)
323 if airplay_player.stream and airplay_player.stream._cli_proc:
324 await airplay_player.stream._cli_proc.write_eof()
325
326 async def _start_client(self, airplay_player: AirPlayPlayer, start_ntp: int) -> None:
327 """Start CLI process and ffmpeg for a single client."""
328 if airplay_player.stream and airplay_player.stream.running:
329 await airplay_player.stream.stop()
330 if airplay_player.protocol == StreamingProtocol.AIRPLAY2:
331 airplay_player.stream = AirPlay2Stream(airplay_player)
332 else:
333 airplay_player.stream = RaopStream(airplay_player)
334 airplay_player.stream.session = self
335 sync_adjust = airplay_player.config.get_value(CONF_SYNC_ADJUST, 0)
336 assert isinstance(sync_adjust, int)
337 if sync_adjust != 0:
338 start_ntp = unix_time_to_ntp(ntp_to_unix_time(start_ntp) + (sync_adjust / 1000))
339 await airplay_player.stream.start(start_ntp)
340 # Start ffmpeg to feed audio to CLI stdin
341 if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
342 await ffmpeg.close()
343 filter_params = get_player_filter_params(
344 self.mass,
345 airplay_player.player_id,
346 self.pcm_format,
347 airplay_player.stream.pcm_format,
348 )
349 cli_proc = airplay_player.stream._cli_proc
350 assert cli_proc
351 assert cli_proc.proc
352 assert cli_proc.proc.stdin
353 stdin_transport = cli_proc.proc.stdin.transport
354 audio_output: str | int = stdin_transport.get_extra_info("pipe").fileno()
355 ffmpeg = FFMpeg(
356 audio_input="-",
357 input_format=self.pcm_format,
358 output_format=airplay_player.stream.pcm_format,
359 filter_params=filter_params,
360 audio_output=audio_output,
361 )
362 await ffmpeg.start()
363 self._player_ffmpeg[airplay_player.player_id] = ffmpeg
364