music-assistant-server

18.1 KBPY
helpers.py
18.1 KB559 lines • python
1"""Helper functions for Phish.in provider."""
2
3from __future__ import annotations
4
5import contextlib
6from collections.abc import Callable
7from typing import TYPE_CHECKING, Any
8
9import aiohttp
10from music_assistant_models.enums import AlbumType, ContentType, ExternalID, ImageType, MediaType
11from music_assistant_models.errors import MediaNotFoundError, ProviderUnavailableError
12from music_assistant_models.media_items import (
13    Album,
14    Artist,
15    AudioFormat,
16    ItemMapping,
17    MediaItemImage,
18    MediaItemMetadata,
19    Playlist,
20    ProviderMapping,
21    Track,
22)
23from music_assistant_models.unique_list import UniqueList
24
25from .constants import (
26    API_BASE_URL,
27    FALLBACK_ALBUM_IMAGE,
28    PHISH_ARTIST_ID,
29    PHISH_ARTIST_NAME,
30    PHISH_DISCOGS_ID,
31    PHISH_MUSICBRAINZ_ID,
32    PHISH_TADB_ID,
33    REQUEST_TIMEOUT,
34)
35
36if TYPE_CHECKING:
37    from music_assistant.models.music_provider import MusicProvider
38
39
async def api_request(
    provider: MusicProvider,
    endpoint: str,
    params: dict[str, Any] | None = None,
) -> Any:
    """Make an API request to Phish.in.

    Args:
        provider: Provider instance; its ``mass.http_session`` performs the request and
            its ``logger`` records failures.
        endpoint: Path appended verbatim to ``API_BASE_URL``.
        params: Optional query-string parameters.

    Returns:
        The decoded JSON body of the response.

    Raises:
        MediaNotFoundError: The API answered with HTTP 404.
        ProviderUnavailableError: Any other error status, an aiohttp client error, or
            the request exceeding ``REQUEST_TIMEOUT``.
    """
    # Imported locally so this function does not depend on a module-level import.
    import asyncio

    url = f"{API_BASE_URL}{endpoint}"

    try:
        async with provider.mass.http_session.get(
            url,
            params=params,
            timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT),
        ) as response:
            if response.status == 404:
                raise MediaNotFoundError(f"Resource not found: {url}")
            response.raise_for_status()
            return await response.json()
    except MediaNotFoundError:
        raise
    except (aiohttp.ClientError, asyncio.TimeoutError) as err:
        # Expiry of the total timeout surfaces as asyncio.TimeoutError, which is not an
        # aiohttp.ClientError, so it has to be listed explicitly to be translated.
        # Timeout errors usually carry no message; fall back to the class name.
        reason = str(err) or type(err).__name__
        provider.logger.error("API request failed for %s: %s", url, reason)
        raise ProviderUnavailableError(f"Phish.in API unavailable: {reason}") from err
