/
/
/
1"""Sendspin Player implementation."""
2
3from __future__ import annotations
4
5import asyncio
6import time
7from collections.abc import AsyncGenerator, Callable
8from io import BytesIO
9from typing import TYPE_CHECKING, cast
10
11from aiosendspin.models import MediaCommand
12from aiosendspin.models.types import ArtworkSource, PlaybackStateType
13from aiosendspin.models.types import RepeatMode as SendspinRepeatMode
14from aiosendspin.server import AudioFormat as SendspinAudioFormat
15from aiosendspin.server import (
16 ClientEvent,
17 GroupCommandEvent,
18 GroupEvent,
19 GroupStateChangedEvent,
20 SendspinGroup,
21 VolumeChangedEvent,
22)
23from aiosendspin.server.client import DisconnectBehaviour
24from aiosendspin.server.events import ClientGroupChangedEvent
25from aiosendspin.server.group import (
26 GroupDeletedEvent,
27 GroupMemberAddedEvent,
28 GroupMemberRemovedEvent,
29)
30from aiosendspin.server.metadata import Metadata
31from aiosendspin.server.stream import AudioCodec, MediaStream
32from music_assistant_models.constants import PLAYER_CONTROL_NONE
33from music_assistant_models.enums import (
34 ContentType,
35 ImageType,
36 PlaybackState,
37 PlayerFeature,
38 PlayerType,
39 RepeatMode,
40)
41from music_assistant_models.media_items import AudioFormat
42from music_assistant_models.player import DeviceInfo
43from PIL import Image
44
45from music_assistant.constants import CONF_OUTPUT_CHANNELS, CONF_OUTPUT_CODEC, INTERNAL_PCM_FORMAT
46from music_assistant.helpers.audio import get_player_filter_params
47from music_assistant.models.player import Player, PlayerMedia
48
49from .timed_client_stream import TimedClientStream
50
# Media commands advertised on every Sendspin group a player of this provider is part of.
# The list is handed to ``set_supported_commands()`` on the client's group when a
# SendspinPlayer is created and again whenever the client moves to another group.
# Each entry has a matching branch in ``SendspinPlayer._handle_group_command``.
SUPPORTED_GROUP_COMMANDS = [
    MediaCommand.PLAY,
    MediaCommand.PAUSE,
    MediaCommand.STOP,
    MediaCommand.NEXT,
    MediaCommand.PREVIOUS,
    MediaCommand.REPEAT_OFF,
    MediaCommand.REPEAT_ONE,
    MediaCommand.REPEAT_ALL,
    MediaCommand.SHUFFLE,
    MediaCommand.UNSHUFFLE,
]
64
65if TYPE_CHECKING:
66 from aiosendspin.server.client import SendspinClient
67 from music_assistant_models.player_queue import PlayerQueue
68 from music_assistant_models.queue_item import QueueItem
69
70 from .provider import SendspinProvider
71
72
class MusicAssistantMediaStream(MediaStream):
    """
    MediaStream implementation for Music Assistant with per-player DSP support.

    The main channel is passed straight to the base MediaStream. Players that have
    DSP enabled or a non-stereo output configured get their own stream through
    player_channel(), taken from the owning player's TimedClientStream.
    """

    player_instance: SendspinPlayer
    internal_format: AudioFormat
    output_format: AudioFormat

    def __init__(
        self,
        *,
        main_channel_source: AsyncGenerator[bytes, None],
        main_channel_format: SendspinAudioFormat,
        player_instance: SendspinPlayer,
        internal_format: AudioFormat,
        output_format: AudioFormat,
    ) -> None:
        """
        Initialise the media stream with audio source and format for main_channel().

        Args:
            main_channel_source: Audio source generator for the main channel.
            main_channel_format: Audio format for the main channel (includes codec).
            player_instance: The SendspinPlayer instance for accessing mass and streams.
            internal_format: Internal processing format, used as the input format
                when building per-player filter parameters.
            output_format: PCM format requested for the per-player streams.
        """
        super().__init__(
            main_channel_source=main_channel_source,
            main_channel_format=main_channel_format,
        )
        self.player_instance = player_instance
        self.internal_format = internal_format
        self.output_format = output_format

    async def player_channel(
        self,
        player_id: str,
        preferred_format: SendspinAudioFormat | None = None,
        position_us: int = 0,
    ) -> tuple[AsyncGenerator[bytes, None], SendspinAudioFormat, int] | None:
        """
        Get a player-specific audio stream with per-player DSP.

        Args:
            player_id: Identifier for the player requesting the stream.
            preferred_format: The player's preferred native format for the stream.
                Not used here: the stream is always produced in ``output_format``
                and the library handles any necessary conversion.
            position_us: Position in microseconds relative to the main_stream start.
                Not used here: the position reported by the TimedClientStream
                subscription is returned instead.

        Returns:
            A tuple of (audio generator, audio format, actual position in microseconds)
            or None if unavailable. If None, the main_stream is used as fallback.
            None is returned when the owning player has no active audio source, and
            when the player has DSP disabled and a stereo output.
        """
        owner = self.player_instance
        timed_stream = owner.timed_client_stream
        if timed_stream is None:
            # The owning player clears its stream when playback ends. Fall back to the
            # main channel instead of relying on an assert, which is stripped under -O.
            owner.logger.warning(
                "No active audio source for player %s, falling back to the main channel",
                player_id,
            )
            return None

        mass = owner.mass
        dsp = mass.config.get_player_dsp_config(player_id)
        output_channels = mass.config.get_raw_player_config_value(
            player_id, CONF_OUTPUT_CHANNELS, "stereo"
        )
        if not dsp.enabled and output_channels == "stereo":
            # DSP is disabled and output is stereo, use main_stream
            return None

        # Get per-player DSP filter parameters
        filter_params = get_player_filter_params(
            mass, player_id, self.internal_format, self.output_format
        )

        # Get the stream with position (in seconds)
        stream_gen, actual_position = await timed_stream.get_stream(
            output_format=self.output_format,
            filter_params=filter_params,
        )

        # Convert position from seconds to microseconds for aiosendspin API
        actual_position_us = int(actual_position * 1_000_000)

        owner.logger.debug(
            "Providing channel stream for player %s at position %d us",
            player_id,
            actual_position_us,
        )
        # The per-player stream keeps the codec of the main channel
        return (
            stream_gen,
            SendspinAudioFormat(
                sample_rate=self.output_format.sample_rate,
                bit_depth=self.output_format.bit_depth,
                channels=self.output_format.channels,
                codec=self._main_channel_format.codec,
            ),
            actual_position_us,
        )
