/
/
/
1"""Parsing utilities to convert Spotify API responses into Music Assistant model objects."""
2
3from __future__ import annotations
4
5import contextlib
6from datetime import datetime
7from typing import TYPE_CHECKING, Any
8
9from music_assistant_models.enums import AlbumType, ContentType, ExternalID, ImageType
10from music_assistant_models.media_items import (
11 Album,
12 Artist,
13 Audiobook,
14 AudioFormat,
15 MediaItemImage,
16 Playlist,
17 Podcast,
18 PodcastEpisode,
19 ProviderMapping,
20 Track,
21)
22from music_assistant_models.unique_list import UniqueList
23
24from music_assistant.helpers.util import infer_album_type, parse_title_and_version
25
26if TYPE_CHECKING:
27 from .provider import SpotifyProvider
28
29
def parse_images(
    images_list: list[dict[str, Any]], instance_id: str, exclude_generic: bool = False
) -> UniqueList[MediaItemImage]:
    """Parse images list into MediaItemImage objects.

    Only the image with the greatest ``height`` is kept and returned as a
    single remotely accessible THUMB image.

    :param images_list: Image dicts from a Spotify response (``url`` and ``height`` are read).
    :param instance_id: Provider instance id stored on the resulting image.
    :param exclude_generic: Skip the generic image (requested for artists).
    :return: UniqueList with at most one image; empty when nothing usable is left.
    """
    # URL fragment identifying the generic image that is filtered out on request.
    generic_image_marker = "2a96cbd8b46e442fc41c2b86b821562f"

    candidates = []
    for img in images_list or []:
        img_url = img.get("url")
        if not img_url:
            # An entry without a URL cannot be turned into a usable image.
            continue
        if exclude_generic and generic_image_marker in img_url:
            continue
        candidates.append(img)

    if not candidates:
        return UniqueList([])

    # Spotify images come in various sizes (typically 640x640, 300x300, 64x64).
    # ``height`` may be present but null, so coerce it to 0 to keep the
    # comparison between candidates well defined; ties keep the first entry.
    best_image = max(candidates, key=lambda img: img.get("height") or 0)

    return UniqueList(
        [
            MediaItemImage(
                type=ImageType.THUMB,
                path=best_image["url"],
                provider=instance_id,
                remotely_accessible=True,
            )
        ]
    )
64
65
def parse_artist(artist_obj: dict[str, Any], provider: SpotifyProvider) -> Artist:
    """Build an Artist model from a Spotify artist object.

    The artist id doubles as the display name when Spotify returns an empty
    name. Genres are copied only when the key is present, and the generic
    image is excluded from the artwork.

    :param artist_obj: Artist dict from the Spotify API.
    :param provider: Provider supplying ``instance_id`` and ``domain`` for the mapping.
    :return: The populated Artist.
    """
    spotify_id = artist_obj["id"]
    instance_id = provider.instance_id
    display_name = artist_obj["name"] or spotify_id

    mapping = ProviderMapping(
        item_id=spotify_id,
        provider_domain=provider.domain,
        provider_instance=instance_id,
        url=artist_obj["external_urls"]["spotify"],
    )
    artist = Artist(
        item_id=spotify_id,
        provider=instance_id,
        name=display_name,
        provider_mappings={mapping},
    )

    if "genres" in artist_obj:
        artist.metadata.genres = set(artist_obj["genres"])

    # Artists go through the shared image parser with generic exclusion enabled.
    raw_images = artist_obj.get("images", [])
    artist.metadata.images = parse_images(raw_images, instance_id, exclude_generic=True)
    return artist
