/
/
/
1"""iTunes Podcast search support for MusicAssistant."""
2
3from __future__ import annotations
4
5from collections.abc import AsyncGenerator
6from pathlib import Path
7from typing import TYPE_CHECKING, Any
8
9import aiofiles
10import orjson
11from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption
12from music_assistant_models.enums import (
13 ConfigEntryType,
14 ContentType,
15 ImageType,
16 MediaType,
17 ProviderFeature,
18 StreamType,
19)
20from music_assistant_models.errors import MediaNotFoundError
21from music_assistant_models.media_items import (
22 AudioFormat,
23 MediaItemImage,
24 Podcast,
25 PodcastEpisode,
26 ProviderMapping,
27 RecommendationFolder,
28 SearchResults,
29 UniqueList,
30)
31from music_assistant_models.streamdetails import StreamDetails
32
33from music_assistant.controllers.cache import use_cache
34from music_assistant.helpers.podcast_parsers import (
35 get_podcastparser_dict,
36 parse_podcast,
37 parse_podcast_episode,
38)
39from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
40from music_assistant.models.music_provider import MusicProvider
41from music_assistant.providers.itunes_podcasts.schema import (
42 ITunesSearchResults,
43 PodcastSearchResult,
44 TopPodcastsHelper,
45 TopPodcastsResponse,
46)
47
48if TYPE_CHECKING:
49 from music_assistant_models.config_entries import ConfigValueType, ProviderConfig
50 from music_assistant_models.provider import ProviderManifest
51
52 from music_assistant.mass import MusicAssistant
53 from music_assistant.models import ProviderInstanceType
54
55
56CONF_LOCALE = "locale"
57CONF_EXPLICIT = "explicit"
58CONF_NUM_EPISODES = "num_episodes"
59
60CACHE_CATEGORY_PODCASTS = 0
61CACHE_CATEGORY_RECOMMENDATIONS = 1
62CACHE_KEY_TOP_PODCASTS = "top-podcasts"
63
64SUPPORTED_FEATURES = {ProviderFeature.SEARCH, ProviderFeature.RECOMMENDATIONS}
65
66
67async def setup(
68 mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
69) -> ProviderInstanceType:
70 """Initialize provider(instance) with given configuration."""
71 return ITunesPodcastsProvider(mass, manifest, config, SUPPORTED_FEATURES)
72
73
74async def get_config_entries(
75 mass: MusicAssistant,
76 instance_id: str | None = None,
77 action: str | None = None,
78 values: dict[str, ConfigValueType] | None = None,
79) -> tuple[ConfigEntry, ...]:
80 """
81 Return Config entries to setup this provider.
82
83 instance_id: id of an existing provider instance (None if new instance setup).
84 action: [optional] action key called from config entries UI.
85 values: the (intermediate) raw values for config entries sent with the action.
86 """
87 # ruff: noqa: ARG001
88 json_path = Path(__file__).parent / "itunes_country_codes.json"
89 async with aiofiles.open(json_path) as f:
90 country_codes = orjson.loads(await f.read())
91
92 language_options = [ConfigValueOption(val, key.lower()) for key, val in country_codes.items()]
93 return (
94 ConfigEntry(
95 key=CONF_LOCALE,
96 type=ConfigEntryType.STRING,
97 label="Country",
98 required=True,
99 options=language_options,
100 ),
101 ConfigEntry(
102 key=CONF_NUM_EPISODES,
103 type=ConfigEntryType.INTEGER,
104 label="Maximum number of episodes. 0 for unlimited.",
105 required=False,
106 description="Maximum number of episodes. 0 for unlimited.",
107 default_value=0,
108 ),
109 ConfigEntry(
110 key=CONF_EXPLICIT,
111 type=ConfigEntryType.BOOLEAN,
112 label="Include explicit results",
113 required=False,
114 description="Whether or not to include explicit content results in search.",
115 default_value=True,
116 ),
117 )
118
119
120class ITunesPodcastsProvider(MusicProvider):
121 """ITunesPodcastsProvider."""
122
123 throttler: ThrottlerManager
124
125 @property
126 def is_streaming_provider(self) -> bool:
127 """Return True if the provider is a streaming provider."""
128 # For streaming providers return True here but for local file based providers return False.
129 return True
130
131 async def handle_async_init(self) -> None:
132 """Handle async initialization of the provider."""
133 self.max_episodes = int(str(self.config.get_value(CONF_NUM_EPISODES)))
134 # 20 requests per minute, be a bit below
135 self.throttler = ThrottlerManager(rate_limit=18, period=60)
136
137 @use_cache(3600 * 24 * 7) # Cache for 7 days
138 async def search(
139 self, search_query: str, media_types: list[MediaType], limit: int = 10
140 ) -> SearchResults:
141 """Perform search on musicprovider."""
142 result = SearchResults()
143 if MediaType.PODCAST not in media_types:
144 return result
145
146 if limit < 1:
147 limit = 1
148 elif limit > 200:
149 limit = 200
150 country = str(self.config.get_value(CONF_LOCALE))
151 explicit = "Yes" if bool(self.config.get_value(CONF_EXPLICIT)) else "No"
152 params: dict[str, str | int] = {
153 "media": "podcast",
154 "entity": "podcast",
155 "country": country,
156 "attribute": "titleTerm",
157 "explicit": explicit,
158 "limit": limit,
159 "term": search_query,
160 }
161 url = "https://itunes.apple.com/search?"
162 result.podcasts = await self._perform_search(url, params)
163
164 return result
165
166 @throttle_with_retries
167 async def _perform_search(self, url: str, params: dict[str, str | int]) -> list[Podcast]:
168 response = await self.mass.http_session.get(url, params=params)
169 json_response = b""
170 if response.status == 200:
171 json_response = await response.read()
172 if not json_response:
173 return []
174 results = ITunesSearchResults.from_json(json_response).results
175 return self._get_podcast_list(results)
176
177 def _get_podcast_list(self, results: list[PodcastSearchResult]) -> list[Podcast]:
178 podcast_list: list[Podcast] = []
179 for result in results:
180 if result.feed_url is None or result.track_name is None:
181 self.logger.info(
182 "The podcast '%s' does not have a feed url. Please see the docs for more info.",
183 result.track_name,
184 )
185 continue
186 podcast = Podcast(
187 name=result.track_name,
188 item_id=result.feed_url,
189 publisher=result.artist_name,
190 provider=self.instance_id,
191 provider_mappings={
192 ProviderMapping(
193 item_id=result.feed_url,
194 provider_domain=self.domain,
195 provider_instance=self.instance_id,
196 )
197 },
198 )
199 image_list = []
200 for artwork_url in [
201 result.artwork_url_600,
202 result.artwork_url_100,
203 result.artwork_url_60,
204 result.artwork_url_30,
205 ]:
206 if artwork_url is not None:
207 image_list.append(
208 MediaItemImage(
209 type=ImageType.THUMB, path=artwork_url, provider=self.instance_id
210 )
211 )
212 podcast.metadata.images = UniqueList(image_list)
213 podcast_list.append(podcast)
214 return podcast_list
215
216 async def get_podcast(self, prov_podcast_id: str) -> Podcast:
217 """Get podcast."""
218 parsed = await self._cache_get_podcast(prov_podcast_id)
219
220 return parse_podcast(
221 feed_url=prov_podcast_id,
222 parsed_feed=parsed,
223 instance_id=self.instance_id,
224 domain=self.domain,
225 )
226
227 async def get_podcast_episodes(
228 self, prov_podcast_id: str
229 ) -> AsyncGenerator[PodcastEpisode, None]:
230 """Get podcast episodes."""
231 podcast = await self._cache_get_podcast(prov_podcast_id)
232 podcast_cover = podcast.get("cover_url")
233 episodes = podcast.get("episodes", [])
234 for cnt, episode in enumerate(episodes):
235 if mass_episode := parse_podcast_episode(
236 episode=episode,
237 prov_podcast_id=prov_podcast_id,
238 episode_cnt=cnt,
239 podcast_cover=podcast_cover,
240 domain=self.domain,
241 instance_id=self.instance_id,
242 ):
243 yield mass_episode
244
245 async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode:
246 """Get single podcast episode."""
247 podcast_id, guid_or_stream_url = prov_episode_id.split(" ")
248 async for mass_episode in self.get_podcast_episodes(podcast_id):
249 _, _guid_or_stream_url = mass_episode.item_id.split(" ")
250 # this is enough, as internal
251 if guid_or_stream_url == _guid_or_stream_url:
252 return mass_episode
253 raise MediaNotFoundError("Episode not found")
254
255 async def recommendations(self) -> list[RecommendationFolder]:
256 """Get recommendations.
257
258 This provider uses a list of top podcasts for the configured country.
259 """
260 search_results = await self._cache_get_top_podcasts()
261 podcast_list = self._get_podcast_list(search_results)
262 return [
263 RecommendationFolder(
264 item_id="itunes-top-podcasts",
265 name="Trending Podcasts",
266 icon="mdi-trending-up",
267 translation_key="trending_podcasts",
268 items=UniqueList(podcast_list),
269 provider=self.instance_id,
270 )
271 ]
272
273 async def _get_episode_stream_url(self, podcast_id: str, guid_or_stream_url: str) -> str | None:
274 podcast = await self._cache_get_podcast(podcast_id)
275 episodes = podcast.get("episodes", [])
276 for cnt, episode in enumerate(episodes):
277 episode_enclosures = episode.get("enclosures", [])
278 if len(episode_enclosures) < 1:
279 raise MediaNotFoundError
280 stream_url: str | None = episode_enclosures[0].get("url", None)
281 guid = episode.get("guid")
282 if guid is not None and len(guid.split(" ")) == 1:
283 _guid_or_stream_url_compare = guid
284 else:
285 _guid_or_stream_url_compare = stream_url
286 if guid_or_stream_url == _guid_or_stream_url_compare:
287 return stream_url
288 return None
289
290 async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
291 """Get streamdetails for item."""
292 podcast_id, guid_or_stream_url = item_id.split(" ")
293 stream_url = await self._get_episode_stream_url(podcast_id, guid_or_stream_url)
294 if stream_url is None:
295 raise MediaNotFoundError
296 return StreamDetails(
297 provider=self.instance_id,
298 item_id=item_id,
299 audio_format=AudioFormat(
300 content_type=ContentType.try_parse(stream_url),
301 ),
302 media_type=MediaType.PODCAST_EPISODE,
303 stream_type=StreamType.HTTP,
304 path=stream_url,
305 can_seek=True,
306 allow_seek=True,
307 )
308
309 @throttle_with_retries
310 async def _get_podcast_search_result_from_itunes_id(
311 self, itunes_id: int
312 ) -> PodcastSearchResult:
313 params = {"id": itunes_id}
314 url = "https://itunes.apple.com/lookup?"
315 response = await self.mass.http_session.get(url, params=params)
316 json_response = b""
317 if response.status == 200:
318 json_response = await response.read()
319 if not json_response:
320 raise MediaNotFoundError
321 search_results = ITunesSearchResults.from_json(json_response)
322 if search_results.result_count == 0:
323 raise MediaNotFoundError
324 if search_results.result_count > 1:
325 self.logger.warning("More than a single result for podcast.")
326 return search_results.results[0]
327
328 async def _cache_get_podcast(self, prov_podcast_id: str) -> dict[str, Any]:
329 parsed_podcast = await self.mass.cache.get(
330 key=prov_podcast_id,
331 provider=self.instance_id,
332 category=CACHE_CATEGORY_PODCASTS,
333 default=None,
334 )
335 if parsed_podcast is None:
336 # get_podcastparser_dict raises MediaNotFoundError if data is invalid
337 parsed_podcast = await get_podcastparser_dict(
338 session=self.mass.http_session,
339 feed_url=prov_podcast_id,
340 max_episodes=self.max_episodes,
341 )
342 await self._cache_set_podcast(feed_url=prov_podcast_id, parsed_podcast=parsed_podcast)
343
344 # this is a dictionary from podcastparser
345 return parsed_podcast # type: ignore[no-any-return]
346
347 async def _cache_set_podcast(self, feed_url: str, parsed_podcast: dict[str, Any]) -> None:
348 await self.mass.cache.set(
349 key=feed_url,
350 provider=self.instance_id,
351 category=CACHE_CATEGORY_PODCASTS,
352 data=parsed_podcast,
353 expiration=60 * 60 * 24, # 1 day
354 )
355
356 async def _cache_set_top_podcasts(self, top_podcast_helper: TopPodcastsHelper) -> None:
357 await self.mass.cache.set(
358 key=CACHE_KEY_TOP_PODCASTS,
359 provider=self.instance_id,
360 category=CACHE_CATEGORY_RECOMMENDATIONS,
361 data=top_podcast_helper.to_dict(),
362 expiration=60 * 60 * 6, # 6 hours
363 )
364
365 async def _cache_get_top_podcasts(self) -> list[PodcastSearchResult]:
366 parsed_top_podcasts = await self.mass.cache.get(
367 key=CACHE_KEY_TOP_PODCASTS,
368 provider=self.instance_id,
369 category=CACHE_CATEGORY_RECOMMENDATIONS,
370 )
371 if parsed_top_podcasts is not None:
372 helper = TopPodcastsHelper.from_dict(parsed_top_podcasts)
373 return helper.top_podcasts
374
375 # 15 results
376 # keep 20 requests max per minute in mind
377 # https://rss.marketingtools.apple.com/
378 country = str(self.config.get_value(CONF_LOCALE))
379 url = f"https://rss.marketingtools.apple.com/api/v2/{country}/podcasts/top/15/podcasts.json"
380 response = await self.mass.http_session.get(url)
381 json_response = b""
382 if response.status == 200:
383 json_response = await response.read()
384 if not json_response:
385 return []
386
387 top_podcasts_response = TopPodcastsResponse.from_json(json_response)
388
389 if top_podcasts_response.feed is None:
390 return []
391
392 include_explicit = bool(self.config.get_value(CONF_EXPLICIT))
393
394 helper = TopPodcastsHelper()
395 for top_podcast in top_podcasts_response.feed.results:
396 if not include_explicit and top_podcast.content_advisory_rating is not None:
397 # the spelling within the API is wrong.
398 if top_podcast.content_advisory_rating in [
399 "explicit",
400 "Explicit",
401 "Explict",
402 "explict",
403 ]:
404 continue
405 try:
406 podcast_search_result = await self._get_podcast_search_result_from_itunes_id(
407 int(top_podcast.id_)
408 )
409 except MediaNotFoundError:
410 continue
411 helper.top_podcasts.append(podcast_search_result)
412
413 await self._cache_set_top_podcasts(top_podcast_helper=helper)
414 return helper.top_podcasts
415