music-assistant-server

61.5 KBPY
audio.py
61.5 KB1,602 lines • python
1"""Various helpers for audio streaming and manipulation."""
2
3from __future__ import annotations
4
5import asyncio
6import logging
7import os
8import re
9import struct
10import time
11from collections.abc import AsyncGenerator
12from functools import partial
13from io import BytesIO
14from typing import TYPE_CHECKING, Final, cast
15
16import aiofiles
17import shortuuid
18from aiohttp import ClientTimeout
19from music_assistant_models.dsp import DSPConfig, DSPDetails, DSPState
20from music_assistant_models.enums import (
21    ContentType,
22    MediaType,
23    PlayerFeature,
24    PlayerType,
25    StreamType,
26    VolumeNormalizationMode,
27)
28from music_assistant_models.errors import (
29    AudioError,
30    InvalidDataError,
31    MediaNotFoundError,
32    MusicAssistantError,
33    ProviderUnavailableError,
34)
35from music_assistant_models.media_items import AudioFormat
36from music_assistant_models.streamdetails import MultiPartPath
37
38from music_assistant.constants import (
39    CONF_ENTRY_OUTPUT_LIMITER,
40    CONF_OUTPUT_CHANNELS,
41    CONF_VOLUME_NORMALIZATION,
42    CONF_VOLUME_NORMALIZATION_RADIO,
43    CONF_VOLUME_NORMALIZATION_TARGET,
44    CONF_VOLUME_NORMALIZATION_TRACKS,
45    MASS_LOGGER_NAME,
46    VERBOSE_LOG_LEVEL,
47)
48from music_assistant.controllers.players.sync_groups import SyncGroupPlayer
49from music_assistant.helpers.json import JSON_DECODE_EXCEPTIONS, json_loads
50from music_assistant.helpers.throttle_retry import BYPASS_THROTTLER
51from music_assistant.helpers.util import clean_stream_title, remove_file
52
53from .audio_buffer import AudioBuffer
54from .dsp import filter_to_ffmpeg_params
55from .ffmpeg import FFMpeg, get_ffmpeg_args, get_ffmpeg_stream
56from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
57from .process import AsyncProcess, communicate
58from .util import detect_charset
59
60if TYPE_CHECKING:
61    from music_assistant_models.config_entries import CoreConfig, PlayerConfig
62    from music_assistant_models.queue_item import QueueItem
63    from music_assistant_models.streamdetails import StreamDetails
64
65    from music_assistant.mass import MusicAssistant
66    from music_assistant.models.music_provider import MusicProvider
67    from music_assistant.models.player import Player
68
# Module logger, a child of the application logger named by MASS_LOGGER_NAME.
LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.audio")

# ruff: noqa: PLR0915

# Base HTTP request headers (User-Agent only).
HTTP_HEADERS = {"User-Agent": "Lavf/60.16.100.MusicAssistant"}
# Base headers plus the "Icy-MetaData" request header;
# sent by resolve_radio_stream when probing a radio URL.
HTTP_HEADERS_ICY = {**HTTP_HEADERS, "Icy-MetaData": "1"}

# NOTE(review): presumably the providers that are slow to start a stream;
# not referenced in this part of the file - confirm against its users.
SLOW_PROVIDERS = ("tidal", "ytmusic", "apple_music")

# Cache category and provider under which resolve_radio_stream stores
# the (url, stream type) result of resolving a radio URL.
CACHE_CATEGORY_RESOLVED_RADIO_URL: Final[int] = 100
CACHE_PROVIDER: Final[str] = "audio"
80
81
def align_audio_to_frame_boundary(audio_data: bytes, pcm_format: AudioFormat) -> bytes:
    """Drop a trailing partial frame so the data ends on a frame boundary.

    One frame is a single sample for every channel, i.e.
    ``(bit_depth // 8) * channels`` bytes.

    :param audio_data: Raw PCM audio data to align.
    :param pcm_format: AudioFormat of the audio data.
    """
    frame_size = (pcm_format.bit_depth // 8) * pcm_format.channels
    leftover = len(audio_data) % frame_size
    if not leftover:
        # already aligned: hand back the data untouched
        return audio_data
    LOGGER.debug(
        "Truncating %d bytes from audio buffer to align to frame boundary",
        leftover,
    )
    return audio_data[: len(audio_data) - leftover]
98
99
async def strip_silence(
    mass: MusicAssistant,  # noqa: ARG001
    audio_data: bytes,
    pcm_format: AudioFormat,
    reverse: bool = False,
) -> bytes:
    """Remove silence from the start (or the end, if reverse is set) of PCM audio with ffmpeg.

    :param mass: MusicAssistant instance (unused).
    :param audio_data: Raw PCM audio to process.
    :param pcm_format: AudioFormat describing audio_data; also used for the output.
    :param reverse: Strip from the end of the audio instead of the beginning.
    """
    container = pcm_format.content_type.value
    # for the end of the audio: reverse, strip what is now the start, reverse back
    audio_filter = (
        "areverse,atrim=start=0.2,silenceremove=start_periods=1:start_silence=0.1:start_threshold=0.02,areverse"
        if reverse
        else "atrim=start=0.2,silenceremove=start_periods=1:start_silence=0.1:start_threshold=0.02"
    )
    ffmpeg_args = [
        "ffmpeg",
        "-hide_banner",
        "-loglevel",
        "quiet",
        # input: raw pcm read from stdin
        "-acodec",
        pcm_format.content_type.name.lower(),
        "-f",
        container,
        "-ac",
        str(pcm_format.channels),
        "-ar",
        str(pcm_format.sample_rate),
        "-i",
        "-",
        # filter
        "-af",
        audio_filter,
        # output: same raw format written to stdout
        "-f",
        container,
        "-",
    ]
    _returncode, stripped_data, _stderr = await communicate(ffmpeg_args, audio_data)

    removed_bytes = len(audio_data) - len(stripped_data)
    if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL):
        LOGGER.log(
            VERBOSE_LOG_LEVEL,
            "stripped %s seconds of silence from %s of pcm audio. bytes stripped: %s",
            round(removed_bytes / pcm_format.pcm_sample_size, 2),
            "end" if reverse else "begin",
            removed_bytes,
        )
    return stripped_data
148
149
def get_player_dsp_details(
    mass: MusicAssistant, player: Player, group_preventing_dsp: bool = False
) -> DSPDetails:
    """Return the DSP details of a single player.

    This will however not check if the queried player is part of a group.
    The caller is responsible for passing the result of is_grouping_preventing_dsp of
    the leader/PlayerGroup as the group_preventing_dsp argument in such cases.

    :param mass: MusicAssistant instance, used to look up the player's DSP config.
    :param player: The player to return the DSP details for.
    :param group_preventing_dsp: Whether the group this player is part of prevents DSP.
    """
    dsp_config = mass.config.get_player_dsp_config(player.player_id)
    dsp_state = DSPState.ENABLED if dsp_config.enabled else DSPState.DISABLED
    if dsp_state == DSPState.ENABLED and (
        group_preventing_dsp or is_grouping_preventing_dsp(player)
    ):
        dsp_state = DSPState.DISABLED_BY_UNSUPPORTED_GROUP
        dsp_config = DSPConfig(enabled=False)
    elif dsp_state == DSPState.DISABLED:
        # DSP is disabled by the user, remove all filters
        dsp_config = DSPConfig(enabled=False)

    # Only report the enabled filters. Build a new list rather than assigning back to
    # dsp_config.filters, so the config object handed out by the config controller
    # is left unmodified.
    enabled_filters = [x for x in dsp_config.filters if x.enabled]

    output_limiter = is_output_limiter_enabled(mass, player)
    return DSPDetails(
        state=dsp_state,
        input_gain=dsp_config.input_gain,
        filters=enabled_filters,
        output_gain=dsp_config.output_gain,
        output_limiter=output_limiter,
        output_format=player.extra_data.get("output_format", None),
    )
