/
/
/
1"""FFMpeg related helpers."""
2
3from __future__ import annotations
4
5import asyncio
6import logging
7import time
8from collections import deque
9from collections.abc import AsyncGenerator
10from contextlib import suppress
11from typing import TYPE_CHECKING, Final
12
13from music_assistant_models.enums import ContentType
14from music_assistant_models.errors import AudioError
15from music_assistant_models.helpers import get_global_cache_value, set_global_cache_values
16
17from music_assistant.constants import VERBOSE_LOG_LEVEL
18
19from .process import AsyncProcess, check_output
20from .util import close_async_generator
21
22if TYPE_CHECKING:
23 from music_assistant_models.media_items import AudioFormat
24
# Module-level logger; FFMpeg instances derive a per-process child logger from it.
LOGGER = logging.getLogger("ffmpeg")
# Lowest FFmpeg major version accepted by check_ffmpeg_version().
MINIMAL_FFMPEG_VERSION = 6
# Global-cache key under which check_ffmpeg_version() stores whether the FFmpeg
# build reports libsoxr support; get_ffmpeg_args() reads it to pick a resampler.
CACHE_ATTR_LIBSOXR_PRESENT: Final[str] = "libsoxr_present"
28
29
class FFMpeg(AsyncProcess):
    """FFMpeg wrapped as AsyncProcess.

    The command line is built with get_ffmpeg_args(). Once started, a background
    task reads and parses ffmpeg's stderr log and, when the input is an async
    generator, a second background task feeds its chunks into ffmpeg's stdin.
    """

    def __init__(
        self,
        audio_input: AsyncGenerator[bytes, None] | str | int,
        input_format: AudioFormat,
        output_format: AudioFormat,
        filter_params: list[str] | None = None,
        extra_args: list[str] | None = None,
        extra_input_args: list[str] | None = None,
        extra_output_args: list[str] | None = None,
        audio_output: str | int = "-",
        collect_log_history: bool = False,
        loglevel: str = "info",
    ) -> None:
        """Initialize AsyncProcess.

        audio_input may be an async generator of chunks (fed to stdin by a
        background task), a string handed to ffmpeg as input path ("-" means
        stdin), or an int that is passed through as the stdin argument of the
        underlying process. audio_output is handled the same way for stdout.
        When collect_log_history is set, the last 100 stderr lines are kept
        in self.log_history.
        """
        ffmpeg_args = get_ffmpeg_args(
            input_format=input_format,
            output_format=output_format,
            filter_params=filter_params or [],
            extra_args=extra_args or [],
            # anything that is not a string path/url is delivered through a pipe
            input_path=audio_input if isinstance(audio_input, str) else "-",
            output_path=audio_output if isinstance(audio_output, str) else "-",
            extra_input_args=extra_input_args or [],
            extra_output_args=extra_output_args or [],
            loglevel=loglevel,
        )
        self.audio_input = audio_input
        self.input_format = input_format
        self.collect_log_history = collect_log_history
        self.log_history: deque[str] = deque(maxlen=100)
        self.concat_error = False  # switch to True if concat demuxer fails on MultiPartFiles
        self._stdin_feeder_task: asyncio.Task[None] | None = None
        self._stderr_reader_task: asyncio.Task[None] | None = None
        self._input_codec_parsed = False
        stdin: bool | int
        if audio_input == "-" or isinstance(audio_input, AsyncGenerator):
            stdin = True
        else:
            stdin = audio_input if isinstance(audio_input, int) else False
        stdout = audio_output if isinstance(audio_output, int) else bool(audio_output == "-")
        super().__init__(
            ffmpeg_args,
            stdin=stdin,
            stdout=stdout,
            stderr=True,
        )
        self.logger = LOGGER

    async def start(self) -> None:
        """Perform Async init of process.

        Starts the process, switches to a logger named after the process id,
        logs the (sanitized) command line and spawns the background tasks.
        """
        await super().start()
        if self.proc:
            self.logger = LOGGER.getChild(str(self.proc.pid))
        # replace urls, file paths and inline data with placeholders in the log
        clean_args = []
        for arg in self._args[1:]:
            if arg.startswith("http"):
                clean_args.append("<URL>")
            elif arg.startswith("data:application/"):
                # must be tested before the generic path check below: a data uri
                # contains a "/" and usually a "." so it would be reported as <FILE>
                clean_args.append("<DATA>")
            elif "/" in arg and "." in arg:
                clean_args.append("<FILE>")
            else:
                clean_args.append(arg)
        args_str = " ".join(clean_args)
        self.logger.log(VERBOSE_LOG_LEVEL, "started with args: %s", args_str)
        self._stderr_reader_task = asyncio.create_task(self._log_reader_task())
        if isinstance(self.audio_input, AsyncGenerator):
            self._stdin_feeder_task = asyncio.create_task(self._feed_stdin())

    async def communicate(
        self,
        input: bytes | None = None,  # noqa: A002
        timeout: float | None = None,
    ) -> tuple[bytes, bytes]:
        """Override communicate to avoid blocking.

        The stdin feeder and stderr reader tasks are cancelled (if still
        running) and awaited before delegating to the base implementation.
        """
        for task in (self._stdin_feeder_task, self._stderr_reader_task):
            if not task:
                continue
            if not task.done():
                task.cancel()
            # Always await the task to consume any exception and prevent
            # "Task exception was never retrieved" errors.
            # Suppress CancelledError (from cancel) and any other exception
            # since exceptions have already been propagated through the generator chain.
            with suppress(asyncio.CancelledError, Exception):
                await task
        return await super().communicate(input, timeout)

    async def _log_reader_task(self) -> None:
        """Read ffmpeg log from stderr.

        Besides (optionally) recording and logging each line, this flags
        concat demuxer failures in self.concat_error and fills in an unknown
        input content type from the first audio "Stream #" line.
        """
        decode_errors = 0
        async for line in self.iter_stderr():
            if self.collect_log_history:
                self.log_history.append(line)
            # ffmpeg logging can be quite verbose, so we only log critical errors
            # unless verbose logging is enabled
            if "critical" in line:
                self.logger.error(line)
            elif self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
                self.logger.log(VERBOSE_LOG_LEVEL, line)

            if "Invalid data found when processing input" in line:
                decode_errors += 1
            # once 50 decode errors were counted, every following line is logged as error
            if decode_errors >= 50:
                self.logger.error(line)

            if "Error during demuxing" in line:
                # this can occur if using the concat demuxer for multipart files
                # and should raise an exception to prevent false progress logging
                self.concat_error = True

            # if streamdetails contenttype is unknown, try parse it from the ffmpeg log
            if line.startswith("Stream #") and ": Audio: " in line:
                if not self._input_codec_parsed:
                    # the codec name is the first token after ": Audio: "
                    content_type_raw = line.split(": Audio: ")[1].split(" ")[0]
                    content_type_raw = content_type_raw.split(",")[0]
                    content_type = ContentType.try_parse(content_type_raw)
                    self.logger.log(
                        VERBOSE_LOG_LEVEL,
                        "Detected (input) content type: %s (%s)",
                        content_type,
                        content_type_raw,
                    )
                    if self.input_format.content_type == ContentType.UNKNOWN:
                        self.input_format.content_type = content_type
                        self.input_format.codec_type = content_type
                    # only the first audio stream line is evaluated
                    self._input_codec_parsed = True
            del line

    async def _feed_stdin(self) -> None:
        """Feed stdin with audio chunks from an AsyncGenerator.

        Runs until the generator is exhausted, the process is closed, an
        error occurs or the task is cancelled. EOF is written to stdin unless
        the task was cancelled, and the generator is explicitly closed when
        it was not fully consumed.
        """
        assert not isinstance(self.audio_input, str | int)
        generator_exhausted = False
        cancelled = False
        status = "running"
        chunk_count = 0
        self.logger.log(VERBOSE_LOG_LEVEL, "Start reading audio data from source...")
        start = time.time()
        try:
            async for chunk in self.audio_input:
                chunk_count += 1
                if self.closed:
                    status = "stopped (process closed)"
                    return
                await self.write(chunk)
            generator_exhausted = True
            status = "completed"
        except asyncio.CancelledError:
            status = "cancelled"
            # remember the cancellation so the cleanup below skips write_eof
            cancelled = True
            raise
        except Exception:
            status = "aborted with error"
            raise
        finally:
            self.logger.log(
                VERBOSE_LOG_LEVEL,
                "fill_buffer_task: %s (%s chunks received) in %.2fs",
                status,
                chunk_count,
                time.time() - start,
            )
            if not cancelled:
                await self.write_eof()
            # we need to ensure that we close the async generator
            # if we get cancelled otherwise it keeps lingering forever
            if not generator_exhausted:
                await close_async_generator(self.audio_input)
