/
/
/
1#!/usr/bin/env python3
2"""
3Control Music Assistant Snapcast plugin.
4
5This script is a bridge between Music Assistant and Snapcast.
6It connects to Music Assistant via a Unix socket and sends metadata to Snapcast
7and listens for player commands.
8"""
9
10import json
11import logging
12import socket
13import sys
14import threading
15import urllib.parse
16from collections.abc import Callable
17from time import sleep
18from typing import Any
19
20import shortuuid
21
22LOOP_STATUS_MAP = {
23 "all": "playlist",
24 "one": "track",
25 "off": "none",
26}
27LOOP_STATUS_MAP_REVERSE = {v: k for k, v in LOOP_STATUS_MAP.items()}
28
29MessageCallback = Callable[[dict[str, Any]], None]
30
31
32def send(json_msg: dict[str, Any]) -> None:
33 """Send a message to stdout."""
34 sys.stdout.write(json.dumps(json_msg))
35 sys.stdout.write("\n")
36 sys.stdout.flush()
37
38
39class MusicAssistantControl:
40 """Music Assistant Unix socket remote control Snapcast plugin."""
41
42 def __init__(
43 self,
44 queue_id: str,
45 socket_path: str,
46 streamserver_ip: str,
47 streamserver_port: int,
48 ) -> None:
49 """Initialize."""
50 self.queue_id = queue_id
51 self.socket_path = socket_path
52 self.streamserver_ip = streamserver_ip
53 self.streamserver_port = streamserver_port
54 self._metadata: dict[str, Any] = {}
55 self._properties: dict[str, Any] = {}
56 self._request_callbacks: dict[str, MessageCallback] = {}
57 self._seek_offset = 0.0
58 self._socket: socket.socket | None = None
59 self._stopped = False
60 self._socket_thread = threading.Thread(target=self._socket_loop, args=())
61 self._socket_thread.name = "massControl"
62 self._socket_thread.start()
63
64 def stop(self) -> None:
65 """Stop the socket thread."""
66 self._stopped = True
67 if self._socket:
68 self._socket.close()
69 self._socket_thread.join()
70
71 def handle_snapcast_request(self, request: dict[str, Any]) -> None:
72 """Handle (JSON RPC) message from Snapcast."""
73 id: str = request["id"] # noqa: A001
74 interface, cmd = request["method"].rsplit(".", 1)
75
76 queue_id = self.queue_id
77
78 # deny invalid commands
79 if interface != "Plugin.Stream.Player" or cmd not in (
80 "Control",
81 "SetProperty",
82 "GetProperties",
83 ):
84 send(
85 {
86 "jsonrpc": "2.0",
87 "error": {"code": -32601, "message": "Method not found"},
88 "id": id,
89 }
90 )
91
92 if cmd == "Control":
93 command = request["params"]["command"]
94 params = request["params"].get("params", {})
95 logger.debug(f"Control command: {command}, params: {params}")
96 if command == "next":
97 self.send_request("player_queues/next", queue_id=queue_id)
98 elif command == "previous":
99 self.send_request("player_queues/previous", queue_id=queue_id)
100 elif command == "play":
101 self.send_request("player_queues/play", queue_id=queue_id)
102 elif command == "pause":
103 self.send_request("player_queues/pause", queue_id=queue_id)
104 elif command == "playPause":
105 self.send_request("player_queues/play_pause", queue_id=queue_id)
106 elif command == "stop":
107 self.send_request("player_queues/stop", queue_id=queue_id)
108 elif command == "setPosition":
109 position = float(params["position"])
110 self.send_request("player_queues/seek", queue_id=queue_id, position=position)
111 elif command == "seek":
112 seek_offset = float(params["offset"])
113 self.send_request("player_queues/skip", queue_id=queue_id, seconds=seek_offset)
114 elif cmd == "SetProperty":
115 properties = request["params"]
116 logger.debug(f"SetProperty: {properties}")
117 if "shuffle" in properties:
118 self.send_request(
119 "player_queues/shuffle",
120 queue_id=queue_id,
121 shuffle_enabled=properties["shuffle"],
122 )
123 if "loopStatus" in properties:
124 value = properties["loopStatus"]
125 self.send_request(
126 "player_queues/repeat",
127 queue_id=queue_id,
128 repeat_mode=LOOP_STATUS_MAP_REVERSE[value],
129 )
130 # if "volume" in properties:
131 # self.send_request("core.mixer.set_volume", {"volume": int(properties["volume"])})
132 # if "mute" in properties:
133 # self.send_request("core.mixer.set_mute", {"mute": properties["mute"]})
134 elif cmd == "GetProperties":
135
136 def handle_result(result: dict[str, Any]) -> None:
137 send(
138 {
139 "jsonrpc": "2.0",
140 "result": self._create_properties(result),
141 "id": id,
142 }
143 )
144
145 self.send_request("player_queues/get", callback=handle_result, queue_id=queue_id)
146 return
147
148 # always acknowledge the request
149 send({"jsonrpc": "2.0", "result": "ok", "id": id})
150
151 def send_snapcast_log_notification(self, message: str, severity: str = "Info") -> None:
152 """Send log message to Snapcast."""
153 send(
154 {
155 "jsonrpc": "2.0",
156 "method": "Plugin.Stream.Log",
157 "params": {"severity": severity, "message": message},
158 }
159 )
160
161 def send_snapcast_properties_notification(self, properties: dict[str, Any]) -> None:
162 """Send properties to Snapcast."""
163 send(
164 {
165 "jsonrpc": "2.0",
166 "method": "Plugin.Stream.Player.Properties",
167 "params": properties,
168 }
169 )
170
171 def send_snapcast_stream_ready_notification(self) -> None:
172 """Send stream ready notification to Snapcast."""
173 send({"jsonrpc": "2.0", "method": "Plugin.Stream.Ready"})
174
175 def _socket_loop(self) -> None:
176 logger.info("Started socket loop")
177 while not self._stopped:
178 try:
179 self._connect_and_read()
180 except (Exception, KeyboardInterrupt) as e:
181 logger.info(f"Exception in socket loop: {e!s}")
182 if not self._stopped:
183 sleep(2)
184
185 def _connect_and_read(self) -> None:
186 """Connect to the Unix socket and read messages."""
187 logger.info("Connecting to Unix socket: %s", self.socket_path)
188 self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
189 try:
190 self._socket.connect(self.socket_path)
191 logger.info("Connected to Unix socket")
192 self.send_snapcast_stream_ready_notification()
193
194 # Read messages from socket
195 buffer = ""
196 while not self._stopped:
197 try:
198 data = self._socket.recv(4096)
199 if not data:
200 logger.info("Socket closed by server")
201 break
202 buffer += data.decode()
203
204 # Process complete lines
205 while "\n" in buffer:
206 line, buffer = buffer.split("\n", 1)
207 if line.strip():
208 self._handle_socket_message(line)
209 except TimeoutError:
210 continue
211 except OSError as e:
212 logger.error(f"Socket error: {e}")
213 break
214 finally:
215 if self._socket:
216 self._socket.close()
217 self._socket = None
218
219 def _handle_socket_message(self, message: str) -> None:
220 """Handle a message from the Music Assistant socket."""
221 logger.debug("Socket message received: %s", message)
222 try:
223 data = json.loads(message)
224 except json.JSONDecodeError as e:
225 logger.error(f"Invalid JSON: {e}")
226 return
227
228 # Request response
229 if "message_id" in data:
230 message_id = data["message_id"]
231 if callback := self._request_callbacks.pop(message_id, None):
232 if result := data.get("result"):
233 callback(result)
234 # TODO: handle failed requests
235 return
236
237 # Event
238 if "event" in data and data.get("object_id") == self.queue_id:
239 event = data["event"]
240 if event == "queue_updated":
241 properties = self._create_properties(data["data"])
242 self.send_snapcast_properties_notification(properties)
243 return
244
245 def _create_properties(self, mass_queue_details: dict[str, Any]) -> dict[str, Any]:
246 """Create snapcast properties from Music Assistant queue details."""
247 current_queue_item: dict[str, Any] | None = mass_queue_details.get("current_item")
248 next_queue_item: dict[str, Any] | None = mass_queue_details.get("next_item")
249 properties: dict[str, Any] = {
250 "canGoNext": next_queue_item is not None,
251 "canGoPrevious": mass_queue_details["current_index"] > 0,
252 "canPlay": current_queue_item is not None,
253 "canPause": current_queue_item is not None,
254 "canSeek": current_queue_item and current_queue_item.get("duration") is not None,
255 "canControl": True,
256 "playbackStatus": mass_queue_details["state"],
257 "loopStatus": LOOP_STATUS_MAP[mass_queue_details["repeat_mode"]],
258 "shuffle": mass_queue_details["shuffle_enabled"],
259 "volume": 0,
260 "mute": False,
261 "rate": 1.0,
262 "position": mass_queue_details["elapsed_time"],
263 }
264 image_url: str | None = None
265 if current_queue_item and (media_item := current_queue_item.get("media_item")):
266 if image_path := current_queue_item.get("image", {}).get("path"):
267 image_path_encoded = urllib.parse.quote_plus(image_path)
268 image_url = (
269 # we prefer the streamserver for the imageproxy because it is enabled by default
270 # where the api server is by default protected
271 f"http://{self.streamserver_ip}:{self.streamserver_port}/imageproxy?path={image_path_encoded}"
272 f"&provider={current_queue_item['image']['provider']}"
273 "&size=512"
274 )
275 properties["metadata"] = {
276 "trackId": media_item["uri"],
277 "duration": media_item["duration"],
278 "title": media_item["name"],
279 "artUrl": image_url,
280 }
281 if "artists" in media_item:
282 properties["metadata"]["artist"] = [x["name"] for x in media_item["artists"]]
283 properties["metadata"]["artistSort"] = [
284 x["sort_name"] for x in media_item["artists"]
285 ]
286 if media_item.get("album"):
287 properties["metadata"]["album"] = media_item["album"]["name"]
288 properties["metadata"]["albumSort"] = media_item["album"]["sort_name"]
289 elif current_queue_item:
290 properties["metadata"] = {
291 "title": current_queue_item["name"],
292 "trackId": current_queue_item["queue_item_id"],
293 "artUrl": image_url,
294 }
295
296 return properties
297
298 def send_request(
299 self, command: str, callback: MessageCallback | None = None, **args: str | float | bool
300 ) -> None:
301 """Send request to Music Assistant via Unix socket."""
302 if not self._socket:
303 logger.warning("Cannot send request - socket not connected")
304 return
305
306 msg_id = shortuuid.random(10)
307 command_msg = {
308 "message_id": msg_id,
309 "command": command,
310 "args": args,
311 }
312 logger.debug("send_request: %s", command_msg)
313 if callback:
314 self._request_callbacks[msg_id] = callback
315 try:
316 data = json.dumps(command_msg) + "\n"
317 self._socket.sendall(data.encode())
318 except OSError as e:
319 logger.error(f"Failed to send request: {e}")
320 self._request_callbacks.pop(msg_id, None)
321
322
323if __name__ == "__main__":
324 # Parse command line
325 queue_id = None
326 socket_path: str | None = None
327 streamserver_ip: str | None = None
328 streamserver_port: str | None = None
329 stream_id: str | None = None
330 for arg in sys.argv:
331 if arg.startswith("--stream="):
332 stream_id = arg.split("=")[1]
333 if arg.startswith("--queueid="):
334 queue_id = arg.split("=")[1]
335 if arg.startswith("--socket="):
336 socket_path = arg.split("=")[1]
337 if arg.startswith("--streamserver-ip="):
338 streamserver_ip = arg.split("=")[1]
339 if arg.startswith("--streamserver-port="):
340 streamserver_port = arg.split("=")[1]
341
342 if not queue_id or not socket_path:
343 print("Usage: --stream=<stream_id> --socket=<socket_path>") # noqa: T201
344 sys.exit()
345
346 log_format_stderr = "%(asctime)s %(module)s %(levelname)s: %(message)s"
347 log_level = logging.INFO
348 logger = logging.getLogger("meta_mass")
349 logger.propagate = False
350 logger.setLevel(log_level)
351
352 # Log to stderr
353 log_handler = logging.StreamHandler()
354 log_handler.setFormatter(logging.Formatter(log_format_stderr))
355 logger.addHandler(log_handler)
356
357 logger.debug(
358 "Initializing for stream_id %s, queue_id %s and socket %s", stream_id, queue_id, socket_path
359 )
360
361 assert streamserver_ip is not None # for type checking
362 assert streamserver_port is not None
363 ctrl = MusicAssistantControl(queue_id, socket_path, streamserver_ip, int(streamserver_port))
364
365 # keep listening for messages on stdin and forward them
366 try:
367 for line in sys.stdin:
368 try:
369 ctrl.handle_snapcast_request(json.loads(line))
370 except Exception as e:
371 send(
372 {
373 "jsonrpc": "2.0",
374 "error": {"code": -32700, "message": "Parse error", "data": str(e)},
375 "id": id,
376 }
377 )
378 except (SystemExit, KeyboardInterrupt):
379 sys.exit(0)
380