music-assistant-server

15.1 KBPY
parsers.py
15.1 KB419 lines • python
1"""Parsing utilities to convert Spotify API responses into Music Assistant model objects."""
2
3from __future__ import annotations
4
5import contextlib
6from datetime import datetime
7from typing import TYPE_CHECKING, Any
8
9from music_assistant_models.enums import AlbumType, ContentType, ExternalID, ImageType
10from music_assistant_models.media_items import (
11    Album,
12    Artist,
13    Audiobook,
14    AudioFormat,
15    MediaItemImage,
16    Playlist,
17    Podcast,
18    PodcastEpisode,
19    ProviderMapping,
20    Track,
21)
22from music_assistant_models.unique_list import UniqueList
23
24from music_assistant.helpers.util import infer_album_type, parse_title_and_version
25
26if TYPE_CHECKING:
27    from .provider import SpotifyProvider
28
29
def parse_images(
    images_list: list[dict[str, Any]], instance_id: str, exclude_generic: bool = False
) -> UniqueList[MediaItemImage]:
    """Parse a Spotify images list into MediaItemImage objects.

    Only the largest usable image is kept and returned as a single thumb image.

    :param images_list: Image dicts from a Spotify API response (may be empty).
    :param instance_id: Provider instance id stored on the resulting image.
    :param exclude_generic: Skip images whose URL contains the generic image
        hash (used for artists).
    :return: A list holding at most one image; empty when nothing usable is found.
    """
    if not images_list:
        return UniqueList([])

    # Hash that appears in the URL of the generic image to filter out
    generic_image_hash = "2a96cbd8b46e442fc41c2b86b821562f"

    # Keep only entries that carry a URL and are not the generic image
    candidates = [
        img
        for img in images_list
        if (img_url := img.get("url")) and not (exclude_generic and generic_image_hash in img_url)
    ]
    if not candidates:
        return UniqueList([])

    # Spotify images come in various sizes (typically 640x640, 300x300, 64x64).
    # The height may be present but null, so fall back to 0 to keep max() from
    # comparing None with int. On ties the first image wins.
    best_image = max(candidates, key=lambda img: img.get("height") or 0)

    return UniqueList(
        [
            MediaItemImage(
                type=ImageType.THUMB,
                path=best_image["url"],
                provider=instance_id,
                remotely_accessible=True,
            )
        ]
    )
64
65
def parse_artist(artist_obj: dict[str, Any], provider: SpotifyProvider) -> Artist:
    """Convert a Spotify artist object into an Artist model.

    The artist id doubles as the display name when Spotify returns an empty name.
    """
    artist_id = artist_obj["id"]
    display_name = artist_obj["name"] or artist_id
    mapping = ProviderMapping(
        item_id=artist_id,
        provider_domain=provider.domain,
        provider_instance=provider.instance_id,
        url=artist_obj["external_urls"]["spotify"],
    )
    artist = Artist(
        item_id=artist_id,
        provider=provider.instance_id,
        name=display_name,
        provider_mappings={mapping},
    )

    # Genres are only copied when the key is present in the response
    if "genres" in artist_obj:
        artist.metadata.genres = set(artist_obj["genres"])

    # Artist images go through the shared parser with generic images excluded
    raw_images = artist_obj.get("images", [])
    artist.metadata.images = parse_images(raw_images, provider.instance_id, exclude_generic=True)
    return artist