170
171
class SendspinPlayer(Player):
    """A sendspin audio player in Music Assistant."""

    _attr_type = PlayerType.PROTOCOL

    # The aiosendspin client this player wraps (looked up by player_id in __init__)
    api: SendspinClient
    # Unsubscribe handle for the client event listener (event_cb)
    unsub_event_cb: Callable[[], None]
    # Unsubscribe handle for the group event listener (group_event_cb);
    # replaced whenever the client moves to another group
    unsub_group_event_cb: Callable[[], None]
    # Image URLs of the artwork last pushed to the group, used to skip resending
    last_sent_artwork_url: str | None = None
    last_sent_artist_artwork_url: str | None = None
    # Background task created by play_media() that runs _run_playback()
    _playback_task: asyncio.Task[None] | None = None
    # Shared audio source of the running playback; set and cleared by _run_playback()
    # and read by MusicAssistantMediaStream.player_channel()
    timed_client_stream: TimedClientStream | None = None
    # True when the client name starts with "Web (" or "PWA (" (see __init__)
    is_web_player: bool = False

    @property
    def requires_flow_mode(self) -> bool:
        """Return if the player requires flow mode (always True for this player)."""
        return True
190
191 def __init__(self, provider: SendspinProvider, player_id: str) -> None:
192 """Initialize the Player."""
193 super().__init__(provider, player_id)
194 sendspin_client = provider.server_api.get_client(player_id)
195 assert sendspin_client is not None
196 self.api = sendspin_client
197 self.api.disconnect_behaviour = DisconnectBehaviour.STOP
198 self.unsub_event_cb = sendspin_client.add_event_listener(self.event_cb)
199 self.unsub_group_event_cb = sendspin_client.group.add_event_listener(self.group_event_cb)
200 sendspin_client.group.set_supported_commands(SUPPORTED_GROUP_COMMANDS)
201
202 self.logger = self.provider.logger.getChild(player_id)
203 # init some static variables
204 self._attr_name = sendspin_client.name
205 self._attr_supported_features = {
206 PlayerFeature.PLAY_MEDIA,
207 PlayerFeature.SET_MEMBERS,
208 PlayerFeature.MULTI_DEVICE_DSP,
209 PlayerFeature.VOLUME_SET,
210 PlayerFeature.VOLUME_MUTE,
211 }
212 self._attr_can_group_with = {provider.instance_id}
213 self._attr_power_control = PLAYER_CONTROL_NONE
214 if device_info := sendspin_client.info.device_info:
215 self._attr_device_info = DeviceInfo(
216 model=device_info.product_name or "Unknown model",
217 manufacturer=device_info.manufacturer or "Unknown Manufacturer",
218 software_version=device_info.software_version,
219 )
220 else:
221 self._attr_device_info = DeviceInfo()
222 if player_client := sendspin_client.player:
223 self._attr_volume_level = player_client.volume
224 self._attr_volume_muted = player_client.muted
225 self._attr_available = True
226 self.is_web_player = sendspin_client.name.startswith(
227 "Web (" # The regular Web Interface
228 ) or sendspin_client.name.startswith(
229 "PWA (" # The PWA App
230 )
231 self._attr_expose_to_ha_by_default = not self.is_web_player
232 self._attr_hidden_by_default = self.is_web_player
233
234 def event_cb(self, client: SendspinClient, event: ClientEvent) -> None:
235 """Event callback registered to the sendspin server."""
236 self.logger.debug("Received PlayerEvent: %s", event)
237 match event:
238 case VolumeChangedEvent(volume=volume, muted=muted):
239 self._attr_volume_level = volume
240 self._attr_volume_muted = muted
241 self.update_state()
242 case ClientGroupChangedEvent(new_group=new_group):
243 self.unsub_group_event_cb()
244 self.unsub_group_event_cb = new_group.add_event_listener(self.group_event_cb)
245 # Sync playback state from the new group
246 match new_group.state:
247 case PlaybackStateType.PLAYING:
248 self._attr_playback_state = PlaybackState.PLAYING
249 case PlaybackStateType.PAUSED:
250 self._attr_playback_state = PlaybackState.PAUSED
251 case PlaybackStateType.STOPPED:
252 self._attr_playback_state = PlaybackState.IDLE
253 # Update in case this is a newly created group
254 new_group.set_supported_commands(SUPPORTED_GROUP_COMMANDS)
255 # GroupMemberAddedEvent or GroupMemberRemovedEvent will be fired before this
256 # so group members are already up to date at this point
257 if self.synced_to is None:
258 # We are the leader, stop on disconnect
259 self.api.disconnect_behaviour = DisconnectBehaviour.STOP
260 else:
261 self.api.disconnect_behaviour = DisconnectBehaviour.UNGROUP
262 self.update_state()
263
264 async def _handle_group_command(self, command: MediaCommand) -> None:
265 """Handle a group command from aiosendspin."""
266 queue = self.mass.player_queues.get_active_queue(self.player_id)
267 match command:
268 case MediaCommand.PLAY:
269 await self.mass.players.cmd_play(self.player_id)
270 case MediaCommand.PAUSE:
271 await self.mass.players.cmd_pause(self.player_id)
272 case MediaCommand.STOP:
273 await self.mass.players.cmd_stop(self.player_id)
274 case MediaCommand.NEXT:
275 await self.mass.players.cmd_next_track(self.player_id)
276 case MediaCommand.PREVIOUS:
277 await self.mass.players.cmd_previous_track(self.player_id)
278 case MediaCommand.REPEAT_OFF if queue:
279 self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.OFF)
280 case MediaCommand.REPEAT_ONE if queue:
281 self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.ONE)
282 case MediaCommand.REPEAT_ALL if queue:
283 self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.ALL)
284 case MediaCommand.SHUFFLE if queue:
285 await self.mass.player_queues.set_shuffle(queue.queue_id, shuffle_enabled=True)
286 case MediaCommand.UNSHUFFLE if queue:
287 await self.mass.player_queues.set_shuffle(queue.queue_id, shuffle_enabled=False)
288
    def group_event_cb(self, group: SendspinGroup, event: GroupEvent) -> None:
        """
        Event callback registered to the sendspin group this player belongs to.

        Args:
            group: The group that emitted the event.
            event: The group event to process.
        """
        if self.synced_to is not None:
            # Only handle group events as the leader, except for:
            # - GroupMemberRemovedEvent: to handle being removed from a group
            # - GroupStateChangedEvent: to update playback state when leader stops/disconnects
            if not isinstance(event, (GroupMemberRemovedEvent, GroupStateChangedEvent)):
                return
        self.logger.debug("Received GroupEvent: %s", event)

        match event:
            case GroupCommandEvent(command=command):
                self.logger.debug("Group command received: %s", command)
                # Commands are handled in a task because the handler awaits controller calls
                self.mass.create_task(self._handle_group_command(command))
            case GroupStateChangedEvent(state=state):
                self.logger.debug("Group state changed to: %s", state)
                match state:
                    case PlaybackStateType.PLAYING:
                        self._attr_playback_state = PlaybackState.PLAYING
                    case PlaybackStateType.PAUSED:
                        self._attr_playback_state = PlaybackState.PAUSED
                    case PlaybackStateType.STOPPED:
                        self._attr_playback_state = PlaybackState.IDLE
                        # NOTE(review): the elapsed-time reset is assumed to belong to
                        # the STOPPED case only - confirm against the original nesting
                        self._attr_elapsed_time = 0
                        self._attr_elapsed_time_last_updated = time.time()
                self.update_state()
            case GroupMemberAddedEvent(client_id=client_id):
                self.logger.debug("Group member added: %s", client_id)
                if client_id not in self._attr_group_members:
                    self._attr_group_members.append(client_id)
                self.update_state()
            case GroupMemberRemovedEvent(client_id=client_id):
                self.logger.debug("Group member removed: %s", client_id)
                # Removal is handled in a task because the handler awaits group.stop()
                self.mass.create_task(self._handle_member_removed(group, client_id))
            case GroupDeletedEvent():
                # Nothing to do when the group itself is deleted
                pass
