/
/
/
1"""
2Sonos Player provider for Music Assistant: SonosPlayer object/model.
3
4Note that large parts of this code are copied over from the Home Assistant
5integration for Sonos.
6"""
7
8from __future__ import annotations
9
10import asyncio
11import contextlib
12import logging
13import time
14from collections.abc import Callable, Coroutine
15from typing import TYPE_CHECKING, Any, cast
16
17from music_assistant_models.enums import MediaType, PlaybackState, PlayerState, PlayerType
18from music_assistant_models.errors import PlayerCommandFailed
19from soco import SoCoException
20from soco.core import MUSIC_SRC_RADIO, SoCo
21from soco.data_structures import DidlAudioBroadcast
22
23from music_assistant.constants import VERBOSE_LOG_LEVEL, create_sample_rates_config_entry
24from music_assistant.helpers.upnp import create_didl_metadata
25from music_assistant.models.player import DeviceInfo, Player, PlayerMedia
26
27from .constants import (
28 DURATION_SECONDS,
29 LINEIN_SOURCE_IDS,
30 LINEIN_SOURCES,
31 NEVER_TIME,
32 PLAYER_FEATURES,
33 PLAYER_SOURCE_MAP,
34 POSITION_SECONDS,
35 RESUB_COOLDOWN_SECONDS,
36 SONOS_STATE_TRANSITIONING,
37 SOURCE_MAPPING,
38 SUBSCRIPTION_SERVICES,
39 SUBSCRIPTION_TIMEOUT,
40)
41from .helpers import SonosUpdateError, soco_error
42
43if TYPE_CHECKING:
44 from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
45 from soco.events_base import Event as SonosEvent
46 from soco.events_base import SubscriptionBase
47
48 from .provider import SonosPlayerProvider
49
# Type alias for a callback that takes no arguments and returns nothing.
CALLBACK_TYPE = Callable[[], None]
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
52
53
class SonosSubscriptionsFailed(PlayerCommandFailed):
    """Raised when creating one or more Sonos event subscriptions failed."""
