/
/
/
1"""DLNA Player."""
2
3import asyncio
4import functools
5import time
6from collections.abc import Awaitable, Callable, Coroutine, Sequence
7from contextlib import suppress
8from typing import TYPE_CHECKING, Any, Concatenate
9
10from async_upnp_client.client import UpnpService, UpnpStateVariable
11from async_upnp_client.exceptions import UpnpError, UpnpResponseError
12from async_upnp_client.profiles.dlna import DmrDevice, TransportState
13from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
14from music_assistant_models.enums import PlaybackState, PlayerFeature
15from music_assistant_models.errors import PlayerUnavailableError
16from music_assistant_models.player import DeviceInfo, PlayerMedia
17
18from music_assistant.constants import VERBOSE_LOG_LEVEL
19from music_assistant.helpers.upnp import create_didl_metadata
20from music_assistant.models.player import Player
21
22from .constants import PLAYER_CONFIG_ENTRIES
23
24if TYPE_CHECKING:
25 from .provider import DLNAPlayerProvider
26
27
28def catch_request_errors[DLNAPlayerT: "DLNAPlayer", **P, R](
29 func: Callable[Concatenate[DLNAPlayerT, P], Awaitable[R]],
30) -> Callable[Concatenate[DLNAPlayerT, P], Coroutine[Any, Any, R | None]]:
31 """Catch UpnpError errors."""
32
33 @functools.wraps(func)
34 async def wrapper(self: DLNAPlayerT, *args: P.args, **kwargs: P.kwargs) -> R | None:
35 """Catch UpnpError errors and check availability before and after request."""
36 self.last_command = time.time()
37 if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
38 self.logger.debug(
39 "Handling command %s for player %s",
40 func.__name__,
41 self.display_name,
42 )
43 if not self.available:
44 self.logger.warning("Device disappeared when trying to call %s", func.__name__)
45 return None
46 try:
47 return await func(self, *args, **kwargs)
48 except UpnpError as err:
49 self.force_poll = True
50 if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
51 self.logger.exception("Error during call %s: %r", func.__name__, err)
52 else:
53 self.logger.error("Error during call %s: %r", func.__name__, str(err))
54 return None
55
56 return wrapper
57
58
class DLNAPlayer(Player):
    """DLNA Player.

    Player implementation backed by a ``DmrDevice`` from async_upnp_client.
    State is refreshed both from device events (``_handle_event``) and by
    polling (``poll``).
    """

    def __init__(
        self,
        provider: "DLNAPlayerProvider",
        player_id: str,
        description_url: str,
        device: DmrDevice | None = None,
    ) -> None:
        """Init Player.

        The player_id is the udn.

        ``description_url`` is the last known location (description.xml) of the
        device; ``device`` may be passed when a connected device already exists.
        """
        super().__init__(provider, player_id)

        self.device = device
        self.description_url = description_url  # last known location (description.xml) url

        self.lock = asyncio.Lock()  # Held when connecting or disconnecting the device

        # When set, the next poll() pings the device and, if no device is
        # connected, attempts a reconnect. Reset at the end of every poll.
        self.force_poll = False

        # Track BOOTID in SSDP advertisements for device changes
        self.bootid: int | None = None
        self.last_seen = time.time()
        self.last_command = time.time()

    def set_available(self, available: bool) -> None:
        """Set the availability of the player."""
        self._attr_available = available

    async def _device_connect(self) -> None:
        """Connect DLNA/DMR Device.

        Creates the UPnP device from ``description_url``, wraps it in a
        ``DmrDevice`` and tries to subscribe to its event notifications.
        Does nothing when a device is already connected.

        Raises ``UpnpError`` when subscribing fails with anything other than a
        ``UpnpResponseError``; ``self.device`` is reset to ``None`` in that case.
        Errors raised while creating the device propagate unchanged.
        """
        self.logger.debug("Connecting to device at %s", self.description_url)

        async with self.lock:
            if self.device:
                self.logger.debug("Trying to connect when device already connected")
                return

            # Connect to the base UPNP device
            if TYPE_CHECKING:
                assert isinstance(self.provider, DLNAPlayerProvider)  # for type checking
            upnp_device = await self.provider.upnp_factory.async_create_device(self.description_url)

            # Create profile wrapper
            self.device = DmrDevice(upnp_device, self.provider.notify_server.event_handler)

            # Subscribe to event notifications
            try:
                self.device.on_event = self._handle_event
                await self.device.async_subscribe_services(auto_resubscribe=True)
            except UpnpResponseError as err:
                # Device rejected subscription request. This is OK, variables
                # will be polled instead.
                self.logger.debug("Device rejected subscription: %r", err)
            except UpnpError as err:
                # Don't leave the device half-constructed
                self.device.on_event = None
                self.device = None
                self.logger.debug("Error while subscribing during device connect: %r", err)
                raise

            # The device is connected here, with or without an event
            # subscription, so the device info is populated in both cases.
            self._attr_device_info = DeviceInfo(
                model=self.device.model_name,
                manufacturer=self.device.manufacturer,
            )

    def _handle_event(
        self,
        service: UpnpService,
        state_variables: Sequence[UpnpStateVariable[Any]],
    ) -> None:
        """Handle state variable(s) changed event from DLNA device.

        An empty ``state_variables`` only flags the player for a forced poll.
        Any other event refreshes ``last_seen`` and schedules ``_update_player``.
        """
        if not state_variables:
            # Indicates a failure to resubscribe, check if device is still available
            self.force_poll = True
            return
        if service.service_id == "urn:upnp-org:serviceId:AVTransport":
            for state_variable in state_variables:
                # Force a state refresh when player begins or pauses playback
                # to update the position info.
                if state_variable.name == "TransportState" and state_variable.value in (
                    TransportState.PLAYING,
                    TransportState.PAUSED_PLAYBACK,
                ):
                    self.force_poll = True
                    self.mass.create_task(self.poll())
                    self.logger.debug(
                        "Received new state from event for Player %s: %s",
                        self.display_name,
                        state_variable.value,
                    )
        self.last_seen = time.time()
        self.mass.create_task(self._update_player())

    async def _update_player(self) -> None:
        """Update DLNA Player.

        Refreshes the dynamic attributes, requests a forced poll when the
        media uri or the player state changed, and publishes the new state.
        """
        prev_url = self._attr_current_media.uri if self._attr_current_media is not None else ""
        prev_state = self.state
        await self.set_dynamic_attributes()
        current_url = self._attr_current_media.uri if self._attr_current_media is not None else ""
        current_state = self.state

        if (prev_url != current_url) or (prev_state != current_state):
            # fetch track details on state or url change
            self.force_poll = True

        try:
            self.update_state()
        except (KeyError, TypeError):
            # at start the update might come faster than the config is initialized
            await asyncio.sleep(2)
            self.update_state()

    def _set_player_features(self) -> None:
        """Set Player Features based on config values and capabilities."""
        assert self.device is not None  # for type checking
        supported_features: set[PlayerFeature] = {
            # there is no way to check if a dlna player support enqueuing
            # so we simply assume it does and if it doesn't
            # you'll find out at playback time and we log a warning
            PlayerFeature.ENQUEUE,
            PlayerFeature.GAPLESS_PLAYBACK,
        }
        if self.device.has_volume_level:
            supported_features.add(PlayerFeature.VOLUME_SET)
        if self.device.has_volume_mute:
            supported_features.add(PlayerFeature.VOLUME_MUTE)
        if self.device.has_pause:
            supported_features.add(PlayerFeature.PAUSE)
        self._attr_supported_features = supported_features

    async def setup(self) -> None:
        """Set up player in MA.

        Connects the device, sets the static attributes and registers the
        player with the player controller.
        """
        await self._device_connect()
        self.set_static_attributes()
        await self.mass.players.register_or_update(self)

    def set_static_attributes(self) -> None:
        """Set static attributes."""
        self._attr_needs_poll = True
        self._attr_poll_interval = 30
        self._set_player_features()

    async def set_dynamic_attributes(self) -> None:
        """Set dynamic attributes.

        Copies name, volume, mute, playback state, current media uri, active
        source and elapsed time from the device onto the player attributes.
        Only the availability flag is updated when the device is missing or
        reports itself as unavailable.
        """
        available = self.device is not None and self.device.profile_device.available
        self._attr_available = available
        if not available:
            return
        assert self.device is not None  # for type checking
        self._attr_name = self.device.name
        # round() instead of int(): the fraction times 100 is not exact in
        # binary floating point (0.29 * 100 == 28.999999999999996), so
        # truncating would report 28 for a volume of 29.
        self._attr_volume_level = round((self.device.volume_level or 0) * 100)
        self._attr_volume_muted = self.device.is_volume_muted or False
        _playback_state = self._get_playback_state()
        assert _playback_state is not None  # for type checking
        self._attr_playback_state = _playback_state

        _device_uri = self.device.current_track_uri or ""
        self.set_current_media(uri=_device_uri, clear_all=True)

        # Let player controller determine active source, only override for known external sources
        if _device_uri and _device_uri.startswith(self.mass.streams.base_url):
            # MA stream - let controller determine source
            self._attr_active_source = None
        elif "spotify" in _device_uri:
            # Spotify or Spotify Connect
            self._attr_active_source = "spotify"
        elif _device_uri:
            # External HTTP source
            self._attr_active_source = "http"
        else:
            # No URI - idle or unknown
            self._attr_active_source = None
        # TODO: extend this list with other possible sources
        if self.device.media_position:
            # only update elapsed_time if the device actually reports it
            self._attr_elapsed_time = float(self.device.media_position)
            if self.device.media_position_updated_at is not None:
                self._attr_elapsed_time_last_updated = (
                    self.device.media_position_updated_at.timestamp()
                )

    def _get_playback_state(self) -> PlaybackState | None:
        """Return current PlaybackState of the player.

        Returns ``None`` when no device is connected.
        """
        if self.device is None:
            return None
        transport_state = self.device.transport_state
        if transport_state in (
            TransportState.PLAYING,
            TransportState.TRANSITIONING,
        ):
            return PlaybackState.PLAYING
        if transport_state in (
            TransportState.PAUSED_PLAYBACK,
            TransportState.PAUSED_RECORDING,
        ):
            return PlaybackState.PAUSED
        # No transport state, VENDOR_DEFINED or any other state: unable to map
        # this to anything reasonable, fallback to idle
        return PlaybackState.IDLE

    async def get_config_entries(
        self,
        action: str | None = None,
        values: dict[str, ConfigValueType] | None = None,
    ) -> list[ConfigEntry]:
        """Return all (provider/player specific) Config Entries for the given player (if any).

        ``action`` and ``values`` are accepted but not used by this player.
        """
        return [*PLAYER_CONFIG_ENTRIES]

    # COMMANDS
    @catch_request_errors
    async def stop(self) -> None:
        """Send STOP command to given player."""
        assert self.device is not None  # for type checking
        await self.device.async_stop()

    @catch_request_errors
    async def play(self) -> None:
        """Send PLAY command to given player."""
        assert self.device is not None  # for type checking
        await self.device.async_play()

    @catch_request_errors
    async def play_media(self, media: PlayerMedia) -> None:
        """Handle PLAY MEDIA on given player.

        Stops the device first (when it can stop), sets the new transport uri,
        starts playback and then polls the device twice to pick up the new state.
        """
        assert self.device is not None  # for type checking
        # always clear queue (by sending stop) first
        if self.device.can_stop:
            await self.stop()
        didl_metadata = create_didl_metadata(media)
        title = media.title or media.uri
        await self.device.async_set_transport_uri(media.uri, title, didl_metadata)
        # Play it
        await self.device.async_wait_for_can_play(10)
        # optimistically set this timestamp to help in case of a player
        # that does not report the progress
        self._attr_elapsed_time = 0
        self._attr_elapsed_time_last_updated = time.time()
        await self.device.async_play()
        # force poll the device
        for delay in (1, 2):
            await asyncio.sleep(delay)
            self.force_poll = True
            await self.poll()

    @catch_request_errors
    async def enqueue_next_media(self, media: PlayerMedia) -> None:
        """Handle enqueuing of the next queue item on the player.

        A ``UpnpError`` from the device is logged here and not re-raised.
        """
        assert self.device is not None  # for type checking
        didl_metadata = create_didl_metadata(media)
        title = media.title or media.uri
        try:
            await self.device.async_set_next_transport_uri(media.uri, title, didl_metadata)
        except UpnpError:
            self.logger.error(
                "Enqueuing the next track failed for player %s - "
                "the player probably doesn't support this. "
                "Enable 'flow mode' for this player.",
                self.display_name,
            )

    @catch_request_errors
    async def pause(self) -> None:
        """Send PAUSE command to given player.

        Falls back to STOP when the device cannot pause.
        """
        assert self.device is not None  # for type checking
        if self.device.can_pause:
            await self.device.async_pause()
        else:
            await self.device.async_stop()

    @catch_request_errors
    async def volume_set(self, volume_level: int) -> None:
        """Send VOLUME_SET command to given player.

        ``volume_level`` is divided by 100 before it is passed to the device.
        """
        assert self.device is not None  # for type checking
        await self.device.async_set_volume_level(volume_level / 100)

    @catch_request_errors
    async def volume_mute(self, muted: bool) -> None:
        """Send VOLUME MUTE command to given player."""
        assert self.device is not None  # for type checking
        await self.device.async_mute_volume(muted)

    async def poll(self) -> None:
        """Poll player for state updates.

        Without a connected device this only does something when ``force_poll``
        is set, in which case a reconnect is attempted first. The device is
        pinged when ``force_poll`` is set or it was last seen more than
        60 seconds ago.

        Raises ``PlayerUnavailableError`` when reconnecting or updating the
        device fails with a ``UpnpError``.
        """
        # try to reconnect the device if the connection was lost
        if not self.device:
            if not self.force_poll:
                return
            try:
                await self._device_connect()
            except UpnpError as err:
                raise PlayerUnavailableError from err

        assert self.device is not None  # for type checking

        try:
            now = time.time()
            do_ping = self.force_poll or (now - self.last_seen) > 60
            with suppress(ValueError):
                await self.device.async_update(do_ping=do_ping)
            if do_ping:
                # the device was actually contacted, refresh the timestamp
                self.last_seen = now
        except UpnpError as err:
            self.logger.debug("Device unavailable: %r", err)
            if TYPE_CHECKING:
                assert isinstance(self.provider, DLNAPlayerProvider)  # for type checking
            await self.provider._device_disconnect(self)
            raise PlayerUnavailableError from err
        finally:
            self.force_poll = False