/
/
/
1"""
2Controller to stream audio to players.
3
4The streams controller hosts a basic, unprotected HTTP-only webserver
5purely to stream audio packets to players and some control endpoints such as
6the upnp callbacks and json rpc api for slimproto clients.
7"""
8
9from __future__ import annotations
10
11import asyncio
12import gc
13import logging
14import os
15import urllib.parse
16from collections.abc import AsyncGenerator
17from dataclasses import dataclass
18from typing import TYPE_CHECKING, Final, cast
19
20from aiofiles.os import wrap
21from aiohttp import web
22from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
23from music_assistant_models.enums import (
24 ConfigEntryType,
25 ContentType,
26 MediaType,
27 PlayerFeature,
28 StreamType,
29 VolumeNormalizationMode,
30)
31from music_assistant_models.errors import (
32 AudioError,
33 InvalidDataError,
34 ProviderUnavailableError,
35 QueueEmpty,
36)
37from music_assistant_models.media_items import AudioFormat, Track
38from music_assistant_models.player_queue import PlayLogEntry
39
40from music_assistant.constants import (
41 ANNOUNCE_ALERT_FILE,
42 CONF_BIND_IP,
43 CONF_BIND_PORT,
44 CONF_CROSSFADE_DURATION,
45 CONF_ENTRY_ENABLE_ICY_METADATA,
46 CONF_ENTRY_LOG_LEVEL,
47 CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES,
48 CONF_HTTP_PROFILE,
49 CONF_OUTPUT_CHANNELS,
50 CONF_OUTPUT_CODEC,
51 CONF_PUBLISH_IP,
52 CONF_SAMPLE_RATES,
53 CONF_SMART_FADES_MODE,
54 CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO,
55 CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
56 CONF_VOLUME_NORMALIZATION_RADIO,
57 CONF_VOLUME_NORMALIZATION_TRACKS,
58 DEFAULT_STREAM_HEADERS,
59 ICY_HEADERS,
60 INTERNAL_PCM_FORMAT,
61 SILENCE_FILE,
62 VERBOSE_LOG_LEVEL,
63)
64from music_assistant.controllers.players.helpers import AnnounceData
65from music_assistant.controllers.streams.smart_fades import SmartFadesMixer
66from music_assistant.controllers.streams.smart_fades.analyzer import SmartFadesAnalyzer
67from music_assistant.controllers.streams.smart_fades.fades import SMART_CROSSFADE_DURATION
68from music_assistant.helpers.audio import LOGGER as AUDIO_LOGGER
69from music_assistant.helpers.audio import (
70 get_buffered_media_stream,
71 get_chunksize,
72 get_media_stream,
73 get_player_filter_params,
74 get_stream_details,
75 resample_pcm_audio,
76)
77from music_assistant.helpers.buffered_generator import buffered, use_buffer
78from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
79from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream
80from music_assistant.helpers.util import (
81 divide_chunks,
82 format_ip_for_url,
83 get_ip_addresses,
84 get_total_system_memory,
85)
86from music_assistant.helpers.webserver import Webserver
87from music_assistant.models.core_controller import CoreController
88from music_assistant.models.music_provider import MusicProvider
89from music_assistant.models.plugin import PluginProvider, PluginSource
90from music_assistant.models.smart_fades import SmartFadesMode
91from music_assistant.providers.universal_group.constants import UGP_PREFIX
92from music_assistant.providers.universal_group.player import UniversalGroupPlayer
93
94if TYPE_CHECKING:
95 from music_assistant_models.config_entries import CoreConfig
96 from music_assistant_models.player import PlayerMedia
97 from music_assistant_models.player_queue import PlayerQueue
98 from music_assistant_models.queue_item import QueueItem
99 from music_assistant_models.streamdetails import StreamDetails
100
101 from music_assistant.mass import MusicAssistant
102 from music_assistant.models.player import Player
103
104
# Async variant of the blocking os.path.isfile check (created with aiofiles' ``wrap`` helper)
isfile = wrap(os.path.isfile)

# Keys of the config entries that are declared by the streams controller itself
CONF_ALLOW_BUFFER: Final[str] = "allow_buffering"
CONF_ALLOW_CROSSFADE_SAME_ALBUM: Final[str] = "allow_crossfade_same_album"
CONF_SMART_FADES_LOG_LEVEL: Final[str] = "smart_fades_log_level"

# Calculate total system memory once at module load time
TOTAL_SYSTEM_MEMORY_GB: Final[float] = get_total_system_memory()
# Default value of the CONF_ALLOW_BUFFER entry: enabled only when 8 (GB) or more is reported
CONF_ALLOW_BUFFER_DEFAULT: Final[bool] = TOTAL_SYSTEM_MEMORY_GB >= 8.0
# Default value of the CONF_BIND_PORT entry (TCP port of the stream server)
DEFAULT_PORT: Final[int] = 8097
115
116
def parse_pcm_info(content_type: str) -> tuple[int, int, int]:
    """Parse PCM info from a codec/content_type string.

    The string is expected to carry ``;``-separated ``key=value`` parameters,
    for example ``pcm;codec=pcm;rate=48000;bitrate=24;channels=2`` or the
    MIME-style ``audio/L16; rate=48000; channels=2``. Parts without a ``=``
    (such as the leading codec/mime type) are ignored.

    :param content_type: The codec/content type string to inspect.
    :return: Tuple of ``(sample_rate, sample_size, channels)``; parameters that
        are absent fall back to 44100 Hz, 16 bit and 2 channels.
    :raises ValueError: If a parameter is present but is not a valid integer.
    """
    params: dict[str, str] = {}
    if ";" in content_type:
        # MIME-style strings usually have whitespace after the separator
        # ("audio/L16; rate=48000"); without stripping, the key would be
        # " rate" and the value would silently fall back to the default.
        params = {
            key.strip(): value
            for key, value in urllib.parse.parse_qsl(content_type.replace(";", "&"))
        }
    sample_rate = int(params.get("rate", 44100))
    # the sample size (bit depth) is carried in the 'bitrate' parameter
    sample_size = int(params.get("bitrate", 16))
    channels = int(params.get("channels", 2))
    return (sample_rate, sample_size, channels)
126
127
@dataclass
class CrossfadeData:
    """Data class to hold crossfade data.

    Instances are kept by the StreamsController in its ``_crossfade_data`` dict.
    """

    # Raw audio bytes, encoded as described by 'pcm_format'
    data: bytes
    # Size of the fade-in part, expressed in terms of 'fade_in_pcm_format'
    # NOTE(review): presumably a number of bytes - confirm against the producer/consumer
    fade_in_size: int
    pcm_format: AudioFormat  # Format of the 'data' bytes (current/previous track's format)
    fade_in_pcm_format: AudioFormat  # Format for 'fade_in_size' (next track's format)
    # Id of the queue item this crossfade data is associated with
    # NOTE(review): whether this is the previous or the next item is not visible here
    queue_item_id: str
