music-assistant-server

9.5 KBPY
media.py
9.5 KB229 lines • python
1"""Media retrieval operations for Tidal."""
2
3from __future__ import annotations
4
5from contextlib import suppress
6from typing import TYPE_CHECKING, Any
7
8from aiohttp.client_exceptions import ClientError
9from music_assistant_models.enums import MediaType
10from music_assistant_models.errors import MediaNotFoundError
11from music_assistant_models.media_items import SearchResults
12
13from .parsers import parse_album, parse_artist, parse_playlist, parse_track
14
15if TYPE_CHECKING:
16    from music_assistant_models.media_items import Album, Artist, Playlist, Track
17
18    from .provider import TidalProvider
19
20
21class TidalMediaManager:
22    """Handles retrieval of media items from Tidal."""
23
24    def __init__(self, provider: TidalProvider):
25        """Initialize media retriever."""
26        self.provider = provider
27        self.api = provider.api
28        self.logger = provider.logger
29
30    async def search(
31        self, search_query: str, media_types: list[MediaType], limit: int = 5
32    ) -> SearchResults:
33        """Perform search on Tidal."""
34        parsed_results = SearchResults()
35        media_type_strings = []
36
37        if MediaType.ARTIST in media_types:
38            media_type_strings.append("artists")
39        if MediaType.ALBUM in media_types:
40            media_type_strings.append("albums")
41        if MediaType.TRACK in media_types:
42            media_type_strings.append("tracks")
43        if MediaType.PLAYLIST in media_types:
44            media_type_strings.append("playlists")
45
46        if not media_type_strings:
47            return parsed_results
48
49        results = await self.api.get_data(
50            "search",
51            params={
52                "query": search_query.replace("'", ""),
53                "limit": limit,
54                "types": ",".join(media_type_strings),
55            },
56        )
57
58        if "artists" in results and results["artists"].get("items"):
59            parsed_results.artists = [
60                parse_artist(self.provider, x) for x in results["artists"]["items"]
61            ]
62        if "albums" in results and results["albums"].get("items"):
63            parsed_results.albums = [
64                parse_album(self.provider, x) for x in results["albums"]["items"]
65            ]
66        if "playlists" in results and results["playlists"].get("items"):
67            parsed_results.playlists = [
68                parse_playlist(self.provider, x) for x in results["playlists"]["items"]
69            ]
70        if "tracks" in results and results["tracks"].get("items"):
71            parsed_results.tracks = [
72                parse_track(self.provider, x) for x in results["tracks"]["items"]
73            ]
74        return parsed_results
75
76    async def get_artist(self, prov_artist_id: str) -> Artist:
77        """Get artist details."""
78        try:
79            data = await self.api.get_data(f"artists/{prov_artist_id}")
80            return parse_artist(self.provider, data)
81        except (ClientError, KeyError, ValueError) as err:
82            raise MediaNotFoundError(f"Artist {prov_artist_id} not found") from err
83
84    async def get_album(self, prov_album_id: str) -> Album:
85        """Get album details."""
86        try:
87            data = await self.api.get_data(f"albums/{prov_album_id}")
88            return parse_album(self.provider, data)
89        except (ClientError, KeyError, ValueError) as err:
90            raise MediaNotFoundError(f"Album {prov_album_id} not found") from err
91
92    async def get_track(self, prov_track_id: str) -> Track:
93        """Get track details."""
94        try:
95            track_obj = await self.api.get_data(f"tracks/{prov_track_id}")
96
97            lyrics = None
98            with suppress(MediaNotFoundError):
99                lyrics = await self.api.get_data(f"tracks/{prov_track_id}/lyrics")
100
101            return parse_track(self.provider, track_obj, lyrics=lyrics)
102        except (ClientError, KeyError, ValueError) as err:
103            raise MediaNotFoundError(f"Track {prov_track_id} not found") from err
104
105    async def get_playlist(self, prov_playlist_id: str) -> Playlist:
106        """Get playlist details."""
107        if prov_playlist_id.startswith("mix_"):
108            return await self._get_mix_details(prov_playlist_id[4:])
109
110        try:
111            data = await self.api.get_data(f"playlists/{prov_playlist_id}")
112            return parse_playlist(self.provider, data)
113        except MediaNotFoundError:
114            return await self._get_mix_details(prov_playlist_id)
115        except (ClientError, KeyError, ValueError) as err:
116            raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found") from err
117
118    async def _get_mix_details(self, prov_mix_id: str) -> Playlist:
119        """Get details for a Tidal Mix."""
120        try:
121            params = {"mixId": prov_mix_id, "deviceType": "BROWSER"}
122            tidal_mix = await self.api.get_data("pages/mix", params=params)
123
124            mix_obj = {
125                "id": prov_mix_id,
126                "title": tidal_mix.get("title", "Unknown Mix"),
127                "updated": tidal_mix.get("lastUpdated", ""),
128                "images": {},
129            }
130
131            # Try to extract images from rows/modules structure
132            rows = tidal_mix.get("rows", [])
133            if rows and (modules := rows[0].get("modules")):
134                if mix_data := modules[0].get("mix"):
135                    mix_obj["images"] = mix_data.get("images", {})
136
137            if "subTitle" not in mix_obj:
138                mix_obj["subTitle"] = tidal_mix.get("subTitle", "")
139
140            return parse_playlist(self.provider, mix_obj, is_mix=True)
141        except (ClientError, KeyError, ValueError) as err:
142            raise MediaNotFoundError(f"Mix {prov_mix_id} not found") from err
143
144    async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
145        """Get album tracks."""
146        try:
147            data = await self.api.get_data(f"albums/{prov_album_id}/tracks", params={"limit": 250})
148            return [parse_track(self.provider, x) for x in data.get("items", [])]
149        except (ClientError, KeyError, ValueError) as err:
150            raise MediaNotFoundError(f"Album {prov_album_id} not found") from err
151
152    async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
153        """Get artist albums."""
154        try:
155            data = await self.api.get_data(
156                f"artists/{prov_artist_id}/albums", params={"limit": 250}
157            )
158            return [parse_album(self.provider, x) for x in data.get("items", [])]
159        except (ClientError, KeyError, ValueError) as err:
160            raise MediaNotFoundError(f"Artist {prov_artist_id} not found") from err
161
162    async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
163        """Get artist top tracks."""
164        try:
165            data = await self.api.get_data(
166                f"artists/{prov_artist_id}/toptracks", params={"limit": 10, "offset": 0}
167            )
168            return [parse_track(self.provider, x) for x in data.get("items", [])]
169        except (ClientError, KeyError, ValueError) as err:
170            raise MediaNotFoundError(f"Artist {prov_artist_id} not found") from err
171
172    async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]:
173        """Get similar tracks."""
174        try:
175            data = await self.api.get_data(f"tracks/{prov_track_id}/radio", params={"limit": limit})
176            return [parse_track(self.provider, x) for x in data.get("items", [])]
177        except (ClientError, KeyError, ValueError) as err:
178            raise MediaNotFoundError(f"Track {prov_track_id} not found") from err
179
180    async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
181        """Get playlist tracks."""
182        page_size = 200
183        offset = page * page_size
184
185        if prov_playlist_id.startswith("mix_"):
186            return await self._get_mix_tracks(prov_playlist_id[4:], page_size, offset)
187
188        try:
189            data = await self.api.get_data(
190                f"playlists/{prov_playlist_id}/tracks",
191                params={"limit": page_size, "offset": offset},
192            )
193            return self._process_tracks(data.get("items", []), offset)
194        except MediaNotFoundError:
195            return await self._get_mix_tracks(prov_playlist_id, page_size, offset)
196
197    async def _get_mix_tracks(self, mix_id: str, limit: int, offset: int) -> list[Track]:
198        """Get tracks from a mix."""
199        try:
200            params = {"mixId": mix_id, "deviceType": "BROWSER"}
201            data = await self.api.get_data("pages/mix", params=params)
202
203            # Mix tracks are usually in the second row
204            rows = data.get("rows", [])
205            if len(rows) < 2:
206                raise MediaNotFoundError(f"Mix {mix_id} has no tracks")
207
208            modules = rows[1].get("modules", [])
209            if not modules or "pagedList" not in modules[0]:
210                raise MediaNotFoundError(f"Mix {mix_id} has no tracks")
211
212            all_items = modules[0]["pagedList"].get("items", [])
213            # Manual pagination for mixes
214            paged_items = all_items[offset : offset + limit]
215            return self._process_tracks(paged_items, offset)
216        except (ClientError, KeyError, ValueError) as err:
217            raise MediaNotFoundError(f"Mix {mix_id} not found") from err
218
219    def _process_tracks(self, items: list[dict[str, Any]], offset: int) -> list[Track]:
220        result = []
221        for idx, item in enumerate(items, 1):
222            try:
223                track = parse_track(self.provider, item)
224                track.position = offset + idx
225                result.append(track)
226            except (KeyError, TypeError):
227                continue
228        return result
229