/
/
/
1"""Audiobookshelf (abs) provider for Music Assistant."""
2
3from __future__ import annotations
4
5import asyncio
6import functools
7import itertools
8import time
9from collections.abc import AsyncGenerator, Callable, Coroutine, Sequence
10from contextlib import suppress
11from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar, cast
12
13import aioaudiobookshelf as aioabs
14from aioaudiobookshelf.client.items import LibraryItemExpandedBook as AbsLibraryItemExpandedBook
15from aioaudiobookshelf.client.items import (
16 LibraryItemExpandedPodcast as AbsLibraryItemExpandedPodcast,
17)
18from aioaudiobookshelf.client.items import PlaybackSessionExpanded as AbsPlaybackSessionExpanded
19from aioaudiobookshelf.client.items import PlaybackSessionParameters as AbsPlaybackSessionParameters
20from aioaudiobookshelf.client.session import SyncOpenSessionParameters
21from aioaudiobookshelf.exceptions import LoginError as AbsLoginError
22from aioaudiobookshelf.exceptions import RefreshTokenExpiredError
23from aioaudiobookshelf.exceptions import SessionNotFoundError as AbsSessionNotFoundError
24from aioaudiobookshelf.schema.author import AuthorExpanded
25from aioaudiobookshelf.schema.calls_authors import (
26 AuthorWithItemsAndSeries as AbsAuthorWithItemsAndSeries,
27)
28from aioaudiobookshelf.schema.calls_series import SeriesWithProgress as AbsSeriesWithProgress
29from aioaudiobookshelf.schema.library import (
30 LibraryItemExpanded,
31 LibraryItemExpandedBook,
32 LibraryItemExpandedPodcast,
33 LibraryItemMinifiedPodcast,
34)
35from aioaudiobookshelf.schema.library import LibraryMediaType as AbsLibraryMediaType
36from aioaudiobookshelf.schema.session import DeviceInfo as AbsDeviceInfo
37from aioaudiobookshelf.schema.shelf import (
38 SeriesShelf,
39 ShelfAuthors,
40 ShelfBook,
41 ShelfEpisode,
42 ShelfLibraryItemMinified,
43 ShelfPodcast,
44 ShelfSeries,
45)
46from aioaudiobookshelf.schema.shelf import ShelfId as AbsShelfId
47from aioaudiobookshelf.schema.shelf import ShelfType as AbsShelfType
48from aiohttp import web
49from music_assistant_models.config_entries import (
50 ConfigEntry,
51 ConfigValueType,
52 ProviderConfig,
53)
54from music_assistant_models.enums import (
55 ConfigEntryType,
56 ContentType,
57 MediaType,
58 ProviderFeature,
59 StreamType,
60)
61from music_assistant_models.errors import LoginFailed, MediaNotFoundError
62from music_assistant_models.media_items import (
63 Audiobook,
64 AudioFormat,
65 BrowseFolder,
66 ItemMapping,
67 MediaItemType,
68 PodcastEpisode,
69 UniqueList,
70)
71from music_assistant_models.media_items.media_item import RecommendationFolder
72from music_assistant_models.streamdetails import MultiPartPath, StreamDetails
73
74from music_assistant.constants import PLAYBACK_REPORT_INTERVAL_SECONDS
75from music_assistant.models.music_provider import MusicProvider
76from music_assistant.providers.audiobookshelf.parsers import (
77 parse_audiobook,
78 parse_podcast,
79 parse_podcast_episode,
80)
81
82from .constants import (
83 ABS_BROWSE_ITEMS_TO_PATH,
84 ABS_SHELF_ID_ICONS,
85 ABS_SHELF_ID_TRANSLATION_KEY,
86 AIOHTTP_TIMEOUT,
87 CACHE_CATEGORY_LIBRARIES,
88 CACHE_KEY_LIBRARIES,
89 CONF_API_TOKEN,
90 CONF_HIDE_EMPTY_PODCASTS,
91 CONF_HLS_FORMATS,
92 CONF_OLD_TOKEN,
93 CONF_PASSWORD,
94 CONF_URL,
95 CONF_USE_HLS,
96 CONF_USERNAME,
97 CONF_VERIFY_SSL,
98 HLS_ALL_FORMATS,
99 HLS_FORMATS_SPLIT,
100 AbsBrowseItemsBookTranslationKey,
101 AbsBrowseItemsPodcastTranslationKey,
102 AbsBrowsePaths,
103)
104from .helpers import LibrariesHelper, LibraryHelper, ProgressGuard, SessionHelper
105
106if TYPE_CHECKING:
107 from aioaudiobookshelf.schema.events_socket import LibraryItemRemoved
108 from aioaudiobookshelf.schema.media_progress import MediaProgress
109 from aioaudiobookshelf.schema.streams import Stream as AbsStream
110 from aioaudiobookshelf.schema.user import User
111 from music_assistant_models.media_items import Podcast
112 from music_assistant_models.provider import ProviderManifest
113
114 from music_assistant.mass import MusicAssistant
115 from music_assistant.models import ProviderInstanceType
116
# Provider features handed to the Audiobookshelf instance that setup() creates.
SUPPORTED_FEATURES = {
    ProviderFeature.LIBRARY_PODCASTS,
    ProviderFeature.LIBRARY_AUDIOBOOKS,
    ProviderFeature.BROWSE,
    ProviderFeature.RECOMMENDATIONS,
}
123
124
async def setup(
    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
    """Create the Audiobookshelf provider instance for the given configuration."""
    provider = Audiobookshelf(mass, manifest, config, SUPPORTED_FEATURES)
    return provider
130
131
async def get_config_entries(
    mass: MusicAssistant,
    instance_id: str | None = None,
    action: str | None = None,
    values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
    """
    Build the config entries needed to set up this provider.

    The arguments are not used; the same entries are returned on every call.

    instance_id: id of an existing provider instance (None if new instance setup).
    action: [optional] action key called from config entries UI.
    values: the (intermediate) raw values for config entries sent with the action.
    """
    # ruff: noqa: ARG001

    # Server address and the two supported ways of authenticating.
    connection_entries = (
        ConfigEntry(
            key="label",
            type=ConfigEntryType.LABEL,
            label="Please provide the address of your Audiobookshelf instance. To authenticate "
            "you have two options: "
            "a) Provide username AND password. Leave the API key empty. "
            "b) Provide ONLY an API key.",
        ),
        ConfigEntry(
            key=CONF_URL,
            type=ConfigEntryType.STRING,
            label="Server",
            required=True,
            description="The URL of the Audiobookshelf server to connect to. For example "
            "https://abs.domain.tld/ or http://192.168.1.4:13378/",
        ),
        ConfigEntry(
            key=CONF_USERNAME,
            type=ConfigEntryType.STRING,
            label="Username",
            required=False,
            description="The username to authenticate to the remote server.",
        ),
        ConfigEntry(
            key=CONF_PASSWORD,
            type=ConfigEntryType.SECURE_STRING,
            label="Password",
            required=False,
            description="The password to authenticate to the remote server.",
        ),
        ConfigEntry(
            key=CONF_API_TOKEN,
            type=ConfigEntryType.SECURE_STRING,
            label="API key _instead_ of user/ password. (ABS version >= 2.26)",
            required=False,
            description="Instead of using a username and password, "
            "you may provide an API key (ABS version >= 2.26). "
            "Please consult the docs.",
        ),
        # Hidden entry; only read back by the provider, never shown in the UI.
        ConfigEntry(
            key=CONF_OLD_TOKEN,
            type=ConfigEntryType.SECURE_STRING,
            label="old token",
            required=False,
            hidden=True,
        ),
    )

    # Advanced: HLS streaming toggles.
    streaming_entries = (
        ConfigEntry(
            key=CONF_USE_HLS,
            type=ConfigEntryType.BOOLEAN,
            label="Stream via HLS from ABS.",
            description="Use an HLS stream when streaming from audiobookshelf.",
            required=False,
            default_value=False,
            advanced=True,
        ),
        ConfigEntry(
            key=CONF_HLS_FORMATS,
            type=ConfigEntryType.STRING,
            label=f"Use HLS for these file extensions. Separate with ';'. Use {HLS_ALL_FORMATS} for"
            " all formats.",
            description="Use HLS only for these file extensions."
            f" Separate with ;. E.g. m4b or m4b;aac or {HLS_ALL_FORMATS}",
            required=False,
            default_value="m4b",
            advanced=True,
        ),
    )

    # Advanced: remaining switches.
    misc_entries = (
        ConfigEntry(
            key=CONF_VERIFY_SSL,
            type=ConfigEntryType.BOOLEAN,
            label="Verify SSL",
            required=False,
            description="Whether or not to verify the certificate of SSL/TLS connections.",
            advanced=True,
            default_value=True,
        ),
        ConfigEntry(
            key=CONF_HIDE_EMPTY_PODCASTS,
            type=ConfigEntryType.BOOLEAN,
            label="Hide empty podcasts.",
            required=False,
            description="This will skip podcasts with no episodes associated.",
            advanced=True,
            default_value=False,
        ),
    )

    return (*connection_entries, *streaming_entries, *misc_entries)
232
233
# Typing helpers for the handle_refresh_token decorator: P stands for the parameters of the
# wrapped coroutine function, R for the type of its awaited result.
R = TypeVar("R")
P = ParamSpec("P")
236
237
238class Audiobookshelf(MusicProvider):
239 """Audiobookshelf MusicProvider."""
240
241 _on_unload_callbacks: list[Callable[[], None]]
242
243 @staticmethod
244 def handle_refresh_token(
245 method: Callable[P, Coroutine[Any, Any, R]],
246 ) -> Callable[P, Coroutine[Any, Any, R]]:
247 """Decorate a method to handle an expired refresh token by relogin."""
248
249 @functools.wraps(method)
250 async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
251 self = cast("Audiobookshelf", args[0])
252 try:
253 return await method(*args, **kwargs)
254 except RefreshTokenExpiredError:
255 self.logger.debug("Refresh token expired. Trying to renew.")
256 await self.reauthenticate()
257 return await method(*args, **kwargs)
258
259 return wrapper
260
261 async def handle_async_init(self) -> None:
262 """Pass config values to client and initialize."""
263 self._on_unload_callbacks: list[Callable[[], None]] = []
264 self.sessions: dict[str, SessionHelper] = {} # key is the mass_item_id
265 self.create_session_lock = asyncio.Lock()
266 base_url = str(self.config.get_value(CONF_URL))
267 username = str(self.config.get_value(CONF_USERNAME))
268 password = str(self.config.get_value(CONF_PASSWORD))
269 token_old = self.config.get_value(CONF_OLD_TOKEN)
270 token_api = self.config.get_value(CONF_API_TOKEN)
271 verify_ssl = bool(self.config.get_value(CONF_VERIFY_SSL))
272 session_config = aioabs.SessionConfiguration(
273 session=self.mass.http_session,
274 url=base_url,
275 verify_ssl=verify_ssl,
276 logger=self.logger,
277 pagination_items_per_page=30, # audible provider goes with 50 for pagination
278 timeout=AIOHTTP_TIMEOUT,
279 )
280 # If we are configured with a non-expiring API key or not.
281 self.is_token_user = False
282 try:
283 if token_api is not None or token_old is not None:
284 _token = token_api if token_api is not None else token_old
285 session_config.token = str(_token)
286 (
287 self._client,
288 self._client_socket,
289 ) = await aioabs.get_user_and_socket_client_by_token(session_config=session_config)
290 self.is_token_user = True
291 else:
292 self._client, self._client_socket = await aioabs.get_user_and_socket_client(
293 session_config=session_config, username=username, password=password
294 )
295 await self._client_socket.init_client()
296 except AbsLoginError as exc:
297 raise LoginFailed(f"Login to abs instance at {base_url} failed.") from exc
298
299 if token_old is not None and token_api is None:
300 # Log Message that the old token won't work
301 _version = self._client.server_settings.version.split(".")
302 if len(_version) >= 2:
303 try:
304 major, minor = int(_version[0]), int(_version[1])
305 except ValueError:
306 major = minor = 0
307 if major >= 2 and minor >= 26:
308 self.logger.warning(
309 """
310
311######## Audiobookshelf API key change #############################################################
312
313Audiobookshelf introduced a new API key system in version 2.26 (JWT).
314You are still using a token configured with a previous version of Audiobookshelf,
315but you are running version %s. This will stop working in a future Audiobookshelf release.
316Please create a non-expiring API Key instead, and update your configuration accordingly.
317Refer to the documentation of Audiobookshelf, https://www.audiobookshelf.org/guides/api-keys/
318and of Music Assistant https://www.music-assistant.io/music-providers/audiobookshelf/
319for more details.
320
321""",
322 self._client.server_settings.version,
323 )
324
325 cached_libraries = await self.mass.cache.get(
326 key=CACHE_KEY_LIBRARIES,
327 provider=self.instance_id,
328 category=CACHE_CATEGORY_LIBRARIES,
329 default=None,
330 )
331 if cached_libraries is None:
332 self.libraries = LibrariesHelper()
333 # We need the library ids for recommendations. If the cache got cleared e.g. by a db
334 # migration, we might end up with empty library helpers on a configured provider. Note,
335 # that the lib item ids are not synced, still only on full provider sync, instead the
336 # sets are empty. Full sync is expensive.
337 # See warning in browse_lib_podcasts / _browse_books
338 libraries = await self._client.get_all_libraries()
339 for library in libraries:
340 if library.media_type == AbsLibraryMediaType.BOOK:
341 self.libraries.audiobooks[library.id_] = LibraryHelper(name=library.name)
342 elif library.media_type == AbsLibraryMediaType.PODCAST:
343 self.libraries.podcasts[library.id_] = LibraryHelper(name=library.name)
344 else:
345 self.libraries = LibrariesHelper.from_dict(cached_libraries)
346
347 # set socket callbacks
348 self._client_socket.set_item_callbacks(
349 on_item_added=self._socket_abs_item_changed,
350 on_item_updated=self._socket_abs_item_changed,
351 on_item_removed=self._socket_abs_item_removed,
352 on_items_added=self._socket_abs_item_changed,
353 on_items_updated=self._socket_abs_item_changed,
354 )
355
356 self._client_socket.set_user_callbacks(
357 on_user_item_progress_updated=self._socket_abs_user_item_progress_updated,
358 )
359
360 self._client_socket.set_refresh_token_expired_callback(
361 on_refresh_token_expired=self._socket_abs_refresh_token_expired
362 )
363
364 self._client_socket.set_stream_callbacks(on_stream_open=self._socket_stream_open)
365
366 # progress guard
367 self.progress_guard = ProgressGuard()
368
369 # safe guard reauthentication
370 self.reauthenticate_lock = asyncio.Lock()
371 self.reauthenticate_last = 0.0
372
373 # register dynamic stream route for audiobook parts
374 self._on_unload_callbacks.append(
375 self.mass.streams.register_dynamic_route(
376 f"/{self.instance_id}_part_stream", self._handle_session_part_request
377 )
378 )
379
380 @handle_refresh_token
381 async def unload(self, is_removed: bool = False) -> None:
382 """
383 Handle unload/close of the provider.
384
385 Called when provider is deregistered (e.g. MA exiting or config reloading).
386 is_removed will be set to True when the provider is removed from the configuration.
387 """
388 await self._client.logout()
389 await self._client_socket.logout()
390 for callback in self._on_unload_callbacks:
391 callback()
392
393 @property
394 def is_streaming_provider(self) -> bool:
395 """Return True if the provider is a streaming provider."""
396 # For streaming providers return True here but for local file based providers return False.
397 return False
398
399 @handle_refresh_token
400 async def sync_library(self, media_type: MediaType) -> None:
401 """Obtain audiobook library ids and podcast library ids."""
402 libraries = await self._client.get_all_libraries()
403 if len(libraries) == 0:
404 self._log_no_libraries()
405 for library in libraries:
406 if library.media_type == AbsLibraryMediaType.BOOK and media_type == MediaType.AUDIOBOOK:
407 self.libraries.audiobooks[library.id_] = LibraryHelper(name=library.name)
408 elif (
409 library.media_type == AbsLibraryMediaType.PODCAST
410 and media_type == MediaType.PODCAST
411 ):
412 self.libraries.podcasts[library.id_] = LibraryHelper(name=library.name)
413 await super().sync_library(media_type)
414 await self._cache_set_helper_libraries()
415
416 # update playlog
417 user = await self._client.get_my_user()
418 await self._set_playlog_from_user(user)
419
420 async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
421 """Retrieve library/subscribed podcasts from the provider.
422
423 Minified podcast information is enough.
424 """
425 for pod_lib_id in self.libraries.podcasts:
426 async for response in self._client.get_library_items(library_id=pod_lib_id):
427 if not response.results:
428 break
429 podcast_ids = [x.id_ for x in response.results]
430 # store uuids
431 self.libraries.podcasts[pod_lib_id].item_ids.update(podcast_ids)
432 for podcast_minified in response.results:
433 assert isinstance(podcast_minified, LibraryItemMinifiedPodcast)
434 mass_podcast = parse_podcast(
435 abs_podcast=podcast_minified,
436 instance_id=self.instance_id,
437 domain=self.domain,
438 token=self._client.token,
439 base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
440 )
441 if (
442 bool(self.config.get_value(CONF_HIDE_EMPTY_PODCASTS))
443 and mass_podcast.total_episodes == 0
444 ):
445 continue
446 yield mass_podcast
447
448 @handle_refresh_token
449 async def _get_abs_expanded_podcast(
450 self, prov_podcast_id: str
451 ) -> AbsLibraryItemExpandedPodcast:
452 abs_podcast = await self._client.get_library_item_podcast(
453 podcast_id=prov_podcast_id, expanded=True
454 )
455 assert isinstance(abs_podcast, AbsLibraryItemExpandedPodcast)
456
457 return abs_podcast
458
459 @handle_refresh_token
460 async def get_podcast(self, prov_podcast_id: str) -> Podcast:
461 """Get single podcast."""
462 abs_podcast = await self._get_abs_expanded_podcast(prov_podcast_id=prov_podcast_id)
463 return parse_podcast(
464 abs_podcast=abs_podcast,
465 instance_id=self.instance_id,
466 domain=self.domain,
467 token=self._client.token,
468 base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
469 )
470
471 async def get_podcast_episodes(
472 self, prov_podcast_id: str
473 ) -> AsyncGenerator[PodcastEpisode, None]:
474 """Get all podcast episodes of podcast.
475
476 Adds progress information.
477 """
478 abs_podcast = await self._get_abs_expanded_podcast(prov_podcast_id=prov_podcast_id)
479 episode_cnt = 1
480 # the user has the progress of all media items
481 # so we use a single api call here to obtain possibly many
482 # progresses for episodes
483 user = await self._client.get_my_user()
484 abs_progresses = {
485 x.episode_id: x
486 for x in user.media_progress
487 if x.episode_id is not None and x.library_item_id == prov_podcast_id
488 }
489 for abs_episode in abs_podcast.media.episodes:
490 progress = abs_progresses.get(abs_episode.id_, None)
491 mass_episode = parse_podcast_episode(
492 episode=abs_episode,
493 prov_podcast_id=prov_podcast_id,
494 fallback_episode_cnt=episode_cnt,
495 instance_id=self.instance_id,
496 domain=self.domain,
497 token=self._client.token,
498 base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
499 media_progress=progress,
500 )
501 yield mass_episode
502 episode_cnt += 1
503
504 @handle_refresh_token
505 async def get_podcast_episode(
506 self, prov_episode_id: str, add_progress: bool = True
507 ) -> PodcastEpisode:
508 """Get single podcast episode."""
509 prov_podcast_id, e_id = prov_episode_id.split(" ")
510 abs_podcast = await self._get_abs_expanded_podcast(prov_podcast_id=prov_podcast_id)
511 episode_cnt = 1
512 for abs_episode in abs_podcast.media.episodes:
513 if abs_episode.id_ == e_id:
514 progress = None
515 if add_progress:
516 progress = await self._client.get_my_media_progress(
517 item_id=prov_podcast_id, episode_id=abs_episode.id_
518 )
519 return parse_podcast_episode(
520 episode=abs_episode,
521 prov_podcast_id=prov_podcast_id,
522 fallback_episode_cnt=episode_cnt,
523 instance_id=self.instance_id,
524 domain=self.domain,
525 token=self._client.token,
526 base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
527 media_progress=progress,
528 )
529
530 episode_cnt += 1
531 raise MediaNotFoundError("Episode not found")
532
533 async def get_library_audiobooks(self) -> AsyncGenerator[Audiobook, None]:
534 """Get Audiobook libraries.
535
536 Need expanded version for chapters.
537 """
538 for book_lib_id in self.libraries.audiobooks:
539 async for response in self._client.get_library_items(library_id=book_lib_id):
540 if not response.results:
541 break
542 book_ids = [x.id_ for x in response.results]
543 # store uuids
544 self.libraries.audiobooks[book_lib_id].item_ids.update(book_ids)
545 # use expanded version for chapters/ caching.
546 books_expanded = await self._client.get_library_item_batch_book(item_ids=book_ids)
547 for book_expanded in books_expanded:
548 # If the book has no audiofiles, we skip -> ebook only.
549 if len(book_expanded.media.tracks) == 0:
550 continue
551 mass_audiobook = parse_audiobook(
552 abs_audiobook=book_expanded,
553 instance_id=self.instance_id,
554 domain=self.domain,
555 token=self._client.token,
556 base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
557 )
558 yield mass_audiobook
559
560 @handle_refresh_token
561 async def _get_abs_expanded_audiobook(
562 self, prov_audiobook_id: str
563 ) -> AbsLibraryItemExpandedBook:
564 abs_audiobook = await self._client.get_library_item_book(
565 book_id=prov_audiobook_id, expanded=True
566 )
567 assert isinstance(abs_audiobook, AbsLibraryItemExpandedBook)
568
569 return abs_audiobook
570
571 @handle_refresh_token
572 async def get_audiobook(self, prov_audiobook_id: str) -> Audiobook:
573 """Get a single audiobook.
574
575 Progress is added here.
576 """
577 progress = await self._client.get_my_media_progress(item_id=prov_audiobook_id)
578 abs_audiobook = await self._get_abs_expanded_audiobook(prov_audiobook_id=prov_audiobook_id)
579 return parse_audiobook(
580 abs_audiobook=abs_audiobook,
581 instance_id=self.instance_id,
582 domain=self.domain,
583 token=self._client.token,
584 base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
585 media_progress=progress,
586 )
587
588 async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
589 """Get stream of item."""
590 # We always create a playback session. The default is direct playback.
591 # In that case, session.tracks holds the exact same as the audiobook/ podcast.track,
592 # so we only use the session to update our progress.
593 #
594 # In the case of hls the session has an hls stream as track.
595 if media_type in (MediaType.PODCAST_EPISODE, MediaType.AUDIOBOOK):
596 session = await self._get_playback_session(mass_item_id=item_id)
597 return await self._get_stream_details_session(
598 session, session_helper=self.sessions[item_id], media_type=media_type
599 )
600 raise MediaNotFoundError("Stream unknown")
601
602 async def _get_stream_details_session(
603 self,
604 abs_session: AbsPlaybackSessionExpanded,
605 session_helper: SessionHelper,
606 media_type: MediaType,
607 ) -> StreamDetails:
608 """Streamdetails audiobook.
609
610 We always use a custom stream type, also for single file, such
611 that we can handle an ffmpeg error and refresh our tokens.
612 """
613 abs_base_url = str(self.config.get_value(CONF_URL))
614 tracks = abs_session.audio_tracks
615
616 if len(tracks) == 0:
617 raise MediaNotFoundError("Session has no tracks.")
618
619 content_type = ContentType.UNKNOWN
620 if abs_session.audio_tracks[0].metadata is not None:
621 content_type = ContentType.try_parse(abs_session.audio_tracks[0].metadata.ext)
622
623 file_parts: list[MultiPartPath] = []
624 if self.is_token_user:
625 self.logger.debug("Token User - Streams are direct.")
626 for idx, track in enumerate(tracks):
627 if self.is_token_user:
628 # an api key is long-lived
629 stream_url = f"{abs_base_url}{track.content_url}?token={self._client.token}"
630 else:
631 # to ensure token is always valid, we create a dynamic url
632 # this ensures that we always get a fresh token on each part
633 # without having to deal with a custom stream etc.
634 # we also use this for a single track/ hls stream, otherwise we can't seek
635 stream_url = (
636 f"{self.mass.streams.base_url}/{self.instance_id}_part_stream?"
637 f"session_id={abs_session.id_}&part_id={idx}"
638 )
639 file_parts.append(MultiPartPath(path=stream_url, duration=track.duration))
640
641 stream_type = StreamType.HLS if "hls" in file_parts[0].path else StreamType.HTTP
642 if stream_type == StreamType.HLS:
643 # wait for stream to be ready
644 try:
645 await asyncio.wait_for(session_helper.hls_stream_open.wait(), 10)
646 except TimeoutError:
647 self.logger.warning(
648 "Did not receive HLS stream open event after 10s, continuing anyways."
649 )
650
651 return StreamDetails(
652 provider=self.instance_id,
653 item_id=abs_session.id_,
654 audio_format=AudioFormat(content_type=content_type),
655 media_type=media_type,
656 stream_type=stream_type,
657 duration=int(abs_session.duration),
658 path=file_parts[0].path if len(file_parts) == 1 else file_parts,
659 can_seek=True,
660 allow_seek=True,
661 )
662
663 async def _get_playback_session(self, mass_item_id: str) -> AbsPlaybackSessionExpanded:
664 """Either creates or returns an open abs session."""
665 async with self.create_session_lock:
666 # check for an available open session
667 if session_helper := self.sessions.get(mass_item_id):
668 # reset here, as this is our "time listened".
669 session_helper.last_sync_time = time.time()
670 with suppress(AbsSessionNotFoundError):
671 return await self._client.get_open_session(
672 session_id=session_helper.abs_session_id
673 )
674
675 item_ids = mass_item_id.split(" ")
676 abs_item_id = item_ids[0]
677 episode_id = item_ids[1] if len(item_ids) == 2 else None
678
679 # Create a new session
680 ## Check HLS usage
681 use_hls = bool(self.config.get_value(CONF_USE_HLS))
682 hls_formats = str(self.config.get_value(CONF_HLS_FORMATS))
683 if use_hls and hls_formats != HLS_ALL_FORMATS:
684 use_hls = False # only for certain formats
685 extensions = [x.lstrip(".") for x in hls_formats.split(HLS_FORMATS_SPLIT)]
686 if episode_id is None:
687 if (
688 metadata := (await self._get_abs_expanded_audiobook(abs_item_id))
689 .media.tracks[0]
690 .metadata
691 ):
692 if metadata.ext.lstrip(".") in extensions:
693 use_hls = True
694 else:
695 podcast = await self._get_abs_expanded_podcast(prov_podcast_id=abs_item_id)
696 episode = None
697 for episode in podcast.media.episodes:
698 if episode.id_ == episode_id:
699 break
700 if episode and (metadata := episode.audio_track.metadata):
701 if metadata.ext.lstrip(".") in extensions:
702 use_hls = True
703
704 client_name = f"Music Assistant {self.instance_id}"
705 device_info = AbsDeviceInfo(
706 device_id=self.instance_id,
707 client_name=client_name,
708 client_version=self.mass.version,
709 manufacturer="",
710 model=self.mass.server_id,
711 )
712
713 session = await self._client.get_playback_session(
714 # These parameters give an hls if we don't enforce direct play stream,
715 # which is only a concat of the individual file's at abs
716 session_parameters=AbsPlaybackSessionParameters(
717 device_info=device_info,
718 force_direct_play=not use_hls,
719 force_transcode=use_hls,
720 # mimetypes are only checked for abs' internal "should transcode
721 # see https://github.com/advplyr/audiobookshelf/blob/master/server/managers/PlaybackSessionManager.js
722 supported_mime_types=[],
723 media_player=client_name,
724 ),
725 item_id=abs_item_id,
726 episode_id=episode_id,
727 )
728
729 if use_hls:
730 # Safety check.
731 track_url = session.audio_tracks[0].content_url
732 if track_url.split("/")[1] != "hls":
733 raise MediaNotFoundError("Did expect HLS stream for session playback")
734 self.logger.debug("Using an HLS stream for playback.")
735
736 self.sessions[mass_item_id] = SessionHelper(
737 abs_session_id=session.id_,
738 last_sync_time=time.time(),
739 hls_stream_open=asyncio.Event(),
740 )
741 return session
742
743 @handle_refresh_token
744 async def _handle_session_part_request(self, request: web.Request) -> web.Response:
745 """
746 Handle dynamic audiobook part stream request.
747
748 We redirect to the actual stream url with token.
749 This is done because the token might expire, so we need to
750 generate a fresh url on each part.
751 """
752 if not (session_id := request.query.get("session_id")):
753 return web.Response(status=400, text="Missing session_id")
754 if not (part_id := request.query.get("part_id")):
755 return web.Response(status=400, text="Missing part_id")
756 self.logger.debug(
757 "Handling session part request for session %s and part %s", session_id, part_id
758 )
759 try:
760 abs_session = await self._client.get_open_session(session_id=session_id)
761 except AbsSessionNotFoundError as err:
762 raise web.HTTPNotFound from err
763 part_id = int(part_id) # type: ignore[assignment]
764 try:
765 part_track = abs_session.audio_tracks[part_id]
766 except IndexError:
767 return web.Response(status=404, text="Part not found")
768
769 base_url = str(self.config.get_value(CONF_URL))
770 stream_url = f"{base_url}{part_track.content_url}?token={self._client.token}"
771 # redirect to the actual stream url
772 raise web.HTTPFound(location=stream_url)
773
774 @handle_refresh_token
775 async def get_resume_position(self, item_id: str, media_type: MediaType) -> tuple[bool, int]:
776 """Return finished:bool, position_ms: int."""
777 # this method is called _before_ get_stream_details, so the playback session
778 # is created here.
779 session = await self._get_playback_session(mass_item_id=item_id)
780 finished = session.current_time > session.duration - PLAYBACK_REPORT_INTERVAL_SECONDS
781 self.logger.debug("Resume position: obtained.")
782 return finished, int(session.current_time * 1000)
783
784 @handle_refresh_token
785 async def recommendations(self) -> list[RecommendationFolder]:
786 """Get recommendations."""
787 # We have to avoid "flooding" the home page, which becomes especially troublesome if users
788 # have multiple libraries. Instead we collect per ShelfId, and make sure, that we always get
789 # roughly the same amount of items per row, no matter the amount of libraries
790 # List of list (one list per lib) here, such that we can pick the items per lib later.
791 items_by_shelf_id: dict[AbsShelfId, list[list[MediaItemType | BrowseFolder]]] = {}
792
793 all_libraries = {**self.libraries.audiobooks, **self.libraries.podcasts}
794 max_items_per_row = 20
795 num_libraries = len(all_libraries)
796
797 if num_libraries == 0:
798 self._log_no_libraries()
799 return []
800
801 limit_items_per_lib = max_items_per_row // num_libraries
802 limit_items_per_lib = 1 if limit_items_per_lib == 0 else limit_items_per_lib
803
804 for library_id in all_libraries:
805 shelves = await self._client.get_library_personalized_view(
806 library_id=library_id, limit=limit_items_per_lib
807 )
808 await self._recommendations_iter_shelves(shelves, library_id, items_by_shelf_id)
809
810 folders: list[RecommendationFolder] = []
811 for shelf_id, item_lists in items_by_shelf_id.items():
812 # we have something like [[A, B], [C, D, E], [F]]
813 # and want [A, C, F, B, D, E]
814 recommendation_items = [
815 x
816 for x in itertools.chain.from_iterable(itertools.zip_longest(*item_lists))
817 if x is not None
818 ][:max_items_per_row]
819
820 # shelf ids follow pattern:
821 # recently-added
822 # newest-episodes
823 # etc
824 name = f"{shelf_id.capitalize().replace('-', ' ')}"
825 folders.append(
826 RecommendationFolder(
827 item_id=f"{shelf_id}",
828 name=name,
829 icon=ABS_SHELF_ID_ICONS.get(shelf_id),
830 translation_key=ABS_SHELF_ID_TRANSLATION_KEY.get(shelf_id),
831 items=UniqueList(recommendation_items),
832 provider=self.instance_id,
833 )
834 )
835
836 # Browse "recommendation" for convenience. If the user has
837 # multiple audiobook libraries, we return a listing of them.
838 # If there is only a single audiobook library, we add the folders
839 # from _browse_lib_audiobooks, i.e. Authors, Narrators etc.
840 # Podcast libs do not have filter folders, so always the root folders.
841 browse_items: list[MediaItemType | BrowseFolder] = []
842 translation_key = "libraries"
843 if len(self.libraries.audiobooks) <= 1:
844 if len(self.libraries.podcasts) == 0:
845 translation_key = "library"
846
847 # audiobooklibs are first, and we have at max 1 audiobook lib
848 _browse_root = self._browse_root(append_mediatype_suffix=False)
849 if len(self.libraries.audiobooks) == 0:
850 browse_items.extend(_browse_root)
851 else:
852 assert isinstance(_browse_root[0], BrowseFolder)
853 _path = _browse_root[0].path
854 browse_items.extend(self._browse_lib_audiobooks(current_path=_path))
855 # add podcast roots
856 browse_items.extend(_browse_root[1:])
857 else:
858 browse_items = list(self._browse_root())
859
860 folders.append(
861 RecommendationFolder(
862 item_id="browse",
863 name="Libraries",
864 icon="mdi-bookshelf",
865 translation_key=translation_key,
866 items=UniqueList(browse_items),
867 provider=self.instance_id,
868 )
869 )
870
871 return folders
872
873 async def _recommendations_iter_shelves(
874 self,
875 shelves: list[ShelfBook | ShelfPodcast | ShelfAuthors | ShelfEpisode | ShelfSeries],
876 library_id: str,
877 items_by_shelf_id: dict[AbsShelfId, list[list[MediaItemType | BrowseFolder]]],
878 ) -> None:
879 for shelf in shelves:
880 media_type: MediaType
881 match shelf.type_:
882 case AbsShelfType.PODCAST:
883 media_type = MediaType.PODCAST
884 case AbsShelfType.EPISODE:
885 media_type = MediaType.PODCAST_EPISODE
886 case AbsShelfType.BOOK:
887 media_type = MediaType.AUDIOBOOK
888 case AbsShelfType.SERIES | AbsShelfType.AUTHORS:
889 media_type = MediaType.FOLDER
890 case _:
891 # this would be authors, currently
892 continue
893
894 items: list[MediaItemType | BrowseFolder] = []
895 # Recently added is the _only_ case, where we get a full podcast
896 # We have a podcast object with only the episodes matching the
897 # shelf.id_ otherwise.
898 match shelf.id_:
899 case (
900 AbsShelfId.RECENTLY_ADDED
901 | AbsShelfId.LISTEN_AGAIN
902 | AbsShelfId.DISCOVER
903 | AbsShelfId.NEWEST_EPISODES
904 | AbsShelfId.CONTINUE_LISTENING
905 ):
906 for entity in shelf.entities:
907 assert isinstance(entity, ShelfLibraryItemMinified)
908 item: MediaItemType | None = None
909 if media_type in [MediaType.PODCAST, MediaType.AUDIOBOOK]:
910 item = await self.mass.music.get_library_item_by_prov_id(
911 media_type=media_type,
912 provider_instance_id_or_domain=self.instance_id,
913 item_id=entity.id_,
914 )
915 elif media_type == MediaType.PODCAST_EPISODE:
916 podcast_id = entity.id_
917 if entity.recent_episode is None:
918 continue
919 # we only have a PodcastEpisode here, with limited information
920 item = parse_podcast_episode(
921 episode=entity.recent_episode,
922 prov_podcast_id=podcast_id,
923 instance_id=self.instance_id,
924 domain=self.domain,
925 token=self._client.token,
926 base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
927 )
928 if item is not None:
929 items.append(item)
930 case AbsShelfId.RECENT_SERIES | AbsShelfId.CONTINUE_SERIES:
931 # We jump into a browse folder here if we have SeriesShelf, set path up as if
932 # browse function used.
933 if isinstance(shelf, ShelfSeries):
934 for entity in shelf.entities:
935 assert isinstance(entity, SeriesShelf)
936 if len(entity.books) == 0:
937 continue
938 path = (
939 f"{self.instance_id}://"
940 f"{AbsBrowsePaths.LIBRARIES_BOOK} {library_id}/"
941 f"{AbsBrowsePaths.SERIES}/{entity.id_}"
942 )
943 items.append(
944 BrowseFolder(
945 item_id=entity.id_,
946 name=entity.name,
947 provider=self.instance_id,
948 path=path,
949 )
950 )
951 elif isinstance(shelf, ShelfBook) and media_type == MediaType.AUDIOBOOK:
952 # Single books, must be audiobooks
953 for entity in shelf.entities:
954 item = await self.mass.music.get_library_item_by_prov_id(
955 media_type=media_type,
956 provider_instance_id_or_domain=self.instance_id,
957 item_id=entity.id_,
958 )
959 if item is not None:
960 items.append(item)
961 case AbsShelfId.NEWEST_AUTHORS:
962 # same as for series, use a folder
963 for entity in shelf.entities:
964 assert isinstance(entity, AuthorExpanded)
965 if entity.num_books == 0:
966 continue
967 path = (
968 f"{self.instance_id}://"
969 f"{AbsBrowsePaths.LIBRARIES_BOOK} {library_id}/"
970 f"{AbsBrowsePaths.AUTHORS}/{entity.id_}"
971 )
972 items.append(
973 BrowseFolder(
974 item_id=entity.id_,
975 name=entity.name,
976 provider=self.instance_id,
977 path=path,
978 )
979 )
980 if not items:
981 continue
982
983 # add collected items
984 assert isinstance(shelf.id_, AbsShelfId)
985 items_collected = items_by_shelf_id.get(shelf.id_, [])
986 items_collected.append(items)
987 items_by_shelf_id[shelf.id_] = items_collected
988
    @handle_refresh_token
    async def on_played(
        self,
        media_type: MediaType,
        prov_item_id: str,
        fully_played: bool,
        position: int,
        media_item: MediaItemType,
        is_playing: bool = False,
    ) -> None:
        """Update progress in Audiobookshelf.

        In our case media_type may have 3 values:
            - PODCAST
            - PODCAST_EPISODE
            - AUDIOBOOK
        We ignore PODCAST (function is called on adding a podcast with position=None)

        Episodes and audiobooks follow the same steps:
            1. Ask the progress guard whether the update may pass and register it.
            2. Drop the update if ``media_item`` is not of the expected type, or if
               it claims "fully played" at a position well before the end.
            3. A position of 0 on an item which is not fully played means "marked
               unplayed": an existing media progress is removed from Audiobookshelf.
            4. Otherwise sync the playback session registered for ``prov_item_id``
               in ``self.sessions``; if there is none, or the sync fails, the media
               progress is written directly.

        For episodes ``prov_item_id`` is "<podcast id> <episode id>", separated by a
        single space. ``position`` is given in seconds. ``is_playing`` is not used here.
        """

        async def _update_by_session(session_helper: SessionHelper, duration: int) -> bool:
            # Report the position through the open playback session.
            # Returns False if the session could not be found, True otherwise.
            now = time.time()
            time_listened = now - session_helper.last_sync_time
            if time_listened > PLAYBACK_REPORT_INTERVAL_SECONDS + 3:
                # See player_queues controller, we get an update every 30s, and immediately on pause
                # or play.
                # We reset above 33, as this indicates a trigger after a longer absence and should
                # not count into abs' statistics
                self.logger.debug("Resetting time_listened due to longer absence.")
                time_listened = 0.0
            try:
                await self._client.sync_open_session(
                    session_id=session_helper.abs_session_id,
                    parameters=SyncOpenSessionParameters(
                        current_time=position,
                        time_listened=time_listened,
                        duration=duration,
                    ),
                )
                # only a successful sync moves the reference point for time_listened
                session_helper.last_sync_time = now
                self.logger.debug("Synced playback session, position %s s.", position)
                return True
            except AbsSessionNotFoundError:
                self.logger.error("Was unable to sync session.")
                return False

        if media_type == MediaType.PODCAST_EPISODE:
            # episode ids consist of the podcast id and the episode id
            abs_podcast_id, abs_episode_id = prov_item_id.split(" ")

            # guard, see progress guard class docstrings for explanation
            if not self.progress_guard.guard_ok_mass(
                item_id=abs_podcast_id, episode_id=abs_episode_id
            ):
                return
            self.progress_guard.add_progress(item_id=abs_podcast_id, episode_id=abs_episode_id)

            if media_item is None or not isinstance(media_item, PodcastEpisode):
                return

            if fully_played and position < media_item.duration - PLAYBACK_REPORT_INTERVAL_SECONDS:
                # faulty position update
                # occurs sometimes, if a player disconnects unexpectedly, or reports
                # a false position - seen this for MC players, but not for sendspin
                return

            if position == 0 and not fully_played:
                # marked unplayed
                mp = await self._client.get_my_media_progress(
                    item_id=abs_podcast_id, episode_id=abs_episode_id
                )
                if mp is not None:
                    await self._client.remove_my_media_progress(media_progress_id=mp.id_)
                    self.logger.debug(f"Removed media progress of {media_type.value}.")
                    return

            duration = media_item.duration
            updated = False
            # prefer the playback session, fall back to a plain progress update
            if session_helper := self.sessions.get(prov_item_id):
                updated = await _update_by_session(session_helper=session_helper, duration=duration)
            if not updated:
                self.logger.debug(
                    f"Updating media progress of {media_type.value}, title {media_item.name}."
                )
                await self._client.update_my_media_progress(
                    item_id=abs_podcast_id,
                    episode_id=abs_episode_id,
                    duration_seconds=duration,
                    progress_seconds=position,
                    is_finished=fully_played,
                )

        if media_type == MediaType.AUDIOBOOK:
            # guard, see progress guard class docstrings for explanation
            if not self.progress_guard.guard_ok_mass(item_id=prov_item_id):
                return
            self.progress_guard.add_progress(item_id=prov_item_id)

            if media_item is None or not isinstance(media_item, Audiobook):
                return

            if fully_played and position < media_item.duration - PLAYBACK_REPORT_INTERVAL_SECONDS:
                # faulty position update, see above
                return

            if position == 0 and not fully_played:
                # marked unplayed
                mp = await self._client.get_my_media_progress(item_id=prov_item_id)
                if mp is not None:
                    await self._client.remove_my_media_progress(media_progress_id=mp.id_)
                    self.logger.debug(f"Removed media progress of {media_type.value}.")
                    return

            duration = media_item.duration
            updated = False
            # prefer the playback session, fall back to a plain progress update
            if session_helper := self.sessions.get(prov_item_id):
                updated = await _update_by_session(session_helper=session_helper, duration=duration)
            if not updated:
                self.logger.debug(f"Updating {media_type.value} named {media_item.name} progress")
                await self._client.update_my_media_progress(
                    item_id=prov_item_id,
                    duration_seconds=duration,
                    progress_seconds=position,
                    is_finished=fully_played,
                )
1113
1114 @handle_refresh_token
1115 async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
1116 """Browse for audiobookshelf.
1117
1118 Generates this view:
1119 Library_Name_A (Audiobooks)
1120 Audiobooks
1121 Audiobook_1
1122 Audiobook_2
1123 Series
1124 Series_1
1125 Audiobook_1
1126 Audiobook_2
1127 Series_2
1128 Audiobook_3
1129 Audiobook_4
1130 Collections
1131 Collection_1
1132 Audiobook_1
1133 Audiobook_2
1134 Collection_2
1135 Audiobook_3
1136 Audiobook_4
1137 Authors
1138 Author_1
1139 Series_1
1140 Audiobook_1
1141 Audiobook_2
1142 Author_2
1143 Audiobook_3
1144 Library_Name_B (Podcasts)
1145 Podcast_1
1146 Podcast_2
1147 """
1148 item_path = path.split("://", 1)[1]
1149 if not item_path:
1150 return self._browse_root()
1151 sub_path = item_path.split("/")
1152 lib_key, lib_id = sub_path[0].split(" ")
1153 if len(sub_path) == 1:
1154 if lib_key == AbsBrowsePaths.LIBRARIES_PODCAST:
1155 return await self._browse_lib_podcasts(library_id=lib_id)
1156 return self._browse_lib_audiobooks(current_path=path)
1157 if len(sub_path) == 2:
1158 item_key = sub_path[1]
1159 match item_key:
1160 case AbsBrowsePaths.AUTHORS:
1161 return await self._browse_authors(current_path=path, library_id=lib_id)
1162 case AbsBrowsePaths.NARRATORS:
1163 return await self._browse_narrators(current_path=path, library_id=lib_id)
1164 case AbsBrowsePaths.SERIES:
1165 return await self._browse_series(current_path=path, library_id=lib_id)
1166 case AbsBrowsePaths.COLLECTIONS:
1167 return await self._browse_collections(current_path=path, library_id=lib_id)
1168 case AbsBrowsePaths.AUDIOBOOKS:
1169 return await self._browse_books(library_id=lib_id)
1170 elif len(sub_path) == 3:
1171 item_key, item_id = sub_path[1:3]
1172 match item_key:
1173 case AbsBrowsePaths.AUTHORS:
1174 return await self._browse_author_books(current_path=path, author_id=item_id)
1175 case AbsBrowsePaths.NARRATORS:
1176 return await self._browse_narrator_books(
1177 library_id=lib_id, narrator_filter_str=item_id
1178 )
1179 case AbsBrowsePaths.SERIES:
1180 return await self._browse_series_books(series_id=item_id)
1181 case AbsBrowsePaths.COLLECTIONS:
1182 return await self._browse_collection_books(collection_id=item_id)
1183 elif len(sub_path) == 4:
1184 # series within author
1185 series_id = sub_path[3]
1186 return await self._browse_series_books(series_id=series_id)
1187 return []
1188
1189 def _browse_root(self, append_mediatype_suffix: bool = True) -> Sequence[BrowseFolder]:
1190 items = []
1191
1192 def _get_folder(
1193 path: str, lib_id: str, lib_name: str, translation_key: str | None = None
1194 ) -> BrowseFolder:
1195 return BrowseFolder(
1196 item_id=lib_id,
1197 name=lib_name,
1198 translation_key=translation_key, # if given, <name>: <translation> in frontend
1199 provider=self.instance_id,
1200 path=f"{self.instance_id}://{path}",
1201 )
1202
1203 if len(self.libraries.audiobooks) == 0 and len(self.libraries.podcasts) == 0:
1204 self._log_no_libraries()
1205 return []
1206
1207 translation_key: str | None
1208 for lib_id, lib in self.libraries.audiobooks.items():
1209 path = f"{AbsBrowsePaths.LIBRARIES_BOOK} {lib_id}"
1210 translation_key = None
1211 if append_mediatype_suffix:
1212 translation_key = AbsBrowseItemsBookTranslationKey.AUDIOBOOKS
1213 items.append(
1214 _get_folder(path, lib_id, lib_name=lib.name, translation_key=translation_key)
1215 )
1216 for lib_id, lib in self.libraries.podcasts.items():
1217 path = f"{AbsBrowsePaths.LIBRARIES_PODCAST} {lib_id}"
1218 translation_key = None
1219 if append_mediatype_suffix:
1220 translation_key = AbsBrowseItemsPodcastTranslationKey.PODCASTS
1221 items.append(
1222 _get_folder(path, lib_id, lib_name=lib.name, translation_key=translation_key)
1223 )
1224 return items
1225
1226 async def _browse_lib_podcasts(self, library_id: str) -> list[MediaItemType]:
1227 """No sub categories for podcasts."""
1228 if len(self.libraries.podcasts[library_id].item_ids) == 0:
1229 self._log_no_helper_item_ids()
1230 items = []
1231 for podcast_id in self.libraries.podcasts[library_id].item_ids:
1232 mass_item = await self.mass.music.get_library_item_by_prov_id(
1233 media_type=MediaType.PODCAST,
1234 item_id=podcast_id,
1235 provider_instance_id_or_domain=self.instance_id,
1236 )
1237 if mass_item is not None:
1238 items.append(mass_item)
1239 return sorted(items, key=lambda x: x.name)
1240
1241 def _browse_lib_audiobooks(self, current_path: str) -> Sequence[BrowseFolder]:
1242 items = []
1243 for translation_key in AbsBrowseItemsBookTranslationKey:
1244 path = current_path + "/" + ABS_BROWSE_ITEMS_TO_PATH[translation_key]
1245 items.append(
1246 BrowseFolder(
1247 item_id=translation_key.lower(),
1248 name="", # use translation key
1249 translation_key=translation_key,
1250 provider=self.instance_id,
1251 path=path,
1252 )
1253 )
1254 return items
1255
1256 async def _browse_authors(self, current_path: str, library_id: str) -> Sequence[BrowseFolder]:
1257 abs_authors = await self._client.get_library_authors(library_id=library_id)
1258 items = []
1259 for author in abs_authors:
1260 path = f"{current_path}/{author.id_}"
1261 items.append(
1262 BrowseFolder(
1263 item_id=author.id_,
1264 name=author.name,
1265 provider=self.instance_id,
1266 path=path,
1267 )
1268 )
1269
1270 return sorted(items, key=lambda x: x.name)
1271
1272 async def _browse_narrators(self, current_path: str, library_id: str) -> Sequence[BrowseFolder]:
1273 abs_narrators = await self._client.get_library_narrators(library_id=library_id)
1274 items = []
1275 for narrator in abs_narrators:
1276 path = f"{current_path}/{narrator.id_}"
1277 items.append(
1278 BrowseFolder(
1279 item_id=narrator.id_,
1280 name=narrator.name,
1281 provider=self.instance_id,
1282 path=path,
1283 )
1284 )
1285
1286 return sorted(items, key=lambda x: x.name)
1287
1288 async def _browse_series(self, current_path: str, library_id: str) -> Sequence[BrowseFolder]:
1289 items = []
1290 async for response in self._client.get_library_series(library_id=library_id):
1291 if not response.results:
1292 break
1293 for abs_series in response.results:
1294 path = f"{current_path}/{abs_series.id_}"
1295 items.append(
1296 BrowseFolder(
1297 item_id=abs_series.id_,
1298 name=abs_series.name,
1299 provider=self.instance_id,
1300 path=path,
1301 )
1302 )
1303
1304 return sorted(items, key=lambda x: x.name)
1305
1306 async def _browse_collections(
1307 self, current_path: str, library_id: str
1308 ) -> Sequence[BrowseFolder]:
1309 items = []
1310 async for response in self._client.get_library_collections(library_id=library_id):
1311 if not response.results:
1312 break
1313 for abs_collection in response.results:
1314 path = f"{current_path}/{abs_collection.id_}"
1315 items.append(
1316 BrowseFolder(
1317 item_id=abs_collection.id_,
1318 name=abs_collection.name,
1319 provider=self.instance_id,
1320 path=path,
1321 )
1322 )
1323 return sorted(items, key=lambda x: x.name)
1324
1325 async def _browse_books(self, library_id: str) -> Sequence[MediaItemType]:
1326 if len(self.libraries.audiobooks[library_id].item_ids) == 0:
1327 self._log_no_helper_item_ids()
1328 items = []
1329 for book_id in self.libraries.audiobooks[library_id].item_ids:
1330 mass_item = await self.mass.music.get_library_item_by_prov_id(
1331 media_type=MediaType.AUDIOBOOK,
1332 item_id=book_id,
1333 provider_instance_id_or_domain=self.instance_id,
1334 )
1335 if mass_item is not None:
1336 items.append(mass_item)
1337 return sorted(items, key=lambda x: x.name)
1338
1339 async def _browse_author_books(
1340 self, current_path: str, author_id: str
1341 ) -> Sequence[MediaItemType | BrowseFolder]:
1342 items: list[MediaItemType | BrowseFolder] = []
1343
1344 abs_author = await self._client.get_author(
1345 author_id=author_id, include_items=True, include_series=True
1346 )
1347 if not isinstance(abs_author, AbsAuthorWithItemsAndSeries):
1348 raise TypeError("Unexpected type of author.")
1349
1350 book_ids = {x.id_ for x in abs_author.library_items}
1351 series_book_ids = set()
1352
1353 for series in abs_author.series:
1354 series_book_ids.update([x.id_ for x in series.items])
1355 path = f"{current_path}/{series.id_}"
1356 items.append(
1357 BrowseFolder(
1358 item_id=series.id_,
1359 # frontend does <name>: <translation>
1360 name=series.name,
1361 translation_key="series_singular",
1362 provider=self.instance_id,
1363 path=path,
1364 )
1365 )
1366 book_ids = book_ids.difference(series_book_ids)
1367 for book_id in book_ids:
1368 mass_item = await self.mass.music.get_library_item_by_prov_id(
1369 media_type=MediaType.AUDIOBOOK,
1370 item_id=book_id,
1371 provider_instance_id_or_domain=self.instance_id,
1372 )
1373 if mass_item is not None:
1374 items.append(mass_item)
1375
1376 return items
1377
1378 async def _browse_narrator_books(
1379 self, library_id: str, narrator_filter_str: str
1380 ) -> Sequence[MediaItemType]:
1381 items: list[MediaItemType] = []
1382 async for response in self._client.get_library_items(
1383 library_id=library_id, filter_str=f"narrators.{narrator_filter_str}"
1384 ):
1385 if not response.results:
1386 break
1387 for item in response.results:
1388 mass_item = await self.mass.music.get_library_item_by_prov_id(
1389 media_type=MediaType.AUDIOBOOK,
1390 item_id=item.id_,
1391 provider_instance_id_or_domain=self.instance_id,
1392 )
1393 if mass_item is not None:
1394 items.append(mass_item)
1395
1396 return sorted(items, key=lambda x: x.name)
1397
1398 async def _browse_series_books(self, series_id: str) -> Sequence[MediaItemType]:
1399 items = []
1400
1401 abs_series = await self._client.get_series(series_id=series_id, include_progress=True)
1402 if not isinstance(abs_series, AbsSeriesWithProgress):
1403 raise TypeError("Unexpected series type.")
1404
1405 for book_id in abs_series.progress.library_item_ids:
1406 # these are sorted in abs by sequence
1407 mass_item = await self.mass.music.get_library_item_by_prov_id(
1408 media_type=MediaType.AUDIOBOOK,
1409 item_id=book_id,
1410 provider_instance_id_or_domain=self.instance_id,
1411 )
1412 if mass_item is not None:
1413 items.append(mass_item)
1414
1415 return items
1416
1417 async def _browse_collection_books(self, collection_id: str) -> Sequence[MediaItemType]:
1418 items = []
1419 abs_collection = await self._client.get_collection(collection_id=collection_id)
1420 for book in abs_collection.books:
1421 mass_item = await self.mass.music.get_library_item_by_prov_id(
1422 media_type=MediaType.AUDIOBOOK,
1423 item_id=book.id_,
1424 provider_instance_id_or_domain=self.instance_id,
1425 )
1426 if mass_item is not None:
1427 items.append(mass_item)
1428 return items
1429
    async def _socket_abs_item_changed(
        self, items: LibraryItemExpanded | list[LibraryItemExpanded]
    ) -> None:
        """Handle library items which were added or updated in abs.

        Accepts a single item or a list of items. Books and podcasts are parsed and
        written to the Music Assistant library, overwriting an existing entry, and
        their ids are added to the item ids of the matching helper library, if that
        library is known. Afterwards the helper libraries are written to the cache.
        """
        # normalize to a list, a single item may be passed as well
        abs_items = [items] if isinstance(items, LibraryItemExpanded) else items
        for abs_item in abs_items:
            if isinstance(abs_item, LibraryItemExpandedBook):
                # If the book has no audiofiles, we skip -> ebook only.
                if len(abs_item.media.tracks) == 0:
                    continue
                self.logger.debug(
                    'Updated book "%s" via socket.', abs_item.media.metadata.title or ""
                )
                await self.mass.music.audiobooks.add_item_to_library(
                    parse_audiobook(
                        abs_audiobook=abs_item,
                        instance_id=self.instance_id,
                        domain=self.domain,
                        token=self._client.token,
                        base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
                    ),
                    overwrite_existing=True,
                )
                # keep the helper item ids in sync, they are used when browsing
                lib = self.libraries.audiobooks.get(abs_item.library_id, None)
                if lib is not None:
                    lib.item_ids.add(abs_item.id_)
            elif isinstance(abs_item, LibraryItemExpandedPodcast):
                self.logger.debug(
                    'Updated podcast "%s" via socket.', abs_item.media.metadata.title or ""
                )
                mass_podcast = parse_podcast(
                    abs_podcast=abs_item,
                    instance_id=self.instance_id,
                    domain=self.domain,
                    token=self._client.token,
                    base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
                )
                # a podcast without episodes is skipped, if the user chose to hide those
                if not (
                    bool(self.config.get_value(CONF_HIDE_EMPTY_PODCASTS))
                    and mass_podcast.total_episodes == 0
                ):
                    await self.mass.music.podcasts.add_item_to_library(
                        mass_podcast,
                        overwrite_existing=True,
                    )
                    # keep the helper item ids in sync, they are used when browsing
                    lib = self.libraries.podcasts.get(abs_item.library_id, None)
                    if lib is not None:
                        lib.item_ids.add(abs_item.id_)
        # persist the helper libraries
        await self._cache_set_helper_libraries()
1479
1480 async def _socket_abs_item_removed(self, item: LibraryItemRemoved) -> None:
1481 """Item removed."""
1482 media_type: MediaType | None = None
1483 for lib in self.libraries.audiobooks.values():
1484 if item.id_ in lib.item_ids:
1485 media_type = MediaType.AUDIOBOOK
1486 lib.item_ids.remove(item.id_)
1487 break
1488 for lib in self.libraries.podcasts.values():
1489 if item.id_ in lib.item_ids:
1490 media_type = MediaType.PODCAST
1491 lib.item_ids.remove(item.id_)
1492 break
1493
1494 if media_type is not None:
1495 mass_item = await self.mass.music.get_library_item_by_prov_id(
1496 media_type=media_type,
1497 item_id=item.id_,
1498 provider_instance_id_or_domain=self.instance_id,
1499 )
1500 if mass_item is not None:
1501 await self.mass.music.remove_item_from_library(
1502 media_type=media_type, library_item_id=mass_item.item_id
1503 )
1504 self.logger.debug('Removed %s "%s" via socket.', media_type.value, mass_item.name)
1505
1506 await self._cache_set_helper_libraries()
1507
1508 async def _socket_abs_user_item_progress_updated(
1509 self, id_: str, progress: MediaProgress
1510 ) -> None:
1511 """To update continue listening.
1512
1513 ABS reports every 15s and immediately on play state change.
1514 This callback is called per item if a progress is changed:
1515 - a change in position
1516 - the item is finished
1517 But it is _not_called, if a progress is reset/ discarded.
1518 """
1519 # guard, see progress guard class docstrings for explanation
1520 if not self.progress_guard.guard_ok_abs(abs_progress=progress):
1521 return
1522
1523 known_ids = self._get_all_known_item_ids()
1524 if progress.library_item_id not in known_ids:
1525 return
1526
1527 self.logger.debug(f"Updated progress of item {progress.library_item_id} via socket.")
1528
1529 if progress.episode_id is None:
1530 await self._update_playlog_book(progress)
1531 return
1532 await self._update_playlog_episode(progress)
1533
1534 async def _socket_abs_refresh_token_expired(self) -> None:
1535 await self.reauthenticate()
1536
1537 async def _socket_stream_open(self, stream: AbsStream) -> None:
1538 # stream's id is the same as the playback session id
1539 for session_helper in self.sessions.values():
1540 if session_helper.abs_session_id == stream.id_:
1541 session_helper.hls_stream_open.set()
1542 break
1543
1544 async def reauthenticate(self) -> None:
1545 """Reauthorize the abs session config if refresh token expired."""
1546 # some safe guarding should that function be called simultaneously
1547 if self.reauthenticate_lock.locked() or time.time() - self.reauthenticate_last < 5:
1548 while True:
1549 if not self.reauthenticate_lock.locked():
1550 return
1551 await asyncio.sleep(0.5)
1552 async with self.reauthenticate_lock:
1553 await self._client.session_config.authenticate(
1554 username=str(self.config.get_value(CONF_USERNAME)),
1555 password=str(self.config.get_value(CONF_PASSWORD)),
1556 )
1557 self.reauthenticate_last = time.time()
1558
1559 def _get_all_known_item_ids(self) -> set[str]:
1560 known_ids = set()
1561 for lib in self.libraries.podcasts.values():
1562 known_ids.update(lib.item_ids)
1563 for lib in self.libraries.audiobooks.values():
1564 known_ids.update(lib.item_ids)
1565
1566 return known_ids
1567
1568 async def _set_playlog_from_user(self, user: User) -> None:
1569 """Update on user callback.
1570
1571 User holds also all media progresses specific to that user.
1572
1573 The function 'guard_ok_abs' uses the timestamp of the last update in abs, thus after an
1574 initial progress update, an unchanged update will not trigger a (useless) playlog update.
1575
1576 We do not sync removed progresses for the sake of simplicity.
1577 """
1578 await self._set_playlog_from_user_sync(user.media_progress)
1579
1580 async def _set_playlog_from_user_sync(self, progresses: list[MediaProgress]) -> None:
1581 # for debugging
1582 __updated_items = 0
1583
1584 known_ids = self._get_all_known_item_ids()
1585 abs_ids_with_progress = set()
1586
1587 for progress in progresses:
1588 # save progress ids for later
1589 ma_item_id = (
1590 progress.library_item_id
1591 if progress.episode_id is None
1592 else f"{progress.library_item_id} {progress.episode_id}"
1593 )
1594 abs_ids_with_progress.add(ma_item_id)
1595
1596 # Guard. Also makes sure, that we don't write to db again if no state change happened.
1597 # This is achieved by adding a Helper Progress in the update playlog functions, which
1598 # then has the most recent timestamp. If a subsequent progress sent by abs has an older
1599 # timestamp, we do not update again.
1600 if not self.progress_guard.guard_ok_abs(progress):
1601 continue
1602 if progress.current_time is not None:
1603 if (
1604 int(progress.current_time) != 0
1605 and not progress.current_time >= PLAYBACK_REPORT_INTERVAL_SECONDS
1606 ):
1607 # same as mass default, only > 30s
1608 continue
1609 if progress.library_item_id not in known_ids:
1610 continue
1611 __updated_items += 1
1612 if progress.episode_id is None:
1613 await self._update_playlog_book(progress)
1614 else:
1615 await self._update_playlog_episode(progress)
1616 self.logger.debug(f"Updated {__updated_items} from full playlog.")
1617
1618 # Get MA's known progresses of ABS.
1619 # In ABS the user may discard a progress, which removes the progress completely.
1620 # There is no socket notification for this event.
1621 ma_playlog_state = await self.mass.music.get_playlog_provider_item_ids(
1622 provider_instance_id=self.instance_id
1623 )
1624 ma_ids_with_progress = {x for _, x in ma_playlog_state}
1625 discarded_progress_ids = ma_ids_with_progress.difference(abs_ids_with_progress)
1626 for discarded_progress_id in discarded_progress_ids:
1627 if len(discarded_progress_id.split(" ")) == 1:
1628 if discarded_item := await self.mass.music.get_library_item_by_prov_id(
1629 media_type=MediaType.AUDIOBOOK,
1630 item_id=discarded_progress_id,
1631 provider_instance_id_or_domain=self.instance_id,
1632 ):
1633 self.progress_guard.add_progress(discarded_progress_id)
1634 await self.mass.music.mark_item_unplayed(discarded_item)
1635 else:
1636 with suppress(MediaNotFoundError):
1637 discarded_item = await self.get_podcast_episode(
1638 prov_episode_id=discarded_progress_id, add_progress=False
1639 )
1640 self.progress_guard.add_progress(*discarded_progress_id.split(" "))
1641 await self.mass.music.mark_item_unplayed(discarded_item)
1642 self.logger.debug("Discarded item %s ", discarded_progress_id)
1643
1644 async def _update_playlog_book(self, progress: MediaProgress) -> None:
1645 # helper progress also ensures no useless progress updates,
1646 # see comment above
1647 self.progress_guard.add_progress(progress.library_item_id)
1648 if progress.current_time is None:
1649 return
1650 mass_audiobook = await self.mass.music.get_library_item_by_prov_id(
1651 media_type=MediaType.AUDIOBOOK,
1652 item_id=progress.library_item_id,
1653 provider_instance_id_or_domain=self.instance_id,
1654 )
1655 if mass_audiobook is None:
1656 return
1657 if int(progress.current_time) == 0 and not progress.is_finished:
1658 await self.mass.music.mark_item_unplayed(mass_audiobook)
1659 else:
1660 await self.mass.music.mark_item_played(
1661 mass_audiobook,
1662 fully_played=progress.is_finished,
1663 seconds_played=int(progress.current_time),
1664 )
1665
1666 async def _update_playlog_episode(self, progress: MediaProgress) -> None:
1667 # helper progress also ensures no useless progress updates,
1668 # see comment above
1669 self.progress_guard.add_progress(progress.library_item_id, progress.episode_id)
1670 if progress.current_time is None:
1671 return
1672 _episode_id = f"{progress.library_item_id} {progress.episode_id}"
1673 try:
1674 # need to obtain full podcast, and then search for episode
1675 mass_episode = await self.get_podcast_episode(_episode_id, add_progress=False)
1676 except MediaNotFoundError:
1677 return
1678 if int(progress.current_time) == 0 and not progress.is_finished:
1679 await self.mass.music.mark_item_unplayed(mass_episode)
1680 else:
1681 await self.mass.music.mark_item_played(
1682 mass_episode,
1683 fully_played=progress.is_finished,
1684 seconds_played=int(progress.current_time),
1685 )
1686
1687 async def _cache_set_helper_libraries(self) -> None:
1688 await self.mass.cache.set(
1689 key=CACHE_KEY_LIBRARIES,
1690 provider=self.instance_id,
1691 category=CACHE_CATEGORY_LIBRARIES,
1692 data=self.libraries.to_dict(),
1693 )
1694
1695 def _log_no_libraries(self) -> None:
1696 self.logger.error("There are no libraries visible to the Audiobookshelf provider.")
1697
1698 def _log_no_helper_item_ids(self) -> None:
1699 self.logger.warning(
1700 "Cached item ids are missing. "
1701 "Please trigger a full resync of the Audiobookshelf provider manually."
1702 )
1703