music-assistant-server

30.3 KBPY
player.py
30.3 KB789 lines • python
1"""
2Sonos Player provider for Music Assistant: SonosPlayer object/model.
3
4Note that large parts of this code are copied over from the Home Assistant
5integration for Sonos.
6"""
7
8from __future__ import annotations
9
10import asyncio
11import contextlib
12import logging
13import time
14from collections.abc import Callable, Coroutine
15from typing import TYPE_CHECKING, Any, cast
16
17from music_assistant_models.enums import IdentifierType, MediaType, PlaybackState, PlayerState
18from music_assistant_models.errors import PlayerCommandFailed
19from soco import SoCoException
20from soco.core import MUSIC_SRC_RADIO, SoCo
21from soco.data_structures import DidlAudioBroadcast
22
23from music_assistant.constants import VERBOSE_LOG_LEVEL, create_sample_rates_config_entry
24from music_assistant.helpers.upnp import create_didl_metadata
25from music_assistant.models.player import DeviceInfo, Player, PlayerMedia
26
27from .constants import (
28    DURATION_SECONDS,
29    LINEIN_SOURCE_IDS,
30    LINEIN_SOURCES,
31    NEVER_TIME,
32    PLAYER_FEATURES,
33    PLAYER_SOURCE_MAP,
34    POSITION_SECONDS,
35    RESUB_COOLDOWN_SECONDS,
36    SONOS_STATE_TRANSITIONING,
37    SOURCE_MAPPING,
38    SUBSCRIPTION_SERVICES,
39    SUBSCRIPTION_TIMEOUT,
40)
41from .helpers import SonosUpdateError, soco_error
42
43if TYPE_CHECKING:
44    from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
45    from soco.events_base import Event as SonosEvent
46    from soco.events_base import SubscriptionBase
47
48    from .provider import SonosPlayerProvider
49
# Type alias for a callback that takes no arguments and returns nothing.
CALLBACK_TYPE = Callable[[], None]
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
52
53
class SonosSubscriptionsFailed(PlayerCommandFailed):
    """Signal that one or more Sonos event subscriptions could not be created."""
56
57
class SonosPlayer(Player):
    """Sonos Player implementation for S1 speakers.

    Wraps a SoCo device object and keeps the player attributes up to date
    through polling and event subscriptions.
    """
