/
/
/
1"""Playback session coordinator for Sendspin players."""
2
3from __future__ import annotations
4
5import asyncio
6import time
7from collections import deque
8from collections.abc import Iterator
9from contextlib import suppress
10from dataclasses import dataclass, field
11from typing import TYPE_CHECKING, Any
12from uuid import UUID, uuid4
13
14from aiosendspin.server.audio import AudioFormat as SendspinAudioFormat
15from aiosendspin.server.push_stream import MAIN_CHANNEL, PushStream
16from music_assistant_models.enums import ContentType
17from music_assistant_models.media_items.audio_format import AudioFormat
18
19from music_assistant.constants import CONF_OUTPUT_CHANNELS
20from music_assistant.helpers.audio import get_player_filter_params
21from music_assistant.helpers.ffmpeg import FFMpeg
22from music_assistant.models.player import PlayerMedia
23
24if TYPE_CHECKING:
25 from .player import SendspinPlayer
26
27
# Same sample format expressed in both MA and Sendspin type systems:
# signed 32-bit little-endian PCM, 48 kHz, stereo.
_PCM_FORMAT = AudioFormat(
    content_type=ContentType.PCM_S32LE,
    sample_rate=48000,
    bit_depth=32,
    channels=2,
)
_SENDSPIN_PCM_FORMAT = SendspinAudioFormat(
    sample_rate=48000,
    bit_depth=32,
    channels=2,
)
# Max PCM slice fed to the producer per iteration (100 ms).
_PRODUCER_SLICE_US = 100_000
# Max pending chunks between producer and committer before the producer blocks.
_PRODUCER_BACKLOG_SIZE = 64
# Backpressure threshold: push stream sleeps when buffered audio exceeds this (30 s).
_PRODUCER_BUFFER_LIMIT_US = 30_000_000
# Start join promotion once catchup processor lag is within this window of the history tail.
_JOIN_PROMOTE_ARM_WINDOW_US = 2_000_000
# Accept catchup output within this margin of the promotion target.
_JOIN_PROMOTE_TOLERANCE_US = 50_000
# Abort join catchup if promotion hasn't completed within this.
_JOIN_PROMOTION_TIMEOUT_S = 15.0
# Retain committed history this far behind real-time for late-join backfill.
# This pre-history also warms up ffmpeg's internal filter buffers so the DSP
# output has settled by the time the member's channel goes live.
_HISTORY_KEEP_PAST_US = 1_000_000
56
57
58class _BufferedFfmpegProcessor:
59 """FFmpeg wrapper with small output carry-over buffer and duration-based reads."""
60
61 def __init__(self, ffmpeg: FFMpeg, audio_format: AudioFormat) -> None:
62 self._ffmpeg = ffmpeg
63 self._output_buffer = bytearray()
64 bytes_per_sample = max(1, int(audio_format.bit_depth // 8))
65 self._sample_rate = int(audio_format.sample_rate)
66 self._frame_size = bytes_per_sample * int(audio_format.channels)
67 self._bytes_per_second = self._sample_rate * self._frame_size
68 # ~25ms worth of audio per read syscall.
69 self._read_quantum_bytes = max(1, int(self._bytes_per_second * 0.025))
70 self._produced_output_us = 0
71
72 async def start(self) -> None:
73 await self._ffmpeg.start()
74
75 async def close(self) -> None:
76 await self._ffmpeg.close()
77
78 async def push(self, pcm: bytes) -> None:
79 await self._ffmpeg.write(pcm)
80
81 @property
82 def produced_output_us(self) -> int:
83 """Return cumulative output duration currently drained from ffmpeg."""
84 return self._produced_output_us
85
86 async def read_duration_us(self, duration_us: int) -> bytes:
87 """Block-read exactly `duration_us` worth of processed PCM from ffmpeg."""
88 target_bytes = self._target_bytes_for_duration_us(duration_us)
89 if target_bytes == 0:
90 return b""
91
92 while len(self._output_buffer) < target_bytes:
93 missing = target_bytes - len(self._output_buffer)
94 read_size = max(self._read_quantum_bytes, missing)
95 chunk = await self._ffmpeg.readexactly(read_size)
96 self._output_buffer.extend(chunk)
97
98 out = bytes(self._output_buffer[:target_bytes])
99 del self._output_buffer[:target_bytes]
100 return out
101
102 async def drain_available(self) -> int:
103 """Non-blocking drain of ffmpeg stdout into internal buffer.
104
105 Returns cumulative produced output duration in microseconds.
106 """
107 while True:
108 try:
109 # 1ms timeout: non-blocking check for available data.
110 chunk = await asyncio.wait_for(
111 self._ffmpeg.read(self._read_quantum_bytes),
112 timeout=0.001,
113 )
114 except TimeoutError:
115 break
116 if not chunk:
117 break
118 self._output_buffer.extend(chunk)
119 self._produced_output_us += self._duration_us_for_bytes(len(chunk))
120 if len(chunk) < self._read_quantum_bytes:
121 break
122 return self._produced_output_us
123
124 async def drain_forever(self) -> None:
125 """Continuously drain ffmpeg stdout into internal buffer until EOF."""
126 while True:
127 chunk = await self._ffmpeg.read(self._read_quantum_bytes)
128 if not chunk:
129 break
130 self._output_buffer.extend(chunk)
131 self._produced_output_us += self._duration_us_for_bytes(len(chunk))
132
133 def pop_duration_us(self, duration_us: int) -> bytes | None:
134 """Pop exactly `duration_us` from already buffered output, or None if insufficient."""
135 target_bytes = self._target_bytes_for_duration_us(duration_us)
136 if target_bytes == 0:
137 return b""
138 if len(self._output_buffer) < target_bytes:
139 return None
140 out = bytes(self._output_buffer[:target_bytes])
141 del self._output_buffer[:target_bytes]
142 return out
143
144 def buffered_duration_us(self) -> int:
145 """Return buffered output duration currently available for immediate pop."""
146 return self._duration_us_for_bytes(len(self._output_buffer))
147
148 def pop_duration_us_or_pad(self, duration_us: int, pad_tolerance_us: int) -> bytes | None:
149 """Pop target duration; if short within tolerance, pad tail with silence."""
150 target_bytes = self._target_bytes_for_duration_us(duration_us)
151 if target_bytes == 0:
152 return b""
153 available = len(self._output_buffer)
154 if available >= target_bytes:
155 out = bytes(self._output_buffer[:target_bytes])
156 del self._output_buffer[:target_bytes]
157 return out
158 short_bytes = target_bytes - available
159 short_us = self._duration_us_for_bytes(short_bytes)
160 if short_us > max(0, pad_tolerance_us):
161 return None
162 out = bytes(self._output_buffer)
163 self._output_buffer.clear()
164 return out + (b"\x00" * short_bytes)
165
166 def _duration_us_for_bytes(self, byte_count: int) -> int:
167 if byte_count <= 0 or self._sample_rate <= 0 or self._frame_size <= 0:
168 return 0
169 frames = byte_count // self._frame_size
170 if frames <= 0:
171 return 0
172 return int((frames * 1_000_000) / self._sample_rate)
173
174 def _target_bytes_for_duration_us(self, duration_us: int) -> int:
175 """Convert duration to frame-aligned PCM byte count."""
176 if duration_us <= 0 or self._sample_rate <= 0 or self._frame_size <= 0:
177 return 0
178 samples = max(0, int((duration_us * self._sample_rate + 500_000) / 1_000_000))
179 return samples * self._frame_size
180
181
@dataclass(slots=True)
class _HistoryChunk:
    """Committed PCM chunk retained for late-join backfill."""

    # Push-stream timeline position of the chunk start, in microseconds.
    start_time_us: int
    duration_us: int
    pcm: bytes
187
188
@dataclass(slots=True)
class _PendingChunk:
    """PCM slice queued between the stream producer and the committer."""

    pcm: bytes
    duration_us: int
194
@dataclass(slots=True)
class _JoinCatchupState:
    """Per-member state for a join-catchup processor replaying history through DSP.

    The processor is fed historical + live PCM via ``input_queue``. Once its
    output catches up to the live stream (within tolerance), it is promoted to
    the member's live pipeline. See ``_inject_ready_join_historical`` for the
    full promotion lifecycle.
    """

    processor: _BufferedFfmpegProcessor
    # End-of-input is signalled with a ``None`` sentinel.
    input_queue: asyncio.Queue[bytes | None]
    # Moves PCM from input_queue into the processor's stdin.
    writer_task: asyncio.Task[None]
    # Continuously drains the processor's stdout into its carry-over buffer.
    drainer_task: asyncio.Task[None]
    # Feeds the initial history snapshot; None until scheduled.
    snapshot_task: asyncio.Task[None] | None = None
    # Timeline position of the first history chunk fed into the processor.
    first_history_start_us: int | None = None
    # Timeline position up to which PCM has been enqueued into the processor.
    fed_until_us: int | None = None
    # End of the history snapshot taken when catchup started.
    history_end_us: int | None = None
    # Locked target: once set, promotion fires when output reaches this point.
    promotion_target_end_us: int | None = None
    # Monotonic time when promotion was armed, used for timeout detection.
    promotion_armed_monotonic_s: float | None = None
    # Serializes feeds into input_queue (history snapshot vs live fanout).
    write_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
221
222
@dataclass(slots=True)
class _PipelineConfig:
    """Per-member DSP pipeline configuration."""

    # True when the member needs its own ffmpeg transform channel.
    requires_transform: bool
    # Output channels setting; presumably the CONF_OUTPUT_CHANNELS value -- verify.
    output_channels: str
    # ffmpeg filter parameters for this member's DSP chain.
    filter_params: tuple[str, ...]

    @property
    def signature(self) -> tuple[bool, str, tuple[str, ...]]:
        """Return the config as a plain tuple, suitable for equality comparison."""
        return (self.requires_transform, self.output_channels, self.filter_params)
232
233
@dataclass(slots=True)
class _MemberPipeline:
    """Live playback pipeline state for a single group member."""

    player_id: str
    # Push-stream channel this member's audio is sent on.
    channel_id: UUID
    config: _PipelineConfig
    # Per-member DSP transform; None when the member receives raw PCM.
    processor: _BufferedFfmpegProcessor | None = None
    ready: bool = False
241
242
243class SendspinPlaybackSession:
244 """Coordinates playback for a Sendspin player group leader.
245
246 The push stream supports multi-channel audio: members that need per-player
247 DSP (EQ, channel mixing, output routing) each get a dedicated ffmpeg
248 processor and a separate channel. Members without DSP share MAIN_CHANNEL
249 and receive the raw PCM directly.
250
251 Playback runs as two concurrent coroutines inside ``_run_playback``:
252
253 * **Producer** -- reads PCM from the MA stream, slices it into fixed-size
254 chunks, queues them, and writes each slice into per-member ffmpeg
255 processors (transform push) in parallel.
256 * **Consumer** -- dequeues chunks, reads the corresponding transformed
257 output from each processor (transform read), prepares all channels on
258 the push stream, commits audio, and applies backpressure via
259 ``sleep_to_limit_buffer``.
260
261 When a new member joins mid-playback, a *join-catchup* processor replays
262 committed history through the member's DSP chain so it can be promoted
263 to the live pipeline without an audible gap.
264 """
265
    def __init__(self, player: SendspinPlayer) -> None:
        """Initialize session coordinator bound to the owning player."""
        self.player = player
        # Task running _run_playback for the current media, if any.
        self.playback_task: asyncio.Task[None] | None = None
        # Members whose add_member call is still in flight.
        self.pending_join_members: set[str] = set()
        # Guards all mutable session state below.
        self._state_lock = asyncio.Lock()
        self._members: set[str] = set()
        self._member_pipelines: dict[str, _MemberPipeline] = {}
        self._push_stream: PushStream | None = None
        self._playback_running = False
        # Push-stream timestamp of the first committed chunk, in microseconds.
        self._timeline_start_us: int | None = None
        # Monotonic timestamp (us) recorded at the first commit.
        self._first_commit_monotonic_us: int | None = None
        self._produced_audio_us = 0
        # Committed PCM retained for late-join backfill.
        self._history: deque[_HistoryChunk] = deque()
        # Active join-catchup processors keyed by joining player id.
        self._join_catchup: dict[str, _JoinCatchupState] = {}
        self._pipeline_config_cache: dict[str, _PipelineConfig] = {}
        # Channel identities assigned at join time, before the pipeline exists.
        self._preassigned_channels: dict[str, UUID] = {}
        self._mapping_dirty = True
284
285 # -- Helpers ---------------------------------------------------------------
286
287 def _attach_task_exception_logger(self, task: asyncio.Task[Any], name: str) -> None:
288 """Log unhandled exception from background task when it finishes."""
289
290 def _done_callback(done_task: asyncio.Task[Any]) -> None:
291 if done_task.cancelled():
292 return
293 with suppress(Exception):
294 exc = done_task.exception()
295 if exc is not None:
296 self.player.logger.exception(
297 "Background task failed: %s",
298 name,
299 exc_info=exc,
300 )
301
302 task.add_done_callback(_done_callback)
303
304 def _get_join_readiness(self) -> tuple[bool, str | None]:
305 """Check whether live join DSP preparation can be performed right now."""
306 if self._playback_running and self._push_stream is not None:
307 return (True, None)
308 return (False, "no active stream context")
309
310 # -- Snapshot helper -------------------------------------------------------
311
312 async def _snapshot_active_pipelines(
313 self,
314 ) -> tuple[set[str], tuple[tuple[str, _MemberPipeline], ...]]:
315 """Return (join_pending_ids, active_pipelines) under lock."""
316 async with self._state_lock:
317 members = self._members
318 return set(self._join_catchup), tuple(
319 (mid, p) for mid, p in self._member_pipelines.items() if mid in members
320 )
321
322 # -- Public API ------------------------------------------------------------
323
324 async def cancel(self, reason: str) -> None:
325 """Cancel and await the active playback task, if any."""
326 task = self.playback_task
327 if task is None:
328 return
329 if task.done():
330 if self.playback_task is task:
331 self.playback_task = None
332 return
333 self.player.logger.debug("Cancelling playback task (%s)", reason)
334 task.cancel()
335 with suppress(asyncio.CancelledError, Exception):
336 await task
337 if self.playback_task is task:
338 self.playback_task = None
339
340 async def start(self, media: PlayerMedia, restart: bool = False) -> None:
341 """Start background playback for `media`."""
342 active_task = self.playback_task
343 if active_task is not None and not active_task.done():
344 if not restart:
345 raise RuntimeError("playback already active")
346 await self.cancel("restart requested")
347 self.playback_task = asyncio.create_task(self._run_playback(media))
348
    async def close(self) -> None:
        """Stop playback and release all managed resources."""
        await self.cancel("session close")
        self.pending_join_members.clear()
        async with self._state_lock:
            self._members.clear()
            self._mapping_dirty = True
        # These helpers take _state_lock themselves, so call them unlocked.
        await self._clear_member_pipelines()
        await self._clear_join_catchup()
        async with self._state_lock:
            self._history.clear()
            self._produced_audio_us = 0
            self._timeline_start_us = None
            self._first_commit_monotonic_us = None
            self._pipeline_config_cache.clear()
            self._preassigned_channels.clear()
365
    async def add_member(self, player_id: str) -> None:
        """Add a member to the group with DSP-aware lifecycle handling."""
        async with self._state_lock:
            if player_id in self._members:
                # Already a member: nothing to do beyond clearing pending state.
                self.pending_join_members.discard(player_id)
                return
            # Force a fresh channel identity for every new join cycle.
            self._preassigned_channels[player_id] = uuid4()
            self.pending_join_members.add(player_id)
        try:
            # Runs unlocked: _start_join_catchup acquires _state_lock itself.
            await self._start_join_catchup(player_id)
            async with self._state_lock:
                self._members.add(player_id)
                self._mapping_dirty = True
        except Exception:
            # Roll back the preassigned channel on any join failure.
            await self._release_player_channel(player_id)
            raise
        finally:
            self.pending_join_members.discard(player_id)
385
    async def remove_member(self, player_id: str) -> None:
        """Remove a member from the group and clean up per-member playback state."""
        self.pending_join_members.discard(player_id)
        async with self._state_lock:
            self._members.discard(player_id)
            self._mapping_dirty = True
            self._pipeline_config_cache.pop(player_id, None)
            self._preassigned_channels.pop(player_id, None)
        # Both helpers acquire _state_lock themselves, so call them unlocked.
        await self._stop_join_catchup(player_id)
        await self._release_player_channel(player_id)
396
397 async def sync_members(self, member_ids: set[str]) -> None:
398 """Reconcile session members to exactly the provided set."""
399 async with self._state_lock:
400 current_members = set(self._members)
401 for player_id in current_members - member_ids:
402 await self.remove_member(player_id)
403 for player_id in member_ids - current_members:
404 await self.add_member(player_id)
405
406 # -- Join catchup ----------------------------------------------------------
407
    async def _start_join_catchup(self, player_id: str) -> None:
        """Start dedicated join catchup processor fed from committed history.

        No-op when playback is idle, or when the member needs no DSP
        transform (it will consume MAIN_CHANNEL directly). Otherwise spins
        up a fresh ffmpeg processor plus writer/drainer tasks and schedules
        feeding of the history snapshot.
        """
        async with self._state_lock:
            playback_active = self._playback_running and self._push_stream is not None
        if not playback_active:
            return

        pipeline = await self._sync_member_pipeline(player_id)
        if not pipeline.config.requires_transform:
            # Member shares the raw PCM channel; no catchup needed.
            return

        # Replace any stale catchup state from a previous join attempt.
        await self._stop_join_catchup(player_id)

        ffmpeg_obj = self._create_member_ffmpeg(pipeline.config.filter_params)
        processor = _BufferedFfmpegProcessor(ffmpeg_obj, _PCM_FORMAT)
        await processor.start()
        # Bounded queue sized to hold the full buffer duration with some headroom.
        queue_size = (_PRODUCER_BUFFER_LIMIT_US // _PRODUCER_SLICE_US) + _PRODUCER_BACKLOG_SIZE
        input_queue: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=queue_size)

        async with self._state_lock:
            history_snapshot = list(self._history)
        if not history_snapshot:
            # Nothing committed yet, so there is nothing to catch up to.
            await processor.close()
            return
        history_end_us = history_snapshot[-1].start_time_us + history_snapshot[-1].duration_us

        async def _writer() -> None:
            # Moves queued PCM into ffmpeg stdin; None is the end-of-input sentinel.
            while True:
                chunk = await input_queue.get()
                if chunk is None:
                    return
                await processor.push(chunk)

        async def _drainer() -> None:
            await processor.drain_forever()

        writer_task = asyncio.create_task(_writer())
        drainer_task = asyncio.create_task(_drainer())
        self._attach_task_exception_logger(writer_task, f"join_writer:{player_id}")
        self._attach_task_exception_logger(drainer_task, f"join_drainer:{player_id}")

        state = _JoinCatchupState(
            processor=processor,
            input_queue=input_queue,
            writer_task=writer_task,
            drainer_task=drainer_task,
            history_end_us=history_end_us,
        )
        async with self._state_lock:
            self._join_catchup[player_id] = state

        # Only schedule the history feed if our state is still the current one
        # (a concurrent stop/restart may have replaced it meanwhile).
        async with self._state_lock:
            current = self._join_catchup.get(player_id)
            if current is not None and current.processor is processor:
                current.snapshot_task = asyncio.create_task(
                    self._feed_join_history(player_id, processor, history_snapshot)
                )
                self._attach_task_exception_logger(
                    current.snapshot_task, f"join_snapshot:{player_id}"
                )
469
    async def _feed_join_history(
        self,
        player_id: str,
        processor: _BufferedFfmpegProcessor,
        history_snapshot: list[_HistoryChunk],
    ) -> None:
        """Feed historical PCM into a join-catchup processor.

        Holds the state's write_lock for the whole feed so the live fanout
        cannot interleave with the snapshot. Timeline gaps between history
        chunks are filled with silence to keep durations consistent.
        """
        async with self._state_lock:
            state = self._join_catchup.get(player_id)
        if state is None or state.processor is not processor:
            # Catchup state was replaced or stopped; abandon the feed.
            return
        async with state.write_lock:
            first_history_start_us: int | None = None
            previous_end_us: int | None = None
            for hist_chunk in history_snapshot:
                if first_history_start_us is None:
                    # First chunk: record where this processor's timeline begins.
                    first_history_start_us = hist_chunk.start_time_us
                    async with self._state_lock:
                        current = self._join_catchup.get(player_id)
                        if current is not None and current.processor is processor:
                            current.first_history_start_us = first_history_start_us
                            current.fed_until_us = first_history_start_us
                if previous_end_us is not None and hist_chunk.start_time_us > previous_end_us:
                    # Fill timeline gap with silence.
                    gap_us = hist_chunk.start_time_us - previous_end_us
                    silence = self._silence_for_duration_us(gap_us)
                    if silence:
                        await self._enqueue_join_pcm(state, silence)
                await self._enqueue_join_pcm(state, hist_chunk.pcm)
                previous_end_us = hist_chunk.start_time_us + hist_chunk.duration_us
            # Publish how far the snapshot feed progressed.
            async with self._state_lock:
                current = self._join_catchup.get(player_id)
                if current is not None and current.processor is processor:
                    current.fed_until_us = previous_end_us
503
504 async def _stop_join_catchup(self, player_id: str) -> None:
505 """Stop and remove dedicated join catchup processor for one player."""
506 async with self._state_lock:
507 state = self._join_catchup.pop(player_id, None)
508 if state is None:
509 return
510 if state.snapshot_task is not None:
511 state.snapshot_task.cancel()
512 with suppress(asyncio.CancelledError, Exception):
513 await state.snapshot_task
514 state.writer_task.cancel()
515 with suppress(asyncio.CancelledError, Exception):
516 await state.writer_task
517 state.drainer_task.cancel()
518 with suppress(asyncio.CancelledError, Exception):
519 await state.drainer_task
520 with suppress(Exception):
521 await state.processor.close()
522
523 async def _promote_join_catchup_processor(
524 self,
525 player_id: str,
526 pipeline: _MemberPipeline,
527 target_end_us: int,
528 ) -> None:
529 """Promote join catchup processor to the member's live DSP processor."""
530 old_processor: _BufferedFfmpegProcessor | None = None
531 async with self._state_lock:
532 state = self._join_catchup.pop(player_id, None)
533 if state is None:
534 return
535 old_processor = pipeline.processor
536 pipeline.processor = state.processor
537 if state.snapshot_task is not None:
538 state.snapshot_task.cancel()
539 with suppress(asyncio.CancelledError, Exception):
540 await state.snapshot_task
541 # Let writer flush queued PCM before handoff; cancel if queue is full.
542 try:
543 state.input_queue.put_nowait(None)
544 except asyncio.QueueFull:
545 state.writer_task.cancel()
546 with suppress(asyncio.CancelledError, Exception):
547 await state.writer_task
548 state.drainer_task.cancel()
549 with suppress(asyncio.CancelledError, Exception):
550 await state.drainer_task
551 if old_processor is not None and old_processor is not state.processor:
552 with suppress(Exception):
553 await old_processor.close()
554
555 async def _clear_join_catchup(self) -> None:
556 """Stop and remove all dedicated join catchup processors."""
557 async with self._state_lock:
558 player_ids = list(self._join_catchup.keys())
559 for player_id in player_ids:
560 await self._stop_join_catchup(player_id)
561
562 async def _release_player_channel(self, player_id: str) -> None:
563 """Release per-member channel/DSP state for a removed member."""
564 async with self._state_lock:
565 pipeline = self._member_pipelines.pop(player_id, None)
566 self._preassigned_channels.pop(player_id, None)
567 if pipeline is None or pipeline.processor is None:
568 return
569 await self._close_member_ffmpeg(pipeline.processor)
570
571 # -- Playback pipeline -----------------------------------------------------
572
    async def _run_playback(self, media: PlayerMedia) -> None:  # noqa: PLR0915
        """Run the playback pipeline for a single media session.

        Pulls PCM from the MA stream, feeds main + per-member DSP channels into the
        Sendspin push stream, and commits audio continuously. Supports dynamic group
        membership changes and late-join historical backfill while running.
        """
        push_stream = self._create_push_stream()
        async with self._state_lock:
            # Reset all per-session state for a fresh run.
            self._push_stream = push_stream
            self._playback_running = True
            self._history.clear()
            self._produced_audio_us = 0
            self._timeline_start_us = None
            self._first_commit_monotonic_us = None
            self._mapping_dirty = True
        # Bounded queue between producer (stream reader) and consumer (committer).
        pending_chunks: asyncio.Queue[_PendingChunk | None] = asyncio.Queue(
            maxsize=_PRODUCER_BACKLOG_SIZE
        )
        # Shadow deque mirroring pending_chunks for join-catchup backlog peeking.
        pending_backlog: deque[_PendingChunk] = deque()
        pending_duration_us = 0

        async def _produce_pending_chunks() -> None:
            """Read the MA stream, slice it, queue slices, and push into DSP processors."""
            nonlocal pending_duration_us
            audio_source = self.player.mass.streams.get_stream(media, _PCM_FORMAT)
            async for chunk in audio_source:
                if not chunk:
                    continue
                for slice_chunk in self._iter_pcm_slices(chunk, _PCM_FORMAT, _PRODUCER_SLICE_US):
                    if not slice_chunk:
                        continue
                    duration_us = self._duration_us(slice_chunk, _PCM_FORMAT)
                    if duration_us <= 0:
                        continue
                    await self._refresh_member_mappings()
                    pending = _PendingChunk(pcm=slice_chunk, duration_us=duration_us)
                    await pending_chunks.put(pending)
                    pending_backlog.append(pending)
                    pending_duration_us += duration_us
                    # Push this slice into every live (non-joining) transform
                    # pipeline in parallel; join-pending members are fed via
                    # their catchup processors instead.
                    join_pending_ids, pipelines = await self._snapshot_active_pipelines()
                    transform_pipelines: list[_MemberPipeline] = []
                    for member_id, pipeline in pipelines:
                        if not pipeline.config.requires_transform:
                            continue
                        if member_id in join_pending_ids:
                            continue
                        transform_pipelines.append(pipeline)
                    results = await asyncio.gather(
                        *(
                            self._transform_member_chunk(pipeline, slice_chunk)
                            for pipeline in transform_pipelines
                        ),
                        return_exceptions=True,
                    )
                    for pipeline, result in zip(transform_pipelines, results, strict=True):
                        if isinstance(result, BaseException):
                            self.player.logger.warning(
                                "Transform push failed for channel %s: %s",
                                pipeline.channel_id,
                                result,
                            )

        async def _commit_pending_chunks() -> None:
            """Dequeue slices, read transformed output, and commit all channels."""
            nonlocal pending_duration_us
            while True:
                pending = await pending_chunks.get()
                if pending is None:
                    # Sentinel from the producer: clean shutdown.
                    break
                pending_backlog.popleft()
                pending_duration_us = max(0, pending_duration_us - pending.duration_us)
                # Give any ready join-catchup members their backfill first.
                await self._inject_ready_join_historical(push_stream, pending_backlog, pending.pcm)
                push_stream.prepare_audio(
                    pending.pcm, _SENDSPIN_PCM_FORMAT, channel_id=MAIN_CHANNEL
                )
                join_pending_ids, pipelines = await self._snapshot_active_pipelines()
                transform_pipelines: list[_MemberPipeline] = []
                for member_id, pipeline in pipelines:
                    if not pipeline.config.requires_transform:
                        continue
                    if member_id in join_pending_ids:
                        continue
                    transform_pipelines.append(pipeline)
                # Read each member's transformed PCM matching this slice's duration.
                transformed_chunks = await asyncio.gather(
                    *(
                        self._read_member_chunk(pipeline, pending.duration_us)
                        for pipeline in transform_pipelines
                    ),
                    return_exceptions=True,
                )
                for pipeline, transformed_chunk in zip(
                    transform_pipelines, transformed_chunks, strict=True
                ):
                    if isinstance(transformed_chunk, BaseException):
                        self.player.logger.warning(
                            "Transform read failed for channel %s: %s",
                            pipeline.channel_id,
                            transformed_chunk,
                        )
                        continue
                    if transformed_chunk is None:
                        continue
                    push_stream.prepare_audio(
                        transformed_chunk,
                        _SENDSPIN_PCM_FORMAT,
                        channel_id=pipeline.channel_id,
                    )
                commit_start_us = await push_stream.commit_audio()
                # Backpressure: wait if the stream has buffered too far ahead.
                await push_stream.sleep_to_limit_buffer(_PRODUCER_BUFFER_LIMIT_US)
                commit_now_us = int(time.monotonic_ns() / 1000)
                committed_history_chunk = _HistoryChunk(
                    start_time_us=int(commit_start_us),
                    duration_us=pending.duration_us,
                    pcm=pending.pcm,
                )
                async with self._state_lock:
                    if self._timeline_start_us is None:
                        self._timeline_start_us = int(commit_start_us)
                    if self._first_commit_monotonic_us is None:
                        self._first_commit_monotonic_us = commit_now_us
                    self._history.append(committed_history_chunk)
                    self._produced_audio_us += pending.duration_us
                    self._prune_history_locked(commit_now_us)
                # Keep join-catchup processors in lockstep with live commits.
                await self._fanout_history_chunk_to_join_processors(committed_history_chunk)

        commit_task = asyncio.create_task(_commit_pending_chunks())
        self._attach_task_exception_logger(commit_task, "commit_pending_chunks")
        producer_stopped_cleanly = False
        try:
            await _produce_pending_chunks()
            producer_stopped_cleanly = True
        finally:
            if producer_stopped_cleanly and not commit_task.done():
                # Producer finished normally; send a None sentinel so the
                # consumer exits cleanly. The queue may be full, so retry
                # with a deadline before falling back to cancellation.
                sentinel_sent = False
                deadline = time.monotonic() + 1.0
                while not sentinel_sent and not commit_task.done():
                    try:
                        pending_chunks.put_nowait(None)
                        sentinel_sent = True
                    except asyncio.QueueFull:
                        if time.monotonic() >= deadline:
                            break
                        await asyncio.sleep(0.01)
                if sentinel_sent:
                    with suppress(asyncio.CancelledError, Exception):
                        await commit_task
                else:
                    commit_task.cancel()
                    with suppress(asyncio.CancelledError, Exception):
                        await commit_task
            else:
                commit_task.cancel()
                with suppress(asyncio.CancelledError, Exception):
                    await commit_task
            with suppress(Exception):
                self._stop_push_stream()
            await self._clear_join_catchup()
            await self._clear_member_pipelines()
            async with self._state_lock:
                self._push_stream = None
                self._playback_running = False
                self._timeline_start_us = None
                self._first_commit_monotonic_us = None
                self._produced_audio_us = 0
                self._history.clear()
742
743 # -- Join injection --------------------------------------------------------
744
    async def _inject_ready_join_historical(
        self,
        push_stream: PushStream,
        pending_backlog: deque[_PendingChunk],
        current_pcm: bytes,
    ) -> bool:
        """Inject join-catchup historical audio once processor output reaches history end.

        Join promotion lifecycle:
        1. A catchup processor is fed historical PCM and new commits in parallel.
        2. Once the processor's output lag falls within _JOIN_PROMOTE_ARM_WINDOW_US
           of the history tail, promotion is "armed" and a target end timestamp is locked.
        3. Once output reaches the target (within _JOIN_PROMOTE_TOLERANCE_US), the
           catchup processor is promoted to the member's live DSP pipeline.
        4. If promotion doesn't complete within _JOIN_PROMOTION_TIMEOUT_S, it's aborted.

        Returns True if at least one member's history was injected/promoted.
        """
        injected_any = False
        async with self._state_lock:
            items = list(self._join_catchup.items())
        for player_id, state in items:
            produced_output_us = state.processor.produced_output_us
            # Re-read the authoritative state under lock; skip if this entry
            # was replaced or removed since the snapshot above.
            async with self._state_lock:
                current = self._join_catchup.get(player_id)
                if current is None or current.processor is not state.processor:
                    continue
                first_history_start_us = current.first_history_start_us
                fed_until_us = current.fed_until_us
                history_end_us = current.history_end_us
                promotion_target_end_us = current.promotion_target_end_us
                promotion_armed_monotonic_s = current.promotion_armed_monotonic_s
            if first_history_start_us is None or fed_until_us is None or history_end_us is None:
                # History feed has not started/published yet.
                continue
            # Output can't be "ready" past what was fed, nor past what ffmpeg produced.
            max_ready_end_us = min(
                fed_until_us,
                first_history_start_us + max(0, produced_output_us),
            )
            if promotion_target_end_us is None:
                # Not armed yet: arm once lag to the history tail is small enough.
                lag_to_tail_us = history_end_us - max_ready_end_us
                if lag_to_tail_us > _JOIN_PROMOTE_ARM_WINDOW_US:
                    continue
                async with self._state_lock:
                    current = self._join_catchup.get(player_id)
                    if current is None or current.processor is not state.processor:
                        continue
                    if current.promotion_target_end_us is None:
                        current.promotion_target_end_us = history_end_us
                        current.promotion_armed_monotonic_s = time.monotonic()
                    promotion_target_end_us = current.promotion_target_end_us
                    promotion_armed_monotonic_s = current.promotion_armed_monotonic_s
            target_end_us = promotion_target_end_us
            if (
                promotion_armed_monotonic_s is not None
                and time.monotonic() - promotion_armed_monotonic_s > _JOIN_PROMOTION_TIMEOUT_S
            ):
                self.player.logger.error(
                    "Join promotion timed out for %s after %.1fs; dropping join catchup",
                    player_id,
                    _JOIN_PROMOTION_TIMEOUT_S,
                )
                await self._stop_join_catchup(player_id)
                continue
            if max_ready_end_us + _JOIN_PROMOTE_TOLERANCE_US < target_end_us:
                # Output hasn't reached the locked target yet; try again later.
                continue
            inject_duration_us = target_end_us - first_history_start_us
            transformed_history = state.processor.pop_duration_us_or_pad(
                inject_duration_us, _JOIN_PROMOTE_TOLERANCE_US
            )
            if transformed_history is None:
                continue
            pipeline = await self._sync_member_pipeline(player_id)
            # Split the blob into slices so push_stream can yield between encodes.
            frame_stride = (_SENDSPIN_PCM_FORMAT.bit_depth // 8) * _SENDSPIN_PCM_FORMAT.channels
            slice_bytes = (
                int(_SENDSPIN_PCM_FORMAT.sample_rate * _PRODUCER_SLICE_US / 1_000_000)
                * frame_stride
            )
            for offset in range(0, len(transformed_history), slice_bytes):
                push_stream.prepare_historical_audio(
                    transformed_history[offset : offset + slice_bytes],
                    _SENDSPIN_PCM_FORMAT,
                    # Only the first slice anchors the timeline position.
                    channel_id=pipeline.channel_id,
                    start_time_us=first_history_start_us if offset == 0 else None,
                )
            # Feed not-yet-committed producer chunks so there is no gap after handoff.
            await self._prefeed_pending_backlog_for_join(state, current_pcm, pending_backlog)
            await self._promote_join_catchup_processor(player_id, pipeline, target_end_us)
            injected_any = True
        return injected_any
832
833 async def _prefeed_pending_backlog_for_join(
834 self,
835 state: _JoinCatchupState,
836 current_pcm: bytes,
837 pending_backlog: deque[_PendingChunk],
838 ) -> None:
839 """Push current chunk + queued pending chunks into join processor before promotion.
840
841 Between the last committed chunk and the next commit, there may be
842 chunks already queued by the producer that the catchup processor hasn't
843 seen yet. Feeding them now avoids a gap in transformed audio after
844 promotion.
845 """
846 await self._enqueue_join_pcm(state, current_pcm)
847 for item in list(pending_backlog):
848 await self._enqueue_join_pcm(state, item.pcm)
849
    async def _fanout_history_chunk_to_join_processors(self, hist_chunk: _HistoryChunk) -> None:
        """Feed newly committed history chunk into all active join-catchup processors.

        Iterates a snapshot of the catchup map so the map can change while we
        work. Per member, state is read and written back under ``_state_lock``
        while the (potentially blocking) enqueue happens under only the
        member's own ``write_lock``, so one slow writer queue cannot stall
        state access for the rest of the group.
        """
        async with self._state_lock:
            items = list(self._join_catchup.items())
        for player_id, state in items:
            async with state.write_lock:
                # Read current state under lock.
                async with self._state_lock:
                    current = self._join_catchup.get(player_id)
                    # Entry vanished or was restarted with a new processor
                    # while we iterated the snapshot: nothing to feed.
                    if current is None or current.processor is not state.processor:
                        continue
                    previous_end_us = current.fed_until_us
                    first_history_start_us = current.first_history_start_us
                # Initialize first_history_start_us if this is the first chunk.
                if first_history_start_us is None:
                    first_history_start_us = hist_chunk.start_time_us
                    previous_end_us = first_history_start_us
                # Fill timeline gaps with silence.
                if previous_end_us is not None and hist_chunk.start_time_us > previous_end_us:
                    gap_us = hist_chunk.start_time_us - previous_end_us
                    silence = self._silence_for_duration_us(gap_us)
                    if silence:
                        await self._enqueue_join_pcm(state, silence)
                await self._enqueue_join_pcm(state, hist_chunk.pcm)
                # Write updated state back under lock.
                new_end_us = hist_chunk.start_time_us + hist_chunk.duration_us
                async with self._state_lock:
                    current = self._join_catchup.get(player_id)
                    # Only persist progress if the entry we fed is still live.
                    if current is not None and current.processor is state.processor:
                        if current.first_history_start_us is None:
                            current.first_history_start_us = first_history_start_us
                        if current.fed_until_us is None:
                            current.fed_until_us = first_history_start_us
                        current.fed_until_us = new_end_us
                        current.history_end_us = new_end_us
885
886 async def _enqueue_join_pcm(
887 self,
888 state: _JoinCatchupState,
889 pcm: bytes,
890 ) -> None:
891 """Enqueue PCM into a joining member writer queue.
892
893 Bails out immediately if the writer task is dead to avoid blocking
894 the commit loop on a queue with no consumer.
895 """
896 if state.writer_task.done():
897 return
898 try:
899 state.input_queue.put_nowait(pcm)
900 except asyncio.QueueFull:
901 if state.writer_task.done():
902 return
903 await state.input_queue.put(pcm)
904
905 # -- Member pipeline management --------------------------------------------
906
907 async def _refresh_member_mappings(self) -> None:
908 """Re-evaluate per-member channel mapping and DSP requirements."""
909 async with self._state_lock:
910 if not self._mapping_dirty:
911 return
912 member_ids = tuple(self._members)
913 self._mapping_dirty = False
914 for member_id in member_ids:
915 await self._sync_member_pipeline(member_id)
916
    async def _sync_member_pipeline(self, player_id: str) -> _MemberPipeline:
        """Create/update pipeline state for one member from current MA config.

        Compares the cached config signature against the stored pipeline and,
        when it changed, rebuilds the pipeline under the state lock: picks a
        channel (dedicated when transforms are required, MAIN_CHANNEL
        otherwise), creates a new ffmpeg processor if needed, then starts the
        new / closes the old processor outside the lock.

        Raises:
            RuntimeError: if the member DSP ffmpeg fails to start.
        """
        config = self._get_pipeline_config_cached(player_id)
        # Processors to start (new) / close (superseded) once the lock drops.
        release_processor: _BufferedFfmpegProcessor | None = None
        start_processor: _BufferedFfmpegProcessor | None = None
        async with self._state_lock:
            current = self._member_pipelines.get(player_id)
            # Fast path: config unchanged, reuse the existing pipeline.
            if current is not None and current.config.signature == config.signature:
                return current
            if current and current.config.requires_transform:
                # Existing DSP pipeline: keep its channel if still transforming,
                # otherwise fall back to the shared main channel; the old
                # processor is released after the new state is published.
                channel_id = current.channel_id if config.requires_transform else MAIN_CHANNEL
                release_processor = current.processor
            elif config.requires_transform:
                channel_id = self._get_or_create_preassigned_channel(player_id)
            else:
                channel_id = MAIN_CHANNEL
                self._preassigned_channels.pop(player_id, None)
            processor: _BufferedFfmpegProcessor | None = None
            if config.requires_transform:
                ffmpeg_obj = self._create_member_ffmpeg(config.filter_params)
                processor = _BufferedFfmpegProcessor(ffmpeg_obj, _PCM_FORMAT)
                start_processor = processor
            pipeline = _MemberPipeline(
                player_id=player_id,
                channel_id=channel_id,
                config=config,
                processor=processor,
            )
            self._member_pipelines[player_id] = pipeline
        # Start the new ffmpeg outside the lock; roll the map back on failure.
        if start_processor is not None:
            try:
                await start_processor.start()
            except Exception as err:
                async with self._state_lock:
                    # Only remove the entry if it is still ours — a concurrent
                    # sync may have replaced it already.
                    if (
                        self._member_pipelines.get(player_id) is not None
                        and self._member_pipelines[player_id].processor is start_processor
                    ):
                        self._member_pipelines.pop(player_id, None)
                with suppress(Exception):
                    await self._close_member_ffmpeg(start_processor)
                raise RuntimeError(f"Failed to start member DSP ffmpeg for {player_id}") from err
        # Close the superseded processor outside the lock.
        if release_processor is not None:
            await self._close_member_ffmpeg(release_processor)
        return pipeline
962
963 def _get_pipeline_config_cached(
964 self,
965 player_id: str,
966 *,
967 force_refresh: bool = False,
968 ) -> _PipelineConfig:
969 """Return cached pipeline config for a player, calculating on cache miss."""
970 if not force_refresh and (cached := self._pipeline_config_cache.get(player_id)) is not None:
971 return cached
972 config = self._read_pipeline_config(player_id)
973 self._pipeline_config_cache[player_id] = config
974 return config
975
976 def _read_pipeline_config(self, player_id: str) -> _PipelineConfig:
977 """Read MA config and determine if member needs a dedicated DSP channel."""
978 dsp_config = self.player.mass.config.get_player_dsp_config(player_id)
979 dsp_enabled = bool(dsp_config.enabled)
980 raw_output_channels = self.player.mass.config.get_raw_player_config_value(
981 player_id,
982 CONF_OUTPUT_CHANNELS,
983 "stereo",
984 )
985 output_channels = str(raw_output_channels or "stereo").strip().lower()
986 if output_channels not in {"stereo", "left", "right", "mono"}:
987 output_channels = "stereo"
988 try:
989 filter_params = tuple(
990 get_player_filter_params(
991 self.player.mass,
992 player_id,
993 _PCM_FORMAT,
994 _PCM_FORMAT,
995 )
996 )
997 except Exception:
998 filter_params = ()
999 custom_filter_graph = any(
1000 param.strip() and not param.strip().startswith("alimiter=") for param in filter_params
1001 )
1002 requires_transform = dsp_enabled or output_channels != "stereo" or custom_filter_graph
1003 return _PipelineConfig(
1004 requires_transform=requires_transform,
1005 output_channels=output_channels,
1006 filter_params=filter_params,
1007 )
1008
1009 def _get_or_create_preassigned_channel(self, player_id: str) -> UUID:
1010 """Return stable dedicated channel id for transform-required player."""
1011 if (channel_id := self._preassigned_channels.get(player_id)) is not None:
1012 return channel_id
1013 channel_id = uuid4()
1014 self._preassigned_channels[player_id] = channel_id
1015 return channel_id
1016
1017 # -- FFmpeg lifecycle ------------------------------------------------------
1018
1019 def _create_member_ffmpeg(self, filter_params: tuple[str, ...]) -> FFMpeg:
1020 """Create per-member FFMpeg for DSP pipeline."""
1021 return FFMpeg(
1022 audio_input="-",
1023 input_format=_PCM_FORMAT,
1024 output_format=_PCM_FORMAT,
1025 filter_params=list(filter_params),
1026 )
1027
1028 async def _transform_member_chunk(self, pipeline: _MemberPipeline, chunk: bytes) -> None:
1029 """Push one PCM chunk into a member DSP pipeline."""
1030 processor = pipeline.processor
1031 if processor is None:
1032 return
1033 await processor.push(chunk)
1034
1035 async def _read_member_chunk(
1036 self,
1037 pipeline: _MemberPipeline,
1038 duration_us: int,
1039 ) -> bytes | None:
1040 """Read one transformed chunk from a member DSP pipeline."""
1041 processor = pipeline.processor
1042 if processor is None or duration_us <= 0:
1043 return b""
1044 transformed = await processor.read_duration_us(duration_us)
1045 if not transformed:
1046 return None
1047 pipeline.ready = True
1048 return bytes(transformed)
1049
1050 async def _close_member_ffmpeg(self, processor: _BufferedFfmpegProcessor) -> None:
1051 """Close an ffmpeg processor, suppressing errors."""
1052 with suppress(Exception):
1053 await processor.close()
1054
1055 async def _clear_member_pipelines(self) -> None:
1056 """Release all member pipeline resources."""
1057 async with self._state_lock:
1058 pipelines = list(self._member_pipelines.values())
1059 self._member_pipelines.clear()
1060 for pipeline in pipelines:
1061 if pipeline.processor is not None:
1062 await self._close_member_ffmpeg(pipeline.processor)
1063
1064 # -- Push stream -----------------------------------------------------------
1065
1066 def _create_push_stream(self) -> PushStream:
1067 """Create PushStream with channel resolver for per-member routing."""
1068 return self.player.api.group.start_stream(channel_resolver=self._resolve_channel_for_player)
1069
1070 def _stop_push_stream(self) -> None:
1071 """Stop the active PushStream."""
1072 self.player.api.group.stop_stream()
1073
1074 def _resolve_channel_for_player(self, player_id: str) -> UUID:
1075 """Channel resolver callback for per-player routing."""
1076 pipeline = self._member_pipelines.get(player_id)
1077 if pipeline is not None:
1078 return pipeline.channel_id
1079 # The leader always receives MAIN_CHANNEL audio directly from the
1080 # commit loop; only group members get per-player DSP channels.
1081 if player_id == self.player.player_id:
1082 return MAIN_CHANNEL
1083 # Force a fresh config read for pending/unknown joiners so the very
1084 # first resolution (triggered by add_client) uses up-to-date DSP settings.
1085 force = player_id not in self._members
1086 config = self._get_pipeline_config_cached(player_id, force_refresh=force)
1087 if not config.requires_transform:
1088 return MAIN_CHANNEL
1089 return self._get_or_create_preassigned_channel(player_id)
1090
1091 # -- History ---------------------------------------------------------------
1092
1093 def _prune_history_locked(self, now_monotonic_us: int) -> None:
1094 """Drop old history chunks that are fully in the past."""
1095 if self._timeline_start_us is None or self._first_commit_monotonic_us is None:
1096 return
1097 elapsed_real_us = max(0, now_monotonic_us - self._first_commit_monotonic_us)
1098 source_now_us = self._timeline_start_us + elapsed_real_us
1099 cutoff_us = source_now_us - _HISTORY_KEEP_PAST_US
1100 while self._history and (
1101 self._history[0].start_time_us + self._history[0].duration_us <= cutoff_us
1102 ):
1103 self._history.popleft()
1104
1105 # -- PCM utilities ---------------------------------------------------------
1106
1107 @staticmethod
1108 def _duration_us(audio: bytes, audio_format: AudioFormat) -> int:
1109 """Compute chunk duration from PCM payload size."""
1110 bytes_per_sample = max(1, int(audio_format.bit_depth // 8))
1111 bytes_per_second = (
1112 int(audio_format.sample_rate) * bytes_per_sample * int(audio_format.channels)
1113 )
1114 if bytes_per_second <= 0:
1115 return 0
1116 return int((len(audio) / bytes_per_second) * 1_000_000)
1117
1118 @staticmethod
1119 def _iter_pcm_slices(
1120 audio: bytes, audio_format: AudioFormat, target_duration_us: int
1121 ) -> Iterator[bytes]:
1122 """Yield frame-aligned PCM slices up to target duration."""
1123 if not audio:
1124 return
1125 bytes_per_sample = max(1, int(audio_format.bit_depth // 8))
1126 frame_size = bytes_per_sample * int(audio_format.channels)
1127 if frame_size <= 0:
1128 yield audio
1129 return
1130 samples_per_slice = max(
1131 1, round((target_duration_us / 1_000_000) * int(audio_format.sample_rate))
1132 )
1133 slice_size = max(frame_size, samples_per_slice * frame_size)
1134 offset = 0
1135 audio_len = len(audio)
1136 while offset < audio_len:
1137 end = min(audio_len, offset + slice_size)
1138 if end < audio_len:
1139 aligned_end = end - (end % frame_size)
1140 if aligned_end <= offset:
1141 aligned_end = min(audio_len, offset + frame_size)
1142 end = aligned_end
1143 yield audio[offset:end]
1144 offset = end
1145
1146 @staticmethod
1147 def _silence_for_duration_us(duration_us: int) -> bytes:
1148 """Generate silent PCM with frame-aligned duration for the default format."""
1149 if duration_us <= 0:
1150 return b""
1151 bytes_per_sample = max(1, int(_PCM_FORMAT.bit_depth // 8))
1152 frame_size = bytes_per_sample * int(_PCM_FORMAT.channels)
1153 samples = max(0, round((duration_us / 1_000_000) * int(_PCM_FORMAT.sample_rate)))
1154 return b"\x00" * (samples * frame_size)
1155