/
/
/
1"""Streaming functionality using librespot for Spotify provider."""
2
3from __future__ import annotations
4
5import asyncio
6from collections import deque
7from collections.abc import AsyncGenerator
8from typing import TYPE_CHECKING
9
10from music_assistant_models.enums import MediaType
11from music_assistant_models.errors import AudioError
12
13from music_assistant.constants import VERBOSE_LOG_LEVEL
14from music_assistant.helpers.process import AsyncProcess
15
16if TYPE_CHECKING:
17 from music_assistant_models.streamdetails import StreamDetails
18
19 from .provider import SpotifyProvider
20
21
22class LibrespotStreamer:
23 """Handles streaming functionality using librespot."""
24
25 def __init__(self, provider: SpotifyProvider) -> None:
26 """Initialize the LibrespotStreamer."""
27 self.provider = provider
28
29 async def get_audio_stream(
30 self, streamdetails: StreamDetails, seek_position: int = 0
31 ) -> AsyncGenerator[bytes, None]:
32 """Return the audio stream for the provider item."""
33 # Regular track/episode streaming - audiobooks are handled in the provider
34 media_type = "episode" if streamdetails.media_type == MediaType.PODCAST_EPISODE else "track"
35 spotify_uri = f"spotify://{media_type}:{streamdetails.item_id}"
36 async for chunk in self.stream_spotify_uri(spotify_uri, seek_position):
37 yield chunk
38
39 async def stream_spotify_uri(
40 self, spotify_uri: str, seek_position: int = 0
41 ) -> AsyncGenerator[bytes, None]:
42 """Return the audio stream for the Spotify URI."""
43 self.provider.logger.log(
44 VERBOSE_LOG_LEVEL, f"Start streaming {spotify_uri} using librespot"
45 )
46 # Validate that librespot binary is available
47 if not self.provider._librespot_bin:
48 raise AudioError("Librespot binary not available")
49
50 args = [
51 self.provider._librespot_bin,
52 "--cache",
53 self.provider.cache_dir,
54 "--disable-audio-cache",
55 "--passthrough",
56 "--bitrate",
57 "320",
58 "--backend",
59 "pipe",
60 "--single-track",
61 spotify_uri,
62 "--disable-discovery",
63 "--dither",
64 "none",
65 ]
66 if seek_position:
67 args += ["--start-position", str(int(seek_position))]
68
69 async with AsyncProcess(
70 args,
71 stdout=True,
72 stderr=True,
73 name="librespot",
74 ) as librespot_proc:
75 log_history: deque[str] = deque(maxlen=10)
76 logger = self.provider.logger
77
78 async def log_librespot_output() -> None:
79 """Log librespot output if verbose logging is enabled."""
80 async for line in librespot_proc.iter_stderr():
81 log_history.append(line)
82 if "ERROR" in line or "WARNING" in line:
83 logger.warning("[librespot] %s", line)
84 if "Unable to read audio file" in line:
85 # if this happens, we should stop the process to avoid hanging
86 await librespot_proc.close()
87 else:
88 logger.log(VERBOSE_LOG_LEVEL, "[librespot] %s", line)
89
90 librespot_proc.attach_stderr_reader(asyncio.create_task(log_librespot_output()))
91 # yield from librespot's stdout
92 async for chunk in librespot_proc.iter_chunked():
93 yield chunk
94
95 if librespot_proc.returncode != 0:
96 raise AudioError(
97 f"Librespot exited with code {librespot_proc.returncode} for {spotify_uri}"
98 )
99