63
64
def show_to_album(provider: MusicProvider, show_data: dict[str, Any]) -> Album:
    """Convert a Phish.in show to a Music Assistant Album.

    Args:
        provider: Provider whose ``instance_id`` and ``domain`` are stamped on the album.
        show_data: Show dictionary. Keys read: ``date``, ``venue`` (``name``,
            ``location``), ``venue_name``, ``album_cover_url``, ``duration``,
            ``audio_status`` and ``tour_name``. Missing or ``None`` values are tolerated.

    Returns:
        A live Album whose ``item_id`` is the show date, credited to Phish, named
        ``"<date> - <venue>[, <location>]"``. Its provider mapping is available only
        when ``audio_status`` is ``"complete"`` or ``"partial"``.
    """
    # ``dict.get(key, default)`` applies the default only when the key is absent, so an
    # explicit ``None`` in the payload is normalised with ``or``.
    show_date = show_data.get("date") or ""
    venue_data = show_data.get("venue") or {}
    # Fall back to the flat ``venue_name`` key, which the search parsers filter on.
    venue_name = venue_data.get("name") or show_data.get("venue_name") or "Unknown Venue"
    location = venue_data.get("location") or ""

    album_name = f"{show_date} - {venue_name}"
    if location:
        album_name += f", {location}"

    # Create metadata with image
    album_cover_url = show_data.get("album_cover_url") or FALLBACK_ALBUM_IMAGE
    metadata = MediaItemMetadata(
        images=UniqueList(
            [
                MediaItemImage(
                    type=ImageType.THUMB,
                    path=album_cover_url,
                    provider=provider.instance_id,
                    remotely_accessible=True,
                )
            ]
        )
    )

    # Parse year from date string (YYYY-MM-DD format); leave it unset if unparsable
    year = None
    if show_date and "-" in show_date:
        with contextlib.suppress(ValueError, IndexError):
            year = int(show_date.split("-")[0])

    # Create details string for provider mapping ("key:value" pairs joined with "|")
    details_parts = [f"venue:{venue_name}"]
    if location:
        details_parts.append(f"location:{location}")
    duration = show_data.get("duration")
    if duration:
        details_parts.append(f"duration:{duration}")

    audio_status = show_data.get("audio_status") or "missing"
    details_parts.append(f"audio_status:{audio_status}")

    tour_name = show_data.get("tour_name")
    if tour_name:
        details_parts.append(f"tour:{tour_name}")

    # Create ItemMapping for Phish artist
    phish_artist = ItemMapping(
        item_id=PHISH_ARTIST_ID,
        provider=provider.instance_id,
        name=PHISH_ARTIST_NAME,
        media_type=MediaType.ARTIST,
        available=True,
    )

    return Album(
        item_id=show_date,
        provider=provider.instance_id,
        name=album_name,
        artists=UniqueList([phish_artist]),
        year=year,
        album_type=AlbumType.LIVE,
        metadata=metadata,
        provider_mappings={
            ProviderMapping(
                item_id=show_date,
                provider_domain=provider.domain,
                provider_instance=provider.instance_id,
                available=audio_status in ("complete", "partial"),
                audio_format=AudioFormat(content_type=ContentType.MP3),
                details="|".join(details_parts),
            )
        },
    )
138
139
async def get_phish_artist(provider: MusicProvider) -> Artist:
    """Build the Artist object representing Phish for this provider instance.

    Args:
        provider: Provider whose ``instance_id`` and ``domain`` are stamped on the artist.

    Returns:
        An Artist with a single, available provider mapping and the MusicBrainz,
        Discogs and TADB external IDs attached.
    """
    mapping = ProviderMapping(
        item_id=PHISH_ARTIST_ID,
        provider_domain=provider.domain,
        provider_instance=provider.instance_id,
        available=True,
    )
    phish = Artist(
        item_id=PHISH_ARTIST_ID,
        provider=provider.instance_id,
        name=PHISH_ARTIST_NAME,
        provider_mappings={mapping},
    )

    # External IDs for metadata enrichment, registered in a fixed order
    external_ids = (
        (ExternalID.MB_ARTIST, PHISH_MUSICBRAINZ_ID),
        (ExternalID.DISCOGS, PHISH_DISCOGS_ID),
        (ExternalID.TADB, PHISH_TADB_ID),
    )
    for id_type, id_value in external_ids:
        phish.add_external_id(id_type, id_value)

    return phish
162
163
164def _extract_version_from_title(full_title: str) -> tuple[str, str]:
165    """Extract song title and version from full title with performance indicators.
166
167    Returns:
168        Tuple of (clean_song_title, version_string)
169    """
170    song_title = full_title
171    version = None
172    performance_indicators = ["set", "soundcheck", "check", "encore"]
173
174    # Check for prefix: "(Check) Song Name"
175    if full_title.startswith("(") and ") " in full_title:
176        end_paren = full_title.index(") ")
177        prefix = full_title[1:end_paren]
178        if any(indicator in prefix.lower() for indicator in performance_indicators):
179            version = prefix
180            song_title = full_title[end_paren + 2 :]
181
182    # Check for suffix: "Song Name (Soundcheck)"
183    if " (" in song_title and song_title.endswith(")"):
184        base_title, suffix = song_title.rsplit(" (", 1)
185        suffix = suffix.rstrip(")")
186        if any(indicator in suffix.lower() for indicator in performance_indicators):
187            version = f"{version}, {suffix}" if version else suffix
188            song_title = base_title
189
190    return song_title, version or ""
191
192
def _create_album_mapping(
    provider: MusicProvider,
    show_date: str,
    show_data: dict[str, Any] | None,
) -> ItemMapping | None:
    """Build the album ItemMapping (with cover image) that a track points at.

    Args:
        provider: Provider whose ``instance_id`` is stamped on the mapping and image.
        show_date: Show date, used as the album's ``item_id``.
        show_data: Optional show dictionary supplying the venue name and cover URL.

    Returns:
        ``None`` when ``show_date`` is empty; otherwise an album ItemMapping named
        ``"<date> - <venue>"`` (or just the date when no venue name is known). An image
        is attached only when ``show_data`` is non-empty, using the fallback image if
        the show has no cover URL.
    """
    if not show_date:
        return None

    venue_name = ""
    cover = None
    if show_data:
        venue_name = show_data.get("venue", {}).get("name", "")
        cover = MediaItemImage(
            type=ImageType.THUMB,
            path=show_data.get("album_cover_url") or FALLBACK_ALBUM_IMAGE,
            provider=provider.instance_id,
            remotely_accessible=True,
        )

    display_name = f"{show_date} - {venue_name}" if venue_name else show_date
    return ItemMapping(
        item_id=show_date,
        provider=provider.instance_id,
        name=display_name,
        media_type=MediaType.ALBUM,
        available=True,
        image=cover,
    )
