/
/
/
1"""
2Controller to stream audio to players.
3
4The streams controller hosts a basic, unprotected HTTP-only webserver
5purely to stream audio packets to players and some control endpoints such as
6the upnp callbacks and json rpc api for slimproto clients.
7"""
8
9from __future__ import annotations
10
11import asyncio
12import gc
13import logging
14import os
15import urllib.parse
16from collections.abc import AsyncGenerator
17from dataclasses import dataclass
18from typing import TYPE_CHECKING, Final, cast
19
20from aiofiles.os import wrap
21from aiohttp import web
22from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
23from music_assistant_models.enums import (
24 ConfigEntryType,
25 ContentType,
26 MediaType,
27 PlayerFeature,
28 StreamType,
29 VolumeNormalizationMode,
30)
31from music_assistant_models.errors import (
32 AudioError,
33 InvalidDataError,
34 ProviderUnavailableError,
35 QueueEmpty,
36)
37from music_assistant_models.media_items import AudioFormat, Track
38from music_assistant_models.player_queue import PlayLogEntry
39
40from music_assistant.constants import (
41 ANNOUNCE_ALERT_FILE,
42 CONF_BIND_IP,
43 CONF_BIND_PORT,
44 CONF_CROSSFADE_DURATION,
45 CONF_ENTRY_ENABLE_ICY_METADATA,
46 CONF_ENTRY_LOG_LEVEL,
47 CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES,
48 CONF_HTTP_PROFILE,
49 CONF_OUTPUT_CHANNELS,
50 CONF_OUTPUT_CODEC,
51 CONF_PUBLISH_IP,
52 CONF_SAMPLE_RATES,
53 CONF_SMART_FADES_MODE,
54 CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO,
55 CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
56 CONF_VOLUME_NORMALIZATION_RADIO,
57 CONF_VOLUME_NORMALIZATION_TRACKS,
58 DEFAULT_STREAM_HEADERS,
59 ICY_HEADERS,
60 INTERNAL_PCM_FORMAT,
61 SILENCE_FILE,
62 VERBOSE_LOG_LEVEL,
63)
64from music_assistant.controllers.players.player_controller import AnnounceData
65from music_assistant.controllers.streams.smart_fades import SmartFadesMixer
66from music_assistant.controllers.streams.smart_fades.analyzer import SmartFadesAnalyzer
67from music_assistant.controllers.streams.smart_fades.fades import SMART_CROSSFADE_DURATION
68from music_assistant.helpers.audio import LOGGER as AUDIO_LOGGER
69from music_assistant.helpers.audio import (
70 get_buffered_media_stream,
71 get_chunksize,
72 get_media_stream,
73 get_player_filter_params,
74 get_stream_details,
75 resample_pcm_audio,
76)
77from music_assistant.helpers.buffered_generator import buffered, use_buffer
78from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
79from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream
80from music_assistant.helpers.util import (
81 divide_chunks,
82 format_ip_for_url,
83 get_ip_addresses,
84 get_total_system_memory,
85 select_free_port,
86)
87from music_assistant.helpers.webserver import Webserver
88from music_assistant.models.core_controller import CoreController
89from music_assistant.models.music_provider import MusicProvider
90from music_assistant.models.plugin import PluginProvider, PluginSource
91from music_assistant.models.smart_fades import SmartFadesMode
92from music_assistant.providers.universal_group.constants import UGP_PREFIX
93from music_assistant.providers.universal_group.player import UniversalGroupPlayer
94
95if TYPE_CHECKING:
96 from music_assistant_models.config_entries import CoreConfig
97 from music_assistant_models.player import PlayerMedia
98 from music_assistant_models.player_queue import PlayerQueue
99 from music_assistant_models.queue_item import QueueItem
100 from music_assistant_models.streamdetails import StreamDetails
101
102 from music_assistant.mass import MusicAssistant
103 from music_assistant.models.player import Player
104
105
106isfile = wrap(os.path.isfile)
107
108CONF_ALLOW_BUFFER: Final[str] = "allow_buffering"
109CONF_ALLOW_CROSSFADE_SAME_ALBUM: Final[str] = "allow_crossfade_same_album"
110CONF_SMART_FADES_LOG_LEVEL: Final[str] = "smart_fades_log_level"
111
112# Calculate total system memory once at module load time
113TOTAL_SYSTEM_MEMORY_GB: Final[float] = get_total_system_memory()
114CONF_ALLOW_BUFFER_DEFAULT = TOTAL_SYSTEM_MEMORY_GB >= 8.0
115
116
117def parse_pcm_info(content_type: str) -> tuple[int, int, int]:
118 """Parse PCM info from a codec/content_type string."""
119 params = (
120 dict(urllib.parse.parse_qsl(content_type.replace(";", "&"))) if ";" in content_type else {}
121 )
122 sample_rate = int(params.get("rate", 44100))
123 sample_size = int(params.get("bitrate", 16))
124 channels = int(params.get("channels", 2))
125 return (sample_rate, sample_size, channels)
126
127
128@dataclass
129class CrossfadeData:
130 """Data class to hold crossfade data."""
131
132 data: bytes
133 fade_in_size: int
134 pcm_format: AudioFormat # Format of the 'data' bytes (current/previous track's format)
135 fade_in_pcm_format: AudioFormat # Format for 'fade_in_size' (next track's format)
136 queue_item_id: str
137
138
139class StreamsController(CoreController):
140 """Webserver Controller to stream audio to players."""
141
142 domain: str = "streams"
143
144 def __init__(self, mass: MusicAssistant) -> None:
145 """Initialize instance."""
146 super().__init__(mass)
147 self._server = Webserver(self.logger, enable_dynamic_routes=True)
148 self.register_dynamic_route = self._server.register_dynamic_route
149 self.unregister_dynamic_route = self._server.unregister_dynamic_route
150 self.manifest.name = "Streamserver"
151 self.manifest.description = (
152 "Music Assistant's core controller that is responsible for "
153 "streaming audio to players on the local network."
154 )
155 self.manifest.icon = "cast-audio"
156 self.announcements: dict[str, AnnounceData] = {}
157 self._crossfade_data: dict[str, CrossfadeData] = {}
158 self._bind_ip: str = "0.0.0.0"
159 self._smart_fades_mixer = SmartFadesMixer(self)
160 self._smart_fades_analyzer = SmartFadesAnalyzer(self)
161
162 @property
163 def base_url(self) -> str:
164 """Return the base_url for the streamserver."""
165 return self._server.base_url
166
167 @property
168 def bind_ip(self) -> str:
169 """Return the IP address this streamserver is bound to."""
170 return self._bind_ip
171
172 @property
173 def smart_fades_mixer(self) -> SmartFadesMixer:
174 """Return the SmartFadesMixer instance."""
175 return self._smart_fades_mixer
176
177 @property
178 def smart_fades_analyzer(self) -> SmartFadesAnalyzer:
179 """Return the SmartFadesAnalyzer instance."""
180 return self._smart_fades_analyzer
181
182 async def get_config_entries(
183 self,
184 action: str | None = None,
185 values: dict[str, ConfigValueType] | None = None,
186 ) -> tuple[ConfigEntry, ...]:
187 """Return all Config Entries for this core module (if any)."""
188 ip_addresses = await get_ip_addresses(include_ipv6=True)
189 default_port = await select_free_port(8097, 9200)
190 return (
191 ConfigEntry(
192 key=CONF_ALLOW_BUFFER,
193 type=ConfigEntryType.BOOLEAN,
194 default_value=CONF_ALLOW_BUFFER_DEFAULT,
195 label="Allow (in-memory) buffering of (track) audio",
196 description="By default, Music Assistant tries to be as resource "
197 "efficient as possible when streaming audio, especially considering "
198 "low-end devices such as Raspberry Pi's. This means that audio "
199 "buffering is disabled by default to reduce memory usage. \n\n"
200 "Enabling this option allows for in-memory buffering of audio, "
201 "which (massively) improves playback (and seeking) performance but it comes "
202 "at the cost of increased memory usage. "
203 "If you run Music Assistant on a capable device with enough memory, "
204 "enabling this option is strongly recommended.",
205 required=False,
206 category="playback",
207 ),
208 ConfigEntry(
209 key=CONF_VOLUME_NORMALIZATION_RADIO,
210 type=ConfigEntryType.STRING,
211 default_value=VolumeNormalizationMode.FALLBACK_FIXED_GAIN,
212 label="Volume normalization method for radio streams",
213 options=[
214 ConfigValueOption(x.value.replace("_", " ").title(), x.value)
215 for x in VolumeNormalizationMode
216 ],
217 category="playback",
218 ),
219 ConfigEntry(
220 key=CONF_VOLUME_NORMALIZATION_TRACKS,
221 type=ConfigEntryType.STRING,
222 default_value=VolumeNormalizationMode.FALLBACK_DYNAMIC,
223 label="Volume normalization method for tracks",
224 options=[
225 ConfigValueOption(x.value.replace("_", " ").title(), x.value)
226 for x in VolumeNormalizationMode
227 ],
228 category="playback",
229 ),
230 ConfigEntry(
231 key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO,
232 type=ConfigEntryType.FLOAT,
233 range=(-20, 10),
234 default_value=-6,
235 label="Fixed/fallback gain adjustment for radio streams",
236 category="playback",
237 ),
238 ConfigEntry(
239 key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
240 type=ConfigEntryType.FLOAT,
241 range=(-20, 10),
242 default_value=-6,
243 label="Fixed/fallback gain adjustment for tracks",
244 category="playback",
245 ),
246 ConfigEntry(
247 key=CONF_ALLOW_CROSSFADE_SAME_ALBUM,
248 type=ConfigEntryType.BOOLEAN,
249 default_value=False,
250 label="Allow crossfade between tracks from the same album",
251 description="Enabling this option allows for crossfading between tracks "
252 "that are part of the same album.",
253 category="playback",
254 ),
255 ConfigEntry(
256 key=CONF_PUBLISH_IP,
257 type=ConfigEntryType.STRING,
258 default_value=ip_addresses[0],
259 label="Published IP address",
260 description="This IP address is communicated to players where to find this server."
261 "\nMake sure that this IP can be reached by players on the local network, "
262 "otherwise audio streaming will not work.",
263 required=False,
264 category="generic",
265 advanced=True,
266 requires_reload=True,
267 ),
268 ConfigEntry(
269 key=CONF_BIND_PORT,
270 type=ConfigEntryType.INTEGER,
271 default_value=default_port,
272 label="TCP Port",
273 description="The TCP port to run the server. "
274 "Make sure that this server can be reached "
275 "on the given IP and TCP port by players on the local network.",
276 category="generic",
277 advanced=True,
278 requires_reload=True,
279 ),
280 ConfigEntry(
281 key=CONF_BIND_IP,
282 type=ConfigEntryType.STRING,
283 default_value="0.0.0.0",
284 options=[ConfigValueOption(x, x) for x in {"0.0.0.0", "::", *ip_addresses}],
285 label="Bind to IP/interface",
286 description="Start the stream server on this specific interface. \n"
287 "Use 0.0.0.0 or :: to bind to all interfaces, which is the default. \n"
288 "This is an advanced setting that should normally "
289 "not be adjusted in regular setups.",
290 category="generic",
291 advanced=True,
292 required=False,
293 requires_reload=True,
294 ),
295 ConfigEntry(
296 key=CONF_SMART_FADES_LOG_LEVEL,
297 type=ConfigEntryType.STRING,
298 label="Smart Fades Log level",
299 description="Log level for the Smart Fades mixer and analyzer.",
300 options=CONF_ENTRY_LOG_LEVEL.options,
301 default_value="GLOBAL",
302 category="generic",
303 advanced=True,
304 ),
305 )
306
307 async def setup(self, config: CoreConfig) -> None:
308 """Async initialize of module."""
309 # copy log level to audio/ffmpeg loggers
310 AUDIO_LOGGER.setLevel(self.logger.level)
311 FFMPEG_LOGGER.setLevel(self.logger.level)
312 self._setup_smart_fades_logger(config)
313 # perform check for ffmpeg version
314 await check_ffmpeg_version()
315 # start the webserver
316 self.publish_port = config.get_value(CONF_BIND_PORT)
317 self.publish_ip = config.get_value(CONF_PUBLISH_IP)
318 self._bind_ip = bind_ip = str(config.get_value(CONF_BIND_IP))
319 # print a big fat message in the log where the streamserver is running
320 # because this is a common source of issues for people with more complex setups
321 self.logger.log(
322 logging.INFO if self.mass.config.onboard_done else logging.WARNING,
323 "\n\n################################################################################\n"
324 "Starting streamserver on %s:%s\n"
325 "This is the IP address that is communicated to players.\n"
326 "If this is incorrect, audio will not play!\n"
327 "See the documentation how to configure the publish IP for the Streamserver\n"
328 "in Settings --> Core modules --> Streamserver\n"
329 "################################################################################\n",
330 self.publish_ip,
331 self.publish_port,
332 )
333 await self._server.setup(
334 bind_ip=bind_ip,
335 bind_port=cast("int", self.publish_port),
336 base_url=f"http://{format_ip_for_url(str(self.publish_ip))}:{self.publish_port}",
337 static_routes=[
338 (
339 "*",
340 "/flow/{session_id}/{queue_id}/{queue_item_id}.{fmt}",
341 self.serve_queue_flow_stream,
342 ),
343 (
344 "*",
345 "/single/{session_id}/{queue_id}/{queue_item_id}.{fmt}",
346 self.serve_queue_item_stream,
347 ),
348 (
349 "*",
350 "/command/{queue_id}/{command}.mp3",
351 self.serve_command_request,
352 ),
353 (
354 "*",
355 "/announcement/{player_id}.{fmt}",
356 self.serve_announcement_stream,
357 ),
358 (
359 "*",
360 "/pluginsource/{plugin_source}/{player_id}.{fmt}",
361 self.serve_plugin_source_stream,
362 ),
363 ],
364 )
365 # Start periodic garbage collection task
366 # This ensures memory from audio buffers and streams is cleaned up regularly
367 self.mass.call_later(900, self._periodic_garbage_collection) # 15 minutes
368
369 async def close(self) -> None:
370 """Cleanup on exit."""
371 await self._server.close()
372
373 async def resolve_stream_url(
374 self,
375 session_id: str,
376 queue_item: QueueItem,
377 flow_mode: bool = False,
378 player_id: str | None = None,
379 ) -> str:
380 """Resolve the stream URL for the given QueueItem."""
381 if not player_id:
382 player_id = queue_item.queue_id
383 conf_output_codec = await self.mass.config.get_player_config_value(
384 player_id, CONF_OUTPUT_CODEC, default="flac", return_type=str
385 )
386 output_codec = ContentType.try_parse(conf_output_codec or "flac")
387 fmt = output_codec.value
388 # handle raw pcm without exact format specifiers
389 if output_codec.is_pcm() and ";" not in fmt:
390 fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}"
391 base_path = "flow" if flow_mode else "single"
392 return f"{self._server.base_url}/{base_path}/{session_id}/{queue_item.queue_id}/{queue_item.queue_item_id}.{fmt}" # noqa: E501
393
394 async def get_plugin_source_url(
395 self,
396 plugin_source: PluginSource,
397 player_id: str,
398 ) -> str:
399 """Get the url for the Plugin Source stream/proxy."""
400 if plugin_source.audio_format.content_type.is_pcm():
401 fmt = ContentType.WAV.value
402 else:
403 fmt = plugin_source.audio_format.content_type.value
404 return f"{self._server.base_url}/pluginsource/{plugin_source.id}/{player_id}.{fmt}"
405
406 async def serve_queue_item_stream(self, request: web.Request) -> web.StreamResponse:
407 """Stream single queueitem audio to a player."""
408 self._log_request(request)
409 queue_id = request.match_info["queue_id"]
410 queue = self.mass.player_queues.get(queue_id)
411 if not queue:
412 raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
413 session_id = request.match_info["session_id"]
414 if queue.session_id and session_id != queue.session_id:
415 raise web.HTTPNotFound(reason=f"Unknown (or invalid) session: {session_id}")
416 queue_player = self.mass.players.get(queue_id)
417 queue_item_id = request.match_info["queue_item_id"]
418 queue_item = self.mass.player_queues.get_item(queue_id, queue_item_id)
419 if not queue_item:
420 raise web.HTTPNotFound(reason=f"Unknown Queue item: {queue_item_id}")
421 if not queue_item.streamdetails:
422 try:
423 queue_item.streamdetails = await get_stream_details(
424 mass=self.mass, queue_item=queue_item
425 )
426 except Exception as e:
427 self.logger.error(
428 "Failed to get streamdetails for QueueItem %s: %s", queue_item_id, e
429 )
430 queue_item.available = False
431 raise web.HTTPNotFound(reason=f"No streamdetails for Queue item: {queue_item_id}")
432
433 # pick output format based on the streamdetails and player capabilities
434 if not queue_player:
435 raise web.HTTPNotFound(reason=f"Unknown Player: {queue_id}")
436
437 # work out pcm format based on streamdetails
438 pcm_format = await self._select_pcm_format(
439 player=queue_player,
440 streamdetails=queue_item.streamdetails,
441 smartfades_enabled=True,
442 )
443 output_format = await self.get_output_format(
444 output_format_str=request.match_info["fmt"],
445 player=queue_player,
446 content_sample_rate=pcm_format.sample_rate,
447 content_bit_depth=pcm_format.bit_depth,
448 )
449
450 # prepare request, add some DLNA/UPNP compatible headers
451 # icy-name is sanitized to avoid a "Potential header injection attack" exception by aiohttp
452 # see https://github.com/music-assistant/support/issues/4913
453 headers = {
454 **DEFAULT_STREAM_HEADERS,
455 "icy-name": queue_item.name.replace("\n", " ").replace("\r", " ").replace("\t", " "),
456 "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01500000000000000000000000000000", # noqa: E501
457 "Accept-Ranges": "none",
458 "Content-Type": f"audio/{output_format.output_format_str}",
459 }
460 resp = web.StreamResponse(
461 status=200,
462 reason="OK",
463 headers=headers,
464 )
465 resp.content_type = f"audio/{output_format.output_format_str}"
466 http_profile = await self.mass.config.get_player_config_value(
467 queue_id, CONF_HTTP_PROFILE, default="default", return_type=str
468 )
469 if http_profile == "forced_content_length" and not queue_item.duration:
470 # just set an insane high content length to make sure the player keeps playing
471 resp.content_length = get_chunksize(output_format, 12 * 3600)
472 elif http_profile == "forced_content_length" and queue_item.duration:
473 # guess content length based on duration
474 resp.content_length = get_chunksize(output_format, queue_item.duration)
475 elif http_profile == "chunked":
476 resp.enable_chunked_encoding()
477
478 await resp.prepare(request)
479
480 # return early if this is not a GET request
481 if request.method != "GET":
482 return resp
483
484 if queue_item.media_type != MediaType.TRACK:
485 # no crossfade on non-tracks
486 smart_fades_mode = SmartFadesMode.DISABLED
487 else:
488 smart_fades_mode = await self.mass.config.get_player_config_value(
489 queue.queue_id, CONF_SMART_FADES_MODE, return_type=SmartFadesMode
490 )
491 standard_crossfade_duration = self.mass.config.get_raw_player_config_value(
492 queue.queue_id, CONF_CROSSFADE_DURATION, 10
493 )
494 if (
495 smart_fades_mode != SmartFadesMode.DISABLED
496 and PlayerFeature.GAPLESS_PLAYBACK not in queue_player.supported_features
497 ):
498 # crossfade is not supported on this player due to missing gapless playback
499 self.logger.warning(
500 "Crossfade disabled: Player %s does not support gapless playback, "
501 "consider enabling flow mode to enable crossfade on this player.",
502 queue_player.display_name if queue_player else "Unknown Player",
503 )
504 smart_fades_mode = SmartFadesMode.DISABLED
505
506 if smart_fades_mode != SmartFadesMode.DISABLED:
507 # crossfade is enabled, use special crossfaded single item stream
508 # where the crossfade of the next track is present in the stream of
509 # a single track. This only works if the player supports gapless playback!
510 audio_input = self.get_queue_item_stream_with_smartfade(
511 queue_item=queue_item,
512 pcm_format=pcm_format,
513 smart_fades_mode=smart_fades_mode,
514 standard_crossfade_duration=standard_crossfade_duration,
515 )
516 else:
517 # no crossfade, just a regular single item stream
518 audio_input = buffered(
519 self.get_queue_item_stream(
520 queue_item=queue_item,
521 pcm_format=pcm_format,
522 seek_position=queue_item.streamdetails.seek_position,
523 ),
524 buffer_size=10,
525 min_buffer_before_yield=2,
526 )
527 # stream the audio
528 # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into
529 # the desired output format for the player including any player specific filter params
530 # such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats
531 if queue_item.media_type == MediaType.RADIO:
532 # keep very short buffer for radio streams
533 # to keep them (more or less) realtime and prevent time outs
534 read_rate_input_args = ["-readrate", "1.0", "-readrate_initial_burst", "2"]
535 else:
536 # just allow the player to buffer whatever it wants for single item streams
537 read_rate_input_args = None
538
539 first_chunk_received = False
540 bytes_sent = 0
541 async for chunk in get_ffmpeg_stream(
542 audio_input=audio_input,
543 input_format=pcm_format,
544 output_format=output_format,
545 filter_params=get_player_filter_params(
546 self.mass,
547 player_id=queue_player.player_id,
548 input_format=pcm_format,
549 output_format=output_format,
550 ),
551 extra_input_args=read_rate_input_args,
552 ):
553 try:
554 await resp.write(chunk)
555 bytes_sent += len(chunk)
556 if not first_chunk_received:
557 first_chunk_received = True
558 # inform the queue that the track is now loaded in the buffer
559 # so for example the next track can be enqueued
560 self.mass.player_queues.track_loaded_in_buffer(
561 queue_item.queue_id, queue_item.queue_item_id
562 )
563 except (BrokenPipeError, ConnectionResetError, ConnectionError) as err:
564 if first_chunk_received and not queue_player.stop_called:
565 # Player disconnected (unexpected) after receiving at least some data
566 # This could indicate buffering issues, network problems,
567 # or player-specific issues
568 bytes_expected = get_chunksize(output_format, queue_item.duration or 3600)
569 self.logger.warning(
570 "Player %s disconnected prematurely from stream for %s (%s) - "
571 "error: %s, sent %d bytes, expected (approx) bytes=%d",
572 queue.display_name,
573 queue_item.name,
574 queue_item.uri,
575 err.__class__.__name__,
576 bytes_sent,
577 bytes_expected,
578 )
579 break
580 if queue_item.streamdetails.stream_error:
581 self.logger.error(
582 "Error streaming QueueItem %s (%s) to %s - will try to skip to next item",
583 queue_item.name,
584 queue_item.uri,
585 queue.display_name,
586 )
587 # try to skip to the next item in the queue after a short delay
588 self.mass.call_later(5, self.mass.player_queues.next(queue_id))
589 return resp
590
591 async def serve_queue_flow_stream(self, request: web.Request) -> web.StreamResponse:
592 """Stream Queue Flow audio to player."""
593 self._log_request(request)
594 queue_id = request.match_info["queue_id"]
595 queue = self.mass.player_queues.get(queue_id)
596 if not queue:
597 raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
598 if not (queue_player := self.mass.players.get(queue_id)):
599 raise web.HTTPNotFound(reason=f"Unknown Player: {queue_id}")
600 start_queue_item_id = request.match_info["queue_item_id"]
601 start_queue_item = self.mass.player_queues.get_item(queue_id, start_queue_item_id)
602 if not start_queue_item:
603 raise web.HTTPNotFound(reason=f"Unknown Queue item: {start_queue_item_id}")
604
605 queue.flow_mode_stream_log = []
606
607 # select the highest possible PCM settings for this player
608 flow_pcm_format = await self._select_flow_format(queue_player)
609
610 # work out output format/details
611 output_format = await self.get_output_format(
612 output_format_str=request.match_info["fmt"],
613 player=queue_player,
614 content_sample_rate=flow_pcm_format.sample_rate,
615 content_bit_depth=flow_pcm_format.bit_depth,
616 )
617 # work out ICY metadata support
618 icy_preference = self.mass.config.get_raw_player_config_value(
619 queue_id,
620 CONF_ENTRY_ENABLE_ICY_METADATA.key,
621 CONF_ENTRY_ENABLE_ICY_METADATA.default_value,
622 )
623 enable_icy = request.headers.get("Icy-MetaData", "") == "1" and icy_preference != "disabled"
624 icy_meta_interval = 256000 if icy_preference == "full" else 16384
625
626 # prepare request, add some DLNA/UPNP compatible headers
627 headers = {
628 **DEFAULT_STREAM_HEADERS,
629 **ICY_HEADERS,
630 "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01700000000000000000000000000000", # noqa: E501
631 "Accept-Ranges": "none",
632 "Content-Type": f"audio/{output_format.output_format_str}",
633 }
634 if enable_icy:
635 headers["icy-metaint"] = str(icy_meta_interval)
636
637 resp = web.StreamResponse(
638 status=200,
639 reason="OK",
640 headers=headers,
641 )
642 http_profile = await self.mass.config.get_player_config_value(
643 queue_id, CONF_HTTP_PROFILE, default="default", return_type=str
644 )
645 if http_profile == "forced_content_length":
646 # just set an insane high content length to make sure the player keeps playing
647 resp.content_length = get_chunksize(output_format, 12 * 3600)
648 elif http_profile == "chunked":
649 resp.enable_chunked_encoding()
650
651 await resp.prepare(request)
652
653 # return early if this is not a GET request
654 if request.method != "GET":
655 return resp
656
657 # all checks passed, start streaming!
658 # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into
659 # the desired output format for the player including any player specific filter params
660 # such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats
661 self.logger.debug("Start serving Queue flow audio stream for %s", queue.display_name)
662
663 async for chunk in get_ffmpeg_stream(
664 audio_input=self.get_queue_flow_stream(
665 queue=queue,
666 start_queue_item=start_queue_item,
667 pcm_format=flow_pcm_format,
668 ),
669 input_format=flow_pcm_format,
670 output_format=output_format,
671 filter_params=get_player_filter_params(
672 self.mass, queue_player.player_id, flow_pcm_format, output_format
673 ),
674 # we need to slowly feed the music to avoid the player stopping and later
675 # restarting (or completely failing) the audio stream by keeping the buffer short.
676 # this is reported to be an issue especially with Chromecast players.
677 # see for example: https://github.com/music-assistant/support/issues/3717
678 # allow buffer ahead of 6 seconds and read rest in realtime
679 extra_input_args=["-readrate", "1.0", "-readrate_initial_burst", "6"],
680 chunk_size=icy_meta_interval if enable_icy else get_chunksize(output_format),
681 ):
682 try:
683 await resp.write(chunk)
684 except (BrokenPipeError, ConnectionResetError, ConnectionError):
685 # race condition
686 break
687
688 if not enable_icy:
689 continue
690
691 # if icy metadata is enabled, send the icy metadata after the chunk
692 if (
693 # use current item here and not buffered item, otherwise
694 # the icy metadata will be too much ahead
695 (current_item := queue.current_item)
696 and current_item.streamdetails
697 and current_item.streamdetails.stream_title
698 ):
699 title = current_item.streamdetails.stream_title
700 elif queue and current_item and current_item.name:
701 title = current_item.name
702 else:
703 title = "Music Assistant"
704 metadata = f"StreamTitle='{title}';".encode()
705 if icy_preference == "full" and current_item and current_item.image:
706 metadata += f"StreamURL='{current_item.image.path}'".encode()
707 while len(metadata) % 16 != 0:
708 metadata += b"\x00"
709 length = len(metadata)
710 length_b = chr(int(length / 16)).encode()
711 await resp.write(length_b + metadata)
712
713 return resp
714
715 async def serve_command_request(self, request: web.Request) -> web.FileResponse:
716 """Handle special 'command' request for a player."""
717 self._log_request(request)
718 queue_id = request.match_info["queue_id"]
719 command = request.match_info["command"]
720 if command == "next":
721 self.mass.create_task(self.mass.player_queues.next(queue_id))
722 return web.FileResponse(SILENCE_FILE, headers={"icy-name": "Music Assistant"})
723
724 async def serve_announcement_stream(self, request: web.Request) -> web.StreamResponse:
725 """Stream announcement audio to a player."""
726 self._log_request(request)
727 player_id = request.match_info["player_id"]
728 player = self.mass.player_queues.get(player_id)
729 if not player:
730 raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
731 if not (announce_data := self.announcements.get(player_id)):
732 raise web.HTTPNotFound(reason=f"No pending announcements for Player: {player_id}")
733
734 # work out output format/details
735 fmt = request.match_info["fmt"]
736 audio_format = AudioFormat(content_type=ContentType.try_parse(fmt))
737
738 http_profile = await self.mass.config.get_player_config_value(
739 player_id, CONF_HTTP_PROFILE, default="default", return_type=str
740 )
741 if http_profile == "forced_content_length":
742 # given the fact that an announcement is just a short audio clip,
743 # just send it over completely at once so we have a fixed content length
744 data = b""
745 async for chunk in self.get_announcement_stream(
746 announcement_url=announce_data["announcement_url"],
747 output_format=audio_format,
748 pre_announce=announce_data["pre_announce"],
749 pre_announce_url=announce_data["pre_announce_url"],
750 ):
751 data += chunk
752 return web.Response(
753 body=data,
754 content_type=f"audio/{audio_format.output_format_str}",
755 headers=DEFAULT_STREAM_HEADERS,
756 )
757
758 resp = web.StreamResponse(
759 status=200,
760 reason="OK",
761 headers=DEFAULT_STREAM_HEADERS,
762 )
763 resp.content_type = f"audio/{audio_format.output_format_str}"
764 if http_profile == "chunked":
765 resp.enable_chunked_encoding()
766
767 await resp.prepare(request)
768
769 # return early if this is not a GET request
770 if request.method != "GET":
771 return resp
772
773 # all checks passed, start streaming!
774 self.logger.debug(
775 "Start serving audio stream for Announcement %s to %s",
776 announce_data["announcement_url"],
777 player.display_name,
778 )
779 async for chunk in self.get_announcement_stream(
780 announcement_url=announce_data["announcement_url"],
781 output_format=audio_format,
782 pre_announce=announce_data["pre_announce"],
783 pre_announce_url=announce_data["pre_announce_url"],
784 ):
785 try:
786 await resp.write(chunk)
787 except (BrokenPipeError, ConnectionResetError):
788 break
789
790 self.logger.debug(
791 "Finished serving audio stream for Announcement %s to %s",
792 announce_data["announcement_url"],
793 player.display_name,
794 )
795
796 return resp
797
798 async def serve_plugin_source_stream(self, request: web.Request) -> web.StreamResponse:
799 """Stream PluginSource audio to a player."""
800 self._log_request(request)
801 plugin_source_id = request.match_info["plugin_source"]
802 provider = cast("PluginProvider", self.mass.get_provider(plugin_source_id))
803 if not provider:
804 raise ProviderUnavailableError(f"Unknown PluginSource: {plugin_source_id}")
805 # work out output format/details
806 player_id = request.match_info["player_id"]
807 player = self.mass.players.get(player_id)
808 if not player:
809 raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
810 plugin_source = provider.get_source()
811 output_format = await self.get_output_format(
812 output_format_str=request.match_info["fmt"],
813 player=player,
814 content_sample_rate=plugin_source.audio_format.sample_rate,
815 content_bit_depth=plugin_source.audio_format.bit_depth,
816 )
817 headers = {
818 **DEFAULT_STREAM_HEADERS,
819 "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01700000000000000000000000000000", # noqa: E501
820 "icy-name": plugin_source.name,
821 "Accept-Ranges": "none",
822 "Content-Type": f"audio/{output_format.output_format_str}",
823 }
824
825 resp = web.StreamResponse(
826 status=200,
827 reason="OK",
828 headers=headers,
829 )
830 resp.content_type = f"audio/{output_format.output_format_str}"
831 http_profile = await self.mass.config.get_player_config_value(
832 player_id, CONF_HTTP_PROFILE, default="default", return_type=str
833 )
834 if http_profile == "forced_content_length":
835 # just set an insanely high content length to make sure the player keeps playing
836 resp.content_length = get_chunksize(output_format, 12 * 3600)
837 elif http_profile == "chunked":
838 resp.enable_chunked_encoding()
839
840 await resp.prepare(request)
841
842 # return early if this is not a GET request
843 if request.method != "GET":
844 return resp
845
846 # all checks passed, start streaming!
847 if not plugin_source.audio_format:
848 raise InvalidDataError(f"No audio format for plugin source {plugin_source_id}")
849 async for chunk in self.get_plugin_source_stream(
850 plugin_source_id=plugin_source_id,
851 output_format=output_format,
852 player_id=player_id,
853 player_filter_params=get_player_filter_params(
854 self.mass, player_id, plugin_source.audio_format, output_format
855 ),
856 ):
857 try:
858 await resp.write(chunk)
859 except (BrokenPipeError, ConnectionResetError, ConnectionError):
860 break
861 return resp
862
863 def get_command_url(self, player_or_queue_id: str, command: str) -> str:
864 """Get the url for the special command stream."""
865 return f"{self.base_url}/command/{player_or_queue_id}/{command}.mp3"
866
867 def get_announcement_url(
868 self,
869 player_id: str,
870 announce_data: AnnounceData,
871 content_type: ContentType = ContentType.MP3,
872 ) -> str:
873 """Get the url for the special announcement stream."""
874 self.announcements[player_id] = announce_data
875 # use stream server to host announcement on local network
876 # this ensures playback on all players, including ones that do not
877 # like https hosts and it also offers the pre-announce 'bell'
878 return f"{self.base_url}/announcement/{player_id}.{content_type.value}"
879
880 def get_stream(
881 self, media: PlayerMedia, pcm_format: AudioFormat, force_flow_mode: bool = False
882 ) -> AsyncGenerator[bytes, None]:
883 """
884 Get a stream of the given media as raw PCM audio.
885
886 This is used as helper for player providers that can consume the raw PCM
887 audio stream directly (e.g. AirPlay) and not rely on HTTP transport.
888 """
889 # select audio source
890 if media.media_type == MediaType.ANNOUNCEMENT:
891 # special case: stream announcement
892 assert media.custom_data
893 audio_source = self.get_announcement_stream(
894 media.custom_data["announcement_url"],
895 output_format=pcm_format,
896 pre_announce=media.custom_data["pre_announce"],
897 pre_announce_url=media.custom_data["pre_announce_url"],
898 )
899 elif media.media_type == MediaType.PLUGIN_SOURCE:
900 # special case: plugin source stream
901 assert media.custom_data
902 audio_source = self.get_plugin_source_stream(
903 plugin_source_id=media.custom_data["source_id"],
904 output_format=pcm_format,
905 # need to pass player_id from the PlayerMedia object
906 # because this could have been a group
907 player_id=media.custom_data["player_id"],
908 )
909 elif (
910 media.media_type == MediaType.FLOW_STREAM
911 and media.source_id
912 and media.source_id.startswith(UGP_PREFIX)
913 and media.uri
914 and "/ugp/" in media.uri
915 ):
916 # special case: member player accessing UGP stream
917 # Check URI to distinguish from the UGP accessing its own stream
918 ugp_player = cast("UniversalGroupPlayer", self.mass.players.get(media.source_id))
919 ugp_stream = ugp_player.stream
920 assert ugp_stream is not None # for type checker
921 if ugp_stream.base_pcm_format == pcm_format:
922 # no conversion needed
923 audio_source = ugp_stream.subscribe_raw()
924 else:
925 audio_source = ugp_stream.get_stream(output_format=pcm_format)
926 elif (
927 media.source_id
928 and media.queue_item_id
929 and (media.media_type == MediaType.FLOW_STREAM or force_flow_mode)
930 ):
931 # regular queue (flow) stream request
932 queue = self.mass.player_queues.get(media.source_id)
933 assert queue
934 start_queue_item = self.mass.player_queues.get_item(
935 media.source_id, media.queue_item_id
936 )
937 assert start_queue_item
938 audio_source = self.mass.streams.get_queue_flow_stream(
939 queue=queue,
940 start_queue_item=start_queue_item,
941 pcm_format=pcm_format,
942 )
943 elif media.source_id and media.queue_item_id:
944 # single item stream (e.g. radio)
945 queue_item = self.mass.player_queues.get_item(media.source_id, media.queue_item_id)
946 assert queue_item
947 audio_source = buffered(
948 self.get_queue_item_stream(
949 queue_item=queue_item,
950 pcm_format=pcm_format,
951 ),
952 buffer_size=10,
953 min_buffer_before_yield=2,
954 )
955 else:
956 # assume url or some other direct path
957 # NOTE: this will fail if its an uri not playable by ffmpeg
958 audio_source = get_ffmpeg_stream(
959 audio_input=media.uri,
960 input_format=AudioFormat(content_type=ContentType.try_parse(media.uri)),
961 output_format=pcm_format,
962 )
963 return audio_source
964
965 @use_buffer(buffer_size=30, min_buffer_before_yield=2)
966 async def get_queue_flow_stream(
967 self,
968 queue: PlayerQueue,
969 start_queue_item: QueueItem,
970 pcm_format: AudioFormat,
971 ) -> AsyncGenerator[bytes, None]:
972 """
973 Get a flow stream of all tracks in the queue as raw PCM audio.
974
975 yields chunks of exactly 1 second of audio in the given pcm_format.
976 """
977 # ruff: noqa: PLR0915
978 assert pcm_format.content_type.is_pcm()
979 queue_track = None
980 last_fadeout_part: bytes = b""
981 last_streamdetails: StreamDetails | None = None
982 last_play_log_entry: PlayLogEntry | None = None
983 queue.flow_mode = True
984 if not start_queue_item:
985 # this can happen in some (edge case) race conditions
986 return
987 pcm_sample_size = pcm_format.pcm_sample_size
988 if start_queue_item.media_type != MediaType.TRACK:
989 # no crossfade on non-tracks
990 smart_fades_mode = SmartFadesMode.DISABLED
991 standard_crossfade_duration = 0
992 else:
993 smart_fades_mode = await self.mass.config.get_player_config_value(
994 queue.queue_id, CONF_SMART_FADES_MODE, return_type=SmartFadesMode
995 )
996 standard_crossfade_duration = self.mass.config.get_raw_player_config_value(
997 queue.queue_id, CONF_CROSSFADE_DURATION, 10
998 )
999 self.logger.info(
1000 "Start Queue Flow stream for Queue %s - crossfade: %s %s",
1001 queue.display_name,
1002 smart_fades_mode,
1003 f"({standard_crossfade_duration}s)"
1004 if smart_fades_mode == SmartFadesMode.STANDARD_CROSSFADE
1005 else "",
1006 )
1007 total_bytes_sent = 0
1008 total_chunks_received = 0
1009
1010 while True:
1011 # get (next) queue item to stream
1012 if queue_track is None:
1013 queue_track = start_queue_item
1014 else:
1015 try:
1016 queue_track = await self.mass.player_queues.load_next_queue_item(
1017 queue.queue_id, queue_track.queue_item_id
1018 )
1019 except QueueEmpty:
1020 break
1021
1022 if queue_track.streamdetails is None:
1023 raise InvalidDataError(
1024 "No Streamdetails known for queue item %s",
1025 queue_track.queue_item_id,
1026 )
1027
1028 self.logger.debug(
1029 "Start Streaming queue track: %s (%s) for queue %s",
1030 queue_track.streamdetails.uri,
1031 queue_track.name,
1032 queue.display_name,
1033 )
1034 # append to play log so the queue controller can work out which track is playing
1035 play_log_entry = PlayLogEntry(queue_track.queue_item_id)
1036 queue.flow_mode_stream_log.append(play_log_entry)
1037 # calculate crossfade buffer size
1038 crossfade_buffer_duration = (
1039 SMART_CROSSFADE_DURATION
1040 if smart_fades_mode == SmartFadesMode.SMART_CROSSFADE
1041 else standard_crossfade_duration
1042 )
1043 crossfade_buffer_duration = min(
1044 crossfade_buffer_duration,
1045 int(queue_track.streamdetails.duration / 2)
1046 if queue_track.streamdetails.duration
1047 else crossfade_buffer_duration,
1048 )
1049 # Ensure crossfade buffer size is aligned to frame boundaries
1050 # Frame size = bytes_per_sample * channels
1051 bytes_per_sample = pcm_format.bit_depth // 8
1052 frame_size = bytes_per_sample * pcm_format.channels
1053 crossfade_buffer_size = int(pcm_format.pcm_sample_size * crossfade_buffer_duration)
1054 # Round down to nearest frame boundary
1055 crossfade_buffer_size = (crossfade_buffer_size // frame_size) * frame_size
1056
1057 bytes_written = 0
1058 buffer = b""
1059 # handle incoming audio chunks
1060 first_chunk_received = False
1061 # buffer size needs to be big enough to include the crossfade part
1062
1063 async for chunk in self.get_queue_item_stream(
1064 queue_track,
1065 pcm_format=pcm_format,
1066 seek_position=queue_track.streamdetails.seek_position,
1067 raise_on_error=False,
1068 ):
1069 total_chunks_received += 1
1070 if not first_chunk_received:
1071 first_chunk_received = True
1072 # inform the queue that the track is now loaded in the buffer
1073 # so the next track can be preloaded
1074 self.mass.player_queues.track_loaded_in_buffer(
1075 queue.queue_id, queue_track.queue_item_id
1076 )
1077 if total_chunks_received < 10 and smart_fades_mode != SmartFadesMode.DISABLED:
1078 # we want a stream to start as quickly as possible
1079 # so for the first 10 chunks we keep a very short buffer
1080 req_buffer_size = pcm_format.pcm_sample_size
1081 else:
1082 req_buffer_size = (
1083 pcm_sample_size
1084 if smart_fades_mode == SmartFadesMode.DISABLED
1085 else crossfade_buffer_size
1086 )
1087
1088 # ALWAYS APPEND CHUNK TO BUFFER
1089 buffer += chunk
1090 del chunk
1091 if len(buffer) < req_buffer_size:
1092 # buffer is not full enough, move on
1093 # yield control to event loop with 10ms delay
1094 await asyncio.sleep(0.01)
1095 continue
1096
1097 #### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
1098 if last_fadeout_part and last_streamdetails:
1099 # perform crossfade
1100 fadein_part = buffer[:crossfade_buffer_size]
1101 remaining_bytes = buffer[crossfade_buffer_size:]
1102 # Use the mixer to handle all crossfade logic
1103 crossfade_part = await self._smart_fades_mixer.mix(
1104 fade_in_part=fadein_part,
1105 fade_out_part=last_fadeout_part,
1106 fade_in_streamdetails=queue_track.streamdetails,
1107 fade_out_streamdetails=last_streamdetails,
1108 pcm_format=pcm_format,
1109 standard_crossfade_duration=standard_crossfade_duration,
1110 mode=smart_fades_mode,
1111 )
1112 # because the crossfade exists of both the fadein and fadeout part
1113 # we need to correct the bytes_written accordingly so the duration
1114 # calculations at the end of the track are correct
1115 crossfade_part_len = len(crossfade_part)
1116 bytes_written += int(crossfade_part_len / 2)
1117 if last_play_log_entry:
1118 assert last_play_log_entry.seconds_streamed is not None
1119 last_play_log_entry.seconds_streamed += (
1120 crossfade_part_len / 2 / pcm_sample_size
1121 )
1122 # yield crossfade_part (in pcm_sample_size chunks)
1123 for _chunk in divide_chunks(crossfade_part, pcm_sample_size):
1124 yield _chunk
1125 del _chunk
1126 del crossfade_part
1127 # also write the leftover bytes from the crossfade action
1128 if remaining_bytes:
1129 yield remaining_bytes
1130 bytes_written += len(remaining_bytes)
1131 del remaining_bytes
1132 # clear vars
1133 last_fadeout_part = b""
1134 last_streamdetails = None
1135 buffer = b""
1136
1137 #### OTHER: enough data in buffer, feed to output
1138 while len(buffer) > req_buffer_size:
1139 yield buffer[:pcm_sample_size]
1140 bytes_written += pcm_sample_size
1141 buffer = buffer[pcm_sample_size:]
1142
1143 #### HANDLE END OF TRACK
1144 if last_fadeout_part:
1145 # edge case: we did not get enough data to make the crossfade
1146 for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
1147 yield _chunk
1148 del _chunk
1149 bytes_written += len(last_fadeout_part)
1150 last_fadeout_part = b""
1151 if self._crossfade_allowed(
1152 queue_track, smart_fades_mode=smart_fades_mode, flow_mode=True
1153 ):
1154 # if crossfade is enabled, save fadeout part to pickup for next track
1155 last_fadeout_part = buffer[-crossfade_buffer_size:]
1156 last_streamdetails = queue_track.streamdetails
1157 last_play_log_entry = play_log_entry
1158 remaining_bytes = buffer[:-crossfade_buffer_size]
1159 if remaining_bytes:
1160 yield remaining_bytes
1161 bytes_written += len(remaining_bytes)
1162 del remaining_bytes
1163 elif buffer:
1164 # no crossfade enabled, just yield the buffer last part
1165 bytes_written += len(buffer)
1166 for _chunk in divide_chunks(buffer, pcm_sample_size):
1167 yield _chunk
1168 del _chunk
1169 # make sure the buffer gets cleaned up
1170 del buffer
1171
1172 # update duration details based on the actual pcm data we sent
1173 # this also accounts for crossfade and silence stripping
1174 seconds_streamed = bytes_written / pcm_sample_size
1175 queue_track.streamdetails.seconds_streamed = seconds_streamed
1176 queue_track.streamdetails.duration = int(
1177 queue_track.streamdetails.seek_position + seconds_streamed
1178 )
1179 play_log_entry.seconds_streamed = seconds_streamed
1180 play_log_entry.duration = queue_track.streamdetails.duration
1181 total_bytes_sent += bytes_written
1182 self.logger.debug(
1183 "Finished Streaming queue track: %s (%s) on queue %s",
1184 queue_track.streamdetails.uri,
1185 queue_track.name,
1186 queue.display_name,
1187 )
1188 #### HANDLE END OF QUEUE FLOW STREAM
1189 # end of queue flow: make sure we yield the last_fadeout_part
1190 if last_fadeout_part:
1191 for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
1192 yield _chunk
1193 del _chunk
1194 # correct seconds streamed/duration
1195 last_part_seconds = len(last_fadeout_part) / pcm_sample_size
1196 streamdetails = queue_track.streamdetails
1197 assert streamdetails is not None
1198 streamdetails.seconds_streamed = (
1199 streamdetails.seconds_streamed or 0
1200 ) + last_part_seconds
1201 streamdetails.duration = int((streamdetails.duration or 0) + last_part_seconds)
1202 last_fadeout_part = b""
1203 total_bytes_sent += bytes_written
1204 self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name)
1205
1206 async def get_announcement_stream(
1207 self,
1208 announcement_url: str,
1209 output_format: AudioFormat,
1210 pre_announce: bool | str = False,
1211 pre_announce_url: str = ANNOUNCE_ALERT_FILE,
1212 ) -> AsyncGenerator[bytes, None]:
1213 """Get the special announcement stream."""
1214 announcement_data: asyncio.Queue[bytes | None] = asyncio.Queue(10)
1215 # we are doing announcement in PCM first to avoid multiple encodings
1216 # when mixing pre-announce and announcement
1217 # also we have to deal with some TTS sources being super slow in delivering audio
1218 # so we take an approach where we start fetching the announcement in the background
1219 # while we can already start playing the pre-announce sound (if any)
1220
1221 pcm_format = (
1222 output_format
1223 if output_format.content_type.is_pcm()
1224 else AudioFormat(
1225 sample_rate=output_format.sample_rate,
1226 content_type=ContentType.PCM_S16LE,
1227 bit_depth=16,
1228 channels=output_format.channels,
1229 )
1230 )
1231
1232 async def fetch_announcement() -> None:
1233 fmt = announcement_url.rsplit(".")[-1]
1234 async for chunk in get_ffmpeg_stream(
1235 audio_input=announcement_url,
1236 input_format=AudioFormat(content_type=ContentType.try_parse(fmt)),
1237 output_format=pcm_format,
1238 chunk_size=get_chunksize(pcm_format, 1),
1239 ):
1240 await announcement_data.put(chunk)
1241 await announcement_data.put(None) # signal end of stream
1242
1243 self.mass.create_task(fetch_announcement())
1244
1245 async def _announcement_stream() -> AsyncGenerator[bytes, None]:
1246 """Generate the PCM audio stream for the announcement + optional pre-announce."""
1247 if pre_announce:
1248 async for chunk in get_ffmpeg_stream(
1249 audio_input=pre_announce_url,
1250 input_format=AudioFormat(content_type=ContentType.try_parse(pre_announce_url)),
1251 output_format=pcm_format,
1252 chunk_size=get_chunksize(pcm_format, 1),
1253 ):
1254 yield chunk
1255 # pad silence while we're waiting for the announcement to be ready
1256 while announcement_data.empty():
1257 yield b"\0" * int(
1258 pcm_format.sample_rate * (pcm_format.bit_depth / 8) * pcm_format.channels * 0.1
1259 )
1260 await asyncio.sleep(0.1)
1261 # stream announcement
1262 while True:
1263 announcement_chunk = await announcement_data.get()
1264 if announcement_chunk is None:
1265 break
1266 yield announcement_chunk
1267
1268 if output_format == pcm_format:
1269 # no need to re-encode, just yield the raw PCM stream
1270 async for chunk in _announcement_stream():
1271 yield chunk
1272 return
1273
1274 # stream final announcement in requested output format
1275 async for chunk in get_ffmpeg_stream(
1276 audio_input=_announcement_stream(),
1277 input_format=pcm_format,
1278 output_format=output_format,
1279 ):
1280 yield chunk
1281
1282 async def get_plugin_source_stream(
1283 self,
1284 plugin_source_id: str,
1285 output_format: AudioFormat,
1286 player_id: str,
1287 player_filter_params: list[str] | None = None,
1288 ) -> AsyncGenerator[bytes, None]:
1289 """Get the special plugin source stream."""
1290 plugin_prov = cast("PluginProvider", self.mass.get_provider(plugin_source_id))
1291 if not plugin_prov:
1292 raise ProviderUnavailableError(f"Unknown PluginSource: {plugin_source_id}")
1293
1294 plugin_source = plugin_prov.get_source()
1295 self.logger.debug(
1296 "Start streaming PluginSource %s to %s using output format %s",
1297 plugin_source_id,
1298 player_id,
1299 output_format,
1300 )
1301 # this should already be set by the player controller, but just to be sure
1302 plugin_source.in_use_by = player_id
1303
1304 try:
1305 async for chunk in get_ffmpeg_stream(
1306 audio_input=cast(
1307 "str | AsyncGenerator[bytes, None]",
1308 plugin_prov.get_audio_stream(player_id)
1309 if plugin_source.stream_type == StreamType.CUSTOM
1310 else plugin_source.path,
1311 ),
1312 input_format=plugin_source.audio_format,
1313 output_format=output_format,
1314 filter_params=player_filter_params,
1315 extra_input_args=["-y", "-re"],
1316 ):
1317 if plugin_source.in_use_by != player_id:
1318 # another player took over or the stream ended, stop streaming
1319 break
1320 yield chunk
1321 finally:
1322 self.logger.debug(
1323 "Finished streaming PluginSource %s to %s", plugin_source_id, player_id
1324 )
1325 await asyncio.sleep(1) # prevent race conditions when selecting source
1326 if plugin_source.in_use_by == player_id:
1327 # release control
1328 plugin_source.in_use_by = None
1329
1330 async def get_queue_item_stream(
1331 self,
1332 queue_item: QueueItem,
1333 pcm_format: AudioFormat,
1334 seek_position: int = 0,
1335 raise_on_error: bool = True,
1336 ) -> AsyncGenerator[bytes, None]:
1337 """Get the (PCM) audio stream for a single queue item."""
1338 # collect all arguments for ffmpeg
1339 streamdetails = queue_item.streamdetails
1340 assert streamdetails
1341 filter_params: list[str] = []
1342
1343 # handle volume normalization
1344 gain_correct: float | None = None
1345 if streamdetails.volume_normalization_mode == VolumeNormalizationMode.DYNAMIC:
1346 # volume normalization using loudnorm filter (in dynamic mode)
1347 # which also collects the measurement on the fly during playback
1348 # more info: https://k.ylo.ph/2016/04/04/loudnorm.html
1349 filter_rule = f"loudnorm=I={streamdetails.target_loudness}:TP=-2.0:LRA=10.0:offset=0.0"
1350 filter_rule += ":print_format=json"
1351 filter_params.append(filter_rule)
1352 elif streamdetails.volume_normalization_mode == VolumeNormalizationMode.FIXED_GAIN:
1353 # apply user defined fixed volume/gain correction
1354 config_key = (
1355 CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS
1356 if streamdetails.media_type == MediaType.TRACK
1357 else CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO
1358 )
1359 gain_value = await self.mass.config.get_core_config_value(
1360 self.domain, config_key, default=0.0, return_type=float
1361 )
1362 gain_correct = round(gain_value, 2)
1363 filter_params.append(f"volume={gain_correct}dB")
1364 elif streamdetails.volume_normalization_mode == VolumeNormalizationMode.MEASUREMENT_ONLY:
1365 # volume normalization with known loudness measurement
1366 # apply volume/gain correction
1367 target_loudness = (
1368 float(streamdetails.target_loudness)
1369 if streamdetails.target_loudness is not None
1370 else 0.0
1371 )
1372 if streamdetails.prefer_album_loudness and streamdetails.loudness_album is not None:
1373 gain_correct = target_loudness - float(streamdetails.loudness_album)
1374 elif streamdetails.loudness is not None:
1375 gain_correct = target_loudness - float(streamdetails.loudness)
1376 else:
1377 gain_correct = 0.0
1378 gain_correct = round(gain_correct, 2)
1379 filter_params.append(f"volume={gain_correct}dB")
1380 streamdetails.volume_normalization_gain_correct = gain_correct
1381
1382 allow_buffer = bool(
1383 self.mass.config.get_raw_core_config_value(
1384 self.domain, CONF_ALLOW_BUFFER, CONF_ALLOW_BUFFER_DEFAULT
1385 )
1386 and streamdetails.duration
1387 )
1388
1389 self.logger.debug(
1390 "Starting queue item stream for %s (%s)"
1391 " - using buffer: %s"
1392 " - using fade-in: %s"
1393 " - using volume normalization: %s",
1394 queue_item.name,
1395 streamdetails.uri,
1396 allow_buffer,
1397 streamdetails.fade_in,
1398 streamdetails.volume_normalization_mode,
1399 )
1400 if allow_buffer:
1401 media_stream_gen = get_buffered_media_stream(
1402 self.mass,
1403 streamdetails=streamdetails,
1404 pcm_format=pcm_format,
1405 seek_position=int(seek_position),
1406 filter_params=filter_params,
1407 )
1408 else:
1409 media_stream_gen = get_media_stream(
1410 self.mass,
1411 streamdetails=streamdetails,
1412 pcm_format=pcm_format,
1413 seek_position=int(seek_position),
1414 filter_params=filter_params,
1415 )
1416
1417 first_chunk_received = False
1418 fade_in_buffer = b""
1419 bytes_received = 0
1420 finished = False
1421 stream_started_at = asyncio.get_event_loop().time()
1422 try:
1423 async for chunk in media_stream_gen:
1424 bytes_received += len(chunk)
1425 if not first_chunk_received:
1426 first_chunk_received = True
1427 self.logger.debug(
1428 "First audio chunk received for %s (%s) after %.2f seconds",
1429 queue_item.name,
1430 streamdetails.uri,
1431 asyncio.get_event_loop().time() - stream_started_at,
1432 )
1433 # handle optional fade-in
1434 if streamdetails.fade_in:
1435 if len(fade_in_buffer) < pcm_format.pcm_sample_size * 4:
1436 fade_in_buffer += chunk
1437 elif fade_in_buffer:
1438 async for fade_chunk in get_ffmpeg_stream(
1439 # NOTE: get_ffmpeg_stream signature says str | AsyncGenerator
1440 # but FFMpeg class actually accepts bytes too. This works at
1441 # runtime but needs type: ignore for mypy.
1442 audio_input=fade_in_buffer + chunk, # type: ignore[arg-type]
1443 input_format=pcm_format,
1444 output_format=pcm_format,
1445 filter_params=["afade=type=in:start_time=0:duration=3"],
1446 ):
1447 yield fade_chunk
1448 fade_in_buffer = b""
1449 streamdetails.fade_in = False
1450 else:
1451 yield chunk
1452 # help garbage collection by explicitly deleting chunk
1453 del chunk
1454 finished = True
1455 except AudioError as err:
1456 streamdetails.stream_error = True
1457 queue_item.available = False
1458 if raise_on_error:
1459 raise
1460 # yes, we swallow the error here after logging it
1461 # so the outer stream can handle it gracefully
1462 self.logger.error(
1463 "AudioError while streaming queue item %s (%s): %s",
1464 queue_item.name,
1465 streamdetails.uri,
1466 err,
1467 )
1468 finally:
1469 # determine how many seconds we've streamed
1470 # for pcm output we can calculate this easily
1471 seconds_streamed = bytes_received / pcm_format.pcm_sample_size
1472 streamdetails.seconds_streamed = seconds_streamed
1473 self.logger.debug(
1474 "stream %s for %s in %.2f seconds - seconds streamed/buffered: %.2f",
1475 "aborted" if not finished else "finished",
1476 streamdetails.uri,
1477 asyncio.get_event_loop().time() - stream_started_at,
1478 seconds_streamed,
1479 )
1480 # report stream to provider
1481 if (finished or seconds_streamed >= 90) and (
1482 music_prov := self.mass.get_provider(streamdetails.provider)
1483 ):
1484 if TYPE_CHECKING: # avoid circular import
1485 assert isinstance(music_prov, MusicProvider)
1486 self.mass.create_task(music_prov.on_streamed(streamdetails))
1487
1488 @use_buffer(buffer_size=30, min_buffer_before_yield=2)
1489 async def get_queue_item_stream_with_smartfade(
1490 self,
1491 queue_item: QueueItem,
1492 pcm_format: AudioFormat,
1493 smart_fades_mode: SmartFadesMode = SmartFadesMode.SMART_CROSSFADE,
1494 standard_crossfade_duration: int = 10,
1495 ) -> AsyncGenerator[bytes, None]:
1496 """Get the audio stream for a single queue item with (smart) crossfade to the next item."""
1497 queue = self.mass.player_queues.get(queue_item.queue_id)
1498 if not queue:
1499 raise RuntimeError(f"Queue {queue_item.queue_id} not found")
1500
1501 streamdetails = queue_item.streamdetails
1502 assert streamdetails
1503 crossfade_data = self._crossfade_data.pop(queue.queue_id, None)
1504
1505 if crossfade_data and streamdetails.seek_position > 0:
1506 # don't do crossfade when seeking into track
1507 crossfade_data = None
1508 if crossfade_data and (crossfade_data.queue_item_id != queue_item.queue_item_id):
1509 # edge case alert: the next item changed just while we were preloading/crossfading
1510 self.logger.warning(
1511 "Skipping crossfade data for queue %s - next item changed!", queue.display_name
1512 )
1513 crossfade_data = None
1514
1515 self.logger.debug(
1516 "Start Streaming queue track: %s (%s) for queue %s "
1517 "- crossfade mode: %s "
1518 "- crossfading from previous track: %s ",
1519 queue_item.streamdetails.uri if queue_item.streamdetails else "Unknown URI",
1520 queue_item.name,
1521 queue.display_name,
1522 smart_fades_mode,
1523 "true" if crossfade_data else "false",
1524 )
1525
1526 buffer = b""
1527 bytes_written = 0
1528 # calculate crossfade buffer size
1529 crossfade_buffer_duration = (
1530 SMART_CROSSFADE_DURATION
1531 if smart_fades_mode == SmartFadesMode.SMART_CROSSFADE
1532 else standard_crossfade_duration
1533 )
1534 crossfade_buffer_duration = min(
1535 crossfade_buffer_duration,
1536 int(streamdetails.duration / 2)
1537 if streamdetails.duration
1538 else crossfade_buffer_duration,
1539 )
1540 # Ensure crossfade buffer size is aligned to frame boundaries
1541 # Frame size = bytes_per_sample * channels
1542 bytes_per_sample = pcm_format.bit_depth // 8
1543 frame_size = bytes_per_sample * pcm_format.channels
1544 crossfade_buffer_size = int(pcm_format.pcm_sample_size * crossfade_buffer_duration)
1545 # Round down to nearest frame boundary
1546 crossfade_buffer_size = (crossfade_buffer_size // frame_size) * frame_size
1547 fade_out_data: bytes | None = None
1548
1549 if crossfade_data:
1550 # Calculate discard amount in seconds (format-independent)
1551 # Use fade_in_pcm_format because fade_in_size is in the next track's original format
1552 fade_in_duration_seconds = (
1553 crossfade_data.fade_in_size / crossfade_data.fade_in_pcm_format.pcm_sample_size
1554 )
1555 discard_seconds = int(fade_in_duration_seconds) - 1
1556 # Calculate discard amounts in CURRENT track's format
1557 discard_bytes = int(discard_seconds * pcm_format.pcm_sample_size)
1558 # Convert fade_in_size to current track's format for correct leftover calculation
1559 fade_in_size_in_current_format = int(
1560 fade_in_duration_seconds * pcm_format.pcm_sample_size
1561 )
1562 discard_leftover = fade_in_size_in_current_format - discard_bytes
1563 else:
1564 discard_seconds = streamdetails.seek_position
1565 discard_leftover = 0
1566 total_chunks_received = 0
1567 req_buffer_size = crossfade_buffer_size
1568 async for chunk in self.get_queue_item_stream(
1569 queue_item, pcm_format, seek_position=discard_seconds
1570 ):
1571 total_chunks_received += 1
1572 if discard_leftover:
1573 # discard leftover bytes from crossfade data
1574 chunk = chunk[discard_leftover:] # noqa: PLW2901
1575 discard_leftover = 0
1576
1577 if total_chunks_received < 10:
1578 # we want a stream to start as quickly as possible
1579 # so for the first 10 chunks we keep a very short buffer
1580 req_buffer_size = pcm_format.pcm_sample_size
1581 else:
1582 req_buffer_size = crossfade_buffer_size
1583
1584 # ALWAYS APPEND CHUNK TO BUFFER
1585 buffer += chunk
1586 del chunk
1587 if len(buffer) < req_buffer_size:
1588 # buffer is not full enough, move on
1589 continue
1590
1591 #### HANDLE CROSSFADE DATA FROM PREVIOUS TRACK
1592 if crossfade_data:
1593 # send the (second half of the) crossfade data
1594 if crossfade_data.pcm_format != pcm_format:
1595 # edge case: pcm format mismatch, we need to resample
1596 self.logger.debug(
1597 "Resampling crossfade data from %s to %s for queue %s",
1598 crossfade_data.pcm_format.sample_rate,
1599 pcm_format.sample_rate,
1600 queue.display_name,
1601 )
1602 resampled_data = await resample_pcm_audio(
1603 crossfade_data.data,
1604 crossfade_data.pcm_format,
1605 pcm_format,
1606 )
1607 if resampled_data:
1608 for _chunk in divide_chunks(resampled_data, pcm_format.pcm_sample_size):
1609 yield _chunk
1610 bytes_written += len(resampled_data)
1611 else:
1612 # Resampling failed, error already logged in resample_pcm_audio
1613 # Skip crossfade data entirely - stream continues without it
1614 self.logger.warning(
1615 "Skipping crossfade data for queue %s due to resampling failure",
1616 queue.display_name,
1617 )
1618 else:
1619 for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size):
1620 yield _chunk
1621 bytes_written += len(crossfade_data.data)
1622 # clear vars
1623 crossfade_data = None
1624
1625 #### OTHER: enough data in buffer, feed to output
1626 while len(buffer) > req_buffer_size:
1627 yield buffer[: pcm_format.pcm_sample_size]
1628 bytes_written += pcm_format.pcm_sample_size
1629 buffer = buffer[pcm_format.pcm_sample_size :]
1630
1631 #### HANDLE END OF TRACK
1632
1633 if crossfade_data:
1634 # edge case: we did not get enough data to send the crossfade data
1635 # send the (second half of the) crossfade data
1636 if crossfade_data.pcm_format != pcm_format:
1637 # (yet another) edge case: pcm format mismatch, we need to resample
1638 self.logger.debug(
1639 "Resampling remaining crossfade data from %s to %s for queue %s",
1640 crossfade_data.pcm_format.sample_rate,
1641 pcm_format.sample_rate,
1642 queue.display_name,
1643 )
1644 resampled_crossfade_data = await resample_pcm_audio(
1645 crossfade_data.data,
1646 crossfade_data.pcm_format,
1647 pcm_format,
1648 )
1649 if resampled_crossfade_data:
1650 crossfade_data.data = resampled_crossfade_data
1651 else:
1652 # Resampling failed, error already logged in resample_pcm_audio
1653 # Skip the crossfade data entirely
1654 self.logger.warning(
1655 "Skipping remaining crossfade data for queue %s due to resampling failure",
1656 queue.display_name,
1657 )
1658 crossfade_data = None
1659 if crossfade_data:
1660 for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size):
1661 yield _chunk
1662 bytes_written += len(crossfade_data.data)
1663 crossfade_data = None
1664
1665 # get next track for crossfade
1666 next_queue_item: QueueItem | None
1667 try:
1668 self.logger.debug(
1669 "Preloading NEXT track for crossfade for queue %s",
1670 queue.display_name,
1671 )
1672 next_queue_item = await self.mass.player_queues.load_next_queue_item(
1673 queue.queue_id, queue_item.queue_item_id
1674 )
1675 # set index_in_buffer to prevent our next track is overwritten while preloading
1676 if next_queue_item.streamdetails is None:
1677 raise InvalidDataError(
1678 f"No streamdetails for next queue item {next_queue_item.queue_item_id}"
1679 )
1680 queue.index_in_buffer = self.mass.player_queues.index_by_id(
1681 queue.queue_id, next_queue_item.queue_item_id
1682 )
1683 queue_player = self.mass.players.get(queue.queue_id)
1684 assert queue_player is not None
1685 next_queue_item_pcm_format = await self._select_pcm_format(
1686 player=queue_player,
1687 streamdetails=next_queue_item.streamdetails,
1688 smartfades_enabled=True,
1689 )
1690 except QueueEmpty:
1691 # end of queue reached, no next item
1692 next_queue_item = None
1693
1694 if not next_queue_item or not self._crossfade_allowed(
1695 queue_item,
1696 smart_fades_mode=smart_fades_mode,
1697 flow_mode=False,
1698 next_queue_item=next_queue_item,
1699 sample_rate=pcm_format.sample_rate,
1700 next_sample_rate=next_queue_item_pcm_format.sample_rate,
1701 ):
1702 # no crossfade enabled/allowed, just yield the buffer last part
1703 bytes_written += len(buffer)
1704 for _chunk in divide_chunks(buffer, pcm_format.pcm_sample_size):
1705 yield _chunk
1706 else:
1707 # if crossfade is enabled, save fadeout part in buffer to pickup for next track
1708 fade_out_data = buffer
1709 buffer = b""
1710 try:
1711 async for chunk in self.get_queue_item_stream(
1712 next_queue_item, next_queue_item_pcm_format
1713 ):
1714 # append to buffer until we reach crossfade size
1715 # we only need the first X seconds of the NEXT track so we can
1716 # perform the crossfade.
1717 # the crossfaded audio of the previous and next track will be
1718 # sent in two equal parts: first half now, second half
1719 # when the next track starts. We use CrossfadeData to store
1720 # the second half to be picked up by the next track's stream generator.
1721 # Note that we more or less expect the user to have enabled the in-memory
1722 # buffer so we can keep the next track's audio data in memory.
1723 buffer += chunk
1724 del chunk
1725 if len(buffer) >= crossfade_buffer_size:
1726 break
1727 #### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
1728 # Store original buffer size before any resampling for fade_in_size calculation
1729 # This size is in the next track's original format which is what we need
1730 original_buffer_size = len(buffer)
1731 if next_queue_item_pcm_format != pcm_format:
1732 # edge case: pcm format mismatch, we need to resample the next track's
1733 # beginning part before crossfading
1734 self.logger.debug(
1735 "Resampling next track's crossfade from %s to %s for queue %s",
1736 next_queue_item_pcm_format.sample_rate,
1737 pcm_format.sample_rate,
1738 queue.display_name,
1739 )
1740 buffer = await resample_pcm_audio(
1741 buffer,
1742 next_queue_item_pcm_format,
1743 pcm_format,
1744 )
1745 # perform actual (smart fades) crossfade using mixer
1746 crossfade_bytes = await self._smart_fades_mixer.mix(
1747 fade_in_part=buffer,
1748 fade_out_part=fade_out_data,
1749 fade_in_streamdetails=cast("StreamDetails", next_queue_item.streamdetails),
1750 fade_out_streamdetails=streamdetails,
1751 pcm_format=pcm_format,
1752 standard_crossfade_duration=standard_crossfade_duration,
1753 mode=smart_fades_mode,
1754 )
1755 # send half of the crossfade_part (= approx the fadeout part)
1756 split_point = (len(crossfade_bytes) + 1) // 2
1757 crossfade_first = crossfade_bytes[:split_point]
1758 crossfade_second = crossfade_bytes[split_point:]
1759 del crossfade_bytes
1760 bytes_written += len(crossfade_first)
1761 for _chunk in divide_chunks(crossfade_first, pcm_format.pcm_sample_size):
1762 yield _chunk
1763 # store the other half for the next track
1764 # IMPORTANT: crossfade_second data is in CURRENT track's format (pcm_format)
1765 # because it was created from the resampled buffer used for mixing.
1766 # BUT fade_in_size represents bytes in NEXT track's original format
1767 # (next_queue_item_pcm_format) because that's how much of the next track
1768 # was consumed during the crossfade. We need both formats to correctly
1769 # handle the crossfade data when the next track starts.
1770 self._crossfade_data[queue_item.queue_id] = CrossfadeData(
1771 data=crossfade_second,
1772 fade_in_size=original_buffer_size,
1773 pcm_format=pcm_format, # Format of the data (current track)
1774 fade_in_pcm_format=next_queue_item_pcm_format, # Format for fade_in_size
1775 queue_item_id=next_queue_item.queue_item_id,
1776 )
1777 except AudioError:
1778 # no crossfade possible, just yield the fade_out_data
1779 next_queue_item = None
1780 yield fade_out_data
1781 bytes_written += len(fade_out_data)
1782 del fade_out_data
1783 # make sure the buffer gets cleaned up
1784 del buffer
1785 # update duration details based on the actual pcm data we sent
1786 # this also accounts for crossfade and silence stripping
1787 seconds_streamed = bytes_written / pcm_format.pcm_sample_size
1788 streamdetails.seconds_streamed = seconds_streamed
1789 streamdetails.duration = int(streamdetails.seek_position + seconds_streamed)
1790 self.logger.debug(
1791 "Finished Streaming queue track: %s (%s) on queue %s "
1792 "- crossfade data prepared for next track: %s",
1793 streamdetails.uri,
1794 queue_item.name,
1795 queue.display_name,
1796 next_queue_item.name if next_queue_item else "N/A",
1797 )
1798
1799 def _log_request(self, request: web.Request) -> None:
1800 """Log request."""
1801 if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
1802 self.logger.log(
1803 VERBOSE_LOG_LEVEL,
1804 "Got %s request to %s from %s\nheaders: %s\n",
1805 request.method,
1806 request.path,
1807 request.remote,
1808 request.headers,
1809 )
1810 else:
1811 self.logger.debug(
1812 "Got %s request to %s from %s",
1813 request.method,
1814 request.path,
1815 request.remote,
1816 )
1817
1818 async def get_output_format(
1819 self,
1820 output_format_str: str,
1821 player: Player,
1822 content_sample_rate: int,
1823 content_bit_depth: int,
1824 ) -> AudioFormat:
1825 """Parse (player specific) output format details for given format string."""
1826 content_type: ContentType = ContentType.try_parse(output_format_str)
1827 supported_rates_conf = cast(
1828 "list[tuple[str, str]]",
1829 await self.mass.config.get_player_config_value(
1830 player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
1831 ),
1832 )
1833 output_channels_str = self.mass.config.get_raw_player_config_value(
1834 player.player_id, CONF_OUTPUT_CHANNELS, "stereo"
1835 )
1836 supported_sample_rates = tuple(int(x[0]) for x in supported_rates_conf)
1837 supported_bit_depths = tuple(int(x[1]) for x in supported_rates_conf)
1838
1839 player_max_bit_depth = max(supported_bit_depths)
1840 output_bit_depth = min(content_bit_depth, player_max_bit_depth)
1841 if content_sample_rate in supported_sample_rates:
1842 output_sample_rate = content_sample_rate
1843 else:
1844 output_sample_rate = max(supported_sample_rates)
1845
1846 if not content_type.is_lossless():
1847 # no point in having a higher bit depth for lossy formats
1848 output_bit_depth = 16
1849 output_sample_rate = min(48000, output_sample_rate)
1850 if output_format_str == "pcm":
1851 content_type = ContentType.from_bit_depth(output_bit_depth)
1852 return AudioFormat(
1853 content_type=content_type,
1854 sample_rate=output_sample_rate,
1855 bit_depth=output_bit_depth,
1856 channels=1 if output_channels_str != "stereo" else 2,
1857 )
1858
1859 async def _select_flow_format(
1860 self,
1861 player: Player,
1862 ) -> AudioFormat:
1863 """Parse (player specific) flow stream PCM format."""
1864 supported_rates_conf = cast(
1865 "list[tuple[str, str]]",
1866 await self.mass.config.get_player_config_value(
1867 player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
1868 ),
1869 )
1870 supported_sample_rates = tuple(int(x[0]) for x in supported_rates_conf)
1871 output_sample_rate = INTERNAL_PCM_FORMAT.sample_rate
1872 for sample_rate in (192000, 96000, 48000, 44100):
1873 if sample_rate in supported_sample_rates:
1874 output_sample_rate = sample_rate
1875 break
1876 return AudioFormat(
1877 content_type=INTERNAL_PCM_FORMAT.content_type,
1878 sample_rate=output_sample_rate,
1879 bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
1880 channels=2,
1881 )
1882
1883 async def _select_pcm_format(
1884 self,
1885 player: Player,
1886 streamdetails: StreamDetails,
1887 smartfades_enabled: bool,
1888 ) -> AudioFormat:
1889 """Parse (player specific) stream internal PCM format."""
1890 supported_rates_conf = cast(
1891 "list[tuple[str, str]]",
1892 await self.mass.config.get_player_config_value(
1893 player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
1894 ),
1895 )
1896 supported_sample_rates = tuple(int(x[0]) for x in supported_rates_conf)
1897 # use highest supported rate within content rate
1898 output_sample_rate = max(
1899 (r for r in supported_sample_rates if r <= streamdetails.audio_format.sample_rate),
1900 default=48000, # sane/safe default
1901 )
1902 # work out pcm format based on streamdetails
1903 pcm_format = AudioFormat(
1904 sample_rate=output_sample_rate,
1905 # always use f32 internally for extra headroom for filters etc
1906 content_type=INTERNAL_PCM_FORMAT.content_type,
1907 bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
1908 channels=streamdetails.audio_format.channels,
1909 )
1910 if smartfades_enabled:
1911 pcm_format.channels = 2 # force stereo for crossfading
1912
1913 return pcm_format
1914
1915 def _crossfade_allowed(
1916 self,
1917 queue_item: QueueItem,
1918 smart_fades_mode: SmartFadesMode,
1919 flow_mode: bool = False,
1920 next_queue_item: QueueItem | None = None,
1921 sample_rate: int | None = None,
1922 next_sample_rate: int | None = None,
1923 ) -> bool:
1924 """Get the crossfade config for a queue item."""
1925 if smart_fades_mode == SmartFadesMode.DISABLED:
1926 return False
1927 if not (self.mass.players.get(queue_item.queue_id)):
1928 return False # just a guard
1929 if queue_item.media_type != MediaType.TRACK:
1930 self.logger.debug("Skipping crossfade: current item is not a track")
1931 return False
1932 # check if the next item is part of the same album
1933 next_item = next_queue_item or self.mass.player_queues.get_next_item(
1934 queue_item.queue_id, queue_item.queue_item_id
1935 )
1936 if not next_item:
1937 # there is no next item!
1938 return False
1939 # check if next item is a track
1940 if next_item.media_type != MediaType.TRACK:
1941 self.logger.debug("Skipping crossfade: next item is not a track")
1942 return False
1943 if (
1944 isinstance(queue_item.media_item, Track)
1945 and isinstance(next_item.media_item, Track)
1946 and queue_item.media_item.album
1947 and next_item.media_item.album
1948 and queue_item.media_item.album == next_item.media_item.album
1949 and not self.mass.config.get_raw_core_config_value(
1950 self.domain, CONF_ALLOW_CROSSFADE_SAME_ALBUM, False
1951 )
1952 ):
1953 # in general, crossfade is not desired for tracks of the same (gapless) album
1954 # because we have no accurate way to determine if the album is gapless or not,
1955 # for now we just never crossfade between tracks of the same album
1956 self.logger.debug("Skipping crossfade: next item is part of the same album")
1957 return False
1958
1959 # check if we're allowed to crossfade on different sample rates
1960 if (
1961 not flow_mode
1962 and sample_rate
1963 and next_sample_rate
1964 and sample_rate != next_sample_rate
1965 and not self.mass.config.get_raw_player_config_value(
1966 queue_item.queue_id,
1967 CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES.key,
1968 CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES.default_value,
1969 )
1970 ):
1971 self.logger.debug(
1972 "Skipping crossfade: player does not support gapless playback "
1973 "with different sample rates (%s vs %s)",
1974 sample_rate,
1975 next_sample_rate,
1976 )
1977 return False
1978
1979 return True
1980
1981 async def _periodic_garbage_collection(self) -> None:
1982 """Periodic garbage collection to free up memory from audio buffers and streams."""
1983 self.logger.log(
1984 VERBOSE_LOG_LEVEL,
1985 "Running periodic garbage collection...",
1986 )
1987 # Run garbage collection in executor to avoid blocking the event loop
1988 # Since this runs periodically (not in response to subprocess cleanup),
1989 # it's safe to run in a thread without causing thread-safety issues
1990 loop = asyncio.get_running_loop()
1991 collected = await loop.run_in_executor(None, gc.collect)
1992 self.logger.log(
1993 VERBOSE_LOG_LEVEL,
1994 "Garbage collection completed, collected %d objects",
1995 collected,
1996 )
1997 # Schedule next run in 15 minutes
1998 self.mass.call_later(900, self._periodic_garbage_collection)
1999
2000 def _setup_smart_fades_logger(self, config: CoreConfig) -> None:
2001 """Set up smart fades logger level."""
2002 log_level = str(config.get_value(CONF_SMART_FADES_LOG_LEVEL))
2003 if log_level == "GLOBAL":
2004 self.smart_fades_analyzer.logger.setLevel(self.logger.level)
2005 self.smart_fades_mixer.logger.setLevel(self.logger.level)
2006 else:
2007 self.smart_fades_analyzer.logger.setLevel(log_level)
2008 self.smart_fades_mixer.logger.setLevel(log_level)
2009