/
/
/
1"""AriaCast Receiver Plugin Provider."""
2
3from __future__ import annotations
4
5import asyncio
6import time
7from collections import deque
8from collections.abc import AsyncGenerator
9from contextlib import suppress
10from typing import TYPE_CHECKING, Any
11
12import aiohttp
13from aiohttp import ClientTimeout
14from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption
15from music_assistant_models.enums import (
16 ConfigEntryType,
17 ContentType,
18 ImageType,
19 PlaybackState,
20 ProviderFeature,
21 StreamType,
22)
23from music_assistant_models.media_items import AudioFormat, MediaItemImage
24from music_assistant_models.streamdetails import StreamMetadata
25
26from music_assistant.constants import CONF_ENTRY_WARN_PREVIEW
27from music_assistant.helpers.process import AsyncProcess
28from music_assistant.models.plugin import PluginProvider, PluginSource
29
30from .helpers import _get_binary_path
31
32if TYPE_CHECKING:
33 from music_assistant_models.config_entries import ConfigValueType, ProviderConfig
34 from music_assistant_models.provider import ProviderManifest
35
36 from music_assistant.mass import MusicAssistant
37 from music_assistant.models import ProviderInstanceType
38
# Keys under which this provider's settings are stored in its configuration.
CONF_MASS_PLAYER_ID = "mass_player_id"
CONF_ALLOW_PLAYER_SWITCH = "allow_player_switch"