223
224
225def _build_track_details(
226    track_data: dict[str, Any],
227    song_data: dict[str, Any],
228    show_date: str,
229    set_name: str,
230    venue_name: str,
231) -> str:
232    """Build details string for provider mapping."""
233    details_parts = [f"song_slug:{song_data.get('slug', '')}"]
234
235    if set_name:
236        details_parts.append(f"set_name:{set_name}")
237    if show_date:
238        details_parts.append(f"show_date:{show_date}")
239    if venue_name:
240        details_parts.append(f"venue:{venue_name}")
241    if track_data.get("tags"):
242        tag_names = [tag.get("name", "") for tag in track_data.get("tags", [])]
243        details_parts.append(f"tags:{','.join(tag_names)}")
244    if track_data.get("likes_count"):
245        details_parts.append(f"likes_count:{track_data.get('likes_count', 0)}")
246
247    return "|".join(details_parts)
248
249
def track_to_ma_track(
    provider: MusicProvider,
    track_data: dict[str, Any],
    show_data: dict[str, Any] | None = None,
) -> Track:
    """Build a Music Assistant Track from a Phish.in track dictionary.

    Args:
        provider: Provider whose ``instance_id`` and ``domain`` are stamped on the track.
        track_data: Track dictionary. Keys read here: ``id``, ``songs``, ``title``,
            ``duration``, ``position``, ``set_name``, ``show`` and ``mp3_url``.
        show_data: Show dictionary supplying date, venue and cover art. When ``None``,
            the ``show`` entry of ``track_data`` is used instead.

    Returns:
        A Track credited to Phish. Performance indicators are moved from the title
        into ``version``; the provider mapping is available only when the track has
        an ``mp3_url``.
    """
    item_id = str(track_data.get("id", ""))

    # Only the first linked song is used (for its slug in the details string)
    linked_songs = track_data.get("songs", [])
    primary_song = linked_songs[0] if linked_songs else {}
    clean_title, version_label = _extract_version_from_title(
        track_data.get("title", "Unknown Song")
    )

    # The raw duration is treated as milliseconds and truncated to whole seconds
    raw_duration = track_data.get("duration")
    seconds = int(raw_duration / 1000) if raw_duration else 0
    raw_position = track_data.get("position")
    number = 0 if raw_position is None else int(raw_position)
    set_label = track_data.get("set_name", "")

    # Prefer the explicitly supplied show; otherwise use the one embedded in the track
    if show_data is None:
        show_data = track_data.get("show", {})
    show_date = show_data.get("date", "")
    venue_name = show_data.get("venue", {}).get("name", "")

    artist_ref = ItemMapping(
        item_id=PHISH_ARTIST_ID,
        provider=provider.instance_id,
        name=PHISH_ARTIST_NAME,
        media_type=MediaType.ARTIST,
        available=True,
    )
    album_ref = _create_album_mapping(provider, show_date, show_data)
    details = _build_track_details(track_data, primary_song, show_date, set_label, venue_name)

    # The track carries its own image only when the show has a real cover URL
    cover_url = show_data.get("album_cover_url") if show_data else None
    if cover_url:
        track_metadata = MediaItemMetadata(
            images=UniqueList(
                [
                    MediaItemImage(
                        type=ImageType.THUMB,
                        path=cover_url,
                        provider=provider.instance_id,
                        remotely_accessible=True,
                    )
                ]
            )
        )
    else:
        track_metadata = MediaItemMetadata()

    stream_url = track_data.get("mp3_url")
    mapping = ProviderMapping(
        item_id=item_id,
        provider_domain=provider.domain,
        provider_instance=provider.instance_id,
        available=bool(stream_url),
        audio_format=AudioFormat(content_type=ContentType.MP3),
        url=stream_url,
        details=details,
    )

    return Track(
        item_id=item_id,
        provider=provider.instance_id,
        name=clean_title,
        version=version_label,
        artists=UniqueList([artist_ref]),
        album=album_ref,
        duration=seconds,
        track_number=number,
        metadata=track_metadata,
        provider_mappings={mapping},
    )
