/
/
/
1"""
2Controller to stream audio to players.
3
4The streams controller hosts a basic, unprotected HTTP-only webserver
5purely to stream audio packets to players and some control endpoints such as
6the upnp callbacks and json rpc api for slimproto clients.
7"""
8
9from __future__ import annotations
10
11import asyncio
12import gc
13import logging
14import os
15import urllib.parse
16from collections.abc import AsyncGenerator
17from dataclasses import dataclass
18from typing import TYPE_CHECKING, Final, cast
19
20from aiofiles.os import wrap
21from aiohttp import web
22from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
23from music_assistant_models.enums import (
24 ConfigEntryType,
25 ContentType,
26 MediaType,
27 PlayerFeature,
28 StreamType,
29 VolumeNormalizationMode,
30)
31from music_assistant_models.errors import (
32 AudioError,
33 InvalidDataError,
34 ProviderUnavailableError,
35 QueueEmpty,
36)
37from music_assistant_models.media_items import AudioFormat, Track
38from music_assistant_models.player_queue import PlayLogEntry
39
40from music_assistant.constants import (
41 ANNOUNCE_ALERT_FILE,
42 CONF_BIND_IP,
43 CONF_BIND_PORT,
44 CONF_CROSSFADE_DURATION,
45 CONF_ENTRY_ENABLE_ICY_METADATA,
46 CONF_ENTRY_LOG_LEVEL,
47 CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES,
48 CONF_HTTP_PROFILE,
49 CONF_OUTPUT_CHANNELS,
50 CONF_OUTPUT_CODEC,
51 CONF_PUBLISH_IP,
52 CONF_SAMPLE_RATES,
53 CONF_SMART_FADES_MODE,
54 CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO,
55 CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
56 CONF_VOLUME_NORMALIZATION_RADIO,
57 CONF_VOLUME_NORMALIZATION_TRACKS,
58 DEFAULT_STREAM_HEADERS,
59 ICY_HEADERS,
60 INTERNAL_PCM_FORMAT,
61 SILENCE_FILE,
62 VERBOSE_LOG_LEVEL,
63)
64from music_assistant.controllers.players.player_controller import AnnounceData
65from music_assistant.controllers.streams.smart_fades import SmartFadesMixer
66from music_assistant.controllers.streams.smart_fades.analyzer import SmartFadesAnalyzer
67from music_assistant.controllers.streams.smart_fades.fades import SMART_CROSSFADE_DURATION
68from music_assistant.helpers.audio import LOGGER as AUDIO_LOGGER
69from music_assistant.helpers.audio import (
70 get_buffered_media_stream,
71 get_chunksize,
72 get_media_stream,
73 get_player_filter_params,
74 get_stream_details,
75 resample_pcm_audio,
76)
77from music_assistant.helpers.buffered_generator import buffered, use_buffer
78from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
79from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream
80from music_assistant.helpers.util import (
81 divide_chunks,
82 format_ip_for_url,
83 get_ip_addresses,
84 get_total_system_memory,
85 select_free_port,
86)
87from music_assistant.helpers.webserver import Webserver
88from music_assistant.models.core_controller import CoreController
89from music_assistant.models.music_provider import MusicProvider
90from music_assistant.models.plugin import PluginProvider, PluginSource
91from music_assistant.models.smart_fades import SmartFadesMode
92from music_assistant.providers.universal_group.constants import UGP_PREFIX
93from music_assistant.providers.universal_group.player import UniversalGroupPlayer
94
95if TYPE_CHECKING:
96 from music_assistant_models.config_entries import CoreConfig
97 from music_assistant_models.player import PlayerMedia
98 from music_assistant_models.player_queue import PlayerQueue
99 from music_assistant_models.queue_item import QueueItem
100 from music_assistant_models.streamdetails import StreamDetails
101
102 from music_assistant.mass import MusicAssistant
103 from music_assistant.models.player import Player
104
105
# async-friendly wrapper (aiofiles) around the blocking os.path.isfile
isfile = wrap(os.path.isfile)

# config keys that are specific to the streams controller
CONF_ALLOW_BUFFER: Final[str] = "allow_buffering"
CONF_ALLOW_CROSSFADE_SAME_ALBUM: Final[str] = "allow_crossfade_same_album"
CONF_SMART_FADES_LOG_LEVEL: Final[str] = "smart_fades_log_level"

# The total system memory is looked up only once, when this module is imported.
TOTAL_SYSTEM_MEMORY_GB: Final[float] = get_total_system_memory()
# In-memory buffering is enabled by default from 8 GB of system memory and up.
CONF_ALLOW_BUFFER_DEFAULT: Final[bool] = TOTAL_SYSTEM_MEMORY_GB >= 8.0
115
116
def parse_pcm_info(content_type: str) -> tuple[int, int, int]:
    """Parse PCM info from a codec/content_type string.

    The string is expected to look like ``pcm;rate=48000;bitrate=24;channels=2``.
    Parameter names are matched case-insensitively and whitespace around them is
    ignored, so a header-style value such as ``audio/L16; rate=48000`` is
    understood as well.

    :param content_type: codec/content type, optionally followed by
        ``;``-separated ``key=value`` parameters.
    :return: tuple of (sample_rate, sample_size, channels). A parameter that is
        not present falls back to 44100 / 16 / 2 respectively.
    :raises ValueError: if a parameter is present but not a valid integer.
    """
    params: dict[str, str] = {}
    if ";" in content_type:
        # reuse the query-string parser by turning the ';' separators into '&'
        for key, value in urllib.parse.parse_qsl(content_type.replace(";", "&")):
            # normalize the key: without this, ' rate' or 'Rate' would not match
            # the lookups below and the default would silently be used instead
            params[key.strip().lower()] = value
    sample_rate = int(params.get("rate", 44100))
    sample_size = int(params.get("bitrate", 16))
    channels = int(params.get("channels", 2))
    return (sample_rate, sample_size, channels)
126
127
@dataclass
class CrossfadeData:
    """Bundle of crossfade data; instances are kept in ``StreamsController._crossfade_data``."""

    # the crossfade audio itself
    data: bytes
    # size that belongs to the fade-in part, expressed in 'fade_in_pcm_format'
    fade_in_size: int
    # format of the 'data' bytes (current/previous track's format)
    pcm_format: AudioFormat
    # format for 'fade_in_size' (next track's format)
    fade_in_pcm_format: AudioFormat
    # id of the queue item this crossfade data is associated with
    queue_item_id: str