89
90
def parse_album(album_obj: dict[str, Any], provider: SpotifyProvider) -> Album:
    """Convert a Spotify album object into an Album model."""
    title, title_version = parse_title_and_version(album_obj["name"])
    album_id = album_obj["id"]
    mapping = ProviderMapping(
        item_id=album_id,
        provider_domain=provider.domain,
        provider_instance=provider.instance_id,
        audio_format=AudioFormat(content_type=ContentType.OGG, bit_rate=320),
        url=album_obj["external_urls"]["spotify"],
    )
    album = Album(
        item_id=album_id,
        provider=provider.instance_id,
        name=title,
        version=title_version,
        provider_mappings={mapping},
    )

    # Barcodes: a UPC is stored with a leading "0", an EAN is stored as-is
    if "external_ids" in album_obj:
        external_ids = album_obj["external_ids"]
        if upc := external_ids.get("upc"):
            album.external_ids.add((ExternalID.BARCODE, "0" + upc))
        if ean := external_ids.get("ean"):
            album.external_ids.add((ExternalID.BARCODE, ean))

    # Only artists that have both a name and an id are attached
    for artist_obj in album_obj["artists"]:
        if artist_obj.get("name") and artist_obj.get("id"):
            album.artists.append(parse_artist(artist_obj, provider))

    # An album_type value the enum does not know leaves the default in place
    try:
        album.album_type = AlbumType(album_obj["album_type"])
    except ValueError:
        pass

    # A live/soundtrack type inferred from name and version wins over Spotify's value
    inferred_type = infer_album_type(album.name, album.version)
    if inferred_type in (AlbumType.LIVE, AlbumType.SOUNDTRACK):
        album.album_type = inferred_type

    metadata = album.metadata
    if "genres" in album_obj:
        metadata.genres = set(album_obj["genres"])

    metadata.images = parse_images(album_obj.get("images", []), provider.instance_id)

    if "label" in album_obj:
        metadata.label = album_obj["label"]
    if release_date := album_obj.get("release_date"):
        # The year is the part before the first dash
        album.year = int(release_date.split("-")[0])
    if copyrights := album_obj.get("copyrights"):
        metadata.copyright = copyrights[0]["text"]
    if explicit := album_obj.get("explicit"):
        metadata.explicit = explicit
    return album
141
142
def parse_track(
    track_obj: dict[str, Any],
    provider: SpotifyProvider,
    artist: Artist | None = None,
) -> Track:
    """Parse spotify track object to generic layout.

    :param track_obj: Track dict from the Spotify API.
    :param provider: Provider instance supplying domain and instance id for mappings.
    :param artist: Optional artist that is attached to the track before the
        artists listed in ``track_obj``.
    :return: The parsed track.
    """
    name, version = parse_title_and_version(track_obj["name"])
    track = Track(
        item_id=track_obj["id"],
        provider=provider.instance_id,
        name=name,
        version=version,
        duration=track_obj["duration_ms"] / 1000,
        provider_mappings={
            ProviderMapping(
                item_id=track_obj["id"],
                provider_domain=provider.domain,
                provider_instance=provider.instance_id,
                audio_format=AudioFormat(content_type=ContentType.OGG, bit_rate=320),
                url=track_obj["external_urls"]["spotify"],
                # Local files and tracks flagged as not playable are unavailable
                available=not track_obj["is_local"] and track_obj["is_playable"],
            )
        },
        disc_number=track_obj.get("disc_number", 0),
        track_number=track_obj.get("track_number", 0),
    )
    if isrc := track_obj.get("external_ids", {}).get("isrc"):
        track.external_ids.add((ExternalID.ISRC, isrc))

    if artist:
        track.artists.append(artist)

    # Track the ids already attached so each duplicate check is a set lookup
    # instead of rebuilding the set for every artist.
    known_artist_ids = {x.item_id for x in track.artists}
    for track_artist in track_obj.get("artists", []):
        # Skip artist stubs without a name or id
        if not track_artist.get("name") or not track_artist.get("id"):
            continue
        artist_parsed = parse_artist(track_artist, provider)
        if artist_parsed.item_id in known_artist_ids:
            continue
        track.artists.append(artist_parsed)
        known_artist_ids.add(artist_parsed.item_id)

    track.metadata.explicit = track_obj["explicit"]
    if "preview_url" in track_obj:
        track.metadata.preview = track_obj["preview_url"]
    if "album" in track_obj:
        track.album = parse_album(track_obj["album"], provider)
        # The track reuses the album artwork
        track.metadata.images = parse_images(
            track_obj["album"].get("images", []), provider.instance_id
        )
    if track_obj.get("copyright"):
        track.metadata.copyright = track_obj["copyright"]
    if track_obj.get("popularity"):
        track.metadata.popularity = track_obj["popularity"]
    return track
