/
/
/
1"""Bandcamp music provider support for MusicAssistant."""
2
3import asyncio
4from collections.abc import AsyncGenerator
5from contextlib import suppress
6from typing import cast
7
8from bandcamp_async_api import (
9 BandcampAPIClient,
10 BandcampAPIError,
11 BandcampMustBeLoggedInError,
12 BandcampNotFoundError,
13 SearchResultAlbum,
14 SearchResultArtist,
15 SearchResultTrack,
16)
17from bandcamp_async_api.models import CollectionType
18from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
19from music_assistant_models.enums import ConfigEntryType, MediaType, ProviderFeature, StreamType
20from music_assistant_models.errors import InvalidDataError, LoginFailed, MediaNotFoundError
21from music_assistant_models.media_items import Album, Artist, AudioFormat, SearchResults, Track
22from music_assistant_models.provider import ProviderManifest
23from music_assistant_models.streamdetails import StreamDetails
24
25from music_assistant.controllers.cache import use_cache
26from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
27from music_assistant.mass import MusicAssistant
28from music_assistant.models import ProviderInstanceType
29from music_assistant.models.music_provider import MusicProvider
30
31from .converters import BandcampConverters
32
# Provider features handed to BandcampProvider by setup().
SUPPORTED_FEATURES = {
    ProviderFeature.LIBRARY_ARTISTS,
    ProviderFeature.LIBRARY_ALBUMS,
    ProviderFeature.LIBRARY_TRACKS,
    ProviderFeature.SEARCH,
    ProviderFeature.ARTIST_ALBUMS,
    ProviderFeature.ARTIST_TOPTRACKS,
}