137
138
139class StreamsController(CoreController):
140 """Webserver Controller to stream audio to players."""
141
142 domain: str = "streams"
143
144 def __init__(self, mass: MusicAssistant) -> None:
145 """Initialize instance."""
146 super().__init__(mass)
147 self._server = Webserver(self.logger, enable_dynamic_routes=True)
148 self.register_dynamic_route = self._server.register_dynamic_route
149 self.unregister_dynamic_route = self._server.unregister_dynamic_route
150 self.manifest.name = "Streamserver"
151 self.manifest.description = (
152 "Music Assistant's core controller that is responsible for "
153 "streaming audio to players on the local network."
154 )
155 self.manifest.icon = "cast-audio"
156 self.announcements: dict[str, AnnounceData] = {}
157 self._crossfade_data: dict[str, CrossfadeData] = {}
158 self._bind_ip: str = "0.0.0.0"
159 self._smart_fades_mixer = SmartFadesMixer(self)
160 self._smart_fades_analyzer = SmartFadesAnalyzer(self)
161
162 @property
163 def base_url(self) -> str:
164 """Return the base_url for the streamserver."""
165 return self._server.base_url
166
167 @property
168 def bind_ip(self) -> str:
169 """Return the IP address this streamserver is bound to."""
170 return self._bind_ip
171
172 @property
173 def smart_fades_mixer(self) -> SmartFadesMixer:
174 """Return the SmartFadesMixer instance."""
175 return self._smart_fades_mixer
176
177 @property
178 def smart_fades_analyzer(self) -> SmartFadesAnalyzer:
179 """Return the SmartFadesAnalyzer instance."""
180 return self._smart_fades_analyzer
181
182 async def get_config_entries(
183 self,
184 action: str | None = None,
185 values: dict[str, ConfigValueType] | None = None,
186 ) -> tuple[ConfigEntry, ...]:
187 """Return all Config Entries for this core module (if any)."""
188 ip_addresses = await get_ip_addresses(include_ipv6=True)
189 return (
190 ConfigEntry(
191 key=CONF_ALLOW_BUFFER,
192 type=ConfigEntryType.BOOLEAN,
193 default_value=CONF_ALLOW_BUFFER_DEFAULT,
194 label="Allow (in-memory) buffering of (track) audio",
195 description="By default, Music Assistant tries to be as resource "
196 "efficient as possible when streaming audio, especially considering "
197 "low-end devices such as Raspberry Pi's. This means that audio "
198 "buffering is disabled by default to reduce memory usage. \n\n"
199 "Enabling this option allows for in-memory buffering of audio, "
200 "which (massively) improves playback (and seeking) performance but it comes "
201 "at the cost of increased memory usage. "
202 "If you run Music Assistant on a capable device with enough memory, "
203 "enabling this option is strongly recommended.",
204 required=False,
205 category="playback",
206 ),
207 ConfigEntry(
208 key=CONF_VOLUME_NORMALIZATION_RADIO,
209 type=ConfigEntryType.STRING,
210 default_value=VolumeNormalizationMode.FALLBACK_FIXED_GAIN,
211 label="Volume normalization method for radio streams",
212 options=[
213 ConfigValueOption(x.value.replace("_", " ").title(), x.value)
214 for x in VolumeNormalizationMode
215 ],
216 category="playback",
217 ),
218 ConfigEntry(
219 key=CONF_VOLUME_NORMALIZATION_TRACKS,
220 type=ConfigEntryType.STRING,
221 default_value=VolumeNormalizationMode.FALLBACK_DYNAMIC,
222 label="Volume normalization method for tracks",
223 options=[
224 ConfigValueOption(x.value.replace("_", " ").title(), x.value)
225 for x in VolumeNormalizationMode
226 ],
227 category="playback",
228 ),
229 ConfigEntry(
230 key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO,
231 type=ConfigEntryType.FLOAT,
232 range=(-20, 10),
233 default_value=-6,
234 label="Fixed/fallback gain adjustment for radio streams",
235 category="playback",
236 ),
237 ConfigEntry(
238 key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
239 type=ConfigEntryType.FLOAT,
240 range=(-20, 10),
241 default_value=-6,
242 label="Fixed/fallback gain adjustment for tracks",
243 category="playback",
244 ),
245 ConfigEntry(
246 key=CONF_ALLOW_CROSSFADE_SAME_ALBUM,
247 type=ConfigEntryType.BOOLEAN,
248 default_value=False,
249 label="Allow crossfade between tracks from the same album",
250 description="Enabling this option allows for crossfading between tracks "
251 "that are part of the same album.",
252 category="playback",
253 ),
254 ConfigEntry(
255 key=CONF_PUBLISH_IP,
256 type=ConfigEntryType.STRING,
257 default_value=ip_addresses[0],
258 label="Published IP address",
259 description="This IP address is communicated to players where to find this server."
260 "\nMake sure that this IP can be reached by players on the local network, "
261 "otherwise audio streaming will not work.",
262 required=False,
263 category="generic",
264 advanced=True,
265 requires_reload=True,
266 ),
267 ConfigEntry(
268 key=CONF_BIND_PORT,
269 type=ConfigEntryType.INTEGER,
270 default_value=DEFAULT_PORT,
271 label="TCP Port",
272 description="The TCP port to run the server. "
273 "Make sure that this server can be reached "
274 "on the given IP and TCP port by players on the local network.",
275 category="generic",
276 advanced=True,
277 requires_reload=True,
278 ),
279 ConfigEntry(
280 key=CONF_BIND_IP,
281 type=ConfigEntryType.STRING,
282 default_value="0.0.0.0",
283 options=[ConfigValueOption(x, x) for x in {"0.0.0.0", "::", *ip_addresses}],
284 label="Bind to IP/interface",
285 description="Start the stream server on this specific interface. \n"
286 "Use 0.0.0.0 or :: to bind to all interfaces, which is the default. \n"
287 "This is an advanced setting that should normally "
288 "not be adjusted in regular setups.",
289 category="generic",
290 advanced=True,
291 required=False,
292 requires_reload=True,
293 ),
294 ConfigEntry(
295 key=CONF_SMART_FADES_LOG_LEVEL,
296 type=ConfigEntryType.STRING,
297 label="Smart Fades Log level",
298 description="Log level for the Smart Fades mixer and analyzer.",
299 options=CONF_ENTRY_LOG_LEVEL.options,
300 default_value="GLOBAL",
301 category="generic",
302 advanced=True,
303 ),
304 )
305
306 async def setup(self, config: CoreConfig) -> None:
307 """Async initialize of module."""
308 # copy log level to audio/ffmpeg loggers
309 AUDIO_LOGGER.setLevel(self.logger.level)
310 FFMPEG_LOGGER.setLevel(self.logger.level)
311 self._setup_smart_fades_logger(config)
312 # perform check for ffmpeg version
313 await check_ffmpeg_version()
314 # start the webserver
315 self.publish_port = config.get_value(CONF_BIND_PORT, DEFAULT_PORT)
316 self.publish_ip = config.get_value(CONF_PUBLISH_IP)
317 self._bind_ip = bind_ip = str(config.get_value(CONF_BIND_IP))
318 # print a big fat message in the log where the streamserver is running
319 # because this is a common source of issues for people with more complex setups
320 self.logger.log(
321 logging.INFO if self.mass.config.onboard_done else logging.WARNING,
322 "\n\n################################################################################\n"
323 "Starting streamserver on %s:%s\n"
324 "This is the IP address that is communicated to players.\n"
325 "If this is incorrect, audio will not play!\n"
326 "See the documentation how to configure the publish IP for the Streamserver\n"
327 "in Settings --> Core modules --> Streamserver\n"
328 "################################################################################\n",
329 self.publish_ip,
330 self.publish_port,
331 )
332 await self._server.setup(
333 bind_ip=bind_ip,
334 bind_port=cast("int", self.publish_port),
335 base_url=f"http://{format_ip_for_url(str(self.publish_ip))}:{self.publish_port}",
336 static_routes=[
337 (
338 "*",
339 "/flow/{session_id}/{queue_id}/{queue_item_id}/{player_id}.{fmt}",
340 self.serve_queue_flow_stream,
341 ),
342 (
343 "*",
344 "/single/{session_id}/{queue_id}/{queue_item_id}/{player_id}.{fmt}",
345 self.serve_queue_item_stream,
346 ),
347 (
348 "*",
349 "/command/{queue_id}/{command}.mp3",
350 self.serve_command_request,
351 ),
352 (
353 "*",
354 "/announcement/{player_id}.{fmt}",
355 self.serve_announcement_stream,
356 ),
357 (
358 "*",
359 "/pluginsource/{plugin_source}/{player_id}.{fmt}",
360 self.serve_plugin_source_stream,
361 ),
362 ],
363 )
364 # Start periodic garbage collection task
365 # This ensures memory from audio buffers and streams is cleaned up regularly
366 self.mass.call_later(900, self._periodic_garbage_collection) # 15 minutes
367
368 async def close(self) -> None:
369 """Cleanup on exit."""
370 await self._server.close()
371
372 async def resolve_stream_url(
373 self,
374 player_id: str,
375 media: PlayerMedia,
376 ) -> str:
377 """Resolve the stream URL for the given PlayerMedia."""
378 conf_output_codec = await self.mass.config.get_player_config_value(
379 player_id, CONF_OUTPUT_CODEC, default="flac", return_type=str
380 )
381 output_codec = ContentType.try_parse(conf_output_codec or "flac")
382 fmt = output_codec.value
383 # handle raw pcm without exact format specifiers
384 if output_codec.is_pcm() and ";" not in fmt:
385 fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}"
386 extra_data = media.custom_data or {}
387 flow_mode = extra_data.get("flow_mode", False)
388 session_id = extra_data.get("session_id")
389 queue_item_id = media.queue_item_id
390 if not session_id or not queue_item_id:
391 raise InvalidDataError("Can not resolve stream URL: Invalid PlayerMedia data")
392 queue_id = media.source_id
393 base_path = "flow" if flow_mode else "single"
394 return f"{self._server.base_url}/{base_path}/{session_id}/{queue_id}/{queue_item_id}/{player_id}.{fmt}" # noqa: E501
395
396 async def get_plugin_source_url(
397 self,
398 plugin_source: PluginSource,
399 player_id: str,
400 ) -> str:
401 """Get the url for the Plugin Source stream/proxy."""
402 if plugin_source.audio_format.content_type.is_pcm():
403 fmt = ContentType.WAV.value
404 else:
405 fmt = plugin_source.audio_format.content_type.value
406 return f"{self._server.base_url}/pluginsource/{plugin_source.id}/{player_id}.{fmt}"
407
    async def serve_queue_item_stream(self, request: web.Request) -> web.StreamResponse:
        """Stream single queueitem audio to a player.

        Handler for the ``/single/...`` route. The queue, session, player and
        queue item named in the URL are validated first, then the response
        headers are prepared and (for GET requests only) the audio of the queue
        item is streamed through ffmpeg in the format requested in the URL.

        :param request: The incoming request; queue_id, player_id, session_id,
            queue_item_id and fmt are read from its match_info.
        :return: The (prepared) stream response.
        :raises web.HTTPNotFound: If the queue, session, player or queue item is
            unknown, or when no streamdetails could be resolved for the item.
        """
        self._log_request(request)
        queue_id = request.match_info["queue_id"]
        player_id = request.match_info["player_id"]
        if not (queue := self.mass.player_queues.get(queue_id)):
            raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
        session_id = request.match_info["session_id"]
        # the session is only verified when the queue has a session id set
        if queue.session_id and session_id != queue.session_id:
            raise web.HTTPNotFound(reason=f"Unknown (or invalid) session: {session_id}")
        if not (player := self.mass.players.get_player(player_id)):
            raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
        queue_item_id = request.match_info["queue_item_id"]
        queue_item = self.mass.player_queues.get_item(queue_id, queue_item_id)
        if not queue_item:
            raise web.HTTPNotFound(reason=f"Unknown Queue item: {queue_item_id}")
        if not queue_item.streamdetails:
            # streamdetails are not resolved yet, try to resolve them now
            try:
                queue_item.streamdetails = await get_stream_details(
                    mass=self.mass, queue_item=queue_item
                )
            except Exception as e:
                self.logger.error(
                    "Failed to get streamdetails for QueueItem %s: %s", queue_item_id, e
                )
                # flag the item as unavailable and answer with a 404
                queue_item.available = False
                raise web.HTTPNotFound(reason=f"No streamdetails for Queue item: {queue_item_id}")

        # pick output format based on the streamdetails and player capabilities
        pcm_format = await self._select_pcm_format(
            player=player,
            streamdetails=queue_item.streamdetails,
            smartfades_enabled=True,
        )
        output_format = await self.get_output_format(
            output_format_str=request.match_info["fmt"],
            player=player,
            content_sample_rate=pcm_format.sample_rate,
            content_bit_depth=pcm_format.bit_depth,
        )

        # prepare request, add some DLNA/UPNP compatible headers
        # icy-name is sanitized to avoid a "Potential header injection attack" exception by aiohttp
        # see https://github.com/music-assistant/support/issues/4913
        headers = {
            **DEFAULT_STREAM_HEADERS,
            "icy-name": queue_item.name.replace("\n", " ").replace("\r", " ").replace("\t", " "),
            "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01500000000000000000000000000000",  # noqa: E501
            "Accept-Ranges": "none",
            "Content-Type": f"audio/{output_format.output_format_str}",
        }
        resp = web.StreamResponse(
            status=200,
            reason="OK",
            headers=headers,
        )
        resp.content_type = f"audio/{output_format.output_format_str}"
        # the http profile of the player decides how the length of the body is announced
        http_profile = await self.mass.config.get_player_config_value(
            player_id, CONF_HTTP_PROFILE, default="default", return_type=str
        )
        if http_profile == "forced_content_length" and not queue_item.duration:
            # just set an insane high content length to make sure the player keeps playing
            resp.content_length = get_chunksize(output_format, 12 * 3600)
        elif http_profile == "forced_content_length" and queue_item.duration:
            # guess content length based on duration
            resp.content_length = get_chunksize(output_format, queue_item.duration)
        elif http_profile == "chunked":
            resp.enable_chunked_encoding()

        await resp.prepare(request)

        # return early if this is not a GET request
        if request.method != "GET":
            return resp

        if queue_item.media_type != MediaType.TRACK:
            # no crossfade on non-tracks
            smart_fades_mode = SmartFadesMode.DISABLED
        else:
            # note: the crossfade settings are looked up with the queue id
            smart_fades_mode = await self.mass.config.get_player_config_value(
                queue.queue_id, CONF_SMART_FADES_MODE, return_type=SmartFadesMode
            )
            standard_crossfade_duration = self.mass.config.get_raw_player_config_value(
                queue.queue_id, CONF_CROSSFADE_DURATION, 10
            )
        if (
            smart_fades_mode != SmartFadesMode.DISABLED
            and PlayerFeature.GAPLESS_PLAYBACK not in player.state.supported_features
        ):
            # crossfade is not supported on this player due to missing gapless playback
            # NOTE(review): 'player' was already validated above, so the
            # "Unknown Player" fallback in the log arguments looks unreachable
            self.logger.warning(
                "Crossfade disabled: Player %s does not support gapless playback, "
                "consider enabling flow mode to enable crossfade on this player.",
                player.state.name if player else "Unknown Player",
            )
            smart_fades_mode = SmartFadesMode.DISABLED

        if smart_fades_mode != SmartFadesMode.DISABLED:
            # crossfade is enabled, use special crossfaded single item stream
            # where the crossfade of the next track is present in the stream of
            # a single track. This only works if the player supports gapless playback!
            # (standard_crossfade_duration is always bound here: the mode can only
            # be enabled through the track branch above)
            audio_input = self.get_queue_item_stream_with_smartfade(
                queue_item=queue_item,
                pcm_format=pcm_format,
                smart_fades_mode=smart_fades_mode,
                standard_crossfade_duration=standard_crossfade_duration,
            )
        else:
            # no crossfade, just a regular single item stream
            audio_input = self.get_queue_item_stream(
                queue_item=queue_item,
                pcm_format=pcm_format,
                seek_position=queue_item.streamdetails.seek_position,
            )
        # stream the audio
        # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into
        # the desired output format for the player including any player specific filter params
        # such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats
        first_chunk_received = False
        bytes_sent = 0
        async for chunk in get_ffmpeg_stream(
            audio_input=audio_input,
            input_format=pcm_format,
            output_format=output_format,
            filter_params=get_player_filter_params(
                self.mass,
                player_id=player.player_id,
                input_format=pcm_format,
                output_format=output_format,
            ),
        ):
            try:
                await resp.write(chunk)
                bytes_sent += len(chunk)
                if not first_chunk_received:
                    first_chunk_received = True
                    # inform the queue that the track is now loaded in the buffer
                    # so for example the next track can be enqueued
                    self.mass.player_queues.track_loaded_in_buffer(
                        queue_item.queue_id, queue_item.queue_item_id
                    )
            except (BrokenPipeError, ConnectionResetError, ConnectionError) as err:
                if first_chunk_received and not player.stop_called:
                    # Player disconnected (unexpected) after receiving at least some data
                    # This could indicate buffering issues, network problems,
                    # or player-specific issues
                    # (one hour is assumed for items without a known duration)
                    bytes_expected = get_chunksize(output_format, queue_item.duration or 3600)
                    self.logger.warning(
                        "Player %s disconnected prematurely from stream for %s (%s) - "
                        "error: %s, sent %d bytes, expected (approx) bytes=%d",
                        queue.display_name,
                        queue_item.name,
                        queue_item.uri,
                        err.__class__.__name__,
                        bytes_sent,
                        bytes_expected,
                    )
                # the client is gone, stop streaming
                break
        # the stream_error flag on the streamdetails is checked once streaming has ended
        if queue_item.streamdetails.stream_error:
            self.logger.error(
                "Error streaming QueueItem %s (%s) to %s - will try to skip to next item",
                queue_item.name,
                queue_item.uri,
                queue.display_name,
            )
            # try to skip to the next item in the queue after a short delay
            self.mass.call_later(5, self.mass.player_queues.next(queue_id))
        return resp
