music-assistant-server

62.5 KBPY
audio.py
62.5 KB1,621 lines • python
1"""Various helpers for audio streaming and manipulation."""
2
3from __future__ import annotations
4
5import asyncio
6import logging
7import os
8import re
9import struct
10import time
11from collections.abc import AsyncGenerator
12from functools import partial
13from io import BytesIO
14from typing import TYPE_CHECKING, Final, cast
15
16import aiofiles
17import shortuuid
18from aiohttp import ClientTimeout
19from music_assistant_models.dsp import DSPConfig, DSPDetails, DSPState
20from music_assistant_models.enums import (
21    ContentType,
22    MediaType,
23    PlayerFeature,
24    PlayerType,
25    StreamType,
26    VolumeNormalizationMode,
27)
28from music_assistant_models.errors import (
29    AudioError,
30    InvalidDataError,
31    MediaNotFoundError,
32    MusicAssistantError,
33    ProviderUnavailableError,
34)
35from music_assistant_models.media_items import AudioFormat
36from music_assistant_models.streamdetails import MultiPartPath
37
38from music_assistant.constants import (
39    CONF_ENTRY_OUTPUT_LIMITER,
40    CONF_ENTRY_VOLUME_NORMALIZATION_TARGET,
41    CONF_OUTPUT_CHANNELS,
42    CONF_VOLUME_NORMALIZATION,
43    CONF_VOLUME_NORMALIZATION_RADIO,
44    CONF_VOLUME_NORMALIZATION_TARGET,
45    CONF_VOLUME_NORMALIZATION_TRACKS,
46    MASS_LOGGER_NAME,
47    VERBOSE_LOG_LEVEL,
48)
49from music_assistant.helpers.json import JSON_DECODE_EXCEPTIONS, json_loads
50from music_assistant.helpers.throttle_retry import BYPASS_THROTTLER
51from music_assistant.helpers.util import clean_stream_title, remove_file
52from music_assistant.providers.sync_group.constants import SGP_PREFIX
53
54from .audio_buffer import AudioBuffer
55from .dsp import filter_to_ffmpeg_params
56from .ffmpeg import FFMpeg, get_ffmpeg_args, get_ffmpeg_stream
57from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
58from .process import AsyncProcess, communicate
59from .util import detect_charset
60
61if TYPE_CHECKING:
62    from music_assistant_models.config_entries import CoreConfig, PlayerConfig
63    from music_assistant_models.queue_item import QueueItem
64    from music_assistant_models.streamdetails import StreamDetails
65
66    from music_assistant.mass import MusicAssistant
67    from music_assistant.models.music_provider import MusicProvider
68    from music_assistant.models.player import Player
69    from music_assistant.providers.sync_group import SyncGroupPlayer
70
# Module-level logger, registered as the "audio" child of the main MASS logger name.
LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.audio")

# ruff: noqa: PLR0915

# Default HTTP request headers (custom User-Agent).
HTTP_HEADERS = {"User-Agent": "Lavf/60.16.100.MusicAssistant"}
# Same headers plus the "Icy-MetaData" request header; sent when probing radio URLs.
HTTP_HEADERS_ICY = {**HTTP_HEADERS, "Icy-MetaData": "1"}

# NOTE(review): not referenced in this part of the file - presumably provider domains
# that are slow to deliver a stream; verify against the actual usage.
SLOW_PROVIDERS = ("tidal", "ytmusic", "apple_music")

# Cache category and provider key used to look up already resolved radio stream URLs.
CACHE_CATEGORY_RESOLVED_RADIO_URL: Final[int] = 100
CACHE_PROVIDER: Final[str] = "audio"
82
83
def align_audio_to_frame_boundary(audio_data: bytes, pcm_format: AudioFormat) -> bytes:
    """Drop a trailing partial frame so the PCM data ends on a frame boundary.

    A frame holds one sample for every channel, so its size is
    (bit_depth // 8) * channels bytes. Data that is already aligned is
    returned unchanged.

    :param audio_data: Raw PCM audio data to align.
    :param pcm_format: AudioFormat of the audio data (bit depth and channels are used).
    """
    frame_size = (pcm_format.bit_depth // 8) * pcm_format.channels
    _whole_frames, leftover = divmod(len(audio_data), frame_size)
    if not leftover:
        # already aligned, nothing to cut off
        return audio_data
    LOGGER.debug(
        "Truncating %d bytes from audio buffer to align to frame boundary",
        leftover,
    )
    return audio_data[: len(audio_data) - leftover]
100
101
async def strip_silence(
    mass: MusicAssistant,  # noqa: ARG001
    audio_data: bytes,
    pcm_format: AudioFormat,
    reverse: bool = False,
) -> bytes:
    """Strip silence from the begin (or end) of raw PCM audio by piping it through ffmpeg.

    :param mass: MusicAssistant instance (unused).
    :param audio_data: Raw PCM audio data to process.
    :param pcm_format: AudioFormat of the audio data; also used as output format.
    :param reverse: Strip from the end instead of the begin, by reversing the
        audio before and after the silence filter.
    """
    container = pcm_format.content_type.value
    silence_filter = (
        "atrim=start=0.2,silenceremove=start_periods=1:start_silence=0.1:start_threshold=0.02"
    )
    if reverse:
        silence_filter = f"areverse,{silence_filter},areverse"

    ffmpeg_cmd = [
        "ffmpeg",
        "-hide_banner",
        "-loglevel",
        "quiet",
        # input args (raw pcm from stdin)
        "-acodec",
        pcm_format.content_type.name.lower(),
        "-f",
        container,
        "-ac",
        str(pcm_format.channels),
        "-ar",
        str(pcm_format.sample_rate),
        "-i",
        "-",
        # filter args
        "-af",
        silence_filter,
        # output args (raw pcm to stdout)
        "-f",
        container,
        "-",
    ]
    _, stripped_data, _ = await communicate(ffmpeg_cmd, audio_data)

    bytes_stripped = len(audio_data) - len(stripped_data)
    if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL):
        # only do the math when the verbose message is actually emitted
        LOGGER.log(
            VERBOSE_LOG_LEVEL,
            "stripped %s seconds of silence from %s of pcm audio. bytes stripped: %s",
            round(bytes_stripped / pcm_format.pcm_sample_size, 2),
            "end" if reverse else "begin",
            bytes_stripped,
        )
    return stripped_data
150
151
def get_player_dsp_details(
    mass: MusicAssistant, player: Player, group_preventing_dsp: bool = False
) -> DSPDetails:
    """Return the DSP details of a single player.

    This does not check whether the queried player is part of a group.
    When it is, the caller must pass the result of is_grouping_preventing_dsp
    for the leader/PlayerGroup as the group_preventing_dsp argument.

    :param mass: MusicAssistant instance.
    :param player: The player to build the DSP details for.
    :param group_preventing_dsp: Whether the group this player belongs to prevents DSP.
    """
    config = mass.config.get_player_dsp_config(player.player_id)
    if not config.enabled:
        # DSP is disabled by the user, continue with an empty config (no filters)
        state = DSPState.DISABLED
        config = DSPConfig(enabled=False)
    elif group_preventing_dsp or is_grouping_preventing_dsp(player):
        # DSP is enabled but the current grouping does not allow it
        state = DSPState.DISABLED_BY_UNSUPPORTED_GROUP
        config = DSPConfig(enabled=False)
    else:
        state = DSPState.ENABLED

    # only filters that are individually enabled are reported
    config.filters = [dsp_filter for dsp_filter in config.filters if dsp_filter.enabled]

    limiter_enabled = is_output_limiter_enabled(mass, player)
    return DSPDetails(
        state=state,
        input_gain=config.input_gain,
        filters=config.filters,
        output_gain=config.output_gain,
        output_limiter=limiter_enabled,
        output_format=player.extra_data.get("output_format", None),
    )
