music-assistant-server

27.2 KBPY
__init__.py
27.2 KB666 lines • python
1"""Built-in/generic provider to handle media from files and (remote) urls."""
2
3from __future__ import annotations
4
5import asyncio
6import os
7import time
8from collections.abc import AsyncGenerator
9from typing import TYPE_CHECKING, Final, cast
10
11import aiofiles
12import shortuuid
13from music_assistant_models.enums import (
14    ContentType,
15    ImageType,
16    MediaType,
17    ProviderFeature,
18    StreamType,
19)
20from music_assistant_models.errors import (
21    InvalidDataError,
22    MediaNotFoundError,
23    ProviderUnavailableError,
24)
25from music_assistant_models.media_items import (
26    Artist,
27    AudioFormat,
28    MediaItemImage,
29    MediaItemMetadata,
30    MediaItemType,
31    Playlist,
32    ProviderMapping,
33    Radio,
34    Track,
35    UniqueList,
36)
37from music_assistant_models.streamdetails import StreamDetails
38
39from music_assistant.constants import MASS_LOGO, VARIOUS_ARTISTS_FANART
40from music_assistant.controllers.cache import use_cache
41from music_assistant.helpers.tags import AudioTags, async_parse_tags
42from music_assistant.helpers.uri import parse_uri
43from music_assistant.models.music_provider import MusicProvider
44
45from .constants import (
46    ALL_FAVORITE_TRACKS,
47    BUILTIN_PLAYLISTS,
48    BUILTIN_PLAYLISTS_ENTRIES,
49    COLLAGE_IMAGE_PLAYLISTS,
50    CONF_ENTRY_LIBRARY_SYNC_BACK_HIDDEN,
51    CONF_ENTRY_LIBRARY_SYNC_PLAYLISTS_HIDDEN,
52    CONF_ENTRY_LIBRARY_SYNC_RADIOS_HIDDEN,
53    CONF_ENTRY_LIBRARY_SYNC_TRACKS_HIDDEN,
54    CONF_ENTRY_PROVIDER_SYNC_INTERVAL_PLAYLISTS_MOD,
55    CONF_ENTRY_PROVIDER_SYNC_INTERVAL_RADIOS_HIDDEN,
56    CONF_ENTRY_PROVIDER_SYNC_INTERVAL_TRACKS_HIDDEN,
57    CONF_KEY_PLAYLISTS,
58    CONF_KEY_RADIOS,
59    CONF_KEY_TRACKS,
60    DEFAULT_FANART,
61    DEFAULT_THUMB,
62    RANDOM_ALBUM,
63    RANDOM_ARTIST,
64    RANDOM_TRACKS,
65    RECENTLY_ADDED_TRACKS,
66    RECENTLY_PLAYED,
67    StoredItem,
68)
69
70if TYPE_CHECKING:
71    from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
72    from music_assistant_models.provider import ProviderManifest
73
74    from music_assistant.mass import MusicAssistant
75    from music_assistant.models import ProviderInstanceType
76
# cache categories used by this provider when storing data in the cache
CACHE_CATEGORY_MEDIA_INFO: Final[int] = 1
CACHE_CATEGORY_PLAYLISTS: Final[int] = 2