576
577 async def serve_queue_flow_stream(self, request: web.Request) -> web.StreamResponse:
578 """Stream Queue Flow audio to player."""
579 self._log_request(request)
580 queue_id = request.match_info["queue_id"]
581 player_id = request.match_info["player_id"]
582 if not (queue := self.mass.player_queues.get(queue_id)):
583 raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
584 if not (player := self.mass.players.get_player(player_id)):
585 raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
586 start_queue_item_id = request.match_info["queue_item_id"]
587 start_queue_item = self.mass.player_queues.get_item(queue_id, start_queue_item_id)
588 if not start_queue_item:
589 raise web.HTTPNotFound(reason=f"Unknown Queue item: {start_queue_item_id}")
590
591 queue.flow_mode_stream_log = []
592
593 # select the highest possible PCM settings for this player
594 flow_pcm_format = await self._select_flow_format(player)
595
596 # work out output format/details
597 output_format = await self.get_output_format(
598 output_format_str=request.match_info["fmt"],
599 player=player,
600 content_sample_rate=flow_pcm_format.sample_rate,
601 content_bit_depth=flow_pcm_format.bit_depth,
602 )
603 # work out ICY metadata support
604 icy_preference = self.mass.config.get_raw_player_config_value(
605 player_id,
606 CONF_ENTRY_ENABLE_ICY_METADATA.key,
607 CONF_ENTRY_ENABLE_ICY_METADATA.default_value,
608 )
609 enable_icy = request.headers.get("Icy-MetaData", "") == "1" and icy_preference != "disabled"
610 icy_meta_interval = 256000 if icy_preference == "full" else 16384
611
612 # prepare request, add some DLNA/UPNP compatible headers
613 headers = {
614 **DEFAULT_STREAM_HEADERS,
615 **ICY_HEADERS,
616 "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01700000000000000000000000000000", # noqa: E501
617 "Accept-Ranges": "none",
618 "Content-Type": f"audio/{output_format.output_format_str}",
619 }
620 if enable_icy:
621 headers["icy-metaint"] = str(icy_meta_interval)
622
623 resp = web.StreamResponse(
624 status=200,
625 reason="OK",
626 headers=headers,
627 )
628 http_profile = await self.mass.config.get_player_config_value(
629 player_id, CONF_HTTP_PROFILE, default="default", return_type=str
630 )
631 if http_profile == "forced_content_length":
632 # just set an insane high content length to make sure the player keeps playing
633 resp.content_length = get_chunksize(output_format, 12 * 3600)
634 elif http_profile == "chunked":
635 resp.enable_chunked_encoding()
636
637 await resp.prepare(request)
638
639 # return early if this is not a GET request
640 if request.method != "GET":
641 return resp
642
643 # all checks passed, start streaming!
644 # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into
645 # the desired output format for the player including any player specific filter params
646 # such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats
647 self.logger.debug("Start serving Queue flow audio stream for %s", queue.display_name)
648
649 async for chunk in get_ffmpeg_stream(
650 audio_input=self.get_queue_flow_stream(
651 queue=queue,
652 start_queue_item=start_queue_item,
653 pcm_format=flow_pcm_format,
654 ),
655 input_format=flow_pcm_format,
656 output_format=output_format,
657 filter_params=get_player_filter_params(
658 self.mass, player.player_id, flow_pcm_format, output_format
659 ),
660 # we need to slowly feed the music to avoid the player stopping and later
661 # restarting (or completely failing) the audio stream by keeping the buffer short.
662 # this is reported to be an issue especially with Chromecast players.
663 # see for example: https://github.com/music-assistant/support/issues/3717
664 # allow buffer ahead of 6 seconds and read rest in realtime
665 extra_input_args=["-readrate", "1.0", "-readrate_initial_burst", "6"],
666 chunk_size=icy_meta_interval if enable_icy else get_chunksize(output_format),
667 ):
668 try:
669 await resp.write(chunk)
670 except (BrokenPipeError, ConnectionResetError, ConnectionError):
671 # race condition
672 break
673
674 if not enable_icy:
675 continue
676
677 # if icy metadata is enabled, send the icy metadata after the chunk
678 if (
679 # use current item here and not buffered item, otherwise
680 # the icy metadata will be too much ahead
681 (current_item := queue.current_item)
682 and current_item.streamdetails
683 and current_item.streamdetails.stream_title
684 ):
685 title = current_item.streamdetails.stream_title
686 elif queue and current_item and current_item.name:
687 title = current_item.name
688 else:
689 title = "Music Assistant"
690 metadata = f"StreamTitle='{title}';".encode()
691 if icy_preference == "full" and current_item and current_item.image:
692 metadata += f"StreamURL='{current_item.image.path}'".encode()
693 while len(metadata) % 16 != 0:
694 metadata += b"\x00"
695 length = len(metadata)
696 length_b = chr(int(length / 16)).encode()
697 await resp.write(length_b + metadata)
698
699 return resp
700
701 async def serve_command_request(self, request: web.Request) -> web.FileResponse:
702 """Handle special 'command' request for a player."""
703 self._log_request(request)
704 queue_id = request.match_info["queue_id"]
705 command = request.match_info["command"]
706 if command == "next":
707 self.mass.create_task(self.mass.player_queues.next(queue_id))
708 return web.FileResponse(SILENCE_FILE, headers={"icy-name": "Music Assistant"})
709
710 async def serve_announcement_stream(self, request: web.Request) -> web.StreamResponse:
711 """Stream announcement audio to a player."""
712 self._log_request(request)
713 player_id = request.match_info["player_id"]
714 player = self.mass.player_queues.get(player_id)
715 if not player:
716 raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
717 if not (announce_data := self.announcements.get(player_id)):
718 raise web.HTTPNotFound(reason=f"No pending announcements for Player: {player_id}")
719
720 # work out output format/details
721 fmt = request.match_info["fmt"]
722 audio_format = AudioFormat(content_type=ContentType.try_parse(fmt))
723
724 http_profile = await self.mass.config.get_player_config_value(
725 player_id, CONF_HTTP_PROFILE, default="default", return_type=str
726 )
727 if http_profile == "forced_content_length":
728 # given the fact that an announcement is just a short audio clip,
729 # just send it over completely at once so we have a fixed content length
730 data = b""
731 async for chunk in self.get_announcement_stream(
732 announcement_url=announce_data["announcement_url"],
733 output_format=audio_format,
734 pre_announce=announce_data["pre_announce"],
735 pre_announce_url=announce_data["pre_announce_url"],
736 ):
737 data += chunk
738 return web.Response(
739 body=data,
740 content_type=f"audio/{audio_format.output_format_str}",
741 headers=DEFAULT_STREAM_HEADERS,
742 )
743
744 resp = web.StreamResponse(
745 status=200,
746 reason="OK",
747 headers=DEFAULT_STREAM_HEADERS,
748 )
749 resp.content_type = f"audio/{audio_format.output_format_str}"
750 if http_profile == "chunked":
751 resp.enable_chunked_encoding()
752
753 await resp.prepare(request)
754
755 # return early if this is not a GET request
756 if request.method != "GET":
757 return resp
758
759 # all checks passed, start streaming!
760 self.logger.debug(
761 "Start serving audio stream for Announcement %s to %s",
762 announce_data["announcement_url"],
763 player.state.name,
764 )
765 async for chunk in self.get_announcement_stream(
766 announcement_url=announce_data["announcement_url"],
767 output_format=audio_format,
768 pre_announce=announce_data["pre_announce"],
769 pre_announce_url=announce_data["pre_announce_url"],
770 ):
771 try:
772 await resp.write(chunk)
773 except (BrokenPipeError, ConnectionResetError):
774 break
775
776 self.logger.debug(
777 "Finished serving audio stream for Announcement %s to %s",
778 announce_data["announcement_url"],
779 player.state.name,
780 )
781
782 return resp
783
784 async def serve_plugin_source_stream(self, request: web.Request) -> web.StreamResponse:
785 """Stream PluginSource audio to a player."""
786 self._log_request(request)
787 plugin_source_id = request.match_info["plugin_source"]
788 provider = cast("PluginProvider", self.mass.get_provider(plugin_source_id))
789 if not provider:
790 raise ProviderUnavailableError(f"Unknown PluginSource: {plugin_source_id}")
791 # work out output format/details
792 player_id = request.match_info["player_id"]
793 player = self.mass.players.get_player(player_id)
794 if not player:
795 raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
796 plugin_source = provider.get_source()
797 output_format = await self.get_output_format(
798 output_format_str=request.match_info["fmt"],
799 player=player,
800 content_sample_rate=plugin_source.audio_format.sample_rate,
801 content_bit_depth=plugin_source.audio_format.bit_depth,
802 )
803 headers = {
804 **DEFAULT_STREAM_HEADERS,
805 "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01700000000000000000000000000000", # noqa: E501
806 "icy-name": plugin_source.name,
807 "Accept-Ranges": "none",
808 "Content-Type": f"audio/{output_format.output_format_str}",
809 }
810
811 resp = web.StreamResponse(
812 status=200,
813 reason="OK",
814 headers=headers,
815 )
816 resp.content_type = f"audio/{output_format.output_format_str}"
817 http_profile = await self.mass.config.get_player_config_value(
818 player_id, CONF_HTTP_PROFILE, default="default", return_type=str
819 )
820 if http_profile == "forced_content_length":
821 # just set an insanely high content length to make sure the player keeps playing
822 resp.content_length = get_chunksize(output_format, 12 * 3600)
823 elif http_profile == "chunked":
824 resp.enable_chunked_encoding()
825
826 await resp.prepare(request)
827
828 # return early if this is not a GET request
829 if request.method != "GET":
830 return resp
831
832 # all checks passed, start streaming!
833 if not plugin_source.audio_format:
834 raise InvalidDataError(f"No audio format for plugin source {plugin_source_id}")
835 async for chunk in self.get_plugin_source_stream(
836 plugin_source_id=plugin_source_id,
837 output_format=output_format,
838 player_id=player_id,
839 player_filter_params=get_player_filter_params(
840 self.mass, player_id, plugin_source.audio_format, output_format
841 ),
842 ):
843 try:
844 await resp.write(chunk)
845 except (BrokenPipeError, ConnectionResetError, ConnectionError):
846 break
847 return resp
848
def get_command_url(self, player_or_queue_id: str, command: str) -> str:
    """
    Build the URL of the special command stream.

    The URL is composed of the controller's base url, the given
    player/queue id and the command name (with a fixed .mp3 extension).
    """
    command_path = f"/command/{player_or_queue_id}/{command}.mp3"
    return f"{self.base_url}{command_path}"
