/
/
/
1"""Authentication middleware and helpers for HTTP requests and WebSocket connections."""
2
3from __future__ import annotations
4
5import logging
6from contextvars import ContextVar
7from typing import TYPE_CHECKING, Any, cast
8
9from aiohttp import web
10from music_assistant_models.auth import AuthProviderType, User, UserRole
11
12from music_assistant.constants import HOMEASSISTANT_SYSTEM_USER, MASS_LOGGER_NAME, VERBOSE_LOG_LEVEL
13
14from .auth_providers import get_ha_user_details, get_ha_user_role
15
16LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.auth")
17
18if TYPE_CHECKING:
19 from music_assistant import MusicAssistant
20
# Key under which the resolved user (or None) is stored on the aiohttp request
USER_CONTEXT_KEY = "authenticated_user"

# ContextVars for tracking the current user and token across async calls
# (both default to None when nothing has been set in the current context)
current_user: ContextVar[User | None] = ContextVar("current_user", default=None)
current_token: ContextVar[str | None] = ContextVar("current_token", default=None)
# ContextVar for tracking the sendspin player associated with the current connection
# (defaults to None when nothing has been set)
sendspin_player_id: ContextVar[str | None] = ContextVar("sendspin_player_id", default=None)
29
30
async def get_authenticated_user(request: web.Request) -> User | None:
    """Resolve the user making this request, or None if it is unauthenticated.

    Resolution order:

    1. A value already stored on the request under ``USER_CONTEXT_KEY``
       (including a stored ``None``) is returned as-is.
    2. Requests arriving on the Home Assistant Ingress site are identified by
       their ``X-Remote-User-*`` headers.
    3. Any other request must carry an ``Authorization: Bearer <token>`` header.

    A successfully resolved user is stored on the request for later calls.

    :param request: The aiohttp request.
    :return: The authenticated user, or None when no valid credentials were supplied.
    """
    # Check if user is already in context (from middleware or an earlier call)
    if USER_CONTEXT_KEY in request:
        return cast("User | None", request[USER_CONTEXT_KEY])

    mass: MusicAssistant = request.app["mass"]

    # The ingress check is evaluated once: a request is either an Ingress request
    # (identified by headers) or a regular one (identified by bearer token), never both.
    if is_request_from_ingress(request):
        user = await _get_ingress_user(mass, request)
    else:
        user = await _get_bearer_user(mass, request)

    if user:
        # Store in request context
        request[USER_CONTEXT_KEY] = user
    return user


async def _get_ingress_user(mass: MusicAssistant, request: web.Request) -> User | None:
    """Find, or create and link, the user named by the Home Assistant Ingress headers.

    :param mass: The MusicAssistant instance.
    :param request: An aiohttp request that arrived on the Ingress site.
    :return: The matching user, or None when the mandatory headers are missing.
    """
    ingress_user_id = request.headers.get("X-Remote-User-ID")
    ingress_username = request.headers.get("X-Remote-User-Name")
    ingress_display_name = request.headers.get("X-Remote-User-Display-Name")

    # User ID and username are mandatory; the display name header is optional
    if not (ingress_user_id and ingress_username):
        return None

    auth = mass.webserver.auth
    # (username, display_name, avatar_url) from HA, fetched at most once per call
    ha_details = None

    # Try to find existing user linked to this HA user ID
    user = await auth.get_user_by_provider_link(AuthProviderType.HOME_ASSISTANT, ingress_user_id)
    if not user:
        user = await auth.get_user_by_username(ingress_username)
        if not user:
            # New user - fetch details from HA
            ha_details = await get_ha_user_details(mass, ingress_user_id)
            ha_username, ha_display_name, avatar_url = ha_details
            role = await get_ha_user_role(mass, ingress_user_id)
            user = await auth.create_user(
                username=ha_username or ingress_username,
                role=role,
                display_name=ha_display_name or ingress_display_name,
                avatar_url=avatar_url,
            )

        # Link to Home Assistant provider (or create the link if user already existed)
        await auth.link_user_to_provider(user, AuthProviderType.HOME_ASSISTANT, ingress_user_id)

    # Update user with HA details if available (HA is source of truth).
    # Reuse the details fetched while creating the user instead of asking HA again.
    if ha_details is None:
        ha_details = await get_ha_user_details(mass, ingress_user_id)
    _, ha_display_name, avatar_url = ha_details
    # Fall back to the ingress header if the API lookup doesn't return a display name
    final_display_name = ha_display_name or ingress_display_name
    LOGGER.log(
        VERBOSE_LOG_LEVEL,
        "Ingress auth for user %s: ha_display_name=%s, ingress_display_name=%s, "
        "final_display_name=%s, avatar_url=%s",
        user.username,
        ha_display_name,
        ingress_display_name,
        final_display_name,
        avatar_url,
    )
    if final_display_name or avatar_url:
        user = await auth.update_user(
            user,
            display_name=final_display_name,
            avatar_url=avatar_url,
        )
        LOGGER.log(
            VERBOSE_LOG_LEVEL,
            "Updated user %s: display_name=%s, avatar_url=%s",
            user.username,
            user.display_name,
            user.avatar_url,
        )
    return user


