/
/
/
1"""Simplest client for gPodder.
2
3Should be compatible with Nextcloud App GPodder Sync, and the original api
4of gpodder.net (mygpo) or drop-in replacements like opodsync.
5Gpodder Sync uses guid optionally.
6"""
7
8import datetime
9import logging
10from contextlib import suppress
11from dataclasses import dataclass, field
12from typing import Any
13
14import aiohttp
15from aiohttp.client_exceptions import ClientResponseError
16from mashumaro.config import BaseConfig
17from mashumaro.mixins.json import DataClassJSONMixin
18from mashumaro.types import Discriminator
19
20
21# https://gpoddernet.readthedocs.io/en/latest/api/reference/subscriptions.html#upload-subscription-changes
@dataclass(kw_only=True)
class SubscriptionsChangeRequest(DataClassJSONMixin):
    """Payload describing a set of subscription changes.

    Both lists default to empty, so a request may carry only additions,
    only removals, or nothing at all. Fields are keyword-only.
    """

    # Entries to subscribe to (presumably podcast feed URLs, as in the
    # gpodder.net API linked above this class - confirm).
    add: list[str] = field(default_factory=list)
    # Entries to unsubscribe from (same format as ``add``).
    remove: list[str] = field(default_factory=list)
28
29
# https://gpoddernet.readthedocs.io/en/latest/api/reference/subscriptions.html#get-subscription-changes
@dataclass(kw_only=True)
class SubscriptionsGet(SubscriptionsChangeRequest):
    """Subscription changes as returned by the server.

    Inherits the ``add`` and ``remove`` lists from
    ``SubscriptionsChangeRequest`` and adds a mandatory ``timestamp``.
    """

    # Integer timestamp reported by the server; presumably the value to pass
    # as ``since`` on the next request (unix epoch) - confirm against the API.
    timestamp: int
36
37
def action_tagger(cls: "type[EpisodeAction]") -> list[str]:
    """Return the discriminator tags for an episode action class.

    The tag is the class name with the ``EpisodeAction`` part removed, given
    once in upper case and once in lower case: NC Gpodder sends upper case
    action values, opodsync lower case ones. A StrEnum cannot express both
    spellings, hence the plain strings.
    """
    bare_name = "".join(cls.__name__.split("EpisodeAction"))
    return [convert(bare_name) for convert in (str.upper, str.lower)]
46
47
@dataclass(kw_only=True)
class EpisodeAction(DataClassJSONMixin):
    """Common base for all episode actions.

    Subclasses add an ``action`` field; when deserializing, the value of
    that field selects the matching subclass (see ``Config``).

    See https://gpoddernet.readthedocs.io/en/latest/api/reference/events.html
    """

    class Config(BaseConfig):
        """Serialization settings: dispatch on ``action``, drop ``None`` values."""

        # Subclasses are told apart by the "action" value. action_tagger
        # supplies an upper case and a lower case tag for every subclass.
        discriminator = Discriminator(
            field="action", include_subtypes=True, variant_tagger_fn=action_tagger
        )
        omit_none = True  # only nextcloud supports guid

    # Identifiers of the podcast and the episode (presumably the feed URL and
    # the media URL as in the API linked above - confirm).
    podcast: str
    episode: str
    # Time of the action as a string; this file writes and parses it in ISO
    # format. Defaults to the empty string.
    timestamp: str = ""
    # Optional episode guid; left out of the serialized form while it is None.
    guid: str | None = None
67
68
@dataclass(kw_only=True)
class EpisodeActionDownload(EpisodeAction):
    """Episode action whose ``action`` value is "download"."""

    # Discriminator value; matched in upper or lower case when deserializing.
    action: str = "download"
74
75
@dataclass(kw_only=True)
class EpisodeActionDelete(EpisodeAction):
    """Episode action whose ``action`` value is "delete".

    Used if the user deletes a previously downloaded episode.
    """

    # Discriminator value; matched in upper or lower case when deserializing.
    action: str = "delete"
81
82
@dataclass(kw_only=True)
class EpisodeActionNew(EpisodeAction):
    """Episode action whose ``action`` value is "new".

    Marks an episode as unplayed, i.e. there is no progress.
    """

    # Discriminator value; matched in upper or lower case when deserializing.
    action: str = "new"
88
89
@dataclass(kw_only=True)
class EpisodeActionFlattr(EpisodeAction):
    """Episode action whose ``action`` value is "flattr"."""

    # Discriminator value; matched in upper or lower case when deserializing.
    action: str = "flattr"
95
96
@dataclass(kw_only=True)
class EpisodeActionPlay(EpisodeAction):
    """Episode action whose ``action`` value is "play".

    Carries the playback progress of an episode.
    """

    # Discriminator value; matched in upper or lower case when deserializing.
    action: str = "play"

    # all in seconds
    started: int = 0  # position at which playback started
    position: int = 0  # position reached
    total: int = 0  # total length of the episode