332
333
def playlist_to_ma_playlist(provider: MusicProvider, playlist_data: dict[str, Any]) -> Playlist:
    """Build a Music Assistant Playlist from a phish.in playlist dictionary.

    Args:
        provider: Provider whose ``instance_id`` and ``domain`` are stamped on the playlist.
        playlist_data: Playlist dictionary. ``id`` is required; ``name``, ``username``
            and ``description`` are optional.

    Returns:
        A non-editable Playlist that uses the fallback album image as its thumbnail.

    Raises:
        KeyError: ``playlist_data`` has no ``id`` entry.
    """
    item_id = str(playlist_data["id"])

    thumbnail = MediaItemImage(
        type=ImageType.THUMB,
        path=FALLBACK_ALBUM_IMAGE,
        provider=provider.instance_id,
        remotely_accessible=True,
    )
    playlist_metadata = MediaItemMetadata(
        description=playlist_data.get("description"),
        images=UniqueList([thumbnail]),
    )
    mapping = ProviderMapping(
        item_id=item_id,
        provider_domain=provider.domain,
        provider_instance=provider.instance_id,
        available=True,
    )

    return Playlist(
        item_id=item_id,
        provider=provider.instance_id,
        name=playlist_data.get("name", ""),
        owner=playlist_data.get("username", ""),
        is_editable=False,
        metadata=playlist_metadata,
        provider_mappings={mapping},
    )
368
369
def get_main_artist_mapping(provider: MusicProvider) -> ProviderMapping:
    """Return an available ProviderMapping for the Phish artist on this provider instance."""
    artist_mapping = ProviderMapping(
        item_id=PHISH_ARTIST_ID,
        provider_domain=provider.domain,
        provider_instance=provider.instance_id,
        available=True,
    )
    return artist_mapping
378
379
def get_album_mapping(provider: MusicProvider, show_date: str) -> ProviderMapping:
    """Return an available ProviderMapping whose item id is the given show date."""
    album_mapping = ProviderMapping(
        item_id=show_date,
        provider_domain=provider.domain,
        provider_instance=provider.instance_id,
        available=True,
    )
    return album_mapping
388
389
def parse_search_results(
    provider: MusicProvider,
    search_data: dict[str, Any],
    media_types: list[MediaType],
    search_query: str,
) -> tuple[list[Artist], list[Album], list[Track], list[Playlist]]:
    """Parse search results into MA media items.

    Args:
        provider: Provider the resulting items are attributed to.
        search_data: Search response dictionary handed on to the per-type parsers.
        media_types: Media types the caller asked for; other types yield empty lists.
        search_query: Raw query; matching against names is case-insensitive.

    Returns:
        Tuple of (artists, albums, tracks, playlists).
    """
    search_term = search_query.lower()

    def contains_search_term(text: str | None) -> bool:
        """Return True when ``text`` is non-empty and contains the lower-cased query."""
        return search_term in text.lower() if text else False

    def strip_performance_indicators(title: str) -> str:
        """Strip performance indicators like (Set1), (Soundcheck), etc. from title."""
        # Delegate to the shared title parser so search matching and track naming
        # cannot drift apart; only the cleaned title is needed here.
        return _extract_version_from_title(title)[0]

    artists: list[Artist] = _parse_artists(provider, media_types)
    albums: list[Album] = _parse_albums(provider, search_data, media_types, contains_search_term)
    tracks: list[Track] = _parse_tracks(
        provider, search_data, media_types, contains_search_term, strip_performance_indicators
    )
    playlists: list[Playlist] = _parse_playlists(
        provider, search_data, media_types, contains_search_term
    )

    return artists, albums, tracks, playlists
