/
/
/
1"""Various helpers for audio streaming and manipulation."""
2
3from __future__ import annotations
4
5import asyncio
6import logging
7import os
8import re
9import struct
10import time
11from collections.abc import AsyncGenerator
12from functools import partial
13from io import BytesIO
14from typing import TYPE_CHECKING, Final, cast
15
16import aiofiles
17import shortuuid
18from aiohttp import ClientTimeout
19from music_assistant_models.dsp import DSPConfig, DSPDetails, DSPState
20from music_assistant_models.enums import (
21 ContentType,
22 MediaType,
23 PlayerFeature,
24 PlayerType,
25 StreamType,
26 VolumeNormalizationMode,
27)
28from music_assistant_models.errors import (
29 AudioError,
30 InvalidDataError,
31 MediaNotFoundError,
32 MusicAssistantError,
33 ProviderUnavailableError,
34)
35from music_assistant_models.media_items import AudioFormat
36from music_assistant_models.streamdetails import MultiPartPath
37
38from music_assistant.constants import (
39 CONF_ENTRY_OUTPUT_LIMITER,
40 CONF_ENTRY_VOLUME_NORMALIZATION_TARGET,
41 CONF_OUTPUT_CHANNELS,
42 CONF_VOLUME_NORMALIZATION,
43 CONF_VOLUME_NORMALIZATION_RADIO,
44 CONF_VOLUME_NORMALIZATION_TARGET,
45 CONF_VOLUME_NORMALIZATION_TRACKS,
46 MASS_LOGGER_NAME,
47 VERBOSE_LOG_LEVEL,
48)
49from music_assistant.controllers.players.sync_groups import SyncGroupPlayer
50from music_assistant.helpers.json import JSON_DECODE_EXCEPTIONS, json_loads
51from music_assistant.helpers.throttle_retry import BYPASS_THROTTLER
52from music_assistant.helpers.util import clean_stream_title, remove_file
53
54from .audio_buffer import AudioBuffer
55from .dsp import filter_to_ffmpeg_params
56from .ffmpeg import FFMpeg, get_ffmpeg_args, get_ffmpeg_stream
57from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
58from .process import AsyncProcess, communicate
59from .util import detect_charset
60
61if TYPE_CHECKING:
62 from music_assistant_models.config_entries import CoreConfig, PlayerConfig
63 from music_assistant_models.queue_item import QueueItem
64 from music_assistant_models.streamdetails import StreamDetails
65
66 from music_assistant.mass import MusicAssistant
67 from music_assistant.models.music_provider import MusicProvider
68 from music_assistant.models.player import Player
69
# Module-level logger, namespaced below the Music Assistant root logger name.
LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.audio")

# ruff: noqa: PLR0915

# Default headers sent with outgoing HTTP requests for streams.
HTTP_HEADERS = {"User-Agent": "Lavf/60.16.100.MusicAssistant"}
# Same headers plus "Icy-MetaData: 1"; used for the radio stream requests in this module.
HTTP_HEADERS_ICY = {**HTTP_HEADERS, "Icy-MetaData": "1"}

# NOTE(review): not referenced in the visible part of this module - presumably
# provider domains that are slow to start streaming; confirm against the usages.
SLOW_PROVIDERS = ("tidal", "ytmusic", "apple_music")

# Cache category/provider under which resolve_radio_stream stores resolved radio URLs.
CACHE_CATEGORY_RESOLVED_RADIO_URL: Final[int] = 100
CACHE_PROVIDER: Final[str] = "audio"
81
82
def align_audio_to_frame_boundary(audio_data: bytes, pcm_format: AudioFormat) -> bytes:
    """Drop a trailing partial frame so the PCM data ends on a frame boundary.

    :param audio_data: Raw PCM audio data to align.
    :param pcm_format: AudioFormat providing the bit depth and channel count of the data.
    :return: The input itself when it is already aligned, otherwise a truncated copy.
    """
    # one frame holds a single sample for every channel
    frame_size = (pcm_format.bit_depth // 8) * pcm_format.channels
    surplus = len(audio_data) % frame_size
    if not surplus:
        return audio_data
    LOGGER.debug(
        "Truncating %d bytes from audio buffer to align to frame boundary",
        surplus,
    )
    return audio_data[: len(audio_data) - surplus]
99
100
async def strip_silence(
    mass: MusicAssistant,  # noqa: ARG001
    audio_data: bytes,
    pcm_format: AudioFormat,
    reverse: bool = False,
) -> bytes:
    """Strip silence from begin or end of pcm audio using ffmpeg.

    :param mass: MusicAssistant instance (unused).
    :param audio_data: Raw PCM audio to process.
    :param pcm_format: AudioFormat of the given audio (also used for the output).
    :param reverse: Strip from the end of the audio instead of the beginning.
    :return: The stripped PCM audio, or the unmodified input when there is
        nothing to process or ffmpeg exits with an error.
    """
    if not audio_data:
        # nothing to strip, no need to spawn a process
        return audio_data

    pcm_container = pcm_format.content_type.value
    # input args
    args = [
        "ffmpeg",
        "-hide_banner",
        "-loglevel",
        "quiet",
        "-acodec",
        pcm_format.content_type.name.lower(),
        "-f",
        pcm_container,
        "-ac",
        str(pcm_format.channels),
        "-ar",
        str(pcm_format.sample_rate),
        "-i",
        "-",
    ]
    # filter args
    silence_filter = (
        "atrim=start=0.2,"
        "silenceremove=start_periods=1:start_silence=0.1:start_threshold=0.02"
    )
    if reverse:
        # reverse the audio, strip the (now leading) silence and reverse it back
        silence_filter = f"areverse,{silence_filter},areverse"
    args += ["-af", silence_filter]
    # output args
    args += ["-f", pcm_container, "-"]
    returncode, stripped_data, _stderr = await communicate(args, audio_data)

    if returncode != 0:
        # a failed ffmpeg run would otherwise silently replace the audio with
        # whatever (usually empty) output it produced before failing
        LOGGER.warning(
            "ffmpeg failed to strip silence (exit code %s), using unmodified audio",
            returncode,
        )
        return audio_data

    # return stripped audio
    bytes_stripped = len(audio_data) - len(stripped_data)
    if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL):
        seconds_stripped = round(bytes_stripped / pcm_format.pcm_sample_size, 2)
        location = "end" if reverse else "begin"
        LOGGER.log(
            VERBOSE_LOG_LEVEL,
            "stripped %s seconds of silence from %s of pcm audio. bytes stripped: %s",
            seconds_stripped,
            location,
            bytes_stripped,
        )
    return stripped_data
149
150
def get_player_dsp_details(
    mass: MusicAssistant, player: Player, group_preventing_dsp: bool = False
) -> DSPDetails:
    """Return the DSP details of a single player.

    Group membership of the queried player is not resolved here: the caller must
    pass the result of is_grouping_preventing_dsp of the leader/PlayerGroup as the
    group_preventing_dsp argument when the player is part of a group.

    :param mass: MusicAssistant instance.
    :param player: The player to build the DSP details for.
    :param group_preventing_dsp: Whether the group the player belongs to prevents DSP.
    """
    dsp_config = mass.config.get_player_dsp_config(player.player_id)
    if not dsp_config.enabled:
        # DSP is disabled by the user, remove all filters
        dsp_state = DSPState.DISABLED
        dsp_config = DSPConfig(enabled=False)
    elif group_preventing_dsp or is_grouping_preventing_dsp(player):
        # enabled by the user, but the current grouping does not allow it
        dsp_state = DSPState.DISABLED_BY_UNSUPPORTED_GROUP
        dsp_config = DSPConfig(enabled=False)
    else:
        dsp_state = DSPState.ENABLED

    # only enabled filters are reported
    dsp_config.filters = [dsp_filter for dsp_filter in dsp_config.filters if dsp_filter.enabled]

    limiter_enabled = is_output_limiter_enabled(mass, player)
    return DSPDetails(
        state=dsp_state,
        input_gain=dsp_config.input_gain,
        filters=dsp_config.filters,
        output_gain=dsp_config.output_gain,
        output_limiter=limiter_enabled,
        output_format=player.extra_data.get("output_format", None),
    )
