/
/
/
1"""DLNA Player."""
2
3import asyncio
4import functools
5import time
6from collections.abc import Awaitable, Callable, Coroutine, Sequence
7from contextlib import suppress
8from typing import TYPE_CHECKING, Any, Concatenate
9from urllib.parse import urlparse
10
11import defusedxml.ElementTree as DefusedET
12from async_upnp_client.client import UpnpDevice, UpnpService, UpnpStateVariable
13from async_upnp_client.exceptions import UpnpError, UpnpResponseError
14from async_upnp_client.profiles.dlna import DmrDevice, TransportState
15from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
16from music_assistant_models.enums import IdentifierType, PlaybackState, PlayerFeature, PlayerType
17from music_assistant_models.errors import PlayerUnavailableError
18from music_assistant_models.player import PlayerMedia
19
20from music_assistant.constants import VERBOSE_LOG_LEVEL
21from music_assistant.helpers.upnp import create_didl_metadata
22from music_assistant.models.player import DeviceInfo, Player
23
24from .constants import PLAYER_CONFIG_ENTRIES
25
26if TYPE_CHECKING:
27 from .provider import DLNAPlayerProvider
28
29
30def catch_request_errors[DLNAPlayerT: "DLNAPlayer", **P, R](
31 func: Callable[Concatenate[DLNAPlayerT, P], Awaitable[R]],
32) -> Callable[Concatenate[DLNAPlayerT, P], Coroutine[Any, Any, R | None]]:
33 """Catch UpnpError errors."""
34
35 @functools.wraps(func)
36 async def wrapper(self: DLNAPlayerT, *args: P.args, **kwargs: P.kwargs) -> R | None:
37 """Catch UpnpError errors and check availability before and after request."""
38 self.last_command = time.time()
39 if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
40 self.logger.debug(
41 "Handling command %s for player %s",
42 func.__name__,
43 self.display_name,
44 )
45 if not self.available:
46 self.logger.warning("Device disappeared when trying to call %s", func.__name__)
47 return None
48 try:
49 return await func(self, *args, **kwargs)
50 except UpnpError as err:
51 self.force_poll = True
52 if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
53 self.logger.exception("Error during call %s: %r", func.__name__, err)
54 else:
55 self.logger.error("Error during call %s: %r", func.__name__, str(err))
56 return None
57
58 return wrapper
59
60
61class DLNAPlayer(Player):
62 """DLNA Player.
63
64 All DLNA players are considered generic protocol endpoints (PlayerType.PROTOCOL)
65 and will be wrapped in a UniversalPlayer. Devices with native provider support
66 (e.g., Sonos) are handled by their respective providers and will link to
67 the DLNA player as a protocol output.
68 """
69
70 # All DLNA devices are generic protocol endpoints - no vendor has native DLNA support in MA
71 _attr_type = PlayerType.PROTOCOL
72
73 def __init__(
74 self,
75 provider: "DLNAPlayerProvider",
76 player_id: str,
77 description_url: str,
78 device: DmrDevice | None = None,
79 ) -> None:
80 """Init Player.
81
82 The player_id is the udn.
83 """
84 super().__init__(provider, player_id)
85
86 self.device = device
87 self.description_url = description_url # last known location (description.xml) url
88
89 self.lock = asyncio.Lock() # Held when connecting or disconnecting the device
90
91 self.force_poll = False # used, if connection is lost
92
93 # ssdp_connect_failed: bool = False
94 #
95 # Track BOOTID in SSDP advertisements for device changes
96 self.bootid: int | None = None
97 self.last_seen = time.time()
98 self.last_command = time.time()
99
100 def set_available(self, available: bool) -> None:
101 """Set the availability of the player."""
102 self._attr_available = available
103
104 async def _device_connect(self) -> None:
105 """Connect DLNA/DMR Device."""
106 self.logger.debug("Connecting to device at %s", self.description_url)
107
108 async with self.lock:
109 if self.device:
110 self.logger.debug("Trying to connect when device already connected")
111 return
112
113 # Connect to the base UPNP device
114 if TYPE_CHECKING:
115 assert isinstance(self.provider, DLNAPlayerProvider) # for type checking
116 upnp_device = await self.provider.upnp_factory.async_create_device(self.description_url)
117
118 # Create profile wrapper
119 self.device = DmrDevice(upnp_device, self.provider.notify_server.event_handler)
120
121 # Subscribe to event notifications
122 try:
123 self.device.on_event = self._handle_event
124 await self.device.async_subscribe_services(auto_resubscribe=True)
125 except UpnpResponseError as err:
126 # Device rejected subscription request. This is OK, variables
127 # will be polled instead.
128 self.logger.debug("Device rejected subscription: %r", err)
129 except UpnpError as err:
130 # Don't leave the device half-constructed
131 self.device.on_event = None
132 self.device = None
133 self.logger.debug("Error while subscribing during device connect: %r", err)
134 raise
135 else:
136 # connect was successful, update device info
137 self._attr_device_info = DeviceInfo(
138 model=self.device.model_name,
139 manufacturer=self.device.manufacturer,
140 )
141 # Add UDN (player_id) as UUID identifier for matching with other protocols
142 # Strip the "uuid:" prefix if present for proper matching
143 uuid_value = self.player_id
144 if uuid_value.lower().startswith("uuid:"):
145 uuid_value = uuid_value[5:]
146 self._attr_device_info.add_identifier(IdentifierType.UUID, uuid_value)
147 # Try to extract MAC address from UUID
148 # Many UPnP devices embed MAC in the last 12 chars of UUID
149 # e.g., uuid:4d691234-444c-164e-1234-001f33eaacf1 -> 00:1f:33:ea:ac:f1
150 mac_address = self._extract_mac_from_uuid(uuid_value)
151 if mac_address:
152 self._attr_device_info.add_identifier(IdentifierType.MAC_ADDRESS, mac_address)
153 # Try to extract just the IP from the URL for matching
154 ip_address = self.device.device.presentation_url or self.description_url
155 with suppress(ValueError):
156 parsed = urlparse(ip_address)
157 if parsed.hostname:
158 self._attr_device_info.add_identifier(
159 IdentifierType.IP_ADDRESS, parsed.hostname
160 )
161
162 def _handle_event(
163 self,
164 service: UpnpService,
165 state_variables: Sequence[UpnpStateVariable[Any]],
166 ) -> None:
167 """Handle state variable(s) changed event from DLNA device."""
168 if not state_variables:
169 # Indicates a failure to resubscribe, check if device is still available
170 self.force_poll = True
171 return
172 if service.service_id == "urn:upnp-org:serviceId:AVTransport":
173 for state_variable in state_variables:
174 # Force a state refresh when player begins or pauses playback
175 # to update the position info.
176 if state_variable.name == "TransportState" and state_variable.value in (
177 TransportState.PLAYING,
178 TransportState.PAUSED_PLAYBACK,
179 ):
180 self.force_poll = True
181 self.mass.create_task(self.poll())
182 self.logger.log(
183 VERBOSE_LOG_LEVEL,
184 "Received new state from event for Player %s: %s",
185 self.display_name,
186 state_variable.value,
187 )
188 self.last_seen = time.time()
189 self.mass.create_task(self._update_player())
190
191 async def _update_player(self) -> None:
192 """Update DLNA Player."""
193 prev_url = self._attr_current_media.uri if self._attr_current_media is not None else ""
194 prev_state = self.state
195 await self.set_dynamic_attributes()
196 current_url = self._attr_current_media.uri if self._attr_current_media is not None else ""
197 current_state = self.state
198
199 if (prev_url != current_url) or (prev_state != current_state):
200 # fetch track details on state or url change
201 self.force_poll = True
202
203 try:
204 self.update_state()
205 except (KeyError, TypeError):
206 # at start the update might come faster than the config is initialized
207 await asyncio.sleep(2)
208 self.update_state()
209
210 def _set_player_features(self) -> None:
211 """Set Player Features based on config values and capabilities."""
212 assert self.device is not None # for type checking
213 supported_features: set[PlayerFeature] = set()
214
215 # Only add PLAY_MEDIA if the device actually supports playback
216 # Passive speakers (like stereo pair satellites) don't have play capability
217 if self.device.has_play_media:
218 supported_features.add(PlayerFeature.PLAY_MEDIA)
219 # there is no way to check if a dlna player support enqueuing
220 # so we simply assume it does and if it doesn't
221 # you'll find out at playback time and we log a warning
222 supported_features.add(PlayerFeature.ENQUEUE)
223 supported_features.add(PlayerFeature.GAPLESS_PLAYBACK)
224
225 if self.device.has_volume_level:
226 supported_features.add(PlayerFeature.VOLUME_SET)
227 if self.device.has_volume_mute:
228 supported_features.add(PlayerFeature.VOLUME_MUTE)
229 if self.device.has_pause:
230 supported_features.add(PlayerFeature.PAUSE)
231 self._attr_supported_features = supported_features
232
233 async def setup(self) -> bool:
234 """Set up player in MA.
235
236 :return: True if setup was successful, False if device should be ignored.
237 """
238 await self._device_connect()
239
240 if self.device and not self.device.has_play_media:
241 self.logger.debug("Ignoring %s - no play capability", self.device.name)
242 return False
243
244 if self.device and await self._is_sonos_passive_speaker():
245 self.logger.debug("Ignoring %s - passive stereo pair speaker", self.device.name)
246 return False
247
248 self.set_static_attributes()
249 await self.mass.players.register_or_update(self)
250 return True
251
252 async def _is_sonos_passive_speaker(self) -> bool:
253 """Check if this is a Sonos passive stereo pair speaker.
254
255 Queries the device's own topology. If that returns 403, the device is
256 considered passive (passive satellites and speakers with UPnP disabled
257 block topology queries). If successful, checks for Invisible="1" attribute.
258 """
259 if not self.device:
260 return False
261
262 manufacturer = (self.device.manufacturer or "").lower()
263 if "sonos" not in manufacturer:
264 return False
265
266 # Extract base UUID (strip "uuid:" prefix and "_MR" suffix)
267 our_uuid = self.player_id.removeprefix("uuid:").removesuffix("_MR")
268
269 # Query this device's topology
270 upnp_device = self.device.profile_device.root_device
271 result = await self._check_invisible_in_topology(upnp_device, our_uuid)
272
273 # Return the result: True if passive/403, False if active or check failed
274 return result if result is not None else False
275
276 async def _check_invisible_in_topology(
277 self, upnp_device: UpnpDevice, our_uuid: str
278 ) -> bool | None:
279 """Check if our UUID is marked as Invisible in the topology.
280
281 :param upnp_device: UPnP device to query
282 :param our_uuid: Our device UUID to search for
283 :return: True if invisible/403 error, False if visible, None if check failed
284 """
285 zone_topology_service = None
286 for service in upnp_device.all_services:
287 if "ZoneGroupTopology" in service.service_type:
288 zone_topology_service = service
289 break
290
291 if not zone_topology_service:
292 return None
293
294 try:
295 action = zone_topology_service.action("GetZoneGroupState")
296 if not action:
297 return None
298
299 result = await action.async_call()
300 zone_group_state_xml = result.get("ZoneGroupState", "")
301 if not zone_group_state_xml:
302 return None
303
304 root = DefusedET.fromstring(zone_group_state_xml)
305 for member in root.iter("ZoneGroupMember"):
306 if member.get("UUID", "").upper() == our_uuid.upper():
307 return str(member.get("Invisible", "0")) == "1"
308
309 except UpnpResponseError as err:
310 # 403 Forbidden indicates passive satellite (blocks topology queries)
311 if "403" in str(err):
312 self.logger.debug(
313 "Sonos device %s returned 403 - treating as passive satellite",
314 our_uuid,
315 )
316 return True
317 self.logger.log(
318 VERBOSE_LOG_LEVEL,
319 "Error checking Sonos zone topology: %s",
320 err,
321 )
322 except (UpnpError, DefusedET.ParseError) as err:
323 self.logger.log(
324 VERBOSE_LOG_LEVEL,
325 "Error checking Sonos zone topology: %s",
326 err,
327 )
328
329 return None
330
331 def set_static_attributes(self) -> None:
332 """Set static attributes."""
333 self._attr_needs_poll = True
334 self._attr_poll_interval = 30
335 self._set_player_features()
336
337 async def set_dynamic_attributes(self) -> None:
338 """Set dynamic attributes."""
339 available = self.device is not None and self.device.profile_device.available
340 self._attr_available = available
341 if not available:
342 return
343 assert self.device is not None # for type checking
344 self._attr_name = self.device.name
345 self._attr_volume_level = int((self.device.volume_level or 0) * 100)
346 self._attr_volume_muted = self.device.is_volume_muted or False
347 _playback_state = self._get_playback_state()
348 assert _playback_state is not None # for type checking
349 self._attr_playback_state = _playback_state
350
351 _device_uri = self.device.current_track_uri or ""
352 self.set_current_media(uri=_device_uri, clear_all=True)
353
354 # Let player controller determine active source, only override for known external sources
355 if _device_uri and _device_uri.startswith(self.mass.streams.base_url):
356 # MA stream - let controller determine source
357 self._attr_active_source = None
358 elif "spotify" in _device_uri:
359 # Spotify or Spotify Connect
360 self._attr_active_source = "spotify"
361 elif _device_uri:
362 # External HTTP source
363 self._attr_active_source = "http"
364 else:
365 # No URI - idle or unknown
366 self._attr_active_source = None
367 # TODO: extend this list with other possible sources
368 if self.device.media_position:
369 # only update elapsed_time if the device actually reports it
370 self._attr_elapsed_time = float(self.device.media_position)
371 if self.device.media_position_updated_at is not None:
372 self._attr_elapsed_time_last_updated = (
373 self.device.media_position_updated_at.timestamp()
374 )
375
376 def _get_playback_state(self) -> PlaybackState | None:
377 """Return current PlaybackState of the player."""
378 if self.device is None:
379 return None
380 if self.device.transport_state is None:
381 return PlaybackState.IDLE
382 if self.device.transport_state in (
383 TransportState.PLAYING,
384 TransportState.TRANSITIONING,
385 ):
386 return PlaybackState.PLAYING
387 if self.device.transport_state in (
388 TransportState.PAUSED_PLAYBACK,
389 TransportState.PAUSED_RECORDING,
390 ):
391 return PlaybackState.PAUSED
392 if self.device.transport_state == TransportState.VENDOR_DEFINED:
393 # Unable to map this state to anything reasonable, fallback to idle
394 return PlaybackState.IDLE
395
396 return PlaybackState.IDLE
397
398 async def get_config_entries(
399 self,
400 action: str | None = None,
401 values: dict[str, ConfigValueType] | None = None,
402 ) -> list[ConfigEntry]:
403 """Return all (provider/player specific) Config Entries for the given player (if any)."""
404 return [*PLAYER_CONFIG_ENTRIES]
405
406 # COMMANDS
407 @catch_request_errors
408 async def stop(self) -> None:
409 """Send STOP command to given player."""
410 assert self.device is not None # for type checking
411 await self.device.async_stop()
412
413 @catch_request_errors
414 async def play(self) -> None:
415 """Send PLAY command to given player."""
416 assert self.device is not None # for type checking
417 await self.device.async_play()
418
419 @catch_request_errors
420 async def play_media(self, media: PlayerMedia) -> None:
421 """Handle PLAY MEDIA on given player."""
422 assert self.device is not None # for type checking
423 # always clear queue (by sending stop) first
424 if self.device.can_stop:
425 await self.stop()
426 didl_metadata = create_didl_metadata(media)
427 title = media.title or media.uri
428 url = await self.provider.mass.streams.resolve_stream_url(self.player_id, media)
429 await self.device.async_set_transport_uri(url, title, didl_metadata)
430 # Play it
431 await self.device.async_wait_for_can_play(10)
432 # optimistically set this timestamp to help in case of a player
433 # that does not report the progress
434 self._attr_elapsed_time = 0
435 self._attr_elapsed_time_last_updated = time.time()
436 await self.device.async_play()
437 # force poll the device
438 for sleep in (1, 2):
439 await asyncio.sleep(sleep)
440 self.force_poll = True
441 await self.poll()
442
443 @catch_request_errors
444 async def enqueue_next_media(self, media: PlayerMedia) -> None:
445 """Handle enqueuing of the next queue item on the player."""
446 assert self.device is not None # for type checking
447 didl_metadata = create_didl_metadata(media)
448 title = media.title or media.uri
449 try:
450 await self.device.async_set_next_transport_uri(media.uri, title, didl_metadata)
451 except UpnpError:
452 self.logger.error(
453 "Enqueuing the next track failed for player %s - "
454 "the player probably doesn't support this. "
455 "Enable 'flow mode' for this player.",
456 self.display_name,
457 )
458
459 @catch_request_errors
460 async def pause(self) -> None:
461 """Send PAUSE command to given player."""
462 assert self.device is not None # for type checking
463 if self.device.can_pause:
464 await self.device.async_pause()
465 else:
466 await self.device.async_stop()
467
468 @catch_request_errors
469 async def volume_set(self, volume_level: int) -> None:
470 """Send VOLUME_SET command to given player."""
471 assert self.device is not None # for type checking
472 await self.device.async_set_volume_level(volume_level / 100)
473
474 @catch_request_errors
475 async def volume_mute(self, muted: bool) -> None:
476 """Send VOLUME MUTE command to given player."""
477 assert self.device is not None # for type checking
478 await self.device.async_mute_volume(muted)
479
480 async def poll(self) -> None:
481 """Poll player for state updates."""
482 # try to reconnect the device if the connection was lost
483 if not self.device:
484 if not self.force_poll:
485 return
486 try:
487 await self._device_connect()
488 except UpnpError as err:
489 raise PlayerUnavailableError from err
490
491 assert self.device is not None
492
493 try:
494 now = time.time()
495 do_ping = self.force_poll or (now - self.last_seen) > 60
496 with suppress(ValueError):
497 await self.device.async_update(do_ping=do_ping)
498 self.last_seen = now if do_ping else self.last_seen
499 except UpnpError as err:
500 self.logger.debug("Device unavailable: %r", err)
501 if TYPE_CHECKING:
502 assert isinstance(self.provider, DLNAPlayerProvider) # for type checking
503 await self.provider._device_disconnect(self)
504 raise PlayerUnavailableError from err
505 finally:
506 self.force_poll = False
507
508 @staticmethod
509 def _extract_mac_from_uuid(uuid_value: str) -> str | None:
510 """Try to extract MAC address from UUID.
511
512 Many UPnP devices embed the MAC address in the last 12 hex characters of the UUID.
513 E.g., uuid:4d691234-444c-164e-1234-001f33eaacf1 -> 00:1f:33:ea:ac:f1
514
515 :param uuid_value: The UUID string (without 'uuid:' prefix).
516 :return: MAC address string in XX:XX:XX:XX:XX:XX format, or None if not extractable.
517 """
518 # Remove dashes and get last 12 hex characters
519 hex_chars = uuid_value.replace("-", "")
520 if len(hex_chars) < 12:
521 return None
522
523 mac_hex = hex_chars[-12:]
524
525 # Validate it looks like a MAC (all hex characters)
526 try:
527 int(mac_hex, 16)
528 except ValueError:
529 return None
530
531 # Check if it could be a valid MAC (not all zeros or all ones)
532 if mac_hex in ("000000000000", "ffffffffffff", "FFFFFFFFFFFF"):
533 return None
534
535 # Format as XX:XX:XX:XX:XX:XX
536 return ":".join(mac_hex[i : i + 2].upper() for i in range(0, 12, 2))
537