/
/
/
1"""A minimal client for the unofficial gw-API, which deezer is using on their website and app.
2
3Credits go out to RemixDev (https://gitlab.com/RemixDev) for figuring out, how to get the arl
4cookie based on the api_token.
5"""
6
7import datetime
8from collections.abc import Mapping
9from http.cookies import BaseCookie, Morsel
10from typing import Any, cast
11
12from aiohttp import ClientSession, ClientTimeout
13from music_assistant_models.errors import MediaNotFoundError
14from music_assistant_models.streamdetails import StreamDetails
15from yarl import URL
16
17from music_assistant.helpers.datetime import utc_timestamp
18
19USER_AGENT_HEADER = (
20 "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) "
21 "Chrome/79.0.3945.130 Safari/537.36"
22)
23
24GW_LIGHT_URL = "https://www.deezer.com/ajax/gw-light.php"
25
26
27class DeezerGWError(BaseException):
28 """Exception type for GWClient related exceptions."""
29
30
31class GWClient:
32 """The GWClient class can be used to perform actions not being of the official API."""
33
34 _arl_token: str
35 _api_token: str
36 _gw_csrf_token: str | None
37 _license: str | None
38 _license_expiration_timestamp: int
39 session: ClientSession
40 formats: list[dict[str, str]] = [
41 {"cipher": "BF_CBC_STRIPE", "format": "MP3_128"},
42 ]
43 user_country: str
44
45 def __init__(self, session: ClientSession, api_token: str, arl_token: str) -> None:
46 """Provide an aiohttp ClientSession and the deezer api_token."""
47 self._api_token = api_token
48 self._arl_token = arl_token
49 self.session = session
50
51 async def _set_cookie(self) -> None:
52 cookie: Morsel[str] = Morsel()
53
54 cookie.set("arl", self._arl_token, self._arl_token)
55 cookie.update({"domain": ".deezer.com", "path": "/", "httponly": "True"})
56
57 self.session.cookie_jar.update_cookies(BaseCookie({"arl": cookie}), URL(GW_LIGHT_URL))
58
59 async def _update_user_data(self) -> None:
60 user_data = await self._gw_api_call("deezer.getUserData", False)
61 if not user_data["results"]["USER"]["USER_ID"]:
62 await self._set_cookie()
63 user_data = await self._gw_api_call("deezer.getUserData", False)
64
65 if not user_data["results"]["OFFER_ID"]:
66 msg = "Free subscriptions cannot be used in MA. Make sure you set a valid ARL."
67 raise DeezerGWError(msg)
68
69 self._gw_csrf_token = user_data["results"]["checkForm"]
70 self._license = user_data["results"]["USER"]["OPTIONS"]["license_token"]
71 self._license_expiration_timestamp = user_data["results"]["USER"]["OPTIONS"][
72 "expiration_timestamp"
73 ]
74 web_qualities = user_data["results"]["USER"]["OPTIONS"]["web_sound_quality"]
75 mobile_qualities = user_data["results"]["USER"]["OPTIONS"]["mobile_sound_quality"]
76 if web_qualities["high"] or mobile_qualities["high"]:
77 self.formats.insert(0, {"cipher": "BF_CBC_STRIPE", "format": "MP3_320"})
78 if web_qualities["lossless"] or mobile_qualities["lossless"]:
79 self.formats.insert(0, {"cipher": "BF_CBC_STRIPE", "format": "FLAC"})
80
81 self.user_country = user_data["results"]["COUNTRY"]
82
83 async def setup(self) -> None:
84 """Call this to let the client get its cookies, license and tokens."""
85 await self._set_cookie()
86 await self._update_user_data()
87
88 async def _get_license(self) -> str | None:
89 if (
90 self._license_expiration_timestamp
91 < (datetime.datetime.now() + datetime.timedelta(days=1)).timestamp()
92 ):
93 await self._update_user_data()
94 return self._license
95
96 async def _gw_api_call(
97 self,
98 method: str,
99 use_csrf_token: bool = True,
100 args: dict[str, Any] | None = None,
101 params: dict[str, Any] | None = None,
102 http_method: str = "POST",
103 retry: bool = True,
104 ) -> dict[str, Any]:
105 csrf_token = self._gw_csrf_token if use_csrf_token else "null"
106 if params is None:
107 params = {}
108 parameters = {"api_version": "1.0", "api_token": csrf_token, "input": "3", "method": method}
109 parameters |= params
110 result = await self.session.request(
111 http_method,
112 GW_LIGHT_URL,
113 params=cast("Mapping[str, str]", parameters),
114 timeout=ClientTimeout(total=30),
115 json=args,
116 headers={"User-Agent": USER_AGENT_HEADER},
117 )
118 result_json = await result.json()
119
120 if result_json["error"]:
121 if retry:
122 await self._update_user_data()
123 return await self._gw_api_call(
124 method, use_csrf_token, args, params, http_method, False
125 )
126 msg = "Failed to call GW-API"
127 raise DeezerGWError(msg, result_json["error"])
128 return cast("dict[str, Any]", result_json)
129
130 async def get_song_data(self, track_id: str) -> dict[str, Any]:
131 """Get data such as the track token for a given track."""
132 return await self._gw_api_call("song.getData", args={"SNG_ID": track_id})
133
134 async def get_deezer_track_urls(self, track_id: str) -> tuple[dict[str, Any], dict[str, Any]]:
135 """Get the URL for a given track id."""
136 dz_license = await self._get_license()
137
138 song_results = await self.get_song_data(track_id)
139
140 song_data = song_results["results"]
141 # If the song has been replaced by a newer version, the old track will
142 # not play anymore. The data for the newer song is contained in a
143 # "FALLBACK" entry in the song data. So if that is available, use that
144 # instead so we get the right track token.
145 if "FALLBACK" in song_data:
146 song_data = song_data["FALLBACK"]
147
148 track_token = song_data["TRACK_TOKEN"]
149 url_data = {
150 "license_token": dz_license,
151 "media": [
152 {
153 "type": "FULL",
154 "formats": self.formats,
155 }
156 ],
157 "track_tokens": [track_token],
158 }
159 url_response = await self.session.post(
160 "https://media.deezer.com/v1/get_url",
161 json=url_data,
162 headers={"User-Agent": USER_AGENT_HEADER},
163 )
164 result_json = await url_response.json()
165
166 if error := result_json["data"][0].get("errors"):
167 msg = "Received an error from API"
168 raise DeezerGWError(msg, error)
169
170 media_list = result_json["data"][0].get("media", [])
171 if not media_list:
172 raise MediaNotFoundError(f"No media available for track {track_id}")
173
174 return media_list[0], song_data
175
176 async def log_listen(
177 self, next_track: str | None = None, last_track: StreamDetails | None = None
178 ) -> None:
179 """Log the next and/or previous track of the current playback queue."""
180 if not (next_track or last_track):
181 msg = "last or current track information must be provided."
182 raise DeezerGWError(msg)
183
184 payload: dict[str, Any] = {}
185
186 if next_track:
187 payload["next_media"] = {"media": {"id": next_track, "type": "song"}}
188
189 if last_track:
190 seconds_streamed = min(
191 utc_timestamp() - last_track.data["start_ts"],
192 last_track.seconds_streamed,
193 )
194
195 payload["params"] = {
196 "media": {
197 "id": last_track.item_id,
198 "type": "song",
199 "format": last_track.data["format"],
200 },
201 "type": 1,
202 "stat": {
203 "seek": 1 if seconds_streamed < last_track.duration else 0,
204 "pause": 0,
205 "sync": 0,
206 "next": bool(next_track),
207 },
208 "lt": int(seconds_streamed),
209 "ctxt": {"t": "search_page", "id": last_track.item_id},
210 "dev": {"v": "10020230525142740", "t": 0},
211 "ls": [],
212 "ts_listen": int(last_track.data["start_ts"]),
213 "is_shuffle": False,
214 "stream_id": str(last_track.data["stream_id"]),
215 }
216
217 await self._gw_api_call("log.listen", args=payload)
218