137
138
139class StreamsController(CoreController):
140 """Webserver Controller to stream audio to players."""
141
142 domain: str = "streams"
143
144 def __init__(self, mass: MusicAssistant) -> None:
145 """Initialize instance."""
146 super().__init__(mass)
147 self._server = Webserver(self.logger, enable_dynamic_routes=True)
148 self.register_dynamic_route = self._server.register_dynamic_route
149 self.unregister_dynamic_route = self._server.unregister_dynamic_route
150 self.manifest.name = "Streamserver"
151 self.manifest.description = (
152 "Music Assistant's core controller that is responsible for "
153 "streaming audio to players on the local network."
154 )
155 self.manifest.icon = "cast-audio"
156 self.announcements: dict[str, AnnounceData] = {}
157 self._crossfade_data: dict[str, CrossfadeData] = {}
158 self._bind_ip: str = "0.0.0.0"
159 self._smart_fades_mixer = SmartFadesMixer(self)
160 self._smart_fades_analyzer = SmartFadesAnalyzer(self)
161
162 @property
163 def base_url(self) -> str:
164 """Return the base_url for the streamserver."""
165 return self._server.base_url
166
167 @property
168 def bind_ip(self) -> str:
169 """Return the IP address this streamserver is bound to."""
170 return self._bind_ip
171
172 @property
173 def smart_fades_mixer(self) -> SmartFadesMixer:
174 """Return the SmartFadesMixer instance."""
175 return self._smart_fades_mixer
176
177 @property
178 def smart_fades_analyzer(self) -> SmartFadesAnalyzer:
179 """Return the SmartFadesAnalyzer instance."""
180 return self._smart_fades_analyzer
181
182 async def get_config_entries(
183 self,
184 action: str | None = None,
185 values: dict[str, ConfigValueType] | None = None,
186 ) -> tuple[ConfigEntry, ...]:
187 """Return all Config Entries for this core module (if any)."""
188 ip_addresses = await get_ip_addresses(include_ipv6=True)
189 default_port = await select_free_port(8097, 9200)
190 return (
191 ConfigEntry(
192 key=CONF_ALLOW_BUFFER,
193 type=ConfigEntryType.BOOLEAN,
194 default_value=CONF_ALLOW_BUFFER_DEFAULT,
195 label="Allow (in-memory) buffering of (track) audio",
196 description="By default, Music Assistant tries to be as resource "
197 "efficient as possible when streaming audio, especially considering "
198 "low-end devices such as Raspberry Pi's. This means that audio "
199 "buffering is disabled by default to reduce memory usage. \n\n"
200 "Enabling this option allows for in-memory buffering of audio, "
201 "which (massively) improves playback (and seeking) performance but it comes "
202 "at the cost of increased memory usage. "
203 "If you run Music Assistant on a capable device with enough memory, "
204 "enabling this option is strongly recommended.",
205 required=False,
206 category="playback",
207 ),
208 ConfigEntry(
209 key=CONF_VOLUME_NORMALIZATION_RADIO,
210 type=ConfigEntryType.STRING,
211 default_value=VolumeNormalizationMode.FALLBACK_FIXED_GAIN,
212 label="Volume normalization method for radio streams",
213 options=[
214 ConfigValueOption(x.value.replace("_", " ").title(), x.value)
215 for x in VolumeNormalizationMode
216 ],
217 category="playback",
218 ),
219 ConfigEntry(
220 key=CONF_VOLUME_NORMALIZATION_TRACKS,
221 type=ConfigEntryType.STRING,
222 default_value=VolumeNormalizationMode.FALLBACK_DYNAMIC,
223 label="Volume normalization method for tracks",
224 options=[
225 ConfigValueOption(x.value.replace("_", " ").title(), x.value)
226 for x in VolumeNormalizationMode
227 ],
228 category="playback",
229 ),
230 ConfigEntry(
231 key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO,
232 type=ConfigEntryType.FLOAT,
233 range=(-20, 10),
234 default_value=-6,
235 label="Fixed/fallback gain adjustment for radio streams",
236 category="playback",
237 ),
238 ConfigEntry(
239 key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
240 type=ConfigEntryType.FLOAT,
241 range=(-20, 10),
242 default_value=-6,
243 label="Fixed/fallback gain adjustment for tracks",
244 category="playback",
245 ),
246 ConfigEntry(
247 key=CONF_ALLOW_CROSSFADE_SAME_ALBUM,
248 type=ConfigEntryType.BOOLEAN,
249 default_value=False,
250 label="Allow crossfade between tracks from the same album",
251 description="Enabling this option allows for crossfading between tracks "
252 "that are part of the same album.",
253 category="playback",
254 ),
255 ConfigEntry(
256 key=CONF_PUBLISH_IP,
257 type=ConfigEntryType.STRING,
258 default_value=ip_addresses[0],
259 label="Published IP address",
260 description="This IP address is communicated to players where to find this server."
261 "\nMake sure that this IP can be reached by players on the local network, "
262 "otherwise audio streaming will not work.",
263 required=False,
264 category="generic",
265 advanced=True,
266 requires_reload=True,
267 ),
268 ConfigEntry(
269 key=CONF_BIND_PORT,
270 type=ConfigEntryType.INTEGER,
271 default_value=default_port,
272 label="TCP Port",
273 description="The TCP port to run the server. "
274 "Make sure that this server can be reached "
275 "on the given IP and TCP port by players on the local network.",
276 category="generic",
277 advanced=True,
278 requires_reload=True,
279 ),
280 ConfigEntry(
281 key=CONF_BIND_IP,
282 type=ConfigEntryType.STRING,
283 default_value="0.0.0.0",
284 options=[ConfigValueOption(x, x) for x in {"0.0.0.0", *ip_addresses}],
285 label="Bind to IP/interface",
286 description="Start the stream server on this specific interface. \n"
287 "Use 0.0.0.0 to bind to all interfaces, which is the default. \n"
288 "This is an advanced setting that should normally "
289 "not be adjusted in regular setups.",
290 category="generic",
291 advanced=True,
292 required=False,
293 requires_reload=True,
294 ),
295 ConfigEntry(
296 key=CONF_SMART_FADES_LOG_LEVEL,
297 type=ConfigEntryType.STRING,
298 label="Smart Fades Log level",
299 description="Log level for the Smart Fades mixer and analyzer.",
300 options=CONF_ENTRY_LOG_LEVEL.options,
301 default_value="GLOBAL",
302 category="generic",
303 advanced=True,
304 ),
305 )
306
307 async def setup(self, config: CoreConfig) -> None:
308 """Async initialize of module."""
309 # copy log level to audio/ffmpeg loggers
310 AUDIO_LOGGER.setLevel(self.logger.level)
311 FFMPEG_LOGGER.setLevel(self.logger.level)
312 self._setup_smart_fades_logger(config)
313 # perform check for ffmpeg version
314 await check_ffmpeg_version()
315 # start the webserver
316 self.publish_port = config.get_value(CONF_BIND_PORT)
317 self.publish_ip = config.get_value(CONF_PUBLISH_IP)
318 self._bind_ip = bind_ip = str(config.get_value(CONF_BIND_IP))
319 # print a big fat message in the log where the streamserver is running
320 # because this is a common source of issues for people with more complex setups
321 self.logger.log(
322 logging.INFO if self.mass.config.onboard_done else logging.WARNING,
323 "\n\n################################################################################\n"
324 "Starting streamserver on %s:%s\n"
325 "This is the IP address that is communicated to players.\n"
326 "If this is incorrect, audio will not play!\n"
327 "See the documentation how to configure the publish IP for the Streamserver\n"
328 "in Settings --> Core modules --> Streamserver\n"
329 "################################################################################\n",
330 self.publish_ip,
331 self.publish_port,
332 )
333 await self._server.setup(
334 bind_ip=bind_ip,
335 bind_port=cast("int", self.publish_port),
336 base_url=f"http://{format_ip_for_url(str(self.publish_ip))}:{self.publish_port}",
337 static_routes=[
338 (
339 "*",
340 "/flow/{session_id}/{queue_id}/{queue_item_id}.{fmt}",
341 self.serve_queue_flow_stream,
342 ),
343 (
344 "*",
345 "/single/{session_id}/{queue_id}/{queue_item_id}.{fmt}",
346 self.serve_queue_item_stream,
347 ),
348 (
349 "*",
350 "/command/{queue_id}/{command}.mp3",
351 self.serve_command_request,
352 ),
353 (
354 "*",
355 "/announcement/{player_id}.{fmt}",
356 self.serve_announcement_stream,
357 ),
358 (
359 "*",
360 "/pluginsource/{plugin_source}/{player_id}.{fmt}",
361 self.serve_plugin_source_stream,
362 ),
363 ],
364 )
365 # Start periodic garbage collection task
366 # This ensures memory from audio buffers and streams is cleaned up regularly
367 self.mass.call_later(900, self._periodic_garbage_collection) # 15 minutes
368
369 async def close(self) -> None:
370 """Cleanup on exit."""
371 await self._server.close()
372
373 async def resolve_stream_url(
374 self,
375 session_id: str,
376 queue_item: QueueItem,
377 flow_mode: bool = False,
378 player_id: str | None = None,
379 ) -> str:
380 """Resolve the stream URL for the given QueueItem."""
381 if not player_id:
382 player_id = queue_item.queue_id
383 conf_output_codec = await self.mass.config.get_player_config_value(
384 player_id, CONF_OUTPUT_CODEC, default="flac", return_type=str
385 )
386 output_codec = ContentType.try_parse(conf_output_codec or "flac")
387 fmt = output_codec.value
388 # handle raw pcm without exact format specifiers
389 if output_codec.is_pcm() and ";" not in fmt:
390 fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}"
391 base_path = "flow" if flow_mode else "single"
392 return f"{self._server.base_url}/{base_path}/{session_id}/{queue_item.queue_id}/{queue_item.queue_item_id}.{fmt}" # noqa: E501
393
394 async def get_plugin_source_url(
395 self,
396 plugin_source: PluginSource,
397 player_id: str,
398 ) -> str:
399 """Get the url for the Plugin Source stream/proxy."""
400 if plugin_source.audio_format.content_type.is_pcm():
401 fmt = ContentType.WAV.value
402 else:
403 fmt = plugin_source.audio_format.content_type.value
404 return f"{self._server.base_url}/pluginsource/{plugin_source.id}/{player_id}.{fmt}"
405
406 async def serve_queue_item_stream(self, request: web.Request) -> web.StreamResponse:
407 """Stream single queueitem audio to a player."""
408 self._log_request(request)
409 queue_id = request.match_info["queue_id"]
410 queue = self.mass.player_queues.get(queue_id)
411 if not queue:
412 raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
413 session_id = request.match_info["session_id"]
414 if queue.session_id and session_id != queue.session_id:
415 raise web.HTTPNotFound(reason=f"Unknown (or invalid) session: {session_id}")
416 queue_player = self.mass.players.get(queue_id)
417 queue_item_id = request.match_info["queue_item_id"]
418 queue_item = self.mass.player_queues.get_item(queue_id, queue_item_id)
419 if not queue_item:
420 raise web.HTTPNotFound(reason=f"Unknown Queue item: {queue_item_id}")
421 if not queue_item.streamdetails:
422 try:
423 queue_item.streamdetails = await get_stream_details(
424 mass=self.mass, queue_item=queue_item
425 )
426 except Exception as e:
427 self.logger.error(
428 "Failed to get streamdetails for QueueItem %s: %s", queue_item_id, e
429 )
430 queue_item.available = False
431 raise web.HTTPNotFound(reason=f"No streamdetails for Queue item: {queue_item_id}")
432
433 # pick output format based on the streamdetails and player capabilities
434 if not queue_player:
435 raise web.HTTPNotFound(reason=f"Unknown Player: {queue_id}")
436
437 # work out pcm format based on streamdetails
438 pcm_format = await self._select_pcm_format(
439 player=queue_player,
440 streamdetails=queue_item.streamdetails,
441 smartfades_enabled=True,
442 )
443 output_format = await self.get_output_format(
444 output_format_str=request.match_info["fmt"],
445 player=queue_player,
446 content_sample_rate=pcm_format.sample_rate,
447 content_bit_depth=pcm_format.bit_depth,
448 )
449
450 # prepare request, add some DLNA/UPNP compatible headers
451 headers = {
452 **DEFAULT_STREAM_HEADERS,
453 "icy-name": queue_item.name,
454 "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01500000000000000000000000000000", # noqa: E501
455 "Accept-Ranges": "none",
456 "Content-Type": f"audio/{output_format.output_format_str}",
457 }
458 resp = web.StreamResponse(
459 status=200,
460 reason="OK",
461 headers=headers,
462 )
463 resp.content_type = f"audio/{output_format.output_format_str}"
464 http_profile = await self.mass.config.get_player_config_value(
465 queue_id, CONF_HTTP_PROFILE, default="default", return_type=str
466 )
467 if http_profile == "forced_content_length" and not queue_item.duration:
468 # just set an insane high content length to make sure the player keeps playing
469 resp.content_length = get_chunksize(output_format, 12 * 3600)
470 elif http_profile == "forced_content_length" and queue_item.duration:
471 # guess content length based on duration
472 resp.content_length = get_chunksize(output_format, queue_item.duration)
473 elif http_profile == "chunked":
474 resp.enable_chunked_encoding()
475
476 await resp.prepare(request)
477
478 # return early if this is not a GET request
479 if request.method != "GET":
480 return resp
481
482 if queue_item.media_type != MediaType.TRACK:
483 # no crossfade on non-tracks
484 smart_fades_mode = SmartFadesMode.DISABLED
485 else:
486 smart_fades_mode = await self.mass.config.get_player_config_value(
487 queue.queue_id, CONF_SMART_FADES_MODE, return_type=SmartFadesMode
488 )
489 standard_crossfade_duration = self.mass.config.get_raw_player_config_value(
490 queue.queue_id, CONF_CROSSFADE_DURATION, 10
491 )
492 if (
493 smart_fades_mode != SmartFadesMode.DISABLED
494 and PlayerFeature.GAPLESS_PLAYBACK not in queue_player.supported_features
495 ):
496 # crossfade is not supported on this player due to missing gapless playback
497 self.logger.warning(
498 "Crossfade disabled: Player %s does not support gapless playback, "
499 "consider enabling flow mode to enable crossfade on this player.",
500 queue_player.display_name if queue_player else "Unknown Player",
501 )
502 smart_fades_mode = SmartFadesMode.DISABLED
503
504 if smart_fades_mode != SmartFadesMode.DISABLED:
505 # crossfade is enabled, use special crossfaded single item stream
506 # where the crossfade of the next track is present in the stream of
507 # a single track. This only works if the player supports gapless playback!
508 audio_input = self.get_queue_item_stream_with_smartfade(
509 queue_item=queue_item,
510 pcm_format=pcm_format,
511 smart_fades_mode=smart_fades_mode,
512 standard_crossfade_duration=standard_crossfade_duration,
513 )
514 else:
515 # no crossfade, just a regular single item stream
516 audio_input = self.get_queue_item_stream(
517 queue_item=queue_item,
518 pcm_format=pcm_format,
519 seek_position=queue_item.streamdetails.seek_position,
520 )
521 # stream the audio
522 # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into
523 # the desired output format for the player including any player specific filter params
524 # such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats
525 if queue_item.media_type == MediaType.RADIO:
526 # keep very short buffer for radio streams
527 # to keep them (more or less) realtime and prevent time outs
528 read_rate_input_args = ["-readrate", "1.0", "-readrate_initial_burst", "2"]
529 else:
530 # just allow the player to buffer whatever it wants for single item streams
531 read_rate_input_args = None
532
533 first_chunk_received = False
534 bytes_sent = 0
535 async for chunk in get_ffmpeg_stream(
536 audio_input=audio_input,
537 input_format=pcm_format,
538 output_format=output_format,
539 filter_params=get_player_filter_params(
540 self.mass,
541 player_id=queue_player.player_id,
542 input_format=pcm_format,
543 output_format=output_format,
544 ),
545 extra_input_args=read_rate_input_args,
546 ):
547 try:
548 await resp.write(chunk)
549 bytes_sent += len(chunk)
550 if not first_chunk_received:
551 first_chunk_received = True
552 # inform the queue that the track is now loaded in the buffer
553 # so for example the next track can be enqueued
554 self.mass.player_queues.track_loaded_in_buffer(
555 queue_item.queue_id, queue_item.queue_item_id
556 )
557 except (BrokenPipeError, ConnectionResetError, ConnectionError) as err:
558 if first_chunk_received and not queue_player.stop_called:
559 # Player disconnected (unexpected) after receiving at least some data
560 # This could indicate buffering issues, network problems,
561 # or player-specific issues
562 bytes_expected = get_chunksize(output_format, queue_item.duration or 3600)
563 self.logger.warning(
564 "Player %s disconnected prematurely from stream for %s (%s) - "
565 "error: %s, sent %d bytes, expected (approx) bytes=%d",
566 queue.display_name,
567 queue_item.name,
568 queue_item.uri,
569 err.__class__.__name__,
570 bytes_sent,
571 bytes_expected,
572 )
573 break
574 if queue_item.streamdetails.stream_error:
575 self.logger.error(
576 "Error streaming QueueItem %s (%s) to %s - will try to skip to next item",
577 queue_item.name,
578 queue_item.uri,
579 queue.display_name,
580 )
581 # try to skip to the next item in the queue after a short delay
582 self.mass.call_later(5, self.mass.player_queues.next(queue_id))
583 return resp
584
585 async def serve_queue_flow_stream(self, request: web.Request) -> web.StreamResponse:
586 """Stream Queue Flow audio to player."""
587 self._log_request(request)
588 queue_id = request.match_info["queue_id"]
589 queue = self.mass.player_queues.get(queue_id)
590 if not queue:
591 raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
592 if not (queue_player := self.mass.players.get(queue_id)):
593 raise web.HTTPNotFound(reason=f"Unknown Player: {queue_id}")
594 start_queue_item_id = request.match_info["queue_item_id"]
595 start_queue_item = self.mass.player_queues.get_item(queue_id, start_queue_item_id)
596 if not start_queue_item:
597 raise web.HTTPNotFound(reason=f"Unknown Queue item: {start_queue_item_id}")
598
599 queue.flow_mode_stream_log = []
600
601 # select the highest possible PCM settings for this player
602 flow_pcm_format = await self._select_flow_format(queue_player)
603
604 # work out output format/details
605 output_format = await self.get_output_format(
606 output_format_str=request.match_info["fmt"],
607 player=queue_player,
608 content_sample_rate=flow_pcm_format.sample_rate,
609 content_bit_depth=flow_pcm_format.bit_depth,
610 )
611 # work out ICY metadata support
612 icy_preference = self.mass.config.get_raw_player_config_value(
613 queue_id,
614 CONF_ENTRY_ENABLE_ICY_METADATA.key,
615 CONF_ENTRY_ENABLE_ICY_METADATA.default_value,
616 )
617 enable_icy = request.headers.get("Icy-MetaData", "") == "1" and icy_preference != "disabled"
618 icy_meta_interval = 256000 if icy_preference == "full" else 16384
619
620 # prepare request, add some DLNA/UPNP compatible headers
621 headers = {
622 **DEFAULT_STREAM_HEADERS,
623 **ICY_HEADERS,
624 "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01700000000000000000000000000000", # noqa: E501
625 "Accept-Ranges": "none",
626 "Content-Type": f"audio/{output_format.output_format_str}",
627 }
628 if enable_icy:
629 headers["icy-metaint"] = str(icy_meta_interval)
630
631 resp = web.StreamResponse(
632 status=200,
633 reason="OK",
634 headers=headers,
635 )
636 http_profile = await self.mass.config.get_player_config_value(
637 queue_id, CONF_HTTP_PROFILE, default="default", return_type=str
638 )
639 if http_profile == "forced_content_length":
640 # just set an insane high content length to make sure the player keeps playing
641 resp.content_length = get_chunksize(output_format, 12 * 3600)
642 elif http_profile == "chunked":
643 resp.enable_chunked_encoding()
644
645 await resp.prepare(request)
646
647 # return early if this is not a GET request
648 if request.method != "GET":
649 return resp
650
651 # all checks passed, start streaming!
652 # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into
653 # the desired output format for the player including any player specific filter params
654 # such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats
655 self.logger.debug("Start serving Queue flow audio stream for %s", queue.display_name)
656
657 async for chunk in get_ffmpeg_stream(
658 audio_input=self.get_queue_flow_stream(
659 queue=queue,
660 start_queue_item=start_queue_item,
661 pcm_format=flow_pcm_format,
662 ),
663 input_format=flow_pcm_format,
664 output_format=output_format,
665 filter_params=get_player_filter_params(
666 self.mass, queue_player.player_id, flow_pcm_format, output_format
667 ),
668 # we need to slowly feed the music to avoid the player stopping and later
669 # restarting (or completely failing) the audio stream by keeping the buffer short.
670 # this is reported to be an issue especially with Chromecast players.
671 # see for example: https://github.com/music-assistant/support/issues/3717
672 # allow buffer ahead of 6 seconds and read rest in realtime
673 extra_input_args=["-readrate", "1.0", "-readrate_initial_burst", "6"],
674 chunk_size=icy_meta_interval if enable_icy else get_chunksize(output_format),
675 ):
676 try:
677 await resp.write(chunk)
678 except (BrokenPipeError, ConnectionResetError, ConnectionError):
679 # race condition
680 break
681
682 if not enable_icy:
683 continue
684
685 # if icy metadata is enabled, send the icy metadata after the chunk
686 if (
687 # use current item here and not buffered item, otherwise
688 # the icy metadata will be too much ahead
689 (current_item := queue.current_item)
690 and current_item.streamdetails
691 and current_item.streamdetails.stream_title
692 ):
693 title = current_item.streamdetails.stream_title
694 elif queue and current_item and current_item.name:
695 title = current_item.name
696 else:
697 title = "Music Assistant"
698 metadata = f"StreamTitle='{title}';".encode()
699 if icy_preference == "full" and current_item and current_item.image:
700 metadata += f"StreamURL='{current_item.image.path}'".encode()
701 while len(metadata) % 16 != 0:
702 metadata += b"\x00"
703 length = len(metadata)
704 length_b = chr(int(length / 16)).encode()
705 await resp.write(length_b + metadata)
706
707 return resp
708
709 async def serve_command_request(self, request: web.Request) -> web.FileResponse:
710 """Handle special 'command' request for a player."""
711 self._log_request(request)
712 queue_id = request.match_info["queue_id"]
713 command = request.match_info["command"]
714 if command == "next":
715 self.mass.create_task(self.mass.player_queues.next(queue_id))
716 return web.FileResponse(SILENCE_FILE, headers={"icy-name": "Music Assistant"})
717
718 async def serve_announcement_stream(self, request: web.Request) -> web.StreamResponse:
719 """Stream announcement audio to a player."""
720 self._log_request(request)
721 player_id = request.match_info["player_id"]
722 player = self.mass.player_queues.get(player_id)
723 if not player:
724 raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
725 if not (announce_data := self.announcements.get(player_id)):
726 raise web.HTTPNotFound(reason=f"No pending announcements for Player: {player_id}")
727
728 # work out output format/details
729 fmt = request.match_info["fmt"]
730 audio_format = AudioFormat(content_type=ContentType.try_parse(fmt))
731
732 http_profile = await self.mass.config.get_player_config_value(
733 player_id, CONF_HTTP_PROFILE, default="default", return_type=str
734 )
735 if http_profile == "forced_content_length":
736 # given the fact that an announcement is just a short audio clip,
737 # just send it over completely at once so we have a fixed content length
738 data = b""
739 async for chunk in self.get_announcement_stream(
740 announcement_url=announce_data["announcement_url"],
741 output_format=audio_format,
742 pre_announce=announce_data["pre_announce"],
743 pre_announce_url=announce_data["pre_announce_url"],
744 ):
745 data += chunk
746 return web.Response(
747 body=data,
748 content_type=f"audio/{audio_format.output_format_str}",
749 headers=DEFAULT_STREAM_HEADERS,
750 )
751
752 resp = web.StreamResponse(
753 status=200,
754 reason="OK",
755 headers=DEFAULT_STREAM_HEADERS,
756 )
757 resp.content_type = f"audio/{audio_format.output_format_str}"
758 if http_profile == "chunked":
759 resp.enable_chunked_encoding()
760
761 await resp.prepare(request)
762
763 # return early if this is not a GET request
764 if request.method != "GET":
765 return resp
766
767 # all checks passed, start streaming!
768 self.logger.debug(
769 "Start serving audio stream for Announcement %s to %s",
770 announce_data["announcement_url"],
771 player.display_name,
772 )
773 async for chunk in self.get_announcement_stream(
774 announcement_url=announce_data["announcement_url"],
775 output_format=audio_format,
776 pre_announce=announce_data["pre_announce"],
777 pre_announce_url=announce_data["pre_announce_url"],
778 ):
779 try:
780 await resp.write(chunk)
781 except (BrokenPipeError, ConnectionResetError):
782 break
783
784 self.logger.debug(
785 "Finished serving audio stream for Announcement %s to %s",
786 announce_data["announcement_url"],
787 player.display_name,
788 )
789
790 return resp
791
792 async def serve_plugin_source_stream(self, request: web.Request) -> web.StreamResponse:
793 """Stream PluginSource audio to a player."""
794 self._log_request(request)
795 plugin_source_id = request.match_info["plugin_source"]
796 provider = cast("PluginProvider", self.mass.get_provider(plugin_source_id))
797 if not provider:
798 raise ProviderUnavailableError(f"Unknown PluginSource: {plugin_source_id}")
799 # work out output format/details
800 player_id = request.match_info["player_id"]
801 player = self.mass.players.get(player_id)
802 if not player:
803 raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
804 plugin_source = provider.get_source()
805 output_format = await self.get_output_format(
806 output_format_str=request.match_info["fmt"],
807 player=player,
808 content_sample_rate=plugin_source.audio_format.sample_rate,
809 content_bit_depth=plugin_source.audio_format.bit_depth,
810 )
811 headers = {
812 **DEFAULT_STREAM_HEADERS,
813 "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01700000000000000000000000000000", # noqa: E501
814 "icy-name": plugin_source.name,
815 "Accept-Ranges": "none",
816 "Content-Type": f"audio/{output_format.output_format_str}",
817 }
818
819 resp = web.StreamResponse(
820 status=200,
821 reason="OK",
822 headers=headers,
823 )
824 resp.content_type = f"audio/{output_format.output_format_str}"
825 http_profile = await self.mass.config.get_player_config_value(
826 player_id, CONF_HTTP_PROFILE, default="default", return_type=str
827 )
828 if http_profile == "forced_content_length":
829 # just set an insanely high content length to make sure the player keeps playing
830 resp.content_length = get_chunksize(output_format, 12 * 3600)
831 elif http_profile == "chunked":
832 resp.enable_chunked_encoding()
833
834 await resp.prepare(request)
835
836 # return early if this is not a GET request
837 if request.method != "GET":
838 return resp
839
840 # all checks passed, start streaming!
841 if not plugin_source.audio_format:
842 raise InvalidDataError(f"No audio format for plugin source {plugin_source_id}")
843 async for chunk in self.get_plugin_source_stream(
844 plugin_source_id=plugin_source_id,
845 output_format=output_format,
846 player_id=player_id,
847 player_filter_params=get_player_filter_params(
848 self.mass, player_id, plugin_source.audio_format, output_format
849 ),
850 ):
851 try:
852 await resp.write(chunk)
853 except (BrokenPipeError, ConnectionResetError, ConnectionError):
854 break
855 return resp
856
def get_command_url(self, player_or_queue_id: str, command: str) -> str:
    """Build the URL of the special command stream for a player or queue.

    The URL is rooted at ``self.base_url`` and always carries an ``.mp3`` suffix.
    """
    endpoint = f"command/{player_or_queue_id}/{command}.mp3"
    return f"{self.base_url}/{endpoint}"
