/
/
/
1"""
2Controller to stream audio to players.
3
4The streams controller hosts a basic, unprotected HTTP-only webserver
5purely to stream audio packets to players and some control endpoints such as
6the upnp callbacks and json rpc api for slimproto clients.
7"""
8
9from __future__ import annotations
10
11import asyncio
12import gc
13import logging
14import os
15import urllib.parse
16from collections.abc import AsyncGenerator
17from dataclasses import dataclass
18from typing import TYPE_CHECKING, Final, cast
19
20from aiofiles.os import wrap
21from aiohttp import web
22from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
23from music_assistant_models.enums import (
24 ConfigEntryType,
25 ContentType,
26 MediaType,
27 PlayerFeature,
28 StreamType,
29 VolumeNormalizationMode,
30)
31from music_assistant_models.errors import (
32 AudioError,
33 InvalidDataError,
34 ProviderUnavailableError,
35 QueueEmpty,
36)
37from music_assistant_models.media_items import AudioFormat, Track
38from music_assistant_models.player_queue import PlayLogEntry
39
40from music_assistant.constants import (
41 ANNOUNCE_ALERT_FILE,
42 CONF_BIND_IP,
43 CONF_BIND_PORT,
44 CONF_CROSSFADE_DURATION,
45 CONF_ENTRY_ENABLE_ICY_METADATA,
46 CONF_ENTRY_LOG_LEVEL,
47 CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES,
48 CONF_ENTRY_ZEROCONF_INTERFACES,
49 CONF_HTTP_PROFILE,
50 CONF_OUTPUT_CHANNELS,
51 CONF_OUTPUT_CODEC,
52 CONF_PUBLISH_IP,
53 CONF_SAMPLE_RATES,
54 CONF_SMART_FADES_MODE,
55 CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO,
56 CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
57 CONF_VOLUME_NORMALIZATION_RADIO,
58 CONF_VOLUME_NORMALIZATION_TRACKS,
59 DEFAULT_STREAM_HEADERS,
60 ICY_HEADERS,
61 INTERNAL_PCM_FORMAT,
62 SILENCE_FILE,
63 VERBOSE_LOG_LEVEL,
64)
65from music_assistant.controllers.players.player_controller import AnnounceData
66from music_assistant.controllers.streams.smart_fades import SmartFadesMixer
67from music_assistant.controllers.streams.smart_fades.analyzer import SmartFadesAnalyzer
68from music_assistant.controllers.streams.smart_fades.fades import SMART_CROSSFADE_DURATION
69from music_assistant.helpers.audio import LOGGER as AUDIO_LOGGER
70from music_assistant.helpers.audio import (
71 get_buffered_media_stream,
72 get_chunksize,
73 get_media_stream,
74 get_player_filter_params,
75 get_stream_details,
76 resample_pcm_audio,
77)
78from music_assistant.helpers.buffered_generator import buffered, use_buffer
79from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
80from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream
81from music_assistant.helpers.util import (
82 divide_chunks,
83 get_ip_addresses,
84 get_total_system_memory,
85 select_free_port,
86)
87from music_assistant.helpers.webserver import Webserver
88from music_assistant.models.core_controller import CoreController
89from music_assistant.models.music_provider import MusicProvider
90from music_assistant.models.plugin import PluginProvider, PluginSource
91from music_assistant.models.smart_fades import SmartFadesMode
92from music_assistant.providers.universal_group.constants import UGP_PREFIX
93from music_assistant.providers.universal_group.player import UniversalGroupPlayer
94
95if TYPE_CHECKING:
96 from music_assistant_models.config_entries import CoreConfig
97 from music_assistant_models.player import PlayerMedia
98 from music_assistant_models.player_queue import PlayerQueue
99 from music_assistant_models.queue_item import QueueItem
100 from music_assistant_models.streamdetails import StreamDetails
101
102 from music_assistant.mass import MusicAssistant
103 from music_assistant.models.player import Player
104
105
# os.path.isfile passed through aiofiles' `wrap` helper
# (presumably so the check can be awaited without blocking the event loop — confirm).
isfile = wrap(os.path.isfile)

# Keys of the config entries that are declared by the streams controller itself.
CONF_ALLOW_BUFFER: Final[str] = "allow_buffering"
CONF_ALLOW_CROSSFADE_SAME_ALBUM: Final[str] = "allow_crossfade_same_album"
CONF_SMART_FADES_LOG_LEVEL: Final[str] = "smart_fades_log_level"

# Calculate total system memory once at module load time
TOTAL_SYSTEM_MEMORY_GB: Final[float] = get_total_system_memory()
# Default value of the CONF_ALLOW_BUFFER entry: only enabled when the reported
# memory is at least 8.0 (GB, judging by the name of the constant above).
CONF_ALLOW_BUFFER_DEFAULT = TOTAL_SYSTEM_MEMORY_GB >= 8.0
115
116
def parse_pcm_info(content_type: str) -> tuple[int, int, int]:
    """
    Extract the PCM parameters from a codec/content_type string.

    The string is expected to look like ``pcm;rate=48000;bitrate=24;channels=2``.
    Parameters that are not present fall back to 44100 Hz / 16 bits / 2 channels.

    :param content_type: The codec/content_type string to parse.
    :return: Tuple of (sample_rate, sample_size, channels).
    """
    if ";" not in content_type:
        # no parameters attached at all: plain defaults
        return (44100, 16, 2)
    # treat the ';'-separated parameters like a query string
    fields = dict(urllib.parse.parse_qsl(content_type.replace(";", "&")))
    return (
        int(fields.get("rate", 44100)),
        int(fields.get("bitrate", 16)),
        int(fields.get("channels", 2)),
    )
126
127
@dataclass
class CrossfadeData:
    """
    Data class to hold crossfade data.

    Instances are stored by the StreamsController in its `_crossfade_data` dict.
    """

    # raw audio bytes kept for the crossfade, encoded as described by 'pcm_format'
    data: bytes
    # size of the fade-in part, to be interpreted in 'fade_in_pcm_format'
    # (presumably a number of bytes — confirm against the code that fills this)
    fade_in_size: int
    # Format of the 'data' bytes (current/previous track's format)
    pcm_format: AudioFormat
    # Format for 'fade_in_size' (next track's format)
    fade_in_pcm_format: AudioFormat
    # id of the queue item this crossfade data is tied to
    # NOTE(review): confirm whether this is the outgoing or the incoming item
    queue_item_id: str
137
138
139class StreamsController(CoreController):
140 """Webserver Controller to stream audio to players."""
141
142 domain: str = "streams"
143
def __init__(self, mass: MusicAssistant) -> None:
    """
    Create the controller.

    Only prepares state: the webserver object is created here but it is
    started later on, from setup().
    """
    super().__init__(mass)
    # webserver (with dynamic route support); its (un)register helpers for
    # dynamic routes are exposed directly on the controller
    webserver = Webserver(self.logger, enable_dynamic_routes=True)
    self._server = webserver
    self.register_dynamic_route = webserver.register_dynamic_route
    self.unregister_dynamic_route = webserver.unregister_dynamic_route
    # manifest details of this core controller
    self.manifest.name = "Streamserver"
    self.manifest.description = (
        "Music Assistant's core controller that is responsible "
        "for streaming audio to players on the local network."
    )
    self.manifest.icon = "cast-audio"
    # pending announcements, keyed by player id
    self.announcements: dict[str, AnnounceData] = {}
    self._crossfade_data: dict[str, CrossfadeData] = {}
    # placeholder until the configured bind address is read in setup()
    self._bind_ip: str = "0.0.0.0"
    self._smart_fades_mixer = SmartFadesMixer(self)
    self._smart_fades_analyzer = SmartFadesAnalyzer(self)
