/
/
/
1"""Snapcast Player."""
2
3import asyncio
4import random
5import time
6import urllib.parse
7from contextlib import suppress
8from typing import TYPE_CHECKING, cast
9
10from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
11from music_assistant_models.enums import PlaybackState, PlayerFeature
12from music_assistant_models.player import DeviceInfo, PlayerMedia
13from snapcast.control.client import Snapclient
14from snapcast.control.group import Snapgroup
15from snapcast.control.stream import Snapstream
16
17from music_assistant.constants import ATTR_ANNOUNCEMENT_IN_PROGRESS
18from music_assistant.helpers.audio import get_player_filter_params
19from music_assistant.helpers.compare import create_safe_string
20from music_assistant.helpers.ffmpeg import FFMpeg
21from music_assistant.models.player import Player
22from music_assistant.providers.snapcast.constants import (
23 CONF_ENTRY_SAMPLE_RATES_SNAPCAST,
24 DEFAULT_SNAPCAST_FORMAT,
25 MASS_ANNOUNCEMENT_POSTFIX,
26 MASS_STREAM_PREFIX,
27 SnapCastStreamType,
28)
29
30if TYPE_CHECKING:
31 from music_assistant.providers.snapcast.provider import SnapCastProvider
32
33
34class SnapCastPlayer(Player):
35 """SnapCastPlayer."""
36
    def __init__(
        self,
        provider: "SnapCastProvider",
        player_id: str,
        snap_client: Snapclient,
        snap_client_id: str,
    ) -> None:
        """Create a player that wraps a single snapcast client.

        ``provider`` and ``player_id`` are handed to the base class initializer;
        ``snap_client`` and ``snap_client_id`` are kept on the instance.
        """
        # annotation only (no assignment): declares the provider type for type checkers
        self.provider: SnapCastProvider  # type: ignore[misc]
        # both client attributes are assigned before the base initializer runs
        self.snap_client = snap_client
        self.snap_client_id = snap_client_id
        super().__init__(provider, player_id)
        # background streaming task: assigned by play_media, reset to None by stop
        self._stream_task: asyncio.Task[None] | None = None
