/
/
/
1"""
2Controller to stream audio to players.
3
4The streams controller hosts a basic, unprotected HTTP-only webserver
5purely to stream audio packets to players and some control endpoints such as
6the upnp callbacks and json rpc api for slimproto clients.
7"""
8
9from __future__ import annotations
10
11import asyncio
12import gc
13import logging
14import os
15import urllib.parse
16from collections.abc import AsyncGenerator
17from dataclasses import dataclass
18from typing import TYPE_CHECKING, Final, cast
19
20from aiofiles.os import wrap
21from aiohttp import web
22from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
23from music_assistant_models.enums import (
24 ConfigEntryType,
25 ContentType,
26 MediaType,
27 PlayerFeature,
28 StreamType,
29 VolumeNormalizationMode,
30)
31from music_assistant_models.errors import (
32 AudioError,
33 InvalidDataError,
34 ProviderUnavailableError,
35 QueueEmpty,
36)
37from music_assistant_models.media_items import AudioFormat, Track
38from music_assistant_models.player_queue import PlayLogEntry
39
40from music_assistant.constants import (
41 ANNOUNCE_ALERT_FILE,
42 CONF_BIND_IP,
43 CONF_BIND_PORT,
44 CONF_CROSSFADE_DURATION,
45 CONF_ENTRY_ENABLE_ICY_METADATA,
46 CONF_ENTRY_LOG_LEVEL,
47 CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES,
48 CONF_HTTP_PROFILE,
49 CONF_OUTPUT_CHANNELS,
50 CONF_OUTPUT_CODEC,
51 CONF_PUBLISH_IP,
52 CONF_SAMPLE_RATES,
53 CONF_SMART_FADES_MODE,
54 CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO,
55 CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
56 CONF_VOLUME_NORMALIZATION_RADIO,
57 CONF_VOLUME_NORMALIZATION_TRACKS,
58 DEFAULT_STREAM_HEADERS,
59 ICY_HEADERS,
60 INTERNAL_PCM_FORMAT,
61 SILENCE_FILE,
62 VERBOSE_LOG_LEVEL,
63)
64from music_assistant.controllers.players.player_controller import AnnounceData
65from music_assistant.controllers.streams.smart_fades import SmartFadesMixer
66from music_assistant.controllers.streams.smart_fades.analyzer import SmartFadesAnalyzer
67from music_assistant.controllers.streams.smart_fades.fades import SMART_CROSSFADE_DURATION
68from music_assistant.helpers.audio import LOGGER as AUDIO_LOGGER
69from music_assistant.helpers.audio import (
70 get_buffered_media_stream,
71 get_chunksize,
72 get_media_stream,
73 get_player_filter_params,
74 get_stream_details,
75 resample_pcm_audio,
76)
77from music_assistant.helpers.buffered_generator import buffered, use_buffer
78from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
79from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream
80from music_assistant.helpers.util import (
81 divide_chunks,
82 format_ip_for_url,
83 get_ip_addresses,
84 get_total_system_memory,
85 select_free_port,
86)
87from music_assistant.helpers.webserver import Webserver
88from music_assistant.models.core_controller import CoreController
89from music_assistant.models.music_provider import MusicProvider
90from music_assistant.models.plugin import PluginProvider, PluginSource
91from music_assistant.models.smart_fades import SmartFadesMode
92from music_assistant.providers.universal_group.constants import UGP_PREFIX
93from music_assistant.providers.universal_group.player import UniversalGroupPlayer
94
95if TYPE_CHECKING:
96 from music_assistant_models.config_entries import CoreConfig
97 from music_assistant_models.player import PlayerMedia
98 from music_assistant_models.player_queue import PlayerQueue
99 from music_assistant_models.queue_item import QueueItem
100 from music_assistant_models.streamdetails import StreamDetails
101
102 from music_assistant.mass import MusicAssistant
103 from music_assistant.models.player import Player
104
105
# Keys of the config entries that are defined by the streams controller itself.
CONF_ALLOW_BUFFER: Final[str] = "allow_buffering"
CONF_ALLOW_CROSSFADE_SAME_ALBUM: Final[str] = "allow_crossfade_same_album"
CONF_SMART_FADES_LOG_LEVEL: Final[str] = "smart_fades_log_level"

# The total system memory is determined only once, when this module is loaded.
TOTAL_SYSTEM_MEMORY_GB: Final[float] = get_total_system_memory()
# Buffering is allowed by default when the reported total memory is 8.0 or more.
CONF_ALLOW_BUFFER_DEFAULT: Final[bool] = TOTAL_SYSTEM_MEMORY_GB >= 8.0

# os.path.isfile, passed through the aiofiles `wrap` helper.
isfile = wrap(os.path.isfile)
115
116
def parse_pcm_info(content_type: str) -> tuple[int, int, int]:
    """
    Parse PCM info from a codec/content_type string.

    The parameters are expected as ';'-separated key=value pairs, for example
    'pcm;codec=pcm;rate=48000;bitrate=24;channels=2'. Whitespace around the
    keys and values is ignored, so a notation such as
    'audio/L16; rate=48000; channels=2' is understood as well.

    Returns a tuple of (sample_rate, sample_size, channels). A parameter that
    is not present falls back to its default: 44100, 16 and 2 respectively.

    Raises ValueError when a parameter is present but is not a valid integer.
    """
    params: dict[str, str] = {}
    if ";" in content_type:
        # reuse the query string parser by turning the ';' separators into '&'
        for key, value in urllib.parse.parse_qsl(content_type.replace(";", "&")):
            # strip the whitespace that may follow a ';' separator, otherwise the
            # key would not match and the default would silently be used
            params[key.strip()] = value.strip()
    sample_rate = int(params.get("rate", 44100))
    sample_size = int(params.get("bitrate", 16))
    channels = int(params.get("channels", 2))
    return (sample_rate, sample_size, channels)
126
127
@dataclass
class CrossfadeData:
    """Holder for the crossfade data that is kept for a queue item."""

    data: bytes
    fade_in_size: int
    # format of the 'data' bytes (current/previous track's format)
    pcm_format: AudioFormat
    # format for 'fade_in_size' (next track's format)
    fade_in_pcm_format: AudioFormat
    queue_item_id: str
137
138
139class StreamsController(CoreController):
140 """Webserver Controller to stream audio to players."""
141
142 domain: str = "streams"
143
def __init__(self, mass: MusicAssistant) -> None:
    """Create the webserver, fill in the manifest and set up the controller state."""
    super().__init__(mass)
    webserver = Webserver(self.logger, enable_dynamic_routes=True)
    self._server = webserver
    # the dynamic route (un)registration of the webserver is exposed on the controller
    self.register_dynamic_route = webserver.register_dynamic_route
    self.unregister_dynamic_route = webserver.unregister_dynamic_route
    # manifest details of this core controller
    manifest = self.manifest
    manifest.name = "Streamserver"
    manifest.description = (
        "Music Assistant's core controller that is responsible for "
        "streaming audio to players on the local network."
    )
    manifest.icon = "cast-audio"
    # runtime state
    self._bind_ip: str = "0.0.0.0"
    self.announcements: dict[str, AnnounceData] = {}
    self._crossfade_data: dict[str, CrossfadeData] = {}
    # the smart fades helpers receive a reference to this controller
    self._smart_fades_mixer = SmartFadesMixer(self)
    self._smart_fades_analyzer = SmartFadesAnalyzer(self)