161
162 @property
163 def base_url(self) -> str:
164 """Return the base_url for the streamserver."""
165 return self._server.base_url
166
167 @property
168 def bind_ip(self) -> str:
169 """Return the IP address this streamserver is bound to."""
170 return self._bind_ip
171
172 @property
173 def smart_fades_mixer(self) -> SmartFadesMixer:
174 """Return the SmartFadesMixer instance."""
175 return self._smart_fades_mixer
176
177 @property
178 def smart_fades_analyzer(self) -> SmartFadesAnalyzer:
179 """Return the SmartFadesAnalyzer instance."""
180 return self._smart_fades_analyzer
181
async def get_config_entries(
    self,
    action: str | None = None,
    values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
    """
    Return all Config Entries for this core module (if any).

    :param action: Optional action key; not used by this controller.
    :param values: Optional (unsaved) config values; not used by this controller.
    :return: The playback entries (buffering, volume normalization, crossfade)
        followed by the generic/network entries (publish IP, port, bind IP,
        smart fades log level, zeroconf interfaces).
    """
    ip_addresses = await get_ip_addresses()
    default_port = await select_free_port(8097, 9200)

    def _normalization_options() -> list[ConfigValueOption]:
        """Build a fresh option list with one entry per VolumeNormalizationMode."""
        return [
            ConfigValueOption(x.value.replace("_", " ").title(), x.value)
            for x in VolumeNormalizationMode
        ]

    # De-duplicate the bind candidates while keeping a stable order:
    # "0.0.0.0" (the default) first, then the addresses in detection order.
    # A plain set would present the options in arbitrary order.
    bind_candidates = dict.fromkeys(("0.0.0.0", *ip_addresses))

    return (
        ConfigEntry(
            key=CONF_ALLOW_BUFFER,
            type=ConfigEntryType.BOOLEAN,
            default_value=CONF_ALLOW_BUFFER_DEFAULT,
            label="Allow (in-memory) buffering of (track) audio",
            description="By default, Music Assistant tries to be as resource "
            "efficient as possible when streaming audio, especially considering "
            "low-end devices such as Raspberry Pi's. This means that audio "
            "buffering is disabled by default to reduce memory usage. \n\n"
            "Enabling this option allows for in-memory buffering of audio, "
            "which (massively) improves playback (and seeking) performance but it comes "
            "at the cost of increased memory usage. "
            "If you run Music Assistant on a capable device with enough memory, "
            "enabling this option is strongly recommended.",
            required=False,
            category="playback",
        ),
        ConfigEntry(
            key=CONF_VOLUME_NORMALIZATION_RADIO,
            type=ConfigEntryType.STRING,
            default_value=VolumeNormalizationMode.FALLBACK_FIXED_GAIN,
            label="Volume normalization method for radio streams",
            options=_normalization_options(),
            category="playback",
        ),
        ConfigEntry(
            key=CONF_VOLUME_NORMALIZATION_TRACKS,
            type=ConfigEntryType.STRING,
            default_value=VolumeNormalizationMode.FALLBACK_DYNAMIC,
            label="Volume normalization method for tracks",
            options=_normalization_options(),
            category="playback",
        ),
        ConfigEntry(
            key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO,
            type=ConfigEntryType.FLOAT,
            range=(-20, 10),
            default_value=-6,
            label="Fixed/fallback gain adjustment for radio streams",
            category="playback",
        ),
        ConfigEntry(
            key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
            type=ConfigEntryType.FLOAT,
            range=(-20, 10),
            default_value=-6,
            label="Fixed/fallback gain adjustment for tracks",
            category="playback",
        ),
        ConfigEntry(
            key=CONF_ALLOW_CROSSFADE_SAME_ALBUM,
            type=ConfigEntryType.BOOLEAN,
            default_value=False,
            label="Allow crossfade between tracks from the same album",
            description="Enabling this option allows for crossfading between tracks "
            "that are part of the same album.",
            category="playback",
        ),
        ConfigEntry(
            key=CONF_PUBLISH_IP,
            type=ConfigEntryType.STRING,
            # NOTE(review): assumes get_ip_addresses() never returns an empty list
            default_value=ip_addresses[0],
            label="Published IP address",
            description="This IP address is communicated to players where to find this server."
            "\nMake sure that this IP can be reached by players on the local network, "
            "otherwise audio streaming will not work.",
            required=False,
            category="generic",
            advanced=True,
            requires_reload=True,
        ),
        ConfigEntry(
            key=CONF_BIND_PORT,
            type=ConfigEntryType.INTEGER,
            default_value=default_port,
            label="TCP Port",
            description="The TCP port to run the server. "
            "Make sure that this server can be reached "
            "on the given IP and TCP port by players on the local network.",
            category="generic",
            advanced=True,
            requires_reload=True,
        ),
        ConfigEntry(
            key=CONF_BIND_IP,
            type=ConfigEntryType.STRING,
            default_value="0.0.0.0",
            options=[ConfigValueOption(x, x) for x in bind_candidates],
            label="Bind to IP/interface",
            description="Start the stream server on this specific interface. \n"
            "Use 0.0.0.0 to bind to all interfaces, which is the default. \n"
            "This is an advanced setting that should normally "
            "not be adjusted in regular setups.",
            category="generic",
            advanced=True,
            required=False,
            requires_reload=True,
        ),
        ConfigEntry(
            key=CONF_SMART_FADES_LOG_LEVEL,
            type=ConfigEntryType.STRING,
            label="Smart Fades Log level",
            description="Log level for the Smart Fades mixer and analyzer.",
            options=CONF_ENTRY_LOG_LEVEL.options,
            default_value="GLOBAL",
            category="generic",
            advanced=True,
        ),
        CONF_ENTRY_ZEROCONF_INTERFACES,
    )
307
async def setup(self, config: CoreConfig) -> None:
    """
    Async initialize of module.

    Aligns the helper loggers, checks the ffmpeg version, starts the webserver
    with the streaming routes and schedules the periodic garbage collection.

    :param config: The stored core config of this controller.
    """
    # the audio/ffmpeg helper loggers follow the log level of this controller
    for helper_logger in (AUDIO_LOGGER, FFMPEG_LOGGER):
        helper_logger.setLevel(self.logger.level)
    self._setup_smart_fades_logger(config)
    # perform check for ffmpeg version
    await check_ffmpeg_version()
    # collect the network settings for the webserver
    self.publish_port = config.get_value(CONF_BIND_PORT)
    self.publish_ip = config.get_value(CONF_PUBLISH_IP)
    bind_ip = str(config.get_value(CONF_BIND_IP))
    self._bind_ip = bind_ip
    # print a big fat message in the log where the streamserver is running
    # because this is a common source of issues for people with more complex setups
    # (logged as warning for as long as onboarding has not been completed)
    banner_level = logging.INFO if self.mass.config.onboard_done else logging.WARNING
    self.logger.log(
        banner_level,
        "\n\n################################################################################\n"
        "Starting streamserver on %s:%s\n"
        "This is the IP address that is communicated to players.\n"
        "If this is incorrect, audio will not play!\n"
        "See the documentation how to configure the publish IP for the Streamserver\n"
        "in Settings --> Core modules --> Streamserver\n"
        "################################################################################\n",
        self.publish_ip,
        self.publish_port,
    )
    # every streaming endpoint accepts any HTTP method ("*")
    route_handlers = (
        ("/flow/{session_id}/{queue_id}/{queue_item_id}.{fmt}", self.serve_queue_flow_stream),
        ("/single/{session_id}/{queue_id}/{queue_item_id}.{fmt}", self.serve_queue_item_stream),
        ("/command/{queue_id}/{command}.mp3", self.serve_command_request),
        ("/announcement/{player_id}.{fmt}", self.serve_announcement_stream),
        ("/pluginsource/{plugin_source}/{player_id}.{fmt}", self.serve_plugin_source_stream),
    )
    await self._server.setup(
        bind_ip=bind_ip,
        bind_port=cast("int", self.publish_port),
        base_url=f"http://{self.publish_ip}:{self.publish_port}",
        static_routes=[("*", path, handler) for path, handler in route_handlers],
    )
    # Start periodic garbage collection task
    # This ensures memory from audio buffers and streams is cleaned up regularly
    self.mass.call_later(900, self._periodic_garbage_collection)  # 15 minutes
369
370 async def close(self) -> None:
371 """Cleanup on exit."""
372 await self._server.close()
373
async def resolve_stream_url(
    self,
    session_id: str,
    queue_item: QueueItem,
    flow_mode: bool = False,
    player_id: str | None = None,
) -> str:
    """
    Build the stream URL a player must request to play the given QueueItem.

    :param session_id: Session id that becomes part of the URL path.
    :param queue_item: The queue item to stream.
    :param flow_mode: Use the 'flow' endpoint instead of the 'single' endpoint.
    :param player_id: Player whose output codec setting is used;
        falls back to the queue id of the item.
    :return: URL below the base url of the stream server.
    """
    codec_owner_id = player_id or queue_item.queue_id
    conf_output_codec = await self.mass.config.get_player_config_value(
        codec_owner_id, CONF_OUTPUT_CODEC, default="flac", return_type=str
    )
    output_codec = ContentType.try_parse(conf_output_codec or "flac")
    extension = output_codec.value
    if output_codec.is_pcm() and ";" not in extension:
        # handle raw pcm without exact format specifiers
        extension += ";codec=pcm;rate=44100;bitrate=16;channels=2"
    endpoint = "flow" if flow_mode else "single"
    path = f"{endpoint}/{session_id}/{queue_item.queue_id}/{queue_item.queue_item_id}"
    return f"{self._server.base_url}/{path}.{extension}"
394
async def get_plugin_source_url(
    self,
    plugin_source: PluginSource,
    player_id: str,
) -> str:
    """
    Build the URL of the stream/proxy endpoint for a Plugin Source.

    :param plugin_source: The plugin source to expose.
    :param player_id: The player that is going to request the stream.
    :return: URL below the base url of the stream server.
    """
    source_content_type = plugin_source.audio_format.content_type
    # raw PCM sources are offered to the player as wav
    extension = ContentType.WAV.value if source_content_type.is_pcm() else source_content_type.value
    return f"{self._server.base_url}/pluginsource/{plugin_source.id}/{player_id}.{extension}"
406
async def serve_queue_item_stream(self, request: web.Request) -> web.StreamResponse:
    """
    Stream single queueitem audio to a player.

    Validates queue, session, queue item and player, resolves the streamdetails
    when the item has none yet and then pipes the (optionally crossfaded) PCM
    audio through a final ffmpeg process into the HTTP response.

    :param request: The incoming request; the route provides `session_id`,
        `queue_id`, `queue_item_id` and `fmt`.
    :return: The prepared response (headers only for non-GET requests).
    :raises web.HTTPNotFound: For an unknown queue, session, queue item or player
        and when no streamdetails could be resolved for the item.
    """
    self._log_request(request)
    queue_id = request.match_info["queue_id"]
    queue = self.mass.player_queues.get(queue_id)
    if not queue:
        raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
    session_id = request.match_info["session_id"]
    if queue.session_id and session_id != queue.session_id:
        raise web.HTTPNotFound(reason=f"Unknown (or invalid) session: {session_id}")
    queue_item_id = request.match_info["queue_item_id"]
    queue_item = self.mass.player_queues.get_item(queue_id, queue_item_id)
    if not queue_item:
        raise web.HTTPNotFound(reason=f"Unknown Queue item: {queue_item_id}")
    # validate the player before resolving streamdetails: the lookup below can be
    # expensive and flags the item as unavailable on failure, which is pointless
    # for a request that gets rejected anyway
    queue_player = self.mass.players.get(queue_id)
    if not queue_player:
        raise web.HTTPNotFound(reason=f"Unknown Player: {queue_id}")
    if not queue_item.streamdetails:
        try:
            queue_item.streamdetails = await get_stream_details(
                mass=self.mass, queue_item=queue_item
            )
        except Exception as e:
            self.logger.error(
                "Failed to get streamdetails for QueueItem %s: %s", queue_item_id, e
            )
            queue_item.available = False
            # keep the original failure attached as the cause
            raise web.HTTPNotFound(
                reason=f"No streamdetails for Queue item: {queue_item_id}"
            ) from e

    # work out pcm format based on streamdetails
    pcm_format = await self._select_pcm_format(
        player=queue_player,
        streamdetails=queue_item.streamdetails,
        smartfades_enabled=True,
    )
    # pick output format based on the pcm format and player capabilities
    output_format = await self.get_output_format(
        output_format_str=request.match_info["fmt"],
        player=queue_player,
        content_sample_rate=pcm_format.sample_rate,
        content_bit_depth=pcm_format.bit_depth,
    )

    # prepare request, add some DLNA/UPNP compatible headers
    headers = {
        **DEFAULT_STREAM_HEADERS,
        "icy-name": queue_item.name,
        "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01500000000000000000000000000000",  # noqa: E501
        "Accept-Ranges": "none",
        "Content-Type": f"audio/{output_format.output_format_str}",
    }
    resp = web.StreamResponse(
        status=200,
        reason="OK",
        headers=headers,
    )
    resp.content_type = f"audio/{output_format.output_format_str}"
    http_profile = await self.mass.config.get_player_config_value(
        queue_id, CONF_HTTP_PROFILE, default="default", return_type=str
    )
    if http_profile == "forced_content_length":
        # guess content length based on duration, or just set an insane high
        # content length (12 hours) to make sure the player keeps playing
        resp.content_length = get_chunksize(output_format, queue_item.duration or 12 * 3600)
    elif http_profile == "chunked":
        resp.enable_chunked_encoding()

    await resp.prepare(request)

    # return early if this is not a GET request
    if request.method != "GET":
        return resp

    if queue_item.media_type != MediaType.TRACK:
        # no crossfade on non-tracks
        smart_fades_mode = SmartFadesMode.DISABLED
    else:
        smart_fades_mode = await self.mass.config.get_player_config_value(
            queue.queue_id, CONF_SMART_FADES_MODE, return_type=SmartFadesMode
        )
        # only consumed below when smart fades end up enabled
        standard_crossfade_duration = self.mass.config.get_raw_player_config_value(
            queue.queue_id, CONF_CROSSFADE_DURATION, 10
        )
    if (
        smart_fades_mode != SmartFadesMode.DISABLED
        and PlayerFeature.GAPLESS_PLAYBACK not in queue_player.supported_features
    ):
        # crossfade is not supported on this player due to missing gapless playback
        self.logger.warning(
            "Crossfade disabled: Player %s does not support gapless playback, "
            "consider enabling flow mode to enable crossfade on this player.",
            queue_player.display_name,
        )
        smart_fades_mode = SmartFadesMode.DISABLED

    if smart_fades_mode != SmartFadesMode.DISABLED:
        # crossfade is enabled, use special crossfaded single item stream
        # where the crossfade of the next track is present in the stream of
        # a single track. This only works if the player supports gapless playback!
        audio_input = self.get_queue_item_stream_with_smartfade(
            queue_item=queue_item,
            pcm_format=pcm_format,
            smart_fades_mode=smart_fades_mode,
            standard_crossfade_duration=standard_crossfade_duration,
        )
    else:
        # no crossfade, just a regular single item stream
        audio_input = self.get_queue_item_stream(
            queue_item=queue_item,
            pcm_format=pcm_format,
            seek_position=queue_item.streamdetails.seek_position,
        )
    # stream the audio
    # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into
    # the desired output format for the player including any player specific filter params
    # such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats
    if queue_item.media_type == MediaType.RADIO:
        # keep very short buffer for radio streams
        # to keep them (more or less) realtime and prevent time outs
        read_rate_input_args = ["-readrate", "1.0", "-readrate_initial_burst", "2"]
    else:
        # just allow the player to buffer whatever it wants for single item streams
        read_rate_input_args = None

    first_chunk_received = False
    bytes_sent = 0
    async for chunk in get_ffmpeg_stream(
        audio_input=audio_input,
        input_format=pcm_format,
        output_format=output_format,
        filter_params=get_player_filter_params(
            self.mass,
            player_id=queue_player.player_id,
            input_format=pcm_format,
            output_format=output_format,
        ),
        extra_input_args=read_rate_input_args,
    ):
        try:
            await resp.write(chunk)
            bytes_sent += len(chunk)
            if not first_chunk_received:
                first_chunk_received = True
                # inform the queue that the track is now loaded in the buffer
                # so for example the next track can be enqueued
                self.mass.player_queues.track_loaded_in_buffer(
                    queue_item.queue_id, queue_item.queue_item_id
                )
        except ConnectionError as err:
            # ConnectionError is the base class of BrokenPipeError,
            # ConnectionResetError and ConnectionAbortedError
            if first_chunk_received and not queue_player.stop_called:
                # Player disconnected (unexpected) after receiving at least some data
                # This could indicate buffering issues, network problems,
                # or player-specific issues
                bytes_expected = get_chunksize(output_format, queue_item.duration or 3600)
                self.logger.warning(
                    "Player %s disconnected prematurely from stream for %s (%s) - "
                    "error: %s, sent %d bytes, expected (approx) bytes=%d",
                    queue.display_name,
                    queue_item.name,
                    queue_item.uri,
                    err.__class__.__name__,
                    bytes_sent,
                    bytes_expected,
                )
            break
    if queue_item.streamdetails.stream_error:
        self.logger.error(
            "Error streaming QueueItem %s (%s) to %s - will try to skip to next item",
            queue_item.name,
            queue_item.uri,
            queue.display_name,
        )
        # try to skip to the next item in the queue after a short delay
        self.mass.call_later(5, self.mass.player_queues.next(queue_id))
    return resp
585
async def serve_queue_flow_stream(self, request: web.Request) -> web.StreamResponse:
    """
    Stream Queue Flow audio to player.

    In flow mode all queue items are served as one continuous stream. When the
    client asks for it (and the player config allows it), ICY metadata blocks
    are interleaved with the audio, one block after every audio chunk.

    :param request: The incoming request; the route provides `queue_id`,
        `queue_item_id` (the item to start with) and `fmt`.
    :return: The prepared response (headers only for non-GET requests).
    :raises web.HTTPNotFound: For an unknown queue, player or queue item.
    """
    self._log_request(request)
    queue_id = request.match_info["queue_id"]
    queue = self.mass.player_queues.get(queue_id)
    if not queue:
        raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
    if not (queue_player := self.mass.players.get(queue_id)):
        raise web.HTTPNotFound(reason=f"Unknown Player: {queue_id}")
    start_queue_item_id = request.match_info["queue_item_id"]
    start_queue_item = self.mass.player_queues.get_item(queue_id, start_queue_item_id)
    if not start_queue_item:
        raise web.HTTPNotFound(reason=f"Unknown Queue item: {start_queue_item_id}")

    queue.flow_mode_stream_log = []

    # select the highest possible PCM settings for this player
    flow_pcm_format = await self._select_flow_format(queue_player)

    # work out output format/details
    output_format = await self.get_output_format(
        output_format_str=request.match_info["fmt"],
        player=queue_player,
        content_sample_rate=flow_pcm_format.sample_rate,
        content_bit_depth=flow_pcm_format.bit_depth,
    )
    # work out ICY metadata support
    icy_preference = self.mass.config.get_raw_player_config_value(
        queue_id,
        CONF_ENTRY_ENABLE_ICY_METADATA.key,
        CONF_ENTRY_ENABLE_ICY_METADATA.default_value,
    )
    enable_icy = request.headers.get("Icy-MetaData", "") == "1" and icy_preference != "disabled"
    icy_meta_interval = 256000 if icy_preference == "full" else 16384
    # the ICY length prefix is a single byte that counts 16-byte blocks,
    # so one metadata block can never be larger than 255 * 16 bytes
    max_icy_metadata_size = 255 * 16

    # prepare request, add some DLNA/UPNP compatible headers
    headers = {
        **DEFAULT_STREAM_HEADERS,
        **ICY_HEADERS,
        "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01700000000000000000000000000000",  # noqa: E501
        "Accept-Ranges": "none",
        "Content-Type": f"audio/{output_format.output_format_str}",
    }
    if enable_icy:
        headers["icy-metaint"] = str(icy_meta_interval)

    resp = web.StreamResponse(
        status=200,
        reason="OK",
        headers=headers,
    )
    http_profile = await self.mass.config.get_player_config_value(
        queue_id, CONF_HTTP_PROFILE, default="default", return_type=str
    )
    if http_profile == "forced_content_length":
        # just set an insane high content length to make sure the player keeps playing
        resp.content_length = get_chunksize(output_format, 12 * 3600)
    elif http_profile == "chunked":
        resp.enable_chunked_encoding()

    await resp.prepare(request)

    # return early if this is not a GET request
    if request.method != "GET":
        return resp

    # all checks passed, start streaming!
    # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into
    # the desired output format for the player including any player specific filter params
    # such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats
    self.logger.debug("Start serving Queue flow audio stream for %s", queue.display_name)

    async for chunk in get_ffmpeg_stream(
        audio_input=self.get_queue_flow_stream(
            queue=queue,
            start_queue_item=start_queue_item,
            pcm_format=flow_pcm_format,
        ),
        input_format=flow_pcm_format,
        output_format=output_format,
        filter_params=get_player_filter_params(
            self.mass, queue_player.player_id, flow_pcm_format, output_format
        ),
        # we need to slowly feed the music to avoid the player stopping and later
        # restarting (or completely failing) the audio stream by keeping the buffer short.
        # this is reported to be an issue especially with Chromecast players.
        # see for example: https://github.com/music-assistant/support/issues/3717
        # allow buffer ahead of 6 seconds and read rest in realtime
        extra_input_args=["-readrate", "1.0", "-readrate_initial_burst", "6"],
        # with ICY enabled every chunk must be exactly one metadata interval long
        chunk_size=icy_meta_interval if enable_icy else get_chunksize(output_format),
    ):
        try:
            await resp.write(chunk)
            if not enable_icy:
                continue

            # if icy metadata is enabled, send the icy metadata after the chunk
            # use current item here and not buffered item, otherwise
            # the icy metadata will be too much ahead
            current_item = queue.current_item
            if (
                current_item
                and current_item.streamdetails
                and current_item.streamdetails.stream_title
            ):
                title = current_item.streamdetails.stream_title
            elif current_item and current_item.name:
                title = current_item.name
            else:
                title = "Music Assistant"
            metadata = f"StreamTitle='{title}';".encode()
            if icy_preference == "full" and current_item and current_item.image:
                metadata += f"StreamURL='{current_item.image.path}'".encode()
            # cap to what the one-byte length prefix can express and
            # zero-pad to a whole number of 16-byte blocks
            metadata = metadata[:max_icy_metadata_size]
            metadata += b"\x00" * (-len(metadata) % 16)
            # the prefix must be exactly ONE raw byte; chr(n).encode() would
            # emit two (UTF-8) bytes for n >= 128 and corrupt the stream
            length_b = bytes([len(metadata) // 16])
            await resp.write(length_b + metadata)
        except ConnectionError:
            # race condition: the player disconnected while we were writing
            # (covers BrokenPipeError/ConnectionResetError/ConnectionAbortedError)
            break

    return resp
709
async def serve_command_request(self, request: web.Request) -> web.FileResponse:
    """
    Handle special 'command' request for a player.

    The only command acted upon is `next`, which schedules a skip to the next
    queue item. The silence file is returned for every command.

    :param request: The incoming request; the route provides `queue_id` and `command`.
    """
    self._log_request(request)
    route_params = request.match_info
    queue_id = route_params["queue_id"]
    requested_command = route_params["command"]
    if requested_command == "next":
        # run the skip as background task so the response is not held up
        self.mass.create_task(self.mass.player_queues.next(queue_id))
    response_headers = {"icy-name": "Music Assistant"}
    return web.FileResponse(SILENCE_FILE, headers=response_headers)
718
async def serve_announcement_stream(self, request: web.Request) -> web.StreamResponse:
    """
    Stream announcement audio to a player.

    The announcement to play is looked up in `self.announcements`, which is
    filled by get_announcement_url(). With the `forced_content_length` HTTP
    profile the complete clip is rendered first and sent as a regular response.

    :param request: The incoming request; the route provides `player_id` and `fmt`.
    :return: The (prepared) response.
    :raises web.HTTPNotFound: For an unknown player or when there is no pending
        announcement for the player.
    """
    self._log_request(request)
    player_id = request.match_info["player_id"]
    # NOTE(review): this looks the id up in the player queues while the sibling
    # handlers use self.mass.players - confirm this is intentional.
    player = self.mass.player_queues.get(player_id)
    if not player:
        raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
    if not (announce_data := self.announcements.get(player_id)):
        raise web.HTTPNotFound(reason=f"No pending announcements for Player: {player_id}")

    # work out output format/details
    fmt = request.match_info["fmt"]
    audio_format = AudioFormat(content_type=ContentType.try_parse(fmt))

    def _announcement_audio() -> AsyncGenerator[bytes, None]:
        """Create a fresh generator that yields the announcement audio."""
        return self.get_announcement_stream(
            announcement_url=announce_data["announcement_url"],
            output_format=audio_format,
            pre_announce=announce_data["pre_announce"],
            pre_announce_url=announce_data["pre_announce_url"],
        )

    http_profile = await self.mass.config.get_player_config_value(
        player_id, CONF_HTTP_PROFILE, default="default", return_type=str
    )
    if http_profile == "forced_content_length":
        # given the fact that an announcement is just a short audio clip,
        # just send it over completely at once so we have a fixed content length
        # (collect the chunks and join once instead of repeatedly growing a bytes object)
        chunks = [chunk async for chunk in _announcement_audio()]
        return web.Response(
            body=b"".join(chunks),
            content_type=f"audio/{audio_format.output_format_str}",
            headers=DEFAULT_STREAM_HEADERS,
        )

    resp = web.StreamResponse(
        status=200,
        reason="OK",
        headers=DEFAULT_STREAM_HEADERS,
    )
    resp.content_type = f"audio/{audio_format.output_format_str}"
    if http_profile == "chunked":
        resp.enable_chunked_encoding()

    await resp.prepare(request)

    # return early if this is not a GET request
    if request.method != "GET":
        return resp

    # all checks passed, start streaming!
    self.logger.debug(
        "Start serving audio stream for Announcement %s to %s",
        announce_data["announcement_url"],
        player.display_name,
    )
    async for chunk in _announcement_audio():
        try:
            await resp.write(chunk)
        except ConnectionError:
            # same disconnect handling as the other stream endpoints: this also
            # covers BrokenPipeError, ConnectionResetError and ConnectionAbortedError
            break

    self.logger.debug(
        "Finished serving audio stream for Announcement %s to %s",
        announce_data["announcement_url"],
        player.display_name,
    )

    return resp
792
async def serve_plugin_source_stream(self, request: web.Request) -> web.StreamResponse:
    """
    Stream PluginSource audio to a player.

    :param request: The incoming request; the route provides `plugin_source`,
        `player_id` and `fmt`.
    :return: The prepared response (headers only for non-GET requests).
    :raises ProviderUnavailableError: When no provider is found for the plugin source id.
    :raises web.HTTPNotFound: For an unknown player.
    :raises InvalidDataError: When the plugin source has no audio format.
    """
    self._log_request(request)
    plugin_source_id = request.match_info["plugin_source"]
    provider = cast("PluginProvider", self.mass.get_provider(plugin_source_id))
    if not provider:
        raise ProviderUnavailableError(f"Unknown PluginSource: {plugin_source_id}")
    # work out output format/details
    player_id = request.match_info["player_id"]
    player = self.mass.players.get(player_id)
    if not player:
        raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
    plugin_source = provider.get_source()
    # validate the audio format before its first use and before any headers
    # are sent, so a broken source results in a clean error response
    if not plugin_source.audio_format:
        raise InvalidDataError(f"No audio format for plugin source {plugin_source_id}")
    output_format = await self.get_output_format(
        output_format_str=request.match_info["fmt"],
        player=player,
        content_sample_rate=plugin_source.audio_format.sample_rate,
        content_bit_depth=plugin_source.audio_format.bit_depth,
    )
    headers = {
        **DEFAULT_STREAM_HEADERS,
        "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01700000000000000000000000000000",  # noqa: E501
        "icy-name": plugin_source.name,
        "Accept-Ranges": "none",
        "Content-Type": f"audio/{output_format.output_format_str}",
    }

    resp = web.StreamResponse(
        status=200,
        reason="OK",
        headers=headers,
    )
    resp.content_type = f"audio/{output_format.output_format_str}"
    http_profile = await self.mass.config.get_player_config_value(
        player_id, CONF_HTTP_PROFILE, default="default", return_type=str
    )
    if http_profile == "forced_content_length":
        # just set an insanely high content length to make sure the player keeps playing
        resp.content_length = get_chunksize(output_format, 12 * 3600)
    elif http_profile == "chunked":
        resp.enable_chunked_encoding()

    await resp.prepare(request)

    # return early if this is not a GET request
    if request.method != "GET":
        return resp

    # all checks passed, start streaming!
    async for chunk in self.get_plugin_source_stream(
        plugin_source_id=plugin_source_id,
        output_format=output_format,
        player_id=player_id,
        player_filter_params=get_player_filter_params(
            self.mass, player_id, plugin_source.audio_format, output_format
        ),
    ):
        try:
            await resp.write(chunk)
        except ConnectionError:
            # player disconnected (ConnectionError also covers
            # BrokenPipeError and ConnectionResetError)
            break
    return resp
857
def get_command_url(self, player_or_queue_id: str, command: str) -> str:
    """
    Build the url of the special command stream.

    The url is composed of the base url of the stream server, the id of the
    player (or queue) and the command name with a fixed `.mp3` extension.
    """
    command_path = f"command/{player_or_queue_id}/{command}.mp3"
    return f"{self.base_url}/{command_path}"
861
def get_announcement_url(
    self,
    player_id: str,
    announce_data: AnnounceData,
    content_type: ContentType = ContentType.MP3,
) -> str:
    """
    Build the url of the special announcement stream for a player.

    As a side effect the given announce data is registered for the player
    in `self.announcements` (replacing any earlier entry for that player).
    """
    # remember the announcement details for this player
    self.announcements[player_id] = announce_data
    # The announcement is hosted by the stream server on the local network:
    # that way it plays on all players (also those that do not like https hosts)
    # and the pre-announce 'bell' can be offered as well.
    extension = content_type.value
    return f"{self.base_url}/announcement/{player_id}.{extension}"
874
def get_stream(
    self, media: PlayerMedia, pcm_format: AudioFormat, force_flow_mode: bool = False
) -> AsyncGenerator[bytes, None]:
    """
    Return a stream of the given media as raw PCM audio.

    Helper for player providers that are able to consume the raw PCM audio
    stream directly (e.g. AirPlay) instead of relying on the HTTP transport.
    The audio source is selected from the media type and origin of the media item;
    anything that is not recognized is handed to ffmpeg as url/direct path.
    """
    media_type = media.media_type

    if media_type == MediaType.ANNOUNCEMENT:
        # special case: stream announcement
        assert media.custom_data
        announce_details = media.custom_data
        return self.get_announcement_stream(
            announce_details["announcement_url"],
            output_format=pcm_format,
            pre_announce=announce_details["pre_announce"],
            pre_announce_url=announce_details["pre_announce_url"],
        )

    if media_type == MediaType.PLUGIN_SOURCE:
        # special case: plugin source stream
        assert media.custom_data
        plugin_details = media.custom_data
        return self.get_plugin_source_stream(
            plugin_source_id=plugin_details["source_id"],
            output_format=pcm_format,
            # the player_id has to come from the PlayerMedia object
            # because this could have been a group
            player_id=plugin_details["player_id"],
        )

    is_flow_stream = media_type == MediaType.FLOW_STREAM
    source_id = media.source_id

    if (
        is_flow_stream
        and source_id
        and source_id.startswith(UGP_PREFIX)
        and media.uri
        and "/ugp/" in media.uri
    ):
        # special case: member player accessing UGP stream
        # (the URI check distinguishes this from the UGP accessing its own stream)
        group_player = cast("UniversalGroupPlayer", self.mass.players.get(source_id))
        group_stream = group_player.stream
        assert group_stream is not None  # for type checker
        # subscribe to the raw stream when no conversion is needed
        return (
            group_stream.subscribe_raw()
            if group_stream.base_pcm_format == pcm_format
            else group_stream.get_stream(output_format=pcm_format)
        )

    if source_id and media.queue_item_id:
        if is_flow_stream or force_flow_mode:
            # regular queue (flow) stream request
            queue = self.mass.player_queues.get(source_id)
            assert queue
            first_item = self.mass.player_queues.get_item(source_id, media.queue_item_id)
            assert first_item
            return self.mass.streams.get_queue_flow_stream(
                queue=queue,
                start_queue_item=first_item,
                pcm_format=pcm_format,
            )
        # single item stream (e.g. radio)
        single_item = self.mass.player_queues.get_item(source_id, media.queue_item_id)
        assert single_item
        return buffered(
            self.get_queue_item_stream(
                queue_item=single_item,
                pcm_format=pcm_format,
            ),
            buffer_size=10,
            min_buffer_before_yield=2,
        )

    # fallback: assume url or some other direct path
    # NOTE: this will fail if it is an uri that is not playable by ffmpeg
    return get_ffmpeg_stream(
        audio_input=media.uri,
        input_format=AudioFormat(content_type=ContentType.try_parse(media.uri)),
        output_format=pcm_format,
    )
959
@use_buffer(buffer_size=30, min_buffer_before_yield=2)
async def get_queue_flow_stream(
    self,
    queue: PlayerQueue,
    start_queue_item: QueueItem,
    pcm_format: AudioFormat,
) -> AsyncGenerator[bytes, None]:
    """
    Get a flow stream of all tracks in the queue as raw PCM audio.

    Starting at `start_queue_item`, the next queue items are loaded one after
    another and streamed back-to-back as one continuous PCM stream until the
    queue signals that it is empty. When crossfade is allowed for a track, the
    last part of that track is held back and mixed with the first part of the
    following track.

    Audio is yielded in chunks of 1 second (pcm_sample_size bytes) where possible;
    leftover parts around a crossfade or at the end of a track can have another size.

    The queue is switched to flow mode and for every streamed track a PlayLogEntry
    is appended to the queue's flow_mode_stream_log. The streamdetails (and the play
    log entry) of each track are updated with the amount of audio actually sent.

    Raises InvalidDataError if a queue item has no streamdetails.
    """
    # ruff: noqa: PLR0915
    assert pcm_format.content_type.is_pcm()
    queue_track = None
    last_fadeout_part: bytes = b""
    last_streamdetails: StreamDetails | None = None
    last_play_log_entry: PlayLogEntry | None = None
    queue.flow_mode = True
    if not start_queue_item:
        # this can happen in some (edge case) race conditions
        return
    # number of bytes in one second of audio (see seconds_streamed calculation below)
    pcm_sample_size = pcm_format.pcm_sample_size
    if start_queue_item.media_type != MediaType.TRACK:
        # no crossfade on non-tracks
        smart_fades_mode = SmartFadesMode.DISABLED
        standard_crossfade_duration = 0
    else:
        smart_fades_mode = await self.mass.config.get_player_config_value(
            queue.queue_id, CONF_SMART_FADES_MODE, return_type=SmartFadesMode
        )
        standard_crossfade_duration = self.mass.config.get_raw_player_config_value(
            queue.queue_id, CONF_CROSSFADE_DURATION, 10
        )
    self.logger.info(
        "Start Queue Flow stream for Queue %s - crossfade: %s %s",
        queue.display_name,
        smart_fades_mode,
        f"({standard_crossfade_duration}s)"
        if smart_fades_mode == SmartFadesMode.STANDARD_CROSSFADE
        else "",
    )
    # counts chunks over the whole flow stream (NOT reset per track)
    total_chunks_received = 0

    while True:
        # get (next) queue item to stream
        if queue_track is None:
            queue_track = start_queue_item
        else:
            try:
                queue_track = await self.mass.player_queues.load_next_queue_item(
                    queue.queue_id, queue_track.queue_item_id
                )
            except QueueEmpty:
                # queue_track keeps pointing to the last streamed track here
                break

        if queue_track.streamdetails is None:
            # format the message ourselves: exceptions do not interpolate %-style args
            raise InvalidDataError(
                f"No Streamdetails known for queue item {queue_track.queue_item_id}"
            )

        self.logger.debug(
            "Start Streaming queue track: %s (%s) for queue %s",
            queue_track.streamdetails.uri,
            queue_track.name,
            queue.display_name,
        )
        # append to play log so the queue controller can work out which track is playing
        play_log_entry = PlayLogEntry(queue_track.queue_item_id)
        queue.flow_mode_stream_log.append(play_log_entry)
        # calculate crossfade buffer size
        crossfade_buffer_duration = (
            SMART_CROSSFADE_DURATION
            if smart_fades_mode == SmartFadesMode.SMART_CROSSFADE
            else standard_crossfade_duration
        )
        # never use more than half of the track (if its duration is known)
        crossfade_buffer_duration = min(
            crossfade_buffer_duration,
            int(queue_track.streamdetails.duration / 2)
            if queue_track.streamdetails.duration
            else crossfade_buffer_duration,
        )
        # Ensure crossfade buffer size is aligned to frame boundaries
        # Frame size = bytes_per_sample * channels
        bytes_per_sample = pcm_format.bit_depth // 8
        frame_size = bytes_per_sample * pcm_format.channels
        crossfade_buffer_size = int(pcm_format.pcm_sample_size * crossfade_buffer_duration)
        # Round down to nearest frame boundary
        crossfade_buffer_size = (crossfade_buffer_size // frame_size) * frame_size

        bytes_written = 0
        buffer = b""
        # handle incoming audio chunks
        first_chunk_received = False
        # buffer size needs to be big enough to include the crossfade part

        async for chunk in self.get_queue_item_stream(
            queue_track,
            pcm_format=pcm_format,
            seek_position=queue_track.streamdetails.seek_position,
            raise_on_error=False,
        ):
            total_chunks_received += 1
            if not first_chunk_received:
                first_chunk_received = True
                # inform the queue that the track is now loaded in the buffer
                # so the next track can be preloaded
                self.mass.player_queues.track_loaded_in_buffer(
                    queue.queue_id, queue_track.queue_item_id
                )
            if total_chunks_received < 10 and smart_fades_mode != SmartFadesMode.DISABLED:
                # we want a stream to start as quickly as possible
                # so for the first 10 chunks we keep a very short buffer
                req_buffer_size = pcm_format.pcm_sample_size
            else:
                req_buffer_size = (
                    pcm_sample_size
                    if smart_fades_mode == SmartFadesMode.DISABLED
                    else crossfade_buffer_size
                )

            # ALWAYS APPEND CHUNK TO BUFFER
            buffer += chunk
            del chunk
            if len(buffer) < req_buffer_size:
                # buffer is not full enough, move on
                # yield control to event loop with 10ms delay
                await asyncio.sleep(0.01)
                continue

            #### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
            if last_fadeout_part and last_streamdetails:
                # perform crossfade
                fadein_part = buffer[:crossfade_buffer_size]
                remaining_bytes = buffer[crossfade_buffer_size:]
                # Use the mixer to handle all crossfade logic
                crossfade_part = await self._smart_fades_mixer.mix(
                    fade_in_part=fadein_part,
                    fade_out_part=last_fadeout_part,
                    fade_in_streamdetails=queue_track.streamdetails,
                    fade_out_streamdetails=last_streamdetails,
                    pcm_format=pcm_format,
                    standard_crossfade_duration=standard_crossfade_duration,
                    mode=smart_fades_mode,
                )
                # because the crossfade exists of both the fadein and fadeout part
                # we need to correct the bytes_written accordingly so the duration
                # calculations at the end of the track are correct
                crossfade_part_len = len(crossfade_part)
                bytes_written += int(crossfade_part_len / 2)
                if last_play_log_entry:
                    # the other half is accounted to the previous track
                    assert last_play_log_entry.seconds_streamed is not None
                    last_play_log_entry.seconds_streamed += (
                        crossfade_part_len / 2 / pcm_sample_size
                    )
                # yield crossfade_part (in pcm_sample_size chunks)
                for _chunk in divide_chunks(crossfade_part, pcm_sample_size):
                    yield _chunk
                    del _chunk
                del crossfade_part
                # also write the leftover bytes from the crossfade action
                if remaining_bytes:
                    yield remaining_bytes
                    bytes_written += len(remaining_bytes)
                del remaining_bytes
                # clear vars
                last_fadeout_part = b""
                last_streamdetails = None
                buffer = b""

            #### OTHER: enough data in buffer, feed to output
            while len(buffer) > req_buffer_size:
                yield buffer[:pcm_sample_size]
                bytes_written += pcm_sample_size
                buffer = buffer[pcm_sample_size:]

        #### HANDLE END OF TRACK
        if last_fadeout_part:
            # edge case: we did not get enough data to make the crossfade
            for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
                yield _chunk
                del _chunk
            bytes_written += len(last_fadeout_part)
            last_fadeout_part = b""
        if self._crossfade_allowed(
            queue_track, smart_fades_mode=smart_fades_mode, flow_mode=True
        ):
            # if crossfade is enabled, save fadeout part to pickup for next track
            last_fadeout_part = buffer[-crossfade_buffer_size:]
            last_streamdetails = queue_track.streamdetails
            last_play_log_entry = play_log_entry
            remaining_bytes = buffer[:-crossfade_buffer_size]
            if remaining_bytes:
                yield remaining_bytes
                bytes_written += len(remaining_bytes)
            del remaining_bytes
        elif buffer:
            # no crossfade enabled, just yield the buffer last part
            bytes_written += len(buffer)
            for _chunk in divide_chunks(buffer, pcm_sample_size):
                yield _chunk
                del _chunk
        # make sure the buffer gets cleaned up
        del buffer

        # update duration details based on the actual pcm data we sent
        # this also accounts for crossfade and silence stripping
        seconds_streamed = bytes_written / pcm_sample_size
        queue_track.streamdetails.seconds_streamed = seconds_streamed
        queue_track.streamdetails.duration = int(
            queue_track.streamdetails.seek_position + seconds_streamed
        )
        play_log_entry.seconds_streamed = seconds_streamed
        play_log_entry.duration = queue_track.streamdetails.duration
        self.logger.debug(
            "Finished Streaming queue track: %s (%s) on queue %s",
            queue_track.streamdetails.uri,
            queue_track.name,
            queue.display_name,
        )
    #### HANDLE END OF QUEUE FLOW STREAM
    # end of queue flow: make sure we yield the last_fadeout_part
    if last_fadeout_part:
        for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
            yield _chunk
            del _chunk
        # correct seconds streamed/duration (of the last streamed track)
        last_part_seconds = len(last_fadeout_part) / pcm_sample_size
        streamdetails = queue_track.streamdetails
        assert streamdetails is not None
        streamdetails.seconds_streamed = (
            streamdetails.seconds_streamed or 0
        ) + last_part_seconds
        streamdetails.duration = int((streamdetails.duration or 0) + last_part_seconds)
        last_fadeout_part = b""
    self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name)
1200
async def get_announcement_stream(
    self,
    announcement_url: str,
    output_format: AudioFormat,
    pre_announce: bool | str = False,
    pre_announce_url: str = ANNOUNCE_ALERT_FILE,
) -> AsyncGenerator[bytes, None]:
    """
    Get the special announcement stream.

    Yields the (optional) pre-announce sound followed by the announcement itself,
    in the requested output format. The announcement is fetched by a background task
    while the pre-announce sound is already playing; silence is inserted while
    the announcement is not ready yet.

    If fetching the announcement fails, the stream ends (after the audio that
    was already received) instead of waiting forever.
    """
    announcement_data: asyncio.Queue[bytes | None] = asyncio.Queue(10)
    # we are doing announcement in PCM first to avoid multiple encodings
    # when mixing pre-announce and announcement
    # also we have to deal with some TTS sources being super slow in delivering audio
    # so we take an approach where we start fetching the announcement in the background
    # while we can already start playing the pre-announce sound (if any)

    pcm_format = (
        output_format
        if output_format.content_type.is_pcm()
        else AudioFormat(
            sample_rate=output_format.sample_rate,
            content_type=ContentType.PCM_S16LE,
            bit_depth=16,
            channels=output_format.channels,
        )
    )

    async def fetch_announcement() -> None:
        """Fetch the announcement as PCM chunks into the queue, ending with None."""
        fmt = announcement_url.rsplit(".")[-1]
        try:
            async for chunk in get_ffmpeg_stream(
                audio_input=announcement_url,
                input_format=AudioFormat(content_type=ContentType.try_parse(fmt)),
                output_format=pcm_format,
                chunk_size=get_chunksize(pcm_format, 1),
            ):
                await announcement_data.put(chunk)
        except Exception:
            # the consumer waits for the end marker: without it a failed fetch
            # would leave the stream padding silence (or blocking) forever.
            # (cancellation is deliberately not handled here: it is no Exception)
            await announcement_data.put(None)
            raise
        await announcement_data.put(None)  # signal end of stream

    self.mass.create_task(fetch_announcement())

    # 100ms of silence, expressed in whole frames so the sample/channel
    # alignment of the PCM stream stays intact for any sample rate
    frame_size = (pcm_format.bit_depth // 8) * pcm_format.channels
    silence_chunk = b"\0" * ((pcm_format.sample_rate // 10) * frame_size)

    async def _announcement_stream() -> AsyncGenerator[bytes, None]:
        """Generate the PCM audio stream for the announcement + optional pre-announce."""
        if pre_announce:
            async for chunk in get_ffmpeg_stream(
                audio_input=pre_announce_url,
                input_format=AudioFormat(content_type=ContentType.try_parse(pre_announce_url)),
                output_format=pcm_format,
                chunk_size=get_chunksize(pcm_format, 1),
            ):
                yield chunk
        # pad silence while we're waiting for the announcement to be ready
        while announcement_data.empty():
            yield silence_chunk
            await asyncio.sleep(0.1)
        # stream announcement
        while True:
            announcement_chunk = await announcement_data.get()
            if announcement_chunk is None:
                # end marker received: announcement is complete (or failed)
                break
            yield announcement_chunk

    if output_format == pcm_format:
        # no need to re-encode, just yield the raw PCM stream
        async for chunk in _announcement_stream():
            yield chunk
        return

    # stream final announcement in requested output format
    async for chunk in get_ffmpeg_stream(
        audio_input=_announcement_stream(),
        input_format=pcm_format,
        output_format=output_format,
    ):
        yield chunk
1276
async def get_plugin_source_stream(
    self,
    plugin_source_id: str,
    output_format: AudioFormat,
    player_id: str,
    player_filter_params: list[str] | None = None,
) -> AsyncGenerator[bytes, None]:
    """
    Get the special plugin source stream.

    The audio of the plugin source is passed through ffmpeg (with the optional
    player filter params) and yielded in the given output format, for as long as
    the source is marked as in use by the given player.

    Raises ProviderUnavailableError if no provider is found for the plugin source id.
    """
    provider = cast("PluginProvider", self.mass.get_provider(plugin_source_id))
    if not provider:
        raise ProviderUnavailableError(f"Unknown PluginSource: {plugin_source_id}")

    source = provider.get_source()
    self.logger.debug(
        "Start streaming PluginSource %s to %s using output format %s",
        plugin_source_id,
        player_id,
        output_format,
    )
    # normally the player controller has set this already, set it (again) to be sure
    source.in_use_by = player_id

    try:
        # a custom stream type is read from the provider itself,
        # for anything else ffmpeg reads from the path of the source
        if source.stream_type == StreamType.CUSTOM:
            raw_input = provider.get_audio_stream(player_id)
        else:
            raw_input = source.path
        ffmpeg_stream = get_ffmpeg_stream(
            audio_input=cast("str | AsyncGenerator[bytes, None]", raw_input),
            input_format=source.audio_format,
            output_format=output_format,
            filter_params=player_filter_params,
            extra_input_args=["-y", "-re"],
        )
        async for chunk in ffmpeg_stream:
            if source.in_use_by != player_id:
                # the source is no longer claimed by this player
                # (another player took over or the stream ended): stop streaming
                break
            yield chunk
    finally:
        self.logger.debug(
            "Finished streaming PluginSource %s to %s", plugin_source_id, player_id
        )
        # short delay to prevent race conditions when selecting source
        await asyncio.sleep(1)
        if source.in_use_by == player_id:
            # still claimed by this player: release control
            source.in_use_by = None
1324
async def get_queue_item_stream(
    self,
    queue_item: QueueItem,
    pcm_format: AudioFormat,
    seek_position: int = 0,
    raise_on_error: bool = True,
) -> AsyncGenerator[bytes, None]:
    """
    Get the (PCM) audio stream for a single queue item.

    The audio is yielded in the given pcm_format, starting at seek_position,
    with volume normalization applied according to the streamdetails of the item
    and an optional fade-in on the first seconds of audio.

    When an AudioError occurs, the item is marked as unavailable and the error is
    re-raised if raise_on_error is set; otherwise it is only logged and the stream ends.

    When the stream ends (for whatever reason) the seconds streamed are stored on
    the streamdetails and, if the stream finished or at least 90 seconds were
    streamed, the stream is reported to the provider of the item.
    """
    # collect all arguments for ffmpeg
    streamdetails = queue_item.streamdetails
    assert streamdetails
    filter_params: list[str] = []

    # handle volume normalization
    gain_correct: float | None = None
    if streamdetails.volume_normalization_mode == VolumeNormalizationMode.DYNAMIC:
        # volume normalization using loudnorm filter (in dynamic mode)
        # which also collects the measurement on the fly during playback
        # more info: https://k.ylo.ph/2016/04/04/loudnorm.html
        filter_rule = f"loudnorm=I={streamdetails.target_loudness}:TP=-2.0:LRA=10.0:offset=0.0"
        filter_rule += ":print_format=json"
        filter_params.append(filter_rule)
    elif streamdetails.volume_normalization_mode == VolumeNormalizationMode.FIXED_GAIN:
        # apply user defined fixed volume/gain correction
        config_key = (
            CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS
            if streamdetails.media_type == MediaType.TRACK
            else CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO
        )
        gain_value = await self.mass.config.get_core_config_value(
            self.domain, config_key, default=0.0, return_type=float
        )
        gain_correct = round(gain_value, 2)
        filter_params.append(f"volume={gain_correct}dB")
    elif streamdetails.volume_normalization_mode == VolumeNormalizationMode.MEASUREMENT_ONLY:
        # volume normalization with known loudness measurement
        # apply volume/gain correction
        target_loudness = (
            float(streamdetails.target_loudness)
            if streamdetails.target_loudness is not None
            else 0.0
        )
        if streamdetails.prefer_album_loudness and streamdetails.loudness_album is not None:
            gain_correct = target_loudness - float(streamdetails.loudness_album)
        elif streamdetails.loudness is not None:
            gain_correct = target_loudness - float(streamdetails.loudness)
        else:
            # no measurement known: no correction
            gain_correct = 0.0
        gain_correct = round(gain_correct, 2)
        filter_params.append(f"volume={gain_correct}dB")
    streamdetails.volume_normalization_gain_correct = gain_correct

    # the buffer is only used when enabled in the config AND the duration is known
    allow_buffer = bool(
        self.mass.config.get_raw_core_config_value(
            self.domain, CONF_ALLOW_BUFFER, CONF_ALLOW_BUFFER_DEFAULT
        )
        and streamdetails.duration
    )

    self.logger.debug(
        "Starting queue item stream for %s (%s)"
        " - using buffer: %s"
        " - using fade-in: %s"
        " - using volume normalization: %s",
        queue_item.name,
        streamdetails.uri,
        allow_buffer,
        streamdetails.fade_in,
        streamdetails.volume_normalization_mode,
    )
    # both stream helpers are called with the exact same arguments
    create_media_stream = get_buffered_media_stream if allow_buffer else get_media_stream
    media_stream_gen = create_media_stream(
        self.mass,
        streamdetails=streamdetails,
        pcm_format=pcm_format,
        seek_position=int(seek_position),
        filter_params=filter_params,
    )

    first_chunk_received = False
    fade_in_buffer = b""
    fade_in_filter = ["afade=type=in:start_time=0:duration=3"]
    bytes_received = 0
    finished = False
    # we run inside a coroutine, so there always is a running loop to use as clock
    loop = asyncio.get_running_loop()
    stream_started_at = loop.time()
    try:
        async for chunk in media_stream_gen:
            bytes_received += len(chunk)
            if not first_chunk_received:
                first_chunk_received = True
                self.logger.debug(
                    "First audio chunk received for %s (%s) after %.2f seconds",
                    queue_item.name,
                    streamdetails.uri,
                    loop.time() - stream_started_at,
                )
            # handle optional fade-in
            if streamdetails.fade_in:
                if len(fade_in_buffer) < pcm_format.pcm_sample_size * 4:
                    # collect the first chunks until we have enough to apply the fade
                    fade_in_buffer += chunk
                elif fade_in_buffer:
                    async for fade_chunk in get_ffmpeg_stream(
                        # NOTE: get_ffmpeg_stream signature says str | AsyncGenerator
                        # but FFMpeg class actually accepts bytes too. This works at
                        # runtime but needs type: ignore for mypy.
                        audio_input=fade_in_buffer + chunk,  # type: ignore[arg-type]
                        input_format=pcm_format,
                        output_format=pcm_format,
                        filter_params=fade_in_filter,
                    ):
                        yield fade_chunk
                    fade_in_buffer = b""
                    streamdetails.fade_in = False
            else:
                yield chunk
            # help garbage collection by explicitly deleting chunk
            del chunk
        if fade_in_buffer:
            # the stream ended before the fade-in buffer was filled (very short item):
            # apply the fade to what we collected so this audio is not silently lost
            async for fade_chunk in get_ffmpeg_stream(
                # bytes input: see NOTE on the fade-in above
                audio_input=fade_in_buffer,  # type: ignore[arg-type]
                input_format=pcm_format,
                output_format=pcm_format,
                filter_params=fade_in_filter,
            ):
                yield fade_chunk
            fade_in_buffer = b""
            streamdetails.fade_in = False
        finished = True
    except AudioError as err:
        streamdetails.stream_error = True
        queue_item.available = False
        if raise_on_error:
            raise
        # yes, we swallow the error here after logging it
        # so the outer stream can handle it gracefully
        self.logger.error(
            "AudioError while streaming queue item %s (%s): %s",
            queue_item.name,
            streamdetails.uri,
            err,
        )
    finally:
        # determine how many seconds we've streamed
        # for pcm output we can calculate this easily
        seconds_streamed = bytes_received / pcm_format.pcm_sample_size
        streamdetails.seconds_streamed = seconds_streamed
        self.logger.debug(
            "stream %s for %s in %.2f seconds - seconds streamed/buffered: %.2f",
            "aborted" if not finished else "finished",
            streamdetails.uri,
            loop.time() - stream_started_at,
            seconds_streamed,
        )
        # report stream to provider
        if (finished or seconds_streamed >= 90) and (
            music_prov := self.mass.get_provider(streamdetails.provider)
        ):
            if TYPE_CHECKING:  # avoid circular import
                assert isinstance(music_prov, MusicProvider)
            self.mass.create_task(music_prov.on_streamed(streamdetails))
1482
1483 @use_buffer(buffer_size=30, min_buffer_before_yield=2)
1484 async def get_queue_item_stream_with_smartfade(
1485 self,
1486 queue_item: QueueItem,
1487 pcm_format: AudioFormat,
1488 smart_fades_mode: SmartFadesMode = SmartFadesMode.SMART_CROSSFADE,
1489 standard_crossfade_duration: int = 10,
1490 ) -> AsyncGenerator[bytes, None]:
1491 """Get the audio stream for a single queue item with (smart) crossfade to the next item."""
1492 queue = self.mass.player_queues.get(queue_item.queue_id)
1493 if not queue:
1494 raise RuntimeError(f"Queue {queue_item.queue_id} not found")
1495
1496 streamdetails = queue_item.streamdetails
1497 assert streamdetails
1498 crossfade_data = self._crossfade_data.pop(queue.queue_id, None)
1499
1500 if crossfade_data and streamdetails.seek_position > 0:
1501 # don't do crossfade when seeking into track
1502 crossfade_data = None
1503 if crossfade_data and (crossfade_data.queue_item_id != queue_item.queue_item_id):
1504 # edge case alert: the next item changed just while we were preloading/crossfading
1505 self.logger.warning(
1506 "Skipping crossfade data for queue %s - next item changed!", queue.display_name
1507 )
1508 crossfade_data = None
1509
1510 self.logger.debug(
1511 "Start Streaming queue track: %s (%s) for queue %s "
1512 "- crossfade mode: %s "
1513 "- crossfading from previous track: %s ",
1514 queue_item.streamdetails.uri if queue_item.streamdetails else "Unknown URI",
1515 queue_item.name,
1516 queue.display_name,
1517 smart_fades_mode,
1518 "true" if crossfade_data else "false",
1519 )
1520
1521 buffer = b""
1522 bytes_written = 0
1523 # calculate crossfade buffer size
1524 crossfade_buffer_duration = (
1525 SMART_CROSSFADE_DURATION
1526 if smart_fades_mode == SmartFadesMode.SMART_CROSSFADE
1527 else standard_crossfade_duration
1528 )
1529 crossfade_buffer_duration = min(
1530 crossfade_buffer_duration,
1531 int(streamdetails.duration / 2)
1532 if streamdetails.duration
1533 else crossfade_buffer_duration,
1534 )
1535 # Ensure crossfade buffer size is aligned to frame boundaries
1536 # Frame size = bytes_per_sample * channels
1537 bytes_per_sample = pcm_format.bit_depth // 8
1538 frame_size = bytes_per_sample * pcm_format.channels
1539 crossfade_buffer_size = int(pcm_format.pcm_sample_size * crossfade_buffer_duration)
1540 # Round down to nearest frame boundary
1541 crossfade_buffer_size = (crossfade_buffer_size // frame_size) * frame_size
1542 fade_out_data: bytes | None = None
1543
1544 if crossfade_data:
1545 # Calculate discard amount in seconds (format-independent)
1546 # Use fade_in_pcm_format because fade_in_size is in the next track's original format
1547 fade_in_duration_seconds = (
1548 crossfade_data.fade_in_size / crossfade_data.fade_in_pcm_format.pcm_sample_size
1549 )
1550 discard_seconds = int(fade_in_duration_seconds) - 1
1551 # Calculate discard amounts in CURRENT track's format
1552 discard_bytes = int(discard_seconds * pcm_format.pcm_sample_size)
1553 # Convert fade_in_size to current track's format for correct leftover calculation
1554 fade_in_size_in_current_format = int(
1555 fade_in_duration_seconds * pcm_format.pcm_sample_size
1556 )
1557 discard_leftover = fade_in_size_in_current_format - discard_bytes
1558 else:
1559 discard_seconds = streamdetails.seek_position
1560 discard_leftover = 0
1561 total_chunks_received = 0
1562 req_buffer_size = crossfade_buffer_size
1563 async for chunk in self.get_queue_item_stream(
1564 queue_item, pcm_format, seek_position=discard_seconds
1565 ):
1566 total_chunks_received += 1
1567 if discard_leftover:
1568 # discard leftover bytes from crossfade data
1569 chunk = chunk[discard_leftover:] # noqa: PLW2901
1570 discard_leftover = 0
1571
1572 if total_chunks_received < 10:
1573 # we want a stream to start as quickly as possible
1574 # so for the first 10 chunks we keep a very short buffer
1575 req_buffer_size = pcm_format.pcm_sample_size
1576 else:
1577 req_buffer_size = crossfade_buffer_size
1578
1579 # ALWAYS APPEND CHUNK TO BUFFER
1580 buffer += chunk
1581 del chunk
1582 if len(buffer) < req_buffer_size:
1583 # buffer is not full enough, move on
1584 continue
1585
1586 #### HANDLE CROSSFADE DATA FROM PREVIOUS TRACK
1587 if crossfade_data:
1588 # send the (second half of the) crossfade data
1589 if crossfade_data.pcm_format != pcm_format:
1590 # edge case: pcm format mismatch, we need to resample
1591 self.logger.debug(
1592 "Resampling crossfade data from %s to %s for queue %s",
1593 crossfade_data.pcm_format.sample_rate,
1594 pcm_format.sample_rate,
1595 queue.display_name,
1596 )
1597 resampled_data = await resample_pcm_audio(
1598 crossfade_data.data,
1599 crossfade_data.pcm_format,
1600 pcm_format,
1601 )
1602 if resampled_data:
1603 for _chunk in divide_chunks(resampled_data, pcm_format.pcm_sample_size):
1604 yield _chunk
1605 bytes_written += len(resampled_data)
1606 else:
1607 # Resampling failed, error already logged in resample_pcm_audio
1608 # Skip crossfade data entirely - stream continues without it
1609 self.logger.warning(
1610 "Skipping crossfade data for queue %s due to resampling failure",
1611 queue.display_name,
1612 )
1613 else:
1614 for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size):
1615 yield _chunk
1616 bytes_written += len(crossfade_data.data)
1617 # clear vars
1618 crossfade_data = None
1619
1620 #### OTHER: enough data in buffer, feed to output
1621 while len(buffer) > req_buffer_size:
1622 yield buffer[: pcm_format.pcm_sample_size]
1623 bytes_written += pcm_format.pcm_sample_size
1624 buffer = buffer[pcm_format.pcm_sample_size :]
1625
1626 #### HANDLE END OF TRACK
1627
1628 if crossfade_data:
1629 # edge case: we did not get enough data to send the crossfade data
1630 # send the (second half of the) crossfade data
1631 if crossfade_data.pcm_format != pcm_format:
1632 # (yet another) edge case: pcm format mismatch, we need to resample
1633 self.logger.debug(
1634 "Resampling remaining crossfade data from %s to %s for queue %s",
1635 crossfade_data.pcm_format.sample_rate,
1636 pcm_format.sample_rate,
1637 queue.display_name,
1638 )
1639 resampled_crossfade_data = await resample_pcm_audio(
1640 crossfade_data.data,
1641 crossfade_data.pcm_format,
1642 pcm_format,
1643 )
1644 if resampled_crossfade_data:
1645 crossfade_data.data = resampled_crossfade_data
1646 else:
1647 # Resampling failed, error already logged in resample_pcm_audio
1648 # Skip the crossfade data entirely
1649 self.logger.warning(
1650 "Skipping remaining crossfade data for queue %s due to resampling failure",
1651 queue.display_name,
1652 )
1653 crossfade_data = None
1654 if crossfade_data:
1655 for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size):
1656 yield _chunk
1657 bytes_written += len(crossfade_data.data)
1658 crossfade_data = None
1659
1660 # get next track for crossfade
1661 next_queue_item: QueueItem | None
1662 try:
1663 self.logger.debug(
1664 "Preloading NEXT track for crossfade for queue %s",
1665 queue.display_name,
1666 )
1667 next_queue_item = await self.mass.player_queues.load_next_queue_item(
1668 queue.queue_id, queue_item.queue_item_id
1669 )
1670 # set index_in_buffer to prevent our next track is overwritten while preloading
1671 if next_queue_item.streamdetails is None:
1672 raise InvalidDataError(
1673 f"No streamdetails for next queue item {next_queue_item.queue_item_id}"
1674 )
1675 queue.index_in_buffer = self.mass.player_queues.index_by_id(
1676 queue.queue_id, next_queue_item.queue_item_id
1677 )
1678 queue_player = self.mass.players.get(queue.queue_id)
1679 assert queue_player is not None
1680 next_queue_item_pcm_format = await self._select_pcm_format(
1681 player=queue_player,
1682 streamdetails=next_queue_item.streamdetails,
1683 smartfades_enabled=True,
1684 )
1685 except QueueEmpty:
1686 # end of queue reached, no next item
1687 next_queue_item = None
1688
1689 if not next_queue_item or not self._crossfade_allowed(
1690 queue_item,
1691 smart_fades_mode=smart_fades_mode,
1692 flow_mode=False,
1693 next_queue_item=next_queue_item,
1694 sample_rate=pcm_format.sample_rate,
1695 next_sample_rate=next_queue_item_pcm_format.sample_rate,
1696 ):
1697 # no crossfade enabled/allowed, just yield the buffer last part
1698 bytes_written += len(buffer)
1699 for _chunk in divide_chunks(buffer, pcm_format.pcm_sample_size):
1700 yield _chunk
1701 else:
1702 # if crossfade is enabled, save fadeout part in buffer to pickup for next track
1703 fade_out_data = buffer
1704 buffer = b""
1705 try:
1706 async for chunk in self.get_queue_item_stream(
1707 next_queue_item, next_queue_item_pcm_format
1708 ):
1709 # append to buffer until we reach crossfade size
1710 # we only need the first X seconds of the NEXT track so we can
1711 # perform the crossfade.
1712 # the crossfaded audio of the previous and next track will be
1713 # sent in two equal parts: first half now, second half
1714 # when the next track starts. We use CrossfadeData to store
1715 # the second half to be picked up by the next track's stream generator.
1716 # Note that we more or less expect the user to have enabled the in-memory
1717 # buffer so we can keep the next track's audio data in memory.
1718 buffer += chunk
1719 del chunk
1720 if len(buffer) >= crossfade_buffer_size:
1721 break
1722 #### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
1723 # Store original buffer size before any resampling for fade_in_size calculation
1724 # This size is in the next track's original format which is what we need
1725 original_buffer_size = len(buffer)
1726 if next_queue_item_pcm_format != pcm_format:
1727 # edge case: pcm format mismatch, we need to resample the next track's
1728 # beginning part before crossfading
1729 self.logger.debug(
1730 "Resampling next track's crossfade from %s to %s for queue %s",
1731 next_queue_item_pcm_format.sample_rate,
1732 pcm_format.sample_rate,
1733 queue.display_name,
1734 )
1735 buffer = await resample_pcm_audio(
1736 buffer,
1737 next_queue_item_pcm_format,
1738 pcm_format,
1739 )
1740 # perform actual (smart fades) crossfade using mixer
1741 crossfade_bytes = await self._smart_fades_mixer.mix(
1742 fade_in_part=buffer,
1743 fade_out_part=fade_out_data,
1744 fade_in_streamdetails=cast("StreamDetails", next_queue_item.streamdetails),
1745 fade_out_streamdetails=streamdetails,
1746 pcm_format=pcm_format,
1747 standard_crossfade_duration=standard_crossfade_duration,
1748 mode=smart_fades_mode,
1749 )
1750 # send half of the crossfade_part (= approx the fadeout part)
1751 split_point = (len(crossfade_bytes) + 1) // 2
1752 crossfade_first = crossfade_bytes[:split_point]
1753 crossfade_second = crossfade_bytes[split_point:]
1754 del crossfade_bytes
1755 bytes_written += len(crossfade_first)
1756 for _chunk in divide_chunks(crossfade_first, pcm_format.pcm_sample_size):
1757 yield _chunk
1758 # store the other half for the next track
1759 # IMPORTANT: crossfade_second data is in CURRENT track's format (pcm_format)
1760 # because it was created from the resampled buffer used for mixing.
1761 # BUT fade_in_size represents bytes in NEXT track's original format
1762 # (next_queue_item_pcm_format) because that's how much of the next track
1763 # was consumed during the crossfade. We need both formats to correctly
1764 # handle the crossfade data when the next track starts.
1765 self._crossfade_data[queue_item.queue_id] = CrossfadeData(
1766 data=crossfade_second,
1767 fade_in_size=original_buffer_size,
1768 pcm_format=pcm_format, # Format of the data (current track)
1769 fade_in_pcm_format=next_queue_item_pcm_format, # Format for fade_in_size
1770 queue_item_id=next_queue_item.queue_item_id,
1771 )
1772 except AudioError:
1773 # no crossfade possible, just yield the fade_out_data
1774 next_queue_item = None
1775 yield fade_out_data
1776 bytes_written += len(fade_out_data)
1777 del fade_out_data
1778 # make sure the buffer gets cleaned up
1779 del buffer
1780 # update duration details based on the actual pcm data we sent
1781 # this also accounts for crossfade and silence stripping
1782 seconds_streamed = bytes_written / pcm_format.pcm_sample_size
1783 streamdetails.seconds_streamed = seconds_streamed
1784 streamdetails.duration = int(streamdetails.seek_position + seconds_streamed)
1785 self.logger.debug(
1786 "Finished Streaming queue track: %s (%s) on queue %s "
1787 "- crossfade data prepared for next track: %s",
1788 streamdetails.uri,
1789 queue_item.name,
1790 queue.display_name,
1791 next_queue_item.name if next_queue_item else "N/A",
1792 )
1793
def _log_request(self, request: web.Request) -> None:
    """
    Write a log line describing an incoming HTTP request.

    If the controller logger is enabled for VERBOSE_LOG_LEVEL, the line is
    logged at that level and includes the request headers. Otherwise a
    shorter line without headers is logged at debug level.
    """
    verbose = self.logger.isEnabledFor(VERBOSE_LOG_LEVEL)
    # method, path and remote address are part of both variants of the message
    origin = (request.method, request.path, request.remote)
    if verbose:
        self.logger.log(
            VERBOSE_LOG_LEVEL,
            "Got %s request to %s from %s\nheaders: %s\n",
            *origin,
            request.headers,
        )
    else:
        self.logger.debug("Got %s request to %s from %s", *origin)
1812
async def get_output_format(
    self,
    output_format_str: str,
    player: Player,
    content_sample_rate: int,
    content_bit_depth: int,
) -> AudioFormat:
    """
    Build the (player specific) output AudioFormat for the given format string.

    The player's CONF_SAMPLE_RATES config is read as (sample rate, bit depth)
    pairs and its CONF_OUTPUT_CHANNELS config decides mono vs stereo.

    :param output_format_str: requested output format, parsed via ContentType.try_parse.
    :param player: player whose config values are consulted.
    :param content_sample_rate: sample rate of the content.
    :param content_bit_depth: bit depth of the content.
    :return: AudioFormat with the resolved content type, sample rate, bit depth
        and channel count (2 when the channels config equals "stereo", else 1).
    """
    content_type: ContentType = ContentType.try_parse(output_format_str)
    rate_depth_pairs = cast(
        "list[tuple[str, str]]",
        await self.mass.config.get_player_config_value(
            player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
        ),
    )
    channels_conf = self.mass.config.get_raw_player_config_value(
        player.player_id, CONF_OUTPUT_CHANNELS, "stereo"
    )
    player_rates = tuple(int(pair[0]) for pair in rate_depth_pairs)
    player_depths = tuple(int(pair[1]) for pair in rate_depth_pairs)

    # never go above the highest bit depth configured for the player
    output_bit_depth = min(content_bit_depth, max(player_depths))
    # keep the content's rate when the player lists it, else take the player's highest rate
    output_sample_rate = (
        content_sample_rate if content_sample_rate in player_rates else max(player_rates)
    )

    if not content_type.is_lossless():
        # no point in having a higher bit depth for lossy formats
        output_bit_depth = 16
        output_sample_rate = min(48000, output_sample_rate)
    if output_format_str == "pcm":
        # plain "pcm" is replaced by the content type matching the chosen bit depth
        content_type = ContentType.from_bit_depth(output_bit_depth)
    return AudioFormat(
        content_type=content_type,
        sample_rate=output_sample_rate,
        bit_depth=output_bit_depth,
        channels=2 if channels_conf == "stereo" else 1,
    )
1853
async def _select_flow_format(
    self,
    player: Player,
) -> AudioFormat:
    """
    Pick the (player specific) PCM format for a flow stream.

    The sample rate is the first of 192000, 96000, 48000 and 44100 that appears
    in the player's CONF_SAMPLE_RATES config; if none of them does, the sample
    rate of INTERNAL_PCM_FORMAT is used. Content type and bit depth are taken
    from INTERNAL_PCM_FORMAT and the channel count is always 2.
    """
    rates_conf = cast(
        "list[tuple[str, str]]",
        await self.mass.config.get_player_config_value(
            player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
        ),
    )
    player_rates = tuple(int(entry[0]) for entry in rates_conf)
    # candidates in order of preference, highest first
    preferred_rates = (192000, 96000, 48000, 44100)
    flow_sample_rate = next(
        (rate for rate in preferred_rates if rate in player_rates),
        INTERNAL_PCM_FORMAT.sample_rate,
    )
    return AudioFormat(
        content_type=INTERNAL_PCM_FORMAT.content_type,
        sample_rate=flow_sample_rate,
        bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
        channels=2,
    )
1877
async def _select_pcm_format(
    self,
    player: Player,
    streamdetails: StreamDetails,
    smartfades_enabled: bool,
) -> AudioFormat:
    """
    Work out the (player specific) internal PCM format for a stream.

    :param player: player whose CONF_SAMPLE_RATES config is consulted.
    :param streamdetails: stream details providing the content's sample rate and channels.
    :param smartfades_enabled: when true, the channel count is forced to 2.
    :return: AudioFormat using the content type and bit depth of INTERNAL_PCM_FORMAT.
    """
    rates_conf = cast(
        "list[tuple[str, str]]",
        await self.mass.config.get_player_config_value(
            player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
        ),
    )
    player_rates = tuple(int(entry[0]) for entry in rates_conf)
    content_rate = streamdetails.audio_format.sample_rate
    # highest player rate that does not exceed the content rate;
    # 48000 is the sane/safe default when there is no such rate
    eligible_rates = [rate for rate in player_rates if rate <= content_rate]
    chosen_rate = max(eligible_rates, default=48000)
    selected = AudioFormat(
        sample_rate=chosen_rate,
        # always use f32 internally for extra headroom for filters etc
        content_type=INTERNAL_PCM_FORMAT.content_type,
        bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
        channels=streamdetails.audio_format.channels,
    )
    if smartfades_enabled:
        # force stereo for crossfading (assigned after construction on purpose,
        # exactly like before, so whatever the constructor derives is unchanged)
        selected.channels = 2

    return selected
1909
def _crossfade_allowed(
    self,
    queue_item: QueueItem,
    smart_fades_mode: SmartFadesMode,
    flow_mode: bool = False,
    next_queue_item: QueueItem | None = None,
    sample_rate: int | None = None,
    next_sample_rate: int | None = None,
) -> bool:
    """
    Return whether crossfading from the given queue item into the next one is allowed.

    Crossfade is refused when:
    - smart_fades_mode is SmartFadesMode.DISABLED,
    - no player is found for the item's queue id,
    - the current or the next item is not of media type TRACK,
    - there is no next item,
    - both items are Tracks with the same album, unless the
      CONF_ALLOW_CROSSFADE_SAME_ALBUM core config value is set,
    - not in flow mode, both sample rates are given and differ, and the player
      config CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES is not set.

    :param queue_item: the item that is currently streamed.
    :param smart_fades_mode: configured smart fades mode.
    :param flow_mode: when true, the sample rate check is not performed.
    :param next_queue_item: the next item; looked up from the queue when omitted.
    :param sample_rate: sample rate of the current item's stream.
    :param next_sample_rate: sample rate of the next item's stream.
    """
    if smart_fades_mode == SmartFadesMode.DISABLED:
        return False
    if not (self.mass.players.get(queue_item.queue_id)):
        return False  # just a guard
    if queue_item.media_type != MediaType.TRACK:
        self.logger.debug("Skipping crossfade: current item is not a track")
        return False

    # resolve the item that follows, preferring the one handed in by the caller
    upcoming = next_queue_item or self.mass.player_queues.get_next_item(
        queue_item.queue_id, queue_item.queue_item_id
    )
    if not upcoming:
        # there is no next item!
        return False
    if upcoming.media_type != MediaType.TRACK:
        self.logger.debug("Skipping crossfade: next item is not a track")
        return False

    # check if the next item is part of the same album
    current_media = queue_item.media_item
    upcoming_media = upcoming.media_item
    same_album = (
        isinstance(current_media, Track)
        and isinstance(upcoming_media, Track)
        and current_media.album
        and upcoming_media.album
        and current_media.album == upcoming_media.album
    )
    if same_album and not self.mass.config.get_raw_core_config_value(
        self.domain, CONF_ALLOW_CROSSFADE_SAME_ALBUM, False
    ):
        # in general, crossfade is not desired for tracks of the same (gapless) album
        # because we have no accurate way to determine if the album is gapless or not,
        # for now we just never crossfade between tracks of the same album
        self.logger.debug("Skipping crossfade: next item is part of the same album")
        return False

    # check if we're allowed to crossfade on different sample rates
    if flow_mode:
        return True
    rates_differ = sample_rate and next_sample_rate and sample_rate != next_sample_rate
    if not rates_differ:
        return True
    if self.mass.config.get_raw_player_config_value(
        queue_item.queue_id,
        CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES.key,
        CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES.default_value,
    ):
        return True
    self.logger.debug(
        "Skipping crossfade: player does not support gapless playback "
        "with different sample rates (%s vs %s)",
        sample_rate,
        next_sample_rate,
    )
    return False
1975
async def _periodic_garbage_collection(self) -> None:
    """
    Run one garbage collection pass and schedule the next one.

    The pass is announced and its result reported at VERBOSE_LOG_LEVEL.
    Afterwards this coroutine function is handed to self.mass.call_later
    with a delay of 900 seconds.
    """
    self.logger.log(VERBOSE_LOG_LEVEL, "Running periodic garbage collection...")
    # hand gc.collect to the default executor so the event loop is not blocked meanwhile
    unreachable_count = await asyncio.get_running_loop().run_in_executor(None, gc.collect)
    self.logger.log(
        VERBOSE_LOG_LEVEL,
        "Garbage collection completed, collected %d objects",
        unreachable_count,
    )
    # schedule the next run in 15 minutes
    rerun_delay_seconds = 900
    self.mass.call_later(rerun_delay_seconds, self._periodic_garbage_collection)
1994
def _setup_smart_fades_logger(self, config: CoreConfig) -> None:
    """
    Apply the configured log level to the smart fades analyzer and mixer loggers.

    The CONF_SMART_FADES_LOG_LEVEL value is read from the given config and
    converted to a string. "GLOBAL" means the level of this controller's own
    logger is copied; any other value is passed to setLevel unchanged.
    """
    configured = str(config.get_value(CONF_SMART_FADES_LOG_LEVEL))
    # "GLOBAL" follows this controller's logger, everything else is used as-is
    level = self.logger.level if configured == "GLOBAL" else configured
    self.smart_fades_analyzer.logger.setLevel(level)
    self.smart_fades_mixer.logger.setLevel(level)
2004