music-assistant-server

8.1 KBPY
sendspin_proxy.py
8.1 KB221 lines • python
1"""Sendspin WebSocket proxy handler for Music Assistant.
2
3This module provides an authenticated WebSocket proxy to the internal Sendspin server,
4allowing web clients to connect through the main webserver instead of requiring direct
5access to the Sendspin port.
6"""
7
8from __future__ import annotations
9
10import asyncio
11import contextlib
12import json
13import logging
14from typing import TYPE_CHECKING
15
16from aiohttp import WSMsgType, web
17
18from music_assistant.constants import MASS_LOGGER_NAME
19from music_assistant.controllers.webserver.helpers.auth_middleware import (
20    get_authenticated_user,
21    is_request_from_ingress,
22)
23
24if TYPE_CHECKING:
25    import aiohttp
26    from music_assistant_models.auth import User
27
28    from music_assistant.controllers.webserver import WebserverController
29
# Module-level logger: the "sendspin_proxy" child of the logger named by MASS_LOGGER_NAME.
LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.sendspin_proxy")
31
32
class SendspinProxyHandler:
    """Handler for proxying WebSocket connections to the internal Sendspin server.

    A client connects to the main webserver, authenticates (either through ingress
    or by sending an auth message as its first frame) and is then bridged to the
    internal Sendspin WebSocket endpoint until either side disconnects.
    """

    def __init__(self, webserver: WebserverController) -> None:
        """Initialize the Sendspin proxy handler.

        :param webserver: The webserver controller instance.
        """
        self.webserver = webserver
        self.mass = webserver.mass
        self.logger = LOGGER

    @property
    def internal_sendspin_url(self) -> str:
        """Return the internal sendspin URL for connecting to the internal Sendspin server."""
        return f"ws://{self.mass.streams.publish_ip}:8927/sendspin"

    async def handle_sendspin_proxy(self, request: web.Request) -> web.WebSocketResponse:
        """
        Handle incoming WebSocket connection and proxy to internal Sendspin server.

        Requests coming through ingress are authenticated from the request itself.
        All other clients must authenticate with their first message:
        {"type": "auth", "token": "<access_token>"}

        After successful authentication, all messages are proxied bidirectionally.

        :param request: The incoming HTTP request to upgrade to WebSocket.
        :return: The WebSocket response (already closed when this method returns).
        """
        wsock = web.WebSocketResponse(heartbeat=30)
        await wsock.prepare(request)

        self.logger.debug("Sendspin proxy connection from %s", request.remote)

        # Check for ingress authentication (HA handles auth via headers)
        if is_request_from_ingress(request):
            user = await get_authenticated_user(request)
            if not user:
                self.logger.warning(
                    "Ingress auth failed for sendspin proxy from %s", request.remote
                )
                await wsock.close(code=4001, message=b"Ingress authentication failed")
                return wsock
            self.logger.debug("Sendspin proxy authenticated via ingress: %s", user.username)
        else:
            # Regular auth via first message
            try:
                user = await self._authenticate(wsock)
            except TimeoutError:
                self.logger.warning("Auth timeout for sendspin proxy from %s", request.remote)
                await wsock.close(code=4001, message=b"Authentication timeout")
                return wsock
            except Exception:
                self.logger.exception("Auth error for sendspin proxy")
                await wsock.close(code=4001, message=b"Authentication error")
                return wsock
            if not user:
                # _authenticate already closed the socket with a specific reason.
                return wsock

        try:
            internal_ws = await self.mass.http_session.ws_connect(self.internal_sendspin_url)
        except Exception:
            self.logger.exception("Failed to connect to internal Sendspin server")
            await wsock.close(code=1011, message=b"Internal server error")
            return wsock

        self.logger.debug("Sendspin proxy authenticated and connected for %s", request.remote)

        try:
            await self._proxy_messages(wsock, internal_ws)
        finally:
            # Nested finally: the client socket must be closed even when closing
            # the internal connection fails.
            try:
                if not internal_ws.closed:
                    await internal_ws.close()
            finally:
                if not wsock.closed:
                    await wsock.close()

        return wsock

    async def _authenticate(self, wsock: web.WebSocketResponse) -> User | None:
        """Wait for and validate authentication message.

        On any validation failure the socket is closed with code 4001 and a reason,
        and None is returned.

        :param wsock: The client WebSocket connection.
        :return: The authenticated user, or None if authentication failed.
        :raises TimeoutError: If no message arrives within 10 seconds.
        """
        async with asyncio.timeout(10):
            msg = await wsock.receive()

        if msg.type != WSMsgType.TEXT:
            await wsock.close(code=4001, message=b"Expected text message for auth")
            return None

        try:
            auth_data = json.loads(msg.data)
        except json.JSONDecodeError:
            await wsock.close(code=4001, message=b"Invalid JSON in auth message")
            return None

        # Valid JSON is not necessarily an object ("[]", "1", "null", ...); reject
        # those explicitly instead of failing with AttributeError on .get() below.
        if not isinstance(auth_data, dict):
            await wsock.close(code=4001, message=b"Auth message must be a JSON object")
            return None

        if auth_data.get("type") != "auth":
            await wsock.close(code=4001, message=b"First message must be auth")
            return None

        token = auth_data.get("token")
        # Only a non-empty string can be a token; never hand other JSON types
        # (numbers, lists, objects) to the token validator.
        if not token or not isinstance(token, str):
            await wsock.close(code=4001, message=b"Token required in auth message")
            return None

        user = await self.webserver.auth.authenticate_with_token(token)
        if not user:
            await wsock.close(code=4001, message=b"Invalid or expired token")
            return None

        # Set the sendspin player_id on the user's websocket client(s)
        # This allows the player controller to auto-whitelist this (web)player
        # without modifying the user's player_filter list
        client_id = auth_data.get("client_id")
        if client_id:
            self.webserver.set_sendspin_player_for_user(user.user_id, client_id)
            self.logger.debug("Registered sendspin player %s for user %s", client_id, user.username)

        self.logger.debug("Sendspin proxy authenticated user: %s", user.username)
        await wsock.send_str('{"type": "auth_ok"}')
        return user

    async def _proxy_messages(
        self,
        client_ws: web.WebSocketResponse,
        internal_ws: aiohttp.ClientWebSocketResponse,
    ) -> None:
        """
        Proxy messages bidirectionally between client and internal Sendspin server.

        Runs one forwarding task per direction and returns as soon as either one
        finishes. Both tasks are always cancelled and awaited before returning,
        including when this coroutine itself is cancelled, so no forwarder is
        left running in the background. Errors raised by a forwarder are logged
        at debug level rather than propagated.

        :param client_ws: The client WebSocket connection.
        :param internal_ws: The internal Sendspin server WebSocket connection.
        """
        forwarders = [
            asyncio.create_task(self._forward_client_to_internal(client_ws, internal_ws)),
            asyncio.create_task(self._forward_internal_to_client(client_ws, internal_ws)),
        ]

        try:
            await asyncio.wait(forwarders, return_when=asyncio.FIRST_COMPLETED)
        finally:
            # asyncio.wait() does not cancel the tasks it waits on, so this must
            # happen on every exit path (normal completion and cancellation).
            for task in forwarders:
                if not task.done():
                    task.cancel()
            # Await every task so that its exception (if any) is retrieved;
            # cancelled tasks show up here as CancelledError instances.
            outcomes = await asyncio.gather(*forwarders, return_exceptions=True)
            for outcome in outcomes:
                if isinstance(outcome, Exception):
                    self.logger.debug("Sendspin proxy forwarding stopped: %r", outcome)

    async def _forward_client_to_internal(
        self,
        client_ws: web.WebSocketResponse,
        internal_ws: aiohttp.ClientWebSocketResponse,
    ) -> None:
        """
        Forward messages from client to internal Sendspin server.

        :param client_ws: The client WebSocket connection.
        :param internal_ws: The internal Sendspin server WebSocket connection.
        """
        await self._pump_messages(client_ws, internal_ws)

    async def _forward_internal_to_client(
        self,
        client_ws: web.WebSocketResponse,
        internal_ws: aiohttp.ClientWebSocketResponse,
    ) -> None:
        """
        Forward messages from internal Sendspin server to client.

        :param client_ws: The client WebSocket connection.
        :param internal_ws: The internal Sendspin server WebSocket connection.
        """
        await self._pump_messages(internal_ws, client_ws)

    @staticmethod
    async def _pump_messages(
        source: web.WebSocketResponse | aiohttp.ClientWebSocketResponse,
        target: web.WebSocketResponse | aiohttp.ClientWebSocketResponse,
    ) -> None:
        """Copy text and binary frames from source to target until source ends.

        :param source: The WebSocket to read messages from.
        :param target: The WebSocket to write messages to.
        """
        async for msg in source:
            if msg.type == WSMsgType.TEXT:
                await target.send_str(msg.data)
            elif msg.type == WSMsgType.BINARY:
                await target.send_bytes(msg.data)
            elif msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSED, WSMsgType.ERROR):
                # Stop forwarding on close/error frames; other frame types are ignored.
                break
221