music-assistant-server

88.2 KBPY
streams_controller.py
88.2 KB2,008 lines • python
1"""
2Controller to stream audio to players.
3
4The streams controller hosts a basic, unprotected HTTP-only webserver
5purely to stream audio packets to players and some control endpoints such as
6the upnp callbacks and json rpc api for slimproto clients.
7"""
8
9from __future__ import annotations
10
11import asyncio
12import gc
13import logging
14import os
15import urllib.parse
16from collections.abc import AsyncGenerator
17from dataclasses import dataclass
18from typing import TYPE_CHECKING, Final, cast
19
20from aiofiles.os import wrap
21from aiohttp import web
22from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
23from music_assistant_models.enums import (
24    ConfigEntryType,
25    ContentType,
26    MediaType,
27    PlayerFeature,
28    StreamType,
29    VolumeNormalizationMode,
30)
31from music_assistant_models.errors import (
32    AudioError,
33    InvalidDataError,
34    ProviderUnavailableError,
35    QueueEmpty,
36)
37from music_assistant_models.media_items import AudioFormat, Track
38from music_assistant_models.player_queue import PlayLogEntry
39
40from music_assistant.constants import (
41    ANNOUNCE_ALERT_FILE,
42    CONF_BIND_IP,
43    CONF_BIND_PORT,
44    CONF_CROSSFADE_DURATION,
45    CONF_ENTRY_ENABLE_ICY_METADATA,
46    CONF_ENTRY_LOG_LEVEL,
47    CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES,
48    CONF_HTTP_PROFILE,
49    CONF_OUTPUT_CHANNELS,
50    CONF_OUTPUT_CODEC,
51    CONF_PUBLISH_IP,
52    CONF_SAMPLE_RATES,
53    CONF_SMART_FADES_MODE,
54    CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO,
55    CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
56    CONF_VOLUME_NORMALIZATION_RADIO,
57    CONF_VOLUME_NORMALIZATION_TRACKS,
58    DEFAULT_STREAM_HEADERS,
59    ICY_HEADERS,
60    INTERNAL_PCM_FORMAT,
61    SILENCE_FILE,
62    VERBOSE_LOG_LEVEL,
63)
64from music_assistant.controllers.players.helpers import AnnounceData
65from music_assistant.controllers.streams.smart_fades import SmartFadesMixer
66from music_assistant.controllers.streams.smart_fades.analyzer import SmartFadesAnalyzer
67from music_assistant.controllers.streams.smart_fades.fades import SMART_CROSSFADE_DURATION
68from music_assistant.helpers.audio import LOGGER as AUDIO_LOGGER
69from music_assistant.helpers.audio import (
70    get_buffered_media_stream,
71    get_chunksize,
72    get_media_stream,
73    get_player_filter_params,
74    get_stream_details,
75    resample_pcm_audio,
76)
77from music_assistant.helpers.buffered_generator import buffered, use_buffer
78from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
79from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream
80from music_assistant.helpers.util import (
81    divide_chunks,
82    format_ip_for_url,
83    get_ip_addresses,
84    get_total_system_memory,
85)
86from music_assistant.helpers.webserver import Webserver
87from music_assistant.models.core_controller import CoreController
88from music_assistant.models.music_provider import MusicProvider
89from music_assistant.models.plugin import PluginProvider, PluginSource
90from music_assistant.models.smart_fades import SmartFadesMode
91from music_assistant.providers.universal_group.constants import UGP_PREFIX
92from music_assistant.providers.universal_group.player import UniversalGroupPlayer
93
94if TYPE_CHECKING:
95    from music_assistant_models.config_entries import CoreConfig
96    from music_assistant_models.player import PlayerMedia
97    from music_assistant_models.player_queue import PlayerQueue
98    from music_assistant_models.queue_item import QueueItem
99    from music_assistant_models.streamdetails import StreamDetails
100
101    from music_assistant.mass import MusicAssistant
102    from music_assistant.models.player import Player
103
104
# awaitable variant of os.path.isfile, produced with aiofiles' wrap()
isfile = wrap(os.path.isfile)

# config keys that are local to the streams controller
CONF_ALLOW_BUFFER: Final[str] = "allow_buffering"
CONF_ALLOW_CROSSFADE_SAME_ALBUM: Final[str] = "allow_crossfade_same_album"
CONF_SMART_FADES_LOG_LEVEL: Final[str] = "smart_fades_log_level"

# Calculate total system memory once at module load time
TOTAL_SYSTEM_MEMORY_GB: Final[float] = get_total_system_memory()
# in-memory buffering is only enabled by default when 8 (GB) or more memory is detected
CONF_ALLOW_BUFFER_DEFAULT: Final[bool] = TOTAL_SYSTEM_MEMORY_GB >= 8.0
# default TCP port for the stream server (used when no bind port is configured)
DEFAULT_PORT: Final[int] = 8097
115
116
def parse_pcm_info(content_type: str) -> tuple[int, int, int]:
    """Parse PCM info from a codec/content_type string.

    The string is expected to carry ``;``-separated ``key=value`` parameters,
    for example ``pcm;codec=pcm;rate=48000;bitrate=24;channels=2``.
    Parameter names are matched case-insensitively and surrounding whitespace
    is ignored, so ``audio/L16; rate=48000; channels=2`` is understood as well.

    :param content_type: codec/content type string, with or without parameters.
    :return: tuple of (sample_rate, sample_size, channels); any parameter that
        is missing falls back to 44100 / 16 / 2 respectively.
    :raises ValueError: if a supplied parameter value is not an integer.
    """
    params: dict[str, str] = {}
    if ";" in content_type:
        # reuse the query string parser by turning the ';' separators into '&'
        pairs = urllib.parse.parse_qsl(content_type.replace(";", "&"))
        # normalize the keys: "; rate=48000" would otherwise end up as " rate"
        params = {key.strip().lower(): value for key, value in pairs}
    sample_rate = int(params.get("rate", 44100))
    sample_size = int(params.get("bitrate", 16))
    channels = int(params.get("channels", 2))
    return (sample_rate, sample_size, channels)
126
127
@dataclass
class CrossfadeData:
    """Crossfade data kept for a queue item, together with the formats it relates to."""

    # the crossfade audio bytes
    data: bytes
    # size of the fade-in part, expressed in 'fade_in_pcm_format'
    fade_in_size: int
    # format of the 'data' bytes (current/previous track's format)
    pcm_format: AudioFormat
    # format for 'fade_in_size' (next track's format)
    fade_in_pcm_format: AudioFormat
    # id of the queue item this crossfade data is stored for
    queue_item_id: str
137
138
139class StreamsController(CoreController):
140    """Webserver Controller to stream audio to players."""
141
142    domain: str = "streams"
143
144    def __init__(self, mass: MusicAssistant) -> None:
145        """Initialize instance."""
146        super().__init__(mass)
147        self._server = Webserver(self.logger, enable_dynamic_routes=True)
148        self.register_dynamic_route = self._server.register_dynamic_route
149        self.unregister_dynamic_route = self._server.unregister_dynamic_route
150        self.manifest.name = "Streamserver"
151        self.manifest.description = (
152            "Music Assistant's core controller that is responsible for "
153            "streaming audio to players on the local network."
154        )
155        self.manifest.icon = "cast-audio"
156        self.announcements: dict[str, AnnounceData] = {}
157        self._crossfade_data: dict[str, CrossfadeData] = {}
158        self._bind_ip: str = "0.0.0.0"
159        self._smart_fades_mixer = SmartFadesMixer(self)
160        self._smart_fades_analyzer = SmartFadesAnalyzer(self)
161
    @property
    def base_url(self) -> str:
        """Return the base_url for the streamserver.

        This is the base_url of the underlying webserver; the stream URLs
        handed out by this controller are built on top of it.
        """
        return self._server.base_url
166
    @property
    def bind_ip(self) -> str:
        """Return the IP address this streamserver is bound to.

        Holds "0.0.0.0" until setup() stores the configured bind IP.
        """
        return self._bind_ip
171
    @property
    def smart_fades_mixer(self) -> SmartFadesMixer:
        """Return the SmartFadesMixer instance (created once in __init__)."""
        return self._smart_fades_mixer
176
    @property
    def smart_fades_analyzer(self) -> SmartFadesAnalyzer:
        """Return the SmartFadesAnalyzer instance (created once in __init__)."""
        return self._smart_fades_analyzer
