/
/
/
1"""gPodder provider for Music Assistant.
2
3Tested against opodsync, https://github.com/kd2org/opodsync
4and nextcloud-gpodder, https://github.com/thrillfall/nextcloud-gpodder
gpodder.net is not supported due to poor responsiveness and frequent downtime of the domain.
6
7Note:
    - We may hold an episode's guid and use it for identification, while the sync state
      provider (e.g. opodsync) might only use the stream url. So always compare both
      when relying on an external service.
    - The service calls take a timestamp (int, unix epoch in seconds) and return the
      changes since then.
12"""
13
14from __future__ import annotations
15
16import asyncio
17import time
18from collections.abc import AsyncGenerator
19from typing import TYPE_CHECKING, Any
20
21from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
22from music_assistant_models.enums import (
23 ConfigEntryType,
24 ContentType,
25 EventType,
26 MediaType,
27 ProviderFeature,
28 StreamType,
29)
30from music_assistant_models.errors import (
31 LoginFailed,
32 MediaNotFoundError,
33 ResourceTemporarilyUnavailable,
34)
35from music_assistant_models.media_items import AudioFormat, MediaItemType, Podcast, PodcastEpisode
36from music_assistant_models.streamdetails import StreamDetails
37
38from music_assistant.helpers.podcast_parsers import (
39 get_podcastparser_dict,
40 get_stream_url_and_guid_from_episode,
41 parse_podcast,
42 parse_podcast_episode,
43)
44from music_assistant.models.music_provider import MusicProvider
45
46from .client import EpisodeActionDelete, EpisodeActionNew, EpisodeActionPlay, GPodderClient
47
48if TYPE_CHECKING:
49 from music_assistant_models.provider import ProviderManifest
50
51 from music_assistant.mass import MusicAssistant
52 from music_assistant.models import ProviderInstanceType
53
# --- Settings for the "classic" gPodder API --------------------------------
CONF_URL = "url"
CONF_USERNAME = "username"
CONF_PASSWORD = "password"
CONF_DEVICE_ID = "device_id"
# Hidden boolean entry; set to True when Nextcloud is NOT used.
CONF_USING_GPODDER = "using_gpodder"

# --- Settings for Nextcloud (nextcloud-gpodder) ----------------------------
CONF_ACTION_AUTH_NC = "authenticate_nc"
CONF_TOKEN_NC = "token"
CONF_URL_NC = "url_nc"

# --- Settings shared by both backends --------------------------------------
CONF_VERIFY_SSL = "verify_ssl"
CONF_MAX_NUM_EPISODES = "max_num_episodes"


# Cache categories.
# Category 0 holds the individually parsed podcasts (dict from podcastparser).
CACHE_CATEGORY_PODCAST_ITEMS = 0
CACHE_CATEGORY_OTHER = 1

