/
/
/
1"""Squeezelite Player Provider implementation."""
2
3from __future__ import annotations
4
5import logging
6from typing import TYPE_CHECKING, cast
7
8from aiohttp import web
9from aioslimproto.models import EventType as SlimEventType
10from aioslimproto.models import SlimEvent
11from aioslimproto.server import SlimServer
12from music_assistant_models.errors import SetupFailedError
13
14from music_assistant.constants import CONF_PORT, CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
15from music_assistant.helpers.audio import get_player_filter_params
16from music_assistant.helpers.util import is_port_in_use
17from music_assistant.models.player_provider import PlayerProvider
18
19from .constants import CONF_CLI_JSON_PORT, CONF_CLI_TELNET_PORT
20from .player import SqueezelitePlayer
21
22if TYPE_CHECKING:
23 from aioslimproto.client import SlimClient
24
25
26class SqueezelitePlayerProvider(PlayerProvider):
27 """Player provider for players using slimproto (like Squeezelite)."""
28
29 slimproto: SlimServer | None = None
30
31 async def handle_async_init(self) -> None:
32 """Handle async initialization of the provider."""
33 # set-up aioslimproto logging
34 if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
35 logging.getLogger("aioslimproto").setLevel(logging.DEBUG)
36 else:
37 logging.getLogger("aioslimproto").setLevel(self.logger.level + 10)
38
39 # Get all port configurations
40 control_port = cast("int", self.config.get_value(CONF_PORT))
41 telnet_port = cast("int | None", self.config.get_value(CONF_CLI_TELNET_PORT))
42 json_port = cast("int | None", self.config.get_value(CONF_CLI_JSON_PORT))
43
44 # Validate ALL required ports before starting ANY services
45 await self._validate_all_ports(control_port, telnet_port, json_port)
46
47 # Only proceed with server creation after all ports are validated
48 try:
49 self.slimproto = SlimServer(
50 cli_port=telnet_port or None,
51 cli_port_json=json_port or None,
52 ip_address=self.mass.streams.publish_ip,
53 name="Music Assistant",
54 control_port=control_port,
55 )
56 # start slimproto socket server
57 await self.slimproto.start()
58 except Exception as err:
59 # Ensure cleanup on any initialization failure
60 await self._cleanup_server()
61 raise SetupFailedError(f"Failed to start SlimProto server: {err}") from err
62
63 async def _validate_all_ports(
64 self, control_port: int, telnet_port: int | None, json_port: int | None
65 ) -> None:
66 """Validate that all required ports are available before starting any services."""
67 ports_to_check = [(control_port, "SlimProto control")]
68
69 if telnet_port and telnet_port > 0:
70 ports_to_check.append((telnet_port, "Telnet CLI"))
71
72 if json_port and json_port > 0:
73 ports_to_check.append((json_port, "JSON-RPC CLI"))
74
75 # Collect all port conflicts before raising any errors
76 occupied_ports = []
77 for port, port_description in ports_to_check:
78 if await is_port_in_use(port):
79 occupied_ports.append(f"{port_description} port {port}")
80
81 # If any ports are occupied, raise a comprehensive error message
82 if occupied_ports:
83 if len(occupied_ports) == 1:
84 msg = f"{occupied_ports[0]} is not available"
85 else:
86 msg = f"Multiple ports are not available: {', '.join(occupied_ports)}"
87 raise SetupFailedError(msg)
88
89 async def _cleanup_server(self) -> None:
90 """Ensure complete cleanup of the SlimProto server on initialization failure."""
91 if self.slimproto:
92 try:
93 await self.slimproto.stop()
94 except Exception as err:
95 self.logger.warning("Error stopping SlimProto server during cleanup: %s", err)
96 finally:
97 self.slimproto = None
98
99 async def loaded_in_mass(self) -> None:
100 """Call after the provider has been loaded."""
101 await super().loaded_in_mass()
102 assert self.slimproto is not None # for type checker
103 self.slimproto.subscribe(self._handle_slimproto_event)
104 self.mass.streams.register_dynamic_route(
105 "/slimproto/multi", self._serve_multi_client_stream
106 )
107 # it seems that WiiM devices do not use the json rpc port that is broadcasted
108 # in the discovery info but instead they just assume that the jsonrpc endpoint
109 # lives on the same server as stream URL. So we need to provide a jsonrpc.js
110 # endpoint that just redirects to the jsonrpc handler within the slimproto package.
111 self.mass.streams.register_dynamic_route(
112 "/jsonrpc.js", self.slimproto.cli._handle_jsonrpc_client
113 )
114
115 async def unload(self, is_removed: bool = False) -> None:
116 """Handle unload/close of the provider."""
117 # Ensure complete cleanup
118 await self._cleanup_server()
119 self.mass.streams.unregister_dynamic_route("/slimproto/multi")
120 self.mass.streams.unregister_dynamic_route("/jsonrpc.js")
121
122 def get_corrected_elapsed_milliseconds(self, slimplayer: SlimClient) -> int:
123 """Return corrected elapsed milliseconds for a slimplayer."""
124 sync_delay = self.mass.config.get_raw_player_config_value(
125 slimplayer.player_id, CONF_SYNC_ADJUST, 0
126 )
127 return int(slimplayer.elapsed_milliseconds - sync_delay)
128
129 def _handle_slimproto_event(
130 self,
131 event: SlimEvent,
132 ) -> None:
133 """Handle events from SlimProto players."""
134 # Exit early if system is closing or slimproto server is not initialized
135 if self.mass.closing or not self.slimproto:
136 return
137
138 # Handle new player connect (or reconnect of existing player)
139 if event.type == SlimEventType.PLAYER_CONNECTED:
140 slimclient = self.slimproto.get_player(event.player_id)
141 if not slimclient:
142 return # should not happen, but guard anyways
143 player = SqueezelitePlayer(self, event.player_id, slimclient)
144 self.mass.create_task(player.setup())
145 return
146
147 if not (mass_player := self.mass.players.get_player(event.player_id)):
148 return # guard for unknown player
149 player = cast("SqueezelitePlayer", mass_player)
150
151 # Handle player disconnect
152 if event.type == SlimEventType.PLAYER_DISCONNECTED:
153 self.mass.create_task(self.mass.players.unregister(player.player_id))
154 return
155
156 # forward all other events to the player itself
157 player.handle_slim_event(event)
158
159 async def _serve_multi_client_stream(self, request: web.Request) -> web.StreamResponse:
160 """Serve the multi-client flow stream audio to a player."""
161 player_id = request.query.get("player_id")
162 fmt = request.query.get("fmt")
163 child_player_id = request.query.get("child_player_id")
164
165 if not player_id:
166 raise web.HTTPNotFound(reason="Missing player_id parameter")
167 if not fmt:
168 raise web.HTTPNotFound(reason="Missing fmt parameter")
169 if not child_player_id:
170 raise web.HTTPNotFound(reason="Missing child_player_id parameter")
171
172 if not (sync_parent := self.mass.players.get_player(player_id)):
173 raise web.HTTPNotFound(reason=f"Unknown player: {player_id}")
174 sync_parent = cast("SqueezelitePlayer", sync_parent)
175
176 if not (child_player := self.mass.players.get_player(child_player_id)):
177 raise web.HTTPNotFound(reason=f"Unknown player: {child_player_id}")
178
179 if not (stream := sync_parent.multi_client_stream) or stream.done:
180 raise web.HTTPNotFound(reason=f"There is no active stream for {player_id}!")
181
182 resp = web.StreamResponse(
183 status=200,
184 reason="OK",
185 headers={
186 "Content-Type": f"audio/{fmt}",
187 },
188 )
189 await resp.prepare(request)
190
191 # return early if this is not a GET request
192 if request.method != "GET":
193 return resp
194
195 # all checks passed, start streaming!
196 self.logger.debug(
197 "Start serving multi-client flow audio stream to %s",
198 child_player.display_name,
199 )
200
201 output_format = await self.mass.streams.get_output_format(
202 output_format_str=fmt,
203 player=child_player,
204 content_sample_rate=stream.audio_format.sample_rate, # Flow PCM sample rate
205 content_bit_depth=stream.audio_format.bit_depth, # Flow PCM bit depth (32)
206 )
207
208 async for chunk in stream.get_stream(
209 output_format=output_format,
210 filter_params=get_player_filter_params(
211 self.mass, child_player_id, stream.audio_format, output_format
212 )
213 if child_player_id
214 else None,
215 ):
216 try:
217 await resp.write(chunk)
218 except (BrokenPipeError, ConnectionResetError, ConnectionError):
219 # race condition
220 break
221 return resp
222