/
/
/
1"""Helper functions for Phish.in provider."""
2
3from __future__ import annotations
4
5import contextlib
6from collections.abc import Callable
7from typing import TYPE_CHECKING, Any
8
9import aiohttp
10from music_assistant_models.enums import AlbumType, ContentType, ExternalID, ImageType, MediaType
11from music_assistant_models.errors import MediaNotFoundError, ProviderUnavailableError
12from music_assistant_models.media_items import (
13 Album,
14 Artist,
15 AudioFormat,
16 ItemMapping,
17 MediaItemImage,
18 MediaItemMetadata,
19 Playlist,
20 ProviderMapping,
21 Track,
22)
23from music_assistant_models.unique_list import UniqueList
24
25from .constants import (
26 API_BASE_URL,
27 FALLBACK_ALBUM_IMAGE,
28 PHISH_ARTIST_ID,
29 PHISH_ARTIST_NAME,
30 PHISH_DISCOGS_ID,
31 PHISH_MUSICBRAINZ_ID,
32 PHISH_TADB_ID,
33 REQUEST_TIMEOUT,
34)
35
36if TYPE_CHECKING:
37 from music_assistant.models.music_provider import MusicProvider
38
39
async def api_request(
    provider: MusicProvider,
    endpoint: str,
    params: dict[str, Any] | None = None,
) -> Any:
    """Perform a GET request against the Phish.in API and return the decoded JSON.

    Args:
        provider: Provider instance supplying the shared HTTP session
            (``provider.mass.http_session``) and the logger.
        endpoint: Path appended verbatim to ``API_BASE_URL``.
        params: Optional query-string parameters.

    Returns:
        The JSON-decoded response body.

    Raises:
        MediaNotFoundError: The API answered with HTTP 404.
        ProviderUnavailableError: The request failed with any other HTTP error
            status, with a client/connection error, or did not complete within
            the ``REQUEST_TIMEOUT`` total timeout.
    """
    # Imported locally: only needed for the timeout exception type below.
    import asyncio

    url = f"{API_BASE_URL}{endpoint}"

    try:
        async with provider.mass.http_session.get(
            url,
            params=params,
            timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT),
        ) as response:
            if response.status == 404:
                raise MediaNotFoundError(f"Resource not found: {url}")
            response.raise_for_status()
            return await response.json()
    except MediaNotFoundError:
        # A missing resource is not an outage; let callers handle it as-is.
        raise
    except (aiohttp.ClientError, asyncio.TimeoutError) as err:
        # The total timeout is reported as asyncio.TimeoutError, which is not
        # an aiohttp.ClientError, so it has to be listed explicitly.
        # Timeout errors usually carry no text; fall back to the class name so
        # the log line and the raised error are never blank.
        reason = str(err) or type(err).__name__
        provider.logger.error("API request failed for %s: %s", url, reason)
        raise ProviderUnavailableError(f"Phish.in API unavailable: {reason}") from err