182
183
def get_stream_dsp_details(
    mass: MusicAssistant,
    queue_id: str,
) -> dict[str, DSPDetails]:
    """Collect the DSP details of every player playing this queue, keyed by player_id."""
    player = mass.players.get(queue_id)
    details_by_player: dict[str, DSPDetails] = {}
    assert player is not None  # for type checking
    dsp_prevented = is_grouping_preventing_dsp(player)
    leader_output_format = None
    externally_grouped = False

    if player.type == PlayerType.GROUP and isinstance(player, SyncGroupPlayer):
        # A sync group gets no entry of its own; only its leader's output format
        # is needed, and only when the grouping prevents DSP.
        leader = player.sync_leader if dsp_prevented else None
        if leader:
            leader_output_format = leader.extra_data.get("output_format", None)
    else:
        # Any other player is added itself.
        details_by_player[player.player_id] = get_player_dsp_details(mass, player)
        if dsp_prevented:
            # The leader sends the (combined) audio stream, so its output format
            # is the one that applies to the members.
            leader_output_format = player.extra_data.get("output_format", None)
        externally_grouped = player.type in (PlayerType.GROUP, PlayerType.STEREO_PAIR)

    # Members of externally created groups (e.g. a Chromecast group from the
    # Google Home app) are not enumerated.
    if not (player and player.group_members) or externally_grouped:
        return details_by_player

    # grouped playback: add the details of each group member
    for member_id in player.group_members:
        if member_id in details_by_player:
            # already present (the group leader)
            continue
        member = mass.players.get(member_id)
        if not member:
            continue
        member_details = get_player_dsp_details(
            mass, member, group_preventing_dsp=dsp_prevented
        )
        details_by_player[member_id] = member_details
        if dsp_prevented:
            # The group does not support multi device DSP processing,
            # so the member takes over the leader's output format.
            member_details.output_format = leader_output_format
    return details_by_player
228
229
async def get_stream_details(
    mass: MusicAssistant,
    queue_item: QueueItem,
    seek_position: int = 0,
    fade_in: bool = False,
    prefer_album_loudness: bool = False,
) -> StreamDetails:
    """
    Get streamdetails for the given QueueItem.

    This is called just-in-time when a PlayerQueue wants a MediaItem to be played.
    Do not try to request streamdetails too much in advance as this is expiring data.

    :param mass: MusicAssistant instance.
    :param queue_item: The queue item to get the streamdetails for.
    :param seek_position: Position to start from; reset to 0 for radio items
        and items without a duration.
    :param fade_in: Stored on the returned streamdetails.
    :param prefer_album_loudness: Stored on the returned streamdetails.
    :raises MediaNotFoundError: If no streamdetails could be obtained for the item.
    """
    streamdetails: StreamDetails | None = None
    time_start = time.time()
    LOGGER.debug("Getting streamdetails for %s", queue_item.uri)
    if seek_position and (queue_item.media_type == MediaType.RADIO or not queue_item.duration):
        LOGGER.warning("seeking is not possible on duration-less streams!")
        seek_position = 0

    if not queue_item.media_item and not queue_item.streamdetails:
        # in case of a non-media item queue item, the streamdetails should already be provided
        # this should not happen, but guard it just in case
        raise MediaNotFoundError(
            f"Unable to retrieve streamdetails for {queue_item.name} ({queue_item.uri})"
        )
    buffer: AudioBuffer | None = None
    # NOTE(review): get_buffered_media_stream calls is_valid(checksum, seek_position),
    # while seek_position is passed as the first positional argument below -
    # confirm this matches the signature of AudioBuffer.is_valid.
    if queue_item.streamdetails and (
        (queue_item.streamdetails.created_at + queue_item.streamdetails.expiration) > time.time()
        or ((buffer := queue_item.streamdetails.buffer) and buffer.is_valid(seek_position))
    ):
        # already got a fresh/unused (or unexpired) streamdetails
        streamdetails = queue_item.streamdetails
    else:
        # need to (re)create streamdetails
        # retrieve streamdetails from provider

        media_item = queue_item.media_item
        assert media_item is not None  # for type checking
        preferred_providers: list[str] = []
        if (
            (queue := mass.player_queues.get(queue_item.queue_id))
            and queue.userid
            and (playback_user := await mass.webserver.auth.get_user(queue.userid))
            and playback_user.provider_filter
        ):
            # handle steering into user preferred providerinstance
            preferred_providers = playback_user.provider_filter
        else:
            preferred_providers = [x.provider_instance for x in media_item.provider_mappings]
        # first pass: preferred providers only, second pass: any provider
        for allow_other_provider in (False, True):
            # sort by quality and check item's availability
            for prov_media in sorted(
                media_item.provider_mappings, key=lambda x: x.quality or 0, reverse=True
            ):
                if not prov_media.available:
                    LOGGER.debug(f"Skipping unavailable {prov_media}")
                    continue
                if (
                    not allow_other_provider
                    and prov_media.provider_instance not in preferred_providers
                ):
                    continue
                # guard that provider is available
                music_prov = mass.get_provider(prov_media.provider_instance)
                if TYPE_CHECKING:  # avoid circular import
                    assert isinstance(music_prov, MusicProvider)
                if not music_prov:
                    LOGGER.debug(f"Skipping {prov_media} - provider not available")
                    continue  # provider not available ?
                # get streamdetails from provider
                try:
                    BYPASS_THROTTLER.set(True)
                    streamdetails = await music_prov.get_stream_details(
                        prov_media.item_id, media_item.media_type
                    )
                except MusicAssistantError as err:
                    LOGGER.warning(str(err))
                else:
                    break
                finally:
                    BYPASS_THROTTLER.set(False)
            if streamdetails:
                # A provider delivered streamdetails, so skip the fallback pass.
                # Running it anyway would query the providers a second time and
                # could replace the result with one from a non-preferred provider.
                break

        if not streamdetails:
            msg = f"Unable to retrieve streamdetails for {queue_item.name} ({queue_item.uri})"
            raise MediaNotFoundError(msg)

        # work out how to handle radio stream
        if (
            streamdetails.stream_type in (StreamType.ICY, StreamType.HLS, StreamType.HTTP)
            and streamdetails.media_type == MediaType.RADIO
            and isinstance(streamdetails.path, str)
        ):
            resolved_url, stream_type = await resolve_radio_stream(mass, streamdetails.path)
            streamdetails.path = resolved_url
            streamdetails.stream_type = stream_type
            # Set up metadata monitoring callback for HLS radio streams
            if stream_type == StreamType.HLS:
                streamdetails.stream_metadata_update_callback = partial(
                    _update_hls_radio_metadata, mass
                )
                streamdetails.stream_metadata_update_interval = 5
        # handle volume normalization details
        if result := await mass.music.get_loudness(
            streamdetails.item_id,
            streamdetails.provider,
            media_type=queue_item.media_type,
        ):
            streamdetails.loudness = result[0]
            streamdetails.loudness_album = result[1]

    # set queue_id on the streamdetails so we know what is being streamed
    streamdetails.queue_id = queue_item.queue_id
    # handle skip/fade_in details
    streamdetails.seek_position = seek_position
    streamdetails.fade_in = fade_in
    if not streamdetails.duration:
        streamdetails.duration = queue_item.duration
    streamdetails.prefer_album_loudness = prefer_album_loudness
    player_settings = await mass.config.get_player_config(streamdetails.queue_id)
    core_config = await mass.config.get_core_config("streams")
    conf_volume_normalization_target = float(
        str(player_settings.get_value(CONF_VOLUME_NORMALIZATION_TARGET, -17))
    )
    if conf_volume_normalization_target < -30 or conf_volume_normalization_target >= 0:
        conf_volume_normalization_target = -17.0  # reset to default if out of bounds
        LOGGER.warning(
            "Invalid volume normalization target configured for player %s, "
            "resetting to default of -17.0 dB",
            streamdetails.queue_id,
        )
    streamdetails.target_loudness = conf_volume_normalization_target
    streamdetails.volume_normalization_mode = _get_normalization_mode(
        core_config, player_settings, streamdetails
    )

    # attach the DSP details of all group members
    streamdetails.dsp = get_stream_dsp_details(mass, streamdetails.queue_id)

    LOGGER.debug(
        "retrieved streamdetails for %s in %s milliseconds",
        queue_item.uri,
        int((time.time() - time_start) * 1000),
    )
    return streamdetails