107
108
@dataclass(kw_only=True)
class EpisodeActionGet(DataClassJSONMixin):
    """Server response listing episode actions."""

    # Each entry is deserialized into the EpisodeAction subclass matching its
    # "action" value.
    actions: list[EpisodeAction]
    # Integer timestamp reported by the server; presumably the value to pass
    # as ``since`` on the next request - confirm against the API.
    timestamp: int
115
116
class GPodderClient:
    """Small async client for gPodder compatible sync servers.

    Two kinds of servers are supported:

    * Nextcloud with the GPodder Sync app (bearer token), set up via
      ``init_nc``.
    * gpodder.net or drop-in replacements (HTTP basic auth), set up via
      ``init_gpodder``.

    One of the two init methods has to be called before any request is made,
    since they provide the base url and the credentials.
    """

    def __init__(
        self, session: aiohttp.ClientSession, logger: logging.Logger, verify_ssl: bool = True
    ) -> None:
        """Store session and logger. No request is made here.

        Args:
            session: aiohttp session used for all requests.
            logger: Logger receiving debug output about failed calls.
            verify_ssl: Passed as ``ssl`` argument to every request.
        """
        self.session = session
        self.verify_ssl = verify_ssl

        self.is_nextcloud = False
        self.base_url: str
        # Initialised, so `headers` raises its RuntimeError rather than an
        # AttributeError when no token was configured.
        self.token: str | None = None

        self.username: str
        self.device: str
        self.auth: aiohttp.BasicAuth | None = None  # only for gpodder

        self.logger = logger

        self._nextcloud_prefix = "index.php/apps/gpoddersync"

    def init_nc(self, base_url: str, nc_token: str | None = None) -> None:
        """Init values for a nextcloud client.

        Args:
            base_url: Url of the nextcloud instance, trailing slashes are removed.
            nc_token: Bearer token. Requests fail with RuntimeError while it is None.
        """
        self.is_nextcloud = True
        self.token = nc_token
        self.base_url = base_url.rstrip("/")

    async def init_gpodder(self, username: str, password: str, device: str, base_url: str) -> None:
        """Init via basic auth and log in.

        Args:
            username: User name, also part of the api endpoints.
            password: Password for basic auth.
            device: Device id used for subscriptions and episode actions.
            base_url: Url of the server, trailing slashes are removed.

        Raises:
            RuntimeError: If the login call fails.
        """
        self.username = username
        self.device = device
        self.base_url = base_url.rstrip("/")
        self.auth = aiohttp.BasicAuth(username, password)
        await self._post(endpoint=f"api/2/auth/{username}/login.json")

    @property
    def headers(self) -> dict[str, str]:
        """Session headers (bearer token, only used for nextcloud).

        Raises:
            RuntimeError: If no token is set.
        """
        if self.token is None:
            raise RuntimeError("Token not set.")
        return {"Authorization": f"Bearer {self.token}"}

    async def _post(
        self,
        endpoint: str,
        data: dict[str, Any] | list[Any] | None = None,
    ) -> bytes:
        """POST request.

        Args:
            endpoint: Path relative to the base url.
            data: Optional json payload.

        Returns:
            The raw response body.

        Raises:
            RuntimeError: If the server answers with an error status or with
                any status other than 200.
        """
        try:
            # The context manager releases the connection on every path,
            # including the non-200 one below.
            async with self.session.post(
                f"{self.base_url}/{endpoint}",
                json=data,
                ssl=self.verify_ssl,
                headers=self.headers if self.is_nextcloud else None,
                raise_for_status=True,
                auth=self.auth,
            ) as response:
                if response.status != 200:
                    self.logger.debug("Call failed with status %s", response.status)
                    raise RuntimeError(f"API POST call to {endpoint} failed.")
                return await response.read()
        except ClientResponseError as exc:
            self.logger.debug(exc)
            raise RuntimeError(f"API POST call to {endpoint} failed.") from exc

    async def _get(self, endpoint: str, params: dict[str, str | int] | None = None) -> bytes:
        """GET request.

        Args:
            endpoint: Path relative to the base url.
            params: Optional query parameters.

        Returns:
            The raw json body on status 200, empty bytes on status 404.

        Raises:
            RuntimeError: For any other status, or a 200 response which is not json.
        """
        # The context manager releases the connection, also if the body is never read.
        async with self.session.get(
            f"{self.base_url}/{endpoint}",
            params=params,
            ssl=self.verify_ssl,
            headers=self.headers if self.is_nextcloud else None,
            auth=self.auth,
        ) as response:
            status = response.status
            if response.content_type == "application/json" and status == 200:
                return await response.read()
        if status == 404:
            return b""
        self.logger.debug("Call failed with status %s", status)
        raise RuntimeError(f"API GET call to {endpoint} failed.")

    async def get_subscriptions(self, since: int = 0) -> SubscriptionsGet | None:
        """Get subscriptions.

        since is unix time epoch - this may return none if there are no
        subscriptions.
        """
        if self.is_nextcloud:
            endpoint = f"{self._nextcloud_prefix}/subscriptions"
        else:
            endpoint = f"api/2/subscriptions/{self.username}/{self.device}.json"

        response = await self._get(endpoint, params={"since": since})
        if not response:
            return None
        return SubscriptionsGet.from_json(response)

    async def get_episode_actions(
        self, since: int = 0
    ) -> tuple[list[EpisodeActionPlay | EpisodeActionNew | EpisodeActionDelete], int | None]:
        """Get progresses or deletions. Timestamp is second return value.

        gpodder net may filter by podcast
        https://gpoddernet.readthedocs.io/en/latest/api/reference/events.html
        -> we do not use this for now, since nextcloud implementation is not
        capable of it. Also, implementation in drop-in replacements varies.

        Play holds progress information.
        New is a marked unplayed.
        Delete is used if the user deletes a previously downloaded episode.

        The actions are returned newest first. If any timestamp can not be
        parsed, the order of the server is kept. Without a response body
        (e.g. 404) an empty list and None are returned.
        """
        params: dict[str, str | int] = {"since": since}
        if self.is_nextcloud:
            endpoint = f"{self._nextcloud_prefix}/episode_action"
        else:
            endpoint = f"api/2/episodes/{self.username}.json"
            params["device"] = self.device
        response = await self._get(endpoint, params=params)
        if not response:
            return [], None
        actions_response = EpisodeActionGet.from_json(response)

        # play has progress information
        # new means, there is no progress (i.e. mark unplayed)
        actions = [
            x
            for x in actions_response.actions
            if isinstance(x, EpisodeActionPlay | EpisodeActionNew | EpisodeActionDelete)
        ]

        def timestamp_key(action: Any) -> datetime.datetime:
            """Parse the timestamp, treating naive values as UTC.

            Servers may mix timestamps with and without utc offset, which
            can not be compared with each other otherwise (TypeError).
            """
            parsed = datetime.datetime.fromisoformat(action.timestamp)
            if parsed.tzinfo is None:
                parsed = parsed.replace(tzinfo=datetime.UTC)
            return parsed

        # ValueError: a timestamp is empty or not in iso format -> keep server order
        with suppress(ValueError):
            actions = sorted(actions, key=timestamp_key)[::-1]

        return actions, actions_response.timestamp

    async def update_subscriptions(
        self, add: list[str] | None = None, remove: list[str] | None = None
    ) -> None:
        """Update subscriptions.

        Args:
            add: Entries to subscribe to, None is treated as empty list.
            remove: Entries to unsubscribe from, None is treated as empty list.

        Raises:
            RuntimeError: If the api call fails.
        """
        if add is None:
            add = []
        if remove is None:
            remove = []
        request = SubscriptionsChangeRequest(add=add, remove=remove)
        if self.is_nextcloud:
            endpoint = f"{self._nextcloud_prefix}/subscription_change/create"
        else:
            endpoint = f"api/2/subscriptions/{self.username}/{self.device}.json"

        await self._post(endpoint=endpoint, data=request.to_dict())

    async def update_progress(
        self,
        *,
        podcast_id: str,
        episode_id: str,
        guid: str | None,
        position_s: float,
        duration_s: float,
    ) -> None:
        """Update progress.

        A position of 0 marks the episode as unplayed (action new), any
        other position is sent as play action.

        Args:
            podcast_id: Sent as ``podcast`` of the action.
            episode_id: Sent as ``episode`` of the action.
            guid: Episode guid, only sent to nextcloud.
            position_s: Current position in seconds, truncated to int.
            duration_s: Total duration in seconds, truncated to int.

        Raises:
            RuntimeError: If the api call fails.
        """
        # UTC time without microseconds and without utc offset in the string
        utc_timestamp = (
            datetime.datetime.now(datetime.UTC).replace(microsecond=0, tzinfo=None).isoformat()
        )

        episode_action: EpisodeActionNew | EpisodeActionPlay
        if position_s == 0:
            # mark unplayed
            episode_action = EpisodeActionNew(
                podcast=podcast_id, episode=episode_id, timestamp=utc_timestamp
            )
        else:
            episode_action = EpisodeActionPlay(
                podcast=podcast_id,
                episode=episode_id,
                timestamp=utc_timestamp,
                position=int(position_s),
                started=0,
                total=int(duration_s),
            )

        # It is a bit unclear here, if other gpodder alternatives than nextcloud support the guid
        # for episodes. I didn't see that in the source for opodsync at least...
        if self.is_nextcloud:
            episode_action.guid = guid
            endpoint = f"{self._nextcloud_prefix}/episode_action/create"
        else:
            endpoint = f"api/2/episodes/{self.username}.json"
        await self._post(endpoint=endpoint, data=[episode_action.to_dict()])
310