/
/
/
1"""Audible provider for Music Assistant, utilizing the audible library."""
2
3from __future__ import annotations
4
5import asyncio
6import os
7from collections.abc import AsyncGenerator, Sequence
8from logging import getLevelName
9from typing import TYPE_CHECKING, cast
10from urllib.parse import quote, unquote
11from uuid import uuid4
12
13import audible
14from music_assistant_models.config_entries import (
15 ConfigEntry,
16 ConfigValueOption,
17 ConfigValueType,
18 ProviderConfig,
19)
20from music_assistant_models.enums import ConfigEntryType, EventType, MediaType, ProviderFeature
21from music_assistant_models.errors import LoginFailed, MediaNotFoundError
22from music_assistant_models.media_items import BrowseFolder, ItemMapping
23
24from music_assistant.models.music_provider import MusicProvider
25from music_assistant.providers.audible.audible_helper import (
26 AudibleHelper,
27 audible_custom_login,
28 audible_get_auth_info,
29 cached_authenticator_from_file,
30 check_file_exists,
31 refresh_access_token_compat,
32 remove_file,
33)
34
35if TYPE_CHECKING:
36 from music_assistant_models.media_items import (
37 Audiobook,
38 MediaItemType,
39 Podcast,
40 PodcastEpisode,
41 )
42 from music_assistant_models.provider import ProviderManifest
43 from music_assistant_models.streamdetails import StreamDetails
44
45 from music_assistant.mass import MusicAssistant
46 from music_assistant.models import ProviderInstanceType
47
48
# Keys of the config-entry actions (buttons) dispatched in get_config_entries.
CONF_ACTION_AUTH = "authenticate"
CONF_ACTION_VERIFY = "verify_link"
# NOTE(review): not referenced in the visible part of this module - confirm it is still needed.
CONF_ACTION_CLEAR_AUTH = "clear_auth"
# Keys of the (partly hidden) config values used during the login flow.
CONF_AUTH_FILE = "auth_file"  # path of the saved authenticator file
CONF_POST_LOGIN_URL = "post_login_url"  # address the user copies back after logging in
CONF_CODE_VERIFIER = "code_verifier"  # returned by audible_get_auth_info, reused on verify
CONF_SERIAL = "serial"  # returned by audible_get_auth_info, reused on verify
CONF_LOGIN_URL = "login_url"  # login url signalled to the frontend on authenticate
CONF_LOCALE = "locale"  # selected Audible marketplace code (e.g. "us")