852
def get_announcement_url(
    self,
    player_id: str,
    announce_data: AnnounceData,
    content_type: ContentType = ContentType.MP3,
) -> str:
    """
    Register announce data for a player and return its announcement stream url.

    The announce data is stored under the player id and the returned url
    points at this stream server, using the content type's value as extension.
    """
    # keep the announce details around, keyed by the player they are meant for
    self.announcements[player_id] = announce_data
    # The announcement is hosted on the stream server (local network):
    # this ensures playback on all players, including ones that do not
    # like https hosts, and it also offers the pre-announce 'bell'.
    extension = content_type.value
    return f"{self.base_url}/announcement/{player_id}.{extension}"
865
def get_stream(
    self, media: PlayerMedia, pcm_format: AudioFormat, force_flow_mode: bool = False
) -> AsyncGenerator[bytes, None]:
    """
    Return the raw PCM audio stream (async generator) for the given media.

    Helper for player providers that consume the raw PCM audio directly
    (e.g. AirPlay) instead of relying on HTTP transport. The audio source is
    selected based on the media type and the ids present on the media object;
    anything that is not recognized is handed to ffmpeg as a plain uri/path.
    """
    media_type = media.media_type

    # special case: announcement
    if media_type == MediaType.ANNOUNCEMENT:
        assert media.custom_data
        return self.get_announcement_stream(
            media.custom_data["announcement_url"],
            output_format=pcm_format,
            pre_announce=media.custom_data["pre_announce"],
            pre_announce_url=media.custom_data["pre_announce_url"],
        )

    # special case: plugin source
    if media_type == MediaType.PLUGIN_SOURCE:
        assert media.custom_data
        return self.get_plugin_source_stream(
            plugin_source_id=media.custom_data["source_id"],
            output_format=pcm_format,
            # the player_id must come from the PlayerMedia object
            # because the target could have been a group
            player_id=media.custom_data["player_id"],
        )

    # special case: a member player accessing the UGP stream
    # (the uri check distinguishes this from the UGP accessing its own stream)
    if (
        media_type == MediaType.FLOW_STREAM
        and media.source_id
        and media.source_id.startswith(UGP_PREFIX)
        and media.uri
        and "/ugp/" in media.uri
    ):
        group_player = cast("UniversalGroupPlayer", self.mass.players.get_player(media.source_id))
        group_stream = group_player.stream
        assert group_stream is not None  # for type checker
        if group_stream.base_pcm_format == pcm_format:
            # formats match: subscribe to the raw stream, no conversion needed
            return group_stream.subscribe_raw()
        return group_stream.get_stream(output_format=pcm_format)

    if media.source_id and media.queue_item_id:
        if media_type == MediaType.FLOW_STREAM or force_flow_mode:
            # regular queue (flow) stream request
            queue = self.mass.player_queues.get(media.source_id)
            assert queue
            first_item = self.mass.player_queues.get_item(media.source_id, media.queue_item_id)
            assert first_item
            return self.mass.streams.get_queue_flow_stream(
                queue=queue,
                start_queue_item=first_item,
                pcm_format=pcm_format,
            )
        # single item stream (e.g. radio)
        single_item = self.mass.player_queues.get_item(media.source_id, media.queue_item_id)
        assert single_item
        item_stream = self.get_queue_item_stream(
            queue_item=single_item,
            pcm_format=pcm_format,
        )
        return buffered(item_stream, buffer_size=10, min_buffer_before_yield=2)

    # fallback: assume a url or some other direct path
    # NOTE: this will fail if it is an uri that ffmpeg can not play
    source_format = AudioFormat(content_type=ContentType.try_parse(media.uri))
    return get_ffmpeg_stream(
        audio_input=media.uri,
        input_format=source_format,
        output_format=pcm_format,
    )