196
197
def parse_playlist(playlist_obj: dict[str, Any], provider: SpotifyProvider) -> Playlist:
    """Convert a Spotify playlist object into a Playlist model."""
    owner = playlist_obj["owner"]
    current_user = provider._sp_user
    owner_id = owner.get("id", "")

    # Editable when the logged-in user owns the playlist or it is collaborative
    owned_by_user = current_user is not None and owner_id == current_user["id"]
    is_editable = owned_by_user or playlist_obj["collaborative"]

    # Playlists owned by Spotify itself (Daily Mix, Discover Weekly, etc.)
    # are personalized per user
    is_spotify_owned = owner_id.lower() == "spotify"

    # Fall back to the logged-in user's display name when the owner has none
    owner_name = owner.get("display_name")
    if owner_name is None and current_user is not None:
        owner_name = current_user["display_name"]

    playlist_id = playlist_obj["id"]
    mapping = ProviderMapping(
        item_id=playlist_id,
        provider_domain=provider.domain,
        provider_instance=provider.instance_id,
        url=playlist_obj["external_urls"]["spotify"],
        # Unique when user-owned/editable or a Spotify personalized playlist
        is_unique=is_editable or is_spotify_owned,
    )
    playlist = Playlist(
        item_id=playlist_id,
        provider=provider.instance_id,
        name=playlist_obj["name"],
        owner=owner_name,
        provider_mappings={mapping},
        is_editable=is_editable,
    )

    raw_images = playlist_obj.get("images", [])
    playlist.metadata.images = parse_images(raw_images, provider.instance_id)
    return playlist
235
236
def parse_podcast(podcast_obj: dict[str, Any], provider: SpotifyProvider) -> Podcast:
    """Convert a Spotify podcast (show) object into a Podcast model."""
    show_id = podcast_obj["id"]
    mapping = ProviderMapping(
        item_id=show_id,
        provider_domain=provider.domain,
        provider_instance=provider.instance_id,
        url=podcast_obj["external_urls"]["spotify"],
    )
    podcast = Podcast(
        item_id=show_id,
        provider=provider.instance_id,
        name=podcast_obj["name"],
        provider_mappings={mapping},
        publisher=podcast_obj.get("publisher"),
        total_episodes=podcast_obj.get("total_episodes"),
    )

    metadata = podcast.metadata

    # Empty descriptions are not stored
    if description := podcast_obj.get("description"):
        metadata.description = description

    metadata.images = parse_images(podcast_obj.get("images", []), provider.instance_id)

    if "explicit" in podcast_obj:
        metadata.explicit = podcast_obj["explicit"]

    # The show's languages are stored as genres for categorization
    if "languages" in podcast_obj:
        metadata.genres = set(podcast_obj["languages"])

    return podcast
269
270
def parse_podcast_episode(
    episode_obj: dict[str, Any], provider: SpotifyProvider, podcast: Podcast | None = None
) -> PodcastEpisode:
    """Parse spotify podcast episode object to generic layout.

    :param episode_obj: Episode dict from the Spotify API.
    :param provider: Provider instance supplying domain and instance id for mappings.
    :param podcast: Parent podcast, if already known. When omitted, a basic
        reference is built from the episode's "show" entry, or a placeholder
        "Unknown Podcast" is used when no show is available.
    :return: The parsed episode.
    """
    if podcast is None:
        # A missing or null "show" falls through to the placeholder podcast
        if show_obj := episode_obj.get("show"):
            podcast = Podcast(
                item_id=show_obj["id"],
                provider=provider.instance_id,
                name=show_obj["name"],
                provider_mappings={
                    ProviderMapping(
                        item_id=show_obj["id"],
                        provider_domain=provider.domain,
                        provider_instance=provider.instance_id,
                        url=show_obj["external_urls"]["spotify"],
                    )
                },
            )
        else:
            # Create a minimal podcast reference if none available
            podcast = Podcast(
                item_id="unknown",
                provider=provider.instance_id,
                name="Unknown Podcast",
                provider_mappings=set(),
            )

    episode = PodcastEpisode(
        item_id=episode_obj["id"],
        provider=provider.instance_id,
        name=episode_obj["name"],
        # Whole seconds; a missing or null duration becomes 0
        duration=episode_obj["duration_ms"] // 1000 if episode_obj.get("duration_ms") else 0,
        podcast=podcast,
        position=0,
        provider_mappings={
            ProviderMapping(
                item_id=episode_obj["id"],
                provider_domain=provider.domain,
                provider_instance=provider.instance_id,
                audio_format=AudioFormat(content_type=ContentType.OGG, bit_rate=160),
                url=episode_obj["external_urls"]["spotify"],
            )
        },
    )

    # Set description in metadata
    if episode_obj.get("description"):
        episode.metadata.description = episode_obj["description"]

    # Add release date to metadata; unparsable values are ignored
    if episode_obj.get("release_date"):
        with contextlib.suppress(ValueError, TypeError):
            date_str = episode_obj["release_date"].strip()

            # Pad partial dates to a full UTC timestamp before parsing
            if len(date_str) == 4:
                # Year only: "2023" -> "2023-01-01T00:00:00+00:00"
                date_str = f"{date_str}-01-01T00:00:00+00:00"
            elif len(date_str) == 7:
                # Year and month: "2023-12" -> "2023-12-01T00:00:00+00:00"
                date_str = f"{date_str}-01T00:00:00+00:00"
            elif len(date_str) == 10:
                # Date only: "2023-12-25" -> "2023-12-25T00:00:00+00:00"
                date_str = f"{date_str}T00:00:00+00:00"

            episode.metadata.release_date = datetime.fromisoformat(date_str)

    episode.metadata.images = parse_images(episode_obj.get("images", []), provider.instance_id)

    # Use podcast artwork if episode has none
    if not episode.metadata.images and isinstance(podcast, Podcast) and podcast.metadata.images:
        episode.metadata.images = podcast.metadata.images

    if "explicit" in episode_obj:
        episode.metadata.explicit = episode_obj["explicit"]

    if "audio_preview_url" in episode_obj:
        episode.metadata.preview = episode_obj["audio_preview_url"]

    return episode
