/
/
/
1"""ARD Audiotek Music Provider for Music Assistant."""
2
3from __future__ import annotations
4
5from collections.abc import AsyncGenerator, Sequence
6from datetime import datetime, timedelta
7from typing import TYPE_CHECKING, Any
8
9from gql import Client
10from gql.transport.aiohttp import AIOHTTPTransport
11from music_assistant_models.config_entries import ConfigEntry
12from music_assistant_models.enums import (
13 ConfigEntryType,
14 ContentType,
15 ImageType,
16 LinkType,
17 MediaType,
18 ProviderFeature,
19 StreamType,
20)
21from music_assistant_models.errors import LoginFailed, MediaNotFoundError, UnplayableMediaError
22from music_assistant_models.media_items import (
23 AudioFormat,
24 BrowseFolder,
25 ItemMapping,
26 MediaItemImage,
27 MediaItemLink,
28 MediaItemType,
29 Podcast,
30 PodcastEpisode,
31 ProviderMapping,
32 Radio,
33 SearchResults,
34)
35from music_assistant_models.streamdetails import StreamDetails
36
37from music_assistant.constants import CONF_PASSWORD
38from music_assistant.controllers.cache import use_cache
39from music_assistant.models.music_provider import MusicProvider
40from music_assistant.providers.ard_audiothek.database_queries import (
41 get_history_query,
42 get_subscriptions_query,
43 livestream_query,
44 organizations_query,
45 publication_services_query,
46 publications_list_query,
47 search_radios_query,
48 search_shows_query,
49 show_episode_query,
50 show_length_query,
51 show_query,
52 update_history_entry,
53)
54
55if TYPE_CHECKING:
56 from aiohttp import ClientSession
57 from music_assistant_models.config_entries import ConfigValueType, ProviderConfig
58 from music_assistant_models.provider import ProviderManifest
59
60 from music_assistant.mass import MusicAssistant
61 from music_assistant.models import ProviderInstanceType
62
63# Config for login
64CONF_EMAIL = "email"
65CONF_TOKEN_BEARER = "token"
66CONF_EXPIRY_TIME = "token_expiry"
67CONF_USERID = "user_id"
68CONF_DISPLAY_NAME = "display_name"
69
70# Constants for config actions
71CONF_ACTION_AUTH = "authenticate"
72CONF_ACTION_CLEAR_AUTH = "clear_auth"
73
74# General config
75CONF_MAX_BITRATE = "max_num_episodes"
76CONF_PODCAST_FINISHED = "podcast_finished_time"
77
78IDENTITY_TOOLKIT_BASE_URL = "https://identitytoolkit.googleapis.com/v1/accounts"
79IDENTITY_TOOLKIT_TOKEN = "AIzaSyCEvA_fVGNMRcS9F-Ubaaa0y0qBDUMlh90"
80ARD_ACCOUNTS_URL = "https://accounts.ard.de"
81ARD_AUDIOTHEK_GRAPHQL = "https://api.ardaudiothek.de/graphql"
82
83SUPPORTED_FEATURES = {
84 ProviderFeature.BROWSE,
85 ProviderFeature.SEARCH,
86 ProviderFeature.LIBRARY_RADIOS,
87 ProviderFeature.LIBRARY_PODCASTS,
88}
89
90
91async def setup(
92 mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
93) -> ProviderInstanceType:
94 """Initialize provider(instance) with given configuration."""
95 return ARDAudiothek(mass, manifest, config, SUPPORTED_FEATURES)
96
97
98async def _login(session: ClientSession, email: str, password: str) -> tuple[str, str, str]:
99 response = await session.post(
100 f"{IDENTITY_TOOLKIT_BASE_URL}:signInWithPassword?key={IDENTITY_TOOLKIT_TOKEN}",
101 headers={"User-Agent": "Music Assistant", "Origin": ARD_ACCOUNTS_URL},
102 json={
103 "returnSecureToken": True,
104 "email": email,
105 "password": password,
106 "clientType": "CLIENT_TYPE_WEB",
107 },
108 )
109 data = await response.json()
110 if "error" in data:
111 if data["error"]["message"] == "EMAIL_NOT_FOUND":
112 raise LoginFailed("Email address is not registered")
113 if data["error"]["message"] == "INVALID_PASSWORD":
114 raise LoginFailed("Password is wrong")
115 token = data["idToken"]
116 uid = data["localId"]
117
118 response = await session.post(
119 f"{IDENTITY_TOOLKIT_BASE_URL}:lookup?key={IDENTITY_TOOLKIT_TOKEN}",
120 headers={"User-Agent": "Music Assistant", "Origin": ARD_ACCOUNTS_URL},
121 json={
122 "idToken": token,
123 },
124 )
125 data = await response.json()
126 if "error" in data:
127 if data["error"]["message"] == "EMAIL_NOT_FOUND":
128 raise LoginFailed("Email address is not registered")
129 if data["error"]["message"] == "INVALID_PASSWORD":
130 raise LoginFailed("Password is wrong")
131
132 return token, uid, data["users"][0]["displayName"]
133
134
135def _create_aiohttptransport(headers: dict[str, str] | None = None) -> AIOHTTPTransport:
136 return AIOHTTPTransport(url=ARD_AUDIOTHEK_GRAPHQL, headers=headers, ssl=True)
137
138
139async def get_config_entries(
140 mass: MusicAssistant,
141 instance_id: str | None = None,
142 action: str | None = None,
143 values: dict[str, ConfigValueType] | None = None,
144) -> tuple[ConfigEntry, ...]:
145 """
146 Return Config entries to setup this provider.
147
148 instance_id: id of an existing provider instance (None if new instance setup).
149 action: [optional] action key called from config entries UI.
150 values: the (intermediate) raw values for config entries sent with the action.
151 """
152 # ruff: noqa: ARG001
153 if values is None:
154 values = {}
155
156 authenticated = True
157 if values.get(CONF_TOKEN_BEARER) is None or values.get(CONF_USERID) is None:
158 authenticated = False
159
160 return (
161 ConfigEntry(
162 key="label_text",
163 type=ConfigEntryType.LABEL,
164 label=f"Successfully signed in as {values.get(CONF_DISPLAY_NAME)} {str(values.get(CONF_EMAIL, '')).replace('@', '(at)')}.", # noqa: E501
165 hidden=not authenticated,
166 ),
167 ConfigEntry(
168 key=CONF_EMAIL,
169 type=ConfigEntryType.STRING,
170 label="E-Mail",
171 required=False,
172 description="E-Mail address of ARD account.",
173 hidden=authenticated,
174 value=values.get(CONF_EMAIL),
175 ),
176 ConfigEntry(
177 key=CONF_PASSWORD,
178 type=ConfigEntryType.SECURE_STRING,
179 label="Password",
180 required=False,
181 description="Password of ARD account.",
182 hidden=authenticated,
183 value=values.get(CONF_PASSWORD),
184 ),
185 ConfigEntry(
186 key=CONF_MAX_BITRATE,
187 type=ConfigEntryType.INTEGER,
188 label="Maximum bitrate for streams (0 for unlimited)",
189 required=False,
190 description="Maximum bitrate for streams. Use 0 for unlimited",
191 default_value=0,
192 value=values.get(CONF_MAX_BITRATE),
193 ),
194 ConfigEntry(
195 key=CONF_PODCAST_FINISHED,
196 type=ConfigEntryType.INTEGER,
197 label="Percentage required before podcast episode is marked as fully played",
198 required=False,
199 description="This setting defines how much of a podcast must be listened to before an "
200 "episode is marked as fully played",
201 default_value=95,
202 value=values.get(CONF_PODCAST_FINISHED),
203 ),
204 ConfigEntry(
205 key=CONF_TOKEN_BEARER,
206 type=ConfigEntryType.SECURE_STRING,
207 label="token",
208 hidden=True,
209 required=False,
210 value=values.get(CONF_TOKEN_BEARER),
211 ),
212 ConfigEntry(
213 key=CONF_USERID,
214 type=ConfigEntryType.SECURE_STRING,
215 label="uid",
216 hidden=True,
217 required=False,
218 value=values.get(CONF_USERID),
219 ),
220 ConfigEntry(
221 key=CONF_EXPIRY_TIME,
222 type=ConfigEntryType.SECURE_STRING,
223 label="token_expiry",
224 hidden=True,
225 required=False,
226 default_value=0,
227 value=values.get(CONF_EXPIRY_TIME),
228 ),
229 ConfigEntry(
230 key=CONF_DISPLAY_NAME,
231 type=ConfigEntryType.STRING,
232 label="username",
233 hidden=True,
234 required=False,
235 value=values.get(CONF_DISPLAY_NAME),
236 ),
237 )
238
239
240class ARDAudiothek(MusicProvider):
241 """ARD Audiothek Music provider."""
242
243 async def get_client(self) -> Client:
244 """Wrap the client creation procedure to recreate client.
245
246 This happens when the token is expired or user credentials are updated.
247 """
248 _email = self.config.get_value(CONF_EMAIL)
249 _password = self.config.get_value(CONF_PASSWORD)
250 self.token = self.config.get_value(CONF_TOKEN_BEARER)
251 self.user_id = self.config.get_value(CONF_USERID)
252 self.token_expire = datetime.fromtimestamp(
253 float(str(self.config.get_value(CONF_EXPIRY_TIME)))
254 )
255
256 self.max_bitrate = int(float(str(self.config.get_value(CONF_MAX_BITRATE))))
257
258 if (
259 _email is not None
260 and _password is not None
261 and (self.token is None or self.user_id is None or self.token_expire < datetime.now())
262 ):
263 self.token, self.user_id, _display_name = await _login(
264 self.mass.http_session, str(_email), str(_password)
265 )
266 self._update_config_value(CONF_TOKEN_BEARER, self.token, encrypted=True)
267 self._update_config_value(CONF_USERID, self.user_id, encrypted=True)
268 self._update_config_value(CONF_DISPLAY_NAME, _display_name)
269 self._update_config_value(
270 CONF_EXPIRY_TIME, str((datetime.now() + timedelta(hours=1)).timestamp())
271 )
272 self._client_initialized = False
273
274 if not self._client_initialized:
275 headers = None
276 if self.token:
277 headers = {"Authorization": f"Bearer {self.token}"}
278
279 self._client = Client(
280 transport=_create_aiohttptransport(headers),
281 fetch_schema_from_transport=True,
282 )
283 self._client_initialized = True
284
285 return self._client
286
287 async def handle_async_init(self) -> None:
288 """Pass config values to client and initialize."""
289 self._client_initialized = False
290 await self.get_client()
291
292 async def _update_progress(self) -> None:
293 if not self.user_id:
294 self.remote_progress = {}
295 return
296
297 async with await self.get_client() as session:
298 get_history_query.variable_values = {"loginId": self.user_id}
299 result = (await session.execute(get_history_query))["allEndUsers"]["nodes"][0][
300 "history"
301 ]["nodes"]
302
303 new_progress = {} # type: dict[str, tuple[bool, float]]
304 time_limit = int(str(self.config.get_value(CONF_PODCAST_FINISHED)))
305 for x in result:
306 core_id = x["item"]["coreId"]
307 if core_id is None:
308 continue
309 duration = x["item"]["duration"]
310 if duration is None:
311 continue
312 progress = x["progress"]
313 time_limit_reached = (progress / duration) * 100 > time_limit
314 new_progress[core_id] = (time_limit_reached, progress)
315 self.remote_progress = new_progress
316
317 def _get_progress(self, episode_id: str) -> tuple[bool, int]:
318 if episode_id in self.remote_progress:
319 return self.remote_progress[episode_id][0], int(
320 self.remote_progress[episode_id][1] * 1000
321 )
322 return False, 0
323
324 async def get_resume_position(self, item_id: str, media_type: MediaType) -> tuple[bool, int]:
325 """Return: finished, position_ms."""
326 assert media_type == MediaType.PODCAST_EPISODE
327 await self._update_progress()
328
329 return self._get_progress(item_id)
330
331 async def on_played(
332 self,
333 media_type: MediaType,
334 prov_item_id: str,
335 fully_played: bool,
336 position: int,
337 media_item: MediaItemType,
338 is_playing: bool = False,
339 ) -> None:
340 """Update progress."""
341 if not self.user_id:
342 return
343 if media_item is None or not isinstance(media_item, PodcastEpisode):
344 return
345 if media_type != MediaType.PODCAST_EPISODE:
346 return
347 async with await self.get_client() as session:
348 update_history_entry.variable_values = {"itemId": prov_item_id, "progress": position}
349 await session.execute(
350 update_history_entry,
351 )
352
353 @property
354 def is_streaming_provider(self) -> bool:
355 """Search and lookup always search remote."""
356 return True
357
358 @use_cache(3600 * 24 * 7) # cache for 7 days
359 async def search(
360 self,
361 search_query: str,
362 media_types: list[MediaType],
363 limit: int = 5,
364 ) -> SearchResults:
365 """Perform search on musicprovider.
366
367 :param search_query: Search query.
368 :param media_types: A list of media_types to include.
369 :param limit: Number of items to return in the search (per type).
370 """
371 podcasts = []
372 radios = []
373
374 if MediaType.PODCAST in media_types:
375 async with await self.get_client() as session:
376 search_shows_query.variable_values = {"query": search_query, "limit": limit}
377 search_shows = (await session.execute(search_shows_query))["search"]["shows"][
378 "nodes"
379 ]
380
381 for element in search_shows:
382 podcasts += [
383 _parse_podcast(
384 self.domain,
385 self.instance_id,
386 element,
387 element["coreId"],
388 )
389 ]
390
391 if MediaType.RADIO in media_types:
392 async with await self.get_client() as session:
393 search_radios_query.variable_values = {
394 "filter": {"title": {"includesInsensitive": search_query}},
395 "first": limit,
396 }
397 search_radios = (await session.execute(search_radios_query))[
398 "permanentLivestreams"
399 ]["nodes"]
400
401 for element in search_radios:
402 radios += [
403 _parse_radio(
404 self.domain,
405 self.instance_id,
406 element,
407 element["coreId"],
408 )
409 ]
410
411 return SearchResults(podcasts=podcasts, radio=radios)
412
413 @use_cache(3600 * 24 * 7) # cache for 7 days
414 async def get_radio(self, prov_radio_id: str) -> Radio:
415 """Get full radio details by id."""
416 # Get full details of a single Radio station.
417 # Mandatory only if you reported LIBRARY_RADIOS in the supported_features.
418 async with await self.get_client() as session:
419 livestream_query.variable_values = {"coreId": prov_radio_id}
420 rad = (await session.execute(livestream_query))["permanentLivestreamByCoreId"]
421 if not rad:
422 raise MediaNotFoundError("Radio not found.")
423 return _parse_radio(
424 self.domain,
425 self.instance_id,
426 rad,
427 prov_radio_id,
428 )
429
430 async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
431 """Retrieve library/subscribed podcasts from the provider.
432
433 Minified podcast information is enough.
434 """
435 if not self.user_id:
436 return
437 async with await self.get_client() as session:
438 get_subscriptions_query.variable_values = {"loginId": self.user_id}
439 result = (await session.execute(get_subscriptions_query))["allEndUsers"]["nodes"][0][
440 "subscriptions"
441 ]["programSets"]["nodes"]
442 for show in result:
443 yield await self.get_podcast(show["subscribedProgramSet"]["coreId"])
444
445 async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
446 """Browse through the ARD Audiothek.
447
448 This supports browsing through Podcasts and Radio stations.
449 :param path: The path to browse, (e.g. provider_id://artists).
450 """
451 part_parts = path.split("://")[1].split("/")
452 organization = part_parts[0] if part_parts else ""
453 provider = part_parts[1] if len(part_parts) > 1 else ""
454 radio_station = part_parts[2] if len(part_parts) > 2 else ""
455
456 if not organization:
457 return await self.get_organizations(path)
458
459 if not provider:
460 # list radios for specific organization
461 return await self.get_publication_services(path, organization)
462
463 if not radio_station:
464 return await self.get_publications_list(provider)
465
466 return []
467
468 @use_cache(3600 * 24 * 7) # cache for 7 days
469 async def get_podcast(self, prov_podcast_id: str) -> Podcast:
470 """Get podcast."""
471 async with await self.get_client() as session:
472 show_query.variable_values = {"showId": prov_podcast_id}
473 result = (await session.execute(show_query))["show"]
474 if not result:
475 raise MediaNotFoundError("Podcast not found.")
476
477 return _parse_podcast(
478 self.domain,
479 self.instance_id,
480 result,
481 prov_podcast_id,
482 )
483
484 async def get_podcast_episodes(
485 self, prov_podcast_id: str
486 ) -> AsyncGenerator[PodcastEpisode, None]:
487 """Get podcast episodes."""
488 await self._update_progress()
489 depublished_filter = {"isPublished": {"equalTo": True}}
490 async with await self.get_client() as session:
491 show_length_query.variable_values = {
492 "showId": prov_podcast_id,
493 "filter": depublished_filter,
494 }
495 length = await session.execute(show_length_query)
496 length = length["show"]["items"]["totalCount"]
497 step_size = 128
498 for offset in range(0, length, step_size):
499 show_query.variable_values = {
500 "showId": prov_podcast_id,
501 "first": step_size,
502 "offset": offset,
503 "filter": depublished_filter,
504 }
505 result = (await session.execute(show_query))["show"]
506 for idx, episode in enumerate(result["items"]["nodes"]):
507 if len(episode["audioList"]) == 0:
508 continue
509 if episode["status"] == "DEPUBLISHED":
510 continue
511 episode_id = episode["coreId"]
512
513 progress = self._get_progress(episode_id)
514 yield _parse_podcast_episode(
515 self.domain,
516 self.instance_id,
517 episode,
518 episode_id,
519 result["title"],
520 offset + idx,
521 progress,
522 )
523
524 @use_cache(3600 * 24) # cache for 24 hours
525 async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode:
526 """Get single podcast episode."""
527 await self._update_progress()
528 async with await self.get_client() as session:
529 show_episode_query.variable_values = {"coreId": prov_episode_id}
530 result = (await session.execute(show_episode_query))["itemByCoreId"]
531 if not result:
532 raise MediaNotFoundError("Podcast episode not found")
533 progress = self._get_progress(prov_episode_id)
534 return _parse_podcast_episode(
535 self.domain,
536 self.instance_id,
537 result,
538 result["showId"],
539 result["show"]["title"],
540 result["rowId"],
541 progress,
542 )
543
544 async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
545 """Get streamdetails for a radio station."""
546 async with await self.get_client() as session:
547 if media_type == MediaType.RADIO:
548 livestream_query.variable_values = {"coreId": item_id}
549 result = (await session.execute(livestream_query))["permanentLivestreamByCoreId"]
550 seek = False
551 elif media_type == MediaType.PODCAST_EPISODE:
552 show_episode_query.variable_values = {"coreId": item_id}
553 result = (await session.execute(show_episode_query))["itemByCoreId"]
554 seek = True
555
556 streams = result["audioList"]
557 if len(streams) == 0:
558 raise MediaNotFoundError("No stream available.")
559
560 def filter_func(val: dict[str, Any]) -> bool:
561 if self.max_bitrate == 0:
562 return True
563 return int(val["audioBitrate"]) < self.max_bitrate
564
565 filtered_streams = list(filter(filter_func, streams))
566 if len(filtered_streams) == 0:
567 raise UnplayableMediaError("No stream exceeding the minimum bitrate available.")
568 selected_stream = max(filtered_streams, key=lambda x: x["audioBitrate"])
569
570 return StreamDetails(
571 provider=self.domain,
572 item_id=item_id,
573 audio_format=AudioFormat(
574 content_type=ContentType.try_parse(selected_stream["audioCodec"]),
575 ),
576 media_type=media_type,
577 stream_type=StreamType.HTTP,
578 path=fix_url(selected_stream["href"]),
579 can_seek=seek,
580 allow_seek=seek,
581 )
582
583 @use_cache(3600 * 24 * 7) # cache for 7 days
584 async def get_organizations(self, path: str) -> list[BrowseFolder]:
585 """Create a list of all available organizations."""
586 async with await self.get_client() as session:
587 result = (await session.execute(organizations_query))["organizations"]["nodes"]
588 organizations = []
589
590 for org in result:
591 if all(
592 b["coreId"] is None for b in org["publicationServicesByOrganizationName"]["nodes"]
593 ):
594 # No available station
595 continue
596 image = None
597 for pub in org["publicationServicesByOrganizationName"]["nodes"]:
598 pub_title = pub["title"].lower()
599 org_name = org["name"].lower()
600 org_title = org["title"].lower()
601 if pub_title in (org_name, org_title) or pub_title.replace(" ", "") == org_name:
602 image = create_media_image(self.domain, pub["imagesList"])
603 break
604 organizations += [
605 BrowseFolder(
606 item_id=org["coreId"],
607 provider=self.domain,
608 path=path + org["coreId"],
609 image=image,
610 name=org["title"],
611 )
612 ]
613
614 return organizations
615
616 @use_cache(3600 * 24 * 7) # cache for 7 days
617 async def get_publication_services(self, path: str, core_id: str) -> list[BrowseFolder]:
618 """Create a list of publications for a given organization."""
619 async with await self.get_client() as session:
620 publication_services_query.variable_values = {"coreId": core_id}
621 result = (await session.execute(publication_services_query))["organizationByCoreId"][
622 "publicationServicesByOrganizationName"
623 ]["nodes"]
624 publications = []
625
626 for pub in result:
627 if not pub["coreId"]:
628 continue
629 publications += [
630 BrowseFolder(
631 item_id=pub["coreId"],
632 provider=self.domain,
633 path=path + "/" + pub["coreId"],
634 image=create_media_image(self.domain, pub["imagesList"]),
635 name=pub["title"],
636 )
637 ]
638
639 return publications
640
641 @use_cache(3600 * 24 * 7) # cache for 7 days
642 async def get_publications_list(self, core_id: str) -> list[Radio | Podcast]:
643 """Create list of available radio stations and shows for a publication service."""
644 async with await self.get_client() as session:
645 publications_list_query.variable_values = {"coreId": core_id}
646 result = (await session.execute(publications_list_query))["publicationServiceByCoreId"]
647
648 publications = [] # type: list[Radio | Podcast]
649
650 if not result:
651 raise MediaNotFoundError("Publication service not found.")
652
653 for rad in result["permanentLivestreams"]["nodes"]:
654 if not rad["coreId"]:
655 continue
656
657 radio = _parse_radio(self.domain, self.instance_id, rad, rad["coreId"])
658
659 publications += [radio]
660
661 for pod in result["shows"]["nodes"]:
662 if not pod["coreId"]:
663 continue
664
665 podcast = _parse_podcast(
666 self.domain,
667 self.instance_id,
668 pod,
669 pod["coreId"],
670 )
671 publications += [podcast]
672
673 return publications
674
675
676def _parse_social_media(
677 homepage_url: str | None, social_media_accounts: list[dict[str, None | str]]
678) -> set[MediaItemLink]:
679 return_set = set()
680 if homepage_url:
681 return_set.add(MediaItemLink(type=LinkType.WEBSITE, url=homepage_url))
682 for entry in social_media_accounts:
683 if entry["url"]:
684 link_type = None
685 match entry["service"]:
686 case "FACEBOOK":
687 link_type = LinkType.FACEBOOK
688 case "INSTAGRAM":
689 link_type = LinkType.INSTAGRAM
690 case "TIKTOK":
691 link_type = LinkType.TIKTOK
692 if link_type:
693 return_set.add(MediaItemLink(type=link_type, url=entry["url"]))
694 return return_set
695
696
697def _parse_podcast(
698 domain: str,
699 instance_id: str,
700 podcast_query: dict[str, Any],
701 podcast_id: str,
702) -> Podcast:
703 podcast = Podcast(
704 name=podcast_query["title"],
705 item_id=podcast_id,
706 publisher=podcast_query["publicationService"]["title"],
707 provider=instance_id,
708 provider_mappings={
709 ProviderMapping(
710 item_id=podcast_id,
711 provider_domain=domain,
712 provider_instance=instance_id,
713 )
714 },
715 total_episodes=podcast_query["items"]["totalCount"],
716 )
717
718 podcast.metadata.links = _parse_social_media(
719 podcast_query["publicationService"]["homepageUrl"],
720 podcast_query["publicationService"]["socialMediaAccounts"],
721 )
722
723 podcast.metadata.description = podcast_query["synopsis"]
724 podcast.metadata.genres = {r["title"] for r in podcast_query["editorialCategoriesList"]}
725
726 podcast.metadata.add_image(create_media_image(domain, podcast_query["imagesList"]))
727
728 return podcast
729
730
731def _parse_radio(
732 domain: str,
733 instance_id: str,
734 radio_query: dict[str, Any],
735 radio_id: str,
736) -> Radio:
737 radio = Radio(
738 name=radio_query["title"],
739 item_id=radio_id,
740 provider=domain,
741 provider_mappings={
742 ProviderMapping(
743 item_id=radio_id,
744 provider_domain=domain,
745 provider_instance=instance_id,
746 )
747 },
748 )
749
750 radio.metadata.links = _parse_social_media(
751 radio_query["publicationService"]["homepageUrl"],
752 radio_query["publicationService"]["socialMediaAccounts"],
753 )
754
755 radio.metadata.description = radio_query["publicationService"]["synopsis"]
756 radio.metadata.genres = {radio_query["publicationService"]["genre"]}
757
758 radio.metadata.add_image(create_media_image(domain, radio_query["imagesList"]))
759
760 return radio
761
762
763def _parse_podcast_episode(
764 domain: str,
765 instance_id: str,
766 episode: dict[str, Any],
767 podcast_id: str,
768 podcast_title: str,
769 idx: int,
770 progress: tuple[bool, int],
771) -> PodcastEpisode:
772 podcast_episode = PodcastEpisode(
773 name=episode["title"],
774 duration=episode["duration"],
775 item_id=episode["coreId"],
776 provider=instance_id,
777 podcast=ItemMapping(
778 item_id=podcast_id,
779 provider=instance_id,
780 name=podcast_title,
781 media_type=MediaType.PODCAST,
782 ),
783 provider_mappings={
784 ProviderMapping(
785 item_id=episode["coreId"],
786 provider_domain=domain,
787 provider_instance=instance_id,
788 )
789 },
790 position=idx,
791 fully_played=progress[0],
792 resume_position_ms=progress[1],
793 )
794
795 podcast_episode.metadata.add_image(create_media_image(domain, episode["imagesList"]))
796 podcast_episode.metadata.description = episode["summary"]
797 return podcast_episode
798
799
800def create_media_image(domain: str, image_list: list[dict[str, str]]) -> MediaItemImage:
801 """Extract the image for hopefully all possible cases."""
802 image_url = ""
803 selected_img = image_list[0] if image_list else None
804 for img in image_list:
805 if img["aspectRatio"] == "1x1":
806 selected_img = img
807 break
808 if selected_img:
809 image_url = selected_img["url"].replace("{width}", str(selected_img["width"]))
810 return MediaItemImage(
811 type=ImageType.THUMB,
812 path=image_url,
813 provider=domain,
814 remotely_accessible=True,
815 )
816
817
818def fix_url(url: str) -> str:
819 """Fix some of the stream urls, which do not provide a protocol."""
820 if url.startswith("//"):
821 url = "https:" + url
822 return url
823