950
@use_buffer(buffer_size=30, min_buffer_before_yield=2)
async def get_queue_flow_stream(
    self,
    queue: PlayerQueue,
    start_queue_item: QueueItem,
    pcm_format: AudioFormat,
) -> AsyncGenerator[bytes, None]:
    """
    Get a flow stream of all tracks in the queue as raw PCM audio.

    Starting at start_queue_item, queue items are streamed back-to-back in the
    given pcm_format until the queue reports QueueEmpty. When crossfade is
    allowed, the tail of each track is held back and mixed with the start of
    the next one. Audio is emitted in chunks of pcm_sample_size bytes
    (1 second); leftover parts around a crossfade are emitted as-is.

    Side effects: sets queue.flow_mode, appends a PlayLogEntry per track to
    queue.flow_mode_stream_log and updates seconds_streamed/duration on the
    streamdetails and play log entries based on the bytes actually sent.

    Raises InvalidDataError if a queue item has no streamdetails.
    """
    # ruff: noqa: PLR0915
    assert pcm_format.content_type.is_pcm()
    queue_track = None
    last_fadeout_part: bytes = b""
    last_streamdetails: StreamDetails | None = None
    last_play_log_entry: PlayLogEntry | None = None
    queue.flow_mode = True
    if not start_queue_item:
        # this can happen in some (edge case) race conditions
        return
    pcm_sample_size = pcm_format.pcm_sample_size
    if start_queue_item.media_type != MediaType.TRACK:
        # no crossfade on non-tracks
        smart_fades_mode = SmartFadesMode.DISABLED
        standard_crossfade_duration = 0
    else:
        smart_fades_mode = await self.mass.config.get_player_config_value(
            queue.queue_id, CONF_SMART_FADES_MODE, return_type=SmartFadesMode
        )
        standard_crossfade_duration = self.mass.config.get_raw_player_config_value(
            queue.queue_id, CONF_CROSSFADE_DURATION, 10
        )
    self.logger.info(
        "Start Queue Flow stream for Queue %s - crossfade: %s %s",
        queue.display_name,
        smart_fades_mode,
        f"({standard_crossfade_duration}s)"
        if smart_fades_mode == SmartFadesMode.STANDARD_CROSSFADE
        else "",
    )
    # counts chunks over the whole flow stream (not per track)
    total_chunks_received = 0

    while True:
        # get (next) queue item to stream
        if queue_track is None:
            queue_track = start_queue_item
        else:
            try:
                queue_track = await self.mass.player_queues.load_next_queue_item(
                    queue.queue_id, queue_track.queue_item_id
                )
            except QueueEmpty:
                break

        if queue_track.streamdetails is None:
            # exceptions do not interpolate logger-style arguments,
            # so the message has to be formatted here
            raise InvalidDataError(
                f"No Streamdetails known for queue item {queue_track.queue_item_id}"
            )

        self.logger.debug(
            "Start Streaming queue track: %s (%s) for queue %s",
            queue_track.streamdetails.uri,
            queue_track.name,
            queue.display_name,
        )
        # append to play log so the queue controller can work out which track is playing
        play_log_entry = PlayLogEntry(queue_track.queue_item_id)
        queue.flow_mode_stream_log.append(play_log_entry)
        # calculate crossfade buffer size
        crossfade_buffer_duration = (
            SMART_CROSSFADE_DURATION
            if smart_fades_mode == SmartFadesMode.SMART_CROSSFADE
            else standard_crossfade_duration
        )
        # never use more than half of the track for the crossfade
        crossfade_buffer_duration = min(
            crossfade_buffer_duration,
            int(queue_track.streamdetails.duration / 2)
            if queue_track.streamdetails.duration
            else crossfade_buffer_duration,
        )
        # Ensure crossfade buffer size is aligned to frame boundaries
        # Frame size = bytes_per_sample * channels
        bytes_per_sample = pcm_format.bit_depth // 8
        frame_size = bytes_per_sample * pcm_format.channels
        crossfade_buffer_size = int(pcm_sample_size * crossfade_buffer_duration)
        # Round down to nearest frame boundary
        crossfade_buffer_size = (crossfade_buffer_size // frame_size) * frame_size

        bytes_written = 0
        buffer = b""
        # handle incoming audio chunks
        first_chunk_received = False
        # buffer size needs to be big enough to include the crossfade part

        async for chunk in self.get_queue_item_stream(
            queue_track,
            pcm_format=pcm_format,
            seek_position=queue_track.streamdetails.seek_position,
            raise_on_error=False,
        ):
            total_chunks_received += 1
            if not first_chunk_received:
                first_chunk_received = True
                # inform the queue that the track is now loaded in the buffer
                # so the next track can be preloaded
                self.mass.player_queues.track_loaded_in_buffer(
                    queue.queue_id, queue_track.queue_item_id
                )
            if total_chunks_received < 10 and smart_fades_mode != SmartFadesMode.DISABLED:
                # we want a stream to start as quickly as possible
                # so for the first 10 chunks we keep a very short buffer
                req_buffer_size = pcm_sample_size
            else:
                req_buffer_size = (
                    pcm_sample_size
                    if smart_fades_mode == SmartFadesMode.DISABLED
                    else crossfade_buffer_size
                )

            # ALWAYS APPEND CHUNK TO BUFFER
            buffer += chunk
            del chunk
            if len(buffer) < req_buffer_size:
                # buffer is not full enough, move on
                # yield control to event loop with 10ms delay
                await asyncio.sleep(0.01)
                continue

            #### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
            if last_fadeout_part and last_streamdetails:
                # perform crossfade
                fadein_part = buffer[:crossfade_buffer_size]
                remaining_bytes = buffer[crossfade_buffer_size:]
                # Use the mixer to handle all crossfade logic
                crossfade_part = await self._smart_fades_mixer.mix(
                    fade_in_part=fadein_part,
                    fade_out_part=last_fadeout_part,
                    fade_in_streamdetails=queue_track.streamdetails,
                    fade_out_streamdetails=last_streamdetails,
                    pcm_format=pcm_format,
                    standard_crossfade_duration=standard_crossfade_duration,
                    mode=smart_fades_mode,
                )
                # because the crossfade exists of both the fadein and fadeout part
                # we need to correct the bytes_written accordingly so the duration
                # calculations at the end of the track are correct
                crossfade_part_len = len(crossfade_part)
                bytes_written += int(crossfade_part_len / 2)
                if last_play_log_entry:
                    assert last_play_log_entry.seconds_streamed is not None
                    last_play_log_entry.seconds_streamed += (
                        crossfade_part_len / 2 / pcm_sample_size
                    )
                # yield crossfade_part (in pcm_sample_size chunks)
                for _chunk in divide_chunks(crossfade_part, pcm_sample_size):
                    yield _chunk
                    del _chunk
                del crossfade_part
                # also write the leftover bytes from the crossfade action
                if remaining_bytes:
                    yield remaining_bytes
                    bytes_written += len(remaining_bytes)
                    del remaining_bytes
                # clear vars
                last_fadeout_part = b""
                last_streamdetails = None
                buffer = b""

            #### OTHER: enough data in buffer, feed to output
            while len(buffer) > req_buffer_size:
                yield buffer[:pcm_sample_size]
                bytes_written += pcm_sample_size
                buffer = buffer[pcm_sample_size:]

        #### HANDLE END OF TRACK
        if last_fadeout_part:
            # edge case: we did not get enough data to make the crossfade
            for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
                yield _chunk
                del _chunk
            bytes_written += len(last_fadeout_part)
            last_fadeout_part = b""
        crossfade_allowed = self._crossfade_allowed(
            queue_track, smart_fades_mode=smart_fades_mode, flow_mode=True
        )
        if crossfade_allowed:
            # if crossfade is enabled, save fadeout part to pickup for next track
            last_fadeout_part = buffer[-crossfade_buffer_size:]
            last_streamdetails = queue_track.streamdetails
            last_play_log_entry = play_log_entry
            remaining_bytes = buffer[:-crossfade_buffer_size]
            if remaining_bytes:
                yield remaining_bytes
                bytes_written += len(remaining_bytes)
            del remaining_bytes
        elif smart_fades_mode != SmartFadesMode.DISABLED:
            self.logger.debug(
                "Flow mode: crossfade NOT allowed for track %s on queue %s"
                " - fadeout part not saved",
                queue_track.name,
                queue.display_name,
            )
        if not crossfade_allowed and buffer:
            # no crossfade enabled, just yield the buffer last part
            bytes_written += len(buffer)
            for _chunk in divide_chunks(buffer, pcm_sample_size):
                yield _chunk
                del _chunk
        # make sure the buffer gets cleaned up
        del buffer

        # update duration details based on the actual pcm data we sent
        # this also accounts for crossfade and silence stripping
        seconds_streamed = bytes_written / pcm_sample_size
        queue_track.streamdetails.seconds_streamed = seconds_streamed
        queue_track.streamdetails.duration = int(
            queue_track.streamdetails.seek_position + seconds_streamed
        )
        play_log_entry.seconds_streamed = seconds_streamed
        play_log_entry.duration = queue_track.streamdetails.duration
        self.logger.debug(
            "Finished Streaming queue track: %s (%s) on queue %s",
            queue_track.streamdetails.uri,
            queue_track.name,
            queue.display_name,
        )
    #### HANDLE END OF QUEUE FLOW STREAM
    # end of queue flow: make sure we yield the last_fadeout_part
    if last_fadeout_part:
        for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
            yield _chunk
            del _chunk
        # correct seconds streamed/duration
        # (queue_track still refers to the last track that was streamed)
        last_part_seconds = len(last_fadeout_part) / pcm_sample_size
        streamdetails = queue_track.streamdetails
        assert streamdetails is not None
        streamdetails.seconds_streamed = (
            streamdetails.seconds_streamed or 0
        ) + last_part_seconds
        streamdetails.duration = int((streamdetails.duration or 0) + last_part_seconds)
        last_fadeout_part = b""
    self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name)
