/
/
/
1"""Main Spotify provider implementation."""
2
3from __future__ import annotations
4
5import os
6import time
7from collections.abc import AsyncGenerator
8from typing import Any, cast
9
10import aiohttp
11from music_assistant_models.enums import (
12 ContentType,
13 ImageType,
14 MediaType,
15 ProviderFeature,
16 StreamType,
17)
18from music_assistant_models.errors import (
19 LoginFailed,
20 MediaNotFoundError,
21 ProviderUnavailableError,
22 ResourceTemporarilyUnavailable,
23 UnsupportedFeaturedException,
24)
25from music_assistant_models.media_items import (
26 Album,
27 Artist,
28 Audiobook,
29 AudioFormat,
30 MediaItemImage,
31 MediaItemType,
32 Playlist,
33 Podcast,
34 PodcastEpisode,
35 ProviderMapping,
36 SearchResults,
37 Track,
38 UniqueList,
39)
40from music_assistant_models.media_items.metadata import MediaItemChapter
41from music_assistant_models.streamdetails import StreamDetails
42
43from music_assistant.controllers.cache import use_cache
44from music_assistant.helpers.app_vars import app_var # type: ignore[attr-defined]
45from music_assistant.helpers.json import json_loads
46from music_assistant.helpers.process import check_output
47from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
48from music_assistant.helpers.util import lock
49from music_assistant.models.music_provider import MusicProvider
50
51from .constants import (
52 CONF_CLIENT_ID,
53 CONF_REFRESH_TOKEN_DEV,
54 CONF_REFRESH_TOKEN_GLOBAL,
55 CONF_SYNC_AUDIOBOOK_PROGRESS,
56 CONF_SYNC_PODCAST_PROGRESS,
57 LIKED_SONGS_FAKE_PLAYLIST_ID_PREFIX,
58)
59from .helpers import get_librespot_binary, get_spotify_token
60from .parsers import (
61 parse_album,
62 parse_artist,
63 parse_audiobook,
64 parse_playlist,
65 parse_podcast,
66 parse_podcast_episode,
67 parse_track,
68)
69from .streaming import LibrespotStreamer
70
71
class NotModifiedError(Exception):
    """Signal that a requested resource is unchanged since it was last fetched."""
74
75
class SpotifyProvider(MusicProvider):
    """Implementation of a Spotify MusicProvider.

    The provider works with up to two Spotify Web API sessions: a "global"
    session that uses MA's own client ID and an optional "developer" session
    that uses a client ID supplied by the user.
    """

    # Global session (MA's client ID) - always present
    # Holds the auth/token info returned by the last successful `login()`.
    _auth_info_global: dict[str, Any] | None = None
    # Developer session (user's custom client ID) - optional
    _auth_info_dev: dict[str, Any] | None = None
    # User info of the logged-in account as returned by the "me" endpoint.
    _sp_user: dict[str, Any] | None = None
    # Result of `get_librespot_binary()`, resolved during async init.
    _librespot_bin: str | None = None
    # Result of the audiobook availability probe done during async init.
    _audiobooks_supported = False
    # True if user has configured a custom client ID with valid authentication
    dev_session_active: bool = False
    # Rate limiter; assigned in `handle_async_init`.
    throttler: ThrottlerManager