89
90
def parse_album(album_obj: dict[str, Any], provider: SpotifyProvider) -> Album:
    """Build an Album model from a Spotify album object.

    :param album_obj: Album dict from the Spotify API; ``name``, ``id``,
        ``external_urls``, ``artists`` and ``album_type`` are read unconditionally.
    :param provider: Provider supplying ``instance_id`` and ``domain`` for the mapping.
    :return: The populated Album.
    """
    title, title_version = parse_title_and_version(album_obj["name"])
    spotify_id = album_obj["id"]
    mapping = ProviderMapping(
        item_id=spotify_id,
        provider_domain=provider.domain,
        provider_instance=provider.instance_id,
        audio_format=AudioFormat(content_type=ContentType.OGG, bit_rate=320),
        url=album_obj["external_urls"]["spotify"],
    )
    album = Album(
        item_id=spotify_id,
        provider=provider.instance_id,
        name=title,
        version=title_version,
        provider_mappings={mapping},
    )

    # Both barcode flavours are stored as BARCODE ids; a UPC gets a leading
    # zero (presumably to bring it to EAN length - confirm with the matcher).
    barcodes = album_obj.get("external_ids", {})
    if upc := barcodes.get("upc"):
        album.external_ids.add((ExternalID.BARCODE, "0" + upc))
    if ean := barcodes.get("ean"):
        album.external_ids.add((ExternalID.BARCODE, ean))

    # Artists lacking a name or an id are ignored.
    for candidate in album_obj["artists"]:
        if candidate.get("name") and candidate.get("id"):
            album.artists.append(parse_artist(candidate, provider))

    # An album_type value unknown to AlbumType leaves the default untouched.
    with contextlib.suppress(ValueError):
        raw_album_type = album_obj["album_type"]
        album.album_type = AlbumType(raw_album_type)

    # A LIVE or SOUNDTRACK type inferred from name/version takes precedence.
    guessed_type = infer_album_type(album.name, album.version)
    if guessed_type in (AlbumType.LIVE, AlbumType.SOUNDTRACK):
        album.album_type = guessed_type

    if "genres" in album_obj:
        album.metadata.genres = set(album_obj["genres"])

    album.metadata.images = parse_images(album_obj.get("images", []), provider.instance_id)

    if "label" in album_obj:
        album.metadata.label = album_obj["label"]
    if release_date := album_obj.get("release_date"):
        # The year is the part before the first dash.
        album.year = int(release_date.split("-", 1)[0])
    if copyrights := album_obj.get("copyrights"):
        album.metadata.copyright = copyrights[0]["text"]
    if explicit := album_obj.get("explicit"):
        album.metadata.explicit = explicit
    return album
141
142
def parse_track(
    track_obj: dict[str, Any],
    provider: SpotifyProvider,
    artist: Artist | None = None,
) -> Track:
    """Parse spotify track object to generic layout.

    :param track_obj: Track dict from the Spotify API; ``name``, ``id``,
        ``duration_ms``, ``external_urls``, ``is_local``, ``is_playable`` and
        ``explicit`` are read unconditionally (``KeyError`` when absent).
    :param provider: Provider supplying ``instance_id`` and ``domain`` for the mapping.
    :param artist: Optional already-parsed artist, placed first in the artist list.
    :return: The populated Track.
    """
    name, version = parse_title_and_version(track_obj["name"])
    track = Track(
        item_id=track_obj["id"],
        provider=provider.instance_id,
        name=name,
        version=version,
        duration=track_obj["duration_ms"] / 1000,
        provider_mappings={
            ProviderMapping(
                item_id=track_obj["id"],
                provider_domain=provider.domain,
                provider_instance=provider.instance_id,
                audio_format=AudioFormat(content_type=ContentType.OGG, bit_rate=320),
                url=track_obj["external_urls"]["spotify"],
                # Local files and unplayable tracks are flagged as unavailable.
                available=not track_obj["is_local"] and track_obj["is_playable"],
            )
        },
        disc_number=track_obj.get("disc_number", 0),
        track_number=track_obj.get("track_number", 0),
    )
    if isrc := track_obj.get("external_ids", {}).get("isrc"):
        track.external_ids.add((ExternalID.ISRC, isrc))

    # Track the ids already added so each candidate is checked in O(1)
    # instead of rebuilding the id set on every iteration.
    seen_artist_ids: set[str] = set()
    if artist:
        track.artists.append(artist)
        seen_artist_ids.add(artist.item_id)
    for track_artist in track_obj.get("artists", []):
        # Artists lacking a name or an id are ignored.
        if not track_artist.get("name") or not track_artist.get("id"):
            continue
        artist_parsed = parse_artist(track_artist, provider)
        if artist_parsed.item_id in seen_artist_ids:
            continue
        track.artists.append(artist_parsed)
        seen_artist_ids.add(artist_parsed.item_id)

    # Assigned once; a later "set to True when truthy" pass would be redundant.
    track.metadata.explicit = track_obj["explicit"]
    if "preview_url" in track_obj:
        track.metadata.preview = track_obj["preview_url"]
    if "album" in track_obj:
        track.album = parse_album(track_obj["album"], provider)
        # The track reuses its album's artwork.
        track.metadata.images = parse_images(
            track_obj["album"].get("images", []), provider.instance_id
        )
    if track_obj.get("copyright"):
        track.metadata.copyright = track_obj["copyright"]
    if track_obj.get("popularity"):
        track.metadata.popularity = track_obj["popularity"]
    return track
