/
/
/
1"""
2Controller to stream audio to players.
3
4The streams controller hosts a basic, unprotected HTTP-only webserver
5purely to stream audio packets to players and some control endpoints such as
6the upnp callbacks and json rpc api for slimproto clients.
7"""
8
9from __future__ import annotations
10
11import asyncio
12import gc
13import logging
14import os
15import urllib.parse
16from collections.abc import AsyncGenerator
17from dataclasses import dataclass
18from typing import TYPE_CHECKING, Final, cast
19
20from aiofiles.os import wrap
21from aiohttp import web
22from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
23from music_assistant_models.enums import (
24 ConfigEntryType,
25 ContentType,
26 MediaType,
27 PlayerFeature,
28 StreamType,
29 VolumeNormalizationMode,
30)
31from music_assistant_models.errors import (
32 AudioError,
33 InvalidDataError,
34 ProviderUnavailableError,
35 QueueEmpty,
36)
37from music_assistant_models.media_items import AudioFormat, Track
38from music_assistant_models.player_queue import PlayLogEntry
39
40from music_assistant.constants import (
41 ANNOUNCE_ALERT_FILE,
42 CONF_BIND_IP,
43 CONF_BIND_PORT,
44 CONF_CROSSFADE_DURATION,
45 CONF_ENTRY_ENABLE_ICY_METADATA,
46 CONF_ENTRY_LOG_LEVEL,
47 CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES,
48 CONF_HTTP_PROFILE,
49 CONF_OUTPUT_CHANNELS,
50 CONF_OUTPUT_CODEC,
51 CONF_PUBLISH_IP,
52 CONF_SAMPLE_RATES,
53 CONF_SMART_FADES_MODE,
54 CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO,
55 CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
56 CONF_VOLUME_NORMALIZATION_RADIO,
57 CONF_VOLUME_NORMALIZATION_TRACKS,
58 DEFAULT_STREAM_HEADERS,
59 ICY_HEADERS,
60 INTERNAL_PCM_FORMAT,
61 SILENCE_FILE,
62 VERBOSE_LOG_LEVEL,
63)
64from music_assistant.controllers.players.helpers import AnnounceData
65from music_assistant.controllers.streams.smart_fades import SmartFadesMixer
66from music_assistant.controllers.streams.smart_fades.analyzer import SmartFadesAnalyzer
67from music_assistant.controllers.streams.smart_fades.fades import SMART_CROSSFADE_DURATION
68from music_assistant.helpers.audio import LOGGER as AUDIO_LOGGER
69from music_assistant.helpers.audio import (
70 get_buffered_media_stream,
71 get_chunksize,
72 get_media_stream,
73 get_player_filter_params,
74 get_stream_details,
75 resample_pcm_audio,
76)
77from music_assistant.helpers.buffered_generator import buffered, use_buffer
78from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
79from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream
80from music_assistant.helpers.util import (
81 divide_chunks,
82 format_ip_for_url,
83 get_ip_addresses,
84 get_total_system_memory,
85)
86from music_assistant.helpers.webserver import Webserver
87from music_assistant.models.core_controller import CoreController
88from music_assistant.models.music_provider import MusicProvider
89from music_assistant.models.plugin import PluginProvider, PluginSource
90from music_assistant.models.smart_fades import SmartFadesMode
91from music_assistant.providers.universal_group.constants import UGP_PREFIX
92from music_assistant.providers.universal_group.player import UniversalGroupPlayer
93
94if TYPE_CHECKING:
95 from music_assistant_models.config_entries import CoreConfig
96 from music_assistant_models.player import PlayerMedia
97 from music_assistant_models.player_queue import PlayerQueue
98 from music_assistant_models.queue_item import QueueItem
99 from music_assistant_models.streamdetails import StreamDetails
100
101 from music_assistant.mass import MusicAssistant
102 from music_assistant.models.player import Player
103
104
105isfile = wrap(os.path.isfile)
106
107CONF_ALLOW_BUFFER: Final[str] = "allow_buffering"
108CONF_ALLOW_CROSSFADE_SAME_ALBUM: Final[str] = "allow_crossfade_same_album"
109CONF_SMART_FADES_LOG_LEVEL: Final[str] = "smart_fades_log_level"
110
111# Calculate total system memory once at module load time
112TOTAL_SYSTEM_MEMORY_GB: Final[float] = get_total_system_memory()
113CONF_ALLOW_BUFFER_DEFAULT = TOTAL_SYSTEM_MEMORY_GB >= 8.0
114DEFAULT_PORT: Final[int] = 8097
115
116
117def parse_pcm_info(content_type: str) -> tuple[int, int, int]:
118 """Parse PCM info from a codec/content_type string."""
119 params = (
120 dict(urllib.parse.parse_qsl(content_type.replace(";", "&"))) if ";" in content_type else {}
121 )
122 sample_rate = int(params.get("rate", 44100))
123 sample_size = int(params.get("bitrate", 16))
124 channels = int(params.get("channels", 2))
125 return (sample_rate, sample_size, channels)
126
127
128@dataclass
129class CrossfadeData:
130 """Data class to hold crossfade data."""
131
132 data: bytes
133 fade_in_size: int
134 pcm_format: AudioFormat # Format of the 'data' bytes (current/previous track's format)
135 fade_in_pcm_format: AudioFormat # Format for 'fade_in_size' (next track's format)
136 queue_item_id: str
137
138
139class StreamsController(CoreController):
140 """Webserver Controller to stream audio to players."""
141
142 domain: str = "streams"
143
144 def __init__(self, mass: MusicAssistant) -> None:
145 """Initialize instance."""
146 super().__init__(mass)
147 self._server = Webserver(self.logger, enable_dynamic_routes=True)
148 self.register_dynamic_route = self._server.register_dynamic_route
149 self.unregister_dynamic_route = self._server.unregister_dynamic_route
150 self.manifest.name = "Streamserver"
151 self.manifest.description = (
152 "Music Assistant's core controller that is responsible for "
153 "streaming audio to players on the local network."
154 )
155 self.manifest.icon = "cast-audio"
156 self.announcements: dict[str, AnnounceData] = {}
157 self._crossfade_data: dict[str, CrossfadeData] = {}
158 self._bind_ip: str = "0.0.0.0"
159 self._smart_fades_mixer = SmartFadesMixer(self)
160 self._smart_fades_analyzer = SmartFadesAnalyzer(self)
161
162 @property
163 def base_url(self) -> str:
164 """Return the base_url for the streamserver."""
165 return self._server.base_url
166
167 @property
168 def bind_ip(self) -> str:
169 """Return the IP address this streamserver is bound to."""
170 return self._bind_ip
171
172 @property
173 def smart_fades_mixer(self) -> SmartFadesMixer:
174 """Return the SmartFadesMixer instance."""
175 return self._smart_fades_mixer
176
177 @property
178 def smart_fades_analyzer(self) -> SmartFadesAnalyzer:
179 """Return the SmartFadesAnalyzer instance."""
180 return self._smart_fades_analyzer
181
182 async def get_config_entries(
183 self,
184 action: str | None = None,
185 values: dict[str, ConfigValueType] | None = None,
186 ) -> tuple[ConfigEntry, ...]:
187 """Return all Config Entries for this core module (if any)."""
188 ip_addresses = await get_ip_addresses(include_ipv6=True)
189 return (
190 ConfigEntry(
191 key=CONF_ALLOW_BUFFER,
192 type=ConfigEntryType.BOOLEAN,
193 default_value=CONF_ALLOW_BUFFER_DEFAULT,
194 label="Allow (in-memory) buffering of (track) audio",
195 description="By default, Music Assistant tries to be as resource "
196 "efficient as possible when streaming audio, especially considering "
197 "low-end devices such as Raspberry Pi's. This means that audio "
198 "buffering is disabled by default to reduce memory usage. \n\n"
199 "Enabling this option allows for in-memory buffering of audio, "
200 "which (massively) improves playback (and seeking) performance but it comes "
201 "at the cost of increased memory usage. "
202 "If you run Music Assistant on a capable device with enough memory, "
203 "enabling this option is strongly recommended.",
204 required=False,
205 category="playback",
206 ),
207 ConfigEntry(
208 key=CONF_VOLUME_NORMALIZATION_RADIO,
209 type=ConfigEntryType.STRING,
210 default_value=VolumeNormalizationMode.FALLBACK_FIXED_GAIN,
211 label="Volume normalization method for radio streams",
212 options=[
213 ConfigValueOption(x.value.replace("_", " ").title(), x.value)
214 for x in VolumeNormalizationMode
215 ],
216 category="playback",
217 ),
218 ConfigEntry(
219 key=CONF_VOLUME_NORMALIZATION_TRACKS,
220 type=ConfigEntryType.STRING,
221 default_value=VolumeNormalizationMode.FALLBACK_DYNAMIC,
222 label="Volume normalization method for tracks",
223 options=[
224 ConfigValueOption(x.value.replace("_", " ").title(), x.value)
225 for x in VolumeNormalizationMode
226 ],
227 category="playback",
228 ),
229 ConfigEntry(
230 key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO,
231 type=ConfigEntryType.FLOAT,
232 range=(-20, 10),
233 default_value=-6,
234 label="Fixed/fallback gain adjustment for radio streams",
235 category="playback",
236 ),
237 ConfigEntry(
238 key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
239 type=ConfigEntryType.FLOAT,
240 range=(-20, 10),
241 default_value=-6,
242 label="Fixed/fallback gain adjustment for tracks",
243 category="playback",
244 ),
245 ConfigEntry(
246 key=CONF_ALLOW_CROSSFADE_SAME_ALBUM,
247 type=ConfigEntryType.BOOLEAN,
248 default_value=False,
249 label="Allow crossfade between tracks from the same album",
250 description="Enabling this option allows for crossfading between tracks "
251 "that are part of the same album.",
252 category="playback",
253 ),
254 ConfigEntry(
255 key=CONF_PUBLISH_IP,
256 type=ConfigEntryType.STRING,
257 default_value=ip_addresses[0],
258 label="Published IP address",
259 description="This IP address is communicated to players where to find this server."
260 "\nMake sure that this IP can be reached by players on the local network, "
261 "otherwise audio streaming will not work.",
262 required=False,
263 category="generic",
264 advanced=True,
265 requires_reload=True,
266 ),
267 ConfigEntry(
268 key=CONF_BIND_PORT,
269 type=ConfigEntryType.INTEGER,
270 default_value=DEFAULT_PORT,
271 label="TCP Port",
272 description="The TCP port to run the server. "
273 "Make sure that this server can be reached "
274 "on the given IP and TCP port by players on the local network.",
275 category="generic",
276 advanced=True,
277 requires_reload=True,
278 ),
279 ConfigEntry(
280 key=CONF_BIND_IP,
281 type=ConfigEntryType.STRING,
282 default_value="0.0.0.0",
283 options=[ConfigValueOption(x, x) for x in {"0.0.0.0", "::", *ip_addresses}],
284 label="Bind to IP/interface",
285 description="Start the stream server on this specific interface. \n"
286 "Use 0.0.0.0 or :: to bind to all interfaces, which is the default. \n"
287 "This is an advanced setting that should normally "
288 "not be adjusted in regular setups.",
289 category="generic",
290 advanced=True,
291 required=False,
292 requires_reload=True,
293 ),
294 ConfigEntry(
295 key=CONF_SMART_FADES_LOG_LEVEL,
296 type=ConfigEntryType.STRING,
297 label="Smart Fades Log level",
298 description="Log level for the Smart Fades mixer and analyzer.",
299 options=CONF_ENTRY_LOG_LEVEL.options,
300 default_value="GLOBAL",
301 category="generic",
302 advanced=True,
303 ),
304 )
305
306 async def setup(self, config: CoreConfig) -> None:
307 """Async initialize of module."""
308 # copy log level to audio/ffmpeg loggers
309 AUDIO_LOGGER.setLevel(self.logger.level)
310 FFMPEG_LOGGER.setLevel(self.logger.level)
311 self._setup_smart_fades_logger(config)
312 # perform check for ffmpeg version
313 await check_ffmpeg_version()
314 # start the webserver
315 self.publish_port = config.get_value(CONF_BIND_PORT, DEFAULT_PORT)
316 self.publish_ip = config.get_value(CONF_PUBLISH_IP)
317 self._bind_ip = bind_ip = str(config.get_value(CONF_BIND_IP))
318 # print a big fat message in the log where the streamserver is running
319 # because this is a common source of issues for people with more complex setups
320 self.logger.log(
321 logging.INFO if self.mass.config.onboard_done else logging.WARNING,
322 "\n\n################################################################################\n"
323 "Starting streamserver on %s:%s\n"
324 "This is the IP address that is communicated to players.\n"
325 "If this is incorrect, audio will not play!\n"
326 "See the documentation how to configure the publish IP for the Streamserver\n"
327 "in Settings --> Core modules --> Streamserver\n"
328 "################################################################################\n",
329 self.publish_ip,
330 self.publish_port,
331 )
332 await self._server.setup(
333 bind_ip=bind_ip,
334 bind_port=cast("int", self.publish_port),
335 base_url=f"http://{format_ip_for_url(str(self.publish_ip))}:{self.publish_port}",
336 static_routes=[
337 (
338 "*",
339 "/flow/{session_id}/{queue_id}/{queue_item_id}/{player_id}.{fmt}",
340 self.serve_queue_flow_stream,
341 ),
342 (
343 "*",
344 "/single/{session_id}/{queue_id}/{queue_item_id}/{player_id}.{fmt}",
345 self.serve_queue_item_stream,
346 ),
347 (
348 "*",
349 "/command/{queue_id}/{command}.mp3",
350 self.serve_command_request,
351 ),
352 (
353 "*",
354 "/announcement/{player_id}.{fmt}",
355 self.serve_announcement_stream,
356 ),
357 (
358 "*",
359 "/pluginsource/{plugin_source}/{player_id}.{fmt}",
360 self.serve_plugin_source_stream,
361 ),
362 ],
363 )
364 # Start periodic garbage collection task
365 # This ensures memory from audio buffers and streams is cleaned up regularly
366 self.mass.call_later(900, self._periodic_garbage_collection) # 15 minutes
367
368 async def close(self) -> None:
369 """Cleanup on exit."""
370 await self._server.close()
371
372 async def resolve_stream_url(
373 self,
374 player_id: str,
375 media: PlayerMedia,
376 ) -> str:
377 """Resolve the stream URL for the given PlayerMedia."""
378 conf_output_codec = await self.mass.config.get_player_config_value(
379 player_id, CONF_OUTPUT_CODEC, default="flac", return_type=str
380 )
381 output_codec = ContentType.try_parse(conf_output_codec or "flac")
382 fmt = output_codec.value
383 # handle raw pcm without exact format specifiers
384 if output_codec.is_pcm() and ";" not in fmt:
385 fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}"
386 extra_data = media.custom_data or {}
387 flow_mode = extra_data.get("flow_mode", False)
388 session_id = extra_data.get("session_id")
389 queue_item_id = media.queue_item_id
390 if not session_id or not queue_item_id:
391 raise InvalidDataError("Can not resolve stream URL: Invalid PlayerMedia data")
392 queue_id = media.source_id
393 base_path = "flow" if flow_mode else "single"
394 return f"{self._server.base_url}/{base_path}/{session_id}/{queue_id}/{queue_item_id}/{player_id}.{fmt}" # noqa: E501
395
396 async def get_plugin_source_url(
397 self,
398 plugin_source: PluginSource,
399 player_id: str,
400 ) -> str:
401 """Get the url for the Plugin Source stream/proxy."""
402 if plugin_source.audio_format.content_type.is_pcm():
403 fmt = ContentType.WAV.value
404 else:
405 fmt = plugin_source.audio_format.content_type.value
406 return f"{self._server.base_url}/pluginsource/{plugin_source.id}/{player_id}.{fmt}"
407
408 async def serve_queue_item_stream(self, request: web.Request) -> web.StreamResponse:
409 """Stream single queueitem audio to a player."""
410 self._log_request(request)
411 queue_id = request.match_info["queue_id"]
412 player_id = request.match_info["player_id"]
413 if not (queue := self.mass.player_queues.get(queue_id)):
414 raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
415 session_id = request.match_info["session_id"]
416 if queue.session_id and session_id != queue.session_id:
417 raise web.HTTPNotFound(reason=f"Unknown (or invalid) session: {session_id}")
418 if not (player := self.mass.players.get_player(player_id)):
419 raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
420 queue_item_id = request.match_info["queue_item_id"]
421 queue_item = self.mass.player_queues.get_item(queue_id, queue_item_id)
422 if not queue_item:
423 raise web.HTTPNotFound(reason=f"Unknown Queue item: {queue_item_id}")
424 if not queue_item.streamdetails:
425 try:
426 queue_item.streamdetails = await get_stream_details(
427 mass=self.mass, queue_item=queue_item
428 )
429 except Exception as e:
430 self.logger.error(
431 "Failed to get streamdetails for QueueItem %s: %s", queue_item_id, e
432 )
433 queue_item.available = False
434 raise web.HTTPNotFound(reason=f"No streamdetails for Queue item: {queue_item_id}")
435
436 # pick output format based on the streamdetails and player capabilities
437 pcm_format = await self._select_pcm_format(
438 player=player,
439 streamdetails=queue_item.streamdetails,
440 smartfades_enabled=True,
441 )
442 output_format = await self.get_output_format(
443 output_format_str=request.match_info["fmt"],
444 player=player,
445 content_sample_rate=pcm_format.sample_rate,
446 content_bit_depth=pcm_format.bit_depth,
447 )
448
449 # prepare request, add some DLNA/UPNP compatible headers
450 # icy-name is sanitized to avoid a "Potential header injection attack" exception by aiohttp
451 # see https://github.com/music-assistant/support/issues/4913
452 headers = {
453 **DEFAULT_STREAM_HEADERS,
454 "icy-name": queue_item.name.replace("\n", " ").replace("\r", " ").replace("\t", " "),
455 "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01500000000000000000000000000000", # noqa: E501
456 "Accept-Ranges": "none",
457 "Content-Type": f"audio/{output_format.output_format_str}",
458 }
459 resp = web.StreamResponse(
460 status=200,
461 reason="OK",
462 headers=headers,
463 )
464 resp.content_type = f"audio/{output_format.output_format_str}"
465 http_profile = await self.mass.config.get_player_config_value(
466 queue_id, CONF_HTTP_PROFILE, default="default", return_type=str
467 )
468 if http_profile == "forced_content_length" and not queue_item.duration:
469 # just set an insane high content length to make sure the player keeps playing
470 resp.content_length = get_chunksize(output_format, 12 * 3600)
471 elif http_profile == "forced_content_length" and queue_item.duration:
472 # guess content length based on duration
473 resp.content_length = get_chunksize(output_format, queue_item.duration)
474 elif http_profile == "chunked":
475 resp.enable_chunked_encoding()
476
477 await resp.prepare(request)
478
479 # return early if this is not a GET request
480 if request.method != "GET":
481 return resp
482
483 if queue_item.media_type != MediaType.TRACK:
484 # no crossfade on non-tracks
485 smart_fades_mode = SmartFadesMode.DISABLED
486 else:
487 smart_fades_mode = await self.mass.config.get_player_config_value(
488 queue.queue_id, CONF_SMART_FADES_MODE, return_type=SmartFadesMode
489 )
490 standard_crossfade_duration = self.mass.config.get_raw_player_config_value(
491 queue.queue_id, CONF_CROSSFADE_DURATION, 10
492 )
493 if (
494 smart_fades_mode != SmartFadesMode.DISABLED
495 and PlayerFeature.GAPLESS_PLAYBACK not in player.state.supported_features
496 ):
497 # crossfade is not supported on this player due to missing gapless playback
498 self.logger.warning(
499 "Crossfade disabled: Player %s does not support gapless playback, "
500 "consider enabling flow mode to enable crossfade on this player.",
501 player.state.name if player else "Unknown Player",
502 )
503 smart_fades_mode = SmartFadesMode.DISABLED
504
505 if smart_fades_mode != SmartFadesMode.DISABLED:
506 # crossfade is enabled, use special crossfaded single item stream
507 # where the crossfade of the next track is present in the stream of
508 # a single track. This only works if the player supports gapless playback!
509 audio_input = self.get_queue_item_stream_with_smartfade(
510 queue_item=queue_item,
511 pcm_format=pcm_format,
512 smart_fades_mode=smart_fades_mode,
513 standard_crossfade_duration=standard_crossfade_duration,
514 )
515 else:
516 # no crossfade, just a regular single item stream
517 audio_input = buffered(
518 self.get_queue_item_stream(
519 queue_item=queue_item,
520 pcm_format=pcm_format,
521 seek_position=queue_item.streamdetails.seek_position,
522 ),
523 buffer_size=10,
524 min_buffer_before_yield=2,
525 )
526 # stream the audio
527 # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into
528 # the desired output format for the player including any player specific filter params
529 # such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats
530 if queue_item.media_type == MediaType.RADIO:
531 # keep very short buffer for radio streams
532 # to keep them (more or less) realtime and prevent time outs
533 read_rate_input_args = ["-readrate", "1.0", "-readrate_initial_burst", "2"]
534 else:
535 # just allow the player to buffer whatever it wants for single item streams
536 read_rate_input_args = None
537
538 first_chunk_received = False
539 bytes_sent = 0
540 async for chunk in get_ffmpeg_stream(
541 audio_input=audio_input,
542 input_format=pcm_format,
543 output_format=output_format,
544 filter_params=get_player_filter_params(
545 self.mass,
546 player_id=player.player_id,
547 input_format=pcm_format,
548 output_format=output_format,
549 ),
550 extra_input_args=read_rate_input_args,
551 ):
552 try:
553 await resp.write(chunk)
554 bytes_sent += len(chunk)
555 if not first_chunk_received:
556 first_chunk_received = True
557 # inform the queue that the track is now loaded in the buffer
558 # so for example the next track can be enqueued
559 self.mass.player_queues.track_loaded_in_buffer(
560 queue_item.queue_id, queue_item.queue_item_id
561 )
562 except (BrokenPipeError, ConnectionResetError, ConnectionError) as err:
563 if first_chunk_received and not player.stop_called:
564 # Player disconnected (unexpected) after receiving at least some data
565 # This could indicate buffering issues, network problems,
566 # or player-specific issues
567 bytes_expected = get_chunksize(output_format, queue_item.duration or 3600)
568 self.logger.warning(
569 "Player %s disconnected prematurely from stream for %s (%s) - "
570 "error: %s, sent %d bytes, expected (approx) bytes=%d",
571 queue.display_name,
572 queue_item.name,
573 queue_item.uri,
574 err.__class__.__name__,
575 bytes_sent,
576 bytes_expected,
577 )
578 break
579 if queue_item.streamdetails.stream_error:
580 self.logger.error(
581 "Error streaming QueueItem %s (%s) to %s - will try to skip to next item",
582 queue_item.name,
583 queue_item.uri,
584 queue.display_name,
585 )
586 # try to skip to the next item in the queue after a short delay
587 self.mass.call_later(5, self.mass.player_queues.next(queue_id))
588 return resp
589
590 async def serve_queue_flow_stream(self, request: web.Request) -> web.StreamResponse:
591 """Stream Queue Flow audio to player."""
592 self._log_request(request)
593 queue_id = request.match_info["queue_id"]
594 player_id = request.match_info["player_id"]
595 if not (queue := self.mass.player_queues.get(queue_id)):
596 raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}")
597 if not (player := self.mass.players.get_player(player_id)):
598 raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
599 start_queue_item_id = request.match_info["queue_item_id"]
600 start_queue_item = self.mass.player_queues.get_item(queue_id, start_queue_item_id)
601 if not start_queue_item:
602 raise web.HTTPNotFound(reason=f"Unknown Queue item: {start_queue_item_id}")
603
604 queue.flow_mode_stream_log = []
605
606 # select the highest possible PCM settings for this player
607 flow_pcm_format = await self._select_flow_format(player)
608
609 # work out output format/details
610 output_format = await self.get_output_format(
611 output_format_str=request.match_info["fmt"],
612 player=player,
613 content_sample_rate=flow_pcm_format.sample_rate,
614 content_bit_depth=flow_pcm_format.bit_depth,
615 )
616 # work out ICY metadata support
617 icy_preference = self.mass.config.get_raw_player_config_value(
618 queue_id,
619 CONF_ENTRY_ENABLE_ICY_METADATA.key,
620 CONF_ENTRY_ENABLE_ICY_METADATA.default_value,
621 )
622 enable_icy = request.headers.get("Icy-MetaData", "") == "1" and icy_preference != "disabled"
623 icy_meta_interval = 256000 if icy_preference == "full" else 16384
624
625 # prepare request, add some DLNA/UPNP compatible headers
626 headers = {
627 **DEFAULT_STREAM_HEADERS,
628 **ICY_HEADERS,
629 "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01700000000000000000000000000000", # noqa: E501
630 "Accept-Ranges": "none",
631 "Content-Type": f"audio/{output_format.output_format_str}",
632 }
633 if enable_icy:
634 headers["icy-metaint"] = str(icy_meta_interval)
635
636 resp = web.StreamResponse(
637 status=200,
638 reason="OK",
639 headers=headers,
640 )
641 http_profile = await self.mass.config.get_player_config_value(
642 queue_id, CONF_HTTP_PROFILE, default="default", return_type=str
643 )
644 if http_profile == "forced_content_length":
645 # just set an insane high content length to make sure the player keeps playing
646 resp.content_length = get_chunksize(output_format, 12 * 3600)
647 elif http_profile == "chunked":
648 resp.enable_chunked_encoding()
649
650 await resp.prepare(request)
651
652 # return early if this is not a GET request
653 if request.method != "GET":
654 return resp
655
656 # all checks passed, start streaming!
657 # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into
658 # the desired output format for the player including any player specific filter params
659 # such as channels mixing, DSP, resampling and, only if needed, encoding to lossy formats
660 self.logger.debug("Start serving Queue flow audio stream for %s", queue.display_name)
661
662 async for chunk in get_ffmpeg_stream(
663 audio_input=self.get_queue_flow_stream(
664 queue=queue,
665 start_queue_item=start_queue_item,
666 pcm_format=flow_pcm_format,
667 ),
668 input_format=flow_pcm_format,
669 output_format=output_format,
670 filter_params=get_player_filter_params(
671 self.mass, player.player_id, flow_pcm_format, output_format
672 ),
673 # we need to slowly feed the music to avoid the player stopping and later
674 # restarting (or completely failing) the audio stream by keeping the buffer short.
675 # this is reported to be an issue especially with Chromecast players.
676 # see for example: https://github.com/music-assistant/support/issues/3717
677 # allow buffer ahead of 6 seconds and read rest in realtime
678 extra_input_args=["-readrate", "1.0", "-readrate_initial_burst", "6"],
679 chunk_size=icy_meta_interval if enable_icy else get_chunksize(output_format),
680 ):
681 try:
682 await resp.write(chunk)
683 except (BrokenPipeError, ConnectionResetError, ConnectionError):
684 # race condition
685 break
686
687 if not enable_icy:
688 continue
689
690 # if icy metadata is enabled, send the icy metadata after the chunk
691 if (
692 # use current item here and not buffered item, otherwise
693 # the icy metadata will be too much ahead
694 (current_item := queue.current_item)
695 and current_item.streamdetails
696 and current_item.streamdetails.stream_title
697 ):
698 title = current_item.streamdetails.stream_title
699 elif queue and current_item and current_item.name:
700 title = current_item.name
701 else:
702 title = "Music Assistant"
703 metadata = f"StreamTitle='{title}';".encode()
704 if icy_preference == "full" and current_item and current_item.image:
705 metadata += f"StreamURL='{current_item.image.path}'".encode()
706 while len(metadata) % 16 != 0:
707 metadata += b"\x00"
708 length = len(metadata)
709 length_b = chr(int(length / 16)).encode()
710 await resp.write(length_b + metadata)
711
712 return resp
713
714 async def serve_command_request(self, request: web.Request) -> web.FileResponse:
715 """Handle special 'command' request for a player."""
716 self._log_request(request)
717 queue_id = request.match_info["queue_id"]
718 command = request.match_info["command"]
719 if command == "next":
720 self.mass.create_task(self.mass.player_queues.next(queue_id))
721 return web.FileResponse(SILENCE_FILE, headers={"icy-name": "Music Assistant"})
722
723 async def serve_announcement_stream(self, request: web.Request) -> web.StreamResponse:
724 """Stream announcement audio to a player."""
725 self._log_request(request)
726 player_id = request.match_info["player_id"]
727 player = self.mass.player_queues.get(player_id)
728 if not player:
729 raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
730 if not (announce_data := self.announcements.get(player_id)):
731 raise web.HTTPNotFound(reason=f"No pending announcements for Player: {player_id}")
732
733 # work out output format/details
734 fmt = request.match_info["fmt"]
735 audio_format = AudioFormat(content_type=ContentType.try_parse(fmt))
736
737 http_profile = await self.mass.config.get_player_config_value(
738 player_id, CONF_HTTP_PROFILE, default="default", return_type=str
739 )
740 if http_profile == "forced_content_length":
741 # given the fact that an announcement is just a short audio clip,
742 # just send it over completely at once so we have a fixed content length
743 data = b""
744 async for chunk in self.get_announcement_stream(
745 announcement_url=announce_data["announcement_url"],
746 output_format=audio_format,
747 pre_announce=announce_data["pre_announce"],
748 pre_announce_url=announce_data["pre_announce_url"],
749 ):
750 data += chunk
751 return web.Response(
752 body=data,
753 content_type=f"audio/{audio_format.output_format_str}",
754 headers=DEFAULT_STREAM_HEADERS,
755 )
756
757 resp = web.StreamResponse(
758 status=200,
759 reason="OK",
760 headers=DEFAULT_STREAM_HEADERS,
761 )
762 resp.content_type = f"audio/{audio_format.output_format_str}"
763 if http_profile == "chunked":
764 resp.enable_chunked_encoding()
765
766 await resp.prepare(request)
767
768 # return early if this is not a GET request
769 if request.method != "GET":
770 return resp
771
772 # all checks passed, start streaming!
773 self.logger.debug(
774 "Start serving audio stream for Announcement %s to %s",
775 announce_data["announcement_url"],
776 player.state.name,
777 )
778 async for chunk in self.get_announcement_stream(
779 announcement_url=announce_data["announcement_url"],
780 output_format=audio_format,
781 pre_announce=announce_data["pre_announce"],
782 pre_announce_url=announce_data["pre_announce_url"],
783 ):
784 try:
785 await resp.write(chunk)
786 except (BrokenPipeError, ConnectionResetError):
787 break
788
789 self.logger.debug(
790 "Finished serving audio stream for Announcement %s to %s",
791 announce_data["announcement_url"],
792 player.state.name,
793 )
794
795 return resp
796
797 async def serve_plugin_source_stream(self, request: web.Request) -> web.StreamResponse:
798 """Stream PluginSource audio to a player."""
799 self._log_request(request)
800 plugin_source_id = request.match_info["plugin_source"]
801 provider = cast("PluginProvider", self.mass.get_provider(plugin_source_id))
802 if not provider:
803 raise ProviderUnavailableError(f"Unknown PluginSource: {plugin_source_id}")
804 # work out output format/details
805 player_id = request.match_info["player_id"]
806 player = self.mass.players.get_player(player_id)
807 if not player:
808 raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}")
809 plugin_source = provider.get_source()
810 output_format = await self.get_output_format(
811 output_format_str=request.match_info["fmt"],
812 player=player,
813 content_sample_rate=plugin_source.audio_format.sample_rate,
814 content_bit_depth=plugin_source.audio_format.bit_depth,
815 )
816 headers = {
817 **DEFAULT_STREAM_HEADERS,
818 "contentFeatures.dlna.org": "DLNA.ORG_OP=01;DLNA.ORG_FLAGS=01700000000000000000000000000000", # noqa: E501
819 "icy-name": plugin_source.name,
820 "Accept-Ranges": "none",
821 "Content-Type": f"audio/{output_format.output_format_str}",
822 }
823
824 resp = web.StreamResponse(
825 status=200,
826 reason="OK",
827 headers=headers,
828 )
829 resp.content_type = f"audio/{output_format.output_format_str}"
830 http_profile = await self.mass.config.get_player_config_value(
831 player_id, CONF_HTTP_PROFILE, default="default", return_type=str
832 )
833 if http_profile == "forced_content_length":
834 # just set an insanely high content length to make sure the player keeps playing
835 resp.content_length = get_chunksize(output_format, 12 * 3600)
836 elif http_profile == "chunked":
837 resp.enable_chunked_encoding()
838
839 await resp.prepare(request)
840
841 # return early if this is not a GET request
842 if request.method != "GET":
843 return resp
844
845 # all checks passed, start streaming!
846 if not plugin_source.audio_format:
847 raise InvalidDataError(f"No audio format for plugin source {plugin_source_id}")
848 async for chunk in self.get_plugin_source_stream(
849 plugin_source_id=plugin_source_id,
850 output_format=output_format,
851 player_id=player_id,
852 player_filter_params=get_player_filter_params(
853 self.mass, player_id, plugin_source.audio_format, output_format
854 ),
855 ):
856 try:
857 await resp.write(chunk)
858 except (BrokenPipeError, ConnectionResetError, ConnectionError):
859 break
860 return resp
861
862 def get_command_url(self, player_or_queue_id: str, command: str) -> str:
863 """Get the url for the special command stream."""
864 return f"{self.base_url}/command/{player_or_queue_id}/{command}.mp3"
865
866 def get_announcement_url(
867 self,
868 player_id: str,
869 announce_data: AnnounceData,
870 content_type: ContentType = ContentType.MP3,
871 ) -> str:
872 """Get the url for the special announcement stream."""
873 self.announcements[player_id] = announce_data
874 # use stream server to host announcement on local network
875 # this ensures playback on all players, including ones that do not
876 # like https hosts and it also offers the pre-announce 'bell'
877 return f"{self.base_url}/announcement/{player_id}.{content_type.value}"
878
879 def get_stream(
880 self, media: PlayerMedia, pcm_format: AudioFormat, force_flow_mode: bool = False
881 ) -> AsyncGenerator[bytes, None]:
882 """
883 Get a stream of the given media as raw PCM audio.
884
885 This is used as helper for player providers that can consume the raw PCM
886 audio stream directly (e.g. AirPlay) and not rely on HTTP transport.
887 """
888 # select audio source
889 if media.media_type == MediaType.ANNOUNCEMENT:
890 # special case: stream announcement
891 assert media.custom_data
892 audio_source = self.get_announcement_stream(
893 media.custom_data["announcement_url"],
894 output_format=pcm_format,
895 pre_announce=media.custom_data["pre_announce"],
896 pre_announce_url=media.custom_data["pre_announce_url"],
897 )
898 elif media.media_type == MediaType.PLUGIN_SOURCE:
899 # special case: plugin source stream
900 assert media.custom_data
901 audio_source = self.get_plugin_source_stream(
902 plugin_source_id=media.custom_data["source_id"],
903 output_format=pcm_format,
904 # need to pass player_id from the PlayerMedia object
905 # because this could have been a group
906 player_id=media.custom_data["player_id"],
907 )
908 elif (
909 media.media_type == MediaType.FLOW_STREAM
910 and media.source_id
911 and media.source_id.startswith(UGP_PREFIX)
912 and media.uri
913 and "/ugp/" in media.uri
914 ):
915 # special case: member player accessing UGP stream
916 # Check URI to distinguish from the UGP accessing its own stream
917 ugp_player = cast("UniversalGroupPlayer", self.mass.players.get_player(media.source_id))
918 ugp_stream = ugp_player.stream
919 assert ugp_stream is not None # for type checker
920 if ugp_stream.base_pcm_format == pcm_format:
921 # no conversion needed
922 audio_source = ugp_stream.subscribe_raw()
923 else:
924 audio_source = ugp_stream.get_stream(output_format=pcm_format)
925 elif (
926 media.source_id
927 and media.queue_item_id
928 and (media.media_type == MediaType.FLOW_STREAM or force_flow_mode)
929 ):
930 # regular queue (flow) stream request
931 queue = self.mass.player_queues.get(media.source_id)
932 assert queue
933 start_queue_item = self.mass.player_queues.get_item(
934 media.source_id, media.queue_item_id
935 )
936 assert start_queue_item
937 audio_source = self.mass.streams.get_queue_flow_stream(
938 queue=queue,
939 start_queue_item=start_queue_item,
940 pcm_format=pcm_format,
941 )
942 elif media.source_id and media.queue_item_id:
943 # single item stream (e.g. radio)
944 queue_item = self.mass.player_queues.get_item(media.source_id, media.queue_item_id)
945 assert queue_item
946 audio_source = buffered(
947 self.get_queue_item_stream(
948 queue_item=queue_item,
949 pcm_format=pcm_format,
950 ),
951 buffer_size=10,
952 min_buffer_before_yield=2,
953 )
954 else:
955 # assume url or some other direct path
956 # NOTE: this will fail if its an uri not playable by ffmpeg
957 audio_source = get_ffmpeg_stream(
958 audio_input=media.uri,
959 input_format=AudioFormat(content_type=ContentType.try_parse(media.uri)),
960 output_format=pcm_format,
961 )
962 return audio_source
963
964 @use_buffer(buffer_size=30, min_buffer_before_yield=2)
965 async def get_queue_flow_stream(
966 self,
967 queue: PlayerQueue,
968 start_queue_item: QueueItem,
969 pcm_format: AudioFormat,
970 ) -> AsyncGenerator[bytes, None]:
971 """
972 Get a flow stream of all tracks in the queue as raw PCM audio.
973
974 yields chunks of exactly 1 second of audio in the given pcm_format.
975 """
976 # ruff: noqa: PLR0915
977 assert pcm_format.content_type.is_pcm()
978 queue_track = None
979 last_fadeout_part: bytes = b""
980 last_streamdetails: StreamDetails | None = None
981 last_play_log_entry: PlayLogEntry | None = None
982 queue.flow_mode = True
983 if not start_queue_item:
984 # this can happen in some (edge case) race conditions
985 return
986 pcm_sample_size = pcm_format.pcm_sample_size
987 if start_queue_item.media_type != MediaType.TRACK:
988 # no crossfade on non-tracks
989 smart_fades_mode = SmartFadesMode.DISABLED
990 standard_crossfade_duration = 0
991 else:
992 smart_fades_mode = await self.mass.config.get_player_config_value(
993 queue.queue_id, CONF_SMART_FADES_MODE, return_type=SmartFadesMode
994 )
995 standard_crossfade_duration = self.mass.config.get_raw_player_config_value(
996 queue.queue_id, CONF_CROSSFADE_DURATION, 10
997 )
998 self.logger.info(
999 "Start Queue Flow stream for Queue %s - crossfade: %s %s",
1000 queue.display_name,
1001 smart_fades_mode,
1002 f"({standard_crossfade_duration}s)"
1003 if smart_fades_mode == SmartFadesMode.STANDARD_CROSSFADE
1004 else "",
1005 )
1006 total_bytes_sent = 0
1007 total_chunks_received = 0
1008
1009 while True:
1010 # get (next) queue item to stream
1011 if queue_track is None:
1012 queue_track = start_queue_item
1013 else:
1014 try:
1015 queue_track = await self.mass.player_queues.load_next_queue_item(
1016 queue.queue_id, queue_track.queue_item_id
1017 )
1018 except QueueEmpty:
1019 break
1020
1021 if queue_track.streamdetails is None:
1022 raise InvalidDataError(
1023 "No Streamdetails known for queue item %s",
1024 queue_track.queue_item_id,
1025 )
1026
1027 self.logger.debug(
1028 "Start Streaming queue track: %s (%s) for queue %s",
1029 queue_track.streamdetails.uri,
1030 queue_track.name,
1031 queue.display_name,
1032 )
1033 # append to play log so the queue controller can work out which track is playing
1034 play_log_entry = PlayLogEntry(queue_track.queue_item_id)
1035 queue.flow_mode_stream_log.append(play_log_entry)
1036 # calculate crossfade buffer size
1037 crossfade_buffer_duration = (
1038 SMART_CROSSFADE_DURATION
1039 if smart_fades_mode == SmartFadesMode.SMART_CROSSFADE
1040 else standard_crossfade_duration
1041 )
1042 crossfade_buffer_duration = min(
1043 crossfade_buffer_duration,
1044 int(queue_track.streamdetails.duration / 2)
1045 if queue_track.streamdetails.duration
1046 else crossfade_buffer_duration,
1047 )
1048 # Ensure crossfade buffer size is aligned to frame boundaries
1049 # Frame size = bytes_per_sample * channels
1050 bytes_per_sample = pcm_format.bit_depth // 8
1051 frame_size = bytes_per_sample * pcm_format.channels
1052 crossfade_buffer_size = int(pcm_format.pcm_sample_size * crossfade_buffer_duration)
1053 # Round down to nearest frame boundary
1054 crossfade_buffer_size = (crossfade_buffer_size // frame_size) * frame_size
1055
1056 bytes_written = 0
1057 buffer = b""
1058 # handle incoming audio chunks
1059 first_chunk_received = False
1060 # buffer size needs to be big enough to include the crossfade part
1061
1062 async for chunk in self.get_queue_item_stream(
1063 queue_track,
1064 pcm_format=pcm_format,
1065 seek_position=queue_track.streamdetails.seek_position,
1066 raise_on_error=False,
1067 ):
1068 total_chunks_received += 1
1069 if not first_chunk_received:
1070 first_chunk_received = True
1071 # inform the queue that the track is now loaded in the buffer
1072 # so the next track can be preloaded
1073 self.mass.player_queues.track_loaded_in_buffer(
1074 queue.queue_id, queue_track.queue_item_id
1075 )
1076 if total_chunks_received < 10 and smart_fades_mode != SmartFadesMode.DISABLED:
1077 # we want a stream to start as quickly as possible
1078 # so for the first 10 chunks we keep a very short buffer
1079 req_buffer_size = pcm_format.pcm_sample_size
1080 else:
1081 req_buffer_size = (
1082 pcm_sample_size
1083 if smart_fades_mode == SmartFadesMode.DISABLED
1084 else crossfade_buffer_size
1085 )
1086
1087 # ALWAYS APPEND CHUNK TO BUFFER
1088 buffer += chunk
1089 del chunk
1090 if len(buffer) < req_buffer_size:
1091 # buffer is not full enough, move on
1092 # yield control to event loop with 10ms delay
1093 await asyncio.sleep(0.01)
1094 continue
1095
1096 #### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
1097 if last_fadeout_part and last_streamdetails:
1098 # perform crossfade
1099 fadein_part = buffer[:crossfade_buffer_size]
1100 remaining_bytes = buffer[crossfade_buffer_size:]
1101 # Use the mixer to handle all crossfade logic
1102 crossfade_part = await self._smart_fades_mixer.mix(
1103 fade_in_part=fadein_part,
1104 fade_out_part=last_fadeout_part,
1105 fade_in_streamdetails=queue_track.streamdetails,
1106 fade_out_streamdetails=last_streamdetails,
1107 pcm_format=pcm_format,
1108 standard_crossfade_duration=standard_crossfade_duration,
1109 mode=smart_fades_mode,
1110 )
1111 # because the crossfade exists of both the fadein and fadeout part
1112 # we need to correct the bytes_written accordingly so the duration
1113 # calculations at the end of the track are correct
1114 crossfade_part_len = len(crossfade_part)
1115 bytes_written += int(crossfade_part_len / 2)
1116 if last_play_log_entry:
1117 assert last_play_log_entry.seconds_streamed is not None
1118 last_play_log_entry.seconds_streamed += (
1119 crossfade_part_len / 2 / pcm_sample_size
1120 )
1121 # yield crossfade_part (in pcm_sample_size chunks)
1122 for _chunk in divide_chunks(crossfade_part, pcm_sample_size):
1123 yield _chunk
1124 del _chunk
1125 del crossfade_part
1126 # also write the leftover bytes from the crossfade action
1127 if remaining_bytes:
1128 yield remaining_bytes
1129 bytes_written += len(remaining_bytes)
1130 del remaining_bytes
1131 # clear vars
1132 last_fadeout_part = b""
1133 last_streamdetails = None
1134 buffer = b""
1135
1136 #### OTHER: enough data in buffer, feed to output
1137 while len(buffer) > req_buffer_size:
1138 yield buffer[:pcm_sample_size]
1139 bytes_written += pcm_sample_size
1140 buffer = buffer[pcm_sample_size:]
1141
1142 #### HANDLE END OF TRACK
1143 if last_fadeout_part:
1144 # edge case: we did not get enough data to make the crossfade
1145 for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
1146 yield _chunk
1147 del _chunk
1148 bytes_written += len(last_fadeout_part)
1149 last_fadeout_part = b""
1150 crossfade_allowed = self._crossfade_allowed(
1151 queue_track, smart_fades_mode=smart_fades_mode, flow_mode=True
1152 )
1153 if crossfade_allowed:
1154 # if crossfade is enabled, save fadeout part to pickup for next track
1155 last_fadeout_part = buffer[-crossfade_buffer_size:]
1156 last_streamdetails = queue_track.streamdetails
1157 last_play_log_entry = play_log_entry
1158 remaining_bytes = buffer[:-crossfade_buffer_size]
1159 if remaining_bytes:
1160 yield remaining_bytes
1161 bytes_written += len(remaining_bytes)
1162 del remaining_bytes
1163 elif smart_fades_mode != SmartFadesMode.DISABLED:
1164 self.logger.debug(
1165 "Flow mode: crossfade NOT allowed for track %s on queue %s"
1166 " - fadeout part not saved",
1167 queue_track.name,
1168 queue.display_name,
1169 )
1170 if not crossfade_allowed and buffer:
1171 # no crossfade enabled, just yield the buffer last part
1172 bytes_written += len(buffer)
1173 for _chunk in divide_chunks(buffer, pcm_sample_size):
1174 yield _chunk
1175 del _chunk
1176 # make sure the buffer gets cleaned up
1177 del buffer
1178
1179 # update duration details based on the actual pcm data we sent
1180 # this also accounts for crossfade and silence stripping
1181 seconds_streamed = bytes_written / pcm_sample_size
1182 queue_track.streamdetails.seconds_streamed = seconds_streamed
1183 queue_track.streamdetails.duration = int(
1184 queue_track.streamdetails.seek_position + seconds_streamed
1185 )
1186 play_log_entry.seconds_streamed = seconds_streamed
1187 play_log_entry.duration = queue_track.streamdetails.duration
1188 total_bytes_sent += bytes_written
1189 self.logger.debug(
1190 "Finished Streaming queue track: %s (%s) on queue %s",
1191 queue_track.streamdetails.uri,
1192 queue_track.name,
1193 queue.display_name,
1194 )
1195 #### HANDLE END OF QUEUE FLOW STREAM
1196 # end of queue flow: make sure we yield the last_fadeout_part
1197 if last_fadeout_part:
1198 for _chunk in divide_chunks(last_fadeout_part, pcm_sample_size):
1199 yield _chunk
1200 del _chunk
1201 # correct seconds streamed/duration
1202 last_part_seconds = len(last_fadeout_part) / pcm_sample_size
1203 streamdetails = queue_track.streamdetails
1204 assert streamdetails is not None
1205 streamdetails.seconds_streamed = (
1206 streamdetails.seconds_streamed or 0
1207 ) + last_part_seconds
1208 streamdetails.duration = int((streamdetails.duration or 0) + last_part_seconds)
1209 last_fadeout_part = b""
1210 total_bytes_sent += bytes_written
1211 self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name)
1212
1213 async def get_announcement_stream(
1214 self,
1215 announcement_url: str,
1216 output_format: AudioFormat,
1217 pre_announce: bool | str = False,
1218 pre_announce_url: str = ANNOUNCE_ALERT_FILE,
1219 ) -> AsyncGenerator[bytes, None]:
1220 """Get the special announcement stream."""
1221 announcement_data: asyncio.Queue[bytes | None] = asyncio.Queue(10)
1222 # we are doing announcement in PCM first to avoid multiple encodings
1223 # when mixing pre-announce and announcement
1224 # also we have to deal with some TTS sources being super slow in delivering audio
1225 # so we take an approach where we start fetching the announcement in the background
1226 # while we can already start playing the pre-announce sound (if any)
1227
1228 pcm_format = (
1229 output_format
1230 if output_format.content_type.is_pcm()
1231 else AudioFormat(
1232 sample_rate=output_format.sample_rate,
1233 content_type=ContentType.PCM_S16LE,
1234 bit_depth=16,
1235 channels=output_format.channels,
1236 )
1237 )
1238
1239 async def fetch_announcement() -> None:
1240 fmt = announcement_url.rsplit(".")[-1]
1241 async for chunk in get_ffmpeg_stream(
1242 audio_input=announcement_url,
1243 input_format=AudioFormat(content_type=ContentType.try_parse(fmt)),
1244 output_format=pcm_format,
1245 chunk_size=get_chunksize(pcm_format, 1),
1246 ):
1247 await announcement_data.put(chunk)
1248 await announcement_data.put(None) # signal end of stream
1249
1250 self.mass.create_task(fetch_announcement())
1251
1252 async def _announcement_stream() -> AsyncGenerator[bytes, None]:
1253 """Generate the PCM audio stream for the announcement + optional pre-announce."""
1254 if pre_announce:
1255 async for chunk in get_ffmpeg_stream(
1256 audio_input=pre_announce_url,
1257 input_format=AudioFormat(content_type=ContentType.try_parse(pre_announce_url)),
1258 output_format=pcm_format,
1259 chunk_size=get_chunksize(pcm_format, 1),
1260 ):
1261 yield chunk
1262 # pad silence while we're waiting for the announcement to be ready
1263 while announcement_data.empty():
1264 yield b"\0" * int(
1265 pcm_format.sample_rate * (pcm_format.bit_depth / 8) * pcm_format.channels * 0.1
1266 )
1267 await asyncio.sleep(0.1)
1268 # stream announcement
1269 while True:
1270 announcement_chunk = await announcement_data.get()
1271 if announcement_chunk is None:
1272 break
1273 yield announcement_chunk
1274
1275 if output_format == pcm_format:
1276 # no need to re-encode, just yield the raw PCM stream
1277 async for chunk in _announcement_stream():
1278 yield chunk
1279 return
1280
1281 # stream final announcement in requested output format
1282 async for chunk in get_ffmpeg_stream(
1283 audio_input=_announcement_stream(),
1284 input_format=pcm_format,
1285 output_format=output_format,
1286 ):
1287 yield chunk
1288
1289 async def get_plugin_source_stream(
1290 self,
1291 plugin_source_id: str,
1292 output_format: AudioFormat,
1293 player_id: str,
1294 player_filter_params: list[str] | None = None,
1295 ) -> AsyncGenerator[bytes, None]:
1296 """Get the special plugin source stream."""
1297 plugin_prov = cast("PluginProvider", self.mass.get_provider(plugin_source_id))
1298 if not plugin_prov:
1299 raise ProviderUnavailableError(f"Unknown PluginSource: {plugin_source_id}")
1300
1301 plugin_source = plugin_prov.get_source()
1302 self.logger.debug(
1303 "Start streaming PluginSource %s to %s using output format %s",
1304 plugin_source_id,
1305 player_id,
1306 output_format,
1307 )
1308 # this should already be set by the player controller, but just to be sure
1309 plugin_source.in_use_by = player_id
1310
1311 try:
1312 async for chunk in get_ffmpeg_stream(
1313 audio_input=cast(
1314 "str | AsyncGenerator[bytes, None]",
1315 plugin_prov.get_audio_stream(player_id)
1316 if plugin_source.stream_type == StreamType.CUSTOM
1317 else plugin_source.path,
1318 ),
1319 input_format=plugin_source.audio_format,
1320 output_format=output_format,
1321 filter_params=player_filter_params,
1322 extra_input_args=["-y", "-re"],
1323 ):
1324 if plugin_source.in_use_by != player_id:
1325 # another player took over or the stream ended, stop streaming
1326 break
1327 yield chunk
1328 finally:
1329 self.logger.debug(
1330 "Finished streaming PluginSource %s to %s", plugin_source_id, player_id
1331 )
1332 await asyncio.sleep(1) # prevent race conditions when selecting source
1333 if plugin_source.in_use_by == player_id:
1334 # release control
1335 plugin_source.in_use_by = None
1336
1337 async def get_queue_item_stream(
1338 self,
1339 queue_item: QueueItem,
1340 pcm_format: AudioFormat,
1341 seek_position: int = 0,
1342 raise_on_error: bool = True,
1343 ) -> AsyncGenerator[bytes, None]:
1344 """Get the (PCM) audio stream for a single queue item."""
1345 # collect all arguments for ffmpeg
1346 streamdetails = queue_item.streamdetails
1347 assert streamdetails
1348 filter_params: list[str] = []
1349
1350 # handle volume normalization
1351 gain_correct: float | None = None
1352 if streamdetails.volume_normalization_mode == VolumeNormalizationMode.DYNAMIC:
1353 # volume normalization using loudnorm filter (in dynamic mode)
1354 # which also collects the measurement on the fly during playback
1355 # more info: https://k.ylo.ph/2016/04/04/loudnorm.html
1356 filter_rule = f"loudnorm=I={streamdetails.target_loudness}:TP=-2.0:LRA=10.0:offset=0.0"
1357 filter_rule += ":print_format=json"
1358 filter_params.append(filter_rule)
1359 elif streamdetails.volume_normalization_mode == VolumeNormalizationMode.FIXED_GAIN:
1360 # apply user defined fixed volume/gain correction
1361 config_key = (
1362 CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS
1363 if streamdetails.media_type == MediaType.TRACK
1364 else CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO
1365 )
1366 gain_value = await self.mass.config.get_core_config_value(
1367 self.domain, config_key, default=0.0, return_type=float
1368 )
1369 gain_correct = round(gain_value, 2)
1370 filter_params.append(f"volume={gain_correct}dB")
1371 elif streamdetails.volume_normalization_mode == VolumeNormalizationMode.MEASUREMENT_ONLY:
1372 # volume normalization with known loudness measurement
1373 # apply volume/gain correction
1374 target_loudness = (
1375 float(streamdetails.target_loudness)
1376 if streamdetails.target_loudness is not None
1377 else 0.0
1378 )
1379 if streamdetails.prefer_album_loudness and streamdetails.loudness_album is not None:
1380 gain_correct = target_loudness - float(streamdetails.loudness_album)
1381 elif streamdetails.loudness is not None:
1382 gain_correct = target_loudness - float(streamdetails.loudness)
1383 else:
1384 gain_correct = 0.0
1385 gain_correct = round(gain_correct, 2)
1386 filter_params.append(f"volume={gain_correct}dB")
1387 streamdetails.volume_normalization_gain_correct = gain_correct
1388
1389 allow_buffer = bool(
1390 self.mass.config.get_raw_core_config_value(
1391 self.domain, CONF_ALLOW_BUFFER, CONF_ALLOW_BUFFER_DEFAULT
1392 )
1393 and streamdetails.duration
1394 )
1395
1396 self.logger.debug(
1397 "Starting queue item stream for %s (%s)"
1398 " - using buffer: %s"
1399 " - using fade-in: %s"
1400 " - using volume normalization: %s",
1401 queue_item.name,
1402 streamdetails.uri,
1403 allow_buffer,
1404 streamdetails.fade_in,
1405 streamdetails.volume_normalization_mode,
1406 )
1407 if allow_buffer:
1408 media_stream_gen = get_buffered_media_stream(
1409 self.mass,
1410 streamdetails=streamdetails,
1411 pcm_format=pcm_format,
1412 seek_position=int(seek_position),
1413 filter_params=filter_params,
1414 )
1415 else:
1416 media_stream_gen = get_media_stream(
1417 self.mass,
1418 streamdetails=streamdetails,
1419 pcm_format=pcm_format,
1420 seek_position=int(seek_position),
1421 filter_params=filter_params,
1422 )
1423
1424 first_chunk_received = False
1425 fade_in_buffer = b""
1426 bytes_received = 0
1427 finished = False
1428 stream_started_at = asyncio.get_event_loop().time()
1429 try:
1430 async for chunk in media_stream_gen:
1431 bytes_received += len(chunk)
1432 if not first_chunk_received:
1433 first_chunk_received = True
1434 self.logger.debug(
1435 "First audio chunk received for %s (%s) after %.2f seconds",
1436 queue_item.name,
1437 streamdetails.uri,
1438 asyncio.get_event_loop().time() - stream_started_at,
1439 )
1440 # handle optional fade-in
1441 if streamdetails.fade_in:
1442 if len(fade_in_buffer) < pcm_format.pcm_sample_size * 4:
1443 fade_in_buffer += chunk
1444 elif fade_in_buffer:
1445 async for fade_chunk in get_ffmpeg_stream(
1446 # NOTE: get_ffmpeg_stream signature says str | AsyncGenerator
1447 # but FFMpeg class actually accepts bytes too. This works at
1448 # runtime but needs type: ignore for mypy.
1449 audio_input=fade_in_buffer + chunk, # type: ignore[arg-type]
1450 input_format=pcm_format,
1451 output_format=pcm_format,
1452 filter_params=["afade=type=in:start_time=0:duration=3"],
1453 ):
1454 yield fade_chunk
1455 fade_in_buffer = b""
1456 streamdetails.fade_in = False
1457 else:
1458 yield chunk
1459 # help garbage collection by explicitly deleting chunk
1460 del chunk
1461 finished = True
1462 except AudioError as err:
1463 streamdetails.stream_error = True
1464 queue_item.available = False
1465 if raise_on_error:
1466 raise
1467 # yes, we swallow the error here after logging it
1468 # so the outer stream can handle it gracefully
1469 self.logger.error(
1470 "AudioError while streaming queue item %s (%s): %s",
1471 queue_item.name,
1472 streamdetails.uri,
1473 err,
1474 )
1475 finally:
1476 # determine how many seconds we've streamed
1477 # for pcm output we can calculate this easily
1478 seconds_streamed = bytes_received / pcm_format.pcm_sample_size
1479 streamdetails.seconds_streamed = seconds_streamed
1480 self.logger.debug(
1481 "stream %s for %s in %.2f seconds - seconds streamed/buffered: %.2f",
1482 "aborted" if not finished else "finished",
1483 streamdetails.uri,
1484 asyncio.get_event_loop().time() - stream_started_at,
1485 seconds_streamed,
1486 )
1487 # report stream to provider
1488 if (finished or seconds_streamed >= 90) and (
1489 music_prov := self.mass.get_provider(streamdetails.provider)
1490 ):
1491 if TYPE_CHECKING: # avoid circular import
1492 assert isinstance(music_prov, MusicProvider)
1493 self.mass.create_task(music_prov.on_streamed(streamdetails))
1494
1495 @use_buffer(buffer_size=30, min_buffer_before_yield=2)
1496 async def get_queue_item_stream_with_smartfade(
1497 self,
1498 queue_item: QueueItem,
1499 pcm_format: AudioFormat,
1500 smart_fades_mode: SmartFadesMode = SmartFadesMode.SMART_CROSSFADE,
1501 standard_crossfade_duration: int = 10,
1502 ) -> AsyncGenerator[bytes, None]:
1503 """Get the audio stream for a single queue item with (smart) crossfade to the next item."""
1504 queue = self.mass.player_queues.get(queue_item.queue_id)
1505 if not queue:
1506 raise RuntimeError(f"Queue {queue_item.queue_id} not found")
1507
1508 streamdetails = queue_item.streamdetails
1509 assert streamdetails
1510 crossfade_data = self._crossfade_data.pop(queue.queue_id, None)
1511 self.logger.debug(
1512 "Crossfade data pop for queue %s (track: %s): %s",
1513 queue.display_name,
1514 queue_item.name,
1515 "found" if crossfade_data else "EMPTY - no crossfade data from previous track",
1516 )
1517
1518 if crossfade_data and streamdetails.seek_position > 0:
1519 # don't do crossfade when seeking into track
1520 crossfade_data = None
1521 if crossfade_data and (crossfade_data.queue_item_id != queue_item.queue_item_id):
1522 # edge case alert: the next item changed just while we were preloading/crossfading
1523 self.logger.warning(
1524 "Skipping crossfade data for queue %s - next item changed!", queue.display_name
1525 )
1526 crossfade_data = None
1527
1528 self.logger.debug(
1529 "Start Streaming queue track: %s (%s) for queue %s "
1530 "- crossfade mode: %s "
1531 "- crossfading from previous track: %s ",
1532 queue_item.streamdetails.uri if queue_item.streamdetails else "Unknown URI",
1533 queue_item.name,
1534 queue.display_name,
1535 smart_fades_mode,
1536 "true" if crossfade_data else "false",
1537 )
1538
1539 buffer = b""
1540 bytes_written = 0
1541 # calculate crossfade buffer size
1542 crossfade_buffer_duration = (
1543 SMART_CROSSFADE_DURATION
1544 if smart_fades_mode == SmartFadesMode.SMART_CROSSFADE
1545 else standard_crossfade_duration
1546 )
1547 crossfade_buffer_duration = min(
1548 crossfade_buffer_duration,
1549 int(streamdetails.duration / 2)
1550 if streamdetails.duration
1551 else crossfade_buffer_duration,
1552 )
1553 # Ensure crossfade buffer size is aligned to frame boundaries
1554 # Frame size = bytes_per_sample * channels
1555 bytes_per_sample = pcm_format.bit_depth // 8
1556 frame_size = bytes_per_sample * pcm_format.channels
1557 crossfade_buffer_size = int(pcm_format.pcm_sample_size * crossfade_buffer_duration)
1558 # Round down to nearest frame boundary
1559 crossfade_buffer_size = (crossfade_buffer_size // frame_size) * frame_size
1560 fade_out_data: bytes | None = None
1561
1562 if crossfade_data:
1563 # Calculate discard amount in seconds (format-independent)
1564 # Use fade_in_pcm_format because fade_in_size is in the next track's original format
1565 fade_in_duration_seconds = (
1566 crossfade_data.fade_in_size / crossfade_data.fade_in_pcm_format.pcm_sample_size
1567 )
1568 discard_seconds = int(fade_in_duration_seconds) - 1
1569 # Calculate discard amounts in CURRENT track's format
1570 discard_bytes = int(discard_seconds * pcm_format.pcm_sample_size)
1571 # Convert fade_in_size to current track's format for correct leftover calculation
1572 fade_in_size_in_current_format = int(
1573 fade_in_duration_seconds * pcm_format.pcm_sample_size
1574 )
1575 discard_leftover = fade_in_size_in_current_format - discard_bytes
1576 else:
1577 discard_seconds = streamdetails.seek_position
1578 discard_leftover = 0
1579 total_chunks_received = 0
1580 req_buffer_size = crossfade_buffer_size
1581 async for chunk in self.get_queue_item_stream(
1582 queue_item, pcm_format, seek_position=discard_seconds
1583 ):
1584 total_chunks_received += 1
1585 if discard_leftover:
1586 # discard leftover bytes from crossfade data
1587 chunk = chunk[discard_leftover:] # noqa: PLW2901
1588 discard_leftover = 0
1589
1590 if total_chunks_received < 10:
1591 # we want a stream to start as quickly as possible
1592 # so for the first 10 chunks we keep a very short buffer
1593 req_buffer_size = pcm_format.pcm_sample_size
1594 else:
1595 req_buffer_size = crossfade_buffer_size
1596
1597 # ALWAYS APPEND CHUNK TO BUFFER
1598 buffer += chunk
1599 del chunk
1600 if len(buffer) < req_buffer_size:
1601 # buffer is not full enough, move on
1602 continue
1603
1604 #### HANDLE CROSSFADE DATA FROM PREVIOUS TRACK
1605 if crossfade_data:
1606 # send the (second half of the) crossfade data
1607 if crossfade_data.pcm_format != pcm_format:
1608 # edge case: pcm format mismatch, we need to resample
1609 self.logger.debug(
1610 "Resampling crossfade data from %s to %s for queue %s",
1611 crossfade_data.pcm_format.sample_rate,
1612 pcm_format.sample_rate,
1613 queue.display_name,
1614 )
1615 resampled_data = await resample_pcm_audio(
1616 crossfade_data.data,
1617 crossfade_data.pcm_format,
1618 pcm_format,
1619 )
1620 if resampled_data:
1621 for _chunk in divide_chunks(resampled_data, pcm_format.pcm_sample_size):
1622 yield _chunk
1623 bytes_written += len(resampled_data)
1624 else:
1625 # Resampling failed, error already logged in resample_pcm_audio
1626 # Skip crossfade data entirely - stream continues without it
1627 self.logger.warning(
1628 "Skipping crossfade data for queue %s due to resampling failure",
1629 queue.display_name,
1630 )
1631 else:
1632 for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size):
1633 yield _chunk
1634 bytes_written += len(crossfade_data.data)
1635 # clear vars
1636 crossfade_data = None
1637
1638 #### OTHER: enough data in buffer, feed to output
1639 while len(buffer) > req_buffer_size:
1640 yield buffer[: pcm_format.pcm_sample_size]
1641 bytes_written += pcm_format.pcm_sample_size
1642 buffer = buffer[pcm_format.pcm_sample_size :]
1643
1644 #### HANDLE END OF TRACK
1645
1646 if crossfade_data:
1647 # edge case: we did not get enough data to send the crossfade data
1648 # send the (second half of the) crossfade data
1649 if crossfade_data.pcm_format != pcm_format:
1650 # (yet another) edge case: pcm format mismatch, we need to resample
1651 self.logger.debug(
1652 "Resampling remaining crossfade data from %s to %s for queue %s",
1653 crossfade_data.pcm_format.sample_rate,
1654 pcm_format.sample_rate,
1655 queue.display_name,
1656 )
1657 resampled_crossfade_data = await resample_pcm_audio(
1658 crossfade_data.data,
1659 crossfade_data.pcm_format,
1660 pcm_format,
1661 )
1662 if resampled_crossfade_data:
1663 crossfade_data.data = resampled_crossfade_data
1664 else:
1665 # Resampling failed, error already logged in resample_pcm_audio
1666 # Skip the crossfade data entirely
1667 self.logger.warning(
1668 "Skipping remaining crossfade data for queue %s due to resampling failure",
1669 queue.display_name,
1670 )
1671 crossfade_data = None
1672 if crossfade_data:
1673 for _chunk in divide_chunks(crossfade_data.data, pcm_format.pcm_sample_size):
1674 yield _chunk
1675 bytes_written += len(crossfade_data.data)
1676 crossfade_data = None
1677
1678 # get next track for crossfade
1679 next_queue_item: QueueItem | None
1680 try:
1681 self.logger.debug(
1682 "Preloading NEXT track for crossfade for queue %s",
1683 queue.display_name,
1684 )
1685 next_queue_item = await self.mass.player_queues.load_next_queue_item(
1686 queue.queue_id, queue_item.queue_item_id
1687 )
1688 # set index_in_buffer to prevent our next track is overwritten while preloading
1689 if next_queue_item.streamdetails is None:
1690 raise InvalidDataError(
1691 f"No streamdetails for next queue item {next_queue_item.queue_item_id}"
1692 )
1693 queue.index_in_buffer = self.mass.player_queues.index_by_id(
1694 queue.queue_id, next_queue_item.queue_item_id
1695 )
1696 queue_player = self.mass.players.get_player(queue.queue_id)
1697 assert queue_player is not None
1698 next_queue_item_pcm_format = await self._select_pcm_format(
1699 player=queue_player,
1700 streamdetails=next_queue_item.streamdetails,
1701 smartfades_enabled=True,
1702 )
1703 except QueueEmpty:
1704 # end of queue reached, no next item
1705 next_queue_item = None
1706
1707 crossfade_allowed = bool(next_queue_item) and self._crossfade_allowed(
1708 queue_item,
1709 smart_fades_mode=smart_fades_mode,
1710 flow_mode=False,
1711 next_queue_item=next_queue_item,
1712 sample_rate=pcm_format.sample_rate,
1713 next_sample_rate=next_queue_item_pcm_format.sample_rate,
1714 )
1715 if not crossfade_allowed:
1716 if not next_queue_item:
1717 self.logger.debug(
1718 "Enqueue mode: no next queue item loaded (QueueEmpty) for queue %s",
1719 queue.display_name,
1720 )
1721 else:
1722 self.logger.debug(
1723 "Enqueue mode: crossfade NOT allowed for track %s -> %s on queue %s",
1724 queue_item.name,
1725 next_queue_item.name,
1726 queue.display_name,
1727 )
1728 # no crossfade enabled/allowed, just yield the buffer last part
1729 bytes_written += len(buffer)
1730 for _chunk in divide_chunks(buffer, pcm_format.pcm_sample_size):
1731 yield _chunk
1732 else:
1733 # if crossfade is enabled, save fadeout part in buffer to pickup for next track
1734 assert next_queue_item is not None
1735 fade_out_data = buffer
1736 buffer = b""
1737 try:
1738 async for chunk in self.get_queue_item_stream(
1739 next_queue_item, next_queue_item_pcm_format
1740 ):
1741 # append to buffer until we reach crossfade size
1742 # we only need the first X seconds of the NEXT track so we can
1743 # perform the crossfade.
1744 # the crossfaded audio of the previous and next track will be
1745 # sent in two equal parts: first half now, second half
1746 # when the next track starts. We use CrossfadeData to store
1747 # the second half to be picked up by the next track's stream generator.
1748 # Note that we more or less expect the user to have enabled the in-memory
1749 # buffer so we can keep the next track's audio data in memory.
1750 buffer += chunk
1751 del chunk
1752 if len(buffer) >= crossfade_buffer_size:
1753 break
1754 #### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
1755 # Store original buffer size before any resampling for fade_in_size calculation
1756 # This size is in the next track's original format which is what we need
1757 original_buffer_size = len(buffer)
1758 if next_queue_item_pcm_format != pcm_format:
1759 # edge case: pcm format mismatch, we need to resample the next track's
1760 # beginning part before crossfading
1761 self.logger.debug(
1762 "Resampling next track's crossfade from %s to %s for queue %s",
1763 next_queue_item_pcm_format.sample_rate,
1764 pcm_format.sample_rate,
1765 queue.display_name,
1766 )
1767 buffer = await resample_pcm_audio(
1768 buffer,
1769 next_queue_item_pcm_format,
1770 pcm_format,
1771 )
1772 # perform actual (smart fades) crossfade using mixer
1773 crossfade_bytes = await self._smart_fades_mixer.mix(
1774 fade_in_part=buffer,
1775 fade_out_part=fade_out_data,
1776 fade_in_streamdetails=cast("StreamDetails", next_queue_item.streamdetails),
1777 fade_out_streamdetails=streamdetails,
1778 pcm_format=pcm_format,
1779 standard_crossfade_duration=standard_crossfade_duration,
1780 mode=smart_fades_mode,
1781 )
1782 # send half of the crossfade_part (= approx the fadeout part)
1783 split_point = (len(crossfade_bytes) + 1) // 2
1784 crossfade_first = crossfade_bytes[:split_point]
1785 crossfade_second = crossfade_bytes[split_point:]
1786 del crossfade_bytes
1787 bytes_written += len(crossfade_first)
1788 for _chunk in divide_chunks(crossfade_first, pcm_format.pcm_sample_size):
1789 yield _chunk
1790 # store the other half for the next track
1791 # IMPORTANT: crossfade_second data is in CURRENT track's format (pcm_format)
1792 # because it was created from the resampled buffer used for mixing.
1793 # BUT fade_in_size represents bytes in NEXT track's original format
1794 # (next_queue_item_pcm_format) because that's how much of the next track
1795 # was consumed during the crossfade. We need both formats to correctly
1796 # handle the crossfade data when the next track starts.
1797 self._crossfade_data[queue_item.queue_id] = CrossfadeData(
1798 data=crossfade_second,
1799 fade_in_size=original_buffer_size,
1800 pcm_format=pcm_format, # Format of the data (current track)
1801 fade_in_pcm_format=next_queue_item_pcm_format, # Format for fade_in_size
1802 queue_item_id=next_queue_item.queue_item_id,
1803 )
1804 self.logger.debug(
1805 "Crossfade data STORED for queue %s (outgoing track: %s, incoming track: %s)",
1806 queue.display_name,
1807 queue_item.name,
1808 next_queue_item.name if next_queue_item else "N/A",
1809 )
1810 except AudioError:
1811 # no crossfade possible, just yield the fade_out_data
1812 next_queue_item = None
1813 yield fade_out_data
1814 bytes_written += len(fade_out_data)
1815 del fade_out_data
1816 # make sure the buffer gets cleaned up
1817 del buffer
1818 # update duration details based on the actual pcm data we sent
1819 # this also accounts for crossfade and silence stripping
1820 seconds_streamed = bytes_written / pcm_format.pcm_sample_size
1821 streamdetails.seconds_streamed = seconds_streamed
1822 streamdetails.duration = int(streamdetails.seek_position + seconds_streamed)
1823 self.logger.debug(
1824 "Finished Streaming queue track: %s (%s) on queue %s "
1825 "- crossfade data prepared for next track: %s",
1826 streamdetails.uri,
1827 queue_item.name,
1828 queue.display_name,
1829 next_queue_item.name if next_queue_item else "N/A",
1830 )
1831
1832 def _log_request(self, request: web.Request) -> None:
1833 """Log request."""
1834 if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
1835 self.logger.log(
1836 VERBOSE_LOG_LEVEL,
1837 "Got %s request to %s from %s\nheaders: %s\n",
1838 request.method,
1839 request.path,
1840 request.remote,
1841 request.headers,
1842 )
1843 else:
1844 self.logger.debug(
1845 "Got %s request to %s from %s",
1846 request.method,
1847 request.path,
1848 request.remote,
1849 )
1850
1851 async def get_output_format(
1852 self,
1853 output_format_str: str,
1854 player: Player,
1855 content_sample_rate: int,
1856 content_bit_depth: int,
1857 ) -> AudioFormat:
1858 """Parse (player specific) output format details for given format string."""
1859 content_type: ContentType = ContentType.try_parse(output_format_str)
1860 supported_rates_conf = cast(
1861 "list[tuple[str, str]]",
1862 await self.mass.config.get_player_config_value(
1863 player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
1864 ),
1865 )
1866 output_channels_str = self.mass.config.get_raw_player_config_value(
1867 player.player_id, CONF_OUTPUT_CHANNELS, "stereo"
1868 )
1869 supported_sample_rates = tuple(int(x[0]) for x in supported_rates_conf)
1870 supported_bit_depths = tuple(int(x[1]) for x in supported_rates_conf)
1871
1872 player_max_bit_depth = max(supported_bit_depths)
1873 output_bit_depth = min(content_bit_depth, player_max_bit_depth)
1874 if content_sample_rate in supported_sample_rates:
1875 output_sample_rate = content_sample_rate
1876 else:
1877 output_sample_rate = max(supported_sample_rates)
1878
1879 if not content_type.is_lossless():
1880 # no point in having a higher bit depth for lossy formats
1881 output_bit_depth = 16
1882 output_sample_rate = min(48000, output_sample_rate)
1883 if output_format_str == "pcm":
1884 content_type = ContentType.from_bit_depth(output_bit_depth)
1885 return AudioFormat(
1886 content_type=content_type,
1887 sample_rate=output_sample_rate,
1888 bit_depth=output_bit_depth,
1889 channels=1 if output_channels_str != "stereo" else 2,
1890 )
1891
1892 async def _select_flow_format(
1893 self,
1894 player: Player,
1895 ) -> AudioFormat:
1896 """Parse (player specific) flow stream PCM format."""
1897 supported_rates_conf = cast(
1898 "list[tuple[str, str]]",
1899 await self.mass.config.get_player_config_value(
1900 player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
1901 ),
1902 )
1903 supported_sample_rates = tuple(int(x[0]) for x in supported_rates_conf)
1904 output_sample_rate = INTERNAL_PCM_FORMAT.sample_rate
1905 for sample_rate in (192000, 96000, 48000, 44100):
1906 if sample_rate in supported_sample_rates:
1907 output_sample_rate = sample_rate
1908 break
1909 return AudioFormat(
1910 content_type=INTERNAL_PCM_FORMAT.content_type,
1911 sample_rate=output_sample_rate,
1912 bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
1913 channels=2,
1914 )
1915
1916 async def _select_pcm_format(
1917 self,
1918 player: Player,
1919 streamdetails: StreamDetails,
1920 smartfades_enabled: bool,
1921 ) -> AudioFormat:
1922 """Parse (player specific) stream internal PCM format."""
1923 supported_rates_conf = cast(
1924 "list[tuple[str, str]]",
1925 await self.mass.config.get_player_config_value(
1926 player.player_id, CONF_SAMPLE_RATES, unpack_splitted_values=True
1927 ),
1928 )
1929 supported_sample_rates = tuple(int(x[0]) for x in supported_rates_conf)
1930 # use highest supported rate within content rate
1931 output_sample_rate = max(
1932 (r for r in supported_sample_rates if r <= streamdetails.audio_format.sample_rate),
1933 default=48000, # sane/safe default
1934 )
1935 # work out pcm format based on streamdetails
1936 pcm_format = AudioFormat(
1937 sample_rate=output_sample_rate,
1938 # always use f32 internally for extra headroom for filters etc
1939 content_type=INTERNAL_PCM_FORMAT.content_type,
1940 bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
1941 channels=streamdetails.audio_format.channels,
1942 )
1943 if smartfades_enabled:
1944 pcm_format.channels = 2 # force stereo for crossfading
1945
1946 return pcm_format
1947
1948 def _crossfade_allowed(
1949 self,
1950 queue_item: QueueItem,
1951 smart_fades_mode: SmartFadesMode,
1952 flow_mode: bool = False,
1953 next_queue_item: QueueItem | None = None,
1954 sample_rate: int | None = None,
1955 next_sample_rate: int | None = None,
1956 ) -> bool:
1957 """Get the crossfade config for a queue item."""
1958 if smart_fades_mode == SmartFadesMode.DISABLED:
1959 return False
1960 if not (self.mass.players.get_player(queue_item.queue_id)):
1961 return False # just a guard
1962 if queue_item.media_type != MediaType.TRACK:
1963 self.logger.debug("Skipping crossfade: current item is not a track")
1964 return False
1965 # check if the next item is part of the same album
1966 next_item = next_queue_item or self.mass.player_queues.get_next_item(
1967 queue_item.queue_id, queue_item.queue_item_id
1968 )
1969 if not next_item:
1970 # there is no next item!
1971 self.logger.debug(
1972 "Crossfade not allowed: no next item found for %s (flow_mode=%s, queue_id=%s)",
1973 queue_item.name,
1974 flow_mode,
1975 queue_item.queue_id,
1976 )
1977 return False
1978 # check if next item is a track
1979 if next_item.media_type != MediaType.TRACK:
1980 self.logger.debug("Skipping crossfade: next item is not a track")
1981 return False
1982 if (
1983 isinstance(queue_item.media_item, Track)
1984 and isinstance(next_item.media_item, Track)
1985 and queue_item.media_item.album
1986 and next_item.media_item.album
1987 and queue_item.media_item.album == next_item.media_item.album
1988 and not self.mass.config.get_raw_core_config_value(
1989 self.domain, CONF_ALLOW_CROSSFADE_SAME_ALBUM, False
1990 )
1991 ):
1992 # in general, crossfade is not desired for tracks of the same (gapless) album
1993 # because we have no accurate way to determine if the album is gapless or not,
1994 # for now we just never crossfade between tracks of the same album
1995 self.logger.debug("Skipping crossfade: next item is part of the same album")
1996 return False
1997
1998 # check if we're allowed to crossfade on different sample rates
1999 if (
2000 not flow_mode
2001 and sample_rate
2002 and next_sample_rate
2003 and sample_rate != next_sample_rate
2004 and not self.mass.config.get_raw_player_config_value(
2005 queue_item.queue_id,
2006 CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES.key,
2007 CONF_ENTRY_SUPPORT_GAPLESS_DIFFERENT_SAMPLE_RATES.default_value,
2008 )
2009 ):
2010 self.logger.debug(
2011 "Skipping crossfade: player does not support gapless playback "
2012 "with different sample rates (%s vs %s)",
2013 sample_rate,
2014 next_sample_rate,
2015 )
2016 return False
2017
2018 return True
2019
2020 async def _periodic_garbage_collection(self) -> None:
2021 """Periodic garbage collection to free up memory from audio buffers and streams."""
2022 self.logger.log(
2023 VERBOSE_LOG_LEVEL,
2024 "Running periodic garbage collection...",
2025 )
2026 # Run garbage collection in executor to avoid blocking the event loop
2027 # Since this runs periodically (not in response to subprocess cleanup),
2028 # it's safe to run in a thread without causing thread-safety issues
2029 loop = asyncio.get_running_loop()
2030 collected = await loop.run_in_executor(None, gc.collect)
2031 self.logger.log(
2032 VERBOSE_LOG_LEVEL,
2033 "Garbage collection completed, collected %d objects",
2034 collected,
2035 )
2036 # Schedule next run in 15 minutes
2037 self.mass.call_later(900, self._periodic_garbage_collection)
2038
2039 def _setup_smart_fades_logger(self, config: CoreConfig) -> None:
2040 """Set up smart fades logger level."""
2041 log_level = str(config.get_value(CONF_SMART_FADES_LOG_LEVEL))
2042 if log_level == "GLOBAL":
2043 self.smart_fades_analyzer.logger.setLevel(self.logger.level)
2044 self.smart_fades_mixer.logger.setLevel(self.logger.level)
2045 else:
2046 self.smart_fades_analyzer.logger.setLevel(log_level)
2047 self.smart_fades_mixer.logger.setLevel(log_level)
2048