60
61    def __init__(
62        self,
63        provider: SonosPlayerProvider,
64        soco: SoCo,
65    ) -> None:
66        """Initialize SonosPlayer instance."""
67        super().__init__(provider, soco.uid)
68        self.soco = soco
69        self.household_id: str = soco.household_id
70        self.subscriptions: list[SubscriptionBase] = []
71
72        # Set player attributes
73        self._attr_supported_features = set(PLAYER_FEATURES)
74        self._attr_name = soco.player_name
75        self._attr_device_info = DeviceInfo(
76            model=soco.speaker_info["model_name"],
77            manufacturer="Sonos",
78        )
79        self._attr_device_info.add_identifier(IdentifierType.IP_ADDRESS, soco.ip_address)
80        self._attr_device_info.add_identifier(IdentifierType.UUID, soco.uid)
81        mac_address = self._extract_mac_from_player_id()
82        if mac_address:
83            self._attr_device_info.add_identifier(IdentifierType.MAC_ADDRESS, mac_address)
84        self._attr_needs_poll = True
85        self._attr_poll_interval = 5
86        self._attr_available = True
87        self._attr_can_group_with = {provider.instance_id}
88
89        # Subscriptions and events
90        self._subscriptions: list[SubscriptionBase] = []
91        self._subscription_lock: asyncio.Lock | None = None
92        self._last_activity: float = NEVER_TIME
93        self._resub_cooldown_expires_at: float | None = None
94
95    @property
96    def missing_subscriptions(self) -> set[str]:
97        """Return a list of missing service subscriptions."""
98        subscribed_services = {sub.service.service_type for sub in self._subscriptions}
99        return SUBSCRIPTION_SERVICES - subscribed_services
100
101    def _extract_mac_from_player_id(self) -> str | None:
102        """Extract MAC address from Sonos player_id.
103
104        Sonos player_ids follow the format RINCON_XXXXXXXXXXXX01400 where
105        the middle 12 hex characters represent the MAC address.
106
107        :return: MAC address string in XX:XX:XX:XX:XX:XX format, or None if not extractable.
108        """
109        # Remove RINCON_ prefix if present
110        player_id = self.player_id
111        player_id = player_id.removeprefix("RINCON_")
112
113        # Remove the 01400 suffix (or similar) - should be last 5 chars
114        if len(player_id) >= 17:  # 12 hex chars for MAC + 5 chars suffix
115            mac_hex = player_id[:12]
116        else:
117            return None
118
119        # Validate it looks like a MAC (all hex characters)
120        try:
121            int(mac_hex, 16)
122        except ValueError:
123            return None
124
125        # Format as XX:XX:XX:XX:XX:XX
126        return ":".join(mac_hex[i : i + 2].upper() for i in range(0, 12, 2))
127
128    async def setup(self) -> None:
129        """Set up the player."""
130        self._attr_volume_level = self.soco.volume
131        self._attr_volume_muted = self.soco.mute
132        self.update_groups()
133        if not self.synced_to:
134            self.poll_media()
135        await self.subscribe()
136        await self.mass.players.register_or_update(self)
137
138    async def offline(self) -> None:
139        """Handle removal of speaker when unavailable."""
140        if not self._attr_available:
141            return
142
143        if self._resub_cooldown_expires_at is None and not self.mass.closing:
144            self._resub_cooldown_expires_at = time.monotonic() + RESUB_COOLDOWN_SECONDS
145            self.logger.debug("Starting resubscription cooldown for %s", self.display_name)
146
147        self._attr_available = False
148        self._share_link_plugin = None
149
150        self.update_state()
151        await self.unsubscribe()
152
153    async def get_config_entries(
154        self,
155        action: str | None = None,
156        values: dict[str, ConfigValueType] | None = None,
157    ) -> list[ConfigEntry]:
158        """Return all (provider/player specific) Config Entries for the player."""
159        return [
160            create_sample_rates_config_entry(
161                supported_sample_rates=[44100, 48000],
162                supported_bit_depths=[16],
163                hidden=True,
164            ),
165        ]
166
167    async def stop(self) -> None:
168        """Send STOP command to the player."""
169        if self.synced_to:
170            self.logger.debug(
171                "Ignore STOP command for %s: Player is synced to another player.",
172                self.player_id,
173            )
174            return
175        if self._attr_active_source in LINEIN_SOURCE_IDS:
176            # Play an invalid URI to force stop line-in sources
177            with contextlib.suppress(SoCoException):
178                await asyncio.to_thread(self.soco.play_uri, "")
179        else:
180            await asyncio.to_thread(self.soco.stop)
181        self.mass.call_later(2, self.poll)
182        self.update_state()
183
184    async def play(self) -> None:
185        """Send PLAY command to the player."""
186        if self.synced_to:
187            self.logger.debug(
188                "Ignore PLAY command for %s: Player is synced to another player.",
189                self.player_id,
190            )
191            return
192        await asyncio.to_thread(self.soco.play)
193        self.mass.call_later(2, self.poll)
194
195    async def pause(self) -> None:
196        """Send PAUSE command to the player."""
197        if self.synced_to:
198            self.logger.debug(
199                "Ignore PAUSE command for %s: Player is synced to another player.",
200                self.player_id,
201            )
202            return
203        if "Pause" not in self.soco.available_actions:
204            # pause not possible
205            await self.stop()
206            return
207        await asyncio.to_thread(self.soco.pause)
208        self.mass.call_later(2, self.poll)
209
210    async def volume_set(self, volume_level: int) -> None:
211        """Send VOLUME_SET command to the player."""
212
213        def set_volume_level(volume_level: int) -> None:
214            self.soco.volume = volume_level
215
216        await asyncio.to_thread(set_volume_level, volume_level)
217        self.mass.call_later(2, self.poll)
218
219    async def volume_mute(self, muted: bool) -> None:
220        """Send VOLUME MUTE command to the player."""
221
222        def set_volume_mute(muted: bool) -> None:
223            self.soco.mute = muted
224
225        await asyncio.to_thread(set_volume_mute, muted)
226        self.mass.call_later(2, self.poll)
227
228    async def play_media(self, media: PlayerMedia) -> None:
229        """Handle PLAY MEDIA on the player."""
230        if self.synced_to:
231            # this should be already handled by the player manager, but just in case...
232            msg = (
233                f"Player {self.display_name} can not "
234                "accept play_media command, it is synced to another player."
235            )
236            raise PlayerCommandFailed(msg)
237
238        if not media.duration:
239            # Sonos really does not like FLAC streams without duration
240            media.uri = media.uri.replace(".flac", ".mp3")
241
242        didl_metadata = create_didl_metadata(media)
243        is_announcement = media.media_type == MediaType.ANNOUNCEMENT
244        force_radio = False if is_announcement else not media.duration
245
246        stream_url = await self.provider.mass.streams.resolve_stream_url(self.player_id, media)
247        await asyncio.to_thread(
248            self.soco.play_uri, stream_url, meta=didl_metadata, force_radio=force_radio
249        )
250        self.mass.call_later(2, self.poll)
251
252    async def enqueue_next_media(self, media: PlayerMedia) -> None:
253        """Handle enqueuing next media item."""
254        if self.synced_to:
255            # this should be already handled by the player manager, but just in case...
256            msg = (
257                f"Player {self.display_name} can not "
258                "accept enqueue command, it is synced to another player."
259            )
260            raise PlayerCommandFailed(msg)
261
262        didl_metadata = create_didl_metadata(media)
263
264        def add_to_queue() -> None:
265            self.soco.avTransport.SetNextAVTransportURI(
266                [
267                    ("InstanceID", 0),
268                    ("NextURI", media.uri),
269                    ("NextURIMetaData", didl_metadata),
270                ]
271            )
272
273        await asyncio.to_thread(add_to_queue)
274        self.mass.call_later(2, self.poll)
275
276    @soco_error()
277    async def set_members(
278        self,
279        player_ids_to_add: list[str] | None = None,
280        player_ids_to_remove: list[str] | None = None,
281    ) -> None:
282        """Handle SET_MEMBERS command on the player."""
283        if self.synced_to:
284            # this should not happen, but guard anyways
285            raise RuntimeError("Player is synced, cannot set members")
286        if not player_ids_to_add and not player_ids_to_remove:
287            return
288        player_ids_to_add = player_ids_to_add or []
289        player_ids_to_remove = player_ids_to_remove or []
290
291        if player_ids_to_remove:
292            for player_id in player_ids_to_remove:
293                if player_to_remove := cast("SonosPlayer", self.mass.players.get_player(player_id)):
294                    await asyncio.to_thread(player_to_remove.soco.unjoin)
295                    self.mass.call_later(2, player_to_remove.poll)
296
297        if player_ids_to_add:
298            for player_id in player_ids_to_add:
299                if player_to_add := cast("SonosPlayer", self.mass.players.get_player(player_id)):
300                    await asyncio.to_thread(player_to_add.soco.join, self.soco)
301                    self.mass.call_later(2, player_to_add.poll)
302
303    async def poll(self) -> None:
304        """Poll player for state updates."""
305
306        def _poll() -> None:
307            """Poll the speaker for updates (NOT async friendly)."""
308            self.update_groups()
309            self.poll_media()
310            self._attr_volume_level = self.soco.volume
311            self._attr_volume_muted = self.soco.mute
312
313        await self._check_availability()
314        if self._attr_available:
315            await asyncio.to_thread(_poll)
316
317    @soco_error()
318    def poll_media(self) -> None:
319        """Poll information about currently playing media."""
320        transport_info = self.soco.get_current_transport_info()
321        new_status = transport_info["current_transport_state"]
322
323        if new_status == SONOS_STATE_TRANSITIONING:
324            return
325
326        new_status = _convert_state(new_status)
327        update_position = new_status != self._attr_playback_state
328        self._attr_playback_state = new_status
329        self._set_basic_track_info(update_position=update_position)
330        self.update_player()
331
    def update_ip(self, ip_address: str) -> None:
        """Handle updated IP of a Sonos player (NOT async friendly).

        Does nothing while the player is marked available. Otherwise the speaker
        is pinged and, if that succeeds, the new address is stored on the SoCo
        object, setup is scheduled on the event loop and the device info is
        rebuilt with the new address.

        :param ip_address: The new IP address of the speaker.
        """
        if self._attr_available:
            return
        self.logger.debug(
            "Player IP-address changed from %s to %s", self.soco.ip_address, ip_address
        )
        # NOTE(review): this ping runs before soco.ip_address is updated below, so it
        # looks like it still probes the old address - confirm this is intended.
        try:
            self.ping()
        except SonosUpdateError:
            return
        self.soco.ip_address = ip_address
        # setup() is a coroutine, so it is handed over to the event loop
        asyncio.run_coroutine_threadsafe(self.setup(), self.mass.loop)
        # Rebuild the device info from scratch so it carries the new IP address
        self._attr_device_info = DeviceInfo(
            model=self._attr_device_info.model,
            manufacturer=self._attr_device_info.manufacturer,
        )
        self._attr_device_info.add_identifier(IdentifierType.IP_ADDRESS, ip_address)
        self._attr_device_info.add_identifier(IdentifierType.UUID, self.soco.uid)
        mac_address = self._extract_mac_from_player_id()
        if mac_address:
            self._attr_device_info.add_identifier(IdentifierType.MAC_ADDRESS, mac_address)
        self.update_player()
