/
/
/
1"""A minimal client for the unofficial gw-API, which deezer is using on their website and app.
2
3Credits go out to RemixDev (https://gitlab.com/RemixDev) for figuring out, how to get the arl
4cookie based on the api_token.
5"""
6
7import datetime
8import json
9from collections.abc import Mapping
10from http.cookies import BaseCookie, Morsel
11from typing import Any, cast
12
13from aiohttp import ClientSession, ClientTimeout
14from music_assistant_models.errors import MediaNotFoundError
15from music_assistant_models.streamdetails import StreamDetails
16from yarl import URL
17
18from music_assistant.helpers.datetime import utc_timestamp
19
20USER_AGENT_HEADER = (
21 "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) "
22 "Chrome/79.0.3945.130 Safari/537.36"
23)
24
25GW_LIGHT_URL = "https://www.deezer.com/ajax/gw-light.php"
26
27
28class DeezerGWError(BaseException):
29 """Exception type for GWClient related exceptions."""
30
31
32class GWClient:
33 """The GWClient class can be used to perform actions not being of the official API."""
34
35 _arl_token: str
36 _api_token: str
37 _gw_csrf_token: str | None
38 _license: str | None
39 _license_expiration_timestamp: int
40 _user_id: int
41 session: ClientSession
42 formats: list[dict[str, str]] = [
43 {"cipher": "BF_CBC_STRIPE", "format": "MP3_128"},
44 ]
45 user_country: str
46
47 def __init__(self, session: ClientSession, api_token: str, arl_token: str) -> None:
48 """Provide an aiohttp ClientSession and the deezer api_token."""
49 self._api_token = api_token
50 self._arl_token = arl_token
51 self.session = session
52
53 async def _set_cookie(self) -> None:
54 cookie: Morsel[str] = Morsel()
55
56 cookie.set("arl", self._arl_token, self._arl_token)
57 cookie.update({"domain": ".deezer.com", "path": "/", "httponly": "True"})
58
59 self.session.cookie_jar.update_cookies(BaseCookie({"arl": cookie}), URL(GW_LIGHT_URL))
60
61 async def _update_user_data(self) -> None:
62 user_data = await self._gw_api_call("deezer.getUserData", False)
63 if not user_data["results"]["USER"]["USER_ID"]:
64 await self._set_cookie()
65 user_data = await self._gw_api_call("deezer.getUserData", False)
66
67 if not user_data["results"]["OFFER_ID"]:
68 msg = "Free subscriptions cannot be used in MA. Make sure you set a valid ARL."
69 raise DeezerGWError(msg)
70
71 self._gw_csrf_token = user_data["results"]["checkForm"]
72 self._user_id = int(user_data["results"]["USER"]["USER_ID"])
73 self._license = user_data["results"]["USER"]["OPTIONS"]["license_token"]
74 self._license_expiration_timestamp = user_data["results"]["USER"]["OPTIONS"][
75 "expiration_timestamp"
76 ]
77 web_qualities = user_data["results"]["USER"]["OPTIONS"]["web_sound_quality"]
78 mobile_qualities = user_data["results"]["USER"]["OPTIONS"]["mobile_sound_quality"]
79 if web_qualities["high"] or mobile_qualities["high"]:
80 self.formats.insert(0, {"cipher": "BF_CBC_STRIPE", "format": "MP3_320"})
81 if web_qualities["lossless"] or mobile_qualities["lossless"]:
82 self.formats.insert(0, {"cipher": "BF_CBC_STRIPE", "format": "FLAC"})
83
84 self.user_country = user_data["results"]["COUNTRY"]
85
86 async def setup(self) -> None:
87 """Call this to let the client get its cookies, license and tokens."""
88 await self._set_cookie()
89 await self._update_user_data()
90
91 async def _get_license(self) -> str | None:
92 if (
93 self._license_expiration_timestamp
94 < (datetime.datetime.now() + datetime.timedelta(days=1)).timestamp()
95 ):
96 await self._update_user_data()
97 return self._license
98
99 async def _gw_api_call(
100 self,
101 method: str,
102 use_csrf_token: bool = True,
103 args: dict[str, Any] | None = None,
104 params: dict[str, Any] | None = None,
105 http_method: str = "POST",
106 retry: bool = True,
107 ) -> dict[str, Any]:
108 csrf_token = self._gw_csrf_token if use_csrf_token else "null"
109 if params is None:
110 params = {}
111 parameters = {"api_version": "1.0", "api_token": csrf_token, "input": "3", "method": method}
112 parameters |= params
113 result = await self.session.request(
114 http_method,
115 GW_LIGHT_URL,
116 params=cast("Mapping[str, str]", parameters),
117 timeout=ClientTimeout(total=30),
118 json=args,
119 headers={"User-Agent": USER_AGENT_HEADER},
120 )
121 result_json = await result.json()
122
123 if result_json["error"]:
124 if retry:
125 await self._update_user_data()
126 return await self._gw_api_call(
127 method, use_csrf_token, args, params, http_method, False
128 )
129 msg = "Failed to call GW-API"
130 raise DeezerGWError(msg, result_json["error"])
131 return cast("dict[str, Any]", result_json)
132
133 async def get_user_radio(self, config_id: str) -> list[dict[str, Any]]:
134 """Get personalized Flow tracks for a specific mood or genre.
135
136 :param config_id: The Flow config identifier (e.g. "happy", "chill", "genre-rock").
137 """
138 result = await self._gw_api_call(
139 "radio.getUserRadio",
140 args={"config_id": config_id, "user_id": self._user_id},
141 )
142 if "data" not in result["results"]:
143 return []
144 return cast("list[dict[str, Any]]", result["results"]["data"])
145
146 async def get_home_flows(self) -> list[dict[str, Any]]:
147 """Discover available Flow variants from the Deezer home page."""
148 gateway_input = json.dumps(
149 {
150 "PAGE": "home",
151 "VERSION": "2.5",
152 "SUPPORT": {"filterable-grid": ["flow"]},
153 }
154 )
155 result = await self._gw_api_call(
156 "page.get",
157 params={"gateway_input": gateway_input},
158 )
159 sections = result["results"].get("sections", [])
160 for section in sections:
161 if section.get("layout") == "filterable-grid":
162 return cast("list[dict[str, Any]]", section["items"])
163 return []
164
165 async def get_song_data(self, track_id: str) -> dict[str, Any]:
166 """Get data such as the track token for a given track."""
167 return await self._gw_api_call("song.getData", args={"SNG_ID": track_id})
168
169 async def get_deezer_track_urls(self, track_id: str) -> tuple[dict[str, Any], dict[str, Any]]:
170 """Get the URL for a given track id."""
171 dz_license = await self._get_license()
172
173 song_results = await self.get_song_data(track_id)
174
175 song_data = song_results["results"]
176 # If the song has been replaced by a newer version, the old track will
177 # not play anymore. The data for the newer song is contained in a
178 # "FALLBACK" entry in the song data. So if that is available, use that
179 # instead so we get the right track token.
180 if "FALLBACK" in song_data:
181 song_data = song_data["FALLBACK"]
182
183 track_token = song_data["TRACK_TOKEN"]
184 url_data = {
185 "license_token": dz_license,
186 "media": [
187 {
188 "type": "FULL",
189 "formats": self.formats,
190 }
191 ],
192 "track_tokens": [track_token],
193 }
194 url_response = await self.session.post(
195 "https://media.deezer.com/v1/get_url",
196 json=url_data,
197 headers={"User-Agent": USER_AGENT_HEADER},
198 )
199 result_json = await url_response.json()
200
201 if error := result_json["data"][0].get("errors"):
202 msg = "Received an error from API"
203 raise DeezerGWError(msg, error)
204
205 media_list = result_json["data"][0].get("media", [])
206 if not media_list:
207 raise MediaNotFoundError(f"No media available for track {track_id}")
208
209 return media_list[0], song_data
210
211 async def log_listen(
212 self, next_track: str | None = None, last_track: StreamDetails | None = None
213 ) -> None:
214 """Log the next and/or previous track of the current playback queue."""
215 if not (next_track or last_track):
216 msg = "last or current track information must be provided."
217 raise DeezerGWError(msg)
218
219 payload: dict[str, Any] = {}
220
221 if next_track:
222 payload["next_media"] = {"media": {"id": next_track, "type": "song"}}
223
224 if last_track:
225 seconds_streamed = min(
226 utc_timestamp() - last_track.data["start_ts"],
227 last_track.seconds_streamed,
228 )
229
230 payload["params"] = {
231 "media": {
232 "id": last_track.item_id,
233 "type": "song",
234 "format": last_track.data["format"],
235 },
236 "type": 1,
237 "stat": {
238 "seek": 1 if seconds_streamed < last_track.duration else 0,
239 "pause": 0,
240 "sync": 0,
241 "next": bool(next_track),
242 },
243 "lt": int(seconds_streamed),
244 "ctxt": {"t": "search_page", "id": last_track.item_id},
245 "dev": {"v": "10020230525142740", "t": 0},
246 "ls": [],
247 "ts_listen": int(last_track.data["start_ts"]),
248 "is_shuffle": False,
249 "stream_id": str(last_track.data["stream_id"]),
250 }
251
252 await self._gw_api_call("log.listen", args=payload)
253