/
/
/
1"""Various helpers for audio streaming and manipulation."""
2
3from __future__ import annotations
4
5import asyncio
6import logging
7import os
8import re
9import struct
10import time
11from collections.abc import AsyncGenerator
12from functools import partial
13from io import BytesIO
14from typing import TYPE_CHECKING, Final, cast
15
16import aiofiles
17import shortuuid
18from aiohttp import ClientTimeout
19from music_assistant_models.dsp import DSPConfig, DSPDetails, DSPState
20from music_assistant_models.enums import (
21 ContentType,
22 MediaType,
23 PlayerFeature,
24 PlayerType,
25 StreamType,
26 VolumeNormalizationMode,
27)
28from music_assistant_models.errors import (
29 AudioError,
30 InvalidDataError,
31 MediaNotFoundError,
32 MusicAssistantError,
33 ProviderUnavailableError,
34)
35from music_assistant_models.media_items import AudioFormat
36from music_assistant_models.streamdetails import MultiPartPath
37
38from music_assistant.constants import (
39 CONF_ENTRY_OUTPUT_LIMITER,
40 CONF_OUTPUT_CHANNELS,
41 CONF_VOLUME_NORMALIZATION,
42 CONF_VOLUME_NORMALIZATION_RADIO,
43 CONF_VOLUME_NORMALIZATION_TARGET,
44 CONF_VOLUME_NORMALIZATION_TRACKS,
45 MASS_LOGGER_NAME,
46 VERBOSE_LOG_LEVEL,
47)
48from music_assistant.controllers.players.sync_groups import SyncGroupPlayer
49from music_assistant.helpers.json import JSON_DECODE_EXCEPTIONS, json_loads
50from music_assistant.helpers.throttle_retry import BYPASS_THROTTLER
51from music_assistant.helpers.util import clean_stream_title, remove_file
52
53from .audio_buffer import AudioBuffer
54from .dsp import filter_to_ffmpeg_params
55from .ffmpeg import FFMpeg, get_ffmpeg_args, get_ffmpeg_stream
56from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
57from .process import AsyncProcess, communicate
58from .util import detect_charset
59
60if TYPE_CHECKING:
61 from music_assistant_models.config_entries import CoreConfig, PlayerConfig
62 from music_assistant_models.queue_item import QueueItem
63 from music_assistant_models.streamdetails import StreamDetails
64
65 from music_assistant.mass import MusicAssistant
66 from music_assistant.models.music_provider import MusicProvider
67 from music_assistant.models.player import Player
68
LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.audio")

# ruff: noqa: PLR0915

# Default headers for outgoing stream requests (Lavf/ffmpeg-style User-Agent).
HTTP_HEADERS = {"User-Agent": "Lavf/60.16.100.MusicAssistant"}
# Same headers, additionally asking the server to interleave ICY metadata.
HTTP_HEADERS_ICY = HTTP_HEADERS | {"Icy-MetaData": "1"}

SLOW_PROVIDERS = ("tidal", "ytmusic", "apple_music")

# Cache settings used for resolved radio stream urls (see resolve_radio_stream).
CACHE_CATEGORY_RESOLVED_RADIO_URL: Final[int] = 100
CACHE_PROVIDER: Final[str] = "audio"
80
81
def align_audio_to_frame_boundary(audio_data: bytes, pcm_format: AudioFormat) -> bytes:
    """Trim trailing bytes that do not form a complete PCM frame.

    A frame holds one sample per channel, so its size in bytes is
    ``(bit_depth // 8) * channels``.

    :param audio_data: Raw PCM audio data to align.
    :param pcm_format: AudioFormat of the audio data.
    :return: ``audio_data`` itself when already aligned, otherwise the data
        truncated to the last complete frame.
    """
    frame_size = (pcm_format.bit_depth // 8) * pcm_format.channels
    excess = len(audio_data) % frame_size
    if not excess:
        return audio_data
    LOGGER.debug(
        "Truncating %d bytes from audio buffer to align to frame boundary",
        excess,
    )
    return audio_data[: len(audio_data) - excess]
98
99
async def strip_silence(
    mass: MusicAssistant,  # noqa: ARG001
    audio_data: bytes,
    pcm_format: AudioFormat,
    reverse: bool = False,
) -> bytes:
    """Strip silence from begin or end of pcm audio using ffmpeg.

    The audio is piped through ffmpeg with an ``atrim=start=0.2`` filter followed by
    a ``silenceremove`` filter. With ``reverse`` the audio is reversed before and
    after these filters, so they act on the end of the audio instead of the start.

    :param mass: MusicAssistant instance (unused).
    :param audio_data: Raw PCM audio data.
    :param pcm_format: AudioFormat of the audio data.
    :param reverse: Strip from the end instead of the beginning.
    :return: The stripped PCM audio, or ``audio_data`` unchanged when ffmpeg
        exits with an error code.
    """
    silence_filter = (
        "atrim=start=0.2,silenceremove=start_periods=1:start_silence=0.1:start_threshold=0.02"
    )
    args = ["ffmpeg", "-hide_banner", "-loglevel", "quiet"]
    # input args: raw pcm on stdin
    args += [
        "-acodec",
        pcm_format.content_type.name.lower(),
        "-f",
        pcm_format.content_type.value,
        "-ac",
        str(pcm_format.channels),
        "-ar",
        str(pcm_format.sample_rate),
        "-i",
        "-",
    ]
    # filter args
    args += ["-af", f"areverse,{silence_filter},areverse" if reverse else silence_filter]
    # output args: raw pcm on stdout
    args += ["-f", pcm_format.content_type.value, "-"]
    returncode, stripped_data, _stderr = await communicate(args, audio_data)

    if returncode not in (0, None):
        # ffmpeg failed: its (empty/partial) output must not replace the audio
        LOGGER.warning(
            "Unable to strip silence from pcm audio, ffmpeg exited with code %s",
            returncode,
        )
        return audio_data

    # return stripped audio
    bytes_stripped = len(audio_data) - len(stripped_data)
    if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL):
        seconds_stripped = round(bytes_stripped / pcm_format.pcm_sample_size, 2)
        location = "end" if reverse else "begin"
        LOGGER.log(
            VERBOSE_LOG_LEVEL,
            "stripped %s seconds of silence from %s of pcm audio. bytes stripped: %s",
            seconds_stripped,
            location,
            bytes_stripped,
        )
    return stripped_data
148
149
def get_player_dsp_details(
    mass: MusicAssistant, player: Player, group_preventing_dsp: bool = False
) -> DSPDetails:
    """Build the DSPDetails of a single player.

    Group membership is only checked through is_grouping_preventing_dsp(player).
    When the player is part of a group, the caller must pass the result of
    is_grouping_preventing_dsp for the leader/PlayerGroup as group_preventing_dsp.
    """
    dsp_config = mass.config.get_player_dsp_config(player.player_id)
    if not dsp_config.enabled:
        # DSP is switched off by the user: drop all filters
        dsp_state = DSPState.DISABLED
        dsp_config = DSPConfig(enabled=False)
    elif group_preventing_dsp or is_grouping_preventing_dsp(player):
        # DSP is wanted, but the (group) playback does not support it
        dsp_state = DSPState.DISABLED_BY_UNSUPPORTED_GROUP
        dsp_config = DSPConfig(enabled=False)
    else:
        dsp_state = DSPState.ENABLED

    # only keep the filters that are switched on
    dsp_config.filters = [flt for flt in dsp_config.filters if flt.enabled]

    return DSPDetails(
        state=dsp_state,
        input_gain=dsp_config.input_gain,
        filters=dsp_config.filters,
        output_gain=dsp_config.output_gain,
        output_limiter=is_output_limiter_enabled(mass, player),
        output_format=player.extra_data.get("output_format", None),
    )
