music-assistant-server

10.3 KBPY
auth_manager.py
10.3 KB304 lines • python
1"""Authentication manager for Tidal integration."""
2
import json
import random
import time
import urllib
import urllib.parse
from collections.abc import Callable
from dataclasses import dataclass
from types import TracebackType
from typing import TYPE_CHECKING, Any
11
12import pkce
13from aiohttp import ClientSession
14from music_assistant_models.enums import EventType
15from music_assistant_models.errors import LoginFailed
16
17from music_assistant.helpers.app_vars import app_var  # type: ignore[attr-defined]
18
19if TYPE_CHECKING:
20    from music_assistant.mass import MusicAssistant
21
# Configuration constants
# Authorization scheme name for Tidal access tokens
# NOTE(review): presumably meant for the "Authorization" header value - confirm usage
TOKEN_TYPE = "Bearer"
# Base URL of the OAuth2 endpoints; "/token" is appended for token requests
AUTH_URL = "https://auth.tidal.com/v1/oauth2"
# Redirect URI sent with both the authorize request and the token exchange
REDIRECT_URI = "https://tidal.com/android/login/auth"

# Safety margin in seconds: a token whose expiry falls within this window
# is no longer considered valid and gets refreshed
TOKEN_REFRESH_BUFFER = 60 * 7  # 7 minutes
28
29
@dataclass
class TidalUser:
    """Represent a Tidal user with their associated account information.

    Every field defaults to ``None``, so an empty instance can be created
    before any user info is available.
    """

    # Account id (taken from the "id" key of the user info response)
    user_id: str | None = None
    # Country code of the account (taken from the "countryCode" key)
    country_code: str | None = None
    # Id of the session the user info belongs to
    session_id: str | None = None
    # Profile name (taken from the "profileName" key)
    profile_name: str | None = None
    # User name (taken from the "username" key)
    user_name: str | None = None
    # Email address of the account
    email: str | None = None