89
90 async def handle_async_init(self) -> None:
91 """Handle async initialization of the provider."""
92 self.cache_dir = os.path.join(self.mass.cache_path, self.instance_id)
93 # Default throttler for global session (heavy rate limited)
94 self.throttler = ThrottlerManager(rate_limit=1, period=2)
95 self.streamer = LibrespotStreamer(self)
96
97 # check if we have a librespot binary for this arch
98 self._librespot_bin = await get_librespot_binary()
99 # try login which will raise if it fails (logs in global session)
100 await self.login()
101
102 # Check if user has a custom client ID with valid dev token
103 client_id = self.config.get_value(CONF_CLIENT_ID)
104 dev_token = self.config.get_value(CONF_REFRESH_TOKEN_DEV)
105
106 if client_id and dev_token and self._sp_user:
107 await self.login_dev()
108 # Verify user matches
109 userinfo = await self._get_data("me", use_global_session=False)
110 if userinfo["id"] != self._sp_user["id"]:
111 raise LoginFailed(
112 "Developer session must use the same Spotify account as the main session."
113 )
114 # loosen the throttler when a custom client id is used
115 self.throttler = ThrottlerManager(rate_limit=45, period=30)
116 self.dev_session_active = True
117 self.logger.info("Developer Spotify session active.")
118
119 self._audiobooks_supported = await self._test_audiobook_support()
120 if not self._audiobooks_supported:
121 self.logger.info(
122 "Audiobook support disabled: Audiobooks are not available in your region. "
123 "See https://support.spotify.com/us/authors/article/audiobooks-availability/ "
124 "for supported countries."
125 )
126
127 @property
128 def audiobooks_supported(self) -> bool:
129 """Check if audiobooks are supported for this user/region."""
130 return self._audiobooks_supported
131
132 @property
133 def audiobook_progress_sync_enabled(self) -> bool:
134 """Check if audiobook progress sync is enabled."""
135 return bool(self.config.get_value(CONF_SYNC_AUDIOBOOK_PROGRESS, False))
136
137 @property
138 def podcast_progress_sync_enabled(self) -> bool:
139 """Check if played status sync is enabled."""
140 value = self.config.get_value(CONF_SYNC_PODCAST_PROGRESS, True)
141 return bool(value) if value is not None else True
142
143 @property
144 def supported_features(self) -> set[ProviderFeature]:
145 """Return the features supported by this Provider."""
146 features = self._supported_features.copy()
147 # Add audiobook features if enabled
148 if self.audiobooks_supported:
149 features.add(ProviderFeature.LIBRARY_AUDIOBOOKS)
150 features.add(ProviderFeature.LIBRARY_AUDIOBOOKS_EDIT)
151 return features
152
153 @property
154 def instance_name_postfix(self) -> str | None:
155 """Return a (default) instance name postfix for this provider instance."""
156 if self._sp_user:
157 return str(self._sp_user["display_name"])
158 return None
159
160 ## Library retrieval methods (generators)
161 async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
162 """Retrieve library artists from spotify."""
163 endpoint = "me/following"
164 while True:
165 spotify_artists = await self._get_data(
166 endpoint,
167 type="artist",
168 limit=50,
169 )
170 for item in spotify_artists["artists"]["items"]:
171 if item and item["id"]:
172 yield parse_artist(item, self)
173 if spotify_artists["artists"]["next"]:
174 endpoint = spotify_artists["artists"]["next"]
175 endpoint = endpoint.replace("https://api.spotify.com/v1/", "")
176 else:
177 break
178
179 async def get_library_albums(self) -> AsyncGenerator[Album, None]:
180 """Retrieve library albums from the provider."""
181 async for item in self._get_all_items("me/albums"):
182 if item["album"] and item["album"]["id"]:
183 yield parse_album(item["album"], self)
184
185 async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
186 """Retrieve library tracks from the provider."""
187 async for item in self._get_all_items("me/tracks"):
188 if item and item["track"]["id"]:
189 yield parse_track(item["track"], self)
190
191 async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
192 """Retrieve library podcasts from spotify."""
193 async for item in self._get_all_items("me/shows"):
194 if item["show"] and item["show"]["id"]:
195 show_obj = item["show"]
196 # Filter out audiobooks - they have a distinctive description format
197 description = show_obj.get("description", "")
198 if description.startswith("Author(s):") and "Narrator(s):" in description:
199 continue
200 yield parse_podcast(show_obj, self)
201
202 async def get_library_audiobooks(self) -> AsyncGenerator[Audiobook, None]:
203 """Retrieve library audiobooks from spotify."""
204 if not self.audiobooks_supported:
205 return
206 async for item in self._get_all_items("me/audiobooks"):
207 if item and item["id"]:
208 # Parse the basic audiobook
209 audiobook = parse_audiobook(item, self)
210 # Add chapters from Spotify API data
211 await self._add_audiobook_chapters(audiobook)
212 yield audiobook
213
214 async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
215 """Retrieve playlists from the provider.
216
217 Note: We use the global session here because playlists like "Daily Mix"
218 are only returned when using the non-dev (global) token.
219 """
220 yield await self._get_liked_songs_playlist()
221 async for item in self._get_all_items("me/playlists", use_global_session=True):
222 if item and item["id"]:
223 yield parse_playlist(item, self)
224
225 @use_cache()
226 async def search(
227 self, search_query: str, media_types: list[MediaType] | None = None, limit: int = 5
228 ) -> SearchResults:
229 """Perform search on musicprovider.
230
231 :param search_query: Search query.
232 :param media_types: A list of media_types to include.
233 :param limit: Number of items to return in the search (per type).
234 """
235 searchresult = SearchResults()
236 if media_types is None:
237 return searchresult
238
239 searchtype = self._build_search_types(media_types)
240 if not searchtype:
241 return searchresult
242
243 search_query = search_query.replace("'", "")
244 offset = 0
245 page_limit = min(limit, 50)
246
247 while True:
248 api_result = await self._get_data(
249 "search", q=search_query, type=searchtype, limit=page_limit, offset=offset
250 )
251 items_received = self._process_search_results(api_result, searchresult)
252
253 offset += page_limit
254 if offset >= limit or items_received < page_limit:
255 break
256
257 return searchresult
258
259 def _build_search_types(self, media_types: list[MediaType]) -> str:
260 """Build comma-separated search types string from media types."""
261 searchtypes = []
262 if MediaType.ARTIST in media_types:
263 searchtypes.append("artist")
264 if MediaType.ALBUM in media_types:
265 searchtypes.append("album")
266 if MediaType.TRACK in media_types:
267 searchtypes.append("track")
268 if MediaType.PLAYLIST in media_types:
269 searchtypes.append("playlist")
270 if MediaType.PODCAST in media_types:
271 searchtypes.append("show")
272 if MediaType.AUDIOBOOK in media_types and self.audiobooks_supported:
273 searchtypes.append("audiobook")
274 return ",".join(searchtypes)
275
276 def _process_search_results(
277 self, api_result: dict[str, Any], searchresult: SearchResults
278 ) -> int:
279 """Process API search results and update searchresult object.
280
281 Returns the total number of items received.
282 """
283 items_received = 0
284
285 if "artists" in api_result:
286 artists = [
287 parse_artist(item, self)
288 for item in api_result["artists"]["items"]
289 if (item and item["id"] and item["name"])
290 ]
291 searchresult.artists = [*searchresult.artists, *artists]
292 items_received += len(api_result["artists"]["items"])
293
294 if "albums" in api_result:
295 albums = [
296 parse_album(item, self)
297 for item in api_result["albums"]["items"]
298 if (item and item["id"])
299 ]
300 searchresult.albums = [*searchresult.albums, *albums]
301 items_received += len(api_result["albums"]["items"])
302
303 if "tracks" in api_result:
304 tracks = [
305 parse_track(item, self)
306 for item in api_result["tracks"]["items"]
307 if (item and item["id"])
308 ]
309 searchresult.tracks = [*searchresult.tracks, *tracks]
310 items_received += len(api_result["tracks"]["items"])
311
312 if "playlists" in api_result:
313 playlists = [
314 parse_playlist(item, self)
315 for item in api_result["playlists"]["items"]
316 if (item and item["id"])
317 ]
318 searchresult.playlists = [*searchresult.playlists, *playlists]
319 items_received += len(api_result["playlists"]["items"])
320
321 if "shows" in api_result:
322 podcasts = []
323 for item in api_result["shows"]["items"]:
324 if not (item and item["id"]):
325 continue
326 # Filter out audiobooks - they have a distinctive description format
327 description = item.get("description", "")
328 if description.startswith("Author(s):") and "Narrator(s):" in description:
329 continue
330 podcasts.append(parse_podcast(item, self))
331 searchresult.podcasts = [*searchresult.podcasts, *podcasts]
332 items_received += len(api_result["shows"]["items"])
333
334 if "audiobooks" in api_result and self.audiobooks_supported:
335 audiobooks = [
336 parse_audiobook(item, self)
337 for item in api_result["audiobooks"]["items"]
338 if (item and item["id"])
339 ]
340 searchresult.audiobooks = [*searchresult.audiobooks, *audiobooks]
341 items_received += len(api_result["audiobooks"]["items"])
342
343 return items_received
344
345 @use_cache()
346 async def get_artist(self, prov_artist_id: str) -> Artist:
347 """Get full artist details by id."""
348 artist_obj = await self._get_data(f"artists/{prov_artist_id}")
349 return parse_artist(artist_obj, self)
350
351 @use_cache()
352 async def get_album(self, prov_album_id: str) -> Album:
353 """Get full album details by id."""
354 album_obj = await self._get_data(f"albums/{prov_album_id}")
355 return parse_album(album_obj, self)
356
357 @use_cache()
358 async def get_track(self, prov_track_id: str) -> Track:
359 """Get full track details by id."""
360 track_obj = await self._get_data(f"tracks/{prov_track_id}")
361 return parse_track(track_obj, self)
362
363 @use_cache()
364 async def get_playlist(self, prov_playlist_id: str) -> Playlist:
365 """Get full playlist details by id."""
366 if prov_playlist_id == self._get_liked_songs_playlist_id():
367 return await self._get_liked_songs_playlist()
368
369 # Check cache to see if this playlist requires global token
370 use_global = await self._playlist_requires_global_token(prov_playlist_id)
371 if use_global:
372 playlist_obj = await self._get_data(
373 f"playlists/{prov_playlist_id}", use_global_session=True
374 )
375 return parse_playlist(playlist_obj, self)
376
377 # Try with dev token first (if available), fallback to global on 400 error
378 # Some playlists like Spotify-owned (Daily Mix) or Liked Songs only work with global token
379 try:
380 playlist_obj = await self._get_data(f"playlists/{prov_playlist_id}")
381 return parse_playlist(playlist_obj, self)
382 except MediaNotFoundError:
383 if self.dev_session_active:
384 # Remember that this playlist requires global token
385 await self._set_playlist_requires_global_token(prov_playlist_id)
386 playlist_obj = await self._get_data(
387 f"playlists/{prov_playlist_id}", use_global_session=True
388 )
389 return parse_playlist(playlist_obj, self)
390 raise
391
392 @use_cache()
393 async def get_podcast(self, prov_podcast_id: str) -> Podcast:
394 """Get full podcast details by id."""
395 podcast_obj = await self._get_data(f"shows/{prov_podcast_id}")
396 if not podcast_obj:
397 raise MediaNotFoundError(f"Podcast not found: {prov_podcast_id}")
398 return parse_podcast(podcast_obj, self)
399
400 @use_cache()
401 async def get_audiobook(self, prov_audiobook_id: str) -> Audiobook:
402 """Get full audiobook details by id."""
403 if not self.audiobooks_supported:
404 raise UnsupportedFeaturedException("Audiobooks are not supported with this account")
405
406 audiobook_obj = await self._get_data(f"audiobooks/{prov_audiobook_id}")
407 if not audiobook_obj:
408 raise MediaNotFoundError(f"Audiobook not found: {prov_audiobook_id}")
409
410 # Parse basic audiobook without chapters first
411 audiobook = parse_audiobook(audiobook_obj, self)
412
413 # Add chapters from Spotify API data
414 await self._add_audiobook_chapters(audiobook)
415
416 # Note: Resume position will be handled by MA's internal system
417 # which calls get_resume_position() when needed
418
419 return audiobook
420
421 async def get_podcast_episodes(
422 self, prov_podcast_id: str
423 ) -> AsyncGenerator[PodcastEpisode, None]:
424 """Get all podcast episodes."""
425 # Get podcast object for context if available
426 podcast = await self.mass.music.podcasts.get_library_item_by_prov_id(
427 prov_podcast_id, self.instance_id
428 )
429 podcast = await self.get_podcast(prov_podcast_id)
430
431 # Get (cached) episode data
432 episodes_data = await self._get_podcast_episodes_data(prov_podcast_id)
433
434 # Parse and yield episodes with position
435 for idx, episode_data in enumerate(episodes_data):
436 episode = parse_podcast_episode(episode_data, self, podcast)
437 episode.position = idx + 1
438
439 # Set played status if sync is enabled and resume data exists
440 if self.podcast_progress_sync_enabled and "resume_point" in episode_data:
441 resume_point = episode_data["resume_point"]
442 fully_played = resume_point.get("fully_played", False)
443 position_ms = resume_point.get("resume_position_ms", 0)
444
445 episode.fully_played = fully_played if fully_played else None
446 episode.resume_position_ms = position_ms if position_ms > 0 else None
447
448 yield episode
449
450 @use_cache(86400) # 24 hours
451 async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode:
452 """Get full podcast episode details by id."""
453 episode_obj = await self._get_data(f"episodes/{prov_episode_id}", market="from_token")
454 if not episode_obj:
455 raise MediaNotFoundError(f"Episode not found: {prov_episode_id}")
456 return parse_podcast_episode(episode_obj, self)
457
458 async def get_resume_position(self, item_id: str, media_type: MediaType) -> tuple[bool, int]:
459 """Get resume position for episode/audiobook from Spotify."""
460 if media_type == MediaType.PODCAST_EPISODE:
461 if not self.podcast_progress_sync_enabled:
462 raise NotImplementedError("Spotify podcast resume sync disabled in settings")
463
464 try:
465 episode_obj = await self._get_data(f"episodes/{item_id}", market="from_token")
466 except MediaNotFoundError:
467 raise NotImplementedError("Episode not found on Spotify")
468 except (ResourceTemporarilyUnavailable, aiohttp.ClientError) as e:
469 self.logger.debug(f"Error fetching episode {item_id}: {e}")
470 raise NotImplementedError("Unable to fetch episode data from Spotify")
471
472 if (
473 not episode_obj
474 or "resume_point" not in episode_obj
475 or not episode_obj["resume_point"]
476 ):
477 raise NotImplementedError("No resume point data from Spotify")
478
479 resume_point = episode_obj["resume_point"]
480 fully_played = resume_point.get("fully_played", False)
481 position_ms = resume_point.get("resume_position_ms", 0)
482 return fully_played, position_ms
483
484 if media_type == MediaType.AUDIOBOOK:
485 if not self.audiobooks_supported:
486 raise NotImplementedError("Audiobook support is disabled")
487 if not self.audiobook_progress_sync_enabled:
488 raise NotImplementedError("Spotify audiobook resume sync disabled in settings")
489
490 try:
491 chapters_data = await self._get_audiobook_chapters_data(item_id)
492 if not chapters_data:
493 raise NotImplementedError("No chapters data available")
494
495 total_position_ms = 0
496 fully_played = True
497
498 for chapter in chapters_data:
499 resume_point = chapter.get("resume_point", {})
500 chapter_fully_played = resume_point.get("fully_played", False)
501 chapter_position_ms = resume_point.get("resume_position_ms", 0)
502
503 if chapter_fully_played:
504 total_position_ms += chapter.get("duration_ms", 0)
505 elif chapter_position_ms > 0:
506 total_position_ms += chapter_position_ms
507 fully_played = False
508 break
509 else:
510 fully_played = False
511 break
512
513 return fully_played, total_position_ms
514
515 except (MediaNotFoundError, ResourceTemporarilyUnavailable, aiohttp.ClientError) as e:
516 self.logger.debug(f"Failed to get audiobook resume position for {item_id}: {e}")
517 raise NotImplementedError("Unable to get audiobook resume position from Spotify")
518
519 else:
520 raise NotImplementedError(f"Resume position not supported for {media_type}")
521
522 async def on_played(
523 self,
524 media_type: MediaType,
525 prov_item_id: str,
526 fully_played: bool,
527 position: int,
528 media_item: MediaItemType,
529 is_playing: bool = False,
530 ) -> None:
531 """
532 Call when an episode/audiobook is played in MA.
533
534 MA automatically handles internal position tracking - this method is for
535 provider-specific actions like syncing to external services.
536 """
537 if media_type == MediaType.PODCAST_EPISODE:
538 if not isinstance(media_item, PodcastEpisode):
539 return
540
541 # Log the playback for monitoring/debugging
542 safe_position = position or 0
543 if media_item.duration > 0:
544 completion_percentage = (safe_position / media_item.duration) * 100
545 else:
546 completion_percentage = 0
547
548 self.logger.debug(
549 f"Episode played: {prov_item_id} at {safe_position}s "
550 f"({completion_percentage:.1f}%, fully_played: {fully_played})"
551 )
552
553 # Note: No API exists to sync playback position back to Spotify for episodes
554 # MA handles all internal position tracking automatically
555
556 elif media_type == MediaType.AUDIOBOOK:
557 if not isinstance(media_item, Audiobook):
558 return
559
560 # Log the playback for monitoring/debugging
561 safe_position = position or 0
562 if media_item.duration > 0:
563 completion_percentage = (safe_position / media_item.duration) * 100
564 else:
565 completion_percentage = 0
566
567 self.logger.debug(
568 f"Audiobook played: {prov_item_id} at {safe_position}s "
569 f"({completion_percentage:.1f}%, fully_played: {fully_played})"
570 )
571
572 # Note: No API exists to sync playback position back to Spotify for audiobooks
573 # MA handles all internal position tracking automatically
574
575 # The resume position will be automatically updated by MA's internal tracking
576 # and will be retrieved via get_audiobook() which combines MA + Spotify positions
577
578 @use_cache()
579 async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
580 """Get all album tracks for given album id."""
581 return [
582 parse_track(item, self)
583 async for item in self._get_all_items(f"albums/{prov_album_id}/tracks")
584 if item["id"]
585 ]
586
587 @use_cache(2600 * 3) # 3 hours
588 async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
589 """Get playlist tracks."""
590 is_liked_songs = prov_playlist_id == self._get_liked_songs_playlist_id()
591 uri = "me/tracks" if is_liked_songs else f"playlists/{prov_playlist_id}/tracks"
592
593 # Liked songs always require global session
594 # For other playlists, call get_playlist first to trigger the fallback logic
595 # and populate the cache for which token to use
596 if is_liked_songs:
597 use_global = True
598 else:
599 # This call is cached and will determine/cache if global token is needed
600 await self.get_playlist(prov_playlist_id)
601 use_global = await self._playlist_requires_global_token(prov_playlist_id)
602
603 result: list[Track] = []
604 page_size = 50
605 offset = page * page_size
606
607 # Get etag for caching
608 cache_checksum = await self._get_etag(uri, limit=1, offset=0, use_global_session=use_global)
609
610 spotify_result = await self._get_data_with_caching(
611 uri, cache_checksum, limit=page_size, offset=offset, use_global_session=use_global
612 )
613 for index, item in enumerate(spotify_result["items"], 1):
614 if not (item and item["track"] and item["track"]["id"]):
615 continue
616 track = parse_track(item["track"], self)
617 track.position = offset + index
618 result.append(track)
619 return result
620
621 @use_cache(86400 * 14) # 14 days
622 async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
623 """Get a list of all albums for the given artist."""
624 return [
625 parse_album(item, self)
626 async for item in self._get_all_items(
627 f"artists/{prov_artist_id}/albums?include_groups=album,single,compilation"
628 )
629 if (item and item["id"])
630 ]
631
632 @use_cache(86400 * 14) # 14 days
633 async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
634 """Get a list of 10 most popular tracks for the given artist."""
635 artist = await self.get_artist(prov_artist_id)
636 endpoint = f"artists/{prov_artist_id}/top-tracks"
637 items = await self._get_data(endpoint)
638 return [
639 parse_track(item, self, artist=artist)
640 for item in items["tracks"]
641 if (item and item["id"])
642 ]
643
644 async def library_add(self, item: MediaItemType) -> bool:
645 """Add item to library."""
646 if item.media_type == MediaType.ARTIST:
647 await self._put_data("me/following", {"ids": [item.item_id]}, type="artist")
648 elif item.media_type == MediaType.ALBUM:
649 await self._put_data("me/albums", {"ids": [item.item_id]})
650 elif item.media_type == MediaType.TRACK:
651 await self._put_data("me/tracks", {"ids": [item.item_id]})
652 elif item.media_type == MediaType.PLAYLIST:
653 await self._put_data(f"playlists/{item.item_id}/followers", data={"public": False})
654 elif item.media_type == MediaType.PODCAST:
655 await self._put_data("me/shows", ids=item.item_id)
656 elif item.media_type == MediaType.AUDIOBOOK and self.audiobooks_supported:
657 await self._put_data("me/audiobooks", ids=item.item_id)
658 return True
659
660 async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool:
661 """Remove item from library."""
662 if media_type == MediaType.ARTIST:
663 await self._delete_data("me/following", {"ids": [prov_item_id]}, type="artist")
664 elif media_type == MediaType.ALBUM:
665 await self._delete_data("me/albums", {"ids": [prov_item_id]})
666 elif media_type == MediaType.TRACK:
667 await self._delete_data("me/tracks", {"ids": [prov_item_id]})
668 elif media_type == MediaType.PLAYLIST:
669 await self._delete_data(f"playlists/{prov_item_id}/followers")
670 elif media_type == MediaType.PODCAST:
671 await self._delete_data("me/shows", ids=prov_item_id)
672 elif media_type == MediaType.AUDIOBOOK and self.audiobooks_supported:
673 await self._delete_data("me/audiobooks", ids=prov_item_id)
674 return True
675
676 async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
677 """Add track(s) to playlist."""
678 track_uris = [f"spotify:track:{track_id}" for track_id in prov_track_ids]
679 data = {"uris": track_uris}
680 await self._post_data(f"playlists/{prov_playlist_id}/tracks", data=data)
681
682 async def remove_playlist_tracks(
683 self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
684 ) -> None:
685 """Remove track(s) from playlist."""
686 track_uris = []
687 for pos in positions_to_remove:
688 uri = f"playlists/{prov_playlist_id}/tracks"
689 spotify_result = await self._get_data(uri, limit=1, offset=pos - 1)
690 for item in spotify_result["items"]:
691 if not (item and item["track"] and item["track"]["id"]):
692 continue
693 track_uris.append({"uri": f"spotify:track:{item['track']['id']}"})
694 data = {"tracks": track_uris}
695 await self._delete_data(f"playlists/{prov_playlist_id}/tracks", data=data)
696
697 async def create_playlist(self, name: str) -> Playlist:
698 """Create a new playlist on provider with given name."""
699 if self._sp_user is None:
700 raise LoginFailed("User info not available - not logged in")
701 data = {"name": name, "public": False}
702 new_playlist = await self._post_data(f"users/{self._sp_user['id']}/playlists", data=data)
703 self._fix_create_playlist_api_bug(new_playlist)
704 return parse_playlist(new_playlist, self)
705
706 @use_cache(86400 * 14) # 14 days
707 async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]:
708 """Retrieve a dynamic list of tracks based on the provided item."""
709 # Recommendations endpoint is only available on global session (not developer API)
710 # https://developer.spotify.com/blog/2024-11-27-changes-to-the-web-api
711 endpoint = "recommendations"
712 items = await self._get_data(
713 endpoint, seed_tracks=prov_track_id, limit=limit, use_global_session=True
714 )
715 return [parse_track(item, self) for item in items["tracks"] if (item and item["id"])]
716
717 async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
718 """Return content details for the given track/episode/audiobook when it will be streamed."""
719 if media_type == MediaType.AUDIOBOOK and self.audiobooks_supported:
720 chapters_data = await self._get_audiobook_chapters_data(item_id)
721 if not chapters_data:
722 raise MediaNotFoundError(f"No chapters found for audiobook {item_id}")
723
724 # Calculate total duration and convert to seconds for StreamDetails
725 total_duration_ms = sum(chapter.get("duration_ms", 0) for chapter in chapters_data)
726 duration_seconds = total_duration_ms // 1000
727
728 # Create chapter URIs for streaming
729 chapter_uris = []
730 for chapter in chapters_data:
731 chapter_id = chapter["id"]
732 chapter_uri = f"spotify://episode:{chapter_id}"
733 chapter_uris.append(chapter_uri)
734
735 return StreamDetails(
736 item_id=item_id,
737 provider=self.instance_id,
738 media_type=MediaType.AUDIOBOOK,
739 audio_format=AudioFormat(content_type=ContentType.OGG, bit_rate=320),
740 stream_type=StreamType.CUSTOM,
741 allow_seek=True,
742 can_seek=True,
743 duration=duration_seconds,
744 data={"chapters": chapter_uris, "chapters_data": chapters_data},
745 )
746
747 # For all other media types (tracks, podcast episodes)
748 return StreamDetails(
749 item_id=item_id,
750 provider=self.instance_id,
751 media_type=media_type,
752 audio_format=AudioFormat(content_type=ContentType.OGG, bit_rate=320),
753 stream_type=StreamType.CUSTOM,
754 allow_seek=True,
755 can_seek=True,
756 )
757
758 async def get_audio_stream(
759 self, streamdetails: StreamDetails, seek_position: int = 0
760 ) -> AsyncGenerator[bytes, None]:
761 """Get audio stream from Spotify via librespot."""
762 if streamdetails.media_type == MediaType.AUDIOBOOK and isinstance(streamdetails.data, dict):
763 chapter_uris = streamdetails.data.get("chapters", [])
764 chapters_data = streamdetails.data.get("chapters_data", [])
765
766 # Calculate which chapter to start from based on seek_position
767 seek_position_ms = seek_position * 1000
768 current_seek_ms = seek_position_ms
769 start_chapter = 0
770
771 if seek_position > 0 and chapters_data:
772 accumulated_duration_ms = 0
773
774 for i, chapter_data in enumerate(chapters_data):
775 chapter_duration_ms = chapter_data.get("duration_ms", 0)
776
777 if accumulated_duration_ms + chapter_duration_ms > seek_position_ms:
778 start_chapter = i
779 current_seek_ms = seek_position_ms - accumulated_duration_ms
780 break
781 accumulated_duration_ms += chapter_duration_ms
782 else:
783 start_chapter = len(chapter_uris) - 1
784 current_seek_ms = 0
785
786 # Convert back to seconds for librespot
787 current_seek_seconds = int(current_seek_ms // 1000)
788
789 # Stream chapters starting from the calculated position
790 for i in range(start_chapter, len(chapter_uris)):
791 chapter_uri = chapter_uris[i]
792 chapter_seek = current_seek_seconds if i == start_chapter else 0
793
794 try:
795 async for chunk in self.streamer.stream_spotify_uri(chapter_uri, chapter_seek):
796 yield chunk
797 except Exception as e:
798 self.logger.error(f"Chapter {i + 1} streaming failed: {e}")
799 continue
800 else:
801 # Handle normal tracks and podcast episodes
802 async for chunk in self.streamer.get_audio_stream(streamdetails, seek_position):
803 yield chunk
804
@lock
async def login(self, force_refresh: bool = False) -> dict[str, Any]:
    """Log-in Spotify global session and return Auth/token info.

    This uses MA's global client ID which has full API access but heavy rate limits.

    :param force_refresh: Ignore the in-memory token and always request a new one.
    :returns: Auth info dict; this method reads its ``access_token``,
        ``refresh_token`` and ``expires_at`` entries.
    :raises LoginFailed: If no refresh token is configured or refreshing fails.
    """
    # Reuse the in-memory token only while it stays valid for at least another
    # 10 minutes. A token that is expired (or about to expire) must be refreshed
    # instead of being handed out again.
    if (
        not force_refresh
        and self._auth_info_global
        and (self._auth_info_global["expires_at"] > (time.time() + 600))
    ):
        return self._auth_info_global
    # request new access token using the refresh token
    if not (refresh_token := self.config.get_value(CONF_REFRESH_TOKEN_GLOBAL)):
        raise LoginFailed("Authentication required")

    try:
        auth_info = await get_spotify_token(
            self.mass.http_session,
            app_var(2),  # Always use MA's global client ID
            cast("str", refresh_token),
            "global",
        )
        self.logger.debug("Successfully refreshed global access token")
    except LoginFailed as err:
        if "revoked" in str(err):
            # clear refresh token if it's invalid
            self._update_config_value(CONF_REFRESH_TOKEN_GLOBAL, None)
            if self.available:
                self.unload_with_error(str(err))
        elif self.available:
            self.mass.create_task(
                self.mass.unload_provider_with_error(self.instance_id, str(err))
            )
        raise

    # make sure that our updated creds get stored in memory + config
    self._auth_info_global = auth_info
    self._update_config_value(
        CONF_REFRESH_TOKEN_GLOBAL, auth_info["refresh_token"], encrypted=True
    )

    # Setup librespot with global token only if dev token is not configured
    # (if dev token exists, librespot will be set up in login_dev instead)
    if not self.config.get_value(CONF_REFRESH_TOKEN_DEV):
        await self._setup_librespot_auth(auth_info["access_token"])

    # get logged-in user info (only fetched once per provider instance)
    if not self._sp_user:
        self._sp_user = userinfo = await self._get_data(
            "me", auth_info=auth_info, use_global_session=True
        )
        self.mass.metadata.set_default_preferred_language(userinfo["country"])
        self.logger.info("Successfully logged in to Spotify as %s", userinfo["display_name"])
    return auth_info
861
@lock
async def login_dev(self, force_refresh: bool = False) -> dict[str, Any]:
    """Log-in Spotify developer session and return Auth/token info.

    This uses the user's custom client ID which has less rate limits but limited API access.

    :param force_refresh: Ignore the in-memory token and always request a new one.
    :returns: Auth info dict; this method reads its ``access_token``,
        ``refresh_token`` and ``expires_at`` entries.
    :raises LoginFailed: If the developer credentials are not configured or
        refreshing the token fails.
    """
    # Reuse the in-memory token only while it stays valid for at least another
    # 10 minutes. A token that is expired (or about to expire) must be refreshed
    # instead of being handed out again.
    if (
        not force_refresh
        and self._auth_info_dev
        and (self._auth_info_dev["expires_at"] > (time.time() + 600))
    ):
        return self._auth_info_dev
    # request new access token using the refresh token
    refresh_token = self.config.get_value(CONF_REFRESH_TOKEN_DEV)
    client_id = self.config.get_value(CONF_CLIENT_ID)
    if not refresh_token or not client_id:
        raise LoginFailed("Developer authentication not configured")

    try:
        auth_info = await get_spotify_token(
            self.mass.http_session,
            cast("str", client_id),
            cast("str", refresh_token),
            "developer",
        )
        self.logger.debug("Successfully refreshed developer access token")
    except LoginFailed as err:
        if "revoked" in str(err):
            # clear refresh token if it's invalid
            self._update_config_value(CONF_REFRESH_TOKEN_DEV, None)
            self._update_config_value(CONF_CLIENT_ID, None)
        # Don't unload - we can still use the global session
        self.dev_session_active = False
        self.logger.warning(str(err))
        raise

    # make sure that our updated creds get stored in memory + config
    self._auth_info_dev = auth_info
    self._update_config_value(
        CONF_REFRESH_TOKEN_DEV, auth_info["refresh_token"], encrypted=True
    )

    # Setup librespot with dev token (preferred over global token)
    await self._setup_librespot_auth(auth_info["access_token"])

    self.logger.info("Successfully logged in to Spotify developer session")
    return auth_info
910
async def _setup_librespot_auth(self, access_token: str) -> None:
    """Make sure librespot passes its auth check, re-authenticating if needed.

    :param access_token: Spotify access token handed to librespot when the
        check against its cache directory fails.
    :raises LoginFailed: If the librespot binary is unavailable or the auth
        check still fails after passing the access token.
    """
    librespot = self._librespot_bin
    if librespot is None:
        raise LoginFailed("Librespot binary not available")

    check_cmd = [librespot, "--cache", self.cache_dir, "--check-auth"]
    exit_code, output = await check_output(*check_cmd)
    if exit_code == 0:
        # the auth check passed with what is already in the cache dir
        return

    # the check failed: run it again, this time passing a fresh access token
    check_cmd.extend(["--access-token", access_token])
    exit_code, output = await check_output(*check_cmd)
    if exit_code == 0:
        return

    # this should not happen, but guard it just in case
    details = output.decode("utf-8").strip()
    raise LoginFailed(f"Failed to verify credentials on Librespot: {details}")
939
async def _get_auth_info(self, use_global_session: bool = False) -> dict[str, Any]:
    """Return the auth info to use for an API request.

    The developer session is used while it is active, unless the caller
    explicitly requests the global session. A failing developer login falls
    back to the global session.

    :param use_global_session: Force use of global session (for features not available on dev).
    """
    prefer_dev = not use_global_session and self.dev_session_active
    if prefer_dev:
        try:
            return await self.login_dev()
        except LoginFailed:
            # developer login failed, use the global session for this request
            self.logger.debug("Falling back to global session after dev session failure")
            return await self.login()
    return await self.login()
955
def _get_liked_songs_playlist_id(self) -> str:
    """Return the fake playlist ID of the Liked Songs playlist for this instance."""
    return "{}-{}".format(LIKED_SONGS_FAKE_PLAYLIST_ID_PREFIX, self.instance_id)
958
async def _get_liked_songs_playlist(self) -> Playlist:
    """Build the virtual "Liked Songs" playlist for the logged-in user.

    :returns: A non-editable Playlist with a thumbnail image attached.
    :raises LoginFailed: If no user info is available yet.
    """
    user = self._sp_user
    if user is None:
        raise LoginFailed("User info not available - not logged in")

    playlist_id = self._get_liked_songs_playlist_id()
    display_name = user["display_name"]
    mapping = ProviderMapping(
        item_id=playlist_id,
        provider_domain=self.domain,
        provider_instance=self.instance_id,
        url="https://open.spotify.com/collection/tracks",
        is_unique=True,  # liked songs is user-specific
    )
    liked_songs = Playlist(
        item_id=playlist_id,
        provider=self.instance_id,
        name=f"Liked Songs {display_name}",  # TODO to be translated
        owner=display_name,
        provider_mappings={mapping},
    )
    liked_songs.is_editable = False  # TODO Editing requires special endpoints

    # attach the thumbnail to the playlist metadata
    thumb = MediaItemImage(
        type=ImageType.THUMB,
        path="https://misc.scdn.co/liked-songs/liked-songs-64.png",
        provider=self.instance_id,
        remotely_accessible=True,
    )
    metadata = liked_songs.metadata
    if metadata.images is not None:
        metadata.add_image(thumb)
    else:
        metadata.images = UniqueList([thumb])

    return liked_songs
994
async def _playlist_requires_global_token(self, prov_playlist_id: str) -> bool:
    """Look up the cached "requires global token" flag of a playlist.

    :param prov_playlist_id: The Spotify playlist ID.
    :returns: True if the cache holds a truthy flag for this playlist.
    """
    flag = await self.mass.cache.get(
        f"playlist_global_token_{prov_playlist_id}", provider=self.instance_id
    )
    return bool(flag)
1003
async def _set_playlist_requires_global_token(self, prov_playlist_id: str) -> None:
    """Store the "requires global token" flag for a playlist in the cache.

    :param prov_playlist_id: The Spotify playlist ID.
    """
    # keep the flag for 90 days - playlist ownership doesn't change
    ninety_days = 90 * 86400
    await self.mass.cache.set(
        f"playlist_global_token_{prov_playlist_id}",
        True,
        provider=self.instance_id,
        expiration=ninety_days,
    )
1012
async def _add_audiobook_chapters(self, audiobook: Audiobook) -> None:
    """Attach chapter metadata and the total duration to an audiobook.

    Chapters are laid out back to back: each one starts where the previous
    one ended. Fetch failures are logged and leave the audiobook untouched.

    :param audiobook: The audiobook to update in place.
    """
    try:
        raw_chapters = await self._get_audiobook_chapters_data(audiobook.item_id)
        if not raw_chapters:
            return

        parsed = []
        elapsed = 0.0  # running start offset in seconds
        for number, raw in enumerate(raw_chapters, start=1):
            length = raw.get("duration_ms", 0) / 1000.0
            chapter_end = elapsed + length
            parsed.append(
                MediaItemChapter(
                    position=number,
                    name=raw.get("name", f"Chapter {number}"),
                    start=elapsed,
                    end=chapter_end,
                )
            )
            elapsed = chapter_end

        audiobook.metadata.chapters = parsed
        audiobook.duration = int(elapsed)
    except (MediaNotFoundError, ResourceTemporarilyUnavailable, ProviderUnavailableError) as e:
        self.logger.warning(f"Failed to get chapters for audiobook {audiobook.item_id}: {e}")
1039
@use_cache(43200)  # 12 hours - balances freshness with performance
async def _get_podcast_episodes_data(self, prov_podcast_id: str) -> list[dict[str, Any]]:
    """Collect the raw episode dicts of a podcast from the Spotify API (cached).

    Entries that are empty or lack an ``id`` are dropped.

    :param prov_podcast_id: Spotify podcast ID.
    :returns: List of episode data dictionaries; empty if the podcast is not found.
    :raises ResourceTemporarilyUnavailable: Re-raised after logging a warning.
    """
    endpoint = f"shows/{prov_podcast_id}/episodes"
    try:
        return [
            episode
            async for episode in self._get_all_items(endpoint, market="from_token")
            if episode and episode.get("id")
        ]
    except MediaNotFoundError:
        self.logger.warning("Podcast %s not found", prov_podcast_id)
        return []
    except ResourceTemporarilyUnavailable as err:
        self.logger.warning(
            "Temporary error fetching episodes for %s: %s", prov_podcast_id, err
        )
        raise
1068
@use_cache(7200)  # 2 hours - shorter cache for resume point data
async def _get_audiobook_chapters_data(self, prov_audiobook_id: str) -> list[dict[str, Any]]:
    """Collect the raw chapter dicts of an audiobook from the Spotify API (cached).

    Entries that are empty or lack an ``id`` are dropped.

    :param prov_audiobook_id: Spotify audiobook ID.
    :returns: List of chapter data dictionaries; empty if the audiobook is not found.
    :raises ResourceTemporarilyUnavailable: Re-raised after logging a warning.
    """
    endpoint = f"audiobooks/{prov_audiobook_id}/chapters"
    try:
        return [
            chapter
            async for chapter in self._get_all_items(endpoint, market="from_token")
            if chapter and chapter.get("id")
        ]
    except MediaNotFoundError:
        self.logger.warning("Audiobook %s not found", prov_audiobook_id)
        return []
    except ResourceTemporarilyUnavailable as err:
        self.logger.warning(
            "Temporary error fetching chapters for %s: %s", prov_audiobook_id, err
        )
        raise
1097
async def _get_all_items(
    self, endpoint: str, key: str = "items", **kwargs: Any
) -> AsyncGenerator[dict[str, Any], None]:
    """Yield every item of a paged list, requesting 50 items per page.

    :param endpoint: API endpoint that returns the paged list.
    :param key: Key in each response that holds the list of items.
    :param kwargs: Extra arguments forwarded with every request.
    """
    page_size = 50
    # one small request up front provides the etag used as cache checksum
    checksum = await self._get_etag(endpoint, limit=1, offset=0, **kwargs)
    page_start = 0
    while True:
        page = await self._get_data_with_caching(
            endpoint, cache_checksum=checksum, limit=page_size, offset=page_start, **kwargs
        )
        entries = page[key] if page and key in page else None
        if not entries:
            # missing or empty page: nothing more to fetch
            return
        for entry in entries:
            yield entry
        if len(entries) < page_size:
            # a short page is the last one
            return
        page_start += page_size
1117
async def _get_data_with_caching(
    self, endpoint: str, cache_checksum: str | None, **kwargs: Any
) -> dict[str, Any]:
    """Fetch an endpoint, answering from the cache when a truthy entry exists.

    The cache key is the endpoint followed by every keyword argument
    (sorted by name), joined with dots.

    :param endpoint: API endpoint to call.
    :param cache_checksum: Checksum passed to the cache on lookup and store.
    :param kwargs: Request arguments; they are also part of the cache key.
    """
    param_parts = (f"{name}{kwargs[name]}" for name in sorted(kwargs))
    cache_key = ".".join([str(endpoint), *param_parts])
    cache = self.mass.cache
    hit = await cache.get(
        cache_key, provider=self.instance_id, checksum=cache_checksum, allow_bypass=False
    )
    if hit:
        return cast("dict[str, Any]", hit)
    # cache miss: fetch from the API and remember the response
    fresh = await self._get_data(endpoint, **kwargs)
    await cache.set(cache_key, fresh, provider=self.instance_id, checksum=cache_checksum)
    return fresh
1135
@use_cache(120, allow_bypass=False)  # short cache for etags (subsequent calls use cached data)
async def _get_etag(self, endpoint: str, **kwargs: Any) -> str | None:
    """Return the ``etag`` entry of an endpoint's response, or None if absent.

    :param endpoint: API endpoint to call.
    :param kwargs: Arguments forwarded to the request.
    """
    return (await self._get_data(endpoint, **kwargs)).get("etag")
1141
@throttle_with_retries
async def _get_data(self, endpoint: str, **kwargs: Any) -> dict[str, Any]:
    """Get data from api.

    :param endpoint: API endpoint to call (appended to the v1 API base URL).
    :param use_global_session: Force use of global session (for features not available on dev).
    :param auth_info: Optional auth info to use instead of looking up a session.
    :param kwargs: Remaining keyword arguments are sent as query parameters.
    :returns: The decoded JSON response; when the response carries an ``ETag``
        header its value is added under the ``etag`` key.
    :raises ResourceTemporarilyUnavailable: On rate limiting (429), an expired
        token (401) or a temporary server error (502/503).
    :raises MediaNotFoundError: On a 400 or 404 response.
    """
    url = f"https://api.spotify.com/v1/{endpoint}"
    # market/country are always forced to "from_token"; this overwrites any
    # value the caller passed for these two parameters
    kwargs["market"] = "from_token"
    kwargs["country"] = "from_token"
    use_global_session = kwargs.pop("use_global_session", False)
    if not (auth_info := kwargs.pop("auth_info", None)):
        auth_info = await self._get_auth_info(use_global_session=use_global_session)
    headers = {"Authorization": f"Bearer {auth_info['access_token']}"}
    locale = self.mass.metadata.locale.replace("_", "-")
    language = locale.split("-")[0]
    headers["Accept-Language"] = f"{locale}, {language};q=0.9, *;q=0.5"
    self.logger.debug("handling get data %s with kwargs %s", url, kwargs)
    async with (
        self.mass.http_session.get(
            url,
            headers=headers,
            params=kwargs,
            timeout=aiohttp.ClientTimeout(total=120),
        ) as response,
    ):
        # handle spotify rate limiter
        if response.status == 429:
            # the Retry-After header may be missing or not a plain integer;
            # fall back to a fixed delay instead of failing on the header
            try:
                backoff_time = int(response.headers["Retry-After"])
            except (KeyError, ValueError):
                backoff_time = 30
            raise ResourceTemporarilyUnavailable(
                "Spotify Rate Limiter", backoff_time=backoff_time
            )
        # handle temporary server error
        if response.status in (502, 503):
            raise ResourceTemporarilyUnavailable(backoff_time=30)

        # handle token expired, raise ResourceTemporarilyUnavailable
        # so it will be retried (and the token refreshed)
        if response.status == 401:
            # drop the in-memory token of the session this request used
            if use_global_session or not self.dev_session_active:
                self._auth_info_global = None
            else:
                self._auth_info_dev = None
            raise ResourceTemporarilyUnavailable("Token expired", backoff_time=1)

        # handle 400/404, convert to MediaNotFoundError
        if response.status in (400, 404):
            raise MediaNotFoundError(f"{endpoint} not found")
        response.raise_for_status()
        result: dict[str, Any] = await response.json(loads=json_loads)
        if etag := response.headers.get("ETag"):
            result["etag"] = etag
        return result
1195
@throttle_with_retries
async def _delete_data(self, endpoint: str, data: Any = None, **kwargs: Any) -> None:
    """Delete data from api.

    :param endpoint: API endpoint to call (appended to the v1 API base URL).
    :param data: Optional payload sent as the JSON request body.
    :param use_global_session: Force use of global session (for features not available on dev).
    :param auth_info: Optional auth info to use instead of looking up a session.
    :param kwargs: Remaining keyword arguments are sent as query parameters.
    :raises ResourceTemporarilyUnavailable: On rate limiting (429), an expired
        token (401) or a temporary server error (502/503).
    """
    url = f"https://api.spotify.com/v1/{endpoint}"
    use_global_session = kwargs.pop("use_global_session", False)
    if not (auth_info := kwargs.pop("auth_info", None)):
        auth_info = await self._get_auth_info(use_global_session=use_global_session)
    headers = {"Authorization": f"Bearer {auth_info['access_token']}"}
    async with self.mass.http_session.delete(
        url, headers=headers, params=kwargs, json=data, ssl=True
    ) as response:
        # handle spotify rate limiter
        if response.status == 429:
            # the Retry-After header may be missing or not a plain integer;
            # fall back to a fixed delay instead of failing on the header
            try:
                backoff_time = int(response.headers["Retry-After"])
            except (KeyError, ValueError):
                backoff_time = 30
            raise ResourceTemporarilyUnavailable(
                "Spotify Rate Limiter", backoff_time=backoff_time
            )
        # handle token expired, raise ResourceTemporarilyUnavailable
        # so it will be retried (and the token refreshed)
        if response.status == 401:
            # drop the in-memory token of the session this request used
            if use_global_session or not self.dev_session_active:
                self._auth_info_global = None
            else:
                self._auth_info_dev = None
            raise ResourceTemporarilyUnavailable("Token expired", backoff_time=1)
        # handle temporary server error
        if response.status in (502, 503):
            raise ResourceTemporarilyUnavailable(backoff_time=30)
        response.raise_for_status()
1225
@throttle_with_retries
async def _put_data(self, endpoint: str, data: Any = None, **kwargs: Any) -> None:
    """Put data on api.

    :param endpoint: API endpoint to call (appended to the v1 API base URL).
    :param data: Optional payload sent as the JSON request body.
    :param use_global_session: Force use of global session (for features not available on dev).
    :param auth_info: Optional auth info to use instead of looking up a session.
    :param kwargs: Remaining keyword arguments are sent as query parameters.
    :raises ResourceTemporarilyUnavailable: On rate limiting (429), an expired
        token (401) or a temporary server error (502/503).
    """
    url = f"https://api.spotify.com/v1/{endpoint}"
    use_global_session = kwargs.pop("use_global_session", False)
    if not (auth_info := kwargs.pop("auth_info", None)):
        auth_info = await self._get_auth_info(use_global_session=use_global_session)
    headers = {"Authorization": f"Bearer {auth_info['access_token']}"}
    async with self.mass.http_session.put(
        url, headers=headers, params=kwargs, json=data, ssl=True
    ) as response:
        # handle spotify rate limiter
        if response.status == 429:
            # the Retry-After header may be missing or not a plain integer;
            # fall back to a fixed delay instead of failing on the header
            try:
                backoff_time = int(response.headers["Retry-After"])
            except (KeyError, ValueError):
                backoff_time = 30
            raise ResourceTemporarilyUnavailable(
                "Spotify Rate Limiter", backoff_time=backoff_time
            )
        # handle token expired, raise ResourceTemporarilyUnavailable
        # so it will be retried (and the token refreshed)
        if response.status == 401:
            # drop the in-memory token of the session this request used
            if use_global_session or not self.dev_session_active:
                self._auth_info_global = None
            else:
                self._auth_info_dev = None
            raise ResourceTemporarilyUnavailable("Token expired", backoff_time=1)

        # handle temporary server error
        if response.status in (502, 503):
            raise ResourceTemporarilyUnavailable(backoff_time=30)
        response.raise_for_status()
1256
@throttle_with_retries
async def _post_data(
    self, endpoint: str, data: Any = None, want_result: bool = True, **kwargs: Any
) -> dict[str, Any]:
    """Post data on api.

    :param endpoint: API endpoint to call (appended to the v1 API base URL).
    :param data: Optional payload sent as the JSON request body.
    :param want_result: When False, skip decoding the response body and
        return an empty dict.
    :param use_global_session: Force use of global session (for features not available on dev).
    :param auth_info: Optional auth info to use instead of looking up a session.
    :param kwargs: Remaining keyword arguments are sent as query parameters.
    :returns: The decoded JSON response, or ``{}`` if ``want_result`` is False.
    :raises ResourceTemporarilyUnavailable: On rate limiting (429), an expired
        token (401) or a temporary server error (502/503).
    """
    url = f"https://api.spotify.com/v1/{endpoint}"
    use_global_session = kwargs.pop("use_global_session", False)
    if not (auth_info := kwargs.pop("auth_info", None)):
        auth_info = await self._get_auth_info(use_global_session=use_global_session)
    headers = {"Authorization": f"Bearer {auth_info['access_token']}"}
    async with self.mass.http_session.post(
        url, headers=headers, params=kwargs, json=data, ssl=True
    ) as response:
        # handle spotify rate limiter
        if response.status == 429:
            # the Retry-After header may be missing or not a plain integer;
            # fall back to a fixed delay instead of failing on the header
            try:
                backoff_time = int(response.headers["Retry-After"])
            except (KeyError, ValueError):
                backoff_time = 30
            raise ResourceTemporarilyUnavailable(
                "Spotify Rate Limiter", backoff_time=backoff_time
            )
        # handle token expired, raise ResourceTemporarilyUnavailable
        # so it will be retried (and the token refreshed)
        if response.status == 401:
            # drop the in-memory token of the session this request used
            if use_global_session or not self.dev_session_active:
                self._auth_info_global = None
            else:
                self._auth_info_dev = None
            raise ResourceTemporarilyUnavailable("Token expired", backoff_time=1)
        # handle temporary server error
        if response.status in (502, 503):
            raise ResourceTemporarilyUnavailable(backoff_time=30)
        response.raise_for_status()
        if not want_result:
            return {}
        result: dict[str, Any] = await response.json(loads=json_loads)
        return result
1292
def _fix_create_playlist_api_bug(self, playlist_obj: dict[str, Any]) -> None:
    """Work around Create Playlist returning an incorrect owner id.

    When the owner id differs from the logged-in user's id, the owner's id and
    display name are overwritten in place with the user's values. Otherwise a
    warning is logged that the workaround can be removed.

    :param playlist_obj: Playlist dict as returned by the API; modified in place.
    :raises LoginFailed: If no user info is available yet.
    """
    user = self._sp_user
    if user is None:
        raise LoginFailed("User info not available - not logged in")

    owner = playlist_obj["owner"]
    if owner["id"] == user["id"]:
        self.logger.warning(
            "FIXME: Spotify have fixed their Create Playlist API, this fix can be removed."
        )
        return
    owner["id"] = user["id"]
    owner["display_name"] = user["display_name"]
1305
async def _test_audiobook_support(self) -> bool:
    """Probe the audiobooks endpoint to see whether audiobooks are supported.

    :returns: True if the request succeeds; False on a 403 response or when
        the request raises MediaNotFoundError or ProviderUnavailableError.
    :raises aiohttp.ClientResponseError: For HTTP errors other than 403.
    """
    try:
        await self._get_data("me/audiobooks", limit=1)
    except aiohttp.ClientResponseError as err:
        if err.status != 403:
            raise  # Re-raise other HTTP errors
        return False  # Not available
    except (MediaNotFoundError, ProviderUnavailableError):
        return False
    else:
        return True
1317