433
434
def _parse_artists(provider: MusicProvider, media_types: list[MediaType]) -> list[Artist]:
    """Return the Phish artist as the only artist result, if artists were requested.

    The artist is given the fallback album image as its thumbnail. An empty list is
    returned when ``MediaType.ARTIST`` is not among ``media_types``.
    """
    if MediaType.ARTIST not in media_types:
        return []

    thumbnail = MediaItemImage(
        type=ImageType.THUMB,
        path=FALLBACK_ALBUM_IMAGE,
        provider=provider.instance_id,
        remotely_accessible=True,
    )
    mapping = ProviderMapping(
        item_id=PHISH_ARTIST_ID,
        provider_domain=provider.domain,
        provider_instance=provider.instance_id,
        available=True,
    )
    phish = Artist(
        item_id=PHISH_ARTIST_ID,
        provider=provider.instance_id,
        name=PHISH_ARTIST_NAME,
        metadata=MediaItemMetadata(images=UniqueList([thumbnail])),
        provider_mappings={mapping},
    )
    return [phish]
469
470
def _parse_albums(
    provider: MusicProvider,
    search_data: dict[str, Any],
    media_types: list[MediaType],
    contains_search_term: Callable[[str | None], bool],
) -> list[Album]:
    """Parse albums from search results.

    Shows are taken, in order, from ``exact_show``, ``other_shows`` and ``venue_shows``
    (the latter added from additional API calls). A show is kept when its
    ``venue_name`` contains the search term. A show date that was already emitted is
    skipped, so a show listed in more than one bucket yields a single album; shows
    without a date are never de-duplicated.

    Returns:
        Matching albums, or an empty list when ``MediaType.ALBUM`` was not requested.
    """
    albums: list[Album] = []
    if MediaType.ALBUM not in media_types:
        return albums

    # Gather every candidate show in priority order; ``or []`` tolerates null lists.
    exact_show = search_data.get("exact_show")
    candidates: list[dict[str, Any]] = [exact_show] if exact_show else []
    candidates.extend(search_data.get("other_shows") or [])
    candidates.extend(search_data.get("venue_shows") or [])

    seen_dates: set[str] = set()
    for show in candidates:
        if not contains_search_term(show.get("venue_name", "")):
            continue
        # The show date becomes the album's item_id, so it is the de-duplication key.
        show_date = show.get("date")
        if show_date:
            if show_date in seen_dates:
                continue
            seen_dates.add(show_date)
        albums.append(show_to_album(provider, show))

    return albums
502
503
def _parse_tracks(
    provider: MusicProvider,
    search_data: dict[str, Any],
    media_types: list[MediaType],
    contains_search_term: Callable[[str | None], bool],
    strip_performance_indicators: Callable[[str], str],
) -> list[Track]:
    """Parse tracks from search results, keeping at most one track per show.

    A track matches when its title, with performance indicators stripped, contains
    the search term. Of the matches, only the first track for each album (show) id is
    returned; tracks without an album id are all kept.

    Returns:
        The de-duplicated matches, or an empty list when ``MediaType.TRACK`` was not
        requested.
    """
    if MediaType.TRACK not in media_types:
        return []

    matched: list[Track] = []
    for item in search_data.get("tracks", []):
        # Match on the base song name, not on the decorated title
        base_title = strip_performance_indicators(item.get("title", ""))
        if not contains_search_term(base_title):
            continue
        # Rebuild a minimal show dict from the flat fields on the track (for the image)
        show_stub = {
            "date": item.get("show_date"),
            "album_cover_url": item.get("show_album_cover_url"),
            "venue": {"name": item.get("venue_name")},
        }
        matched.append(track_to_ma_track(provider, item, show_stub))

    # Deduplicate by album - only return one track per show
    seen_show_ids = set()
    deduped = []
    for candidate in matched:
        show_id = candidate.album.item_id if candidate.album else None
        if show_id:
            if show_id in seen_show_ids:
                continue
            seen_show_ids.add(show_id)
        deduped.append(candidate)

    return deduped
542
543
def _parse_playlists(
    provider: MusicProvider,
    search_data: dict[str, Any],
    media_types: list[MediaType],
    contains_search_term: Callable[[str | None], bool],
) -> list[Playlist]:
    """Parse playlists from search results.

    Returns the playlists whose ``name`` contains the search term, or an empty list
    when ``MediaType.PLAYLIST`` was not requested.
    """
    if MediaType.PLAYLIST not in media_types:
        return []

    return [
        playlist_to_ma_playlist(provider, entry)
        for entry in search_data.get("playlists", [])
        if contains_search_term(entry.get("name", ""))
    ]
559