/
/
/
1"""Built-in/generic provider to handle media from files and (remote) urls."""
2
3from __future__ import annotations
4
5import asyncio
6import os
7import time
8from collections.abc import AsyncGenerator
9from typing import TYPE_CHECKING, Final, cast, get_args
10
11import aiofiles
12import shortuuid
13from music_assistant_models.enums import (
14 ContentType,
15 ImageType,
16 MediaType,
17 ProviderFeature,
18 StreamType,
19)
20from music_assistant_models.errors import (
21 InvalidDataError,
22 MediaNotFoundError,
23 ProviderUnavailableError,
24)
25from music_assistant_models.media_items import (
26 Artist,
27 AudioFormat,
28 MediaItemImage,
29 MediaItemMetadata,
30 MediaItemType,
31 Playlist,
32 ProviderMapping,
33 Radio,
34 Track,
35 UniqueList,
36)
37from music_assistant_models.streamdetails import StreamDetails
38
39from music_assistant.constants import (
40 MASS_LOGO,
41 VARIOUS_ARTISTS_FANART,
42 PlaylistPlayableItem,
43)
44from music_assistant.controllers.cache import use_cache
45from music_assistant.helpers.tags import AudioTags, async_parse_tags
46from music_assistant.helpers.uri import parse_uri
47from music_assistant.models.music_provider import MusicProvider
48
49from .constants import (
50 ALL_FAVORITE_TRACKS,
51 BUILTIN_PLAYLISTS,
52 BUILTIN_PLAYLISTS_ENTRIES,
53 COLLAGE_IMAGE_PLAYLISTS,
54 CONF_ENTRY_LIBRARY_SYNC_BACK_HIDDEN,
55 CONF_ENTRY_LIBRARY_SYNC_PLAYLISTS_HIDDEN,
56 CONF_ENTRY_LIBRARY_SYNC_RADIOS_HIDDEN,
57 CONF_ENTRY_LIBRARY_SYNC_TRACKS_HIDDEN,
58 CONF_ENTRY_PROVIDER_SYNC_INTERVAL_PLAYLISTS_MOD,
59 CONF_ENTRY_PROVIDER_SYNC_INTERVAL_RADIOS_HIDDEN,
60 CONF_ENTRY_PROVIDER_SYNC_INTERVAL_TRACKS_HIDDEN,
61 CONF_KEY_PLAYLISTS,
62 CONF_KEY_RADIOS,
63 CONF_KEY_TRACKS,
64 DEFAULT_FANART,
65 DEFAULT_THUMB,
66 RANDOM_ALBUM,
67 RANDOM_ARTIST,
68 RANDOM_TRACKS,
69 RECENTLY_ADDED_TRACKS,
70 RECENTLY_PLAYED,
71 StoredItem,
72)
73
74if TYPE_CHECKING:
75 from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
76 from music_assistant_models.provider import ProviderManifest
77
78 from music_assistant.mass import MusicAssistant
79 from music_assistant.models import ProviderInstanceType
80
81CACHE_CATEGORY_MEDIA_INFO: Final[int] = 1
82CACHE_CATEGORY_PLAYLISTS: Final[int] = 2
83
84SUPPORTED_FEATURES = {
85 ProviderFeature.BROWSE,
86 ProviderFeature.LIBRARY_TRACKS,
87 ProviderFeature.LIBRARY_RADIOS,
88 ProviderFeature.LIBRARY_PLAYLISTS,
89 ProviderFeature.LIBRARY_TRACKS_EDIT,
90 ProviderFeature.LIBRARY_RADIOS_EDIT,
91 ProviderFeature.LIBRARY_PLAYLISTS_EDIT,
92 ProviderFeature.PLAYLIST_CREATE,
93 ProviderFeature.PLAYLIST_TRACKS_EDIT,
94}
95
96
97async def setup(
98 mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
99) -> ProviderInstanceType:
100 """Initialize provider(instance) with given configuration."""
101 return BuiltinProvider(mass, manifest, config, SUPPORTED_FEATURES)
102
103
104async def get_config_entries(
105 mass: MusicAssistant, # noqa: ARG001
106 instance_id: str | None = None, # noqa: ARG001
107 action: str | None = None, # noqa: ARG001
108 values: dict[str, ConfigValueType] | None = None, # noqa: ARG001
109) -> tuple[ConfigEntry, ...]:
110 """
111 Return Config entries to setup this provider.
112
113 instance_id: id of an existing provider instance (None if new instance setup).
114 action: [optional] action key called from config entries UI.
115 values: the (intermediate) raw values for config entries sent with the action.
116 """
117 return (
118 *BUILTIN_PLAYLISTS_ENTRIES,
119 # hide some of the default (dynamic) entries for library management
120 CONF_ENTRY_LIBRARY_SYNC_TRACKS_HIDDEN,
121 CONF_ENTRY_LIBRARY_SYNC_PLAYLISTS_HIDDEN,
122 CONF_ENTRY_LIBRARY_SYNC_RADIOS_HIDDEN,
123 CONF_ENTRY_PROVIDER_SYNC_INTERVAL_TRACKS_HIDDEN,
124 CONF_ENTRY_PROVIDER_SYNC_INTERVAL_RADIOS_HIDDEN,
125 CONF_ENTRY_PROVIDER_SYNC_INTERVAL_PLAYLISTS_MOD,
126 CONF_ENTRY_LIBRARY_SYNC_BACK_HIDDEN,
127 )
128
129
130class BuiltinProvider(MusicProvider):
131 """Built-in/generic provider to handle (manually added) media from files and (remote) urls."""
132
133 _playlists_dir: str
134 _playlist_lock: asyncio.Lock
135
136 async def loaded_in_mass(self) -> None:
137 """Call after the provider has been loaded."""
138 self._playlist_lock = asyncio.Lock()
139 # make sure that our directory with collage images exists
140 self._playlists_dir = os.path.join(self.mass.storage_path, "playlists")
141 if not await asyncio.to_thread(os.path.exists, self._playlists_dir):
142 await asyncio.to_thread(os.mkdir, self._playlists_dir)
143 await super().loaded_in_mass()
144
145 @property
146 def is_streaming_provider(self) -> bool:
147 """Return True if the provider is a streaming provider."""
148 return False
149
150 async def get_track(self, prov_track_id: str) -> Track:
151 """Get full track details by id."""
152 parsed_item = cast("Track", await self.parse_item(prov_track_id))
153 stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_TRACKS, [])
154 if stored_item := next((x for x in stored_items if x["item_id"] == prov_track_id), None):
155 # always prefer the stored info, such as the name
156 parsed_item.name = stored_item["name"]
157 if image_url := stored_item.get("image_url"):
158 parsed_item.metadata.add_image(
159 MediaItemImage(
160 type=ImageType.THUMB,
161 path=image_url,
162 provider=self.domain,
163 remotely_accessible=image_url.startswith("http"),
164 )
165 )
166 return parsed_item
167
168 async def get_radio(self, prov_radio_id: str) -> Radio:
169 """Get full radio details by id."""
170 parsed_item = await self.parse_item(prov_radio_id, force_radio=True)
171 assert isinstance(parsed_item, Radio)
172 stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_RADIOS, [])
173 if stored_item := next((x for x in stored_items if x["item_id"] == prov_radio_id), None):
174 # always prefer the stored info, such as the name
175 parsed_item.name = stored_item["name"]
176 if image_url := stored_item.get("image_url"):
177 parsed_item.metadata.add_image(
178 MediaItemImage(
179 type=ImageType.THUMB,
180 path=image_url,
181 provider=self.domain,
182 remotely_accessible=image_url.startswith("http"),
183 )
184 )
185 return parsed_item
186
187 async def get_artist(self, prov_artist_id: str) -> Artist:
188 """Get full artist details by id."""
189 artist = prov_artist_id
190 # this is here for compatibility reasons only
191 return Artist(
192 item_id=artist,
193 provider=self.domain,
194 name=artist,
195 provider_mappings={
196 ProviderMapping(
197 item_id=artist,
198 provider_domain=self.domain,
199 provider_instance=self.instance_id,
200 available=False,
201 )
202 },
203 )
204
205 async def get_playlist(self, prov_playlist_id: str) -> Playlist:
206 """Get full playlist details by id."""
207 if prov_playlist_id in BUILTIN_PLAYLISTS:
208 # this is one of our builtin/default playlists
209 return Playlist(
210 item_id=prov_playlist_id,
211 provider=self.instance_id,
212 name=BUILTIN_PLAYLISTS[prov_playlist_id],
213 provider_mappings={
214 ProviderMapping(
215 item_id=prov_playlist_id,
216 provider_domain=self.domain,
217 provider_instance=self.instance_id,
218 )
219 },
220 owner="Music Assistant",
221 is_editable=False,
222 metadata=MediaItemMetadata(
223 images=UniqueList([DEFAULT_THUMB])
224 if prov_playlist_id in COLLAGE_IMAGE_PLAYLISTS
225 else UniqueList([DEFAULT_THUMB, DEFAULT_FANART]),
226 ),
227 )
228 # user created universal playlist
229 stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
230 stored_item = next((x for x in stored_items if x["item_id"] == prov_playlist_id), None)
231 if not stored_item:
232 raise MediaNotFoundError
233 playlist = Playlist(
234 item_id=prov_playlist_id,
235 provider=self.instance_id,
236 name=stored_item["name"],
237 provider_mappings={
238 ProviderMapping(
239 item_id=prov_playlist_id,
240 provider_domain=self.domain,
241 provider_instance=self.instance_id,
242 )
243 },
244 owner="Music Assistant",
245 is_editable=True,
246 )
247 if image_url := stored_item.get("image_url"):
248 playlist.metadata.add_image(
249 MediaItemImage(
250 type=ImageType.THUMB,
251 path=image_url,
252 provider=self.domain,
253 remotely_accessible=image_url.startswith("http"),
254 )
255 )
256 return playlist
257
258 async def get_item(self, media_type: MediaType, prov_item_id: str) -> MediaItemType:
259 """Get single MediaItem from provider."""
260 if media_type == MediaType.ARTIST:
261 return await self.get_artist(prov_item_id)
262 if media_type == MediaType.TRACK:
263 return await self.get_track(prov_item_id)
264 if media_type == MediaType.RADIO:
265 return await self.get_radio(prov_item_id)
266 if media_type == MediaType.PLAYLIST:
267 return await self.get_playlist(prov_item_id)
268 if media_type == MediaType.UNKNOWN:
269 return await self.parse_item(prov_item_id)
270 raise NotImplementedError
271
272 async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
273 """Retrieve library tracks from the provider."""
274 stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_TRACKS, [])
275 for item in stored_items:
276 try:
277 yield await self.get_track(item["item_id"])
278 except MediaNotFoundError as err:
279 self.logger.warning("Track %s not found: %s", item, err)
280
281 async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
282 """Retrieve library/subscribed playlists from the provider."""
283 # return user stored playlists
284 stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
285 for item in stored_items:
286 yield await self.get_playlist(item["item_id"])
287 # return builtin playlists
288 for item_id in BUILTIN_PLAYLISTS:
289 if self.config.get_value(item_id) is False:
290 continue
291 yield await self.get_playlist(item_id)
292
293 async def get_library_radios(self) -> AsyncGenerator[Radio, None]:
294 """Retrieve library/subscribed radio stations from the provider."""
295 stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_RADIOS, [])
296 for item in stored_items:
297 try:
298 yield await self.get_radio(item["item_id"])
299 except (MediaNotFoundError, InvalidDataError) as err:
300 self.logger.warning("Radio station %s not found: %s", item, err)
301 yield Radio(
302 item_id=item["item_id"],
303 provider=self.instance_id,
304 name=item["name"],
305 provider_mappings={
306 ProviderMapping(
307 item_id=item["item_id"],
308 provider_domain=self.domain,
309 provider_instance=self.instance_id,
310 available=False,
311 )
312 },
313 )
314
315 async def library_add(self, item: MediaItemType) -> bool:
316 """Add item to provider's library. Return true on success."""
317 if item.media_type == MediaType.TRACK:
318 key = CONF_KEY_TRACKS
319 elif item.media_type == MediaType.RADIO:
320 key = CONF_KEY_RADIOS
321 else:
322 return False
323 stored_item = StoredItem(item_id=item.item_id, name=item.name)
324 if item.image:
325 stored_item["image_url"] = item.image.path
326 stored_items: list[StoredItem] = self.mass.config.get(key, [])
327 # filter out existing
328 stored_items = [x for x in stored_items if x["item_id"] != item.item_id]
329 stored_items.append(stored_item)
330 self.mass.config.set(key, stored_items)
331 return True
332
333 async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool:
334 """Remove item from provider's library. Return true on success."""
335 if media_type == MediaType.PLAYLIST and prov_item_id in BUILTIN_PLAYLISTS:
336 # user wants to disable/remove one of our builtin playlists
337 # to prevent it comes back, we mark it as disabled in config
338 self._update_config_value(prov_item_id, False)
339 return True
340 if media_type == MediaType.TRACK:
341 # regular manual track URL/path
342 key = CONF_KEY_TRACKS
343 elif media_type == MediaType.RADIO:
344 # regular manual radio URL/path
345 key = CONF_KEY_RADIOS
346 elif media_type == MediaType.PLAYLIST:
347 # manually added (multi provider) playlist removal
348 key = CONF_KEY_PLAYLISTS
349 # also delete the playlist file if it exists
350 playlist_file = os.path.join(self._playlists_dir, prov_item_id)
351 if await asyncio.to_thread(os.path.isfile, playlist_file):
352 async with self._playlist_lock:
353 await asyncio.to_thread(os.remove, playlist_file)
354 else:
355 return False
356 stored_items: list[StoredItem] = self.mass.config.get(key, [])
357 stored_items = [x for x in stored_items if x["item_id"] != prov_item_id]
358 self.mass.config.set(key, stored_items)
359 return True
360
361 async def get_playlist_tracks( # type: ignore[override]
362 self, prov_playlist_id: str, page: int = 0
363 ) -> list[PlaylistPlayableItem]:
364 """Get playlist tracks.
365
366 Builtin provider supports Track, Radio, PodcastEpisode, and Audiobook items in playlists.
367 Overrides base class to return extended union type instead of list[Track].
368 """
369 if page > 0:
370 # paging not supported, we always return the whole list at once
371 return []
372 if prov_playlist_id in BUILTIN_PLAYLISTS:
373 # System-generated playlists (favorites, random, etc.) only contain tracks
374 return list(await self._get_builtin_playlist_tracks(prov_playlist_id))
375 # User-created playlists can contain Track, Radio, PodcastEpisode, and Audiobook items
376 result: list[PlaylistPlayableItem] = []
377 playlist_items = await self._read_playlist_file_items(prov_playlist_id)
378 for index, uri in enumerate(playlist_items, 1):
379 try:
380 media_type, provider_instance_id_or_domain, item_id = await parse_uri(uri)
381 media_controller = self.mass.music.get_controller(media_type)
382 # prefer item already in the db
383 track = await media_controller.get_library_item_by_prov_id(
384 item_id, provider_instance_id_or_domain
385 )
386 if track is None:
387 # get the provider item and not the full track from a regular 'get' call
388 # as we only need basic track info here
389 track = await media_controller.get_provider_item(
390 item_id, provider_instance_id_or_domain
391 )
392 if isinstance(track, get_args(PlaylistPlayableItem)):
393 playlist_item = cast("PlaylistPlayableItem", track)
394 playlist_item.position = index
395 result.append(playlist_item)
396 else:
397 self.logger.warning(
398 "Unsupported media type in playlist %s: %s", prov_playlist_id, type(track)
399 )
400 except (MediaNotFoundError, InvalidDataError, ProviderUnavailableError) as err:
401 self.logger.warning(
402 "Skipping %s in playlist %s: %s", uri, prov_playlist_id, str(err)
403 )
404 return result
405
406 async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
407 """Add track(s) to playlist."""
408 playlist_items = await self._read_playlist_file_items(prov_playlist_id)
409 for uri in prov_track_ids:
410 if uri not in playlist_items:
411 playlist_items.append(uri)
412 # store playlist file
413 await self._write_playlist_file_items(prov_playlist_id, playlist_items)
414 # mark last_updated on playlist object
415 stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
416 stored_item = next((x for x in stored_items if x["item_id"] == prov_playlist_id), None)
417 if stored_item:
418 stored_item["last_updated"] = int(time.time())
419 self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items)
420
421 async def remove_playlist_tracks(
422 self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
423 ) -> None:
424 """Remove track(s) from playlist."""
425 playlist_items = await self._read_playlist_file_items(prov_playlist_id)
426 # remove items by index
427 for i in sorted(positions_to_remove, reverse=True):
428 del playlist_items[i - 1]
429 # store playlist file
430 await self._write_playlist_file_items(prov_playlist_id, playlist_items)
431 # mark last_updated on playlist object
432 stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
433 stored_item = next((x for x in stored_items if x["item_id"] == prov_playlist_id), None)
434 if stored_item:
435 stored_item["last_updated"] = int(time.time())
436 self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items)
437
438 async def create_playlist(self, name: str) -> Playlist:
439 """Create a new playlist on provider with given name."""
440 item_id = shortuuid.random(8)
441 stored_item = StoredItem(item_id=item_id, name=name)
442 stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
443 stored_items.append(stored_item)
444 self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items)
445 return await self.get_playlist(item_id)
446
447 async def parse_item(
448 self,
449 url: str,
450 force_refresh: bool = False,
451 force_radio: bool = False,
452 ) -> Track | Radio:
453 """Parse plain URL to MediaItem of type Radio or Track."""
454 media_info = await self._get_media_info(url, force_refresh)
455 is_radio = media_info.get("icyname") or not media_info.duration
456 provider_mappings = {
457 ProviderMapping(
458 item_id=url,
459 provider_domain=self.domain,
460 provider_instance=self.instance_id,
461 audio_format=AudioFormat(
462 content_type=ContentType.try_parse(media_info.format),
463 sample_rate=media_info.sample_rate,
464 bit_depth=media_info.bits_per_sample,
465 bit_rate=media_info.bit_rate,
466 ),
467 )
468 }
469 media_item: Track | Radio
470 if is_radio or force_radio:
471 # treat as radio
472 media_item = Radio(
473 item_id=url,
474 provider=self.domain,
475 name=media_info.get("icyname")
476 or media_info.get("programtitle")
477 or media_info.title
478 or url,
479 provider_mappings=provider_mappings,
480 )
481 else:
482 media_item = Track(
483 item_id=url,
484 provider=self.domain,
485 name=media_info.title or url,
486 duration=int(media_info.duration or 0),
487 artists=UniqueList(
488 [await self.get_artist(artist) for artist in media_info.artists]
489 ),
490 provider_mappings=provider_mappings,
491 )
492
493 if media_info.has_cover_image:
494 media_item.metadata.images = UniqueList(
495 [
496 MediaItemImage(
497 type=ImageType.THUMB,
498 path=url,
499 provider=self.domain,
500 remotely_accessible=False,
501 )
502 ]
503 )
504 return media_item
505
506 async def resolve_image(self, path: str) -> str | bytes:
507 """
508 Resolve an image from an image path.
509
510 This either returns (a generator to get) raw bytes of the image or
511 a string with an http(s) URL or local path that is accessible from the server.
512 """
513 if path == "logo.png":
514 return MASS_LOGO
515 if path in ("fanart.jpg", "fallback_fanart.jpeg"):
516 return VARIOUS_ARTISTS_FANART
517 return path
518
519 async def _get_media_info(self, url: str, force_refresh: bool = False) -> AudioTags:
520 """Retrieve mediainfo for url."""
521 # do we have some cached info for this url ?
522 cached_info = await self.mass.cache.get(
523 url, provider=self.instance_id, category=CACHE_CATEGORY_MEDIA_INFO
524 )
525 if cached_info and not force_refresh:
526 return AudioTags.parse(cached_info)
527 # parse info with ffprobe (and store in cache)
528 media_info = await async_parse_tags(url)
529 if "authSig" in url:
530 media_info.has_cover_image = False
531 await self.mass.cache.set(
532 url, media_info.raw, provider=self.instance_id, category=CACHE_CATEGORY_MEDIA_INFO
533 )
534 return media_info
535
536 async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
537 """Get streamdetails for a track/radio."""
538 media_info = await self._get_media_info(item_id)
539 is_radio = media_info.get("icy-name") or not media_info.duration
540 return StreamDetails(
541 provider=self.instance_id,
542 item_id=item_id,
543 audio_format=AudioFormat(
544 content_type=ContentType.try_parse(media_info.format),
545 sample_rate=media_info.sample_rate,
546 bit_depth=media_info.bits_per_sample,
547 channels=media_info.channels,
548 ),
549 media_type=MediaType.RADIO if is_radio else MediaType.TRACK,
550 stream_type=StreamType.HTTP,
551 path=item_id,
552 can_seek=not is_radio,
553 allow_seek=not is_radio,
554 )
555
556 @use_cache(expiration=120, category=CACHE_CATEGORY_PLAYLISTS)
557 async def _get_builtin_playlist_random_favorite_tracks(self) -> list[Track]:
558 result: list[Track] = []
559 res = await self.mass.music.tracks.library_items(
560 favorite=True, limit=250000, order_by="random_play_count"
561 )
562 for idx, item in enumerate(res, 1):
563 item.position = idx
564 result.append(item)
565 return result
566
567 @use_cache(expiration=120, category=CACHE_CATEGORY_PLAYLISTS)
568 async def _get_builtin_playlist_random_tracks(self) -> list[Track]:
569 result: list[Track] = []
570 res = await self.mass.music.tracks.library_items(limit=500, order_by="random_play_count")
571 for idx, item in enumerate(res, 1):
572 item.position = idx
573 result.append(item)
574 return result
575
576 @use_cache(expiration=3600, category=CACHE_CATEGORY_PLAYLISTS)
577 async def _get_builtin_playlist_random_album(self) -> list[Track]:
578 for random_album in await self.mass.music.albums.get_library_items_by_query(
579 limit=1,
580 order_by="random",
581 extra_query_parts=["album_type != :excluded_album_type"],
582 extra_query_params={"excluded_album_type": "single"},
583 ):
584 tracks = await self.mass.music.albums.tracks(
585 random_album.item_id, random_album.provider
586 )
587 for idx, track in enumerate(tracks, 1):
588 track.position = idx
589 return tracks
590 return []
591
592 @use_cache(expiration=3600, category=CACHE_CATEGORY_PLAYLISTS)
593 async def _get_builtin_playlist_random_artist(self) -> list[Track]:
594 for in_library_only in (True, False):
595 for min_tracks_required in (25, 10, 5, 1):
596 for random_artist in await self.mass.music.artists.library_items(
597 limit=25, order_by="random"
598 ):
599 tracks = await self.mass.music.artists.tracks(
600 random_artist.item_id,
601 random_artist.provider,
602 in_library_only=in_library_only,
603 )
604 if len(tracks) < min_tracks_required:
605 continue
606 for idx, track in enumerate(tracks, 1):
607 track.position = idx
608 return tracks
609 return []
610
611 @use_cache(expiration=30, category=CACHE_CATEGORY_PLAYLISTS)
612 async def _get_builtin_playlist_recently_played(self) -> list[Track]:
613 result: list[Track] = []
614 recent_tracks = await self.mass.music.recently_played(100, [MediaType.TRACK])
615 for idx, item in enumerate(recent_tracks, 1):
616 if not (item_provider := self.mass.get_provider(item.provider)):
617 continue
618 track = Track(
619 item_id=item.item_id,
620 provider=item.provider,
621 name=item.name,
622 provider_mappings={
623 ProviderMapping(
624 item_id=item.item_id,
625 provider_domain=item_provider.domain,
626 provider_instance=item_provider.instance_id,
627 )
628 },
629 )
630 if item.image:
631 track.metadata.add_image(item.image)
632 track.position = idx
633 result.append(track)
634 return result
635
636 @use_cache(expiration=60, category=CACHE_CATEGORY_PLAYLISTS)
637 async def _get_builtin_playlist_recently_added_tracks(self) -> list[Track]:
638 result: list[Track] = []
639 recent_tracks = await self.mass.music.recently_added_tracks(100)
640 for idx, track in enumerate(recent_tracks, 1):
641 track.position = idx
642 result.append(track)
643 return result
644
645 async def _get_builtin_playlist_tracks(
646 self, builtin_playlist_id: str
647 ) -> list[Track] | UniqueList[Track]:
648 """Get all playlist tracks for given builtin playlist id."""
649 try:
650 return await {
651 ALL_FAVORITE_TRACKS: self._get_builtin_playlist_random_favorite_tracks,
652 RANDOM_TRACKS: self._get_builtin_playlist_random_tracks,
653 RANDOM_ALBUM: self._get_builtin_playlist_random_album,
654 RANDOM_ARTIST: self._get_builtin_playlist_random_artist,
655 RECENTLY_PLAYED: self._get_builtin_playlist_recently_played,
656 RECENTLY_ADDED_TRACKS: self._get_builtin_playlist_recently_added_tracks,
657 }[builtin_playlist_id]()
658 except KeyError:
659 raise MediaNotFoundError(f"No built in playlist: {builtin_playlist_id}")
660
661 async def _read_playlist_file_items(self, playlist_id: str) -> list[str]:
662 """Return lines of a playlist file."""
663 playlist_file = os.path.join(self._playlists_dir, playlist_id)
664 if not await asyncio.to_thread(os.path.isfile, playlist_file):
665 return []
666 async with (
667 self._playlist_lock,
668 aiofiles.open(playlist_file, encoding="utf-8") as _file,
669 ):
670 lines = await _file.readlines()
671 return [x.strip() for x in lines]
672
673 async def _write_playlist_file_items(self, playlist_id: str, lines: list[str]) -> None:
674 """Return lines of a playlist file."""
675 playlist_file = os.path.join(self._playlists_dir, playlist_id)
676 async with (
677 self._playlist_lock,
678 aiofiles.open(playlist_file, "w", encoding="utf-8") as _file,
679 ):
680 await _file.write("\n".join(lines))
681