355
356    async def _check_availability(self) -> None:
357        """Check if the player is still available."""
358        try:
359            await asyncio.to_thread(self.ping)
360            self._speaker_activity("ping")
361        except SonosUpdateError:
362            if not self._attr_available:
363                return
364            self.logger.warning(
365                "No recent activity and cannot reach %s, marking unavailable",
366                self.display_name,
367            )
368            await self.offline()
369
370    @soco_error()
371    def ping(self) -> None:
372        """Test device availability. Failure will raise SonosUpdateError."""
373        self.soco.renderingControl.GetVolume([("InstanceID", 0), ("Channel", "Master")], timeout=1)
374
375    @soco_error()
376    def _poll_track_info(self) -> dict[str, Any]:
377        """Poll the speaker for current track info.
378
379        Add converted position values (NOT async fiendly).
380        """
381        track_info: dict[str, Any] = self.soco.get_current_track_info()
382        track_info[DURATION_SECONDS] = _timespan_secs(track_info.get("duration"))
383        track_info[POSITION_SECONDS] = _timespan_secs(track_info.get("position"))
384        return track_info
385
386    def update_player(self, signal_update: bool = True) -> None:
387        """Update Sonos Player."""
388        self._update_attributes()
389        if signal_update:
390            # send update to the player manager right away only if we are triggered from an event
391            # when we're just updating from a manual poll, the player manager
392            # will detect changes to the player object itself
393            self.mass.loop.call_soon_threadsafe(self.update_state)
394
395    async def _subscribe_target(
396        self, target: SubscriptionBase, sub_callback: Callable[[SonosEvent], None]
397    ) -> None:
398        """Create a Sonos subscription for given target."""
399
400        def on_renew_failed(exception: Exception) -> None:
401            """Handle a failed subscription renewal callback."""
402            self.mass.create_task(self._renew_failed(exception))
403
404        # Use events_asyncio which makes subscribe() async-awaitable
405        subscription = await target.subscribe(
406            auto_renew=True, requested_timeout=SUBSCRIPTION_TIMEOUT
407        )
408        subscription.callback = sub_callback
409        subscription.auto_renew_fail = on_renew_failed
410        self._subscriptions.append(subscription)
411
412    async def _renew_failed(self, exception: Exception) -> None:
413        """Mark the speaker as offline after a subscription renewal failure.
414
415        This is to reset the state to allow a future clean subscription attempt.
416        """
417        if not self._attr_available:
418            return
419
420        self.log_subscription_result(exception, "Subscription renewal", logging.WARNING)
421        await self.offline()
422
423    def log_subscription_result(self, result: Any, event: str, level: int = logging.DEBUG) -> None:
424        """Log a message if a subscription action (create/renew/stop) results in an exception."""
425        if not isinstance(result, Exception):
426            return
427
428        if isinstance(result, asyncio.exceptions.TimeoutError):
429            message = "Request timed out"
430            exc_info = None
431        else:
432            message = str(result)
433            exc_info = result if not str(result) else None
434
435        self.logger.log(
436            level,
437            "%s failed for %s: %s",
438            event,
439            self.display_name,
440            message,
441            exc_info=exc_info if self.logger.isEnabledFor(10) else None,
442        )
443
444    async def subscribe(self) -> None:
445        """Initiate event subscriptions under an async lock."""
446        if not self._subscription_lock:
447            self._subscription_lock = asyncio.Lock()
448
449        async with self._subscription_lock:
450            try:
451                # Create event subscriptions.
452                subscriptions = [
453                    self._subscribe_target(getattr(self.soco, service), self._handle_event)
454                    for service in self.missing_subscriptions
455                ]
456                if not subscriptions:
457                    return
458                self.logger.log(
459                    VERBOSE_LOG_LEVEL, "Creating subscriptions for %s", self.display_name
460                )
461                results = await asyncio.gather(*subscriptions, return_exceptions=True)
462                for result in results:
463                    self.log_subscription_result(result, "Creating subscription", logging.WARNING)
464                if any(isinstance(result, Exception) for result in results):
465                    raise SonosSubscriptionsFailed
466            except SonosSubscriptionsFailed:
467                self.logger.warning("Creating subscriptions failed for %s", self.display_name)
468                assert self._subscription_lock is not None
469                async with self._subscription_lock:
470                    await self.offline()
471
472    async def unsubscribe(self) -> None:
473        """Cancel all subscriptions."""
474        if not self._subscriptions:
475            return
476        self.logger.log(VERBOSE_LOG_LEVEL, "Unsubscribing from events for %s", self.display_name)
477        results = await asyncio.gather(
478            *(subscription.unsubscribe() for subscription in self._subscriptions),
479            return_exceptions=True,
480        )
481        for result in results:
482            self.log_subscription_result(result, "Unsubscribe")
483        self._subscriptions = []
484
485    def _handle_event(self, event: SonosEvent) -> None:
486        """Handle SonosEvent callback."""
487        service_type: str = event.service.service_type
488        self._speaker_activity(f"{service_type} subscription")
489        if service_type == "DeviceProperties":
490            self.update_player()
491            return
492        if service_type == "AVTransport":
493            self._handle_avtransport_event(event)
494            return
495        if service_type == "RenderingControl":
496            self._handle_rendering_control_event(event)
497            return
498        if service_type == "ZoneGroupTopology":
499            self._handle_zone_group_topology_event(event)
500            return
501
502    def _handle_avtransport_event(self, event: SonosEvent) -> None:
503        """Update information about currently playing media from an event."""
504        # NOTE: The new coordinator can be provided in a media update event but
505        # before the ZoneGroupState updates. If this happens the playback
506        # state will be incorrect and should be ignored. Switching to the
507        # new coordinator will use its media. The regrouping process will
508        # be completed during the next ZoneGroupState update.
509
510        # Missing transport_state indicates a transient error
511        if (new_status := event.variables.get("transport_state")) is None:
512            return
513
514        # Ignore transitions, we should get the target state soon
515        if new_status == SONOS_STATE_TRANSITIONING:
516            return
517
518        evars = event.variables
519        new_status = _convert_state(evars["transport_state"])
520        state_changed = new_status != self._attr_playback_state
521
522        self._attr_playback_state = new_status
523
524        track_uri = evars["enqueued_transport_uri"] or evars["current_track_uri"]
525        audio_source = self.soco.music_source_from_uri(track_uri)
526
527        self._set_basic_track_info(update_position=state_changed)
528        ct_md = evars["current_track_meta_data"]
529
530        et_uri_md = evars["enqueued_transport_uri_meta_data"]
531
532        channel = ""
533        if audio_source == MUSIC_SRC_RADIO:
534            if et_uri_md:
535                channel = et_uri_md.title
536
537            # Extra guards for S1 compatibility
538            if ct_md and hasattr(ct_md, "radio_show") and ct_md.radio_show:
539                radio_show = ct_md.radio_show.split(",")[0]
540                channel = " • ".join(filter(None, [channel, radio_show]))
541
542            if isinstance(et_uri_md, DidlAudioBroadcast) and self._attr_current_media:
543                self._attr_current_media.title = self._attr_current_media.title or channel
544
545        self.update_player()
546
547    def _handle_rendering_control_event(self, event: SonosEvent) -> None:
548        """Update information about currently volume settings."""
549        variables = event.variables
550
551        if "volume" in variables:
552            volume = variables["volume"]
553            self._attr_volume_level = int(volume["Master"])
554
555        if mute := variables.get("mute"):
556            self._attr_volume_muted = mute["Master"] == "1"
557
558        self.update_player()
559
560    def _handle_zone_group_topology_event(self, event: SonosEvent) -> None:
561        """Handle callback for topology change event."""
562        if "zone_player_uui_ds_in_group" not in event.variables:
563            return
564        asyncio.run_coroutine_threadsafe(self.create_update_groups_coro(event), self.mass.loop)
565
566    def _update_attributes(self) -> None:
567        """Update attributes of the MA Player from SoCo state."""
568        if not self._attr_available:
569            self._attr_playback_state = PlayerState.IDLE
570            self._attr_group_members.clear()
571            return
572
573    def _set_basic_track_info(self, update_position: bool = False) -> None:
574        """Query the speaker to update media metadata and position info."""
575        try:
576            track_info = self._poll_track_info()
577        except SonosUpdateError as err:
578            self.logger.warning("Fetching track info failed: %s", err)
579            return
580        if not track_info["uri"]:
581            return
582        uri = track_info["uri"]
583
584        audio_source = self.soco.music_source_from_uri(uri)
585        if (source_id := SOURCE_MAPPING.get(audio_source)) and audio_source in LINEIN_SOURCES:
586            self._attr_elapsed_time = None
587            self._attr_elapsed_time_last_updated = None
588            self._attr_active_source = source_id
589            self._attr_current_media = None
590            if source_id not in [x.id for x in self._attr_source_list]:
591                self._attr_source_list.append(PLAYER_SOURCE_MAP[source_id])
592            return
593
594        current_media = PlayerMedia(
595            uri=uri,
596            artist=track_info.get("artist"),
597            album=track_info.get("album"),
598            title=track_info.get("title"),
599            image_url=track_info.get("album_art"),
600        )
601        self._attr_current_media = current_media
602        self._attr_active_source = None
603        self._update_media_position(track_info, force_update=update_position)
604
605    def _update_media_position(
606        self, position_info: dict[str, int], force_update: bool = False
607    ) -> None:
608        """Update state when playing music tracks."""
609        duration = position_info.get(DURATION_SECONDS)
610        current_position = position_info.get(POSITION_SECONDS)
611
612        if not (duration or current_position):
613            self._attr_elapsed_time = None
614            self._attr_elapsed_time_last_updated = None
615            return
616
617        should_update = force_update
618        if self._attr_current_media:
619            self._attr_current_media.duration = duration
620
621        # player started reporting position?
622        if current_position is not None and self._attr_elapsed_time is None:
623            should_update = True
624
625        # position jumped?
626        if current_position is not None and self._attr_elapsed_time is not None:
627            if self._attr_playback_state == PlaybackState.PLAYING:
628                assert self._attr_elapsed_time_last_updated is not None
629                time_diff = time.time() - self._attr_elapsed_time_last_updated
630            else:
631                time_diff = 0
632
633            calculated_position = self._attr_elapsed_time + time_diff
634
635            if abs(calculated_position - current_position) > 1.5:
636                should_update = True
637
638        if current_position is None:
639            self._attr_elapsed_time = None
640            self._attr_elapsed_time_last_updated = None
641        elif should_update:
642            self._attr_elapsed_time = current_position
643            self._attr_elapsed_time_last_updated = time.time()
644
645    def _speaker_activity(self, source: str) -> None:
646        """Track the last activity on this speaker, set availability and resubscribe."""
647        if self._resub_cooldown_expires_at:
648            if time.monotonic() < self._resub_cooldown_expires_at:
649                self.logger.debug(
650                    "Activity on %s from %s while in cooldown, ignoring",
651                    self.display_name,
652                    source,
653                )
654                return
655            self._resub_cooldown_expires_at = None
656
657        self.logger.log(VERBOSE_LOG_LEVEL, "Activity on %s from %s", self.display_name, source)
658        self._last_activity = time.monotonic()
659        was_available = self._attr_available
660        self._attr_available = True
661        if not was_available:
662            self.update_player()
663            self.mass.loop.call_soon_threadsafe(self.mass.create_task, self.subscribe())
664
665    def update_groups(self) -> None:
666        """Update group topology when polling."""
667        asyncio.run_coroutine_threadsafe(self.create_update_groups_coro(), self.mass.loop)
668
    def create_update_groups_coro(
        self, event: SonosEvent | None = None
    ) -> Coroutine[Any, Any, None]:
        """Create a coroutine that updates the group layout of this player.

        The group is taken from the given topology event when it carries one,
        otherwise it is read from the SoCo object in a worker thread.

        :param event: Optional topology event to take the group layout from.
        :return: Coroutine that performs the update; the caller schedules it.
        """

        def _get_soco_group() -> list[str]:
            """Ask SoCo for the existing topology: coordinator uid first, then members."""
            coordinator_uid = self.soco.uid
            joined_uids = []
            # On OSError/SoCoException fall back to whatever was collected so far
            with contextlib.suppress(OSError, SoCoException):
                if self.soco.group and self.soco.group.coordinator:
                    coordinator_uid = self.soco.group.coordinator.uid
                    joined_uids = [
                        p.uid
                        for p in self.soco.group.members
                        if p.uid != coordinator_uid and p.is_visible
                    ]

            return [coordinator_uid, *joined_uids]

        async def _extract_group(event: SonosEvent | None) -> list[str]:
            """Extract group layout from a topology event, or query SoCo without one."""
            group = event and event.zone_player_uui_ds_in_group
            if group:
                assert isinstance(group, str)
                # The event carries the group as a comma separated string
                return group.split(",")
            return await asyncio.to_thread(_get_soco_group)

        def _regroup(group: list[str]) -> None:
            """Rebuild internal group layout (async safe)."""
            if group == [self.soco.uid] and not self._attr_group_members:
                # Skip updating existing single speakers in polling mode
                return

            group_members_ids = []

            for uid in group:
                speaker = self.mass.players.get_player(uid)
                if speaker:
                    group_members_ids.append(uid)
                else:
                    # Abort the whole regroup as soon as one member is unknown
                    self.logger.debug(
                        "%s group member unavailable (%s), will try again",
                        self.display_name,
                        uid,
                    )
                    return

            if self._attr_group_members == group_members_ids:
                # Useful in polling mode for speakers with stereo pairs or surrounds
                # as those "invisible" speakers will bypass the single speaker check
                return

            self._attr_group_members = group_members_ids
            # NOTE(review): update_player() below schedules update_state as well,
            # so this looks like a duplicate state update - confirm it is needed.
            self.mass.loop.call_soon_threadsafe(self.update_state)

            self.logger.debug("Regrouped %s: %s", self.display_name, self._attr_group_members)
            self.update_player()

        async def _handle_group_event(event: SonosEvent | None) -> None:
            """Get async lock and handle event."""
            _provider = cast("SonosPlayerProvider", self._provider)
            async with _provider.topology_condition:
                group = await _extract_group(event)
                # Only the coordinator (first uid of the group) rebuilds the layout
                # and wakes up waiters on the topology condition
                if self.soco.uid == group[0]:
                    _regroup(group)
                    _provider.topology_condition.notify_all()

        return _handle_group_event(event)