async def _get_bearer_user(mass: MusicAssistant, request: web.Request) -> User | None:
    """Authenticate a non-Ingress request from its ``Authorization: Bearer`` header.

    :param mass: The MusicAssistant instance.
    :param request: An aiohttp request that did not arrive on the Ingress site.
    :return: The user owning the token, or None if the header is missing,
        malformed, carries a blank token, or belongs to the HA system user.
    """
    auth_header = request.headers.get("Authorization")
    if not auth_header:
        return None

    # Expected format: "Bearer <token>"
    scheme, _, token = auth_header.partition(" ")
    token = token.strip()
    if scheme.lower() != "bearer" or not token:
        return None

    # Authenticate with token (works for both user tokens and API keys)
    user = await mass.webserver.auth.authenticate_with_token(token)
    # Security: deny the homeassistant system user on the regular (non-Ingress) webserver
    if user and user.username == HOMEASSISTANT_SYSTEM_USER:
        return None
    return user
132
133
async def require_authentication(request: web.Request) -> User:
    """Return the authenticated user for a request or reject it with HTTP 401.

    :param request: The aiohttp request.
    :return: The authenticated user.
    :raises web.HTTPUnauthorized: When no user could be resolved for the request.
    """
    authenticated = await get_authenticated_user(request)
    if authenticated:
        return authenticated

    # Tell the client which authentication scheme is expected
    challenge = {"WWW-Authenticate": 'Bearer realm="Music Assistant"'}
    raise web.HTTPUnauthorized(text="Authentication required", headers=challenge)
146
147
async def require_admin(request: web.Request) -> User:
    """Return the authenticated admin user for a request or reject it with HTTP 403.

    Authentication is checked first, so an unauthenticated request is rejected
    by ``require_authentication`` before the role is inspected.

    :param request: The aiohttp request.
    :return: The authenticated user, whose role is ``UserRole.ADMIN``.
    :raises web.HTTPForbidden: When the authenticated user is not an admin.
    """
    admin_candidate = await require_authentication(request)
    if admin_candidate.role == UserRole.ADMIN:
        return admin_candidate
    raise web.HTTPForbidden(text="Admin access required")
157
158
def get_current_user() -> User | None:
    """Read the user stored in the ``current_user`` context variable.

    :return: The user set for the current context, or None if none is set.
    """
    active_user = current_user.get()
    return active_user
166
167
def set_current_user(user: User | None) -> None:
    """Store a user in the ``current_user`` context variable.

    :param user: The user to make current, or None to mark the context as unauthenticated.
    """
    # The reset token returned by ContextVar.set() is intentionally discarded
    current_user.set(user)