860
def get_announcement_url(
    self,
    player_id: str,
    announce_data: AnnounceData,
    content_type: ContentType = ContentType.MP3,
) -> str:
    """Register announcement data for a player and return its stream URL.

    The given ``announce_data`` is stored in ``self.announcements`` under
    ``player_id`` before the URL is built. The URL's file extension is the
    value of ``content_type``.
    """
    self.announcements[player_id] = announce_data
    # The announcement is hosted by the stream server on the local network:
    # this ensures playback on all players, including ones that do not like
    # https hosts, and it also offers the pre-announce 'bell'.
    extension = content_type.value
    return f"{self.base_url}/announcement/{player_id}.{extension}"
873
def get_stream(
    self, media: PlayerMedia, pcm_format: AudioFormat, force_flow_mode: bool = False
) -> AsyncGenerator[bytes, None]:
    """
    Return an async generator that produces the given media as raw PCM audio.

    Helper for player providers that can consume the raw PCM audio stream
    directly (e.g. AirPlay) and do not rely on HTTP transport.

    The audio source is selected from the media's type and ids, in this order:
    announcement, plugin source, UGP member stream, queue flow stream,
    single queue item, and finally a plain uri handed to ffmpeg.
    """
    media_type = media.media_type

    # special case: stream announcement
    if media_type == MediaType.ANNOUNCEMENT:
        announce_info = media.custom_data
        assert announce_info
        return self.get_announcement_stream(
            announce_info["announcement_url"],
            output_format=pcm_format,
            pre_announce=announce_info["pre_announce"],
            pre_announce_url=announce_info["pre_announce_url"],
        )

    # special case: plugin source stream
    if media_type == MediaType.PLUGIN_SOURCE:
        source_info = media.custom_data
        assert source_info
        return self.get_plugin_source_stream(
            plugin_source_id=source_info["source_id"],
            output_format=pcm_format,
            # the player_id must come from the PlayerMedia object
            # because this could have been a group
            player_id=source_info["player_id"],
        )

    wants_flow = media_type == MediaType.FLOW_STREAM

    # special case: member player accessing UGP stream
    # the URI is checked to distinguish this from the UGP accessing its own stream
    if (
        wants_flow
        and media.source_id
        and media.source_id.startswith(UGP_PREFIX)
        and media.uri
        and "/ugp/" in media.uri
    ):
        group_player = cast("UniversalGroupPlayer", self.mass.players.get(media.source_id))
        group_stream = group_player.stream
        assert group_stream is not None  # for type checker
        if group_stream.base_pcm_format == pcm_format:
            # no conversion needed
            return group_stream.subscribe_raw()
        return group_stream.get_stream(output_format=pcm_format)

    if media.source_id and media.queue_item_id:
        if wants_flow or force_flow_mode:
            # regular queue (flow) stream request
            flow_queue = self.mass.player_queues.get(media.source_id)
            assert flow_queue
            first_item = self.mass.player_queues.get_item(media.source_id, media.queue_item_id)
            assert first_item
            return self.mass.streams.get_queue_flow_stream(
                queue=flow_queue,
                start_queue_item=first_item,
                pcm_format=pcm_format,
            )
        # single item stream (e.g. radio)
        single_item = self.mass.player_queues.get_item(media.source_id, media.queue_item_id)
        assert single_item
        item_stream = self.get_queue_item_stream(
            queue_item=single_item,
            pcm_format=pcm_format,
        )
        return buffered(item_stream, buffer_size=10, min_buffer_before_yield=2)

    # fallback: assume url or some other direct path
    # NOTE: this will fail if it is a uri not playable by ffmpeg
    return get_ffmpeg_stream(
        audio_input=media.uri,
        input_format=AudioFormat(content_type=ContentType.try_parse(media.uri)),
        output_format=pcm_format,
    )
