1"""Playback session coordinator for Sendspin players."""
2
3from __future__ import annotations
4
5import asyncio
6import time
7from collections import deque
8from collections.abc import Iterator
9from contextlib import suppress
10from dataclasses import dataclass, field
11from typing import TYPE_CHECKING, Any
12from uuid import UUID, uuid4
13
14from aiosendspin.server.audio import AudioFormat as SendspinAudioFormat
15from aiosendspin.server.push_stream import MAIN_CHANNEL, PushStream
16from music_assistant_models.enums import ContentType
17from music_assistant_models.media_items.audio_format import AudioFormat
18
19from music_assistant.constants import CONF_OUTPUT_CHANNELS
20from music_assistant.helpers.audio import get_player_filter_params
21from music_assistant.helpers.ffmpeg import FFMpeg
22from music_assistant.models.player import PlayerMedia
23
24if TYPE_CHECKING:
25 from .player import SendspinPlayer
26
27
# Same sample format expressed in both MA and Sendspin type systems.
_PCM_FORMAT = AudioFormat(
    content_type=ContentType.PCM_S32LE,
    sample_rate=48000,
    bit_depth=32,
    channels=2,
)
_SENDSPIN_PCM_FORMAT = SendspinAudioFormat(
    sample_rate=48000,
    bit_depth=32,
    channels=2,
)
# Max PCM slice fed to the producer per iteration (100 ms).
_PRODUCER_SLICE_US = 100_000
# Max pending chunks between producer and committer before the producer blocks.
_PRODUCER_BACKLOG_SIZE = 64
# Backpressure threshold: push stream sleeps when buffered audio exceeds this (30 s).
_PRODUCER_BUFFER_LIMIT_US = 30_000_000
# Start join promotion once catchup processor lag is within this window of the history tail.
_JOIN_PROMOTE_ARM_WINDOW_US = 2_000_000
# Accept catchup output within this margin of the promotion target.
_JOIN_PROMOTE_TOLERANCE_US = 50_000
# Abort join catchup if promotion hasn't completed within this.
_JOIN_PROMOTION_TIMEOUT_S = 15.0
# Retain committed history this far behind real-time for late-join backfill.
# This pre-history also warms up ffmpeg's internal filter buffers so the DSP
# output has settled by the time the member's channel goes live.
_HISTORY_KEEP_PAST_US = 1_000_000
56
57
class _BufferedFfmpegProcessor:
    """FFmpeg wrapper with small output carry-over buffer and duration-based reads."""

    def __init__(self, ffmpeg: FFMpeg, audio_format: AudioFormat) -> None:
        """Wrap `ffmpeg` and precompute byte/duration conversion factors for `audio_format`."""
        self._ffmpeg = ffmpeg
        # Bytes read from ffmpeg stdout but not yet handed to a caller.
        self._output_buffer = bytearray()
        bytes_per_sample = max(1, int(audio_format.bit_depth // 8))
        self._sample_rate = int(audio_format.sample_rate)
        # Bytes per interleaved frame (one sample for every channel).
        self._frame_size = bytes_per_sample * int(audio_format.channels)
        self._bytes_per_second = self._sample_rate * self._frame_size
        # ~25ms worth of audio per read syscall.
        self._read_quantum_bytes = max(1, int(self._bytes_per_second * 0.025))
        self._produced_output_us = 0

    async def start(self) -> None:
        """Start the underlying ffmpeg process."""
        await self._ffmpeg.start()

    async def close(self) -> None:
        """Close the underlying ffmpeg process."""
        await self._ffmpeg.close()

    async def push(self, pcm: bytes) -> None:
        """Write raw PCM into ffmpeg's stdin."""
        await self._ffmpeg.write(pcm)

    @property
    def produced_output_us(self) -> int:
        """Return cumulative output duration currently drained from ffmpeg.

        NOTE(review): only the drain_* methods advance this counter;
        read_duration_us pulls from ffmpeg directly without counting --
        confirm callers never rely on this value after mixing both paths
        on the same instance.
        """
        return self._produced_output_us

    async def read_duration_us(self, duration_us: int) -> bytes:
        """Block-read exactly `duration_us` worth of processed PCM from ffmpeg."""
        target_bytes = self._target_bytes_for_duration_us(duration_us)
        if target_bytes == 0:
            return b""

        while len(self._output_buffer) < target_bytes:
            missing = target_bytes - len(self._output_buffer)
            # Read at least one full quantum to amortize syscalls; any surplus
            # stays in the carry-over buffer for the next call.
            read_size = max(self._read_quantum_bytes, missing)
            chunk = await self._ffmpeg.readexactly(read_size)
            self._output_buffer.extend(chunk)

        out = bytes(self._output_buffer[:target_bytes])
        del self._output_buffer[:target_bytes]
        return out

    async def drain_available(self) -> int:
        """Non-blocking drain of ffmpeg stdout into internal buffer.

        Returns cumulative produced output duration in microseconds.
        """
        while True:
            try:
                # 1ms timeout: non-blocking check for available data.
                chunk = await asyncio.wait_for(
                    self._ffmpeg.read(self._read_quantum_bytes),
                    timeout=0.001,
                )
            except TimeoutError:
                break
            if not chunk:
                # EOF (or no data) from ffmpeg.
                break
            self._output_buffer.extend(chunk)
            self._produced_output_us += self._duration_us_for_bytes(len(chunk))
            if len(chunk) < self._read_quantum_bytes:
                # Short read: pipe is momentarily empty, stop polling.
                break
        return self._produced_output_us

    async def drain_forever(self) -> None:
        """Continuously drain ffmpeg stdout into internal buffer until EOF."""
        while True:
            chunk = await self._ffmpeg.read(self._read_quantum_bytes)
            if not chunk:
                break
            self._output_buffer.extend(chunk)
            self._produced_output_us += self._duration_us_for_bytes(len(chunk))

    def pop_duration_us(self, duration_us: int) -> bytes | None:
        """Pop exactly `duration_us` from already buffered output, or None if insufficient."""
        target_bytes = self._target_bytes_for_duration_us(duration_us)
        if target_bytes == 0:
            return b""
        if len(self._output_buffer) < target_bytes:
            return None
        out = bytes(self._output_buffer[:target_bytes])
        del self._output_buffer[:target_bytes]
        return out

    def buffered_duration_us(self) -> int:
        """Return buffered output duration currently available for immediate pop."""
        return self._duration_us_for_bytes(len(self._output_buffer))

    def pop_duration_us_or_pad(self, duration_us: int, pad_tolerance_us: int) -> bytes | None:
        """Pop target duration; if short within tolerance, pad tail with silence."""
        target_bytes = self._target_bytes_for_duration_us(duration_us)
        if target_bytes == 0:
            return b""
        available = len(self._output_buffer)
        if available >= target_bytes:
            out = bytes(self._output_buffer[:target_bytes])
            del self._output_buffer[:target_bytes]
            return out
        short_bytes = target_bytes - available
        short_us = self._duration_us_for_bytes(short_bytes)
        if short_us > max(0, pad_tolerance_us):
            # Too far short of the target to pad honestly.
            return None
        out = bytes(self._output_buffer)
        self._output_buffer.clear()
        # Zero bytes are silence in signed PCM.
        return out + (b"\x00" * short_bytes)

    def _duration_us_for_bytes(self, byte_count: int) -> int:
        """Convert a byte count to whole-frame duration in microseconds (truncating)."""
        if byte_count <= 0 or self._sample_rate <= 0 or self._frame_size <= 0:
            return 0
        frames = byte_count // self._frame_size
        if frames <= 0:
            return 0
        return int((frames * 1_000_000) / self._sample_rate)

    def _target_bytes_for_duration_us(self, duration_us: int) -> int:
        """Convert duration to frame-aligned PCM byte count."""
        if duration_us <= 0 or self._sample_rate <= 0 or self._frame_size <= 0:
            return 0
        # +500_000 rounds to the nearest sample instead of truncating.
        samples = max(0, int((duration_us * self._sample_rate + 500_000) / 1_000_000))
        return samples * self._frame_size
180
181
@dataclass(slots=True)
class _HistoryChunk:
    """Committed PCM chunk retained on the timeline for late-join backfill."""

    # Push-stream timeline position (microseconds) where the chunk was committed.
    start_time_us: int
    # Chunk length in microseconds.
    duration_us: int
    # Raw (untransformed) PCM payload.
    pcm: bytes
187
188
@dataclass(slots=True)
class _PendingChunk:
    """PCM slice queued between the producer and the committer."""

    # Raw PCM payload of the slice.
    pcm: bytes
    # Slice length in microseconds.
    duration_us: int
193
194
@dataclass(slots=True)
class _JoinCatchupState:
    """Per-member state for a join-catchup processor replaying history through DSP.

    The processor is fed historical + live PCM via ``input_queue``. Once its
    output catches up to the live stream (within tolerance), it is promoted to
    the member's live pipeline. See ``_inject_ready_join_historical`` for the
    full promotion lifecycle.
    """

    # Dedicated ffmpeg processor replaying history through this member's DSP.
    processor: _BufferedFfmpegProcessor
    # PCM feed into the processor; a None sentinel terminates the writer.
    input_queue: asyncio.Queue[bytes | None]
    # Drains input_queue into ffmpeg stdin.
    writer_task: asyncio.Task[None]
    # Continuously drains ffmpeg stdout so the pipe never stalls.
    drainer_task: asyncio.Task[None]
    # Background task feeding the history snapshot (set after registration).
    snapshot_task: asyncio.Task[None] | None = None
    # Timeline position of the first history chunk fed into the processor.
    first_history_start_us: int | None = None
    # Timeline position up to which PCM has been enqueued into the processor.
    fed_until_us: int | None = None
    # End of the history snapshot taken when catchup started.
    history_end_us: int | None = None
    # Locked target: once set, promotion fires when output reaches this point.
    promotion_target_end_us: int | None = None
    # Monotonic time when promotion was armed, used for timeout detection.
    promotion_armed_monotonic_s: float | None = None
    # Serializes enqueues between the history feed task and the live commit
    # fan-out so PCM reaches the processor in timeline order.
    write_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
221
222
@dataclass(slots=True)
class _PipelineConfig:
    """Resolved per-member DSP/output configuration."""

    # True when the member needs a dedicated ffmpeg transform channel;
    # False means it can consume raw PCM from MAIN_CHANNEL.
    requires_transform: bool
    # Output channel routing mode for this member.
    output_channels: str
    # ffmpeg filter parameters describing the member's DSP chain.
    filter_params: tuple[str, ...]

    @property
    def signature(self) -> tuple[bool, str, tuple[str, ...]]:
        """Return a tuple identity for cheap config equality comparison."""
        return (self.requires_transform, self.output_channels, self.filter_params)
232
233
@dataclass(slots=True)
class _MemberPipeline:
    """Live playback pipeline state for a single group member."""

    player_id: str
    # Push-stream channel this member's audio is delivered on.
    channel_id: UUID
    # Resolved DSP/output configuration for the member.
    config: _PipelineConfig
    # Live DSP processor; None for members sharing raw MAIN_CHANNEL audio.
    processor: _BufferedFfmpegProcessor | None = None
    ready: bool = False
241
242
243class SendspinPlaybackSession:
244 """Coordinates playback for a Sendspin player group leader.
245
246 The push stream supports multi-channel audio: members that need per-player
247 DSP (EQ, channel mixing, output routing) each get a dedicated ffmpeg
248 processor and a separate channel. Members without DSP share MAIN_CHANNEL
249 and receive the raw PCM directly.
250
251 Playback runs as two concurrent coroutines inside ``_run_playback``:
252
253 * **Producer** -- reads PCM from the MA stream, slices it into fixed-size
254 chunks, queues them, and writes each slice into per-member ffmpeg
255 processors (transform push) in parallel.
256 * **Consumer** -- dequeues chunks, reads the corresponding transformed
257 output from each processor (transform read), prepares all channels on
258 the push stream, commits audio, and applies backpressure via
259 ``sleep_to_limit_buffer``.
260
261 When a new member joins mid-playback, a *join-catchup* processor replays
262 committed history through the member's DSP chain so it can be promoted
263 to the live pipeline without an audible gap.
264 """
265
    def __init__(self, player: SendspinPlayer) -> None:
        """Initialize session coordinator bound to the owning player."""
        self.player = player
        # Background task running _run_playback for the active media, if any.
        self.playback_task: asyncio.Task[None] | None = None
        # Members whose join (catchup start) is still in flight.
        self.pending_join_members: set[str] = set()
        # Guards all mutable session state below.
        self._state_lock = asyncio.Lock()
        self._members: set[str] = set()
        self._member_pipelines: dict[str, _MemberPipeline] = {}
        self._push_stream: PushStream | None = None
        self._playback_running = False
        # Push-stream timestamp of the first committed chunk.
        self._timeline_start_us: int | None = None
        # Monotonic timestamp (us) recorded at the first commit.
        self._first_commit_monotonic_us: int | None = None
        # Total committed audio duration this session.
        self._produced_audio_us = 0
        # Committed chunks kept for late-join backfill (pruned each commit).
        self._history: deque[_HistoryChunk] = deque()
        # Active join-catchup state keyed by joining player id.
        self._join_catchup: dict[str, _JoinCatchupState] = {}
        self._pipeline_config_cache: dict[str, _PipelineConfig] = {}
        # Channel UUIDs reserved per member before/for a join cycle.
        self._preassigned_channels: dict[str, UUID] = {}
        # Forces a channel-mapping refresh on the next producer iteration.
        self._mapping_dirty = True
284
285 # -- Helpers ---------------------------------------------------------------
286
287 def _attach_task_exception_logger(self, task: asyncio.Task[Any], name: str) -> None:
288 """Log unhandled exception from background task when it finishes."""
289
290 def _done_callback(done_task: asyncio.Task[Any]) -> None:
291 if done_task.cancelled():
292 return
293 with suppress(Exception):
294 exc = done_task.exception()
295 if exc is not None:
296 self.player.logger.exception(
297 "Background task failed: %s",
298 name,
299 exc_info=exc,
300 )
301
302 task.add_done_callback(_done_callback)
303
304 def _get_join_readiness(self) -> tuple[bool, str | None]:
305 """Check whether live join DSP preparation can be performed right now."""
306 if self._playback_running and self._push_stream is not None:
307 return (True, None)
308 return (False, "no active stream context")
309
310 # -- Snapshot helper -------------------------------------------------------
311
312 async def _snapshot_active_pipelines(
313 self,
314 ) -> tuple[set[str], tuple[tuple[str, _MemberPipeline], ...]]:
315 """Return (join_pending_ids, active_pipelines) under lock."""
316 async with self._state_lock:
317 members = self._members
318 return set(self._join_catchup), tuple(
319 (mid, p) for mid, p in self._member_pipelines.items() if mid in members
320 )
321
322 # -- Public API ------------------------------------------------------------
323
324 async def cancel(self, reason: str) -> None:
325 """Cancel and await the active playback task, if any."""
326 task = self.playback_task
327 if task is None:
328 return
329 if task.done():
330 if self.playback_task is task:
331 self.playback_task = None
332 return
333 self.player.logger.debug("Cancelling playback task (%s)", reason)
334 task.cancel()
335 with suppress(asyncio.CancelledError, Exception):
336 await task
337 if self.playback_task is task:
338 self.playback_task = None
339
340 async def start(self, media: PlayerMedia, restart: bool = False) -> None:
341 """Start background playback for `media`."""
342 active_task = self.playback_task
343 if active_task is not None and not active_task.done():
344 if not restart:
345 raise RuntimeError("playback already active")
346 await self.cancel("restart requested")
347 self.playback_task = asyncio.create_task(self._run_playback(media))
348
    async def close(self) -> None:
        """Stop playback and release all managed resources."""
        await self.cancel("session close")
        self.pending_join_members.clear()
        async with self._state_lock:
            self._members.clear()
            self._mapping_dirty = True
        # Release per-member resources outside the lock; _clear_join_catchup
        # takes the state lock itself (and _clear_member_pipelines presumably
        # does too -- TODO confirm).
        await self._clear_member_pipelines()
        await self._clear_join_catchup()
        async with self._state_lock:
            # Reset timeline/history bookkeeping for the next session.
            self._history.clear()
            self._produced_audio_us = 0
            self._timeline_start_us = None
            self._first_commit_monotonic_us = None
            self._pipeline_config_cache.clear()
            self._preassigned_channels.clear()
365
    async def add_member(self, player_id: str) -> None:
        """Add a member to the group with DSP-aware lifecycle handling."""
        async with self._state_lock:
            if player_id in self._members:
                # Already joined; drop any stale pending marker and bail.
                self.pending_join_members.discard(player_id)
                return
            # Force a fresh channel identity for every new join cycle.
            self._preassigned_channels[player_id] = uuid4()
            self.pending_join_members.add(player_id)
        try:
            # Warm up the member's DSP chain from committed history
            # (no-op when playback is idle or no transform is needed).
            await self._start_join_catchup(player_id)
            async with self._state_lock:
                self._members.add(player_id)
                self._mapping_dirty = True
        except Exception:
            # Roll back the reserved channel on a failed join, then re-raise.
            await self._release_player_channel(player_id)
            raise
        finally:
            self.pending_join_members.discard(player_id)
385
386 async def remove_member(self, player_id: str) -> None:
387 """Remove a member from the group and clean up per-member playback state."""
388 self.pending_join_members.discard(player_id)
389 async with self._state_lock:
390 self._members.discard(player_id)
391 self._mapping_dirty = True
392 self._pipeline_config_cache.pop(player_id, None)
393 self._preassigned_channels.pop(player_id, None)
394 await self._stop_join_catchup(player_id)
395 await self._release_player_channel(player_id)
396
397 async def sync_members(self, member_ids: set[str]) -> None:
398 """Reconcile session members to exactly the provided set."""
399 async with self._state_lock:
400 current_members = set(self._members)
401 for player_id in current_members - member_ids:
402 await self.remove_member(player_id)
403 for player_id in member_ids - current_members:
404 await self.add_member(player_id)
405
406 # -- Join catchup ----------------------------------------------------------
407
    async def _start_join_catchup(self, player_id: str) -> None:
        """Start dedicated join catchup processor fed from committed history."""
        async with self._state_lock:
            playback_active = self._playback_running and self._push_stream is not None
        if not playback_active:
            # Nothing to catch up to while playback is idle.
            return

        pipeline = await self._sync_member_pipeline(player_id)
        if not pipeline.config.requires_transform:
            # Member consumes raw MAIN_CHANNEL PCM; no DSP warm-up needed.
            return

        # Replace any stale catchup state from a previous join attempt.
        await self._stop_join_catchup(player_id)

        ffmpeg_obj = self._create_member_ffmpeg(pipeline.config.filter_params)
        processor = _BufferedFfmpegProcessor(ffmpeg_obj, _PCM_FORMAT)
        await processor.start()
        # Bounded queue sized to hold the full buffer duration with some headroom.
        queue_size = (_PRODUCER_BUFFER_LIMIT_US // _PRODUCER_SLICE_US) + _PRODUCER_BACKLOG_SIZE
        input_queue: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=queue_size)

        async with self._state_lock:
            history_snapshot = list(self._history)
        if not history_snapshot:
            # No committed audio yet to replay; discard the processor.
            await processor.close()
            return
        history_end_us = history_snapshot[-1].start_time_us + history_snapshot[-1].duration_us

        async def _writer() -> None:
            # Drain the queue into ffmpeg stdin; None sentinel ends the task.
            while True:
                chunk = await input_queue.get()
                if chunk is None:
                    return
                await processor.push(chunk)

        async def _drainer() -> None:
            # Keep pulling ffmpeg stdout so its pipe never blocks the writer.
            await processor.drain_forever()

        writer_task = asyncio.create_task(_writer())
        drainer_task = asyncio.create_task(_drainer())
        self._attach_task_exception_logger(writer_task, f"join_writer:{player_id}")
        self._attach_task_exception_logger(drainer_task, f"join_drainer:{player_id}")

        state = _JoinCatchupState(
            processor=processor,
            input_queue=input_queue,
            writer_task=writer_task,
            drainer_task=drainer_task,
            history_end_us=history_end_us,
        )
        async with self._state_lock:
            self._join_catchup[player_id] = state

        async with self._state_lock:
            current = self._join_catchup.get(player_id)
            if current is not None and current.processor is processor:
                # Only start the history feed if our state is still the one
                # registered (a concurrent stop/restart may have replaced it).
                current.snapshot_task = asyncio.create_task(
                    self._feed_join_history(player_id, processor, history_snapshot)
                )
                self._attach_task_exception_logger(
                    current.snapshot_task, f"join_snapshot:{player_id}"
                )
469
    async def _feed_join_history(
        self,
        player_id: str,
        processor: _BufferedFfmpegProcessor,
        history_snapshot: list[_HistoryChunk],
    ) -> None:
        """Feed historical PCM into a join-catchup processor.

        Runs as a background task; aborts silently if the catchup state has
        been replaced or removed since it was scheduled.
        """
        async with self._state_lock:
            state = self._join_catchup.get(player_id)
        if state is None or state.processor is not processor:
            return
        # Hold write_lock so live fan-out cannot interleave with the snapshot.
        async with state.write_lock:
            first_history_start_us: int | None = None
            previous_end_us: int | None = None
            for hist_chunk in history_snapshot:
                if first_history_start_us is None:
                    # First chunk: publish the feed's starting position.
                    first_history_start_us = hist_chunk.start_time_us
                    async with self._state_lock:
                        current = self._join_catchup.get(player_id)
                        if current is not None and current.processor is processor:
                            current.first_history_start_us = first_history_start_us
                            current.fed_until_us = first_history_start_us
                if previous_end_us is not None and hist_chunk.start_time_us > previous_end_us:
                    # Bridge timeline gaps with silence to keep DSP timing aligned.
                    gap_us = hist_chunk.start_time_us - previous_end_us
                    silence = self._silence_for_duration_us(gap_us)
                    if silence:
                        await self._enqueue_join_pcm(state, silence)
                await self._enqueue_join_pcm(state, hist_chunk.pcm)
                previous_end_us = hist_chunk.start_time_us + hist_chunk.duration_us
            # Publish the final fed position (still guarded against replacement).
            async with self._state_lock:
                current = self._join_catchup.get(player_id)
                if current is not None and current.processor is processor:
                    current.fed_until_us = previous_end_us
503
504 async def _stop_join_catchup(self, player_id: str) -> None:
505 """Stop and remove dedicated join catchup processor for one player."""
506 async with self._state_lock:
507 state = self._join_catchup.pop(player_id, None)
508 if state is None:
509 return
510 if state.snapshot_task is not None:
511 state.snapshot_task.cancel()
512 with suppress(asyncio.CancelledError, Exception):
513 await state.snapshot_task
514 state.writer_task.cancel()
515 with suppress(asyncio.CancelledError, Exception):
516 await state.writer_task
517 state.drainer_task.cancel()
518 with suppress(asyncio.CancelledError, Exception):
519 await state.drainer_task
520 with suppress(Exception):
521 await state.processor.close()
522
523 async def _promote_join_catchup_processor(
524 self,
525 player_id: str,
526 pipeline: _MemberPipeline,
527 target_end_us: int,
528 ) -> None:
529 """Promote join catchup processor to the member's live DSP processor."""
530 old_processor: _BufferedFfmpegProcessor | None = None
531 async with self._state_lock:
532 state = self._join_catchup.pop(player_id, None)
533 if state is None:
534 return
535 old_processor = pipeline.processor
536 pipeline.processor = state.processor
537 if state.snapshot_task is not None:
538 state.snapshot_task.cancel()
539 with suppress(asyncio.CancelledError, Exception):
540 await state.snapshot_task
541 # Let writer flush queued PCM before handoff; cancel if queue is full.
542 try:
543 state.input_queue.put_nowait(None)
544 except asyncio.QueueFull:
545 state.writer_task.cancel()
546 with suppress(asyncio.CancelledError, Exception):
547 await state.writer_task
548 state.drainer_task.cancel()
549 with suppress(asyncio.CancelledError, Exception):
550 await state.drainer_task
551 if old_processor is not None and old_processor is not state.processor:
552 with suppress(Exception):
553 await old_processor.close()
554
555 async def _clear_join_catchup(self) -> None:
556 """Stop and remove all dedicated join catchup processors."""
557 async with self._state_lock:
558 player_ids = list(self._join_catchup.keys())
559 for player_id in player_ids:
560 await self._stop_join_catchup(player_id)
561
562 async def _release_player_channel(self, player_id: str) -> None:
563 """Release per-member channel/DSP state for a removed member."""
564 async with self._state_lock:
565 pipeline = self._member_pipelines.pop(player_id, None)
566 self._preassigned_channels.pop(player_id, None)
567 if pipeline is None or pipeline.processor is None:
568 return
569 await self._close_member_ffmpeg(pipeline.processor)
570
571 # -- Playback pipeline -----------------------------------------------------
572
    async def _run_playback(self, media: PlayerMedia) -> None:  # noqa: PLR0915
        """Run the playback pipeline for a single media session.

        Pulls PCM from the MA stream, feeds main + per-member DSP channels into the
        Sendspin push stream, and commits audio continuously. Supports dynamic group
        membership changes and late-join historical backfill while running.
        """
        push_stream = self._create_push_stream()
        async with self._state_lock:
            # Reset session state for a fresh run.
            self._push_stream = push_stream
            self._playback_running = True
            self._history.clear()
            self._produced_audio_us = 0
            self._timeline_start_us = None
            self._first_commit_monotonic_us = None
            self._mapping_dirty = True
        # Bounded queue between producer (stream reader) and consumer (committer).
        pending_chunks: asyncio.Queue[_PendingChunk | None] = asyncio.Queue(
            maxsize=_PRODUCER_BACKLOG_SIZE
        )
        # Shadow deque mirroring pending_chunks for join-catchup backlog peeking.
        pending_backlog: deque[_PendingChunk] = deque()
        pending_duration_us = 0

        async def _produce_pending_chunks() -> None:
            # Producer: read from the MA stream, slice, queue, and push each
            # slice into every live per-member transform processor.
            nonlocal pending_duration_us
            audio_source = self.player.mass.streams.get_stream(
                media, _PCM_FORMAT, self.player.player_id
            )
            async for chunk in audio_source:
                if not chunk:
                    continue
                for slice_chunk in self._iter_pcm_slices(chunk, _PCM_FORMAT, _PRODUCER_SLICE_US):
                    if not slice_chunk:
                        continue
                    duration_us = self._duration_us(slice_chunk, _PCM_FORMAT)
                    if duration_us <= 0:
                        continue
                    await self._refresh_member_mappings()
                    pending = _PendingChunk(pcm=slice_chunk, duration_us=duration_us)
                    await pending_chunks.put(pending)
                    pending_backlog.append(pending)
                    pending_duration_us += duration_us
                    # Push this slice into every live transform pipeline,
                    # skipping members whose join catchup is still pending.
                    join_pending_ids, pipelines = await self._snapshot_active_pipelines()
                    transform_pipelines: list[_MemberPipeline] = []
                    for member_id, pipeline in pipelines:
                        if not pipeline.config.requires_transform:
                            continue
                        if member_id in join_pending_ids:
                            continue
                        transform_pipelines.append(pipeline)
                    results = await asyncio.gather(
                        *(
                            self._transform_member_chunk(pipeline, slice_chunk)
                            for pipeline in transform_pipelines
                        ),
                        return_exceptions=True,
                    )
                    for pipeline, result in zip(transform_pipelines, results, strict=True):
                        if isinstance(result, BaseException):
                            self.player.logger.warning(
                                "Transform push failed for channel %s: %s",
                                pipeline.channel_id,
                                result,
                            )

        async def _commit_pending_chunks() -> None:
            # Consumer: dequeue, read transformed output, commit to the push
            # stream, and record committed history for late joiners.
            nonlocal pending_duration_us
            while True:
                pending = await pending_chunks.get()
                if pending is None:
                    # Sentinel from the producer: clean shutdown.
                    break
                pending_backlog.popleft()
                pending_duration_us = max(0, pending_duration_us - pending.duration_us)
                # Inject any join-catchup output that became ready before this commit.
                await self._inject_ready_join_historical(push_stream, pending_backlog, pending.pcm)
                push_stream.prepare_audio(
                    pending.pcm, _SENDSPIN_PCM_FORMAT, channel_id=MAIN_CHANNEL
                )
                join_pending_ids, pipelines = await self._snapshot_active_pipelines()
                transform_pipelines: list[_MemberPipeline] = []
                for member_id, pipeline in pipelines:
                    if not pipeline.config.requires_transform:
                        continue
                    if member_id in join_pending_ids:
                        continue
                    transform_pipelines.append(pipeline)
                # Read back the transformed PCM each processor produced for
                # the same duration the producer pushed in.
                transformed_chunks = await asyncio.gather(
                    *(
                        self._read_member_chunk(pipeline, pending.duration_us)
                        for pipeline in transform_pipelines
                    ),
                    return_exceptions=True,
                )
                for pipeline, transformed_chunk in zip(
                    transform_pipelines, transformed_chunks, strict=True
                ):
                    if isinstance(transformed_chunk, BaseException):
                        self.player.logger.warning(
                            "Transform read failed for channel %s: %s",
                            pipeline.channel_id,
                            transformed_chunk,
                        )
                        continue
                    if transformed_chunk is None:
                        continue
                    push_stream.prepare_audio(
                        transformed_chunk,
                        _SENDSPIN_PCM_FORMAT,
                        channel_id=pipeline.channel_id,
                    )
                commit_start_us = await push_stream.commit_audio()
                # Backpressure: wait while the stream's buffer is over the limit.
                await push_stream.sleep_to_limit_buffer(_PRODUCER_BUFFER_LIMIT_US)
                commit_now_us = int(time.monotonic_ns() / 1000)
                committed_history_chunk = _HistoryChunk(
                    start_time_us=int(commit_start_us),
                    duration_us=pending.duration_us,
                    pcm=pending.pcm,
                )
                async with self._state_lock:
                    if self._timeline_start_us is None:
                        self._timeline_start_us = int(commit_start_us)
                    if self._first_commit_monotonic_us is None:
                        self._first_commit_monotonic_us = commit_now_us
                    self._history.append(committed_history_chunk)
                    self._produced_audio_us += pending.duration_us
                    self._prune_history_locked(commit_now_us)
                # Keep active join-catchup processors fed with live commits.
                await self._fanout_history_chunk_to_join_processors(committed_history_chunk)

        commit_task = asyncio.create_task(_commit_pending_chunks())
        self._attach_task_exception_logger(commit_task, "commit_pending_chunks")
        producer_stopped_cleanly = False
        try:
            await _produce_pending_chunks()
            producer_stopped_cleanly = True
        finally:
            if producer_stopped_cleanly and not commit_task.done():
                # Producer finished normally; send a None sentinel so the
                # consumer exits cleanly. The queue may be full, so retry
                # with a deadline before falling back to cancellation.
                sentinel_sent = False
                deadline = time.monotonic() + 1.0
                while not sentinel_sent and not commit_task.done():
                    try:
                        pending_chunks.put_nowait(None)
                        sentinel_sent = True
                    except asyncio.QueueFull:
                        if time.monotonic() >= deadline:
                            break
                        await asyncio.sleep(0.01)
                if sentinel_sent:
                    with suppress(asyncio.CancelledError, Exception):
                        await commit_task
                else:
                    commit_task.cancel()
                    with suppress(asyncio.CancelledError, Exception):
                        await commit_task
            else:
                # Producer failed or was cancelled: hard-stop the consumer.
                commit_task.cancel()
                with suppress(asyncio.CancelledError, Exception):
                    await commit_task
            with suppress(Exception):
                self._stop_push_stream()
            await self._clear_join_catchup()
            await self._clear_member_pipelines()
            async with self._state_lock:
                # Reset session state so a new run starts clean.
                self._push_stream = None
                self._playback_running = False
                self._timeline_start_us = None
                self._first_commit_monotonic_us = None
                self._produced_audio_us = 0
                self._history.clear()
744
745 # -- Join injection --------------------------------------------------------
746
    async def _inject_ready_join_historical(
        self,
        push_stream: PushStream,
        pending_backlog: deque[_PendingChunk],
        current_pcm: bytes,
    ) -> bool:
        """Inject join-catchup historical audio once processor output reaches history end.

        Join promotion lifecycle:
        1. A catchup processor is fed historical PCM and new commits in parallel.
        2. Once the processor's output lag falls within _JOIN_PROMOTE_ARM_WINDOW_US
           of the history tail, promotion is "armed" and a target end timestamp is locked.
        3. Once output reaches the target (within _JOIN_PROMOTE_TOLERANCE_US), the
           catchup processor is promoted to the member's live DSP pipeline.
        4. If promotion doesn't complete within _JOIN_PROMOTION_TIMEOUT_S, it's aborted.

        Returns True if any member's historical audio was injected this call.
        """
        injected_any = False
        async with self._state_lock:
            items = list(self._join_catchup.items())
        for player_id, state in items:
            # Snapshot the drained-output counter before re-checking state.
            produced_output_us = state.processor.produced_output_us
            async with self._state_lock:
                current = self._join_catchup.get(player_id)
                if current is None or current.processor is not state.processor:
                    # State replaced/removed since the snapshot; skip.
                    continue
                first_history_start_us = current.first_history_start_us
                fed_until_us = current.fed_until_us
                history_end_us = current.history_end_us
                promotion_target_end_us = current.promotion_target_end_us
                promotion_armed_monotonic_s = current.promotion_armed_monotonic_s
            if first_history_start_us is None or fed_until_us is None or history_end_us is None:
                # History feed hasn't published its positions yet.
                continue
            # Furthest timeline point whose transformed output is both fed and drained.
            max_ready_end_us = min(
                fed_until_us,
                first_history_start_us + max(0, produced_output_us),
            )
            if promotion_target_end_us is None:
                # Not armed yet: arm once output lag is within the window.
                lag_to_tail_us = history_end_us - max_ready_end_us
                if lag_to_tail_us > _JOIN_PROMOTE_ARM_WINDOW_US:
                    continue
                async with self._state_lock:
                    current = self._join_catchup.get(player_id)
                    if current is None or current.processor is not state.processor:
                        continue
                    if current.promotion_target_end_us is None:
                        current.promotion_target_end_us = history_end_us
                        current.promotion_armed_monotonic_s = time.monotonic()
                    promotion_target_end_us = current.promotion_target_end_us
                    promotion_armed_monotonic_s = current.promotion_armed_monotonic_s
            target_end_us = promotion_target_end_us
            if (
                promotion_armed_monotonic_s is not None
                and time.monotonic() - promotion_armed_monotonic_s > _JOIN_PROMOTION_TIMEOUT_S
            ):
                self.player.logger.error(
                    "Join promotion timed out for %s after %.1fs; dropping join catchup",
                    player_id,
                    _JOIN_PROMOTION_TIMEOUT_S,
                )
                await self._stop_join_catchup(player_id)
                continue
            if max_ready_end_us + _JOIN_PROMOTE_TOLERANCE_US < target_end_us:
                # Output hasn't reached the locked target yet; try next commit.
                continue
            inject_duration_us = target_end_us - first_history_start_us
            transformed_history = state.processor.pop_duration_us_or_pad(
                inject_duration_us, _JOIN_PROMOTE_TOLERANCE_US
            )
            if transformed_history is None:
                continue
            pipeline = await self._sync_member_pipeline(player_id)
            # Split the blob into slices so push_stream can yield between encodes.
            frame_stride = (_SENDSPIN_PCM_FORMAT.bit_depth // 8) * _SENDSPIN_PCM_FORMAT.channels
            slice_bytes = (
                int(_SENDSPIN_PCM_FORMAT.sample_rate * _PRODUCER_SLICE_US / 1_000_000)
                * frame_stride
            )
            for offset in range(0, len(transformed_history), slice_bytes):
                push_stream.prepare_historical_audio(
                    transformed_history[offset : offset + slice_bytes],
                    _SENDSPIN_PCM_FORMAT,
                    channel_id=pipeline.channel_id,
                    # Only the first slice carries the absolute timeline anchor.
                    start_time_us=first_history_start_us if offset == 0 else None,
                )
            # Feed not-yet-committed chunks so there is no gap after promotion.
            await self._prefeed_pending_backlog_for_join(state, current_pcm, pending_backlog)
            await self._promote_join_catchup_processor(player_id, pipeline, target_end_us)
            injected_any = True
        return injected_any
834
835 async def _prefeed_pending_backlog_for_join(
836 self,
837 state: _JoinCatchupState,
838 current_pcm: bytes,
839 pending_backlog: deque[_PendingChunk],
840 ) -> None:
841 """Push current chunk + queued pending chunks into join processor before promotion.
842
843 Between the last committed chunk and the next commit, there may be
844 chunks already queued by the producer that the catchup processor hasn't
845 seen yet. Feeding them now avoids a gap in transformed audio after
846 promotion.
847 """
848 await self._enqueue_join_pcm(state, current_pcm)
849 for item in list(pending_backlog):
850 await self._enqueue_join_pcm(state, item.pcm)
851
    async def _fanout_history_chunk_to_join_processors(self, hist_chunk: _HistoryChunk) -> None:
        """Feed newly committed history chunk into all active join-catchup processors.

        Lock discipline: the catchup map is snapshotted under the state
        lock, then each state's write lock is taken; the state lock is
        re-acquired only briefly inside to read/write shared fields so the
        (potentially blocking) PCM enqueues never run while holding it.
        """
        async with self._state_lock:
            items = list(self._join_catchup.items())
        for player_id, state in items:
            async with state.write_lock:
                # Read current state under lock.
                async with self._state_lock:
                    current = self._join_catchup.get(player_id)
                    # Skip entries torn down or whose processor was swapped
                    # out since the snapshot above was taken.
                    if current is None or current.processor is not state.processor:
                        continue
                    previous_end_us = current.fed_until_us
                    first_history_start_us = current.first_history_start_us
                # Initialize first_history_start_us if this is the first chunk.
                if first_history_start_us is None:
                    first_history_start_us = hist_chunk.start_time_us
                    previous_end_us = first_history_start_us
                # Fill timeline gaps with silence.
                if previous_end_us is not None and hist_chunk.start_time_us > previous_end_us:
                    gap_us = hist_chunk.start_time_us - previous_end_us
                    silence = self._silence_for_duration_us(gap_us)
                    if silence:
                        await self._enqueue_join_pcm(state, silence)
                await self._enqueue_join_pcm(state, hist_chunk.pcm)
                # Write updated state back under lock.
                new_end_us = hist_chunk.start_time_us + hist_chunk.duration_us
                async with self._state_lock:
                    current = self._join_catchup.get(player_id)
                    # Re-check: only update if our processor is still active.
                    if current is not None and current.processor is state.processor:
                        if current.first_history_start_us is None:
                            current.first_history_start_us = first_history_start_us
                        if current.fed_until_us is None:
                            current.fed_until_us = first_history_start_us
                        current.fed_until_us = new_end_us
                        current.history_end_us = new_end_us
887
888 async def _enqueue_join_pcm(
889 self,
890 state: _JoinCatchupState,
891 pcm: bytes,
892 ) -> None:
893 """Enqueue PCM into a joining member writer queue.
894
895 Bails out immediately if the writer task is dead to avoid blocking
896 the commit loop on a queue with no consumer.
897 """
898 if state.writer_task.done():
899 return
900 try:
901 state.input_queue.put_nowait(pcm)
902 except asyncio.QueueFull:
903 if state.writer_task.done():
904 return
905 await state.input_queue.put(pcm)
906
907 # -- Member pipeline management --------------------------------------------
908
909 async def _refresh_member_mappings(self) -> None:
910 """Re-evaluate per-member channel mapping and DSP requirements."""
911 async with self._state_lock:
912 if not self._mapping_dirty:
913 return
914 member_ids = tuple(self._members)
915 self._mapping_dirty = False
916 for member_id in member_ids:
917 await self._sync_member_pipeline(member_id)
918
    async def _sync_member_pipeline(self, player_id: str) -> _MemberPipeline:
        """Create/update pipeline state for one member from current MA config.

        Returns the existing pipeline unchanged when the cached config
        signature matches; otherwise swaps in a replacement pipeline under
        the state lock. Any ffmpeg start/close happens outside the lock so
        other state readers are not blocked on process I/O.

        Raises RuntimeError if the member DSP ffmpeg fails to start.
        """
        config = self._get_pipeline_config_cached(player_id)
        # Processors to close/start once the state lock has been released.
        release_processor: _BufferedFfmpegProcessor | None = None
        start_processor: _BufferedFfmpegProcessor | None = None
        async with self._state_lock:
            current = self._member_pipelines.get(player_id)
            # Fast path: config unchanged, keep the existing pipeline.
            if current is not None and current.config.signature == config.signature:
                return current
            if current and current.config.requires_transform:
                # Replacing an existing transform pipeline: keep its channel
                # if still transforming, else fall back to MAIN_CHANNEL; the
                # old processor is closed after the lock is released.
                channel_id = current.channel_id if config.requires_transform else MAIN_CHANNEL
                release_processor = current.processor
            elif config.requires_transform:
                # Newly requires DSP: route via a stable dedicated channel.
                channel_id = self._get_or_create_preassigned_channel(player_id)
            else:
                # No transform needed: plain MAIN_CHANNEL routing, and any
                # previously reserved dedicated channel is released.
                channel_id = MAIN_CHANNEL
                self._preassigned_channels.pop(player_id, None)
            processor: _BufferedFfmpegProcessor | None = None
            if config.requires_transform:
                ffmpeg_obj = self._create_member_ffmpeg(config.filter_params)
                processor = _BufferedFfmpegProcessor(ffmpeg_obj, _PCM_FORMAT)
                start_processor = processor
            pipeline = _MemberPipeline(
                player_id=player_id,
                channel_id=channel_id,
                config=config,
                processor=processor,
            )
            self._member_pipelines[player_id] = pipeline
        if start_processor is not None:
            try:
                await start_processor.start()
            except Exception as err:
                # Roll back the registration only if our pipeline is still
                # the installed one (a concurrent sync may have replaced it).
                async with self._state_lock:
                    if (
                        self._member_pipelines.get(player_id) is not None
                        and self._member_pipelines[player_id].processor is start_processor
                    ):
                        self._member_pipelines.pop(player_id, None)
                with suppress(Exception):
                    await self._close_member_ffmpeg(start_processor)
                raise RuntimeError(f"Failed to start member DSP ffmpeg for {player_id}") from err
        if release_processor is not None:
            await self._close_member_ffmpeg(release_processor)
        return pipeline
964
965 def _get_pipeline_config_cached(
966 self,
967 player_id: str,
968 *,
969 force_refresh: bool = False,
970 ) -> _PipelineConfig:
971 """Return cached pipeline config for a player, calculating on cache miss."""
972 if not force_refresh and (cached := self._pipeline_config_cache.get(player_id)) is not None:
973 return cached
974 config = self._read_pipeline_config(player_id)
975 self._pipeline_config_cache[player_id] = config
976 return config
977
978 def _read_pipeline_config(self, player_id: str) -> _PipelineConfig:
979 """Read MA config and determine if member needs a dedicated DSP channel."""
980 dsp_config = self.player.mass.config.get_player_dsp_config(player_id)
981 dsp_enabled = bool(dsp_config.enabled)
982 raw_output_channels = self.player.mass.config.get_raw_player_config_value(
983 player_id,
984 CONF_OUTPUT_CHANNELS,
985 "stereo",
986 )
987 output_channels = str(raw_output_channels or "stereo").strip().lower()
988 if output_channels not in {"stereo", "left", "right", "mono"}:
989 output_channels = "stereo"
990 try:
991 filter_params = tuple(
992 get_player_filter_params(
993 self.player.mass,
994 player_id,
995 _PCM_FORMAT,
996 _PCM_FORMAT,
997 )
998 )
999 except Exception:
1000 filter_params = ()
1001 custom_filter_graph = any(
1002 param.strip() and not param.strip().startswith("alimiter=") for param in filter_params
1003 )
1004 requires_transform = dsp_enabled or output_channels != "stereo" or custom_filter_graph
1005 return _PipelineConfig(
1006 requires_transform=requires_transform,
1007 output_channels=output_channels,
1008 filter_params=filter_params,
1009 )
1010
1011 def _get_or_create_preassigned_channel(self, player_id: str) -> UUID:
1012 """Return stable dedicated channel id for transform-required player."""
1013 if (channel_id := self._preassigned_channels.get(player_id)) is not None:
1014 return channel_id
1015 channel_id = uuid4()
1016 self._preassigned_channels[player_id] = channel_id
1017 return channel_id
1018
1019 # -- FFmpeg lifecycle ------------------------------------------------------
1020
1021 def _create_member_ffmpeg(self, filter_params: tuple[str, ...]) -> FFMpeg:
1022 """Create per-member FFMpeg for DSP pipeline."""
1023 return FFMpeg(
1024 audio_input="-",
1025 input_format=_PCM_FORMAT,
1026 output_format=_PCM_FORMAT,
1027 filter_params=list(filter_params),
1028 )
1029
1030 async def _transform_member_chunk(self, pipeline: _MemberPipeline, chunk: bytes) -> None:
1031 """Push one PCM chunk into a member DSP pipeline."""
1032 processor = pipeline.processor
1033 if processor is None:
1034 return
1035 await processor.push(chunk)
1036
1037 async def _read_member_chunk(
1038 self,
1039 pipeline: _MemberPipeline,
1040 duration_us: int,
1041 ) -> bytes | None:
1042 """Read one transformed chunk from a member DSP pipeline."""
1043 processor = pipeline.processor
1044 if processor is None or duration_us <= 0:
1045 return b""
1046 transformed = await processor.read_duration_us(duration_us)
1047 if not transformed:
1048 return None
1049 pipeline.ready = True
1050 return bytes(transformed)
1051
1052 async def _close_member_ffmpeg(self, processor: _BufferedFfmpegProcessor) -> None:
1053 """Close an ffmpeg processor, suppressing errors."""
1054 with suppress(Exception):
1055 await processor.close()
1056
1057 async def _clear_member_pipelines(self) -> None:
1058 """Release all member pipeline resources."""
1059 async with self._state_lock:
1060 pipelines = list(self._member_pipelines.values())
1061 self._member_pipelines.clear()
1062 for pipeline in pipelines:
1063 if pipeline.processor is not None:
1064 await self._close_member_ffmpeg(pipeline.processor)
1065
1066 # -- Push stream -----------------------------------------------------------
1067
1068 def _create_push_stream(self) -> PushStream:
1069 """Create PushStream with channel resolver for per-member routing."""
1070 return self.player.api.group.start_stream(channel_resolver=self._resolve_channel_for_player)
1071
1072 def _stop_push_stream(self) -> None:
1073 """Stop the active PushStream."""
1074 self.player.api.group.stop_stream()
1075
1076 def _resolve_channel_for_player(self, player_id: str) -> UUID:
1077 """Channel resolver callback for per-player routing."""
1078 pipeline = self._member_pipelines.get(player_id)
1079 if pipeline is not None:
1080 return pipeline.channel_id
1081 # The leader always receives MAIN_CHANNEL audio directly from the
1082 # commit loop; only group members get per-player DSP channels.
1083 if player_id == self.player.player_id:
1084 return MAIN_CHANNEL
1085 # Force a fresh config read for pending/unknown joiners so the very
1086 # first resolution (triggered by add_client) uses up-to-date DSP settings.
1087 force = player_id not in self._members
1088 config = self._get_pipeline_config_cached(player_id, force_refresh=force)
1089 if not config.requires_transform:
1090 return MAIN_CHANNEL
1091 return self._get_or_create_preassigned_channel(player_id)
1092
1093 # -- History ---------------------------------------------------------------
1094
1095 def _prune_history_locked(self, now_monotonic_us: int) -> None:
1096 """Drop old history chunks that are fully in the past."""
1097 if self._timeline_start_us is None or self._first_commit_monotonic_us is None:
1098 return
1099 elapsed_real_us = max(0, now_monotonic_us - self._first_commit_monotonic_us)
1100 source_now_us = self._timeline_start_us + elapsed_real_us
1101 cutoff_us = source_now_us - _HISTORY_KEEP_PAST_US
1102 while self._history and (
1103 self._history[0].start_time_us + self._history[0].duration_us <= cutoff_us
1104 ):
1105 self._history.popleft()
1106
1107 # -- PCM utilities ---------------------------------------------------------
1108
1109 @staticmethod
1110 def _duration_us(audio: bytes, audio_format: AudioFormat) -> int:
1111 """Compute chunk duration from PCM payload size."""
1112 bytes_per_sample = max(1, int(audio_format.bit_depth // 8))
1113 bytes_per_second = (
1114 int(audio_format.sample_rate) * bytes_per_sample * int(audio_format.channels)
1115 )
1116 if bytes_per_second <= 0:
1117 return 0
1118 return int((len(audio) / bytes_per_second) * 1_000_000)
1119
1120 @staticmethod
1121 def _iter_pcm_slices(
1122 audio: bytes, audio_format: AudioFormat, target_duration_us: int
1123 ) -> Iterator[bytes]:
1124 """Yield frame-aligned PCM slices up to target duration."""
1125 if not audio:
1126 return
1127 bytes_per_sample = max(1, int(audio_format.bit_depth // 8))
1128 frame_size = bytes_per_sample * int(audio_format.channels)
1129 if frame_size <= 0:
1130 yield audio
1131 return
1132 samples_per_slice = max(
1133 1, round((target_duration_us / 1_000_000) * int(audio_format.sample_rate))
1134 )
1135 slice_size = max(frame_size, samples_per_slice * frame_size)
1136 offset = 0
1137 audio_len = len(audio)
1138 while offset < audio_len:
1139 end = min(audio_len, offset + slice_size)
1140 if end < audio_len:
1141 aligned_end = end - (end % frame_size)
1142 if aligned_end <= offset:
1143 aligned_end = min(audio_len, offset + frame_size)
1144 end = aligned_end
1145 yield audio[offset:end]
1146 offset = end
1147
1148 @staticmethod
1149 def _silence_for_duration_us(duration_us: int) -> bytes:
1150 """Generate silent PCM with frame-aligned duration for the default format."""
1151 if duration_us <= 0:
1152 return b""
1153 bytes_per_sample = max(1, int(_PCM_FORMAT.bit_depth // 8))
1154 frame_size = bytes_per_sample * int(_PCM_FORMAT.channels)
1155 samples = max(0, round((duration_us / 1_000_000) * int(_PCM_FORMAT.sample_rate)))
1156 return b"\x00" * (samples * frame_size)
1157