63
64
def show_to_album(provider: MusicProvider, show_data: dict[str, Any]) -> Album:
    """Convert a Phish.in show to a Music Assistant Album.

    The show date doubles as the album's item id. The album name is
    ``"<date> - <venue>"`` with ``", <location>"`` appended when the venue has
    a location. The provider mapping is marked available only when the show's
    ``audio_status`` is ``"complete"`` or ``"partial"``.

    Args:
        provider: Provider instance supplying ``instance_id`` and ``domain``.
        show_data: Show dictionary as returned by the API. A ``venue`` entry
            that is missing or explicitly null is treated as an empty venue.

    Returns:
        A live-type ``Album`` whose single artist is the Phish item mapping.
    """
    show_date = show_data.get("date", "")
    # ``.get("venue", {})`` only covers a missing key; ``or {}`` also covers a
    # key that is present but null, which would otherwise raise AttributeError.
    venue_data = show_data.get("venue") or {}
    venue_name = venue_data.get("name", "Unknown Venue")
    location = venue_data.get("location", "")

    album_name = f"{show_date} - {venue_name}"
    if location:
        album_name += f", {location}"

    # Create metadata with image (fall back to the stock cover when none is given)
    album_cover_url = show_data.get("album_cover_url") or FALLBACK_ALBUM_IMAGE
    metadata = MediaItemMetadata(
        images=UniqueList(
            [
                MediaItemImage(
                    type=ImageType.THUMB,
                    path=album_cover_url,
                    provider=provider.instance_id,
                    remotely_accessible=True,
                )
            ]
        )
    )

    # Parse year from date string (YYYY-MM-DD format); leave it unset when the
    # leading component is not an integer.
    year = None
    if show_date and "-" in show_date:
        with contextlib.suppress(ValueError, IndexError):
            year = int(show_date.split("-")[0])

    # Create details string for provider mapping ("key:value" parts joined by "|")
    details_parts = [f"venue:{venue_name}"]
    if location:
        details_parts.append(f"location:{location}")
    if show_data.get("duration"):
        details_parts.append(f"duration:{show_data.get('duration')}")

    audio_status = show_data.get("audio_status", "missing")
    details_parts.append(f"audio_status:{audio_status}")

    if show_data.get("tour_name"):
        details_parts.append(f"tour:{show_data.get('tour_name')}")

    # Create ItemMapping for Phish artist
    phish_artist = ItemMapping(
        item_id=PHISH_ARTIST_ID,
        provider=provider.instance_id,
        name=PHISH_ARTIST_NAME,
        media_type=MediaType.ARTIST,
        available=True,
    )

    return Album(
        item_id=show_date,
        provider=provider.instance_id,
        name=album_name,
        artists=UniqueList([phish_artist]),
        year=year,
        album_type=AlbumType.LIVE,
        metadata=metadata,
        provider_mappings={
            ProviderMapping(
                item_id=show_date,
                provider_domain=provider.domain,
                provider_instance=provider.instance_id,
                available=audio_status in ["complete", "partial"],
                audio_format=AudioFormat(content_type=ContentType.MP3),
                details="|".join(details_parts),
            )
        },
    )
138
139
async def get_phish_artist(provider: MusicProvider) -> Artist:
    """Build the Artist object that represents Phish for this provider.

    The artist carries a single, always-available provider mapping and the
    MusicBrainz, Discogs and TADB external ids defined in the constants module.
    """
    mapping = ProviderMapping(
        item_id=PHISH_ARTIST_ID,
        provider_domain=provider.domain,
        provider_instance=provider.instance_id,
        available=True,
    )
    phish = Artist(
        item_id=PHISH_ARTIST_ID,
        provider=provider.instance_id,
        name=PHISH_ARTIST_NAME,
        provider_mappings={mapping},
    )

    # External ids let other metadata sources enrich the artist.
    external_ids = (
        (ExternalID.MB_ARTIST, PHISH_MUSICBRAINZ_ID),
        (ExternalID.DISCOGS, PHISH_DISCOGS_ID),
        (ExternalID.TADB, PHISH_TADB_ID),
    )
    for id_type, id_value in external_ids:
        phish.add_external_id(id_type, id_value)

    return phish
162
163
164def _extract_version_from_title(full_title: str) -> tuple[str, str]:
165 """Extract song title and version from full title with performance indicators.
166
167 Returns:
168 Tuple of (clean_song_title, version_string)
169 """
170 song_title = full_title
171 version = None
172 performance_indicators = ["set", "soundcheck", "check", "encore"]
173
174 # Check for prefix: "(Check) Song Name"
175 if full_title.startswith("(") and ") " in full_title:
176 end_paren = full_title.index(") ")
177 prefix = full_title[1:end_paren]
178 if any(indicator in prefix.lower() for indicator in performance_indicators):
179 version = prefix
180 song_title = full_title[end_paren + 2 :]
181
182 # Check for suffix: "Song Name (Soundcheck)"
183 if " (" in song_title and song_title.endswith(")"):
184 base_title, suffix = song_title.rsplit(" (", 1)
185 suffix = suffix.rstrip(")")
186 if any(indicator in suffix.lower() for indicator in performance_indicators):
187 version = f"{version}, {suffix}" if version else suffix
188 song_title = base_title
189
190 return song_title, version or ""
191
192
def _create_album_mapping(
    provider: MusicProvider,
    show_date: str,
    show_data: dict[str, Any] | None,
) -> ItemMapping | None:
    """Create the album ItemMapping (with image) that a track points to.

    Args:
        provider: Provider instance supplying ``instance_id``.
        show_date: Show date, used as the album's item id.
        show_data: Optional show dictionary; supplies the venue name and the
            cover image. A ``venue`` entry that is missing or null is treated
            as an empty venue.

    Returns:
        ``None`` when ``show_date`` is empty. Otherwise an album ItemMapping
        named ``"<date> - <venue>"``, or just the date when no venue name is
        known. An image is attached only when ``show_data`` is non-empty.
    """
    if not show_date:
        return None

    venue_name = ""
    if show_data:
        # ``or {}`` guards against a "venue" key that is present but null,
        # which ``.get("venue", {})`` would pass through as None.
        venue_name = (show_data.get("venue") or {}).get("name", "")

    # Create the image for the album mapping
    album_image = None
    if show_data:
        image_url = show_data.get("album_cover_url") or FALLBACK_ALBUM_IMAGE
        album_image = MediaItemImage(
            type=ImageType.THUMB,
            path=image_url,
            provider=provider.instance_id,
            remotely_accessible=True,
        )

    return ItemMapping(
        item_id=show_date,
        provider=provider.instance_id,
        name=f"{show_date} - {venue_name}" if venue_name else show_date,
        media_type=MediaType.ALBUM,
        available=True,
        image=album_image,
    )