958
@use_buffer(buffer_size=30, min_buffer_before_yield=2)
async def get_queue_flow_stream(
    self,
    queue: PlayerQueue,
    start_queue_item: QueueItem,
    pcm_format: AudioFormat,
) -> AsyncGenerator[bytes, None]:
    """
    Get a flow stream of all tracks in the queue as raw PCM audio.

    Starting at ``start_queue_item``, each following queue item is loaded and
    streamed back-to-back as one continuous PCM stream until the queue
    controller raises ``QueueEmpty``. When crossfade is allowed, the tail of a
    track is held back and mixed with the head of the next track.

    Regular audio is yielded in chunks of ``pcm_format.pcm_sample_size`` bytes
    (one second of audio); leftover bytes around a crossfade and at the end of
    a track are yielded as-is and may have a different size.

    :param queue: The queue to stream; ``flow_mode`` is set to True on it and
        its ``flow_mode_stream_log`` gets an entry for every streamed item.
    :param start_queue_item: The first queue item to stream.
    :param pcm_format: The (PCM) format of the produced audio.
    :raises InvalidDataError: If a queue item has no streamdetails.
    """
    # ruff: noqa: PLR0915
    assert pcm_format.content_type.is_pcm()
    queue_track = None
    last_fadeout_part: bytes = b""
    last_streamdetails: StreamDetails | None = None
    last_play_log_entry: PlayLogEntry | None = None
    queue.flow_mode = True
    if not start_queue_item:
        # this can happen in some (edge case) race conditions
        return
    pcm_sample_size = pcm_format.pcm_sample_size
    if start_queue_item.media_type != MediaType.TRACK:
        # no crossfade on non-tracks
        smart_fades_mode = SmartFadesMode.DISABLED
        standard_crossfade_duration = 0
    else:
        smart_fades_mode = await self.mass.config.get_player_config_value(
            queue.queue_id, CONF_SMART_FADES_MODE, return_type=SmartFadesMode
        )
        standard_crossfade_duration = self.mass.config.get_raw_player_config_value(
            queue.queue_id, CONF_CROSSFADE_DURATION, 10
        )
    self.logger.info(
        "Start Queue Flow stream for Queue %s - crossfade: %s %s",
        queue.display_name,
        smart_fades_mode,
        f"({standard_crossfade_duration}s)"
        if smart_fades_mode == SmartFadesMode.STANDARD_CROSSFADE
        else "",
    )
    # counts chunks over the whole flow stream (not per track)
    total_chunks_received = 0

    while True:
        # get (next) queue item to stream
        if queue_track is None:
            queue_track = start_queue_item
        else:
            try:
                queue_track = await self.mass.player_queues.load_next_queue_item(
                    queue.queue_id, queue_track.queue_item_id
                )
            except QueueEmpty:
                break

        if queue_track.streamdetails is None:
            # format the message ourselves: exceptions do not apply
            # logging-style %-arguments
            raise InvalidDataError(
                f"No Streamdetails known for queue item {queue_track.queue_item_id}"
            )

        self.logger.debug(
            "Start Streaming queue track: %s (%s) for queue %s",
            queue_track.streamdetails.uri,
            queue_track.name,
            queue.display_name,
        )
        # append to play log so the queue controller can work out which track is playing
        play_log_entry = PlayLogEntry(queue_track.queue_item_id)
        queue.flow_mode_stream_log.append(play_log_entry)
        # calculate crossfade buffer size
        crossfade_buffer_duration = (
            SMART_CROSSFADE_DURATION
            if smart_fades_mode == SmartFadesMode.SMART_CROSSFADE
            else standard_crossfade_duration
        )
        # never use more than half of the track's (known) duration for the crossfade
        crossfade_buffer_duration = min(
            crossfade_buffer_duration,
            int(queue_track.streamdetails.duration / 2)
            if queue_track.streamdetails.duration
            else crossfade_buffer_duration,
        )
        # Ensure crossfade buffer size is aligned to frame boundaries
        # Frame size = bytes_per_sample * channels
        bytes_per_sample = pcm_format.bit_depth // 8
        frame_size = bytes_per_sample * pcm_format.channels
        crossfade_buffer_size = int(pcm_format.pcm_sample_size * crossfade_buffer_duration)
        # Round down to nearest frame boundary
        crossfade_buffer_size = (crossfade_buffer_size // frame_size) * frame_size

        bytes_written = 0
        buffer = b""
        # handle incoming audio chunks
        first_chunk_received = False
        # buffer size needs to be big enough to include the crossfade part

        async for chunk in self.get_queue_item_stream(
            queue_track,
            pcm_format=pcm_format,
            seek_position=queue_track.streamdetails.seek_position,
            raise_on_error=False,
        ):
            total_chunks_received += 1
            if not first_chunk_received:
                first_chunk_received = True
                # inform the queue that the track is now loaded in the buffer
                # so the next track can be preloaded
                self.mass.player_queues.track_loaded_in_buffer(
                    queue.queue_id, queue_track.queue_item_id
                )
            if total_chunks_received < 10 and smart_fades_mode != SmartFadesMode.DISABLED:
                # we want a stream to start as quickly as possible
                # so for the first 10 chunks we keep a very short buffer
                req_buffer_size = pcm_format.pcm_sample_size
            else:
                req_buffer_size = (
                    pcm_sample_size
                    if smart_fades_mode == SmartFadesMode.DISABLED
                    else crossfade_buffer_size
                )

            # ALWAYS APPEND CHUNK TO BUFFER
            buffer += chunk
            del chunk
            if len(buffer) < req_buffer_size:
                # buffer is not full enough, move on
                # yield control to event loop with 10ms delay
                await asyncio.sleep(0.01)
                continue

            #### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
            if last_fadeout_part and last_streamdetails:
                # perform crossfade
                fadein_part = buffer[:crossfade_buffer_size]
                remaining_bytes = buffer[crossfade_buffer_size:]
                # Use the mixer to handle all crossfade logic
                crossfade_part = await self._smart_fades_mixer.mix(
                    fade_in_part=fadein_part,
                    fade_out_part=last_fadeout_part,
                    fade_in_streamdetails=queue_track.streamdetails,
                    fade_out_streamdetails=last_streamdetails,
                    pcm_format=pcm_format,
                    standard_crossfade_duration=standard_crossfade_duration,
                    mode=smart_fades_mode,
                )
                # because the crossfade exists of both the fadein and fadeout part
                # we need to correct the bytes_written accordingly so the duration
                # calculations at the end of the track are correct
                crossfade_part_len = len(crossfade_part)
                bytes_written += int(crossfade_part_len / 2)
                if last_play_log_entry:
                    assert last_play_log_entry.seconds_streamed is not None
                    last_play_log_entry.seconds_streamed += (
                        crossfade_part_len / 2 / pcm_sample_size
                    )
                # yield crossfade_part (in pcm_sample_size chunks)
                for _chunk in divide_chunks(crossfade_part, pcm_sample_size):
                    yield _chunk
                    del _chunk
                del crossfade_part
                # also write the leftover bytes from the crossfade action
                if remaining_bytes:
                    yield remaining_bytes
                    bytes_written += len(remaining_bytes)
                    del remaining_bytes
                # clear vars
                last_fadeout_part = b""
                last_streamdetails = None
                buffer = b""

            #### OTHER: enough data in buffer, feed to output
            while len(buffer) > req_buffer_size:
                yield buffer[:pcm_sample_size]
                bytes_written += pcm_sample_size
                buffer = buffer[pcm_sample_size:]

        #### HANDLE END OF TRACK
        if last_fadeout_part:
            # edge case: we did not get enough data to make the crossfade
            for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
                yield _chunk
                del _chunk
            bytes_written += len(last_fadeout_part)
            last_fadeout_part = b""
        if self._crossfade_allowed(
            queue_track, smart_fades_mode=smart_fades_mode, flow_mode=True
        ):
            # if crossfade is enabled, save fadeout part to pickup for next track
            last_fadeout_part = buffer[-crossfade_buffer_size:]
            last_streamdetails = queue_track.streamdetails
            last_play_log_entry = play_log_entry
            remaining_bytes = buffer[:-crossfade_buffer_size]
            if remaining_bytes:
                yield remaining_bytes
                bytes_written += len(remaining_bytes)
            del remaining_bytes
        elif buffer:
            # no crossfade enabled, just yield the buffer last part
            bytes_written += len(buffer)
            for _chunk in divide_chunks(buffer, pcm_sample_size):
                yield _chunk
                del _chunk
        # make sure the buffer gets cleaned up
        del buffer

        # update duration details based on the actual pcm data we sent
        # this also accounts for crossfade and silence stripping
        seconds_streamed = bytes_written / pcm_sample_size
        queue_track.streamdetails.seconds_streamed = seconds_streamed
        queue_track.streamdetails.duration = int(
            queue_track.streamdetails.seek_position + seconds_streamed
        )
        play_log_entry.seconds_streamed = seconds_streamed
        play_log_entry.duration = queue_track.streamdetails.duration
        self.logger.debug(
            "Finished Streaming queue track: %s (%s) on queue %s",
            queue_track.streamdetails.uri,
            queue_track.name,
            queue.display_name,
        )
    #### HANDLE END OF QUEUE FLOW STREAM
    # end of queue flow: make sure we yield the last_fadeout_part
    if last_fadeout_part:
        for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
            yield _chunk
            del _chunk
        # correct seconds streamed/duration
        last_part_seconds = len(last_fadeout_part) / pcm_sample_size
        streamdetails = queue_track.streamdetails
        assert streamdetails is not None
        streamdetails.seconds_streamed = (
            streamdetails.seconds_streamed or 0
        ) + last_part_seconds
        streamdetails.duration = int((streamdetails.duration or 0) + last_part_seconds)
        last_fadeout_part = b""
    self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name)
