music-assistant-server

21.8 KBPY
player.py
21.8 KB539 lines • python
1"""DLNA Player."""
2
3import asyncio
4import functools
5import time
6from collections.abc import Awaitable, Callable, Coroutine, Sequence
7from contextlib import suppress
8from typing import TYPE_CHECKING, Any, Concatenate
9from urllib.parse import urlparse
10
11import defusedxml.ElementTree as DefusedET
12from async_upnp_client.client import UpnpDevice, UpnpService, UpnpStateVariable
13from async_upnp_client.exceptions import UpnpError, UpnpResponseError
14from async_upnp_client.profiles.dlna import DmrDevice, TransportState
15from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
16from music_assistant_models.enums import IdentifierType, PlaybackState, PlayerFeature, PlayerType
17from music_assistant_models.errors import PlayerUnavailableError
18from music_assistant_models.player import PlayerMedia
19
20from music_assistant.constants import VERBOSE_LOG_LEVEL
21from music_assistant.helpers.upnp import create_didl_metadata
22from music_assistant.helpers.util import is_valid_mac_address
23from music_assistant.models.player import DeviceInfo, Player
24
25from .constants import PLAYER_CONFIG_ENTRIES
26
27if TYPE_CHECKING:
28    from .provider import DLNAPlayerProvider
29
30
31def catch_request_errors[DLNAPlayerT: "DLNAPlayer", **P, R](
32    func: Callable[Concatenate[DLNAPlayerT, P], Awaitable[R]],
33) -> Callable[Concatenate[DLNAPlayerT, P], Coroutine[Any, Any, R | None]]:
34    """Catch UpnpError errors."""
35
36    @functools.wraps(func)
37    async def wrapper(self: DLNAPlayerT, *args: P.args, **kwargs: P.kwargs) -> R | None:
38        """Catch UpnpError errors and check availability before and after request."""
39        self.last_command = time.time()
40        if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
41            self.logger.debug(
42                "Handling command %s for player %s",
43                func.__name__,
44                self.display_name,
45            )
46        if not self.available:
47            self.logger.warning("Device disappeared when trying to call %s", func.__name__)
48            return None
49        try:
50            return await func(self, *args, **kwargs)
51        except UpnpError as err:
52            self.force_poll = True
53            if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
54                self.logger.exception("Error during call %s: %r", func.__name__, err)
55            else:
56                self.logger.error("Error during call %s: %r", func.__name__, str(err))
57        return None
58
59    return wrapper
60
61
62class DLNAPlayer(Player):
63    """DLNA Player.
64
65    All DLNA players are considered generic protocol endpoints (PlayerType.PROTOCOL)
66    and will be wrapped in a UniversalPlayer. Devices with native provider support
67    (e.g., Sonos) are handled by their respective providers and will link to
68    the DLNA player as a protocol output.
69    """
70
71    # All DLNA devices are generic protocol endpoints - no vendor has native DLNA support in MA
72    _attr_type = PlayerType.PROTOCOL
73
74    def __init__(
75        self,
76        provider: "DLNAPlayerProvider",
77        player_id: str,
78        description_url: str,
79        device: DmrDevice | None = None,
80    ) -> None:
81        """Init Player.
82
83        The player_id is the udn.
84        """
85        super().__init__(provider, player_id)
86
87        self.device = device
88        self.description_url = description_url  # last known location (description.xml) url
89
90        self.lock = asyncio.Lock()  # Held when connecting or disconnecting the device
91
92        self.force_poll = False  # used, if connection is lost
93
94        # ssdp_connect_failed: bool = False
95        #
96        # Track BOOTID in SSDP advertisements for device changes
97        self.bootid: int | None = None
98        self.last_seen = time.time()
99        self.last_command = time.time()
100
101    def set_available(self, available: bool) -> None:
102        """Set the availability of the player."""
103        self._attr_available = available
104
105    async def _device_connect(self) -> None:
106        """Connect DLNA/DMR Device."""
107        self.logger.debug("Connecting to device at %s", self.description_url)
108
109        async with self.lock:
110            if self.device:
111                self.logger.debug("Trying to connect when device already connected")
112                return
113
114            # Connect to the base UPNP device
115            if TYPE_CHECKING:
116                assert isinstance(self.provider, DLNAPlayerProvider)  # for type checking
117            upnp_device = await self.provider.upnp_factory.async_create_device(self.description_url)
118
119            # Create profile wrapper
120            self.device = DmrDevice(upnp_device, self.provider.notify_server.event_handler)
121
122            # Subscribe to event notifications
123            try:
124                self.device.on_event = self._handle_event
125                await self.device.async_subscribe_services(auto_resubscribe=True)
126            except UpnpResponseError as err:
127                # Device rejected subscription request. This is OK, variables
128                # will be polled instead.
129                self.logger.debug("Device rejected subscription: %r", err)
130            except UpnpError as err:
131                # Don't leave the device half-constructed
132                self.device.on_event = None
133                self.device = None
134                self.logger.debug("Error while subscribing during device connect: %r", err)
135                raise
136            else:
137                # connect was successful, update device info
138                self._attr_device_info = DeviceInfo(
139                    model=self.device.model_name,
140                    manufacturer=self.device.manufacturer,
141                )
142                # Add UDN (player_id) as UUID identifier for matching with other protocols
143                # Strip the "uuid:" prefix if present for proper matching
144                uuid_value = self.player_id
145                if uuid_value.lower().startswith("uuid:"):
146                    uuid_value = uuid_value[5:]
147                self._attr_device_info.add_identifier(IdentifierType.UUID, uuid_value)
148                # Try to extract MAC address from UUID
149                # Many UPnP devices embed MAC in the last 12 chars of UUID
150                # e.g., uuid:4d691234-444c-164e-1234-001f33eaacf1 -> 00:1f:33:ea:ac:f1
151                mac_address = self._extract_mac_from_uuid(uuid_value)
152                # Only add MAC address if it's valid (not 00:00:00:00:00:00)
153                if mac_address and is_valid_mac_address(mac_address):
154                    self._attr_device_info.add_identifier(IdentifierType.MAC_ADDRESS, mac_address)
155                # Try to extract just the IP from the URL for matching
156                ip_address = self.device.device.presentation_url or self.description_url
157                with suppress(ValueError):
158                    parsed = urlparse(ip_address)
159                    if parsed.hostname:
160                        self._attr_device_info.add_identifier(
161                            IdentifierType.IP_ADDRESS, parsed.hostname
162                        )
163
164    def _handle_event(
165        self,
166        service: UpnpService,
167        state_variables: Sequence[UpnpStateVariable[Any]],
168    ) -> None:
169        """Handle state variable(s) changed event from DLNA device."""
170        if not state_variables:
171            # Indicates a failure to resubscribe, check if device is still available
172            self.force_poll = True
173            return
174        if service.service_id == "urn:upnp-org:serviceId:AVTransport":
175            for state_variable in state_variables:
176                # Force a state refresh when player begins or pauses playback
177                # to update the position info.
178                if state_variable.name == "TransportState" and state_variable.value in (
179                    TransportState.PLAYING,
180                    TransportState.PAUSED_PLAYBACK,
181                ):
182                    self.force_poll = True
183                    self.mass.create_task(self.poll())
184                    self.logger.log(
185                        VERBOSE_LOG_LEVEL,
186                        "Received new state from event for Player %s: %s",
187                        self.display_name,
188                        state_variable.value,
189                    )
190        self.last_seen = time.time()
191        self.mass.create_task(self._update_player())
192
193    async def _update_player(self) -> None:
194        """Update DLNA Player."""
195        prev_url = self._attr_current_media.uri if self._attr_current_media is not None else ""
196        prev_state = self.state
197        await self.set_dynamic_attributes()
198        current_url = self._attr_current_media.uri if self._attr_current_media is not None else ""
199        current_state = self.state
200
201        if (prev_url != current_url) or (prev_state != current_state):
202            # fetch track details on state or url change
203            self.force_poll = True
204
205        try:
206            self.update_state()
207        except (KeyError, TypeError):
208            # at start the update might come faster than the config is initialized
209            await asyncio.sleep(2)
210            self.update_state()
211
212    def _set_player_features(self) -> None:
213        """Set Player Features based on config values and capabilities."""
214        assert self.device is not None  # for type checking
215        supported_features: set[PlayerFeature] = set()
216
217        # Only add PLAY_MEDIA if the device actually supports playback
218        # Passive speakers (like stereo pair satellites) don't have play capability
219        if self.device.has_play_media:
220            supported_features.add(PlayerFeature.PLAY_MEDIA)
221            # there is no way to check if a dlna player support enqueuing
222            # so we simply assume it does and if it doesn't
223            # you'll find out at playback time and we log a warning
224            supported_features.add(PlayerFeature.ENQUEUE)
225            supported_features.add(PlayerFeature.GAPLESS_PLAYBACK)
226
227        if self.device.has_volume_level:
228            supported_features.add(PlayerFeature.VOLUME_SET)
229        if self.device.has_volume_mute:
230            supported_features.add(PlayerFeature.VOLUME_MUTE)
231        if self.device.has_pause:
232            supported_features.add(PlayerFeature.PAUSE)
233        self._attr_supported_features = supported_features
234
235    async def setup(self) -> bool:
236        """Set up player in MA.
237
238        :return: True if setup was successful, False if device should be ignored.
239        """
240        await self._device_connect()
241
242        if self.device and not self.device.has_play_media:
243            self.logger.debug("Ignoring %s - no play capability", self.device.name)
244            return False
245
246        if self.device and await self._is_sonos_passive_speaker():
247            self.logger.debug("Ignoring %s - passive stereo pair speaker", self.device.name)
248            return False
249
250        self.set_static_attributes()
251        await self.mass.players.register_or_update(self)
252        return True
253
254    async def _is_sonos_passive_speaker(self) -> bool:
255        """Check if this is a Sonos passive stereo pair speaker.
256
257        Queries the device's own topology. If that returns 403, the device is
258        considered passive (passive satellites and speakers with UPnP disabled
259        block topology queries). If successful, checks for Invisible="1" attribute.
260        """
261        if not self.device:
262            return False
263
264        manufacturer = (self.device.manufacturer or "").lower()
265        if "sonos" not in manufacturer:
266            return False
267
268        # Extract base UUID (strip "uuid:" prefix and "_MR" suffix)
269        our_uuid = self.player_id.removeprefix("uuid:").removesuffix("_MR")
270
271        # Query this device's topology
272        upnp_device = self.device.profile_device.root_device
273        result = await self._check_invisible_in_topology(upnp_device, our_uuid)
274
275        # Return the result: True if passive/403, False if active or check failed
276        return result if result is not None else False
277
278    async def _check_invisible_in_topology(
279        self, upnp_device: UpnpDevice, our_uuid: str
280    ) -> bool | None:
281        """Check if our UUID is marked as Invisible in the topology.
282
283        :param upnp_device: UPnP device to query
284        :param our_uuid: Our device UUID to search for
285        :return: True if invisible/403 error, False if visible, None if check failed
286        """
287        zone_topology_service = None
288        for service in upnp_device.all_services:
289            if "ZoneGroupTopology" in service.service_type:
290                zone_topology_service = service
291                break
292
293        if not zone_topology_service:
294            return None
295
296        try:
297            action = zone_topology_service.action("GetZoneGroupState")
298            if not action:
299                return None
300
301            result = await action.async_call()
302            zone_group_state_xml = result.get("ZoneGroupState", "")
303            if not zone_group_state_xml:
304                return None
305
306            root = DefusedET.fromstring(zone_group_state_xml)
307            for member in root.iter("ZoneGroupMember"):
308                if member.get("UUID", "").upper() == our_uuid.upper():
309                    return str(member.get("Invisible", "0")) == "1"
310
311        except UpnpResponseError as err:
312            # 403 Forbidden indicates passive satellite (blocks topology queries)
313            if "403" in str(err):
314                self.logger.debug(
315                    "Sonos device %s returned 403 - treating as passive satellite",
316                    our_uuid,
317                )
318                return True
319            self.logger.log(
320                VERBOSE_LOG_LEVEL,
321                "Error checking Sonos zone topology: %s",
322                err,
323            )
324        except (UpnpError, DefusedET.ParseError) as err:
325            self.logger.log(
326                VERBOSE_LOG_LEVEL,
327                "Error checking Sonos zone topology: %s",
328                err,
329            )
330
331        return None
332
333    def set_static_attributes(self) -> None:
334        """Set static attributes."""
335        self._attr_needs_poll = True
336        self._attr_poll_interval = 30
337        self._set_player_features()
338
339    async def set_dynamic_attributes(self) -> None:
340        """Set dynamic attributes."""
341        available = self.device is not None and self.device.profile_device.available
342        self._attr_available = available
343        if not available:
344            return
345        assert self.device is not None  # for type checking
346        self._attr_name = self.device.name
347        self._attr_volume_level = int((self.device.volume_level or 0) * 100)
348        self._attr_volume_muted = self.device.is_volume_muted or False
349        _playback_state = self._get_playback_state()
350        assert _playback_state is not None  # for type checking
351        self._attr_playback_state = _playback_state
352
353        _device_uri = self.device.current_track_uri or ""
354        self.set_current_media(uri=_device_uri, clear_all=True)
355
356        # Let player controller determine active source, only override for known external sources
357        if _device_uri and _device_uri.startswith(self.mass.streams.base_url):
358            # MA stream - let controller determine source
359            self._attr_active_source = None
360        elif "spotify" in _device_uri:
361            # Spotify or Spotify Connect
362            self._attr_active_source = "spotify"
363        elif _device_uri:
364            # External HTTP source
365            self._attr_active_source = "http"
366        else:
367            # No URI - idle or unknown
368            self._attr_active_source = None
369        # TODO: extend this list with other possible sources
370        if self.device.media_position:
371            # only update elapsed_time if the device actually reports it
372            self._attr_elapsed_time = float(self.device.media_position)
373            if self.device.media_position_updated_at is not None:
374                self._attr_elapsed_time_last_updated = (
375                    self.device.media_position_updated_at.timestamp()
376                )
377
378    def _get_playback_state(self) -> PlaybackState | None:
379        """Return current PlaybackState of the player."""
380        if self.device is None:
381            return None
382        if self.device.transport_state is None:
383            return PlaybackState.IDLE
384        if self.device.transport_state in (
385            TransportState.PLAYING,
386            TransportState.TRANSITIONING,
387        ):
388            return PlaybackState.PLAYING
389        if self.device.transport_state in (
390            TransportState.PAUSED_PLAYBACK,
391            TransportState.PAUSED_RECORDING,
392        ):
393            return PlaybackState.PAUSED
394        if self.device.transport_state == TransportState.VENDOR_DEFINED:
395            # Unable to map this state to anything reasonable, fallback to idle
396            return PlaybackState.IDLE
397
398        return PlaybackState.IDLE
399
400    async def get_config_entries(
401        self,
402        action: str | None = None,
403        values: dict[str, ConfigValueType] | None = None,
404    ) -> list[ConfigEntry]:
405        """Return all (provider/player specific) Config Entries for the given player (if any)."""
406        return [*PLAYER_CONFIG_ENTRIES]
407
408    # COMMANDS
409    @catch_request_errors
410    async def stop(self) -> None:
411        """Send STOP command to given player."""
412        assert self.device is not None  # for type checking
413        await self.device.async_stop()
414
415    @catch_request_errors
416    async def play(self) -> None:
417        """Send PLAY command to given player."""
418        assert self.device is not None  # for type checking
419        await self.device.async_play()
420
421    @catch_request_errors
422    async def play_media(self, media: PlayerMedia) -> None:
423        """Handle PLAY MEDIA on given player."""
424        assert self.device is not None  # for type checking
425        # always clear queue (by sending stop) first
426        if self.device.can_stop:
427            await self.stop()
428        didl_metadata = create_didl_metadata(media)
429        title = media.title or media.uri
430        url = await self.provider.mass.streams.resolve_stream_url(self.player_id, media)
431        await self.device.async_set_transport_uri(url, title, didl_metadata)
432        # Play it
433        await self.device.async_wait_for_can_play(10)
434        # optimistically set this timestamp to help in case of a player
435        # that does not report the progress
436        self._attr_elapsed_time = 0
437        self._attr_elapsed_time_last_updated = time.time()
438        await self.device.async_play()
439        # force poll the device
440        for sleep in (1, 2):
441            await asyncio.sleep(sleep)
442            self.force_poll = True
443            await self.poll()
444
445    @catch_request_errors
446    async def enqueue_next_media(self, media: PlayerMedia) -> None:
447        """Handle enqueuing of the next queue item on the player."""
448        assert self.device is not None  # for type checking
449        didl_metadata = create_didl_metadata(media)
450        title = media.title or media.uri
451        try:
452            await self.device.async_set_next_transport_uri(media.uri, title, didl_metadata)
453        except UpnpError:
454            self.logger.error(
455                "Enqueuing the next track failed for player %s - "
456                "the player probably doesn't support this. "
457                "Enable 'flow mode' for this player.",
458                self.display_name,
459            )
460
461    @catch_request_errors
462    async def pause(self) -> None:
463        """Send PAUSE command to given player."""
464        assert self.device is not None  # for type checking
465        if self.device.can_pause:
466            await self.device.async_pause()
467        else:
468            await self.device.async_stop()
469
470    @catch_request_errors
471    async def volume_set(self, volume_level: int) -> None:
472        """Send VOLUME_SET command to given player."""
473        assert self.device is not None  # for type checking
474        await self.device.async_set_volume_level(volume_level / 100)
475
476    @catch_request_errors
477    async def volume_mute(self, muted: bool) -> None:
478        """Send VOLUME MUTE command to given player."""
479        assert self.device is not None  # for type checking
480        await self.device.async_mute_volume(muted)
481
482    async def poll(self) -> None:
483        """Poll player for state updates."""
484        # try to reconnect the device if the connection was lost
485        if not self.device:
486            if not self.force_poll:
487                return
488            try:
489                await self._device_connect()
490            except UpnpError as err:
491                raise PlayerUnavailableError from err
492
493        assert self.device is not None
494
495        try:
496            now = time.time()
497            do_ping = self.force_poll or (now - self.last_seen) > 60
498            with suppress(ValueError):
499                await self.device.async_update(do_ping=do_ping)
500            self.last_seen = now if do_ping else self.last_seen
501        except UpnpError as err:
502            self.logger.debug("Device unavailable: %r", err)
503            if TYPE_CHECKING:
504                assert isinstance(self.provider, DLNAPlayerProvider)  # for type checking
505            await self.provider._device_disconnect(self)
506            raise PlayerUnavailableError from err
507        finally:
508            self.force_poll = False
509
510    @staticmethod
511    def _extract_mac_from_uuid(uuid_value: str) -> str | None:
512        """Try to extract MAC address from UUID.
513
514        Many UPnP devices embed the MAC address in the last 12 hex characters of the UUID.
515        E.g., uuid:4d691234-444c-164e-1234-001f33eaacf1 -> 00:1f:33:ea:ac:f1
516
517        :param uuid_value: The UUID string (without 'uuid:' prefix).
518        :return: MAC address string in XX:XX:XX:XX:XX:XX format, or None if not extractable.
519        """
520        # Remove dashes and get last 12 hex characters
521        hex_chars = uuid_value.replace("-", "")
522        if len(hex_chars) < 12:
523            return None
524
525        mac_hex = hex_chars[-12:]
526
527        # Validate it looks like a MAC (all hex characters)
528        try:
529            int(mac_hex, 16)
530        except ValueError:
531            return None
532
533        # Check if it could be a valid MAC (not all zeros or all ones)
534        if mac_hex in ("000000000000", "ffffffffffff", "FFFFFFFFFFFF"):
535            return None
536
537        # Format as XX:XX:XX:XX:XX:XX
538        return ":".join(mac_hex[i : i + 2].upper() for i in range(0, 12, 2))
539