196
197
def parse_playlist(playlist_obj: dict[str, Any], provider: SpotifyProvider) -> Playlist:
    """Build a Playlist model from a Spotify playlist object.

    A playlist is editable when the logged-in user owns it or when it is
    collaborative. It is marked unique when it is editable or when its owner
    id is ``spotify``.

    :param playlist_obj: Playlist dict from the Spotify API.
    :param provider: Provider supplying ``instance_id``, ``domain`` and the
        current user (``_sp_user``, possibly ``None``).
    :return: The populated Playlist.
    """
    owner = playlist_obj["owner"]
    current_user = provider._sp_user
    owner_id = owner.get("id", "")

    owned_by_user = current_user is not None and owner_id == current_user["id"]
    is_editable = owned_by_user or playlist_obj["collaborative"]

    # Spotify-owned playlists (Daily Mix, Discover Weekly, etc.) are personalized per user
    owned_by_spotify = owner_id.lower() == "spotify"

    # Fall back to the logged-in user's display name when the owner has none.
    owner_name = owner.get("display_name")
    if owner_name is None and current_user is not None:
        owner_name = current_user["display_name"]

    mapping = ProviderMapping(
        item_id=playlist_obj["id"],
        provider_domain=provider.domain,
        provider_instance=provider.instance_id,
        url=playlist_obj["external_urls"]["spotify"],
        # Unique if user-owned/editable OR a Spotify personalized playlist.
        is_unique=is_editable or owned_by_spotify,
    )
    playlist = Playlist(
        item_id=playlist_obj["id"],
        provider=provider.instance_id,
        name=playlist_obj["name"],
        owner=owner_name,
        provider_mappings={mapping},
        is_editable=is_editable,
    )

    raw_images = playlist_obj.get("images", [])
    playlist.metadata.images = parse_images(raw_images, provider.instance_id)
    return playlist
235
236
def parse_podcast(podcast_obj: dict[str, Any], provider: SpotifyProvider) -> Podcast:
    """Build a Podcast model from a Spotify show object.

    :param podcast_obj: Show dict from the Spotify API; ``id``, ``name`` and
        ``external_urls`` are required, the remaining fields are optional.
    :param provider: Provider supplying ``instance_id`` and ``domain`` for the mapping.
    :return: The populated Podcast.
    """
    show_id = podcast_obj["id"]
    instance_id = provider.instance_id
    mapping = ProviderMapping(
        item_id=show_id,
        provider_domain=provider.domain,
        provider_instance=instance_id,
        url=podcast_obj["external_urls"]["spotify"],
    )
    podcast = Podcast(
        item_id=show_id,
        provider=instance_id,
        name=podcast_obj["name"],
        provider_mappings={mapping},
        publisher=podcast_obj.get("publisher"),
        total_episodes=podcast_obj.get("total_episodes"),
    )

    if description := podcast_obj.get("description"):
        podcast.metadata.description = description

    podcast.metadata.images = parse_images(podcast_obj.get("images", []), instance_id)

    if "explicit" in podcast_obj:
        podcast.metadata.explicit = podcast_obj["explicit"]

    # The show's languages are stored as genres for categorization.
    if "languages" in podcast_obj:
        podcast.metadata.genres = set(podcast_obj["languages"])

    return podcast