223
224
225def _build_track_details(
226 track_data: dict[str, Any],
227 song_data: dict[str, Any],
228 show_date: str,
229 set_name: str,
230 venue_name: str,
231) -> str:
232 """Build details string for provider mapping."""
233 details_parts = [f"song_slug:{song_data.get('slug', '')}"]
234
235 if set_name:
236 details_parts.append(f"set_name:{set_name}")
237 if show_date:
238 details_parts.append(f"show_date:{show_date}")
239 if venue_name:
240 details_parts.append(f"venue:{venue_name}")
241 if track_data.get("tags"):
242 tag_names = [tag.get("name", "") for tag in track_data.get("tags", [])]
243 details_parts.append(f"tags:{','.join(tag_names)}")
244 if track_data.get("likes_count"):
245 details_parts.append(f"likes_count:{track_data.get('likes_count', 0)}")
246
247 return "|".join(details_parts)
248
249
def track_to_ma_track(
    provider: MusicProvider,
    track_data: dict[str, Any],
    show_data: dict[str, Any] | None = None,
) -> Track:
    """Convert a Phish.in track to a Music Assistant Track.

    Args:
        provider: Provider instance supplying ``instance_id`` and ``domain``.
        track_data: Track dictionary as returned by the API.
        show_data: Optional show dictionary for album and image information.
            When ``None``, the track's own ``show`` entry is used instead. A
            ``show`` or ``venue`` entry that is missing or null is treated as
            empty rather than raising.

    Returns:
        A ``Track`` whose name is the title with performance annotations
        removed (those go into ``version``). The provider mapping is available
        only when the track has an ``mp3_url``.
    """
    track_id = str(track_data.get("id", ""))

    # Extract song info and version
    songs = track_data.get("songs", [])
    song_data = songs[0] if songs else {}
    full_title = track_data.get("title", "Unknown Song")
    song_title, version = _extract_version_from_title(full_title)

    # Extract basic track info (the API duration is divided by 1000 to get seconds)
    duration_ms = track_data.get("duration")
    duration = int(duration_ms / 1000) if duration_ms else 0
    position = track_data.get("position")
    track_number = int(position) if position is not None else 0
    set_name = track_data.get("set_name", "")

    # Get show information. ``.get("show", {})`` would hand back None for a
    # key that is present but null, so fall back with ``or`` instead.
    if show_data is None:
        show_data = track_data.get("show") or {}
    # Normalise a null venue on a shallow copy: the caller's dict is left
    # untouched and every later lookup (including the album-mapping helper)
    # sees a dict.
    show_info = dict(show_data)
    if show_info:
        show_info["venue"] = show_info.get("venue") or {}
    show_date = show_info.get("date", "")
    venue_name = show_info.get("venue", {}).get("name", "")

    # Create artist mapping
    phish_artist = ItemMapping(
        item_id=PHISH_ARTIST_ID,
        provider=provider.instance_id,
        name=PHISH_ARTIST_NAME,
        media_type=MediaType.ARTIST,
        available=True,
    )

    # Create album mapping with image
    album_mapping = _create_album_mapping(provider, show_date, show_info)

    # Build details string
    details = _build_track_details(track_data, song_data, show_date, set_name, venue_name)

    # Create metadata with image (only when the show provides its own cover)
    metadata = MediaItemMetadata()
    if show_info:
        image_url = show_info.get("album_cover_url")
        if image_url:
            metadata = MediaItemMetadata(
                images=UniqueList(
                    [
                        MediaItemImage(
                            type=ImageType.THUMB,
                            path=image_url,
                            provider=provider.instance_id,
                            remotely_accessible=True,
                        )
                    ]
                )
            )

    return Track(
        item_id=track_id,
        provider=provider.instance_id,
        name=song_title,
        version=version,
        artists=UniqueList([phish_artist]),
        album=album_mapping,
        duration=duration,
        track_number=track_number,
        metadata=metadata,
        provider_mappings={
            ProviderMapping(
                item_id=track_id,
                provider_domain=provider.domain,
                provider_instance=provider.instance_id,
                available=bool(track_data.get("mp3_url")),
                audio_format=AudioFormat(content_type=ContentType.MP3),
                url=track_data.get("mp3_url"),
                details=details,
            )
        },
    )