# Option value offered as "Auto (prefer playing player)" in the player selector.
PLAYER_ID_AUTO = "__auto__"
# Feature set handed to the PluginProvider base class on construction.
SUPPORTED_FEATURES = {ProviderFeature.AUDIO_SOURCE}
45
46
async def setup(
    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
    """Create the AriaCast provider instance for the supplied configuration."""
    bridge = AriaCastBridge(mass, manifest, config)
    return bridge
52
53
async def get_config_entries(
    mass: MusicAssistant,
    instance_id: str | None = None,  # noqa: ARG001
    action: str | None = None,  # noqa: ARG001
    values: dict[str, ConfigValueType] | None = None,  # noqa: ARG001
) -> tuple[ConfigEntry, ...]:
    """Build the configuration entries shown when setting up this provider.

    Only ``mass`` is used: the player selector lists every player it knows,
    sorted case-insensitively by display name, after an "Auto" choice.
    ``instance_id``, ``action`` and ``values`` are accepted but not read.
    """
    known_players = sorted(
        mass.players.all_players(False, False),
        key=lambda player: player.display_name.lower(),
    )

    # The automatic choice always comes first, followed by the concrete players.
    player_options = [ConfigValueOption("Auto (prefer playing player)", PLAYER_ID_AUTO)]
    player_options.extend(
        ConfigValueOption(player.display_name, player.player_id) for player in known_players
    )

    player_entry = ConfigEntry(
        key=CONF_MASS_PLAYER_ID,
        type=ConfigEntryType.STRING,
        label="Connected Music Assistant Player",
        description="The player to use for playback.",
        default_value=PLAYER_ID_AUTO,
        options=player_options,
        required=True,
    )
    switch_entry = ConfigEntry(
        key=CONF_ALLOW_PLAYER_SWITCH,
        type=ConfigEntryType.BOOLEAN,
        label="Allow manual player switching",
        default_value=True,
    )
    return (CONF_ENTRY_WARN_PREVIEW, player_entry, switch_entry)
87
88
89class AriaCastBridge(PluginProvider):
90 """Bridge for the AriaCast Go Binary."""
91
92 def __init__(
93 self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
94 ) -> None:
95 """Initialize AriaCast Receiver."""
96 super().__init__(mass, manifest, config, SUPPORTED_FEATURES)
97 self._default_player_id = str(config.get_value(CONF_MASS_PLAYER_ID))
98 self._allow_player_switch = bool(config.get_value(CONF_ALLOW_PLAYER_SWITCH))
99
100 # Process
101 self._binary_process: AsyncProcess | None = None
102
103 # Internal State
104 self._active_player_id: str | None = None
105 self._metadata_task: asyncio.Task[None] | None = None
106 self._stdout_reader_task: asyncio.Task[None] | None = None
107 self._stop_called = False
108 self._binary_is_playing: bool = False # Track binary playback state
109 self._current_track_title: str | None = None # Track song changes
110
111 # Audio buffer - larger for high-latency players like Sendspin
112 self.max_frames = 75 # 1.5 second buffer (75 frames * 20ms each)
113 self.frame_queue: deque[bytes] = deque(maxlen=self.max_frames)
114 self.frame_available = asyncio.Event()
115 self._buffering = True # Start in buffering mode
116
117 # Artwork storage
118 self._artwork_bytes: bytes | None = None
119 self._artwork_timestamp: int = 0
120
121 # Define the Source
122 self._source_details = PluginSource(
123 id=self.instance_id,
124 name=self.name,
125 passive=not self._allow_player_switch,
126 can_play_pause=True, # Binary stops stdout writes when paused
127 can_seek=False,
128 can_next_previous=True,
129 audio_format=AudioFormat(
130 content_type=ContentType.PCM_S16LE,
131 sample_rate=48000,
132 bit_depth=16,
133 channels=2,
134 ),
135 metadata=StreamMetadata(title="AriaCast Ready"),
136 stream_type=StreamType.CUSTOM,
137 )
138
139 # Bind Hooks
140 self._source_details.on_select = self._on_source_selected
141 self._source_details.on_play = self._cmd_play
142 self._source_details.on_pause = self._cmd_pause
143 self._source_details.on_next = self._cmd_next
144 self._source_details.on_previous = self._cmd_previous
145
146 async def handle_async_init(self) -> None:
147 """Start the provider."""
148 # Launch Binary with stdout and stderr mode
149 binary_path = await asyncio.to_thread(_get_binary_path)
150 args = [binary_path, "--stdout"]
151
152 self.logger.info("Starting AriaCast binary: %s", binary_path)
153 self._binary_process = AsyncProcess(args, name="ariacast", stdout=True, stderr=True)
154 await self._binary_process.start()
155
156 # Start Metadata Monitor
157 self._metadata_task = self.mass.create_task(self._monitor_metadata())
158
159 # Start Stdout Reader (feeds the frame queue)
160 self._stdout_reader_task = self.mass.create_task(self._read_stdout_to_queue())
161
162 # Start Stderr Reader (logging)
163 self.mass.create_task(self._read_stderr())
164
165 async def unload(self, is_removed: bool = False) -> None:
166 """Cleanup resources."""
167 self._stop_called = True
168
169 if self._metadata_task:
170 self._metadata_task.cancel()
171 with suppress(asyncio.CancelledError):
172 await self._metadata_task
173
174 if self._stdout_reader_task:
175 self._stdout_reader_task.cancel()
176 with suppress(asyncio.CancelledError):
177 await self._stdout_reader_task
178
179 if self._binary_process:
180 self.logger.info("Stopping AriaCast binary...")
181 await self._binary_process.close()
182
183 def get_source(self) -> PluginSource:
184 """Return the plugin source details."""
185 return self._source_details
186
187 async def _monitor_metadata(self) -> None:
188 """Connect to local Go binary WebSocket to receive metadata updates."""
189 url = "ws://127.0.0.1:12889/metadata"
190 retry_delay = 1
191
192 while not self._stop_called:
193 try:
194 async with self.mass.http_session.ws_connect(url, heartbeat=30) as ws:
195 self.logger.info("Connected to AriaCast metadata stream")
196 retry_delay = 1 # Reset delay on success
197 async for msg in ws:
198 if msg.type == aiohttp.WSMsgType.TEXT:
199 payload = msg.json()
200 if payload.get("type") == "metadata":
201 self._update_metadata(payload.get("data", {}))
202 elif msg.type == aiohttp.WSMsgType.ERROR:
203 break
204 except Exception as exc:
205 if not self._stop_called:
206 self.logger.debug(
207 "WebSocket connection to AriaCast metadata failed: %s. Retrying in %d s...",
208 exc,
209 retry_delay,
210 )
211 await asyncio.sleep(retry_delay)
212 retry_delay = min(retry_delay * 2, 60)
213
214 def _update_metadata(self, data: dict[str, Any]) -> None:
215 """Update Music Assistant metadata from Go binary data."""
216 if not self._source_details.metadata:
217 self._source_details.metadata = StreamMetadata(title="AriaCast Ready")
218
219 meta = self._source_details.metadata
220
221 # Detect song change and clear queue to prevent stale audio
222 new_title = data.get("title", "Unknown")
223 self._handle_track_change(new_title)
224
225 meta.title = new_title
226 meta.artist = data.get("artist", "Unknown")
227 meta.album = data.get("album", "Unknown")
228
229 # Handle artwork
230 self._handle_artwork_update(data.get("artwork_url"), meta)
231
232 # Duration & Progress
233 if duration_ms := data.get("duration_ms"):
234 meta.duration = int(duration_ms / 1000)
235
236 if position_ms := data.get("position_ms"):
237 meta.elapsed_time = int(position_ms / 1000)
238 meta.elapsed_time_last_updated = time.time()
239
240 # Handle playback state
241 self._handle_playback_state_update(data.get("is_playing", False))
242
243 # Trigger UI Update
244 if self._source_details.in_use_by:
245 self.mass.players.trigger_player_update(self._source_details.in_use_by)
246
247 def _handle_track_change(self, new_title: str) -> None:
248 """Handle track change detection and queue clearing."""
249 if self._current_track_title and new_title != self._current_track_title:
250 if self._binary_is_playing: # Only clear on song change during playback
251 self.logger.info(
252 "Song changed from '%s' to '%s' - clearing audio queue",
253 self._current_track_title,
254 new_title,
255 )
256 self.frame_queue.clear()
257 self.frame_available.clear()
258 self._current_track_title = new_title
259
260 def _handle_artwork_update(self, artwork_url: str | None, meta: StreamMetadata) -> None:
261 """Handle artwork detection and download."""
262 if not artwork_url:
263 return
264
265 last_artwork_identifier = getattr(self, "_last_artwork_identifier", None)
266 if artwork_url != last_artwork_identifier:
267 # New artwork detected
268 self.logger.debug(
269 "New artwork detected: %s (was: %s)", artwork_url, last_artwork_identifier
270 )
271 self._last_artwork_identifier = artwork_url
272 # Clear old artwork data to prevent serving stale image
273 self._artwork_bytes = None
274 if meta:
275 meta.image_url = None
276 self.mass.create_task(self._download_artwork())
277
278 def _handle_playback_state_update(self, is_playing: bool) -> None:
279 """Handle binary playback state and player management."""
280 was_playing = self._binary_is_playing
281 self.logger.debug(
282 "Metadata update: is_playing=%s, was_playing=%s, active=%s, in_use=%s",
283 is_playing,
284 was_playing,
285 self._active_player_id,
286 self._source_details.in_use_by,
287 )
288
289 # Track binary state
290 self._binary_is_playing = is_playing
291
292 if is_playing and not self._source_details.in_use_by:
293 # Binary is playing but no player is consuming the stream
294 if self._active_player_id:
295 # Resume after pause - reclaim the same player
296 self.logger.info(
297 "App resumed playback, reclaiming player %s", self._active_player_id
298 )
299 # Clear queue before resuming to remove old silence/data
300 self.frame_queue.clear()
301 self.frame_available.clear()
302 self._source_details.in_use_by = self._active_player_id
303 self.mass.players.trigger_player_update(self._active_player_id)
304 self.mass.create_task(
305 self.mass.players.select_source(self._active_player_id, self.instance_id)
306 )
307 else:
308 # First time playing - auto-select a player
309 self._handle_auto_play()
310 elif not is_playing and was_playing and self._source_details.in_use_by:
311 # App paused playback - release the player
312 self.logger.info("App paused playback, releasing player")
313 self._active_player_id = self._source_details.in_use_by
314 self._source_details.in_use_by = None
315 # Clear queue to prevent old silence from accumulating
316 self.frame_queue.clear()
317 self.frame_available.clear()
318 self.mass.players.trigger_player_update(self._active_player_id)
319
320 def _handle_auto_play(self) -> None:
321 """Automatically select a player when music starts."""
322 target_id = self._get_target_player_id()
323 if target_id:
324 self._active_player_id = target_id
325 self._source_details.in_use_by = target_id
326 self.mass.create_task(self.mass.players.select_source(target_id, self.instance_id))
327
328 # --- Command Wrappers ---
329
330 async def _cmd_play(self) -> None:
331 """Send play command."""
332 self.logger.info("PLAY command")
333
334 # If player was released on pause, reclaim it
335 if not self._source_details.in_use_by and self._active_player_id:
336 # Clear queue before resuming to remove old silence/data
337 self.frame_queue.clear()
338 self.frame_available.clear()
339 self._source_details.in_use_by = self._active_player_id
340 self.mass.players.trigger_player_update(self._active_player_id)
341 # Restart playback on the player
342 await self.mass.players.select_source(self._active_player_id, self.instance_id)
343
344 await self._send_api_command("play")
345
346 async def _cmd_pause(self) -> None:
347 """Send pause command."""
348 self.logger.info("PAUSE command")
349
350 # Release the player (like Spotify Connect does) - this makes MA show it as idle
351 # Keep track of active_player_id so we can reclaim it on resume
352 if self._source_details.in_use_by:
353 self._active_player_id = self._source_details.in_use_by
354 self._source_details.in_use_by = None
355 self.mass.players.trigger_player_update(self._active_player_id)
356
357 # Clear the frame queue to prevent old silence from being played on resume
358 self.frame_queue.clear()
359 self.frame_available.clear()
360
361 await self._send_api_command("pause")
362
363 async def _cmd_next(self) -> None:
364 """Send next-track command."""
365 await self._send_api_command("next")
366
367 async def _cmd_previous(self) -> None:
368 """Send previous-track command."""
369 await self._send_api_command("previous")
370
371 async def _send_api_command(self, action: str) -> None:
372 """Send control command (POST) using shared session."""
373 url = "http://127.0.0.1:12889/api/command"
374 try:
375 async with self.mass.http_session.post(url, json={"action": action}) as response:
376 body = await response.text()
377 if not 200 <= response.status < 300:
378 self.logger.warning(
379 "Command '%s' failed with HTTP %s: %s",
380 action,
381 response.status,
382 body,
383 )
384 except Exception as e:
385 self.logger.warning("Failed to send command '%s': %s", action, e)
386
387 async def _download_artwork(self) -> None:
388 """Fetch artwork bytes from Go binary."""
389 # Add a small delay to ensure binary has rotated the image
390 await asyncio.sleep(0.2)
391 artwork_url = "http://127.0.0.1:12889/image/artwork"
392 self.logger.debug("Downloading artwork from %s", artwork_url)
393 try:
394 async with self.mass.http_session.get(
395 artwork_url, timeout=ClientTimeout(total=5)
396 ) as response:
397 if response.status == 200:
398 img_data = await response.read()
399 if img_data:
400 self._artwork_bytes = img_data
401 self._artwork_timestamp = int(time.time() * 1000)
402 self.logger.info(
403 "Artwork downloaded successfully, size: %d bytes", len(img_data)
404 )
405
406 image = MediaItemImage(
407 type=ImageType.THUMB,
408 path="artwork",
409 provider=self.instance_id,
410 remotely_accessible=False,
411 )
412 base_url = self.mass.metadata.get_image_url(image)
413
414 if self._source_details.metadata:
415 self._source_details.metadata.image_url = (
416 f"{base_url}&t={self._artwork_timestamp}"
417 )
418
419 if self._source_details.in_use_by:
420 self.mass.players.trigger_player_update(self._source_details.in_use_by)
421 else:
422 self.logger.warning("Failed to download artwork: HTTP %s", response.status)
423 except Exception as e:
424 self.logger.debug("Failed to download artwork: %s", e)
425
426 async def resolve_image(self, path: str) -> bytes:
427 """Return raw image bytes to Music Assistant."""
428 if path == "artwork" and self._artwork_bytes:
429 return self._artwork_bytes
430 return b""
431
432 async def _read_stdout_to_queue(self) -> None:
433 """Background task to read from binary stdout and populate frame queue."""
434 frame_size = 3840 # 20ms of 48kHz stereo 16-bit
435
436 if not self._binary_process:
437 self.logger.error("Cannot read stdout: binary process not started")
438 return
439
440 self.logger.info("Starting to read audio from binary stdout")
441
442 try:
443 # Read from stdout in chunks
444 while not self._stop_called:
445 try:
446 # Read exactly one frame from stdout
447 data = await self._binary_process.read(frame_size)
448
449 if not data:
450 # Process ended or no more data
451 self.logger.debug("Stdout closed or no data")
452 break
453
454 if len(data) < frame_size:
455 # Incomplete frame, try to read remaining bytes
456 remaining = frame_size - len(data)
457 additional = await self._binary_process.read(remaining)
458 if additional:
459 data += additional
460
461 # Add to queue
462 self.frame_queue.append(data)
463 self.frame_available.set()
464
465 except asyncio.CancelledError:
466 break
467 except Exception as e:
468 self.logger.debug("Error reading from stdout: %s", e)
469 await asyncio.sleep(0.1)
470
471 except Exception as e:
472 self.logger.error("Fatal error in stdout reader: %s", e)
473 finally:
474 self.logger.info("Stdout reader task ended")
475
476 async def _read_stderr(self) -> None:
477 """Log errors from binary stderr."""
478 if not self._binary_process:
479 return
480 async for line in self._binary_process.iter_stderr():
481 self.logger.debug("[%s stderr] %s", self.name, line)
482
483 async def get_audio_stream(self, player_id: str) -> AsyncGenerator[bytes, None]:
484 """Return the custom audio stream for this source (like original ariacast_receiver)."""
485 self.logger.debug("Audio stream requested by player %s", player_id)
486
487 # Pre-buffering phase for high-latency players
488 min_buffer_size = int(self.max_frames * 0.6) # Wait for 60% full buffer
489 self.logger.info("Pre-buffering: waiting for %d frames...", min_buffer_size)
490
491 buffer_start = time.time()
492 while len(self.frame_queue) < min_buffer_size and not self._stop_called:
493 if time.time() - buffer_start > 5: # Timeout after 5 seconds
494 self.logger.warning(
495 "Pre-buffering timeout, starting with %d frames", len(self.frame_queue)
496 )
497 break
498 await asyncio.sleep(0.05)
499
500 self.logger.info("Starting playback with %d frames buffered", len(self.frame_queue))
501
502 # Stream audio frames from the queue until playback stops
503 try:
504 while not self._stop_called:
505 # Stop if player was released (pause) or changed
506 if self._source_details.in_use_by != player_id:
507 self.logger.debug("Player released or changed, stopping stream")
508 break
509
510 if self.frame_queue:
511 try:
512 frame = self.frame_queue.popleft()
513 yield frame
514 except IndexError:
515 # Queue became empty between the check and the pop
516 continue
517 else:
518 # No data available, wait for new frames or stop
519 with suppress(asyncio.TimeoutError):
520 await asyncio.wait_for(self.frame_available.wait(), timeout=1.0)
521 # Only clear the event if the queue is still empty
522 if not self.frame_queue:
523 self.frame_available.clear()
524 finally:
525 self.logger.debug("Audio stream ended for player %s", player_id)
526 self.frame_queue.clear()
527
528 # --- Helpers ---
529
530 def _get_target_player_id(self) -> str | None:
531 """Find the best player to use."""
532 if self._active_player_id:
533 if self.mass.players.get_player(self._active_player_id):
534 return self._active_player_id
535 self._active_player_id = None
536
537 if self._default_player_id == PLAYER_ID_AUTO:
538 for player in self.mass.players.all_players(False, False):
539 if player.state.playback_state == PlaybackState.PLAYING:
540 return player.player_id
541 players = list(self.mass.players.all_players(False, False))
542 return players[0].player_id if players else None
543
544 return str(self._default_player_id)
545
546 async def _on_source_selected(self) -> None:
547 """Handle manual selection in UI."""
548 new_player_id = self._source_details.in_use_by
549 if not new_player_id:
550 return
551
552 # Check if manual player switching is allowed
553 if not self._allow_player_switch:
554 current_target = self._get_target_player_id()
555 if new_player_id != current_target:
556 self.logger.debug(
557 "Manual player switching disabled, ignoring selection on %s",
558 new_player_id,
559 )
560 # Revert in_use_by
561 self._source_details.in_use_by = current_target
562 self.mass.players.trigger_player_update(new_player_id)
563 return
564
565 self._active_player_id = new_player_id
566