269
270
def parse_podcast_episode(
    episode_obj: dict[str, Any], provider: SpotifyProvider, podcast: Podcast | None = None
) -> PodcastEpisode:
    """Parse spotify podcast episode object to generic layout.

    :param episode_obj: Episode dict from the Spotify API; ``id``, ``name`` and
        ``external_urls`` are required.
    :param provider: Provider supplying ``instance_id`` and ``domain`` for the mappings.
    :param podcast: Parent podcast. When omitted, a basic one is built from the
        episode's ``show`` entry, or a placeholder when that is missing too.
    :return: The populated PodcastEpisode (``position`` is always 0 here).
    """
    # Get or create a basic podcast reference if not provided
    if podcast is None and "show" in episode_obj:
        show_obj = episode_obj["show"]
        podcast = Podcast(
            item_id=show_obj["id"],
            provider=provider.instance_id,
            name=show_obj["name"],
            provider_mappings={
                ProviderMapping(
                    item_id=show_obj["id"],
                    provider_domain=provider.domain,
                    provider_instance=provider.instance_id,
                    url=show_obj["external_urls"]["spotify"],
                )
            },
        )
    elif podcast is None:
        # Create a minimal podcast reference if none available
        podcast = Podcast(
            item_id="unknown",
            provider=provider.instance_id,
            name="Unknown Podcast",
            provider_mappings=set(),
        )

    episode = PodcastEpisode(
        item_id=episode_obj["id"],
        provider=provider.instance_id,
        name=episode_obj["name"],
        # Whole seconds; a missing or zero duration_ms yields 0.
        duration=episode_obj["duration_ms"] // 1000 if episode_obj.get("duration_ms") else 0,
        podcast=podcast,
        position=0,
        provider_mappings={
            ProviderMapping(
                item_id=episode_obj["id"],
                provider_domain=provider.domain,
                provider_instance=provider.instance_id,
                audio_format=AudioFormat(content_type=ContentType.OGG, bit_rate=160),
                url=episode_obj["external_urls"]["spotify"],
            )
        },
    )

    # Set description in metadata
    if episode_obj.get("description"):
        episode.metadata.description = episode_obj["description"]

    # Add release date to metadata. Reduced-precision dates are padded to a
    # full UTC timestamp first; anything that still fails to parse is skipped.
    if episode_obj.get("release_date"):
        with contextlib.suppress(ValueError, TypeError):
            date_str = episode_obj["release_date"].strip()

            if len(date_str) == 4:
                # Year only: "2023" -> "2023-01-01T00:00:00+00:00"
                date_str = f"{date_str}-01-01T00:00:00+00:00"
            elif len(date_str) == 7:
                # Year and month: "2023-12" -> "2023-12-01T00:00:00+00:00"
                date_str = f"{date_str}-01T00:00:00+00:00"
            elif len(date_str) == 10:
                # Date only: "2023-12-25" -> "2023-12-25T00:00:00+00:00"
                date_str = f"{date_str}T00:00:00+00:00"

            episode.metadata.release_date = datetime.fromisoformat(date_str)

    episode.metadata.images = parse_images(episode_obj.get("images", []), provider.instance_id)

    # Use podcast artwork if episode has none
    if not episode.metadata.images and isinstance(podcast, Podcast) and podcast.metadata.images:
        episode.metadata.images = podcast.metadata.images

    if "explicit" in episode_obj:
        episode.metadata.explicit = episode_obj["explicit"]

    if "audio_preview_url" in episode_obj:
        episode.metadata.preview = episode_obj["audio_preview_url"]

    return episode
