/
/
/
1"""
2Controller to stream audio to players.
3
4The streams controller hosts a basic, unprotected HTTP-only webserver
5purely to stream audio packets to players and some control endpoints such as
6the upnp callbacks and json rpc api for slimproto clients.
7"""
8
9from __future__ import annotations
10
11import asyncio
12import gc
13import logging
14import os
15import urllib.parse
16from collections.abc import AsyncGenerator
17from dataclasses import dataclass
18from typing import TYPE_CHECKING, Final, cast
19
20from aiofiles.os import wrap
21from aiohttp import web
22from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
23from music_assistant_models.enums import (
24 ConfigEntryType,
25 ContentType,
26 MediaType,
27 PlayerFeature,
28 StreamType,
29 VolumeNormalizationMode,
30)
31from music_assistant_models.errors import (
32 AudioError,
33 InvalidDataError,
34 ProviderUnavailableError,
35 QueueEmpty,
36)
37from music_assistant_models.media_items import AudioFormat, Track
38from music_assistant_models.player_queue import PlayLogEntry
39
40from music_assistant.constants import (
41 ANNOUNCE_ALERT_FILE,
42 CONF_BIND_IP,
43 CONF_BIND_PORT,
44 CONF_CROSSFADE_DURATION,
45 CONF_ENTRY_ENABLE_ICY_METADATA,
46 CONF_ENTRY_LOG_LEVEL,
47 CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES,
48 CONF_HTTP_PROFILE,
49 CONF_OUTPUT_CHANNELS,
50 CONF_OUTPUT_CODEC,
51 CONF_PUBLISH_IP,
52 CONF_SAMPLE_RATES,
53 CONF_SMART_FADES_MODE,
54 CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO,
55 CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
56 CONF_VOLUME_NORMALIZATION_RADIO,
57 CONF_VOLUME_NORMALIZATION_TRACKS,
58 DEFAULT_STREAM_HEADERS,
59 ICY_HEADERS,
60 INTERNAL_PCM_FORMAT,
61 SILENCE_FILE,
62 VERBOSE_LOG_LEVEL,
63)
64from music_assistant.controllers.players.helpers import AnnounceData
65from music_assistant.controllers.streams.smart_fades import SmartFadesMixer
66from music_assistant.controllers.streams.smart_fades.analyzer import SmartFadesAnalyzer
67from music_assistant.controllers.streams.smart_fades.fades import SMART_CROSSFADE_DURATION
68from music_assistant.helpers.audio import LOGGER as AUDIO_LOGGER
69from music_assistant.helpers.audio import (
70 get_buffered_media_stream,
71 get_chunksize,
72 get_media_stream,
73 get_player_filter_params,
74 get_stream_details,
75 resample_pcm_audio,
76)
77from music_assistant.helpers.buffered_generator import buffered, use_buffer
78from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
79from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream
80from music_assistant.helpers.util import (
81 divide_chunks,
82 format_ip_for_url,
83 get_ip_addresses,
84 get_total_system_memory,
85)
86from music_assistant.helpers.webserver import Webserver
87from music_assistant.models.core_controller import CoreController
88from music_assistant.models.music_provider import MusicProvider
89from music_assistant.models.plugin import PluginProvider, PluginSource
90from music_assistant.models.smart_fades import SmartFadesMode
91from music_assistant.providers.universal_group.constants import UGP_PREFIX
92from music_assistant.providers.universal_group.player import UniversalGroupPlayer
93
94if TYPE_CHECKING:
95 from music_assistant_models.config_entries import CoreConfig
96 from music_assistant_models.player import PlayerMedia
97 from music_assistant_models.player_queue import PlayerQueue
98 from music_assistant_models.queue_item import QueueItem
99 from music_assistant_models.streamdetails import StreamDetails
100
101 from music_assistant.mass import MusicAssistant
102 from music_assistant.models.player import Player
103
104
105isfile = wrap(os.path.isfile)
106
107CONF_ALLOW_BUFFER: Final[str] = "allow_buffering"
108CONF_ALLOW_CROSSFADE_SAME_ALBUM: Final[str] = "allow_crossfade_same_album"
109CONF_SMART_FADES_LOG_LEVEL: Final[str] = "smart_fades_log_level"
110
111# Calculate total system memory once at module load time
112TOTAL_SYSTEM_MEMORY_GB: Final[float] = get_total_system_memory()
113CONF_ALLOW_BUFFER_DEFAULT = TOTAL_SYSTEM_MEMORY_GB >= 8.0
114DEFAULT_PORT: Final[int] = 8097
115
116
117def parse_pcm_info(content_type: str) -> tuple[int, int, int]:
118 """Parse PCM info from a codec/content_type string."""
119 params = (
120 dict(urllib.parse.parse_qsl(content_type.replace(";", "&"))) if ";" in content_type else {}
121 )
122 sample_rate = int(params.get("rate", 44100))
123 sample_size = int(params.get("bitrate", 16))
124 channels = int(params.get("channels", 2))
125 return (sample_rate, sample_size, channels)
126
127
128@dataclass
129class CrossfadeData:
130 """Data class to hold crossfade data."""
131
132 data: bytes
133 fade_in_size: int
134 pcm_format: AudioFormat # Format of the 'data' bytes (current/previous track's format)
135 fade_in_pcm_format: AudioFormat # Format for 'fade_in_size' (next track's format)
136 queue_item_id: str
137
138
139class StreamsController(CoreController):
140 """Webserver Controller to stream audio to players."""
141
142 domain: str = "streams"
143
144 def __init__(self, mass: MusicAssistant) -> None:
145 """Initialize instance."""
146 super().__init__(mass)
147 self._server = Webserver(self.logger, enable_dynamic_routes=True)
148 self.register_dynamic_route = self._server.register_dynamic_route
149 self.unregister_dynamic_route = self._server.unregister_dynamic_route
150 self.manifest.name = "Streamserver"
151 self.manifest.description = (
152 "Music Assistant's core controller that is responsible for "
153 "streaming audio to players on the local network."
154 )
155 self.manifest.icon = "cast-audio"
156 self.announcements: dict[str, AnnounceData] = {}
157 self._crossfade_data: dict[str, CrossfadeData] = {}
158 self._bind_ip: str = "0.0.0.0"
159 self._smart_fades_mixer = SmartFadesMixer(self)
160 self._smart_fades_analyzer = SmartFadesAnalyzer(self)
161
162 @property
163 def base_url(self) -> str:
164 """Return the base_url for the streamserver."""
165 return self._server.base_url
166
167 @property
168 def bind_ip(self) -> str:
169 """Return the IP address this streamserver is bound to."""
170 return self._bind_ip
171
172 @property
173 def smart_fades_mixer(self) -> SmartFadesMixer:
174 """Return the SmartFadesMixer instance."""
175 return self._smart_fades_mixer
176
177 @property
178 def smart_fades_analyzer(self) -> SmartFadesAnalyzer:
179 """Return the SmartFadesAnalyzer instance."""
180 return self._smart_fades_analyzer
181
182 async def get_config_entries(
183 self,
184 action: str | None = None,
185 values: dict[str, ConfigValueType] | None = None,
186 ) -> tuple[ConfigEntry, ...]:
187 """Return all Config Entries for this core module (if any)."""
188 ip_addresses = await get_ip_addresses(include_ipv6=True)
189 return (
190 ConfigEntry(
191 key=CONF_ALLOW_BUFFER,
192 type=ConfigEntryType.BOOLEAN,
193 default_value=CONF_ALLOW_BUFFER_DEFAULT,
194 label="Allow (in-memory) buffering of (track) audio",
195 description="By default, Music Assistant tries to be as resource "
196 "efficient as possible when streaming audio, especially considering "
197 "low-end devices such as Raspberry Pi's. This means that audio "
198 "buffering is disabled by default to reduce memory usage. \n\n"
199 "Enabling this option allows for in-memory buffering of audio, "
200 "which (massively) improves playback (and seeking) performance but it comes "
201 "at the cost of increased memory usage. "
202 "If you run Music Assistant on a capable device with enough memory, "
203 "enabling this option is strongly recommended.",
204 required=False,
205 category="playback",
206 ),
207 ConfigEntry(
208 key=CONF_VOLUME_NORMALIZATION_RADIO,
209 type=ConfigEntryType.STRING,
210 default_value=VolumeNormalizationMode.FALLBACK_FIXED_GAIN,
211 label="Volume normalization method for radio streams",
212 options=[
213 ConfigValueOption(x.value.replace("_", " ").title(), x.value)
214 for x in VolumeNormalizationMode
215 ],
216 category="playback",
217 ),
218 ConfigEntry(
219 key=CONF_VOLUME_NORMALIZATION_TRACKS,
220 type=ConfigEntryType.STRING,
221 default_value=VolumeNormalizationMode.FALLBACK_DYNAMIC,
222 label="Volume normalization method for tracks",
223 options=[
224 ConfigValueOption(x.value.replace("_", " ").title(), x.value)
225 for x in VolumeNormalizationMode
226 ],
227 category="playback",
228 ),
229 ConfigEntry(
230 key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO,
231 type=ConfigEntryType.FLOAT,
232 range=(-20, 10),
233 default_value=-6,
234 label="Fixed/fallback gain adjustment for radio streams",
235 category="playback",
236 ),
237 ConfigEntry(
238 key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
239 type=ConfigEntryType.FLOAT,
240 range=(-20, 10),
241 default_value=-6,
242 label="Fixed/fallback gain adjustment for tracks",
243 category="playback",
244 ),
245 ConfigEntry(
246 key=CONF_ALLOW_CROSSFADE_SAME_ALBUM,
247 type=ConfigEntryType.BOOLEAN,
248 default_value=False,
249 label="Allow crossfade between tracks from the same album",
250 description="Enabling this option allows for crossfading between tracks "
251 "that are part of the same album.",
252 category="playback",
253 ),
254 ConfigEntry(
255 key=CONF_PUBLISH_IP,
256 type=ConfigEntryType.STRING,
257 default_value=ip_addresses[0],
258 label="Published IP address",
259 description="This IP address is communicated to players where to find this server."
260 "\nMake sure that this IP can be reached by players on the local network, "
261 "otherwise audio streaming will not work.",
262 required=False,
263 category="generic",
264 advanced=True,
265 requires_reload=True,
266 ),
267 ConfigEntry(
268 key=CONF_BIND_PORT,
269 type=ConfigEntryType.INTEGER,
270 default_value=DEFAULT_PORT,
271 label="TCP Port",
272 description="The TCP port to run the server. "
273 "Make sure that this server can be reached "
274 "on the given IP and TCP port by players on the local network.",
275 category="generic",
276 advanced=True,
277 requires_reload=True,
278 ),
279 ConfigEntry(
280 key=CONF_BIND_IP,
281 type=ConfigEntryType.STRING,
282 default_value="0.0.0.0",
283 options=[ConfigValueOption(x, x) for x in {"0.0.0.0", "::", *ip_addresses}],
284 label="Bind to IP/interface",
285 description="Start the stream server on this specific interface. \n"
286 "Use 0.0.0.0 or :: to bind to all interfaces, which is the default. \n"
287 "This is an advanced setting that should normally "
288 "not be adjusted in regular setups.",
289 category="generic",
290 advanced=True,
291 required=False,
292 requires_reload=True,
293 ),
294 ConfigEntry(
295 key=CONF_SMART_FADES_LOG_LEVEL,
296 type=ConfigEntryType.STRING,
297 label="Smart Fades Log level",
298 description="Log level for the Smart Fades mixer and analyzer.",
299 options=CONF_ENTRY_LOG_LEVEL.options,
300 default_value="GLOBAL",
301 category="generic",
302 advanced=True,
303 ),
304 )
305
306 async def setup(self, config: CoreConfig) -> None:
307 """Async initialize of module."""
308 # copy log level to audio/ffmpeg loggers
309 AUDIO_LOGGER.setLevel(self.logger.level)
310 FFMPEG_LOGGER.setLevel(self.logger.level)
311 self._setup_smart_fades_logger(config)
312 # perform check for ffmpeg version
313 await check_ffmpeg_version()
314 # start the webserver
315 self.publish_port = config.get_value(CONF_BIND_PORT, DEFAULT_PORT)
316 self.publish_ip = config.get_value(CONF_PUBLISH_IP)
317 self._bind_ip = bind_ip = str(config.get_value(CONF_BIND_IP))
318 # print a big fat message in the log where the streamserver is running
319 # because this is a common source of issues for people with more complex setups
320 self.logger.log(
321 logging.INFO if self.mass.config.onboard_done else logging.WARNING,
322 "\n\n################################################################################\n"
323 "Starting streamserver on %s:%s\n"
324 "This is the IP address that is communicated to players.\n"
325 "If this is incorrect, audio will not play!\n"
326 "See the documentation how to configure the publish IP for the Streamserver\n"
327 "in Settings --> Core modules --> Streamserver\n"
328 "################################################################################\n",
329 self.publish_ip,
330 self.publish_port,
331 )
332 await self._server.setup(
333 bind_ip=bind_ip,
334 bind_port=cast("int", self.publish_port),
335 base_url=f"http://{format_ip_for_url(str(self.publish_ip))}:{self.publish_port}",
336 static_routes=[
337 (
338 "*",
339 "/flow/{session_id}/{queue_id}/{queue_item_id}/{player_id}.{fmt}",
340 self.serve_queue_flow_stream,
341 ),
342 (
343 "*",
344 "/single/{session_id}/{queue_id}/{queue_item_id}/{player_id}.{fmt}",
345 self.serve_queue_item_stream,
346 ),
347 (
348 "*",
349 "/command/{queue_id}/{command}.mp3",
350 self.serve_command_request,
351 ),
352 (
353 "*",
354 "/announcement/{player_id}.{fmt}",
355 self.serve_announcement_stream,
356 ),
357 (
358 "*",
359 "/pluginsource/{plugin_source}/{player_id}.{fmt}",
360 self.serve_plugin_source_stream,
361 ),
362 ],
363 )
364 # Start periodic garbage collection task
365 # This ensures memory from audio buffers and streams is cleaned up regularly
366 self.mass.call_later(900, self._periodic_garbage_collection) # 15 minutes
367
368 async def close(self) -> None:
369 """Cleanup on exit."""
370 await self._server.close()
371
372 async def resolve_stream_url(
373 self,
374 player_id: str,
375 media: PlayerMedia,
376 ) -> str:
377 """Resolve the stream URL for the given PlayerMedia."""
378 conf_output_codec = await self.mass.config.get_player_config_value(
379 player_id, CONF_OUTPUT_CODEC, default="flac", return_type=str
380 )
381 output_codec = ContentType.try_parse(conf_output_codec or "flac")
382 fmt = output_codec.value
383 # handle raw pcm without exact format specifiers
384 if output_codec.is_pcm() and ";" not in fmt:
385 fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}"
386 extra_data = media.custom_data or {}
387 flow_mode = extra_data.get("flow_mode", False)
388 session_id = extra_data.get("session_id")
389 queue_item_id = media.queue_item_id
390 if not session_id or not queue_item_id:
391 raise InvalidDataError("Can not resolve stream URL: Invalid PlayerMedia data")
392 queue_id = media.source_id
393 base_path = "flow" if flow_mode else "single"
394 return f"{self._server.base_url}/{base_path}/{session_id}/{queue_id}/{queue_item_id}/{player_id}.{fmt}" # noqa: E501
395
396 async def get_plugin_source_url(
397 self,
398 plugin_source: PluginSource,
399 player_id: str,
400 ) -> str:
401 """Get the url for the Plugin Source stream/proxy."""
402 if plugin_source.audio_format.content_type.is_pcm():
403 fmt = ContentType.WAV.value
404 else:
405 fmt = plugin_source.audio_format.content_type.value
406 return f"{self._server.base_url}/pluginsource/{plugin_source.id}/{player_id}.{fmt}"
407
408 async def serve_queue_item_stream(self, request: web.Request) -> web.StreamResponse:
409 """Stream single queueitem audio to a player."""
410 self._log_request(request)
411 queue_id = request.match_info["queue_id"]
412 player_id = request.match_info["player_id"]
413 if not (queue := self.mass.player_queues.get(queue_id)):
414 raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
415 session_id = request.match_info["session_id"]
416 if queue.session_id and session_id != queue.session_id:
417 raise web.HTTPNotFound(reason=f"Unknown (or invalid) session: {session_id}")
418 if not (player := self.mass.players.get_player(player_id)):
419 raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
420 queue_item_id = request.match_info["queue_item_id"]
421 queue_item = self.mass.player_queues.get_item(queue_id, queue_item_id)
422 if not queue_item:
423 raise web.HTTPNotFound(reason=f"Unknown Queue item: {queue_item_id}")
424 if not queue_item.streamdetails:
425 try:
426 queue_item.streamdetails = await get_stream_details(
427 mass=self.mass, queue_item=queue_item
428 )
429 except Exception as e:
430 self.logger.error(
431 "Failed to get streamdetails for QueueItem %s: %s", queue_item_id, e
432 )
433 queue_item.available = False
434 raise web.HTTPNotFound(reason=f"No streamdetails for Queue item: {queue_item_id}")
435
436 # pick output format based on the streamdetails and player capabilities
437 pcm_format = await self._select_pcm_format(
438 player=player,
439 streamdetails=queue_item.streamdetails,
440 smartfades_enabled=True,
441 )
442 output_format = await self.get_output_format(
443 output_format_str=request.match_info["fmt"],
444 player=player,
445 content_sample_rate=pcm_format.sample_rate,
446 content_bit_depth=pcm_format.bit_depth,
447 )
448
449 # prepare request, add some DLNA/UPNP compatible headers
450 # icy-name is sanitized to avoid a "Potential header injection attack" exception by aiohttp
451 # see https://github.com/music-assistant/support/issues/4913
452 headers = {
453 **DEFAULT_STREAM_HEADERS,
454 "icy-name": queue_item.name.replace("\n", " ").replace("\r", " ").replace("\t", " "),
455 "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01500000000000000000000000000000", # noqa: E501
456 "Accept-Ranges": "none",
457 "Content-Type": f"audio/{output_format.output_format_str}",
458 }
459 resp = web.StreamResponse(
460 status=200,
461 reason="OK",
462 headers=headers,
463 )
464 resp.content_type = f"audio/{output_format.output_format_str}"
465 http_profile = await self.mass.config.get_player_config_value(
466 player_id, CONF_HTTP_PROFILE, default="default", return_type=str
467 )
468 if http_profile == "forced_content_length" and not queue_item.duration:
469 # just set an insane high content length to make sure the player keeps playing
470 resp.content_length = get_chunksize(output_format, 12 * 3600)
471 elif http_profile == "forced_content_length" and queue_item.duration:
472 # guess content length based on duration
473 resp.content_length = get_chunksize(output_format, queue_item.duration)
474 elif http_profile == "chunked":
475 resp.enable_chunked_encoding()
476
477 await resp.prepare(request)
478
479 # return early if this is not a GET request
480 if request.method != "GET":
481 return resp
482
483 if queue_item.media_type != MediaType.TRACK:
484 # no crossfade on non-tracks
485 smart_fades_mode = SmartFadesMode.DISABLED
486 else:
487 smart_fades_mode = await self.mass.config.get_player_config_value(
488 queue.queue_id, CONF_SMART_FADES_MODE, return_type=SmartFadesMode
489 )
490 standard_crossfade_duration = self.mass.config.get_raw_player_config_value(
491 queue.queue_id, CONF_CROSSFADE_DURATION, 10
492 )
493 if (
494 smart_fades_mode != SmartFadesMode.DISABLED
495 and PlayerFeature.GAPLESS_PLAYBACK not in player.state.supported_features
496 ):
497 # crossfade is not supported on this player due to missing gapless playback
498 self.logger.warning(
499 "Crossfade disabled: Player %s does not support gapless playback, "
500 "consider enabling flow mode to enable crossfade on this player.",
501 player.state.name if player else "Unknown Player",
502 )
503 smart_fades_mode = SmartFadesMode.DISABLED
504
505 if smart_fades_mode != SmartFadesMode.DISABLED:
506 # crossfade is enabled, use special crossfaded single item stream
507 # where the crossfade of the next track is present in the stream of
508 # a single track. This only works if the player supports gapless playback!
509 audio_input = self.get_queue_item_stream_with_smartfade(
510 queue_item=queue_item,
511 pcm_format=pcm_format,
512 smart_fades_mode=smart_fades_mode,
513 standard_crossfade_duration=standard_crossfade_duration,
514 )
515 else:
516 # no crossfade, just a regular single item stream
517 audio_input = self.get_queue_item_stream(
518 queue_item=queue_item,
519 pcm_format=pcm_format,
520 seek_position=queue_item.streamdetails.seek_position,
521 )
522 # stream the audio
523 # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into
524 # the desired output format for the player including any player specific filter params
525 # such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats
526 first_chunk_received = False
527 bytes_sent = 0
528 async for chunk in get_ffmpeg_stream(
529 audio_input=audio_input,
530 input_format=pcm_format,
531 output_format=output_format,
532 filter_params=get_player_filter_params(
533 self.mass,
534 player_id=player.player_id,
535 input_format=pcm_format,
536 output_format=output_format,
537 ),
538 ):
539 try:
540 await resp.write(chunk)
541 bytes_sent += len(chunk)
542 if not first_chunk_received:
543 first_chunk_received = True
544 # inform the queue that the track is now loaded in the buffer
545 # so for example the next track can be enqueued
546 self.mass.player_queues.track_loaded_in_buffer(
547 queue_item.queue_id, queue_item.queue_item_id
548 )
549 except (BrokenPipeError, ConnectionResetError, ConnectionError) as err:
550 if first_chunk_received and not player.stop_called:
551 # Player disconnected (unexpected) after receiving at least some data
552 # This could indicate buffering issues, network problems,
553 # or player-specific issues
554 bytes_expected = get_chunksize(output_format, queue_item.duration or 3600)
555 self.logger.warning(
556 "Player %s disconnected prematurely from stream for %s (%s) - "
557 "error: %s, sent %d bytes, expected (approx) bytes=%d",
558 queue.display_name,
559 queue_item.name,
560 queue_item.uri,
561 err.__class__.__name__,
562 bytes_sent,
563 bytes_expected,
564 )
565 break
566 if queue_item.streamdetails.stream_error:
567 self.logger.error(
568 "Error streaming QueueItem %s (%s) to %s - will try to skip to next item",
569 queue_item.name,
570 queue_item.uri,
571 queue.display_name,
572 )
573 # try to skip to the next item in the queue after a short delay
574 self.mass.call_later(5, self.mass.player_queues.next(queue_id))
575 return resp
576
577 async def serve_queue_flow_stream(self, request: web.Request) -> web.StreamResponse:
578 """Stream Queue Flow audio to player."""
579 self._log_request(request)
580 queue_id = request.match_info["queue_id"]
581 player_id = request.match_info["player_id"]
582 if not (queue := self.mass.player_queues.get(queue_id)):
583 raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
584 if not (player := self.mass.players.get_player(player_id)):
585 raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
586 start_queue_item_id = request.match_info["queue_item_id"]
587 start_queue_item = self.mass.player_queues.get_item(queue_id, start_queue_item_id)
588 if not start_queue_item:
589 raise web.HTTPNotFound(reason=f"Unknown Queue item: {start_queue_item_id}")
590
591 queue.flow_mode_stream_log = []
592
593 # select the highest possible PCM settings for this player
594 flow_pcm_format = await self._select_flow_format(player)
595
596 # work out output format/details
597 output_format = await self.get_output_format(
598 output_format_str=request.match_info["fmt"],
599 player=player,
600 content_sample_rate=flow_pcm_format.sample_rate,
601 content_bit_depth=flow_pcm_format.bit_depth,
602 )
603 # work out ICY metadata support
604 icy_preference = self.mass.config.get_raw_player_config_value(
605 player_id,
606 CONF_ENTRY_ENABLE_ICY_METADATA.key,
607 CONF_ENTRY_ENABLE_ICY_METADATA.default_value,
608 )
609 enable_icy = request.headers.get("Icy-MetaData", "") == "1" and icy_preference != "disabled"
610 icy_meta_interval = 256000 if icy_preference == "full" else 16384
611
612 # prepare request, add some DLNA/UPNP compatible headers
613 headers = {
614 **DEFAULT_STREAM_HEADERS,
615 **ICY_HEADERS,
616 "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01700000000000000000000000000000", # noqa: E501
617 "Accept-Ranges": "none",
618 "Content-Type": f"audio/{output_format.output_format_str}",
619 }
620 if enable_icy:
621 headers["icy-metaint"] = str(icy_meta_interval)
622
623 resp = web.StreamResponse(
624 status=200,
625 reason="OK",
626 headers=headers,
627 )
628 http_profile = await self.mass.config.get_player_config_value(
629 player_id, CONF_HTTP_PROFILE, default="default", return_type=str
630 )
631 if http_profile == "forced_content_length":
632 # just set an insane high content length to make sure the player keeps playing
633 resp.content_length = get_chunksize(output_format, 12 * 3600)
634 elif http_profile == "chunked":
635 resp.enable_chunked_encoding()
636
637 await resp.prepare(request)
638
639 # return early if this is not a GET request
640 if request.method != "GET":
641 return resp
642
643 # all checks passed, start streaming!
644 # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into
645 # the desired output format for the player including any player specific filter params
646 # such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats
647 self.logger.debug("Start serving Queue flow audio stream for %s", queue.display_name)
648
649 async for chunk in get_ffmpeg_stream(
650 audio_input=self.get_queue_flow_stream(
651 queue=queue,
652 start_queue_item=start_queue_item,
653 pcm_format=flow_pcm_format,
654 ),
655 input_format=flow_pcm_format,
656 output_format=output_format,
657 filter_params=get_player_filter_params(
658 self.mass, player.player_id, flow_pcm_format, output_format
659 ),
660 # we need to slowly feed the music to avoid the player stopping and later
661 # restarting (or completely failing) the audio stream by keeping the buffer short.
662 # this is reported to be an issue especially with Chromecast players.
663 # see for example: https://github.com/music-assistant/support/issues/3717
664 # allow buffer ahead of 6 seconds and read rest in realtime
665 extra_input_args=["-readrate", "1.0", "-readrate_initial_burst", "6"],
666 chunk_size=icy_meta_interval if enable_icy else get_chunksize(output_format),
667 ):
668 try:
669 await resp.write(chunk)
670 except (BrokenPipeError, ConnectionResetError, ConnectionError):
671 # race condition
672 break
673
674 if not enable_icy:
675 continue
676
677 # if icy metadata is enabled, send the icy metadata after the chunk
678 if (
679 # use current item here and not buffered item, otherwise
680 # the icy metadata will be too much ahead
681 (current_item := queue.current_item)
682 and current_item.streamdetails
683 and current_item.streamdetails.stream_title
684 ):
685 title = current_item.streamdetails.stream_title
686 elif queue and current_item and current_item.name:
687 title = current_item.name
688 else:
689 title = "Music Assistant"
690 metadata = f"StreamTitle='{title}';".encode()
691 if icy_preference == "full" and current_item and current_item.image:
692 metadata += f"StreamURL='{current_item.image.path}'".encode()
693 while len(metadata) % 16 != 0:
694 metadata += b"\x00"
695 length = len(metadata)
696 length_b = chr(int(length / 16)).encode()
697 await resp.write(length_b + metadata)
698
699 return resp
700
701 async def serve_command_request(self, request: web.Request) -> web.FileResponse:
702 """Handle special 'command' request for a player."""
703 self._log_request(request)
704 queue_id = request.match_info["queue_id"]
705 command = request.match_info["command"]
706 if command == "next":
707 self.mass.create_task(self.mass.player_queues.next(queue_id))
708 return web.FileResponse(SILENCE_FILE, headers={"icy-name": "Music Assistant"})
709
710 async def serve_announcement_stream(self, request: web.Request) -> web.StreamResponse:
711 """Stream announcement audio to a player."""
712 self._log_request(request)
713 player_id = request.match_info["player_id"]
714 player = self.mass.player_queues.get(player_id)
715 if not player:
716 raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
717 if not (announce_data := self.announcements.get(player_id)):
718 raise web.HTTPNotFound(reason=f"No pending announcements for Player: {player_id}")
719
720 # work out output format/details
721 fmt = request.match_info["fmt"]
722 audio_format = AudioFormat(content_type=ContentType.try_parse(fmt))
723
724 http_profile = await self.mass.config.get_player_config_value(
725 player_id, CONF_HTTP_PROFILE, default="default", return_type=str
726 )
727 if http_profile == "forced_content_length":
728 # given the fact that an announcement is just a short audio clip,
729 # just send it over completely at once so we have a fixed content length
730 data = b""
731 async for chunk in self.get_announcement_stream(
732 announcement_url=announce_data["announcement_url"],
733 output_format=audio_format,
734 pre_announce=announce_data["pre_announce"],
735 pre_announce_url=announce_data["pre_announce_url"],
736 ):
737 data += chunk
738 return web.Response(
739 body=data,
740 content_type=f"audio/{audio_format.output_format_str}",
741 headers=DEFAULT_STREAM_HEADERS,
742 )
743
744 resp = web.StreamResponse(
745 status=200,
746 reason="OK",
747 headers=DEFAULT_STREAM_HEADERS,
748 )
749 resp.content_type = f"audio/{audio_format.output_format_str}"
750 if http_profile == "chunked":
751 resp.enable_chunked_encoding()
752
753 await resp.prepare(request)
754
755 # return early if this is not a GET request
756 if request.method != "GET":
757 return resp
758
759 # all checks passed, start streaming!
760 self.logger.debug(
761 "Start serving audio stream for Announcement %s to %s",
762 announce_data["announcement_url"],
763 player.state.name,
764 )
765 async for chunk in self.get_announcement_stream(
766 announcement_url=announce_data["announcement_url"],
767 output_format=audio_format,
768 pre_announce=announce_data["pre_announce"],
769 pre_announce_url=announce_data["pre_announce_url"],
770 ):
771 try:
772 await resp.write(chunk)
773 except (BrokenPipeError, ConnectionResetError):
774 break
775
776 self.logger.debug(
777 "Finished serving audio stream for Announcement %s to %s",
778 announce_data["announcement_url"],
779 player.state.name,
780 )
781
782 return resp
783
784 async def serve_plugin_source_stream(self, request: web.Request) -> web.StreamResponse:
785 """Stream PluginSource audio to a player."""
786 self._log_request(request)
787 plugin_source_id = request.match_info["plugin_source"]
788 provider = cast("PluginProvider", self.mass.get_provider(plugin_source_id))
789 if not provider:
790 raise ProviderUnavailableError(f"Unknown PluginSource: {plugin_source_id}")
791 # work out output format/details
792 player_id = request.match_info["player_id"]
793 player = self.mass.players.get_player(player_id)
794 if not player:
795 raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
796 plugin_source = provider.get_source()
797 output_format = await self.get_output_format(
798 output_format_str=request.match_info["fmt"],
799 player=player,
800 content_sample_rate=plugin_source.audio_format.sample_rate,
801 content_bit_depth=plugin_source.audio_format.bit_depth,
802 )
803 headers = {
804 **DEFAULT_STREAM_HEADERS,
805 "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01700000000000000000000000000000", # noqa: E501
806 "icy-name": plugin_source.name,
807 "Accept-Ranges": "none",
808 "Content-Type": f"audio/{output_format.output_format_str}",
809 }
810
811 resp = web.StreamResponse(
812 status=200,
813 reason="OK",
814 headers=headers,
815 )
816 resp.content_type = f"audio/{output_format.output_format_str}"
817 http_profile = await self.mass.config.get_player_config_value(
818 player_id, CONF_HTTP_PROFILE, default="default", return_type=str
819 )
820 if http_profile == "forced_content_length":
821 # just set an insanely high content length to make sure the player keeps playing
822 resp.content_length = get_chunksize(output_format, 12 * 3600)
823 elif http_profile == "chunked":
824 resp.enable_chunked_encoding()
825
826 await resp.prepare(request)
827
828 # return early if this is not a GET request
829 if request.method != "GET":
830 return resp
831
832 # all checks passed, start streaming!
833 if not plugin_source.audio_format:
834 raise InvalidDataError(f"No audio format for plugin source {plugin_source_id}")
835 async for chunk in self.get_plugin_source_stream(
836 plugin_source_id=plugin_source_id,
837 output_format=output_format,
838 player_id=player_id,
839 player_filter_params=get_player_filter_params(
840 self.mass, player_id, plugin_source.audio_format, output_format
841 ),
842 ):
843 try:
844 await resp.write(chunk)
845 except (BrokenPipeError, ConnectionResetError, ConnectionError):
846 break
847 return resp
848
849 def get_command_url(self, player_or_queue_id: str, command: str) -> str:
850 """Get the url for the special command stream."""
851 return f"{self.base_url}/command/{player_or_queue_id}/{command}.mp3"
852
853 def get_announcement_url(
854 self,
855 player_id: str,
856 announce_data: AnnounceData,
857 content_type: ContentType = ContentType.MP3,
858 ) -> str:
859 """Get the url for the special announcement stream."""
860 self.announcements[player_id] = announce_data
861 # use stream server to host announcement on local network
862 # this ensures playback on all players, including ones that do not
863 # like https hosts and it also offers the pre-announce 'bell'
864 return f"{self.base_url}/announcement/{player_id}.{content_type.value}"
865
866 def get_stream(
867 self, media: PlayerMedia, pcm_format: AudioFormat, force_flow_mode: bool = False
868 ) -> AsyncGenerator[bytes, None]:
869 """
870 Get a stream of the given media as raw PCM audio.
871
872 This is used as helper for player providers that can consume the raw PCM
873 audio stream directly (e.g. AirPlay) and not rely on HTTP transport.
874 """
875 # select audio source
876 if media.media_type == MediaType.ANNOUNCEMENT:
877 # special case: stream announcement
878 assert media.custom_data
879 audio_source = self.get_announcement_stream(
880 media.custom_data["announcement_url"],
881 output_format=pcm_format,
882 pre_announce=media.custom_data["pre_announce"],
883 pre_announce_url=media.custom_data["pre_announce_url"],
884 )
885 elif media.media_type == MediaType.PLUGIN_SOURCE:
886 # special case: plugin source stream
887 assert media.custom_data
888 audio_source = self.get_plugin_source_stream(
889 plugin_source_id=media.custom_data["source_id"],
890 output_format=pcm_format,
891 # need to pass player_id from the PlayerMedia object
892 # because this could have been a group
893 player_id=media.custom_data["player_id"],
894 )
895 elif (
896 media.media_type == MediaType.FLOW_STREAM
897 and media.source_id
898 and media.source_id.startswith(UGP_PREFIX)
899 and media.uri
900 and "/ugp/" in media.uri
901 ):
902 # special case: member player accessing UGP stream
903 # Check URI to distinguish from the UGP accessing its own stream
904 ugp_player = cast("UniversalGroupPlayer", self.mass.players.get_player(media.source_id))
905 ugp_stream = ugp_player.stream
906 assert ugp_stream is not None # for type checker
907 if ugp_stream.base_pcm_format == pcm_format:
908 # no conversion needed
909 audio_source = ugp_stream.subscribe_raw()
910 else:
911 audio_source = ugp_stream.get_stream(output_format=pcm_format)
912 elif (
913 media.source_id
914 and media.queue_item_id
915 and (media.media_type == MediaType.FLOW_STREAM or force_flow_mode)
916 ):
917 # regular queue (flow) stream request
918 queue = self.mass.player_queues.get(media.source_id)
919 assert queue
920 start_queue_item = self.mass.player_queues.get_item(
921 media.source_id, media.queue_item_id
922 )
923 assert start_queue_item
924 audio_source = self.mass.streams.get_queue_flow_stream(
925 queue=queue,
926 start_queue_item=start_queue_item,
927 pcm_format=pcm_format,
928 )
929 elif media.source_id and media.queue_item_id:
930 # single item stream (e.g. radio)
931 queue_item = self.mass.player_queues.get_item(media.source_id, media.queue_item_id)
932 assert queue_item
933 audio_source = buffered(
934 self.get_queue_item_stream(
935 queue_item=queue_item,
936 pcm_format=pcm_format,
937 ),
938 buffer_size=10,
939 min_buffer_before_yield=2,
940 )
941 else:
942 # assume url or some other direct path
943 # NOTE: this will fail if its an uri not playable by ffmpeg
944 audio_source = get_ffmpeg_stream(
945 audio_input=media.uri,
946 input_format=AudioFormat(content_type=ContentType.try_parse(media.uri)),
947 output_format=pcm_format,
948 )
949 return audio_source
950
951 @use_buffer(buffer_size=30, min_buffer_before_yield=2)
952 async def get_queue_flow_stream(
953 self,
954 queue: PlayerQueue,
955 start_queue_item: QueueItem,
956 pcm_format: AudioFormat,
957 ) -> AsyncGenerator[bytes, None]:
958 """
959 Get a flow stream of all tracks in the queue as raw PCM audio.
960
961 yields chunks of exactly 1 second of audio in the given pcm_format.
962 """
963 # ruff: noqa: PLR0915
964 assert pcm_format.content_type.is_pcm()
965 queue_track = None
966 last_fadeout_part: bytes = b""
967 last_streamdetails: StreamDetails | None = None
968 last_play_log_entry: PlayLogEntry | None = None
969 queue.flow_mode = True
970 if not start_queue_item:
971 # this can happen in some (edge case) race conditions
972 return
973 pcm_sample_size = pcm_format.pcm_sample_size
974 if start_queue_item.media_type != MediaType.TRACK:
975 # no crossfade on non-tracks
976 smart_fades_mode = SmartFadesMode.DISABLED
977 standard_crossfade_duration = 0
978 else:
979 smart_fades_mode = await self.mass.config.get_player_config_value(
980 queue.queue_id, CONF_SMART_FADES_MODE, return_type=SmartFadesMode
981 )
982 standard_crossfade_duration = self.mass.config.get_raw_player_config_value(
983 queue.queue_id, CONF_CROSSFADE_DURATION, 10
984 )
985 self.logger.info(
986 "Start Queue Flow stream for Queue %s - crossfade: %s %s",
987 queue.display_name,
988 smart_fades_mode,
989 f"({standard_crossfade_duration}s)"
990 if smart_fades_mode == SmartFadesMode.STANDARD_CROSSFADE
991 else "",
992 )
993 total_bytes_sent = 0
994 total_chunks_received = 0
995
996 while True:
997 # get (next) queue item to stream
998 if queue_track is None:
999 queue_track = start_queue_item
1000 else:
1001 try:
1002 queue_track = await self.mass.player_queues.load_next_queue_item(
1003 queue.queue_id, queue_track.queue_item_id
1004 )
1005 except QueueEmpty:
1006 break
1007
1008 if queue_track.streamdetails is None:
1009 self.logger.error(
1010 "No StreamDetails for queue item %s (%s) on queue %s - skipping track",
1011 queue_track.queue_item_id,
1012 queue_track.name,
1013 queue.display_name,
1014 )
1015 continue
1016
1017 self.logger.debug(
1018 "Start Streaming queue track: %s (%s) for queue %s",
1019 queue_track.streamdetails.uri,
1020 queue_track.name,
1021 queue.display_name,
1022 )
1023 # append to play log so the queue controller can work out which track is playing
1024 play_log_entry = PlayLogEntry(queue_track.queue_item_id)
1025 queue.flow_mode_stream_log.append(play_log_entry)
1026 # calculate crossfade buffer size
1027 crossfade_buffer_duration = (
1028 SMART_CROSSFADE_DURATION
1029 if smart_fades_mode == SmartFadesMode.SMART_CROSSFADE
1030 else standard_crossfade_duration
1031 )
1032 crossfade_buffer_duration = min(
1033 crossfade_buffer_duration,
1034 int(queue_track.streamdetails.duration / 2)
1035 if queue_track.streamdetails.duration
1036 else crossfade_buffer_duration,
1037 )
1038 # Ensure crossfade buffer size is aligned to frame boundaries
1039 # Frame size = bytes_per_sample * channels
1040 bytes_per_sample = pcm_format.bit_depth // 8
1041 frame_size = bytes_per_sample * pcm_format.channels
1042 crossfade_buffer_size = int(pcm_format.pcm_sample_size * crossfade_buffer_duration)
1043 # Round down to nearest frame boundary
1044 crossfade_buffer_size = (crossfade_buffer_size // frame_size) * frame_size
1045
1046 bytes_written = 0
1047 buffer = b""
1048 # handle incoming audio chunks
1049 first_chunk_received = False
1050 # buffer size needs to be big enough to include the crossfade part
1051
1052 async for chunk in self.get_queue_item_stream(
1053 queue_track,
1054 pcm_format=pcm_format,
1055 seek_position=queue_track.streamdetails.seek_position,
1056 raise_on_error=False,
1057 ):
1058 total_chunks_received += 1
1059 if not first_chunk_received:
1060 first_chunk_received = True
1061 # inform the queue that the track is now loaded in the buffer
1062 # so the next track can be preloaded
1063 self.mass.player_queues.track_loaded_in_buffer(
1064 queue.queue_id, queue_track.queue_item_id
1065 )
1066 if total_chunks_received < 10 and smart_fades_mode != SmartFadesMode.DISABLED:
1067 # we want a stream to start as quickly as possible
1068 # so for the first 10 chunks we keep a very short buffer
1069 req_buffer_size = pcm_format.pcm_sample_size
1070 else:
1071 req_buffer_size = (
1072 pcm_sample_size
1073 if smart_fades_mode == SmartFadesMode.DISABLED
1074 else crossfade_buffer_size
1075 )
1076
1077 # ALWAYS APPEND CHUNK TO BUFFER
1078 buffer += chunk
1079 del chunk
1080 if len(buffer) < req_buffer_size:
1081 # buffer is not full enough, move on
1082 # yield control to event loop with 10ms delay
1083 await asyncio.sleep(0.01)
1084 continue
1085
1086 #### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
1087 if last_fadeout_part and last_streamdetails:
1088 # perform crossfade
1089 fadein_part = buffer[:crossfade_buffer_size]
1090 remaining_bytes = buffer[crossfade_buffer_size:]
1091 # Use the mixer to handle all crossfade logic
1092 try:
1093 crossfade_part = await self._smart_fades_mixer.mix(
1094 fade_in_part=fadein_part,
1095 fade_out_part=last_fadeout_part,
1096 fade_in_streamdetails=queue_track.streamdetails,
1097 fade_out_streamdetails=last_streamdetails,
1098 pcm_format=pcm_format,
1099 standard_crossfade_duration=standard_crossfade_duration,
1100 mode=smart_fades_mode,
1101 )
1102 except Exception as mix_err:
1103 self.logger.warning(
1104 "Crossfade mixer failed for %s, falling back to simple concat: %s",
1105 queue_track.name,
1106 mix_err,
1107 )
1108 # Fallback: just output the fadeout part then the buffer
1109 for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
1110 yield _chunk
1111 bytes_written += len(_chunk)
1112 del _chunk
1113 crossfade_part = b""
1114 remaining_bytes = buffer
1115 if crossfade_part:
1116 # because the crossfade exists of both the fadein and fadeout part
1117 # we need to correct the bytes_written accordingly so the duration
1118 # calculations at the end of the track are correct
1119 crossfade_part_len = len(crossfade_part)
1120 bytes_written += int(crossfade_part_len / 2)
1121 if last_play_log_entry:
1122 assert last_play_log_entry.seconds_streamed is not None
1123 last_play_log_entry.seconds_streamed += (
1124 crossfade_part_len / 2 / pcm_sample_size
1125 )
1126 # yield crossfade_part (in pcm_sample_size chunks)
1127 for _chunk in divide_chunks(crossfade_part, pcm_sample_size):
1128 yield _chunk
1129 del _chunk
1130 del crossfade_part
1131 # also write the leftover bytes from the crossfade action
1132 if remaining_bytes:
1133 yield remaining_bytes
1134 bytes_written += len(remaining_bytes)
1135 del remaining_bytes
1136 # clear vars
1137 last_fadeout_part = b""
1138 last_streamdetails = None
1139 buffer = b""
1140
1141 #### OTHER: enough data in buffer, feed to output
1142 while len(buffer) > req_buffer_size:
1143 yield buffer[:pcm_sample_size]
1144 bytes_written += pcm_sample_size
1145 buffer = buffer[pcm_sample_size:]
1146
1147 #### HANDLE END OF TRACK
1148 if not first_chunk_received:
1149 # Track failed to stream - no chunks received at all
1150 self.logger.warning(
1151 "Track %s (%s) on queue %s produced no audio data - skipping",
1152 queue_track.name,
1153 queue_track.streamdetails.uri if queue_track.streamdetails else "unknown",
1154 queue.display_name,
1155 )
1156 # Clean up and continue to next track
1157 queue_track.streamdetails.stream_error = True
1158 del buffer
1159 continue
1160 if last_fadeout_part:
1161 # edge case: we did not get enough data to make the crossfade
1162 for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
1163 yield _chunk
1164 del _chunk
1165 bytes_written += len(last_fadeout_part)
1166 last_fadeout_part = b""
1167 crossfade_allowed = self._crossfade_allowed(
1168 queue_track, smart_fades_mode=smart_fades_mode, flow_mode=True
1169 )
1170 if crossfade_allowed:
1171 # if crossfade is enabled, save fadeout part to pickup for next track
1172 last_fadeout_part = buffer[-crossfade_buffer_size:]
1173 last_streamdetails = queue_track.streamdetails
1174 last_play_log_entry = play_log_entry
1175 remaining_bytes = buffer[:-crossfade_buffer_size]
1176 if remaining_bytes:
1177 yield remaining_bytes
1178 bytes_written += len(remaining_bytes)
1179 del remaining_bytes
1180 elif smart_fades_mode != SmartFadesMode.DISABLED:
1181 self.logger.debug(
1182 "Flow mode: crossfade NOT allowed for track %s on queue %s"
1183 " - fadeout part not saved",
1184 queue_track.name,
1185 queue.display_name,
1186 )
1187 if not crossfade_allowed and buffer:
1188 # no crossfade enabled, just yield the buffer last part
1189 bytes_written += len(buffer)
1190 for _chunk in divide_chunks(buffer, pcm_sample_size):
1191 yield _chunk
1192 del _chunk
1193 # make sure the buffer gets cleaned up
1194 del buffer
1195
1196 # update duration details based on the actual pcm data we sent
1197 # this also accounts for crossfade and silence stripping
1198 seconds_streamed = bytes_written / pcm_sample_size
1199 queue_track.streamdetails.seconds_streamed = seconds_streamed
1200 queue_track.streamdetails.duration = int(
1201 queue_track.streamdetails.seek_position + seconds_streamed
1202 )
1203 play_log_entry.seconds_streamed = seconds_streamed
1204 play_log_entry.duration = queue_track.streamdetails.duration
1205 total_bytes_sent += bytes_written
1206 self.logger.debug(
1207 "Finished Streaming queue track: %s (%s) on queue %s",
1208 queue_track.streamdetails.uri,
1209 queue_track.name,
1210 queue.display_name,
1211 )
1212 #### HANDLE END OF QUEUE FLOW STREAM
1213 # end of queue flow: make sure we yield the last_fadeout_part
1214 if last_fadeout_part:
1215 for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
1216 yield _chunk
1217 del _chunk
1218 # correct seconds streamed/duration
1219 last_part_seconds = len(last_fadeout_part) / pcm_sample_size
1220 streamdetails = queue_track.streamdetails
1221 assert streamdetails is not None
1222 streamdetails.seconds_streamed = (
1223 streamdetails.seconds_streamed or 0
1224 ) + last_part_seconds
1225 streamdetails.duration = int((streamdetails.duration or 0) + last_part_seconds)
1226 last_fadeout_part = b""
1227 total_bytes_sent += bytes_written
1228 self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name)
1229
1230 async def get_announcement_stream(
1231 self,
1232 announcement_url: str,
1233 output_format: AudioFormat,
1234 pre_announce: bool | str = False,
1235 pre_announce_url: str = ANNOUNCE_ALERT_FILE,
1236 ) -> AsyncGenerator[bytes, None]:
1237 """Get the special announcement stream."""
1238 announcement_data: asyncio.Queue[bytes | None] = asyncio.Queue(10)
1239 # we are doing announcement in PCM first to avoid multiple encodings
1240 # when mixing pre-announce and announcement
1241 # also we have to deal with some TTS sources being super slow in delivering audio
1242 # so we take an approach where we start fetching the announcement in the background
1243 # while we can already start playing the pre-announce sound (if any)
1244
1245 pcm_format = (
1246 output_format
1247 if output_format.content_type.is_pcm()
1248 else AudioFormat(
1249 sample_rate=output_format.sample_rate,
1250 content_type=ContentType.PCM_S16LE,
1251 bit_depth=16,
1252 channels=output_format.channels,
1253 )
1254 )
1255
1256 async def fetch_announcement() -> None:
1257 fmt = announcement_url.rsplit(".")[-1]
1258 async for chunk in get_ffmpeg_stream(
1259 audio_input=announcement_url,
1260 input_format=AudioFormat(content_type=ContentType.try_parse(fmt)),
1261 output_format=pcm_format,
1262 chunk_size=get_chunksize(pcm_format, 1),
1263 ):
1264 await announcement_data.put(chunk)
1265 await announcement_data.put(None) # signal end of stream
1266
1267 self.mass.create_task(fetch_announcement())
1268
1269 async def _announcement_stream() -> AsyncGenerator[bytes, None]:
1270 """Generate the PCM audio stream for the announcement + optional pre-announce."""
1271 if pre_announce:
1272 async for chunk in get_ffmpeg_stream(
1273 audio_input=pre_announce_url,
1274 input_format=AudioFormat(content_type=ContentType.try_parse(pre_announce_url)),
1275 output_format=pcm_format,
1276 chunk_size=get_chunksize(pcm_format, 1),
1277 ):
1278 yield chunk
1279 # pad silence while we're waiting for the announcement to be ready
1280 while announcement_data.empty():
1281 yield b"\0" * int(
1282 pcm_format.sample_rate * (pcm_format.bit_depth / 8) * pcm_format.channels * 0.1
1283 )
1284 await asyncio.sleep(0.1)
1285 # stream announcement
1286 while True:
1287 announcement_chunk = await announcement_data.get()
1288 if announcement_chunk is None:
1289 break
1290 yield announcement_chunk
1291
1292 if output_format == pcm_format:
1293 # no need to re-encode, just yield the raw PCM stream
1294 async for chunk in _announcement_stream():
1295 yield chunk
1296 return
1297
1298 # stream final announcement in requested output format
1299 async for chunk in get_ffmpeg_stream(
1300 audio_input=_announcement_stream(),
1301 input_format=pcm_format,
1302 output_format=output_format,
1303 ):
1304 yield chunk
1305
1306 async def get_plugin_source_stream(
1307 self,
1308 plugin_source_id: str,
1309 output_format: AudioFormat,
1310 player_id: str,
1311 player_filter_params: list[str] | None = None,
1312 ) -> AsyncGenerator[bytes, None]:
1313 """Get the special plugin source stream."""
1314 plugin_prov = cast("PluginProvider", self.mass.get_provider(plugin_source_id))
1315 if not plugin_prov:
1316 raise ProviderUnavailableError(f"Unknown PluginSource: {plugin_source_id}")
1317
1318 plugin_source = plugin_prov.get_source()
1319 self.logger.debug(
1320 "Start streaming PluginSource %s to %s using output format %s",
1321 plugin_source_id,
1322 player_id,
1323 output_format,
1324 )
1325 # this should already be set by the player controller, but just to be sure
1326 plugin_source.in_use_by = player_id
1327
1328 try:
1329 async for chunk in get_ffmpeg_stream(
1330 audio_input=cast(
1331 "str | AsyncGenerator[bytes, None]",
1332 plugin_prov.get_audio_stream(player_id)
1333 if plugin_source.stream_type == StreamType.CUSTOM
1334 else plugin_source.path,
1335 ),
1336 input_format=plugin_source.audio_format,
1337 output_format=output_format,
1338 filter_params=player_filter_params,
1339 extra_input_args=["-y", "-re"],
1340 ):
1341 if plugin_source.in_use_by != player_id:
1342 # another player took over or the stream ended, stop streaming
1343 break
1344 yield chunk
1345 finally:
1346 self.logger.debug(
1347 "Finished streaming PluginSource %s to %s", plugin_source_id, player_id
1348 )
1349 await asyncio.sleep(1) # prevent race conditions when selecting source
1350 if plugin_source.in_use_by == player_id:
1351 # release control
1352 plugin_source.in_use_by = None
1353
1354 async def get_queue_item_stream(
1355 self,
1356 queue_item: QueueItem,
1357 pcm_format: AudioFormat,
1358 seek_position: int = 0,
1359 raise_on_error: bool = True,
1360 ) -> AsyncGenerator[bytes, None]:
1361 """Get the (PCM) audio stream for a single queue item."""
1362 # collect all arguments for ffmpeg
1363 streamdetails = queue_item.streamdetails
1364 assert streamdetails
1365 filter_params: list[str] = []
1366
1367 # handle volume normalization
1368 gain_correct: float | None = None
1369 if streamdetails.volume_normalization_mode == VolumeNormalizationMode.DYNAMIC:
1370 # volume normalization using loudnorm filter (in dynamic mode)
1371 # which also collects the measurement on the fly during playback
1372 # more info: https://k.ylo.ph/2016/04/04/loudnorm.html
1373 filter_rule = f"loudnorm=I={streamdetails.target_loudness}:TP=-2.0:LRA=10.0:offset=0.0"
1374 filter_rule += ":print_format=json"
1375 filter_params.append(filter_rule)
1376 elif streamdetails.volume_normalization_mode == VolumeNormalizationMode.FIXED_GAIN:
1377 # apply user defined fixed volume/gain correction
1378 config_key = (
1379 CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS
1380 if streamdetails.media_type == MediaType.TRACK
1381 else CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO
1382 )
1383 gain_value = await self.mass.config.get_core_config_value(
1384 self.domain, config_key, default=0.0, return_type=float
1385 )
1386 gain_correct = round(gain_value, 2)
1387 filter_params.append(f"volume={gain_correct}dB")
1388 elif streamdetails.volume_normalization_mode == VolumeNormalizationMode.MEASUREMENT_ONLY:
1389 # volume normalization with known loudness measurement
1390 # apply volume/gain correction
1391 target_loudness = (
1392 float(streamdetails.target_loudness)
1393 if streamdetails.target_loudness is not None
1394 else 0.0
1395 )
1396 if streamdetails.prefer_album_loudness and streamdetails.loudness_album is not None:
1397 gain_correct = target_loudness - float(streamdetails.loudness_album)
1398 elif streamdetails.loudness is not None:
1399 gain_correct = target_loudness - float(streamdetails.loudness)
1400 else:
1401 gain_correct = 0.0
1402 gain_correct = round(gain_correct, 2)
1403 filter_params.append(f"volume={gain_correct}dB")
1404 streamdetails.volume_normalization_gain_correct = gain_correct
1405
1406 allow_buffer = bool(
1407 self.mass.config.get_raw_core_config_value(
1408 self.domain, CONF_ALLOW_BUFFER, CONF_ALLOW_BUFFER_DEFAULT
1409 )
1410 and streamdetails.duration
1411 )
1412
1413 self.logger.debug(
1414 "Starting queue item stream for %s (%s)"
1415 " - using buffer: %s"
1416 " - using fade-in: %s"
1417 " - using volume normalization: %s",
1418 queue_item.name,
1419 streamdetails.uri,
1420 allow_buffer,
1421 streamdetails.fade_in,
1422 streamdetails.volume_normalization_mode,
1423 )
1424 if allow_buffer:
1425 media_stream_gen = get_buffered_media_stream(
1426 self.mass,
1427 streamdetails=streamdetails,
1428 pcm_format=pcm_format,
1429 seek_position=int(seek_position),
1430 filter_params=filter_params,
1431 )
1432 else:
1433 media_stream_gen = get_media_stream(
1434 self.mass,
1435 streamdetails=streamdetails,
1436 pcm_format=pcm_format,
1437 seek_position=int(seek_position),
1438 filter_params=filter_params,
1439 )
1440
1441 first_chunk_received = False
1442 fade_in_buffer = b""
1443 bytes_received = 0
1444 finished = False
1445 stream_started_at = asyncio.get_event_loop().time()
1446 try:
1447 async for chunk in media_stream_gen:
1448 bytes_received += len(chunk)
1449 if not first_chunk_received:
1450 first_chunk_received = True
1451 self.logger.debug(
1452 "First audio chunk received for %s (%s) after %.2f seconds",
1453 queue_item.name,
1454 streamdetails.uri,
1455 asyncio.get_event_loop().time() - stream_started_at,
1456 )
1457 # handle optional fade-in
1458 if streamdetails.fade_in:
1459 if len(fade_in_buffer) < pcm_format.pcm_sample_size * 4:
1460 fade_in_buffer += chunk
1461 elif fade_in_buffer:
1462 async for fade_chunk in get_ffmpeg_stream(
1463 # NOTE: get_ffmpeg_stream signature says str | AsyncGenerator
1464 # but FFMpeg class actually accepts bytes too. This works at
1465 # runtime but needs type: ignore for mypy.
1466 audio_input=fade_in_buffer + chunk, # type: ignore[arg-type]
1467 input_format=pcm_format,
1468 output_format=pcm_format,
1469 filter_params=["afade=type=in:start_time=0:duration=3"],
1470 ):
1471 yield fade_chunk
1472 fade_in_buffer = b""
1473 streamdetails.fade_in = False
1474 else:
1475 yield chunk
1476 # help garbage collection by explicitly deleting chunk
1477 del chunk
1478 finished = True
1479 except AudioError as err:
1480 streamdetails.stream_error = True
1481 queue_item.available = False
1482 if raise_on_error:
1483 raise
1484 # yes, we swallow the error here after logging it
1485 # so the outer stream can handle it gracefully
1486 self.logger.error(
1487 "AudioError while streaming queue item %s (%s): %s",
1488 queue_item.name,
1489 streamdetails.uri,
1490 err,
1491 )
1492 except asyncio.CancelledError:
1493 # Don't swallow cancellation - let it propagate
1494 raise
1495 except Exception as err:
1496 # Catch any other unexpected exceptions to prevent them from
1497 # silently killing the entire queue stream
1498 streamdetails.stream_error = True
1499 if raise_on_error:
1500 raise
1501 self.logger.exception(
1502 "Unexpected error while streaming queue item %s (%s): %s",
1503 queue_item.name,
1504 streamdetails.uri,
1505 err,
1506 )
1507 finally:
1508 # determine how many seconds we've streamed
1509 # for pcm output we can calculate this easily
1510 seconds_streamed = bytes_received / pcm_format.pcm_sample_size
1511 streamdetails.seconds_streamed = seconds_streamed
1512 self.logger.debug(
1513 "stream %s for %s in %.2f seconds - seconds streamed/buffered: %.2f",
1514 "aborted" if not finished else "finished",
1515 streamdetails.uri,
1516 asyncio.get_event_loop().time() - stream_started_at,
1517 seconds_streamed,
1518 )
1519 # report stream to provider
1520 if (finished or seconds_streamed >= 90) and (
1521 music_prov := self.mass.get_provider(streamdetails.provider)
1522 ):
1523 if TYPE_CHECKING: # avoid circular import
1524 assert isinstance(music_prov, MusicProvider)
1525 self.mass.create_task(music_prov.on_streamed(streamdetails))
1526
1527 @use_buffer(buffer_size=30, min_buffer_before_yield=2)
1528 async def get_queue_item_stream_with_smartfade(
1529 self,
1530 queue_item: QueueItem,
1531 pcm_format: AudioFormat,
1532 smart_fades_mode: SmartFadesMode = SmartFadesMode.SMART_CROSSFADE,
1533 standard_crossfade_duration: int = 10,
1534 ) -> AsyncGenerator[bytes, None]:
1535 """Get the audio stream for a single queue item with (smart) crossfade to the next item."""
1536 queue = self.mass.player_queues.get(queue_item.queue_id)
1537 if not queue:
1538 raise RuntimeError(f"Queue {queue_item.queue_id} not found")
1539
1540 streamdetails = queue_item.streamdetails
1541 assert streamdetails
1542 crossfade_data = self._crossfade_data.pop(queue.queue_id, None)
1543 self.logger.debug(
1544 "Crossfade data pop for queue %s (track: %s): %s",
1545 queue.display_name,
1546 queue_item.name,
1547 "found" if crossfade_data else "EMPTY - no crossfade data from previous track",
1548 )
1549
1550 if crossfade_data and streamdetails.seek_position > 0:
1551 # don't do crossfade when seeking into track
1552 crossfade_data = None
1553 if crossfade_data and (crossfade_data.queue_item_id != queue_item.queue_item_id):
1554 # edge case alert: the next item changed just while we were preloading/crossfading
1555 self.logger.warning(
1556 "Skipping crossfade data for queue %s - next item changed!", queue.display_name
1557 )
1558 crossfade_data = None
1559
1560 self.logger.debug(
1561 "Start Streaming queue track: %s (%s) for queue %s "
1562 "- crossfade mode: %s "
1563 "- crossfading from previous track: %s ",
1564 queue_item.streamdetails.uri if queue_item.streamdetails else "Unknown URI",
1565 queue_item.name,
1566 queue.display_name,
1567 smart_fades_mode,
1568 "true" if crossfade_data else "false",
1569 )
1570
1571 buffer = b""
1572 bytes_written = 0
1573 # calculate crossfade buffer size
1574 crossfade_buffer_duration = (
1575 SMART_CROSSFADE_DURATION
1576 if smart_fades_mode == SmartFadesMode.SMART_CROSSFADE
1577 else standard_crossfade_duration
1578 )
1579 crossfade_buffer_duration = min(
1580 crossfade_buffer_duration,
1581 int(streamdetails.duration / 2)
1582 if streamdetails.duration
1583 else crossfade_buffer_duration,
1584 )
1585 # Ensure crossfade buffer size is aligned to frame boundaries
1586 # Frame size = bytes_per_sample * channels
1587 bytes_per_sample = pcm_format.bit_depth // 8
1588 frame_size = bytes_per_sample * pcm_format.channels
1589 crossfade_buffer_size = int(pcm_format.pcm_sample_size * crossfade_buffer_duration)
1590 # Round down to nearest frame boundary
1591 crossfade_buffer_size = (crossfade_buffer_size // frame_size) * frame_size
1592 fade_out_data: bytes | None = None
1593
1594 if crossfade_data:
1595 # Calculate discard amount in seconds (format-independent)
1596 # Use fade_in_pcm_format because fade_in_size is in the next track's original format
1597 fade_in_duration_seconds = (
1598 crossfade_data.fade_in_size / crossfade_data.fade_in_pcm_format.pcm_sample_size
1599 )
1600 discard_seconds = int(fade_in_duration_seconds) - 1
1601 # Calculate discard amounts in CURRENT track's format
1602 discard_bytes = int(discard_seconds * pcm_format.pcm_sample_size)
1603 # Convert fade_in_size to current track's format for correct leftover calculation
1604 fade_in_size_in_current_format = int(
1605 fade_in_duration_seconds * pcm_format.pcm_sample_size
1606 )
1607 discard_leftover = fade_in_size_in_current_format - discard_bytes
1608 else:
1609 discard_seconds = streamdetails.seek_position
1610 discard_leftover = 0
1611 total_chunks_received = 0
1612 req_buffer_size = crossfade_buffer_size
1613 async for chunk in self.get_queue_item_stream(
1614 queue_item, pcm_format, seek_position=discard_seconds
1615 ):
1616 total_chunks_received += 1
1617 if discard_leftover:
1618 # discard leftover bytes from crossfade data
1619 chunk = chunk[discard_leftover:] # noqa: PLW2901
1620 discard_leftover = 0
1621
1622 if total_chunks_received < 10:
1623 # we want a stream to start as quickly as possible
1624 # so for the first 10 chunks we keep a very short buffer
1625 req_buffer_size = pcm_format.pcm_sample_size
1626 else:
1627 req_buffer_size = crossfade_buffer_size
1628
1629 # ALWAYS APPEND CHUNK TO BUFFER
1630 buffer += chunk
1631 del chunk
1632 if len(buffer) < req_buffer_size:
1633 # buffer is not full enough, move on
1634 continue
1635
1636 #### HANDLE CROSSFADE DATA FROM PREVIOUS TRACK
1637 if crossfade_data:
1638 # send the (second half of the) crossfade data
1639 if crossfade_data.pcm_format != pcm_format:
1640 # edge case: pcm format mismatch, we need to resample
1641 self.logger.debug(
1642 "Resampling crossfade data from %s to %s for queue %s",
1643 crossfade_data.pcm_format.sample_rate,
1644 pcm_format.sample_rate,
1645 queue.display_name,
1646 )
1647 resampled_data = await resample_pcm_audio(
1648 crossfade_data.data,
1649 crossfade_data.pcm_format,
1650 pcm_format,
1651 )
1652 if resampled_data:
1653 for _chunk in divide_chunks(resampled_data, pcm_format.pcm_sample_size):
1654 yield _chunk
1655 bytes_written += len(resampled_data)
1656 else:
1657 # Resampling failed, error already logged in resample_pcm_audio
1658 # Skip crossfade data entirely - stream continues without it
1659 self.logger.warning(
1660 "Skipping crossfade data for queue %s due to resampling failure",
1661 queue.display_name,
1662 )
1663 else:
1664 for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size):
1665 yield _chunk
1666 bytes_written += len(crossfade_data.data)
1667 # clear vars
1668 crossfade_data = None
1669
1670 #### OTHER: enough data in buffer, feed to output
1671 while len(buffer) > req_buffer_size:
1672 yield buffer[: pcm_format.pcm_sample_size]
1673 bytes_written += pcm_format.pcm_sample_size
1674 buffer = buffer[pcm_format.pcm_sample_size :]
1675
1676 #### HANDLE END OF TRACK
1677
1678 if crossfade_data:
1679 # edge case: we did not get enough data to send the crossfade data
1680 # send the (second half of the) crossfade data
1681 if crossfade_data.pcm_format != pcm_format:
1682 # (yet another) edge case: pcm format mismatch, we need to resample
1683 self.logger.debug(
1684 "Resampling remaining crossfade data from %s to %s for queue %s",
1685 crossfade_data.pcm_format.sample_rate,
1686 pcm_format.sample_rate,
1687 queue.display_name,
1688 )
1689 resampled_crossfade_data = await resample_pcm_audio(
1690 crossfade_data.data,
1691 crossfade_data.pcm_format,
1692 pcm_format,
1693 )
1694 if resampled_crossfade_data:
1695 crossfade_data.data = resampled_crossfade_data
1696 else:
1697 # Resampling failed, error already logged in resample_pcm_audio
1698 # Skip the crossfade data entirely
1699 self.logger.warning(
1700 "Skipping remaining crossfade data for queue %s due to resampling failure",
1701 queue.display_name,
1702 )
1703 crossfade_data = None
1704 if crossfade_data:
1705 for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size):
1706 yield _chunk
1707 bytes_written += len(crossfade_data.data)
1708 crossfade_data = None
1709
1710 # get next track for crossfade
1711 next_queue_item: QueueItem | None
1712 try:
1713 self.logger.debug(
1714 "Preloading NEXT track for crossfade for queue %s",
1715 queue.display_name,
1716 )
1717 next_queue_item = await self.mass.player_queues.load_next_queue_item(
1718 queue.queue_id, queue_item.queue_item_id
1719 )
1720 # set index_in_buffer to prevent our next track is overwritten while preloading
1721 if next_queue_item.streamdetails is None:
1722 raise InvalidDataError(
1723 f"No streamdetails for next queue item {next_queue_item.queue_item_id}"
1724 )
1725 queue.index_in_buffer = self.mass.player_queues.index_by_id(
1726 queue.queue_id, next_queue_item.queue_item_id
1727 )
1728 queue_player = self.mass.players.get_player(queue.queue_id)
1729 assert queue_player is not None
1730 next_queue_item_pcm_format = await self._select_pcm_format(
1731 player=queue_player,
1732 streamdetails=next_queue_item.streamdetails,
1733 smartfades_enabled=True,
1734 )
1735 except QueueEmpty:
1736 # end of queue reached, no next item
1737 next_queue_item = None
1738
1739 crossfade_allowed = bool(next_queue_item) and self._crossfade_allowed(
1740 queue_item,
1741 smart_fades_mode=smart_fades_mode,
1742 flow_mode=False,
1743 next_queue_item=next_queue_item,
1744 sample_rate=pcm_format.sample_rate,
1745 next_sample_rate=next_queue_item_pcm_format.sample_rate,
1746 )
1747 if not crossfade_allowed:
1748 if not next_queue_item:
1749 self.logger.debug(
1750 "Enqueue mode: no next queue item loaded (QueueEmpty) for queue %s",
1751 queue.display_name,
1752 )
1753 else:
1754 self.logger.debug(
1755 "Enqueue mode: crossfade NOT allowed for track %s -> %s on queue %s",
1756 queue_item.name,
1757 next_queue_item.name,
1758 queue.display_name,
1759 )
1760 # no crossfade enabled/allowed, just yield the buffer last part
1761 bytes_written += len(buffer)
1762 for _chunk in divide_chunks(buffer, pcm_format.pcm_sample_size):
1763 yield _chunk
1764 else:
1765 # if crossfade is enabled, save fadeout part in buffer to pickup for next track
1766 assert next_queue_item is not None
1767 fade_out_data = buffer
1768 buffer = b""
1769 try:
1770 async for chunk in self.get_queue_item_stream(
1771 next_queue_item, next_queue_item_pcm_format
1772 ):
1773 # append to buffer until we reach crossfade size
1774 # we only need the first X seconds of the NEXT track so we can
1775 # perform the crossfade.
1776 # the crossfaded audio of the previous and next track will be
1777 # sent in two equal parts: first half now, second half
1778 # when the next track starts. We use CrossfadeData to store
1779 # the second half to be picked up by the next track's stream generator.
1780 # Note that we more or less expect the user to have enabled the in-memory
1781 # buffer so we can keep the next track's audio data in memory.
1782 buffer += chunk
1783 del chunk
1784 if len(buffer) >= crossfade_buffer_size:
1785 break
1786 #### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
1787 # Store original buffer size before any resampling for fade_in_size calculation
1788 # This size is in the next track's original format which is what we need
1789 original_buffer_size = len(buffer)
1790 if next_queue_item_pcm_format != pcm_format:
1791 # edge case: pcm format mismatch, we need to resample the next track's
1792 # beginning part before crossfading
1793 self.logger.debug(
1794 "Resampling next track's crossfade from %s to %s for queue %s",
1795 next_queue_item_pcm_format.sample_rate,
1796 pcm_format.sample_rate,
1797 queue.display_name,
1798 )
1799 buffer = await resample_pcm_audio(
1800 buffer,
1801 next_queue_item_pcm_format,
1802 pcm_format,
1803 )
1804 # perform actual (smart fades) crossfade using mixer
1805 crossfade_bytes = await self._smart_fades_mixer.mix(
1806 fade_in_part=buffer,
1807 fade_out_part=fade_out_data,
1808 fade_in_streamdetails=cast("StreamDetails", next_queue_item.streamdetails),
1809 fade_out_streamdetails=streamdetails,
1810 pcm_format=pcm_format,
1811 standard_crossfade_duration=standard_crossfade_duration,
1812 mode=smart_fades_mode,
1813 )
1814 # send half of the crossfade_part (= approx the fadeout part)
1815 split_point = (len(crossfade_bytes) + 1) // 2
1816 crossfade_first = crossfade_bytes[:split_point]
1817 crossfade_second = crossfade_bytes[split_point:]
1818 del crossfade_bytes
1819 bytes_written += len(crossfade_first)
1820 for _chunk in divide_chunks(crossfade_first, pcm_format.pcm_sample_size):
1821 yield _chunk
1822 # store the other half for the next track
1823 # IMPORTANT: crossfade_second data is in CURRENT track's format (pcm_format)
1824 # because it was created from the resampled buffer used for mixing.
1825 # BUT fade_in_size represents bytes in NEXT track's original format
1826 # (next_queue_item_pcm_format) because that's how much of the next track
1827 # was consumed during the crossfade. We need both formats to correctly
1828 # handle the crossfade data when the next track starts.
1829 self._crossfade_data[queue_item.queue_id] = CrossfadeData(
1830 data=crossfade_second,
1831 fade_in_size=original_buffer_size,
1832 pcm_format=pcm_format, # Format of the data (current track)
1833 fade_in_pcm_format=next_queue_item_pcm_format, # Format for fade_in_size
1834 queue_item_id=next_queue_item.queue_item_id,
1835 )
1836 self.logger.debug(
1837 "Crossfade data STORED for queue %s (outgoing track: %s, incoming track: %s)",
1838 queue.display_name,
1839 queue_item.name,
1840 next_queue_item.name if next_queue_item else "N/A",
1841 )
1842 except AudioError:
1843 # no crossfade possible, just yield the fade_out_data
1844 next_queue_item = None
1845 yield fade_out_data
1846 bytes_written += len(fade_out_data)
1847 del fade_out_data
1848 # make sure the buffer gets cleaned up
1849 del buffer
1850 # update duration details based on the actual pcm data we sent
1851 # this also accounts for crossfade and silence stripping
1852 seconds_streamed = bytes_written / pcm_format.pcm_sample_size
1853 streamdetails.seconds_streamed = seconds_streamed
1854 streamdetails.duration = int(streamdetails.seek_position + seconds_streamed)
1855 self.logger.debug(
1856 "Finished Streaming queue track: %s (%s) on queue %s "
1857 "- crossfade data prepared for next track: %s",
1858 streamdetails.uri,
1859 queue_item.name,
1860 queue.display_name,
1861 next_queue_item.name if next_queue_item else "N/A",
1862 )
1863
1864 def _log_request(self, request: web.Request) -> None:
1865 """Log request."""
1866 if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
1867 self.logger.log(
1868 VERBOSE_LOG_LEVEL,
1869 "Got %s request to %s from %s\nheaders: %s\n",
1870 request.method,
1871 request.path,
1872 request.remote,
1873 request.headers,
1874 )
1875 else:
1876 self.logger.debug(
1877 "Got %s request to %s from %s",
1878 request.method,
1879 request.path,
1880 request.remote,
1881 )
1882
1883 async def get_output_format(
1884 self,
1885 output_format_str: str,
1886 player: Player,
1887 content_sample_rate: int,
1888 content_bit_depth: int,
1889 ) -> AudioFormat:
1890 """Parse (player specific) output format details for given format string."""
1891 content_type: ContentType = ContentType.try_parse(output_format_str)
1892 supported_rates_conf = cast(
1893 "list[tuple[str, str]]",
1894 await self.mass.config.get_player_config_value(
1895 player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
1896 ),
1897 )
1898 output_channels_str = self.mass.config.get_raw_player_config_value(
1899 player.player_id, CONF_OUTPUT_CHANNELS, "stereo"
1900 )
1901 supported_sample_rates = tuple(int(x[0]) for x in supported_rates_conf)
1902 supported_bit_depths = tuple(int(x[1]) for x in supported_rates_conf)
1903
1904 player_max_bit_depth = max(supported_bit_depths)
1905 output_bit_depth = min(content_bit_depth, player_max_bit_depth)
1906 if content_sample_rate in supported_sample_rates:
1907 output_sample_rate = content_sample_rate
1908 else:
1909 output_sample_rate = max(supported_sample_rates)
1910
1911 if not content_type.is_lossless():
1912 # no point in having a higher bit depth for lossy formats
1913 output_bit_depth = 16
1914 output_sample_rate = min(48000, output_sample_rate)
1915 if output_format_str == "pcm":
1916 content_type = ContentType.from_bit_depth(output_bit_depth)
1917 return AudioFormat(
1918 content_type=content_type,
1919 sample_rate=output_sample_rate,
1920 bit_depth=output_bit_depth,
1921 channels=1 if output_channels_str != "stereo" else 2,
1922 )
1923
1924 async def _select_flow_format(
1925 self,
1926 player: Player,
1927 ) -> AudioFormat:
1928 """Parse (player specific) flow stream PCM format."""
1929 supported_rates_conf = cast(
1930 "list[tuple[str, str]]",
1931 await self.mass.config.get_player_config_value(
1932 player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
1933 ),
1934 )
1935 supported_sample_rates = tuple(int(x[0]) for x in supported_rates_conf)
1936 output_sample_rate = INTERNAL_PCM_FORMAT.sample_rate
1937 for sample_rate in (192000, 96000, 48000, 44100):
1938 if sample_rate in supported_sample_rates:
1939 output_sample_rate = sample_rate
1940 break
1941 return AudioFormat(
1942 content_type=INTERNAL_PCM_FORMAT.content_type,
1943 sample_rate=output_sample_rate,
1944 bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
1945 channels=2,
1946 )
1947
1948 async def _select_pcm_format(
1949 self,
1950 player: Player,
1951 streamdetails: StreamDetails,
1952 smartfades_enabled: bool,
1953 ) -> AudioFormat:
1954 """Parse (player specific) stream internal PCM format."""
1955 supported_rates_conf = cast(
1956 "list[tuple[str, str]]",
1957 await self.mass.config.get_player_config_value(
1958 player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
1959 ),
1960 )
1961 supported_sample_rates = tuple(int(x[0]) for x in supported_rates_conf)
1962 # use highest supported rate within content rate
1963 output_sample_rate = max(
1964 (r for r in supported_sample_rates if r <= streamdetails.audio_format.sample_rate),
1965 default=48000, # sane/safe default
1966 )
1967 # work out pcm format based on streamdetails
1968 pcm_format = AudioFormat(
1969 sample_rate=output_sample_rate,
1970 # always use f32 internally for extra headroom for filters etc
1971 content_type=INTERNAL_PCM_FORMAT.content_type,
1972 bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
1973 channels=streamdetails.audio_format.channels,
1974 )
1975 if smartfades_enabled:
1976 pcm_format.channels = 2 # force stereo for crossfading
1977
1978 return pcm_format
1979
1980 def _crossfade_allowed(
1981 self,
1982 queue_item: QueueItem,
1983 smart_fades_mode: SmartFadesMode,
1984 flow_mode: bool = False,
1985 next_queue_item: QueueItem | None = None,
1986 sample_rate: int | None = None,
1987 next_sample_rate: int | None = None,
1988 ) -> bool:
1989 """Get the crossfade config for a queue item."""
1990 if smart_fades_mode == SmartFadesMode.DISABLED:
1991 return False
1992 if not (self.mass.players.get_player(queue_item.queue_id)):
1993 return False # just a guard
1994 if queue_item.media_type != MediaType.TRACK:
1995 self.logger.debug("Skipping crossfade: current item is not a track")
1996 return False
1997 # check if the next item is part of the same album
1998 next_item = next_queue_item or self.mass.player_queues.get_next_item(
1999 queue_item.queue_id, queue_item.queue_item_id
2000 )
2001 if not next_item:
2002 # there is no next item!
2003 self.logger.debug(
2004 "Crossfade not allowed: no next item found for %s (flow_mode=%s, queue_id=%s)",
2005 queue_item.name,
2006 flow_mode,
2007 queue_item.queue_id,
2008 )
2009 return False
2010 # check if next item is a track
2011 if next_item.media_type != MediaType.TRACK:
2012 self.logger.debug("Skipping crossfade: next item is not a track")
2013 return False
2014 if (
2015 isinstance(queue_item.media_item, Track)
2016 and isinstance(next_item.media_item, Track)
2017 and queue_item.media_item.album
2018 and next_item.media_item.album
2019 and queue_item.media_item.album == next_item.media_item.album
2020 and not self.mass.config.get_raw_core_config_value(
2021 self.domain, CONF_ALLOW_CROSSFADE_SAME_ALBUM, False
2022 )
2023 ):
2024 # in general, crossfade is not desired for tracks of the same (gapless) album
2025 # because we have no accurate way to determine if the album is gapless or not,
2026 # for now we just never crossfade between tracks of the same album
2027 self.logger.debug("Skipping crossfade: next item is part of the same album")
2028 return False
2029
2030 # check if we're allowed to crossfade on different sample rates
2031 if (
2032 not flow_mode
2033 and sample_rate
2034 and next_sample_rate
2035 and sample_rate != next_sample_rate
2036 and not self.mass.config.get_raw_player_config_value(
2037 queue_item.queue_id,
2038 CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES.key,
2039 CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES.default_value,
2040 )
2041 ):
2042 self.logger.debug(
2043 "Skipping crossfade: player does not support gapless playback "
2044 "with different sample rates (%s vs %s)",
2045 sample_rate,
2046 next_sample_rate,
2047 )
2048 return False
2049
2050 return True
2051
2052 async def _periodic_garbage_collection(self) -> None:
2053 """Periodic garbage collection to free up memory from audio buffers and streams."""
2054 self.logger.log(
2055 VERBOSE_LOG_LEVEL,
2056 "Running periodic garbage collection...",
2057 )
2058 # Run garbage collection in executor to avoid blocking the event loop
2059 # Since this runs periodically (not in response to subprocess cleanup),
2060 # it's safe to run in a thread without causing thread-safety issues
2061 loop = asyncio.get_running_loop()
2062 collected = await loop.run_in_executor(None, gc.collect)
2063 self.logger.log(
2064 VERBOSE_LOG_LEVEL,
2065 "Garbage collection completed, collected %d objects",
2066 collected,
2067 )
2068 # Schedule next run in 15 minutes
2069 self.mass.call_later(900, self._periodic_garbage_collection)
2070
2071 def _setup_smart_fades_logger(self, config: CoreConfig) -> None:
2072 """Set up smart fades logger level."""
2073 log_level = str(config.get_value(CONF_SMART_FADES_LOG_LEVEL))
2074 if log_level == "GLOBAL":
2075 self.smart_fades_analyzer.logger.setLevel(self.logger.level)
2076 self.smart_fades_mixer.logger.setLevel(self.logger.level)
2077 else:
2078 self.smart_fades_analyzer.logger.setLevel(log_level)
2079 self.smart_fades_mixer.logger.setLevel(log_level)
2080
2081 async def cleanup_stale_queue_buffers(self, queue_id: str, current_index: int) -> None:
2082 """
2083 Clean up audio buffers for queue items that are no longer needed.
2084
2085 This clears buffers for items at index <= current_index - 2, keeping only:
2086 - The previous track (current_index - 1)
2087 - The current track (current_index)
2088 - The next track (current_index + 1, handled by preloading)
2089
2090 :param queue_id: The queue ID to clean up buffers for.
2091 :param current_index: The current playing index in the queue.
2092 """
2093 if current_index < 2:
2094 return # Nothing to clean up yet
2095
2096 queue_items = self.mass.player_queues._queue_items.get(queue_id, [])
2097 cleanup_threshold = current_index - 2
2098 buffers_cleared = 0
2099
2100 for idx, item in enumerate(queue_items):
2101 if idx > cleanup_threshold:
2102 break # No need to check further
2103 if item.streamdetails and item.streamdetails.buffer:
2104 self.logger.log(
2105 VERBOSE_LOG_LEVEL,
2106 "Clearing stale audio buffer for queue item %s (index %d) in queue %s",
2107 item.name,
2108 idx,
2109 queue_id,
2110 )
2111 await item.streamdetails.buffer.clear()
2112 item.streamdetails.buffer = None
2113 buffers_cleared += 1
2114
2115 if buffers_cleared > 0:
2116 self.logger.debug(
2117 "Cleared %d stale audio buffer(s) for queue %s (items before index %d)",
2118 buffers_cleared,
2119 queue_id,
2120 cleanup_threshold + 1,
2121 )
2122
2123 async def cleanup_queue_audio_data(self, queue_id: str) -> None:
2124 """
2125 Clean up all audio-related data for a queue when it is stopped or cleared.
2126
2127 This clears:
2128 - All audio buffers attached to queue item streamdetails
2129 - Any pending crossfade data for the queue
2130
2131 :param queue_id: The queue ID to clean up.
2132 """
2133 # Clear crossfade data for this queue
2134 if queue_id in self._crossfade_data:
2135 self.logger.debug("Clearing crossfade data for queue %s", queue_id)
2136 del self._crossfade_data[queue_id]
2137
2138 # Clear all audio buffers for queue items
2139 queue_items = self.mass.player_queues._queue_items.get(queue_id, [])
2140 buffers_cleared = 0
2141
2142 for item in queue_items:
2143 if item.streamdetails and item.streamdetails.buffer:
2144 await item.streamdetails.buffer.clear()
2145 item.streamdetails.buffer = None
2146 buffers_cleared += 1
2147
2148 if buffers_cleared > 0:
2149 self.logger.debug(
2150 "Cleared %d audio buffer(s) for stopped/cleared queue %s",
2151 buffers_cleared,
2152 queue_id,
2153 )
2154