/
/
/
1"""
2AsyncProcess.
3
4Wrapper around asyncio subprocess to help with using pipe streams and
5taking care of properly closing the process in case of exit (on both success and failures),
6without deadlocking.
7"""
8
9from __future__ import annotations
10
11import asyncio
12import logging
13import os
14
15# if TYPE_CHECKING:
16from collections.abc import AsyncGenerator
17from contextlib import suppress
18from signal import SIGINT
19from types import TracebackType
20from typing import Self
21
22from music_assistant.constants import MASS_LOGGER_NAME, VERBOSE_LOG_LEVEL
23
# Module-level logger; its name is nested under MASS_LOGGER_NAME.
LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.helpers.process")

# Default number of bytes requested per read by AsyncProcess.iter_chunked
# and AsyncProcess.iter_any.
DEFAULT_CHUNKSIZE = 64000
27
28
def get_subprocess_env(env: dict[str, str] | None = None) -> dict[str, str]:
    """Build the environment mapping to hand to a subprocess.

    Starts from a copy of the current process environment without LD_PRELOAD
    (to avoid jemalloc warnings) and then layers the given overrides on top.

    :param env: Extra variables to add or override (None or empty adds nothing).
    :return: A new dict; os.environ itself is not modified.
    """
    merged = {key: value for key, value in os.environ.items() if key != "LD_PRELOAD"}
    if not env:
        return merged
    # overrides are applied last, so they win over inherited values
    merged.update(env)
    return merged
