/
/
/
1"""Authentication manager for Tidal integration."""
2
import json
import random
import time
import urllib
import urllib.parse
from collections.abc import Callable
from dataclasses import dataclass
from types import TracebackType
from typing import TYPE_CHECKING, Any

import pkce
from aiohttp import ClientSession
from music_assistant_models.enums import EventType
from music_assistant_models.errors import LoginFailed

from music_assistant.helpers.app_vars import app_var  # type: ignore[attr-defined]

if TYPE_CHECKING:
    from music_assistant.mass import MusicAssistant
21
# Configuration constants

# NOTE(review): presumably the Authorization scheme name; it is not referenced
# by the code visible in this part of the file — confirm before removing.
TOKEN_TYPE = "Bearer"
# Base of the OAuth2 endpoints; "/token" is appended for token requests.
AUTH_URL = "https://auth.tidal.com/v1/oauth2"
# Sent as "redirect_uri" in both the authorize URL and the token exchange.
REDIRECT_URI = "https://tidal.com/android/login/auth"

# A token is refreshed once it is within this many seconds of its expiry.
TOKEN_REFRESH_BUFFER = 7 * 60  # 7 minutes
28
29
30@dataclass
31class TidalUser:
32 """Represent a Tidal user with their associated account information."""
33
34 user_id: str | None = None
35 country_code: str | None = None
36 session_id: str | None = None
37 profile_name: str | None = None
38 user_name: str | None = None
39 email: str | None = None
40
41
42class ManualAuthenticationHelper:
43 """Helper for authentication flows that require manual user intervention.
44
45 For Tidal where the OAuth flow doesn't redirect to our callback,
46 but instead requires the user to manually copy a URL after authentication.
47 """
48
49 def __init__(self, mass: "MusicAssistant", session_id: str) -> None:
50 """Initialize the Manual Authentication Helper."""
51 self.mass = mass
52 self.session_id = session_id
53
54 async def __aenter__(self) -> "ManualAuthenticationHelper":
55 """Enter context manager."""
56 return self
57
58 async def __aexit__(
59 self,
60 exc_type: type[BaseException] | None,
61 exc_val: BaseException | None,
62 exc_tb: TracebackType | None,
63 ) -> bool | None:
64 """Exit context manager."""
65 return None
66
67 def send_url(self, auth_url: str) -> None:
68 """Send the URL to the user for authentication."""
69 self.mass.signal_event(EventType.AUTH_SESSION, self.session_id, auth_url)
70
71
class TidalAuthManager:
    """Manager for the Tidal authentication process.

    An instance keeps the parsed auth data (tokens, expiry timestamp, client
    id) in memory, refreshes the access token when it is close to expiring and
    passes the updated auth dict to the ``config_updater`` callback.  The two
    static methods implement the manual PKCE login flow and do not need an
    instance.
    """

    # Token lifetime (seconds) assumed when a token response omits "expires_in".
    _DEFAULT_EXPIRES_IN = 3600

    def __init__(
        self,
        http_session: ClientSession,
        config_updater: Callable[[dict[str, Any]], None],
        logger: Any,
    ) -> None:
        """Initialize Tidal auth manager.

        :param http_session: Session used for the token refresh request.
        :param config_updater: Called with the full auth dict after a refresh.
        :param logger: Object exposing ``error(msg, *args)``.
        """
        self.http_session = http_session
        self.update_config = config_updater
        self.logger = logger
        self._auth_info: dict[str, Any] | None = None
        self.user = TidalUser()

    async def initialize(self, auth_data: str) -> bool:
        """Initialize the auth manager with stored auth data.

        :param auth_data: JSON text of a previously stored auth dict.
        :return: True when the data was loaded and a valid token is available,
            False when the data is empty, is not a JSON object, or the token
            could not be refreshed.
        """
        if not auth_data:
            return False

        try:
            parsed = json.loads(auth_data)
        except (json.JSONDecodeError, TypeError) as err:
            # TypeError covers a non-str/bytes argument slipping through.
            self.logger.error("Invalid authentication data: %s", err)
            return False

        # Valid JSON that is not an object (list, string, number) would make
        # every later ``.get`` call fail, so reject it here.
        if not isinstance(parsed, dict):
            self.logger.error(
                "Invalid authentication data: expected a JSON object, got %s",
                type(parsed).__name__,
            )
            return False

        self._auth_info = parsed
        return await self.ensure_valid_token()

    @property
    def user_id(self) -> str | None:
        """Return the current user ID."""
        return self.user.user_id

    @property
    def country_code(self) -> str | None:
        """Return the current country code."""
        return self.user.country_code

    @property
    def session_id(self) -> str | None:
        """Return the current session ID."""
        return self.user.session_id

    @property
    def access_token(self) -> str | None:
        """Return the current access token, or None when no auth data is loaded."""
        return self._auth_info.get("access_token") if self._auth_info else None

    async def ensure_valid_token(self) -> bool:
        """Ensure we have a valid token, refreshing it if needed.

        :return: True when the stored token is valid for longer than
            ``TOKEN_REFRESH_BUFFER`` seconds or was refreshed successfully.
        """
        if not self._auth_info:
            return False

        # A missing or null expiry is treated as "already expired".
        expires_at = self._auth_info.get("expires_at") or 0
        if expires_at > time.time() + TOKEN_REFRESH_BUFFER:
            return True

        return await self.refresh_token()

    async def refresh_token(self) -> bool:
        """Refresh the auth token.

        Posts the stored refresh token to the token endpoint, updates the
        in-memory auth info and passes it to the config updater.

        :return: True on success; False when there is no auth data or refresh
            token, the endpoint does not answer 200, or the response carries
            no access token.
        """
        if not self._auth_info:
            return False

        refresh_token = self._auth_info.get("refresh_token")
        if not refresh_token:
            return False

        client_id = self._auth_info.get("client_id", app_var(9))

        data = {
            "refresh_token": refresh_token,
            "client_id": client_id,
            "grant_type": "refresh_token",
            "scope": "r_usr w_usr w_sub",
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}

        async with self.http_session.post(
            f"{AUTH_URL}/token", data=data, headers=headers
        ) as response:
            if response.status != 200:
                self.logger.error("Failed to refresh token: %s", await response.text())
                return False

            token_data = await response.json()

        # A 200 response without a usable token must not raise KeyError; this
        # method reports failure through its return value.
        access_token = token_data.get("access_token") if isinstance(token_data, dict) else None
        if not access_token:
            self.logger.error("Failed to refresh token: response contained no access token")
            return False

        self._auth_info["access_token"] = access_token
        # The refresh token is only replaced when the response includes one.
        if "refresh_token" in token_data:
            self._auth_info["refresh_token"] = token_data["refresh_token"]

        # Always move the expiry forward.  Keeping a stale "expires_at" when
        # "expires_in" is absent would make every later validity check
        # trigger another refresh; the fallback matches the login path.
        self._auth_info["expires_at"] = time.time() + token_data.get(
            "expires_in", self._DEFAULT_EXPIRES_IN
        )

        # Store updated auth info
        self.update_config(self._auth_info)

        return True

    async def update_user_info(self, user_info: dict[str, Any], session_id: str) -> None:
        """Update user info from API response.

        Replaces ``self.user`` with a new ``TidalUser`` built from the response.

        :param user_info: User dict from the API; keys read are ``id``,
            ``countryCode``, ``profileName``, ``username`` and ``email``.
            Missing keys leave the corresponding field as None.
        :param session_id: Session identifier to store on the user.
        """
        self.user = TidalUser(
            user_id=user_info.get("id"),
            country_code=user_info.get("countryCode"),
            session_id=session_id,
            profile_name=user_info.get("profileName"),
            user_name=user_info.get("username"),
            # NOTE(review): "email" key name assumed by analogy with the other
            # fields — confirm against the API response.
            email=user_info.get("email"),
        )

    @staticmethod
    async def generate_auth_url(auth_helper: ManualAuthenticationHelper, quality: str) -> str:
        """Generate the Tidal authentication URL and send it to the user.

        :param auth_helper: Helper whose ``send_url`` is scheduled on
            ``auth_helper.mass.loop`` to deliver the URL.
        :param quality: Quality setting carried through to the stored auth data.
        :return: JSON text with the PKCE code verifier, client unique key,
            client id, client secret and quality, to be passed to
            ``process_pkce_login`` later.
        """
        # Generate PKCE challenge
        code_verifier, code_challenge = pkce.generate_pkce_pair()
        # Generate unique client key (random 64-bit value rendered as hex)
        client_unique_key = format(random.getrandbits(64), "02x")
        # Values needed again when the authorization code is exchanged
        auth_params = {
            "code_verifier": code_verifier,
            "client_unique_key": client_unique_key,
            "client_id": app_var(9),
            "client_secret": app_var(10),
            "quality": quality,
        }

        # Query parameters of the authorize URL
        params = {
            "response_type": "code",
            "redirect_uri": REDIRECT_URI,
            "client_id": auth_params["client_id"],
            "lang": "EN",
            "appMode": "android",
            "client_unique_key": client_unique_key,
            "code_challenge": code_challenge,
            "code_challenge_method": "S256",
            "restrict_signup": "true",
        }

        url = f"https://login.tidal.com/authorize?{urllib.parse.urlencode(params)}"

        # Send URL to user
        auth_helper.mass.loop.call_soon_threadsafe(auth_helper.send_url, url)

        # Return serialized auth params
        return json.dumps(auth_params)

    @staticmethod
    async def process_pkce_login(
        http_session: ClientSession, base64_auth_params: str, redirect_url: str
    ) -> dict[str, Any]:
        """Process TIDAL authentication with PKCE flow.

        :param http_session: Session used for the token and sessions requests.
        :param base64_auth_params: JSON text as returned by
            ``generate_auth_url`` (despite the name it is parsed as JSON, not
            base64-decoded).
        :param redirect_url: URL the user copied after logging in; its
            ``code`` query parameter is the authorization code.
        :return: The token response merged with the sessions response, plus
            ``expires_at``, ``quality``, ``client_id`` and ``client_secret``.
        :raises LoginFailed: On invalid parameters, a missing authorization
            code, a non-200 response, or a response without tokens.
        """
        # Parse the stored auth parameters
        try:
            auth_params = json.loads(base64_auth_params)
        except (json.JSONDecodeError, TypeError) as err:
            raise LoginFailed("Invalid authentication data") from err
        if not isinstance(auth_params, dict):
            # Valid JSON, but not the object written by generate_auth_url.
            raise LoginFailed("Invalid authentication data")

        # Extract required parameters
        code_verifier = auth_params.get("code_verifier")
        client_unique_key = auth_params.get("client_unique_key")
        client_secret = auth_params.get("client_secret")
        client_id = auth_params.get("client_id")
        quality = auth_params.get("quality")

        if not code_verifier or not client_unique_key:
            raise LoginFailed("Missing required authentication parameters")

        # Extract the authorization code from the redirect URL
        parsed_url = urllib.parse.urlparse(redirect_url)
        query_params = urllib.parse.parse_qs(parsed_url.query)
        code = query_params.get("code", [""])[0]

        if not code:
            raise LoginFailed("No authorization code found in redirect URL")

        # Prepare the token exchange request
        token_url = f"{AUTH_URL}/token"
        data = {
            "code": code,
            "client_id": client_id,
            "grant_type": "authorization_code",
            "redirect_uri": REDIRECT_URI,
            "scope": "r_usr w_usr w_sub",
            "code_verifier": code_verifier,
            "client_unique_key": client_unique_key,
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}

        # Make the token exchange request
        async with http_session.post(token_url, data=data, headers=headers) as response:
            if response.status != 200:
                error_text = await response.text()
                raise LoginFailed(f"Token exchange failed: {error_text}")

            token_data = await response.json()

        # Validate we have authentication data
        if (
            not isinstance(token_data, dict)
            or not token_data.get("access_token")
            or not token_data.get("refresh_token")
        ):
            raise LoginFailed("Failed to obtain authentication tokens from Tidal")

        # Get user information using the new token
        headers = {"Authorization": f"Bearer {token_data['access_token']}"}
        sessions_url = "https://api.tidal.com/v1/sessions"

        async with http_session.get(sessions_url, headers=headers) as response:
            if response.status != 200:
                error_text = await response.text()
                raise LoginFailed(f"Failed to get user info: {error_text}")

            user_info = await response.json()

        if not isinstance(user_info, dict):
            raise LoginFailed("Failed to get user info: unexpected response format")

        # Combine token and user info; on a key clash the sessions response wins
        auth_data = {**token_data, **user_info}

        # Add standard fields used by TidalProvider
        auth_data["expires_at"] = time.time() + token_data.get(
            "expires_in", TidalAuthManager._DEFAULT_EXPIRES_IN
        )
        auth_data["quality"] = quality
        auth_data["client_id"] = client_id
        auth_data["client_secret"] = client_secret

        return auth_data
304