/
/
/
1"""Audiobookshelf (abs) provider for Music Assistant."""
2
3from __future__ import annotations
4
5import asyncio
6import functools
7import itertools
8import time
9from collections.abc import AsyncGenerator, Callable, Coroutine, Sequence
10from contextlib import suppress
11from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar, cast
12
13import aioaudiobookshelf as aioabs
14from aioaudiobookshelf.client.items import LibraryItemExpandedBook as AbsLibraryItemExpandedBook
15from aioaudiobookshelf.client.items import (
16 LibraryItemExpandedPodcast as AbsLibraryItemExpandedPodcast,
17)
18from aioaudiobookshelf.client.items import PlaybackSessionExpanded as AbsPlaybackSessionExpanded
19from aioaudiobookshelf.client.items import PlaybackSessionParameters as AbsPlaybackSessionParameters
20from aioaudiobookshelf.client.session import SyncOpenSessionParameters
21from aioaudiobookshelf.exceptions import LoginError as AbsLoginError
22from aioaudiobookshelf.exceptions import RefreshTokenExpiredError
23from aioaudiobookshelf.exceptions import SessionNotFoundError as AbsSessionNotFoundError
24from aioaudiobookshelf.schema.author import AuthorExpanded
25from aioaudiobookshelf.schema.calls_authors import (
26 AuthorWithItemsAndSeries as AbsAuthorWithItemsAndSeries,
27)
28from aioaudiobookshelf.schema.calls_series import SeriesWithProgress as AbsSeriesWithProgress
29from aioaudiobookshelf.schema.library import (
30 LibraryItemExpanded,
31 LibraryItemExpandedBook,
32 LibraryItemExpandedPodcast,
33 LibraryItemMinifiedPodcast,
34)
35from aioaudiobookshelf.schema.library import LibraryMediaType as AbsLibraryMediaType
36from aioaudiobookshelf.schema.session import DeviceInfo as AbsDeviceInfo
37from aioaudiobookshelf.schema.shelf import (
38 LibraryItemMinifiedPodcast as ShelfLibraryItemMinifiedPodcast,
39)
40from aioaudiobookshelf.schema.shelf import (
41 SeriesShelf,
42 ShelfAuthors,
43 ShelfBook,
44 ShelfEpisode,
45 ShelfLibraryItemMinified,
46 ShelfPodcast,
47 ShelfSeries,
48)
49from aioaudiobookshelf.schema.shelf import ShelfId as AbsShelfId
50from aioaudiobookshelf.schema.shelf import ShelfType as AbsShelfType
51from aiohttp import web
52from music_assistant_models.config_entries import (
53 ConfigEntry,
54 ConfigValueType,
55 ProviderConfig,
56)
57from music_assistant_models.enums import (
58 ConfigEntryType,
59 ContentType,
60 MediaType,
61 ProviderFeature,
62 StreamType,
63)
64from music_assistant_models.errors import LoginFailed, MediaNotFoundError
65from music_assistant_models.media_items import (
66 Audiobook,
67 AudioFormat,
68 BrowseFolder,
69 ItemMapping,
70 MediaItemType,
71 PodcastEpisode,
72 UniqueList,
73)
74from music_assistant_models.media_items.media_item import RecommendationFolder
75from music_assistant_models.streamdetails import MultiPartPath, StreamDetails
76
77from music_assistant.constants import PLAYBACK_REPORT_INTERVAL_SECONDS
78from music_assistant.models.music_provider import MusicProvider
79from music_assistant.providers.audiobookshelf.parsers import (
80 parse_audiobook,
81 parse_podcast,
82 parse_podcast_episode,
83)
84
85from .constants import (
86 ABS_BROWSE_ITEMS_TO_PATH,
87 ABS_SHELF_ID_ICONS,
88 ABS_SHELF_ID_TRANSLATION_KEY,
89 AIOHTTP_TIMEOUT,
90 CACHE_CATEGORY_LIBRARIES,
91 CACHE_KEY_LIBRARIES,
92 CONF_API_TOKEN,
93 CONF_HIDE_EMPTY_PODCASTS,
94 CONF_HLS_FORMATS,
95 CONF_OLD_TOKEN,
96 CONF_PASSWORD,
97 CONF_URL,
98 CONF_USE_HLS,
99 CONF_USERNAME,
100 CONF_VERIFY_SSL,
101 HLS_ALL_FORMATS,
102 HLS_FORMATS_SPLIT,
103 AbsBrowseItemsBookTranslationKey,
104 AbsBrowseItemsPodcastTranslationKey,
105 AbsBrowsePaths,
106)
107from .helpers import LibrariesHelper, LibraryHelper, ProgressGuard, SessionHelper
108
109if TYPE_CHECKING:
110 from aioaudiobookshelf.schema.events_socket import LibraryItemRemoved
111 from aioaudiobookshelf.schema.media_progress import MediaProgress
112 from aioaudiobookshelf.schema.streams import Stream as AbsStream
113 from aioaudiobookshelf.schema.user import User
114 from music_assistant_models.media_items import Podcast
115 from music_assistant_models.provider import ProviderManifest
116
117 from music_assistant.mass import MusicAssistant
118 from music_assistant.models import ProviderInstanceType
119
120SUPPORTED_FEATURES = {
121 ProviderFeature.LIBRARY_PODCASTS,
122 ProviderFeature.LIBRARY_AUDIOBOOKS,
123 ProviderFeature.BROWSE,
124 ProviderFeature.RECOMMENDATIONS,
125}
126
127
128async def setup(
129 mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
130) -> ProviderInstanceType:
131 """Initialize provider(instance) with given configuration."""
132 return Audiobookshelf(mass, manifest, config, SUPPORTED_FEATURES)
133
134
135async def get_config_entries(
136 mass: MusicAssistant,
137 instance_id: str | None = None,
138 action: str | None = None,
139 values: dict[str, ConfigValueType] | None = None,
140) -> tuple[ConfigEntry, ...]:
141 """
142 Return Config entries to setup this provider.
143
144 instance_id: id of an existing provider instance (None if new instance setup).
145 action: [optional] action key called from config entries UI.
146 values: the (intermediate) raw values for config entries sent with the action.
147 """
148 # ruff: noqa: ARG001
149 return (
150 ConfigEntry(
151 key="label",
152 type=ConfigEntryType.LABEL,
153 label="Please provide the address of your Audiobookshelf instance. To authenticate "
154 "you have two options: "
155 "a) Provide username AND password. Leave the API key empty. "
156 "b) Provide ONLY an API key.",
157 ),
158 ConfigEntry(
159 key=CONF_URL,
160 type=ConfigEntryType.STRING,
161 label="Server",
162 required=True,
163 description="The URL of the Audiobookshelf server to connect to. For example "
164 "https://abs.domain.tld/ or http://192.168.1.4:13378/",
165 ),
166 ConfigEntry(
167 key=CONF_USERNAME,
168 type=ConfigEntryType.STRING,
169 label="Username",
170 required=False,
171 description="The username to authenticate to the remote server.",
172 ),
173 ConfigEntry(
174 key=CONF_PASSWORD,
175 type=ConfigEntryType.SECURE_STRING,
176 label="Password",
177 required=False,
178 description="The password to authenticate to the remote server.",
179 ),
180 ConfigEntry(
181 key=CONF_API_TOKEN,
182 type=ConfigEntryType.SECURE_STRING,
183 label="API key _instead_ of user/ password. (ABS version >= 2.26)",
184 required=False,
185 description="Instead of using a username and password, "
186 "you may provide an API key (ABS version >= 2.26). "
187 "Please consult the docs.",
188 ),
189 ConfigEntry(
190 key=CONF_OLD_TOKEN,
191 type=ConfigEntryType.SECURE_STRING,
192 label="old token",
193 required=False,
194 hidden=True,
195 ),
196 ConfigEntry(
197 key=CONF_USE_HLS,
198 type=ConfigEntryType.BOOLEAN,
199 label="Stream via HLS from ABS.",
200 description="Use an HLS stream when streaming from audiobookshelf.",
201 required=False,
202 default_value=False,
203 advanced=True,
204 ),
205 ConfigEntry(
206 key=CONF_HLS_FORMATS,
207 type=ConfigEntryType.STRING,
208 label=f"Use HLS for these file extensions. Separate with ';'. Use {HLS_ALL_FORMATS} for"
209 " all formats.",
210 description="Use HLS only for these file extensions."
211 f" Separate with ;. E.g. m4b or m4b;aac or {HLS_ALL_FORMATS}",
212 required=False,
213 default_value="m4b",
214 advanced=True,
215 ),
216 ConfigEntry(
217 key=CONF_VERIFY_SSL,
218 type=ConfigEntryType.BOOLEAN,
219 label="Verify SSL",
220 required=False,
221 description="Whether or not to verify the certificate of SSL/TLS connections.",
222 advanced=True,
223 default_value=True,
224 ),
225 ConfigEntry(
226 key=CONF_HIDE_EMPTY_PODCASTS,
227 type=ConfigEntryType.BOOLEAN,
228 label="Hide empty podcasts.",
229 required=False,
230 description="This will skip podcasts with no episodes associated.",
231 advanced=True,
232 default_value=False,
233 ),
234 )
235
236
237R = TypeVar("R")
238P = ParamSpec("P")
239
240
241class Audiobookshelf(MusicProvider):
242 """Audiobookshelf MusicProvider."""
243
244 _on_unload_callbacks: list[Callable[[], None]]
245
246 @staticmethod
247 def handle_refresh_token(
248 method: Callable[P, Coroutine[Any, Any, R]],
249 ) -> Callable[P, Coroutine[Any, Any, R]]:
250 """Decorate a method to handle an expired refresh token by relogin."""
251
252 @functools.wraps(method)
253 async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
254 self = cast("Audiobookshelf", args[0])
255 try:
256 return await method(*args, **kwargs)
257 except RefreshTokenExpiredError:
258 self.logger.debug("Refresh token expired. Trying to renew.")
259 await self.reauthenticate()
260 return await method(*args, **kwargs)
261
262 return wrapper
263
264 async def handle_async_init(self) -> None:
265 """Pass config values to client and initialize."""
266 self._on_unload_callbacks: list[Callable[[], None]] = []
267 self.sessions: dict[str, SessionHelper] = {} # key is the mass_item_id
268 self.create_session_lock = asyncio.Lock()
269 base_url = str(self.config.get_value(CONF_URL))
270 username = str(self.config.get_value(CONF_USERNAME))
271 password = str(self.config.get_value(CONF_PASSWORD))
272 token_old = self.config.get_value(CONF_OLD_TOKEN)
273 token_api = self.config.get_value(CONF_API_TOKEN)
274 verify_ssl = bool(self.config.get_value(CONF_VERIFY_SSL))
275 session_config = aioabs.SessionConfiguration(
276 session=self.mass.http_session,
277 url=base_url,
278 verify_ssl=verify_ssl,
279 logger=self.logger,
280 pagination_items_per_page=30, # audible provider goes with 50 for pagination
281 timeout=AIOHTTP_TIMEOUT,
282 )
283 # If we are configured with a non-expiring API key or not.
284 self.is_token_user = False
285 try:
286 if token_api is not None or token_old is not None:
287 _token = token_api if token_api is not None else token_old
288 session_config.token = str(_token)
289 (
290 self._client,
291 self._client_socket,
292 ) = await aioabs.get_user_and_socket_client_by_token(session_config=session_config)
293 self.is_token_user = True
294 else:
295 self._client, self._client_socket = await aioabs.get_user_and_socket_client(
296 session_config=session_config, username=username, password=password
297 )
298 await self._client_socket.init_client()
299 except AbsLoginError as exc:
300 raise LoginFailed(f"Login to abs instance at {base_url} failed.") from exc
301
302 if token_old is not None and token_api is None:
303 # Log Message that the old token won't work
304 _version = self._client.server_settings.version.split(".")
305 if len(_version) >= 2:
306 try:
307 major, minor = int(_version[0]), int(_version[1])
308 except ValueError:
309 major = minor = 0
310 if major >= 2 and minor >= 26:
311 self.logger.warning(
312 """
313
314######## Audiobookshelf API key change #############################################################
315
316Audiobookshelf introduced a new API key system in version 2.26 (JWT).
317You are still using a token configured with a previous version of Audiobookshelf,
318but you are running version %s. This will stop working in a future Audiobookshelf release.
319Please create a non-expiring API Key instead, and update your configuration accordingly.
320Refer to the documentation of Audiobookshelf, https://www.audiobookshelf.org/guides/api-keys/
321and of Music Assistant https://www.music-assistant.io/music-providers/audiobookshelf/
322for more details.
323
324""",
325 self._client.server_settings.version,
326 )
327
328 cached_libraries = await self.mass.cache.get(
329 key=CACHE_KEY_LIBRARIES,
330 provider=self.instance_id,
331 category=CACHE_CATEGORY_LIBRARIES,
332 default=None,
333 )
334 if cached_libraries is None:
335 self.libraries = LibrariesHelper()
336 # We need the library ids for recommendations. If the cache got cleared e.g. by a db
337 # migration, we might end up with empty library helpers on a configured provider. Note,
338 # that the lib item ids are not synced, still only on full provider sync, instead the
339 # sets are empty. Full sync is expensive.
340 # See warning in browse_lib_podcasts / _browse_books
341 libraries = await self._client.get_all_libraries()
342 for library in libraries:
343 if library.media_type == AbsLibraryMediaType.BOOK:
344 self.libraries.audiobooks[library.id_] = LibraryHelper(name=library.name)
345 elif library.media_type == AbsLibraryMediaType.PODCAST:
346 self.libraries.podcasts[library.id_] = LibraryHelper(name=library.name)
347 else:
348 self.libraries = LibrariesHelper.from_dict(cached_libraries)
349
350 # set socket callbacks
351 self._client_socket.set_item_callbacks(
352 on_item_added=self._socket_abs_item_changed,
353 on_item_updated=self._socket_abs_item_changed,
354 on_item_removed=self._socket_abs_item_removed,
355 on_items_added=self._socket_abs_item_changed,
356 on_items_updated=self._socket_abs_item_changed,
357 )
358
359 self._client_socket.set_user_callbacks(
360 on_user_item_progress_updated=self._socket_abs_user_item_progress_updated,
361 )
362
363 self._client_socket.set_refresh_token_expired_callback(
364 on_refresh_token_expired=self._socket_abs_refresh_token_expired
365 )
366
367 self._client_socket.set_stream_callbacks(on_stream_open=self._socket_stream_open)
368
369 # progress guard
370 self.progress_guard = ProgressGuard()
371
372 # safe guard reauthentication
373 self.reauthenticate_lock = asyncio.Lock()
374 self.reauthenticate_last = 0.0
375
376 # register dynamic stream route for audiobook parts
377 self._on_unload_callbacks.append(
378 self.mass.streams.register_dynamic_route(
379 f"/{self.instance_id}_part_stream", self._handle_session_part_request
380 )
381 )
382
383 @handle_refresh_token
384 async def unload(self, is_removed: bool = False) -> None:
385 """
386 Handle unload/close of the provider.
387
388 Called when provider is deregistered (e.g. MA exiting or config reloading).
389 is_removed will be set to True when the provider is removed from the configuration.
390 """
391 await self._client.logout()
392 await self._client_socket.logout()
393 for callback in self._on_unload_callbacks:
394 callback()
395
396 @property
397 def is_streaming_provider(self) -> bool:
398 """Return True if the provider is a streaming provider."""
399 # For streaming providers return True here but for local file based providers return False.
400 return False
401
402 @handle_refresh_token
403 async def sync_library(self, media_type: MediaType) -> None:
404 """Obtain audiobook library ids and podcast library ids."""
405 libraries = await self._client.get_all_libraries()
406 if len(libraries) == 0:
407 self._log_no_libraries()
408 for library in libraries:
409 if library.media_type == AbsLibraryMediaType.BOOK and media_type == MediaType.AUDIOBOOK:
410 self.libraries.audiobooks[library.id_] = LibraryHelper(name=library.name)
411 elif (
412 library.media_type == AbsLibraryMediaType.PODCAST
413 and media_type == MediaType.PODCAST
414 ):
415 self.libraries.podcasts[library.id_] = LibraryHelper(name=library.name)
416 await super().sync_library(media_type)
417 await self._cache_set_helper_libraries()
418
419 # update playlog
420 user = await self._client.get_my_user()
421 await self._set_playlog_from_user(user)
422
423 async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
424 """Retrieve library/subscribed podcasts from the provider.
425
426 Minified podcast information is enough.
427 """
428 for pod_lib_id in self.libraries.podcasts:
429 async for response in self._client.get_library_items(library_id=pod_lib_id):
430 if not response.results:
431 break
432 podcast_ids = [x.id_ for x in response.results]
433 # store uuids
434 self.libraries.podcasts[pod_lib_id].item_ids.update(podcast_ids)
435 for podcast_minified in response.results:
436 assert isinstance(podcast_minified, LibraryItemMinifiedPodcast)
437 mass_podcast = parse_podcast(
438 abs_podcast=podcast_minified,
439 instance_id=self.instance_id,
440 domain=self.domain,
441 token=self._client.token,
442 base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
443 )
444 if (
445 bool(self.config.get_value(CONF_HIDE_EMPTY_PODCASTS))
446 and mass_podcast.total_episodes == 0
447 ):
448 continue
449 yield mass_podcast
450
451 @handle_refresh_token
452 async def _get_abs_expanded_podcast(
453 self, prov_podcast_id: str
454 ) -> AbsLibraryItemExpandedPodcast:
455 abs_podcast = await self._client.get_library_item_podcast(
456 podcast_id=prov_podcast_id, expanded=True
457 )
458 assert isinstance(abs_podcast, AbsLibraryItemExpandedPodcast)
459
460 return abs_podcast
461
462 @handle_refresh_token
463 async def get_podcast(self, prov_podcast_id: str) -> Podcast:
464 """Get single podcast."""
465 abs_podcast = await self._get_abs_expanded_podcast(prov_podcast_id=prov_podcast_id)
466 return parse_podcast(
467 abs_podcast=abs_podcast,
468 instance_id=self.instance_id,
469 domain=self.domain,
470 token=self._client.token,
471 base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
472 )
473
474 async def get_podcast_episodes(
475 self, prov_podcast_id: str
476 ) -> AsyncGenerator[PodcastEpisode, None]:
477 """Get all podcast episodes of podcast.
478
479 Adds progress information.
480 """
481 abs_podcast = await self._get_abs_expanded_podcast(prov_podcast_id=prov_podcast_id)
482 episode_cnt = 1
483 # the user has the progress of all media items
484 # so we use a single api call here to obtain possibly many
485 # progresses for episodes
486 user = await self._client.get_my_user()
487 abs_progresses = {
488 x.episode_id: x
489 for x in user.media_progress
490 if x.episode_id is not None and x.library_item_id == prov_podcast_id
491 }
492 for abs_episode in abs_podcast.media.episodes:
493 progress = abs_progresses.get(abs_episode.id_, None)
494 mass_episode = parse_podcast_episode(
495 episode=abs_episode,
496 prov_podcast_id=prov_podcast_id,
497 fallback_episode_cnt=episode_cnt,
498 instance_id=self.instance_id,
499 domain=self.domain,
500 token=self._client.token,
501 base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
502 media_progress=progress,
503 add_cover=bool(abs_podcast.media.cover_path or False),
504 )
505 yield mass_episode
506 episode_cnt += 1
507
508 @handle_refresh_token
509 async def get_podcast_episode(
510 self, prov_episode_id: str, add_progress: bool = True
511 ) -> PodcastEpisode:
512 """Get single podcast episode."""
513 prov_podcast_id, e_id = prov_episode_id.split(" ")
514 abs_podcast = await self._get_abs_expanded_podcast(prov_podcast_id=prov_podcast_id)
515 episode_cnt = 1
516 for abs_episode in abs_podcast.media.episodes:
517 if abs_episode.id_ == e_id:
518 progress = None
519 if add_progress:
520 progress = await self._client.get_my_media_progress(
521 item_id=prov_podcast_id, episode_id=abs_episode.id_
522 )
523 return parse_podcast_episode(
524 episode=abs_episode,
525 prov_podcast_id=prov_podcast_id,
526 fallback_episode_cnt=episode_cnt,
527 instance_id=self.instance_id,
528 domain=self.domain,
529 token=self._client.token,
530 base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
531 media_progress=progress,
532 add_cover=bool(abs_podcast.media.cover_path or False),
533 )
534
535 episode_cnt += 1
536 raise MediaNotFoundError("Episode not found")
537
538 async def get_library_audiobooks(self) -> AsyncGenerator[Audiobook, None]:
539 """Get Audiobook libraries.
540
541 Need expanded version for chapters.
542 """
543 for book_lib_id in self.libraries.audiobooks:
544 async for response in self._client.get_library_items(library_id=book_lib_id):
545 if not response.results:
546 break
547 book_ids = [x.id_ for x in response.results]
548 # store uuids
549 self.libraries.audiobooks[book_lib_id].item_ids.update(book_ids)
550 # use expanded version for chapters/ caching.
551 books_expanded = await self._client.get_library_item_batch_book(item_ids=book_ids)
552 for book_expanded in books_expanded:
553 # If the book has no audiofiles, we skip -> ebook only.
554 if len(book_expanded.media.tracks) == 0:
555 continue
556 mass_audiobook = parse_audiobook(
557 abs_audiobook=book_expanded,
558 instance_id=self.instance_id,
559 domain=self.domain,
560 token=self._client.token,
561 base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
562 )
563 yield mass_audiobook
564
565 @handle_refresh_token
566 async def _get_abs_expanded_audiobook(
567 self, prov_audiobook_id: str
568 ) -> AbsLibraryItemExpandedBook:
569 abs_audiobook = await self._client.get_library_item_book(
570 book_id=prov_audiobook_id, expanded=True
571 )
572 assert isinstance(abs_audiobook, AbsLibraryItemExpandedBook)
573
574 return abs_audiobook
575
576 @handle_refresh_token
577 async def get_audiobook(self, prov_audiobook_id: str) -> Audiobook:
578 """Get a single audiobook.
579
580 Progress is added here.
581 """
582 progress = await self._client.get_my_media_progress(item_id=prov_audiobook_id)
583 abs_audiobook = await self._get_abs_expanded_audiobook(prov_audiobook_id=prov_audiobook_id)
584 return parse_audiobook(
585 abs_audiobook=abs_audiobook,
586 instance_id=self.instance_id,
587 domain=self.domain,
588 token=self._client.token,
589 base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
590 media_progress=progress,
591 )
592
593 async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
594 """Get stream of item."""
595 # We always create a playback session. The default is direct playback.
596 # In that case, session.tracks holds the exact same as the audiobook/ podcast.track,
597 # so we only use the session to update our progress.
598 #
599 # In the case of hls the session has an hls stream as track.
600 if media_type in (MediaType.PODCAST_EPISODE, MediaType.AUDIOBOOK):
601 session = await self._get_playback_session(mass_item_id=item_id)
602 return await self._get_stream_details_session(
603 session, session_helper=self.sessions[item_id], media_type=media_type
604 )
605 raise MediaNotFoundError("Stream unknown")
606
607 async def _get_stream_details_session(
608 self,
609 abs_session: AbsPlaybackSessionExpanded,
610 session_helper: SessionHelper,
611 media_type: MediaType,
612 ) -> StreamDetails:
613 """Streamdetails audiobook.
614
615 We always use a custom stream type, also for single file, such
616 that we can handle an ffmpeg error and refresh our tokens.
617 """
618 abs_base_url = str(self.config.get_value(CONF_URL))
619 tracks = abs_session.audio_tracks
620
621 if len(tracks) == 0:
622 raise MediaNotFoundError("Session has no tracks.")
623
624 content_type = ContentType.UNKNOWN
625 if abs_session.audio_tracks[0].metadata is not None:
626 content_type = ContentType.try_parse(abs_session.audio_tracks[0].metadata.ext)
627
628 file_parts: list[MultiPartPath] = []
629 if self.is_token_user:
630 self.logger.debug("Token User - Streams are direct.")
631 for idx, track in enumerate(tracks):
632 if self.is_token_user:
633 # an api key is long-lived
634 stream_url = f"{abs_base_url}{track.content_url}?token={self._client.token}"
635 else:
636 # to ensure token is always valid, we create a dynamic url
637 # this ensures that we always get a fresh token on each part
638 # without having to deal with a custom stream etc.
639 # we also use this for a single track/ hls stream, otherwise we can't seek
640 stream_url = (
641 f"{self.mass.streams.base_url}/{self.instance_id}_part_stream?"
642 f"session_id={abs_session.id_}&part_id={idx}"
643 )
644 file_parts.append(MultiPartPath(path=stream_url, duration=track.duration))
645
646 stream_type = StreamType.HLS if "hls" in file_parts[0].path else StreamType.HTTP
647 if stream_type == StreamType.HLS:
648 # wait for stream to be ready
649 try:
650 await asyncio.wait_for(session_helper.hls_stream_open.wait(), 10)
651 except TimeoutError:
652 self.logger.warning(
653 "Did not receive HLS stream open event after 10s, continuing anyways."
654 )
655
656 return StreamDetails(
657 provider=self.instance_id,
658 item_id=abs_session.id_,
659 audio_format=AudioFormat(content_type=content_type),
660 media_type=media_type,
661 stream_type=stream_type,
662 duration=int(abs_session.duration),
663 path=file_parts[0].path if len(file_parts) == 1 else file_parts,
664 can_seek=True,
665 allow_seek=True,
666 )
667
668 async def _get_playback_session(self, mass_item_id: str) -> AbsPlaybackSessionExpanded:
669 """Either creates or returns an open abs session."""
670 async with self.create_session_lock:
671 # check for an available open session
672 if session_helper := self.sessions.get(mass_item_id):
673 # reset here, as this is our "time listened".
674 session_helper.last_sync_time = time.time()
675 with suppress(AbsSessionNotFoundError):
676 return await self._client.get_open_session(
677 session_id=session_helper.abs_session_id
678 )
679
680 item_ids = mass_item_id.split(" ")
681 abs_item_id = item_ids[0]
682 episode_id = item_ids[1] if len(item_ids) == 2 else None
683
684 # Create a new session
685 ## Check HLS usage
686 use_hls = bool(self.config.get_value(CONF_USE_HLS))
687 hls_formats = str(self.config.get_value(CONF_HLS_FORMATS))
688 if use_hls and hls_formats != HLS_ALL_FORMATS:
689 use_hls = False # only for certain formats
690 extensions = [x.lstrip(".") for x in hls_formats.split(HLS_FORMATS_SPLIT)]
691 if episode_id is None:
692 if (
693 metadata := (await self._get_abs_expanded_audiobook(abs_item_id))
694 .media.tracks[0]
695 .metadata
696 ):
697 if metadata.ext.lstrip(".") in extensions:
698 use_hls = True
699 else:
700 podcast = await self._get_abs_expanded_podcast(prov_podcast_id=abs_item_id)
701 episode = None
702 for episode in podcast.media.episodes:
703 if episode.id_ == episode_id:
704 break
705 if episode and (metadata := episode.audio_track.metadata):
706 if metadata.ext.lstrip(".") in extensions:
707 use_hls = True
708
709 client_name = f"Music Assistant {self.instance_id}"
710 device_info = AbsDeviceInfo(
711 device_id=self.instance_id,
712 client_name=client_name,
713 client_version=self.mass.version,
714 manufacturer="",
715 model=self.mass.server_id,
716 )
717
718 session = await self._client.get_playback_session(
719 # These parameters give an hls if we don't enforce direct play stream,
720 # which is only a concat of the individual file's at abs
721 session_parameters=AbsPlaybackSessionParameters(
722 device_info=device_info,
723 force_direct_play=not use_hls,
724 force_transcode=use_hls,
725 # mimetypes are only checked for abs' internal "should transcode
726 # see https://github.com/advplyr/audiobookshelf/blob/master/server/managers/PlaybackSessionManager.js
727 supported_mime_types=[],
728 media_player=client_name,
729 ),
730 item_id=abs_item_id,
731 episode_id=episode_id,
732 )
733
734 if use_hls:
735 # Safety check.
736 track_url = session.audio_tracks[0].content_url
737 if track_url.split("/")[1] != "hls":
738 raise MediaNotFoundError("Did expect HLS stream for session playback")
739 self.logger.debug("Using an HLS stream for playback.")
740
741 self.sessions[mass_item_id] = SessionHelper(
742 abs_session_id=session.id_,
743 last_sync_time=time.time(),
744 hls_stream_open=asyncio.Event(),
745 )
746 return session
747
748 @handle_refresh_token
749 async def _handle_session_part_request(self, request: web.Request) -> web.Response:
750 """
751 Handle dynamic audiobook part stream request.
752
753 We redirect to the actual stream url with token.
754 This is done because the token might expire, so we need to
755 generate a fresh url on each part.
756 """
757 if not (session_id := request.query.get("session_id")):
758 return web.Response(status=400, text="Missing session_id")
759 if not (part_id := request.query.get("part_id")):
760 return web.Response(status=400, text="Missing part_id")
761 self.logger.debug(
762 "Handling session part request for session %s and part %s", session_id, part_id
763 )
764 try:
765 abs_session = await self._client.get_open_session(session_id=session_id)
766 except AbsSessionNotFoundError as err:
767 raise web.HTTPNotFound from err
768 part_id = int(part_id) # type: ignore[assignment]
769 try:
770 part_track = abs_session.audio_tracks[part_id]
771 except IndexError:
772 return web.Response(status=404, text="Part not found")
773
774 base_url = str(self.config.get_value(CONF_URL))
775 stream_url = f"{base_url}{part_track.content_url}?token={self._client.token}"
776 # redirect to the actual stream url
777 raise web.HTTPFound(location=stream_url)
778
779 @handle_refresh_token
780 async def get_resume_position(self, item_id: str, media_type: MediaType) -> tuple[bool, int]:
781 """Return finished:bool, position_ms: int."""
782 # this method is called _before_ get_stream_details, so the playback session
783 # is created here.
784 session = await self._get_playback_session(mass_item_id=item_id)
785 finished = session.current_time > session.duration - PLAYBACK_REPORT_INTERVAL_SECONDS
786 self.logger.debug("Resume position: obtained.")
787 return finished, int(session.current_time * 1000)
788
789 @handle_refresh_token
790 async def recommendations(self) -> list[RecommendationFolder]:
791 """Get recommendations."""
792 # We have to avoid "flooding" the home page, which becomes especially troublesome if users
793 # have multiple libraries. Instead we collect per ShelfId, and make sure, that we always get
794 # roughly the same amount of items per row, no matter the amount of libraries
795 # List of list (one list per lib) here, such that we can pick the items per lib later.
796 items_by_shelf_id: dict[AbsShelfId, list[list[MediaItemType | BrowseFolder]]] = {}
797
798 all_libraries = {**self.libraries.audiobooks, **self.libraries.podcasts}
799 max_items_per_row = 20
800 num_libraries = len(all_libraries)
801
802 if num_libraries == 0:
803 self._log_no_libraries()
804 return []
805
806 limit_items_per_lib = max_items_per_row // num_libraries
807 limit_items_per_lib = 1 if limit_items_per_lib == 0 else limit_items_per_lib
808
809 for library_id in all_libraries:
810 shelves = await self._client.get_library_personalized_view(
811 library_id=library_id, limit=limit_items_per_lib
812 )
813 await self._recommendations_iter_shelves(shelves, library_id, items_by_shelf_id)
814
815 folders: list[RecommendationFolder] = []
816 for shelf_id, item_lists in items_by_shelf_id.items():
817 # we have something like [[A, B], [C, D, E], [F]]
818 # and want [A, C, F, B, D, E]
819 recommendation_items = [
820 x
821 for x in itertools.chain.from_iterable(itertools.zip_longest(*item_lists))
822 if x is not None
823 ][:max_items_per_row]
824
825 # shelf ids follow pattern:
826 # recently-added
827 # newest-episodes
828 # etc
829 name = f"{shelf_id.capitalize().replace('-', ' ')}"
830 folders.append(
831 RecommendationFolder(
832 item_id=f"{shelf_id}",
833 name=name,
834 icon=ABS_SHELF_ID_ICONS.get(shelf_id),
835 translation_key=ABS_SHELF_ID_TRANSLATION_KEY.get(shelf_id),
836 items=UniqueList(recommendation_items),
837 provider=self.instance_id,
838 )
839 )
840
841 # Browse "recommendation" for convenience. If the user has
842 # multiple audiobook libraries, we return a listing of them.
843 # If there is only a single audiobook library, we add the folders
844 # from _browse_lib_audiobooks, i.e. Authors, Narrators etc.
845 # Podcast libs do not have filter folders, so always the root folders.
846 browse_items: list[MediaItemType | BrowseFolder] = []
847 translation_key = "libraries"
848 if len(self.libraries.audiobooks) <= 1:
849 if len(self.libraries.podcasts) == 0:
850 translation_key = "library"
851
852 # audiobooklibs are first, and we have at max 1 audiobook lib
853 _browse_root = self._browse_root(append_mediatype_suffix=False)
854 if len(self.libraries.audiobooks) == 0:
855 browse_items.extend(_browse_root)
856 else:
857 assert isinstance(_browse_root[0], BrowseFolder)
858 _path = _browse_root[0].path
859 browse_items.extend(self._browse_lib_audiobooks(current_path=_path))
860 # add podcast roots
861 browse_items.extend(_browse_root[1:])
862 else:
863 browse_items = list(self._browse_root())
864
865 folders.append(
866 RecommendationFolder(
867 item_id="browse",
868 name="Libraries",
869 icon="mdi-bookshelf",
870 translation_key=translation_key,
871 items=UniqueList(browse_items),
872 provider=self.instance_id,
873 )
874 )
875
876 return folders
877
878 async def _recommendations_iter_shelves(
879 self,
880 shelves: list[ShelfBook | ShelfPodcast | ShelfAuthors | ShelfEpisode | ShelfSeries],
881 library_id: str,
882 items_by_shelf_id: dict[AbsShelfId, list[list[MediaItemType | BrowseFolder]]],
883 ) -> None:
884 # ruff: noqa: PLR0915
885 for shelf in shelves:
886 media_type: MediaType
887 match shelf.type_:
888 case AbsShelfType.PODCAST:
889 media_type = MediaType.PODCAST
890 case AbsShelfType.EPISODE:
891 media_type = MediaType.PODCAST_EPISODE
892 case AbsShelfType.BOOK:
893 media_type = MediaType.AUDIOBOOK
894 case AbsShelfType.SERIES | AbsShelfType.AUTHORS:
895 media_type = MediaType.FOLDER
896 case _:
897 # this would be authors, currently
898 continue
899
900 items: list[MediaItemType | BrowseFolder] = []
901 # Recently added is the _only_ case, where we get a full podcast
902 # We have a podcast object with only the episodes matching the
903 # shelf.id_ otherwise.
904 match shelf.id_:
905 case (
906 AbsShelfId.RECENTLY_ADDED
907 | AbsShelfId.LISTEN_AGAIN
908 | AbsShelfId.DISCOVER
909 | AbsShelfId.NEWEST_EPISODES
910 | AbsShelfId.CONTINUE_LISTENING
911 ):
912 for entity in shelf.entities:
913 assert isinstance(entity, ShelfLibraryItemMinified)
914 item: MediaItemType | None = None
915 if media_type in [MediaType.PODCAST, MediaType.AUDIOBOOK]:
916 item = await self.mass.music.get_library_item_by_prov_id(
917 media_type=media_type,
918 provider_instance_id_or_domain=self.instance_id,
919 item_id=entity.id_,
920 )
921 elif media_type == MediaType.PODCAST_EPISODE:
922 podcast_id = entity.id_
923 if entity.recent_episode is None:
924 continue
925 _add_cover = False
926 if isinstance(entity, ShelfLibraryItemMinifiedPodcast):
927 _add_cover = bool(entity.media.cover_path or False)
928 # we only have a PodcastEpisode here, with limited information
929 item = parse_podcast_episode(
930 episode=entity.recent_episode,
931 prov_podcast_id=podcast_id,
932 instance_id=self.instance_id,
933 domain=self.domain,
934 token=self._client.token,
935 base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
936 add_cover=_add_cover,
937 )
938 if item is not None:
939 items.append(item)
940 case AbsShelfId.RECENT_SERIES | AbsShelfId.CONTINUE_SERIES:
941 # We jump into a browse folder here if we have SeriesShelf, set path up as if
942 # browse function used.
943 if isinstance(shelf, ShelfSeries):
944 for entity in shelf.entities:
945 assert isinstance(entity, SeriesShelf)
946 if len(entity.books) == 0:
947 continue
948 path = (
949 f"{self.instance_id}://"
950 f"{AbsBrowsePaths.LIBRARIES_BOOK} {library_id}/"
951 f"{AbsBrowsePaths.SERIES}/{entity.id_}"
952 )
953 items.append(
954 BrowseFolder(
955 item_id=entity.id_,
956 name=entity.name,
957 provider=self.instance_id,
958 path=path,
959 )
960 )
961 elif isinstance(shelf, ShelfBook) and media_type == MediaType.AUDIOBOOK:
962 # Single books, must be audiobooks
963 for entity in shelf.entities:
964 item = await self.mass.music.get_library_item_by_prov_id(
965 media_type=media_type,
966 provider_instance_id_or_domain=self.instance_id,
967 item_id=entity.id_,
968 )
969 if item is not None:
970 items.append(item)
971 case AbsShelfId.NEWEST_AUTHORS:
972 # same as for series, use a folder
973 for entity in shelf.entities:
974 assert isinstance(entity, AuthorExpanded)
975 if entity.num_books == 0:
976 continue
977 path = (
978 f"{self.instance_id}://"
979 f"{AbsBrowsePaths.LIBRARIES_BOOK} {library_id}/"
980 f"{AbsBrowsePaths.AUTHORS}/{entity.id_}"
981 )
982 items.append(
983 BrowseFolder(
984 item_id=entity.id_,
985 name=entity.name,
986 provider=self.instance_id,
987 path=path,
988 )
989 )
990 if not items:
991 continue
992
993 # add collected items
994 assert isinstance(shelf.id_, AbsShelfId)
995 items_collected = items_by_shelf_id.get(shelf.id_, [])
996 items_collected.append(items)
997 items_by_shelf_id[shelf.id_] = items_collected
998
999 @handle_refresh_token
1000 async def on_played(
1001 self,
1002 media_type: MediaType,
1003 prov_item_id: str,
1004 fully_played: bool,
1005 position: int,
1006 media_item: MediaItemType,
1007 is_playing: bool = False,
1008 ) -> None:
1009 """Update progress in Audiobookshelf.
1010
1011 In our case media_type may have 3 values:
1012 - PODCAST
1013 - PODCAST_EPISODE
1014 - AUDIOBOOK
1015 We ignore PODCAST (function is called on adding a podcast with position=None)
1016
1017 """
1018
1019 async def _update_by_session(session_helper: SessionHelper, duration: int) -> bool:
1020 now = time.time()
1021 time_listened = now - session_helper.last_sync_time
1022 if time_listened > PLAYBACK_REPORT_INTERVAL_SECONDS + 3:
1023 # See player_queues controller, we get an update every 30s, and immediately on pause
1024 # or play.
1025 # We reset above 33, as this indicates a trigger after a longer absence and should
1026 # not count into abs' statistics
1027 self.logger.debug("Resetting time_listened due to longer absence.")
1028 time_listened = 0.0
1029 try:
1030 await self._client.sync_open_session(
1031 session_id=session_helper.abs_session_id,
1032 parameters=SyncOpenSessionParameters(
1033 current_time=position,
1034 time_listened=time_listened,
1035 duration=duration,
1036 ),
1037 )
1038 session_helper.last_sync_time = now
1039 self.logger.debug("Synced playback session, position %s s.", position)
1040 return True
1041 except AbsSessionNotFoundError:
1042 self.logger.error("Was unable to sync session.")
1043 return False
1044
1045 if media_type == MediaType.PODCAST_EPISODE:
1046 abs_podcast_id, abs_episode_id = prov_item_id.split(" ")
1047
1048 # guard, see progress guard class docstrings for explanation
1049 if not self.progress_guard.guard_ok_mass(
1050 item_id=abs_podcast_id, episode_id=abs_episode_id
1051 ):
1052 return
1053 self.progress_guard.add_progress(item_id=abs_podcast_id, episode_id=abs_episode_id)
1054
1055 if media_item is None or not isinstance(media_item, PodcastEpisode):
1056 return
1057
1058 if fully_played and position < media_item.duration - PLAYBACK_REPORT_INTERVAL_SECONDS:
1059 # faulty position update
1060 # occurs sometimes, if a player disconnects unexpectedly, or reports
1061 # a false position - seen this for MC players, but not for sendspin
1062 return
1063
1064 if position == 0 and not fully_played:
1065 # marked unplayed
1066 mp = await self._client.get_my_media_progress(
1067 item_id=abs_podcast_id, episode_id=abs_episode_id
1068 )
1069 if mp is not None:
1070 await self._client.remove_my_media_progress(media_progress_id=mp.id_)
1071 self.logger.debug(f"Removed media progress of {media_type.value}.")
1072 return
1073
1074 duration = media_item.duration
1075 updated = False
1076 if session_helper := self.sessions.get(prov_item_id):
1077 updated = await _update_by_session(session_helper=session_helper, duration=duration)
1078 if not updated:
1079 self.logger.debug(
1080 f"Updating media progress of {media_type.value}, title {media_item.name}."
1081 )
1082 await self._client.update_my_media_progress(
1083 item_id=abs_podcast_id,
1084 episode_id=abs_episode_id,
1085 duration_seconds=duration,
1086 progress_seconds=position,
1087 is_finished=fully_played,
1088 )
1089
1090 if media_type == MediaType.AUDIOBOOK:
1091 # guard, see progress guard class docstrings for explanation
1092 if not self.progress_guard.guard_ok_mass(item_id=prov_item_id):
1093 return
1094 self.progress_guard.add_progress(item_id=prov_item_id)
1095
1096 if media_item is None or not isinstance(media_item, Audiobook):
1097 return
1098
1099 if fully_played and position < media_item.duration - PLAYBACK_REPORT_INTERVAL_SECONDS:
1100 # faulty position update, see above
1101 return
1102
1103 if position == 0 and not fully_played:
1104 # marked unplayed
1105 mp = await self._client.get_my_media_progress(item_id=prov_item_id)
1106 if mp is not None:
1107 await self._client.remove_my_media_progress(media_progress_id=mp.id_)
1108 self.logger.debug(f"Removed media progress of {media_type.value}.")
1109 return
1110
1111 duration = media_item.duration
1112 updated = False
1113 if session_helper := self.sessions.get(prov_item_id):
1114 updated = await _update_by_session(session_helper=session_helper, duration=duration)
1115 if not updated:
1116 self.logger.debug(f"Updating {media_type.value} named {media_item.name} progress")
1117 await self._client.update_my_media_progress(
1118 item_id=prov_item_id,
1119 duration_seconds=duration,
1120 progress_seconds=position,
1121 is_finished=fully_played,
1122 )
1123
1124 @handle_refresh_token
1125 async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
1126 """Browse for audiobookshelf.
1127
1128 Generates this view:
1129 Library_Name_A (Audiobooks)
1130 Audiobooks
1131 Audiobook_1
1132 Audiobook_2
1133 Series
1134 Series_1
1135 Audiobook_1
1136 Audiobook_2
1137 Series_2
1138 Audiobook_3
1139 Audiobook_4
1140 Collections
1141 Collection_1
1142 Audiobook_1
1143 Audiobook_2
1144 Collection_2
1145 Audiobook_3
1146 Audiobook_4
1147 Authors
1148 Author_1
1149 Series_1
1150 Audiobook_1
1151 Audiobook_2
1152 Author_2
1153 Audiobook_3
1154 Library_Name_B (Podcasts)
1155 Podcast_1
1156 Podcast_2
1157 """
1158 item_path = path.split("://", 1)[1]
1159 if not item_path:
1160 return self._browse_root()
1161 sub_path = item_path.split("/")
1162 lib_key, lib_id = sub_path[0].split(" ")
1163 if len(sub_path) == 1:
1164 if lib_key == AbsBrowsePaths.LIBRARIES_PODCAST:
1165 return await self._browse_lib_podcasts(library_id=lib_id)
1166 return self._browse_lib_audiobooks(current_path=path)
1167 if len(sub_path) == 2:
1168 item_key = sub_path[1]
1169 match item_key:
1170 case AbsBrowsePaths.AUTHORS:
1171 return await self._browse_authors(current_path=path, library_id=lib_id)
1172 case AbsBrowsePaths.NARRATORS:
1173 return await self._browse_narrators(current_path=path, library_id=lib_id)
1174 case AbsBrowsePaths.SERIES:
1175 return await self._browse_series(current_path=path, library_id=lib_id)
1176 case AbsBrowsePaths.COLLECTIONS:
1177 return await self._browse_collections(current_path=path, library_id=lib_id)
1178 case AbsBrowsePaths.AUDIOBOOKS:
1179 return await self._browse_books(library_id=lib_id)
1180 elif len(sub_path) == 3:
1181 item_key, item_id = sub_path[1:3]
1182 match item_key:
1183 case AbsBrowsePaths.AUTHORS:
1184 return await self._browse_author_books(current_path=path, author_id=item_id)
1185 case AbsBrowsePaths.NARRATORS:
1186 return await self._browse_narrator_books(
1187 library_id=lib_id, narrator_filter_str=item_id
1188 )
1189 case AbsBrowsePaths.SERIES:
1190 return await self._browse_series_books(series_id=item_id)
1191 case AbsBrowsePaths.COLLECTIONS:
1192 return await self._browse_collection_books(collection_id=item_id)
1193 elif len(sub_path) == 4:
1194 # series within author
1195 series_id = sub_path[3]
1196 return await self._browse_series_books(series_id=series_id)
1197 return []
1198
1199 def _browse_root(self, append_mediatype_suffix: bool = True) -> Sequence[BrowseFolder]:
1200 items = []
1201
1202 def _get_folder(
1203 path: str, lib_id: str, lib_name: str, translation_key: str | None = None
1204 ) -> BrowseFolder:
1205 return BrowseFolder(
1206 item_id=lib_id,
1207 name=lib_name,
1208 translation_key=translation_key, # if given, <name>: <translation> in frontend
1209 provider=self.instance_id,
1210 path=f"{self.instance_id}://{path}",
1211 )
1212
1213 if len(self.libraries.audiobooks) == 0 and len(self.libraries.podcasts) == 0:
1214 self._log_no_libraries()
1215 return []
1216
1217 translation_key: str | None
1218 for lib_id, lib in self.libraries.audiobooks.items():
1219 path = f"{AbsBrowsePaths.LIBRARIES_BOOK} {lib_id}"
1220 translation_key = None
1221 if append_mediatype_suffix:
1222 translation_key = AbsBrowseItemsBookTranslationKey.AUDIOBOOKS
1223 items.append(
1224 _get_folder(path, lib_id, lib_name=lib.name, translation_key=translation_key)
1225 )
1226 for lib_id, lib in self.libraries.podcasts.items():
1227 path = f"{AbsBrowsePaths.LIBRARIES_PODCAST} {lib_id}"
1228 translation_key = None
1229 if append_mediatype_suffix:
1230 translation_key = AbsBrowseItemsPodcastTranslationKey.PODCASTS
1231 items.append(
1232 _get_folder(path, lib_id, lib_name=lib.name, translation_key=translation_key)
1233 )
1234 return items
1235
1236 async def _browse_lib_podcasts(self, library_id: str) -> list[MediaItemType]:
1237 """No sub categories for podcasts."""
1238 if len(self.libraries.podcasts[library_id].item_ids) == 0:
1239 self._log_no_helper_item_ids()
1240 items = []
1241 for podcast_id in self.libraries.podcasts[library_id].item_ids:
1242 mass_item = await self.mass.music.get_library_item_by_prov_id(
1243 media_type=MediaType.PODCAST,
1244 item_id=podcast_id,
1245 provider_instance_id_or_domain=self.instance_id,
1246 )
1247 if mass_item is not None:
1248 items.append(mass_item)
1249 return sorted(items, key=lambda x: x.name)
1250
1251 def _browse_lib_audiobooks(self, current_path: str) -> Sequence[BrowseFolder]:
1252 items = []
1253 for translation_key in AbsBrowseItemsBookTranslationKey:
1254 path = current_path + "/" + ABS_BROWSE_ITEMS_TO_PATH[translation_key]
1255 items.append(
1256 BrowseFolder(
1257 item_id=translation_key.lower(),
1258 name="", # use translation key
1259 translation_key=translation_key,
1260 provider=self.instance_id,
1261 path=path,
1262 )
1263 )
1264 return items
1265
1266 async def _browse_authors(self, current_path: str, library_id: str) -> Sequence[BrowseFolder]:
1267 abs_authors = await self._client.get_library_authors(library_id=library_id)
1268 items = []
1269 for author in abs_authors:
1270 path = f"{current_path}/{author.id_}"
1271 items.append(
1272 BrowseFolder(
1273 item_id=author.id_,
1274 name=author.name,
1275 provider=self.instance_id,
1276 path=path,
1277 )
1278 )
1279
1280 return sorted(items, key=lambda x: x.name)
1281
1282 async def _browse_narrators(self, current_path: str, library_id: str) -> Sequence[BrowseFolder]:
1283 abs_narrators = await self._client.get_library_narrators(library_id=library_id)
1284 items = []
1285 for narrator in abs_narrators:
1286 path = f"{current_path}/{narrator.id_}"
1287 items.append(
1288 BrowseFolder(
1289 item_id=narrator.id_,
1290 name=narrator.name,
1291 provider=self.instance_id,
1292 path=path,
1293 )
1294 )
1295
1296 return sorted(items, key=lambda x: x.name)
1297
1298 async def _browse_series(self, current_path: str, library_id: str) -> Sequence[BrowseFolder]:
1299 items = []
1300 async for response in self._client.get_library_series(library_id=library_id):
1301 if not response.results:
1302 break
1303 for abs_series in response.results:
1304 path = f"{current_path}/{abs_series.id_}"
1305 items.append(
1306 BrowseFolder(
1307 item_id=abs_series.id_,
1308 name=abs_series.name,
1309 provider=self.instance_id,
1310 path=path,
1311 )
1312 )
1313
1314 return sorted(items, key=lambda x: x.name)
1315
1316 async def _browse_collections(
1317 self, current_path: str, library_id: str
1318 ) -> Sequence[BrowseFolder]:
1319 items = []
1320 async for response in self._client.get_library_collections(library_id=library_id):
1321 if not response.results:
1322 break
1323 for abs_collection in response.results:
1324 path = f"{current_path}/{abs_collection.id_}"
1325 items.append(
1326 BrowseFolder(
1327 item_id=abs_collection.id_,
1328 name=abs_collection.name,
1329 provider=self.instance_id,
1330 path=path,
1331 )
1332 )
1333 return sorted(items, key=lambda x: x.name)
1334
1335 async def _browse_books(self, library_id: str) -> Sequence[MediaItemType]:
1336 if len(self.libraries.audiobooks[library_id].item_ids) == 0:
1337 self._log_no_helper_item_ids()
1338 items = []
1339 for book_id in self.libraries.audiobooks[library_id].item_ids:
1340 mass_item = await self.mass.music.get_library_item_by_prov_id(
1341 media_type=MediaType.AUDIOBOOK,
1342 item_id=book_id,
1343 provider_instance_id_or_domain=self.instance_id,
1344 )
1345 if mass_item is not None:
1346 items.append(mass_item)
1347 return sorted(items, key=lambda x: x.name)
1348
1349 async def _browse_author_books(
1350 self, current_path: str, author_id: str
1351 ) -> Sequence[MediaItemType | BrowseFolder]:
1352 items: list[MediaItemType | BrowseFolder] = []
1353
1354 abs_author = await self._client.get_author(
1355 author_id=author_id, include_items=True, include_series=True
1356 )
1357 if not isinstance(abs_author, AbsAuthorWithItemsAndSeries):
1358 raise TypeError("Unexpected type of author.")
1359
1360 book_ids = {x.id_ for x in abs_author.library_items}
1361 series_book_ids = set()
1362
1363 for series in abs_author.series:
1364 series_book_ids.update([x.id_ for x in series.items])
1365 path = f"{current_path}/{series.id_}"
1366 items.append(
1367 BrowseFolder(
1368 item_id=series.id_,
1369 # frontend does <name>: <translation>
1370 name=series.name,
1371 translation_key="series_singular",
1372 provider=self.instance_id,
1373 path=path,
1374 )
1375 )
1376 book_ids = book_ids.difference(series_book_ids)
1377 for book_id in book_ids:
1378 mass_item = await self.mass.music.get_library_item_by_prov_id(
1379 media_type=MediaType.AUDIOBOOK,
1380 item_id=book_id,
1381 provider_instance_id_or_domain=self.instance_id,
1382 )
1383 if mass_item is not None:
1384 items.append(mass_item)
1385
1386 return items
1387
1388 async def _browse_narrator_books(
1389 self, library_id: str, narrator_filter_str: str
1390 ) -> Sequence[MediaItemType]:
1391 items: list[MediaItemType] = []
1392 async for response in self._client.get_library_items(
1393 library_id=library_id, filter_str=f"narrators.{narrator_filter_str}"
1394 ):
1395 if not response.results:
1396 break
1397 for item in response.results:
1398 mass_item = await self.mass.music.get_library_item_by_prov_id(
1399 media_type=MediaType.AUDIOBOOK,
1400 item_id=item.id_,
1401 provider_instance_id_or_domain=self.instance_id,
1402 )
1403 if mass_item is not None:
1404 items.append(mass_item)
1405
1406 return sorted(items, key=lambda x: x.name)
1407
1408 async def _browse_series_books(self, series_id: str) -> Sequence[MediaItemType]:
1409 items = []
1410
1411 abs_series = await self._client.get_series(series_id=series_id, include_progress=True)
1412 if not isinstance(abs_series, AbsSeriesWithProgress):
1413 raise TypeError("Unexpected series type.")
1414
1415 for book_id in abs_series.progress.library_item_ids:
1416 # these are sorted in abs by sequence
1417 mass_item = await self.mass.music.get_library_item_by_prov_id(
1418 media_type=MediaType.AUDIOBOOK,
1419 item_id=book_id,
1420 provider_instance_id_or_domain=self.instance_id,
1421 )
1422 if mass_item is not None:
1423 items.append(mass_item)
1424
1425 return items
1426
1427 async def _browse_collection_books(self, collection_id: str) -> Sequence[MediaItemType]:
1428 items = []
1429 abs_collection = await self._client.get_collection(collection_id=collection_id)
1430 for book in abs_collection.books:
1431 mass_item = await self.mass.music.get_library_item_by_prov_id(
1432 media_type=MediaType.AUDIOBOOK,
1433 item_id=book.id_,
1434 provider_instance_id_or_domain=self.instance_id,
1435 )
1436 if mass_item is not None:
1437 items.append(mass_item)
1438 return items
1439
1440 async def _socket_abs_item_changed(
1441 self, items: LibraryItemExpanded | list[LibraryItemExpanded]
1442 ) -> None:
1443 """For added and updated."""
1444 abs_items = [items] if isinstance(items, LibraryItemExpanded) else items
1445 for abs_item in abs_items:
1446 if isinstance(abs_item, LibraryItemExpandedBook):
1447 # If the book has no audiofiles, we skip -> ebook only.
1448 if len(abs_item.media.tracks) == 0:
1449 continue
1450 self.logger.debug(
1451 'Updated book "%s" via socket.', abs_item.media.metadata.title or ""
1452 )
1453 await self.mass.music.audiobooks.add_item_to_library(
1454 parse_audiobook(
1455 abs_audiobook=abs_item,
1456 instance_id=self.instance_id,
1457 domain=self.domain,
1458 token=self._client.token,
1459 base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
1460 ),
1461 overwrite_existing=True,
1462 )
1463 lib = self.libraries.audiobooks.get(abs_item.library_id, None)
1464 if lib is not None:
1465 lib.item_ids.add(abs_item.id_)
1466 elif isinstance(abs_item, LibraryItemExpandedPodcast):
1467 self.logger.debug(
1468 'Updated podcast "%s" via socket.', abs_item.media.metadata.title or ""
1469 )
1470 mass_podcast = parse_podcast(
1471 abs_podcast=abs_item,
1472 instance_id=self.instance_id,
1473 domain=self.domain,
1474 token=self._client.token,
1475 base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
1476 )
1477 if not (
1478 bool(self.config.get_value(CONF_HIDE_EMPTY_PODCASTS))
1479 and mass_podcast.total_episodes == 0
1480 ):
1481 await self.mass.music.podcasts.add_item_to_library(
1482 mass_podcast,
1483 overwrite_existing=True,
1484 )
1485 lib = self.libraries.podcasts.get(abs_item.library_id, None)
1486 if lib is not None:
1487 lib.item_ids.add(abs_item.id_)
1488 await self._cache_set_helper_libraries()
1489
1490 async def _socket_abs_item_removed(self, item: LibraryItemRemoved) -> None:
1491 """Item removed."""
1492 media_type: MediaType | None = None
1493 for lib in self.libraries.audiobooks.values():
1494 if item.id_ in lib.item_ids:
1495 media_type = MediaType.AUDIOBOOK
1496 lib.item_ids.remove(item.id_)
1497 break
1498 for lib in self.libraries.podcasts.values():
1499 if item.id_ in lib.item_ids:
1500 media_type = MediaType.PODCAST
1501 lib.item_ids.remove(item.id_)
1502 break
1503
1504 if media_type is not None:
1505 mass_item = await self.mass.music.get_library_item_by_prov_id(
1506 media_type=media_type,
1507 item_id=item.id_,
1508 provider_instance_id_or_domain=self.instance_id,
1509 )
1510 if mass_item is not None:
1511 await self.mass.music.remove_item_from_library(
1512 media_type=media_type, library_item_id=mass_item.item_id
1513 )
1514 self.logger.debug('Removed %s "%s" via socket.', media_type.value, mass_item.name)
1515
1516 await self._cache_set_helper_libraries()
1517
1518 async def _socket_abs_user_item_progress_updated(
1519 self, id_: str, progress: MediaProgress
1520 ) -> None:
1521 """To update continue listening.
1522
1523 ABS reports every 15s and immediately on play state change.
1524 This callback is called per item if a progress is changed:
1525 - a change in position
1526 - the item is finished
1527 But it is _not_called, if a progress is reset/ discarded.
1528 """
1529 # guard, see progress guard class docstrings for explanation
1530 if not self.progress_guard.guard_ok_abs(abs_progress=progress):
1531 return
1532
1533 known_ids = self._get_all_known_item_ids()
1534 if progress.library_item_id not in known_ids:
1535 return
1536
1537 self.logger.debug(f"Updated progress of item {progress.library_item_id} via socket.")
1538
1539 if progress.episode_id is None:
1540 await self._update_playlog_book(progress)
1541 return
1542 await self._update_playlog_episode(progress)
1543
1544 async def _socket_abs_refresh_token_expired(self) -> None:
1545 await self.reauthenticate()
1546
1547 async def _socket_stream_open(self, stream: AbsStream) -> None:
1548 # stream's id is the same as the playback session id
1549 for session_helper in self.sessions.values():
1550 if session_helper.abs_session_id == stream.id_:
1551 session_helper.hls_stream_open.set()
1552 break
1553
1554 async def reauthenticate(self) -> None:
1555 """Reauthorize the abs session config if refresh token expired."""
1556 # some safe guarding should that function be called simultaneously
1557 if self.reauthenticate_lock.locked() or time.time() - self.reauthenticate_last < 5:
1558 while True:
1559 if not self.reauthenticate_lock.locked():
1560 return
1561 await asyncio.sleep(0.5)
1562 async with self.reauthenticate_lock:
1563 await self._client.session_config.authenticate(
1564 username=str(self.config.get_value(CONF_USERNAME)),
1565 password=str(self.config.get_value(CONF_PASSWORD)),
1566 )
1567 self.reauthenticate_last = time.time()
1568
1569 def _get_all_known_item_ids(self) -> set[str]:
1570 known_ids = set()
1571 for lib in self.libraries.podcasts.values():
1572 known_ids.update(lib.item_ids)
1573 for lib in self.libraries.audiobooks.values():
1574 known_ids.update(lib.item_ids)
1575
1576 return known_ids
1577
1578 async def _set_playlog_from_user(self, user: User) -> None:
1579 """Update on user callback.
1580
1581 User holds also all media progresses specific to that user.
1582
1583 The function 'guard_ok_abs' uses the timestamp of the last update in abs, thus after an
1584 initial progress update, an unchanged update will not trigger a (useless) playlog update.
1585
1586 We do not sync removed progresses for the sake of simplicity.
1587 """
1588 await self._set_playlog_from_user_sync(user.media_progress)
1589
1590 async def _set_playlog_from_user_sync(self, progresses: list[MediaProgress]) -> None:
1591 # for debugging
1592 __updated_items = 0
1593
1594 known_ids = self._get_all_known_item_ids()
1595 abs_ids_with_progress = set()
1596
1597 for progress in progresses:
1598 # save progress ids for later
1599 ma_item_id = (
1600 progress.library_item_id
1601 if progress.episode_id is None
1602 else f"{progress.library_item_id} {progress.episode_id}"
1603 )
1604 abs_ids_with_progress.add(ma_item_id)
1605
1606 # Guard. Also makes sure, that we don't write to db again if no state change happened.
1607 # This is achieved by adding a Helper Progress in the update playlog functions, which
1608 # then has the most recent timestamp. If a subsequent progress sent by abs has an older
1609 # timestamp, we do not update again.
1610 if not self.progress_guard.guard_ok_abs(progress):
1611 continue
1612 if progress.current_time is not None:
1613 if (
1614 int(progress.current_time) != 0
1615 and not progress.current_time >= PLAYBACK_REPORT_INTERVAL_SECONDS
1616 ):
1617 # same as mass default, only > 30s
1618 continue
1619 if progress.library_item_id not in known_ids:
1620 continue
1621 __updated_items += 1
1622 if progress.episode_id is None:
1623 await self._update_playlog_book(progress)
1624 else:
1625 await self._update_playlog_episode(progress)
1626 self.logger.debug(f"Updated {__updated_items} from full playlog.")
1627
1628 # Get MA's known progresses of ABS.
1629 # In ABS the user may discard a progress, which removes the progress completely.
1630 # There is no socket notification for this event.
1631 ma_playlog_state = await self.mass.music.get_playlog_provider_item_ids(
1632 provider_instance_id=self.instance_id
1633 )
1634 ma_ids_with_progress = {x for _, x in ma_playlog_state}
1635 discarded_progress_ids = ma_ids_with_progress.difference(abs_ids_with_progress)
1636 for discarded_progress_id in discarded_progress_ids:
1637 if len(discarded_progress_id.split(" ")) == 1:
1638 if discarded_item := await self.mass.music.get_library_item_by_prov_id(
1639 media_type=MediaType.AUDIOBOOK,
1640 item_id=discarded_progress_id,
1641 provider_instance_id_or_domain=self.instance_id,
1642 ):
1643 self.progress_guard.add_progress(discarded_progress_id)
1644 await self.mass.music.mark_item_unplayed(discarded_item)
1645 else:
1646 with suppress(MediaNotFoundError):
1647 discarded_item = await self.get_podcast_episode(
1648 prov_episode_id=discarded_progress_id, add_progress=False
1649 )
1650 self.progress_guard.add_progress(*discarded_progress_id.split(" "))
1651 await self.mass.music.mark_item_unplayed(discarded_item)
1652 self.logger.debug("Discarded item %s ", discarded_progress_id)
1653
1654 async def _update_playlog_book(self, progress: MediaProgress) -> None:
1655 # helper progress also ensures no useless progress updates,
1656 # see comment above
1657 self.progress_guard.add_progress(progress.library_item_id)
1658 if progress.current_time is None:
1659 return
1660 mass_audiobook = await self.mass.music.get_library_item_by_prov_id(
1661 media_type=MediaType.AUDIOBOOK,
1662 item_id=progress.library_item_id,
1663 provider_instance_id_or_domain=self.instance_id,
1664 )
1665 if mass_audiobook is None:
1666 return
1667 if int(progress.current_time) == 0 and not progress.is_finished:
1668 await self.mass.music.mark_item_unplayed(mass_audiobook)
1669 else:
1670 await self.mass.music.mark_item_played(
1671 mass_audiobook,
1672 fully_played=progress.is_finished,
1673 seconds_played=int(progress.current_time),
1674 )
1675
1676 async def _update_playlog_episode(self, progress: MediaProgress) -> None:
1677 # helper progress also ensures no useless progress updates,
1678 # see comment above
1679 self.progress_guard.add_progress(progress.library_item_id, progress.episode_id)
1680 if progress.current_time is None:
1681 return
1682 _episode_id = f"{progress.library_item_id} {progress.episode_id}"
1683 try:
1684 # need to obtain full podcast, and then search for episode
1685 mass_episode = await self.get_podcast_episode(_episode_id, add_progress=False)
1686 except MediaNotFoundError:
1687 return
1688 if int(progress.current_time) == 0 and not progress.is_finished:
1689 await self.mass.music.mark_item_unplayed(mass_episode)
1690 else:
1691 await self.mass.music.mark_item_played(
1692 mass_episode,
1693 fully_played=progress.is_finished,
1694 seconds_played=int(progress.current_time),
1695 )
1696
1697 async def _cache_set_helper_libraries(self) -> None:
1698 await self.mass.cache.set(
1699 key=CACHE_KEY_LIBRARIES,
1700 provider=self.instance_id,
1701 category=CACHE_CATEGORY_LIBRARIES,
1702 data=self.libraries.to_dict(),
1703 )
1704
1705 def _log_no_libraries(self) -> None:
1706 self.logger.error("There are no libraries visible to the Audiobookshelf provider.")
1707
1708 def _log_no_helper_item_ids(self) -> None:
1709 self.logger.warning(
1710 "Cached item ids are missing. "
1711 "Please trigger a full resync of the Audiobookshelf provider manually."
1712 )
1713