music-assistant-server

9.4 KBPY
gw_client.py
9.4 KB253 lines • python
1"""A minimal client for the unofficial gw-API, which deezer is using on their website and app.
2
3Credits go out to RemixDev (https://gitlab.com/RemixDev) for figuring out, how to get the arl
4cookie based on the api_token.
5"""
6
7import datetime
8import json
9from collections.abc import Mapping
10from http.cookies import BaseCookie, Morsel
11from typing import Any, cast
12
13from aiohttp import ClientSession, ClientTimeout
14from music_assistant_models.errors import MediaNotFoundError
15from music_assistant_models.streamdetails import StreamDetails
16from yarl import URL
17
18from music_assistant.helpers.datetime import utc_timestamp
19
# Browser-like User-Agent sent with every request made by this module.
USER_AGENT_HEADER = " ".join(
    (
        "Mozilla/5.0 (X11; Linux x86_64)",
        "AppleWebKit/537.36 (KHTML, like Gecko)",
        "Chrome/79.0.3945.130",
        "Safari/537.36",
    )
)

# Single endpoint through which all gw-API methods are invoked.
GW_LIGHT_URL = "https://www.deezer.com/ajax/gw-light.php"
26
27
class DeezerGWError(Exception):
    """Exception type for GWClient related exceptions.

    Derives from ``Exception`` (not ``BaseException``) so that generic
    ``except Exception`` handlers can see and report it; ``BaseException`` is
    meant for interpreter-level exits such as ``KeyboardInterrupt``.
    """
