/
/
/
1"""Helper for adding buffering to async (audio) generators."""
2
3from __future__ import annotations
4
5import asyncio
6import contextlib
7from collections.abc import AsyncGenerator, Callable
8from functools import wraps
9from typing import Any, Final, ParamSpec
10
11from music_assistant.helpers.util import close_async_generator, empty_queue
12
13# Type variables for the buffered decorator
14_P = ParamSpec("_P")
15
16DEFAULT_BUFFER_SIZE: Final = 30
17DEFAULT_MIN_BUFFER_BEFORE_YIELD: Final = 5
18
19# Keep strong references to producer tasks to prevent garbage collection
20# The event loop only keeps weak references to tasks
21_ACTIVE_PRODUCER_TASKS: set[asyncio.Task[Any]] = set()
22
23
async def buffered(
    generator: AsyncGenerator[bytes, None],
    buffer_size: int = DEFAULT_BUFFER_SIZE,
    min_buffer_before_yield: int = DEFAULT_MIN_BUFFER_BEFORE_YIELD,
) -> AsyncGenerator[bytes, None]:
    """
    Add buffering to an async generator that yields (chunks of) bytes.

    This function uses an asyncio.Queue to decouple the producer (reading from the stream)
    from the consumer (yielding to the client). The producer runs in a separate task and
    fills the buffer, while the consumer yields from the buffer.

    Args:
        generator: The async generator to buffer
        buffer_size: Maximum number of chunks to buffer (default: 30)
        min_buffer_before_yield: Minimum chunks to buffer before starting to yield (default: 5)

    Example:
        async for chunk in buffered(my_generator(), buffer_size=100):
            process(chunk)
    """
    if buffer_size <= 1:
        # No buffering needed, yield directly.
        # Still guarantee cleanup of the wrapped generator: if the consumer
        # stops early (GeneratorExit at our yield), the inner generator would
        # otherwise be left suspended until GC.
        try:
            async for chunk in generator:
                yield chunk
        finally:
            # No-op if the generator already ran to completion.
            await close_async_generator(generator)
        return

    buffer: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=buffer_size)
    producer_error: Exception | None = None
    threshold_reached = asyncio.Event()
    cancelled = asyncio.Event()

    async def producer() -> None:
        """Read from the original generator and fill the buffer.

        Note: When the buffer is full, buffer.put() will naturally wait for the consumer
        to drain items. This is the intended buffering behavior and may trigger asyncio
        "slow callback" warnings (typically 0.1-0.2s) which are harmless and expected.
        These warnings are filtered out in the main logging configuration.
        """
        nonlocal producer_error
        # True only when the wrapped generator ran to full exhaustion.
        # On any early exit (consumer cancelled, error while suspended in put)
        # the generator may still be suspended mid-stream and must be closed.
        exhausted = False
        try:
            async for chunk in generator:
                if cancelled.is_set():
                    # Consumer has stopped, exit cleanly
                    break
                await buffer.put(chunk)
                if not threshold_reached.is_set() and buffer.qsize() >= min_buffer_before_yield:
                    threshold_reached.set()
                # Yield to event loop every chunk to prevent blocking
                await asyncio.sleep(0)
            else:
                # Loop finished without break: generator fully consumed.
                exhausted = True
        except Exception as err:
            # Store the error so the consumer can re-raise it at end-of-stream.
            # NOTE: asyncio.CancelledError derives from BaseException (Python 3.8+)
            # and is intentionally NOT caught here; it runs the finally block and
            # then propagates out of the task.
            producer_error = err
        finally:
            # Always unblock the consumer's initial threshold wait.
            threshold_reached.set()
            # Close the wrapped generator unless it was fully exhausted
            # (closing an already-finished generator is a harmless no-op).
            if not exhausted:
                await close_async_generator(generator)
            # Signal end of stream by putting None
            # We must wait for space in the queue if needed, otherwise the consumer may
            # hang waiting for data that will never come
            if not cancelled.is_set():
                await buffer.put(None)

    # Start the producer task
    loop = asyncio.get_running_loop()
    producer_task = loop.create_task(producer())

    # Keep a strong reference to prevent garbage collection issues
    # The event loop only keeps weak references to tasks
    _ACTIVE_PRODUCER_TASKS.add(producer_task)

    # Remove from set when done
    producer_task.add_done_callback(_ACTIVE_PRODUCER_TASKS.discard)

    try:
        # Wait for initial buffer to fill (or for the producer to finish/fail,
        # which also sets the event in its finally block)
        await threshold_reached.wait()

        # Consume from buffer and yield
        while True:
            data = await buffer.get()
            if data is None:
                # End of stream: surface any producer failure to the caller
                if producer_error:
                    raise producer_error
                break
            yield data

    finally:
        # Signal the producer to stop
        cancelled.set()
        # Drain the queue to unblock the producer if it's waiting on put()
        empty_queue(buffer)
        # Wait for the producer to finish cleanly with a timeout to prevent blocking;
        # shield() ensures a timeout cancels only the wait, not the producer itself
        with contextlib.suppress(asyncio.CancelledError, RuntimeError, asyncio.TimeoutError):
            await asyncio.wait_for(asyncio.shield(producer_task), timeout=1.0)
125
126
def use_buffer(
    buffer_size: int = DEFAULT_BUFFER_SIZE,
    min_buffer_before_yield: int = DEFAULT_MIN_BUFFER_BEFORE_YIELD,
) -> Callable[
    [Callable[_P, AsyncGenerator[bytes, None]]],
    Callable[_P, AsyncGenerator[bytes, None]],
]:
    """
    Add buffering to async generator functions that yield bytes (decorator).

    This decorator uses an asyncio.Queue to decouple the producer (reading from the stream)
    from the consumer (yielding to the client). The producer runs in a separate task and
    fills the buffer, while the consumer yields from the buffer.

    Args:
        buffer_size: Maximum number of chunks to buffer (default: 30)
        min_buffer_before_yield: Minimum chunks to buffer before starting to yield (default: 5)

    Example:
        @use_buffer(buffer_size=100)
        async def my_stream() -> AsyncGenerator[bytes, None]:
            ...
    """

    def _decorate(
        wrapped: Callable[_P, AsyncGenerator[bytes, None]],
    ) -> Callable[_P, AsyncGenerator[bytes, None]]:
        # Replace the wrapped generator function with one that streams
        # through buffered(), forwarding all call arguments unchanged.
        @wraps(wrapped)
        async def _buffered_stream(
            *args: _P.args, **kwargs: _P.kwargs
        ) -> AsyncGenerator[bytes, None]:
            stream = buffered(
                wrapped(*args, **kwargs),
                buffer_size=buffer_size,
                min_buffer_before_yield=min_buffer_before_yield,
            )
            async for part in stream:
                yield part

        return _buffered_stream

    return _decorate
166