375
376
async def get_buffered_media_stream(
    mass: MusicAssistant,
    streamdetails: StreamDetails,
    pcm_format: AudioFormat,
    seek_position: int = 0,
    filter_params: list[str] | None = None,
) -> AsyncGenerator[bytes, None]:
    """Get audio stream for given media details as raw PCM with buffering.

    The buffer is kept on ``streamdetails.buffer`` and is reused when it is still
    valid for the requested filter params and seek position. Without a usable
    buffer and with a seek position above 60, the audio is streamed directly from
    get_media_stream, without buffering.

    :param mass: MusicAssistant instance.
    :param streamdetails: Details of the stream; its ``buffer`` attribute is read
        and updated here.
    :param pcm_format: The PCM format of the yielded audio.
    :param seek_position: Position to start yielding audio from.
    :param filter_params: Optional filter params passed on to get_media_stream.
    """
    LOGGER.log(
        VERBOSE_LOG_LEVEL,
        "buffered_media_stream: Starting for %s (seek: %s)",
        streamdetails.uri,
        seek_position,
    )

    # checksum based on filter_params
    checksum = f"{filter_params}"

    async def fill_buffer_task() -> None:
        """Background task to fill the audio buffer."""
        chunk_count = 0
        status = "running"
        try:
            # the buffer is always filled from the start of the stream (seek_position=0);
            # the requested seek position is applied when reading from the buffer
            async for chunk in get_media_stream(
                mass, streamdetails, pcm_format, seek_position=0, filter_params=filter_params
            ):
                chunk_count += 1
                await audio_buffer.put(chunk)
                # Yield to event loop to prevent blocking warnings
                await asyncio.sleep(0)
            # Only set EOF if we completed successfully
            await audio_buffer.set_eof()
        except asyncio.CancelledError:
            status = "cancelled"
            raise
        except Exception:
            status = "aborted with error"
            raise
        finally:
            # status is still "running" here when the stream completed normally
            LOGGER.log(
                VERBOSE_LOG_LEVEL,
                "fill_buffer_task: %s (%s chunks) for %s",
                status,
                chunk_count,
                streamdetails.uri,
            )

    # check for existing buffer and reuse if possible
    existing_buffer: AudioBuffer | None = streamdetails.buffer
    if existing_buffer is not None:
        if not existing_buffer.is_valid(checksum, seek_position):
            LOGGER.log(
                VERBOSE_LOG_LEVEL,
                "buffered_media_stream: Existing buffer invalid for %s (seek: %s, discarded: %s)",
                streamdetails.uri,
                seek_position,
                existing_buffer._discarded_chunks,
            )
            # drop the unusable buffer, a new one is created further below if needed
            await existing_buffer.clear()
            streamdetails.buffer = None
            existing_buffer = None
        else:
            LOGGER.debug(
                "buffered_media_stream: Reusing existing buffer for %s - "
                "available: %ss, seek: %s, discarded: %s",
                streamdetails.uri,
                existing_buffer.seconds_available,
                seek_position,
                existing_buffer._discarded_chunks,
            )
            audio_buffer = existing_buffer

    if not existing_buffer and seek_position > 60:
        # If seeking into the track and no valid buffer exists,
        # just start a normal stream without buffering,
        # otherwise we would need to fill the buffer up to the seek position first
        # which is not efficient.
        LOGGER.debug(
            "buffered_media_stream: No existing buffer and seek >60s for %s, "
            "starting normal (unbuffered) stream",
            streamdetails.uri,
        )
        async for chunk in get_media_stream(
            mass,
            streamdetails,
            pcm_format,
            seek_position=seek_position,
            filter_params=filter_params,
        ):
            yield chunk
        return

    if not existing_buffer:
        # create new audio buffer and start fill task
        LOGGER.debug(
            "buffered_media_stream: Creating new buffer for %s",
            streamdetails.uri,
        )
        audio_buffer = AudioBuffer(pcm_format, checksum)
        streamdetails.buffer = audio_buffer
        task = mass.loop.create_task(fill_buffer_task())
        audio_buffer.attach_producer_task(task)

    # special case: pcm format mismatch, resample on the fly
    # this may happen in some special situations such as crossfading
    # and its a bit of a waste to throw away the existing buffer
    if audio_buffer.pcm_format != pcm_format:
        LOGGER.info(
            "buffered_media_stream: pcm format mismatch, resampling on the fly for %s - "
            "buffer format: %s - requested format: %s",
            streamdetails.uri,
            audio_buffer.pcm_format,
            pcm_format,
        )
        # convert the buffered audio to the requested format with ffmpeg
        async for chunk in get_ffmpeg_stream(
            audio_input=audio_buffer.iter(seek_position=seek_position),
            input_format=audio_buffer.pcm_format,
            output_format=pcm_format,
        ):
            yield chunk
        return

    # yield data from the buffer
    chunk_count = 0
    try:
        async for chunk in audio_buffer.iter(seek_position=seek_position):
            chunk_count += 1
            yield chunk
    finally:
        # also logged when the consumer stops iterating before the end
        LOGGER.log(
            VERBOSE_LOG_LEVEL,
            "buffered_media_stream: Completed, yielded %s chunks",
            chunk_count,
        )
511
512
async def get_media_stream(
    mass: MusicAssistant,
    streamdetails: StreamDetails,
    pcm_format: AudioFormat,
    seek_position: int = 0,
    filter_params: list[str] | None = None,
) -> AsyncGenerator[bytes, None]:
    """Get audio stream for given media details as raw PCM.

    The audio source is worked out from the stream type of the streamdetails and
    converted to the requested PCM format by ffmpeg.

    :param mass: MusicAssistant instance.
    :param streamdetails: Details of the stream to play. Its ``duration`` and
        ``audio_format.codec_type`` are updated while streaming.
    :param pcm_format: The PCM format of the yielded audio.
    :param seek_position: Position to start from, if the stream allows seeking.
    :param filter_params: Optional filter params passed on to ffmpeg.
    :raises AudioError: If ffmpeg exits with an error, no audio was received
        or another error occurs while streaming.
    """
    logger = LOGGER.getChild("media_stream")
    logger.log(VERBOSE_LOG_LEVEL, "Starting media stream for %s", streamdetails.uri)
    # Work on a copy: the arguments added below are specific to this invocation,
    # and `+=` on the original list would extend streamdetails.extra_input_args
    # in place, so they would pile up on every new stream of the same streamdetails.
    extra_input_args = list(streamdetails.extra_input_args or [])

    # work out audio source for these streamdetails
    audio_source: str | AsyncGenerator[bytes, None]
    stream_type = streamdetails.stream_type
    if stream_type == StreamType.CUSTOM:
        music_prov = mass.get_provider(streamdetails.provider)
        if TYPE_CHECKING:  # avoid circular import
            assert isinstance(music_prov, MusicProvider)
        # if the provider can seek itself it gets the seek position,
        # otherwise seeking is left to ffmpeg (see below)
        audio_source = music_prov.get_audio_stream(
            streamdetails,
            seek_position=seek_position if streamdetails.can_seek else 0,
        )
        seek_position = 0 if streamdetails.can_seek else seek_position
    elif stream_type == StreamType.ICY:
        assert isinstance(streamdetails.path, str)  # for type checking
        audio_source = get_icy_radio_stream(mass, streamdetails.path, streamdetails)
        seek_position = 0  # seeking not possible on radio streams
    elif stream_type == StreamType.HLS:
        assert isinstance(streamdetails.path, str)  # for type checking
        substream = await get_hls_substream(mass, streamdetails.path)
        audio_source = substream.path
        if streamdetails.media_type == MediaType.RADIO:
            # HLS streams (especially the BBC) struggle when they're played directly
            # with ffmpeg, where they just stop after some minutes,
            # so we tell ffmpeg to loop around in this case.
            extra_input_args += ["-stream_loop", "-1", "-re"]
    else:
        # all other stream types (HTTP, FILE, etc)
        if stream_type == StreamType.ENCRYPTED_HTTP:
            assert streamdetails.decryption_key is not None  # for type checking
            extra_input_args += ["-decryption_key", streamdetails.decryption_key]
        if isinstance(streamdetails.path, list):
            # multi part stream
            audio_source = get_multi_file_stream(mass, streamdetails, seek_position)
            seek_position = 0  # handled by get_multi_file_stream
        else:
            # regular single file/url stream
            assert isinstance(streamdetails.path, str)  # for type checking
            audio_source = streamdetails.path

    # handle seek support
    if seek_position and streamdetails.duration and streamdetails.allow_seek:
        extra_input_args += ["-ss", str(int(seek_position))]

    bytes_sent = 0
    finished = False
    cancelled = False
    first_chunk_received = False
    ffmpeg_proc = FFMpeg(
        audio_input=audio_source,
        input_format=streamdetails.audio_format,
        output_format=pcm_format,
        filter_params=filter_params,
        extra_input_args=extra_input_args,
        collect_log_history=True,
        loglevel="debug" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "info",
    )

    try:
        await ffmpeg_proc.start()
        assert ffmpeg_proc.proc is not None  # for type checking
        logger.debug(
            "Started media stream for %s - using streamtype: %s - pcm format: %s - ffmpeg PID: %s",
            streamdetails.uri,
            streamdetails.stream_type,
            pcm_format.content_type.value,
            ffmpeg_proc.proc.pid,
        )
        stream_start = mass.loop.time()

        chunk_size = get_chunksize(pcm_format, 1)
        async for chunk in ffmpeg_proc.iter_chunked(chunk_size):
            if not first_chunk_received:
                # At this point ffmpeg has started and should now know the codec used
                # for encoding the audio.
                first_chunk_received = True
                streamdetails.audio_format.codec_type = ffmpeg_proc.input_format.codec_type
                logger.debug(
                    "First chunk received after %.2f seconds (codec detected: %s)",
                    mass.loop.time() - stream_start,
                    ffmpeg_proc.input_format.codec_type,
                )
            yield chunk
            bytes_sent += len(chunk)

        # end of audio/track reached
        logger.log(VERBOSE_LOG_LEVEL, "End of stream reached.")
        # wait until stderr also completed reading
        await ffmpeg_proc.wait_with_timeout(5)
        if ffmpeg_proc.returncode not in (0, None):
            log_trail = "\n".join(list(ffmpeg_proc.log_history)[-5:])
            raise AudioError(f"FFMpeg exited with code {ffmpeg_proc.returncode}: {log_trail}")
        if bytes_sent == 0:
            # edge case: no audio data was received at all
            raise AudioError("No audio was received")
        finished = True
    except (Exception, GeneratorExit, asyncio.CancelledError) as err:
        if isinstance(err, asyncio.CancelledError | GeneratorExit):
            # we were cancelled, just raise
            cancelled = True
            raise
        # dump the last 10 lines of the log in case of an unclean exit
        logger.warning("\n".join(list(ffmpeg_proc.log_history)[-10:]))
        raise AudioError(f"Error while streaming: {err}") from err
    finally:
        # always ensure close is called which also handles all cleanup
        await ffmpeg_proc.close()
        # determine how many seconds we've received
        # for pcm output we can calculate this easily
        seconds_received = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
        # store accurate duration
        if finished and not seek_position and seconds_received:
            streamdetails.duration = int(seconds_received)

        logger.log(
            VERBOSE_LOG_LEVEL,
            "stream %s (with code %s) for %s",
            "cancelled" if cancelled else "finished" if finished else "aborted",
            ffmpeg_proc.returncode,
            streamdetails.uri,
        )

        # parse loudnorm data if we have that collected (and enabled)
        if (
            (streamdetails.loudness is None or finished)
            and streamdetails.volume_normalization_mode == VolumeNormalizationMode.DYNAMIC
            and (finished or (seconds_received >= 300))
        ):
            # if dynamic volume normalization is enabled
            # the loudnorm filter will output the measurement in the log,
            # so we can use that directly instead of analyzing the audio
            logger.log(VERBOSE_LOG_LEVEL, "Collecting loudness measurement...")
            if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)):
                logger.debug(
                    "Loudness measurement for %s: %s dB",
                    streamdetails.uri,
                    loudness_details,
                )
                mass.create_task(
                    mass.music.set_loudness(
                        streamdetails.item_id,
                        streamdetails.provider,
                        loudness_details,
                        media_type=streamdetails.media_type,
                    )
                )
