# analyzer.py — music-assistant-server
"""Smart Fades Analyzer - Performs audio analysis for smart fades."""

from __future__ import annotations

import asyncio
import time
import warnings
from typing import TYPE_CHECKING

import librosa
import numpy as np
import numpy.typing as npt

from music_assistant.constants import VERBOSE_LOG_LEVEL
from music_assistant.helpers.audio import (
    align_audio_to_frame_boundary,
)
from music_assistant.models.smart_fades import (
    SmartFadesAnalysis,
    SmartFadesAnalysisFragment,
)

if TYPE_CHECKING:
    from music_assistant_models.media_items import AudioFormat

    from music_assistant.controllers.streams.streams_controller import StreamsController

# Analysis resolution in frames per second.
# NOTE(review): not referenced anywhere in this chunk — presumably consumed
# by callers of the analysis results; confirm before removing.
ANALYSIS_FPS = 100

30
class SmartFadesAnalyzer:
    """Smart fades analyzer that performs audio analysis.

    Runs librosa beat tracking on a worker thread to extract BPM, beat and
    downbeat positions plus a confidence score from a PCM audio fragment,
    then persists the result via the music controller.
    """

    def __init__(self, streams: StreamsController) -> None:
        """Initialize smart fades analyzer."""
        self.streams = streams
        self.logger = streams.logger.getChild("smart_fades_analyzer")

    async def analyze(
        self,
        item_id: str,
        provider_instance_id_or_domain: str,
        fragment: SmartFadesAnalysisFragment,
        audio_data: bytes,
        pcm_format: AudioFormat,
    ) -> SmartFadesAnalysis | None:
        """Analyze a track's beats for BPM matching smart fade.

        Args:
            item_id: Identifier of the media item the audio belongs to.
            provider_instance_id_or_domain: Provider that owns the item.
            fragment: Which fragment of the track is being analyzed.
            audio_data: Raw interleaved float32 PCM bytes.
            pcm_format: Format description of the PCM data.

        Returns:
            The completed analysis (also persisted asynchronously via the
            music controller), or None when analysis failed or produced
            no usable results. Never raises.
        """
        stream_details_name = f"{provider_instance_id_or_domain}://{item_id}"
        start_time = time.perf_counter()
        self.logger.log(
            VERBOSE_LOG_LEVEL,
            "Starting %s beat analysis for track : %s",
            fragment.name,
            stream_details_name,
        )

        # Validate input audio data is frame-aligned
        audio_data = align_audio_to_frame_boundary(audio_data, pcm_format)

        try:
            # pcm_sample_size is assumed to be bytes-per-second, making this
            # the fragment duration in seconds — TODO confirm against
            # AudioFormat. Computed inside the try block so a malformed
            # format (sample size 0) is logged instead of raised.
            fragment_duration = len(audio_data) / pcm_format.pcm_sample_size
            self.logger.log(
                VERBOSE_LOG_LEVEL,
                "Audio data: %.2fs, %d bytes",
                fragment_duration,
                len(audio_data),
            )
            # Convert PCM bytes to a mono numpy array for analysis
            mono_audio = self._pcm_to_mono(audio_data, pcm_format.channels)

            # Validate that the audio is finite (no NaN or Inf values)
            if not np.all(np.isfinite(mono_audio)):
                self.logger.error(
                    "Audio buffer contains non-finite values (NaN/Inf) for %s, cannot analyze",
                    stream_details_name,
                )
                return None

            analysis = await self._analyze_track_beats(mono_audio, fragment, pcm_format.sample_rate)

            total_time = time.perf_counter() - start_time
            if not analysis:
                self.logger.debug(
                    "No analysis results found after analyzing audio for: %s (took %.2fs).",
                    stream_details_name,
                    total_time,
                )
                return None
            self.logger.debug(
                "Smart fades %s analysis completed for %s: BPM=%.1f, %d beats, "
                "%d downbeats, confidence=%.2f (took %.2fs)",
                fragment.name,
                stream_details_name,
                analysis.bpm,
                len(analysis.beats),
                len(analysis.downbeats),
                analysis.confidence,
                total_time,
            )
            # Persist the analysis in the background; do not block the caller.
            self.streams.mass.create_task(
                self.streams.mass.music.set_smart_fades_analysis(
                    item_id, provider_instance_id_or_domain, analysis
                )
            )
            return analysis
        except Exception as e:
            total_time = time.perf_counter() - start_time
            self.logger.exception(
                "Beat analysis error for %s: %s (took %.2fs)",
                stream_details_name,
                e,
                total_time,
            )
            return None

    def _pcm_to_mono(self, audio_data: bytes, channels: int) -> npt.NDArray[np.float32]:
        """Convert interleaved float32 PCM bytes to a mono float32 array.

        Multi-channel audio is downmixed by averaging the channels. A buffer
        whose sample count is not divisible by the channel count is truncated
        (with a warning) to the last complete frame.
        """
        audio_array = np.frombuffer(audio_data, dtype=np.float32)
        if channels <= 1:
            # Single channel - ensure consistent array type
            return np.asarray(audio_array, dtype=np.float32)

        # Ensure array size is divisible by channel count
        samples_per_channel = len(audio_array) // channels
        valid_samples = samples_per_channel * channels
        if valid_samples != len(audio_array):
            self.logger.warning(
                "Audio buffer size (%d) not divisible by channels (%d), "
                "truncating %d samples",
                len(audio_array),
                channels,
                len(audio_array) - valid_samples,
            )
            audio_array = audio_array[:valid_samples]

        # Reshape to separate channels and take average for mono conversion
        audio_array = audio_array.reshape(-1, channels)
        return np.asarray(np.mean(audio_array, axis=1, dtype=np.float32))

    def _librosa_beat_analysis(
        self,
        audio_array: npt.NDArray[np.float32],
        fragment: SmartFadesAnalysisFragment,
        sample_rate: int,
    ) -> SmartFadesAnalysis | None:
        """Perform beat analysis using librosa.

        Returns None when fewer than two beats are detected or when librosa
        raises; exceptions are logged, never propagated.
        """
        try:
            # Suppress librosa UserWarnings about empty mel filters
            # These warnings are harmless and occur with certain audio characteristics
            with warnings.catch_warnings():
                warnings.filterwarnings(
                    "ignore",
                    message="Empty filters detected in mel frequency basis",
                    category=UserWarning,
                )
                tempo, beats_array = librosa.beat.beat_track(
                    y=audio_array,
                    sr=sample_rate,
                    units="time",
                )
            # librosa returns np.float64 arrays when units="time"

            if len(beats_array) < 2:
                self.logger.warning("Insufficient beats detected: %d", len(beats_array))
                return None

            # tempo may be a 0-d numpy array or a plain float depending on
            # the librosa version; normalize to a Python float either way.
            bpm = float(tempo.item()) if hasattr(tempo, "item") else float(tempo)

            # Confidence from beat-interval consistency: a lower coefficient
            # of variation means a steadier beat grid, hence higher confidence.
            if len(beats_array) > 2:
                intervals = np.diff(beats_array)
                interval_std = np.std(intervals)
                interval_mean = np.mean(intervals)
                cv = interval_std / interval_mean if interval_mean > 0 else 1.0
                confidence = max(0.1, 1.0 - cv)
            else:
                confidence = 0.5  # Low confidence with few beats

            downbeats = self._estimate_musical_downbeats(beats_array, bpm)

            # Store complete fragment analysis
            fragment_duration = len(audio_array) / sample_rate

            return SmartFadesAnalysis(
                fragment=fragment,
                bpm=bpm,
                beats=beats_array,
                downbeats=downbeats,
                confidence=float(confidence),
                duration=fragment_duration,
            )

        except Exception as e:
            self.logger.exception("Librosa beat analysis failed: %s", e)
            return None

    def _estimate_musical_downbeats(
        self, beats_array: npt.NDArray[np.float64], bpm: float
    ) -> npt.NDArray[np.float64]:
        """Estimate downbeats using musical logic and beat consistency.

        Assumes 4/4 time: each of the four possible bar offsets is scored by
        how consistently its candidate downbeats land on the expected
        four-beat interval, and the best-scoring offset wins.
        """
        if len(beats_array) < 4:
            return beats_array[:1] if len(beats_array) > 0 else np.array([])

        if bpm <= 0:
            # Defensive: a non-positive tempo would cause a division by zero
            # below; fall back to every fourth beat starting at the first.
            return beats_array[::4]

        # Calculate expected beat interval from BPM
        expected_beat_interval = 60.0 / bpm

        # Look for the most likely starting downbeat by analyzing beat intervals
        # In 4/4 time, downbeats should be every 4 beats
        best_offset = 0
        best_consistency = 0.0

        # Try different starting offsets (0, 1, 2, 3) to find most consistent downbeat pattern
        for offset in range(min(4, len(beats_array))):
            downbeat_candidates = beats_array[offset::4]

            if len(downbeat_candidates) < 2:
                continue

            # Calculate consistency score based on interval regularity
            intervals = np.diff(downbeat_candidates)
            expected_downbeat_interval = 4 * expected_beat_interval

            # Score based on how close intervals are to expected 4-beat interval
            interval_errors = (
                np.abs(intervals - expected_downbeat_interval) / expected_downbeat_interval
            )
            consistency = 1.0 - np.mean(interval_errors)

            if consistency > best_consistency:
                best_consistency = float(consistency)
                best_offset = offset

        # Use the best offset to generate final downbeats
        downbeats = beats_array[best_offset::4]

        self.logger.log(
            VERBOSE_LOG_LEVEL,
            "Downbeat estimation: offset=%d, consistency=%.2f, %d downbeats from %d beats",
            best_offset,
            best_consistency,
            len(downbeats),
            len(beats_array),
        )

        return downbeats

    async def _analyze_track_beats(
        self,
        audio_data: npt.NDArray[np.float32],
        fragment: SmartFadesAnalysisFragment,
        sample_rate: int,
    ) -> SmartFadesAnalysis | None:
        """Analyze track for beat tracking using librosa.

        Offloads the CPU-heavy librosa work to a worker thread so the event
        loop stays responsive. Returns None on any failure.
        """
        try:
            return await asyncio.to_thread(
                self._librosa_beat_analysis, audio_data, fragment, sample_rate
            )
        except Exception as e:
            self.logger.exception("Beat tracking analysis failed: %s", e)
            return None
258