1199
async def get_announcement_stream(
    self,
    announcement_url: str,
    output_format: AudioFormat,
    pre_announce: bool | str = False,
    pre_announce_url: str = ANNOUNCE_ALERT_FILE,
) -> AsyncGenerator[bytes, None]:
    """
    Get the special announcement stream.

    The announcement is fetched in a background task while the optional
    pre-announce sound is already being streamed; silence is inserted until
    the first announcement audio is available.

    :param announcement_url: Location of the announcement audio; the text after
        the last dot is used to guess its content type.
    :param output_format: The format of the yielded audio.
    :param pre_announce: When truthy, ``pre_announce_url`` is played first.
    :param pre_announce_url: Location of the pre-announce sound.
    """
    announcement_data: asyncio.Queue[bytes | None] = asyncio.Queue(10)
    # we are doing announcement in PCM first to avoid multiple encodings
    # when mixing pre-announce and announcement
    # also we have to deal with some TTS sources being super slow in delivering audio
    # so we take an approach where we start fetching the announcement in the background
    # while we can already start playing the pre-announce sound (if any)

    pcm_format = (
        output_format
        if output_format.content_type.is_pcm()
        else AudioFormat(
            sample_rate=output_format.sample_rate,
            content_type=ContentType.PCM_S16LE,
            bit_depth=16,
            channels=output_format.channels,
        )
    )

    async def fetch_announcement() -> None:
        """Fetch the announcement as PCM and feed it into the queue."""
        fmt = announcement_url.rsplit(".", 1)[-1]
        try:
            async for chunk in get_ffmpeg_stream(
                audio_input=announcement_url,
                input_format=AudioFormat(content_type=ContentType.try_parse(fmt)),
                output_format=pcm_format,
                chunk_size=get_chunksize(pcm_format, 1),
            ):
                await announcement_data.put(chunk)
        except Exception:
            # fetching failed: still signal end of stream, otherwise the consumer
            # below would wait forever on announcement_data.get().
            # CancelledError is not an Exception subclass, so a cancelled task
            # does not end up blocking on the queue here.
            await announcement_data.put(None)
            raise
        await announcement_data.put(None)  # signal end of stream

    # NOTE(review): the background task is not cancelled when the consumer stops
    # early; it then stays blocked on the (bounded) queue - consider cancelling it.
    self.mass.create_task(fetch_announcement())

    async def _announcement_stream() -> AsyncGenerator[bytes, None]:
        """Generate the PCM audio stream for the announcement + optional pre-announce."""
        if pre_announce:
            async for chunk in get_ffmpeg_stream(
                audio_input=pre_announce_url,
                input_format=AudioFormat(content_type=ContentType.try_parse(pre_announce_url)),
                output_format=pcm_format,
                chunk_size=get_chunksize(pcm_format, 1),
            ):
                yield chunk
        # pad silence (0.1 second at a time) while we're waiting
        # for the announcement to be ready
        while announcement_data.empty():
            yield b"\0" * int(
                pcm_format.sample_rate * (pcm_format.bit_depth / 8) * pcm_format.channels * 0.1
            )
            await asyncio.sleep(0.1)
        # stream announcement
        while True:
            announcement_chunk = await announcement_data.get()
            if announcement_chunk is None:
                break
            yield announcement_chunk

    if output_format == pcm_format:
        # no need to re-encode, just yield the raw PCM stream
        async for chunk in _announcement_stream():
            yield chunk
        return

    # stream final announcement in requested output format
    async for chunk in get_ffmpeg_stream(
        audio_input=_announcement_stream(),
        input_format=pcm_format,
        output_format=output_format,
    ):
        yield chunk
1275
async def get_plugin_source_stream(
    self,
    plugin_source_id: str,
    output_format: AudioFormat,
    player_id: str,
    player_filter_params: list[str] | None = None,
) -> AsyncGenerator[bytes, None]:
    """
    Stream the audio of a plugin source, converted by ffmpeg to ``output_format``.

    The source is claimed for ``player_id`` for the duration of the stream.
    Streaming stops as soon as the source's ``in_use_by`` no longer equals
    ``player_id``, and the claim is released afterwards if it is still ours.

    :raises ProviderUnavailableError: If no provider exists for ``plugin_source_id``.
    """
    provider = cast("PluginProvider", self.mass.get_provider(plugin_source_id))
    if not provider:
        raise ProviderUnavailableError(f"Unknown PluginSource: {plugin_source_id}")

    source = provider.get_source()
    self.logger.debug(
        "Start streaming PluginSource %s to %s using output format %s",
        plugin_source_id,
        player_id,
        output_format,
    )
    # the player controller should have set this already; set it again to be sure
    source.in_use_by = player_id

    try:
        async for chunk in get_ffmpeg_stream(
            # custom sources hand over their own audio stream,
            # all others are read from the source's path
            audio_input=cast(
                "str | AsyncGenerator[bytes, None]",
                provider.get_audio_stream(player_id)
                if source.stream_type == StreamType.CUSTOM
                else source.path,
            ),
            input_format=source.audio_format,
            output_format=output_format,
            filter_params=player_filter_params,
            extra_input_args=["-y", "-re"],
        ):
            still_ours = source.in_use_by == player_id
            if not still_ours:
                # another player took over or the stream ended, stop streaming
                break
            yield chunk
    finally:
        self.logger.debug(
            "Finished streaming PluginSource %s to %s", plugin_source_id, player_id
        )
        # short pause to prevent race conditions when selecting source
        await asyncio.sleep(1)
        if source.in_use_by == player_id:
            # release control
            source.in_use_by = None
