music-assistant-server

15.2 KBPY
__init__.py
15.2 KB415 lines • python
1"""iTunes Podcast search support for MusicAssistant."""
2
3from __future__ import annotations
4
5from collections.abc import AsyncGenerator
6from pathlib import Path
7from typing import TYPE_CHECKING, Any
8
9import aiofiles
10import orjson
11from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption
12from music_assistant_models.enums import (
13    ConfigEntryType,
14    ContentType,
15    ImageType,
16    MediaType,
17    ProviderFeature,
18    StreamType,
19)
20from music_assistant_models.errors import MediaNotFoundError
21from music_assistant_models.media_items import (
22    AudioFormat,
23    MediaItemImage,
24    Podcast,
25    PodcastEpisode,
26    ProviderMapping,
27    RecommendationFolder,
28    SearchResults,
29    UniqueList,
30)
31from music_assistant_models.streamdetails import StreamDetails
32
33from music_assistant.controllers.cache import use_cache
34from music_assistant.helpers.podcast_parsers import (
35    get_podcastparser_dict,
36    parse_podcast,
37    parse_podcast_episode,
38)
39from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
40from music_assistant.models.music_provider import MusicProvider
41from music_assistant.providers.itunes_podcasts.schema import (
42    ITunesSearchResults,
43    PodcastSearchResult,
44    TopPodcastsHelper,
45    TopPodcastsResponse,
46)
47
48if TYPE_CHECKING:
49    from music_assistant_models.config_entries import ConfigValueType, ProviderConfig
50    from music_assistant_models.provider import ProviderManifest
51
52    from music_assistant.mass import MusicAssistant
53    from music_assistant.models import ProviderInstanceType
54
55
# Keys of the provider's config entries (see get_config_entries).
CONF_LOCALE = "locale"
CONF_EXPLICIT = "explicit"
CONF_NUM_EPISODES = "num_episodes"

# Cache layout: parsed feeds and the top-podcast list live in separate categories.
CACHE_CATEGORY_PODCASTS = 0
CACHE_CATEGORY_RECOMMENDATIONS = 1
CACHE_KEY_TOP_PODCASTS = "top-podcasts"

