/
/
/
1"""Various helpers for audio streaming and manipulation."""
2
3from __future__ import annotations
4
5import asyncio
6import logging
7import os
8import re
9import struct
10import time
11from collections.abc import AsyncGenerator
12from functools import partial
13from io import BytesIO
14from typing import TYPE_CHECKING, Final, cast
15
16import aiofiles
17import shortuuid
18from aiohttp import ClientTimeout
19from music_assistant_models.dsp import DSPConfig, DSPDetails, DSPState
20from music_assistant_models.enums import (
21 ContentType,
22 MediaType,
23 PlayerFeature,
24 PlayerType,
25 StreamType,
26 VolumeNormalizationMode,
27)
28from music_assistant_models.errors import (
29 AudioError,
30 InvalidDataError,
31 MediaNotFoundError,
32 MusicAssistantError,
33 ProviderUnavailableError,
34)
35from music_assistant_models.media_items import AudioFormat
36from music_assistant_models.streamdetails import MultiPartPath
37
38from music_assistant.constants import (
39 CONF_ENTRY_OUTPUT_LIMITER,
40 CONF_ENTRY_VOLUME_NORMALIZATION_TARGET,
41 CONF_OUTPUT_CHANNELS,
42 CONF_VOLUME_NORMALIZATION,
43 CONF_VOLUME_NORMALIZATION_RADIO,
44 CONF_VOLUME_NORMALIZATION_TARGET,
45 CONF_VOLUME_NORMALIZATION_TRACKS,
46 MASS_LOGGER_NAME,
47 VERBOSE_LOG_LEVEL,
48)
49from music_assistant.helpers.json import JSON_DECODE_EXCEPTIONS, json_loads
50from music_assistant.helpers.throttle_retry import BYPASS_THROTTLER
51from music_assistant.helpers.util import clean_stream_title, remove_file
52from music_assistant.providers.sync_group.constants import SGP_PREFIX
53
54from .audio_buffer import AudioBuffer
55from .dsp import filter_to_ffmpeg_params
56from .ffmpeg import FFMpeg, get_ffmpeg_args, get_ffmpeg_stream
57from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
58from .process import AsyncProcess, communicate
59from .util import detect_charset
60
61if TYPE_CHECKING:
62 from music_assistant_models.config_entries import CoreConfig, PlayerConfig
63 from music_assistant_models.queue_item import QueueItem
64 from music_assistant_models.streamdetails import StreamDetails
65
66 from music_assistant.mass import MusicAssistant
67 from music_assistant.models.music_provider import MusicProvider
68 from music_assistant.models.player import Player
69 from music_assistant.providers.sync_group import SyncGroupPlayer
70
# Module logger, a child of the Music Assistant logger namespace.
LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.audio")

# ruff: noqa: PLR0915

# Default headers for outgoing HTTP requests for audio streams.
# NOTE(review): the User-Agent is styled after ffmpeg's libavformat ("Lavf"),
# presumably so servers treat these requests like ffmpeg's own - confirm.
HTTP_HEADERS = {"User-Agent": "Lavf/60.16.100.MusicAssistant"}
# Same headers, additionally asking the server to interleave ICY metadata
# (used when probing/streaming radio URLs).
HTTP_HEADERS_ICY = {**HTTP_HEADERS, "Icy-MetaData": "1"}

# NOTE(review): not referenced in this part of the file; presumably provider
# domains whose stream retrieval is slow - verify against its usages.
SLOW_PROVIDERS = ("tidal", "ytmusic", "apple_music")

# Cache category and provider key under which resolve_radio_stream stores
# resolved radio URLs and their detected StreamType.
CACHE_CATEGORY_RESOLVED_RADIO_URL: Final[int] = 100
CACHE_PROVIDER: Final[str] = "audio"
83
def align_audio_to_frame_boundary(audio_data: bytes, pcm_format: AudioFormat) -> bytes:
    """Drop any trailing partial frame so the PCM data ends on a frame boundary.

    A frame holds one sample for every channel, so its size in bytes is
    ``(bit_depth // 8) * channels``.

    :param audio_data: Raw PCM audio data to align.
    :param pcm_format: AudioFormat of the audio data.
    :return: ``audio_data`` itself when already aligned, otherwise a truncated copy.
    """
    frame_bytes = (pcm_format.bit_depth // 8) * pcm_format.channels
    leftover = len(audio_data) % frame_bytes
    if not leftover:
        # already aligned: hand back the very same object
        return audio_data
    LOGGER.debug(
        "Truncating %d bytes from audio buffer to align to frame boundary",
        leftover,
    )
    return audio_data[: len(audio_data) - leftover]
100
101
async def strip_silence(
    mass: MusicAssistant,  # noqa: ARG001
    audio_data: bytes,
    pcm_format: AudioFormat,
    reverse: bool = False,
) -> bytes:
    """Strip silence from the begin (or end) of raw PCM audio using ffmpeg.

    The audio is piped through ffmpeg's ``silenceremove`` filter. When ``reverse``
    is set, the audio is reversed before and after filtering so that trailing
    silence is removed instead of leading silence. The filter chain also trims
    the first 0.2 seconds (``atrim=start=0.2``) of the (possibly reversed) audio.

    :param mass: MusicAssistant instance (unused, kept for API compatibility).
    :param audio_data: Raw PCM audio in ``pcm_format``.
    :param pcm_format: AudioFormat describing ``audio_data``.
    :param reverse: Strip silence from the end instead of the beginning.
    :return: The stripped PCM audio, or the untouched input if ffmpeg failed.
    """
    args = ["ffmpeg", "-hide_banner", "-loglevel", "quiet"]
    # input args: raw pcm on stdin
    args += [
        "-acodec",
        pcm_format.content_type.name.lower(),
        "-f",
        pcm_format.content_type.value,
        "-ac",
        str(pcm_format.channels),
        "-ar",
        str(pcm_format.sample_rate),
        "-i",
        "-",
    ]
    # filter args
    silence_filter = (
        "atrim=start=0.2,silenceremove=start_periods=1:start_silence=0.1:start_threshold=0.02"
    )
    if reverse:
        # reverse -> strip leading silence -> reverse back == strip trailing silence
        args += ["-af", f"areverse,{silence_filter},areverse"]
    else:
        args += ["-af", silence_filter]
    # output args: raw pcm on stdout
    args += ["-f", pcm_format.content_type.value, "-"]
    returncode, stripped_data, _stderr = await communicate(args, audio_data)
    if returncode:
        # ffmpeg failed: its (partial or empty) output cannot be trusted, so keep
        # the original audio instead of silently dropping it.
        LOGGER.warning(
            "Unable to strip silence from pcm audio (ffmpeg exited with code %s)",
            returncode,
        )
        return audio_data

    # return stripped audio
    bytes_stripped = len(audio_data) - len(stripped_data)
    if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL):
        seconds_stripped = round(bytes_stripped / pcm_format.pcm_sample_size, 2)
        location = "end" if reverse else "begin"
        LOGGER.log(
            VERBOSE_LOG_LEVEL,
            "stripped %s seconds of silence from %s of pcm audio. bytes stripped: %s",
            seconds_stripped,
            location,
            bytes_stripped,
        )
    return stripped_data
150
151
def get_player_dsp_details(
    mass: MusicAssistant, player: Player, group_preventing_dsp: bool = False
) -> DSPDetails:
    """Return the DSP details of a single player.

    Group membership of the queried player is NOT inspected here (beyond
    ``is_grouping_preventing_dsp(player)`` itself). When the player belongs to a
    group, the caller must pass the leader's/PlayerGroup's
    ``is_grouping_preventing_dsp`` result as ``group_preventing_dsp``.

    When DSP is disabled (by the user or by an unsupported group), an empty
    ``DSPConfig(enabled=False)`` is used so no filters or gains are reported.
    """
    config = mass.config.get_player_dsp_config(player.player_id)
    if not config.enabled:
        # the user switched DSP off: report no filters at all
        state = DSPState.DISABLED
        config = DSPConfig(enabled=False)
    elif group_preventing_dsp or is_grouping_preventing_dsp(player):
        state = DSPState.DISABLED_BY_UNSUPPORTED_GROUP
        config = DSPConfig(enabled=False)
    else:
        state = DSPState.ENABLED

    # only keep the filters that are switched on
    config.filters = [flt for flt in config.filters if flt.enabled]

    limiter_enabled = is_output_limiter_enabled(mass, player)
    return DSPDetails(
        state=state,
        input_gain=config.input_gain,
        filters=config.filters,
        output_gain=config.output_gain,
        output_limiter=limiter_enabled,
        output_format=player.extra_data.get("output_format", None),
    )
