music-assistant-server

25.6 KBPY
__init__.py
25.6 KB659 lines • python
1"""gPodder provider for Music Assistant.
2
3Tested against opodsync, https://github.com/kd2org/opodsync
4and nextcloud-gpodder, https://github.com/thrillfall/nextcloud-gpodder
gpodder.net is not supported, as the domain is frequently unresponsive or down.
6
7Note:
    - It can happen that we have the guid and use it for identification, while the sync state
      provider, e.g. opodsync, only uses the stream url. So always make sure to compare both
      when relying on an external service.
    - The service calls take a timestamp (int, unix epoch in seconds) and return the changes
      since then.
12"""
13
14from __future__ import annotations
15
16import asyncio
17import time
18from collections.abc import AsyncGenerator
19from typing import TYPE_CHECKING, Any
20
21from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
22from music_assistant_models.enums import (
23    ConfigEntryType,
24    ContentType,
25    EventType,
26    MediaType,
27    ProviderFeature,
28    StreamType,
29)
30from music_assistant_models.errors import (
31    LoginFailed,
32    MediaNotFoundError,
33    ResourceTemporarilyUnavailable,
34)
35from music_assistant_models.media_items import AudioFormat, MediaItemType, Podcast, PodcastEpisode
36from music_assistant_models.streamdetails import StreamDetails
37
38from music_assistant.helpers.podcast_parsers import (
39    get_podcastparser_dict,
40    get_stream_url_and_guid_from_episode,
41    parse_podcast,
42    parse_podcast_episode,
43)
44from music_assistant.models.music_provider import MusicProvider
45
46from .client import EpisodeActionDelete, EpisodeActionNew, EpisodeActionPlay, GPodderClient
47
48if TYPE_CHECKING:
49    from music_assistant_models.provider import ProviderManifest
50
51    from music_assistant.mass import MusicAssistant
52    from music_assistant.models import ProviderInstanceType
53
# --- config keys: "classic" gPodder API ---
CONF_URL = "url"
CONF_USERNAME = "username"
CONF_PASSWORD = "password"
CONF_DEVICE_ID = "device_id"
# hidden bool entry, true if nextcloud is not used
CONF_USING_GPODDER = "using_gpodder"

# --- config keys: nextcloud ---
CONF_ACTION_AUTH_NC = "authenticate_nc"
CONF_TOKEN_NC = "token"
CONF_URL_NC = "url_nc"

# --- config keys: general ---
CONF_VERIFY_SSL = "verify_ssl"
CONF_MAX_NUM_EPISODES = "max_num_episodes"


# --- cache layout ---
# category holding the individual parsed podcasts (dict from podcastparser)
CACHE_CATEGORY_PODCAST_ITEMS = 0
CACHE_CATEGORY_OTHER = 1
# two ints are stored under this key: timestamp_subscriptions and timestamp_actions
CACHE_KEY_TIMESTAMP = "timestamp"
# list[str]: all available rss feed urls
CACHE_KEY_FEEDS = "feeds"