183
184
def get_stream_dsp_details(
    mass: MusicAssistant,
    queue_id: str,
) -> dict[str, DSPDetails]:
    """Collect the DSP details of every player playing this queue, keyed by player_id.

    :param mass: MusicAssistant instance.
    :param queue_id: Id of the queue; also used to look up the (leading) player.
    """
    player = mass.players.get(queue_id)
    assert player is not None  # for type checking
    dsp: dict[str, DSPDetails] = {}
    group_preventing_dsp = is_grouping_preventing_dsp(player)
    output_format = None
    is_external_group = False

    if player.type == PlayerType.GROUP and isinstance(player, SyncGroupPlayer):
        # A sync group is not a real player itself, so it gets no entry of its own;
        # when the grouping prevents DSP, the output format comes from its sync leader.
        if group_preventing_dsp and (sync_leader := player.sync_leader):
            output_format = sync_leader.extra_data.get("output_format", None)
    else:
        # We only add real players (so skip the PlayerGroups as they only sync containing players)
        dsp[player.player_id] = get_player_dsp_details(mass, player)
        if group_preventing_dsp:
            # The leader is responsible for sending the (combined) audio stream, so get
            # the output format from the leader.
            output_format = player.extra_data.get("output_format", None)
        is_external_group = player.type in (PlayerType.GROUP, PlayerType.STEREO_PAIR)

    # We don't enumerate all group members in case this group is externally created
    # (e.g. a Chromecast group from the Google Home app)
    if not (player and player.group_members) or is_external_group:
        return dsp

    # grouped playback, get DSP details for each player in the group
    for child_id in player.group_members:
        if child_id in dsp:
            # already have the details (this is the group leader)
            continue
        child_player = mass.players.get(child_id)
        if not child_player:
            continue
        child_details = get_player_dsp_details(
            mass, child_player, group_preventing_dsp=group_preventing_dsp
        )
        dsp[child_id] = child_details
        if group_preventing_dsp:
            # Use the correct format from the group leader, since
            # this player is part of a group that does not support
            # multi device DSP processing.
            child_details.output_format = output_format
    return dsp
229
230
async def get_stream_details(
    mass: MusicAssistant,
    queue_item: QueueItem,
    seek_position: int = 0,
    fade_in: bool = False,
    prefer_album_loudness: bool = False,
) -> StreamDetails:
    """
    Get streamdetails for the given QueueItem.

    This is called just-in-time when a PlayerQueue wants a MediaItem to be played.
    Do not try to request streamdetails too much in advance as this is expiring data.

    :param mass: MusicAssistant instance.
    :param queue_item: The QueueItem to get (or refresh) the streamdetails for.
    :param seek_position: Position to start from; reset to 0 for radio and
        duration-less items.
    :param fade_in: Stored as-is on the returned streamdetails.
    :param prefer_album_loudness: Stored as-is on the returned streamdetails.
    :raises MediaNotFoundError: If no streamdetails could be retrieved for the item.
    """
    streamdetails: StreamDetails | None = None
    time_start = time.time()
    LOGGER.debug("Getting streamdetails for %s", queue_item.uri)
    if seek_position and (queue_item.media_type == MediaType.RADIO or not queue_item.duration):
        LOGGER.warning("seeking is not possible on duration-less streams!")
        seek_position = 0

    if not queue_item.media_item and not queue_item.streamdetails:
        # in case of a non-media item queue item, the streamdetails should already be provided
        # this should not happen, but guard it just in case
        raise MediaNotFoundError(
            f"Unable to retrieve streamdetails for {queue_item.name} ({queue_item.uri})"
        )
    buffer: AudioBuffer | None = None
    # NOTE(review): is_valid() is called as is_valid(checksum, seek_position) in
    # get_buffered_media_stream but with seek_position as the only positional
    # argument here - confirm the argument is interpreted as intended.
    if queue_item.streamdetails and (
        (queue_item.streamdetails.created_at + queue_item.streamdetails.expiration) > time.time()
        or ((buffer := queue_item.streamdetails.buffer) and buffer.is_valid(seek_position))
    ):
        # already got a fresh/unused (or unexpired) streamdetails
        streamdetails = queue_item.streamdetails
    else:
        # need to (re)create streamdetails
        # retrieve streamdetails from provider

        media_item = queue_item.media_item
        assert media_item is not None  # for type checking
        preferred_providers: list[str] = []
        if (
            (queue := mass.player_queues.get(queue_item.queue_id))
            and queue.userid
            and (playback_user := await mass.webserver.auth.get_user(queue.userid))
            and playback_user.provider_filter
        ):
            # handle steering into user preferred providerinstance
            preferred_providers = playback_user.provider_filter
        else:
            preferred_providers = [x.provider_instance for x in media_item.provider_mappings]
        # first pass: preferred providers only, second pass: any provider
        for allow_other_provider in (False, True):
            # sort by quality and check item's availability
            for prov_media in sorted(
                media_item.provider_mappings, key=lambda x: x.quality or 0, reverse=True
            ):
                if not prov_media.available:
                    LOGGER.debug("Skipping unavailable %s", prov_media)
                    continue
                if (
                    not allow_other_provider
                    and prov_media.provider_instance not in preferred_providers
                ):
                    continue
                # guard that provider is available
                music_prov = mass.get_provider(prov_media.provider_instance)
                if TYPE_CHECKING:  # avoid circular import
                    assert isinstance(music_prov, MusicProvider)
                if not music_prov:
                    LOGGER.debug("Skipping %s - provider not available", prov_media)
                    continue  # provider not available ?
                # get streamdetails from provider
                try:
                    BYPASS_THROTTLER.set(True)
                    streamdetails = await music_prov.get_stream_details(
                        prov_media.item_id, media_item.media_type
                    )
                except MusicAssistantError as err:
                    LOGGER.warning(str(err))
                else:
                    break
                finally:
                    BYPASS_THROTTLER.set(False)
            if streamdetails:
                # The break above only leaves the inner loop; without this check the
                # fallback pass would fetch the streamdetails a second time and could
                # replace the result of a preferred provider with a non-preferred one.
                break

        if not streamdetails:
            msg = f"Unable to retrieve streamdetails for {queue_item.name} ({queue_item.uri})"
            raise MediaNotFoundError(msg)

        # work out how to handle radio stream
        if (
            streamdetails.stream_type in (StreamType.ICY, StreamType.HLS, StreamType.HTTP)
            and streamdetails.media_type == MediaType.RADIO
            and isinstance(streamdetails.path, str)
        ):
            resolved_url, stream_type = await resolve_radio_stream(mass, streamdetails.path)
            streamdetails.path = resolved_url
            streamdetails.stream_type = stream_type
            # Set up metadata monitoring callback for HLS radio streams
            if stream_type == StreamType.HLS:
                streamdetails.stream_metadata_update_callback = partial(
                    _update_hls_radio_metadata, mass
                )
                streamdetails.stream_metadata_update_interval = 5
        # handle volume normalization details
        if result := await mass.music.get_loudness(
            streamdetails.item_id,
            streamdetails.provider,
            media_type=queue_item.media_type,
        ):
            streamdetails.loudness = result[0]
            streamdetails.loudness_album = result[1]

    # set queue_id on the streamdetails so we know what is being streamed
    streamdetails.queue_id = queue_item.queue_id
    # handle skip/fade_in details
    streamdetails.seek_position = seek_position
    streamdetails.fade_in = fade_in
    if not streamdetails.duration:
        streamdetails.duration = queue_item.duration
    streamdetails.prefer_album_loudness = prefer_album_loudness
    player_settings = await mass.config.get_player_config(streamdetails.queue_id)
    core_config = await mass.config.get_core_config("streams")
    conf_volume_normalization_target = float(
        str(player_settings.get_value(CONF_VOLUME_NORMALIZATION_TARGET, -17))
    )
    # guard against invalid volume normalization values
    # range and default_value are guaranteed to be set for this constant
    volume_range = CONF_ENTRY_VOLUME_NORMALIZATION_TARGET.range
    assert volume_range is not None
    if (
        conf_volume_normalization_target < volume_range[0]
        or conf_volume_normalization_target >= volume_range[1]
    ):
        default_val = CONF_ENTRY_VOLUME_NORMALIZATION_TARGET.default_value
        assert isinstance(default_val, (int, float))
        conf_volume_normalization_target = float(default_val)
        LOGGER.warning(
            "Invalid volume normalization target configured for player %s, "
            "resetting to default of %s dB",
            streamdetails.queue_id,
            CONF_ENTRY_VOLUME_NORMALIZATION_TARGET.default_value,
        )
    streamdetails.target_loudness = conf_volume_normalization_target
    streamdetails.volume_normalization_mode = _get_normalization_mode(
        core_config, player_settings, streamdetails
    )

    # attach the DSP details of all group members
    streamdetails.dsp = get_stream_dsp_details(mass, streamdetails.queue_id)

    LOGGER.debug(
        "retrieved streamdetails for %s in %s milliseconds",
        queue_item.uri,
        int((time.time() - time_start) * 1000),
    )
    return streamdetails
