music-assistant-server

15.6 KBPY
metadata.py
15.6 KB375 lines • python
1"""Metadata reader for shairport-sync metadata pipe."""
2
3from __future__ import annotations
4
5import asyncio
6import base64
7import os
8import re
9import struct
10import time
11from contextlib import suppress
12from typing import TYPE_CHECKING, Any
13
14from music_assistant.constants import VERBOSE_LOG_LEVEL
15
16if TYPE_CHECKING:
17    from collections.abc import Callable
18    from logging import Logger
19
20
21class MetadataReader:
22    """Read and parse metadata from shairport-sync metadata pipe."""
23
24    def __init__(
25        self,
26        metadata_pipe: str,
27        logger: Logger,
28        on_metadata: Callable[[dict[str, Any]], None] | None = None,
29    ) -> None:
30        """Initialize metadata reader.
31
32        :param metadata_pipe: Path to the metadata pipe.
33        :param logger: Logger instance.
34        :param on_metadata: Callback function for metadata updates.
35        """
36        self.metadata_pipe = metadata_pipe
37        self.logger = logger
38        self.on_metadata = on_metadata
39        self._reader_task: asyncio.Task[None] | None = None
40        self._stop = False
41        self._current_metadata: dict[str, Any] = {}
42        self._fd: int | None = None
43        self._buffer = ""
44        self.cover_art_bytes: bytes | None = None
45
46    async def start(self) -> None:
47        """Start reading metadata from the pipe."""
48        self._stop = False
49        self._reader_task = asyncio.create_task(self._read_metadata())
50
51    async def stop(self) -> None:
52        """Stop reading metadata."""
53        self._stop = True
54        if self._reader_task and not self._reader_task.done():
55            self._reader_task.cancel()
56            with suppress(asyncio.CancelledError):
57                await self._reader_task
58
59    async def _read_metadata(self) -> None:
60        """Read metadata from the pipe using async file descriptor."""
61        loop = asyncio.get_event_loop()
62        try:
63            # Open the metadata pipe in non-blocking mode
64            # Use O_RDONLY | O_NONBLOCK to avoid blocking on open
65            self._fd = await loop.run_in_executor(
66                None, os.open, self.metadata_pipe, os.O_RDONLY | os.O_NONBLOCK
67            )
68
69            # Create an asyncio.Event to signal when data is available
70            data_available = asyncio.Event()
71
72            def on_readable() -> None:
73                """Set data available flag when file descriptor is readable."""
74                data_available.set()
75
76            # Register the file descriptor with the event loop
77            loop.add_reader(self._fd, on_readable)
78
79            try:
80                while not self._stop:
81                    # Wait for data to be available
82                    await data_available.wait()
83                    data_available.clear()
84
85                    # Read available data from the pipe
86                    try:
87                        chunk = os.read(self._fd, 4096)
88                        if chunk:
89                            # Decode as text and add to buffer
90                            self._buffer += chunk.decode("utf-8", errors="ignore")
91                            # Process all complete metadata items in the buffer
92                            self._process_buffer()
93                    except BlockingIOError:
94                        # No data available right now, wait for next notification
95                        continue
96                    except OSError as err:
97                        self.logger.debug("Error reading from pipe: %s", err)
98                        await asyncio.sleep(0.1)
99
100            finally:
101                # Remove the reader callback
102                loop.remove_reader(self._fd)
103
104        except Exception as err:
105            self.logger.error("Error reading metadata pipe: %s", err)
106        finally:
107            if self._fd is not None:
108                with suppress(OSError):
109                    os.close(self._fd)
110                self._fd = None
111
112    def _process_buffer(self) -> None:
113        """Process all complete metadata items in the buffer (XML format or plain text markers)."""
114        # First, check for plain text markers from sessioncontrol hooks
115        while "\n" in self._buffer:
116            # Check if we have a complete line before any XML
117            line_end = self._buffer.index("\n")
118            if "<item>" not in self._buffer or self._buffer.index("<item>") > line_end:
119                # We have a plain text line before any XML
120                line = self._buffer[:line_end].strip()
121                self._buffer = self._buffer[line_end + 1 :]
122
123                # Handle our custom markers
124                if line == "MA_PLAY_BEGIN":
125                    self.logger.info("Playback started (via sessioncontrol hook)")
126                    if self.on_metadata:
127                        self.on_metadata({"play_state": "playing"})
128                elif line == "MA_PLAY_END":
129                    self.logger.info("Playback ended (via sessioncontrol hook)")
130                    if self.on_metadata:
131                        self.on_metadata({"play_state": "stopped"})
132                # Ignore other plain text lines
133            else:
134                # XML item comes first, stop looking for lines
135                break
136
137        # Look for complete <item>...</item> blocks
138        while "<item>" in self._buffer and "</item>" in self._buffer:
139            try:
140                # Find the boundaries of the next item
141                start_idx = self._buffer.index("<item>")
142                end_idx = self._buffer.index("</item>") + len("</item>")
143
144                # Extract the item
145                item_xml = self._buffer[start_idx:end_idx]
146
147                # Remove processed item from buffer
148                self._buffer = self._buffer[end_idx:]
149
150                # Parse the item
151                self._parse_xml_item(item_xml)
152
153            except (ValueError, IndexError) as err:
154                self.logger.debug("Error processing buffer: %s", err)
155                # Clear malformed data
156                if "</item>" in self._buffer:
157                    # Skip to after the next </item>
158                    self._buffer = self._buffer[self._buffer.index("</item>") + len("</item>") :]
159                else:
160                    # Wait for more data
161                    break
162            except Exception as err:
163                self.logger.error("Unexpected error processing buffer: %s", err)
164                # Clear the buffer on unexpected error
165                self._buffer = ""
166                break
167
168    def _parse_xml_item(self, item_xml: str) -> None:
169        """Parse a single XML metadata item.
170
171        :param item_xml: XML string containing a metadata item.
172        """
173        try:
174            # Extract type (hex format)
175            type_match = re.search(r"<type>([0-9a-fA-F]{8})</type>", item_xml)
176            code_match = re.search(r"<code>([0-9a-fA-F]{8})</code>", item_xml)
177            length_match = re.search(r"<length>(\d+)</length>", item_xml)
178
179            if not type_match or not code_match or not length_match:
180                return
181
182            # Convert hex type and code to ASCII strings
183            type_hex = int(type_match.group(1), 16)
184            code_hex = int(code_match.group(1), 16)
185            length = int(length_match.group(1))
186
187            # Convert hex to 4-character ASCII codes
188            type_str = type_hex.to_bytes(4, "big").decode("ascii", errors="ignore")
189            code_str = code_hex.to_bytes(4, "big").decode("ascii", errors="ignore")
190
191            # Extract data if present
192            data: str | bytes | None = None
193            if length > 0:
194                data_match = re.search(r"<data encoding=\"base64\">([^<]+)</data>", item_xml)
195                if data_match:
196                    try:
197                        # Decode base64 data
198                        data_b64 = data_match.group(1).strip()
199                        decoded_data = base64.b64decode(data_b64)
200
201                        # For binary fields (PICT, astm), keep as raw bytes
202                        # For text fields, decode to UTF-8
203                        if code_str in ("PICT", "astm"):
204                            # Cover art and duration: keep as raw bytes
205                            data = decoded_data
206                        else:
207                            # Text metadata: decode to UTF-8
208                            data = decoded_data.decode("utf-8", errors="ignore")
209                    except Exception as err:
210                        self.logger.debug("Error decoding base64 data: %s", err)
211
212            # Process the metadata item
213            asyncio.create_task(self._process_metadata_item(type_str, code_str, data))
214
215        except Exception as err:
216            self.logger.debug("Error parsing XML item: %s", err)
217
218    async def _process_metadata_item(
219        self, item_type: str, code: str, data: str | bytes | None
220    ) -> None:
221        """Process a metadata item and update current metadata.
222
223        :param item_type: Type of metadata (e.g., 'core' or 'ssnc').
224        :param code: Metadata code identifier.
225        :param data: Optional metadata data (string, bytes, or None).
226        """
227        # Don't log binary data (like cover art)
228        if code == "PICT":
229            self.logger.log(
230                VERBOSE_LOG_LEVEL,
231                "Metadata: type=%s, code=%s, data=<binary image data>",
232                item_type,
233                code,
234            )
235        else:
236            self.logger.log(
237                VERBOSE_LOG_LEVEL, "Metadata: type=%s, code=%s, data=%s", item_type, code, data
238            )
239
240        # Handle metadata start/end markers
241        if item_type == "ssnc" and code == "mdst":
242            self._current_metadata = {}
243            # Note: We don't clear cover_art_bytes here because:
244            # 1. Cover art may arrive before mdst (at playback start)
245            # 2. New cover art will overwrite old bytes when it arrives
246            # 3. Cache-busting timestamp ensures browser gets correct image
247            if self.on_metadata:
248                self.on_metadata({"metadata_start": True})
249            return
250
251        if item_type == "ssnc" and code == "mden":
252            if self.on_metadata and self._current_metadata:
253                self.on_metadata(dict(self._current_metadata))
254            return
255
256        # Parse core metadata (from iTunes/iOS)
257        if item_type == "core" and data is not None:
258            self._parse_core_metadata(code, data)
259
260        # Parse shairport-sync metadata
261        if item_type == "ssnc" and data is not None:
262            self._parse_ssnc_metadata(code, data)
263
264    def _parse_core_metadata(self, code: str, data: str | bytes) -> None:
265        """Parse core metadata from iTunes/iOS.
266
267        :param code: Metadata code identifier.
268        :param data: Metadata data.
269        """
270        # Text metadata fields - expect string data
271        if isinstance(data, str):
272            if code == "asar":  # Artist
273                self._current_metadata["artist"] = data
274            elif code == "asal":  # Album
275                self._current_metadata["album"] = data
276            elif code == "minm":  # Title
277                self._current_metadata["title"] = data
278
279        # Binary metadata fields - expect bytes data
280        elif isinstance(data, bytes):
281            if code == "PICT":  # Cover art (raw bytes)
282                # Store raw bytes for later retrieval via resolve_image
283                self.cover_art_bytes = data
284                self.logger.debug("Stored cover art: %d bytes", len(data))
285                # Signal that cover art is available with timestamp for cache-busting
286                timestamp = str(int(time.time() * 1000))
287                self._current_metadata["cover_art_timestamp"] = timestamp
288                # Send cover art update immediately (cover art often arrives in separate block)
289                if self.on_metadata:
290                    self.on_metadata({"cover_art_timestamp": timestamp})
291            elif code == "astm":  # Track duration in milliseconds (stored as 32-bit big-endian int)
292                try:
293                    # Duration is sent as 4-byte big-endian integer
294                    if len(data) >= 4:
295                        duration_ms = struct.unpack(">I", data[:4])[0]
296                        self._current_metadata["duration"] = duration_ms // 1000
297                except (ValueError, TypeError, struct.error) as err:
298                    self.logger.debug("Error parsing duration: %s", err)
299
300    def _parse_ssnc_metadata(self, code: str, data: str | bytes) -> None:
301        """Parse shairport-sync metadata.
302
303        :param code: Metadata code identifier.
304        :param data: Metadata data.
305        """
306        # Handle binary data (cover art can come as ssnc type)
307        if isinstance(data, bytes):
308            if code == "PICT":  # Cover art (raw bytes)
309                # Store raw bytes for later retrieval via resolve_image
310                self.cover_art_bytes = data
311                self.logger.debug("Stored cover art: %d bytes", len(data))
312                # Signal that cover art is available with timestamp for cache-busting
313                timestamp = str(int(time.time() * 1000))
314                self._current_metadata["cover_art_timestamp"] = timestamp
315                # Send cover art update immediately (cover art often arrives in separate block)
316                if self.on_metadata:
317                    self.on_metadata({"cover_art_timestamp": timestamp})
318            return
319
320        # Process string data for ssnc metadata (volume/progress are text-based)
321        if code == "pvol":  # Volume
322            self._parse_volume(data)
323            # Send volume updates immediately (not batched with mden)
324            if self.on_metadata and "volume" in self._current_metadata:
325                self.on_metadata({"volume": self._current_metadata["volume"]})
326        elif code == "prgr":  # Progress
327            self._parse_progress(data)
328            # Send progress updates immediately (not batched with mden)
329            if self.on_metadata and "elapsed_time" in self._current_metadata:
330                self.on_metadata({"elapsed_time": self._current_metadata["elapsed_time"]})
331        elif code == "paus":  # Paused
332            self._current_metadata["paused"] = True
333        elif code == "prsm":  # Playing/resumed
334            self._current_metadata["paused"] = False
335
336    def _parse_volume(self, data: str) -> None:
337        """Parse volume metadata from shairport-sync.
338
339        Format: airplay_volume,min_volume,max_volume,mute
340        AirPlay volume is in dB, typically ranging from -30.0 (silent) to 0.0 (max).
341        Special value -144.0 means muted.
342
343        :param data: Volume data string (e.g., "-21.88,0.00,0.00,0.00").
344        """
345        try:
346            parts = data.split(",")
347            if len(parts) >= 1:
348                airplay_volume = float(parts[0])
349                # -144.0 means muted
350                if airplay_volume <= -144.0:
351                    volume_percent = 0
352                else:
353                    # Convert dB to percentage: -30dB = 0%, 0dB = 100%
354                    volume_percent = int(((airplay_volume + 30.0) / 30.0) * 100)
355                    volume_percent = max(0, min(100, volume_percent))
356                self._current_metadata["volume"] = volume_percent
357        except (ValueError, IndexError) as err:
358            self.logger.debug("Error parsing volume: %s", err)
359
360    def _parse_progress(self, data: str) -> None:
361        """Parse progress metadata.
362
363        :param data: Progress data string.
364        """
365        try:
366            parts = data.split("/")
367            if len(parts) >= 3:
368                start_rtp = int(parts[0])
369                current_rtp = int(parts[1])
370                elapsed_frames = current_rtp - start_rtp
371                elapsed_seconds = elapsed_frames / 44100
372                self._current_metadata["elapsed_time"] = int(elapsed_seconds)
373        except (ValueError, IndexError) as err:
374            self.logger.debug("Error parsing progress: %s", err)
375