/
/
/
1"""Podcast Index provider implementation."""
2
3from __future__ import annotations
4
5from collections.abc import AsyncGenerator, Sequence
6from typing import Any, cast
7
8import aiohttp
9from music_assistant_models.enums import ContentType, MediaType, StreamType
10from music_assistant_models.errors import (
11 InvalidDataError,
12 LoginFailed,
13 MediaNotFoundError,
14 ProviderUnavailableError,
15)
16from music_assistant_models.media_items import (
17 AudioFormat,
18 BrowseFolder,
19 MediaItemType,
20 Podcast,
21 PodcastEpisode,
22 SearchResults,
23)
24from music_assistant_models.streamdetails import StreamDetails
25
26from music_assistant.constants import VERBOSE_LOG_LEVEL
27from music_assistant.controllers.cache import use_cache
28from music_assistant.models.music_provider import MusicProvider
29
30from .constants import (
31 BROWSE_CATEGORIES,
32 BROWSE_RECENT,
33 BROWSE_TRENDING,
34 CONF_API_KEY,
35 CONF_API_SECRET,
36 CONF_STORED_PODCASTS,
37)
38from .helpers import make_api_request, parse_episode_from_data, parse_podcast_from_feed
39
40
class PodcastIndexProvider(MusicProvider):
    """Podcast Index provider for Music Assistant.

    Talks to the Podcast Index API through ``make_api_request`` using the
    configured API key and secret. Offers podcast search, a small browse tree
    (trending podcasts, recent episodes, categories) and a "library" that is
    persisted as a list of RSS feed URLs under ``CONF_STORED_PODCASTS``.

    Episode item IDs have the form ``"<podcast_id>|<episode_id>"``.
    """

    api_key: str = ""
    api_secret: str = ""

    async def handle_async_init(self) -> None:
        """Load the credentials from config and verify them against the API.

        Raises:
            LoginFailed: credentials are missing, or the test request failed
                for a reason other than connectivity.
            ProviderUnavailableError: the API could not be reached or timed out.
        """
        # ``str(None)`` is the truthy string "None"; coerce unset values to ""
        # first so the emptiness check below can actually trigger.
        self.api_key = str(self.config.get_value(CONF_API_KEY) or "")
        self.api_secret = str(self.config.get_value(CONF_API_SECRET) or "")

        if not self.api_key or not self.api_secret:
            raise LoginFailed("API key and secret are required")

        # Test API connection
        try:
            await self._api_request("stats/current")
        except (LoginFailed, ProviderUnavailableError):
            # These already carry the right context - pass them through.
            raise
        except aiohttp.ClientConnectorError as err:
            raise ProviderUnavailableError(
                f"Failed to connect to Podcast Index API: {err}"
            ) from err
        except (aiohttp.ServerTimeoutError, TimeoutError) as err:
            # Any timeout is an availability problem, not a credential problem.
            raise ProviderUnavailableError(f"Podcast Index API timeout: {err}") from err
        except Exception as err:
            raise LoginFailed(f"Failed to connect to API: {err}") from err

    async def search(
        self, search_query: str, media_types: list[MediaType], limit: int = 10
    ) -> SearchResults:
        """Search Podcast Index for podcasts matching ``search_query``.

        Only ``MediaType.PODCAST`` is searched; when it is not requested an
        empty ``SearchResults`` is returned without calling the API.
        """
        result = SearchResults()
        if MediaType.PODCAST not in media_types:
            return result

        result.podcasts = await self._fetch_podcasts(
            "search/byterm", {"q": search_query, "max": limit}
        )
        return result

    async def browse(self, path: str) -> Sequence[BrowseFolder | Podcast | PodcastEpisode]:
        """Browse this provider's items.

        ``path`` is expected to look like ``"<instance_id>://<section>[/<category>]"``.
        The bare root lists the three top-level folders; unknown paths yield
        an empty list.
        """
        base = f"{self.instance_id}://"

        if path == base:
            # Return main browse categories
            root_folders = (
                (BROWSE_TRENDING, "Trending Podcasts"),
                (BROWSE_RECENT, "Recent Episodes"),
                (BROWSE_CATEGORIES, "Categories"),
            )
            return [
                BrowseFolder(
                    item_id=folder_id,
                    provider=self.domain,
                    path=f"{base}{folder_id}",
                    name=folder_name,
                )
                for folder_id, folder_name in root_folders
            ]

        if not path.startswith(base):
            return []

        # Everything after the first "/" is the category name. It is kept
        # whole because _browse_categories embeds the name verbatim in the path.
        section, _, category_name = path[len(base) :].partition("/")

        if section == BROWSE_TRENDING:
            return await self._browse_trending()
        if section == BROWSE_RECENT:
            return await self._browse_recent_episodes()
        if section == BROWSE_CATEGORIES:
            if category_name:
                return await self._browse_category_podcasts(category_name)
            # No (or empty) category part: list the categories themselves.
            return await self._browse_categories()

        return []

    async def library_add(self, item: MediaItemType) -> bool:
        """Add a podcast to the library.

        Looks up the podcast's RSS feed URL and appends it to the stored
        podcasts configuration. Non-podcast items are delegated to the base
        class.

        Returns:
            True when the feed URL was added; False when it was already
            stored or no feed URL could be determined.
        """
        # Only handle podcasts - delegate others to base class
        if not isinstance(item, Podcast):
            return await super().library_add(item)

        # Get the RSS URL from the podcast via API
        try:
            feed_url = await self._get_feed_url_for_podcast(item.item_id)
        except Exception as err:
            self.logger.warning(
                "Failed to retrieve feed URL for podcast %s: %s", item.name, err, exc_info=True
            )
            return False

        if not feed_url:
            self.logger.warning(
                "No feed URL found for podcast %s (ID: %s)", item.name, item.item_id
            )
            return False

        # Read the list only after the await above so it is not stale, and
        # tolerate an unset value.
        stored_podcasts = cast("list[str]", self.config.get_value(CONF_STORED_PODCASTS) or [])
        if feed_url in stored_podcasts:
            return False

        self.logger.debug("Adding podcast %s to library", item.name)
        # Store a new list instead of mutating the object handed out by the
        # config, matching what library_remove does.
        self._update_config_value(CONF_STORED_PODCASTS, [*stored_podcasts, feed_url])
        return True

    async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool:
        """Remove a podcast from the library.

        Removes the podcast's RSS feed URL from the stored podcasts
        configuration. The operation is idempotent: it always returns True,
        also when the feed URL cannot be retrieved (a warning is logged) or
        was not stored in the first place.
        """
        # Get the RSS URL for this podcast
        try:
            feed_url = await self._get_feed_url_for_podcast(prov_item_id)
        except Exception as err:
            self.logger.warning(
                "Failed to retrieve feed URL for podcast removal %s: %s",
                prov_item_id,
                err,
                exc_info=True,
            )
            # Still return True for idempotent operation
            return True

        # Read after the await so the list is current; tolerate an unset value.
        stored_podcasts = cast("list[str]", self.config.get_value(CONF_STORED_PODCASTS) or [])
        if not feed_url or feed_url not in stored_podcasts:
            return True

        self.logger.debug("Removing podcast %s from library", prov_item_id)
        remaining = [url for url in stored_podcasts if url != feed_url]
        self._update_config_value(CONF_STORED_PODCASTS, remaining)
        return True

    @use_cache(3600 * 24 * 14)  # Cache for 14 days
    async def get_podcast(self, prov_podcast_id: str) -> Podcast:
        """Get podcast details by Podcast Index feed ID.

        Raises:
            MediaNotFoundError: no usable feed was returned, or an unexpected
                error occurred (logged at debug level).
            ProviderUnavailableError, InvalidDataError: passed through from
                the API request.
        """
        try:
            response = await self._api_request("podcasts/byfeedid", params={"id": prov_podcast_id})
            feed_data = response.get("feed")
            if feed_data:
                podcast = parse_podcast_from_feed(feed_data, self.instance_id, self.domain)
                if podcast:
                    return podcast
        except (ProviderUnavailableError, InvalidDataError):
            # Re-raise these specific errors
            raise
        except Exception as err:
            self.logger.debug("Unexpected error getting podcast %s: %s", prov_podcast_id, err)

        raise MediaNotFoundError(f"Podcast {prov_podcast_id} not found")

    async def _resolve_podcast_name(self, prov_podcast_id: str) -> str | None:
        """Best-effort lookup of a podcast's title: MA first, then the API."""
        podcast_name = None
        try:
            podcast = await self.mass.music.podcasts.get_provider_item(
                prov_podcast_id, self.instance_id
            )
            if podcast:
                podcast_name = podcast.name
                self.logger.debug("Got podcast name from MA context: %s", podcast_name)
        except Exception as err:
            self.logger.debug("Could not get podcast from MA context: %s", err)

        if podcast_name:
            return podcast_name

        # If we don't have the name, get it from the API
        try:
            podcast_response = await self._api_request(
                "podcasts/byfeedid", params={"id": prov_podcast_id}
            )
            if podcast_response.get("feed"):
                podcast_name = podcast_response["feed"].get("title")
                self.logger.debug("Got podcast name from API fallback: %s", podcast_name)
        except Exception as err:
            self.logger.warning("Could not get podcast name from API: %s", err)

        return podcast_name

    async def get_podcast_episodes(
        self, prov_podcast_id: str
    ) -> AsyncGenerator[PodcastEpisode, None]:
        """Yield the episodes of a podcast (up to 1000 are requested).

        ``ProviderUnavailableError`` and ``InvalidDataError`` propagate; any
        other error is logged as a warning and ends the iteration.
        """
        self.logger.debug("Getting episodes for podcast ID: %s", prov_podcast_id)

        podcast_name = await self._resolve_podcast_name(prov_podcast_id)

        try:
            response = await self._api_request(
                "episodes/byfeedid", params={"id": prov_podcast_id, "max": 1000}
            )

            # "items" may be absent or null; treat both as "no episodes".
            for idx, episode_data in enumerate(response.get("items") or []):
                episode = parse_episode_from_data(
                    episode_data,
                    prov_podcast_id,
                    idx,
                    self.instance_id,
                    self.domain,
                    podcast_name,
                )
                if episode:
                    yield episode

        except (ProviderUnavailableError, InvalidDataError):
            # Re-raise these specific errors
            raise
        except Exception as err:
            self.logger.warning(
                "Unexpected error getting episodes for %s: %s", prov_podcast_id, err
            )

    @staticmethod
    def _split_episode_id(prov_episode_id: str) -> tuple[str, str]:
        """Split ``"<podcast_id>|<episode_id>"``; raise InvalidDataError if malformed."""
        podcast_id, separator, episode_id = prov_episode_id.partition("|")
        if not separator:
            raise InvalidDataError(f"Invalid episode ID format: {prov_episode_id}")
        return podcast_id, episode_id

    @use_cache(43200)  # Cache for 12 hours
    async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode:
        """Get podcast episode details via the ``episodes/byid`` endpoint.

        Raises:
            InvalidDataError: ``prov_episode_id`` is not ``"<podcast_id>|<episode_id>"``
                (or passed through from the API request).
            ProviderUnavailableError: passed through from the API request.
            MediaNotFoundError: the episode was not returned or could not be
                parsed, or an unexpected error occurred (logged as warning).
        """
        # Validate the ID outside the try block so that only a malformed ID,
        # and not some unrelated ValueError raised further down (for example
        # a JSON decode error), is reported as an invalid ID format.
        podcast_id, episode_id = self._split_episode_id(prov_episode_id)

        try:
            response = await self._api_request("episodes/byid", params={"id": episode_id})
            episode_data = response.get("episode")

            if episode_data:
                episode = parse_episode_from_data(
                    episode_data, podcast_id, 0, self.instance_id, self.domain
                )
                if episode:
                    return episode

        except (ProviderUnavailableError, InvalidDataError):
            # Re-raise these specific errors
            raise
        except Exception as err:
            self.logger.warning("Unexpected error getting episode %s: %s", prov_episode_id, err)

        raise MediaNotFoundError(f"Episode {prov_episode_id} not found")

    async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
        """Get stream details for a podcast episode.

        Looks the episode up directly through ``episodes/byid`` and streams
        its ``enclosureUrl`` over HTTP. The content type is taken from
        ``enclosureType``, defaulting to ``"audio/mpeg"``.

        Raises:
            MediaNotFoundError: ``media_type`` is not a podcast episode, the
                episode has no enclosure URL, or an unexpected error occurred.
            InvalidDataError: ``item_id`` is not ``"<podcast_id>|<episode_id>"``
                (or passed through from the API request).
            ProviderUnavailableError: passed through from the API request.
        """
        if media_type != MediaType.PODCAST_EPISODE:
            raise MediaNotFoundError("Stream details only available for episodes")

        # Validated outside the try block; see get_podcast_episode for why.
        _, episode_id = self._split_episode_id(item_id)

        try:
            # Use direct episode lookup for efficiency
            response = await self._api_request("episodes/byid", params={"id": episode_id})
            episode_data = response.get("episode") or {}
            stream_url = episode_data.get("enclosureUrl")

            if stream_url:
                return StreamDetails(
                    provider=self.instance_id,
                    item_id=item_id,
                    audio_format=AudioFormat(
                        content_type=ContentType.try_parse(
                            episode_data.get("enclosureType") or "audio/mpeg"
                        ),
                    ),
                    media_type=MediaType.PODCAST_EPISODE,
                    stream_type=StreamType.HTTP,
                    path=stream_url,
                    allow_seek=True,
                )

        except (ProviderUnavailableError, InvalidDataError):
            # Re-raise these specific errors
            raise
        except Exception as err:
            self.logger.warning("Unexpected error getting stream for %s: %s", item_id, err)

        raise MediaNotFoundError(f"Stream not found for {item_id}")

    async def get_item(self, media_type: MediaType, prov_item_id: str) -> Podcast | PodcastEpisode:
        """Get a single podcast or podcast episode from the provider.

        Raises:
            MediaNotFoundError: ``media_type`` is neither a podcast nor a
                podcast episode.
        """
        if media_type == MediaType.PODCAST:
            return await self.get_podcast(prov_item_id)
        if media_type == MediaType.PODCAST_EPISODE:
            return await self.get_podcast_episode(prov_item_id)
        raise MediaNotFoundError(f"Media type {media_type} not supported by this provider")

    async def _fetch_podcasts(
        self, endpoint: str, params: dict[str, Any] | None = None
    ) -> list[Podcast]:
        """Call ``endpoint`` and parse every entry of its ``feeds`` array.

        Entries that ``parse_podcast_from_feed`` rejects (falsy result) are
        skipped.
        """
        response = await self._api_request(endpoint, params)
        # "feeds" may be absent or null; treat both as "no results".
        parsed = (
            parse_podcast_from_feed(feed_data, self.instance_id, self.domain)
            for feed_data in response.get("feeds") or []
        )
        return [podcast for podcast in parsed if podcast]

    async def _api_request(
        self, endpoint: str, params: dict[str, Any] | None = None
    ) -> dict[str, Any]:
        """Make an authenticated request to the Podcast Index API."""
        self.logger.log(
            VERBOSE_LOG_LEVEL, "Making API request to %s with params: %s", endpoint, params
        )
        return await make_api_request(self.mass, self.api_key, self.api_secret, endpoint, params)

    async def _get_feed_url_for_podcast(self, podcast_id: str) -> str | None:
        """Get the RSS feed URL for a podcast ID, or None when unavailable.

        ``ProviderUnavailableError`` and ``InvalidDataError`` propagate; any
        other error is logged as a warning and results in None.
        """
        try:
            response = await self._api_request("podcasts/byfeedid", params={"id": podcast_id})
            feed_data = response.get("feed")
            # NOTE(review): the API appears to answer unknown IDs with a
            # non-dict "feed" (e.g. an empty list) - confirm. Either way that
            # is "not found", not an error worth a traceback.
            if not isinstance(feed_data, dict):
                return None
            return feed_data.get("url")
        except (ProviderUnavailableError, InvalidDataError):
            # Re-raise these specific errors
            raise
        except Exception as err:
            self.logger.warning(
                "Unexpected error getting feed URL for podcast %s: %s",
                podcast_id,
                err,
                exc_info=True,
            )
            return None

    @use_cache(7200)  # Cache for 2 hours
    async def _browse_trending(self) -> list[Podcast]:
        """Browse trending podcasts (up to 50 are requested)."""
        try:
            return await self._fetch_podcasts("podcasts/trending", {"max": 50})
        except (ProviderUnavailableError, InvalidDataError):
            raise
        except Exception as err:
            self.logger.warning(
                "Unexpected error getting trending podcasts: %s", err, exc_info=True
            )
            return []

    @use_cache(14400)  # Cache for 4 hours
    async def _browse_recent_episodes(self) -> list[PodcastEpisode]:
        """Browse recent episodes across all podcasts (up to 50 are requested)."""
        try:
            response = await self._api_request("recent/episodes", params={"max": 50})

            episodes = []
            # "items" may be absent or null; treat both as "no episodes".
            for idx, episode_data in enumerate(response.get("items") or []):
                episode = parse_episode_from_data(
                    episode_data,
                    # The owning podcast's ID comes from the episode itself.
                    str(episode_data.get("feedId", "")),
                    idx,
                    self.instance_id,
                    self.domain,
                    # Pass feedTitle to avoid unnecessary API calls
                    episode_data.get("feedTitle"),
                )
                if episode:
                    episodes.append(episode)

            return episodes

        except (ProviderUnavailableError, InvalidDataError):
            # Re-raise these specific errors
            raise
        except Exception as err:
            self.logger.warning("Unexpected error getting recent episodes: %s", err, exc_info=True)
            return []

    @use_cache(86400)  # Cache for 24 hours
    async def _browse_categories(self) -> list[BrowseFolder]:
        """Browse podcast categories as folders, sorted by name.

        The category name doubles as the folder's item ID and as the last
        part of its browse path.
        """
        try:
            response = await self._api_request("categories/list")

            categories = []
            # Categories API returns feeds array with {id, name} objects
            for category in response.get("feeds") or []:
                cat_name = category.get("name", "Unknown Category")
                categories.append(
                    BrowseFolder(
                        item_id=cat_name,  # Use name as ID
                        provider=self.domain,
                        path=f"{self.instance_id}://{BROWSE_CATEGORIES}/{cat_name}",
                        name=cat_name,
                    )
                )

            return sorted(categories, key=lambda folder: folder.name)

        except (ProviderUnavailableError, InvalidDataError):
            # Re-raise these specific errors
            raise
        except Exception as err:
            self.logger.warning("Unexpected error getting categories: %s", err, exc_info=True)
            return []

    @use_cache(43200)  # Cache for 12 hours
    async def _browse_category_podcasts(self, category_name: str) -> list[Podcast]:
        """Browse podcasts of a category by searching for the category name."""
        try:
            # Search for podcasts using the category name directly
            return await self._fetch_podcasts("search/byterm", {"q": category_name, "max": 50})
        except (ProviderUnavailableError, InvalidDataError):
            raise
        except Exception as err:
            self.logger.warning(
                "Unexpected error getting category podcasts: %s", err, exc_info=True
            )
            return []
509