386
387
async def get_buffered_media_stream(
    mass: MusicAssistant,
    streamdetails: StreamDetails,
    pcm_format: AudioFormat,
    seek_position: int = 0,
    filter_params: list[str] | None = None,
) -> AsyncGenerator[bytes, None]:
    """Yield the audio for the given streamdetails as raw PCM, served from an AudioBuffer.

    A still valid buffer attached to the streamdetails is reused, otherwise a new
    buffer is created and filled by a background task. When no valid buffer exists
    and the seek position is beyond 60, the audio is streamed directly (unbuffered).

    :param mass: MusicAssistant instance.
    :param streamdetails: StreamDetails of the media to stream.
    :param pcm_format: Requested PCM output format.
    :param seek_position: Position to start yielding audio from.
    :param filter_params: Optional ffmpeg filter params (also determines the checksum).
    """
    LOGGER.log(
        VERBOSE_LOG_LEVEL,
        "buffered_media_stream: Starting for %s (seek: %s)",
        streamdetails.uri,
        seek_position,
    )

    # the buffer checksum is derived from the filter params
    checksum = f"{filter_params}"

    async def fill_buffer_task() -> None:
        """Producer: pump the complete media stream (from the start) into the buffer."""
        chunks_buffered = 0
        outcome = "running"
        try:
            async for pcm_chunk in get_media_stream(
                mass, streamdetails, pcm_format, seek_position=0, filter_params=filter_params
            ):
                chunks_buffered += 1
                await audio_buffer.put(pcm_chunk)
                # Yield to event loop to prevent blocking warnings
                await asyncio.sleep(0)
            # EOF is only set when the stream completed successfully
            await audio_buffer.set_eof()
        except asyncio.CancelledError:
            outcome = "cancelled"
            raise
        except Exception:
            outcome = "aborted with error"
            raise
        finally:
            LOGGER.log(
                VERBOSE_LOG_LEVEL,
                "fill_buffer_task: %s (%s chunks) for %s",
                outcome,
                chunks_buffered,
                streamdetails.uri,
            )

    # check for existing buffer and reuse if possible
    existing_buffer: AudioBuffer | None = streamdetails.buffer
    if existing_buffer is not None and existing_buffer.is_valid(checksum, seek_position):
        LOGGER.debug(
            "buffered_media_stream: Reusing existing buffer for %s - "
            "available: %ss, seek: %s, discarded: %s",
            streamdetails.uri,
            existing_buffer.seconds_available,
            seek_position,
            existing_buffer._discarded_chunks,
        )
        audio_buffer = existing_buffer
    elif existing_buffer is not None:
        LOGGER.log(
            VERBOSE_LOG_LEVEL,
            "buffered_media_stream: Existing buffer invalid for %s (seek: %s, discarded: %s)",
            streamdetails.uri,
            seek_position,
            existing_buffer._discarded_chunks,
        )
        # throw away the unusable buffer
        await existing_buffer.clear()
        streamdetails.buffer = None
        existing_buffer = None

    if not existing_buffer:
        if seek_position > 60:
            # If seeking into the track and no valid buffer exists,
            # just start a normal stream without buffering,
            # otherwise we would need to fill the buffer up to the seek position first
            # which is not efficient.
            LOGGER.debug(
                "buffered_media_stream: No existing buffer and seek >60s for %s, "
                "starting normal (unbuffered) stream",
                streamdetails.uri,
            )
            async for direct_chunk in get_media_stream(
                mass,
                streamdetails,
                pcm_format,
                seek_position=seek_position,
                filter_params=filter_params,
            ):
                yield direct_chunk
            return

        # create new audio buffer and start fill task
        LOGGER.debug(
            "buffered_media_stream: Creating new buffer for %s",
            streamdetails.uri,
        )
        audio_buffer = AudioBuffer(pcm_format, checksum)
        streamdetails.buffer = audio_buffer
        producer_task = mass.loop.create_task(fill_buffer_task())
        audio_buffer.attach_producer_task(producer_task)

    # special case: pcm format mismatch, resample on the fly
    # this may happen in some special situations such as crossfading
    # and its a bit of a waste to throw away the existing buffer
    if audio_buffer.pcm_format != pcm_format:
        LOGGER.info(
            "buffered_media_stream: pcm format mismatch, resampling on the fly for %s - "
            "buffer format: %s - requested format: %s",
            streamdetails.uri,
            audio_buffer.pcm_format,
            pcm_format,
        )
        async for resampled_chunk in get_ffmpeg_stream(
            audio_input=audio_buffer.iter(seek_position=seek_position),
            input_format=audio_buffer.pcm_format,
            output_format=pcm_format,
        ):
            yield resampled_chunk
        return

    # yield data from the buffer
    chunks_yielded = 0
    try:
        async for buffered_chunk in audio_buffer.iter(seek_position=seek_position):
            chunks_yielded += 1
            yield buffered_chunk
    finally:
        LOGGER.log(
            VERBOSE_LOG_LEVEL,
            "buffered_media_stream: Completed, yielded %s chunks",
            chunks_yielded,
        )
522
523
async def get_media_stream(
    mass: MusicAssistant,
    streamdetails: StreamDetails,
    pcm_format: AudioFormat,
    seek_position: int = 0,
    filter_params: list[str] | None = None,
) -> AsyncGenerator[bytes, None]:
    """Get audio stream for given media details as raw PCM.

    The audio source is selected based on the stream type of the streamdetails and
    decoded through ffmpeg to the requested PCM format.

    :param mass: MusicAssistant instance.
    :param streamdetails: StreamDetails of the media to stream.
    :param pcm_format: Requested PCM output format.
    :param seek_position: Position to start the stream from (passed to the provider
        or to ffmpeg as ``-ss``, depending on the stream type).
    :param filter_params: Optional ffmpeg filter params.
    :raises AudioError: If ffmpeg fails or no audio was received at all.
    """
    logger = LOGGER.getChild("media_stream")
    logger.log(VERBOSE_LOG_LEVEL, "Starting media stream for %s", streamdetails.uri)
    # Work on a copy: the args appended below (-ss, -stream_loop, -decryption_key)
    # must not end up in the list stored on the streamdetails, otherwise they would
    # accumulate every time these streamdetails are streamed again.
    extra_input_args = list(streamdetails.extra_input_args or [])

    # work out audio source for these streamdetails
    audio_source: str | AsyncGenerator[bytes, None]
    stream_type = streamdetails.stream_type
    if stream_type == StreamType.CUSTOM:
        music_prov = mass.get_provider(streamdetails.provider)
        if TYPE_CHECKING:  # avoid circular import
            assert isinstance(music_prov, MusicProvider)
        audio_source = music_prov.get_audio_stream(
            streamdetails,
            seek_position=seek_position if streamdetails.can_seek else 0,
        )
        # seeking is either done by the provider or left to ffmpeg below
        seek_position = 0 if streamdetails.can_seek else seek_position
    elif stream_type == StreamType.ICY:
        assert isinstance(streamdetails.path, str)  # for type checking
        audio_source = get_icy_radio_stream(mass, streamdetails.path, streamdetails)
        seek_position = 0  # seeking not possible on radio streams
    elif stream_type == StreamType.HLS:
        assert isinstance(streamdetails.path, str)  # for type checking
        substream = await get_hls_substream(mass, streamdetails.path)
        audio_source = substream.path
        if streamdetails.media_type == MediaType.RADIO:
            # HLS streams (especially the BBC) struggle when they're played directly
            # with ffmpeg, where they just stop after some minutes,
            # so we tell ffmpeg to loop around in this case.
            extra_input_args += ["-stream_loop", "-1", "-re"]
    else:
        # all other stream types (HTTP, FILE, etc)
        if stream_type == StreamType.ENCRYPTED_HTTP:
            assert streamdetails.decryption_key is not None  # for type checking
            extra_input_args += ["-decryption_key", streamdetails.decryption_key]
        if isinstance(streamdetails.path, list):
            # multi part stream
            audio_source = get_multi_file_stream(mass, streamdetails, seek_position)
            seek_position = 0  # handled by get_multi_file_stream
        else:
            # regular single file/url stream
            assert isinstance(streamdetails.path, str)  # for type checking
            audio_source = streamdetails.path

    # handle seek support
    if seek_position and streamdetails.duration and streamdetails.allow_seek:
        extra_input_args += ["-ss", str(int(seek_position))]

    bytes_sent = 0
    finished = False
    cancelled = False
    first_chunk_received = False
    ffmpeg_proc = FFMpeg(
        audio_input=audio_source,
        input_format=streamdetails.audio_format,
        output_format=pcm_format,
        filter_params=filter_params,
        extra_input_args=extra_input_args,
        collect_log_history=True,
        loglevel="debug" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "info",
    )

    try:
        await ffmpeg_proc.start()
        assert ffmpeg_proc.proc is not None  # for type checking
        logger.debug(
            "Started media stream for %s - using streamtype: %s - pcm format: %s - ffmpeg PID: %s",
            streamdetails.uri,
            streamdetails.stream_type,
            pcm_format.content_type.value,
            ffmpeg_proc.proc.pid,
        )
        stream_start = mass.loop.time()

        chunk_size = get_chunksize(pcm_format, 1)
        async for chunk in ffmpeg_proc.iter_chunked(chunk_size):
            if not first_chunk_received:
                # At this point ffmpeg has started and should now know the codec used
                # for encoding the audio.
                first_chunk_received = True
                streamdetails.audio_format.codec_type = ffmpeg_proc.input_format.codec_type
                logger.debug(
                    "First chunk received after %.2f seconds (codec detected: %s)",
                    mass.loop.time() - stream_start,
                    ffmpeg_proc.input_format.codec_type,
                )
            yield chunk
            bytes_sent += len(chunk)

        # end of audio/track reached
        logger.log(VERBOSE_LOG_LEVEL, "End of stream reached.")
        # wait until stderr also completed reading
        await ffmpeg_proc.wait_with_timeout(5)
        if ffmpeg_proc.returncode not in (0, None):
            log_trail = "\n".join(list(ffmpeg_proc.log_history)[-5:])
            raise AudioError(f"FFMpeg exited with code {ffmpeg_proc.returncode}: {log_trail}")
        if bytes_sent == 0:
            # edge case: no audio data was received at all
            raise AudioError("No audio was received")
        finished = True
    except (Exception, GeneratorExit, asyncio.CancelledError) as err:
        if isinstance(err, asyncio.CancelledError | GeneratorExit):
            # we were cancelled, just raise
            cancelled = True
            raise
        # dump the last 10 lines of the log in case of an unclean exit
        logger.warning("\n".join(list(ffmpeg_proc.log_history)[-10:]))
        raise AudioError(f"Error while streaming: {err}") from err
    finally:
        # always ensure close is called which also handles all cleanup
        await ffmpeg_proc.close()
        # determine how many seconds we've received
        # for pcm output we can calculate this easily
        seconds_received = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
        # store accurate duration
        if finished and not seek_position and seconds_received:
            streamdetails.duration = int(seconds_received)

        logger.log(
            VERBOSE_LOG_LEVEL,
            "stream %s (with code %s) for %s",
            "cancelled" if cancelled else "finished" if finished else "aborted",
            ffmpeg_proc.returncode,
            streamdetails.uri,
        )

        # parse loudnorm data if we have that collected (and enabled)
        if (
            (streamdetails.loudness is None or finished)
            and streamdetails.volume_normalization_mode == VolumeNormalizationMode.DYNAMIC
            and (finished or (seconds_received >= 300))
        ):
            # if dynamic volume normalization is enabled
            # the loudnorm filter will output the measurement in the log,
            # so we can use that directly instead of analyzing the audio
            logger.log(VERBOSE_LOG_LEVEL, "Collecting loudness measurement...")
            if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)):
                logger.debug(
                    "Loudness measurement for %s: %s dB",
                    streamdetails.uri,
                    loudness_details,
                )
                mass.create_task(
                    mass.music.set_loudness(
                        streamdetails.item_id,
                        streamdetails.provider,
                        loudness_details,
                        media_type=streamdetails.media_type,
                    )
                )