348
349
def parse_audiobook(audiobook_obj: dict[str, Any], provider: SpotifyProvider) -> Audiobook:
    """Parse spotify audiobook object to generic layout.

    :param audiobook_obj: Audiobook dict from the Spotify API; ``id``, ``name``
        and ``external_urls`` are required.
    :param provider: Provider supplying ``instance_id``, ``domain`` and ``logger``.
    :return: The populated Audiobook. ``duration`` is 0 when the object carries
        no ``duration_ms``.
    """
    audiobook = Audiobook(
        item_id=audiobook_obj["id"],
        provider=provider.instance_id,
        name=audiobook_obj["name"],
        provider_mappings={
            ProviderMapping(
                item_id=audiobook_obj["id"],
                provider_domain=provider.domain,
                provider_instance=provider.instance_id,
                audio_format=AudioFormat(content_type=ContentType.OGG, bit_rate=320),
                url=audiobook_obj["external_urls"]["spotify"],
            )
        },
    )

    if "duration_ms" in audiobook_obj:
        # Lazy %-style arguments: the message is only built when DEBUG is enabled.
        provider.logger.debug(
            "Found duration_ms in audiobook object: %s", audiobook_obj["duration_ms"]
        )
        audiobook.duration = audiobook_obj["duration_ms"] // 1000
    else:
        provider.logger.debug(
            "No duration_ms found in main audiobook object - will calculate from chapters"
        )
        # Don't set duration here - let get_audiobook calculate it from chapters
        audiobook.duration = 0

    # Set authors (entries without a name are skipped)
    if "authors" in audiobook_obj:
        for author_obj in audiobook_obj["authors"]:
            if author_obj.get("name"):
                audiobook.authors.append(author_obj["name"])

    # Set narrators (entries without a name are skipped)
    if "narrators" in audiobook_obj:
        for narrator_obj in audiobook_obj["narrators"]:
            if narrator_obj.get("name"):
                audiobook.narrators.append(narrator_obj["name"])

    # Set metadata
    if audiobook_obj.get("description"):
        audiobook.metadata.description = audiobook_obj["description"]

    if audiobook_obj.get("publisher"):
        audiobook.publisher = audiobook_obj["publisher"]

    audiobook.metadata.images = parse_images(audiobook_obj.get("images", []), provider.instance_id)

    if audiobook_obj.get("explicit"):
        audiobook.metadata.explicit = audiobook_obj["explicit"]

    # Only the first listed language is kept.
    if audiobook_obj.get("languages"):
        audiobook.metadata.languages = audiobook_obj["languages"][0]

    # Set publication date if available. Reduced-precision dates are padded to
    # a full UTC timestamp first; anything that still fails to parse is skipped.
    if audiobook_obj.get("publication_date"):
        with contextlib.suppress(ValueError, TypeError):
            date_str = audiobook_obj["publication_date"].strip()
            if len(date_str) == 4:
                # Year only: "2023" -> "2023-01-01T00:00:00+00:00"
                date_str = f"{date_str}-01-01T00:00:00+00:00"
            elif len(date_str) == 7:
                # Year and month: "2023-12" -> "2023-12-01T00:00:00+00:00"
                date_str = f"{date_str}-01T00:00:00+00:00"
            elif len(date_str) == 10:
                # Date only: "2023-12-25" -> "2023-12-25T00:00:00+00:00"
                date_str = f"{date_str}T00:00:00+00:00"
            audiobook.metadata.release_date = datetime.fromisoformat(date_str)

    return audiobook
419