SUPPORTED_FEATURES = {ProviderFeature.LIBRARY_PODCASTS, ProviderFeature.BROWSE}
82
83
async def setup(
    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
    """Create the gPodder provider instance for the given configuration."""
    provider = GPodder(mass, manifest, config, SUPPORTED_FEATURES)
    return provider
89
90
async def get_config_entries(
    mass: MusicAssistant,
    instance_id: str | None = None,
    action: str | None = None,
    values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
    """
    Return Config entries to setup this provider.

    instance_id: id of an existing provider instance (None if new instance setup).
    action: [optional] action key called from config entries UI.
    values: the (intermediate) raw values for config entries sent with the action.

    If action is the Nextcloud authentication action, the Nextcloud login flow is run
    first and the obtained app password is stored in values under the token key.
    Raises LoginFailed if that flow fails or is not confirmed in time.
    """
    # ruff: noqa: ARG001
    if values is None:
        values = {}

    # Default to verification only if the option is unset. A plain truthiness check
    # would silently ignore an explicit False.
    _verify_ssl = values.get(CONF_VERIFY_SSL)
    verify_ssl = True if _verify_ssl is None else bool(_verify_ssl)

    if action == CONF_ACTION_AUTH_NC:
        await _authenticate_nextcloud(mass, values, verify_ssl)

    # the presence of a token decides which parts of the form are shown
    authenticated_nc = values.get(CONF_TOKEN_NC) is not None

    using_gpodder = bool(values.get(CONF_USING_GPODDER, False))

    return (
        ConfigEntry(
            key="label_text",
            type=ConfigEntryType.LABEL,
            label="Authentication did succeed! Please press save to continue.",
            hidden=not authenticated_nc,
        ),
        ConfigEntry(
            key="label_gpodder",
            type=ConfigEntryType.LABEL,
            label="Authentication with gPodder compatible web service, e.g. opodsync:",
            hidden=authenticated_nc,
        ),
        ConfigEntry(
            key=CONF_URL,
            type=ConfigEntryType.STRING,
            label="gPodder Service URL",
            required=False,
            description="URL of gPodder instance.",
            value=values.get(CONF_URL),
            hidden=authenticated_nc,
        ),
        ConfigEntry(
            key=CONF_USERNAME,
            type=ConfigEntryType.STRING,
            label="Username",
            required=False,
            description="Username of gPodder instance.",
            hidden=authenticated_nc,
            value=values.get(CONF_USERNAME),
        ),
        ConfigEntry(
            key=CONF_PASSWORD,
            type=ConfigEntryType.SECURE_STRING,
            label="Password",
            required=False,
            description="Password for gPodder instance.",
            hidden=authenticated_nc,
            value=values.get(CONF_PASSWORD),
        ),
        ConfigEntry(
            key=CONF_DEVICE_ID,
            type=ConfigEntryType.STRING,
            label="Device ID",
            required=False,
            description="Device ID of user.",
            hidden=authenticated_nc,
            value=values.get(CONF_DEVICE_ID),
        ),
        ConfigEntry(
            key="label_nextcloud",
            type=ConfigEntryType.LABEL,
            label="Authentication with Nextcloud with gPodder Sync (nextcloud-gpodder) installed:",
            hidden=authenticated_nc or using_gpodder,
        ),
        ConfigEntry(
            key=CONF_URL_NC,
            type=ConfigEntryType.STRING,
            label="Nextcloud URL",
            required=False,
            description="URL of Nextcloud instance.",
            value=values.get(CONF_URL_NC),
            hidden=using_gpodder,
        ),
        ConfigEntry(
            key=CONF_ACTION_AUTH_NC,
            type=ConfigEntryType.ACTION,
            label="(Re)Authenticate with Nextcloud",
            description="This button will redirect you to your Nextcloud instance to authenticate.",
            action=CONF_ACTION_AUTH_NC,
            required=False,
            hidden=using_gpodder,
        ),
        ConfigEntry(
            key="label_general",
            type=ConfigEntryType.LABEL,
            label="General config:",
        ),
        ConfigEntry(
            key=CONF_MAX_NUM_EPISODES,
            type=ConfigEntryType.INTEGER,
            label="Maximum number of episodes (0 for unlimited)",
            required=False,
            description="Maximum number of episodes to sync per feed. Use 0 for unlimited",
            default_value=0,
            value=values.get(CONF_MAX_NUM_EPISODES),
        ),
        ConfigEntry(
            key=CONF_VERIFY_SSL,
            type=ConfigEntryType.BOOLEAN,
            label="Verify SSL",
            required=False,
            description="Whether or not to verify the certificate of SSL/TLS connections.",
            advanced=True,
            default_value=True,
            value=values.get(CONF_VERIFY_SSL),
        ),
        ConfigEntry(
            key=CONF_TOKEN_NC,
            type=ConfigEntryType.SECURE_STRING,
            label="token",
            hidden=True,
            required=False,
            value=values.get(CONF_TOKEN_NC),
        ),
        ConfigEntry(
            key=CONF_USING_GPODDER,
            type=ConfigEntryType.BOOLEAN,
            label="using_gpodder",
            hidden=True,
            required=False,
            value=values.get(CONF_USING_GPODDER),
        ),
    )


# The poll token of the Nextcloud login flow v2 is only valid for a limited time
# (20 minutes according to the Nextcloud documentation), so stop polling afterwards
# instead of looping forever.
_NC_LOGIN_TIMEOUT_S = 20 * 60
_NC_LOGIN_POLL_INTERVAL_S = 1


async def _authenticate_nextcloud(
    mass: MusicAssistant, values: dict[str, ConfigValueType], verify_ssl: bool
) -> None:
    """
    Run the Nextcloud login flow v2 and store the app password in values.

    mass: used for its http session and to signal the login url to the frontend.
    values: must contain the Nextcloud url and "session_id"; receives the token.
    verify_ssl: whether the certificate of the Nextcloud instance is verified.

    Raises LoginFailed if the url does not answer like a Nextcloud instance, or if the
    login was not confirmed before the poll deadline.
    """
    session = mass.http_session
    # use the responses as context managers, so that the connections are released
    async with session.post(
        str(values[CONF_URL_NC]).rstrip("/") + "/index.php/login/v2",
        headers={"User-Agent": "Music Assistant"},
        ssl=verify_ssl,
    ) as response:
        if response.status != 200:
            raise LoginFailed("The specified url seems not to belong to a nextcloud instance.")
        data = await response.json()
    poll_endpoint = data["poll"]["endpoint"]
    poll_token = data["poll"]["token"]
    login_url = data["login"]
    session_id = str(values["session_id"])
    mass.signal_event(EventType.AUTH_SESSION, session_id, login_url)

    deadline = time.monotonic() + _NC_LOGIN_TIMEOUT_S
    while time.monotonic() < deadline:
        async with session.post(
            poll_endpoint, data={"token": poll_token}, ssl=verify_ssl
        ) as response:
            if response.status == 200:
                data = await response.json()
                values[CONF_TOKEN_NC] = data["appPassword"]
                return
            # 404 is answered as long as the user did not confirm the login yet
            if response.status != 404:
                raise LoginFailed("The specified url seems not to belong to a nextcloud instance.")
        await asyncio.sleep(_NC_LOGIN_POLL_INTERVAL_S)
    raise LoginFailed("Timed out while waiting for the Nextcloud login to be confirmed.")
255
256
257class GPodder(MusicProvider):
258    """gPodder MusicProvider."""
259
260    async def handle_async_init(self) -> None:
261        """Pass config values to client and initialize."""
262        base_url = str(self.config.get_value(CONF_URL))
263        _username = self.config.get_value(CONF_USERNAME)
264        _password = self.config.get_value(CONF_PASSWORD)
265        _device_id = self.config.get_value(CONF_DEVICE_ID)
266        nc_url = str(self.config.get_value(CONF_URL_NC))
267        nc_token = self.config.get_value(CONF_TOKEN_NC)
268        verify_ssl = bool(self.config.get_value(CONF_VERIFY_SSL))
269
270        self.max_episodes = int(float(str(self.config.get_value(CONF_MAX_NUM_EPISODES))))
271
272        self._client = GPodderClient(
273            session=self.mass.http_session, logger=self.logger, verify_ssl=verify_ssl
274        )
275
276        if nc_token is not None:
277            assert nc_url is not None
278            self._client.init_nc(base_url=nc_url, nc_token=str(nc_token))
279        else:
280            self._update_config_value(CONF_USING_GPODDER, True)
281            if _username is None or _password is None or _device_id is None:
282                raise LoginFailed("Must provide username, password and device_id.")
283            username = str(_username)
284            password = str(_password)
285            device_id = str(_device_id)
286
287            if base_url.rstrip("/") == "https://gpodder.net":
288                raise LoginFailed("Do not use gpodder.net. See docs for explanation.")
289            try:
290                await self._client.init_gpodder(
291                    username=username, password=password, base_url=base_url, device=device_id
292                )
293            except RuntimeError as exc:
294                raise LoginFailed("Login failed.") from exc
295
296        timestamps = await self.mass.cache.get(
297            key=CACHE_KEY_TIMESTAMP,
298            provider=self.instance_id,
299            category=CACHE_CATEGORY_OTHER,
300            default=None,
301        )
302        if timestamps is None:
303            self.timestamp_subscriptions: int = 0
304            self.timestamp_actions: int = 0
305        else:
306            self.timestamp_subscriptions, self.timestamp_actions = timestamps
307
308        self.logger.debug(
309            "Our timestamps are (subscriptions, actions)  (%s, %s)",
310            self.timestamp_subscriptions,
311            self.timestamp_actions,
312        )
313
314        feeds = await self.mass.cache.get(
315            key=CACHE_KEY_FEEDS,
316            provider=self.instance_id,
317            category=CACHE_CATEGORY_OTHER,
318            default=None,
319        )
320        if feeds is None:
321            self.feeds: set[str] = set()
322        else:
323            self.feeds = set(feeds)  # feeds is a list here
324
325        # we are syncing the playlog, but not event based. A simple check in on_played,
326        # should be sufficient
327        self.progress_guard_timestamp = 0.0
328
329    @property
330    def is_streaming_provider(self) -> bool:
331        """Return True if the provider is a streaming provider."""
332        # For streaming providers return True here but for local file based providers return False.
333        # While the streams are remote, the user controls what is added.
334        return False
335
336    async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
337        """Retrieve library/subscribed podcasts from the provider."""
338        try:
339            subscriptions = await self._client.get_subscriptions()
340        except RuntimeError:
341            raise ResourceTemporarilyUnavailable(backoff_time=30)
342        if subscriptions is None:
343            return
344
345        for feed_url in subscriptions.add:
346            self.feeds.add(feed_url)
347        for feed_url in subscriptions.remove:
348            try:
349                self.feeds.remove(feed_url)
350            except KeyError:
351                # a podcast might have been added and removed in our absence...
352                continue
353
354        episode_actions, timestamp_action = await self._client.get_episode_actions()
355        for feed_url in self.feeds:
356            self.logger.debug("Adding podcast with feed %s to library", feed_url)
357            # parse podcast
358            try:
359                parsed_podcast = await get_podcastparser_dict(
360                    session=self.mass.http_session,
361                    feed_url=feed_url,
362                    max_episodes=self.max_episodes,
363                )
364            except MediaNotFoundError:
365                self.logger.warning(f"Was unable to obtain podcast with feed {feed_url}")
366                continue
367            await self._cache_set_podcast(feed_url, parsed_podcast)
368
369            # playlog
370            # be safe, if there should be multiple episodeactions. client already sorts
371            # progresses in descending order.
372            _already_processed = set()
373            _episode_actions = [x for x in episode_actions if x.podcast == feed_url]
374            for _action in _episode_actions:
375                if _action.episode not in _already_processed:
376                    _already_processed.add(_action.episode)
377                    # we do not have to add the progress, these would make calls twice,
378                    # and we only use the object to propagate to playlog
379                    self.progress_guard_timestamp = time.time()
380                    _episode_ids: list[str] = []
381                    if _action.guid is not None:
382                        _episode_ids.append(f"{feed_url} {_action.guid}")
383                    _episode_ids.append(f"{feed_url} {_action.episode}")
384                    mass_episode: PodcastEpisode | None = None
385                    for _episode_id in _episode_ids:
386                        try:
387                            mass_episode = await self.get_podcast_episode(
388                                _episode_id, add_progress=False
389                            )
390                            break
391                        except MediaNotFoundError:
392                            continue
393                    if mass_episode is None:
394                        self.logger.debug(
395                            f"Was unable to use progress for episode {_action.episode}."
396                        )
397                        continue
398                    match _action:
399                        case EpisodeActionNew():
400                            await self.mass.music.mark_item_unplayed(mass_episode)
401                        case EpisodeActionPlay():
402                            await self.mass.music.mark_item_played(
403                                mass_episode,
404                                fully_played=_action.position >= _action.total,
405                                seconds_played=_action.position,
406                            )
407
408            # cache
409            yield parse_podcast(
410                feed_url=feed_url,
411                parsed_feed=parsed_podcast,
412                instance_id=self.instance_id,
413                domain=self.domain,
414            )
415
416        self.timestamp_subscriptions = subscriptions.timestamp
417        if timestamp_action is not None:
418            self.timestamp_actions = timestamp_action
419        await self._cache_set_timestamps()
420        await self._cache_set_feeds()
421
422    async def get_podcast(self, prov_podcast_id: str) -> Podcast:
423        """Get Podcast."""
424        parsed_podcast = await self._cache_get_podcast(prov_podcast_id)
425
426        return parse_podcast(
427            feed_url=prov_podcast_id,
428            parsed_feed=parsed_podcast,
429            instance_id=self.instance_id,
430            domain=self.domain,
431        )
432
433    async def get_podcast_episodes(
434        self, prov_podcast_id: str, add_progress: bool = True
435    ) -> AsyncGenerator[PodcastEpisode, None]:
436        """Get Podcast episodes. Add progress information."""
437        if add_progress:
438            episode_actions, timestamp = await self._client.get_episode_actions()
439        else:
440            episode_actions, timestamp = [], None
441
442        podcast = await self._cache_get_podcast(prov_podcast_id)
443        podcast_cover = podcast.get("cover_url")
444        parsed_episodes = podcast.get("episodes", [])
445
446        if timestamp is not None:
447            self.timestamp_actions = timestamp
448            await self._cache_set_timestamps()
449
450        for cnt, parsed_episode in enumerate(parsed_episodes):
451            mass_episode = parse_podcast_episode(
452                episode=parsed_episode,
453                prov_podcast_id=prov_podcast_id,
454                episode_cnt=cnt,
455                podcast_cover=podcast_cover,
456                domain=self.domain,
457                instance_id=self.instance_id,
458            )
459            if mass_episode is None:
460                # faulty episode
461                continue
462            try:
463                stream_url, guid = get_stream_url_and_guid_from_episode(episode=parsed_episode)
464            except ValueError:
465                # episode enclosure or stream url missing
466                continue
467
468            for action in episode_actions:
469                # we have to test both, as we are comparing to external input.
470                _test = [action.guid, action.episode]
471                if prov_podcast_id == action.podcast and (guid in _test or stream_url in _test):
472                    self.progress_guard_timestamp = time.time()
473                    if isinstance(action, EpisodeActionNew):
474                        mass_episode.resume_position_ms = 0
475                        mass_episode.fully_played = False
476
477                        # propagate to playlog
478                        await self.mass.music.mark_item_unplayed(
479                            mass_episode,
480                        )
481                    elif isinstance(action, EpisodeActionPlay):
482                        fully_played = action.position >= action.total
483                        resume_position_s = action.position
484                        mass_episode.resume_position_ms = resume_position_s * 1000
485                        mass_episode.fully_played = fully_played
486
487                        # propagate progress to playlog
488                        await self.mass.music.mark_item_played(
489                            mass_episode,
490                            fully_played=fully_played,
491                            seconds_played=resume_position_s,
492                        )
493                    elif isinstance(action, EpisodeActionDelete):
494                        for mapping in mass_episode.provider_mappings:
495                            mapping.available = False
496                    break
497            yield mass_episode
498
499    async def get_podcast_episode(
500        self, prov_episode_id: str, add_progress: bool = True
501    ) -> PodcastEpisode:
502        """Get Podcast Episode. Add progress information."""
503        podcast_id, guid_or_stream_url = prov_episode_id.split(" ")
504        async for mass_episode in self.get_podcast_episodes(podcast_id, add_progress=add_progress):
505            _, _guid_or_stream_url = mass_episode.item_id.split(" ")
506            # this is enough, as internal
507            if guid_or_stream_url == _guid_or_stream_url:
508                return mass_episode
509        raise MediaNotFoundError("Did not find episode.")
510
511    async def get_resume_position(self, item_id: str, media_type: MediaType) -> tuple[bool, int]:
512        """Return: finished, position_ms."""
513        assert media_type == MediaType.PODCAST_EPISODE
514        podcast_id, guid_or_stream_url = item_id.split(" ")
515        stream_url = await self._get_episode_stream_url(podcast_id, guid_or_stream_url)
516        try:
517            progresses, timestamp = await self._client.get_episode_actions(
518                since=self.timestamp_actions
519            )
520        except RuntimeError:
521            self.logger.warning("Was unable to obtain progresses.")
522            raise NotImplementedError  # fallback to internal position.
523        for action in progresses:
524            _test = [action.guid, action.episode]
525            # progress is external, compare guid and stream_url
526            if action.podcast == podcast_id and (
527                guid_or_stream_url in _test or stream_url in _test
528            ):
529                if timestamp is not None:
530                    self.timestamp_actions = timestamp
531                    await self._cache_set_timestamps()
532                if isinstance(action, EpisodeActionNew | EpisodeActionDelete):
533                    # no progress, it might have been actively reset
534                    # in case of delete, we start from start.
535                    return False, 0
536                _progress = (action.position >= action.total, max(action.position * 1000, 0))
537                self.logger.debug("Found an updated external resume position.")
538                return action.position >= action.total, max(action.position * 1000, 0)
539        self.logger.debug("Did not find an updated resume position, falling back to stored.")
540        # If we did not find a resume position, nothing changed since our last timestamp
541        # we raise NotImplementedError, such that MA falls back to the already stored
542        # resume_position in its playlog.
543        raise NotImplementedError
544
545    async def on_played(
546        self,
547        media_type: MediaType,
548        prov_item_id: str,
549        fully_played: bool,
550        position: int,
551        media_item: MediaItemType,
552        is_playing: bool = False,
553    ) -> None:
554        """Update progress."""
555        if media_item is None or not isinstance(media_item, PodcastEpisode):
556            return
557        if media_type != MediaType.PODCAST_EPISODE:
558            return
559        if time.time() - self.progress_guard_timestamp <= 5:
560            return
561        podcast_id, guid_or_stream_url = prov_item_id.split(" ")
562        stream_url = await self._get_episode_stream_url(podcast_id, guid_or_stream_url)
563        assert stream_url is not None
564        duration = media_item.duration
565        try:
566            await self._client.update_progress(
567                podcast_id=podcast_id,
568                episode_id=stream_url,
569                guid=guid_or_stream_url,
570                position_s=position,
571                duration_s=duration,
572            )
573            self.logger.debug(f"Updated progress to {position / duration * 100:.2f}%")
574        except RuntimeError as exc:
575            self.logger.debug(exc)
576            self.logger.debug("Failed to update progress.")
577
578    async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
579        """Get streamdetails for item."""
580        podcast_id, guid_or_stream_url = item_id.split(" ")
581        stream_url = await self._get_episode_stream_url(podcast_id, guid_or_stream_url)
582        if stream_url is None:
583            raise MediaNotFoundError
584        return StreamDetails(
585            provider=self.instance_id,
586            item_id=item_id,
587            audio_format=AudioFormat(
588                content_type=ContentType.try_parse(stream_url),
589            ),
590            media_type=MediaType.PODCAST_EPISODE,
591            stream_type=StreamType.HTTP,
592            path=stream_url,
593            can_seek=True,
594            allow_seek=True,
595        )
596
597    async def _get_episode_stream_url(self, podcast_id: str, guid_or_stream_url: str) -> str | None:
598        podcast = await self._cache_get_podcast(podcast_id)
599        episodes = podcast.get("episodes", [])
600        for cnt, episode in enumerate(episodes):
601            episode_enclosures = episode.get("enclosures", [])
602            if len(episode_enclosures) < 1:
603                raise MediaNotFoundError
604            stream_url: str | None = episode_enclosures[0].get("url", None)
605            guid = episode.get("guid")
606            if guid is not None and len(guid.split(" ")) == 1:
607                _guid_or_stream_url_compare = guid
608            else:
609                _guid_or_stream_url_compare = stream_url
610            if guid_or_stream_url == _guid_or_stream_url_compare:
611                return stream_url
612        return None
613
614    async def _cache_get_podcast(self, prov_podcast_id: str) -> dict[str, Any]:
615        parsed_podcast = await self.mass.cache.get(
616            key=prov_podcast_id,
617            provider=self.instance_id,
618            category=CACHE_CATEGORY_PODCAST_ITEMS,
619            default=None,
620        )
621        if parsed_podcast is None:
622            # raises MediaNotFoundError
623            parsed_podcast = await get_podcastparser_dict(
624                session=self.mass.http_session,
625                feed_url=prov_podcast_id,
626                max_episodes=self.max_episodes,
627            )
628            await self._cache_set_podcast(feed_url=prov_podcast_id, parsed_podcast=parsed_podcast)
629
630        # this is a dictionary from podcastparser
631        return parsed_podcast  # type: ignore[no-any-return]
632
633    async def _cache_set_podcast(self, feed_url: str, parsed_podcast: dict[str, Any]) -> None:
634        await self.mass.cache.set(
635            key=feed_url,
636            provider=self.instance_id,
637            category=CACHE_CATEGORY_PODCAST_ITEMS,
638            data=parsed_podcast,
639            expiration=60 * 60 * 24,  # 1 day
640        )
641
642    async def _cache_set_timestamps(self) -> None:
643        # seven days default
644        await self.mass.cache.set(
645            key=CACHE_KEY_TIMESTAMP,
646            provider=self.instance_id,
647            category=CACHE_CATEGORY_OTHER,
648            data=[self.timestamp_subscriptions, self.timestamp_actions],
649        )
650
651    async def _cache_set_feeds(self) -> None:
652        # seven days default
653        await self.mass.cache.set(
654            key=CACHE_KEY_FEEDS,
655            provider=self.instance_id,
656            category=CACHE_CATEGORY_OTHER,
657            data=self.feeds,
658        )
659