30
31
class GWClient:
    """The GWClient class can be used to perform actions not being of the official API.

    Await :meth:`setup` once after construction: it installs the ``arl`` cookie
    on the session and loads the CSRF token, license and available audio
    formats that the other methods read.
    """

    # gw method that `_update_user_data` itself calls; `_gw_api_call` must not
    # trigger another refresh while retrying this one.
    _USER_DATA_METHOD = "deezer.getUserData"

    _arl_token: str
    _api_token: str
    _gw_csrf_token: str | None
    _license: str | None
    _license_expiration_timestamp: int
    _user_id: int
    session: ClientSession
    # Baseline format list. It is never mutated: `_update_user_data` assigns a
    # fresh per-instance list derived from it.
    formats: list[dict[str, str]] = [
        {"cipher": "BF_CBC_STRIPE", "format": "MP3_128"},
    ]
    user_country: str

    def __init__(self, session: ClientSession, api_token: str, arl_token: str) -> None:
        """Provide an aiohttp ClientSession and the deezer api_token.

        Only stores the arguments; no request is made until :meth:`setup`.

        :param session: The aiohttp session used for every request.
        :param api_token: The deezer api_token.
        :param arl_token: The value stored in the ``arl`` cookie.
        """
        self.session = session
        self._api_token = api_token
        self._arl_token = arl_token

    async def _set_cookie(self) -> None:
        """Store the ``arl`` cookie in the session's cookie jar for the gw-light URL."""
        arl_cookie: Morsel[str] = Morsel()
        # Morsel.set(key, value, coded_value): the same string is used for the
        # value and for its coded form.
        arl_cookie.set("arl", self._arl_token, self._arl_token)
        arl_cookie.update({"domain": ".deezer.com", "path": "/", "httponly": "True"})

        self.session.cookie_jar.update_cookies(
            BaseCookie({"arl": arl_cookie}),
            URL(GW_LIGHT_URL),
        )

    async def _update_user_data(self) -> None:
        """Reload CSRF token, user id, license, formats and country from the API.

        :raises DeezerGWError: If the response carries no ``OFFER_ID``.
        """
        user_data = await self._gw_api_call(self._USER_DATA_METHOD, False)
        if not user_data["results"]["USER"]["USER_ID"]:
            # No user id in the response: re-apply the arl cookie and ask again.
            await self._set_cookie()
            user_data = await self._gw_api_call(self._USER_DATA_METHOD, False)

        results = user_data["results"]
        if not results["OFFER_ID"]:
            msg = "Free subscriptions cannot be used in MA. Make sure you set a valid ARL."
            raise DeezerGWError(msg)

        self._gw_csrf_token = results["checkForm"]
        user = results["USER"]
        self._user_id = int(user["USER_ID"])
        options = user["OPTIONS"]
        self._license = options["license_token"]
        self._license_expiration_timestamp = options["expiration_timestamp"]

        web_qualities = options["web_sound_quality"]
        mobile_qualities = options["mobile_sound_quality"]
        # Build a fresh list from the class baseline on every refresh. Inserting
        # into the shared class-level list would add duplicates on each call and
        # leak one account's formats into every other instance.
        formats = [dict(entry) for entry in type(self).formats]
        for tier, audio_format in (("high", "MP3_320"), ("lossless", "FLAC")):
            entry = {"cipher": "BF_CBC_STRIPE", "format": audio_format}
            if (web_qualities[tier] or mobile_qualities[tier]) and entry not in formats:
                formats.insert(0, entry)
        self.formats = formats

        self.user_country = results["COUNTRY"]

    async def setup(self) -> None:
        """Call this to let the client get its cookies, license and tokens."""
        await self._set_cookie()
        await self._update_user_data()

    async def _get_license(self) -> str | None:
        """Return the license token, refreshing user data if it expires within a day."""
        refresh_deadline = (datetime.datetime.now() + datetime.timedelta(days=1)).timestamp()
        if self._license_expiration_timestamp < refresh_deadline:
            await self._update_user_data()
        return self._license

    async def _gw_api_call(
        self,
        method: str,
        use_csrf_token: bool = True,
        args: dict[str, Any] | None = None,
        params: dict[str, Any] | None = None,
        http_method: str = "POST",
        retry: bool = True,
    ) -> dict[str, Any]:
        """Invoke a gw-API method and return the decoded JSON response.

        :param method: Name of the gw method, sent as the ``method`` query parameter.
        :param use_csrf_token: Send the stored CSRF token as ``api_token``;
            otherwise the literal string ``"null"`` is sent.
        :param args: JSON body of the request.
        :param params: Extra query parameters merged over the default ones.
        :param http_method: HTTP verb used for the request.
        :param retry: Whether a response with a truthy ``error`` is retried once.
        :raises DeezerGWError: If the response still reports an error and no
            retry is left.
        """
        csrf_token = self._gw_csrf_token if use_csrf_token else "null"
        query = {"api_version": "1.0", "api_token": csrf_token, "input": "3", "method": method}
        query |= params or {}
        response = await self.session.request(
            http_method,
            GW_LIGHT_URL,
            params=cast("Mapping[str, str]", query),
            timeout=ClientTimeout(total=30),
            json=args,
            headers={"User-Agent": USER_AGENT_HEADER},
        )
        result_json = await response.json()

        if not result_json["error"]:
            return cast("dict[str, Any]", result_json)

        if not retry:
            msg = "Failed to call GW-API"
            raise DeezerGWError(msg, result_json["error"])

        # Refresh tokens before the single retry. When this call *is* the
        # refresh, skip that step: `_update_user_data` calls back into this
        # method, so refreshing here would recurse without bound while the
        # API keeps answering with an error.
        if method != self._USER_DATA_METHOD:
            await self._update_user_data()
        return await self._gw_api_call(method, use_csrf_token, args, params, http_method, False)

    async def get_user_radio(self, config_id: str) -> list[dict[str, Any]]:
        """Get personalized Flow tracks for a specific mood or genre.

        :param config_id: The Flow config identifier (e.g. "happy", "chill", "genre-rock").
        :return: The ``data`` list of the response, or an empty list if absent.
        """
        response = await self._gw_api_call(
            "radio.getUserRadio",
            args={"config_id": config_id, "user_id": self._user_id},
        )
        results = response["results"]
        if "data" in results:
            return cast("list[dict[str, Any]]", results["data"])
        return []

    async def get_home_flows(self) -> list[dict[str, Any]]:
        """Discover available Flow variants from the Deezer home page.

        :return: The items of the first ``filterable-grid`` section, or an
            empty list if the page has no such section.
        """
        gateway_input = json.dumps(
            {
                "PAGE": "home",
                "VERSION": "2.5",
                "SUPPORT": {"filterable-grid": ["flow"]},
            }
        )
        response = await self._gw_api_call(
            "page.get",
            params={"gateway_input": gateway_input},
        )
        for section in response["results"].get("sections", []):
            if section.get("layout") == "filterable-grid":
                return cast("list[dict[str, Any]]", section["items"])
        return []

    async def get_song_data(self, track_id: str) -> dict[str, Any]:
        """Get data such as the track token for a given track.

        :param track_id: Sent as ``SNG_ID`` to the ``song.getData`` method.
        """
        return await self._gw_api_call("song.getData", args={"SNG_ID": track_id})

    async def get_deezer_track_urls(self, track_id: str) -> tuple[dict[str, Any], dict[str, Any]]:
        """Get the URL for a given track id.

        :param track_id: The track to resolve.
        :return: The first media entry of the response and the song data
            whose track token was used.
        :raises DeezerGWError: If the media endpoint reports errors.
        :raises MediaNotFoundError: If the response contains no media.
        """
        dz_license = await self._get_license()

        song_data = (await self.get_song_data(track_id))["results"]
        # If the song has been replaced by a newer version, the old track will
        # not play anymore. The data for the newer song is contained in a
        # "FALLBACK" entry in the song data. So if that is available, use that
        # instead so we get the right track token.
        if "FALLBACK" in song_data:
            song_data = song_data["FALLBACK"]

        url_request = {
            "license_token": dz_license,
            "media": [{"type": "FULL", "formats": self.formats}],
            "track_tokens": [song_data["TRACK_TOKEN"]],
        }
        url_response = await self.session.post(
            "https://media.deezer.com/v1/get_url",
            json=url_request,
            headers={"User-Agent": USER_AGENT_HEADER},
        )
        # One track token was sent, so only the first entry is inspected.
        track_entry = (await url_response.json())["data"][0]

        if error := track_entry.get("errors"):
            msg = "Received an error from API"
            raise DeezerGWError(msg, error)

        media_list = track_entry.get("media", [])
        if not media_list:
            raise MediaNotFoundError(f"No media available for track {track_id}")

        return media_list[0], song_data

    async def log_listen(
        self, next_track: str | None = None, last_track: StreamDetails | None = None
    ) -> None:
        """Log the next and/or previous track of the current playback queue.

        :param next_track: Id of the track that is about to play.
        :param last_track: Stream details of the track that was played; its
            ``data`` must hold ``start_ts``, ``format`` and ``stream_id``.
        :raises DeezerGWError: If neither argument is provided.
        """
        if not next_track and not last_track:
            msg = "last or current track information must be provided."
            raise DeezerGWError(msg)

        payload: dict[str, Any] = {}

        if next_track:
            payload["next_media"] = {"media": {"id": next_track, "type": "song"}}

        if last_track:
            start_ts = last_track.data["start_ts"]
            # Report the smaller of the wall-clock time since the start and
            # the recorded streamed seconds.
            seconds_streamed = min(utc_timestamp() - start_ts, last_track.seconds_streamed)

            payload["params"] = {
                "media": {
                    "id": last_track.item_id,
                    "type": "song",
                    "format": last_track.data["format"],
                },
                "type": 1,
                "stat": {
                    "seek": 1 if seconds_streamed < last_track.duration else 0,
                    "pause": 0,
                    "sync": 0,
                    "next": bool(next_track),
                },
                "lt": int(seconds_streamed),
                "ctxt": {"t": "search_page", "id": last_track.item_id},
                "dev": {"v": "10020230525142740", "t": 0},
                "ls": [],
                "ts_listen": int(start_ts),
                "is_shuffle": False,
                "stream_id": str(last_track.data["stream_id"]),
            }

        await self._gw_api_call("log.listen", args=payload)
253