/
/
/
1"""Helper for parsing and using audible api."""
2
3from __future__ import annotations
4
5import asyncio
6import hashlib
7import html
8import json
9import logging
10import os
11import re
12from collections.abc import AsyncGenerator
13from contextlib import suppress
14from datetime import UTC, datetime, timedelta
15from os import PathLike
16from typing import TYPE_CHECKING, Any
17from urllib.parse import parse_qs, urlparse
18
19import audible
20import audible.register
21from audible import AsyncClient
22
23if TYPE_CHECKING:
24 from aiohttp import ClientSession
25from music_assistant_models.enums import ContentType, ImageType, MediaType, StreamType
26from music_assistant_models.errors import LoginFailed, MediaNotFoundError
27from music_assistant_models.media_items import (
28 Audiobook,
29 AudioFormat,
30 ItemMapping,
31 MediaItemChapter,
32 MediaItemImage,
33 Podcast,
34 PodcastEpisode,
35 ProviderMapping,
36 UniqueList,
37)
38from music_assistant_models.streamdetails import StreamDetails
39
40from music_assistant.mass import MusicAssistant
41
# Cache namespace used by this provider.
CACHE_DOMAIN = "audible"
# Cache categories partition cached entries by the kind of data stored.
CACHE_CATEGORY_API = 0
CACHE_CATEGORY_AUDIOBOOK = 1
CACHE_CATEGORY_CHAPTERS = 2
CACHE_CATEGORY_PODCAST = 3
CACHE_CATEGORY_PODCAST_EPISODES = 4

# Content delivery types
AUDIOBOOK_CONTENT_TYPES = ("SinglePartBook", "MultiPartBook")
PODCAST_CONTENT_TYPES = ("PodcastParent",)

# Module-level cache of loaded Authenticator objects keyed by file path,
# so repeated loads of the same auth file reuse one instance.
_AUTH_CACHE: dict[str, audible.Authenticator] = {}
54
55
async def refresh_access_token_compat(
    refresh_token: str, domain: str, http_session: ClientSession, with_username: bool = False
) -> dict[str, Any]:
    """Refresh tokens, accepting both old and new Audible API response formats.

    The Audible API switched from returning 'access_token' to
    'actor_access_token'; this helper handles either shape.

    :param refresh_token: The refresh token obtained after device registration.
    :param domain: The top level domain (e.g., com, de).
    :param http_session: The HTTP client session to use for requests.
    :param with_username: If True, use audible domain instead of amazon.
    :return: Dict with access_token and expires timestamp.
    :raises LoginFailed: If the response contains no access token at all.
    """
    logger = logging.getLogger("audible_helper")

    host = "audible" if with_username else "amazon"
    endpoint = f"https://api.{host}.{domain}/auth/token"
    payload = {
        "app_name": "Audible",
        "app_version": "3.56.2",
        "source_token": refresh_token,
        "requested_token_type": "access_token",
        "source_token_type": "refresh_token",
    }

    async with http_session.post(endpoint, data=payload) as resp:
        resp.raise_for_status()
        resp_dict = await resp.json()

    lifetime_sec = int(resp_dict.get("expires_in", 3600))
    expires = (datetime.now(UTC) + timedelta(seconds=lifetime_sec)).timestamp()

    # Prefer the new-format key, then fall back to the legacy one.
    access_token = resp_dict.get("actor_access_token") or resp_dict.get("access_token")
    if not access_token:
        logger.error("Token refresh response missing both actor_access_token and access_token")
        raise LoginFailed("Token refresh failed: no access token in response")

    logger.debug(
        "Token refreshed successfully using %s format",
        "new (actor)" if "actor_access_token" in resp_dict else "legacy",
    )
    return {"access_token": access_token, "expires": expires}
103
104
async def cached_authenticator_from_file(path: str) -> audible.Authenticator:
    """Get an authenticator from file with caching and signing auth validation.

    :param path: Path to the authenticator JSON file.
    :return: The cached or loaded Authenticator instance.
    """
    logger = logging.getLogger("audible_helper")
    cached = _AUTH_CACHE.get(path)
    if cached is not None:
        return cached

    logger.debug("Loading authenticator from file %s and caching it", path)
    # File parsing is blocking, so run it off the event loop.
    auth = await asyncio.to_thread(audible.Authenticator.from_file, path)

    # RSA-signed requests need both the ADP token and the device private key;
    # without them only bearer auth remains available.
    if auth.adp_token and auth.device_private_key:
        logger.debug("Signing auth available - using stable RSA-signed requests")
    else:
        logger.warning(
            "Signing auth not available - only bearer auth will work. "
            "Consider re-authenticating for more stable auth."
        )

    _AUTH_CACHE[path] = auth
    return auth