175
176
def get_current_token() -> str | None:
    """Read the token stored in the ``current_token`` context variable.

    :return: The token set for the current context, or None if none is set.
    """
    active_token = current_token.get()
    return active_token
184
185
def set_current_token(token: str | None) -> None:
    """Store an authentication token in the ``current_token`` context variable.

    :param token: The token to make current, or None to clear it.
    """
    # The reset token returned by ContextVar.set() is intentionally discarded
    current_token.set(token)
193
194
def get_sendspin_player_id() -> str | None:
    """Read the player ID stored in the ``sendspin_player_id`` context variable.

    :return: The sendspin player ID set for the current context, or None if none is set.
    """
    player_id = sendspin_player_id.get()
    return player_id
201
202
def set_sendspin_player_id(player_id: str | None) -> None:
    """Store a player ID in the ``sendspin_player_id`` context variable.

    :param player_id: The sendspin player ID to associate with the current
        context, or None to clear it.
    """
    # The reset token returned by ContextVar.set() is intentionally discarded
    sendspin_player_id.set(player_id)
209
210
def is_request_from_ingress(request: web.Request) -> bool:
    """Check if request arrived on the Home Assistant Ingress site.

    The decision is based on the local socket address the connection was
    accepted on, not on request headers: the address and port must equal the
    ``(ip, port)`` pair stored in ``request.app["ingress_site"]``.

    The check fails closed: a missing configuration, a missing transport or
    socket name, or any unexpected error yields False.

    :param request: The aiohttp request.
    :return: True only when the request's local socket matches the ingress site.
    """
    # Check if ingress site is configured in the app
    ingress_site_params = request.app.get("ingress_site")
    if not ingress_site_params:
        # No ingress site configured, can't be an ingress request
        return False

    try:
        # Security: verify the request came through the ingress site by checking
        # the socket, to prevent bypassing authentication on the regular webserver
        transport = request.transport
        if not transport:
            return False
        sockname = transport.get_extra_info("sockname")
        if not sockname or len(sockname) < 2:
            return False
        expected_ip, expected_port = ingress_site_params
        # Request must match the ingress site's bind address and port
        return bool(sockname[0] == expected_ip and sockname[1] == expected_port)
    except Exception:
        # Deliberately broad: this check guards an authentication bypass, so any
        # failure must be treated as "not ingress" - but leave a trace for debugging.
        LOGGER.debug("Ingress check failed, treating request as non-ingress", exc_info=True)
        return False
240
241
@web.middleware
async def auth_middleware(request: web.Request, handler: Any) -> web.StreamResponse:
    """Authenticate requests and store user in context.

    Ingress requests and public routes are passed straight to the handler.
    For every other request the user is resolved (possibly to None) and stored
    on the request under ``USER_CONTEXT_KEY``; this middleware never rejects a
    request itself.

    :param request: The aiohttp request.
    :param handler: The request handler.
    :return: The response produced by the handler.
    """
    # Skip authentication for ingress requests (HA handles auth)
    if is_request_from_ingress(request):
        return cast("web.StreamResponse", await handler(request))

    # Unauthenticated routes (static files, info, login, setup, etc.), matched by prefix
    unauthenticated_prefixes = (
        "/info",
        "/login",
        "/setup",
        "/auth/",
        "/api-docs/",
        "/assets/",
        "/favicon.ico",
        "/manifest.json",
        "/index.html",
    )

    path = request.path
    # The root path must be compared exactly: used as a prefix, "/" matches every
    # path and would make the authentication attempt below unreachable.
    if path == "/" or path.startswith(unauthenticated_prefixes):
        return cast("web.StreamResponse", await handler(request))

    # Try to authenticate and store the result (might be None for unauthenticated requests)
    request[USER_CONTEXT_KEY] = await get_authenticated_user(request)

    # Let the handler decide if authentication is required
    # The handler will call require_authentication() if needed
    return cast("web.StreamResponse", await handler(request))
281