# the set of features passed to the provider instance created in setup()
SUPPORTED_FEATURES = {
    # browsing
    ProviderFeature.BROWSE,
    # tracks
    ProviderFeature.LIBRARY_TRACKS,
    ProviderFeature.LIBRARY_TRACKS_EDIT,
    # radios
    ProviderFeature.LIBRARY_RADIOS,
    ProviderFeature.LIBRARY_RADIOS_EDIT,
    # playlists
    ProviderFeature.LIBRARY_PLAYLISTS,
    ProviderFeature.LIBRARY_PLAYLISTS_EDIT,
    ProviderFeature.PLAYLIST_CREATE,
    ProviderFeature.PLAYLIST_TRACKS_EDIT,
}
92
93
async def setup(
    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
    """Create the builtin provider instance for the given configuration."""
    provider = BuiltinProvider(mass, manifest, config, SUPPORTED_FEATURES)
    return provider
99
100
async def get_config_entries(
    mass: MusicAssistant,  # noqa: ARG001
    instance_id: str | None = None,  # noqa: ARG001
    action: str | None = None,  # noqa: ARG001
    values: dict[str, ConfigValueType] | None = None,  # noqa: ARG001
) -> tuple[ConfigEntry, ...]:
    """
    Return the config entries needed to set up this provider.

    None of the arguments influence the result; they are accepted as-is.

    instance_id: id of an existing provider instance (None if new instance setup).
    action: [optional] action key called from config entries UI.
    values: the (intermediate) raw values for config entries sent with the action.
    """
    # variants of the default (dynamic) library management entries, most of them hidden
    library_management_entries = (
        CONF_ENTRY_LIBRARY_SYNC_TRACKS_HIDDEN,
        CONF_ENTRY_LIBRARY_SYNC_PLAYLISTS_HIDDEN,
        CONF_ENTRY_LIBRARY_SYNC_RADIOS_HIDDEN,
        CONF_ENTRY_PROVIDER_SYNC_INTERVAL_TRACKS_HIDDEN,
        CONF_ENTRY_PROVIDER_SYNC_INTERVAL_RADIOS_HIDDEN,
        CONF_ENTRY_PROVIDER_SYNC_INTERVAL_PLAYLISTS_MOD,
        CONF_ENTRY_LIBRARY_SYNC_BACK_HIDDEN,
    )
    # the builtin playlist entries come first, followed by the library management ones
    return (*BUILTIN_PLAYLISTS_ENTRIES, *library_management_entries)
125
126
127class BuiltinProvider(MusicProvider):
128    """Built-in/generic provider to handle (manually added) media from files and (remote) urls."""
129
130    _playlists_dir: str
131    _playlist_lock: asyncio.Lock
132
133    async def loaded_in_mass(self) -> None:
134        """Call after the provider has been loaded."""
135        self._playlist_lock = asyncio.Lock()
136        # make sure that our directory with collage images exists
137        self._playlists_dir = os.path.join(self.mass.storage_path, "playlists")
138        if not await asyncio.to_thread(os.path.exists, self._playlists_dir):
139            await asyncio.to_thread(os.mkdir, self._playlists_dir)
140        await super().loaded_in_mass()
141
142    @property
143    def is_streaming_provider(self) -> bool:
144        """Return True if the provider is a streaming provider."""
145        return False
146
147    async def get_track(self, prov_track_id: str) -> Track:
148        """Get full track details by id."""
149        parsed_item = cast("Track", await self.parse_item(prov_track_id))
150        stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_TRACKS, [])
151        if stored_item := next((x for x in stored_items if x["item_id"] == prov_track_id), None):
152            # always prefer the stored info, such as the name
153            parsed_item.name = stored_item["name"]
154            if image_url := stored_item.get("image_url"):
155                parsed_item.metadata.add_image(
156                    MediaItemImage(
157                        type=ImageType.THUMB,
158                        path=image_url,
159                        provider=self.domain,
160                        remotely_accessible=image_url.startswith("http"),
161                    )
162                )
163        return parsed_item
164
165    async def get_radio(self, prov_radio_id: str) -> Radio:
166        """Get full radio details by id."""
167        parsed_item = await self.parse_item(prov_radio_id, force_radio=True)
168        assert isinstance(parsed_item, Radio)
169        stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_RADIOS, [])
170        if stored_item := next((x for x in stored_items if x["item_id"] == prov_radio_id), None):
171            # always prefer the stored info, such as the name
172            parsed_item.name = stored_item["name"]
173            if image_url := stored_item.get("image_url"):
174                parsed_item.metadata.add_image(
175                    MediaItemImage(
176                        type=ImageType.THUMB,
177                        path=image_url,
178                        provider=self.domain,
179                        remotely_accessible=image_url.startswith("http"),
180                    )
181                )
182        return parsed_item
183
184    async def get_artist(self, prov_artist_id: str) -> Artist:
185        """Get full artist details by id."""
186        artist = prov_artist_id
187        # this is here for compatibility reasons only
188        return Artist(
189            item_id=artist,
190            provider=self.domain,
191            name=artist,
192            provider_mappings={
193                ProviderMapping(
194                    item_id=artist,
195                    provider_domain=self.domain,
196                    provider_instance=self.instance_id,
197                    available=False,
198                )
199            },
200        )
201
202    async def get_playlist(self, prov_playlist_id: str) -> Playlist:
203        """Get full playlist details by id."""
204        if prov_playlist_id in BUILTIN_PLAYLISTS:
205            # this is one of our builtin/default playlists
206            return Playlist(
207                item_id=prov_playlist_id,
208                provider=self.instance_id,
209                name=BUILTIN_PLAYLISTS[prov_playlist_id],
210                provider_mappings={
211                    ProviderMapping(
212                        item_id=prov_playlist_id,
213                        provider_domain=self.domain,
214                        provider_instance=self.instance_id,
215                    )
216                },
217                owner="Music Assistant",
218                is_editable=False,
219                metadata=MediaItemMetadata(
220                    images=UniqueList([DEFAULT_THUMB])
221                    if prov_playlist_id in COLLAGE_IMAGE_PLAYLISTS
222                    else UniqueList([DEFAULT_THUMB, DEFAULT_FANART]),
223                ),
224            )
225        # user created universal playlist
226        stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
227        stored_item = next((x for x in stored_items if x["item_id"] == prov_playlist_id), None)
228        if not stored_item:
229            raise MediaNotFoundError
230        playlist = Playlist(
231            item_id=prov_playlist_id,
232            provider=self.instance_id,
233            name=stored_item["name"],
234            provider_mappings={
235                ProviderMapping(
236                    item_id=prov_playlist_id,
237                    provider_domain=self.domain,
238                    provider_instance=self.instance_id,
239                )
240            },
241            owner="Music Assistant",
242            is_editable=True,
243        )
244        if image_url := stored_item.get("image_url"):
245            playlist.metadata.add_image(
246                MediaItemImage(
247                    type=ImageType.THUMB,
248                    path=image_url,
249                    provider=self.domain,
250                    remotely_accessible=image_url.startswith("http"),
251                )
252            )
253        return playlist
254
255    async def get_item(self, media_type: MediaType, prov_item_id: str) -> MediaItemType:
256        """Get single MediaItem from provider."""
257        if media_type == MediaType.ARTIST:
258            return await self.get_artist(prov_item_id)
259        if media_type == MediaType.TRACK:
260            return await self.get_track(prov_item_id)
261        if media_type == MediaType.RADIO:
262            return await self.get_radio(prov_item_id)
263        if media_type == MediaType.PLAYLIST:
264            return await self.get_playlist(prov_item_id)
265        if media_type == MediaType.UNKNOWN:
266            return await self.parse_item(prov_item_id)
267        raise NotImplementedError
268
269    async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
270        """Retrieve library tracks from the provider."""
271        stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_TRACKS, [])
272        for item in stored_items:
273            try:
274                yield await self.get_track(item["item_id"])
275            except MediaNotFoundError as err:
276                self.logger.warning("Track %s not found: %s", item, err)
277
278    async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
279        """Retrieve library/subscribed playlists from the provider."""
280        # return user stored playlists
281        stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
282        for item in stored_items:
283            yield await self.get_playlist(item["item_id"])
284        # return builtin playlists
285        for item_id in BUILTIN_PLAYLISTS:
286            if self.config.get_value(item_id) is False:
287                continue
288            yield await self.get_playlist(item_id)
289
290    async def get_library_radios(self) -> AsyncGenerator[Radio, None]:
291        """Retrieve library/subscribed radio stations from the provider."""
292        stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_RADIOS, [])
293        for item in stored_items:
294            try:
295                yield await self.get_radio(item["item_id"])
296            except (MediaNotFoundError, InvalidDataError) as err:
297                self.logger.warning("Radio station %s not found: %s", item, err)
298                yield Radio(
299                    item_id=item["item_id"],
300                    provider=self.instance_id,
301                    name=item["name"],
302                    provider_mappings={
303                        ProviderMapping(
304                            item_id=item["item_id"],
305                            provider_domain=self.domain,
306                            provider_instance=self.instance_id,
307                            available=False,
308                        )
309                    },
310                )
311
312    async def library_add(self, item: MediaItemType) -> bool:
313        """Add item to provider's library. Return true on success."""
314        if item.media_type == MediaType.TRACK:
315            key = CONF_KEY_TRACKS
316        elif item.media_type == MediaType.RADIO:
317            key = CONF_KEY_RADIOS
318        else:
319            return False
320        stored_item = StoredItem(item_id=item.item_id, name=item.name)
321        if item.image:
322            stored_item["image_url"] = item.image.path
323        stored_items: list[StoredItem] = self.mass.config.get(key, [])
324        # filter out existing
325        stored_items = [x for x in stored_items if x["item_id"] != item.item_id]
326        stored_items.append(stored_item)
327        self.mass.config.set(key, stored_items)
328        return True
329
330    async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool:
331        """Remove item from provider's library. Return true on success."""
332        if media_type == MediaType.PLAYLIST and prov_item_id in BUILTIN_PLAYLISTS:
333            # user wants to disable/remove one of our builtin playlists
334            # to prevent it comes back, we mark it as disabled in config
335            self._update_config_value(prov_item_id, False)
336            return True
337        if media_type == MediaType.TRACK:
338            # regular manual track URL/path
339            key = CONF_KEY_TRACKS
340        elif media_type == MediaType.RADIO:
341            # regular manual radio URL/path
342            key = CONF_KEY_RADIOS
343        elif media_type == MediaType.PLAYLIST:
344            # manually added (multi provider) playlist removal
345            key = CONF_KEY_PLAYLISTS
346            # also delete the playlist file if it exists
347            playlist_file = os.path.join(self._playlists_dir, prov_item_id)
348            if await asyncio.to_thread(os.path.isfile, playlist_file):
349                async with self._playlist_lock:
350                    await asyncio.to_thread(os.remove, playlist_file)
351        else:
352            return False
353        stored_items: list[StoredItem] = self.mass.config.get(key, [])
354        stored_items = [x for x in stored_items if x["item_id"] != prov_item_id]
355        self.mass.config.set(key, stored_items)
356        return True
357
358    async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
359        """Get playlist tracks."""
360        if page > 0:
361            # paging not supported, we always return the whole list at once
362            return []
363        if prov_playlist_id in BUILTIN_PLAYLISTS:
364            return await self._get_builtin_playlist_tracks(prov_playlist_id)
365        # user created universal playlist
366        result: list[Track] = []
367        playlist_items = await self._read_playlist_file_items(prov_playlist_id)
368        for index, uri in enumerate(playlist_items, 1):
369            try:
370                media_type, provider_instance_id_or_domain, item_id = await parse_uri(uri)
371                media_controller = self.mass.music.get_controller(media_type)
372                # prefer item already in the db
373                track = await media_controller.get_library_item_by_prov_id(
374                    item_id, provider_instance_id_or_domain
375                )
376                if track is None:
377                    # get the provider item and not the full track from a regular 'get' call
378                    # as we only need basic track info here
379                    track = await media_controller.get_provider_item(
380                        item_id, provider_instance_id_or_domain
381                    )
382                assert isinstance(track, Track)
383                track.position = index
384                result.append(track)
385            except (MediaNotFoundError, InvalidDataError, ProviderUnavailableError) as err:
386                self.logger.warning(
387                    "Skipping %s in playlist %s: %s", uri, prov_playlist_id, str(err)
388                )
389        return result
390
391    async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
392        """Add track(s) to playlist."""
393        playlist_items = await self._read_playlist_file_items(prov_playlist_id)
394        for uri in prov_track_ids:
395            if uri not in playlist_items:
396                playlist_items.append(uri)
397        # store playlist file
398        await self._write_playlist_file_items(prov_playlist_id, playlist_items)
399        # mark last_updated on playlist object
400        stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
401        stored_item = next((x for x in stored_items if x["item_id"] == prov_playlist_id), None)
402        if stored_item:
403            stored_item["last_updated"] = int(time.time())
404            self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items)
405
406    async def remove_playlist_tracks(
407        self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
408    ) -> None:
409        """Remove track(s) from playlist."""
410        playlist_items = await self._read_playlist_file_items(prov_playlist_id)
411        # remove items by index
412        for i in sorted(positions_to_remove, reverse=True):
413            del playlist_items[i - 1]
414        # store playlist file
415        await self._write_playlist_file_items(prov_playlist_id, playlist_items)
416        # mark last_updated on playlist object
417        stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
418        stored_item = next((x for x in stored_items if x["item_id"] == prov_playlist_id), None)
419        if stored_item:
420            stored_item["last_updated"] = int(time.time())
421            self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items)
422
423    async def create_playlist(self, name: str) -> Playlist:
424        """Create a new playlist on provider with given name."""
425        item_id = shortuuid.random(8)
426        stored_item = StoredItem(item_id=item_id, name=name)
427        stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
428        stored_items.append(stored_item)
429        self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items)
430        return await self.get_playlist(item_id)
431
432    async def parse_item(
433        self,
434        url: str,
435        force_refresh: bool = False,
436        force_radio: bool = False,
437    ) -> Track | Radio:
438        """Parse plain URL to MediaItem of type Radio or Track."""
439        media_info = await self._get_media_info(url, force_refresh)
440        is_radio = media_info.get("icyname") or not media_info.duration
441        provider_mappings = {
442            ProviderMapping(
443                item_id=url,
444                provider_domain=self.domain,
445                provider_instance=self.instance_id,
446                audio_format=AudioFormat(
447                    content_type=ContentType.try_parse(media_info.format),
448                    sample_rate=media_info.sample_rate,
449                    bit_depth=media_info.bits_per_sample,
450                    bit_rate=media_info.bit_rate,
451                ),
452            )
453        }
454        media_item: Track | Radio
455        if is_radio or force_radio:
456            # treat as radio
457            media_item = Radio(
458                item_id=url,
459                provider=self.domain,
460                name=media_info.get("icyname")
461                or media_info.get("programtitle")
462                or media_info.title
463                or url,
464                provider_mappings=provider_mappings,
465            )
466        else:
467            media_item = Track(
468                item_id=url,
469                provider=self.domain,
470                name=media_info.title or url,
471                duration=int(media_info.duration or 0),
472                artists=UniqueList(
473                    [await self.get_artist(artist) for artist in media_info.artists]
474                ),
475                provider_mappings=provider_mappings,
476            )
477
478        if media_info.has_cover_image:
479            media_item.metadata.images = UniqueList(
480                [
481                    MediaItemImage(
482                        type=ImageType.THUMB,
483                        path=url,
484                        provider=self.domain,
485                        remotely_accessible=False,
486                    )
487                ]
488            )
489        return media_item
490
491    async def resolve_image(self, path: str) -> str | bytes:
492        """
493        Resolve an image from an image path.
494
495        This either returns (a generator to get) raw bytes of the image or
496        a string with an http(s) URL or local path that is accessible from the server.
497        """
498        if path == "logo.png":
499            return MASS_LOGO
500        if path in ("fanart.jpg", "fallback_fanart.jpeg"):
501            return VARIOUS_ARTISTS_FANART
502        return path
503
504    async def _get_media_info(self, url: str, force_refresh: bool = False) -> AudioTags:
505        """Retrieve mediainfo for url."""
506        # do we have some cached info for this url ?
507        cached_info = await self.mass.cache.get(
508            url, provider=self.instance_id, category=CACHE_CATEGORY_MEDIA_INFO
509        )
510        if cached_info and not force_refresh:
511            return AudioTags.parse(cached_info)
512        # parse info with ffprobe (and store in cache)
513        media_info = await async_parse_tags(url)
514        if "authSig" in url:
515            media_info.has_cover_image = False
516        await self.mass.cache.set(
517            url, media_info.raw, provider=self.instance_id, category=CACHE_CATEGORY_MEDIA_INFO
518        )
519        return media_info
520
521    async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
522        """Get streamdetails for a track/radio."""
523        media_info = await self._get_media_info(item_id)
524        is_radio = media_info.get("icy-name") or not media_info.duration
525        return StreamDetails(
526            provider=self.instance_id,
527            item_id=item_id,
528            audio_format=AudioFormat(
529                content_type=ContentType.try_parse(media_info.format),
530                sample_rate=media_info.sample_rate,
531                bit_depth=media_info.bits_per_sample,
532                channels=media_info.channels,
533            ),
534            media_type=MediaType.RADIO if is_radio else MediaType.TRACK,
535            stream_type=StreamType.HTTP,
536            path=item_id,
537            can_seek=not is_radio,
538            allow_seek=not is_radio,
539        )
540
541    @use_cache(expiration=120, category=CACHE_CATEGORY_PLAYLISTS)
542    async def _get_builtin_playlist_random_favorite_tracks(self) -> list[Track]:
543        result: list[Track] = []
544        res = await self.mass.music.tracks.library_items(
545            favorite=True, limit=250000, order_by="random_play_count"
546        )
547        for idx, item in enumerate(res, 1):
548            item.position = idx
549            result.append(item)
550        return result
551
552    @use_cache(expiration=120, category=CACHE_CATEGORY_PLAYLISTS)
553    async def _get_builtin_playlist_random_tracks(self) -> list[Track]:
554        result: list[Track] = []
555        res = await self.mass.music.tracks.library_items(limit=500, order_by="random_play_count")
556        for idx, item in enumerate(res, 1):
557            item.position = idx
558            result.append(item)
559        return result
560
561    @use_cache(expiration=3600, category=CACHE_CATEGORY_PLAYLISTS)
562    async def _get_builtin_playlist_random_album(self) -> list[Track]:
563        for random_album in await self.mass.music.albums.get_library_items_by_query(
564            limit=1,
565            order_by="random",
566            extra_query_parts=["album_type != :excluded_album_type"],
567            extra_query_params={"excluded_album_type": "single"},
568        ):
569            tracks = await self.mass.music.albums.tracks(
570                random_album.item_id, random_album.provider
571            )
572            for idx, track in enumerate(tracks, 1):
573                track.position = idx
574            return tracks
575        return []
576
577    @use_cache(expiration=3600, category=CACHE_CATEGORY_PLAYLISTS)
578    async def _get_builtin_playlist_random_artist(self) -> list[Track]:
579        for in_library_only in (True, False):
580            for min_tracks_required in (25, 10, 5, 1):
581                for random_artist in await self.mass.music.artists.library_items(
582                    limit=25, order_by="random"
583                ):
584                    tracks = await self.mass.music.artists.tracks(
585                        random_artist.item_id,
586                        random_artist.provider,
587                        in_library_only=in_library_only,
588                    )
589                    if len(tracks) < min_tracks_required:
590                        continue
591                    for idx, track in enumerate(tracks, 1):
592                        track.position = idx
593                    return tracks
594        return []
595
596    @use_cache(expiration=30, category=CACHE_CATEGORY_PLAYLISTS)
597    async def _get_builtin_playlist_recently_played(self) -> list[Track]:
598        result: list[Track] = []
599        recent_tracks = await self.mass.music.recently_played(100, [MediaType.TRACK])
600        for idx, item in enumerate(recent_tracks, 1):
601            if not (item_provider := self.mass.get_provider(item.provider)):
602                continue
603            track = Track(
604                item_id=item.item_id,
605                provider=item.provider,
606                name=item.name,
607                provider_mappings={
608                    ProviderMapping(
609                        item_id=item.item_id,
610                        provider_domain=item_provider.domain,
611                        provider_instance=item_provider.instance_id,
612                    )
613                },
614            )
615            if item.image:
616                track.metadata.add_image(item.image)
617            track.position = idx
618            result.append(track)
619        return result
620
621    @use_cache(expiration=60, category=CACHE_CATEGORY_PLAYLISTS)
622    async def _get_builtin_playlist_recently_added_tracks(self) -> list[Track]:
623        result: list[Track] = []
624        recent_tracks = await self.mass.music.recently_added_tracks(100)
625        for idx, track in enumerate(recent_tracks, 1):
626            track.position = idx
627            result.append(track)
628        return result
629
630    async def _get_builtin_playlist_tracks(
631        self, builtin_playlist_id: str
632    ) -> list[Track] | UniqueList[Track]:
633        """Get all playlist tracks for given builtin playlist id."""
634        try:
635            return await {
636                ALL_FAVORITE_TRACKS: self._get_builtin_playlist_random_favorite_tracks,
637                RANDOM_TRACKS: self._get_builtin_playlist_random_tracks,
638                RANDOM_ALBUM: self._get_builtin_playlist_random_album,
639                RANDOM_ARTIST: self._get_builtin_playlist_random_artist,
640                RECENTLY_PLAYED: self._get_builtin_playlist_recently_played,
641                RECENTLY_ADDED_TRACKS: self._get_builtin_playlist_recently_added_tracks,
642            }[builtin_playlist_id]()
643        except KeyError:
644            raise MediaNotFoundError(f"No built in playlist: {builtin_playlist_id}")
645
646    async def _read_playlist_file_items(self, playlist_id: str) -> list[str]:
647        """Return lines of a playlist file."""
648        playlist_file = os.path.join(self._playlists_dir, playlist_id)
649        if not await asyncio.to_thread(os.path.isfile, playlist_file):
650            return []
651        async with (
652            self._playlist_lock,
653            aiofiles.open(playlist_file, encoding="utf-8") as _file,
654        ):
655            lines = await _file.readlines()
656            return [x.strip() for x in lines]
657
658    async def _write_playlist_file_items(self, playlist_id: str, lines: list[str]) -> None:
659        """Return lines of a playlist file."""
660        playlist_file = os.path.join(self._playlists_dir, playlist_id)
661        async with (
662            self._playlist_lock,
663            aiofiles.open(playlist_file, "w", encoding="utf-8") as _file,
664        ):
665            await _file.write("\n".join(lines))
666