56
57
class SonosPlayer(Player):
    """Sonos Player implementation for S1 speakers.

    Wraps a SoCo device object and keeps the Music Assistant player attributes
    (``_attr_*``) in sync with it, both through periodic polling and through
    UPnP event subscriptions.
    """

    def __init__(
        self,
        provider: SonosPlayerProvider,
        soco: SoCo,
    ) -> None:
        """Initialize SonosPlayer instance.

        :param provider: The Sonos provider instance that owns this player.
        :param soco: The SoCo device object; its ``uid`` is used as player id.
        """
        super().__init__(provider, soco.uid)
        self.soco = soco
        self.household_id: str = soco.household_id
        # Public list kept for backwards compatibility; this class itself only
        # uses the private ``_subscriptions`` list below.
        self.subscriptions: list[SubscriptionBase] = []

        # Set player attributes
        self._attr_type = PlayerType.PLAYER
        self._attr_supported_features = set(PLAYER_FEATURES)
        self._attr_name = soco.player_name
        self._attr_device_info = DeviceInfo(
            model=soco.speaker_info["model_name"],
            manufacturer="Sonos",
        )
        self._attr_device_info.ip_address = soco.ip_address
        self._attr_needs_poll = True
        self._attr_poll_interval = 5
        self._attr_available = True
        self._attr_can_group_with = {provider.instance_id}

        # Subscriptions and events
        self._subscriptions: list[SubscriptionBase] = []
        # Created lazily in subscribe().
        self._subscription_lock: asyncio.Lock | None = None
        self._last_activity: float = NEVER_TIME
        # time.monotonic() deadline until which speaker activity is ignored
        # after the player went offline (None = no cooldown active).
        self._resub_cooldown_expires_at: float | None = None

    @property
    def missing_subscriptions(self) -> set[str]:
        """Return the set of services we do not (yet) hold a subscription for."""
        subscribed_services = {sub.service.service_type for sub in self._subscriptions}
        return SUBSCRIPTION_SERVICES - subscribed_services

    async def setup(self) -> None:
        """Set up the player: read initial state, subscribe and register it."""
        # NOTE(review): soco.volume, soco.mute and poll_media() look like blocking
        # SoCo calls executed directly in this coroutine, whereas poll() offloads
        # the same calls with asyncio.to_thread - confirm this is intended.
        self._attr_volume_level = self.soco.volume
        self._attr_volume_muted = self.soco.mute
        self.update_groups()
        if not self.synced_to:
            self.poll_media()
        await self.subscribe()
        await self.mass.players.register_or_update(self)

    async def offline(self) -> None:
        """Handle removal of speaker when unavailable.

        Marks the player unavailable, starts the resubscription cooldown
        (unless one is active or MA is closing) and drops all subscriptions.
        Does nothing when the player is already marked unavailable.
        """
        if not self._attr_available:
            return

        if self._resub_cooldown_expires_at is None and not self.mass.closing:
            self._resub_cooldown_expires_at = time.monotonic() + RESUB_COOLDOWN_SECONDS
            self.logger.debug("Starting resubscription cooldown for %s", self.display_name)

        self._attr_available = False
        self._share_link_plugin = None

        self.update_state()
        await self.unsubscribe()

    async def get_config_entries(
        self,
        action: str | None = None,
        values: dict[str, ConfigValueType] | None = None,
    ) -> list[ConfigEntry]:
        """Return all (provider/player specific) Config Entries for the player.

        ``action`` and ``values`` are accepted for interface compatibility but
        are not used here.
        """
        return [
            create_sample_rates_config_entry(
                supported_sample_rates=[44100, 48000],
                supported_bit_depths=[16],
                hidden=True,
            ),
        ]

    async def stop(self) -> None:
        """Send STOP command to the player (ignored when synced to another player)."""
        if self.synced_to:
            self.logger.debug(
                "Ignore STOP command for %s: Player is synced to another player.",
                self.player_id,
            )
            return
        if self._attr_active_source in LINEIN_SOURCE_IDS:
            # Play an invalid URI to force stop line-in sources
            with contextlib.suppress(SoCoException):
                await asyncio.to_thread(self.soco.play_uri, "")
        else:
            await asyncio.to_thread(self.soco.stop)
        self.mass.call_later(2, self.poll)
        self.update_state()

    async def play(self) -> None:
        """Send PLAY command to the player (ignored when synced to another player)."""
        if self.synced_to:
            self.logger.debug(
                "Ignore PLAY command for %s: Player is synced to another player.",
                self.player_id,
            )
            return
        await asyncio.to_thread(self.soco.play)
        self.mass.call_later(2, self.poll)

    async def pause(self) -> None:
        """Send PAUSE command to the player.

        Ignored when synced to another player; falls back to stop() when the
        speaker does not list "Pause" among its available actions.
        """
        if self.synced_to:
            self.logger.debug(
                "Ignore PAUSE command for %s: Player is synced to another player.",
                self.player_id,
            )
            return
        if "Pause" not in self.soco.available_actions:
            # pause not possible
            await self.stop()
            return
        await asyncio.to_thread(self.soco.pause)
        self.mass.call_later(2, self.poll)

    async def volume_set(self, volume_level: int) -> None:
        """Send VOLUME_SET command to the player."""

        def set_volume_level(volume_level: int) -> None:
            # Property assignment on the SoCo object; run in a worker thread.
            self.soco.volume = volume_level

        await asyncio.to_thread(set_volume_level, volume_level)
        self.mass.call_later(2, self.poll)

    async def volume_mute(self, muted: bool) -> None:
        """Send VOLUME MUTE command to the player."""

        def set_volume_mute(muted: bool) -> None:
            # Property assignment on the SoCo object; run in a worker thread.
            self.soco.mute = muted

        await asyncio.to_thread(set_volume_mute, muted)
        self.mass.call_later(2, self.poll)

    async def play_media(self, media: PlayerMedia) -> None:
        """Handle PLAY MEDIA on the player.

        :raises PlayerCommandFailed: When this player is synced to another player.
        """
        if self.synced_to:
            # this should be already handled by the player manager, but just in case...
            msg = (
                f"Player {self.display_name} can not "
                "accept play_media command, it is synced to another player."
            )
            raise PlayerCommandFailed(msg)

        if not media.duration:
            # Sonos really does not like FLAC streams without duration
            media.uri = media.uri.replace(".flac", ".mp3")

        didl_metadata = create_didl_metadata(media)
        is_announcement = media.media_type == MediaType.ANNOUNCEMENT
        # Items without a duration are played as radio, except announcements.
        force_radio = False if is_announcement else not media.duration

        await asyncio.to_thread(
            self.soco.play_uri, media.uri, meta=didl_metadata, force_radio=force_radio
        )
        self.mass.call_later(2, self.poll)

    async def enqueue_next_media(self, media: PlayerMedia) -> None:
        """Handle enqueuing next media item.

        :raises PlayerCommandFailed: When this player is synced to another player.
        """
        if self.synced_to:
            # this should be already handled by the player manager, but just in case...
            msg = (
                f"Player {self.display_name} can not "
                "accept enqueue command, it is synced to another player."
            )
            raise PlayerCommandFailed(msg)

        didl_metadata = create_didl_metadata(media)

        def add_to_queue() -> None:
            self.soco.avTransport.SetNextAVTransportURI(
                [
                    ("InstanceID", 0),
                    ("NextURI", media.uri),
                    ("NextURIMetaData", didl_metadata),
                ]
            )

        await asyncio.to_thread(add_to_queue)
        self.mass.call_later(2, self.poll)

    @soco_error()
    async def set_members(
        self,
        player_ids_to_add: list[str] | None = None,
        player_ids_to_remove: list[str] | None = None,
    ) -> None:
        """Handle SET_MEMBERS command on the player.

        Removals are processed before additions. Player ids that cannot be
        resolved by the player manager are skipped.

        :raises RuntimeError: When this player is itself synced to another player.
        """
        if self.synced_to:
            # this should not happen, but guard anyways
            raise RuntimeError("Player is synced, cannot set members")
        if not player_ids_to_add and not player_ids_to_remove:
            return
        player_ids_to_add = player_ids_to_add or []
        player_ids_to_remove = player_ids_to_remove or []

        if player_ids_to_remove:
            for player_id in player_ids_to_remove:
                if player_to_remove := cast("SonosPlayer", self.mass.players.get(player_id)):
                    await asyncio.to_thread(player_to_remove.soco.unjoin)
                    self.mass.call_later(2, player_to_remove.poll)

        if player_ids_to_add:
            for player_id in player_ids_to_add:
                if player_to_add := cast("SonosPlayer", self.mass.players.get(player_id)):
                    await asyncio.to_thread(player_to_add.soco.join, self.soco)
                    self.mass.call_later(2, player_to_add.poll)

    async def poll(self) -> None:
        """Poll player for state updates (only when the availability check passes)."""

        def _poll() -> None:
            """Poll the speaker for updates (NOT async friendly)."""
            self.update_groups()
            self.poll_media()
            self._attr_volume_level = self.soco.volume
            self._attr_volume_muted = self.soco.mute

        await self._check_availability()
        if self._attr_available:
            await asyncio.to_thread(_poll)

    @soco_error()
    def poll_media(self) -> None:
        """Poll information about currently playing media."""
        transport_info = self.soco.get_current_transport_info()
        new_status = transport_info["current_transport_state"]

        # Ignore the intermediate state; keep the previous playback state.
        if new_status == SONOS_STATE_TRANSITIONING:
            return

        new_status = _convert_state(new_status)
        # Only force a position refresh when the playback state changed.
        update_position = new_status != self._attr_playback_state
        self._attr_playback_state = new_status
        self._set_basic_track_info(update_position=update_position)
        self.update_player()

    def update_ip(self, ip_address: str) -> None:
        """Handle updated IP of a Sonos player (NOT async friendly).

        Does nothing while the player is marked available or when the ping fails.
        """
        if self._attr_available:
            return
        self.logger.debug(
            "Player IP-address changed from %s to %s", self.soco.ip_address, ip_address
        )
        # NOTE(review): ping() runs before soco.ip_address is reassigned below, so it
        # probes through the SoCo object as configured before the change - confirm
        # that checking the previous address is what is intended here.
        try:
            self.ping()
        except SonosUpdateError:
            return
        self.soco.ip_address = ip_address
        # setup() is a coroutine; schedule it on the MA event loop from this thread.
        asyncio.run_coroutine_threadsafe(self.setup(), self.mass.loop)
        self._attr_device_info = DeviceInfo(
            model=self._attr_device_info.model,
            manufacturer=self._attr_device_info.manufacturer,
        )
        self._attr_device_info.ip_address = ip_address
        self.update_player()

    async def _check_availability(self) -> None:
        """Check if the player is still available.

        A successful ping is recorded as speaker activity; a failed ping on a
        player currently marked available takes it offline.
        """
        try:
            await asyncio.to_thread(self.ping)
            self._speaker_activity("ping")
        except SonosUpdateError:
            if not self._attr_available:
                return
            self.logger.warning(
                "No recent activity and cannot reach %s, marking unavailable",
                self.display_name,
            )
            await self.offline()

    @soco_error()
    def ping(self) -> None:
        """Test device availability. Failure will raise SonosUpdateError."""
        self.soco.renderingControl.GetVolume([("InstanceID", 0), ("Channel", "Master")], timeout=1)

    @soco_error()
    def _poll_track_info(self) -> dict[str, Any]:
        """Poll the speaker for current track info.

        Add converted position values (NOT async friendly).
        """
        track_info: dict[str, Any] = self.soco.get_current_track_info()
        track_info[DURATION_SECONDS] = _timespan_secs(track_info.get("duration"))
        track_info[POSITION_SECONDS] = _timespan_secs(track_info.get("position"))
        return track_info

    def update_player(self, signal_update: bool = True) -> None:
        """Update Sonos Player.

        :param signal_update: When True, also schedule update_state() on the MA loop.
        """
        self._update_attributes()
        if signal_update:
            # send update to the player manager right away only if we are triggered from an event
            # when we're just updating from a manual poll, the player manager
            # will detect changes to the player object itself
            self.mass.loop.call_soon_threadsafe(self.update_state)

    async def _subscribe_target(
        self, target: SubscriptionBase, sub_callback: Callable[[SonosEvent], None]
    ) -> None:
        """Create a Sonos subscription for given target.

        :param target: The SoCo service object to subscribe to.
        :param sub_callback: Callback that is attached to the new subscription.
        """

        def on_renew_failed(exception: Exception) -> None:
            """Handle a failed subscription renewal callback."""
            self.mass.create_task(self._renew_failed(exception))

        # Use events_asyncio which makes subscribe() async-awaitable
        subscription = await target.subscribe(
            auto_renew=True, requested_timeout=SUBSCRIPTION_TIMEOUT
        )
        subscription.callback = sub_callback
        subscription.auto_renew_fail = on_renew_failed
        self._subscriptions.append(subscription)

    async def _renew_failed(self, exception: Exception) -> None:
        """Mark the speaker as offline after a subscription renewal failure.

        This is to reset the state to allow a future clean subscription attempt.
        """
        if not self._attr_available:
            return

        self.log_subscription_result(exception, "Subscription renewal", logging.WARNING)
        await self.offline()

    def log_subscription_result(self, result: Any, event: str, level: int = logging.DEBUG) -> None:
        """Log a message if a subscription action (create/renew/stop) results in an exception.

        :param result: Outcome of the action; anything that is not an Exception is ignored.
        :param event: Short description of the action, used as log message prefix.
        :param level: Log level to emit the message at.
        """
        if not isinstance(result, Exception):
            return

        if isinstance(result, asyncio.exceptions.TimeoutError):
            message = "Request timed out"
            exc_info = None
        else:
            message = str(result)
            # Only attach the traceback when the exception carries no message.
            exc_info = result if not str(result) else None

        self.logger.log(
            level,
            "%s failed for %s: %s",
            event,
            self.display_name,
            message,
            exc_info=exc_info if self.logger.isEnabledFor(logging.DEBUG) else None,
        )

    async def subscribe(self) -> None:
        """Initiate event subscriptions under an async lock.

        Subscribes to every service in ``missing_subscriptions``. When any of
        the subscription attempts fails, the player is taken offline.
        """
        if not self._subscription_lock:
            self._subscription_lock = asyncio.Lock()

        async with self._subscription_lock:
            try:
                # Create event subscriptions.
                subscriptions = [
                    self._subscribe_target(getattr(self.soco, service), self._handle_event)
                    for service in self.missing_subscriptions
                ]
                if not subscriptions:
                    return
                self.logger.log(
                    VERBOSE_LOG_LEVEL, "Creating subscriptions for %s", self.display_name
                )
                results = await asyncio.gather(*subscriptions, return_exceptions=True)
                for result in results:
                    self.log_subscription_result(result, "Creating subscription", logging.WARNING)
                if any(isinstance(result, Exception) for result in results):
                    raise SonosSubscriptionsFailed
            except SonosSubscriptionsFailed:
                self.logger.warning("Creating subscriptions failed for %s", self.display_name)
                # The subscription lock is still held at this point and asyncio.Lock
                # is not re-entrant, so it must NOT be acquired a second time here
                # (doing so would block this task forever).
                await self.offline()

    async def unsubscribe(self) -> None:
        """Cancel all subscriptions."""
        if not self._subscriptions:
            return
        self.logger.log(VERBOSE_LOG_LEVEL, "Unsubscribing from events for %s", self.display_name)
        # Failures are collected (not raised) and only logged below.
        results = await asyncio.gather(
            *(subscription.unsubscribe() for subscription in self._subscriptions),
            return_exceptions=True,
        )
        for result in results:
            self.log_subscription_result(result, "Unsubscribe")
        self._subscriptions = []

    def _handle_event(self, event: SonosEvent) -> None:
        """Handle SonosEvent callback by dispatching on the event's service type."""
        service_type: str = event.service.service_type
        self._speaker_activity(f"{service_type} subscription")
        if service_type == "DeviceProperties":
            self.update_player()
            return
        if service_type == "AVTransport":
            self._handle_avtransport_event(event)
            return
        if service_type == "RenderingControl":
            self._handle_rendering_control_event(event)
            return
        if service_type == "ZoneGroupTopology":
            self._handle_zone_group_topology_event(event)
            return

    def _handle_avtransport_event(self, event: SonosEvent) -> None:
        """Update information about currently playing media from an event."""
        # NOTE: The new coordinator can be provided in a media update event but
        # before the ZoneGroupState updates. If this happens the playback
        # state will be incorrect and should be ignored. Switching to the
        # new coordinator will use its media. The regrouping process will
        # be completed during the next ZoneGroupState update.

        # Missing transport_state indicates a transient error
        if (new_status := event.variables.get("transport_state")) is None:
            return

        # Ignore transitions, we should get the target state soon
        if new_status == SONOS_STATE_TRANSITIONING:
            return

        evars = event.variables
        new_status = _convert_state(evars["transport_state"])
        state_changed = new_status != self._attr_playback_state

        self._attr_playback_state = new_status

        track_uri = evars["enqueued_transport_uri"] or evars["current_track_uri"]
        audio_source = self.soco.music_source_from_uri(track_uri)

        self._set_basic_track_info(update_position=state_changed)
        ct_md = evars["current_track_meta_data"]

        et_uri_md = evars["enqueued_transport_uri_meta_data"]

        channel = ""
        if audio_source == MUSIC_SRC_RADIO:
            if et_uri_md:
                channel = et_uri_md.title

            # Extra guards for S1 compatibility
            if ct_md and hasattr(ct_md, "radio_show") and ct_md.radio_show:
                radio_show = ct_md.radio_show.split(",")[0]
                channel = " • ".join(filter(None, [channel, radio_show]))

            # Use the channel name as title only when no title is set.
            if isinstance(et_uri_md, DidlAudioBroadcast) and self._attr_current_media:
                self._attr_current_media.title = self._attr_current_media.title or channel

        self.update_player()

    def _handle_rendering_control_event(self, event: SonosEvent) -> None:
        """Update volume and mute state from a RenderingControl event."""
        variables = event.variables

        if "volume" in variables:
            volume = variables["volume"]
            self._attr_volume_level = int(volume["Master"])

        if mute := variables.get("mute"):
            self._attr_volume_muted = mute["Master"] == "1"

        self.update_player()

    def _handle_zone_group_topology_event(self, event: SonosEvent) -> None:
        """Handle callback for topology change event."""
        # Events without group membership information are ignored.
        if "zone_player_uui_ds_in_group" not in event.variables:
            return
        asyncio.run_coroutine_threadsafe(self.create_update_groups_coro(event), self.mass.loop)

    def _update_attributes(self) -> None:
        """Reset playback state and group members when the player is unavailable."""
        if not self._attr_available:
            self._attr_playback_state = PlayerState.IDLE
            self._attr_group_members.clear()
            return

    def _set_basic_track_info(self, update_position: bool = False) -> None:
        """Query the speaker to update media metadata and position info.

        :param update_position: Force the elapsed time to be refreshed.
        """
        try:
            track_info = self._poll_track_info()
        except SonosUpdateError as err:
            self.logger.warning("Fetching track info failed: %s", err)
            return
        if not track_info["uri"]:
            return
        uri = track_info["uri"]

        audio_source = self.soco.music_source_from_uri(uri)
        if (source_id := SOURCE_MAPPING.get(audio_source)) and audio_source in LINEIN_SOURCES:
            # Line-in style source: no media item and no position information.
            self._attr_elapsed_time = None
            self._attr_elapsed_time_last_updated = None
            self._attr_active_source = source_id
            self._attr_current_media = None
            if source_id not in [x.id for x in self._attr_source_list]:
                self._attr_source_list.append(PLAYER_SOURCE_MAP[source_id])
            return

        current_media = PlayerMedia(
            uri=uri,
            artist=track_info.get("artist"),
            album=track_info.get("album"),
            title=track_info.get("title"),
            image_url=track_info.get("album_art"),
        )
        self._attr_current_media = current_media
        self._attr_active_source = None
        self._update_media_position(track_info, force_update=update_position)

    def _update_media_position(
        self, position_info: dict[str, Any], force_update: bool = False
    ) -> None:
        """Update duration and elapsed time when playing music tracks.

        :param position_info: Track info containing the DURATION_SECONDS and
            POSITION_SECONDS entries (either may be missing or None).
        :param force_update: Store the reported position even if it did not jump.
        """
        duration = position_info.get(DURATION_SECONDS)
        current_position = position_info.get(POSITION_SECONDS)

        if not (duration or current_position):
            self._attr_elapsed_time = None
            self._attr_elapsed_time_last_updated = None
            return

        should_update = force_update
        if self._attr_current_media:
            self._attr_current_media.duration = duration

        # player started reporting position?
        if current_position is not None and self._attr_elapsed_time is None:
            should_update = True

        # position jumped?
        if current_position is not None and self._attr_elapsed_time is not None:
            if self._attr_playback_state == PlaybackState.PLAYING:
                assert self._attr_elapsed_time_last_updated is not None
                time_diff = time.time() - self._attr_elapsed_time_last_updated
            else:
                time_diff = 0

            calculated_position = self._attr_elapsed_time + time_diff

            # More than 1.5 seconds off from the extrapolated position counts as a jump.
            if abs(calculated_position - current_position) > 1.5:
                should_update = True

        if current_position is None:
            self._attr_elapsed_time = None
            self._attr_elapsed_time_last_updated = None
        elif should_update:
            self._attr_elapsed_time = current_position
            self._attr_elapsed_time_last_updated = time.time()

    def _speaker_activity(self, source: str) -> None:
        """Track the last activity on this speaker, set availability and resubscribe.

        Activity seen while the resubscription cooldown is running is ignored.

        :param source: Description of what triggered the activity (for logging).
        """
        if self._resub_cooldown_expires_at:
            if time.monotonic() < self._resub_cooldown_expires_at:
                self.logger.debug(
                    "Activity on %s from %s while in cooldown, ignoring",
                    self.display_name,
                    source,
                )
                return
            self._resub_cooldown_expires_at = None

        self.logger.log(VERBOSE_LOG_LEVEL, "Activity on %s from %s", self.display_name, source)
        self._last_activity = time.monotonic()
        was_available = self._attr_available
        self._attr_available = True
        if not was_available:
            # Player came back: publish the new state and recreate subscriptions.
            self.update_player()
            self.mass.loop.call_soon_threadsafe(self.mass.create_task, self.subscribe())

    def update_groups(self) -> None:
        """Update group topology when polling."""
        asyncio.run_coroutine_threadsafe(self.create_update_groups_coro(), self.mass.loop)

    def create_update_groups_coro(
        self, event: SonosEvent | None = None
    ) -> Coroutine[Any, Any, None]:
        """Create a coroutine that refreshes the group layout of this player.

        :param event: Optional topology event to take the group layout from;
            without it (or when it carries no layout) SoCo is queried instead.
        """

        def _get_soco_group() -> list[str]:
            """Ask SoCo cache for existing topology."""
            coordinator_uid = self.soco.uid
            joined_uids = []
            # On failure fall back to "this speaker on its own".
            with contextlib.suppress(OSError, SoCoException):
                if self.soco.group and self.soco.group.coordinator:
                    coordinator_uid = self.soco.group.coordinator.uid
                    joined_uids = [
                        p.uid
                        for p in self.soco.group.members
                        if p.uid != coordinator_uid and p.is_visible
                    ]

            return [coordinator_uid, *joined_uids]

        async def _extract_group(event: SonosEvent | None) -> list[str]:
            """Extract group layout from a topology event."""
            group = event and event.zone_player_uui_ds_in_group
            if group:
                assert isinstance(group, str)
                return group.split(",")
            return await asyncio.to_thread(_get_soco_group)

        def _regroup(group: list[str]) -> None:
            """Rebuild internal group layout (async safe)."""
            if group == [self.soco.uid] and not self._attr_group_members:
                # Skip updating existing single speakers in polling mode
                return

            group_members_ids = []

            for uid in group:
                speaker = self.mass.players.get(uid)
                if speaker:
                    group_members_ids.append(uid)
                else:
                    self.logger.debug(
                        "%s group member unavailable (%s), will try again",
                        self.display_name,
                        uid,
                    )
                    return

            if self._attr_group_members == group_members_ids:
                # Useful in polling mode for speakers with stereo pairs or surrounds
                # as those "invisible" speakers will bypass the single speaker check
                return

            self._attr_group_members = group_members_ids
            self.mass.loop.call_soon_threadsafe(self.update_state)

            self.logger.debug("Regrouped %s: %s", self.display_name, self._attr_group_members)
            self.update_player()

        async def _handle_group_event(event: SonosEvent | None) -> None:
            """Get async lock and handle event."""
            _provider = cast("SonosPlayerProvider", self._provider)
            async with _provider.topology_condition:
                group = await _extract_group(event)
                # Only the coordinator (first entry) rebuilds the group layout.
                if self.soco.uid == group[0]:
                    _regroup(group)
                    _provider.topology_condition.notify_all()

        return _handle_group_event(event)

    async def wait_for_groups(self, groups: list[list[SonosPlayer]]) -> None:
        """Wait until all groups are present, or timeout (5 seconds).

        Each entry of ``groups`` is a list of players whose first element is the
        expected coordinator. A timeout is logged, not raised.

        NOTE(review): asyncio.Condition.wait() requires the condition's lock to be
        held; this method does not acquire it, so callers are presumably expected
        to hold ``topology_condition`` - confirm against the call sites.
        """

        def _as_id(member: Any) -> Any:
            """Normalise a group entry (player object or bare player id) to an id."""
            return getattr(member, "player_id", member)

        def _test_groups(groups: list[list[SonosPlayer]]) -> bool:
            """Return whether all groups exist now."""
            for group in groups:
                coordinator = group[0]

                # The regroup logic of this class stores member *ids*, so compare
                # ids rather than player objects. An empty member list is treated
                # as "coordinator on its own".
                current_ids = [_as_id(member) for member in coordinator.group_members]
                if not current_ids:
                    current_ids = [coordinator.player_id]

                # Test that coordinator is coordinating
                if coordinator.player_id != current_ids[0]:
                    return False

                # Test that joined members match
                if {_as_id(member) for member in group[1:]} != set(current_ids[1:]):
                    return False

            return True

        _provider = cast("SonosPlayerProvider", self._provider)
        try:
            async with asyncio.timeout(5):
                while not _test_groups(groups):
                    await _provider.topology_condition.wait()
        except TimeoutError:
            self.logger.warning("Timeout waiting for target groups %s", groups)

        # Clear the zone group state cache via any player of this provider.
        if players := self.mass.players.all(provider_filter=_provider.instance_id):
            any_speaker = cast("SonosPlayer", players[0])
            any_speaker.soco.zone_group_state.clear_cache()
734
735
736def _convert_state(sonos_state: str | None) -> PlayerState:
737 """Convert Sonos state to PlayerState."""
738 if sonos_state == "PLAYING":
739 return PlayerState.PLAYING
740 if sonos_state == "TRANSITIONING":
741 return PlayerState.PLAYING
742 if sonos_state == "PAUSED_PLAYBACK":
743 return PlayerState.PAUSED
744 return PlayerState.IDLE
745
746
747def _timespan_secs(timespan: str | None) -> int | None:
748 """Parse a time-span into number of seconds."""
749 if timespan in ("", "NOT_IMPLEMENTED"):
750 return None
751 if timespan is None:
752 return None
753 return int(sum(60 ** x[0] * int(x[1]) for x in enumerate(reversed(timespan.split(":")))))
754