200
201
async def get_ffmpeg_stream(
    audio_input: AsyncGenerator[bytes, None] | str,
    input_format: AudioFormat,
    output_format: AudioFormat,
    filter_params: list[str] | None = None,
    extra_args: list[str] | None = None,
    chunk_size: int | None = None,
    extra_input_args: list[str] | None = None,
    extra_output_args: list[str] | None = None,
) -> AsyncGenerator[bytes, None]:
    """
    Yield the audio produced by ffmpeg as an async stream of byte chunks.

    The input is resampled and/or recoded by ffmpeg according to the given
    formats, filters and extra arguments. With chunk_size set, stdout is read
    in chunks of that size; otherwise whatever data is available is yielded.

    Raises AudioError (carrying the tail of the ffmpeg log) when ffmpeg exits
    with a non-zero return code or reported a demuxing (concat) error.
    """
    process = FFMpeg(
        audio_input=audio_input,
        input_format=input_format,
        output_format=output_format,
        filter_params=filter_params,
        extra_args=extra_args,
        extra_input_args=extra_input_args,
        extra_output_args=extra_output_args,
        collect_log_history=True,
    )
    async with process as ffmpeg_proc:
        # pick the stdout reader matching the requested chunking
        if chunk_size:
            stdout_reader = ffmpeg_proc.iter_chunked(chunk_size)
        else:
            stdout_reader = ffmpeg_proc.iter_any()
        async for data in stdout_reader:
            yield data
        if ffmpeg_proc.returncode not in (None, 0) or ffmpeg_proc.concat_error:
            # ffmpeg did not finish cleanly: surface the last log lines
            # (a longer tail for concat errors) through the exception
            tail_length = 20 if ffmpeg_proc.concat_error else 5
            recent_lines = list(ffmpeg_proc.log_history)[-tail_length:]
            raise AudioError("\n" + "\n".join(recent_lines))