181
182    async def get_config_entries(
183        self,
184        action: str | None = None,
185        values: dict[str, ConfigValueType] | None = None,
186    ) -> tuple[ConfigEntry, ...]:
187        """Return all Config Entries for this core module (if any)."""
188        ip_addresses = await get_ip_addresses(include_ipv6=True)
189        return (
190            ConfigEntry(
191                key=CONF_ALLOW_BUFFER,
192                type=ConfigEntryType.BOOLEAN,
193                default_value=CONF_ALLOW_BUFFER_DEFAULT,
194                label="Allow (in-memory) buffering of (track) audio",
195                description="By default, Music Assistant tries to be as resource "
196                "efficient as possible when streaming audio, especially considering "
197                "low-end devices such as Raspberry Pi's. This means that audio "
198                "buffering is disabled by default to reduce memory usage. \n\n"
199                "Enabling this option allows for in-memory buffering of audio, "
200                "which (massively) improves playback (and seeking) performance but it comes "
201                "at the cost of increased memory usage. "
202                "If you run Music Assistant on a capable device with enough memory, "
203                "enabling this option is strongly recommended.",
204                required=False,
205                category="playback",
206            ),
207            ConfigEntry(
208                key=CONF_VOLUME_NORMALIZATION_RADIO,
209                type=ConfigEntryType.STRING,
210                default_value=VolumeNormalizationMode.FALLBACK_FIXED_GAIN,
211                label="Volume normalization method for radio streams",
212                options=[
213                    ConfigValueOption(x.value.replace("_", " ").title(), x.value)
214                    for x in VolumeNormalizationMode
215                ],
216                category="playback",
217            ),
218            ConfigEntry(
219                key=CONF_VOLUME_NORMALIZATION_TRACKS,
220                type=ConfigEntryType.STRING,
221                default_value=VolumeNormalizationMode.FALLBACK_DYNAMIC,
222                label="Volume normalization method for tracks",
223                options=[
224                    ConfigValueOption(x.value.replace("_", " ").title(), x.value)
225                    for x in VolumeNormalizationMode
226                ],
227                category="playback",
228            ),
229            ConfigEntry(
230                key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO,
231                type=ConfigEntryType.FLOAT,
232                range=(-20, 10),
233                default_value=-6,
234                label="Fixed/fallback gain adjustment for radio streams",
235                category="playback",
236            ),
237            ConfigEntry(
238                key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
239                type=ConfigEntryType.FLOAT,
240                range=(-20, 10),
241                default_value=-6,
242                label="Fixed/fallback gain adjustment for tracks",
243                category="playback",
244            ),
245            ConfigEntry(
246                key=CONF_ALLOW_CROSSFADE_SAME_ALBUM,
247                type=ConfigEntryType.BOOLEAN,
248                default_value=False,
249                label="Allow crossfade between tracks from the same album",
250                description="Enabling this option allows for crossfading between tracks "
251                "that are part of the same album.",
252                category="playback",
253            ),
254            ConfigEntry(
255                key=CONF_PUBLISH_IP,
256                type=ConfigEntryType.STRING,
257                default_value=ip_addresses[0],
258                label="Published IP address",
259                description="This IP address is communicated to players where to find this server."
260                "\nMake sure that this IP can be reached by players on the local network, "
261                "otherwise audio streaming will not work.",
262                required=False,
263                category="generic",
264                advanced=True,
265                requires_reload=True,
266            ),
267            ConfigEntry(
268                key=CONF_BIND_PORT,
269                type=ConfigEntryType.INTEGER,
270                default_value=DEFAULT_PORT,
271                label="TCP Port",
272                description="The TCP port to run the server. "
273                "Make sure that this server can be reached "
274                "on the given IP and TCP port by players on the local network.",
275                category="generic",
276                advanced=True,
277                requires_reload=True,
278            ),
279            ConfigEntry(
280                key=CONF_BIND_IP,
281                type=ConfigEntryType.STRING,
282                default_value="0.0.0.0",
283                options=[ConfigValueOption(x, x) for x in {"0.0.0.0", "::", *ip_addresses}],
284                label="Bind to IP/interface",
285                description="Start the stream server on this specific interface. \n"
286                "Use 0.0.0.0 or :: to bind to all interfaces, which is the default. \n"
287                "This is an advanced setting that should normally "
288                "not be adjusted in regular setups.",
289                category="generic",
290                advanced=True,
291                required=False,
292                requires_reload=True,
293            ),
294            ConfigEntry(
295                key=CONF_SMART_FADES_LOG_LEVEL,
296                type=ConfigEntryType.STRING,
297                label="Smart Fades Log level",
298                description="Log level for the Smart Fades mixer and analyzer.",
299                options=CONF_ENTRY_LOG_LEVEL.options,
300                default_value="GLOBAL",
301                category="generic",
302                advanced=True,
303            ),
304        )
305
306    async def setup(self, config: CoreConfig) -> None:
307        """Async initialize of module."""
308        # copy log level to audio/ffmpeg loggers
309        AUDIO_LOGGER.setLevel(self.logger.level)
310        FFMPEG_LOGGER.setLevel(self.logger.level)
311        self._setup_smart_fades_logger(config)
312        # perform check for ffmpeg version
313        await check_ffmpeg_version()
314        # start the webserver
315        self.publish_port = config.get_value(CONF_BIND_PORT, DEFAULT_PORT)
316        self.publish_ip = config.get_value(CONF_PUBLISH_IP)
317        self._bind_ip = bind_ip = str(config.get_value(CONF_BIND_IP))
318        # print a big fat message in the log where the streamserver is running
319        # because this is a common source of issues for people with more complex setups
320        self.logger.log(
321            logging.INFO if self.mass.config.onboard_done else logging.WARNING,
322            "\n\n################################################################################\n"
323            "Starting streamserver on  %s:%s\n"
324            "This is the IP address that is communicated to players.\n"
325            "If this is incorrect, audio will not play!\n"
326            "See the documentation how to configure the publish IP for the Streamserver\n"
327            "in Settings --> Core modules --> Streamserver\n"
328            "################################################################################\n",
329            self.publish_ip,
330            self.publish_port,
331        )
332        await self._server.setup(
333            bind_ip=bind_ip,
334            bind_port=cast("int", self.publish_port),
335            base_url=f"http://{format_ip_for_url(str(self.publish_ip))}:{self.publish_port}",
336            static_routes=[
337                (
338                    "*",
339                    "/flow/{session_id}/{queue_id}/{queue_item_id}/{player_id}.{fmt}",
340                    self.serve_queue_flow_stream,
341                ),
342                (
343                    "*",
344                    "/single/{session_id}/{queue_id}/{queue_item_id}/{player_id}.{fmt}",
345                    self.serve_queue_item_stream,
346                ),
347                (
348                    "*",
349                    "/command/{queue_id}/{command}.mp3",
350                    self.serve_command_request,
351                ),
352                (
353                    "*",
354                    "/announcement/{player_id}.{fmt}",
355                    self.serve_announcement_stream,
356                ),
357                (
358                    "*",
359                    "/pluginsource/{plugin_source}/{player_id}.{fmt}",
360                    self.serve_plugin_source_stream,
361                ),
362            ],
363        )
364        # Start periodic garbage collection task
365        # This ensures memory from audio buffers and streams is cleaned up regularly
366        self.mass.call_later(900, self._periodic_garbage_collection)  # 15 minutes
367
    async def close(self) -> None:
        """Cleanup on exit by closing the webserver."""
        await self._server.close()
