/
/
/
1"""
2Sonos Player provider for Music Assistant: SonosPlayer object/model.
3
4Note that large parts of this code are copied over from the Home Assistant
5integration for Sonos.
6"""
7
8from __future__ import annotations
9
10import asyncio
11import contextlib
12import logging
13import time
14from collections.abc import Callable, Coroutine
15from typing import TYPE_CHECKING, Any, cast
16
17from music_assistant_models.enums import IdentifierType, MediaType, PlaybackState, PlayerState
18from music_assistant_models.errors import PlayerCommandFailed
19from soco import SoCoException
20from soco.core import MUSIC_SRC_RADIO, SoCo
21from soco.data_structures import DidlAudioBroadcast
22
23from music_assistant.constants import VERBOSE_LOG_LEVEL, create_sample_rates_config_entry
24from music_assistant.helpers.upnp import create_didl_metadata
25from music_assistant.models.player import DeviceInfo, Player, PlayerMedia
26
27from .constants import (
28 DURATION_SECONDS,
29 LINEIN_SOURCE_IDS,
30 LINEIN_SOURCES,
31 NEVER_TIME,
32 PLAYER_FEATURES,
33 PLAYER_SOURCE_MAP,
34 POSITION_SECONDS,
35 RESUB_COOLDOWN_SECONDS,
36 SONOS_STATE_TRANSITIONING,
37 SOURCE_MAPPING,
38 SUBSCRIPTION_SERVICES,
39 SUBSCRIPTION_TIMEOUT,
40)
41from .helpers import SonosUpdateError, soco_error
42
43if TYPE_CHECKING:
44 from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
45 from soco.events_base import Event as SonosEvent
46 from soco.events_base import SubscriptionBase
47
48 from .provider import SonosPlayerProvider
49
# Type alias for a callback that takes no arguments and returns nothing.
CALLBACK_TYPE = Callable[[], None]
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
52
53
class SonosSubscriptionsFailed(PlayerCommandFailed):
    """Raised when at least one Sonos event subscription could not be created."""
