/
/
/
1"""Podcastfeed -> Mass."""
2
3from datetime import datetime
4from io import BytesIO
5from typing import Any
6
7import aiohttp
8import podcastparser
9from aiohttp.client import ClientError
10from music_assistant_models.enums import ContentType, ImageType, MediaType
11from music_assistant_models.errors import MediaNotFoundError
12from music_assistant_models.media_items import (
13 AudioFormat,
14 ItemMapping,
15 MediaItemChapter,
16 MediaItemImage,
17 Podcast,
18 PodcastEpisode,
19 ProviderMapping,
20 UniqueList,
21)
22
23
async def get_podcastparser_dict(
    *, session: aiohttp.ClientSession, feed_url: str, max_episodes: int = 0
) -> dict[str, Any]:
    """Get feed parsed by podcastparser by providing the url.

    The feed is requested with a browser-like User-Agent first and, if that attempt
    fails, once more without any additional headers.

    max_episodes = 0 does not limit the returned episodes.

    Raises MediaNotFoundError if none of the attempts delivers the feed data, or if
    the delivered data cannot be parsed by podcastparser.
    """
    feed_data: bytes | None = None
    # without user agent, some feeds can not be retrieved
    # https://github.com/music-assistant/support/issues/3596
    # but, reports on discord show, that also the opposite may be true
    for headers in ({"User-Agent": "Mozilla/5.0"}, {}):
        try:
            # raise_for_status=True raises ClientError on status failure.
            # ClientError is the base class of the client errors, i.e. not authorized,
            # url doesn't exist etc.
            # The body is read inside the context manager, so the response is released
            # on every path and a failing read is handled like a failing request.
            async with session.get(
                feed_url, headers=headers, raise_for_status=True
            ) as response:
                feed_data = await response.read()
        except ClientError:
            continue
        break
    if feed_data is None:
        # we did not get a single acceptable response
        raise MediaNotFoundError(
            f"Did not get acceptable response while trying to access {feed_url}."
        )
    feed_stream = BytesIO(feed_data)
    try:
        return podcastparser.parse(feed_url, feed_stream, max_episodes=max_episodes)  # type: ignore[no-any-return]
    except podcastparser.FeedParseError as err:
        # keep the parser error as the cause for easier debugging
        raise MediaNotFoundError(f"The url at {feed_url} returns invalid RSS data.") from err
55
56
def parse_podcast(
    *,
    feed_url: str,
    parsed_feed: dict[str, Any],
    instance_id: str,
    domain: str,
    mass_item_id: str | None = None,
) -> Podcast:
    """Convert a feed dict, as parsed by podcastparser, into a Mass Podcast.

    The podcast's item_id is mass_item_id when one is given, otherwise the feed url.
    Title and author fall back to the placeholders "NO_TITLE" and "NO_AUTHOR".
    """
    podcast_id = mass_item_id if mass_item_id is not None else feed_url
    author = parsed_feed.get("author") or parsed_feed.get("itunes_author", "NO_AUTHOR")
    mapping = ProviderMapping(
        item_id=podcast_id,
        provider_domain=domain,
        provider_instance=instance_id,
    )
    podcast = Podcast(
        item_id=podcast_id,
        name=parsed_feed.get("title", "NO_TITLE"),
        publisher=author,
        provider=instance_id,
        uri=parsed_feed.get("link"),
        provider_mappings={mapping},
    )

    # The categories may be plain strings or lists of strings; anything else is ignored.
    genre_names: set[str] = set()
    for category in parsed_feed.get("itunes_categories") or ():
        if isinstance(category, str):
            genre_names.add(category)
        elif isinstance(category, list):
            genre_names.update(entry for entry in category if isinstance(entry, str))
    podcast.metadata.genres = genre_names

    podcast.metadata.description = parsed_feed.get("description", "")
    podcast.metadata.explicit = parsed_feed.get("explicit", False)

    feed_language = parsed_feed.get("language")
    if feed_language is not None:
        podcast.metadata.languages = UniqueList([feed_language])

    podcast.total_episodes = len(parsed_feed.get("episodes", []))

    cover_url = parsed_feed.get("cover_url")
    if cover_url is not None:
        cover_image = MediaItemImage(
            type=ImageType.THUMB,
            path=cover_url,
            provider=instance_id,
            remotely_accessible=True,
        )
        podcast.metadata.images = UniqueList([cover_image])

    return podcast