184
185
def get_stream_dsp_details(
    mass: MusicAssistant,
    queue_id: str,
) -> dict[str, DSPDetails]:
    """Collect the DSP details of every player playing this queue, keyed by player_id.

    :param mass: MusicAssistant instance.
    :param queue_id: Id of the queue; it is used to look up the (leading) player.
    """
    leader = mass.players.get_player(queue_id)
    assert leader is not None  # for type checking
    no_group_dsp = is_grouping_preventing_dsp(leader)
    details_by_player: dict[str, DSPDetails] = {}
    leader_output_format = None
    externally_grouped = False

    if not leader.player_id.startswith(SGP_PREFIX):
        # Only real players are added (PlayerGroups merely sync their member players)
        details_by_player[leader.player_id] = get_player_dsp_details(mass, leader)
        if no_group_dsp:
            # The leader sends the (combined) audio stream,
            # so its output format applies to the members too.
            leader_output_format = leader.extra_data.get("output_format", None)
        externally_grouped = leader.state.type in (PlayerType.GROUP, PlayerType.STEREO_PAIR)
    elif no_group_dsp:
        # sync group player: take the output format from its sync leader (if any)
        sync_leader = cast("SyncGroupPlayer", leader).sync_leader
        if sync_leader:
            leader_output_format = sync_leader.extra_data.get("output_format", None)

    # Members are not enumerated for externally created groups
    # (e.g. a Chromecast group from the Google Home app)
    if not (leader and leader.state.group_members) or externally_grouped:
        return details_by_player

    # grouped playback: add the DSP details of each group member
    for member_id in leader.state.group_members:
        if member_id in details_by_player:
            # already present (this is the group leader itself)
            continue
        member = mass.players.get_player(member_id)
        if not member:
            continue
        member_details = get_player_dsp_details(mass, member, group_preventing_dsp=no_group_dsp)
        details_by_player[member_id] = member_details
        if no_group_dsp:
            # The group does not support multi device DSP processing,
            # so the member reports the format of the group leader.
            member_details.output_format = leader_output_format
    return details_by_player
231
232
async def get_stream_details(
    mass: MusicAssistant,
    queue_item: QueueItem,
    seek_position: int = 0,
    fade_in: bool = False,
    prefer_album_loudness: bool = False,
) -> StreamDetails:
    """
    Get streamdetails for the given QueueItem.

    This is called just-in-time when a PlayerQueue wants a MediaItem to be played.
    Do not try to request streamdetails too much in advance as this is expiring data.

    Existing streamdetails on the queue item are reused while they are not expired
    (or while their buffer is still valid). Otherwise new streamdetails are requested
    from the item's providers: first from the preferred provider(s), in order of
    quality, and only when none of those delivers from any other provider.

    :param mass: MusicAssistant instance.
    :param queue_item: The QueueItem to retrieve the streamdetails for.
    :param seek_position: Position (in seconds) to start the stream at.
        Reset to 0 for radio and duration-less items.
    :param fade_in: Stored on the returned streamdetails.
    :param prefer_album_loudness: Stored on the returned streamdetails.
    :raises MediaNotFoundError: If no streamdetails could be retrieved for the item.
    """
    not_found_msg = f"Unable to retrieve streamdetails for {queue_item.name} ({queue_item.uri})"
    streamdetails: StreamDetails | None = None
    time_start = time.time()
    LOGGER.debug("Getting streamdetails for %s", queue_item.uri)
    if seek_position and (queue_item.media_type == MediaType.RADIO or not queue_item.duration):
        LOGGER.warning("seeking is not possible on duration-less streams!")
        seek_position = 0

    if not queue_item.media_item and not queue_item.streamdetails:
        # in case of a non-media item queue item, the streamdetails should already be provided
        # this should not happen, but guard it just in case
        raise MediaNotFoundError(not_found_msg)
    buffer: AudioBuffer | None = None
    # NOTE(review): get_buffered_media_stream calls is_valid(checksum, seek_position)
    # while only seek_position is passed (as first positional argument) here.
    # Verify against the AudioBuffer.is_valid signature.
    if queue_item.streamdetails and (
        (queue_item.streamdetails.created_at + queue_item.streamdetails.expiration) > time.time()
        or ((buffer := queue_item.streamdetails.buffer) and buffer.is_valid(seek_position))
    ):
        # already got a fresh/unused (or unexpired) streamdetails
        streamdetails = queue_item.streamdetails
    else:
        # need to (re)create streamdetails
        # retrieve streamdetails from provider

        media_item = queue_item.media_item
        if media_item is None:
            # expired streamdetails on a non-media item queue item: nothing to refresh from
            raise MediaNotFoundError(not_found_msg)
        preferred_providers: list[str] = []
        if (
            (queue := mass.player_queues.get(queue_item.queue_id))
            and queue.userid
            and (playback_user := await mass.webserver.auth.get_user(queue.userid))
            and playback_user.provider_filter
        ):
            # handle steering into user preferred providerinstance
            preferred_providers = playback_user.provider_filter
        else:
            preferred_providers = [x.provider_instance for x in media_item.provider_mappings]
        for allow_other_provider in (False, True):
            # sort by quality and check item's availability
            for prov_media in sorted(
                media_item.provider_mappings, key=lambda x: x.quality or 0, reverse=True
            ):
                if not prov_media.available:
                    LOGGER.debug("Skipping unavailable %s", prov_media)
                    continue
                if (
                    not allow_other_provider
                    and prov_media.provider_instance not in preferred_providers
                ):
                    continue
                # guard that provider is available
                music_prov = mass.get_provider(prov_media.provider_instance)
                if TYPE_CHECKING:  # avoid circular import
                    assert isinstance(music_prov, MusicProvider)
                if not music_prov:
                    LOGGER.debug("Skipping %s - provider not available", prov_media)
                    continue  # provider not available ?
                # get streamdetails from provider
                try:
                    BYPASS_THROTTLER.set(True)
                    streamdetails = await music_prov.get_stream_details(
                        prov_media.item_id, media_item.media_type
                    )
                except MusicAssistantError as err:
                    LOGGER.warning(str(err))
                else:
                    break
                finally:
                    BYPASS_THROTTLER.set(False)
            if streamdetails:
                # The break above only leaves the inner loop. Stop here as well,
                # otherwise the fallback pass would request the streamdetails again
                # and could replace the result of the preferred provider.
                break

        if not streamdetails:
            raise MediaNotFoundError(not_found_msg)

        # work out how to handle radio stream
        if (
            streamdetails.stream_type in (StreamType.ICY, StreamType.HLS, StreamType.HTTP)
            and streamdetails.media_type == MediaType.RADIO
            and isinstance(streamdetails.path, str)
        ):
            resolved_url, stream_type = await resolve_radio_stream(mass, streamdetails.path)
            streamdetails.path = resolved_url
            streamdetails.stream_type = stream_type
            # Set up metadata monitoring callback for HLS radio streams
            if stream_type == StreamType.HLS:
                streamdetails.stream_metadata_update_callback = partial(
                    _update_hls_radio_metadata, mass
                )
                streamdetails.stream_metadata_update_interval = 5
        # handle volume normalization details
        if result := await mass.music.get_loudness(
            streamdetails.item_id,
            streamdetails.provider,
            media_type=queue_item.media_type,
        ):
            streamdetails.loudness = result[0]
            streamdetails.loudness_album = result[1]

    # set queue_id on the streamdetails so we know what is being streamed
    streamdetails.queue_id = queue_item.queue_id
    # handle skip/fade_in details
    streamdetails.seek_position = seek_position
    streamdetails.fade_in = fade_in
    if not streamdetails.duration:
        streamdetails.duration = queue_item.duration
    streamdetails.prefer_album_loudness = prefer_album_loudness
    player_settings = await mass.config.get_player_config(streamdetails.queue_id)
    core_config = await mass.config.get_core_config("streams")
    conf_volume_normalization_target = float(
        str(player_settings.get_value(CONF_VOLUME_NORMALIZATION_TARGET, -17))
    )
    # guard against invalid volume normalization values
    # range and default_value are guaranteed to be set for this constant
    volume_range = CONF_ENTRY_VOLUME_NORMALIZATION_TARGET.range
    assert volume_range is not None
    if (
        conf_volume_normalization_target < volume_range[0]
        or conf_volume_normalization_target >= volume_range[1]
    ):
        default_val = CONF_ENTRY_VOLUME_NORMALIZATION_TARGET.default_value
        assert isinstance(default_val, (int, float))
        conf_volume_normalization_target = float(default_val)
        LOGGER.warning(
            "Invalid volume normalization target configured for player %s, "
            "resetting to default of %s dB",
            streamdetails.queue_id,
            CONF_ENTRY_VOLUME_NORMALIZATION_TARGET.default_value,
        )
    streamdetails.target_loudness = conf_volume_normalization_target
    streamdetails.volume_normalization_mode = _get_normalization_mode(
        core_config, player_settings, streamdetails
    )

    # attach the DSP details of all group members
    streamdetails.dsp = get_stream_dsp_details(mass, streamdetails.queue_id)

    LOGGER.debug(
        "retrieved streamdetails for %s in %s milliseconds",
        queue_item.uri,
        int((time.time() - time_start) * 1000),
    )
    return streamdetails
