/
/
/
1"""AriaCast Receiver Plugin Provider."""
2
3from __future__ import annotations
4
5import asyncio
6import hashlib
7import time
8from collections import deque
9from collections.abc import AsyncGenerator
10from contextlib import suppress
11from typing import TYPE_CHECKING, Any
12
13import aiohttp
14from aiohttp import ClientTimeout
15from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption
16from music_assistant_models.enums import (
17 ConfigEntryType,
18 ContentType,
19 ImageType,
20 PlaybackState,
21 ProviderFeature,
22 StreamType,
23)
24from music_assistant_models.media_items import AudioFormat, MediaItemImage
25from music_assistant_models.streamdetails import StreamMetadata
26
27from music_assistant.constants import CONF_ENTRY_WARN_PREVIEW
28from music_assistant.helpers.process import AsyncProcess
29from music_assistant.models.plugin import PluginProvider, PluginSource
30
31from .helpers import _get_binary_path
32
33if TYPE_CHECKING:
34 from music_assistant_models.config_entries import ConfigValueType, ProviderConfig
35 from music_assistant_models.provider import ProviderManifest
36
37 from music_assistant.mass import MusicAssistant
38 from music_assistant.models import ProviderInstanceType
39
40CONF_MASS_PLAYER_ID = "mass_player_id"
41CONF_ALLOW_PLAYER_SWITCH = "allow_player_switch"
42
43
44PLAYER_ID_AUTO = "__auto__"
45SUPPORTED_FEATURES = {ProviderFeature.AUDIO_SOURCE}
46
47
48async def setup(
49 mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
50) -> ProviderInstanceType:
51 """Initialize provider(instance) with given configuration."""
52 return AriaCastBridge(mass, manifest, config)
53
54
55async def get_config_entries(
56 mass: MusicAssistant,
57 instance_id: str | None = None, # noqa: ARG001
58 action: str | None = None, # noqa: ARG001
59 values: dict[str, ConfigValueType] | None = None, # noqa: ARG001
60) -> tuple[ConfigEntry, ...]:
61 """Return Config entries to setup this provider."""
62 return (
63 CONF_ENTRY_WARN_PREVIEW,
64 ConfigEntry(
65 key=CONF_MASS_PLAYER_ID,
66 type=ConfigEntryType.STRING,
67 label="Connected Music Assistant Player",
68 description="The player to use for playback.",
69 default_value=PLAYER_ID_AUTO,
70 options=[
71 ConfigValueOption("Auto (prefer playing player)", PLAYER_ID_AUTO),
72 *(
73 ConfigValueOption(x.display_name, x.player_id)
74 for x in sorted(
75 mass.players.all_players(False, False), key=lambda p: p.display_name.lower()
76 )
77 ),
78 ],
79 required=True,
80 ),
81 ConfigEntry(
82 key=CONF_ALLOW_PLAYER_SWITCH,
83 type=ConfigEntryType.BOOLEAN,
84 label="Allow manual player switching",
85 default_value=True,
86 ),
87 )
88
89
90class AriaCastBridge(PluginProvider):
91 """Bridge for the AriaCast Go Binary."""
92
93 def __init__(
94 self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
95 ) -> None:
96 """Initialize AriaCast Receiver."""
97 super().__init__(mass, manifest, config, SUPPORTED_FEATURES)
98 self._default_player_id = str(config.get_value(CONF_MASS_PLAYER_ID))
99 self._allow_player_switch = bool(config.get_value(CONF_ALLOW_PLAYER_SWITCH))
100
101 # Process
102 self._binary_process: AsyncProcess | None = None
103
104 # Internal State
105 self._active_player_id: str | None = None
106 self._metadata_task: asyncio.Task[None] | None = None
107 self._stdout_reader_task: asyncio.Task[None] | None = None
108 self._stop_called = False
109 self._binary_is_playing: bool = False # Track binary playback state
110 self._current_track_title: str | None = None # Track song changes
111
112 # Audio buffer - larger for high-latency players like Sendspin
113 self.max_frames = 75 # 1.5 second buffer (75 frames * 20ms each)
114 self.frame_queue: deque[bytes] = deque(maxlen=self.max_frames)
115 self.frame_available = asyncio.Event()
116 self._buffering = True # Start in buffering mode
117
118 # Artwork storage
119 self._artwork_bytes: bytes | None = None
120 self._artwork_timestamp: int = 0
121
122 # Define the Source
123 self._source_details = PluginSource(
124 id=self.instance_id,
125 name=self.name,
126 passive=not self._allow_player_switch,
127 can_play_pause=True, # Binary stops stdout writes when paused
128 can_seek=False,
129 can_next_previous=True,
130 audio_format=AudioFormat(
131 content_type=ContentType.PCM_S16LE,
132 sample_rate=48000,
133 bit_depth=16,
134 channels=2,
135 ),
136 metadata=StreamMetadata(title="AriaCast Ready"),
137 stream_type=StreamType.CUSTOM,
138 )
139
140 # Bind Hooks
141 self._source_details.on_select = self._on_source_selected
142 self._source_details.on_play = self._cmd_play
143 self._source_details.on_pause = self._cmd_pause
144 self._source_details.on_next = self._cmd_next
145 self._source_details.on_previous = self._cmd_previous
146
147 async def handle_async_init(self) -> None:
148 """Start the provider."""
149 # Launch Binary with stdout and stderr mode
150 binary_path = await asyncio.to_thread(_get_binary_path)
151 args = [binary_path, "--stdout"]
152
153 self.logger.info("Starting AriaCast binary: %s", binary_path)
154 self._binary_process = AsyncProcess(args, name="ariacast", stdout=True, stderr=True)
155 await self._binary_process.start()
156
157 # Start Metadata Monitor
158 self._metadata_task = self.mass.create_task(self._monitor_metadata())
159
160 # Start Stdout Reader (feeds the frame queue)
161 self._stdout_reader_task = self.mass.create_task(self._read_stdout_to_queue())
162
163 # Start Stderr Reader (logging)
164 self.mass.create_task(self._read_stderr())
165
166 async def unload(self, is_removed: bool = False) -> None:
167 """Cleanup resources."""
168 self._stop_called = True
169
170 if self._metadata_task:
171 self._metadata_task.cancel()
172 with suppress(asyncio.CancelledError):
173 await self._metadata_task
174
175 if self._stdout_reader_task:
176 self._stdout_reader_task.cancel()
177 with suppress(asyncio.CancelledError):
178 await self._stdout_reader_task
179
180 if self._binary_process:
181 self.logger.info("Stopping AriaCast binary...")
182 await self._binary_process.close()
183
184 def get_source(self) -> PluginSource:
185 """Return the plugin source details."""
186 return self._source_details
187
188 async def _monitor_metadata(self) -> None:
189 """Connect to local Go binary WebSocket to receive metadata updates."""
190 url = "ws://127.0.0.1:12889/metadata"
191 retry_delay = 1
192
193 while not self._stop_called:
194 try:
195 async with self.mass.http_session.ws_connect(url, heartbeat=30) as ws:
196 self.logger.info("Connected to AriaCast metadata stream")
197 retry_delay = 1 # Reset delay on success
198 async for msg in ws:
199 if msg.type == aiohttp.WSMsgType.TEXT:
200 payload = msg.json()
201 if payload.get("type") == "metadata":
202 self._update_metadata(payload.get("data", {}))
203 elif msg.type == aiohttp.WSMsgType.ERROR:
204 break
205 except Exception as exc:
206 if not self._stop_called:
207 self.logger.debug(
208 "WebSocket connection to AriaCast metadata failed: %s. Retrying in %d s...",
209 exc,
210 retry_delay,
211 )
212 await asyncio.sleep(retry_delay)
213 retry_delay = min(retry_delay * 2, 60)
214
215 def _update_metadata(self, data: dict[str, Any]) -> None:
216 """Update Music Assistant metadata from Go binary data."""
217 if not self._source_details.metadata:
218 self._source_details.metadata = StreamMetadata(title="AriaCast Ready")
219
220 meta = self._source_details.metadata
221
222 # Detect song change and clear queue to prevent stale audio
223 new_title = data.get("title", "Unknown")
224 self._handle_track_change(new_title)
225
226 meta.title = new_title
227 meta.artist = data.get("artist", "Unknown")
228 meta.album = data.get("album", "Unknown")
229
230 # Handle artwork
231 self._handle_artwork_update(data.get("artwork_url"), meta)
232
233 # Duration & Progress
234 if duration_ms := data.get("duration_ms"):
235 meta.duration = int(duration_ms / 1000)
236
237 if position_ms := data.get("position_ms"):
238 meta.elapsed_time = int(position_ms / 1000)
239 meta.elapsed_time_last_updated = time.time()
240
241 # Handle playback state
242 self._handle_playback_state_update(data.get("is_playing", False))
243
244 # Trigger UI Update
245 if self._source_details.in_use_by:
246 self.mass.players.trigger_player_update(self._source_details.in_use_by)
247
248 def _handle_track_change(self, new_title: str) -> None:
249 """Handle track change detection and queue clearing."""
250 if self._current_track_title and new_title != self._current_track_title:
251 if self._binary_is_playing: # Only clear on song change during playback
252 self.logger.info(
253 "Song changed from '%s' to '%s' - clearing audio queue",
254 self._current_track_title,
255 new_title,
256 )
257 self.frame_queue.clear()
258 self.frame_available.clear()
259 self._current_track_title = new_title
260
261 def _handle_artwork_update(self, artwork_url: str | None, meta: StreamMetadata) -> None:
262 """Handle artwork detection and download."""
263 if not artwork_url:
264 return
265
266 # The binary often sends a static local URL (like http://127.0.0.1/image/artwork).
267 # We combine it with the track title to detect actual song/artwork changes.
268 current_identifier = f"{artwork_url}_{meta.title}_{meta.artist}"
269
270 last_artwork_identifier = getattr(self, "_last_artwork_identifier", None)
271 if current_identifier != last_artwork_identifier:
272 # New artwork detected
273 self.logger.debug(
274 "New artwork detected for track: %s (was: %s)", meta.title, last_artwork_identifier
275 )
276 self._last_artwork_identifier = current_identifier
277 # Clear old artwork data to prevent serving stale image
278 self._artwork_bytes = None
279 if meta:
280 meta.image_url = None
281 self.mass.create_task(self._download_artwork())
282
283 def _handle_playback_state_update(self, is_playing: bool) -> None:
284 """Handle binary playback state and player management."""
285 was_playing = self._binary_is_playing
286 self.logger.debug(
287 "Metadata update: is_playing=%s, was_playing=%s, active=%s, in_use=%s",
288 is_playing,
289 was_playing,
290 self._active_player_id,
291 self._source_details.in_use_by,
292 )
293
294 # Track binary state
295 self._binary_is_playing = is_playing
296
297 if is_playing and not self._source_details.in_use_by:
298 # Binary is playing but no player is consuming the stream
299 if self._active_player_id:
300 # Resume after pause - reclaim the same player
301 self.logger.info(
302 "App resumed playback, reclaiming player %s", self._active_player_id
303 )
304 # Clear queue before resuming to remove old silence/data
305 self.frame_queue.clear()
306 self.frame_available.clear()
307 self._source_details.in_use_by = self._active_player_id
308 self.mass.players.trigger_player_update(self._active_player_id)
309 self.mass.create_task(
310 self.mass.players.select_source(self._active_player_id, self.instance_id)
311 )
312 else:
313 # First time playing - auto-select a player
314 self._handle_auto_play()
315 elif not is_playing and was_playing and self._source_details.in_use_by:
316 # App paused playback - release the player
317 self.logger.info("App paused playback, releasing player")
318 self._active_player_id = self._source_details.in_use_by
319 self._source_details.in_use_by = None
320 # Clear queue to prevent old silence from accumulating
321 self.frame_queue.clear()
322 self.frame_available.clear()
323 self.mass.players.trigger_player_update(self._active_player_id)
324
325 def _handle_auto_play(self) -> None:
326 """Automatically select a player when music starts."""
327 target_id = self._get_target_player_id()
328 if target_id:
329 self._active_player_id = target_id
330 self._source_details.in_use_by = target_id
331 self.mass.create_task(self.mass.players.select_source(target_id, self.instance_id))
332
333 # --- Command Wrappers ---
334
335 async def _cmd_play(self) -> None:
336 """Send play command."""
337 self.logger.info("PLAY command")
338
339 # If player was released on pause, reclaim it
340 if not self._source_details.in_use_by and self._active_player_id:
341 # Clear queue before resuming to remove old silence/data
342 self.frame_queue.clear()
343 self.frame_available.clear()
344 self._source_details.in_use_by = self._active_player_id
345 self.mass.players.trigger_player_update(self._active_player_id)
346 # Restart playback on the player
347 await self.mass.players.select_source(self._active_player_id, self.instance_id)
348
349 await self._send_api_command("play")
350
351 async def _cmd_pause(self) -> None:
352 """Send pause command."""
353 self.logger.info("PAUSE command")
354
355 # Release the player (like Spotify Connect does) - this makes MA show it as idle
356 # Keep track of active_player_id so we can reclaim it on resume
357 if self._source_details.in_use_by:
358 self._active_player_id = self._source_details.in_use_by
359 self._source_details.in_use_by = None
360 self.mass.players.trigger_player_update(self._active_player_id)
361
362 # Clear the frame queue to prevent old silence from being played on resume
363 self.frame_queue.clear()
364 self.frame_available.clear()
365
366 await self._send_api_command("pause")
367
368 async def _cmd_next(self) -> None:
369 """Send next-track command."""
370 await self._send_api_command("next")
371
372 async def _cmd_previous(self) -> None:
373 """Send previous-track command."""
374 await self._send_api_command("previous")
375
376 async def _send_api_command(self, action: str) -> None:
377 """Send control command (POST) using shared session."""
378 url = "http://127.0.0.1:12889/api/command"
379 try:
380 async with self.mass.http_session.post(url, json={"action": action}) as response:
381 body = await response.text()
382 if not 200 <= response.status < 300:
383 self.logger.warning(
384 "Command '%s' failed with HTTP %s: %s",
385 action,
386 response.status,
387 body,
388 )
389 except Exception as e:
390 self.logger.warning("Failed to send command '%s': %s", action, e)
391
392 async def _download_artwork(self) -> None:
393 """Fetch artwork bytes from Go binary."""
394 # Add a small delay to ensure binary has rotated the image
395 await asyncio.sleep(0.2)
396 artwork_url = "http://127.0.0.1:12889/image/artwork"
397 self.logger.debug("Downloading artwork from %s", artwork_url)
398 try:
399 async with self.mass.http_session.get(
400 artwork_url, timeout=ClientTimeout(total=5)
401 ) as response:
402 if response.status == 200:
403 img_data = await response.read()
404 if img_data:
405 self._artwork_bytes = img_data
406 self._artwork_timestamp = int(time.time() * 1000)
407 self.logger.info(
408 "Artwork downloaded successfully, size: %d bytes", len(img_data)
409 )
410 # Use a content-derived hash to prevent unbounded cache growth
411 img_hash = hashlib.md5(img_data).hexdigest()[:8]
412 image = MediaItemImage(
413 type=ImageType.THUMB,
414 path=f"artwork_{img_hash}",
415 provider=self.instance_id,
416 remotely_accessible=False,
417 )
418
419 if self._source_details.metadata:
420 self._source_details.metadata.image_url = (
421 self.mass.metadata.get_image_url(image)
422 )
423
424 if self._source_details.in_use_by:
425 self.mass.players.trigger_player_update(self._source_details.in_use_by)
426 else:
427 self.logger.warning("Failed to download artwork: HTTP %s", response.status)
428 except Exception as e:
429 self.logger.debug("Failed to download artwork: %s", e)
430
431 async def resolve_image(self, path: str) -> bytes:
432 """Return raw image bytes to Music Assistant."""
433 if path.startswith("artwork") and self._artwork_bytes:
434 return self._artwork_bytes
435 return b""
436
437 async def _read_stdout_to_queue(self) -> None:
438 """Background task to read from binary stdout and populate frame queue."""
439 frame_size = 3840 # 20ms of 48kHz stereo 16-bit
440
441 if not self._binary_process:
442 self.logger.error("Cannot read stdout: binary process not started")
443 return
444
445 self.logger.info("Starting to read audio from binary stdout")
446
447 try:
448 # Read from stdout in chunks
449 while not self._stop_called:
450 try:
451 # Read exactly one frame from stdout
452 data = await self._binary_process.read(frame_size)
453
454 if not data:
455 # Process ended or no more data
456 self.logger.debug("Stdout closed or no data")
457 break
458
459 if len(data) < frame_size:
460 # Incomplete frame, try to read remaining bytes
461 remaining = frame_size - len(data)
462 additional = await self._binary_process.read(remaining)
463 if additional:
464 data += additional
465
466 # Add to queue
467 self.frame_queue.append(data)
468 self.frame_available.set()
469
470 except asyncio.CancelledError:
471 break
472 except Exception as e:
473 self.logger.debug("Error reading from stdout: %s", e)
474 await asyncio.sleep(0.1)
475
476 except Exception as e:
477 self.logger.error("Fatal error in stdout reader: %s", e)
478 finally:
479 self.logger.info("Stdout reader task ended")
480
481 async def _read_stderr(self) -> None:
482 """Log errors from binary stderr."""
483 if not self._binary_process:
484 return
485 async for line in self._binary_process.iter_stderr():
486 self.logger.debug("[%s stderr] %s", self.name, line)
487
488 async def get_audio_stream(self, player_id: str) -> AsyncGenerator[bytes, None]:
489 """Return the custom audio stream for this source (like original ariacast_receiver)."""
490 self.logger.debug("Audio stream requested by player %s", player_id)
491
492 # Pre-buffering phase for high-latency players
493 min_buffer_size = int(self.max_frames * 0.6) # Wait for 60% full buffer
494 self.logger.info("Pre-buffering: waiting for %d frames...", min_buffer_size)
495
496 buffer_start = time.time()
497 while len(self.frame_queue) < min_buffer_size and not self._stop_called:
498 if time.time() - buffer_start > 5: # Timeout after 5 seconds
499 self.logger.warning(
500 "Pre-buffering timeout, starting with %d frames", len(self.frame_queue)
501 )
502 break
503 await asyncio.sleep(0.05)
504
505 self.logger.info("Starting playback with %d frames buffered", len(self.frame_queue))
506
507 # Stream audio frames from the queue until playback stops
508 try:
509 while not self._stop_called:
510 # Stop if player was released (pause) or changed
511 if self._source_details.in_use_by != player_id:
512 self.logger.debug("Player released or changed, stopping stream")
513 break
514
515 if self.frame_queue:
516 try:
517 frame = self.frame_queue.popleft()
518 yield frame
519 except IndexError:
520 # Queue became empty between the check and the pop
521 continue
522 else:
523 # No data available, wait for new frames or stop
524 with suppress(asyncio.TimeoutError):
525 await asyncio.wait_for(self.frame_available.wait(), timeout=1.0)
526 # Only clear the event if the queue is still empty
527 if not self.frame_queue:
528 self.frame_available.clear()
529 finally:
530 self.logger.debug("Audio stream ended for player %s", player_id)
531 self.frame_queue.clear()
532
533 # --- Helpers ---
534
535 def _get_target_player_id(self) -> str | None:
536 """Find the best player to use."""
537 if self._active_player_id:
538 if self.mass.players.get_player(self._active_player_id):
539 return self._active_player_id
540 self._active_player_id = None
541
542 if self._default_player_id == PLAYER_ID_AUTO:
543 for player in self.mass.players.all_players(False, False):
544 if player.state.playback_state == PlaybackState.PLAYING:
545 return player.player_id
546 players = list(self.mass.players.all_players(False, False))
547 return players[0].player_id if players else None
548
549 return str(self._default_player_id)
550
551 async def _on_source_selected(self) -> None:
552 """Handle manual selection in UI."""
553 new_player_id = self._source_details.in_use_by
554 if not new_player_id:
555 return
556
557 # Check if manual player switching is allowed
558 if not self._allow_player_switch:
559 current_target = self._get_target_player_id()
560 if new_player_id != current_target:
561 self.logger.debug(
562 "Manual player switching disabled, ignoring selection on %s",
563 new_player_id,
564 )
565 # Revert in_use_by
566 self._source_details.in_use_by = current_target
567 self.mass.players.trigger_player_update(new_player_id)
568 return
569
570 self._active_player_id = new_player_id
571