# Cache keys.
# Pair of two ints: timestamp_subscriptions and timestamp_actions.
CACHE_KEY_TIMESTAMP = "timestamp"
# list[str]: all available rss feed urls.
CACHE_KEY_FEEDS = "feeds"
77
# Features this provider passes to its constructor: library podcasts and browsing.
SUPPORTED_FEATURES = {ProviderFeature.LIBRARY_PODCASTS, ProviderFeature.BROWSE}
82
83
async def setup(
    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
    """Create the gPodder provider instance for the given configuration."""
    provider = GPodder(mass, manifest, config, SUPPORTED_FEATURES)
    return provider
89
90
async def get_config_entries(
    mass: MusicAssistant,
    instance_id: str | None = None,
    action: str | None = None,
    values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
    """
    Return Config entries to setup this provider.

    instance_id: id of an existing provider instance (None if new instance setup).
    action: [optional] action key called from config entries UI.
    values: the (intermediate) raw values for config entries sent with the action.

    When called with the Nextcloud authentication action, the Nextcloud login flow is
    started and polled until an app password is obtained, which is then stored in
    ``values``.

    Raises LoginFailed if the poll endpoint answers with an unexpected status or the
    login is not completed in time.
    """
    # ruff: noqa: ARG001
    if values is None:
        values = {}

    # Verification is only the default when nothing was configured. An explicit
    # False has to be honoured; a plain truthiness test would silently discard it.
    _verify_ssl = values.get(CONF_VERIFY_SSL)
    verify_ssl = True if _verify_ssl is None else bool(_verify_ssl)

    if action == CONF_ACTION_AUTH_NC:
        session = mass.http_session
        response = await session.post(
            str(values[CONF_URL_NC]).rstrip("/") + "/index.php/login/v2",
            headers={"User-Agent": "Music Assistant"},
            ssl=verify_ssl,
        )
        data = await response.json()
        poll_endpoint = data["poll"]["endpoint"]
        poll_token = data["poll"]["token"]
        login_url = data["login"]
        session_id = str(values["session_id"])
        mass.signal_event(EventType.AUTH_SESSION, session_id, login_url)

        # Poll once per second: 404 means "not authenticated yet", 200 carries the
        # app password. Stop after 20 minutes so an abandoned login cannot poll forever.
        # NOTE(review): 20 minutes is the poll token lifetime given in the Nextcloud
        # Login Flow v2 documentation - confirm.
        poll_deadline = time.monotonic() + 20 * 60
        while True:
            response = await session.post(poll_endpoint, data={"token": poll_token}, ssl=verify_ssl)
            if response.status not in [200, 404]:
                raise LoginFailed("The specified url seems not to belong to a nextcloud instance.")
            if response.status == 200:
                data = await response.json()
                values[CONF_TOKEN_NC] = data["appPassword"]
                break
            if time.monotonic() >= poll_deadline:
                raise LoginFailed("Timed out waiting for the Nextcloud authentication.")
            await asyncio.sleep(1)

    # A stored token means the Nextcloud authentication has been completed.
    authenticated_nc = values.get(CONF_TOKEN_NC) is not None

    using_gpodder = bool(values.get(CONF_USING_GPODDER, False))

    return (
        ConfigEntry(
            key="label_text",
            type=ConfigEntryType.LABEL,
            label="Authentication did succeed! Please press save to continue.",
            hidden=not authenticated_nc,
        ),
        ConfigEntry(
            key="label_gpodder",
            type=ConfigEntryType.LABEL,
            label="Authentication with gPodder compatible web service, e.g. opodsync:",
            hidden=authenticated_nc,
        ),
        ConfigEntry(
            key=CONF_URL,
            type=ConfigEntryType.STRING,
            label="gPodder Service URL",
            required=False,
            description="URL of gPodder instance.",
            value=values.get(CONF_URL),
            hidden=authenticated_nc,
        ),
        ConfigEntry(
            key=CONF_USERNAME,
            type=ConfigEntryType.STRING,
            label="Username",
            required=False,
            description="Username of gPodder instance.",
            hidden=authenticated_nc,
            value=values.get(CONF_USERNAME),
        ),
        ConfigEntry(
            key=CONF_PASSWORD,
            type=ConfigEntryType.SECURE_STRING,
            label="Password",
            required=False,
            description="Password for gPodder instance.",
            hidden=authenticated_nc,
            value=values.get(CONF_PASSWORD),
        ),
        ConfigEntry(
            key=CONF_DEVICE_ID,
            type=ConfigEntryType.STRING,
            label="Device ID",
            required=False,
            description="Device ID of user.",
            hidden=authenticated_nc,
            value=values.get(CONF_DEVICE_ID),
        ),
        ConfigEntry(
            key="label_nextcloud",
            type=ConfigEntryType.LABEL,
            label="Authentication with Nextcloud with gPodder Sync (nextcloud-gpodder) installed:",
            hidden=authenticated_nc or using_gpodder,
        ),
        ConfigEntry(
            key=CONF_URL_NC,
            type=ConfigEntryType.STRING,
            label="Nextcloud URL",
            required=False,
            description="URL of Nextcloud instance.",
            value=values.get(CONF_URL_NC),
            hidden=using_gpodder,
        ),
        ConfigEntry(
            key=CONF_ACTION_AUTH_NC,
            type=ConfigEntryType.ACTION,
            label="(Re)Authenticate with Nextcloud",
            description="This button will redirect you to your Nextcloud instance to authenticate.",
            action=CONF_ACTION_AUTH_NC,
            required=False,
            hidden=using_gpodder,
        ),
        ConfigEntry(
            key="label_general",
            type=ConfigEntryType.LABEL,
            label="General config:",
        ),
        ConfigEntry(
            key=CONF_MAX_NUM_EPISODES,
            type=ConfigEntryType.INTEGER,
            label="Maximum number of episodes (0 for unlimited)",
            required=False,
            description="Maximum number of episodes to sync per feed. Use 0 for unlimited",
            default_value=0,
            value=values.get(CONF_MAX_NUM_EPISODES),
        ),
        ConfigEntry(
            key=CONF_VERIFY_SSL,
            type=ConfigEntryType.BOOLEAN,
            label="Verify SSL",
            required=False,
            description="Whether or not to verify the certificate of SSL/TLS connections.",
            advanced=True,
            default_value=True,
            value=values.get(CONF_VERIFY_SSL),
        ),
        # The two entries below are hidden and only used to persist state.
        ConfigEntry(
            key=CONF_TOKEN_NC,
            type=ConfigEntryType.SECURE_STRING,
            label="token",
            hidden=True,
            required=False,
            value=values.get(CONF_TOKEN_NC),
        ),
        ConfigEntry(
            key=CONF_USING_GPODDER,
            type=ConfigEntryType.BOOLEAN,
            label="using_gpodder",
            hidden=True,
            required=False,
            value=values.get(CONF_USING_GPODDER),
        ),
    )
255
256
257class GPodder(MusicProvider):
258 """gPodder MusicProvider."""
259
260 async def handle_async_init(self) -> None:
261 """Pass config values to client and initialize."""
262 base_url = str(self.config.get_value(CONF_URL))
263 _username = self.config.get_value(CONF_USERNAME)
264 _password = self.config.get_value(CONF_PASSWORD)
265 _device_id = self.config.get_value(CONF_DEVICE_ID)
266 nc_url = str(self.config.get_value(CONF_URL_NC))
267 nc_token = self.config.get_value(CONF_TOKEN_NC)
268 verify_ssl = bool(self.config.get_value(CONF_VERIFY_SSL))
269
270 self.max_episodes = int(float(str(self.config.get_value(CONF_MAX_NUM_EPISODES))))
271
272 self._client = GPodderClient(
273 session=self.mass.http_session, logger=self.logger, verify_ssl=verify_ssl
274 )
275
276 if nc_token is not None:
277 assert nc_url is not None
278 self._client.init_nc(base_url=nc_url, nc_token=str(nc_token))
279 else:
280 self._update_config_value(CONF_USING_GPODDER, True)
281 if _username is None or _password is None or _device_id is None:
282 raise LoginFailed("Must provide username, password and device_id.")
283 username = str(_username)
284 password = str(_password)
285 device_id = str(_device_id)
286
287 if base_url.rstrip("/") == "https://gpodder.net":
288 raise LoginFailed("Do not use gpodder.net. See docs for explanation.")
289 try:
290 await self._client.init_gpodder(
291 username=username, password=password, base_url=base_url, device=device_id
292 )
293 except RuntimeError as exc:
294 raise LoginFailed("Login failed.") from exc
295
296 timestamps = await self.mass.cache.get(
297 key=CACHE_KEY_TIMESTAMP,
298 provider=self.instance_id,
299 category=CACHE_CATEGORY_OTHER,
300 default=None,
301 )
302 if timestamps is None:
303 self.timestamp_subscriptions: int = 0
304 self.timestamp_actions: int = 0
305 else:
306 self.timestamp_subscriptions, self.timestamp_actions = timestamps
307
308 self.logger.debug(
309 "Our timestamps are (subscriptions, actions) (%s, %s)",
310 self.timestamp_subscriptions,
311 self.timestamp_actions,
312 )
313
314 feeds = await self.mass.cache.get(
315 key=CACHE_KEY_FEEDS,
316 provider=self.instance_id,
317 category=CACHE_CATEGORY_OTHER,
318 default=None,
319 )
320 if feeds is None:
321 self.feeds: set[str] = set()
322 else:
323 self.feeds = set(feeds) # feeds is a list here
324
325 # we are syncing the playlog, but not event based. A simple check in on_played,
326 # should be sufficient
327 self.progress_guard_timestamp = 0.0
328
329 @property
330 def is_streaming_provider(self) -> bool:
331 """Return True if the provider is a streaming provider."""
332 # For streaming providers return True here but for local file based providers return False.
333 # While the streams are remote, the user controls what is added.
334 return False
335
336 async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
337 """Retrieve library/subscribed podcasts from the provider."""
338 try:
339 subscriptions = await self._client.get_subscriptions()
340 except RuntimeError:
341 raise ResourceTemporarilyUnavailable(backoff_time=30)
342 if subscriptions is None:
343 return
344
345 for feed_url in subscriptions.add:
346 self.feeds.add(feed_url)
347 for feed_url in subscriptions.remove:
348 try:
349 self.feeds.remove(feed_url)
350 except KeyError:
351 # a podcast might have been added and removed in our absence...
352 continue
353
354 episode_actions, timestamp_action = await self._client.get_episode_actions()
355 for feed_url in self.feeds:
356 self.logger.debug("Adding podcast with feed %s to library", feed_url)
357 # parse podcast
358 try:
359 parsed_podcast = await get_podcastparser_dict(
360 session=self.mass.http_session,
361 feed_url=feed_url,
362 max_episodes=self.max_episodes,
363 )
364 except MediaNotFoundError:
365 self.logger.warning(f"Was unable to obtain podcast with feed {feed_url}")
366 continue
367 await self._cache_set_podcast(feed_url, parsed_podcast)
368
369 # playlog
370 # be safe, if there should be multiple episodeactions. client already sorts
371 # progresses in descending order.
372 _already_processed = set()
373 _episode_actions = [x for x in episode_actions if x.podcast == feed_url]
374 for _action in _episode_actions:
375 if _action.episode not in _already_processed:
376 _already_processed.add(_action.episode)
377 # we do not have to add the progress, these would make calls twice,
378 # and we only use the object to propagate to playlog
379 self.progress_guard_timestamp = time.time()
380 _episode_ids: list[str] = []
381 if _action.guid is not None:
382 _episode_ids.append(f"{feed_url} {_action.guid}")
383 _episode_ids.append(f"{feed_url} {_action.episode}")
384 mass_episode: PodcastEpisode | None = None
385 for _episode_id in _episode_ids:
386 try:
387 mass_episode = await self.get_podcast_episode(
388 _episode_id, add_progress=False
389 )
390 break
391 except MediaNotFoundError:
392 continue
393 if mass_episode is None:
394 self.logger.debug(
395 f"Was unable to use progress for episode {_action.episode}."
396 )
397 continue
398 match _action:
399 case EpisodeActionNew():
400 await self.mass.music.mark_item_unplayed(mass_episode)
401 case EpisodeActionPlay():
402 await self.mass.music.mark_item_played(
403 mass_episode,
404 fully_played=_action.position >= _action.total,
405 seconds_played=_action.position,
406 )
407
408 # cache
409 yield parse_podcast(
410 feed_url=feed_url,
411 parsed_feed=parsed_podcast,
412 instance_id=self.instance_id,
413 domain=self.domain,
414 )
415
416 self.timestamp_subscriptions = subscriptions.timestamp
417 if timestamp_action is not None:
418 self.timestamp_actions = timestamp_action
419 await self._cache_set_timestamps()
420 await self._cache_set_feeds()
421
422 async def get_podcast(self, prov_podcast_id: str) -> Podcast:
423 """Get Podcast."""
424 parsed_podcast = await self._cache_get_podcast(prov_podcast_id)
425
426 return parse_podcast(
427 feed_url=prov_podcast_id,
428 parsed_feed=parsed_podcast,
429 instance_id=self.instance_id,
430 domain=self.domain,
431 )
432
433 async def get_podcast_episodes(
434 self, prov_podcast_id: str, add_progress: bool = True
435 ) -> AsyncGenerator[PodcastEpisode, None]:
436 """Get Podcast episodes. Add progress information."""
437 if add_progress:
438 episode_actions, timestamp = await self._client.get_episode_actions()
439 else:
440 episode_actions, timestamp = [], None
441
442 podcast = await self._cache_get_podcast(prov_podcast_id)
443 podcast_cover = podcast.get("cover_url")
444 parsed_episodes = podcast.get("episodes", [])
445
446 if timestamp is not None:
447 self.timestamp_actions = timestamp
448 await self._cache_set_timestamps()
449
450 for cnt, parsed_episode in enumerate(parsed_episodes):
451 mass_episode = parse_podcast_episode(
452 episode=parsed_episode,
453 prov_podcast_id=prov_podcast_id,
454 episode_cnt=cnt,
455 podcast_cover=podcast_cover,
456 domain=self.domain,
457 instance_id=self.instance_id,
458 )
459 if mass_episode is None:
460 # faulty episode
461 continue
462 try:
463 stream_url, guid = get_stream_url_and_guid_from_episode(episode=parsed_episode)
464 except ValueError:
465 # episode enclosure or stream url missing
466 continue
467
468 for action in episode_actions:
469 # we have to test both, as we are comparing to external input.
470 _test = [action.guid, action.episode]
471 if prov_podcast_id == action.podcast and (guid in _test or stream_url in _test):
472 self.progress_guard_timestamp = time.time()
473 if isinstance(action, EpisodeActionNew):
474 mass_episode.resume_position_ms = 0
475 mass_episode.fully_played = False
476
477 # propagate to playlog
478 await self.mass.music.mark_item_unplayed(
479 mass_episode,
480 )
481 elif isinstance(action, EpisodeActionPlay):
482 fully_played = action.position >= action.total
483 resume_position_s = action.position
484 mass_episode.resume_position_ms = resume_position_s * 1000
485 mass_episode.fully_played = fully_played
486
487 # propagate progress to playlog
488 await self.mass.music.mark_item_played(
489 mass_episode,
490 fully_played=fully_played,
491 seconds_played=resume_position_s,
492 )
493 elif isinstance(action, EpisodeActionDelete):
494 for mapping in mass_episode.provider_mappings:
495 mapping.available = False
496 break
497 yield mass_episode
498
499 async def get_podcast_episode(
500 self, prov_episode_id: str, add_progress: bool = True
501 ) -> PodcastEpisode:
502 """Get Podcast Episode. Add progress information."""
503 podcast_id, guid_or_stream_url = prov_episode_id.split(" ")
504 async for mass_episode in self.get_podcast_episodes(podcast_id, add_progress=add_progress):
505 _, _guid_or_stream_url = mass_episode.item_id.split(" ")
506 # this is enough, as internal
507 if guid_or_stream_url == _guid_or_stream_url:
508 return mass_episode
509 raise MediaNotFoundError("Did not find episode.")
510
511 async def get_resume_position(self, item_id: str, media_type: MediaType) -> tuple[bool, int]:
512 """Return: finished, position_ms."""
513 assert media_type == MediaType.PODCAST_EPISODE
514 podcast_id, guid_or_stream_url = item_id.split(" ")
515 stream_url = await self._get_episode_stream_url(podcast_id, guid_or_stream_url)
516 try:
517 progresses, timestamp = await self._client.get_episode_actions(
518 since=self.timestamp_actions
519 )
520 except RuntimeError:
521 self.logger.warning("Was unable to obtain progresses.")
522 raise NotImplementedError # fallback to internal position.
523 for action in progresses:
524 _test = [action.guid, action.episode]
525 # progress is external, compare guid and stream_url
526 if action.podcast == podcast_id and (
527 guid_or_stream_url in _test or stream_url in _test
528 ):
529 if timestamp is not None:
530 self.timestamp_actions = timestamp
531 await self._cache_set_timestamps()
532 if isinstance(action, EpisodeActionNew | EpisodeActionDelete):
533 # no progress, it might have been actively reset
534 # in case of delete, we start from start.
535 return False, 0
536 _progress = (action.position >= action.total, max(action.position * 1000, 0))
537 self.logger.debug("Found an updated external resume position.")
538 return action.position >= action.total, max(action.position * 1000, 0)
539 self.logger.debug("Did not find an updated resume position, falling back to stored.")
540 # If we did not find a resume position, nothing changed since our last timestamp
541 # we raise NotImplementedError, such that MA falls back to the already stored
542 # resume_position in its playlog.
543 raise NotImplementedError
544
545 async def on_played(
546 self,
547 media_type: MediaType,
548 prov_item_id: str,
549 fully_played: bool,
550 position: int,
551 media_item: MediaItemType,
552 is_playing: bool = False,
553 ) -> None:
554 """Update progress."""
555 if media_item is None or not isinstance(media_item, PodcastEpisode):
556 return
557 if media_type != MediaType.PODCAST_EPISODE:
558 return
559 if time.time() - self.progress_guard_timestamp <= 5:
560 return
561 podcast_id, guid_or_stream_url = prov_item_id.split(" ")
562 stream_url = await self._get_episode_stream_url(podcast_id, guid_or_stream_url)
563 assert stream_url is not None
564 duration = media_item.duration
565 try:
566 await self._client.update_progress(
567 podcast_id=podcast_id,
568 episode_id=stream_url,
569 guid=guid_or_stream_url,
570 position_s=position,
571 duration_s=duration,
572 )
573 self.logger.debug(f"Updated progress to {position / duration * 100:.2f}%")
574 except RuntimeError as exc:
575 self.logger.debug(exc)
576 self.logger.debug("Failed to update progress.")
577
578 async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
579 """Get streamdetails for item."""
580 podcast_id, guid_or_stream_url = item_id.split(" ")
581 stream_url = await self._get_episode_stream_url(podcast_id, guid_or_stream_url)
582 if stream_url is None:
583 raise MediaNotFoundError
584 return StreamDetails(
585 provider=self.instance_id,
586 item_id=item_id,
587 audio_format=AudioFormat(
588 content_type=ContentType.try_parse(stream_url),
589 ),
590 media_type=MediaType.PODCAST_EPISODE,
591 stream_type=StreamType.HTTP,
592 path=stream_url,
593 can_seek=True,
594 allow_seek=True,
595 )
596
597 async def _get_episode_stream_url(self, podcast_id: str, guid_or_stream_url: str) -> str | None:
598 podcast = await self._cache_get_podcast(podcast_id)
599 episodes = podcast.get("episodes", [])
600 for cnt, episode in enumerate(episodes):
601 episode_enclosures = episode.get("enclosures", [])
602 if len(episode_enclosures) < 1:
603 raise MediaNotFoundError
604 stream_url: str | None = episode_enclosures[0].get("url", None)
605 guid = episode.get("guid")
606 if guid is not None and len(guid.split(" ")) == 1:
607 _guid_or_stream_url_compare = guid
608 else:
609 _guid_or_stream_url_compare = stream_url
610 if guid_or_stream_url == _guid_or_stream_url_compare:
611 return stream_url
612 return None
613
614 async def _cache_get_podcast(self, prov_podcast_id: str) -> dict[str, Any]:
615 parsed_podcast = await self.mass.cache.get(
616 key=prov_podcast_id,
617 provider=self.instance_id,
618 category=CACHE_CATEGORY_PODCAST_ITEMS,
619 default=None,
620 )
621 if parsed_podcast is None:
622 # raises MediaNotFoundError
623 parsed_podcast = await get_podcastparser_dict(
624 session=self.mass.http_session,
625 feed_url=prov_podcast_id,
626 max_episodes=self.max_episodes,
627 )
628 await self._cache_set_podcast(feed_url=prov_podcast_id, parsed_podcast=parsed_podcast)
629
630 # this is a dictionary from podcastparser
631 return parsed_podcast # type: ignore[no-any-return]
632
633 async def _cache_set_podcast(self, feed_url: str, parsed_podcast: dict[str, Any]) -> None:
634 await self.mass.cache.set(
635 key=feed_url,
636 provider=self.instance_id,
637 category=CACHE_CATEGORY_PODCAST_ITEMS,
638 data=parsed_podcast,
639 expiration=60 * 60 * 24, # 1 day
640 )
641
642 async def _cache_set_timestamps(self) -> None:
643 # seven days default
644 await self.mass.cache.set(
645 key=CACHE_KEY_TIMESTAMP,
646 provider=self.instance_id,
647 category=CACHE_CATEGORY_OTHER,
648 data=[self.timestamp_subscriptions, self.timestamp_actions],
649 )
650
651 async def _cache_set_feeds(self) -> None:
652 # seven days default
653 await self.mass.cache.set(
654 key=CACHE_KEY_FEEDS,
655 provider=self.instance_id,
656 category=CACHE_CATEGORY_OTHER,
657 data=self.feeds,
658 )
659