40
41
42class ManualAuthenticationHelper:
43    """Helper for authentication flows that require manual user intervention.
44
45    For Tidal where the OAuth flow doesn't redirect to our callback,
46    but instead requires the user to manually copy a URL after authentication.
47    """
48
49    def __init__(self, mass: "MusicAssistant", session_id: str) -> None:
50        """Initialize the Manual Authentication Helper."""
51        self.mass = mass
52        self.session_id = session_id
53
54    async def __aenter__(self) -> "ManualAuthenticationHelper":
55        """Enter context manager."""
56        return self
57
58    async def __aexit__(
59        self,
60        exc_type: type[BaseException] | None,
61        exc_val: BaseException | None,
62        exc_tb: TracebackType | None,
63    ) -> bool | None:
64        """Exit context manager."""
65        return None
66
67    def send_url(self, auth_url: str) -> None:
68        """Send the URL to the user for authentication."""
69        self.mass.signal_event(EventType.AUTH_SESSION, self.session_id, auth_url)
70
71
72class TidalAuthManager:
73    """Manager for Tidal authentication process."""
74
75    def __init__(
76        self,
77        http_session: ClientSession,
78        config_updater: Callable[[dict[str, Any]], None],
79        logger: Any,
80    ):
81        """Initialize Tidal auth manager."""
82        self.http_session = http_session
83        self.update_config = config_updater
84        self.logger = logger
85        self._auth_info: dict[str, Any] | None = None
86        self.user = TidalUser()
87
88    async def initialize(self, auth_data: str) -> bool:
89        """Initialize the auth manager with stored auth data."""
90        if not auth_data:
91            return False
92
93        # Parse stored auth data
94        try:
95            self._auth_info = json.loads(auth_data)
96        except json.JSONDecodeError as err:
97            self.logger.error("Invalid authentication data: %s", err)
98            return False
99
100        # Ensure we have a valid token
101        return await self.ensure_valid_token()
102
103    @property
104    def user_id(self) -> str | None:
105        """Return the current user ID."""
106        return self.user.user_id
107
108    @property
109    def country_code(self) -> str | None:
110        """Return the current country code."""
111        return self.user.country_code
112
113    @property
114    def session_id(self) -> str | None:
115        """Return the current session ID."""
116        return self.user.session_id
117
118    @property
119    def access_token(self) -> str | None:
120        """Return the current access token."""
121        return self._auth_info.get("access_token") if self._auth_info else None
122
123    async def ensure_valid_token(self) -> bool:
124        """Ensure we have a valid token, refresh if needed."""
125        if not self._auth_info:
126            return False
127
128        # Check if token is expired
129        expires_at = self._auth_info.get("expires_at", 0)
130        if expires_at > time.time() + TOKEN_REFRESH_BUFFER:
131            return True
132
133        # Need to refresh token
134        return await self.refresh_token()
135
136    async def refresh_token(self) -> bool:
137        """Refresh the auth token."""
138        if not self._auth_info:
139            return False
140
141        refresh_token = self._auth_info.get("refresh_token")
142        if not refresh_token:
143            return False
144
145        client_id = self._auth_info.get("client_id", app_var(9))
146
147        data = {
148            "refresh_token": refresh_token,
149            "client_id": client_id,
150            "grant_type": "refresh_token",
151            "scope": "r_usr w_usr w_sub",
152        }
153
154        headers = {"Content-Type": "application/x-www-form-urlencoded"}
155
156        async with self.http_session.post(
157            f"{AUTH_URL}/token", data=data, headers=headers
158        ) as response:
159            if response.status != 200:
160                self.logger.error("Failed to refresh token: %s", await response.text())
161                return False
162
163            token_data = await response.json()
164
165            # Update auth info
166            self._auth_info["access_token"] = token_data["access_token"]
167            if "refresh_token" in token_data:
168                self._auth_info["refresh_token"] = token_data["refresh_token"]
169
170            # Update expiration
171            if "expires_in" in token_data:
172                self._auth_info["expires_at"] = time.time() + token_data["expires_in"]
173
174            # Store updated auth info
175            self.update_config(self._auth_info)
176
177            return True
178
179    async def update_user_info(self, user_info: dict[str, Any], session_id: str) -> None:
180        """Update user info from API response."""
181        # Update the TidalUser dataclass with values from API response
182        self.user = TidalUser(
183            user_id=user_info.get("id"),
184            country_code=user_info.get("countryCode"),
185            session_id=session_id,
186            profile_name=user_info.get("profileName"),
187            user_name=user_info.get("username"),
188        )
189
190    @staticmethod
191    async def generate_auth_url(auth_helper: ManualAuthenticationHelper, quality: str) -> str:
192        """Generate the Tidal authentication URL."""
193        # Generate PKCE challenge
194        code_verifier, code_challenge = pkce.generate_pkce_pair()
195        # Generate unique client key
196        client_unique_key = format(random.getrandbits(64), "02x")
197        # Store these values for later use
198        auth_params = {
199            "code_verifier": code_verifier,
200            "client_unique_key": client_unique_key,
201            "client_id": app_var(9),
202            "client_secret": app_var(10),
203            "quality": quality,
204        }
205
206        # Create auth URL
207        params = {
208            "response_type": "code",
209            "redirect_uri": REDIRECT_URI,
210            "client_id": auth_params["client_id"],
211            "lang": "EN",
212            "appMode": "android",
213            "client_unique_key": client_unique_key,
214            "code_challenge": code_challenge,
215            "code_challenge_method": "S256",
216            "restrict_signup": "true",
217        }
218
219        url = f"https://login.tidal.com/authorize?{urllib.parse.urlencode(params)}"
220
221        # Send URL to user
222        auth_helper.mass.loop.call_soon_threadsafe(auth_helper.send_url, url)
223
224        # Return serialized auth params
225        return json.dumps(auth_params)
226
227    @staticmethod
228    async def process_pkce_login(
229        http_session: ClientSession, base64_auth_params: str, redirect_url: str
230    ) -> dict[str, Any]:
231        """Process TIDAL authentication with PKCE flow."""
232        # Parse the stored auth parameters
233        try:
234            auth_params = json.loads(base64_auth_params)
235        except json.JSONDecodeError as err:
236            raise LoginFailed("Invalid authentication data") from err
237
238        # Extract required parameters
239        code_verifier = auth_params.get("code_verifier")
240        client_unique_key = auth_params.get("client_unique_key")
241        client_secret = auth_params.get("client_secret")
242        client_id = auth_params.get("client_id")
243        quality = auth_params.get("quality")
244
245        if not code_verifier or not client_unique_key:
246            raise LoginFailed("Missing required authentication parameters")
247
248        # Extract the authorization code from the redirect URL
249        parsed_url = urllib.parse.urlparse(redirect_url)
250        query_params = urllib.parse.parse_qs(parsed_url.query)
251        code = query_params.get("code", [""])[0]
252
253        if not code:
254            raise LoginFailed("No authorization code found in redirect URL")
255
256        # Prepare the token exchange request
257        token_url = f"{AUTH_URL}/token"
258        data = {
259            "code": code,
260            "client_id": client_id,
261            "grant_type": "authorization_code",
262            "redirect_uri": REDIRECT_URI,
263            "scope": "r_usr w_usr w_sub",
264            "code_verifier": code_verifier,
265            "client_unique_key": client_unique_key,
266        }
267
268        headers = {"Content-Type": "application/x-www-form-urlencoded"}
269
270        # Make the token exchange request
271        async with http_session.post(token_url, data=data, headers=headers) as response:
272            if response.status != 200:
273                error_text = await response.text()
274                raise LoginFailed(f"Token exchange failed: {error_text}")
275
276            token_data = await response.json()
277
278        # Validate we have authentication data
279        if not token_data.get("access_token") or not token_data.get("refresh_token"):
280            raise LoginFailed("Failed to obtain authentication tokens from Tidal")
281
282        # Get user information using the new token
283        headers = {"Authorization": f"Bearer {token_data['access_token']}"}
284        sessions_url = "https://api.tidal.com/v1/sessions"
285
286        # Again use mass.http_session
287        async with http_session.get(sessions_url, headers=headers) as response:
288            if response.status != 200:
289                error_text = await response.text()
290                raise LoginFailed(f"Failed to get user info: {error_text}")
291
292            user_info = await response.json()
293
294        # Combine token and user info, add expiration time
295        auth_data = {**token_data, **user_info}
296
297        # Add standard fields used by TidalProvider
298        auth_data["expires_at"] = time.time() + token_data.get("expires_in", 3600)
299        auth_data["quality"] = quality
300        auth_data["client_id"] = client_id
301        auth_data["client_secret"] = client_secret
302
303        return auth_data
304