/
/
/
1"""DLNA Player."""
2
3import asyncio
4import functools
5import time
6from collections.abc import Awaitable, Callable, Coroutine, Sequence
7from contextlib import suppress
8from typing import TYPE_CHECKING, Any, Concatenate
9from urllib.parse import urlparse
10
11import defusedxml.ElementTree as DefusedET
12from async_upnp_client.client import UpnpDevice, UpnpService, UpnpStateVariable
13from async_upnp_client.exceptions import UpnpError, UpnpResponseError
14from async_upnp_client.profiles.dlna import DmrDevice, TransportState
15from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
16from music_assistant_models.enums import IdentifierType, PlaybackState, PlayerFeature, PlayerType
17from music_assistant_models.errors import PlayerUnavailableError
18from music_assistant_models.player import PlayerMedia
19
20from music_assistant.constants import VERBOSE_LOG_LEVEL
21from music_assistant.helpers.upnp import create_didl_metadata
22from music_assistant.helpers.util import is_valid_mac_address
23from music_assistant.models.player import DeviceInfo, Player
24
25from .constants import PLAYER_CONFIG_ENTRIES
26
27if TYPE_CHECKING:
28 from .provider import DLNAPlayerProvider
29
30
31def catch_request_errors[DLNAPlayerT: "DLNAPlayer", **P, R](
32 func: Callable[Concatenate[DLNAPlayerT, P], Awaitable[R]],
33) -> Callable[Concatenate[DLNAPlayerT, P], Coroutine[Any, Any, R | None]]:
34 """Catch UpnpError errors."""
35
36 @functools.wraps(func)
37 async def wrapper(self: DLNAPlayerT, *args: P.args, **kwargs: P.kwargs) -> R | None:
38 """Catch UpnpError errors and check availability before and after request."""
39 self.last_command = time.time()
40 if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
41 self.logger.debug(
42 "Handling command %s for player %s",
43 func.__name__,
44 self.display_name,
45 )
46 if not self.available:
47 self.logger.warning("Device disappeared when trying to call %s", func.__name__)
48 return None
49 try:
50 return await func(self, *args, **kwargs)
51 except UpnpError as err:
52 self.force_poll = True
53 if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
54 self.logger.exception("Error during call %s: %r", func.__name__, err)
55 else:
56 self.logger.error("Error during call %s: %r", func.__name__, str(err))
57 return None
58
59 return wrapper
60
61
class DLNAPlayer(Player):
    """DLNA Player.

    All DLNA players are considered generic protocol endpoints (PlayerType.PROTOCOL)
    and will be wrapped in a UniversalPlayer. Devices with native provider support
    (e.g., Sonos) are handled by their respective providers and will link to
    the DLNA player as a protocol output.

    Each instance wraps a single ``DmrDevice`` (the player_id is the device's UDN).
    State is kept up to date through UPnP event subscriptions, with polling as
    the fallback when a device rejects the subscription.
    """

    # All DLNA devices are generic protocol endpoints - no vendor has native DLNA support in MA
    _attr_type = PlayerType.PROTOCOL