1199
async def get_announcement_stream(
    self,
    announcement_url: str,
    output_format: AudioFormat,
    pre_announce: bool | str = False,
    pre_announce_url: str = ANNOUNCE_ALERT_FILE,
) -> AsyncGenerator[bytes, None]:
    """
    Get the special announcement stream.

    The announcement audio is fetched by a background task while the optional
    pre-announce sound (pre_announce_url) is already being streamed; silence is
    padded until the first announcement audio is available. Everything is
    produced as PCM first and only encoded to output_format at the very end
    (skipped when output_format already is the PCM format).
    """
    announcement_data: asyncio.Queue[bytes | None] = asyncio.Queue(10)
    # we are doing announcement in PCM first to avoid multiple encodings
    # when mixing pre-announce and announcement
    # also we have to deal with some TTS sources being super slow in delivering audio
    # so we take an approach where we start fetching the announcement in the background
    # while we can already start playing the pre-announce sound (if any)

    pcm_format = (
        output_format
        if output_format.content_type.is_pcm()
        else AudioFormat(
            sample_rate=output_format.sample_rate,
            content_type=ContentType.PCM_S16LE,
            bit_depth=16,
            channels=output_format.channels,
        )
    )

    async def fetch_announcement() -> None:
        """Fetch the announcement audio (as PCM) into the queue."""
        fmt = announcement_url.rsplit(".", 1)[-1]
        try:
            async for chunk in get_ffmpeg_stream(
                audio_input=announcement_url,
                input_format=AudioFormat(content_type=ContentType.try_parse(fmt)),
                output_format=pcm_format,
                chunk_size=get_chunksize(pcm_format, 1),
            ):
                await announcement_data.put(chunk)
        except Exception:
            # Without the end-of-stream marker below the consumer would keep
            # padding silence (or wait on the queue) forever, so a failed fetch
            # is logged here and the stream is ended gracefully instead.
            self.logger.exception("Failed to fetch announcement audio from %s", announcement_url)
        await announcement_data.put(None)  # signal end of stream

    self.mass.create_task(fetch_announcement())

    async def _announcement_stream() -> AsyncGenerator[bytes, None]:
        """Generate the PCM audio stream for the announcement + optional pre-announce."""
        if pre_announce:
            async for chunk in get_ffmpeg_stream(
                audio_input=pre_announce_url,
                input_format=AudioFormat(content_type=ContentType.try_parse(pre_announce_url)),
                output_format=pcm_format,
                chunk_size=get_chunksize(pcm_format, 1),
            ):
                yield chunk
        # pad silence while we're waiting for the announcement to be ready
        # (100ms per chunk, sized in whole frames so the PCM stream stays aligned)
        frame_size = (pcm_format.bit_depth // 8) * pcm_format.channels
        silence_chunk = b"\0" * ((pcm_format.sample_rate // 10) * frame_size)
        while announcement_data.empty():
            yield silence_chunk
            await asyncio.sleep(0.1)
        # stream announcement
        while True:
            announcement_chunk = await announcement_data.get()
            if announcement_chunk is None:
                break
            yield announcement_chunk

    if output_format == pcm_format:
        # no need to re-encode, just yield the raw PCM stream
        async for chunk in _announcement_stream():
            yield chunk
        return

    # stream final announcement in requested output format
    async for chunk in get_ffmpeg_stream(
        audio_input=_announcement_stream(),
        input_format=pcm_format,
        output_format=output_format,
    ):
        yield chunk
1275
async def get_plugin_source_stream(
    self,
    plugin_source_id: str,
    output_format: AudioFormat,
    player_id: str,
    player_filter_params: list[str] | None = None,
) -> AsyncGenerator[bytes, None]:
    """
    Get the special plugin source stream.

    Streams the audio of the plugin source (through ffmpeg) in the requested
    output format for as long as the source is marked as in use by player_id.
    Raises ProviderUnavailableError when no provider exists for the given id.
    """
    provider = cast("PluginProvider", self.mass.get_provider(plugin_source_id))
    if not provider:
        raise ProviderUnavailableError(f"Unknown PluginSource: {plugin_source_id}")

    source = provider.get_source()
    self.logger.debug(
        "Start streaming PluginSource %s to %s using output format %s",
        plugin_source_id,
        player_id,
        output_format,
    )
    # the player controller should have set this already, but just to be sure
    source.in_use_by = player_id

    try:
        # custom stream types deliver their audio through the provider itself,
        # everything else is read by ffmpeg from the source's path
        if source.stream_type == StreamType.CUSTOM:
            raw_input = provider.get_audio_stream(player_id)
        else:
            raw_input = source.path
        async for pcm_chunk in get_ffmpeg_stream(
            audio_input=cast("str | AsyncGenerator[bytes, None]", raw_input),
            input_format=source.audio_format,
            output_format=output_format,
            filter_params=player_filter_params,
            extra_input_args=["-y", "-re"],
        ):
            if source.in_use_by != player_id:
                # another player took over or the stream ended, stop streaming
                break
            yield pcm_chunk
    finally:
        self.logger.debug(
            "Finished streaming PluginSource %s to %s", plugin_source_id, player_id
        )
        # prevent race conditions when selecting source
        await asyncio.sleep(1)
        still_ours = source.in_use_by == player_id
        if still_ours:
            # release control
            source.in_use_by = None
1323
async def get_queue_item_stream(
    self,
    queue_item: QueueItem,
    pcm_format: AudioFormat,
    seek_position: int = 0,
    raise_on_error: bool = True,
) -> AsyncGenerator[bytes, None]:
    """
    Get the (PCM) audio stream for a single queue item.

    Builds the volume normalization filter from the item's streamdetails,
    streams the media (buffered or direct) in the given pcm_format starting at
    seek_position and applies an optional fade-in to the start of the audio.

    An AudioError marks the item as unavailable and flags the streamdetails;
    it is re-raised when raise_on_error is set, otherwise it is logged and the
    stream simply ends. When the stream stops, seconds_streamed is stored on
    the streamdetails and the stream is reported to the music provider if it
    finished or at least 90 seconds were streamed.
    """
    # collect all arguments for ffmpeg
    streamdetails = queue_item.streamdetails
    assert streamdetails
    filter_params: list[str] = []

    # handle volume normalization
    gain_correct: float | None = None
    if streamdetails.volume_normalization_mode == VolumeNormalizationMode.DYNAMIC:
        # volume normalization using loudnorm filter (in dynamic mode)
        # which also collects the measurement on the fly during playback
        # more info: https://k.ylo.ph/2016/04/04/loudnorm.html
        filter_rule = f"loudnorm=I={streamdetails.target_loudness}:TP=-2.0:LRA=10.0:offset=0.0"
        filter_rule += ":print_format=json"
        filter_params.append(filter_rule)
    elif streamdetails.volume_normalization_mode == VolumeNormalizationMode.FIXED_GAIN:
        # apply user defined fixed volume/gain correction
        config_key = (
            CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS
            if streamdetails.media_type == MediaType.TRACK
            else CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO
        )
        gain_value = await self.mass.config.get_core_config_value(
            self.domain, config_key, default=0.0, return_type=float
        )
        gain_correct = round(gain_value, 2)
        filter_params.append(f"volume={gain_correct}dB")
    elif streamdetails.volume_normalization_mode == VolumeNormalizationMode.MEASUREMENT_ONLY:
        # volume normalization with known loudness measurement
        # apply volume/gain correction
        target_loudness = (
            float(streamdetails.target_loudness)
            if streamdetails.target_loudness is not None
            else 0.0
        )
        if streamdetails.prefer_album_loudness and streamdetails.loudness_album is not None:
            gain_correct = target_loudness - float(streamdetails.loudness_album)
        elif streamdetails.loudness is not None:
            gain_correct = target_loudness - float(streamdetails.loudness)
        else:
            gain_correct = 0.0
        gain_correct = round(gain_correct, 2)
        filter_params.append(f"volume={gain_correct}dB")
    streamdetails.volume_normalization_gain_correct = gain_correct

    # the buffer is only used when enabled in config and the duration is known
    allow_buffer = bool(
        self.mass.config.get_raw_core_config_value(
            self.domain, CONF_ALLOW_BUFFER, CONF_ALLOW_BUFFER_DEFAULT
        )
        and streamdetails.duration
    )

    self.logger.debug(
        "Starting queue item stream for %s (%s)"
        " - using buffer: %s"
        " - using fade-in: %s"
        " - using volume normalization: %s",
        queue_item.name,
        streamdetails.uri,
        allow_buffer,
        streamdetails.fade_in,
        streamdetails.volume_normalization_mode,
    )
    if allow_buffer:
        media_stream_gen = get_buffered_media_stream(
            self.mass,
            streamdetails=streamdetails,
            pcm_format=pcm_format,
            seek_position=int(seek_position),
            filter_params=filter_params,
        )
    else:
        media_stream_gen = get_media_stream(
            self.mass,
            streamdetails=streamdetails,
            pcm_format=pcm_format,
            seek_position=int(seek_position),
            filter_params=filter_params,
        )

    first_chunk_received = False
    fade_in_buffer = b""
    fade_in_filter = ["afade=type=in:start_time=0:duration=3"]
    bytes_received = 0
    finished = False
    loop = asyncio.get_running_loop()
    stream_started_at = loop.time()
    try:
        async for chunk in media_stream_gen:
            bytes_received += len(chunk)
            if not first_chunk_received:
                first_chunk_received = True
                self.logger.debug(
                    "First audio chunk received for %s (%s) after %.2f seconds",
                    queue_item.name,
                    streamdetails.uri,
                    loop.time() - stream_started_at,
                )
            # handle optional fade-in
            if streamdetails.fade_in:
                if len(fade_in_buffer) < pcm_format.pcm_sample_size * 4:
                    # collect the start of the track before applying the fade
                    fade_in_buffer += chunk
                elif fade_in_buffer:
                    async for fade_chunk in get_ffmpeg_stream(
                        # NOTE: get_ffmpeg_stream signature says str | AsyncGenerator
                        # but FFMpeg class actually accepts bytes too. This works at
                        # runtime but needs type: ignore for mypy.
                        audio_input=fade_in_buffer + chunk,  # type: ignore[arg-type]
                        input_format=pcm_format,
                        output_format=pcm_format,
                        filter_params=fade_in_filter,
                    ):
                        yield fade_chunk
                    fade_in_buffer = b""
                    streamdetails.fade_in = False
            else:
                yield chunk
            # help garbage collection by explicitly deleting chunk
            del chunk
        if fade_in_buffer:
            # The source ended before enough audio was collected for the
            # fade-in: flush what was buffered (with the fade applied) so the
            # audio of very short items is not silently dropped.
            async for fade_chunk in get_ffmpeg_stream(
                audio_input=fade_in_buffer,  # type: ignore[arg-type]
                input_format=pcm_format,
                output_format=pcm_format,
                filter_params=fade_in_filter,
            ):
                yield fade_chunk
            fade_in_buffer = b""
            streamdetails.fade_in = False
        finished = True
    except AudioError as err:
        streamdetails.stream_error = True
        queue_item.available = False
        if raise_on_error:
            raise
        # yes, we swallow the error here after logging it
        # so the outer stream can handle it gracefully
        self.logger.error(
            "AudioError while streaming queue item %s (%s): %s",
            queue_item.name,
            streamdetails.uri,
            err,
        )
    finally:
        # determine how many seconds we've streamed
        # for pcm output we can calculate this easily
        seconds_streamed = bytes_received / pcm_format.pcm_sample_size
        streamdetails.seconds_streamed = seconds_streamed
        self.logger.debug(
            "stream %s for %s in %.2f seconds - seconds streamed/buffered: %.2f",
            "aborted" if not finished else "finished",
            streamdetails.uri,
            loop.time() - stream_started_at,
            seconds_streamed,
        )
        # report stream to provider
        if (finished or seconds_streamed >= 90) and (
            music_prov := self.mass.get_provider(streamdetails.provider)
        ):
            if TYPE_CHECKING:  # avoid circular import
                assert isinstance(music_prov, MusicProvider)
            self.mass.create_task(music_prov.on_streamed(streamdetails))
1481
1482 @use_buffer(buffer_size=30, min_buffer_before_yield=2)
1483 async def get_queue_item_stream_with_smartfade(
1484 self,
1485 queue_item: QueueItem,
1486 pcm_format: AudioFormat,
1487 smart_fades_mode: SmartFadesMode = SmartFadesMode.SMART_CROSSFADE,
1488 standard_crossfade_duration: int = 10,
1489 ) -> AsyncGenerator[bytes, None]:
1490 """Get the audio stream for a single queue item with (smart) crossfade to the next item."""
1491 queue = self.mass.player_queues.get(queue_item.queue_id)
1492 if not queue:
1493 raise RuntimeError(f"Queue {queue_item.queue_id} not found")
1494
1495 streamdetails = queue_item.streamdetails
1496 assert streamdetails
1497 crossfade_data = self._crossfade_data.pop(queue.queue_id, None)
1498 self.logger.debug(
1499 "Crossfade data pop for queue %s (track: %s): %s",
1500 queue.display_name,
1501 queue_item.name,
1502 "found" if crossfade_data else "EMPTY - no crossfade data from previous track",
1503 )
1504
1505 if crossfade_data and streamdetails.seek_position > 0:
1506 # don't do crossfade when seeking into track
1507 crossfade_data = None
1508 if crossfade_data and (crossfade_data.queue_item_id != queue_item.queue_item_id):
1509 # edge case alert: the next item changed just while we were preloading/crossfading
1510 self.logger.warning(
1511 "Skipping crossfade data for queue %s - next item changed!", queue.display_name
1512 )
1513 crossfade_data = None
1514
1515 self.logger.debug(
1516 "Start Streaming queue track: %s (%s) for queue %s "
1517 "- crossfade mode: %s "
1518 "- crossfading from previous track: %s ",
1519 queue_item.streamdetails.uri if queue_item.streamdetails else "Unknown URI",
1520 queue_item.name,
1521 queue.display_name,
1522 smart_fades_mode,
1523 "true" if crossfade_data else "false",
1524 )
1525
1526 buffer = b""
1527 bytes_written = 0
1528 # calculate crossfade buffer size
1529 crossfade_buffer_duration = (
1530 SMART_CROSSFADE_DURATION
1531 if smart_fades_mode == SmartFadesMode.SMART_CROSSFADE
1532 else standard_crossfade_duration
1533 )
1534 crossfade_buffer_duration = min(
1535 crossfade_buffer_duration,
1536 int(streamdetails.duration / 2)
1537 if streamdetails.duration
1538 else crossfade_buffer_duration,
1539 )
1540 # Ensure crossfade buffer size is aligned to frame boundaries
1541 # Frame size = bytes_per_sample * channels
1542 bytes_per_sample = pcm_format.bit_depth // 8
1543 frame_size = bytes_per_sample * pcm_format.channels
1544 crossfade_buffer_size = int(pcm_format.pcm_sample_size * crossfade_buffer_duration)
1545 # Round down to nearest frame boundary
1546 crossfade_buffer_size = (crossfade_buffer_size // frame_size) * frame_size
1547 fade_out_data: bytes | None = None
1548
1549 if crossfade_data:
1550 # Calculate discard amount in seconds (format-independent)
1551 # Use fade_in_pcm_format because fade_in_size is in the next track's original format
1552 fade_in_duration_seconds = (
1553 crossfade_data.fade_in_size / crossfade_data.fade_in_pcm_format.pcm_sample_size
1554 )
1555 discard_seconds = int(fade_in_duration_seconds) - 1
1556 # Calculate discard amounts in CURRENT track's format
1557 discard_bytes = int(discard_seconds * pcm_format.pcm_sample_size)
1558 # Convert fade_in_size to current track's format for correct leftover calculation
1559 fade_in_size_in_current_format = int(
1560 fade_in_duration_seconds * pcm_format.pcm_sample_size
1561 )
1562 discard_leftover = fade_in_size_in_current_format - discard_bytes
1563 else:
1564 discard_seconds = streamdetails.seek_position
1565 discard_leftover = 0
1566 total_chunks_received = 0
1567 req_buffer_size = crossfade_buffer_size
1568 async for chunk in self.get_queue_item_stream(
1569 queue_item, pcm_format, seek_position=discard_seconds
1570 ):
1571 total_chunks_received += 1
1572 if discard_leftover:
1573 # discard leftover bytes from crossfade data
1574 chunk = chunk[discard_leftover:] # noqa: PLW2901
1575 discard_leftover = 0
1576
1577 if total_chunks_received < 10:
1578 # we want a stream to start as quickly as possible
1579 # so for the first 10 chunks we keep a very short buffer
1580 req_buffer_size = pcm_format.pcm_sample_size
1581 else:
1582 req_buffer_size = crossfade_buffer_size
1583
1584 # ALWAYS APPEND CHUNK TO BUFFER
1585 buffer += chunk
1586 del chunk
1587 if len(buffer) < req_buffer_size:
1588 # buffer is not full enough, move on
1589 continue
1590
1591 #### HANDLE CROSSFADE DATA FROM PREVIOUS TRACK
1592 if crossfade_data:
1593 # send the (second half of the) crossfade data
1594 if crossfade_data.pcm_format != pcm_format:
1595 # edge case: pcm format mismatch, we need to resample
1596 self.logger.debug(
1597 "Resampling crossfade data from %s to %s for queue %s",
1598 crossfade_data.pcm_format.sample_rate,
1599 pcm_format.sample_rate,
1600 queue.display_name,
1601 )
1602 resampled_data = await resample_pcm_audio(
1603 crossfade_data.data,
1604 crossfade_data.pcm_format,
1605 pcm_format,
1606 )
1607 if resampled_data:
1608 for _chunk in divide_chunks(resampled_data, pcm_format.pcm_sample_size):
1609 yield _chunk
1610 bytes_written += len(resampled_data)
1611 else:
1612 # Resampling failed, error already logged in resample_pcm_audio
1613 # Skip crossfade data entirely - stream continues without it
1614 self.logger.warning(
1615 "Skipping crossfade data for queue %s due to resampling failure",
1616 queue.display_name,
1617 )
1618 else:
1619 for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size):
1620 yield _chunk
1621 bytes_written += len(crossfade_data.data)
1622 # clear vars
1623 crossfade_data = None
1624
1625 #### OTHER: enough data in buffer, feed to output
1626 while len(buffer) > req_buffer_size:
1627 yield buffer[: pcm_format.pcm_sample_size]
1628 bytes_written += pcm_format.pcm_sample_size
1629 buffer = buffer[pcm_format.pcm_sample_size :]
1630
1631 #### HANDLE END OF TRACK
1632
1633 if crossfade_data:
1634 # edge case: we did not get enough data to send the crossfade data
1635 # send the (second half of the) crossfade data
1636 if crossfade_data.pcm_format != pcm_format:
1637 # (yet another) edge case: pcm format mismatch, we need to resample
1638 self.logger.debug(
1639 "Resampling remaining crossfade data from %s to %s for queue %s",
1640 crossfade_data.pcm_format.sample_rate,
1641 pcm_format.sample_rate,
1642 queue.display_name,
1643 )
1644 resampled_crossfade_data = await resample_pcm_audio(
1645 crossfade_data.data,
1646 crossfade_data.pcm_format,
1647 pcm_format,
1648 )
1649 if resampled_crossfade_data:
1650 crossfade_data.data = resampled_crossfade_data
1651 else:
1652 # Resampling failed, error already logged in resample_pcm_audio
1653 # Skip the crossfade data entirely
1654 self.logger.warning(
1655 "Skipping remaining crossfade data for queue %s due to resampling failure",
1656 queue.display_name,
1657 )
1658 crossfade_data = None
1659 if crossfade_data:
1660 for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size):
1661 yield _chunk
1662 bytes_written += len(crossfade_data.data)
1663 crossfade_data = None
1664
1665 # get next track for crossfade
1666 next_queue_item: QueueItem | None
1667 try:
1668 self.logger.debug(
1669 "Preloading NEXT track for crossfade for queue %s",
1670 queue.display_name,
1671 )
1672 next_queue_item = await self.mass.player_queues.load_next_queue_item(
1673 queue.queue_id, queue_item.queue_item_id
1674 )
1675 # set index_in_buffer to prevent our next track is overwritten while preloading
1676 if next_queue_item.streamdetails is None:
1677 raise InvalidDataError(
1678 f"No streamdetails for next queue item {next_queue_item.queue_item_id}"
1679 )
1680 queue.index_in_buffer = self.mass.player_queues.index_by_id(
1681 queue.queue_id, next_queue_item.queue_item_id
1682 )
1683 queue_player = self.mass.players.get_player(queue.queue_id)
1684 assert queue_player is not None
1685 next_queue_item_pcm_format = await self._select_pcm_format(
1686 player=queue_player,
1687 streamdetails=next_queue_item.streamdetails,
1688 smartfades_enabled=True,
1689 )
1690 except QueueEmpty:
1691 # end of queue reached, no next item
1692 next_queue_item = None
1693
1694 crossfade_allowed = bool(next_queue_item) and self._crossfade_allowed(
1695 queue_item,
1696 smart_fades_mode=smart_fades_mode,
1697 flow_mode=False,
1698 next_queue_item=next_queue_item,
1699 sample_rate=pcm_format.sample_rate,
1700 next_sample_rate=next_queue_item_pcm_format.sample_rate,
1701 )
1702 if not crossfade_allowed:
1703 if not next_queue_item:
1704 self.logger.debug(
1705 "Enqueue mode: no next queue item loaded (QueueEmpty) for queue %s",
1706 queue.display_name,
1707 )
1708 else:
1709 self.logger.debug(
1710 "Enqueue mode: crossfade NOT allowed for track %s -> %s on queue %s",
1711 queue_item.name,
1712 next_queue_item.name,
1713 queue.display_name,
1714 )
1715 # no crossfade enabled/allowed, just yield the buffer last part
1716 bytes_written += len(buffer)
1717 for _chunk in divide_chunks(buffer, pcm_format.pcm_sample_size):
1718 yield _chunk
1719 else:
1720 # if crossfade is enabled, save fadeout part in buffer to pickup for next track
1721 assert next_queue_item is not None
1722 fade_out_data = buffer
1723 buffer = b""
1724 try:
1725 async for chunk in self.get_queue_item_stream(
1726 next_queue_item, next_queue_item_pcm_format
1727 ):
1728 # append to buffer until we reach crossfade size
1729 # we only need the first X seconds of the NEXT track so we can
1730 # perform the crossfade.
1731 # the crossfaded audio of the previous and next track will be
1732 # sent in two equal parts: first half now, second half
1733 # when the next track starts. We use CrossfadeData to store
1734 # the second half to be picked up by the next track's stream generator.
1735 # Note that we more or less expect the user to have enabled the in-memory
1736 # buffer so we can keep the next track's audio data in memory.
1737 buffer += chunk
1738 del chunk
1739 if len(buffer) >= crossfade_buffer_size:
1740 break
1741 #### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
1742 # Store original buffer size before any resampling for fade_in_size calculation
1743 # This size is in the next track's original format which is what we need
1744 original_buffer_size = len(buffer)
1745 if next_queue_item_pcm_format != pcm_format:
1746 # edge case: pcm format mismatch, we need to resample the next track's
1747 # beginning part before crossfading
1748 self.logger.debug(
1749 "Resampling next track's crossfade from %s to %s for queue %s",
1750 next_queue_item_pcm_format.sample_rate,
1751 pcm_format.sample_rate,
1752 queue.display_name,
1753 )
1754 buffer = await resample_pcm_audio(
1755 buffer,
1756 next_queue_item_pcm_format,
1757 pcm_format,
1758 )
1759 # perform actual (smart fades) crossfade using mixer
1760 crossfade_bytes = await self._smart_fades_mixer.mix(
1761 fade_in_part=buffer,
1762 fade_out_part=fade_out_data,
1763 fade_in_streamdetails=cast("StreamDetails", next_queue_item.streamdetails),
1764 fade_out_streamdetails=streamdetails,
1765 pcm_format=pcm_format,
1766 standard_crossfade_duration=standard_crossfade_duration,
1767 mode=smart_fades_mode,
1768 )
1769 # send half of the crossfade_part (= approx the fadeout part)
1770 split_point = (len(crossfade_bytes) + 1) // 2
1771 crossfade_first = crossfade_bytes[:split_point]
1772 crossfade_second = crossfade_bytes[split_point:]
1773 del crossfade_bytes
1774 bytes_written += len(crossfade_first)
1775 for _chunk in divide_chunks(crossfade_first, pcm_format.pcm_sample_size):
1776 yield _chunk
1777 # store the other half for the next track
1778 # IMPORTANT: crossfade_second data is in CURRENT track's format (pcm_format)
1779 # because it was created from the resampled buffer used for mixing.
1780 # BUT fade_in_size represents bytes in NEXT track's original format
1781 # (next_queue_item_pcm_format) because that's how much of the next track
1782 # was consumed during the crossfade. We need both formats to correctly
1783 # handle the crossfade data when the next track starts.
1784 self._crossfade_data[queue_item.queue_id] = CrossfadeData(
1785 data=crossfade_second,
1786 fade_in_size=original_buffer_size,
1787 pcm_format=pcm_format, # Format of the data (current track)
1788 fade_in_pcm_format=next_queue_item_pcm_format, # Format for fade_in_size
1789 queue_item_id=next_queue_item.queue_item_id,
1790 )
1791 self.logger.debug(
1792 "Crossfade data STORED for queue %s (outgoing track: %s, incoming track: %s)",
1793 queue.display_name,
1794 queue_item.name,
1795 next_queue_item.name if next_queue_item else "N/A",
1796 )
1797 except AudioError:
1798 # no crossfade possible, just yield the fade_out_data
1799 next_queue_item = None
1800 yield fade_out_data
1801 bytes_written += len(fade_out_data)
1802 del fade_out_data
1803 # make sure the buffer gets cleaned up
1804 del buffer
1805 # update duration details based on the actual pcm data we sent
1806 # this also accounts for crossfade and silence stripping
1807 seconds_streamed = bytes_written / pcm_format.pcm_sample_size
1808 streamdetails.seconds_streamed = seconds_streamed
1809 streamdetails.duration = int(streamdetails.seek_position + seconds_streamed)
1810 self.logger.debug(
1811 "Finished Streaming queue track: %s (%s) on queue %s "
1812 "- crossfade data prepared for next track: %s",
1813 streamdetails.uri,
1814 queue_item.name,
1815 queue.display_name,
1816 next_queue_item.name if next_queue_item else "N/A",
1817 )
1818
def _log_request(self, request: web.Request) -> None:
    """Log an incoming HTTP request.

    When this controller's logger is enabled for VERBOSE_LOG_LEVEL the
    request headers are included in the message; otherwise only the
    method, path and remote address are logged at debug level.
    """
    if not self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
        # regular (short) variant, without headers
        self.logger.debug(
            "Got %s request to %s from %s",
            request.method,
            request.path,
            request.remote,
        )
        return
    # verbose variant: same line plus the full request headers
    self.logger.log(
        VERBOSE_LOG_LEVEL,
        "Got %s request to %s from %s\nheaders: %s\n",
        request.method,
        request.path,
        request.remote,
        request.headers,
    )
1837
async def get_output_format(
    self,
    output_format_str: str,
    player: Player,
    content_sample_rate: int,
    content_bit_depth: int,
) -> AudioFormat:
    """Work out the (player specific) output AudioFormat for a format string.

    The sample rate of the content is kept when the player's CONF_SAMPLE_RATES
    config lists it, otherwise the highest configured rate is used. The bit
    depth never exceeds the highest configured bit depth. Content types that
    are not lossless are limited to 16 bit and at most 48000 Hz. For the
    literal format string "pcm" the content type is derived from the
    resulting bit depth.

    :param output_format_str: Requested output format (e.g. a codec name or "pcm").
    :param player: Player whose config is consulted.
    :param content_sample_rate: Sample rate of the content being streamed.
    :param content_bit_depth: Bit depth of the content being streamed.
    :return: AudioFormat describing the stream to send to the player.
    """
    content_type: ContentType = ContentType.try_parse(output_format_str)
    rate_depth_pairs = cast(
        "list[tuple[str, str]]",
        await self.mass.config.get_player_config_value(
            player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
        ),
    )
    channels_conf = self.mass.config.get_raw_player_config_value(
        player.player_id, CONF_OUTPUT_CHANNELS, "stereo"
    )
    # each config entry is a (sample_rate, bit_depth) pair of strings
    allowed_rates = tuple(int(pair[0]) for pair in rate_depth_pairs)
    allowed_depths = tuple(int(pair[1]) for pair in rate_depth_pairs)

    # never exceed what the player is configured to handle
    output_bit_depth = min(content_bit_depth, max(allowed_depths))
    output_sample_rate = (
        content_sample_rate if content_sample_rate in allowed_rates else max(allowed_rates)
    )

    if not content_type.is_lossless():
        # no point in having a higher bit depth (or rate) for lossy formats
        output_bit_depth = 16
        output_sample_rate = min(48000, output_sample_rate)
    if output_format_str == "pcm":
        # generic "pcm" request: pick the concrete PCM type for this bit depth
        content_type = ContentType.from_bit_depth(output_bit_depth)

    # anything other than "stereo" is treated as a single (mono) channel
    channel_count = 2 if channels_conf == "stereo" else 1
    return AudioFormat(
        content_type=content_type,
        sample_rate=output_sample_rate,
        bit_depth=output_bit_depth,
        channels=channel_count,
    )
1878
async def _select_flow_format(
    self,
    player: Player,
) -> AudioFormat:
    """Determine the (player specific) PCM format used for a flow stream.

    The first of 192000, 96000, 48000 and 44100 Hz that appears in the
    player's CONF_SAMPLE_RATES config is used as sample rate; when none of
    them is configured the sample rate of INTERNAL_PCM_FORMAT is used.
    Content type and bit depth are taken from INTERNAL_PCM_FORMAT and the
    stream always has 2 channels.

    :param player: Player whose config is consulted.
    :return: AudioFormat for the flow stream.
    """
    rates_conf = cast(
        "list[tuple[str, str]]",
        await self.mass.config.get_player_config_value(
            player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
        ),
    )
    player_rates = tuple(int(entry[0]) for entry in rates_conf)
    # candidates are checked from high to low, first match wins
    preferred_rates = (192000, 96000, 48000, 44100)
    chosen_rate = next(
        (rate for rate in preferred_rates if rate in player_rates),
        INTERNAL_PCM_FORMAT.sample_rate,
    )
    return AudioFormat(
        content_type=INTERNAL_PCM_FORMAT.content_type,
        sample_rate=chosen_rate,
        bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
        channels=2,
    )
1902
async def _select_pcm_format(
    self,
    player: Player,
    streamdetails: StreamDetails,
    smartfades_enabled: bool,
) -> AudioFormat:
    """Determine the (player specific) internal PCM format for a stream.

    The sample rate is the highest rate from the player's CONF_SAMPLE_RATES
    config that does not exceed the content's sample rate, falling back to
    48000 when no configured rate qualifies. Content type and bit depth come
    from INTERNAL_PCM_FORMAT.

    :param player: Player whose config is consulted.
    :param streamdetails: Stream details providing the content's audio format.
    :param smartfades_enabled: When True the channel count is set to 2.
    :return: AudioFormat to use for the internal PCM stream.
    """
    rates_conf = cast(
        "list[tuple[str, str]]",
        await self.mass.config.get_player_config_value(
            player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
        ),
    )
    player_rates = [int(entry[0]) for entry in rates_conf]
    source_format = streamdetails.audio_format
    # only rates at or below the content rate qualify
    usable_rates = [rate for rate in player_rates if rate <= source_format.sample_rate]
    # 48000 is the sane/safe default when nothing qualifies
    output_sample_rate = max(usable_rates) if usable_rates else 48000

    pcm_format = AudioFormat(
        sample_rate=output_sample_rate,
        # always use the internal (f32) format for extra headroom for filters etc
        content_type=INTERNAL_PCM_FORMAT.content_type,
        bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
        channels=source_format.channels,
    )
    if smartfades_enabled:
        # force stereo for crossfading
        # NOTE(review): set after construction on purpose (unchanged behavior);
        # confirm AudioFormat derives nothing from `channels` at init time
        pcm_format.channels = 2
    return pcm_format
1934
def _crossfade_allowed(
    self,
    queue_item: QueueItem,
    smart_fades_mode: SmartFadesMode,
    flow_mode: bool = False,
    next_queue_item: QueueItem | None = None,
    sample_rate: int | None = None,
    next_sample_rate: int | None = None,
) -> bool:
    """Return whether crossfading from queue_item into the next item is allowed.

    Crossfade is refused when smart fades are disabled, when the queue's
    player cannot be found, when either item is not a track, when there is
    no next item, when both tracks belong to the same album (unless the
    CONF_ALLOW_CROSSFADE_SAME_ALBUM core config value is truthy) and, outside
    flow mode, when the sample rates differ while the player config does not
    support gapless playback with different sample rates.

    :param queue_item: The item that is currently being streamed.
    :param smart_fades_mode: The active smart fades mode.
    :param flow_mode: Whether the stream runs in flow mode.
    :param next_queue_item: Next item, looked up from the queue when omitted.
    :param sample_rate: Sample rate of the current stream (optional).
    :param next_sample_rate: Sample rate of the next stream (optional).
    :return: True when a crossfade may be performed.
    """
    if smart_fades_mode == SmartFadesMode.DISABLED:
        return False
    queue_id = queue_item.queue_id
    if not self.mass.players.get_player(queue_id):
        return False  # just a guard
    if queue_item.media_type != MediaType.TRACK:
        self.logger.debug("Skipping crossfade: current item is not a track")
        return False

    # use the given next item, otherwise ask the queue for it
    upcoming = next_queue_item or self.mass.player_queues.get_next_item(
        queue_id, queue_item.queue_item_id
    )
    if not upcoming:
        self.logger.debug(
            "Crossfade not allowed: no next item found for %s (flow_mode=%s, queue_id=%s)",
            queue_item.name,
            flow_mode,
            queue_id,
        )
        return False
    if upcoming.media_type != MediaType.TRACK:
        self.logger.debug("Skipping crossfade: next item is not a track")
        return False

    # in general, crossfade is not desired for tracks of the same (gapless) album.
    # there is no accurate way to tell whether an album is gapless, so tracks of
    # the same album are never crossfaded unless the config explicitly allows it
    current_media = queue_item.media_item
    upcoming_media = upcoming.media_item
    same_album = (
        isinstance(current_media, Track)
        and isinstance(upcoming_media, Track)
        and current_media.album
        and upcoming_media.album
        and current_media.album == upcoming_media.album
    )
    if same_album and not self.mass.config.get_raw_core_config_value(
        self.domain, CONF_ALLOW_CROSSFADE_SAME_ALBUM, False
    ):
        self.logger.debug("Skipping crossfade: next item is part of the same album")
        return False

    # sample rate mismatch only matters outside flow mode and only when both
    # rates are known
    rates_differ = (
        not flow_mode and sample_rate and next_sample_rate and sample_rate != next_sample_rate
    )
    if rates_differ and not self.mass.config.get_raw_player_config_value(
        queue_id,
        CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES.key,
        CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES.default_value,
    ):
        self.logger.debug(
            "Skipping crossfade: player does not support gapless playback "
            "with different sample rates (%s vs %s)",
            sample_rate,
            next_sample_rate,
        )
        return False

    return True
2006
async def _periodic_garbage_collection(self) -> None:
    """Run a garbage collection pass and schedule the next one.

    The collection is meant to free up memory from audio buffers and streams.
    After each pass the method re-schedules itself through `mass.call_later`.
    """
    rerun_after_seconds = 900  # 15 minutes
    self.logger.log(VERBOSE_LOG_LEVEL, "Running periodic garbage collection...")
    # hand gc.collect to the loop's default executor to avoid blocking the event loop
    collected = await asyncio.get_running_loop().run_in_executor(None, gc.collect)
    self.logger.log(
        VERBOSE_LOG_LEVEL,
        "Garbage collection completed, collected %d objects",
        collected,
    )
    # schedule the next run
    self.mass.call_later(rerun_after_seconds, self._periodic_garbage_collection)
2025
def _setup_smart_fades_logger(self, config: CoreConfig) -> None:
    """Apply the configured log level to the smart fades analyzer and mixer loggers.

    :param config: Core config from which CONF_SMART_FADES_LOG_LEVEL is read.
    """
    configured_level = str(config.get_value(CONF_SMART_FADES_LOG_LEVEL))
    # "GLOBAL" means: follow the level of this controller's own logger
    target_level = self.logger.level if configured_level == "GLOBAL" else configured_level
    self.smart_fades_analyzer.logger.setLevel(target_level)
    self.smart_fades_mixer.logger.setLevel(target_level)
2035