184
185
def get_stream_dsp_details(
    mass: MusicAssistant,
    queue_id: str,
) -> dict[str, DSPDetails]:
    """Return DSP details of all players playing this queue, keyed by player_id.

    Sync-group players (``SGP_PREFIX``) are not included themselves, only their
    members. Externally created groups (player type GROUP or STEREO_PAIR) are
    reported as a single entry without enumerating their members. When grouping
    prevents DSP, every member is reported with the output format of the
    player sending the combined stream (the sync leader or the queue player).
    """
    player = mass.players.get_player(queue_id)
    result: dict[str, DSPDetails] = {}
    assert player is not None  # for type checking
    grouping_blocks_dsp = is_grouping_preventing_dsp(player)
    leader_format = None
    external_group = False

    if not player.player_id.startswith(SGP_PREFIX):
        # only real players get an entry; sync groups merely sync their members
        result[player.player_id] = get_player_dsp_details(mass, player)
        if grouping_blocks_dsp:
            # this player sends the (combined) stream, so its format applies to all
            leader_format = player.extra_data.get("output_format", None)
        external_group = player.state.type in (PlayerType.GROUP, PlayerType.STEREO_PAIR)
    elif grouping_blocks_dsp:
        sync_leader = cast("SyncGroupPlayer", player).sync_leader
        if sync_leader:
            leader_format = sync_leader.extra_data.get("output_format", None)

    # externally created groups (e.g. a Chromecast group from the Google Home app)
    # are not expanded into their members
    if external_group or not (player and player.state.group_members):
        return result

    for member_id in player.state.group_members:
        if member_id in result:
            # already handled (the group leader)
            continue
        member = mass.players.get_player(member_id)
        if not member:
            continue
        member_details = get_player_dsp_details(
            mass, member, group_preventing_dsp=grouping_blocks_dsp
        )
        if grouping_blocks_dsp:
            # no multi-device DSP possible: the leader's output format is what plays
            member_details.output_format = leader_format
        result[member_id] = member_details
    return result
231
232
async def get_stream_details(
    mass: MusicAssistant,
    queue_item: QueueItem,
    seek_position: int = 0,
    fade_in: bool = False,
    prefer_album_loudness: bool = False,
) -> StreamDetails:
    """
    Get streamdetails for the given QueueItem.

    This is called just-in-time when a PlayerQueue wants a MediaItem to be played.
    Do not try to request streamdetails too much in advance as this is expiring data.

    Streamdetails already attached to the queue item are reused while they have
    not expired, or while they carry an audio buffer that is still valid for the
    requested seek position. Otherwise fresh streamdetails are requested from the
    item's provider mappings (highest quality first), trying the preferred
    providers first and only then falling back to any other available provider.

    :param mass: MusicAssistant instance.
    :param queue_item: The queue item to resolve streamdetails for.
    :param seek_position: Requested start position in seconds
        (reset to 0 for radio and duration-less items).
    :param fade_in: Whether the stream should fade in.
    :param prefer_album_loudness: Prefer album loudness for volume normalization.
    :raises MediaNotFoundError: If no streamdetails could be retrieved.
    """
    streamdetails: StreamDetails | None = None
    time_start = time.time()
    LOGGER.debug("Getting streamdetails for %s", queue_item.uri)
    if seek_position and (queue_item.media_type == MediaType.RADIO or not queue_item.duration):
        LOGGER.warning("seeking is not possible on duration-less streams!")
        seek_position = 0

    if not queue_item.media_item and not queue_item.streamdetails:
        # in case of a non-media item queue item, the streamdetails should already be provided
        # this should not happen, but guard it just in case
        raise MediaNotFoundError(
            f"Unable to retrieve streamdetails for {queue_item.name} ({queue_item.uri})"
        )
    buffer: AudioBuffer | None = None
    if queue_item.streamdetails and (
        (queue_item.streamdetails.created_at + queue_item.streamdetails.expiration) > time.time()
        # seek_position is passed by keyword: the first positional argument of
        # AudioBuffer.is_valid is the checksum (as in get_buffered_media_stream)
        or (
            (buffer := queue_item.streamdetails.buffer)
            and buffer.is_valid(seek_position=seek_position)
        )
    ):
        # already got a fresh/unused (or unexpired) streamdetails
        streamdetails = queue_item.streamdetails
    else:
        # need to (re)create streamdetails
        # retrieve streamdetails from provider

        media_item = queue_item.media_item
        assert media_item is not None  # for type checking
        preferred_providers: list[str] = []
        if (
            (queue := mass.player_queues.get(queue_item.queue_id))
            and queue.userid
            and (playback_user := await mass.webserver.auth.get_user(queue.userid))
            and playback_user.provider_filter
        ):
            # handle steering into user preferred providerinstance
            preferred_providers = playback_user.provider_filter
        else:
            preferred_providers = [x.provider_instance for x in media_item.provider_mappings]
        # first pass: preferred providers only, second pass: any provider
        for allow_other_provider in (False, True):
            # sort by quality and check item's availability
            for prov_media in sorted(
                media_item.provider_mappings, key=lambda x: x.quality or 0, reverse=True
            ):
                if not prov_media.available:
                    LOGGER.debug("Skipping unavailable %s", prov_media)
                    continue
                if (
                    not allow_other_provider
                    and prov_media.provider_instance not in preferred_providers
                ):
                    continue
                # guard that provider is available
                music_prov = mass.get_provider(prov_media.provider_instance)
                if TYPE_CHECKING:  # avoid circular import
                    assert isinstance(music_prov, MusicProvider)
                if not music_prov:
                    LOGGER.debug("Skipping %s - provider not available", prov_media)
                    continue  # provider not available ?
                # get streamdetails from provider
                try:
                    BYPASS_THROTTLER.set(True)
                    streamdetails = await music_prov.get_stream_details(
                        prov_media.item_id, media_item.media_type
                    )
                except MusicAssistantError as err:
                    LOGGER.warning(str(err))
                else:
                    break
                finally:
                    BYPASS_THROTTLER.set(False)
            if streamdetails is not None:
                # got streamdetails: the fallback pass must not re-fetch them
                # (and possibly override a preferred provider's result)
                break

        if not streamdetails:
            msg = f"Unable to retrieve streamdetails for {queue_item.name} ({queue_item.uri})"
            raise MediaNotFoundError(msg)

        # work out how to handle radio stream
        if (
            streamdetails.stream_type in (StreamType.ICY, StreamType.HLS, StreamType.HTTP)
            and streamdetails.media_type == MediaType.RADIO
            and isinstance(streamdetails.path, str)
        ):
            resolved_url, stream_type = await resolve_radio_stream(mass, streamdetails.path)
            streamdetails.path = resolved_url
            streamdetails.stream_type = stream_type
            # Set up metadata monitoring callback for HLS radio streams
            if stream_type == StreamType.HLS:
                streamdetails.stream_metadata_update_callback = partial(
                    _update_hls_radio_metadata, mass
                )
                streamdetails.stream_metadata_update_interval = 5
        # handle volume normalization details
        if result := await mass.music.get_loudness(
            streamdetails.item_id,
            streamdetails.provider,
            media_type=queue_item.media_type,
        ):
            streamdetails.loudness = result[0]
            streamdetails.loudness_album = result[1]

    # set queue_id on the streamdetails so we know what is being streamed
    streamdetails.queue_id = queue_item.queue_id
    # handle skip/fade_in details
    streamdetails.seek_position = seek_position
    streamdetails.fade_in = fade_in
    if not streamdetails.duration:
        streamdetails.duration = queue_item.duration
    streamdetails.prefer_album_loudness = prefer_album_loudness
    player_settings = await mass.config.get_player_config(streamdetails.queue_id)
    core_config = await mass.config.get_core_config("streams")
    conf_volume_normalization_target = float(
        str(player_settings.get_value(CONF_VOLUME_NORMALIZATION_TARGET, -17))
    )
    # guard against invalid volume normalization values
    # range and default_value are guaranteed to be set for this constant
    volume_range = CONF_ENTRY_VOLUME_NORMALIZATION_TARGET.range
    assert volume_range is not None
    # both bounds of the configured range are valid values
    if (
        conf_volume_normalization_target < volume_range[0]
        or conf_volume_normalization_target > volume_range[1]
    ):
        default_val = CONF_ENTRY_VOLUME_NORMALIZATION_TARGET.default_value
        assert isinstance(default_val, (int, float))
        conf_volume_normalization_target = float(default_val)
        LOGGER.warning(
            "Invalid volume normalization target configured for player %s, "
            "resetting to default of %s dB",
            streamdetails.queue_id,
            CONF_ENTRY_VOLUME_NORMALIZATION_TARGET.default_value,
        )
    streamdetails.target_loudness = conf_volume_normalization_target
    streamdetails.volume_normalization_mode = _get_normalization_mode(
        core_config, player_settings, streamdetails
    )

    # attach the DSP details of all group members
    streamdetails.dsp = get_stream_dsp_details(mass, streamdetails.queue_id)

    LOGGER.debug(
        "retrieved streamdetails for %s in %s milliseconds",
        queue_item.uri,
        int((time.time() - time_start) * 1000),
    )
    return streamdetails