237
238
def get_ffmpeg_args(  # noqa: PLR0915
    input_format: AudioFormat,
    output_format: AudioFormat,
    filter_params: list[str],
    extra_args: list[str] | None = None,
    input_path: str = "-",
    output_path: str = "-",
    extra_input_args: list[str] | None = None,
    extra_output_args: list[str] | None = None,
    loglevel: str = "error",
) -> list[str]:
    """Collect all args to send to the ffmpeg process.

    The result is laid out as: generic args, input args (ending with
    "-i <input_path>"), extra args (including the "-af" filter chain when
    filters apply), output args (ending with the output path).

    An output_path of "NULL" (case-insensitive) selects ffmpeg's null muxer.
    None of the list arguments passed in are modified.

    Raises RuntimeError if the output content type is not supported.
    """
    # Work on private copies: the lists below get extended while building the
    # command line and the caller's lists must not change as a side effect.
    filter_params = list(filter_params or [])
    extra_args = list(extra_args or [])
    extra_input_args = list(extra_input_args or [])
    extra_output_args = list(extra_output_args or [])
    # generic args
    generic_args = [
        "ffmpeg",
        "-hide_banner",
        "-loglevel",
        loglevel,
        "-nostats",
        "-ignore_unknown",
        "-protocol_whitelist",
        "file,hls,http,https,tcp,tls,crypto,pipe,data,fd,rtp,udp,concat",
        "-probesize",
        "8096",
        "-analyzeduration",
        "500000",  # 0.5 seconds should be enough to detect the format
    ]
    # collect input args
    if "-f" in extra_input_args:
        # input format is already specified in the extra input args
        input_args = extra_input_args
    else:
        input_args = [*extra_input_args]
        if input_path.startswith("http"):
            # append reconnect options for direct stream from http
            input_args += [
                # Reconnect automatically when disconnected before EOF is hit.
                "-reconnect",
                "1",
                # Set the maximum delay in seconds after which to give up reconnecting.
                "-reconnect_delay_max",
                "10",
                # If set then even streamed/non seekable streams will be reconnected on errors.
                "-reconnect_streamed",
                "1",
                # Reconnect automatically in case of TCP/TLS errors during connect.
                "-reconnect_on_network_error",
                "0",
                # A comma separated list of HTTP status codes to reconnect on.
                # The list can include specific status codes (e.g. 503) or the strings 4xx / 5xx.
                "-reconnect_on_http_error",
                "5xx,429",
            ]
        if input_format.content_type.is_pcm():
            # raw pcm carries no header, so describe the stream explicitly
            input_args += [
                "-ac",
                str(input_format.channels),
                "-channel_layout",
                "mono" if input_format.channels == 1 else "stereo",
                "-ar",
                str(input_format.sample_rate),
                "-acodec",
                input_format.content_type.name.lower(),
                "-f",
                input_format.content_type.value,
            ]
        if input_format.codec_type != ContentType.UNKNOWN:
            input_args += ["-acodec", input_format.codec_type.name.lower()]

    # add input path at the end
    input_args += ["-i", input_path]

    # collect output args
    output_args = [
        "-ac",
        str(output_format.channels),
        "-channel_layout",
        "mono" if output_format.channels == 1 else "stereo",
    ]
    if output_path.upper() == "NULL":
        # devnull stream
        output_path = "-"
        output_args = ["-f", "null"]
    elif output_format.content_type.is_pcm():
        # use explicit format identifier for pcm formats
        output_args += [
            "-ar",
            str(output_format.sample_rate),
            "-acodec",
            output_format.content_type.name.lower(),
            "-f",
            output_format.content_type.value,
        ]
    elif output_format.content_type == ContentType.NUT:
        # passthrough-mode (for creating the cache) using NUT container
        output_args = [
            "-vn",
            "-dn",
            "-sn",
            "-acodec",
            "copy",
            "-f",
            "nut",
        ]
    elif output_format.content_type == ContentType.AAC:
        output_args = ["-f", "adts", "-c:a", "aac", "-b:a", "256k"]
    elif output_format.content_type == ContentType.MP3:
        output_args = ["-f", "mp3", "-b:a", "320k"]
    elif output_format.content_type == ContentType.WAV:
        pcm_format = ContentType.from_bit_depth(output_format.bit_depth)
        output_args = [
            "-ar",
            str(output_format.sample_rate),
            "-acodec",
            pcm_format.name.lower(),
            "-f",
            "wav",
        ]
    elif output_format.content_type == ContentType.FLAC:
        # use level 0 compression for fastest encoding
        sample_fmt = "s32" if output_format.bit_depth > 16 else "s16"
        output_args += [
            "-sample_fmt",
            sample_fmt,
            "-ar",
            str(output_format.sample_rate),
            "-f",
            "flac",
            "-compression_level",
            "0",
        ]
    else:
        raise RuntimeError("Invalid/unsupported output format specified")

    output_args += extra_output_args  # append the extra output args
    # append (final) output path at the end of the args
    output_args.append(output_path)

    # edge case: source file is not stereo - downmix to stereo
    if input_format.channels > 2 and output_format.channels == 2:
        filter_params = [
            "pan=stereo|FL=1.0*FL+0.707*FC+0.707*SL+0.707*LFE|FR=1.0*FR+0.707*FC+0.707*SR+0.707*LFE",
            *filter_params,
        ]

    # determine if we need to do resampling (or dithering)
    if input_format.sample_rate != output_format.sample_rate or (
        input_format.bit_depth > 16 and output_format.bit_depth == 16
    ):
        libsoxr_support = get_global_cache_value(CACHE_ATTR_LIBSOXR_PRESENT)
        # prefer resampling with libsoxr due to its high quality
        # but skip if loudnorm filter is present, due to this bug:
        # https://trac.ffmpeg.org/ticket/11323
        loudnorm_present = any("loudnorm" in f for f in filter_params)
        if libsoxr_support and not loudnorm_present:
            resample_filter = "aresample=resampler=soxr:precision=30"
        else:
            resample_filter = "aresample=resampler=swr"

        # sample rate conversion
        if input_format.sample_rate != output_format.sample_rate:
            resample_filter += f":osr={output_format.sample_rate}"

        # bit depth conversion: apply dithering when going down to 16 bits
        # this is only needed when we need to go back to 16 bits
        # when going from 32bits FP to 24 bits no dithering is needed
        if output_format.bit_depth == 16 and input_format.bit_depth > 16:
            resample_filter += ":osf=s16:dither_method=triangular_hp"

        filter_params.append(resample_filter)

    # a caller-supplied -filter_complex takes precedence over the simple filter chain
    if filter_params and "-filter_complex" not in extra_args:
        extra_args += ["-af", ",".join(filter_params)]

    return generic_args + input_args + extra_args + output_args
