/
/
/
1"""Sendspin Player implementation."""
2
3from __future__ import annotations
4
5import asyncio
6import time
7from collections.abc import AsyncGenerator, Callable
8from io import BytesIO
9from typing import TYPE_CHECKING, cast
10
11from aiosendspin.models import MediaCommand
12from aiosendspin.models.types import ArtworkSource, PlaybackStateType
13from aiosendspin.models.types import RepeatMode as SendspinRepeatMode
14from aiosendspin.server import AudioFormat as SendspinAudioFormat
15from aiosendspin.server import (
16 ClientEvent,
17 GroupCommandEvent,
18 GroupEvent,
19 GroupStateChangedEvent,
20 SendspinGroup,
21 VolumeChangedEvent,
22)
23from aiosendspin.server.client import DisconnectBehaviour
24from aiosendspin.server.events import ClientGroupChangedEvent
25from aiosendspin.server.group import (
26 GroupDeletedEvent,
27 GroupMemberAddedEvent,
28 GroupMemberRemovedEvent,
29)
30from aiosendspin.server.metadata import Metadata
31from aiosendspin.server.stream import AudioCodec, MediaStream
32from music_assistant_models.constants import PLAYER_CONTROL_NONE
33from music_assistant_models.enums import (
34 ContentType,
35 ImageType,
36 PlaybackState,
37 PlayerFeature,
38 PlayerType,
39 RepeatMode,
40)
41from music_assistant_models.media_items import AudioFormat
42from music_assistant_models.player import DeviceInfo
43from PIL import Image
44
45from music_assistant.constants import CONF_OUTPUT_CHANNELS, CONF_OUTPUT_CODEC, INTERNAL_PCM_FORMAT
46from music_assistant.helpers.audio import get_player_filter_params
47from music_assistant.models.player import Player, PlayerMedia
48
49from .timed_client_stream import TimedClientStream
50
51# Supported group commands for Sendspin players
52SUPPORTED_GROUP_COMMANDS = [
53 MediaCommand.PLAY,
54 MediaCommand.PAUSE,
55 MediaCommand.STOP,
56 MediaCommand.NEXT,
57 MediaCommand.PREVIOUS,
58 MediaCommand.REPEAT_OFF,
59 MediaCommand.REPEAT_ONE,
60 MediaCommand.REPEAT_ALL,
61 MediaCommand.SHUFFLE,
62 MediaCommand.UNSHUFFLE,
63]
64
65if TYPE_CHECKING:
66 from aiosendspin.server.client import SendspinClient
67 from music_assistant_models.player_queue import PlayerQueue
68 from music_assistant_models.queue_item import QueueItem
69
70 from .provider import SendspinProvider
71
72
73class MusicAssistantMediaStream(MediaStream):
74 """MediaStream implementation for Music Assistant with per-player DSP support."""
75
76 player_instance: SendspinPlayer
77 internal_format: AudioFormat
78 output_format: AudioFormat
79
80 def __init__(
81 self,
82 *,
83 main_channel_source: AsyncGenerator[bytes, None],
84 main_channel_format: SendspinAudioFormat,
85 player_instance: SendspinPlayer,
86 internal_format: AudioFormat,
87 output_format: AudioFormat,
88 ) -> None:
89 """
90 Initialise the media stream with audio source and format for main_channel().
91
92 Args:
93 main_channel_source: Audio source generator for the main channel.
94 main_channel_format: Audio format for the main channel (includes codec).
95 player_instance: The SendspinPlayer instance for accessing mass and streams.
96 internal_format: Internal processing format (float32 for headroom).
97 output_format: Output PCM format (16-bit for player output).
98 """
99 super().__init__(
100 main_channel_source=main_channel_source,
101 main_channel_format=main_channel_format,
102 )
103 self.player_instance = player_instance
104 self.internal_format = internal_format
105 self.output_format = output_format
106
107 async def player_channel(
108 self,
109 player_id: str,
110 preferred_format: SendspinAudioFormat | None = None,
111 position_us: int = 0,
112 ) -> tuple[AsyncGenerator[bytes, None], SendspinAudioFormat, int] | None:
113 """
114 Get a player-specific audio stream with per-player DSP.
115
116 Args:
117 player_id: Identifier for the player requesting the stream.
118 preferred_format: The player's preferred native format for the stream.
119 The implementation may return a different format; the library
120 will handle any necessary conversion.
121 position_us: Position in microseconds relative to the main_stream start.
122 Used for late-joining players to sync with the main stream.
123
124 Returns:
125 A tuple of (audio generator, audio format, actual position in microseconds)
126 or None if unavailable. If None, the main_stream is used as fallback.
127 """
128 mass = self.player_instance.mass
129 multi_client_stream = self.player_instance.timed_client_stream
130 assert multi_client_stream is not None
131
132 dsp = mass.config.get_player_dsp_config(player_id)
133 output_channels = mass.config.get_raw_player_config_value(
134 player_id, CONF_OUTPUT_CHANNELS, "stereo"
135 )
136 if not dsp.enabled and output_channels == "stereo":
137 # DSP is disabled and output is stereo, use main_stream
138 return None
139
140 # Get per-player DSP filter parameters
141 filter_params = get_player_filter_params(
142 mass, player_id, self.internal_format, self.output_format
143 )
144
145 # Get the stream with position (in seconds)
146 stream_gen, actual_position = await multi_client_stream.get_stream(
147 output_format=self.output_format,
148 filter_params=filter_params,
149 )
150
151 # Convert position from seconds to microseconds for aiosendspin API
152 actual_position_us = int(actual_position * 1_000_000)
153
154 # Return actual position in microseconds relative to main_stream start
155 self.player_instance.logger.debug(
156 "Providing channel stream for player %s at position %d us",
157 player_id,
158 actual_position_us,
159 )
160 return (
161 stream_gen,
162 SendspinAudioFormat(
163 sample_rate=self.output_format.sample_rate,
164 bit_depth=self.output_format.bit_depth,
165 channels=self.output_format.channels,
166 codec=self._main_channel_format.codec,
167 ),
168 actual_position_us,
169 )
170
171
172class SendspinPlayer(Player):
173 """A sendspin audio player in Music Assistant."""
174
175 api: SendspinClient
176 unsub_event_cb: Callable[[], None]
177 unsub_group_event_cb: Callable[[], None]
178 last_sent_artwork_url: str | None = None
179 last_sent_artist_artwork_url: str | None = None
180 _playback_task: asyncio.Task[None] | None = None
181 timed_client_stream: TimedClientStream | None = None
182 is_web_player: bool = False
183
184 @property
185 def requires_flow_mode(self) -> bool:
186 """Return if the player requires flow mode."""
187 return True
188
189 def __init__(self, provider: SendspinProvider, player_id: str) -> None:
190 """Initialize the Player."""
191 super().__init__(provider, player_id)
192 sendspin_client = provider.server_api.get_client(player_id)
193 assert sendspin_client is not None
194 self.api = sendspin_client
195 self.api.disconnect_behaviour = DisconnectBehaviour.STOP
196 self.unsub_event_cb = sendspin_client.add_event_listener(self.event_cb)
197 self.unsub_group_event_cb = sendspin_client.group.add_event_listener(self.group_event_cb)
198 sendspin_client.group.set_supported_commands(SUPPORTED_GROUP_COMMANDS)
199
200 self.logger = self.provider.logger.getChild(player_id)
201 # init some static variables
202 self._attr_name = sendspin_client.name
203 self._attr_type = PlayerType.PLAYER
204 self._attr_supported_features = {
205 PlayerFeature.SET_MEMBERS,
206 PlayerFeature.MULTI_DEVICE_DSP,
207 PlayerFeature.VOLUME_SET,
208 PlayerFeature.VOLUME_MUTE,
209 }
210 self._attr_can_group_with = {provider.instance_id}
211 self._attr_power_control = PLAYER_CONTROL_NONE
212 if device_info := sendspin_client.info.device_info:
213 self._attr_device_info = DeviceInfo(
214 model=device_info.product_name or "Unknown model",
215 manufacturer=device_info.manufacturer or "Unknown Manufacturer",
216 software_version=device_info.software_version,
217 )
218 else:
219 self._attr_device_info = DeviceInfo()
220 if player_client := sendspin_client.player:
221 self._attr_volume_level = player_client.volume
222 self._attr_volume_muted = player_client.muted
223 self._attr_available = True
224 self.is_web_player = sendspin_client.name.startswith(
225 "Web (" # The regular Web Interface
226 ) or sendspin_client.name.startswith(
227 "PWA (" # The PWA App
228 )
229 self._attr_expose_to_ha_by_default = not self.is_web_player
230 self._attr_hidden_by_default = self.is_web_player
231
232 def event_cb(self, client: SendspinClient, event: ClientEvent) -> None:
233 """Event callback registered to the sendspin server."""
234 self.logger.debug("Received PlayerEvent: %s", event)
235 match event:
236 case VolumeChangedEvent(volume=volume, muted=muted):
237 self._attr_volume_level = volume
238 self._attr_volume_muted = muted
239 self.update_state()
240 case ClientGroupChangedEvent(new_group=new_group):
241 self.unsub_group_event_cb()
242 self.unsub_group_event_cb = new_group.add_event_listener(self.group_event_cb)
243 # Sync playback state from the new group
244 match new_group.state:
245 case PlaybackStateType.PLAYING:
246 self._attr_playback_state = PlaybackState.PLAYING
247 case PlaybackStateType.PAUSED:
248 self._attr_playback_state = PlaybackState.PAUSED
249 case PlaybackStateType.STOPPED:
250 self._attr_playback_state = PlaybackState.IDLE
251 # Update in case this is a newly created group
252 new_group.set_supported_commands(SUPPORTED_GROUP_COMMANDS)
253 # GroupMemberAddedEvent or GroupMemberRemovedEvent will be fired before this
254 # so group members are already up to date at this point
255 if self.synced_to is None:
256 # We are the leader, stop on disconnect
257 self.api.disconnect_behaviour = DisconnectBehaviour.STOP
258 else:
259 self.api.disconnect_behaviour = DisconnectBehaviour.UNGROUP
260 self.update_state()
261
262 async def _handle_group_command(self, command: MediaCommand) -> None:
263 """Handle a group command from aiosendspin."""
264 queue = self.mass.player_queues.get_active_queue(self.player_id)
265 match command:
266 case MediaCommand.PLAY:
267 await self.mass.players.cmd_play(self.player_id)
268 case MediaCommand.PAUSE:
269 await self.mass.players.cmd_pause(self.player_id)
270 case MediaCommand.STOP:
271 await self.mass.players.cmd_stop(self.player_id)
272 case MediaCommand.NEXT:
273 await self.mass.players.cmd_next_track(self.player_id)
274 case MediaCommand.PREVIOUS:
275 await self.mass.players.cmd_previous_track(self.player_id)
276 case MediaCommand.REPEAT_OFF if queue:
277 self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.OFF)
278 case MediaCommand.REPEAT_ONE if queue:
279 self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.ONE)
280 case MediaCommand.REPEAT_ALL if queue:
281 self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.ALL)
282 case MediaCommand.SHUFFLE if queue:
283 await self.mass.player_queues.set_shuffle(queue.queue_id, shuffle_enabled=True)
284 case MediaCommand.UNSHUFFLE if queue:
285 await self.mass.player_queues.set_shuffle(queue.queue_id, shuffle_enabled=False)
286
287 def group_event_cb(self, group: SendspinGroup, event: GroupEvent) -> None:
288 """Event callback registered to the sendspin group this player belongs to."""
289 if self.synced_to is not None:
290 # Only handle group events as the leader, except for:
291 # - GroupMemberRemovedEvent: to handle being removed from a group
292 # - GroupStateChangedEvent: to update playback state when leader stops/disconnects
293 if not isinstance(event, (GroupMemberRemovedEvent, GroupStateChangedEvent)):
294 return
295 self.logger.debug("Received GroupEvent: %s", event)
296
297 match event:
298 case GroupCommandEvent(command=command):
299 self.logger.debug("Group command received: %s", command)
300 self.mass.create_task(self._handle_group_command(command))
301 case GroupStateChangedEvent(state=state):
302 self.logger.debug("Group state changed to: %s", state)
303 match state:
304 case PlaybackStateType.PLAYING:
305 self._attr_playback_state = PlaybackState.PLAYING
306 case PlaybackStateType.PAUSED:
307 self._attr_playback_state = PlaybackState.PAUSED
308 case PlaybackStateType.STOPPED:
309 self._attr_playback_state = PlaybackState.IDLE
310 self._attr_elapsed_time = 0
311 self._attr_elapsed_time_last_updated = time.time()
312 self.update_state()
313 case GroupMemberAddedEvent(client_id=client_id):
314 self.logger.debug("Group member added: %s", client_id)
315 if client_id not in self._attr_group_members:
316 self._attr_group_members.append(client_id)
317 self.update_state()
318 case GroupMemberRemovedEvent(client_id=client_id):
319 self.logger.debug("Group member removed: %s", client_id)
320 self.mass.create_task(self._handle_member_removed(group, client_id))
321 case GroupDeletedEvent():
322 pass
323
324 async def _handle_member_removed(self, group: SendspinGroup, client_id: str) -> None:
325 """Handle group member removed event asynchronously."""
326 if client_id == self.player_id:
327 if len(self._attr_group_members) > 0:
328 # We were just removed as a leader:
329 # 1. stop playback on the old group
330 await group.stop()
331 # 2. clear our members (since we are now alone)
332 group_members = [
333 member for member in self._attr_group_members if member != client_id
334 ]
335 self._attr_group_members = []
336 # 3. assign new leader if there are members left
337 if len(group_members) > 0 and (
338 new_leader := self.mass.players.get(group_members[0])
339 ):
340 new_leader = cast("SendspinPlayer", new_leader)
341 new_leader._attr_group_members = group_members[1:]
342 new_leader.api.disconnect_behaviour = DisconnectBehaviour.STOP
343 new_leader.update_state()
344 self.update_state()
345 elif client_id in self._attr_group_members:
346 # Someone else left our group
347 self._attr_group_members.remove(client_id)
348 self.update_state()
349
350 async def volume_set(self, volume_level: int) -> None:
351 """Handle VOLUME_SET command on the player."""
352 if player_client := self.api.player:
353 player_client.set_volume(volume_level)
354
355 async def volume_mute(self, muted: bool) -> None:
356 """Handle VOLUME MUTE command on the player."""
357 if player_client := self.api.player:
358 if muted:
359 player_client.mute()
360 else:
361 player_client.unmute()
362
363 async def stop(self) -> None:
364 """Stop command."""
365 self.logger.debug("Received STOP command on player %s", self.display_name)
366 # We don't care if we stopped the stream or it was already stopped
367 await self.api.group.stop()
368 # Clear the playback task reference (group.stop() handles stopping the stream)
369 self._playback_task = None
370 self._attr_current_media = None
371 self.update_state()
372
373 async def play_media(self, media: PlayerMedia) -> None:
374 """Play media command."""
375 self.logger.debug(
376 "Received PLAY_MEDIA command on player %s with uri %s", self.display_name, media.uri
377 )
378
379 # Update player state optimistically
380 self._attr_current_media = media
381 self._attr_elapsed_time = 0
382 self._attr_elapsed_time_last_updated = time.time()
383 # playback_state will be set by the group state change event
384
385 # Stop previous stream in case we were already playing something
386 await self.api.group.stop()
387 # Run playback in background task to immediately return
388 self._playback_task = asyncio.create_task(self._run_playback(media))
389 self.update_state()
390
391 async def _run_playback(self, media: PlayerMedia) -> None:
392 """Run the actual playback in a background task."""
393 try:
394 # Use 32-bit for the main channel: aiosendspin converts per player as needed
395 pcm_format = AudioFormat(
396 content_type=ContentType.PCM_S32LE,
397 sample_rate=48000,
398 bit_depth=32,
399 channels=2,
400 )
401 flow_pcm_format = AudioFormat(
402 content_type=INTERNAL_PCM_FORMAT.content_type,
403 sample_rate=pcm_format.sample_rate,
404 bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
405 channels=pcm_format.channels,
406 )
407
408 output_codec = cast("str", self.config.get_value(CONF_OUTPUT_CODEC, "pcm"))
409
410 # Convert string codec to AudioCodec enum
411 audio_codec = AudioCodec(output_codec)
412
413 # Get clean audio source in flow format (high quality internal format)
414 # Format conversion and per-player DSP will be applied via player_channel
415 audio_source = self.mass.streams.get_stream(media, flow_pcm_format)
416
417 # Create TimedClientStream to wrap the clean audio source
418 # This distributes the audio to multiple subscribers without DSP
419 self.timed_client_stream = TimedClientStream(
420 audio_source=audio_source,
421 audio_format=flow_pcm_format,
422 )
423
424 # Setup the main channel subscription
425 main_channel_gen, main_position = await self.timed_client_stream.get_stream(
426 output_format=pcm_format,
427 filter_params=None, # TODO: this should probably still include the safety limiter
428 )
429 assert main_position == 0.0 # first subscriber, should be zero
430 media_stream = MusicAssistantMediaStream(
431 main_channel_source=main_channel_gen,
432 main_channel_format=SendspinAudioFormat(
433 sample_rate=pcm_format.sample_rate,
434 bit_depth=pcm_format.bit_depth,
435 channels=pcm_format.channels,
436 codec=audio_codec,
437 ),
438 player_instance=self,
439 internal_format=flow_pcm_format,
440 output_format=pcm_format,
441 )
442
443 stop_time = await self.api.group.play_media(media_stream)
444 await self.api.group.stop(stop_time)
445 except asyncio.CancelledError:
446 self.logger.debug("Playback cancelled for player %s", self.display_name)
447 raise
448 except Exception:
449 self.logger.exception("Error during playback for player %s", self.display_name)
450 raise
451 finally:
452 self.timed_client_stream = None
453
454 async def set_members(
455 self,
456 player_ids_to_add: list[str] | None = None,
457 player_ids_to_remove: list[str] | None = None,
458 ) -> None:
459 """Handle SET_MEMBERS command on the player."""
460 self.logger.debug(
461 "set_members called: adding %s, removing %s", player_ids_to_add, player_ids_to_remove
462 )
463 for player_id in player_ids_to_remove or []:
464 player = self.mass.players.get(player_id, True)
465 player = cast("SendspinPlayer", player) # For type checking
466 await self.api.group.remove_client(player.api)
467 for player_id in player_ids_to_add or []:
468 player = self.mass.players.get(player_id, True)
469 player = cast("SendspinPlayer", player) # For type checking
470 await self.api.group.add_client(player.api)
471 # self.group_members will be updated by the group event callback
472
473 async def _send_album_artwork(self, current_item: QueueItem) -> str | None:
474 """
475 Send album artwork to the sendspin group.
476
477 Args:
478 current_item: The current queue item.
479 """
480 artwork_url = None
481 if current_item.image is not None:
482 artwork_url = self.mass.metadata.get_image_url(current_item.image)
483
484 if artwork_url != self.last_sent_artwork_url:
485 # Image changed, resend the artwork
486 self.last_sent_artwork_url = artwork_url
487 if artwork_url is not None and current_item.media_item is not None:
488 image_data = await self.mass.metadata.get_image_data_for_item(
489 current_item.media_item
490 )
491 if image_data is not None:
492 image = await asyncio.to_thread(Image.open, BytesIO(image_data))
493 await self.api.group.set_media_art(image, source=ArtworkSource.ALBUM)
494 else:
495 # Clear artwork if none available
496 await self.api.group.set_media_art(None, source=ArtworkSource.ALBUM)
497
498 return artwork_url
499
500 async def _send_artist_artwork(self, current_item: QueueItem) -> None:
501 """
502 Send artist artwork to the sendspin group.
503
504 Args:
505 current_item: The current queue item.
506 """
507 # Extract primary artist if available
508 artist_artwork_url = None
509 if current_item.media_item is not None and hasattr(current_item.media_item, "artists"):
510 artists = getattr(current_item.media_item, "artists", None)
511 if artists and len(artists) > 0:
512 primary_artist = artists[0]
513 if hasattr(primary_artist, "image"):
514 artist_image = getattr(primary_artist, "image", None)
515 if artist_image is not None:
516 artist_artwork_url = self.mass.metadata.get_image_url(artist_image)
517
518 if artist_artwork_url != self.last_sent_artist_artwork_url:
519 # Artist image changed, resend the artwork
520 self.last_sent_artist_artwork_url = artist_artwork_url
521 if artist_artwork_url is not None:
522 artist_image_data = await self.mass.metadata.get_image_data_for_item(
523 primary_artist, img_type=ImageType.THUMB
524 )
525 if artist_image_data is not None:
526 artist_image = await asyncio.to_thread(Image.open, BytesIO(artist_image_data))
527 await self.api.group.set_media_art(artist_image, source=ArtworkSource.ARTIST)
528 else:
529 # Clear artist artwork if none available
530 await self.api.group.set_media_art(None, source=ArtworkSource.ARTIST)
531
532 def _on_player_media_updated(self) -> None:
533 """Handle callback when the current media of the player is updated."""
534 if self.synced_to is not None:
535 # Only leader sends metadata
536 return
537
538 if self.current_media is None:
539 # Clear metadata when no media loaded
540 self.api.group.set_metadata(Metadata())
541 return
542 self.mass.create_task(self.send_current_media_metadata())
543
544 async def send_current_media_metadata(self) -> None:
545 """Send the current media metadata to the sendspin group."""
546 if not self.available:
547 return
548 current_media = self.current_media
549 if current_media is None:
550 return
551 # check if we are playing a MA queue item
552 queue_item: QueueItem | None = None
553 queue: PlayerQueue | None = None
554 if current_media.source_id and current_media.queue_item_id:
555 queue = self.mass.player_queues.get(current_media.source_id)
556 queue_item = self.mass.player_queues.get_item(
557 current_media.source_id, current_media.queue_item_id
558 )
559
560 # Send album and artist artwork
561 if queue_item:
562 await self._send_album_artwork(queue_item)
563 await self._send_artist_artwork(queue_item)
564
565 track_duration = current_media.duration or 0
566 repeat = SendspinRepeatMode.OFF
567 if queue and queue.repeat_mode == RepeatMode.ALL:
568 repeat = SendspinRepeatMode.ALL
569 elif queue and queue.repeat_mode == RepeatMode.ONE:
570 repeat = SendspinRepeatMode.ONE
571
572 shuffle = queue.shuffle_enabled if queue else False
573
574 metadata = Metadata(
575 title=current_media.title,
576 artist=current_media.artist,
577 album_artist=None, # TODO: extract from optional queue item
578 album=current_media.album,
579 artwork_url=current_media.image_url,
580 year=None, # TODO: extract from optional queue item
581 track=None, # TODO: extract from optional queue item
582 track_duration=track_duration * 1000 if track_duration is not None else None,
583 track_progress=int(current_media.corrected_elapsed_time * 1000)
584 if current_media.corrected_elapsed_time
585 else 0,
586 playback_speed=1000,
587 repeat=repeat,
588 shuffle=shuffle,
589 )
590
591 # Send metadata to the group
592 self.api.group.set_metadata(metadata)
593
594 async def on_unload(self) -> None:
595 """Handle logic when the player is unloaded from the Player controller."""
596 await super().on_unload()
597 self.unsub_event_cb()
598 self.unsub_group_event_cb()
599