371
372    async def resolve_stream_url(
373        self,
374        player_id: str,
375        media: PlayerMedia,
376    ) -> str:
377        """Resolve the stream URL for the given PlayerMedia."""
378        conf_output_codec = await self.mass.config.get_player_config_value(
379            player_id, CONF_OUTPUT_CODEC, default="flac", return_type=str
380        )
381        output_codec = ContentType.try_parse(conf_output_codec or "flac")
382        fmt = output_codec.value
383        # handle raw pcm without exact format specifiers
384        if output_codec.is_pcm() and ";" not in fmt:
385            fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}"
386        extra_data = media.custom_data or {}
387        flow_mode = extra_data.get("flow_mode", False)
388        session_id = extra_data.get("session_id")
389        queue_item_id = media.queue_item_id
390        if not session_id or not queue_item_id:
391            raise InvalidDataError("Can not resolve stream URL: Invalid PlayerMedia data")
392        queue_id = media.source_id
393        base_path = "flow" if flow_mode else "single"
394        return f"{self._server.base_url}/{base_path}/{session_id}/{queue_id}/{queue_item_id}/{player_id}.{fmt}"  # noqa: E501
395
396    async def get_plugin_source_url(
397        self,
398        plugin_source: PluginSource,
399        player_id: str,
400    ) -> str:
401        """Get the url for the Plugin Source stream/proxy."""
402        if plugin_source.audio_format.content_type.is_pcm():
403            fmt = ContentType.WAV.value
404        else:
405            fmt = plugin_source.audio_format.content_type.value
406        return f"{self._server.base_url}/pluginsource/{plugin_source.id}/{player_id}.{fmt}"
407
408    async def serve_queue_item_stream(self, request: web.Request) -> web.StreamResponse:
409        """Stream single queueitem audio to a player."""
410        self._log_request(request)
411        queue_id = request.match_info["queue_id"]
412        player_id = request.match_info["player_id"]
413        if not (queue := self.mass.player_queues.get(queue_id)):
414            raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
415        session_id = request.match_info["session_id"]
416        if queue.session_id and session_id != queue.session_id:
417            raise web.HTTPNotFound(reason=f"Unknown (or invalid) session: {session_id}")
418        if not (player := self.mass.players.get_player(player_id)):
419            raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
420        queue_item_id = request.match_info["queue_item_id"]
421        queue_item = self.mass.player_queues.get_item(queue_id, queue_item_id)
422        if not queue_item:
423            raise web.HTTPNotFound(reason=f"Unknown Queue item: {queue_item_id}")
424        if not queue_item.streamdetails:
425            try:
426                queue_item.streamdetails = await get_stream_details(
427                    mass=self.mass, queue_item=queue_item
428                )
429            except Exception as e:
430                self.logger.error(
431                    "Failed to get streamdetails for QueueItem %s: %s", queue_item_id, e
432                )
433                queue_item.available = False
434                raise web.HTTPNotFound(reason=f"No streamdetails for Queue item: {queue_item_id}")
435
436        # pick output format based on the streamdetails and player capabilities
437        pcm_format = await self._select_pcm_format(
438            player=player,
439            streamdetails=queue_item.streamdetails,
440            smartfades_enabled=True,
441        )
442        output_format = await self.get_output_format(
443            output_format_str=request.match_info["fmt"],
444            player=player,
445            content_sample_rate=pcm_format.sample_rate,
446            content_bit_depth=pcm_format.bit_depth,
447        )
448
449        # prepare request, add some DLNA/UPNP compatible headers
450        # icy-name is sanitized to avoid a "Potential header injection attack" exception by aiohttp
451        # see https://github.com/music-assistant/support/issues/4913
452        headers = {
453            **DEFAULT_STREAM_HEADERS,
454            "icy-name": queue_item.name.replace("\n", " ").replace("\r", " ").replace("\t", " "),
455            "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01500000000000000000000000000000",  # noqa: E501
456            "Accept-Ranges": "none",
457            "Content-Type": f"audio/{output_format.output_format_str}",
458        }
459        resp = web.StreamResponse(
460            status=200,
461            reason="OK",
462            headers=headers,
463        )
464        resp.content_type = f"audio/{output_format.output_format_str}"
465        http_profile = await self.mass.config.get_player_config_value(
466            queue_id, CONF_HTTP_PROFILE, default="default", return_type=str
467        )
468        if http_profile == "forced_content_length" and not queue_item.duration:
469            # just set an insane high content length to make sure the player keeps playing
470            resp.content_length = get_chunksize(output_format, 12 * 3600)
471        elif http_profile == "forced_content_length" and queue_item.duration:
472            # guess content length based on duration
473            resp.content_length = get_chunksize(output_format, queue_item.duration)
474        elif http_profile == "chunked":
475            resp.enable_chunked_encoding()
476
477        await resp.prepare(request)
478
479        # return early if this is not a GET request
480        if request.method != "GET":
481            return resp
482
483        if queue_item.media_type != MediaType.TRACK:
484            # no crossfade on non-tracks
485            smart_fades_mode = SmartFadesMode.DISABLED
486        else:
487            smart_fades_mode = await self.mass.config.get_player_config_value(
488                queue.queue_id, CONF_SMART_FADES_MODE, return_type=SmartFadesMode
489            )
490            standard_crossfade_duration = self.mass.config.get_raw_player_config_value(
491                queue.queue_id, CONF_CROSSFADE_DURATION, 10
492            )
493        if (
494            smart_fades_mode != SmartFadesMode.DISABLED
495            and PlayerFeature.GAPLESS_PLAYBACK not in player.state.supported_features
496        ):
497            # crossfade is not supported on this player due to missing gapless playback
498            self.logger.warning(
499                "Crossfade disabled: Player %s does not support gapless playback, "
500                "consider enabling flow mode to enable crossfade on this player.",
501                player.state.name if player else "Unknown Player",
502            )
503            smart_fades_mode = SmartFadesMode.DISABLED
504
505        if smart_fades_mode != SmartFadesMode.DISABLED:
506            # crossfade is enabled, use special crossfaded single item stream
507            # where the crossfade of the next track is present in the stream of
508            # a single track. This only works if the player supports gapless playback!
509            audio_input = self.get_queue_item_stream_with_smartfade(
510                queue_item=queue_item,
511                pcm_format=pcm_format,
512                smart_fades_mode=smart_fades_mode,
513                standard_crossfade_duration=standard_crossfade_duration,
514            )
515        else:
516            # no crossfade, just a regular single item stream
517            audio_input = buffered(
518                self.get_queue_item_stream(
519                    queue_item=queue_item,
520                    pcm_format=pcm_format,
521                    seek_position=queue_item.streamdetails.seek_position,
522                ),
523                buffer_size=10,
524                min_buffer_before_yield=2,
525            )
526        # stream the audio
527        # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into
528        # the desired output format for the player including any player specific filter params
529        # such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats
530        if queue_item.media_type == MediaType.RADIO:
531            # keep very short buffer for radio streams
532            # to keep them (more or less) realtime and prevent time outs
533            read_rate_input_args = ["-readrate", "1.0", "-readrate_initial_burst", "2"]
534        else:
535            # just allow the player to buffer whatever it wants for single item streams
536            read_rate_input_args = None
537
538        first_chunk_received = False
539        bytes_sent = 0
540        async for chunk in get_ffmpeg_stream(
541            audio_input=audio_input,
542            input_format=pcm_format,
543            output_format=output_format,
544            filter_params=get_player_filter_params(
545                self.mass,
546                player_id=player.player_id,
547                input_format=pcm_format,
548                output_format=output_format,
549            ),
550            extra_input_args=read_rate_input_args,
551        ):
552            try:
553                await resp.write(chunk)
554                bytes_sent += len(chunk)
555                if not first_chunk_received:
556                    first_chunk_received = True
557                    # inform the queue that the track is now loaded in the buffer
558                    # so for example the next track can be enqueued
559                    self.mass.player_queues.track_loaded_in_buffer(
560                        queue_item.queue_id, queue_item.queue_item_id
561                    )
562            except (BrokenPipeError, ConnectionResetError, ConnectionError) as err:
563                if first_chunk_received and not player.stop_called:
564                    # Player disconnected (unexpected) after receiving at least some data
565                    # This could indicate buffering issues, network problems,
566                    # or player-specific issues
567                    bytes_expected = get_chunksize(output_format, queue_item.duration or 3600)
568                    self.logger.warning(
569                        "Player %s disconnected prematurely from stream for %s (%s) - "
570                        "error: %s, sent %d bytes, expected (approx) bytes=%d",
571                        queue.display_name,
572                        queue_item.name,
573                        queue_item.uri,
574                        err.__class__.__name__,
575                        bytes_sent,
576                        bytes_expected,
577                    )
578                break
579        if queue_item.streamdetails.stream_error:
580            self.logger.error(
581                "Error streaming QueueItem %s (%s) to %s - will try to skip to next item",
582                queue_item.name,
583                queue_item.uri,
584                queue.display_name,
585            )
586            # try to skip to the next item in the queue after a short delay
587            self.mass.call_later(5, self.mass.player_queues.next(queue_id))
588        return resp
589
590    async def serve_queue_flow_stream(self, request: web.Request) -> web.StreamResponse:
591        """Stream Queue Flow audio to player."""
592        self._log_request(request)
593        queue_id = request.match_info["queue_id"]
594        player_id = request.match_info["player_id"]
595        if not (queue := self.mass.player_queues.get(queue_id)):
596            raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
597        if not (player := self.mass.players.get_player(player_id)):
598            raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
599        start_queue_item_id = request.match_info["queue_item_id"]
600        start_queue_item = self.mass.player_queues.get_item(queue_id, start_queue_item_id)
601        if not start_queue_item:
602            raise web.HTTPNotFound(reason=f"Unknown Queue item: {start_queue_item_id}")
603
604        queue.flow_mode_stream_log = []
605
606        # select the highest possible PCM settings for this player
607        flow_pcm_format = await self._select_flow_format(player)
608
609        # work out output format/details
610        output_format = await self.get_output_format(
611            output_format_str=request.match_info["fmt"],
612            player=player,
613            content_sample_rate=flow_pcm_format.sample_rate,
614            content_bit_depth=flow_pcm_format.bit_depth,
615        )
616        # work out ICY metadata support
617        icy_preference = self.mass.config.get_raw_player_config_value(
618            queue_id,
619            CONF_ENTRY_ENABLE_ICY_METADATA.key,
620            CONF_ENTRY_ENABLE_ICY_METADATA.default_value,
621        )
622        enable_icy = request.headers.get("Icy-MetaData", "") == "1" and icy_preference != "disabled"
623        icy_meta_interval = 256000 if icy_preference == "full" else 16384
624
625        # prepare request, add some DLNA/UPNP compatible headers
626        headers = {
627            **DEFAULT_STREAM_HEADERS,
628            **ICY_HEADERS,
629            "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01700000000000000000000000000000",  # noqa: E501
630            "Accept-Ranges": "none",
631            "Content-Type": f"audio/{output_format.output_format_str}",
632        }
633        if enable_icy:
634            headers["icy-metaint"] = str(icy_meta_interval)
635
636        resp = web.StreamResponse(
637            status=200,
638            reason="OK",
639            headers=headers,
640        )
641        http_profile = await self.mass.config.get_player_config_value(
642            queue_id, CONF_HTTP_PROFILE, default="default", return_type=str
643        )
644        if http_profile == "forced_content_length":
645            # just set an insane high content length to make sure the player keeps playing
646            resp.content_length = get_chunksize(output_format, 12 * 3600)
647        elif http_profile == "chunked":
648            resp.enable_chunked_encoding()
649
650        await resp.prepare(request)
651
652        # return early if this is not a GET request
653        if request.method != "GET":
654            return resp
655
656        # all checks passed, start streaming!
657        # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into
658        # the desired output format for the player including any player specific filter params
659        # such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats
660        self.logger.debug("Start serving Queue flow audio stream for %s", queue.display_name)
661
662        async for chunk in get_ffmpeg_stream(
663            audio_input=self.get_queue_flow_stream(
664                queue=queue,
665                start_queue_item=start_queue_item,
666                pcm_format=flow_pcm_format,
667            ),
668            input_format=flow_pcm_format,
669            output_format=output_format,
670            filter_params=get_player_filter_params(
671                self.mass, player.player_id, flow_pcm_format, output_format
672            ),
673            # we need to slowly feed the music to avoid the player stopping and later
674            # restarting (or completely failing) the audio stream by keeping the buffer short.
675            # this is reported to be an issue especially with Chromecast players.
676            # see for example: https://github.com/music-assistant/support/issues/3717
677            # allow buffer ahead of 6 seconds and read rest in realtime
678            extra_input_args=["-readrate", "1.0", "-readrate_initial_burst", "6"],
679            chunk_size=icy_meta_interval if enable_icy else get_chunksize(output_format),
680        ):
681            try:
682                await resp.write(chunk)
683            except (BrokenPipeError, ConnectionResetError, ConnectionError):
684                # race condition
685                break
686
687            if not enable_icy:
688                continue
689
690            # if icy metadata is enabled, send the icy metadata after the chunk
691            if (
692                # use current item here and not buffered item, otherwise
693                # the icy metadata will be too much ahead
694                (current_item := queue.current_item)
695                and current_item.streamdetails
696                and current_item.streamdetails.stream_title
697            ):
698                title = current_item.streamdetails.stream_title
699            elif queue and current_item and current_item.name:
700                title = current_item.name
701            else:
702                title = "Music Assistant"
703            metadata = f"StreamTitle='{title}';".encode()
704            if icy_preference == "full" and current_item and current_item.image:
705                metadata += f"StreamURL='{current_item.image.path}'".encode()
706            while len(metadata) % 16 != 0:
707                metadata += b"\x00"
708            length = len(metadata)
709            length_b = chr(int(length / 16)).encode()
710            await resp.write(length_b + metadata)
711
712        return resp
713
714    async def serve_command_request(self, request: web.Request) -> web.FileResponse:
715        """Handle special 'command' request for a player."""
716        self._log_request(request)
717        queue_id = request.match_info["queue_id"]
718        command = request.match_info["command"]
719        if command == "next":
720            self.mass.create_task(self.mass.player_queues.next(queue_id))
721        return web.FileResponse(SILENCE_FILE, headers={"icy-name": "Music Assistant"})
722
723    async def serve_announcement_stream(self, request: web.Request) -> web.StreamResponse:
724        """Stream announcement audio to a player."""
725        self._log_request(request)
726        player_id = request.match_info["player_id"]
727        player = self.mass.player_queues.get(player_id)
728        if not player:
729            raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
730        if not (announce_data := self.announcements.get(player_id)):
731            raise web.HTTPNotFound(reason=f"No pending announcements for Player: {player_id}")
732
733        # work out output format/details
734        fmt = request.match_info["fmt"]
735        audio_format = AudioFormat(content_type=ContentType.try_parse(fmt))
736
737        http_profile = await self.mass.config.get_player_config_value(
738            player_id, CONF_HTTP_PROFILE, default="default", return_type=str
739        )
740        if http_profile == "forced_content_length":
741            # given the fact that an announcement is just a short audio clip,
742            # just send it over completely at once so we have a fixed content length
743            data = b""
744            async for chunk in self.get_announcement_stream(
745                announcement_url=announce_data["announcement_url"],
746                output_format=audio_format,
747                pre_announce=announce_data["pre_announce"],
748                pre_announce_url=announce_data["pre_announce_url"],
749            ):
750                data += chunk
751            return web.Response(
752                body=data,
753                content_type=f"audio/{audio_format.output_format_str}",
754                headers=DEFAULT_STREAM_HEADERS,
755            )
756
757        resp = web.StreamResponse(
758            status=200,
759            reason="OK",
760            headers=DEFAULT_STREAM_HEADERS,
761        )
762        resp.content_type = f"audio/{audio_format.output_format_str}"
763        if http_profile == "chunked":
764            resp.enable_chunked_encoding()
765
766        await resp.prepare(request)
767
768        # return early if this is not a GET request
769        if request.method != "GET":
770            return resp
771
772        # all checks passed, start streaming!
773        self.logger.debug(
774            "Start serving audio stream for Announcement %s to %s",
775            announce_data["announcement_url"],
776            player.state.name,
777        )
778        async for chunk in self.get_announcement_stream(
779            announcement_url=announce_data["announcement_url"],
780            output_format=audio_format,
781            pre_announce=announce_data["pre_announce"],
782            pre_announce_url=announce_data["pre_announce_url"],
783        ):
784            try:
785                await resp.write(chunk)
786            except (BrokenPipeError, ConnectionResetError):
787                break
788
789        self.logger.debug(
790            "Finished serving audio stream for Announcement %s to %s",
791            announce_data["announcement_url"],
792            player.state.name,
793        )
794
795        return resp
796
797    async def serve_plugin_source_stream(self, request: web.Request) -> web.StreamResponse:
798        """Stream PluginSource audio to a player."""
799        self._log_request(request)
800        plugin_source_id = request.match_info["plugin_source"]
801        provider = cast("PluginProvider", self.mass.get_provider(plugin_source_id))
802        if not provider:
803            raise ProviderUnavailableError(f"Unknown PluginSource: {plugin_source_id}")
804        # work out output format/details
805        player_id = request.match_info["player_id"]
806        player = self.mass.players.get_player(player_id)
807        if not player:
808            raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
809        plugin_source = provider.get_source()
810        output_format = await self.get_output_format(
811            output_format_str=request.match_info["fmt"],
812            player=player,
813            content_sample_rate=plugin_source.audio_format.sample_rate,
814            content_bit_depth=plugin_source.audio_format.bit_depth,
815        )
816        headers = {
817            **DEFAULT_STREAM_HEADERS,
818            "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01700000000000000000000000000000",  # noqa: E501
819            "icy-name": plugin_source.name,
820            "Accept-Ranges": "none",
821            "Content-Type": f"audio/{output_format.output_format_str}",
822        }
823
824        resp = web.StreamResponse(
825            status=200,
826            reason="OK",
827            headers=headers,
828        )
829        resp.content_type = f"audio/{output_format.output_format_str}"
830        http_profile = await self.mass.config.get_player_config_value(
831            player_id, CONF_HTTP_PROFILE, default="default", return_type=str
832        )
833        if http_profile == "forced_content_length":
834            # just set an insanely high content length to make sure the player keeps playing
835            resp.content_length = get_chunksize(output_format, 12 * 3600)
836        elif http_profile == "chunked":
837            resp.enable_chunked_encoding()
838
839        await resp.prepare(request)
840
841        # return early if this is not a GET request
842        if request.method != "GET":
843            return resp
844
845        # all checks passed, start streaming!
846        if not plugin_source.audio_format:
847            raise InvalidDataError(f"No audio format for plugin source {plugin_source_id}")
848        async for chunk in self.get_plugin_source_stream(
849            plugin_source_id=plugin_source_id,
850            output_format=output_format,
851            player_id=player_id,
852            player_filter_params=get_player_filter_params(
853                self.mass, player_id, plugin_source.audio_format, output_format
854            ),
855        ):
856            try:
857                await resp.write(chunk)
858            except (BrokenPipeError, ConnectionResetError, ConnectionError):
859                break
860        return resp
861
862    def get_command_url(self, player_or_queue_id: str, command: str) -> str:
863        """Get the url for the special command stream."""
864        return f"{self.base_url}/command/{player_or_queue_id}/{command}.mp3"
865
866    def get_announcement_url(
867        self,
868        player_id: str,
869        announce_data: AnnounceData,
870        content_type: ContentType = ContentType.MP3,
871    ) -> str:
872        """Get the url for the special announcement stream."""
873        self.announcements[player_id] = announce_data
874        # use stream server to host announcement on local network
875        # this ensures playback on all players, including ones that do not
876        # like https hosts and it also offers the pre-announce 'bell'
877        return f"{self.base_url}/announcement/{player_id}.{content_type.value}"
878
879    def get_stream(
880        self, media: PlayerMedia, pcm_format: AudioFormat, force_flow_mode: bool = False
881    ) -> AsyncGenerator[bytes, None]:
882        """
883        Get a stream of the given media as raw PCM audio.
884
885        This is used as helper for player providers that can consume the raw PCM
886        audio stream directly (e.g. AirPlay) and not rely on HTTP transport.
887        """
888        # select audio source
889        if media.media_type == MediaType.ANNOUNCEMENT:
890            # special case: stream announcement
891            assert media.custom_data
892            audio_source = self.get_announcement_stream(
893                media.custom_data["announcement_url"],
894                output_format=pcm_format,
895                pre_announce=media.custom_data["pre_announce"],
896                pre_announce_url=media.custom_data["pre_announce_url"],
897            )
898        elif media.media_type == MediaType.PLUGIN_SOURCE:
899            # special case: plugin source stream
900            assert media.custom_data
901            audio_source = self.get_plugin_source_stream(
902                plugin_source_id=media.custom_data["source_id"],
903                output_format=pcm_format,
904                # need to pass player_id from the PlayerMedia object
905                # because this could have been a group
906                player_id=media.custom_data["player_id"],
907            )
908        elif (
909            media.media_type == MediaType.FLOW_STREAM
910            and media.source_id
911            and media.source_id.startswith(UGP_PREFIX)
912            and media.uri
913            and "/ugp/" in media.uri
914        ):
915            # special case: member player accessing UGP stream
916            # Check URI to distinguish from the UGP accessing its own stream
917            ugp_player = cast("UniversalGroupPlayer", self.mass.players.get_player(media.source_id))
918            ugp_stream = ugp_player.stream
919            assert ugp_stream is not None  # for type checker
920            if ugp_stream.base_pcm_format == pcm_format:
921                # no conversion needed
922                audio_source = ugp_stream.subscribe_raw()
923            else:
924                audio_source = ugp_stream.get_stream(output_format=pcm_format)
925        elif (
926            media.source_id
927            and media.queue_item_id
928            and (media.media_type == MediaType.FLOW_STREAM or force_flow_mode)
929        ):
930            # regular queue (flow) stream request
931            queue = self.mass.player_queues.get(media.source_id)
932            assert queue
933            start_queue_item = self.mass.player_queues.get_item(
934                media.source_id, media.queue_item_id
935            )
936            assert start_queue_item
937            audio_source = self.mass.streams.get_queue_flow_stream(
938                queue=queue,
939                start_queue_item=start_queue_item,
940                pcm_format=pcm_format,
941            )
942        elif media.source_id and media.queue_item_id:
943            # single item stream (e.g. radio)
944            queue_item = self.mass.player_queues.get_item(media.source_id, media.queue_item_id)
945            assert queue_item
946            audio_source = buffered(
947                self.get_queue_item_stream(
948                    queue_item=queue_item,
949                    pcm_format=pcm_format,
950                ),
951                buffer_size=10,
952                min_buffer_before_yield=2,
953            )
954        else:
955            # assume url or some other direct path
956            # NOTE: this will fail if its an uri not playable by ffmpeg
957            audio_source = get_ffmpeg_stream(
958                audio_input=media.uri,
959                input_format=AudioFormat(content_type=ContentType.try_parse(media.uri)),
960                output_format=pcm_format,
961            )
962        return audio_source
963
964    @use_buffer(buffer_size=30, min_buffer_before_yield=2)
965    async def get_queue_flow_stream(
966        self,
967        queue: PlayerQueue,
968        start_queue_item: QueueItem,
969        pcm_format: AudioFormat,
970    ) -> AsyncGenerator[bytes, None]:
971        """
972        Get a flow stream of all tracks in the queue as raw PCM audio.
973
974        yields chunks of exactly 1 second of audio in the given pcm_format.
975        """
976        # ruff: noqa: PLR0915
977        assert pcm_format.content_type.is_pcm()
978        queue_track = None
979        last_fadeout_part: bytes = b""
980        last_streamdetails: StreamDetails | None = None
981        last_play_log_entry: PlayLogEntry | None = None
982        queue.flow_mode = True
983        if not start_queue_item:
984            # this can happen in some (edge case) race conditions
985            return
986        pcm_sample_size = pcm_format.pcm_sample_size
987        if start_queue_item.media_type != MediaType.TRACK:
988            # no crossfade on non-tracks
989            smart_fades_mode = SmartFadesMode.DISABLED
990            standard_crossfade_duration = 0
991        else:
992            smart_fades_mode = await self.mass.config.get_player_config_value(
993                queue.queue_id, CONF_SMART_FADES_MODE, return_type=SmartFadesMode
994            )
995            standard_crossfade_duration = self.mass.config.get_raw_player_config_value(
996                queue.queue_id, CONF_CROSSFADE_DURATION, 10
997            )
998        self.logger.info(
999            "Start Queue Flow stream for Queue %s - crossfade: %s %s",
1000            queue.display_name,
1001            smart_fades_mode,
1002            f"({standard_crossfade_duration}s)"
1003            if smart_fades_mode == SmartFadesMode.STANDARD_CROSSFADE
1004            else "",
1005        )
1006        total_bytes_sent = 0
1007        total_chunks_received = 0
1008
1009        while True:
1010            # get (next) queue item to stream
1011            if queue_track is None:
1012                queue_track = start_queue_item
1013            else:
1014                try:
1015                    queue_track = await self.mass.player_queues.load_next_queue_item(
1016                        queue.queue_id, queue_track.queue_item_id
1017                    )
1018                except QueueEmpty:
1019                    break
1020
1021            if queue_track.streamdetails is None:
1022                raise InvalidDataError(
1023                    "No Streamdetails known for queue item %s",
1024                    queue_track.queue_item_id,
1025                )
1026
1027            self.logger.debug(
1028                "Start Streaming queue track: %s (%s) for queue %s",
1029                queue_track.streamdetails.uri,
1030                queue_track.name,
1031                queue.display_name,
1032            )
1033            # append to play log so the queue controller can work out which track is playing
1034            play_log_entry = PlayLogEntry(queue_track.queue_item_id)
1035            queue.flow_mode_stream_log.append(play_log_entry)
1036            # calculate crossfade buffer size
1037            crossfade_buffer_duration = (
1038                SMART_CROSSFADE_DURATION
1039                if smart_fades_mode == SmartFadesMode.SMART_CROSSFADE
1040                else standard_crossfade_duration
1041            )
1042            crossfade_buffer_duration = min(
1043                crossfade_buffer_duration,
1044                int(queue_track.streamdetails.duration / 2)
1045                if queue_track.streamdetails.duration
1046                else crossfade_buffer_duration,
1047            )
1048            # Ensure crossfade buffer size is aligned to frame boundaries
1049            # Frame size = bytes_per_sample * channels
1050            bytes_per_sample = pcm_format.bit_depth // 8
1051            frame_size = bytes_per_sample * pcm_format.channels
1052            crossfade_buffer_size = int(pcm_format.pcm_sample_size * crossfade_buffer_duration)
1053            # Round down to nearest frame boundary
1054            crossfade_buffer_size = (crossfade_buffer_size // frame_size) * frame_size
1055
1056            bytes_written = 0
1057            buffer = b""
1058            # handle incoming audio chunks
1059            first_chunk_received = False
1060            # buffer size needs to be big enough to include the crossfade part
1061
1062            async for chunk in self.get_queue_item_stream(
1063                queue_track,
1064                pcm_format=pcm_format,
1065                seek_position=queue_track.streamdetails.seek_position,
1066                raise_on_error=False,
1067            ):
1068                total_chunks_received += 1
1069                if not first_chunk_received:
1070                    first_chunk_received = True
1071                    # inform the queue that the track is now loaded in the buffer
1072                    # so the next track can be preloaded
1073                    self.mass.player_queues.track_loaded_in_buffer(
1074                        queue.queue_id, queue_track.queue_item_id
1075                    )
1076                if total_chunks_received < 10 and smart_fades_mode != SmartFadesMode.DISABLED:
1077                    # we want a stream to start as quickly as possible
1078                    # so for the first 10 chunks we keep a very short buffer
1079                    req_buffer_size = pcm_format.pcm_sample_size
1080                else:
1081                    req_buffer_size = (
1082                        pcm_sample_size
1083                        if smart_fades_mode == SmartFadesMode.DISABLED
1084                        else crossfade_buffer_size
1085                    )
1086
1087                # ALWAYS APPEND CHUNK TO BUFFER
1088                buffer += chunk
1089                del chunk
1090                if len(buffer) < req_buffer_size:
1091                    # buffer is not full enough, move on
1092                    # yield control to event loop with 10ms delay
1093                    await asyncio.sleep(0.01)
1094                    continue
1095
1096                ####  HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
1097                if last_fadeout_part and last_streamdetails:
1098                    # perform crossfade
1099                    fadein_part = buffer[:crossfade_buffer_size]
1100                    remaining_bytes = buffer[crossfade_buffer_size:]
1101                    # Use the mixer to handle all crossfade logic
1102                    crossfade_part = await self._smart_fades_mixer.mix(
1103                        fade_in_part=fadein_part,
1104                        fade_out_part=last_fadeout_part,
1105                        fade_in_streamdetails=queue_track.streamdetails,
1106                        fade_out_streamdetails=last_streamdetails,
1107                        pcm_format=pcm_format,
1108                        standard_crossfade_duration=standard_crossfade_duration,
1109                        mode=smart_fades_mode,
1110                    )
1111                    # because the crossfade exists of both the fadein and fadeout part
1112                    # we need to correct the bytes_written accordingly so the duration
1113                    # calculations at the end of the track are correct
1114                    crossfade_part_len = len(crossfade_part)
1115                    bytes_written += int(crossfade_part_len / 2)
1116                    if last_play_log_entry:
1117                        assert last_play_log_entry.seconds_streamed is not None
1118                        last_play_log_entry.seconds_streamed += (
1119                            crossfade_part_len / 2 / pcm_sample_size
1120                        )
1121                    # yield crossfade_part (in pcm_sample_size chunks)
1122                    for _chunk in divide_chunks(crossfade_part, pcm_sample_size):
1123                        yield _chunk
1124                        del _chunk
1125                    del crossfade_part
1126                    # also write the leftover bytes from the crossfade action
1127                    if remaining_bytes:
1128                        yield remaining_bytes
1129                        bytes_written += len(remaining_bytes)
1130                        del remaining_bytes
1131                    # clear vars
1132                    last_fadeout_part = b""
1133                    last_streamdetails = None
1134                    buffer = b""
1135
1136                #### OTHER: enough data in buffer, feed to output
1137                while len(buffer) > req_buffer_size:
1138                    yield buffer[:pcm_sample_size]
1139                    bytes_written += pcm_sample_size
1140                    buffer = buffer[pcm_sample_size:]
1141
1142            #### HANDLE END OF TRACK
1143            if last_fadeout_part:
1144                # edge case: we did not get enough data to make the crossfade
1145                for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
1146                    yield _chunk
1147                    del _chunk
1148                bytes_written += len(last_fadeout_part)
1149                last_fadeout_part = b""
1150            if self._crossfade_allowed(
1151                queue_track, smart_fades_mode=smart_fades_mode, flow_mode=True
1152            ):
1153                # if crossfade is enabled, save fadeout part to pickup for next track
1154                last_fadeout_part = buffer[-crossfade_buffer_size:]
1155                last_streamdetails = queue_track.streamdetails
1156                last_play_log_entry = play_log_entry
1157                remaining_bytes = buffer[:-crossfade_buffer_size]
1158                if remaining_bytes:
1159                    yield remaining_bytes
1160                    bytes_written += len(remaining_bytes)
1161                del remaining_bytes
1162            elif buffer:
1163                # no crossfade enabled, just yield the buffer last part
1164                bytes_written += len(buffer)
1165                for _chunk in divide_chunks(buffer, pcm_sample_size):
1166                    yield _chunk
1167                    del _chunk
1168            # make sure the buffer gets cleaned up
1169            del buffer
1170
1171            # update duration details based on the actual pcm data we sent
1172            # this also accounts for crossfade and silence stripping
1173            seconds_streamed = bytes_written / pcm_sample_size
1174            queue_track.streamdetails.seconds_streamed = seconds_streamed
1175            queue_track.streamdetails.duration = int(
1176                queue_track.streamdetails.seek_position + seconds_streamed
1177            )
1178            play_log_entry.seconds_streamed = seconds_streamed
1179            play_log_entry.duration = queue_track.streamdetails.duration
1180            total_bytes_sent += bytes_written
1181            self.logger.debug(
1182                "Finished Streaming queue track: %s (%s) on queue %s",
1183                queue_track.streamdetails.uri,
1184                queue_track.name,
1185                queue.display_name,
1186            )
1187        #### HANDLE END OF QUEUE FLOW STREAM
1188        # end of queue flow: make sure we yield the last_fadeout_part
1189        if last_fadeout_part:
1190            for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
1191                yield _chunk
1192                del _chunk
1193            # correct seconds streamed/duration
1194            last_part_seconds = len(last_fadeout_part) / pcm_sample_size
1195            streamdetails = queue_track.streamdetails
1196            assert streamdetails is not None
1197            streamdetails.seconds_streamed = (
1198                streamdetails.seconds_streamed or 0
1199            ) + last_part_seconds
1200            streamdetails.duration = int((streamdetails.duration or 0) + last_part_seconds)
1201            last_fadeout_part = b""
1202        total_bytes_sent += bytes_written
1203        self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name)
1204
1205    async def get_announcement_stream(
1206        self,
1207        announcement_url: str,
1208        output_format: AudioFormat,
1209        pre_announce: bool | str = False,
1210        pre_announce_url: str = ANNOUNCE_ALERT_FILE,
1211    ) -> AsyncGenerator[bytes, None]:
1212        """Get the special announcement stream."""
1213        announcement_data: asyncio.Queue[bytes | None] = asyncio.Queue(10)
1214        # we are doing announcement in PCM first to avoid multiple encodings
1215        # when mixing pre-announce and announcement
1216        # also we have to deal with some TTS sources being super slow in delivering audio
1217        # so we take an approach where we start fetching the announcement in the background
1218        # while we can already start playing the pre-announce sound (if any)
1219
1220        pcm_format = (
1221            output_format
1222            if output_format.content_type.is_pcm()
1223            else AudioFormat(
1224                sample_rate=output_format.sample_rate,
1225                content_type=ContentType.PCM_S16LE,
1226                bit_depth=16,
1227                channels=output_format.channels,
1228            )
1229        )
1230
1231        async def fetch_announcement() -> None:
1232            fmt = announcement_url.rsplit(".")[-1]
1233            async for chunk in get_ffmpeg_stream(
1234                audio_input=announcement_url,
1235                input_format=AudioFormat(content_type=ContentType.try_parse(fmt)),
1236                output_format=pcm_format,
1237                chunk_size=get_chunksize(pcm_format, 1),
1238            ):
1239                await announcement_data.put(chunk)
1240            await announcement_data.put(None)  # signal end of stream
1241
1242        self.mass.create_task(fetch_announcement())
1243
1244        async def _announcement_stream() -> AsyncGenerator[bytes, None]:
1245            """Generate the PCM audio stream for the announcement + optional pre-announce."""
1246            if pre_announce:
1247                async for chunk in get_ffmpeg_stream(
1248                    audio_input=pre_announce_url,
1249                    input_format=AudioFormat(content_type=ContentType.try_parse(pre_announce_url)),
1250                    output_format=pcm_format,
1251                    chunk_size=get_chunksize(pcm_format, 1),
1252                ):
1253                    yield chunk
1254            # pad silence while we're waiting for the announcement to be ready
1255            while announcement_data.empty():
1256                yield b"\0" * int(
1257                    pcm_format.sample_rate * (pcm_format.bit_depth / 8) * pcm_format.channels * 0.1
1258                )
1259                await asyncio.sleep(0.1)
1260            # stream announcement
1261            while True:
1262                announcement_chunk = await announcement_data.get()
1263                if announcement_chunk is None:
1264                    break
1265                yield announcement_chunk
1266
1267        if output_format == pcm_format:
1268            # no need to re-encode, just yield the raw PCM stream
1269            async for chunk in _announcement_stream():
1270                yield chunk
1271            return
1272
1273        # stream final announcement in requested output format
1274        async for chunk in get_ffmpeg_stream(
1275            audio_input=_announcement_stream(),
1276            input_format=pcm_format,
1277            output_format=output_format,
1278        ):
1279            yield chunk
1280
1281    async def get_plugin_source_stream(
1282        self,
1283        plugin_source_id: str,
1284        output_format: AudioFormat,
1285        player_id: str,
1286        player_filter_params: list[str] | None = None,
1287    ) -> AsyncGenerator[bytes, None]:
1288        """Get the special plugin source stream."""
1289        plugin_prov = cast("PluginProvider", self.mass.get_provider(plugin_source_id))
1290        if not plugin_prov:
1291            raise ProviderUnavailableError(f"Unknown PluginSource: {plugin_source_id}")
1292
1293        plugin_source = plugin_prov.get_source()
1294        self.logger.debug(
1295            "Start streaming PluginSource %s to %s using output format %s",
1296            plugin_source_id,
1297            player_id,
1298            output_format,
1299        )
1300        # this should already be set by the player controller, but just to be sure
1301        plugin_source.in_use_by = player_id
1302
1303        try:
1304            async for chunk in get_ffmpeg_stream(
1305                audio_input=cast(
1306                    "str | AsyncGenerator[bytes, None]",
1307                    plugin_prov.get_audio_stream(player_id)
1308                    if plugin_source.stream_type == StreamType.CUSTOM
1309                    else plugin_source.path,
1310                ),
1311                input_format=plugin_source.audio_format,
1312                output_format=output_format,
1313                filter_params=player_filter_params,
1314                extra_input_args=["-y", "-re"],
1315            ):
1316                if plugin_source.in_use_by != player_id:
1317                    # another player took over or the stream ended, stop streaming
1318                    break
1319                yield chunk
1320        finally:
1321            self.logger.debug(
1322                "Finished streaming PluginSource %s to %s", plugin_source_id, player_id
1323            )
1324            await asyncio.sleep(1)  # prevent race conditions when selecting source
1325            if plugin_source.in_use_by == player_id:
1326                # release control
1327                plugin_source.in_use_by = None
1328
1329    async def get_queue_item_stream(
1330        self,
1331        queue_item: QueueItem,
1332        pcm_format: AudioFormat,
1333        seek_position: int = 0,
1334        raise_on_error: bool = True,
1335    ) -> AsyncGenerator[bytes, None]:
1336        """Get the (PCM) audio stream for a single queue item."""
1337        # collect all arguments for ffmpeg
1338        streamdetails = queue_item.streamdetails
1339        assert streamdetails
1340        filter_params: list[str] = []
1341
1342        # handle volume normalization
1343        gain_correct: float | None = None
1344        if streamdetails.volume_normalization_mode == VolumeNormalizationMode.DYNAMIC:
1345            # volume normalization using loudnorm filter (in dynamic mode)
1346            # which also collects the measurement on the fly during playback
1347            # more info: https://k.ylo.ph/2016/04/04/loudnorm.html
1348            filter_rule = f"loudnorm=I={streamdetails.target_loudness}:TP=-2.0:LRA=10.0:offset=0.0"
1349            filter_rule += ":print_format=json"
1350            filter_params.append(filter_rule)
1351        elif streamdetails.volume_normalization_mode == VolumeNormalizationMode.FIXED_GAIN:
1352            # apply user defined fixed volume/gain correction
1353            config_key = (
1354                CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS
1355                if streamdetails.media_type == MediaType.TRACK
1356                else CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO
1357            )
1358            gain_value = await self.mass.config.get_core_config_value(
1359                self.domain, config_key, default=0.0, return_type=float
1360            )
1361            gain_correct = round(gain_value, 2)
1362            filter_params.append(f"volume={gain_correct}dB")
1363        elif streamdetails.volume_normalization_mode == VolumeNormalizationMode.MEASUREMENT_ONLY:
1364            # volume normalization with known loudness measurement
1365            # apply volume/gain correction
1366            target_loudness = (
1367                float(streamdetails.target_loudness)
1368                if streamdetails.target_loudness is not None
1369                else 0.0
1370            )
1371            if streamdetails.prefer_album_loudness and streamdetails.loudness_album is not None:
1372                gain_correct = target_loudness - float(streamdetails.loudness_album)
1373            elif streamdetails.loudness is not None:
1374                gain_correct = target_loudness - float(streamdetails.loudness)
1375            else:
1376                gain_correct = 0.0
1377            gain_correct = round(gain_correct, 2)
1378            filter_params.append(f"volume={gain_correct}dB")
1379        streamdetails.volume_normalization_gain_correct = gain_correct
1380
1381        allow_buffer = bool(
1382            self.mass.config.get_raw_core_config_value(
1383                self.domain, CONF_ALLOW_BUFFER, CONF_ALLOW_BUFFER_DEFAULT
1384            )
1385            and streamdetails.duration
1386        )
1387
1388        self.logger.debug(
1389            "Starting queue item stream for %s (%s)"
1390            " - using buffer: %s"
1391            " - using fade-in: %s"
1392            " - using volume normalization: %s",
1393            queue_item.name,
1394            streamdetails.uri,
1395            allow_buffer,
1396            streamdetails.fade_in,
1397            streamdetails.volume_normalization_mode,
1398        )
1399        if allow_buffer:
1400            media_stream_gen = get_buffered_media_stream(
1401                self.mass,
1402                streamdetails=streamdetails,
1403                pcm_format=pcm_format,
1404                seek_position=int(seek_position),
1405                filter_params=filter_params,
1406            )
1407        else:
1408            media_stream_gen = get_media_stream(
1409                self.mass,
1410                streamdetails=streamdetails,
1411                pcm_format=pcm_format,
1412                seek_position=int(seek_position),
1413                filter_params=filter_params,
1414            )
1415
1416        first_chunk_received = False
1417        fade_in_buffer = b""
1418        bytes_received = 0
1419        finished = False
1420        stream_started_at = asyncio.get_event_loop().time()
1421        try:
1422            async for chunk in media_stream_gen:
1423                bytes_received += len(chunk)
1424                if not first_chunk_received:
1425                    first_chunk_received = True
1426                    self.logger.debug(
1427                        "First audio chunk received for %s (%s) after %.2f seconds",
1428                        queue_item.name,
1429                        streamdetails.uri,
1430                        asyncio.get_event_loop().time() - stream_started_at,
1431                    )
1432                # handle optional fade-in
1433                if streamdetails.fade_in:
1434                    if len(fade_in_buffer) < pcm_format.pcm_sample_size * 4:
1435                        fade_in_buffer += chunk
1436                    elif fade_in_buffer:
1437                        async for fade_chunk in get_ffmpeg_stream(
1438                            # NOTE: get_ffmpeg_stream signature says str | AsyncGenerator
1439                            # but FFMpeg class actually accepts bytes too. This works at
1440                            # runtime but needs type: ignore for mypy.
1441                            audio_input=fade_in_buffer + chunk,  # type: ignore[arg-type]
1442                            input_format=pcm_format,
1443                            output_format=pcm_format,
1444                            filter_params=["afade=type=in:start_time=0:duration=3"],
1445                        ):
1446                            yield fade_chunk
1447                    fade_in_buffer = b""
1448                    streamdetails.fade_in = False
1449                else:
1450                    yield chunk
1451                # help garbage collection by explicitly deleting chunk
1452                del chunk
1453            finished = True
1454        except AudioError as err:
1455            streamdetails.stream_error = True
1456            queue_item.available = False
1457            if raise_on_error:
1458                raise
1459            # yes, we swallow the error here after logging it
1460            # so the outer stream can handle it gracefully
1461            self.logger.error(
1462                "AudioError while streaming queue item %s (%s): %s",
1463                queue_item.name,
1464                streamdetails.uri,
1465                err,
1466            )
1467        finally:
1468            # determine how many seconds we've streamed
1469            # for pcm output we can calculate this easily
1470            seconds_streamed = bytes_received / pcm_format.pcm_sample_size
1471            streamdetails.seconds_streamed = seconds_streamed
1472            self.logger.debug(
1473                "stream %s for %s in %.2f seconds - seconds streamed/buffered: %.2f",
1474                "aborted" if not finished else "finished",
1475                streamdetails.uri,
1476                asyncio.get_event_loop().time() - stream_started_at,
1477                seconds_streamed,
1478            )
1479            # report stream to provider
1480            if (finished or seconds_streamed >= 90) and (
1481                music_prov := self.mass.get_provider(streamdetails.provider)
1482            ):
1483                if TYPE_CHECKING:  # avoid circular import
1484                    assert isinstance(music_prov, MusicProvider)
1485                self.mass.create_task(music_prov.on_streamed(streamdetails))
1486
1487    @use_buffer(buffer_size=30, min_buffer_before_yield=2)
1488    async def get_queue_item_stream_with_smartfade(
1489        self,
1490        queue_item: QueueItem,
1491        pcm_format: AudioFormat,
1492        smart_fades_mode: SmartFadesMode = SmartFadesMode.SMART_CROSSFADE,
1493        standard_crossfade_duration: int = 10,
1494    ) -> AsyncGenerator[bytes, None]:
1495        """Get the audio stream for a single queue item with (smart) crossfade to the next item."""
1496        queue = self.mass.player_queues.get(queue_item.queue_id)
1497        if not queue:
1498            raise RuntimeError(f"Queue {queue_item.queue_id} not found")
1499
1500        streamdetails = queue_item.streamdetails
1501        assert streamdetails
1502        crossfade_data = self._crossfade_data.pop(queue.queue_id, None)
1503
1504        if crossfade_data and streamdetails.seek_position > 0:
1505            # don't do crossfade when seeking into track
1506            crossfade_data = None
1507        if crossfade_data and (crossfade_data.queue_item_id != queue_item.queue_item_id):
1508            # edge case alert: the next item changed just while we were preloading/crossfading
1509            self.logger.warning(
1510                "Skipping crossfade data for queue %s - next item changed!", queue.display_name
1511            )
1512            crossfade_data = None
1513
1514        self.logger.debug(
1515            "Start Streaming queue track: %s (%s) for queue %s "
1516            "- crossfade mode: %s "
1517            "- crossfading from previous track: %s ",
1518            queue_item.streamdetails.uri if queue_item.streamdetails else "Unknown URI",
1519            queue_item.name,
1520            queue.display_name,
1521            smart_fades_mode,
1522            "true" if crossfade_data else "false",
1523        )
1524
1525        buffer = b""
1526        bytes_written = 0
1527        # calculate crossfade buffer size
1528        crossfade_buffer_duration = (
1529            SMART_CROSSFADE_DURATION
1530            if smart_fades_mode == SmartFadesMode.SMART_CROSSFADE
1531            else standard_crossfade_duration
1532        )
1533        crossfade_buffer_duration = min(
1534            crossfade_buffer_duration,
1535            int(streamdetails.duration / 2)
1536            if streamdetails.duration
1537            else crossfade_buffer_duration,
1538        )
1539        # Ensure crossfade buffer size is aligned to frame boundaries
1540        # Frame size = bytes_per_sample * channels
1541        bytes_per_sample = pcm_format.bit_depth // 8
1542        frame_size = bytes_per_sample * pcm_format.channels
1543        crossfade_buffer_size = int(pcm_format.pcm_sample_size * crossfade_buffer_duration)
1544        # Round down to nearest frame boundary
1545        crossfade_buffer_size = (crossfade_buffer_size // frame_size) * frame_size
1546        fade_out_data: bytes | None = None
1547
1548        if crossfade_data:
1549            # Calculate discard amount in seconds (format-independent)
1550            # Use fade_in_pcm_format because fade_in_size is in the next track's original format
1551            fade_in_duration_seconds = (
1552                crossfade_data.fade_in_size / crossfade_data.fade_in_pcm_format.pcm_sample_size
1553            )
1554            discard_seconds = int(fade_in_duration_seconds) - 1
1555            # Calculate discard amounts in CURRENT track's format
1556            discard_bytes = int(discard_seconds * pcm_format.pcm_sample_size)
1557            # Convert fade_in_size to current track's format for correct leftover calculation
1558            fade_in_size_in_current_format = int(
1559                fade_in_duration_seconds * pcm_format.pcm_sample_size
1560            )
1561            discard_leftover = fade_in_size_in_current_format - discard_bytes
1562        else:
1563            discard_seconds = streamdetails.seek_position
1564            discard_leftover = 0
1565        total_chunks_received = 0
1566        req_buffer_size = crossfade_buffer_size
1567        async for chunk in self.get_queue_item_stream(
1568            queue_item, pcm_format, seek_position=discard_seconds
1569        ):
1570            total_chunks_received += 1
1571            if discard_leftover:
1572                # discard leftover bytes from crossfade data
1573                chunk = chunk[discard_leftover:]  # noqa: PLW2901
1574                discard_leftover = 0
1575
1576            if total_chunks_received < 10:
1577                # we want a stream to start as quickly as possible
1578                # so for the first 10 chunks we keep a very short buffer
1579                req_buffer_size = pcm_format.pcm_sample_size
1580            else:
1581                req_buffer_size = crossfade_buffer_size
1582
1583            # ALWAYS APPEND CHUNK TO BUFFER
1584            buffer += chunk
1585            del chunk
1586            if len(buffer) < req_buffer_size:
1587                # buffer is not full enough, move on
1588                continue
1589
1590            ####  HANDLE CROSSFADE DATA FROM PREVIOUS TRACK
1591            if crossfade_data:
1592                # send the (second half of the) crossfade data
1593                if crossfade_data.pcm_format != pcm_format:
1594                    # edge case: pcm format mismatch, we need to resample
1595                    self.logger.debug(
1596                        "Resampling crossfade data from %s to %s for queue %s",
1597                        crossfade_data.pcm_format.sample_rate,
1598                        pcm_format.sample_rate,
1599                        queue.display_name,
1600                    )
1601                    resampled_data = await resample_pcm_audio(
1602                        crossfade_data.data,
1603                        crossfade_data.pcm_format,
1604                        pcm_format,
1605                    )
1606                    if resampled_data:
1607                        for _chunk in divide_chunks(resampled_data, pcm_format.pcm_sample_size):
1608                            yield _chunk
1609                        bytes_written += len(resampled_data)
1610                    else:
1611                        # Resampling failed, error already logged in resample_pcm_audio
1612                        # Skip crossfade data entirely - stream continues without it
1613                        self.logger.warning(
1614                            "Skipping crossfade data for queue %s due to resampling failure",
1615                            queue.display_name,
1616                        )
1617                else:
1618                    for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size):
1619                        yield _chunk
1620                    bytes_written += len(crossfade_data.data)
1621                # clear vars
1622                crossfade_data = None
1623
1624            #### OTHER: enough data in buffer, feed to output
1625            while len(buffer) > req_buffer_size:
1626                yield buffer[: pcm_format.pcm_sample_size]
1627                bytes_written += pcm_format.pcm_sample_size
1628                buffer = buffer[pcm_format.pcm_sample_size :]
1629
1630        #### HANDLE END OF TRACK
1631
1632        if crossfade_data:
1633            # edge case: we did not get enough data to send the crossfade data
1634            # send the (second half of the) crossfade data
1635            if crossfade_data.pcm_format != pcm_format:
1636                # (yet another) edge case: pcm format mismatch, we need to resample
1637                self.logger.debug(
1638                    "Resampling remaining crossfade data from %s to %s for queue %s",
1639                    crossfade_data.pcm_format.sample_rate,
1640                    pcm_format.sample_rate,
1641                    queue.display_name,
1642                )
1643                resampled_crossfade_data = await resample_pcm_audio(
1644                    crossfade_data.data,
1645                    crossfade_data.pcm_format,
1646                    pcm_format,
1647                )
1648                if resampled_crossfade_data:
1649                    crossfade_data.data = resampled_crossfade_data
1650                else:
1651                    # Resampling failed, error already logged in resample_pcm_audio
1652                    # Skip the crossfade data entirely
1653                    self.logger.warning(
1654                        "Skipping remaining crossfade data for queue %s due to resampling failure",
1655                        queue.display_name,
1656                    )
1657                    crossfade_data = None
1658            if crossfade_data:
1659                for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size):
1660                    yield _chunk
1661                bytes_written += len(crossfade_data.data)
1662                crossfade_data = None
1663
1664        # get next track for crossfade
1665        next_queue_item: QueueItem | None
1666        try:
1667            self.logger.debug(
1668                "Preloading NEXT track for crossfade for queue %s",
1669                queue.display_name,
1670            )
1671            next_queue_item = await self.mass.player_queues.load_next_queue_item(
1672                queue.queue_id, queue_item.queue_item_id
1673            )
1674            # set index_in_buffer to prevent our next track is overwritten while preloading
1675            if next_queue_item.streamdetails is None:
1676                raise InvalidDataError(
1677                    f"No streamdetails for next queue item {next_queue_item.queue_item_id}"
1678                )
1679            queue.index_in_buffer = self.mass.player_queues.index_by_id(
1680                queue.queue_id, next_queue_item.queue_item_id
1681            )
1682            queue_player = self.mass.players.get_player(queue.queue_id)
1683            assert queue_player is not None
1684            next_queue_item_pcm_format = await self._select_pcm_format(
1685                player=queue_player,
1686                streamdetails=next_queue_item.streamdetails,
1687                smartfades_enabled=True,
1688            )
1689        except QueueEmpty:
1690            # end of queue reached, no next item
1691            next_queue_item = None
1692
1693        if not next_queue_item or not self._crossfade_allowed(
1694            queue_item,
1695            smart_fades_mode=smart_fades_mode,
1696            flow_mode=False,
1697            next_queue_item=next_queue_item,
1698            sample_rate=pcm_format.sample_rate,
1699            next_sample_rate=next_queue_item_pcm_format.sample_rate,
1700        ):
1701            # no crossfade enabled/allowed, just yield the buffer last part
1702            bytes_written += len(buffer)
1703            for _chunk in divide_chunks(buffer, pcm_format.pcm_sample_size):
1704                yield _chunk
1705        else:
1706            # if crossfade is enabled, save fadeout part in buffer to pickup for next track
1707            fade_out_data = buffer
1708            buffer = b""
1709            try:
1710                async for chunk in self.get_queue_item_stream(
1711                    next_queue_item, next_queue_item_pcm_format
1712                ):
1713                    # append to buffer until we reach crossfade size
1714                    # we only need the first X seconds of the NEXT track so we can
1715                    # perform the crossfade.
1716                    # the crossfaded audio of the previous and next track will be
1717                    # sent in two equal parts: first half now, second half
1718                    # when the next track starts. We use CrossfadeData to store
1719                    # the second half to be picked up by the next track's stream generator.
1720                    # Note that we more or less expect the user to have enabled the in-memory
1721                    # buffer so we can keep the next track's audio data in memory.
1722                    buffer += chunk
1723                    del chunk
1724                    if len(buffer) >= crossfade_buffer_size:
1725                        break
1726                ####  HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
1727                # Store original buffer size before any resampling for fade_in_size calculation
1728                # This size is in the next track's original format which is what we need
1729                original_buffer_size = len(buffer)
1730                if next_queue_item_pcm_format != pcm_format:
1731                    # edge case: pcm format mismatch, we need to resample the next track's
1732                    # beginning part before crossfading
1733                    self.logger.debug(
1734                        "Resampling next track's crossfade from %s to %s for queue %s",
1735                        next_queue_item_pcm_format.sample_rate,
1736                        pcm_format.sample_rate,
1737                        queue.display_name,
1738                    )
1739                    buffer = await resample_pcm_audio(
1740                        buffer,
1741                        next_queue_item_pcm_format,
1742                        pcm_format,
1743                    )
1744                # perform actual (smart fades) crossfade using mixer
1745                crossfade_bytes = await self._smart_fades_mixer.mix(
1746                    fade_in_part=buffer,
1747                    fade_out_part=fade_out_data,
1748                    fade_in_streamdetails=cast("StreamDetails", next_queue_item.streamdetails),
1749                    fade_out_streamdetails=streamdetails,
1750                    pcm_format=pcm_format,
1751                    standard_crossfade_duration=standard_crossfade_duration,
1752                    mode=smart_fades_mode,
1753                )
1754                # send half of the crossfade_part (= approx the fadeout part)
1755                split_point = (len(crossfade_bytes) + 1) // 2
1756                crossfade_first = crossfade_bytes[:split_point]
1757                crossfade_second = crossfade_bytes[split_point:]
1758                del crossfade_bytes
1759                bytes_written += len(crossfade_first)
1760                for _chunk in divide_chunks(crossfade_first, pcm_format.pcm_sample_size):
1761                    yield _chunk
1762                # store the other half for the next track
1763                # IMPORTANT: crossfade_second data is in CURRENT track's format (pcm_format)
1764                # because it was created from the resampled buffer used for mixing.
1765                # BUT fade_in_size represents bytes in NEXT track's original format
1766                # (next_queue_item_pcm_format) because that's how much of the next track
1767                # was consumed during the crossfade. We need both formats to correctly
1768                # handle the crossfade data when the next track starts.
1769                self._crossfade_data[queue_item.queue_id] = CrossfadeData(
1770                    data=crossfade_second,
1771                    fade_in_size=original_buffer_size,
1772                    pcm_format=pcm_format,  # Format of the data (current track)
1773                    fade_in_pcm_format=next_queue_item_pcm_format,  # Format for fade_in_size
1774                    queue_item_id=next_queue_item.queue_item_id,
1775                )
1776            except AudioError:
1777                # no crossfade possible, just yield the fade_out_data
1778                next_queue_item = None
1779                yield fade_out_data
1780                bytes_written += len(fade_out_data)
1781                del fade_out_data
1782        # make sure the buffer gets cleaned up
1783        del buffer
1784        # update duration details based on the actual pcm data we sent
1785        # this also accounts for crossfade and silence stripping
1786        seconds_streamed = bytes_written / pcm_format.pcm_sample_size
1787        streamdetails.seconds_streamed = seconds_streamed
1788        streamdetails.duration = int(streamdetails.seek_position + seconds_streamed)
1789        self.logger.debug(
1790            "Finished Streaming queue track: %s (%s) on queue %s "
1791            "- crossfade data prepared for next track: %s",
1792            streamdetails.uri,
1793            queue_item.name,
1794            queue.display_name,
1795            next_queue_item.name if next_queue_item else "N/A",
1796        )
1797
1798    def _log_request(self, request: web.Request) -> None:
1799        """Log request."""
1800        if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
1801            self.logger.log(
1802                VERBOSE_LOG_LEVEL,
1803                "Got %s request to %s from %s\nheaders: %s\n",
1804                request.method,
1805                request.path,
1806                request.remote,
1807                request.headers,
1808            )
1809        else:
1810            self.logger.debug(
1811                "Got %s request to %s from %s",
1812                request.method,
1813                request.path,
1814                request.remote,
1815            )
1816
1817    async def get_output_format(
1818        self,
1819        output_format_str: str,
1820        player: Player,
1821        content_sample_rate: int,
1822        content_bit_depth: int,
1823    ) -> AudioFormat:
1824        """Parse (player specific) output format details for given format string."""
1825        content_type: ContentType = ContentType.try_parse(output_format_str)
1826        supported_rates_conf = cast(
1827            "list[tuple[str, str]]",
1828            await self.mass.config.get_player_config_value(
1829                player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
1830            ),
1831        )
1832        output_channels_str = self.mass.config.get_raw_player_config_value(
1833            player.player_id, CONF_OUTPUT_CHANNELS, "stereo"
1834        )
1835        supported_sample_rates = tuple(int(x[0]) for x in supported_rates_conf)
1836        supported_bit_depths = tuple(int(x[1]) for x in supported_rates_conf)
1837
1838        player_max_bit_depth = max(supported_bit_depths)
1839        output_bit_depth = min(content_bit_depth, player_max_bit_depth)
1840        if content_sample_rate in supported_sample_rates:
1841            output_sample_rate = content_sample_rate
1842        else:
1843            output_sample_rate = max(supported_sample_rates)
1844
1845        if not content_type.is_lossless():
1846            # no point in having a higher bit depth for lossy formats
1847            output_bit_depth = 16
1848            output_sample_rate = min(48000, output_sample_rate)
1849        if output_format_str == "pcm":
1850            content_type = ContentType.from_bit_depth(output_bit_depth)
1851        return AudioFormat(
1852            content_type=content_type,
1853            sample_rate=output_sample_rate,
1854            bit_depth=output_bit_depth,
1855            channels=1 if output_channels_str != "stereo" else 2,
1856        )
1857
1858    async def _select_flow_format(
1859        self,
1860        player: Player,
1861    ) -> AudioFormat:
1862        """Parse (player specific) flow stream PCM format."""
1863        supported_rates_conf = cast(
1864            "list[tuple[str, str]]",
1865            await self.mass.config.get_player_config_value(
1866                player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
1867            ),
1868        )
1869        supported_sample_rates = tuple(int(x[0]) for x in supported_rates_conf)
1870        output_sample_rate = INTERNAL_PCM_FORMAT.sample_rate
1871        for sample_rate in (192000, 96000, 48000, 44100):
1872            if sample_rate in supported_sample_rates:
1873                output_sample_rate = sample_rate
1874                break
1875        return AudioFormat(
1876            content_type=INTERNAL_PCM_FORMAT.content_type,
1877            sample_rate=output_sample_rate,
1878            bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
1879            channels=2,
1880        )
1881
1882    async def _select_pcm_format(
1883        self,
1884        player: Player,
1885        streamdetails: StreamDetails,
1886        smartfades_enabled: bool,
1887    ) -> AudioFormat:
1888        """Parse (player specific) stream internal PCM format."""
1889        supported_rates_conf = cast(
1890            "list[tuple[str, str]]",
1891            await self.mass.config.get_player_config_value(
1892                player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
1893            ),
1894        )
1895        supported_sample_rates = tuple(int(x[0]) for x in supported_rates_conf)
1896        # use highest supported rate within content rate
1897        output_sample_rate = max(
1898            (r for r in supported_sample_rates if r <= streamdetails.audio_format.sample_rate),
1899            default=48000,  # sane/safe default
1900        )
1901        # work out pcm format based on streamdetails
1902        pcm_format = AudioFormat(
1903            sample_rate=output_sample_rate,
1904            # always use f32 internally for extra headroom for filters etc
1905            content_type=INTERNAL_PCM_FORMAT.content_type,
1906            bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
1907            channels=streamdetails.audio_format.channels,
1908        )
1909        if smartfades_enabled:
1910            pcm_format.channels = 2  # force stereo for crossfading
1911
1912        return pcm_format
1913
1914    def _crossfade_allowed(
1915        self,
1916        queue_item: QueueItem,
1917        smart_fades_mode: SmartFadesMode,
1918        flow_mode: bool = False,
1919        next_queue_item: QueueItem | None = None,
1920        sample_rate: int | None = None,
1921        next_sample_rate: int | None = None,
1922    ) -> bool:
1923        """Get the crossfade config for a queue item."""
1924        if smart_fades_mode == SmartFadesMode.DISABLED:
1925            return False
1926        if not (self.mass.players.get_player(queue_item.queue_id)):
1927            return False  # just a guard
1928        if queue_item.media_type != MediaType.TRACK:
1929            self.logger.debug("Skipping crossfade: current item is not a track")
1930            return False
1931        # check if the next item is part of the same album
1932        next_item = next_queue_item or self.mass.player_queues.get_next_item(
1933            queue_item.queue_id, queue_item.queue_item_id
1934        )
1935        if not next_item:
1936            # there is no next item!
1937            return False
1938        # check if next item is a track
1939        if next_item.media_type != MediaType.TRACK:
1940            self.logger.debug("Skipping crossfade: next item is not a track")
1941            return False
1942        if (
1943            isinstance(queue_item.media_item, Track)
1944            and isinstance(next_item.media_item, Track)
1945            and queue_item.media_item.album
1946            and next_item.media_item.album
1947            and queue_item.media_item.album == next_item.media_item.album
1948            and not self.mass.config.get_raw_core_config_value(
1949                self.domain, CONF_ALLOW_CROSSFADE_SAME_ALBUM, False
1950            )
1951        ):
1952            # in general, crossfade is not desired for tracks of the same (gapless) album
1953            # because we have no accurate way to determine if the album is gapless or not,
1954            # for now we just never crossfade between tracks of the same album
1955            self.logger.debug("Skipping crossfade: next item is part of the same album")
1956            return False
1957
1958        # check if we're allowed to crossfade on different sample rates
1959        if (
1960            not flow_mode
1961            and sample_rate
1962            and next_sample_rate
1963            and sample_rate != next_sample_rate
1964            and not self.mass.config.get_raw_player_config_value(
1965                queue_item.queue_id,
1966                CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES.key,
1967                CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES.default_value,
1968            )
1969        ):
1970            self.logger.debug(
1971                "Skipping crossfade: player does not support gapless playback "
1972                "with different sample rates (%s vs %s)",
1973                sample_rate,
1974                next_sample_rate,
1975            )
1976            return False
1977
1978        return True
1979
1980    async def _periodic_garbage_collection(self) -> None:
1981        """Periodic garbage collection to free up memory from audio buffers and streams."""
1982        self.logger.log(
1983            VERBOSE_LOG_LEVEL,
1984            "Running periodic garbage collection...",
1985        )
1986        # Run garbage collection in executor to avoid blocking the event loop
1987        # Since this runs periodically (not in response to subprocess cleanup),
1988        # it's safe to run in a thread without causing thread-safety issues
1989        loop = asyncio.get_running_loop()
1990        collected = await loop.run_in_executor(None, gc.collect)
1991        self.logger.log(
1992            VERBOSE_LOG_LEVEL,
1993            "Garbage collection completed, collected %d objects",
1994            collected,
1995        )
1996        # Schedule next run in 15 minutes
1997        self.mass.call_later(900, self._periodic_garbage_collection)
1998
1999    def _setup_smart_fades_logger(self, config: CoreConfig) -> None:
2000        """Set up smart fades logger level."""
2001        log_level = str(config.get_value(CONF_SMART_FADES_LOG_LEVEL))
2002        if log_level == "GLOBAL":
2003            self.smart_fades_analyzer.logger.setLevel(self.logger.level)
2004            self.smart_fades_mixer.logger.setLevel(self.logger.level)
2005        else:
2006            self.smart_fades_analyzer.logger.setLevel(log_level)
2007            self.smart_fades_mixer.logger.setLevel(log_level)
2008