/
/
/
1"""
2Controller to stream audio to players.
3
4The streams controller hosts a basic, unprotected HTTP-only webserver
5purely to stream audio packets to players and some control endpoints such as
6the upnp callbacks and json rpc api for slimproto clients.
7"""
8
9from __future__ import annotations
10
11import asyncio
12import gc
13import logging
14import os
15import urllib.parse
16from collections.abc import AsyncGenerator
17from dataclasses import dataclass
18from typing import TYPE_CHECKING, Final, cast
19
20from aiofiles.os import wrap
21from aiohttp import web
22from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
23from music_assistant_models.enums import (
24 ConfigEntryType,
25 ContentType,
26 MediaType,
27 PlayerFeature,
28 StreamType,
29 VolumeNormalizationMode,
30)
31from music_assistant_models.errors import (
32 AudioError,
33 InvalidDataError,
34 ProviderUnavailableError,
35 QueueEmpty,
36)
37from music_assistant_models.media_items import AudioFormat, Track
38from music_assistant_models.player_queue import PlayLogEntry
39
40from music_assistant.constants import (
41 ANNOUNCE_ALERT_FILE,
42 CONF_BIND_IP,
43 CONF_BIND_PORT,
44 CONF_CROSSFADE_DURATION,
45 CONF_ENTRY_ENABLE_ICY_METADATA,
46 CONF_ENTRY_LOG_LEVEL,
47 CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES,
48 CONF_HTTP_PROFILE,
49 CONF_OUTPUT_CHANNELS,
50 CONF_OUTPUT_CODEC,
51 CONF_PUBLISH_IP,
52 CONF_SAMPLE_RATES,
53 CONF_SMART_FADES_MODE,
54 CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO,
55 CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
56 CONF_VOLUME_NORMALIZATION_RADIO,
57 CONF_VOLUME_NORMALIZATION_TRACKS,
58 DEFAULT_STREAM_HEADERS,
59 ICY_HEADERS,
60 INTERNAL_PCM_FORMAT,
61 SILENCE_FILE,
62 VERBOSE_LOG_LEVEL,
63)
64from music_assistant.controllers.players.helpers import AnnounceData
65from music_assistant.controllers.streams.smart_fades import SmartFadesMixer
66from music_assistant.controllers.streams.smart_fades.analyzer import SmartFadesAnalyzer
67from music_assistant.controllers.streams.smart_fades.fades import SMART_CROSSFADE_DURATION
68from music_assistant.helpers.audio import LOGGER as AUDIO_LOGGER
69from music_assistant.helpers.audio import (
70 get_buffered_media_stream,
71 get_chunksize,
72 get_media_stream,
73 get_player_filter_params,
74 get_stream_details,
75 resample_pcm_audio,
76)
77from music_assistant.helpers.buffered_generator import buffered, use_buffer
78from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
79from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream
80from music_assistant.helpers.util import (
81 divide_chunks,
82 format_ip_for_url,
83 get_ip_addresses,
84 get_total_system_memory,
85)
86from music_assistant.helpers.webserver import Webserver
87from music_assistant.models.core_controller import CoreController
88from music_assistant.models.music_provider import MusicProvider
89from music_assistant.models.plugin import PluginProvider, PluginSource
90from music_assistant.models.smart_fades import SmartFadesMode
91from music_assistant.providers.universal_group.constants import UGP_PREFIX
92from music_assistant.providers.universal_group.player import UniversalGroupPlayer
93
94if TYPE_CHECKING:
95 from music_assistant_models.config_entries import CoreConfig
96 from music_assistant_models.player import PlayerMedia
97 from music_assistant_models.player_queue import PlayerQueue
98 from music_assistant_models.queue_item import QueueItem
99 from music_assistant_models.streamdetails import StreamDetails
100
101 from music_assistant.mass import MusicAssistant
102 from music_assistant.models.player import Player
103
104
# Awaitable variant of os.path.isfile, created with the wrap helper from aiofiles.
isfile = wrap(os.path.isfile)

# Keys of the config entries that are specific to the streams controller.
CONF_ALLOW_BUFFER: Final[str] = "allow_buffering"
CONF_ALLOW_CROSSFADE_SAME_ALBUM: Final[str] = "allow_crossfade_same_album"
CONF_SMART_FADES_LOG_LEVEL: Final[str] = "smart_fades_log_level"

# Calculate total system memory once at module load time
TOTAL_SYSTEM_MEMORY_GB: Final[float] = get_total_system_memory()
# Default for the 'allow buffering' option: only on when at least 8 GB is reported.
CONF_ALLOW_BUFFER_DEFAULT = TOTAL_SYSTEM_MEMORY_GB >= 8.0
# Default TCP port of the stream server, used when no bind port is configured.
DEFAULT_PORT: Final[int] = 8097
115
116
def parse_pcm_info(content_type: str) -> tuple[int, int, int]:
    """
    Extract the PCM parameters from a codec/content_type string.

    The parameters are expected as ';' separated key=value pairs, for example
    ``pcm;rate=48000;bitrate=24;channels=2``. Any parameter that is missing
    falls back to its default (44100 Hz, 16 bits, 2 channels).

    Returns a tuple of (sample_rate, sample_size, channels).
    """
    options: dict[str, str] = {}
    if ";" in content_type:
        # treat the ';' separated parameters as if they were a query string
        options.update(urllib.parse.parse_qsl(content_type.replace(";", "&")))
    defaults = (("rate", 44100), ("bitrate", 16), ("channels", 2))
    rate, size, channel_count = (int(options.get(key, fallback)) for key, fallback in defaults)
    return (rate, size, channel_count)
126
127
@dataclass
class CrossfadeData:
    """
    Data class to hold crossfade data.

    The streams controller keeps instances of this class in its
    ``_crossfade_data`` dictionary.
    """

    # Raw audio bytes, in the format described by 'pcm_format'.
    data: bytes
    # Size of the fade-in part, to be interpreted using 'fade_in_pcm_format'.
    # NOTE(review): presumably a size in bytes - confirm against the producer.
    fade_in_size: int
    pcm_format: AudioFormat  # Format of the 'data' bytes (current/previous track's format)
    fade_in_pcm_format: AudioFormat  # Format for 'fade_in_size' (next track's format)
    # Id of the queue item this crossfade data is tied to.
    queue_item_id: str
137
138
139class StreamsController(CoreController):
140 """Webserver Controller to stream audio to players."""
141
142 domain: str = "streams"
143
144 def __init__(self, mass: MusicAssistant) -> None:
145 """Initialize instance."""
146 super().__init__(mass)
147 self._server = Webserver(self.logger, enable_dynamic_routes=True)
148 self.register_dynamic_route = self._server.register_dynamic_route
149 self.unregister_dynamic_route = self._server.unregister_dynamic_route
150 self.manifest.name = "Streamserver"
151 self.manifest.description = (
152 "Music Assistant's core controller that is responsible for "
153 "streaming audio to players on the local network."
154 )
155 self.manifest.icon = "cast-audio"
156 self.announcements: dict[str, AnnounceData] = {}
157 self._crossfade_data: dict[str, CrossfadeData] = {}
158 self._bind_ip: str = "0.0.0.0"
159 self._smart_fades_mixer = SmartFadesMixer(self)
160 self._smart_fades_analyzer = SmartFadesAnalyzer(self)
161
    @property
    def base_url(self) -> str:
        """Return the base_url for the streamserver, as held by the underlying webserver."""
        return self._server.base_url
166
    @property
    def bind_ip(self) -> str:
        """
        Return the IP address this streamserver is bound to.

        This is "0.0.0.0" until setup has read the configured bind IP.
        """
        return self._bind_ip
171
    @property
    def smart_fades_mixer(self) -> SmartFadesMixer:
        """Return the SmartFadesMixer instance that was created for this controller."""
        return self._smart_fades_mixer
176
    @property
    def smart_fades_analyzer(self) -> SmartFadesAnalyzer:
        """Return the SmartFadesAnalyzer instance that was created for this controller."""
        return self._smart_fades_analyzer