670
671
def create_wave_header(
    samplerate: int = 44100, channels: int = 2, bitspersample: int = 16, duration: int | None = None
) -> bytes:
    """Generate a (PCM) wave header from given params.

    :param samplerate: Sample rate, 44100, 48000, etc.
    :param channels: Number of channels.
    :param bitspersample: Bits per sample, 16 for two byte samples, etc.
    :param duration: Duration in seconds, used to calculate the size of the audio data.
        If None, or if the calculated size does not fit the unsigned 32 bit size
        fields of the header, a large placeholder size is written instead.
    """
    # placeholder data size for an unknown length = 6,7 hours at 44100/16
    placeholder_datasize = 4254768000
    max_field_value = 0xFFFFFFFF  # size fields are unsigned 32 bit ("L")

    main_header_spec = b"<4sL4s"
    format_chunk_spec = b"<4sLHHLLHH"
    data_chunk_spec = b"<4sL"
    # everything counted by the RIFF size field apart from the audio data itself:
    # the "WAVE" string, the whole format chunk and the data chunk's id + size field
    overhead = 4 + struct.calcsize(format_chunk_spec) + struct.calcsize(data_chunk_spec)

    if duration is None:
        datasize = placeholder_datasize
    else:
        # calculate from duration
        numsamples = samplerate * duration
        datasize = int(numsamples * channels * (bitspersample / 8))
        if datasize + overhead > max_field_value:
            # too long to describe in a wave header (struct.pack would raise),
            # treat it like an unknown duration
            datasize = placeholder_datasize

    main_header = struct.pack(main_header_spec, b"RIFF", overhead + datasize, b"WAVE")
    format_chunk = struct.pack(
        format_chunk_spec,
        b"fmt ",  # Chunk id
        16,  # Size of this chunk (excluding chunk id and this field)
        1,  # Audio format, 1 for PCM
        channels,  # Number of channels
        int(samplerate),  # Samplerate, 44100, 48000, etc.
        int(samplerate * channels * (bitspersample / 8)),  # Byterate
        int(channels * (bitspersample / 8)),  # Blockalign
        bitspersample,  # 16 bits for two byte samples, etc.
    )
    data_chunk = struct.pack(
        data_chunk_spec,
        b"data",  # Chunk id
        datasize,  # Chunk size (excluding chunk id and this field)
    )
    return main_header + format_chunk + data_chunk
725
726
async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, StreamType]:
    """
    Resolve the URL of a streaming radio station.

    A playlist URL is unwrapped to the first URL it contains.
    The response headers tell whether the stream supports ICY metadata.
    Results are cached, except when an error occurred while resolving.

    Returns tuple;
    - unfolded URL as string
    - StreamType to determine ICY (radio) or HLS stream.
    """
    cached = await mass.cache.get(
        key=url, provider=CACHE_PROVIDER, category=CACHE_CATEGORY_RESOLVED_RADIO_URL
    )
    if cached:
        if TYPE_CHECKING:  # for type checking
            cached = cast("tuple[str, str]", cached)
        return (cached[0], StreamType(cached[1]))

    stream_type = StreamType.HTTP
    request_timeout = ClientTimeout(total=None, connect=10, sock_read=5)
    try:
        async with mass.http_session_no_ssl.get(
            url, headers=HTTP_HEADERS_ICY, allow_redirects=True, timeout=request_timeout
        ) as resp:
            headers = resp.headers
            resp.raise_for_status()
            if not resp.headers:
                raise InvalidDataError("no headers found")
        if headers.get("icy-metaint") is not None:
            stream_type = StreamType.ICY

        # a playlist is recognized by the url or by the content type of the response
        content_type = headers.get("content-type", "")
        is_playlist = (
            url.endswith((".m3u", ".m3u8", ".pls"))
            or ".m3u?" in url
            or ".m3u8?" in url
            or ".pls?" in url
            or "audio/x-mpegurl" in content_type
            or "audio/x-scpls" in content_type
        )
        if is_playlist:
            # url is playlist, we need to unfold it
            try:
                playlist = await fetch_playlist(mass, url)
                has_item_with_length = any(item for item in playlist if item.length)
                if not has_item_with_length:
                    first_url_item = next((item for item in playlist if item.is_url), None)
                    if first_url_item is None:
                        raise InvalidDataError("No content found in playlist")
                    # unfold first url of playlist
                    return await resolve_radio_stream(mass, first_url_item.path)
            except IsHLSPlaylist:
                stream_type = StreamType.HLS

    except Exception as err:
        LOGGER.warning("Error while parsing radio URL %s: %s", url, str(err))
        return (url, stream_type)

    resolved = (url, stream_type)
    await mass.cache.set(
        url,
        resolved,
        expiration=3600 * 3,
        provider=CACHE_PROVIDER,
        category=CACHE_CATEGORY_RESOLVED_RADIO_URL,
    )
    return resolved
792
793
async def get_icy_radio_stream(
    mass: MusicAssistant, url: str, streamdetails: StreamDetails
) -> AsyncGenerator[bytes, None]:
    """
    Get (radio) audio stream from HTTP, including ICY metadata retrieval.

    Yields the audio bytes with the interleaved ICY metadata blocks taken out.
    Whenever a metadata block carries a StreamTitle whose cleaned value differs
    from the current one, it is stored on ``streamdetails.stream_title``.

    :param mass: MusicAssistant instance (provides the HTTP session).
    :param url: URL of the radio stream.
    :param streamdetails: StreamDetails object that receives stream title updates.
    """
    timeout = ClientTimeout(total=None, connect=30, sock_read=5 * 60)
    LOGGER.debug("Start streaming radio with ICY metadata from url %s", url)
    async with mass.http_session_no_ssl.get(
        url, allow_redirects=True, headers=HTTP_HEADERS_ICY, timeout=timeout
    ) as resp:
        headers = resp.headers
        # number of audio bytes between two metadata blocks
        meta_int = int(headers["icy-metaint"])
        while True:
            try:
                yield await resp.content.readexactly(meta_int)
                # one length byte follows the audio; zero means no metadata this round
                meta_byte = await resp.content.readexactly(1)
                if meta_byte == b"\x00":
                    continue
                meta_length = ord(meta_byte) * 16
                meta_data = await resp.content.readexactly(meta_length)
            except asyncio.exceptions.IncompleteReadError:
                # stream ended (or was cut off) in the middle of a block
                break
            if not meta_data:
                continue
            meta_data = meta_data.rstrip(b"\0")
            # Match lazily up to the first "';" terminator instead of excluding
            # quotes, so titles containing an apostrophe ("Don't Stop") are not lost.
            stream_title_re = re.search(rb"StreamTitle='(.*?)';", meta_data, re.DOTALL)
            if not stream_title_re:
                continue
            try:
                # in 99% of the cases the stream title is utf-8 encoded
                stream_title = stream_title_re.group(1).decode("utf-8")
            except UnicodeDecodeError:
                # fallback to iso-8859-1
                stream_title = stream_title_re.group(1).decode("iso-8859-1", errors="replace")
            cleaned_stream_title = clean_stream_title(stream_title)
            if cleaned_stream_title != streamdetails.stream_title:
                LOGGER.log(
                    VERBOSE_LOG_LEVEL,
                    "ICY Radio streamtitle original: %s",
                    stream_title,
                )
                LOGGER.log(
                    VERBOSE_LOG_LEVEL,
                    "ICY Radio streamtitle cleaned: %s",
                    cleaned_stream_title,
                )
                streamdetails.stream_title = cleaned_stream_title