325
    async def _handle_member_removed(self, group: SendspinGroup, client_id: str) -> None:
        """
        Handle group member removed event asynchronously.

        Args:
            group: The group the member was removed from.
            client_id: Id of the client that was removed.
        """
        if client_id == self.player_id:
            if len(self._attr_group_members) > 0:
                # We were just removed as a leader:
                # 1. stop playback on the old group
                await group.stop()
                # 2. clear our members (since we are now alone)
                group_members = [
                    member for member in self._attr_group_members if member != client_id
                ]
                self._attr_group_members = []
                # 3. assign new leader if there are members left
                if len(group_members) > 0 and (
                    new_leader := self.mass.players.get_player(group_members[0])
                ):
                    # presumably every member of a sendspin group is a SendspinPlayer;
                    # the cast is for type checking only and does not verify this
                    new_leader = cast("SendspinPlayer", new_leader)
                    new_leader._attr_group_members = group_members[1:]
                    new_leader.api.disconnect_behaviour = DisconnectBehaviour.STOP
                    new_leader.update_state()
            # NOTE(review): this call is assumed to run whenever we are the removed
            # client, leader or not - confirm against the original nesting
            self.update_state()
        elif client_id in self._attr_group_members:
            # Someone else left our group
            self._attr_group_members.remove(client_id)
            self.update_state()
