music-assistant-server

9.8 KBPY
client.py
9.8 KB310 lines • python
1"""Simplest client for gPodder.
2
3Should be compatible with Nextcloud App GPodder Sync, and the original api
4of gpodder.net (mygpo) or drop-in replacements like opodsync.
5Gpodder Sync uses guid optionally.
6"""
7
8import datetime
9import logging
10from contextlib import suppress
11from dataclasses import dataclass, field
12from typing import Any
13
14import aiohttp
15from aiohttp.client_exceptions import ClientResponseError
16from mashumaro.config import BaseConfig
17from mashumaro.mixins.json import DataClassJSONMixin
18from mashumaro.types import Discriminator
19
20
21# https://gpoddernet.readthedocs.io/en/latest/api/reference/subscriptions.html#upload-subscription-changes
@dataclass(kw_only=True)
class SubscriptionsChangeRequest(DataClassJSONMixin):
    """Payload describing subscription changes to upload.

    Both lists default to empty, so a request may carry additions only,
    removals only, or nothing at all.
    """

    # entries to subscribe to (presumably podcast feed urls - confirm against the api docs)
    add: list[str] = field(default_factory=list)
    # entries to unsubscribe from
    remove: list[str] = field(default_factory=list)
28
29
# https://gpoddernet.readthedocs.io/en/latest/api/reference/subscriptions.html#get-subscription-changes
@dataclass(kw_only=True)
class SubscriptionsGet(SubscriptionsChangeRequest):
    """Subscription changes as returned by the server.

    Carries the inherited ``add``/``remove`` lists plus a mandatory timestamp.
    """

    # presumably the server time to hand back as ``since`` on the next poll - confirm
    timestamp: int
36
37
def action_tagger(cls: "type[EpisodeAction]") -> list[str]:
    """Return the discriminator tags accepted for an episode action class.

    The tag is the class name with "EpisodeAction" stripped out. It is offered
    in upper case (NC Gpodder) and in lower case (opodsync). This does not
    work with a StrEnum, hence plain strings.
    """
    tag = "".join(cls.__name__.split("EpisodeAction"))
    return [convert(tag) for convert in (str.upper, str.lower)]
46
47
@dataclass(kw_only=True)
class EpisodeAction(DataClassJSONMixin):
    """Base record shared by all episode actions.

    Subclasses add an ``action`` field; its value selects the concrete class
    when deserializing (see ``action_tagger`` for the accepted spellings).

    See https://gpoddernet.readthedocs.io/en/latest/api/reference/events.html
    """

    class Config(BaseConfig):
        """Serialization settings."""

        # only nextcloud supports guid, so unset (None) values are left out
        omit_none = True
        discriminator = Discriminator(
            field="action",
            include_subtypes=True,
            variant_tagger_fn=action_tagger,
        )

    # identifiers of the podcast and the episode the action refers to
    podcast: str
    episode: str
    # iso formatted time of the action, empty if unknown
    timestamp: str = ""
    guid: str | None = None
67
68
@dataclass(kw_only=True)
class EpisodeActionDownload(EpisodeAction):
    """Episode action tagged "download"."""

    # discriminator value, matched in upper and lower case
    action: str = "download"
74
75
@dataclass(kw_only=True)
class EpisodeActionDelete(EpisodeAction):
    """Episode action tagged "delete"."""

    # discriminator value, matched in upper and lower case
    action: str = "delete"
81
82
@dataclass(kw_only=True)
class EpisodeActionNew(EpisodeAction):
    """Episode action tagged "new"."""

    # discriminator value, matched in upper and lower case
    action: str = "new"
88
89
@dataclass(kw_only=True)
class EpisodeActionFlattr(EpisodeAction):
    """Episode action tagged "flattr"."""

    # discriminator value, matched in upper and lower case
    action: str = "flattr"
95
96
@dataclass(kw_only=True)
class EpisodeActionPlay(EpisodeAction):
    """Episode action tagged "play", carrying playback progress."""

    # discriminator value, matched in upper and lower case
    action: str = "play"

    # the three values below are all given in seconds
    started: int = 0
    position: int = 0
    total: int = 0
