music-assistant-server

25.4 KBPY
__init__.py
25.4 KB651 lines • python
1"""gPodder provider for Music Assistant.
2
3Tested against opodsync, https://github.com/kd2org/opodsync
4and nextcloud-gpodder, https://github.com/thrillfall/nextcloud-gpodder
5gpodder.net is not supported due to responsiveness/ frequent downtimes of domain.
6
7Note:
8    - it can happen, that we have the guid and use that for identification, but the sync state
9      provider, eg. opodsync might use only the stream url. So always make sure, to compare both
10      when relying on an external service
11    - The service calls have a timestamp (int, unix epoch s), which give the changes since then.
12"""
13
14from __future__ import annotations
15
16import asyncio
17import time
18from collections.abc import AsyncGenerator
19from typing import TYPE_CHECKING, Any
20
21from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
22from music_assistant_models.enums import (
23    ConfigEntryType,
24    ContentType,
25    EventType,
26    MediaType,
27    ProviderFeature,
28    StreamType,
29)
30from music_assistant_models.errors import (
31    LoginFailed,
32    MediaNotFoundError,
33    ResourceTemporarilyUnavailable,
34)
35from music_assistant_models.media_items import AudioFormat, MediaItemType, Podcast, PodcastEpisode
36from music_assistant_models.streamdetails import StreamDetails
37
38from music_assistant.helpers.podcast_parsers import (
39    get_podcastparser_dict,
40    get_stream_url_and_guid_from_episode,
41    parse_podcast,
42    parse_podcast_episode,
43)
44from music_assistant.models.music_provider import MusicProvider
45
46from .client import EpisodeActionDelete, EpisodeActionNew, EpisodeActionPlay, GPodderClient
47
48if TYPE_CHECKING:
49    from music_assistant_models.provider import ProviderManifest
50
51    from music_assistant.mass import MusicAssistant
52    from music_assistant.models import ProviderInstanceType
53
54# Config for "classic" gpodder api
55CONF_URL = "url"
56CONF_USERNAME = "username"
57CONF_PASSWORD = "password"
58CONF_DEVICE_ID = "device_id"
59CONF_USING_GPODDER = "using_gpodder"  # hidden, bool, true if not nextcloud used
60
61# Config for nextcloud
62CONF_ACTION_AUTH_NC = "authenticate_nc"
63CONF_TOKEN_NC = "token"
64CONF_URL_NC = "url_nc"
65
66# General config
67CONF_VERIFY_SSL = "verify_ssl"
68CONF_MAX_NUM_EPISODES = "max_num_episodes"
69
70
71CACHE_CATEGORY_PODCAST_ITEMS = 0  # the individual parsed podcast (dict from podcastparser)
72CACHE_CATEGORY_OTHER = 1
73CACHE_KEY_TIMESTAMP = (
74    "timestamp"  # tuple of two ints, timestamp_subscriptions and timestamp_actions
75)
76CACHE_KEY_FEEDS = "feeds"  # list[str] : all available rss feed urls
77
78SUPPORTED_FEATURES = {
79    ProviderFeature.LIBRARY_PODCASTS,
80    ProviderFeature.BROWSE,
81}
82
83
84async def setup(
85    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
86) -> ProviderInstanceType:
87    """Initialize provider(instance) with given configuration."""
88    return GPodder(mass, manifest, config, SUPPORTED_FEATURES)
89
90
91async def get_config_entries(
92    mass: MusicAssistant,
93    instance_id: str | None = None,
94    action: str | None = None,
95    values: dict[str, ConfigValueType] | None = None,
96) -> tuple[ConfigEntry, ...]:
97    """
98    Return Config entries to setup this provider.
99
100    instance_id: id of an existing provider instance (None if new instance setup).
101    action: [optional] action key called from config entries UI.
102    values: the (intermediate) raw values for config entries sent with the action.
103    """
104    # ruff: noqa: ARG001
105    if values is None:
106        values = {}
107
108    if action == CONF_ACTION_AUTH_NC:
109        session = mass.http_session
110        response = await session.post(
111            str(values[CONF_URL_NC]).rstrip("/") + "/index.php/login/v2",
112            headers={"User-Agent": "Music Assistant"},
113        )
114        data = await response.json()
115        poll_endpoint = data["poll"]["endpoint"]
116        poll_token = data["poll"]["token"]
117        login_url = data["login"]
118        session_id = str(values["session_id"])
119        mass.signal_event(EventType.AUTH_SESSION, session_id, login_url)
120        while True:
121            response = await session.post(poll_endpoint, data={"token": poll_token})
122            if response.status not in [200, 404]:
123                raise LoginFailed("The specified url seems not to belong to a nextcloud instance.")
124            if response.status == 200:
125                data = await response.json()
126                values[CONF_TOKEN_NC] = data["appPassword"]
127                break
128            await asyncio.sleep(1)
129
130    authenticated_nc = True
131    if values.get(CONF_TOKEN_NC) is None:
132        authenticated_nc = False
133
134    using_gpodder = bool(values.get(CONF_USING_GPODDER, False))
135
136    return (
137        ConfigEntry(
138            key="label_text",
139            type=ConfigEntryType.LABEL,
140            label="Authentication did succeed! Please press save to continue.",
141            hidden=not authenticated_nc,
142        ),
143        ConfigEntry(
144            key="label_gpodder",
145            type=ConfigEntryType.LABEL,
146            label="Authentication with gPodder compatible web service, e.g. opodsync:",
147            hidden=authenticated_nc,
148        ),
149        ConfigEntry(
150            key=CONF_URL,
151            type=ConfigEntryType.STRING,
152            label="gPodder Service URL",
153            required=False,
154            description="URL of gPodder instance.",
155            value=values.get(CONF_URL),
156            hidden=authenticated_nc,
157        ),
158        ConfigEntry(
159            key=CONF_USERNAME,
160            type=ConfigEntryType.STRING,
161            label="Username",
162            required=False,
163            description="Username of gPodder instance.",
164            hidden=authenticated_nc,
165            value=values.get(CONF_USERNAME),
166        ),
167        ConfigEntry(
168            key=CONF_PASSWORD,
169            type=ConfigEntryType.SECURE_STRING,
170            label="Password",
171            required=False,
172            description="Password for gPodder instance.",
173            hidden=authenticated_nc,
174            value=values.get(CONF_PASSWORD),
175        ),
176        ConfigEntry(
177            key=CONF_DEVICE_ID,
178            type=ConfigEntryType.STRING,
179            label="Device ID",
180            required=False,
181            description="Device ID of user.",
182            hidden=authenticated_nc,
183            value=values.get(CONF_DEVICE_ID),
184        ),
185        ConfigEntry(
186            key="label_nextcloud",
187            type=ConfigEntryType.LABEL,
188            label="Authentication with Nextcloud with gPodder Sync (nextcloud-gpodder) installed:",
189            hidden=authenticated_nc or using_gpodder,
190        ),
191        ConfigEntry(
192            key=CONF_URL_NC,
193            type=ConfigEntryType.STRING,
194            label="Nextcloud URL",
195            required=False,
196            description="URL of Nextcloud instance.",
197            value=values.get(CONF_URL_NC),
198            hidden=using_gpodder,
199        ),
200        ConfigEntry(
201            key=CONF_ACTION_AUTH_NC,
202            type=ConfigEntryType.ACTION,
203            label="(Re)Authenticate with Nextcloud",
204            description="This button will redirect you to your Nextcloud instance to authenticate.",
205            action=CONF_ACTION_AUTH_NC,
206            required=False,
207            hidden=using_gpodder,
208        ),
209        ConfigEntry(
210            key="label_general",
211            type=ConfigEntryType.LABEL,
212            label="General config:",
213        ),
214        ConfigEntry(
215            key=CONF_MAX_NUM_EPISODES,
216            type=ConfigEntryType.INTEGER,
217            label="Maximum number of episodes (0 for unlimited)",
218            required=False,
219            description="Maximum number of episodes to sync per feed. Use 0 for unlimited",
220            default_value=0,
221            value=values.get(CONF_MAX_NUM_EPISODES),
222        ),
223        ConfigEntry(
224            key=CONF_VERIFY_SSL,
225            type=ConfigEntryType.BOOLEAN,
226            label="Verify SSL",
227            required=False,
228            description="Whether or not to verify the certificate of SSL/TLS connections.",
229            advanced=True,
230            default_value=True,
231            value=values.get(CONF_VERIFY_SSL),
232        ),
233        ConfigEntry(
234            key=CONF_TOKEN_NC,
235            type=ConfigEntryType.SECURE_STRING,
236            label="token",
237            hidden=True,
238            required=False,
239            value=values.get(CONF_TOKEN_NC),
240        ),
241        ConfigEntry(
242            key=CONF_USING_GPODDER,
243            type=ConfigEntryType.BOOLEAN,
244            label="using_gpodder",
245            hidden=True,
246            required=False,
247            value=values.get(CONF_USING_GPODDER),
248        ),
249    )
250
251
252class GPodder(MusicProvider):
253    """gPodder MusicProvider."""
254
255    async def handle_async_init(self) -> None:
256        """Pass config values to client and initialize."""
257        base_url = str(self.config.get_value(CONF_URL))
258        _username = self.config.get_value(CONF_USERNAME)
259        _password = self.config.get_value(CONF_PASSWORD)
260        _device_id = self.config.get_value(CONF_DEVICE_ID)
261        nc_url = str(self.config.get_value(CONF_URL_NC))
262        nc_token = self.config.get_value(CONF_TOKEN_NC)
263
264        self.max_episodes = int(float(str(self.config.get_value(CONF_MAX_NUM_EPISODES))))
265
266        self._client = GPodderClient(session=self.mass.http_session, logger=self.logger)
267
268        if nc_token is not None:
269            assert nc_url is not None
270            self._client.init_nc(base_url=nc_url, nc_token=str(nc_token))
271        else:
272            self._update_config_value(CONF_USING_GPODDER, True)
273            if _username is None or _password is None or _device_id is None:
274                raise LoginFailed("Must provide username, password and device_id.")
275            username = str(_username)
276            password = str(_password)
277            device_id = str(_device_id)
278
279            if base_url.rstrip("/") == "https://gpodder.net":
280                raise LoginFailed("Do not use gpodder.net. See docs for explanation.")
281            try:
282                await self._client.init_gpodder(
283                    username=username, password=password, base_url=base_url, device=device_id
284                )
285            except RuntimeError as exc:
286                raise LoginFailed("Login failed.") from exc
287
288        timestamps = await self.mass.cache.get(
289            key=CACHE_KEY_TIMESTAMP,
290            provider=self.instance_id,
291            category=CACHE_CATEGORY_OTHER,
292            default=None,
293        )
294        if timestamps is None:
295            self.timestamp_subscriptions: int = 0
296            self.timestamp_actions: int = 0
297        else:
298            self.timestamp_subscriptions, self.timestamp_actions = timestamps
299
300        self.logger.debug(
301            "Our timestamps are (subscriptions, actions)  (%s, %s)",
302            self.timestamp_subscriptions,
303            self.timestamp_actions,
304        )
305
306        feeds = await self.mass.cache.get(
307            key=CACHE_KEY_FEEDS,
308            provider=self.instance_id,
309            category=CACHE_CATEGORY_OTHER,
310            default=None,
311        )
312        if feeds is None:
313            self.feeds: set[str] = set()
314        else:
315            self.feeds = set(feeds)  # feeds is a list here
316
317        # we are syncing the playlog, but not event based. A simple check in on_played,
318        # should be sufficient
319        self.progress_guard_timestamp = 0.0
320
321    @property
322    def is_streaming_provider(self) -> bool:
323        """Return True if the provider is a streaming provider."""
324        # For streaming providers return True here but for local file based providers return False.
325        # While the streams are remote, the user controls what is added.
326        return False
327
328    async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
329        """Retrieve library/subscribed podcasts from the provider."""
330        try:
331            subscriptions = await self._client.get_subscriptions()
332        except RuntimeError:
333            raise ResourceTemporarilyUnavailable(backoff_time=30)
334        if subscriptions is None:
335            return
336
337        for feed_url in subscriptions.add:
338            self.feeds.add(feed_url)
339        for feed_url in subscriptions.remove:
340            try:
341                self.feeds.remove(feed_url)
342            except KeyError:
343                # a podcast might have been added and removed in our absence...
344                continue
345
346        episode_actions, timestamp_action = await self._client.get_episode_actions()
347        for feed_url in self.feeds:
348            self.logger.debug("Adding podcast with feed %s to library", feed_url)
349            # parse podcast
350            try:
351                parsed_podcast = await get_podcastparser_dict(
352                    session=self.mass.http_session,
353                    feed_url=feed_url,
354                    max_episodes=self.max_episodes,
355                )
356            except MediaNotFoundError:
357                self.logger.warning(f"Was unable to obtain podcast with feed {feed_url}")
358                continue
359            await self._cache_set_podcast(feed_url, parsed_podcast)
360
361            # playlog
362            # be safe, if there should be multiple episodeactions. client already sorts
363            # progresses in descending order.
364            _already_processed = set()
365            _episode_actions = [x for x in episode_actions if x.podcast == feed_url]
366            for _action in _episode_actions:
367                if _action.episode not in _already_processed:
368                    _already_processed.add(_action.episode)
369                    # we do not have to add the progress, these would make calls twice,
370                    # and we only use the object to propagate to playlog
371                    self.progress_guard_timestamp = time.time()
372                    _episode_ids: list[str] = []
373                    if _action.guid is not None:
374                        _episode_ids.append(f"{feed_url} {_action.guid}")
375                    _episode_ids.append(f"{feed_url} {_action.episode}")
376                    mass_episode: PodcastEpisode | None = None
377                    for _episode_id in _episode_ids:
378                        try:
379                            mass_episode = await self.get_podcast_episode(
380                                _episode_id, add_progress=False
381                            )
382                            break
383                        except MediaNotFoundError:
384                            continue
385                    if mass_episode is None:
386                        self.logger.debug(
387                            f"Was unable to use progress for episode {_action.episode}."
388                        )
389                        continue
390                    match _action:
391                        case EpisodeActionNew():
392                            await self.mass.music.mark_item_unplayed(mass_episode)
393                        case EpisodeActionPlay():
394                            await self.mass.music.mark_item_played(
395                                mass_episode,
396                                fully_played=_action.position >= _action.total,
397                                seconds_played=_action.position,
398                            )
399
400            # cache
401            yield parse_podcast(
402                feed_url=feed_url,
403                parsed_feed=parsed_podcast,
404                instance_id=self.instance_id,
405                domain=self.domain,
406            )
407
408        self.timestamp_subscriptions = subscriptions.timestamp
409        if timestamp_action is not None:
410            self.timestamp_actions = timestamp_action
411        await self._cache_set_timestamps()
412        await self._cache_set_feeds()
413
414    async def get_podcast(self, prov_podcast_id: str) -> Podcast:
415        """Get Podcast."""
416        parsed_podcast = await self._cache_get_podcast(prov_podcast_id)
417
418        return parse_podcast(
419            feed_url=prov_podcast_id,
420            parsed_feed=parsed_podcast,
421            instance_id=self.instance_id,
422            domain=self.domain,
423        )
424
425    async def get_podcast_episodes(
426        self, prov_podcast_id: str, add_progress: bool = True
427    ) -> AsyncGenerator[PodcastEpisode, None]:
428        """Get Podcast episodes. Add progress information."""
429        if add_progress:
430            episode_actions, timestamp = await self._client.get_episode_actions()
431        else:
432            episode_actions, timestamp = [], None
433
434        podcast = await self._cache_get_podcast(prov_podcast_id)
435        podcast_cover = podcast.get("cover_url")
436        parsed_episodes = podcast.get("episodes", [])
437
438        if timestamp is not None:
439            self.timestamp_actions = timestamp
440            await self._cache_set_timestamps()
441
442        for cnt, parsed_episode in enumerate(parsed_episodes):
443            mass_episode = parse_podcast_episode(
444                episode=parsed_episode,
445                prov_podcast_id=prov_podcast_id,
446                episode_cnt=cnt,
447                podcast_cover=podcast_cover,
448                domain=self.domain,
449                instance_id=self.instance_id,
450            )
451            if mass_episode is None:
452                # faulty episode
453                continue
454            try:
455                stream_url, guid = get_stream_url_and_guid_from_episode(episode=parsed_episode)
456            except ValueError:
457                # episode enclosure or stream url missing
458                continue
459
460            for action in episode_actions:
461                # we have to test both, as we are comparing to external input.
462                _test = [action.guid, action.episode]
463                if prov_podcast_id == action.podcast and (guid in _test or stream_url in _test):
464                    self.progress_guard_timestamp = time.time()
465                    if isinstance(action, EpisodeActionNew):
466                        mass_episode.resume_position_ms = 0
467                        mass_episode.fully_played = False
468
469                        # propagate to playlog
470                        await self.mass.music.mark_item_unplayed(
471                            mass_episode,
472                        )
473                    elif isinstance(action, EpisodeActionPlay):
474                        fully_played = action.position >= action.total
475                        resume_position_s = action.position
476                        mass_episode.resume_position_ms = resume_position_s * 1000
477                        mass_episode.fully_played = fully_played
478
479                        # propagate progress to playlog
480                        await self.mass.music.mark_item_played(
481                            mass_episode,
482                            fully_played=fully_played,
483                            seconds_played=resume_position_s,
484                        )
485                    elif isinstance(action, EpisodeActionDelete):
486                        for mapping in mass_episode.provider_mappings:
487                            mapping.available = False
488                    break
489            yield mass_episode
490
491    async def get_podcast_episode(
492        self, prov_episode_id: str, add_progress: bool = True
493    ) -> PodcastEpisode:
494        """Get Podcast Episode. Add progress information."""
495        podcast_id, guid_or_stream_url = prov_episode_id.split(" ")
496        async for mass_episode in self.get_podcast_episodes(podcast_id, add_progress=add_progress):
497            _, _guid_or_stream_url = mass_episode.item_id.split(" ")
498            # this is enough, as internal
499            if guid_or_stream_url == _guid_or_stream_url:
500                return mass_episode
501        raise MediaNotFoundError("Did not find episode.")
502
503    async def get_resume_position(self, item_id: str, media_type: MediaType) -> tuple[bool, int]:
504        """Return: finished, position_ms."""
505        assert media_type == MediaType.PODCAST_EPISODE
506        podcast_id, guid_or_stream_url = item_id.split(" ")
507        stream_url = await self._get_episode_stream_url(podcast_id, guid_or_stream_url)
508        try:
509            progresses, timestamp = await self._client.get_episode_actions(
510                since=self.timestamp_actions
511            )
512        except RuntimeError:
513            self.logger.warning("Was unable to obtain progresses.")
514            raise NotImplementedError  # fallback to internal position.
515        for action in progresses:
516            _test = [action.guid, action.episode]
517            # progress is external, compare guid and stream_url
518            if action.podcast == podcast_id and (
519                guid_or_stream_url in _test or stream_url in _test
520            ):
521                if timestamp is not None:
522                    self.timestamp_actions = timestamp
523                    await self._cache_set_timestamps()
524                if isinstance(action, EpisodeActionNew | EpisodeActionDelete):
525                    # no progress, it might have been actively reset
526                    # in case of delete, we start from start.
527                    return False, 0
528                _progress = (action.position >= action.total, max(action.position * 1000, 0))
529                self.logger.debug("Found an updated external resume position.")
530                return action.position >= action.total, max(action.position * 1000, 0)
531        self.logger.debug("Did not find an updated resume position, falling back to stored.")
532        # If we did not find a resume position, nothing changed since our last timestamp
533        # we raise NotImplementedError, such that MA falls back to the already stored
534        # resume_position in its playlog.
535        raise NotImplementedError
536
537    async def on_played(
538        self,
539        media_type: MediaType,
540        prov_item_id: str,
541        fully_played: bool,
542        position: int,
543        media_item: MediaItemType,
544        is_playing: bool = False,
545    ) -> None:
546        """Update progress."""
547        if media_item is None or not isinstance(media_item, PodcastEpisode):
548            return
549        if media_type != MediaType.PODCAST_EPISODE:
550            return
551        if time.time() - self.progress_guard_timestamp <= 5:
552            return
553        podcast_id, guid_or_stream_url = prov_item_id.split(" ")
554        stream_url = await self._get_episode_stream_url(podcast_id, guid_or_stream_url)
555        assert stream_url is not None
556        duration = media_item.duration
557        try:
558            await self._client.update_progress(
559                podcast_id=podcast_id,
560                episode_id=stream_url,
561                guid=guid_or_stream_url,
562                position_s=position,
563                duration_s=duration,
564            )
565            self.logger.debug(f"Updated progress to {position / duration * 100:.2f}%")
566        except RuntimeError as exc:
567            self.logger.debug(exc)
568            self.logger.debug("Failed to update progress.")
569
570    async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
571        """Get streamdetails for item."""
572        podcast_id, guid_or_stream_url = item_id.split(" ")
573        stream_url = await self._get_episode_stream_url(podcast_id, guid_or_stream_url)
574        if stream_url is None:
575            raise MediaNotFoundError
576        return StreamDetails(
577            provider=self.instance_id,
578            item_id=item_id,
579            audio_format=AudioFormat(
580                content_type=ContentType.try_parse(stream_url),
581            ),
582            media_type=MediaType.PODCAST_EPISODE,
583            stream_type=StreamType.HTTP,
584            path=stream_url,
585            can_seek=True,
586            allow_seek=True,
587        )
588
589    async def _get_episode_stream_url(self, podcast_id: str, guid_or_stream_url: str) -> str | None:
590        podcast = await self._cache_get_podcast(podcast_id)
591        episodes = podcast.get("episodes", [])
592        for cnt, episode in enumerate(episodes):
593            episode_enclosures = episode.get("enclosures", [])
594            if len(episode_enclosures) < 1:
595                raise MediaNotFoundError
596            stream_url: str | None = episode_enclosures[0].get("url", None)
597            guid = episode.get("guid")
598            if guid is not None and len(guid.split(" ")) == 1:
599                _guid_or_stream_url_compare = guid
600            else:
601                _guid_or_stream_url_compare = stream_url
602            if guid_or_stream_url == _guid_or_stream_url_compare:
603                return stream_url
604        return None
605
606    async def _cache_get_podcast(self, prov_podcast_id: str) -> dict[str, Any]:
607        parsed_podcast = await self.mass.cache.get(
608            key=prov_podcast_id,
609            provider=self.instance_id,
610            category=CACHE_CATEGORY_PODCAST_ITEMS,
611            default=None,
612        )
613        if parsed_podcast is None:
614            # raises MediaNotFoundError
615            parsed_podcast = await get_podcastparser_dict(
616                session=self.mass.http_session,
617                feed_url=prov_podcast_id,
618                max_episodes=self.max_episodes,
619            )
620            await self._cache_set_podcast(feed_url=prov_podcast_id, parsed_podcast=parsed_podcast)
621
622        # this is a dictionary from podcastparser
623        return parsed_podcast  # type: ignore[no-any-return]
624
625    async def _cache_set_podcast(self, feed_url: str, parsed_podcast: dict[str, Any]) -> None:
626        await self.mass.cache.set(
627            key=feed_url,
628            provider=self.instance_id,
629            category=CACHE_CATEGORY_PODCAST_ITEMS,
630            data=parsed_podcast,
631            expiration=60 * 60 * 24,  # 1 day
632        )
633
634    async def _cache_set_timestamps(self) -> None:
635        # seven days default
636        await self.mass.cache.set(
637            key=CACHE_KEY_TIMESTAMP,
638            provider=self.instance_id,
639            category=CACHE_CATEGORY_OTHER,
640            data=[self.timestamp_subscriptions, self.timestamp_actions],
641        )
642
643    async def _cache_set_feeds(self) -> None:
644        # seven days default
645        await self.mass.cache.set(
646            key=CACHE_KEY_FEEDS,
647            provider=self.instance_id,
648            category=CACHE_CATEGORY_OTHER,
649            data=self.feeds,
650        )
651