1323
async def get_queue_item_stream(
    self,
    queue_item: QueueItem,
    pcm_format: AudioFormat,
    seek_position: int = 0,
    raise_on_error: bool = True,
) -> AsyncGenerator[bytes, None]:
    """
    Get the (PCM) audio stream for a single queue item.

    Applies volume normalization according to the item's streamdetails and an
    optional fade-in at the start of the stream. When the stream ends,
    ``seconds_streamed`` is updated on the streamdetails and, if the stream
    finished or at least 90 seconds were received, the provider's
    ``on_streamed`` is scheduled.

    :param queue_item: The queue item to stream; must have streamdetails.
    :param pcm_format: The PCM format of the yielded audio.
    :param seek_position: Position (in seconds) to start the stream at.
    :param raise_on_error: When False, an ``AudioError`` is logged and swallowed
        (the stream just ends) instead of being re-raised.
    :raises AudioError: If streaming fails and ``raise_on_error`` is True.
    """
    # collect all arguments for ffmpeg
    streamdetails = queue_item.streamdetails
    assert streamdetails
    filter_params: list[str] = []

    # handle volume normalization
    gain_correct: float | None = None
    if streamdetails.volume_normalization_mode == VolumeNormalizationMode.DYNAMIC:
        # volume normalization using loudnorm filter (in dynamic mode)
        # which also collects the measurement on the fly during playback
        # more info: https://k.ylo.ph/2016/04/04/loudnorm.html
        filter_rule = f"loudnorm=I={streamdetails.target_loudness}:TP=-2.0:LRA=10.0:offset=0.0"
        filter_rule += ":print_format=json"
        filter_params.append(filter_rule)
    elif streamdetails.volume_normalization_mode == VolumeNormalizationMode.FIXED_GAIN:
        # apply user defined fixed volume/gain correction
        config_key = (
            CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS
            if streamdetails.media_type == MediaType.TRACK
            else CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO
        )
        gain_value = await self.mass.config.get_core_config_value(
            self.domain, config_key, default=0.0, return_type=float
        )
        gain_correct = round(gain_value, 2)
        filter_params.append(f"volume={gain_correct}dB")
    elif streamdetails.volume_normalization_mode == VolumeNormalizationMode.MEASUREMENT_ONLY:
        # volume normalization with known loudness measurement
        # apply volume/gain correction
        target_loudness = (
            float(streamdetails.target_loudness)
            if streamdetails.target_loudness is not None
            else 0.0
        )
        if streamdetails.prefer_album_loudness and streamdetails.loudness_album is not None:
            gain_correct = target_loudness - float(streamdetails.loudness_album)
        elif streamdetails.loudness is not None:
            gain_correct = target_loudness - float(streamdetails.loudness)
        else:
            gain_correct = 0.0
        gain_correct = round(gain_correct, 2)
        filter_params.append(f"volume={gain_correct}dB")
    streamdetails.volume_normalization_gain_correct = gain_correct

    # the buffered stream is only used for items with a known duration
    allow_buffer = bool(
        self.mass.config.get_raw_core_config_value(
            self.domain, CONF_ALLOW_BUFFER, CONF_ALLOW_BUFFER_DEFAULT
        )
        and streamdetails.duration
    )

    self.logger.debug(
        "Starting queue item stream for %s (%s)"
        " - using buffer: %s"
        " - using fade-in: %s"
        " - using volume normalization: %s",
        queue_item.name,
        streamdetails.uri,
        allow_buffer,
        streamdetails.fade_in,
        streamdetails.volume_normalization_mode,
    )
    if allow_buffer:
        media_stream_gen = get_buffered_media_stream(
            self.mass,
            streamdetails=streamdetails,
            pcm_format=pcm_format,
            seek_position=int(seek_position),
            filter_params=filter_params,
        )
    else:
        media_stream_gen = get_media_stream(
            self.mass,
            streamdetails=streamdetails,
            pcm_format=pcm_format,
            seek_position=int(seek_position),
            filter_params=filter_params,
        )

    def _fade_in(pcm_data: bytes) -> AsyncGenerator[bytes, None]:
        """Return a stream of the given PCM data with a 3 second fade-in applied."""
        return get_ffmpeg_stream(
            # NOTE: get_ffmpeg_stream signature says str | AsyncGenerator
            # but FFMpeg class actually accepts bytes too. This works at
            # runtime but needs type: ignore for mypy.
            audio_input=pcm_data,  # type: ignore[arg-type]
            input_format=pcm_format,
            output_format=pcm_format,
            filter_params=["afade=type=in:start_time=0:duration=3"],
        )

    first_chunk_received = False
    fade_in_buffer = b""
    bytes_received = 0
    finished = False
    stream_started_at = asyncio.get_event_loop().time()
    try:
        async for chunk in media_stream_gen:
            bytes_received += len(chunk)
            if not first_chunk_received:
                first_chunk_received = True
                self.logger.debug(
                    "First audio chunk received for %s (%s) after %.2f seconds",
                    queue_item.name,
                    streamdetails.uri,
                    asyncio.get_event_loop().time() - stream_started_at,
                )
            # handle optional fade-in
            if streamdetails.fade_in:
                if len(fade_in_buffer) < pcm_format.pcm_sample_size * 4:
                    # collect the first seconds of audio before applying the fade
                    fade_in_buffer += chunk
                elif fade_in_buffer:
                    async for fade_chunk in _fade_in(fade_in_buffer + chunk):
                        yield fade_chunk
                    fade_in_buffer = b""
                    streamdetails.fade_in = False
            else:
                yield chunk
            # help garbage collection by explicitly deleting chunk
            del chunk
        if fade_in_buffer:
            # the stream ended before the fade-in buffer filled up:
            # fade and send what we collected instead of silently dropping it
            async for fade_chunk in _fade_in(fade_in_buffer):
                yield fade_chunk
            fade_in_buffer = b""
            streamdetails.fade_in = False
        finished = True
    except AudioError as err:
        streamdetails.stream_error = True
        queue_item.available = False
        if raise_on_error:
            raise
        # yes, we swallow the error here after logging it
        # so the outer stream can handle it gracefully
        self.logger.error(
            "AudioError while streaming queue item %s (%s): %s",
            queue_item.name,
            streamdetails.uri,
            err,
        )
    finally:
        # determine how many seconds we've streamed
        # for pcm output we can calculate this easily
        seconds_streamed = bytes_received / pcm_format.pcm_sample_size
        streamdetails.seconds_streamed = seconds_streamed
        self.logger.debug(
            "stream %s for %s in %.2f seconds - seconds streamed/buffered: %.2f",
            "aborted" if not finished else "finished",
            streamdetails.uri,
            asyncio.get_event_loop().time() - stream_started_at,
            seconds_streamed,
        )
        # report stream to provider
        if (finished or seconds_streamed >= 90) and (
            music_prov := self.mass.get_provider(streamdetails.provider)
        ):
            if TYPE_CHECKING:  # avoid circular import
                assert isinstance(music_prov, MusicProvider)
            self.mass.create_task(music_prov.on_streamed(streamdetails))
