music-assistant-server

9.6 KBPY
auth_middleware.py
9.6 KB281 lines • python
1"""Authentication middleware and helpers for HTTP requests and WebSocket connections."""
2
3from __future__ import annotations
4
5import logging
6from contextvars import ContextVar
7from typing import TYPE_CHECKING, Any, cast
8
9from aiohttp import web
10from music_assistant_models.auth import AuthProviderType, User, UserRole
11
12from music_assistant.constants import HOMEASSISTANT_SYSTEM_USER, MASS_LOGGER_NAME, VERBOSE_LOG_LEVEL
13
14from .auth_providers import get_ha_user_details, get_ha_user_role
15
16LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.auth")
17
18if TYPE_CHECKING:
19    from music_assistant import MusicAssistant
20
21# Context key for storing authenticated user in request
22USER_CONTEXT_KEY = "authenticated_user"
23
24# ContextVar for tracking current user and token across async calls
25current_user: ContextVar[User | None] = ContextVar("current_user", default=None)
26current_token: ContextVar[str | None] = ContextVar("current_token", default=None)
27# ContextVar for tracking the sendspin player associated with the current connection
28sendspin_player_id: ContextVar[str | None] = ContextVar("sendspin_player_id", default=None)
29
30
31async def get_authenticated_user(request: web.Request) -> User | None:
32    """Get authenticated user from request.
33
34    :param request: The aiohttp request.
35    """
36    # Check if user is already in context (from middleware)
37    if USER_CONTEXT_KEY in request:
38        return cast("User | None", request[USER_CONTEXT_KEY])
39
40    mass: MusicAssistant = request.app["mass"]
41
42    # Check for Home Assistant Ingress connections
43    if is_request_from_ingress(request):
44        ingress_user_id = request.headers.get("X-Remote-User-ID")
45        ingress_username = request.headers.get("X-Remote-User-Name")
46        ingress_display_name = request.headers.get("X-Remote-User-Display-Name")
47
48        # Require all Ingress headers to be present for security
49        if not (ingress_user_id and ingress_username):
50            return None
51
52        # Try to find existing user linked to this HA user ID
53        user = await mass.webserver.auth.get_user_by_provider_link(
54            AuthProviderType.HOME_ASSISTANT, ingress_user_id
55        )
56        if not user:
57            user = await mass.webserver.auth.get_user_by_username(ingress_username)
58            if not user:
59                # New user - fetch details from HA
60                ha_username, ha_display_name, avatar_url = await get_ha_user_details(
61                    mass, ingress_user_id
62                )
63                role = await get_ha_user_role(mass, ingress_user_id)
64                user = await mass.webserver.auth.create_user(
65                    username=ha_username or ingress_username,
66                    role=role,
67                    display_name=ha_display_name or ingress_display_name,
68                    avatar_url=avatar_url,
69                )
70
71            # Link to Home Assistant provider (or create the link if user already existed)
72            await mass.webserver.auth.link_user_to_provider(
73                user, AuthProviderType.HOME_ASSISTANT, ingress_user_id
74            )
75
76        # Update user with HA details if available (HA is source of truth)
77        # Fall back to ingress headers if API lookup doesn't return values
78        _, ha_display_name, avatar_url = await get_ha_user_details(mass, ingress_user_id)
79        final_display_name = ha_display_name or ingress_display_name
80        LOGGER.log(
81            VERBOSE_LOG_LEVEL,
82            "Ingress auth for user %s: ha_display_name=%s, ingress_display_name=%s, "
83            "final_display_name=%s, avatar_url=%s",
84            user.username,
85            ha_display_name,
86            ingress_display_name,
87            final_display_name,
88            avatar_url,
89        )
90        if final_display_name or avatar_url:
91            user = await mass.webserver.auth.update_user(
92                user,
93                display_name=final_display_name,
94                avatar_url=avatar_url,
95            )
96            LOGGER.log(
97                VERBOSE_LOG_LEVEL,
98                "Updated user %s: display_name=%s, avatar_url=%s",
99                user.username,
100                user.display_name,
101                user.avatar_url,
102            )
103
104        # Store in request context
105        request[USER_CONTEXT_KEY] = user
106        return user
107
108    # Try to authenticate from Authorization header
109    auth_header = request.headers.get("Authorization")
110    if not auth_header:
111        return None
112
113    # Expected format: "Bearer <token>"
114    parts = auth_header.split(" ", 1)
115    if len(parts) != 2 or parts[0].lower() != "bearer":
116        return None
117
118    token = parts[1]
119
120    # Authenticate with token (works for both user tokens and API keys)
121    user = await mass.webserver.auth.authenticate_with_token(token)
122    if user:
123        # Security: Deny homeassistant system user on regular (non-Ingress) webserver
124        if not is_request_from_ingress(request) and user.username == HOMEASSISTANT_SYSTEM_USER:
125            # Reject system user on regular webserver (should only use Ingress server)
126            return None
127
128        # Store in request context
129        request[USER_CONTEXT_KEY] = user
130
131    return user
132
133
134async def require_authentication(request: web.Request) -> User:
135    """Require authentication for a request, raise 401 if not authenticated.
136
137    :param request: The aiohttp request.
138    """
139    user = await get_authenticated_user(request)
140    if not user:
141        raise web.HTTPUnauthorized(
142            text="Authentication required",
143            headers={"WWW-Authenticate": 'Bearer realm="Music Assistant"'},
144        )
145    return user
146
147
148async def require_admin(request: web.Request) -> User:
149    """Require admin role for a request, raise 403 if not admin.
150
151    :param request: The aiohttp request.
152    """
153    user = await require_authentication(request)
154    if user.role != UserRole.ADMIN:
155        raise web.HTTPForbidden(text="Admin access required")
156    return user
157
158
159def get_current_user() -> User | None:
160    """
161    Get the current authenticated user from context.
162
163    :return: The current user or None if not authenticated.
164    """
165    return current_user.get()
166
167
168def set_current_user(user: User | None) -> None:
169    """
170    Set the current authenticated user in context.
171
172    :param user: The user to set as current.
173    """
174    current_user.set(user)
175
176
177def get_current_token() -> str | None:
178    """
179    Get the current authentication token from context.
180
181    :return: The current token or None if not authenticated.
182    """
183    return current_token.get()
184
185
186def set_current_token(token: str | None) -> None:
187    """
188    Set the current authentication token in context.
189
190    :param token: The token to set as current.
191    """
192    current_token.set(token)
193
194
195def get_sendspin_player_id() -> str | None:
196    """Get the sendspin player ID associated with the current connection.
197
198    :return: The sendspin player ID or None if not a sendspin connection.
199    """
200    return sendspin_player_id.get()
201
202
203def set_sendspin_player_id(player_id: str | None) -> None:
204    """Set the sendspin player ID for the current connection.
205
206    :param player_id: The sendspin player ID to set.
207    """
208    sendspin_player_id.set(player_id)
209
210
211def is_request_from_ingress(request: web.Request) -> bool:
212    """Check if request is coming from Home Assistant Ingress (internal network).
213
214    Security is enforced by socket-level verification (IP/port binding), not headers.
215    Only requests on the internal ingress TCP site (172.30.32.x:8094) are accepted.
216
217    :param request: The aiohttp request.
218    """
219    # Check if ingress site is configured in the app
220    ingress_site_params = request.app.get("ingress_site")
221    if not ingress_site_params:
222        # No ingress site configured, can't be an ingress request
223        return False
224
225    try:
226        # Security: Verify the request came through the ingress site by checking socket
227        # to prevent bypassing authentication on the regular webserver
228        transport = request.transport
229        if transport:
230            sockname = transport.get_extra_info("sockname")
231            if sockname and len(sockname) >= 2:
232                server_ip, server_port = sockname[0], sockname[1]
233                expected_ip, expected_port = ingress_site_params
234                # Request must match the ingress site's bind address and port
235                return bool(server_ip == expected_ip and server_port == expected_port)
236    except Exception:  # noqa: S110
237        pass
238
239    return False
240
241
242@web.middleware
243async def auth_middleware(request: web.Request, handler: Any) -> web.StreamResponse:
244    """Authenticate requests and store user in context.
245
246    :param request: The aiohttp request.
247    :param handler: The request handler.
248    """
249    # Skip authentication for ingress requests (HA handles auth)
250    if is_request_from_ingress(request):
251        return cast("web.StreamResponse", await handler(request))
252
253    # Unauthenticated routes (static files, info, login, setup, etc.)
254    unauthenticated_paths = [
255        "/info",
256        "/login",
257        "/setup",
258        "/auth/",
259        "/api-docs/",
260        "/assets/",
261        "/favicon.ico",
262        "/manifest.json",
263        "/index.html",
264        "/",
265    ]
266
267    # Check if path should bypass auth
268    for path_prefix in unauthenticated_paths:
269        if request.path.startswith(path_prefix):
270            return cast("web.StreamResponse", await handler(request))
271
272    # Try to authenticate
273    user = await get_authenticated_user(request)
274
275    # Store user in context (might be None for unauthenticated requests)
276    request[USER_CONTEXT_KEY] = user
277
278    # Let the handler decide if authentication is required
279    # The handler will call require_authentication() if needed
280    return cast("web.StreamResponse", await handler(request))
281