388
389
async def get_buffered_media_stream(
    mass: MusicAssistant,
    streamdetails: StreamDetails,
    pcm_format: AudioFormat,
    seek_position: int = 0,
    filter_params: list[str] | None = None,
) -> AsyncGenerator[bytes, None]:
    """Yield the audio of the given streamdetails as raw PCM, served from an AudioBuffer.

    A buffer already attached to the streamdetails is reused when it is still valid
    for the filter params and seek position. Otherwise a new buffer is created and
    filled by a background task, except when seeking more than 60 seconds into the
    track, in which case a plain (unbuffered) media stream is returned.

    :param mass: MusicAssistant instance.
    :param streamdetails: StreamDetails of the media to stream.
    :param pcm_format: AudioFormat of the PCM audio to yield.
    :param seek_position: Position to start at.
    :param filter_params: Optional filter params, passed on to get_media_stream.
    """
    LOGGER.log(
        VERBOSE_LOG_LEVEL,
        "buffered_media_stream: Starting for %s (seek: %s)",
        streamdetails.uri,
        seek_position,
    )

    # a buffer is identified by the filter params it was created with
    checksum = f"{filter_params}"

    async def fill_buffer_task() -> None:
        """Pump the media stream (from the start of the track) into the audio buffer."""
        produced = 0
        outcome = "running"
        try:
            async for pcm_chunk in get_media_stream(
                mass, streamdetails, pcm_format, seek_position=0, filter_params=filter_params
            ):
                produced += 1
                await audio_buffer.put(pcm_chunk)
                # give the event loop a chance to run other tasks between chunks
                await asyncio.sleep(0)
            # EOF is only signalled when the source was consumed without error
            await audio_buffer.set_eof()
        except asyncio.CancelledError:
            outcome = "cancelled"
            raise
        except Exception:
            outcome = "aborted with error"
            raise
        finally:
            LOGGER.log(
                VERBOSE_LOG_LEVEL,
                "fill_buffer_task: %s (%s chunks) for %s",
                outcome,
                produced,
                streamdetails.uri,
            )

    # look for a buffer left behind by an earlier stream of this item
    cached: AudioBuffer | None = streamdetails.buffer
    if cached is not None and cached.is_valid(checksum, seek_position):
        LOGGER.debug(
            "buffered_media_stream: Reusing existing buffer for %s - "
            "available: %ss, seek: %s, discarded: %s",
            streamdetails.uri,
            cached.seconds_available,
            seek_position,
            cached._discarded_chunks,
        )
        audio_buffer = cached
    elif cached is not None:
        LOGGER.log(
            VERBOSE_LOG_LEVEL,
            "buffered_media_stream: Existing buffer invalid for %s (seek: %s, discarded: %s)",
            streamdetails.uri,
            seek_position,
            cached._discarded_chunks,
        )
        await cached.clear()
        streamdetails.buffer = None
        cached = None

    if not cached:
        if seek_position > 60:
            # Seeking far into the track without a usable buffer: filling a buffer
            # up to the seek position first is not efficient,
            # so fall back to a regular stream without buffering.
            LOGGER.debug(
                "buffered_media_stream: No existing buffer and seek >60s for %s, "
                "starting normal (unbuffered) stream",
                streamdetails.uri,
            )
            direct_stream = get_media_stream(
                mass,
                streamdetails,
                pcm_format,
                seek_position=seek_position,
                filter_params=filter_params,
            )
            async for chunk in direct_stream:
                yield chunk
            return

        # start with a fresh buffer, filled by a background task
        LOGGER.debug(
            "buffered_media_stream: Creating new buffer for %s",
            streamdetails.uri,
        )
        audio_buffer = AudioBuffer(pcm_format, checksum)
        streamdetails.buffer = audio_buffer
        producer = mass.loop.create_task(fill_buffer_task())
        audio_buffer.attach_producer_task(producer)

    if audio_buffer.pcm_format != pcm_format:
        # Special case: the buffer holds another pcm format than requested.
        # This may happen in situations such as crossfading; instead of throwing
        # the buffer away its audio is converted on the fly.
        LOGGER.info(
            "buffered_media_stream: pcm format mismatch, resampling on the fly for %s - "
            "buffer format: %s - requested format: %s",
            streamdetails.uri,
            audio_buffer.pcm_format,
            pcm_format,
        )
        converted_stream = get_ffmpeg_stream(
            audio_input=audio_buffer.iter(seek_position=seek_position),
            input_format=audio_buffer.pcm_format,
            output_format=pcm_format,
        )
        async for chunk in converted_stream:
            yield chunk
        return

    # regular case: hand out the buffered chunks as-is
    yielded = 0
    try:
        async for chunk in audio_buffer.iter(seek_position=seek_position):
            yielded += 1
            yield chunk
    finally:
        LOGGER.log(
            VERBOSE_LOG_LEVEL,
            "buffered_media_stream: Completed, yielded %s chunks",
            yielded,
        )
