/
/
/
1"""Smart Fades - Audio fade implementations."""
2
3from __future__ import annotations
4
5import logging
6from abc import ABC, abstractmethod
7from typing import TYPE_CHECKING
8
9import aiofiles
10import numpy as np
11import numpy.typing as npt
12import shortuuid
13
14from music_assistant.constants import VERBOSE_LOG_LEVEL
15from music_assistant.controllers.streams.smart_fades.filters import (
16 CrossfadeFilter,
17 Filter,
18 FrequencySweepFilter,
19 TimeStretchFilter,
20 TrimFilter,
21)
22from music_assistant.helpers.process import communicate
23from music_assistant.helpers.util import remove_file
24from music_assistant.models.smart_fades import (
25 SmartFadesAnalysis,
26)
27
28if TYPE_CHECKING:
29 from music_assistant_models.media_items import AudioFormat
30
31SMART_CROSSFADE_DURATION = 45
32
33
34class SmartFade(ABC):
35 """Abstract base class for Smart Fades."""
36
37 filters: list[Filter]
38
39 def __init__(self, logger: logging.Logger) -> None:
40 """Initialize SmartFade base class."""
41 self.filters = []
42 self.logger = logger
43
44 @abstractmethod
45 def _build(self) -> None:
46 """Build the smart fades filter chain."""
47 ...
48
49 def _get_ffmpeg_filters(
50 self,
51 input_fadein_label: str = "[1]",
52 input_fadeout_label: str = "[0]",
53 ) -> list[str]:
54 """Get FFmpeg filters for smart fades."""
55 if not self.filters:
56 self._build()
57 filters = []
58 _cur_fadein_label = input_fadein_label
59 _cur_fadeout_label = input_fadeout_label
60 for audio_filter in self.filters:
61 filter_strings = audio_filter.apply(_cur_fadein_label, _cur_fadeout_label)
62 filters.extend(filter_strings)
63 _cur_fadein_label = f"[{audio_filter.output_fadein_label}]"
64 _cur_fadeout_label = f"[{audio_filter.output_fadeout_label}]"
65 return filters
66
67 async def apply(
68 self,
69 fade_out_part: bytes,
70 fade_in_part: bytes,
71 pcm_format: AudioFormat,
72 ) -> bytes:
73 """Apply the smart fade to the given PCM audio parts."""
74 # Write the fade_out_part to a temporary file
75 fadeout_filename = f"/tmp/{shortuuid.random(20)}.pcm" # noqa: S108
76 async with aiofiles.open(fadeout_filename, "wb") as outfile:
77 await outfile.write(fade_out_part)
78
79 args = [
80 "ffmpeg",
81 "-hide_banner",
82 "-loglevel",
83 "error",
84 # Input 1: fadeout part (as file)
85 "-acodec",
86 pcm_format.content_type.name.lower(), # e.g., "pcm_f32le" not just "f32le"
87 "-ac",
88 str(pcm_format.channels),
89 "-ar",
90 str(pcm_format.sample_rate),
91 "-channel_layout",
92 "mono" if pcm_format.channels == 1 else "stereo",
93 "-f",
94 pcm_format.content_type.value,
95 "-i",
96 fadeout_filename,
97 # Input 2: fade_in part (stdin)
98 "-acodec",
99 pcm_format.content_type.name.lower(),
100 "-ac",
101 str(pcm_format.channels),
102 "-ar",
103 str(pcm_format.sample_rate),
104 "-channel_layout",
105 "mono" if pcm_format.channels == 1 else "stereo",
106 "-f",
107 pcm_format.content_type.value,
108 "-i",
109 "-",
110 ]
111 smart_fade_filters = self._get_ffmpeg_filters()
112 self.logger.debug(
113 "Applying smartfade: %s",
114 self,
115 )
116 args.extend(
117 [
118 "-filter_complex",
119 ";".join(smart_fade_filters),
120 # Output format specification - must match input codec format
121 "-acodec",
122 pcm_format.content_type.name.lower(),
123 "-ac",
124 str(pcm_format.channels),
125 "-ar",
126 str(pcm_format.sample_rate),
127 "-channel_layout",
128 "mono" if pcm_format.channels == 1 else "stereo",
129 "-f",
130 pcm_format.content_type.value,
131 "-",
132 ]
133 )
134 self.logger.log(VERBOSE_LOG_LEVEL, "FFmpeg command args: %s", " ".join(args))
135
136 try:
137 # Execute the enhanced smart fade with full buffer
138 returncode, raw_crossfade_output, stderr = await communicate(args, fade_in_part)
139
140 expected_min_output = (
141 len(fade_out_part) + len(fade_in_part) - int(pcm_format.pcm_sample_size * 10)
142 ) # rough minimum: both inputs minus ~10s overlap
143 self.logger.debug(
144 "FFmpeg smartfade result: returncode=%d%s, "
145 "output=%.2fs (%d bytes), fadeout_input=%.2fs, fadein_input=%.2fs%s, "
146 "stderr=%s",
147 returncode,
148 " *** NONZERO - crossfade likely FAILED or produced partial output!"
149 if returncode != 0
150 else "",
151 len(raw_crossfade_output) / pcm_format.pcm_sample_size
152 if raw_crossfade_output
153 else 0,
154 len(raw_crossfade_output) if raw_crossfade_output else 0,
155 len(fade_out_part) / pcm_format.pcm_sample_size,
156 len(fade_in_part) / pcm_format.pcm_sample_size,
157 f" *** OUTPUT SUSPICIOUSLY SMALL (expected >={expected_min_output} bytes)"
158 if raw_crossfade_output and len(raw_crossfade_output) < expected_min_output
159 else "",
160 stderr.decode().strip() if stderr else "(empty)",
161 )
162
163 if raw_crossfade_output:
164 return raw_crossfade_output
165 stderr_msg = stderr.decode() if stderr else "(no stderr output)"
166 raise RuntimeError(f"Smart crossfade failed. FFmpeg stderr: {stderr_msg}")
167 finally:
168 # Always cleanup temp file, even if ffmpeg fails
169 await remove_file(fadeout_filename)
170
171 def __repr__(self) -> str:
172 """Return string representation of SmartFade showing the filter chain."""
173 if not self.filters:
174 return f"<{self.__class__.__name__}: 0 filters>"
175
176 chain = " â ".join(repr(f) for f in self.filters)
177 return f"<{self.__class__.__name__}: {len(self.filters)} filters> {chain}"
178
179
180class SmartCrossFade(SmartFade):
181 """Smart fades class that implements a Smart Fade mode."""
182
183 # Only apply time stretching if BPM difference is < this %
184 time_stretch_bpm_percentage_threshold: float = 5.0
185
186 def __init__(
187 self,
188 logger: logging.Logger,
189 fade_out_analysis: SmartFadesAnalysis,
190 fade_in_analysis: SmartFadesAnalysis,
191 ) -> None:
192 """Initialize SmartFades with analysis data.
193
194 Args:
195 fade_out_analysis: Analysis data for the outgoing track
196 fade_in_analysis: Analysis data for the incoming track
197 logger: Optional logger for debug output
198 """
199 self.fade_out_analysis = fade_out_analysis
200 self.fade_in_analysis = fade_in_analysis
201 super().__init__(logger)
202
203 def _build(self) -> None:
204 """Build the smart fades filter chain."""
205 # Calculate tempo factor for time stretching
206 bpm_ratio = self.fade_in_analysis.bpm / self.fade_out_analysis.bpm
207 bpm_diff_percent = abs(1.0 - bpm_ratio) * 100
208
209 # Extrapolate downbeats for better bar calculation
210 self.extrapolated_fadeout_downbeats = extrapolate_downbeats(
211 self.fade_out_analysis.downbeats,
212 tempo_factor=1.0,
213 bpm=self.fade_out_analysis.bpm,
214 )
215
216 # Additional verbose logging to debug rare failures
217 self.logger.log(
218 VERBOSE_LOG_LEVEL,
219 "SmartCrossFade build: fade_out: %s, fade_in: %s",
220 self.fade_out_analysis,
221 self.fade_in_analysis,
222 )
223
224 # Calculate optimal crossfade bars that fit in available buffer
225 crossfade_bars = self._calculate_optimal_crossfade_bars()
226
227 # Calculate beat positions for the selected bar count
228 fadein_start_pos = self._calculate_optimal_fade_timing(crossfade_bars)
229
230 # Calculate initial crossfade duration (may be adjusted later for downbeat alignment)
231 crossfade_duration = self._calculate_crossfade_duration(crossfade_bars=crossfade_bars)
232
233 # Add time stretch filter if needed
234 if (
235 0.1 < bpm_diff_percent <= self.time_stretch_bpm_percentage_threshold
236 and crossfade_bars > 4
237 ):
238 self.filters.append(TimeStretchFilter(logger=self.logger, stretch_ratio=bpm_ratio))
239 # Re-extrapolate downbeats with actual tempo factor for time-stretched audio
240 self.extrapolated_fadeout_downbeats = extrapolate_downbeats(
241 self.fade_out_analysis.downbeats,
242 tempo_factor=bpm_ratio,
243 bpm=self.fade_out_analysis.bpm,
244 )
245
246 # Check if we would have enough audio after beat alignment for the crossfade
247 if fadein_start_pos:
248 required = fadein_start_pos + crossfade_duration
249 self.logger.debug(
250 "Trim validation: fadein_start=%.2fs + xfade=%.2fs"
251 " = %.2fs needed. Checked against constant=%ds"
252 " (pass=%s). NOTE: if actual fade_in buffer is"
253 " shorter than %ds after silence stripping,"
254 " FFmpeg acrossfade WILL fail (only %.2fs would"
255 " remain, need %.2fs)",
256 fadein_start_pos,
257 crossfade_duration,
258 required,
259 SMART_CROSSFADE_DURATION,
260 required <= SMART_CROSSFADE_DURATION,
261 SMART_CROSSFADE_DURATION,
262 SMART_CROSSFADE_DURATION - required,
263 crossfade_duration,
264 )
265 if fadein_start_pos and fadein_start_pos + crossfade_duration <= SMART_CROSSFADE_DURATION:
266 self.filters.append(TrimFilter(logger=self.logger, fadein_start_pos=fadein_start_pos))
267 else:
268 self.logger.log(
269 VERBOSE_LOG_LEVEL,
270 "Skipping beat alignment: not enough audio after trim (%.1fs + %.1fs > %.1fs)",
271 fadein_start_pos,
272 crossfade_duration,
273 SMART_CROSSFADE_DURATION,
274 )
275
276 # Adjust crossfade duration to align with outgoing track's downbeats
277 crossfade_duration = self._adjust_crossfade_to_downbeats(
278 crossfade_duration=crossfade_duration,
279 fadein_start_pos=fadein_start_pos,
280 )
281
282 # 90 BPM -> 1500Hz, 140 BPM -> 2500Hz
283 avg_bpm = (self.fade_out_analysis.bpm + self.fade_in_analysis.bpm) / 2
284 crossover_freq = int(np.clip(1500 + (avg_bpm - 90) * 20, 1500, 2500))
285
286 # Adjust for BPM mismatch
287 if abs(bpm_ratio - 1.0) > 0.3:
288 crossover_freq = int(crossover_freq * 0.85)
289
290 # For shorter fades, use exp/exp curves to avoid abruptness
291 if crossfade_bars < 8:
292 fadeout_curve = "exponential"
293 fadein_curve = "exponential"
294 # For long fades, use log/linear curves
295 else:
296 # Use logarithmic curve to give the next track more space
297 fadeout_curve = "logarithmic"
298 # Use linear curve for transition, predictable and not too abrupt
299 fadein_curve = "linear"
300
301 # Create lowpass filter on the outgoing track (unfiltered â low-pass)
302 # Extended lowpass effect to gradually remove bass frequencies
303 fadeout_eq_duration = min(max(crossfade_duration * 2.5, 8.0), SMART_CROSSFADE_DURATION)
304 # The crossfade always happens at the END of the buffer
305 fadeout_eq_start = max(0, SMART_CROSSFADE_DURATION - fadeout_eq_duration)
306 fadeout_sweep = FrequencySweepFilter(
307 logger=self.logger,
308 sweep_type="lowpass",
309 target_freq=crossover_freq,
310 duration=fadeout_eq_duration,
311 start_time=fadeout_eq_start,
312 sweep_direction="fade_in",
313 poles=1,
314 curve_type=fadeout_curve,
315 stream_type="fadeout",
316 )
317 self.filters.append(fadeout_sweep)
318
319 # Create high pass filter on the incoming track (high-pass â unfiltered)
320 # Quicker highpass removal to avoid lingering vocals after crossfade
321 fadein_eq_duration = crossfade_duration / 1.5
322 fadein_sweep = FrequencySweepFilter(
323 logger=self.logger,
324 sweep_type="highpass",
325 target_freq=crossover_freq,
326 duration=fadein_eq_duration,
327 start_time=0,
328 sweep_direction="fade_out",
329 poles=1,
330 curve_type=fadein_curve,
331 stream_type="fadein",
332 )
333 self.filters.append(fadein_sweep)
334
335 # Add final crossfade filter
336 crossfade_filter = CrossfadeFilter(
337 logger=self.logger, crossfade_duration=crossfade_duration
338 )
339 self.filters.append(crossfade_filter)
340
341 def _calculate_crossfade_duration(self, crossfade_bars: int) -> float:
342 """Calculate final crossfade duration based on musical bars and BPM."""
343 # Calculate crossfade duration based on incoming track's BPM
344 beats_per_bar = 4
345 seconds_per_beat = 60.0 / self.fade_in_analysis.bpm
346 musical_duration = crossfade_bars * beats_per_bar * seconds_per_beat
347
348 # Apply buffer constraint
349 actual_duration = min(musical_duration, SMART_CROSSFADE_DURATION)
350
351 # Log if we had to constrain the duration
352 if musical_duration > SMART_CROSSFADE_DURATION:
353 self.logger.log(
354 VERBOSE_LOG_LEVEL,
355 "Constraining crossfade duration from %.1fs to %.1fs (buffer limit)",
356 musical_duration,
357 actual_duration,
358 )
359
360 return actual_duration
361
362 def _calculate_optimal_crossfade_bars(self) -> int:
363 """Calculate optimal crossfade bars that fit in available buffer."""
364 bpm_in = self.fade_in_analysis.bpm
365 bpm_out = self.fade_out_analysis.bpm
366 bpm_diff_percent = abs(1.0 - bpm_in / bpm_out) * 100
367
368 # Calculate ideal bars based on BPM compatibility
369 ideal_bars = 10 if bpm_diff_percent <= self.time_stretch_bpm_percentage_threshold else 6
370
371 # Reduce bars until it fits in the fadein buffer
372 for bars in [ideal_bars, 8, 6, 4, 2, 1]:
373 if bars > ideal_bars:
374 continue
375
376 fadein_start_pos = self._calculate_optimal_fade_timing(bars)
377 if fadein_start_pos is None:
378 continue
379
380 # Calculate what the duration would be
381 test_duration = self._calculate_crossfade_duration(crossfade_bars=bars)
382
383 # Check if it fits in fadein buffer
384 fadein_buffer = SMART_CROSSFADE_DURATION - fadein_start_pos
385 if test_duration <= fadein_buffer:
386 if bars < ideal_bars:
387 self.logger.log(
388 VERBOSE_LOG_LEVEL,
389 "Reduced crossfade from %d to %d bars (fadein buffer=%.1fs, needed=%.1fs)",
390 ideal_bars,
391 bars,
392 fadein_buffer,
393 test_duration,
394 )
395 return bars
396
397 # Fall back to 1 bar if nothing else fits
398 return 1
399
400 def _calculate_optimal_fade_timing(self, crossfade_bars: int) -> float | None:
401 """Calculate beat positions for alignment."""
402 beats_per_bar = 4
403
404 def calculate_beat_positions(
405 fade_out_beats: npt.NDArray[np.float64],
406 fade_in_beats: npt.NDArray[np.float64],
407 num_beats: int,
408 ) -> float | None:
409 """Calculate start positions from beat arrays."""
410 if len(fade_out_beats) < num_beats or len(fade_in_beats) < num_beats:
411 return None
412
413 fade_in_slice = fade_in_beats[:num_beats]
414 return float(fade_in_slice[0])
415
416 # Try downbeats first for most musical timing
417 downbeat_positions = calculate_beat_positions(
418 self.extrapolated_fadeout_downbeats, self.fade_in_analysis.downbeats, crossfade_bars
419 )
420 if downbeat_positions:
421 return downbeat_positions
422
423 # Try regular beats if downbeats insufficient
424 required_beats = crossfade_bars * beats_per_bar
425 beat_positions = calculate_beat_positions(
426 self.fade_out_analysis.beats, self.fade_in_analysis.beats, required_beats
427 )
428 if beat_positions:
429 return beat_positions
430
431 # Fallback: No beat alignment possible
432 self.logger.log(VERBOSE_LOG_LEVEL, "No beat alignment possible (insufficient beats)")
433 return None
434
435 def _adjust_crossfade_to_downbeats(
436 self,
437 crossfade_duration: float,
438 fadein_start_pos: float | None,
439 ) -> float:
440 """Adjust crossfade duration to align with outgoing track's downbeats."""
441 # If we don't have downbeats or beat alignment is disabled, return original duration
442 if len(self.extrapolated_fadeout_downbeats) == 0 or fadein_start_pos is None:
443 return crossfade_duration
444
445 # Calculate where the crossfade would start in the buffer
446 ideal_start_pos = SMART_CROSSFADE_DURATION - crossfade_duration
447
448 # Debug logging
449 self.logger.log(
450 VERBOSE_LOG_LEVEL,
451 "Downbeat adjustment - ideal_start=%.2fs (buffer=%.1fs - crossfade=%.2fs), "
452 "fadein_start=%.2fs",
453 ideal_start_pos,
454 SMART_CROSSFADE_DURATION,
455 crossfade_duration,
456 fadein_start_pos,
457 )
458
459 # Find the closest downbeats (earlier and later)
460 earlier_downbeat = None
461 later_downbeat = None
462
463 for downbeat in self.extrapolated_fadeout_downbeats:
464 if downbeat <= ideal_start_pos:
465 earlier_downbeat = downbeat
466 elif downbeat > ideal_start_pos and later_downbeat is None:
467 later_downbeat = downbeat
468 break
469
470 # Try earlier downbeat first (longer crossfade)
471 if earlier_downbeat is not None:
472 adjusted_duration = float(SMART_CROSSFADE_DURATION - earlier_downbeat)
473 if fadein_start_pos + adjusted_duration <= SMART_CROSSFADE_DURATION:
474 if abs(adjusted_duration - crossfade_duration) > 0.1:
475 self.logger.log(
476 VERBOSE_LOG_LEVEL,
477 "Adjusted crossfade duration from %.2fs to %.2fs to align with "
478 "downbeat at %.2fs (earlier)",
479 crossfade_duration,
480 adjusted_duration,
481 earlier_downbeat,
482 )
483 return adjusted_duration
484
485 # Try later downbeat (shorter crossfade)
486 if later_downbeat is not None:
487 adjusted_duration = float(SMART_CROSSFADE_DURATION - later_downbeat)
488 if fadein_start_pos + adjusted_duration <= SMART_CROSSFADE_DURATION:
489 if abs(adjusted_duration - crossfade_duration) > 0.1:
490 self.logger.log(
491 VERBOSE_LOG_LEVEL,
492 "Adjusted crossfade duration from %.2fs to %.2fs to align with "
493 "downbeat at %.2fs (later)",
494 crossfade_duration,
495 adjusted_duration,
496 later_downbeat,
497 )
498 return adjusted_duration
499
500 # If no suitable downbeat found, return original duration
501 self.logger.log(
502 VERBOSE_LOG_LEVEL,
503 "Could not adjust crossfade duration to downbeats, using original %.2fs",
504 crossfade_duration,
505 )
506 return crossfade_duration
507
508
509class StandardCrossFade(SmartFade):
510 """Standard crossfade class that implements a standard crossfade mode."""
511
512 def __init__(self, logger: logging.Logger, crossfade_duration: float = 10.0) -> None:
513 """Initialize StandardCrossFade with crossfade duration."""
514 self.crossfade_duration = crossfade_duration
515 super().__init__(logger)
516
517 def _build(self) -> None:
518 """Build the standard crossfade filter chain."""
519 self.filters = [
520 CrossfadeFilter(logger=self.logger, crossfade_duration=self.crossfade_duration),
521 ]
522
523 async def apply(
524 self, fade_out_part: bytes, fade_in_part: bytes, pcm_format: AudioFormat
525 ) -> bytes:
526 """Apply the standard crossfade to the given PCM audio parts."""
527 # We need to override the default apply here, since standard crossfade only needs to be
528 # applied to the overlapping parts, not the full buffers.
529 crossfade_size = int(pcm_format.pcm_sample_size * self.crossfade_duration)
530 # Pre-crossfade: outgoing track minus the crossfaded portion
531 pre_crossfade = fade_out_part[:-crossfade_size]
532 # Post-crossfade: incoming track minus the crossfaded portion
533 post_crossfade = fade_in_part[crossfade_size:]
534 # Adjust portions to exact crossfade size
535 adjusted_fade_in_part = fade_in_part[:crossfade_size]
536 adjusted_fade_out_part = fade_out_part[-crossfade_size:]
537 # Adjust the duration to match actual sizes
538 self.crossfade_duration = min(
539 len(adjusted_fade_in_part) / pcm_format.pcm_sample_size,
540 len(adjusted_fade_out_part) / pcm_format.pcm_sample_size,
541 )
542 # Crossfaded portion: user's configured duration
543 crossfaded_section = await super().apply(
544 adjusted_fade_out_part, adjusted_fade_in_part, pcm_format
545 )
546 # Full result: everything concatenated
547 return pre_crossfade + crossfaded_section + post_crossfade
548
549
550# HELPER METHODS
551def get_bpm_diff_percentage(bpm1: float, bpm2: float) -> float:
552 """Calculate BPM difference percentage between two BPM values."""
553 return abs(1.0 - bpm1 / bpm2) * 100
554
555
556def extrapolate_downbeats(
557 downbeats: npt.NDArray[np.float64],
558 tempo_factor: float,
559 buffer_size: float = SMART_CROSSFADE_DURATION,
560 bpm: float | None = None,
561) -> npt.NDArray[np.float64]:
562 """Extrapolate downbeats based on actual intervals when detection is incomplete.
563
564 This is needed when we want to perform beat alignment in an 'atmospheric' outro
565 that does not have any detected downbeats.
566
567 Args:
568 downbeats: Array of detected downbeat positions in seconds
569 tempo_factor: Tempo adjustment factor for time stretching
570 buffer_size: Maximum buffer size in seconds
571 bpm: Optional BPM for validation when extrapolating with only 2 downbeats
572 """
573 # Handle case with exactly 2 downbeats (with BPM validation)
574 if len(downbeats) == 2 and bpm is not None:
575 interval = float(downbeats[1] - downbeats[0])
576
577 # Expected interval for this BPM (assuming 4/4 time signature)
578 expected_interval = (60.0 / bpm) * 4
579
580 # Only extrapolate if interval matches BPM within 15% tolerance
581 if abs(interval - expected_interval) / expected_interval < 0.15:
582 # Adjust detected downbeats for time stretching first
583 adjusted_downbeats = downbeats / tempo_factor
584 last_downbeat = adjusted_downbeats[-1]
585
586 # If the last downbeat is close to the buffer end, no extrapolation needed
587 if last_downbeat >= buffer_size - 5:
588 return adjusted_downbeats
589
590 # Adjust the interval for time stretching
591 adjusted_interval = interval / tempo_factor
592
593 # Extrapolate forward from last adjusted downbeat using adjusted interval
594 extrapolated = []
595 current_pos = last_downbeat + adjusted_interval
596 max_extrapolation_distance = 125.0 # Don't extrapolate more than 25s
597
598 while (
599 current_pos < buffer_size
600 and (current_pos - last_downbeat) <= max_extrapolation_distance
601 ):
602 extrapolated.append(current_pos)
603 current_pos += adjusted_interval
604
605 if extrapolated:
606 # Combine adjusted detected downbeats and extrapolated downbeats
607 return np.concatenate([adjusted_downbeats, np.array(extrapolated)])
608
609 return adjusted_downbeats
610 # else: interval doesn't match BPM, fall through to return original
611
612 if len(downbeats) < 2:
613 # Need at least 2 downbeats to extrapolate
614 return downbeats / tempo_factor
615
616 # Adjust detected downbeats for time stretching first
617 adjusted_downbeats = downbeats / tempo_factor
618 last_downbeat = adjusted_downbeats[-1]
619
620 # If the last downbeat is close to the buffer end, no extrapolation needed
621 if last_downbeat >= buffer_size - 5:
622 return adjusted_downbeats
623
624 # Calculate intervals from ORIGINAL downbeats (before time stretching)
625 intervals = np.diff(downbeats)
626 median_interval = float(np.median(intervals))
627 std_interval = float(np.std(intervals))
628
629 # Only extrapolate if intervals are consistent (low standard deviation)
630 if std_interval > 0.2:
631 return adjusted_downbeats
632
633 # Adjust the interval for time stretching
634 # When slowing down (tempo_factor < 1.0), intervals get longer
635 adjusted_interval = median_interval / tempo_factor
636
637 # Extrapolate forward from last adjusted downbeat using adjusted interval
638 extrapolated = []
639 current_pos = last_downbeat + adjusted_interval
640 max_extrapolation_distance = 25.0 # Don't extrapolate more than 25s
641
642 while current_pos < buffer_size and (current_pos - last_downbeat) <= max_extrapolation_distance:
643 extrapolated.append(current_pos)
644 current_pos += adjusted_interval
645
646 if extrapolated:
647 # Combine adjusted detected downbeats and extrapolated downbeats
648 return np.concatenate([adjusted_downbeats, np.array(extrapolated)])
649
650 return adjusted_downbeats
651