/
/
/
1"""FFMpeg related helpers."""
2
3from __future__ import annotations
4
5import asyncio
6import logging
7import time
8from collections import deque
9from collections.abc import AsyncGenerator
10from contextlib import suppress
11from typing import TYPE_CHECKING, Final
12
13from music_assistant_models.enums import ContentType
14from music_assistant_models.errors import AudioError
15from music_assistant_models.helpers import get_global_cache_value, set_global_cache_values
16
17from music_assistant.constants import VERBOSE_LOG_LEVEL
18
19from .process import AsyncProcess, check_output
20from .util import close_async_generator
21
22if TYPE_CHECKING:
23 from music_assistant_models.media_items import AudioFormat
24
# Module-level logger; FFMpeg instances derive a per-process child logger from it.
LOGGER = logging.getLogger("ffmpeg")
# Lowest FFmpeg *major* version accepted by check_ffmpeg_version().
MINIMAL_FFMPEG_VERSION = 6
# Key in the global in-memory cache under which check_ffmpeg_version() stores
# whether the ffmpeg build reports "enable-libsoxr"; read by get_ffmpeg_args().
CACHE_ATTR_LIBSOXR_PRESENT: Final[str] = "libsoxr_present"
28
29
class FFMpeg(AsyncProcess):
    """FFMpeg wrapped as AsyncProcess.

    Builds the ffmpeg command line from the requested input/output formats,
    optionally feeds stdin from an async generator of audio chunks and reads
    the ffmpeg log from stderr in a background task.
    """

    def __init__(
        self,
        audio_input: AsyncGenerator[bytes, None] | str | int,
        input_format: AudioFormat,
        output_format: AudioFormat,
        filter_params: list[str] | None = None,
        extra_args: list[str] | None = None,
        extra_input_args: list[str] | None = None,
        extra_output_args: list[str] | None = None,
        audio_output: str | int = "-",
        collect_log_history: bool = False,
        loglevel: str = "info",
    ) -> None:
        """Initialize the FFMpeg process wrapper.

        :param audio_input: A string is used as ffmpeg input path, an async
            generator is fed to ffmpeg's stdin, an int is handed to
            AsyncProcess as stdin.
        :param input_format: Format of the incoming audio.
        :param output_format: Format ffmpeg should produce.
        :param filter_params: Optional audio filters.
        :param extra_args: Optional extra args placed between input and output args.
        :param extra_input_args: Optional extra args placed before the input path.
        :param extra_output_args: Optional extra args placed before the output path.
        :param audio_output: A string is used as ffmpeg output path ("-" enables
            stdout), an int is handed to AsyncProcess as stdout.
        :param collect_log_history: Keep the most recent ffmpeg log lines in
            ``log_history``.
        :param loglevel: Value for ffmpeg's ``-loglevel`` option.
        """
        ffmpeg_args = get_ffmpeg_args(
            input_format=input_format,
            output_format=output_format,
            filter_params=filter_params or [],
            extra_args=extra_args or [],
            input_path=audio_input if isinstance(audio_input, str) else "-",
            output_path=audio_output if isinstance(audio_output, str) else "-",
            extra_input_args=extra_input_args or [],
            extra_output_args=extra_output_args or [],
            loglevel=loglevel,
        )
        self.audio_input = audio_input
        self.input_format = input_format
        self.collect_log_history = collect_log_history
        # bounded history: only the last 100 log lines are retained
        self.log_history: deque[str] = deque(maxlen=100)
        self.concat_error = False  # switch to True if concat demuxer fails on MultiPartFiles
        self._stdin_feeder_task: asyncio.Task[None] | None = None
        self._stderr_reader_task: asyncio.Task[None] | None = None
        self._input_codec_parsed = False
        stdin: bool | int
        if audio_input == "-" or isinstance(audio_input, AsyncGenerator):
            # we (or the caller) will be writing to ffmpeg's stdin
            stdin = True
        else:
            stdin = audio_input if isinstance(audio_input, int) else False
        stdout = audio_output if isinstance(audio_output, int) else bool(audio_output == "-")
        super().__init__(
            ffmpeg_args,
            stdin=stdin,
            stdout=stdout,
            stderr=True,
        )
        self.logger = LOGGER

    async def start(self) -> None:
        """Perform Async init of process.

        Starts the process, logs the (sanitized) arguments and spawns the
        stderr reader task plus, for generator input, the stdin feeder task.
        """
        await super().start()
        if self.proc:
            self.logger = LOGGER.getChild(str(self.proc.pid))
        # mask urls, data uris and file paths so they do not end up in the log
        clean_args = []
        for arg in self._args[1:]:
            if arg.startswith("http"):
                clean_args.append("<URL>")
            elif arg.startswith("data:application/"):
                # must be checked before the file heuristic below: a data uri
                # contains both "/" and "." and would be labelled <FILE> otherwise
                clean_args.append("<DATA>")
            elif "/" in arg and "." in arg:
                clean_args.append("<FILE>")
            else:
                clean_args.append(arg)
        args_str = " ".join(clean_args)
        self.logger.log(VERBOSE_LOG_LEVEL, "started with args: %s", args_str)
        self._stderr_reader_task = asyncio.create_task(self._log_reader_task())
        if isinstance(self.audio_input, AsyncGenerator):
            self._stdin_feeder_task = asyncio.create_task(self._feed_stdin())

    async def communicate(
        self,
        input: bytes | None = None,  # noqa: A002
        timeout: float | None = None,
    ) -> tuple[bytes, bytes]:
        """Override communicate to avoid blocking.

        Cancels and awaits the background stdin feeder and stderr reader tasks
        before delegating to the parent implementation.
        """
        if self._stdin_feeder_task:
            if not self._stdin_feeder_task.done():
                self._stdin_feeder_task.cancel()
            # Always await the task to consume any exception and prevent
            # "Task exception was never retrieved" errors.
            try:
                await self._stdin_feeder_task
            except asyncio.CancelledError:
                pass  # Expected when we cancel the task
            except Exception as err:
                # Log unexpected exceptions from the stdin feeder before suppressing
                # The audio source may have failed, and we need visibility into this
                self.logger.warning(
                    "FFMpeg stdin feeder task ended with error: %s",
                    err,
                )
        if self._stderr_reader_task:
            if not self._stderr_reader_task.done():
                self._stderr_reader_task.cancel()
            with suppress(asyncio.CancelledError, Exception):
                await self._stderr_reader_task
        return await super().communicate(input, timeout)

    async def _log_reader_task(self) -> None:
        """Read ffmpeg log from stderr.

        Optionally records the lines in ``log_history``, flags concat demuxer
        failures and tries to detect the input codec from the log output.
        """
        decode_errors = 0
        async for line in self.iter_stderr():
            if self.collect_log_history:
                self.log_history.append(line)
            # ffmpeg logging can be quite verbose, so we only log critical errors
            # unless verbose logging is enabled
            if "critical" in line:
                self.logger.error(line)
            elif self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
                self.logger.log(VERBOSE_LOG_LEVEL, line)

            if "Invalid data found when processing input" in line:
                decode_errors += 1
            if decode_errors >= 50:
                self.logger.error(line)

            if "Error during demuxing" in line:
                # this can occur if using the concat demuxer for multipart files
                # and should raise an exception to prevent false progress logging
                self.concat_error = True

            # if streamdetails contenttype is unknown, try parse it from the ffmpeg log
            if line.startswith("Stream #") and ": Audio: " in line:
                if not self._input_codec_parsed:
                    content_type_raw = line.split(": Audio: ")[1].split(" ")[0]
                    content_type_raw = content_type_raw.split(",")[0]
                    content_type = ContentType.try_parse(content_type_raw)
                    self.logger.log(
                        VERBOSE_LOG_LEVEL,
                        "Detected (input) content type: %s (%s)",
                        content_type,
                        content_type_raw,
                    )
                    if self.input_format.content_type == ContentType.UNKNOWN:
                        self.input_format.content_type = content_type
                        self.input_format.codec_type = content_type
                    # only the first audio stream line is evaluated
                    self._input_codec_parsed = True
            del line

    async def _feed_stdin(self) -> None:
        """Feed stdin with audio chunks from an AsyncGenerator.

        Writes EOF to ffmpeg when the feed ends for any reason other than
        cancellation and closes the source generator if it was not fully
        consumed. Exceptions (including cancellation) are re-raised.
        """
        assert not isinstance(self.audio_input, str | int)
        generator_exhausted = False
        cancelled = False
        status = "running"
        chunk_count = 0
        # taken before the try block so the finally clause can always use it
        start = time.time()
        self.logger.log(VERBOSE_LOG_LEVEL, "Start reading audio data from source...")
        try:
            async for chunk in self.audio_input:
                chunk_count += 1
                if self.closed:
                    status = "stopped (process closed)"
                    return
                await self.write(chunk)
            generator_exhausted = True
            status = "completed"
        except asyncio.CancelledError:
            status = "cancelled"
            # remember the cancellation so we do not await write_eof below
            cancelled = True
            raise
        except Exception:
            status = "aborted with error"
            raise
        finally:
            self.logger.log(
                VERBOSE_LOG_LEVEL,
                "fill_buffer_task: %s (%s chunks received) in %.2fs",
                status,
                chunk_count,
                time.time() - start,
            )
            if not cancelled:
                await self.write_eof()
            # we need to ensure that we close the async generator
            # if we get cancelled otherwise it keeps lingering forever
            if not generator_exhausted:
                await close_async_generator(self.audio_input)