129
130
131class AudibleHelper:
132 """Helper for parsing and using audible api."""
133
134 def __init__(
135 self,
136 mass: MusicAssistant,
137 client: AsyncClient,
138 provider_domain: str,
139 provider_instance: str,
140 logger: logging.Logger | None = None,
141 ):
142 """Initialize the Audible Helper."""
143 self.mass = mass
144 self.client = client
145 self.provider_domain = provider_domain
146 self.provider_instance = provider_instance
147 self.logger = logger or logging.getLogger("audible_helper")
148 self._acr_cache: dict[tuple[str, MediaType], str] = {}
149
150 async def _fetch_library_items(
151 self,
152 response_groups: str,
153 content_types: tuple[str, ...],
154 ) -> AsyncGenerator[dict[str, Any], None]:
155 """Fetch items from the library with pagination."""
156 page = 1
157 page_size = 50
158 total_processed = 0
159 max_iterations = 100
160 iteration = 0
161
162 while iteration < max_iterations:
163 iteration += 1
164 self.logger.debug(
165 "Audible: Fetching library page %s (processed so far: %s)",
166 page,
167 total_processed,
168 )
169
170 library = await self._call_api(
171 "library",
172 use_cache=False,
173 response_groups=response_groups,
174 page=page,
175 num_results=page_size,
176 )
177
178 items = library.get("items", [])
179
180 if not items:
181 break
182
183 items_processed_this_page = 0
184 for item in items:
185 # Filter by content type if specified
186 if content_types and item.get("content_delivery_type") not in content_types:
187 continue
188
189 yield item
190 items_processed_this_page += 1
191 total_processed += 1
192
193 self.logger.debug(
194 "Audible: Processed %s items on page %s", items_processed_this_page, page
195 )
196
197 page += 1
198 if len(items) < page_size:
199 break
200
201 if iteration >= max_iterations:
202 self.logger.warning(
203 "Audible: Reached maximum iteration limit (%s) with %s items processed",
204 max_iterations,
205 total_processed,
206 )
207
208 async def _process_audiobook_item(self, audiobook_data: dict[str, Any]) -> Audiobook | None:
209 """Process a single audiobook item from the library."""
210 # Ensure asin is a valid string
211 asin = str(audiobook_data.get("asin", ""))
212 cached_book = None
213 if asin:
214 cached_book = await self.mass.cache.get(
215 key=asin,
216 provider=self.provider_instance,
217 category=CACHE_CATEGORY_AUDIOBOOK,
218 default=None,
219 )
220
221 try:
222 if cached_book is not None:
223 return self._parse_audiobook(cached_book)
224 return self._parse_audiobook(audiobook_data)
225 except MediaNotFoundError as exc:
226 self.logger.warning(f"Skipping invalid audiobook: {exc}")
227 return None
228 except Exception as exc:
229 self.logger.warning(
230 f"Error processing audiobook {audiobook_data.get('asin', 'unknown')}: {exc}"
231 )
232 return None
233
234 async def get_library(self) -> AsyncGenerator[Audiobook, None]:
235 """Fetch the user's library with pagination."""
236 response_groups = [
237 "contributors",
238 "media",
239 "product_attrs",
240 "product_desc",
241 "product_details",
242 "product_extended_attrs",
243 ]
244
245 async for item in self._fetch_library_items(
246 ",".join(response_groups), AUDIOBOOK_CONTENT_TYPES
247 ):
248 if album := await self._process_audiobook_item(item):
249 yield album
250
251 async def get_audiobook(self, asin: str, use_cache: bool = True) -> Audiobook:
252 """Fetch the full audiobook by asin with all details including chapters.
253
254 This method fetches complete audiobook details including chapters and resume position.
255 Use this when the user requests full details for a specific audiobook.
256 """
257 if use_cache:
258 cached_book = await self.mass.cache.get(
259 key=asin,
260 provider=self.provider_instance,
261 category=CACHE_CATEGORY_AUDIOBOOK,
262 default=None,
263 )
264 if cached_book is not None:
265 book = self._parse_audiobook(cached_book)
266 # Enrich with chapters and resume position
267 await self._enrich_audiobook(book, asin)
268 return book
269 response = await self._call_api(
270 f"library/{asin}",
271 response_groups="""
272 contributors, media, price, product_attrs, product_desc, product_details,
273 product_extended_attrs,is_finished
274 """,
275 )
276
277 if response is None:
278 raise MediaNotFoundError(f"Audiobook with ASIN {asin} not found")
279
280 item_data = response.get("item")
281 if item_data is None:
282 raise MediaNotFoundError(f"Audiobook data for ASIN {asin} is empty")
283
284 await self.mass.cache.set(
285 key=asin,
286 provider=self.provider_instance,
287 category=CACHE_CATEGORY_AUDIOBOOK,
288 data=item_data,
289 )
290 book = self._parse_audiobook(item_data)
291 # Enrich with chapters and resume position
292 await self._enrich_audiobook(book, asin)
293 return book
294
295 async def _enrich_audiobook(self, book: Audiobook, asin: str) -> None:
296 """Enrich audiobook with chapters and resume position.
297
298 This makes additional API calls and should only be used for full audiobook details,
299 not during library sync.
300 """
301 # Fetch chapters
302 chapters_data = await self._fetch_chapters(asin=asin)
303 if chapters_data:
304 chapters: list[MediaItemChapter] = [
305 self._parse_chapter_data(chapter, idx) for idx, chapter in enumerate(chapters_data)
306 ]
307 book.metadata.chapters = chapters
308 # Update duration from chapters if available (more accurate)
309 try:
310 duration = sum(chapter.get("length_ms", 0) for chapter in chapters_data) / 1000
311 if duration > 0:
312 book.duration = duration
313 except Exception as exc:
314 self.logger.warning(f"Error calculating duration from chapters for {asin}: {exc}")
315
316 # Fetch resume position
317 book.resume_position_ms = await self.get_last_postion(asin=asin)
318
    async def get_stream(
        self, asin: str, media_type: MediaType = MediaType.AUDIOBOOK
    ) -> StreamDetails:
        """Get stream details for an audiobook or podcast episode.

        Issues a license request to the Audible API and converts the response
        into a StreamDetails object. Also caches the returned ACR token so
        playback-position reports can skip a second license request.

        :param asin: The ASIN of the content.
        :param media_type: The type of media (audiobook or podcast episode).
        :raises ValueError: If the ASIN is empty or the license response lacks
            a content license or stream URL (or any other failure occurs).
        """
        if not asin:
            self.logger.error("Invalid ASIN provided to get_stream")
            raise ValueError("Invalid ASIN provided to get_stream")

        duration = 0
        # For audiobooks, try to get duration from chapters
        if media_type == MediaType.AUDIOBOOK:
            chapters = await self._fetch_chapters(asin=asin)
            if chapters:
                try:
                    duration = sum(chapter.get("length_ms", 0) for chapter in chapters) / 1000
                except Exception as exc:
                    self.logger.warning(f"Error calculating duration for ASIN {asin}: {exc}")

        try:
            # Podcasts use Mpeg (non-DRM MP3), audiobooks use HLS
            if media_type == MediaType.PODCAST_EPISODE:
                playback_info = await self.client.post(
                    f"content/{asin}/licenserequest",
                    body={
                        "consumption_type": "Streaming",
                        "drm_type": "Mpeg",
                        "quality": "High",
                    },
                )
            else:
                playback_info = await self.client.post(
                    f"content/{asin}/licenserequest",
                    body={
                        "quality": "High",
                        "response_groups": "content_reference,certificate",
                        "consumption_type": "Streaming",
                        "supported_media_features": {
                            "codecs": ["mp4a.40.2", "mp4a.40.42"],
                            "drm_types": [
                                "Hls",
                            ],
                        },
                        "spatial": False,
                    },
                )

            # The content license carries the stream URL plus size metadata.
            content_license = playback_info.get("content_license", {})
            if not content_license:
                self.logger.error(f"No content_license in playback_info for ASIN {asin}")
                raise ValueError(f"Missing content_license for ASIN {asin}")

            content_metadata = content_license.get("content_metadata", {})
            content_reference = content_metadata.get("content_reference", {})
            size = content_reference.get("content_size_in_bytes", 0)

            # "license_response" holds the playable URL for streaming requests.
            stream_url = content_license.get("license_response")
            if not stream_url:
                self.logger.error(f"No license_response (stream URL) for ASIN {asin}")
                raise ValueError(f"Missing stream URL for ASIN {asin}")

            # Cache the ACR so set_last_position can report progress without
            # issuing another license request.
            acr = content_license.get("acr", "")
            if acr:
                self._acr_cache[(asin, media_type)] = acr

            content_type = (
                ContentType.MP3 if media_type == MediaType.PODCAST_EPISODE else ContentType.AAC
            )
        except Exception as exc:
            # NOTE(review): this also re-wraps the ValueErrors raised above,
            # nesting their messages inside the generic failure message.
            self.logger.error(f"Error getting stream details for ASIN {asin}: {exc}")
            raise ValueError(f"Failed to get stream details: {exc}") from exc

        return StreamDetails(
            provider=self.provider_instance,
            size=size,
            item_id=f"{asin}",
            audio_format=AudioFormat(content_type=content_type),
            media_type=media_type,
            stream_type=StreamType.HTTP,
            path=stream_url,
            can_seek=True,
            allow_seek=True,
            duration=duration,
            data={"acr": acr},
        )