181
182 async def get_config_entries(
183 self,
184 action: str | None = None,
185 values: dict[str, ConfigValueType] | None = None,
186 ) -> tuple[ConfigEntry, ...]:
187 """Return all Config Entries for this core module (if any)."""
188 ip_addresses = await get_ip_addresses(include_ipv6=True)
189 return (
190 ConfigEntry(
191 key=CONF_ALLOW_BUFFER,
192 type=ConfigEntryType.BOOLEAN,
193 default_value=CONF_ALLOW_BUFFER_DEFAULT,
194 label="Allow (in-memory) buffering of (track) audio",
195 description="By default, Music Assistant tries to be as resource "
196 "efficient as possible when streaming audio, especially considering "
197 "low-end devices such as Raspberry Pi's. This means that audio "
198 "buffering is disabled by default to reduce memory usage. \n\n"
199 "Enabling this option allows for in-memory buffering of audio, "
200 "which (massively) improves playback (and seeking) performance but it comes "
201 "at the cost of increased memory usage. "
202 "If you run Music Assistant on a capable device with enough memory, "
203 "enabling this option is strongly recommended.",
204 required=False,
205 category="playback",
206 ),
207 ConfigEntry(
208 key=CONF_VOLUME_NORMALIZATION_RADIO,
209 type=ConfigEntryType.STRING,
210 default_value=VolumeNormalizationMode.FALLBACK_FIXED_GAIN,
211 label="Volume normalization method for radio streams",
212 options=[
213 ConfigValueOption(x.value.replace("_", " ").title(), x.value)
214 for x in VolumeNormalizationMode
215 ],
216 category="playback",
217 ),
218 ConfigEntry(
219 key=CONF_VOLUME_NORMALIZATION_TRACKS,
220 type=ConfigEntryType.STRING,
221 default_value=VolumeNormalizationMode.FALLBACK_DYNAMIC,
222 label="Volume normalization method for tracks",
223 options=[
224 ConfigValueOption(x.value.replace("_", " ").title(), x.value)
225 for x in VolumeNormalizationMode
226 ],
227 category="playback",
228 ),
229 ConfigEntry(
230 key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO,
231 type=ConfigEntryType.FLOAT,
232 range=(-20, 10),
233 default_value=-6,
234 label="Fixed/fallback gain adjustment for radio streams",
235 category="playback",
236 ),
237 ConfigEntry(
238 key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
239 type=ConfigEntryType.FLOAT,
240 range=(-20, 10),
241 default_value=-6,
242 label="Fixed/fallback gain adjustment for tracks",
243 category="playback",
244 ),
245 ConfigEntry(
246 key=CONF_ALLOW_CROSSFADE_SAME_ALBUM,
247 type=ConfigEntryType.BOOLEAN,
248 default_value=False,
249 label="Allow crossfade between tracks from the same album",
250 description="Enabling this option allows for crossfading between tracks "
251 "that are part of the same album.",
252 category="playback",
253 ),
254 ConfigEntry(
255 key=CONF_PUBLISH_IP,
256 type=ConfigEntryType.STRING,
257 default_value=ip_addresses[0],
258 label="Published IP address",
259 description="This IP address is communicated to players where to find this server."
260 "\nMake sure that this IP can be reached by players on the local network, "
261 "otherwise audio streaming will not work.",
262 required=False,
263 category="generic",
264 advanced=True,
265 requires_reload=True,
266 ),
267 ConfigEntry(
268 key=CONF_BIND_PORT,
269 type=ConfigEntryType.INTEGER,
270 default_value=DEFAULT_PORT,
271 label="TCP Port",
272 description="The TCP port to run the server. "
273 "Make sure that this server can be reached "
274 "on the given IP and TCP port by players on the local network.",
275 category="generic",
276 advanced=True,
277 requires_reload=True,
278 ),
279 ConfigEntry(
280 key=CONF_BIND_IP,
281 type=ConfigEntryType.STRING,
282 default_value="0.0.0.0",
283 options=[ConfigValueOption(x, x) for x in {"0.0.0.0", "::", *ip_addresses}],
284 label="Bind to IP/interface",
285 description="Start the stream server on this specific interface. \n"
286 "Use 0.0.0.0 or :: to bind to all interfaces, which is the default. \n"
287 "This is an advanced setting that should normally "
288 "not be adjusted in regular setups.",
289 category="generic",
290 advanced=True,
291 required=False,
292 requires_reload=True,
293 ),
294 ConfigEntry(
295 key=CONF_SMART_FADES_LOG_LEVEL,
296 type=ConfigEntryType.STRING,
297 label="Smart Fades Log level",
298 description="Log level for the Smart Fades mixer and analyzer.",
299 options=CONF_ENTRY_LOG_LEVEL.options,
300 default_value="GLOBAL",
301 category="generic",
302 advanced=True,
303 ),
304 )
305
306 async def setup(self, config: CoreConfig) -> None:
307 """Async initialize of module."""
308 # copy log level to audio/ffmpeg loggers
309 AUDIO_LOGGER.setLevel(self.logger.level)
310 FFMPEG_LOGGER.setLevel(self.logger.level)
311 self._setup_smart_fades_logger(config)
312 # perform check for ffmpeg version
313 await check_ffmpeg_version()
314 # start the webserver
315 self.publish_port = config.get_value(CONF_BIND_PORT, DEFAULT_PORT)
316 self.publish_ip = config.get_value(CONF_PUBLISH_IP)
317 self._bind_ip = bind_ip = str(config.get_value(CONF_BIND_IP))
318 # print a big fat message in the log where the streamserver is running
319 # because this is a common source of issues for people with more complex setups
320 self.logger.log(
321 logging.INFO if self.mass.config.onboard_done else logging.WARNING,
322 "\n\n################################################################################\n"
323 "Starting streamserver on %s:%s\n"
324 "This is the IP address that is communicated to players.\n"
325 "If this is incorrect, audio will not play!\n"
326 "See the documentation how to configure the publish IP for the Streamserver\n"
327 "in Settings --> Core modules --> Streamserver\n"
328 "################################################################################\n",
329 self.publish_ip,
330 self.publish_port,
331 )
332 await self._server.setup(
333 bind_ip=bind_ip,
334 bind_port=cast("int", self.publish_port),
335 base_url=f"http://{format_ip_for_url(str(self.publish_ip))}:{self.publish_port}",
336 static_routes=[
337 (
338 "*",
339 "/flow/{session_id}/{queue_id}/{queue_item_id}/{player_id}.{fmt}",
340 self.serve_queue_flow_stream,
341 ),
342 (
343 "*",
344 "/single/{session_id}/{queue_id}/{queue_item_id}/{player_id}.{fmt}",
345 self.serve_queue_item_stream,
346 ),
347 (
348 "*",
349 "/command/{queue_id}/{command}.mp3",
350 self.serve_command_request,
351 ),
352 (
353 "*",
354 "/announcement/{player_id}.{fmt}",
355 self.serve_announcement_stream,
356 ),
357 (
358 "*",
359 "/pluginsource/{plugin_source}/{player_id}.{fmt}",
360 self.serve_plugin_source_stream,
361 ),
362 ],
363 )
364 # Start periodic garbage collection task
365 # This ensures memory from audio buffers and streams is cleaned up regularly
366 self.mass.call_later(900, self._periodic_garbage_collection) # 15 minutes
367
    async def close(self) -> None:
        """Cleanup on exit: close the webserver of the stream server."""
        await self._server.close()
371
372 async def resolve_stream_url(
373 self,
374 player_id: str,
375 media: PlayerMedia,
376 ) -> str:
377 """Resolve the stream URL for the given PlayerMedia."""
378 conf_output_codec = await self.mass.config.get_player_config_value(
379 player_id, CONF_OUTPUT_CODEC, default="flac", return_type=str
380 )
381 output_codec = ContentType.try_parse(conf_output_codec or "flac")
382 fmt = output_codec.value
383 # handle raw pcm without exact format specifiers
384 if output_codec.is_pcm() and ";" not in fmt:
385 fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}"
386 extra_data = media.custom_data or {}
387 flow_mode = extra_data.get("flow_mode", False)
388 session_id = extra_data.get("session_id")
389 queue_item_id = media.queue_item_id
390 if not session_id or not queue_item_id:
391 raise InvalidDataError("Can not resolve stream URL: Invalid PlayerMedia data")
392 queue_id = media.source_id
393 base_path = "flow" if flow_mode else "single"
394 return f"{self._server.base_url}/{base_path}/{session_id}/{queue_id}/{queue_item_id}/{player_id}.{fmt}" # noqa: E501
395
396 async def get_plugin_source_url(
397 self,
398 plugin_source: PluginSource,
399 player_id: str,
400 ) -> str:
401 """Get the url for the Plugin Source stream/proxy."""
402 if plugin_source.audio_format.content_type.is_pcm():
403 fmt = ContentType.WAV.value
404 else:
405 fmt = plugin_source.audio_format.content_type.value
406 return f"{self._server.base_url}/pluginsource/{plugin_source.id}/{player_id}.{fmt}"
407
    async def serve_queue_item_stream(self, request: web.Request) -> web.StreamResponse:
        """
        Stream single queueitem audio to a player.

        Handler for the '/single/...' route. The queue, session, player and
        queue item from the URL are validated first (HTTPNotFound is raised for
        any of them that is unknown, or when no streamdetails can be obtained).
        After that the response headers are sent and, for GET requests only,
        the audio of the queue item is streamed in the requested output format.

        When the stream ended with a stream error on the streamdetails,
        a skip to the next queue item is scheduled.
        """
        self._log_request(request)
        queue_id = request.match_info["queue_id"]
        player_id = request.match_info["player_id"]
        if not (queue := self.mass.player_queues.get(queue_id)):
            raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
        session_id = request.match_info["session_id"]
        # the session is only verified if the queue has a session id
        if queue.session_id and session_id != queue.session_id:
            raise web.HTTPNotFound(reason=f"Unknown (or invalid) session: {session_id}")
        if not (player := self.mass.players.get_player(player_id)):
            raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
        queue_item_id = request.match_info["queue_item_id"]
        queue_item = self.mass.player_queues.get_item(queue_id, queue_item_id)
        if not queue_item:
            raise web.HTTPNotFound(reason=f"Unknown Queue item: {queue_item_id}")
        if not queue_item.streamdetails:
            # streamdetails not resolved yet: do it now, any failure
            # marks the item as unavailable and results in a 404
            try:
                queue_item.streamdetails = await get_stream_details(
                    mass=self.mass, queue_item=queue_item
                )
            except Exception as e:
                self.logger.error(
                    "Failed to get streamdetails for QueueItem %s: %s", queue_item_id, e
                )
                queue_item.available = False
                raise web.HTTPNotFound(reason=f"No streamdetails for Queue item: {queue_item_id}")

        # pick output format based on the streamdetails and player capabilities
        pcm_format = await self._select_pcm_format(
            player=player,
            streamdetails=queue_item.streamdetails,
            smartfades_enabled=True,
        )
        output_format = await self.get_output_format(
            output_format_str=request.match_info["fmt"],
            player=player,
            content_sample_rate=pcm_format.sample_rate,
            content_bit_depth=pcm_format.bit_depth,
        )

        # prepare request, add some DLNA/UPNP compatible headers
        # icy-name is sanitized to avoid a "Potential header injection attack" exception by aiohttp
        # see https://github.com/music-assistant/support/issues/4913
        headers = {
            **DEFAULT_STREAM_HEADERS,
            "icy-name": queue_item.name.replace("\n", " ").replace("\r", " ").replace("\t", " "),
            "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01500000000000000000000000000000",  # noqa: E501
            "Accept-Ranges": "none",
            "Content-Type": f"audio/{output_format.output_format_str}",
        }
        resp = web.StreamResponse(
            status=200,
            reason="OK",
            headers=headers,
        )
        resp.content_type = f"audio/{output_format.output_format_str}"
        # the http profile decides if (and which) content length is announced
        http_profile = await self.mass.config.get_player_config_value(
            queue_id, CONF_HTTP_PROFILE, default="default", return_type=str
        )
        if http_profile == "forced_content_length" and not queue_item.duration:
            # just set an insane high content length to make sure the player keeps playing
            resp.content_length = get_chunksize(output_format, 12 * 3600)
        elif http_profile == "forced_content_length" and queue_item.duration:
            # guess content length based on duration
            resp.content_length = get_chunksize(output_format, queue_item.duration)
        elif http_profile == "chunked":
            resp.enable_chunked_encoding()

        await resp.prepare(request)

        # return early if this is not a GET request
        if request.method != "GET":
            return resp

        if queue_item.media_type != MediaType.TRACK:
            # no crossfade on non-tracks
            smart_fades_mode = SmartFadesMode.DISABLED
        else:
            smart_fades_mode = await self.mass.config.get_player_config_value(
                queue.queue_id, CONF_SMART_FADES_MODE, return_type=SmartFadesMode
            )
            # only read further down when smart fades end up being enabled
            standard_crossfade_duration = self.mass.config.get_raw_player_config_value(
                queue.queue_id, CONF_CROSSFADE_DURATION, 10
            )
        if (
            smart_fades_mode != SmartFadesMode.DISABLED
            and PlayerFeature.GAPLESS_PLAYBACK not in player.state.supported_features
        ):
            # crossfade is not supported on this player due to missing gapless playback
            self.logger.warning(
                "Crossfade disabled: Player %s does not support gapless playback, "
                "consider enabling flow mode to enable crossfade on this player.",
                player.state.name if player else "Unknown Player",
            )
            smart_fades_mode = SmartFadesMode.DISABLED

        if smart_fades_mode != SmartFadesMode.DISABLED:
            # crossfade is enabled, use special crossfaded single item stream
            # where the crossfade of the next track is present in the stream of
            # a single track. This only works if the player supports gapless playback!
            audio_input = self.get_queue_item_stream_with_smartfade(
                queue_item=queue_item,
                pcm_format=pcm_format,
                smart_fades_mode=smart_fades_mode,
                standard_crossfade_duration=standard_crossfade_duration,
            )
        else:
            # no crossfade, just a regular single item stream
            audio_input = buffered(
                self.get_queue_item_stream(
                    queue_item=queue_item,
                    pcm_format=pcm_format,
                    seek_position=queue_item.streamdetails.seek_position,
                ),
                buffer_size=10,
                min_buffer_before_yield=2,
            )
        # stream the audio
        # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into
        # the desired output format for the player including any player specific filter params
        # such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats
        if queue_item.media_type == MediaType.RADIO:
            # keep very short buffer for radio streams
            # to keep them (more or less) realtime and prevent time outs
            read_rate_input_args = ["-readrate", "1.0", "-readrate_initial_burst", "2"]
        else:
            # just allow the player to buffer whatever it wants for single item streams
            read_rate_input_args = None

        # bookkeeping, used for the 'track loaded' signal and the disconnect log message
        first_chunk_received = False
        bytes_sent = 0
        async for chunk in get_ffmpeg_stream(
            audio_input=audio_input,
            input_format=pcm_format,
            output_format=output_format,
            filter_params=get_player_filter_params(
                self.mass,
                player_id=player.player_id,
                input_format=pcm_format,
                output_format=output_format,
            ),
            extra_input_args=read_rate_input_args,
        ):
            try:
                await resp.write(chunk)
                bytes_sent += len(chunk)
                if not first_chunk_received:
                    first_chunk_received = True
                    # inform the queue that the track is now loaded in the buffer
                    # so for example the next track can be enqueued
                    self.mass.player_queues.track_loaded_in_buffer(
                        queue_item.queue_id, queue_item.queue_item_id
                    )
            except (BrokenPipeError, ConnectionResetError, ConnectionError) as err:
                if first_chunk_received and not player.stop_called:
                    # Player disconnected (unexpected) after receiving at least some data
                    # This could indicate buffering issues, network problems,
                    # or player-specific issues
                    bytes_expected = get_chunksize(output_format, queue_item.duration or 3600)
                    self.logger.warning(
                        "Player %s disconnected prematurely from stream for %s (%s) - "
                        "error: %s, sent %d bytes, expected (approx) bytes=%d",
                        queue.display_name,
                        queue_item.name,
                        queue_item.uri,
                        err.__class__.__name__,
                        bytes_sent,
                        bytes_expected,
                    )
                # the client is gone, stop streaming
                break
        if queue_item.streamdetails.stream_error:
            self.logger.error(
                "Error streaming QueueItem %s (%s) to %s - will try to skip to next item",
                queue_item.name,
                queue_item.uri,
                queue.display_name,
            )
            # try to skip to the next item in the queue after a short delay
            self.mass.call_later(5, self.mass.player_queues.next(queue_id))
        return resp