# Features handed to the provider instance in setup().
SUPPORTED_FEATURES = {ProviderFeature.SEARCH, ProviderFeature.RECOMMENDATIONS}
65
66
async def setup(
    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
    """Create the iTunes Podcasts provider instance for the given configuration."""
    provider = ITunesPodcastsProvider(mass, manifest, config, SUPPORTED_FEATURES)
    return provider
72
73
async def get_config_entries(
    mass: MusicAssistant,
    instance_id: str | None = None,
    action: str | None = None,
    values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
    """
    Return Config entries to setup this provider.

    The country options are loaded from the ``itunes_country_codes.json`` file
    that sits next to this module.

    instance_id: id of an existing provider instance (None if new instance setup).
    action: [optional] action key called from config entries UI.
    values: the (intermediate) raw values for config entries sent with the action.
    """
    # ruff: noqa: ARG001
    json_path = Path(__file__).parent / "itunes_country_codes.json"
    # Read raw bytes: orjson parses UTF-8 bytes directly, so the result does not
    # depend on the platform's default text encoding.
    async with aiofiles.open(json_path, mode="rb") as f:
        country_codes = orjson.loads(await f.read())

    # The JSON value is passed first and the lower-cased key second
    # (presumably display title and stored value - confirm against ConfigValueOption).
    language_options = [ConfigValueOption(val, key.lower()) for key, val in country_codes.items()]
    return (
        ConfigEntry(
            key=CONF_LOCALE,
            type=ConfigEntryType.STRING,
            label="Country",
            required=True,
            options=language_options,
        ),
        ConfigEntry(
            key=CONF_NUM_EPISODES,
            type=ConfigEntryType.INTEGER,
            label="Maximum number of episodes. 0 for unlimited.",
            required=False,
            description="Maximum number of episodes. 0 for unlimited.",
            default_value=0,
        ),
        ConfigEntry(
            key=CONF_EXPLICIT,
            type=ConfigEntryType.BOOLEAN,
            label="Include explicit results",
            required=False,
            description="Whether or not to include explicit content results in search.",
            default_value=True,
        ),
    )
118
119
class ITunesPodcastsProvider(MusicProvider):
    """Podcast provider backed by the iTunes search/lookup API and Apple's top-podcast feed.

    Podcasts use their RSS feed url as item id. Episode ids consist of the feed
    url and a guid (or stream url), separated by a single space.
    """

    throttler: ThrottlerManager
    max_episodes: int

    @property
    def is_streaming_provider(self) -> bool:
        """Return True if the provider is a streaming provider."""
        # For streaming providers return True here but for local file based providers return False.
        return True

    async def handle_async_init(self) -> None:
        """Handle async initialization of the provider."""
        self.max_episodes = int(str(self.config.get_value(CONF_NUM_EPISODES)))
        # 20 requests per minute, be a bit below
        self.throttler = ThrottlerManager(rate_limit=18, period=60)

    @use_cache(3600 * 24 * 7)  # Cache for 7 days
    async def search(
        self, search_query: str, media_types: list[MediaType], limit: int = 10
    ) -> SearchResults:
        """Perform search on musicprovider.

        Only podcasts are searched; for any other media type an empty result is
        returned. ``limit`` is clamped to the range 1..200 before it is sent.
        """
        result = SearchResults()
        if MediaType.PODCAST not in media_types:
            return result

        limit = max(1, min(limit, 200))
        country = str(self.config.get_value(CONF_LOCALE))
        explicit = "Yes" if bool(self.config.get_value(CONF_EXPLICIT)) else "No"
        params: dict[str, str | int] = {
            "media": "podcast",
            "entity": "podcast",
            "country": country,
            "attribute": "titleTerm",
            "explicit": explicit,
            "limit": limit,
            "term": search_query,
        }
        url = "https://itunes.apple.com/search?"
        result.podcasts = await self._perform_search(url, params)

        return result

    @throttle_with_retries
    async def _perform_search(self, url: str, params: dict[str, str | int]) -> list[Podcast]:
        """Query the search endpoint and convert the hits; empty list on a non-200 answer."""
        json_response = await self._read_ok_body(url, params)
        if not json_response:
            return []
        results = ITunesSearchResults.from_json(json_response).results
        return self._get_podcast_list(results)

    async def _read_ok_body(self, url: str, params: dict[str, str | int] | None = None) -> bytes:
        """Return the response body of a GET request, or ``b""`` if the status is not 200.

        The request runs inside ``async with`` so the response (and its pooled
        connection) is released even when the body is never read.
        """
        async with self.mass.http_session.get(url, params=params) as response:
            if response.status != 200:
                return b""
            body: bytes = await response.read()
            return body

    @staticmethod
    def _split_episode_id(prov_episode_id: str) -> tuple[str, str]:
        """Split an episode id into (podcast id, guid or stream url).

        Raises MediaNotFoundError if the id is not exactly two space separated parts.
        """
        parts = prov_episode_id.split(" ")
        if len(parts) != 2:
            raise MediaNotFoundError(f"Invalid episode id: {prov_episode_id}")
        return parts[0], parts[1]

    def _get_podcast_list(self, results: list[PodcastSearchResult]) -> list[Podcast]:
        """Convert iTunes results to Podcast items, skipping entries without feed url or name."""
        podcast_list: list[Podcast] = []
        for result in results:
            if result.feed_url is None or result.track_name is None:
                self.logger.info(
                    "The podcast '%s' does not have a feed url. Please see the docs for more info.",
                    result.track_name,
                )
                continue
            podcast = Podcast(
                name=result.track_name,
                item_id=result.feed_url,
                publisher=result.artist_name,
                provider=self.instance_id,
                provider_mappings={
                    ProviderMapping(
                        item_id=result.feed_url,
                        provider_domain=self.domain,
                        provider_instance=self.instance_id,
                    )
                },
            )
            # Candidate artwork, largest first; missing sizes are left out.
            artwork_urls = (
                result.artwork_url_600,
                result.artwork_url_100,
                result.artwork_url_60,
                result.artwork_url_30,
            )
            podcast.metadata.images = UniqueList(
                [
                    MediaItemImage(type=ImageType.THUMB, path=artwork_url, provider=self.instance_id)
                    for artwork_url in artwork_urls
                    if artwork_url is not None
                ]
            )
            podcast_list.append(podcast)
        return podcast_list

    async def get_podcast(self, prov_podcast_id: str) -> Podcast:
        """Get podcast."""
        parsed = await self._cache_get_podcast(prov_podcast_id)

        return parse_podcast(
            feed_url=prov_podcast_id,
            parsed_feed=parsed,
            instance_id=self.instance_id,
            domain=self.domain,
        )

    async def get_podcast_episodes(
        self, prov_podcast_id: str
    ) -> AsyncGenerator[PodcastEpisode, None]:
        """Get podcast episodes."""
        podcast = await self._cache_get_podcast(prov_podcast_id)
        podcast_cover = podcast.get("cover_url")
        episodes = podcast.get("episodes", [])
        for cnt, episode in enumerate(episodes):
            # Entries for which the parser helper returns a falsy value are left out.
            if mass_episode := parse_podcast_episode(
                episode=episode,
                prov_podcast_id=prov_podcast_id,
                episode_cnt=cnt,
                podcast_cover=podcast_cover,
                domain=self.domain,
                instance_id=self.instance_id,
            ):
                yield mass_episode

    async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode:
        """Get single podcast episode.

        Raises MediaNotFoundError if the id is malformed or no episode matches.
        """
        podcast_id, guid_or_stream_url = self._split_episode_id(prov_episode_id)
        async for mass_episode in self.get_podcast_episodes(podcast_id):
            # Comparing the second part is enough, all episodes belong to podcast_id.
            id_parts = mass_episode.item_id.split(" ")
            if len(id_parts) == 2 and id_parts[1] == guid_or_stream_url:
                return mass_episode
        raise MediaNotFoundError("Episode not found")

    async def recommendations(self) -> list[RecommendationFolder]:
        """Get recommendations.

        This provider uses a list of top podcasts for the configured country.
        """
        search_results = await self._cache_get_top_podcasts()
        podcast_list = self._get_podcast_list(search_results)
        return [
            RecommendationFolder(
                item_id="itunes-top-podcasts",
                name="Trending Podcasts",
                icon="mdi-trending-up",
                translation_key="trending_podcasts",
                items=UniqueList(podcast_list),
                provider=self.instance_id,
            )
        ]

    async def _get_episode_stream_url(self, podcast_id: str, guid_or_stream_url: str) -> str | None:
        """Return the first enclosure url of the matching episode, or None if none matches."""
        podcast = await self._cache_get_podcast(podcast_id)
        for episode in podcast.get("episodes", []):
            episode_enclosures = episode.get("enclosures", [])
            if not episode_enclosures:
                # An entry without enclosures cannot be the requested episode;
                # skip it instead of failing the lookup for every other episode.
                continue
            stream_url: str | None = episode_enclosures[0].get("url", None)
            guid = episode.get("guid")
            # A guid containing a space does not fit the space separated episode id,
            # in that case the stream url is compared instead
            # (presumably mirrors the id built by parse_podcast_episode - confirm).
            if guid is not None and " " not in guid:
                compare_value = guid
            else:
                compare_value = stream_url
            if guid_or_stream_url == compare_value:
                return stream_url
        return None

    async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
        """Get streamdetails for item.

        ``media_type`` is not evaluated, the result is always a podcast episode.
        Raises MediaNotFoundError if the id is malformed or no stream url is found.
        """
        podcast_id, guid_or_stream_url = self._split_episode_id(item_id)
        stream_url = await self._get_episode_stream_url(podcast_id, guid_or_stream_url)
        if stream_url is None:
            raise MediaNotFoundError
        return StreamDetails(
            provider=self.instance_id,
            item_id=item_id,
            audio_format=AudioFormat(
                content_type=ContentType.try_parse(stream_url),
            ),
            media_type=MediaType.PODCAST_EPISODE,
            stream_type=StreamType.HTTP,
            path=stream_url,
            can_seek=True,
            allow_seek=True,
        )

    @throttle_with_retries
    async def _get_podcast_search_result_from_itunes_id(
        self, itunes_id: int
    ) -> PodcastSearchResult:
        """Look up a single podcast by its iTunes id.

        Raises MediaNotFoundError on a non-200 answer, an empty body or no results.
        """
        params: dict[str, str | int] = {"id": itunes_id}
        url = "https://itunes.apple.com/lookup?"
        json_response = await self._read_ok_body(url, params)
        if not json_response:
            raise MediaNotFoundError
        search_results = ITunesSearchResults.from_json(json_response)
        # Also check the list itself, so an inconsistent count cannot cause an IndexError.
        if search_results.result_count == 0 or not search_results.results:
            raise MediaNotFoundError
        if search_results.result_count > 1:
            self.logger.warning("More than a single result for podcast.")
        return search_results.results[0]

    async def _cache_get_podcast(self, prov_podcast_id: str) -> dict[str, Any]:
        """Return the parsed feed for a feed url, fetching and caching it on a cache miss."""
        parsed_podcast = await self.mass.cache.get(
            key=prov_podcast_id,
            provider=self.instance_id,
            category=CACHE_CATEGORY_PODCASTS,
            default=None,
        )
        if parsed_podcast is None:
            # get_podcastparser_dict raises MediaNotFoundError if data is invalid
            parsed_podcast = await get_podcastparser_dict(
                session=self.mass.http_session,
                feed_url=prov_podcast_id,
                max_episodes=self.max_episodes,
            )
            await self._cache_set_podcast(feed_url=prov_podcast_id, parsed_podcast=parsed_podcast)

        # this is a dictionary from podcastparser
        return parsed_podcast  # type: ignore[no-any-return]

    async def _cache_set_podcast(self, feed_url: str, parsed_podcast: dict[str, Any]) -> None:
        """Store a parsed feed in the cache, keyed by its feed url."""
        await self.mass.cache.set(
            key=feed_url,
            provider=self.instance_id,
            category=CACHE_CATEGORY_PODCASTS,
            data=parsed_podcast,
            expiration=60 * 60 * 24,  # 1 day
        )

    async def _cache_set_top_podcasts(self, top_podcast_helper: TopPodcastsHelper) -> None:
        """Store the resolved top podcasts in the cache."""
        await self.mass.cache.set(
            key=CACHE_KEY_TOP_PODCASTS,
            provider=self.instance_id,
            category=CACHE_CATEGORY_RECOMMENDATIONS,
            data=top_podcast_helper.to_dict(),
            expiration=60 * 60 * 6,  # 6 hours
        )

    async def _cache_get_top_podcasts(self) -> list[PodcastSearchResult]:
        """Return the top podcasts for the configured country.

        A cached list is used if present. Otherwise the top list is fetched, each
        entry is resolved via the lookup endpoint and the result is cached.
        Returns an empty list if the top list cannot be fetched or has no feed.
        """
        parsed_top_podcasts = await self.mass.cache.get(
            key=CACHE_KEY_TOP_PODCASTS,
            provider=self.instance_id,
            category=CACHE_CATEGORY_RECOMMENDATIONS,
        )
        if parsed_top_podcasts is not None:
            helper = TopPodcastsHelper.from_dict(parsed_top_podcasts)
            return helper.top_podcasts

        # 15 results
        # keep 20 requests max per minute in mind
        # https://rss.marketingtools.apple.com/
        country = str(self.config.get_value(CONF_LOCALE))
        url = f"https://rss.marketingtools.apple.com/api/v2/{country}/podcasts/top/15/podcasts.json"
        json_response = await self._read_ok_body(url)
        if not json_response:
            return []

        top_podcasts_response = TopPodcastsResponse.from_json(json_response)

        if top_podcasts_response.feed is None:
            return []

        include_explicit = bool(self.config.get_value(CONF_EXPLICIT))

        helper = TopPodcastsHelper()
        for top_podcast in top_podcasts_response.feed.results:
            rating = top_podcast.content_advisory_rating
            # the spelling within the API is wrong, so accept both variants in any case.
            if (
                not include_explicit
                and rating is not None
                and rating.casefold() in ("explicit", "explict")
            ):
                continue
            try:
                itunes_id = int(top_podcast.id_)
            except (TypeError, ValueError):
                # An entry without a numeric id cannot be looked up; skip only this entry.
                continue
            try:
                podcast_search_result = await self._get_podcast_search_result_from_itunes_id(
                    itunes_id
                )
            except MediaNotFoundError:
                continue
            helper.top_podcasts.append(podcast_search_result)

        await self._cache_set_top_podcasts(top_podcast_helper=helper)
        return helper.top_podcasts
415