73
74 def __init__(
75 self,
76 provider: "DLNAPlayerProvider",
77 player_id: str,
78 description_url: str,
79 device: DmrDevice | None = None,
80 ) -> None:
81 """Init Player.
82
83 The player_id is the udn.
84 """
85 super().__init__(provider, player_id)
86
87 self.device = device
88 self.description_url = description_url # last known location (description.xml) url
89
90 self.lock = asyncio.Lock() # Held when connecting or disconnecting the device
91
92 self.force_poll = False # used, if connection is lost
93
94 # ssdp_connect_failed: bool = False
95 #
96 # Track BOOTID in SSDP advertisements for device changes
97 self.bootid: int | None = None
98 self.last_seen = time.time()
99 self.last_command = time.time()
100
101 def set_available(self, available: bool) -> None:
102 """Set the availability of the player."""
103 self._attr_available = available
104
105 async def _device_connect(self) -> None:
106 """Connect DLNA/DMR Device."""
107 self.logger.debug("Connecting to device at %s", self.description_url)
108
109 async with self.lock:
110 if self.device:
111 self.logger.debug("Trying to connect when device already connected")
112 return
113
114 # Connect to the base UPNP device
115 if TYPE_CHECKING:
116 assert isinstance(self.provider, DLNAPlayerProvider) # for type checking
117 upnp_device = await self.provider.upnp_factory.async_create_device(self.description_url)
118
119 # Create profile wrapper
120 self.device = DmrDevice(upnp_device, self.provider.notify_server.event_handler)
121
122 # Subscribe to event notifications
123 try:
124 self.device.on_event = self._handle_event
125 await self.device.async_subscribe_services(auto_resubscribe=True)
126 except UpnpResponseError as err:
127 # Device rejected subscription request. This is OK, variables
128 # will be polled instead.
129 self.logger.debug("Device rejected subscription: %r", err)
130 except UpnpError as err:
131 # Don't leave the device half-constructed
132 self.device.on_event = None
133 self.device = None
134 self.logger.debug("Error while subscribing during device connect: %r", err)
135 raise
136 else:
137 # connect was successful, update device info
138 self._attr_device_info = DeviceInfo(
139 model=self.device.model_name,
140 manufacturer=self.device.manufacturer,
141 )
142 # Add UDN (player_id) as UUID identifier for matching with other protocols
143 # Strip the "uuid:" prefix if present for proper matching
144 uuid_value = self.player_id
145 if uuid_value.lower().startswith("uuid:"):
146 uuid_value = uuid_value[5:]
147 self._attr_device_info.add_identifier(IdentifierType.UUID, uuid_value)
148 # Try to extract MAC address from UUID
149 # Many UPnP devices embed MAC in the last 12 chars of UUID
150 # e.g., uuid:4d691234-444c-164e-1234-001f33eaacf1 -> 00:1f:33:ea:ac:f1
151 mac_address = self._extract_mac_from_uuid(uuid_value)
152 # Only add MAC address if it's valid (not 00:00:00:00:00:00)
153 if mac_address and is_valid_mac_address(mac_address):
154 self._attr_device_info.add_identifier(IdentifierType.MAC_ADDRESS, mac_address)
155 # Try to extract just the IP from the URL for matching
156 ip_address = self.device.device.presentation_url or self.description_url
157 with suppress(ValueError):
158 parsed = urlparse(ip_address)
159 if parsed.hostname:
160 self._attr_device_info.add_identifier(
161 IdentifierType.IP_ADDRESS, parsed.hostname
162 )
163
164 def _handle_event(
165 self,
166 service: UpnpService,
167 state_variables: Sequence[UpnpStateVariable[Any]],
168 ) -> None:
169 """Handle state variable(s) changed event from DLNA device."""
170 if not state_variables:
171 # Indicates a failure to resubscribe, check if device is still available
172 self.force_poll = True
173 return
174 if service.service_id == "urn:upnp-org:serviceId:AVTransport":
175 for state_variable in state_variables:
176 # Force a state refresh when player begins or pauses playback
177 # to update the position info.
178 if state_variable.name == "TransportState" and state_variable.value in (
179 TransportState.PLAYING,
180 TransportState.PAUSED_PLAYBACK,
181 ):
182 self.force_poll = True
183 self.mass.create_task(self.poll())
184 self.logger.log(
185 VERBOSE_LOG_LEVEL,
186 "Received new state from event for Player %s: %s",
187 self.display_name,
188 state_variable.value,
189 )
190 self.last_seen = time.time()
191 self.mass.create_task(self._update_player())
192
193 async def _update_player(self) -> None:
194 """Update DLNA Player."""
195 prev_url = self._attr_current_media.uri if self._attr_current_media is not None else ""
196 prev_state = self.state
197 await self.set_dynamic_attributes()
198 current_url = self._attr_current_media.uri if self._attr_current_media is not None else ""
199 current_state = self.state
200
201 if (prev_url != current_url) or (prev_state != current_state):
202 # fetch track details on state or url change
203 self.force_poll = True
204
205 try:
206 self.update_state()
207 except (KeyError, TypeError):
208 # at start the update might come faster than the config is initialized
209 await asyncio.sleep(2)
210 self.update_state()
211
212 def _set_player_features(self) -> None:
213 """Set Player Features based on config values and capabilities."""
214 assert self.device is not None # for type checking
215 supported_features: set[PlayerFeature] = set()
216
217 # Only add PLAY_MEDIA if the device actually supports playback
218 # Passive speakers (like stereo pair satellites) don't have play capability
219 if self.device.has_play_media:
220 supported_features.add(PlayerFeature.PLAY_MEDIA)
221 # there is no way to check if a dlna player support enqueuing
222 # so we simply assume it does and if it doesn't
223 # you'll find out at playback time and we log a warning
224 supported_features.add(PlayerFeature.ENQUEUE)
225 supported_features.add(PlayerFeature.GAPLESS_PLAYBACK)
226
227 if self.device.has_volume_level:
228 supported_features.add(PlayerFeature.VOLUME_SET)
229 if self.device.has_volume_mute:
230 supported_features.add(PlayerFeature.VOLUME_MUTE)
231 if self.device.has_pause:
232 supported_features.add(PlayerFeature.PAUSE)
233 self._attr_supported_features = supported_features
234
235 async def setup(self) -> bool:
236 """Set up player in MA.
237
238 :return: True if setup was successful, False if device should be ignored.
239 """
240 await self._device_connect()
241
242 if self.device and not self.device.has_play_media:
243 self.logger.debug("Ignoring %s - no play capability", self.device.name)
244 return False
245
246 if self.device and await self._is_sonos_passive_speaker():
247 self.logger.debug("Ignoring %s - passive stereo pair speaker", self.device.name)
248 return False
249
250 self.set_static_attributes()
251 await self.mass.players.register_or_update(self)
252 return True
253
254 async def _is_sonos_passive_speaker(self) -> bool:
255 """Check if this is a Sonos passive stereo pair speaker.
256
257 Queries the device's own topology. If that returns 403, the device is
258 considered passive (passive satellites and speakers with UPnP disabled
259 block topology queries). If successful, checks for Invisible="1" attribute.
260 """
261 if not self.device:
262 return False
263
264 manufacturer = (self.device.manufacturer or "").lower()
265 if "sonos" not in manufacturer:
266 return False
267
268 # Extract base UUID (strip "uuid:" prefix and "_MR" suffix)
269 our_uuid = self.player_id.removeprefix("uuid:").removesuffix("_MR")
270
271 # Query this device's topology
272 upnp_device = self.device.profile_device.root_device
273 result = await self._check_invisible_in_topology(upnp_device, our_uuid)
274
275 # Return the result: True if passive/403, False if active or check failed
276 return result if result is not None else False
277
278 async def _check_invisible_in_topology(
279 self, upnp_device: UpnpDevice, our_uuid: str
280 ) -> bool | None:
281 """Check if our UUID is marked as Invisible in the topology.
282
283 :param upnp_device: UPnP device to query
284 :param our_uuid: Our device UUID to search for
285 :return: True if invisible/403 error, False if visible, None if check failed
286 """
287 zone_topology_service = None
288 for service in upnp_device.all_services:
289 if "ZoneGroupTopology" in service.service_type:
290 zone_topology_service = service
291 break
292
293 if not zone_topology_service:
294 return None
295
296 try:
297 action = zone_topology_service.action("GetZoneGroupState")
298 if not action:
299 return None
300
301 result = await action.async_call()
302 zone_group_state_xml = result.get("ZoneGroupState", "")
303 if not zone_group_state_xml:
304 return None
305
306 root = DefusedET.fromstring(zone_group_state_xml)
307 for member in root.iter("ZoneGroupMember"):
308 if member.get("UUID", "").upper() == our_uuid.upper():
309 return str(member.get("Invisible", "0")) == "1"
310
311 except UpnpResponseError as err:
312 # 403 Forbidden indicates passive satellite (blocks topology queries)
313 if "403" in str(err):
314 self.logger.debug(
315 "Sonos device %s returned 403 - treating as passive satellite",
316 our_uuid,
317 )
318 return True
319 self.logger.log(
320 VERBOSE_LOG_LEVEL,
321 "Error checking Sonos zone topology: %s",
322 err,
323 )
324 except (UpnpError, DefusedET.ParseError) as err:
325 self.logger.log(
326 VERBOSE_LOG_LEVEL,
327 "Error checking Sonos zone topology: %s",
328 err,
329 )
330
331 return None
332
333 def set_static_attributes(self) -> None:
334 """Set static attributes."""
335 self._attr_needs_poll = True
336 self._attr_poll_interval = 30
337 self._set_player_features()
338
339 async def set_dynamic_attributes(self) -> None:
340 """Set dynamic attributes."""
341 available = self.device is not None and self.device.profile_device.available
342 self._attr_available = available
343 if not available:
344 return
345 assert self.device is not None # for type checking
346 self._attr_name = self.device.name
347 self._attr_volume_level = int((self.device.volume_level or 0) * 100)
348 self._attr_volume_muted = self.device.is_volume_muted or False
349 _playback_state = self._get_playback_state()
350 assert _playback_state is not None # for type checking
351 self._attr_playback_state = _playback_state
352
353 _device_uri = self.device.current_track_uri or ""
354 self.set_current_media(uri=_device_uri, clear_all=True)
355
356 # Let player controller determine active source, only override for known external sources
357 if _device_uri and _device_uri.startswith(self.mass.streams.base_url):
358 # MA stream - let controller determine source
359 self._attr_active_source = None
360 elif "spotify" in _device_uri:
361 # Spotify or Spotify Connect
362 self._attr_active_source = "spotify"
363 elif _device_uri:
364 # External HTTP source
365 self._attr_active_source = "http"
366 else:
367 # No URI - idle or unknown
368 self._attr_active_source = None
369 # TODO: extend this list with other possible sources
370 if self.device.media_position:
371 # only update elapsed_time if the device actually reports it
372 self._attr_elapsed_time = float(self.device.media_position)
373 if self.device.media_position_updated_at is not None:
374 self._attr_elapsed_time_last_updated = (
375 self.device.media_position_updated_at.timestamp()
376 )
377
378 def _get_playback_state(self) -> PlaybackState | None:
379 """Return current PlaybackState of the player."""
380 if self.device is None:
381 return None
382 if self.device.transport_state is None:
383 return PlaybackState.IDLE
384 if self.device.transport_state in (
385 TransportState.PLAYING,
386 TransportState.TRANSITIONING,
387 ):
388 return PlaybackState.PLAYING
389 if self.device.transport_state in (
390 TransportState.PAUSED_PLAYBACK,
391 TransportState.PAUSED_RECORDING,
392 ):
393 return PlaybackState.PAUSED
394 if self.device.transport_state == TransportState.VENDOR_DEFINED:
395 # Unable to map this state to anything reasonable, fallback to idle
396 return PlaybackState.IDLE
397
398 return PlaybackState.IDLE
399
400 async def get_config_entries(
401 self,
402 action: str | None = None,
403 values: dict[str, ConfigValueType] | None = None,
404 ) -> list[ConfigEntry]:
405 """Return all (provider/player specific) Config Entries for the given player (if any)."""
406 return [*PLAYER_CONFIG_ENTRIES]
407
408 # COMMANDS
409 @catch_request_errors
410 async def stop(self) -> None:
411 """Send STOP command to given player."""
412 assert self.device is not None # for type checking
413 await self.device.async_stop()
414
415 @catch_request_errors
416 async def play(self) -> None:
417 """Send PLAY command to given player."""
418 assert self.device is not None # for type checking
419 await self.device.async_play()
420
421 @catch_request_errors
422 async def play_media(self, media: PlayerMedia) -> None:
423 """Handle PLAY MEDIA on given player."""
424 assert self.device is not None # for type checking
425 # always clear queue (by sending stop) first
426 if self.device.can_stop:
427 await self.stop()
428 didl_metadata = create_didl_metadata(media)
429 title = media.title or media.uri
430 url = await self.provider.mass.streams.resolve_stream_url(self.player_id, media)
431 await self.device.async_set_transport_uri(url, title, didl_metadata)
432 # Play it
433 await self.device.async_wait_for_can_play(10)
434 # optimistically set this timestamp to help in case of a player
435 # that does not report the progress
436 self._attr_elapsed_time = 0
437 self._attr_elapsed_time_last_updated = time.time()
438 await self.device.async_play()
439 # force poll the device
440 for sleep in (1, 2):
441 await asyncio.sleep(sleep)
442 self.force_poll = True
443 await self.poll()
444
445 @catch_request_errors
446 async def enqueue_next_media(self, media: PlayerMedia) -> None:
447 """Handle enqueuing of the next queue item on the player."""
448 assert self.device is not None # for type checking
449 didl_metadata = create_didl_metadata(media)
450 title = media.title or media.uri
451 try:
452 await self.device.async_set_next_transport_uri(media.uri, title, didl_metadata)
453 except UpnpError:
454 self.logger.error(
455 "Enqueuing the next track failed for player %s - "
456 "the player probably doesn't support this. "
457 "Enable 'flow mode' for this player.",
458 self.display_name,
459 )
460
461 @catch_request_errors
462 async def pause(self) -> None:
463 """Send PAUSE command to given player."""
464 assert self.device is not None # for type checking
465 if self.device.can_pause:
466 await self.device.async_pause()
467 else:
468 await self.device.async_stop()
469
470 @catch_request_errors
471 async def volume_set(self, volume_level: int) -> None:
472 """Send VOLUME_SET command to given player."""
473 assert self.device is not None # for type checking
474 await self.device.async_set_volume_level(volume_level / 100)
475
476 @catch_request_errors
477 async def volume_mute(self, muted: bool) -> None:
478 """Send VOLUME MUTE command to given player."""
479 assert self.device is not None # for type checking
480 await self.device.async_mute_volume(muted)
481
482 async def poll(self) -> None:
483 """Poll player for state updates."""
484 # try to reconnect the device if the connection was lost
485 if not self.device:
486 if not self.force_poll:
487 return
488 try:
489 await self._device_connect()
490 except UpnpError as err:
491 raise PlayerUnavailableError from err
492
493 assert self.device is not None
494
495 try:
496 now = time.time()
497 do_ping = self.force_poll or (now - self.last_seen) > 60
498 with suppress(ValueError):
499 await self.device.async_update(do_ping=do_ping)
500 self.last_seen = now if do_ping else self.last_seen
501 except UpnpError as err:
502 self.logger.debug("Device unavailable: %r", err)
503 await self._device_disconnect()
504 raise PlayerUnavailableError from err
505 finally:
506 self.force_poll = False
507
508 async def on_unload(self) -> None:
509 """Handle logic when the player is unloaded from the Player controller."""
510 await super().on_unload()
511 await self._device_disconnect()
512
513 async def _device_disconnect(self) -> None:
514 """Destroy connections to the device."""
515 async with self.lock:
516 if not self.device:
517 self.logger.debug("Disconnecting from device that's not connected")
518 return
519
520 self.logger.debug("Disconnecting from %s", self.device.name)
521
522 self.device.on_event = None
523 old_device = self.device
524 self.device = None
525 self.set_available(False)
526 await old_device.async_unsubscribe_services()
527
528 @staticmethod
529 def _extract_mac_from_uuid(uuid_value: str) -> str | None:
530 """Try to extract MAC address from UUID.
531
532 Many UPnP devices embed the MAC address in the last 12 hex characters of the UUID.
533 E.g., uuid:4d691234-444c-164e-1234-001f33eaacf1 -> 00:1f:33:ea:ac:f1
534
535 :param uuid_value: The UUID string (without 'uuid:' prefix).
536 :return: MAC address string in XX:XX:XX:XX:XX:XX format, or None if not extractable.
537 """
538 # Remove dashes and get last 12 hex characters
539 hex_chars = uuid_value.replace("-", "")
540 if len(hex_chars) < 12:
541 return None
542
543 mac_hex = hex_chars[-12:]
544
545 # Validate it looks like a MAC (all hex characters)
546 try:
547 int(mac_hex, 16)
548 except ValueError:
549 return None
550
551 # Check if it could be a valid MAC (not all zeros or all ones)
552 if mac_hex in ("000000000000", "ffffffffffff", "FFFFFFFFFFFF"):
553 return None
554
555 # Format as XX:XX:XX:XX:XX:XX
556 return ":".join(mac_hex[i : i + 2].upper() for i in range(0, 12, 2))
557