# Config key holding the Bandcamp "identity" cookie value.
CONF_IDENTITY = "identity"
# Config key (and its default) for the cap applied in get_artist_toptracks.
CONF_TOP_TRACKS_LIMIT = "top_tracks_limit"
DEFAULT_TOP_TRACKS_LIMIT = 50
# Value passed to every use_cache decorator in this module.
CACHE = 3600 * 24 * 30  # Cache for 30 days
46
47
async def setup(
    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
    """Create the Bandcamp provider instance for the supplied configuration.

    :param mass: The running MusicAssistant instance.
    :param manifest: Manifest of this provider.
    :param config: Configuration of the provider instance being set up.
    :return: A BandcampProvider constructed with SUPPORTED_FEATURES.
    """
    provider = BandcampProvider(mass, manifest, config, SUPPORTED_FEATURES)
    return provider
53
54
55# noinspection PyTypeHints,PyUnusedLocal
async def get_config_entries(
    mass: MusicAssistant,  # noqa: ARG001
    instance_id: str | None = None,  # noqa: ARG001
    action: str | None = None,  # noqa: ARG001
    values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
    """Describe the configuration entries offered by the Bandcamp provider.

    :param mass: Unused.
    :param instance_id: Unused.
    :param action: Unused.
    :param values: Current config values, if any; used to pre-fill the entries.
    :return: The identity-token entry followed by the top-tracks-limit entry.
    """
    if values:
        identity_value = values.get(CONF_IDENTITY)
        limit_value = values.get(CONF_TOP_TRACKS_LIMIT)
    else:
        # Nothing stored yet: identity stays unset, the limit starts at its default.
        identity_value = None
        limit_value = DEFAULT_TOP_TRACKS_LIMIT

    identity_entry = ConfigEntry(
        key=CONF_IDENTITY,
        type=ConfigEntryType.SECURE_STRING,
        label="Identity token",
        required=False,
        description="Identity token from Bandcamp cookies for account collection access."
        " Log in https://bandcamp.com and extract browser cookie named 'identity'.",
        value=identity_value,
    )
    top_tracks_entry = ConfigEntry(
        key=CONF_TOP_TRACKS_LIMIT,
        type=ConfigEntryType.INTEGER,
        label="Artist Top Tracks search limit",
        required=False,
        description="Search limit while getting artist top tracks.",
        value=limit_value,
        default_value=DEFAULT_TOP_TRACKS_LIMIT,
        advanced=True,
    )
    return (identity_entry, top_tracks_entry)
84
85
def split_id(id_: str) -> tuple[int, int, int]:
    """Split a dash-separated provider id into (artist_id, album_id, track_id).

    Missing parts are returned as 0 (never None); components beyond the third
    are ignored.

    :param id_: Id such as "artist", "artist-album" or "artist-album-track".
    :return: Exactly three integers.
    :raises ValueError: If one of the first three components is not an integer.
    """
    numbers = [int(part) for part in id_.split("-")[:3]]
    # Pad with zeros so callers can always unpack three values.
    numbers.extend([0] * (3 - len(numbers)))
    return numbers[0], numbers[1], numbers[2]
93
94
class BandcampProvider(MusicProvider):
    """Bandcamp provider support."""

    # Async Bandcamp API client; assigned in handle_async_init.
    _client: BandcampAPIClient
    # Converter helper built from this provider's domain and instance id.
    _converters: BandcampConverters
    # Throttler created in handle_async_init (rate_limit=1, period=2).
    throttler: ThrottlerManager
    # Upper bound on the number of tracks returned by get_artist_toptracks.
    top_tracks_limit: int
102
103 async def handle_async_init(self) -> None:
104 """Handle async init of the Bandcamp provider."""
105 identity = self.config.get_value(CONF_IDENTITY)
106 self.top_tracks_limit = cast(
107 "int", self.config.get_value(CONF_TOP_TRACKS_LIMIT, DEFAULT_TOP_TRACKS_LIMIT)
108 )
109
110 # Initialize the new async API client
111 self._client = BandcampAPIClient(session=self.mass.http_session, identity_token=identity)
112
113 self.throttler = ThrottlerManager(rate_limit=1, period=2)
114 self._converters = BandcampConverters(self.domain, self.instance_id)
115
116 @property
117 def is_streaming_provider(self) -> bool:
118 """Return True if the provider is a streaming provider."""
119 return True
120
121 @use_cache(CACHE)
122 @throttle_with_retries
123 async def search(
124 self, search_query: str, media_types: list[MediaType], limit: int = 50
125 ) -> SearchResults:
126 """Perform search on music provider."""
127 results = SearchResults()
128 if not self._client.identity:
129 return results
130
131 if not media_types:
132 return results
133
134 try:
135 search_results = await self._client.search(search_query)
136 except BandcampNotFoundError as error:
137 raise MediaNotFoundError("No results for Bandcamp search") from error
138 except BandcampAPIError as error:
139 raise InvalidDataError("Unexpected error during Bandcamp search") from error
140
141 for item in search_results[:limit]:
142 try:
143 if isinstance(item, SearchResultTrack) and MediaType.TRACK in media_types:
144 results.tracks = [*results.tracks, self._converters.track_from_search(item)]
145 elif isinstance(item, SearchResultAlbum) and MediaType.ALBUM in media_types:
146 results.albums = [*results.albums, self._converters.album_from_search(item)]
147 elif isinstance(item, SearchResultArtist) and MediaType.ARTIST in media_types:
148 results.artists = [*results.artists, self._converters.artist_from_search(item)]
149 except BandcampAPIError as error:
150 self.logger.warning("Failed to convert search result item: %s", error)
151 continue
152
153 return results
154
155 async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
156 """Retrieve library artists from Bandcamp."""
157 if not self._client.identity: # library requires identity
158 return
159
160 try:
161 collection = await self._client.get_collection_items(CollectionType.COLLECTION)
162 band_ids = set()
163 for item in collection.items:
164 if item.item_type == "band":
165 band_ids.add(item.item_id)
166 elif item.item_type == "album":
167 band_ids.add(item.band_id)
168
169 for band_id in band_ids:
170 yield await self.get_artist(band_id)
171 await asyncio.sleep(0) # Yield control to avoid blocking
172
173 except BandcampMustBeLoggedInError as error:
174 self.logger.error("Error getting Bandcamp library artists: Wrong identity token.")
175 raise LoginFailed("Wrong Bandcamp identity token.") from error
176 except BandcampNotFoundError as error:
177 raise MediaNotFoundError("Bandcamp library artists returned no results") from error
178 except BandcampAPIError as error:
179 raise MediaNotFoundError("Failed to get library artists") from error
180
181 async def get_library_albums(self) -> AsyncGenerator[Album, None]:
182 """Retrieve library albums from Bandcamp."""
183 if not self._client.identity: # library requires identity
184 return
185
186 try:
187 api_collection = await self._client.get_collection_items(CollectionType.COLLECTION)
188 for item in api_collection.items:
189 if item.item_type == "album":
190 yield await self.get_album(f"{item.band_id}-{item.item_id}")
191 await asyncio.sleep(0) # Yield control to avoid blocking
192 except BandcampMustBeLoggedInError as error:
193 self.logger.error("Error getting Bandcamp library albums: Wrong identity token.")
194 raise LoginFailed("Wrong Bandcamp identity token.") from error
195 except BandcampNotFoundError as error:
196 raise MediaNotFoundError("Bandcamp library albums returned no results") from error
197 except BandcampAPIError as error:
198 raise MediaNotFoundError("Failed to get library albums") from error
199
200 async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
201 """Retrieve library tracks from Bandcamp."""
202 if not self._client.identity: # library requires identity
203 return
204
205 try:
206 async for album in self.get_library_albums():
207 tracks = await self.get_album_tracks(album.item_id)
208 for track in tracks:
209 yield track
210 await asyncio.sleep(0) # Yield control to avoid blocking
211 except BandcampMustBeLoggedInError as error:
212 self.logger.error("Error getting Bandcamp library tracks: Wrong identity token.")
213 raise LoginFailed("Wrong Bandcamp identity token.") from error
214 except BandcampNotFoundError as error:
215 raise MediaNotFoundError("Bandcamp library tracks returned no results") from error
216 except BandcampAPIError as error:
217 raise MediaNotFoundError("Failed to get library tracks") from error
218
219 @use_cache(CACHE)
220 async def get_artist(self, prov_artist_id: str | int) -> Artist:
221 """Get full artist details by id."""
222 try:
223 api_artist = await self._client.get_artist(prov_artist_id)
224 return self._converters.artist_from_api(api_artist)
225 except BandcampNotFoundError as error:
226 raise MediaNotFoundError(
227 f"Bandcamp artist {prov_artist_id} search returned no results"
228 ) from error
229 except BandcampAPIError as error:
230 raise MediaNotFoundError(f"Failed to get artist {prov_artist_id}") from error
231
232 @use_cache(CACHE)
233 async def get_album(self, prov_album_id: str) -> Album:
234 """Get full album details by id."""
235 artist_id, album_id, _ = split_id(prov_album_id)
236 try:
237 api_album = await self._client.get_album(artist_id, album_id)
238 return self._converters.album_from_api(api_album)
239 except BandcampNotFoundError as error:
240 raise MediaNotFoundError(
241 f"Bandcamp album {prov_album_id} search returned no results"
242 ) from error
243 except BandcampAPIError as error:
244 raise MediaNotFoundError(f"Failed to get album {prov_album_id}") from error
245
246 @use_cache(CACHE)
247 async def get_track(self, prov_track_id: str) -> Track:
248 """Get full track details by id."""
249 artist_id, album_id, track_id = split_id(prov_track_id)
250 if track_id is None: # artist_id-track_id
251 album_id, track_id = None, album_id
252
253 try:
254 if all((artist_id, album_id, track_id)):
255 api_album = await self._client.get_album(artist_id, album_id)
256 api_track = next((_ for _ in api_album.tracks if _.id == track_id), None)
257 return self._converters.track_from_api(
258 track=api_track,
259 album_id=api_album.id,
260 album_name=api_album.title,
261 album_image_url=api_album.art_url,
262 )
263 if not album_id:
264 api_track = await self._client.get_track(artist_id, track_id)
265 return self._converters.track_from_api(
266 track=api_track,
267 album_id=api_track.album.id if api_track.album else None,
268 album_name=api_track.album.title if api_track.album else "",
269 album_image_url=api_track.album.art_url if api_track.album else "",
270 )
271 raise MediaNotFoundError(f"Track {prov_track_id} not found on Bandcamp")
272 except BandcampNotFoundError as error:
273 raise MediaNotFoundError(
274 f"Bandcamp track {prov_track_id} search returned no results"
275 ) from error
276 except BandcampAPIError as error:
277 raise MediaNotFoundError(f"Failed to get track {prov_track_id}") from error
278
279 @use_cache(CACHE)
280 async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
281 """Get all tracks in an album."""
282 artist_id, album_id, _ = split_id(prov_album_id)
283 try:
284 api_album = await self._client.get_album(artist_id, album_id)
285 if api_album.tracks:
286 return [
287 self._converters.track_from_api(
288 track=track,
289 album_id=album_id,
290 album_name=api_album.title,
291 album_image_url=api_album.art_url,
292 )
293 for track in api_album.tracks
294 if track.streaming_url # Only include tracks with streaming URLs
295 ]
296
297 return []
298
299 except BandcampNotFoundError as error:
300 raise MediaNotFoundError(
301 f"Bandcamp album {prov_album_id} tracks search returned no results"
302 ) from error
303 except BandcampAPIError as error:
304 raise MediaNotFoundError(f"Failed to get albums tracks for {prov_album_id}") from error
305
306 @use_cache(CACHE)
307 async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
308 """Get albums by an artist."""
309 albums = []
310 try:
311 api_discography = await self._client.get_artist_discography(prov_artist_id)
312 for item in api_discography:
313 if item.get("item_type") == "album" and item.get("item_id"):
314 album = None
315
316 with suppress(MediaNotFoundError):
317 album = await self.get_album(f"{item['band_id']}-{item['item_id']}")
318
319 with suppress(MediaNotFoundError):
320 album = album or await self.get_album(f"{prov_artist_id}-{item['item_id']}")
321
322 if album:
323 albums.append(album)
324
325 except BandcampNotFoundError as error:
326 raise MediaNotFoundError(
327 f"Bandcamp artist {prov_artist_id} albums search returned no results"
328 ) from error
329 except BandcampAPIError as error:
330 raise MediaNotFoundError(f"Failed to get albums for artist {prov_artist_id}") from error
331
332 return albums
333
334 @use_cache(CACHE)
335 async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
336 """Get top tracks of an artist."""
337 tracks: list[Track] = []
338 try:
339 albums = await self.get_artist_albums(prov_artist_id)
340 albums.sort(key=lambda album: (album.year is None, album.year or 0), reverse=True)
341 for album in albums:
342 tracks.extend(await self.get_album_tracks(album.item_id))
343 if len(tracks) >= self.top_tracks_limit:
344 break
345
346 except BandcampNotFoundError as error:
347 raise MediaNotFoundError(
348 f"Bandcamp artist {prov_artist_id} top tracks search returned no results"
349 ) from error
350
351 except BandcampAPIError as error:
352 raise MediaNotFoundError(
353 f"Failed to get toptracks for artist {prov_artist_id}"
354 ) from error
355
356 return tracks[: self.top_tracks_limit]
357
358 async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
359 """Return the content details for the given track."""
360 try:
361 # consider _client to avoid caching if the track urls become dynamic
362 track_ma = await self.get_track(item_id)
363 if not track_ma.metadata.links:
364 raise MediaNotFoundError(
365 f"No streaming links found for track {item_id}. Please report this"
366 )
367
368 link = next(iter(track_ma.metadata.links))
369 if not link:
370 raise MediaNotFoundError(
371 f"No streaming URL found for track {item_id}. Please report this"
372 )
373
374 streaming_url = link.url
375 if not streaming_url:
376 raise MediaNotFoundError(
377 f"No streaming URL found for track {item_id}: {streaming_url}"
378 )
379
380 return StreamDetails(
381 item_id=item_id,
382 provider=self.instance_id,
383 audio_format=AudioFormat(),
384 stream_type=StreamType.HTTP,
385 media_type=media_type,
386 path=streaming_url,
387 can_seek=True,
388 allow_seek=True,
389 )
390
391 except BandcampNotFoundError as error:
392 raise MediaNotFoundError(
393 f"Bandcamp stream details search for {media_type} {item_id} returned no results"
394 ) from error
395 except BandcampAPIError as error:
396 raise MediaNotFoundError(
397 f"Stream details not available for {media_type} {item_id}"
398 ) from error
399