56
57
58class SonosPlayer(Player):
59 """Sonos Player implementation for S1 speakers."""
60
61 def __init__(
62 self,
63 provider: SonosPlayerProvider,
64 soco: SoCo,
65 ) -> None:
66 """Initialize SonosPlayer instance."""
67 super().__init__(provider, soco.uid)
68 self.soco = soco
69 self.household_id: str = soco.household_id
70 self.subscriptions: list[SubscriptionBase] = []
71
72 # Set player attributes
73 self._attr_supported_features = set(PLAYER_FEATURES)
74 self._attr_name = soco.player_name
75 self._attr_device_info = DeviceInfo(
76 model=soco.speaker_info["model_name"],
77 manufacturer="Sonos",
78 )
79 self._attr_device_info.add_identifier(IdentifierType.IP_ADDRESS, soco.ip_address)
80 self._attr_device_info.add_identifier(IdentifierType.UUID, soco.uid)
81 mac_address = self._extract_mac_from_player_id()
82 if mac_address:
83 self._attr_device_info.add_identifier(IdentifierType.MAC_ADDRESS, mac_address)
84 self._attr_needs_poll = True
85 self._attr_poll_interval = 5
86 self._attr_available = True
87 self._attr_can_group_with = {provider.instance_id}
88
89 # Subscriptions and events
90 self._subscriptions: list[SubscriptionBase] = []
91 self._subscription_lock: asyncio.Lock | None = None
92 self._last_activity: float = NEVER_TIME
93 self._resub_cooldown_expires_at: float | None = None
94
95 @property
96 def missing_subscriptions(self) -> set[str]:
97 """Return a list of missing service subscriptions."""
98 subscribed_services = {sub.service.service_type for sub in self._subscriptions}
99 return SUBSCRIPTION_SERVICES - subscribed_services
100
101 def _extract_mac_from_player_id(self) -> str | None:
102 """Extract MAC address from Sonos player_id.
103
104 Sonos player_ids follow the format RINCON_XXXXXXXXXXXX01400 where
105 the middle 12 hex characters represent the MAC address.
106
107 :return: MAC address string in XX:XX:XX:XX:XX:XX format, or None if not extractable.
108 """
109 # Remove RINCON_ prefix if present
110 player_id = self.player_id
111 player_id = player_id.removeprefix("RINCON_")
112
113 # Remove the 01400 suffix (or similar) - should be last 5 chars
114 if len(player_id) >= 17: # 12 hex chars for MAC + 5 chars suffix
115 mac_hex = player_id[:12]
116 else:
117 return None
118
119 # Validate it looks like a MAC (all hex characters)
120 try:
121 int(mac_hex, 16)
122 except ValueError:
123 return None
124
125 # Format as XX:XX:XX:XX:XX:XX
126 return ":".join(mac_hex[i : i + 2].upper() for i in range(0, 12, 2))
127
128 async def setup(self) -> None:
129 """Set up the player."""
130 self._attr_volume_level = self.soco.volume
131 self._attr_volume_muted = self.soco.mute
132 self.update_groups()
133 if not self.synced_to:
134 self.poll_media()
135 await self.subscribe()
136 await self.mass.players.register_or_update(self)
137
138 async def offline(self) -> None:
139 """Handle removal of speaker when unavailable."""
140 if not self._attr_available:
141 return
142
143 if self._resub_cooldown_expires_at is None and not self.mass.closing:
144 self._resub_cooldown_expires_at = time.monotonic() + RESUB_COOLDOWN_SECONDS
145 self.logger.debug("Starting resubscription cooldown for %s", self.display_name)
146
147 self._attr_available = False
148 self._share_link_plugin = None
149
150 self.update_state()
151 await self.unsubscribe()
152
153 async def get_config_entries(
154 self,
155 action: str | None = None,
156 values: dict[str, ConfigValueType] | None = None,
157 ) -> list[ConfigEntry]:
158 """Return all (provider/player specific) Config Entries for the player."""
159 return [
160 create_sample_rates_config_entry(
161 supported_sample_rates=[44100, 48000],
162 supported_bit_depths=[16],
163 hidden=True,
164 ),
165 ]
166
167 async def stop(self) -> None:
168 """Send STOP command to the player."""
169 if self.synced_to:
170 self.logger.debug(
171 "Ignore STOP command for %s: Player is synced to another player.",
172 self.player_id,
173 )
174 return
175 if self._attr_active_source in LINEIN_SOURCE_IDS:
176 # Play an invalid URI to force stop line-in sources
177 with contextlib.suppress(SoCoException):
178 await asyncio.to_thread(self.soco.play_uri, "")
179 else:
180 await asyncio.to_thread(self.soco.stop)
181 self.mass.call_later(2, self.poll)
182 self.update_state()
183
184 async def play(self) -> None:
185 """Send PLAY command to the player."""
186 if self.synced_to:
187 self.logger.debug(
188 "Ignore PLAY command for %s: Player is synced to another player.",
189 self.player_id,
190 )
191 return
192 await asyncio.to_thread(self.soco.play)
193 self.mass.call_later(2, self.poll)
194
195 async def pause(self) -> None:
196 """Send PAUSE command to the player."""
197 if self.synced_to:
198 self.logger.debug(
199 "Ignore PAUSE command for %s: Player is synced to another player.",
200 self.player_id,
201 )
202 return
203 if "Pause" not in self.soco.available_actions:
204 # pause not possible
205 await self.stop()
206 return
207 await asyncio.to_thread(self.soco.pause)
208 self.mass.call_later(2, self.poll)
209
210 async def volume_set(self, volume_level: int) -> None:
211 """Send VOLUME_SET command to the player."""
212
213 def set_volume_level(volume_level: int) -> None:
214 self.soco.volume = volume_level
215
216 await asyncio.to_thread(set_volume_level, volume_level)
217 self.mass.call_later(2, self.poll)
218
219 async def volume_mute(self, muted: bool) -> None:
220 """Send VOLUME MUTE command to the player."""
221
222 def set_volume_mute(muted: bool) -> None:
223 self.soco.mute = muted
224
225 await asyncio.to_thread(set_volume_mute, muted)
226 self.mass.call_later(2, self.poll)
227
228 async def play_media(self, media: PlayerMedia) -> None:
229 """Handle PLAY MEDIA on the player."""
230 if self.synced_to:
231 # this should be already handled by the player manager, but just in case...
232 msg = (
233 f"Player {self.display_name} can not "
234 "accept play_media command, it is synced to another player."
235 )
236 raise PlayerCommandFailed(msg)
237
238 if not media.duration:
239 # Sonos really does not like FLAC streams without duration
240 media.uri = media.uri.replace(".flac", ".mp3")
241
242 didl_metadata = create_didl_metadata(media)
243 is_announcement = media.media_type == MediaType.ANNOUNCEMENT
244 force_radio = False if is_announcement else not media.duration
245
246 stream_url = await self.provider.mass.streams.resolve_stream_url(self.player_id, media)
247 await asyncio.to_thread(
248 self.soco.play_uri, stream_url, meta=didl_metadata, force_radio=force_radio
249 )
250 self.mass.call_later(2, self.poll)
251
252 async def enqueue_next_media(self, media: PlayerMedia) -> None:
253 """Handle enqueuing next media item."""
254 if self.synced_to:
255 # this should be already handled by the player manager, but just in case...
256 msg = (
257 f"Player {self.display_name} can not "
258 "accept enqueue command, it is synced to another player."
259 )
260 raise PlayerCommandFailed(msg)
261
262 didl_metadata = create_didl_metadata(media)
263
264 def add_to_queue() -> None:
265 self.soco.avTransport.SetNextAVTransportURI(
266 [
267 ("InstanceID", 0),
268 ("NextURI", media.uri),
269 ("NextURIMetaData", didl_metadata),
270 ]
271 )
272
273 await asyncio.to_thread(add_to_queue)
274 self.mass.call_later(2, self.poll)
275
276 @soco_error()
277 async def set_members(
278 self,
279 player_ids_to_add: list[str] | None = None,
280 player_ids_to_remove: list[str] | None = None,
281 ) -> None:
282 """Handle SET_MEMBERS command on the player."""
283 if self.synced_to:
284 # this should not happen, but guard anyways
285 raise RuntimeError("Player is synced, cannot set members")
286 if not player_ids_to_add and not player_ids_to_remove:
287 return
288 player_ids_to_add = player_ids_to_add or []
289 player_ids_to_remove = player_ids_to_remove or []
290
291 if player_ids_to_remove:
292 for player_id in player_ids_to_remove:
293 if player_to_remove := cast("SonosPlayer", self.mass.players.get_player(player_id)):
294 await asyncio.to_thread(player_to_remove.soco.unjoin)
295 self.mass.call_later(2, player_to_remove.poll)
296
297 if player_ids_to_add:
298 for player_id in player_ids_to_add:
299 if player_to_add := cast("SonosPlayer", self.mass.players.get_player(player_id)):
300 await asyncio.to_thread(player_to_add.soco.join, self.soco)
301 self.mass.call_later(2, player_to_add.poll)
302
303 async def poll(self) -> None:
304 """Poll player for state updates."""
305
306 def _poll() -> None:
307 """Poll the speaker for updates (NOT async friendly)."""
308 self.update_groups()
309 self.poll_media()
310 self._attr_volume_level = self.soco.volume
311 self._attr_volume_muted = self.soco.mute
312
313 await self._check_availability()
314 if self._attr_available:
315 await asyncio.to_thread(_poll)
316
317 @soco_error()
318 def poll_media(self) -> None:
319 """Poll information about currently playing media."""
320 transport_info = self.soco.get_current_transport_info()
321 new_status = transport_info["current_transport_state"]
322
323 if new_status == SONOS_STATE_TRANSITIONING:
324 return
325
326 new_status = _convert_state(new_status)
327 update_position = new_status != self._attr_playback_state
328 self._attr_playback_state = new_status
329 self._set_basic_track_info(update_position=update_position)
330 self.update_player()
331
332 def update_ip(self, ip_address: str) -> None:
333 """Handle updated IP of a Sonos player (NOT async friendly)."""
334 if self._attr_available:
335 return
336 self.logger.debug(
337 "Player IP-address changed from %s to %s", self.soco.ip_address, ip_address
338 )
339 try:
340 self.ping()
341 except SonosUpdateError:
342 return
343 self.soco.ip_address = ip_address
344 asyncio.run_coroutine_threadsafe(self.setup(), self.mass.loop)
345 self._attr_device_info = DeviceInfo(
346 model=self._attr_device_info.model,
347 manufacturer=self._attr_device_info.manufacturer,
348 )
349 self._attr_device_info.add_identifier(IdentifierType.IP_ADDRESS, ip_address)
350 self._attr_device_info.add_identifier(IdentifierType.UUID, self.soco.uid)
351 mac_address = self._extract_mac_from_player_id()
352 if mac_address:
353 self._attr_device_info.add_identifier(IdentifierType.MAC_ADDRESS, mac_address)
354 self.update_player()
355
356 async def _check_availability(self) -> None:
357 """Check if the player is still available."""
358 try:
359 await asyncio.to_thread(self.ping)
360 self._speaker_activity("ping")
361 except SonosUpdateError:
362 if not self._attr_available:
363 return
364 self.logger.warning(
365 "No recent activity and cannot reach %s, marking unavailable",
366 self.display_name,
367 )
368 await self.offline()
369
370 @soco_error()
371 def ping(self) -> None:
372 """Test device availability. Failure will raise SonosUpdateError."""
373 self.soco.renderingControl.GetVolume([("InstanceID", 0), ("Channel", "Master")], timeout=1)
374
375 @soco_error()
376 def _poll_track_info(self) -> dict[str, Any]:
377 """Poll the speaker for current track info.
378
379 Add converted position values (NOT async fiendly).
380 """
381 track_info: dict[str, Any] = self.soco.get_current_track_info()
382 track_info[DURATION_SECONDS] = _timespan_secs(track_info.get("duration"))
383 track_info[POSITION_SECONDS] = _timespan_secs(track_info.get("position"))
384 return track_info
385
386 def update_player(self, signal_update: bool = True) -> None:
387 """Update Sonos Player."""
388 self._update_attributes()
389 if signal_update:
390 # send update to the player manager right away only if we are triggered from an event
391 # when we're just updating from a manual poll, the player manager
392 # will detect changes to the player object itself
393 self.mass.loop.call_soon_threadsafe(self.update_state)
394
395 async def _subscribe_target(
396 self, target: SubscriptionBase, sub_callback: Callable[[SonosEvent], None]
397 ) -> None:
398 """Create a Sonos subscription for given target."""
399
400 def on_renew_failed(exception: Exception) -> None:
401 """Handle a failed subscription renewal callback."""
402 self.mass.create_task(self._renew_failed(exception))
403
404 # Use events_asyncio which makes subscribe() async-awaitable
405 subscription = await target.subscribe(
406 auto_renew=True, requested_timeout=SUBSCRIPTION_TIMEOUT
407 )
408 subscription.callback = sub_callback
409 subscription.auto_renew_fail = on_renew_failed
410 self._subscriptions.append(subscription)
411
412 async def _renew_failed(self, exception: Exception) -> None:
413 """Mark the speaker as offline after a subscription renewal failure.
414
415 This is to reset the state to allow a future clean subscription attempt.
416 """
417 if not self._attr_available:
418 return
419
420 self.log_subscription_result(exception, "Subscription renewal", logging.WARNING)
421 await self.offline()
422
423 def log_subscription_result(self, result: Any, event: str, level: int = logging.DEBUG) -> None:
424 """Log a message if a subscription action (create/renew/stop) results in an exception."""
425 if not isinstance(result, Exception):
426 return
427
428 if isinstance(result, asyncio.exceptions.TimeoutError):
429 message = "Request timed out"
430 exc_info = None
431 else:
432 message = str(result)
433 exc_info = result if not str(result) else None
434
435 self.logger.log(
436 level,
437 "%s failed for %s: %s",
438 event,
439 self.display_name,
440 message,
441 exc_info=exc_info if self.logger.isEnabledFor(10) else None,
442 )
443
444 async def subscribe(self) -> None:
445 """Initiate event subscriptions under an async lock."""
446 if not self._subscription_lock:
447 self._subscription_lock = asyncio.Lock()
448
449 async with self._subscription_lock:
450 try:
451 # Create event subscriptions.
452 subscriptions = [
453 self._subscribe_target(getattr(self.soco, service), self._handle_event)
454 for service in self.missing_subscriptions
455 ]
456 if not subscriptions:
457 return
458 self.logger.log(
459 VERBOSE_LOG_LEVEL, "Creating subscriptions for %s", self.display_name
460 )
461 results = await asyncio.gather(*subscriptions, return_exceptions=True)
462 for result in results:
463 self.log_subscription_result(result, "Creating subscription", logging.WARNING)
464 if any(isinstance(result, Exception) for result in results):
465 raise SonosSubscriptionsFailed
466 except SonosSubscriptionsFailed:
467 self.logger.warning("Creating subscriptions failed for %s", self.display_name)
468 assert self._subscription_lock is not None
469 async with self._subscription_lock:
470 await self.offline()
471
472 async def unsubscribe(self) -> None:
473 """Cancel all subscriptions."""
474 if not self._subscriptions:
475 return
476 self.logger.log(VERBOSE_LOG_LEVEL, "Unsubscribing from events for %s", self.display_name)
477 results = await asyncio.gather(
478 *(subscription.unsubscribe() for subscription in self._subscriptions),
479 return_exceptions=True,
480 )
481 for result in results:
482 self.log_subscription_result(result, "Unsubscribe")
483 self._subscriptions = []
484
485 def _handle_event(self, event: SonosEvent) -> None:
486 """Handle SonosEvent callback."""
487 service_type: str = event.service.service_type
488 self._speaker_activity(f"{service_type} subscription")
489 if service_type == "DeviceProperties":
490 self.update_player()
491 return
492 if service_type == "AVTransport":
493 self._handle_avtransport_event(event)
494 return
495 if service_type == "RenderingControl":
496 self._handle_rendering_control_event(event)
497 return
498 if service_type == "ZoneGroupTopology":
499 self._handle_zone_group_topology_event(event)
500 return
501
    def _handle_avtransport_event(self, event: SonosEvent) -> None:
        """Update information about currently playing media from an event.

        Updates the playback state, refreshes the basic track info and, for
        radio sources, fills in a missing media title from the channel/show
        name before signalling a player update.
        """
        # NOTE: The new coordinator can be provided in a media update event but
        # before the ZoneGroupState updates. If this happens the playback
        # state will be incorrect and should be ignored. Switching to the
        # new coordinator will use its media. The regrouping process will
        # be completed during the next ZoneGroupState update.

        # Missing transport_state indicates a transient error
        if (new_status := event.variables.get("transport_state")) is None:
            return

        # Ignore transitions, we should get the target state soon
        if new_status == SONOS_STATE_TRANSITIONING:
            return

        evars = event.variables
        new_status = _convert_state(evars["transport_state"])
        # A changed playback state forces a position update further below
        state_changed = new_status != self._attr_playback_state

        self._attr_playback_state = new_status

        # Prefer the enqueued transport URI, fall back to the current track URI
        track_uri = evars["enqueued_transport_uri"] or evars["current_track_uri"]
        audio_source = self.soco.music_source_from_uri(track_uri)

        self._set_basic_track_info(update_position=state_changed)
        ct_md = evars["current_track_meta_data"]

        et_uri_md = evars["enqueued_transport_uri_meta_data"]

        channel = ""
        if audio_source == MUSIC_SRC_RADIO:
            if et_uri_md:
                channel = et_uri_md.title

            # Extra guards for S1 compatibility
            if ct_md and hasattr(ct_md, "radio_show") and ct_md.radio_show:
                # Only the part before the first comma of radio_show is used
                radio_show = ct_md.radio_show.split(",")[0]
                channel = " • ".join(filter(None, [channel, radio_show]))

            # For broadcasts, the channel only fills in a title that is still empty
            if isinstance(et_uri_md, DidlAudioBroadcast) and self._attr_current_media:
                self._attr_current_media.title = self._attr_current_media.title or channel

        self.update_player()