1481
1482 @use_buffer(buffer_size=30, min_buffer_before_yield=2)
1483 async def get_queue_item_stream_with_smartfade(
1484 self,
1485 queue_item: QueueItem,
1486 pcm_format: AudioFormat,
1487 smart_fades_mode: SmartFadesMode = SmartFadesMode.SMART_CROSSFADE,
1488 standard_crossfade_duration: int = 10,
1489 ) -> AsyncGenerator[bytes, None]:
1490 """Get the audio stream for a single queue item with (smart) crossfade to the next item."""
1491 queue = self.mass.player_queues.get(queue_item.queue_id)
1492 if not queue:
1493 raise RuntimeError(f"Queue {queue_item.queue_id} not found")
1494
1495 streamdetails = queue_item.streamdetails
1496 assert streamdetails
1497 crossfade_data = self._crossfade_data.pop(queue.queue_id, None)
1498
1499 if crossfade_data and streamdetails.seek_position > 0:
1500 # don't do crossfade when seeking into track
1501 crossfade_data = None
1502 if crossfade_data and (crossfade_data.queue_item_id != queue_item.queue_item_id):
1503 # edge case alert: the next item changed just while we were preloading/crossfading
1504 self.logger.warning(
1505 "Skipping crossfade data for queue %s - next item changed!", queue.display_name
1506 )
1507 crossfade_data = None
1508
1509 self.logger.debug(
1510 "Start Streaming queue track: %s (%s) for queue %s "
1511 "- crossfade mode: %s "
1512 "- crossfading from previous track: %s ",
1513 queue_item.streamdetails.uri if queue_item.streamdetails else "Unknown URI",
1514 queue_item.name,
1515 queue.display_name,
1516 smart_fades_mode,
1517 "true" if crossfade_data else "false",
1518 )
1519
1520 buffer = b""
1521 bytes_written = 0
1522 # calculate crossfade buffer size
1523 crossfade_buffer_duration = (
1524 SMART_CROSSFADE_DURATION
1525 if smart_fades_mode == SmartFadesMode.SMART_CROSSFADE
1526 else standard_crossfade_duration
1527 )
1528 crossfade_buffer_duration = min(
1529 crossfade_buffer_duration,
1530 int(streamdetails.duration / 2)
1531 if streamdetails.duration
1532 else crossfade_buffer_duration,
1533 )
1534 # Ensure crossfade buffer size is aligned to frame boundaries
1535 # Frame size = bytes_per_sample * channels
1536 bytes_per_sample = pcm_format.bit_depth // 8
1537 frame_size = bytes_per_sample * pcm_format.channels
1538 crossfade_buffer_size = int(pcm_format.pcm_sample_size * crossfade_buffer_duration)
1539 # Round down to nearest frame boundary
1540 crossfade_buffer_size = (crossfade_buffer_size // frame_size) * frame_size
1541 fade_out_data: bytes | None = None
1542
1543 if crossfade_data:
1544 # Calculate discard amount in seconds (format-independent)
1545 # Use fade_in_pcm_format because fade_in_size is in the next track's original format
1546 fade_in_duration_seconds = (
1547 crossfade_data.fade_in_size / crossfade_data.fade_in_pcm_format.pcm_sample_size
1548 )
1549 discard_seconds = int(fade_in_duration_seconds) - 1
1550 # Calculate discard amounts in CURRENT track's format
1551 discard_bytes = int(discard_seconds * pcm_format.pcm_sample_size)
1552 # Convert fade_in_size to current track's format for correct leftover calculation
1553 fade_in_size_in_current_format = int(
1554 fade_in_duration_seconds * pcm_format.pcm_sample_size
1555 )
1556 discard_leftover = fade_in_size_in_current_format - discard_bytes
1557 else:
1558 discard_seconds = streamdetails.seek_position
1559 discard_leftover = 0
1560 total_chunks_received = 0
1561 req_buffer_size = crossfade_buffer_size
1562 async for chunk in self.get_queue_item_stream(
1563 queue_item, pcm_format, seek_position=discard_seconds
1564 ):
1565 total_chunks_received += 1
1566 if discard_leftover:
1567 # discard leftover bytes from crossfade data
1568 chunk = chunk[discard_leftover:] # noqa: PLW2901
1569 discard_leftover = 0
1570
1571 if total_chunks_received < 10:
1572 # we want a stream to start as quickly as possible
1573 # so for the first 10 chunks we keep a very short buffer
1574 req_buffer_size = pcm_format.pcm_sample_size
1575 else:
1576 req_buffer_size = crossfade_buffer_size
1577
1578 # ALWAYS APPEND CHUNK TO BUFFER
1579 buffer += chunk
1580 del chunk
1581 if len(buffer) < req_buffer_size:
1582 # buffer is not full enough, move on
1583 continue
1584
1585 #### HANDLE CROSSFADE DATA FROM PREVIOUS TRACK
1586 if crossfade_data:
1587 # send the (second half of the) crossfade data
1588 if crossfade_data.pcm_format != pcm_format:
1589 # edge case: pcm format mismatch, we need to resample
1590 self.logger.debug(
1591 "Resampling crossfade data from %s to %s for queue %s",
1592 crossfade_data.pcm_format.sample_rate,
1593 pcm_format.sample_rate,
1594 queue.display_name,
1595 )
1596 resampled_data = await resample_pcm_audio(
1597 crossfade_data.data,
1598 crossfade_data.pcm_format,
1599 pcm_format,
1600 )
1601 if resampled_data:
1602 for _chunk in divide_chunks(resampled_data, pcm_format.pcm_sample_size):
1603 yield _chunk
1604 bytes_written += len(resampled_data)
1605 else:
1606 # Resampling failed, error already logged in resample_pcm_audio
1607 # Skip crossfade data entirely - stream continues without it
1608 self.logger.warning(
1609 "Skipping crossfade data for queue %s due to resampling failure",
1610 queue.display_name,
1611 )
1612 else:
1613 for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size):
1614 yield _chunk
1615 bytes_written += len(crossfade_data.data)
1616 # clear vars
1617 crossfade_data = None
1618
1619 #### OTHER: enough data in buffer, feed to output
1620 while len(buffer) > req_buffer_size:
1621 yield buffer[: pcm_format.pcm_sample_size]
1622 bytes_written += pcm_format.pcm_sample_size
1623 buffer = buffer[pcm_format.pcm_sample_size :]
1624
1625 #### HANDLE END OF TRACK
1626
1627 if crossfade_data:
1628 # edge case: we did not get enough data to send the crossfade data
1629 # send the (second half of the) crossfade data
1630 if crossfade_data.pcm_format != pcm_format:
1631 # (yet another) edge case: pcm format mismatch, we need to resample
1632 self.logger.debug(
1633 "Resampling remaining crossfade data from %s to %s for queue %s",
1634 crossfade_data.pcm_format.sample_rate,
1635 pcm_format.sample_rate,
1636 queue.display_name,
1637 )
1638 resampled_crossfade_data = await resample_pcm_audio(
1639 crossfade_data.data,
1640 crossfade_data.pcm_format,
1641 pcm_format,
1642 )
1643 if resampled_crossfade_data:
1644 crossfade_data.data = resampled_crossfade_data
1645 else:
1646 # Resampling failed, error already logged in resample_pcm_audio
1647 # Skip the crossfade data entirely
1648 self.logger.warning(
1649 "Skipping remaining crossfade data for queue %s due to resampling failure",
1650 queue.display_name,
1651 )
1652 crossfade_data = None
1653 if crossfade_data:
1654 for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size):
1655 yield _chunk
1656 bytes_written += len(crossfade_data.data)
1657 crossfade_data = None
1658
1659 # get next track for crossfade
1660 next_queue_item: QueueItem | None
1661 try:
1662 self.logger.debug(
1663 "Preloading NEXT track for crossfade for queue %s",
1664 queue.display_name,
1665 )
1666 next_queue_item = await self.mass.player_queues.load_next_queue_item(
1667 queue.queue_id, queue_item.queue_item_id
1668 )
1669 # set index_in_buffer to prevent our next track is overwritten while preloading
1670 if next_queue_item.streamdetails is None:
1671 raise InvalidDataError(
1672 f"No streamdetails for next queue item {next_queue_item.queue_item_id}"
1673 )
1674 queue.index_in_buffer = self.mass.player_queues.index_by_id(
1675 queue.queue_id, next_queue_item.queue_item_id
1676 )
1677 queue_player = self.mass.players.get(queue.queue_id)
1678 assert queue_player is not None
1679 next_queue_item_pcm_format = await self._select_pcm_format(
1680 player=queue_player,
1681 streamdetails=next_queue_item.streamdetails,
1682 smartfades_enabled=True,
1683 )
1684 except QueueEmpty:
1685 # end of queue reached, no next item
1686 next_queue_item = None
1687
1688 if not next_queue_item or not self._crossfade_allowed(
1689 queue_item,
1690 smart_fades_mode=smart_fades_mode,
1691 flow_mode=False,
1692 next_queue_item=next_queue_item,
1693 sample_rate=pcm_format.sample_rate,
1694 next_sample_rate=next_queue_item_pcm_format.sample_rate,
1695 ):
1696 # no crossfade enabled/allowed, just yield the buffer last part
1697 bytes_written += len(buffer)
1698 for _chunk in divide_chunks(buffer, pcm_format.pcm_sample_size):
1699 yield _chunk
1700 else:
1701 # if crossfade is enabled, save fadeout part in buffer to pickup for next track
1702 fade_out_data = buffer
1703 buffer = b""
1704 try:
1705 async for chunk in self.get_queue_item_stream(
1706 next_queue_item, next_queue_item_pcm_format
1707 ):
1708 # append to buffer until we reach crossfade size
1709 # we only need the first X seconds of the NEXT track so we can
1710 # perform the crossfade.
1711 # the crossfaded audio of the previous and next track will be
1712 # sent in two equal parts: first half now, second half
1713 # when the next track starts. We use CrossfadeData to store
1714 # the second half to be picked up by the next track's stream generator.
1715 # Note that we more or less expect the user to have enabled the in-memory
1716 # buffer so we can keep the next track's audio data in memory.
1717 buffer += chunk
1718 del chunk
1719 if len(buffer) >= crossfade_buffer_size:
1720 break
1721 #### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
1722 # Store original buffer size before any resampling for fade_in_size calculation
1723 # This size is in the next track's original format which is what we need
1724 original_buffer_size = len(buffer)
1725 if next_queue_item_pcm_format != pcm_format:
1726 # edge case: pcm format mismatch, we need to resample the next track's
1727 # beginning part before crossfading
1728 self.logger.debug(
1729 "Resampling next track's crossfade from %s to %s for queue %s",
1730 next_queue_item_pcm_format.sample_rate,
1731 pcm_format.sample_rate,
1732 queue.display_name,
1733 )
1734 buffer = await resample_pcm_audio(
1735 buffer,
1736 next_queue_item_pcm_format,
1737 pcm_format,
1738 )
1739 # perform actual (smart fades) crossfade using mixer
1740 crossfade_bytes = await self._smart_fades_mixer.mix(
1741 fade_in_part=buffer,
1742 fade_out_part=fade_out_data,
1743 fade_in_streamdetails=cast("StreamDetails", next_queue_item.streamdetails),
1744 fade_out_streamdetails=streamdetails,
1745 pcm_format=pcm_format,
1746 standard_crossfade_duration=standard_crossfade_duration,
1747 mode=smart_fades_mode,
1748 )
1749 # send half of the crossfade_part (= approx the fadeout part)
1750 split_point = (len(crossfade_bytes) + 1) // 2
1751 crossfade_first = crossfade_bytes[:split_point]
1752 crossfade_second = crossfade_bytes[split_point:]
1753 del crossfade_bytes
1754 bytes_written += len(crossfade_first)
1755 for _chunk in divide_chunks(crossfade_first, pcm_format.pcm_sample_size):
1756 yield _chunk
1757 # store the other half for the next track
1758 # IMPORTANT: crossfade_second data is in CURRENT track's format (pcm_format)
1759 # because it was created from the resampled buffer used for mixing.
1760 # BUT fade_in_size represents bytes in NEXT track's original format
1761 # (next_queue_item_pcm_format) because that's how much of the next track
1762 # was consumed during the crossfade. We need both formats to correctly
1763 # handle the crossfade data when the next track starts.
1764 self._crossfade_data[queue_item.queue_id] = CrossfadeData(
1765 data=crossfade_second,
1766 fade_in_size=original_buffer_size,
1767 pcm_format=pcm_format, # Format of the data (current track)
1768 fade_in_pcm_format=next_queue_item_pcm_format, # Format for fade_in_size
1769 queue_item_id=next_queue_item.queue_item_id,
1770 )
1771 except AudioError:
1772 # no crossfade possible, just yield the fade_out_data
1773 next_queue_item = None
1774 yield fade_out_data
1775 bytes_written += len(fade_out_data)
1776 del fade_out_data
1777 # make sure the buffer gets cleaned up
1778 del buffer
1779 # update duration details based on the actual pcm data we sent
1780 # this also accounts for crossfade and silence stripping
1781 seconds_streamed = bytes_written / pcm_format.pcm_sample_size
1782 streamdetails.seconds_streamed = seconds_streamed
1783 streamdetails.duration = int(streamdetails.seek_position + seconds_streamed)
1784 self.logger.debug(
1785 "Finished Streaming queue track: %s (%s) on queue %s "
1786 "- crossfade data prepared for next track: %s",
1787 streamdetails.uri,
1788 queue_item.name,
1789 queue.display_name,
1790 next_queue_item.name if next_queue_item else "N/A",
1791 )
1792
def _log_request(self, request: web.Request) -> None:
    """
    Write a log entry describing an incoming HTTP request.

    When the verbose log level is enabled for this controller's logger, the
    entry is written at that level and also contains the request headers.
    Otherwise a shorter entry (method, path and remote only) is written at
    debug level.

    :param request: The incoming request to describe.
    """
    method, path, remote = request.method, request.path, request.remote
    if not self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
        # regular logging: leave the headers out
        self.logger.debug("Got %s request to %s from %s", method, path, remote)
        return
    # verbose logging: include the full set of request headers
    self.logger.log(
        VERBOSE_LOG_LEVEL,
        "Got %s request to %s from %s\nheaders: %s\n",
        method,
        path,
        remote,
        request.headers,
    )