161
162 @property
163 def base_url(self) -> str:
164 """Return the base_url for the streamserver."""
165 return self._server.base_url
166
167 @property
168 def bind_ip(self) -> str:
169 """Return the IP address this streamserver is bound to."""
170 return self._bind_ip
171
172 @property
173 def smart_fades_mixer(self) -> SmartFadesMixer:
174 """Return the SmartFadesMixer instance."""
175 return self._smart_fades_mixer
176
177 @property
178 def smart_fades_analyzer(self) -> SmartFadesAnalyzer:
179 """Return the SmartFadesAnalyzer instance."""
180 return self._smart_fades_analyzer
181
async def get_config_entries(
    self,
    action: str | None = None,
    values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
    """
    Return all Config Entries for this core module (if any).

    The `action` and `values` arguments are not used by this implementation.

    The defaults for the published IP and the TCP port are determined on each
    call: the first detected IP address and the port that is returned
    by `select_free_port(8097, 9200)`.
    """
    ip_addresses = await get_ip_addresses(include_ipv6=True)
    default_port = await select_free_port(8097, 9200)
    # De-duplicate the bind options while keeping a stable order (wildcards first,
    # then the detected addresses). Iterating a set of strings here would present
    # the options in a different order on every run.
    bind_ip_options = [
        ConfigValueOption(x, x) for x in dict.fromkeys(("0.0.0.0", "::", *ip_addresses))
    ]
    return (
        ConfigEntry(
            key=CONF_ALLOW_BUFFER,
            type=ConfigEntryType.BOOLEAN,
            default_value=CONF_ALLOW_BUFFER_DEFAULT,
            label="Allow (in-memory) buffering of (track) audio",
            description="By default, Music Assistant tries to be as resource "
            "efficient as possible when streaming audio, especially considering "
            "low-end devices such as Raspberry Pi's. This means that audio "
            "buffering is disabled by default to reduce memory usage. \n\n"
            "Enabling this option allows for in-memory buffering of audio, "
            "which (massively) improves playback (and seeking) performance but it comes "
            "at the cost of increased memory usage. "
            "If you run Music Assistant on a capable device with enough memory, "
            "enabling this option is strongly recommended.",
            required=False,
            category="playback",
        ),
        ConfigEntry(
            key=CONF_VOLUME_NORMALIZATION_RADIO,
            type=ConfigEntryType.STRING,
            default_value=VolumeNormalizationMode.FALLBACK_FIXED_GAIN,
            label="Volume normalization method for radio streams",
            options=[
                ConfigValueOption(x.value.replace("_", " ").title(), x.value)
                for x in VolumeNormalizationMode
            ],
            category="playback",
        ),
        ConfigEntry(
            key=CONF_VOLUME_NORMALIZATION_TRACKS,
            type=ConfigEntryType.STRING,
            default_value=VolumeNormalizationMode.FALLBACK_DYNAMIC,
            label="Volume normalization method for tracks",
            options=[
                ConfigValueOption(x.value.replace("_", " ").title(), x.value)
                for x in VolumeNormalizationMode
            ],
            category="playback",
        ),
        ConfigEntry(
            key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO,
            type=ConfigEntryType.FLOAT,
            range=(-20, 10),
            default_value=-6,
            label="Fixed/fallback gain adjustment for radio streams",
            category="playback",
        ),
        ConfigEntry(
            key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
            type=ConfigEntryType.FLOAT,
            range=(-20, 10),
            default_value=-6,
            label="Fixed/fallback gain adjustment for tracks",
            category="playback",
        ),
        ConfigEntry(
            key=CONF_ALLOW_CROSSFADE_SAME_ALBUM,
            type=ConfigEntryType.BOOLEAN,
            default_value=False,
            label="Allow crossfade between tracks from the same album",
            description="Enabling this option allows for crossfading between tracks "
            "that are part of the same album.",
            category="playback",
        ),
        ConfigEntry(
            key=CONF_PUBLISH_IP,
            type=ConfigEntryType.STRING,
            # NOTE(review): assumes get_ip_addresses never returns an empty
            # sequence - confirm against the helper.
            default_value=ip_addresses[0],
            label="Published IP address",
            description="This IP address is communicated to players where to find this server."
            "\nMake sure that this IP can be reached by players on the local network, "
            "otherwise audio streaming will not work.",
            required=False,
            category="generic",
            advanced=True,
            requires_reload=True,
        ),
        ConfigEntry(
            key=CONF_BIND_PORT,
            type=ConfigEntryType.INTEGER,
            default_value=default_port,
            label="TCP Port",
            description="The TCP port to run the server. "
            "Make sure that this server can be reached "
            "on the given IP and TCP port by players on the local network.",
            category="generic",
            advanced=True,
            requires_reload=True,
        ),
        ConfigEntry(
            key=CONF_BIND_IP,
            type=ConfigEntryType.STRING,
            default_value="0.0.0.0",
            options=bind_ip_options,
            label="Bind to IP/interface",
            description="Start the stream server on this specific interface. \n"
            "Use 0.0.0.0 or :: to bind to all interfaces, which is the default. \n"
            "This is an advanced setting that should normally "
            "not be adjusted in regular setups.",
            category="generic",
            advanced=True,
            required=False,
            requires_reload=True,
        ),
        ConfigEntry(
            key=CONF_SMART_FADES_LOG_LEVEL,
            type=ConfigEntryType.STRING,
            label="Smart Fades Log level",
            description="Log level for the Smart Fades mixer and analyzer.",
            options=CONF_ENTRY_LOG_LEVEL.options,
            default_value="GLOBAL",
            category="generic",
            advanced=True,
        ),
    )
306
async def setup(self, config: CoreConfig) -> None:
    """Configure the loggers, check ffmpeg and start the webserver of this module."""
    # the audio/ffmpeg loggers follow the log level of this controller
    controller_level = self.logger.level
    AUDIO_LOGGER.setLevel(controller_level)
    FFMPEG_LOGGER.setLevel(controller_level)
    self._setup_smart_fades_logger(config)
    # verify the ffmpeg version before anything is served
    await check_ffmpeg_version()
    # read the network settings for the webserver from the config
    self.publish_port = config.get_value(CONF_BIND_PORT)
    self.publish_ip = config.get_value(CONF_PUBLISH_IP)
    bind_ip = str(config.get_value(CONF_BIND_IP))
    self._bind_ip = bind_ip
    # A wrong publish IP is a common source of issues for people with more complex
    # setups, so the address is logged very prominently. It is logged as a warning
    # for as long as the onboarding has not been completed.
    banner_level = logging.INFO if self.mass.config.onboard_done else logging.WARNING
    banner = (
        "\n\n################################################################################\n"
        "Starting streamserver on %s:%s\n"
        "This is the IP address that is communicated to players.\n"
        "If this is incorrect, audio will not play!\n"
        "See the documentation how to configure the publish IP for the Streamserver\n"
        "in Settings --> Core modules --> Streamserver\n"
        "################################################################################\n"
    )
    self.logger.log(banner_level, banner, self.publish_ip, self.publish_port)
    # every endpoint is registered for all HTTP methods ("*")
    endpoints = (
        ("/flow/{session_id}/{queue_id}/{queue_item_id}.{fmt}", self.serve_queue_flow_stream),
        ("/single/{session_id}/{queue_id}/{queue_item_id}.{fmt}", self.serve_queue_item_stream),
        ("/command/{queue_id}/{command}.mp3", self.serve_command_request),
        ("/announcement/{player_id}.{fmt}", self.serve_announcement_stream),
        ("/pluginsource/{plugin_source}/{player_id}.{fmt}", self.serve_plugin_source_stream),
    )
    published_host = format_ip_for_url(str(self.publish_ip))
    await self._server.setup(
        bind_ip=bind_ip,
        bind_port=cast("int", self.publish_port),
        base_url=f"http://{published_host}:{self.publish_port}",
        static_routes=[("*", path, handler) for path, handler in endpoints],
    )
    # Schedule the garbage collection task after 15 minutes (900 seconds),
    # this ensures memory from audio buffers and streams is cleaned up regularly
    self.mass.call_later(900, self._periodic_garbage_collection)
368
async def close(self) -> None:
    """Close the webserver on exit."""
    webserver = self._server
    await webserver.close()
372
async def resolve_stream_url(
    self,
    session_id: str,
    queue_item: QueueItem,
    flow_mode: bool = False,
    player_id: str | None = None,
) -> str:
    """
    Build the stream URL for the given QueueItem.

    The output codec is read from the config of `player_id`, or from the config
    of the queue of the item when no player id is given. The url points to
    the 'flow' endpoint when `flow_mode` is set and to the 'single' endpoint otherwise.
    """
    config_player_id = player_id or queue_item.queue_id
    configured_codec = await self.mass.config.get_player_config_value(
        config_player_id, CONF_OUTPUT_CODEC, default="flac", return_type=str
    )
    codec = ContentType.try_parse(configured_codec or "flac")
    extension = codec.value
    if codec.is_pcm() and ";" not in extension:
        # raw pcm without exact format specifiers: append the default ones
        extension = f"{extension};codec=pcm;rate=44100;bitrate=16;channels=2"
    endpoint = "flow" if flow_mode else "single"
    item_path = f"{session_id}/{queue_item.queue_id}/{queue_item.queue_item_id}.{extension}"
    return f"{self._server.base_url}/{endpoint}/{item_path}"
393
async def get_plugin_source_url(
    self,
    plugin_source: PluginSource,
    player_id: str,
) -> str:
    """
    Build the url of the stream/proxy for the given Plugin Source.

    The extension of the url is 'wav' for a PCM source, any other
    source keeps the value of its own content type.
    """
    source_content_type = plugin_source.audio_format.content_type
    extension = (
        ContentType.WAV.value if source_content_type.is_pcm() else source_content_type.value
    )
    return f"{self._server.base_url}/pluginsource/{plugin_source.id}/{player_id}.{extension}"
405
async def serve_queue_item_stream(self, request: web.Request) -> web.StreamResponse:
    """
    Stream single queueitem audio to a player.

    The queue, session, queue item and player are resolved from the request path.
    The streamdetails of the item are fetched when the item has none yet.
    The audio, optionally with the (smart) crossfade mixed in, is sent
    through ffmpeg in the output format that is requested in the url.

    For requests other than GET only the response headers are sent.

    Raises HTTPNotFound when the queue, session, queue item, streamdetails
    or player can not be resolved.
    """
    self._log_request(request)
    queue_id = request.match_info["queue_id"]
    queue = self.mass.player_queues.get(queue_id)
    if not queue:
        raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
    session_id = request.match_info["session_id"]
    if queue.session_id and session_id != queue.session_id:
        raise web.HTTPNotFound(reason=f"Unknown (or invalid) session: {session_id}")
    queue_player = self.mass.players.get(queue_id)
    queue_item_id = request.match_info["queue_item_id"]
    queue_item = self.mass.player_queues.get_item(queue_id, queue_item_id)
    if not queue_item:
        raise web.HTTPNotFound(reason=f"Unknown Queue item: {queue_item_id}")
    if not queue_item.streamdetails:
        try:
            queue_item.streamdetails = await get_stream_details(
                mass=self.mass, queue_item=queue_item
            )
        except Exception as e:
            self.logger.error(
                "Failed to get streamdetails for QueueItem %s: %s", queue_item_id, e
            )
            queue_item.available = False
            # keep the original error attached as the cause of the 404
            raise web.HTTPNotFound(
                reason=f"No streamdetails for Queue item: {queue_item_id}"
            ) from e

    # pick output format based on the streamdetails and player capabilities
    if not queue_player:
        raise web.HTTPNotFound(reason=f"Unknown Player: {queue_id}")

    # work out pcm format based on streamdetails
    pcm_format = await self._select_pcm_format(
        player=queue_player,
        streamdetails=queue_item.streamdetails,
        smartfades_enabled=True,
    )
    output_format = await self.get_output_format(
        output_format_str=request.match_info["fmt"],
        player=queue_player,
        content_sample_rate=pcm_format.sample_rate,
        content_bit_depth=pcm_format.bit_depth,
    )

    # prepare request, add some DLNA/UPNP compatible headers
    # icy-name is sanitized to avoid a "Potential header injection attack" exception by aiohttp
    # see https://github.com/music-assistant/support/issues/4913
    headers = {
        **DEFAULT_STREAM_HEADERS,
        "icy-name": queue_item.name.replace("\n", " ").replace("\r", " ").replace("\t", " "),
        "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01500000000000000000000000000000",  # noqa: E501
        "Accept-Ranges": "none",
        "Content-Type": f"audio/{output_format.output_format_str}",
    }
    resp = web.StreamResponse(
        status=200,
        reason="OK",
        headers=headers,
    )
    resp.content_type = f"audio/{output_format.output_format_str}"
    http_profile = await self.mass.config.get_player_config_value(
        queue_id, CONF_HTTP_PROFILE, default="default", return_type=str
    )
    if http_profile == "forced_content_length" and not queue_item.duration:
        # just set an insane high content length to make sure the player keeps playing
        resp.content_length = get_chunksize(output_format, 12 * 3600)
    elif http_profile == "forced_content_length" and queue_item.duration:
        # guess content length based on duration
        resp.content_length = get_chunksize(output_format, queue_item.duration)
    elif http_profile == "chunked":
        resp.enable_chunked_encoding()

    await resp.prepare(request)

    # return early if this is not a GET request
    if request.method != "GET":
        return resp

    if queue_item.media_type != MediaType.TRACK:
        # no crossfade on non-tracks
        smart_fades_mode = SmartFadesMode.DISABLED
    else:
        smart_fades_mode = await self.mass.config.get_player_config_value(
            queue.queue_id, CONF_SMART_FADES_MODE, return_type=SmartFadesMode
        )
        standard_crossfade_duration = self.mass.config.get_raw_player_config_value(
            queue.queue_id, CONF_CROSSFADE_DURATION, 10
        )
    if (
        smart_fades_mode != SmartFadesMode.DISABLED
        and PlayerFeature.GAPLESS_PLAYBACK not in queue_player.supported_features
    ):
        # crossfade is not supported on this player due to missing gapless playback
        self.logger.warning(
            "Crossfade disabled: Player %s does not support gapless playback, "
            "consider enabling flow mode to enable crossfade on this player.",
            queue_player.display_name,
        )
        smart_fades_mode = SmartFadesMode.DISABLED

    if smart_fades_mode != SmartFadesMode.DISABLED:
        # crossfade is enabled, use special crossfaded single item stream
        # where the crossfade of the next track is present in the stream of
        # a single track. This only works if the player supports gapless playback!
        # (standard_crossfade_duration is always set here: the mode can only be
        # enabled for tracks, which is the branch that reads the duration)
        audio_input = self.get_queue_item_stream_with_smartfade(
            queue_item=queue_item,
            pcm_format=pcm_format,
            smart_fades_mode=smart_fades_mode,
            standard_crossfade_duration=standard_crossfade_duration,
        )
    else:
        # no crossfade, just a regular single item stream
        audio_input = self.get_queue_item_stream(
            queue_item=queue_item,
            pcm_format=pcm_format,
            seek_position=queue_item.streamdetails.seek_position,
        )
    # stream the audio
    # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into
    # the desired output format for the player including any player specific filter params
    # such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats
    if queue_item.media_type == MediaType.RADIO:
        # keep very short buffer for radio streams
        # to keep them (more or less) realtime and prevent time outs
        read_rate_input_args = ["-readrate", "1.0", "-readrate_initial_burst", "2"]
    else:
        # just allow the player to buffer whatever it wants for single item streams
        read_rate_input_args = None

    first_chunk_received = False
    bytes_sent = 0
    async for chunk in get_ffmpeg_stream(
        audio_input=audio_input,
        input_format=pcm_format,
        output_format=output_format,
        filter_params=get_player_filter_params(
            self.mass,
            player_id=queue_player.player_id,
            input_format=pcm_format,
            output_format=output_format,
        ),
        extra_input_args=read_rate_input_args,
    ):
        try:
            await resp.write(chunk)
            bytes_sent += len(chunk)
            if not first_chunk_received:
                first_chunk_received = True
                # inform the queue that the track is now loaded in the buffer
                # so for example the next track can be enqueued
                self.mass.player_queues.track_loaded_in_buffer(
                    queue_item.queue_id, queue_item.queue_item_id
                )
        except ConnectionError as err:
            # ConnectionError also covers BrokenPipeError and ConnectionResetError
            if first_chunk_received and not queue_player.stop_called:
                # Player disconnected (unexpected) after receiving at least some data
                # This could indicate buffering issues, network problems,
                # or player-specific issues
                bytes_expected = get_chunksize(output_format, queue_item.duration or 3600)
                self.logger.warning(
                    "Player %s disconnected prematurely from stream for %s (%s) - "
                    "error: %s, sent %d bytes, expected (approx) bytes=%d",
                    queue.display_name,
                    queue_item.name,
                    queue_item.uri,
                    err.__class__.__name__,
                    bytes_sent,
                    bytes_expected,
                )
            break
    if queue_item.streamdetails.stream_error:
        self.logger.error(
            "Error streaming QueueItem %s (%s) to %s - will try to skip to next item",
            queue_item.name,
            queue_item.uri,
            queue.display_name,
        )
        # try to skip to the next item in the queue after a short delay
        self.mass.call_later(5, self.mass.player_queues.next(queue_id))
    return resp
586
async def serve_queue_flow_stream(self, request: web.Request) -> web.StreamResponse:
    """
    Stream Queue Flow audio to player.

    The queue, player and the queue item to start with are resolved from the
    request path. The flow stream of the queue is sent through ffmpeg in the
    output format that is requested in the url. ICY metadata is written after
    each audio chunk when the client asks for it (Icy-MetaData header) and
    the player config does not disable it.

    For requests other than GET only the response headers are sent.

    Raises HTTPNotFound when the queue, player or queue item is unknown.
    """
    self._log_request(request)
    queue_id = request.match_info["queue_id"]
    queue = self.mass.player_queues.get(queue_id)
    if not queue:
        raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
    if not (queue_player := self.mass.players.get(queue_id)):
        raise web.HTTPNotFound(reason=f"Unknown Player: {queue_id}")
    start_queue_item_id = request.match_info["queue_item_id"]
    start_queue_item = self.mass.player_queues.get_item(queue_id, start_queue_item_id)
    if not start_queue_item:
        raise web.HTTPNotFound(reason=f"Unknown Queue item: {start_queue_item_id}")

    queue.flow_mode_stream_log = []

    # select the highest possible PCM settings for this player
    flow_pcm_format = await self._select_flow_format(queue_player)

    # work out output format/details
    output_format = await self.get_output_format(
        output_format_str=request.match_info["fmt"],
        player=queue_player,
        content_sample_rate=flow_pcm_format.sample_rate,
        content_bit_depth=flow_pcm_format.bit_depth,
    )
    # work out ICY metadata support
    icy_preference = self.mass.config.get_raw_player_config_value(
        queue_id,
        CONF_ENTRY_ENABLE_ICY_METADATA.key,
        CONF_ENTRY_ENABLE_ICY_METADATA.default_value,
    )
    enable_icy = request.headers.get("Icy-MetaData", "") == "1" and icy_preference != "disabled"
    icy_meta_interval = 256000 if icy_preference == "full" else 16384
    # The ICY length prefix is one single byte that counts blocks of 16 bytes,
    # so one metadata block can hold 255 * 16 bytes at most.
    icy_max_metadata_size = 255 * 16

    # prepare request, add some DLNA/UPNP compatible headers
    headers = {
        **DEFAULT_STREAM_HEADERS,
        **ICY_HEADERS,
        "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01700000000000000000000000000000",  # noqa: E501
        "Accept-Ranges": "none",
        "Content-Type": f"audio/{output_format.output_format_str}",
    }
    if enable_icy:
        headers["icy-metaint"] = str(icy_meta_interval)

    resp = web.StreamResponse(
        status=200,
        reason="OK",
        headers=headers,
    )
    http_profile = await self.mass.config.get_player_config_value(
        queue_id, CONF_HTTP_PROFILE, default="default", return_type=str
    )
    if http_profile == "forced_content_length":
        # just set an insane high content length to make sure the player keeps playing
        resp.content_length = get_chunksize(output_format, 12 * 3600)
    elif http_profile == "chunked":
        resp.enable_chunked_encoding()

    await resp.prepare(request)

    # return early if this is not a GET request
    if request.method != "GET":
        return resp

    # all checks passed, start streaming!
    # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into
    # the desired output format for the player including any player specific filter params
    # such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats
    self.logger.debug("Start serving Queue flow audio stream for %s", queue.display_name)

    async for chunk in get_ffmpeg_stream(
        audio_input=self.get_queue_flow_stream(
            queue=queue,
            start_queue_item=start_queue_item,
            pcm_format=flow_pcm_format,
        ),
        input_format=flow_pcm_format,
        output_format=output_format,
        filter_params=get_player_filter_params(
            self.mass, queue_player.player_id, flow_pcm_format, output_format
        ),
        # we need to slowly feed the music to avoid the player stopping and later
        # restarting (or completely failing) the audio stream by keeping the buffer short.
        # this is reported to be an issue especially with Chromecast players.
        # see for example: https://github.com/music-assistant/support/issues/3717
        # allow buffer ahead of 6 seconds and read rest in realtime
        extra_input_args=["-readrate", "1.0", "-readrate_initial_burst", "6"],
        # with ICY enabled, each audio chunk must be exactly one metadata interval
        chunk_size=icy_meta_interval if enable_icy else get_chunksize(output_format),
    ):
        try:
            await resp.write(chunk)
        except ConnectionError:
            # race condition
            # (ConnectionError also covers BrokenPipeError and ConnectionResetError)
            break

        if not enable_icy:
            continue

        # if icy metadata is enabled, send the icy metadata after the chunk
        if (
            # use current item here and not buffered item, otherwise
            # the icy metadata will be too much ahead
            (current_item := queue.current_item)
            and current_item.streamdetails
            and current_item.streamdetails.stream_title
        ):
            title = current_item.streamdetails.stream_title
        elif current_item and current_item.name:
            title = current_item.name
        else:
            title = "Music Assistant"
        metadata = f"StreamTitle='{title}';".encode()
        if icy_preference == "full" and current_item and current_item.image:
            metadata += f"StreamURL='{current_item.image.path}'".encode()
        # Cut off oversized metadata, the length prefix can not describe more.
        metadata = metadata[:icy_max_metadata_size]
        # pad the metadata with null bytes to a multiple of 16 bytes
        if remainder := len(metadata) % 16:
            metadata += b"\x00" * (16 - remainder)
        # The prefix must be exactly one raw byte: encoding chr(n) as text
        # would produce two (UTF-8) bytes for every value of 128 and up,
        # which shifts the audio data that follows.
        length_b = bytes((len(metadata) // 16,))
        await resp.write(length_b + metadata)

    return resp
710
async def serve_command_request(self, request: web.Request) -> web.FileResponse:
    """
    Handle special 'command' request for a player.

    The 'next' command creates a task that calls `next` on the player queues
    controller for the queue. The silence file is returned as response
    for every command.
    """
    self._log_request(request)
    path_params = request.match_info
    queue_id = path_params["queue_id"]
    if path_params["command"] == "next":
        next_item = self.mass.player_queues.next(queue_id)
        self.mass.create_task(next_item)
    response_headers = {"icy-name": "Music Assistant"}
    return web.FileResponse(SILENCE_FILE, headers=response_headers)
719
async def serve_announcement_stream(self, request: web.Request) -> web.StreamResponse:
    """
    Stream announcement audio to a player.

    The announcement details are looked up in `self.announcements` by the
    player id from the request path. With the 'forced_content_length' http
    profile the complete announcement is collected first and returned as a
    regular response, otherwise it is streamed chunk by chunk.

    Raises HTTPNotFound when the player is unknown or when there is no
    announcement registered for the player.
    """
    self._log_request(request)
    player_id = request.match_info["player_id"]
    # look up the player itself (and not its queue), like the other endpoints do
    player = self.mass.players.get(player_id)
    if not player:
        raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
    if not (announce_data := self.announcements.get(player_id)):
        raise web.HTTPNotFound(reason=f"No pending announcements for Player: {player_id}")

    # work out output format/details
    fmt = request.match_info["fmt"]
    audio_format = AudioFormat(content_type=ContentType.try_parse(fmt))

    def _announcement_stream() -> AsyncGenerator[bytes, None]:
        """Create a new audio stream for the pending announcement."""
        return self.get_announcement_stream(
            announcement_url=announce_data["announcement_url"],
            output_format=audio_format,
            pre_announce=announce_data["pre_announce"],
            pre_announce_url=announce_data["pre_announce_url"],
        )

    http_profile = await self.mass.config.get_player_config_value(
        player_id, CONF_HTTP_PROFILE, default="default", return_type=str
    )
    if http_profile == "forced_content_length":
        # given the fact that an announcement is just a short audio clip,
        # just send it over completely at once so we have a fixed content length
        # (the chunks are joined once at the end, instead of copying the
        # collected data again for every chunk that is added)
        chunks = [chunk async for chunk in _announcement_stream()]
        return web.Response(
            body=b"".join(chunks),
            content_type=f"audio/{audio_format.output_format_str}",
            headers=DEFAULT_STREAM_HEADERS,
        )

    resp = web.StreamResponse(
        status=200,
        reason="OK",
        headers=DEFAULT_STREAM_HEADERS,
    )
    resp.content_type = f"audio/{audio_format.output_format_str}"
    if http_profile == "chunked":
        resp.enable_chunked_encoding()

    await resp.prepare(request)

    # return early if this is not a GET request
    if request.method != "GET":
        return resp

    # all checks passed, start streaming!
    self.logger.debug(
        "Start serving audio stream for Announcement %s to %s",
        announce_data["announcement_url"],
        player.display_name,
    )
    async for chunk in _announcement_stream():
        try:
            await resp.write(chunk)
        except ConnectionError:
            # the client went away, same handling as in the other stream endpoints
            # (ConnectionError also covers BrokenPipeError and ConnectionResetError)
            break

    self.logger.debug(
        "Finished serving audio stream for Announcement %s to %s",
        announce_data["announcement_url"],
        player.display_name,
    )

    return resp
793
async def serve_plugin_source_stream(self, request: web.Request) -> web.StreamResponse:
    """
    Stream PluginSource audio to a player.

    The plugin provider and the player are resolved from the request path and
    the audio of the plugin source is streamed in the output format that
    is requested in the url.

    For requests other than GET only the response headers are sent.

    Raises ProviderUnavailableError when the plugin provider is unknown,
    HTTPNotFound when the player is unknown and InvalidDataError when
    the plugin source has no audio format.
    """
    self._log_request(request)
    plugin_source_id = request.match_info["plugin_source"]
    provider = cast("PluginProvider", self.mass.get_provider(plugin_source_id))
    if not provider:
        raise ProviderUnavailableError(f"Unknown PluginSource: {plugin_source_id}")
    # work out output format/details
    player_id = request.match_info["player_id"]
    player = self.mass.players.get(player_id)
    if not player:
        raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
    plugin_source = provider.get_source()
    # validate the audio format before it is used for the first time
    # and before any response headers are sent to the client
    if not plugin_source.audio_format:
        raise InvalidDataError(f"No audio format for plugin source {plugin_source_id}")
    output_format = await self.get_output_format(
        output_format_str=request.match_info["fmt"],
        player=player,
        content_sample_rate=plugin_source.audio_format.sample_rate,
        content_bit_depth=plugin_source.audio_format.bit_depth,
    )
    headers = {
        **DEFAULT_STREAM_HEADERS,
        "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01700000000000000000000000000000",  # noqa: E501
        # icy-name is sanitized to avoid a "Potential header injection attack"
        # exception by aiohttp, same as for the single queue item stream
        "icy-name": plugin_source.name.replace("\n", " ").replace("\r", " ").replace("\t", " "),
        "Accept-Ranges": "none",
        "Content-Type": f"audio/{output_format.output_format_str}",
    }

    resp = web.StreamResponse(
        status=200,
        reason="OK",
        headers=headers,
    )
    resp.content_type = f"audio/{output_format.output_format_str}"
    http_profile = await self.mass.config.get_player_config_value(
        player_id, CONF_HTTP_PROFILE, default="default", return_type=str
    )
    if http_profile == "forced_content_length":
        # just set an insanely high content length to make sure the player keeps playing
        resp.content_length = get_chunksize(output_format, 12 * 3600)
    elif http_profile == "chunked":
        resp.enable_chunked_encoding()

    await resp.prepare(request)

    # return early if this is not a GET request
    if request.method != "GET":
        return resp

    # all checks passed, start streaming!
    async for chunk in self.get_plugin_source_stream(
        plugin_source_id=plugin_source_id,
        output_format=output_format,
        player_id=player_id,
        player_filter_params=get_player_filter_params(
            self.mass, player_id, plugin_source.audio_format, output_format
        ),
    ):
        try:
            await resp.write(chunk)
        except ConnectionError:
            # the client went away
            # (ConnectionError also covers BrokenPipeError and ConnectionResetError)
            break
    return resp
858
def get_command_url(self, player_or_queue_id: str, command: str) -> str:
    """Build the stream server url of the special command stream for a player/queue."""
    server_root = self.base_url
    command_file = f"{command}.mp3"
    return f"{server_root}/command/{player_or_queue_id}/{command_file}"
862
def get_announcement_url(
    self,
    player_id: str,
    announce_data: AnnounceData,
    content_type: ContentType = ContentType.MP3,
) -> str:
    """
    Register the announcement for a player and return the url to stream it from.

    The announcement data is stored per player id (replacing any earlier entry
    for that player); the returned url points at this stream server, with the
    content type's value used as file extension.
    """
    # remember what has to be announced on this player
    self.announcements[player_id] = announce_data
    # the announcement is hosted by the stream server on the local network:
    # that ensures playback on all players (also the ones that do not like
    # https hosts) and it offers the pre-announce 'bell' as well
    server_root = self.base_url
    file_name = f"{player_id}.{content_type.value}"
    return f"{server_root}/announcement/{file_name}"
875
def get_stream(
    self, media: PlayerMedia, pcm_format: AudioFormat, force_flow_mode: bool = False
) -> AsyncGenerator[bytes, None]:
    """
    Return a generator producing the given media as raw PCM audio.

    Helper for player providers that can consume the raw PCM audio stream
    directly (e.g. AirPlay) and do not rely on HTTP transport.
    The audio source is selected from the media's type, source id, queue item id
    and uri; the first matching case below wins.
    """
    media_type = media.media_type

    # announcement stream
    if media_type == MediaType.ANNOUNCEMENT:
        assert media.custom_data
        return self.get_announcement_stream(
            media.custom_data["announcement_url"],
            output_format=pcm_format,
            pre_announce=media.custom_data["pre_announce"],
            pre_announce_url=media.custom_data["pre_announce_url"],
        )

    # plugin source stream
    if media_type == MediaType.PLUGIN_SOURCE:
        assert media.custom_data
        return self.get_plugin_source_stream(
            plugin_source_id=media.custom_data["source_id"],
            output_format=pcm_format,
            # the player_id must come from the PlayerMedia object
            # because the target could have been a group
            player_id=media.custom_data["player_id"],
        )

    # member player accessing the stream of a universal group player (UGP);
    # the uri is checked to tell this apart from the UGP accessing its own stream
    if (
        media_type == MediaType.FLOW_STREAM
        and media.source_id
        and media.source_id.startswith(UGP_PREFIX)
        and media.uri
        and "/ugp/" in media.uri
    ):
        group_player = cast("UniversalGroupPlayer", self.mass.players.get(media.source_id))
        group_stream = group_player.stream
        assert group_stream is not None  # for type checker
        if group_stream.base_pcm_format == pcm_format:
            # formats match: subscribe to the raw stream, no conversion needed
            return group_stream.subscribe_raw()
        return group_stream.get_stream(output_format=pcm_format)

    if media.source_id and media.queue_item_id:
        if media_type == MediaType.FLOW_STREAM or force_flow_mode:
            # regular queue (flow) stream request
            queue = self.mass.player_queues.get(media.source_id)
            assert queue
            first_item = self.mass.player_queues.get_item(media.source_id, media.queue_item_id)
            assert first_item
            return self.mass.streams.get_queue_flow_stream(
                queue=queue,
                start_queue_item=first_item,
                pcm_format=pcm_format,
            )
        # single item stream (e.g. radio)
        single_item = self.mass.player_queues.get_item(media.source_id, media.queue_item_id)
        assert single_item
        item_stream = self.get_queue_item_stream(
            queue_item=single_item,
            pcm_format=pcm_format,
        )
        return buffered(item_stream, buffer_size=10, min_buffer_before_yield=2)

    # fallback: assume an url or some other direct path
    # NOTE: this will fail if its an uri not playable by ffmpeg
    return get_ffmpeg_stream(
        audio_input=media.uri,
        input_format=AudioFormat(content_type=ContentType.try_parse(media.uri)),
        output_format=pcm_format,
    )
960
@use_buffer(buffer_size=30, min_buffer_before_yield=2)
async def get_queue_flow_stream(
    self,
    queue: PlayerQueue,
    start_queue_item: QueueItem,
    pcm_format: AudioFormat,
) -> AsyncGenerator[bytes, None]:
    """
    Get a flow stream of all tracks in the queue as raw PCM audio.

    Starting at start_queue_item, every next queue item is loaded and streamed
    back-to-back until the queue is empty, optionally crossfading between tracks.
    Audio is emitted in chunks of 1 second (pcm_sample_size bytes) in the given
    pcm_format; leftovers around a crossfade or at the end of a track can be
    emitted as a chunk of a different size.
    For each track a PlayLogEntry is appended to the queue's flow_mode_stream_log
    and the streamdetails' seconds_streamed/duration are updated from the number
    of PCM bytes that were actually sent.

    :param queue: the queue to stream; its flow_mode flag is set to True.
    :param start_queue_item: first queue item to stream.
    :param pcm_format: PCM format of the produced audio (must be a PCM content type).
    :raises InvalidDataError: a queue item to stream has no streamdetails.
    """
    # ruff: noqa: PLR0915
    assert pcm_format.content_type.is_pcm()
    queue_track = None
    last_fadeout_part: bytes = b""
    last_streamdetails: StreamDetails | None = None
    last_play_log_entry: PlayLogEntry | None = None
    queue.flow_mode = True
    if not start_queue_item:
        # this can happen in some (edge case) race conditions
        return
    pcm_sample_size = pcm_format.pcm_sample_size
    if start_queue_item.media_type != MediaType.TRACK:
        # no crossfade on non-tracks
        smart_fades_mode = SmartFadesMode.DISABLED
        standard_crossfade_duration = 0
    else:
        smart_fades_mode = await self.mass.config.get_player_config_value(
            queue.queue_id, CONF_SMART_FADES_MODE, return_type=SmartFadesMode
        )
        standard_crossfade_duration = self.mass.config.get_raw_player_config_value(
            queue.queue_id, CONF_CROSSFADE_DURATION, 10
        )
    self.logger.info(
        "Start Queue Flow stream for Queue %s - crossfade: %s %s",
        queue.display_name,
        smart_fades_mode,
        f"({standard_crossfade_duration}s)"
        if smart_fades_mode == SmartFadesMode.STANDARD_CROSSFADE
        else "",
    )
    # counts chunks over the WHOLE flow stream (not per track)
    total_chunks_received = 0

    while True:
        # get (next) queue item to stream
        if queue_track is None:
            queue_track = start_queue_item
        else:
            try:
                queue_track = await self.mass.player_queues.load_next_queue_item(
                    queue.queue_id, queue_track.queue_item_id
                )
            except QueueEmpty:
                # queue_track keeps pointing at the last streamed item
                break

        if queue_track.streamdetails is None:
            raise InvalidDataError(
                f"No Streamdetails known for queue item {queue_track.queue_item_id}"
            )

        self.logger.debug(
            "Start Streaming queue track: %s (%s) for queue %s",
            queue_track.streamdetails.uri,
            queue_track.name,
            queue.display_name,
        )
        # append to play log so the queue controller can work out which track is playing
        play_log_entry = PlayLogEntry(queue_track.queue_item_id)
        queue.flow_mode_stream_log.append(play_log_entry)
        # calculate crossfade buffer size
        crossfade_buffer_duration = (
            SMART_CROSSFADE_DURATION
            if smart_fades_mode == SmartFadesMode.SMART_CROSSFADE
            else standard_crossfade_duration
        )
        # never crossfade more than half of the track
        crossfade_buffer_duration = min(
            crossfade_buffer_duration,
            int(queue_track.streamdetails.duration / 2)
            if queue_track.streamdetails.duration
            else crossfade_buffer_duration,
        )
        # Ensure crossfade buffer size is aligned to frame boundaries
        # Frame size = bytes_per_sample * channels
        bytes_per_sample = pcm_format.bit_depth // 8
        frame_size = bytes_per_sample * pcm_format.channels
        crossfade_buffer_size = int(pcm_format.pcm_sample_size * crossfade_buffer_duration)
        # Round down to nearest frame boundary
        crossfade_buffer_size = (crossfade_buffer_size // frame_size) * frame_size

        bytes_written = 0
        buffer = b""
        # handle incoming audio chunks
        first_chunk_received = False
        # buffer size needs to be big enough to include the crossfade part

        async for chunk in self.get_queue_item_stream(
            queue_track,
            pcm_format=pcm_format,
            seek_position=queue_track.streamdetails.seek_position,
            raise_on_error=False,
        ):
            total_chunks_received += 1
            if not first_chunk_received:
                first_chunk_received = True
                # inform the queue that the track is now loaded in the buffer
                # so the next track can be preloaded
                self.mass.player_queues.track_loaded_in_buffer(
                    queue.queue_id, queue_track.queue_item_id
                )
            if total_chunks_received < 10 and smart_fades_mode != SmartFadesMode.DISABLED:
                # we want a stream to start as quickly as possible
                # so for the first 10 chunks we keep a very short buffer
                req_buffer_size = pcm_format.pcm_sample_size
            else:
                req_buffer_size = (
                    pcm_sample_size
                    if smart_fades_mode == SmartFadesMode.DISABLED
                    else crossfade_buffer_size
                )

            # ALWAYS APPEND CHUNK TO BUFFER
            buffer += chunk
            del chunk
            if len(buffer) < req_buffer_size:
                # buffer is not full enough, move on
                # yield control to event loop with 10ms delay
                await asyncio.sleep(0.01)
                continue

            #### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
            if last_fadeout_part and last_streamdetails:
                # perform crossfade
                fadein_part = buffer[:crossfade_buffer_size]
                remaining_bytes = buffer[crossfade_buffer_size:]
                # Use the mixer to handle all crossfade logic
                crossfade_part = await self._smart_fades_mixer.mix(
                    fade_in_part=fadein_part,
                    fade_out_part=last_fadeout_part,
                    fade_in_streamdetails=queue_track.streamdetails,
                    fade_out_streamdetails=last_streamdetails,
                    pcm_format=pcm_format,
                    standard_crossfade_duration=standard_crossfade_duration,
                    mode=smart_fades_mode,
                )
                # because the crossfade exists of both the fadein and fadeout part
                # we need to correct the bytes_written accordingly so the duration
                # calculations at the end of the track are correct
                crossfade_part_len = len(crossfade_part)
                bytes_written += int(crossfade_part_len / 2)
                if last_play_log_entry:
                    assert last_play_log_entry.seconds_streamed is not None
                    last_play_log_entry.seconds_streamed += (
                        crossfade_part_len / 2 / pcm_sample_size
                    )
                # yield crossfade_part (in pcm_sample_size chunks)
                for _chunk in divide_chunks(crossfade_part, pcm_sample_size):
                    yield _chunk
                    del _chunk
                del crossfade_part
                # also write the leftover bytes from the crossfade action
                if remaining_bytes:
                    yield remaining_bytes
                    bytes_written += len(remaining_bytes)
                    del remaining_bytes
                # clear vars
                last_fadeout_part = b""
                last_streamdetails = None
                buffer = b""

            #### OTHER: enough data in buffer, feed to output
            while len(buffer) > req_buffer_size:
                yield buffer[:pcm_sample_size]
                bytes_written += pcm_sample_size
                buffer = buffer[pcm_sample_size:]

        #### HANDLE END OF TRACK
        if last_fadeout_part:
            # edge case: we did not get enough data to make the crossfade
            for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
                yield _chunk
                del _chunk
            bytes_written += len(last_fadeout_part)
            last_fadeout_part = b""
        # A crossfade buffer of 0 bytes (e.g. a track shorter than 2 seconds) means
        # there is nothing to fade: slicing with -0 would return the WHOLE buffer as
        # fade-out part (and nothing as remainder), so treat it as "no crossfade".
        if crossfade_buffer_size > 0 and self._crossfade_allowed(
            queue_track, smart_fades_mode=smart_fades_mode, flow_mode=True
        ):
            # if crossfade is enabled, save fadeout part to pickup for next track
            last_fadeout_part = buffer[-crossfade_buffer_size:]
            last_streamdetails = queue_track.streamdetails
            last_play_log_entry = play_log_entry
            remaining_bytes = buffer[:-crossfade_buffer_size]
            if remaining_bytes:
                yield remaining_bytes
                bytes_written += len(remaining_bytes)
            del remaining_bytes
        elif buffer:
            # no crossfade enabled, just yield the buffer last part
            bytes_written += len(buffer)
            for _chunk in divide_chunks(buffer, pcm_sample_size):
                yield _chunk
                del _chunk
        # make sure the buffer gets cleaned up
        del buffer

        # update duration details based on the actual pcm data we sent
        # this also accounts for crossfade and silence stripping
        seconds_streamed = bytes_written / pcm_sample_size
        queue_track.streamdetails.seconds_streamed = seconds_streamed
        queue_track.streamdetails.duration = int(
            queue_track.streamdetails.seek_position + seconds_streamed
        )
        play_log_entry.seconds_streamed = seconds_streamed
        play_log_entry.duration = queue_track.streamdetails.duration
        self.logger.debug(
            "Finished Streaming queue track: %s (%s) on queue %s",
            queue_track.streamdetails.uri,
            queue_track.name,
            queue.display_name,
        )
    #### HANDLE END OF QUEUE FLOW STREAM
    # end of queue flow: make sure we yield the last_fadeout_part
    if last_fadeout_part:
        for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
            yield _chunk
            del _chunk
        # correct seconds streamed/duration
        last_part_seconds = len(last_fadeout_part) / pcm_sample_size
        streamdetails = queue_track.streamdetails
        assert streamdetails is not None
        streamdetails.seconds_streamed = (
            streamdetails.seconds_streamed or 0
        ) + last_part_seconds
        streamdetails.duration = int((streamdetails.duration or 0) + last_part_seconds)
        last_fadeout_part = b""
    self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name)
1201
async def get_announcement_stream(
    self,
    announcement_url: str,
    output_format: AudioFormat,
    pre_announce: bool | str = False,
    pre_announce_url: str = ANNOUNCE_ALERT_FILE,
) -> AsyncGenerator[bytes, None]:
    """
    Get the special announcement stream.

    The announcement is fetched (decoded to PCM) by a background task while the
    optional pre-announce sound is already being streamed; silence is padded in
    between until the first announcement audio is available. The result is
    encoded to output_format only once, at the end.

    :param announcement_url: url/path of the announcement audio; the part after
        the last dot is used to detect its content type.
    :param output_format: audio format of the produced stream.
    :param pre_announce: when truthy, the pre-announce sound is played first.
    :param pre_announce_url: url/path of the pre-announce sound.
    """
    announcement_data: asyncio.Queue[bytes | None] = asyncio.Queue(10)
    # we are doing announcement in PCM first to avoid multiple encodings
    # when mixing pre-announce and announcement
    # also we have to deal with some TTS sources being super slow in delivering audio
    # so we take an approach where we start fetching the announcement in the background
    # while we can already start playing the pre-announce sound (if any)

    pcm_format = (
        output_format
        if output_format.content_type.is_pcm()
        else AudioFormat(
            sample_rate=output_format.sample_rate,
            content_type=ContentType.PCM_S16LE,
            bit_depth=16,
            channels=output_format.channels,
        )
    )

    async def fetch_announcement() -> None:
        """Decode the announcement into the queue, terminated by a None marker."""
        fmt = announcement_url.rsplit(".", 1)[-1]
        try:
            async for chunk in get_ffmpeg_stream(
                audio_input=announcement_url,
                input_format=AudioFormat(content_type=ContentType.try_parse(fmt)),
                output_format=pcm_format,
                chunk_size=get_chunksize(pcm_format, 1),
            ):
                await announcement_data.put(chunk)
        except Exception:
            # unblock the consumer before propagating the failure, otherwise it
            # keeps padding silence / waiting for the end marker forever
            await announcement_data.put(None)
            raise
        await announcement_data.put(None)  # signal end of stream

    fetch_task = self.mass.create_task(fetch_announcement())

    async def _announcement_stream() -> AsyncGenerator[bytes, None]:
        """Generate the PCM audio stream for the announcement + optional pre-announce."""
        if pre_announce:
            async for chunk in get_ffmpeg_stream(
                audio_input=pre_announce_url,
                input_format=AudioFormat(content_type=ContentType.try_parse(pre_announce_url)),
                output_format=pcm_format,
                chunk_size=get_chunksize(pcm_format, 1),
            ):
                yield chunk
        # pad silence (0.1 seconds at a time) while we're waiting
        # for the announcement to be ready
        while announcement_data.empty():
            yield b"\0" * int(
                pcm_format.sample_rate * (pcm_format.bit_depth / 8) * pcm_format.channels * 0.1
            )
            await asyncio.sleep(0.1)
        # stream announcement
        while True:
            announcement_chunk = await announcement_data.get()
            if announcement_chunk is None:
                break
            yield announcement_chunk

    try:
        if output_format == pcm_format:
            # no need to re-encode, just yield the raw PCM stream
            async for chunk in _announcement_stream():
                yield chunk
        else:
            # stream final announcement in requested output format
            async for chunk in get_ffmpeg_stream(
                audio_input=_announcement_stream(),
                input_format=pcm_format,
                output_format=output_format,
            ):
                yield chunk
    finally:
        # the consumer may stop early (player stopped/disconnected): make sure the
        # background fetch does not stay blocked on the (bounded) queue forever
        if not fetch_task.done():
            fetch_task.cancel()
1277
async def get_plugin_source_stream(
    self,
    plugin_source_id: str,
    output_format: AudioFormat,
    player_id: str,
    player_filter_params: list[str] | None = None,
) -> AsyncGenerator[bytes, None]:
    """
    Get the special plugin source stream.

    Feeds the audio of the plugin source (its custom audio stream or its path,
    depending on the source's stream type) through ffmpeg and yields the result
    in output_format. The source is claimed for player_id while streaming;
    streaming stops as soon as the claim is no longer held by this player and
    the claim is released afterwards if this player still holds it.

    :raises ProviderUnavailableError: no provider is known for the plugin source id.
    """
    provider = cast("PluginProvider", self.mass.get_provider(plugin_source_id))
    if not provider:
        raise ProviderUnavailableError(f"Unknown PluginSource: {plugin_source_id}")

    source = provider.get_source()
    self.logger.debug(
        "Start streaming PluginSource %s to %s using output format %s",
        plugin_source_id,
        player_id,
        output_format,
    )
    # the player controller should have set this already, but just to be sure
    source.in_use_by = player_id

    try:
        # custom sources deliver the audio themselves, all others expose a path
        if source.stream_type == StreamType.CUSTOM:
            raw_input = provider.get_audio_stream(player_id)
        else:
            raw_input = source.path
        ffmpeg_output = get_ffmpeg_stream(
            audio_input=cast("str | AsyncGenerator[bytes, None]", raw_input),
            input_format=source.audio_format,
            output_format=output_format,
            filter_params=player_filter_params,
            extra_input_args=["-y", "-re"],
        )
        async for chunk in ffmpeg_output:
            if source.in_use_by != player_id:
                # another player took over or the stream ended, stop streaming
                break
            yield chunk
    finally:
        self.logger.debug(
            "Finished streaming PluginSource %s to %s", plugin_source_id, player_id
        )
        # prevent race conditions when selecting source
        await asyncio.sleep(1)
        still_owner = source.in_use_by == player_id
        if still_owner:
            # release control
            source.in_use_by = None
1325
async def get_queue_item_stream(
    self,
    queue_item: QueueItem,
    pcm_format: AudioFormat,
    seek_position: int = 0,
    raise_on_error: bool = True,
) -> AsyncGenerator[bytes, None]:
    """
    Get the (PCM) audio stream for a single queue item.

    Builds the ffmpeg filters for the volume normalization mode of the item's
    streamdetails, streams the media (optionally through the buffered media
    stream) and applies an optional 3 second fade-in to the start of the audio.
    When the stream ends, seconds_streamed is stored on the streamdetails and
    the stream is reported to the music provider if it finished or at least
    90 seconds were received.

    :param queue_item: the queue item to stream; must have streamdetails.
    :param pcm_format: PCM format of the produced audio.
    :param seek_position: position (in seconds) to start streaming from.
    :param raise_on_error: when False, an AudioError is logged and swallowed
        (the stream simply ends) instead of being raised.
    :raises AudioError: the media stream failed and raise_on_error is True.
    """
    # collect all arguments for ffmpeg
    streamdetails = queue_item.streamdetails
    assert streamdetails
    filter_params: list[str] = []

    # handle volume normalization
    gain_correct: float | None = None
    if streamdetails.volume_normalization_mode == VolumeNormalizationMode.DYNAMIC:
        # volume normalization using loudnorm filter (in dynamic mode)
        # which also collects the measurement on the fly during playback
        # more info: https://k.ylo.ph/2016/04/04/loudnorm.html
        filter_rule = f"loudnorm=I={streamdetails.target_loudness}:TP=-2.0:LRA=10.0:offset=0.0"
        filter_rule += ":print_format=json"
        filter_params.append(filter_rule)
    elif streamdetails.volume_normalization_mode == VolumeNormalizationMode.FIXED_GAIN:
        # apply user defined fixed volume/gain correction
        config_key = (
            CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS
            if streamdetails.media_type == MediaType.TRACK
            else CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO
        )
        gain_value = await self.mass.config.get_core_config_value(
            self.domain, config_key, default=0.0, return_type=float
        )
        gain_correct = round(gain_value, 2)
        filter_params.append(f"volume={gain_correct}dB")
    elif streamdetails.volume_normalization_mode == VolumeNormalizationMode.MEASUREMENT_ONLY:
        # volume normalization with known loudness measurement
        # apply volume/gain correction
        target_loudness = (
            float(streamdetails.target_loudness)
            if streamdetails.target_loudness is not None
            else 0.0
        )
        if streamdetails.prefer_album_loudness and streamdetails.loudness_album is not None:
            gain_correct = target_loudness - float(streamdetails.loudness_album)
        elif streamdetails.loudness is not None:
            gain_correct = target_loudness - float(streamdetails.loudness)
        else:
            # no measurement known: do not correct
            gain_correct = 0.0
        gain_correct = round(gain_correct, 2)
        filter_params.append(f"volume={gain_correct}dB")
    streamdetails.volume_normalization_gain_correct = gain_correct

    # buffering is only used when enabled in the config AND the duration is known
    allow_buffer = bool(
        self.mass.config.get_raw_core_config_value(
            self.domain, CONF_ALLOW_BUFFER, CONF_ALLOW_BUFFER_DEFAULT
        )
        and streamdetails.duration
    )

    self.logger.debug(
        "Starting queue item stream for %s (%s)"
        " - using buffer: %s"
        " - using fade-in: %s"
        " - using volume normalization: %s",
        queue_item.name,
        streamdetails.uri,
        allow_buffer,
        streamdetails.fade_in,
        streamdetails.volume_normalization_mode,
    )
    # both helpers take the same arguments, only the buffering differs
    create_media_stream = get_buffered_media_stream if allow_buffer else get_media_stream
    media_stream_gen = create_media_stream(
        self.mass,
        streamdetails=streamdetails,
        pcm_format=pcm_format,
        seek_position=int(seek_position),
        filter_params=filter_params,
    )

    async def _with_fade_in(pcm_data: bytes) -> AsyncGenerator[bytes, None]:
        """Yield the given PCM audio with a 3 second fade-in applied by ffmpeg."""
        async for fade_chunk in get_ffmpeg_stream(
            # NOTE: get_ffmpeg_stream signature says str | AsyncGenerator
            # but FFMpeg class actually accepts bytes too. This works at
            # runtime but needs type: ignore for mypy.
            audio_input=pcm_data,  # type: ignore[arg-type]
            input_format=pcm_format,
            output_format=pcm_format,
            filter_params=["afade=type=in:start_time=0:duration=3"],
        ):
            yield fade_chunk

    first_chunk_received = False
    fade_in_buffer = b""
    bytes_received = 0
    finished = False
    loop = asyncio.get_running_loop()
    stream_started_at = loop.time()
    try:
        async for chunk in media_stream_gen:
            bytes_received += len(chunk)
            if not first_chunk_received:
                first_chunk_received = True
                self.logger.debug(
                    "First audio chunk received for %s (%s) after %.2f seconds",
                    queue_item.name,
                    streamdetails.uri,
                    loop.time() - stream_started_at,
                )
            # handle optional fade-in
            if streamdetails.fade_in:
                if len(fade_in_buffer) < pcm_format.pcm_sample_size * 4:
                    # collect (at least) 4 seconds of audio before fading
                    fade_in_buffer += chunk
                elif fade_in_buffer:
                    async for fade_chunk in _with_fade_in(fade_in_buffer + chunk):
                        yield fade_chunk
                    fade_in_buffer = b""
                    streamdetails.fade_in = False
            else:
                yield chunk
            # help garbage collection by explicitly deleting chunk
            del chunk
        if fade_in_buffer:
            # the stream ended while we were still collecting audio for the fade-in
            # (very short item or a seek close to the end): send what we have
            # instead of silently dropping it
            async for fade_chunk in _with_fade_in(fade_in_buffer):
                yield fade_chunk
            fade_in_buffer = b""
            streamdetails.fade_in = False
        finished = True
    except AudioError as err:
        streamdetails.stream_error = True
        queue_item.available = False
        if raise_on_error:
            raise
        # yes, we swallow the error here after logging it
        # so the outer stream can handle it gracefully
        self.logger.error(
            "AudioError while streaming queue item %s (%s): %s",
            queue_item.name,
            streamdetails.uri,
            err,
        )
    finally:
        # determine how many seconds we've streamed
        # for pcm output we can calculate this easily
        seconds_streamed = bytes_received / pcm_format.pcm_sample_size
        streamdetails.seconds_streamed = seconds_streamed
        self.logger.debug(
            "stream %s for %s in %.2f seconds - seconds streamed/buffered: %.2f",
            "aborted" if not finished else "finished",
            streamdetails.uri,
            loop.time() - stream_started_at,
            seconds_streamed,
        )
        # report stream to provider
        if (finished or seconds_streamed >= 90) and (
            music_prov := self.mass.get_provider(streamdetails.provider)
        ):
            if TYPE_CHECKING:  # avoid circular import
                assert isinstance(music_prov, MusicProvider)
            self.mass.create_task(music_prov.on_streamed(streamdetails))
1483
1484 @use_buffer(buffer_size=30, min_buffer_before_yield=2)
1485 async def get_queue_item_stream_with_smartfade(
1486 self,
1487 queue_item: QueueItem,
1488 pcm_format: AudioFormat,
1489 smart_fades_mode: SmartFadesMode = SmartFadesMode.SMART_CROSSFADE,
1490 standard_crossfade_duration: int = 10,
1491 ) -> AsyncGenerator[bytes, None]:
1492 """Get the audio stream for a single queue item with (smart) crossfade to the next item."""
1493 queue = self.mass.player_queues.get(queue_item.queue_id)
1494 if not queue:
1495 raise RuntimeError(f"Queue {queue_item.queue_id} not found")
1496
1497 streamdetails = queue_item.streamdetails
1498 assert streamdetails
1499 crossfade_data = self._crossfade_data.pop(queue.queue_id, None)
1500
1501 if crossfade_data and streamdetails.seek_position > 0:
1502 # don't do crossfade when seeking into track
1503 crossfade_data = None
1504 if crossfade_data and (crossfade_data.queue_item_id != queue_item.queue_item_id):
1505 # edge case alert: the next item changed just while we were preloading/crossfading
1506 self.logger.warning(
1507 "Skipping crossfade data for queue %s - next item changed!", queue.display_name
1508 )
1509 crossfade_data = None
1510
1511 self.logger.debug(
1512 "Start Streaming queue track: %s (%s) for queue %s "
1513 "- crossfade mode: %s "
1514 "- crossfading from previous track: %s ",
1515 queue_item.streamdetails.uri if queue_item.streamdetails else "Unknown URI",
1516 queue_item.name,
1517 queue.display_name,
1518 smart_fades_mode,
1519 "true" if crossfade_data else "false",
1520 )
1521
1522 buffer = b""
1523 bytes_written = 0
1524 # calculate crossfade buffer size
1525 crossfade_buffer_duration = (
1526 SMART_CROSSFADE_DURATION
1527 if smart_fades_mode == SmartFadesMode.SMART_CROSSFADE
1528 else standard_crossfade_duration
1529 )
1530 crossfade_buffer_duration = min(
1531 crossfade_buffer_duration,
1532 int(streamdetails.duration / 2)
1533 if streamdetails.duration
1534 else crossfade_buffer_duration,
1535 )
1536 # Ensure crossfade buffer size is aligned to frame boundaries
1537 # Frame size = bytes_per_sample * channels
1538 bytes_per_sample = pcm_format.bit_depth // 8
1539 frame_size = bytes_per_sample * pcm_format.channels
1540 crossfade_buffer_size = int(pcm_format.pcm_sample_size * crossfade_buffer_duration)
1541 # Round down to nearest frame boundary
1542 crossfade_buffer_size = (crossfade_buffer_size // frame_size) * frame_size
1543 fade_out_data: bytes | None = None
1544
1545 if crossfade_data:
1546 # Calculate discard amount in seconds (format-independent)
1547 # Use fade_in_pcm_format because fade_in_size is in the next track's original format
1548 fade_in_duration_seconds = (
1549 crossfade_data.fade_in_size / crossfade_data.fade_in_pcm_format.pcm_sample_size
1550 )
1551 discard_seconds = int(fade_in_duration_seconds) - 1
1552 # Calculate discard amounts in CURRENT track's format
1553 discard_bytes = int(discard_seconds * pcm_format.pcm_sample_size)
1554 # Convert fade_in_size to current track's format for correct leftover calculation
1555 fade_in_size_in_current_format = int(
1556 fade_in_duration_seconds * pcm_format.pcm_sample_size
1557 )
1558 discard_leftover = fade_in_size_in_current_format - discard_bytes
1559 else:
1560 discard_seconds = streamdetails.seek_position
1561 discard_leftover = 0
1562 total_chunks_received = 0
1563 req_buffer_size = crossfade_buffer_size
1564 async for chunk in self.get_queue_item_stream(
1565 queue_item, pcm_format, seek_position=discard_seconds
1566 ):
1567 total_chunks_received += 1
1568 if discard_leftover:
1569 # discard leftover bytes from crossfade data
1570 chunk = chunk[discard_leftover:] # noqa: PLW2901
1571 discard_leftover = 0
1572
1573 if total_chunks_received < 10:
1574 # we want a stream to start as quickly as possible
1575 # so for the first 10 chunks we keep a very short buffer
1576 req_buffer_size = pcm_format.pcm_sample_size
1577 else:
1578 req_buffer_size = crossfade_buffer_size
1579
1580 # ALWAYS APPEND CHUNK TO BUFFER
1581 buffer += chunk
1582 del chunk
1583 if len(buffer) < req_buffer_size:
1584 # buffer is not full enough, move on
1585 continue
1586
1587 #### HANDLE CROSSFADE DATA FROM PREVIOUS TRACK
1588 if crossfade_data:
1589 # send the (second half of the) crossfade data
1590 if crossfade_data.pcm_format != pcm_format:
1591 # edge case: pcm format mismatch, we need to resample
1592 self.logger.debug(
1593 "Resampling crossfade data from %s to %s for queue %s",
1594 crossfade_data.pcm_format.sample_rate,
1595 pcm_format.sample_rate,
1596 queue.display_name,
1597 )
1598 resampled_data = await resample_pcm_audio(
1599 crossfade_data.data,
1600 crossfade_data.pcm_format,
1601 pcm_format,
1602 )
1603 if resampled_data:
1604 for _chunk in divide_chunks(resampled_data, pcm_format.pcm_sample_size):
1605 yield _chunk
1606 bytes_written += len(resampled_data)
1607 else:
1608 # Resampling failed, error already logged in resample_pcm_audio
1609 # Skip crossfade data entirely - stream continues without it
1610 self.logger.warning(
1611 "Skipping crossfade data for queue %s due to resampling failure",
1612 queue.display_name,
1613 )
1614 else:
1615 for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size):
1616 yield _chunk
1617 bytes_written += len(crossfade_data.data)
1618 # clear vars
1619 crossfade_data = None
1620
1621 #### OTHER: enough data in buffer, feed to output
1622 while len(buffer) > req_buffer_size:
1623 yield buffer[: pcm_format.pcm_sample_size]
1624 bytes_written += pcm_format.pcm_sample_size
1625 buffer = buffer[pcm_format.pcm_sample_size :]
1626
1627 #### HANDLE END OF TRACK
1628
1629 if crossfade_data:
1630 # edge case: we did not get enough data to send the crossfade data
1631 # send the (second half of the) crossfade data
1632 if crossfade_data.pcm_format != pcm_format:
1633 # (yet another) edge case: pcm format mismatch, we need to resample
1634 self.logger.debug(
1635 "Resampling remaining crossfade data from %s to %s for queue %s",
1636 crossfade_data.pcm_format.sample_rate,
1637 pcm_format.sample_rate,
1638 queue.display_name,
1639 )
1640 resampled_crossfade_data = await resample_pcm_audio(
1641 crossfade_data.data,
1642 crossfade_data.pcm_format,
1643 pcm_format,
1644 )
1645 if resampled_crossfade_data:
1646 crossfade_data.data = resampled_crossfade_data
1647 else:
1648 # Resampling failed, error already logged in resample_pcm_audio
1649 # Skip the crossfade data entirely
1650 self.logger.warning(
1651 "Skipping remaining crossfade data for queue %s due to resampling failure",
1652 queue.display_name,
1653 )
1654 crossfade_data = None
1655 if crossfade_data:
1656 for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size):
1657 yield _chunk
1658 bytes_written += len(crossfade_data.data)
1659 crossfade_data = None
1660
1661 # get next track for crossfade
1662 next_queue_item: QueueItem | None
1663 try:
1664 self.logger.debug(
1665 "Preloading NEXT track for crossfade for queue %s",
1666 queue.display_name,
1667 )
1668 next_queue_item = await self.mass.player_queues.load_next_queue_item(
1669 queue.queue_id, queue_item.queue_item_id
1670 )
1671 # set index_in_buffer to prevent our next track is overwritten while preloading
1672 if next_queue_item.streamdetails is None:
1673 raise InvalidDataError(
1674 f"No streamdetails for next queue item {next_queue_item.queue_item_id}"
1675 )
1676 queue.index_in_buffer = self.mass.player_queues.index_by_id(
1677 queue.queue_id, next_queue_item.queue_item_id
1678 )
1679 queue_player = self.mass.players.get(queue.queue_id)
1680 assert queue_player is not None
1681 next_queue_item_pcm_format = await self._select_pcm_format(
1682 player=queue_player,
1683 streamdetails=next_queue_item.streamdetails,
1684 smartfades_enabled=True,
1685 )
1686 except QueueEmpty:
1687 # end of queue reached, no next item
1688 next_queue_item = None
1689
1690 if not next_queue_item or not self._crossfade_allowed(
1691 queue_item,
1692 smart_fades_mode=smart_fades_mode,
1693 flow_mode=False,
1694 next_queue_item=next_queue_item,
1695 sample_rate=pcm_format.sample_rate,
1696 next_sample_rate=next_queue_item_pcm_format.sample_rate,
1697 ):
1698 # no crossfade enabled/allowed, just yield the buffer last part
1699 bytes_written += len(buffer)
1700 for _chunk in divide_chunks(buffer, pcm_format.pcm_sample_size):
1701 yield _chunk
1702 else:
1703 # if crossfade is enabled, save fadeout part in buffer to pickup for next track
1704 fade_out_data = buffer
1705 buffer = b""
1706 try:
1707 async for chunk in self.get_queue_item_stream(
1708 next_queue_item, next_queue_item_pcm_format
1709 ):
1710 # append to buffer until we reach crossfade size
1711 # we only need the first X seconds of the NEXT track so we can
1712 # perform the crossfade.
1713 # the crossfaded audio of the previous and next track will be
1714 # sent in two equal parts: first half now, second half
1715 # when the next track starts. We use CrossfadeData to store
1716 # the second half to be picked up by the next track's stream generator.
1717 # Note that we more or less expect the user to have enabled the in-memory
1718 # buffer so we can keep the next track's audio data in memory.
1719 buffer += chunk
1720 del chunk
1721 if len(buffer) >= crossfade_buffer_size:
1722 break
1723 #### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
1724 # Store original buffer size before any resampling for fade_in_size calculation
1725 # This size is in the next track's original format which is what we need
1726 original_buffer_size = len(buffer)
1727 if next_queue_item_pcm_format != pcm_format:
1728 # edge case: pcm format mismatch, we need to resample the next track's
1729 # beginning part before crossfading
1730 self.logger.debug(
1731 "Resampling next track's crossfade from %s to %s for queue %s",
1732 next_queue_item_pcm_format.sample_rate,
1733 pcm_format.sample_rate,
1734 queue.display_name,
1735 )
1736 buffer = await resample_pcm_audio(
1737 buffer,
1738 next_queue_item_pcm_format,
1739 pcm_format,
1740 )
1741 # perform actual (smart fades) crossfade using mixer
1742 crossfade_bytes = await self._smart_fades_mixer.mix(
1743 fade_in_part=buffer,
1744 fade_out_part=fade_out_data,
1745 fade_in_streamdetails=cast("StreamDetails", next_queue_item.streamdetails),
1746 fade_out_streamdetails=streamdetails,
1747 pcm_format=pcm_format,
1748 standard_crossfade_duration=standard_crossfade_duration,
1749 mode=smart_fades_mode,
1750 )
1751 # send half of the crossfade_part (= approx the fadeout part)
1752 split_point = (len(crossfade_bytes) + 1) // 2
1753 crossfade_first = crossfade_bytes[:split_point]
1754 crossfade_second = crossfade_bytes[split_point:]
1755 del crossfade_bytes
1756 bytes_written += len(crossfade_first)
1757 for _chunk in divide_chunks(crossfade_first, pcm_format.pcm_sample_size):
1758 yield _chunk
1759 # store the other half for the next track
1760 # IMPORTANT: crossfade_second data is in CURRENT track's format (pcm_format)
1761 # because it was created from the resampled buffer used for mixing.
1762 # BUT fade_in_size represents bytes in NEXT track's original format
1763 # (next_queue_item_pcm_format) because that's how much of the next track
1764 # was consumed during the crossfade. We need both formats to correctly
1765 # handle the crossfade data when the next track starts.
1766 self._crossfade_data[queue_item.queue_id] = CrossfadeData(
1767 data=crossfade_second,
1768 fade_in_size=original_buffer_size,
1769 pcm_format=pcm_format, # Format of the data (current track)
1770 fade_in_pcm_format=next_queue_item_pcm_format, # Format for fade_in_size
1771 queue_item_id=next_queue_item.queue_item_id,
1772 )
1773 except AudioError:
1774 # no crossfade possible, just yield the fade_out_data
1775 next_queue_item = None
1776 yield fade_out_data
1777 bytes_written += len(fade_out_data)
1778 del fade_out_data
1779 # make sure the buffer gets cleaned up
1780 del buffer
1781 # update duration details based on the actual pcm data we sent
1782 # this also accounts for crossfade and silence stripping
1783 seconds_streamed = bytes_written / pcm_format.pcm_sample_size
1784 streamdetails.seconds_streamed = seconds_streamed
1785 streamdetails.duration = int(streamdetails.seek_position + seconds_streamed)
1786 self.logger.debug(
1787 "Finished Streaming queue track: %s (%s) on queue %s "
1788 "- crossfade data prepared for next track: %s",
1789 streamdetails.uri,
1790 queue_item.name,
1791 queue.display_name,
1792 next_queue_item.name if next_queue_item else "N/A",
1793 )
1794
def _log_request(self, request: web.Request) -> None:
    """Write an incoming HTTP request to the log.

    With the verbose log level enabled, the request headers are logged along
    with method, path and remote address. Otherwise a shorter line (without
    headers) is emitted at debug level.

    :param request: The aiohttp request to describe in the log.
    """
    if not self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
        # regular logging: keep it short, no headers
        self.logger.debug(
            "Got %s request to %s from %s",
            request.method,
            request.path,
            request.remote,
        )
        return
    # verbose logging is active: include the full request headers
    self.logger.log(
        VERBOSE_LOG_LEVEL,
        "Got %s request to %s from %s\nheaders: %s\n",
        request.method,
        request.path,
        request.remote,
        request.headers,
    )
1813
async def get_output_format(
    self,
    output_format_str: str,
    player: Player,
    content_sample_rate: int,
    content_bit_depth: int,
) -> AudioFormat:
    """Build the (player specific) output AudioFormat for the given format string.

    :param output_format_str: Requested output format (e.g. "pcm"), parsed into a ContentType.
    :param player: Player whose sample rate / channel configuration is consulted.
    :param content_sample_rate: Sample rate of the content that will be played.
    :param content_bit_depth: Bit depth of the content that will be played.
    :return: AudioFormat with content type, sample rate, bit depth and channel count.
    """
    content_type: ContentType = ContentType.try_parse(output_format_str)
    rates_conf = cast(
        "list[tuple[str, str]]",
        await self.mass.config.get_player_config_value(
            player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
        ),
    )
    channels_conf = self.mass.config.get_raw_player_config_value(
        player.player_id, CONF_OUTPUT_CHANNELS, "stereo"
    )
    # each config entry is a (sample_rate, bit_depth) pair of strings
    player_sample_rates = tuple(int(entry[0]) for entry in rates_conf)
    player_bit_depths = tuple(int(entry[1]) for entry in rates_conf)

    # never exceed the highest bit depth the player is configured for
    bit_depth = min(content_bit_depth, max(player_bit_depths))
    # keep the content's sample rate when configured for the player,
    # otherwise fall back to the highest configured rate
    sample_rate = (
        content_sample_rate
        if content_sample_rate in player_sample_rates
        else max(player_sample_rates)
    )

    if not content_type.is_lossless():
        # no point in having a higher bit depth for lossy formats
        bit_depth = 16
        sample_rate = min(48000, sample_rate)
    if output_format_str == "pcm":
        # derive the concrete PCM content type from the chosen bit depth
        content_type = ContentType.from_bit_depth(bit_depth)

    # anything other than "stereo" results in a single channel
    channel_count = 2 if channels_conf == "stereo" else 1
    return AudioFormat(
        content_type=content_type,
        sample_rate=sample_rate,
        bit_depth=bit_depth,
        channels=channel_count,
    )
1854
async def _select_flow_format(
    self,
    player: Player,
) -> AudioFormat:
    """Determine the (player specific) PCM format for a flow stream.

    The first of 192000, 96000, 48000 and 44100 Hz that is present in the
    player's configured sample rates is used; when none is present the sample
    rate of INTERNAL_PCM_FORMAT is used. Content type and bit depth are taken
    from INTERNAL_PCM_FORMAT and the result always has 2 channels.

    :param player: Player whose sample rate configuration is consulted.
    :return: The AudioFormat to use for the flow stream.
    """
    rates_conf = cast(
        "list[tuple[str, str]]",
        await self.mass.config.get_player_config_value(
            player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
        ),
    )
    player_sample_rates = tuple(int(entry[0]) for entry in rates_conf)
    # candidates are tried from high to low, first match wins
    preferred_rates = (192000, 96000, 48000, 44100)
    flow_sample_rate = next(
        (rate for rate in preferred_rates if rate in player_sample_rates),
        INTERNAL_PCM_FORMAT.sample_rate,
    )
    return AudioFormat(
        content_type=INTERNAL_PCM_FORMAT.content_type,
        sample_rate=flow_sample_rate,
        bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
        channels=2,
    )
1878
async def _select_pcm_format(
    self,
    player: Player,
    streamdetails: StreamDetails,
    smartfades_enabled: bool,
) -> AudioFormat:
    """Determine the (player specific) internal PCM format for a stream.

    :param player: Player whose sample rate configuration is consulted.
    :param streamdetails: Stream details providing the content's sample rate and channels.
    :param smartfades_enabled: When True the resulting format is forced to 2 channels.
    :return: The AudioFormat to use internally for this stream.
    """
    rates_conf = cast(
        "list[tuple[str, str]]",
        await self.mass.config.get_player_config_value(
            player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
        ),
    )
    player_sample_rates = tuple(int(entry[0]) for entry in rates_conf)
    # pick the highest configured rate that does not exceed the content's rate
    content_rate = streamdetails.audio_format.sample_rate
    usable_rates = [rate for rate in player_sample_rates if rate <= content_rate]
    if usable_rates:
        selected_rate = max(usable_rates)
    else:
        selected_rate = 48000  # sane/safe default
    # content type and bit depth always come from the internal PCM format
    # (extra headroom for filters etc), channels follow the stream details
    result = AudioFormat(
        sample_rate=selected_rate,
        content_type=INTERNAL_PCM_FORMAT.content_type,
        bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
        channels=streamdetails.audio_format.channels,
    )
    if smartfades_enabled:
        # force stereo for crossfading (set after construction on purpose,
        # exactly like the channel override has always been applied here)
        result.channels = 2

    return result
1910
def _crossfade_allowed(
    self,
    queue_item: QueueItem,
    smart_fades_mode: SmartFadesMode,
    flow_mode: bool = False,
    next_queue_item: QueueItem | None = None,
    sample_rate: int | None = None,
    next_sample_rate: int | None = None,
) -> bool:
    """Return whether crossfading from the given queue item into the next one is allowed.

    :param queue_item: The queue item that is currently being streamed.
    :param smart_fades_mode: Active smart fades mode; DISABLED always yields False.
    :param flow_mode: When True, the sample rate comparison is skipped.
    :param next_queue_item: The upcoming item; looked up from the queue when omitted.
    :param sample_rate: Sample rate of the current item (optional).
    :param next_sample_rate: Sample rate of the upcoming item (optional).
    :return: True when none of the checks below rules the crossfade out.
    """
    if smart_fades_mode == SmartFadesMode.DISABLED:
        return False
    if not self.mass.players.get(queue_item.queue_id):
        return False  # just a guard
    if queue_item.media_type != MediaType.TRACK:
        self.logger.debug("Skipping crossfade: current item is not a track")
        return False

    # resolve the upcoming item (explicitly given or looked up in the queue)
    upcoming = next_queue_item or self.mass.player_queues.get_next_item(
        queue_item.queue_id, queue_item.queue_item_id
    )
    if not upcoming:
        # there is no next item!
        return False
    if upcoming.media_type != MediaType.TRACK:
        self.logger.debug("Skipping crossfade: next item is not a track")
        return False

    # check if the next item is part of the same album
    current_media = queue_item.media_item
    upcoming_media = upcoming.media_item
    same_album = (
        isinstance(current_media, Track)
        and isinstance(upcoming_media, Track)
        and current_media.album
        and upcoming_media.album
        and current_media.album == upcoming_media.album
    )
    if same_album and not self.mass.config.get_raw_core_config_value(
        self.domain, CONF_ALLOW_CROSSFADE_SAME_ALBUM, False
    ):
        # in general, crossfade is not desired for tracks of the same (gapless) album
        # because we have no accurate way to determine if the album is gapless or not,
        # for now we just never crossfade between tracks of the same album
        self.logger.debug("Skipping crossfade: next item is part of the same album")
        return False

    # check if we're allowed to crossfade on different sample rates
    # (only relevant outside flow mode and when both rates are known)
    rates_differ = (
        not flow_mode and sample_rate and next_sample_rate and sample_rate != next_sample_rate
    )
    if rates_differ and not self.mass.config.get_raw_player_config_value(
        queue_item.queue_id,
        CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES.key,
        CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES.default_value,
    ):
        self.logger.debug(
            "Skipping crossfade: player does not support gapless playback "
            "with different sample rates (%s vs %s)",
            sample_rate,
            next_sample_rate,
        )
        return False

    return True
1976
async def _periodic_garbage_collection(self) -> None:
    """Run a garbage collection pass and schedule the next one.

    The collection is executed in the event loop's default executor and the
    number of collected objects is logged at the verbose log level. Afterwards
    this method re-schedules itself through ``self.mass.call_later``.
    """
    rerun_after_seconds = 900  # 15 minutes
    self.logger.log(VERBOSE_LOG_LEVEL, "Running periodic garbage collection...")
    # Run garbage collection in executor to avoid blocking the event loop
    # Since this runs periodically (not in response to subprocess cleanup),
    # it's safe to run in a thread without causing thread-safety issues
    event_loop = asyncio.get_running_loop()
    num_collected = await event_loop.run_in_executor(None, gc.collect)
    self.logger.log(
        VERBOSE_LOG_LEVEL,
        "Garbage collection completed, collected %d objects",
        num_collected,
    )
    # Schedule next run
    self.mass.call_later(rerun_after_seconds, self._periodic_garbage_collection)
1995
def _setup_smart_fades_logger(self, config: CoreConfig) -> None:
    """Apply the configured log level to the smart fades analyzer and mixer loggers.

    The configured value "GLOBAL" means: use the level of this controller's
    own logger. Any other value is passed to ``setLevel`` as-is.

    :param config: Core config from which CONF_SMART_FADES_LOG_LEVEL is read.
    """
    configured_level = str(config.get_value(CONF_SMART_FADES_LOG_LEVEL))
    # resolve the level once, then apply it to both smart fades loggers
    target_level = self.logger.level if configured_level == "GLOBAL" else configured_level
    self.smart_fades_analyzer.logger.setLevel(target_level)
    self.smart_fades_mixer.logger.setLevel(target_level)
2005