524
525
async def get_media_stream(
    mass: MusicAssistant,
    streamdetails: StreamDetails,
    pcm_format: AudioFormat,
    seek_position: int = 0,
    filter_params: list[str] | None = None,
) -> AsyncGenerator[bytes, None]:
    """Get audio stream for given media details as raw PCM.

    The audio source is selected from the stream type of the streamdetails and
    fed through ffmpeg, which outputs PCM audio in the requested format.
    When the stream ends, the duration is stored on the streamdetails (only if the
    complete track was streamed) and, with dynamic volume normalization,
    the loudness measurement found in the ffmpeg log is stored as well.

    :param mass: MusicAssistant instance.
    :param streamdetails: StreamDetails of the media to stream.
    :param pcm_format: AudioFormat of the PCM audio to yield.
    :param seek_position: Position (in seconds) to start the stream at.
    :param filter_params: Optional filter params, passed to ffmpeg.
    :raises AudioError: If ffmpeg fails or no audio is received at all.
    """
    logger = LOGGER.getChild("media_stream")
    logger.log(VERBOSE_LOG_LEVEL, "Starting media stream for %s", streamdetails.uri)
    # Work on a copy: the list is extended below and those additions (seek, loop,
    # decryption key) must not end up in the args stored on the streamdetails,
    # where they would pile up with every new stream of the same item.
    extra_input_args = list(streamdetails.extra_input_args or [])
    # seek_position is reset below when the audio source handles the seek itself,
    # so remember whether the caller asked for a stream from the start of the track
    requested_seek_position = seek_position

    # work out audio source for these streamdetails
    audio_source: str | AsyncGenerator[bytes, None]
    stream_type = streamdetails.stream_type
    if stream_type == StreamType.CUSTOM:
        music_prov = mass.get_provider(streamdetails.provider)
        if TYPE_CHECKING:  # avoid circular import
            assert isinstance(music_prov, MusicProvider)
        audio_source = music_prov.get_audio_stream(
            streamdetails,
            seek_position=seek_position if streamdetails.can_seek else 0,
        )
        seek_position = 0 if streamdetails.can_seek else seek_position
    elif stream_type == StreamType.ICY:
        assert isinstance(streamdetails.path, str)  # for type checking
        audio_source = get_icy_radio_stream(mass, streamdetails.path, streamdetails)
        seek_position = 0  # seeking not possible on radio streams
    elif stream_type == StreamType.HLS:
        assert isinstance(streamdetails.path, str)  # for type checking
        substream = await get_hls_substream(mass, streamdetails.path)
        audio_source = substream.path
        if streamdetails.media_type == MediaType.RADIO:
            # HLS streams (especially the BBC) struggle when they're played directly
            # with ffmpeg, where they just stop after some minutes,
            # so we tell ffmpeg to loop around in this case.
            extra_input_args += ["-stream_loop", "-1", "-re"]
    else:
        # all other stream types (HTTP, FILE, etc)
        if stream_type == StreamType.ENCRYPTED_HTTP:
            assert streamdetails.decryption_key is not None  # for type checking
            extra_input_args += ["-decryption_key", streamdetails.decryption_key]
        if isinstance(streamdetails.path, list):
            # multi part stream
            audio_source = get_multi_file_stream(mass, streamdetails, seek_position)
            seek_position = 0  # handled by get_multi_file_stream
        else:
            # regular single file/url stream
            assert isinstance(streamdetails.path, str)  # for type checking
            audio_source = streamdetails.path

    # handle seek support
    if seek_position and streamdetails.duration and streamdetails.allow_seek:
        extra_input_args += ["-ss", str(int(seek_position))]

    bytes_sent = 0
    finished = False
    cancelled = False
    first_chunk_received = False
    ffmpeg_proc = FFMpeg(
        audio_input=audio_source,
        input_format=streamdetails.audio_format,
        output_format=pcm_format,
        filter_params=filter_params,
        extra_input_args=extra_input_args,
        collect_log_history=True,
        loglevel="debug" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "info",
    )

    try:
        await ffmpeg_proc.start()
        assert ffmpeg_proc.proc is not None  # for type checking
        logger.debug(
            "Started media stream for %s - using streamtype: %s - pcm format: %s - ffmpeg PID: %s",
            streamdetails.uri,
            streamdetails.stream_type,
            pcm_format.content_type.value,
            ffmpeg_proc.proc.pid,
        )
        stream_start = mass.loop.time()

        chunk_size = get_chunksize(pcm_format, 1)
        async for chunk in ffmpeg_proc.iter_chunked(chunk_size):
            if not first_chunk_received:
                # At this point ffmpeg has started and should now know the codec used
                # for encoding the audio.
                first_chunk_received = True
                streamdetails.audio_format.codec_type = ffmpeg_proc.input_format.codec_type
                logger.debug(
                    "First chunk received after %.2f seconds (codec detected: %s)",
                    mass.loop.time() - stream_start,
                    ffmpeg_proc.input_format.codec_type,
                )
            yield chunk
            bytes_sent += len(chunk)

        # end of audio/track reached
        logger.log(VERBOSE_LOG_LEVEL, "End of stream reached.")
        # wait until stderr also completed reading
        await ffmpeg_proc.wait_with_timeout(5)
        if ffmpeg_proc.returncode not in (0, None):
            log_trail = "\n".join(list(ffmpeg_proc.log_history)[-5:])
            raise AudioError(f"FFMpeg exited with code {ffmpeg_proc.returncode}: {log_trail}")
        if bytes_sent == 0:
            # edge case: no audio data was received at all
            raise AudioError("No audio was received")
        finished = True
    except (GeneratorExit, asyncio.CancelledError):
        # we were cancelled, just raise
        cancelled = True
        raise
    except Exception as err:
        # dump the last 10 lines of the log in case of an unclean exit
        logger.warning("\n".join(list(ffmpeg_proc.log_history)[-10:]))
        raise AudioError(f"Error while streaming: {err}") from err
    finally:
        # always ensure close is called which also handles all cleanup
        await ffmpeg_proc.close()
        # determine how many seconds we've received
        # for pcm output we can calculate this easily
        seconds_received = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
        # store accurate duration, but only if the complete track was streamed:
        # after a seek (also one handled by the audio source) only a part was received
        if finished and not requested_seek_position and seconds_received:
            streamdetails.duration = int(seconds_received)

        logger.log(
            VERBOSE_LOG_LEVEL,
            "stream %s (with code %s) for %s",
            "cancelled" if cancelled else "finished" if finished else "aborted",
            ffmpeg_proc.returncode,
            streamdetails.uri,
        )

        # parse loudnorm data if we have that collected (and enabled)
        if (
            (streamdetails.loudness is None or finished)
            and streamdetails.volume_normalization_mode == VolumeNormalizationMode.DYNAMIC
            and (finished or (seconds_received >= 300))
        ):
            # if dynamic volume normalization is enabled
            # the loudnorm filter will output the measurement in the log,
            # so we can use that directly instead of analyzing the audio
            logger.log(VERBOSE_LOG_LEVEL, "Collecting loudness measurement...")
            if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)):
                logger.debug(
                    "Loudness measurement for %s: %s dB",
                    streamdetails.uri,
                    loudness_details,
                )
                mass.create_task(
                    mass.music.set_loudness(
                        streamdetails.item_id,
                        streamdetails.provider,
                        loudness_details,
                        media_type=streamdetails.media_type,
                    )
                )
683
684
def create_wave_header(
    samplerate: int = 44100, channels: int = 2, bitspersample: int = 16, duration: int | None = None
) -> bytes:
    """Generate a wave (RIFF) header for PCM audio from given params.

    The size fields of a wave header are unsigned 32 bit integers. When the size
    calculated from the duration does not fit, it is limited to the largest value
    (of whole frames) that does, instead of failing to pack the header.

    :param samplerate: Sample rate in Hz, such as 44100 or 48000.
    :param channels: Number of audio channels.
    :param bitspersample: Number of bits per sample, such as 16 for two byte samples.
    :param duration: Duration of the audio in seconds, used to calculate the data size.
        If None, a fixed large data size is written.
    """
    format_chunk_spec = "<4sLHHLLHH"
    data_chunk_spec = "<4sL"
    main_header_spec = "<4sL4s"
    bytes_per_frame = int(channels * (bitspersample / 8))

    # Generate format chunk
    format_chunk = struct.pack(
        format_chunk_spec,
        b"fmt ",  # Chunk id
        16,  # Size of this chunk (excluding chunk id and this field)
        1,  # Audio format, 1 for PCM
        channels,  # Number of channels
        int(samplerate),  # Samplerate, 44100, 48000, etc.
        int(samplerate * channels * (bitspersample / 8)),  # Byterate
        bytes_per_frame,  # Blockalign
        bitspersample,  # 16 bits for two byte samples, etc.
    )

    # Everything the RIFF size field counts besides the audio data itself:
    # the "WAVE" string, the format chunk and the id + size field of the data chunk
    riff_overhead = 4 + struct.calcsize(format_chunk_spec) + struct.calcsize(data_chunk_spec)

    # Generate data chunk
    if duration is None:
        # use max value possible
        datasize = 4254768000  # = 6,7 hours at 44100/16
    else:
        # calculate from duration
        datasize = int(samplerate * duration * channels * (bitspersample / 8))
        # the RIFF size (overhead + data) must fit in an unsigned 32 bit field
        max_datasize = 0xFFFFFFFF - riff_overhead
        if bytes_per_frame > 0:
            # keep the limit on a whole number of frames
            max_datasize -= max_datasize % bytes_per_frame
        datasize = max(0, min(datasize, max_datasize))
    data_chunk = struct.pack(
        data_chunk_spec,
        b"data",  # Chunk id
        datasize,  # Chunk size (excluding chunk id and this field)
    )

    # Generate main header
    main_header = struct.pack(main_header_spec, b"RIFF", riff_overhead + datasize, b"WAVE")

    return main_header + format_chunk + data_chunk
738
739
async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, StreamType]:
    """
    Resolve a streaming radio URL.

    Unwraps any playlists if needed.
    Determines if the stream supports ICY metadata.

    A successful resolution is stored in the cache under the requested URL;
    when anything goes wrong the error is logged and the (uncached) input URL
    is returned, so the lookup is attempted again on the next call.

    :param mass: MusicAssistant instance (provides the cache and HTTP session).
    :param url: The URL of the radio stream, possibly pointing to a playlist.

    Returns tuple;
    - unfolded URL as string
    - StreamType to determine ICY (radio) or HLS stream.
    """
    if cache := await mass.cache.get(
        key=url, provider=CACHE_PROVIDER, category=CACHE_CATEGORY_RESOLVED_RADIO_URL
    ):
        if TYPE_CHECKING:  # for type checking
            cache = cast("tuple[str, str]", cache)
        return (cache[0], StreamType(cache[1]))
    stream_type = StreamType.HTTP
    timeout = ClientTimeout(total=None, connect=10, sock_read=5)
    try:
        async with mass.http_session_no_ssl.get(
            url, headers=HTTP_HEADERS_ICY, allow_redirects=True, timeout=timeout
        ) as resp:
            headers = resp.headers
            resp.raise_for_status()
            if not resp.headers:
                raise InvalidDataError("no headers found")
        if headers.get("icy-metaint") is not None:
            stream_type = StreamType.ICY
        # media types are case-insensitive and servers do send variants
        # such as "audio/x-mpegURL", so compare in lower case
        content_type = headers.get("content-type", "").lower()
        if (
            url.endswith((".m3u", ".m3u8", ".pls"))
            or ".m3u?" in url
            or ".m3u8?" in url
            or ".pls?" in url
            or "audio/x-mpegurl" in content_type
            or "audio/x-scpls" in content_type
        ):
            # url is playlist, we need to unfold it
            try:
                substreams = await fetch_playlist(mass, url)
                if not any(x for x in substreams if x.length):
                    for line in substreams:
                        if not line.is_url:
                            continue
                        # unfold first url of playlist
                        return await resolve_radio_stream(mass, line.path)
                    raise InvalidDataError("No content found in playlist")
            except IsHLSPlaylist:
                stream_type = StreamType.HLS

    except Exception as err:
        LOGGER.warning("Error while parsing radio URL %s: %s", url, str(err))
        return (url, stream_type)

    result = (url, stream_type)
    cache_expiration = 3600 * 3
    await mass.cache.set(
        url,
        result,
        expiration=cache_expiration,
        provider=CACHE_PROVIDER,
        category=CACHE_CATEGORY_RESOLVED_RADIO_URL,
    )
    return result