388
389
async def get_buffered_media_stream(
    mass: MusicAssistant,
    streamdetails: StreamDetails,
    pcm_format: AudioFormat,
    seek_position: int = 0,
    filter_params: list[str] | None = None,
) -> AsyncGenerator[bytes, None]:
    """Get audio stream for given media details as raw PCM with buffering.

    A background task decodes the track from position 0 (via get_media_stream)
    into an AudioBuffer stored on ``streamdetails.buffer``; the requested
    ``seek_position`` is applied when reading from that buffer. A later call for
    the same streamdetails reuses the buffer if it is still valid for the same
    ``filter_params`` and seek position.

    Special cases:
    - no valid buffer and ``seek_position > 60``: stream directly without
      buffering (filling the buffer up to the seek point would be wasteful);
    - reused buffer with a different PCM format: resample on the fly with ffmpeg.

    :param mass: MusicAssistant instance.
    :param streamdetails: Details of the stream to play (its ``buffer`` may be
        replaced or created).
    :param pcm_format: Requested output PCM format.
    :param seek_position: Start position in seconds.
    :param filter_params: Extra ffmpeg audio filters; part of the buffer checksum.
    """
    LOGGER.log(
        VERBOSE_LOG_LEVEL,
        "buffered_media_stream: Starting for %s (seek: %s)",
        streamdetails.uri,
        seek_position,
    )

    # checksum based on filter_params
    # (a buffer filled with other filter params is not reused)
    checksum = f"{filter_params}"

    async def fill_buffer_task() -> None:
        """Background task to fill the audio buffer."""
        # NOTE: `audio_buffer` is resolved from the enclosing scope when the task
        # runs; it is always assigned before this task is created below.
        chunk_count = 0
        status = "running"
        try:
            # always fill from the start; seeking happens when reading the buffer
            async for chunk in get_media_stream(
                mass, streamdetails, pcm_format, seek_position=0, filter_params=filter_params
            ):
                chunk_count += 1
                await audio_buffer.put(chunk)
                # Yield to event loop to prevent blocking warnings
                await asyncio.sleep(0)
            # Only set EOF if we completed successfully
            await audio_buffer.set_eof()
        except asyncio.CancelledError:
            status = "cancelled"
            raise
        except Exception:
            status = "aborted with error"
            raise
        finally:
            LOGGER.log(
                VERBOSE_LOG_LEVEL,
                "fill_buffer_task: %s (%s chunks) for %s",
                status,
                chunk_count,
                streamdetails.uri,
            )

    # check for existing buffer and reuse if possible
    existing_buffer: AudioBuffer | None = streamdetails.buffer
    if existing_buffer is not None:
        if not existing_buffer.is_valid(checksum, seek_position):
            # unusable for this request: clear it and detach it from the streamdetails
            LOGGER.log(
                VERBOSE_LOG_LEVEL,
                "buffered_media_stream: Existing buffer invalid for %s (seek: %s, discarded: %s)",
                streamdetails.uri,
                seek_position,
                existing_buffer._discarded_chunks,
            )
            await existing_buffer.clear()
            streamdetails.buffer = None
            existing_buffer = None
        else:
            LOGGER.debug(
                "buffered_media_stream: Reusing existing buffer for %s - "
                "available: %ss, seek: %s, discarded: %s",
                streamdetails.uri,
                existing_buffer.seconds_available,
                seek_position,
                existing_buffer._discarded_chunks,
            )
            audio_buffer = existing_buffer

    if not existing_buffer and seek_position > 60:
        # If seeking into the track and no valid buffer exists,
        # just start a normal stream without buffering,
        # otherwise we would need to fill the buffer up to the seek position first
        # which is not efficient.
        LOGGER.debug(
            "buffered_media_stream: No existing buffer and seek >60s for %s, "
            "starting normal (unbuffered) stream",
            streamdetails.uri,
        )
        async for chunk in get_media_stream(
            mass,
            streamdetails,
            pcm_format,
            seek_position=seek_position,
            filter_params=filter_params,
        ):
            yield chunk
        return

    if not existing_buffer:
        # create new audio buffer and start fill task
        LOGGER.debug(
            "buffered_media_stream: Creating new buffer for %s",
            streamdetails.uri,
        )
        audio_buffer = AudioBuffer(pcm_format, checksum)
        streamdetails.buffer = audio_buffer
        task = mass.loop.create_task(fill_buffer_task())
        audio_buffer.attach_producer_task(task)

    # special case: pcm format mismatch, resample on the fly
    # this may happen in some special situations such as crossfading
    # and its a bit of a waste to throw away the existing buffer
    # (the checksum only covers filter_params, so a reused buffer may differ in format)
    if audio_buffer.pcm_format != pcm_format:
        LOGGER.info(
            "buffered_media_stream: pcm format mismatch, resampling on the fly for %s - "
            "buffer format: %s - requested format: %s",
            streamdetails.uri,
            audio_buffer.pcm_format,
            pcm_format,
        )
        async for chunk in get_ffmpeg_stream(
            audio_input=audio_buffer.iter(seek_position=seek_position),
            input_format=audio_buffer.pcm_format,
            output_format=pcm_format,
        ):
            yield chunk
        return

    # yield data from the buffer
    chunk_count = 0
    try:
        async for chunk in audio_buffer.iter(seek_position=seek_position):
            chunk_count += 1
            yield chunk
    finally:
        LOGGER.log(
            VERBOSE_LOG_LEVEL,
            "buffered_media_stream: Completed, yielded %s chunks",
            chunk_count,
        )
524
525
async def get_media_stream(
    mass: MusicAssistant,
    streamdetails: StreamDetails,
    pcm_format: AudioFormat,
    seek_position: int = 0,
    filter_params: list[str] | None = None,
) -> AsyncGenerator[bytes, None]:
    """Get audio stream for given media details as raw PCM.

    The audio source is chosen from ``streamdetails.stream_type`` (provider
    generated stream, ICY radio, HLS, multi-part files or a plain path/URL) and
    converted by ffmpeg to ``pcm_format``, applying ``filter_params``.
    When the stream finished cleanly without seeking, the measured duration is
    written back to ``streamdetails.duration``. With dynamic volume normalization
    the loudness measurement found in the ffmpeg log is stored as well.

    :param mass: MusicAssistant instance.
    :param streamdetails: Details of the stream to play.
    :param pcm_format: Requested output PCM format.
    :param seek_position: Start position in seconds.
    :param filter_params: Extra ffmpeg audio filters to apply.
    :raises AudioError: When ffmpeg fails or no audio was received.
    """
    logger = LOGGER.getChild("media_stream")
    logger.log(VERBOSE_LOG_LEVEL, "Starting media stream for %s", streamdetails.uri)
    # Work on a copy: the "+=" below extends the list in place and must not
    # mutate streamdetails.extra_input_args, otherwise args such as -ss,
    # -stream_loop or -decryption_key pile up over repeated calls.
    extra_input_args = list(streamdetails.extra_input_args or [])

    # work out audio source for these streamdetails
    audio_source: str | AsyncGenerator[bytes, None]
    stream_type = streamdetails.stream_type
    if stream_type == StreamType.CUSTOM:
        music_prov = mass.get_provider(streamdetails.provider)
        if TYPE_CHECKING:  # avoid circular import
            assert isinstance(music_prov, MusicProvider)
        audio_source = music_prov.get_audio_stream(
            streamdetails,
            seek_position=seek_position if streamdetails.can_seek else 0,
        )
        # the provider handles seeking itself if it can, otherwise ffmpeg does
        seek_position = 0 if streamdetails.can_seek else seek_position
    elif stream_type == StreamType.ICY:
        assert isinstance(streamdetails.path, str)  # for type checking
        audio_source = get_icy_radio_stream(mass, streamdetails.path, streamdetails)
        seek_position = 0  # seeking not possible on radio streams
    elif stream_type == StreamType.HLS:
        assert isinstance(streamdetails.path, str)  # for type checking
        substream = await get_hls_substream(mass, streamdetails.path)
        audio_source = substream.path
        if streamdetails.media_type == MediaType.RADIO:
            # HLS streams (especially the BBC) struggle when they're played directly
            # with ffmpeg, where they just stop after some minutes,
            # so we tell ffmpeg to loop around in this case.
            extra_input_args += ["-stream_loop", "-1", "-re"]
    else:
        # all other stream types (HTTP, FILE, etc)
        if stream_type == StreamType.ENCRYPTED_HTTP:
            assert streamdetails.decryption_key is not None  # for type checking
            extra_input_args += ["-decryption_key", streamdetails.decryption_key]
        if isinstance(streamdetails.path, list):
            # multi part stream
            audio_source = get_multi_file_stream(mass, streamdetails, seek_position)
            seek_position = 0  # handled by get_multi_file_stream
        else:
            # regular single file/url stream
            assert isinstance(streamdetails.path, str)  # for type checking
            audio_source = streamdetails.path

    # handle seek support
    if seek_position and streamdetails.duration and streamdetails.allow_seek:
        extra_input_args += ["-ss", str(int(seek_position))]

    bytes_sent = 0
    finished = False
    cancelled = False
    first_chunk_received = False
    ffmpeg_proc = FFMpeg(
        audio_input=audio_source,
        input_format=streamdetails.audio_format,
        output_format=pcm_format,
        filter_params=filter_params,
        extra_input_args=extra_input_args,
        collect_log_history=True,
        loglevel="debug" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "info",
    )

    try:
        await ffmpeg_proc.start()
        assert ffmpeg_proc.proc is not None  # for type checking
        logger.debug(
            "Started media stream for %s - using streamtype: %s - pcm format: %s - ffmpeg PID: %s",
            streamdetails.uri,
            streamdetails.stream_type,
            pcm_format.content_type.value,
            ffmpeg_proc.proc.pid,
        )
        stream_start = mass.loop.time()

        chunk_size = get_chunksize(pcm_format, 1)
        async for chunk in ffmpeg_proc.iter_chunked(chunk_size):
            if not first_chunk_received:
                # At this point ffmpeg has started and should now know the codec used
                # for encoding the audio.
                first_chunk_received = True
                streamdetails.audio_format.codec_type = ffmpeg_proc.input_format.codec_type
                logger.debug(
                    "First chunk received after %.2f seconds (codec detected: %s)",
                    mass.loop.time() - stream_start,
                    ffmpeg_proc.input_format.codec_type,
                )
            yield chunk
            bytes_sent += len(chunk)

        # end of audio/track reached
        logger.log(VERBOSE_LOG_LEVEL, "End of stream reached.")
        # wait until stderr also completed reading
        await ffmpeg_proc.wait_with_timeout(5)
        if ffmpeg_proc.returncode not in (0, None):
            log_trail = "\n".join(list(ffmpeg_proc.log_history)[-5:])
            raise AudioError(f"FFMpeg exited with code {ffmpeg_proc.returncode}: {log_trail}")
        if bytes_sent == 0:
            # edge case: no audio data was received at all
            raise AudioError("No audio was received")
        finished = True
    except (Exception, GeneratorExit, asyncio.CancelledError) as err:
        if isinstance(err, asyncio.CancelledError | GeneratorExit):
            # we were cancelled, just raise
            cancelled = True
            raise
        # dump the last 10 lines of the log in case of an unclean exit
        logger.warning("\n".join(list(ffmpeg_proc.log_history)[-10:]))
        raise AudioError(f"Error while streaming: {err}") from err
    finally:
        # always ensure close is called which also handles all cleanup
        await ffmpeg_proc.close()
        # determine how many seconds we've received
        # for pcm output we can calculate this easily
        seconds_received = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
        # store accurate duration
        if finished and not seek_position and seconds_received:
            streamdetails.duration = int(seconds_received)

        logger.log(
            VERBOSE_LOG_LEVEL,
            "stream %s (with code %s) for %s",
            "cancelled" if cancelled else "finished" if finished else "aborted",
            ffmpeg_proc.returncode,
            streamdetails.uri,
        )

        # parse loudnorm data if we have that collected (and enabled)
        if (
            (streamdetails.loudness is None or finished)
            and streamdetails.volume_normalization_mode == VolumeNormalizationMode.DYNAMIC
            and (finished or (seconds_received >= 300))
        ):
            # if dynamic volume normalization is enabled
            # the loudnorm filter will output the measurement in the log,
            # so we can use that directly instead of analyzing the audio
            logger.log(VERBOSE_LOG_LEVEL, "Collecting loudness measurement...")
            if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)):
                logger.debug(
                    "Loudness measurement for %s: %s dB",
                    streamdetails.uri,
                    loudness_details,
                )
                mass.create_task(
                    mass.music.set_loudness(
                        streamdetails.item_id,
                        streamdetails.provider,
                        loudness_details,
                        media_type=streamdetails.media_type,
                    )
                )
