music-assistant-server

3.6 KBPY
streaming.py
3.6 KB99 lines • python
1"""Streaming functionality using librespot for Spotify provider."""
2
3from __future__ import annotations
4
5import asyncio
6from collections import deque
7from collections.abc import AsyncGenerator
8from typing import TYPE_CHECKING
9
10from music_assistant_models.enums import MediaType
11from music_assistant_models.errors import AudioError
12
13from music_assistant.constants import VERBOSE_LOG_LEVEL
14from music_assistant.helpers.process import AsyncProcess
15
16if TYPE_CHECKING:
17    from music_assistant_models.streamdetails import StreamDetails
18
19    from .provider import SpotifyProvider
20
21
22class LibrespotStreamer:
23    """Handles streaming functionality using librespot."""
24
25    def __init__(self, provider: SpotifyProvider) -> None:
26        """Initialize the LibrespotStreamer."""
27        self.provider = provider
28
29    async def get_audio_stream(
30        self, streamdetails: StreamDetails, seek_position: int = 0
31    ) -> AsyncGenerator[bytes, None]:
32        """Return the audio stream for the provider item."""
33        # Regular track/episode streaming - audiobooks are handled in the provider
34        media_type = "episode" if streamdetails.media_type == MediaType.PODCAST_EPISODE else "track"
35        spotify_uri = f"spotify://{media_type}:{streamdetails.item_id}"
36        async for chunk in self.stream_spotify_uri(spotify_uri, seek_position):
37            yield chunk
38
39    async def stream_spotify_uri(
40        self, spotify_uri: str, seek_position: int = 0
41    ) -> AsyncGenerator[bytes, None]:
42        """Return the audio stream for the Spotify URI."""
43        self.provider.logger.log(
44            VERBOSE_LOG_LEVEL, f"Start streaming {spotify_uri} using librespot"
45        )
46        # Validate that librespot binary is available
47        if not self.provider._librespot_bin:
48            raise AudioError("Librespot binary not available")
49
50        args = [
51            self.provider._librespot_bin,
52            "--cache",
53            self.provider.cache_dir,
54            "--disable-audio-cache",
55            "--passthrough",
56            "--bitrate",
57            "320",
58            "--backend",
59            "pipe",
60            "--single-track",
61            spotify_uri,
62            "--disable-discovery",
63            "--dither",
64            "none",
65        ]
66        if seek_position:
67            args += ["--start-position", str(int(seek_position))]
68
69        async with AsyncProcess(
70            args,
71            stdout=True,
72            stderr=True,
73            name="librespot",
74        ) as librespot_proc:
75            log_history: deque[str] = deque(maxlen=10)
76            logger = self.provider.logger
77
78            async def log_librespot_output() -> None:
79                """Log librespot output if verbose logging is enabled."""
80                async for line in librespot_proc.iter_stderr():
81                    log_history.append(line)
82                    if "ERROR" in line or "WARNING" in line:
83                        logger.warning("[librespot] %s", line)
84                        if "Unable to read audio file" in line:
85                            # if this happens, we should stop the process to avoid hanging
86                            await librespot_proc.close()
87                    else:
88                        logger.log(VERBOSE_LOG_LEVEL, "[librespot] %s", line)
89
90            librespot_proc.attach_stderr_reader(asyncio.create_task(log_librespot_output()))
91            # yield from librespot's stdout
92            async for chunk in librespot_proc.iter_chunked():
93                yield chunk
94
95            if librespot_proc.returncode != 0:
96                raise AudioError(
97                    f"Librespot exited with code {librespot_proc.returncode} for {spotify_uri}"
98                )
99