332
333
def playlist_to_ma_playlist(provider: MusicProvider, playlist_data: dict[str, Any]) -> Playlist:
    """Build a read-only Music Assistant Playlist from a phish.in playlist dict.

    The playlist ``id`` is required (a missing key raises ``KeyError``). Name
    and owner default to empty strings, and the stock fallback image is used
    as the thumbnail.
    """
    playlist_id = str(playlist_data["id"])

    thumbnail = MediaItemImage(
        type=ImageType.THUMB,
        path=FALLBACK_ALBUM_IMAGE,
        provider=provider.instance_id,
        remotely_accessible=True,
    )
    playlist_metadata = MediaItemMetadata(
        description=playlist_data.get("description"),
        images=UniqueList([thumbnail]),
    )
    mapping = ProviderMapping(
        item_id=playlist_id,
        provider_domain=provider.domain,
        provider_instance=provider.instance_id,
        available=True,
    )

    return Playlist(
        item_id=playlist_id,
        provider=provider.instance_id,
        name=playlist_data.get("name", ""),
        owner=playlist_data.get("username", ""),
        is_editable=False,
        metadata=playlist_metadata,
        provider_mappings={mapping},
    )
368
369
def get_main_artist_mapping(provider: MusicProvider) -> ProviderMapping:
    """Return an always-available provider mapping for the Phish artist."""
    artist_mapping = ProviderMapping(
        item_id=PHISH_ARTIST_ID,
        provider_domain=provider.domain,
        provider_instance=provider.instance_id,
        available=True,
    )
    return artist_mapping
378
379
def get_album_mapping(provider: MusicProvider, show_date: str) -> ProviderMapping:
    """Return an always-available provider mapping whose item id is the show date."""
    album_mapping = ProviderMapping(
        item_id=show_date,
        provider_domain=provider.domain,
        provider_instance=provider.instance_id,
        available=True,
    )
    return album_mapping
388
389
def parse_search_results(
    provider: MusicProvider,
    search_data: dict[str, Any],
    media_types: list[MediaType],
    search_query: str,
) -> tuple[list[Artist], list[Album], list[Track], list[Playlist]]:
    """Parse search results into MA media items.

    Args:
        provider: Provider instance passed through to the item converters.
        search_data: Search payload handed to the per-type parsers.
        media_types: Media types requested by the caller; a type that is not
            listed yields an empty result list.
        search_query: Raw query; matching is a case-insensitive substring test.

    Returns:
        Tuple of (artists, albums, tracks, playlists).
    """
    search_term = search_query.lower()

    def contains_search_term(text: str | None) -> bool:
        """Return True when ``text`` is non-empty and contains the query."""
        return search_term in text.lower() if text else False

    def strip_performance_indicators(title: str) -> str:
        """Strip performance indicators like (Set1), (Soundcheck), etc. from title."""
        # Delegate to the module's title parser so the prefix/suffix rules live
        # in one place; only the cleaned title is needed for matching.
        return _extract_version_from_title(title)[0]

    artists: list[Artist] = _parse_artists(provider, media_types)
    albums: list[Album] = _parse_albums(provider, search_data, media_types, contains_search_term)
    tracks: list[Track] = _parse_tracks(
        provider, search_data, media_types, contains_search_term, strip_performance_indicators
    )
    playlists: list[Playlist] = _parse_playlists(
        provider, search_data, media_types, contains_search_term
    )

    return artists, albums, tracks, playlists