683
684
685def create_wave_header(
686 samplerate: int = 44100, channels: int = 2, bitspersample: int = 16, duration: int | None = None
687) -> bytes:
688 """Generate a wave header from given params."""
689 file = BytesIO()
690
691 # Generate format chunk
692 format_chunk_spec = b"<4sLHHLLHH"
693 format_chunk = struct.pack(
694 format_chunk_spec,
695 b"fmt ", # Chunk id
696 16, # Size of this chunk (excluding chunk id and this field)
697 1, # Audio format, 1 for PCM
698 channels, # Number of channels
699 int(samplerate), # Samplerate, 44100, 48000, etc.
700 int(samplerate * channels * (bitspersample / 8)), # Byterate
701 int(channels * (bitspersample / 8)), # Blockalign
702 bitspersample, # 16 bits for two byte samples, etc.
703 )
704 # Generate data chunk
705 # duration = 3600*6.7
706 data_chunk_spec = b"<4sL"
707 if duration is None:
708 # use max value possible
709 datasize = 4254768000 # = 6,7 hours at 44100/16
710 else:
711 # calculate from duration
712 numsamples = samplerate * duration
713 datasize = int(numsamples * channels * (bitspersample / 8))
714 data_chunk = struct.pack(
715 data_chunk_spec,
716 b"data", # Chunk id
717 int(datasize), # Chunk size (excluding chunk id and this field)
718 )
719 sum_items = [
720 # "WAVE" string following size field
721 4,
722 # "fmt " + chunk size field + chunk size
723 struct.calcsize(format_chunk_spec),
724 # Size of data chunk spec + data size
725 struct.calcsize(data_chunk_spec) + datasize,
726 ]
727 # Generate main header
728 all_chunks_size = int(sum(sum_items))
729 main_header_spec = b"<4sL4s"
730 main_header = struct.pack(main_header_spec, b"RIFF", all_chunks_size, b"WAVE")
731 # Write all the contents in
732 file.write(main_header)
733 file.write(format_chunk)
734 file.write(data_chunk)
735
736 # return file.getvalue(), all_chunks_size + 8
737 return file.getvalue()
738
739
async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, StreamType]:
    """
    Resolve a streaming radio URL.

    Playlist URLs (m3u/m3u8/pls, detected by extension or content-type) are
    unfolded by recursively resolving their first URL entry; HLS playlists are
    detected and reported as such. A response carrying an ``icy-metaint`` header
    marks the stream as ICY.

    Returns a tuple of:
    - the resolved URL as string
    - the StreamType: ICY, HLS or plain HTTP.

    Successful results are cached for 3 hours; on any error the original URL is
    returned (uncached) together with whatever stream type was detected so far.
    """
    cached = await mass.cache.get(
        key=url, provider=CACHE_PROVIDER, category=CACHE_CATEGORY_RESOLVED_RADIO_URL
    )
    if cached:
        if TYPE_CHECKING:  # for type checking
            cached = cast("tuple[str, str]", cached)
        return (cached[0], StreamType(cached[1]))

    detected_type = StreamType.HTTP
    playlist_url_markers = (".m3u?", ".m3u8?", ".pls?")
    playlist_mime_types = ("audio/x-mpegurl", "audio/x-scpls")
    try:
        async with mass.http_session_no_ssl.get(
            url,
            headers=HTTP_HEADERS_ICY,
            allow_redirects=True,
            timeout=ClientTimeout(total=None, connect=10, sock_read=5),
        ) as response:
            response_headers = response.headers
            response.raise_for_status()
            if not response.headers:
                raise InvalidDataError("no headers found")
            if response_headers.get("icy-metaint") is not None:
                detected_type = StreamType.ICY
            content_type = response_headers.get("content-type", "")
            is_playlist = (
                url.endswith((".m3u", ".m3u8", ".pls"))
                or any(marker in url for marker in playlist_url_markers)
                or any(mime in content_type for mime in playlist_mime_types)
            )
            if is_playlist:
                # url is playlist, we need to unfold it
                try:
                    entries = await fetch_playlist(mass, url)
                    if not any(x for x in entries if x.length):
                        first_url_entry = next((e for e in entries if e.is_url), None)
                        if first_url_entry is not None:
                            # unfold first url of playlist
                            return await resolve_radio_stream(mass, first_url_entry.path)
                        raise InvalidDataError("No content found in playlist")
                except IsHLSPlaylist:
                    detected_type = StreamType.HLS
    except Exception as err:
        LOGGER.warning("Error while parsing radio URL %s: %s", url, str(err))
        return (url, detected_type)

    resolved = (url, detected_type)
    await mass.cache.set(
        url,
        resolved,
        expiration=3600 * 3,
        provider=CACHE_PROVIDER,
        category=CACHE_CATEGORY_RESOLVED_RADIO_URL,
    )
    return resolved
