music-assistant-server

70.7 KBPY
__init__.py
70.7 KB1,691 lines • python
1"""Audiobookshelf (abs) provider for Music Assistant."""
2
3from __future__ import annotations
4
5import asyncio
6import functools
7import itertools
8import time
9from collections.abc import AsyncGenerator, Callable, Coroutine, Sequence
10from contextlib import suppress
11from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar, cast
12
13import aioaudiobookshelf as aioabs
14from aioaudiobookshelf.client.items import LibraryItemExpandedBook as AbsLibraryItemExpandedBook
15from aioaudiobookshelf.client.items import (
16    LibraryItemExpandedPodcast as AbsLibraryItemExpandedPodcast,
17)
18from aioaudiobookshelf.client.items import PlaybackSessionExpanded as AbsPlaybackSessionExpanded
19from aioaudiobookshelf.client.items import PlaybackSessionParameters as AbsPlaybackSessionParameters
20from aioaudiobookshelf.client.session import SyncOpenSessionParameters
21from aioaudiobookshelf.exceptions import LoginError as AbsLoginError
22from aioaudiobookshelf.exceptions import RefreshTokenExpiredError
23from aioaudiobookshelf.exceptions import SessionNotFoundError as AbsSessionNotFoundError
24from aioaudiobookshelf.schema.author import AuthorExpanded
25from aioaudiobookshelf.schema.calls_authors import (
26    AuthorWithItemsAndSeries as AbsAuthorWithItemsAndSeries,
27)
28from aioaudiobookshelf.schema.calls_series import SeriesWithProgress as AbsSeriesWithProgress
29from aioaudiobookshelf.schema.library import (
30    LibraryItemExpanded,
31    LibraryItemExpandedBook,
32    LibraryItemExpandedPodcast,
33    LibraryItemMinifiedPodcast,
34)
35from aioaudiobookshelf.schema.library import LibraryMediaType as AbsLibraryMediaType
36from aioaudiobookshelf.schema.session import DeviceInfo as AbsDeviceInfo
37from aioaudiobookshelf.schema.shelf import (
38    SeriesShelf,
39    ShelfAuthors,
40    ShelfBook,
41    ShelfEpisode,
42    ShelfLibraryItemMinified,
43    ShelfPodcast,
44    ShelfSeries,
45)
46from aioaudiobookshelf.schema.shelf import ShelfId as AbsShelfId
47from aioaudiobookshelf.schema.shelf import ShelfType as AbsShelfType
48from aiohttp import web
49from music_assistant_models.config_entries import (
50    ConfigEntry,
51    ConfigValueType,
52    ProviderConfig,
53)
54from music_assistant_models.enums import (
55    ConfigEntryType,
56    ContentType,
57    MediaType,
58    ProviderFeature,
59    StreamType,
60)
61from music_assistant_models.errors import LoginFailed, MediaNotFoundError
62from music_assistant_models.media_items import (
63    Audiobook,
64    AudioFormat,
65    BrowseFolder,
66    ItemMapping,
67    MediaItemType,
68    PodcastEpisode,
69    UniqueList,
70)
71from music_assistant_models.media_items.media_item import RecommendationFolder
72from music_assistant_models.streamdetails import MultiPartPath, StreamDetails
73
74from music_assistant.models.music_provider import MusicProvider
75from music_assistant.providers.audiobookshelf.parsers import (
76    parse_audiobook,
77    parse_podcast,
78    parse_podcast_episode,
79)
80
81from .constants import (
82    ABS_BROWSE_ITEMS_TO_PATH,
83    ABS_SHELF_ID_ICONS,
84    ABS_SHELF_ID_TRANSLATION_KEY,
85    AIOHTTP_TIMEOUT,
86    CACHE_CATEGORY_LIBRARIES,
87    CACHE_KEY_LIBRARIES,
88    CONF_API_TOKEN,
89    CONF_HIDE_EMPTY_PODCASTS,
90    CONF_HLS_FORMATS,
91    CONF_OLD_TOKEN,
92    CONF_PASSWORD,
93    CONF_URL,
94    CONF_USE_HLS,
95    CONF_USERNAME,
96    CONF_VERIFY_SSL,
97    HLS_ALL_FORMATS,
98    HLS_FORMATS_SPLIT,
99    AbsBrowseItemsBookTranslationKey,
100    AbsBrowseItemsPodcastTranslationKey,
101    AbsBrowsePaths,
102)
103from .helpers import LibrariesHelper, LibraryHelper, ProgressGuard, SessionHelper
104
105if TYPE_CHECKING:
106    from aioaudiobookshelf.schema.events_socket import LibraryItemRemoved
107    from aioaudiobookshelf.schema.media_progress import MediaProgress
108    from aioaudiobookshelf.schema.streams import Stream as AbsStream
109    from aioaudiobookshelf.schema.user import User
110    from music_assistant_models.media_items import Podcast
111    from music_assistant_models.provider import ProviderManifest
112
113    from music_assistant.mass import MusicAssistant
114    from music_assistant.models import ProviderInstanceType
115
116SUPPORTED_FEATURES = {
117    ProviderFeature.LIBRARY_PODCASTS,
118    ProviderFeature.LIBRARY_AUDIOBOOKS,
119    ProviderFeature.BROWSE,
120    ProviderFeature.RECOMMENDATIONS,
121}
122
123
124async def setup(
125    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
126) -> ProviderInstanceType:
127    """Initialize provider(instance) with given configuration."""
128    return Audiobookshelf(mass, manifest, config, SUPPORTED_FEATURES)
129
130
131async def get_config_entries(
132    mass: MusicAssistant,
133    instance_id: str | None = None,
134    action: str | None = None,
135    values: dict[str, ConfigValueType] | None = None,
136) -> tuple[ConfigEntry, ...]:
137    """
138    Return Config entries to setup this provider.
139
140    instance_id: id of an existing provider instance (None if new instance setup).
141    action: [optional] action key called from config entries UI.
142    values: the (intermediate) raw values for config entries sent with the action.
143    """
144    # ruff: noqa: ARG001
145    return (
146        ConfigEntry(
147            key="label",
148            type=ConfigEntryType.LABEL,
149            label="Please provide the address of your Audiobookshelf instance. To authenticate "
150            "you have two options: "
151            "a) Provide username AND password. Leave the API key empty. "
152            "b) Provide ONLY an API key.",
153        ),
154        ConfigEntry(
155            key=CONF_URL,
156            type=ConfigEntryType.STRING,
157            label="Server",
158            required=True,
159            description="The URL of the Audiobookshelf server to connect to. For example "
160            "https://abs.domain.tld/ or http://192.168.1.4:13378/",
161        ),
162        ConfigEntry(
163            key=CONF_USERNAME,
164            type=ConfigEntryType.STRING,
165            label="Username",
166            required=False,
167            description="The username to authenticate to the remote server.",
168        ),
169        ConfigEntry(
170            key=CONF_PASSWORD,
171            type=ConfigEntryType.SECURE_STRING,
172            label="Password",
173            required=False,
174            description="The password to authenticate to the remote server.",
175        ),
176        ConfigEntry(
177            key=CONF_API_TOKEN,
178            type=ConfigEntryType.SECURE_STRING,
179            label="API key _instead_ of user/ password. (ABS version >= 2.26)",
180            required=False,
181            description="Instead of using a username and password, "
182            "you may provide an API key (ABS version >= 2.26). "
183            "Please consult the docs.",
184        ),
185        ConfigEntry(
186            key=CONF_OLD_TOKEN,
187            type=ConfigEntryType.SECURE_STRING,
188            label="old token",
189            required=False,
190            hidden=True,
191        ),
192        ConfigEntry(
193            key=CONF_USE_HLS,
194            type=ConfigEntryType.BOOLEAN,
195            label="Stream via HLS from ABS.",
196            description="Use an HLS stream when streaming from audiobookshelf.",
197            required=False,
198            default_value=False,
199            advanced=True,
200        ),
201        ConfigEntry(
202            key=CONF_HLS_FORMATS,
203            type=ConfigEntryType.STRING,
204            label=f"Use HLS for these file extensions. Separate with ';'. Use {HLS_ALL_FORMATS} for"
205            " all formats.",
206            description="Use HLS only for these file extensions."
207            f" Separate with ;. E.g. m4b or m4b;aac or {HLS_ALL_FORMATS}",
208            required=False,
209            default_value="m4b",
210            advanced=True,
211        ),
212        ConfigEntry(
213            key=CONF_VERIFY_SSL,
214            type=ConfigEntryType.BOOLEAN,
215            label="Verify SSL",
216            required=False,
217            description="Whether or not to verify the certificate of SSL/TLS connections.",
218            advanced=True,
219            default_value=True,
220        ),
221        ConfigEntry(
222            key=CONF_HIDE_EMPTY_PODCASTS,
223            type=ConfigEntryType.BOOLEAN,
224            label="Hide empty podcasts.",
225            required=False,
226            description="This will skip podcasts with no episodes associated.",
227            advanced=True,
228            default_value=False,
229        ),
230    )
231
232
233R = TypeVar("R")
234P = ParamSpec("P")
235
236
237class Audiobookshelf(MusicProvider):
238    """Audiobookshelf MusicProvider."""
239
240    _on_unload_callbacks: list[Callable[[], None]]
241
242    @staticmethod
243    def handle_refresh_token(
244        method: Callable[P, Coroutine[Any, Any, R]],
245    ) -> Callable[P, Coroutine[Any, Any, R]]:
246        """Decorate a method to handle an expired refresh token by relogin."""
247
248        @functools.wraps(method)
249        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
250            self = cast("Audiobookshelf", args[0])
251            try:
252                return await method(*args, **kwargs)
253            except RefreshTokenExpiredError:
254                self.logger.debug("Refresh token expired. Trying to renew.")
255                await self.reauthenticate()
256                return await method(*args, **kwargs)
257
258        return wrapper
259
260    async def handle_async_init(self) -> None:
261        """Pass config values to client and initialize."""
262        self._on_unload_callbacks: list[Callable[[], None]] = []
263        self.sessions: dict[str, SessionHelper] = {}  # key is the mass_item_id
264        self.create_session_lock = asyncio.Lock()
265        base_url = str(self.config.get_value(CONF_URL))
266        username = str(self.config.get_value(CONF_USERNAME))
267        password = str(self.config.get_value(CONF_PASSWORD))
268        token_old = self.config.get_value(CONF_OLD_TOKEN)
269        token_api = self.config.get_value(CONF_API_TOKEN)
270        verify_ssl = bool(self.config.get_value(CONF_VERIFY_SSL))
271        session_config = aioabs.SessionConfiguration(
272            session=self.mass.http_session,
273            url=base_url,
274            verify_ssl=verify_ssl,
275            logger=self.logger,
276            pagination_items_per_page=30,  # audible provider goes with 50 for pagination
277            timeout=AIOHTTP_TIMEOUT,
278        )
279        # If we are configured with a non-expiring API key or not.
280        self.is_token_user = False
281        try:
282            if token_api is not None or token_old is not None:
283                _token = token_api if token_api is not None else token_old
284                session_config.token = str(_token)
285                (
286                    self._client,
287                    self._client_socket,
288                ) = await aioabs.get_user_and_socket_client_by_token(session_config=session_config)
289                self.is_token_user = True
290            else:
291                self._client, self._client_socket = await aioabs.get_user_and_socket_client(
292                    session_config=session_config, username=username, password=password
293                )
294            await self._client_socket.init_client()
295        except AbsLoginError as exc:
296            raise LoginFailed(f"Login to abs instance at {base_url} failed.") from exc
297
298        if token_old is not None and token_api is None:
299            # Log Message that the old token won't work
300            _version = self._client.server_settings.version.split(".")
301            if len(_version) >= 2:
302                try:
303                    major, minor = int(_version[0]), int(_version[1])
304                except ValueError:
305                    major = minor = 0
306                if major >= 2 and minor >= 26:
307                    self.logger.warning(
308                        """
309
310######## Audiobookshelf API key change #############################################################
311
312Audiobookshelf introduced a new API key system in version 2.26 (JWT).
313You are still using a token configured with a previous version of Audiobookshelf,
314but you are running version %s. This will stop working in a future Audiobookshelf release.
315Please create a non-expiring API Key instead, and update your configuration accordingly.
316Refer to the documentation of Audiobookshelf, https://www.audiobookshelf.org/guides/api-keys/
317and of Music Assistant https://www.music-assistant.io/music-providers/audiobookshelf/
318for more details.
319
320""",
321                        self._client.server_settings.version,
322                    )
323
324        cached_libraries = await self.mass.cache.get(
325            key=CACHE_KEY_LIBRARIES,
326            provider=self.instance_id,
327            category=CACHE_CATEGORY_LIBRARIES,
328            default=None,
329        )
330        if cached_libraries is None:
331            self.libraries = LibrariesHelper()
332            # We need the library ids for recommendations. If the cache got cleared e.g. by a db
333            # migration, we might end up with empty library helpers on a configured provider. Note,
334            # that the lib item ids are not synced, still only on full provider sync, instead the
335            # sets are empty. Full sync is expensive.
336            # See warning in browse_lib_podcasts / _browse_books
337            libraries = await self._client.get_all_libraries()
338            for library in libraries:
339                if library.media_type == AbsLibraryMediaType.BOOK:
340                    self.libraries.audiobooks[library.id_] = LibraryHelper(name=library.name)
341                elif library.media_type == AbsLibraryMediaType.PODCAST:
342                    self.libraries.podcasts[library.id_] = LibraryHelper(name=library.name)
343        else:
344            self.libraries = LibrariesHelper.from_dict(cached_libraries)
345
346        # set socket callbacks
347        self._client_socket.set_item_callbacks(
348            on_item_added=self._socket_abs_item_changed,
349            on_item_updated=self._socket_abs_item_changed,
350            on_item_removed=self._socket_abs_item_removed,
351            on_items_added=self._socket_abs_item_changed,
352            on_items_updated=self._socket_abs_item_changed,
353        )
354
355        self._client_socket.set_user_callbacks(
356            on_user_item_progress_updated=self._socket_abs_user_item_progress_updated,
357        )
358
359        self._client_socket.set_refresh_token_expired_callback(
360            on_refresh_token_expired=self._socket_abs_refresh_token_expired
361        )
362
363        self._client_socket.set_stream_callbacks(on_stream_open=self._socket_stream_open)
364
365        # progress guard
366        self.progress_guard = ProgressGuard()
367
368        # safe guard reauthentication
369        self.reauthenticate_lock = asyncio.Lock()
370        self.reauthenticate_last = 0.0
371
372        # register dynamic stream route for audiobook parts
373        self._on_unload_callbacks.append(
374            self.mass.streams.register_dynamic_route(
375                f"/{self.instance_id}_part_stream", self._handle_session_part_request
376            )
377        )
378
379    @handle_refresh_token
380    async def unload(self, is_removed: bool = False) -> None:
381        """
382        Handle unload/close of the provider.
383
384        Called when provider is deregistered (e.g. MA exiting or config reloading).
385        is_removed will be set to True when the provider is removed from the configuration.
386        """
387        await self._client.logout()
388        await self._client_socket.logout()
389        for callback in self._on_unload_callbacks:
390            callback()
391
392    @property
393    def is_streaming_provider(self) -> bool:
394        """Return True if the provider is a streaming provider."""
395        # For streaming providers return True here but for local file based providers return False.
396        return False
397
398    @handle_refresh_token
399    async def sync_library(self, media_type: MediaType) -> None:
400        """Obtain audiobook library ids and podcast library ids."""
401        libraries = await self._client.get_all_libraries()
402        if len(libraries) == 0:
403            self._log_no_libraries()
404        for library in libraries:
405            if library.media_type == AbsLibraryMediaType.BOOK and media_type == MediaType.AUDIOBOOK:
406                self.libraries.audiobooks[library.id_] = LibraryHelper(name=library.name)
407            elif (
408                library.media_type == AbsLibraryMediaType.PODCAST
409                and media_type == MediaType.PODCAST
410            ):
411                self.libraries.podcasts[library.id_] = LibraryHelper(name=library.name)
412        await super().sync_library(media_type)
413        await self._cache_set_helper_libraries()
414
415        # update playlog
416        user = await self._client.get_my_user()
417        await self._set_playlog_from_user(user)
418
419    async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
420        """Retrieve library/subscribed podcasts from the provider.
421
422        Minified podcast information is enough.
423        """
424        for pod_lib_id in self.libraries.podcasts:
425            async for response in self._client.get_library_items(library_id=pod_lib_id):
426                if not response.results:
427                    break
428                podcast_ids = [x.id_ for x in response.results]
429                # store uuids
430                self.libraries.podcasts[pod_lib_id].item_ids.update(podcast_ids)
431                for podcast_minified in response.results:
432                    assert isinstance(podcast_minified, LibraryItemMinifiedPodcast)
433                    mass_podcast = parse_podcast(
434                        abs_podcast=podcast_minified,
435                        instance_id=self.instance_id,
436                        domain=self.domain,
437                        token=self._client.token,
438                        base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
439                    )
440                    if (
441                        bool(self.config.get_value(CONF_HIDE_EMPTY_PODCASTS))
442                        and mass_podcast.total_episodes == 0
443                    ):
444                        continue
445                    yield mass_podcast
446
447    @handle_refresh_token
448    async def _get_abs_expanded_podcast(
449        self, prov_podcast_id: str
450    ) -> AbsLibraryItemExpandedPodcast:
451        abs_podcast = await self._client.get_library_item_podcast(
452            podcast_id=prov_podcast_id, expanded=True
453        )
454        assert isinstance(abs_podcast, AbsLibraryItemExpandedPodcast)
455
456        return abs_podcast
457
458    @handle_refresh_token
459    async def get_podcast(self, prov_podcast_id: str) -> Podcast:
460        """Get single podcast."""
461        abs_podcast = await self._get_abs_expanded_podcast(prov_podcast_id=prov_podcast_id)
462        return parse_podcast(
463            abs_podcast=abs_podcast,
464            instance_id=self.instance_id,
465            domain=self.domain,
466            token=self._client.token,
467            base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
468        )
469
470    async def get_podcast_episodes(
471        self, prov_podcast_id: str
472    ) -> AsyncGenerator[PodcastEpisode, None]:
473        """Get all podcast episodes of podcast.
474
475        Adds progress information.
476        """
477        abs_podcast = await self._get_abs_expanded_podcast(prov_podcast_id=prov_podcast_id)
478        episode_cnt = 1
479        # the user has the progress of all media items
480        # so we use a single api call here to obtain possibly many
481        # progresses for episodes
482        user = await self._client.get_my_user()
483        abs_progresses = {
484            x.episode_id: x
485            for x in user.media_progress
486            if x.episode_id is not None and x.library_item_id == prov_podcast_id
487        }
488        for abs_episode in abs_podcast.media.episodes:
489            progress = abs_progresses.get(abs_episode.id_, None)
490            mass_episode = parse_podcast_episode(
491                episode=abs_episode,
492                prov_podcast_id=prov_podcast_id,
493                fallback_episode_cnt=episode_cnt,
494                instance_id=self.instance_id,
495                domain=self.domain,
496                token=self._client.token,
497                base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
498                media_progress=progress,
499            )
500            yield mass_episode
501            episode_cnt += 1
502
503    @handle_refresh_token
504    async def get_podcast_episode(
505        self, prov_episode_id: str, add_progress: bool = True
506    ) -> PodcastEpisode:
507        """Get single podcast episode."""
508        prov_podcast_id, e_id = prov_episode_id.split(" ")
509        abs_podcast = await self._get_abs_expanded_podcast(prov_podcast_id=prov_podcast_id)
510        episode_cnt = 1
511        for abs_episode in abs_podcast.media.episodes:
512            if abs_episode.id_ == e_id:
513                progress = None
514                if add_progress:
515                    progress = await self._client.get_my_media_progress(
516                        item_id=prov_podcast_id, episode_id=abs_episode.id_
517                    )
518                return parse_podcast_episode(
519                    episode=abs_episode,
520                    prov_podcast_id=prov_podcast_id,
521                    fallback_episode_cnt=episode_cnt,
522                    instance_id=self.instance_id,
523                    domain=self.domain,
524                    token=self._client.token,
525                    base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
526                    media_progress=progress,
527                )
528
529            episode_cnt += 1
530        raise MediaNotFoundError("Episode not found")
531
532    async def get_library_audiobooks(self) -> AsyncGenerator[Audiobook, None]:
533        """Get Audiobook libraries.
534
535        Need expanded version for chapters.
536        """
537        for book_lib_id in self.libraries.audiobooks:
538            async for response in self._client.get_library_items(library_id=book_lib_id):
539                if not response.results:
540                    break
541                book_ids = [x.id_ for x in response.results]
542                # store uuids
543                self.libraries.audiobooks[book_lib_id].item_ids.update(book_ids)
544                # use expanded version for chapters/ caching.
545                books_expanded = await self._client.get_library_item_batch_book(item_ids=book_ids)
546                for book_expanded in books_expanded:
547                    # If the book has no audiofiles, we skip -> ebook only.
548                    if len(book_expanded.media.tracks) == 0:
549                        continue
550                    mass_audiobook = parse_audiobook(
551                        abs_audiobook=book_expanded,
552                        instance_id=self.instance_id,
553                        domain=self.domain,
554                        token=self._client.token,
555                        base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
556                    )
557                    yield mass_audiobook
558
559    @handle_refresh_token
560    async def _get_abs_expanded_audiobook(
561        self, prov_audiobook_id: str
562    ) -> AbsLibraryItemExpandedBook:
563        abs_audiobook = await self._client.get_library_item_book(
564            book_id=prov_audiobook_id, expanded=True
565        )
566        assert isinstance(abs_audiobook, AbsLibraryItemExpandedBook)
567
568        return abs_audiobook
569
570    @handle_refresh_token
571    async def get_audiobook(self, prov_audiobook_id: str) -> Audiobook:
572        """Get a single audiobook.
573
574        Progress is added here.
575        """
576        progress = await self._client.get_my_media_progress(item_id=prov_audiobook_id)
577        abs_audiobook = await self._get_abs_expanded_audiobook(prov_audiobook_id=prov_audiobook_id)
578        return parse_audiobook(
579            abs_audiobook=abs_audiobook,
580            instance_id=self.instance_id,
581            domain=self.domain,
582            token=self._client.token,
583            base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
584            media_progress=progress,
585        )
586
587    async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
588        """Get stream of item."""
589        # We always create a playback session. The default is direct playback.
590        # In that case, session.tracks holds the exact same as the audiobook/ podcast.track,
591        # so we only use the session to update our progress.
592        #
593        # In the case of hls the session has an hls stream as track.
594        if media_type in (MediaType.PODCAST_EPISODE, MediaType.AUDIOBOOK):
595            session = await self._get_playback_session(mass_item_id=item_id)
596            return await self._get_stream_details_session(
597                session, session_helper=self.sessions[item_id], media_type=media_type
598            )
599        raise MediaNotFoundError("Stream unknown")
600
601    async def _get_stream_details_session(
602        self,
603        abs_session: AbsPlaybackSessionExpanded,
604        session_helper: SessionHelper,
605        media_type: MediaType,
606    ) -> StreamDetails:
607        """Streamdetails audiobook.
608
609        We always use a custom stream type, also for single file, such
610        that we can handle an ffmpeg error and refresh our tokens.
611        """
612        abs_base_url = str(self.config.get_value(CONF_URL))
613        tracks = abs_session.audio_tracks
614
615        if len(tracks) == 0:
616            raise MediaNotFoundError("Session has no tracks.")
617
618        content_type = ContentType.UNKNOWN
619        if abs_session.audio_tracks[0].metadata is not None:
620            content_type = ContentType.try_parse(abs_session.audio_tracks[0].metadata.ext)
621
622        file_parts: list[MultiPartPath] = []
623        if self.is_token_user:
624            self.logger.debug("Token User - Streams are direct.")
625        for idx, track in enumerate(tracks):
626            if self.is_token_user:
627                # an api key is long-lived
628                stream_url = f"{abs_base_url}{track.content_url}?token={self._client.token}"
629            else:
630                # to ensure token is always valid, we create a dynamic url
631                # this ensures that we always get a fresh token on each part
632                # without having to deal with a custom stream etc.
633                # we also use this for a single track/ hls stream, otherwise we can't seek
634                stream_url = (
635                    f"{self.mass.streams.base_url}/{self.instance_id}_part_stream?"
636                    f"session_id={abs_session.id_}&part_id={idx}"
637                )
638            file_parts.append(MultiPartPath(path=stream_url, duration=track.duration))
639
640        stream_type = StreamType.HLS if "hls" in file_parts[0].path else StreamType.HTTP
641        if stream_type == StreamType.HLS:
642            # wait for stream to be ready
643            try:
644                await asyncio.wait_for(session_helper.hls_stream_open.wait(), 10)
645            except TimeoutError:
646                self.logger.warning(
647                    "Did not receive HLS stream open event after 10s, continuing anyways."
648                )
649
650        return StreamDetails(
651            provider=self.instance_id,
652            item_id=abs_session.id_,
653            audio_format=AudioFormat(content_type=content_type),
654            media_type=media_type,
655            stream_type=stream_type,
656            duration=int(abs_session.duration),
657            path=file_parts[0].path if len(file_parts) == 1 else file_parts,
658            can_seek=True,
659            allow_seek=True,
660        )
661
662    async def _get_playback_session(self, mass_item_id: str) -> AbsPlaybackSessionExpanded:
663        """Either creates or returns an open abs session."""
664        async with self.create_session_lock:
665            # check for an available open session
666            if session_helper := self.sessions.get(mass_item_id):
667                with suppress(AbsSessionNotFoundError):
668                    return await self._client.get_open_session(
669                        session_id=session_helper.abs_session_id
670                    )
671
672            item_ids = mass_item_id.split(" ")
673            abs_item_id = item_ids[0]
674            episode_id = item_ids[1] if len(item_ids) == 2 else None
675
676            # Create a new session
677            ## Check HLS usage
678            use_hls = bool(self.config.get_value(CONF_USE_HLS))
679            hls_formats = str(self.config.get_value(CONF_HLS_FORMATS))
680            if use_hls and hls_formats != HLS_ALL_FORMATS:
681                use_hls = False  # only for certain formats
682                extensions = [x.lstrip(".") for x in hls_formats.split(HLS_FORMATS_SPLIT)]
683                if episode_id is None:
684                    if (
685                        metadata := (await self._get_abs_expanded_audiobook(abs_item_id))
686                        .media.tracks[0]
687                        .metadata
688                    ):
689                        if metadata.ext.lstrip(".") in extensions:
690                            use_hls = True
691                else:
692                    podcast = await self._get_abs_expanded_podcast(prov_podcast_id=abs_item_id)
693                    episode = None
694                    for episode in podcast.media.episodes:
695                        if episode.id_ == episode_id:
696                            break
697                    if episode and (metadata := episode.audio_track.metadata):
698                        if metadata.ext.lstrip(".") in extensions:
699                            use_hls = True
700
701            client_name = f"Music Assistant {self.instance_id}"
702            device_info = AbsDeviceInfo(
703                device_id=self.instance_id,
704                client_name=client_name,
705                client_version=self.mass.version,
706                manufacturer="",
707                model=self.mass.server_id,
708            )
709
710            session = await self._client.get_playback_session(
711                # These parameters give an hls if we don't enforce direct play stream,
712                # which is only a concat of the individual file's at abs
713                session_parameters=AbsPlaybackSessionParameters(
714                    device_info=device_info,
715                    force_direct_play=not use_hls,
716                    force_transcode=use_hls,
717                    # mimetypes are only checked for abs' internal "should transcode
718                    # see https://github.com/advplyr/audiobookshelf/blob/master/server/managers/PlaybackSessionManager.js
719                    supported_mime_types=[],
720                    media_player=client_name,
721                ),
722                item_id=abs_item_id,
723                episode_id=episode_id,
724            )
725
726            if use_hls:
727                # Safety check.
728                track_url = session.audio_tracks[0].content_url
729                if track_url.split("/")[1] != "hls":
730                    raise MediaNotFoundError("Did expect HLS stream for session playback")
731                self.logger.debug("Using an HLS stream for playback.")
732
733            self.sessions[mass_item_id] = SessionHelper(
734                abs_session_id=session.id_,
735                last_sync_time=time.time(),
736                hls_stream_open=asyncio.Event(),
737            )
738            return session
739
740    @handle_refresh_token
741    async def _handle_session_part_request(self, request: web.Request) -> web.Response:
742        """
743        Handle dynamic audiobook part stream request.
744
745        We redirect to the actual stream url with token.
746        This is done because the token might expire, so we need to
747        generate a fresh url on each part.
748        """
749        if not (session_id := request.query.get("session_id")):
750            return web.Response(status=400, text="Missing session_id")
751        if not (part_id := request.query.get("part_id")):
752            return web.Response(status=400, text="Missing part_id")
753        self.logger.debug(
754            "Handling session part request for session %s and part %s", session_id, part_id
755        )
756        try:
757            abs_session = await self._client.get_open_session(session_id=session_id)
758        except AbsSessionNotFoundError as err:
759            raise web.HTTPNotFound from err
760        part_id = int(part_id)  # type: ignore[assignment]
761        try:
762            part_track = abs_session.audio_tracks[part_id]
763        except IndexError:
764            return web.Response(status=404, text="Part not found")
765
766        base_url = str(self.config.get_value(CONF_URL))
767        stream_url = f"{base_url}{part_track.content_url}?token={self._client.token}"
768        # redirect to the actual stream url
769        raise web.HTTPFound(location=stream_url)
770
771    @handle_refresh_token
772    async def get_resume_position(self, item_id: str, media_type: MediaType) -> tuple[bool, int]:
773        """Return finished:bool, position_ms: int."""
774        # this method is called _before_ get_stream_details, so the playback session
775        # is created here.
776        session = await self._get_playback_session(mass_item_id=item_id)
777        finished = session.current_time > session.duration - 30
778        self.logger.debug("Resume position: obtained.")
779        return finished, int(session.current_time * 1000)
780
781    @handle_refresh_token
782    async def recommendations(self) -> list[RecommendationFolder]:
783        """Get recommendations."""
784        # We have to avoid "flooding" the home page, which becomes especially troublesome if users
785        # have multiple libraries. Instead we collect per ShelfId, and make sure, that we always get
786        # roughly the same amount of items per row, no matter the amount of libraries
787        # List of list (one list per lib) here, such that we can pick the items per lib later.
788        items_by_shelf_id: dict[AbsShelfId, list[list[MediaItemType | BrowseFolder]]] = {}
789
790        all_libraries = {**self.libraries.audiobooks, **self.libraries.podcasts}
791        max_items_per_row = 20
792        num_libraries = len(all_libraries)
793
794        if num_libraries == 0:
795            self._log_no_libraries()
796            return []
797
798        limit_items_per_lib = max_items_per_row // num_libraries
799        limit_items_per_lib = 1 if limit_items_per_lib == 0 else limit_items_per_lib
800
801        for library_id in all_libraries:
802            shelves = await self._client.get_library_personalized_view(
803                library_id=library_id, limit=limit_items_per_lib
804            )
805            await self._recommendations_iter_shelves(shelves, library_id, items_by_shelf_id)
806
807        folders: list[RecommendationFolder] = []
808        for shelf_id, item_lists in items_by_shelf_id.items():
809            # we have something like [[A, B], [C, D, E], [F]]
810            # and want [A, C, F, B, D, E]
811            recommendation_items = [
812                x
813                for x in itertools.chain.from_iterable(itertools.zip_longest(*item_lists))
814                if x is not None
815            ][:max_items_per_row]
816
817            # shelf ids follow pattern:
818            # recently-added
819            # newest-episodes
820            # etc
821            name = f"{shelf_id.capitalize().replace('-', ' ')}"
822            if ABS_SHELF_ID_TRANSLATION_KEY.get(shelf_id):
823                name = ""  # use translation key if available
824            folders.append(
825                RecommendationFolder(
826                    item_id=f"{shelf_id}",
827                    name=name,
828                    icon=ABS_SHELF_ID_ICONS.get(shelf_id),
829                    translation_key=ABS_SHELF_ID_TRANSLATION_KEY.get(shelf_id),
830                    items=UniqueList(recommendation_items),
831                    provider=self.instance_id,
832                )
833            )
834
835        # Browse "recommendation" for convenience. If the user has
836        # multiple audiobook libraries, we return a listing of them.
837        # If there is only a single audiobook library, we add the folders
838        # from _browse_lib_audiobooks, i.e. Authors, Narrators etc.
839        # Podcast libs do not have filter folders, so always the root folders.
840        browse_items: list[MediaItemType | BrowseFolder] = []
841        translation_key = "libraries"
842        if len(self.libraries.audiobooks) <= 1:
843            if len(self.libraries.podcasts) == 0:
844                translation_key = "library"
845
846            # audiobooklibs are first, and we have at max 1 audiobook lib
847            _browse_root = self._browse_root(append_mediatype_suffix=False)
848            if len(self.libraries.audiobooks) == 0:
849                browse_items.extend(_browse_root)
850            else:
851                assert isinstance(_browse_root[0], BrowseFolder)
852                _path = _browse_root[0].path
853                browse_items.extend(self._browse_lib_audiobooks(current_path=_path))
854                # add podcast roots
855                browse_items.extend(_browse_root[1:])
856        else:
857            browse_items = list(self._browse_root())
858
859        folders.append(
860            RecommendationFolder(
861                item_id="browse",
862                name="",  # use translation key
863                icon="mdi-bookshelf",
864                translation_key=translation_key,
865                items=UniqueList(browse_items),
866                provider=self.instance_id,
867            )
868        )
869
870        return folders
871
872    async def _recommendations_iter_shelves(
873        self,
874        shelves: list[ShelfBook | ShelfPodcast | ShelfAuthors | ShelfEpisode | ShelfSeries],
875        library_id: str,
876        items_by_shelf_id: dict[AbsShelfId, list[list[MediaItemType | BrowseFolder]]],
877    ) -> None:
878        for shelf in shelves:
879            media_type: MediaType
880            match shelf.type_:
881                case AbsShelfType.PODCAST:
882                    media_type = MediaType.PODCAST
883                case AbsShelfType.EPISODE:
884                    media_type = MediaType.PODCAST_EPISODE
885                case AbsShelfType.BOOK:
886                    media_type = MediaType.AUDIOBOOK
887                case AbsShelfType.SERIES | AbsShelfType.AUTHORS:
888                    media_type = MediaType.FOLDER
889                case _:
890                    # this would be authors, currently
891                    continue
892
893            items: list[MediaItemType | BrowseFolder] = []
894            # Recently added is the _only_ case, where we get a full podcast
895            # We have a podcast object with only the episodes matching the
896            # shelf.id_ otherwise.
897            match shelf.id_:
898                case (
899                    AbsShelfId.RECENTLY_ADDED
900                    | AbsShelfId.LISTEN_AGAIN
901                    | AbsShelfId.DISCOVER
902                    | AbsShelfId.NEWEST_EPISODES
903                    | AbsShelfId.CONTINUE_LISTENING
904                ):
905                    for entity in shelf.entities:
906                        assert isinstance(entity, ShelfLibraryItemMinified)
907                        item: MediaItemType | None = None
908                        if media_type in [MediaType.PODCAST, MediaType.AUDIOBOOK]:
909                            item = await self.mass.music.get_library_item_by_prov_id(
910                                media_type=media_type,
911                                provider_instance_id_or_domain=self.instance_id,
912                                item_id=entity.id_,
913                            )
914                        elif media_type == MediaType.PODCAST_EPISODE:
915                            podcast_id = entity.id_
916                            if entity.recent_episode is None:
917                                continue
918                            # we only have a PodcastEpisode here, with limited information
919                            item = parse_podcast_episode(
920                                episode=entity.recent_episode,
921                                prov_podcast_id=podcast_id,
922                                instance_id=self.instance_id,
923                                domain=self.domain,
924                                token=self._client.token,
925                                base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
926                            )
927                        if item is not None:
928                            items.append(item)
929                case AbsShelfId.RECENT_SERIES | AbsShelfId.CONTINUE_SERIES:
930                    # We jump into a browse folder here if we have SeriesShelf, set path up as if
931                    # browse function used.
932                    if isinstance(shelf, ShelfSeries):
933                        for entity in shelf.entities:
934                            assert isinstance(entity, SeriesShelf)
935                            if len(entity.books) == 0:
936                                continue
937                            path = (
938                                f"{self.instance_id}://"
939                                f"{AbsBrowsePaths.LIBRARIES_BOOK} {library_id}/"
940                                f"{AbsBrowsePaths.SERIES}/{entity.id_}"
941                            )
942                            items.append(
943                                BrowseFolder(
944                                    item_id=entity.id_,
945                                    name=entity.name,
946                                    provider=self.instance_id,
947                                    path=path,
948                                )
949                            )
950                    elif isinstance(shelf, ShelfBook) and media_type == MediaType.AUDIOBOOK:
951                        # Single books, must be audiobooks
952                        for entity in shelf.entities:
953                            item = await self.mass.music.get_library_item_by_prov_id(
954                                media_type=media_type,
955                                provider_instance_id_or_domain=self.instance_id,
956                                item_id=entity.id_,
957                            )
958                            if item is not None:
959                                items.append(item)
960                case AbsShelfId.NEWEST_AUTHORS:
961                    # same as for series, use a folder
962                    for entity in shelf.entities:
963                        assert isinstance(entity, AuthorExpanded)
964                        if entity.num_books == 0:
965                            continue
966                        path = (
967                            f"{self.instance_id}://"
968                            f"{AbsBrowsePaths.LIBRARIES_BOOK} {library_id}/"
969                            f"{AbsBrowsePaths.AUTHORS}/{entity.id_}"
970                        )
971                        items.append(
972                            BrowseFolder(
973                                item_id=entity.id_,
974                                name=entity.name,
975                                provider=self.instance_id,
976                                path=path,
977                            )
978                        )
979            if not items:
980                continue
981
982            # add collected items
983            assert isinstance(shelf.id_, AbsShelfId)
984            items_collected = items_by_shelf_id.get(shelf.id_, [])
985            items_collected.append(items)
986            items_by_shelf_id[shelf.id_] = items_collected
987
988    @handle_refresh_token
989    async def on_played(
990        self,
991        media_type: MediaType,
992        prov_item_id: str,
993        fully_played: bool,
994        position: int,
995        media_item: MediaItemType,
996        is_playing: bool = False,
997    ) -> None:
998        """Update progress in Audiobookshelf.
999
1000        In our case media_type may have 3 values:
1001            - PODCAST
1002            - PODCAST_EPISODE
1003            - AUDIOBOOK
1004        We ignore PODCAST (function is called on adding a podcast with position=None)
1005
1006        """
1007
1008        async def _update_by_session(session_helper: SessionHelper, duration: int) -> bool:
1009            now = time.time()
1010            try:
1011                await self._client.sync_open_session(
1012                    session_id=session_helper.abs_session_id,
1013                    parameters=SyncOpenSessionParameters(
1014                        current_time=position,
1015                        time_listened=now - session_helper.last_sync_time,
1016                        duration=duration,
1017                    ),
1018                )
1019                session_helper.last_sync_time = now
1020                self.logger.debug("Synced playback session, position %s s.", position)
1021                return True
1022            except AbsSessionNotFoundError:
1023                self.logger.error("Was unable to sync session.")
1024            return False
1025
1026        if media_type == MediaType.PODCAST_EPISODE:
1027            abs_podcast_id, abs_episode_id = prov_item_id.split(" ")
1028
1029            # guard, see progress guard class docstrings for explanation
1030            if not self.progress_guard.guard_ok_mass(
1031                item_id=abs_podcast_id, episode_id=abs_episode_id
1032            ):
1033                return
1034            self.progress_guard.add_progress(item_id=abs_podcast_id, episode_id=abs_episode_id)
1035
1036            if media_item is None or not isinstance(media_item, PodcastEpisode):
1037                return
1038
1039            if fully_played and position < media_item.duration - 30:
1040                # faulty position update
1041                # occurs sometimes, if a player disconnects unexpectedly, or reports
1042                # a false position - seen this for MC players, but not for sendspin
1043                return
1044
1045            if position == 0 and not fully_played:
1046                # marked unplayed
1047                mp = await self._client.get_my_media_progress(
1048                    item_id=abs_podcast_id, episode_id=abs_episode_id
1049                )
1050                if mp is not None:
1051                    await self._client.remove_my_media_progress(media_progress_id=mp.id_)
1052                    self.logger.debug(f"Removed media progress of {media_type.value}.")
1053                    return
1054
1055            duration = media_item.duration
1056            updated = False
1057            if session_helper := self.sessions.get(prov_item_id):
1058                updated = await _update_by_session(session_helper=session_helper, duration=duration)
1059            if not updated:
1060                self.logger.debug(
1061                    f"Updating media progress of {media_type.value}, title {media_item.name}."
1062                )
1063                await self._client.update_my_media_progress(
1064                    item_id=abs_podcast_id,
1065                    episode_id=abs_episode_id,
1066                    duration_seconds=duration,
1067                    progress_seconds=position,
1068                    is_finished=fully_played,
1069                )
1070
1071        if media_type == MediaType.AUDIOBOOK:
1072            # guard, see progress guard class docstrings for explanation
1073            if not self.progress_guard.guard_ok_mass(item_id=prov_item_id):
1074                return
1075            self.progress_guard.add_progress(item_id=prov_item_id)
1076
1077            if media_item is None or not isinstance(media_item, Audiobook):
1078                return
1079
1080            if fully_played and position < media_item.duration - 30:
1081                # faulty position update, see above
1082                return
1083
1084            if position == 0 and not fully_played:
1085                # marked unplayed
1086                mp = await self._client.get_my_media_progress(item_id=prov_item_id)
1087                if mp is not None:
1088                    await self._client.remove_my_media_progress(media_progress_id=mp.id_)
1089                    self.logger.debug(f"Removed media progress of {media_type.value}.")
1090                return
1091
1092            duration = media_item.duration
1093            updated = False
1094            if session_helper := self.sessions.get(prov_item_id):
1095                updated = await _update_by_session(session_helper=session_helper, duration=duration)
1096            if not updated:
1097                self.logger.debug(f"Updating {media_type.value} named {media_item.name} progress")
1098                await self._client.update_my_media_progress(
1099                    item_id=prov_item_id,
1100                    duration_seconds=duration,
1101                    progress_seconds=position,
1102                    is_finished=fully_played,
1103                )
1104
1105    @handle_refresh_token
1106    async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
1107        """Browse for audiobookshelf.
1108
1109        Generates this view:
1110        Library_Name_A (Audiobooks)
1111            Audiobooks
1112                Audiobook_1
1113                Audiobook_2
1114            Series
1115                Series_1
1116                    Audiobook_1
1117                    Audiobook_2
1118                Series_2
1119                    Audiobook_3
1120                    Audiobook_4
1121            Collections
1122                Collection_1
1123                    Audiobook_1
1124                    Audiobook_2
1125                Collection_2
1126                    Audiobook_3
1127                    Audiobook_4
1128            Authors
1129                Author_1
1130                    Series_1
1131                    Audiobook_1
1132                    Audiobook_2
1133                Author_2
1134                    Audiobook_3
1135        Library_Name_B (Podcasts)
1136            Podcast_1
1137            Podcast_2
1138        """
1139        item_path = path.split("://", 1)[1]
1140        if not item_path:
1141            return self._browse_root()
1142        sub_path = item_path.split("/")
1143        lib_key, lib_id = sub_path[0].split(" ")
1144        if len(sub_path) == 1:
1145            if lib_key == AbsBrowsePaths.LIBRARIES_PODCAST:
1146                return await self._browse_lib_podcasts(library_id=lib_id)
1147            return self._browse_lib_audiobooks(current_path=path)
1148        if len(sub_path) == 2:
1149            item_key = sub_path[1]
1150            match item_key:
1151                case AbsBrowsePaths.AUTHORS:
1152                    return await self._browse_authors(current_path=path, library_id=lib_id)
1153                case AbsBrowsePaths.NARRATORS:
1154                    return await self._browse_narrators(current_path=path, library_id=lib_id)
1155                case AbsBrowsePaths.SERIES:
1156                    return await self._browse_series(current_path=path, library_id=lib_id)
1157                case AbsBrowsePaths.COLLECTIONS:
1158                    return await self._browse_collections(current_path=path, library_id=lib_id)
1159                case AbsBrowsePaths.AUDIOBOOKS:
1160                    return await self._browse_books(library_id=lib_id)
1161        elif len(sub_path) == 3:
1162            item_key, item_id = sub_path[1:3]
1163            match item_key:
1164                case AbsBrowsePaths.AUTHORS:
1165                    return await self._browse_author_books(current_path=path, author_id=item_id)
1166                case AbsBrowsePaths.NARRATORS:
1167                    return await self._browse_narrator_books(
1168                        library_id=lib_id, narrator_filter_str=item_id
1169                    )
1170                case AbsBrowsePaths.SERIES:
1171                    return await self._browse_series_books(series_id=item_id)
1172                case AbsBrowsePaths.COLLECTIONS:
1173                    return await self._browse_collection_books(collection_id=item_id)
1174        elif len(sub_path) == 4:
1175            # series within author
1176            series_id = sub_path[3]
1177            return await self._browse_series_books(series_id=series_id)
1178        return []
1179
1180    def _browse_root(self, append_mediatype_suffix: bool = True) -> Sequence[BrowseFolder]:
1181        items = []
1182
1183        def _get_folder(
1184            path: str, lib_id: str, lib_name: str, translation_key: str | None = None
1185        ) -> BrowseFolder:
1186            return BrowseFolder(
1187                item_id=lib_id,
1188                name=lib_name,
1189                translation_key=translation_key,  # if given, <name>: <translation> in frontend
1190                provider=self.instance_id,
1191                path=f"{self.instance_id}://{path}",
1192            )
1193
1194        if len(self.libraries.audiobooks) == 0 and len(self.libraries.podcasts) == 0:
1195            self._log_no_libraries()
1196            return []
1197
1198        translation_key: str | None
1199        for lib_id, lib in self.libraries.audiobooks.items():
1200            path = f"{AbsBrowsePaths.LIBRARIES_BOOK} {lib_id}"
1201            translation_key = None
1202            if append_mediatype_suffix:
1203                translation_key = AbsBrowseItemsBookTranslationKey.AUDIOBOOKS
1204            items.append(
1205                _get_folder(path, lib_id, lib_name=lib.name, translation_key=translation_key)
1206            )
1207        for lib_id, lib in self.libraries.podcasts.items():
1208            path = f"{AbsBrowsePaths.LIBRARIES_PODCAST} {lib_id}"
1209            translation_key = None
1210            if append_mediatype_suffix:
1211                translation_key = AbsBrowseItemsPodcastTranslationKey.PODCASTS
1212            items.append(
1213                _get_folder(path, lib_id, lib_name=lib.name, translation_key=translation_key)
1214            )
1215        return items
1216
1217    async def _browse_lib_podcasts(self, library_id: str) -> list[MediaItemType]:
1218        """No sub categories for podcasts."""
1219        if len(self.libraries.podcasts[library_id].item_ids) == 0:
1220            self._log_no_helper_item_ids()
1221        items = []
1222        for podcast_id in self.libraries.podcasts[library_id].item_ids:
1223            mass_item = await self.mass.music.get_library_item_by_prov_id(
1224                media_type=MediaType.PODCAST,
1225                item_id=podcast_id,
1226                provider_instance_id_or_domain=self.instance_id,
1227            )
1228            if mass_item is not None:
1229                items.append(mass_item)
1230        return sorted(items, key=lambda x: x.name)
1231
1232    def _browse_lib_audiobooks(self, current_path: str) -> Sequence[BrowseFolder]:
1233        items = []
1234        for translation_key in AbsBrowseItemsBookTranslationKey:
1235            path = current_path + "/" + ABS_BROWSE_ITEMS_TO_PATH[translation_key]
1236            items.append(
1237                BrowseFolder(
1238                    item_id=translation_key.lower(),
1239                    name="",  # use translation key
1240                    translation_key=translation_key,
1241                    provider=self.instance_id,
1242                    path=path,
1243                )
1244            )
1245        return items
1246
1247    async def _browse_authors(self, current_path: str, library_id: str) -> Sequence[BrowseFolder]:
1248        abs_authors = await self._client.get_library_authors(library_id=library_id)
1249        items = []
1250        for author in abs_authors:
1251            path = f"{current_path}/{author.id_}"
1252            items.append(
1253                BrowseFolder(
1254                    item_id=author.id_,
1255                    name=author.name,
1256                    provider=self.instance_id,
1257                    path=path,
1258                )
1259            )
1260
1261        return sorted(items, key=lambda x: x.name)
1262
1263    async def _browse_narrators(self, current_path: str, library_id: str) -> Sequence[BrowseFolder]:
1264        abs_narrators = await self._client.get_library_narrators(library_id=library_id)
1265        items = []
1266        for narrator in abs_narrators:
1267            path = f"{current_path}/{narrator.id_}"
1268            items.append(
1269                BrowseFolder(
1270                    item_id=narrator.id_,
1271                    name=narrator.name,
1272                    provider=self.instance_id,
1273                    path=path,
1274                )
1275            )
1276
1277        return sorted(items, key=lambda x: x.name)
1278
1279    async def _browse_series(self, current_path: str, library_id: str) -> Sequence[BrowseFolder]:
1280        items = []
1281        async for response in self._client.get_library_series(library_id=library_id):
1282            if not response.results:
1283                break
1284            for abs_series in response.results:
1285                path = f"{current_path}/{abs_series.id_}"
1286                items.append(
1287                    BrowseFolder(
1288                        item_id=abs_series.id_,
1289                        name=abs_series.name,
1290                        provider=self.instance_id,
1291                        path=path,
1292                    )
1293                )
1294
1295        return sorted(items, key=lambda x: x.name)
1296
1297    async def _browse_collections(
1298        self, current_path: str, library_id: str
1299    ) -> Sequence[BrowseFolder]:
1300        items = []
1301        async for response in self._client.get_library_collections(library_id=library_id):
1302            if not response.results:
1303                break
1304            for abs_collection in response.results:
1305                path = f"{current_path}/{abs_collection.id_}"
1306                items.append(
1307                    BrowseFolder(
1308                        item_id=abs_collection.id_,
1309                        name=abs_collection.name,
1310                        provider=self.instance_id,
1311                        path=path,
1312                    )
1313                )
1314        return sorted(items, key=lambda x: x.name)
1315
1316    async def _browse_books(self, library_id: str) -> Sequence[MediaItemType]:
1317        if len(self.libraries.audiobooks[library_id].item_ids) == 0:
1318            self._log_no_helper_item_ids()
1319        items = []
1320        for book_id in self.libraries.audiobooks[library_id].item_ids:
1321            mass_item = await self.mass.music.get_library_item_by_prov_id(
1322                media_type=MediaType.AUDIOBOOK,
1323                item_id=book_id,
1324                provider_instance_id_or_domain=self.instance_id,
1325            )
1326            if mass_item is not None:
1327                items.append(mass_item)
1328        return sorted(items, key=lambda x: x.name)
1329
1330    async def _browse_author_books(
1331        self, current_path: str, author_id: str
1332    ) -> Sequence[MediaItemType | BrowseFolder]:
1333        items: list[MediaItemType | BrowseFolder] = []
1334
1335        abs_author = await self._client.get_author(
1336            author_id=author_id, include_items=True, include_series=True
1337        )
1338        if not isinstance(abs_author, AbsAuthorWithItemsAndSeries):
1339            raise TypeError("Unexpected type of author.")
1340
1341        book_ids = {x.id_ for x in abs_author.library_items}
1342        series_book_ids = set()
1343
1344        for series in abs_author.series:
1345            series_book_ids.update([x.id_ for x in series.items])
1346            path = f"{current_path}/{series.id_}"
1347            items.append(
1348                BrowseFolder(
1349                    item_id=series.id_,
1350                    # frontend does <name>: <translation>
1351                    name=series.name,
1352                    translation_key="series_singular",
1353                    provider=self.instance_id,
1354                    path=path,
1355                )
1356            )
1357        book_ids = book_ids.difference(series_book_ids)
1358        for book_id in book_ids:
1359            mass_item = await self.mass.music.get_library_item_by_prov_id(
1360                media_type=MediaType.AUDIOBOOK,
1361                item_id=book_id,
1362                provider_instance_id_or_domain=self.instance_id,
1363            )
1364            if mass_item is not None:
1365                items.append(mass_item)
1366
1367        return items
1368
1369    async def _browse_narrator_books(
1370        self, library_id: str, narrator_filter_str: str
1371    ) -> Sequence[MediaItemType]:
1372        items: list[MediaItemType] = []
1373        async for response in self._client.get_library_items(
1374            library_id=library_id, filter_str=f"narrators.{narrator_filter_str}"
1375        ):
1376            if not response.results:
1377                break
1378            for item in response.results:
1379                mass_item = await self.mass.music.get_library_item_by_prov_id(
1380                    media_type=MediaType.AUDIOBOOK,
1381                    item_id=item.id_,
1382                    provider_instance_id_or_domain=self.instance_id,
1383                )
1384                if mass_item is not None:
1385                    items.append(mass_item)
1386
1387        return sorted(items, key=lambda x: x.name)
1388
1389    async def _browse_series_books(self, series_id: str) -> Sequence[MediaItemType]:
1390        items = []
1391
1392        abs_series = await self._client.get_series(series_id=series_id, include_progress=True)
1393        if not isinstance(abs_series, AbsSeriesWithProgress):
1394            raise TypeError("Unexpected series type.")
1395
1396        for book_id in abs_series.progress.library_item_ids:
1397            # these are sorted in abs by sequence
1398            mass_item = await self.mass.music.get_library_item_by_prov_id(
1399                media_type=MediaType.AUDIOBOOK,
1400                item_id=book_id,
1401                provider_instance_id_or_domain=self.instance_id,
1402            )
1403            if mass_item is not None:
1404                items.append(mass_item)
1405
1406        return items
1407
1408    async def _browse_collection_books(self, collection_id: str) -> Sequence[MediaItemType]:
1409        items = []
1410        abs_collection = await self._client.get_collection(collection_id=collection_id)
1411        for book in abs_collection.books:
1412            mass_item = await self.mass.music.get_library_item_by_prov_id(
1413                media_type=MediaType.AUDIOBOOK,
1414                item_id=book.id_,
1415                provider_instance_id_or_domain=self.instance_id,
1416            )
1417            if mass_item is not None:
1418                items.append(mass_item)
1419        return items
1420
1421    async def _socket_abs_item_changed(
1422        self, items: LibraryItemExpanded | list[LibraryItemExpanded]
1423    ) -> None:
1424        """For added and updated."""
1425        abs_items = [items] if isinstance(items, LibraryItemExpanded) else items
1426        for abs_item in abs_items:
1427            if isinstance(abs_item, LibraryItemExpandedBook):
1428                # If the book has no audiofiles, we skip -> ebook only.
1429                if len(abs_item.media.tracks) == 0:
1430                    continue
1431                self.logger.debug(
1432                    'Updated book "%s" via socket.', abs_item.media.metadata.title or ""
1433                )
1434                await self.mass.music.audiobooks.add_item_to_library(
1435                    parse_audiobook(
1436                        abs_audiobook=abs_item,
1437                        instance_id=self.instance_id,
1438                        domain=self.domain,
1439                        token=self._client.token,
1440                        base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
1441                    ),
1442                    overwrite_existing=True,
1443                )
1444                lib = self.libraries.audiobooks.get(abs_item.library_id, None)
1445                if lib is not None:
1446                    lib.item_ids.add(abs_item.id_)
1447            elif isinstance(abs_item, LibraryItemExpandedPodcast):
1448                self.logger.debug(
1449                    'Updated podcast "%s" via socket.', abs_item.media.metadata.title or ""
1450                )
1451                mass_podcast = parse_podcast(
1452                    abs_podcast=abs_item,
1453                    instance_id=self.instance_id,
1454                    domain=self.domain,
1455                    token=self._client.token,
1456                    base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
1457                )
1458                if not (
1459                    bool(self.config.get_value(CONF_HIDE_EMPTY_PODCASTS))
1460                    and mass_podcast.total_episodes == 0
1461                ):
1462                    await self.mass.music.podcasts.add_item_to_library(
1463                        mass_podcast,
1464                        overwrite_existing=True,
1465                    )
1466                    lib = self.libraries.podcasts.get(abs_item.library_id, None)
1467                    if lib is not None:
1468                        lib.item_ids.add(abs_item.id_)
1469        await self._cache_set_helper_libraries()
1470
1471    async def _socket_abs_item_removed(self, item: LibraryItemRemoved) -> None:
1472        """Item removed."""
1473        media_type: MediaType | None = None
1474        for lib in self.libraries.audiobooks.values():
1475            if item.id_ in lib.item_ids:
1476                media_type = MediaType.AUDIOBOOK
1477                lib.item_ids.remove(item.id_)
1478                break
1479        for lib in self.libraries.podcasts.values():
1480            if item.id_ in lib.item_ids:
1481                media_type = MediaType.PODCAST
1482                lib.item_ids.remove(item.id_)
1483                break
1484
1485        if media_type is not None:
1486            mass_item = await self.mass.music.get_library_item_by_prov_id(
1487                media_type=media_type,
1488                item_id=item.id_,
1489                provider_instance_id_or_domain=self.instance_id,
1490            )
1491            if mass_item is not None:
1492                await self.mass.music.remove_item_from_library(
1493                    media_type=media_type, library_item_id=mass_item.item_id
1494                )
1495                self.logger.debug('Removed %s "%s" via socket.', media_type.value, mass_item.name)
1496
1497        await self._cache_set_helper_libraries()
1498
1499    async def _socket_abs_user_item_progress_updated(
1500        self, id_: str, progress: MediaProgress
1501    ) -> None:
1502        """To update continue listening.
1503
1504        ABS reports every 15s and immediately on play state change.
1505        This callback is called per item if a progress is changed:
1506            - a change in position
1507            - the item is finished
1508        But it is _not_called, if a progress is reset/ discarded.
1509        """
1510        # guard, see progress guard class docstrings for explanation
1511        if not self.progress_guard.guard_ok_abs(abs_progress=progress):
1512            return
1513
1514        known_ids = self._get_all_known_item_ids()
1515        if progress.library_item_id not in known_ids:
1516            return
1517
1518        self.logger.debug(f"Updated progress of item {progress.library_item_id} via socket.")
1519
1520        if progress.episode_id is None:
1521            await self._update_playlog_book(progress)
1522            return
1523        await self._update_playlog_episode(progress)
1524
1525    async def _socket_abs_refresh_token_expired(self) -> None:
1526        await self.reauthenticate()
1527
1528    async def _socket_stream_open(self, stream: AbsStream) -> None:
1529        # stream's id is the same as the playback session id
1530        for session_helper in self.sessions.values():
1531            if session_helper.abs_session_id == stream.id_:
1532                session_helper.hls_stream_open.set()
1533                break
1534
1535    async def reauthenticate(self) -> None:
1536        """Reauthorize the abs session config if refresh token expired."""
1537        # some safe guarding should that function be called simultaneously
1538        if self.reauthenticate_lock.locked() or time.time() - self.reauthenticate_last < 5:
1539            while True:
1540                if not self.reauthenticate_lock.locked():
1541                    return
1542                await asyncio.sleep(0.5)
1543        async with self.reauthenticate_lock:
1544            await self._client.session_config.authenticate(
1545                username=str(self.config.get_value(CONF_USERNAME)),
1546                password=str(self.config.get_value(CONF_PASSWORD)),
1547            )
1548            self.reauthenticate_last = time.time()
1549
1550    def _get_all_known_item_ids(self) -> set[str]:
1551        known_ids = set()
1552        for lib in self.libraries.podcasts.values():
1553            known_ids.update(lib.item_ids)
1554        for lib in self.libraries.audiobooks.values():
1555            known_ids.update(lib.item_ids)
1556
1557        return known_ids
1558
1559    async def _set_playlog_from_user(self, user: User) -> None:
1560        """Update on user callback.
1561
1562        User holds also all media progresses specific to that user.
1563
1564        The function 'guard_ok_abs' uses the timestamp of the last update in abs, thus after an
1565        initial progress update, an unchanged update will not trigger a (useless) playlog update.
1566
1567        We do not sync removed progresses for the sake of simplicity.
1568        """
1569        await self._set_playlog_from_user_sync(user.media_progress)
1570
1571    async def _set_playlog_from_user_sync(self, progresses: list[MediaProgress]) -> None:
1572        # for debugging
1573        __updated_items = 0
1574
1575        known_ids = self._get_all_known_item_ids()
1576        abs_ids_with_progress = set()
1577
1578        for progress in progresses:
1579            # save progress ids for later
1580            ma_item_id = (
1581                progress.library_item_id
1582                if progress.episode_id is None
1583                else f"{progress.library_item_id} {progress.episode_id}"
1584            )
1585            abs_ids_with_progress.add(ma_item_id)
1586
1587            # Guard. Also makes sure, that we don't write to db again if no state change happened.
1588            # This is achieved by adding a Helper Progress in the update playlog functions, which
1589            # then has the most recent timestamp. If a subsequent progress sent by abs has an older
1590            # timestamp, we do not update again.
1591            if not self.progress_guard.guard_ok_abs(progress):
1592                continue
1593            if progress.current_time is not None:
1594                if int(progress.current_time) != 0 and not progress.current_time >= 30:
1595                    # same as mass default, only > 30s
1596                    continue
1597            if progress.library_item_id not in known_ids:
1598                continue
1599            __updated_items += 1
1600            if progress.episode_id is None:
1601                await self._update_playlog_book(progress)
1602            else:
1603                await self._update_playlog_episode(progress)
1604        self.logger.debug(f"Updated {__updated_items} from full playlog.")
1605
1606        # Get MA's known progresses of ABS.
1607        # In ABS the user may discard a progress, which removes the progress completely.
1608        # There is no socket notification for this event.
1609        ma_playlog_state = await self.mass.music.get_playlog_provider_item_ids(
1610            provider_instance_id=self.instance_id
1611        )
1612        ma_ids_with_progress = {x for _, x in ma_playlog_state}
1613        discarded_progress_ids = ma_ids_with_progress.difference(abs_ids_with_progress)
1614        for discarded_progress_id in discarded_progress_ids:
1615            if len(discarded_progress_id.split(" ")) == 1:
1616                if discarded_item := await self.mass.music.get_library_item_by_prov_id(
1617                    media_type=MediaType.AUDIOBOOK,
1618                    item_id=discarded_progress_id,
1619                    provider_instance_id_or_domain=self.instance_id,
1620                ):
1621                    self.progress_guard.add_progress(discarded_progress_id)
1622                    await self.mass.music.mark_item_unplayed(discarded_item)
1623            else:
1624                with suppress(MediaNotFoundError):
1625                    discarded_item = await self.get_podcast_episode(
1626                        prov_episode_id=discarded_progress_id, add_progress=False
1627                    )
1628                    self.progress_guard.add_progress(*discarded_progress_id.split(" "))
1629                    await self.mass.music.mark_item_unplayed(discarded_item)
1630            self.logger.debug("Discarded item %s ", discarded_progress_id)
1631
1632    async def _update_playlog_book(self, progress: MediaProgress) -> None:
1633        # helper progress also ensures no useless progress updates,
1634        # see comment above
1635        self.progress_guard.add_progress(progress.library_item_id)
1636        if progress.current_time is None:
1637            return
1638        mass_audiobook = await self.mass.music.get_library_item_by_prov_id(
1639            media_type=MediaType.AUDIOBOOK,
1640            item_id=progress.library_item_id,
1641            provider_instance_id_or_domain=self.instance_id,
1642        )
1643        if mass_audiobook is None:
1644            return
1645        if int(progress.current_time) == 0:
1646            await self.mass.music.mark_item_unplayed(mass_audiobook)
1647        else:
1648            await self.mass.music.mark_item_played(
1649                mass_audiobook,
1650                fully_played=progress.is_finished,
1651                seconds_played=int(progress.current_time),
1652            )
1653
1654    async def _update_playlog_episode(self, progress: MediaProgress) -> None:
1655        # helper progress also ensures no useless progress updates,
1656        # see comment above
1657        self.progress_guard.add_progress(progress.library_item_id, progress.episode_id)
1658        if progress.current_time is None:
1659            return
1660        _episode_id = f"{progress.library_item_id} {progress.episode_id}"
1661        try:
1662            # need to obtain full podcast, and then search for episode
1663            mass_episode = await self.get_podcast_episode(_episode_id, add_progress=False)
1664        except MediaNotFoundError:
1665            return
1666        if int(progress.current_time) == 0:
1667            await self.mass.music.mark_item_unplayed(mass_episode)
1668        else:
1669            await self.mass.music.mark_item_played(
1670                mass_episode,
1671                fully_played=progress.is_finished,
1672                seconds_played=int(progress.current_time),
1673            )
1674
1675    async def _cache_set_helper_libraries(self) -> None:
1676        await self.mass.cache.set(
1677            key=CACHE_KEY_LIBRARIES,
1678            provider=self.instance_id,
1679            category=CACHE_CATEGORY_LIBRARIES,
1680            data=self.libraries.to_dict(),
1681        )
1682
1683    def _log_no_libraries(self) -> None:
1684        self.logger.error("There are no libraries visible to the Audiobookshelf provider.")
1685
1686    def _log_no_helper_item_ids(self) -> None:
1687        self.logger.warning(
1688            "Cached item ids are missing. "
1689            "Please trigger a full resync of the Audiobookshelf provider manually."
1690        )
1691