407
408 async def _fetch_chapters(self, asin: str) -> list[dict[str, Any]]:
409 """Fetch chapter data for an audiobook."""
410 if not asin or asin == "error":
411 self.logger.warning(
412 "Invalid ASIN provided to _fetch_chapters, returning empty chapter list"
413 )
414 return []
415
416 chapters_data: list[Any] = await self.mass.cache.get(
417 key=asin, provider=self.provider_instance, category=CACHE_CATEGORY_CHAPTERS, default=[]
418 )
419
420 if not chapters_data:
421 try:
422 response = await self._call_api(
423 f"content/{asin}/metadata",
424 response_groups="chapter_info, always-returned, content_reference, content_url",
425 chapter_titles_type="Flat",
426 )
427
428 if not response:
429 self.logger.warning(f"Failed to get metadata for ASIN {asin}")
430 return []
431
432 content_metadata = response.get("content_metadata")
433 if not content_metadata:
434 self.logger.warning(f"No content_metadata for ASIN {asin}")
435 return []
436
437 chapter_info = content_metadata.get("chapter_info")
438 if not chapter_info:
439 self.logger.warning(f"No chapter_info for ASIN {asin}")
440 return []
441
442 chapters_data = chapter_info.get("chapters") or []
443
444 await self.mass.cache.set(
445 key=asin,
446 data=chapters_data,
447 provider=self.provider_instance,
448 category=CACHE_CATEGORY_CHAPTERS,
449 )
450 except Exception as exc:
451 self.logger.error(f"Error fetching chapters for ASIN {asin}: {exc}")
452 chapters_data = []
453
454 return chapters_data
455
456 async def get_last_postion(self, asin: str) -> int:
457 """Fetch last position of asin."""
458 if not asin or asin == "error":
459 return 0
460
461 try:
462 response = await self._call_api("annotations/lastpositions", asins=asin)
463
464 if not response:
465 self.logger.debug(f"No last position data available for ASIN {asin}")
466 return 0
467
468 annotations = response.get("asin_last_position_heard_annots")
469 if not annotations or not isinstance(annotations, list) or len(annotations) == 0:
470 self.logger.debug(f"No annotations found for ASIN {asin}")
471 return 0
472
473 annotation = annotations[0]
474 if not annotation or not isinstance(annotation, dict):
475 self.logger.debug(f"Invalid annotation for ASIN {asin}")
476 return 0
477
478 last_position = annotation.get("last_position_heard")
479 if not last_position or not isinstance(last_position, dict):
480 self.logger.debug(f"Invalid last_position for ASIN {asin}")
481 return 0
482
483 position_ms = last_position.get("position_ms", 0)
484 return int(position_ms)
485
486 except Exception as exc:
487 self.logger.error(f"Error getting last position for ASIN {asin}: {exc}")
488 return 0
489
490 async def set_last_position(
491 self, asin: str, pos: int, media_type: MediaType = MediaType.AUDIOBOOK
492 ) -> None:
493 """Report last position to Audible.
494
495 :param asin: The content ID (audiobook or podcast episode).
496 :param pos: Position in seconds.
497 :param media_type: The type of media (audiobook or podcast episode).
498 """
499 if not asin or asin == "error" or pos <= 0:
500 return
501
502 try:
503 position_ms = pos * 1000
504
505 # Try to get ACR from cache first
506 acr = self._acr_cache.get((asin, media_type))
507 if not acr:
508 stream_details = await self.get_stream(asin=asin, media_type=media_type)
509 acr = stream_details.data.get("acr")
510
511 if not acr:
512 self.logger.warning(f"No ACR available for ASIN {asin}, cannot report position")
513 return
514
515 await self.client.put(
516 f"lastpositions/{asin}", body={"acr": acr, "asin": asin, "position_ms": position_ms}
517 )
518
519 self.logger.debug(f"Successfully reported position {position_ms}ms for ASIN {asin}")
520
521 except (KeyError, TypeError) as exc:
522 self.logger.error(
523 f"Error accessing data while reporting position for ASIN {asin}: {exc}"
524 )
525 except TimeoutError as exc:
526 self.logger.error(f"Timeout while reporting position for ASIN {asin}: {exc}")
527 except ConnectionError as exc:
528 self.logger.error(f"Connection error while reporting position for ASIN {asin}: {exc}")
529 except Exception as exc:
530 self.logger.error(f"Unexpected error reporting position for ASIN {asin}: {exc}")
531
532 async def _call_api(self, path: str, **kwargs: Any) -> Any:
533 response = None
534 use_cache = kwargs.pop("use_cache", False)
535 params_str = json.dumps(kwargs, sort_keys=True)
536 params_hash = hashlib.md5(params_str.encode()).hexdigest()
537 cache_key_with_params = f"{path}:{params_hash}"
538 if use_cache:
539 response = await self.mass.cache.get(
540 key=cache_key_with_params,
541 provider=self.provider_instance,
542 category=CACHE_CATEGORY_API,
543 )
544 if not response:
545 response = await self.client.get(path, **kwargs)
546 await self.mass.cache.set(
547 key=cache_key_with_params, provider=self.provider_instance, data=response
548 )
549 return response
550
551 def _parse_contributors(
552 self, contributors_list: list[dict[str, Any]] | None, default_name: str
553 ) -> list[str]:
554 """Parse contributors (authors, narrators) from API response."""
555 result: list[str] = []
556 contributors: list[dict[str, Any]] = contributors_list or []
557 if isinstance(contributors, list):
558 for contributor in contributors:
559 if contributor and isinstance(contributor, dict):
560 result.append(contributor.get("name", default_name))
561 return result
562
563 def _create_images(self, image_path: str | None) -> list[MediaItemImage]:
564 """Create image objects if image path exists."""
565 images: list[MediaItemImage] = []
566 if image_path:
567 images.append(
568 MediaItemImage(
569 type=ImageType.THUMB,
570 path=image_path,
571 provider=self.provider_instance,
572 remotely_accessible=True,
573 )
574 )
575 images.append(
576 MediaItemImage(
577 type=ImageType.CLEARART,
578 path=image_path,
579 provider=self.provider_instance,
580 remotely_accessible=True,
581 )
582 )
583 return images
584
585 def _parse_chapter_data(self, chapter_data: dict[str, Any], index: int) -> MediaItemChapter:
586 """Parse chapter data into MediaItemChapter object."""
587 try:
588 start = int(chapter_data.get("start_offset_sec", 0))
589 except (TypeError, ValueError):
590 start = 0
591
592 try:
593 length = int(chapter_data.get("length_ms", 0)) / 1000
594 except (TypeError, ValueError):
595 length = 0
596
597 raw_title = chapter_data.get("title")
598 chapter_title: str
599 if raw_title is None:
600 chapter_title = f"Chapter {index + 1}"
601 elif isinstance(raw_title, str):
602 chapter_title = raw_title
603 else:
604 chapter_title = str(raw_title)
605
606 return MediaItemChapter(position=index, name=chapter_title, start=start, end=start + length)
607
608 def _parse_audiobook(self, audiobook_data: dict[str, Any] | None) -> Audiobook:
609 """Parse audiobook data from API response.
610
611 NOTE: This is a pure parser - no API calls allowed here.
612 Chapters and resume position are fetched lazily when needed.
613 """
614 if audiobook_data is None:
615 self.logger.error("Received None audiobook_data in _parse_audiobook")
616 raise MediaNotFoundError("Audiobook data not found")
617
618 asin = audiobook_data.get("asin", "")
619 title = audiobook_data.get("title", "")
620
621 # Parse authors and narrators
622 narrators = self._parse_contributors(audiobook_data.get("narrators"), "Unknown Narrator")
623 authors = self._parse_contributors(audiobook_data.get("authors"), "Unknown Author")
624
625 # Get duration from runtime_length_min (provided by 'media' response group)
626 # Chapters are fetched lazily when streaming, not during library sync
627 runtime_minutes = audiobook_data.get("runtime_length_min", 0)
628 duration = runtime_minutes * 60 if runtime_minutes else 0
629
630 # Create audiobook object
631 book = Audiobook(
632 item_id=asin,
633 provider=self.provider_instance,
634 name=title,
635 duration=duration,
636 provider_mappings={
637 ProviderMapping(
638 item_id=asin,
639 provider_domain=self.provider_domain,
640 provider_instance=self.provider_instance,
641 )
642 },
643 publisher=audiobook_data.get("publisher_name"),
644 authors=UniqueList(authors),
645 narrators=UniqueList(narrators),
646 )
647
648 # Set metadata
649 book.metadata.copyright = audiobook_data.get("copyright")
650 book.metadata.description = _html_to_txt(
651 str(audiobook_data.get("extended_product_description", ""))
652 )
653 book.metadata.languages = UniqueList([audiobook_data.get("language") or ""])
654 if release_date := audiobook_data.get("release_date"):
655 with suppress(ValueError):
656 datetime.strptime(release_date, "%Y-%m-%d").astimezone(UTC)
657
658 # Set review if available
659 reviews = audiobook_data.get("editorial_reviews", [])
660 if reviews and reviews[0]:
661 book.metadata.review = _html_to_txt(str(reviews[0]))
662
663 # Set genres
664 book.metadata.genres = {
665 genre.replace("_", " ") for genre in (audiobook_data.get("platinum_keywords") or [])
666 }
667
668 # Add images
669 image_path = audiobook_data.get("product_images", {}).get("500")
670 book.metadata.images = UniqueList(self._create_images(image_path))
671
672 # Chapters are not fetched during parsing - they are fetched lazily when streaming
673 # This avoids N+1 API calls during library sync
674
675 return book
676
677 async def _process_podcast_item(self, podcast_data: dict[str, Any]) -> Podcast | None:
678 """Process a single podcast item from the library."""
679 asin = str(podcast_data.get("asin", ""))
680 cached_podcast = None
681 if asin:
682 cached_podcast = await self.mass.cache.get(
683 key=asin,
684 provider=self.provider_instance,
685 category=CACHE_CATEGORY_PODCAST,
686 default=None,
687 )
688
689 try:
690 if cached_podcast is not None:
691 return self._parse_podcast(cached_podcast)
692 return self._parse_podcast(podcast_data)
693 except MediaNotFoundError as exc:
694 self.logger.warning(f"Skipping invalid podcast: {exc}")
695 return None
696 except Exception as exc:
697 self.logger.warning(
698 f"Error processing podcast {podcast_data.get('asin', 'unknown')}: {exc}"
699 )
700 return None
701
702 async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
703 """Fetch podcasts from the user's library with pagination."""
704 response_groups = [
705 "contributors",
706 "media",
707 "product_attrs",
708 "product_desc",
709 "product_details",
710 "product_extended_attrs",
711 ]
712
713 async for item in self._fetch_library_items(
714 ",".join(response_groups), PODCAST_CONTENT_TYPES
715 ):
716 if podcast := await self._process_podcast_item(item):
717 yield podcast
718
719 async def get_podcast(self, asin: str, use_cache: bool = True) -> Podcast:
720 """Fetch full podcast details by ASIN.
721
722 :param asin: The ASIN of the podcast.
723 :param use_cache: Whether to use cached data if available.
724 """
725 if use_cache:
726 cached_podcast = await self.mass.cache.get(
727 key=asin,
728 provider=self.provider_instance,
729 category=CACHE_CATEGORY_PODCAST,
730 default=None,
731 )
732 if cached_podcast is not None:
733 return self._parse_podcast(cached_podcast)
734
735 response = await self._call_api(
736 f"library/{asin}",
737 response_groups="""
738 contributors, media, price, product_attrs, product_desc, product_details,
739 product_extended_attrs, relationships
740 """,
741 )
742
743 if response is None:
744 raise MediaNotFoundError(f"Podcast with ASIN {asin} not found")
745
746 item_data = response.get("item")
747 if item_data is None:
748 raise MediaNotFoundError(f"Podcast data for ASIN {asin} is empty")
749
750 await self.mass.cache.set(
751 key=asin,
752 provider=self.provider_instance,
753 category=CACHE_CATEGORY_PODCAST,
754 data=item_data,
755 )
756 return self._parse_podcast(item_data)
757
758 async def get_podcast_episodes(self, podcast_asin: str) -> AsyncGenerator[PodcastEpisode, None]:
759 """Fetch all episodes for a podcast.
760
761 :param podcast_asin: The ASIN of the parent podcast.
762 """
763 podcast = await self.get_podcast(podcast_asin)
764
765 # Fetch episodes - they're typically in relationships or we need to query children
766 response_groups = [
767 "contributors",
768 "media",
769 "product_attrs",
770 "product_desc",
771 "product_details",
772 "relationships",
773 ]
774
775 page = 1
776 page_size = 50
777 position = 0
778
779 while True:
780 # Query for children of the podcast parent
781 response = await self._call_api(
782 "library",
783 use_cache=False,
784 response_groups=",".join(response_groups),
785 parent_asin=podcast_asin,
786 page=page,
787 num_results=page_size,
788 )
789
790 items = response.get("items", [])
791 if not items:
792 break
793
794 for episode_data in items:
795 try:
796 episode = self._parse_podcast_episode(episode_data, podcast, position)
797 position += 1
798 yield episode
799 except Exception as exc:
800 asin = episode_data.get("asin", "unknown")
801 self.logger.warning(f"Error parsing podcast episode {asin}: {exc}")
802
803 page += 1
804 if len(items) < page_size:
805 break
806
807 async def get_podcast_episode(self, episode_asin: str) -> PodcastEpisode:
808 """Fetch full podcast episode details by ASIN.
809
810 :param episode_asin: The ASIN of the podcast episode.
811 """
812 response = await self._call_api(
813 f"library/{episode_asin}",
814 response_groups="""
815 contributors, media, price, product_attrs, product_desc, product_details,
816 product_extended_attrs, relationships
817 """,
818 )
819
820 if response is None:
821 raise MediaNotFoundError(f"Podcast episode with ASIN {episode_asin} not found")
822
823 item_data = response.get("item")
824 if item_data is None:
825 raise MediaNotFoundError(f"Podcast episode data for ASIN {episode_asin} is empty")
826
827 # Try to get parent podcast info from relationships
828 podcast: Podcast | None = None
829 relationships = item_data.get("relationships", [])
830 for rel in relationships:
831 if rel.get("relationship_type") == "parent":
832 parent_asin = rel.get("asin")
833 if parent_asin:
834 with suppress(MediaNotFoundError):
835 podcast = await self.get_podcast(parent_asin)
836 break
837
838 return self._parse_podcast_episode(item_data, podcast, 0)
839
840 def _parse_podcast(self, podcast_data: dict[str, Any] | None) -> Podcast:
841 """Parse podcast data from API response.
842
843 :param podcast_data: Raw podcast data from the Audible API.
844 """
845 if podcast_data is None:
846 self.logger.error("Received None podcast_data in _parse_podcast")
847 raise MediaNotFoundError("Podcast data not found")
848
849 asin = podcast_data.get("asin", "")
850 title = podcast_data.get("title", "")
851 publisher = podcast_data.get("publisher_name", "")
852
853 # Create podcast object
854 podcast = Podcast(
855 item_id=asin,
856 provider=self.provider_instance,
857 name=title,
858 publisher=publisher,
859 provider_mappings={
860 ProviderMapping(
861 item_id=asin,
862 provider_domain=self.provider_domain,
863 provider_instance=self.provider_instance,
864 )
865 },
866 )
867
868 # Set metadata
869 podcast.metadata.description = _html_to_txt(
870 str(
871 podcast_data.get("publisher_summary", "")
872 or podcast_data.get("extended_product_description", "")
873 )
874 )
875 podcast.metadata.languages = UniqueList([podcast_data.get("language") or ""])
876
877 # Set genres
878 podcast.metadata.genres = {
879 genre.replace("_", " ") for genre in (podcast_data.get("platinum_keywords") or [])
880 }
881
882 # Add images
883 image_path = podcast_data.get("product_images", {}).get("500")
884 podcast.metadata.images = UniqueList(self._create_images(image_path))
885
886 return podcast
887
888 def _parse_podcast_episode(
889 self,
890 episode_data: dict[str, Any] | None,
891 podcast: Podcast | None,
892 position: int,
893 ) -> PodcastEpisode:
894 """Parse podcast episode data from API response.
895
896 :param episode_data: Raw episode data from the Audible API.
897 :param podcast: Parent podcast object (optional).
898 :param position: Position/index of the episode in the podcast.
899 """
900 if episode_data is None:
901 self.logger.error("Received None episode_data in _parse_podcast_episode")
902 raise MediaNotFoundError("Podcast episode data not found")
903
904 asin = episode_data.get("asin", "")
905 title = episode_data.get("title", "")
906
907 # Get duration from runtime_length_min
908 runtime_minutes = episode_data.get("runtime_length_min", 0)
909 duration = runtime_minutes * 60 if runtime_minutes else 0
910
911 # Create podcast reference - use Podcast object or create ItemMapping
912 podcast_ref: Podcast | ItemMapping
913 if podcast is not None:
914 podcast_ref = podcast
915 else:
916 # Try to get parent_asin from relationships for ItemMapping
917 parent_asin = ""
918 relationships = episode_data.get("relationships", [])
919 for rel in relationships:
920 if rel.get("relationship_type") == "parent":
921 parent_asin = rel.get("asin", "")
922 break
923
924 if not parent_asin:
925 self.logger.warning(
926 "No parent_asin found for podcast episode %s; parent podcast is unknown",
927 asin,
928 )
929
930 podcast_ref = ItemMapping(
931 item_id=parent_asin or "",
932 provider=self.provider_instance,
933 name="Unknown Podcast",
934 media_type=MediaType.PODCAST,
935 )
936
937 # Create episode object
938 episode = PodcastEpisode(
939 item_id=asin,
940 provider=self.provider_instance,
941 name=title,
942 duration=duration,
943 position=position,
944 podcast=podcast_ref,
945 provider_mappings={
946 ProviderMapping(
947 item_id=asin,
948 provider_domain=self.provider_domain,
949 provider_instance=self.provider_instance,
950 )
951 },
952 )
953
954 # Set metadata
955 episode.metadata.description = _html_to_txt(
956 str(
957 episode_data.get("publisher_summary", "")
958 or episode_data.get("extended_product_description", "")
959 )
960 )
961
962 # Add images
963 image_path = episode_data.get("product_images", {}).get("500")
964 episode.metadata.images = UniqueList(self._create_images(image_path))
965
966 return episode
967
968 async def get_authors(self) -> dict[str, str]:
969 """Get all unique authors from the library.
970
971 Returns dict mapping author ASIN to author name.
972 """
973 authors: dict[str, str] = {}
974 async for item in self._fetch_library_items(
975 "contributors,product_attrs", AUDIOBOOK_CONTENT_TYPES
976 ):
977 for author in item.get("authors") or []:
978 asin = author.get("asin")
979 name = author.get("name")
980 if asin and name:
981 authors[asin] = name
982 return authors
983
984 async def get_series(self) -> dict[str, str]:
985 """Get all unique series from the library.
986
987 Returns dict mapping series ASIN to series title.
988 """
989 series: dict[str, str] = {}
990 async for item in self._fetch_library_items(
991 "series,product_attrs", AUDIOBOOK_CONTENT_TYPES
992 ):
993 for s in item.get("series") or []:
994 asin = s.get("asin")
995 title = s.get("title")
996 if asin and title:
997 series[asin] = title
998 return series
999
1000 async def get_narrators(self) -> dict[str, str]:
1001 """Get all unique narrators from the library.
1002
1003 Returns dict mapping narrator ASIN to narrator name.
1004 """
1005 narrators: dict[str, str] = {}
1006 async for item in self._fetch_library_items(
1007 "contributors,product_attrs", AUDIOBOOK_CONTENT_TYPES
1008 ):
1009 for narrator in item.get("narrators") or []:
1010 asin = narrator.get("asin")
1011 name = narrator.get("name")
1012 if asin and name:
1013 narrators[asin] = name
1014 return narrators
1015
1016 async def get_genres(self) -> set[str]:
1017 """Get all unique genres from the library."""
1018 genres: set[str] = set()
1019 async for item in self._fetch_library_items("product_attrs", AUDIOBOOK_CONTENT_TYPES):
1020 for keyword in item.get("thesaurus_subject_keywords") or []:
1021 genres.add(keyword.replace("_", " ").replace("-", " ").title())
1022 return genres
1023
1024 async def get_publishers(self) -> set[str]:
1025 """Get all unique publishers from the library."""
1026 publishers: set[str] = set()
1027 async for item in self._fetch_library_items("product_attrs", AUDIOBOOK_CONTENT_TYPES):
1028 publisher = item.get("publisher_name")
1029 if publisher:
1030 publishers.add(publisher)
1031 return publishers
1032
1033 async def get_audiobooks_by_author(self, author_asin: str) -> list[Audiobook]:
1034 """Get all audiobooks by a specific author, sorted by release date."""
1035 audiobooks: list[tuple[str, Audiobook]] = []
1036 async for item in self._fetch_library_items(
1037 "contributors,media,product_attrs,product_desc,series", AUDIOBOOK_CONTENT_TYPES
1038 ):
1039 for author in item.get("authors") or []:
1040 if author.get("asin") == author_asin:
1041 release_date = item.get("release_date") or "0000-00-00"
1042 audiobooks.append((release_date, self._parse_audiobook(item)))
1043 break
1044 audiobooks.sort(key=lambda x: x[0], reverse=True)
1045 return [book for _, book in audiobooks]
1046
1047 async def get_audiobooks_by_narrator(self, narrator_asin: str) -> list[Audiobook]:
1048 """Get all audiobooks by a specific narrator, sorted by release date."""
1049 audiobooks: list[tuple[str, Audiobook]] = []
1050 async for item in self._fetch_library_items(
1051 "contributors,media,product_attrs,product_desc,series", AUDIOBOOK_CONTENT_TYPES
1052 ):
1053 for narrator in item.get("narrators") or []:
1054 if narrator.get("asin") == narrator_asin:
1055 release_date = item.get("release_date") or "0000-00-00"
1056 audiobooks.append((release_date, self._parse_audiobook(item)))
1057 break
1058 audiobooks.sort(key=lambda x: x[0], reverse=True)
1059 return [book for _, book in audiobooks]
1060
1061 async def get_audiobooks_by_genre(self, genre: str) -> list[Audiobook]:
1062 """Get all audiobooks matching a genre, sorted by release date."""
1063 audiobooks: list[tuple[str, Audiobook]] = []
1064 genre_key = genre.lower().replace(" ", "_")
1065 genre_key_alt = genre.lower().replace(" ", "-")
1066 async for item in self._fetch_library_items(
1067 "contributors,media,product_attrs,product_desc,series", AUDIOBOOK_CONTENT_TYPES
1068 ):
1069 keywords = item.get("thesaurus_subject_keywords") or []
1070 if genre_key in keywords or genre_key_alt in keywords:
1071 release_date = item.get("release_date") or "0000-00-00"
1072 audiobooks.append((release_date, self._parse_audiobook(item)))
1073 audiobooks.sort(key=lambda x: x[0], reverse=True)
1074 return [book for _, book in audiobooks]
1075
1076 async def get_audiobooks_by_publisher(self, publisher: str) -> list[Audiobook]:
1077 """Get all audiobooks from a specific publisher, sorted by release date."""
1078 audiobooks: list[tuple[str, Audiobook]] = []
1079 async for item in self._fetch_library_items(
1080 "contributors,media,product_attrs,product_desc,series", AUDIOBOOK_CONTENT_TYPES
1081 ):
1082 if item.get("publisher_name") == publisher:
1083 release_date = item.get("release_date") or "0000-00-00"
1084 audiobooks.append((release_date, self._parse_audiobook(item)))
1085 audiobooks.sort(key=lambda x: x[0], reverse=True)
1086 return [book for _, book in audiobooks]
1087
1088 async def get_audiobooks_by_series(self, series_asin: str) -> list[Audiobook]:
1089 """Get all audiobooks in a specific series, ordered by sequence."""
1090 audiobooks: list[tuple[float, Audiobook]] = []
1091 async for item in self._fetch_library_items(
1092 "contributors,media,product_attrs,product_desc,series", AUDIOBOOK_CONTENT_TYPES
1093 ):
1094 for s in item.get("series") or []:
1095 if s.get("asin") == series_asin:
1096 sequence = s.get("sequence")
1097 try:
1098 seq_num = float(sequence) if sequence else 999
1099 except (ValueError, TypeError):
1100 seq_num = 999
1101 audiobooks.append((seq_num, self._parse_audiobook(item)))
1102 break
1103 audiobooks.sort(key=lambda x: x[0])
1104 return [book for _, book in audiobooks]
1105
1106 async def deregister(self) -> None:
1107 """Deregister this provider from Audible."""
1108 await asyncio.to_thread(self.client.auth.deregister_device)
1109
1110
1111def _html_to_txt(html_text: str) -> str:
1112 txt = html.unescape(html_text)
1113 tags = re.findall("<[^>]+>", txt)
1114 for tag in tags:
1115 txt = txt.replace(tag, "")
1116 return txt
1117
1118
async def audible_get_auth_info(locale: str) -> tuple[str, str, str]:
    """Generate the login URL and auth info for Audible OAuth flow.

    :param locale: The locale string (e.g., 'us', 'uk', 'de').
    :return: Tuple of (code_verifier, oauth_url, serial).
    """
    loc = audible.localization.Locale(locale)
    # Both helpers are blocking, so push them onto worker threads.
    verifier = await asyncio.to_thread(audible.login.create_code_verifier)
    oauth_url, serial = await asyncio.to_thread(
        audible.login.build_oauth_url,
        country_code=loc.country_code,
        domain=loc.domain,
        market_place_id=loc.market_place_id,
        code_verifier=verifier,
        with_username=False,
    )
    return verifier.decode(), oauth_url, serial