805
806
async def get_icy_radio_stream(
    mass: MusicAssistant, url: str, streamdetails: StreamDetails
) -> AsyncGenerator[bytes, None]:
    """Get (radio) audio stream from HTTP, including ICY metadata retrieval.

    Audio is read in blocks of ``icy-metaint`` bytes. Each block is followed by a
    length byte (times 16) and that many bytes of metadata, from which the
    ``StreamTitle`` is extracted, cleaned and stored on
    ``streamdetails.stream_title``. The metadata itself is never yielded.

    If the server does not announce an ``icy-metaint`` interval, the audio is
    passed through unmodified without metadata parsing.

    :param mass: MusicAssistant instance (provides the HTTP session).
    :param url: The radio stream URL.
    :param streamdetails: StreamDetails to update with the current stream title.
    """
    timeout = ClientTimeout(total=None, connect=30, sock_read=5 * 60)
    # Non-greedy match up to the closing "';" so titles that contain an
    # apostrophe (e.g. "Don't Stop") are still captured in full.
    stream_title_pattern = re.compile(rb"StreamTitle='(.*?)';", re.DOTALL)
    LOGGER.debug("Start streaming radio with ICY metadata from url %s", url)
    async with mass.http_session_no_ssl.get(
        url, allow_redirects=True, headers=HTTP_HEADERS_ICY, timeout=timeout
    ) as resp:
        # don't feed an HTTP error page to the decoder
        resp.raise_for_status()
        meta_int_header = resp.headers.get("icy-metaint")
        if not meta_int_header:
            # No ICY metadata interval announced (for instance when a cached
            # stream type is outdated): stream the audio without metadata.
            LOGGER.debug("No icy-metaint header received from %s, streaming without metadata", url)
            async for chunk in resp.content.iter_chunked(65536):
                yield chunk
            return
        meta_int = int(meta_int_header)
        while True:
            try:
                yield await resp.content.readexactly(meta_int)
                meta_byte = await resp.content.readexactly(1)
                if meta_byte == b"\x00":
                    continue
                meta_length = ord(meta_byte) * 16
                meta_data = await resp.content.readexactly(meta_length)
            except asyncio.exceptions.IncompleteReadError:
                break
            if not meta_data:
                continue
            meta_data = meta_data.rstrip(b"\0")
            stream_title_re = stream_title_pattern.search(meta_data)
            if not stream_title_re:
                continue
            try:
                # in 99% of the cases the stream title is utf-8 encoded
                stream_title = stream_title_re.group(1).decode("utf-8")
            except UnicodeDecodeError:
                # fallback to iso-8859-1
                stream_title = stream_title_re.group(1).decode("iso-8859-1", errors="replace")
            cleaned_stream_title = clean_stream_title(stream_title)
            if cleaned_stream_title != streamdetails.stream_title:
                LOGGER.log(
                    VERBOSE_LOG_LEVEL,
                    "ICY Radio streamtitle original: %s",
                    stream_title,
                )
                LOGGER.log(
                    VERBOSE_LOG_LEVEL,
                    "ICY Radio streamtitle cleaned: %s",
                    cleaned_stream_title,
                )
                streamdetails.stream_title = cleaned_stream_title
853
854
def parse_extinf_metadata(extinf_line: str) -> dict[str, str]:
    """
    Parse metadata from HLS EXTINF line.

    Extracts structured metadata like title="...", artist="..." from EXTINF lines.
    Common in iHeartRadio and other commercial radio HLS streams.

    Keys are lower-cased and may contain hyphens (e.g. ``tvg-id``). Values may
    contain backslash-escaped quotes (``\\"``), which are unescaped. If a key
    occurs more than once, the last occurrence wins.

    :param extinf_line: The EXTINF line containing metadata
    :return: Mapping of lower-cased keys to their (unescaped) values.
    """
    metadata: dict[str, str] = {}

    # key="value" pairs; the value runs until the first unescaped quote, so
    # values like url="song_spot=\"M\" ..." are captured as a whole instead of
    # being cut off at the escaped quote
    pattern = r'([\w-]+)="((?:[^"\\]|\\.)*)"'

    for key, value in re.findall(pattern, extinf_line):
        metadata[key.lower()] = value.replace('\\"', '"')

    return metadata
875
876
async def _update_hls_radio_metadata(
    mass: MusicAssistant,
    streamdetails: StreamDetails,
    elapsed_time: int,  # noqa: ARG001
) -> None:
    """
    Update HLS radio stream metadata by fetching the playlist.

    Fetches the HLS media playlist and extracts metadata from its most recent
    EXTINF line. When a title and/or artist is present, the cleaned
    "Artist - Title" string is stored in ``streamdetails.stream_title``.
    Any error is logged and swallowed: this is a best-effort metadata poll.

    :param mass: MusicAssistant instance
    :param streamdetails: StreamDetails object to update with metadata
    :param elapsed_time: Current playback position in seconds (unused for live radio)
    """
    try:
        # Get the actual media playlist URL from cache or resolve it.
        # The media_playlist_url is cached in streamdetails.data so the master
        # playlist does not have to be re-resolved on every poll.
        if streamdetails.data is None:
            streamdetails.data = {}
        media_playlist_url = streamdetails.data.get("hls_media_playlist_url")
        if not media_playlist_url:
            try:
                assert isinstance(streamdetails.path, str)  # for type checking
                substream = await get_hls_substream(mass, streamdetails.path)
                media_playlist_url = substream.path
                streamdetails.data["hls_media_playlist_url"] = media_playlist_url
            except Exception as err:
                LOGGER.warning(
                    "Failed to resolve HLS substream for metadata monitoring: %s",
                    err,
                )
                return

        # Fetch the media playlist. Send the same default headers as the
        # playlist resolution does; some radio CDNs reject header-less requests.
        timeout = ClientTimeout(total=None, connect=10, sock_read=30)
        async with mass.http_session_no_ssl.get(
            media_playlist_url, allow_redirects=True, headers=HTTP_HEADERS, timeout=timeout
        ) as resp:
            resp.raise_for_status()
            playlist_content = await resp.text()

        # The most recent segment (the last EXTINF line) usually carries the
        # current metadata. splitlines() also copes with CR/CRLF line endings.
        for line in reversed(playlist_content.splitlines()):
            if not line.startswith("#EXTINF:"):
                continue
            metadata = parse_extinf_metadata(line)
            # Format as "Artist - Title", or whichever of the two is present
            stream_title = " - ".join(
                part for part in (metadata.get("artist", ""), metadata.get("title", "")) if part
            )
            if stream_title:
                cleaned_title = clean_stream_title(stream_title)
                # Only update if changed (and not cleaned away entirely)
                if cleaned_title and cleaned_title != streamdetails.stream_title:
                    LOGGER.log(
                        VERBOSE_LOG_LEVEL,
                        "HLS Radio metadata updated: %s",
                        cleaned_title,
                    )
                    streamdetails.stream_title = cleaned_title
            # Only check the most recent EXTINF
            break

    except Exception as err:
        LOGGER.debug(
            "Error fetching HLS metadata: %s",
            err,
        )
957
958
async def get_hls_substream(
    mass: MusicAssistant,
    url: str,
) -> PlaylistItem:
    """
    Select the (highest quality) HLS substream for given HLS playlist/URL.

    If the playlist at ``url`` already lists audio segments (a media playlist),
    a PlaylistItem pointing at ``url`` itself is returned. Otherwise the variant
    with the highest BANDWIDTH is selected when stream info is present, or the
    first entry when it is not. Relative variant URIs are resolved against the
    playlist's (final, post-redirect) URL.

    :param mass: MusicAssistant instance.
    :param url: URL of the (master) HLS playlist.
    :raises InvalidDataError: if the playlist contains no entries at all.
    """
    from urllib.parse import urljoin  # noqa: PLC0415

    timeout = ClientTimeout(total=None, connect=30, sock_read=5 * 60)
    # fetch master playlist and select (best) child playlist
    # https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming-19#section-10
    async with mass.http_session_no_ssl.get(
        url, allow_redirects=True, headers=HTTP_HEADERS, timeout=timeout
    ) as resp:
        resp.raise_for_status()
        raw_data = await resp.read()
        encoding = resp.charset or await detect_charset(raw_data)
        # Relative URIs in a playlist are relative to the playlist's own URI,
        # which is the final URL after any redirects.
        playlist_url = str(resp.url)
    master_m3u_data = raw_data.decode(encoding)
    substreams = parse_m3u(master_m3u_data)
    if not substreams:
        raise InvalidDataError(f"No (sub)streams found in HLS playlist {url}")
    # There is a chance that we did not get a master playlist with subplaylists
    # but just a single master/sub playlist with the actual audio stream(s)
    # so we need to detect if the playlist child's contain audio streams or
    # sub-playlists.
    if any(
        x
        for x in substreams
        if (x.length or x.path.endswith((".mp4", ".aac")))
        and not x.path.endswith((".m3u", ".m3u8"))
    ):
        return PlaylistItem(path=url, key=substreams[0].key)
    # sort substreams on best quality (highest bandwidth) when available
    if any(x for x in substreams if x.stream_info):
        substreams.sort(
            key=lambda x: int(
                x.stream_info.get("BANDWIDTH", "0") if x.stream_info is not None else 0
            ),
            reverse=True,
        )
    substream = substreams[0]
    if not substream.path.startswith("http"):
        # path is relative: resolve it like a URL reference, which also handles
        # root-relative ("/x.m3u8") and parent ("../x.m3u8") paths correctly.
        substream.path = urljoin(playlist_url, substream.path)
    return substream