589
590 async def serve_queue_flow_stream(self, request: web.Request) -> web.StreamResponse:
591 """Stream Queue Flow audio to player."""
592 self._log_request(request)
593 queue_id = request.match_info["queue_id"]
594 player_id = request.match_info["player_id"]
595 if not (queue := self.mass.player_queues.get(queue_id)):
596 raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
597 if not (player := self.mass.players.get_player(player_id)):
598 raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
599 start_queue_item_id = request.match_info["queue_item_id"]
600 start_queue_item = self.mass.player_queues.get_item(queue_id, start_queue_item_id)
601 if not start_queue_item:
602 raise web.HTTPNotFound(reason=f"Unknown Queue item: {start_queue_item_id}")
603
604 queue.flow_mode_stream_log = []
605
606 # select the highest possible PCM settings for this player
607 flow_pcm_format = await self._select_flow_format(player)
608
609 # work out output format/details
610 output_format = await self.get_output_format(
611 output_format_str=request.match_info["fmt"],
612 player=player,
613 content_sample_rate=flow_pcm_format.sample_rate,
614 content_bit_depth=flow_pcm_format.bit_depth,
615 )
616 # work out ICY metadata support
617 icy_preference = self.mass.config.get_raw_player_config_value(
618 queue_id,
619 CONF_ENTRY_ENABLE_ICY_METADATA.key,
620 CONF_ENTRY_ENABLE_ICY_METADATA.default_value,
621 )
622 enable_icy = request.headers.get("Icy-MetaData", "") == "1" and icy_preference != "disabled"
623 icy_meta_interval = 256000 if icy_preference == "full" else 16384
624
625 # prepare request, add some DLNA/UPNP compatible headers
626 headers = {
627 **DEFAULT_STREAM_HEADERS,
628 **ICY_HEADERS,
629 "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01700000000000000000000000000000", # noqa: E501
630 "Accept-Ranges": "none",
631 "Content-Type": f"audio/{output_format.output_format_str}",
632 }
633 if enable_icy:
634 headers["icy-metaint"] = str(icy_meta_interval)
635
636 resp = web.StreamResponse(
637 status=200,
638 reason="OK",
639 headers=headers,
640 )
641 http_profile = await self.mass.config.get_player_config_value(
642 queue_id, CONF_HTTP_PROFILE, default="default", return_type=str
643 )
644 if http_profile == "forced_content_length":
645 # just set an insane high content length to make sure the player keeps playing
646 resp.content_length = get_chunksize(output_format, 12 * 3600)
647 elif http_profile == "chunked":
648 resp.enable_chunked_encoding()
649
650 await resp.prepare(request)
651
652 # return early if this is not a GET request
653 if request.method != "GET":
654 return resp
655
656 # all checks passed, start streaming!
657 # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into
658 # the desired output format for the player including any player specific filter params
659 # such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats
660 self.logger.debug("Start serving Queue flow audio stream for %s", queue.display_name)
661
662 async for chunk in get_ffmpeg_stream(
663 audio_input=self.get_queue_flow_stream(
664 queue=queue,
665 start_queue_item=start_queue_item,
666 pcm_format=flow_pcm_format,
667 ),
668 input_format=flow_pcm_format,
669 output_format=output_format,
670 filter_params=get_player_filter_params(
671 self.mass, player.player_id, flow_pcm_format, output_format
672 ),
673 # we need to slowly feed the music to avoid the player stopping and later
674 # restarting (or completely failing) the audio stream by keeping the buffer short.
675 # this is reported to be an issue especially with Chromecast players.
676 # see for example: https://github.com/music-assistant/support/issues/3717
677 # allow buffer ahead of 6 seconds and read rest in realtime
678 extra_input_args=["-readrate", "1.0", "-readrate_initial_burst", "6"],
679 chunk_size=icy_meta_interval if enable_icy else get_chunksize(output_format),
680 ):
681 try:
682 await resp.write(chunk)
683 except (BrokenPipeError, ConnectionResetError, ConnectionError):
684 # race condition
685 break
686
687 if not enable_icy:
688 continue
689
690 # if icy metadata is enabled, send the icy metadata after the chunk
691 if (
692 # use current item here and not buffered item, otherwise
693 # the icy metadata will be too much ahead
694 (current_item := queue.current_item)
695 and current_item.streamdetails
696 and current_item.streamdetails.stream_title
697 ):
698 title = current_item.streamdetails.stream_title
699 elif queue and current_item and current_item.name:
700 title = current_item.name
701 else:
702 title = "Music Assistant"
703 metadata = f"StreamTitle='{title}';".encode()
704 if icy_preference == "full" and current_item and current_item.image:
705 metadata += f"StreamURL='{current_item.image.path}'".encode()
706 while len(metadata) % 16 != 0:
707 metadata += b"\x00"
708 length = len(metadata)
709 length_b = chr(int(length / 16)).encode()
710 await resp.write(length_b + metadata)
711
712 return resp
713
714 async def serve_command_request(self, request: web.Request) -> web.FileResponse:
715 """Handle special 'command' request for a player."""
716 self._log_request(request)
717 queue_id = request.match_info["queue_id"]
718 command = request.match_info["command"]
719 if command == "next":
720 self.mass.create_task(self.mass.player_queues.next(queue_id))
721 return web.FileResponse(SILENCE_FILE, headers={"icy-name": "Music Assistant"})
722
723 async def serve_announcement_stream(self, request: web.Request) -> web.StreamResponse:
724 """Stream announcement audio to a player."""
725 self._log_request(request)
726 player_id = request.match_info["player_id"]
727 player = self.mass.player_queues.get(player_id)
728 if not player:
729 raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
730 if not (announce_data := self.announcements.get(player_id)):
731 raise web.HTTPNotFound(reason=f"No pending announcements for Player: {player_id}")
732
733 # work out output format/details
734 fmt = request.match_info["fmt"]
735 audio_format = AudioFormat(content_type=ContentType.try_parse(fmt))
736
737 http_profile = await self.mass.config.get_player_config_value(
738 player_id, CONF_HTTP_PROFILE, default="default", return_type=str
739 )
740 if http_profile == "forced_content_length":
741 # given the fact that an announcement is just a short audio clip,
742 # just send it over completely at once so we have a fixed content length
743 data = b""
744 async for chunk in self.get_announcement_stream(
745 announcement_url=announce_data["announcement_url"],
746 output_format=audio_format,
747 pre_announce=announce_data["pre_announce"],
748 pre_announce_url=announce_data["pre_announce_url"],
749 ):
750 data += chunk
751 return web.Response(
752 body=data,
753 content_type=f"audio/{audio_format.output_format_str}",
754 headers=DEFAULT_STREAM_HEADERS,
755 )
756
757 resp = web.StreamResponse(
758 status=200,
759 reason="OK",
760 headers=DEFAULT_STREAM_HEADERS,
761 )
762 resp.content_type = f"audio/{audio_format.output_format_str}"
763 if http_profile == "chunked":
764 resp.enable_chunked_encoding()
765
766 await resp.prepare(request)
767
768 # return early if this is not a GET request
769 if request.method != "GET":
770 return resp
771
772 # all checks passed, start streaming!
773 self.logger.debug(
774 "Start serving audio stream for Announcement %s to %s",
775 announce_data["announcement_url"],
776 player.state.name,
777 )
778 async for chunk in self.get_announcement_stream(
779 announcement_url=announce_data["announcement_url"],
780 output_format=audio_format,
781 pre_announce=announce_data["pre_announce"],
782 pre_announce_url=announce_data["pre_announce_url"],
783 ):
784 try:
785 await resp.write(chunk)
786 except (BrokenPipeError, ConnectionResetError):
787 break
788
789 self.logger.debug(
790 "Finished serving audio stream for Announcement %s to %s",
791 announce_data["announcement_url"],
792 player.state.name,
793 )
794
795 return resp
796
async def serve_plugin_source_stream(self, request: web.Request) -> web.StreamResponse:
    """
    Stream PluginSource audio to a player.

    The plugin source id, the player id and the requested output format are
    taken from the request's match_info ("plugin_source", "player_id", "fmt").

    Returns the prepared stream response. For non-GET requests the response
    is returned right after the headers have been prepared, without a body.

    Raises ProviderUnavailableError when no provider is found for the plugin
    source id, web.HTTPNotFound when the player is unknown and
    InvalidDataError when the plugin source has no audio format.
    """
    self._log_request(request)
    plugin_source_id = request.match_info["plugin_source"]
    provider = cast("PluginProvider", self.mass.get_provider(plugin_source_id))
    if not provider:
        raise ProviderUnavailableError(f"Unknown PluginSource: {plugin_source_id}")
    # work out output format/details
    player_id = request.match_info["player_id"]
    player = self.mass.players.get_player(player_id)
    if not player:
        raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
    plugin_source = provider.get_source()
    # validate the audio format BEFORE it is dereferenced below and before
    # any response headers are prepared for the client
    if not plugin_source.audio_format:
        raise InvalidDataError(f"No audio format for plugin source {plugin_source_id}")
    output_format = await self.get_output_format(
        output_format_str=request.match_info["fmt"],
        player=player,
        content_sample_rate=plugin_source.audio_format.sample_rate,
        content_bit_depth=plugin_source.audio_format.bit_depth,
    )
    headers = {
        **DEFAULT_STREAM_HEADERS,
        "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01700000000000000000000000000000",  # noqa: E501
        "icy-name": plugin_source.name,
        "Accept-Ranges": "none",
        "Content-Type": f"audio/{output_format.output_format_str}",
    }

    resp = web.StreamResponse(
        status=200,
        reason="OK",
        headers=headers,
    )
    resp.content_type = f"audio/{output_format.output_format_str}"
    http_profile = await self.mass.config.get_player_config_value(
        player_id, CONF_HTTP_PROFILE, default="default", return_type=str
    )
    if http_profile == "forced_content_length":
        # just set an insanely high content length to make sure the player keeps playing
        resp.content_length = get_chunksize(output_format, 12 * 3600)
    elif http_profile == "chunked":
        resp.enable_chunked_encoding()

    await resp.prepare(request)

    # return early if this is not a GET request
    if request.method != "GET":
        return resp

    # all checks passed, start streaming!
    async for chunk in self.get_plugin_source_stream(
        plugin_source_id=plugin_source_id,
        output_format=output_format,
        player_id=player_id,
        player_filter_params=get_player_filter_params(
            self.mass, player_id, plugin_source.audio_format, output_format
        ),
    ):
        try:
            await resp.write(chunk)
        except ConnectionError:
            # client went away; BrokenPipeError and ConnectionResetError
            # are both subclasses of ConnectionError
            break
    return resp
