/
/
/
1"""Smart Fades Analyzer - Performs audio analysis for smart fades."""
2
3from __future__ import annotations
4
5import asyncio
6import time
7import warnings
8from typing import TYPE_CHECKING
9
10import librosa
11import numpy as np
12import numpy.typing as npt
13
14from music_assistant.constants import VERBOSE_LOG_LEVEL
15from music_assistant.helpers.audio import (
16 align_audio_to_frame_boundary,
17)
18from music_assistant.models.smart_fades import (
19 SmartFadesAnalysis,
20 SmartFadesAnalysisFragment,
21)
22
23if TYPE_CHECKING:
24 from music_assistant_models.media_items import AudioFormat
25
26 from music_assistant.controllers.streams.streams_controller import StreamsController
27
28ANALYSIS_FPS = 100
29
30
class SmartFadesAnalyzer:
    """Smart fades analyzer that performs audio analysis.

    Converts raw PCM audio to a mono float32 signal, runs librosa beat
    tracking in a worker thread, estimates downbeats with simple 4/4
    musical heuristics and schedules persisting the result via the
    music controller.
    """

    def __init__(self, streams: StreamsController) -> None:
        """Initialize smart fades analyzer."""
        self.streams = streams
        self.logger = streams.logger.getChild("smart_fades_analyzer")

    async def analyze(
        self,
        item_id: str,
        provider_instance_id_or_domain: str,
        fragment: SmartFadesAnalysisFragment,
        audio_data: bytes,
        pcm_format: AudioFormat,
    ) -> SmartFadesAnalysis | None:
        """Analyze a track's beats for BPM matching smart fade.

        :param item_id: media item identifier (used for logging and persisting).
        :param provider_instance_id_or_domain: provider the item belongs to.
        :param fragment: which fragment of the track this audio represents.
        :param audio_data: raw interleaved PCM bytes
            (assumed 32-bit float samples — see _to_mono_float32).
        :param pcm_format: format description of the PCM data.
        :return: the completed analysis, or None when analysis failed or
            produced no usable results. Never raises: all errors are logged.
        """
        stream_details_name = f"{provider_instance_id_or_domain}://{item_id}"
        start_time = time.perf_counter()
        self.logger.log(
            VERBOSE_LOG_LEVEL,
            "Starting %s beat analysis for track : %s",
            fragment.name,
            stream_details_name,
        )

        # Validate input audio data is frame-aligned
        audio_data = align_audio_to_frame_boundary(audio_data, pcm_format)

        # pcm_sample_size is the byte size of one second of audio in this
        # format, so dividing yields the fragment duration in seconds.
        fragment_duration = len(audio_data) / (pcm_format.pcm_sample_size)
        try:
            self.logger.log(
                VERBOSE_LOG_LEVEL,
                "Audio data: %.2fs, %d bytes",
                fragment_duration,
                len(audio_data),
            )
            # Convert PCM bytes to a mono float32 numpy array for analysis
            mono_audio = self._to_mono_float32(audio_data, pcm_format)

            # Validate that the audio is finite (no NaN or Inf values)
            if not np.all(np.isfinite(mono_audio)):
                self.logger.error(
                    "Audio buffer contains non-finite values (NaN/Inf) for %s, cannot analyze",
                    stream_details_name,
                )
                return None

            analysis = await self._analyze_track_beats(mono_audio, fragment, pcm_format.sample_rate)

            total_time = time.perf_counter() - start_time
            if not analysis:
                self.logger.debug(
                    "No analysis results found after analyzing audio for: %s (took %.2fs).",
                    stream_details_name,
                    total_time,
                )
                return None
            self.logger.debug(
                "Smart fades %s analysis completed for %s: BPM=%.1f, %d beats, "
                "%d downbeats, confidence=%.2f (took %.2fs)",
                fragment.name,
                stream_details_name,
                analysis.bpm,
                len(analysis.beats),
                len(analysis.downbeats),
                analysis.confidence,
                total_time,
            )
            # Persist the analysis in the background; callers don't need to wait.
            self.streams.mass.create_task(
                self.streams.mass.music.set_smart_fades_analysis(
                    item_id, provider_instance_id_or_domain, analysis
                )
            )
            return analysis
        except Exception as e:
            # Broad catch is deliberate: analysis is best-effort and must
            # never propagate into the streaming path.
            total_time = time.perf_counter() - start_time
            self.logger.exception(
                "Beat analysis error for %s: %s (took %.2fs)",
                stream_details_name,
                e,
                total_time,
            )
            return None

    def _to_mono_float32(
        self, audio_data: bytes, pcm_format: AudioFormat
    ) -> npt.NDArray[np.float32]:
        """Convert raw interleaved PCM bytes to a mono float32 array.

        NOTE(review): assumes the PCM payload is 32-bit float samples
        (f32le) regardless of pcm_format.bit_depth — confirm against the
        producer of audio_data.
        """
        audio_array = np.frombuffer(audio_data, dtype=np.float32)
        if pcm_format.channels <= 1:
            # Single channel - ensure consistent array type
            return np.asarray(audio_array, dtype=np.float32)

        # Ensure array size is divisible by channel count
        samples_per_channel = len(audio_array) // pcm_format.channels
        valid_samples = samples_per_channel * pcm_format.channels
        if valid_samples != len(audio_array):
            self.logger.warning(
                "Audio buffer size (%d) not divisible by channels (%d), "
                "truncating %d samples",
                len(audio_array),
                pcm_format.channels,
                len(audio_array) - valid_samples,
            )
            audio_array = audio_array[:valid_samples]

        # Reshape to separate channels and take average for mono conversion
        audio_array = audio_array.reshape(-1, pcm_format.channels)
        return np.asarray(np.mean(audio_array, axis=1, dtype=np.float32))

    def _librosa_beat_analysis(
        self,
        audio_array: npt.NDArray[np.float32],
        fragment: SmartFadesAnalysisFragment,
        sample_rate: int,
    ) -> SmartFadesAnalysis | None:
        """Perform beat analysis using librosa.

        Runs synchronously (CPU-bound); callers should off-load it to a
        thread. Returns None when fewer than 2 beats are detected or on
        any librosa failure.
        """
        try:
            # Suppress librosa UserWarnings about empty mel filters
            # These warnings are harmless and occur with certain audio characteristics
            with warnings.catch_warnings():
                warnings.filterwarnings(
                    "ignore",
                    message="Empty filters detected in mel frequency basis",
                    category=UserWarning,
                )
                tempo, beats_array = librosa.beat.beat_track(
                    y=audio_array,
                    sr=sample_rate,
                    units="time",
                )
                # librosa returns np.float64 arrays when units="time"

            if len(beats_array) < 2:
                self.logger.warning("Insufficient beats detected: %d", len(beats_array))
                return None

            # tempo may be a 0-d numpy array or a plain float depending on
            # the librosa version; normalize to a Python float.
            bpm = float(tempo.item()) if hasattr(tempo, "item") else float(tempo)

            # Calculate confidence based on consistency of intervals:
            # lower coefficient of variation = higher confidence.
            if len(beats_array) > 2:
                intervals = np.diff(beats_array)
                interval_std = np.std(intervals)
                interval_mean = np.mean(intervals)
                cv = interval_std / interval_mean if interval_mean > 0 else 1.0
                # Floor at 0.1 so wildly irregular tracks don't zero out.
                confidence = max(0.1, 1.0 - cv)
            else:
                confidence = 0.5  # Low confidence with few beats

            downbeats = self._estimate_musical_downbeats(beats_array, bpm)

            # Store complete fragment analysis
            fragment_duration = len(audio_array) / sample_rate

            return SmartFadesAnalysis(
                fragment=fragment,
                bpm=float(bpm),
                beats=beats_array,
                downbeats=downbeats,
                confidence=float(confidence),
                duration=fragment_duration,
            )

        except Exception as e:
            self.logger.exception("Librosa beat analysis failed: %s", e)
            return None

    def _estimate_musical_downbeats(
        self, beats_array: npt.NDArray[np.float64], bpm: float
    ) -> npt.NDArray[np.float64]:
        """Estimate downbeats using musical logic and beat consistency.

        Assumes 4/4 time: downbeats occur every 4 beats. Tries each of the
        4 possible phase offsets and keeps the one whose downbeat spacing
        best matches the 4-beat interval implied by the BPM.
        """
        if len(beats_array) < 4:
            # Too few beats to phase-align; best effort is the first beat.
            return beats_array[:1] if len(beats_array) > 0 else np.array([], dtype=np.float64)

        if bpm <= 0:
            # librosa can report a tempo of 0 for ambiguous material; without
            # a valid BPM we cannot score offsets against an expected
            # interval, so fall back to every 4th beat from the start.
            self.logger.warning(
                "Invalid BPM (%.2f) for downbeat estimation, using every 4th beat", bpm
            )
            return beats_array[::4]

        # Calculate expected beat interval from BPM
        expected_beat_interval = 60.0 / bpm

        # Look for the most likely starting downbeat by analyzing beat intervals
        # In 4/4 time, downbeats should be every 4 beats
        best_offset = 0
        best_consistency = 0.0

        # Try different starting offsets (0, 1, 2, 3) to find most consistent downbeat pattern
        for offset in range(min(4, len(beats_array))):
            downbeat_candidates = beats_array[offset::4]

            if len(downbeat_candidates) < 2:
                continue

            # Calculate consistency score based on interval regularity
            intervals = np.diff(downbeat_candidates)
            expected_downbeat_interval = 4 * expected_beat_interval

            # Score based on how close intervals are to expected 4-beat interval
            interval_errors = (
                np.abs(intervals - expected_downbeat_interval) / expected_downbeat_interval
            )
            consistency = 1.0 - np.mean(interval_errors)

            if consistency > best_consistency:
                best_consistency = float(consistency)
                best_offset = offset

        # Use the best offset to generate final downbeats
        downbeats = beats_array[best_offset::4]

        self.logger.log(
            VERBOSE_LOG_LEVEL,
            "Downbeat estimation: offset=%d, consistency=%.2f, %d downbeats from %d beats",
            best_offset,
            best_consistency,
            len(downbeats),
            len(beats_array),
        )

        return downbeats

    async def _analyze_track_beats(
        self,
        audio_data: npt.NDArray[np.float32],
        fragment: SmartFadesAnalysisFragment,
        sample_rate: int,
    ) -> SmartFadesAnalysis | None:
        """Analyze track for beat tracking using librosa.

        Off-loads the CPU-bound librosa work to a thread so the event loop
        is not blocked. The extra try/except guards against failures in the
        thread hand-off itself (the worker already catches its own errors).
        """
        try:
            return await asyncio.to_thread(
                self._librosa_beat_analysis, audio_data, fragment, sample_rate
            )
        except Exception as e:
            self.logger.exception("Beat tracking analysis failed: %s", e)
            return None
258