182
183
def get_stream_dsp_details(
    mass: MusicAssistant,
    queue_id: str,
) -> dict[str, DSPDetails]:
    """Collect the DSP details of every player playing the given queue.

    :param mass: MusicAssistant instance.
    :param queue_id: Id of the queue; also used to look up the playing player.
    :return: Mapping of player_id to its DSPDetails.
    """
    player = mass.players.get(queue_id)
    details_by_player: dict[str, DSPDetails] = {}
    assert player is not None  # for type checking
    group_preventing_dsp = is_grouping_preventing_dsp(player)
    leader_output_format = None
    is_external_group = False

    is_sync_group = player.type == PlayerType.GROUP and isinstance(player, SyncGroupPlayer)
    if not is_sync_group:
        # Only real players get an entry; PlayerGroups merely sync their members.
        details_by_player[player.player_id] = get_player_dsp_details(mass, player)
        if group_preventing_dsp:
            # The leader sends the (combined) audio stream, so its output format applies.
            leader_output_format = player.extra_data.get("output_format", None)
        is_external_group = player.type in (PlayerType.GROUP, PlayerType.STEREO_PAIR)
    elif group_preventing_dsp and (sync_leader := player.sync_leader):
        leader_output_format = sync_leader.extra_data.get("output_format", None)

    # Members of externally created groups (e.g. a Chromecast group from the
    # Google Home app) are not enumerated.
    if not player or not player.group_members or is_external_group:
        return details_by_player

    for child_id in player.group_members:
        if child_id in details_by_player:
            # already handled (the group leader)
            continue
        child_player = mass.players.get(child_id)
        if not child_player:
            continue
        child_details = get_player_dsp_details(
            mass, child_player, group_preventing_dsp=group_preventing_dsp
        )
        if group_preventing_dsp:
            # this group can't do per-device DSP, so use the leader's output format
            child_details.output_format = leader_output_format
        details_by_player[child_id] = child_details
    return details_by_player
228
229
async def get_stream_details(
    mass: MusicAssistant,
    queue_item: QueueItem,
    seek_position: int = 0,
    fade_in: bool = False,
    prefer_album_loudness: bool = False,
) -> StreamDetails:
    """
    Get streamdetails for the given QueueItem.

    This is called just-in-time when a PlayerQueue wants a MediaItem to be played.
    Do not try to request streamdetails too much in advance as this is expiring data.

    Streamdetails already attached to the queue item are reused while unexpired (or
    while their audio buffer is still valid). Otherwise they are requested from the
    item's available provider mappings (highest quality first): first only from the
    preferred providers, and only if that fails from any other provider.

    :param mass: MusicAssistant instance.
    :param queue_item: The QueueItem to get the streamdetails for.
    :param seek_position: Start position in seconds (reset to 0 for duration-less streams).
    :param fade_in: Stored on the streamdetails as fade_in.
    :param prefer_album_loudness: Stored on the streamdetails as prefer_album_loudness.
    :raises MediaNotFoundError: If no streamdetails could be retrieved.
    """
    streamdetails: StreamDetails | None = None
    time_start = time.time()
    LOGGER.debug("Getting streamdetails for %s", queue_item.uri)
    if seek_position and (queue_item.media_type == MediaType.RADIO or not queue_item.duration):
        LOGGER.warning("seeking is not possible on duration-less streams!")
        seek_position = 0

    if not queue_item.media_item and not queue_item.streamdetails:
        # in case of a non-media item queue item, the streamdetails should already be provided
        # this should not happen, but guard it just in case
        raise MediaNotFoundError(
            f"Unable to retrieve streamdetails for {queue_item.name} ({queue_item.uri})"
        )
    buffer: AudioBuffer | None = None
    # NOTE(review): get_buffered_media_stream calls is_valid(checksum, seek_position);
    # here seek_position is passed as the only positional argument - confirm against
    # the AudioBuffer.is_valid signature that it is not taken as the checksum.
    if queue_item.streamdetails and (
        (queue_item.streamdetails.created_at + queue_item.streamdetails.expiration) > time.time()
        or ((buffer := queue_item.streamdetails.buffer) and buffer.is_valid(seek_position))
    ):
        # already got a fresh/unused (or unexpired) streamdetails
        streamdetails = queue_item.streamdetails
    else:
        # need to (re)create streamdetails
        # retrieve streamdetails from provider
        media_item = queue_item.media_item
        assert media_item is not None  # for type checking
        preferred_providers: list[str] = []
        if (
            (queue := mass.player_queues.get(queue_item.queue_id))
            and queue.userid
            and (playback_user := await mass.webserver.auth.get_user(queue.userid))
            and playback_user.provider_filter
        ):
            # handle steering into user preferred providerinstance
            preferred_providers = playback_user.provider_filter
        else:
            preferred_providers = [x.provider_instance for x in media_item.provider_mappings]
        # first pass: preferred providers only, second pass: any provider
        for allow_other_provider in (False, True):
            # sort by quality and check item's availability
            for prov_media in sorted(
                media_item.provider_mappings, key=lambda x: x.quality or 0, reverse=True
            ):
                if not prov_media.available:
                    LOGGER.debug("Skipping unavailable %s", prov_media)
                    continue
                if (
                    not allow_other_provider
                    and prov_media.provider_instance not in preferred_providers
                ):
                    continue
                # guard that provider is available
                music_prov = mass.get_provider(prov_media.provider_instance)
                if TYPE_CHECKING:  # avoid circular import
                    assert isinstance(music_prov, MusicProvider)
                if not music_prov:
                    LOGGER.debug("Skipping %s - provider not available", prov_media)
                    continue  # provider not available ?
                # get streamdetails from provider
                try:
                    BYPASS_THROTTLER.set(True)
                    streamdetails = await music_prov.get_stream_details(
                        prov_media.item_id, media_item.media_type
                    )
                except MusicAssistantError as err:
                    LOGGER.warning(str(err))
                else:
                    break
                finally:
                    BYPASS_THROTTLER.set(False)
            if streamdetails:
                # found streamdetails: skip the second pass, which would otherwise
                # fetch again and overwrite the preferred provider's result
                break

        if not streamdetails:
            msg = f"Unable to retrieve streamdetails for {queue_item.name} ({queue_item.uri})"
            raise MediaNotFoundError(msg)

        # work out how to handle radio stream
        if (
            streamdetails.stream_type in (StreamType.ICY, StreamType.HLS, StreamType.HTTP)
            and streamdetails.media_type == MediaType.RADIO
            and isinstance(streamdetails.path, str)
        ):
            resolved_url, stream_type = await resolve_radio_stream(mass, streamdetails.path)
            streamdetails.path = resolved_url
            streamdetails.stream_type = stream_type
            # Set up metadata monitoring callback for HLS radio streams
            if stream_type == StreamType.HLS:
                streamdetails.stream_metadata_update_callback = partial(
                    _update_hls_radio_metadata, mass
                )
                streamdetails.stream_metadata_update_interval = 5
        # handle volume normalization details
        if result := await mass.music.get_loudness(
            streamdetails.item_id,
            streamdetails.provider,
            media_type=queue_item.media_type,
        ):
            streamdetails.loudness = result[0]
            streamdetails.loudness_album = result[1]

    # set queue_id on the streamdetails so we know what is being streamed
    streamdetails.queue_id = queue_item.queue_id
    # handle skip/fade_in details
    streamdetails.seek_position = seek_position
    streamdetails.fade_in = fade_in
    if not streamdetails.duration:
        streamdetails.duration = queue_item.duration
    streamdetails.prefer_album_loudness = prefer_album_loudness
    player_settings = await mass.config.get_player_config(streamdetails.queue_id)
    core_config = await mass.config.get_core_config("streams")
    conf_volume_normalization_target = float(
        str(player_settings.get_value(CONF_VOLUME_NORMALIZATION_TARGET, -17))
    )
    if conf_volume_normalization_target < -30 or conf_volume_normalization_target >= 0:
        conf_volume_normalization_target = -17.0  # reset to default if out of bounds
        LOGGER.warning(
            "Invalid volume normalization target configured for player %s, "
            "resetting to default of -17.0 dB",
            streamdetails.queue_id,
        )
    streamdetails.target_loudness = conf_volume_normalization_target
    streamdetails.volume_normalization_mode = _get_normalization_mode(
        core_config, player_settings, streamdetails
    )

    # attach the DSP details of all group members
    streamdetails.dsp = get_stream_dsp_details(mass, streamdetails.queue_id)

    LOGGER.debug(
        "retrieved streamdetails for %s in %s milliseconds",
        queue_item.uri,
        int((time.time() - time_start) * 1000),
    )
    return streamdetails