681
682
def create_wave_header(
    samplerate: int = 44100, channels: int = 2, bitspersample: int = 16, duration: int | None = None
) -> bytes:
    """Generate a (44 byte) RIFF/WAVE header for PCM audio from given params.

    :param samplerate: Sample rate in Hz (44100, 48000, etc.).
    :param channels: Number of audio channels.
    :param bitspersample: Bit depth of a single sample.
    :param duration: Duration of the audio in seconds, used to calculate the data
        size. When omitted, a fixed large data size is written. Sizes that do not
        fit in the 32-bit size fields of the header are clamped to the maximum
        that fits (aligned to whole frames) instead of raising ``struct.error``.
    :return: The header bytes (RIFF header + "fmt " chunk + "data" chunk header).
    """
    bytes_per_sample = bitspersample / 8
    block_align = int(channels * bytes_per_sample)

    # Generate format chunk
    format_chunk_spec = b"<4sLHHLLHH"
    format_chunk = struct.pack(
        format_chunk_spec,
        b"fmt ",  # Chunk id
        16,  # Size of this chunk (excluding chunk id and this field)
        1,  # Audio format, 1 for PCM
        channels,  # Number of channels
        int(samplerate),  # Samplerate, 44100, 48000, etc.
        int(samplerate * channels * bytes_per_sample),  # Byterate
        block_align,  # Blockalign
        bitspersample,  # 16 bits for two byte samples, etc.
    )

    # Generate data chunk
    data_chunk_spec = b"<4sL"
    # everything the RIFF size field covers besides the audio data itself:
    # the "WAVE" string, the format chunk and the data chunk header
    overhead = 4 + struct.calcsize(format_chunk_spec) + struct.calcsize(data_chunk_spec)
    max_datasize = 0xFFFFFFFF - overhead
    if duration is None:
        # use max value possible
        datasize = 4254768000  # = 6,7 hours at 44100/16
    else:
        # calculate from duration
        numsamples = samplerate * duration
        datasize = max(0, int(numsamples * channels * bytes_per_sample))
        if datasize > max_datasize:
            # the size fields are unsigned 32 bit: clamp to the largest
            # frame-aligned size the header can express
            datasize = max_datasize
            if block_align:
                datasize -= datasize % block_align
    data_chunk = struct.pack(
        data_chunk_spec,
        b"data",  # Chunk id
        int(datasize),  # Chunk size (excluding chunk id and this field)
    )

    # Generate main header
    all_chunks_size = overhead + datasize
    main_header = struct.pack(b"<4sL4s", b"RIFF", all_chunks_size, b"WAVE")
    return b"".join((main_header, format_chunk, data_chunk))
736
737
async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, StreamType]:
    """
    Resolve a streaming radio URL.

    Follows playlist URLs to the actual stream and detects whether the stream
    offers ICY metadata or is a HLS stream. Successful results are cached.

    :param mass: MusicAssistant instance.
    :param url: The radio (or playlist) URL to resolve.
    :return: Tuple of the unfolded URL (string) and the StreamType to use.
    """
    cached = await mass.cache.get(
        key=url, provider=CACHE_PROVIDER, category=CACHE_CATEGORY_RESOLVED_RADIO_URL
    )
    if cached:
        if TYPE_CHECKING:  # for type checking
            cached = cast("tuple[str, str]", cached)
        return (cached[0], StreamType(cached[1]))

    stream_type = StreamType.HTTP
    resolved_url = url
    timeout = ClientTimeout(total=None, connect=10, sock_read=5)
    try:
        async with mass.http_session_no_ssl.get(
            url, headers=HTTP_HEADERS_ICY, allow_redirects=True, timeout=timeout
        ) as resp:
            headers = resp.headers
            resp.raise_for_status()
            if not resp.headers:
                raise InvalidDataError("no headers found")
            if headers.get("icy-metaint") is not None:
                stream_type = StreamType.ICY
            content_type = headers.get("content-type", "")
            is_playlist = (
                url.endswith((".m3u", ".m3u8", ".pls"))
                or any(marker in url for marker in (".m3u?", ".m3u8?", ".pls?"))
                or "audio/x-mpegurl" in content_type
                or "audio/x-scpls" in content_type
            )
            if is_playlist:
                # url is playlist, we need to unfold it
                try:
                    substreams = await fetch_playlist(mass, url)
                    if not any(x for x in substreams if x.length):
                        # none of the entries has a length: unfold first url of playlist
                        first_entry = next((item for item in substreams if item.is_url), None)
                        if first_entry is None:
                            raise InvalidDataError("No content found in playlist")
                        return await resolve_radio_stream(mass, first_entry.path)
                except IsHLSPlaylist:
                    stream_type = StreamType.HLS

    except Exception as err:
        # best effort: fall back to the given url (this result is not cached)
        LOGGER.warning("Error while parsing radio URL %s: %s", url, str(err))
        return (url, stream_type)

    result = (resolved_url, stream_type)
    await mass.cache.set(
        url,
        result,
        expiration=3600 * 3,
        provider=CACHE_PROVIDER,
        category=CACHE_CATEGORY_RESOLVED_RADIO_URL,
    )
    return result