107
108
@dataclass(kw_only=True)
class EpisodeActionGet(DataClassJSONMixin):
    """Server response to an episode action query."""

    # each entry is deserialized into the EpisodeAction subclass matching its action tag
    actions: list[EpisodeAction]
    # timestamp reported by the server alongside the actions
    timestamp: int
115
116
class GPodderClient:
    """Minimal async client for gPodder compatible sync servers.

    The client works in one of two modes, selected by the init method called:

    * ``init_nc``: Nextcloud GPodder Sync, authenticated via bearer token.
    * ``init_gpodder``: gpodder.net style api, authenticated via basic auth.

    Unsuccessful http responses are reported as ``RuntimeError``.
    """

    def __init__(
        self, session: aiohttp.ClientSession, logger: logging.Logger, verify_ssl: bool = True
    ) -> None:
        """Init for GPodderClient.

        No request is made here. Call ``init_nc`` or ``init_gpodder`` before
        using any of the api methods.
        """
        self.session = session
        self.verify_ssl = verify_ssl

        self.is_nextcloud = False
        self.base_url: str
        # Initialised, so that ``headers`` raises its RuntimeError instead of
        # an AttributeError if no token was ever configured.
        self.token: str | None = None

        self.username: str
        self.device: str
        self.auth: aiohttp.BasicAuth | None = None  # only for gpodder

        self.logger = logger

        self._nextcloud_prefix = "index.php/apps/gpoddersync"

    def init_nc(self, base_url: str, nc_token: str | None = None) -> None:
        """Init values for a nextcloud client.

        Switches the client into nextcloud mode. Trailing slashes of base_url
        are dropped, as endpoints are appended with a single slash.
        """
        self.is_nextcloud = True
        self.token = nc_token
        self.base_url = base_url.rstrip("/")

    async def init_gpodder(self, username: str, password: str, device: str, base_url: str) -> None:
        """Init via basic auth.

        Stores the credentials and performs a login call against the server.
        Raises RuntimeError if the login call fails.
        """
        self.username = username
        self.device = device
        self.base_url = base_url.rstrip("/")
        self.auth = aiohttp.BasicAuth(username, password)
        await self._post(endpoint=f"api/2/auth/{username}/login.json")

    @property
    def headers(self) -> dict[str, str]:
        """Session headers (bearer token), only used in nextcloud mode.

        Raises RuntimeError if no token is set.
        """
        if self.token is None:
            raise RuntimeError("Token not set.")
        return {"Authorization": f"Bearer {self.token}"}

    async def _post(
        self,
        endpoint: str,
        data: dict[str, Any] | list[Any] | None = None,
    ) -> bytes:
        """POST request.

        Sends data as json to base_url/endpoint and returns the raw response
        body. Raises RuntimeError if the server does not answer with 200.
        """
        try:
            # The context manager releases the connection on every exit path.
            async with self.session.post(
                f"{self.base_url}/{endpoint}",
                json=data,
                ssl=self.verify_ssl,
                headers=self.headers if self.is_nextcloud else None,
                raise_for_status=True,
                auth=self.auth,
            ) as response:
                # raise_for_status only covers status >= 400
                if response.status != 200:
                    self.logger.debug("Call failed with status %s", response.status)
                    raise RuntimeError(f"API POST call to {endpoint} failed.")
                return await response.read()
        except ClientResponseError as exc:
            self.logger.debug(exc)
            raise RuntimeError(f"API POST call to {endpoint} failed.") from exc

    async def _get(self, endpoint: str, params: dict[str, str | int] | None = None) -> bytes:
        """GET request.

        Returns the raw body of a json response with status 200, and empty
        bytes on 404. Any other response raises RuntimeError.
        """
        # The context manager releases the connection on every exit path,
        # including the 404 and error cases where the body is never read.
        async with self.session.get(
            f"{self.base_url}/{endpoint}",
            params=params,
            ssl=self.verify_ssl,
            headers=self.headers if self.is_nextcloud else None,
            auth=self.auth,
        ) as response:
            status = response.status
            if status == 200 and response.content_type == "application/json":
                return await response.read()
            if status == 404:
                return b""
            self.logger.debug("Call failed with status %s", status)
        raise RuntimeError(f"API GET call to {endpoint} failed.")

    async def get_subscriptions(self, since: int = 0) -> SubscriptionsGet | None:
        """Get subscriptions.

        since is unix time epoch - this may return none if there are no
        subscriptions (i.e. the server answered with 404 or an empty body).
        """
        if self.is_nextcloud:
            endpoint = f"{self._nextcloud_prefix}/subscriptions"
        else:
            endpoint = f"api/2/subscriptions/{self.username}/{self.device}.json"

        response = await self._get(endpoint, params={"since": since})
        if not response:
            return None
        return SubscriptionsGet.from_json(response)

    async def get_episode_actions(
        self, since: int = 0
    ) -> tuple[list[EpisodeActionPlay | EpisodeActionNew | EpisodeActionDelete], int | None]:
        """Get progresses or deletions. Timestamp is second return value.

        gpodder net may filter by podcast
        https://gpoddernet.readthedocs.io/en/latest/api/reference/events.html
        -> we do not use this for now, since nextcloud implementation is not
        capable of it. Also, implementation in drop-in replacements varies.

        Play holds progress information.
        New is a marked unplayed.
        Delete is used if the user deletes a previously downloaded episode.

        The actions are returned newest first, if all timestamps can be parsed
        and compared. Otherwise the order of the server is kept. Without any
        data from the server, an empty list and None are returned.
        """
        params: dict[str, str | int] = {"since": since}
        if self.is_nextcloud:
            endpoint = f"{self._nextcloud_prefix}/episode_action"
        else:
            endpoint = f"api/2/episodes/{self.username}.json"
            params["device"] = self.device
        response = await self._get(endpoint, params=params)
        if not response:
            return [], None
        actions_response = EpisodeActionGet.from_json(response)

        # play has progress information
        # new means, there is no progress (i.e. mark unplayed)
        actions = [
            x
            for x in actions_response.actions
            if isinstance(x, (EpisodeActionPlay, EpisodeActionNew, EpisodeActionDelete))
        ]

        # ValueError: a timestamp is not iso formatted (e.g. empty).
        # TypeError: naive and offset-aware timestamps are mixed and can not be compared.
        # In both cases sorting is best effort only and the server order is kept.
        with suppress(ValueError, TypeError):
            actions = sorted(actions, key=lambda x: datetime.datetime.fromisoformat(x.timestamp))[
                ::-1
            ]

        return actions, actions_response.timestamp

    async def update_subscriptions(
        self, add: list[str] | None = None, remove: list[str] | None = None
    ) -> None:
        """Update subscriptions.

        add and remove default to empty lists. Raises RuntimeError if the
        upload fails.
        """
        request = SubscriptionsChangeRequest(
            add=[] if add is None else add,
            remove=[] if remove is None else remove,
        )
        if self.is_nextcloud:
            endpoint = f"{self._nextcloud_prefix}/subscription_change/create"
        else:
            endpoint = f"api/2/subscriptions/{self.username}/{self.device}.json"

        await self._post(endpoint=endpoint, data=request.to_dict())

    async def update_progress(
        self,
        *,
        podcast_id: str,
        episode_id: str,
        guid: str | None,
        position_s: float,
        duration_s: float,
    ) -> None:
        """Update progress.

        A position of 0 uploads a "new" action (mark unplayed), any other
        position a "play" action with position and total truncated to whole
        seconds. Raises RuntimeError if the upload fails.
        """
        # naive utc timestamp with a resolution of seconds
        utc_timestamp = (
            datetime.datetime.now(datetime.UTC).replace(microsecond=0, tzinfo=None).isoformat()
        )

        episode_action: EpisodeActionNew | EpisodeActionPlay
        if position_s == 0:
            # mark unplayed
            episode_action = EpisodeActionNew(
                podcast=podcast_id, episode=episode_id, timestamp=utc_timestamp
            )
        else:
            episode_action = EpisodeActionPlay(
                podcast=podcast_id,
                episode=episode_id,
                timestamp=utc_timestamp,
                position=int(position_s),
                started=0,
                total=int(duration_s),
            )

        # It is a bit unclear here, if other gpodder alternatives than nextcloud support the guid
        # for episodes. I didn't see that in the source for opodsync at least...
        if self.is_nextcloud:
            episode_action.guid = guid
            endpoint = f"{self._nextcloud_prefix}/episode_action/create"
        else:
            endpoint = f"api/2/episodes/{self.username}.json"
        await self._post(endpoint=endpoint, data=[episode_action.to_dict()])
310