433
434
def _parse_artists(provider: MusicProvider, media_types: list[MediaType]) -> list[Artist]:
    """Return the Phish artist as the sole artist result, if artists were requested.

    The list is empty when ``MediaType.ARTIST`` is not among ``media_types``.
    The artist uses the stock fallback image as its thumbnail.
    """
    if MediaType.ARTIST not in media_types:
        return []

    thumbnail = MediaItemImage(
        type=ImageType.THUMB,
        path=FALLBACK_ALBUM_IMAGE,
        provider=provider.instance_id,
        remotely_accessible=True,
    )
    mapping = ProviderMapping(
        item_id=PHISH_ARTIST_ID,
        provider_domain=provider.domain,
        provider_instance=provider.instance_id,
        available=True,
    )
    phish = Artist(
        item_id=PHISH_ARTIST_ID,
        provider=provider.instance_id,
        name=PHISH_ARTIST_NAME,
        metadata=MediaItemMetadata(images=UniqueList([thumbnail])),
        provider_mappings={mapping},
    )
    return [phish]
469
470
def _parse_albums(
    provider: MusicProvider,
    search_data: dict[str, Any],
    media_types: list[MediaType],
    contains_search_term: Callable[[str | None], bool],
) -> list[Album]:
    """Convert the shows in a search payload to albums, if albums were requested.

    Shows come from ``exact_show`` (when set), then ``other_shows``, then
    ``venue_shows``, in that order. A show is kept only when its
    ``venue_name`` satisfies ``contains_search_term``.
    """
    if MediaType.ALBUM not in media_types:
        return []

    candidate_shows: list[dict[str, Any]] = []

    # The exact show, when present, goes first
    exact = search_data.get("exact_show")
    if exact:
        candidate_shows.append(exact)

    # Then the remaining shows, followed by the venue shows
    candidate_shows.extend(search_data.get("other_shows", []))
    candidate_shows.extend(search_data.get("venue_shows", []))

    return [
        show_to_album(provider, show)
        for show in candidate_shows
        if contains_search_term(show.get("venue_name", ""))
    ]
502
503
def _parse_tracks(
    provider: MusicProvider,
    search_data: dict[str, Any],
    media_types: list[MediaType],
    contains_search_term: Callable[[str | None], bool],
    strip_performance_indicators: Callable[[str], str],
) -> list[Track]:
    """Convert matching tracks in a search payload, keeping one track per show.

    A track matches when its title, with performance indicators stripped,
    satisfies ``contains_search_term``. Among matches, only the first track of
    each album (show) is kept; tracks without an album are always kept.
    """
    if MediaType.TRACK not in media_types:
        return []

    shows_seen: set[Any] = set()
    results: list[Track] = []

    for raw_track in search_data.get("tracks", []):
        # Match against the base song name, not the annotated title
        base_title = strip_performance_indicators(raw_track.get("title", ""))
        if not contains_search_term(base_title):
            continue

        # Search results carry show fields flat on the track; rebuild the
        # nested show shape the track converter reads.
        show_stub = {
            "date": raw_track.get("show_date"),
            "album_cover_url": raw_track.get("show_album_cover_url"),
            "venue": {"name": raw_track.get("venue_name")},
        }
        converted = track_to_ma_track(provider, raw_track, show_stub)

        # Deduplicate by album - only the first track of each show is returned
        show_id = converted.album.item_id if converted.album else None
        if show_id:
            if show_id in shows_seen:
                continue
            shows_seen.add(show_id)
        results.append(converted)

    return results
542
543
def _parse_playlists(
    provider: MusicProvider,
    search_data: dict[str, Any],
    media_types: list[MediaType],
    contains_search_term: Callable[[str | None], bool],
) -> list[Playlist]:
    """Convert matching playlists in a search payload, if playlists were requested.

    A playlist is kept when its ``name`` satisfies ``contains_search_term``.
    """
    if MediaType.PLAYLIST not in media_types:
        return []

    return [
        playlist_to_ma_playlist(provider, entry)
        for entry in search_data.get("playlists", [])
        if contains_search_term(entry.get("name", ""))
    ]
559