861
def get_command_url(self, player_or_queue_id: str, command: str) -> str:
    """Build the url of the special command stream for a player or queue."""
    command_path = f"/command/{player_or_queue_id}/{command}.mp3"
    return f"{self.base_url}{command_path}"
865
def get_announcement_url(
    self,
    player_id: str,
    announce_data: AnnounceData,
    content_type: ContentType = ContentType.MP3,
) -> str:
    """
    Register the announcement for a player and return its stream url.

    The announce data is stored in self.announcements under the player id
    and the returned url points at this stream server.
    """
    # remember the announcement details for this player
    self.announcements[player_id] = announce_data
    # the announcement is hosted on the stream server (local network):
    # this ensures playback on all players, including ones that do not
    # like https hosts and it also offers the pre-announce 'bell'
    extension = content_type.value
    return f"{self.base_url}/announcement/{player_id}.{extension}"
878
def get_stream(
    self, media: PlayerMedia, pcm_format: AudioFormat, force_flow_mode: bool = False
) -> AsyncGenerator[bytes, None]:
    """
    Return an async generator with the raw PCM audio of the given media.

    Helper for player providers that consume the raw PCM audio stream
    directly (e.g. AirPlay) instead of relying on HTTP transport.
    The audio source is selected from the media type, source id,
    queue item id and uri of the given media.
    """
    kind = media.media_type

    if kind == MediaType.ANNOUNCEMENT:
        # special case: stream announcement
        assert media.custom_data
        announce = media.custom_data
        return self.get_announcement_stream(
            announce["announcement_url"],
            output_format=pcm_format,
            pre_announce=announce["pre_announce"],
            pre_announce_url=announce["pre_announce_url"],
        )

    if kind == MediaType.PLUGIN_SOURCE:
        # special case: plugin source stream
        assert media.custom_data
        plugin_data = media.custom_data
        return self.get_plugin_source_stream(
            plugin_source_id=plugin_data["source_id"],
            output_format=pcm_format,
            # the player_id comes from the PlayerMedia object
            # because this could have been a group
            player_id=plugin_data["player_id"],
        )

    # special case: member player accessing UGP stream
    # the URI is checked to distinguish from the UGP accessing its own stream
    if (
        kind == MediaType.FLOW_STREAM
        and media.source_id
        and media.source_id.startswith(UGP_PREFIX)
        and media.uri
        and "/ugp/" in media.uri
    ):
        group_player = cast(
            "UniversalGroupPlayer", self.mass.players.get_player(media.source_id)
        )
        group_stream = group_player.stream
        assert group_stream is not None  # for type checker
        if group_stream.base_pcm_format != pcm_format:
            return group_stream.get_stream(output_format=pcm_format)
        # no conversion needed
        return group_stream.subscribe_raw()

    if media.source_id and media.queue_item_id:
        if kind == MediaType.FLOW_STREAM or force_flow_mode:
            # regular queue (flow) stream request
            queue = self.mass.player_queues.get(media.source_id)
            assert queue
            first_item = self.mass.player_queues.get_item(
                media.source_id, media.queue_item_id
            )
            assert first_item
            return self.mass.streams.get_queue_flow_stream(
                queue=queue,
                start_queue_item=first_item,
                pcm_format=pcm_format,
            )
        # single item stream (e.g. radio)
        single_item = self.mass.player_queues.get_item(media.source_id, media.queue_item_id)
        assert single_item
        item_stream = self.get_queue_item_stream(
            queue_item=single_item,
            pcm_format=pcm_format,
        )
        return buffered(
            item_stream,
            buffer_size=10,
            min_buffer_before_yield=2,
        )

    # assume url or some other direct path
    # NOTE: this will fail if its an uri not playable by ffmpeg
    source_format = AudioFormat(content_type=ContentType.try_parse(media.uri))
    return get_ffmpeg_stream(
        audio_input=media.uri,
        input_format=source_format,
        output_format=pcm_format,
    )
