/
/
/
1"""Bandcamp music provider support for MusicAssistant."""
2
3import asyncio
4from collections.abc import AsyncGenerator
5from contextlib import suppress
6from typing import cast
7
8from bandcamp_async_api import (
9 BandcampAPIClient,
10 BandcampAPIError,
11 BandcampMustBeLoggedInError,
12 BandcampNotFoundError,
13 BandcampRateLimitError,
14 SearchResultAlbum,
15 SearchResultArtist,
16 SearchResultTrack,
17)
18from bandcamp_async_api.models import CollectionType
19from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
20from music_assistant_models.enums import ConfigEntryType, MediaType, ProviderFeature, StreamType
21from music_assistant_models.errors import (
22 InvalidDataError,
23 LoginFailed,
24 MediaNotFoundError,
25 ResourceTemporarilyUnavailable,
26)
27from music_assistant_models.media_items import Album, Artist, AudioFormat, SearchResults, Track
28from music_assistant_models.provider import ProviderManifest
29from music_assistant_models.streamdetails import StreamDetails
30
31from music_assistant.controllers.cache import use_cache
32from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
33from music_assistant.mass import MusicAssistant
34from music_assistant.models import ProviderInstanceType
35from music_assistant.models.music_provider import MusicProvider
36
37from .converters import BandcampConverters
38
39SUPPORTED_FEATURES = {
40 ProviderFeature.LIBRARY_ARTISTS,
41 ProviderFeature.LIBRARY_ALBUMS,
42 ProviderFeature.LIBRARY_TRACKS,
43 ProviderFeature.SEARCH,
44 ProviderFeature.ARTIST_ALBUMS,
45 ProviderFeature.ARTIST_TOPTRACKS,
46}
47
48CONF_IDENTITY = "identity"
49CONF_TOP_TRACKS_LIMIT = "top_tracks_limit"
50DEFAULT_TOP_TRACKS_LIMIT = 50
51CACHE = 3600 * 24 * 30 # Cache for 30 days
52
53
54async def setup(
55 mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
56) -> ProviderInstanceType:
57 """Initialize provider(instance) with given configuration."""
58 return BandcampProvider(mass, manifest, config, SUPPORTED_FEATURES)
59
60
61# noinspection PyTypeHints,PyUnusedLocal
62async def get_config_entries(
63 mass: MusicAssistant, # noqa: ARG001
64 instance_id: str | None = None, # noqa: ARG001
65 action: str | None = None, # noqa: ARG001
66 values: dict[str, ConfigValueType] | None = None,
67) -> tuple[ConfigEntry, ...]:
68 """Return Config entries to setup this provider."""
69 return (
70 ConfigEntry(
71 key=CONF_IDENTITY,
72 type=ConfigEntryType.SECURE_STRING,
73 label="Identity token",
74 required=False,
75 description="Identity token from Bandcamp cookies for account collection access."
76 " Log in https://bandcamp.com and extract browser cookie named 'identity'.",
77 value=values.get(CONF_IDENTITY) if values else None,
78 ),
79 ConfigEntry(
80 key=CONF_TOP_TRACKS_LIMIT,
81 type=ConfigEntryType.INTEGER,
82 label="Artist Top Tracks search limit",
83 required=False,
84 description="Search limit while getting artist top tracks.",
85 value=values.get(CONF_TOP_TRACKS_LIMIT) if values else DEFAULT_TOP_TRACKS_LIMIT,
86 default_value=DEFAULT_TOP_TRACKS_LIMIT,
87 advanced=True,
88 ),
89 )
90
91
92def split_id(id_: str) -> tuple[int, int | None, int | None]:
93 """Return (artist_id, album_id, track_id). Missing parts are returned as 0."""
94 parts = id_.split("-")
95 part_0 = int(parts[0])
96 part_1 = int(parts[1]) if len(parts) > 1 else 0
97 part_2 = int(parts[2]) if len(parts) > 2 else 0
98 return part_0, part_1, part_2
99
100
101class BandcampProvider(MusicProvider):
102 """Bandcamp provider support."""
103
104 _client: BandcampAPIClient
105 _converters: BandcampConverters
106 throttler: ThrottlerManager = ThrottlerManager(
107 rate_limit=50, # requests per period seconds
108 period=10,
109 initial_backoff=3, # Bandcamp responds with Retry-After 3
110 retry_attempts=10,
111 )
112 top_tracks_limit: int
113
114 async def handle_async_init(self) -> None:
115 """Handle async init of the Bandcamp provider."""
116 identity = self.config.get_value(CONF_IDENTITY)
117 self.top_tracks_limit = cast(
118 "int", self.config.get_value(CONF_TOP_TRACKS_LIMIT, DEFAULT_TOP_TRACKS_LIMIT)
119 )
120 self._client = BandcampAPIClient(
121 session=self.mass.http_session,
122 identity_token=identity,
123 default_retry_after=3, # Bandcamp responds with Retry-After 3
124 )
125 self._converters = BandcampConverters(self.domain, self.instance_id)
126
127 @property
128 def is_streaming_provider(self) -> bool:
129 """Return True if the provider is a streaming provider."""
130 return True
131
132 @use_cache(CACHE)
133 @throttle_with_retries
134 async def search(
135 self, search_query: str, media_types: list[MediaType], limit: int = 50
136 ) -> SearchResults:
137 """Perform search on music provider."""
138 results = SearchResults()
139 if not self._client.identity:
140 return results
141
142 if not media_types:
143 return results
144
145 try:
146 search_results = await self._client.search(search_query)
147 except BandcampNotFoundError as error:
148 raise MediaNotFoundError("No results for Bandcamp search") from error
149 except BandcampRateLimitError as error:
150 raise ResourceTemporarilyUnavailable(
151 "Bandcamp rate limit reached", backoff_time=error.retry_after
152 ) from error
153 except BandcampAPIError as error:
154 raise InvalidDataError("Unexpected error during Bandcamp search") from error
155
156 for item in search_results[:limit]:
157 try:
158 if isinstance(item, SearchResultTrack) and MediaType.TRACK in media_types:
159 results.tracks = [*results.tracks, self._converters.track_from_search(item)]
160 elif isinstance(item, SearchResultAlbum) and MediaType.ALBUM in media_types:
161 results.albums = [*results.albums, self._converters.album_from_search(item)]
162 elif isinstance(item, SearchResultArtist) and MediaType.ARTIST in media_types:
163 results.artists = [*results.artists, self._converters.artist_from_search(item)]
164 except BandcampAPIError as error:
165 self.logger.warning("Failed to convert search result item: %s", error)
166 continue
167
168 return results
169
170 async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
171 """Retrieve library artists from Bandcamp."""
172 if not self._client.identity: # library requires identity
173 return
174
175 try:
176 async with self.throttler.acquire(): # AsyncGenerator method cannot be decorated
177 collection = await self._client.get_collection_items(CollectionType.COLLECTION)
178 band_ids = set()
179 for item in collection.items:
180 if item.item_type == "band":
181 band_ids.add(item.item_id)
182 elif item.item_type == "album":
183 band_ids.add(item.band_id)
184
185 for band_id in band_ids:
186 yield await self.get_artist(band_id)
187 await asyncio.sleep(0) # Yield control to avoid blocking
188
189 except BandcampMustBeLoggedInError as error:
190 self.logger.error("Error getting Bandcamp library artists: Wrong identity token.")
191 raise LoginFailed("Wrong Bandcamp identity token.") from error
192 except BandcampNotFoundError as error:
193 raise MediaNotFoundError("Bandcamp library artists returned no results") from error
194 except BandcampRateLimitError as error:
195 raise ResourceTemporarilyUnavailable(
196 "Bandcamp rate limit reached", backoff_time=error.retry_after
197 ) from error
198 except BandcampAPIError as error:
199 raise MediaNotFoundError("Failed to get library artists") from error
200
201 async def get_library_albums(self) -> AsyncGenerator[Album, None]:
202 """Retrieve library albums from Bandcamp."""
203 if not self._client.identity: # library requires identity
204 return
205
206 try:
207 async with self.throttler.acquire(): # AsyncGenerator method cannot be decorated
208 api_collection = await self._client.get_collection_items(CollectionType.COLLECTION)
209 for item in api_collection.items:
210 if item.item_type == "album":
211 yield await self.get_album(f"{item.band_id}-{item.item_id}")
212 await asyncio.sleep(0) # Yield control to avoid blocking
213 except BandcampMustBeLoggedInError as error:
214 self.logger.error("Error getting Bandcamp library albums: Wrong identity token.")
215 raise LoginFailed("Wrong Bandcamp identity token.") from error
216 except BandcampNotFoundError as error:
217 raise MediaNotFoundError("Bandcamp library albums returned no results") from error
218 except BandcampRateLimitError as error:
219 raise ResourceTemporarilyUnavailable(
220 "Bandcamp rate limit reached", backoff_time=error.retry_after
221 ) from error
222 except BandcampAPIError as error:
223 raise MediaNotFoundError("Failed to get library albums") from error
224
225 async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
226 """Retrieve library tracks from Bandcamp."""
227 if not self._client.identity: # library requires identity
228 return
229
230 async for album in self.get_library_albums():
231 tracks = await self.get_album_tracks(album.item_id)
232 for track in tracks:
233 yield track
234 await asyncio.sleep(0) # Yield control to avoid blocking
235
236 @use_cache(CACHE)
237 @throttle_with_retries
238 async def get_artist(self, prov_artist_id: str | int) -> Artist:
239 """Get full artist details by id."""
240 try:
241 api_artist = await self._client.get_artist(prov_artist_id)
242 return self._converters.artist_from_api(api_artist)
243 except BandcampNotFoundError as error:
244 raise MediaNotFoundError(
245 f"Bandcamp artist {prov_artist_id} search returned no results"
246 ) from error
247 except BandcampRateLimitError as error:
248 raise ResourceTemporarilyUnavailable(
249 "Bandcamp rate limit reached", backoff_time=error.retry_after
250 ) from error
251 except BandcampAPIError as error:
252 raise MediaNotFoundError(f"Failed to get artist {prov_artist_id}") from error
253
254 @use_cache(CACHE)
255 @throttle_with_retries
256 async def get_album(self, prov_album_id: str) -> Album:
257 """Get full album details by id."""
258 artist_id, album_id, _ = split_id(prov_album_id)
259 try:
260 api_album = await self._client.get_album(artist_id, album_id)
261 return self._converters.album_from_api(api_album)
262 except BandcampNotFoundError as error:
263 raise MediaNotFoundError(
264 f"Bandcamp album {prov_album_id} search returned no results"
265 ) from error
266 except BandcampRateLimitError as error:
267 raise ResourceTemporarilyUnavailable(
268 "Bandcamp rate limit reached", backoff_time=error.retry_after
269 ) from error
270 except BandcampAPIError as error:
271 raise MediaNotFoundError(f"Failed to get album {prov_album_id}") from error
272
273 @use_cache(CACHE)
274 @throttle_with_retries
275 async def get_track(self, prov_track_id: str) -> Track:
276 """Get full track details by id."""
277 artist_id, album_id, track_id = split_id(prov_track_id)
278 if track_id is None: # artist_id-track_id
279 album_id, track_id = None, album_id
280
281 try:
282 if all((artist_id, album_id, track_id)):
283 api_album = await self._client.get_album(artist_id, album_id)
284 api_track = next((_ for _ in api_album.tracks if _.id == track_id), None)
285 return self._converters.track_from_api(
286 track=api_track,
287 album_id=api_album.id,
288 album_name=api_album.title,
289 album_image_url=api_album.art_url,
290 )
291 if not album_id:
292 api_track = await self._client.get_track(artist_id, track_id)
293 return self._converters.track_from_api(
294 track=api_track,
295 album_id=api_track.album.id if api_track.album else None,
296 album_name=api_track.album.title if api_track.album else "",
297 album_image_url=api_track.album.art_url if api_track.album else "",
298 )
299 raise MediaNotFoundError(f"Track {prov_track_id} not found on Bandcamp")
300 except BandcampNotFoundError as error:
301 raise MediaNotFoundError(
302 f"Bandcamp track {prov_track_id} search returned no results"
303 ) from error
304 except BandcampRateLimitError as error:
305 raise ResourceTemporarilyUnavailable(
306 "Bandcamp rate limit reached", backoff_time=error.retry_after
307 ) from error
308 except BandcampAPIError as error:
309 raise MediaNotFoundError(f"Failed to get track {prov_track_id}") from error
310
311 @use_cache(CACHE)
312 @throttle_with_retries
313 async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
314 """Get all tracks in an album."""
315 artist_id, album_id, _ = split_id(prov_album_id)
316 try:
317 api_album = await self._client.get_album(artist_id, album_id)
318 if api_album.tracks:
319 return [
320 self._converters.track_from_api(
321 track=track,
322 album_id=album_id,
323 album_name=api_album.title,
324 album_image_url=api_album.art_url,
325 )
326 for track in api_album.tracks
327 if track.streaming_url # Only include tracks with streaming URLs
328 ]
329
330 return []
331
332 except BandcampNotFoundError as error:
333 raise MediaNotFoundError(
334 f"Bandcamp album {prov_album_id} tracks search returned no results"
335 ) from error
336 except BandcampRateLimitError as error:
337 raise ResourceTemporarilyUnavailable(
338 "Bandcamp rate limit reached", backoff_time=error.retry_after
339 ) from error
340 except BandcampAPIError as error:
341 raise MediaNotFoundError(f"Failed to get albums tracks for {prov_album_id}") from error
342
343 @use_cache(CACHE)
344 @throttle_with_retries
345 async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
346 """Get albums by an artist."""
347 albums = []
348 try:
349 api_discography = await self._client.get_artist_discography(prov_artist_id)
350 for item in api_discography:
351 if item.get("item_type") == "album" and item.get("item_id"):
352 album = None
353
354 with suppress(MediaNotFoundError):
355 album = await self.get_album(f"{item['band_id']}-{item['item_id']}")
356
357 with suppress(MediaNotFoundError):
358 album = album or await self.get_album(f"{prov_artist_id}-{item['item_id']}")
359
360 if album:
361 albums.append(album)
362
363 except BandcampNotFoundError as error:
364 raise MediaNotFoundError(
365 f"Bandcamp artist {prov_artist_id} albums search returned no results"
366 ) from error
367 except BandcampRateLimitError as error:
368 raise ResourceTemporarilyUnavailable(
369 "Bandcamp rate limit reached", backoff_time=error.retry_after
370 ) from error
371 except BandcampAPIError as error:
372 raise MediaNotFoundError(f"Failed to get albums for artist {prov_artist_id}") from error
373
374 return albums
375
376 @use_cache(CACHE)
377 @throttle_with_retries
378 async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
379 """Get top tracks of an artist."""
380 tracks: list[Track] = []
381 # get_artist_albums and get_album_tracks already handle exceptions and rate limiting
382 albums = await self.get_artist_albums(prov_artist_id)
383 albums.sort(key=lambda album: (album.year is None, album.year or 0), reverse=True)
384 for album in albums:
385 tracks.extend(await self.get_album_tracks(album.item_id))
386 if len(tracks) >= self.top_tracks_limit:
387 break
388
389 return tracks[: self.top_tracks_limit]
390
391 async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
392 """Return the content details for the given track."""
393 # get_track already handles exceptions and rate limiting
394 track_ma = await self.get_track(item_id)
395 if not track_ma.metadata.links:
396 raise MediaNotFoundError(
397 f"No streaming links found for track {item_id}. Please report this"
398 )
399
400 link = next(iter(track_ma.metadata.links))
401 if not link:
402 raise MediaNotFoundError(
403 f"No streaming URL found for track {item_id}. Please report this"
404 )
405
406 streaming_url = link.url
407 if not streaming_url:
408 raise MediaNotFoundError(f"No streaming URL found for track {item_id}: {streaming_url}")
409
410 return StreamDetails(
411 item_id=item_id,
412 provider=self.instance_id,
413 audio_format=AudioFormat(),
414 stream_type=StreamType.HTTP,
415 media_type=media_type,
416 path=streaming_url,
417 can_seek=True,
418 allow_seek=True,
419 )
420