36
37
class AsyncProcess:
    """
    AsyncProcess.

    Wrapper around asyncio subprocess to help with using pipe streams and
    taking care of properly closing the process in case of exit (on both success and failures),
    without deadlocking.

    Can be used as an async context manager: entering starts the process,
    exiting closes it.
    """

    # Optional background tasks that close() and kill() clean up when set.
    # _stdin_feeder_task is not assigned anywhere in this class; presumably it
    # is set by callers/subclasses - verify against usage.
    _stdin_feeder_task: asyncio.Task[None] | None = None  # used for ffmpeg
    _stderr_reader_task: asyncio.Task[None] | None = None  # used for ffmpeg

    def __init__(
        self,
        args: list[str],
        stdin: bool | int | None = None,
        stdout: bool | int | None = None,
        stderr: bool | int | None = False,
        name: str | None = None,
        env: dict[str, str] | None = None,
    ) -> None:
        """Initialize AsyncProcess.

        :param args: Command and arguments to execute.
        :param stdin: Stdin configuration (True for PIPE, False for None, or custom).
        :param stdout: Stdout configuration (True for PIPE, False for None, or custom).
        :param stderr: Stderr configuration (True for PIPE, False for DEVNULL, or custom).
        :param name: Process name for logging (defaults to the last path part of args[0]).
        :param env: Extra environment variables, applied on top of the parent
            environment (which is passed on without LD_PRELOAD).
        """
        self.proc: asyncio.subprocess.Process | None = None
        if name is None:
            name = args[0].split(os.sep)[-1]
        self.name = name
        self.logger = LOGGER.getChild(name)
        self._args = args
        self._stdin = None if stdin is False else stdin
        self._stdout = None if stdout is False else stdout
        self._stderr = asyncio.subprocess.DEVNULL if stderr is False else stderr
        self._env = get_subprocess_env(env)
        # one lock per stream so only a single coroutine uses a stream at a time
        self._stderr_lock = asyncio.Lock()
        self._stdout_lock = asyncio.Lock()
        self._stdin_lock = asyncio.Lock()
        self._close_called = False
        self._returncode: int | None = None

    @property
    def closed(self) -> bool:
        """Return if the process was closed."""
        return self._close_called or self.returncode is not None

    @property
    def returncode(self) -> int | None:
        """Return the returncode of the process (None if not started or still running)."""
        if self._returncode is not None:
            return self._returncode
        if self.proc is None:
            return None
        # cache the value once the underlying process reports one
        if (ret_code := self.proc.returncode) is not None:
            self._returncode = ret_code
        return ret_code

    async def __aenter__(self) -> Self:
        """Enter context manager."""
        await self.start()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        """Exit context manager."""
        # make sure we close and cleanup the process
        await self.close()
        self._returncode = self.returncode
        # returning None means an exception from the with-body is not suppressed
        return None

    async def start(self) -> None:
        """Perform Async init of process."""
        self.proc = await asyncio.create_subprocess_exec(
            *self._args,
            stdin=asyncio.subprocess.PIPE if self._stdin is True else self._stdin,
            stdout=asyncio.subprocess.PIPE if self._stdout is True else self._stdout,
            stderr=asyncio.subprocess.PIPE if self._stderr is True else self._stderr,
            env=self._env,
        )
        self.logger.log(
            VERBOSE_LOG_LEVEL, "Process %s started with PID %s", self.name, self.proc.pid
        )

    async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
        """Yield chunks of n size from the process stdout.

        The final chunk may be shorter than n when EOF is reached.
        """
        while True:
            chunk = await self.readexactly(n)
            if len(chunk) == 0:
                break
            yield chunk

    async def iter_any(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
        """Yield chunks (of at most n bytes) as they come in from process stdout."""
        while True:
            chunk = await self.read(n)
            if len(chunk) == 0:
                break
            yield chunk

    async def readexactly(self, n: int) -> bytes:
        """Read exactly n bytes from the process stdout (or less if eof).

        Returns empty bytes once close() or kill() has been called.
        """
        if self._close_called:
            return b""
        assert self.proc is not None  # for type checking
        assert self.proc.stdout is not None  # for type checking
        async with self._stdout_lock:
            try:
                return await self.proc.stdout.readexactly(n)
            except asyncio.IncompleteReadError as err:
                # EOF before n bytes arrived: hand back what was received
                return err.partial

    async def read(self, n: int) -> bytes:
        """Read up to n bytes from the stdout stream.

        If n is positive, this function try to read n bytes,
        and may return less or equal bytes than requested, but at least one byte.
        If EOF was received before any byte is read, this function returns empty byte object.
        Also returns empty bytes once close() or kill() has been called.
        """
        if self._close_called:
            return b""
        assert self.proc is not None  # for type checking
        assert self.proc.stdout is not None  # for type checking
        async with self._stdout_lock:
            return await self.proc.stdout.read(n)

    async def write(self, data: bytes) -> None:
        """Write data to process stdin.

        Silently does nothing when the process is closed, not started,
        or has no stdin pipe.
        """
        if self._close_called or self.proc is None:
            return
        if self.proc.stdin is None:
            return
        async with self._stdin_lock:
            self.proc.stdin.write(data)
            # the process may exit while we are still flushing
            with suppress(BrokenPipeError, ConnectionResetError):
                await self.proc.stdin.drain()

    async def write_eof(self) -> None:
        """Write end of file to process stdin.

        Silently does nothing when the process is closed, not started,
        or has no stdin pipe.
        """
        if self._close_called or self.proc is None:
            return
        if self.proc.stdin is None:
            return
        async with self._stdin_lock:
            try:
                if self.proc.stdin.can_write_eof():
                    self.proc.stdin.write_eof()
                await self.proc.stdin.drain()
            except (
                AttributeError,
                AssertionError,
                BrokenPipeError,
                RuntimeError,
                ConnectionResetError,
            ):
                # already exited, race condition
                pass

    async def read_stderr(self) -> bytes:
        """Read line from stderr.

        Returns empty bytes when the process already has a returncode.
        """
        if self.returncode is not None:
            return b""
        assert self.proc is not None  # for type checking
        assert self.proc.stderr is not None  # for type checking
        async with self._stderr_lock:
            try:
                return await self.proc.stderr.readline()
            except ValueError as err:
                # we're waiting for a line (separator found), but the line was too big
                # this may happen with ffmpeg during a long (radio) stream where progress
                # gets outputted to the stderr but no newline
                # https://stackoverflow.com/questions/55457370/how-to-avoid-valueerror-separator-is-not-found-and-chunk-exceed-the-limit
                # NOTE: this consumes the line that was too big
                if "chunk exceed the limit" in str(err):
                    return await self.proc.stderr.readline()
                # raise for all other (value) errors
                raise

    async def iter_stderr(self) -> AsyncGenerator[str, None]:
        """Iterate lines from the stderr stream as string.

        Lines are decoded as UTF-8 (undecodable bytes are dropped) and stripped;
        lines that are empty after stripping are skipped.
        """
        line: str | bytes
        while True:
            line = await self.read_stderr()
            if line == b"":
                break
            line = line.decode("utf-8", errors="ignore").strip()
            if not line:
                continue
            yield line

    async def communicate(
        self,
        input: bytes | None = None,  # noqa: A002
        timeout: float | None = None,
    ) -> tuple[bytes, bytes]:
        """Communicate with the process and return stdout and stderr.

        :param input: Optional data passed on to the process' communicate() call.
        :param timeout: Maximum number of seconds to wait (None waits indefinitely).
        :raises RuntimeError: If the process was already closed or has exited.
        :raises TimeoutError: If communicate did not finish within the timeout.
        """
        if self.closed:
            raise RuntimeError("communicate called while process already done")
        assert self.proc is not None  # for type checking
        # Wait for active stderr/stdout readers to finish their current read
        # before we send communicate, and keep new ones out while it runs.
        # The locks are released again afterwards (also on timeout/cancellation):
        # holding them forever would leave queued readers hanging and make
        # a subsequent close() wait for its full lock timeout.
        async with self._stderr_lock, self._stdout_lock:
            stdout, stderr = await asyncio.wait_for(self.proc.communicate(input), timeout)
        return (stdout, stderr)

    async def close(self) -> None:
        """Close/terminate the process and wait for exit.

        Closes stdin (or sends SIGINT when there is no stdin pipe), drains
        stdout/stderr and then waits for the process to exit, sending SIGKILL
        each time a 2 second wait expires (giving up after 5 attempts).
        """
        self._close_called = True
        if not self.proc:
            return

        # cancel existing stdin feeder task if any
        if self._stdin_feeder_task:
            if not self._stdin_feeder_task.done():
                self._stdin_feeder_task.cancel()
            # Always await the task to consume any exception and prevent
            # "Task exception was never retrieved" errors.
            # Suppress CancelledError (from cancel) and any other exception
            # since exceptions have already been propagated through the generator chain.
            with suppress(asyncio.CancelledError, Exception):
                await self._stdin_feeder_task

        # close stdin to signal we're done sending data
        with suppress(TimeoutError, asyncio.CancelledError):
            await asyncio.wait_for(self._stdin_lock.acquire(), 5)
        if self.proc.stdin and not self.proc.stdin.is_closing():
            self.proc.stdin.close()
        elif not self.proc.stdin and self.proc.returncode is None:
            # no stdin pipe to close, so ask the process to stop with a signal
            self.proc.send_signal(SIGINT)

        # ensure we have no more readers active and stdout is drained
        with suppress(TimeoutError, asyncio.CancelledError):
            await asyncio.wait_for(self._stdout_lock.acquire(), 5)
        if self.proc.stdout and not self.proc.stdout.at_eof():
            with suppress(Exception):
                await self.proc.stdout.read(-1)
        # if we have a stderr task active, allow it to finish
        if self._stderr_reader_task:
            with suppress(TimeoutError, asyncio.CancelledError):
                await asyncio.wait_for(self._stderr_reader_task, 5)
        elif self.proc.stderr and not self.proc.stderr.at_eof():
            with suppress(TimeoutError, asyncio.CancelledError):
                await asyncio.wait_for(self._stderr_lock.acquire(), 5)
            # drain stderr
            with suppress(Exception):
                await self.proc.stderr.read(-1)

        # make sure the process is really cleaned up.
        # especially with pipes this can cause deadlocks if not properly guarded
        # we need to ensure stdout and stderr are flushed and stdin closed
        pid = self.proc.pid
        terminate_attempts = 0
        while self.returncode is None:
            try:
                # use communicate to flush all pipe buffers
                await asyncio.wait_for(self.proc.communicate(), 2)
            except TimeoutError:
                terminate_attempts += 1
                self.logger.debug(
                    "Process %s with PID %s did not stop in time (attempt %d). Sending SIGKILL...",
                    self.name,
                    pid,
                    terminate_attempts,
                )
                # Use os.kill for more direct signal delivery
                with suppress(ProcessLookupError, OSError):
                    os.kill(pid, 9)  # SIGKILL = 9
                # Give up after 5 attempts - process may be zombie
                if terminate_attempts >= 5:
                    self.logger.warning(
                        "Process %s (PID %s) did not terminate after %d SIGKILL attempts",
                        self.name,
                        pid,
                        terminate_attempts,
                    )
                    break
        self.logger.log(
            VERBOSE_LOG_LEVEL,
            "Process %s with PID %s stopped with returncode %s",
            self.name,
            self.proc.pid,
            self.returncode,
        )

    async def kill(self) -> None:
        """
        Immediately kill the process with SIGKILL.

        Use this for forceful termination when the process doesn't respond to
        normal termination signals. Unlike close(), this doesn't attempt graceful
        shutdown - it immediately sends SIGKILL.
        """
        self._close_called = True
        if not self.proc or self.returncode is not None:
            return

        pid = self.proc.pid

        # Cancel stdin feeder task if any
        if self._stdin_feeder_task:
            if not self._stdin_feeder_task.done():
                self._stdin_feeder_task.cancel()
            # Await the task even when it already finished (same as close()),
            # so a stored exception is consumed and does not trigger
            # "Task exception was never retrieved" errors.
            with suppress(asyncio.CancelledError, Exception):
                await self._stdin_feeder_task

        # Cancel stderr reader task if any
        if self._stderr_reader_task and not self._stderr_reader_task.done():
            self._stderr_reader_task.cancel()
            with suppress(asyncio.CancelledError, Exception):
                await self._stderr_reader_task

        # Close all pipes first to prevent any I/O blocking
        # This helps processes stuck on blocked I/O to receive signals
        if self.proc.stdin and not self.proc.stdin.is_closing():
            self.proc.stdin.close()
        if self.proc.stdout:
            self.proc.stdout.feed_eof()
        if self.proc.stderr:
            self.proc.stderr.feed_eof()

        # Send SIGKILL immediately using os.kill for more direct signal delivery
        self.logger.debug("Killing process %s with PID %s", self.name, pid)
        with suppress(ProcessLookupError, OSError):
            os.kill(pid, 9)  # SIGKILL = 9

        # Wait for process to actually terminate
        try:
            await asyncio.wait_for(self.proc.wait(), 2)
        except TimeoutError:
            # Try one more time with os.kill
            with suppress(ProcessLookupError, OSError):
                os.kill(pid, 9)
            try:
                await asyncio.wait_for(self.proc.wait(), 2)
            except TimeoutError:
                self.logger.warning(
                    "Process %s with PID %s did not terminate after SIGKILL - may be zombie",
                    self.name,
                    pid,
                )

        self.logger.log(
            VERBOSE_LOG_LEVEL,
            "Process %s with PID %s killed with returncode %s",
            self.name,
            pid,
            self.returncode,
        )

    async def wait(self) -> int:
        """Wait for the process and return the returncode."""
        if self._returncode is None:
            assert self.proc is not None
            self._returncode = await self.proc.wait()
        return self._returncode

    async def wait_with_timeout(self, timeout: float) -> int:
        """Wait for the process and return the returncode with a timeout.

        :param timeout: Maximum number of seconds to wait.
        :raises TimeoutError: If the process did not exit within the timeout.
        """
        return await asyncio.wait_for(self.wait(), timeout)

    def attach_stderr_reader(self, task: asyncio.Task[None]) -> None:
        """Attach a stderr reader task to this process.

        close() waits (up to 5 seconds) for this task to finish and
        kill() cancels it.
        """
        self._stderr_reader_task = task
408
409
async def check_output(*args: str, env: dict[str, str] | None = None) -> tuple[int, bytes]:
    """Run a command to completion and return its returncode and output.

    Stderr is redirected into stdout, so the returned bytes hold both streams.

    :param args: Command and arguments to execute.
    :param env: Extra environment variables, passed through get_subprocess_env.
    :return: Tuple of (returncode, combined output bytes).
    """
    process = await asyncio.create_subprocess_exec(
        *args,
        env=get_subprocess_env(env),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )
    # no separate stderr pipe exists, so only the first element is of interest
    output = (await process.communicate())[0]
    exit_code = process.returncode
    assert exit_code is not None  # for type checking
    return (exit_code, output)
421
422
async def communicate(
    args: list[str],
    input: bytes | None = None,  # noqa: A002
) -> tuple[int, bytes, bytes]:
    """Run a command, optionally feeding it input, and collect both output streams.

    :param args: Command and arguments to execute.
    :param input: Bytes to send to the process; a stdin pipe is only created
        when this is not None.
    :return: Tuple of (returncode, stdout bytes, stderr bytes).
    """
    stdin_target = None if input is None else asyncio.subprocess.PIPE
    process = await asyncio.create_subprocess_exec(
        *args,
        stdin=stdin_target,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        env=get_subprocess_env(),
    )
    out_data, err_data = await process.communicate(input)
    exit_code = process.returncode
    assert exit_code is not None  # for type checking
    return (exit_code, out_data, err_data)
438