/
/
/
1"""Sendspin WebSocket proxy handler for Music Assistant.
2
3This module provides an authenticated WebSocket proxy to the internal Sendspin server,
4allowing web clients to connect through the main webserver instead of requiring direct
5access to the Sendspin port.
6"""
7
8from __future__ import annotations
9
10import asyncio
11import contextlib
12import json
13import logging
14from typing import TYPE_CHECKING
15
16from aiohttp import WSMsgType, web
17
18from music_assistant.constants import MASS_LOGGER_NAME
19from music_assistant.controllers.webserver.helpers.auth_middleware import (
20 get_authenticated_user,
21 is_request_from_ingress,
22)
23
24if TYPE_CHECKING:
25 import aiohttp
26 from music_assistant_models.auth import User
27
28 from music_assistant.controllers.webserver import WebserverController
29
# Module-level logger; the dotted name makes it a child of the MASS_LOGGER_NAME logger.
LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.sendspin_proxy")
31
32
class SendspinProxyHandler:
    """Handler for proxying WebSocket connections to the internal Sendspin server."""

    def __init__(self, webserver: WebserverController) -> None:
        """Initialize the Sendspin proxy handler.

        :param webserver: The webserver controller instance.
        """
        self.webserver = webserver
        self.mass = webserver.mass
        self.logger = LOGGER

    @property
    def internal_sendspin_url(self) -> str:
        """Return the internal sendspin URL for connecting to the internal Sendspin server."""
        return f"ws://{self.mass.streams.publish_ip}:8927/sendspin"

    async def handle_sendspin_proxy(self, request: web.Request) -> web.WebSocketResponse:
        """
        Handle incoming WebSocket connection and proxy to internal Sendspin server.

        Requests recognized by ``is_request_from_ingress`` are authenticated through
        ``get_authenticated_user``. Every other client must authenticate with its
        first message:
            {"type": "auth", "token": "<access_token>"}

        After successful authentication, all messages are proxied bidirectionally.
        Authentication failures close the socket with code 4001; a failure to reach
        the internal Sendspin server closes it with code 1011.

        :param request: The incoming HTTP request to upgrade to WebSocket.
        :return: The WebSocket response.
        """
        wsock = web.WebSocketResponse(heartbeat=30)
        await wsock.prepare(request)

        self.logger.debug("Sendspin proxy connection from %s", request.remote)

        # Check for ingress authentication (HA handles auth via headers)
        if is_request_from_ingress(request):
            user = await get_authenticated_user(request)
            if not user:
                self.logger.warning(
                    "Ingress auth failed for sendspin proxy from %s", request.remote
                )
                await wsock.close(code=4001, message=b"Ingress authentication failed")
                return wsock
            self.logger.debug("Sendspin proxy authenticated via ingress: %s", user.username)
        else:
            # Regular auth via first message
            try:
                user = await self._authenticate(wsock)
                if not user:
                    # _authenticate already closed the socket with a reason.
                    return wsock
            except TimeoutError:
                self.logger.warning("Auth timeout for sendspin proxy from %s", request.remote)
                await wsock.close(code=4001, message=b"Authentication timeout")
                return wsock
            except Exception:
                self.logger.exception("Auth error for sendspin proxy")
                await wsock.close(code=4001, message=b"Authentication error")
                return wsock

        # Only open the upstream connection once the client is authenticated.
        try:
            internal_ws = await self.mass.http_session.ws_connect(self.internal_sendspin_url)
        except Exception:
            self.logger.exception("Failed to connect to internal Sendspin server")
            await wsock.close(code=1011, message=b"Internal server error")
            return wsock

        self.logger.debug("Sendspin proxy authenticated and connected for %s", request.remote)

        try:
            await self._proxy_messages(wsock, internal_ws)
        finally:
            # Whichever side ended the session, make sure both sockets are closed.
            if not internal_ws.closed:
                await internal_ws.close()
            if not wsock.closed:
                await wsock.close()

        return wsock

    async def _authenticate(self, wsock: web.WebSocketResponse) -> User | None:
        """Wait for and validate authentication message.

        On every rejection path the socket is closed with code 4001 and a short
        reason before ``None`` is returned. On success an ``auth_ok`` message is
        sent to the client.

        :param wsock: The client WebSocket connection.
        :return: The authenticated user, or None if authentication failed.
        :raises TimeoutError: If no message is received within 10 seconds.
        """
        async with asyncio.timeout(10):
            msg = await wsock.receive()

        if msg.type != WSMsgType.TEXT:
            await wsock.close(code=4001, message=b"Expected text message for auth")
            return None

        try:
            auth_data = json.loads(msg.data)
        except json.JSONDecodeError:
            await wsock.close(code=4001, message=b"Invalid JSON in auth message")
            return None

        # Valid JSON is not necessarily an object (e.g. `[]` or `"x"`); reject it
        # explicitly instead of failing with AttributeError on `.get` below.
        if not isinstance(auth_data, dict):
            await wsock.close(code=4001, message=b"Auth message must be a JSON object")
            return None

        if auth_data.get("type") != "auth":
            await wsock.close(code=4001, message=b"First message must be auth")
            return None

        token = auth_data.get("token")
        if not token:
            await wsock.close(code=4001, message=b"Token required in auth message")
            return None

        user = await self.webserver.auth.authenticate_with_token(token)
        if not user:
            await wsock.close(code=4001, message=b"Invalid or expired token")
            return None

        # Set the sendspin player_id on the user's websocket client(s)
        # This allows the player controller to auto-whitelist this (web)player
        # without modifying the user's player_filter list
        client_id = auth_data.get("client_id")
        if client_id:
            self.webserver.set_sendspin_player_for_user(user.user_id, client_id)
            self.logger.debug("Registered sendspin player %s for user %s", client_id, user.username)

        self.logger.debug("Sendspin proxy authenticated user: %s", user.username)
        await wsock.send_str('{"type": "auth_ok"}')
        return user

    async def _proxy_messages(
        self,
        client_ws: web.WebSocketResponse,
        internal_ws: aiohttp.ClientWebSocketResponse,
    ) -> None:
        """
        Proxy messages bidirectionally between client and internal Sendspin server.

        One forwarding task is started per direction. As soon as either of them
        finishes, the other one is cancelled. If this coroutine itself is cancelled,
        both forwarding tasks are cancelled as well. Errors raised by the forwarding
        tasks are logged at debug level rather than propagated.

        :param client_ws: The client WebSocket connection.
        :param internal_ws: The internal Sendspin server WebSocket connection.
        """
        tasks = [
            asyncio.create_task(self._forward_client_to_internal(client_ws, internal_ws)),
            asyncio.create_task(self._forward_internal_to_client(client_ws, internal_ws)),
        ]

        try:
            await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        finally:
            # asyncio.wait() does not cancel its awaitables when it is itself
            # cancelled, so stop whatever is still running on every exit path.
            for task in tasks:
                if not task.done():
                    task.cancel()
            # Await all tasks so cancellation completes and every exception is
            # retrieved (avoids "Task exception was never retrieved").
            outcomes = await asyncio.gather(*tasks, return_exceptions=True)
            for outcome in outcomes:
                # CancelledError derives from BaseException and is skipped here.
                if isinstance(outcome, Exception):
                    self.logger.debug("Sendspin proxy forwarding stopped: %r", outcome)

    async def _forward_client_to_internal(
        self,
        client_ws: web.WebSocketResponse,
        internal_ws: aiohttp.ClientWebSocketResponse,
    ) -> None:
        """
        Forward messages from client to internal Sendspin server.

        Text and binary messages are relayed unchanged. A close or error message
        ends the loop; other message types are ignored.

        :param client_ws: The client WebSocket connection.
        :param internal_ws: The internal Sendspin server WebSocket connection.
        """
        async for msg in client_ws:
            if msg.type == WSMsgType.TEXT:
                await internal_ws.send_str(msg.data)
            elif msg.type == WSMsgType.BINARY:
                await internal_ws.send_bytes(msg.data)
            elif msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSED, WSMsgType.ERROR):
                break

    async def _forward_internal_to_client(
        self,
        client_ws: web.WebSocketResponse,
        internal_ws: aiohttp.ClientWebSocketResponse,
    ) -> None:
        """
        Forward messages from internal Sendspin server to client.

        Text and binary messages are relayed unchanged. A close or error message
        ends the loop; other message types are ignored.

        :param client_ws: The client WebSocket connection.
        :param internal_ws: The internal Sendspin server WebSocket connection.
        """
        async for msg in internal_ws:
            if msg.type == WSMsgType.TEXT:
                await client_ws.send_str(msg.data)
            elif msg.type == WSMsgType.BINARY:
                await client_ws.send_bytes(msg.data)
            elif msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSED, WSMsgType.ERROR):
                break
221