/
/
/
1"""Sendspin Player implementation."""
2
3from __future__ import annotations
4
5import asyncio
6import time
7from collections.abc import AsyncGenerator, Callable
8from io import BytesIO
9from typing import TYPE_CHECKING, cast
10
11from aiosendspin.models import MediaCommand
12from aiosendspin.models.types import ArtworkSource, PlaybackStateType
13from aiosendspin.models.types import RepeatMode as SendspinRepeatMode
14from aiosendspin.server import AudioFormat as SendspinAudioFormat
15from aiosendspin.server import (
16 ClientEvent,
17 GroupCommandEvent,
18 GroupEvent,
19 GroupStateChangedEvent,
20 SendspinGroup,
21 VolumeChangedEvent,
22)
23from aiosendspin.server.client import DisconnectBehaviour
24from aiosendspin.server.events import ClientGroupChangedEvent
25from aiosendspin.server.group import (
26 GroupDeletedEvent,
27 GroupMemberAddedEvent,
28 GroupMemberRemovedEvent,
29)
30from aiosendspin.server.metadata import Metadata
31from aiosendspin.server.stream import AudioCodec, MediaStream
32from music_assistant_models.constants import PLAYER_CONTROL_NONE
33from music_assistant_models.enums import (
34 ContentType,
35 ImageType,
36 PlaybackState,
37 PlayerFeature,
38 PlayerType,
39 RepeatMode,
40)
41from music_assistant_models.media_items import AudioFormat
42from music_assistant_models.player import DeviceInfo
43from PIL import Image
44
45from music_assistant.constants import CONF_OUTPUT_CHANNELS, CONF_OUTPUT_CODEC, INTERNAL_PCM_FORMAT
46from music_assistant.helpers.audio import get_player_filter_params
47from music_assistant.models.player import Player, PlayerMedia
48
49from .timed_client_stream import TimedClientStream
50
51# Supported group commands for Sendspin players
52SUPPORTED_GROUP_COMMANDS = [
53 MediaCommand.PLAY,
54 MediaCommand.PAUSE,
55 MediaCommand.STOP,
56 MediaCommand.NEXT,
57 MediaCommand.PREVIOUS,
58 MediaCommand.REPEAT_OFF,
59 MediaCommand.REPEAT_ONE,
60 MediaCommand.REPEAT_ALL,
61 MediaCommand.SHUFFLE,
62 MediaCommand.UNSHUFFLE,
63]
64
65if TYPE_CHECKING:
66 from aiosendspin.server.client import SendspinClient
67 from music_assistant_models.player_queue import PlayerQueue
68 from music_assistant_models.queue_item import QueueItem
69
70 from .provider import SendspinProvider
71
72
73class MusicAssistantMediaStream(MediaStream):
74 """MediaStream implementation for Music Assistant with per-player DSP support."""
75
76 player_instance: SendspinPlayer
77 internal_format: AudioFormat
78 output_format: AudioFormat
79
80 def __init__(
81 self,
82 *,
83 main_channel_source: AsyncGenerator[bytes, None],
84 main_channel_format: SendspinAudioFormat,
85 player_instance: SendspinPlayer,
86 internal_format: AudioFormat,
87 output_format: AudioFormat,
88 ) -> None:
89 """
90 Initialise the media stream with audio source and format for main_channel().
91
92 Args:
93 main_channel_source: Audio source generator for the main channel.
94 main_channel_format: Audio format for the main channel (includes codec).
95 player_instance: The SendspinPlayer instance for accessing mass and streams.
96 internal_format: Internal processing format (float32 for headroom).
97 output_format: Output PCM format (16-bit for player output).
98 """
99 super().__init__(
100 main_channel_source=main_channel_source,
101 main_channel_format=main_channel_format,
102 )
103 self.player_instance = player_instance
104 self.internal_format = internal_format
105 self.output_format = output_format
106
107 async def player_channel(
108 self,
109 player_id: str,
110 preferred_format: SendspinAudioFormat | None = None,
111 position_us: int = 0,
112 ) -> tuple[AsyncGenerator[bytes, None], SendspinAudioFormat, int] | None:
113 """
114 Get a player-specific audio stream with per-player DSP.
115
116 Args:
117 player_id: Identifier for the player requesting the stream.
118 preferred_format: The player's preferred native format for the stream.
119 The implementation may return a different format; the library
120 will handle any necessary conversion.
121 position_us: Position in microseconds relative to the main_stream start.
122 Used for late-joining players to sync with the main stream.
123
124 Returns:
125 A tuple of (audio generator, audio format, actual position in microseconds)
126 or None if unavailable. If None, the main_stream is used as fallback.
127 """
128 mass = self.player_instance.mass
129 multi_client_stream = self.player_instance.timed_client_stream
130 assert multi_client_stream is not None
131
132 dsp = mass.config.get_player_dsp_config(player_id)
133 output_channels = mass.config.get_raw_player_config_value(
134 player_id, CONF_OUTPUT_CHANNELS, "stereo"
135 )
136 if not dsp.enabled and output_channels == "stereo":
137 # DSP is disabled and output is stereo, use main_stream
138 return None
139
140 # Get per-player DSP filter parameters
141 filter_params = get_player_filter_params(
142 mass, player_id, self.internal_format, self.output_format
143 )
144
145 # Get the stream with position (in seconds)
146 stream_gen, actual_position = await multi_client_stream.get_stream(
147 output_format=self.output_format,
148 filter_params=filter_params,
149 )
150
151 # Convert position from seconds to microseconds for aiosendspin API
152 actual_position_us = int(actual_position * 1_000_000)
153
154 # Return actual position in microseconds relative to main_stream start
155 self.player_instance.logger.debug(
156 "Providing channel stream for player %s at position %d us",
157 player_id,
158 actual_position_us,
159 )
160 return (
161 stream_gen,
162 SendspinAudioFormat(
163 sample_rate=self.output_format.sample_rate,
164 bit_depth=self.output_format.bit_depth,
165 channels=self.output_format.channels,
166 codec=self._main_channel_format.codec,
167 ),
168 actual_position_us,
169 )
170
171
172class SendspinPlayer(Player):
173 """A sendspin audio player in Music Assistant."""
174
175 _attr_type = PlayerType.PROTOCOL
176
177 api: SendspinClient
178 unsub_event_cb: Callable[[], None]
179 unsub_group_event_cb: Callable[[], None]
180 last_sent_artwork_url: str | None = None
181 last_sent_artist_artwork_url: str | None = None
182 _playback_task: asyncio.Task[None] | None = None
183 timed_client_stream: TimedClientStream | None = None
184 is_web_player: bool = False
185
186 @property
187 def requires_flow_mode(self) -> bool:
188 """Return if the player requires flow mode."""
189 return True
190
191 def __init__(self, provider: SendspinProvider, player_id: str) -> None:
192 """Initialize the Player."""
193 super().__init__(provider, player_id)
194 sendspin_client = provider.server_api.get_client(player_id)
195 assert sendspin_client is not None
196 self.api = sendspin_client
197 self.api.disconnect_behaviour = DisconnectBehaviour.STOP
198 self.unsub_event_cb = sendspin_client.add_event_listener(self.event_cb)
199 self.unsub_group_event_cb = sendspin_client.group.add_event_listener(self.group_event_cb)
200 sendspin_client.group.set_supported_commands(SUPPORTED_GROUP_COMMANDS)
201
202 self.logger = self.provider.logger.getChild(player_id)
203 # init some static variables
204 self._attr_name = sendspin_client.name
205 self._attr_supported_features = {
206 PlayerFeature.PLAY_MEDIA,
207 PlayerFeature.SET_MEMBERS,
208 PlayerFeature.MULTI_DEVICE_DSP,
209 PlayerFeature.VOLUME_SET,
210 PlayerFeature.VOLUME_MUTE,
211 }
212 self._attr_can_group_with = {provider.instance_id}
213 self._attr_power_control = PLAYER_CONTROL_NONE
214 if device_info := sendspin_client.info.device_info:
215 self._attr_device_info = DeviceInfo(
216 model=device_info.product_name or "Unknown model",
217 manufacturer=device_info.manufacturer or "Unknown Manufacturer",
218 software_version=device_info.software_version,
219 )
220 else:
221 self._attr_device_info = DeviceInfo()
222 if player_client := sendspin_client.player:
223 self._attr_volume_level = player_client.volume
224 self._attr_volume_muted = player_client.muted
225 self._attr_available = True
226 self.is_web_player = sendspin_client.name.startswith(
227 "Web (" # The regular Web Interface
228 ) or sendspin_client.name.startswith(
229 "PWA (" # The PWA App
230 )
231 self._attr_expose_to_ha_by_default = not self.is_web_player
232 self._attr_hidden_by_default = self.is_web_player
233 self._attr_type = PlayerType.PLAYER if self.is_web_player else PlayerType.PROTOCOL
234
235 def event_cb(self, client: SendspinClient, event: ClientEvent) -> None:
236 """Event callback registered to the sendspin server."""
237 self.logger.debug("Received PlayerEvent: %s", event)
238 match event:
239 case VolumeChangedEvent(volume=volume, muted=muted):
240 self._attr_volume_level = volume
241 self._attr_volume_muted = muted
242 self.update_state()
243 case ClientGroupChangedEvent(new_group=new_group):
244 self.unsub_group_event_cb()
245 self.unsub_group_event_cb = new_group.add_event_listener(self.group_event_cb)
246 # Sync playback state from the new group
247 match new_group.state:
248 case PlaybackStateType.PLAYING:
249 self._attr_playback_state = PlaybackState.PLAYING
250 case PlaybackStateType.PAUSED:
251 self._attr_playback_state = PlaybackState.PAUSED
252 case PlaybackStateType.STOPPED:
253 self._attr_playback_state = PlaybackState.IDLE
254 # Update in case this is a newly created group
255 new_group.set_supported_commands(SUPPORTED_GROUP_COMMANDS)
256 # GroupMemberAddedEvent or GroupMemberRemovedEvent will be fired before this
257 # so group members are already up to date at this point
258 if self.synced_to is None:
259 # We are the leader, stop on disconnect
260 self.api.disconnect_behaviour = DisconnectBehaviour.STOP
261 else:
262 self.api.disconnect_behaviour = DisconnectBehaviour.UNGROUP
263 self.update_state()
264
265 async def _handle_group_command(self, command: MediaCommand) -> None:
266 """Handle a group command from aiosendspin."""
267 queue = self.mass.player_queues.get_active_queue(self.player_id)
268 match command:
269 case MediaCommand.PLAY:
270 await self.mass.players.cmd_play(self.player_id)
271 case MediaCommand.PAUSE:
272 await self.mass.players.cmd_pause(self.player_id)
273 case MediaCommand.STOP:
274 await self.mass.players.cmd_stop(self.player_id)
275 case MediaCommand.NEXT:
276 await self.mass.players.cmd_next_track(self.player_id)
277 case MediaCommand.PREVIOUS:
278 await self.mass.players.cmd_previous_track(self.player_id)
279 case MediaCommand.REPEAT_OFF if queue:
280 self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.OFF)
281 case MediaCommand.REPEAT_ONE if queue:
282 self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.ONE)
283 case MediaCommand.REPEAT_ALL if queue:
284 self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.ALL)
285 case MediaCommand.SHUFFLE if queue:
286 await self.mass.player_queues.set_shuffle(queue.queue_id, shuffle_enabled=True)
287 case MediaCommand.UNSHUFFLE if queue:
288 await self.mass.player_queues.set_shuffle(queue.queue_id, shuffle_enabled=False)
289
290 def group_event_cb(self, group: SendspinGroup, event: GroupEvent) -> None:
291 """Event callback registered to the sendspin group this player belongs to."""
292 if self.synced_to is not None:
293 # Only handle group events as the leader, except for:
294 # - GroupMemberRemovedEvent: to handle being removed from a group
295 # - GroupStateChangedEvent: to update playback state when leader stops/disconnects
296 if not isinstance(event, (GroupMemberRemovedEvent, GroupStateChangedEvent)):
297 return
298 self.logger.debug("Received GroupEvent: %s", event)
299
300 match event:
301 case GroupCommandEvent(command=command):
302 self.logger.debug("Group command received: %s", command)
303 self.mass.create_task(self._handle_group_command(command))
304 case GroupStateChangedEvent(state=state):
305 self.logger.debug("Group state changed to: %s", state)
306 match state:
307 case PlaybackStateType.PLAYING:
308 self._attr_playback_state = PlaybackState.PLAYING
309 case PlaybackStateType.PAUSED:
310 self._attr_playback_state = PlaybackState.PAUSED
311 case PlaybackStateType.STOPPED:
312 self._attr_playback_state = PlaybackState.IDLE
313 self._attr_elapsed_time = 0
314 self._attr_elapsed_time_last_updated = time.time()
315 self.update_state()
316 case GroupMemberAddedEvent(client_id=client_id):
317 self.logger.debug("Group member added: %s", client_id)
318 if client_id not in self._attr_group_members:
319 self._attr_group_members.append(client_id)
320 self.update_state()
321 case GroupMemberRemovedEvent(client_id=client_id):
322 self.logger.debug("Group member removed: %s", client_id)
323 self.mass.create_task(self._handle_member_removed(group, client_id))
324 case GroupDeletedEvent():
325 pass
326
327 async def _handle_member_removed(self, group: SendspinGroup, client_id: str) -> None:
328 """Handle group member removed event asynchronously."""
329 if client_id == self.player_id:
330 if len(self._attr_group_members) > 0:
331 # We were just removed as a leader:
332 # 1. stop playback on the old group
333 await group.stop()
334 # 2. clear our members (since we are now alone)
335 group_members = [
336 member for member in self._attr_group_members if member != client_id
337 ]
338 self._attr_group_members = []
339 # 3. assign new leader if there are members left
340 if len(group_members) > 0 and (
341 new_leader := self.mass.players.get_player(group_members[0])
342 ):
343 new_leader = cast("SendspinPlayer", new_leader)
344 new_leader._attr_group_members = group_members[1:]
345 new_leader.api.disconnect_behaviour = DisconnectBehaviour.STOP
346 new_leader.update_state()
347 self.update_state()
348 elif client_id in self._attr_group_members:
349 # Someone else left our group
350 self._attr_group_members.remove(client_id)
351 self.update_state()
352
353 async def volume_set(self, volume_level: int) -> None:
354 """Handle VOLUME_SET command on the player."""
355 if player_client := self.api.player:
356 player_client.set_volume(volume_level)
357
358 async def volume_mute(self, muted: bool) -> None:
359 """Handle VOLUME MUTE command on the player."""
360 if player_client := self.api.player:
361 if muted:
362 player_client.mute()
363 else:
364 player_client.unmute()
365
366 async def stop(self) -> None:
367 """Stop command."""
368 self.logger.debug("Received STOP command on player %s", self.display_name)
369 # We don't care if we stopped the stream or it was already stopped
370 await self.api.group.stop()
371 # Clear the playback task reference (group.stop() handles stopping the stream)
372 self._playback_task = None
373 self._attr_current_media = None
374 self.update_state()
375
376 async def play_media(self, media: PlayerMedia) -> None:
377 """Play media command."""
378 self.logger.debug(
379 "Received PLAY_MEDIA command on player %s with uri %s", self.display_name, media.uri
380 )
381
382 # Update player state optimistically
383 self._attr_current_media = media
384 self._attr_elapsed_time = 0
385 self._attr_elapsed_time_last_updated = time.time()
386 # playback_state will be set by the group state change event
387
388 # Stop previous stream in case we were already playing something
389 await self.api.group.stop()
390 # Run playback in background task to immediately return
391 self._playback_task = asyncio.create_task(self._run_playback(media))
392 self.update_state()
393
394 async def _run_playback(self, media: PlayerMedia) -> None:
395 """Run the actual playback in a background task."""
396 try:
397 # Use 32-bit for the main channel: aiosendspin converts per player as needed
398 pcm_format = AudioFormat(
399 content_type=ContentType.PCM_S32LE,
400 sample_rate=48000,
401 bit_depth=32,
402 channels=2,
403 )
404 flow_pcm_format = AudioFormat(
405 content_type=INTERNAL_PCM_FORMAT.content_type,
406 sample_rate=pcm_format.sample_rate,
407 bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
408 channels=pcm_format.channels,
409 )
410
411 output_codec = cast("str", self.config.get_value(CONF_OUTPUT_CODEC, "pcm"))
412
413 # Convert string codec to AudioCodec enum
414 audio_codec = AudioCodec(output_codec)
415
416 # Get clean audio source in flow format (high quality internal format)
417 # Format conversion and per-player DSP will be applied via player_channel
418 audio_source = self.mass.streams.get_stream(media, flow_pcm_format)
419
420 # Create TimedClientStream to wrap the clean audio source
421 # This distributes the audio to multiple subscribers without DSP
422 self.timed_client_stream = TimedClientStream(
423 audio_source=audio_source,
424 audio_format=flow_pcm_format,
425 )
426
427 # Setup the main channel subscription
428 main_channel_gen, main_position = await self.timed_client_stream.get_stream(
429 output_format=pcm_format,
430 filter_params=None, # TODO: this should probably still include the safety limiter
431 )
432 assert main_position == 0.0 # first subscriber, should be zero
433 media_stream = MusicAssistantMediaStream(
434 main_channel_source=main_channel_gen,
435 main_channel_format=SendspinAudioFormat(
436 sample_rate=pcm_format.sample_rate,
437 bit_depth=pcm_format.bit_depth,
438 channels=pcm_format.channels,
439 codec=audio_codec,
440 ),
441 player_instance=self,
442 internal_format=flow_pcm_format,
443 output_format=pcm_format,
444 )
445
446 stop_time = await self.api.group.play_media(media_stream)
447 await self.api.group.stop(stop_time)
448 except asyncio.CancelledError:
449 self.logger.debug("Playback cancelled for player %s", self.display_name)
450 raise
451 except Exception:
452 self.logger.exception("Error during playback for player %s", self.display_name)
453 raise
454 finally:
455 self.timed_client_stream = None
456
457 async def set_members(
458 self,
459 player_ids_to_add: list[str] | None = None,
460 player_ids_to_remove: list[str] | None = None,
461 ) -> None:
462 """Handle SET_MEMBERS command on the player."""
463 self.logger.debug(
464 "set_members called: adding %s, removing %s", player_ids_to_add, player_ids_to_remove
465 )
466 for player_id in player_ids_to_remove or []:
467 player = self.mass.players.get_player(player_id, True)
468 player = cast("SendspinPlayer", player) # For type checking
469 await self.api.group.remove_client(player.api)
470 for player_id in player_ids_to_add or []:
471 player = self.mass.players.get_player(player_id, True)
472 player = cast("SendspinPlayer", player) # For type checking
473 await self.api.group.add_client(player.api)
474 # self.group_members will be updated by the group event callback
475
476 async def _send_album_artwork(self, current_item: QueueItem) -> str | None:
477 """
478 Send album artwork to the sendspin group.
479
480 Args:
481 current_item: The current queue item.
482 """
483 artwork_url = None
484 if current_item.image is not None:
485 artwork_url = self.mass.metadata.get_image_url(current_item.image)
486
487 if artwork_url != self.last_sent_artwork_url:
488 # Image changed, resend the artwork
489 self.last_sent_artwork_url = artwork_url
490 if artwork_url is not None and current_item.media_item is not None:
491 image_data = await self.mass.metadata.get_image_data_for_item(
492 current_item.media_item
493 )
494 if image_data is not None:
495 image = await asyncio.to_thread(Image.open, BytesIO(image_data))
496 await self.api.group.set_media_art(image, source=ArtworkSource.ALBUM)
497 else:
498 # Clear artwork if none available
499 await self.api.group.set_media_art(None, source=ArtworkSource.ALBUM)
500
501 return artwork_url
502
503 async def _send_artist_artwork(self, current_item: QueueItem) -> None:
504 """
505 Send artist artwork to the sendspin group.
506
507 Args:
508 current_item: The current queue item.
509 """
510 # Extract primary artist if available
511 artist_artwork_url = None
512 if current_item.media_item is not None and hasattr(current_item.media_item, "artists"):
513 artists = getattr(current_item.media_item, "artists", None)
514 if artists and len(artists) > 0:
515 primary_artist = artists[0]
516 if hasattr(primary_artist, "image"):
517 artist_image = getattr(primary_artist, "image", None)
518 if artist_image is not None:
519 artist_artwork_url = self.mass.metadata.get_image_url(artist_image)
520
521 if artist_artwork_url != self.last_sent_artist_artwork_url:
522 # Artist image changed, resend the artwork
523 self.last_sent_artist_artwork_url = artist_artwork_url
524 if artist_artwork_url is not None:
525 artist_image_data = await self.mass.metadata.get_image_data_for_item(
526 primary_artist, img_type=ImageType.THUMB
527 )
528 if artist_image_data is not None:
529 artist_image = await asyncio.to_thread(Image.open, BytesIO(artist_image_data))
530 await self.api.group.set_media_art(artist_image, source=ArtworkSource.ARTIST)
531 else:
532 # Clear artist artwork if none available
533 await self.api.group.set_media_art(None, source=ArtworkSource.ARTIST)
534
535 def _on_player_media_updated(self) -> None:
536 """Handle callback when the current media of the player is updated."""
537 if self.synced_to is not None:
538 # Only leader sends metadata
539 return
540
541 if self.state.current_media is None:
542 # Clear metadata when no media loaded
543 self.api.group.set_metadata(Metadata())
544 return
545 self.mass.create_task(self.send_current_media_metadata())
546
547 async def send_current_media_metadata(self) -> None:
548 """Send the current media metadata to the sendspin group."""
549 if not self.available:
550 return
551 current_media = self.state.current_media
552 if current_media is None:
553 return
554 # check if we are playing a MA queue item
555 queue_item: QueueItem | None = None
556 queue: PlayerQueue | None = None
557 if current_media.source_id and current_media.queue_item_id:
558 queue = self.mass.player_queues.get(current_media.source_id)
559 queue_item = self.mass.player_queues.get_item(
560 current_media.source_id, current_media.queue_item_id
561 )
562
563 # Send album and artist artwork
564 if queue_item:
565 await self._send_album_artwork(queue_item)
566 await self._send_artist_artwork(queue_item)
567
568 track_duration = current_media.duration or 0
569 repeat = SendspinRepeatMode.OFF
570 if queue and queue.repeat_mode == RepeatMode.ALL:
571 repeat = SendspinRepeatMode.ALL
572 elif queue and queue.repeat_mode == RepeatMode.ONE:
573 repeat = SendspinRepeatMode.ONE
574
575 shuffle = queue.shuffle_enabled if queue else False
576
577 metadata = Metadata(
578 title=current_media.title,
579 artist=current_media.artist,
580 album_artist=None, # TODO: extract from optional queue item
581 album=current_media.album,
582 artwork_url=current_media.image_url,
583 year=None, # TODO: extract from optional queue item
584 track=None, # TODO: extract from optional queue item
585 track_duration=track_duration * 1000 if track_duration is not None else None,
586 track_progress=int(current_media.corrected_elapsed_time * 1000)
587 if current_media.corrected_elapsed_time
588 else 0,
589 playback_speed=1000,
590 repeat=repeat,
591 shuffle=shuffle,
592 )
593
594 # Send metadata to the group
595 self.api.group.set_metadata(metadata)
596
597 async def on_unload(self) -> None:
598 """Handle logic when the player is unloaded from the Player controller."""
599 await super().on_unload()
600 self.unsub_event_cb()
601 self.unsub_group_event_cb()
602