738
739    async def wait_for_groups(self, groups: list[list[SonosPlayer]]) -> None:
740        """Wait until all groups are present, or timeout."""
741
742        def _test_groups(groups: list[list[SonosPlayer]]) -> bool:
743            """Return whether all groups exist now."""
744            for group in groups:
745                coordinator = group[0]
746
747                # Test that coordinator is coordinating
748                current_group = coordinator.group_members
749                if coordinator != current_group[0]:
750                    return False
751
752                # Test that joined members match
753                if set(group[1:]) != set(current_group[1:]):
754                    return False
755
756            return True
757
758        _provider = cast("SonosPlayerProvider", self._provider)
759        try:
760            async with asyncio.timeout(5):
761                while not _test_groups(groups):
762                    await _provider.topology_condition.wait()
763        except TimeoutError:
764            self.logger.warning("Timeout waiting for target groups %s", groups)
765
766        if players := self.mass.players.all_players(provider_filter=_provider.instance_id):
767            any_speaker = cast("SonosPlayer", players[0])
768            any_speaker.soco.zone_group_state.clear_cache()
769
770
def _convert_state(sonos_state: str | None) -> PlayerState:
    """Map a Sonos transport state onto a PlayerState.

    Unknown states (and None) map to IDLE.
    """
    if sonos_state in ("PLAYING", "TRANSITIONING"):
        return PlayerState.PLAYING
    if sonos_state == "PAUSED_PLAYBACK":
        return PlayerState.PAUSED
    return PlayerState.IDLE
780
781
def _timespan_secs(timespan: str | None) -> int | None:
    """Convert a colon separated time-span (e.g. "0:03:25") into seconds.

    Returns None for None, an empty string or "NOT_IMPLEMENTED".
    """
    if timespan is None or timespan in ("", "NOT_IMPLEMENTED"):
        return None
    seconds = 0
    # Each further field is worth 60 times less than the one before it
    for part in timespan.split(":"):
        seconds = seconds * 60 + int(part)
    return seconds
789