music-assistant-server

44.9 KBPY
audible_helper.py
44.9 KB1,200 lines • python
1"""Helper for parsing and using audible api."""
2
3from __future__ import annotations
4
5import asyncio
6import hashlib
7import html
8import json
9import logging
10import os
11import re
12from collections.abc import AsyncGenerator
13from contextlib import suppress
14from datetime import UTC, datetime, timedelta
15from os import PathLike
16from typing import TYPE_CHECKING, Any
17from urllib.parse import parse_qs, urlparse
18
19import audible
20import audible.register
21from audible import AsyncClient
22
23if TYPE_CHECKING:
24    from aiohttp import ClientSession
25from music_assistant_models.enums import ContentType, ImageType, MediaType, StreamType
26from music_assistant_models.errors import LoginFailed, MediaNotFoundError
27from music_assistant_models.media_items import (
28    Audiobook,
29    AudioFormat,
30    ItemMapping,
31    MediaItemChapter,
32    MediaItemImage,
33    Podcast,
34    PodcastEpisode,
35    ProviderMapping,
36    UniqueList,
37)
38from music_assistant_models.streamdetails import StreamDetails
39
40from music_assistant.mass import MusicAssistant
41
42CACHE_DOMAIN = "audible"
43CACHE_CATEGORY_API = 0
44CACHE_CATEGORY_AUDIOBOOK = 1
45CACHE_CATEGORY_CHAPTERS = 2
46CACHE_CATEGORY_PODCAST = 3
47CACHE_CATEGORY_PODCAST_EPISODES = 4
48
49# Content delivery types
50AUDIOBOOK_CONTENT_TYPES = ("SinglePartBook", "MultiPartBook")
51PODCAST_CONTENT_TYPES = ("PodcastParent",)
52
53_AUTH_CACHE: dict[str, audible.Authenticator] = {}
54
55
56async def refresh_access_token_compat(
57    refresh_token: str, domain: str, http_session: ClientSession, with_username: bool = False
58) -> dict[str, Any]:
59    """Refresh tokens with compatibility for new Audible API format.
60
61    The Audible API changed from returning 'access_token' to 'actor_access_token'.
62    This function handles both formats for backward compatibility.
63
64    :param refresh_token: The refresh token obtained after device registration.
65    :param domain: The top level domain (e.g., com, de).
66    :param http_session: The HTTP client session to use for requests.
67    :param with_username: If True, use audible domain instead of amazon.
68    :return: Dict with access_token and expires timestamp.
69    """
70    logger = logging.getLogger("audible_helper")
71
72    body = {
73        "app_name": "Audible",
74        "app_version": "3.56.2",
75        "source_token": refresh_token,
76        "requested_token_type": "access_token",
77        "source_token_type": "refresh_token",
78    }
79
80    target_domain = "audible" if with_username else "amazon"
81    url = f"https://api.{target_domain}.{domain}/auth/token"
82
83    async with http_session.post(url, data=body) as resp:
84        resp.raise_for_status()
85        resp_dict = await resp.json()
86
87    expires_in_sec = int(resp_dict.get("expires_in", 3600))
88    expires = (datetime.now(UTC) + timedelta(seconds=expires_in_sec)).timestamp()
89
90    # Handle new format (actor_access_token) or fall back to legacy (access_token)
91    access_token = resp_dict.get("actor_access_token") or resp_dict.get("access_token")
92
93    if not access_token:
94        logger.error("Token refresh response missing both actor_access_token and access_token")
95        raise LoginFailed("Token refresh failed: no access token in response")
96
97    logger.debug(
98        "Token refreshed successfully using %s format",
99        "new (actor)" if "actor_access_token" in resp_dict else "legacy",
100    )
101
102    return {"access_token": access_token, "expires": expires}
103
104
105async def cached_authenticator_from_file(path: str) -> audible.Authenticator:
106    """Get an authenticator from file with caching and signing auth validation.
107
108    :param path: Path to the authenticator JSON file.
109    :return: The cached or loaded Authenticator instance.
110    """
111    logger = logging.getLogger("audible_helper")
112    if path in _AUTH_CACHE:
113        return _AUTH_CACHE[path]
114
115    logger.debug("Loading authenticator from file %s and caching it", path)
116    auth = await asyncio.to_thread(audible.Authenticator.from_file, path)
117
118    # Verify signing auth is available (not affected by API changes)
119    if auth.adp_token and auth.device_private_key:
120        logger.debug("Signing auth available - using stable RSA-signed requests")
121    else:
122        logger.warning(
123            "Signing auth not available - only bearer auth will work. "
124            "Consider re-authenticating for more stable auth."
125        )
126
127    _AUTH_CACHE[path] = auth
128    return auth
129
130
131class AudibleHelper:
132    """Helper for parsing and using audible api."""
133
134    def __init__(
135        self,
136        mass: MusicAssistant,
137        client: AsyncClient,
138        provider_domain: str,
139        provider_instance: str,
140        logger: logging.Logger | None = None,
141    ):
142        """Initialize the Audible Helper."""
143        self.mass = mass
144        self.client = client
145        self.provider_domain = provider_domain
146        self.provider_instance = provider_instance
147        self.logger = logger or logging.getLogger("audible_helper")
148        self._acr_cache: dict[tuple[str, MediaType], str] = {}
149
150    async def _fetch_library_items(
151        self,
152        response_groups: str,
153        content_types: tuple[str, ...],
154    ) -> AsyncGenerator[dict[str, Any], None]:
155        """Fetch items from the library with pagination."""
156        page = 1
157        page_size = 50
158        total_processed = 0
159        max_iterations = 100
160        iteration = 0
161
162        while iteration < max_iterations:
163            iteration += 1
164            self.logger.debug(
165                "Audible: Fetching library page %s (processed so far: %s)",
166                page,
167                total_processed,
168            )
169
170            library = await self._call_api(
171                "library",
172                use_cache=False,
173                response_groups=response_groups,
174                page=page,
175                num_results=page_size,
176            )
177
178            items = library.get("items", [])
179
180            if not items:
181                break
182
183            items_processed_this_page = 0
184            for item in items:
185                # Filter by content type if specified
186                if content_types and item.get("content_delivery_type") not in content_types:
187                    continue
188
189                yield item
190                items_processed_this_page += 1
191                total_processed += 1
192
193            self.logger.debug(
194                "Audible: Processed %s items on page %s", items_processed_this_page, page
195            )
196
197            page += 1
198            if len(items) < page_size:
199                break
200
201        if iteration >= max_iterations:
202            self.logger.warning(
203                "Audible: Reached maximum iteration limit (%s) with %s items processed",
204                max_iterations,
205                total_processed,
206            )
207
208    async def _process_audiobook_item(self, audiobook_data: dict[str, Any]) -> Audiobook | None:
209        """Process a single audiobook item from the library."""
210        # Ensure asin is a valid string
211        asin = str(audiobook_data.get("asin", ""))
212        cached_book = None
213        if asin:
214            cached_book = await self.mass.cache.get(
215                key=asin,
216                provider=self.provider_instance,
217                category=CACHE_CATEGORY_AUDIOBOOK,
218                default=None,
219            )
220
221        try:
222            if cached_book is not None:
223                return self._parse_audiobook(cached_book)
224            return self._parse_audiobook(audiobook_data)
225        except MediaNotFoundError as exc:
226            self.logger.warning(f"Skipping invalid audiobook: {exc}")
227            return None
228        except Exception as exc:
229            self.logger.warning(
230                f"Error processing audiobook {audiobook_data.get('asin', 'unknown')}: {exc}"
231            )
232            return None
233
234    async def get_library(self) -> AsyncGenerator[Audiobook, None]:
235        """Fetch the user's library with pagination."""
236        response_groups = [
237            "contributors",
238            "media",
239            "product_attrs",
240            "product_desc",
241            "product_details",
242            "product_extended_attrs",
243        ]
244
245        async for item in self._fetch_library_items(
246            ",".join(response_groups), AUDIOBOOK_CONTENT_TYPES
247        ):
248            if album := await self._process_audiobook_item(item):
249                yield album
250
251    async def get_audiobook(self, asin: str, use_cache: bool = True) -> Audiobook:
252        """Fetch the full audiobook by asin with all details including chapters.
253
254        This method fetches complete audiobook details including chapters and resume position.
255        Use this when the user requests full details for a specific audiobook.
256        """
257        if use_cache:
258            cached_book = await self.mass.cache.get(
259                key=asin,
260                provider=self.provider_instance,
261                category=CACHE_CATEGORY_AUDIOBOOK,
262                default=None,
263            )
264            if cached_book is not None:
265                book = self._parse_audiobook(cached_book)
266                # Enrich with chapters and resume position
267                await self._enrich_audiobook(book, asin)
268                return book
269        response = await self._call_api(
270            f"library/{asin}",
271            response_groups="""
272                contributors, media, price, product_attrs, product_desc, product_details,
273                product_extended_attrs,is_finished
274                """,
275        )
276
277        if response is None:
278            raise MediaNotFoundError(f"Audiobook with ASIN {asin} not found")
279
280        item_data = response.get("item")
281        if item_data is None:
282            raise MediaNotFoundError(f"Audiobook data for ASIN {asin} is empty")
283
284        await self.mass.cache.set(
285            key=asin,
286            provider=self.provider_instance,
287            category=CACHE_CATEGORY_AUDIOBOOK,
288            data=item_data,
289        )
290        book = self._parse_audiobook(item_data)
291        # Enrich with chapters and resume position
292        await self._enrich_audiobook(book, asin)
293        return book
294
295    async def _enrich_audiobook(self, book: Audiobook, asin: str) -> None:
296        """Enrich audiobook with chapters and resume position.
297
298        This makes additional API calls and should only be used for full audiobook details,
299        not during library sync.
300        """
301        # Fetch chapters
302        chapters_data = await self._fetch_chapters(asin=asin)
303        if chapters_data:
304            chapters: list[MediaItemChapter] = [
305                self._parse_chapter_data(chapter, idx) for idx, chapter in enumerate(chapters_data)
306            ]
307            book.metadata.chapters = chapters
308            # Update duration from chapters if available (more accurate)
309            try:
310                duration = sum(chapter.get("length_ms", 0) for chapter in chapters_data) / 1000
311                if duration > 0:
312                    book.duration = duration
313            except Exception as exc:
314                self.logger.warning(f"Error calculating duration from chapters for {asin}: {exc}")
315
316        # Fetch resume position
317        book.resume_position_ms = await self.get_last_postion(asin=asin)
318
319    async def get_stream(
320        self, asin: str, media_type: MediaType = MediaType.AUDIOBOOK
321    ) -> StreamDetails:
322        """Get stream details for an audiobook or podcast episode.
323
324        :param asin: The ASIN of the content.
325        :param media_type: The type of media (audiobook or podcast episode).
326        """
327        if not asin:
328            self.logger.error("Invalid ASIN provided to get_stream")
329            raise ValueError("Invalid ASIN provided to get_stream")
330
331        duration = 0
332        # For audiobooks, try to get duration from chapters
333        if media_type == MediaType.AUDIOBOOK:
334            chapters = await self._fetch_chapters(asin=asin)
335            if chapters:
336                try:
337                    duration = sum(chapter.get("length_ms", 0) for chapter in chapters) / 1000
338                except Exception as exc:
339                    self.logger.warning(f"Error calculating duration for ASIN {asin}: {exc}")
340
341        try:
342            # Podcasts use Mpeg (non-DRM MP3), audiobooks use HLS
343            if media_type == MediaType.PODCAST_EPISODE:
344                playback_info = await self.client.post(
345                    f"content/{asin}/licenserequest",
346                    body={
347                        "consumption_type": "Streaming",
348                        "drm_type": "Mpeg",
349                        "quality": "High",
350                    },
351                )
352            else:
353                playback_info = await self.client.post(
354                    f"content/{asin}/licenserequest",
355                    body={
356                        "quality": "High",
357                        "response_groups": "content_reference,certificate",
358                        "consumption_type": "Streaming",
359                        "supported_media_features": {
360                            "codecs": ["mp4a.40.2", "mp4a.40.42"],
361                            "drm_types": [
362                                "Hls",
363                            ],
364                        },
365                        "spatial": False,
366                    },
367                )
368
369            content_license = playback_info.get("content_license", {})
370            if not content_license:
371                self.logger.error(f"No content_license in playback_info for ASIN {asin}")
372                raise ValueError(f"Missing content_license for ASIN {asin}")
373
374            content_metadata = content_license.get("content_metadata", {})
375            content_reference = content_metadata.get("content_reference", {})
376            size = content_reference.get("content_size_in_bytes", 0)
377
378            stream_url = content_license.get("license_response")
379            if not stream_url:
380                self.logger.error(f"No license_response (stream URL) for ASIN {asin}")
381                raise ValueError(f"Missing stream URL for ASIN {asin}")
382
383            acr = content_license.get("acr", "")
384            if acr:
385                self._acr_cache[(asin, media_type)] = acr
386
387            content_type = (
388                ContentType.MP3 if media_type == MediaType.PODCAST_EPISODE else ContentType.AAC
389            )
390        except Exception as exc:
391            self.logger.error(f"Error getting stream details for ASIN {asin}: {exc}")
392            raise ValueError(f"Failed to get stream details: {exc}") from exc
393
394        return StreamDetails(
395            provider=self.provider_instance,
396            size=size,
397            item_id=f"{asin}",
398            audio_format=AudioFormat(content_type=content_type),
399            media_type=media_type,
400            stream_type=StreamType.HTTP,
401            path=stream_url,
402            can_seek=True,
403            allow_seek=True,
404            duration=duration,
405            data={"acr": acr},
406        )
407
408    async def _fetch_chapters(self, asin: str) -> list[dict[str, Any]]:
409        """Fetch chapter data for an audiobook."""
410        if not asin or asin == "error":
411            self.logger.warning(
412                "Invalid ASIN provided to _fetch_chapters, returning empty chapter list"
413            )
414            return []
415
416        chapters_data: list[Any] = await self.mass.cache.get(
417            key=asin, provider=self.provider_instance, category=CACHE_CATEGORY_CHAPTERS, default=[]
418        )
419
420        if not chapters_data:
421            try:
422                response = await self._call_api(
423                    f"content/{asin}/metadata",
424                    response_groups="chapter_info, always-returned, content_reference, content_url",
425                    chapter_titles_type="Flat",
426                )
427
428                if not response:
429                    self.logger.warning(f"Failed to get metadata for ASIN {asin}")
430                    return []
431
432                content_metadata = response.get("content_metadata")
433                if not content_metadata:
434                    self.logger.warning(f"No content_metadata for ASIN {asin}")
435                    return []
436
437                chapter_info = content_metadata.get("chapter_info")
438                if not chapter_info:
439                    self.logger.warning(f"No chapter_info for ASIN {asin}")
440                    return []
441
442                chapters_data = chapter_info.get("chapters") or []
443
444                await self.mass.cache.set(
445                    key=asin,
446                    data=chapters_data,
447                    provider=self.provider_instance,
448                    category=CACHE_CATEGORY_CHAPTERS,
449                )
450            except Exception as exc:
451                self.logger.error(f"Error fetching chapters for ASIN {asin}: {exc}")
452                chapters_data = []
453
454        return chapters_data
455
456    async def get_last_postion(self, asin: str) -> int:
457        """Fetch last position of asin."""
458        if not asin or asin == "error":
459            return 0
460
461        try:
462            response = await self._call_api("annotations/lastpositions", asins=asin)
463
464            if not response:
465                self.logger.debug(f"No last position data available for ASIN {asin}")
466                return 0
467
468            annotations = response.get("asin_last_position_heard_annots")
469            if not annotations or not isinstance(annotations, list) or len(annotations) == 0:
470                self.logger.debug(f"No annotations found for ASIN {asin}")
471                return 0
472
473            annotation = annotations[0]
474            if not annotation or not isinstance(annotation, dict):
475                self.logger.debug(f"Invalid annotation for ASIN {asin}")
476                return 0
477
478            last_position = annotation.get("last_position_heard")
479            if not last_position or not isinstance(last_position, dict):
480                self.logger.debug(f"Invalid last_position for ASIN {asin}")
481                return 0
482
483            position_ms = last_position.get("position_ms", 0)
484            return int(position_ms)
485
486        except Exception as exc:
487            self.logger.error(f"Error getting last position for ASIN {asin}: {exc}")
488            return 0
489
490    async def set_last_position(
491        self, asin: str, pos: int, media_type: MediaType = MediaType.AUDIOBOOK
492    ) -> None:
493        """Report last position to Audible.
494
495        :param asin: The content ID (audiobook or podcast episode).
496        :param pos: Position in seconds.
497        :param media_type: The type of media (audiobook or podcast episode).
498        """
499        if not asin or asin == "error" or pos <= 0:
500            return
501
502        try:
503            position_ms = pos * 1000
504
505            # Try to get ACR from cache first
506            acr = self._acr_cache.get((asin, media_type))
507            if not acr:
508                stream_details = await self.get_stream(asin=asin, media_type=media_type)
509                acr = stream_details.data.get("acr")
510
511            if not acr:
512                self.logger.warning(f"No ACR available for ASIN {asin}, cannot report position")
513                return
514
515            await self.client.put(
516                f"lastpositions/{asin}", body={"acr": acr, "asin": asin, "position_ms": position_ms}
517            )
518
519            self.logger.debug(f"Successfully reported position {position_ms}ms for ASIN {asin}")
520
521        except (KeyError, TypeError) as exc:
522            self.logger.error(
523                f"Error accessing data while reporting position for ASIN {asin}: {exc}"
524            )
525        except TimeoutError as exc:
526            self.logger.error(f"Timeout while reporting position for ASIN {asin}: {exc}")
527        except ConnectionError as exc:
528            self.logger.error(f"Connection error while reporting position for ASIN {asin}: {exc}")
529        except Exception as exc:
530            self.logger.error(f"Unexpected error reporting position for ASIN {asin}: {exc}")
531
532    async def _call_api(self, path: str, **kwargs: Any) -> Any:
533        response = None
534        use_cache = kwargs.pop("use_cache", False)
535        params_str = json.dumps(kwargs, sort_keys=True)
536        params_hash = hashlib.md5(params_str.encode()).hexdigest()
537        cache_key_with_params = f"{path}:{params_hash}"
538        if use_cache:
539            response = await self.mass.cache.get(
540                key=cache_key_with_params,
541                provider=self.provider_instance,
542                category=CACHE_CATEGORY_API,
543            )
544        if not response:
545            response = await self.client.get(path, **kwargs)
546            await self.mass.cache.set(
547                key=cache_key_with_params, provider=self.provider_instance, data=response
548            )
549        return response
550
551    def _parse_contributors(
552        self, contributors_list: list[dict[str, Any]] | None, default_name: str
553    ) -> list[str]:
554        """Parse contributors (authors, narrators) from API response."""
555        result: list[str] = []
556        contributors: list[dict[str, Any]] = contributors_list or []
557        if isinstance(contributors, list):
558            for contributor in contributors:
559                if contributor and isinstance(contributor, dict):
560                    result.append(contributor.get("name", default_name))
561        return result
562
563    def _create_images(self, image_path: str | None) -> list[MediaItemImage]:
564        """Create image objects if image path exists."""
565        images: list[MediaItemImage] = []
566        if image_path:
567            images.append(
568                MediaItemImage(
569                    type=ImageType.THUMB,
570                    path=image_path,
571                    provider=self.provider_instance,
572                    remotely_accessible=True,
573                )
574            )
575            images.append(
576                MediaItemImage(
577                    type=ImageType.CLEARART,
578                    path=image_path,
579                    provider=self.provider_instance,
580                    remotely_accessible=True,
581                )
582            )
583        return images
584
585    def _parse_chapter_data(self, chapter_data: dict[str, Any], index: int) -> MediaItemChapter:
586        """Parse chapter data into MediaItemChapter object."""
587        try:
588            start = int(chapter_data.get("start_offset_sec", 0))
589        except (TypeError, ValueError):
590            start = 0
591
592        try:
593            length = int(chapter_data.get("length_ms", 0)) / 1000
594        except (TypeError, ValueError):
595            length = 0
596
597        raw_title = chapter_data.get("title")
598        chapter_title: str
599        if raw_title is None:
600            chapter_title = f"Chapter {index + 1}"
601        elif isinstance(raw_title, str):
602            chapter_title = raw_title
603        else:
604            chapter_title = str(raw_title)
605
606        return MediaItemChapter(position=index, name=chapter_title, start=start, end=start + length)
607
608    def _parse_audiobook(self, audiobook_data: dict[str, Any] | None) -> Audiobook:
609        """Parse audiobook data from API response.
610
611        NOTE: This is a pure parser - no API calls allowed here.
612        Chapters and resume position are fetched lazily when needed.
613        """
614        if audiobook_data is None:
615            self.logger.error("Received None audiobook_data in _parse_audiobook")
616            raise MediaNotFoundError("Audiobook data not found")
617
618        asin = audiobook_data.get("asin", "")
619        title = audiobook_data.get("title", "")
620
621        # Parse authors and narrators
622        narrators = self._parse_contributors(audiobook_data.get("narrators"), "Unknown Narrator")
623        authors = self._parse_contributors(audiobook_data.get("authors"), "Unknown Author")
624
625        # Get duration from runtime_length_min (provided by 'media' response group)
626        # Chapters are fetched lazily when streaming, not during library sync
627        runtime_minutes = audiobook_data.get("runtime_length_min", 0)
628        duration = runtime_minutes * 60 if runtime_minutes else 0
629
630        # Create audiobook object
631        book = Audiobook(
632            item_id=asin,
633            provider=self.provider_instance,
634            name=title,
635            duration=duration,
636            provider_mappings={
637                ProviderMapping(
638                    item_id=asin,
639                    provider_domain=self.provider_domain,
640                    provider_instance=self.provider_instance,
641                )
642            },
643            publisher=audiobook_data.get("publisher_name"),
644            authors=UniqueList(authors),
645            narrators=UniqueList(narrators),
646        )
647
648        # Set metadata
649        book.metadata.copyright = audiobook_data.get("copyright")
650        book.metadata.description = _html_to_txt(
651            str(audiobook_data.get("extended_product_description", ""))
652        )
653        book.metadata.languages = UniqueList([audiobook_data.get("language") or ""])
654        if release_date := audiobook_data.get("release_date"):
655            with suppress(ValueError):
656                datetime.strptime(release_date, "%Y-%m-%d").astimezone(UTC)
657
658        # Set review if available
659        reviews = audiobook_data.get("editorial_reviews", [])
660        if reviews and reviews[0]:
661            book.metadata.review = _html_to_txt(str(reviews[0]))
662
663        # Set genres
664        book.metadata.genres = {
665            genre.replace("_", " ") for genre in (audiobook_data.get("platinum_keywords") or [])
666        }
667
668        # Add images
669        image_path = audiobook_data.get("product_images", {}).get("500")
670        book.metadata.images = UniqueList(self._create_images(image_path))
671
672        # Chapters are not fetched during parsing - they are fetched lazily when streaming
673        # This avoids N+1 API calls during library sync
674
675        return book
676
677    async def _process_podcast_item(self, podcast_data: dict[str, Any]) -> Podcast | None:
678        """Process a single podcast item from the library."""
679        asin = str(podcast_data.get("asin", ""))
680        cached_podcast = None
681        if asin:
682            cached_podcast = await self.mass.cache.get(
683                key=asin,
684                provider=self.provider_instance,
685                category=CACHE_CATEGORY_PODCAST,
686                default=None,
687            )
688
689        try:
690            if cached_podcast is not None:
691                return self._parse_podcast(cached_podcast)
692            return self._parse_podcast(podcast_data)
693        except MediaNotFoundError as exc:
694            self.logger.warning(f"Skipping invalid podcast: {exc}")
695            return None
696        except Exception as exc:
697            self.logger.warning(
698                f"Error processing podcast {podcast_data.get('asin', 'unknown')}: {exc}"
699            )
700            return None
701
702    async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
703        """Fetch podcasts from the user's library with pagination."""
704        response_groups = [
705            "contributors",
706            "media",
707            "product_attrs",
708            "product_desc",
709            "product_details",
710            "product_extended_attrs",
711        ]
712
713        async for item in self._fetch_library_items(
714            ",".join(response_groups), PODCAST_CONTENT_TYPES
715        ):
716            if podcast := await self._process_podcast_item(item):
717                yield podcast
718
719    async def get_podcast(self, asin: str, use_cache: bool = True) -> Podcast:
720        """Fetch full podcast details by ASIN.
721
722        :param asin: The ASIN of the podcast.
723        :param use_cache: Whether to use cached data if available.
724        """
725        if use_cache:
726            cached_podcast = await self.mass.cache.get(
727                key=asin,
728                provider=self.provider_instance,
729                category=CACHE_CATEGORY_PODCAST,
730                default=None,
731            )
732            if cached_podcast is not None:
733                return self._parse_podcast(cached_podcast)
734
735        response = await self._call_api(
736            f"library/{asin}",
737            response_groups="""
738                contributors, media, price, product_attrs, product_desc, product_details,
739                product_extended_attrs, relationships
740                """,
741        )
742
743        if response is None:
744            raise MediaNotFoundError(f"Podcast with ASIN {asin} not found")
745
746        item_data = response.get("item")
747        if item_data is None:
748            raise MediaNotFoundError(f"Podcast data for ASIN {asin} is empty")
749
750        await self.mass.cache.set(
751            key=asin,
752            provider=self.provider_instance,
753            category=CACHE_CATEGORY_PODCAST,
754            data=item_data,
755        )
756        return self._parse_podcast(item_data)
757
758    async def get_podcast_episodes(self, podcast_asin: str) -> AsyncGenerator[PodcastEpisode, None]:
759        """Fetch all episodes for a podcast.
760
761        :param podcast_asin: The ASIN of the parent podcast.
762        """
763        podcast = await self.get_podcast(podcast_asin)
764
765        # Fetch episodes - they're typically in relationships or we need to query children
766        response_groups = [
767            "contributors",
768            "media",
769            "product_attrs",
770            "product_desc",
771            "product_details",
772            "relationships",
773        ]
774
775        page = 1
776        page_size = 50
777        position = 0
778
779        while True:
780            # Query for children of the podcast parent
781            response = await self._call_api(
782                "library",
783                use_cache=False,
784                response_groups=",".join(response_groups),
785                parent_asin=podcast_asin,
786                page=page,
787                num_results=page_size,
788            )
789
790            items = response.get("items", [])
791            if not items:
792                break
793
794            for episode_data in items:
795                try:
796                    episode = self._parse_podcast_episode(episode_data, podcast, position)
797                    position += 1
798                    yield episode
799                except Exception as exc:
800                    asin = episode_data.get("asin", "unknown")
801                    self.logger.warning(f"Error parsing podcast episode {asin}: {exc}")
802
803            page += 1
804            if len(items) < page_size:
805                break
806
807    async def get_podcast_episode(self, episode_asin: str) -> PodcastEpisode:
808        """Fetch full podcast episode details by ASIN.
809
810        :param episode_asin: The ASIN of the podcast episode.
811        """
812        response = await self._call_api(
813            f"library/{episode_asin}",
814            response_groups="""
815                contributors, media, price, product_attrs, product_desc, product_details,
816                product_extended_attrs, relationships
817                """,
818        )
819
820        if response is None:
821            raise MediaNotFoundError(f"Podcast episode with ASIN {episode_asin} not found")
822
823        item_data = response.get("item")
824        if item_data is None:
825            raise MediaNotFoundError(f"Podcast episode data for ASIN {episode_asin} is empty")
826
827        # Try to get parent podcast info from relationships
828        podcast: Podcast | None = None
829        relationships = item_data.get("relationships", [])
830        for rel in relationships:
831            if rel.get("relationship_type") == "parent":
832                parent_asin = rel.get("asin")
833                if parent_asin:
834                    with suppress(MediaNotFoundError):
835                        podcast = await self.get_podcast(parent_asin)
836                break
837
838        return self._parse_podcast_episode(item_data, podcast, 0)
839
840    def _parse_podcast(self, podcast_data: dict[str, Any] | None) -> Podcast:
841        """Parse podcast data from API response.
842
843        :param podcast_data: Raw podcast data from the Audible API.
844        """
845        if podcast_data is None:
846            self.logger.error("Received None podcast_data in _parse_podcast")
847            raise MediaNotFoundError("Podcast data not found")
848
849        asin = podcast_data.get("asin", "")
850        title = podcast_data.get("title", "")
851        publisher = podcast_data.get("publisher_name", "")
852
853        # Create podcast object
854        podcast = Podcast(
855            item_id=asin,
856            provider=self.provider_instance,
857            name=title,
858            publisher=publisher,
859            provider_mappings={
860                ProviderMapping(
861                    item_id=asin,
862                    provider_domain=self.provider_domain,
863                    provider_instance=self.provider_instance,
864                )
865            },
866        )
867
868        # Set metadata
869        podcast.metadata.description = _html_to_txt(
870            str(
871                podcast_data.get("publisher_summary", "")
872                or podcast_data.get("extended_product_description", "")
873            )
874        )
875        podcast.metadata.languages = UniqueList([podcast_data.get("language") or ""])
876
877        # Set genres
878        podcast.metadata.genres = {
879            genre.replace("_", " ") for genre in (podcast_data.get("platinum_keywords") or [])
880        }
881
882        # Add images
883        image_path = podcast_data.get("product_images", {}).get("500")
884        podcast.metadata.images = UniqueList(self._create_images(image_path))
885
886        return podcast
887
888    def _parse_podcast_episode(
889        self,
890        episode_data: dict[str, Any] | None,
891        podcast: Podcast | None,
892        position: int,
893    ) -> PodcastEpisode:
894        """Parse podcast episode data from API response.
895
896        :param episode_data: Raw episode data from the Audible API.
897        :param podcast: Parent podcast object (optional).
898        :param position: Position/index of the episode in the podcast.
899        """
900        if episode_data is None:
901            self.logger.error("Received None episode_data in _parse_podcast_episode")
902            raise MediaNotFoundError("Podcast episode data not found")
903
904        asin = episode_data.get("asin", "")
905        title = episode_data.get("title", "")
906
907        # Get duration from runtime_length_min
908        runtime_minutes = episode_data.get("runtime_length_min", 0)
909        duration = runtime_minutes * 60 if runtime_minutes else 0
910
911        # Create podcast reference - use Podcast object or create ItemMapping
912        podcast_ref: Podcast | ItemMapping
913        if podcast is not None:
914            podcast_ref = podcast
915        else:
916            # Try to get parent_asin from relationships for ItemMapping
917            parent_asin = ""
918            relationships = episode_data.get("relationships", [])
919            for rel in relationships:
920                if rel.get("relationship_type") == "parent":
921                    parent_asin = rel.get("asin", "")
922                    break
923
924            if not parent_asin:
925                self.logger.warning(
926                    "No parent_asin found for podcast episode %s; parent podcast is unknown",
927                    asin,
928                )
929
930            podcast_ref = ItemMapping(
931                item_id=parent_asin or "",
932                provider=self.provider_instance,
933                name="Unknown Podcast",
934                media_type=MediaType.PODCAST,
935            )
936
937        # Create episode object
938        episode = PodcastEpisode(
939            item_id=asin,
940            provider=self.provider_instance,
941            name=title,
942            duration=duration,
943            position=position,
944            podcast=podcast_ref,
945            provider_mappings={
946                ProviderMapping(
947                    item_id=asin,
948                    provider_domain=self.provider_domain,
949                    provider_instance=self.provider_instance,
950                )
951            },
952        )
953
954        # Set metadata
955        episode.metadata.description = _html_to_txt(
956            str(
957                episode_data.get("publisher_summary", "")
958                or episode_data.get("extended_product_description", "")
959            )
960        )
961
962        # Add images
963        image_path = episode_data.get("product_images", {}).get("500")
964        episode.metadata.images = UniqueList(self._create_images(image_path))
965
966        return episode
967
968    async def get_authors(self) -> dict[str, str]:
969        """Get all unique authors from the library.
970
971        Returns dict mapping author ASIN to author name.
972        """
973        authors: dict[str, str] = {}
974        async for item in self._fetch_library_items(
975            "contributors,product_attrs", AUDIOBOOK_CONTENT_TYPES
976        ):
977            for author in item.get("authors") or []:
978                asin = author.get("asin")
979                name = author.get("name")
980                if asin and name:
981                    authors[asin] = name
982        return authors
983
984    async def get_series(self) -> dict[str, str]:
985        """Get all unique series from the library.
986
987        Returns dict mapping series ASIN to series title.
988        """
989        series: dict[str, str] = {}
990        async for item in self._fetch_library_items(
991            "series,product_attrs", AUDIOBOOK_CONTENT_TYPES
992        ):
993            for s in item.get("series") or []:
994                asin = s.get("asin")
995                title = s.get("title")
996                if asin and title:
997                    series[asin] = title
998        return series
999
1000    async def get_narrators(self) -> dict[str, str]:
1001        """Get all unique narrators from the library.
1002
1003        Returns dict mapping narrator ASIN to narrator name.
1004        """
1005        narrators: dict[str, str] = {}
1006        async for item in self._fetch_library_items(
1007            "contributors,product_attrs", AUDIOBOOK_CONTENT_TYPES
1008        ):
1009            for narrator in item.get("narrators") or []:
1010                asin = narrator.get("asin")
1011                name = narrator.get("name")
1012                if asin and name:
1013                    narrators[asin] = name
1014        return narrators
1015
1016    async def get_genres(self) -> set[str]:
1017        """Get all unique genres from the library."""
1018        genres: set[str] = set()
1019        async for item in self._fetch_library_items("product_attrs", AUDIOBOOK_CONTENT_TYPES):
1020            for keyword in item.get("thesaurus_subject_keywords") or []:
1021                genres.add(keyword.replace("_", " ").replace("-", " ").title())
1022        return genres
1023
1024    async def get_publishers(self) -> set[str]:
1025        """Get all unique publishers from the library."""
1026        publishers: set[str] = set()
1027        async for item in self._fetch_library_items("product_attrs", AUDIOBOOK_CONTENT_TYPES):
1028            publisher = item.get("publisher_name")
1029            if publisher:
1030                publishers.add(publisher)
1031        return publishers
1032
1033    async def get_audiobooks_by_author(self, author_asin: str) -> list[Audiobook]:
1034        """Get all audiobooks by a specific author, sorted by release date."""
1035        audiobooks: list[tuple[str, Audiobook]] = []
1036        async for item in self._fetch_library_items(
1037            "contributors,media,product_attrs,product_desc,series", AUDIOBOOK_CONTENT_TYPES
1038        ):
1039            for author in item.get("authors") or []:
1040                if author.get("asin") == author_asin:
1041                    release_date = item.get("release_date") or "0000-00-00"
1042                    audiobooks.append((release_date, self._parse_audiobook(item)))
1043                    break
1044        audiobooks.sort(key=lambda x: x[0], reverse=True)
1045        return [book for _, book in audiobooks]
1046
1047    async def get_audiobooks_by_narrator(self, narrator_asin: str) -> list[Audiobook]:
1048        """Get all audiobooks by a specific narrator, sorted by release date."""
1049        audiobooks: list[tuple[str, Audiobook]] = []
1050        async for item in self._fetch_library_items(
1051            "contributors,media,product_attrs,product_desc,series", AUDIOBOOK_CONTENT_TYPES
1052        ):
1053            for narrator in item.get("narrators") or []:
1054                if narrator.get("asin") == narrator_asin:
1055                    release_date = item.get("release_date") or "0000-00-00"
1056                    audiobooks.append((release_date, self._parse_audiobook(item)))
1057                    break
1058        audiobooks.sort(key=lambda x: x[0], reverse=True)
1059        return [book for _, book in audiobooks]
1060
1061    async def get_audiobooks_by_genre(self, genre: str) -> list[Audiobook]:
1062        """Get all audiobooks matching a genre, sorted by release date."""
1063        audiobooks: list[tuple[str, Audiobook]] = []
1064        genre_key = genre.lower().replace(" ", "_")
1065        genre_key_alt = genre.lower().replace(" ", "-")
1066        async for item in self._fetch_library_items(
1067            "contributors,media,product_attrs,product_desc,series", AUDIOBOOK_CONTENT_TYPES
1068        ):
1069            keywords = item.get("thesaurus_subject_keywords") or []
1070            if genre_key in keywords or genre_key_alt in keywords:
1071                release_date = item.get("release_date") or "0000-00-00"
1072                audiobooks.append((release_date, self._parse_audiobook(item)))
1073        audiobooks.sort(key=lambda x: x[0], reverse=True)
1074        return [book for _, book in audiobooks]
1075
1076    async def get_audiobooks_by_publisher(self, publisher: str) -> list[Audiobook]:
1077        """Get all audiobooks from a specific publisher, sorted by release date."""
1078        audiobooks: list[tuple[str, Audiobook]] = []
1079        async for item in self._fetch_library_items(
1080            "contributors,media,product_attrs,product_desc,series", AUDIOBOOK_CONTENT_TYPES
1081        ):
1082            if item.get("publisher_name") == publisher:
1083                release_date = item.get("release_date") or "0000-00-00"
1084                audiobooks.append((release_date, self._parse_audiobook(item)))
1085        audiobooks.sort(key=lambda x: x[0], reverse=True)
1086        return [book for _, book in audiobooks]
1087
1088    async def get_audiobooks_by_series(self, series_asin: str) -> list[Audiobook]:
1089        """Get all audiobooks in a specific series, ordered by sequence."""
1090        audiobooks: list[tuple[float, Audiobook]] = []
1091        async for item in self._fetch_library_items(
1092            "contributors,media,product_attrs,product_desc,series", AUDIOBOOK_CONTENT_TYPES
1093        ):
1094            for s in item.get("series") or []:
1095                if s.get("asin") == series_asin:
1096                    sequence = s.get("sequence")
1097                    try:
1098                        seq_num = float(sequence) if sequence else 999
1099                    except (ValueError, TypeError):
1100                        seq_num = 999
1101                    audiobooks.append((seq_num, self._parse_audiobook(item)))
1102                    break
1103        audiobooks.sort(key=lambda x: x[0])
1104        return [book for _, book in audiobooks]
1105
1106    async def deregister(self) -> None:
1107        """Deregister this provider from Audible."""
1108        await asyncio.to_thread(self.client.auth.deregister_device)
1109
1110
1111def _html_to_txt(html_text: str) -> str:
1112    txt = html.unescape(html_text)
1113    tags = re.findall("<[^>]+>", txt)
1114    for tag in tags:
1115        txt = txt.replace(tag, "")
1116    return txt
1117
1118
1119async def audible_get_auth_info(locale: str) -> tuple[str, str, str]:
1120    """Generate the login URL and auth info for Audible OAuth flow.
1121
1122    :param locale: The locale string (e.g., 'us', 'uk', 'de').
1123    :return: Tuple of (code_verifier, oauth_url, serial).
1124    """
1125    locale_obj = audible.localization.Locale(locale)
1126    code_verifier = await asyncio.to_thread(audible.login.create_code_verifier)
1127    oauth_url, serial = await asyncio.to_thread(
1128        audible.login.build_oauth_url,
1129        country_code=locale_obj.country_code,
1130        domain=locale_obj.domain,
1131        market_place_id=locale_obj.market_place_id,
1132        code_verifier=code_verifier,
1133        with_username=False,
1134    )
1135
1136    return code_verifier.decode(), oauth_url, serial
1137
1138
1139async def audible_custom_login(
1140    code_verifier: str, response_url: str, serial: str, locale: str
1141) -> audible.Authenticator:
1142    """Complete the authentication using the code_verifier, response_url, and serial.
1143
1144    :param code_verifier: The code verifier string used in OAuth flow.
1145    :param response_url: The response URL containing the authorization code.
1146    :param serial: The device serial number.
1147    :param locale: The locale string.
1148    :return: Audible Authenticator object.
1149    :raises LoginFailed: If authorization code is not found in the URL.
1150    """
1151    logger = logging.getLogger("audible_helper")
1152    auth = audible.Authenticator()
1153    auth.locale = audible.localization.Locale(locale)
1154
1155    response_url_parsed = urlparse(response_url)
1156    parsed_qs = parse_qs(response_url_parsed.query)
1157
1158    # Try multiple parameter names for authorization code
1159    # Audible may use different parameter names depending on the flow
1160    authorization_code = None
1161    for param_name in ["openid.oa2.authorization_code", "authorization_code", "code"]:
1162        if codes := parsed_qs.get(param_name):
1163            authorization_code = codes[0]
1164            logger.debug("Found authorization code in parameter: %s", param_name)
1165            break
1166
1167    if not authorization_code:
1168        available_params = list(parsed_qs.keys())
1169        raise LoginFailed(
1170            f"Authorization code not found in URL. "
1171            f"Expected 'openid.oa2.authorization_code' but found parameters: {available_params}"
1172        )
1173
1174    registration_data = await asyncio.to_thread(
1175        audible.register.register,
1176        authorization_code=authorization_code,
1177        code_verifier=code_verifier.encode(),
1178        domain=auth.locale.domain,
1179        serial=serial,
1180    )
1181    auth._update_attrs(**registration_data)
1182
1183    # Log what auth methods are available after registration
1184    if auth.adp_token and auth.device_private_key:
1185        logger.info("Registration successful with signing auth (stable)")
1186    else:
1187        logger.warning("Registration successful but signing auth not available")
1188
1189    return auth
1190
1191
1192async def check_file_exists(path: str | PathLike[str]) -> bool:
1193    """Async file exists check."""
1194    return await asyncio.to_thread(os.path.exists, path)
1195
1196
1197async def remove_file(path: str | PathLike[str]) -> None:
1198    """Async file delete."""
1199    await asyncio.to_thread(os.remove, path)
1200