805
806
async def get_icy_radio_stream(
    mass: MusicAssistant, url: str, streamdetails: StreamDetails
) -> AsyncGenerator[bytes, None]:
    """Stream (radio) audio from HTTP while picking up the interleaved ICY metadata.

    The stream is read as a repetition of: ``icy-metaint`` bytes of audio (yielded
    to the caller), one length byte, and ``length * 16`` bytes of metadata. When the
    metadata carries a StreamTitle that (after cleaning) differs from
    ``streamdetails.stream_title``, the streamdetails are updated in place.
    The generator ends when the server stops sending complete blocks.
    """
    LOGGER.debug("Start streaming radio with ICY metadata from url %s", url)
    request_timeout = ClientTimeout(total=None, connect=30, sock_read=5 * 60)
    async with mass.http_session_no_ssl.get(
        url, allow_redirects=True, headers=HTTP_HEADERS_ICY, timeout=request_timeout
    ) as resp:
        audio_block_size = int(resp.headers["icy-metaint"])
        while True:
            try:
                yield await resp.content.readexactly(audio_block_size)
                length_byte = await resp.content.readexactly(1)
                if length_byte == b"\x00":
                    # no metadata attached to this audio block
                    continue
                raw_meta = await resp.content.readexactly(ord(length_byte) * 16)
            except asyncio.exceptions.IncompleteReadError:
                break
            if not raw_meta:
                continue
            # the metadata block is padded with NUL bytes
            title_match = re.search(rb"StreamTitle='([^']*)';", raw_meta.rstrip(b"\0"))
            if not title_match:
                continue
            raw_title = title_match.group(1)
            try:
                # in 99% of the cases the stream title is utf-8 encoded
                stream_title = raw_title.decode("utf-8")
            except UnicodeDecodeError:
                # fallback to iso-8859-1
                stream_title = raw_title.decode("iso-8859-1", errors="replace")
            cleaned_stream_title = clean_stream_title(stream_title)
            if cleaned_stream_title == streamdetails.stream_title:
                continue
            LOGGER.log(
                VERBOSE_LOG_LEVEL,
                "ICY Radio streamtitle original: %s",
                stream_title,
            )
            LOGGER.log(
                VERBOSE_LOG_LEVEL,
                "ICY Radio streamtitle cleaned: %s",
                cleaned_stream_title,
            )
            streamdetails.stream_title = cleaned_stream_title
853
854
def parse_extinf_metadata(extinf_line: str) -> dict[str, str]:
    """
    Collect the key="value" attributes of an HLS EXTINF line.

    Extracts structured metadata like title="...", artist="..." from EXTINF lines.
    Common in iHeartRadio and other commercial radio HLS streams.

    Keys are returned in lower case; when a key occurs more than once
    the last value wins.

    :param extinf_line: The EXTINF line containing metadata
    """
    # a value runs from the opening double quote up to the next double quote
    return {
        pair.group(1).lower(): pair.group(2)
        for pair in re.finditer(r'(\w+)="([^"]*)"', extinf_line)
    }
875
876
async def _update_hls_radio_metadata(
    mass: MusicAssistant,
    streamdetails: StreamDetails,
    elapsed_time: int,  # noqa: ARG001
) -> None:
    """
    Refresh the stream title of an HLS radio stream from its media playlist.

    Downloads the media playlist, looks at the last EXTINF line and, when it
    carries a title and/or artist, stores the cleaned "Artist - Title" string
    in ``streamdetails.stream_title``. Any failure is only logged.

    :param mass: MusicAssistant instance
    :param streamdetails: StreamDetails object to update with metadata
    :param elapsed_time: Current playback position in seconds (unused for live radio)
    """
    try:
        # The resolved media playlist URL is remembered in streamdetails.data
        # so the substream only has to be resolved once.
        if streamdetails.data is None:
            streamdetails.data = {}
        playlist_url = streamdetails.data.get("hls_media_playlist_url")
        if not playlist_url:
            try:
                assert isinstance(streamdetails.path, str)  # for type checking
                playlist_url = (await get_hls_substream(mass, streamdetails.path)).path
                streamdetails.data["hls_media_playlist_url"] = playlist_url
            except Exception as err:
                LOGGER.warning(
                    "Failed to resolve HLS substream for metadata monitoring: %s",
                    err,
                )
                return

        # Download the media playlist
        fetch_timeout = ClientTimeout(total=0, connect=10, sock_read=30)
        async with mass.http_session_no_ssl.get(playlist_url, timeout=fetch_timeout) as resp:
            resp.raise_for_status()
            playlist_text = await resp.text()

        # Only the last EXTINF line (the most recent segment) is inspected
        latest_extinf = next(
            (
                entry
                for entry in reversed(playlist_text.strip().split("\n"))
                if entry.startswith("#EXTINF:")
            ),
            None,
        )
        if latest_extinf is None:
            return

        tags = parse_extinf_metadata(latest_extinf)
        title = tags.get("title", "")
        artist = tags.get("artist", "")
        if not (title or artist):
            return

        # "Artist - Title" when both are known, otherwise whichever one is present
        if artist and title:
            stream_title = f"{artist} - {title}"
        else:
            stream_title = title or artist

        cleaned_title = clean_stream_title(stream_title)
        # skip empty results and titles that did not change
        if cleaned_title and cleaned_title != streamdetails.stream_title:
            LOGGER.log(
                VERBOSE_LOG_LEVEL,
                "HLS Radio metadata updated: %s",
                cleaned_title,
            )
            streamdetails.stream_title = cleaned_title

    except Exception as err:
        LOGGER.debug(
            "Error fetching HLS metadata: %s",
            err,
        )
957
958
async def get_hls_substream(
    mass: MusicAssistant,
    url: str,
) -> PlaylistItem:
    """Select the (highest quality) HLS substream for given HLS playlist/URL.

    :param mass: MusicAssistant instance (provides the HTTP session).
    :param url: URL of the HLS (master) playlist.

    :return: The given URL itself (as PlaylistItem) when the playlist already lists
        audio segments, otherwise the child playlist with the highest BANDWIDTH
        (or simply the first one when no stream info is present), with its path
        made absolute.
    :raises InvalidDataError: When the playlist contains no entries at all.
    """
    from urllib.parse import urljoin  # noqa: PLC0415

    timeout = ClientTimeout(total=None, connect=30, sock_read=5 * 60)
    # fetch master playlist and select (best) child playlist
    # https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming-19#section-10
    async with mass.http_session_no_ssl.get(
        url, allow_redirects=True, headers=HTTP_HEADERS, timeout=timeout
    ) as resp:
        resp.raise_for_status()
        raw_data = await resp.read()
        encoding = resp.charset or await detect_charset(raw_data)
        master_m3u_data = raw_data.decode(encoding)
    substreams = parse_m3u(master_m3u_data)
    if not substreams:
        # fail with a descriptive error instead of an IndexError further down
        raise InvalidDataError("No substreams found in HLS playlist")
    # There is a chance that we did not get a master playlist with subplaylists
    # but just a single master/sub playlist with the actual audio stream(s)
    # so we need to detect if the playlist child's contain audio streams or
    # sub-playlists.
    if any(
        (x.length or x.path.endswith((".mp4", ".aac")))
        and not x.path.endswith((".m3u", ".m3u8"))
        for x in substreams
    ):
        return PlaylistItem(path=url, key=substreams[0].key)
    # sort substreams on best quality (highest bandwidth) when available
    if any(x.stream_info for x in substreams):
        substreams.sort(
            key=lambda x: int(
                x.stream_info.get("BANDWIDTH", "0") if x.stream_info is not None else 0
            ),
            reverse=True,
        )
    substream = substreams[0]
    if not substream.path.startswith("http"):
        # path is relative: resolve it against the playlist URL. Unlike plain
        # string stitching, urljoin also handles root-relative paths ("/x/y.m3u8")
        # and playlist URLs that carry a query string containing slashes.
        substream.path = urljoin(url, substream.path)
    return substream
