music-assistant-server
27.2 KB•PY
__init__.py
27.2 KB • 666 lines • python
1"""Built-in/generic provider to handle media from files and (remote) urls."""
2
3from __future__ import annotations
4
5import asyncio
6import os
7import time
8from collections.abc import AsyncGenerator
9from typing import TYPE_CHECKING, Final, cast
10
11import aiofiles
12import shortuuid
13from music_assistant_models.enums import (
14 ContentType,
15 ImageType,
16 MediaType,
17 ProviderFeature,
18 StreamType,
19)
20from music_assistant_models.errors import (
21 InvalidDataError,
22 MediaNotFoundError,
23 ProviderUnavailableError,
24)
25from music_assistant_models.media_items import (
26 Artist,
27 AudioFormat,
28 MediaItemImage,
29 MediaItemMetadata,
30 MediaItemType,
31 Playlist,
32 ProviderMapping,
33 Radio,
34 Track,
35 UniqueList,
36)
37from music_assistant_models.streamdetails import StreamDetails
38
39from music_assistant.constants import MASS_LOGO, VARIOUS_ARTISTS_FANART
40from music_assistant.controllers.cache import use_cache
41from music_assistant.helpers.tags import AudioTags, async_parse_tags
42from music_assistant.helpers.uri import parse_uri
43from music_assistant.models.music_provider import MusicProvider
44
45from .constants import (
46 ALL_FAVORITE_TRACKS,
47 BUILTIN_PLAYLISTS,
48 BUILTIN_PLAYLISTS_ENTRIES,
49 COLLAGE_IMAGE_PLAYLISTS,
50 CONF_ENTRY_LIBRARY_SYNC_BACK_HIDDEN,
51 CONF_ENTRY_LIBRARY_SYNC_PLAYLISTS_HIDDEN,
52 CONF_ENTRY_LIBRARY_SYNC_RADIOS_HIDDEN,
53 CONF_ENTRY_LIBRARY_SYNC_TRACKS_HIDDEN,
54 CONF_ENTRY_PROVIDER_SYNC_INTERVAL_PLAYLISTS_MOD,
55 CONF_ENTRY_PROVIDER_SYNC_INTERVAL_RADIOS_HIDDEN,
56 CONF_ENTRY_PROVIDER_SYNC_INTERVAL_TRACKS_HIDDEN,
57 CONF_KEY_PLAYLISTS,
58 CONF_KEY_RADIOS,
59 CONF_KEY_TRACKS,
60 DEFAULT_FANART,
61 DEFAULT_THUMB,
62 RANDOM_ALBUM,
63 RANDOM_ARTIST,
64 RANDOM_TRACKS,
65 RECENTLY_ADDED_TRACKS,
66 RECENTLY_PLAYED,
67 StoredItem,
68)
69
70if TYPE_CHECKING:
71 from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
72 from music_assistant_models.provider import ProviderManifest
73
74 from music_assistant.mass import MusicAssistant
75 from music_assistant.models import ProviderInstanceType
76
77CACHE_CATEGORY_MEDIA_INFO: Final[int] = 1
78CACHE_CATEGORY_PLAYLISTS: Final[int] = 2
79
80
81SUPPORTED_FEATURES = {
82 ProviderFeature.BROWSE,
83 ProviderFeature.LIBRARY_TRACKS,
84 ProviderFeature.LIBRARY_RADIOS,
85 ProviderFeature.LIBRARY_PLAYLISTS,
86 ProviderFeature.LIBRARY_TRACKS_EDIT,
87 ProviderFeature.LIBRARY_RADIOS_EDIT,
88 ProviderFeature.LIBRARY_PLAYLISTS_EDIT,
89 ProviderFeature.PLAYLIST_CREATE,
90 ProviderFeature.PLAYLIST_TRACKS_EDIT,
91}
92
93
94async def setup(
95 mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
96) -> ProviderInstanceType:
97 """Initialize provider(instance) with given configuration."""
98 return BuiltinProvider(mass, manifest, config, SUPPORTED_FEATURES)
99
100
101async def get_config_entries(
102 mass: MusicAssistant, # noqa: ARG001
103 instance_id: str | None = None, # noqa: ARG001
104 action: str | None = None, # noqa: ARG001
105 values: dict[str, ConfigValueType] | None = None, # noqa: ARG001
106) -> tuple[ConfigEntry, ...]:
107 """
108 Return Config entries to setup this provider.
109
110 instance_id: id of an existing provider instance (None if new instance setup).
111 action: [optional] action key called from config entries UI.
112 values: the (intermediate) raw values for config entries sent with the action.
113 """
114 return (
115 *BUILTIN_PLAYLISTS_ENTRIES,
116 # hide some of the default (dynamic) entries for library management
117 CONF_ENTRY_LIBRARY_SYNC_TRACKS_HIDDEN,
118 CONF_ENTRY_LIBRARY_SYNC_PLAYLISTS_HIDDEN,
119 CONF_ENTRY_LIBRARY_SYNC_RADIOS_HIDDEN,
120 CONF_ENTRY_PROVIDER_SYNC_INTERVAL_TRACKS_HIDDEN,
121 CONF_ENTRY_PROVIDER_SYNC_INTERVAL_RADIOS_HIDDEN,
122 CONF_ENTRY_PROVIDER_SYNC_INTERVAL_PLAYLISTS_MOD,
123 CONF_ENTRY_LIBRARY_SYNC_BACK_HIDDEN,
124 )
125
126
127class BuiltinProvider(MusicProvider):
128 """Built-in/generic provider to handle (manually added) media from files and (remote) urls."""
129
130 _playlists_dir: str
131 _playlist_lock: asyncio.Lock
132
133 async def loaded_in_mass(self) -> None:
134 """Call after the provider has been loaded."""
135 self._playlist_lock = asyncio.Lock()
136 # make sure that our directory with collage images exists
137 self._playlists_dir = os.path.join(self.mass.storage_path, "playlists")
138 if not await asyncio.to_thread(os.path.exists, self._playlists_dir):
139 await asyncio.to_thread(os.mkdir, self._playlists_dir)
140 await super().loaded_in_mass()
141
142 @property
143 def is_streaming_provider(self) -> bool:
144 """Return True if the provider is a streaming provider."""
145 return False
146
147 async def get_track(self, prov_track_id: str) -> Track:
148 """Get full track details by id."""
149 parsed_item = cast("Track", await self.parse_item(prov_track_id))
150 stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_TRACKS, [])
151 if stored_item := next((x for x in stored_items if x["item_id"] == prov_track_id), None):
152 # always prefer the stored info, such as the name
153 parsed_item.name = stored_item["name"]
154 if image_url := stored_item.get("image_url"):
155 parsed_item.metadata.add_image(
156 MediaItemImage(
157 type=ImageType.THUMB,
158 path=image_url,
159 provider=self.domain,
160 remotely_accessible=image_url.startswith("http"),
161 )
162 )
163 return parsed_item
164
165 async def get_radio(self, prov_radio_id: str) -> Radio:
166 """Get full radio details by id."""
167 parsed_item = await self.parse_item(prov_radio_id, force_radio=True)
168 assert isinstance(parsed_item, Radio)
169 stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_RADIOS, [])
170 if stored_item := next((x for x in stored_items if x["item_id"] == prov_radio_id), None):
171 # always prefer the stored info, such as the name
172 parsed_item.name = stored_item["name"]
173 if image_url := stored_item.get("image_url"):
174 parsed_item.metadata.add_image(
175 MediaItemImage(
176 type=ImageType.THUMB,
177 path=image_url,
178 provider=self.domain,
179 remotely_accessible=image_url.startswith("http"),
180 )
181 )
182 return parsed_item
183
184 async def get_artist(self, prov_artist_id: str) -> Artist:
185 """Get full artist details by id."""
186 artist = prov_artist_id
187 # this is here for compatibility reasons only
188 return Artist(
189 item_id=artist,
190 provider=self.domain,
191 name=artist,
192 provider_mappings={
193 ProviderMapping(
194 item_id=artist,
195 provider_domain=self.domain,
196 provider_instance=self.instance_id,
197 available=False,
198 )
199 },
200 )
201
202 async def get_playlist(self, prov_playlist_id: str) -> Playlist:
203 """Get full playlist details by id."""
204 if prov_playlist_id in BUILTIN_PLAYLISTS:
205 # this is one of our builtin/default playlists
206 return Playlist(
207 item_id=prov_playlist_id,
208 provider=self.instance_id,
209 name=BUILTIN_PLAYLISTS[prov_playlist_id],
210 provider_mappings={
211 ProviderMapping(
212 item_id=prov_playlist_id,
213 provider_domain=self.domain,
214 provider_instance=self.instance_id,
215 )
216 },
217 owner="Music Assistant",
218 is_editable=False,
219 metadata=MediaItemMetadata(
220 images=UniqueList([DEFAULT_THUMB])
221 if prov_playlist_id in COLLAGE_IMAGE_PLAYLISTS
222 else UniqueList([DEFAULT_THUMB, DEFAULT_FANART]),
223 ),
224 )
225 # user created universal playlist
226 stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
227 stored_item = next((x for x in stored_items if x["item_id"] == prov_playlist_id), None)
228 if not stored_item:
229 raise MediaNotFoundError
230 playlist = Playlist(
231 item_id=prov_playlist_id,
232 provider=self.instance_id,
233 name=stored_item["name"],
234 provider_mappings={
235 ProviderMapping(
236 item_id=prov_playlist_id,
237 provider_domain=self.domain,
238 provider_instance=self.instance_id,
239 )
240 },
241 owner="Music Assistant",
242 is_editable=True,
243 )
244 if image_url := stored_item.get("image_url"):
245 playlist.metadata.add_image(
246 MediaItemImage(
247 type=ImageType.THUMB,
248 path=image_url,
249 provider=self.domain,
250 remotely_accessible=image_url.startswith("http"),
251 )
252 )
253 return playlist
254
255 async def get_item(self, media_type: MediaType, prov_item_id: str) -> MediaItemType:
256 """Get single MediaItem from provider."""
257 if media_type == MediaType.ARTIST:
258 return await self.get_artist(prov_item_id)
259 if media_type == MediaType.TRACK:
260 return await self.get_track(prov_item_id)
261 if media_type == MediaType.RADIO:
262 return await self.get_radio(prov_item_id)
263 if media_type == MediaType.PLAYLIST:
264 return await self.get_playlist(prov_item_id)
265 if media_type == MediaType.UNKNOWN:
266 return await self.parse_item(prov_item_id)
267 raise NotImplementedError
268
269 async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
270 """Retrieve library tracks from the provider."""
271 stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_TRACKS, [])
272 for item in stored_items:
273 try:
274 yield await self.get_track(item["item_id"])
275 except MediaNotFoundError as err:
276 self.logger.warning("Track %s not found: %s", item, err)
277
278 async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
279 """Retrieve library/subscribed playlists from the provider."""
280 # return user stored playlists
281 stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
282 for item in stored_items:
283 yield await self.get_playlist(item["item_id"])
284 # return builtin playlists
285 for item_id in BUILTIN_PLAYLISTS:
286 if self.config.get_value(item_id) is False:
287 continue
288 yield await self.get_playlist(item_id)
289
290 async def get_library_radios(self) -> AsyncGenerator[Radio, None]:
291 """Retrieve library/subscribed radio stations from the provider."""
292 stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_RADIOS, [])
293 for item in stored_items:
294 try:
295 yield await self.get_radio(item["item_id"])
296 except (MediaNotFoundError, InvalidDataError) as err:
297 self.logger.warning("Radio station %s not found: %s", item, err)
298 yield Radio(
299 item_id=item["item_id"],
300 provider=self.instance_id,
301 name=item["name"],
302 provider_mappings={
303 ProviderMapping(
304 item_id=item["item_id"],
305 provider_domain=self.domain,
306 provider_instance=self.instance_id,
307 available=False,
308 )
309 },
310 )
311
312 async def library_add(self, item: MediaItemType) -> bool:
313 """Add item to provider's library. Return true on success."""
314 if item.media_type == MediaType.TRACK:
315 key = CONF_KEY_TRACKS
316 elif item.media_type == MediaType.RADIO:
317 key = CONF_KEY_RADIOS
318 else:
319 return False
320 stored_item = StoredItem(item_id=item.item_id, name=item.name)
321 if item.image:
322 stored_item["image_url"] = item.image.path
323 stored_items: list[StoredItem] = self.mass.config.get(key, [])
324 # filter out existing
325 stored_items = [x for x in stored_items if x["item_id"] != item.item_id]
326 stored_items.append(stored_item)
327 self.mass.config.set(key, stored_items)
328 return True
329
330 async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool:
331 """Remove item from provider's library. Return true on success."""
332 if media_type == MediaType.PLAYLIST and prov_item_id in BUILTIN_PLAYLISTS:
333 # user wants to disable/remove one of our builtin playlists
334 # to prevent it comes back, we mark it as disabled in config
335 self._update_config_value(prov_item_id, False)
336 return True
337 if media_type == MediaType.TRACK:
338 # regular manual track URL/path
339 key = CONF_KEY_TRACKS
340 elif media_type == MediaType.RADIO:
341 # regular manual radio URL/path
342 key = CONF_KEY_RADIOS
343 elif media_type == MediaType.PLAYLIST:
344 # manually added (multi provider) playlist removal
345 key = CONF_KEY_PLAYLISTS
346 # also delete the playlist file if it exists
347 playlist_file = os.path.join(self._playlists_dir, prov_item_id)
348 if await asyncio.to_thread(os.path.isfile, playlist_file):
349 async with self._playlist_lock:
350 await asyncio.to_thread(os.remove, playlist_file)
351 else:
352 return False
353 stored_items: list[StoredItem] = self.mass.config.get(key, [])
354 stored_items = [x for x in stored_items if x["item_id"] != prov_item_id]
355 self.mass.config.set(key, stored_items)
356 return True
357
358 async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
359 """Get playlist tracks."""
360 if page > 0:
361 # paging not supported, we always return the whole list at once
362 return []
363 if prov_playlist_id in BUILTIN_PLAYLISTS:
364 return await self._get_builtin_playlist_tracks(prov_playlist_id)
365 # user created universal playlist
366 result: list[Track] = []
367 playlist_items = await self._read_playlist_file_items(prov_playlist_id)
368 for index, uri in enumerate(playlist_items, 1):
369 try:
370 media_type, provider_instance_id_or_domain, item_id = await parse_uri(uri)
371 media_controller = self.mass.music.get_controller(media_type)
372 # prefer item already in the db
373 track = await media_controller.get_library_item_by_prov_id(
374 item_id, provider_instance_id_or_domain
375 )
376 if track is None:
377 # get the provider item and not the full track from a regular 'get' call
378 # as we only need basic track info here
379 track = await media_controller.get_provider_item(
380 item_id, provider_instance_id_or_domain
381 )
382 assert isinstance(track, Track)
383 track.position = index
384 result.append(track)
385 except (MediaNotFoundError, InvalidDataError, ProviderUnavailableError) as err:
386 self.logger.warning(
387 "Skipping %s in playlist %s: %s", uri, prov_playlist_id, str(err)
388 )
389 return result
390
391 async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
392 """Add track(s) to playlist."""
393 playlist_items = await self._read_playlist_file_items(prov_playlist_id)
394 for uri in prov_track_ids:
395 if uri not in playlist_items:
396 playlist_items.append(uri)
397 # store playlist file
398 await self._write_playlist_file_items(prov_playlist_id, playlist_items)
399 # mark last_updated on playlist object
400 stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
401 stored_item = next((x for x in stored_items if x["item_id"] == prov_playlist_id), None)
402 if stored_item:
403 stored_item["last_updated"] = int(time.time())
404 self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items)
405
406 async def remove_playlist_tracks(
407 self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
408 ) -> None:
409 """Remove track(s) from playlist."""
410 playlist_items = await self._read_playlist_file_items(prov_playlist_id)
411 # remove items by index
412 for i in sorted(positions_to_remove, reverse=True):
413 del playlist_items[i - 1]
414 # store playlist file
415 await self._write_playlist_file_items(prov_playlist_id, playlist_items)
416 # mark last_updated on playlist object
417 stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
418 stored_item = next((x for x in stored_items if x["item_id"] == prov_playlist_id), None)
419 if stored_item:
420 stored_item["last_updated"] = int(time.time())
421 self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items)
422
423 async def create_playlist(self, name: str) -> Playlist:
424 """Create a new playlist on provider with given name."""
425 item_id = shortuuid.random(8)
426 stored_item = StoredItem(item_id=item_id, name=name)
427 stored_items: list[StoredItem] = self.mass.config.get(CONF_KEY_PLAYLISTS, [])
428 stored_items.append(stored_item)
429 self.mass.config.set(CONF_KEY_PLAYLISTS, stored_items)
430 return await self.get_playlist(item_id)
431
432 async def parse_item(
433 self,
434 url: str,
435 force_refresh: bool = False,
436 force_radio: bool = False,
437 ) -> Track | Radio:
438 """Parse plain URL to MediaItem of type Radio or Track."""
439 media_info = await self._get_media_info(url, force_refresh)
440 is_radio = media_info.get("icyname") or not media_info.duration
441 provider_mappings = {
442 ProviderMapping(
443 item_id=url,
444 provider_domain=self.domain,
445 provider_instance=self.instance_id,
446 audio_format=AudioFormat(
447 content_type=ContentType.try_parse(media_info.format),
448 sample_rate=media_info.sample_rate,
449 bit_depth=media_info.bits_per_sample,
450 bit_rate=media_info.bit_rate,
451 ),
452 )
453 }
454 media_item: Track | Radio
455 if is_radio or force_radio:
456 # treat as radio
457 media_item = Radio(
458 item_id=url,
459 provider=self.domain,
460 name=media_info.get("icyname")
461 or media_info.get("programtitle")
462 or media_info.title
463 or url,
464 provider_mappings=provider_mappings,
465 )
466 else:
467 media_item = Track(
468 item_id=url,
469 provider=self.domain,
470 name=media_info.title or url,
471 duration=int(media_info.duration or 0),
472 artists=UniqueList(
473 [await self.get_artist(artist) for artist in media_info.artists]
474 ),
475 provider_mappings=provider_mappings,
476 )
477
478 if media_info.has_cover_image:
479 media_item.metadata.images = UniqueList(
480 [
481 MediaItemImage(
482 type=ImageType.THUMB,
483 path=url,
484 provider=self.domain,
485 remotely_accessible=False,
486 )
487 ]
488 )
489 return media_item
490
491 async def resolve_image(self, path: str) -> str | bytes:
492 """
493 Resolve an image from an image path.
494
495 This either returns (a generator to get) raw bytes of the image or
496 a string with an http(s) URL or local path that is accessible from the server.
497 """
498 if path == "logo.png":
499 return MASS_LOGO
500 if path in ("fanart.jpg", "fallback_fanart.jpeg"):
501 return VARIOUS_ARTISTS_FANART
502 return path
503
504 async def _get_media_info(self, url: str, force_refresh: bool = False) -> AudioTags:
505 """Retrieve mediainfo for url."""
506 # do we have some cached info for this url ?
507 cached_info = await self.mass.cache.get(
508 url, provider=self.instance_id, category=CACHE_CATEGORY_MEDIA_INFO
509 )
510 if cached_info and not force_refresh:
511 return AudioTags.parse(cached_info)
512 # parse info with ffprobe (and store in cache)
513 media_info = await async_parse_tags(url)
514 if "authSig" in url:
515 media_info.has_cover_image = False
516 await self.mass.cache.set(
517 url, media_info.raw, provider=self.instance_id, category=CACHE_CATEGORY_MEDIA_INFO
518 )
519 return media_info
520
521 async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
522 """Get streamdetails for a track/radio."""
523 media_info = await self._get_media_info(item_id)
524 is_radio = media_info.get("icy-name") or not media_info.duration
525 return StreamDetails(
526 provider=self.instance_id,
527 item_id=item_id,
528 audio_format=AudioFormat(
529 content_type=ContentType.try_parse(media_info.format),
530 sample_rate=media_info.sample_rate,
531 bit_depth=media_info.bits_per_sample,
532 channels=media_info.channels,
533 ),
534 media_type=MediaType.RADIO if is_radio else MediaType.TRACK,
535 stream_type=StreamType.HTTP,
536 path=item_id,
537 can_seek=not is_radio,
538 allow_seek=not is_radio,
539 )
540
541 @use_cache(expiration=120, category=CACHE_CATEGORY_PLAYLISTS)
542 async def _get_builtin_playlist_random_favorite_tracks(self) -> list[Track]:
543 result: list[Track] = []
544 res = await self.mass.music.tracks.library_items(
545 favorite=True, limit=250000, order_by="random_play_count"
546 )
547 for idx, item in enumerate(res, 1):
548 item.position = idx
549 result.append(item)
550 return result
551
552 @use_cache(expiration=120, category=CACHE_CATEGORY_PLAYLISTS)
553 async def _get_builtin_playlist_random_tracks(self) -> list[Track]:
554 result: list[Track] = []
555 res = await self.mass.music.tracks.library_items(limit=500, order_by="random_play_count")
556 for idx, item in enumerate(res, 1):
557 item.position = idx
558 result.append(item)
559 return result
560
561 @use_cache(expiration=3600, category=CACHE_CATEGORY_PLAYLISTS)
562 async def _get_builtin_playlist_random_album(self) -> list[Track]:
563 for random_album in await self.mass.music.albums.get_library_items_by_query(
564 limit=1,
565 order_by="random",
566 extra_query_parts=["album_type != :excluded_album_type"],
567 extra_query_params={"excluded_album_type": "single"},
568 ):
569 tracks = await self.mass.music.albums.tracks(
570 random_album.item_id, random_album.provider
571 )
572 for idx, track in enumerate(tracks, 1):
573 track.position = idx
574 return tracks
575 return []
576
577 @use_cache(expiration=3600, category=CACHE_CATEGORY_PLAYLISTS)
578 async def _get_builtin_playlist_random_artist(self) -> list[Track]:
579 for in_library_only in (True, False):
580 for min_tracks_required in (25, 10, 5, 1):
581 for random_artist in await self.mass.music.artists.library_items(
582 limit=25, order_by="random"
583 ):
584 tracks = await self.mass.music.artists.tracks(
585 random_artist.item_id,
586 random_artist.provider,
587 in_library_only=in_library_only,
588 )
589 if len(tracks) < min_tracks_required:
590 continue
591 for idx, track in enumerate(tracks, 1):
592 track.position = idx
593 return tracks
594 return []
595
596 @use_cache(expiration=30, category=CACHE_CATEGORY_PLAYLISTS)
597 async def _get_builtin_playlist_recently_played(self) -> list[Track]:
598 result: list[Track] = []
599 recent_tracks = await self.mass.music.recently_played(100, [MediaType.TRACK])
600 for idx, item in enumerate(recent_tracks, 1):
601 if not (item_provider := self.mass.get_provider(item.provider)):
602 continue
603 track = Track(
604 item_id=item.item_id,
605 provider=item.provider,
606 name=item.name,
607 provider_mappings={
608 ProviderMapping(
609 item_id=item.item_id,
610 provider_domain=item_provider.domain,
611 provider_instance=item_provider.instance_id,
612 )
613 },
614 )
615 if item.image:
616 track.metadata.add_image(item.image)
617 track.position = idx
618 result.append(track)
619 return result
620
621 @use_cache(expiration=60, category=CACHE_CATEGORY_PLAYLISTS)
622 async def _get_builtin_playlist_recently_added_tracks(self) -> list[Track]:
623 result: list[Track] = []
624 recent_tracks = await self.mass.music.recently_added_tracks(100)
625 for idx, track in enumerate(recent_tracks, 1):
626 track.position = idx
627 result.append(track)
628 return result
629
630 async def _get_builtin_playlist_tracks(
631 self, builtin_playlist_id: str
632 ) -> list[Track] | UniqueList[Track]:
633 """Get all playlist tracks for given builtin playlist id."""
634 try:
635 return await {
636 ALL_FAVORITE_TRACKS: self._get_builtin_playlist_random_favorite_tracks,
637 RANDOM_TRACKS: self._get_builtin_playlist_random_tracks,
638 RANDOM_ALBUM: self._get_builtin_playlist_random_album,
639 RANDOM_ARTIST: self._get_builtin_playlist_random_artist,
640 RECENTLY_PLAYED: self._get_builtin_playlist_recently_played,
641 RECENTLY_ADDED_TRACKS: self._get_builtin_playlist_recently_added_tracks,
642 }[builtin_playlist_id]()
643 except KeyError:
644 raise MediaNotFoundError(f"No built in playlist: {builtin_playlist_id}")
645
646 async def _read_playlist_file_items(self, playlist_id: str) -> list[str]:
647 """Return lines of a playlist file."""
648 playlist_file = os.path.join(self._playlists_dir, playlist_id)
649 if not await asyncio.to_thread(os.path.isfile, playlist_file):
650 return []
651 async with (
652 self._playlist_lock,
653 aiofiles.open(playlist_file, encoding="utf-8") as _file,
654 ):
655 lines = await _file.readlines()
656 return [x.strip() for x in lines]
657
658 async def _write_playlist_file_items(self, playlist_id: str, lines: list[str]) -> None:
659 """Return lines of a playlist file."""
660 playlist_file = os.path.join(self._playlists_dir, playlist_id)
661 async with (
662 self._playlist_lock,
663 aiofiles.open(playlist_file, "w", encoding="utf-8") as _file,
664 ):
665 await _file.write("\n".join(lines))
666