1000
1001
async def get_http_stream(
    mass: MusicAssistant,
    url: str,
    streamdetails: StreamDetails,
    seek_position: int = 0,
    verify_ssl: bool = True,
) -> AsyncGenerator[bytes, None]:
    """
    Get audio stream from HTTP.

    Yields the response body in chunks as it arrives. When ``seek_position``
    (seconds) is given, the byte offset is estimated linearly from the file size
    and duration and requested with an HTTP Range header. Seeking falls back to
    the start (with a warning) when the server does not advertise byte ranges or
    the format is a container whose headers prevent it.

    :param mass: MusicAssistant instance.
    :param url: URL to stream from.
    :param streamdetails: StreamDetails; ``size`` may be filled in as a side effect.
    :param seek_position: Position to start at, in seconds.
    :param verify_ssl: Use the SSL-verifying HTTP session when True.
    :raises InvalidDataError: if a seek is requested but the server does not
        answer with 206 Partial Content.
    """
    LOGGER.debug("Start HTTP stream for %s (seek_position %s)", streamdetails.uri, seek_position)
    if seek_position:
        assert streamdetails.duration, "Duration required for seek requests"
    http_session = mass.http_session if verify_ssl else mass.http_session_no_ssl
    # try to get filesize with a head request
    seek_supported = streamdetails.can_seek
    if seek_position or not streamdetails.size:
        async with http_session.head(url, allow_redirects=True, headers=HTTP_HEADERS) as resp:
            resp.raise_for_status()
            if size := resp.headers.get("Content-Length"):
                streamdetails.size = int(size)
            seek_supported = resp.headers.get("Accept-Ranges") == "bytes"

    # seeking an unknown or container format is not supported due to the (moov) headers
    if seek_position and (
        not seek_supported
        or streamdetails.audio_format.content_type
        in (
            ContentType.UNKNOWN,
            ContentType.M4A,
            ContentType.M4B,
            ContentType.MP4,
        )
    ):
        LOGGER.warning(
            "Seeking in %s (%s) not possible.",
            streamdetails.uri,
            streamdetails.audio_format.output_format_str,
        )
        seek_position = 0
        streamdetails.seek_position = 0

    # headers
    headers = {**HTTP_HEADERS}
    timeout = ClientTimeout(total=None, connect=30, sock_read=5 * 60)
    # Only request a byte range once it is certain we will actually seek;
    # otherwise a range-capable server would return a body starting mid-file.
    if seek_position and streamdetails.size:
        assert streamdetails.duration is not None  # for type checking
        # linear estimate of the byte offset for the requested position
        skip_bytes = int(streamdetails.size / streamdetails.duration * seek_position)
        # open-ended range: from skip_bytes up to the end of the resource
        headers["Range"] = f"bytes={skip_bytes}-"

    # start the streaming from http
    bytes_received = 0
    async with http_session.get(
        url, allow_redirects=True, headers=headers, timeout=timeout
    ) as resp:
        is_partial = resp.status == 206
        if seek_position and not is_partial:
            raise InvalidDataError("HTTP source does not support seeking!")
        resp.raise_for_status()
        async for chunk in resp.content.iter_any():
            bytes_received += len(chunk)
            yield chunk

    # store size on streamdetails for later use
    if not streamdetails.size:
        streamdetails.size = bytes_received
    LOGGER.debug(
        "Finished HTTP stream for %s (transferred %s/%s bytes)",
        streamdetails.uri,
        bytes_received,
        streamdetails.size,
    )
1071
1072
async def get_file_stream(
    mass: MusicAssistant,  # noqa: ARG001
    filename: str,
    streamdetails: StreamDetails,
    seek_position: int = 0,
) -> AsyncGenerator[bytes, None]:
    """
    Get audio stream from local accessible file.

    Reads ``filename`` in chunks of ``get_chunksize(streamdetails.audio_format)``
    bytes. When ``seek_position`` (seconds) is given, the byte offset is estimated
    linearly from the file size and duration. Container formats (and unknown
    formats) fall back to the start of the file with a warning.

    :param mass: MusicAssistant instance (unused).
    :param filename: Path of the local file to stream.
    :param streamdetails: StreamDetails; ``size`` may be filled in as a side effect.
    :param seek_position: Position to start at, in seconds.
    """
    if seek_position:
        assert streamdetails.duration, "Duration required for seek requests"
    if not streamdetails.size:
        stat = await asyncio.to_thread(os.stat, filename)
        streamdetails.size = stat.st_size

    # seeking an unknown or container format is not supported due to the (moov) headers
    if seek_position and (
        streamdetails.audio_format.content_type
        in (
            ContentType.UNKNOWN,
            ContentType.M4A,
            ContentType.M4B,
            ContentType.MP4,
        )
    ):
        LOGGER.warning(
            "Seeking in %s (%s) not possible.",
            streamdetails.uri,
            streamdetails.audio_format.output_format_str,
        )
        seek_position = 0
        streamdetails.seek_position = 0

    chunk_size = get_chunksize(streamdetails.audio_format)
    # Read the same file whose size was determined above.
    async with aiofiles.open(filename, "rb") as _file:
        if seek_position:
            assert streamdetails.duration is not None  # for type checking
            seek_pos = int((streamdetails.size / streamdetails.duration) * seek_position)
            await _file.seek(seek_pos)
        # yield chunks of data from file until EOF
        while data := await _file.read(chunk_size):
            yield data
1116
1117
def _get_parts_from_position(
    parts: list[MultiPartPath], seek_position: int
) -> tuple[list[MultiPartPath], int]:
    """Get the remaining parts list from a timestamp.

    Arguments:
        parts: The list of parts
        seek_position: The seeking position in seconds of the tracklist

    Returns:
        In a tuple, a list of parts starting with the one at the requested
        seek position, and the position in seconds to seek to in the first
        returned part. If a part without a known duration is reached, all parts
        are returned together with the unmodified seek position.

    Raises:
        InvalidDataError: if an entry in ``parts`` is not a MultiPartPath.
        IndexError: if no part covers the requested position.
    """
    skipped_duration = 0.0
    for i, part in enumerate(parts):
        if not isinstance(part, MultiPartPath):
            raise InvalidDataError("Multi-file streamdetails requires a list of MultiPartPath")
        if part.duration is None:
            # durations unknown: cannot locate the part, let the caller seek globally
            return parts, seek_position
        if skipped_duration + part.duration < seek_position:
            skipped_duration += part.duration
            continue

        position = seek_position - skipped_duration

        # Seeking in some parts is inaccurate, making the seek to a chapter land on the end of
        # the previous track. If we're within 2 second of the end, skip the current track
        if position + 2 >= part.duration:
            LOGGER.debug(
                "Skipping to the next part due to seek position being at the end: %s",
                position,
            )
            if i + 1 < len(parts):
                return parts[i + 1 :], 0
            return parts[i:], int(position)  # last part, cannot skip

        return parts[i:], int(position)

    raise IndexError(f"Could not find any candidate part for position {seek_position}")
1157
1158
async def get_multi_file_stream(
    mass: MusicAssistant,  # noqa: ARG001
    streamdetails: StreamDetails,
    seek_position: int = 0,
) -> AsyncGenerator[bytes, None]:
    """Return audio stream for a concatenation of multiple files.

    The parts from ``seek_position`` onwards are written to a temporary ffmpeg
    concat list. That list is decoded by ffmpeg into a NUT stream with the
    source's sample rate, bit depth and channel count. The temporary list is
    removed once streaming ends.

    Arguments:
        mass: MusicAssistant instance (unused).
        streamdetails: StreamDetails whose ``path`` is a list of MultiPartPath.
        seek_position: The position to seek to in seconds

    Raises:
        InvalidDataError: if ``streamdetails.path`` is not a list.
    """
    if not isinstance(streamdetails.path, list):
        raise InvalidDataError("Multi-file streamdetails requires a list of MultiPartPath")
    parts, seek_position = _get_parts_from_position(streamdetails.path, seek_position)

    # concat input files
    temp_file = f"/tmp/{shortuuid.random(20)}.txt"  # noqa: S108
    async with aiofiles.open(temp_file, "w", encoding="utf-8") as f:
        for part in parts:
            # Paths are single-quoted in the concat list. A single quote cannot
            # appear inside single quotes, so close the quote, add an escaped
            # quote and reopen it: ' -> '\''
            escaped_path = part.path.replace("'", "'\\''")
            await f.write(f"file '{escaped_path}'\n")

    try:
        async for chunk in get_ffmpeg_stream(
            audio_input=temp_file,
            input_format=streamdetails.audio_format,
            output_format=AudioFormat(
                content_type=ContentType.NUT,
                sample_rate=streamdetails.audio_format.sample_rate,
                bit_depth=streamdetails.audio_format.bit_depth,
                channels=streamdetails.audio_format.channels,
            ),
            extra_input_args=[
                "-safe",
                "0",
                "-f",
                "concat",
                "-i",
                temp_file,
                "-ss",
                str(seek_position),
            ],
        ):
            yield chunk
    finally:
        await remove_file(temp_file)
1204
1205
async def get_preview_stream(
    mass: MusicAssistant,
    provider_instance_id_or_domain: str,
    item_id: str,
    media_type: MediaType = MediaType.TRACK,
) -> AsyncGenerator[bytes, None]:
    """
    Create a 30 seconds preview audioclip for the given item.

    The item is first looked up at the provider. Only known items are streamed,
    so arbitrary ids are rejected. Its media stream is then decoded to PCM and
    re-encoded as AAC, cut after 30 seconds.

    :param mass: MusicAssistant instance.
    :param provider_instance_id_or_domain: Provider to fetch the item from.
    :param item_id: Provider-specific item id.
    :param media_type: Media type of the item.
    :raises ProviderUnavailableError: if the provider cannot be found.
    :raises MediaNotFoundError: if the provider does not know the item.
    """
    music_prov = mass.get_provider(provider_instance_id_or_domain)
    if not music_prov:
        raise ProviderUnavailableError
    if TYPE_CHECKING:  # avoid circular import
        assert isinstance(music_prov, MusicProvider)

    # Validate that item_id corresponds to a valid item in the provider for security
    known_item = await music_prov.get_item(media_type, item_id)
    if not known_item:
        raise MediaNotFoundError(
            f"Item {item_id} not found in provider {provider_instance_id_or_domain}"
        )

    streamdetails = await music_prov.get_stream_details(item_id, media_type)
    source_format = streamdetails.audio_format
    pcm_format = AudioFormat(
        content_type=ContentType.from_bit_depth(source_format.bit_depth),
        sample_rate=source_format.sample_rate,
        bit_depth=source_format.bit_depth,
        channels=source_format.channels,
    )
    pcm_stream = get_media_stream(
        mass=mass,
        streamdetails=streamdetails,
        pcm_format=pcm_format,
    )
    aac_stream = get_ffmpeg_stream(
        audio_input=pcm_stream,
        input_format=pcm_format,
        output_format=AudioFormat(content_type=ContentType.AAC),
        extra_input_args=["-t", "30"],  # cut after 30 seconds
    )
    async for chunk in aac_stream:
        yield chunk
