/
/
/
1"""Audiobookshelf (abs) provider for Music Assistant."""
2
3from __future__ import annotations
4
5import asyncio
6import functools
7import itertools
8import time
9from collections.abc import AsyncGenerator, Callable, Coroutine, Sequence
10from contextlib import suppress
11from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar, cast
12
13import aioaudiobookshelf as aioabs
14from aioaudiobookshelf.client.items import LibraryItemExpandedBook as AbsLibraryItemExpandedBook
15from aioaudiobookshelf.client.items import (
16 LibraryItemExpandedPodcast as AbsLibraryItemExpandedPodcast,
17)
18from aioaudiobookshelf.client.items import PlaybackSessionExpanded as AbsPlaybackSessionExpanded
19from aioaudiobookshelf.client.items import PlaybackSessionParameters as AbsPlaybackSessionParameters
20from aioaudiobookshelf.client.session import SyncOpenSessionParameters
21from aioaudiobookshelf.exceptions import LoginError as AbsLoginError
22from aioaudiobookshelf.exceptions import RefreshTokenExpiredError
23from aioaudiobookshelf.exceptions import SessionNotFoundError as AbsSessionNotFoundError
24from aioaudiobookshelf.schema.author import AuthorExpanded
25from aioaudiobookshelf.schema.calls_authors import (
26 AuthorWithItemsAndSeries as AbsAuthorWithItemsAndSeries,
27)
28from aioaudiobookshelf.schema.calls_series import SeriesWithProgress as AbsSeriesWithProgress
29from aioaudiobookshelf.schema.library import (
30 LibraryItemExpanded,
31 LibraryItemExpandedBook,
32 LibraryItemExpandedPodcast,
33 LibraryItemMinifiedPodcast,
34)
35from aioaudiobookshelf.schema.library import LibraryMediaType as AbsLibraryMediaType
36from aioaudiobookshelf.schema.session import DeviceInfo as AbsDeviceInfo
37from aioaudiobookshelf.schema.shelf import (
38 SeriesShelf,
39 ShelfAuthors,
40 ShelfBook,
41 ShelfEpisode,
42 ShelfLibraryItemMinified,
43 ShelfPodcast,
44 ShelfSeries,
45)
46from aioaudiobookshelf.schema.shelf import ShelfId as AbsShelfId
47from aioaudiobookshelf.schema.shelf import ShelfType as AbsShelfType
48from aiohttp import web
49from music_assistant_models.config_entries import (
50 ConfigEntry,
51 ConfigValueType,
52 ProviderConfig,
53)
54from music_assistant_models.enums import (
55 ConfigEntryType,
56 ContentType,
57 MediaType,
58 ProviderFeature,
59 StreamType,
60)
61from music_assistant_models.errors import LoginFailed, MediaNotFoundError
62from music_assistant_models.media_items import (
63 Audiobook,
64 AudioFormat,
65 BrowseFolder,
66 ItemMapping,
67 MediaItemType,
68 PodcastEpisode,
69 UniqueList,
70)
71from music_assistant_models.media_items.media_item import RecommendationFolder
72from music_assistant_models.streamdetails import MultiPartPath, StreamDetails
73
74from music_assistant.constants import PLAYBACK_REPORT_INTERVAL_SECONDS
75from music_assistant.models.music_provider import MusicProvider
76from music_assistant.providers.audiobookshelf.parsers import (
77 parse_audiobook,
78 parse_podcast,
79 parse_podcast_episode,
80)
81
82from .constants import (
83 ABS_BROWSE_ITEMS_TO_PATH,
84 ABS_SHELF_ID_ICONS,
85 ABS_SHELF_ID_TRANSLATION_KEY,
86 AIOHTTP_TIMEOUT,
87 CACHE_CATEGORY_LIBRARIES,
88 CACHE_KEY_LIBRARIES,
89 CONF_API_TOKEN,
90 CONF_HIDE_EMPTY_PODCASTS,
91 CONF_HLS_FORMATS,
92 CONF_OLD_TOKEN,
93 CONF_PASSWORD,
94 CONF_URL,
95 CONF_USE_HLS,
96 CONF_USERNAME,
97 CONF_VERIFY_SSL,
98 HLS_ALL_FORMATS,
99 HLS_FORMATS_SPLIT,
100 AbsBrowseItemsBookTranslationKey,
101 AbsBrowseItemsPodcastTranslationKey,
102 AbsBrowsePaths,
103)
104from .helpers import LibrariesHelper, LibraryHelper, ProgressGuard, SessionHelper
105
106if TYPE_CHECKING:
107 from aioaudiobookshelf.schema.events_socket import LibraryItemRemoved
108 from aioaudiobookshelf.schema.media_progress import MediaProgress
109 from aioaudiobookshelf.schema.streams import Stream as AbsStream
110 from aioaudiobookshelf.schema.user import User
111 from music_assistant_models.media_items import Podcast
112 from music_assistant_models.provider import ProviderManifest
113
114 from music_assistant.mass import MusicAssistant
115 from music_assistant.models import ProviderInstanceType
116
# Provider features this integration supports; passed to the Audiobookshelf
# constructor in setup().
SUPPORTED_FEATURES = {
    ProviderFeature.LIBRARY_PODCASTS,
    ProviderFeature.LIBRARY_AUDIOBOOKS,
    ProviderFeature.BROWSE,
    ProviderFeature.RECOMMENDATIONS,
}
123
124
async def setup(
    mass: MusicAssistant,
    manifest: ProviderManifest,
    config: ProviderConfig,
) -> ProviderInstanceType:
    """Create the Audiobookshelf provider instance for the given configuration.

    The instance is constructed with the module-level SUPPORTED_FEATURES.
    """
    provider = Audiobookshelf(mass, manifest, config, SUPPORTED_FEATURES)
    return provider