803
804
async def get_icy_radio_stream(
    mass: MusicAssistant, url: str, streamdetails: StreamDetails
) -> AsyncGenerator[bytes, None]:
    """Get (radio) audio stream from HTTP, including ICY metadata retrieval.

    Yields the audio blocks of the stream and reads the metadata block that follows
    each audio block; a changed StreamTitle is stored (cleaned) on the streamdetails.

    :param mass: MusicAssistant instance.
    :param url: URL of the radio stream (must send the ``icy-metaint`` header).
    :param streamdetails: StreamDetails whose ``stream_title`` gets updated.
    """
    timeout = ClientTimeout(total=None, connect=30, sock_read=5 * 60)
    # Match lazily up to the first "';" terminator instead of excluding quotes,
    # so titles containing an apostrophe (e.g. "Don't Stop") are matched too.
    stream_title_pattern = re.compile(rb"StreamTitle='(.*?)';")
    LOGGER.debug("Start streaming radio with ICY metadata from url %s", url)
    async with mass.http_session_no_ssl.get(
        url, allow_redirects=True, headers=HTTP_HEADERS_ICY, timeout=timeout
    ) as resp:
        headers = resp.headers
        # number of audio bytes between two metadata blocks
        meta_int = int(headers["icy-metaint"])
        while True:
            try:
                yield await resp.content.readexactly(meta_int)
                meta_byte = await resp.content.readexactly(1)
                if meta_byte == b"\x00":
                    # zero length: no metadata in this block
                    continue
                # the length byte holds the metadata size in units of 16 bytes
                meta_length = ord(meta_byte) * 16
                meta_data = await resp.content.readexactly(meta_length)
            except asyncio.exceptions.IncompleteReadError:
                # end of stream
                break
            if not meta_data:
                continue
            # the metadata block is padded with NUL bytes
            meta_data = meta_data.rstrip(b"\0")
            stream_title_re = stream_title_pattern.search(meta_data)
            if not stream_title_re:
                continue
            try:
                # in 99% of the cases the stream title is utf-8 encoded
                stream_title = stream_title_re.group(1).decode("utf-8")
            except UnicodeDecodeError:
                # fallback to iso-8859-1
                stream_title = stream_title_re.group(1).decode("iso-8859-1", errors="replace")
            cleaned_stream_title = clean_stream_title(stream_title)
            if cleaned_stream_title != streamdetails.stream_title:
                LOGGER.log(
                    VERBOSE_LOG_LEVEL,
                    "ICY Radio streamtitle original: %s",
                    stream_title,
                )
                LOGGER.log(
                    VERBOSE_LOG_LEVEL,
                    "ICY Radio streamtitle cleaned: %s",
                    cleaned_stream_title,
                )
                streamdetails.stream_title = cleaned_stream_title
851
852
def parse_extinf_metadata(extinf_line: str) -> dict[str, str]:
    """
    Parse metadata from HLS EXTINF line.

    Collects all key="value" attributes (like title="...", artist="...") found in
    the line. Keys are lowercased; when a key occurs more than once the last
    value wins. A value runs up to the next double quote.

    :param extinf_line: The EXTINF line containing metadata
    """
    return {
        attribute.group(1).lower(): attribute.group(2)
        for attribute in re.finditer(r'(\w+)="([^"]*)"', extinf_line)
    }
873
874
async def _update_hls_radio_metadata(
    mass: MusicAssistant,
    streamdetails: StreamDetails,
    elapsed_time: int,  # noqa: ARG001
) -> None:
    """
    Refresh the stream title of a HLS radio stream from its media playlist.

    Downloads the media playlist and takes the metadata of the last EXTINF line.
    All errors are logged and swallowed.

    :param mass: MusicAssistant instance
    :param streamdetails: StreamDetails object to update with metadata
    :param elapsed_time: Current playback position in seconds (unused for live radio)
    """
    try:
        # The media playlist url is resolved once and remembered in
        # streamdetails.data to avoid re-resolving it on every update
        if streamdetails.data is None:
            streamdetails.data = {}
        media_playlist_url = streamdetails.data.get("hls_media_playlist_url")
        if not media_playlist_url:
            try:
                assert isinstance(streamdetails.path, str)  # for type checking
                substream = await get_hls_substream(mass, streamdetails.path)
                media_playlist_url = substream.path
                streamdetails.data["hls_media_playlist_url"] = media_playlist_url
            except Exception as err:
                LOGGER.warning(
                    "Failed to resolve HLS substream for metadata monitoring: %s",
                    err,
                )
                return

        # Fetch the media playlist
        timeout = ClientTimeout(total=0, connect=10, sock_read=30)
        async with mass.http_session_no_ssl.get(media_playlist_url, timeout=timeout) as resp:
            resp.raise_for_status()
            playlist_content = await resp.text()

        # Only the most recent (last) EXTINF line is considered,
        # as that usually carries the current metadata
        playlist_lines = playlist_content.strip().split("\n")
        latest_extinf = next(
            (entry for entry in reversed(playlist_lines) if entry.startswith("#EXTINF:")),
            None,
        )
        if latest_extinf is None:
            return

        metadata = parse_extinf_metadata(latest_extinf)
        title = metadata.get("title", "")
        artist = metadata.get("artist", "")
        if not (title or artist):
            return

        # Format as "Artist - Title", or whichever of the two is available
        stream_title = f"{artist} - {title}" if artist and title else title or artist
        cleaned_title = clean_stream_title(stream_title)

        # Only update if changed (and not empty)
        if cleaned_title and cleaned_title != streamdetails.stream_title:
            LOGGER.log(
                VERBOSE_LOG_LEVEL,
                "HLS Radio metadata updated: %s",
                cleaned_title,
            )
            streamdetails.stream_title = cleaned_title

    except Exception as err:
        LOGGER.debug(
            "Error fetching HLS metadata: %s",
            err,
        )
955
956
async def get_hls_substream(
    mass: MusicAssistant,
    url: str,
) -> PlaylistItem:
    """
    Select the (highest quality) HLS substream for given HLS playlist/URL.

    :param mass: MusicAssistant instance, its `http_session_no_ssl` is used for the request.
    :param url: URL of the HLS (master) playlist.
    :return: A PlaylistItem pointing at `url` itself when the playlist already lists
        audio segments, otherwise the child playlist with the highest BANDWIDTH
        (or the first one when no stream info is available) with an absolute path.
    """
    from urllib.parse import urljoin

    timeout = ClientTimeout(total=None, connect=30, sock_read=5 * 60)
    # fetch master playlist and select (best) child playlist
    # https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming-19#section-10
    async with mass.http_session_no_ssl.get(
        url, allow_redirects=True, headers=HTTP_HEADERS, timeout=timeout
    ) as resp:
        resp.raise_for_status()
        raw_data = await resp.read()
        encoding = resp.charset or await detect_charset(raw_data)
        master_m3u_data = raw_data.decode(encoding)
    substreams = parse_m3u(master_m3u_data)
    # There is a chance that we did not get a master playlist with subplaylists
    # but just a single master/sub playlist with the actual audio stream(s)
    # so we need to detect if the playlist children contain audio streams or
    # sub-playlists.
    if any(
        (x.length or x.path.endswith((".mp4", ".aac")))
        and not x.path.endswith((".m3u", ".m3u8"))
        for x in substreams
    ):
        return PlaylistItem(path=url, key=substreams[0].key)
    # sort substreams on best quality (highest bandwidth) when available
    if any(x.stream_info for x in substreams):
        substreams.sort(
            key=lambda x: int(
                x.stream_info.get("BANDWIDTH", "0") if x.stream_info is not None else 0
            ),
            reverse=True,
        )
    substream = substreams[0]
    if not substream.path.startswith("http"):
        # Path is relative: resolve it against the playlist url following the regular
        # URL resolution rules. Plain string stitching on the last "/" breaks for
        # root-relative paths ("/live/low.m3u8"), for playlist urls that carry a "/"
        # in their query string and for playlist urls without a path component.
        substream.path = urljoin(url, substream.path)
    return substream
