/
/
/
1"""Media retrieval operations for Tidal."""
2
3from __future__ import annotations
4
5from contextlib import suppress
6from typing import TYPE_CHECKING, Any
7
8from aiohttp.client_exceptions import ClientError
9from music_assistant_models.enums import MediaType
10from music_assistant_models.errors import MediaNotFoundError
11from music_assistant_models.media_items import SearchResults
12
13from .parsers import parse_album, parse_artist, parse_playlist, parse_track
14
15if TYPE_CHECKING:
16 from music_assistant_models.media_items import Album, Artist, Playlist, Track
17
18 from .provider import TidalProvider
19
20
class TidalMediaManager:
    """Handles retrieval of media items from Tidal.

    Raw payloads are fetched through ``provider.api.get_data`` and converted
    into Music Assistant media items by the ``parse_*`` helpers.
    """

    # Playlist ids carrying this prefix are routed to the mix endpoints.
    _MIX_PREFIX = "mix_"
    # Number of playlist tracks requested per page.
    _PLAYLIST_PAGE_SIZE = 200

    def __init__(self, provider: TidalProvider):
        """Initialize media retriever.

        :param provider: Provider whose ``api`` and ``logger`` are reused here.
        """
        self.provider = provider
        self.api = provider.api
        self.logger = provider.logger

    async def search(
        self, search_query: str, media_types: list[MediaType], limit: int = 5
    ) -> SearchResults:
        """Perform search on Tidal.

        :param search_query: Free-text query; apostrophes are removed before sending.
        :param media_types: Media types to search for; unsupported types are ignored.
        :param limit: Value passed as the ``limit`` request parameter.
        :return: Search results; empty when no supported media type was requested.
        """
        parsed_results = SearchResults()
        # Fixed order so the "types" parameter is built deterministically.
        supported = (
            (MediaType.ARTIST, "artists"),
            (MediaType.ALBUM, "albums"),
            (MediaType.TRACK, "tracks"),
            (MediaType.PLAYLIST, "playlists"),
        )
        media_type_strings = [name for mtype, name in supported if mtype in media_types]
        if not media_type_strings:
            return parsed_results

        results = await self.api.get_data(
            "search",
            params={
                "query": search_query.replace("'", ""),
                "limit": limit,
                "types": ",".join(media_type_strings),
            },
        )

        if items := self._section_items(results, "artists"):
            parsed_results.artists = [parse_artist(self.provider, x) for x in items]
        if items := self._section_items(results, "albums"):
            parsed_results.albums = [parse_album(self.provider, x) for x in items]
        if items := self._section_items(results, "playlists"):
            parsed_results.playlists = [parse_playlist(self.provider, x) for x in items]
        if items := self._section_items(results, "tracks"):
            parsed_results.tracks = [parse_track(self.provider, x) for x in items]
        return parsed_results

    @staticmethod
    def _section_items(results: dict[str, Any] | None, section: str) -> list[dict[str, Any]]:
        """Return the ``items`` of one search result section, or ``[]`` if missing/empty."""
        # ``or`` guards also cover sections that are present but null.
        return ((results or {}).get(section) or {}).get("items") or []

    async def get_artist(self, prov_artist_id: str) -> Artist:
        """Get artist details.

        :raises MediaNotFoundError: If fetching or parsing the artist fails.
        """
        try:
            data = await self.api.get_data(f"artists/{prov_artist_id}")
            return parse_artist(self.provider, data)
        except (ClientError, KeyError, ValueError) as err:
            raise MediaNotFoundError(f"Artist {prov_artist_id} not found") from err

    async def get_album(self, prov_album_id: str) -> Album:
        """Get album details.

        :raises MediaNotFoundError: If fetching or parsing the album fails.
        """
        try:
            data = await self.api.get_data(f"albums/{prov_album_id}")
            return parse_album(self.provider, data)
        except (ClientError, KeyError, ValueError) as err:
            raise MediaNotFoundError(f"Album {prov_album_id} not found") from err

    async def get_track(self, prov_track_id: str) -> Track:
        """Get track details, including lyrics when they can be fetched.

        :raises MediaNotFoundError: If fetching or parsing the track fails.
        """
        try:
            track_obj = await self.api.get_data(f"tracks/{prov_track_id}")

            lyrics = None
            # Lyrics are optional: a missing or unreachable lyrics endpoint must
            # not make an already-fetched track look like it does not exist.
            with suppress(MediaNotFoundError, ClientError):
                lyrics = await self.api.get_data(f"tracks/{prov_track_id}/lyrics")

            return parse_track(self.provider, track_obj, lyrics=lyrics)
        except (ClientError, KeyError, ValueError) as err:
            raise MediaNotFoundError(f"Track {prov_track_id} not found") from err

    async def get_playlist(self, prov_playlist_id: str) -> Playlist:
        """Get playlist details.

        Ids prefixed with ``mix_`` are resolved as mixes directly. Any other id
        is tried as a playlist first and retried as a mix if it is not found.

        :raises MediaNotFoundError: If neither lookup succeeds.
        """
        if prov_playlist_id.startswith(self._MIX_PREFIX):
            return await self._get_mix_details(prov_playlist_id[len(self._MIX_PREFIX) :])

        try:
            data = await self.api.get_data(f"playlists/{prov_playlist_id}")
            return parse_playlist(self.provider, data)
        except MediaNotFoundError:
            # Unprefixed id that is not a playlist: retry it as a mix id.
            return await self._get_mix_details(prov_playlist_id)
        except (ClientError, KeyError, ValueError) as err:
            raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found") from err

    async def _get_mix_page(self, mix_id: str) -> dict[str, Any]:
        """Fetch the raw ``pages/mix`` payload for the given mix id."""
        params = {"mixId": mix_id, "deviceType": "BROWSER"}
        return await self.api.get_data("pages/mix", params=params)

    @staticmethod
    def _mix_images(mix_page: dict[str, Any]) -> dict[str, Any]:
        """Return the images found at rows[0].modules[0].mix, or ``{}`` if absent."""
        rows = mix_page.get("rows") or []
        if rows and (modules := rows[0].get("modules")):
            if mix_data := modules[0].get("mix"):
                return mix_data.get("images", {})
        return {}

    @staticmethod
    def _mix_paged_items(mix_page: dict[str, Any]) -> list[dict[str, Any]] | None:
        """Return the track items at rows[1].modules[0].pagedList, or ``None`` if absent."""
        rows = mix_page.get("rows") or []
        # Mix tracks are usually in the second row.
        if len(rows) < 2:
            return None
        modules = rows[1].get("modules", [])
        if not modules or "pagedList" not in modules[0]:
            return None
        return modules[0]["pagedList"].get("items", [])

    async def _get_mix_details(self, prov_mix_id: str) -> Playlist:
        """Get details for a Tidal Mix.

        :param prov_mix_id: Mix id without the ``mix_`` prefix.
        :raises MediaNotFoundError: If fetching or parsing the mix fails.
        """
        try:
            tidal_mix = await self._get_mix_page(prov_mix_id)
            mix_obj = {
                "id": prov_mix_id,
                "title": tidal_mix.get("title", "Unknown Mix"),
                "updated": tidal_mix.get("lastUpdated", ""),
                "images": self._mix_images(tidal_mix),
                "subTitle": tidal_mix.get("subTitle", ""),
            }
            return parse_playlist(self.provider, mix_obj, is_mix=True)
        except (ClientError, KeyError, ValueError) as err:
            raise MediaNotFoundError(f"Mix {prov_mix_id} not found") from err

    async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
        """Get album tracks (single request with ``limit`` 250).

        :raises MediaNotFoundError: If fetching or parsing fails.
        """
        try:
            data = await self.api.get_data(f"albums/{prov_album_id}/tracks", params={"limit": 250})
            return [parse_track(self.provider, x) for x in data.get("items", [])]
        except (ClientError, KeyError, ValueError) as err:
            raise MediaNotFoundError(f"Album {prov_album_id} not found") from err

    async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
        """Get artist albums (single request with ``limit`` 250).

        :raises MediaNotFoundError: If fetching or parsing fails.
        """
        try:
            data = await self.api.get_data(
                f"artists/{prov_artist_id}/albums", params={"limit": 250}
            )
            return [parse_album(self.provider, x) for x in data.get("items", [])]
        except (ClientError, KeyError, ValueError) as err:
            raise MediaNotFoundError(f"Artist {prov_artist_id} not found") from err

    async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
        """Get artist top tracks (first 10).

        :raises MediaNotFoundError: If fetching or parsing fails.
        """
        try:
            data = await self.api.get_data(
                f"artists/{prov_artist_id}/toptracks", params={"limit": 10, "offset": 0}
            )
            return [parse_track(self.provider, x) for x in data.get("items", [])]
        except (ClientError, KeyError, ValueError) as err:
            raise MediaNotFoundError(f"Artist {prov_artist_id} not found") from err

    async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]:
        """Get similar tracks from the track's ``radio`` endpoint.

        :param limit: Value passed as the ``limit`` request parameter.
        :raises MediaNotFoundError: If fetching or parsing fails.
        """
        try:
            data = await self.api.get_data(f"tracks/{prov_track_id}/radio", params={"limit": limit})
            return [parse_track(self.provider, x) for x in data.get("items", [])]
        except (ClientError, KeyError, ValueError) as err:
            raise MediaNotFoundError(f"Track {prov_track_id} not found") from err

    async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
        """Get one page of playlist (or mix) tracks.

        :param prov_playlist_id: Playlist id, or a mix id (optionally ``mix_``-prefixed).
        :param page: Zero-based page index; each page covers 200 positions.
        :return: Parsed tracks with ``position`` set to their 1-based playlist index.
        :raises MediaNotFoundError: If the tracks cannot be fetched.
        """
        page_size = self._PLAYLIST_PAGE_SIZE
        offset = page * page_size

        if prov_playlist_id.startswith(self._MIX_PREFIX):
            mix_id = prov_playlist_id[len(self._MIX_PREFIX) :]
            return await self._get_mix_tracks(mix_id, page_size, offset)

        try:
            data = await self.api.get_data(
                f"playlists/{prov_playlist_id}/tracks",
                params={"limit": page_size, "offset": offset},
            )
            return self._process_tracks(data.get("items", []), offset)
        except MediaNotFoundError:
            # Unprefixed id that is not a playlist: retry it as a mix id.
            return await self._get_mix_tracks(prov_playlist_id, page_size, offset)
        except (ClientError, KeyError, ValueError) as err:
            # Same error translation as the other fetchers in this class.
            raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found") from err

    async def _get_mix_tracks(self, mix_id: str, limit: int, offset: int) -> list[Track]:
        """Get tracks from a mix.

        The mix page is fetched in one request, so paging is applied locally
        by slicing.

        :param mix_id: Mix id without the ``mix_`` prefix.
        :param limit: Maximum number of tracks to return.
        :param offset: Index of the first track to return.
        :raises MediaNotFoundError: If the mix cannot be fetched or has no track list.
        """
        try:
            data = await self._get_mix_page(mix_id)
            all_items = self._mix_paged_items(data)
            if all_items is None:
                raise MediaNotFoundError(f"Mix {mix_id} has no tracks")
            # Manual pagination for mixes
            return self._process_tracks(all_items[offset : offset + limit], offset)
        except (ClientError, KeyError, ValueError) as err:
            raise MediaNotFoundError(f"Mix {mix_id} not found") from err

    def _process_tracks(self, items: list[dict[str, Any]], offset: int) -> list[Track]:
        """Parse raw track items and assign their 1-based positions.

        Items that fail to parse are skipped, but still consume a position so
        the remaining tracks keep the index they have in the source list.

        :param items: Raw track dicts for one page.
        :param offset: Number of items preceding this page.
        :return: Successfully parsed tracks, in input order.
        """
        result = []
        for idx, item in enumerate(items, 1):
            try:
                track = parse_track(self.provider, item)
                track.position = offset + idx
                result.append(track)
            except (KeyError, TypeError) as err:
                # Best-effort: one malformed item must not drop the whole page.
                self.logger.debug(
                    "Skipping unparsable track at position %s: %r", offset + idx, err
                )
                continue
        return result
229