music-assistant-server

6.8 KBPY
browse.py
6.8 KB182 lines • python
1"""Helpers for Apple Music playlist browsing."""
2
3from __future__ import annotations
4
5from collections.abc import Sequence
6from dataclasses import dataclass, replace
7from typing import TYPE_CHECKING, Any
8
9from music_assistant_models.enums import MediaType
10from music_assistant_models.errors import MediaNotFoundError
11from music_assistant_models.media_items import BrowseFolder, Playlist, ProviderMapping
12
13if TYPE_CHECKING:
14    from music_assistant.providers.apple_music import AppleMusicProvider
15
16ROOT_PLAYLIST_FOLDER_ID = "p.playlistsroot"
17# Apple exposes the entire playlist hierarchy under this synthetic root. We walk the
18# tree lazily, fetching the exact branch the user opens instead of preloading.
19
20
21@dataclass(slots=True)
22class AppleMusicPlaylistFolder:
23    """Lightweight representation of a folder node returned by Apple."""
24
25    item_id: str
26    name: str
27
28
29def _folder_path_segment(name: str) -> str:
30    """Return human-readable, path-safe breadcrumb text."""
31    return (name.strip() or "Folder").replace("/", "-").replace("|", "-")
32
33
34def _extract_playlist_folder_id(path_parts: list[str]) -> str | None:
35    """Extract the active folder id from a playlist browse path."""
36    if not path_parts:
37        return None
38    last_segment = path_parts[-1]
39    if "|" in last_segment:
40        return last_segment.rsplit("|", 1)[1]
41    return last_segment
42
43
44def _folder_nodes(
45    provider: AppleMusicProvider,
46    folders: list[AppleMusicPlaylistFolder],
47    base_path: str,
48) -> list[BrowseFolder]:
49    """Convert folder metadata returned by the API into browse nodes."""
50    normalized_base = base_path.rstrip("/")
51    items: list[BrowseFolder] = []
52    for folder in folders:
53        folder_name = folder.name or "Folder"
54        segment_name = _folder_path_segment(folder_name)
55        segment = f"{segment_name}|{folder.item_id}"
56        items.append(
57            BrowseFolder(
58                item_id=f"folder:{folder.item_id}",
59                provider=provider.instance_id,
60                path=f"{normalized_base}/{segment}",
61                name=folder_name,
62            )
63        )
64    return items
65
66
67async def _fetch_playlist_folder_children(
68    provider: AppleMusicProvider,
69    folder_id: str | None = None,
70) -> tuple[list[AppleMusicPlaylistFolder], list[Playlist]]:
71    """Fetch folders/playlists for a single branch of the Apple Music tree."""
72    apple_folder_id = folder_id or ROOT_PLAYLIST_FOLDER_ID
73    endpoint = f"me/library/playlist-folders/{apple_folder_id}/children"
74    try:
75        children = await provider._get_all_items(endpoint)
76    except MediaNotFoundError:
77        children = []
78    folders: list[AppleMusicPlaylistFolder] = []
79    playlist_entries: list[dict[str, Any]] = []
80    library_playlist_ids: list[str] = []
81    for child in children:
82        child_id = child.get("id")
83        if not child_id:
84            continue
85        child_type = child.get("type")
86        attributes = child.get("attributes") or {}
87        if child_type == "library-playlist-folders":
88            folders.append(
89                AppleMusicPlaylistFolder(
90                    item_id=child_id,
91                    name=attributes.get("name") or "Folder",
92                )
93            )
94        elif child_type == "library-playlists":
95            playlist_entries.append(child)
96            if provider.is_library_id(child_id):
97                library_playlist_ids.append(child_id)
98    ratings: dict[str, Any] = {}
99    if library_playlist_ids:
100        ratings = await provider._get_ratings(library_playlist_ids, MediaType.PLAYLIST)
101    playlists: list[Playlist] = []
102    for playlist_entry in playlist_entries:
103        playlist_id = playlist_entry.get("id")
104        is_favourite = ratings.get(playlist_id)
105        attributes = playlist_entry.get("attributes") or {}
106        play_params = attributes.get("playParams") or {}
107        global_id = play_params.get("globalId")
108
109        # Start with the original entry, potentially modify it below
110        playlist_obj = playlist_entry
111
112        if attributes.get("hasCatalog") and global_id and not provider.is_library_id(global_id):
113            try:
114                playlist = await provider.get_playlist(global_id, is_favourite)
115            except MediaNotFoundError:
116                provider.logger.debug(
117                    "Catalog playlist %s not found, falling back to library metadata",
118                    global_id,
119                )
120                playlist_obj = _playlist_without_global_id(playlist_obj)
121            else:
122                playlists.append(_apply_library_id(playlist, playlist_id, provider))
123                continue
124        playlists.append(provider._parse_playlist(playlist_obj, is_favourite))
125    playlists.sort(key=lambda item: (item.name or "").casefold())
126    folders.sort(key=lambda folder: folder.name.casefold())
127    return folders, playlists
128
129
130def _playlist_without_global_id(playlist_obj: dict[str, Any]) -> dict[str, Any]:
131    """Return a shallow copy without a catalog ID.
132
133    Some folders report `hasCatalog=True` but their catalog playlist fetch fails.
134    When that happens we strip the bogus `globalId` so downstream parsing sticks
135    to the library ID (which *can* be resolved).
136    """
137    new_obj = dict(playlist_obj)
138    attributes = dict(new_obj.get("attributes") or {})
139    play_params = dict(attributes.get("playParams") or {})
140    play_params.pop("globalId", None)
141    attributes["playParams"] = play_params
142    new_obj["attributes"] = attributes
143    return new_obj
144
145
146def _apply_library_id(
147    playlist: Playlist, library_id: str, provider: AppleMusicProvider
148) -> Playlist:
149    """Return a copy of `playlist` that always points to the library endpoint.
150
151    `get_playlist` is cached, so mutating the original object would leak those
152    changes to other consumers of the cached catalog playlist.  Instead we clone
153    the dataclass with `replace`, swap the ids for this provider instance, and
154    keep the cached object untouched.
155    """
156    new_mappings: set[ProviderMapping] = set()
157    for mapping in playlist.provider_mappings:
158        if mapping.provider_instance == provider.instance_id:
159            new_mappings.add(replace(mapping, item_id=library_id))
160        else:
161            new_mappings.add(mapping)
162    return replace(
163        playlist,
164        item_id=library_id,
165        provider=provider.instance_id,
166        provider_mappings=new_mappings,
167    )
168
169
170async def browse_playlists(
171    provider: AppleMusicProvider, path: str, path_parts: list[str]
172) -> Sequence[BrowseFolder | Playlist]:
173    """Handle playlist browsing for the Apple Music provider."""
174    folder_id: str | None = None
175    base_path = f"{provider.instance_id}://playlists"
176    if len(path_parts) > 1:
177        folder_id = _extract_playlist_folder_id(path_parts[1:])
178        base_path = path.rstrip("/")
179    folders, playlists = await _fetch_playlist_folder_children(provider, folder_id)
180    folder_nodes = _folder_nodes(provider, folders, base_path)
181    return [*folder_nodes, *playlists]
182