1000
1001
async def get_http_stream(
    mass: MusicAssistant,
    url: str,
    streamdetails: StreamDetails,
    seek_position: int = 0,
    verify_ssl: bool = True,
) -> AsyncGenerator[bytes, None]:
    """Get audio stream from HTTP.

    Seeking is done with an HTTP Range request whose byte offset is estimated
    from the file size and the duration. When the seek can not be honoured
    (server without byte-range support, or an unknown/container format) the
    stream starts at the beginning and ``streamdetails.seek_position`` is reset.

    :param mass: MusicAssistant instance (provides the HTTP sessions).
    :param url: URL of the audio file.
    :param streamdetails: StreamDetails of the item; ``size`` and
        ``seek_position`` may be updated by this function.
    :param seek_position: Position (in seconds) to start streaming from.
    :param verify_ssl: Use the SSL-verifying HTTP session when True.
    :raises InvalidDataError: When a seek was requested but the server did not
        answer with a partial (206) response.
    """
    LOGGER.debug("Start HTTP stream for %s (seek_position %s)", streamdetails.uri, seek_position)
    if seek_position:
        assert streamdetails.duration, "Duration required for seek requests"
    http_session = mass.http_session if verify_ssl else mass.http_session_no_ssl
    # try to get filesize with a head request
    seek_supported = streamdetails.can_seek
    if seek_position or not streamdetails.size:
        async with http_session.head(url, allow_redirects=True, headers=HTTP_HEADERS) as resp:
            resp.raise_for_status()
            if size := resp.headers.get("Content-Length"):
                streamdetails.size = int(size)
            seek_supported = resp.headers.get("Accept-Ranges") == "bytes"

    # seeking an unknown or container format is not supported due to the (moov) headers
    if seek_position and (
        not seek_supported
        or streamdetails.audio_format.content_type
        in (
            ContentType.UNKNOWN,
            ContentType.M4A,
            ContentType.M4B,
        )
    ):
        LOGGER.warning(
            "Seeking in %s (%s) not possible.",
            streamdetails.uri,
            streamdetails.audio_format.output_format_str,
        )
        seek_position = 0
        streamdetails.seek_position = 0

    # headers
    headers = {**HTTP_HEADERS}
    timeout = ClientTimeout(total=None, connect=30, sock_read=5 * 60)
    # The Range header must only be built AFTER the check above: otherwise a
    # cancelled seek would still send the byte range and the stream would
    # start somewhere in the middle of the file.
    if seek_position and streamdetails.size:
        assert streamdetails.duration is not None  # for type checking
        skip_bytes = int(streamdetails.size / streamdetails.duration * seek_position)
        headers["Range"] = f"bytes={skip_bytes}-{streamdetails.size}"

    # start the streaming from http
    bytes_received = 0
    async with http_session.get(
        url, allow_redirects=True, headers=headers, timeout=timeout
    ) as resp:
        is_partial = resp.status == 206
        if seek_position and not is_partial:
            raise InvalidDataError("HTTP source does not support seeking!")
        resp.raise_for_status()
        async for chunk in resp.content.iter_any():
            bytes_received += len(chunk)
            yield chunk

    # store size on streamdetails for later use
    if not streamdetails.size:
        streamdetails.size = bytes_received
    LOGGER.debug(
        "Finished HTTP stream for %s (transferred %s/%s bytes)",
        streamdetails.uri,
        bytes_received,
        streamdetails.size,
    )
1071
1072
async def get_file_stream(
    mass: MusicAssistant,  # noqa: ARG001
    filename: str,
    streamdetails: StreamDetails,
    seek_position: int = 0,
) -> AsyncGenerator[bytes, None]:
    """Get audio stream from local accessible file.

    :param mass: MusicAssistant instance (unused).
    :param filename: Path of the file to stream.
    :param streamdetails: StreamDetails of the item; ``size`` is filled in when
        missing and ``seek_position`` is reset when seeking is not possible.
    :param seek_position: Position (in seconds) to start streaming from. The byte
        offset is estimated from the file size and the duration.
    """
    if seek_position:
        assert streamdetails.duration, "Duration required for seek requests"
    if not streamdetails.size:
        stat = await asyncio.to_thread(os.stat, filename)
        streamdetails.size = stat.st_size

    # seeking an unknown or container format is not supported due to the (moov) headers
    if seek_position and (
        streamdetails.audio_format.content_type
        in (
            ContentType.UNKNOWN,
            ContentType.M4A,
            ContentType.M4B,
            ContentType.MP4,
        )
    ):
        LOGGER.warning(
            "Seeking in %s (%s) not possible.",
            streamdetails.uri,
            streamdetails.audio_format.output_format_str,
        )
        seek_position = 0
        streamdetails.seek_position = 0

    chunk_size = get_chunksize(streamdetails.audio_format)
    # Read from the file that was passed in (the same one the size was taken
    # from above) instead of whatever happens to be stored in streamdetails.data.
    async with aiofiles.open(filename, "rb") as _file:
        if seek_position:
            assert streamdetails.duration is not None  # for type checking
            seek_pos = int((streamdetails.size / streamdetails.duration) * seek_position)
            await _file.seek(seek_pos)
        # yield chunks of data from file until it is exhausted
        while data := await _file.read(chunk_size):
            yield data
1116
1117
def _get_parts_from_position(
    parts: list[MultiPartPath], seek_position: int
) -> tuple[list[MultiPartPath], int]:
    """Find the part that contains a timestamp and drop everything before it.

    Arguments:
    parts: The list of parts
    seek_position: The seeking position in seconds of the tracklist

    Returns:
        A tuple of the remaining parts (starting with the one to play first)
        and the position in seconds to seek to within that first part.
        When a part without a known duration is reached, the full list and
        the untouched seek position are returned.

    Raises:
        InvalidDataError: When an entry is not a MultiPartPath.
        IndexError: When the position lies beyond the last part.
    """
    elapsed = 0.0
    last_index = len(parts) - 1
    for index, part in enumerate(parts):
        if not isinstance(part, MultiPartPath):
            raise InvalidDataError("Multi-file streamdetails requires a list of MultiPartPath")
        if part.duration is None:
            return parts, seek_position

        part_end = elapsed + part.duration
        if part_end < seek_position:
            # this part ends before the requested position
            elapsed = part_end
            continue

        position = seek_position - elapsed

        # Seeking in some parts is inaccurate, so a seek to a chapter start may land on
        # the end of the previous part. Within 2 seconds of the end we move on to the
        # next part, unless this is the last one.
        if position + 2 >= part.duration:
            LOGGER.debug(
                f"Skipping to the next part due to seek position being at the end: {position}"
            )
            if index < last_index:
                return parts[index + 1 :], 0

        return parts[index:], int(position)

    raise IndexError(f"Could not find any candidate part for position {seek_position}")
1157
1158
async def get_multi_file_stream(
    mass: MusicAssistant,  # noqa: ARG001
    streamdetails: StreamDetails,
    seek_position: int = 0,
) -> AsyncGenerator[bytes, None]:
    """Return audio stream for a concatenation of multiple files.

    The remaining parts are written to a temporary ffmpeg concat list, which is
    fed to ffmpeg and removed again once the stream ends.

    Arguments:
    mass: MusicAssistant instance (unused).
    streamdetails: StreamDetails whose ``path`` is a list of MultiPartPath.
    seek_position: The position to seek to in seconds

    Raises:
        InvalidDataError: When ``streamdetails.path`` is not a list.
    """
    if not isinstance(streamdetails.path, list):
        raise InvalidDataError("Multi-file streamdetails requires a list of MultiPartPath")
    parts, seek_position = _get_parts_from_position(streamdetails.path, seek_position)
    files_list = [part.path for part in parts]

    # concat input files
    temp_file = f"/tmp/{shortuuid.random(20)}.txt"  # noqa: S108
    async with aiofiles.open(temp_file, "w") as f:
        for path in files_list:
            # A single quote can not appear inside a single-quoted string of the
            # concat list: close the quote, emit an escaped quote and reopen it,
            # otherwise paths such as "Philosopher's Stone.mp3" break the list.
            escaped_path = path.replace("'", "'\\''")
            await f.write(f"file '{escaped_path}'\n")

    try:
        async for chunk in get_ffmpeg_stream(
            audio_input=temp_file,
            input_format=streamdetails.audio_format,
            output_format=AudioFormat(
                content_type=ContentType.NUT,
                sample_rate=streamdetails.audio_format.sample_rate,
                bit_depth=streamdetails.audio_format.bit_depth,
                channels=streamdetails.audio_format.channels,
            ),
            extra_input_args=[
                "-safe",
                "0",
                "-f",
                "concat",
                "-i",
                temp_file,
                "-ss",
                str(seek_position),
            ],
        ):
            yield chunk
    finally:
        await remove_file(temp_file)
