/
/
/
1"""Main Spotify provider implementation."""
2
3from __future__ import annotations
4
5import os
6import time
7from collections.abc import AsyncGenerator
8from typing import Any, cast
9
10import aiohttp
11from music_assistant_models.enums import (
12 ContentType,
13 ImageType,
14 MediaType,
15 ProviderFeature,
16 StreamType,
17)
18from music_assistant_models.errors import (
19 LoginFailed,
20 MediaNotFoundError,
21 ProviderUnavailableError,
22 ResourceTemporarilyUnavailable,
23 UnsupportedFeaturedException,
24)
25from music_assistant_models.media_items import (
26 Album,
27 Artist,
28 Audiobook,
29 AudioFormat,
30 MediaItemImage,
31 MediaItemType,
32 Playlist,
33 Podcast,
34 PodcastEpisode,
35 ProviderMapping,
36 SearchResults,
37 Track,
38 UniqueList,
39)
40from music_assistant_models.media_items.metadata import MediaItemChapter
41from music_assistant_models.streamdetails import StreamDetails
42
43from music_assistant.controllers.cache import use_cache
44from music_assistant.helpers.app_vars import app_var # type: ignore[attr-defined]
45from music_assistant.helpers.json import json_loads
46from music_assistant.helpers.process import check_output
47from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
48from music_assistant.helpers.util import lock
49from music_assistant.models.music_provider import MusicProvider
50
51from .constants import (
52 CONF_CLIENT_ID,
53 CONF_REFRESH_TOKEN_DEV,
54 CONF_REFRESH_TOKEN_GLOBAL,
55 CONF_SYNC_AUDIOBOOK_PROGRESS,
56 CONF_SYNC_PODCAST_PROGRESS,
57 LIKED_SONGS_FAKE_PLAYLIST_ID_PREFIX,
58)
59from .helpers import get_librespot_binary, get_spotify_token
60from .parsers import (
61 parse_album,
62 parse_artist,
63 parse_audiobook,
64 parse_playlist,
65 parse_podcast,
66 parse_podcast_episode,
67 parse_track,
68)
69from .streaming import LibrespotStreamer
70
71
72class NotModifiedError(Exception):
73 """Exception raised when a resource has not been modified."""
74
75
76class SpotifyProvider(MusicProvider):
77 """Implementation of a Spotify MusicProvider."""
78
79 # Global session (MA's client ID) - always present
80 _auth_info_global: dict[str, Any] | None = None
81 # Developer session (user's custom client ID) - optional
82 _auth_info_dev: dict[str, Any] | None = None
83 _sp_user: dict[str, Any] | None = None
84 _librespot_bin: str | None = None
85 _audiobooks_supported = False
86 # True if user has configured a custom client ID with valid authentication
87 dev_session_active: bool = False
88 throttler: ThrottlerManager
89
90 async def handle_async_init(self) -> None:
91 """Handle async initialization of the provider."""
92 self.cache_dir = os.path.join(self.mass.cache_path, self.instance_id)
93 # Default throttler for global session (heavy rate limited)
94 self.throttler = ThrottlerManager(rate_limit=1, period=2)
95 self.streamer = LibrespotStreamer(self)
96
97 # check if we have a librespot binary for this arch
98 self._librespot_bin = await get_librespot_binary()
99 # try login which will raise if it fails (logs in global session)
100 await self.login()
101
102 # Check if user has a custom client ID with valid dev token
103 client_id = self.config.get_value(CONF_CLIENT_ID)
104 dev_token = self.config.get_value(CONF_REFRESH_TOKEN_DEV)
105
106 if client_id and dev_token and self._sp_user:
107 await self.login_dev()
108 # Verify user matches
109 userinfo = await self._get_data("me", use_global_session=False)
110 if userinfo["id"] != self._sp_user["id"]:
111 raise LoginFailed(
112 "Developer session must use the same Spotify account as the main session."
113 )
114 # loosen the throttler when a custom client id is used
115 self.throttler = ThrottlerManager(rate_limit=45, period=30)
116 self.dev_session_active = True
117 self.logger.info("Developer Spotify session active.")
118
119 self._audiobooks_supported = await self._test_audiobook_support()
120 if not self._audiobooks_supported:
121 self.logger.info(
122 "Audiobook support disabled: Audiobooks are not available in your region. "
123 "See https://support.spotify.com/us/authors/article/audiobooks-availability/ "
124 "for supported countries."
125 )
126
127 @property
128 def audiobooks_supported(self) -> bool:
129 """Check if audiobooks are supported for this user/region."""
130 return self._audiobooks_supported
131
132 @property
133 def audiobook_progress_sync_enabled(self) -> bool:
134 """Check if audiobook progress sync is enabled."""
135 return bool(self.config.get_value(CONF_SYNC_AUDIOBOOK_PROGRESS, False))
136
137 @property
138 def podcast_progress_sync_enabled(self) -> bool:
139 """Check if played status sync is enabled."""
140 value = self.config.get_value(CONF_SYNC_PODCAST_PROGRESS, True)
141 return bool(value) if value is not None else True
142
143 @property
144 def supported_features(self) -> set[ProviderFeature]:
145 """Return the features supported by this Provider."""
146 features = self._supported_features.copy()
147 # Add audiobook features if enabled
148 if self.audiobooks_supported:
149 features.add(ProviderFeature.LIBRARY_AUDIOBOOKS)
150 features.add(ProviderFeature.LIBRARY_AUDIOBOOKS_EDIT)
151 return features
152
153 @property
154 def instance_name_postfix(self) -> str | None:
155 """Return a (default) instance name postfix for this provider instance."""
156 if self._sp_user:
157 return str(self._sp_user["display_name"])
158 return None
159
160 ## Library retrieval methods (generators)
161 async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
162 """Retrieve library artists from spotify."""
163 endpoint = "me/following"
164 while True:
165 spotify_artists = await self._get_data(
166 endpoint,
167 type="artist",
168 limit=50,
169 )
170 for item in spotify_artists["artists"]["items"]:
171 if item and item["id"]:
172 yield parse_artist(item, self)
173 if spotify_artists["artists"]["next"]:
174 endpoint = spotify_artists["artists"]["next"]
175 endpoint = endpoint.replace("https://api.spotify.com/v1/", "")
176 else:
177 break
178
179 async def get_library_albums(self) -> AsyncGenerator[Album, None]:
180 """Retrieve library albums from the provider."""
181 async for item in self._get_all_items("me/albums"):
182 if item["album"] and item["album"]["id"]:
183 yield parse_album(item["album"], self)
184
185 async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
186 """Retrieve library tracks from the provider."""
187 async for item in self._get_all_items("me/tracks"):
188 if item and item["track"]["id"]:
189 yield parse_track(item["track"], self)
190
191 async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
192 """Retrieve library podcasts from spotify."""
193 async for item in self._get_all_items("me/shows"):
194 if item["show"] and item["show"]["id"]:
195 show_obj = item["show"]
196 # Filter out audiobooks - they have a distinctive description format
197 description = show_obj.get("description", "")
198 if description.startswith("Author(s):") and "Narrator(s):" in description:
199 continue
200 yield parse_podcast(show_obj, self)
201
202 async def get_library_audiobooks(self) -> AsyncGenerator[Audiobook, None]:
203 """Retrieve library audiobooks from spotify."""
204 if not self.audiobooks_supported:
205 return
206 async for item in self._get_all_items("me/audiobooks"):
207 if item and item["id"]:
208 # Parse the basic audiobook
209 audiobook = parse_audiobook(item, self)
210 # Add chapters from Spotify API data
211 await self._add_audiobook_chapters(audiobook)
212 yield audiobook
213
214 async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
215 """Retrieve playlists from the provider.
216
217 Note: We use the global session here because playlists like "Daily Mix"
218 are only returned when using the non-dev (global) token.
219 """
220 yield await self._get_liked_songs_playlist()
221 async for item in self._get_all_items("me/playlists", use_global_session=True):
222 if item and item["id"]:
223 yield parse_playlist(item, self)
224
225 @use_cache()
226 async def search(
227 self, search_query: str, media_types: list[MediaType] | None = None, limit: int = 5
228 ) -> SearchResults:
229 """Perform search on musicprovider.
230
231 :param search_query: Search query.
232 :param media_types: A list of media_types to include.
233 :param limit: Number of items to return in the search (per type).
234 """
235 searchresult = SearchResults()
236 if media_types is None:
237 return searchresult
238
239 searchtype = self._build_search_types(media_types)
240 if not searchtype:
241 return searchresult
242
243 search_query = search_query.replace("'", "")
244 offset = 0
245 page_limit = min(limit, 50)
246
247 while True:
248 api_result = await self._get_data(
249 "search", q=search_query, type=searchtype, limit=page_limit, offset=offset
250 )
251 items_received = self._process_search_results(api_result, searchresult)
252
253 offset += page_limit
254 if offset >= limit or items_received < page_limit:
255 break
256
257 return searchresult
258
259 def _build_search_types(self, media_types: list[MediaType]) -> str:
260 """Build comma-separated search types string from media types."""
261 searchtypes = []
262 if MediaType.ARTIST in media_types:
263 searchtypes.append("artist")
264 if MediaType.ALBUM in media_types:
265 searchtypes.append("album")
266 if MediaType.TRACK in media_types:
267 searchtypes.append("track")
268 if MediaType.PLAYLIST in media_types:
269 searchtypes.append("playlist")
270 if MediaType.PODCAST in media_types:
271 searchtypes.append("show")
272 if MediaType.AUDIOBOOK in media_types and self.audiobooks_supported:
273 searchtypes.append("audiobook")
274 return ",".join(searchtypes)
275
276 def _process_search_results(
277 self, api_result: dict[str, Any], searchresult: SearchResults
278 ) -> int:
279 """Process API search results and update searchresult object.
280
281 Returns the total number of items received.
282 """
283 items_received = 0
284
285 if "artists" in api_result:
286 artists = [
287 parse_artist(item, self)
288 for item in api_result["artists"]["items"]
289 if (item and item["id"] and item["name"])
290 ]
291 searchresult.artists = [*searchresult.artists, *artists]
292 items_received += len(api_result["artists"]["items"])
293
294 if "albums" in api_result:
295 albums = [
296 parse_album(item, self)
297 for item in api_result["albums"]["items"]
298 if (item and item["id"])
299 ]
300 searchresult.albums = [*searchresult.albums, *albums]
301 items_received += len(api_result["albums"]["items"])
302
303 if "tracks" in api_result:
304 tracks = [
305 parse_track(item, self)
306 for item in api_result["tracks"]["items"]
307 if (item and item["id"])
308 ]
309 searchresult.tracks = [*searchresult.tracks, *tracks]
310 items_received += len(api_result["tracks"]["items"])
311
312 if "playlists" in api_result:
313 playlists = [
314 parse_playlist(item, self)
315 for item in api_result["playlists"]["items"]
316 if (item and item["id"])
317 ]
318 searchresult.playlists = [*searchresult.playlists, *playlists]
319 items_received += len(api_result["playlists"]["items"])
320
321 if "shows" in api_result:
322 podcasts = []
323 for item in api_result["shows"]["items"]:
324 if not (item and item["id"]):
325 continue
326 # Filter out audiobooks - they have a distinctive description format
327 description = item.get("description", "")
328 if description.startswith("Author(s):") and "Narrator(s):" in description:
329 continue
330 podcasts.append(parse_podcast(item, self))
331 searchresult.podcasts = [*searchresult.podcasts, *podcasts]
332 items_received += len(api_result["shows"]["items"])
333
334 if "audiobooks" in api_result and self.audiobooks_supported:
335 audiobooks = [
336 parse_audiobook(item, self)
337 for item in api_result["audiobooks"]["items"]
338 if (item and item["id"])
339 ]
340 searchresult.audiobooks = [*searchresult.audiobooks, *audiobooks]
341 items_received += len(api_result["audiobooks"]["items"])
342
343 return items_received
344
345 @use_cache()
346 async def get_artist(self, prov_artist_id: str) -> Artist:
347 """Get full artist details by id."""
348 artist_obj = await self._get_data(f"artists/{prov_artist_id}")
349 return parse_artist(artist_obj, self)
350
351 @use_cache()
352 async def get_album(self, prov_album_id: str) -> Album:
353 """Get full album details by id."""
354 album_obj = await self._get_data(f"albums/{prov_album_id}")
355 return parse_album(album_obj, self)
356
357 @use_cache()
358 async def get_track(self, prov_track_id: str) -> Track:
359 """Get full track details by id."""
360 track_obj = await self._get_data(f"tracks/{prov_track_id}")
361 return parse_track(track_obj, self)
362
363 @use_cache()
364 async def get_playlist(self, prov_playlist_id: str) -> Playlist:
365 """Get full playlist details by id."""
366 if prov_playlist_id == self._get_liked_songs_playlist_id():
367 return await self._get_liked_songs_playlist()
368
369 # Check cache to see if this playlist requires global token
370 use_global = await self._playlist_requires_global_token(prov_playlist_id)
371 if use_global:
372 playlist_obj = await self._get_data(
373 f"playlists/{prov_playlist_id}", use_global_session=True
374 )
375 return parse_playlist(playlist_obj, self)
376
377 # Try with dev token first (if available), fallback to global on 400 error
378 # Some playlists like Spotify-owned (Daily Mix) or Liked Songs only work with global token
379 try:
380 playlist_obj = await self._get_data(f"playlists/{prov_playlist_id}")
381 return parse_playlist(playlist_obj, self)
382 except MediaNotFoundError:
383 if self.dev_session_active:
384 # Remember that this playlist requires global token
385 await self._set_playlist_requires_global_token(prov_playlist_id)
386 playlist_obj = await self._get_data(
387 f"playlists/{prov_playlist_id}", use_global_session=True
388 )
389 return parse_playlist(playlist_obj, self)
390 raise
391
392 @use_cache()
393 async def get_podcast(self, prov_podcast_id: str) -> Podcast:
394 """Get full podcast details by id."""
395 podcast_obj = await self._get_data(f"shows/{prov_podcast_id}")
396 if not podcast_obj:
397 raise MediaNotFoundError(f"Podcast not found: {prov_podcast_id}")
398 return parse_podcast(podcast_obj, self)
399
400 @use_cache()
401 async def get_audiobook(self, prov_audiobook_id: str) -> Audiobook:
402 """Get full audiobook details by id."""
403 if not self.audiobooks_supported:
404 raise UnsupportedFeaturedException("Audiobooks are not supported with this account")
405
406 audiobook_obj = await self._get_data(f"audiobooks/{prov_audiobook_id}")
407 if not audiobook_obj:
408 raise MediaNotFoundError(f"Audiobook not found: {prov_audiobook_id}")
409
410 # Parse basic audiobook without chapters first
411 audiobook = parse_audiobook(audiobook_obj, self)
412
413 # Add chapters from Spotify API data
414 await self._add_audiobook_chapters(audiobook)
415
416 # Note: Resume position will be handled by MA's internal system
417 # which calls get_resume_position() when needed
418
419 return audiobook
420
421 async def get_podcast_episodes(
422 self, prov_podcast_id: str
423 ) -> AsyncGenerator[PodcastEpisode, None]:
424 """Get all podcast episodes."""
425 # Get podcast object for context if available
426 podcast = await self.mass.music.podcasts.get_library_item_by_prov_id(
427 prov_podcast_id, self.instance_id
428 )
429 podcast = await self.get_podcast(prov_podcast_id)
430
431 # Get (cached) episode data
432 episodes_data = await self._get_podcast_episodes_data(prov_podcast_id)
433
434 # Parse and yield episodes with position
435 for idx, episode_data in enumerate(episodes_data):
436 episode = parse_podcast_episode(episode_data, self, podcast)
437 episode.position = idx + 1
438
439 # Set played status if sync is enabled and resume data exists
440 if self.podcast_progress_sync_enabled and "resume_point" in episode_data:
441 resume_point = episode_data["resume_point"]
442 fully_played = resume_point.get("fully_played", False)
443 position_ms = resume_point.get("resume_position_ms", 0)
444
445 episode.fully_played = fully_played if fully_played else None
446 episode.resume_position_ms = position_ms if position_ms > 0 else None
447
448 yield episode
449
450 @use_cache(86400) # 24 hours
451 async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode:
452 """Get full podcast episode details by id."""
453 episode_obj = await self._get_data(f"episodes/{prov_episode_id}", market="from_token")
454 if not episode_obj:
455 raise MediaNotFoundError(f"Episode not found: {prov_episode_id}")
456 return parse_podcast_episode(episode_obj, self)
457
458 async def get_resume_position(self, item_id: str, media_type: MediaType) -> tuple[bool, int]:
459 """Get resume position for episode/audiobook from Spotify."""
460 if media_type == MediaType.PODCAST_EPISODE:
461 if not self.podcast_progress_sync_enabled:
462 raise NotImplementedError("Spotify podcast resume sync disabled in settings")
463
464 try:
465 episode_obj = await self._get_data(f"episodes/{item_id}", market="from_token")
466 except MediaNotFoundError:
467 raise NotImplementedError("Episode not found on Spotify")
468 except (ResourceTemporarilyUnavailable, aiohttp.ClientError) as e:
469 self.logger.debug(f"Error fetching episode {item_id}: {e}")
470 raise NotImplementedError("Unable to fetch episode data from Spotify")
471
472 if (
473 not episode_obj
474 or "resume_point" not in episode_obj
475 or not episode_obj["resume_point"]
476 ):
477 raise NotImplementedError("No resume point data from Spotify")
478
479 resume_point = episode_obj["resume_point"]
480 fully_played = resume_point.get("fully_played", False)
481 position_ms = resume_point.get("resume_position_ms", 0)
482 return fully_played, position_ms
483
484 if media_type == MediaType.AUDIOBOOK:
485 if not self.audiobooks_supported:
486 raise NotImplementedError("Audiobook support is disabled")
487 if not self.audiobook_progress_sync_enabled:
488 raise NotImplementedError("Spotify audiobook resume sync disabled in settings")
489
490 try:
491 chapters_data = await self._get_audiobook_chapters_data(item_id)
492 if not chapters_data:
493 raise NotImplementedError("No chapters data available")
494
495 total_position_ms = 0
496 fully_played = True
497
498 for chapter in chapters_data:
499 resume_point = chapter.get("resume_point", {})
500 chapter_fully_played = resume_point.get("fully_played", False)
501 chapter_position_ms = resume_point.get("resume_position_ms", 0)
502
503 if chapter_fully_played:
504 total_position_ms += chapter.get("duration_ms", 0)
505 elif chapter_position_ms > 0:
506 total_position_ms += chapter_position_ms
507 fully_played = False
508 break
509 else:
510 fully_played = False
511 break
512
513 return fully_played, total_position_ms
514
515 except (MediaNotFoundError, ResourceTemporarilyUnavailable, aiohttp.ClientError) as e:
516 self.logger.debug(f"Failed to get audiobook resume position for {item_id}: {e}")
517 raise NotImplementedError("Unable to get audiobook resume position from Spotify")
518
519 else:
520 raise NotImplementedError(f"Resume position not supported for {media_type}")
521
522 async def on_played(
523 self,
524 media_type: MediaType,
525 prov_item_id: str,
526 fully_played: bool,
527 position: int,
528 media_item: MediaItemType,
529 is_playing: bool = False,
530 ) -> None:
531 """
532 Call when an episode/audiobook is played in MA.
533
534 MA automatically handles internal position tracking - this method is for
535 provider-specific actions like syncing to external services.
536 """
537 if media_type == MediaType.PODCAST_EPISODE:
538 if not isinstance(media_item, PodcastEpisode):
539 return
540
541 # Log the playback for monitoring/debugging
542 safe_position = position or 0
543 if media_item.duration > 0:
544 completion_percentage = (safe_position / media_item.duration) * 100
545 else:
546 completion_percentage = 0
547
548 self.logger.debug(
549 f"Episode played: {prov_item_id} at {safe_position}s "
550 f"({completion_percentage:.1f}%, fully_played: {fully_played})"
551 )
552
553 # Note: No API exists to sync playback position back to Spotify for episodes
554 # MA handles all internal position tracking automatically
555
556 elif media_type == MediaType.AUDIOBOOK:
557 if not isinstance(media_item, Audiobook):
558 return
559
560 # Log the playback for monitoring/debugging
561 safe_position = position or 0
562 if media_item.duration > 0:
563 completion_percentage = (safe_position / media_item.duration) * 100
564 else:
565 completion_percentage = 0
566
567 self.logger.debug(
568 f"Audiobook played: {prov_item_id} at {safe_position}s "
569 f"({completion_percentage:.1f}%, fully_played: {fully_played})"
570 )
571
572 # Note: No API exists to sync playback position back to Spotify for audiobooks
573 # MA handles all internal position tracking automatically
574
575 # The resume position will be automatically updated by MA's internal tracking
576 # and will be retrieved via get_audiobook() which combines MA + Spotify positions
577
578 @use_cache()
579 async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
580 """Get all album tracks for given album id."""
581 return [
582 parse_track(item, self)
583 async for item in self._get_all_items(f"albums/{prov_album_id}/tracks")
584 if item["id"]
585 ]
586
587 @use_cache(2600 * 3) # 3 hours
588 async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
589 """Get playlist tracks."""
590 is_liked_songs = prov_playlist_id == self._get_liked_songs_playlist_id()
591 uri = "me/tracks" if is_liked_songs else f"playlists/{prov_playlist_id}/tracks"
592
593 # Liked songs always require global session
594 # For other playlists, call get_playlist first to trigger the fallback logic
595 # and populate the cache for which token to use
596 if is_liked_songs:
597 use_global = True
598 else:
599 # This call is cached and will determine/cache if global token is needed
600 await self.get_playlist(prov_playlist_id)
601 use_global = await self._playlist_requires_global_token(prov_playlist_id)
602
603 result: list[Track] = []
604 page_size = 50
605 offset = page * page_size
606
607 # Get etag for caching
608 cache_checksum = await self._get_etag(uri, limit=1, offset=0, use_global_session=use_global)
609
610 spotify_result = await self._get_data_with_caching(
611 uri, cache_checksum, limit=page_size, offset=offset, use_global_session=use_global
612 )
613 total = spotify_result.get("total", 0)
614 for index, item in enumerate(spotify_result["items"], 1):
615 # Spotify wraps/recycles items for offsets beyond the playlist size,
616 # so we need to break when we've reached the total.
617 if (offset + index) > total:
618 break
619 if not (item and item["track"] and item["track"]["id"]):
620 continue
621 track = parse_track(item["track"], self)
622 track.position = offset + index
623 result.append(track)
624 return result
625
626 @use_cache(86400 * 14) # 14 days
627 async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
628 """Get a list of all albums for the given artist."""
629 return [
630 parse_album(item, self)
631 async for item in self._get_all_items(
632 f"artists/{prov_artist_id}/albums?include_groups=album,single,compilation"
633 )
634 if (item and item["id"])
635 ]
636
637 @use_cache(86400 * 14) # 14 days
638 async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
639 """Get a list of 10 most popular tracks for the given artist."""
640 artist = await self.get_artist(prov_artist_id)
641 endpoint = f"artists/{prov_artist_id}/top-tracks"
642 items = await self._get_data(endpoint)
643 return [
644 parse_track(item, self, artist=artist)
645 for item in items["tracks"]
646 if (item and item["id"])
647 ]
648
649 async def library_add(self, item: MediaItemType) -> bool:
650 """Add item to library."""
651 if item.media_type == MediaType.ARTIST:
652 await self._put_data("me/following", {"ids": [item.item_id]}, type="artist")
653 elif item.media_type == MediaType.ALBUM:
654 await self._put_data("me/albums", {"ids": [item.item_id]})
655 elif item.media_type == MediaType.TRACK:
656 await self._put_data("me/tracks", {"ids": [item.item_id]})
657 elif item.media_type == MediaType.PLAYLIST:
658 await self._put_data(f"playlists/{item.item_id}/followers", data={"public": False})
659 elif item.media_type == MediaType.PODCAST:
660 await self._put_data("me/shows", ids=item.item_id)
661 elif item.media_type == MediaType.AUDIOBOOK and self.audiobooks_supported:
662 await self._put_data("me/audiobooks", ids=item.item_id)
663 return True
664
665 async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool:
666 """Remove item from library."""
667 if media_type == MediaType.ARTIST:
668 await self._delete_data("me/following", {"ids": [prov_item_id]}, type="artist")
669 elif media_type == MediaType.ALBUM:
670 await self._delete_data("me/albums", {"ids": [prov_item_id]})
671 elif media_type == MediaType.TRACK:
672 await self._delete_data("me/tracks", {"ids": [prov_item_id]})
673 elif media_type == MediaType.PLAYLIST:
674 await self._delete_data(f"playlists/{prov_item_id}/followers")
675 elif media_type == MediaType.PODCAST:
676 await self._delete_data("me/shows", ids=prov_item_id)
677 elif media_type == MediaType.AUDIOBOOK and self.audiobooks_supported:
678 await self._delete_data("me/audiobooks", ids=prov_item_id)
679 return True
680
681 async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
682 """Add track(s) to playlist."""
683 track_uris = [f"spotify:track:{track_id}" for track_id in prov_track_ids]
684 data = {"uris": track_uris}
685 await self._post_data(f"playlists/{prov_playlist_id}/tracks", data=data)
686
687 async def remove_playlist_tracks(
688 self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
689 ) -> None:
690 """Remove track(s) from playlist."""
691 track_uris = []
692 for pos in positions_to_remove:
693 uri = f"playlists/{prov_playlist_id}/tracks"
694 spotify_result = await self._get_data(uri, limit=1, offset=pos - 1)
695 for item in spotify_result["items"]:
696 if not (item and item["track"] and item["track"]["id"]):
697 continue
698 track_uris.append({"uri": f"spotify:track:{item['track']['id']}"})
699 data = {"tracks": track_uris}
700 await self._delete_data(f"playlists/{prov_playlist_id}/tracks", data=data)
701
702 async def create_playlist(self, name: str) -> Playlist:
703 """Create a new playlist on provider with given name."""
704 if self._sp_user is None:
705 raise LoginFailed("User info not available - not logged in")
706 data = {"name": name, "public": False}
707 new_playlist = await self._post_data(f"users/{self._sp_user['id']}/playlists", data=data)
708 self._fix_create_playlist_api_bug(new_playlist)
709 return parse_playlist(new_playlist, self)
710
711 @use_cache(86400 * 14) # 14 days
712 async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]:
713 """Retrieve a dynamic list of tracks based on the provided item."""
714 # Recommendations endpoint is only available on global session (not developer API)
715 # https://developer.spotify.com/blog/2024-11-27-changes-to-the-web-api
716 endpoint = "recommendations"
717 items = await self._get_data(
718 endpoint, seed_tracks=prov_track_id, limit=limit, use_global_session=True
719 )
720 return [parse_track(item, self) for item in items["tracks"] if (item and item["id"])]
721
722 async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
723 """Return content details for the given track/episode/audiobook when it will be streamed."""
724 if media_type == MediaType.AUDIOBOOK and self.audiobooks_supported:
725 chapters_data = await self._get_audiobook_chapters_data(item_id)
726 if not chapters_data:
727 raise MediaNotFoundError(f"No chapters found for audiobook {item_id}")
728
729 # Calculate total duration and convert to seconds for StreamDetails
730 total_duration_ms = sum(chapter.get("duration_ms", 0) for chapter in chapters_data)
731 duration_seconds = total_duration_ms // 1000
732
733 # Create chapter URIs for streaming
734 chapter_uris = []
735 for chapter in chapters_data:
736 chapter_id = chapter["id"]
737 chapter_uri = f"spotify://episode:{chapter_id}"
738 chapter_uris.append(chapter_uri)
739
740 return StreamDetails(
741 item_id=item_id,
742 provider=self.instance_id,
743 media_type=MediaType.AUDIOBOOK,
744 audio_format=AudioFormat(content_type=ContentType.OGG, bit_rate=320),
745 stream_type=StreamType.CUSTOM,
746 allow_seek=True,
747 can_seek=True,
748 duration=duration_seconds,
749 data={"chapters": chapter_uris, "chapters_data": chapters_data},
750 )
751
752 # For all other media types (tracks, podcast episodes)
753 return StreamDetails(
754 item_id=item_id,
755 provider=self.instance_id,
756 media_type=media_type,
757 audio_format=AudioFormat(content_type=ContentType.OGG, bit_rate=320),
758 stream_type=StreamType.CUSTOM,
759 allow_seek=True,
760 can_seek=True,
761 )
762
763 async def get_audio_stream(
764 self, streamdetails: StreamDetails, seek_position: int = 0
765 ) -> AsyncGenerator[bytes, None]:
766 """Get audio stream from Spotify via librespot."""
767 if streamdetails.media_type == MediaType.AUDIOBOOK and isinstance(streamdetails.data, dict):
768 chapter_uris = streamdetails.data.get("chapters", [])
769 chapters_data = streamdetails.data.get("chapters_data", [])
770
771 # Calculate which chapter to start from based on seek_position
772 seek_position_ms = seek_position * 1000
773 current_seek_ms = seek_position_ms
774 start_chapter = 0
775
776 if seek_position > 0 and chapters_data:
777 accumulated_duration_ms = 0
778
779 for i, chapter_data in enumerate(chapters_data):
780 chapter_duration_ms = chapter_data.get("duration_ms", 0)
781
782 if accumulated_duration_ms + chapter_duration_ms > seek_position_ms:
783 start_chapter = i
784 current_seek_ms = seek_position_ms - accumulated_duration_ms
785 break
786 accumulated_duration_ms += chapter_duration_ms
787 else:
788 start_chapter = len(chapter_uris) - 1
789 current_seek_ms = 0
790
791 # Convert back to seconds for librespot
792 current_seek_seconds = int(current_seek_ms // 1000)
793
794 # Stream chapters starting from the calculated position
795 for i in range(start_chapter, len(chapter_uris)):
796 chapter_uri = chapter_uris[i]
797 chapter_seek = current_seek_seconds if i == start_chapter else 0
798
799 try:
800 async for chunk in self.streamer.stream_spotify_uri(chapter_uri, chapter_seek):
801 yield chunk
802 except Exception as e:
803 self.logger.error(f"Chapter {i + 1} streaming failed: {e}")
804 continue
805 else:
806 # Handle normal tracks and podcast episodes
807 async for chunk in self.streamer.get_audio_stream(streamdetails, seek_position):
808 yield chunk
809
810 @lock
811 async def login(self, force_refresh: bool = False) -> dict[str, Any]:
812 """Log-in Spotify global session and return Auth/token info.
813
814 This uses MA's global client ID which has full API access but heavy rate limits.
815 """
816 # return existing token if we have one in memory
817 if (
818 not force_refresh
819 and self._auth_info_global
820 and (self._auth_info_global["expires_at"] > (time.time() - 600))
821 ):
822 return self._auth_info_global
823 # request new access token using the refresh token
824 if not (refresh_token := self.config.get_value(CONF_REFRESH_TOKEN_GLOBAL)):
825 raise LoginFailed("Authentication required")
826
827 try:
828 auth_info = await get_spotify_token(
829 self.mass.http_session,
830 app_var(2), # Always use MA's global client ID
831 cast("str", refresh_token),
832 "global",
833 )
834 self.logger.debug("Successfully refreshed global access token")
835 except LoginFailed as err:
836 if "revoked" in str(err):
837 # clear refresh token if it's invalid
838 self._update_config_value(CONF_REFRESH_TOKEN_GLOBAL, None)
839 if self.available:
840 self.unload_with_error(str(err))
841 elif self.available:
842 self.mass.create_task(
843 self.mass.unload_provider_with_error(self.instance_id, str(err))
844 )
845 raise
846
847 # make sure that our updated creds get stored in memory + config
848 self._auth_info_global = auth_info
849 self._update_config_value(
850 CONF_REFRESH_TOKEN_GLOBAL, auth_info["refresh_token"], encrypted=True
851 )
852
853 # Setup librespot with global token only if dev token is not configured
854 # (if dev token exists, librespot will be set up in login_dev instead)
855 if not self.config.get_value(CONF_REFRESH_TOKEN_DEV):
856 await self._setup_librespot_auth(auth_info["access_token"])
857
858 # get logged-in user info
859 if not self._sp_user:
860 self._sp_user = userinfo = await self._get_data(
861 "me", auth_info=auth_info, use_global_session=True
862 )
863 self.mass.metadata.set_default_preferred_language(userinfo["country"])
864 self.logger.info("Successfully logged in to Spotify as %s", userinfo["display_name"])
865 return auth_info
866
867 @lock
868 async def login_dev(self, force_refresh: bool = False) -> dict[str, Any]:
869 """Log-in Spotify developer session and return Auth/token info.
870
871 This uses the user's custom client ID which has less rate limits but limited API access.
872 """
873 # return existing token if we have one in memory
874 if (
875 not force_refresh
876 and self._auth_info_dev
877 and (self._auth_info_dev["expires_at"] > (time.time() - 600))
878 ):
879 return self._auth_info_dev
880 # request new access token using the refresh token
881 refresh_token = self.config.get_value(CONF_REFRESH_TOKEN_DEV)
882 client_id = self.config.get_value(CONF_CLIENT_ID)
883 if not refresh_token or not client_id:
884 raise LoginFailed("Developer authentication not configured")
885
886 try:
887 auth_info = await get_spotify_token(
888 self.mass.http_session,
889 cast("str", client_id),
890 cast("str", refresh_token),
891 "developer",
892 )
893 self.logger.debug("Successfully refreshed developer access token")
894 except LoginFailed as err:
895 if "revoked" in str(err):
896 # clear refresh token if it's invalid
897 self._update_config_value(CONF_REFRESH_TOKEN_DEV, None)
898 self._update_config_value(CONF_CLIENT_ID, None)
899 # Don't unload - we can still use the global session
900 self.dev_session_active = False
901 self.logger.warning(str(err))
902 raise
903
904 # make sure that our updated creds get stored in memory + config
905 self._auth_info_dev = auth_info
906 self._update_config_value(
907 CONF_REFRESH_TOKEN_DEV, auth_info["refresh_token"], encrypted=True
908 )
909
910 # Setup librespot with dev token (preferred over global token)
911 await self._setup_librespot_auth(auth_info["access_token"])
912
913 self.logger.info("Successfully logged in to Spotify developer session")
914 return auth_info
915
916 async def _setup_librespot_auth(self, access_token: str) -> None:
917 """Set up librespot authentication with the given access token.
918
919 :param access_token: Spotify access token to use for librespot authentication.
920 """
921 if self._librespot_bin is None:
922 raise LoginFailed("Librespot binary not available")
923
924 args = [
925 self._librespot_bin,
926 "--cache",
927 self.cache_dir,
928 "--check-auth",
929 ]
930 ret_code, stdout = await check_output(*args)
931 if ret_code != 0:
932 # cached librespot creds are invalid, re-authenticate
933 # we can use the check-token option to send a new token to librespot
934 # librespot will then get its own token from spotify (somehow) and cache that.
935 args += [
936 "--access-token",
937 access_token,
938 ]
939 ret_code, stdout = await check_output(*args)
940 if ret_code != 0:
941 # this should not happen, but guard it just in case
942 err_str = stdout.decode("utf-8").strip()
943 raise LoginFailed(f"Failed to verify credentials on Librespot: {err_str}")
944
945 async def _get_auth_info(self, use_global_session: bool = False) -> dict[str, Any]:
946 """Get auth info for API requests, preferring dev session if available.
947
948 :param use_global_session: Force use of global session (for features not available on dev).
949 """
950 if use_global_session or not self.dev_session_active:
951 return await self.login()
952
953 # Try dev session first
954 try:
955 return await self.login_dev()
956 except LoginFailed:
957 # Fall back to global session
958 self.logger.debug("Falling back to global session after dev session failure")
959 return await self.login()
960
961 def _get_liked_songs_playlist_id(self) -> str:
962 return f"{LIKED_SONGS_FAKE_PLAYLIST_ID_PREFIX}-{self.instance_id}"
963
964 async def _get_liked_songs_playlist(self) -> Playlist:
965 if self._sp_user is None:
966 raise LoginFailed("User info not available - not logged in")
967
968 liked_songs = Playlist(
969 item_id=self._get_liked_songs_playlist_id(),
970 provider=self.instance_id,
971 name=f"Liked Songs {self._sp_user['display_name']}", # TODO to be translated
972 owner=self._sp_user["display_name"],
973 provider_mappings={
974 ProviderMapping(
975 item_id=self._get_liked_songs_playlist_id(),
976 provider_domain=self.domain,
977 provider_instance=self.instance_id,
978 url="https://open.spotify.com/collection/tracks",
979 is_unique=True, # liked songs is user-specific
980 )
981 },
982 )
983
984 liked_songs.is_editable = False # TODO Editing requires special endpoints
985
986 # Add image to the playlist metadata
987 image = MediaItemImage(
988 type=ImageType.THUMB,
989 path="https://misc.scdn.co/liked-songs/liked-songs-64.png",
990 provider=self.instance_id,
991 remotely_accessible=True,
992 )
993 if liked_songs.metadata.images is None:
994 liked_songs.metadata.images = UniqueList([image])
995 else:
996 liked_songs.metadata.add_image(image)
997
998 return liked_songs
999
1000 async def _playlist_requires_global_token(self, prov_playlist_id: str) -> bool:
1001 """Check if a playlist requires global token (cached).
1002
1003 :param prov_playlist_id: The Spotify playlist ID.
1004 :returns: True if the playlist requires global token.
1005 """
1006 cache_key = f"playlist_global_token_{prov_playlist_id}"
1007 return bool(await self.mass.cache.get(cache_key, provider=self.instance_id))
1008
1009 async def _set_playlist_requires_global_token(self, prov_playlist_id: str) -> None:
1010 """Mark a playlist as requiring global token in cache.
1011
1012 :param prov_playlist_id: The Spotify playlist ID.
1013 """
1014 cache_key = f"playlist_global_token_{prov_playlist_id}"
1015 # Cache for 90 days - playlist ownership doesn't change
1016 await self.mass.cache.set(cache_key, True, provider=self.instance_id, expiration=86400 * 90)
1017
1018 async def _add_audiobook_chapters(self, audiobook: Audiobook) -> None:
1019 """Add chapter metadata to an audiobook from Spotify API data."""
1020 try:
1021 chapters_data = await self._get_audiobook_chapters_data(audiobook.item_id)
1022 if chapters_data:
1023 chapters = []
1024 total_duration_seconds = 0.0
1025
1026 for idx, chapter in enumerate(chapters_data):
1027 duration_ms = chapter.get("duration_ms", 0)
1028 duration_seconds = duration_ms / 1000.0
1029
1030 chapter_obj = MediaItemChapter(
1031 position=idx + 1,
1032 name=chapter.get("name", f"Chapter {idx + 1}"),
1033 start=total_duration_seconds,
1034 end=total_duration_seconds + duration_seconds,
1035 )
1036 chapters.append(chapter_obj)
1037 total_duration_seconds += duration_seconds
1038
1039 audiobook.metadata.chapters = chapters
1040 audiobook.duration = int(total_duration_seconds)
1041
1042 except (MediaNotFoundError, ResourceTemporarilyUnavailable, ProviderUnavailableError) as e:
1043 self.logger.warning(f"Failed to get chapters for audiobook {audiobook.item_id}: {e}")
1044
1045 @use_cache(43200) # 12 hours - balances freshness with performance
1046 async def _get_podcast_episodes_data(self, prov_podcast_id: str) -> list[dict[str, Any]]:
1047 """Get raw episode data from Spotify API (cached).
1048
1049 Args:
1050 prov_podcast_id: Spotify podcast ID
1051
1052 Returns:
1053 List of episode data dictionaries
1054 """
1055 episodes_data: list[dict[str, Any]] = []
1056
1057 try:
1058 async for item in self._get_all_items(
1059 f"shows/{prov_podcast_id}/episodes", market="from_token"
1060 ):
1061 if item and item.get("id"):
1062 episodes_data.append(item)
1063 except MediaNotFoundError:
1064 self.logger.warning("Podcast %s not found", prov_podcast_id)
1065 return []
1066 except ResourceTemporarilyUnavailable as err:
1067 self.logger.warning(
1068 "Temporary error fetching episodes for %s: %s", prov_podcast_id, err
1069 )
1070 raise
1071
1072 return episodes_data
1073
1074 @use_cache(7200) # 2 hours - shorter cache for resume point data
1075 async def _get_audiobook_chapters_data(self, prov_audiobook_id: str) -> list[dict[str, Any]]:
1076 """Get raw chapter data from Spotify API (cached).
1077
1078 Args:
1079 prov_audiobook_id: Spotify audiobook ID
1080
1081 Returns:
1082 List of chapter data dictionaries
1083 """
1084 chapters_data: list[dict[str, Any]] = []
1085
1086 try:
1087 async for item in self._get_all_items(
1088 f"audiobooks/{prov_audiobook_id}/chapters", market="from_token"
1089 ):
1090 if item and item.get("id"):
1091 chapters_data.append(item)
1092 except MediaNotFoundError:
1093 self.logger.warning("Audiobook %s not found", prov_audiobook_id)
1094 return []
1095 except ResourceTemporarilyUnavailable as err:
1096 self.logger.warning(
1097 "Temporary error fetching chapters for %s: %s", prov_audiobook_id, err
1098 )
1099 raise
1100
1101 return chapters_data
1102
1103 async def _get_all_items(
1104 self, endpoint: str, key: str = "items", **kwargs: Any
1105 ) -> AsyncGenerator[dict[str, Any], None]:
1106 """Get all items from a paged list."""
1107 limit = 50
1108 offset = 0
1109 # do single request to get the etag (which we use as checksum for caching)
1110 cache_checksum = await self._get_etag(endpoint, limit=1, offset=0, **kwargs)
1111 while True:
1112 result = await self._get_data_with_caching(
1113 endpoint, cache_checksum=cache_checksum, limit=limit, offset=offset, **kwargs
1114 )
1115 offset += limit
1116 if not result or key not in result or not result[key]:
1117 break
1118 for item in result[key]:
1119 yield item
1120 if len(result[key]) < limit:
1121 break
1122
1123 async def _get_data_with_caching(
1124 self, endpoint: str, cache_checksum: str | None, **kwargs: Any
1125 ) -> dict[str, Any]:
1126 """Get data from api with caching."""
1127 cache_key_parts = [endpoint]
1128 for key in sorted(kwargs.keys()):
1129 cache_key_parts.append(f"{key}{kwargs[key]}")
1130 cache_key = ".".join(map(str, cache_key_parts))
1131 if cached := await self.mass.cache.get(
1132 cache_key, provider=self.instance_id, checksum=cache_checksum, allow_bypass=False
1133 ):
1134 return cast("dict[str, Any]", cached)
1135 result = await self._get_data(endpoint, **kwargs)
1136 await self.mass.cache.set(
1137 cache_key, result, provider=self.instance_id, checksum=cache_checksum
1138 )
1139 return result
1140
1141 @use_cache(120, allow_bypass=False) # short cache for etags (subsequent calls use cached data)
1142 async def _get_etag(self, endpoint: str, **kwargs: Any) -> str | None:
1143 """Get etag for api endpoint."""
1144 _res = await self._get_data(endpoint, **kwargs)
1145 return _res.get("etag")
1146
1147 @throttle_with_retries
1148 async def _get_data(self, endpoint: str, **kwargs: Any) -> dict[str, Any]:
1149 """Get data from api.
1150
1151 :param endpoint: API endpoint to call.
1152 :param use_global_session: Force use of global session (for features not available on dev).
1153 """
1154 url = f"https://api.spotify.com/v1/{endpoint}"
1155 kwargs["market"] = "from_token"
1156 kwargs["country"] = "from_token"
1157 use_global_session = kwargs.pop("use_global_session", False)
1158 if not (auth_info := kwargs.pop("auth_info", None)):
1159 auth_info = await self._get_auth_info(use_global_session=use_global_session)
1160 headers = {"Authorization": f"Bearer {auth_info['access_token']}"}
1161 locale = self.mass.metadata.locale.replace("_", "-")
1162 language = locale.split("-")[0]
1163 headers["Accept-Language"] = f"{locale}, {language};q=0.9, *;q=0.5"
1164 self.logger.debug("handling get data %s with kwargs %s", url, kwargs)
1165 async with (
1166 self.mass.http_session.get(
1167 url,
1168 headers=headers,
1169 params=kwargs,
1170 timeout=aiohttp.ClientTimeout(total=120),
1171 ) as response,
1172 ):
1173 # handle spotify rate limiter
1174 if response.status == 429:
1175 backoff_time = int(response.headers["Retry-After"])
1176 raise ResourceTemporarilyUnavailable(
1177 "Spotify Rate Limiter", backoff_time=backoff_time
1178 )
1179 # handle temporary server error
1180 if response.status in (502, 503):
1181 raise ResourceTemporarilyUnavailable(backoff_time=30)
1182
1183 # handle token expired, raise ResourceTemporarilyUnavailable
1184 # so it will be retried (and the token refreshed)
1185 if response.status == 401:
1186 if use_global_session or not self.dev_session_active:
1187 self._auth_info_global = None
1188 else:
1189 self._auth_info_dev = None
1190 raise ResourceTemporarilyUnavailable("Token expired", backoff_time=1)
1191
1192 # handle 404 not found, convert to MediaNotFoundError
1193 if response.status in (400, 404):
1194 raise MediaNotFoundError(f"{endpoint} not found")
1195 response.raise_for_status()
1196 result: dict[str, Any] = await response.json(loads=json_loads)
1197 if etag := response.headers.get("ETag"):
1198 result["etag"] = etag
1199 return result
1200
1201 @throttle_with_retries
1202 async def _delete_data(self, endpoint: str, data: Any = None, **kwargs: Any) -> None:
1203 """Delete data from api."""
1204 url = f"https://api.spotify.com/v1/{endpoint}"
1205 use_global_session = kwargs.pop("use_global_session", False)
1206 if not (auth_info := kwargs.pop("auth_info", None)):
1207 auth_info = await self._get_auth_info(use_global_session=use_global_session)
1208 headers = {"Authorization": f"Bearer {auth_info['access_token']}"}
1209 async with self.mass.http_session.delete(
1210 url, headers=headers, params=kwargs, json=data, ssl=True
1211 ) as response:
1212 # handle spotify rate limiter
1213 if response.status == 429:
1214 backoff_time = int(response.headers["Retry-After"])
1215 raise ResourceTemporarilyUnavailable(
1216 "Spotify Rate Limiter", backoff_time=backoff_time
1217 )
1218 # handle token expired, raise ResourceTemporarilyUnavailable
1219 # so it will be retried (and the token refreshed)
1220 if response.status == 401:
1221 if use_global_session or not self.dev_session_active:
1222 self._auth_info_global = None
1223 else:
1224 self._auth_info_dev = None
1225 raise ResourceTemporarilyUnavailable("Token expired", backoff_time=1)
1226 # handle temporary server error
1227 if response.status in (502, 503):
1228 raise ResourceTemporarilyUnavailable(backoff_time=30)
1229 response.raise_for_status()
1230
1231 @throttle_with_retries
1232 async def _put_data(self, endpoint: str, data: Any = None, **kwargs: Any) -> None:
1233 """Put data on api."""
1234 url = f"https://api.spotify.com/v1/{endpoint}"
1235 use_global_session = kwargs.pop("use_global_session", False)
1236 if not (auth_info := kwargs.pop("auth_info", None)):
1237 auth_info = await self._get_auth_info(use_global_session=use_global_session)
1238 headers = {"Authorization": f"Bearer {auth_info['access_token']}"}
1239 async with self.mass.http_session.put(
1240 url, headers=headers, params=kwargs, json=data, ssl=True
1241 ) as response:
1242 # handle spotify rate limiter
1243 if response.status == 429:
1244 backoff_time = int(response.headers["Retry-After"])
1245 raise ResourceTemporarilyUnavailable(
1246 "Spotify Rate Limiter", backoff_time=backoff_time
1247 )
1248 # handle token expired, raise ResourceTemporarilyUnavailable
1249 # so it will be retried (and the token refreshed)
1250 if response.status == 401:
1251 if use_global_session or not self.dev_session_active:
1252 self._auth_info_global = None
1253 else:
1254 self._auth_info_dev = None
1255 raise ResourceTemporarilyUnavailable("Token expired", backoff_time=1)
1256
1257 # handle temporary server error
1258 if response.status in (502, 503):
1259 raise ResourceTemporarilyUnavailable(backoff_time=30)
1260 response.raise_for_status()
1261
1262 @throttle_with_retries
1263 async def _post_data(
1264 self, endpoint: str, data: Any = None, want_result: bool = True, **kwargs: Any
1265 ) -> dict[str, Any]:
1266 """Post data on api."""
1267 url = f"https://api.spotify.com/v1/{endpoint}"
1268 use_global_session = kwargs.pop("use_global_session", False)
1269 if not (auth_info := kwargs.pop("auth_info", None)):
1270 auth_info = await self._get_auth_info(use_global_session=use_global_session)
1271 headers = {"Authorization": f"Bearer {auth_info['access_token']}"}
1272 async with self.mass.http_session.post(
1273 url, headers=headers, params=kwargs, json=data, ssl=True
1274 ) as response:
1275 # handle spotify rate limiter
1276 if response.status == 429:
1277 backoff_time = int(response.headers["Retry-After"])
1278 raise ResourceTemporarilyUnavailable(
1279 "Spotify Rate Limiter", backoff_time=backoff_time
1280 )
1281 # handle token expired, raise ResourceTemporarilyUnavailable
1282 # so it will be retried (and the token refreshed)
1283 if response.status == 401:
1284 if use_global_session or not self.dev_session_active:
1285 self._auth_info_global = None
1286 else:
1287 self._auth_info_dev = None
1288 raise ResourceTemporarilyUnavailable("Token expired", backoff_time=1)
1289 # handle temporary server error
1290 if response.status in (502, 503):
1291 raise ResourceTemporarilyUnavailable(backoff_time=30)
1292 response.raise_for_status()
1293 if not want_result:
1294 return {}
1295 result: dict[str, Any] = await response.json(loads=json_loads)
1296 return result
1297
1298 def _fix_create_playlist_api_bug(self, playlist_obj: dict[str, Any]) -> None:
1299 """Fix spotify API bug where incorrect owner id is returned from Create Playlist."""
1300 if self._sp_user is None:
1301 raise LoginFailed("User info not available - not logged in")
1302
1303 if playlist_obj["owner"]["id"] != self._sp_user["id"]:
1304 playlist_obj["owner"]["id"] = self._sp_user["id"]
1305 playlist_obj["owner"]["display_name"] = self._sp_user["display_name"]
1306 else:
1307 self.logger.warning(
1308 "FIXME: Spotify have fixed their Create Playlist API, this fix can be removed."
1309 )
1310
1311 async def _test_audiobook_support(self) -> bool:
1312 """Test if audiobooks are supported in user's region."""
1313 try:
1314 await self._get_data("me/audiobooks", limit=1)
1315 return True
1316 except aiohttp.ClientResponseError as e:
1317 if e.status == 403:
1318 return False # Not available
1319 raise # Re-raise other HTTP errors
1320 except (MediaNotFoundError, ProviderUnavailableError):
1321 return False
1322