207
208
async def get_ffmpeg_stream(
    audio_input: AsyncGenerator[bytes, None] | str,
    input_format: AudioFormat,
    output_format: AudioFormat,
    filter_params: list[str] | None = None,
    extra_args: list[str] | None = None,
    chunk_size: int | None = None,
    extra_input_args: list[str] | None = None,
    extra_output_args: list[str] | None = None,
) -> AsyncGenerator[bytes, None]:
    """
    Yield the audio produced by ffmpeg as an async stream of byte chunks.

    Resampling and/or recoding is handled by ffmpeg based on the given
    input/output formats and filters. With a chunk_size the output is read in
    chunks of that size, otherwise whatever data is available is yielded.

    Raises AudioError (carrying the tail of the ffmpeg log) when ffmpeg exits
    with a non-zero return code or the concat demuxer reported an error.
    """
    async with FFMpeg(
        audio_input=audio_input,
        input_format=input_format,
        output_format=output_format,
        filter_params=filter_params,
        extra_args=extra_args,
        extra_input_args=extra_input_args,
        extra_output_args=extra_output_args,
        collect_log_history=True,
    ) as ffmpeg_proc:
        # pick the reader: fixed-size chunks when requested, else any available data
        if chunk_size:
            stdout_chunks = ffmpeg_proc.iter_chunked(chunk_size)
        else:
            stdout_chunks = ffmpeg_proc.iter_any()
        async for chunk in stdout_chunks:
            yield chunk
        exited_cleanly = ffmpeg_proc.returncode in (None, 0)
        if exited_cleanly and not ffmpeg_proc.concat_error:
            return
        # unclean exit of ffmpeg - raise error with log tail
        # (a longer tail is included for concat demuxer failures)
        tail_length = 20 if ffmpeg_proc.concat_error else 5
        recent_lines = list(ffmpeg_proc.log_history)[-tail_length:]
        raise AudioError("\n" + "\n".join(recent_lines))