963
@use_buffer(buffer_size=30, min_buffer_before_yield=2)
async def get_queue_flow_stream(
    self,
    queue: PlayerQueue,
    start_queue_item: QueueItem,
    pcm_format: AudioFormat,
) -> AsyncGenerator[bytes, None]:
    """
    Get a flow stream of all tracks in the queue as raw PCM audio.

    Starting at start_queue_item, every queue item is streamed back to back
    until the queue reports QueueEmpty. When crossfade is allowed, the tail
    of each track is held back and mixed with the head of the next track.
    Regular audio is yielded in chunks of pcm_sample_size bytes (one second);
    leftover parts around a crossfade or at the end of a track can have a
    different size.

    Side effects: sets queue.flow_mode, appends a PlayLogEntry per track to
    queue.flow_mode_stream_log and updates seconds_streamed/duration on the
    streamdetails of each streamed queue item.

    Raises InvalidDataError when a queue item has no streamdetails.
    """
    # ruff: noqa: PLR0915
    assert pcm_format.content_type.is_pcm()
    queue_track = None
    last_fadeout_part: bytes = b""
    last_streamdetails: StreamDetails | None = None
    last_play_log_entry: PlayLogEntry | None = None
    queue.flow_mode = True
    if not start_queue_item:
        # this can happen in some (edge case) race conditions
        return
    pcm_sample_size = pcm_format.pcm_sample_size
    if start_queue_item.media_type != MediaType.TRACK:
        # no crossfade on non-tracks
        smart_fades_mode = SmartFadesMode.DISABLED
        standard_crossfade_duration = 0
    else:
        smart_fades_mode = await self.mass.config.get_player_config_value(
            queue.queue_id, CONF_SMART_FADES_MODE, return_type=SmartFadesMode
        )
        standard_crossfade_duration = self.mass.config.get_raw_player_config_value(
            queue.queue_id, CONF_CROSSFADE_DURATION, 10
        )
    self.logger.info(
        "Start Queue Flow stream for Queue %s - crossfade: %s %s",
        queue.display_name,
        smart_fades_mode,
        f"({standard_crossfade_duration}s)"
        if smart_fades_mode == SmartFadesMode.STANDARD_CROSSFADE
        else "",
    )
    # counted over the whole flow stream (not reset per track)
    total_chunks_received = 0

    while True:
        # get (next) queue item to stream
        if queue_track is None:
            queue_track = start_queue_item
        else:
            try:
                queue_track = await self.mass.player_queues.load_next_queue_item(
                    queue.queue_id, queue_track.queue_item_id
                )
            except QueueEmpty:
                break

        if queue_track.streamdetails is None:
            # exceptions do not %-format their arguments like the logger does,
            # so the message is formatted here
            raise InvalidDataError(
                f"No Streamdetails known for queue item {queue_track.queue_item_id}"
            )

        self.logger.debug(
            "Start Streaming queue track: %s (%s) for queue %s",
            queue_track.streamdetails.uri,
            queue_track.name,
            queue.display_name,
        )
        # append to play log so the queue controller can work out which track is playing
        play_log_entry = PlayLogEntry(queue_track.queue_item_id)
        queue.flow_mode_stream_log.append(play_log_entry)
        # calculate crossfade buffer size
        crossfade_buffer_duration = (
            SMART_CROSSFADE_DURATION
            if smart_fades_mode == SmartFadesMode.SMART_CROSSFADE
            else standard_crossfade_duration
        )
        # never hold back more than half of the (known) track duration
        crossfade_buffer_duration = min(
            crossfade_buffer_duration,
            int(queue_track.streamdetails.duration / 2)
            if queue_track.streamdetails.duration
            else crossfade_buffer_duration,
        )
        # Ensure crossfade buffer size is aligned to frame boundaries
        # Frame size = bytes_per_sample * channels
        bytes_per_sample = pcm_format.bit_depth // 8
        frame_size = bytes_per_sample * pcm_format.channels
        crossfade_buffer_size = int(pcm_sample_size * crossfade_buffer_duration)
        # Round down to nearest frame boundary
        crossfade_buffer_size = (crossfade_buffer_size // frame_size) * frame_size
        # NOTE(review): a crossfade_buffer_size of 0 makes the negative slices
        # below select the whole buffer - confirm _crossfade_allowed rules that out

        bytes_written = 0
        buffer = b""
        # handle incoming audio chunks
        first_chunk_received = False
        # buffer size needs to be big enough to include the crossfade part

        async for chunk in self.get_queue_item_stream(
            queue_track,
            pcm_format=pcm_format,
            seek_position=queue_track.streamdetails.seek_position,
            raise_on_error=False,
        ):
            total_chunks_received += 1
            if not first_chunk_received:
                first_chunk_received = True
                # inform the queue that the track is now loaded in the buffer
                # so the next track can be preloaded
                self.mass.player_queues.track_loaded_in_buffer(
                    queue.queue_id, queue_track.queue_item_id
                )
            if smart_fades_mode == SmartFadesMode.DISABLED or total_chunks_received < 10:
                # without crossfade one second of buffer is always enough;
                # with crossfade we still want the stream to start as quickly
                # as possible so for the first 10 chunks we keep a very short buffer
                req_buffer_size = pcm_sample_size
            else:
                req_buffer_size = crossfade_buffer_size

            # ALWAYS APPEND CHUNK TO BUFFER
            buffer += chunk
            del chunk
            if len(buffer) < req_buffer_size:
                # buffer is not full enough, move on
                # yield control to event loop with 10ms delay
                await asyncio.sleep(0.01)
                continue

            #### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
            if last_fadeout_part and last_streamdetails:
                # perform crossfade
                fadein_part = buffer[:crossfade_buffer_size]
                remaining_bytes = buffer[crossfade_buffer_size:]
                # Use the mixer to handle all crossfade logic
                crossfade_part = await self._smart_fades_mixer.mix(
                    fade_in_part=fadein_part,
                    fade_out_part=last_fadeout_part,
                    fade_in_streamdetails=queue_track.streamdetails,
                    fade_out_streamdetails=last_streamdetails,
                    pcm_format=pcm_format,
                    standard_crossfade_duration=standard_crossfade_duration,
                    mode=smart_fades_mode,
                )
                # because the crossfade exists of both the fadein and fadeout part
                # we need to correct the bytes_written accordingly so the duration
                # calculations at the end of the track are correct
                crossfade_part_len = len(crossfade_part)
                bytes_written += int(crossfade_part_len / 2)
                if last_play_log_entry:
                    assert last_play_log_entry.seconds_streamed is not None
                    last_play_log_entry.seconds_streamed += (
                        crossfade_part_len / 2 / pcm_sample_size
                    )
                # yield crossfade_part (in pcm_sample_size chunks)
                for _chunk in divide_chunks(crossfade_part, pcm_sample_size):
                    yield _chunk
                    del _chunk
                del crossfade_part
                # also write the leftover bytes from the crossfade action
                if remaining_bytes:
                    yield remaining_bytes
                    bytes_written += len(remaining_bytes)
                del remaining_bytes
                # clear vars
                last_fadeout_part = b""
                last_streamdetails = None
                buffer = b""

            #### OTHER: enough data in buffer, feed to output
            while len(buffer) > req_buffer_size:
                yield buffer[:pcm_sample_size]
                bytes_written += pcm_sample_size
                buffer = buffer[pcm_sample_size:]

        #### HANDLE END OF TRACK
        if last_fadeout_part:
            # edge case: we did not get enough data to make the crossfade
            for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
                yield _chunk
                del _chunk
            bytes_written += len(last_fadeout_part)
            last_fadeout_part = b""
        if self._crossfade_allowed(
            queue_track, smart_fades_mode=smart_fades_mode, flow_mode=True
        ):
            # if crossfade is enabled, save fadeout part to pickup for next track
            last_fadeout_part = buffer[-crossfade_buffer_size:]
            last_streamdetails = queue_track.streamdetails
            last_play_log_entry = play_log_entry
            remaining_bytes = buffer[:-crossfade_buffer_size]
            if remaining_bytes:
                yield remaining_bytes
                bytes_written += len(remaining_bytes)
            del remaining_bytes
        elif buffer:
            # no crossfade enabled, just yield the buffer last part
            bytes_written += len(buffer)
            for _chunk in divide_chunks(buffer, pcm_sample_size):
                yield _chunk
                del _chunk
        # make sure the buffer gets cleaned up
        del buffer

        # update duration details based on the actual pcm data we sent
        # this also accounts for crossfade and silence stripping
        seconds_streamed = bytes_written / pcm_sample_size
        queue_track.streamdetails.seconds_streamed = seconds_streamed
        queue_track.streamdetails.duration = int(
            queue_track.streamdetails.seek_position + seconds_streamed
        )
        play_log_entry.seconds_streamed = seconds_streamed
        play_log_entry.duration = queue_track.streamdetails.duration
        self.logger.debug(
            "Finished Streaming queue track: %s (%s) on queue %s",
            queue_track.streamdetails.uri,
            queue_track.name,
            queue.display_name,
        )
    #### HANDLE END OF QUEUE FLOW STREAM
    # end of queue flow: make sure we yield the last_fadeout_part
    if last_fadeout_part:
        for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
            yield _chunk
            del _chunk
        # correct seconds streamed/duration of the last streamed track
        last_part_seconds = len(last_fadeout_part) / pcm_sample_size
        streamdetails = queue_track.streamdetails
        assert streamdetails is not None
        streamdetails.seconds_streamed = (
            streamdetails.seconds_streamed or 0
        ) + last_part_seconds
        streamdetails.duration = int((streamdetails.duration or 0) + last_part_seconds)
        last_fadeout_part = b""
    self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name)