1241
1242
async def get_silence(
    duration: int,
    output_format: AudioFormat,
) -> AsyncGenerator[bytes, None]:
    """
    Create stream of silence, encoded to format of choice.

    Raw PCM output is generated directly as all-zero samples, one second per
    chunk. WAV output is the same, preceded by a wave header. Any other format
    is produced by ffmpeg's ``anullsrc`` source. The channel count of
    ``output_format`` is honoured, falling back to stereo when it is not set.

    :param duration: Length of the silence in whole seconds.
    :param output_format: The format to deliver the silence in.
    """
    channels = output_format.channels or 2
    content_type = output_format.content_type
    is_raw_pcm = content_type.is_pcm()
    if is_raw_pcm or content_type == ContentType.WAV:
        if not is_raw_pcm:
            # wav silence = wave header + zeros
            yield create_wave_header(
                samplerate=output_format.sample_rate,
                channels=channels,
                bitspersample=output_format.bit_depth,
                duration=duration,
            )
        # one second of all-zero samples for the requested channel layout
        one_second = b"\0" * int(
            output_format.sample_rate * (output_format.bit_depth / 8) * channels
        )
        for _ in range(duration):
            yield one_second
        return
    # use ffmpeg for all other encodings
    args = [
        "ffmpeg",
        "-hide_banner",
        "-loglevel",
        "quiet",
        "-f",
        "lavfi",
        "-i",
        f"anullsrc=r={output_format.sample_rate}:cl=stereo",
        "-t",
        str(duration),
        # convert to the requested number of output channels
        "-ac",
        str(channels),
        "-f",
        output_format.output_format_str,
        "-",
    ]
    async with AsyncProcess(args, stdout=True) as ffmpeg_proc:
        async for chunk in ffmpeg_proc.iter_chunked():
            yield chunk
1283
1284
async def resample_pcm_audio(
    input_audio: bytes,
    input_format: AudioFormat,
    output_format: AudioFormat,
) -> bytes:
    """
    Resample (a chunk of) PCM audio from input_format to output_format using ffmpeg.

    :param input_audio: Raw PCM audio data to resample.
    :param input_format: AudioFormat of the input audio.
    :param output_format: Desired AudioFormat for the output audio.

    :return: Resampled audio data, frame-aligned. Returns the input unchanged when
        both formats are equal, and empty bytes if resampling fails.
    """
    if input_format == output_format:
        return input_audio
    # lazy %-style args: the message is only formatted when verbose logging is on
    LOGGER.log(VERBOSE_LOG_LEVEL, "Resampling audio from %s to %s", input_format, output_format)
    try:
        ffmpeg_args = get_ffmpeg_args(
            input_format=input_format, output_format=output_format, filter_params=[]
        )
        _, stdout, stderr = await communicate(ffmpeg_args, input_audio)
        if not stdout:
            LOGGER.error(
                "Resampling failed: no output from ffmpeg. Input: %s, Output: %s, stderr: %s",
                input_format,
                output_format,
                # ffmpeg output is not guaranteed to be valid UTF-8; never let
                # decoding the diagnostic itself raise
                stderr.decode(errors="replace") if stderr else "(no stderr)",
            )
            return b""
        # Ensure frame alignment after resampling
        return align_audio_to_frame_boundary(stdout, output_format)
    except Exception as err:
        LOGGER.exception(
            "Failed to resample audio from %s to %s: %s",
            input_format,
            output_format,
            err,
        )
        return b""
1325
1326
def get_chunksize(
    fmt: AudioFormat,
    seconds: float = 1,
) -> int:
    """
    Get a default chunk/file size for given contenttype in bytes.

    :param fmt: The audio format to size chunks for.
    :param seconds: Amount of audio (in seconds) a chunk should hold.
    :return: Estimated number of bytes for ``seconds`` of audio in ``fmt``.
    """
    # size of the uncompressed audio for the requested duration
    pcm_size = int(fmt.sample_rate * (fmt.bit_depth / 8) * fmt.channels * seconds)
    # raw PCM and uncompressed container formats are sized as plain PCM
    if fmt.content_type.is_pcm() or fmt.content_type in (
        ContentType.WAV,
        ContentType.AIFF,
        ContentType.DSF,
    ):
        return pcm_size
    # an explicit bit rate is treated as kbps here (values >= 10000 are ignored)
    if fmt.bit_rate and fmt.bit_rate < 10000:
        return int(((fmt.bit_rate * 1000) / 8) * seconds)
    if fmt.content_type in (ContentType.FLAC, ContentType.WAVPACK, ContentType.ALAC):
        # assume 74.7% compression ratio (level 0)
        # source: https://z-issue.com/wp/flac-compression-level-comparison/
        return int(pcm_size * 0.747)
    if fmt.content_type in (ContentType.MP3, ContentType.OGG):
        return int((320000 / 8) * seconds)
    if fmt.content_type in (ContentType.AAC, ContentType.M4A):
        return int((256000 / 8) * seconds)
    # unknown/other lossy formats: assume 320 kbps
    return int((320000 / 8) * seconds)
1348
1349
def is_grouping_preventing_dsp(player: Player) -> bool:
    """Check if grouping is preventing DSP from being applied to this leader/PlayerGroup.

    DSP is blocked when the player lacks MULTI_DEVICE_DSP support while its audio
    reaches more than one device:
    - a Music Assistant player group with more than one member,
    - an external group player (PlayerType.GROUP), or
    - any other player with at least one group member.

    If this returns True, no DSP should be applied to the player.
    This function will not check if the Player is in a group, the caller should do that first.
    """
    # We require the caller to handle non-leader cases themselves since player.state.synced_to
    # can be unreliable in some edge cases
    state = player.state
    if PlayerFeature.MULTI_DEVICE_DSP in state.supported_features:
        return False

    members = state.group_members
    member_count = len(members) if members else 0

    if player.provider.domain == "player_group":
        # PlayerGroups have no leader: a single member means a single device.
        return member_count > 1
    if state.type == PlayerType.GROUP:
        # A group player external to Music Assistant.
        return True
    return member_count > 0
1372
1373
def is_output_limiter_enabled(mass: MusicAssistant, player: Player) -> bool:
    """Check if the player has the output limiter enabled.

    Unlike DSP, the limiter stays configurable when synchronized without
    MULTI_DEVICE_DSP. The setting is therefore read from the "deciding" player:
    the active (sync) group when there is one, otherwise the sync leader,
    otherwise the player itself.
    """
    state = player.state
    # first truthy candidate wins: sync group, then leader, then the player itself
    deciding_player_id = state.active_group or state.synced_to or player.player_id
    raw_value = mass.config.get_raw_player_config_value(
        deciding_player_id,
        CONF_ENTRY_OUTPUT_LIMITER.key,
        CONF_ENTRY_OUTPUT_LIMITER.default_value,
    )
    return bool(raw_value)
