/
/
/
1"""Unified AirPlay/RAOP stream session logic for AirPlay devices."""
2
3from __future__ import annotations
4
5import asyncio
6import time
7from collections.abc import AsyncGenerator
8from contextlib import suppress
9from typing import TYPE_CHECKING
10
11from music_assistant_models.errors import PlayerCommandFailed
12
13from music_assistant.constants import CONF_SYNC_ADJUST
14from music_assistant.helpers.audio import get_player_filter_params
15from music_assistant.helpers.ffmpeg import FFMpeg
16from music_assistant.providers.airplay.helpers import ntp_to_unix_time, unix_time_to_ntp
17
18from .constants import (
19 AIRPLAY2_CONNECT_TIME_MS,
20 CONF_ENABLE_LATE_JOIN,
21 ENABLE_LATE_JOIN_DEFAULT,
22 RAOP_CONNECT_TIME_MS,
23 StreamingProtocol,
24)
25from .protocols.airplay2 import AirPlay2Stream
26from .protocols.raop import RaopStream
27
28if TYPE_CHECKING:
29 from music_assistant_models.media_items import AudioFormat
30
31 from .player import AirPlayPlayer
32 from .provider import AirPlayProvider
33
34
35class AirPlayStreamSession:
36 """Stream session (RAOP or AirPlay2) to one or more players."""
37
38 def __init__(
39 self,
40 airplay_provider: AirPlayProvider,
41 sync_clients: list[AirPlayPlayer],
42 pcm_format: AudioFormat,
43 ) -> None:
44 """Initialize AirPlayStreamSession.
45
46 :param airplay_provider: The AirPlay provider instance.
47 :param sync_clients: List of AirPlay players to stream to.
48 :param pcm_format: PCM format of the input stream.
49 """
50 assert sync_clients
51 self.prov = airplay_provider
52 self.mass = airplay_provider.mass
53 self.pcm_format = pcm_format
54 self.sync_clients = sync_clients
55 self._audio_source_task: asyncio.Task[None] | None = None
56 self._player_ffmpeg: dict[str, FFMpeg] = {}
57 self._lock = asyncio.Lock()
58 self.start_ntp: int = 0
59 self.start_time: float = 0.0
60 self.wait_start: float = 0.0
61 self.seconds_streamed: float = 0
62 self.total_pause_time: float = 0.0
63 self.last_paused: float | None = None
64 self._first_chunk_received = asyncio.Event()
65
66 async def start(self, audio_source: AsyncGenerator[bytes, None]) -> None:
67 """Initialize stream session for all players."""
68 cur_time = time.time()
69 has_airplay2_client = any(
70 p.protocol == StreamingProtocol.AIRPLAY2 for p in self.sync_clients
71 )
72 wait_start = AIRPLAY2_CONNECT_TIME_MS if has_airplay2_client else RAOP_CONNECT_TIME_MS
73 wait_start_seconds = wait_start / 1000
74 self.wait_start = wait_start_seconds
75 self.start_time = cur_time + wait_start_seconds
76 self.start_ntp = unix_time_to_ntp(self.start_time)
77 await asyncio.gather(*[self._start_client(p, self.start_ntp) for p in self.sync_clients])
78 self._audio_source_task = asyncio.create_task(self._audio_streamer(audio_source))
79 try:
80 await asyncio.gather(
81 *[p.stream.wait_for_connection() for p in self.sync_clients if p.stream]
82 )
83 except Exception:
84 # playback failed to start, cleanup
85 await self.stop()
86 raise PlayerCommandFailed("Playback failed to start")
87
88 async def stop(self, force: bool = False) -> None:
89 """Stop playback and cleanup."""
90 if self._audio_source_task and not self._audio_source_task.done():
91 self._audio_source_task.cancel()
92 with suppress(asyncio.CancelledError):
93 await self._audio_source_task
94 if force:
95 await asyncio.gather(
96 *[self.stop_client(x, force=True) for x in self.sync_clients],
97 )
98 self.sync_clients = []
99 else:
100 await asyncio.gather(
101 *[self.remove_client(x) for x in self.sync_clients],
102 )
103
104 async def remove_client(self, airplay_player: AirPlayPlayer) -> None:
105 """Remove a sync client from the session."""
106 async with self._lock:
107 if airplay_player not in self.sync_clients:
108 return
109 self.sync_clients.remove(airplay_player)
110 await self.stop_client(airplay_player)
111 # If this was the last client, stop the session
112 if not self.sync_clients:
113 await self.stop()
114 return
115
116 async def stop_client(self, airplay_player: AirPlayPlayer, force: bool = False) -> None:
117 """
118 Stop a client's stream and ffmpeg.
119
120 :param airplay_player: The player to stop.
121 :param force: If True, kill CLI process immediately.
122 """
123 ffmpeg = self._player_ffmpeg.pop(airplay_player.player_id, None)
124 if force:
125 if ffmpeg and not ffmpeg.closed:
126 await ffmpeg.kill()
127 if airplay_player.stream and airplay_player.stream.session == self:
128 await airplay_player.stream.stop(force=True)
129 else:
130 if ffmpeg and not ffmpeg.closed:
131 await ffmpeg.close()
132 if airplay_player.stream and airplay_player.stream.session == self:
133 await airplay_player.stream.stop()
134
135 async def add_client(self, airplay_player: AirPlayPlayer) -> None:
136 """Add a sync client to the session as a late joiner.
137
138 The late joiner will:
139 1. Start playing at a compensated NTP timestamp (start_ntp + offset)
140 2. Receive silence calculated dynamically based on how much audio has been sent
141 3. Then receive real audio chunks in sync with other players
142 """
143 sync_leader = self.sync_clients[0]
144 if not sync_leader.stream or not sync_leader.stream.running:
145 return
146
147 allow_late_join = self.prov.config.get_value(
148 CONF_ENABLE_LATE_JOIN, ENABLE_LATE_JOIN_DEFAULT
149 )
150 if not allow_late_join:
151 await self.stop()
152 if sync_leader.current_media:
153 self.mass.call_later(
154 0.5,
155 self.mass.players.cmd_resume(sync_leader.player_id),
156 task_id=f"resync_session_{sync_leader.player_id}",
157 )
158 return
159
160 async with self._lock:
161 skip_seconds = self.seconds_streamed
162 start_at = self.start_time + skip_seconds
163 start_ntp = unix_time_to_ntp(start_at)
164 if airplay_player not in self.sync_clients:
165 self.sync_clients.append(airplay_player)
166
167 await self._start_client(airplay_player, start_ntp)
168 if airplay_player.stream:
169 await airplay_player.stream.wait_for_connection()
170
171 async def _audio_streamer(self, audio_source: AsyncGenerator[bytes, None]) -> None:
172 """Stream audio to all players."""
173 pcm_sample_size = self.pcm_format.pcm_sample_size
174 watchdog_task = asyncio.create_task(self._silence_watchdog(pcm_sample_size))
175 try:
176 async for chunk in audio_source:
177 if not self._first_chunk_received.is_set():
178 watchdog_task.cancel()
179 with suppress(asyncio.CancelledError):
180 await watchdog_task
181 self._first_chunk_received.set()
182
183 if not self.sync_clients:
184 break
185
186 await self._write_chunk_to_all_players(chunk)
187 self.seconds_streamed += len(chunk) / pcm_sample_size
188 finally:
189 if not watchdog_task.done():
190 watchdog_task.cancel()
191 with suppress(asyncio.CancelledError):
192 await watchdog_task
193 async with self._lock:
194 await asyncio.gather(
195 *[
196 self._write_eof_to_player(x)
197 for x in self.sync_clients
198 if x.stream and x.stream.running
199 ],
200 return_exceptions=True,
201 )
202
203 async def _silence_watchdog(self, pcm_sample_size: int) -> None:
204 """Insert silence if audio source is slow to deliver first chunk."""
205 grace_period = 0.2
206 max_silence_padding = 5.0
207 silence_inserted = 0.0
208
209 await asyncio.sleep(grace_period)
210 while not self._first_chunk_received.is_set() and silence_inserted < max_silence_padding:
211 silence_duration = 0.1
212 silence_bytes = int(pcm_sample_size * silence_duration)
213 silence_chunk = bytes(silence_bytes)
214 await self._write_chunk_to_all_players(silence_chunk)
215 self.seconds_streamed += silence_duration
216 silence_inserted += silence_duration
217 await asyncio.sleep(0.05)
218
219 if silence_inserted > 0:
220 self.prov.logger.warning(
221 "Inserted %.1fs silence padding while waiting for audio source",
222 silence_inserted,
223 )
224
225 async def _write_chunk_to_all_players(self, chunk: bytes) -> None:
226 """Write a chunk to all connected players."""
227 async with self._lock:
228 sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running]
229 if not sync_clients:
230 return
231
232 # Write chunk to all players
233 write_tasks = [self._write_chunk_to_player(x, chunk) for x in sync_clients if x.stream]
234 results = await asyncio.gather(*write_tasks, return_exceptions=True)
235
236 # Check for write errors or timeouts
237 players_to_remove: list[AirPlayPlayer] = []
238 for i, result in enumerate(results):
239 if i >= len(sync_clients):
240 continue
241 player = sync_clients[i]
242
243 if isinstance(result, TimeoutError):
244 self.prov.logger.warning(
245 "Removing player %s from session: stopped reading data (write timeout)",
246 player.player_id,
247 )
248 players_to_remove.append(player)
249 elif isinstance(result, Exception):
250 self.prov.logger.warning(
251 "Removing player %s from session due to write error: %s",
252 player.player_id,
253 result,
254 )
255 players_to_remove.append(player)
256
257 for player in players_to_remove:
258 self.mass.create_task(self.remove_client(player))
259
260 async def _write_chunk_to_player(self, airplay_player: AirPlayPlayer, chunk: bytes) -> None:
261 """Write audio chunk to a player's ffmpeg process."""
262 player_id = airplay_player.player_id
263 if ffmpeg := self._player_ffmpeg.get(player_id):
264 if ffmpeg.closed:
265 return
266 await asyncio.wait_for(ffmpeg.write(chunk), timeout=35.0)
267
268 async def _write_eof_to_player(self, airplay_player: AirPlayPlayer) -> None:
269 """Write EOF to a specific player."""
270 if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
271 await ffmpeg.write_eof()
272 await ffmpeg.wait_with_timeout(30)
273 if airplay_player.stream and airplay_player.stream._cli_proc:
274 await airplay_player.stream._cli_proc.write_eof()
275
276 async def _start_client(self, airplay_player: AirPlayPlayer, start_ntp: int) -> None:
277 """Start CLI process and ffmpeg for a single client."""
278 if airplay_player.stream and airplay_player.stream.running:
279 await airplay_player.stream.stop()
280 if airplay_player.protocol == StreamingProtocol.AIRPLAY2:
281 airplay_player.stream = AirPlay2Stream(airplay_player)
282 else:
283 airplay_player.stream = RaopStream(airplay_player)
284 airplay_player.stream.session = self
285 sync_adjust = airplay_player.config.get_value(CONF_SYNC_ADJUST, 0)
286 assert isinstance(sync_adjust, int)
287 if sync_adjust != 0:
288 start_ntp = unix_time_to_ntp(ntp_to_unix_time(start_ntp) + (sync_adjust / 1000))
289 await airplay_player.stream.start(start_ntp)
290 # Start ffmpeg to feed audio to CLI stdin
291 if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
292 await ffmpeg.close()
293 filter_params = get_player_filter_params(
294 self.mass,
295 airplay_player.player_id,
296 self.pcm_format,
297 airplay_player.stream.pcm_format,
298 )
299 cli_proc = airplay_player.stream._cli_proc
300 assert cli_proc
301 assert cli_proc.proc
302 assert cli_proc.proc.stdin
303 stdin_transport = cli_proc.proc.stdin.transport
304 audio_output: str | int = stdin_transport.get_extra_info("pipe").fileno()
305 ffmpeg = FFMpeg(
306 audio_input="-",
307 input_format=self.pcm_format,
308 output_format=airplay_player.stream.pcm_format,
309 filter_params=filter_params,
310 audio_output=audio_output,
311 )
312 await ffmpeg.start()
313 self._player_ffmpeg[airplay_player.player_id] = ffmpeg
314