1137
1138
async def audible_custom_login(
    code_verifier: str, response_url: str, serial: str, locale: str
) -> audible.Authenticator:
    """Complete the authentication using the code_verifier, response_url, and serial.

    :param code_verifier: The code verifier string used in OAuth flow.
    :param response_url: The response URL containing the authorization code.
    :param serial: The device serial number.
    :param locale: The locale string.
    :return: Audible Authenticator object.
    :raises LoginFailed: If authorization code is not found in the URL.
    """
    log = logging.getLogger("audible_helper")
    authenticator = audible.Authenticator()
    authenticator.locale = audible.localization.Locale(locale)

    query_params = parse_qs(urlparse(response_url).query)

    # The authorization code may arrive under different parameter names
    # depending on which Audible login flow produced the redirect, so try
    # the known candidates in order of preference.
    auth_code = None
    for candidate in ("openid.oa2.authorization_code", "authorization_code", "code"):
        values = query_params.get(candidate)
        if values:
            auth_code = values[0]
            log.debug("Found authorization code in parameter: %s", candidate)
            break

    if not auth_code:
        available_params = list(query_params.keys())
        raise LoginFailed(
            f"Authorization code not found in URL. "
            f"Expected 'openid.oa2.authorization_code' but found parameters: {available_params}"
        )

    # Device registration is a blocking HTTP call; run it off the event loop.
    registration_data = await asyncio.to_thread(
        audible.register.register,
        authorization_code=auth_code,
        code_verifier=code_verifier.encode(),
        domain=authenticator.locale.domain,
        serial=serial,
    )
    authenticator._update_attrs(**registration_data)

    # Signing auth (adp_token + device private key) is the stable long-term
    # auth method; log whether registration actually provided it.
    if authenticator.adp_token and authenticator.device_private_key:
        log.info("Registration successful with signing auth (stable)")
    else:
        log.warning("Registration successful but signing auth not available")

    return authenticator
1190
1191
async def check_file_exists(path: str | PathLike[str]) -> bool:
    """Return whether *path* exists, checked on a worker thread.

    Keeps the blocking filesystem call off the event loop.
    """

    def _exists() -> bool:
        return os.path.exists(path)

    return await asyncio.to_thread(_exists)
1195
1196
async def remove_file(path: str | PathLike[str]) -> None:
    """Delete *path* on a worker thread so the event loop is not blocked."""
    await asyncio.to_thread(lambda: os.remove(path))
1200