1811
async def get_output_format(
    self,
    output_format_str: str,
    player: Player,
    content_sample_rate: int,
    content_bit_depth: int,
) -> AudioFormat:
    """
    Work out the output AudioFormat for a player and a requested format string.

    The supported (sample rate, bit depth) pairs are read from the player's
    CONF_SAMPLE_RATES config value and the channel layout from its
    CONF_OUTPUT_CHANNELS config value (treated as "stereo" when not set).

    :param output_format_str: Requested output format (e.g. "pcm"), parsed
        into a ContentType.
    :param player: Player whose configuration is consulted.
    :param content_sample_rate: Sample rate of the content to be played.
    :param content_bit_depth: Bit depth of the content to be played.
    :return: AudioFormat with the selected content type, sample rate,
        bit depth and channel count (2 for "stereo", otherwise 1).
    """
    content_type: ContentType = ContentType.try_parse(output_format_str)
    rate_depth_pairs = cast(
        "list[tuple[str, str]]",
        await self.mass.config.get_player_config_value(
            player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
        ),
    )
    channel_layout = self.mass.config.get_raw_player_config_value(
        player.player_id, CONF_OUTPUT_CHANNELS, "stereo"
    )
    player_sample_rates = [int(pair[0]) for pair in rate_depth_pairs]
    player_bit_depths = [int(pair[1]) for pair in rate_depth_pairs]

    # never go beyond the highest bit depth configured for the player
    output_bit_depth = min(content_bit_depth, max(player_bit_depths))
    # keep the content's own sample rate when the player lists it,
    # otherwise fall back to the highest rate the player lists
    output_sample_rate = (
        content_sample_rate
        if content_sample_rate in player_sample_rates
        else max(player_sample_rates)
    )

    if not content_type.is_lossless():
        # lossy output gains nothing from high resolution: cap at 16 bit / 48 kHz
        output_bit_depth = 16
        output_sample_rate = min(48000, output_sample_rate)
    if output_format_str == "pcm":
        # for raw pcm the concrete content type follows from the chosen bit depth
        content_type = ContentType.from_bit_depth(output_bit_depth)
    return AudioFormat(
        content_type=content_type,
        sample_rate=output_sample_rate,
        bit_depth=output_bit_depth,
        channels=2 if channel_layout == "stereo" else 1,
    )
1852
async def _select_flow_format(
    self,
    player: Player,
) -> AudioFormat:
    """
    Select the PCM format used for a flow stream to the given player.

    Content type and bit depth are taken from INTERNAL_PCM_FORMAT and the
    channel count is always 2. The sample rate is the first of
    192000, 96000, 48000, 44100 that appears in the player's
    CONF_SAMPLE_RATES config value; when none of them appears, the sample
    rate of INTERNAL_PCM_FORMAT is used.

    :param player: Player whose configured sample rates are consulted.
    :return: The AudioFormat for the flow stream.
    """
    rate_depth_pairs = cast(
        "list[tuple[str, str]]",
        await self.mass.config.get_player_config_value(
            player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
        ),
    )
    player_sample_rates = {int(pair[0]) for pair in rate_depth_pairs}
    # candidates in order of preference (highest first)
    preferred_rates = (192000, 96000, 48000, 44100)
    output_sample_rate = next(
        (rate for rate in preferred_rates if rate in player_sample_rates),
        INTERNAL_PCM_FORMAT.sample_rate,
    )
    return AudioFormat(
        content_type=INTERNAL_PCM_FORMAT.content_type,
        sample_rate=output_sample_rate,
        bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
        channels=2,
    )
1876
async def _select_pcm_format(
    self,
    player: Player,
    streamdetails: StreamDetails,
    smartfades_enabled: bool,
) -> AudioFormat:
    """
    Select the internal PCM format for streaming the given streamdetails to a player.

    The sample rate is the highest rate from the player's CONF_SAMPLE_RATES
    config value that does not exceed the content's sample rate, or 48000
    when no configured rate qualifies. Content type and bit depth come from
    INTERNAL_PCM_FORMAT; the channel count comes from the content, unless
    smart fades are enabled, in which case it is set to 2.

    :param player: Player whose configured sample rates are consulted.
    :param streamdetails: Stream details providing the content's audio format.
    :param smartfades_enabled: When True, force a stereo (2 channel) format.
    :return: The AudioFormat to use internally for this stream.
    """
    rate_depth_pairs = cast(
        "list[tuple[str, str]]",
        await self.mass.config.get_player_config_value(
            player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
        ),
    )
    player_sample_rates = [int(pair[0]) for pair in rate_depth_pairs]
    # only rates at or below the content's own rate are eligible
    eligible_rates = [
        rate for rate in player_sample_rates if rate <= streamdetails.audio_format.sample_rate
    ]
    output_sample_rate = max(eligible_rates, default=48000)  # 48000 = sane/safe default

    # content type and bit depth always follow the internal pcm format
    # (extra headroom for filters etc.)
    pcm_format = AudioFormat(
        content_type=INTERNAL_PCM_FORMAT.content_type,
        sample_rate=output_sample_rate,
        bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
        channels=streamdetails.audio_format.channels,
    )
    if not smartfades_enabled:
        return pcm_format
    # crossfading requires stereo.
    # NOTE(review): channels is overridden after construction on purpose here;
    # if AudioFormat derives any fields at init time they still reflect the
    # content's channel count - confirm that is intended.
    pcm_format.channels = 2
    return pcm_format
1908
def _crossfade_allowed(
    self,
    queue_item: QueueItem,
    smart_fades_mode: SmartFadesMode,
    flow_mode: bool = False,
    next_queue_item: QueueItem | None = None,
    sample_rate: int | None = None,
    next_sample_rate: int | None = None,
) -> bool:
    """
    Return whether a crossfade from the given queue item into the next one is allowed.

    Crossfade is refused when:
    - smart fades mode is DISABLED,
    - no player is found for the item's queue id,
    - the current or the next item is not a track,
    - there is no next item,
    - both items are tracks of the same album and the
      CONF_ALLOW_CROSSFADE_SAME_ALBUM core config value is not enabled,
    - not in flow mode, both sample rates are given and differ, and the
      player's "support gapless with different sample rates" config value
      is not enabled.

    :param queue_item: The item that is currently being streamed.
    :param smart_fades_mode: The active smart fades mode.
    :param flow_mode: Whether the stream is a flow stream.
    :param next_queue_item: The upcoming item; looked up from the queue when omitted.
    :param sample_rate: Sample rate of the current item's stream (optional).
    :param next_sample_rate: Sample rate of the next item's stream (optional).
    :return: True when crossfading is allowed, False otherwise.
    """
    if smart_fades_mode == SmartFadesMode.DISABLED:
        return False
    if not self.mass.players.get(queue_item.queue_id):
        # just a guard: no player known for this queue
        return False
    if queue_item.media_type != MediaType.TRACK:
        self.logger.debug("Skipping crossfade: current item is not a track")
        return False

    # resolve the upcoming item (use the one handed in, else ask the queue)
    upcoming_item = next_queue_item
    if not upcoming_item:
        upcoming_item = self.mass.player_queues.get_next_item(
            queue_item.queue_id, queue_item.queue_item_id
        )
    if not upcoming_item:
        # nothing follows the current item
        return False
    if upcoming_item.media_type != MediaType.TRACK:
        self.logger.debug("Skipping crossfade: next item is not a track")
        return False

    # Tracks of the same album are often meant to play gapless and there is no
    # accurate way to tell whether an album is gapless, so crossfading within
    # one album is skipped unless explicitly allowed by the core config value.
    current_media = queue_item.media_item
    upcoming_media = upcoming_item.media_item
    shares_album = (
        isinstance(current_media, Track)
        and isinstance(upcoming_media, Track)
        and current_media.album
        and upcoming_media.album
        and current_media.album == upcoming_media.album
    )
    if shares_album and not self.mass.config.get_raw_core_config_value(
        self.domain, CONF_ALLOW_CROSSFADE_SAME_ALBUM, False
    ):
        self.logger.debug("Skipping crossfade: next item is part of the same album")
        return False

    # Outside flow mode a sample rate change between the two items is only
    # acceptable when the player config says it can handle that.
    rates_differ = (
        not flow_mode and sample_rate and next_sample_rate and sample_rate != next_sample_rate
    )
    if rates_differ and not self.mass.config.get_raw_player_config_value(
        queue_item.queue_id,
        CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES.key,
        CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES.default_value,
    ):
        self.logger.debug(
            "Skipping crossfade: player does not support gapless playback "
            "with different sample rates (%s vs %s)",
            sample_rate,
            next_sample_rate,
        )
        return False

    return True
1974
async def _periodic_garbage_collection(self) -> None:
    """
    Run a garbage collection pass and schedule the next one.

    Intended to free up memory held by audio buffers and streams. The
    collection itself is handed to the running loop's executor so the event
    loop is not blocked while it runs; afterwards this coroutine re-schedules
    itself through ``self.mass.call_later``.
    """
    reschedule_delay = 900  # seconds, i.e. the next run is in 15 minutes
    self.logger.log(VERBOSE_LOG_LEVEL, "Running periodic garbage collection...")
    # This runs on a timer (not in response to subprocess cleanup), which is
    # why running it in a thread is considered safe here.
    num_collected = await asyncio.get_running_loop().run_in_executor(None, gc.collect)
    self.logger.log(
        VERBOSE_LOG_LEVEL,
        "Garbage collection completed, collected %d objects",
        num_collected,
    )
    self.mass.call_later(reschedule_delay, self._periodic_garbage_collection)
1993
def _setup_smart_fades_logger(self, config: CoreConfig) -> None:
    """
    Apply the configured log level to the smart fades analyzer and mixer loggers.

    The level is read from the CONF_SMART_FADES_LOG_LEVEL value of the given
    config. The special value "GLOBAL" means: use the same level as this
    controller's own logger.

    :param config: Core config holding the smart fades log level value.
    """
    configured_level = str(config.get_value(CONF_SMART_FADES_LOG_LEVEL))
    # "GLOBAL" follows this controller's logger, anything else is used as-is
    effective_level = self.logger.level if configured_level == "GLOBAL" else configured_level
    for component in (self.smart_fades_analyzer, self.smart_fades_mixer):
        component.logger.setLevel(effective_level)
2003