351
352 async def volume_set(self, volume_level: int) -> None:
353 """Handle VOLUME_SET command on the player."""
354 if player_client := self.api.player:
355 player_client.set_volume(volume_level)
356
357 async def volume_mute(self, muted: bool) -> None:
358 """Handle VOLUME MUTE command on the player."""
359 if player_client := self.api.player:
360 if muted:
361 player_client.mute()
362 else:
363 player_client.unmute()
364
365 async def stop(self) -> None:
366 """Stop command."""
367 self.logger.debug("Received STOP command on player %s", self.display_name)
368 # We don't care if we stopped the stream or it was already stopped
369 await self.api.group.stop()
370 # Clear the playback task reference (group.stop() handles stopping the stream)
371 self._playback_task = None
372 self._attr_current_media = None
373 self.update_state()
374
375 async def play_media(self, media: PlayerMedia) -> None:
376 """Play media command."""
377 self.logger.debug(
378 "Received PLAY_MEDIA command on player %s with uri %s", self.display_name, media.uri
379 )
380
381 # Update player state optimistically
382 self._attr_current_media = media
383 self._attr_elapsed_time = 0
384 self._attr_elapsed_time_last_updated = time.time()
385 # playback_state will be set by the group state change event
386
387 # Stop previous stream in case we were already playing something
388 await self.api.group.stop()
389 # Run playback in background task to immediately return
390 self._playback_task = asyncio.create_task(self._run_playback(media))
391 self.update_state()
392
393 async def _run_playback(self, media: PlayerMedia) -> None:
394 """Run the actual playback in a background task."""
395 try:
396 # Use 32-bit for the main channel: aiosendspin converts per player as needed
397 pcm_format = AudioFormat(
398 content_type=ContentType.PCM_S32LE,
399 sample_rate=48000,
400 bit_depth=32,
401 channels=2,
402 )
403 flow_pcm_format = AudioFormat(
404 content_type=INTERNAL_PCM_FORMAT.content_type,
405 sample_rate=pcm_format.sample_rate,
406 bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
407 channels=pcm_format.channels,
408 )
409
410 output_codec = cast("str", self.config.get_value(CONF_OUTPUT_CODEC, "pcm"))
411
412 # Convert string codec to AudioCodec enum
413 audio_codec = AudioCodec(output_codec)
414
415 # Get clean audio source in flow format (high quality internal format)
416 # Format conversion and per-player DSP will be applied via player_channel
417 audio_source = self.mass.streams.get_stream(media, flow_pcm_format)
418
419 # Create TimedClientStream to wrap the clean audio source
420 # This distributes the audio to multiple subscribers without DSP
421 self.timed_client_stream = TimedClientStream(
422 audio_source=audio_source,
423 audio_format=flow_pcm_format,
424 )
425
426 # Setup the main channel subscription
427 main_channel_gen, main_position = await self.timed_client_stream.get_stream(
428 output_format=pcm_format,
429 filter_params=None, # TODO: this should probably still include the safety limiter
430 )
431 assert main_position == 0.0 # first subscriber, should be zero
432 media_stream = MusicAssistantMediaStream(
433 main_channel_source=main_channel_gen,
434 main_channel_format=SendspinAudioFormat(
435 sample_rate=pcm_format.sample_rate,
436 bit_depth=pcm_format.bit_depth,
437 channels=pcm_format.channels,
438 codec=audio_codec,
439 ),
440 player_instance=self,
441 internal_format=flow_pcm_format,
442 output_format=pcm_format,
443 )
444
445 stop_time = await self.api.group.play_media(media_stream)
446 await self.api.group.stop(stop_time)
447 except asyncio.CancelledError:
448 self.logger.debug("Playback cancelled for player %s", self.display_name)
449 raise
450 except Exception:
451 self.logger.exception("Error during playback for player %s", self.display_name)
452 raise
453 finally:
454 self.timed_client_stream = None
455
456 async def set_members(
457 self,
458 player_ids_to_add: list[str] | None = None,
459 player_ids_to_remove: list[str] | None = None,
460 ) -> None:
461 """Handle SET_MEMBERS command on the player."""
462 self.logger.debug(
463 "set_members called: adding %s, removing %s", player_ids_to_add, player_ids_to_remove
464 )
465 for player_id in player_ids_to_remove or []:
466 player = self.mass.players.get_player(player_id, True)
467 player = cast("SendspinPlayer", player) # For type checking
468 await self.api.group.remove_client(player.api)
469 for player_id in player_ids_to_add or []:
470 player = self.mass.players.get_player(player_id, True)
471 player = cast("SendspinPlayer", player) # For type checking
472 await self.api.group.add_client(player.api)
473 # self.group_members will be updated by the group event callback
474
475 async def _send_album_artwork(self, current_item: QueueItem) -> str | None:
476 """
477 Send album artwork to the sendspin group.
478
479 Args:
480 current_item: The current queue item.
481 """
482 artwork_url = None
483 if current_item.image is not None:
484 artwork_url = self.mass.metadata.get_image_url(current_item.image)
485
486 if artwork_url != self.last_sent_artwork_url:
487 # Image changed, resend the artwork
488 self.last_sent_artwork_url = artwork_url
489 if artwork_url is not None and current_item.media_item is not None:
490 image_data = await self.mass.metadata.get_image_data_for_item(
491 current_item.media_item
492 )
493 if image_data is not None:
494 image = await asyncio.to_thread(Image.open, BytesIO(image_data))
495 await self.api.group.set_media_art(image, source=ArtworkSource.ALBUM)
496 else:
497 # Clear artwork if none available
498 await self.api.group.set_media_art(None, source=ArtworkSource.ALBUM)
499
500 return artwork_url
501
502 async def _send_artist_artwork(self, current_item: QueueItem) -> None:
503 """
504 Send artist artwork to the sendspin group.
505
506 Args:
507 current_item: The current queue item.
508 """
509 # Extract primary artist if available
510 artist_artwork_url = None
511 if current_item.media_item is not None and hasattr(current_item.media_item, "artists"):
512 artists = getattr(current_item.media_item, "artists", None)
513 if artists and len(artists) > 0:
514 primary_artist = artists[0]
515 if hasattr(primary_artist, "image"):
516 artist_image = getattr(primary_artist, "image", None)
517 if artist_image is not None:
518 artist_artwork_url = self.mass.metadata.get_image_url(artist_image)
519
520 if artist_artwork_url != self.last_sent_artist_artwork_url:
521 # Artist image changed, resend the artwork
522 self.last_sent_artist_artwork_url = artist_artwork_url
523 if artist_artwork_url is not None:
524 artist_image_data = await self.mass.metadata.get_image_data_for_item(
525 primary_artist, img_type=ImageType.THUMB
526 )
527 if artist_image_data is not None:
528 artist_image = await asyncio.to_thread(Image.open, BytesIO(artist_image_data))
529 await self.api.group.set_media_art(artist_image, source=ArtworkSource.ARTIST)
530 else:
531 # Clear artist artwork if none available
532 await self.api.group.set_media_art(None, source=ArtworkSource.ARTIST)
533
534 def _on_player_media_updated(self) -> None:
535 """Handle callback when the current media of the player is updated."""
536 if self.synced_to is not None:
537 # Only leader sends metadata
538 return
539
540 if self.state.current_media is None:
541 # Clear metadata when no media loaded
542 self.api.group.set_metadata(Metadata())
543 return
544 self.mass.create_task(self.send_current_media_metadata())
545
546 async def send_current_media_metadata(self) -> None:
547 """Send the current media metadata to the sendspin group."""
548 if not self.available:
549 return
550 current_media = self.state.current_media
551 if current_media is None:
552 return
553 # check if we are playing a MA queue item
554 queue_item: QueueItem | None = None
555 queue: PlayerQueue | None = None
556 if current_media.source_id and current_media.queue_item_id:
557 queue = self.mass.player_queues.get(current_media.source_id)
558 queue_item = self.mass.player_queues.get_item(
559 current_media.source_id, current_media.queue_item_id
560 )
561
562 # Send album and artist artwork
563 if queue_item:
564 await self._send_album_artwork(queue_item)
565 await self._send_artist_artwork(queue_item)
566
567 track_duration = current_media.duration or 0
568 repeat = SendspinRepeatMode.OFF
569 if queue and queue.repeat_mode == RepeatMode.ALL:
570 repeat = SendspinRepeatMode.ALL
571 elif queue and queue.repeat_mode == RepeatMode.ONE:
572 repeat = SendspinRepeatMode.ONE
573
574 shuffle = queue.shuffle_enabled if queue else False
575
576 metadata = Metadata(
577 title=current_media.title,
578 artist=current_media.artist,
579 album_artist=None, # TODO: extract from optional queue item
580 album=current_media.album,
581 artwork_url=current_media.image_url,
582 year=None, # TODO: extract from optional queue item
583 track=None, # TODO: extract from optional queue item
584 track_duration=track_duration * 1000 if track_duration is not None else None,
585 track_progress=int(current_media.corrected_elapsed_time * 1000)
586 if current_media.corrected_elapsed_time
587 else 0,
588 playback_speed=1000,
589 repeat=repeat,
590 shuffle=shuffle,
591 )
592
593 # Send metadata to the group
594 self.api.group.set_metadata(metadata)
595
596 async def on_unload(self) -> None:
597 """Handle logic when the player is unloaded from the Player controller."""
598 await super().on_unload()
599 self.unsub_event_cb()
600 self.unsub_group_event_cb()
601