130
131
async def get_config_entries(
    mass: MusicAssistant,
    instance_id: str | None = None,
    action: str | None = None,
    values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
    """
    Build the config entries needed to set up this provider.

    The returned entries are static: none of the arguments is read.

    mass: the MusicAssistant instance.
    instance_id: id of an existing provider instance (None if new instance setup).
    action: [optional] action key called from config entries UI.
    values: the (intermediate) raw values for config entries sent with the action.
    """
    # ruff: noqa: ARG001
    # Introductory label explaining the two ways to authenticate.
    auth_hint = ConfigEntry(
        key="label",
        type=ConfigEntryType.LABEL,
        label="Please provide the address of your Audiobookshelf instance. To authenticate "
        "you have two options: "
        "a) Provide username AND password. Leave the API key empty. "
        "b) Provide ONLY an API key.",
    )
    server_url = ConfigEntry(
        key=CONF_URL,
        type=ConfigEntryType.STRING,
        label="Server",
        required=True,
        description="The URL of the Audiobookshelf server to connect to. For example "
        "https://abs.domain.tld/ or http://192.168.1.4:13378/",
    )
    username = ConfigEntry(
        key=CONF_USERNAME,
        type=ConfigEntryType.STRING,
        label="Username",
        required=False,
        description="The username to authenticate to the remote server.",
    )
    password = ConfigEntry(
        key=CONF_PASSWORD,
        type=ConfigEntryType.SECURE_STRING,
        label="Password",
        required=False,
        description="The password to authenticate to the remote server.",
    )
    api_key = ConfigEntry(
        key=CONF_API_TOKEN,
        type=ConfigEntryType.SECURE_STRING,
        label="API key _instead_ of user/ password. (ABS version >= 2.26)",
        required=False,
        description="Instead of using a username and password, "
        "you may provide an API key (ABS version >= 2.26). "
        "Please consult the docs.",
    )
    # Hidden entry: not shown in the UI, still read during initialization.
    legacy_token = ConfigEntry(
        key=CONF_OLD_TOKEN,
        type=ConfigEntryType.SECURE_STRING,
        label="old token",
        required=False,
        hidden=True,
    )
    use_hls = ConfigEntry(
        key=CONF_USE_HLS,
        type=ConfigEntryType.BOOLEAN,
        label="Stream via HLS from ABS.",
        description="Use an HLS stream when streaming from audiobookshelf.",
        required=False,
        default_value=False,
        advanced=True,
    )
    hls_formats = ConfigEntry(
        key=CONF_HLS_FORMATS,
        type=ConfigEntryType.STRING,
        label=f"Use HLS for these file extensions. Separate with ';'. Use {HLS_ALL_FORMATS} for"
        " all formats.",
        description="Use HLS only for these file extensions."
        f" Separate with ;. E.g. m4b or m4b;aac or {HLS_ALL_FORMATS}",
        required=False,
        default_value="m4b",
        advanced=True,
    )
    verify_ssl = ConfigEntry(
        key=CONF_VERIFY_SSL,
        type=ConfigEntryType.BOOLEAN,
        label="Verify SSL",
        required=False,
        description="Whether or not to verify the certificate of SSL/TLS connections.",
        advanced=True,
        default_value=True,
    )
    hide_empty_podcasts = ConfigEntry(
        key=CONF_HIDE_EMPTY_PODCASTS,
        type=ConfigEntryType.BOOLEAN,
        label="Hide empty podcasts.",
        required=False,
        description="This will skip podcasts with no episodes associated.",
        advanced=True,
        default_value=False,
    )
    return (
        auth_hint,
        server_url,
        username,
        password,
        api_key,
        legacy_token,
        use_hls,
        hls_formats,
        verify_ssl,
        hide_empty_podcasts,
    )
232
233
# Return type and parameter specification of the coroutine functions wrapped by
# Audiobookshelf.handle_refresh_token.
R = TypeVar("R")
P = ParamSpec("P")
236
237
238class Audiobookshelf(MusicProvider):
239 """Audiobookshelf MusicProvider."""
240
241 _on_unload_callbacks: list[Callable[[], None]]
242
243 @staticmethod
244 def handle_refresh_token(
245 method: Callable[P, Coroutine[Any, Any, R]],
246 ) -> Callable[P, Coroutine[Any, Any, R]]:
247 """Decorate a method to handle an expired refresh token by relogin."""
248
249 @functools.wraps(method)
250 async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
251 self = cast("Audiobookshelf", args[0])
252 try:
253 return await method(*args, **kwargs)
254 except RefreshTokenExpiredError:
255 self.logger.debug("Refresh token expired. Trying to renew.")
256 await self.reauthenticate()
257 return await method(*args, **kwargs)
258
259 return wrapper
260
async def handle_async_init(self) -> None:
    """Create the Audiobookshelf clients from the provider config and wire up callbacks.

    Logs in either by token (API key, or the old token as fallback) or by
    username/password, initializes the socket client, loads the library helpers
    from cache (or from the server if nothing is cached), registers the socket
    callbacks and the dynamic part-stream route.

    Raises LoginFailed if the login raises AbsLoginError.
    """
    self._on_unload_callbacks: list[Callable[[], None]] = []
    self.sessions: dict[str, SessionHelper] = {}  # key is the mass_item_id
    self.create_session_lock = asyncio.Lock()
    base_url = str(self.config.get_value(CONF_URL))
    username = str(self.config.get_value(CONF_USERNAME))
    password = str(self.config.get_value(CONF_PASSWORD))
    token_old = self.config.get_value(CONF_OLD_TOKEN)
    token_api = self.config.get_value(CONF_API_TOKEN)
    verify_ssl = bool(self.config.get_value(CONF_VERIFY_SSL))
    session_config = aioabs.SessionConfiguration(
        session=self.mass.http_session,
        url=base_url,
        verify_ssl=verify_ssl,
        logger=self.logger,
        pagination_items_per_page=30,  # audible provider goes with 50 for pagination
        timeout=AIOHTTP_TIMEOUT,
    )
    # If we are configured with a non-expiring API key or not.
    self.is_token_user = False
    try:
        if token_api is not None or token_old is not None:
            # The API key takes precedence over the old token.
            _token = token_api if token_api is not None else token_old
            session_config.token = str(_token)
            (
                self._client,
                self._client_socket,
            ) = await aioabs.get_user_and_socket_client_by_token(session_config=session_config)
            self.is_token_user = True
        else:
            self._client, self._client_socket = await aioabs.get_user_and_socket_client(
                session_config=session_config, username=username, password=password
            )
        await self._client_socket.init_client()
    except AbsLoginError as exc:
        raise LoginFailed(f"Login to abs instance at {base_url} failed.") from exc

    if token_old is not None and token_api is None:
        # Log Message that the old token won't work
        _version = self._client.server_settings.version.split(".")
        if len(_version) >= 2:
            try:
                major, minor = int(_version[0]), int(_version[1])
            except ValueError:
                major = minor = 0
            # Compare as a tuple: checking major and minor independently would
            # miss e.g. 3.0, which is newer than 2.26 but has minor < 26.
            if (major, minor) >= (2, 26):
                self.logger.warning(
                    """

######## Audiobookshelf API key change #############################################################

Audiobookshelf introduced a new API key system in version 2.26 (JWT).
You are still using a token configured with a previous version of Audiobookshelf,
but you are running version %s. This will stop working in a future Audiobookshelf release.
Please create a non-expiring API Key instead, and update your configuration accordingly.
Refer to the documentation of Audiobookshelf, https://www.audiobookshelf.org/guides/api-keys/
and of Music Assistant https://www.music-assistant.io/music-providers/audiobookshelf/
for more details.

""",
                    self._client.server_settings.version,
                )

    cached_libraries = await self.mass.cache.get(
        key=CACHE_KEY_LIBRARIES,
        provider=self.instance_id,
        category=CACHE_CATEGORY_LIBRARIES,
        default=None,
    )
    if cached_libraries is None:
        self.libraries = LibrariesHelper()
        # We need the library ids for recommendations. If the cache got cleared e.g. by a db
        # migration, we might end up with empty library helpers on a configured provider. Note,
        # that the lib item ids are not synced, still only on full provider sync, instead the
        # sets are empty. Full sync is expensive.
        # See warning in browse_lib_podcasts / _browse_books
        libraries = await self._client.get_all_libraries()
        for library in libraries:
            if library.media_type == AbsLibraryMediaType.BOOK:
                self.libraries.audiobooks[library.id_] = LibraryHelper(name=library.name)
            elif library.media_type == AbsLibraryMediaType.PODCAST:
                self.libraries.podcasts[library.id_] = LibraryHelper(name=library.name)
    else:
        self.libraries = LibrariesHelper.from_dict(cached_libraries)

    # set socket callbacks
    self._client_socket.set_item_callbacks(
        on_item_added=self._socket_abs_item_changed,
        on_item_updated=self._socket_abs_item_changed,
        on_item_removed=self._socket_abs_item_removed,
        on_items_added=self._socket_abs_item_changed,
        on_items_updated=self._socket_abs_item_changed,
    )

    self._client_socket.set_user_callbacks(
        on_user_item_progress_updated=self._socket_abs_user_item_progress_updated,
    )

    self._client_socket.set_refresh_token_expired_callback(
        on_refresh_token_expired=self._socket_abs_refresh_token_expired
    )

    self._client_socket.set_stream_callbacks(on_stream_open=self._socket_stream_open)

    # progress guard
    self.progress_guard = ProgressGuard()

    # safe guard reauthentication
    self.reauthenticate_lock = asyncio.Lock()
    self.reauthenticate_last = 0.0

    # register dynamic stream route for audiobook parts; the returned callable
    # is kept so the route can be removed again on unload
    self._on_unload_callbacks.append(
        self.mass.streams.register_dynamic_route(
            f"/{self.instance_id}_part_stream", self._handle_session_part_request
        )
    )
379
@handle_refresh_token
async def unload(self, is_removed: bool = False) -> None:
    """
    Log out of Audiobookshelf and run the registered unload callbacks.

    Called when provider is deregistered (e.g. MA exiting or config reloading).
    is_removed will be set to True when the provider is removed from the configuration;
    the flag is not used here.
    """
    # First the regular client, then the socket client.
    await self._client.logout()
    await self._client_socket.logout()
    # Undo what was registered during initialization.
    for on_unload in self._on_unload_callbacks:
        on_unload()
392
@property
def is_streaming_provider(self) -> bool:
    """Tell Music Assistant whether this is a streaming provider (always False here)."""
    # Streaming providers return True, local file based providers return False;
    # this provider reports False.
    return False
398
@handle_refresh_token
async def sync_library(self, media_type: MediaType) -> None:
    """Refresh the library helpers for the given media type, then run the base sync.

    Libraries matching the requested media type get a fresh LibraryHelper.
    Afterwards the helpers are cached and the playlog is updated from the
    current abs user.
    """
    abs_libraries = await self._client.get_all_libraries()
    if not abs_libraries:
        self._log_no_libraries()

    wants_audiobooks = media_type == MediaType.AUDIOBOOK
    wants_podcasts = media_type == MediaType.PODCAST
    for abs_library in abs_libraries:
        if wants_audiobooks and abs_library.media_type == AbsLibraryMediaType.BOOK:
            self.libraries.audiobooks[abs_library.id_] = LibraryHelper(name=abs_library.name)
        elif wants_podcasts and abs_library.media_type == AbsLibraryMediaType.PODCAST:
            self.libraries.podcasts[abs_library.id_] = LibraryHelper(name=abs_library.name)

    await super().sync_library(media_type)
    await self._cache_set_helper_libraries()

    # update playlog
    abs_user = await self._client.get_my_user()
    await self._set_playlog_from_user(abs_user)
419
async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
    """Yield the podcasts of all known podcast libraries.

    The minified items returned by the paginated library listing are parsed
    directly. The ids of every page are recorded in the library helper; podcasts
    without episodes are skipped when CONF_HIDE_EMPTY_PODCASTS is set.
    """
    for library_id in self.libraries.podcasts:
        async for page in self._client.get_library_items(library_id=library_id):
            if not page.results:
                # an empty page ends the listing of this library
                break
            # store uuids
            self.libraries.podcasts[library_id].item_ids.update(
                [entry.id_ for entry in page.results]
            )
            for minified in page.results:
                assert isinstance(minified, LibraryItemMinifiedPodcast)
                podcast = parse_podcast(
                    abs_podcast=minified,
                    instance_id=self.instance_id,
                    domain=self.domain,
                    token=self._client.token,
                    base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
                )
                hide_empty = bool(self.config.get_value(CONF_HIDE_EMPTY_PODCASTS))
                if hide_empty and podcast.total_episodes == 0:
                    continue
                yield podcast
447
@handle_refresh_token
async def _get_abs_expanded_podcast(
    self, prov_podcast_id: str
) -> AbsLibraryItemExpandedPodcast:
    """Fetch the expanded abs library item for the given podcast id."""
    expanded_podcast = await self._client.get_library_item_podcast(
        expanded=True, podcast_id=prov_podcast_id
    )
    # expanded=True was requested, so anything else is unexpected
    assert isinstance(expanded_podcast, AbsLibraryItemExpandedPodcast)
    return expanded_podcast
458
@handle_refresh_token
async def get_podcast(self, prov_podcast_id: str) -> Podcast:
    """Fetch one podcast in its expanded form and parse it to a Podcast."""
    expanded_podcast = await self._get_abs_expanded_podcast(prov_podcast_id=prov_podcast_id)
    podcast = parse_podcast(
        abs_podcast=expanded_podcast,
        instance_id=self.instance_id,
        domain=self.domain,
        token=self._client.token,
        base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
    )
    return podcast
470
async def get_podcast_episodes(
    self, prov_podcast_id: str
) -> AsyncGenerator[PodcastEpisode, None]:
    """Yield every episode of a podcast, each with its media progress if there is one.

    Episodes are numbered from 1 in the order abs returns them; that number is
    handed to the parser as the fallback episode count.
    """
    expanded_podcast = await self._get_abs_expanded_podcast(prov_podcast_id=prov_podcast_id)
    # The user object carries the progress of all media items, so one api call
    # is enough to obtain the progress of possibly many episodes.
    abs_user = await self._client.get_my_user()
    progress_by_episode_id = {}
    for media_progress in abs_user.media_progress:
        if media_progress.episode_id is None:
            continue
        if media_progress.library_item_id != prov_podcast_id:
            continue
        progress_by_episode_id[media_progress.episode_id] = media_progress

    for position, abs_episode in enumerate(expanded_podcast.media.episodes, start=1):
        yield parse_podcast_episode(
            episode=abs_episode,
            prov_podcast_id=prov_podcast_id,
            fallback_episode_cnt=position,
            instance_id=self.instance_id,
            domain=self.domain,
            token=self._client.token,
            base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
            media_progress=progress_by_episode_id.get(abs_episode.id_, None),
        )
503
@handle_refresh_token
async def get_podcast_episode(
    self, prov_episode_id: str, add_progress: bool = True
) -> PodcastEpisode:
    """Look up one episode by its combined "<podcast id> <episode id>" identifier.

    The media progress is fetched only when add_progress is true.
    Raises MediaNotFoundError if the podcast has no episode with that id.
    """
    prov_podcast_id, wanted_episode_id = prov_episode_id.split(" ")
    expanded_podcast = await self._get_abs_expanded_podcast(prov_podcast_id=prov_podcast_id)
    for position, abs_episode in enumerate(expanded_podcast.media.episodes, start=1):
        if abs_episode.id_ != wanted_episode_id:
            continue
        media_progress = None
        if add_progress:
            media_progress = await self._client.get_my_media_progress(
                item_id=prov_podcast_id, episode_id=abs_episode.id_
            )
        return parse_podcast_episode(
            episode=abs_episode,
            prov_podcast_id=prov_podcast_id,
            fallback_episode_cnt=position,
            instance_id=self.instance_id,
            domain=self.domain,
            token=self._client.token,
            base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
            media_progress=media_progress,
        )
    raise MediaNotFoundError("Episode not found")
532
async def get_library_audiobooks(self) -> AsyncGenerator[Audiobook, None]:
    """Yield the audiobooks of all known audiobook libraries.

    Each page of the library listing is fetched again as a batch of expanded
    books, since the expanded version is needed for chapters. The ids of every
    page are recorded in the library helper.
    """
    for library_id in self.libraries.audiobooks:
        async for page in self._client.get_library_items(library_id=library_id):
            if not page.results:
                # an empty page ends the listing of this library
                break
            page_ids = [entry.id_ for entry in page.results]
            # store uuids
            self.libraries.audiobooks[library_id].item_ids.update(page_ids)
            # use expanded version for chapters/ caching.
            expanded_books = await self._client.get_library_item_batch_book(item_ids=page_ids)
            for expanded_book in expanded_books:
                if not expanded_book.media.tracks:
                    # no audio files -> ebook only, nothing to play
                    continue
                yield parse_audiobook(
                    abs_audiobook=expanded_book,
                    instance_id=self.instance_id,
                    domain=self.domain,
                    token=self._client.token,
                    base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
                )
559
@handle_refresh_token
async def _get_abs_expanded_audiobook(
    self, prov_audiobook_id: str
) -> AbsLibraryItemExpandedBook:
    """Fetch the expanded abs library item for the given audiobook id."""
    expanded_book = await self._client.get_library_item_book(
        expanded=True, book_id=prov_audiobook_id
    )
    # expanded=True was requested, so anything else is unexpected
    assert isinstance(expanded_book, AbsLibraryItemExpandedBook)
    return expanded_book
570
@handle_refresh_token
async def get_audiobook(self, prov_audiobook_id: str) -> Audiobook:
    """Fetch one audiobook in its expanded form, including the user's media progress."""
    # The progress is requested first, then the expanded item.
    media_progress = await self._client.get_my_media_progress(item_id=prov_audiobook_id)
    expanded_book = await self._get_abs_expanded_audiobook(prov_audiobook_id=prov_audiobook_id)
    audiobook = parse_audiobook(
        abs_audiobook=expanded_book,
        instance_id=self.instance_id,
        domain=self.domain,
        token=self._client.token,
        base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
        media_progress=media_progress,
    )
    return audiobook
587
async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
    """Return the stream details of a podcast episode or audiobook.

    Raises MediaNotFoundError for any other media type.
    """
    if media_type not in (MediaType.PODCAST_EPISODE, MediaType.AUDIOBOOK):
        raise MediaNotFoundError("Stream unknown")

    # A playback session is always created. With direct playback (the default)
    # the session tracks equal the tracks of the audiobook/ podcast episode and
    # the session only serves to update the progress. With hls the session
    # holds an hls stream as its track.
    abs_session = await self._get_playback_session(mass_item_id=item_id)
    stream_details = await self._get_stream_details_session(
        abs_session, session_helper=self.sessions[item_id], media_type=media_type
    )
    return stream_details
601
async def _get_stream_details_session(
    self,
    abs_session: AbsPlaybackSessionExpanded,
    session_helper: SessionHelper,
    media_type: MediaType,
) -> StreamDetails:
    """Build the StreamDetails for an open abs playback session.

    Every audio track of the session becomes one part. Token (API key) users
    get direct abs urls; all other users get urls of the dynamic part-stream
    route, so a fresh token is used for each part.

    abs_session: the playback session whose audio tracks are streamed.
    session_helper: helper holding the hls_stream_open event of this session.
    media_type: media type to report in the stream details.

    Raises MediaNotFoundError if the session has no audio tracks.
    """
    tracks = abs_session.audio_tracks
    if not tracks:
        raise MediaNotFoundError("Session has no tracks.")

    # content_url starts with "/", so drop a trailing slash of the configured
    # url to avoid "//" in the stream url.
    abs_base_url = str(self.config.get_value(CONF_URL)).rstrip("/")

    # The content type is taken from the first track only.
    first_metadata = tracks[0].metadata
    content_type = ContentType.UNKNOWN
    if first_metadata is not None:
        content_type = ContentType.try_parse(first_metadata.ext)

    if self.is_token_user:
        self.logger.debug("Token User - Streams are direct.")

    file_parts: list[MultiPartPath] = []
    for idx, track in enumerate(tracks):
        if self.is_token_user:
            # an api key is long-lived
            stream_url = f"{abs_base_url}{track.content_url}?token={self._client.token}"
        else:
            # to ensure token is always valid, we create a dynamic url
            # this ensures that we always get a fresh token on each part
            # without having to deal with a custom stream etc.
            # we also use this for a single track/ hls stream, otherwise we can't seek
            stream_url = (
                f"{self.mass.streams.base_url}/{self.instance_id}_part_stream?"
                f"session_id={abs_session.id_}&part_id={idx}"
            )
        file_parts.append(MultiPartPath(path=stream_url, duration=track.duration))

    # The stream type is derived from the url of the first part.
    stream_type = StreamType.HLS if "hls" in file_parts[0].path else StreamType.HTTP
    if stream_type == StreamType.HLS:
        # wait for stream to be ready
        try:
            await asyncio.wait_for(session_helper.hls_stream_open.wait(), 10)
        except TimeoutError:
            self.logger.warning(
                "Did not receive HLS stream open event after 10s, continuing anyways."
            )

    return StreamDetails(
        provider=self.instance_id,
        item_id=abs_session.id_,
        audio_format=AudioFormat(content_type=content_type),
        media_type=media_type,
        stream_type=stream_type,
        duration=int(abs_session.duration),
        # a single part is passed as plain url, several parts as multi-part list
        path=file_parts[0].path if len(file_parts) == 1 else file_parts,
        can_seek=True,
        allow_seek=True,
    )
662
async def _get_playback_session(self, mass_item_id: str) -> AbsPlaybackSessionExpanded:
    """Return the open abs session of an item, creating a new one if needed.

    mass_item_id is either an audiobook id or "<podcast id> <episode id>".
    A session that is tracked in self.sessions and still known to abs is
    reused; otherwise a new playback session is requested and tracked.

    Raises MediaNotFoundError if HLS was requested but abs did not return an
    HLS track.
    """
    async with self.create_session_lock:
        # check for an available open session
        if session_helper := self.sessions.get(mass_item_id):
            # reset here, as this is our "time listened".
            session_helper.last_sync_time = time.time()
            # abs no longer knowing the session is fine, we create a new one below
            with suppress(AbsSessionNotFoundError):
                return await self._client.get_open_session(
                    session_id=session_helper.abs_session_id
                )

        item_ids = mass_item_id.split(" ")
        abs_item_id = item_ids[0]
        episode_id = item_ids[1] if len(item_ids) == 2 else None

        # Create a new session
        ## Check HLS usage
        use_hls = bool(self.config.get_value(CONF_USE_HLS))
        hls_formats = str(self.config.get_value(CONF_HLS_FORMATS))
        if use_hls and hls_formats != HLS_ALL_FORMATS:
            # only for certain formats: decide by the file extension of the item
            extensions = [x.lstrip(".") for x in hls_formats.split(HLS_FORMATS_SPLIT)]
            metadata = None
            if episode_id is None:
                audiobook = await self._get_abs_expanded_audiobook(abs_item_id)
                # an item without audio tracks has no extension to check
                if audiobook.media.tracks:
                    metadata = audiobook.media.tracks[0].metadata
            else:
                podcast = await self._get_abs_expanded_podcast(prov_podcast_id=abs_item_id)
                # Only the requested episode may decide; when no episode matches
                # we must not fall back to some other episode of the podcast.
                episode = next(
                    (ep for ep in podcast.media.episodes if ep.id_ == episode_id), None
                )
                if episode:
                    metadata = episode.audio_track.metadata
            use_hls = bool(metadata) and metadata.ext.lstrip(".") in extensions

        client_name = f"Music Assistant {self.instance_id}"
        device_info = AbsDeviceInfo(
            device_id=self.instance_id,
            client_name=client_name,
            client_version=self.mass.version,
            manufacturer="",
            model=self.mass.server_id,
        )

        session = await self._client.get_playback_session(
            # These parameters give an hls if we don't enforce direct play stream,
            # which is only a concat of the individual file's at abs
            session_parameters=AbsPlaybackSessionParameters(
                device_info=device_info,
                force_direct_play=not use_hls,
                force_transcode=use_hls,
                # mimetypes are only checked for abs' internal "should transcode
                # see https://github.com/advplyr/audiobookshelf/blob/master/server/managers/PlaybackSessionManager.js
                supported_mime_types=[],
                media_player=client_name,
            ),
            item_id=abs_item_id,
            episode_id=episode_id,
        )

        if use_hls:
            # Safety check.
            track_url = session.audio_tracks[0].content_url
            if track_url.split("/")[1] != "hls":
                raise MediaNotFoundError("Did expect HLS stream for session playback")
            self.logger.debug("Using an HLS stream for playback.")

        self.sessions[mass_item_id] = SessionHelper(
            abs_session_id=session.id_,
            last_sync_time=time.time(),
            hls_stream_open=asyncio.Event(),
        )
        return session
742
@handle_refresh_token
async def _handle_session_part_request(self, request: web.Request) -> web.Response:
    """
    Handle dynamic audiobook part stream request.

    We redirect to the actual stream url with token.
    This is done because the token might expire, so we need to
    generate a fresh url on each part.

    Expects the query parameters session_id and part_id (index of the audio
    track within the session). Responds with 400 if a parameter is missing or
    part_id is not an integer, and with 404 if the session or the part does
    not exist. On success web.HTTPFound is raised to redirect the client.
    """
    if not (session_id := request.query.get("session_id")):
        return web.Response(status=400, text="Missing session_id")
    if not (part_id := request.query.get("part_id")):
        return web.Response(status=400, text="Missing part_id")
    # The query string is client input: reject anything that is not an integer
    # instead of failing with an unhandled ValueError.
    try:
        part_index = int(part_id)
    except ValueError:
        return web.Response(status=400, text="Invalid part_id")
    self.logger.debug(
        "Handling session part request for session %s and part %s", session_id, part_id
    )
    try:
        abs_session = await self._client.get_open_session(session_id=session_id)
    except AbsSessionNotFoundError as err:
        raise web.HTTPNotFound from err
    # Explicit range check: a negative index must not select a track from the end.
    if not 0 <= part_index < len(abs_session.audio_tracks):
        return web.Response(status=404, text="Part not found")
    part_track = abs_session.audio_tracks[part_index]

    # content_url starts with "/", so drop a trailing slash of the configured url.
    base_url = str(self.config.get_value(CONF_URL)).rstrip("/")
    stream_url = f"{base_url}{part_track.content_url}?token={self._client.token}"
    # redirect to the actual stream url
    raise web.HTTPFound(location=stream_url)
773
@handle_refresh_token
async def get_resume_position(self, item_id: str, media_type: MediaType) -> tuple[bool, int]:
    """Return (finished, position in milliseconds) taken from the item's playback session.

    The item counts as finished when the session's current time is within
    PLAYBACK_REPORT_INTERVAL_SECONDS of its duration.
    """
    # This method is called _before_ get_stream_details, so the playback
    # session is created here.
    abs_session = await self._get_playback_session(mass_item_id=item_id)
    finished_after = abs_session.duration - PLAYBACK_REPORT_INTERVAL_SECONDS
    is_finished = abs_session.current_time > finished_after
    self.logger.debug("Resume position: obtained.")
    position_ms = int(abs_session.current_time * 1000)
    return is_finished, position_ms
783
@handle_refresh_token
async def recommendations(self) -> list[RecommendationFolder]:
    """Build the recommendation folders: one per abs shelf id plus a browse folder.

    Returns an empty list (after logging) when no library is known.
    """
    row_size = 20
    libraries_by_id = {**self.libraries.audiobooks, **self.libraries.podcasts}
    if not libraries_by_id:
        self._log_no_libraries()
        return []

    # We have to avoid "flooding" the home page, which becomes especially
    # troublesome with multiple libraries. So items are collected per ShelfId,
    # and every row gets roughly the same amount of items no matter how many
    # libraries exist. One inner list per library, such that the items can be
    # picked per library afterwards.
    shelf_items: dict[AbsShelfId, list[list[MediaItemType | BrowseFolder]]] = {}
    per_library_limit = max(1, row_size // len(libraries_by_id))
    for library_id in libraries_by_id:
        personalized_shelves = await self._client.get_library_personalized_view(
            library_id=library_id, limit=per_library_limit
        )
        await self._recommendations_iter_shelves(personalized_shelves, library_id, shelf_items)

    folders: list[RecommendationFolder] = []
    for shelf_id, per_library_items in shelf_items.items():
        # [[A, B], [C, D, E], [F]] becomes [A, C, F, B, D, E]
        interleaved = itertools.chain.from_iterable(itertools.zip_longest(*per_library_items))
        row_items = [entry for entry in interleaved if entry is not None][:row_size]

        # shelf ids follow pattern:
        # recently-added
        # newest-episodes
        # etc
        shelf_translation_key = ABS_SHELF_ID_TRANSLATION_KEY.get(shelf_id)
        if shelf_translation_key:
            shelf_name = ""  # use translation key if available
        else:
            shelf_name = f"{shelf_id.capitalize().replace('-', ' ')}"
        folders.append(
            RecommendationFolder(
                item_id=f"{shelf_id}",
                name=shelf_name,
                icon=ABS_SHELF_ID_ICONS.get(shelf_id),
                translation_key=shelf_translation_key,
                items=UniqueList(row_items),
                provider=self.instance_id,
            )
        )

    # Browse "recommendation" for convenience. With multiple audiobook
    # libraries a listing of them is returned. With a single audiobook library
    # the folders from _browse_lib_audiobooks, i.e. Authors, Narrators etc.,
    # are added. Podcast libs do not have filter folders, so always the root
    # folders.
    audiobook_library_count = len(self.libraries.audiobooks)
    browse_translation_key = "libraries"
    browse_items: list[MediaItemType | BrowseFolder] = []
    if audiobook_library_count > 1:
        browse_items = list(self._browse_root())
    else:
        if not self.libraries.podcasts:
            browse_translation_key = "library"

        # audiobooklibs are first, and we have at max 1 audiobook lib
        root_entries = self._browse_root(append_mediatype_suffix=False)
        if audiobook_library_count == 0:
            browse_items.extend(root_entries)
        else:
            audiobook_root = root_entries[0]
            assert isinstance(audiobook_root, BrowseFolder)
            browse_items.extend(self._browse_lib_audiobooks(current_path=audiobook_root.path))
            # add podcast roots
            browse_items.extend(root_entries[1:])

    folders.append(
        RecommendationFolder(
            item_id="browse",
            name="",  # use translation key
            icon="mdi-bookshelf",
            translation_key=browse_translation_key,
            items=UniqueList(browse_items),
            provider=self.instance_id,
        )
    )

    return folders
874
async def _recommendations_iter_shelves(
    self,
    shelves: list[ShelfBook | ShelfPodcast | ShelfAuthors | ShelfEpisode | ShelfSeries],
    library_id: str,
    items_by_shelf_id: dict[AbsShelfId, list[list[MediaItemType | BrowseFolder]]],
) -> None:
    """Turn the shelves of one library into recommendation items.

    For every shelf that produces at least one item, the list of items is
    appended to ``items_by_shelf_id`` under the id of the shelf. The dict is
    modified in place, nothing is returned.

    ``library_id`` is only used to build the browse paths of the series and
    author folders.
    """
    # Shelves whose entities are library items (or podcasts carrying an episode).
    item_shelf_ids = (
        AbsShelfId.RECENTLY_ADDED,
        AbsShelfId.LISTEN_AGAIN,
        AbsShelfId.DISCOVER,
        AbsShelfId.NEWEST_EPISODES,
        AbsShelfId.CONTINUE_LISTENING,
    )
    series_shelf_ids = (AbsShelfId.RECENT_SERIES, AbsShelfId.CONTINUE_SERIES)

    def _folder(entity_id: str, entity_name: str, category: str) -> BrowseFolder:
        # The path is set up as if the folder had been reached via browse.
        return BrowseFolder(
            item_id=entity_id,
            name=entity_name,
            provider=self.instance_id,
            path=(
                f"{self.instance_id}://"
                f"{AbsBrowsePaths.LIBRARIES_BOOK} {library_id}/"
                f"{category}/{entity_id}"
            ),
        )

    for shelf in shelves:
        shelf_type = shelf.type_
        media_type: MediaType
        if shelf_type == AbsShelfType.PODCAST:
            media_type = MediaType.PODCAST
        elif shelf_type == AbsShelfType.EPISODE:
            media_type = MediaType.PODCAST_EPISODE
        elif shelf_type == AbsShelfType.BOOK:
            media_type = MediaType.AUDIOBOOK
        elif shelf_type in (AbsShelfType.SERIES, AbsShelfType.AUTHORS):
            media_type = MediaType.FOLDER
        else:
            # a shelf type we do not handle
            continue

        shelf_id = shelf.id_
        shelf_items: list[MediaItemType | BrowseFolder] = []

        if shelf_id in item_shelf_ids:
            # Only "recently added" delivers a full podcast. In the other cases the
            # podcast object just holds the episodes matching the shelf id.
            for entity in shelf.entities:
                assert isinstance(entity, ShelfLibraryItemMinified)
                resolved: MediaItemType | None = None
                if media_type in [MediaType.PODCAST, MediaType.AUDIOBOOK]:
                    resolved = await self.mass.music.get_library_item_by_prov_id(
                        media_type=media_type,
                        provider_instance_id_or_domain=self.instance_id,
                        item_id=entity.id_,
                    )
                elif media_type == MediaType.PODCAST_EPISODE:
                    if entity.recent_episode is None:
                        continue
                    # all we have is a PodcastEpisode with limited information
                    resolved = parse_podcast_episode(
                        episode=entity.recent_episode,
                        prov_podcast_id=entity.id_,
                        instance_id=self.instance_id,
                        domain=self.domain,
                        token=self._client.token,
                        base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
                    )
                if resolved is not None:
                    shelf_items.append(resolved)
        elif shelf_id in series_shelf_ids:
            if isinstance(shelf, ShelfSeries):
                # A series becomes a browse folder, empty series are left out.
                for entity in shelf.entities:
                    assert isinstance(entity, SeriesShelf)
                    if len(entity.books) == 0:
                        continue
                    shelf_items.append(
                        _folder(entity.id_, entity.name, AbsBrowsePaths.SERIES)
                    )
            elif isinstance(shelf, ShelfBook) and media_type == MediaType.AUDIOBOOK:
                # Single books, these must be audiobooks.
                for entity in shelf.entities:
                    resolved = await self.mass.music.get_library_item_by_prov_id(
                        media_type=media_type,
                        provider_instance_id_or_domain=self.instance_id,
                        item_id=entity.id_,
                    )
                    if resolved is not None:
                        shelf_items.append(resolved)
        elif shelf_id == AbsShelfId.NEWEST_AUTHORS:
            # Same approach as for series: one folder per author with books.
            for entity in shelf.entities:
                assert isinstance(entity, AuthorExpanded)
                if entity.num_books == 0:
                    continue
                shelf_items.append(_folder(entity.id_, entity.name, AbsBrowsePaths.AUTHORS))

        if not shelf_items:
            continue

        # hand the collected items over to the caller's accumulator
        assert isinstance(shelf.id_, AbsShelfId)
        items_by_shelf_id.setdefault(shelf.id_, []).append(shelf_items)
990
@handle_refresh_token
async def on_played(
    self,
    media_type: MediaType,
    prov_item_id: str,
    fully_played: bool,
    position: int,
    media_item: MediaItemType,
    is_playing: bool = False,
) -> None:
    """Report playback progress of an item to Audiobookshelf.

    Only PODCAST_EPISODE and AUDIOBOOK are acted upon. Any other media type,
    e.g. PODCAST (for which this is called when a podcast is added), is ignored.

    Per item the following happens, in this order:
    - the progress guard may veto the update, otherwise the update is registered
      with the guard,
    - a "fully played" report whose position is well before the end is dropped
      as a faulty position update,
    - position 0 and not fully played means "marked unplayed": an existing
      progress in Audiobookshelf is removed and the call ends there,
    - otherwise the position is synced via the open playback session of the
      item, or, if there is none or the sync failed, via a plain progress update.
    """

    async def _sync_open_session(helper: SessionHelper, total_seconds: int) -> bool:
        """Sync the position via an open session, return False if abs lost it."""
        sync_time = time.time()
        listened = sync_time - helper.last_sync_time
        if listened > PLAYBACK_REPORT_INTERVAL_SECONDS + 3:
            # See player_queues controller: an update arrives every 30s, and immediately
            # on pause or play. Anything above 33 indicates a trigger after a longer
            # absence, which must not count into abs' statistics.
            self.logger.debug("Resetting time_listened due to longer absence.")
            listened = 0.0
        try:
            await self._client.sync_open_session(
                session_id=helper.abs_session_id,
                parameters=SyncOpenSessionParameters(
                    current_time=position,
                    time_listened=listened,
                    duration=total_seconds,
                ),
            )
        except AbsSessionNotFoundError:
            self.logger.error("Was unable to sync session.")
            return False
        helper.last_sync_time = sync_time
        self.logger.debug("Synced playback session, position %s s.", position)
        return True

    if media_type == MediaType.PODCAST_EPISODE:
        abs_podcast_id, abs_episode_id = prov_item_id.split(" ")

        # guard, see progress guard class docstrings for explanation
        guard_passed = self.progress_guard.guard_ok_mass(
            item_id=abs_podcast_id, episode_id=abs_episode_id
        )
        if not guard_passed:
            return
        self.progress_guard.add_progress(item_id=abs_podcast_id, episode_id=abs_episode_id)

        if not isinstance(media_item, PodcastEpisode):
            return

        if fully_played and position < media_item.duration - PLAYBACK_REPORT_INTERVAL_SECONDS:
            # Faulty position update. Happens sometimes if a player disconnects
            # unexpectedly or reports a false position - seen for MC players,
            # but not for sendspin.
            return

        if position == 0 and not fully_played:
            # marked unplayed
            existing = await self._client.get_my_media_progress(
                item_id=abs_podcast_id, episode_id=abs_episode_id
            )
            if existing is not None:
                await self._client.remove_my_media_progress(media_progress_id=existing.id_)
                self.logger.debug(f"Removed media progress of {media_type.value}.")
                return

        total_seconds = media_item.duration
        open_session = self.sessions.get(prov_item_id)
        if open_session and await _sync_open_session(open_session, total_seconds):
            return
        self.logger.debug(
            f"Updating media progress of {media_type.value}, title {media_item.name}."
        )
        await self._client.update_my_media_progress(
            item_id=abs_podcast_id,
            episode_id=abs_episode_id,
            duration_seconds=total_seconds,
            progress_seconds=position,
            is_finished=fully_played,
        )
    elif media_type == MediaType.AUDIOBOOK:
        # guard, see progress guard class docstrings for explanation
        if not self.progress_guard.guard_ok_mass(item_id=prov_item_id):
            return
        self.progress_guard.add_progress(item_id=prov_item_id)

        if not isinstance(media_item, Audiobook):
            return

        if fully_played and position < media_item.duration - PLAYBACK_REPORT_INTERVAL_SECONDS:
            # faulty position update, same reasoning as for episodes
            return

        if position == 0 and not fully_played:
            # marked unplayed
            existing = await self._client.get_my_media_progress(item_id=prov_item_id)
            if existing is not None:
                await self._client.remove_my_media_progress(media_progress_id=existing.id_)
                self.logger.debug(f"Removed media progress of {media_type.value}.")
                return

        total_seconds = media_item.duration
        open_session = self.sessions.get(prov_item_id)
        if open_session and await _sync_open_session(open_session, total_seconds):
            return
        self.logger.debug(f"Updating {media_type.value} named {media_item.name} progress")
        await self._client.update_my_media_progress(
            item_id=prov_item_id,
            duration_seconds=total_seconds,
            progress_seconds=position,
            is_finished=fully_played,
        )
1115
@handle_refresh_token
async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
    """Browse the Audiobookshelf libraries.

    The part of ``path`` after ``://`` is split at ``/``. Its first segment is
    ``<library key> <library id>``, followed by an optional category, an
    optional item id and, for a series within an author, a series id.
    Paths that match none of the known shapes yield an empty list.

    Generates this view:
        Library_Name_A (Audiobooks)
            Audiobooks
                Audiobook_1
                Audiobook_2
            Series
                Series_1
                    Audiobook_1
                    Audiobook_2
                Series_2
                    Audiobook_3
                    Audiobook_4
            Collections
                Collection_1
                    Audiobook_1
                    Audiobook_2
                Collection_2
                    Audiobook_3
                    Audiobook_4
            Authors
                Author_1
                    Series_1
                    Audiobook_1
                    Audiobook_2
                Author_2
                    Audiobook_3
            Narrators
                Narrator_1
                    Audiobook_1
        Library_Name_B (Podcasts)
            Podcast_1
            Podcast_2
    """
    remainder = path.split("://", 1)[1]
    if not remainder:
        return self._browse_root()

    segments = remainder.split("/")
    lib_key, lib_id = segments[0].split(" ")
    depth = len(segments)

    if depth == 1:
        # a library itself
        if lib_key == AbsBrowsePaths.LIBRARIES_PODCAST:
            return await self._browse_lib_podcasts(library_id=lib_id)
        return self._browse_lib_audiobooks(current_path=path)

    if depth == 2:
        # a category of an audiobook library
        category = segments[1]
        if category == AbsBrowsePaths.AUTHORS:
            return await self._browse_authors(current_path=path, library_id=lib_id)
        if category == AbsBrowsePaths.NARRATORS:
            return await self._browse_narrators(current_path=path, library_id=lib_id)
        if category == AbsBrowsePaths.SERIES:
            return await self._browse_series(current_path=path, library_id=lib_id)
        if category == AbsBrowsePaths.COLLECTIONS:
            return await self._browse_collections(current_path=path, library_id=lib_id)
        if category == AbsBrowsePaths.AUDIOBOOKS:
            return await self._browse_books(library_id=lib_id)
        return []

    if depth == 3:
        # one entry of a category
        category, entry_id = segments[1:3]
        if category == AbsBrowsePaths.AUTHORS:
            return await self._browse_author_books(current_path=path, author_id=entry_id)
        if category == AbsBrowsePaths.NARRATORS:
            return await self._browse_narrator_books(
                library_id=lib_id, narrator_filter_str=entry_id
            )
        if category == AbsBrowsePaths.SERIES:
            return await self._browse_series_books(series_id=entry_id)
        if category == AbsBrowsePaths.COLLECTIONS:
            return await self._browse_collection_books(collection_id=entry_id)
        return []

    if depth == 4:
        # series within author
        return await self._browse_series_books(series_id=segments[3])

    return []
1190
def _browse_root(self, append_mediatype_suffix: bool = True) -> Sequence[BrowseFolder]:
    """Return one folder per known library, audiobook libraries first.

    With ``append_mediatype_suffix`` a translation key for the media type is
    attached to each folder (the frontend then shows <name>: <translation>).
    If no library is known at all, this is logged and the result is empty.
    """
    audiobook_libs = self.libraries.audiobooks
    podcast_libs = self.libraries.podcasts
    if not (audiobook_libs or podcast_libs):
        self._log_no_libraries()
        return []

    # (libraries, path key of the library kind, translation key used as suffix)
    sections = (
        (
            audiobook_libs,
            AbsBrowsePaths.LIBRARIES_BOOK,
            AbsBrowseItemsBookTranslationKey.AUDIOBOOKS,
        ),
        (
            podcast_libs,
            AbsBrowsePaths.LIBRARIES_PODCAST,
            AbsBrowseItemsPodcastTranslationKey.PODCASTS,
        ),
    )

    folders = []
    for libs, path_key, suffix_key in sections:
        translation_key: str | None = suffix_key if append_mediatype_suffix else None
        for lib_id, lib in libs.items():
            folders.append(
                BrowseFolder(
                    item_id=lib_id,
                    name=lib.name,
                    translation_key=translation_key,
                    provider=self.instance_id,
                    path=f"{self.instance_id}://{path_key} {lib_id}",
                )
            )
    return folders
1227
async def _browse_lib_podcasts(self, library_id: str) -> list[MediaItemType]:
    """List the podcasts of a podcast library, sorted by name.

    Podcast libraries have no sub categories, so the podcasts are returned
    directly. Ids for which Music Assistant has no library item are skipped.
    A warning is logged if no item ids are cached for the library.
    """
    podcast_ids = self.libraries.podcasts[library_id].item_ids
    if not podcast_ids:
        self._log_no_helper_item_ids()

    podcasts = []
    # Iterate over a snapshot. The socket callbacks (_socket_abs_item_changed and
    # _socket_abs_item_removed) add to / remove from this very collection, and they
    # can run while we are suspended in the await below. Iterating the live
    # collection would then fail with "changed size during iteration".
    for podcast_id in tuple(podcast_ids):
        mass_item = await self.mass.music.get_library_item_by_prov_id(
            media_type=MediaType.PODCAST,
            item_id=podcast_id,
            provider_instance_id_or_domain=self.instance_id,
        )
        if mass_item is not None:
            podcasts.append(mass_item)
    return sorted(podcasts, key=lambda podcast: podcast.name)
1242
def _browse_lib_audiobooks(self, current_path: str) -> Sequence[BrowseFolder]:
    """Return the category folders of an audiobook library.

    One folder is created per member of AbsBrowseItemsBookTranslationKey. Its
    path is ``current_path`` extended by the path segment that
    ABS_BROWSE_ITEMS_TO_PATH assigns to the member.
    """
    return [
        BrowseFolder(
            item_id=category_key.lower(),
            name="",  # use translation key
            translation_key=category_key,
            provider=self.instance_id,
            path=current_path + "/" + ABS_BROWSE_ITEMS_TO_PATH[category_key],
        )
        for category_key in AbsBrowseItemsBookTranslationKey
    ]
1257
async def _browse_authors(self, current_path: str, library_id: str) -> Sequence[BrowseFolder]:
    """Return one folder per author of the library, sorted by author name."""
    authors = await self._client.get_library_authors(library_id=library_id)
    folders = [
        BrowseFolder(
            item_id=abs_author.id_,
            name=abs_author.name,
            provider=self.instance_id,
            path=f"{current_path}/{abs_author.id_}",
        )
        for abs_author in authors
    ]
    folders.sort(key=lambda folder: folder.name)
    return folders
1273
async def _browse_narrators(self, current_path: str, library_id: str) -> Sequence[BrowseFolder]:
    """Return one folder per narrator of the library, sorted by narrator name."""
    narrators = await self._client.get_library_narrators(library_id=library_id)
    folders = [
        BrowseFolder(
            item_id=abs_narrator.id_,
            name=abs_narrator.name,
            provider=self.instance_id,
            path=f"{current_path}/{abs_narrator.id_}",
        )
        for abs_narrator in narrators
    ]
    folders.sort(key=lambda folder: folder.name)
    return folders
1289
async def _browse_series(self, current_path: str, library_id: str) -> Sequence[BrowseFolder]:
    """Return one folder per series of the library, sorted by series name.

    The series are fetched response by response; the first response without
    results ends the fetching.
    """
    folders: list[BrowseFolder] = []
    async for page in self._client.get_library_series(library_id=library_id):
        if not page.results:
            break
        folders.extend(
            BrowseFolder(
                item_id=series.id_,
                name=series.name,
                provider=self.instance_id,
                path=f"{current_path}/{series.id_}",
            )
            for series in page.results
        )

    folders.sort(key=lambda folder: folder.name)
    return folders
1307
async def _browse_collections(
    self, current_path: str, library_id: str
) -> Sequence[BrowseFolder]:
    """Return one folder per collection of the library, sorted by collection name.

    The collections are fetched response by response; the first response
    without results ends the fetching.
    """
    folders: list[BrowseFolder] = []
    async for page in self._client.get_library_collections(library_id=library_id):
        if not page.results:
            break
        folders.extend(
            BrowseFolder(
                item_id=collection.id_,
                name=collection.name,
                provider=self.instance_id,
                path=f"{current_path}/{collection.id_}",
            )
            for collection in page.results
        )
    folders.sort(key=lambda folder: folder.name)
    return folders
1326
async def _browse_books(self, library_id: str) -> Sequence[MediaItemType]:
    """List all audiobooks of an audiobook library, sorted by name.

    Ids for which Music Assistant has no library item are skipped. A warning
    is logged if no item ids are cached for the library.
    """
    book_ids = self.libraries.audiobooks[library_id].item_ids
    if not book_ids:
        self._log_no_helper_item_ids()

    books = []
    # Iterate over a snapshot. The socket callbacks (_socket_abs_item_changed and
    # _socket_abs_item_removed) add to / remove from this very collection, and they
    # can run while we are suspended in the await below. Iterating the live
    # collection would then fail with "changed size during iteration".
    for book_id in tuple(book_ids):
        mass_item = await self.mass.music.get_library_item_by_prov_id(
            media_type=MediaType.AUDIOBOOK,
            item_id=book_id,
            provider_instance_id_or_domain=self.instance_id,
        )
        if mass_item is not None:
            books.append(mass_item)
    return sorted(books, key=lambda book: book.name)
1340
async def _browse_author_books(
    self, current_path: str, author_id: str
) -> Sequence[MediaItemType | BrowseFolder]:
    """List the content of an author: series folders first, then standalone books.

    Every series of the author becomes a folder below ``current_path``. Books
    that are part of one of these series are only reachable through the series
    folder. The remaining books follow in the order in which Audiobookshelf
    returned them; books without a library item in Music Assistant are skipped.

    Raises TypeError if the client does not return the author together with
    items and series.
    """
    abs_author = await self._client.get_author(
        author_id=author_id, include_items=True, include_series=True
    )
    if not isinstance(abs_author, AbsAuthorWithItemsAndSeries):
        raise TypeError("Unexpected type of author.")

    items: list[MediaItemType | BrowseFolder] = []
    series_book_ids: set[str] = set()
    for series in abs_author.series:
        series_book_ids.update(x.id_ for x in series.items)
        items.append(
            BrowseFolder(
                item_id=series.id_,
                # frontend does <name>: <translation>
                name=series.name,
                translation_key="series_singular",
                provider=self.instance_id,
                path=f"{current_path}/{series.id_}",
            )
        )

    # De-duplicate via dict.fromkeys instead of a set: a dict keeps insertion order,
    # so the listing is stable, whereas the iteration order of a set of strings
    # differs from one interpreter run to the next.
    standalone_book_ids = [
        book_id
        for book_id in dict.fromkeys(x.id_ for x in abs_author.library_items)
        if book_id not in series_book_ids
    ]
    for book_id in standalone_book_ids:
        mass_item = await self.mass.music.get_library_item_by_prov_id(
            media_type=MediaType.AUDIOBOOK,
            item_id=book_id,
            provider_instance_id_or_domain=self.instance_id,
        )
        if mass_item is not None:
            items.append(mass_item)

    return items
1379
async def _browse_narrator_books(
    self, library_id: str, narrator_filter_str: str
) -> Sequence[MediaItemType]:
    """List the audiobooks of a narrator within a library, sorted by name.

    The library items are requested with the filter
    ``narrators.<narrator_filter_str>`` and fetched response by response; the
    first response without results ends the fetching. Items without a library
    item in Music Assistant are skipped.
    """
    narrator_filter = f"narrators.{narrator_filter_str}"
    books: list[MediaItemType] = []
    async for page in self._client.get_library_items(
        library_id=library_id, filter_str=narrator_filter
    ):
        if not page.results:
            break
        for abs_item in page.results:
            book = await self.mass.music.get_library_item_by_prov_id(
                media_type=MediaType.AUDIOBOOK,
                item_id=abs_item.id_,
                provider_instance_id_or_domain=self.instance_id,
            )
            if book is None:
                continue
            books.append(book)

    books.sort(key=lambda book: book.name)
    return books
1399
async def _browse_series_books(self, series_id: str) -> Sequence[MediaItemType]:
    """List the audiobooks of a series, in the order Audiobookshelf reports them.

    Books without a library item in Music Assistant are skipped. Raises
    TypeError if the client does not return the series together with its progress.
    """
    abs_series = await self._client.get_series(series_id=series_id, include_progress=True)
    if not isinstance(abs_series, AbsSeriesWithProgress):
        raise TypeError("Unexpected series type.")

    # abs sorts these ids by sequence, so the order is kept as is
    return [
        book
        for book_id in abs_series.progress.library_item_ids
        if (
            book := await self.mass.music.get_library_item_by_prov_id(
                media_type=MediaType.AUDIOBOOK,
                item_id=book_id,
                provider_instance_id_or_domain=self.instance_id,
            )
        )
        is not None
    ]
1418
async def _browse_collection_books(self, collection_id: str) -> Sequence[MediaItemType]:
    """List the audiobooks of a collection, in the order of the collection.

    Books without a library item in Music Assistant are skipped.
    """
    abs_collection = await self._client.get_collection(collection_id=collection_id)
    books = []
    for abs_book in abs_collection.books:
        book = await self.mass.music.get_library_item_by_prov_id(
            media_type=MediaType.AUDIOBOOK,
            item_id=abs_book.id_,
            provider_instance_id_or_domain=self.instance_id,
        )
        if book is None:
            continue
        books.append(book)
    return books
1431
async def _socket_abs_item_changed(
    self, items: LibraryItemExpanded | list[LibraryItemExpanded]
) -> None:
    """Handle library items that were added or updated in Audiobookshelf.

    Accepts a single item or a list of items. Books and podcasts are parsed
    and written to the Music Assistant library (overwriting an existing entry),
    and their id is recorded in the matching helper library, if that library is
    known. Books without audio tracks are skipped, as are podcasts without
    episodes when CONF_HIDE_EMPTY_PODCASTS is enabled. Finally the helper
    libraries are written to the cache.
    """
    if isinstance(items, LibraryItemExpanded):
        changed_items = [items]
    else:
        changed_items = items

    for abs_item in changed_items:
        if isinstance(abs_item, LibraryItemExpandedBook):
            if not abs_item.media.tracks:
                # no audiofiles -> ebook only
                continue
            self.logger.debug(
                'Updated book "%s" via socket.', abs_item.media.metadata.title or ""
            )
            mass_audiobook = parse_audiobook(
                abs_audiobook=abs_item,
                instance_id=self.instance_id,
                domain=self.domain,
                token=self._client.token,
                base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
            )
            await self.mass.music.audiobooks.add_item_to_library(
                mass_audiobook,
                overwrite_existing=True,
            )
            helper_lib = self.libraries.audiobooks.get(abs_item.library_id, None)
            if helper_lib is not None:
                helper_lib.item_ids.add(abs_item.id_)
        elif isinstance(abs_item, LibraryItemExpandedPodcast):
            self.logger.debug(
                'Updated podcast "%s" via socket.', abs_item.media.metadata.title or ""
            )
            mass_podcast = parse_podcast(
                abs_podcast=abs_item,
                instance_id=self.instance_id,
                domain=self.domain,
                token=self._client.token,
                base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
            )
            if (
                bool(self.config.get_value(CONF_HIDE_EMPTY_PODCASTS))
                and mass_podcast.total_episodes == 0
            ):
                # empty podcasts are hidden on request of the user
                continue
            await self.mass.music.podcasts.add_item_to_library(
                mass_podcast,
                overwrite_existing=True,
            )
            helper_lib = self.libraries.podcasts.get(abs_item.library_id, None)
            if helper_lib is not None:
                helper_lib.item_ids.add(abs_item.id_)

    await self._cache_set_helper_libraries()
1481
async def _socket_abs_item_removed(self, item: LibraryItemRemoved) -> None:
    """Handle a library item that was removed in Audiobookshelf.

    The id is dropped from the helper library that holds it. If it was found
    there, the matching item is removed from the Music Assistant library as
    well. The helper libraries are written to the cache in any case.
    """
    media_type: MediaType | None = None
    # Audiobook libraries are searched first, then podcast libraries. Within each
    # kind the search stops at the first library holding the id.
    searched = (
        (self.libraries.audiobooks, MediaType.AUDIOBOOK),
        (self.libraries.podcasts, MediaType.PODCAST),
    )
    for helper_libs, helper_media_type in searched:
        for helper_lib in helper_libs.values():
            if item.id_ in helper_lib.item_ids:
                media_type = helper_media_type
                helper_lib.item_ids.remove(item.id_)
                break

    if media_type is not None:
        removed = await self.mass.music.get_library_item_by_prov_id(
            media_type=media_type,
            item_id=item.id_,
            provider_instance_id_or_domain=self.instance_id,
        )
        if removed is not None:
            await self.mass.music.remove_item_from_library(
                media_type=media_type, library_item_id=removed.item_id
            )
            self.logger.debug('Removed %s "%s" via socket.', media_type.value, removed.name)

    await self._cache_set_helper_libraries()
1509
async def _socket_abs_user_item_progress_updated(
    self, id_: str, progress: MediaProgress
) -> None:
    """Apply a progress change reported by Audiobookshelf to the playlog.

    ABS reports every 15s and immediately on play state change.
    This callback is called per item whenever a progress changes, i.e. on
    - a change in position
    - the item being finished
    It is _not_ called if a progress is reset / discarded.

    Updates vetoed by the progress guard and updates for items we do not know
    are ignored.
    """
    # guard, see progress guard class docstrings for explanation
    if not self.progress_guard.guard_ok_abs(abs_progress=progress):
        return

    if progress.library_item_id not in self._get_all_known_item_ids():
        return

    self.logger.debug(f"Updated progress of item {progress.library_item_id} via socket.")

    # a progress without episode id belongs to a book
    if progress.episode_id is None:
        await self._update_playlog_book(progress)
    else:
        await self._update_playlog_episode(progress)
1535
async def _socket_abs_refresh_token_expired(self) -> None:
    """Reauthenticate when the expiry of the refresh token is signalled."""
    await self.reauthenticate()
1538
async def _socket_stream_open(self, stream: AbsStream) -> None:
    """Flag the HLS stream of the matching playback session as open.

    The id of a stream equals the id of its playback session. Only the first
    session with that id is flagged; without a match nothing happens.
    """
    matching_session = next(
        (
            helper
            for helper in self.sessions.values()
            if helper.abs_session_id == stream.id_
        ),
        None,
    )
    if matching_session is not None:
        matching_session.hls_stream_open.set()
1545
async def reauthenticate(self) -> None:
    """Reauthorize the abs session config if refresh token expired.

    Nothing is done if a reauthentication is in progress or the last one was
    recorded less than 5 seconds ago; in that case the call merely waits until
    the lock is free and returns. Otherwise username and password from the
    config are used to authenticate, and the time is recorded.
    """
    lock = self.reauthenticate_lock
    # some safe guarding should this function be called simultaneously
    if lock.locked() or time.time() - self.reauthenticate_last < 5:
        while lock.locked():
            await asyncio.sleep(0.5)
        return

    async with lock:
        username = str(self.config.get_value(CONF_USERNAME))
        password = str(self.config.get_value(CONF_PASSWORD))
        await self._client.session_config.authenticate(username=username, password=password)
        self.reauthenticate_last = time.time()
1560
def _get_all_known_item_ids(self) -> set[str]:
    """Return the item ids of all podcast and audiobook helper libraries as one set."""
    all_libraries = itertools.chain(
        self.libraries.podcasts.values(),
        self.libraries.audiobooks.values(),
    )
    return {item_id for lib in all_libraries for item_id in lib.item_ids}
1569
async def _set_playlog_from_user(self, user: User) -> None:
    """Sync the playlog when a user update comes in.

    The user object carries all media progresses of that user, which are
    handed over to '_set_playlog_from_user_sync'.

    'guard_ok_abs' works with the timestamp of the last update in abs. After an
    initial progress update, an unchanged progress therefore does not trigger
    another (useless) playlog update.

    For the sake of simplicity, removed progresses are not synced here.
    """
    user_progresses = user.media_progress
    await self._set_playlog_from_user_sync(user_progresses)
1581
async def _set_playlog_from_user_sync(self, progresses: list[MediaProgress]) -> None:
    """Bring the Music Assistant playlog in line with the given abs progresses.

    First, every progress that passes the progress guard, belongs to a known
    item and is either at 0 or at/after PLAYBACK_REPORT_INTERVAL_SECONDS is
    written to the playlog. Afterwards, items which have a playlog entry in
    Music Assistant but no progress in ``progresses`` are marked unplayed.
    """
    known_item_ids = self._get_all_known_item_ids()
    seen_progress_ids: set[str] = set()
    # for debugging
    updated_count = 0

    for progress in progresses:
        # remember the id in Music Assistant notation for the discard step below
        if progress.episode_id is None:
            seen_progress_ids.add(progress.library_item_id)
        else:
            seen_progress_ids.add(f"{progress.library_item_id} {progress.episode_id}")

        # Guard. It also avoids writing to the db again without a state change: the
        # update playlog functions add a helper progress, which then carries the most
        # recent timestamp. A later progress from abs with an older timestamp does
        # not lead to another update.
        if not self.progress_guard.guard_ok_abs(progress):
            continue
        current_time = progress.current_time
        if (
            current_time is not None
            and int(current_time) != 0
            and not current_time >= PLAYBACK_REPORT_INTERVAL_SECONDS
        ):
            # same as mass default, only > 30s
            continue
        if progress.library_item_id not in known_item_ids:
            continue

        updated_count += 1
        if progress.episode_id is None:
            await self._update_playlog_book(progress)
        else:
            await self._update_playlog_episode(progress)
    self.logger.debug(f"Updated {updated_count} from full playlog.")

    # Compare with the progresses Music Assistant knows for this provider.
    # A user may discard a progress in ABS, which removes it completely, and
    # there is no socket notification for this event.
    playlog_entries = await self.mass.music.get_playlog_provider_item_ids(
        provider_instance_id=self.instance_id
    )
    playlog_ids = {prov_item_id for _, prov_item_id in playlog_entries}
    for discarded_id in playlog_ids - seen_progress_ids:
        if " " not in discarded_id:
            # no episode part in the id -> audiobook
            discarded_item = await self.mass.music.get_library_item_by_prov_id(
                media_type=MediaType.AUDIOBOOK,
                item_id=discarded_id,
                provider_instance_id_or_domain=self.instance_id,
            )
            if discarded_item:
                self.progress_guard.add_progress(discarded_id)
                await self.mass.music.mark_item_unplayed(discarded_item)
        else:
            with suppress(MediaNotFoundError):
                discarded_item = await self.get_podcast_episode(
                    prov_episode_id=discarded_id, add_progress=False
                )
                self.progress_guard.add_progress(*discarded_id.split(" "))
                await self.mass.music.mark_item_unplayed(discarded_item)
        self.logger.debug("Discarded item %s ", discarded_id)
1645
async def _update_playlog_book(self, progress: MediaProgress) -> None:
    """Write the progress of an audiobook to the Music Assistant playlog.

    The progress is always registered with the progress guard, which also
    prevents useless repeated updates. Nothing else happens if the progress
    has no current time or the audiobook is not in the library. A progress at
    0 seconds that is not finished marks the audiobook as unplayed, everything
    else marks it as played at the given second.
    """
    self.progress_guard.add_progress(progress.library_item_id)
    if progress.current_time is None:
        return

    audiobook = await self.mass.music.get_library_item_by_prov_id(
        media_type=MediaType.AUDIOBOOK,
        item_id=progress.library_item_id,
        provider_instance_id_or_domain=self.instance_id,
    )
    if audiobook is None:
        return

    seconds_played = int(progress.current_time)
    if seconds_played != 0 or progress.is_finished:
        await self.mass.music.mark_item_played(
            audiobook,
            fully_played=progress.is_finished,
            seconds_played=seconds_played,
        )
    else:
        await self.mass.music.mark_item_unplayed(audiobook)
1667
async def _update_playlog_episode(self, progress: MediaProgress) -> None:
    """Write the progress of a podcast episode to the Music Assistant playlog.

    The progress is always registered with the progress guard, which also
    prevents useless repeated updates. Nothing else happens if the progress
    has no current time or the episode cannot be found. A progress at
    0 seconds that is not finished marks the episode as unplayed, everything
    else marks it as played at the given second.
    """
    self.progress_guard.add_progress(progress.library_item_id, progress.episode_id)
    if progress.current_time is None:
        return

    prov_episode_id = f"{progress.library_item_id} {progress.episode_id}"
    try:
        # the full podcast has to be obtained, the episode is then searched in it
        episode = await self.get_podcast_episode(prov_episode_id, add_progress=False)
    except MediaNotFoundError:
        return

    seconds_played = int(progress.current_time)
    if seconds_played != 0 or progress.is_finished:
        await self.mass.music.mark_item_played(
            episode,
            fully_played=progress.is_finished,
            seconds_played=seconds_played,
        )
    else:
        await self.mass.music.mark_item_unplayed(episode)
1688
async def _cache_set_helper_libraries(self) -> None:
    """Store the helper libraries, converted to a dict, in the cache of this provider."""
    cache_entry = {
        "key": CACHE_KEY_LIBRARIES,
        "provider": self.instance_id,
        "category": CACHE_CATEGORY_LIBRARIES,
        "data": self.libraries.to_dict(),
    }
    await self.mass.cache.set(**cache_entry)
1696
def _log_no_libraries(self) -> None:
    """Log an error stating that the provider sees no libraries."""
    message = "There are no libraries visible to the Audiobookshelf provider."
    self.logger.error(message)
1699
def _log_no_helper_item_ids(self) -> None:
    """Log a warning that cached item ids are missing and a full resync is needed."""
    message = (
        "Cached item ids are missing. "
        "Please trigger a full resync of the Audiobookshelf provider manually."
    )
    self.logger.warning(message)
1705