/
/
/
1"""Metadata reader for shairport-sync metadata pipe."""
2
3from __future__ import annotations
4
5import asyncio
6import base64
7import os
8import re
9import struct
10import time
11from contextlib import suppress
12from typing import TYPE_CHECKING, Any
13
14from music_assistant.constants import VERBOSE_LOG_LEVEL
15
16if TYPE_CHECKING:
17 from collections.abc import Callable
18 from logging import Logger
19
20
21class MetadataReader:
22 """Read and parse metadata from shairport-sync metadata pipe."""
23
24 def __init__(
25 self,
26 metadata_pipe: str,
27 logger: Logger,
28 on_metadata: Callable[[dict[str, Any]], None] | None = None,
29 ) -> None:
30 """Initialize metadata reader.
31
32 :param metadata_pipe: Path to the metadata pipe.
33 :param logger: Logger instance.
34 :param on_metadata: Callback function for metadata updates.
35 """
36 self.metadata_pipe = metadata_pipe
37 self.logger = logger
38 self.on_metadata = on_metadata
39 self._reader_task: asyncio.Task[None] | None = None
40 self._stop = False
41 self._current_metadata: dict[str, Any] = {}
42 self._fd: int | None = None
43 self._buffer = ""
44 self.cover_art_bytes: bytes | None = None
45
46 async def start(self) -> None:
47 """Start reading metadata from the pipe."""
48 self._stop = False
49 self._reader_task = asyncio.create_task(self._read_metadata())
50
51 async def stop(self) -> None:
52 """Stop reading metadata."""
53 self._stop = True
54 if self._reader_task and not self._reader_task.done():
55 self._reader_task.cancel()
56 with suppress(asyncio.CancelledError):
57 await self._reader_task
58
59 async def _read_metadata(self) -> None:
60 """Read metadata from the pipe using async file descriptor."""
61 loop = asyncio.get_event_loop()
62 try:
63 # Open the metadata pipe in non-blocking mode
64 # Use O_RDONLY | O_NONBLOCK to avoid blocking on open
65 self._fd = await loop.run_in_executor(
66 None, os.open, self.metadata_pipe, os.O_RDONLY | os.O_NONBLOCK
67 )
68
69 # Create an asyncio.Event to signal when data is available
70 data_available = asyncio.Event()
71
72 def on_readable() -> None:
73 """Set data available flag when file descriptor is readable."""
74 data_available.set()
75
76 # Register the file descriptor with the event loop
77 loop.add_reader(self._fd, on_readable)
78
79 try:
80 while not self._stop:
81 # Wait for data to be available
82 await data_available.wait()
83 data_available.clear()
84
85 # Read available data from the pipe
86 try:
87 chunk = os.read(self._fd, 4096)
88 if chunk:
89 # Decode as text and add to buffer
90 self._buffer += chunk.decode("utf-8", errors="ignore")
91 # Process all complete metadata items in the buffer
92 self._process_buffer()
93 except BlockingIOError:
94 # No data available right now, wait for next notification
95 continue
96 except OSError as err:
97 self.logger.debug("Error reading from pipe: %s", err)
98 await asyncio.sleep(0.1)
99
100 finally:
101 # Remove the reader callback
102 loop.remove_reader(self._fd)
103
104 except Exception as err:
105 self.logger.error("Error reading metadata pipe: %s", err)
106 finally:
107 if self._fd is not None:
108 with suppress(OSError):
109 os.close(self._fd)
110 self._fd = None
111
112 def _process_buffer(self) -> None:
113 """Process all complete metadata items in the buffer (XML format or plain text markers)."""
114 # First, check for plain text markers from sessioncontrol hooks
115 while "\n" in self._buffer:
116 # Check if we have a complete line before any XML
117 line_end = self._buffer.index("\n")
118 if "<item>" not in self._buffer or self._buffer.index("<item>") > line_end:
119 # We have a plain text line before any XML
120 line = self._buffer[:line_end].strip()
121 self._buffer = self._buffer[line_end + 1 :]
122
123 # Handle our custom markers
124 if line == "MA_PLAY_BEGIN":
125 self.logger.info("Playback started (via sessioncontrol hook)")
126 if self.on_metadata:
127 self.on_metadata({"play_state": "playing"})
128 elif line == "MA_PLAY_END":
129 self.logger.info("Playback ended (via sessioncontrol hook)")
130 if self.on_metadata:
131 self.on_metadata({"play_state": "stopped"})
132 # Ignore other plain text lines
133 else:
134 # XML item comes first, stop looking for lines
135 break
136
137 # Look for complete <item>...</item> blocks
138 while "<item>" in self._buffer and "</item>" in self._buffer:
139 try:
140 # Find the boundaries of the next item
141 start_idx = self._buffer.index("<item>")
142 end_idx = self._buffer.index("</item>") + len("</item>")
143
144 # Extract the item
145 item_xml = self._buffer[start_idx:end_idx]
146
147 # Remove processed item from buffer
148 self._buffer = self._buffer[end_idx:]
149
150 # Parse the item
151 self._parse_xml_item(item_xml)
152
153 except (ValueError, IndexError) as err:
154 self.logger.debug("Error processing buffer: %s", err)
155 # Clear malformed data
156 if "</item>" in self._buffer:
157 # Skip to after the next </item>
158 self._buffer = self._buffer[self._buffer.index("</item>") + len("</item>") :]
159 else:
160 # Wait for more data
161 break
162 except Exception as err:
163 self.logger.error("Unexpected error processing buffer: %s", err)
164 # Clear the buffer on unexpected error
165 self._buffer = ""
166 break
167
168 def _parse_xml_item(self, item_xml: str) -> None:
169 """Parse a single XML metadata item.
170
171 :param item_xml: XML string containing a metadata item.
172 """
173 try:
174 # Extract type (hex format)
175 type_match = re.search(r"<type>([0-9a-fA-F]{8})</type>", item_xml)
176 code_match = re.search(r"<code>([0-9a-fA-F]{8})</code>", item_xml)
177 length_match = re.search(r"<length>(\d+)</length>", item_xml)
178
179 if not type_match or not code_match or not length_match:
180 return
181
182 # Convert hex type and code to ASCII strings
183 type_hex = int(type_match.group(1), 16)
184 code_hex = int(code_match.group(1), 16)
185 length = int(length_match.group(1))
186
187 # Convert hex to 4-character ASCII codes
188 type_str = type_hex.to_bytes(4, "big").decode("ascii", errors="ignore")
189 code_str = code_hex.to_bytes(4, "big").decode("ascii", errors="ignore")
190
191 # Extract data if present
192 data: str | bytes | None = None
193 if length > 0:
194 data_match = re.search(r"<data encoding=\"base64\">([^<]+)</data>", item_xml)
195 if data_match:
196 try:
197 # Decode base64 data
198 data_b64 = data_match.group(1).strip()
199 decoded_data = base64.b64decode(data_b64)
200
201 # For binary fields (PICT, astm), keep as raw bytes
202 # For text fields, decode to UTF-8
203 if code_str in ("PICT", "astm"):
204 # Cover art and duration: keep as raw bytes
205 data = decoded_data
206 else:
207 # Text metadata: decode to UTF-8
208 data = decoded_data.decode("utf-8", errors="ignore")
209 except Exception as err:
210 self.logger.debug("Error decoding base64 data: %s", err)
211
212 # Process the metadata item
213 asyncio.create_task(self._process_metadata_item(type_str, code_str, data))
214
215 except Exception as err:
216 self.logger.debug("Error parsing XML item: %s", err)
217
218 async def _process_metadata_item(
219 self, item_type: str, code: str, data: str | bytes | None
220 ) -> None:
221 """Process a metadata item and update current metadata.
222
223 :param item_type: Type of metadata (e.g., 'core' or 'ssnc').
224 :param code: Metadata code identifier.
225 :param data: Optional metadata data (string, bytes, or None).
226 """
227 # Don't log binary data (like cover art)
228 if code == "PICT":
229 self.logger.log(
230 VERBOSE_LOG_LEVEL,
231 "Metadata: type=%s, code=%s, data=<binary image data>",
232 item_type,
233 code,
234 )
235 else:
236 self.logger.log(
237 VERBOSE_LOG_LEVEL, "Metadata: type=%s, code=%s, data=%s", item_type, code, data
238 )
239
240 # Handle metadata start/end markers
241 if item_type == "ssnc" and code == "mdst":
242 self._current_metadata = {}
243 # Note: We don't clear cover_art_bytes here because:
244 # 1. Cover art may arrive before mdst (at playback start)
245 # 2. New cover art will overwrite old bytes when it arrives
246 # 3. Cache-busting timestamp ensures browser gets correct image
247 if self.on_metadata:
248 self.on_metadata({"metadata_start": True})
249 return
250
251 if item_type == "ssnc" and code == "mden":
252 if self.on_metadata and self._current_metadata:
253 self.on_metadata(dict(self._current_metadata))
254 return
255
256 # Parse core metadata (from iTunes/iOS)
257 if item_type == "core" and data is not None:
258 self._parse_core_metadata(code, data)
259
260 # Parse shairport-sync metadata
261 if item_type == "ssnc" and data is not None:
262 self._parse_ssnc_metadata(code, data)
263
264 def _parse_core_metadata(self, code: str, data: str | bytes) -> None:
265 """Parse core metadata from iTunes/iOS.
266
267 :param code: Metadata code identifier.
268 :param data: Metadata data.
269 """
270 # Text metadata fields - expect string data
271 if isinstance(data, str):
272 if code == "asar": # Artist
273 self._current_metadata["artist"] = data
274 elif code == "asal": # Album
275 self._current_metadata["album"] = data
276 elif code == "minm": # Title
277 self._current_metadata["title"] = data
278
279 # Binary metadata fields - expect bytes data
280 elif isinstance(data, bytes):
281 if code == "PICT": # Cover art (raw bytes)
282 # Store raw bytes for later retrieval via resolve_image
283 self.cover_art_bytes = data
284 self.logger.debug("Stored cover art: %d bytes", len(data))
285 # Signal that cover art is available with timestamp for cache-busting
286 timestamp = str(int(time.time() * 1000))
287 self._current_metadata["cover_art_timestamp"] = timestamp
288 # Send cover art update immediately (cover art often arrives in separate block)
289 if self.on_metadata:
290 self.on_metadata({"cover_art_timestamp": timestamp})
291 elif code == "astm": # Track duration in milliseconds (stored as 32-bit big-endian int)
292 try:
293 # Duration is sent as 4-byte big-endian integer
294 if len(data) >= 4:
295 duration_ms = struct.unpack(">I", data[:4])[0]
296 self._current_metadata["duration"] = duration_ms // 1000
297 except (ValueError, TypeError, struct.error) as err:
298 self.logger.debug("Error parsing duration: %s", err)
299
300 def _parse_ssnc_metadata(self, code: str, data: str | bytes) -> None:
301 """Parse shairport-sync metadata.
302
303 :param code: Metadata code identifier.
304 :param data: Metadata data.
305 """
306 # Handle binary data (cover art can come as ssnc type)
307 if isinstance(data, bytes):
308 if code == "PICT": # Cover art (raw bytes)
309 # Store raw bytes for later retrieval via resolve_image
310 self.cover_art_bytes = data
311 self.logger.debug("Stored cover art: %d bytes", len(data))
312 # Signal that cover art is available with timestamp for cache-busting
313 timestamp = str(int(time.time() * 1000))
314 self._current_metadata["cover_art_timestamp"] = timestamp
315 # Send cover art update immediately (cover art often arrives in separate block)
316 if self.on_metadata:
317 self.on_metadata({"cover_art_timestamp": timestamp})
318 return
319
320 # Process string data for ssnc metadata (volume/progress are text-based)
321 if code == "pvol": # Volume
322 self._parse_volume(data)
323 # Send volume updates immediately (not batched with mden)
324 if self.on_metadata and "volume" in self._current_metadata:
325 self.on_metadata({"volume": self._current_metadata["volume"]})
326 elif code == "prgr": # Progress
327 self._parse_progress(data)
328 # Send progress updates immediately (not batched with mden)
329 if self.on_metadata and "elapsed_time" in self._current_metadata:
330 self.on_metadata({"elapsed_time": self._current_metadata["elapsed_time"]})
331 elif code == "paus": # Paused
332 self._current_metadata["paused"] = True
333 elif code == "prsm": # Playing/resumed
334 self._current_metadata["paused"] = False
335
336 def _parse_volume(self, data: str) -> None:
337 """Parse volume metadata from shairport-sync.
338
339 Format: airplay_volume,min_volume,max_volume,mute
340 AirPlay volume is in dB, typically ranging from -30.0 (silent) to 0.0 (max).
341 Special value -144.0 means muted.
342
343 :param data: Volume data string (e.g., "-21.88,0.00,0.00,0.00").
344 """
345 try:
346 parts = data.split(",")
347 if len(parts) >= 1:
348 airplay_volume = float(parts[0])
349 # -144.0 means muted
350 if airplay_volume <= -144.0:
351 volume_percent = 0
352 else:
353 # Convert dB to percentage: -30dB = 0%, 0dB = 100%
354 volume_percent = int(((airplay_volume + 30.0) / 30.0) * 100)
355 volume_percent = max(0, min(100, volume_percent))
356 self._current_metadata["volume"] = volume_percent
357 except (ValueError, IndexError) as err:
358 self.logger.debug("Error parsing volume: %s", err)
359
360 def _parse_progress(self, data: str) -> None:
361 """Parse progress metadata.
362
363 :param data: Progress data string.
364 """
365 try:
366 parts = data.split("/")
367 if len(parts) >= 3:
368 start_rtp = int(parts[0])
369 current_rtp = int(parts[1])
370 elapsed_frames = current_rtp - start_rtp
371 elapsed_seconds = elapsed_frames / 44100
372 self._current_metadata["elapsed_time"] = int(elapsed_seconds)
373 except (ValueError, IndexError) as err:
374 self.logger.debug("Error parsing progress: %s", err)
375