375
376
async def get_buffered_media_stream(
    mass: MusicAssistant,
    streamdetails: StreamDetails,
    pcm_format: AudioFormat,
    seek_position: int = 0,
    filter_params: list[str] | None = None,
) -> AsyncGenerator[bytes, None]:
    """Get audio stream for given media details as raw PCM with buffering.

    A background task fills an AudioBuffer (stored on ``streamdetails.buffer``)
    from get_media_stream; the buffer is reused on a later call when it is still
    valid for the same filter params and seek position. Without a usable buffer,
    seeking beyond 60 seconds streams directly (unbuffered).

    :param mass: MusicAssistant instance.
    :param streamdetails: The streamdetails of the media to stream.
    :param pcm_format: The (PCM) output format to produce.
    :param seek_position: Position in seconds to start from.
    :param filter_params: Optional ffmpeg audio filter params.
    """
    LOGGER.log(
        VERBOSE_LOG_LEVEL,
        "buffered_media_stream: Starting for %s (seek: %s)",
        streamdetails.uri,
        seek_position,
    )

    # checksum based on filter_params (a buffer is only reusable for the same filters)
    checksum = f"{filter_params}"

    async def fill_buffer_task() -> None:
        """Background task to fill the audio buffer."""
        chunk_count = 0
        status = "running"
        try:
            async for chunk in get_media_stream(
                mass, streamdetails, pcm_format, seek_position=0, filter_params=filter_params
            ):
                chunk_count += 1
                await audio_buffer.put(chunk)
                # Yield to event loop to prevent blocking warnings
                await asyncio.sleep(0)
            # Only set EOF if we completed successfully
            await audio_buffer.set_eof()
        except asyncio.CancelledError:
            status = "cancelled"
            raise
        except Exception:
            status = "aborted with error"
            raise
        finally:
            LOGGER.log(
                VERBOSE_LOG_LEVEL,
                "fill_buffer_task: %s (%s chunks) for %s",
                status,
                chunk_count,
                streamdetails.uri,
            )

    # check for existing buffer and reuse if possible
    existing_buffer: AudioBuffer | None = streamdetails.buffer
    if existing_buffer is not None:
        if not existing_buffer.is_valid(checksum, seek_position):
            LOGGER.log(
                VERBOSE_LOG_LEVEL,
                "buffered_media_stream: Existing buffer invalid for %s (seek: %s, discarded: %s)",
                streamdetails.uri,
                seek_position,
                existing_buffer._discarded_chunks,
            )
            await existing_buffer.clear()
            streamdetails.buffer = None
            existing_buffer = None
        else:
            LOGGER.debug(
                "buffered_media_stream: Reusing existing buffer for %s - "
                "available: %ss, seek: %s, discarded: %s",
                streamdetails.uri,
                existing_buffer.seconds_available,
                seek_position,
                existing_buffer._discarded_chunks,
            )

    if existing_buffer is None and seek_position > 60:
        # If seeking into the track and no valid buffer exists,
        # just start a normal stream without buffering,
        # otherwise we would need to fill the buffer up to the seek position first
        # which is not efficient.
        LOGGER.debug(
            "buffered_media_stream: No existing buffer and seek >60s for %s, "
            "starting normal (unbuffered) stream",
            streamdetails.uri,
        )
        async for chunk in get_media_stream(
            mass,
            streamdetails,
            pcm_format,
            seek_position=seek_position,
            filter_params=filter_params,
        ):
            yield chunk
        return

    if existing_buffer is None:
        # create new audio buffer and start fill task
        LOGGER.debug(
            "buffered_media_stream: Creating new buffer for %s",
            streamdetails.uri,
        )
        audio_buffer = AudioBuffer(pcm_format, checksum)
        streamdetails.buffer = audio_buffer
        task = mass.loop.create_task(fill_buffer_task())
        audio_buffer.attach_producer_task(task)
    else:
        audio_buffer = existing_buffer

    # special case: pcm format mismatch, resample on the fly
    # this may happen in some special situations such as crossfading
    # and its a bit of a waste to throw away the existing buffer
    if audio_buffer.pcm_format != pcm_format:
        LOGGER.info(
            "buffered_media_stream: pcm format mismatch, resampling on the fly for %s - "
            "buffer format: %s - requested format: %s",
            streamdetails.uri,
            audio_buffer.pcm_format,
            pcm_format,
        )
        async for chunk in get_ffmpeg_stream(
            audio_input=audio_buffer.iter(seek_position=seek_position),
            input_format=audio_buffer.pcm_format,
            output_format=pcm_format,
        ):
            yield chunk
        return

    # yield data from the buffer
    chunk_count = 0
    try:
        async for chunk in audio_buffer.iter(seek_position=seek_position):
            chunk_count += 1
            yield chunk
    finally:
        LOGGER.log(
            VERBOSE_LOG_LEVEL,
            "buffered_media_stream: Completed, yielded %s chunks",
            chunk_count,
        )
511
512
async def get_media_stream(
    mass: MusicAssistant,
    streamdetails: StreamDetails,
    pcm_format: AudioFormat,
    seek_position: int = 0,
    filter_params: list[str] | None = None,
) -> AsyncGenerator[bytes, None]:
    """Get audio stream for given media details as raw PCM.

    The source (provider stream, ICY/HLS radio, multi-part or single file/url) is
    decoded by ffmpeg into ``pcm_format``, applying the optional filter params.

    :param mass: MusicAssistant instance.
    :param streamdetails: The streamdetails of the media to stream.
    :param pcm_format: The (PCM) output format to produce.
    :param seek_position: Position in seconds to start playback from.
    :param filter_params: Optional ffmpeg audio filter params.
    :raises AudioError: If ffmpeg fails or no audio was received.
    """
    logger = LOGGER.getChild("media_stream")
    logger.log(VERBOSE_LOG_LEVEL, "Starting media stream for %s", streamdetails.uri)
    # Work on a copy: the args are extended below (decryption key, HLS looping, seek)
    # and must not accumulate on the (reused) streamdetails across calls.
    extra_input_args = list(streamdetails.extra_input_args or [])

    # work out audio source for these streamdetails
    audio_source: str | AsyncGenerator[bytes, None]
    stream_type = streamdetails.stream_type
    if stream_type == StreamType.CUSTOM:
        music_prov = mass.get_provider(streamdetails.provider)
        if TYPE_CHECKING:  # avoid circular import
            assert isinstance(music_prov, MusicProvider)
        audio_source = music_prov.get_audio_stream(
            streamdetails,
            seek_position=seek_position if streamdetails.can_seek else 0,
        )
        # if the provider handled the seek, ffmpeg must not seek again
        seek_position = 0 if streamdetails.can_seek else seek_position
    elif stream_type == StreamType.ICY:
        assert isinstance(streamdetails.path, str)  # for type checking
        audio_source = get_icy_radio_stream(mass, streamdetails.path, streamdetails)
        seek_position = 0  # seeking not possible on radio streams
    elif stream_type == StreamType.HLS:
        assert isinstance(streamdetails.path, str)  # for type checking
        substream = await get_hls_substream(mass, streamdetails.path)
        audio_source = substream.path
        if streamdetails.media_type == MediaType.RADIO:
            # HLS streams (especially the BBC) struggle when they're played directly
            # with ffmpeg, where they just stop after some minutes,
            # so we tell ffmpeg to loop around in this case.
            extra_input_args += ["-stream_loop", "-1", "-re"]
    else:
        # all other stream types (HTTP, FILE, etc)
        if stream_type == StreamType.ENCRYPTED_HTTP:
            assert streamdetails.decryption_key is not None  # for type checking
            extra_input_args += ["-decryption_key", streamdetails.decryption_key]
        if isinstance(streamdetails.path, list):
            # multi part stream
            audio_source = get_multi_file_stream(mass, streamdetails, seek_position)
            seek_position = 0  # handled by get_multi_file_stream
        else:
            # regular single file/url stream
            assert isinstance(streamdetails.path, str)  # for type checking
            audio_source = streamdetails.path

    # handle seek support
    if seek_position and streamdetails.duration and streamdetails.allow_seek:
        extra_input_args += ["-ss", str(int(seek_position))]

    bytes_sent = 0
    finished = False
    cancelled = False
    first_chunk_received = False
    ffmpeg_proc = FFMpeg(
        audio_input=audio_source,
        input_format=streamdetails.audio_format,
        output_format=pcm_format,
        filter_params=filter_params,
        extra_input_args=extra_input_args,
        collect_log_history=True,
        loglevel="debug" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "info",
    )

    try:
        await ffmpeg_proc.start()
        assert ffmpeg_proc.proc is not None  # for type checking
        logger.debug(
            "Started media stream for %s - using streamtype: %s - pcm format: %s - ffmpeg PID: %s",
            streamdetails.uri,
            streamdetails.stream_type,
            pcm_format.content_type.value,
            ffmpeg_proc.proc.pid,
        )
        stream_start = mass.loop.time()

        chunk_size = get_chunksize(pcm_format, 1)
        async for chunk in ffmpeg_proc.iter_chunked(chunk_size):
            if not first_chunk_received:
                # At this point ffmpeg has started and should now know the codec used
                # for encoding the audio.
                first_chunk_received = True
                streamdetails.audio_format.codec_type = ffmpeg_proc.input_format.codec_type
                logger.debug(
                    "First chunk received after %.2f seconds (codec detected: %s)",
                    mass.loop.time() - stream_start,
                    ffmpeg_proc.input_format.codec_type,
                )
            yield chunk
            bytes_sent += len(chunk)

        # end of audio/track reached
        logger.log(VERBOSE_LOG_LEVEL, "End of stream reached.")
        # wait until stderr also completed reading
        await ffmpeg_proc.wait_with_timeout(5)
        if ffmpeg_proc.returncode not in (0, None):
            log_trail = "\n".join(list(ffmpeg_proc.log_history)[-5:])
            raise AudioError(f"FFMpeg exited with code {ffmpeg_proc.returncode}: {log_trail}")
        if bytes_sent == 0:
            # edge case: no audio data was received at all
            raise AudioError("No audio was received")
        finished = True
    except (Exception, GeneratorExit, asyncio.CancelledError) as err:
        if isinstance(err, asyncio.CancelledError | GeneratorExit):
            # we were cancelled, just raise
            cancelled = True
            raise
        # dump the last 10 lines of the log in case of an unclean exit
        logger.warning("\n".join(list(ffmpeg_proc.log_history)[-10:]))
        raise AudioError(f"Error while streaming: {err}") from err
    finally:
        # always ensure close is called which also handles all cleanup
        await ffmpeg_proc.close()
        # determine how many seconds we've received
        # for pcm output we can calculate this easily
        seconds_received = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
        # store accurate duration
        if finished and not seek_position and seconds_received:
            streamdetails.duration = int(seconds_received)

        logger.log(
            VERBOSE_LOG_LEVEL,
            "stream %s (with code %s) for %s",
            "cancelled" if cancelled else "finished" if finished else "aborted",
            ffmpeg_proc.returncode,
            streamdetails.uri,
        )

        # parse loudnorm data if we have that collected (and enabled)
        if (
            (streamdetails.loudness is None or finished)
            and streamdetails.volume_normalization_mode == VolumeNormalizationMode.DYNAMIC
            and (finished or (seconds_received >= 300))
        ):
            # if dynamic volume normalization is enabled
            # the loudnorm filter will output the measurement in the log,
            # so we can use that directly instead of analyzing the audio
            logger.log(VERBOSE_LOG_LEVEL, "Collecting loudness measurement...")
            if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)):
                logger.debug(
                    "Loudness measurement for %s: %s dB",
                    streamdetails.uri,
                    loudness_details,
                )
                mass.create_task(
                    mass.music.set_loudness(
                        streamdetails.item_id,
                        streamdetails.provider,
                        loudness_details,
                        media_type=streamdetails.media_type,
                    )
                )