1204
async def get_announcement_stream(
    self,
    announcement_url: str,
    output_format: AudioFormat,
    pre_announce: bool | str = False,
    pre_announce_url: str = ANNOUNCE_ALERT_FILE,
) -> AsyncGenerator[bytes, None]:
    """
    Get the special announcement stream.

    Yields the optional pre-announce sound followed by the announcement
    itself, encoded in output_format. The announcement is fetched in a
    background task while the pre-announce sound is already being streamed;
    silence is padded for as long as no announcement audio is available yet.
    """
    announcement_data: asyncio.Queue[bytes | None] = asyncio.Queue(10)
    # we are doing announcement in PCM first to avoid multiple encodings
    # when mixing pre-announce and announcement
    # also we have to deal with some TTS sources being super slow in delivering audio
    # so we take an approach where we start fetching the announcement in the background
    # while we can already start playing the pre-announce sound (if any)

    pcm_format = (
        output_format
        if output_format.content_type.is_pcm()
        else AudioFormat(
            sample_rate=output_format.sample_rate,
            content_type=ContentType.PCM_S16LE,
            bit_depth=16,
            channels=output_format.channels,
        )
    )
    # 100ms of silence, built from whole PCM frames so that the padding
    # can never shift the sample/channel alignment of the audio that follows
    frame_size = (pcm_format.bit_depth // 8) * pcm_format.channels
    silence_chunk = b"\0" * ((pcm_format.sample_rate // 10) * frame_size)

    async def fetch_announcement() -> None:
        """Fetch the announcement as PCM audio and feed it into the queue."""
        fmt = announcement_url.rsplit(".")[-1]
        try:
            async for chunk in get_ffmpeg_stream(
                audio_input=announcement_url,
                input_format=AudioFormat(content_type=ContentType.try_parse(fmt)),
                output_format=pcm_format,
                chunk_size=get_chunksize(pcm_format, 1),
            ):
                await announcement_data.put(chunk)
        except Exception:
            # signal end of stream before propagating the error, otherwise the
            # consumer keeps padding silence / waiting on the queue forever
            await announcement_data.put(None)
            raise
        await announcement_data.put(None)  # signal end of stream

    # NOTE(review): the fetch task is not cancelled when the consumer stops
    # early - confirm whether create_task returns a handle that allows this
    self.mass.create_task(fetch_announcement())

    async def _announcement_stream() -> AsyncGenerator[bytes, None]:
        """Generate the PCM audio stream for the announcement + optional pre-announce."""
        if pre_announce:
            async for chunk in get_ffmpeg_stream(
                audio_input=pre_announce_url,
                input_format=AudioFormat(content_type=ContentType.try_parse(pre_announce_url)),
                output_format=pcm_format,
                chunk_size=get_chunksize(pcm_format, 1),
            ):
                yield chunk
        # pad silence while we're waiting for the announcement to be ready
        while announcement_data.empty():
            yield silence_chunk
            await asyncio.sleep(0.1)
        # stream announcement
        while True:
            announcement_chunk = await announcement_data.get()
            if announcement_chunk is None:
                break
            yield announcement_chunk

    if output_format == pcm_format:
        # no need to re-encode, just yield the raw PCM stream
        async for chunk in _announcement_stream():
            yield chunk
        return

    # stream final announcement in requested output format
    async for chunk in get_ffmpeg_stream(
        audio_input=_announcement_stream(),
        input_format=pcm_format,
        output_format=output_format,
    ):
        yield chunk
1280
async def get_plugin_source_stream(
    self,
    plugin_source_id: str,
    output_format: AudioFormat,
    player_id: str,
    player_filter_params: list[str] | None = None,
) -> AsyncGenerator[bytes, None]:
    """
    Yield the audio of a plugin source, encoded in the given output format.

    The source is marked as in use by player_id while streaming. Streaming
    stops as soon as the source's in_use_by no longer matches player_id and
    the source is released again afterwards if this player still holds it.

    Raises ProviderUnavailableError when no provider is found for the id.
    """
    provider = cast("PluginProvider", self.mass.get_provider(plugin_source_id))
    if not provider:
        raise ProviderUnavailableError(f"Unknown PluginSource: {plugin_source_id}")

    source = provider.get_source()
    self.logger.debug(
        "Start streaming PluginSource %s to %s using output format %s",
        plugin_source_id,
        player_id,
        output_format,
    )
    # this should already be set by the player controller, but just to be sure
    source.in_use_by = player_id

    try:
        # custom sources deliver their audio through the provider,
        # all other stream types are read from the source's path
        if source.stream_type == StreamType.CUSTOM:
            raw_input = provider.get_audio_stream(player_id)
        else:
            raw_input = source.path
        ffmpeg_stream = get_ffmpeg_stream(
            audio_input=cast("str | AsyncGenerator[bytes, None]", raw_input),
            input_format=source.audio_format,
            output_format=output_format,
            filter_params=player_filter_params,
            extra_input_args=["-y", "-re"],
        )
        async for audio_chunk in ffmpeg_stream:
            still_ours = source.in_use_by == player_id
            if not still_ours:
                # another player took over or the stream ended, stop streaming
                break
            yield audio_chunk
    finally:
        self.logger.debug(
            "Finished streaming PluginSource %s to %s", plugin_source_id, player_id
        )
        await asyncio.sleep(1)  # prevent race conditions when selecting source
        if source.in_use_by == player_id:
            # release control
            source.in_use_by = None
1328
async def get_queue_item_stream(
    self,
    queue_item: QueueItem,
    pcm_format: AudioFormat,
    seek_position: int = 0,
    raise_on_error: bool = True,
) -> AsyncGenerator[bytes, None]:
    """
    Get the (PCM) audio stream for a single queue item.

    Builds the ffmpeg volume filter from the item's volume normalization
    mode, streams the media (through the buffered media stream when the
    buffer is allowed by config and the duration is known) and applies an
    optional fade-in to the start of the stream.

    On AudioError the streamdetails are flagged with stream_error and the
    queue item is marked unavailable; the error is re-raised when
    raise_on_error is set and otherwise logged and swallowed.
    When the generator ends, seconds_streamed is stored on the streamdetails
    and the stream is reported to the provider if it finished or at least
    90 seconds were received.
    """
    # collect all arguments for ffmpeg
    streamdetails = queue_item.streamdetails
    assert streamdetails
    filter_params: list[str] = []

    # handle volume normalization
    gain_correct: float | None = None
    if streamdetails.volume_normalization_mode == VolumeNormalizationMode.DYNAMIC:
        # volume normalization using loudnorm filter (in dynamic mode)
        # which also collects the measurement on the fly during playback
        # more info: https://k.ylo.ph/2016/04/04/loudnorm.html
        filter_rule = f"loudnorm=I={streamdetails.target_loudness}:TP=-2.0:LRA=10.0:offset=0.0"
        filter_rule += ":print_format=json"
        filter_params.append(filter_rule)
    elif streamdetails.volume_normalization_mode == VolumeNormalizationMode.FIXED_GAIN:
        # apply user defined fixed volume/gain correction
        config_key = (
            CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS
            if streamdetails.media_type == MediaType.TRACK
            else CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO
        )
        gain_value = await self.mass.config.get_core_config_value(
            self.domain, config_key, default=0.0, return_type=float
        )
        gain_correct = round(gain_value, 2)
        filter_params.append(f"volume={gain_correct}dB")
    elif streamdetails.volume_normalization_mode == VolumeNormalizationMode.MEASUREMENT_ONLY:
        # volume normalization with known loudness measurement
        # apply volume/gain correction
        target_loudness = (
            float(streamdetails.target_loudness)
            if streamdetails.target_loudness is not None
            else 0.0
        )
        if streamdetails.prefer_album_loudness and streamdetails.loudness_album is not None:
            gain_correct = target_loudness - float(streamdetails.loudness_album)
        elif streamdetails.loudness is not None:
            gain_correct = target_loudness - float(streamdetails.loudness)
        else:
            gain_correct = 0.0
        gain_correct = round(gain_correct, 2)
        filter_params.append(f"volume={gain_correct}dB")
    streamdetails.volume_normalization_gain_correct = gain_correct

    # the buffer is only used when enabled in config AND the duration is known
    allow_buffer = bool(
        self.mass.config.get_raw_core_config_value(
            self.domain, CONF_ALLOW_BUFFER, CONF_ALLOW_BUFFER_DEFAULT
        )
        and streamdetails.duration
    )

    self.logger.debug(
        "Starting queue item stream for %s (%s)"
        " - using buffer: %s"
        " - using fade-in: %s"
        " - using volume normalization: %s",
        queue_item.name,
        streamdetails.uri,
        allow_buffer,
        streamdetails.fade_in,
        streamdetails.volume_normalization_mode,
    )
    # both stream helpers are called with the exact same arguments
    stream_factory = get_buffered_media_stream if allow_buffer else get_media_stream
    media_stream_gen = stream_factory(
        self.mass,
        streamdetails=streamdetails,
        pcm_format=pcm_format,
        seek_position=int(seek_position),
        filter_params=filter_params,
    )

    async def _with_fade_in(pcm_data: bytes) -> AsyncGenerator[bytes, None]:
        """Run the given PCM data through ffmpeg's afade filter (3 second fade-in)."""
        async for fade_chunk in get_ffmpeg_stream(
            # NOTE: get_ffmpeg_stream signature says str | AsyncGenerator
            # but FFMpeg class actually accepts bytes too. This works at
            # runtime but needs type: ignore for mypy.
            audio_input=pcm_data,  # type: ignore[arg-type]
            input_format=pcm_format,
            output_format=pcm_format,
            filter_params=["afade=type=in:start_time=0:duration=3"],
        ):
            yield fade_chunk

    # amount of audio (4 seconds) to collect before the fade-in is applied
    fade_in_window = pcm_format.pcm_sample_size * 4
    first_chunk_received = False
    fade_in_buffer = b""
    bytes_received = 0
    finished = False
    # an async generator body always runs inside a running event loop
    loop = asyncio.get_running_loop()
    stream_started_at = loop.time()
    try:
        async for chunk in media_stream_gen:
            bytes_received += len(chunk)
            if not first_chunk_received:
                first_chunk_received = True
                self.logger.debug(
                    "First audio chunk received for %s (%s) after %.2f seconds",
                    queue_item.name,
                    streamdetails.uri,
                    loop.time() - stream_started_at,
                )
            # handle optional fade-in
            if not streamdetails.fade_in:
                yield chunk
            elif len(fade_in_buffer) < fade_in_window:
                # keep collecting audio until the fade-in window is filled
                fade_in_buffer += chunk
            elif fade_in_buffer:
                async for fade_chunk in _with_fade_in(fade_in_buffer + chunk):
                    yield fade_chunk
                fade_in_buffer = b""
                streamdetails.fade_in = False
            # help garbage collection by explicitly deleting chunk
            del chunk
        if streamdetails.fade_in and fade_in_buffer:
            # the stream ended before the fade-in window was filled:
            # flush what was collected instead of silently dropping it
            async for fade_chunk in _with_fade_in(fade_in_buffer):
                yield fade_chunk
            fade_in_buffer = b""
            streamdetails.fade_in = False
        finished = True
    except AudioError as err:
        streamdetails.stream_error = True
        queue_item.available = False
        if raise_on_error:
            raise
        # yes, we swallow the error here after logging it
        # so the outer stream can handle it gracefully
        self.logger.error(
            "AudioError while streaming queue item %s (%s): %s",
            queue_item.name,
            streamdetails.uri,
            err,
        )
    finally:
        # determine how many seconds we've streamed
        # for pcm output we can calculate this easily
        seconds_streamed = bytes_received / pcm_format.pcm_sample_size
        streamdetails.seconds_streamed = seconds_streamed
        self.logger.debug(
            "stream %s for %s in %.2f seconds - seconds streamed/buffered: %.2f",
            "aborted" if not finished else "finished",
            streamdetails.uri,
            loop.time() - stream_started_at,
            seconds_streamed,
        )
        # report stream to provider
        if (finished or seconds_streamed >= 90) and (
            music_prov := self.mass.get_provider(streamdetails.provider)
        ):
            if TYPE_CHECKING:  # avoid circular import
                assert isinstance(music_prov, MusicProvider)
            self.mass.create_task(music_prov.on_streamed(streamdetails))
1486
1487 @use_buffer(buffer_size=30, min_buffer_before_yield=2)
1488 async def get_queue_item_stream_with_smartfade(
1489 self,
1490 queue_item: QueueItem,
1491 pcm_format: AudioFormat,
1492 smart_fades_mode: SmartFadesMode = SmartFadesMode.SMART_CROSSFADE,
1493 standard_crossfade_duration: int = 10,
1494 ) -> AsyncGenerator[bytes, None]:
1495 """Get the audio stream for a single queue item with (smart) crossfade to the next item."""
1496 queue = self.mass.player_queues.get(queue_item.queue_id)
1497 if not queue:
1498 raise RuntimeError(f"Queue {queue_item.queue_id} not found")
1499
1500 streamdetails = queue_item.streamdetails
1501 assert streamdetails
1502 crossfade_data = self._crossfade_data.pop(queue.queue_id, None)
1503
1504 if crossfade_data and streamdetails.seek_position > 0:
1505 # don't do crossfade when seeking into track
1506 crossfade_data = None
1507 if crossfade_data and (crossfade_data.queue_item_id != queue_item.queue_item_id):
1508 # edge case alert: the next item changed just while we were preloading/crossfading
1509 self.logger.warning(
1510 "Skipping crossfade data for queue %s - next item changed!", queue.display_name
1511 )
1512 crossfade_data = None
1513
1514 self.logger.debug(
1515 "Start Streaming queue track: %s (%s) for queue %s "
1516 "- crossfade mode: %s "
1517 "- crossfading from previous track: %s ",
1518 queue_item.streamdetails.uri if queue_item.streamdetails else "Unknown URI",
1519 queue_item.name,
1520 queue.display_name,
1521 smart_fades_mode,
1522 "true" if crossfade_data else "false",
1523 )
1524
1525 buffer = b""
1526 bytes_written = 0
1527 # calculate crossfade buffer size
1528 crossfade_buffer_duration = (
1529 SMART_CROSSFADE_DURATION
1530 if smart_fades_mode == SmartFadesMode.SMART_CROSSFADE
1531 else standard_crossfade_duration
1532 )
1533 crossfade_buffer_duration = min(
1534 crossfade_buffer_duration,
1535 int(streamdetails.duration / 2)
1536 if streamdetails.duration
1537 else crossfade_buffer_duration,
1538 )
1539 # Ensure crossfade buffer size is aligned to frame boundaries
1540 # Frame size = bytes_per_sample * channels
1541 bytes_per_sample = pcm_format.bit_depth // 8
1542 frame_size = bytes_per_sample * pcm_format.channels
1543 crossfade_buffer_size = int(pcm_format.pcm_sample_size * crossfade_buffer_duration)
1544 # Round down to nearest frame boundary
1545 crossfade_buffer_size = (crossfade_buffer_size // frame_size) * frame_size
1546 fade_out_data: bytes | None = None
1547
1548 if crossfade_data:
1549 # Calculate discard amount in seconds (format-independent)
1550 # Use fade_in_pcm_format because fade_in_size is in the next track's original format
1551 fade_in_duration_seconds = (
1552 crossfade_data.fade_in_size / crossfade_data.fade_in_pcm_format.pcm_sample_size
1553 )
1554 discard_seconds = int(fade_in_duration_seconds) - 1
1555 # Calculate discard amounts in CURRENT track's format
1556 discard_bytes = int(discard_seconds * pcm_format.pcm_sample_size)
1557 # Convert fade_in_size to current track's format for correct leftover calculation
1558 fade_in_size_in_current_format = int(
1559 fade_in_duration_seconds * pcm_format.pcm_sample_size
1560 )
1561 discard_leftover = fade_in_size_in_current_format - discard_bytes
1562 else:
1563 discard_seconds = streamdetails.seek_position
1564 discard_leftover = 0
1565 total_chunks_received = 0
1566 req_buffer_size = crossfade_buffer_size
1567 async for chunk in self.get_queue_item_stream(
1568 queue_item, pcm_format, seek_position=discard_seconds
1569 ):
1570 total_chunks_received += 1
1571 if discard_leftover:
1572 # discard leftover bytes from crossfade data
1573 chunk = chunk[discard_leftover:] # noqa: PLW2901
1574 discard_leftover = 0
1575
1576 if total_chunks_received < 10:
1577 # we want a stream to start as quickly as possible
1578 # so for the first 10 chunks we keep a very short buffer
1579 req_buffer_size = pcm_format.pcm_sample_size
1580 else:
1581 req_buffer_size = crossfade_buffer_size
1582
1583 # ALWAYS APPEND CHUNK TO BUFFER
1584 buffer += chunk
1585 del chunk
1586 if len(buffer) < req_buffer_size:
1587 # buffer is not full enough, move on
1588 continue
1589
1590 #### HANDLE CROSSFADE DATA FROM PREVIOUS TRACK
1591 if crossfade_data:
1592 # send the (second half of the) crossfade data
1593 if crossfade_data.pcm_format != pcm_format:
1594 # edge case: pcm format mismatch, we need to resample
1595 self.logger.debug(
1596 "Resampling crossfade data from %s to %s for queue %s",
1597 crossfade_data.pcm_format.sample_rate,
1598 pcm_format.sample_rate,
1599 queue.display_name,
1600 )
1601 resampled_data = await resample_pcm_audio(
1602 crossfade_data.data,
1603 crossfade_data.pcm_format,
1604 pcm_format,
1605 )
1606 if resampled_data:
1607 for _chunk in divide_chunks(resampled_data, pcm_format.pcm_sample_size):
1608 yield _chunk
1609 bytes_written += len(resampled_data)
1610 else:
1611 # Resampling failed, error already logged in resample_pcm_audio
1612 # Skip crossfade data entirely - stream continues without it
1613 self.logger.warning(
1614 "Skipping crossfade data for queue %s due to resampling failure",
1615 queue.display_name,
1616 )
1617 else:
1618 for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size):
1619 yield _chunk
1620 bytes_written += len(crossfade_data.data)
1621 # clear vars
1622 crossfade_data = None
1623
1624 #### OTHER: enough data in buffer, feed to output
1625 while len(buffer) > req_buffer_size:
1626 yield buffer[: pcm_format.pcm_sample_size]
1627 bytes_written += pcm_format.pcm_sample_size
1628 buffer = buffer[pcm_format.pcm_sample_size :]
1629
1630 #### HANDLE END OF TRACK
1631
1632 if crossfade_data:
1633 # edge case: we did not get enough data to send the crossfade data
1634 # send the (second half of the) crossfade data
1635 if crossfade_data.pcm_format != pcm_format:
1636 # (yet another) edge case: pcm format mismatch, we need to resample
1637 self.logger.debug(
1638 "Resampling remaining crossfade data from %s to %s for queue %s",
1639 crossfade_data.pcm_format.sample_rate,
1640 pcm_format.sample_rate,
1641 queue.display_name,
1642 )
1643 resampled_crossfade_data = await resample_pcm_audio(
1644 crossfade_data.data,
1645 crossfade_data.pcm_format,
1646 pcm_format,
1647 )
1648 if resampled_crossfade_data:
1649 crossfade_data.data = resampled_crossfade_data
1650 else:
1651 # Resampling failed, error already logged in resample_pcm_audio
1652 # Skip the crossfade data entirely
1653 self.logger.warning(
1654 "Skipping remaining crossfade data for queue %s due to resampling failure",
1655 queue.display_name,
1656 )
1657 crossfade_data = None
1658 if crossfade_data:
1659 for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size):
1660 yield _chunk
1661 bytes_written += len(crossfade_data.data)
1662 crossfade_data = None
1663
1664 # get next track for crossfade
1665 next_queue_item: QueueItem | None
1666 try:
1667 self.logger.debug(
1668 "Preloading NEXT track for crossfade for queue %s",
1669 queue.display_name,
1670 )
1671 next_queue_item = await self.mass.player_queues.load_next_queue_item(
1672 queue.queue_id, queue_item.queue_item_id
1673 )
1674 # set index_in_buffer to prevent our next track is overwritten while preloading
1675 if next_queue_item.streamdetails is None:
1676 raise InvalidDataError(
1677 f"No streamdetails for next queue item {next_queue_item.queue_item_id}"
1678 )
1679 queue.index_in_buffer = self.mass.player_queues.index_by_id(
1680 queue.queue_id, next_queue_item.queue_item_id
1681 )
1682 queue_player = self.mass.players.get_player(queue.queue_id)
1683 assert queue_player is not None
1684 next_queue_item_pcm_format = await self._select_pcm_format(
1685 player=queue_player,
1686 streamdetails=next_queue_item.streamdetails,
1687 smartfades_enabled=True,
1688 )
1689 except QueueEmpty:
1690 # end of queue reached, no next item
1691 next_queue_item = None
1692
1693 if not next_queue_item or not self._crossfade_allowed(
1694 queue_item,
1695 smart_fades_mode=smart_fades_mode,
1696 flow_mode=False,
1697 next_queue_item=next_queue_item,
1698 sample_rate=pcm_format.sample_rate,
1699 next_sample_rate=next_queue_item_pcm_format.sample_rate,
1700 ):
1701 # no crossfade enabled/allowed, just yield the buffer last part
1702 bytes_written += len(buffer)
1703 for _chunk in divide_chunks(buffer, pcm_format.pcm_sample_size):
1704 yield _chunk
1705 else:
1706 # if crossfade is enabled, save fadeout part in buffer to pickup for next track
1707 fade_out_data = buffer
1708 buffer = b""
1709 try:
1710 async for chunk in self.get_queue_item_stream(
1711 next_queue_item, next_queue_item_pcm_format
1712 ):
1713 # append to buffer until we reach crossfade size
1714 # we only need the first X seconds of the NEXT track so we can
1715 # perform the crossfade.
1716 # the crossfaded audio of the previous and next track will be
1717 # sent in two equal parts: first half now, second half
1718 # when the next track starts. We use CrossfadeData to store
1719 # the second half to be picked up by the next track's stream generator.
1720 # Note that we more or less expect the user to have enabled the in-memory
1721 # buffer so we can keep the next track's audio data in memory.
1722 buffer += chunk
1723 del chunk
1724 if len(buffer) >= crossfade_buffer_size:
1725 break
1726 #### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
1727 # Store original buffer size before any resampling for fade_in_size calculation
1728 # This size is in the next track's original format which is what we need
1729 original_buffer_size = len(buffer)
1730 if next_queue_item_pcm_format != pcm_format:
1731 # edge case: pcm format mismatch, we need to resample the next track's
1732 # beginning part before crossfading
1733 self.logger.debug(
1734 "Resampling next track's crossfade from %s to %s for queue %s",
1735 next_queue_item_pcm_format.sample_rate,
1736 pcm_format.sample_rate,
1737 queue.display_name,
1738 )
1739 buffer = await resample_pcm_audio(
1740 buffer,
1741 next_queue_item_pcm_format,
1742 pcm_format,
1743 )
1744 # perform actual (smart fades) crossfade using mixer
1745 crossfade_bytes = await self._smart_fades_mixer.mix(
1746 fade_in_part=buffer,
1747 fade_out_part=fade_out_data,
1748 fade_in_streamdetails=cast("StreamDetails", next_queue_item.streamdetails),
1749 fade_out_streamdetails=streamdetails,
1750 pcm_format=pcm_format,
1751 standard_crossfade_duration=standard_crossfade_duration,
1752 mode=smart_fades_mode,
1753 )
1754 # send half of the crossfade_part (= approx the fadeout part)
1755 split_point = (len(crossfade_bytes) + 1) // 2
1756 crossfade_first = crossfade_bytes[:split_point]
1757 crossfade_second = crossfade_bytes[split_point:]
1758 del crossfade_bytes
1759 bytes_written += len(crossfade_first)
1760 for _chunk in divide_chunks(crossfade_first, pcm_format.pcm_sample_size):
1761 yield _chunk
1762 # store the other half for the next track
1763 # IMPORTANT: crossfade_second data is in CURRENT track's format (pcm_format)
1764 # because it was created from the resampled buffer used for mixing.
1765 # BUT fade_in_size represents bytes in NEXT track's original format
1766 # (next_queue_item_pcm_format) because that's how much of the next track
1767 # was consumed during the crossfade. We need both formats to correctly
1768 # handle the crossfade data when the next track starts.
1769 self._crossfade_data[queue_item.queue_id] = CrossfadeData(
1770 data=crossfade_second,
1771 fade_in_size=original_buffer_size,
1772 pcm_format=pcm_format, # Format of the data (current track)
1773 fade_in_pcm_format=next_queue_item_pcm_format, # Format for fade_in_size
1774 queue_item_id=next_queue_item.queue_item_id,
1775 )
1776 except AudioError:
1777 # no crossfade possible, just yield the fade_out_data
1778 next_queue_item = None
1779 yield fade_out_data
1780 bytes_written += len(fade_out_data)
1781 del fade_out_data
1782 # make sure the buffer gets cleaned up
1783 del buffer
1784 # update duration details based on the actual pcm data we sent
1785 # this also accounts for crossfade and silence stripping
1786 seconds_streamed = bytes_written / pcm_format.pcm_sample_size
1787 streamdetails.seconds_streamed = seconds_streamed
1788 streamdetails.duration = int(streamdetails.seek_position + seconds_streamed)
1789 self.logger.debug(
1790 "Finished Streaming queue track: %s (%s) on queue %s "
1791 "- crossfade data prepared for next track: %s",
1792 streamdetails.uri,
1793 queue_item.name,
1794 queue.display_name,
1795 next_queue_item.name if next_queue_item else "N/A",
1796 )
1797
def _log_request(self, request: web.Request) -> None:
    """Write a log line describing an incoming HTTP request.

    The request headers are only included when the logger is enabled for
    the verbose log level; otherwise a shorter line is logged at debug level.
    """
    if not self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
        # compact variant: method, path and remote address only
        self.logger.debug(
            "Got %s request to %s from %s",
            request.method,
            request.path,
            request.remote,
        )
        return
    # verbose variant: additionally dump the request headers
    self.logger.log(
        VERBOSE_LOG_LEVEL,
        "Got %s request to %s from %s\nheaders: %s\n",
        request.method,
        request.path,
        request.remote,
        request.headers,
    )
1816
async def get_output_format(
    self,
    output_format_str: str,
    player: Player,
    content_sample_rate: int,
    content_bit_depth: int,
) -> AudioFormat:
    """Build the (player specific) output AudioFormat for a format string.

    :param output_format_str: Requested output format, parsed into a ContentType.
    :param player: Player whose sample rate/channel configuration is consulted.
    :param content_sample_rate: Sample rate of the content to be played.
    :param content_bit_depth: Bit depth of the content to be played.
    :return: AudioFormat limited to what is configured for the player.
    """
    content_type: ContentType = ContentType.try_parse(output_format_str)
    rate_depth_pairs = cast(
        "list[tuple[str, str]]",
        await self.mass.config.get_player_config_value(
            player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
        ),
    )
    channels_conf = self.mass.config.get_raw_player_config_value(
        player.player_id, CONF_OUTPUT_CHANNELS, "stereo"
    )
    # each configured entry is a (sample rate, bit depth) pair
    allowed_rates = [int(pair[0]) for pair in rate_depth_pairs]
    allowed_depths = [int(pair[1]) for pair in rate_depth_pairs]

    # never exceed the highest bit depth configured for the player
    output_bit_depth = min(content_bit_depth, max(allowed_depths))
    # keep the content's rate when it is configured, else take the highest configured rate
    output_sample_rate = (
        content_sample_rate if content_sample_rate in allowed_rates else max(allowed_rates)
    )

    if not content_type.is_lossless():
        # lossy output gains nothing from a higher bit depth or sample rate
        output_bit_depth = 16
        output_sample_rate = min(48000, output_sample_rate)
    if output_format_str == "pcm":
        # plain "pcm" is resolved to the content type matching the bit depth
        content_type = ContentType.from_bit_depth(output_bit_depth)

    # anything other than "stereo" results in a single channel
    channel_count = 2 if channels_conf == "stereo" else 1
    return AudioFormat(
        content_type=content_type,
        sample_rate=output_sample_rate,
        bit_depth=output_bit_depth,
        channels=channel_count,
    )
1857
async def _select_flow_format(
    self,
    player: Player,
) -> AudioFormat:
    """Determine the (player specific) PCM format used for a flow stream.

    The first of 192000, 96000, 48000 or 44100 Hz that is configured for the
    player is used as sample rate; if none of them is configured, the sample
    rate of INTERNAL_PCM_FORMAT is used. The result is always 2 channels with
    the content type and bit depth of INTERNAL_PCM_FORMAT.
    """
    rates_conf = cast(
        "list[tuple[str, str]]",
        await self.mass.config.get_player_config_value(
            player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
        ),
    )
    allowed_rates = {int(entry[0]) for entry in rates_conf}
    # candidates in order of preference (highest first)
    preferred_rates = (192000, 96000, 48000, 44100)
    flow_sample_rate = next(
        (rate for rate in preferred_rates if rate in allowed_rates),
        INTERNAL_PCM_FORMAT.sample_rate,
    )
    return AudioFormat(
        content_type=INTERNAL_PCM_FORMAT.content_type,
        sample_rate=flow_sample_rate,
        bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
        channels=2,
    )
1881
async def _select_pcm_format(
    self,
    player: Player,
    streamdetails: StreamDetails,
    smartfades_enabled: bool,
) -> AudioFormat:
    """Determine the (player specific) internal PCM format for a stream.

    :param player: Player whose configured sample rates are consulted.
    :param streamdetails: Stream details providing the content's audio format.
    :param smartfades_enabled: When True, the channel count is forced to 2.
    :return: AudioFormat using the content type and bit depth of INTERNAL_PCM_FORMAT.
    """
    rates_conf = cast(
        "list[tuple[str, str]]",
        await self.mass.config.get_player_config_value(
            player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
        ),
    )
    allowed_rates = tuple(int(entry[0]) for entry in rates_conf)
    source_format = streamdetails.audio_format
    # pick the highest configured rate that does not exceed the content's rate,
    # falling back to 48000 when no configured rate qualifies
    candidate_rates = (rate for rate in allowed_rates if rate <= source_format.sample_rate)
    internal_sample_rate = max(candidate_rates, default=48000)
    # the internal format's content type/bit depth is always used (headroom for filters etc)
    pcm_format = AudioFormat(
        sample_rate=internal_sample_rate,
        content_type=INTERNAL_PCM_FORMAT.content_type,
        bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
        channels=source_format.channels,
    )
    if smartfades_enabled:
        # crossfading requires stereo; the channel count is overridden after construction
        pcm_format.channels = 2

    return pcm_format
1913
def _crossfade_allowed(
    self,
    queue_item: QueueItem,
    smart_fades_mode: SmartFadesMode,
    flow_mode: bool = False,
    next_queue_item: QueueItem | None = None,
    sample_rate: int | None = None,
    next_sample_rate: int | None = None,
) -> bool:
    """Return whether a crossfade from the given queue item into the next is allowed.

    :param queue_item: The queue item that is currently being streamed.
    :param smart_fades_mode: Active smart fades mode; DISABLED never crossfades.
    :param flow_mode: When True, the sample rate comparison is skipped.
    :param next_queue_item: The upcoming item; looked up from the queue when omitted.
    :param sample_rate: Sample rate of the current item (optional).
    :param next_sample_rate: Sample rate of the upcoming item (optional).
    :return: True when crossfading is allowed, False otherwise.
    """
    if smart_fades_mode == SmartFadesMode.DISABLED:
        return False
    if not (self.mass.players.get_player(queue_item.queue_id)):
        return False  # just a guard
    if queue_item.media_type != MediaType.TRACK:
        self.logger.debug("Skipping crossfade: current item is not a track")
        return False

    # resolve the upcoming item, preferring the one handed in by the caller
    upcoming = next_queue_item
    if not upcoming:
        upcoming = self.mass.player_queues.get_next_item(
            queue_item.queue_id, queue_item.queue_item_id
        )
    if not upcoming:
        # there is no next item!
        return False
    if upcoming.media_type != MediaType.TRACK:
        self.logger.debug("Skipping crossfade: next item is not a track")
        return False

    # in general, crossfade is not desired for tracks of the same (gapless) album
    # because we have no accurate way to determine if the album is gapless or not,
    # we never crossfade between tracks of the same album unless explicitly allowed
    current_media = queue_item.media_item
    upcoming_media = upcoming.media_item
    if (
        isinstance(current_media, Track)
        and isinstance(upcoming_media, Track)
        and current_media.album
        and upcoming_media.album
        and current_media.album == upcoming_media.album
        and not self.mass.config.get_raw_core_config_value(
            self.domain, CONF_ALLOW_CROSSFADE_SAME_ALBUM, False
        )
    ):
        self.logger.debug("Skipping crossfade: next item is part of the same album")
        return False

    # the sample rate check only applies outside flow mode, when both rates
    # are known and they actually differ
    if flow_mode or not sample_rate or not next_sample_rate or sample_rate == next_sample_rate:
        return True
    if self.mass.config.get_raw_player_config_value(
        queue_item.queue_id,
        CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES.key,
        CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES.default_value,
    ):
        # player is configured to handle differing sample rates
        return True
    self.logger.debug(
        "Skipping crossfade: player does not support gapless playback "
        "with different sample rates (%s vs %s)",
        sample_rate,
        next_sample_rate,
    )
    return False
1979
async def _periodic_garbage_collection(self) -> None:
    """Run a garbage collection pass and reschedule itself.

    Intended to free up memory from audio buffers and streams. After each
    run, the next run is scheduled 900 seconds (15 minutes) later.
    """
    self.logger.log(VERBOSE_LOG_LEVEL, "Running periodic garbage collection...")
    # gc.collect is handed to the default executor so the event loop is not
    # blocked while it runs. Since this runs periodically (not in response to
    # subprocess cleanup), running it in a thread is considered safe.
    collected = await asyncio.get_running_loop().run_in_executor(None, gc.collect)
    self.logger.log(
        VERBOSE_LOG_LEVEL,
        "Garbage collection completed, collected %d objects",
        collected,
    )
    # schedule the next run in 15 minutes
    self.mass.call_later(900, self._periodic_garbage_collection)
1998
def _setup_smart_fades_logger(self, config: CoreConfig) -> None:
    """Apply the configured log level to the smart fades analyzer and mixer loggers.

    The special value "GLOBAL" means: use the level of this controller's own logger.
    """
    configured_level = str(config.get_value(CONF_SMART_FADES_LOG_LEVEL))
    # resolve the level once, then apply it to both smart fades components
    level = self.logger.level if configured_level == "GLOBAL" else configured_level
    self.smart_fades_analyzer.logger.setLevel(level)
    self.smart_fades_mixer.logger.setLevel(level)
2008