music-assistant-server

9 KBPY
provider.py
9 KB222 lines • python
1"""Squeezelite Player Provider implementation."""
2
3from __future__ import annotations
4
5import logging
6from typing import TYPE_CHECKING, cast
7
8from aiohttp import web
9from aioslimproto.models import EventType as SlimEventType
10from aioslimproto.models import SlimEvent
11from aioslimproto.server import SlimServer
12from music_assistant_models.errors import SetupFailedError
13
14from music_assistant.constants import CONF_PORT, CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
15from music_assistant.helpers.audio import get_player_filter_params
16from music_assistant.helpers.util import is_port_in_use
17from music_assistant.models.player_provider import PlayerProvider
18
19from .constants import CONF_CLI_JSON_PORT, CONF_CLI_TELNET_PORT
20from .player import SqueezelitePlayer
21
22if TYPE_CHECKING:
23    from aioslimproto.client import SlimClient
24
25
class SqueezelitePlayerProvider(PlayerProvider):
    """Player provider for players using slimproto (like Squeezelite).

    The provider owns a single aioslimproto ``SlimServer``. The server is created
    and started in ``handle_async_init``; event subscription and the dynamic HTTP
    routes are set up in ``loaded_in_mass``; everything is torn down in ``unload``.
    """

    # The SlimProto server instance. None until handle_async_init has created it,
    # and reset to None again by _cleanup_server.
    slimproto: SlimServer | None = None

    async def handle_async_init(self) -> None:
        """Handle async initialization of the provider.

        Raises:
            SetupFailedError: If one of the configured ports is already in use, or
                if creating/starting the SlimProto server fails for any reason.
        """
        # set-up aioslimproto logging: DEBUG when this provider logs verbosely,
        # otherwise 10 above the provider logger's own level.
        slimproto_logger = logging.getLogger("aioslimproto")
        if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
            slimproto_logger.setLevel(logging.DEBUG)
        else:
            slimproto_logger.setLevel(self.logger.level + 10)

        # Get all port configurations
        control_port = cast("int", self.config.get_value(CONF_PORT))
        telnet_port = cast("int | None", self.config.get_value(CONF_CLI_TELNET_PORT))
        json_port = cast("int | None", self.config.get_value(CONF_CLI_JSON_PORT))

        # Validate ALL required ports before starting ANY services
        await self._validate_all_ports(control_port, telnet_port, json_port)

        # Only proceed with server creation after all ports are validated
        try:
            self.slimproto = SlimServer(
                # a falsy port value (None or 0) is passed on as None
                cli_port=telnet_port or None,
                cli_port_json=json_port or None,
                ip_address=self.mass.streams.publish_ip,
                name="Music Assistant",
                control_port=control_port,
            )
            # start slimproto socket server
            await self.slimproto.start()
        except Exception as err:
            # Ensure cleanup on any initialization failure
            await self._cleanup_server()
            raise SetupFailedError(f"Failed to start SlimProto server: {err}") from err

    async def _validate_all_ports(
        self, control_port: int, telnet_port: int | None, json_port: int | None
    ) -> None:
        """Validate that all required ports are available before starting any services.

        The control port is always checked; the Telnet and JSON-RPC CLI ports are
        only checked when they are set to a positive value.

        Raises:
            SetupFailedError: If at least one checked port is in use. The message
                lists every occupied port, not just the first one found.
        """
        ports_to_check: list[tuple[int, str]] = [(control_port, "SlimProto control")]

        if telnet_port and telnet_port > 0:
            ports_to_check.append((telnet_port, "Telnet CLI"))

        if json_port and json_port > 0:
            ports_to_check.append((json_port, "JSON-RPC CLI"))

        # Collect all port conflicts before raising any errors
        # (ports are probed one at a time, in the order listed above)
        occupied_ports = [
            f"{port_description} port {port}"
            for port, port_description in ports_to_check
            if await is_port_in_use(port)
        ]
        if not occupied_ports:
            return

        # If any ports are occupied, raise a comprehensive error message
        if len(occupied_ports) == 1:
            msg = f"{occupied_ports[0]} is not available"
        else:
            msg = f"Multiple ports are not available: {', '.join(occupied_ports)}"
        raise SetupFailedError(msg)

    async def _cleanup_server(self) -> None:
        """Stop the SlimProto server (if any) and drop the reference to it.

        Used both on initialization failure and on unload. Errors raised while
        stopping are logged as warnings and not propagated; ``self.slimproto`` is
        reset to None regardless of whether stopping succeeded.
        """
        if self.slimproto:
            try:
                await self.slimproto.stop()
            except Exception as err:
                self.logger.warning("Error stopping SlimProto server during cleanup: %s", err)
            finally:
                self.slimproto = None

    async def loaded_in_mass(self) -> None:
        """Call after the provider has been loaded.

        Subscribes to SlimProto server events and registers the ``/slimproto/multi``
        and ``/jsonrpc.js`` dynamic routes on the streams server.
        """
        await super().loaded_in_mass()
        assert self.slimproto is not None  # for type checker
        self.slimproto.subscribe(self._handle_slimproto_event)
        self.mass.streams.register_dynamic_route(
            "/slimproto/multi", self._serve_multi_client_stream
        )
        # it seems that WiiM devices do not use the json rpc port that is broadcasted
        # in the discovery info but instead they just assume that the jsonrpc endpoint
        # lives on the same server as stream URL. So we need to provide a jsonrpc.js
        # endpoint that just redirects to the jsonrpc handler within the slimproto package.
        self.mass.streams.register_dynamic_route(
            "/jsonrpc.js", self.slimproto.cli._handle_jsonrpc_client
        )

    async def unload(self, is_removed: bool = False) -> None:
        """Handle unload/close of the provider.

        Stops the SlimProto server and unregisters the two dynamic routes that
        ``loaded_in_mass`` registered. ``is_removed`` is not used by this
        implementation.
        """
        # Ensure complete cleanup
        await self._cleanup_server()
        self.mass.streams.unregister_dynamic_route("/slimproto/multi")
        self.mass.streams.unregister_dynamic_route("/jsonrpc.js")

    def get_corrected_elapsed_milliseconds(self, slimplayer: SlimClient) -> int:
        """Return corrected elapsed milliseconds for a slimplayer.

        The correction subtracts the player's raw ``CONF_SYNC_ADJUST`` config value
        (0 when not configured) from the elapsed milliseconds the client reports.
        """
        sync_delay = self.mass.config.get_raw_player_config_value(
            slimplayer.player_id, CONF_SYNC_ADJUST, 0
        )
        return int(slimplayer.elapsed_milliseconds - sync_delay)

    def _handle_slimproto_event(
        self,
        event: SlimEvent,
    ) -> None:
        """Handle events from SlimProto players.

        - ``PLAYER_CONNECTED``: create a ``SqueezelitePlayer`` and schedule its setup.
        - ``PLAYER_DISCONNECTED``: schedule unregistering of the known player.
        - anything else: forward the event to the known player.

        Events for players that cannot be resolved are silently ignored.
        """
        # Exit early if system is closing or slimproto server is not initialized
        if self.mass.closing or not self.slimproto:
            return

        # Handle new player connect (or reconnect of existing player)
        if event.type == SlimEventType.PLAYER_CONNECTED:
            slimclient = self.slimproto.get_player(event.player_id)
            if not slimclient:
                return  # should not happen, but guard anyways
            player = SqueezelitePlayer(self, event.player_id, slimclient)
            self.mass.create_task(player.setup())
            return

        if not (mass_player := self.mass.players.get_player(event.player_id)):
            return  # guard for unknown player
        player = cast("SqueezelitePlayer", mass_player)

        # Handle player disconnect
        if event.type == SlimEventType.PLAYER_DISCONNECTED:
            self.mass.create_task(self.mass.players.unregister(player.player_id))
            return

        # forward all other events to the player itself
        player.handle_slim_event(event)

    async def _serve_multi_client_stream(self, request: web.Request) -> web.StreamResponse:
        """Serve the multi-client flow stream audio to a player.

        Expects the query parameters ``player_id`` (the sync parent that owns the
        stream), ``child_player_id`` (the player the audio is served to) and
        ``fmt`` (the output format, also used for the ``audio/<fmt>`` content type).

        Raises:
            web.HTTPNotFound: If a query parameter is missing, a player is unknown,
                or the sync parent has no active (not yet done) multi-client stream.
        """
        player_id = request.query.get("player_id")
        fmt = request.query.get("fmt")
        child_player_id = request.query.get("child_player_id")

        if not player_id:
            raise web.HTTPNotFound(reason="Missing player_id parameter")
        if not fmt:
            raise web.HTTPNotFound(reason="Missing fmt parameter")
        if not child_player_id:
            raise web.HTTPNotFound(reason="Missing child_player_id parameter")

        if not (sync_parent := self.mass.players.get_player(player_id)):
            raise web.HTTPNotFound(reason=f"Unknown player: {player_id}")
        sync_parent = cast("SqueezelitePlayer", sync_parent)

        if not (child_player := self.mass.players.get_player(child_player_id)):
            raise web.HTTPNotFound(reason=f"Unknown player: {child_player_id}")

        if not (stream := sync_parent.multi_client_stream) or stream.done:
            raise web.HTTPNotFound(reason=f"There is no active stream for {player_id}!")

        resp = web.StreamResponse(
            status=200,
            reason="OK",
            headers={
                "Content-Type": f"audio/{fmt}",
            },
        )
        await resp.prepare(request)

        # return early if this is not a GET request
        if request.method != "GET":
            return resp

        # all checks passed, start streaming!
        self.logger.debug(
            "Start serving multi-client flow audio stream to %s",
            child_player.display_name,
        )

        output_format = await self.mass.streams.get_output_format(
            output_format_str=fmt,
            player=child_player,
            content_sample_rate=stream.audio_format.sample_rate,  # Flow PCM sample rate
            content_bit_depth=stream.audio_format.bit_depth,  # Flow PCM bit depth (32)
        )
        # child_player_id is guaranteed to be non-empty here (validated above),
        # so the filter params are always computed.
        filter_params = get_player_filter_params(
            self.mass, child_player_id, stream.audio_format, output_format
        )

        async for chunk in stream.get_stream(
            output_format=output_format,
            filter_params=filter_params,
        ):
            try:
                await resp.write(chunk)
            except ConnectionError:
                # race condition: the client went away while we were writing.
                # BrokenPipeError and ConnectionResetError are ConnectionError
                # subclasses, so this single clause covers them as well.
                break
        return resp
222