546
547 def _handle_rendering_control_event(self, event: SonosEvent) -> None:
548 """Update information about currently volume settings."""
549 variables = event.variables
550
551 if "volume" in variables:
552 volume = variables["volume"]
553 self._attr_volume_level = int(volume["Master"])
554
555 if mute := variables.get("mute"):
556 self._attr_volume_muted = mute["Master"] == "1"
557
558 self.update_player()
559
560 def _handle_zone_group_topology_event(self, event: SonosEvent) -> None:
561 """Handle callback for topology change event."""
562 if "zone_player_uui_ds_in_group" not in event.variables:
563 return
564 asyncio.run_coroutine_threadsafe(self.create_update_groups_coro(event), self.mass.loop)
565
566 def _update_attributes(self) -> None:
567 """Update attributes of the MA Player from SoCo state."""
568 if not self._attr_available:
569 self._attr_playback_state = PlayerState.IDLE
570 self._attr_group_members.clear()
571 return
572
573 def _set_basic_track_info(self, update_position: bool = False) -> None:
574 """Query the speaker to update media metadata and position info."""
575 try:
576 track_info = self._poll_track_info()
577 except SonosUpdateError as err:
578 self.logger.warning("Fetching track info failed: %s", err)
579 return
580 if not track_info["uri"]:
581 return
582 uri = track_info["uri"]
583
584 audio_source = self.soco.music_source_from_uri(uri)
585 if (source_id := SOURCE_MAPPING.get(audio_source)) and audio_source in LINEIN_SOURCES:
586 self._attr_elapsed_time = None
587 self._attr_elapsed_time_last_updated = None
588 self._attr_active_source = source_id
589 self._attr_current_media = None
590 if source_id not in [x.id for x in self._attr_source_list]:
591 self._attr_source_list.append(PLAYER_SOURCE_MAP[source_id])
592 return
593
594 current_media = PlayerMedia(
595 uri=uri,
596 artist=track_info.get("artist"),
597 album=track_info.get("album"),
598 title=track_info.get("title"),
599 image_url=track_info.get("album_art"),
600 )
601 self._attr_current_media = current_media
602 self._attr_active_source = None
603 self._update_media_position(track_info, force_update=update_position)
604
605 def _update_media_position(
606 self, position_info: dict[str, int], force_update: bool = False
607 ) -> None:
608 """Update state when playing music tracks."""
609 duration = position_info.get(DURATION_SECONDS)
610 current_position = position_info.get(POSITION_SECONDS)
611
612 if not (duration or current_position):
613 self._attr_elapsed_time = None
614 self._attr_elapsed_time_last_updated = None
615 return
616
617 should_update = force_update
618 if self._attr_current_media:
619 self._attr_current_media.duration = duration
620
621 # player started reporting position?
622 if current_position is not None and self._attr_elapsed_time is None:
623 should_update = True
624
625 # position jumped?
626 if current_position is not None and self._attr_elapsed_time is not None:
627 if self._attr_playback_state == PlaybackState.PLAYING:
628 assert self._attr_elapsed_time_last_updated is not None
629 time_diff = time.time() - self._attr_elapsed_time_last_updated
630 else:
631 time_diff = 0
632
633 calculated_position = self._attr_elapsed_time + time_diff
634
635 if abs(calculated_position - current_position) > 1.5:
636 should_update = True
637
638 if current_position is None:
639 self._attr_elapsed_time = None
640 self._attr_elapsed_time_last_updated = None
641 elif should_update:
642 self._attr_elapsed_time = current_position
643 self._attr_elapsed_time_last_updated = time.time()
644
645 def _speaker_activity(self, source: str) -> None:
646 """Track the last activity on this speaker, set availability and resubscribe."""
647 if self._resub_cooldown_expires_at:
648 if time.monotonic() < self._resub_cooldown_expires_at:
649 self.logger.debug(
650 "Activity on %s from %s while in cooldown, ignoring",
651 self.display_name,
652 source,
653 )
654 return
655 self._resub_cooldown_expires_at = None
656
657 self.logger.log(VERBOSE_LOG_LEVEL, "Activity on %s from %s", self.display_name, source)
658 self._last_activity = time.monotonic()
659 was_available = self._attr_available
660 self._attr_available = True
661 if not was_available:
662 self.update_player()
663 self.mass.loop.call_soon_threadsafe(self.mass.create_task, self.subscribe())
664
665 def update_groups(self) -> None:
666 """Update group topology when polling."""
667 asyncio.run_coroutine_threadsafe(self.create_update_groups_coro(), self.mass.loop)
668
669 def create_update_groups_coro(
670 self, event: SonosEvent | None = None
671 ) -> Coroutine[Any, Any, None]:
672 """Handle callback for topology change event."""
673
674 def _get_soco_group() -> list[str]:
675 """Ask SoCo cache for existing topology."""
676 coordinator_uid = self.soco.uid
677 joined_uids = []
678 with contextlib.suppress(OSError, SoCoException):
679 if self.soco.group and self.soco.group.coordinator:
680 coordinator_uid = self.soco.group.coordinator.uid
681 joined_uids = [
682 p.uid
683 for p in self.soco.group.members
684 if p.uid != coordinator_uid and p.is_visible
685 ]
686
687 return [coordinator_uid, *joined_uids]
688
689 async def _extract_group(event: SonosEvent | None) -> list[str]:
690 """Extract group layout from a topology event."""
691 group = event and event.zone_player_uui_ds_in_group
692 if group:
693 assert isinstance(group, str)
694 return group.split(",")
695 return await asyncio.to_thread(_get_soco_group)
696
697 def _regroup(group: list[str]) -> None:
698 """Rebuild internal group layout (async safe)."""
699 if group == [self.soco.uid] and not self._attr_group_members:
700 # Skip updating existing single speakers in polling mode
701 return
702
703 group_members_ids = []
704
705 for uid in group:
706 speaker = self.mass.players.get_player(uid)
707 if speaker:
708 group_members_ids.append(uid)
709 else:
710 self.logger.debug(
711 "%s group member unavailable (%s), will try again",
712 self.display_name,
713 uid,
714 )
715 return
716
717 if self._attr_group_members == group_members_ids:
718 # Useful in polling mode for speakers with stereo pairs or surrounds
719 # as those "invisible" speakers will bypass the single speaker check
720 return
721
722 self._attr_group_members = group_members_ids
723 self.mass.loop.call_soon_threadsafe(self.update_state)
724
725 self.logger.debug("Regrouped %s: %s", self.display_name, self._attr_group_members)
726 self.update_player()
727
728 async def _handle_group_event(event: SonosEvent | None) -> None:
729 """Get async lock and handle event."""
730 _provider = cast("SonosPlayerProvider", self._provider)
731 async with _provider.topology_condition:
732 group = await _extract_group(event)
733 if self.soco.uid == group[0]:
734 _regroup(group)
735 _provider.topology_condition.notify_all()
736
737 return _handle_group_event(event)
738
739 async def wait_for_groups(self, groups: list[list[SonosPlayer]]) -> None:
740 """Wait until all groups are present, or timeout."""
741
742 def _test_groups(groups: list[list[SonosPlayer]]) -> bool:
743 """Return whether all groups exist now."""
744 for group in groups:
745 coordinator = group[0]
746
747 # Test that coordinator is coordinating
748 current_group = coordinator.group_members
749 if coordinator != current_group[0]:
750 return False
751
752 # Test that joined members match
753 if set(group[1:]) != set(current_group[1:]):
754 return False
755
756 return True
757
758 _provider = cast("SonosPlayerProvider", self._provider)
759 try:
760 async with asyncio.timeout(5):
761 while not _test_groups(groups):
762 await _provider.topology_condition.wait()
763 except TimeoutError:
764 self.logger.warning("Timeout waiting for target groups %s", groups)
765
766 if players := self.mass.players.all_players(provider_filter=_provider.instance_id):
767 any_speaker = cast("SonosPlayer", players[0])
768 any_speaker.soco.zone_group_state.clear_cache()
769
770
771def _convert_state(sonos_state: str | None) -> PlayerState:
772 """Convert Sonos state to PlayerState."""
773 if sonos_state == "PLAYING":
774 return PlayerState.PLAYING
775 if sonos_state == "TRANSITIONING":
776 return PlayerState.PLAYING
777 if sonos_state == "PAUSED_PLAYBACK":
778 return PlayerState.PAUSED
779 return PlayerState.IDLE
780
781
782def _timespan_secs(timespan: str | None) -> int | None:
783 """Parse a time-span into number of seconds."""
784 if timespan in ("", "NOT_IMPLEMENTED"):
785 return None
786 if timespan is None:
787 return None
788 return int(sum(60 ** x[0] * int(x[1]) for x in enumerate(reversed(timespan.split(":")))))
789