50
51 @property
52 def requires_flow_mode(self) -> bool:
53 """Return if the player requires flow mode."""
54 return True
55
56 @property
57 def synced_to(self) -> str | None:
58 """
59 Return the id of the player this player is synced to (sync leader).
60
61 If this player is not synced to another player (or is the sync leader itself),
62 this should return None.
63 If it is part of a (permanent) group, this should also return None.
64 """
65 snap_group = self._get_snapgroup()
66 assert snap_group is not None # for type checking
67 master_id: str = self.provider._get_ma_id(snap_group.clients[0])
68 if len(snap_group.clients) < 2 or self.player_id == master_id:
69 return None
70 return master_id
71
72 def setup(self) -> None:
73 """Set up player."""
74 self._attr_name = self.snap_client.friendly_name
75 self._attr_available = self.snap_client.connected
76 self._attr_device_info = DeviceInfo(
77 model=self.snap_client._client.get("host").get("os"),
78 manufacturer=self.snap_client._client.get("host").get("arch"),
79 )
80 self._attr_device_info.ip_address = self.snap_client._client.get("host").get("ip")
81 self._attr_supported_features = {
82 PlayerFeature.SET_MEMBERS,
83 PlayerFeature.VOLUME_SET,
84 PlayerFeature.VOLUME_MUTE,
85 PlayerFeature.PLAY_ANNOUNCEMENT,
86 }
87 self._attr_can_group_with = {self.provider.instance_id}
88
89 async def volume_set(self, volume_level: int) -> None:
90 """Send VOLUME_SET command to given player."""
91 await self.snap_client.set_volume(volume_level)
92
93 async def stop(self) -> None:
94 """Send STOP command to given player."""
95 # update the state first to avoid race conditions, if an active play_announcement
96 # finishes the player.state should be IDLE.
97 self._attr_playback_state = PlaybackState.IDLE
98 self._attr_current_media = None
99 self._set_childs_state()
100
101 self.update_state()
102
103 # we change the active stream only if music was playing
104 if not self.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
105 snapgroup = self._get_snapgroup()
106 assert snapgroup is not None # for type checking
107 await snapgroup.set_stream("default")
108
109 # but we always delete the music stream (whether it was active or not)
110 await self._delete_stream(self._get_stream_name(SnapCastStreamType.MUSIC))
111
112 if self._stream_task is not None:
113 if not self._stream_task.done():
114 self._stream_task.cancel()
115 with suppress(asyncio.CancelledError):
116 await self._stream_task
117 self._stream_task = None
118
119 async def volume_mute(self, muted: bool) -> None:
120 """Send MUTE command to given player."""
121 # Using optimistic value because the library does not return the response from the api
122 await self.snap_client.set_muted(muted)
123 self._attr_volume_muted = muted
124 self.update_state()
125
126 async def set_members(
127 self,
128 player_ids_to_add: list[str] | None = None,
129 player_ids_to_remove: list[str] | None = None,
130 ) -> None:
131 """Handle SET_MEMBERS command on the player."""
132 group = self._get_snapgroup()
133 assert group is not None # for type checking
134 # handle client additions
135 for player_id in player_ids_to_add or []:
136 snapcast_id = self.provider._get_snapclient_id(player_id)
137 if snapcast_id not in group.clients:
138 await group.add_client(snapcast_id)
139 if player_id not in self._attr_group_members:
140 self._attr_group_members.append(player_id)
141 # handle client removals
142 for player_id in player_ids_to_remove or []:
143 snapcast_id = self.provider._get_snapclient_id(player_id)
144 if snapcast_id in group.clients:
145 await group.remove_client(snapcast_id)
146 if player_id in self._attr_group_members:
147 self._attr_group_members.remove(player_id)
148 # Set default stream and stop ungrouped players
149 removed_snapclient = self.provider._snapserver.client(snapcast_id)
150 await removed_snapclient.group.set_stream("default")
151 if removed_player := self.mass.players.get(player_id):
152 await removed_player.stop()
153 self.update_state()
154
155 async def play_media(self, media: PlayerMedia) -> None:
156 """Handle PLAY MEDIA on given player."""
157 if self.synced_to:
158 msg = "A synced player cannot receive play commands directly"
159 raise RuntimeError(msg)
160
161 # stop any existing streamtasks first
162 if self._stream_task is not None:
163 if not self._stream_task.done():
164 self._stream_task.cancel()
165 with suppress(asyncio.CancelledError):
166 await self._stream_task
167 self._stream_task = None
168
169 # get stream or create new one
170 stream_name = self._get_stream_name(SnapCastStreamType.MUSIC)
171 stream = await self._get_or_create_stream(stream_name, media.source_id or self.player_id)
172
173 # if no announcement is playing we activate the stream now, otherwise it
174 # will be activated by play_announcement when the announcement is over.
175 if not self.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
176 snap_group = self._get_snapgroup()
177 assert snap_group is not None # for type checking
178 await snap_group.set_stream(stream.identifier)
179
180 self._attr_current_media = media
181
182 # select audio source
183 audio_source = self.mass.streams.get_stream(media, DEFAULT_SNAPCAST_FORMAT)
184
185 async def _streamer() -> None:
186 stream_path = self._get_stream_path(stream)
187 self.logger.debug("Start streaming to %s", stream_path)
188 async with FFMpeg(
189 audio_input=audio_source,
190 input_format=DEFAULT_SNAPCAST_FORMAT,
191 output_format=DEFAULT_SNAPCAST_FORMAT,
192 filter_params=get_player_filter_params(
193 self.mass, self.player_id, DEFAULT_SNAPCAST_FORMAT, DEFAULT_SNAPCAST_FORMAT
194 ),
195 audio_output=stream_path,
196 extra_input_args=["-y", "-re"],
197 ) as ffmpeg_proc:
198 self._attr_playback_state = PlaybackState.PLAYING
199 self._attr_current_media = media
200 self._attr_elapsed_time = 0
201 self._attr_elapsed_time_last_updated = time.time()
202 self.update_state()
203
204 self._set_childs_state()
205 await ffmpeg_proc.wait()
206
207 self.logger.debug("Finished streaming to %s", stream_path)
208 # we need to wait a bit for the stream status to become idle
209 # to ensure that all snapclients have consumed the audio
210 while stream.status != "idle":
211 await asyncio.sleep(0.25)
212 self._attr_playback_state = PlaybackState.IDLE
213 self._attr_elapsed_time = time.time() - self._attr_elapsed_time_last_updated
214 self.update_state()
215 self._set_childs_state()
216
217 # start streaming the queue (pcm) audio in a background task
218 self._stream_task = self.mass.create_task(_streamer())
219
220 async def play_announcement(
221 self, announcement: PlayerMedia, volume_level: int | None = None
222 ) -> None:
223 """Handle (provider native) playback of an announcement on given player."""
224 # get stream or create new one
225 stream_name = self._get_stream_name(SnapCastStreamType.ANNOUNCEMENT)
226 stream = await self._get_or_create_stream(stream_name, None)
227
228 # always activate the stream (announcements have priority over music)
229 snap_group = self._get_snapgroup()
230 assert snap_group is not None # for type checking
231 await snap_group.set_stream(stream.identifier)
232
233 # Unfortunately snapcast sets a volume per client (not per stream), so we need a way to
234 # set the announcement volume without affecting the music volume.
235 # We go for the simplest solution: save the previous volume, change it, restore later
236 # (with the downside that the change will be visible in the UI)
237 orig_volume_level = self.volume_level # Note: might be None
238
239 if volume_level is not None:
240 await self.volume_set(volume_level)
241
242 input_format = DEFAULT_SNAPCAST_FORMAT
243 assert announcement.custom_data is not None # for type checking
244 audio_source = self.mass.streams.get_announcement_stream(
245 announcement.custom_data["announcement_url"],
246 output_format=DEFAULT_SNAPCAST_FORMAT,
247 pre_announce=announcement.custom_data["pre_announce"],
248 pre_announce_url=announcement.custom_data["pre_announce_url"],
249 )
250
251 # stream the audio, wait for it to finish (play_announcement should return after the
252 # announcement is over to avoid simultaneous announcements).
253 stream_path = self._get_stream_path(stream)
254 self.logger.debug("Start announcement streaming to %s", stream_path)
255 async with FFMpeg(
256 audio_input=audio_source,
257 input_format=input_format,
258 output_format=DEFAULT_SNAPCAST_FORMAT,
259 filter_params=get_player_filter_params(
260 self.mass, self.player_id, input_format, DEFAULT_SNAPCAST_FORMAT
261 ),
262 audio_output=stream_path,
263 extra_input_args=["-y", "-re"],
264 ) as ffmpeg_proc:
265 await ffmpeg_proc.wait()
266
267 self.logger.debug("Finished announcement streaming to %s", stream_path)
268 # we need to wait a bit for the stream status to become idle
269 # to ensure that all snapclients have consumed the audio
270 while stream.status != "idle":
271 await asyncio.sleep(0.25)
272
273 # delete the announcement stream
274 await self._delete_stream(stream_name)
275
276 # restore volume, if we changed it above and it's still the same we set
277 # (the user did not change it himself while the announcement was playing)
278 if self.volume_level == volume_level and orig_volume_level is not None:
279 await self.volume_set(orig_volume_level)
280
281 # and restore the group to either the default or the music stream
282 if self.playback_state == PlaybackState.IDLE:
283 new_stream_name = "default"
284 else:
285 new_stream_name = self._get_stream_name(SnapCastStreamType.MUSIC)
286 group = self._get_snapgroup()
287 assert group is not None # for type checking
288 await group.set_stream(new_stream_name)
289
290 async def get_config_entries(
291 self,
292 action: str | None = None,
293 values: dict[str, ConfigValueType] | None = None,
294 ) -> list[ConfigEntry]:
295 """Player config."""
296 return [
297 CONF_ENTRY_SAMPLE_RATES_SNAPCAST,
298 ]
299
300 def _handle_player_update(self, snap_client: Snapclient) -> None:
301 """Process Snapcast update to Player controller.
302
303 This is a callback function
304 """
305 self._attr_name = self.snap_client.friendly_name
306 self._attr_volume_level = self.snap_client.volume
307 self._attr_volume_muted = self.snap_client.muted
308 self._attr_available = self.snap_client.connected
309
310 # Note: when the active stream is a MASS stream the active_source is __not__ updated at all.
311 # So it doesn't matter whether a MASS stream is for music or announcements.
312 if stream := self._get_active_snapstream():
313 if stream.identifier == "default":
314 self._attr_active_source = None
315 elif not stream.identifier.startswith(MASS_STREAM_PREFIX):
316 # unknown source
317 self._attr_active_source = stream.identifier
318 else:
319 self._attr_active_source = None
320
321 self._group_childs()
322
323 self.update_state()
324
325 def _get_stream_name(self, stream_type: SnapCastStreamType) -> str:
326 """Return the name of the stream for the given player.
327
328 Each player can have up to two concurrent streams, for music and announcements.
329
330 The stream name depends only on player_id (not queue_id) for two reasones:
331 1. Avoid issues when the same queue_id is simultaneously used by two players
332 (eg in universal groups).
333 2. Easily identify which stream belongs to which player, for instance to be able to
334 delete a music stream even when it is not active due to an announcement.
335 """
336 safe_name = create_safe_string(self.player_id, replace_space=True)
337 stream_name = f"{MASS_STREAM_PREFIX}{safe_name}"
338 if stream_type == SnapCastStreamType.ANNOUNCEMENT:
339 stream_name += MASS_ANNOUNCEMENT_POSTFIX
340 return stream_name
341
342 async def _get_or_create_stream(self, stream_name: str, queue_id: str | None) -> Snapstream:
343 """Create new stream on snapcast server (or return existing one)."""
344 # prefer to reuse existing stream if possible
345 if stream := self._get_snapstream(stream_name):
346 return stream
347 # The control script is used only for music streams in the builtin server
348 # (queue_id is None only for announcement streams).
349 extra_args = ""
350 if (
351 self.provider._use_builtin_server
352 and queue_id
353 and self.provider._controlscript_available
354 ):
355 # Create socket server for control script communication
356 socket_path = await self.provider.get_or_create_socket_server(queue_id)
357 extra_args = (
358 f"&controlscript={urllib.parse.quote_plus('control.py')}"
359 f"&controlscriptparams=--queueid={urllib.parse.quote_plus(queue_id)}%20"
360 f"--socket={urllib.parse.quote_plus(socket_path)}%20"
361 f"--streamserver-ip={self.mass.streams.publish_ip}%20"
362 f"--streamserver-port={self.mass.streams.publish_port}"
363 )
364
365 attempts = 50
366 while attempts:
367 attempts -= 1
368 # pick a random port
369 port = random.randint(4953, 4953 + 200)
370 result = await self.provider._snapserver.stream_add_stream(
371 # NOTE: setting the sampleformat to something else
372 # (like 24 bits bit depth) does not seem to work at all!
373 f"tcp://0.0.0.0:{port}?sampleformat=48000:16:2"
374 f"&idle_threshold={self.provider._snapcast_stream_idle_threshold}"
375 f"{extra_args}&name={stream_name}"
376 )
377 if "id" not in result:
378 # if the port is already taken, the result will be an error
379 self.logger.warning(result)
380 continue
381 return self.provider._snapserver.stream(result["id"])
382 msg = "Unable to create stream - No free port found?"
383 raise RuntimeError(msg)
384
385 def _get_snapstream(self, stream_name: str) -> Snapstream | None:
386 """Get a stream by name."""
387 with suppress(KeyError):
388 return self.provider._snapserver.stream(stream_name)
389 return None
390
391 def _get_stream_path(self, stream: Snapstream) -> str:
392 stream_path = stream.path or f"tcp://{stream._stream['uri']['host']}"
393 return stream_path.replace("0.0.0.0", self.provider._snapcast_server_host)
394
395 async def _delete_stream(self, stream_name: str) -> None:
396 if stream := self._get_snapstream(stream_name):
397 with suppress(TypeError, KeyError, AttributeError):
398 await self.provider._snapserver.stream_remove_stream(stream.identifier)
399
400 def _get_snapgroup(self) -> Snapgroup | None:
401 """Get snapcast group for given player_id."""
402 return cast("Snapgroup | None", self.snap_client.group)
403
404 def _set_childs_state(self) -> None:
405 """Set the state of the child`s of the player."""
406 for child_player_id in self.group_members:
407 if child_player_id == self.player_id:
408 continue
409 if mass_child_player := self.mass.players.get(child_player_id):
410 mass_child_player._attr_playback_state = self.playback_state
411 mass_child_player.update_state()
412
413 def _get_active_snapstream(self) -> Snapstream | None:
414 """Get active stream for given player_id."""
415 if group := self._get_snapgroup():
416 return self._get_snapstream(group.stream)
417 return None
418
419 def _group_childs(self) -> None:
420 """Return player_ids of the players synced to this player."""
421 snap_group = self._get_snapgroup()
422 assert snap_group is not None # for type checking
423 self._attr_group_members.clear()
424 if self.synced_to is not None:
425 return
426 self._attr_group_members.append(self.player_id)
427 for snap_client_id in snap_group.clients:
428 if (
429 self.provider._get_ma_id(snap_client_id) != self.player_id
430 and self.provider._snapserver.client(snap_client_id).connected
431 ):
432 self._attr_group_members.append(self.provider._get_ma_id(snap_client_id))
433 self.update_state()
434