/
/
/
1"""Phish.in Music Provider for Music Assistant."""
2
3from __future__ import annotations
4
5from collections.abc import AsyncGenerator
6from datetime import datetime
7from typing import TYPE_CHECKING, Any
8
9from music_assistant_models.enums import (
10 ContentType,
11 ImageType,
12 MediaType,
13 StreamType,
14)
15from music_assistant_models.errors import MediaNotFoundError, ProviderUnavailableError
16from music_assistant_models.media_items import (
17 Album,
18 Artist,
19 AudioFormat,
20 BrowseFolder,
21 ItemMapping,
22 MediaItemImage,
23 MediaItemMetadata,
24 Playlist,
25 ProviderMapping,
26 SearchResults,
27 Track,
28)
29from music_assistant_models.streamdetails import StreamDetails
30from music_assistant_models.unique_list import UniqueList
31
32from music_assistant.controllers.cache import use_cache
33from music_assistant.models.music_provider import MusicProvider
34
35from .constants import (
36 ENDPOINTS,
37 FALLBACK_ALBUM_IMAGE,
38 MAX_SEARCH_RESULTS,
39 PHISH_ARTIST_ID,
40)
41from .helpers import (
42 api_request,
43 get_phish_artist,
44 parse_search_results,
45 show_to_album,
46 track_to_ma_track,
47)
48
49if TYPE_CHECKING:
50 from collections.abc import Sequence
51
52 from music_assistant_models.media_items import MediaItemType
53
54
55class PhishInProvider(MusicProvider):
56 """Phish.in music provider."""
57
58 @property
59 def is_streaming_provider(self) -> bool:
60 """Return True if the provider is a streaming provider."""
61 return True
62
63 async def search(
64 self,
65 search_query: str,
66 media_types: list[MediaType],
67 limit: int = MAX_SEARCH_RESULTS,
68 ) -> SearchResults:
69 """Perform search on Phish.in."""
70 # Handle "Artist - Track" format by extracting just the track name
71 if " - " in search_query:
72 parts = search_query.split(" - ", 1)
73 if parts[0].strip().lower() in ["phish", "the phish"]:
74 search_query = parts[1].strip()
75
76 if len(search_query.strip()) < 3:
77 return SearchResults()
78
79 try:
80 endpoint = ENDPOINTS["search"].format(term=search_query)
81 search_data = await api_request(
82 self, endpoint, params={"audio_status": "complete_or_partial"}
83 )
84
85 # If we got song matches, fetch all performances of those songs
86 if MediaType.TRACK in media_types and search_data.get("songs"):
87 all_track_results = []
88 for song in search_data.get("songs", [])[:3]: # Limit to first 3 songs
89 song_slug = song.get("slug")
90 if song_slug:
91 tracks_data = await api_request(
92 self,
93 "/tracks",
94 params={
95 "song_slug": song_slug,
96 "audio_status": "complete_or_partial",
97 "per_page": limit,
98 "sort": "likes_count:desc",
99 },
100 )
101 all_track_results.extend(tracks_data.get("tracks", []))
102
103 # Replace with comprehensive song_slug results
104 if all_track_results:
105 search_data["tracks"] = all_track_results[:limit]
106
107 # Handle venue album searches
108 if MediaType.ALBUM in media_types and search_data.get("venues"):
109 venue_shows: list[dict[str, Any]] = []
110 for venue in search_data.get("venues", []):
111 venue_slug = venue["slug"]
112 page = 1
113 while len(venue_shows) < limit:
114 shows_data = await api_request(
115 self, "/shows", params={"venue_slug": venue_slug, "page": page}
116 )
117 shows_on_page = shows_data.get("shows", [])
118 if not shows_on_page:
119 break
120 remaining_slots = limit - len(venue_shows)
121 venue_shows.extend(shows_on_page[:remaining_slots])
122 current_page = shows_data.get("current_page", 1)
123 total_pages = shows_data.get("total_pages", 1)
124 if current_page >= total_pages or len(venue_shows) >= limit:
125 break
126 page += 1
127 if venue_shows:
128 search_data["venue_shows"] = venue_shows
129
130 artists, albums, tracks, playlists = parse_search_results(
131 self, search_data, media_types, search_query.lower()
132 )
133
134 return SearchResults(
135 artists=artists[:limit] if MediaType.ARTIST in media_types else [],
136 albums=albums[:limit] if MediaType.ALBUM in media_types else [],
137 tracks=tracks[:limit] if MediaType.TRACK in media_types else [],
138 playlists=playlists[:limit] if MediaType.PLAYLIST in media_types else [],
139 )
140 except MediaNotFoundError:
141 raise
142 except Exception as err:
143 self.logger.error("Search failed for query '%s': %s", search_query, err)
144 raise ProviderUnavailableError(f"Search error: {err}") from err
145
146 async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
147 """Retrieve library artists from the provider."""
148 yield await get_phish_artist(self)
149
150 async def get_artist(self, prov_artist_id: str) -> Artist:
151 """Get full artist details by id."""
152 if prov_artist_id == PHISH_ARTIST_ID:
153 return await get_phish_artist(self)
154 raise MediaNotFoundError(f"Artist {prov_artist_id} not found")
155
156 @use_cache(expiration=86400) # 24 hours - albums (ie. shows) could update daily
157 async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
158 """Get a list of all albums for the given artist."""
159 if prov_artist_id != PHISH_ARTIST_ID:
160 raise MediaNotFoundError(f"Artist {prov_artist_id} not found")
161
162 albums = []
163 page = 1
164 per_page = 750 # Phish.in limit is 1000 but this caused asyncio warnings
165
166 try:
167 while True:
168 shows_data = await api_request(
169 self,
170 ENDPOINTS["shows"],
171 params={
172 "page": page,
173 "per_page": per_page,
174 "audio_status": "complete_or_partial",
175 },
176 )
177
178 shows = shows_data.get("shows", [])
179 if not shows:
180 break
181
182 for show in shows:
183 if show.get("audio_status") in ["complete", "partial"]:
184 albums.append(show_to_album(self, show))
185
186 if len(shows) < per_page:
187 break
188
189 page += 1
190
191 return albums
192
193 except (MediaNotFoundError, ProviderUnavailableError):
194 raise
195 except Exception as err:
196 self.logger.error("Failed to get artist albums: %s", err)
197 raise ProviderUnavailableError(f"Artist albums error: {err}") from err
198
199 @use_cache(expiration=2592000) # 30 days - Top tracks won't change that often as its voted on
200 async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
201 """Get a list of most popular tracks for the given artist."""
202 if prov_artist_id != PHISH_ARTIST_ID:
203 raise MediaNotFoundError(f"Artist {prov_artist_id} not found")
204
205 try:
206 all_tracks: list[Track] = []
207 page = 1
208 max_pages = 5 # 2500 tracks max for UI performance
209
210 while len(all_tracks) < (max_pages * 500) and page <= max_pages:
211 tracks_data = await api_request(
212 self,
213 ENDPOINTS["tracks"],
214 params={
215 "page": page,
216 "per_page": 500,
217 "sort": "likes_count:desc",
218 "audio_status": "complete_or_partial",
219 },
220 )
221
222 tracks_on_page = tracks_data.get("tracks", [])
223 if not tracks_on_page:
224 break
225
226 for track_data in tracks_on_page:
227 show_data = {
228 "date": track_data.get("show_date"),
229 "album_cover_url": track_data.get("show_album_cover_url"),
230 "venue": {"name": track_data.get("venue_name")},
231 }
232 track = track_to_ma_track(self, track_data, show_data)
233 all_tracks.append(track)
234
235 if len(tracks_on_page) < 50:
236 break
237
238 page += 1
239
240 return all_tracks
241
242 except (MediaNotFoundError, ProviderUnavailableError):
243 raise
244 except Exception as err:
245 self.logger.error("Failed to get artist top tracks: %s", err)
246 raise ProviderUnavailableError(f"Top tracks error: {err}") from err
247
248 @use_cache(expiration=2592000) # 30 days - Show details from specific dates never change
249 async def get_album(self, prov_album_id: str) -> Album:
250 """Get full album details by id (show date)."""
251 try:
252 endpoint = ENDPOINTS["show_by_date"].format(date=prov_album_id)
253 show_data = await api_request(self, endpoint)
254
255 if not show_data:
256 raise MediaNotFoundError(f"Show {prov_album_id} not found")
257
258 return show_to_album(self, show_data)
259
260 except MediaNotFoundError:
261 raise
262 except Exception as err:
263 self.logger.error("Failed to get album %s: %s", prov_album_id, err)
264 raise ProviderUnavailableError(f"Album error: {err}") from err
265
266 @use_cache(expiration=2592000) # 30 days - Individual tracks never change once recorded
267 async def get_track(self, prov_track_id: str) -> Track:
268 """Get full track details by id."""
269 try:
270 endpoint = ENDPOINTS["track_by_id"].format(id=prov_track_id)
271 track_data = await api_request(self, endpoint)
272
273 if not track_data:
274 raise MediaNotFoundError(f"Track {prov_track_id} not found")
275
276 # Extract show data from the track response
277 show_data = track_data.get("show")
278
279 return track_to_ma_track(self, track_data, show_data)
280
281 except MediaNotFoundError:
282 raise
283 except Exception as err:
284 self.logger.error("Failed to get track %s: %s", prov_track_id, err)
285 raise ProviderUnavailableError(f"Track error: {err}") from err
286
287 @use_cache(expiration=2592000) # 30 days - Track listings for historical shows never change
288 async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
289 """Get album tracks for given album id (show date)."""
290 try:
291 endpoint = ENDPOINTS["show_by_date"].format(date=prov_album_id)
292 show_data = await api_request(self, endpoint)
293
294 if not show_data:
295 raise MediaNotFoundError(f"Show {prov_album_id} not found")
296
297 tracks = []
298 for track_data in show_data.get("tracks", []):
299 track = track_to_ma_track(self, track_data, show_data)
300 tracks.append(track)
301
302 return tracks
303
304 except MediaNotFoundError:
305 raise
306 except Exception as err:
307 self.logger.error("Failed to get album tracks for %s: %s", prov_album_id, err)
308 raise ProviderUnavailableError(f"Album tracks error: {err}") from err
309
310 async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
311 """Get streamdetails for a track."""
312 if media_type != MediaType.TRACK:
313 raise MediaNotFoundError(f"Streaming not supported for {media_type}")
314
315 try:
316 track = await self.get_track(item_id)
317
318 mp3_url = None
319 for mapping in track.provider_mappings:
320 if mapping.provider_instance == self.instance_id and mapping.url:
321 mp3_url = mapping.url
322 break
323
324 if not mp3_url:
325 raise MediaNotFoundError(f"No audio URL found for track {item_id}")
326
327 return StreamDetails(
328 provider=self.instance_id,
329 item_id=item_id,
330 audio_format=AudioFormat(
331 content_type=ContentType.MP3,
332 sample_rate=44100,
333 bit_depth=16,
334 channels=2,
335 ),
336 media_type=MediaType.TRACK,
337 stream_type=StreamType.HTTP,
338 path=mp3_url,
339 allow_seek=True,
340 can_seek=True,
341 )
342
343 except (MediaNotFoundError, ProviderUnavailableError):
344 raise
345 except Exception as err:
346 self.logger.error("Failed to get stream details for %s: %s", item_id, err)
347 raise ProviderUnavailableError(f"Stream error: {err}") from err
348
349 @use_cache(expiration=86400) # 24 hours - Current year gets new shows added throughout the year
350 async def _get_years_data(self) -> Any:
351 """Get years data with caching."""
352 return await api_request(self, ENDPOINTS["years"])
353
354 @use_cache(expiration=86400) # 24 hours - recent shows could update daily
355 async def _get_recent_shows(self) -> Any:
356 """Get recent shows with caching."""
357 return await api_request(
358 self,
359 ENDPOINTS["shows"],
360 params={"per_page": 20, "sort": "date:desc", "audio_status": "complete_or_partial"},
361 )
362
363 async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
364 """Retrieve library playlists from the provider."""
365 try:
366 playlists_data = await api_request(
367 self, ENDPOINTS["playlists"], params={"per_page": 100, "sort": "likes_count:desc"}
368 )
369
370 for playlist_data in playlists_data.get("playlists", []):
371 track_count = playlist_data.get("tracks_count", 0)
372 if track_count > 0:
373 playlist_id = str(playlist_data.get("id"))
374
375 metadata = MediaItemMetadata(
376 images=UniqueList(
377 [
378 MediaItemImage(
379 type=ImageType.THUMB,
380 path=FALLBACK_ALBUM_IMAGE,
381 provider=self.instance_id,
382 remotely_accessible=True,
383 )
384 ]
385 )
386 )
387 yield Playlist(
388 item_id=playlist_id,
389 provider=self.instance_id,
390 name=playlist_data.get("name", ""),
391 owner=playlist_data.get("username", ""),
392 is_editable=False,
393 metadata=metadata,
394 provider_mappings={
395 ProviderMapping(
396 item_id=playlist_id,
397 provider_domain=self.domain,
398 provider_instance=self.instance_id,
399 available=True,
400 )
401 },
402 )
403 except (MediaNotFoundError, ProviderUnavailableError):
404 raise
405 except Exception as err:
406 self.logger.error("Failed to get library playlists: %s", err)
407 raise ProviderUnavailableError(f"Library playlists error: {err}") from err
408
409 @use_cache(expiration=86400) # 24 hours - Playlist metadata might be updated by users
410 async def get_playlist(self, prov_playlist_id: str) -> Playlist:
411 """Get full playlist details by id."""
412 try:
413 playlists_data = await api_request(self, ENDPOINTS["playlists"])
414 playlist_slug = None
415 playlist_info = None
416
417 for playlist in playlists_data.get("playlists", []):
418 if str(playlist.get("id")) == prov_playlist_id:
419 playlist_slug = playlist.get("slug")
420 playlist_info = playlist
421 break
422
423 if not playlist_slug or not playlist_info:
424 raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found")
425
426 return Playlist(
427 item_id=prov_playlist_id,
428 provider=self.instance_id,
429 name=playlist_info.get("name", ""),
430 owner=playlist_info.get("username", ""),
431 is_editable=False,
432 provider_mappings={
433 ProviderMapping(
434 item_id=prov_playlist_id,
435 provider_domain=self.domain,
436 provider_instance=self.instance_id,
437 available=True,
438 )
439 },
440 )
441
442 except MediaNotFoundError:
443 raise
444 except Exception as err:
445 self.logger.error("Failed to get playlist %s: %s", prov_playlist_id, err)
446 raise ProviderUnavailableError(f"Playlist error: {err}") from err
447
448 async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
449 """Get playlist tracks for given playlist id."""
450 if page > 0:
451 return []
452 try:
453 playlists_data = await api_request(self, ENDPOINTS["playlists"])
454 playlist_slug = None
455
456 for playlist in playlists_data.get("playlists", []):
457 if str(playlist.get("id")) == prov_playlist_id:
458 playlist_slug = playlist.get("slug")
459 break
460
461 if not playlist_slug:
462 return []
463
464 playlist_data = await api_request(
465 self, ENDPOINTS["playlist_by_slug"].format(slug=playlist_slug)
466 )
467
468 all_tracks = []
469 for entry in playlist_data.get("entries", []):
470 track_data = entry.get("track")
471 if track_data and track_data.get("mp3_url"):
472 track = track_to_ma_track(self, track_data)
473 all_tracks.append(track)
474
475 return all_tracks
476
477 except (MediaNotFoundError, ProviderUnavailableError):
478 raise
479 except Exception as err:
480 self.logger.error("Failed to get playlist tracks for %s: %s", prov_playlist_id, err)
481 raise ProviderUnavailableError(f"Playlist tracks error: {err}") from err
482
483 async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
484 """Browse this provider's items."""
485 path_parts = [] if "://" not in path else path.split("://")[1].split("/")
486 subpath = path_parts[0] if path_parts else ""
487 subsubpath = "/".join(path_parts[1:]) if len(path_parts) > 1 else ""
488
489 if not subpath:
490 return self._browse_root(path)
491
492 if subpath == "playlists":
493 playlists = []
494 async for playlist in self.get_library_playlists():
495 playlists.append(playlist)
496 if len(playlists) >= 50:
497 break
498 return playlists
499 if subpath == "years":
500 return await self._browse_years(path, subsubpath)
501 if subpath == "recent":
502 return await self._browse_recent()
503 if subpath == "random":
504 return await self._browse_random()
505 if subpath == "today":
506 return await self._browse_today()
507 if subpath == "venues":
508 return await self._browse_venues(path, subsubpath)
509 if subpath == "tags":
510 return await self._browse_tags(path, subsubpath)
511 if subpath == "top_shows":
512 return await self._browse_top_shows()
513 if subpath == "top_tracks":
514 return await self._browse_top_tracks()
515
516 return []
517
518 def _browse_root(self, path: str) -> list[BrowseFolder]:
519 """Root level browse options."""
520 return [
521 BrowseFolder(
522 item_id="years",
523 provider=self.domain,
524 path=path + "years",
525 name="Browse by Year",
526 ),
527 BrowseFolder(
528 item_id="today",
529 provider=self.domain,
530 path=path + "today",
531 name="This Day in Phish History",
532 ),
533 BrowseFolder(
534 item_id="recent",
535 provider=self.domain,
536 path=path + "recent",
537 name="Recent Shows",
538 ),
539 BrowseFolder(
540 item_id="venues",
541 provider=self.domain,
542 path=path + "venues",
543 name="Browse by Venue",
544 ),
545 BrowseFolder(
546 item_id="tags",
547 provider=self.domain,
548 path=path + "tags",
549 name="Browse by Tag",
550 ),
551 BrowseFolder(
552 item_id="playlists",
553 provider=self.domain,
554 path=path + "playlists",
555 name="User Playlists",
556 ),
557 BrowseFolder(
558 item_id="top_shows",
559 provider=self.domain,
560 path=path + "top_shows",
561 name="Top 46 Shows",
562 ),
563 BrowseFolder(
564 item_id="top_tracks",
565 provider=self.domain,
566 path=path + "top_tracks",
567 name="Top 46 Tracks",
568 ),
569 BrowseFolder(
570 item_id="random",
571 provider=self.domain,
572 path=path + "random",
573 name="Random Show",
574 ),
575 ]
576
577 async def _browse_years(self, path: str, subsubpath: str) -> list[BrowseFolder | Album]:
578 """Browse shows by year/period."""
579 if not subsubpath:
580 try:
581 years_data = await self._get_years_data()
582 folders: list[BrowseFolder | Album] = []
583
584 for year_data in years_data:
585 period = year_data.get("period")
586 show_count = year_data.get("shows_count", 0)
587 if period and show_count > 0:
588 folders.append(
589 BrowseFolder(
590 item_id=f"period_{period}",
591 provider=self.domain,
592 path=f"phishin://years/{period}",
593 name=f"{period} ({show_count} shows)",
594 )
595 )
596
597 return sorted(folders, key=lambda x: x.name, reverse=True)
598
599 except (MediaNotFoundError, ProviderUnavailableError):
600 raise
601 except Exception as err:
602 self.logger.error("Failed to browse years: %s", err)
603 raise ProviderUnavailableError(f"Browse years error: {err}") from err
604 else:
605 return await self._get_shows_for_period(subsubpath)
606
607 async def _browse_recent(self) -> list[Album]:
608 """Get recent shows."""
609 try:
610 shows_data = await self._get_recent_shows()
611 albums: list[Album] = []
612
613 for show in shows_data.get("shows", []):
614 if show.get("audio_status") in ["complete", "partial"]:
615 album = show_to_album(self, show)
616 albums.append(album)
617
618 return albums
619
620 except (MediaNotFoundError, ProviderUnavailableError):
621 raise
622 except Exception as err:
623 self.logger.error("Failed to browse recent shows: %s", err)
624 raise ProviderUnavailableError(f"Browse recent error: {err}") from err
625
626 async def _browse_random(self) -> list[Album]:
627 """Get a random show."""
628 try:
629 show_data = await api_request(self, ENDPOINTS["random_show"])
630 if show_data and show_data.get("audio_status") in ["complete", "partial"]:
631 album = show_to_album(self, show_data)
632 return [album]
633 return []
634
635 except (MediaNotFoundError, ProviderUnavailableError):
636 raise
637 except Exception as err:
638 self.logger.error("Failed to get random show: %s", err)
639 raise ProviderUnavailableError(f"Random show error: {err}") from err
640
641 @use_cache(expiration=21600) # 6 hours - today's shows are historical but queried daily
642 async def _browse_today(self) -> list[Album]:
643 """Get shows that happened on this day in history."""
644 try:
645 today = datetime.now()
646 target_date = today.strftime("%Y-%m-%d")
647
648 shows_data = await api_request(
649 self,
650 ENDPOINTS["shows_day_of_year"].format(date=target_date),
651 params={"audio_status": "complete_or_partial", "sort": "date:desc"},
652 )
653
654 albums: list[Album] = []
655 shows = shows_data.get("shows", [])
656
657 for show in shows:
658 if show and show.get("audio_status") in ["complete", "partial"]:
659 album = show_to_album(self, show)
660 albums.append(album)
661
662 return albums
663
664 except MediaNotFoundError:
665 self.logger.info("No shows found for %s", today.strftime("%B %d"))
666 return []
667 except ProviderUnavailableError:
668 raise
669 except Exception as err:
670 self.logger.error("Failed to get today's shows: %s", err)
671 raise ProviderUnavailableError(f"Today's shows error: {err}") from err
672
673 @use_cache(expiration=604800) # 7 days - venue list changes rarely
674 async def _browse_venues(self, path: str, subsubpath: str) -> list[BrowseFolder | Album]:
675 """Browse shows by venue."""
676 if not subsubpath:
677 try:
678 venues_data = await api_request(
679 self, ENDPOINTS["venues"], params={"per_page": 100, "sort": "shows_count:desc"}
680 )
681
682 folders: list[BrowseFolder | Album] = []
683 for venue in venues_data.get("venues", []):
684 audio_count = venue.get("shows_with_audio_count", 0)
685 if audio_count > 0:
686 folders.append(
687 BrowseFolder(
688 item_id=f"venue_{venue.get('slug')}",
689 provider=self.domain,
690 path=f"phishin://venues/{venue.get('slug')}",
691 name=f"{venue.get('name')} ({audio_count} shows)",
692 )
693 )
694
695 return folders[:50]
696
697 except (MediaNotFoundError, ProviderUnavailableError):
698 raise
699 except Exception as err:
700 self.logger.error("Failed to browse venues: %s", err)
701 raise ProviderUnavailableError(f"Browse venues error: {err}") from err
702 else:
703 return await self._get_shows_for_venue(subsubpath)
704
705 @use_cache(expiration=604800) # 7 days - tags list changes rarely
706 async def _browse_tags(self, path: str, subsubpath: str) -> list[BrowseFolder | Album | Track]:
707 """Browse shows and tracks by tag."""
708 if not subsubpath:
709 try:
710 tags_data = await api_request(self, ENDPOINTS["tags"])
711
712 folders: list[BrowseFolder | Album | Track] = []
713 for tag in tags_data:
714 track_count = tag.get("tracks_count", 0)
715 show_count = tag.get("shows_count", 0)
716 if track_count > 0 or show_count > 0:
717 count_str = (
718 f"{show_count} shows, {track_count} tracks"
719 if show_count > 0
720 else f"{track_count} tracks"
721 )
722 folders.append(
723 BrowseFolder(
724 item_id=f"tag_{tag.get('slug')}",
725 provider=self.domain,
726 path=f"phishin://tags/{tag.get('slug')}",
727 name=f"{tag.get('name')} ({count_str})",
728 )
729 )
730
731 return sorted(folders, key=lambda x: x.name)
732
733 except (MediaNotFoundError, ProviderUnavailableError):
734 raise
735 except Exception as err:
736 self.logger.error("Failed to browse tags: %s", err)
737 raise ProviderUnavailableError(f"Browse tags error: {err}") from err
738
739 elif "/" not in subsubpath:
740 tag_slug = subsubpath
741 try:
742 tags_data = await api_request(self, ENDPOINTS["tags"])
743 tag_info: dict[str, Any] = next(
744 (tag for tag in tags_data if tag.get("slug") == tag_slug), {}
745 )
746 tag_name = tag_info.get("name", tag_slug)
747 show_count = tag_info.get("shows_count", 0)
748 track_count = tag_info.get("tracks_count", 0)
749
750 subfolders: list[BrowseFolder | Album | Track] = []
751
752 if show_count > 0:
753 subfolders.append(
754 BrowseFolder(
755 item_id=f"tag_shows_{tag_slug}",
756 provider=self.domain,
757 path=f"phishin://tags/{tag_slug}/shows",
758 name=f"Shows with {tag_name} ({show_count})",
759 )
760 )
761
762 if track_count > 0:
763 subfolders.append(
764 BrowseFolder(
765 item_id=f"tag_tracks_{tag_slug}",
766 provider=self.domain,
767 path=f"phishin://tags/{tag_slug}/tracks",
768 name=f"All {tag_name} Tracks ({track_count})",
769 )
770 )
771
772 return subfolders
773
774 except (MediaNotFoundError, ProviderUnavailableError):
775 raise
776 except Exception as err:
777 self.logger.error("Failed to get tag subfolders: %s", err)
778 raise ProviderUnavailableError(f"Tag subfolders error: {err}") from err
779 else:
780 tag_slug, content_type = subsubpath.split("/", 1)
781 if content_type == "shows":
782 return await self._get_shows_for_tag(tag_slug)
783 if content_type == "tracks":
784 return await self._get_tracks_for_tag(tag_slug)
785 return []
786
787 @use_cache(expiration=86400) # 24 hours - Tag associations could change as new shows are tagged
788 async def _get_tracks_for_tag(self, tag_slug: str) -> list[BrowseFolder | Album | Track]:
789 """Get tracks for a specific tag."""
790 try:
791 tracks_data = await api_request(
792 self,
793 ENDPOINTS["tracks"],
794 params={
795 "tag_slug": tag_slug,
796 "per_page": 100,
797 "audio_status": "complete_or_partial",
798 "sort": "likes_count:desc",
799 },
800 )
801
802 tracks: list[BrowseFolder | Album | Track] = []
803 for track_data in tracks_data.get("tracks", []):
804 if track_data.get("mp3_url"):
805 track = track_to_ma_track(self, track_data)
806 tracks.append(track)
807
808 return tracks
809
810 except (MediaNotFoundError, ProviderUnavailableError):
811 raise
812 except Exception as err:
813 self.logger.error("Failed to get tracks for tag %s: %s", tag_slug, err)
814 raise ProviderUnavailableError(f"Tag tracks error: {err}") from err
815
816 async def _browse_top_shows(self) -> list[Album]:
817 """Get top 46 most liked shows."""
818 try:
819 shows_data = await api_request(
820 self,
821 ENDPOINTS["shows"],
822 params={
823 "per_page": 46,
824 "sort": "likes_count:desc",
825 "audio_status": "complete_or_partial",
826 },
827 )
828
829 albums: list[Album] = []
830 for show in shows_data.get("shows", []):
831 if show.get("audio_status") in ["complete", "partial"]:
832 album = show_to_album(self, show)
833 albums.append(album)
834
835 return albums
836
837 except (MediaNotFoundError, ProviderUnavailableError):
838 raise
839 except Exception as err:
840 self.logger.error("Failed to get top shows: %s", err)
841 raise ProviderUnavailableError(f"Top shows error: {err}") from err
842
843 async def _browse_top_tracks(self) -> list[Track]:
844 """Get top 46 most liked tracks."""
845 try:
846 tracks_data = await api_request(
847 self,
848 ENDPOINTS["tracks"],
849 params={
850 "per_page": 46,
851 "sort": "likes_count:desc",
852 "audio_status": "complete_or_partial",
853 },
854 )
855
856 tracks: list[Track] = []
857 for track_data in tracks_data.get("tracks", []):
858 if track_data.get("mp3_url"):
859 track = track_to_ma_track(self, track_data)
860 tracks.append(track)
861
862 return tracks
863
864 except (MediaNotFoundError, ProviderUnavailableError):
865 raise
866 except Exception as err:
867 self.logger.error("Failed to get top tracks: %s", err)
868 raise ProviderUnavailableError(f"Top tracks error: {err}") from err
869
870 @use_cache(expiration=86400) # 24 hours - Shows can be added to the current year
871 async def _get_shows_for_period(self, period: str) -> list[BrowseFolder | Album]:
872 """Get shows for a specific year or period."""
873 try:
874 if "-" in period and len(period.split("-")) == 2:
875 params = {
876 "year_range": period,
877 "per_page": 100,
878 "audio_status": "complete_or_partial",
879 }
880 else:
881 params = {
882 "year": period,
883 "per_page": 100,
884 "audio_status": "complete_or_partial",
885 }
886
887 shows_data = await api_request(self, ENDPOINTS["shows"], params=params)
888
889 albums: list[BrowseFolder | Album] = []
890 for show in shows_data.get("shows", []):
891 if show.get("audio_status") in ["complete", "partial"]:
892 album = show_to_album(self, show)
893 albums.append(album)
894
895 return sorted(albums, key=lambda x: x.name)
896
897 except (MediaNotFoundError, ProviderUnavailableError):
898 raise
899 except Exception as err:
900 self.logger.error("Failed to browse period %s: %s", period, err)
901 raise ProviderUnavailableError(f"Browse period error: {err}") from err
902
903 @use_cache(expiration=86400) # 24 hours - Venues might get new shows added
904 async def _get_shows_for_venue(self, venue_slug: str) -> list[BrowseFolder | Album]:
905 """Get shows for a specific venue."""
906 try:
907 shows_data = await api_request(
908 self,
909 ENDPOINTS["shows"],
910 params={
911 "venue_slug": venue_slug,
912 "per_page": 100,
913 "audio_status": "complete_or_partial",
914 "sort": "date:desc",
915 },
916 )
917
918 albums: list[BrowseFolder | Album] = []
919 for show in shows_data.get("shows", []):
920 if show.get("audio_status") in ["complete", "partial"]:
921 album = show_to_album(self, show)
922 albums.append(album)
923
924 return albums
925
926 except (MediaNotFoundError, ProviderUnavailableError):
927 raise
928 except Exception as err:
929 self.logger.error("Failed to get shows for venue %s: %s", venue_slug, err)
930 raise ProviderUnavailableError(f"Venue shows error: {err}") from err
931
932 @use_cache(expiration=86400) # 24 hours - Tag associations could change as new shows are tagged
933 async def _get_shows_for_tag(self, tag_slug: str) -> list[BrowseFolder | Album | Track]:
934 """Get shows for a specific tag."""
935 try:
936 shows_data = await api_request(
937 self,
938 ENDPOINTS["shows"],
939 params={
940 "tag_slug": tag_slug,
941 "per_page": 100,
942 "audio_status": "complete_or_partial",
943 "sort": "date:desc",
944 },
945 )
946
947 albums: list[BrowseFolder | Album | Track] = []
948 for show in shows_data.get("shows", []):
949 if show.get("audio_status") in ["complete", "partial"]:
950 album = show_to_album(self, show)
951 albums.append(album)
952
953 return albums
954
955 except (MediaNotFoundError, ProviderUnavailableError):
956 raise
957 except Exception as err:
958 self.logger.error("Failed to get shows for tag %s: %s", tag_slug, err)
959 raise ProviderUnavailableError(f"Tag shows error: {err}") from err
960