114
115
116def get_stream_url_and_guid_from_episode(*, episode: dict[str, Any]) -> tuple[str, str | None]:
117 """Give episode's stream url and guid, if it exists."""
118 episode_enclosures = episode.get("enclosures", [])
119 if len(episode_enclosures) < 1:
120 raise ValueError("Episode enclosure is missing")
121 if stream_url := episode_enclosures[0].get("url"):
122 guid = episode.get("guid")
123 if guid is not None:
124 # The media's item_id is {prov_podcast_id} {guid_or_stream_url}
125 # see parse_podcast_episode.
126 # However, the guid must not contain a space, otherwise it is invalid.
127 # We cannot check, if it is a proper guid (uuid.UUID4(...)), as some podcast feeds
128 # do not follow the standard.
129 guid = None if len(guid.split(" ")) > 1 else guid
130 return stream_url, guid
131 raise ValueError("Stream URL is missing.")
132
133
def parse_podcast_episode(
    *,
    episode: dict[str, Any],
    prov_podcast_id: str,
    episode_cnt: int,
    podcast_cover: str | None = None,
    instance_id: str,
    domain: str,
    mass_item_id: str | None = None,
) -> PodcastEpisode | None:
    """Podcast Episode -> Mass Podcast Episode.

    The item_id is {prov_podcast_id} {guid_or_stream_url} by default, or the optional mass_item_id
    instead. The podcast_cover is used, if the episode should not have its own cover.

    Chapters of the episode which have both a title and a start are attached to the
    episode's metadata.

    The function returns None, if the episode enclosure is missing, i.e. there is no stream
    information present.
    """
    episode_duration = episode.get("total_time", 0.0)
    episode_title = episode.get("title", "NO_EPISODE_TITLE")
    episode_cover = episode.get("episode_art_url", podcast_cover)

    # this is unix epoch in s, and 0 if unknown
    episode_published: int | None = episode.get("published")
    if episode_published == 0:
        episode_published = None

    try:
        stream_url, guid = get_stream_url_and_guid_from_episode(episode=episode)
    except ValueError:
        # we are missing the episode enclosure or stream information
        return None
    # We treat a guid as invalid if contains a space.
    guid_or_stream_url = guid if guid is not None and " " not in guid else stream_url

    # Default episode id. A guid is preferred as identification.
    episode_id = f"{prov_podcast_id} {guid_or_stream_url}" if mass_item_id is None else mass_item_id
    mass_episode = PodcastEpisode(
        item_id=episode_id,
        provider=instance_id,
        name=episode_title,
        duration=int(episode_duration),
        position=episode_cnt,
        podcast=ItemMapping(
            item_id=prov_podcast_id,
            provider=instance_id,
            # NOTE(review): the podcast title is not available here, so the episode
            # title is used as the mapping's name.
            name=episode_title,
            media_type=MediaType.PODCAST,
        ),
        provider_mappings={
            ProviderMapping(
                item_id=episode_id,
                provider_domain=domain,
                provider_instance=instance_id,
                audio_format=AudioFormat(
                    content_type=ContentType.try_parse(stream_url),
                ),
                url=stream_url,
            )
        },
    )
    if episode_published is not None:
        mass_episode.metadata.release_date = datetime.fromtimestamp(episode_published)

    # chapter
    if chapters := episode.get("chapters"):
        _chapters = []
        for cnt, chapter in enumerate(chapters):
            if not isinstance(chapter, dict):
                continue
            title = chapter.get("title")
            start = chapter.get("start")
            # A start of 0 is valid (a chapter at the very beginning of the episode),
            # so only a missing start disqualifies the chapter.
            if title and start is not None:
                _chapters.append(MediaItemChapter(position=cnt + 1, name=title, start=start))
        if _chapters:
            # attach the collected chapters, otherwise they are lost
            mass_episode.metadata.chapters = _chapters

    # cover image
    if episode_cover is not None:
        mass_episode.metadata.images = UniqueList(
            [
                MediaItemImage(
                    type=ImageType.THUMB,
                    path=episode_cover,
                    provider=instance_id,
                    remotely_accessible=True,
                )
            ]
        )

    return mass_episode
223