/
/
/
1"""Audiobookshelf (abs) provider for Music Assistant."""
2
3from __future__ import annotations
4
5import asyncio
6import functools
7import itertools
8import time
9from collections.abc import AsyncGenerator, Callable, Coroutine, Sequence
10from contextlib import suppress
11from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar, cast
12
13import aioaudiobookshelf as aioabs
14from aioaudiobookshelf.client.items import LibraryItemExpandedBook as AbsLibraryItemExpandedBook
15from aioaudiobookshelf.client.items import (
16 LibraryItemExpandedPodcast as AbsLibraryItemExpandedPodcast,
17)
18from aioaudiobookshelf.client.items import PlaybackSessionExpanded as AbsPlaybackSessionExpanded
19from aioaudiobookshelf.client.items import PlaybackSessionParameters as AbsPlaybackSessionParameters
20from aioaudiobookshelf.client.session import SyncOpenSessionParameters
21from aioaudiobookshelf.exceptions import LoginError as AbsLoginError
22from aioaudiobookshelf.exceptions import RefreshTokenExpiredError
23from aioaudiobookshelf.exceptions import SessionNotFoundError as AbsSessionNotFoundError
24from aioaudiobookshelf.schema.author import AuthorExpanded
25from aioaudiobookshelf.schema.calls_authors import (
26 AuthorWithItemsAndSeries as AbsAuthorWithItemsAndSeries,
27)
28from aioaudiobookshelf.schema.calls_series import SeriesWithProgress as AbsSeriesWithProgress
29from aioaudiobookshelf.schema.library import (
30 LibraryItemExpanded,
31 LibraryItemExpandedBook,
32 LibraryItemExpandedPodcast,
33 LibraryItemMinifiedPodcast,
34)
35from aioaudiobookshelf.schema.library import LibraryMediaType as AbsLibraryMediaType
36from aioaudiobookshelf.schema.session import DeviceInfo as AbsDeviceInfo
37from aioaudiobookshelf.schema.shelf import (
38 SeriesShelf,
39 ShelfAuthors,
40 ShelfBook,
41 ShelfEpisode,
42 ShelfLibraryItemMinified,
43 ShelfPodcast,
44 ShelfSeries,
45)
46from aioaudiobookshelf.schema.shelf import ShelfId as AbsShelfId
47from aioaudiobookshelf.schema.shelf import ShelfType as AbsShelfType
48from aiohttp import web
49from music_assistant_models.config_entries import (
50 ConfigEntry,
51 ConfigValueType,
52 ProviderConfig,
53)
54from music_assistant_models.enums import (
55 ConfigEntryType,
56 ContentType,
57 MediaType,
58 ProviderFeature,
59 StreamType,
60)
61from music_assistant_models.errors import LoginFailed, MediaNotFoundError
62from music_assistant_models.media_items import (
63 Audiobook,
64 AudioFormat,
65 BrowseFolder,
66 ItemMapping,
67 MediaItemType,
68 PodcastEpisode,
69 UniqueList,
70)
71from music_assistant_models.media_items.media_item import RecommendationFolder
72from music_assistant_models.streamdetails import MultiPartPath, StreamDetails
73
74from music_assistant.models.music_provider import MusicProvider
75from music_assistant.providers.audiobookshelf.parsers import (
76 parse_audiobook,
77 parse_podcast,
78 parse_podcast_episode,
79)
80
81from .constants import (
82 ABS_BROWSE_ITEMS_TO_PATH,
83 ABS_SHELF_ID_ICONS,
84 ABS_SHELF_ID_TRANSLATION_KEY,
85 AIOHTTP_TIMEOUT,
86 CACHE_CATEGORY_LIBRARIES,
87 CACHE_KEY_LIBRARIES,
88 CONF_API_TOKEN,
89 CONF_HIDE_EMPTY_PODCASTS,
90 CONF_HLS_FORMATS,
91 CONF_OLD_TOKEN,
92 CONF_PASSWORD,
93 CONF_URL,
94 CONF_USE_HLS,
95 CONF_USERNAME,
96 CONF_VERIFY_SSL,
97 HLS_ALL_FORMATS,
98 HLS_FORMATS_SPLIT,
99 AbsBrowseItemsBookTranslationKey,
100 AbsBrowseItemsPodcastTranslationKey,
101 AbsBrowsePaths,
102)
103from .helpers import LibrariesHelper, LibraryHelper, ProgressGuard, SessionHelper
104
105if TYPE_CHECKING:
106 from aioaudiobookshelf.schema.events_socket import LibraryItemRemoved
107 from aioaudiobookshelf.schema.media_progress import MediaProgress
108 from aioaudiobookshelf.schema.streams import Stream as AbsStream
109 from aioaudiobookshelf.schema.user import User
110 from music_assistant_models.media_items import Podcast
111 from music_assistant_models.provider import ProviderManifest
112
113 from music_assistant.mass import MusicAssistant
114 from music_assistant.models import ProviderInstanceType
115
# Capabilities advertised by this provider; passed to the provider
# constructor in ``setup``.
SUPPORTED_FEATURES = {
    ProviderFeature.BROWSE,
    ProviderFeature.RECOMMENDATIONS,
    ProviderFeature.LIBRARY_AUDIOBOOKS,
    ProviderFeature.LIBRARY_PODCASTS,
}
122
123
async def setup(
    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
    """Build the Audiobookshelf provider instance for the given configuration.

    The instance is constructed with the module-level ``SUPPORTED_FEATURES``.
    """
    provider = Audiobookshelf(mass, manifest, config, SUPPORTED_FEATURES)
    return provider
129
130
async def get_config_entries(
    mass: MusicAssistant,
    instance_id: str | None = None,
    action: str | None = None,
    values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
    """
    Return the config entries needed to set up this provider.

    None of the arguments influence the result; the same entries are returned
    on every call.

    instance_id: id of an existing provider instance (None if new instance setup).
    action: [optional] action key called from config entries UI.
    values: the (intermediate) raw values for config entries sent with the action.
    """
    # ruff: noqa: ARG001
    # --- connection and authentication ---
    intro_label = ConfigEntry(
        key="label",
        type=ConfigEntryType.LABEL,
        label="Please provide the address of your Audiobookshelf instance. To authenticate "
        "you have two options: "
        "a) Provide username AND password. Leave the API key empty. "
        "b) Provide ONLY an API key.",
    )
    server_url = ConfigEntry(
        key=CONF_URL,
        type=ConfigEntryType.STRING,
        label="Server",
        required=True,
        description="The URL of the Audiobookshelf server to connect to. For example "
        "https://abs.domain.tld/ or http://192.168.1.4:13378/",
    )
    username = ConfigEntry(
        key=CONF_USERNAME,
        type=ConfigEntryType.STRING,
        label="Username",
        required=False,
        description="The username to authenticate to the remote server.",
    )
    password = ConfigEntry(
        key=CONF_PASSWORD,
        type=ConfigEntryType.SECURE_STRING,
        label="Password",
        required=False,
        description="The password to authenticate to the remote server.",
    )
    api_token = ConfigEntry(
        key=CONF_API_TOKEN,
        type=ConfigEntryType.SECURE_STRING,
        label="API key _instead_ of user/ password. (ABS version >= 2.26)",
        required=False,
        description="Instead of using a username and password, "
        "you may provide an API key (ABS version >= 2.26). "
        "Please consult the docs.",
    )
    # Hidden entry: never shown in the UI, only read back from stored config.
    old_token = ConfigEntry(
        key=CONF_OLD_TOKEN,
        type=ConfigEntryType.SECURE_STRING,
        label="old token",
        required=False,
        hidden=True,
    )

    # --- advanced options ---
    use_hls = ConfigEntry(
        key=CONF_USE_HLS,
        type=ConfigEntryType.BOOLEAN,
        label="Stream via HLS from ABS.",
        description="Use an HLS stream when streaming from audiobookshelf.",
        required=False,
        default_value=False,
        advanced=True,
    )
    hls_formats = ConfigEntry(
        key=CONF_HLS_FORMATS,
        type=ConfigEntryType.STRING,
        label=f"Use HLS for these file extensions. Separate with ';'. Use {HLS_ALL_FORMATS} for"
        " all formats.",
        description="Use HLS only for these file extensions."
        f" Separate with ;. E.g. m4b or m4b;aac or {HLS_ALL_FORMATS}",
        required=False,
        default_value="m4b",
        advanced=True,
    )
    verify_ssl = ConfigEntry(
        key=CONF_VERIFY_SSL,
        type=ConfigEntryType.BOOLEAN,
        label="Verify SSL",
        required=False,
        description="Whether or not to verify the certificate of SSL/TLS connections.",
        advanced=True,
        default_value=True,
    )
    hide_empty_podcasts = ConfigEntry(
        key=CONF_HIDE_EMPTY_PODCASTS,
        type=ConfigEntryType.BOOLEAN,
        label="Hide empty podcasts.",
        required=False,
        description="This will skip podcasts with no episodes associated.",
        advanced=True,
        default_value=False,
    )

    return (
        intro_label,
        server_url,
        username,
        password,
        api_token,
        old_token,
        use_hls,
        hls_formats,
        verify_ssl,
        hide_empty_podcasts,
    )
231
232
# Typing helpers for the ``handle_refresh_token`` decorator: P stands for the
# wrapped coroutine function's parameters, R for its awaited result.
P = ParamSpec("P")
R = TypeVar("R")
235
236
237class Audiobookshelf(MusicProvider):
238 """Audiobookshelf MusicProvider."""
239
240 _on_unload_callbacks: list[Callable[[], None]]
241
@staticmethod
def handle_refresh_token(
    method: Callable[P, Coroutine[Any, Any, R]],
) -> Callable[P, Coroutine[Any, Any, R]]:
    """Wrap a provider coroutine method so an expired refresh token triggers a relogin.

    The wrapped method is awaited once. If it raises ``RefreshTokenExpiredError``
    the provider (first positional argument) reauthenticates and the method is
    awaited a second time; an error from that second attempt propagates.
    """

    @functools.wraps(method)
    async def _call_with_relogin(*args: P.args, **kwargs: P.kwargs) -> R:
        # The decorated callables are methods, so the provider is args[0].
        provider = cast("Audiobookshelf", args[0])
        try:
            result = await method(*args, **kwargs)
        except RefreshTokenExpiredError:
            provider.logger.debug("Refresh token expired. Trying to renew.")
            await provider.reauthenticate()
            result = await method(*args, **kwargs)
        return result

    return _call_with_relogin
259
async def handle_async_init(self) -> None:
    """Pass config values to the client, log in and initialize provider state.

    Steps, in order:
      1. Build the aioaudiobookshelf session configuration from the provider config.
      2. Log in, either with a token (API key preferred over the old token) or
         with username/password, and initialize the socket client.
      3. Warn if only an old-style token is configured while the server runs
         Audiobookshelf 2.26 or newer.
      4. Restore the library helpers from cache, or rebuild them (names only)
         from the server when nothing is cached.
      5. Register socket callbacks, the progress guard, the reauthentication
         guard and the dynamic part-stream route.

    Raises:
        LoginFailed: if Audiobookshelf rejects the login.
    """
    self._on_unload_callbacks: list[Callable[[], None]] = []
    self.sessions: dict[str, SessionHelper] = {}  # key is the mass_item_id
    self.create_session_lock = asyncio.Lock()
    base_url = str(self.config.get_value(CONF_URL))
    username = str(self.config.get_value(CONF_USERNAME))
    password = str(self.config.get_value(CONF_PASSWORD))
    token_old = self.config.get_value(CONF_OLD_TOKEN)
    token_api = self.config.get_value(CONF_API_TOKEN)
    verify_ssl = bool(self.config.get_value(CONF_VERIFY_SSL))
    session_config = aioabs.SessionConfiguration(
        session=self.mass.http_session,
        url=base_url,
        verify_ssl=verify_ssl,
        logger=self.logger,
        pagination_items_per_page=30,  # audible provider goes with 50 for pagination
        timeout=AIOHTTP_TIMEOUT,
    )
    # If we are configured with a non-expiring API key or not.
    self.is_token_user = False
    try:
        if token_api is not None or token_old is not None:
            # The new API key wins over a token kept from an older configuration.
            _token = token_api if token_api is not None else token_old
            session_config.token = str(_token)
            (
                self._client,
                self._client_socket,
            ) = await aioabs.get_user_and_socket_client_by_token(session_config=session_config)
            self.is_token_user = True
        else:
            self._client, self._client_socket = await aioabs.get_user_and_socket_client(
                session_config=session_config, username=username, password=password
            )
        await self._client_socket.init_client()
    except AbsLoginError as exc:
        raise LoginFailed(f"Login to abs instance at {base_url} failed.") from exc

    if token_old is not None and token_api is None:
        # Log Message that the old token won't work
        _version = self._client.server_settings.version.split(".")
        if len(_version) >= 2:
            try:
                major, minor = int(_version[0]), int(_version[1])
            except ValueError:
                major = minor = 0
            # Compare as a tuple: checking major and minor independently would
            # miss later major versions such as 3.0, whose minor is below 26.
            if (major, minor) >= (2, 26):
                self.logger.warning(
                    """

######## Audiobookshelf API key change #############################################################

Audiobookshelf introduced a new API key system in version 2.26 (JWT).
You are still using a token configured with a previous version of Audiobookshelf,
but you are running version %s. This will stop working in a future Audiobookshelf release.
Please create a non-expiring API Key instead, and update your configuration accordingly.
Refer to the documentation of Audiobookshelf, https://www.audiobookshelf.org/guides/api-keys/
and of Music Assistant https://www.music-assistant.io/music-providers/audiobookshelf/
for more details.

""",
                    self._client.server_settings.version,
                )

    cached_libraries = await self.mass.cache.get(
        key=CACHE_KEY_LIBRARIES,
        provider=self.instance_id,
        category=CACHE_CATEGORY_LIBRARIES,
        default=None,
    )
    if cached_libraries is None:
        self.libraries = LibrariesHelper()
        # We need the library ids for recommendations. If the cache got cleared e.g. by a db
        # migration, we might end up with empty library helpers on a configured provider. Note,
        # that the lib item ids are not synced, still only on full provider sync, instead the
        # sets are empty. Full sync is expensive.
        # See warning in browse_lib_podcasts / _browse_books
        libraries = await self._client.get_all_libraries()
        for library in libraries:
            if library.media_type == AbsLibraryMediaType.BOOK:
                self.libraries.audiobooks[library.id_] = LibraryHelper(name=library.name)
            elif library.media_type == AbsLibraryMediaType.PODCAST:
                self.libraries.podcasts[library.id_] = LibraryHelper(name=library.name)
    else:
        self.libraries = LibrariesHelper.from_dict(cached_libraries)

    # set socket callbacks
    self._client_socket.set_item_callbacks(
        on_item_added=self._socket_abs_item_changed,
        on_item_updated=self._socket_abs_item_changed,
        on_item_removed=self._socket_abs_item_removed,
        on_items_added=self._socket_abs_item_changed,
        on_items_updated=self._socket_abs_item_changed,
    )

    self._client_socket.set_user_callbacks(
        on_user_item_progress_updated=self._socket_abs_user_item_progress_updated,
    )

    self._client_socket.set_refresh_token_expired_callback(
        on_refresh_token_expired=self._socket_abs_refresh_token_expired
    )

    self._client_socket.set_stream_callbacks(on_stream_open=self._socket_stream_open)

    # progress guard
    self.progress_guard = ProgressGuard()

    # safe guard reauthentication
    self.reauthenticate_lock = asyncio.Lock()
    self.reauthenticate_last = 0.0

    # register dynamic stream route for audiobook parts; the returned callable
    # is stored so that unload() can invoke it again
    self._on_unload_callbacks.append(
        self.mass.streams.register_dynamic_route(
            f"/{self.instance_id}_part_stream", self._handle_session_part_request
        )
    )
378
@handle_refresh_token
async def unload(self, is_removed: bool = False) -> None:
    """
    Tear the provider down when it is unloaded or closed.

    Called when provider is deregistered (e.g. MA exiting or config reloading).
    is_removed will be set to True when the provider is removed from the configuration.

    Logs out the API client and the socket client, then runs every callback
    collected in ``_on_unload_callbacks``.
    """
    await self._client.logout()
    await self._client_socket.logout()
    for run_cleanup in self._on_unload_callbacks:
        run_cleanup()
391
@property
def is_streaming_provider(self) -> bool:
    """Tell Music Assistant that this provider is not a streaming provider."""
    # Streaming providers answer True here; local, file-based providers answer False.
    return False
397
@handle_refresh_token
async def sync_library(self, media_type: MediaType) -> None:
    """Refresh the library helpers for ``media_type`` and run the base sync.

    Only libraries whose Audiobookshelf media type matches the requested
    Music Assistant media type get a fresh ``LibraryHelper``. Afterwards the
    helpers are cached and the playlog is updated from the current user.
    """
    abs_libraries = await self._client.get_all_libraries()
    if not abs_libraries:
        self._log_no_libraries()

    wants_audiobooks = media_type == MediaType.AUDIOBOOK
    wants_podcasts = media_type == MediaType.PODCAST
    for abs_library in abs_libraries:
        library_kind = abs_library.media_type
        if library_kind == AbsLibraryMediaType.BOOK and wants_audiobooks:
            self.libraries.audiobooks[abs_library.id_] = LibraryHelper(name=abs_library.name)
        elif library_kind == AbsLibraryMediaType.PODCAST and wants_podcasts:
            self.libraries.podcasts[abs_library.id_] = LibraryHelper(name=abs_library.name)

    await super().sync_library(media_type)
    await self._cache_set_helper_libraries()

    # update playlog
    current_user = await self._client.get_my_user()
    await self._set_playlog_from_user(current_user)
418
async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
    """Yield the podcasts of every known podcast library.

    The minified podcast representation returned by the paginated library
    listing is sufficient here. The ids of all listed items are recorded in
    the matching library helper, including podcasts that are then hidden for
    having no episodes.
    """
    for library_id in self.libraries.podcasts:
        async for page in self._client.get_library_items(library_id=library_id):
            if not page.results:
                break
            # store uuids
            page_ids = [entry.id_ for entry in page.results]
            self.libraries.podcasts[library_id].item_ids.update(page_ids)
            for minified in page.results:
                assert isinstance(minified, LibraryItemMinifiedPodcast)
                mass_podcast = parse_podcast(
                    abs_podcast=minified,
                    instance_id=self.instance_id,
                    domain=self.domain,
                    token=self._client.token,
                    base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
                )
                hide_empty = bool(self.config.get_value(CONF_HIDE_EMPTY_PODCASTS))
                if not (hide_empty and mass_podcast.total_episodes == 0):
                    yield mass_podcast
446
@handle_refresh_token
async def _get_abs_expanded_podcast(
    self, prov_podcast_id: str
) -> AbsLibraryItemExpandedPodcast:
    """Fetch the expanded Audiobookshelf library item of a podcast."""
    expanded_item = await self._client.get_library_item_podcast(
        podcast_id=prov_podcast_id, expanded=True
    )
    # Narrow the client's return type to the expanded variant.
    assert isinstance(expanded_item, AbsLibraryItemExpandedPodcast)
    return expanded_item
457
@handle_refresh_token
async def get_podcast(self, prov_podcast_id: str) -> Podcast:
    """Return a single podcast, parsed from its expanded Audiobookshelf item."""
    expanded_podcast = await self._get_abs_expanded_podcast(prov_podcast_id=prov_podcast_id)
    mass_podcast = parse_podcast(
        abs_podcast=expanded_podcast,
        instance_id=self.instance_id,
        domain=self.domain,
        token=self._client.token,
        base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
    )
    return mass_podcast
469
async def get_podcast_episodes(
    self, prov_podcast_id: str
) -> AsyncGenerator[PodcastEpisode, None]:
    """Yield all episodes of a podcast, each with the user's progress attached.

    Episodes are numbered from 1 in the order Audiobookshelf returns them;
    that number is passed to the parser as the fallback episode count.
    """
    expanded_podcast = await self._get_abs_expanded_podcast(prov_podcast_id=prov_podcast_id)
    # The user object holds the progress of all media items, so a single api
    # call is enough to obtain the progress of possibly many episodes.
    current_user = await self._client.get_my_user()
    progress_by_episode_id = {}
    for progress_entry in current_user.media_progress:
        if progress_entry.episode_id is None:
            continue
        if progress_entry.library_item_id == prov_podcast_id:
            progress_by_episode_id[progress_entry.episode_id] = progress_entry

    for position, abs_episode in enumerate(expanded_podcast.media.episodes, start=1):
        yield parse_podcast_episode(
            episode=abs_episode,
            prov_podcast_id=prov_podcast_id,
            fallback_episode_cnt=position,
            instance_id=self.instance_id,
            domain=self.domain,
            token=self._client.token,
            base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
            media_progress=progress_by_episode_id.get(abs_episode.id_),
        )
502
@handle_refresh_token
async def get_podcast_episode(
    self, prov_episode_id: str, add_progress: bool = True
) -> PodcastEpisode:
    """Return a single podcast episode.

    ``prov_episode_id`` consists of the podcast id and the episode id,
    separated by a single space. The user's progress is fetched only when
    ``add_progress`` is true.

    Raises:
        MediaNotFoundError: if the podcast has no episode with that id.
    """
    prov_podcast_id, wanted_episode_id = prov_episode_id.split(" ")
    expanded_podcast = await self._get_abs_expanded_podcast(prov_podcast_id=prov_podcast_id)
    for position, abs_episode in enumerate(expanded_podcast.media.episodes, start=1):
        if abs_episode.id_ != wanted_episode_id:
            continue
        episode_progress = (
            await self._client.get_my_media_progress(
                item_id=prov_podcast_id, episode_id=abs_episode.id_
            )
            if add_progress
            else None
        )
        return parse_podcast_episode(
            episode=abs_episode,
            prov_podcast_id=prov_podcast_id,
            fallback_episode_cnt=position,
            instance_id=self.instance_id,
            domain=self.domain,
            token=self._client.token,
            base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
            media_progress=episode_progress,
        )
    raise MediaNotFoundError("Episode not found")
531
async def get_library_audiobooks(self) -> AsyncGenerator[Audiobook, None]:
    """Yield the audiobooks of every known audiobook library.

    Each page of the library listing is re-fetched as a batch of expanded
    books, because the expanded version is needed for chapters. The ids of
    all listed items are recorded in the matching library helper.
    """
    for library_id in self.libraries.audiobooks:
        async for page in self._client.get_library_items(library_id=library_id):
            if not page.results:
                break
            # store uuids
            page_ids = [entry.id_ for entry in page.results]
            self.libraries.audiobooks[library_id].item_ids.update(page_ids)
            # use expanded version for chapters/ caching.
            expanded_books = await self._client.get_library_item_batch_book(item_ids=page_ids)
            for expanded_book in expanded_books:
                # Only books with audio files are yielded; ebook-only items are skipped.
                if len(expanded_book.media.tracks) > 0:
                    yield parse_audiobook(
                        abs_audiobook=expanded_book,
                        instance_id=self.instance_id,
                        domain=self.domain,
                        token=self._client.token,
                        base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
                    )
558
@handle_refresh_token
async def _get_abs_expanded_audiobook(
    self, prov_audiobook_id: str
) -> AbsLibraryItemExpandedBook:
    """Fetch the expanded Audiobookshelf library item of an audiobook."""
    expanded_item = await self._client.get_library_item_book(
        book_id=prov_audiobook_id, expanded=True
    )
    # Narrow the client's return type to the expanded variant.
    assert isinstance(expanded_item, AbsLibraryItemExpandedBook)
    return expanded_item
569
@handle_refresh_token
async def get_audiobook(self, prov_audiobook_id: str) -> Audiobook:
    """Return a single audiobook including the user's media progress.

    The progress is requested first, then the expanded book itself.
    """
    book_progress = await self._client.get_my_media_progress(item_id=prov_audiobook_id)
    expanded_book = await self._get_abs_expanded_audiobook(prov_audiobook_id=prov_audiobook_id)
    mass_audiobook = parse_audiobook(
        abs_audiobook=expanded_book,
        instance_id=self.instance_id,
        domain=self.domain,
        token=self._client.token,
        base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
        media_progress=book_progress,
    )
    return mass_audiobook
586
async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
    """Return the stream details for a podcast episode or an audiobook.

    Raises:
        MediaNotFoundError: for any other media type.
    """
    if media_type not in (MediaType.PODCAST_EPISODE, MediaType.AUDIOBOOK):
        raise MediaNotFoundError("Stream unknown")

    # A playback session is always created, and direct playback is the default.
    # For direct playback session.tracks matches the tracks of the audiobook /
    # podcast, so the session only serves to update our progress.
    #
    # With hls, the session carries an hls stream as its track.
    abs_session = await self._get_playback_session(mass_item_id=item_id)
    return await self._get_stream_details_session(
        abs_session, session_helper=self.sessions[item_id], media_type=media_type
    )
600
async def _get_stream_details_session(
    self,
    abs_session: AbsPlaybackSessionExpanded,
    session_helper: SessionHelper,
    media_type: MediaType,
) -> StreamDetails:
    """Build the stream details for an open Audiobookshelf playback session.

    Token (API key) users get direct Audiobookshelf URLs. All other users get
    URLs pointing at this provider's dynamic part-stream route, so the token
    is resolved anew for every part.

    Args:
        abs_session: the open playback session; its audio tracks become the
            stream parts.
        session_helper: helper of that session; its ``hls_stream_open`` event
            is awaited (10s at most) when the stream is detected as HLS.
        media_type: media type to report in the stream details.

    Raises:
        MediaNotFoundError: if the session has no audio tracks.
    """
    # Strip trailing slashes, as done everywhere else the base URL is used:
    # track.content_url is appended directly, so a base URL configured as
    # "https://host/" would otherwise give "https://host//...".
    abs_base_url = str(self.config.get_value(CONF_URL)).rstrip("/")
    tracks = abs_session.audio_tracks

    if len(tracks) == 0:
        raise MediaNotFoundError("Session has no tracks.")

    # The content type is derived from the first track's file extension.
    content_type = ContentType.UNKNOWN
    if (first_track_metadata := tracks[0].metadata) is not None:
        content_type = ContentType.try_parse(first_track_metadata.ext)

    file_parts: list[MultiPartPath] = []
    if self.is_token_user:
        self.logger.debug("Token User - Streams are direct.")
    for idx, track in enumerate(tracks):
        if self.is_token_user:
            # an api key is long-lived
            stream_url = f"{abs_base_url}{track.content_url}?token={self._client.token}"
        else:
            # to ensure token is always valid, we create a dynamic url
            # this ensures that we always get a fresh token on each part
            # without having to deal with a custom stream etc.
            # we also use this for a single track/ hls stream, otherwise we can't seek
            stream_url = (
                f"{self.mass.streams.base_url}/{self.instance_id}_part_stream?"
                f"session_id={abs_session.id_}&part_id={idx}"
            )
        file_parts.append(MultiPartPath(path=stream_url, duration=track.duration))

    # NOTE(review): HLS is detected by a plain substring match on the first
    # part's URL, so it depends on the URL text (host, token, route) rather
    # than on the track itself — confirm this is intended for non-token users,
    # whose URL is the dynamic route.
    stream_type = StreamType.HLS if "hls" in file_parts[0].path else StreamType.HTTP
    if stream_type == StreamType.HLS:
        # wait for stream to be ready
        try:
            await asyncio.wait_for(session_helper.hls_stream_open.wait(), 10)
        except TimeoutError:
            self.logger.warning(
                "Did not receive HLS stream open event after 10s, continuing anyways."
            )

    return StreamDetails(
        provider=self.instance_id,
        item_id=abs_session.id_,
        audio_format=AudioFormat(content_type=content_type),
        media_type=media_type,
        stream_type=stream_type,
        duration=int(abs_session.duration),
        # a single part is passed as a plain path, several parts as a list
        path=file_parts[0].path if len(file_parts) == 1 else file_parts,
        can_seek=True,
        allow_seek=True,
    )
661
async def _get_playback_session(self, mass_item_id: str) -> AbsPlaybackSessionExpanded:
    """Return the open abs session for an item, creating one if necessary.

    The whole lookup/creation runs under ``create_session_lock``.

    Args:
        mass_item_id: either the abs item id alone (audiobook), or the abs
            item id and the episode id separated by a single space
            (podcast episode).

    Raises:
        MediaNotFoundError: if HLS was requested but the new session's first
            track is not an HLS stream.
    """
    async with self.create_session_lock:
        # check for an available open session
        if session_helper := self.sessions.get(mass_item_id):
            # A session unknown to abs falls through to the creation of a new one.
            with suppress(AbsSessionNotFoundError):
                return await self._client.get_open_session(
                    session_id=session_helper.abs_session_id
                )

        item_ids = mass_item_id.split(" ")
        abs_item_id = item_ids[0]
        episode_id = item_ids[1] if len(item_ids) == 2 else None

        # Create a new session
        ## Check HLS usage
        use_hls = bool(self.config.get_value(CONF_USE_HLS))
        hls_formats = str(self.config.get_value(CONF_HLS_FORMATS))
        if use_hls and hls_formats != HLS_ALL_FORMATS:
            use_hls = False  # only for certain formats
            extensions = [x.lstrip(".") for x in hls_formats.split(HLS_FORMATS_SPLIT)]
            if episode_id is None:
                # audiobook: decide by the extension of the first track
                metadata = (
                    (await self._get_abs_expanded_audiobook(abs_item_id)).media.tracks[0].metadata
                )
            else:
                podcast = await self._get_abs_expanded_podcast(prov_podcast_id=abs_item_id)
                # Pick exactly the requested episode. A plain loop variable
                # would keep pointing at the last episode when nothing
                # matches, and its extension would then decide about HLS.
                episode = next(
                    (x for x in podcast.media.episodes if x.id_ == episode_id), None
                )
                metadata = episode.audio_track.metadata if episode else None
            if metadata and metadata.ext.lstrip(".") in extensions:
                use_hls = True

        client_name = f"Music Assistant {self.instance_id}"
        device_info = AbsDeviceInfo(
            device_id=self.instance_id,
            client_name=client_name,
            client_version=self.mass.version,
            manufacturer="",
            model=self.mass.server_id,
        )

        session = await self._client.get_playback_session(
            # These parameters give an hls if we don't enforce direct play stream,
            # which is only a concat of the individual file's at abs
            session_parameters=AbsPlaybackSessionParameters(
                device_info=device_info,
                force_direct_play=not use_hls,
                force_transcode=use_hls,
                # mimetypes are only checked for abs' internal "should transcode
                # see https://github.com/advplyr/audiobookshelf/blob/master/server/managers/PlaybackSessionManager.js
                supported_mime_types=[],
                media_player=client_name,
            ),
            item_id=abs_item_id,
            episode_id=episode_id,
        )

        if use_hls:
            # Safety check: the first path segment of the track url must be "hls".
            track_url = session.audio_tracks[0].content_url
            if track_url.split("/")[1] != "hls":
                raise MediaNotFoundError("Did expect HLS stream for session playback")
            self.logger.debug("Using an HLS stream for playback.")

        # Remember the session so later calls for the same item can reuse it.
        self.sessions[mass_item_id] = SessionHelper(
            abs_session_id=session.id_,
            last_sync_time=time.time(),
            hls_stream_open=asyncio.Event(),
        )
        return session
739
@handle_refresh_token
async def _handle_session_part_request(self, request: web.Request) -> web.Response:
    """
    Handle dynamic audiobook part stream request.

    We redirect to the actual stream url with token.
    This is done because the token might expire, so we need to
    generate a fresh url on each part.

    Expected query parameters:
        session_id: id of an open abs playback session.
        part_id: zero-based index into the session's audio tracks.

    Responses:
        400 if a parameter is missing or part_id is not a non-negative integer,
        404 if the session is not open or has no such part,
        302 (raised as ``web.HTTPFound``) to the tokenized stream url otherwise.
    """
    if not (session_id := request.query.get("session_id")):
        return web.Response(status=400, text="Missing session_id")
    if not (raw_part_id := request.query.get("part_id")):
        return web.Response(status=400, text="Missing part_id")
    # The query string is untrusted: a non-numeric value must not surface as an
    # unhandled ValueError, and a negative value must not silently select a
    # track counted from the end of the list.
    try:
        part_index = int(raw_part_id)
    except ValueError:
        part_index = -1
    if part_index < 0:
        return web.Response(status=400, text="Invalid part_id")
    self.logger.debug(
        "Handling session part request for session %s and part %s", session_id, part_index
    )
    try:
        abs_session = await self._client.get_open_session(session_id=session_id)
    except AbsSessionNotFoundError as err:
        raise web.HTTPNotFound from err
    try:
        part_track = abs_session.audio_tracks[part_index]
    except IndexError:
        return web.Response(status=404, text="Part not found")

    # Strip trailing slashes, as done everywhere else the base URL is used:
    # content_url is appended directly to it.
    base_url = str(self.config.get_value(CONF_URL)).rstrip("/")
    stream_url = f"{base_url}{part_track.content_url}?token={self._client.token}"
    # redirect to the actual stream url
    raise web.HTTPFound(location=stream_url)
770
@handle_refresh_token
async def get_resume_position(self, item_id: str, media_type: MediaType) -> tuple[bool, int]:
    """Return ``(finished, position_ms)`` for an item.

    The item counts as finished once the session's current time lies within
    the last 30 seconds of its duration.
    """
    # This method runs _before_ get_stream_details, so the playback session
    # gets created at this point.
    abs_session = await self._get_playback_session(mass_item_id=item_id)
    position_seconds = abs_session.current_time
    is_finished = position_seconds > abs_session.duration - 30
    self.logger.debug("Resume position: obtained.")
    return is_finished, int(position_seconds * 1000)
780
@handle_refresh_token
async def recommendations(self) -> list[RecommendationFolder]:
    """Build the recommendation rows for this provider.

    One folder is produced per Audiobookshelf shelf id, filled alternately
    from all libraries and capped at 20 items. A final "browse" folder with
    browse entries is always appended. Without any known library an empty
    list is returned.
    """
    max_items_per_row = 20
    all_libraries = {**self.libraries.audiobooks, **self.libraries.podcasts}
    if not all_libraries:
        self._log_no_libraries()
        return []

    # Avoid flooding the home page when users have several libraries: every
    # library contributes roughly the same share of a row, at least one item.
    per_library_limit = max(1, max_items_per_row // len(all_libraries))

    # Collected per shelf id as a list of lists (one inner list per library),
    # so the items can be picked per library afterwards.
    items_by_shelf_id: dict[AbsShelfId, list[list[MediaItemType | BrowseFolder]]] = {}
    for library_id in all_libraries:
        shelves = await self._client.get_library_personalized_view(
            library_id=library_id, limit=per_library_limit
        )
        await self._recommendations_iter_shelves(shelves, library_id, items_by_shelf_id)

    folders: list[RecommendationFolder] = []
    for shelf_id, per_library_items in items_by_shelf_id.items():
        # Interleave the libraries: [[A, B], [C, D, E], [F]] -> [A, C, F, B, D, E]
        interleaved = itertools.chain.from_iterable(itertools.zip_longest(*per_library_items))
        row_items = [entry for entry in interleaved if entry is not None][:max_items_per_row]

        shelf_translation_key = ABS_SHELF_ID_TRANSLATION_KEY.get(shelf_id)
        if shelf_translation_key:
            shelf_name = ""  # use translation key if available
        else:
            # shelf ids look like "recently-added", "newest-episodes", etc
            shelf_name = shelf_id.capitalize().replace("-", " ")
        folders.append(
            RecommendationFolder(
                item_id=f"{shelf_id}",
                name=shelf_name,
                icon=ABS_SHELF_ID_ICONS.get(shelf_id),
                translation_key=shelf_translation_key,
                items=UniqueList(row_items),
                provider=self.instance_id,
            )
        )

    # Browse "recommendation" for convenience. With several audiobook
    # libraries a listing of the libraries is returned. With at most one
    # audiobook library, the folders from _browse_lib_audiobooks (Authors,
    # Narrators etc.) are used instead. Podcast libs do not have filter
    # folders, so they always appear as root folders.
    audiobook_library_count = len(self.libraries.audiobooks)
    browse_items: list[MediaItemType | BrowseFolder]
    if audiobook_library_count > 1:
        translation_key = "libraries"
        browse_items = list(self._browse_root())
    else:
        translation_key = "libraries" if self.libraries.podcasts else "library"
        browse_items = []
        # audiobook libs come first, and there is at most one of them
        root_entries = self._browse_root(append_mediatype_suffix=False)
        if audiobook_library_count == 0:
            browse_items.extend(root_entries)
        else:
            audiobook_root = root_entries[0]
            assert isinstance(audiobook_root, BrowseFolder)
            browse_items.extend(self._browse_lib_audiobooks(current_path=audiobook_root.path))
            # add podcast roots
            browse_items.extend(root_entries[1:])

    folders.append(
        RecommendationFolder(
            item_id="browse",
            name="",  # use translation key
            icon="mdi-bookshelf",
            translation_key=translation_key,
            items=UniqueList(browse_items),
            provider=self.instance_id,
        )
    )

    return folders
871
872 async def _recommendations_iter_shelves(
873 self,
874 shelves: list[ShelfBook | ShelfPodcast | ShelfAuthors | ShelfEpisode | ShelfSeries],
875 library_id: str,
876 items_by_shelf_id: dict[AbsShelfId, list[list[MediaItemType | BrowseFolder]]],
877 ) -> None:
878 for shelf in shelves:
879 media_type: MediaType
880 match shelf.type_:
881 case AbsShelfType.PODCAST:
882 media_type = MediaType.PODCAST
883 case AbsShelfType.EPISODE:
884 media_type = MediaType.PODCAST_EPISODE
885 case AbsShelfType.BOOK:
886 media_type = MediaType.AUDIOBOOK
887 case AbsShelfType.SERIES | AbsShelfType.AUTHORS:
888 media_type = MediaType.FOLDER
889 case _:
890 # this would be authors, currently
891 continue
892
893 items: list[MediaItemType | BrowseFolder] = []
894 # Recently added is the _only_ case, where we get a full podcast
895 # We have a podcast object with only the episodes matching the
896 # shelf.id_ otherwise.
897 match shelf.id_:
898 case (
899 AbsShelfId.RECENTLY_ADDED
900 | AbsShelfId.LISTEN_AGAIN
901 | AbsShelfId.DISCOVER
902 | AbsShelfId.NEWEST_EPISODES
903 | AbsShelfId.CONTINUE_LISTENING
904 ):
905 for entity in shelf.entities:
906 assert isinstance(entity, ShelfLibraryItemMinified)
907 item: MediaItemType | None = None
908 if media_type in [MediaType.PODCAST, MediaType.AUDIOBOOK]:
909 item = await self.mass.music.get_library_item_by_prov_id(
910 media_type=media_type,
911 provider_instance_id_or_domain=self.instance_id,
912 item_id=entity.id_,
913 )
914 elif media_type == MediaType.PODCAST_EPISODE:
915 podcast_id = entity.id_
916 if entity.recent_episode is None:
917 continue
918 # we only have a PodcastEpisode here, with limited information
919 item = parse_podcast_episode(
920 episode=entity.recent_episode,
921 prov_podcast_id=podcast_id,
922 instance_id=self.instance_id,
923 domain=self.domain,
924 token=self._client.token,
925 base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
926 )
927 if item is not None:
928 items.append(item)
929 case AbsShelfId.RECENT_SERIES | AbsShelfId.CONTINUE_SERIES:
930 # We jump into a browse folder here if we have SeriesShelf, set path up as if
931 # browse function used.
932 if isinstance(shelf, ShelfSeries):
933 for entity in shelf.entities:
934 assert isinstance(entity, SeriesShelf)
935 if len(entity.books) == 0:
936 continue
937 path = (
938 f"{self.instance_id}://"
939 f"{AbsBrowsePaths.LIBRARIES_BOOK} {library_id}/"
940 f"{AbsBrowsePaths.SERIES}/{entity.id_}"
941 )
942 items.append(
943 BrowseFolder(
944 item_id=entity.id_,
945 name=entity.name,
946 provider=self.instance_id,
947 path=path,
948 )
949 )
950 elif isinstance(shelf, ShelfBook) and media_type == MediaType.AUDIOBOOK:
951 # Single books, must be audiobooks
952 for entity in shelf.entities:
953 item = await self.mass.music.get_library_item_by_prov_id(
954 media_type=media_type,
955 provider_instance_id_or_domain=self.instance_id,
956 item_id=entity.id_,
957 )
958 if item is not None:
959 items.append(item)
960 case AbsShelfId.NEWEST_AUTHORS:
961 # same as for series, use a folder
962 for entity in shelf.entities:
963 assert isinstance(entity, AuthorExpanded)
964 if entity.num_books == 0:
965 continue
966 path = (
967 f"{self.instance_id}://"
968 f"{AbsBrowsePaths.LIBRARIES_BOOK} {library_id}/"
969 f"{AbsBrowsePaths.AUTHORS}/{entity.id_}"
970 )
971 items.append(
972 BrowseFolder(
973 item_id=entity.id_,
974 name=entity.name,
975 provider=self.instance_id,
976 path=path,
977 )
978 )
979 if not items:
980 continue
981
982 # add collected items
983 assert isinstance(shelf.id_, AbsShelfId)
984 items_collected = items_by_shelf_id.get(shelf.id_, [])
985 items_collected.append(items)
986 items_by_shelf_id[shelf.id_] = items_collected
987
@handle_refresh_token
async def on_played(
    self,
    media_type: MediaType,
    prov_item_id: str,
    fully_played: bool,
    position: int,
    media_item: MediaItemType,
    is_playing: bool = False,
) -> None:
    """Update progress in Audiobookshelf.

    In our case media_type may have 3 values:
    - PODCAST
    - PODCAST_EPISODE
    - AUDIOBOOK
    We ignore PODCAST (function is called on adding a podcast with position=None)

    If a playback session is stored for the item, the progress is synced via
    that session; otherwise (or if that sync fails) the media progress is
    written directly.

    :param media_type: Type of the played item.
    :param prov_item_id: Audiobookshelf item id; for episodes it has the form
        "<podcast id> <episode id>".
    :param fully_played: Whether the item is reported as finished.
    :param position: Playback position in seconds.
    :param media_item: The played item; its duration and name are used.
    :param is_playing: Not used by this method.
    """

    async def _update_by_session(session_helper: SessionHelper, duration: int) -> bool:
        # Sync through an open ABS playback session. Returns False if
        # Audiobookshelf does not know the session.
        now = time.time()
        try:
            await self._client.sync_open_session(
                session_id=session_helper.abs_session_id,
                parameters=SyncOpenSessionParameters(
                    current_time=position,
                    # time elapsed since the previous successful sync
                    time_listened=now - session_helper.last_sync_time,
                    duration=duration,
                ),
            )
            session_helper.last_sync_time = now
            self.logger.debug("Synced playback session, position %s s.", position)
            return True
        except AbsSessionNotFoundError:
            self.logger.error("Was unable to sync session.")
            return False

    if media_type == MediaType.PODCAST_EPISODE:
        # episode ids are stored as "<podcast id> <episode id>"
        abs_podcast_id, abs_episode_id = prov_item_id.split(" ")

        # guard, see progress guard class docstrings for explanation
        if not self.progress_guard.guard_ok_mass(
            item_id=abs_podcast_id, episode_id=abs_episode_id
        ):
            return
        self.progress_guard.add_progress(item_id=abs_podcast_id, episode_id=abs_episode_id)

        if media_item is None or not isinstance(media_item, PodcastEpisode):
            return

        if fully_played and position < media_item.duration - 30:
            # faulty position update
            # occurs sometimes, if a player disconnects unexpectedly, or reports
            # a false position - seen this for MC players, but not for sendspin
            return

        if position == 0 and not fully_played:
            # marked unplayed: drop the progress stored in ABS, if there is one
            mp = await self._client.get_my_media_progress(
                item_id=abs_podcast_id, episode_id=abs_episode_id
            )
            if mp is not None:
                await self._client.remove_my_media_progress(media_progress_id=mp.id_)
                self.logger.debug(f"Removed media progress of {media_type.value}.")
                return

        duration = media_item.duration
        updated = False
        # prefer syncing through a stored playback session
        if session_helper := self.sessions.get(prov_item_id):
            updated = await _update_by_session(session_helper=session_helper, duration=duration)
        if not updated:
            # no session, or session sync failed: write the progress directly
            self.logger.debug(
                f"Updating media progress of {media_type.value}, title {media_item.name}."
            )
            await self._client.update_my_media_progress(
                item_id=abs_podcast_id,
                episode_id=abs_episode_id,
                duration_seconds=duration,
                progress_seconds=position,
                is_finished=fully_played,
            )

    if media_type == MediaType.AUDIOBOOK:
        # guard, see progress guard class docstrings for explanation
        if not self.progress_guard.guard_ok_mass(item_id=prov_item_id):
            return
        self.progress_guard.add_progress(item_id=prov_item_id)

        if media_item is None or not isinstance(media_item, Audiobook):
            return

        if fully_played and position < media_item.duration - 30:
            # faulty position update, see above
            return

        if position == 0 and not fully_played:
            # marked unplayed: drop the progress stored in ABS, if there is one
            mp = await self._client.get_my_media_progress(item_id=prov_item_id)
            if mp is not None:
                await self._client.remove_my_media_progress(media_progress_id=mp.id_)
                self.logger.debug(f"Removed media progress of {media_type.value}.")
                return

        duration = media_item.duration
        updated = False
        # prefer syncing through a stored playback session
        if session_helper := self.sessions.get(prov_item_id):
            updated = await _update_by_session(session_helper=session_helper, duration=duration)
        if not updated:
            # no session, or session sync failed: write the progress directly
            self.logger.debug(f"Updating {media_type.value} named {media_item.name} progress")
            await self._client.update_my_media_progress(
                item_id=prov_item_id,
                duration_seconds=duration,
                progress_seconds=position,
                is_finished=fully_played,
            )
1104
1105 @handle_refresh_token
1106 async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
1107 """Browse for audiobookshelf.
1108
1109 Generates this view:
1110 Library_Name_A (Audiobooks)
1111 Audiobooks
1112 Audiobook_1
1113 Audiobook_2
1114 Series
1115 Series_1
1116 Audiobook_1
1117 Audiobook_2
1118 Series_2
1119 Audiobook_3
1120 Audiobook_4
1121 Collections
1122 Collection_1
1123 Audiobook_1
1124 Audiobook_2
1125 Collection_2
1126 Audiobook_3
1127 Audiobook_4
1128 Authors
1129 Author_1
1130 Series_1
1131 Audiobook_1
1132 Audiobook_2
1133 Author_2
1134 Audiobook_3
1135 Library_Name_B (Podcasts)
1136 Podcast_1
1137 Podcast_2
1138 """
1139 item_path = path.split("://", 1)[1]
1140 if not item_path:
1141 return self._browse_root()
1142 sub_path = item_path.split("/")
1143 lib_key, lib_id = sub_path[0].split(" ")
1144 if len(sub_path) == 1:
1145 if lib_key == AbsBrowsePaths.LIBRARIES_PODCAST:
1146 return await self._browse_lib_podcasts(library_id=lib_id)
1147 return self._browse_lib_audiobooks(current_path=path)
1148 if len(sub_path) == 2:
1149 item_key = sub_path[1]
1150 match item_key:
1151 case AbsBrowsePaths.AUTHORS:
1152 return await self._browse_authors(current_path=path, library_id=lib_id)
1153 case AbsBrowsePaths.NARRATORS:
1154 return await self._browse_narrators(current_path=path, library_id=lib_id)
1155 case AbsBrowsePaths.SERIES:
1156 return await self._browse_series(current_path=path, library_id=lib_id)
1157 case AbsBrowsePaths.COLLECTIONS:
1158 return await self._browse_collections(current_path=path, library_id=lib_id)
1159 case AbsBrowsePaths.AUDIOBOOKS:
1160 return await self._browse_books(library_id=lib_id)
1161 elif len(sub_path) == 3:
1162 item_key, item_id = sub_path[1:3]
1163 match item_key:
1164 case AbsBrowsePaths.AUTHORS:
1165 return await self._browse_author_books(current_path=path, author_id=item_id)
1166 case AbsBrowsePaths.NARRATORS:
1167 return await self._browse_narrator_books(
1168 library_id=lib_id, narrator_filter_str=item_id
1169 )
1170 case AbsBrowsePaths.SERIES:
1171 return await self._browse_series_books(series_id=item_id)
1172 case AbsBrowsePaths.COLLECTIONS:
1173 return await self._browse_collection_books(collection_id=item_id)
1174 elif len(sub_path) == 4:
1175 # series within author
1176 series_id = sub_path[3]
1177 return await self._browse_series_books(series_id=series_id)
1178 return []
1179
1180 def _browse_root(self, append_mediatype_suffix: bool = True) -> Sequence[BrowseFolder]:
1181 items = []
1182
1183 def _get_folder(
1184 path: str, lib_id: str, lib_name: str, translation_key: str | None = None
1185 ) -> BrowseFolder:
1186 return BrowseFolder(
1187 item_id=lib_id,
1188 name=lib_name,
1189 translation_key=translation_key, # if given, <name>: <translation> in frontend
1190 provider=self.instance_id,
1191 path=f"{self.instance_id}://{path}",
1192 )
1193
1194 if len(self.libraries.audiobooks) == 0 and len(self.libraries.podcasts) == 0:
1195 self._log_no_libraries()
1196 return []
1197
1198 translation_key: str | None
1199 for lib_id, lib in self.libraries.audiobooks.items():
1200 path = f"{AbsBrowsePaths.LIBRARIES_BOOK} {lib_id}"
1201 translation_key = None
1202 if append_mediatype_suffix:
1203 translation_key = AbsBrowseItemsBookTranslationKey.AUDIOBOOKS
1204 items.append(
1205 _get_folder(path, lib_id, lib_name=lib.name, translation_key=translation_key)
1206 )
1207 for lib_id, lib in self.libraries.podcasts.items():
1208 path = f"{AbsBrowsePaths.LIBRARIES_PODCAST} {lib_id}"
1209 translation_key = None
1210 if append_mediatype_suffix:
1211 translation_key = AbsBrowseItemsPodcastTranslationKey.PODCASTS
1212 items.append(
1213 _get_folder(path, lib_id, lib_name=lib.name, translation_key=translation_key)
1214 )
1215 return items
1216
1217 async def _browse_lib_podcasts(self, library_id: str) -> list[MediaItemType]:
1218 """No sub categories for podcasts."""
1219 if len(self.libraries.podcasts[library_id].item_ids) == 0:
1220 self._log_no_helper_item_ids()
1221 items = []
1222 for podcast_id in self.libraries.podcasts[library_id].item_ids:
1223 mass_item = await self.mass.music.get_library_item_by_prov_id(
1224 media_type=MediaType.PODCAST,
1225 item_id=podcast_id,
1226 provider_instance_id_or_domain=self.instance_id,
1227 )
1228 if mass_item is not None:
1229 items.append(mass_item)
1230 return sorted(items, key=lambda x: x.name)
1231
def _browse_lib_audiobooks(self, current_path: str) -> Sequence[BrowseFolder]:
    """Return the category folders of an audiobook library.

    One folder is created per member of AbsBrowseItemsBookTranslationKey;
    its path is ``current_path`` extended by the matching entry of
    ABS_BROWSE_ITEMS_TO_PATH.

    :param current_path: Browse path of the library.
    """
    return [
        BrowseFolder(
            item_id=key.lower(),
            name="",  # use translation key
            translation_key=key,
            provider=self.instance_id,
            path="/".join((current_path, ABS_BROWSE_ITEMS_TO_PATH[key])),
        )
        for key in AbsBrowseItemsBookTranslationKey
    ]
1246
async def _browse_authors(self, current_path: str, library_id: str) -> Sequence[BrowseFolder]:
    """Return a folder per author of the library, sorted by name.

    :param current_path: Browse path of the authors category; the author id
        is appended to it.
    :param library_id: Audiobookshelf id of the library.
    """
    abs_authors = await self._client.get_library_authors(library_id=library_id)
    folders = [
        BrowseFolder(
            item_id=author.id_,
            name=author.name,
            provider=self.instance_id,
            path=f"{current_path}/{author.id_}",
        )
        for author in abs_authors
    ]
    folders.sort(key=lambda folder: folder.name)
    return folders
1262
async def _browse_narrators(self, current_path: str, library_id: str) -> Sequence[BrowseFolder]:
    """Return a folder per narrator of the library, sorted by name.

    :param current_path: Browse path of the narrators category; the narrator
        id is appended to it.
    :param library_id: Audiobookshelf id of the library.
    """
    abs_narrators = await self._client.get_library_narrators(library_id=library_id)
    folders = [
        BrowseFolder(
            item_id=narrator.id_,
            name=narrator.name,
            provider=self.instance_id,
            path=f"{current_path}/{narrator.id_}",
        )
        for narrator in abs_narrators
    ]
    folders.sort(key=lambda folder: folder.name)
    return folders
1278
async def _browse_series(self, current_path: str, library_id: str) -> Sequence[BrowseFolder]:
    """Return a folder per series of the library, sorted by name.

    The series are read response by response; reading stops at the first
    response without results.

    :param current_path: Browse path of the series category; the series id
        is appended to it.
    :param library_id: Audiobookshelf id of the library.
    """
    folders: list[BrowseFolder] = []
    async for page in self._client.get_library_series(library_id=library_id):
        if not page.results:
            break
        folders.extend(
            BrowseFolder(
                item_id=abs_series.id_,
                name=abs_series.name,
                provider=self.instance_id,
                path=f"{current_path}/{abs_series.id_}",
            )
            for abs_series in page.results
        )
    folders.sort(key=lambda folder: folder.name)
    return folders
1296
async def _browse_collections(
    self, current_path: str, library_id: str
) -> Sequence[BrowseFolder]:
    """Return a folder per collection of the library, sorted by name.

    The collections are read response by response; reading stops at the
    first response without results.

    :param current_path: Browse path of the collections category; the
        collection id is appended to it.
    :param library_id: Audiobookshelf id of the library.
    """
    folders: list[BrowseFolder] = []
    async for page in self._client.get_library_collections(library_id=library_id):
        if not page.results:
            break
        folders.extend(
            BrowseFolder(
                item_id=abs_collection.id_,
                name=abs_collection.name,
                provider=self.instance_id,
                path=f"{current_path}/{abs_collection.id_}",
            )
            for abs_collection in page.results
        )
    folders.sort(key=lambda folder: folder.name)
    return folders
1315
1316 async def _browse_books(self, library_id: str) -> Sequence[MediaItemType]:
1317 if len(self.libraries.audiobooks[library_id].item_ids) == 0:
1318 self._log_no_helper_item_ids()
1319 items = []
1320 for book_id in self.libraries.audiobooks[library_id].item_ids:
1321 mass_item = await self.mass.music.get_library_item_by_prov_id(
1322 media_type=MediaType.AUDIOBOOK,
1323 item_id=book_id,
1324 provider_instance_id_or_domain=self.instance_id,
1325 )
1326 if mass_item is not None:
1327 items.append(mass_item)
1328 return sorted(items, key=lambda x: x.name)
1329
async def _browse_author_books(
    self, current_path: str, author_id: str
) -> Sequence[MediaItemType | BrowseFolder]:
    """Return the content of an author folder.

    The author's series come first, each as a folder. They are followed by
    the author's books that are not part of any of these series, in the
    order in which Audiobookshelf returned them.

    :param current_path: Browse path of the author; series ids are appended.
    :param author_id: Audiobookshelf id of the author.
    :raises TypeError: If the client does not return an author with items
        and series.
    """
    abs_author = await self._client.get_author(
        author_id=author_id, include_items=True, include_series=True
    )
    if not isinstance(abs_author, AbsAuthorWithItemsAndSeries):
        raise TypeError("Unexpected type of author.")

    items: list[MediaItemType | BrowseFolder] = []
    series_book_ids: set[str] = set()
    for series in abs_author.series:
        series_book_ids.update(x.id_ for x in series.items)
        items.append(
            BrowseFolder(
                item_id=series.id_,
                # frontend does <name>: <translation>
                name=series.name,
                translation_key="series_singular",
                provider=self.instance_id,
                path=f"{current_path}/{series.id_}",
            )
        )

    # dict.fromkeys drops duplicate ids but, unlike a set, keeps the order of
    # the response, so the listing is stable between runs.
    standalone_book_ids = [
        book_id
        for book_id in dict.fromkeys(x.id_ for x in abs_author.library_items)
        if book_id not in series_book_ids
    ]
    for book_id in standalone_book_ids:
        mass_item = await self.mass.music.get_library_item_by_prov_id(
            media_type=MediaType.AUDIOBOOK,
            item_id=book_id,
            provider_instance_id_or_domain=self.instance_id,
        )
        if mass_item is not None:
            items.append(mass_item)

    return items
1368
async def _browse_narrator_books(
    self, library_id: str, narrator_filter_str: str
) -> Sequence[MediaItemType]:
    """Return the audiobooks of a narrator, sorted by name.

    The library items are requested with the filter
    ``narrators.<narrator_filter_str>``; reading stops at the first response
    without results. Items unknown to Music Assistant are skipped.

    :param library_id: Audiobookshelf id of the library.
    :param narrator_filter_str: Filter value identifying the narrator.
    """
    lookup = self.mass.music.get_library_item_by_prov_id
    books: list[MediaItemType] = []
    pages = self._client.get_library_items(
        library_id=library_id, filter_str=f"narrators.{narrator_filter_str}"
    )
    async for page in pages:
        if not page.results:
            break
        for abs_item in page.results:
            book = await lookup(
                media_type=MediaType.AUDIOBOOK,
                item_id=abs_item.id_,
                provider_instance_id_or_domain=self.instance_id,
            )
            if book is not None:
                books.append(book)

    books.sort(key=lambda book: book.name)
    return books
1388
async def _browse_series_books(self, series_id: str) -> Sequence[MediaItemType]:
    """Return the audiobooks of a series in the order reported by Audiobookshelf.

    Books unknown to Music Assistant are skipped.

    :param series_id: Audiobookshelf id of the series.
    :raises TypeError: If the client does not return a series with progress.
    """
    abs_series = await self._client.get_series(series_id=series_id, include_progress=True)
    if not isinstance(abs_series, AbsSeriesWithProgress):
        raise TypeError("Unexpected series type.")

    lookup = self.mass.music.get_library_item_by_prov_id
    books = []
    # these are sorted in abs by sequence
    for book_id in abs_series.progress.library_item_ids:
        book = await lookup(
            media_type=MediaType.AUDIOBOOK,
            item_id=book_id,
            provider_instance_id_or_domain=self.instance_id,
        )
        if book is not None:
            books.append(book)

    return books
1407
async def _browse_collection_books(self, collection_id: str) -> Sequence[MediaItemType]:
    """Return the audiobooks of a collection in the order of the collection.

    Books unknown to Music Assistant are skipped.

    :param collection_id: Audiobookshelf id of the collection.
    """
    abs_collection = await self._client.get_collection(collection_id=collection_id)
    lookup = self.mass.music.get_library_item_by_prov_id
    books = []
    for abs_book in abs_collection.books:
        book = await lookup(
            media_type=MediaType.AUDIOBOOK,
            item_id=abs_book.id_,
            provider_instance_id_or_domain=self.instance_id,
        )
        if book is not None:
            books.append(book)
    return books
1420
async def _socket_abs_item_changed(
    self, items: LibraryItemExpanded | list[LibraryItemExpanded]
) -> None:
    """For added and updated.

    Parses each changed Audiobookshelf item, writes it to the Music
    Assistant library (overwriting an existing entry), records its id in the
    helper libraries and finally persists the helper libraries to the cache.

    :param items: A single changed item or a list of changed items.
    """
    # accept both a single item and a list
    abs_items = [items] if isinstance(items, LibraryItemExpanded) else items
    for abs_item in abs_items:
        if isinstance(abs_item, LibraryItemExpandedBook):
            # If the book has no audiofiles, we skip -> ebook only.
            if len(abs_item.media.tracks) == 0:
                continue
            self.logger.debug(
                'Updated book "%s" via socket.', abs_item.media.metadata.title or ""
            )
            await self.mass.music.audiobooks.add_item_to_library(
                parse_audiobook(
                    abs_audiobook=abs_item,
                    instance_id=self.instance_id,
                    domain=self.domain,
                    token=self._client.token,
                    base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
                ),
                overwrite_existing=True,
            )
            # remember the id, but only for a library we already know
            lib = self.libraries.audiobooks.get(abs_item.library_id, None)
            if lib is not None:
                lib.item_ids.add(abs_item.id_)
        elif isinstance(abs_item, LibraryItemExpandedPodcast):
            self.logger.debug(
                'Updated podcast "%s" via socket.', abs_item.media.metadata.title or ""
            )
            mass_podcast = parse_podcast(
                abs_podcast=abs_item,
                instance_id=self.instance_id,
                domain=self.domain,
                token=self._client.token,
                base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
            )
            # podcasts without episodes are not added if the user hides them
            if not (
                bool(self.config.get_value(CONF_HIDE_EMPTY_PODCASTS))
                and mass_podcast.total_episodes == 0
            ):
                await self.mass.music.podcasts.add_item_to_library(
                    mass_podcast,
                    overwrite_existing=True,
                )
                # remember the id, but only for a library we already know
                lib = self.libraries.podcasts.get(abs_item.library_id, None)
                if lib is not None:
                    lib.item_ids.add(abs_item.id_)
    # persist the helper libraries
    await self._cache_set_helper_libraries()
1470
async def _socket_abs_item_removed(self, item: LibraryItemRemoved) -> None:
    """Handle the removal of an item in Audiobookshelf.

    The item id is dropped from the helper libraries. If the id was known,
    the matching Music Assistant library item is removed as well. The helper
    libraries are written to the cache in any case.

    :param item: The removed item, identified by its ``id_``.
    """
    removed_id = item.id_
    media_type: MediaType | None = None
    candidates = (
        (self.libraries.audiobooks, MediaType.AUDIOBOOK),
        (self.libraries.podcasts, MediaType.PODCAST),
    )
    for libraries, candidate_type in candidates:
        for lib in libraries.values():
            if removed_id in lib.item_ids:
                media_type = candidate_type
                lib.item_ids.remove(removed_id)
                break

    if media_type is not None:
        mass_item = await self.mass.music.get_library_item_by_prov_id(
            media_type=media_type,
            item_id=removed_id,
            provider_instance_id_or_domain=self.instance_id,
        )
        if mass_item is not None:
            await self.mass.music.remove_item_from_library(
                media_type=media_type, library_item_id=mass_item.item_id
            )
            self.logger.debug('Removed %s "%s" via socket.', media_type.value, mass_item.name)

    await self._cache_set_helper_libraries()
1498
1499 async def _socket_abs_user_item_progress_updated(
1500 self, id_: str, progress: MediaProgress
1501 ) -> None:
1502 """To update continue listening.
1503
1504 ABS reports every 15s and immediately on play state change.
1505 This callback is called per item if a progress is changed:
1506 - a change in position
1507 - the item is finished
1508 But it is _not_called, if a progress is reset/ discarded.
1509 """
1510 # guard, see progress guard class docstrings for explanation
1511 if not self.progress_guard.guard_ok_abs(abs_progress=progress):
1512 return
1513
1514 known_ids = self._get_all_known_item_ids()
1515 if progress.library_item_id not in known_ids:
1516 return
1517
1518 self.logger.debug(f"Updated progress of item {progress.library_item_id} via socket.")
1519
1520 if progress.episode_id is None:
1521 await self._update_playlog_book(progress)
1522 return
1523 await self._update_playlog_episode(progress)
1524
1525 async def _socket_abs_refresh_token_expired(self) -> None:
1526 await self.reauthenticate()
1527
1528 async def _socket_stream_open(self, stream: AbsStream) -> None:
1529 # stream's id is the same as the playback session id
1530 for session_helper in self.sessions.values():
1531 if session_helper.abs_session_id == stream.id_:
1532 session_helper.hls_stream_open.set()
1533 break
1534
async def reauthenticate(self) -> None:
    """Reauthorize the abs session config if refresh token expired.

    If another re-authentication is in progress, or the last one finished
    less than 5 seconds ago, no new login is started: the call only waits
    (polling every 0.5 s) until the lock is free and then returns.
    """
    # some safe guarding should that function be called simultaneously
    lock = self.reauthenticate_lock
    if lock.locked() or time.time() - self.reauthenticate_last < 5:
        while lock.locked():
            await asyncio.sleep(0.5)
        return

    async with lock:
        username = str(self.config.get_value(CONF_USERNAME))
        password = str(self.config.get_value(CONF_PASSWORD))
        await self._client.session_config.authenticate(username=username, password=password)
        self.reauthenticate_last = time.time()
1549
1550 def _get_all_known_item_ids(self) -> set[str]:
1551 known_ids = set()
1552 for lib in self.libraries.podcasts.values():
1553 known_ids.update(lib.item_ids)
1554 for lib in self.libraries.audiobooks.values():
1555 known_ids.update(lib.item_ids)
1556
1557 return known_ids
1558
1559 async def _set_playlog_from_user(self, user: User) -> None:
1560 """Update on user callback.
1561
1562 User holds also all media progresses specific to that user.
1563
1564 The function 'guard_ok_abs' uses the timestamp of the last update in abs, thus after an
1565 initial progress update, an unchanged update will not trigger a (useless) playlog update.
1566
1567 We do not sync removed progresses for the sake of simplicity.
1568 """
1569 await self._set_playlog_from_user_sync(user.media_progress)
1570
1571 async def _set_playlog_from_user_sync(self, progresses: list[MediaProgress]) -> None:
1572 # for debugging
1573 __updated_items = 0
1574
1575 known_ids = self._get_all_known_item_ids()
1576 abs_ids_with_progress = set()
1577
1578 for progress in progresses:
1579 # save progress ids for later
1580 ma_item_id = (
1581 progress.library_item_id
1582 if progress.episode_id is None
1583 else f"{progress.library_item_id} {progress.episode_id}"
1584 )
1585 abs_ids_with_progress.add(ma_item_id)
1586
1587 # Guard. Also makes sure, that we don't write to db again if no state change happened.
1588 # This is achieved by adding a Helper Progress in the update playlog functions, which
1589 # then has the most recent timestamp. If a subsequent progress sent by abs has an older
1590 # timestamp, we do not update again.
1591 if not self.progress_guard.guard_ok_abs(progress):
1592 continue
1593 if progress.current_time is not None:
1594 if int(progress.current_time) != 0 and not progress.current_time >= 30:
1595 # same as mass default, only > 30s
1596 continue
1597 if progress.library_item_id not in known_ids:
1598 continue
1599 __updated_items += 1
1600 if progress.episode_id is None:
1601 await self._update_playlog_book(progress)
1602 else:
1603 await self._update_playlog_episode(progress)
1604 self.logger.debug(f"Updated {__updated_items} from full playlog.")
1605
1606 # Get MA's known progresses of ABS.
1607 # In ABS the user may discard a progress, which removes the progress completely.
1608 # There is no socket notification for this event.
1609 ma_playlog_state = await self.mass.music.get_playlog_provider_item_ids(
1610 provider_instance_id=self.instance_id
1611 )
1612 ma_ids_with_progress = {x for _, x in ma_playlog_state}
1613 discarded_progress_ids = ma_ids_with_progress.difference(abs_ids_with_progress)
1614 for discarded_progress_id in discarded_progress_ids:
1615 if len(discarded_progress_id.split(" ")) == 1:
1616 if discarded_item := await self.mass.music.get_library_item_by_prov_id(
1617 media_type=MediaType.AUDIOBOOK,
1618 item_id=discarded_progress_id,
1619 provider_instance_id_or_domain=self.instance_id,
1620 ):
1621 self.progress_guard.add_progress(discarded_progress_id)
1622 await self.mass.music.mark_item_unplayed(discarded_item)
1623 else:
1624 with suppress(MediaNotFoundError):
1625 discarded_item = await self.get_podcast_episode(
1626 prov_episode_id=discarded_progress_id, add_progress=False
1627 )
1628 self.progress_guard.add_progress(*discarded_progress_id.split(" "))
1629 await self.mass.music.mark_item_unplayed(discarded_item)
1630 self.logger.debug("Discarded item %s ", discarded_progress_id)
1631
1632 async def _update_playlog_book(self, progress: MediaProgress) -> None:
1633 # helper progress also ensures no useless progress updates,
1634 # see comment above
1635 self.progress_guard.add_progress(progress.library_item_id)
1636 if progress.current_time is None:
1637 return
1638 mass_audiobook = await self.mass.music.get_library_item_by_prov_id(
1639 media_type=MediaType.AUDIOBOOK,
1640 item_id=progress.library_item_id,
1641 provider_instance_id_or_domain=self.instance_id,
1642 )
1643 if mass_audiobook is None:
1644 return
1645 if int(progress.current_time) == 0:
1646 await self.mass.music.mark_item_unplayed(mass_audiobook)
1647 else:
1648 await self.mass.music.mark_item_played(
1649 mass_audiobook,
1650 fully_played=progress.is_finished,
1651 seconds_played=int(progress.current_time),
1652 )
1653
1654 async def _update_playlog_episode(self, progress: MediaProgress) -> None:
1655 # helper progress also ensures no useless progress updates,
1656 # see comment above
1657 self.progress_guard.add_progress(progress.library_item_id, progress.episode_id)
1658 if progress.current_time is None:
1659 return
1660 _episode_id = f"{progress.library_item_id} {progress.episode_id}"
1661 try:
1662 # need to obtain full podcast, and then search for episode
1663 mass_episode = await self.get_podcast_episode(_episode_id, add_progress=False)
1664 except MediaNotFoundError:
1665 return
1666 if int(progress.current_time) == 0:
1667 await self.mass.music.mark_item_unplayed(mass_episode)
1668 else:
1669 await self.mass.music.mark_item_played(
1670 mass_episode,
1671 fully_played=progress.is_finished,
1672 seconds_played=int(progress.current_time),
1673 )
1674
async def _cache_set_helper_libraries(self) -> None:
    """Store the helper libraries (as a dict) in the Music Assistant cache."""
    serialized = self.libraries.to_dict()
    await self.mass.cache.set(
        key=CACHE_KEY_LIBRARIES,
        provider=self.instance_id,
        category=CACHE_CATEGORY_LIBRARIES,
        data=serialized,
    )
1682
1683 def _log_no_libraries(self) -> None:
1684 self.logger.error("There are no libraries visible to the Audiobookshelf provider.")
1685
1686 def _log_no_helper_item_ids(self) -> None:
1687 self.logger.warning(
1688 "Cached item ids are missing. "
1689 "Please trigger a full resync of the Audiobookshelf provider manually."
1690 )
1691