/
/
/
1"""
2AsyncProcess.
3
4Wrapper around asyncio subprocess to help with using pipe streams and
5taking care of properly closing the process in case of exit (on both success and failures),
6without deadlocking.
7"""
8
9from __future__ import annotations
10
11import asyncio
12import logging
13import os
14
15# if TYPE_CHECKING:
16from collections.abc import AsyncGenerator
17from contextlib import suppress
18from signal import SIGINT
19from types import TracebackType
20from typing import Self
21
22from music_assistant.constants import MASS_LOGGER_NAME, VERBOSE_LOG_LEVEL
23
24LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.helpers.process")
25
26DEFAULT_CHUNKSIZE = 64000
27
28
29def get_subprocess_env(env: dict[str, str] | None = None) -> dict[str, str]:
30 """Get environment for subprocess, stripping LD_PRELOAD to avoid jemalloc warnings."""
31 result = dict(os.environ)
32 result.pop("LD_PRELOAD", None)
33 if env:
34 result.update(env)
35 return result
36
37
class AsyncProcess:
    """
    AsyncProcess.

    Wrapper around asyncio subprocess to help with using pipe streams and
    taking care of properly closing the process in case of exit (on both success and failures),
    without deadlocking.
    """

    # Optional background tasks attached by external helpers;
    # close()/kill() cancel and await them as part of cleanup.
    _stdin_feeder_task: asyncio.Task[None] | None = None  # used for ffmpeg
    _stderr_reader_task: asyncio.Task[None] | None = None  # used for ffmpeg

    def __init__(
        self,
        args: list[str],
        stdin: bool | int | None = None,
        stdout: bool | int | None = None,
        stderr: bool | int | None = False,
        name: str | None = None,
        env: dict[str, str] | None = None,
    ) -> None:
        """Initialize AsyncProcess.

        :param args: Command and arguments to execute.
        :param stdin: Stdin configuration (True for PIPE, False for None, or custom).
        :param stdout: Stdout configuration (True for PIPE, False for None, or custom).
        :param stderr: Stderr configuration (True for PIPE, False for DEVNULL, or custom).
        :param name: Process name for logging (defaults to the executable's basename).
        :param env: Environment variables for the subprocess (None inherits parent env).
        """
        self.proc: asyncio.subprocess.Process | None = None
        if name is None:
            # fall back to the executable name (last path component) for logging
            name = args[0].split(os.sep)[-1]
        self.name = name
        self.logger = LOGGER.getChild(name)
        self._args = args
        # normalize the bool-style arguments; True is translated to PIPE in start()
        self._stdin = None if stdin is False else stdin
        self._stdout = None if stdout is False else stdout
        self._stderr = asyncio.subprocess.DEVNULL if stderr is False else stderr
        self._env = get_subprocess_env(env)
        # per-stream locks guard against concurrent readers/writers on the pipes
        self._stderr_lock = asyncio.Lock()
        self._stdout_lock = asyncio.Lock()
        self._stdin_lock = asyncio.Lock()
        self._close_called = False
        # cached returncode so it stays available after the process is cleaned up
        self._returncode: int | None = None

    @property
    def closed(self) -> bool:
        """Return if the process was closed."""
        return self._close_called or self.returncode is not None

    @property
    def returncode(self) -> int | None:
        """Return the returncode of the process (None while still running)."""
        if self._returncode is not None:
            return self._returncode
        if self.proc is None:
            return None
        # cache the value once the process has exited
        if (ret_code := self.proc.returncode) is not None:
            self._returncode = ret_code
        return ret_code

    async def __aenter__(self) -> Self:
        """Enter context manager."""
        await self.start()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        """Exit context manager."""
        # make sure we close and cleanup the process
        await self.close()
        # snapshot the returncode so it stays available after cleanup
        self._returncode = self.returncode
        return None

    async def start(self) -> None:
        """Perform Async init of process."""
        # translate the bool convenience values into the actual pipe constants
        self.proc = await asyncio.create_subprocess_exec(
            *self._args,
            stdin=asyncio.subprocess.PIPE if self._stdin is True else self._stdin,
            stdout=asyncio.subprocess.PIPE if self._stdout is True else self._stdout,
            stderr=asyncio.subprocess.PIPE if self._stderr is True else self._stderr,
            env=self._env,
        )
        self.logger.log(
            VERBOSE_LOG_LEVEL, "Process %s started with PID %s", self.name, self.proc.pid
        )

    async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
        """Yield chunks of n size from the process stdout (last chunk may be smaller)."""
        while True:
            chunk = await self.readexactly(n)
            if len(chunk) == 0:
                break
            yield chunk

    async def iter_any(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
        """Yield chunks as they come in from process stdout (up to n bytes each)."""
        while True:
            chunk = await self.read(n)
            if len(chunk) == 0:
                break
            yield chunk

    async def readexactly(self, n: int) -> bytes:
        """Read exactly n bytes from the process stdout (or less if eof)."""
        if self._close_called:
            return b""
        assert self.proc is not None  # for type checking
        assert self.proc.stdout is not None  # for type checking
        async with self._stdout_lock:
            try:
                return await self.proc.stdout.readexactly(n)
            except asyncio.IncompleteReadError as err:
                # EOF hit before n bytes arrived: return whatever we got
                return err.partial

    async def read(self, n: int) -> bytes:
        """Read up to n bytes from the stdout stream.

        If n is positive, this function try to read n bytes,
        and may return less or equal bytes than requested, but at least one byte.
        If EOF was received before any byte is read, this function returns empty byte object.
        """
        if self._close_called:
            return b""
        assert self.proc is not None  # for type checking
        assert self.proc.stdout is not None  # for type checking
        async with self._stdout_lock:
            return await self.proc.stdout.read(n)

    async def write(self, data: bytes) -> None:
        """Write data to process stdin (silently ignored once close() was called)."""
        if self._close_called or self.proc is None:
            return
        if self.proc.stdin is None:
            return
        async with self._stdin_lock:
            self.proc.stdin.write(data)
            # the process may exit while we're draining; ignore the broken pipe
            with suppress(BrokenPipeError, ConnectionResetError):
                await self.proc.stdin.drain()

    async def write_eof(self) -> None:
        """Write end of file to process stdin to signal no more data will be sent."""
        if self._close_called or self.proc is None:
            return
        if self.proc.stdin is None:
            return
        async with self._stdin_lock:
            try:
                if self.proc.stdin.can_write_eof():
                    self.proc.stdin.write_eof()
                    await self.proc.stdin.drain()
            except (
                AttributeError,
                AssertionError,
                BrokenPipeError,
                RuntimeError,
                ConnectionResetError,
            ):
                # already exited, race condition
                pass

    async def read_stderr(self) -> bytes:
        """Read one line from stderr (empty bytes once the process has exited)."""
        if self.returncode is not None:
            return b""
        assert self.proc is not None  # for type checking
        assert self.proc.stderr is not None  # for type checking
        async with self._stderr_lock:
            try:
                return await self.proc.stderr.readline()
            except ValueError as err:
                # we're waiting for a line (separator found), but the line was too big
                # this may happen with ffmpeg during a long (radio) stream where progress
                # gets outputted to the stderr but no newline
                # https://stackoverflow.com/questions/55457370/how-to-avoid-valueerror-separator-is-not-found-and-chunk-exceed-the-limit
                # NOTE: this consumes the line that was too big
                if "chunk exceed the limit" in str(err):
                    return await self.proc.stderr.readline()
                # raise for all other (value) errors
                raise

    async def iter_stderr(self) -> AsyncGenerator[str, None]:
        """Iterate lines from the stderr stream as string (blank lines skipped)."""
        line: str | bytes
        while True:
            line = await self.read_stderr()
            if line == b"":
                # EOF / process exited
                break
            line = line.decode("utf-8", errors="ignore").strip()
            if not line:
                continue
            yield line

    async def communicate(
        self,
        input: bytes | None = None,  # noqa: A002
        timeout: float | None = None,
    ) -> tuple[bytes, bytes]:
        """Communicate with the process and return stdout and stderr.

        :param input: Optional bytes to send to the process stdin.
        :param timeout: Optional timeout (in seconds) for the process to finish.
        :raises RuntimeError: If the process was already closed/finished.
        """
        if self.closed:
            raise RuntimeError("communicate called while process already done")
        # abort existing readers on stderr/stdout first before we send communicate
        # NOTE(review): these locks are never released - presumably intentional
        # since no further reads should happen after communicate(); confirm
        await self._stderr_lock.acquire()
        await self._stdout_lock.acquire()
        assert self.proc is not None  # for type checking
        stdout, stderr = await asyncio.wait_for(self.proc.communicate(input), timeout)
        return (stdout, stderr)

    async def close(self) -> None:
        """Close/terminate the process and wait for exit."""
        self._close_called = True
        if not self.proc:
            return

        # cancel existing stdin feeder task if any
        if self._stdin_feeder_task:
            if not self._stdin_feeder_task.done():
                self._stdin_feeder_task.cancel()
            # Always await the task to consume any exception and prevent
            # "Task exception was never retrieved" errors.
            try:
                await self._stdin_feeder_task
            except asyncio.CancelledError:
                pass  # Expected when we cancel the task
            except Exception as err:
                # Log unexpected exceptions from the stdin feeder before suppressing
                LOGGER.warning(
                    "Process stdin feeder task ended with error: %s",
                    err,
                )

        # close stdin to signal we're done sending data
        # (lock acquisition is time-bounded so a stuck writer can't deadlock close)
        with suppress(TimeoutError, asyncio.CancelledError):
            await asyncio.wait_for(self._stdin_lock.acquire(), 5)
            if self.proc.stdin and not self.proc.stdin.is_closing():
                self.proc.stdin.close()
            elif not self.proc.stdin and self.proc.returncode is None:
                # no stdin pipe to close: politely ask the process to stop
                self.proc.send_signal(SIGINT)

        # ensure we have no more readers active and stdout is drained
        with suppress(TimeoutError, asyncio.CancelledError):
            await asyncio.wait_for(self._stdout_lock.acquire(), 5)
            if self.proc.stdout and not self.proc.stdout.at_eof():
                with suppress(Exception):
                    await self.proc.stdout.read(-1)
        # if we have a stderr task active, allow it to finish
        if self._stderr_reader_task:
            with suppress(TimeoutError, asyncio.CancelledError):
                await asyncio.wait_for(self._stderr_reader_task, 5)
        elif self.proc.stderr and not self.proc.stderr.at_eof():
            with suppress(TimeoutError, asyncio.CancelledError):
                await asyncio.wait_for(self._stderr_lock.acquire(), 5)
                # drain stderr
                with suppress(Exception):
                    await self.proc.stderr.read(-1)

        # make sure the process is really cleaned up.
        # especially with pipes this can cause deadlocks if not properly guarded
        # we need to ensure stdout and stderr are flushed and stdin closed
        pid = self.proc.pid
        terminate_attempts = 0
        while self.returncode is None:
            try:
                # use communicate to flush all pipe buffers
                await asyncio.wait_for(self.proc.communicate(), 2)
            except TimeoutError:
                terminate_attempts += 1
                self.logger.debug(
                    "Process %s with PID %s did not stop in time (attempt %d). Sending SIGKILL...",
                    self.name,
                    pid,
                    terminate_attempts,
                )
                # Use os.kill for more direct signal delivery
                with suppress(ProcessLookupError, OSError):
                    os.kill(pid, 9)  # SIGKILL = 9
                # Give up after 5 attempts - process may be zombie
                if terminate_attempts >= 5:
                    self.logger.warning(
                        "Process %s (PID %s) did not terminate after %d SIGKILL attempts",
                        self.name,
                        pid,
                        terminate_attempts,
                    )
                    break
        self.logger.log(
            VERBOSE_LOG_LEVEL,
            "Process %s with PID %s stopped with returncode %s",
            self.name,
            self.proc.pid,
            self.returncode,
        )

    async def kill(self) -> None:
        """
        Immediately kill the process with SIGKILL.

        Use this for forceful termination when the process doesn't respond to
        normal termination signals. Unlike close(), this doesn't attempt graceful
        shutdown - it immediately sends SIGKILL.
        """
        self._close_called = True
        if not self.proc or self.returncode is not None:
            return

        pid = self.proc.pid

        # Cancel stdin feeder task if any
        if self._stdin_feeder_task and not self._stdin_feeder_task.done():
            self._stdin_feeder_task.cancel()
            with suppress(asyncio.CancelledError, Exception):
                await self._stdin_feeder_task

        # Cancel stderr reader task if any
        if self._stderr_reader_task and not self._stderr_reader_task.done():
            self._stderr_reader_task.cancel()
            with suppress(asyncio.CancelledError, Exception):
                await self._stderr_reader_task

        # Close stdin to signal we're done sending data
        # Note: Don't manually call feed_eof() on stdout/stderr - this causes
        # "feed_data after feed_eof" assertion errors when the subprocess transport
        # still has buffered data to deliver. Let the process termination naturally
        # close the streams.
        if self.proc.stdin and not self.proc.stdin.is_closing():
            self.proc.stdin.close()

        # Send SIGKILL immediately using os.kill for more direct signal delivery
        self.logger.debug("Killing process %s with PID %s", self.name, pid)
        with suppress(ProcessLookupError, OSError):
            os.kill(pid, 9)  # SIGKILL = 9

        # Wait for process to actually terminate
        try:
            await asyncio.wait_for(self.proc.wait(), 2)
        except TimeoutError:
            # Try one more time with os.kill
            with suppress(ProcessLookupError, OSError):
                os.kill(pid, 9)
            try:
                await asyncio.wait_for(self.proc.wait(), 2)
            except TimeoutError:
                self.logger.warning(
                    "Process %s with PID %s did not terminate after SIGKILL - may be zombie",
                    self.name,
                    pid,
                )

        self.logger.log(
            VERBOSE_LOG_LEVEL,
            "Process %s with PID %s killed with returncode %s",
            self.name,
            pid,
            self.returncode,
        )

    async def wait(self) -> int:
        """Wait for the process and return the returncode."""
        if self._returncode is None:
            assert self.proc is not None
            self._returncode = await self.proc.wait()
        return self._returncode

    async def wait_with_timeout(self, timeout: int) -> int:
        """Wait for the process and return the returncode with a timeout (in seconds)."""
        return await asyncio.wait_for(self.wait(), timeout)

    def attach_stderr_reader(self, task: asyncio.Task[None]) -> None:
        """Attach a stderr reader task to this process (awaited/cancelled on close)."""
        self._stderr_reader_task = task
413
414
async def check_output(*args: str, env: dict[str, str] | None = None) -> tuple[int, bytes]:
    """Run a subprocess to completion and return its returncode and combined output.

    Stderr is redirected into stdout so the returned bytes contain both streams.

    :param args: Command and arguments to execute.
    :param env: Optional extra environment variables for the subprocess.
    """
    process = await asyncio.create_subprocess_exec(
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
        env=get_subprocess_env(env),
    )
    output, _ = await process.communicate()
    returncode = process.returncode
    assert returncode is not None  # for type checking
    return (returncode, output)
426
427
async def communicate(
    args: list[str],
    input: bytes | None = None,  # noqa: A002
) -> tuple[int, bytes, bytes]:
    """Run a subprocess to completion and return returncode, stdout and stderr.

    :param args: Command and arguments to execute.
    :param input: Optional bytes sent to the process stdin (a stdin pipe is
        only opened when input is provided).
    """
    stdin_cfg = asyncio.subprocess.PIPE if input is not None else None
    process = await asyncio.create_subprocess_exec(
        *args,
        stdin=stdin_cfg,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        env=get_subprocess_env(),
    )
    out, err = await process.communicate(input)
    assert process.returncode is not None  # for type checking
    return (process.returncode, out, err)
443