/
/
/
1"""
2Controller to stream audio to players.
3
4The streams controller hosts a basic, unprotected HTTP-only webserver
5purely to stream audio packets to players and some control endpoints such as
6the upnp callbacks and json rpc api for slimproto clients.
7"""
8
9from __future__ import annotations
10
11import asyncio
12import gc
13import logging
14import os
15import urllib.parse
16from collections.abc import AsyncGenerator
17from dataclasses import dataclass
18from typing import TYPE_CHECKING, Final, cast
19
20from aiofiles.os import wrap
21from aiohttp import web
22from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
23from music_assistant_models.enums import (
24 ConfigEntryType,
25 ContentType,
26 MediaType,
27 PlayerFeature,
28 StreamType,
29 VolumeNormalizationMode,
30)
31from music_assistant_models.errors import (
32 AudioError,
33 InvalidDataError,
34 ProviderUnavailableError,
35 QueueEmpty,
36)
37from music_assistant_models.media_items import AudioFormat, Track
38from music_assistant_models.player_queue import PlayLogEntry
39
40from music_assistant.constants import (
41 ANNOUNCE_ALERT_FILE,
42 CONF_BIND_IP,
43 CONF_BIND_PORT,
44 CONF_CROSSFADE_DURATION,
45 CONF_ENTRY_ENABLE_ICY_METADATA,
46 CONF_ENTRY_LOG_LEVEL,
47 CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES,
48 CONF_HTTP_PROFILE,
49 CONF_OUTPUT_CHANNELS,
50 CONF_OUTPUT_CODEC,
51 CONF_PUBLISH_IP,
52 CONF_SAMPLE_RATES,
53 CONF_SMART_FADES_MODE,
54 CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO,
55 CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
56 CONF_VOLUME_NORMALIZATION_RADIO,
57 CONF_VOLUME_NORMALIZATION_TRACKS,
58 DEFAULT_STREAM_HEADERS,
59 ICY_HEADERS,
60 INTERNAL_PCM_FORMAT,
61 SILENCE_FILE,
62 VERBOSE_LOG_LEVEL,
63)
64from music_assistant.controllers.players.helpers import AnnounceData
65from music_assistant.controllers.streams.smart_fades import SmartFadesMixer
66from music_assistant.controllers.streams.smart_fades.analyzer import SmartFadesAnalyzer
67from music_assistant.controllers.streams.smart_fades.fades import SMART_CROSSFADE_DURATION
68from music_assistant.helpers.audio import LOGGER as AUDIO_LOGGER
69from music_assistant.helpers.audio import (
70 get_buffered_media_stream,
71 get_chunksize,
72 get_media_stream,
73 get_player_filter_params,
74 get_stream_details,
75 resample_pcm_audio,
76)
77from music_assistant.helpers.buffered_generator import buffered, use_buffer
78from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
79from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream
80from music_assistant.helpers.util import (
81 divide_chunks,
82 format_ip_for_url,
83 get_ip_addresses,
84 get_total_system_memory,
85)
86from music_assistant.helpers.webserver import Webserver
87from music_assistant.models.core_controller import CoreController
88from music_assistant.models.music_provider import MusicProvider
89from music_assistant.models.plugin import PluginProvider, PluginSource
90from music_assistant.models.smart_fades import SmartFadesMode
91from music_assistant.providers.universal_group.constants import UGP_PREFIX
92from music_assistant.providers.universal_group.player import UniversalGroupPlayer
93
94if TYPE_CHECKING:
95 from music_assistant_models.config_entries import CoreConfig
96 from music_assistant_models.player import PlayerMedia
97 from music_assistant_models.player_queue import PlayerQueue
98 from music_assistant_models.queue_item import QueueItem
99 from music_assistant_models.streamdetails import StreamDetails
100
101 from music_assistant.mass import MusicAssistant
102 from music_assistant.models.player import Player
103
104
105isfile = wrap(os.path.isfile)
106
107CONF_ALLOW_BUFFER: Final[str] = "allow_buffering"
108CONF_ALLOW_CROSSFADE_SAME_ALBUM: Final[str] = "allow_crossfade_same_album"
109CONF_SMART_FADES_LOG_LEVEL: Final[str] = "smart_fades_log_level"
110
111# Calculate total system memory once at module load time
112TOTAL_SYSTEM_MEMORY_GB: Final[float] = get_total_system_memory()
113CONF_ALLOW_BUFFER_DEFAULT = TOTAL_SYSTEM_MEMORY_GB >= 8.0
114DEFAULT_PORT: Final[int] = 8097
115
116
117def parse_pcm_info(content_type: str) -> tuple[int, int, int]:
118 """Parse PCM info from a codec/content_type string."""
119 params = (
120 dict(urllib.parse.parse_qsl(content_type.replace(";", "&"))) if ";" in content_type else {}
121 )
122 sample_rate = int(params.get("rate", 44100))
123 sample_size = int(params.get("bitrate", 16))
124 channels = int(params.get("channels", 2))
125 return (sample_rate, sample_size, channels)
126
127
128@dataclass
129class CrossfadeData:
130 """Data class to hold crossfade data."""
131
132 data: bytes
133 fade_in_size: int
134 pcm_format: AudioFormat # Format of the 'data' bytes (current/previous track's format)
135 fade_in_pcm_format: AudioFormat # Format for 'fade_in_size' (next track's format)
136 queue_item_id: str
137
138
139class StreamsController(CoreController):
140 """Webserver Controller to stream audio to players."""
141
142 domain: str = "streams"
143
144 def __init__(self, mass: MusicAssistant) -> None:
145 """Initialize instance."""
146 super().__init__(mass)
147 self._server = Webserver(self.logger, enable_dynamic_routes=True)
148 self.register_dynamic_route = self._server.register_dynamic_route
149 self.unregister_dynamic_route = self._server.unregister_dynamic_route
150 self.manifest.name = "Streamserver"
151 self.manifest.description = (
152 "Music Assistant's core controller that is responsible for "
153 "streaming audio to players on the local network."
154 )
155 self.manifest.icon = "cast-audio"
156 self.announcements: dict[str, AnnounceData] = {}
157 self._crossfade_data: dict[str, CrossfadeData] = {}
158 self._bind_ip: str = "0.0.0.0"
159 self._smart_fades_mixer = SmartFadesMixer(self)
160 self._smart_fades_analyzer = SmartFadesAnalyzer(self)
161
162 @property
163 def base_url(self) -> str:
164 """Return the base_url for the streamserver."""
165 return self._server.base_url
166
167 @property
168 def bind_ip(self) -> str:
169 """Return the IP address this streamserver is bound to."""
170 return self._bind_ip
171
172 @property
173 def smart_fades_mixer(self) -> SmartFadesMixer:
174 """Return the SmartFadesMixer instance."""
175 return self._smart_fades_mixer
176
177 @property
178 def smart_fades_analyzer(self) -> SmartFadesAnalyzer:
179 """Return the SmartFadesAnalyzer instance."""
180 return self._smart_fades_analyzer
181
182 async def get_config_entries(
183 self,
184 action: str | None = None,
185 values: dict[str, ConfigValueType] | None = None,
186 ) -> tuple[ConfigEntry, ...]:
187 """Return all Config Entries for this core module (if any)."""
188 ip_addresses = await get_ip_addresses(include_ipv6=True)
189 return (
190 ConfigEntry(
191 key=CONF_ALLOW_BUFFER,
192 type=ConfigEntryType.BOOLEAN,
193 default_value=CONF_ALLOW_BUFFER_DEFAULT,
194 label="Allow (in-memory) buffering of (track) audio",
195 description="By default, Music Assistant tries to be as resource "
196 "efficient as possible when streaming audio, especially considering "
197 "low-end devices such as Raspberry Pi's. This means that audio "
198 "buffering is disabled by default to reduce memory usage. \n\n"
199 "Enabling this option allows for in-memory buffering of audio, "
200 "which (massively) improves playback (and seeking) performance but it comes "
201 "at the cost of increased memory usage. "
202 "If you run Music Assistant on a capable device with enough memory, "
203 "enabling this option is strongly recommended.",
204 required=False,
205 category="playback",
206 ),
207 ConfigEntry(
208 key=CONF_VOLUME_NORMALIZATION_RADIO,
209 type=ConfigEntryType.STRING,
210 default_value=VolumeNormalizationMode.FALLBACK_FIXED_GAIN,
211 label="Volume normalization method for radio streams",
212 options=[
213 ConfigValueOption(x.value.replace("_", " ").title(), x.value)
214 for x in VolumeNormalizationMode
215 ],
216 category="playback",
217 ),
218 ConfigEntry(
219 key=CONF_VOLUME_NORMALIZATION_TRACKS,
220 type=ConfigEntryType.STRING,
221 default_value=VolumeNormalizationMode.FALLBACK_DYNAMIC,
222 label="Volume normalization method for tracks",
223 options=[
224 ConfigValueOption(x.value.replace("_", " ").title(), x.value)
225 for x in VolumeNormalizationMode
226 ],
227 category="playback",
228 ),
229 ConfigEntry(
230 key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO,
231 type=ConfigEntryType.FLOAT,
232 range=(-20, 10),
233 default_value=-6,
234 label="Fixed/fallback gain adjustment for radio streams",
235 category="playback",
236 ),
237 ConfigEntry(
238 key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
239 type=ConfigEntryType.FLOAT,
240 range=(-20, 10),
241 default_value=-6,
242 label="Fixed/fallback gain adjustment for tracks",
243 category="playback",
244 ),
245 ConfigEntry(
246 key=CONF_ALLOW_CROSSFADE_SAME_ALBUM,
247 type=ConfigEntryType.BOOLEAN,
248 default_value=False,
249 label="Allow crossfade between tracks from the same album",
250 description="Enabling this option allows for crossfading between tracks "
251 "that are part of the same album.",
252 category="playback",
253 ),
254 ConfigEntry(
255 key=CONF_PUBLISH_IP,
256 type=ConfigEntryType.STRING,
257 default_value=ip_addresses[0],
258 label="Published IP address",
259 description="This IP address is communicated to players where to find this server."
260 "\nMake sure that this IP can be reached by players on the local network, "
261 "otherwise audio streaming will not work.",
262 required=False,
263 category="generic",
264 advanced=True,
265 requires_reload=True,
266 ),
267 ConfigEntry(
268 key=CONF_BIND_PORT,
269 type=ConfigEntryType.INTEGER,
270 default_value=DEFAULT_PORT,
271 label="TCP Port",
272 description="The TCP port to run the server. "
273 "Make sure that this server can be reached "
274 "on the given IP and TCP port by players on the local network.",
275 category="generic",
276 advanced=True,
277 requires_reload=True,
278 ),
279 ConfigEntry(
280 key=CONF_BIND_IP,
281 type=ConfigEntryType.STRING,
282 default_value="0.0.0.0",
283 options=[ConfigValueOption(x, x) for x in {"0.0.0.0", "::", *ip_addresses}],
284 label="Bind to IP/interface",
285 description="Start the stream server on this specific interface. \n"
286 "Use 0.0.0.0 or :: to bind to all interfaces, which is the default. \n"
287 "This is an advanced setting that should normally "
288 "not be adjusted in regular setups.",
289 category="generic",
290 advanced=True,
291 required=False,
292 requires_reload=True,
293 ),
294 ConfigEntry(
295 key=CONF_SMART_FADES_LOG_LEVEL,
296 type=ConfigEntryType.STRING,
297 label="Smart Fades Log level",
298 description="Log level for the Smart Fades mixer and analyzer.",
299 options=CONF_ENTRY_LOG_LEVEL.options,
300 default_value="GLOBAL",
301 category="generic",
302 advanced=True,
303 ),
304 )
305
306 async def setup(self, config: CoreConfig) -> None:
307 """Async initialize of module."""
308 # copy log level to audio/ffmpeg loggers
309 AUDIO_LOGGER.setLevel(self.logger.level)
310 FFMPEG_LOGGER.setLevel(self.logger.level)
311 self._setup_smart_fades_logger(config)
312 # perform check for ffmpeg version
313 await check_ffmpeg_version()
314 # start the webserver
315 self.publish_port = config.get_value(CONF_BIND_PORT, DEFAULT_PORT)
316 self.publish_ip = config.get_value(CONF_PUBLISH_IP)
317 self._bind_ip = bind_ip = str(config.get_value(CONF_BIND_IP))
318 # print a big fat message in the log where the streamserver is running
319 # because this is a common source of issues for people with more complex setups
320 self.logger.log(
321 logging.INFO if self.mass.config.onboard_done else logging.WARNING,
322 "\n\n################################################################################\n"
323 "Starting streamserver on %s:%s\n"
324 "This is the IP address that is communicated to players.\n"
325 "If this is incorrect, audio will not play!\n"
326 "See the documentation how to configure the publish IP for the Streamserver\n"
327 "in Settings --> Core modules --> Streamserver\n"
328 "################################################################################\n",
329 self.publish_ip,
330 self.publish_port,
331 )
332 await self._server.setup(
333 bind_ip=bind_ip,
334 bind_port=cast("int", self.publish_port),
335 base_url=f"http://{format_ip_for_url(str(self.publish_ip))}:{self.publish_port}",
336 static_routes=[
337 (
338 "*",
339 "/flow/{session_id}/{queue_id}/{queue_item_id}/{player_id}.{fmt}",
340 self.serve_queue_flow_stream,
341 ),
342 (
343 "*",
344 "/single/{session_id}/{queue_id}/{queue_item_id}/{player_id}.{fmt}",
345 self.serve_queue_item_stream,
346 ),
347 (
348 "*",
349 "/command/{queue_id}/{command}.mp3",
350 self.serve_command_request,
351 ),
352 (
353 "*",
354 "/announcement/{player_id}.{fmt}",
355 self.serve_announcement_stream,
356 ),
357 (
358 "*",
359 "/pluginsource/{plugin_source}/{player_id}.{fmt}",
360 self.serve_plugin_source_stream,
361 ),
362 ],
363 )
364 # Start periodic garbage collection task
365 # This ensures memory from audio buffers and streams is cleaned up regularly
366 self.mass.call_later(900, self._periodic_garbage_collection) # 15 minutes
367
368 async def close(self) -> None:
369 """Cleanup on exit."""
370 await self._server.close()
371
372 async def resolve_stream_url(
373 self,
374 player_id: str,
375 media: PlayerMedia,
376 ) -> str:
377 """Resolve the stream URL for the given PlayerMedia."""
378 conf_output_codec = await self.mass.config.get_player_config_value(
379 player_id, CONF_OUTPUT_CODEC, default="flac", return_type=str
380 )
381 output_codec = ContentType.try_parse(conf_output_codec or "flac")
382 fmt = output_codec.value
383 # handle raw pcm without exact format specifiers
384 if output_codec.is_pcm() and ";" not in fmt:
385 fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}"
386 extra_data = media.custom_data or {}
387 flow_mode = extra_data.get("flow_mode", False)
388 session_id = extra_data.get("session_id")
389 queue_item_id = media.queue_item_id
390 if not session_id or not queue_item_id:
391 raise InvalidDataError("Can not resolve stream URL: Invalid PlayerMedia data")
392 queue_id = media.source_id
393 base_path = "flow" if flow_mode else "single"
394 return f"{self._server.base_url}/{base_path}/{session_id}/{queue_id}/{queue_item_id}/{player_id}.{fmt}" # noqa: E501
395
396 async def get_plugin_source_url(
397 self,
398 plugin_source: PluginSource,
399 player_id: str,
400 ) -> str:
401 """Get the url for the Plugin Source stream/proxy."""
402 if plugin_source.audio_format.content_type.is_pcm():
403 fmt = ContentType.WAV.value
404 else:
405 fmt = plugin_source.audio_format.content_type.value
406 return f"{self._server.base_url}/pluginsource/{plugin_source.id}/{player_id}.{fmt}"
407
408 async def serve_queue_item_stream(self, request: web.Request) -> web.StreamResponse:
409 """Stream single queueitem audio to a player."""
410 self._log_request(request)
411 queue_id = request.match_info["queue_id"]
412 player_id = request.match_info["player_id"]
413 if not (queue := self.mass.player_queues.get(queue_id)):
414 raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
415 session_id = request.match_info["session_id"]
416 if queue.session_id and session_id != queue.session_id:
417 raise web.HTTPNotFound(reason=f"Unknown (or invalid) session: {session_id}")
418 if not (player := self.mass.players.get_player(player_id)):
419 raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
420 queue_item_id = request.match_info["queue_item_id"]
421 queue_item = self.mass.player_queues.get_item(queue_id, queue_item_id)
422 if not queue_item:
423 raise web.HTTPNotFound(reason=f"Unknown Queue item: {queue_item_id}")
424 if not queue_item.streamdetails:
425 try:
426 queue_item.streamdetails = await get_stream_details(
427 mass=self.mass, queue_item=queue_item
428 )
429 except Exception as e:
430 self.logger.error(
431 "Failed to get streamdetails for QueueItem %s: %s", queue_item_id, e
432 )
433 queue_item.available = False
434 raise web.HTTPNotFound(reason=f"No streamdetails for Queue item: {queue_item_id}")
435
436 # pick output format based on the streamdetails and player capabilities
437 pcm_format = await self._select_pcm_format(
438 player=player,
439 streamdetails=queue_item.streamdetails,
440 smartfades_enabled=True,
441 )
442 output_format = await self.get_output_format(
443 output_format_str=request.match_info["fmt"],
444 player=player,
445 content_sample_rate=pcm_format.sample_rate,
446 content_bit_depth=pcm_format.bit_depth,
447 )
448
449 # prepare request, add some DLNA/UPNP compatible headers
450 # icy-name is sanitized to avoid a "Potential header injection attack" exception by aiohttp
451 # see https://github.com/music-assistant/support/issues/4913
452 headers = {
453 **DEFAULT_STREAM_HEADERS,
454 "icy-name": queue_item.name.replace("\n", " ").replace("\r", " ").replace("\t", " "),
455 "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01500000000000000000000000000000", # noqa: E501
456 "Accept-Ranges": "none",
457 "Content-Type": f"audio/{output_format.output_format_str}",
458 }
459 resp = web.StreamResponse(
460 status=200,
461 reason="OK",
462 headers=headers,
463 )
464 resp.content_type = f"audio/{output_format.output_format_str}"
465 http_profile = await self.mass.config.get_player_config_value(
466 player_id, CONF_HTTP_PROFILE, default="default", return_type=str
467 )
468 if http_profile == "forced_content_length" and not queue_item.duration:
469 # just set an insane high content length to make sure the player keeps playing
470 resp.content_length = get_chunksize(output_format, 12 * 3600)
471 elif http_profile == "forced_content_length" and queue_item.duration:
472 # guess content length based on duration
473 resp.content_length = get_chunksize(output_format, queue_item.duration)
474 elif http_profile == "chunked":
475 resp.enable_chunked_encoding()
476
477 await resp.prepare(request)
478
479 # return early if this is not a GET request
480 if request.method != "GET":
481 return resp
482
483 if queue_item.media_type != MediaType.TRACK:
484 # no crossfade on non-tracks
485 smart_fades_mode = SmartFadesMode.DISABLED
486 else:
487 smart_fades_mode = await self.mass.config.get_player_config_value(
488 queue.queue_id, CONF_SMART_FADES_MODE, return_type=SmartFadesMode
489 )
490 standard_crossfade_duration = self.mass.config.get_raw_player_config_value(
491 queue.queue_id, CONF_CROSSFADE_DURATION, 10
492 )
493 if (
494 smart_fades_mode != SmartFadesMode.DISABLED
495 and PlayerFeature.GAPLESS_PLAYBACK not in player.state.supported_features
496 ):
497 # crossfade is not supported on this player due to missing gapless playback
498 self.logger.warning(
499 "Crossfade disabled: Player %s does not support gapless playback, "
500 "consider enabling flow mode to enable crossfade on this player.",
501 player.state.name if player else "Unknown Player",
502 )
503 smart_fades_mode = SmartFadesMode.DISABLED
504
505 if smart_fades_mode != SmartFadesMode.DISABLED:
506 # crossfade is enabled, use special crossfaded single item stream
507 # where the crossfade of the next track is present in the stream of
508 # a single track. This only works if the player supports gapless playback!
509 audio_input = self.get_queue_item_stream_with_smartfade(
510 queue_item=queue_item,
511 pcm_format=pcm_format,
512 smart_fades_mode=smart_fades_mode,
513 standard_crossfade_duration=standard_crossfade_duration,
514 )
515 else:
516 # no crossfade, just a regular single item stream
517 audio_input = self.get_queue_item_stream(
518 queue_item=queue_item,
519 pcm_format=pcm_format,
520 seek_position=queue_item.streamdetails.seek_position,
521 playback_speed=cast(
522 "float", queue_item.extra_attributes.get("playback_speed", 1.0)
523 ),
524 )
525 # stream the audio
526 # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into
527 # the desired output format for the player including any player specific filter params
528 # such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats
529 first_chunk_received = False
530 bytes_sent = 0
531 async for chunk in get_ffmpeg_stream(
532 audio_input=audio_input,
533 input_format=pcm_format,
534 output_format=output_format,
535 filter_params=get_player_filter_params(
536 self.mass,
537 player_id=player.player_id,
538 input_format=pcm_format,
539 output_format=output_format,
540 ),
541 ):
542 try:
543 await resp.write(chunk)
544 bytes_sent += len(chunk)
545 if not first_chunk_received:
546 first_chunk_received = True
547 # inform the queue that the track is now loaded in the buffer
548 # so for example the next track can be enqueued
549 self.mass.player_queues.track_loaded_in_buffer(
550 queue_item.queue_id, queue_item.queue_item_id
551 )
552 except (BrokenPipeError, ConnectionResetError, ConnectionError) as err:
553 if first_chunk_received and not player.stop_called:
554 # Player disconnected (unexpected) after receiving at least some data
555 # This could indicate buffering issues, network problems,
556 # or player-specific issues
557 bytes_expected = get_chunksize(output_format, queue_item.duration or 3600)
558 self.logger.warning(
559 "Player %s disconnected prematurely from stream for %s (%s) - "
560 "error: %s, sent %d bytes, expected (approx) bytes=%d",
561 queue.display_name,
562 queue_item.name,
563 queue_item.uri,
564 err.__class__.__name__,
565 bytes_sent,
566 bytes_expected,
567 )
568 break
569 if queue_item.streamdetails.stream_error:
570 self.logger.error(
571 "Error streaming QueueItem %s (%s) to %s - will try to skip to next item",
572 queue_item.name,
573 queue_item.uri,
574 queue.display_name,
575 )
576 # try to skip to the next item in the queue after a short delay
577 self.mass.call_later(5, self.mass.player_queues.next(queue_id))
578 return resp
579
580 async def serve_queue_flow_stream(self, request: web.Request) -> web.StreamResponse:
581 """Stream Queue Flow audio to player."""
582 self._log_request(request)
583 queue_id = request.match_info["queue_id"]
584 player_id = request.match_info["player_id"]
585 if not (queue := self.mass.player_queues.get(queue_id)):
586 raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
587 if not (player := self.mass.players.get_player(player_id)):
588 raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
589 start_queue_item_id = request.match_info["queue_item_id"]
590 start_queue_item = self.mass.player_queues.get_item(queue_id, start_queue_item_id)
591 if not start_queue_item:
592 raise web.HTTPNotFound(reason=f"Unknown Queue item: {start_queue_item_id}")
593
594 queue.flow_mode_stream_log = []
595
596 # select the highest possible PCM settings for this player
597 flow_pcm_format = await self._select_flow_format(player)
598
599 # work out output format/details
600 output_format = await self.get_output_format(
601 output_format_str=request.match_info["fmt"],
602 player=player,
603 content_sample_rate=flow_pcm_format.sample_rate,
604 content_bit_depth=flow_pcm_format.bit_depth,
605 )
606 # work out ICY metadata support
607 icy_preference = self.mass.config.get_raw_player_config_value(
608 player_id,
609 CONF_ENTRY_ENABLE_ICY_METADATA.key,
610 CONF_ENTRY_ENABLE_ICY_METADATA.default_value,
611 )
612 enable_icy = request.headers.get("Icy-MetaData", "") == "1" and icy_preference != "disabled"
613 icy_meta_interval = 256000 if icy_preference == "full" else 16384
614
615 # prepare request, add some DLNA/UPNP compatible headers
616 headers = {
617 **DEFAULT_STREAM_HEADERS,
618 **ICY_HEADERS,
619 "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01700000000000000000000000000000", # noqa: E501
620 "Accept-Ranges": "none",
621 "Content-Type": f"audio/{output_format.output_format_str}",
622 }
623 if enable_icy:
624 headers["icy-metaint"] = str(icy_meta_interval)
625
626 resp = web.StreamResponse(
627 status=200,
628 reason="OK",
629 headers=headers,
630 )
631 http_profile = await self.mass.config.get_player_config_value(
632 player_id, CONF_HTTP_PROFILE, default="default", return_type=str
633 )
634 if http_profile == "forced_content_length":
635 # just set an insane high content length to make sure the player keeps playing
636 resp.content_length = get_chunksize(output_format, 12 * 3600)
637 elif http_profile == "chunked":
638 resp.enable_chunked_encoding()
639
640 await resp.prepare(request)
641
642 # return early if this is not a GET request
643 if request.method != "GET":
644 return resp
645
646 # all checks passed, start streaming!
647 # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into
648 # the desired output format for the player including any player specific filter params
649 # such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats
650 self.logger.debug("Start serving Queue flow audio stream for %s", queue.display_name)
651
652 async for chunk in get_ffmpeg_stream(
653 audio_input=self.get_queue_flow_stream(
654 queue=queue,
655 start_queue_item=start_queue_item,
656 pcm_format=flow_pcm_format,
657 ),
658 input_format=flow_pcm_format,
659 output_format=output_format,
660 filter_params=get_player_filter_params(
661 self.mass, player.player_id, flow_pcm_format, output_format
662 ),
663 # we need to slowly feed the music to avoid the player stopping and later
664 # restarting (or completely failing) the audio stream by keeping the buffer short.
665 # this is reported to be an issue especially with Chromecast players.
666 # see for example: https://github.com/music-assistant/support/issues/3717
667 # allow buffer ahead of 6 seconds and read rest in realtime
668 extra_input_args=["-readrate", "1.0", "-readrate_initial_burst", "6"],
669 chunk_size=icy_meta_interval if enable_icy else get_chunksize(output_format),
670 ):
671 try:
672 await resp.write(chunk)
673 except (BrokenPipeError, ConnectionResetError, ConnectionError):
674 # race condition
675 break
676
677 if not enable_icy:
678 continue
679
680 # if icy metadata is enabled, send the icy metadata after the chunk
681 if (
682 # use current item here and not buffered item, otherwise
683 # the icy metadata will be too much ahead
684 (current_item := queue.current_item)
685 and current_item.streamdetails
686 and current_item.streamdetails.stream_title
687 ):
688 title = current_item.streamdetails.stream_title
689 elif queue and current_item and current_item.name:
690 title = current_item.name
691 else:
692 title = "Music Assistant"
693 metadata = f"StreamTitle='{title}';".encode()
694 if icy_preference == "full" and current_item and current_item.image:
695 metadata += f"StreamURL='{current_item.image.path}'".encode()
696 while len(metadata) % 16 != 0:
697 metadata += b"\x00"
698 length = len(metadata)
699 length_b = chr(int(length / 16)).encode()
700 await resp.write(length_b + metadata)
701
702 return resp
703
704 async def serve_command_request(self, request: web.Request) -> web.FileResponse:
705 """Handle special 'command' request for a player."""
706 self._log_request(request)
707 queue_id = request.match_info["queue_id"]
708 command = request.match_info["command"]
709 if command == "next":
710 self.mass.create_task(self.mass.player_queues.next(queue_id))
711 return web.FileResponse(SILENCE_FILE, headers={"icy-name": "Music Assistant"})
712
713 async def serve_announcement_stream(self, request: web.Request) -> web.StreamResponse:
714 """Stream announcement audio to a player."""
715 self._log_request(request)
716 player_id = request.match_info["player_id"]
717 player = self.mass.player_queues.get(player_id)
718 if not player:
719 raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
720 if not (announce_data := self.announcements.get(player_id)):
721 raise web.HTTPNotFound(reason=f"No pending announcements for Player: {player_id}")
722
723 # work out output format/details
724 fmt = request.match_info["fmt"]
725 audio_format = AudioFormat(content_type=ContentType.try_parse(fmt))
726
727 http_profile = await self.mass.config.get_player_config_value(
728 player_id, CONF_HTTP_PROFILE, default="default", return_type=str
729 )
730 if http_profile == "forced_content_length":
731 # given the fact that an announcement is just a short audio clip,
732 # just send it over completely at once so we have a fixed content length
733 data = b""
734 async for chunk in self.get_announcement_stream(
735 announcement_url=announce_data["announcement_url"],
736 output_format=audio_format,
737 pre_announce=announce_data["pre_announce"],
738 pre_announce_url=announce_data["pre_announce_url"],
739 ):
740 data += chunk
741 return web.Response(
742 body=data,
743 content_type=f"audio/{audio_format.output_format_str}",
744 headers=DEFAULT_STREAM_HEADERS,
745 )
746
747 resp = web.StreamResponse(
748 status=200,
749 reason="OK",
750 headers=DEFAULT_STREAM_HEADERS,
751 )
752 resp.content_type = f"audio/{audio_format.output_format_str}"
753 if http_profile == "chunked":
754 resp.enable_chunked_encoding()
755
756 await resp.prepare(request)
757
758 # return early if this is not a GET request
759 if request.method != "GET":
760 return resp
761
762 # all checks passed, start streaming!
763 self.logger.debug(
764 "Start serving audio stream for Announcement %s to %s",
765 announce_data["announcement_url"],
766 player.state.name,
767 )
768 async for chunk in self.get_announcement_stream(
769 announcement_url=announce_data["announcement_url"],
770 output_format=audio_format,
771 pre_announce=announce_data["pre_announce"],
772 pre_announce_url=announce_data["pre_announce_url"],
773 ):
774 try:
775 await resp.write(chunk)
776 except (BrokenPipeError, ConnectionResetError):
777 break
778
779 self.logger.debug(
780 "Finished serving audio stream for Announcement %s to %s",
781 announce_data["announcement_url"],
782 player.state.name,
783 )
784
785 return resp
786
787 async def serve_plugin_source_stream(self, request: web.Request) -> web.StreamResponse:
788 """Stream PluginSource audio to a player."""
789 self._log_request(request)
790 plugin_source_id = request.match_info["plugin_source"]
791 provider = cast("PluginProvider", self.mass.get_provider(plugin_source_id))
792 if not provider:
793 raise ProviderUnavailableError(f"Unknown PluginSource: {plugin_source_id}")
794 # work out output format/details
795 player_id = request.match_info["player_id"]
796 player = self.mass.players.get_player(player_id)
797 if not player:
798 raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
799 plugin_source = provider.get_source()
800 output_format = await self.get_output_format(
801 output_format_str=request.match_info["fmt"],
802 player=player,
803 content_sample_rate=plugin_source.audio_format.sample_rate,
804 content_bit_depth=plugin_source.audio_format.bit_depth,
805 )
806 headers = {
807 **DEFAULT_STREAM_HEADERS,
808 "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01700000000000000000000000000000", # noqa: E501
809 "icy-name": plugin_source.name,
810 "Accept-Ranges": "none",
811 "Content-Type": f"audio/{output_format.output_format_str}",
812 }
813
814 resp = web.StreamResponse(
815 status=200,
816 reason="OK",
817 headers=headers,
818 )
819 resp.content_type = f"audio/{output_format.output_format_str}"
820 http_profile = await self.mass.config.get_player_config_value(
821 player_id, CONF_HTTP_PROFILE, default="default", return_type=str
822 )
823 if http_profile == "forced_content_length":
824 # just set an insanely high content length to make sure the player keeps playing
825 resp.content_length = get_chunksize(output_format, 12 * 3600)
826 elif http_profile == "chunked":
827 resp.enable_chunked_encoding()
828
829 await resp.prepare(request)
830
831 # return early if this is not a GET request
832 if request.method != "GET":
833 return resp
834
835 # all checks passed, start streaming!
836 if not plugin_source.audio_format:
837 raise InvalidDataError(f"No audio format for plugin source {plugin_source_id}")
838 async for chunk in self.get_plugin_source_stream(
839 plugin_source_id=plugin_source_id,
840 output_format=output_format,
841 player_id=player_id,
842 player_filter_params=get_player_filter_params(
843 self.mass, player_id, plugin_source.audio_format, output_format
844 ),
845 ):
846 try:
847 await resp.write(chunk)
848 except (BrokenPipeError, ConnectionResetError, ConnectionError):
849 break
850 return resp
851
852 def get_command_url(self, player_or_queue_id: str, command: str) -> str:
853 """Get the url for the special command stream."""
854 return f"{self.base_url}/command/{player_or_queue_id}/{command}.mp3"
855
856 def get_announcement_url(
857 self,
858 player_id: str,
859 announce_data: AnnounceData,
860 content_type: ContentType = ContentType.MP3,
861 ) -> str:
862 """Get the url for the special announcement stream."""
863 self.announcements[player_id] = announce_data
864 # use stream server to host announcement on local network
865 # this ensures playback on all players, including ones that do not
866 # like https hosts and it also offers the pre-announce 'bell'
867 return f"{self.base_url}/announcement/{player_id}.{content_type.value}"
868
869 def get_stream(
870 self, media: PlayerMedia, pcm_format: AudioFormat, force_flow_mode: bool = False
871 ) -> AsyncGenerator[bytes, None]:
872 """
873 Get a stream of the given media as raw PCM audio.
874
875 This is used as helper for player providers that can consume the raw PCM
876 audio stream directly (e.g. AirPlay) and not rely on HTTP transport.
877 """
878 # select audio source
879 if media.media_type == MediaType.ANNOUNCEMENT:
880 # special case: stream announcement
881 assert media.custom_data
882 audio_source = self.get_announcement_stream(
883 media.custom_data["announcement_url"],
884 output_format=pcm_format,
885 pre_announce=media.custom_data["pre_announce"],
886 pre_announce_url=media.custom_data["pre_announce_url"],
887 )
888 elif media.media_type == MediaType.PLUGIN_SOURCE:
889 # special case: plugin source stream
890 assert media.custom_data
891 audio_source = self.get_plugin_source_stream(
892 plugin_source_id=media.custom_data["source_id"],
893 output_format=pcm_format,
894 # need to pass player_id from the PlayerMedia object
895 # because this could have been a group
896 player_id=media.custom_data["player_id"],
897 )
898 elif (
899 media.media_type == MediaType.FLOW_STREAM
900 and media.source_id
901 and media.source_id.startswith(UGP_PREFIX)
902 and media.uri
903 and "/ugp/" in media.uri
904 ):
905 # special case: member player accessing UGP stream
906 # Check URI to distinguish from the UGP accessing its own stream
907 ugp_player = cast("UniversalGroupPlayer", self.mass.players.get_player(media.source_id))
908 ugp_stream = ugp_player.stream
909 assert ugp_stream is not None # for type checker
910 if ugp_stream.base_pcm_format == pcm_format:
911 # no conversion needed
912 audio_source = ugp_stream.subscribe_raw()
913 else:
914 audio_source = ugp_stream.get_stream(output_format=pcm_format)
915 elif (
916 media.source_id
917 and media.queue_item_id
918 and (media.media_type == MediaType.FLOW_STREAM or force_flow_mode)
919 ):
920 # regular queue (flow) stream request
921 queue = self.mass.player_queues.get(media.source_id)
922 assert queue
923 start_queue_item = self.mass.player_queues.get_item(
924 media.source_id, media.queue_item_id
925 )
926 assert start_queue_item
927 audio_source = self.mass.streams.get_queue_flow_stream(
928 queue=queue,
929 start_queue_item=start_queue_item,
930 pcm_format=pcm_format,
931 )
932 elif media.source_id and media.queue_item_id:
933 # single item stream (e.g. radio)
934 queue_item = self.mass.player_queues.get_item(media.source_id, media.queue_item_id)
935 assert queue_item
936 audio_source = buffered(
937 self.get_queue_item_stream(
938 queue_item=queue_item,
939 pcm_format=pcm_format,
940 playback_speed=cast(
941 "float", queue_item.extra_attributes.get("playback_speed", 1.0)
942 ),
943 ),
944 buffer_size=10,
945 min_buffer_before_yield=2,
946 )
947 else:
948 # assume url or some other direct path
949 # NOTE: this will fail if its an uri not playable by ffmpeg
950 audio_source = get_ffmpeg_stream(
951 audio_input=media.uri,
952 input_format=AudioFormat(content_type=ContentType.try_parse(media.uri)),
953 output_format=pcm_format,
954 )
955 return audio_source
956
957 @use_buffer(buffer_size=30, min_buffer_before_yield=2)
958 async def get_queue_flow_stream(
959 self,
960 queue: PlayerQueue,
961 start_queue_item: QueueItem,
962 pcm_format: AudioFormat,
963 ) -> AsyncGenerator[bytes, None]:
964 """
965 Get a flow stream of all tracks in the queue as raw PCM audio.
966
967 yields chunks of exactly 1 second of audio in the given pcm_format.
968 """
969 # ruff: noqa: PLR0915
970 assert pcm_format.content_type.is_pcm()
971 queue_track = None
972 last_fadeout_part: bytes = b""
973 last_streamdetails: StreamDetails | None = None
974 last_play_log_entry: PlayLogEntry | None = None
975 queue.flow_mode = True
976 if not start_queue_item:
977 # this can happen in some (edge case) race conditions
978 return
979 pcm_sample_size = pcm_format.pcm_sample_size
980 if start_queue_item.media_type != MediaType.TRACK:
981 # no crossfade on non-tracks
982 smart_fades_mode = SmartFadesMode.DISABLED
983 standard_crossfade_duration = 0
984 else:
985 smart_fades_mode = await self.mass.config.get_player_config_value(
986 queue.queue_id, CONF_SMART_FADES_MODE, return_type=SmartFadesMode
987 )
988 standard_crossfade_duration = self.mass.config.get_raw_player_config_value(
989 queue.queue_id, CONF_CROSSFADE_DURATION, 10
990 )
991 self.logger.info(
992 "Start Queue Flow stream for Queue %s - crossfade: %s %s",
993 queue.display_name,
994 smart_fades_mode,
995 f"({standard_crossfade_duration}s)"
996 if smart_fades_mode == SmartFadesMode.STANDARD_CROSSFADE
997 else "",
998 )
999 total_bytes_sent = 0
1000 total_chunks_received = 0
1001
1002 while True:
1003 # get (next) queue item to stream
1004 if queue_track is None:
1005 queue_track = start_queue_item
1006 else:
1007 try:
1008 queue_track = await self.mass.player_queues.load_next_queue_item(
1009 queue.queue_id, queue_track.queue_item_id
1010 )
1011 except QueueEmpty:
1012 break
1013
1014 if queue_track.streamdetails is None:
1015 self.logger.error(
1016 "No StreamDetails for queue item %s (%s) on queue %s - skipping track",
1017 queue_track.queue_item_id,
1018 queue_track.name,
1019 queue.display_name,
1020 )
1021 continue
1022
1023 self.logger.debug(
1024 "Start Streaming queue track: %s (%s) for queue %s",
1025 queue_track.streamdetails.uri,
1026 queue_track.name,
1027 queue.display_name,
1028 )
1029 # append to play log so the queue controller can work out which track is playing
1030 play_log_entry = PlayLogEntry(queue_track.queue_item_id)
1031 queue.flow_mode_stream_log.append(play_log_entry)
1032 # calculate crossfade buffer size
1033 crossfade_buffer_duration = (
1034 SMART_CROSSFADE_DURATION
1035 if smart_fades_mode == SmartFadesMode.SMART_CROSSFADE
1036 else standard_crossfade_duration
1037 )
1038 crossfade_buffer_duration = min(
1039 crossfade_buffer_duration,
1040 int(queue_track.streamdetails.duration / 2)
1041 if queue_track.streamdetails.duration
1042 else crossfade_buffer_duration,
1043 )
1044 # Ensure crossfade buffer size is aligned to frame boundaries
1045 # Frame size = bytes_per_sample * channels
1046 bytes_per_sample = pcm_format.bit_depth // 8
1047 frame_size = bytes_per_sample * pcm_format.channels
1048 crossfade_buffer_size = int(pcm_format.pcm_sample_size * crossfade_buffer_duration)
1049 # Round down to nearest frame boundary
1050 crossfade_buffer_size = (crossfade_buffer_size // frame_size) * frame_size
1051
1052 bytes_written = 0
1053 buffer = b""
1054 # handle incoming audio chunks
1055 first_chunk_received = False
1056 # buffer size needs to be big enough to include the crossfade part
1057
1058 async for chunk in self.get_queue_item_stream(
1059 queue_track,
1060 pcm_format=pcm_format,
1061 seek_position=queue_track.streamdetails.seek_position,
1062 playback_speed=cast(
1063 "float", queue_track.extra_attributes.get("playback_speed", 1.0)
1064 ),
1065 raise_on_error=False,
1066 ):
1067 total_chunks_received += 1
1068 if not first_chunk_received:
1069 first_chunk_received = True
1070 # inform the queue that the track is now loaded in the buffer
1071 # so the next track can be preloaded
1072 self.mass.player_queues.track_loaded_in_buffer(
1073 queue.queue_id, queue_track.queue_item_id
1074 )
1075 if total_chunks_received < 10 and smart_fades_mode != SmartFadesMode.DISABLED:
1076 # we want a stream to start as quickly as possible
1077 # so for the first 10 chunks we keep a very short buffer
1078 req_buffer_size = pcm_format.pcm_sample_size
1079 else:
1080 req_buffer_size = (
1081 pcm_sample_size
1082 if smart_fades_mode == SmartFadesMode.DISABLED
1083 else crossfade_buffer_size
1084 )
1085
1086 # ALWAYS APPEND CHUNK TO BUFFER
1087 buffer += chunk
1088 del chunk
1089 if len(buffer) < req_buffer_size:
1090 # buffer is not full enough, move on
1091 # yield control to event loop with 10ms delay
1092 await asyncio.sleep(0.01)
1093 continue
1094
1095 #### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
1096 if last_fadeout_part and last_streamdetails:
1097 # perform crossfade
1098 fadein_part = buffer[:crossfade_buffer_size]
1099 remaining_bytes = buffer[crossfade_buffer_size:]
1100 # Use the mixer to handle all crossfade logic
1101 try:
1102 crossfade_part = await self._smart_fades_mixer.mix(
1103 fade_in_part=fadein_part,
1104 fade_out_part=last_fadeout_part,
1105 fade_in_streamdetails=queue_track.streamdetails,
1106 fade_out_streamdetails=last_streamdetails,
1107 pcm_format=pcm_format,
1108 standard_crossfade_duration=standard_crossfade_duration,
1109 mode=smart_fades_mode,
1110 )
1111 except Exception as mix_err:
1112 self.logger.warning(
1113 "Crossfade mixer failed for %s, falling back to simple concat: %s",
1114 queue_track.name,
1115 mix_err,
1116 )
1117 # Fallback: just output the fadeout part then the buffer
1118 for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
1119 yield _chunk
1120 bytes_written += len(_chunk)
1121 del _chunk
1122 crossfade_part = b""
1123 remaining_bytes = buffer
1124 if crossfade_part:
1125 # because the crossfade exists of both the fadein and fadeout part
1126 # we need to correct the bytes_written accordingly so the duration
1127 # calculations at the end of the track are correct
1128 crossfade_part_len = len(crossfade_part)
1129 bytes_written += int(crossfade_part_len / 2)
1130 if last_play_log_entry:
1131 assert last_play_log_entry.seconds_streamed is not None
1132 last_play_log_entry.seconds_streamed += (
1133 crossfade_part_len / 2 / pcm_sample_size
1134 )
1135 # yield crossfade_part (in pcm_sample_size chunks)
1136 for _chunk in divide_chunks(crossfade_part, pcm_sample_size):
1137 yield _chunk
1138 del _chunk
1139 del crossfade_part
1140 # also write the leftover bytes from the crossfade action
1141 if remaining_bytes:
1142 yield remaining_bytes
1143 bytes_written += len(remaining_bytes)
1144 del remaining_bytes
1145 # clear vars
1146 last_fadeout_part = b""
1147 last_streamdetails = None
1148 buffer = b""
1149
1150 #### OTHER: enough data in buffer, feed to output
1151 while len(buffer) > req_buffer_size:
1152 yield buffer[:pcm_sample_size]
1153 bytes_written += pcm_sample_size
1154 buffer = buffer[pcm_sample_size:]
1155
1156 #### HANDLE END OF TRACK
1157 if not first_chunk_received:
1158 # Track failed to stream - no chunks received at all
1159 self.logger.warning(
1160 "Track %s (%s) on queue %s produced no audio data - skipping",
1161 queue_track.name,
1162 queue_track.streamdetails.uri if queue_track.streamdetails else "unknown",
1163 queue.display_name,
1164 )
1165 # Clean up and continue to next track
1166 queue_track.streamdetails.stream_error = True
1167 del buffer
1168 continue
1169 if last_fadeout_part:
1170 # edge case: we did not get enough data to make the crossfade
1171 for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
1172 yield _chunk
1173 del _chunk
1174 bytes_written += len(last_fadeout_part)
1175 last_fadeout_part = b""
1176 crossfade_allowed = self._crossfade_allowed(
1177 queue_track, smart_fades_mode=smart_fades_mode, flow_mode=True
1178 )
1179 if crossfade_allowed:
1180 # if crossfade is enabled, save fadeout part to pickup for next track
1181 last_fadeout_part = buffer[-crossfade_buffer_size:]
1182 last_streamdetails = queue_track.streamdetails
1183 last_play_log_entry = play_log_entry
1184 remaining_bytes = buffer[:-crossfade_buffer_size]
1185 if remaining_bytes:
1186 yield remaining_bytes
1187 bytes_written += len(remaining_bytes)
1188 del remaining_bytes
1189 elif smart_fades_mode != SmartFadesMode.DISABLED:
1190 self.logger.debug(
1191 "Flow mode: crossfade NOT allowed for track %s on queue %s"
1192 " - fadeout part not saved",
1193 queue_track.name,
1194 queue.display_name,
1195 )
1196 if not crossfade_allowed and buffer:
1197 # no crossfade enabled, just yield the buffer last part
1198 bytes_written += len(buffer)
1199 for _chunk in divide_chunks(buffer, pcm_sample_size):
1200 yield _chunk
1201 del _chunk
1202 # make sure the buffer gets cleaned up
1203 del buffer
1204
1205 # update duration details based on the actual pcm data we sent
1206 # this also accounts for crossfade and silence stripping
1207 seconds_streamed = bytes_written / pcm_sample_size
1208 queue_track.streamdetails.seconds_streamed = seconds_streamed
1209 queue_track.streamdetails.duration = int(
1210 queue_track.streamdetails.seek_position + seconds_streamed
1211 )
1212 play_log_entry.seconds_streamed = seconds_streamed
1213 play_log_entry.duration = queue_track.streamdetails.duration
1214 total_bytes_sent += bytes_written
1215 self.logger.debug(
1216 "Finished Streaming queue track: %s (%s) on queue %s",
1217 queue_track.streamdetails.uri,
1218 queue_track.name,
1219 queue.display_name,
1220 )
1221 #### HANDLE END OF QUEUE FLOW STREAM
1222 # end of queue flow: make sure we yield the last_fadeout_part
1223 if last_fadeout_part:
1224 for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
1225 yield _chunk
1226 del _chunk
1227 # correct seconds streamed/duration
1228 last_part_seconds = len(last_fadeout_part) / pcm_sample_size
1229 streamdetails = queue_track.streamdetails
1230 assert streamdetails is not None
1231 streamdetails.seconds_streamed = (
1232 streamdetails.seconds_streamed or 0
1233 ) + last_part_seconds
1234 streamdetails.duration = int((streamdetails.duration or 0) + last_part_seconds)
1235 last_fadeout_part = b""
1236 total_bytes_sent += bytes_written
1237 self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name)
1238
1239 async def get_announcement_stream(
1240 self,
1241 announcement_url: str,
1242 output_format: AudioFormat,
1243 pre_announce: bool | str = False,
1244 pre_announce_url: str = ANNOUNCE_ALERT_FILE,
1245 ) -> AsyncGenerator[bytes, None]:
1246 """Get the special announcement stream."""
1247 announcement_data: asyncio.Queue[bytes | None] = asyncio.Queue(10)
1248 # we are doing announcement in PCM first to avoid multiple encodings
1249 # when mixing pre-announce and announcement
1250 # also we have to deal with some TTS sources being super slow in delivering audio
1251 # so we take an approach where we start fetching the announcement in the background
1252 # while we can already start playing the pre-announce sound (if any)
1253
1254 pcm_format = (
1255 output_format
1256 if output_format.content_type.is_pcm()
1257 else AudioFormat(
1258 sample_rate=output_format.sample_rate,
1259 content_type=ContentType.PCM_S16LE,
1260 bit_depth=16,
1261 channels=output_format.channels,
1262 )
1263 )
1264
1265 async def fetch_announcement() -> None:
1266 fmt = announcement_url.rsplit(".")[-1]
1267 async for chunk in get_ffmpeg_stream(
1268 audio_input=announcement_url,
1269 input_format=AudioFormat(content_type=ContentType.try_parse(fmt)),
1270 output_format=pcm_format,
1271 chunk_size=get_chunksize(pcm_format, 1),
1272 ):
1273 await announcement_data.put(chunk)
1274 await announcement_data.put(None) # signal end of stream
1275
1276 self.mass.create_task(fetch_announcement())
1277
1278 async def _announcement_stream() -> AsyncGenerator[bytes, None]:
1279 """Generate the PCM audio stream for the announcement + optional pre-announce."""
1280 if pre_announce:
1281 async for chunk in get_ffmpeg_stream(
1282 audio_input=pre_announce_url,
1283 input_format=AudioFormat(content_type=ContentType.try_parse(pre_announce_url)),
1284 output_format=pcm_format,
1285 chunk_size=get_chunksize(pcm_format, 1),
1286 ):
1287 yield chunk
1288 # pad silence while we're waiting for the announcement to be ready
1289 while announcement_data.empty():
1290 yield b"\0" * int(
1291 pcm_format.sample_rate * (pcm_format.bit_depth / 8) * pcm_format.channels * 0.1
1292 )
1293 await asyncio.sleep(0.1)
1294 # stream announcement
1295 while True:
1296 announcement_chunk = await announcement_data.get()
1297 if announcement_chunk is None:
1298 break
1299 yield announcement_chunk
1300
1301 if output_format == pcm_format:
1302 # no need to re-encode, just yield the raw PCM stream
1303 async for chunk in _announcement_stream():
1304 yield chunk
1305 return
1306
1307 # stream final announcement in requested output format
1308 async for chunk in get_ffmpeg_stream(
1309 audio_input=_announcement_stream(),
1310 input_format=pcm_format,
1311 output_format=output_format,
1312 ):
1313 yield chunk
1314
1315 async def get_plugin_source_stream(
1316 self,
1317 plugin_source_id: str,
1318 output_format: AudioFormat,
1319 player_id: str,
1320 player_filter_params: list[str] | None = None,
1321 ) -> AsyncGenerator[bytes, None]:
1322 """Get the special plugin source stream."""
1323 plugin_prov = cast("PluginProvider", self.mass.get_provider(plugin_source_id))
1324 if not plugin_prov:
1325 raise ProviderUnavailableError(f"Unknown PluginSource: {plugin_source_id}")
1326
1327 plugin_source = plugin_prov.get_source()
1328 self.logger.debug(
1329 "Start streaming PluginSource %s to %s using output format %s",
1330 plugin_source_id,
1331 player_id,
1332 output_format,
1333 )
1334 # this should already be set by the player controller, but just to be sure
1335 plugin_source.in_use_by = player_id
1336
1337 try:
1338 async for chunk in get_ffmpeg_stream(
1339 audio_input=cast(
1340 "str | AsyncGenerator[bytes, None]",
1341 plugin_prov.get_audio_stream(player_id)
1342 if plugin_source.stream_type == StreamType.CUSTOM
1343 else plugin_source.path,
1344 ),
1345 input_format=plugin_source.audio_format,
1346 output_format=output_format,
1347 filter_params=player_filter_params,
1348 extra_input_args=["-y", "-re"],
1349 ):
1350 if plugin_source.in_use_by != player_id:
1351 # another player took over or the stream ended, stop streaming
1352 break
1353 yield chunk
1354 finally:
1355 self.logger.debug(
1356 "Finished streaming PluginSource %s to %s", plugin_source_id, player_id
1357 )
1358 await asyncio.sleep(1) # prevent race conditions when selecting source
1359 if plugin_source.in_use_by == player_id:
1360 # release control
1361 plugin_source.in_use_by = None
1362
1363 async def get_queue_item_stream(
1364 self,
1365 queue_item: QueueItem,
1366 pcm_format: AudioFormat,
1367 seek_position: int = 0,
1368 playback_speed: float = 1.0,
1369 raise_on_error: bool = True,
1370 ) -> AsyncGenerator[bytes, None]:
1371 """Get the (PCM) audio stream for a single queue item."""
1372 # collect all arguments for ffmpeg
1373 streamdetails = queue_item.streamdetails
1374 assert streamdetails
1375 filter_params: list[str] = []
1376
1377 # handle volume normalization
1378 gain_correct: float | None = None
1379 if streamdetails.volume_normalization_mode == VolumeNormalizationMode.DYNAMIC:
1380 # volume normalization using loudnorm filter (in dynamic mode)
1381 # which also collects the measurement on the fly during playback
1382 # more info: https://k.ylo.ph/2016/04/04/loudnorm.html
1383 filter_rule = f"loudnorm=I={streamdetails.target_loudness}:TP=-2.0:LRA=10.0:offset=0.0"
1384 filter_rule += ":print_format=json"
1385 filter_params.append(filter_rule)
1386 elif streamdetails.volume_normalization_mode == VolumeNormalizationMode.FIXED_GAIN:
1387 # apply user defined fixed volume/gain correction
1388 config_key = (
1389 CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS
1390 if streamdetails.media_type == MediaType.TRACK
1391 else CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO
1392 )
1393 gain_value = await self.mass.config.get_core_config_value(
1394 self.domain, config_key, default=0.0, return_type=float
1395 )
1396 gain_correct = round(gain_value, 2)
1397 filter_params.append(f"volume={gain_correct}dB")
1398 elif streamdetails.volume_normalization_mode == VolumeNormalizationMode.MEASUREMENT_ONLY:
1399 # volume normalization with known loudness measurement
1400 # apply volume/gain correction
1401 target_loudness = (
1402 float(streamdetails.target_loudness)
1403 if streamdetails.target_loudness is not None
1404 else 0.0
1405 )
1406 if streamdetails.prefer_album_loudness and streamdetails.loudness_album is not None:
1407 gain_correct = target_loudness - float(streamdetails.loudness_album)
1408 elif streamdetails.loudness is not None:
1409 gain_correct = target_loudness - float(streamdetails.loudness)
1410 else:
1411 gain_correct = 0.0
1412 gain_correct = round(gain_correct, 2)
1413 filter_params.append(f"volume={gain_correct}dB")
1414 streamdetails.volume_normalization_gain_correct = gain_correct
1415
1416 # handle playback speed
1417 if playback_speed != 1.0:
1418 filter_params.append(f"atempo={playback_speed}")
1419
1420 allow_buffer = bool(
1421 self.mass.config.get_raw_core_config_value(
1422 self.domain, CONF_ALLOW_BUFFER, CONF_ALLOW_BUFFER_DEFAULT
1423 )
1424 and streamdetails.duration
1425 )
1426
1427 self.logger.debug(
1428 "Starting queue item stream for %s (%s)"
1429 " - using buffer: %s"
1430 " - using fade-in: %s"
1431 " - using volume normalization: %s",
1432 queue_item.name,
1433 streamdetails.uri,
1434 allow_buffer,
1435 streamdetails.fade_in,
1436 streamdetails.volume_normalization_mode,
1437 )
1438 if allow_buffer:
1439 media_stream_gen = get_buffered_media_stream(
1440 self.mass,
1441 streamdetails=streamdetails,
1442 pcm_format=pcm_format,
1443 seek_position=int(seek_position),
1444 filter_params=filter_params,
1445 )
1446 else:
1447 media_stream_gen = get_media_stream(
1448 self.mass,
1449 streamdetails=streamdetails,
1450 pcm_format=pcm_format,
1451 seek_position=int(seek_position),
1452 filter_params=filter_params,
1453 )
1454
1455 first_chunk_received = False
1456 fade_in_buffer = b""
1457 bytes_received = 0
1458 finished = False
1459 stream_started_at = asyncio.get_event_loop().time()
1460 try:
1461 async for chunk in media_stream_gen:
1462 bytes_received += len(chunk)
1463 if not first_chunk_received:
1464 first_chunk_received = True
1465 self.logger.debug(
1466 "First audio chunk received for %s (%s) after %.2f seconds",
1467 queue_item.name,
1468 streamdetails.uri,
1469 asyncio.get_event_loop().time() - stream_started_at,
1470 )
1471 # handle optional fade-in
1472 if streamdetails.fade_in:
1473 if len(fade_in_buffer) < pcm_format.pcm_sample_size * 4:
1474 fade_in_buffer += chunk
1475 elif fade_in_buffer:
1476 async for fade_chunk in get_ffmpeg_stream(
1477 # NOTE: get_ffmpeg_stream signature says str | AsyncGenerator
1478 # but FFMpeg class actually accepts bytes too. This works at
1479 # runtime but needs type: ignore for mypy.
1480 audio_input=fade_in_buffer + chunk, # type: ignore[arg-type]
1481 input_format=pcm_format,
1482 output_format=pcm_format,
1483 filter_params=["afade=type=in:start_time=0:duration=3"],
1484 ):
1485 yield fade_chunk
1486 fade_in_buffer = b""
1487 streamdetails.fade_in = False
1488 else:
1489 yield chunk
1490 # help garbage collection by explicitly deleting chunk
1491 del chunk
1492 finished = True
1493 except AudioError as err:
1494 streamdetails.stream_error = True
1495 queue_item.available = False
1496 if raise_on_error:
1497 raise
1498 # yes, we swallow the error here after logging it
1499 # so the outer stream can handle it gracefully
1500 self.logger.error(
1501 "AudioError while streaming queue item %s (%s): %s",
1502 queue_item.name,
1503 streamdetails.uri,
1504 err,
1505 )
1506 except asyncio.CancelledError:
1507 # Don't swallow cancellation - let it propagate
1508 raise
1509 except Exception as err:
1510 # Catch any other unexpected exceptions to prevent them from
1511 # silently killing the entire queue stream
1512 streamdetails.stream_error = True
1513 if raise_on_error:
1514 raise
1515 self.logger.exception(
1516 "Unexpected error while streaming queue item %s (%s): %s",
1517 queue_item.name,
1518 streamdetails.uri,
1519 err,
1520 )
1521 finally:
1522 # determine how many seconds we've streamed
1523 # for pcm output we can calculate this easily
1524 seconds_streamed = bytes_received / pcm_format.pcm_sample_size
1525 streamdetails.seconds_streamed = seconds_streamed
1526 self.logger.debug(
1527 "stream %s for %s in %.2f seconds - seconds streamed/buffered: %.2f",
1528 "aborted" if not finished else "finished",
1529 streamdetails.uri,
1530 asyncio.get_event_loop().time() - stream_started_at,
1531 seconds_streamed,
1532 )
1533 # report stream to provider
1534 if (finished or seconds_streamed >= 90) and (
1535 music_prov := self.mass.get_provider(streamdetails.provider)
1536 ):
1537 if TYPE_CHECKING: # avoid circular import
1538 assert isinstance(music_prov, MusicProvider)
1539 self.mass.create_task(music_prov.on_streamed(streamdetails))
1540
1541 @use_buffer(buffer_size=30, min_buffer_before_yield=2)
1542 async def get_queue_item_stream_with_smartfade(
1543 self,
1544 queue_item: QueueItem,
1545 pcm_format: AudioFormat,
1546 smart_fades_mode: SmartFadesMode = SmartFadesMode.SMART_CROSSFADE,
1547 standard_crossfade_duration: int = 10,
1548 ) -> AsyncGenerator[bytes, None]:
1549 """Get the audio stream for a single queue item with (smart) crossfade to the next item."""
1550 queue = self.mass.player_queues.get(queue_item.queue_id)
1551 if not queue:
1552 raise RuntimeError(f"Queue {queue_item.queue_id} not found")
1553
1554 streamdetails = queue_item.streamdetails
1555 assert streamdetails
1556 crossfade_data = self._crossfade_data.pop(queue.queue_id, None)
1557 self.logger.debug(
1558 "Crossfade data pop for queue %s (track: %s): %s",
1559 queue.display_name,
1560 queue_item.name,
1561 "found" if crossfade_data else "EMPTY - no crossfade data from previous track",
1562 )
1563
1564 if crossfade_data and streamdetails.seek_position > 0:
1565 # don't do crossfade when seeking into track
1566 crossfade_data = None
1567 if crossfade_data and (crossfade_data.queue_item_id != queue_item.queue_item_id):
1568 # edge case alert: the next item changed just while we were preloading/crossfading
1569 self.logger.warning(
1570 "Skipping crossfade data for queue %s - next item changed!", queue.display_name
1571 )
1572 crossfade_data = None
1573
1574 self.logger.debug(
1575 "Start Streaming queue track: %s (%s) for queue %s "
1576 "- crossfade mode: %s "
1577 "- crossfading from previous track: %s ",
1578 queue_item.streamdetails.uri if queue_item.streamdetails else "Unknown URI",
1579 queue_item.name,
1580 queue.display_name,
1581 smart_fades_mode,
1582 "true" if crossfade_data else "false",
1583 )
1584
1585 buffer = b""
1586 bytes_written = 0
1587 # calculate crossfade buffer size
1588 crossfade_buffer_duration = (
1589 SMART_CROSSFADE_DURATION
1590 if smart_fades_mode == SmartFadesMode.SMART_CROSSFADE
1591 else standard_crossfade_duration
1592 )
1593 crossfade_buffer_duration = min(
1594 crossfade_buffer_duration,
1595 int(streamdetails.duration / 2)
1596 if streamdetails.duration
1597 else crossfade_buffer_duration,
1598 )
1599 # Ensure crossfade buffer size is aligned to frame boundaries
1600 # Frame size = bytes_per_sample * channels
1601 bytes_per_sample = pcm_format.bit_depth // 8
1602 frame_size = bytes_per_sample * pcm_format.channels
1603 crossfade_buffer_size = int(pcm_format.pcm_sample_size * crossfade_buffer_duration)
1604 # Round down to nearest frame boundary
1605 crossfade_buffer_size = (crossfade_buffer_size // frame_size) * frame_size
1606 fade_out_data: bytes | None = None
1607
1608 if crossfade_data:
1609 # Calculate discard amount in seconds (format-independent)
1610 # Use fade_in_pcm_format because fade_in_size is in the next track's original format
1611 fade_in_duration_seconds = (
1612 crossfade_data.fade_in_size / crossfade_data.fade_in_pcm_format.pcm_sample_size
1613 )
1614 discard_seconds = int(fade_in_duration_seconds) - 1
1615 # Calculate discard amounts in CURRENT track's format
1616 discard_bytes = int(discard_seconds * pcm_format.pcm_sample_size)
1617 # Convert fade_in_size to current track's format for correct leftover calculation
1618 fade_in_size_in_current_format = int(
1619 fade_in_duration_seconds * pcm_format.pcm_sample_size
1620 )
1621 discard_leftover = fade_in_size_in_current_format - discard_bytes
1622 else:
1623 discard_seconds = streamdetails.seek_position
1624 discard_leftover = 0
1625 total_chunks_received = 0
1626 req_buffer_size = crossfade_buffer_size
1627 async for chunk in self.get_queue_item_stream(
1628 queue_item,
1629 pcm_format,
1630 seek_position=discard_seconds,
1631 playback_speed=cast("float", queue_item.extra_attributes.get("playback_speed", 1.0)),
1632 ):
1633 total_chunks_received += 1
1634 if discard_leftover:
1635 # discard leftover bytes from crossfade data
1636 chunk = chunk[discard_leftover:] # noqa: PLW2901
1637 discard_leftover = 0
1638
1639 if total_chunks_received < 10:
1640 # we want a stream to start as quickly as possible
1641 # so for the first 10 chunks we keep a very short buffer
1642 req_buffer_size = pcm_format.pcm_sample_size
1643 else:
1644 req_buffer_size = crossfade_buffer_size
1645
1646 # ALWAYS APPEND CHUNK TO BUFFER
1647 buffer += chunk
1648 del chunk
1649 if len(buffer) < req_buffer_size:
1650 # buffer is not full enough, move on
1651 continue
1652
1653 #### HANDLE CROSSFADE DATA FROM PREVIOUS TRACK
1654 if crossfade_data:
1655 # send the (second half of the) crossfade data
1656 if crossfade_data.pcm_format != pcm_format:
1657 # edge case: pcm format mismatch, we need to resample
1658 self.logger.debug(
1659 "Resampling crossfade data from %s to %s for queue %s",
1660 crossfade_data.pcm_format.sample_rate,
1661 pcm_format.sample_rate,
1662 queue.display_name,
1663 )
1664 resampled_data = await resample_pcm_audio(
1665 crossfade_data.data,
1666 crossfade_data.pcm_format,
1667 pcm_format,
1668 )
1669 if resampled_data:
1670 for _chunk in divide_chunks(resampled_data, pcm_format.pcm_sample_size):
1671 yield _chunk
1672 bytes_written += len(resampled_data)
1673 else:
1674 # Resampling failed, error already logged in resample_pcm_audio
1675 # Skip crossfade data entirely - stream continues without it
1676 self.logger.warning(
1677 "Skipping crossfade data for queue %s due to resampling failure",
1678 queue.display_name,
1679 )
1680 else:
1681 for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size):
1682 yield _chunk
1683 bytes_written += len(crossfade_data.data)
1684 # clear vars
1685 crossfade_data = None
1686
1687 #### OTHER: enough data in buffer, feed to output
1688 while len(buffer) > req_buffer_size:
1689 yield buffer[: pcm_format.pcm_sample_size]
1690 bytes_written += pcm_format.pcm_sample_size
1691 buffer = buffer[pcm_format.pcm_sample_size :]
1692
1693 #### HANDLE END OF TRACK
1694
1695 if crossfade_data:
1696 # edge case: we did not get enough data to send the crossfade data
1697 # send the (second half of the) crossfade data
1698 if crossfade_data.pcm_format != pcm_format:
1699 # (yet another) edge case: pcm format mismatch, we need to resample
1700 self.logger.debug(
1701 "Resampling remaining crossfade data from %s to %s for queue %s",
1702 crossfade_data.pcm_format.sample_rate,
1703 pcm_format.sample_rate,
1704 queue.display_name,
1705 )
1706 resampled_crossfade_data = await resample_pcm_audio(
1707 crossfade_data.data,
1708 crossfade_data.pcm_format,
1709 pcm_format,
1710 )
1711 if resampled_crossfade_data:
1712 crossfade_data.data = resampled_crossfade_data
1713 else:
1714 # Resampling failed, error already logged in resample_pcm_audio
1715 # Skip the crossfade data entirely
1716 self.logger.warning(
1717 "Skipping remaining crossfade data for queue %s due to resampling failure",
1718 queue.display_name,
1719 )
1720 crossfade_data = None
1721 if crossfade_data:
1722 for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size):
1723 yield _chunk
1724 bytes_written += len(crossfade_data.data)
1725 crossfade_data = None
1726
1727 # get next track for crossfade
1728 next_queue_item: QueueItem | None
1729 try:
1730 self.logger.debug(
1731 "Preloading NEXT track for crossfade for queue %s",
1732 queue.display_name,
1733 )
1734 next_queue_item = await self.mass.player_queues.load_next_queue_item(
1735 queue.queue_id, queue_item.queue_item_id
1736 )
1737 # set index_in_buffer to prevent our next track is overwritten while preloading
1738 if next_queue_item.streamdetails is None:
1739 raise InvalidDataError(
1740 f"No streamdetails for next queue item {next_queue_item.queue_item_id}"
1741 )
1742 queue.index_in_buffer = self.mass.player_queues.index_by_id(
1743 queue.queue_id, next_queue_item.queue_item_id
1744 )
1745 queue_player = self.mass.players.get_player(queue.queue_id)
1746 assert queue_player is not None
1747 next_queue_item_pcm_format = await self._select_pcm_format(
1748 player=queue_player,
1749 streamdetails=next_queue_item.streamdetails,
1750 smartfades_enabled=True,
1751 )
1752 except QueueEmpty:
1753 # end of queue reached, no next item
1754 next_queue_item = None
1755
1756 crossfade_allowed = bool(next_queue_item) and self._crossfade_allowed(
1757 queue_item,
1758 smart_fades_mode=smart_fades_mode,
1759 flow_mode=False,
1760 next_queue_item=next_queue_item,
1761 sample_rate=pcm_format.sample_rate,
1762 next_sample_rate=next_queue_item_pcm_format.sample_rate,
1763 )
1764 if not crossfade_allowed:
1765 if not next_queue_item:
1766 self.logger.debug(
1767 "Enqueue mode: no next queue item loaded (QueueEmpty) for queue %s",
1768 queue.display_name,
1769 )
1770 else:
1771 self.logger.debug(
1772 "Enqueue mode: crossfade NOT allowed for track %s -> %s on queue %s",
1773 queue_item.name,
1774 next_queue_item.name,
1775 queue.display_name,
1776 )
1777 # no crossfade enabled/allowed, just yield the buffer last part
1778 bytes_written += len(buffer)
1779 for _chunk in divide_chunks(buffer, pcm_format.pcm_sample_size):
1780 yield _chunk
1781 else:
1782 # if crossfade is enabled, save fadeout part in buffer to pickup for next track
1783 assert next_queue_item is not None
1784 fade_out_data = buffer
1785 buffer = b""
1786 try:
1787 async for chunk in self.get_queue_item_stream(
1788 next_queue_item,
1789 next_queue_item_pcm_format,
1790 playback_speed=cast(
1791 "float", queue_item.extra_attributes.get("playback_speed", 1.0)
1792 ),
1793 ):
1794 # append to buffer until we reach crossfade size
1795 # we only need the first X seconds of the NEXT track so we can
1796 # perform the crossfade.
1797 # the crossfaded audio of the previous and next track will be
1798 # sent in two equal parts: first half now, second half
1799 # when the next track starts. We use CrossfadeData to store
1800 # the second half to be picked up by the next track's stream generator.
1801 # Note that we more or less expect the user to have enabled the in-memory
1802 # buffer so we can keep the next track's audio data in memory.
1803 buffer += chunk
1804 del chunk
1805 if len(buffer) >= crossfade_buffer_size:
1806 break
1807 #### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
1808 # Store original buffer size before any resampling for fade_in_size calculation
1809 # This size is in the next track's original format which is what we need
1810 original_buffer_size = len(buffer)
1811 if next_queue_item_pcm_format != pcm_format:
1812 # edge case: pcm format mismatch, we need to resample the next track's
1813 # beginning part before crossfading
1814 self.logger.debug(
1815 "Resampling next track's crossfade from %s to %s for queue %s",
1816 next_queue_item_pcm_format.sample_rate,
1817 pcm_format.sample_rate,
1818 queue.display_name,
1819 )
1820 buffer = await resample_pcm_audio(
1821 buffer,
1822 next_queue_item_pcm_format,
1823 pcm_format,
1824 )
1825 # perform actual (smart fades) crossfade using mixer
1826 crossfade_bytes = await self._smart_fades_mixer.mix(
1827 fade_in_part=buffer,
1828 fade_out_part=fade_out_data,
1829 fade_in_streamdetails=cast("StreamDetails", next_queue_item.streamdetails),
1830 fade_out_streamdetails=streamdetails,
1831 pcm_format=pcm_format,
1832 standard_crossfade_duration=standard_crossfade_duration,
1833 mode=smart_fades_mode,
1834 )
1835 # send half of the crossfade_part (= approx the fadeout part)
1836 split_point = (len(crossfade_bytes) + 1) // 2
1837 crossfade_first = crossfade_bytes[:split_point]
1838 crossfade_second = crossfade_bytes[split_point:]
1839 del crossfade_bytes
1840 bytes_written += len(crossfade_first)
1841 for _chunk in divide_chunks(crossfade_first, pcm_format.pcm_sample_size):
1842 yield _chunk
1843 # store the other half for the next track
1844 # IMPORTANT: crossfade_second data is in CURRENT track's format (pcm_format)
1845 # because it was created from the resampled buffer used for mixing.
1846 # BUT fade_in_size represents bytes in NEXT track's original format
1847 # (next_queue_item_pcm_format) because that's how much of the next track
1848 # was consumed during the crossfade. We need both formats to correctly
1849 # handle the crossfade data when the next track starts.
1850 self._crossfade_data[queue_item.queue_id] = CrossfadeData(
1851 data=crossfade_second,
1852 fade_in_size=original_buffer_size,
1853 pcm_format=pcm_format, # Format of the data (current track)
1854 fade_in_pcm_format=next_queue_item_pcm_format, # Format for fade_in_size
1855 queue_item_id=next_queue_item.queue_item_id,
1856 )
1857 self.logger.debug(
1858 "Crossfade data STORED for queue %s (outgoing track: %s, incoming track: %s)",
1859 queue.display_name,
1860 queue_item.name,
1861 next_queue_item.name if next_queue_item else "N/A",
1862 )
1863 except AudioError:
1864 # no crossfade possible, just yield the fade_out_data
1865 next_queue_item = None
1866 yield fade_out_data
1867 bytes_written += len(fade_out_data)
1868 del fade_out_data
1869 # make sure the buffer gets cleaned up
1870 del buffer
1871 # update duration details based on the actual pcm data we sent
1872 # this also accounts for crossfade and silence stripping
1873 seconds_streamed = bytes_written / pcm_format.pcm_sample_size
1874 streamdetails.seconds_streamed = seconds_streamed
1875 streamdetails.duration = int(streamdetails.seek_position + seconds_streamed)
1876 self.logger.debug(
1877 "Finished Streaming queue track: %s (%s) on queue %s "
1878 "- crossfade data prepared for next track: %s",
1879 streamdetails.uri,
1880 queue_item.name,
1881 queue.display_name,
1882 next_queue_item.name if next_queue_item else "N/A",
1883 )
1884
1885 def _log_request(self, request: web.Request) -> None:
1886 """Log request."""
1887 if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
1888 self.logger.log(
1889 VERBOSE_LOG_LEVEL,
1890 "Got %s request to %s from %s\nheaders: %s\n",
1891 request.method,
1892 request.path,
1893 request.remote,
1894 request.headers,
1895 )
1896 else:
1897 self.logger.debug(
1898 "Got %s request to %s from %s",
1899 request.method,
1900 request.path,
1901 request.remote,
1902 )
1903
1904 async def get_output_format(
1905 self,
1906 output_format_str: str,
1907 player: Player,
1908 content_sample_rate: int,
1909 content_bit_depth: int,
1910 ) -> AudioFormat:
1911 """Parse (player specific) output format details for given format string."""
1912 content_type: ContentType = ContentType.try_parse(output_format_str)
1913 supported_rates_conf = cast(
1914 "list[tuple[str, str]]",
1915 await self.mass.config.get_player_config_value(
1916 player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
1917 ),
1918 )
1919 output_channels_str = self.mass.config.get_raw_player_config_value(
1920 player.player_id, CONF_OUTPUT_CHANNELS, "stereo"
1921 )
1922 supported_sample_rates = tuple(int(x[0]) for x in supported_rates_conf)
1923 supported_bit_depths = tuple(int(x[1]) for x in supported_rates_conf)
1924
1925 player_max_bit_depth = max(supported_bit_depths)
1926 output_bit_depth = min(content_bit_depth, player_max_bit_depth)
1927 if content_sample_rate in supported_sample_rates:
1928 output_sample_rate = content_sample_rate
1929 else:
1930 output_sample_rate = max(supported_sample_rates)
1931
1932 if not content_type.is_lossless():
1933 # no point in having a higher bit depth for lossy formats
1934 output_bit_depth = 16
1935 output_sample_rate = min(48000, output_sample_rate)
1936 if output_format_str == "pcm":
1937 content_type = ContentType.from_bit_depth(output_bit_depth)
1938 return AudioFormat(
1939 content_type=content_type,
1940 sample_rate=output_sample_rate,
1941 bit_depth=output_bit_depth,
1942 channels=1 if output_channels_str != "stereo" else 2,
1943 )
1944
1945 async def _select_flow_format(
1946 self,
1947 player: Player,
1948 ) -> AudioFormat:
1949 """Parse (player specific) flow stream PCM format."""
1950 supported_rates_conf = cast(
1951 "list[tuple[str, str]]",
1952 await self.mass.config.get_player_config_value(
1953 player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
1954 ),
1955 )
1956 supported_sample_rates = tuple(int(x[0]) for x in supported_rates_conf)
1957 output_sample_rate = INTERNAL_PCM_FORMAT.sample_rate
1958 for sample_rate in (192000, 96000, 48000, 44100):
1959 if sample_rate in supported_sample_rates:
1960 output_sample_rate = sample_rate
1961 break
1962 return AudioFormat(
1963 content_type=INTERNAL_PCM_FORMAT.content_type,
1964 sample_rate=output_sample_rate,
1965 bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
1966 channels=2,
1967 )
1968
1969 async def _select_pcm_format(
1970 self,
1971 player: Player,
1972 streamdetails: StreamDetails,
1973 smartfades_enabled: bool,
1974 ) -> AudioFormat:
1975 """Parse (player specific) stream internal PCM format."""
1976 supported_rates_conf = cast(
1977 "list[tuple[str, str]]",
1978 await self.mass.config.get_player_config_value(
1979 player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
1980 ),
1981 )
1982 supported_sample_rates = tuple(int(x[0]) for x in supported_rates_conf)
1983 # use highest supported rate within content rate
1984 output_sample_rate = max(
1985 (r for r in supported_sample_rates if r <= streamdetails.audio_format.sample_rate),
1986 default=48000, # sane/safe default
1987 )
1988 # work out pcm format based on streamdetails
1989 pcm_format = AudioFormat(
1990 sample_rate=output_sample_rate,
1991 # always use f32 internally for extra headroom for filters etc
1992 content_type=INTERNAL_PCM_FORMAT.content_type,
1993 bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
1994 channels=streamdetails.audio_format.channels,
1995 )
1996 if smartfades_enabled:
1997 pcm_format.channels = 2 # force stereo for crossfading
1998
1999 return pcm_format
2000
2001 def _crossfade_allowed(
2002 self,
2003 queue_item: QueueItem,
2004 smart_fades_mode: SmartFadesMode,
2005 flow_mode: bool = False,
2006 next_queue_item: QueueItem | None = None,
2007 sample_rate: int | None = None,
2008 next_sample_rate: int | None = None,
2009 ) -> bool:
2010 """Get the crossfade config for a queue item."""
2011 if smart_fades_mode == SmartFadesMode.DISABLED:
2012 return False
2013 if not (self.mass.players.get_player(queue_item.queue_id)):
2014 return False # just a guard
2015 if queue_item.media_type != MediaType.TRACK:
2016 self.logger.debug("Skipping crossfade: current item is not a track")
2017 return False
2018 # check if the next item is part of the same album
2019 next_item = next_queue_item or self.mass.player_queues.get_next_item(
2020 queue_item.queue_id, queue_item.queue_item_id
2021 )
2022 if not next_item:
2023 # there is no next item!
2024 self.logger.debug(
2025 "Crossfade not allowed: no next item found for %s (flow_mode=%s, queue_id=%s)",
2026 queue_item.name,
2027 flow_mode,
2028 queue_item.queue_id,
2029 )
2030 return False
2031 # check if next item is a track
2032 if next_item.media_type != MediaType.TRACK:
2033 self.logger.debug("Skipping crossfade: next item is not a track")
2034 return False
2035 if (
2036 isinstance(queue_item.media_item, Track)
2037 and isinstance(next_item.media_item, Track)
2038 and queue_item.media_item.album
2039 and next_item.media_item.album
2040 and queue_item.media_item.album == next_item.media_item.album
2041 and not self.mass.config.get_raw_core_config_value(
2042 self.domain, CONF_ALLOW_CROSSFADE_SAME_ALBUM, False
2043 )
2044 ):
2045 # in general, crossfade is not desired for tracks of the same (gapless) album
2046 # because we have no accurate way to determine if the album is gapless or not,
2047 # for now we just never crossfade between tracks of the same album
2048 self.logger.debug("Skipping crossfade: next item is part of the same album")
2049 return False
2050
2051 # check if we're allowed to crossfade on different sample rates
2052 if (
2053 not flow_mode
2054 and sample_rate
2055 and next_sample_rate
2056 and sample_rate != next_sample_rate
2057 and not self.mass.config.get_raw_player_config_value(
2058 queue_item.queue_id,
2059 CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES.key,
2060 CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES.default_value,
2061 )
2062 ):
2063 self.logger.debug(
2064 "Skipping crossfade: player does not support gapless playback "
2065 "with different sample rates (%s vs %s)",
2066 sample_rate,
2067 next_sample_rate,
2068 )
2069 return False
2070
2071 return True
2072
2073 async def _periodic_garbage_collection(self) -> None:
2074 """Periodic garbage collection to free up memory from audio buffers and streams."""
2075 self.logger.log(
2076 VERBOSE_LOG_LEVEL,
2077 "Running periodic garbage collection...",
2078 )
2079 # Run garbage collection in executor to avoid blocking the event loop
2080 # Since this runs periodically (not in response to subprocess cleanup),
2081 # it's safe to run in a thread without causing thread-safety issues
2082 loop = asyncio.get_running_loop()
2083 collected = await loop.run_in_executor(None, gc.collect)
2084 self.logger.log(
2085 VERBOSE_LOG_LEVEL,
2086 "Garbage collection completed, collected %d objects",
2087 collected,
2088 )
2089 # Schedule next run in 15 minutes
2090 self.mass.call_later(900, self._periodic_garbage_collection)
2091
2092 def _setup_smart_fades_logger(self, config: CoreConfig) -> None:
2093 """Set up smart fades logger level."""
2094 log_level = str(config.get_value(CONF_SMART_FADES_LOG_LEVEL))
2095 if log_level == "GLOBAL":
2096 self.smart_fades_analyzer.logger.setLevel(self.logger.level)
2097 self.smart_fades_mixer.logger.setLevel(self.logger.level)
2098 else:
2099 self.smart_fades_analyzer.logger.setLevel(log_level)
2100 self.smart_fades_mixer.logger.setLevel(log_level)
2101
2102 async def cleanup_stale_queue_buffers(self, queue_id: str, current_index: int) -> None:
2103 """
2104 Clean up audio buffers for queue items that are no longer needed.
2105
2106 This clears buffers for items at index <= current_index - 2, keeping only:
2107 - The previous track (current_index - 1)
2108 - The current track (current_index)
2109 - The next track (current_index + 1, handled by preloading)
2110
2111 :param queue_id: The queue ID to clean up buffers for.
2112 :param current_index: The current playing index in the queue.
2113 """
2114 if current_index < 2:
2115 return # Nothing to clean up yet
2116
2117 queue_items = self.mass.player_queues._queue_items.get(queue_id, [])
2118 cleanup_threshold = current_index - 2
2119 buffers_cleared = 0
2120
2121 for idx, item in enumerate(queue_items):
2122 if idx > cleanup_threshold:
2123 break # No need to check further
2124 if item.streamdetails and item.streamdetails.buffer:
2125 self.logger.log(
2126 VERBOSE_LOG_LEVEL,
2127 "Clearing stale audio buffer for queue item %s (index %d) in queue %s",
2128 item.name,
2129 idx,
2130 queue_id,
2131 )
2132 await item.streamdetails.buffer.clear()
2133 item.streamdetails.buffer = None
2134 buffers_cleared += 1
2135
2136 if buffers_cleared > 0:
2137 self.logger.debug(
2138 "Cleared %d stale audio buffer(s) for queue %s (items before index %d)",
2139 buffers_cleared,
2140 queue_id,
2141 cleanup_threshold + 1,
2142 )
2143
2144 async def cleanup_queue_audio_data(self, queue_id: str) -> None:
2145 """
2146 Clean up all audio-related data for a queue when it is stopped or cleared.
2147
2148 This clears:
2149 - All audio buffers attached to queue item streamdetails
2150 - Any pending crossfade data for the queue
2151
2152 :param queue_id: The queue ID to clean up.
2153 """
2154 # Clear crossfade data for this queue
2155 if queue_id in self._crossfade_data:
2156 self.logger.debug("Clearing crossfade data for queue %s", queue_id)
2157 del self._crossfade_data[queue_id]
2158
2159 # Clear all audio buffers for queue items
2160 queue_items = self.mass.player_queues._queue_items.get(queue_id, [])
2161 buffers_cleared = 0
2162
2163 for item in queue_items:
2164 if item.streamdetails and item.streamdetails.buffer:
2165 await item.streamdetails.buffer.clear()
2166 item.streamdetails.buffer = None
2167 buffers_cleared += 1
2168
2169 if buffers_cleared > 0:
2170 self.logger.debug(
2171 "Cleared %d audio buffer(s) for stopped/cleared queue %s",
2172 buffers_cleared,
2173 queue_id,
2174 )
2175