840
841
def parse_extinf_metadata(extinf_line: str) -> dict[str, str]:
    """
    Collect the key="value" attributes found in an HLS EXTINF line.

    Some radio HLS streams put structured metadata such as title="..." and
    artist="..." on their EXTINF lines; this returns those pairs as a dict.
    Keys are lowercased; when a key occurs more than once the last value wins.
    A value runs up to the next double quote, so it can not contain one itself.

    :param extinf_line: The EXTINF line containing metadata
    """
    return {
        pair.group(1).lower(): pair.group(2)
        for pair in re.finditer(r'(\w+)="([^"]*)"', extinf_line)
    }
862
863
async def _update_hls_radio_metadata(
    mass: MusicAssistant,
    streamdetails: StreamDetails,
    elapsed_time: int,  # noqa: ARG001
) -> None:
    """
    Refresh the stream title of an HLS radio stream from its media playlist.

    Downloads the media playlist, takes the last EXTINF line in it and builds
    a stream title from the title/artist attributes found on that line.
    All errors are logged and swallowed; nothing is raised to the caller.

    :param mass: MusicAssistant instance
    :param streamdetails: StreamDetails object to update with metadata
    :param elapsed_time: Current playback position in seconds (unused for live radio)
    """
    try:
        # The resolved media playlist URL is remembered in streamdetails.data
        # so the master playlist only has to be resolved once.
        if streamdetails.data is None:
            streamdetails.data = {}
        playlist_url = streamdetails.data.get("hls_media_playlist_url")
        if not playlist_url:
            try:
                assert isinstance(streamdetails.path, str)  # for type checking
                resolved = await get_hls_substream(mass, streamdetails.path)
                playlist_url = resolved.path
                streamdetails.data["hls_media_playlist_url"] = playlist_url
            except Exception as err:
                LOGGER.warning(
                    "Failed to resolve HLS substream for metadata monitoring: %s",
                    err,
                )
                return

        # Download the media playlist
        fetch_timeout = ClientTimeout(total=0, connect=10, sock_read=30)
        async with mass.http_session_no_ssl.get(playlist_url, timeout=fetch_timeout) as resp:
            resp.raise_for_status()
            playlist_text = await resp.text()

        # Only the last EXTINF line in the playlist is considered
        playlist_lines = playlist_text.strip().split("\n")
        latest_extinf = next(
            (entry for entry in reversed(playlist_lines) if entry.startswith("#EXTINF:")),
            None,
        )
        if latest_extinf is None:
            return

        attributes = parse_extinf_metadata(latest_extinf)
        title = attributes.get("title", "")
        artist = attributes.get("artist", "")
        if not (title or artist):
            return

        # "Artist - Title" when both are known, otherwise whichever one is present
        stream_title = " - ".join(part for part in (artist, title) if part)
        cleaned_title = clean_stream_title(stream_title)

        # Skip empty results and titles that did not change
        if cleaned_title and cleaned_title != streamdetails.stream_title:
            LOGGER.log(
                VERBOSE_LOG_LEVEL,
                "HLS Radio metadata updated: %s",
                cleaned_title,
            )
            streamdetails.stream_title = cleaned_title

    except Exception as err:
        LOGGER.debug(
            "Error fetching HLS metadata: %s",
            err,
        )
944
945
async def get_hls_substream(
    mass: MusicAssistant,
    url: str,
) -> PlaylistItem:
    """
    Select the (highest quality) HLS substream for given HLS playlist/URL.

    Fetches the playlist at ``url``. If it already lists media segments, the
    given URL itself is returned. Otherwise the listed child playlists are sorted
    on their BANDWIDTH stream info (highest first, when available) and the first
    one is returned, with a relative path resolved against ``url``.

    :param mass: MusicAssistant instance (provides the HTTP session).
    :param url: URL of the HLS (master) playlist.
    :raises InvalidDataError: If the playlist does not contain any entries.
    """
    # local import: urljoin is only needed here
    from urllib.parse import urljoin

    timeout = ClientTimeout(total=None, connect=30, sock_read=5 * 60)
    # fetch master playlist and select (best) child playlist
    # https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming-19#section-10
    async with mass.http_session_no_ssl.get(
        url, allow_redirects=True, headers=HTTP_HEADERS, timeout=timeout
    ) as resp:
        resp.raise_for_status()
        raw_data = await resp.read()
        encoding = resp.charset or await detect_charset(raw_data)
        master_m3u_data = raw_data.decode(encoding)
    substreams = parse_m3u(master_m3u_data)
    if not substreams:
        # fail with a descriptive error instead of an IndexError further down
        raise InvalidDataError(f"No (sub)streams found in HLS playlist {url}")
    # There is a chance that we did not get a master playlist with subplaylists
    # but just a single master/sub playlist with the actual audio stream(s)
    # so we need to detect if the playlist's children contain audio streams or
    # sub-playlists.
    if any(
        x
        for x in substreams
        if (x.length or x.path.endswith((".mp4", ".aac")))
        and not x.path.endswith((".m3u", ".m3u8"))
    ):
        return PlaylistItem(path=url, key=substreams[0].key)
    # sort substreams on best quality (highest bandwidth) when available
    if any(x for x in substreams if x.stream_info):
        substreams.sort(
            key=lambda x: int(
                x.stream_info.get("BANDWIDTH", "0") if x.stream_info is not None else 0
            ),
            reverse=True,
        )
    substream = substreams[0]
    if not substream.path.startswith("http"):
        # Path is relative: resolve it against the playlist URL. urljoin also
        # handles root-relative paths ("/live/x.m3u8") and playlist URLs that
        # carry a query string, which plain string stitching gets wrong.
        substream.path = urljoin(url, substream.path)
    return substream
987
988
async def get_http_stream(
    mass: MusicAssistant,
    url: str,
    streamdetails: StreamDetails,
    seek_position: int = 0,
    verify_ssl: bool = True,
) -> AsyncGenerator[bytes, None]:
    """
    Get audio stream from HTTP.

    Seeking is done with an HTTP Range request; the byte offset is estimated from
    the total size and the duration. When seeking is not possible (no range
    support, or an unknown/container content type) the stream starts from the
    beginning and ``streamdetails.seek_position`` is reset to 0.

    :param mass: MusicAssistant instance (provides the HTTP sessions).
    :param url: URL to stream from.
    :param streamdetails: StreamDetails of the item; its size is filled in when unknown.
    :param seek_position: Position (in seconds) to start streaming from.
    :param verify_ssl: Use the regular HTTP session when True, the no-SSL one otherwise.
    :raises InvalidDataError: If a seek was requested but the response is not
        partial content (HTTP 206).
    """
    LOGGER.debug("Start HTTP stream for %s (seek_position %s)", streamdetails.uri, seek_position)
    if seek_position:
        assert streamdetails.duration, "Duration required for seek requests"
    http_session = mass.http_session if verify_ssl else mass.http_session_no_ssl
    # try to get filesize with a head request
    seek_supported = streamdetails.can_seek
    if seek_position or not streamdetails.size:
        async with http_session.head(url, allow_redirects=True, headers=HTTP_HEADERS) as resp:
            resp.raise_for_status()
            if size := resp.headers.get("Content-Length"):
                streamdetails.size = int(size)
            seek_supported = resp.headers.get("Accept-Ranges") == "bytes"

    # seeking an unknown or container format is not supported due to the (moov) headers
    if seek_position and (
        not seek_supported
        or streamdetails.audio_format.content_type
        in (
            ContentType.UNKNOWN,
            ContentType.M4A,
            ContentType.M4B,
        )
    ):
        LOGGER.warning(
            "Seeking in %s (%s) not possible.",
            streamdetails.uri,
            streamdetails.audio_format.output_format_str,
        )
        seek_position = 0
        streamdetails.seek_position = 0

    # headers
    headers = {**HTTP_HEADERS}
    timeout = ClientTimeout(total=None, connect=30, sock_read=5 * 60)
    # Only add the Range header AFTER the check above: otherwise a seek that was
    # just declared impossible would still be requested from the server and the
    # stream would start somewhere in the middle of the file.
    if seek_position and streamdetails.size:
        assert streamdetails.duration is not None  # for type checking
        skip_bytes = int(streamdetails.size / streamdetails.duration * seek_position)
        headers["Range"] = f"bytes={skip_bytes}-{streamdetails.size}"

    # start the streaming from http
    bytes_received = 0
    async with http_session.get(
        url, allow_redirects=True, headers=headers, timeout=timeout
    ) as resp:
        is_partial = resp.status == 206
        if seek_position and not is_partial:
            raise InvalidDataError("HTTP source does not support seeking!")
        resp.raise_for_status()
        async for chunk in resp.content.iter_any():
            bytes_received += len(chunk)
            yield chunk

    # store size on streamdetails for later use
    if not streamdetails.size:
        streamdetails.size = bytes_received
    LOGGER.debug(
        "Finished HTTP stream for %s (transferred %s/%s bytes)",
        streamdetails.uri,
        bytes_received,
        streamdetails.size,
    )