1204
1205
async def get_preview_stream(
    mass: MusicAssistant,
    provider_instance_id_or_domain: str,
    item_id: str,
    media_type: MediaType = MediaType.TRACK,
) -> AsyncGenerator[bytes, None]:
    """Yield a 30 seconds AAC preview clip of the given provider item.

    Raises ProviderUnavailableError when the provider can not be found and
    MediaNotFoundError when the provider does not know the item.
    """
    music_prov = mass.get_provider(provider_instance_id_or_domain)
    if not music_prov:
        raise ProviderUnavailableError
    if TYPE_CHECKING:  # avoid circular import
        assert isinstance(music_prov, MusicProvider)

    # For security: only items that really exist in the provider are accepted
    item = await music_prov.get_item(media_type, item_id)
    if not item:
        raise MediaNotFoundError(
            f"Item {item_id} not found in provider {provider_instance_id_or_domain}"
        )

    streamdetails = await music_prov.get_stream_details(item_id, media_type)
    source_format = streamdetails.audio_format
    # intermediate raw PCM format matching the source
    pcm_format = AudioFormat(
        content_type=ContentType.from_bit_depth(source_format.bit_depth),
        sample_rate=source_format.sample_rate,
        bit_depth=source_format.bit_depth,
        channels=source_format.channels,
    )
    pcm_source = get_media_stream(
        mass=mass,
        streamdetails=streamdetails,
        pcm_format=pcm_format,
    )
    preview = get_ffmpeg_stream(
        audio_input=pcm_source,
        input_format=pcm_format,
        output_format=AudioFormat(content_type=ContentType.AAC),
        extra_input_args=["-t", "30"],  # cut after 30 seconds
    )
    async for chunk in preview:
        yield chunk
1241
1242
async def get_silence(
    duration: int,
    output_format: AudioFormat,
) -> AsyncGenerator[bytes, None]:
    """Yield ``duration`` seconds of (stereo) silence in the requested format.

    Raw PCM and WAV are generated directly as zero bytes (WAV preceded by a
    wave header), one chunk per second; every other format is encoded by ffmpeg.
    """
    content_type = output_format.content_type
    is_raw_pcm = content_type.is_pcm()
    if is_raw_pcm or content_type == ContentType.WAV:
        if not is_raw_pcm:
            # wav silence = wave header + zero's
            yield create_wave_header(
                samplerate=output_format.sample_rate,
                channels=2,
                bitspersample=output_format.bit_depth,
                duration=duration,
            )
        # pcm = just zeros, always 2 channels
        for _ in range(duration):
            yield b"\0" * int(output_format.sample_rate * (output_format.bit_depth / 8) * 2)
        return

    # use ffmpeg for all other encodings
    ffmpeg_cmd = [
        "ffmpeg",
        "-hide_banner",
        "-loglevel",
        "quiet",
        "-f",
        "lavfi",
        "-i",
        f"anullsrc=r={output_format.sample_rate}:cl={'stereo'}",
        "-t",
        str(duration),
        "-f",
        output_format.output_format_str,
        "-",
    ]
    async with AsyncProcess(ffmpeg_cmd, stdout=True) as silence_proc:
        async for encoded in silence_proc.iter_chunked():
            yield encoded
1283
1284
async def resample_pcm_audio(
    input_audio: bytes,
    input_format: AudioFormat,
    output_format: AudioFormat,
) -> bytes:
    """
    Convert (a chunk of) PCM audio from one audio format to another with ffmpeg.

    When both formats are equal the input is returned untouched.

    :param input_audio: Raw PCM audio data to resample.
    :param input_format: AudioFormat of the input audio.
    :param output_format: Desired AudioFormat for the output audio.

    :return: Resampled audio data, frame-aligned. Returns empty bytes if resampling fails.
    """
    if input_format == output_format:
        return input_audio
    LOGGER.log(VERBOSE_LOG_LEVEL, f"Resampling audio from {input_format} to {output_format}")
    try:
        _, resampled, ffmpeg_stderr = await communicate(
            get_ffmpeg_args(
                input_format=input_format, output_format=output_format, filter_params=[]
            ),
            input_audio,
        )
        if resampled:
            # Ensure frame alignment after resampling
            return align_audio_to_frame_boundary(resampled, output_format)
        LOGGER.error(
            "Resampling failed: no output from ffmpeg. Input: %s, Output: %s, stderr: %s",
            input_format,
            output_format,
            ffmpeg_stderr.decode() if ffmpeg_stderr else "(no stderr)",
        )
    except Exception as err:
        LOGGER.exception(
            "Failed to resample audio from %s to %s: %s",
            input_format,
            output_format,
            err,
        )
    # both failure paths (no output, exception) end up here
    return b""
1325
1326
def get_chunksize(
    fmt: AudioFormat,
    seconds: float = 1,
) -> int:
    """Estimate the number of bytes needed for ``seconds`` of audio in the given format.

    Uncompressed formats use the exact PCM size, a known bit rate is used when
    present, and otherwise a per-codec estimate is returned.
    """
    content_type = fmt.content_type
    pcm_size = int(fmt.sample_rate * (fmt.bit_depth / 8) * fmt.channels * seconds)
    # raw PCM and the uncompressed containers
    if content_type.is_pcm() or content_type in (
        ContentType.WAV,
        ContentType.AIFF,
        ContentType.DSF,
    ):
        return pcm_size
    # bit_rate is multiplied by 1000 here, i.e. treated as kbit/s
    if fmt.bit_rate and fmt.bit_rate < 10000:
        return int(((fmt.bit_rate * 1000) / 8) * seconds)
    if content_type in (ContentType.FLAC, ContentType.WAVPACK, ContentType.ALAC):
        # assume 74.7% compression ratio (level 0)
        # source: https://z-issue.com/wp/flac-compression-level-comparison/
        return int(pcm_size * 0.747)
    if content_type in (ContentType.AAC, ContentType.M4A):
        return int((256000 / 8) * seconds)
    # MP3, OGG and everything else: assume 320 kbit/s
    return int((320000 / 8) * seconds)
1348
1349
def is_grouping_preventing_dsp(player: Player) -> bool:
    """Tell whether grouping makes it impossible to apply DSP to this leader/PlayerGroup.

    When True is returned, no DSP should be applied to the player.
    Whether the Player is part of a group at all is NOT checked here; callers have to
    handle the non-leader case themselves since player.state.synced_to can be
    unreliable in some edge cases.
    """
    if PlayerFeature.MULTI_DEVICE_DSP in player.state.supported_features:
        # DSP across multiple devices is supported, grouping is never a problem
        return False

    members = player.state.group_members
    member_count = len(members) if members else 0

    if player.provider.domain == "player_group":
        # PlayerGroups have no leader, so a single child means the group
        # actually contains only one player.
        return member_count > 1
    if player.state.type == PlayerType.GROUP:
        # A group player external to Music Assistant.
        return True
    # regular player: any group member besides itself means multiple devices
    return member_count > 0
1372
1373
def is_output_limiter_enabled(mass: MusicAssistant, player: Player) -> bool:
    """Return whether the output limiter is enabled for the player.

    Unlike DSP, the limiter is still configurable when synchronized without MULTI_DEVICE_DSP.
    The setting is read from the active (sync) group when there is one, otherwise from
    the leader the player is synced to, and otherwise from the player itself.
    """
    # precedence: active group > sync leader > the player itself
    deciding_player_id = (
        player.state.active_group or player.state.synced_to or player.player_id
    )
    return bool(
        mass.config.get_raw_player_config_value(
            deciding_player_id,
            CONF_ENTRY_OUTPUT_LIMITER.key,
            CONF_ENTRY_OUTPUT_LIMITER.default_value,
        )
    )