244
245
def get_ffmpeg_args(  # noqa: PLR0915
    input_format: AudioFormat,
    output_format: AudioFormat,
    filter_params: list[str],
    extra_args: list[str] | None = None,
    input_path: str = "-",
    output_path: str = "-",
    extra_input_args: list[str] | None = None,
    extra_output_args: list[str] | None = None,
    loglevel: str = "error",
) -> list[str]:
    """Collect all args to send to the ffmpeg process.

    The result is ordered as: generic args, input args (ending with
    ``-i <input_path>``), extra args (including an ``-af`` filter chain when
    filters apply) and output args (ending with the output path).

    None of the list arguments is modified; the function works on copies.

    :param input_format: Format of the incoming audio.
    :param output_format: Requested output format.
    :param filter_params: Audio filters to apply; a downmix and/or resample
        filter may be added to (a copy of) this list.
    :param extra_args: Extra args placed between the input and output args.
    :param input_path: Input path/URL, "-" for stdin.
    :param output_path: Output path, "-" for stdout, "NULL" to discard output.
    :param extra_input_args: Extra args placed before the input path. When they
        contain "-f", no further input args are generated.
    :param extra_output_args: Extra args placed just before the output path.
    :param loglevel: Value for ffmpeg's ``-loglevel`` option.
    :raises RuntimeError: If the output content type is not supported.
    """
    # work on copies: the lists below get extended and must never leak
    # changes back into the caller's objects
    filter_params = list(filter_params)
    extra_args = list(extra_args) if extra_args else []
    input_args = list(extra_input_args) if extra_input_args else []
    extra_output_args = list(extra_output_args) if extra_output_args else []
    # generic args
    generic_args = [
        "ffmpeg",
        "-hide_banner",
        "-loglevel",
        loglevel,
        "-nostats",
        "-ignore_unknown",
        "-protocol_whitelist",
        "file,hls,http,https,tcp,tls,crypto,pipe,data,fd,rtp,udp,concat",
        "-probesize",
        "8096",
        "-analyzeduration",
        "500000",  # 0.5 seconds should be enough to detect the format
    ]
    # collect input args
    # (skipped when the input format is already specified in the extra input args)
    if "-f" not in input_args:
        if input_path.startswith("http"):
            # append reconnect options for direct stream from http
            input_args += [
                # Reconnect automatically when disconnected before EOF is hit.
                "-reconnect",
                "1",
                # Set the maximum delay in seconds after which to give up reconnecting.
                "-reconnect_delay_max",
                "10",
                # If set then even streamed/non seekable streams will be reconnected on errors.
                "-reconnect_streamed",
                "1",
                # Reconnect automatically in case of TCP/TLS errors during connect.
                "-reconnect_on_network_error",
                "0",
                # A comma separated list of HTTP status codes to reconnect on.
                # The list can include specific status codes (e.g. 503) or the strings 4xx / 5xx.
                "-reconnect_on_http_error",
                "5xx,429",
            ]
        if input_format.content_type.is_pcm():
            input_args += [
                "-ac",
                str(input_format.channels),
                "-channel_layout",
                "mono" if input_format.channels == 1 else "stereo",
                "-ar",
                str(input_format.sample_rate),
                "-acodec",
                input_format.content_type.name.lower(),
                "-f",
                input_format.content_type.value,
            ]
        if input_format.codec_type != ContentType.UNKNOWN:
            input_args += ["-acodec", input_format.codec_type.name.lower()]

    # add input path at the end
    input_args += ["-i", input_path]

    # collect output args
    output_args = [
        "-ac",
        str(output_format.channels),
        "-channel_layout",
        "mono" if output_format.channels == 1 else "stereo",
    ]
    if output_path.upper() == "NULL":
        # devnull stream
        output_path = "-"
        output_args = ["-f", "null"]
    elif output_format.content_type.is_pcm():
        # use explicit format identifier for pcm formats
        output_args += [
            "-ar",
            str(output_format.sample_rate),
            "-acodec",
            output_format.content_type.name.lower(),
            "-f",
            output_format.content_type.value,
        ]
    elif output_format.content_type == ContentType.NUT:
        # passthrough-mode (for creating the cache) using NUT container
        output_args = [
            "-vn",
            "-dn",
            "-sn",
            "-acodec",
            "copy",
            "-f",
            "nut",
        ]
    elif output_format.content_type == ContentType.AAC:
        output_args = ["-f", "adts", "-c:a", "aac", "-b:a", "256k"]
    elif output_format.content_type == ContentType.MP3:
        output_args = ["-f", "mp3", "-b:a", "320k"]
    elif output_format.content_type == ContentType.WAV:
        pcm_format = ContentType.from_bit_depth(output_format.bit_depth)
        output_args = [
            "-ar",
            str(output_format.sample_rate),
            "-acodec",
            pcm_format.name.lower(),
            "-f",
            "wav",
        ]
    elif output_format.content_type == ContentType.FLAC:
        # use level 0 compression for fastest encoding
        sample_fmt = "s32" if output_format.bit_depth > 16 else "s16"
        output_args += [
            "-sample_fmt",
            sample_fmt,
            "-ar",
            str(output_format.sample_rate),
            "-f",
            "flac",
            "-compression_level",
            "0",
        ]
    else:
        raise RuntimeError("Invalid/unsupported output format specified")

    output_args += extra_output_args  # append the extra output args
    # append (final) output path at the end of the args
    output_args.append(output_path)

    # edge case: source file is not stereo - downmix to stereo
    if input_format.channels > 2 and output_format.channels == 2:
        filter_params = [
            "pan=stereo|FL=1.0*FL+0.707*FC+0.707*SL+0.707*LFE|FR=1.0*FR+0.707*FC+0.707*SR+0.707*LFE",
            *filter_params,
        ]

    # determine if we need to do resampling (or dithering)
    if input_format.sample_rate != output_format.sample_rate or (
        input_format.bit_depth > 16 and output_format.bit_depth == 16
    ):
        libsoxr_support = get_global_cache_value(CACHE_ATTR_LIBSOXR_PRESENT)
        # prefer resampling with libsoxr due to its high quality
        # but skip if loudnorm filter is present, due to this bug:
        # https://trac.ffmpeg.org/ticket/11323
        loudnorm_present = any("loudnorm" in f for f in filter_params)
        if libsoxr_support and not loudnorm_present:
            resample_filter = "aresample=resampler=soxr:precision=30"
        else:
            resample_filter = "aresample=resampler=swr"

        # sample rate conversion
        if input_format.sample_rate != output_format.sample_rate:
            resample_filter += f":osr={output_format.sample_rate}"

        # bit depth conversion: apply dithering when going down to 16 bits
        # this is only needed when we need to back to 16 bits
        # when going from 32bits FP to 24 bits no dithering is needed
        if output_format.bit_depth == 16 and input_format.bit_depth > 16:
            resample_filter += ":osf=s16:dither_method=triangular_hp"

        filter_params.append(resample_filter)

    # a caller-supplied -filter_complex takes precedence over the simple filter chain
    if filter_params and "-filter_complex" not in extra_args:
        extra_args += ["-af", ",".join(filter_params)]

    return generic_args + input_args + extra_args + output_args
