music-assistant-server

80.9 KBPY
__init__.py
80.9 KB1,941 lines • python
1"""Filesystem musicprovider support for MusicAssistant."""
2
3from __future__ import annotations
4
5import asyncio
6import contextlib
7import logging
8import os
9import os.path
10import time
11import urllib.parse
12from collections.abc import AsyncGenerator, Iterator, Sequence
13from datetime import UTC, datetime
14from pathlib import Path
15from typing import TYPE_CHECKING, Any, cast
16
17import aiofiles
18import shortuuid
19import xmltodict
20from aiofiles.os import wrap
21from music_assistant_models.enums import (
22    ContentType,
23    ExternalID,
24    ImageType,
25    MediaType,
26    ProviderFeature,
27    StreamType,
28)
29from music_assistant_models.errors import MediaNotFoundError, MusicAssistantError, SetupFailedError
30from music_assistant_models.media_items import (
31    Album,
32    Artist,
33    Audiobook,
34    AudioFormat,
35    BrowseFolder,
36    ItemMapping,
37    MediaItemChapter,
38    MediaItemImage,
39    MediaItemType,
40    Playlist,
41    Podcast,
42    PodcastEpisode,
43    ProviderMapping,
44    SearchResults,
45    Track,
46    UniqueList,
47    is_track,
48)
49from music_assistant_models.streamdetails import MultiPartPath, StreamDetails
50
51from music_assistant.constants import (
52    CONF_PATH,
53    DB_TABLE_ALBUM_ARTISTS,
54    DB_TABLE_ALBUM_TRACKS,
55    DB_TABLE_ALBUMS,
56    DB_TABLE_ARTISTS,
57    DB_TABLE_PROVIDER_MAPPINGS,
58    DB_TABLE_TRACK_ARTISTS,
59    VARIOUS_ARTISTS_MBID,
60    VARIOUS_ARTISTS_NAME,
61    VERBOSE_LOG_LEVEL,
62)
63from music_assistant.helpers.compare import compare_strings, create_safe_string
64from music_assistant.helpers.json import json_loads
65from music_assistant.helpers.playlists import parse_m3u, parse_pls
66from music_assistant.helpers.tags import AudioTags, async_parse_tags, parse_tags, split_items
67from music_assistant.helpers.util import (
68    TaskManager,
69    detect_charset,
70    parse_title_and_version,
71    try_parse_int,
72)
73from music_assistant.models.music_provider import MusicProvider
74
75from .constants import (
76    AUDIOBOOK_EXTENSIONS,
77    CACHE_CATEGORY_ALBUM_INFO,
78    CACHE_CATEGORY_ARTIST_INFO,
79    CACHE_CATEGORY_AUDIOBOOK_CHAPTERS,
80    CACHE_CATEGORY_FOLDER_IMAGES,
81    CACHE_CATEGORY_PODCAST_METADATA,
82    CONF_ENTRY_CONTENT_TYPE,
83    CONF_ENTRY_CONTENT_TYPE_READ_ONLY,
84    CONF_ENTRY_IGNORE_ALBUM_PLAYLISTS,
85    CONF_ENTRY_LIBRARY_SYNC_AUDIOBOOKS,
86    CONF_ENTRY_LIBRARY_SYNC_PLAYLISTS,
87    CONF_ENTRY_LIBRARY_SYNC_PODCASTS,
88    CONF_ENTRY_LIBRARY_SYNC_TRACKS,
89    CONF_ENTRY_MISSING_ALBUM_ARTIST,
90    CONF_ENTRY_PATH,
91    IMAGE_EXTENSIONS,
92    PLAYLIST_EXTENSIONS,
93    PODCAST_EPISODE_EXTENSIONS,
94    SUPPORTED_EXTENSIONS,
95    TRACK_EXTENSIONS,
96    IsChapterFile,
97)
98from .helpers import (
99    IGNORE_DIRS,
100    FileSystemItem,
101    get_absolute_path,
102    get_album_dir,
103    get_artist_dir,
104    get_relative_path,
105    sorted_scandir,
106)
107
108if TYPE_CHECKING:
109    from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
110    from music_assistant_models.provider import ProviderManifest
111
112    from music_assistant.mass import MusicAssistant
113    from music_assistant.models import ProviderInstanceType
114
115
116isdir = wrap(os.path.isdir)
117isfile = wrap(os.path.isfile)
118exists = wrap(os.path.exists)
119makedirs = wrap(os.makedirs)
120scandir = wrap(os.scandir)
121
122SUPPORTED_FEATURES = {
123    ProviderFeature.BROWSE,
124    ProviderFeature.SEARCH,
125}
126
127
128async def setup(
129    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
130) -> ProviderInstanceType:
131    """Initialize provider(instance) with given configuration."""
132    base_path = cast("str", config.get_value(CONF_PATH))
133    return LocalFileSystemProvider(mass, manifest, config, base_path)
134
135
136async def get_config_entries(
137    mass: MusicAssistant,
138    instance_id: str | None = None,
139    action: str | None = None,
140    values: dict[str, ConfigValueType] | None = None,
141) -> tuple[ConfigEntry, ...]:
142    """
143    Return Config entries to setup this provider.
144
145    instance_id: id of an existing provider instance (None if new instance setup).
146    action: [optional] action key called from config entries UI.
147    values: the (intermediate) raw values for config entries sent with the action.
148    """
149    # ruff: noqa: ARG001
150    base_entries = [
151        CONF_ENTRY_PATH,
152        CONF_ENTRY_MISSING_ALBUM_ARTIST,
153        CONF_ENTRY_IGNORE_ALBUM_PLAYLISTS,
154        CONF_ENTRY_LIBRARY_SYNC_TRACKS,
155        CONF_ENTRY_LIBRARY_SYNC_PLAYLISTS,
156        CONF_ENTRY_LIBRARY_SYNC_PODCASTS,
157        CONF_ENTRY_LIBRARY_SYNC_AUDIOBOOKS,
158    ]
159    if instance_id is None or values is None:
160        return (CONF_ENTRY_CONTENT_TYPE, *base_entries)
161    return (CONF_ENTRY_CONTENT_TYPE_READ_ONLY, *base_entries)
162
163
164class LocalFileSystemProvider(MusicProvider):
165    """
166    Implementation of a musicprovider for (local) files.
167
168    Reads ID3 tags from file and falls back to parsing filename.
169    Optionally reads metadata from nfo files and images in folder structure <artist>/<album>.
170    Supports m3u files for playlists.
171    """
172
173    def __init__(
174        self,
175        mass: MusicAssistant,
176        manifest: ProviderManifest,
177        config: ProviderConfig,
178        base_path: str,
179    ) -> None:
180        """Initialize MusicProvider."""
181        super().__init__(mass, manifest, config, SUPPORTED_FEATURES)
182        self.base_path: str = base_path
183        self.write_access: bool = False
184        self.sync_running: bool = False
185        self.media_content_type = cast("str", config.get_value(CONF_ENTRY_CONTENT_TYPE.key))
186
187    @property
188    def supported_features(self) -> set[ProviderFeature]:
189        """Return the features supported by this Provider."""
190        base_features = {*SUPPORTED_FEATURES}
191        if self.media_content_type == "audiobooks":
192            return {ProviderFeature.LIBRARY_AUDIOBOOKS, *base_features}
193        if self.media_content_type == "podcasts":
194            return {ProviderFeature.LIBRARY_PODCASTS, *base_features}
195        music_features = {
196            ProviderFeature.LIBRARY_ALBUMS,
197            ProviderFeature.LIBRARY_ARTISTS,
198            ProviderFeature.LIBRARY_TRACKS,
199            ProviderFeature.LIBRARY_PLAYLISTS,
200            *base_features,
201        }
202        if self.write_access:
203            music_features.add(ProviderFeature.PLAYLIST_TRACKS_EDIT)
204            music_features.add(ProviderFeature.PLAYLIST_CREATE)
205        return music_features
206
207    @property
208    def is_streaming_provider(self) -> bool:
209        """Return True if the provider is a streaming provider."""
210        return False
211
212    @property
213    def instance_name_postfix(self) -> str | None:
214        """Return a (default) instance name postfix for this provider instance."""
215        return self.base_path.split(os.sep)[-1]
216
217    async def handle_async_init(self) -> None:
218        """Handle async initialization of the provider."""
219        if not await isdir(self.base_path):
220            msg = f"Music Directory {self.base_path} does not exist"
221            raise SetupFailedError(msg)
222        await self.check_write_access()
223
224    async def search(
225        self,
226        search_query: str,
227        media_types: list[MediaType] | None,
228        limit: int = 5,
229    ) -> SearchResults:
230        """Perform search on this file based musicprovider."""
231        result = SearchResults()
232        # searching the filesystem is slow and unreliable,
233        # so instead we just query the db...
234        if media_types is None or MediaType.TRACK in media_types:
235            result.tracks = await self.mass.music.tracks.get_library_items_by_query(
236                search=search_query, provider_filter=[self.instance_id], limit=limit
237            )
238
239        if media_types is None or MediaType.ALBUM in media_types:
240            result.albums = await self.mass.music.albums.get_library_items_by_query(
241                search=search_query,
242                provider_filter=[self.instance_id],
243                limit=limit,
244            )
245
246        if media_types is None or MediaType.ARTIST in media_types:
247            result.artists = await self.mass.music.artists.get_library_items_by_query(
248                search=search_query,
249                provider_filter=[self.instance_id],
250                limit=limit,
251            )
252        if media_types is None or MediaType.PLAYLIST in media_types:
253            result.playlists = await self.mass.music.playlists.get_library_items_by_query(
254                search=search_query,
255                provider_filter=[self.instance_id],
256                limit=limit,
257            )
258        if media_types is None or MediaType.AUDIOBOOK in media_types:
259            result.audiobooks = await self.mass.music.audiobooks.get_library_items_by_query(
260                search=search_query,
261                provider_filter=[self.instance_id],
262                limit=limit,
263            )
264        if media_types is None or MediaType.PODCAST in media_types:
265            result.podcasts = await self.mass.music.podcasts.get_library_items_by_query(
266                search=search_query,
267                provider_filter=[self.instance_id],
268                limit=limit,
269            )
270        return result
271
272    async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
273        """Browse this provider's items.
274
275        :param path: The path to browse, (e.g. provid://artists).
276        """
277        # for audiobooks and podcasts we just return all library items
278        if self.media_content_type == "podcasts":
279            return await self.mass.music.podcasts.library_items(provider=self.instance_id)
280        if self.media_content_type == "audiobooks":
281            return await self.mass.music.audiobooks.library_items(provider=self.instance_id)
282        items: list[MediaItemType | ItemMapping | BrowseFolder] = []
283        item_path = path.split("://", 1)[1]
284        if not item_path:
285            item_path = ""
286        abs_path = self.get_absolute_path(item_path)
287        for item in await asyncio.to_thread(sorted_scandir, self.base_path, abs_path, sort=True):
288            if not item.is_dir and ("." not in item.filename or not item.ext):
289                # skip system files and files without extension
290                continue
291
292            if item.is_dir:
293                items.append(
294                    BrowseFolder(
295                        item_id=item.relative_path,
296                        provider=self.instance_id,
297                        path=f"{self.instance_id}://{item.relative_path}",
298                        name=item.filename,
299                        # mark folder as playable, assuming it contains tracks underneath
300                        is_playable=True,
301                    )
302                )
303            elif item.ext in TRACK_EXTENSIONS:
304                items.append(
305                    ItemMapping(
306                        media_type=MediaType.TRACK,
307                        item_id=item.relative_path,
308                        provider=self.instance_id,
309                        name=item.filename,
310                    )
311                )
312            elif item.ext in PLAYLIST_EXTENSIONS:
313                items.append(
314                    ItemMapping(
315                        media_type=MediaType.PLAYLIST,
316                        item_id=item.relative_path,
317                        provider=self.instance_id,
318                        name=item.filename,
319                    )
320                )
321        return items
322
323    async def sync_library(self, media_type: MediaType) -> None:
324        """Run library sync for this provider."""
325        if media_type in (MediaType.ARTIST, MediaType.ALBUM):
326            # artists and albums are synced as part of track sync
327            return
328        assert self.mass.music.database
329        start_time = time.time()
330        if self.sync_running:
331            self.logger.warning("Library sync already running for %s", self.name)
332            return
333        self.logger.info(
334            "Started Library sync for %s",
335            self.name,
336        )
337        file_checksums: dict[str, str] = {}
338        # NOTE: we always run a scan of the entire library, as we need to detect changes
339        # we ignore any given mediatype(s) and just scan all supported files
340        query = (
341            f"SELECT provider_item_id, details FROM {DB_TABLE_PROVIDER_MAPPINGS} "
342            f"WHERE provider_instance = '{self.instance_id}' "
343            f"AND media_type in ('track', 'playlist', 'audiobook', 'podcast_episode')"
344        )
345        for db_row in await self.mass.music.database.get_rows_from_query(query, limit=0):
346            file_checksums[db_row["provider_item_id"]] = str(db_row["details"])
347        # find all supported files in the base directory and all subfolders
348        # we work bottom up, as-in we derive all info from the tracks
349        cur_filenames = set()
350        prev_filenames = set(file_checksums.keys())
351
352        # NOTE: we do the entire traversing of the directory structure, including parsing tags
353        # in a single executor thread to save the overhead of having to spin up tons of tasks
354        def listdir(path: str) -> Iterator[FileSystemItem]:
355            """Recursively traverse directory entries."""
356            for item in os.scandir(path):
357                # ignore invalid filenames
358                if item.name in IGNORE_DIRS or item.name.startswith((".", "_")):
359                    continue
360                if item.is_dir(follow_symlinks=False):
361                    yield from listdir(item.path)
362                elif item.is_file(follow_symlinks=False):
363                    # skip files without extension
364                    if "." not in item.name:
365                        continue
366                    ext = item.name.rsplit(".", 1)[1].lower()
367                    if ext not in SUPPORTED_EXTENSIONS:
368                        # skip unsupported file extension
369                        continue
370                    try:
371                        yield FileSystemItem.from_dir_entry(item, self.base_path)
372                    except OSError as err:
373                        # Skip files that cannot be stat'd (e.g., invalid encoding on SMB mounts)
374                        # This typically happens with emoji or special unicode characters
375                        self.logger.debug(
376                            "Skipping file %s due to stat error: %s",
377                            item.path,
378                            str(err),
379                        )
380
381        def run_sync() -> None:
382            """Run the actual sync (in an executor job)."""
383            self.sync_running = True
384            try:
385                for item in listdir(self.base_path):
386                    prev_checksum = file_checksums.get(item.relative_path)
387                    if self._process_item(item, prev_checksum):
388                        cur_filenames.add(item.relative_path)
389            finally:
390                self.sync_running = False
391
392        await asyncio.to_thread(run_sync)
393
394        end_time = time.time()
395        self.logger.info(
396            "Library sync for %s completed in %.2f seconds",
397            self.name,
398            end_time - start_time,
399        )
400        # work out deletions
401        deleted_files = prev_filenames - cur_filenames
402        await self._process_deletions(deleted_files)
403
404        # process orphaned albums and artists
405        await self._process_orphaned_albums_and_artists()
406
407    def _process_item(self, item: FileSystemItem, prev_checksum: str | None) -> bool:
408        """Process a single item. NOT async friendly."""
409        try:
410            self.logger.log(VERBOSE_LOG_LEVEL, "Processing: %s", item.relative_path)
411
412            # ignore playlists that are in album directories
413            # we need to run this check early because the setting may have changed
414            if (
415                item.ext in PLAYLIST_EXTENSIONS
416                and self.media_content_type == "music"
417                and self.config.get_value(CONF_ENTRY_IGNORE_ALBUM_PLAYLISTS.key)
418            ):
419                # we assume this in a bit of a dumb way by just checking if the playlist
420                # is more than 1 level deep in the directory structure
421                if len(item.relative_path.split("/")) > 2:
422                    return False
423
424            # return early if the item did not change (checksum still the same)
425            if item.checksum == prev_checksum:
426                return True
427
428            if item.ext in TRACK_EXTENSIONS and self.media_content_type == "music":
429                # handle track item
430                tags = parse_tags(item.absolute_path, item.file_size)
431
432                async def process_track() -> None:
433                    track = await self._parse_track(item, tags)
434                    # add/update track to db
435                    # note that filesystem items are always overwriting existing info
436                    # when they are detected as changed
437                    track.favorite = False  # TODO: implement favorite status based on rating ?
438                    await self.mass.music.tracks.add_item_to_library(
439                        track, overwrite_existing=prev_checksum is not None
440                    )
441
442                asyncio.run_coroutine_threadsafe(process_track(), self.mass.loop).result()
443                return True
444
445            if item.ext in AUDIOBOOK_EXTENSIONS and self.media_content_type == "audiobooks":
446                # handle audiobook item
447                tags = parse_tags(item.absolute_path, item.file_size)
448
449                async def process_audiobook() -> None:
450                    try:
451                        audiobook = await self._parse_audiobook(item, tags)
452                    except IsChapterFile:
453                        return
454                    # add/update audiobook to db
455                    # note that filesystem items are always overwriting existing info
456                    # when they are detected as changed
457                    await self.mass.music.audiobooks.add_item_to_library(
458                        audiobook, overwrite_existing=prev_checksum is not None
459                    )
460
461                asyncio.run_coroutine_threadsafe(process_audiobook(), self.mass.loop).result()
462                return True
463
464            if item.ext in PODCAST_EPISODE_EXTENSIONS and self.media_content_type == "podcasts":
465                # handle podcast(episode) item
466                tags = parse_tags(item.absolute_path, item.file_size)
467
468                async def process_episode() -> None:
469                    episode = await self._parse_podcast_episode(item, tags)
470                    assert isinstance(episode.podcast, Podcast)
471                    # add/update episode to db
472                    # note that filesystem items are always overwriting existing info
473                    # when they are detected as changed
474                    await self.mass.music.podcasts.add_item_to_library(
475                        episode.podcast, overwrite_existing=prev_checksum is not None
476                    )
477
478                asyncio.run_coroutine_threadsafe(process_episode(), self.mass.loop).result()
479                return True
480
481            if item.ext in PLAYLIST_EXTENSIONS and self.media_content_type == "music":
482                # handle playlist item
483
484                async def process_playlist() -> None:
485                    playlist = await self.get_playlist(item.relative_path)
486                    # add/update playlist to db
487                    await self.mass.music.playlists.add_item_to_library(
488                        playlist,
489                        overwrite_existing=prev_checksum is not None,
490                    )
491
492                asyncio.run_coroutine_threadsafe(process_playlist(), self.mass.loop).result()
493                return True
494
495        except Exception as err:
496            # we don't want the whole sync to crash on one file so we catch all exceptions here
497            self.logger.error(
498                "Error processing %s - %s",
499                item.relative_path,
500                str(err),
501                exc_info=err if self.logger.isEnabledFor(logging.DEBUG) else None,
502            )
503        return False
504
505    async def _process_orphaned_albums_and_artists(self) -> None:
506        """Process deletion of orphaned albums and artists."""
507        assert self.mass.music.database
508        # Remove albums without any tracks
509        query = (
510            f"SELECT item_id FROM {DB_TABLE_ALBUMS} "
511            f"WHERE item_id not in ( SELECT album_id from {DB_TABLE_ALBUM_TRACKS}) "
512            f"AND item_id in ( SELECT item_id from {DB_TABLE_PROVIDER_MAPPINGS} "
513            f"WHERE provider_instance = '{self.instance_id}' and media_type = 'album' )"
514        )
515        for db_row in await self.mass.music.database.get_rows_from_query(
516            query,
517            limit=100000,
518        ):
519            await self.mass.music.albums.remove_item_from_library(db_row["item_id"])
520
521        # Remove artists without any tracks or albums
522        query = (
523            f"SELECT item_id FROM {DB_TABLE_ARTISTS} "
524            f"WHERE item_id not in "
525            f"( select artist_id from {DB_TABLE_TRACK_ARTISTS} "
526            f"UNION SELECT artist_id from {DB_TABLE_ALBUM_ARTISTS} ) "
527            f"AND item_id in ( SELECT item_id from {DB_TABLE_PROVIDER_MAPPINGS} "
528            f"WHERE provider_instance = '{self.instance_id}' and media_type = 'artist' )"
529        )
530        for db_row in await self.mass.music.database.get_rows_from_query(
531            query,
532            limit=100000,
533        ):
534            await self.mass.music.artists.remove_item_from_library(db_row["item_id"])
535
536    async def _process_deletions(self, deleted_files: set[str]) -> None:
537        """Process all deletions."""
538        # process deleted tracks/playlists
539        album_ids = set()
540        artist_ids = set()
541        for file_path in deleted_files:
542            _, ext = file_path.rsplit(".", 1)
543            if ext in PODCAST_EPISODE_EXTENSIONS and self.media_content_type == "podcasts":
544                controller = self.mass.music.get_controller(MediaType.PODCAST_EPISODE)
545            elif ext in AUDIOBOOK_EXTENSIONS and self.media_content_type == "audiobooks":
546                controller = self.mass.music.get_controller(MediaType.AUDIOBOOK)
547            elif ext in PLAYLIST_EXTENSIONS and self.media_content_type == "music":
548                controller = self.mass.music.get_controller(MediaType.PLAYLIST)
549            elif ext in TRACK_EXTENSIONS and self.media_content_type == "music":
550                controller = self.mass.music.get_controller(MediaType.TRACK)
551            else:
552                # unsupported file extension?
553                continue
554
555            if library_item := await controller.get_library_item_by_prov_id(
556                file_path, self.instance_id
557            ):
558                if is_track(library_item):
559                    if library_item.album:
560                        album_ids.add(library_item.album.item_id)
561                        # need to fetch the library album to resolve the itemmapping
562                        db_album = await self.mass.music.albums.get_library_item(
563                            library_item.album.item_id
564                        )
565                        for artist in db_album.artists:
566                            artist_ids.add(artist.item_id)
567                    for artist in library_item.artists:
568                        artist_ids.add(artist.item_id)
569                await controller.remove_item_from_library(library_item.item_id)
570        # check if any albums need to be cleaned up
571        for album_id in album_ids:
572            if not await self.mass.music.albums.tracks(album_id, "library"):
573                await self.mass.music.albums.remove_item_from_library(album_id)
574        # check if any artists need to be cleaned up
575        for artist_id in artist_ids:
576            artist_albums = await self.mass.music.artists.albums(artist_id, "library")
577            artist_tracks = await self.mass.music.artists.tracks(artist_id, "library")
578            if not (artist_albums or artist_tracks):
579                await self.mass.music.artists.remove_item_from_library(artist_id)
580
581    async def get_artist(self, prov_artist_id: str) -> Artist:
582        """Get full artist details by id."""
583        db_artist = await self.mass.music.artists.get_library_item_by_prov_id(
584            prov_artist_id, self.instance_id
585        )
586        if not db_artist:
587            # this may happen if the artist is not in the db yet
588            # e.g. when browsing the filesystem
589            if await self.exists(prov_artist_id):
590                return await self._parse_artist(prov_artist_id, artist_path=prov_artist_id)
591            return await self._parse_artist(prov_artist_id)
592
593        # prov_artist_id is either an actual (relative) path or a name (as fallback)
594        safe_artist_name = create_safe_string(prov_artist_id, lowercase=False, replace_space=False)
595        if await self.exists(prov_artist_id):
596            artist_path = prov_artist_id
597        elif await self.exists(safe_artist_name):
598            artist_path = safe_artist_name
599        else:
600            for prov_mapping in db_artist.provider_mappings:
601                if prov_mapping.provider_instance != self.instance_id:
602                    continue
603                if prov_mapping.url:
604                    artist_path = prov_mapping.url
605                    break
606            else:
607                # this is an artist without an actual path on disk
608                # return the info we already have in the db
609                return db_artist
610        return await self._parse_artist(
611            db_artist.name,
612            sort_name=db_artist.sort_name,
613            mbid=db_artist.mbid,
614            artist_path=artist_path,
615        )
616
617    async def get_album(self, prov_album_id: str) -> Album:
618        """Get full album details by id."""
619        for track in await self.get_album_tracks(prov_album_id):
620            for prov_mapping in track.provider_mappings:
621                if prov_mapping.provider_instance == self.instance_id:
622                    file_item = await self.resolve(prov_mapping.item_id)
623                    tags = await async_parse_tags(file_item.absolute_path, file_item.file_size)
624                    full_track = await self._parse_track(file_item, tags)
625                    assert isinstance(full_track.album, Album)
626                    return full_track.album
627        msg = f"Album not found: {prov_album_id}"
628        raise MediaNotFoundError(msg)
629
630    async def get_track(self, prov_track_id: str) -> Track:
631        """Get full track details by id."""
632        # ruff: noqa: PLR0915
633        if not await self.exists(prov_track_id):
634            msg = f"Track path does not exist: {prov_track_id}"
635            raise MediaNotFoundError(msg)
636
637        file_item = await self.resolve(prov_track_id)
638        tags = await async_parse_tags(file_item.absolute_path, file_item.file_size)
639        return await self._parse_track(file_item, tags=tags, full_album_metadata=True)
640
641    async def get_playlist(self, prov_playlist_id: str) -> Playlist:
642        """Get full playlist details by id."""
643        if not await self.exists(prov_playlist_id):
644            msg = f"Playlist path does not exist: {prov_playlist_id}"
645            raise MediaNotFoundError(msg)
646
647        file_item = await self.resolve(prov_playlist_id)
648        playlist = Playlist(
649            item_id=file_item.relative_path,
650            provider=self.instance_id,
651            name=file_item.name,
652            provider_mappings={
653                ProviderMapping(
654                    item_id=file_item.relative_path,
655                    provider_domain=self.domain,
656                    provider_instance=self.instance_id,
657                    details=file_item.checksum,
658                )
659            },
660        )
661        playlist.is_editable = ProviderFeature.PLAYLIST_TRACKS_EDIT in self.supported_features
662        # only playlists in the root are editable - all other are read only
663        if "/" in prov_playlist_id or "\\" in prov_playlist_id:
664            playlist.is_editable = False
665        # we do not (yet) have support to edit/create pls playlists, only m3u files can be edited
666        if file_item.ext == "pls":
667            playlist.is_editable = False
668        playlist.owner = self.name
669        return playlist
670
671    async def get_audiobook(self, prov_audiobook_id: str) -> Audiobook:
672        """Get full audiobook details by id."""
673        # ruff: noqa: PLR0915
674        if not await self.exists(prov_audiobook_id):
675            msg = f"Audiobook path does not exist: {prov_audiobook_id}"
676            raise MediaNotFoundError(msg)
677
678        file_item = await self.resolve(prov_audiobook_id)
679        tags = await async_parse_tags(file_item.absolute_path, file_item.file_size)
680        return await self._parse_audiobook(file_item, tags=tags)
681
682    async def get_podcast(self, prov_podcast_id: str) -> Podcast:
683        """Get full podcast details by id."""
684        async for episode in self.get_podcast_episodes(prov_podcast_id):
685            assert isinstance(episode.podcast, Podcast)
686            return episode.podcast
687        msg = f"Podcast not found: {prov_podcast_id}"
688        raise MediaNotFoundError(msg)
689
690    async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
691        """Get album tracks for given album id."""
692        # filesystem items are always stored in db so we can query the database
693        db_album = await self.mass.music.albums.get_library_item_by_prov_id(
694            prov_album_id, self.instance_id
695        )
696        if db_album is None:
697            msg = f"Album not found: {prov_album_id}"
698            raise MediaNotFoundError(msg)
699        album_tracks = await self.mass.music.albums.get_library_album_tracks(db_album.item_id)
700        return [
701            track
702            for track in album_tracks
703            if any(x.provider_instance == self.instance_id for x in track.provider_mappings)
704        ]
705
706    async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
707        """Get playlist tracks."""
708        result: list[Track] = []
709        if page > 0:
710            # paging not (yet) supported
711            return result
712        if not await self.exists(prov_playlist_id):
713            msg = f"Playlist path does not exist: {prov_playlist_id}"
714            raise MediaNotFoundError(msg)
715
716        file_item = await self.resolve(prov_playlist_id)
717        # We are using the checksum of the playlist file here to invalidate the cache
718        # when a change has been made to the playlist file (ie track addition/deletion)
719        cache_checksum = file_item.checksum
720
721        cache_key = f"get_playlist_tracks.{prov_playlist_id}"
722        cached_data = await self.mass.cache.get(
723            cache_key,
724            provider=self.instance_id,
725            checksum=cache_checksum,
726            category=0,
727        )
728        if cached_data is not None:
729            if cached_data and isinstance(cached_data[0], dict):
730                return [Track.from_dict(track_dict) for track_dict in cached_data]
731            return cast("list[Track]", cached_data)
732
733        _, ext = prov_playlist_id.rsplit(".", 1)
734        try:
735            # get playlist file contents
736            playlist_filename = self.get_absolute_path(prov_playlist_id)
737            async with aiofiles.open(playlist_filename, mode="rb") as _file:
738                playlist_data_raw = await _file.read()
739                encoding = await detect_charset(playlist_data_raw)
740                playlist_data = playlist_data_raw.decode(encoding, errors="replace")
741
742            if ext in ("m3u", "m3u8"):
743                playlist_lines = parse_m3u(playlist_data)
744            else:
745                playlist_lines = parse_pls(playlist_data)
746
747            for idx, playlist_line in enumerate(playlist_lines, 1):
748                if "#EXT" in playlist_line.path:
749                    continue
750                if track := await self._parse_playlist_line(
751                    playlist_line.path, os.path.dirname(prov_playlist_id)
752                ):
753                    track.position = idx
754                    result.append(track)
755
756        except Exception as err:
757            self.logger.warning(
758                "Error while parsing playlist %s: %s",
759                prov_playlist_id,
760                str(err),
761                exc_info=err if self.logger.isEnabledFor(10) else None,
762            )
763
764        await self.mass.cache.set(
765            key=cache_key,
766            data=result,
767            expiration=3600 * 24,  # Cache for 24 hours
768            provider=self.instance_id,
769            checksum=cache_checksum,
770            category=0,
771        )
772
773        return result
774
775    async def get_podcast_episodes(
776        self, prov_podcast_id: str
777    ) -> AsyncGenerator[PodcastEpisode, None]:
778        """Get podcast episodes for given podcast id."""
779        episodes: list[PodcastEpisode] = []
780
781        async def _process_podcast_episode(item: FileSystemItem) -> None:
782            tags = await async_parse_tags(item.absolute_path, item.file_size)
783            try:
784                episode = await self._parse_podcast_episode(item, tags)
785            except MusicAssistantError as err:
786                self.logger.warning(
787                    "Could not parse uri/file %s to podcast episode: %s",
788                    item.relative_path,
789                    str(err),
790                )
791            else:
792                episodes.append(episode)
793
794        async with TaskManager(self.mass, 25) as tm:
795            for item in await asyncio.to_thread(sorted_scandir, self.base_path, prov_podcast_id):
796                if "." not in item.relative_path or item.is_dir:
797                    continue
798                if item.ext not in PODCAST_EPISODE_EXTENSIONS:
799                    continue
800                tm.create_task(_process_podcast_episode(item))
801
802        for episode in episodes:
803            yield episode
804
805    async def _parse_playlist_line(self, line: str, playlist_path: str) -> Track | None:
806        """Try to parse a track from a playlist line."""
807        try:
808            line = line.replace("file://", "").strip()
809            # try to resolve the filename (both normal and url decoded):
810            # - as an absolute path
811            # - relative to the playlist path
812            # - relative to our base path
813            # - relative to the playlist path with a leading slash
814            for _line in (line, urllib.parse.unquote(line)):
815                for filename in (
816                    # try to resolve the line by resolving it against the (absolute) playlist path
817                    # use the path.resolve step in between to auto-resolve parent item references
818                    (Path(self.get_absolute_path(playlist_path)) / _line).resolve().as_posix(),
819                    # try to resolve the line as a full absolute (or relative to music dir) path
820                    _line,
821                ):
822                    with contextlib.suppress(FileNotFoundError):
823                        file_item = await self.resolve(filename)
824                        tags = await async_parse_tags(file_item.absolute_path, file_item.file_size)
825                        return await self._parse_track(file_item, tags)
826            # all attempts failed
827            raise MediaNotFoundError("Invalid path/uri")
828
829        except MusicAssistantError as err:
830            self.logger.warning("Could not parse %s to track: %s", line, str(err))
831
832        return None
833
834    async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
835        """Add track(s) to playlist."""
836        if not await self.exists(prov_playlist_id):
837            msg = f"Playlist path does not exist: {prov_playlist_id}"
838            raise MediaNotFoundError(msg)
839        playlist_filename = self.get_absolute_path(prov_playlist_id)
840        async with aiofiles.open(playlist_filename, encoding="utf-8") as _file:
841            playlist_data = await _file.read()
842        for file_path in prov_track_ids:
843            track = await self.get_track(file_path)
844            playlist_data += f"\n#EXTINF:{track.duration or 0},{track.name}\n{file_path}\n"
845
846        # write playlist file (always in utf-8)
847        async with aiofiles.open(playlist_filename, "w", encoding="utf-8") as _file:
848            await _file.write(playlist_data)
849
850    async def remove_playlist_tracks(
851        self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
852    ) -> None:
853        """Remove track(s) from playlist."""
854        if not await self.exists(prov_playlist_id):
855            msg = f"Playlist path does not exist: {prov_playlist_id}"
856            raise MediaNotFoundError(msg)
857        _, ext = prov_playlist_id.rsplit(".", 1)
858        # get playlist file contents
859        playlist_filename = self.get_absolute_path(prov_playlist_id)
860        async with aiofiles.open(playlist_filename, encoding="utf-8") as _file:
861            playlist_data = await _file.read()
862        # get current contents first
863        if ext in ("m3u", "m3u8"):
864            playlist_items = parse_m3u(playlist_data)
865        else:
866            playlist_items = parse_pls(playlist_data)
867        # remove items by index
868        for i in sorted(positions_to_remove, reverse=True):
869            # position = index + 1
870            del playlist_items[i - 1]
871        # build new playlist data
872        new_playlist_data = "#EXTM3U\n"
873        for item in playlist_items:
874            new_playlist_data += f"\n#EXTINF:{item.length or 0},{item.title}\n{item.path}\n"
875        async with aiofiles.open(playlist_filename, "w", encoding="utf-8") as _file:
876            await _file.write(new_playlist_data)
877
878    async def create_playlist(self, name: str) -> Playlist:
879        """Create a new playlist on provider with given name."""
880        # creating a new playlist on the filesystem is as easy
881        # as creating a new (empty) file with the m3u extension...
882        # filename = await self.resolve(f"{name}.m3u")
883        filename = f"{name}.m3u"
884        playlist_filename = self.get_absolute_path(filename)
885        async with aiofiles.open(playlist_filename, "w", encoding="utf-8") as _file:
886            await _file.write("#EXTM3U\n")
887        return await self.get_playlist(filename)
888
889    async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
890        """Return the content details for the given track when it will be streamed."""
891        try:
892            if media_type == MediaType.AUDIOBOOK:
893                return await self._get_stream_details_for_audiobook(item_id)
894            if media_type == MediaType.PODCAST_EPISODE:
895                return await self._get_stream_details_for_podcast_episode(item_id)
896            return await self._get_stream_details_for_track(item_id)
897        except FileNotFoundError:
898            self.logger.warning(
899                "File not found for media item %s",
900                item_id,
901            )
902            msg = f"Media file not found: {item_id}"
903            raise MediaNotFoundError(msg)
904
905    async def resolve_image(self, path: str) -> str | bytes:
906        """
907        Resolve an image from an image path.
908
909        This either returns (a generator to get) raw bytes of the image or
910        a string with an http(s) URL or local path that is accessible from the server.
911        """
912        file_item = await self.resolve(path)
913        return file_item.absolute_path
914
915    async def _parse_track(
916        self, file_item: FileSystemItem, tags: AudioTags, full_album_metadata: bool = False
917    ) -> Track:
918        """Parse full track details from file tags."""
919        # ruff: noqa: PLR0915
920        name, version = parse_title_and_version(tags.title, tags.version)
921        track = Track(
922            item_id=file_item.relative_path,
923            provider=self.instance_id,
924            name=name,
925            sort_name=tags.title_sort,
926            version=version,
927            provider_mappings={
928                ProviderMapping(
929                    item_id=file_item.relative_path,
930                    provider_domain=self.domain,
931                    provider_instance=self.instance_id,
932                    audio_format=AudioFormat(
933                        content_type=ContentType.try_parse(file_item.ext or tags.format),
934                        sample_rate=tags.sample_rate,
935                        bit_depth=tags.bits_per_sample,
936                        channels=tags.channels,
937                        bit_rate=tags.bit_rate,
938                    ),
939                    details=file_item.checksum,
940                    in_library=True,
941                )
942            },
943            disc_number=tags.disc or 0,
944            track_number=tags.track or 0,
945            date_added=(
946                datetime.fromtimestamp(file_item.created_at, tz=UTC)
947                if file_item.created_at
948                else None
949            ),
950        )
951
952        if isrc_tags := tags.isrc:
953            for isrsc in isrc_tags:
954                track.external_ids.add((ExternalID.ISRC, isrsc))
955
956        if acoustid := tags.get("acoustid"):
957            track.external_ids.add((ExternalID.ACOUSTID, acoustid))
958
959        # album
960        album = track.album = (
961            await self._parse_album(
962                track_path=file_item.relative_path,
963                track_tags=tags,
964                track_created_at=file_item.created_at,
965            )
966            if tags.album
967            else None
968        )
969
970        # track artist(s)
971        for index, track_artist_str in enumerate(tags.artists):
972            # prefer album artist if match
973            if album and (
974                album_artist_match := next(
975                    (x for x in album.artists if x.name == track_artist_str), None
976                )
977            ):
978                track.artists.append(album_artist_match)
979                continue
980            artist = await self._parse_artist(
981                track_artist_str,
982                sort_name=(
983                    tags.artist_sort_names[index] if index < len(tags.artist_sort_names) else None
984                ),
985                mbid=(
986                    tags.musicbrainz_artistids[index]
987                    if index < len(tags.musicbrainz_artistids)
988                    else None
989                ),
990            )
991            track.artists.append(artist)
992
993        # handle embedded cover image
994        if tags.has_cover_image:
995            # we do not actually embed the image in the metadata because that would consume too
996            # much space and bandwidth. Instead we set the filename as value so the image can
997            # be retrieved later in realtime.
998            track.metadata.images = UniqueList(
999                [
1000                    MediaItemImage(
1001                        type=ImageType.THUMB,
1002                        path=file_item.relative_path,
1003                        provider=self.instance_id,
1004                        remotely_accessible=False,
1005                    )
1006                ]
1007            )
1008
1009        # copy (embedded) album image from track (if the album itself doesn't have an image)
1010        if album and not album.image and track.image:
1011            album.metadata.images = UniqueList([track.image])
1012
1013        # parse other info
1014        track.duration = int(tags.duration or 0)
1015        track.metadata.genres = set(tags.genres)
1016        if tags.disc:
1017            track.disc_number = tags.disc
1018        if tags.track:
1019            track.track_number = tags.track
1020        track.metadata.copyright = tags.get("copyright")
1021        track.metadata.lyrics = tags.lyrics
1022        track.metadata.grouping = tags.get("grouping")
1023        track.metadata.description = tags.get("comment")
1024        explicit_tag = tags.get("itunesadvisory")
1025        if explicit_tag is not None:
1026            track.metadata.explicit = explicit_tag == "1"
1027        if tags.musicbrainz_recordingid:
1028            track.mbid = tags.musicbrainz_recordingid
1029
1030        # handle (optional) loudness measurement tag(s)
1031        if tags.track_loudness is not None:
1032            self.mass.create_task(
1033                self.mass.music.set_loudness(
1034                    track.item_id,
1035                    self.instance_id,
1036                    tags.track_loudness,
1037                    tags.track_album_loudness,
1038                )
1039            )
1040
1041        # possible lrclib metadata
1042        # synced lyrics are saved as "filename.lrc" by lrcget alongside
1043        # the actual file location - just change the file extension
1044        assert file_item.ext is not None  # for type checking
1045        lrc_path = f"{file_item.absolute_path.removesuffix(file_item.ext)}lrc"
1046        if await self.exists(lrc_path):
1047            try:
1048                async with aiofiles.open(lrc_path, encoding="utf-8") as lrc_file:
1049                    track.metadata.lrc_lyrics = await lrc_file.read()
1050            except Exception as err:
1051                self.logger.warning(
1052                    "Failed to read lyrics file %s: %s",
1053                    lrc_path,
1054                    str(err),
1055                )
1056
1057        return track
1058
1059    async def _parse_artist(
1060        self,
1061        name: str,
1062        album_dir: str | None = None,
1063        sort_name: str | None = None,
1064        mbid: str | None = None,
1065        artist_path: str | None = None,
1066    ) -> Artist:
1067        """Parse full (album) Artist."""
1068        if not artist_path:
1069            # we need to hunt for the artist (metadata) path on disk
1070            # this can either be relative to the album path or at root level
1071            # check if we have an artist folder for this artist at root level
1072            safe_artist_name = create_safe_string(name, lowercase=False, replace_space=False)
1073            if await self.exists(name):
1074                artist_path = name
1075            elif await self.exists(safe_artist_name):
1076                artist_path = safe_artist_name
1077            elif album_dir and (foldermatch := get_artist_dir(name, album_dir=album_dir)):
1078                # try to find (album)artist folder based on album path
1079                artist_path = foldermatch
1080            else:
1081                # check if we have an existing item to retrieve the artist path
1082                async for item in self.mass.music.artists.iter_library_items(
1083                    search=name, provider=self.instance_id
1084                ):
1085                    if not compare_strings(name, item.name):
1086                        continue
1087                    for prov_mapping in item.provider_mappings:
1088                        if prov_mapping.provider_instance != self.instance_id:
1089                            continue
1090                        if prov_mapping.url:
1091                            artist_path = prov_mapping.url
1092                            break
1093                    if artist_path:
1094                        break
1095
1096        # prefer (short lived) cache for a bit more speed
1097        if artist_path and (
1098            cache := await self.cache.get(
1099                key=artist_path, provider=self.instance_id, category=CACHE_CATEGORY_ARTIST_INFO
1100            )
1101        ):
1102            return cast("Artist", cache)
1103
1104        prov_artist_id = artist_path or name
1105        artist = Artist(
1106            item_id=prov_artist_id,
1107            provider=self.instance_id,
1108            name=name,
1109            sort_name=sort_name,
1110            provider_mappings={
1111                ProviderMapping(
1112                    item_id=prov_artist_id,
1113                    provider_domain=self.domain,
1114                    provider_instance=self.instance_id,
1115                    url=artist_path,
1116                    in_library=True,
1117                )
1118            },
1119        )
1120        if mbid:
1121            artist.mbid = mbid
1122        if not artist_path:
1123            return artist
1124
1125        # grab additional metadata within the Artist's folder
1126        nfo_file = os.path.join(artist_path, "artist.nfo")
1127        if await self.exists(nfo_file):
1128            # found NFO file with metadata
1129            # https://kodi.wiki/view/NFO_files/Artists
1130            nfo_file = self.get_absolute_path(nfo_file)
1131            async with aiofiles.open(nfo_file) as _file:
1132                data = await _file.read()
1133            info = await asyncio.to_thread(xmltodict.parse, data)
1134            info = info["artist"]
1135            artist.name = info.get("title", info.get("name", name))
1136            if sort_name := info.get("sortname"):
1137                artist.sort_name = sort_name
1138            if mbid := info.get("musicbrainzartistid"):
1139                artist.mbid = mbid
1140            if description := info.get("biography"):
1141                artist.metadata.description = description
1142            if genre := info.get("genre"):
1143                artist.metadata.genres = set(split_items(genre))
1144        # find local images
1145        if images := await self._get_local_images(artist_path, extra_thumb_names=("artist",)):
1146            artist.metadata.images = UniqueList(images)
1147
1148        await self.cache.set(
1149            key=artist_path,
1150            data=artist,
1151            provider=self.instance_id,
1152            category=CACHE_CATEGORY_ARTIST_INFO,
1153            expiration=120,
1154        )
1155
1156        return artist
1157
1158    async def _parse_audiobook(self, file_item: FileSystemItem, tags: AudioTags) -> Audiobook:
1159        """Parse Audiobook details from file tags.
1160
1161        Audiobooks can be single files with embedded chapters or multiple files per folder.
1162        Only the first file (by track number or alphabetically) is processed as the audiobook.
1163        """
1164        # Skip files that aren't the first chapter
1165        track_tag = tags.tags.get("track")
1166        if track_tag:
1167            track_num = try_parse_int(str(track_tag).split("/")[0], None)
1168            if track_num and track_num > 1:
1169                raise IsChapterFile
1170        else:
1171            # No track tag - only process the first file alphabetically
1172            abs_path = self.get_absolute_path(file_item.parent_path)
1173            for item in await asyncio.to_thread(
1174                sorted_scandir, self.base_path, abs_path, sort=True
1175            ):
1176                if item.is_dir or item.ext not in AUDIOBOOK_EXTENSIONS:
1177                    continue
1178                if item.absolute_path != file_item.absolute_path:
1179                    raise IsChapterFile
1180                break
1181
1182        # For multi-file audiobooks, album tag is the book name, title is the chapter name
1183        if tags.album:
1184            book_name = tags.album
1185            sort_name = tags.album_sort
1186        elif (title := tags.tags.get("title")) and tags.track is None:
1187            book_name = title
1188            sort_name = tags.title_sort
1189        else:
1190            # file(s) without tags, use foldername
1191            book_name = file_item.parent_name
1192            sort_name = None
1193
1194        # collect all chapters
1195        total_duration, chapters = await self._get_chapters_for_audiobook(file_item, tags)
1196
1197        audio_book = Audiobook(
1198            item_id=file_item.relative_path,
1199            provider=self.instance_id,
1200            name=book_name,
1201            sort_name=sort_name,
1202            version=tags.version,
1203            duration=total_duration or int(tags.duration or 0),
1204            provider_mappings={
1205                ProviderMapping(
1206                    item_id=file_item.relative_path,
1207                    provider_domain=self.domain,
1208                    provider_instance=self.instance_id,
1209                    audio_format=AudioFormat(
1210                        content_type=ContentType.try_parse(file_item.ext or tags.format),
1211                        sample_rate=tags.sample_rate,
1212                        bit_depth=tags.bits_per_sample,
1213                        channels=tags.channels,
1214                        bit_rate=tags.bit_rate,
1215                    ),
1216                    details=file_item.checksum,
1217                    in_library=True,
1218                )
1219            },
1220        )
1221        audio_book.metadata.chapters = chapters
1222
1223        # handle embedded cover image
1224        if tags.has_cover_image:
1225            # we do not actually embed the image in the metadata because that would consume too
1226            # much space and bandwidth. Instead we set the filename as value so the image can
1227            # be retrieved later in realtime.
1228            audio_book.metadata.add_image(
1229                MediaItemImage(
1230                    type=ImageType.THUMB,
1231                    path=file_item.relative_path,
1232                    provider=self.instance_id,
1233                    remotely_accessible=False,
1234                )
1235            )
1236
1237        # parse other info
1238        audio_book.authors.set(tags.writers or tags.album_artists or tags.artists)
1239        audio_book.metadata.genres = set(tags.genres)
1240        audio_book.metadata.copyright = tags.get("copyright")
1241        audio_book.metadata.lyrics = tags.lyrics
1242        audio_book.metadata.description = tags.get("comment")
1243        explicit_tag = tags.get("itunesadvisory")
1244        if explicit_tag is not None:
1245            audio_book.metadata.explicit = explicit_tag == "1"
1246        if tags.musicbrainz_recordingid:
1247            audio_book.mbid = tags.musicbrainz_recordingid
1248
1249        # try to fetch additional metadata from the folder
1250        if not audio_book.image or not audio_book.metadata.description:
1251            # try to get an image by traversing files in the same folder
1252            abs_path = self.get_absolute_path(file_item.parent_path)
1253            for _item in await asyncio.to_thread(sorted_scandir, self.base_path, abs_path):
1254                if "." not in _item.relative_path or _item.is_dir:
1255                    continue
1256                if _item.ext in IMAGE_EXTENSIONS and not audio_book.image:
1257                    audio_book.metadata.add_image(
1258                        MediaItemImage(
1259                            type=ImageType.THUMB,
1260                            path=_item.relative_path,
1261                            provider=self.instance_id,
1262                            remotely_accessible=False,
1263                        )
1264                    )
1265                if _item.ext == "txt" and not audio_book.metadata.description:
1266                    # try to parse a description from a text file
1267                    try:
1268                        async with aiofiles.open(_item.absolute_path, encoding="utf-8") as _file:
1269                            description = await _file.read()
1270                        audio_book.metadata.description = description
1271                    except Exception as err:
1272                        self.logger.warning(
1273                            "Could not read description from file %s: %s",
1274                            _item.relative_path,
1275                            str(err),
1276                        )
1277
1278        # handle (optional) loudness measurement tag(s)
1279        if tags.track_loudness is not None:
1280            self.mass.create_task(
1281                self.mass.music.set_loudness(
1282                    audio_book.item_id,
1283                    self.instance_id,
1284                    tags.track_loudness,
1285                    tags.track_album_loudness,
1286                    media_type=MediaType.AUDIOBOOK,
1287                )
1288            )
1289        return audio_book
1290
1291    async def _parse_podcast_episode(
1292        self, file_item: FileSystemItem, tags: AudioTags
1293    ) -> PodcastEpisode:
1294        """Parse full PodcastEpisode details from file tags."""
1295        # ruff: noqa: PLR0915
1296        podcast_name = tags.album or file_item.parent_name
1297        podcast_path = get_relative_path(self.base_path, file_item.parent_path)
1298        episode = PodcastEpisode(
1299            item_id=file_item.relative_path,
1300            provider=self.instance_id,
1301            name=tags.title,
1302            sort_name=tags.title_sort,
1303            provider_mappings={
1304                ProviderMapping(
1305                    item_id=file_item.relative_path,
1306                    provider_domain=self.domain,
1307                    provider_instance=self.instance_id,
1308                    audio_format=AudioFormat(
1309                        content_type=ContentType.try_parse(file_item.ext or tags.format),
1310                        sample_rate=tags.sample_rate,
1311                        bit_depth=tags.bits_per_sample,
1312                        channels=tags.channels,
1313                        bit_rate=tags.bit_rate,
1314                    ),
1315                    details=file_item.checksum,
1316                    in_library=True,
1317                )
1318            },
1319            position=tags.track or 0,
1320            duration=try_parse_int(tags.duration) or 0,
1321            podcast=Podcast(
1322                item_id=podcast_path,
1323                provider=self.instance_id,
1324                name=podcast_name,
1325                sort_name=tags.album_sort,
1326                publisher=tags.tags.get("publisher"),
1327                provider_mappings={
1328                    ProviderMapping(
1329                        item_id=podcast_path,
1330                        provider_domain=self.domain,
1331                        provider_instance=self.instance_id,
1332                    )
1333                },
1334            ),
1335        )
1336        # handle embedded cover image
1337        if tags.has_cover_image:
1338            # we do not actually embed the image in the metadata because that would consume too
1339            # much space and bandwidth. Instead we set the filename as value so the image can
1340            # be retrieved later in realtime.
1341            episode.metadata.add_image(
1342                MediaItemImage(
1343                    type=ImageType.THUMB,
1344                    path=file_item.relative_path,
1345                    provider=self.instance_id,
1346                    remotely_accessible=False,
1347                )
1348            )
1349        # parse other info
1350        episode.metadata.genres = set(tags.genres)
1351        episode.metadata.copyright = tags.get("copyright")
1352        episode.metadata.lyrics = tags.lyrics
1353        episode.metadata.description = tags.get("comment")
1354        explicit_tag = tags.get("itunesadvisory")
1355        if explicit_tag is not None:
1356            episode.metadata.explicit = explicit_tag == "1"
1357
1358        # handle (optional) chapters
1359        if tags.chapters:
1360            episode.metadata.chapters = [
1361                MediaItemChapter(
1362                    position=chapter.chapter_id,
1363                    name=chapter.title or f"Chapter {chapter.chapter_id}",
1364                    start=chapter.position_start,
1365                    end=chapter.position_end,
1366                )
1367                for chapter in tags.chapters
1368            ]
1369
1370        # try to fetch additional Podcast metadata from the folder
1371        assert isinstance(episode.podcast, Podcast)
1372        if images := await self._get_local_images(file_item.parent_path):
1373            episode.podcast.metadata.images = images
1374        if metadata := await self._get_podcast_metadata(file_item.parent_path):
1375            if title := metadata.get("title"):
1376                episode.podcast.name = title
1377            if sort_name := metadata.get("sorttitle"):
1378                episode.podcast.sort_name = sort_name
1379            if description := metadata.get("description"):
1380                episode.podcast.metadata.description = description
1381            if genres := metadata.get("genres"):
1382                episode.podcast.metadata.genres = set(genres)
1383            if publisher := metadata.get("publisher"):
1384                episode.podcast.publisher = publisher
1385            if image := metadata.get("imageURL"):
1386                episode.podcast.metadata.add_image(
1387                    MediaItemImage(
1388                        type=ImageType.THUMB,
1389                        path=image,
1390                        provider=self.instance_id,
1391                        remotely_accessible=True,
1392                    )
1393                )
1394        # copy (embedded) image from episode (or vice versa)
1395        if not episode.podcast.image and episode.image:
1396            episode.podcast.metadata.add_image(episode.image)
1397        elif not episode.image and episode.podcast.image:
1398            episode.metadata.add_image(episode.podcast.image)
1399
1400        # handle (optional) loudness measurement tag(s)
1401        if tags.track_loudness is not None:
1402            self.mass.create_task(
1403                self.mass.music.set_loudness(
1404                    episode.item_id,
1405                    self.instance_id,
1406                    tags.track_loudness,
1407                    tags.track_album_loudness,
1408                    media_type=MediaType.PODCAST_EPISODE,
1409                )
1410            )
1411        return episode
1412
1413    async def _parse_album(
1414        self, track_path: str, track_tags: AudioTags, track_created_at: int | None = None
1415    ) -> Album:
1416        """Parse Album metadata from Track tags.
1417
1418        :param track_path: Path to the track file.
1419        :param track_tags: Audio tags from the track.
1420        :param track_created_at: Creation timestamp of the track file (Unix epoch).
1421        """
1422        assert track_tags.album
1423        # work out if we have an album and/or disc folder
1424        # track_dir is the folder level where the tracks are located
1425        # this may be a separate disc folder (Disc 1, Disc 2 etc) underneath the album folder
1426        # or this is an album folder with the disc attached
1427        track_dir = os.path.dirname(track_path)
1428        album_dir = get_album_dir(track_dir, track_tags.album)
1429
1430        if album_dir and (
1431            cache := await self.cache.get(
1432                key=album_dir,
1433                provider=self.instance_id,
1434                category=CACHE_CATEGORY_ALBUM_INFO,
1435            )
1436        ):
1437            return cast("Album", cache)
1438
1439        # album artist(s)
1440        album_artists: UniqueList[Artist | ItemMapping] = UniqueList()
1441        if track_tags.album_artists:
1442            for index, album_artist_str in enumerate(track_tags.album_artists):
1443                artist = await self._parse_artist(
1444                    album_artist_str,
1445                    album_dir=album_dir,
1446                    sort_name=(
1447                        track_tags.album_artist_sort_names[index]
1448                        if index < len(track_tags.album_artist_sort_names)
1449                        else None
1450                    ),
1451                    mbid=(
1452                        track_tags.musicbrainz_albumartistids[index]
1453                        if index < len(track_tags.musicbrainz_albumartistids)
1454                        else None
1455                    ),
1456                )
1457                album_artists.append(artist)
1458        else:
1459            # album artist tag is missing, determine fallback
1460            fallback_action = self.config.get_value(CONF_ENTRY_MISSING_ALBUM_ARTIST.key)
1461            if fallback_action == "folder_name" and album_dir:
1462                possible_artist_folder = os.path.dirname(album_dir)
1463                self.logger.warning(
1464                    "%s is missing ID3 tag [albumartist], using foldername %s as fallback",
1465                    track_path,
1466                    possible_artist_folder,
1467                )
1468                album_artist_str = possible_artist_folder.rsplit(os.sep)[-1]
1469                album_artists = UniqueList(
1470                    [await self._parse_artist(name=album_artist_str, album_dir=album_dir)]
1471                )
1472            # fallback to track artists (if defined by user)
1473            elif fallback_action == "track_artist":
1474                self.logger.warning(
1475                    "%s is missing ID3 tag [albumartist], using track artist(s) as fallback",
1476                    track_path,
1477                )
1478                album_artists = UniqueList(
1479                    [
1480                        await self._parse_artist(name=track_artist_str, album_dir=album_dir)
1481                        for track_artist_str in track_tags.artists
1482                    ]
1483                )
1484            # all other: fallback to various artists
1485            else:
1486                self.logger.warning(
1487                    "%s is missing ID3 tag [albumartist], using %s as fallback",
1488                    track_path,
1489                    VARIOUS_ARTISTS_NAME,
1490                )
1491                album_artists = UniqueList(
1492                    [await self._parse_artist(name=VARIOUS_ARTISTS_NAME, mbid=VARIOUS_ARTISTS_MBID)]
1493                )
1494
1495        if album_dir:  # noqa: SIM108
1496            # prefer the path as id
1497            item_id = album_dir
1498        else:
1499            # create fake item_id based on artist + album
1500            item_id = album_artists[0].name + os.sep + track_tags.album
1501
1502        name, version = parse_title_and_version(track_tags.album)
1503        album = Album(
1504            item_id=item_id,
1505            provider=self.instance_id,
1506            name=name,
1507            version=version,
1508            sort_name=track_tags.album_sort,
1509            artists=album_artists,
1510            provider_mappings={
1511                ProviderMapping(
1512                    item_id=item_id,
1513                    provider_domain=self.domain,
1514                    provider_instance=self.instance_id,
1515                    url=album_dir,
1516                    in_library=True,
1517                )
1518            },
1519            date_added=(
1520                datetime.fromtimestamp(track_created_at, tz=UTC) if track_created_at else None
1521            ),
1522        )
1523        if track_tags.barcode:
1524            album.external_ids.add((ExternalID.BARCODE, track_tags.barcode))
1525
1526        if track_tags.musicbrainz_albumid:
1527            album.mbid = track_tags.musicbrainz_albumid
1528        if track_tags.musicbrainz_releasegroupid:
1529            album.add_external_id(ExternalID.MB_RELEASEGROUP, track_tags.musicbrainz_releasegroupid)
1530        if track_tags.year:
1531            album.year = track_tags.year
1532        album.album_type = track_tags.album_type
1533
1534        # hunt for additional metadata and images in the folder structure
1535        if not album_dir:
1536            return album
1537
1538        for folder_path in (track_dir, album_dir):
1539            if not folder_path or not await self.exists(folder_path):
1540                continue
1541            nfo_file = os.path.join(folder_path, "album.nfo")
1542            if await self.exists(nfo_file):
1543                # found NFO file with metadata
1544                # https://kodi.wiki/view/NFO_files/Artists
1545                nfo_file = self.get_absolute_path(nfo_file)
1546                async with aiofiles.open(nfo_file) as _file:
1547                    data = await _file.read()
1548                info = await asyncio.to_thread(xmltodict.parse, data)
1549                info = info["album"]
1550                album.name = info.get("title", info.get("name", name))
1551                if sort_name := info.get("sortname"):
1552                    album.sort_name = sort_name
1553                if releasegroup_id := info.get("musicbrainzreleasegroupid"):
1554                    album.add_external_id(ExternalID.MB_RELEASEGROUP, releasegroup_id)
1555                if album_id := info.get("musicbrainzalbumid"):
1556                    album.add_external_id(ExternalID.MB_ALBUM, album_id)
1557                if mb_artist_id := info.get("musicbrainzalbumartistid"):
1558                    if album.artists and not album.artists[0].mbid:
1559                        album.artists[0].mbid = mb_artist_id
1560                if description := info.get("review"):
1561                    album.metadata.description = description
1562                if year := info.get("year"):
1563                    album.year = int(year)
1564                if genre := info.get("genre"):
1565                    album.metadata.genres = set(split_items(genre))
1566            # parse name/version
1567            album.name, album.version = parse_title_and_version(album.name)
1568            # find local images
1569            if images := await self._get_local_images(folder_path, extra_thumb_names=("album",)):
1570                if album.metadata.images is None:
1571                    album.metadata.images = UniqueList(images)
1572                else:
1573                    album.metadata.images += images
1574        await self.cache.set(
1575            key=album_dir,
1576            data=album,
1577            provider=self.instance_id,
1578            category=CACHE_CATEGORY_ALBUM_INFO,
1579            expiration=120,
1580        )
1581        return album
1582
1583    async def _get_local_images(
1584        self, folder: str, extra_thumb_names: tuple[str, ...] | None = None
1585    ) -> UniqueList[MediaItemImage]:
1586        """Return local images found in a given folderpath."""
1587        if (
1588            cache := await self.cache.get(
1589                key=folder, provider=self.instance_id, category=CACHE_CATEGORY_FOLDER_IMAGES
1590            )
1591        ) is not None:
1592            return cast("UniqueList[MediaItemImage]", cache)
1593        if extra_thumb_names is None:
1594            extra_thumb_names = ()
1595        images: UniqueList[MediaItemImage] = UniqueList()
1596        abs_path = self.get_absolute_path(folder)
1597        folder_files = await asyncio.to_thread(sorted_scandir, self.base_path, abs_path, sort=False)
1598        for item in folder_files:
1599            if "." not in item.relative_path or item.is_dir or not item.ext:
1600                continue
1601            if item.ext.lower() not in IMAGE_EXTENSIONS:
1602                continue
1603            # try match on filename = one of our imagetypes
1604            if item.name.lower() in ImageType:
1605                images.append(
1606                    MediaItemImage(
1607                        type=ImageType(item.name),
1608                        path=item.relative_path,
1609                        provider=self.instance_id,
1610                        remotely_accessible=False,
1611                    )
1612                )
1613
1614        # try alternative names for thumbs
1615        extra_thumb_names = ("folder", "cover", *extra_thumb_names)
1616        for item in folder_files:
1617            if "." not in item.relative_path or item.is_dir or not item.ext:
1618                continue
1619            if item.ext.lower() not in IMAGE_EXTENSIONS:
1620                continue
1621            if item.name.lower() not in extra_thumb_names:
1622                continue
1623            images.append(
1624                MediaItemImage(
1625                    type=ImageType.THUMB,
1626                    path=item.relative_path,
1627                    provider=self.instance_id,
1628                    remotely_accessible=False,
1629                )
1630            )
1631
1632        await self.cache.set(
1633            key=folder,
1634            data=images,
1635            provider=self.instance_id,
1636            category=CACHE_CATEGORY_FOLDER_IMAGES,
1637            expiration=120,
1638        )
1639        return images
1640
1641    async def check_write_access(self) -> None:
1642        """Perform check if we have write access."""
1643        # verify write access to determine we have playlist create/edit support
1644        # overwrite with provider specific implementation if needed
1645        temp_file_name = self.get_absolute_path(f"{shortuuid.random(8)}.txt")
1646        try:
1647            async with aiofiles.open(temp_file_name, "w") as _file:
1648                await _file.write("test")
1649            await asyncio.to_thread(os.remove, temp_file_name)
1650            self.write_access = True
1651        except Exception as err:
1652            self.logger.debug("Write access disabled: %s", str(err))
1653
1654    async def resolve(
1655        self,
1656        file_path: str,
1657    ) -> FileSystemItem:
1658        """Resolve (absolute or relative) path to FileSystemItem."""
1659        absolute_path = self.get_absolute_path(file_path)
1660
1661        def _create_item() -> FileSystemItem:
1662            if os.path.isdir(absolute_path):
1663                return FileSystemItem(
1664                    filename=os.path.basename(file_path),
1665                    relative_path=get_relative_path(self.base_path, file_path),
1666                    absolute_path=absolute_path,
1667                    is_dir=True,
1668                )
1669            stat = os.stat(absolute_path, follow_symlinks=False)
1670            return FileSystemItem(
1671                filename=os.path.basename(file_path),
1672                relative_path=get_relative_path(self.base_path, file_path),
1673                absolute_path=absolute_path,
1674                is_dir=False,
1675                checksum=str(int(stat.st_mtime)),
1676                file_size=stat.st_size,
1677            )
1678
1679        # run in thread because strictly taken this may be blocking IO
1680        return await asyncio.to_thread(_create_item)
1681
1682    async def exists(self, file_path: str) -> bool:
1683        """Return bool is this FileSystem musicprovider has given file/dir."""
1684        if not file_path:
1685            return False  # guard
1686        abs_path = self.get_absolute_path(file_path)
1687        return bool(await exists(abs_path))
1688
1689    def get_absolute_path(self, file_path: str) -> str:
1690        """Return absolute path for given file path."""
1691        return get_absolute_path(self.base_path, file_path)
1692
1693    async def _get_stream_details_for_track(self, item_id: str) -> StreamDetails:
1694        """Return the streamdetails for a track/song."""
1695        library_item = await self.mass.music.tracks.get_library_item_by_prov_id(
1696            item_id, self.instance_id
1697        )
1698        if library_item is None:
1699            # this could be a file that has just been added, try parsing it
1700            file_item = await self.resolve(item_id)
1701            tags = await async_parse_tags(file_item.absolute_path, file_item.file_size)
1702            if not (library_item := await self._parse_track(file_item, tags)):
1703                msg = f"Item not found: {item_id}"
1704                raise MediaNotFoundError(msg)
1705
1706        prov_mapping = next(x for x in library_item.provider_mappings if x.item_id == item_id)
1707        file_item = await self.resolve(item_id)
1708
1709        return StreamDetails(
1710            provider=self.instance_id,
1711            item_id=item_id,
1712            audio_format=prov_mapping.audio_format,
1713            media_type=MediaType.TRACK,
1714            stream_type=StreamType.LOCAL_FILE,
1715            duration=library_item.duration,
1716            size=file_item.file_size,
1717            data=file_item,
1718            path=file_item.absolute_path,
1719            can_seek=True,
1720            allow_seek=True,
1721        )
1722
1723    async def _get_stream_details_for_podcast_episode(self, item_id: str) -> StreamDetails:
1724        """Return the streamdetails for a podcast episode."""
1725        # podcasts episodes are never stored in the library so we need to parse the file
1726        file_item = await self.resolve(item_id)
1727        tags = await async_parse_tags(file_item.absolute_path, file_item.file_size)
1728        return StreamDetails(
1729            provider=self.instance_id,
1730            item_id=item_id,
1731            audio_format=AudioFormat(
1732                content_type=ContentType.try_parse(file_item.ext or tags.format),
1733                sample_rate=tags.sample_rate,
1734                bit_depth=tags.bits_per_sample,
1735                channels=tags.channels,
1736                bit_rate=tags.bit_rate,
1737            ),
1738            media_type=MediaType.PODCAST_EPISODE,
1739            stream_type=StreamType.LOCAL_FILE,
1740            duration=try_parse_int(tags.duration or 0),
1741            size=file_item.file_size,
1742            data=file_item,
1743            path=file_item.absolute_path,
1744            allow_seek=True,
1745            can_seek=True,
1746        )
1747
1748    async def _get_stream_details_for_audiobook(self, item_id: str) -> StreamDetails:
1749        """Return the streamdetails for an audiobook."""
1750        library_item = await self.mass.music.audiobooks.get_library_item_by_prov_id(
1751            item_id, self.instance_id
1752        )
1753        if library_item is None:
1754            # this could be a file that has just been added, try parsing it
1755            file_item = await self.resolve(item_id)
1756            tags = await async_parse_tags(file_item.absolute_path, file_item.file_size)
1757            if not (library_item := await self._parse_audiobook(file_item, tags)):
1758                msg = f"Item not found: {item_id}"
1759                raise MediaNotFoundError(msg)
1760
1761        prov_mapping = next(x for x in library_item.provider_mappings if x.item_id == item_id)
1762        file_item = await self.resolve(item_id)
1763        duration = library_item.duration
1764        file_based_chapters: list[tuple[str, float]] | None = await self.cache.get(
1765            key=file_item.relative_path,
1766            provider=self.instance_id,
1767            category=CACHE_CATEGORY_AUDIOBOOK_CHAPTERS,
1768        )
1769        if file_based_chapters is None:
1770            # no cache available for this audiobook, we need to parse the chapters
1771            tags = await async_parse_tags(file_item.absolute_path, file_item.file_size)
1772            await self._parse_audiobook(file_item, tags)
1773            file_based_chapters = await self.cache.get(
1774                key=file_item.relative_path,
1775                provider=self.instance_id,
1776                category=CACHE_CATEGORY_AUDIOBOOK_CHAPTERS,
1777            )
1778
1779        if file_based_chapters:
1780            # this is a multi-file audiobook
1781            return StreamDetails(
1782                provider=self.instance_id,
1783                item_id=item_id,
1784                audio_format=prov_mapping.audio_format,
1785                media_type=MediaType.AUDIOBOOK,
1786                stream_type=StreamType.LOCAL_FILE,
1787                duration=duration,
1788                path=[
1789                    MultiPartPath(path=self.get_absolute_path(path), duration=duration)
1790                    for path, duration in file_based_chapters
1791                ],
1792                allow_seek=True,
1793            )
1794
1795        # regular single-file streaming, simply let ffmpeg deal with the file directly
1796        return StreamDetails(
1797            provider=self.instance_id,
1798            item_id=item_id,
1799            audio_format=prov_mapping.audio_format,
1800            media_type=MediaType.AUDIOBOOK,
1801            stream_type=StreamType.LOCAL_FILE,
1802            duration=library_item.duration,
1803            size=file_item.file_size,
1804            data=file_item,
1805            path=file_item.absolute_path,
1806            allow_seek=True,
1807            can_seek=True,
1808        )
1809
1810    async def _get_chapters_for_audiobook(
1811        self, audiobook_file_item: FileSystemItem, tags: AudioTags
1812    ) -> tuple[int, list[MediaItemChapter]]:
1813        """Return chapters for an audiobook.
1814
1815        Chapter sources in order of preference:
1816        1. Multiple files with track tags - sorted by track number
1817        2. Single file with embedded chapters - use embedded chapter markers
1818        3. Multiple files without track tags - sorted alphabetically (fallback)
1819        """
1820        chapters: list[MediaItemChapter] = []
1821        all_chapter_files: list[tuple[str, float]] = []
1822        total_duration = 0.0
1823
1824        # Scan folder for chapter files, separating tagged from untagged
1825        chapter_file_tags: list[AudioTags] = []
1826        untagged_file_tags: list[AudioTags] = []
1827        abs_path = self.get_absolute_path(audiobook_file_item.parent_path)
1828        for item in await asyncio.to_thread(sorted_scandir, self.base_path, abs_path, sort=True):
1829            if "." not in item.relative_path or item.is_dir:
1830                continue
1831            if item.ext not in AUDIOBOOK_EXTENSIONS:
1832                continue
1833            item_tags = await async_parse_tags(item.absolute_path, item.file_size)
1834            if not (tags.album == item_tags.album or (item_tags.tags.get("title") is None)):
1835                continue
1836            if item_tags.tags.get("track") is None:
1837                untagged_file_tags.append(item_tags)
1838            else:
1839                chapter_file_tags.append(item_tags)
1840
1841        # Determine chapter source
1842        use_embedded = False
1843        use_alphabetical = False
1844
1845        if len(chapter_file_tags) > 1:
1846            chapter_file_tags.sort(key=lambda x: (x.disc or 0, x.track or 0))
1847        elif len(chapter_file_tags) <= 1 and tags.chapters:
1848            use_embedded = True
1849        elif len(untagged_file_tags) > 1:
1850            use_alphabetical = True
1851            chapter_file_tags = untagged_file_tags
1852            self.logger.info(
1853                "Audiobook files have no track tags, using alphabetical order: %s",
1854                tags.album,
1855            )
1856
1857        if use_embedded:
1858            chapters = [
1859                MediaItemChapter(
1860                    position=chapter.chapter_id,
1861                    name=chapter.title or f"Chapter {chapter.chapter_id}",
1862                    start=chapter.position_start,
1863                    end=chapter.position_end,
1864                )
1865                for chapter in tags.chapters
1866            ]
1867            total_duration = try_parse_int(tags.duration) or 0
1868            self.logger.log(
1869                VERBOSE_LOG_LEVEL,
1870                "Audiobook '%s': %d embedded chapters, duration=%d",
1871                tags.album,
1872                len(chapters),
1873                int(total_duration),
1874            )
1875        else:
1876            for position, chapter_tags in enumerate(chapter_file_tags, start=1):
1877                if chapter_tags.duration is None:
1878                    self.logger.warning(
1879                        "Chapter file has no duration, skipping: %s",
1880                        chapter_tags.filename,
1881                    )
1882                    continue
1883                chapters.append(
1884                    MediaItemChapter(
1885                        position=position if use_alphabetical else (chapter_tags.track or position),
1886                        name=chapter_tags.title,
1887                        start=total_duration,
1888                        end=total_duration + chapter_tags.duration,
1889                    )
1890                )
1891                all_chapter_files.append(
1892                    (
1893                        get_relative_path(self.base_path, chapter_tags.filename),
1894                        chapter_tags.duration,
1895                    )
1896                )
1897                total_duration += chapter_tags.duration
1898            sort_method = "alphabetical" if use_alphabetical else "track"
1899            self.logger.log(
1900                VERBOSE_LOG_LEVEL,
1901                "Audiobook '%s': %d files (%s order), duration=%d",
1902                tags.album,
1903                len(chapters),
1904                sort_method,
1905                int(total_duration),
1906            )
1907
1908        # Cache chapter files for streaming
1909        await self.cache.set(
1910            key=audiobook_file_item.relative_path,
1911            data=all_chapter_files,
1912            provider=self.instance_id,
1913            category=CACHE_CATEGORY_AUDIOBOOK_CHAPTERS,
1914        )
1915        return (int(total_duration), chapters)
1916
1917    async def _get_podcast_metadata(self, podcast_folder: str) -> dict[str, Any]:
1918        """Return metadata for a podcast."""
1919        if (
1920            cache := await self.cache.get(
1921                key=podcast_folder,
1922                provider=self.instance_id,
1923                category=CACHE_CATEGORY_PODCAST_METADATA,
1924            )
1925        ) is not None:
1926            return cast("dict[str, Any]", cache)
1927        data: dict[str, Any] = {}
1928        metadata_file = os.path.join(podcast_folder, "metadata.json")
1929        if await self.exists(metadata_file):
1930            # found json file with metadata
1931            metadata_file = self.get_absolute_path(metadata_file)
1932            async with aiofiles.open(metadata_file) as _file:
1933                data.update(json_loads(await _file.read()))
1934        await self.cache.set(
1935            key=podcast_folder,
1936            data=data,
1937            provider=self.instance_id,
1938            category=CACHE_CATEGORY_PODCAST_METADATA,
1939        )
1940        return data
1941