420
421
async def check_ffmpeg_version() -> None:
    """Check if ffmpeg is present (with libsoxr support).

    Runs "ffmpeg -version", stores whether the build reports libsoxr support
    in the global cache (key CACHE_ATTR_LIBSOXR_PRESENT) and verifies that the
    major version is at least MINIMAL_FFMPEG_VERSION.

    Raises AudioError if the binary is missing, exits with an error, reports
    a version that cannot be parsed, or is older than the minimal version.
    """
    # check for FFmpeg presence
    try:
        returncode, output = await check_output("ffmpeg", "-version")
    except FileNotFoundError as err:
        raise AudioError(
            "FFmpeg binary is missing from system. "
            "Please install ffmpeg on your OS to enable playback."
        ) from err
    # decode once; never let undecodable bytes hide the actual problem
    output_str = output.decode(errors="replace")
    if returncode != 0:
        err_msg = "Error determining FFmpeg version on your system."
        if returncode < 0:
            # error below 0 is often illegal instruction
            err_msg += " - Your CPU may be too old to run this version of FFmpeg."
        err_msg += f" - Additional info: {returncode} {output_str.strip()}"
        raise AudioError(err_msg)
    # parse version number from output
    try:
        version = output_str.split("ffmpeg version ")[1].split(" ")[0].split("-")[0]
    except IndexError as err:
        raise AudioError(
            "Error determining FFmpeg version on your system."
            f" - Additional info: {returncode} {output_str.strip()}"
        ) from err
    libsoxr_support = "enable-libsoxr" in output_str
    # use globals as in-memory cache
    await set_global_cache_values({CACHE_ATTR_LIBSOXR_PRESENT: libsoxr_support})

    # the major part may carry a letter prefix (e.g. "n6"); keep the non-letters only
    major_digits = "".join(char for char in version.split(".")[0] if not char.isalpha())
    try:
        major_version = int(major_digits)
    except ValueError as err:
        # e.g. builds whose version string has no numeric major part
        raise AudioError(
            "Error determining FFmpeg version on your system."
            f" - Unable to parse version '{version}'"
            f" - Additional info: {returncode} {output_str.strip()}"
        ) from err
    if major_version < MINIMAL_FFMPEG_VERSION:
        raise AudioError(
            f"FFmpeg version {version} is not supported. "
            f"Minimal version required is {MINIMAL_FFMPEG_VERSION}."
        )

    LOGGER.info(
        "Detected ffmpeg version %s %s",
        version,
        "with libsoxr support" if libsoxr_support else "",
    )
463