348
349
def parse_audiobook(audiobook_obj: dict[str, Any], provider: SpotifyProvider) -> Audiobook:
    """Parse spotify audiobook object to generic layout.

    :param audiobook_obj: Audiobook dict from the Spotify API.
    :param provider: Provider instance supplying domain, instance id and logger.
    :return: The parsed audiobook. Its duration is 0 when the object carries no
        ``duration_ms``.
    """
    audiobook = Audiobook(
        item_id=audiobook_obj["id"],
        provider=provider.instance_id,
        name=audiobook_obj["name"],
        provider_mappings={
            ProviderMapping(
                item_id=audiobook_obj["id"],
                provider_domain=provider.domain,
                provider_instance=provider.instance_id,
                audio_format=AudioFormat(content_type=ContentType.OGG, bit_rate=320),
                url=audiobook_obj["external_urls"]["spotify"],
            )
        },
    )

    # A null duration_ms is treated the same as a missing one
    duration_ms = audiobook_obj.get("duration_ms")
    if duration_ms is not None:
        provider.logger.debug("Found duration_ms in audiobook object: %s", duration_ms)
        audiobook.duration = duration_ms // 1000
    else:
        provider.logger.debug(
            "No duration_ms found in main audiobook object - will calculate from chapters"
        )
        # Don't set duration here - let get_audiobook calculate it from chapters
        audiobook.duration = 0

    # Set authors (entries without a name are skipped)
    for author_obj in audiobook_obj.get("authors") or []:
        if author_obj.get("name"):
            audiobook.authors.append(author_obj["name"])

    # Set narrators (entries without a name are skipped)
    for narrator_obj in audiobook_obj.get("narrators") or []:
        if narrator_obj.get("name"):
            audiobook.narrators.append(narrator_obj["name"])

    # Set metadata
    if audiobook_obj.get("description"):
        audiobook.metadata.description = audiobook_obj["description"]

    if audiobook_obj.get("publisher"):
        audiobook.publisher = audiobook_obj["publisher"]

    audiobook.metadata.images = parse_images(audiobook_obj.get("images", []), provider.instance_id)

    if audiobook_obj.get("explicit"):
        audiobook.metadata.explicit = audiobook_obj["explicit"]

    if audiobook_obj.get("languages"):
        # NOTE(review): only the first language is stored - confirm the metadata
        # field expects a single value rather than a list.
        audiobook.metadata.languages = audiobook_obj["languages"][0]

    # Set publication date if available; unparsable values are ignored
    if audiobook_obj.get("publication_date"):
        with contextlib.suppress(ValueError, TypeError):
            date_str = audiobook_obj["publication_date"].strip()
            # Pad partial dates to a full UTC timestamp before parsing
            if len(date_str) == 4:
                # Year only: "2023" -> "2023-01-01T00:00:00+00:00"
                date_str = f"{date_str}-01-01T00:00:00+00:00"
            elif len(date_str) == 7:
                # Year and month: "2023-12" -> "2023-12-01T00:00:00+00:00"
                date_str = f"{date_str}-01T00:00:00+00:00"
            elif len(date_str) == 10:
                # Date only: "2023-12-25" -> "2023-12-25T00:00:00+00:00"
                date_str = f"{date_str}T00:00:00+00:00"
            audiobook.metadata.release_date = datetime.fromisoformat(date_str)

    return audiobook
419