670
671
def create_wave_header(
    samplerate: int = 44100, channels: int = 2, bitspersample: int = 16, duration: int | None = None
) -> bytes:
    """Generate a 44-byte RIFF/WAVE header for uncompressed PCM audio.

    :param samplerate: Sample rate in Hz (e.g. 44100, 48000).
    :param channels: Number of audio channels.
    :param bitspersample: Bits per sample (e.g. 16, 24, 32).
    :param duration: Duration in seconds used to compute the data chunk size.
        When None, a large fixed size (~6.7 hours at 44100Hz/16bit stereo) is
        used. Sizes that do not fit the 32-bit size fields are clamped to the
        largest whole number of frames that fits.
    :return: RIFF header + fmt chunk + data chunk header.
    """
    # The RIFF size field covers "WAVE"(4) + fmt chunk(24) + data header(8) + data
    # and is an unsigned 32-bit value, which bounds the data size.
    max_datasize = 0xFFFFFFFF - 36
    block_align = int(channels * (bitspersample / 8))
    file = BytesIO()

    # Generate format chunk
    format_chunk_spec = b"<4sLHHLLHH"
    format_chunk = struct.pack(
        format_chunk_spec,
        b"fmt ",  # Chunk id
        16,  # Size of this chunk (excluding chunk id and this field)
        1,  # Audio format, 1 for PCM
        channels,  # Number of channels
        int(samplerate),  # Samplerate, 44100, 48000, etc.
        int(samplerate * channels * (bitspersample / 8)),  # Byterate
        block_align,  # Blockalign
        bitspersample,  # 16 bits for two byte samples, etc.
    )
    # Generate data chunk
    data_chunk_spec = b"<4sL"
    if duration is None:
        # use max value possible
        datasize = 4254768000  # = 6,7 hours at 44100/16
    else:
        # calculate from duration
        datasize = int(samplerate * duration * channels * (bitspersample / 8))
    if datasize > max_datasize:
        # long and/or high-res durations would overflow the 32-bit size fields
        # (struct.error), so clamp to the largest frame-aligned size that fits
        datasize = max_datasize - (max_datasize % block_align) if block_align > 0 else max_datasize
    data_chunk = struct.pack(
        data_chunk_spec,
        b"data",  # Chunk id
        datasize,  # Chunk size (excluding chunk id and this field)
    )
    # Generate main header: size of "WAVE" + fmt chunk + data chunk (header + data)
    all_chunks_size = (
        4 + struct.calcsize(format_chunk_spec) + struct.calcsize(data_chunk_spec) + datasize
    )
    main_header = struct.pack(b"<4sL4s", b"RIFF", all_chunks_size, b"WAVE")
    # Write all the contents in
    file.write(main_header)
    file.write(format_chunk)
    file.write(data_chunk)
    return file.getvalue()
725
726
async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, StreamType]:
    """
    Resolve a streaming radio URL.

    Unwraps any playlists if needed.
    Determines if the stream supports ICY metadata.

    Returns tuple;
    - unfolded URL as string
    - StreamType to determine ICY (radio) or HLS stream.

    Successful results are cached for 3 hours. On error the original URL is
    returned (uncached) together with the stream type detected so far.
    """
    if cache := await mass.cache.get(
        key=url, provider=CACHE_PROVIDER, category=CACHE_CATEGORY_RESOLVED_RADIO_URL
    ):
        if TYPE_CHECKING:  # for type checking
            cache = cast("tuple[str, str]", cache)
        return (cache[0], StreamType(cache[1]))
    stream_type = StreamType.HTTP
    resolved_url = url
    timeout = ClientTimeout(total=None, connect=10, sock_read=5)
    try:
        async with mass.http_session_no_ssl.get(
            url, headers=HTTP_HEADERS_ICY, allow_redirects=True, timeout=timeout
        ) as resp:
            headers = resp.headers
            resp.raise_for_status()
            if not resp.headers:
                raise InvalidDataError("no headers found")
            if headers.get("icy-metaint") is not None:
                stream_type = StreamType.ICY
            # MIME types are case-insensitive ("application/x-mpegURL" is common);
            # "mpegurl" covers audio/x-mpegurl, audio/mpegurl and
            # application/vnd.apple.mpegurl (HLS).
            content_type = headers.get("content-type", "").lower()
            if (
                url.endswith((".m3u", ".m3u8", ".pls"))
                or ".m3u?" in url
                or ".m3u8?" in url
                or ".pls?" in url
                or "mpegurl" in content_type
                or "audio/x-scpls" in content_type
            ):
                # url is playlist, we need to unfold it
                try:
                    substreams = await fetch_playlist(mass, url)
                    if not any(x for x in substreams if x.length):
                        for line in substreams:
                            if not line.is_url:
                                continue
                            # unfold first url of playlist
                            return await resolve_radio_stream(mass, line.path)
                        raise InvalidDataError("No content found in playlist")
                except IsHLSPlaylist:
                    stream_type = StreamType.HLS

    except Exception as err:
        LOGGER.warning("Error while parsing radio URL %s: %s", url, str(err))
        return (url, stream_type)

    result = (resolved_url, stream_type)
    cache_expiration = 3600 * 3
    await mass.cache.set(
        url,
        result,
        expiration=cache_expiration,
        provider=CACHE_PROVIDER,
        category=CACHE_CATEGORY_RESOLVED_RADIO_URL,
    )
    return result