998
999
async def get_http_stream(
    mass: MusicAssistant,
    url: str,
    streamdetails: StreamDetails,
    seek_position: int = 0,
    verify_ssl: bool = True,
) -> AsyncGenerator[bytes, None]:
    """
    Get audio stream from HTTP.

    When a seek is requested, the byte offset is estimated from the file size and
    the duration and requested with a Range header. Seeking is silently dropped
    (the stream starts at the beginning) when the server does not advertise byte
    ranges or when the content type is unknown or an M4A/M4B container.

    :param mass: MusicAssistant instance providing the HTTP sessions.
    :param url: URL to stream from.
    :param streamdetails: StreamDetails of the item; `size` and `seek_position`
        may be updated by this function.
    :param seek_position: Position in seconds to start streaming from.
    :param verify_ssl: Use the SSL-verifying session (True) or the non-verifying one.
    :raises InvalidDataError: A seek was requested but the server did not answer
        with a partial (206) response.
    """
    LOGGER.debug("Start HTTP stream for %s (seek_position %s)", streamdetails.uri, seek_position)
    if seek_position:
        assert streamdetails.duration, "Duration required for seek requests"
    http_session = mass.http_session if verify_ssl else mass.http_session_no_ssl
    # try to get filesize with a head request
    seek_supported = streamdetails.can_seek
    if seek_position or not streamdetails.size:
        async with http_session.head(url, allow_redirects=True, headers=HTTP_HEADERS) as resp:
            resp.raise_for_status()
            if size := resp.headers.get("Content-Length"):
                streamdetails.size = int(size)
            seek_supported = resp.headers.get("Accept-Ranges") == "bytes"

    # seeking an unknown or container format is not supported due to the (moov) headers
    # NOTE: this check must run BEFORE the Range header is built, otherwise the
    # byte range would still be requested although the seek has been cancelled.
    if seek_position and (
        not seek_supported
        or streamdetails.audio_format.content_type
        in (
            ContentType.UNKNOWN,
            ContentType.M4A,
            ContentType.M4B,
        )
    ):
        LOGGER.warning(
            "Seeking in %s (%s) not possible.",
            streamdetails.uri,
            streamdetails.audio_format.output_format_str,
        )
        seek_position = 0
        streamdetails.seek_position = 0

    # headers
    headers = {**HTTP_HEADERS}
    timeout = ClientTimeout(total=None, connect=30, sock_read=5 * 60)
    if seek_position and streamdetails.size:
        assert streamdetails.duration is not None  # for type checking
        # estimate the byte offset assuming a constant amount of bytes per second
        skip_bytes = int(streamdetails.size / streamdetails.duration * seek_position)
        headers["Range"] = f"bytes={skip_bytes}-{streamdetails.size}"

    # start the streaming from http
    bytes_received = 0
    async with http_session.get(
        url, allow_redirects=True, headers=headers, timeout=timeout
    ) as resp:
        is_partial = resp.status == 206
        if seek_position and not is_partial:
            raise InvalidDataError("HTTP source does not support seeking!")
        resp.raise_for_status()
        async for chunk in resp.content.iter_any():
            bytes_received += len(chunk)
            yield chunk

    # store size on streamdetails for later use
    if not streamdetails.size:
        streamdetails.size = bytes_received
    LOGGER.debug(
        "Finished HTTP stream for %s (transferred %s/%s bytes)",
        streamdetails.uri,
        bytes_received,
        streamdetails.size,
    )
1069
1070
async def get_file_stream(
    mass: MusicAssistant,  # noqa: ARG001
    filename: str,
    streamdetails: StreamDetails,
    seek_position: int = 0,
) -> AsyncGenerator[bytes, None]:
    """
    Get audio stream from local accessible file.

    When a seek is requested, the byte offset is estimated from the file size and
    the duration. Seeking is silently dropped (the stream starts at the beginning)
    when the content type is unknown or an M4A/M4B/MP4 container.

    :param mass: MusicAssistant instance (unused).
    :param filename: Path of the file to stream.
    :param streamdetails: StreamDetails of the item; `size` and `seek_position`
        may be updated by this function.
    :param seek_position: Position in seconds to start streaming from.
    """
    if seek_position:
        assert streamdetails.duration, "Duration required for seek requests"
    if not streamdetails.size:
        stat = await asyncio.to_thread(os.stat, filename)
        streamdetails.size = stat.st_size

    # seeking an unknown or container format is not supported due to the (moov) headers
    if seek_position and (
        streamdetails.audio_format.content_type
        in (
            ContentType.UNKNOWN,
            ContentType.M4A,
            ContentType.M4B,
            ContentType.MP4,
        )
    ):
        LOGGER.warning(
            "Seeking in %s (%s) not possible.",
            streamdetails.uri,
            streamdetails.audio_format.output_format_str,
        )
        seek_position = 0
        streamdetails.seek_position = 0

    chunk_size = get_chunksize(streamdetails.audio_format)
    # Open the very same file the size was determined for: the file used to be
    # opened through streamdetails.data, which is not guaranteed to hold this path.
    async with aiofiles.open(filename, "rb") as _file:
        if seek_position:
            assert streamdetails.duration is not None  # for type checking
            # estimate the byte offset assuming a constant amount of bytes per second
            seek_pos = int((streamdetails.size / streamdetails.duration) * seek_position)
            await _file.seek(seek_pos)
        # yield chunks of data from file until it is exhausted
        while data := await _file.read(chunk_size):
            yield data
1114
1115
def _get_parts_from_position(
    parts: list[MultiPartPath], seek_position: int
) -> tuple[list[MultiPartPath], int]:
    """Work out which parts remain to be played for a position in the whole tracklist.

    Arguments:
        parts: The list of parts
        seek_position: The seeking position in seconds of the tracklist

    Returns:
        A tuple with the list of parts, starting with the one that contains the
        requested position, and the position in seconds to seek to within that
        first part. When a part without a known duration is encountered, the
        full list and the untouched seek position are returned.

    Raises:
        InvalidDataError: An entry of the list is not a MultiPartPath.
        IndexError: The position lies beyond the end of the last part.
    """
    elapsed = 0.0
    last_index = len(parts) - 1
    for index, current in enumerate(parts):
        if not isinstance(current, MultiPartPath):
            raise InvalidDataError("Multi-file streamdetails requires a list of MultiPartPath")
        length = current.duration
        if length is None:
            # without a duration there is no way to tell where the position lands
            return parts, seek_position
        if elapsed + length < seek_position:
            # the position is beyond this part, move on to the next one
            elapsed += length
            continue

        offset = seek_position - elapsed

        # Seeking in some parts is inaccurate, making the seek to a chapter land on the end of
        # the previous track. If we're within 2 second of the end, skip the current track
        if offset + 2 >= length:
            LOGGER.debug(
                f"Skipping to the next part due to seek position being at the end: {offset}"
            )
            if index < last_index:
                return parts[index + 1 :], 0
            # this is the last part, there is nothing to skip to

        return parts[index:], int(offset)

    raise IndexError(f"Could not find any candidate part for position {seek_position}")
1155
1156
async def get_multi_file_stream(
    mass: MusicAssistant,  # noqa: ARG001
    streamdetails: StreamDetails,
    seek_position: int = 0,
) -> AsyncGenerator[bytes, None]:
    """Return audio stream for a concatenation of multiple files.

    The remaining parts are written to a temporary ffmpeg concat list, which is
    streamed through ffmpeg (NUT container) and removed again afterwards.

    Arguments:
        mass: MusicAssistant instance (unused).
        streamdetails: StreamDetails whose path is a list of MultiPartPath.
        seek_position: The position to seek to in seconds

    Raises:
        InvalidDataError: streamdetails.path is not a list (of MultiPartPath).
    """
    if not isinstance(streamdetails.path, list):
        raise InvalidDataError("Multi-file streamdetails requires a list of MultiPartPath")
    parts, seek_position = _get_parts_from_position(streamdetails.path, seek_position)
    files_list = [part.path for part in parts]

    # concat input files
    temp_file = f"/tmp/{shortuuid.random(20)}.txt"  # noqa: S108
    async with aiofiles.open(temp_file, "w") as f:
        for path in files_list:
            # Within a single quoted string of the concat list a quote can not be
            # escaped: close the quoting, emit an escaped quote and reopen it.
            # Without this, any path containing an apostrophe breaks the list.
            quoted_path = path.replace("'", "'\\''")
            await f.write(f"file '{quoted_path}'\n")

    try:
        async for chunk in get_ffmpeg_stream(
            audio_input=temp_file,
            input_format=streamdetails.audio_format,
            output_format=AudioFormat(
                content_type=ContentType.NUT,
                sample_rate=streamdetails.audio_format.sample_rate,
                bit_depth=streamdetails.audio_format.bit_depth,
                channels=streamdetails.audio_format.channels,
            ),
            extra_input_args=[
                "-safe",
                "0",
                "-f",
                "concat",
                "-i",
                temp_file,
                "-ss",
                str(seek_position),
            ],
        ):
            yield chunk
    finally:
        await remove_file(temp_file)