1058
1059
async def get_file_stream(
    mass: MusicAssistant,  # noqa: ARG001
    filename: str,
    streamdetails: StreamDetails,
    seek_position: int = 0,
) -> AsyncGenerator[bytes, None]:
    """
    Get audio stream from local accessible file.

    The file is read in chunks sized by ``get_chunksize`` for the audio format.
    Seeking jumps to a byte offset estimated from the file size and the duration;
    for unknown/container content types the seek is dropped and
    ``streamdetails.seek_position`` is reset to 0.

    :param mass: MusicAssistant instance (unused).
    :param filename: Path of the file to stream.
    :param streamdetails: StreamDetails of the item; its size is filled in when unknown.
    :param seek_position: Position (in seconds) to start streaming from.
    """
    if seek_position:
        assert streamdetails.duration, "Duration required for seek requests"
    if not streamdetails.size:
        stat = await asyncio.to_thread(os.stat, filename)
        streamdetails.size = stat.st_size

    # seeking an unknown or container format is not supported due to the (moov) headers
    if seek_position and (
        streamdetails.audio_format.content_type
        in (
            ContentType.UNKNOWN,
            ContentType.M4A,
            ContentType.M4B,
            ContentType.MP4,
        )
    ):
        LOGGER.warning(
            "Seeking in %s (%s) not possible.",
            streamdetails.uri,
            streamdetails.audio_format.output_format_str,
        )
        seek_position = 0
        streamdetails.seek_position = 0

    chunk_size = get_chunksize(streamdetails.audio_format)
    # Open the file that was passed in (and stat'ed above), not whatever
    # provider specific payload happens to be stored in streamdetails.data.
    async with aiofiles.open(filename, "rb") as _file:
        if seek_position:
            assert streamdetails.duration is not None  # for type checking
            seek_pos = int((streamdetails.size / streamdetails.duration) * seek_position)
            await _file.seek(seek_pos)
        # yield chunks of data from file until it is exhausted
        while data := await _file.read(chunk_size):
            yield data
1103
1104
def _get_parts_from_position(
    parts: list[MultiPartPath], seek_position: int
) -> tuple[list[MultiPartPath], int]:
    """Work out which parts remain to be played from a given timestamp.

    Arguments:
    parts: The list of parts
    seek_position: The seeking position in seconds of the tracklist

    Returns:
        A tuple with the list of parts, starting with the one that contains
        the requested seek position, and the position in seconds to seek to
        within that first part. When a part without a known duration is met,
        the complete list and the unmodified seek position are returned.

    Raises:
        InvalidDataError: If an entry in parts is not a MultiPartPath.
        IndexError: If the seek position lies beyond the last part.
    """
    elapsed = 0.0
    for index, part in enumerate(parts):
        if not isinstance(part, MultiPartPath):
            raise InvalidDataError("Multi-file streamdetails requires a list of MultiPartPath")
        if part.duration is None:
            return parts, seek_position

        part_end = elapsed + part.duration
        if part_end < seek_position:
            # this part ends before the requested position, skip it entirely
            elapsed = part_end
            continue

        offset = seek_position - elapsed

        # Seeking in some parts is inaccurate, making the seek to a chapter land on the end of
        # the previous track. If we're within 2 seconds of the end, move on to the next part
        # (unless this already is the last part, which can not be skipped).
        if offset + 2 >= part.duration:
            LOGGER.debug(
                f"Skipping to the next part due to seek position being at the end: {offset}"
            )
            next_index = index + 1
            if next_index < len(parts):
                return parts[next_index:], 0

        return parts[index:], int(offset)

    raise IndexError(f"Could not find any candidate part for position {seek_position}")
1144
1145
async def get_multi_file_stream(
    mass: MusicAssistant,  # noqa: ARG001
    streamdetails: StreamDetails,
    seek_position: int = 0,
) -> AsyncGenerator[bytes, None]:
    """Return audio stream for a concatenation of multiple files.

    The remaining parts (from the seek position onwards) are written to a
    temporary list file that is handed to ffmpeg with the concat format;
    the list file is removed again once streaming stops.

    Arguments:
    mass: MusicAssistant instance (unused)
    streamdetails: StreamDetails whose path is a list of MultiPartPath
    seek_position: The position to seek to in seconds

    Raises:
        InvalidDataError: If streamdetails.path is not a list.
    """
    if not isinstance(streamdetails.path, list):
        raise InvalidDataError("Multi-file streamdetails requires a list of MultiPartPath")
    parts, seek_position = _get_parts_from_position(streamdetails.path, seek_position)
    files_list = [part.path for part in parts]

    # concat input files
    temp_file = f"/tmp/{shortuuid.random(20)}.txt"  # noqa: S108
    async with aiofiles.open(temp_file, "w") as f:
        for path in files_list:
            # Within single quotes ffmpeg takes every character literally, so a
            # quote inside the path has to be written as '\'' (close the quoted
            # string, add an escaped quote, reopen the quoted string).
            escaped_path = str(path).replace("'", "'\\''")
            await f.write(f"file '{escaped_path}'\n")

    try:
        async for chunk in get_ffmpeg_stream(
            audio_input=temp_file,
            input_format=streamdetails.audio_format,
            output_format=AudioFormat(
                content_type=ContentType.NUT,
                sample_rate=streamdetails.audio_format.sample_rate,
                bit_depth=streamdetails.audio_format.bit_depth,
                channels=streamdetails.audio_format.channels,
            ),
            extra_input_args=[
                "-safe",
                "0",
                "-f",
                "concat",
                "-i",
                temp_file,
                "-ss",
                str(seek_position),
            ],
        ):
            yield chunk
    finally:
        await remove_file(temp_file)
1191
1192
async def get_preview_stream(
    mass: MusicAssistant,
    provider_instance_id_or_domain: str,
    item_id: str,
    media_type: MediaType = MediaType.TRACK,
) -> AsyncGenerator[bytes, None]:
    """
    Create a 30 seconds preview audioclip for the given item.

    The item is streamed as PCM (in the format of its stream details) and
    encoded to AAC by ffmpeg, which is told to stop after 30 seconds.

    :param mass: MusicAssistant instance.
    :param provider_instance_id_or_domain: Provider to get the item from.
    :param item_id: Id of the item within that provider.
    :param media_type: Media type of the item.
    :raises ProviderUnavailableError: If the provider can not be found.
    :raises MediaNotFoundError: If the provider does not know the item.
    """
    provider = mass.get_provider(provider_instance_id_or_domain)
    if not provider:
        raise ProviderUnavailableError
    if TYPE_CHECKING:  # avoid circular import
        assert isinstance(provider, MusicProvider)

    # For security, make sure the item_id refers to an existing item of the provider
    known_item = await provider.get_item(media_type, item_id)
    if not known_item:
        msg = f"Item {item_id} not found in provider {provider_instance_id_or_domain}"
        raise MediaNotFoundError(msg)

    streamdetails = await provider.get_stream_details(item_id, media_type)
    source_format = streamdetails.audio_format
    pcm_format = AudioFormat(
        content_type=ContentType.from_bit_depth(source_format.bit_depth),
        sample_rate=source_format.sample_rate,
        bit_depth=source_format.bit_depth,
        channels=source_format.channels,
    )
    pcm_source = get_media_stream(
        mass=mass,
        streamdetails=streamdetails,
        pcm_format=pcm_format,
    )
    aac_format = AudioFormat(content_type=ContentType.AAC)
    async for chunk in get_ffmpeg_stream(
        audio_input=pcm_source,
        input_format=pcm_format,
        output_format=aac_format,
        extra_input_args=["-t", "30"],  # cut after 30 seconds
    ):
        yield chunk