427
428
async def check_ffmpeg_version() -> None:
    """Check if ffmpeg is present (with libsoxr support).

    Runs ``ffmpeg -version``, stores whether the build reports libsoxr support
    in the global cache and verifies the major version against
    MINIMAL_FFMPEG_VERSION.

    :raises AudioError: If the ffmpeg binary is missing, exits with an error,
        reports a version that cannot be parsed, or is older than the minimal
        supported version.
    """
    # check for FFmpeg presence
    try:
        returncode, output = await check_output("ffmpeg", "-version")
    except FileNotFoundError as err:
        raise AudioError(
            "FFmpeg binary is missing from system. "
            "Please install ffmpeg on your OS to enable playback."
        ) from err
    # decode once; tolerate undecodable bytes so we can still report something useful
    output_str = output.decode(errors="replace")
    if returncode != 0:
        err_msg = "Error determining FFmpeg version on your system."
        if returncode < 0:
            # error below 0 is often illegal instruction
            err_msg += " - Your CPU may be too old to run this version of FFmpeg."
        err_msg += f" - Additional info: {returncode} {output_str.strip()}"
        raise AudioError(err_msg)
    # parse version number from output
    try:
        version = output_str.split("ffmpeg version ")[1].split(" ")[0].split("-")[0]
    except IndexError as err:
        raise AudioError(
            "Error determining FFmpeg version on your system."
            f" - Additional info: {returncode} {output_str.strip()}"
        ) from err
    libsoxr_support = "enable-libsoxr" in output_str
    # use globals as in-memory cache
    await set_global_cache_values({CACHE_ATTR_LIBSOXR_PRESENT: libsoxr_support})

    # strip letters from the major part (e.g. "n6" -> "6") before converting
    major_part = "".join(char for char in version.split(".")[0] if not char.isalpha())
    try:
        major_version = int(major_part)
    except ValueError as err:
        # e.g. a version string without a numeric major component
        raise AudioError(
            f"Unable to determine the major version from FFmpeg version {version}."
        ) from err
    if major_version < MINIMAL_FFMPEG_VERSION:
        raise AudioError(
            f"FFmpeg version {version} is not supported. "
            f"Minimal version required is {MINIMAL_FFMPEG_VERSION}."
        )

    LOGGER.info(
        "Detected ffmpeg version %s %s",
        version,
        "with libsoxr support" if libsoxr_support else "",
    )
470