1394
1395
def get_player_filter_params(
    mass: MusicAssistant,
    player_id: str,
    input_format: AudioFormat,
    output_format: AudioFormat,
) -> list[str]:
    """Get player specific filter parameters for ffmpeg (if any).

    Builds the ffmpeg filter chain for a player, in this order:
    1. DSP: input gain, each enabled filter, output gain.
    2. Optional left/right-only channel selection.
    3. A safety limiter, when enabled.

    As a side effect, ``output_format`` is stored in
    ``player.extra_data["output_format"]`` so it can be shown in the UI.

    :param mass: MusicAssistant instance.
    :param player_id: Id of the player the audio is destined for.
    :param input_format: Format of the audio fed into the filter chain.
    :param output_format: Format of the audio sent to the player.
    :return: List of ffmpeg filter strings (possibly empty).
    """
    filter_params: list[str] = []

    player = mass.players.get_player(player_id)
    # In case this is a protocol player, their DSP config is stored
    # under the parent (native or universal) player that wraps them.
    dsp_player_id = player_id
    if player and player.protocol_parent_id:
        dsp_player_id = player.protocol_parent_id
    dsp = mass.config.get_player_dsp_config(dsp_player_id)
    # Track whether DSP applies in a local flag instead of mutating the config
    # object returned by mass.config, so disabling DSP for this stream can never
    # leak into a (possibly shared) config instance.
    dsp_enabled = dsp.enabled
    limiter_enabled = True

    if player:
        if is_grouping_preventing_dsp(player):
            # We can not correctly apply DSP to a grouped player without multi-device DSP support,
            # so we disable it.
            dsp_enabled = False
        elif player.provider.domain == "player_group" and (
            PlayerFeature.MULTI_DEVICE_DSP not in player.state.supported_features
        ):
            # This is a special case! We have a player group where:
            # - The group leader does not support MULTI_DEVICE_DSP
            # - But only contains a single player (since nothing is preventing DSP)
            # We can still apply the DSP of that single player.
            group_members = player.state.group_members
            child_player = mass.players.get_player(group_members[0]) if group_members else None
            if child_player is not None:
                dsp = mass.config.get_player_dsp_config(child_player.player_id)
                dsp_enabled = dsp.enabled
            else:
                # No (resolvable) member: should normally never happen, disable DSP.
                dsp_enabled = False

        # We here implicitly know what output format is used for the player
        # in the audio processing steps. We save this information to
        # later be able to show this to the user in the UI.
        player.extra_data["output_format"] = output_format

        limiter_enabled = is_output_limiter_enabled(mass, player)

    if dsp_enabled:
        # Apply input gain
        if dsp.input_gain != 0:
            filter_params.append(f"volume={dsp.input_gain}dB")

        # Process each enabled DSP filter sequentially
        for dsp_filter in dsp.filters:
            if dsp_filter.enabled:
                filter_params.extend(filter_to_ffmpeg_params(dsp_filter, input_format))

        # Apply output gain
        if dsp.output_gain != 0:
            filter_params.append(f"volume={dsp.output_gain}dB")

    conf_channels = mass.config.get_raw_player_config_value(
        player_id, CONF_OUTPUT_CHANNELS, "stereo"
    )

    # handle output mixing only left or right
    if conf_channels == "left":
        filter_params.append("pan=mono|c0=FL")
    elif conf_channels == "right":
        filter_params.append("pan=mono|c0=FR")

    # Add safety limiter at the end
    if limiter_enabled:
        filter_params.append("alimiter=limit=-2dB:level=false:asc=true")

    LOGGER.debug("Generated ffmpeg params for player %s: %s", player_id, filter_params)
    return filter_params
1474
1475
def parse_loudnorm(raw_stderr: bytes | str) -> float | None:
    """Parse Loudness measurement from ffmpeg stderr output.

    Looks for the JSON summary printed by ffmpeg's ``loudnorm`` filter and
    returns its ``input_i`` value as a float.

    :param raw_stderr: ffmpeg stderr output, as bytes or str.
    :return: The measured ``input_i`` value, or None if it could not be found.
    """
    # ffmpeg output is not guaranteed to be valid UTF-8
    stderr_text = (
        raw_stderr.decode(errors="replace") if isinstance(raw_stderr, bytes) else raw_stderr
    )
    if "[Parsed_loudnorm_0 @" not in stderr_text:
        return None
    for json_chunk in stderr_text.split(" { "):
        # rebuild a JSON object from the text up to the first closing brace
        candidate = "{" + json_chunk.rsplit("}")[0].strip() + "}"
        try:
            loudness_data = json_loads(candidate)
            return float(loudness_data["input_i"])
        except (*JSON_DECODE_EXCEPTIONS, KeyError, ValueError, IndexError, TypeError):
            continue
    return None
1489
1490
async def analyze_loudness(
    mass: MusicAssistant,
    streamdetails: StreamDetails,
) -> None:
    """Analyze media item's audio, to calculate EBU R128 loudness.

    Does nothing when a loudness value is already stored for the item.
    Otherwise it runs ffmpeg's ``ebur128`` filter over at most the first
    10 minutes of audio. The resulting integrated loudness is stored via
    ``mass.music.set_loudness``. If the value cannot be parsed from ffmpeg's
    log, a warning is logged instead.

    :param mass: MusicAssistant instance.
    :param streamdetails: StreamDetails describing the audio to analyze.
    """
    if await mass.music.get_loudness(
        streamdetails.item_id,
        streamdetails.provider,
        media_type=streamdetails.media_type,
    ):
        # only when needed we do the analyze job
        return

    logger = LOGGER.getChild("analyze_loudness")
    logger.debug("Start analyzing audio for %s", streamdetails.uri)

    extra_input_args = [
        # limit to 10 minutes to avoid reading too much in memory
        "-t",
        "600",
    ]
    # work out audio source for these streamdetails
    stream_type = streamdetails.stream_type
    audio_source: str | AsyncGenerator[bytes, None]
    if stream_type == StreamType.CUSTOM:
        music_prov = mass.get_provider(streamdetails.provider)
        if TYPE_CHECKING:  # avoid circular import
            assert isinstance(music_prov, MusicProvider)
        audio_source = music_prov.get_audio_stream(streamdetails)
    elif stream_type == StreamType.ICY:
        assert isinstance(streamdetails.path, str)  # for type checking
        audio_source = get_icy_radio_stream(mass, streamdetails.path, streamdetails)
    elif stream_type == StreamType.HLS:
        assert isinstance(streamdetails.path, str)  # for type checking
        substream = await get_hls_substream(mass, streamdetails.path)
        audio_source = substream.path
    else:
        # all other stream types (HTTP, FILE, etc)
        if stream_type == StreamType.ENCRYPTED_HTTP:
            assert streamdetails.decryption_key is not None  # for type checking
            extra_input_args += ["-decryption_key", streamdetails.decryption_key]
        if isinstance(streamdetails.path, list):
            parts = streamdetails.path
            if not parts:
                logger.warning("No audio parts available to analyze for %s", streamdetails.uri)
                return
            # multi part stream - just use a single file for the measurement.
            # The second part is preferred when there is one (NOTE(review):
            # presumably to skip a short intro part - confirm); a single-part
            # list uses its only part instead of raising IndexError.
            audio_source = parts[1].path if len(parts) > 1 else parts[0].path
        else:
            # regular single file/url stream
            assert isinstance(streamdetails.path, str)  # for type checking
            audio_source = streamdetails.path

    # calculate BS.1770 R128 integrated loudness with ffmpeg
    async with FFMpeg(
        audio_input=audio_source,
        input_format=streamdetails.audio_format,
        output_format=streamdetails.audio_format,
        audio_output="NULL",
        filter_params=["ebur128=framelog=verbose"],
        extra_input_args=extra_input_args,
        collect_log_history=True,
        loglevel="info",
    ) as ffmpeg_proc:
        await ffmpeg_proc.wait()
        log_lines = ffmpeg_proc.log_history
        log_lines_str = "\n".join(log_lines)
        try:
            # ebur128 prints a summary like "Integrated loudness: ... I: -14.2 LUFS"
            loudness_str = (
                log_lines_str.split("Integrated loudness")[1].split("I:")[1].split("LUFS")[0]
            )
            loudness = float(loudness_str.strip())
        except (IndexError, ValueError, AttributeError):
            logger.warning(
                "Could not determine integrated loudness of %s - %s",
                streamdetails.uri,
                log_lines_str or "received empty value",
            )
        else:
            await mass.music.set_loudness(
                streamdetails.item_id,
                streamdetails.provider,
                loudness,
                media_type=streamdetails.media_type,
            )
            logger.debug(
                "Integrated loudness of %s is: %s",
                streamdetails.uri,
                loudness,
            )
1577
1578
def _get_normalization_mode(
    core_config: CoreConfig, player_config: PlayerConfig, streamdetails: StreamDetails
) -> VolumeNormalizationMode:
    """Work out which volume normalization mode to apply to a stream.

    Normalization is disabled when the player has it turned off or when the
    stream has no target loudness. Otherwise the core-config preference (radio
    or tracks, depending on the media type) is resolved against whether a
    loudness measurement is available:

    - no measurement: FALLBACK_DYNAMIC -> DYNAMIC, FALLBACK_FIXED_GAIN ->
      FIXED_GAIN, MEASUREMENT_ONLY -> DISABLED; anything else is returned as-is.
    - measurement available: DISABLED, FIXED_GAIN and DYNAMIC are returned
      as-is; any other preference becomes MEASUREMENT_ONLY.
    """
    if (
        not player_config.get_value(CONF_VOLUME_NORMALIZATION)
        or streamdetails.target_loudness is None
    ):
        # disabled for this player, or no target loudness to normalize towards
        return VolumeNormalizationMode.DISABLED

    preference_key = (
        CONF_VOLUME_NORMALIZATION_RADIO
        if streamdetails.media_type == MediaType.RADIO
        else CONF_VOLUME_NORMALIZATION_TRACKS
    )
    preference = VolumeNormalizationMode(str(core_config.get_value(preference_key)))

    if streamdetails.loudness is None:
        # no measurement: resolve the fallback preferences
        without_measurement = {
            VolumeNormalizationMode.FALLBACK_DYNAMIC: VolumeNormalizationMode.DYNAMIC,
            VolumeNormalizationMode.MEASUREMENT_ONLY: VolumeNormalizationMode.DISABLED,
            VolumeNormalizationMode.FALLBACK_FIXED_GAIN: VolumeNormalizationMode.FIXED_GAIN,
        }
        return without_measurement.get(preference, preference)

    # measurement available: only the explicit non-measurement modes override it
    if preference in (
        VolumeNormalizationMode.DISABLED,
        VolumeNormalizationMode.FIXED_GAIN,
        VolumeNormalizationMode.DYNAMIC,
    ):
        return preference
    return VolumeNormalizationMode.MEASUREMENT_ONLY
1621