music-assistant-server

17.5 KBPY
__init__.py
17.5 KB420 lines • python
1"""Bandcamp music provider support for MusicAssistant."""
2
3import asyncio
4from collections.abc import AsyncGenerator
5from contextlib import suppress
6from typing import cast
7
8from bandcamp_async_api import (
9    BandcampAPIClient,
10    BandcampAPIError,
11    BandcampMustBeLoggedInError,
12    BandcampNotFoundError,
13    BandcampRateLimitError,
14    SearchResultAlbum,
15    SearchResultArtist,
16    SearchResultTrack,
17)
18from bandcamp_async_api.models import CollectionType
19from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
20from music_assistant_models.enums import ConfigEntryType, MediaType, ProviderFeature, StreamType
21from music_assistant_models.errors import (
22    InvalidDataError,
23    LoginFailed,
24    MediaNotFoundError,
25    ResourceTemporarilyUnavailable,
26)
27from music_assistant_models.media_items import Album, Artist, AudioFormat, SearchResults, Track
28from music_assistant_models.provider import ProviderManifest
29from music_assistant_models.streamdetails import StreamDetails
30
31from music_assistant.controllers.cache import use_cache
32from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
33from music_assistant.mass import MusicAssistant
34from music_assistant.models import ProviderInstanceType
35from music_assistant.models.music_provider import MusicProvider
36
37from .converters import BandcampConverters
38
39SUPPORTED_FEATURES = {
40    ProviderFeature.LIBRARY_ARTISTS,
41    ProviderFeature.LIBRARY_ALBUMS,
42    ProviderFeature.LIBRARY_TRACKS,
43    ProviderFeature.SEARCH,
44    ProviderFeature.ARTIST_ALBUMS,
45    ProviderFeature.ARTIST_TOPTRACKS,
46}
47
48CONF_IDENTITY = "identity"
49CONF_TOP_TRACKS_LIMIT = "top_tracks_limit"
50DEFAULT_TOP_TRACKS_LIMIT = 50
51CACHE = 3600 * 24 * 30  # Cache for 30 days
52
53
54async def setup(
55    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
56) -> ProviderInstanceType:
57    """Initialize provider(instance) with given configuration."""
58    return BandcampProvider(mass, manifest, config, SUPPORTED_FEATURES)
59
60
61# noinspection PyTypeHints,PyUnusedLocal
62async def get_config_entries(
63    mass: MusicAssistant,  # noqa: ARG001
64    instance_id: str | None = None,  # noqa: ARG001
65    action: str | None = None,  # noqa: ARG001
66    values: dict[str, ConfigValueType] | None = None,
67) -> tuple[ConfigEntry, ...]:
68    """Return Config entries to setup this provider."""
69    return (
70        ConfigEntry(
71            key=CONF_IDENTITY,
72            type=ConfigEntryType.SECURE_STRING,
73            label="Identity token",
74            required=False,
75            description="Identity token from Bandcamp cookies for account collection access."
76            " Log in https://bandcamp.com and extract browser cookie named 'identity'.",
77            value=values.get(CONF_IDENTITY) if values else None,
78        ),
79        ConfigEntry(
80            key=CONF_TOP_TRACKS_LIMIT,
81            type=ConfigEntryType.INTEGER,
82            label="Artist Top Tracks search limit",
83            required=False,
84            description="Search limit while getting artist top tracks.",
85            value=values.get(CONF_TOP_TRACKS_LIMIT) if values else DEFAULT_TOP_TRACKS_LIMIT,
86            default_value=DEFAULT_TOP_TRACKS_LIMIT,
87            advanced=True,
88        ),
89    )
90
91
92def split_id(id_: str) -> tuple[int, int | None, int | None]:
93    """Return (artist_id, album_id, track_id). Missing parts are returned as 0."""
94    parts = id_.split("-")
95    part_0 = int(parts[0])
96    part_1 = int(parts[1]) if len(parts) > 1 else 0
97    part_2 = int(parts[2]) if len(parts) > 2 else 0
98    return part_0, part_1, part_2
99
100
101class BandcampProvider(MusicProvider):
102    """Bandcamp provider support."""
103
104    _client: BandcampAPIClient
105    _converters: BandcampConverters
106    throttler: ThrottlerManager = ThrottlerManager(
107        rate_limit=50,  # requests per period seconds
108        period=10,
109        initial_backoff=3,  # Bandcamp responds with Retry-After 3
110        retry_attempts=10,
111    )
112    top_tracks_limit: int
113
114    async def handle_async_init(self) -> None:
115        """Handle async init of the Bandcamp provider."""
116        identity = self.config.get_value(CONF_IDENTITY)
117        self.top_tracks_limit = cast(
118            "int", self.config.get_value(CONF_TOP_TRACKS_LIMIT, DEFAULT_TOP_TRACKS_LIMIT)
119        )
120        self._client = BandcampAPIClient(
121            session=self.mass.http_session,
122            identity_token=identity,
123            default_retry_after=3,  # Bandcamp responds with Retry-After 3
124        )
125        self._converters = BandcampConverters(self.domain, self.instance_id)
126
127    @property
128    def is_streaming_provider(self) -> bool:
129        """Return True if the provider is a streaming provider."""
130        return True
131
132    @use_cache(CACHE)
133    @throttle_with_retries
134    async def search(
135        self, search_query: str, media_types: list[MediaType], limit: int = 50
136    ) -> SearchResults:
137        """Perform search on music provider."""
138        results = SearchResults()
139        if not self._client.identity:
140            return results
141
142        if not media_types:
143            return results
144
145        try:
146            search_results = await self._client.search(search_query)
147        except BandcampNotFoundError as error:
148            raise MediaNotFoundError("No results for Bandcamp search") from error
149        except BandcampRateLimitError as error:
150            raise ResourceTemporarilyUnavailable(
151                "Bandcamp rate limit reached", backoff_time=error.retry_after
152            ) from error
153        except BandcampAPIError as error:
154            raise InvalidDataError("Unexpected error during Bandcamp search") from error
155
156        for item in search_results[:limit]:
157            try:
158                if isinstance(item, SearchResultTrack) and MediaType.TRACK in media_types:
159                    results.tracks = [*results.tracks, self._converters.track_from_search(item)]
160                elif isinstance(item, SearchResultAlbum) and MediaType.ALBUM in media_types:
161                    results.albums = [*results.albums, self._converters.album_from_search(item)]
162                elif isinstance(item, SearchResultArtist) and MediaType.ARTIST in media_types:
163                    results.artists = [*results.artists, self._converters.artist_from_search(item)]
164            except BandcampAPIError as error:
165                self.logger.warning("Failed to convert search result item: %s", error)
166                continue
167
168        return results
169
170    async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
171        """Retrieve library artists from Bandcamp."""
172        if not self._client.identity:  # library requires identity
173            return
174
175        try:
176            async with self.throttler.acquire():  # AsyncGenerator method cannot be decorated
177                collection = await self._client.get_collection_items(CollectionType.COLLECTION)
178            band_ids = set()
179            for item in collection.items:
180                if item.item_type == "band":
181                    band_ids.add(item.item_id)
182                elif item.item_type == "album":
183                    band_ids.add(item.band_id)
184
185            for band_id in band_ids:
186                yield await self.get_artist(band_id)
187                await asyncio.sleep(0)  # Yield control to avoid blocking
188
189        except BandcampMustBeLoggedInError as error:
190            self.logger.error("Error getting Bandcamp library artists: Wrong identity token.")
191            raise LoginFailed("Wrong Bandcamp identity token.") from error
192        except BandcampNotFoundError as error:
193            raise MediaNotFoundError("Bandcamp library artists returned no results") from error
194        except BandcampRateLimitError as error:
195            raise ResourceTemporarilyUnavailable(
196                "Bandcamp rate limit reached", backoff_time=error.retry_after
197            ) from error
198        except BandcampAPIError as error:
199            raise MediaNotFoundError("Failed to get library artists") from error
200
201    async def get_library_albums(self) -> AsyncGenerator[Album, None]:
202        """Retrieve library albums from Bandcamp."""
203        if not self._client.identity:  # library requires identity
204            return
205
206        try:
207            async with self.throttler.acquire():  # AsyncGenerator method cannot be decorated
208                api_collection = await self._client.get_collection_items(CollectionType.COLLECTION)
209            for item in api_collection.items:
210                if item.item_type == "album":
211                    yield await self.get_album(f"{item.band_id}-{item.item_id}")
212                    await asyncio.sleep(0)  # Yield control to avoid blocking
213        except BandcampMustBeLoggedInError as error:
214            self.logger.error("Error getting Bandcamp library albums: Wrong identity token.")
215            raise LoginFailed("Wrong Bandcamp identity token.") from error
216        except BandcampNotFoundError as error:
217            raise MediaNotFoundError("Bandcamp library albums returned no results") from error
218        except BandcampRateLimitError as error:
219            raise ResourceTemporarilyUnavailable(
220                "Bandcamp rate limit reached", backoff_time=error.retry_after
221            ) from error
222        except BandcampAPIError as error:
223            raise MediaNotFoundError("Failed to get library albums") from error
224
225    async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
226        """Retrieve library tracks from Bandcamp."""
227        if not self._client.identity:  # library requires identity
228            return
229
230        async for album in self.get_library_albums():
231            tracks = await self.get_album_tracks(album.item_id)
232            for track in tracks:
233                yield track
234                await asyncio.sleep(0)  # Yield control to avoid blocking
235
236    @use_cache(CACHE)
237    @throttle_with_retries
238    async def get_artist(self, prov_artist_id: str | int) -> Artist:
239        """Get full artist details by id."""
240        try:
241            api_artist = await self._client.get_artist(prov_artist_id)
242            return self._converters.artist_from_api(api_artist)
243        except BandcampNotFoundError as error:
244            raise MediaNotFoundError(
245                f"Bandcamp artist {prov_artist_id} search returned no results"
246            ) from error
247        except BandcampRateLimitError as error:
248            raise ResourceTemporarilyUnavailable(
249                "Bandcamp rate limit reached", backoff_time=error.retry_after
250            ) from error
251        except BandcampAPIError as error:
252            raise MediaNotFoundError(f"Failed to get artist {prov_artist_id}") from error
253
254    @use_cache(CACHE)
255    @throttle_with_retries
256    async def get_album(self, prov_album_id: str) -> Album:
257        """Get full album details by id."""
258        artist_id, album_id, _ = split_id(prov_album_id)
259        try:
260            api_album = await self._client.get_album(artist_id, album_id)
261            return self._converters.album_from_api(api_album)
262        except BandcampNotFoundError as error:
263            raise MediaNotFoundError(
264                f"Bandcamp album {prov_album_id} search returned no results"
265            ) from error
266        except BandcampRateLimitError as error:
267            raise ResourceTemporarilyUnavailable(
268                "Bandcamp rate limit reached", backoff_time=error.retry_after
269            ) from error
270        except BandcampAPIError as error:
271            raise MediaNotFoundError(f"Failed to get album {prov_album_id}") from error
272
273    @use_cache(CACHE)
274    @throttle_with_retries
275    async def get_track(self, prov_track_id: str) -> Track:
276        """Get full track details by id."""
277        artist_id, album_id, track_id = split_id(prov_track_id)
278        if track_id is None:  # artist_id-track_id
279            album_id, track_id = None, album_id
280
281        try:
282            if all((artist_id, album_id, track_id)):
283                api_album = await self._client.get_album(artist_id, album_id)
284                api_track = next((_ for _ in api_album.tracks if _.id == track_id), None)
285                return self._converters.track_from_api(
286                    track=api_track,
287                    album_id=api_album.id,
288                    album_name=api_album.title,
289                    album_image_url=api_album.art_url,
290                )
291            if not album_id:
292                api_track = await self._client.get_track(artist_id, track_id)
293                return self._converters.track_from_api(
294                    track=api_track,
295                    album_id=api_track.album.id if api_track.album else None,
296                    album_name=api_track.album.title if api_track.album else "",
297                    album_image_url=api_track.album.art_url if api_track.album else "",
298                )
299            raise MediaNotFoundError(f"Track {prov_track_id} not found on Bandcamp")
300        except BandcampNotFoundError as error:
301            raise MediaNotFoundError(
302                f"Bandcamp track {prov_track_id} search returned no results"
303            ) from error
304        except BandcampRateLimitError as error:
305            raise ResourceTemporarilyUnavailable(
306                "Bandcamp rate limit reached", backoff_time=error.retry_after
307            ) from error
308        except BandcampAPIError as error:
309            raise MediaNotFoundError(f"Failed to get track {prov_track_id}") from error
310
311    @use_cache(CACHE)
312    @throttle_with_retries
313    async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
314        """Get all tracks in an album."""
315        artist_id, album_id, _ = split_id(prov_album_id)
316        try:
317            api_album = await self._client.get_album(artist_id, album_id)
318            if api_album.tracks:
319                return [
320                    self._converters.track_from_api(
321                        track=track,
322                        album_id=album_id,
323                        album_name=api_album.title,
324                        album_image_url=api_album.art_url,
325                    )
326                    for track in api_album.tracks
327                    if track.streaming_url  # Only include tracks with streaming URLs
328                ]
329
330            return []
331
332        except BandcampNotFoundError as error:
333            raise MediaNotFoundError(
334                f"Bandcamp album {prov_album_id} tracks search returned no results"
335            ) from error
336        except BandcampRateLimitError as error:
337            raise ResourceTemporarilyUnavailable(
338                "Bandcamp rate limit reached", backoff_time=error.retry_after
339            ) from error
340        except BandcampAPIError as error:
341            raise MediaNotFoundError(f"Failed to get albums tracks for {prov_album_id}") from error
342
343    @use_cache(CACHE)
344    @throttle_with_retries
345    async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
346        """Get albums by an artist."""
347        albums = []
348        try:
349            api_discography = await self._client.get_artist_discography(prov_artist_id)
350            for item in api_discography:
351                if item.get("item_type") == "album" and item.get("item_id"):
352                    album = None
353
354                    with suppress(MediaNotFoundError):
355                        album = await self.get_album(f"{item['band_id']}-{item['item_id']}")
356
357                    with suppress(MediaNotFoundError):
358                        album = album or await self.get_album(f"{prov_artist_id}-{item['item_id']}")
359
360                    if album:
361                        albums.append(album)
362
363        except BandcampNotFoundError as error:
364            raise MediaNotFoundError(
365                f"Bandcamp artist {prov_artist_id} albums search returned no results"
366            ) from error
367        except BandcampRateLimitError as error:
368            raise ResourceTemporarilyUnavailable(
369                "Bandcamp rate limit reached", backoff_time=error.retry_after
370            ) from error
371        except BandcampAPIError as error:
372            raise MediaNotFoundError(f"Failed to get albums for artist {prov_artist_id}") from error
373
374        return albums
375
376    @use_cache(CACHE)
377    @throttle_with_retries
378    async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
379        """Get top tracks of an artist."""
380        tracks: list[Track] = []
381        # get_artist_albums and get_album_tracks already handle exceptions and rate limiting
382        albums = await self.get_artist_albums(prov_artist_id)
383        albums.sort(key=lambda album: (album.year is None, album.year or 0), reverse=True)
384        for album in albums:
385            tracks.extend(await self.get_album_tracks(album.item_id))
386            if len(tracks) >= self.top_tracks_limit:
387                break
388
389        return tracks[: self.top_tracks_limit]
390
391    async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
392        """Return the content details for the given track."""
393        # get_track already handles exceptions and rate limiting
394        track_ma = await self.get_track(item_id)
395        if not track_ma.metadata.links:
396            raise MediaNotFoundError(
397                f"No streaming links found for track {item_id}. Please report this"
398            )
399
400        link = next(iter(track_ma.metadata.links))
401        if not link:
402            raise MediaNotFoundError(
403                f"No streaming URL found for track {item_id}. Please report this"
404            )
405
406        streaming_url = link.url
407        if not streaming_url:
408            raise MediaNotFoundError(f"No streaming URL found for track {item_id}: {streaming_url}")
409
410        return StreamDetails(
411            item_id=item_id,
412            provider=self.instance_id,
413            audio_format=AudioFormat(),
414            stream_type=StreamType.HTTP,
415            media_type=media_type,
416            path=streaming_url,
417            can_seek=True,
418            allow_seek=True,
419        )
420