1228
1229
async def get_silence(
    duration: int,
    output_format: AudioFormat,
) -> AsyncGenerator[bytes, None]:
    """
    Create stream of silence, encoded to format of choice.

    For PCM and WAV one chunk of zero bytes is yielded per second (WAV is
    preceded by a wave header); all other formats are generated by ffmpeg.
    NOTE: the chunk size and the wave header always assume 2 channels,
    regardless of output_format.channels.

    :param duration: Length of the silence in seconds.
    :param output_format: AudioFormat to produce the silence in.
    """
    is_pcm = output_format.content_type.is_pcm()
    if is_pcm or output_format.content_type == ContentType.WAV:
        if not is_pcm:
            # wav silence = wave header + zero's
            yield create_wave_header(
                samplerate=output_format.sample_rate,
                channels=2,
                bitspersample=output_format.bit_depth,
                duration=duration,
            )
        # raw silence is just zeros: one second worth of (stereo) samples per chunk
        second_of_silence = b"\0" * int(
            output_format.sample_rate * (output_format.bit_depth / 8) * 2
        )
        for _ in range(duration):
            yield second_of_silence
        return

    # use ffmpeg for all other encodings
    ffmpeg_cmd = [
        "ffmpeg",
        "-hide_banner",
        "-loglevel",
        "quiet",
        "-f",
        "lavfi",
        "-i",
        f"anullsrc=r={output_format.sample_rate}:cl=stereo",
        "-t",
        str(duration),
        "-f",
        output_format.output_format_str,
        "-",
    ]
    async with AsyncProcess(ffmpeg_cmd, stdout=True) as ffmpeg_proc:
        async for chunk in ffmpeg_proc.iter_chunked():
            yield chunk
1270
1271
async def resample_pcm_audio(
    input_audio: bytes,
    input_format: AudioFormat,
    output_format: AudioFormat,
) -> bytes:
    """
    Convert (a chunk of) PCM audio from input_format to output_format with ffmpeg.

    When both formats are equal the input is handed back untouched.

    :param input_audio: Raw PCM audio data to resample.
    :param input_format: AudioFormat of the input audio.
    :param output_format: Desired AudioFormat for the output audio.

    :return: Resampled audio data, frame-aligned. Returns empty bytes if resampling fails.
    """
    if input_format == output_format:
        # nothing to convert
        return input_audio
    LOGGER.log(VERBOSE_LOG_LEVEL, f"Resampling audio from {input_format} to {output_format}")
    try:
        command = get_ffmpeg_args(
            input_format=input_format, output_format=output_format, filter_params=[]
        )
        _, resampled, errors = await communicate(command, input_audio)
        if resampled:
            # Ensure frame alignment after resampling
            return align_audio_to_frame_boundary(resampled, output_format)
        LOGGER.error(
            "Resampling failed: no output from ffmpeg. Input: %s, Output: %s, stderr: %s",
            input_format,
            output_format,
            errors.decode() if errors else "(no stderr)",
        )
    except Exception as err:
        LOGGER.exception(
            "Failed to resample audio from %s to %s: %s",
            input_format,
            output_format,
            err,
        )
    else:
        return b""
    return b""
1312
1313
def get_chunksize(
    fmt: AudioFormat,
    seconds: float = 1,
) -> int:
    """
    Get a default chunk/file size for given contenttype in bytes.

    Uncompressed formats get their exact PCM size; for the others the size is
    an estimate based on the bit rate (when given) or on a typical bit rate /
    compression ratio for the content type.

    :param fmt: AudioFormat to calculate the size for.
    :param seconds: Amount of audio (in seconds) the chunk should hold.
    """
    content_type = fmt.content_type
    pcm_size = int(fmt.sample_rate * (fmt.bit_depth / 8) * fmt.channels * seconds)
    # raw pcm and the uncompressed containers
    if content_type.is_pcm() or content_type in (
        ContentType.WAV,
        ContentType.AIFF,
        ContentType.DSF,
    ):
        return pcm_size
    # a given bit rate (below 10000) is multiplied by 1000 to get bits per second
    if fmt.bit_rate and fmt.bit_rate < 10000:
        return int(((fmt.bit_rate * 1000) / 8) * seconds)
    if content_type in (ContentType.FLAC, ContentType.WAVPACK, ContentType.ALAC):
        # assume 74.7% compression ratio (level 0)
        # source: https://z-issue.com/wp/flac-compression-level-comparison/
        return int(pcm_size * 0.747)
    if content_type in (ContentType.AAC, ContentType.M4A):
        return int((256000 / 8) * seconds)
    # MP3, OGG and everything else: assume 320 kbps
    return int((320000 / 8) * seconds)
1335
1336
def is_grouping_preventing_dsp(player: Player) -> bool:
    """Tell whether grouping stops DSP from being applied to this leader/PlayerGroup.

    A True result means no DSP should be applied to the player: it plays to
    multiple devices while it lacks the MULTI_DEVICE_DSP feature.
    Whether the Player is part of a group at all is NOT checked here, the caller
    should do that first.
    """
    # Non-leader cases are left to the caller since player.synced_to
    # can be unreliable in some edge cases
    lacks_multi_device_dsp = PlayerFeature.MULTI_DEVICE_DSP not in player.supported_features
    member_count = len(player.group_members) if player.group_members else 0

    if player.provider.domain == "player_group":
        # PlayerGroups have no leader, so a member count of 1 means
        # the group actually contains only a single player.
        return member_count > 1 and lacks_multi_device_dsp
    if player.type == PlayerType.GROUP:
        # A group player external to Music Assistant: always multiple devices.
        return lacks_multi_device_dsp
    # Regular player: any group member at all makes it a multi device setup.
    return member_count > 0 and lacks_multi_device_dsp
1359
1360
def is_output_limiter_enabled(mass: MusicAssistant, player: Player) -> bool:
    """Check if the player has the output limiter enabled.

    Unlike DSP, the limiter is still configurable when synchronized without MULTI_DEVICE_DSP.
    So in grouped scenarios without MULTI_DEVICE_DSP, the permanent sync group or the leader
    decides if the limiter should be turned on or not.
    """
    own_player_id = player.player_id
    # Precedence: the active (sync)group, then the sync leader, then the player itself
    deciding_player_id = player.active_group or player.synced_to or own_player_id
    return bool(
        mass.config.get_raw_player_config_value(
            deciding_player_id,
            CONF_ENTRY_OUTPUT_LIMITER.key,
            CONF_ENTRY_OUTPUT_LIMITER.default_value,
        )
    )
1381
1382
def get_player_filter_params(
    mass: MusicAssistant,
    player_id: str,
    input_format: AudioFormat,
    output_format: AudioFormat,
) -> list[str]:
    """
    Get player specific filter parameters for ffmpeg (if any).

    The result contains, in this order: the DSP chain (input gain, enabled
    filters, output gain) when DSP is enabled, the left/right channel selection
    and the safety limiter when that is enabled.
    As a side effect the output format is stored in the player's extra_data.

    :param mass: MusicAssistant instance.
    :param player_id: Id of the player to build the filters for.
    :param input_format: AudioFormat passed on to the DSP filter conversion.
    :param output_format: AudioFormat that is recorded on the player.
    """
    dsp = mass.config.get_player_dsp_config(player_id)
    limiter_enabled = True

    player = mass.players.get(player_id)
    if player:
        if is_grouping_preventing_dsp(player):
            # DSP can not be applied correctly to a grouped player
            # without multi-device DSP support, so it is turned off.
            dsp.enabled = False
        elif (
            player.provider.domain == "player_group"
            and PlayerFeature.MULTI_DEVICE_DSP not in player.supported_features
        ):
            # Special case! This is a player group where:
            # - The group leader does not support MULTI_DEVICE_DSP
            # - But it only contains a single player (since nothing is preventing DSP)
            # The DSP of that single player can still be applied.
            if not player.group_members:
                # This should normally never happen, but if it does, we disable DSP.
                dsp.enabled = False
            else:
                child_player = mass.players.get(player.group_members[0])
                assert child_player is not None  # for type checking
                dsp = mass.config.get_player_dsp_config(child_player.player_id)

        # At this point the output format used for the player in the audio
        # processing steps is implicitly known. Keep it around so it can
        # be shown to the user in the UI later on.
        player.extra_data["output_format"] = output_format

        limiter_enabled = is_output_limiter_enabled(mass, player)

    ffmpeg_filters = []
    if dsp.enabled:
        # input gain comes first
        if dsp.input_gain != 0:
            ffmpeg_filters.append(f"volume={dsp.input_gain}dB")

        # then every enabled DSP filter, in configured order
        for dsp_filter in dsp.filters:
            if dsp_filter.enabled:
                ffmpeg_filters.extend(filter_to_ffmpeg_params(dsp_filter, input_format))

        # output gain goes last
        if dsp.output_gain != 0:
            ffmpeg_filters.append(f"volume={dsp.output_gain}dB")

    # output only the left or the right channel (as mono) when configured
    conf_channels = mass.config.get_raw_player_config_value(
        player_id, CONF_OUTPUT_CHANNELS, "stereo"
    )
    if conf_channels == "left":
        ffmpeg_filters.append("pan=mono|c0=FL")
    elif conf_channels == "right":
        ffmpeg_filters.append("pan=mono|c0=FR")

    # the safety limiter always closes the chain
    if limiter_enabled:
        ffmpeg_filters.append("alimiter=limit=-2dB:level=false:asc=true")

    LOGGER.debug("Generated ffmpeg params for player %s: %s", player_id, ffmpeg_filters)
    return ffmpeg_filters
