/
/
/
1"""Unified AirPlay/RAOP stream session logic for AirPlay devices."""
2
3from __future__ import annotations
4
5import asyncio
6import time
7from collections.abc import AsyncGenerator
8from contextlib import suppress
9from typing import TYPE_CHECKING
10
11from music_assistant_models.errors import PlayerCommandFailed
12
13from music_assistant.constants import CONF_SYNC_ADJUST
14from music_assistant.helpers.audio import get_player_filter_params
15from music_assistant.helpers.ffmpeg import FFMpeg
16from music_assistant.providers.airplay.helpers import ntp_to_unix_time, unix_time_to_ntp
17
18from .constants import (
19 AIRPLAY2_CONNECT_TIME_MS,
20 CONF_ENABLE_LATE_JOIN,
21 ENABLE_LATE_JOIN_DEFAULT,
22 RAOP_CONNECT_TIME_MS,
23 StreamingProtocol,
24)
25from .protocols.airplay2 import AirPlay2Stream
26from .protocols.raop import RaopStream
27
28if TYPE_CHECKING:
29 from music_assistant_models.media_items import AudioFormat
30
31 from .player import AirPlayPlayer
32 from .provider import AirPlayProvider
33
34
35class AirPlayStreamSession:
36 """Stream session (RAOP or AirPlay2) to one or more players."""
37
38 def __init__(
39 self,
40 airplay_provider: AirPlayProvider,
41 sync_clients: list[AirPlayPlayer],
42 pcm_format: AudioFormat,
43 ) -> None:
44 """Initialize AirPlayStreamSession.
45
46 :param airplay_provider: The AirPlay provider instance.
47 :param sync_clients: List of AirPlay players to stream to.
48 :param pcm_format: PCM format of the input stream.
49 """
50 assert sync_clients
51 self.prov = airplay_provider
52 self.mass = airplay_provider.mass
53 self.pcm_format = pcm_format
54 self.sync_clients = sync_clients
55 self._audio_source_task: asyncio.Task[None] | None = None
56 self._player_ffmpeg: dict[str, FFMpeg] = {}
57 self._lock = asyncio.Lock()
58 self.start_ntp: int = 0
59 self.start_time: float = 0.0
60 self.wait_start: float = 0.0
61 self.seconds_streamed: float = 0
62 self._first_chunk_received = asyncio.Event()
63
64 async def start(self, audio_source: AsyncGenerator[bytes, None]) -> None:
65 """Initialize stream session for all players."""
66 cur_time = time.time()
67 has_airplay2_client = any(
68 p.protocol == StreamingProtocol.AIRPLAY2 for p in self.sync_clients
69 )
70 wait_start = AIRPLAY2_CONNECT_TIME_MS if has_airplay2_client else RAOP_CONNECT_TIME_MS
71 wait_start_seconds = wait_start / 1000
72 self.wait_start = wait_start_seconds
73 self.start_time = cur_time + wait_start_seconds
74 self.start_ntp = unix_time_to_ntp(self.start_time)
75 await asyncio.gather(*[self._start_client(p, self.start_ntp) for p in self.sync_clients])
76 self._audio_source_task = asyncio.create_task(self._audio_streamer(audio_source))
77 try:
78 await asyncio.gather(
79 *[p.stream.wait_for_connection() for p in self.sync_clients if p.stream]
80 )
81 except Exception:
82 # playback failed to start, cleanup
83 await self.stop()
84 raise PlayerCommandFailed("Playback failed to start")
85
86 async def stop(self) -> None:
87 """Stop playback and cleanup."""
88 if self._audio_source_task and not self._audio_source_task.done():
89 self._audio_source_task.cancel()
90 with suppress(asyncio.CancelledError):
91 await self._audio_source_task
92 await asyncio.gather(
93 *[self.remove_client(x) for x in self.sync_clients],
94 )
95
96 async def remove_client(self, airplay_player: AirPlayPlayer) -> None:
97 """Remove a sync client from the session."""
98 async with self._lock:
99 if airplay_player not in self.sync_clients:
100 return
101 self.sync_clients.remove(airplay_player)
102 await self.stop_client(airplay_player)
103 # If this was the last client, stop the session
104 if not self.sync_clients:
105 await self.stop()
106 return
107
108 async def stop_client(self, airplay_player: AirPlayPlayer) -> None:
109 """
110 Stop a client's stream and ffmpeg.
111
112 :param airplay_player: The player to stop.
113 :param force: If True, kill CLI process immediately.
114 """
115 ffmpeg = self._player_ffmpeg.pop(airplay_player.player_id, None)
116 # note that we use kill instead of graceful close here,
117 # because otherwise it can take a very long time for the process to exit.
118 if ffmpeg and not ffmpeg.closed:
119 await ffmpeg.kill()
120 if airplay_player.stream and airplay_player.stream.session == self:
121 await airplay_player.stream.stop(force=True)
122
123 async def add_client(self, airplay_player: AirPlayPlayer) -> None:
124 """Add a sync client to the session as a late joiner.
125
126 The late joiner will:
127 1. Start playing at a compensated NTP timestamp (start_ntp + offset)
128 2. Receive silence calculated dynamically based on how much audio has been sent
129 3. Then receive real audio chunks in sync with other players
130 """
131 sync_leader = self.sync_clients[0]
132 if not sync_leader.stream or not sync_leader.stream.running:
133 return
134
135 allow_late_join = self.prov.config.get_value(
136 CONF_ENABLE_LATE_JOIN, ENABLE_LATE_JOIN_DEFAULT
137 )
138 if not allow_late_join:
139 await self.stop()
140 if sync_leader.state.current_media:
141 self.mass.call_later(
142 0.5,
143 self.mass.players.cmd_resume(sync_leader.player_id),
144 task_id=f"resync_session_{sync_leader.player_id}",
145 )
146 return
147
148 async with self._lock:
149 skip_seconds = self.seconds_streamed
150 start_at = self.start_time + skip_seconds
151 start_ntp = unix_time_to_ntp(start_at)
152 if airplay_player not in self.sync_clients:
153 self.sync_clients.append(airplay_player)
154
155 await self._start_client(airplay_player, start_ntp)
156 if airplay_player.stream:
157 await airplay_player.stream.wait_for_connection()
158
159 async def _audio_streamer(self, audio_source: AsyncGenerator[bytes, None]) -> None:
160 """Stream audio to all players."""
161 pcm_sample_size = self.pcm_format.pcm_sample_size
162 watchdog_task = asyncio.create_task(self._silence_watchdog(pcm_sample_size))
163 try:
164 async for chunk in audio_source:
165 if not self._first_chunk_received.is_set():
166 watchdog_task.cancel()
167 with suppress(asyncio.CancelledError):
168 await watchdog_task
169 self._first_chunk_received.set()
170
171 if not self.sync_clients:
172 break
173
174 await self._write_chunk_to_all_players(chunk)
175 self.seconds_streamed += len(chunk) / pcm_sample_size
176 finally:
177 if not watchdog_task.done():
178 watchdog_task.cancel()
179 with suppress(asyncio.CancelledError):
180 await watchdog_task
181 async with self._lock:
182 await asyncio.gather(
183 *[
184 self._write_eof_to_player(x)
185 for x in self.sync_clients
186 if x.stream and x.stream.running
187 ],
188 return_exceptions=True,
189 )
190
191 async def _silence_watchdog(self, pcm_sample_size: int) -> None:
192 """Insert silence if audio source is slow to deliver first chunk."""
193 grace_period = 0.2
194 max_silence_padding = 5.0
195 silence_inserted = 0.0
196
197 await asyncio.sleep(grace_period)
198 while not self._first_chunk_received.is_set() and silence_inserted < max_silence_padding:
199 silence_duration = 0.1
200 silence_bytes = int(pcm_sample_size * silence_duration)
201 silence_chunk = bytes(silence_bytes)
202 await self._write_chunk_to_all_players(silence_chunk)
203 self.seconds_streamed += silence_duration
204 silence_inserted += silence_duration
205 await asyncio.sleep(0.05)
206
207 if silence_inserted > 0:
208 self.prov.logger.warning(
209 "Inserted %.1fs silence padding while waiting for audio source",
210 silence_inserted,
211 )
212
213 async def _write_chunk_to_all_players(self, chunk: bytes) -> None:
214 """Write a chunk to all connected players."""
215 async with self._lock:
216 sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running]
217 if not sync_clients:
218 return
219
220 # Write chunk to all players
221 write_tasks = [self._write_chunk_to_player(x, chunk) for x in sync_clients if x.stream]
222 results = await asyncio.gather(*write_tasks, return_exceptions=True)
223
224 # Check for write errors or timeouts
225 players_to_remove: list[AirPlayPlayer] = []
226 for i, result in enumerate(results):
227 if i >= len(sync_clients):
228 continue
229 player = sync_clients[i]
230
231 if isinstance(result, TimeoutError):
232 self.prov.logger.warning(
233 "Removing player %s from session: stopped reading data (write timeout)",
234 player.player_id,
235 )
236 players_to_remove.append(player)
237 elif isinstance(result, Exception):
238 self.prov.logger.warning(
239 "Removing player %s from session due to write error: %s",
240 player.player_id,
241 result,
242 )
243 players_to_remove.append(player)
244
245 for player in players_to_remove:
246 self.mass.create_task(self.remove_client(player))
247
248 async def _write_chunk_to_player(self, airplay_player: AirPlayPlayer, chunk: bytes) -> None:
249 """Write audio chunk to a player's ffmpeg process."""
250 player_id = airplay_player.player_id
251 if ffmpeg := self._player_ffmpeg.get(player_id):
252 if ffmpeg.closed:
253 return
254 await asyncio.wait_for(ffmpeg.write(chunk), timeout=35.0)
255
256 async def _write_eof_to_player(self, airplay_player: AirPlayPlayer) -> None:
257 """Write EOF to a specific player."""
258 if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
259 await ffmpeg.write_eof()
260 await ffmpeg.wait_with_timeout(30)
261 if airplay_player.stream and airplay_player.stream._cli_proc:
262 await airplay_player.stream._cli_proc.write_eof()
263
264 async def _start_client(self, airplay_player: AirPlayPlayer, start_ntp: int) -> None:
265 """Start CLI process and ffmpeg for a single client."""
266 if airplay_player.stream and airplay_player.stream.running:
267 await airplay_player.stream.stop()
268 if airplay_player.protocol == StreamingProtocol.AIRPLAY2:
269 airplay_player.stream = AirPlay2Stream(airplay_player)
270 else:
271 airplay_player.stream = RaopStream(airplay_player)
272 airplay_player.stream.session = self
273 sync_adjust = airplay_player.config.get_value(CONF_SYNC_ADJUST, 0)
274 assert isinstance(sync_adjust, int)
275 if sync_adjust != 0:
276 start_ntp = unix_time_to_ntp(ntp_to_unix_time(start_ntp) + (sync_adjust / 1000))
277 await airplay_player.stream.start(start_ntp)
278 # Start ffmpeg to feed audio to CLI stdin
279 if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
280 await ffmpeg.close()
281 filter_params = get_player_filter_params(
282 self.mass,
283 airplay_player.player_id,
284 self.pcm_format,
285 airplay_player.stream.pcm_format,
286 )
287 cli_proc = airplay_player.stream._cli_proc
288 assert cli_proc
289 assert cli_proc.proc
290 assert cli_proc.proc.stdin
291 stdin_transport = cli_proc.proc.stdin.transport
292 audio_output: str | int = stdin_transport.get_extra_info("pipe").fileno()
293 ffmpeg = FFMpeg(
294 audio_input="-",
295 input_format=self.pcm_format,
296 output_format=airplay_player.stream.pcm_format,
297 filter_params=filter_params,
298 audio_output=audio_output,
299 )
300 await ffmpeg.start()
301 self._player_ffmpeg[airplay_player.player_id] = ffmpeg
302