music-assistant-server

22.8 KBPY
__init__.py
22.8 KB566 lines • python
1"""AriaCast Receiver Plugin Provider."""
2
3from __future__ import annotations
4
5import asyncio
6import time
7from collections import deque
8from collections.abc import AsyncGenerator
9from contextlib import suppress
10from typing import TYPE_CHECKING, Any
11
12import aiohttp
13from aiohttp import ClientTimeout
14from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption
15from music_assistant_models.enums import (
16    ConfigEntryType,
17    ContentType,
18    ImageType,
19    PlaybackState,
20    ProviderFeature,
21    StreamType,
22)
23from music_assistant_models.media_items import AudioFormat, MediaItemImage
24from music_assistant_models.streamdetails import StreamMetadata
25
26from music_assistant.constants import CONF_ENTRY_WARN_PREVIEW
27from music_assistant.helpers.process import AsyncProcess
28from music_assistant.models.plugin import PluginProvider, PluginSource
29
30from .helpers import _get_binary_path
31
32if TYPE_CHECKING:
33    from music_assistant_models.config_entries import ConfigValueType, ProviderConfig
34    from music_assistant_models.provider import ProviderManifest
35
36    from music_assistant.mass import MusicAssistant
37    from music_assistant.models import ProviderInstanceType
38
39CONF_MASS_PLAYER_ID = "mass_player_id"
40CONF_ALLOW_PLAYER_SWITCH = "allow_player_switch"
41
42
43PLAYER_ID_AUTO = "__auto__"
44SUPPORTED_FEATURES = {ProviderFeature.AUDIO_SOURCE}
45
46
47async def setup(
48    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
49) -> ProviderInstanceType:
50    """Initialize provider(instance) with given configuration."""
51    return AriaCastBridge(mass, manifest, config)
52
53
54async def get_config_entries(
55    mass: MusicAssistant,
56    instance_id: str | None = None,  # noqa: ARG001
57    action: str | None = None,  # noqa: ARG001
58    values: dict[str, ConfigValueType] | None = None,  # noqa: ARG001
59) -> tuple[ConfigEntry, ...]:
60    """Return Config entries to setup this provider."""
61    return (
62        CONF_ENTRY_WARN_PREVIEW,
63        ConfigEntry(
64            key=CONF_MASS_PLAYER_ID,
65            type=ConfigEntryType.STRING,
66            label="Connected Music Assistant Player",
67            description="The player to use for playback.",
68            default_value=PLAYER_ID_AUTO,
69            options=[
70                ConfigValueOption("Auto (prefer playing player)", PLAYER_ID_AUTO),
71                *(
72                    ConfigValueOption(x.display_name, x.player_id)
73                    for x in sorted(
74                        mass.players.all_players(False, False), key=lambda p: p.display_name.lower()
75                    )
76                ),
77            ],
78            required=True,
79        ),
80        ConfigEntry(
81            key=CONF_ALLOW_PLAYER_SWITCH,
82            type=ConfigEntryType.BOOLEAN,
83            label="Allow manual player switching",
84            default_value=True,
85        ),
86    )
87
88
89class AriaCastBridge(PluginProvider):
90    """Bridge for the AriaCast Go Binary."""
91
92    def __init__(
93        self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
94    ) -> None:
95        """Initialize AriaCast Receiver."""
96        super().__init__(mass, manifest, config, SUPPORTED_FEATURES)
97        self._default_player_id = str(config.get_value(CONF_MASS_PLAYER_ID))
98        self._allow_player_switch = bool(config.get_value(CONF_ALLOW_PLAYER_SWITCH))
99
100        # Process
101        self._binary_process: AsyncProcess | None = None
102
103        # Internal State
104        self._active_player_id: str | None = None
105        self._metadata_task: asyncio.Task[None] | None = None
106        self._stdout_reader_task: asyncio.Task[None] | None = None
107        self._stop_called = False
108        self._binary_is_playing: bool = False  # Track binary playback state
109        self._current_track_title: str | None = None  # Track song changes
110
111        # Audio buffer - larger for high-latency players like Sendspin
112        self.max_frames = 75  # 1.5 second buffer (75 frames * 20ms each)
113        self.frame_queue: deque[bytes] = deque(maxlen=self.max_frames)
114        self.frame_available = asyncio.Event()
115        self._buffering = True  # Start in buffering mode
116
117        # Artwork storage
118        self._artwork_bytes: bytes | None = None
119        self._artwork_timestamp: int = 0
120
121        # Define the Source
122        self._source_details = PluginSource(
123            id=self.instance_id,
124            name=self.name,
125            passive=not self._allow_player_switch,
126            can_play_pause=True,  # Binary stops stdout writes when paused
127            can_seek=False,
128            can_next_previous=True,
129            audio_format=AudioFormat(
130                content_type=ContentType.PCM_S16LE,
131                sample_rate=48000,
132                bit_depth=16,
133                channels=2,
134            ),
135            metadata=StreamMetadata(title="AriaCast Ready"),
136            stream_type=StreamType.CUSTOM,
137        )
138
139        # Bind Hooks
140        self._source_details.on_select = self._on_source_selected
141        self._source_details.on_play = self._cmd_play
142        self._source_details.on_pause = self._cmd_pause
143        self._source_details.on_next = self._cmd_next
144        self._source_details.on_previous = self._cmd_previous
145
146    async def handle_async_init(self) -> None:
147        """Start the provider."""
148        # Launch Binary with stdout and stderr mode
149        binary_path = await asyncio.to_thread(_get_binary_path)
150        args = [binary_path, "--stdout"]
151
152        self.logger.info("Starting AriaCast binary: %s", binary_path)
153        self._binary_process = AsyncProcess(args, name="ariacast", stdout=True, stderr=True)
154        await self._binary_process.start()
155
156        # Start Metadata Monitor
157        self._metadata_task = self.mass.create_task(self._monitor_metadata())
158
159        # Start Stdout Reader (feeds the frame queue)
160        self._stdout_reader_task = self.mass.create_task(self._read_stdout_to_queue())
161
162        # Start Stderr Reader (logging)
163        self.mass.create_task(self._read_stderr())
164
165    async def unload(self, is_removed: bool = False) -> None:
166        """Cleanup resources."""
167        self._stop_called = True
168
169        if self._metadata_task:
170            self._metadata_task.cancel()
171            with suppress(asyncio.CancelledError):
172                await self._metadata_task
173
174        if self._stdout_reader_task:
175            self._stdout_reader_task.cancel()
176            with suppress(asyncio.CancelledError):
177                await self._stdout_reader_task
178
179        if self._binary_process:
180            self.logger.info("Stopping AriaCast binary...")
181            await self._binary_process.close()
182
183    def get_source(self) -> PluginSource:
184        """Return the plugin source details."""
185        return self._source_details
186
187    async def _monitor_metadata(self) -> None:
188        """Connect to local Go binary WebSocket to receive metadata updates."""
189        url = "ws://127.0.0.1:12889/metadata"
190        retry_delay = 1
191
192        while not self._stop_called:
193            try:
194                async with self.mass.http_session.ws_connect(url, heartbeat=30) as ws:
195                    self.logger.info("Connected to AriaCast metadata stream")
196                    retry_delay = 1  # Reset delay on success
197                    async for msg in ws:
198                        if msg.type == aiohttp.WSMsgType.TEXT:
199                            payload = msg.json()
200                            if payload.get("type") == "metadata":
201                                self._update_metadata(payload.get("data", {}))
202                        elif msg.type == aiohttp.WSMsgType.ERROR:
203                            break
204            except Exception as exc:
205                if not self._stop_called:
206                    self.logger.debug(
207                        "WebSocket connection to AriaCast metadata failed: %s. Retrying in %d s...",
208                        exc,
209                        retry_delay,
210                    )
211                    await asyncio.sleep(retry_delay)
212                    retry_delay = min(retry_delay * 2, 60)
213
214    def _update_metadata(self, data: dict[str, Any]) -> None:
215        """Update Music Assistant metadata from Go binary data."""
216        if not self._source_details.metadata:
217            self._source_details.metadata = StreamMetadata(title="AriaCast Ready")
218
219        meta = self._source_details.metadata
220
221        # Detect song change and clear queue to prevent stale audio
222        new_title = data.get("title", "Unknown")
223        self._handle_track_change(new_title)
224
225        meta.title = new_title
226        meta.artist = data.get("artist", "Unknown")
227        meta.album = data.get("album", "Unknown")
228
229        # Handle artwork
230        self._handle_artwork_update(data.get("artwork_url"), meta)
231
232        # Duration & Progress
233        if duration_ms := data.get("duration_ms"):
234            meta.duration = int(duration_ms / 1000)
235
236        if position_ms := data.get("position_ms"):
237            meta.elapsed_time = int(position_ms / 1000)
238            meta.elapsed_time_last_updated = time.time()
239
240        # Handle playback state
241        self._handle_playback_state_update(data.get("is_playing", False))
242
243        # Trigger UI Update
244        if self._source_details.in_use_by:
245            self.mass.players.trigger_player_update(self._source_details.in_use_by)
246
247    def _handle_track_change(self, new_title: str) -> None:
248        """Handle track change detection and queue clearing."""
249        if self._current_track_title and new_title != self._current_track_title:
250            if self._binary_is_playing:  # Only clear on song change during playback
251                self.logger.info(
252                    "Song changed from '%s' to '%s' - clearing audio queue",
253                    self._current_track_title,
254                    new_title,
255                )
256                self.frame_queue.clear()
257                self.frame_available.clear()
258        self._current_track_title = new_title
259
260    def _handle_artwork_update(self, artwork_url: str | None, meta: StreamMetadata) -> None:
261        """Handle artwork detection and download."""
262        if not artwork_url:
263            return
264
265        last_artwork_identifier = getattr(self, "_last_artwork_identifier", None)
266        if artwork_url != last_artwork_identifier:
267            # New artwork detected
268            self.logger.debug(
269                "New artwork detected: %s (was: %s)", artwork_url, last_artwork_identifier
270            )
271            self._last_artwork_identifier = artwork_url
272            # Clear old artwork data to prevent serving stale image
273            self._artwork_bytes = None
274            if meta:
275                meta.image_url = None
276            self.mass.create_task(self._download_artwork())
277
278    def _handle_playback_state_update(self, is_playing: bool) -> None:
279        """Handle binary playback state and player management."""
280        was_playing = self._binary_is_playing
281        self.logger.debug(
282            "Metadata update: is_playing=%s, was_playing=%s, active=%s, in_use=%s",
283            is_playing,
284            was_playing,
285            self._active_player_id,
286            self._source_details.in_use_by,
287        )
288
289        # Track binary state
290        self._binary_is_playing = is_playing
291
292        if is_playing and not self._source_details.in_use_by:
293            # Binary is playing but no player is consuming the stream
294            if self._active_player_id:
295                # Resume after pause - reclaim the same player
296                self.logger.info(
297                    "App resumed playback, reclaiming player %s", self._active_player_id
298                )
299                # Clear queue before resuming to remove old silence/data
300                self.frame_queue.clear()
301                self.frame_available.clear()
302                self._source_details.in_use_by = self._active_player_id
303                self.mass.players.trigger_player_update(self._active_player_id)
304                self.mass.create_task(
305                    self.mass.players.select_source(self._active_player_id, self.instance_id)
306                )
307            else:
308                # First time playing - auto-select a player
309                self._handle_auto_play()
310        elif not is_playing and was_playing and self._source_details.in_use_by:
311            # App paused playback - release the player
312            self.logger.info("App paused playback, releasing player")
313            self._active_player_id = self._source_details.in_use_by
314            self._source_details.in_use_by = None
315            # Clear queue to prevent old silence from accumulating
316            self.frame_queue.clear()
317            self.frame_available.clear()
318            self.mass.players.trigger_player_update(self._active_player_id)
319
320    def _handle_auto_play(self) -> None:
321        """Automatically select a player when music starts."""
322        target_id = self._get_target_player_id()
323        if target_id:
324            self._active_player_id = target_id
325            self._source_details.in_use_by = target_id
326            self.mass.create_task(self.mass.players.select_source(target_id, self.instance_id))
327
328    # --- Command Wrappers ---
329
330    async def _cmd_play(self) -> None:
331        """Send play command."""
332        self.logger.info("PLAY command")
333
334        # If player was released on pause, reclaim it
335        if not self._source_details.in_use_by and self._active_player_id:
336            # Clear queue before resuming to remove old silence/data
337            self.frame_queue.clear()
338            self.frame_available.clear()
339            self._source_details.in_use_by = self._active_player_id
340            self.mass.players.trigger_player_update(self._active_player_id)
341            # Restart playback on the player
342            await self.mass.players.select_source(self._active_player_id, self.instance_id)
343
344        await self._send_api_command("play")
345
346    async def _cmd_pause(self) -> None:
347        """Send pause command."""
348        self.logger.info("PAUSE command")
349
350        # Release the player (like Spotify Connect does) - this makes MA show it as idle
351        # Keep track of active_player_id so we can reclaim it on resume
352        if self._source_details.in_use_by:
353            self._active_player_id = self._source_details.in_use_by
354            self._source_details.in_use_by = None
355            self.mass.players.trigger_player_update(self._active_player_id)
356
357        # Clear the frame queue to prevent old silence from being played on resume
358        self.frame_queue.clear()
359        self.frame_available.clear()
360
361        await self._send_api_command("pause")
362
363    async def _cmd_next(self) -> None:
364        """Send next-track command."""
365        await self._send_api_command("next")
366
367    async def _cmd_previous(self) -> None:
368        """Send previous-track command."""
369        await self._send_api_command("previous")
370
371    async def _send_api_command(self, action: str) -> None:
372        """Send control command (POST) using shared session."""
373        url = "http://127.0.0.1:12889/api/command"
374        try:
375            async with self.mass.http_session.post(url, json={"action": action}) as response:
376                body = await response.text()
377                if not 200 <= response.status < 300:
378                    self.logger.warning(
379                        "Command '%s' failed with HTTP %s: %s",
380                        action,
381                        response.status,
382                        body,
383                    )
384        except Exception as e:
385            self.logger.warning("Failed to send command '%s': %s", action, e)
386
387    async def _download_artwork(self) -> None:
388        """Fetch artwork bytes from Go binary."""
389        # Add a small delay to ensure binary has rotated the image
390        await asyncio.sleep(0.2)
391        artwork_url = "http://127.0.0.1:12889/image/artwork"
392        self.logger.debug("Downloading artwork from %s", artwork_url)
393        try:
394            async with self.mass.http_session.get(
395                artwork_url, timeout=ClientTimeout(total=5)
396            ) as response:
397                if response.status == 200:
398                    img_data = await response.read()
399                    if img_data:
400                        self._artwork_bytes = img_data
401                        self._artwork_timestamp = int(time.time() * 1000)
402                        self.logger.info(
403                            "Artwork downloaded successfully, size: %d bytes", len(img_data)
404                        )
405
406                        image = MediaItemImage(
407                            type=ImageType.THUMB,
408                            path="artwork",
409                            provider=self.instance_id,
410                            remotely_accessible=False,
411                        )
412                        base_url = self.mass.metadata.get_image_url(image)
413
414                        if self._source_details.metadata:
415                            self._source_details.metadata.image_url = (
416                                f"{base_url}&t={self._artwork_timestamp}"
417                            )
418
419                        if self._source_details.in_use_by:
420                            self.mass.players.trigger_player_update(self._source_details.in_use_by)
421                else:
422                    self.logger.warning("Failed to download artwork: HTTP %s", response.status)
423        except Exception as e:
424            self.logger.debug("Failed to download artwork: %s", e)
425
426    async def resolve_image(self, path: str) -> bytes:
427        """Return raw image bytes to Music Assistant."""
428        if path == "artwork" and self._artwork_bytes:
429            return self._artwork_bytes
430        return b""
431
432    async def _read_stdout_to_queue(self) -> None:
433        """Background task to read from binary stdout and populate frame queue."""
434        frame_size = 3840  # 20ms of 48kHz stereo 16-bit
435
436        if not self._binary_process:
437            self.logger.error("Cannot read stdout: binary process not started")
438            return
439
440        self.logger.info("Starting to read audio from binary stdout")
441
442        try:
443            # Read from stdout in chunks
444            while not self._stop_called:
445                try:
446                    # Read exactly one frame from stdout
447                    data = await self._binary_process.read(frame_size)
448
449                    if not data:
450                        # Process ended or no more data
451                        self.logger.debug("Stdout closed or no data")
452                        break
453
454                    if len(data) < frame_size:
455                        # Incomplete frame, try to read remaining bytes
456                        remaining = frame_size - len(data)
457                        additional = await self._binary_process.read(remaining)
458                        if additional:
459                            data += additional
460
461                    # Add to queue
462                    self.frame_queue.append(data)
463                    self.frame_available.set()
464
465                except asyncio.CancelledError:
466                    break
467                except Exception as e:
468                    self.logger.debug("Error reading from stdout: %s", e)
469                    await asyncio.sleep(0.1)
470
471        except Exception as e:
472            self.logger.error("Fatal error in stdout reader: %s", e)
473        finally:
474            self.logger.info("Stdout reader task ended")
475
476    async def _read_stderr(self) -> None:
477        """Log errors from binary stderr."""
478        if not self._binary_process:
479            return
480        async for line in self._binary_process.iter_stderr():
481            self.logger.debug("[%s stderr] %s", self.name, line)
482
483    async def get_audio_stream(self, player_id: str) -> AsyncGenerator[bytes, None]:
484        """Return the custom audio stream for this source (like original ariacast_receiver)."""
485        self.logger.debug("Audio stream requested by player %s", player_id)
486
487        # Pre-buffering phase for high-latency players
488        min_buffer_size = int(self.max_frames * 0.6)  # Wait for 60% full buffer
489        self.logger.info("Pre-buffering: waiting for %d frames...", min_buffer_size)
490
491        buffer_start = time.time()
492        while len(self.frame_queue) < min_buffer_size and not self._stop_called:
493            if time.time() - buffer_start > 5:  # Timeout after 5 seconds
494                self.logger.warning(
495                    "Pre-buffering timeout, starting with %d frames", len(self.frame_queue)
496                )
497                break
498            await asyncio.sleep(0.05)
499
500        self.logger.info("Starting playback with %d frames buffered", len(self.frame_queue))
501
502        # Stream audio frames from the queue until playback stops
503        try:
504            while not self._stop_called:
505                # Stop if player was released (pause) or changed
506                if self._source_details.in_use_by != player_id:
507                    self.logger.debug("Player released or changed, stopping stream")
508                    break
509
510                if self.frame_queue:
511                    try:
512                        frame = self.frame_queue.popleft()
513                        yield frame
514                    except IndexError:
515                        # Queue became empty between the check and the pop
516                        continue
517                else:
518                    # No data available, wait for new frames or stop
519                    with suppress(asyncio.TimeoutError):
520                        await asyncio.wait_for(self.frame_available.wait(), timeout=1.0)
521                        # Only clear the event if the queue is still empty
522                        if not self.frame_queue:
523                            self.frame_available.clear()
524        finally:
525            self.logger.debug("Audio stream ended for player %s", player_id)
526            self.frame_queue.clear()
527
528    # --- Helpers ---
529
530    def _get_target_player_id(self) -> str | None:
531        """Find the best player to use."""
532        if self._active_player_id:
533            if self.mass.players.get_player(self._active_player_id):
534                return self._active_player_id
535            self._active_player_id = None
536
537        if self._default_player_id == PLAYER_ID_AUTO:
538            for player in self.mass.players.all_players(False, False):
539                if player.state.playback_state == PlaybackState.PLAYING:
540                    return player.player_id
541            players = list(self.mass.players.all_players(False, False))
542            return players[0].player_id if players else None
543
544        return str(self._default_player_id)
545
546    async def _on_source_selected(self) -> None:
547        """Handle manual selection in UI."""
548        new_player_id = self._source_details.in_use_by
549        if not new_player_id:
550            return
551
552        # Check if manual player switching is allowed
553        if not self._allow_player_switch:
554            current_target = self._get_target_player_id()
555            if new_player_id != current_target:
556                self.logger.debug(
557                    "Manual player switching disabled, ignoring selection on %s",
558                    new_player_id,
559                )
560                # Revert in_use_by
561                self._source_details.in_use_by = current_target
562                self.mass.players.trigger_player_update(new_player_id)
563                return
564
565        self._active_player_id = new_player_id
566