1394
1395
def get_player_filter_params(
    mass: MusicAssistant,
    player_id: str,
    input_format: AudioFormat,
    output_format: AudioFormat,
) -> list[str]:
    """Build the list of ffmpeg audio filters for a player.

    The chain consists of (in order): the player's DSP (input gain, enabled
    filters, output gain), an optional left/right channel selection and the
    safety limiter. The output format is also stored in the player's extra_data.
    """
    player = mass.players.get_player(player_id)
    # The DSP config of a protocol player lives under the parent
    # (native or universal) player that wraps it.
    dsp_owner_id = player.protocol_parent_id if player and player.protocol_parent_id else player_id
    dsp = mass.config.get_player_dsp_config(dsp_owner_id)
    use_limiter = True

    if player:
        if is_grouping_preventing_dsp(player):
            # DSP can not be applied correctly to a grouped player
            # without multi-device DSP support: switch it off.
            dsp.enabled = False
        elif (
            player.provider.domain == "player_group"
            and PlayerFeature.MULTI_DEVICE_DSP not in player.state.supported_features
        ):
            # Special case: a player group without MULTI_DEVICE_DSP support that holds
            # just a single player (otherwise grouping would have prevented DSP above).
            # The DSP of that single player can still be applied.
            if not player.state.group_members:
                # This should normally never happen, but if it does, we disable DSP.
                dsp.enabled = False
            else:
                only_member = mass.players.get_player(player.state.group_members[0])
                assert only_member is not None  # for type checking
                dsp = mass.config.get_player_dsp_config(only_member.player_id)

        # The output format used in the audio processing steps is known here;
        # remember it so it can be shown to the user in the UI later on.
        player.extra_data["output_format"] = output_format

        use_limiter = is_output_limiter_enabled(mass, player)

    chain: list[str] = []
    if dsp.enabled:
        # input gain first
        if dsp.input_gain != 0:
            chain.append(f"volume={dsp.input_gain}dB")

        # then every enabled DSP filter, in configured order
        for dsp_filter in dsp.filters:
            if dsp_filter.enabled:
                chain.extend(filter_to_ffmpeg_params(dsp_filter, input_format))

        # output gain last
        if dsp.output_gain != 0:
            chain.append(f"volume={dsp.output_gain}dB")

    # output only the left or the right channel (as mono) when configured
    channel_mode = mass.config.get_raw_player_config_value(
        player_id, CONF_OUTPUT_CHANNELS, "stereo"
    )
    if channel_mode in ("left", "right"):
        chain.append("pan=mono|c0=FL" if channel_mode == "left" else "pan=mono|c0=FR")

    # the safety limiter always closes the chain
    if use_limiter:
        chain.append("alimiter=limit=-2dB:level=false:asc=true")

    LOGGER.debug("Generated ffmpeg params for player %s: %s", player_id, chain)
    return chain
1474
1475
1476def parse_loudnorm(raw_stderr: bytes | str) -> float | None:
1477    """Parse Loudness measurement from ffmpeg stderr output."""
1478    stderr_data = raw_stderr.decode() if isinstance(raw_stderr, bytes) else raw_stderr
1479    if "[Parsed_loudnorm_0 @" not in stderr_data:
1480        return None
1481    for jsun_chunk in stderr_data.split(" { "):
1482        try:
1483            stderr_data = "{" + jsun_chunk.rsplit("}")[0].strip() + "}"
1484            loudness_data = json_loads(stderr_data)
1485            return float(loudness_data["input_i"])
1486        except (*JSON_DECODE_EXCEPTIONS, KeyError, ValueError, IndexError):
1487            continue
1488    return None
1489
1490
async def analyze_loudness(
    mass: MusicAssistant,
    streamdetails: StreamDetails,
) -> None:
    """
    Analyze media item's audio, to calculate EBU R128 loudness.

    Does nothing when a loudness measurement is already stored for the item.
    Otherwise the audio source is worked out from the streamdetails and run
    through ffmpeg's ebur128 filter; the integrated loudness parsed from the
    ffmpeg log is stored with mass.music.set_loudness. If no value can be
    parsed from the log, a warning is logged and nothing is stored.
    """
    if await mass.music.get_loudness(
        streamdetails.item_id,
        streamdetails.provider,
        media_type=streamdetails.media_type,
    ):
        # only when needed we do the analyze job
        return

    logger = LOGGER.getChild("analyze_loudness")
    logger.debug("Start analyzing audio for %s", streamdetails.uri)

    extra_input_args = [
        # limit to 10 minutes to avoid reading too much in memory
        "-t",
        "600",
    ]
    # work out audio source for these streamdetails
    stream_type = streamdetails.stream_type
    audio_source: str | AsyncGenerator[bytes, None]
    if stream_type == StreamType.CUSTOM:
        music_prov = mass.get_provider(streamdetails.provider)
        if TYPE_CHECKING:  # avoid circular import
            assert isinstance(music_prov, MusicProvider)
        audio_source = music_prov.get_audio_stream(streamdetails)
    elif stream_type == StreamType.ICY:
        assert isinstance(streamdetails.path, str)  # for type checking
        audio_source = get_icy_radio_stream(mass, streamdetails.path, streamdetails)
    elif stream_type == StreamType.HLS:
        assert isinstance(streamdetails.path, str)  # for type checking
        substream = await get_hls_substream(mass, streamdetails.path)
        audio_source = substream.path
    else:
        # all other stream types (HTTP, FILE, etc)
        if stream_type == StreamType.ENCRYPTED_HTTP:
            assert streamdetails.decryption_key is not None  # for type checking
            extra_input_args += ["-decryption_key", streamdetails.decryption_key]
        if isinstance(streamdetails.path, list):
            # multi part stream - just use a single file for the measurement:
            # the second part when there is one, otherwise the only part we have
            parts = streamdetails.path
            if not parts:
                logger.warning(
                    "Unable to analyze %s - multi part stream without any parts",
                    streamdetails.uri,
                )
                return
            audio_source = parts[1].path if len(parts) > 1 else parts[0].path
        else:
            # regular single file/url stream
            assert isinstance(streamdetails.path, str)  # for type checking
            audio_source = streamdetails.path

    # calculate BS.1770 R128 integrated loudness with ffmpeg
    async with FFMpeg(
        audio_input=audio_source,
        input_format=streamdetails.audio_format,
        output_format=streamdetails.audio_format,
        audio_output="NULL",
        filter_params=["ebur128=framelog=verbose"],
        extra_input_args=extra_input_args,
        collect_log_history=True,
        loglevel="info",
    ) as ffmpeg_proc:
        await ffmpeg_proc.wait()
        log_lines = ffmpeg_proc.log_history
        log_lines_str = "\n".join(log_lines)
        try:
            # the value sits between "I:" and "LUFS", after the "Integrated loudness" header
            loudness_str = (
                log_lines_str.split("Integrated loudness")[1].split("I:")[1].split("LUFS")[0]
            )
            loudness = float(loudness_str.strip())
        except (IndexError, ValueError, AttributeError):
            logger.warning(
                "Could not determine integrated loudness of %s - %s",
                streamdetails.uri,
                log_lines_str or "received empty value",
            )
        else:
            await mass.music.set_loudness(
                streamdetails.item_id,
                streamdetails.provider,
                loudness,
                media_type=streamdetails.media_type,
            )
            logger.debug(
                "Integrated loudness of %s is: %s",
                streamdetails.uri,
                loudness,
            )
1577
1578
def _get_normalization_mode(
    core_config: CoreConfig, player_config: PlayerConfig, streamdetails: StreamDetails
) -> VolumeNormalizationMode:
    """Work out the effective volume normalization mode for the given stream."""
    # normalization is off when disabled for this player or when no target loudness is set
    normalization_enabled = player_config.get_value(CONF_VOLUME_NORMALIZATION)
    if not normalization_enabled or streamdetails.target_loudness is None:
        return VolumeNormalizationMode.DISABLED

    # radio has its own preference, everything else follows the tracks preference
    if streamdetails.media_type == MediaType.RADIO:
        conf_key = CONF_VOLUME_NORMALIZATION_RADIO
    else:
        conf_key = CONF_VOLUME_NORMALIZATION_TRACKS
    preferred_mode = VolumeNormalizationMode(str(core_config.get_value(conf_key)))

    if streamdetails.loudness is None:
        # no measurement available: translate the measurement based modes
        # into the fallback they allow (or into disabled if they allow none)
        without_measurement = {
            VolumeNormalizationMode.FALLBACK_DYNAMIC: VolumeNormalizationMode.DYNAMIC,
            VolumeNormalizationMode.MEASUREMENT_ONLY: VolumeNormalizationMode.DISABLED,
            VolumeNormalizationMode.FALLBACK_FIXED_GAIN: VolumeNormalizationMode.FIXED_GAIN,
        }
        # any other preference is simply returned as-is
        return without_measurement.get(preferred_mode, preferred_mode)

    # measurement available: these preferences are returned unchanged,
    # all remaining ones resolve to the measurement mode
    modes_returned_as_is = (
        VolumeNormalizationMode.DISABLED,
        VolumeNormalizationMode.FIXED_GAIN,
        VolumeNormalizationMode.DYNAMIC,
    )
    if preferred_mode in modes_returned_as_is:
        return preferred_mode
    return VolumeNormalizationMode.MEASUREMENT_ONLY
1621