1202
1203
async def get_preview_stream(
    mass: MusicAssistant,
    provider_instance_id_or_domain: str,
    item_id: str,
    media_type: MediaType = MediaType.TRACK,
) -> AsyncGenerator[bytes, None]:
    """
    Create a 30 seconds preview audioclip (AAC) for the given item.

    :param mass: MusicAssistant instance.
    :param provider_instance_id_or_domain: Provider to get the item from.
    :param item_id: Id of the item within that provider.
    :param media_type: Media type of the item.
    :raises ProviderUnavailableError: The provider could not be found.
    :raises MediaNotFoundError: The provider does not know the item.
    """
    provider = mass.get_provider(provider_instance_id_or_domain)
    if not provider:
        raise ProviderUnavailableError
    if TYPE_CHECKING:  # avoid circular import
        assert isinstance(provider, MusicProvider)

    # Validate that item_id corresponds to a valid item in the provider for security
    if not await provider.get_item(media_type, item_id):
        msg = f"Item {item_id} not found in provider {provider_instance_id_or_domain}"
        raise MediaNotFoundError(msg)

    streamdetails = await provider.get_stream_details(item_id, media_type)
    source_format = streamdetails.audio_format
    # intermediate raw PCM format, mirroring the properties of the source
    pcm_format = AudioFormat(
        content_type=ContentType.from_bit_depth(source_format.bit_depth),
        sample_rate=source_format.sample_rate,
        bit_depth=source_format.bit_depth,
        channels=source_format.channels,
    )
    pcm_source = get_media_stream(
        mass=mass,
        streamdetails=streamdetails,
        pcm_format=pcm_format,
    )
    preview = get_ffmpeg_stream(
        audio_input=pcm_source,
        input_format=pcm_format,
        output_format=AudioFormat(content_type=ContentType.AAC),
        extra_input_args=["-t", "30"],  # cut after 30 seconds
    )
    async for chunk in preview:
        yield chunk
1239
1240
async def get_silence(
    duration: int,
    output_format: AudioFormat,
) -> AsyncGenerator[bytes, None]:
    """
    Create stream of silence, encoded to format of choice.

    The silence is always generated for 2 channels, regardless of the channel
    count of the output format.

    :param duration: Length of the silence in (whole) seconds.
    :param output_format: The format the silence should be delivered in.
    """
    is_raw_pcm = output_format.content_type.is_pcm()
    if is_raw_pcm or output_format.content_type == ContentType.WAV:
        if not is_raw_pcm:
            # wav silence = wave header + zero's
            yield create_wave_header(
                samplerate=output_format.sample_rate,
                channels=2,
                bitspersample=output_format.bit_depth,
                duration=duration,
            )
        # pcm = just zeros, handed out in chunks of one second
        for _ in range(duration):
            yield b"\0" * int(output_format.sample_rate * (output_format.bit_depth / 8) * 2)
        return

    # use ffmpeg for all other encodings
    ffmpeg_cmd = ["ffmpeg", "-hide_banner"]
    ffmpeg_cmd += ["-loglevel", "quiet"]
    ffmpeg_cmd += ["-f", "lavfi"]
    ffmpeg_cmd += ["-i", f"anullsrc=r={output_format.sample_rate}:cl=stereo"]
    ffmpeg_cmd += ["-t", str(duration)]
    ffmpeg_cmd += ["-f", output_format.output_format_str]
    ffmpeg_cmd += ["-"]
    async with AsyncProcess(ffmpeg_cmd, stdout=True) as ffmpeg_proc:
        async for chunk in ffmpeg_proc.iter_chunked():
            yield chunk
1281
1282
async def resample_pcm_audio(
    input_audio: bytes,
    input_format: AudioFormat,
    output_format: AudioFormat,
) -> bytes:
    """
    Convert (a chunk of) PCM audio from one format into another with ffmpeg.

    :param input_audio: Raw PCM audio data to resample.
    :param input_format: AudioFormat of the input audio.
    :param output_format: Desired AudioFormat for the output audio.

    :return: Resampled audio data, frame-aligned. The input is returned untouched
        when both formats are equal. Returns empty bytes if resampling fails.
    """
    if input_format == output_format:
        # nothing to convert
        return input_audio
    LOGGER.log(VERBOSE_LOG_LEVEL, f"Resampling audio from {input_format} to {output_format}")
    try:
        command = get_ffmpeg_args(
            input_format=input_format, output_format=output_format, filter_params=[]
        )
        _, converted, errors = await communicate(command, input_audio)
        if converted:
            # Ensure frame alignment after resampling
            return align_audio_to_frame_boundary(converted, output_format)
        LOGGER.error(
            "Resampling failed: no output from ffmpeg. Input: %s, Output: %s, stderr: %s",
            input_format,
            output_format,
            errors.decode() if errors else "(no stderr)",
        )
    except Exception as err:
        LOGGER.exception(
            "Failed to resample audio from %s to %s: %s",
            input_format,
            output_format,
            err,
        )
    return b""
1323
1324
def get_chunksize(
    fmt: AudioFormat,
    seconds: float = 1,
) -> int:
    """
    Estimate the amount of bytes needed for a given duration of audio.

    :param fmt: The AudioFormat to estimate the size for.
    :param seconds: Duration of the audio in seconds.
    :return: The (estimated) size in bytes.
    """
    uncompressed = int(fmt.sample_rate * (fmt.bit_depth / 8) * fmt.channels * seconds)
    codec = fmt.content_type
    # raw pcm and the formats that get the same estimate as raw pcm
    if codec.is_pcm() or codec in (ContentType.WAV, ContentType.AIFF, ContentType.DSF):
        return uncompressed
    # a known bit rate is treated as kilobits per second
    # NOTE(review): values of 10000 and up are ignored, presumably as implausible - confirm
    if fmt.bit_rate and fmt.bit_rate < 10000:
        return int(((fmt.bit_rate * 1000) / 8) * seconds)
    if codec in (ContentType.FLAC, ContentType.WAVPACK, ContentType.ALAC):
        # assume 74.7% compression ratio (level 0)
        # source: https://z-issue.com/wp/flac-compression-level-comparison/
        return int(uncompressed * 0.747)
    # lossy (or unknown) codecs: assume 256 kbit/s for AAC/M4A, 320 kbit/s for the rest
    assumed_bits_per_second = (
        256000 if codec in (ContentType.AAC, ContentType.M4A) else 320000
    )
    return int((assumed_bits_per_second / 8) * seconds)
1346
1347
def is_grouping_preventing_dsp(player: Player) -> bool:
    """Tell whether grouping rules out DSP for this leader/PlayerGroup.

    When True is returned, no DSP should be applied to the player.
    Whether the Player is part of a group at all is NOT checked here, that is up to the caller.
    """
    # Non-leader cases are left to the caller on purpose, since player.synced_to
    # can be unreliable in some edge cases
    supports_multi_device_dsp = PlayerFeature.MULTI_DEVICE_DSP in player.supported_features
    member_count = len(player.group_members or ())

    if player.provider.domain == "player_group":
        # PlayerGroups have no leader: a single member means that the
        # group actually contains only a single player.
        required_members = 2
    elif player.type == PlayerType.GROUP:
        # A group player external to Music Assistant always counts as multiple devices.
        return not supports_multi_device_dsp
    else:
        # Regular player: any member (besides the player itself) means multiple devices.
        required_members = 1
    return member_count >= required_members and not supports_multi_device_dsp
1370
1371
def is_output_limiter_enabled(mass: MusicAssistant, player: Player) -> bool:
    """Tell whether the output limiter is enabled for the player.

    Unlike DSP, the limiter is still configurable when synchronized without MULTI_DEVICE_DSP.
    So in grouped scenarios the setting is read from the config of the active (sync) group
    when there is one, otherwise from the sync leader, otherwise from the player itself.
    """
    # precedence: active group > sync leader > the player itself
    deciding_player_id = player.active_group or player.synced_to or player.player_id
    return bool(
        mass.config.get_raw_player_config_value(
            deciding_player_id,
            CONF_ENTRY_OUTPUT_LIMITER.key,
            CONF_ENTRY_OUTPUT_LIMITER.default_value,
        )
    )