1455
1456
1457def parse_loudnorm(raw_stderr: bytes | str) -> float | None:
1458    """Parse Loudness measurement from ffmpeg stderr output."""
1459    stderr_data = raw_stderr.decode() if isinstance(raw_stderr, bytes) else raw_stderr
1460    if "[Parsed_loudnorm_0 @" not in stderr_data:
1461        return None
1462    for jsun_chunk in stderr_data.split(" { "):
1463        try:
1464            stderr_data = "{" + jsun_chunk.rsplit("}")[0].strip() + "}"
1465            loudness_data = json_loads(stderr_data)
1466            return float(loudness_data["input_i"])
1467        except (*JSON_DECODE_EXCEPTIONS, KeyError, ValueError, IndexError):
1468            continue
1469    return None
1470
1471
async def analyze_loudness(
    mass: MusicAssistant,
    streamdetails: StreamDetails,
) -> None:
    """Analyze media item's audio, to calculate EBU R128 loudness.

    Feeds the audio source of the given streamdetails through ffmpeg's
    ``ebur128`` filter (input limited with ``-t 600``) and parses the
    integrated loudness from the collected ffmpeg log. On success the value
    is stored through ``mass.music.set_loudness``; when the value can not be
    parsed a warning is logged and nothing is stored.

    The analysis is skipped entirely when ``mass.music.get_loudness`` already
    returns a (truthy) measurement for this item.

    :param mass: The MusicAssistant instance (used for provider lookup and
        reading/storing loudness measurements).
    :param streamdetails: The streamdetails describing the audio to analyze.
    """
    if await mass.music.get_loudness(
        streamdetails.item_id,
        streamdetails.provider,
        media_type=streamdetails.media_type,
    ):
        # only when needed we do the analyze job
        return

    logger = LOGGER.getChild("analyze_loudness")
    logger.debug("Start analyzing audio for %s", streamdetails.uri)

    extra_input_args = [
        # limit to 10 minutes to avoid reading too much in memory
        "-t",
        "600",
    ]
    # work out audio source for these streamdetails
    stream_type = streamdetails.stream_type
    audio_source: str | AsyncGenerator[bytes, None]
    if stream_type == StreamType.CUSTOM:
        music_prov = mass.get_provider(streamdetails.provider)
        if TYPE_CHECKING:  # avoid circular import
            assert isinstance(music_prov, MusicProvider)
        audio_source = music_prov.get_audio_stream(streamdetails)
    elif stream_type == StreamType.ICY:
        assert isinstance(streamdetails.path, str)  # for type checking
        audio_source = get_icy_radio_stream(mass, streamdetails.path, streamdetails)
    elif stream_type == StreamType.HLS:
        assert isinstance(streamdetails.path, str)  # for type checking
        substream = await get_hls_substream(mass, streamdetails.path)
        audio_source = substream.path
    else:
        # all other stream types (HTTP, FILE, etc)
        if stream_type == StreamType.ENCRYPTED_HTTP:
            assert streamdetails.decryption_key is not None  # for type checking
            extra_input_args += ["-decryption_key", streamdetails.decryption_key]
        if isinstance(streamdetails.path, list):
            # multi part stream - just use a single file for the measurement
            parts = streamdetails.path
            if not parts:
                # nothing to measure; skip instead of failing on an index lookup
                LOGGER.warning(
                    "Could not determine integrated loudness of %s - %s",
                    streamdetails.uri,
                    "multi part stream without any parts",
                )
                return
            # keep using the second part when there is one (previous behavior),
            # but fall back to the only part for a single-part list
            audio_source = parts[1].path if len(parts) > 1 else parts[0].path
        else:
            # regular single file/url stream
            assert isinstance(streamdetails.path, str)  # for type checking
            audio_source = streamdetails.path

    # calculate BS.1770 R128 integrated loudness with ffmpeg
    async with FFMpeg(
        audio_input=audio_source,
        input_format=streamdetails.audio_format,
        output_format=streamdetails.audio_format,
        audio_output="NULL",
        filter_params=["ebur128=framelog=verbose"],
        extra_input_args=extra_input_args,
        collect_log_history=True,
        loglevel="info",
    ) as ffmpeg_proc:
        await ffmpeg_proc.wait()
        log_lines = ffmpeg_proc.log_history
        log_lines_str = "\n".join(log_lines)
        try:
            # the value sits between "I:" and "LUFS" after the
            # "Integrated loudness" header in the collected log
            loudness_str = (
                log_lines_str.split("Integrated loudness")[1].split("I:")[1].split("LUFS")[0]
            )
            loudness = float(loudness_str.strip())
        except (IndexError, ValueError, AttributeError):
            LOGGER.warning(
                "Could not determine integrated loudness of %s - %s",
                streamdetails.uri,
                log_lines_str or "received empty value",
            )
        else:
            await mass.music.set_loudness(
                streamdetails.item_id,
                streamdetails.provider,
                loudness,
                media_type=streamdetails.media_type,
            )
            logger.debug(
                "Integrated loudness of %s is: %s",
                streamdetails.uri,
                loudness,
            )
1558
1559
def _get_normalization_mode(
    core_config: CoreConfig, player_config: PlayerConfig, streamdetails: StreamDetails
) -> VolumeNormalizationMode:
    """Resolve the effective volume normalization mode for a stream on a player.

    Normalization is disabled when the player has it switched off or when the
    streamdetails carry no target loudness. Otherwise the configured preference
    (radio or tracks, depending on the media type) is combined with the
    availability of a loudness measurement on the streamdetails.

    :param core_config: Core config holding the radio/tracks preference.
    :param player_config: Player config holding the per-player on/off switch.
    :param streamdetails: The streamdetails of the item that will be played.
    """
    if not player_config.get_value(CONF_VOLUME_NORMALIZATION):
        # disabled for this player
        return VolumeNormalizationMode.DISABLED
    if streamdetails.target_loudness is None:
        # no target loudness set, disable normalization
        return VolumeNormalizationMode.DISABLED

    # work out preference for track or radio
    if streamdetails.media_type == MediaType.RADIO:
        conf_key = CONF_VOLUME_NORMALIZATION_RADIO
    else:
        conf_key = CONF_VOLUME_NORMALIZATION_TRACKS
    preferred_mode = VolumeNormalizationMode(str(core_config.get_value(conf_key)))

    if streamdetails.loudness is None:
        # no measurement available: translate the preference into what is
        # still possible (any other preference is returned unchanged)
        without_measurement = {
            # fallback to dynamic mode is allowed
            VolumeNormalizationMode.FALLBACK_DYNAMIC: VolumeNormalizationMode.DYNAMIC,
            # no fallback allowed
            VolumeNormalizationMode.MEASUREMENT_ONLY: VolumeNormalizationMode.DISABLED,
            # fallback to fixed gain is allowed
            VolumeNormalizationMode.FALLBACK_FIXED_GAIN: VolumeNormalizationMode.FIXED_GAIN,
        }
        return without_measurement.get(preferred_mode, preferred_mode)

    # measurement available - explicit non-measurement modes are honored as-is
    if preferred_mode in (
        VolumeNormalizationMode.DISABLED,
        VolumeNormalizationMode.FIXED_GAIN,
        VolumeNormalizationMode.DYNAMIC,
    ):
        return preferred_mode

    # measurement available - chosen mode is measurement
    return VolumeNormalizationMode.MEASUREMENT_ONLY
1602