792
793
async def get_icy_radio_stream(
    mass: MusicAssistant, url: str, streamdetails: StreamDetails
) -> AsyncGenerator[bytes, None]:
    """Get (radio) audio stream from HTTP, including ICY metadata retrieval.

    After every ``icy-metaint`` bytes of audio the server sends one length byte
    (metadata length / 16) followed by the metadata block. The audio is yielded
    as-is; a changed ``StreamTitle`` is cleaned and stored on
    ``streamdetails.stream_title``. Without a (valid) ``icy-metaint`` response
    header the audio is passed through without metadata parsing.

    :param mass: MusicAssistant instance.
    :param url: The (resolved) radio stream url.
    :param streamdetails: StreamDetails to update with the current stream title.
    """
    timeout = ClientTimeout(total=None, connect=30, sock_read=5 * 60)
    # Match up to the closing "';" (or the end of the block) rather than the first
    # apostrophe, so titles such as "Don't Stop Me Now" are still picked up.
    stream_title_pattern = re.compile(rb"StreamTitle='(.*?)'(?:;|$)", re.DOTALL)
    LOGGER.debug("Start streaming radio with ICY metadata from url %s", url)
    async with mass.http_session_no_ssl.get(
        url, allow_redirects=True, headers=HTTP_HEADERS_ICY, timeout=timeout
    ) as resp:
        try:
            meta_int = int(resp.headers.get("icy-metaint", 0))
        except ValueError:
            meta_int = 0
        if meta_int <= 0:
            # the ICY stream type may stem from an earlier (cached) resolve;
            # stream the audio as-is instead of failing on the missing header
            LOGGER.debug(
                "No valid icy-metaint header received from %s, streaming without metadata",
                url,
            )
            async for chunk in resp.content.iter_chunked(65536):
                yield chunk
            return
        while True:
            try:
                yield await resp.content.readexactly(meta_int)
                meta_byte = await resp.content.readexactly(1)
                if meta_byte == b"\x00":
                    continue
                meta_length = ord(meta_byte) * 16
                meta_data = await resp.content.readexactly(meta_length)
            except asyncio.exceptions.IncompleteReadError:
                break
            if not meta_data:
                continue
            meta_data = meta_data.rstrip(b"\0")
            stream_title_match = stream_title_pattern.search(meta_data)
            if not stream_title_match:
                continue
            raw_title = stream_title_match.group(1)
            try:
                # in 99% of the cases the stream title is utf-8 encoded
                stream_title = raw_title.decode("utf-8")
            except UnicodeDecodeError:
                # fallback to iso-8859-1
                stream_title = raw_title.decode("iso-8859-1", errors="replace")
            cleaned_stream_title = clean_stream_title(stream_title)
            if cleaned_stream_title != streamdetails.stream_title:
                LOGGER.log(
                    VERBOSE_LOG_LEVEL,
                    "ICY Radio streamtitle original: %s",
                    stream_title,
                )
                LOGGER.log(
                    VERBOSE_LOG_LEVEL,
                    "ICY Radio streamtitle cleaned: %s",
                    cleaned_stream_title,
                )
                streamdetails.stream_title = cleaned_stream_title
840
841
def parse_extinf_metadata(extinf_line: str) -> dict[str, str]:
    """
    Parse metadata from HLS EXTINF line.

    Extracts structured metadata like title="...", artist="..." from EXTINF lines.
    Common in iHeartRadio and other commercial radio HLS streams.

    Keys are lower-cased and may contain hyphens (e.g. tvg-id). Backslash-escaped
    double quotes inside a value are kept in the value and unescaped. When a key
    occurs more than once, the last value wins.

    :param extinf_line: The EXTINF line containing metadata
    :return: Mapping of lower-cased attribute name to its value.
    """
    # key="value" pairs; the value runs until the first unescaped double quote, so
    # nested escaped quotes (e.g. url="song_spot=\"M\" ...") stay inside the value.
    pattern = r'([\w-]+)="((?:[^"\\]|\\.)*)"'
    return {
        key.lower(): value.replace('\\"', '"')
        for key, value in re.findall(pattern, extinf_line)
    }
862
863
async def _update_hls_radio_metadata(
    mass: MusicAssistant,
    streamdetails: StreamDetails,
    elapsed_time: int,  # noqa: ARG001
) -> None:
    """
    Update HLS radio stream metadata by fetching the playlist.

    Fetches the HLS media playlist and extracts metadata from its last EXTINF line,
    storing "Artist - Title" (cleaned) on streamdetails.stream_title when changed.
    Errors are logged and otherwise ignored (best-effort).

    :param mass: MusicAssistant instance
    :param streamdetails: StreamDetails object to update with metadata
    :param elapsed_time: Current playback position in seconds (unused for live radio)
    """
    try:
        # Get the actual media playlist URL from cache or resolve it
        # We cache the media_playlist_url in streamdetails.data to avoid re-resolving
        if streamdetails.data is None:
            streamdetails.data = {}
        media_playlist_url = streamdetails.data.get("hls_media_playlist_url")
        if not media_playlist_url:
            try:
                assert isinstance(streamdetails.path, str)  # for type checking
                substream = await get_hls_substream(mass, streamdetails.path)
                media_playlist_url = substream.path
                streamdetails.data["hls_media_playlist_url"] = media_playlist_url
            except Exception as err:
                LOGGER.warning(
                    "Failed to resolve HLS substream for metadata monitoring: %s",
                    err,
                )
                return

        # Fetch the media playlist (same headers as the other HLS requests)
        timeout = ClientTimeout(total=None, connect=10, sock_read=30)
        async with mass.http_session_no_ssl.get(
            media_playlist_url, headers=HTTP_HEADERS, timeout=timeout
        ) as resp:
            resp.raise_for_status()
            playlist_content = await resp.text()

        # The most recent segment (last EXTINF) usually has the current metadata
        last_extinf = next(
            (
                line
                for line in reversed(playlist_content.splitlines())
                if line.startswith("#EXTINF:")
            ),
            None,
        )
        if last_extinf is None:
            return
        metadata = parse_extinf_metadata(last_extinf)

        # Format as "Artist - Title", or whichever of the two is present
        stream_title = " - ".join(
            part for part in (metadata.get("artist", ""), metadata.get("title", "")) if part
        )
        if not stream_title:
            return
        cleaned_title = clean_stream_title(stream_title)

        # Only update if changed
        if cleaned_title and cleaned_title != streamdetails.stream_title:
            LOGGER.log(
                VERBOSE_LOG_LEVEL,
                "HLS Radio metadata updated: %s",
                cleaned_title,
            )
            streamdetails.stream_title = cleaned_title

    except Exception as err:
        LOGGER.debug(
            "Error fetching HLS metadata: %s",
            err,
        )