# Feature set handed to the provider instance created in setup().
SUPPORTED_FEATURES = {
    ProviderFeature.BROWSE,
    ProviderFeature.LIBRARY_AUDIOBOOKS,
    ProviderFeature.LIBRARY_PODCASTS,
}
65
66
async def setup(
    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
    """Create the Audible provider instance for the given configuration.

    :param mass: The running MusicAssistant instance.
    :param manifest: The manifest of this provider.
    :param config: The stored configuration of this provider instance.
    """
    provider = Audibleprovider(mass, manifest, config, SUPPORTED_FEATURES)
    return provider
72
73
def _auth_label_text(auth_required: bool) -> str:
    """Return the explanatory label shown at the top of the config form."""
    if auth_required:
        # Every fragment ends with an explicit space/newline: adjacent string
        # literals are joined verbatim, so a missing separator glues words together.
        return (
            "You need to authenticate with Audible. Click the authenticate button below "
            "to start the authentication process which will open in a new (popup) window, "
            "so make sure to disable any popup blockers.\n\n"
            "NOTE: \n"
            "After successful login you will get a 'page not found' message - this is expected. "
            "Copy the address to the textbox below and press verify. "
            "This will register this provider as a virtual device with Audible."
        )
    return (
        "Successfully authenticated with Audible."
        "\nNote: Changing marketplace needs new authorization"
    )


async def get_config_entries(
    mass: MusicAssistant,
    instance_id: str | None = None,  # noqa: ARG001
    action: str | None = None,
    values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
    """
    Return Config entries to setup this provider.

    instance_id: id of an existing provider instance (None if new instance setup).
    action: [optional] action key called from config entries UI.
    values: the (intermediate) raw values for config entries sent with the action.

    Raises LoginFailed when the verify action cannot complete the login or when
    the login did not yield signing keys.
    """
    if values is None:
        values = {}

    locale = cast("str", values.get(CONF_LOCALE, "") or "us")
    auth_file = cast("str", values.get(CONF_AUTH_FILE))

    # Authentication counts as done only when the stored auth file exists and loads.
    auth_required = True
    if auth_file and await check_file_exists(auth_file):
        try:
            await cached_authenticator_from_file(auth_file)
            auth_required = False
        except Exception:
            # Best-effort probe: a file that cannot be loaded simply means re-auth.
            auth_required = True

    if action == CONF_ACTION_AUTH:
        if auth_file and await check_file_exists(auth_file):
            await remove_file(auth_file)
            values[CONF_AUTH_FILE] = None
            # The credentials are gone, so the verify fields must become visible again.
            auth_required = True

        code_verifier, login_url, serial = await audible_get_auth_info(locale)
        values[CONF_CODE_VERIFIER] = code_verifier
        values[CONF_SERIAL] = serial
        values[CONF_LOGIN_URL] = login_url
        session_id = str(values["session_id"])
        mass.signal_event(EventType.AUTH_SESSION, session_id, login_url)
        await asyncio.sleep(15)

    if action == CONF_ACTION_VERIFY:
        code_verifier = str(values.get(CONF_CODE_VERIFIER))
        serial = str(values.get(CONF_SERIAL))
        post_login_url = str(values.get(CONF_POST_LOGIN_URL))
        storage_path = mass.storage_path

        try:
            auth = await audible_custom_login(code_verifier, post_login_url, serial, locale)

            # Verify signing auth was obtained (critical for stability)
            if not (auth.adp_token and auth.device_private_key):
                raise LoginFailed(
                    "Registration succeeded but signing keys were not obtained. "
                    "This may cause authentication issues. Please try again."
                )

            auth_file_path = os.path.join(storage_path, f"audible_auth_{uuid4().hex}.json")
            await asyncio.to_thread(auth.to_file, auth_file_path)
            values[CONF_AUTH_FILE] = auth_file_path
            auth_required = False
        except LoginFailed:
            raise
        except Exception as e:
            raise LoginFailed(f"Verification failed: {e}") from e

    # Build the label only now so it reflects the state after the action ran.
    label_text = _auth_label_text(auth_required)

    return (
        ConfigEntry(
            key="label_text",
            type=ConfigEntryType.LABEL,
            label=label_text,
        ),
        ConfigEntry(
            key=CONF_LOCALE,
            type=ConfigEntryType.STRING,
            label="Marketplace",
            hidden=not auth_required,
            required=True,
            value=locale,
            options=[
                ConfigValueOption("US and all other countries not listed", "us"),
                ConfigValueOption("Canada", "ca"),
                ConfigValueOption("UK and Ireland", "uk"),
                ConfigValueOption("Australia and New Zealand", "au"),
                ConfigValueOption("France, Belgium, Switzerland", "fr"),
                ConfigValueOption("Germany, Austria, Switzerland", "de"),
                ConfigValueOption("Japan", "jp"),
                ConfigValueOption("Italy", "it"),
                ConfigValueOption("India", "in"),
                ConfigValueOption("Spain", "es"),
                ConfigValueOption("Brazil", "br"),
            ],
            default_value="us",
        ),
        ConfigEntry(
            key=CONF_ACTION_AUTH,
            type=ConfigEntryType.ACTION,
            label="(Re)Authenticate with Audible",
            description="This button will redirect you to Audible to authenticate.",
            action=CONF_ACTION_AUTH,
        ),
        ConfigEntry(
            key=CONF_POST_LOGIN_URL,
            type=ConfigEntryType.STRING,
            label="Post Login Url",
            required=False,
            value=cast("str | None", values.get(CONF_POST_LOGIN_URL)),
            hidden=not auth_required,
        ),
        ConfigEntry(
            key=CONF_ACTION_VERIFY,
            type=ConfigEntryType.ACTION,
            label="Verify Audible URL",
            description="This button will check the url and register this provider.",
            action=CONF_ACTION_VERIFY,
            hidden=not auth_required,
        ),
        ConfigEntry(
            key=CONF_CODE_VERIFIER,
            type=ConfigEntryType.STRING,
            label="Code Verifier",
            hidden=True,
            required=False,
            value=cast("str | None", values.get(CONF_CODE_VERIFIER)),
        ),
        ConfigEntry(
            key=CONF_SERIAL,
            type=ConfigEntryType.STRING,
            label="Serial",
            hidden=True,
            required=False,
            value=cast("str | None", values.get(CONF_SERIAL)),
        ),
        ConfigEntry(
            key=CONF_LOGIN_URL,
            type=ConfigEntryType.STRING,
            label="Login Url",
            hidden=True,
            required=False,
            value=cast("str | None", values.get(CONF_LOGIN_URL)),
        ),
        ConfigEntry(
            key=CONF_AUTH_FILE,
            type=ConfigEntryType.STRING,
            label="Authentication File",
            hidden=True,
            required=True,
            value=cast("str | None", values.get(CONF_AUTH_FILE)),
        ),
    )
240
241
class Audibleprovider(MusicProvider):
    """Implementation of an Audible Audiobook Provider."""

    locale: str
    auth_file: str
    helper: AudibleHelper
    _client: audible.AsyncClient | None = None

    # Cache for authenticators to avoid repeated file I/O.
    # Shared by all instances (class attribute) and keyed by provider instance id;
    # entries are evicted on failed login and on unload so they cannot go stale.
    _AUTH_CACHE: dict[str, audible.Authenticator] = {}

    async def handle_async_init(self) -> None:
        """Handle asynchronous initialization of the provider."""
        self.locale = cast("str", self.config.get_value(CONF_LOCALE) or "us")
        self.auth_file = cast("str", self.config.get_value(CONF_AUTH_FILE))
        self._client = None
        audible.log_helper.set_level(getLevelName(self.logger.level))
        await self._login()

    async def _login(self) -> None:
        """Authenticate with Audible using the saved authentication file.

        Loads the authenticator (from the cache or from ``self.auth_file``),
        refreshes an expired access token when possible and creates the API
        client plus the helper used by all other methods.

        :raises LoginFailed: If the authenticator cannot be loaded, or the token
            refresh fails while no signing auth is available as fallback.
        """
        try:
            auth = self._AUTH_CACHE.get(self.instance_id)

            if auth is None:
                self.logger.debug("Loading authenticator from file")
                auth = await cached_authenticator_from_file(self.auth_file)
                self._AUTH_CACHE[self.instance_id] = auth
            else:
                self.logger.debug("Using cached authenticator")

            # Check if we have signing auth (preferred, stable - not affected by API changes)
            has_signing_auth = bool(auth.adp_token and auth.device_private_key)
            if has_signing_auth:
                self.logger.debug("Using signing auth (stable RSA-signed requests)")
            else:
                self.logger.debug("Signing auth not available, using bearer auth")

            # Handle token refresh if needed
            if auth.access_token_expired:
                self.logger.debug("Access token expired, refreshing")
                try:
                    # Use compatible refresh that handles new API token format
                    if auth.refresh_token and auth.locale:
                        refresh_data = await refresh_access_token_compat(
                            refresh_token=auth.refresh_token,
                            domain=auth.locale.domain,
                            http_session=self.mass.http_session,
                            with_username=auth.with_username or False,
                        )
                        auth._update_attrs(**refresh_data)
                        # Persist the refreshed token so the next start does not refresh again.
                        await asyncio.to_thread(auth.to_file, self.auth_file)
                        self._AUTH_CACHE[self.instance_id] = auth
                        self.logger.debug("Token refreshed successfully")
                    else:
                        self.logger.warning("Cannot refresh: missing refresh_token or locale")
                except Exception as refresh_error:
                    self.logger.warning(f"Token refresh failed: {refresh_error}")
                    if not has_signing_auth:
                        # Only fail if we don't have signing auth as fallback
                        raise LoginFailed(
                            "Token refresh failed and signing auth not available. "
                            "Please re-authenticate with Audible."
                        ) from refresh_error
                    # Continue with signing auth

            self._client = audible.AsyncClient(auth)

            self.helper = AudibleHelper(
                mass=self.mass,
                client=self._client,
                provider_instance=self.instance_id,
                provider_domain=self.domain,
                logger=self.logger,
            )

            self.logger.info("Successfully authenticated with Audible.")

        except LoginFailed:
            # Do not keep an authenticator that could not be used for a later attempt.
            self._AUTH_CACHE.pop(self.instance_id, None)
            raise
        except Exception as e:
            self._AUTH_CACHE.pop(self.instance_id, None)
            self.logger.error(f"Failed to authenticate with Audible: {e}")
            raise LoginFailed(f"Failed to authenticate with Audible: {e}") from e

    @property
    def is_streaming_provider(self) -> bool:
        """Return True if the provider is a streaming provider."""
        return True

    async def get_library_audiobooks(self) -> AsyncGenerator[Audiobook, None]:
        """Get all audiobooks from the library."""
        async for audiobook in self.helper.get_library():
            yield audiobook

    async def get_audiobook(self, prov_audiobook_id: str) -> Audiobook:
        """Get full audiobook details by id (always bypassing the helper cache)."""
        return await self.helper.get_audiobook(asin=prov_audiobook_id, use_cache=False)

    async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
        """Browse this provider's items.

        :param path: The path to browse, (e.g. provider_id://authors).
        """
        _, separator, item_path = path.partition("://")
        # Ignore leading/trailing slashes so "…://authors/" lists the authors
        # instead of looking up books for an empty id.
        item_path = item_path.strip("/") if separator else ""

        # Root - return main folders
        if not item_path:
            return self._browse_root(path)

        # section name -> (list the section's folders, list the books of one entry)
        handlers = {
            "authors": (self._browse_authors, self._browse_author_books),
            "series": (self._browse_series, self._browse_series_books),
            "narrators": (self._browse_narrators, self._browse_narrator_books),
            "genres": (self._browse_genres, self._browse_genre_books),
            "publishers": (self._browse_publishers, self._browse_publisher_books),
        }
        section, _, encoded_id = item_path.partition("/")
        handler = handlers.get(section)
        if handler is None:
            # Fall back to base implementation for audiobooks/podcasts
            return await super().browse(path)

        list_folders, list_books = handler
        if not encoded_id:
            return await list_folders(path.rstrip("/"))
        # Everything after the section is the id; this also accepts paths whose id
        # contains a raw (unescaped) "/".
        return await list_books(unquote(encoded_id))

    def _browse_root(self, base_path: str) -> list[BrowseFolder]:
        """Return root browse folders.

        :param base_path: The root path; folder names are appended to it directly.
        """
        # Folders whose display name comes from a translation key.
        folders = [
            BrowseFolder(
                item_id=key,
                provider=self.instance_id,
                path=f"{base_path}{key}",
                name="",
                translation_key=key,
            )
            for key in ("audiobooks", "podcasts")
        ]
        # Audible specific folders with a fixed display name.
        folders.extend(
            BrowseFolder(
                item_id=key,
                provider=self.instance_id,
                path=f"{base_path}{key}",
                name=name,
            )
            for key, name in (
                ("authors", "Authors"),
                ("series", "Series"),
                ("narrators", "Narrators"),
                ("genres", "Genres"),
                ("publishers", "Publishers"),
            )
        )
        return folders

    def _make_folders(
        self, base_path: str, entries: Sequence[tuple[str, str]]
    ) -> list[BrowseFolder]:
        """Build one browse folder per (item_id, name) pair below base_path."""
        return [
            BrowseFolder(
                item_id=item_id,
                provider=self.instance_id,
                # safe="" also escapes "/" so an id never adds extra path segments
                path=f"{base_path}/{quote(item_id, safe='')}",
                name=name,
            )
            for item_id, name in entries
        ]

    async def _browse_authors(self, base_path: str) -> list[BrowseFolder]:
        """Return list of all authors, sorted by name."""
        authors = await self.helper.get_authors()
        return self._make_folders(base_path, sorted(authors.items(), key=lambda x: x[1]))

    async def _browse_author_books(self, author_asin: str) -> list[Audiobook]:
        """Return audiobooks by a specific author."""
        return await self.helper.get_audiobooks_by_author(author_asin)

    async def _browse_series(self, base_path: str) -> list[BrowseFolder]:
        """Return list of all series, sorted by title."""
        series = await self.helper.get_series()
        return self._make_folders(base_path, sorted(series.items(), key=lambda x: x[1]))

    async def _browse_series_books(self, series_asin: str) -> list[Audiobook]:
        """Return audiobooks in a specific series."""
        return await self.helper.get_audiobooks_by_series(series_asin)

    async def _browse_narrators(self, base_path: str) -> list[BrowseFolder]:
        """Return list of all narrators, sorted by name."""
        narrators = await self.helper.get_narrators()
        return self._make_folders(base_path, sorted(narrators.items(), key=lambda x: x[1]))

    async def _browse_narrator_books(self, narrator_asin: str) -> list[Audiobook]:
        """Return audiobooks by a specific narrator."""
        return await self.helper.get_audiobooks_by_narrator(narrator_asin)

    async def _browse_genres(self, base_path: str) -> list[BrowseFolder]:
        """Return list of all genres, sorted alphabetically."""
        genres = await self.helper.get_genres()
        # The genre name doubles as the folder id.
        return self._make_folders(base_path, [(genre, genre) for genre in sorted(genres)])

    async def _browse_genre_books(self, genre: str) -> list[Audiobook]:
        """Return audiobooks matching a genre."""
        return await self.helper.get_audiobooks_by_genre(genre)

    async def _browse_publishers(self, base_path: str) -> list[BrowseFolder]:
        """Return list of all publishers, sorted alphabetically."""
        publishers = await self.helper.get_publishers()
        # The publisher name doubles as the folder id.
        return self._make_folders(
            base_path, [(publisher, publisher) for publisher in sorted(publishers)]
        )

    async def _browse_publisher_books(self, publisher: str) -> list[Audiobook]:
        """Return audiobooks from a specific publisher."""
        return await self.helper.get_audiobooks_by_publisher(publisher)

    async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
        """Get all podcasts from the library."""
        async for podcast in self.helper.get_library_podcasts():
            yield podcast

    async def get_podcast(self, prov_podcast_id: str) -> Podcast:
        """Get full podcast details by id."""
        return await self.helper.get_podcast(asin=prov_podcast_id)

    async def get_podcast_episodes(
        self, prov_podcast_id: str
    ) -> AsyncGenerator[PodcastEpisode, None]:
        """Get all episodes for a podcast."""
        async for episode in self.helper.get_podcast_episodes(prov_podcast_id):
            yield episode

    async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode:
        """Get full podcast episode details by id."""
        return await self.helper.get_podcast_episode(prov_episode_id)

    async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
        """Get stream details for an audiobook or podcast episode.

        :param item_id: The ASIN of the audiobook or podcast episode.
        :param media_type: The type of media (audiobook or podcast episode).
        :raises MediaNotFoundError: If the helper raises ValueError for the item.
        """
        try:
            return await self.helper.get_stream(asin=item_id, media_type=media_type)
        except ValueError as exc:
            raise MediaNotFoundError(f"Failed to get stream details for {item_id}") from exc

    async def on_played(
        self,
        media_type: MediaType,
        prov_item_id: str,
        fully_played: bool,
        position: int,
        media_item: MediaItemType,
        is_playing: bool = False,
    ) -> None:
        """
        Handle callback when a (playable) media item has been played.

        This is called by the Queue controller when;
            - a track has been fully played
            - a track has been stopped (or skipped) after being played
            - every 30s when a track is playing

        Fully played is True when the track has been played to the end.

        Position is the last known position of the track in seconds, to sync resume state.
        When fully_played is set to false and position is 0,
        the user marked the item as unplayed in the UI.

        is_playing is True when the track is currently playing.

        media_item is the full media item details of the played/playing track.

        Only the item id, position and media type are forwarded to the helper.
        """
        await self.helper.set_last_position(prov_item_id, position, media_type)

    async def unload(self, is_removed: bool = False) -> None:
        """
        Handle unload/close of the provider.

        Called when provider is deregistered (e.g. MA exiting or config reloading).
        is_removed will be set to True when the provider is removed from the configuration.
        """
        try:
            # The helper only exists after a successful login.
            helper = getattr(self, "helper", None)
            if is_removed and helper is not None:
                await helper.deregister()
        finally:
            # Drop the cached authenticator so a reload (e.g. after re-authentication)
            # loads the current auth file instead of reusing stale credentials.
            self._AUTH_CACHE.pop(self.instance_id, None)
589