/
/
/
1#!/usr/bin/env python3
2"""
3Control Music Assistant Snapcast plugin.
4
5This script is a bridge between Music Assistant and Snapcast.
6It connects to Music Assistant via a Unix socket and sends metadata to Snapcast
7and listens for player commands.
8"""
9
10import json
11import logging
12import socket
13import sys
14import threading
15import urllib.parse
16from collections.abc import Callable
17from contextlib import suppress
18from time import sleep
19from typing import Any
20
21import shortuuid
22
23LOOP_STATUS_MAP = {
24 "all": "playlist",
25 "one": "track",
26 "off": "none",
27}
28LOOP_STATUS_MAP_REVERSE = {v: k for k, v in LOOP_STATUS_MAP.items()}
29
30MessageCallback = Callable[[dict[str, Any]], None]
31
32
33def send(json_msg: dict[str, Any]) -> None:
34 """Send a message to stdout."""
35 sys.stdout.write(json.dumps(json_msg))
36 sys.stdout.write("\n")
37 sys.stdout.flush()
38
39
40class MusicAssistantControl:
41 """Music Assistant Unix socket remote control Snapcast plugin."""
42
43 def __init__(
44 self,
45 queue_id: str,
46 socket_path: str,
47 streamserver_ip: str,
48 streamserver_port: int,
49 ) -> None:
50 """Initialize."""
51 self.queue_id = queue_id
52 self.socket_path = socket_path
53 self.streamserver_ip = streamserver_ip
54 self.streamserver_port = streamserver_port
55 self._metadata: dict[str, Any] = {}
56 self._properties: dict[str, Any] = {}
57 self._request_callbacks: dict[str, MessageCallback] = {}
58 self._seek_offset = 0.0
59 self._socket: socket.socket | None = None
60 self._stopped = False
61 self._shutdown_event = threading.Event()
62 self._socket_thread = threading.Thread(target=self._socket_loop, args=())
63 self._socket_thread.name = "massControl"
64 self._socket_thread.start()
65
66 def stop(self) -> None:
67 """Stop the socket thread."""
68 self._stopped = True
69 if self._socket:
70 with suppress(OSError):
71 self._socket.close()
72 if threading.current_thread() is not self._socket_thread:
73 self._socket_thread.join()
74
75 def shutdown(self) -> None:
76 """Exit the control script."""
77 logger.info("Shutdown requested by server")
78 self.stop()
79 self._shutdown_event.set()
80
81 def handle_snapcast_request(self, request: dict[str, Any]) -> None:
82 """Handle (JSON RPC) message from Snapcast."""
83 id: str = request["id"] # noqa: A001
84 interface, cmd = request["method"].rsplit(".", 1)
85
86 queue_id = self.queue_id
87
88 # deny invalid commands
89 if interface != "Plugin.Stream.Player" or cmd not in (
90 "Control",
91 "SetProperty",
92 "GetProperties",
93 ):
94 send(
95 {
96 "jsonrpc": "2.0",
97 "error": {"code": -32601, "message": "Method not found"},
98 "id": id,
99 }
100 )
101
102 if cmd == "Control":
103 command = request["params"]["command"]
104 params = request["params"].get("params", {})
105 logger.debug(f"Control command: {command}, params: {params}")
106 if command == "next":
107 self.send_request("player_queues/next", queue_id=queue_id)
108 elif command == "previous":
109 self.send_request("player_queues/previous", queue_id=queue_id)
110 elif command == "play":
111 self.send_request("player_queues/play", queue_id=queue_id)
112 elif command == "pause":
113 self.send_request("player_queues/pause", queue_id=queue_id)
114 elif command == "playPause":
115 self.send_request("player_queues/play_pause", queue_id=queue_id)
116 elif command == "stop":
117 self.send_request("player_queues/stop", queue_id=queue_id)
118 elif command == "setPosition":
119 position = float(params["position"])
120 self.send_request("player_queues/seek", queue_id=queue_id, position=position)
121 elif command == "seek":
122 seek_offset = float(params["offset"])
123 self.send_request("player_queues/skip", queue_id=queue_id, seconds=seek_offset)
124 elif cmd == "SetProperty":
125 properties = request["params"]
126 logger.debug(f"SetProperty: {properties}")
127 if "shuffle" in properties:
128 self.send_request(
129 "player_queues/shuffle",
130 queue_id=queue_id,
131 shuffle_enabled=properties["shuffle"],
132 )
133 if "loopStatus" in properties:
134 value = properties["loopStatus"]
135 self.send_request(
136 "player_queues/repeat",
137 queue_id=queue_id,
138 repeat_mode=LOOP_STATUS_MAP_REVERSE[value],
139 )
140 # if "volume" in properties:
141 # self.send_request("core.mixer.set_volume", {"volume": int(properties["volume"])})
142 # if "mute" in properties:
143 # self.send_request("core.mixer.set_mute", {"mute": properties["mute"]})
144 elif cmd == "GetProperties":
145
146 def handle_result(result: dict[str, Any]) -> None:
147 send(
148 {
149 "jsonrpc": "2.0",
150 "result": self._create_properties(result),
151 "id": id,
152 }
153 )
154
155 self.send_request("player_queues/get", callback=handle_result, queue_id=queue_id)
156 return
157
158 # always acknowledge the request
159 send({"jsonrpc": "2.0", "result": "ok", "id": id})
160
161 def send_snapcast_log_notification(self, message: str, severity: str = "Info") -> None:
162 """Send log message to Snapcast."""
163 send(
164 {
165 "jsonrpc": "2.0",
166 "method": "Plugin.Stream.Log",
167 "params": {"severity": severity, "message": message},
168 }
169 )
170
171 def send_snapcast_properties_notification(self, properties: dict[str, Any]) -> None:
172 """Send properties to Snapcast."""
173 send(
174 {
175 "jsonrpc": "2.0",
176 "method": "Plugin.Stream.Player.Properties",
177 "params": properties,
178 }
179 )
180
181 def send_snapcast_stream_ready_notification(self) -> None:
182 """Send stream ready notification to Snapcast."""
183 send({"jsonrpc": "2.0", "method": "Plugin.Stream.Ready"})
184
185 def _socket_loop(self) -> None:
186 logger.info("Started socket loop")
187 while not self._stopped:
188 try:
189 self._connect_and_read()
190 except (Exception, KeyboardInterrupt) as e:
191 logger.info(f"Exception in socket loop: {e!s}")
192 if not self._stopped:
193 sleep(2)
194
195 def _connect_and_read(self) -> None:
196 """Connect to the Unix socket and read messages."""
197 logger.info("Connecting to Unix socket: %s", self.socket_path)
198 self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
199 try:
200 self._socket.connect(self.socket_path)
201 logger.info("Connected to Unix socket")
202 self.send_snapcast_stream_ready_notification()
203
204 # Read messages from socket
205 buffer = ""
206 while not self._stopped:
207 try:
208 data = self._socket.recv(4096)
209 if not data:
210 logger.info("Socket closed by server")
211 break
212 buffer += data.decode()
213
214 # Process complete lines
215 while "\n" in buffer:
216 line, buffer = buffer.split("\n", 1)
217 if line.strip():
218 self._handle_socket_message(line)
219 except TimeoutError:
220 continue
221 except OSError as e:
222 logger.error(f"Socket error: {e}")
223 break
224 finally:
225 if self._socket:
226 self._socket.close()
227 self._socket = None
228
229 def _handle_socket_message(self, message: str) -> None:
230 """Handle a message from the Music Assistant socket."""
231 logger.debug("Socket message received: %s", message)
232 try:
233 data = json.loads(message)
234 except json.JSONDecodeError as e:
235 logger.error(f"Invalid JSON: {e}")
236 return
237
238 if data.get("command") == "shutdown":
239 self.shutdown()
240 return
241
242 # Request response
243 if "message_id" in data:
244 message_id = data["message_id"]
245 if callback := self._request_callbacks.pop(message_id, None):
246 if result := data.get("result"):
247 callback(result)
248 # TODO: handle failed requests
249 return
250
251 # Event
252 if "event" in data and data.get("object_id") == self.queue_id:
253 event = data["event"]
254 if event == "queue_updated":
255 properties = self._create_properties(data["data"])
256 self.send_snapcast_properties_notification(properties)
257 return
258
259 def _create_properties(self, mass_queue_details: dict[str, Any]) -> dict[str, Any]:
260 """Create snapcast properties from Music Assistant queue details."""
261 current_queue_item: dict[str, Any] | None = mass_queue_details.get("current_item")
262 next_queue_item: dict[str, Any] | None = mass_queue_details.get("next_item")
263 properties: dict[str, Any] = {
264 "canGoNext": next_queue_item is not None,
265 "canGoPrevious": mass_queue_details["current_index"] > 0,
266 "canPlay": current_queue_item is not None,
267 "canPause": current_queue_item is not None,
268 "canSeek": current_queue_item and current_queue_item.get("duration") is not None,
269 "canControl": True,
270 "playbackStatus": mass_queue_details["state"],
271 "loopStatus": LOOP_STATUS_MAP[mass_queue_details["repeat_mode"]],
272 "shuffle": mass_queue_details["shuffle_enabled"],
273 "volume": 0,
274 "mute": False,
275 "rate": 1.0,
276 "position": mass_queue_details["elapsed_time"],
277 }
278 image_url: str | None = None
279 if current_queue_item and (media_item := current_queue_item.get("media_item")):
280 if image_path := current_queue_item.get("image", {}).get("path"):
281 image_path_encoded = urllib.parse.quote_plus(image_path)
282 image_url = (
283 # we prefer the streamserver for the imageproxy because it is enabled by default
284 # where the api server is by default protected
285 f"http://{self.streamserver_ip}:{self.streamserver_port}/imageproxy?path={image_path_encoded}"
286 f"&provider={current_queue_item['image']['provider']}"
287 "&size=512"
288 )
289 properties["metadata"] = {
290 "trackId": media_item["uri"],
291 "duration": media_item["duration"],
292 "title": media_item["name"],
293 "artUrl": image_url,
294 }
295 if "artists" in media_item:
296 properties["metadata"]["artist"] = [x["name"] for x in media_item["artists"]]
297 properties["metadata"]["artistSort"] = [
298 x["sort_name"] for x in media_item["artists"]
299 ]
300 if media_item.get("album"):
301 properties["metadata"]["album"] = media_item["album"]["name"]
302 properties["metadata"]["albumSort"] = media_item["album"]["sort_name"]
303 elif current_queue_item:
304 properties["metadata"] = {
305 "title": current_queue_item["name"],
306 "trackId": current_queue_item["queue_item_id"],
307 "artUrl": image_url,
308 }
309
310 return properties
311
312 def send_request(
313 self, command: str, callback: MessageCallback | None = None, **args: str | float | bool
314 ) -> None:
315 """Send request to Music Assistant via Unix socket."""
316 if not self._socket:
317 logger.warning("Cannot send request - socket not connected")
318 return
319
320 msg_id = shortuuid.random(10)
321 command_msg = {
322 "message_id": msg_id,
323 "command": command,
324 "args": args,
325 }
326 logger.debug("send_request: %s", command_msg)
327 if callback:
328 self._request_callbacks[msg_id] = callback
329 try:
330 data = json.dumps(command_msg) + "\n"
331 self._socket.sendall(data.encode())
332 except OSError as e:
333 logger.error(f"Failed to send request: {e}")
334 self._request_callbacks.pop(msg_id, None)
335
336
337if __name__ == "__main__":
338 # Parse command line
339 queue_id = None
340 socket_path: str | None = None
341 streamserver_ip: str | None = None
342 streamserver_port: str | None = None
343 stream_id: str | None = None
344 for arg in sys.argv:
345 if arg.startswith("--stream="):
346 stream_id = arg.split("=")[1]
347 if arg.startswith("--queueid="):
348 queue_id = arg.split("=")[1]
349 if arg.startswith("--socket="):
350 socket_path = arg.split("=")[1]
351 if arg.startswith("--streamserver-ip="):
352 streamserver_ip = arg.split("=")[1]
353 if arg.startswith("--streamserver-port="):
354 streamserver_port = arg.split("=")[1]
355
356 if not queue_id or not socket_path:
357 print("Usage: --stream=<stream_id> --socket=<socket_path>") # noqa: T201
358 sys.exit()
359
360 log_format_stderr = "%(asctime)s %(module)s %(levelname)s: %(message)s"
361 log_level = logging.INFO
362 logger = logging.getLogger("meta_mass")
363 logger.propagate = False
364 logger.setLevel(log_level)
365
366 # Log to stderr
367 log_handler = logging.StreamHandler()
368 log_handler.setFormatter(logging.Formatter(log_format_stderr))
369 logger.addHandler(log_handler)
370
371 logger.debug(
372 "Initializing for stream_id %s, queue_id %s and socket %s", stream_id, queue_id, socket_path
373 )
374
375 assert streamserver_ip is not None # for type checking
376 assert streamserver_port is not None
377 ctrl = MusicAssistantControl(queue_id, socket_path, streamserver_ip, int(streamserver_port))
378
379 # keep listening for messages on stdin and forward them
380 try:
381 while not ctrl._shutdown_event.is_set():
382 line = sys.stdin.readline()
383 if not line: # EOF
384 break
385 try:
386 ctrl.handle_snapcast_request(json.loads(line))
387 except Exception as e:
388 send(
389 {
390 "jsonrpc": "2.0",
391 "error": {"code": -32700, "message": "Parse error", "data": str(e)},
392 "id": id,
393 }
394 )
395 finally:
396 ctrl.stop()
397 sys.exit(0)
398