1392
1393
def get_player_filter_params(
    mass: MusicAssistant,
    player_id: str,
    input_format: AudioFormat,
    output_format: AudioFormat,
) -> list[str]:
    """
    Build the player specific ffmpeg filter parameters (if any).

    The result consists of, in order: the DSP chain (input gain, enabled filters,
    output gain), the left/right channel selection and the safety limiter.

    :param mass: MusicAssistant instance.
    :param player_id: Id of the player to build the filters for.
    :param input_format: AudioFormat handed to the DSP filters.
    :param output_format: AudioFormat of the player output, stored in the
        player's extra_data for later display.
    :return: List of ffmpeg filter strings.
    """
    ffmpeg_filters: list[str] = []

    dsp = mass.config.get_player_dsp_config(player_id)
    # without a known player the limiter stays on
    limiter_enabled = True

    player = mass.players.get(player_id)
    if player:
        if is_grouping_preventing_dsp(player):
            # DSP can not be applied correctly to a grouped player
            # without multi-device DSP support, so it gets disabled.
            dsp.enabled = False
        elif (
            player.provider.domain == "player_group"
            and PlayerFeature.MULTI_DEVICE_DSP not in player.supported_features
        ):
            # This is a special case! We have a player group where:
            # - The group leader does not support MULTI_DEVICE_DSP
            # - But only contains a single player (since nothing is preventing DSP)
            # We can still apply the DSP of that single player.
            if not player.group_members:
                # This should normally never happen, but if it does, we disable DSP.
                dsp.enabled = False
            else:
                only_member = mass.players.get(player.group_members[0])
                assert only_member is not None  # for type checking
                dsp = mass.config.get_player_dsp_config(only_member.player_id)

        # At this point the output format used for the player in the audio
        # processing steps is implicitly known. It is remembered on the player
        # to be able to show it to the user in the UI later on.
        player.extra_data["output_format"] = output_format

        limiter_enabled = is_output_limiter_enabled(mass, player)

    if dsp.enabled:
        # input gain comes first
        if dsp.input_gain != 0:
            ffmpeg_filters.append(f"volume={dsp.input_gain}dB")

        # then every enabled DSP filter, in configured order
        for dsp_filter in dsp.filters:
            if dsp_filter.enabled:
                ffmpeg_filters.extend(filter_to_ffmpeg_params(dsp_filter, input_format))

        # output gain goes last
        if dsp.output_gain != 0:
            ffmpeg_filters.append(f"volume={dsp.output_gain}dB")

    # output only the left or only the right channel (as mono) when configured
    conf_channels = mass.config.get_raw_player_config_value(
        player_id, CONF_OUTPUT_CHANNELS, "stereo"
    )
    if conf_channels == "left":
        ffmpeg_filters.append("pan=mono|c0=FL")
    elif conf_channels == "right":
        ffmpeg_filters.append("pan=mono|c0=FR")

    # the safety limiter always sits at the very end of the chain
    if limiter_enabled:
        ffmpeg_filters.append("alimiter=limit=-2dB:level=false:asc=true")

    LOGGER.debug("Generated ffmpeg params for player %s: %s", player_id, ffmpeg_filters)
    return ffmpeg_filters
1466
1467
def parse_loudnorm(raw_stderr: bytes | str) -> float | None:
    """
    Extract the measured input loudness from the stderr output of ffmpeg.

    :param raw_stderr: The stderr output of ffmpeg, as bytes or text.
    :return: The `input_i` value reported by the loudnorm filter, or None when
        the output holds no (parsable) loudnorm measurement.
    """
    text = raw_stderr.decode() if isinstance(raw_stderr, bytes) else raw_stderr
    if "[Parsed_loudnorm_0 @" not in text:
        return None
    # Try every fragment following a " { " as a JSON object
    # (cut off at its first closing brace), the first one that parses wins.
    for fragment in text.split(" { "):
        candidate = "{" + fragment.partition("}")[0].strip() + "}"
        try:
            return float(json_loads(candidate)["input_i"])
        except (*JSON_DECODE_EXCEPTIONS, KeyError, ValueError, IndexError):
            continue
    return None
1481
1482
async def analyze_loudness(
    mass: MusicAssistant,
    streamdetails: StreamDetails,
) -> None:
    """
    Analyze media item's audio, to calculate EBU R128 loudness.

    Nothing happens when a loudness measurement is already stored for the item.
    Otherwise (at most) the first 10 minutes of the audio are run through the
    ebur128 filter of ffmpeg and the integrated loudness found in the ffmpeg
    log is stored. A failure to determine the loudness is logged as a warning.

    :param mass: MusicAssistant instance.
    :param streamdetails: StreamDetails of the item to analyze.
    """
    if await mass.music.get_loudness(
        streamdetails.item_id,
        streamdetails.provider,
        media_type=streamdetails.media_type,
    ):
        # only when needed we do the analyze job
        return

    logger = LOGGER.getChild("analyze_loudness")
    logger.debug("Start analyzing audio for %s", streamdetails.uri)

    extra_input_args = [
        # limit to 10 minutes to prevent reading too much in memory
        "-t",
        "600",
    ]
    # work out audio source for these streamdetails
    stream_type = streamdetails.stream_type
    audio_source: str | AsyncGenerator[bytes, None]
    if stream_type == StreamType.CUSTOM:
        music_prov = mass.get_provider(streamdetails.provider)
        if TYPE_CHECKING:  # avoid circular import
            assert isinstance(music_prov, MusicProvider)
        audio_source = music_prov.get_audio_stream(streamdetails)
    elif stream_type == StreamType.ICY:
        assert isinstance(streamdetails.path, str)  # for type checking
        audio_source = get_icy_radio_stream(mass, streamdetails.path, streamdetails)
    elif stream_type == StreamType.HLS:
        assert isinstance(streamdetails.path, str)  # for type checking
        substream = await get_hls_substream(mass, streamdetails.path)
        audio_source = substream.path
    else:
        # all other stream types (HTTP, FILE, etc)
        if stream_type == StreamType.ENCRYPTED_HTTP:
            assert streamdetails.decryption_key is not None  # for type checking
            extra_input_args += ["-decryption_key", streamdetails.decryption_key]
        if isinstance(streamdetails.path, list):
            # multi part stream - just use a single file for the measurement:
            # the second part when there is one, otherwise fall back to the first
            # (indexing the second part unconditionally fails for a single part)
            parts = streamdetails.path
            audio_source = parts[1 if len(parts) > 1 else 0].path
        else:
            # regular single file/url stream
            assert isinstance(streamdetails.path, str)  # for type checking
            audio_source = streamdetails.path

    # calculate BS.1770 R128 integrated loudness with ffmpeg
    async with FFMpeg(
        audio_input=audio_source,
        input_format=streamdetails.audio_format,
        output_format=streamdetails.audio_format,
        audio_output="NULL",
        filter_params=["ebur128=framelog=verbose"],
        extra_input_args=extra_input_args,
        collect_log_history=True,
        loglevel="info",
    ) as ffmpeg_proc:
        await ffmpeg_proc.wait()
        log_lines = ffmpeg_proc.log_history
        log_lines_str = "\n".join(log_lines)
        try:
            # the value sits between "I:" and "LUFS" of the "Integrated loudness" summary
            loudness_str = (
                log_lines_str.split("Integrated loudness")[1].split("I:")[1].split("LUFS")[0]
            )
            loudness = float(loudness_str.strip())
        except (IndexError, ValueError, AttributeError):
            LOGGER.warning(
                "Could not determine integrated loudness of %s - %s",
                streamdetails.uri,
                log_lines_str or "received empty value",
            )
        else:
            await mass.music.set_loudness(
                streamdetails.item_id,
                streamdetails.provider,
                loudness,
                media_type=streamdetails.media_type,
            )
            logger.debug(
                "Integrated loudness of %s is: %s",
                streamdetails.uri,
                loudness,
            )
1569
1570
def _get_normalization_mode(
    core_config: CoreConfig, player_config: PlayerConfig, streamdetails: StreamDetails
) -> VolumeNormalizationMode:
    """
    Work out the effective volume normalization mode for a stream on a player.

    :param core_config: Core config holding the preference for radio and for tracks.
    :param player_config: Player config holding the on/off switch for normalization.
    :param streamdetails: StreamDetails of the item that is going to be played.
    :return: The mode to apply, taking into account whether a loudness
        measurement is available for the item.
    """
    # normalization is off for this player, or there is no target loudness to aim for
    if not player_config.get_value(CONF_VOLUME_NORMALIZATION):
        return VolumeNormalizationMode.DISABLED
    if streamdetails.target_loudness is None:
        return VolumeNormalizationMode.DISABLED

    # radio and tracks each have their own preference
    if streamdetails.media_type == MediaType.RADIO:
        preference_key = CONF_VOLUME_NORMALIZATION_RADIO
    else:
        preference_key = CONF_VOLUME_NORMALIZATION_TRACKS
    preference = VolumeNormalizationMode(str(core_config.get_value(preference_key)))

    if streamdetails.loudness is None:
        # no measurement available: apply the fallback that belongs to the
        # preference, any other preference is used as-is
        without_measurement = {
            VolumeNormalizationMode.FALLBACK_DYNAMIC: VolumeNormalizationMode.DYNAMIC,
            VolumeNormalizationMode.MEASUREMENT_ONLY: VolumeNormalizationMode.DISABLED,
            VolumeNormalizationMode.FALLBACK_FIXED_GAIN: VolumeNormalizationMode.FIXED_GAIN,
        }
        return without_measurement.get(preference, preference)

    # measurement available: the explicit modes are honoured as-is ...
    if preference in (
        VolumeNormalizationMode.DISABLED,
        VolumeNormalizationMode.FIXED_GAIN,
        VolumeNormalizationMode.DYNAMIC,
    ):
        return preference
    # ... every other preference means: use the measurement
    return VolumeNormalizationMode.MEASUREMENT_ONLY
1613