944
945
946async def get_hls_substream(
947 mass: MusicAssistant,
948 url: str,
949) -> PlaylistItem:
950 """Select the (highest quality) HLS substream for given HLS playlist/URL."""
951 timeout = ClientTimeout(total=None, connect=30, sock_read=5 * 60)
952 # fetch master playlist and select (best) child playlist
953 # https://datatracker.ietf.org/doc/html/draft-pantos-http-live-streaming-19#section-10
954 async with mass.http_session_no_ssl.get(
955 url, allow_redirects=True, headers=HTTP_HEADERS, timeout=timeout
956 ) as resp:
957 resp.raise_for_status()
958 raw_data = await resp.read()
959 encoding = resp.charset or await detect_charset(raw_data)
960 master_m3u_data = raw_data.decode(encoding)
961 substreams = parse_m3u(master_m3u_data)
962 # There is a chance that we did not get a master playlist with subplaylists
963 # but just a single master/sub playlist with the actual audio stream(s)
964 # so we need to detect if the playlist child's contain audio streams or
965 # sub-playlists.
966 if any(
967 x
968 for x in substreams
969 if (x.length or x.path.endswith((".mp4", ".aac")))
970 and not x.path.endswith((".m3u", ".m3u8"))
971 ):
972 return PlaylistItem(path=url, key=substreams[0].key)
973 # sort substreams on best quality (highest bandwidth) when available
974 if any(x for x in substreams if x.stream_info):
975 substreams.sort(
976 key=lambda x: int(
977 x.stream_info.get("BANDWIDTH", "0") if x.stream_info is not None else 0
978 ),
979 reverse=True,
980 )
981 substream = substreams[0]
982 if not substream.path.startswith("http"):
983 # path is relative, stitch it together
984 base_path = url.rsplit("/", 1)[0]
985 substream.path = base_path + "/" + substream.path
986 return substream
987
988
989async def get_http_stream(
990 mass: MusicAssistant,
991 url: str,
992 streamdetails: StreamDetails,
993 seek_position: int = 0,
994 verify_ssl: bool = True,
995) -> AsyncGenerator[bytes, None]:
996 """Get audio stream from HTTP."""
997 LOGGER.debug("Start HTTP stream for %s (seek_position %s)", streamdetails.uri, seek_position)
998 if seek_position:
999 assert streamdetails.duration, "Duration required for seek requests"
1000 http_session = mass.http_session if verify_ssl else mass.http_session_no_ssl
1001 # try to get filesize with a head request
1002 seek_supported = streamdetails.can_seek
1003 if seek_position or not streamdetails.size:
1004 async with http_session.head(url, allow_redirects=True, headers=HTTP_HEADERS) as resp:
1005 resp.raise_for_status()
1006 if size := resp.headers.get("Content-Length"):
1007 streamdetails.size = int(size)
1008 seek_supported = resp.headers.get("Accept-Ranges") == "bytes"
1009 # headers
1010 headers = {**HTTP_HEADERS}
1011 timeout = ClientTimeout(total=None, connect=30, sock_read=5 * 60)
1012 skip_bytes = 0
1013 if seek_position and streamdetails.size:
1014 assert streamdetails.duration is not None # for type checking
1015 skip_bytes = int(streamdetails.size / streamdetails.duration * seek_position)
1016 headers["Range"] = f"bytes={skip_bytes}-{streamdetails.size}"
1017
1018 # seeking an unknown or container format is not supported due to the (moov) headers
1019 if seek_position and (
1020 not seek_supported
1021 or streamdetails.audio_format.content_type
1022 in (
1023 ContentType.UNKNOWN,
1024 ContentType.M4A,
1025 ContentType.M4B,
1026 )
1027 ):
1028 LOGGER.warning(
1029 "Seeking in %s (%s) not possible.",
1030 streamdetails.uri,
1031 streamdetails.audio_format.output_format_str,
1032 )
1033 seek_position = 0
1034 streamdetails.seek_position = 0
1035
1036 # start the streaming from http
1037 bytes_received = 0
1038 async with http_session.get(
1039 url, allow_redirects=True, headers=headers, timeout=timeout
1040 ) as resp:
1041 is_partial = resp.status == 206
1042 if seek_position and not is_partial:
1043 raise InvalidDataError("HTTP source does not support seeking!")
1044 resp.raise_for_status()
1045 async for chunk in resp.content.iter_any():
1046 bytes_received += len(chunk)
1047 yield chunk
1048
1049 # store size on streamdetails for later use
1050 if not streamdetails.size:
1051 streamdetails.size = bytes_received
1052 LOGGER.debug(
1053 "Finished HTTP stream for %s (transferred %s/%s bytes)",
1054 streamdetails.uri,
1055 bytes_received,
1056 streamdetails.size,
1057 )
1058
1059
1060async def get_file_stream(
1061 mass: MusicAssistant, # noqa: ARG001
1062 filename: str,
1063 streamdetails: StreamDetails,
1064 seek_position: int = 0,
1065) -> AsyncGenerator[bytes, None]:
1066 """Get audio stream from local accessible file."""
1067 if seek_position:
1068 assert streamdetails.duration, "Duration required for seek requests"
1069 if not streamdetails.size:
1070 stat = await asyncio.to_thread(os.stat, filename)
1071 streamdetails.size = stat.st_size
1072
1073 # seeking an unknown or container format is not supported due to the (moov) headers
1074 if seek_position and (
1075 streamdetails.audio_format.content_type
1076 in (
1077 ContentType.UNKNOWN,
1078 ContentType.M4A,
1079 ContentType.M4B,
1080 ContentType.MP4,
1081 )
1082 ):
1083 LOGGER.warning(
1084 "Seeking in %s (%s) not possible.",
1085 streamdetails.uri,
1086 streamdetails.audio_format.output_format_str,
1087 )
1088 seek_position = 0
1089 streamdetails.seek_position = 0
1090
1091 chunk_size = get_chunksize(streamdetails.audio_format)
1092 async with aiofiles.open(streamdetails.data, "rb") as _file:
1093 if seek_position:
1094 assert streamdetails.duration is not None # for type checking
1095 seek_pos = int((streamdetails.size / streamdetails.duration) * seek_position)
1096 await _file.seek(seek_pos)
1097 # yield chunks of data from file
1098 while True:
1099 data = await _file.read(chunk_size)
1100 if not data:
1101 break
1102 yield data
1103
1104
1105def _get_parts_from_position(
1106 parts: list[MultiPartPath], seek_position: int
1107) -> tuple[list[MultiPartPath], int]:
1108 """Get the remaining parts list from a timestamp.
1109
1110 Arguments:
1111 parts: The list of parts
1112 seek_position: The seeking position in seconds of the tracklist
1113
1114 Returns:
1115 In a tuple, A list of parts, starting with the one at the requested
1116 seek position and the position in seconds to seek to in the first
1117 track.
1118 """
1119 skipped_duration = 0.0
1120 for i, part in enumerate(parts):
1121 if not isinstance(part, MultiPartPath):
1122 raise InvalidDataError("Multi-file streamdetails requires a list of MultiPartPath")
1123 if part.duration is None:
1124 return parts, seek_position
1125 if skipped_duration + part.duration < seek_position:
1126 skipped_duration += part.duration
1127 continue
1128
1129 position = seek_position - skipped_duration
1130
1131 # Seeking in some parts is inaccurate, making the seek to a chapter land on the end of
1132 # the previous track. If we're within 2 second of the end, skip the current track
1133 if position + 2 >= part.duration:
1134 LOGGER.debug(
1135 f"Skipping to the next part due to seek position being at the end: {position}"
1136 )
1137 if i + 1 < len(parts):
1138 return parts[i + 1 :], 0
1139 return parts[i:], int(position) # last part, cannot skip
1140
1141 return parts[i:], int(position)
1142
1143 raise IndexError(f"Could not find any candidate part for position {seek_position}")
1144
1145
1146async def get_multi_file_stream(
1147 mass: MusicAssistant, # noqa: ARG001
1148 streamdetails: StreamDetails,
1149 seek_position: int = 0,
1150) -> AsyncGenerator[bytes, None]:
1151 """Return audio stream for a concatenation of multiple files.
1152
1153 Arguments:
1154 seek_position: The position to seek to in seconds
1155 """
1156 if not isinstance(streamdetails.path, list):
1157 raise InvalidDataError("Multi-file streamdetails requires a list of MultiPartPath")
1158 parts, seek_position = _get_parts_from_position(streamdetails.path, seek_position)
1159 files_list = [part.path for part in parts]
1160
1161 # concat input files
1162 temp_file = f"/tmp/{shortuuid.random(20)}.txt" # noqa: S108
1163 async with aiofiles.open(temp_file, "w") as f:
1164 for path in files_list:
1165 await f.write(f"file '{path}'\n")
1166
1167 try:
1168 async for chunk in get_ffmpeg_stream(
1169 audio_input=temp_file,
1170 input_format=streamdetails.audio_format,
1171 output_format=AudioFormat(
1172 content_type=ContentType.NUT,
1173 sample_rate=streamdetails.audio_format.sample_rate,
1174 bit_depth=streamdetails.audio_format.bit_depth,
1175 channels=streamdetails.audio_format.channels,
1176 ),
1177 extra_input_args=[
1178 "-safe",
1179 "0",
1180 "-f",
1181 "concat",
1182 "-i",
1183 temp_file,
1184 "-ss",
1185 str(seek_position),
1186 ],
1187 ):
1188 yield chunk
1189 finally:
1190 await remove_file(temp_file)
1191
1192
1193async def get_preview_stream(
1194 mass: MusicAssistant,
1195 provider_instance_id_or_domain: str,
1196 item_id: str,
1197 media_type: MediaType = MediaType.TRACK,
1198) -> AsyncGenerator[bytes, None]:
1199 """Create a 30 seconds preview audioclip for the given streamdetails."""
1200 if not (music_prov := mass.get_provider(provider_instance_id_or_domain)):
1201 raise ProviderUnavailableError
1202 if TYPE_CHECKING: # avoid circular import
1203 assert isinstance(music_prov, MusicProvider)
1204
1205 # Validate that item_id corresponds to a valid item in the provider for security
1206 if not await music_prov.get_item(media_type, item_id):
1207 msg = f"Item {item_id} not found in provider {provider_instance_id_or_domain}"
1208 raise MediaNotFoundError(msg)
1209
1210 streamdetails = await music_prov.get_stream_details(item_id, media_type)
1211 pcm_format = AudioFormat(
1212 content_type=ContentType.from_bit_depth(streamdetails.audio_format.bit_depth),
1213 sample_rate=streamdetails.audio_format.sample_rate,
1214 bit_depth=streamdetails.audio_format.bit_depth,
1215 channels=streamdetails.audio_format.channels,
1216 )
1217 async for chunk in get_ffmpeg_stream(
1218 audio_input=get_media_stream(
1219 mass=mass,
1220 streamdetails=streamdetails,
1221 pcm_format=pcm_format,
1222 ),
1223 input_format=pcm_format,
1224 output_format=AudioFormat(content_type=ContentType.AAC),
1225 extra_input_args=["-t", "30"], # cut after 30 seconds
1226 ):
1227 yield chunk
1228
1229
1230async def get_silence(
1231 duration: int,
1232 output_format: AudioFormat,
1233) -> AsyncGenerator[bytes, None]:
1234 """Create stream of silence, encoded to format of choice."""
1235 if output_format.content_type.is_pcm():
1236 # pcm = just zeros
1237 for _ in range(duration):
1238 yield b"\0" * int(output_format.sample_rate * (output_format.bit_depth / 8) * 2)
1239 return
1240 if output_format.content_type == ContentType.WAV:
1241 # wav silence = wave header + zero's
1242 yield create_wave_header(
1243 samplerate=output_format.sample_rate,
1244 channels=2,
1245 bitspersample=output_format.bit_depth,
1246 duration=duration,
1247 )
1248 for _ in range(duration):
1249 yield b"\0" * int(output_format.sample_rate * (output_format.bit_depth / 8) * 2)
1250 return
1251 # use ffmpeg for all other encodings
1252 args = [
1253 "ffmpeg",
1254 "-hide_banner",
1255 "-loglevel",
1256 "quiet",
1257 "-f",
1258 "lavfi",
1259 "-i",
1260 f"anullsrc=r={output_format.sample_rate}:cl={'stereo'}",
1261 "-t",
1262 str(duration),
1263 "-f",
1264 output_format.output_format_str,
1265 "-",
1266 ]
1267 async with AsyncProcess(args, stdout=True) as ffmpeg_proc:
1268 async for chunk in ffmpeg_proc.iter_chunked():
1269 yield chunk
1270
1271
1272async def resample_pcm_audio(
1273 input_audio: bytes,
1274 input_format: AudioFormat,
1275 output_format: AudioFormat,
1276) -> bytes:
1277 """
1278 Resample (a chunk of) PCM audio from input_format to output_format using ffmpeg.
1279
1280 :param input_audio: Raw PCM audio data to resample.
1281 :param input_format: AudioFormat of the input audio.
1282 :param output_format: Desired AudioFormat for the output audio.
1283
1284 :return: Resampled audio data, frame-aligned. Returns empty bytes if resampling fails.
1285 """
1286 if input_format == output_format:
1287 return input_audio
1288 LOGGER.log(VERBOSE_LOG_LEVEL, f"Resampling audio from {input_format} to {output_format}")
1289 try:
1290 ffmpeg_args = get_ffmpeg_args(
1291 input_format=input_format, output_format=output_format, filter_params=[]
1292 )
1293 _, stdout, stderr = await communicate(ffmpeg_args, input_audio)
1294 if not stdout:
1295 LOGGER.error(
1296 "Resampling failed: no output from ffmpeg. Input: %s, Output: %s, stderr: %s",
1297 input_format,
1298 output_format,
1299 stderr.decode() if stderr else "(no stderr)",
1300 )
1301 return b""
1302 # Ensure frame alignment after resampling
1303 return align_audio_to_frame_boundary(stdout, output_format)
1304 except Exception as err:
1305 LOGGER.exception(
1306 "Failed to resample audio from %s to %s: %s",
1307 input_format,
1308 output_format,
1309 err,
1310 )
1311 return b""
1312
1313
1314def get_chunksize(
1315 fmt: AudioFormat,
1316 seconds: float = 1,
1317) -> int:
1318 """Get a default chunk/file size for given contenttype in bytes."""
1319 pcm_size = int(fmt.sample_rate * (fmt.bit_depth / 8) * fmt.channels * seconds)
1320 if fmt.content_type.is_pcm() or fmt.content_type == ContentType.WAV:
1321 return pcm_size
1322 if fmt.content_type in (ContentType.WAV, ContentType.AIFF, ContentType.DSF):
1323 return pcm_size
1324 if fmt.bit_rate and fmt.bit_rate < 10000:
1325 return int(((fmt.bit_rate * 1000) / 8) * seconds)
1326 if fmt.content_type in (ContentType.FLAC, ContentType.WAVPACK, ContentType.ALAC):
1327 # assume 74.7% compression ratio (level 0)
1328 # source: https://z-issue.com/wp/flac-compression-level-comparison/
1329 return int(pcm_size * 0.747)
1330 if fmt.content_type in (ContentType.MP3, ContentType.OGG):
1331 return int((320000 / 8) * seconds)
1332 if fmt.content_type in (ContentType.AAC, ContentType.M4A):
1333 return int((256000 / 8) * seconds)
1334 return int((320000 / 8) * seconds)
1335
1336
1337def is_grouping_preventing_dsp(player: Player) -> bool:
1338 """Check if grouping is preventing DSP from being applied to this leader/PlayerGroup.
1339
1340 If this returns True, no DSP should be applied to the player.
1341 This function will not check if the Player is in a group, the caller should do that first.
1342 """
1343 # We require the caller to handle non-leader cases themselves since player.synced_to
1344 # can be unreliable in some edge cases
1345 multi_device_dsp_supported = PlayerFeature.MULTI_DEVICE_DSP in player.supported_features
1346 child_count = len(player.group_members) if player.group_members else 0
1347
1348 is_multiple_devices: bool
1349 if player.provider.domain == "player_group":
1350 # PlayerGroups have no leader, so having a child count of 1 means
1351 # the group actually contains only a single player.
1352 is_multiple_devices = child_count > 1
1353 elif player.type == PlayerType.GROUP:
1354 # This is an group player external to Music Assistant.
1355 is_multiple_devices = True
1356 else:
1357 is_multiple_devices = child_count > 0
1358 return is_multiple_devices and not multi_device_dsp_supported
1359
1360
1361def is_output_limiter_enabled(mass: MusicAssistant, player: Player) -> bool:
1362 """Check if the player has the output limiter enabled.
1363
1364 Unlike DSP, the limiter is still configurable when synchronized without MULTI_DEVICE_DSP.
1365 So in grouped scenarios without MULTI_DEVICE_DSP, the permanent sync group or the leader gets
1366 decides if the limiter should be turned on or not.
1367 """
1368 deciding_player_id = player.player_id
1369 if player.active_group:
1370 # Syncgroup, get from the group player
1371 deciding_player_id = player.active_group
1372 elif player.synced_to:
1373 # Not in sync group, but synced, get from the leader
1374 deciding_player_id = player.synced_to
1375 output_limiter_enabled = mass.config.get_raw_player_config_value(
1376 deciding_player_id,
1377 CONF_ENTRY_OUTPUT_LIMITER.key,
1378 CONF_ENTRY_OUTPUT_LIMITER.default_value,
1379 )
1380 return bool(output_limiter_enabled)
1381
1382
1383def get_player_filter_params(
1384 mass: MusicAssistant,
1385 player_id: str,
1386 input_format: AudioFormat,
1387 output_format: AudioFormat,
1388) -> list[str]:
1389 """Get player specific filter parameters for ffmpeg (if any)."""
1390 filter_params = []
1391
1392 dsp = mass.config.get_player_dsp_config(player_id)
1393 limiter_enabled = True
1394
1395 if player := mass.players.get(player_id):
1396 if is_grouping_preventing_dsp(player):
1397 # We can not correctly apply DSP to a grouped player without multi-device DSP support,
1398 # so we disable it.
1399 dsp.enabled = False
1400 elif player.provider.domain == "player_group" and (
1401 PlayerFeature.MULTI_DEVICE_DSP not in player.supported_features
1402 ):
1403 # This is a special case! We have a player group where:
1404 # - The group leader does not support MULTI_DEVICE_DSP
1405 # - But only contains a single player (since nothing is preventing DSP)
1406 # We can still apply the DSP of that single player.
1407 if player.group_members:
1408 child_player = mass.players.get(player.group_members[0])
1409 assert child_player is not None # for type checking
1410 dsp = mass.config.get_player_dsp_config(child_player.player_id)
1411 else:
1412 # This should normally never happen, but if it does, we disable DSP.
1413 dsp.enabled = False
1414
1415 # We here implicitly know what output format is used for the player
1416 # in the audio processing steps. We save this information to
1417 # later be able to show this to the user in the UI.
1418 player.extra_data["output_format"] = output_format
1419
1420 limiter_enabled = is_output_limiter_enabled(mass, player)
1421
1422 if dsp.enabled:
1423 # Apply input gain
1424 if dsp.input_gain != 0:
1425 filter_params.append(f"volume={dsp.input_gain}dB")
1426
1427 # Process each DSP filter sequentially
1428 for f in dsp.filters:
1429 if not f.enabled:
1430 continue
1431
1432 # Apply filter
1433 filter_params.extend(filter_to_ffmpeg_params(f, input_format))
1434
1435 # Apply output gain
1436 if dsp.output_gain != 0:
1437 filter_params.append(f"volume={dsp.output_gain}dB")
1438
1439 conf_channels = mass.config.get_raw_player_config_value(
1440 player_id, CONF_OUTPUT_CHANNELS, "stereo"
1441 )
1442
1443 # handle output mixing only left or right
1444 if conf_channels == "left":
1445 filter_params.append("pan=mono|c0=FL")
1446 elif conf_channels == "right":
1447 filter_params.append("pan=mono|c0=FR")
1448
1449 # Add safety limiter at the end
1450 if limiter_enabled:
1451 filter_params.append("alimiter=limit=-2dB:level=false:asc=true")
1452
1453 LOGGER.debug("Generated ffmpeg params for player %s: %s", player_id, filter_params)
1454 return filter_params
1455
1456
1457def parse_loudnorm(raw_stderr: bytes | str) -> float | None:
1458 """Parse Loudness measurement from ffmpeg stderr output."""
1459 stderr_data = raw_stderr.decode() if isinstance(raw_stderr, bytes) else raw_stderr
1460 if "[Parsed_loudnorm_0 @" not in stderr_data:
1461 return None
1462 for jsun_chunk in stderr_data.split(" { "):
1463 try:
1464 stderr_data = "{" + jsun_chunk.rsplit("}")[0].strip() + "}"
1465 loudness_data = json_loads(stderr_data)
1466 return float(loudness_data["input_i"])
1467 except (*JSON_DECODE_EXCEPTIONS, KeyError, ValueError, IndexError):
1468 continue
1469 return None
1470
1471
1472async def analyze_loudness(
1473 mass: MusicAssistant,
1474 streamdetails: StreamDetails,
1475) -> None:
1476 """Analyze media item's audio, to calculate EBU R128 loudness."""
1477 if await mass.music.get_loudness(
1478 streamdetails.item_id,
1479 streamdetails.provider,
1480 media_type=streamdetails.media_type,
1481 ):
1482 # only when needed we do the analyze job
1483 return
1484
1485 logger = LOGGER.getChild("analyze_loudness")
1486 logger.debug("Start analyzing audio for %s", streamdetails.uri)
1487
1488 extra_input_args = [
1489 # limit to 10 minutes to reading too much in memory
1490 "-t",
1491 "600",
1492 ]
1493 # work out audio source for these streamdetails
1494 stream_type = streamdetails.stream_type
1495 audio_source: str | AsyncGenerator[bytes, None]
1496 if stream_type == StreamType.CUSTOM:
1497 music_prov = mass.get_provider(streamdetails.provider)
1498 if TYPE_CHECKING: # avoid circular import
1499 assert isinstance(music_prov, MusicProvider)
1500 audio_source = music_prov.get_audio_stream(streamdetails)
1501 elif stream_type == StreamType.ICY:
1502 assert isinstance(streamdetails.path, str) # for type checking
1503 audio_source = get_icy_radio_stream(mass, streamdetails.path, streamdetails)
1504 elif stream_type == StreamType.HLS:
1505 assert isinstance(streamdetails.path, str) # for type checking
1506 substream = await get_hls_substream(mass, streamdetails.path)
1507 audio_source = substream.path
1508 else:
1509 # all other stream types (HTTP, FILE, etc)
1510 if stream_type == StreamType.ENCRYPTED_HTTP:
1511 assert streamdetails.decryption_key is not None # for type checking
1512 extra_input_args += ["-decryption_key", streamdetails.decryption_key]
1513 if isinstance(streamdetails.path, list):
1514 # multi part stream - just use a single file for the measurement
1515 audio_source = streamdetails.path[1].path
1516 else:
1517 # regular single file/url stream
1518 assert isinstance(streamdetails.path, str) # for type checking
1519 audio_source = streamdetails.path
1520
1521 # calculate BS.1770 R128 integrated loudness with ffmpeg
1522 async with FFMpeg(
1523 audio_input=audio_source,
1524 input_format=streamdetails.audio_format,
1525 output_format=streamdetails.audio_format,
1526 audio_output="NULL",
1527 filter_params=["ebur128=framelog=verbose"],
1528 extra_input_args=extra_input_args,
1529 collect_log_history=True,
1530 loglevel="info",
1531 ) as ffmpeg_proc:
1532 await ffmpeg_proc.wait()
1533 log_lines = ffmpeg_proc.log_history
1534 log_lines_str = "\n".join(log_lines)
1535 try:
1536 loudness_str = (
1537 log_lines_str.split("Integrated loudness")[1].split("I:")[1].split("LUFS")[0]
1538 )
1539 loudness = float(loudness_str.strip())
1540 except (IndexError, ValueError, AttributeError):
1541 LOGGER.warning(
1542 "Could not determine integrated loudness of %s - %s",
1543 streamdetails.uri,
1544 log_lines_str or "received empty value",
1545 )
1546 else:
1547 await mass.music.set_loudness(
1548 streamdetails.item_id,
1549 streamdetails.provider,
1550 loudness,
1551 media_type=streamdetails.media_type,
1552 )
1553 logger.debug(
1554 "Integrated loudness of %s is: %s",
1555 streamdetails.uri,
1556 loudness,
1557 )
1558
1559
1560def _get_normalization_mode(
1561 core_config: CoreConfig, player_config: PlayerConfig, streamdetails: StreamDetails
1562) -> VolumeNormalizationMode:
1563 if not player_config.get_value(CONF_VOLUME_NORMALIZATION):
1564 # disabled for this player
1565 return VolumeNormalizationMode.DISABLED
1566 if streamdetails.target_loudness is None:
1567 # no target loudness set, disable normalization
1568 return VolumeNormalizationMode.DISABLED
1569 # work out preference for track or radio
1570 preference = VolumeNormalizationMode(
1571 str(
1572 core_config.get_value(
1573 CONF_VOLUME_NORMALIZATION_RADIO
1574 if streamdetails.media_type == MediaType.RADIO
1575 else CONF_VOLUME_NORMALIZATION_TRACKS,
1576 )
1577 )
1578 )
1579
1580 # handle no measurement available but fallback to dynamic mode is allowed
1581 if streamdetails.loudness is None and preference == VolumeNormalizationMode.FALLBACK_DYNAMIC:
1582 return VolumeNormalizationMode.DYNAMIC
1583
1584 # handle no measurement available and no fallback allowed
1585 if streamdetails.loudness is None and preference == VolumeNormalizationMode.MEASUREMENT_ONLY:
1586 return VolumeNormalizationMode.DISABLED
1587
1588 # handle no measurement available and fallback to fixed gain is allowed
1589 if streamdetails.loudness is None and preference == VolumeNormalizationMode.FALLBACK_FIXED_GAIN:
1590 return VolumeNormalizationMode.FIXED_GAIN
1591
1592 # handle measurement available - chosen mode is measurement
1593 if streamdetails.loudness is not None and preference not in (
1594 VolumeNormalizationMode.DISABLED,
1595 VolumeNormalizationMode.FIXED_GAIN,
1596 VolumeNormalizationMode.DYNAMIC,
1597 ):
1598 return VolumeNormalizationMode.MEASUREMENT_ONLY
1599
1600 # simply return the preference
1601 return preference
1602