/
/
/
1"""
2Controller to stream audio to players.
3
4The streams controller hosts a basic, unprotected HTTP-only webserver
5purely to stream audio packets to players and some control endpoints such as
6the upnp callbacks and json rpc api for slimproto clients.
7"""
8
9from __future__ import annotations
10
11import asyncio
12import gc
13import logging
14import os
15import urllib.parse
16from collections.abc import AsyncGenerator
17from dataclasses import dataclass
18from typing import TYPE_CHECKING, Final, cast
19
20from aiofiles.os import wrap
21from aiohttp import web
22from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
23from music_assistant_models.enums import (
24 ConfigEntryType,
25 ContentType,
26 MediaType,
27 PlayerFeature,
28 StreamType,
29 VolumeNormalizationMode,
30)
31from music_assistant_models.errors import (
32 AudioError,
33 InvalidDataError,
34 ProviderUnavailableError,
35 QueueEmpty,
36)
37from music_assistant_models.media_items import AudioFormat, Track
38from music_assistant_models.player_queue import PlayLogEntry
39
40from music_assistant.constants import (
41 ANNOUNCE_ALERT_FILE,
42 CONF_BIND_IP,
43 CONF_BIND_PORT,
44 CONF_CROSSFADE_DURATION,
45 CONF_ENTRY_ENABLE_ICY_METADATA,
46 CONF_ENTRY_LOG_LEVEL,
47 CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES,
48 CONF_HTTP_PROFILE,
49 CONF_OUTPUT_CHANNELS,
50 CONF_OUTPUT_CODEC,
51 CONF_PUBLISH_IP,
52 CONF_SAMPLE_RATES,
53 CONF_SMART_FADES_MODE,
54 CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO,
55 CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
56 CONF_VOLUME_NORMALIZATION_RADIO,
57 CONF_VOLUME_NORMALIZATION_TRACKS,
58 DEFAULT_STREAM_HEADERS,
59 ICY_HEADERS,
60 INTERNAL_PCM_FORMAT,
61 SILENCE_FILE,
62 VERBOSE_LOG_LEVEL,
63)
64from music_assistant.controllers.players.helpers import AnnounceData
65from music_assistant.controllers.streams.smart_fades import SmartFadesMixer
66from music_assistant.controllers.streams.smart_fades.analyzer import SmartFadesAnalyzer
67from music_assistant.controllers.streams.smart_fades.fades import SMART_CROSSFADE_DURATION
68from music_assistant.helpers.audio import LOGGER as AUDIO_LOGGER
69from music_assistant.helpers.audio import (
70 get_buffered_media_stream,
71 get_chunksize,
72 get_media_stream,
73 get_player_filter_params,
74 get_stream_details,
75 resample_pcm_audio,
76)
77from music_assistant.helpers.buffered_generator import buffered, use_buffer
78from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
79from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream
80from music_assistant.helpers.util import (
81 divide_chunks,
82 format_ip_for_url,
83 get_ip_addresses,
84 get_total_system_memory,
85)
86from music_assistant.helpers.webserver import Webserver
87from music_assistant.models.core_controller import CoreController
88from music_assistant.models.music_provider import MusicProvider
89from music_assistant.models.plugin import PluginProvider, PluginSource
90from music_assistant.models.smart_fades import SmartFadesMode
91from music_assistant.providers.universal_group.constants import UGP_PREFIX
92from music_assistant.providers.universal_group.player import UniversalGroupPlayer
93
94if TYPE_CHECKING:
95 from music_assistant_models.config_entries import CoreConfig
96 from music_assistant_models.player import PlayerMedia
97 from music_assistant_models.player_queue import PlayerQueue
98 from music_assistant_models.queue_item import QueueItem
99 from music_assistant_models.streamdetails import StreamDetails
100
101 from music_assistant.mass import MusicAssistant
102 from music_assistant.models.player import Player
103
104
105isfile = wrap(os.path.isfile)
106
107CONF_ALLOW_BUFFER: Final[str] = "allow_buffering"
108CONF_ALLOW_CROSSFADE_SAME_ALBUM: Final[str] = "allow_crossfade_same_album"
109CONF_SMART_FADES_LOG_LEVEL: Final[str] = "smart_fades_log_level"
110
111# Calculate total system memory once at module load time
112TOTAL_SYSTEM_MEMORY_GB: Final[float] = get_total_system_memory()
113CONF_ALLOW_BUFFER_DEFAULT = TOTAL_SYSTEM_MEMORY_GB >= 8.0
114DEFAULT_PORT: Final[int] = 8097
115
116
117def parse_pcm_info(content_type: str) -> tuple[int, int, int]:
118 """Parse PCM info from a codec/content_type string."""
119 params = (
120 dict(urllib.parse.parse_qsl(content_type.replace(";", "&"))) if ";" in content_type else {}
121 )
122 sample_rate = int(params.get("rate", 44100))
123 sample_size = int(params.get("bitrate", 16))
124 channels = int(params.get("channels", 2))
125 return (sample_rate, sample_size, channels)
126
127
128@dataclass
129class CrossfadeData:
130 """Data class to hold crossfade data."""
131
132 data: bytes
133 fade_in_size: int
134 pcm_format: AudioFormat # Format of the 'data' bytes (current/previous track's format)
135 fade_in_pcm_format: AudioFormat # Format for 'fade_in_size' (next track's format)
136 queue_item_id: str
137
138
139class StreamsController(CoreController):
140 """Webserver Controller to stream audio to players."""
141
142 domain: str = "streams"
143
144 def __init__(self, mass: MusicAssistant) -> None:
145 """Initialize instance."""
146 super().__init__(mass)
147 self._server = Webserver(self.logger, enable_dynamic_routes=True)
148 self.register_dynamic_route = self._server.register_dynamic_route
149 self.unregister_dynamic_route = self._server.unregister_dynamic_route
150 self.manifest.name = "Streamserver"
151 self.manifest.description = (
152 "Music Assistant's core controller that is responsible for "
153 "streaming audio to players on the local network."
154 )
155 self.manifest.icon = "cast-audio"
156 self.announcements: dict[str, AnnounceData] = {}
157 self._crossfade_data: dict[str, CrossfadeData] = {}
158 self._bind_ip: str = "0.0.0.0"
159 self._smart_fades_mixer = SmartFadesMixer(self)
160 self._smart_fades_analyzer = SmartFadesAnalyzer(self)
161
162 @property
163 def base_url(self) -> str:
164 """Return the base_url for the streamserver."""
165 return self._server.base_url
166
167 @property
168 def bind_ip(self) -> str:
169 """Return the IP address this streamserver is bound to."""
170 return self._bind_ip
171
172 @property
173 def smart_fades_mixer(self) -> SmartFadesMixer:
174 """Return the SmartFadesMixer instance."""
175 return self._smart_fades_mixer
176
177 @property
178 def smart_fades_analyzer(self) -> SmartFadesAnalyzer:
179 """Return the SmartFadesAnalyzer instance."""
180 return self._smart_fades_analyzer
181
182 async def get_config_entries(
183 self,
184 action: str | None = None,
185 values: dict[str, ConfigValueType] | None = None,
186 ) -> tuple[ConfigEntry, ...]:
187 """Return all Config Entries for this core module (if any)."""
188 ip_addresses = await get_ip_addresses(include_ipv6=True)
189 return (
190 ConfigEntry(
191 key=CONF_ALLOW_BUFFER,
192 type=ConfigEntryType.BOOLEAN,
193 default_value=CONF_ALLOW_BUFFER_DEFAULT,
194 label="Allow (in-memory) buffering of (track) audio",
195 description="By default, Music Assistant tries to be as resource "
196 "efficient as possible when streaming audio, especially considering "
197 "low-end devices such as Raspberry Pi's. This means that audio "
198 "buffering is disabled by default to reduce memory usage. \n\n"
199 "Enabling this option allows for in-memory buffering of audio, "
200 "which (massively) improves playback (and seeking) performance but it comes "
201 "at the cost of increased memory usage. "
202 "If you run Music Assistant on a capable device with enough memory, "
203 "enabling this option is strongly recommended.",
204 required=False,
205 category="playback",
206 ),
207 ConfigEntry(
208 key=CONF_VOLUME_NORMALIZATION_RADIO,
209 type=ConfigEntryType.STRING,
210 default_value=VolumeNormalizationMode.FALLBACK_FIXED_GAIN,
211 label="Volume normalization method for radio streams",
212 options=[
213 ConfigValueOption(x.value.replace("_", " ").title(), x.value)
214 for x in VolumeNormalizationMode
215 ],
216 category="playback",
217 ),
218 ConfigEntry(
219 key=CONF_VOLUME_NORMALIZATION_TRACKS,
220 type=ConfigEntryType.STRING,
221 default_value=VolumeNormalizationMode.FALLBACK_DYNAMIC,
222 label="Volume normalization method for tracks",
223 options=[
224 ConfigValueOption(x.value.replace("_", " ").title(), x.value)
225 for x in VolumeNormalizationMode
226 ],
227 category="playback",
228 ),
229 ConfigEntry(
230 key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO,
231 type=ConfigEntryType.FLOAT,
232 range=(-20, 10),
233 default_value=-6,
234 label="Fixed/fallback gain adjustment for radio streams",
235 category="playback",
236 ),
237 ConfigEntry(
238 key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
239 type=ConfigEntryType.FLOAT,
240 range=(-20, 10),
241 default_value=-6,
242 label="Fixed/fallback gain adjustment for tracks",
243 category="playback",
244 ),
245 ConfigEntry(
246 key=CONF_ALLOW_CROSSFADE_SAME_ALBUM,
247 type=ConfigEntryType.BOOLEAN,
248 default_value=False,
249 label="Allow crossfade between tracks from the same album",
250 description="Enabling this option allows for crossfading between tracks "
251 "that are part of the same album.",
252 category="playback",
253 ),
254 ConfigEntry(
255 key=CONF_PUBLISH_IP,
256 type=ConfigEntryType.STRING,
257 default_value=ip_addresses[0],
258 label="Published IP address",
259 description="This IP address is communicated to players where to find this server."
260 "\nMake sure that this IP can be reached by players on the local network, "
261 "otherwise audio streaming will not work.",
262 required=False,
263 category="generic",
264 advanced=True,
265 requires_reload=True,
266 ),
267 ConfigEntry(
268 key=CONF_BIND_PORT,
269 type=ConfigEntryType.INTEGER,
270 default_value=DEFAULT_PORT,
271 label="TCP Port",
272 description="The TCP port to run the server. "
273 "Make sure that this server can be reached "
274 "on the given IP and TCP port by players on the local network.",
275 category="generic",
276 advanced=True,
277 requires_reload=True,
278 ),
279 ConfigEntry(
280 key=CONF_BIND_IP,
281 type=ConfigEntryType.STRING,
282 default_value="0.0.0.0",
283 options=[ConfigValueOption(x, x) for x in {"0.0.0.0", "::", *ip_addresses}],
284 label="Bind to IP/interface",
285 description="Start the stream server on this specific interface. \n"
286 "Use 0.0.0.0 or :: to bind to all interfaces, which is the default. \n"
287 "This is an advanced setting that should normally "
288 "not be adjusted in regular setups.",
289 category="generic",
290 advanced=True,
291 required=False,
292 requires_reload=True,
293 ),
294 ConfigEntry(
295 key=CONF_SMART_FADES_LOG_LEVEL,
296 type=ConfigEntryType.STRING,
297 label="Smart Fades Log level",
298 description="Log level for the Smart Fades mixer and analyzer.",
299 options=CONF_ENTRY_LOG_LEVEL.options,
300 default_value="GLOBAL",
301 category="generic",
302 advanced=True,
303 ),
304 )
305
306 async def setup(self, config: CoreConfig) -> None:
307 """Async initialize of module."""
308 # copy log level to audio/ffmpeg loggers
309 AUDIO_LOGGER.setLevel(self.logger.level)
310 FFMPEG_LOGGER.setLevel(self.logger.level)
311 self._setup_smart_fades_logger(config)
312 # perform check for ffmpeg version
313 await check_ffmpeg_version()
314 # start the webserver
315 self.publish_port = config.get_value(CONF_BIND_PORT, DEFAULT_PORT)
316 self.publish_ip = config.get_value(CONF_PUBLISH_IP)
317 self._bind_ip = bind_ip = str(config.get_value(CONF_BIND_IP))
318 # print a big fat message in the log where the streamserver is running
319 # because this is a common source of issues for people with more complex setups
320 self.logger.log(
321 logging.INFO if self.mass.config.onboard_done else logging.WARNING,
322 "\n\n################################################################################\n"
323 "Starting streamserver on %s:%s\n"
324 "This is the IP address that is communicated to players.\n"
325 "If this is incorrect, audio will not play!\n"
326 "See the documentation how to configure the publish IP for the Streamserver\n"
327 "in Settings --> Core modules --> Streamserver\n"
328 "################################################################################\n",
329 self.publish_ip,
330 self.publish_port,
331 )
332 await self._server.setup(
333 bind_ip=bind_ip,
334 bind_port=cast("int", self.publish_port),
335 base_url=f"http://{format_ip_for_url(str(self.publish_ip))}:{self.publish_port}",
336 static_routes=[
337 (
338 "*",
339 "/flow/{session_id}/{queue_id}/{queue_item_id}/{player_id}.{fmt}",
340 self.serve_queue_flow_stream,
341 ),
342 (
343 "*",
344 "/single/{session_id}/{queue_id}/{queue_item_id}/{player_id}.{fmt}",
345 self.serve_queue_item_stream,
346 ),
347 (
348 "*",
349 "/command/{queue_id}/{command}.mp3",
350 self.serve_command_request,
351 ),
352 (
353 "*",
354 "/announcement/{player_id}.{fmt}",
355 self.serve_announcement_stream,
356 ),
357 (
358 "*",
359 "/pluginsource/{plugin_source}/{player_id}.{fmt}",
360 self.serve_plugin_source_stream,
361 ),
362 ],
363 )
364 # Start periodic garbage collection task
365 # This ensures memory from audio buffers and streams is cleaned up regularly
366 self.mass.call_later(900, self._periodic_garbage_collection) # 15 minutes
367
368 async def close(self) -> None:
369 """Cleanup on exit."""
370 await self._server.close()
371
372 async def resolve_stream_url(
373 self,
374 player_id: str,
375 media: PlayerMedia,
376 ) -> str:
377 """
378 Resolve the stream URL for the given PlayerMedia.
379
380 :param player_id: The (protocol) player ID requesting the stream.
381 :param media: The PlayerMedia object for which to resolve the stream URL.
382 :return: The resolved stream URL as a string.
383 """
384 protocol_player = self.mass.players.get_player(player_id)
385 conf_output_codec = cast(
386 "str",
387 protocol_player.config.get_value(CONF_OUTPUT_CODEC, default="flac")
388 if protocol_player
389 else "flac",
390 )
391 output_codec = ContentType.try_parse(conf_output_codec)
392 fmt = output_codec.value
393 # handle raw pcm without exact format specifiers
394 if output_codec.is_pcm() and ";" not in fmt:
395 fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}"
396 extra_data = media.custom_data or {}
397 session_id = extra_data.get("session_id")
398 queue_item_id = media.queue_item_id
399 if not session_id or not queue_item_id:
400 raise InvalidDataError("Can not resolve stream URL: Invalid PlayerMedia data")
401 queue_id = media.source_id
402 crossfade_needs_flow_mode = (
403 # if the player(queue) has crossfade enabled but the player(protocol) does not support
404 # gapless playback, we need to enforce flow mode
405 queue_id
406 and (queue_player := self.mass.players.get_player(queue_id))
407 and queue_player.config.get_value(CONF_SMART_FADES_MODE) != SmartFadesMode.DISABLED
408 and protocol_player
409 and not protocol_player.supports_gapless
410 )
411 # Determine flow_mode based on the actual player's capabilities.
412 # This is done here (just-in-time) because the player's protocol determines this
413 flow_mode = (
414 protocol_player is not None
415 and (protocol_player.flow_mode or crossfade_needs_flow_mode)
416 and media.media_type not in (MediaType.RADIO, MediaType.PLUGIN_SOURCE)
417 )
418 base_path = "flow" if flow_mode else "single"
419 return f"{self._server.base_url}/{base_path}/{session_id}/{queue_id}/{queue_item_id}/{player_id}.{fmt}" # noqa: E501
420
421 async def get_plugin_source_url(
422 self,
423 plugin_source: PluginSource,
424 player_id: str,
425 ) -> str:
426 """Get the url for the Plugin Source stream/proxy."""
427 if plugin_source.audio_format.content_type.is_pcm():
428 fmt = ContentType.WAV.value
429 else:
430 fmt = plugin_source.audio_format.content_type.value
431 return f"{self._server.base_url}/pluginsource/{plugin_source.id}/{player_id}.{fmt}"
432
433 async def serve_queue_item_stream(self, request: web.Request) -> web.StreamResponse:
434 """Stream single queueitem audio to a player."""
435 self._log_request(request)
436 queue_id = request.match_info["queue_id"]
437 player_id = request.match_info["player_id"]
438 if not (queue := self.mass.player_queues.get(queue_id)):
439 raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
440 session_id = request.match_info["session_id"]
441 if queue.session_id and session_id != queue.session_id:
442 raise web.HTTPNotFound(reason=f"Unknown (or invalid) session: {session_id}")
443 if not (player := self.mass.players.get_player(player_id)):
444 raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
445 queue_item_id = request.match_info["queue_item_id"]
446 queue_item = self.mass.player_queues.get_item(queue_id, queue_item_id)
447 if not queue_item:
448 raise web.HTTPNotFound(reason=f"Unknown Queue item: {queue_item_id}")
449 if not queue_item.streamdetails:
450 try:
451 queue_item.streamdetails = await get_stream_details(
452 mass=self.mass, queue_item=queue_item
453 )
454 except Exception as e:
455 self.logger.error(
456 "Failed to get streamdetails for QueueItem %s: %s", queue_item_id, e
457 )
458 queue_item.available = False
459 raise web.HTTPNotFound(reason=f"No streamdetails for Queue item: {queue_item_id}")
460
461 # pick output format based on the streamdetails and player capabilities
462 pcm_format = await self._select_pcm_format(
463 player=player,
464 streamdetails=queue_item.streamdetails,
465 smartfades_enabled=True,
466 )
467 output_format = await self.get_output_format(
468 output_format_str=request.match_info["fmt"],
469 player=player,
470 content_sample_rate=pcm_format.sample_rate,
471 content_bit_depth=pcm_format.bit_depth,
472 )
473
474 # prepare request, add some DLNA/UPNP compatible headers
475 # icy-name is sanitized to avoid a "Potential header injection attack" exception by aiohttp
476 # see https://github.com/music-assistant/support/issues/4913
477 headers = {
478 **DEFAULT_STREAM_HEADERS,
479 "icy-name": queue_item.name.replace("\n", " ").replace("\r", " ").replace("\t", " "),
480 "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01500000000000000000000000000000", # noqa: E501
481 "Accept-Ranges": "none",
482 "Content-Type": f"audio/{output_format.output_format_str}",
483 }
484 resp = web.StreamResponse(
485 status=200,
486 reason="OK",
487 headers=headers,
488 )
489 resp.content_type = f"audio/{output_format.output_format_str}"
490 http_profile = await self.mass.config.get_player_config_value(
491 player_id, CONF_HTTP_PROFILE, default="default", return_type=str
492 )
493 if http_profile == "forced_content_length" and not queue_item.duration:
494 # just set an insane high content length to make sure the player keeps playing
495 resp.content_length = get_chunksize(output_format, 12 * 3600)
496 elif http_profile == "forced_content_length" and queue_item.duration:
497 # guess content length based on duration
498 resp.content_length = get_chunksize(output_format, queue_item.duration)
499 elif http_profile == "chunked":
500 resp.enable_chunked_encoding()
501
502 await resp.prepare(request)
503
504 # return early if this is not a GET request
505 if request.method != "GET":
506 return resp
507
508 if queue_item.media_type != MediaType.TRACK:
509 # no crossfade on non-tracks
510 smart_fades_mode = SmartFadesMode.DISABLED
511 else:
512 smart_fades_mode = await self.mass.config.get_player_config_value(
513 queue.queue_id, CONF_SMART_FADES_MODE, return_type=SmartFadesMode
514 )
515 standard_crossfade_duration = self.mass.config.get_raw_player_config_value(
516 queue.queue_id, CONF_CROSSFADE_DURATION, 10
517 )
518 if (
519 smart_fades_mode != SmartFadesMode.DISABLED
520 and PlayerFeature.GAPLESS_PLAYBACK not in player.state.supported_features
521 ):
522 # crossfade is not supported on this player due to missing gapless playback
523 self.logger.warning(
524 "Crossfade disabled: Player %s does not support gapless playback, "
525 "consider enabling flow mode to enable crossfade on this player.",
526 player.state.name if player else "Unknown Player",
527 )
528 smart_fades_mode = SmartFadesMode.DISABLED
529
530 if smart_fades_mode != SmartFadesMode.DISABLED:
531 # crossfade is enabled, use special crossfaded single item stream
532 # where the crossfade of the next track is present in the stream of
533 # a single track. This only works if the player supports gapless playback!
534 audio_input = self.get_queue_item_stream_with_smartfade(
535 queue_item=queue_item,
536 pcm_format=pcm_format,
537 smart_fades_mode=smart_fades_mode,
538 standard_crossfade_duration=standard_crossfade_duration,
539 )
540 else:
541 # no crossfade, just a regular single item stream
542 audio_input = self.get_queue_item_stream(
543 queue_item=queue_item,
544 pcm_format=pcm_format,
545 seek_position=queue_item.streamdetails.seek_position,
546 playback_speed=cast(
547 "float", queue_item.extra_attributes.get("playback_speed", 1.0)
548 ),
549 )
550 # stream the audio
551 # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into
552 # the desired output format for the player including any player specific filter params
553 # such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats
554 first_chunk_received = False
555 bytes_sent = 0
556 async for chunk in get_ffmpeg_stream(
557 audio_input=audio_input,
558 input_format=pcm_format,
559 output_format=output_format,
560 filter_params=get_player_filter_params(
561 self.mass,
562 player_id=player.player_id,
563 input_format=pcm_format,
564 output_format=output_format,
565 ),
566 ):
567 try:
568 await resp.write(chunk)
569 bytes_sent += len(chunk)
570 if not first_chunk_received:
571 first_chunk_received = True
572 # inform the queue that the track is now loaded in the buffer
573 # so for example the next track can be enqueued
574 self.mass.player_queues.track_loaded_in_buffer(
575 queue_item.queue_id, queue_item.queue_item_id
576 )
577 except (BrokenPipeError, ConnectionResetError, ConnectionError) as err:
578 if first_chunk_received and not player.stop_called:
579 # Player disconnected (unexpected) after receiving at least some data
580 # This could indicate buffering issues, network problems,
581 # or player-specific issues
582 bytes_expected = get_chunksize(output_format, queue_item.duration or 3600)
583 self.logger.warning(
584 "Player %s disconnected prematurely from stream for %s (%s) - "
585 "error: %s, sent %d bytes, expected (approx) bytes=%d",
586 queue.display_name,
587 queue_item.name,
588 queue_item.uri,
589 err.__class__.__name__,
590 bytes_sent,
591 bytes_expected,
592 )
593 break
594 if queue_item.streamdetails.stream_error:
595 self.logger.error(
596 "Error streaming QueueItem %s (%s) to %s - will try to skip to next item",
597 queue_item.name,
598 queue_item.uri,
599 queue.display_name,
600 )
601 # try to skip to the next item in the queue after a short delay
602 self.mass.call_later(5, self.mass.player_queues.next(queue_id))
603 return resp
604
605 async def serve_queue_flow_stream(self, request: web.Request) -> web.StreamResponse:
606 """Stream Queue Flow audio to player."""
607 self._log_request(request)
608 queue_id = request.match_info["queue_id"]
609 player_id = request.match_info["player_id"]
610 if not (queue := self.mass.player_queues.get(queue_id)):
611 raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
612 if not (player := self.mass.players.get_player(player_id)):
613 raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
614 start_queue_item_id = request.match_info["queue_item_id"]
615 start_queue_item = self.mass.player_queues.get_item(queue_id, start_queue_item_id)
616 if not start_queue_item:
617 raise web.HTTPNotFound(reason=f"Unknown Queue item: {start_queue_item_id}")
618
619 queue.flow_mode_stream_log = []
620
621 # select the highest possible PCM settings for this player
622 flow_pcm_format = await self._select_flow_format(player)
623
624 # work out output format/details
625 output_format = await self.get_output_format(
626 output_format_str=request.match_info["fmt"],
627 player=player,
628 content_sample_rate=flow_pcm_format.sample_rate,
629 content_bit_depth=flow_pcm_format.bit_depth,
630 )
631 # work out ICY metadata support
632 icy_preference = self.mass.config.get_raw_player_config_value(
633 player_id,
634 CONF_ENTRY_ENABLE_ICY_METADATA.key,
635 CONF_ENTRY_ENABLE_ICY_METADATA.default_value,
636 )
637 enable_icy = request.headers.get("Icy-MetaData", "") == "1" and icy_preference != "disabled"
638 icy_meta_interval = 256000 if icy_preference == "full" else 16384
639
640 # prepare request, add some DLNA/UPNP compatible headers
641 headers = {
642 **DEFAULT_STREAM_HEADERS,
643 **ICY_HEADERS,
644 "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01700000000000000000000000000000", # noqa: E501
645 "Accept-Ranges": "none",
646 "Content-Type": f"audio/{output_format.output_format_str}",
647 }
648 if enable_icy:
649 headers["icy-metaint"] = str(icy_meta_interval)
650
651 resp = web.StreamResponse(
652 status=200,
653 reason="OK",
654 headers=headers,
655 )
656 http_profile = await self.mass.config.get_player_config_value(
657 player_id, CONF_HTTP_PROFILE, default="default", return_type=str
658 )
659 if http_profile == "forced_content_length":
660 # just set an insane high content length to make sure the player keeps playing
661 resp.content_length = get_chunksize(output_format, 12 * 3600)
662 elif http_profile == "chunked":
663 resp.enable_chunked_encoding()
664
665 await resp.prepare(request)
666
667 # return early if this is not a GET request
668 if request.method != "GET":
669 return resp
670
671 # all checks passed, start streaming!
672 # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into
673 # the desired output format for the player including any player specific filter params
674 # such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats
675 self.logger.debug("Start serving Queue flow audio stream for %s", queue.display_name)
676
677 async for chunk in get_ffmpeg_stream(
678 audio_input=self.get_queue_flow_stream(
679 queue=queue,
680 start_queue_item=start_queue_item,
681 pcm_format=flow_pcm_format,
682 ),
683 input_format=flow_pcm_format,
684 output_format=output_format,
685 filter_params=get_player_filter_params(
686 self.mass, player.player_id, flow_pcm_format, output_format
687 ),
688 # we need to slowly feed the music to avoid the player stopping and later
689 # restarting (or completely failing) the audio stream by keeping the buffer short.
690 # this is reported to be an issue especially with Chromecast players.
691 # see for example: https://github.com/music-assistant/support/issues/3717
692 # allow buffer ahead of 6 seconds and read rest in realtime
693 extra_input_args=["-readrate", "1.0", "-readrate_initial_burst", "6"],
694 chunk_size=icy_meta_interval if enable_icy else get_chunksize(output_format),
695 ):
696 try:
697 await resp.write(chunk)
698 except (BrokenPipeError, ConnectionResetError, ConnectionError):
699 # race condition
700 break
701
702 if not enable_icy:
703 continue
704
705 # if icy metadata is enabled, send the icy metadata after the chunk
706 if (
707 # use current item here and not buffered item, otherwise
708 # the icy metadata will be too much ahead
709 (current_item := queue.current_item)
710 and current_item.streamdetails
711 and current_item.streamdetails.stream_title
712 ):
713 title = current_item.streamdetails.stream_title
714 elif queue and current_item and current_item.name:
715 title = current_item.name
716 else:
717 title = "Music Assistant"
718 metadata = f"StreamTitle='{title}';".encode()
719 if icy_preference == "full" and current_item and current_item.image:
720 metadata += f"StreamURL='{current_item.image.path}'".encode()
721 while len(metadata) % 16 != 0:
722 metadata += b"\x00"
723 length = len(metadata)
724 length_b = chr(int(length / 16)).encode()
725 await resp.write(length_b + metadata)
726
727 return resp
728
729 async def serve_command_request(self, request: web.Request) -> web.FileResponse:
730 """Handle special 'command' request for a player."""
731 self._log_request(request)
732 queue_id = request.match_info["queue_id"]
733 command = request.match_info["command"]
734 if command == "next":
735 self.mass.create_task(self.mass.player_queues.next(queue_id))
736 return web.FileResponse(SILENCE_FILE, headers={"icy-name": "Music Assistant"})
737
738 async def serve_announcement_stream(self, request: web.Request) -> web.StreamResponse:
739 """Stream announcement audio to a player."""
740 self._log_request(request)
741 player_id = request.match_info["player_id"]
742 player = self.mass.player_queues.get(player_id)
743 if not player:
744 raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
745 if not (announce_data := self.announcements.get(player_id)):
746 raise web.HTTPNotFound(reason=f"No pending announcements for Player: {player_id}")
747
748 # work out output format/details
749 fmt = request.match_info["fmt"]
750 audio_format = AudioFormat(content_type=ContentType.try_parse(fmt))
751
752 http_profile = await self.mass.config.get_player_config_value(
753 player_id, CONF_HTTP_PROFILE, default="default", return_type=str
754 )
755 if http_profile == "forced_content_length":
756 # given the fact that an announcement is just a short audio clip,
757 # just send it over completely at once so we have a fixed content length
758 data = b""
759 async for chunk in self.get_announcement_stream(
760 announcement_url=announce_data["announcement_url"],
761 output_format=audio_format,
762 pre_announce=announce_data["pre_announce"],
763 pre_announce_url=announce_data["pre_announce_url"],
764 ):
765 data += chunk
766 return web.Response(
767 body=data,
768 content_type=f"audio/{audio_format.output_format_str}",
769 headers=DEFAULT_STREAM_HEADERS,
770 )
771
772 resp = web.StreamResponse(
773 status=200,
774 reason="OK",
775 headers=DEFAULT_STREAM_HEADERS,
776 )
777 resp.content_type = f"audio/{audio_format.output_format_str}"
778 if http_profile == "chunked":
779 resp.enable_chunked_encoding()
780
781 await resp.prepare(request)
782
783 # return early if this is not a GET request
784 if request.method != "GET":
785 return resp
786
787 # all checks passed, start streaming!
788 self.logger.debug(
789 "Start serving audio stream for Announcement %s to %s",
790 announce_data["announcement_url"],
791 player.state.name,
792 )
793 async for chunk in self.get_announcement_stream(
794 announcement_url=announce_data["announcement_url"],
795 output_format=audio_format,
796 pre_announce=announce_data["pre_announce"],
797 pre_announce_url=announce_data["pre_announce_url"],
798 ):
799 try:
800 await resp.write(chunk)
801 except (BrokenPipeError, ConnectionResetError):
802 break
803
804 self.logger.debug(
805 "Finished serving audio stream for Announcement %s to %s",
806 announce_data["announcement_url"],
807 player.state.name,
808 )
809
810 return resp
811
812 async def serve_plugin_source_stream(self, request: web.Request) -> web.StreamResponse:
813 """Stream PluginSource audio to a player."""
814 self._log_request(request)
815 plugin_source_id = request.match_info["plugin_source"]
816 provider = cast("PluginProvider", self.mass.get_provider(plugin_source_id))
817 if not provider:
818 raise ProviderUnavailableError(f"Unknown PluginSource: {plugin_source_id}")
819 # work out output format/details
820 player_id = request.match_info["player_id"]
821 player = self.mass.players.get_player(player_id)
822 if not player:
823 raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
824 plugin_source = provider.get_source()
825 output_format = await self.get_output_format(
826 output_format_str=request.match_info["fmt"],
827 player=player,
828 content_sample_rate=plugin_source.audio_format.sample_rate,
829 content_bit_depth=plugin_source.audio_format.bit_depth,
830 )
831 headers = {
832 **DEFAULT_STREAM_HEADERS,
833 "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01700000000000000000000000000000", # noqa: E501
834 "icy-name": plugin_source.name,
835 "Accept-Ranges": "none",
836 "Content-Type": f"audio/{output_format.output_format_str}",
837 }
838
839 resp = web.StreamResponse(
840 status=200,
841 reason="OK",
842 headers=headers,
843 )
844 resp.content_type = f"audio/{output_format.output_format_str}"
845 http_profile = await self.mass.config.get_player_config_value(
846 player_id, CONF_HTTP_PROFILE, default="default", return_type=str
847 )
848 if http_profile == "forced_content_length":
849 # just set an insanely high content length to make sure the player keeps playing
850 resp.content_length = get_chunksize(output_format, 12 * 3600)
851 elif http_profile == "chunked":
852 resp.enable_chunked_encoding()
853
854 await resp.prepare(request)
855
856 # return early if this is not a GET request
857 if request.method != "GET":
858 return resp
859
860 # all checks passed, start streaming!
861 if not plugin_source.audio_format:
862 raise InvalidDataError(f"No audio format for plugin source {plugin_source_id}")
863 async for chunk in self.get_plugin_source_stream(
864 plugin_source_id=plugin_source_id,
865 output_format=output_format,
866 player_id=player_id,
867 player_filter_params=get_player_filter_params(
868 self.mass, player_id, plugin_source.audio_format, output_format
869 ),
870 ):
871 try:
872 await resp.write(chunk)
873 except (BrokenPipeError, ConnectionResetError, ConnectionError):
874 break
875 return resp
876
877 def get_command_url(self, player_or_queue_id: str, command: str) -> str:
878 """Get the url for the special command stream."""
879 return f"{self.base_url}/command/{player_or_queue_id}/{command}.mp3"
880
881 def get_announcement_url(
882 self,
883 player_id: str,
884 announce_data: AnnounceData,
885 content_type: ContentType = ContentType.MP3,
886 ) -> str:
887 """Get the url for the special announcement stream."""
888 self.announcements[player_id] = announce_data
889 # use stream server to host announcement on local network
890 # this ensures playback on all players, including ones that do not
891 # like https hosts and it also offers the pre-announce 'bell'
892 return f"{self.base_url}/announcement/{player_id}.{content_type.value}"
893
894 def get_stream(
895 self,
896 media: PlayerMedia,
897 pcm_format: AudioFormat,
898 player_id: str | None = None,
899 force_flow_mode: bool = False,
900 ) -> AsyncGenerator[bytes, None]:
901 """
902 Get a stream of the given media as raw PCM audio.
903
904 This is used as helper for player providers that can consume the raw PCM
905 audio stream directly (e.g. AirPlay) and not rely on HTTP transport.
906
907 :param media: The PlayerMedia to stream.
908 :param pcm_format: The desired output PCM format.
909 :param player_id: The player ID requesting the stream. Used to determine
910 if flow mode should be used based on the player's capabilities.
911 :param force_flow_mode: Force flow mode regardless of player capabilities.
912 Used for multi-client streaming scenarios that require continuous streams.
913 """
914 # select audio source
915 if media.media_type == MediaType.ANNOUNCEMENT:
916 # special case: stream announcement
917 assert media.custom_data
918 return self.get_announcement_stream(
919 media.custom_data["announcement_url"],
920 output_format=pcm_format,
921 pre_announce=media.custom_data["pre_announce"],
922 pre_announce_url=media.custom_data["pre_announce_url"],
923 )
924 if media.media_type == MediaType.PLUGIN_SOURCE:
925 # special case: plugin source stream
926 assert media.custom_data
927 return self.get_plugin_source_stream(
928 plugin_source_id=media.custom_data["source_id"],
929 output_format=pcm_format,
930 # need to pass player_id from the PlayerMedia object
931 # because this could have been a group
932 player_id=media.custom_data["player_id"],
933 )
934 if (
935 media.source_id
936 and media.source_id.startswith(UGP_PREFIX)
937 and media.uri
938 and "/ugp/" in media.uri
939 ):
940 # special case: member player accessing UGP stream
941 # Check URI to distinguish from the UGP accessing its own stream
942 ugp_player = cast("UniversalGroupPlayer", self.mass.players.get_player(media.source_id))
943 ugp_stream = ugp_player.stream
944 assert ugp_stream is not None # for type checker
945 if ugp_stream.base_pcm_format == pcm_format:
946 # no conversion needed
947 return ugp_stream.subscribe_raw()
948 return ugp_stream.get_stream(output_format=pcm_format)
949 if media.source_id and media.queue_item_id:
950 # Queue stream request - determine flow_mode based on player capabilities
951 # or force it if explicitly requested (e.g., for multi-client streaming)
952 protocol_player = self.mass.players.get_player(player_id) if player_id else None
953 queue_id = media.source_id
954 crossfade_needs_flow_mode = (
955 # if the player(queue) has crossfade enabled but the player(protocol)
956 # does not support gapless playback, we need to enforce flow mode
957 queue_id
958 and (queue_player := self.mass.players.get_player(queue_id))
959 and queue_player.config.get_value(CONF_SMART_FADES_MODE) != SmartFadesMode.DISABLED
960 and protocol_player
961 and not protocol_player.supports_gapless
962 )
963 flow_mode = (
964 force_flow_mode
965 or (protocol_player is not None and protocol_player.flow_mode)
966 or crossfade_needs_flow_mode
967 )
968 if media.media_type == MediaType.RADIO:
969 # flow_mode for radio is pointless
970 flow_mode = False
971 if flow_mode:
972 # flow stream request
973 queue = self.mass.player_queues.get(media.source_id)
974 assert queue
975 start_queue_item = self.mass.player_queues.get_item(
976 media.source_id, media.queue_item_id
977 )
978 assert start_queue_item
979 return self.mass.streams.get_queue_flow_stream(
980 queue=queue,
981 start_queue_item=start_queue_item,
982 pcm_format=pcm_format,
983 )
984 # single item stream (e.g. radio or non-flow mode)
985 queue_item = self.mass.player_queues.get_item(media.source_id, media.queue_item_id)
986 assert queue_item
987 return buffered(
988 self.get_queue_item_stream(
989 queue_item=queue_item,
990 pcm_format=pcm_format,
991 playback_speed=cast(
992 "float", queue_item.extra_attributes.get("playback_speed", 1.0)
993 ),
994 ),
995 buffer_size=10,
996 min_buffer_before_yield=2,
997 )
998 # assume url or some other direct path
999 # NOTE: this will fail if its an uri not playable by ffmpeg
1000 return get_ffmpeg_stream(
1001 audio_input=media.uri,
1002 input_format=AudioFormat(content_type=ContentType.try_parse(media.uri)),
1003 output_format=pcm_format,
1004 )
1005
1006 @use_buffer(buffer_size=30, min_buffer_before_yield=2)
1007 async def get_queue_flow_stream(
1008 self,
1009 queue: PlayerQueue,
1010 start_queue_item: QueueItem,
1011 pcm_format: AudioFormat,
1012 ) -> AsyncGenerator[bytes, None]:
1013 """
1014 Get a flow stream of all tracks in the queue as raw PCM audio.
1015
1016 yields chunks of exactly 1 second of audio in the given pcm_format.
1017 """
1018 # ruff: noqa: PLR0915
1019 assert pcm_format.content_type.is_pcm()
1020 queue_track = None
1021 last_fadeout_part: bytes = b""
1022 last_streamdetails: StreamDetails | None = None
1023 last_play_log_entry: PlayLogEntry | None = None
1024 queue.flow_mode = True
1025 if not start_queue_item:
1026 # this can happen in some (edge case) race conditions
1027 return
1028 pcm_sample_size = pcm_format.pcm_sample_size
1029 if start_queue_item.media_type != MediaType.TRACK:
1030 # no crossfade on non-tracks
1031 smart_fades_mode = SmartFadesMode.DISABLED
1032 standard_crossfade_duration = 0
1033 else:
1034 smart_fades_mode = await self.mass.config.get_player_config_value(
1035 queue.queue_id, CONF_SMART_FADES_MODE, return_type=SmartFadesMode
1036 )
1037 standard_crossfade_duration = self.mass.config.get_raw_player_config_value(
1038 queue.queue_id, CONF_CROSSFADE_DURATION, 10
1039 )
1040 self.logger.info(
1041 "Start Queue Flow stream for Queue %s - crossfade: %s %s",
1042 queue.display_name,
1043 smart_fades_mode,
1044 f"({standard_crossfade_duration}s)"
1045 if smart_fades_mode == SmartFadesMode.STANDARD_CROSSFADE
1046 else "",
1047 )
1048 total_bytes_sent = 0
1049 total_chunks_received = 0
1050
1051 while True:
1052 # get (next) queue item to stream
1053 if queue_track is None:
1054 queue_track = start_queue_item
1055 else:
1056 try:
1057 queue_track = await self.mass.player_queues.load_next_queue_item(
1058 queue.queue_id, queue_track.queue_item_id
1059 )
1060 except QueueEmpty:
1061 break
1062
1063 if queue_track.streamdetails is None:
1064 self.logger.error(
1065 "No StreamDetails for queue item %s (%s) on queue %s - skipping track",
1066 queue_track.queue_item_id,
1067 queue_track.name,
1068 queue.display_name,
1069 )
1070 continue
1071
1072 self.logger.debug(
1073 "Start Streaming queue track: %s (%s) for queue %s",
1074 queue_track.streamdetails.uri,
1075 queue_track.name,
1076 queue.display_name,
1077 )
1078 # append to play log so the queue controller can work out which track is playing
1079 play_log_entry = PlayLogEntry(queue_track.queue_item_id)
1080 queue.flow_mode_stream_log.append(play_log_entry)
1081 # calculate crossfade buffer size
1082 crossfade_buffer_duration = (
1083 SMART_CROSSFADE_DURATION
1084 if smart_fades_mode == SmartFadesMode.SMART_CROSSFADE
1085 else standard_crossfade_duration
1086 )
1087 crossfade_buffer_duration = min(
1088 crossfade_buffer_duration,
1089 int(queue_track.streamdetails.duration / 2)
1090 if queue_track.streamdetails.duration
1091 else crossfade_buffer_duration,
1092 )
1093 # Ensure crossfade buffer size is aligned to frame boundaries
1094 # Frame size = bytes_per_sample * channels
1095 bytes_per_sample = pcm_format.bit_depth // 8
1096 frame_size = bytes_per_sample * pcm_format.channels
1097 crossfade_buffer_size = int(pcm_format.pcm_sample_size * crossfade_buffer_duration)
1098 # Round down to nearest frame boundary
1099 crossfade_buffer_size = (crossfade_buffer_size // frame_size) * frame_size
1100
1101 bytes_written = 0
1102 buffer = b""
1103 # handle incoming audio chunks
1104 first_chunk_received = False
1105 # buffer size needs to be big enough to include the crossfade part
1106
1107 async for chunk in self.get_queue_item_stream(
1108 queue_track,
1109 pcm_format=pcm_format,
1110 seek_position=queue_track.streamdetails.seek_position,
1111 playback_speed=cast(
1112 "float", queue_track.extra_attributes.get("playback_speed", 1.0)
1113 ),
1114 raise_on_error=False,
1115 ):
1116 total_chunks_received += 1
1117 if not first_chunk_received:
1118 first_chunk_received = True
1119 # inform the queue that the track is now loaded in the buffer
1120 # so the next track can be preloaded
1121 self.mass.player_queues.track_loaded_in_buffer(
1122 queue.queue_id, queue_track.queue_item_id
1123 )
1124 if total_chunks_received < 10 and smart_fades_mode != SmartFadesMode.DISABLED:
1125 # we want a stream to start as quickly as possible
1126 # so for the first 10 chunks we keep a very short buffer
1127 req_buffer_size = pcm_format.pcm_sample_size
1128 else:
1129 req_buffer_size = (
1130 pcm_sample_size
1131 if smart_fades_mode == SmartFadesMode.DISABLED
1132 else crossfade_buffer_size
1133 )
1134
1135 # ALWAYS APPEND CHUNK TO BUFFER
1136 buffer += chunk
1137 del chunk
1138 if len(buffer) < req_buffer_size:
1139 # buffer is not full enough, move on
1140 # yield control to event loop with 10ms delay
1141 await asyncio.sleep(0.01)
1142 continue
1143
1144 #### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
1145 if last_fadeout_part and last_streamdetails:
1146 # perform crossfade
1147 fadein_part = buffer[:crossfade_buffer_size]
1148 remaining_bytes = buffer[crossfade_buffer_size:]
1149 # Use the mixer to handle all crossfade logic
1150 try:
1151 crossfade_part = await self._smart_fades_mixer.mix(
1152 fade_in_part=fadein_part,
1153 fade_out_part=last_fadeout_part,
1154 fade_in_streamdetails=queue_track.streamdetails,
1155 fade_out_streamdetails=last_streamdetails,
1156 pcm_format=pcm_format,
1157 standard_crossfade_duration=standard_crossfade_duration,
1158 mode=smart_fades_mode,
1159 )
1160 except Exception as mix_err:
1161 self.logger.warning(
1162 "Crossfade mixer failed for %s, falling back to simple concat: %s",
1163 queue_track.name,
1164 mix_err,
1165 )
1166 # Fallback: just output the fadeout part then the buffer
1167 for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
1168 yield _chunk
1169 bytes_written += len(_chunk)
1170 del _chunk
1171 crossfade_part = b""
1172 remaining_bytes = buffer
1173 if crossfade_part:
1174 # because the crossfade exists of both the fadein and fadeout part
1175 # we need to correct the bytes_written accordingly so the duration
1176 # calculations at the end of the track are correct
1177 crossfade_part_len = len(crossfade_part)
1178 bytes_written += int(crossfade_part_len / 2)
1179 if last_play_log_entry:
1180 assert last_play_log_entry.seconds_streamed is not None
1181 last_play_log_entry.seconds_streamed += (
1182 crossfade_part_len / 2 / pcm_sample_size
1183 )
1184 # yield crossfade_part (in pcm_sample_size chunks)
1185 for _chunk in divide_chunks(crossfade_part, pcm_sample_size):
1186 yield _chunk
1187 del _chunk
1188 del crossfade_part
1189 # also write the leftover bytes from the crossfade action
1190 if remaining_bytes:
1191 yield remaining_bytes
1192 bytes_written += len(remaining_bytes)
1193 del remaining_bytes
1194 # clear vars
1195 last_fadeout_part = b""
1196 last_streamdetails = None
1197 buffer = b""
1198
1199 #### OTHER: enough data in buffer, feed to output
1200 while len(buffer) > req_buffer_size:
1201 yield buffer[:pcm_sample_size]
1202 bytes_written += pcm_sample_size
1203 buffer = buffer[pcm_sample_size:]
1204
1205 #### HANDLE END OF TRACK
1206 if not first_chunk_received:
1207 # Track failed to stream - no chunks received at all
1208 self.logger.warning(
1209 "Track %s (%s) on queue %s produced no audio data - skipping",
1210 queue_track.name,
1211 queue_track.streamdetails.uri if queue_track.streamdetails else "unknown",
1212 queue.display_name,
1213 )
1214 # Clean up and continue to next track
1215 queue_track.streamdetails.stream_error = True
1216 del buffer
1217 continue
1218 if last_fadeout_part:
1219 # edge case: we did not get enough data to make the crossfade
1220 for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
1221 yield _chunk
1222 del _chunk
1223 bytes_written += len(last_fadeout_part)
1224 last_fadeout_part = b""
1225 crossfade_allowed = self._crossfade_allowed(
1226 queue_track, smart_fades_mode=smart_fades_mode, flow_mode=True
1227 )
1228 if crossfade_allowed:
1229 # if crossfade is enabled, save fadeout part to pickup for next track
1230 last_fadeout_part = buffer[-crossfade_buffer_size:]
1231 last_streamdetails = queue_track.streamdetails
1232 last_play_log_entry = play_log_entry
1233 remaining_bytes = buffer[:-crossfade_buffer_size]
1234 if remaining_bytes:
1235 yield remaining_bytes
1236 bytes_written += len(remaining_bytes)
1237 del remaining_bytes
1238 elif smart_fades_mode != SmartFadesMode.DISABLED:
1239 self.logger.debug(
1240 "Flow mode: crossfade NOT allowed for track %s on queue %s"
1241 " - fadeout part not saved",
1242 queue_track.name,
1243 queue.display_name,
1244 )
1245 if not crossfade_allowed and buffer:
1246 # no crossfade enabled, just yield the buffer last part
1247 bytes_written += len(buffer)
1248 for _chunk in divide_chunks(buffer, pcm_sample_size):
1249 yield _chunk
1250 del _chunk
1251 # make sure the buffer gets cleaned up
1252 del buffer
1253
1254 # update duration details based on the actual pcm data we sent
1255 # this also accounts for crossfade and silence stripping
1256 seconds_streamed = bytes_written / pcm_sample_size
1257 queue_track.streamdetails.seconds_streamed = seconds_streamed
1258 queue_track.streamdetails.duration = int(
1259 queue_track.streamdetails.seek_position + seconds_streamed
1260 )
1261 play_log_entry.seconds_streamed = seconds_streamed
1262 play_log_entry.duration = queue_track.streamdetails.duration
1263 total_bytes_sent += bytes_written
1264 self.logger.debug(
1265 "Finished Streaming queue track: %s (%s) on queue %s",
1266 queue_track.streamdetails.uri,
1267 queue_track.name,
1268 queue.display_name,
1269 )
1270 #### HANDLE END OF QUEUE FLOW STREAM
1271 # end of queue flow: make sure we yield the last_fadeout_part
1272 if last_fadeout_part:
1273 for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
1274 yield _chunk
1275 del _chunk
1276 # correct seconds streamed/duration
1277 last_part_seconds = len(last_fadeout_part) / pcm_sample_size
1278 streamdetails = queue_track.streamdetails
1279 assert streamdetails is not None
1280 streamdetails.seconds_streamed = (
1281 streamdetails.seconds_streamed or 0
1282 ) + last_part_seconds
1283 streamdetails.duration = int((streamdetails.duration or 0) + last_part_seconds)
1284 last_fadeout_part = b""
1285 total_bytes_sent += bytes_written
1286 self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name)
1287
1288 async def get_announcement_stream(
1289 self,
1290 announcement_url: str,
1291 output_format: AudioFormat,
1292 pre_announce: bool | str = False,
1293 pre_announce_url: str = ANNOUNCE_ALERT_FILE,
1294 ) -> AsyncGenerator[bytes, None]:
1295 """Get the special announcement stream."""
1296 announcement_data: asyncio.Queue[bytes | None] = asyncio.Queue(10)
1297 # we are doing announcement in PCM first to avoid multiple encodings
1298 # when mixing pre-announce and announcement
1299 # also we have to deal with some TTS sources being super slow in delivering audio
1300 # so we take an approach where we start fetching the announcement in the background
1301 # while we can already start playing the pre-announce sound (if any)
1302
1303 pcm_format = (
1304 output_format
1305 if output_format.content_type.is_pcm()
1306 else AudioFormat(
1307 sample_rate=output_format.sample_rate,
1308 content_type=ContentType.PCM_S16LE,
1309 bit_depth=16,
1310 channels=output_format.channels,
1311 )
1312 )
1313
1314 async def fetch_announcement() -> None:
1315 fmt = announcement_url.rsplit(".")[-1]
1316 async for chunk in get_ffmpeg_stream(
1317 audio_input=announcement_url,
1318 input_format=AudioFormat(content_type=ContentType.try_parse(fmt)),
1319 output_format=pcm_format,
1320 chunk_size=get_chunksize(pcm_format, 1),
1321 ):
1322 await announcement_data.put(chunk)
1323 await announcement_data.put(None) # signal end of stream
1324
1325 self.mass.create_task(fetch_announcement())
1326
1327 async def _announcement_stream() -> AsyncGenerator[bytes, None]:
1328 """Generate the PCM audio stream for the announcement + optional pre-announce."""
1329 if pre_announce:
1330 async for chunk in get_ffmpeg_stream(
1331 audio_input=pre_announce_url,
1332 input_format=AudioFormat(content_type=ContentType.try_parse(pre_announce_url)),
1333 output_format=pcm_format,
1334 chunk_size=get_chunksize(pcm_format, 1),
1335 ):
1336 yield chunk
1337 # pad silence while we're waiting for the announcement to be ready
1338 while announcement_data.empty():
1339 yield b"\0" * int(
1340 pcm_format.sample_rate * (pcm_format.bit_depth / 8) * pcm_format.channels * 0.1
1341 )
1342 await asyncio.sleep(0.1)
1343 # stream announcement
1344 while True:
1345 announcement_chunk = await announcement_data.get()
1346 if announcement_chunk is None:
1347 break
1348 yield announcement_chunk
1349
1350 if output_format == pcm_format:
1351 # no need to re-encode, just yield the raw PCM stream
1352 async for chunk in _announcement_stream():
1353 yield chunk
1354 return
1355
1356 # stream final announcement in requested output format
1357 async for chunk in get_ffmpeg_stream(
1358 audio_input=_announcement_stream(),
1359 input_format=pcm_format,
1360 output_format=output_format,
1361 ):
1362 yield chunk
1363
1364 async def get_plugin_source_stream(
1365 self,
1366 plugin_source_id: str,
1367 output_format: AudioFormat,
1368 player_id: str,
1369 player_filter_params: list[str] | None = None,
1370 ) -> AsyncGenerator[bytes, None]:
1371 """Get the special plugin source stream."""
1372 plugin_prov = cast("PluginProvider", self.mass.get_provider(plugin_source_id))
1373 if not plugin_prov:
1374 raise ProviderUnavailableError(f"Unknown PluginSource: {plugin_source_id}")
1375
1376 plugin_source = plugin_prov.get_source()
1377 self.logger.debug(
1378 "Start streaming PluginSource %s to %s using output format %s",
1379 plugin_source_id,
1380 player_id,
1381 output_format,
1382 )
1383 # this should already be set by the player controller, but just to be sure
1384 plugin_source.in_use_by = player_id
1385
1386 try:
1387 async for chunk in get_ffmpeg_stream(
1388 audio_input=cast(
1389 "str | AsyncGenerator[bytes, None]",
1390 plugin_prov.get_audio_stream(player_id)
1391 if plugin_source.stream_type == StreamType.CUSTOM
1392 else plugin_source.path,
1393 ),
1394 input_format=plugin_source.audio_format,
1395 output_format=output_format,
1396 filter_params=player_filter_params,
1397 extra_input_args=["-y", "-re"],
1398 ):
1399 if plugin_source.in_use_by != player_id:
1400 # another player took over or the stream ended, stop streaming
1401 break
1402 yield chunk
1403 finally:
1404 self.logger.debug(
1405 "Finished streaming PluginSource %s to %s", plugin_source_id, player_id
1406 )
1407 await asyncio.sleep(1) # prevent race conditions when selecting source
1408 if plugin_source.in_use_by == player_id:
1409 # release control
1410 plugin_source.in_use_by = None
1411
1412 async def get_queue_item_stream(
1413 self,
1414 queue_item: QueueItem,
1415 pcm_format: AudioFormat,
1416 seek_position: int = 0,
1417 playback_speed: float = 1.0,
1418 raise_on_error: bool = True,
1419 ) -> AsyncGenerator[bytes, None]:
1420 """Get the (PCM) audio stream for a single queue item."""
1421 # collect all arguments for ffmpeg
1422 streamdetails = queue_item.streamdetails
1423 assert streamdetails
1424 filter_params: list[str] = []
1425
1426 # handle volume normalization
1427 gain_correct: float | None = None
1428 if streamdetails.volume_normalization_mode == VolumeNormalizationMode.DYNAMIC:
1429 # volume normalization using loudnorm filter (in dynamic mode)
1430 # which also collects the measurement on the fly during playback
1431 # more info: https://k.ylo.ph/2016/04/04/loudnorm.html
1432 filter_rule = f"loudnorm=I={streamdetails.target_loudness}:TP=-2.0:LRA=10.0:offset=0.0"
1433 filter_rule += ":print_format=json"
1434 filter_params.append(filter_rule)
1435 elif streamdetails.volume_normalization_mode == VolumeNormalizationMode.FIXED_GAIN:
1436 # apply user defined fixed volume/gain correction
1437 config_key = (
1438 CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS
1439 if streamdetails.media_type == MediaType.TRACK
1440 else CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO
1441 )
1442 gain_value = await self.mass.config.get_core_config_value(
1443 self.domain, config_key, default=0.0, return_type=float
1444 )
1445 gain_correct = round(gain_value, 2)
1446 filter_params.append(f"volume={gain_correct}dB")
1447 elif streamdetails.volume_normalization_mode == VolumeNormalizationMode.MEASUREMENT_ONLY:
1448 # volume normalization with known loudness measurement
1449 # apply volume/gain correction
1450 target_loudness = (
1451 float(streamdetails.target_loudness)
1452 if streamdetails.target_loudness is not None
1453 else 0.0
1454 )
1455 if streamdetails.prefer_album_loudness and streamdetails.loudness_album is not None:
1456 gain_correct = target_loudness - float(streamdetails.loudness_album)
1457 elif streamdetails.loudness is not None:
1458 gain_correct = target_loudness - float(streamdetails.loudness)
1459 else:
1460 gain_correct = 0.0
1461 gain_correct = round(gain_correct, 2)
1462 filter_params.append(f"volume={gain_correct}dB")
1463 streamdetails.volume_normalization_gain_correct = gain_correct
1464
1465 # handle playback speed
1466 if playback_speed != 1.0:
1467 filter_params.append(f"atempo={playback_speed}")
1468
1469 allow_buffer = bool(
1470 self.mass.config.get_raw_core_config_value(
1471 self.domain, CONF_ALLOW_BUFFER, CONF_ALLOW_BUFFER_DEFAULT
1472 )
1473 and streamdetails.duration
1474 )
1475
1476 self.logger.debug(
1477 "Starting queue item stream for %s (%s)"
1478 " - using buffer: %s"
1479 " - using fade-in: %s"
1480 " - using volume normalization: %s",
1481 queue_item.name,
1482 streamdetails.uri,
1483 allow_buffer,
1484 streamdetails.fade_in,
1485 streamdetails.volume_normalization_mode,
1486 )
1487 if allow_buffer:
1488 media_stream_gen = get_buffered_media_stream(
1489 self.mass,
1490 streamdetails=streamdetails,
1491 pcm_format=pcm_format,
1492 seek_position=int(seek_position),
1493 filter_params=filter_params,
1494 )
1495 else:
1496 media_stream_gen = get_media_stream(
1497 self.mass,
1498 streamdetails=streamdetails,
1499 pcm_format=pcm_format,
1500 seek_position=int(seek_position),
1501 filter_params=filter_params,
1502 )
1503
1504 first_chunk_received = False
1505 fade_in_buffer = b""
1506 bytes_received = 0
1507 finished = False
1508 stream_started_at = asyncio.get_event_loop().time()
1509 try:
1510 async for chunk in media_stream_gen:
1511 bytes_received += len(chunk)
1512 if not first_chunk_received:
1513 first_chunk_received = True
1514 self.logger.debug(
1515 "First audio chunk received for %s (%s) after %.2f seconds",
1516 queue_item.name,
1517 streamdetails.uri,
1518 asyncio.get_event_loop().time() - stream_started_at,
1519 )
1520 # handle optional fade-in
1521 if streamdetails.fade_in:
1522 if len(fade_in_buffer) < pcm_format.pcm_sample_size * 4:
1523 fade_in_buffer += chunk
1524 elif fade_in_buffer:
1525 async for fade_chunk in get_ffmpeg_stream(
1526 # NOTE: get_ffmpeg_stream signature says str | AsyncGenerator
1527 # but FFMpeg class actually accepts bytes too. This works at
1528 # runtime but needs type: ignore for mypy.
1529 audio_input=fade_in_buffer + chunk, # type: ignore[arg-type]
1530 input_format=pcm_format,
1531 output_format=pcm_format,
1532 filter_params=["afade=type=in:start_time=0:duration=3"],
1533 ):
1534 yield fade_chunk
1535 fade_in_buffer = b""
1536 streamdetails.fade_in = False
1537 else:
1538 yield chunk
1539 # help garbage collection by explicitly deleting chunk
1540 del chunk
1541 finished = True
1542 except AudioError as err:
1543 streamdetails.stream_error = True
1544 queue_item.available = False
1545 if raise_on_error:
1546 raise
1547 # yes, we swallow the error here after logging it
1548 # so the outer stream can handle it gracefully
1549 self.logger.error(
1550 "AudioError while streaming queue item %s (%s): %s",
1551 queue_item.name,
1552 streamdetails.uri,
1553 err,
1554 )
1555 except asyncio.CancelledError:
1556 # Don't swallow cancellation - let it propagate
1557 raise
1558 except Exception as err:
1559 # Catch any other unexpected exceptions to prevent them from
1560 # silently killing the entire queue stream
1561 streamdetails.stream_error = True
1562 if raise_on_error:
1563 raise
1564 self.logger.exception(
1565 "Unexpected error while streaming queue item %s (%s): %s",
1566 queue_item.name,
1567 streamdetails.uri,
1568 err,
1569 )
1570 finally:
1571 # determine how many seconds we've streamed
1572 # for pcm output we can calculate this easily
1573 seconds_streamed = bytes_received / pcm_format.pcm_sample_size
1574 streamdetails.seconds_streamed = seconds_streamed
1575 self.logger.debug(
1576 "stream %s for %s in %.2f seconds - seconds streamed/buffered: %.2f",
1577 "aborted" if not finished else "finished",
1578 streamdetails.uri,
1579 asyncio.get_event_loop().time() - stream_started_at,
1580 seconds_streamed,
1581 )
1582 # report stream to provider
1583 if (finished or seconds_streamed >= 90) and (
1584 music_prov := self.mass.get_provider(streamdetails.provider)
1585 ):
1586 if TYPE_CHECKING: # avoid circular import
1587 assert isinstance(music_prov, MusicProvider)
1588 self.mass.create_task(music_prov.on_streamed(streamdetails))
1589
1590 @use_buffer(buffer_size=30, min_buffer_before_yield=2)
1591 async def get_queue_item_stream_with_smartfade(
1592 self,
1593 queue_item: QueueItem,
1594 pcm_format: AudioFormat,
1595 smart_fades_mode: SmartFadesMode = SmartFadesMode.SMART_CROSSFADE,
1596 standard_crossfade_duration: int = 10,
1597 ) -> AsyncGenerator[bytes, None]:
1598 """Get the audio stream for a single queue item with (smart) crossfade to the next item."""
1599 queue = self.mass.player_queues.get(queue_item.queue_id)
1600 if not queue:
1601 raise RuntimeError(f"Queue {queue_item.queue_id} not found")
1602
1603 streamdetails = queue_item.streamdetails
1604 assert streamdetails
1605 crossfade_data = self._crossfade_data.pop(queue.queue_id, None)
1606 self.logger.debug(
1607 "Crossfade data pop for queue %s (track: %s): %s",
1608 queue.display_name,
1609 queue_item.name,
1610 "found" if crossfade_data else "EMPTY - no crossfade data from previous track",
1611 )
1612
1613 if crossfade_data and streamdetails.seek_position > 0:
1614 # don't do crossfade when seeking into track
1615 crossfade_data = None
1616 if crossfade_data and (crossfade_data.queue_item_id != queue_item.queue_item_id):
1617 # edge case alert: the next item changed just while we were preloading/crossfading
1618 self.logger.warning(
1619 "Skipping crossfade data for queue %s - next item changed!", queue.display_name
1620 )
1621 crossfade_data = None
1622
1623 self.logger.debug(
1624 "Start Streaming queue track: %s (%s) for queue %s "
1625 "- crossfade mode: %s "
1626 "- crossfading from previous track: %s ",
1627 queue_item.streamdetails.uri if queue_item.streamdetails else "Unknown URI",
1628 queue_item.name,
1629 queue.display_name,
1630 smart_fades_mode,
1631 "true" if crossfade_data else "false",
1632 )
1633
1634 buffer = b""
1635 bytes_written = 0
1636 # calculate crossfade buffer size
1637 crossfade_buffer_duration = (
1638 SMART_CROSSFADE_DURATION
1639 if smart_fades_mode == SmartFadesMode.SMART_CROSSFADE
1640 else standard_crossfade_duration
1641 )
1642 crossfade_buffer_duration = min(
1643 crossfade_buffer_duration,
1644 int(streamdetails.duration / 2)
1645 if streamdetails.duration
1646 else crossfade_buffer_duration,
1647 )
1648 # Ensure crossfade buffer size is aligned to frame boundaries
1649 # Frame size = bytes_per_sample * channels
1650 bytes_per_sample = pcm_format.bit_depth // 8
1651 frame_size = bytes_per_sample * pcm_format.channels
1652 crossfade_buffer_size = int(pcm_format.pcm_sample_size * crossfade_buffer_duration)
1653 # Round down to nearest frame boundary
1654 crossfade_buffer_size = (crossfade_buffer_size // frame_size) * frame_size
1655 fade_out_data: bytes | None = None
1656
1657 if crossfade_data:
1658 # Calculate discard amount in seconds (format-independent)
1659 # Use fade_in_pcm_format because fade_in_size is in the next track's original format
1660 fade_in_duration_seconds = (
1661 crossfade_data.fade_in_size / crossfade_data.fade_in_pcm_format.pcm_sample_size
1662 )
1663 discard_seconds = int(fade_in_duration_seconds) - 1
1664 # Calculate discard amounts in CURRENT track's format
1665 discard_bytes = int(discard_seconds * pcm_format.pcm_sample_size)
1666 # Convert fade_in_size to current track's format for correct leftover calculation
1667 fade_in_size_in_current_format = int(
1668 fade_in_duration_seconds * pcm_format.pcm_sample_size
1669 )
1670 discard_leftover = fade_in_size_in_current_format - discard_bytes
1671 else:
1672 discard_seconds = streamdetails.seek_position
1673 discard_leftover = 0
1674 total_chunks_received = 0
1675 req_buffer_size = crossfade_buffer_size
1676 async for chunk in self.get_queue_item_stream(
1677 queue_item,
1678 pcm_format,
1679 seek_position=discard_seconds,
1680 playback_speed=cast("float", queue_item.extra_attributes.get("playback_speed", 1.0)),
1681 ):
1682 total_chunks_received += 1
1683 if discard_leftover:
1684 # discard leftover bytes from crossfade data
1685 chunk = chunk[discard_leftover:] # noqa: PLW2901
1686 discard_leftover = 0
1687
1688 if total_chunks_received < 10:
1689 # we want a stream to start as quickly as possible
1690 # so for the first 10 chunks we keep a very short buffer
1691 req_buffer_size = pcm_format.pcm_sample_size
1692 else:
1693 req_buffer_size = crossfade_buffer_size
1694
1695 # ALWAYS APPEND CHUNK TO BUFFER
1696 buffer += chunk
1697 del chunk
1698 if len(buffer) < req_buffer_size:
1699 # buffer is not full enough, move on
1700 continue
1701
1702 #### HANDLE CROSSFADE DATA FROM PREVIOUS TRACK
1703 if crossfade_data:
1704 # send the (second half of the) crossfade data
1705 if crossfade_data.pcm_format != pcm_format:
1706 # edge case: pcm format mismatch, we need to resample
1707 self.logger.debug(
1708 "Resampling crossfade data from %s to %s for queue %s",
1709 crossfade_data.pcm_format.sample_rate,
1710 pcm_format.sample_rate,
1711 queue.display_name,
1712 )
1713 resampled_data = await resample_pcm_audio(
1714 crossfade_data.data,
1715 crossfade_data.pcm_format,
1716 pcm_format,
1717 )
1718 if resampled_data:
1719 for _chunk in divide_chunks(resampled_data, pcm_format.pcm_sample_size):
1720 yield _chunk
1721 bytes_written += len(resampled_data)
1722 else:
1723 # Resampling failed, error already logged in resample_pcm_audio
1724 # Skip crossfade data entirely - stream continues without it
1725 self.logger.warning(
1726 "Skipping crossfade data for queue %s due to resampling failure",
1727 queue.display_name,
1728 )
1729 else:
1730 for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size):
1731 yield _chunk
1732 bytes_written += len(crossfade_data.data)
1733 # clear vars
1734 crossfade_data = None
1735
1736 #### OTHER: enough data in buffer, feed to output
1737 while len(buffer) > req_buffer_size:
1738 yield buffer[: pcm_format.pcm_sample_size]
1739 bytes_written += pcm_format.pcm_sample_size
1740 buffer = buffer[pcm_format.pcm_sample_size :]
1741
1742 #### HANDLE END OF TRACK
1743
1744 if crossfade_data:
1745 # edge case: we did not get enough data to send the crossfade data
1746 # send the (second half of the) crossfade data
1747 if crossfade_data.pcm_format != pcm_format:
1748 # (yet another) edge case: pcm format mismatch, we need to resample
1749 self.logger.debug(
1750 "Resampling remaining crossfade data from %s to %s for queue %s",
1751 crossfade_data.pcm_format.sample_rate,
1752 pcm_format.sample_rate,
1753 queue.display_name,
1754 )
1755 resampled_crossfade_data = await resample_pcm_audio(
1756 crossfade_data.data,
1757 crossfade_data.pcm_format,
1758 pcm_format,
1759 )
1760 if resampled_crossfade_data:
1761 crossfade_data.data = resampled_crossfade_data
1762 else:
1763 # Resampling failed, error already logged in resample_pcm_audio
1764 # Skip the crossfade data entirely
1765 self.logger.warning(
1766 "Skipping remaining crossfade data for queue %s due to resampling failure",
1767 queue.display_name,
1768 )
1769 crossfade_data = None
1770 if crossfade_data:
1771 for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size):
1772 yield _chunk
1773 bytes_written += len(crossfade_data.data)
1774 crossfade_data = None
1775
1776 # get next track for crossfade
1777 next_queue_item: QueueItem | None
1778 try:
1779 self.logger.debug(
1780 "Preloading NEXT track for crossfade for queue %s",
1781 queue.display_name,
1782 )
1783 next_queue_item = await self.mass.player_queues.load_next_queue_item(
1784 queue.queue_id, queue_item.queue_item_id
1785 )
1786 # set index_in_buffer to prevent our next track is overwritten while preloading
1787 if next_queue_item.streamdetails is None:
1788 raise InvalidDataError(
1789 f"No streamdetails for next queue item {next_queue_item.queue_item_id}"
1790 )
1791 queue.index_in_buffer = self.mass.player_queues.index_by_id(
1792 queue.queue_id, next_queue_item.queue_item_id
1793 )
1794 queue_player = self.mass.players.get_player(queue.queue_id)
1795 assert queue_player is not None
1796 next_queue_item_pcm_format = await self._select_pcm_format(
1797 player=queue_player,
1798 streamdetails=next_queue_item.streamdetails,
1799 smartfades_enabled=True,
1800 )
1801 except QueueEmpty:
1802 # end of queue reached, no next item
1803 next_queue_item = None
1804
1805 crossfade_allowed = bool(next_queue_item) and self._crossfade_allowed(
1806 queue_item,
1807 smart_fades_mode=smart_fades_mode,
1808 flow_mode=False,
1809 next_queue_item=next_queue_item,
1810 sample_rate=pcm_format.sample_rate,
1811 next_sample_rate=next_queue_item_pcm_format.sample_rate,
1812 )
1813 if not crossfade_allowed:
1814 if not next_queue_item:
1815 self.logger.debug(
1816 "Enqueue mode: no next queue item loaded (QueueEmpty) for queue %s",
1817 queue.display_name,
1818 )
1819 else:
1820 self.logger.debug(
1821 "Enqueue mode: crossfade NOT allowed for track %s -> %s on queue %s",
1822 queue_item.name,
1823 next_queue_item.name,
1824 queue.display_name,
1825 )
1826 # no crossfade enabled/allowed, just yield the buffer last part
1827 bytes_written += len(buffer)
1828 for _chunk in divide_chunks(buffer, pcm_format.pcm_sample_size):
1829 yield _chunk
1830 else:
1831 # if crossfade is enabled, save fadeout part in buffer to pickup for next track
1832 assert next_queue_item is not None
1833 fade_out_data = buffer
1834 buffer = b""
1835 try:
1836 async for chunk in self.get_queue_item_stream(
1837 next_queue_item,
1838 next_queue_item_pcm_format,
1839 playback_speed=cast(
1840 "float", queue_item.extra_attributes.get("playback_speed", 1.0)
1841 ),
1842 ):
1843 # append to buffer until we reach crossfade size
1844 # we only need the first X seconds of the NEXT track so we can
1845 # perform the crossfade.
1846 # the crossfaded audio of the previous and next track will be
1847 # sent in two equal parts: first half now, second half
1848 # when the next track starts. We use CrossfadeData to store
1849 # the second half to be picked up by the next track's stream generator.
1850 # Note that we more or less expect the user to have enabled the in-memory
1851 # buffer so we can keep the next track's audio data in memory.
1852 buffer += chunk
1853 del chunk
1854 if len(buffer) >= crossfade_buffer_size:
1855 break
1856 #### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
1857 # Store original buffer size before any resampling for fade_in_size calculation
1858 # This size is in the next track's original format which is what we need
1859 original_buffer_size = len(buffer)
1860 if next_queue_item_pcm_format != pcm_format:
1861 # edge case: pcm format mismatch, we need to resample the next track's
1862 # beginning part before crossfading
1863 self.logger.debug(
1864 "Resampling next track's crossfade from %s to %s for queue %s",
1865 next_queue_item_pcm_format.sample_rate,
1866 pcm_format.sample_rate,
1867 queue.display_name,
1868 )
1869 buffer = await resample_pcm_audio(
1870 buffer,
1871 next_queue_item_pcm_format,
1872 pcm_format,
1873 )
1874 # perform actual (smart fades) crossfade using mixer
1875 crossfade_bytes = await self._smart_fades_mixer.mix(
1876 fade_in_part=buffer,
1877 fade_out_part=fade_out_data,
1878 fade_in_streamdetails=cast("StreamDetails", next_queue_item.streamdetails),
1879 fade_out_streamdetails=streamdetails,
1880 pcm_format=pcm_format,
1881 standard_crossfade_duration=standard_crossfade_duration,
1882 mode=smart_fades_mode,
1883 )
1884 # send half of the crossfade_part (= approx the fadeout part)
1885 split_point = (len(crossfade_bytes) + 1) // 2
1886 crossfade_first = crossfade_bytes[:split_point]
1887 crossfade_second = crossfade_bytes[split_point:]
1888 del crossfade_bytes
1889 bytes_written += len(crossfade_first)
1890 for _chunk in divide_chunks(crossfade_first, pcm_format.pcm_sample_size):
1891 yield _chunk
1892 # store the other half for the next track
1893 # IMPORTANT: crossfade_second data is in CURRENT track's format (pcm_format)
1894 # because it was created from the resampled buffer used for mixing.
1895 # BUT fade_in_size represents bytes in NEXT track's original format
1896 # (next_queue_item_pcm_format) because that's how much of the next track
1897 # was consumed during the crossfade. We need both formats to correctly
1898 # handle the crossfade data when the next track starts.
1899 self._crossfade_data[queue_item.queue_id] = CrossfadeData(
1900 data=crossfade_second,
1901 fade_in_size=original_buffer_size,
1902 pcm_format=pcm_format, # Format of the data (current track)
1903 fade_in_pcm_format=next_queue_item_pcm_format, # Format for fade_in_size
1904 queue_item_id=next_queue_item.queue_item_id,
1905 )
1906 self.logger.debug(
1907 "Crossfade data STORED for queue %s (outgoing track: %s, incoming track: %s)",
1908 queue.display_name,
1909 queue_item.name,
1910 next_queue_item.name if next_queue_item else "N/A",
1911 )
1912 except AudioError:
1913 # no crossfade possible, just yield the fade_out_data
1914 next_queue_item = None
1915 yield fade_out_data
1916 bytes_written += len(fade_out_data)
1917 del fade_out_data
1918 # make sure the buffer gets cleaned up
1919 del buffer
1920 # update duration details based on the actual pcm data we sent
1921 # this also accounts for crossfade and silence stripping
1922 seconds_streamed = bytes_written / pcm_format.pcm_sample_size
1923 streamdetails.seconds_streamed = seconds_streamed
1924 streamdetails.duration = int(streamdetails.seek_position + seconds_streamed)
1925 self.logger.debug(
1926 "Finished Streaming queue track: %s (%s) on queue %s "
1927 "- crossfade data prepared for next track: %s",
1928 streamdetails.uri,
1929 queue_item.name,
1930 queue.display_name,
1931 next_queue_item.name if next_queue_item else "N/A",
1932 )
1933
1934 def _log_request(self, request: web.Request) -> None:
1935 """Log request."""
1936 if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
1937 self.logger.log(
1938 VERBOSE_LOG_LEVEL,
1939 "Got %s request to %s from %s\nheaders: %s\n",
1940 request.method,
1941 request.path,
1942 request.remote,
1943 request.headers,
1944 )
1945 else:
1946 self.logger.debug(
1947 "Got %s request to %s from %s",
1948 request.method,
1949 request.path,
1950 request.remote,
1951 )
1952
1953 async def get_output_format(
1954 self,
1955 output_format_str: str,
1956 player: Player,
1957 content_sample_rate: int,
1958 content_bit_depth: int,
1959 ) -> AudioFormat:
1960 """Parse (player specific) output format details for given format string."""
1961 content_type: ContentType = ContentType.try_parse(output_format_str)
1962 supported_rates_conf = cast(
1963 "list[tuple[str, str]]",
1964 await self.mass.config.get_player_config_value(
1965 player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
1966 ),
1967 )
1968 output_channels_str = self.mass.config.get_raw_player_config_value(
1969 player.player_id, CONF_OUTPUT_CHANNELS, "stereo"
1970 )
1971 supported_sample_rates = tuple(int(x[0]) for x in supported_rates_conf)
1972 supported_bit_depths = tuple(int(x[1]) for x in supported_rates_conf)
1973
1974 player_max_bit_depth = max(supported_bit_depths)
1975 output_bit_depth = min(content_bit_depth, player_max_bit_depth)
1976 if content_sample_rate in supported_sample_rates:
1977 output_sample_rate = content_sample_rate
1978 else:
1979 output_sample_rate = max(supported_sample_rates)
1980
1981 if not content_type.is_lossless():
1982 # no point in having a higher bit depth for lossy formats
1983 output_bit_depth = 16
1984 output_sample_rate = min(48000, output_sample_rate)
1985 if output_format_str == "pcm":
1986 content_type = ContentType.from_bit_depth(output_bit_depth)
1987 return AudioFormat(
1988 content_type=content_type,
1989 sample_rate=output_sample_rate,
1990 bit_depth=output_bit_depth,
1991 channels=1 if output_channels_str != "stereo" else 2,
1992 )
1993
1994 async def _select_flow_format(
1995 self,
1996 player: Player,
1997 ) -> AudioFormat:
1998 """Parse (player specific) flow stream PCM format."""
1999 supported_rates_conf = cast(
2000 "list[tuple[str, str]]",
2001 await self.mass.config.get_player_config_value(
2002 player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
2003 ),
2004 )
2005 supported_sample_rates = tuple(int(x[0]) for x in supported_rates_conf)
2006 output_sample_rate = INTERNAL_PCM_FORMAT.sample_rate
2007 for sample_rate in (192000, 96000, 48000, 44100):
2008 if sample_rate in supported_sample_rates:
2009 output_sample_rate = sample_rate
2010 break
2011 return AudioFormat(
2012 content_type=INTERNAL_PCM_FORMAT.content_type,
2013 sample_rate=output_sample_rate,
2014 bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
2015 channels=2,
2016 )
2017
2018 async def _select_pcm_format(
2019 self,
2020 player: Player,
2021 streamdetails: StreamDetails,
2022 smartfades_enabled: bool,
2023 ) -> AudioFormat:
2024 """Parse (player specific) stream internal PCM format."""
2025 supported_rates_conf = cast(
2026 "list[tuple[str, str]]",
2027 await self.mass.config.get_player_config_value(
2028 player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
2029 ),
2030 )
2031 supported_sample_rates = tuple(int(x[0]) for x in supported_rates_conf)
2032 # use highest supported rate within content rate
2033 output_sample_rate = max(
2034 (r for r in supported_sample_rates if r <= streamdetails.audio_format.sample_rate),
2035 default=48000, # sane/safe default
2036 )
2037 # work out pcm format based on streamdetails
2038 pcm_format = AudioFormat(
2039 sample_rate=output_sample_rate,
2040 # always use f32 internally for extra headroom for filters etc
2041 content_type=INTERNAL_PCM_FORMAT.content_type,
2042 bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
2043 channels=streamdetails.audio_format.channels,
2044 )
2045 if smartfades_enabled:
2046 pcm_format.channels = 2 # force stereo for crossfading
2047
2048 return pcm_format
2049
2050 def _crossfade_allowed(
2051 self,
2052 queue_item: QueueItem,
2053 smart_fades_mode: SmartFadesMode,
2054 flow_mode: bool = False,
2055 next_queue_item: QueueItem | None = None,
2056 sample_rate: int | None = None,
2057 next_sample_rate: int | None = None,
2058 ) -> bool:
2059 """Get the crossfade config for a queue item."""
2060 if smart_fades_mode == SmartFadesMode.DISABLED:
2061 return False
2062 if not (self.mass.players.get_player(queue_item.queue_id)):
2063 return False # just a guard
2064 if queue_item.media_type != MediaType.TRACK:
2065 self.logger.debug("Skipping crossfade: current item is not a track")
2066 return False
2067 # check if the next item is part of the same album
2068 next_item = next_queue_item or self.mass.player_queues.get_next_item(
2069 queue_item.queue_id, queue_item.queue_item_id
2070 )
2071 if not next_item:
2072 # there is no next item!
2073 self.logger.debug(
2074 "Crossfade not allowed: no next item found for %s (flow_mode=%s, queue_id=%s)",
2075 queue_item.name,
2076 flow_mode,
2077 queue_item.queue_id,
2078 )
2079 return False
2080 # check if next item is a track
2081 if next_item.media_type != MediaType.TRACK:
2082 self.logger.debug("Skipping crossfade: next item is not a track")
2083 return False
2084 if (
2085 isinstance(queue_item.media_item, Track)
2086 and isinstance(next_item.media_item, Track)
2087 and queue_item.media_item.album
2088 and next_item.media_item.album
2089 and queue_item.media_item.album == next_item.media_item.album
2090 and not self.mass.config.get_raw_core_config_value(
2091 self.domain, CONF_ALLOW_CROSSFADE_SAME_ALBUM, False
2092 )
2093 ):
2094 # in general, crossfade is not desired for tracks of the same (gapless) album
2095 # because we have no accurate way to determine if the album is gapless or not,
2096 # for now we just never crossfade between tracks of the same album
2097 self.logger.debug("Skipping crossfade: next item is part of the same album")
2098 return False
2099
2100 # check if we're allowed to crossfade on different sample rates
2101 if (
2102 not flow_mode
2103 and sample_rate
2104 and next_sample_rate
2105 and sample_rate != next_sample_rate
2106 and not self.mass.config.get_raw_player_config_value(
2107 queue_item.queue_id,
2108 CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES.key,
2109 CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES.default_value,
2110 )
2111 ):
2112 self.logger.debug(
2113 "Skipping crossfade: player does not support gapless playback "
2114 "with different sample rates (%s vs %s)",
2115 sample_rate,
2116 next_sample_rate,
2117 )
2118 return False
2119
2120 return True
2121
2122 async def _periodic_garbage_collection(self) -> None:
2123 """Periodic garbage collection to free up memory from audio buffers and streams."""
2124 self.logger.log(
2125 VERBOSE_LOG_LEVEL,
2126 "Running periodic garbage collection...",
2127 )
2128 # Run garbage collection in executor to avoid blocking the event loop
2129 # Since this runs periodically (not in response to subprocess cleanup),
2130 # it's safe to run in a thread without causing thread-safety issues
2131 loop = asyncio.get_running_loop()
2132 collected = await loop.run_in_executor(None, gc.collect)
2133 self.logger.log(
2134 VERBOSE_LOG_LEVEL,
2135 "Garbage collection completed, collected %d objects",
2136 collected,
2137 )
2138 # Schedule next run in 15 minutes
2139 self.mass.call_later(900, self._periodic_garbage_collection)
2140
2141 def _setup_smart_fades_logger(self, config: CoreConfig) -> None:
2142 """Set up smart fades logger level."""
2143 log_level = str(config.get_value(CONF_SMART_FADES_LOG_LEVEL))
2144 if log_level == "GLOBAL":
2145 self.smart_fades_analyzer.logger.setLevel(self.logger.level)
2146 self.smart_fades_mixer.logger.setLevel(self.logger.level)
2147 else:
2148 self.smart_fades_analyzer.logger.setLevel(log_level)
2149 self.smart_fades_mixer.logger.setLevel(log_level)
2150
2151 async def cleanup_stale_queue_buffers(self, queue_id: str, current_index: int) -> None:
2152 """
2153 Clean up audio buffers for queue items that are no longer needed.
2154
2155 This clears buffers for items at index <= current_index - 2, keeping only:
2156 - The previous track (current_index - 1)
2157 - The current track (current_index)
2158 - The next track (current_index + 1, handled by preloading)
2159
2160 :param queue_id: The queue ID to clean up buffers for.
2161 :param current_index: The current playing index in the queue.
2162 """
2163 if current_index < 2:
2164 return # Nothing to clean up yet
2165
2166 queue_items = self.mass.player_queues._queue_items.get(queue_id, [])
2167 cleanup_threshold = current_index - 2
2168 buffers_cleared = 0
2169
2170 for idx, item in enumerate(queue_items):
2171 if idx > cleanup_threshold:
2172 break # No need to check further
2173 if item.streamdetails and item.streamdetails.buffer:
2174 self.logger.log(
2175 VERBOSE_LOG_LEVEL,
2176 "Clearing stale audio buffer for queue item %s (index %d) in queue %s",
2177 item.name,
2178 idx,
2179 queue_id,
2180 )
2181 await item.streamdetails.buffer.clear()
2182 item.streamdetails.buffer = None
2183 buffers_cleared += 1
2184
2185 if buffers_cleared > 0:
2186 self.logger.debug(
2187 "Cleared %d stale audio buffer(s) for queue %s (items before index %d)",
2188 buffers_cleared,
2189 queue_id,
2190 cleanup_threshold + 1,
2191 )
2192
2193 async def cleanup_queue_audio_data(self, queue_id: str) -> None:
2194 """
2195 Clean up all audio-related data for a queue when it is stopped or cleared.
2196
2197 This clears:
2198 - All audio buffers attached to queue item streamdetails
2199 - Any pending crossfade data for the queue
2200
2201 :param queue_id: The queue ID to clean up.
2202 """
2203 # Clear crossfade data for this queue
2204 if queue_id in self._crossfade_data:
2205 self.logger.debug("Clearing crossfade data for queue %s", queue_id)
2206 del self._crossfade_data[queue_id]
2207
2208 # Clear all audio buffers for queue items
2209 queue_items = self.mass.player_queues._queue_items.get(queue_id, [])
2210 buffers_cleared = 0
2211
2212 for item in queue_items:
2213 if item.streamdetails and item.streamdetails.buffer:
2214 await item.streamdetails.buffer.clear()
2215 item.streamdetails.buffer = None
2216 buffers_cleared += 1
2217
2218 if buffers_cleared > 0:
2219 self.logger.debug(
2220 "Cleared %d audio buffer(s) for stopped/cleared queue %s",
2221 buffers_cleared,
2222 queue_id,
2223 )
2224