/
/
/
1"""DLNA Player."""
2
3import asyncio
4import functools
5import time
6from collections.abc import Awaitable, Callable, Coroutine, Sequence
7from contextlib import suppress
8from typing import TYPE_CHECKING, Any, Concatenate
9
10from async_upnp_client.client import UpnpService, UpnpStateVariable
11from async_upnp_client.exceptions import UpnpError, UpnpResponseError
12from async_upnp_client.profiles.dlna import DmrDevice, TransportState
13from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
14from music_assistant_models.enums import PlaybackState, PlayerFeature
15from music_assistant_models.errors import PlayerUnavailableError
16from music_assistant_models.player import DeviceInfo, PlayerMedia
17
18from music_assistant.constants import VERBOSE_LOG_LEVEL
19from music_assistant.helpers.upnp import create_didl_metadata
20from music_assistant.models.player import Player
21
22from .constants import PLAYER_CONFIG_ENTRIES
23
24if TYPE_CHECKING:
25 from .provider import DLNAPlayerProvider
26
27
28def catch_request_errors[DLNAPlayerT: "DLNAPlayer", **P, R](
29 func: Callable[Concatenate[DLNAPlayerT, P], Awaitable[R]],
30) -> Callable[Concatenate[DLNAPlayerT, P], Coroutine[Any, Any, R | None]]:
31 """Catch UpnpError errors."""
32
33 @functools.wraps(func)
34 async def wrapper(self: DLNAPlayerT, *args: P.args, **kwargs: P.kwargs) -> R | None:
35 """Catch UpnpError errors and check availability before and after request."""
36 self.last_command = time.time()
37 if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
38 self.logger.debug(
39 "Handling command %s for player %s",
40 func.__name__,
41 self.display_name,
42 )
43 if not self.available:
44 self.logger.warning("Device disappeared when trying to call %s", func.__name__)
45 return None
46 try:
47 return await func(self, *args, **kwargs)
48 except UpnpError as err:
49 self.force_poll = True
50 if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
51 self.logger.exception("Error during call %s: %r", func.__name__, err)
52 else:
53 self.logger.error("Error during call %s: %r", func.__name__, str(err))
54 return None
55
56 return wrapper
57
58
59class DLNAPlayer(Player):
60 """DLNA Player."""
61
62 def __init__(
63 self,
64 provider: "DLNAPlayerProvider",
65 player_id: str,
66 description_url: str,
67 device: DmrDevice | None = None,
68 ) -> None:
69 """Init Player.
70
71 The player_id is the udn.
72 """
73 super().__init__(provider, player_id)
74
75 self.device = device
76 self.description_url = description_url # last known location (description.xml) url
77
78 self.lock = asyncio.Lock() # Held when connecting or disconnecting the device
79
80 self.force_poll = False # used, if connection is lost
81
82 # ssdp_connect_failed: bool = False
83 #
84 # Track BOOTID in SSDP advertisements for device changes
85 self.bootid: int | None = None
86 self.last_seen = time.time()
87 self.last_command = time.time()
88
89 async def _device_connect(self) -> None:
90 """Connect DLNA/DMR Device."""
91 self.logger.debug("Connecting to device at %s", self.description_url)
92
93 async with self.lock:
94 if self.device:
95 self.logger.debug("Trying to connect when device already connected")
96 return
97
98 # Connect to the base UPNP device
99 if TYPE_CHECKING:
100 assert isinstance(self.provider, DLNAPlayerProvider) # for type checking
101 upnp_device = await self.provider.upnp_factory.async_create_device(self.description_url)
102
103 # Create profile wrapper
104 self.device = DmrDevice(upnp_device, self.provider.notify_server.event_handler)
105
106 # Subscribe to event notifications
107 try:
108 self.device.on_event = self._handle_event
109 await self.device.async_subscribe_services(auto_resubscribe=True)
110 except UpnpResponseError as err:
111 # Device rejected subscription request. This is OK, variables
112 # will be polled instead.
113 self.logger.debug("Device rejected subscription: %r", err)
114 except UpnpError as err:
115 # Don't leave the device half-constructed
116 self.device.on_event = None
117 self.device = None
118 self.logger.debug("Error while subscribing during device connect: %r", err)
119 raise
120 else:
121 # connect was successful, update device info
122 self._attr_device_info = DeviceInfo(
123 model=self.device.model_name,
124 manufacturer=self.device.manufacturer,
125 )
126
127 def _handle_event(
128 self,
129 service: UpnpService,
130 state_variables: Sequence[UpnpStateVariable[Any]],
131 ) -> None:
132 """Handle state variable(s) changed event from DLNA device."""
133 if not state_variables:
134 # Indicates a failure to resubscribe, check if device is still available
135 self.force_poll = True
136 return
137 if service.service_id == "urn:upnp-org:serviceId:AVTransport":
138 for state_variable in state_variables:
139 # Force a state refresh when player begins or pauses playback
140 # to update the position info.
141 if state_variable.name == "TransportState" and state_variable.value in (
142 TransportState.PLAYING,
143 TransportState.PAUSED_PLAYBACK,
144 ):
145 self.force_poll = True
146 self.mass.create_task(self.poll())
147 self.logger.debug(
148 "Received new state from event for Player %s: %s",
149 self.display_name,
150 state_variable.value,
151 )
152 self.last_seen = time.time()
153 self.mass.create_task(self._update_player())
154
155 async def _update_player(self) -> None:
156 """Update DLNA Player."""
157 prev_url = self._attr_current_media.uri if self._attr_current_media is not None else ""
158 prev_state = self.state
159 await self.set_dynamic_attributes()
160 current_url = self._attr_current_media.uri if self._attr_current_media is not None else ""
161 current_state = self.state
162
163 if (prev_url != current_url) or (prev_state != current_state):
164 # fetch track details on state or url change
165 self.force_poll = True
166
167 try:
168 self.update_state()
169 except (KeyError, TypeError):
170 # at start the update might come faster than the config is initialized
171 await asyncio.sleep(2)
172 self.update_state()
173
174 def _set_player_features(self) -> None:
175 """Set Player Features based on config values and capabilities."""
176 assert self.device is not None # for type checking
177 supported_features: set[PlayerFeature] = {
178 # there is no way to check if a dlna player support enqueuing
179 # so we simply assume it does and if it doesn't
180 # you'll find out at playback time and we log a warning
181 PlayerFeature.ENQUEUE,
182 PlayerFeature.GAPLESS_PLAYBACK,
183 }
184 if self.device.has_volume_level:
185 supported_features.add(PlayerFeature.VOLUME_SET)
186 if self.device.has_volume_mute:
187 supported_features.add(PlayerFeature.VOLUME_MUTE)
188 if self.device.has_pause:
189 supported_features.add(PlayerFeature.PAUSE)
190 self._attr_supported_features = supported_features
191
192 async def setup(self) -> None:
193 """Set up player in MA."""
194 await self._device_connect()
195 self.set_static_attributes()
196 await self.mass.players.register_or_update(self)
197
198 def set_static_attributes(self) -> None:
199 """Set static attributes."""
200 self._attr_needs_poll = True
201 self._attr_poll_interval = 30
202 self._set_player_features()
203
204 async def set_dynamic_attributes(self) -> None:
205 """Set dynamic attributes."""
206 available = self.device is not None and self.device.profile_device.available
207 self._attr_available = available
208 if not available:
209 return
210 assert self.device is not None # for type checking
211 self._attr_name = self.device.name
212 self._attr_volume_level = int((self.device.volume_level or 0) * 100)
213 self._attr_volume_muted = self.device.is_volume_muted or False
214 _playback_state = self._get_playback_state()
215 assert _playback_state is not None # for type checking
216 self._attr_playback_state = _playback_state
217
218 _device_uri = self.device.current_track_uri or ""
219 self.set_current_media(uri=_device_uri, clear_all=True)
220
221 # Let player controller determine active source, only override for known external sources
222 if _device_uri and _device_uri.startswith(self.mass.streams.base_url):
223 # MA stream - let controller determine source
224 self._attr_active_source = None
225 elif "spotify" in _device_uri:
226 # Spotify or Spotify Connect
227 self._attr_active_source = "spotify"
228 elif _device_uri:
229 # External HTTP source
230 self._attr_active_source = "http"
231 else:
232 # No URI - idle or unknown
233 self._attr_active_source = None
234 # TODO: extend this list with other possible sources
235 if self.device.media_position:
236 # only update elapsed_time if the device actually reports it
237 self._attr_elapsed_time = float(self.device.media_position)
238 if self.device.media_position_updated_at is not None:
239 self._attr_elapsed_time_last_updated = (
240 self.device.media_position_updated_at.timestamp()
241 )
242
243 def _get_playback_state(self) -> PlaybackState | None:
244 """Return current PlaybackState of the player."""
245 if self.device is None:
246 return None
247 if self.device.transport_state is None:
248 return PlaybackState.IDLE
249 if self.device.transport_state in (
250 TransportState.PLAYING,
251 TransportState.TRANSITIONING,
252 ):
253 return PlaybackState.PLAYING
254 if self.device.transport_state in (
255 TransportState.PAUSED_PLAYBACK,
256 TransportState.PAUSED_RECORDING,
257 ):
258 return PlaybackState.PAUSED
259 if self.device.transport_state == TransportState.VENDOR_DEFINED:
260 # Unable to map this state to anything reasonable, fallback to idle
261 return PlaybackState.IDLE
262
263 return PlaybackState.IDLE
264
265 async def get_config_entries(
266 self,
267 action: str | None = None,
268 values: dict[str, ConfigValueType] | None = None,
269 ) -> list[ConfigEntry]:
270 """Return all (provider/player specific) Config Entries for the given player (if any)."""
271 return [*PLAYER_CONFIG_ENTRIES]
272
273 # COMMANDS
274 @catch_request_errors
275 async def stop(self) -> None:
276 """Send STOP command to given player."""
277 assert self.device is not None # for type checking
278 await self.device.async_stop()
279
280 @catch_request_errors
281 async def play(self) -> None:
282 """Send PLAY command to given player."""
283 assert self.device is not None # for type checking
284 await self.device.async_play()
285
286 @catch_request_errors
287 async def play_media(self, media: PlayerMedia) -> None:
288 """Handle PLAY MEDIA on given player."""
289 assert self.device is not None # for type checking
290 # always clear queue (by sending stop) first
291 if self.device.can_stop:
292 await self.stop()
293 didl_metadata = create_didl_metadata(media)
294 title = media.title or media.uri
295 await self.device.async_set_transport_uri(media.uri, title, didl_metadata)
296 # Play it
297 await self.device.async_wait_for_can_play(10)
298 # optimistically set this timestamp to help in case of a player
299 # that does not report the progress
300 self._attr_elapsed_time = 0
301 self._attr_elapsed_time_last_updated = time.time()
302 await self.device.async_play()
303 # force poll the device
304 for sleep in (1, 2):
305 await asyncio.sleep(sleep)
306 self.force_poll = True
307 await self.poll()
308
309 @catch_request_errors
310 async def enqueue_next_media(self, media: PlayerMedia) -> None:
311 """Handle enqueuing of the next queue item on the player."""
312 assert self.device is not None # for type checking
313 didl_metadata = create_didl_metadata(media)
314 title = media.title or media.uri
315 try:
316 await self.device.async_set_next_transport_uri(media.uri, title, didl_metadata)
317 except UpnpError:
318 self.logger.error(
319 "Enqueuing the next track failed for player %s - "
320 "the player probably doesn't support this. "
321 "Enable 'flow mode' for this player.",
322 self.display_name,
323 )
324
325 @catch_request_errors
326 async def pause(self) -> None:
327 """Send PAUSE command to given player."""
328 assert self.device is not None # for type checking
329 if self.device.can_pause:
330 await self.device.async_pause()
331 else:
332 await self.device.async_stop()
333
334 @catch_request_errors
335 async def volume_set(self, volume_level: int) -> None:
336 """Send VOLUME_SET command to given player."""
337 assert self.device is not None # for type checking
338 await self.device.async_set_volume_level(volume_level / 100)
339
340 @catch_request_errors
341 async def volume_mute(self, muted: bool) -> None:
342 """Send VOLUME MUTE command to given player."""
343 assert self.device is not None # for type checking
344 await self.device.async_mute_volume(muted)
345
346 async def poll(self) -> None:
347 """Poll player for state updates."""
348 # try to reconnect the device if the connection was lost
349 if not self.device:
350 if not self.force_poll:
351 return
352 try:
353 await self._device_connect()
354 except UpnpError as err:
355 raise PlayerUnavailableError from err
356
357 assert self.device is not None
358
359 try:
360 now = time.time()
361 do_ping = self.force_poll or (now - self.last_seen) > 60
362 with suppress(ValueError):
363 await self.device.async_update(do_ping=do_ping)
364 self.last_seen = now if do_ping else self.last_seen
365 except UpnpError as err:
366 self.logger.debug("Device unavailable: %r", err)
367 if TYPE_CHECKING:
368 assert isinstance(self.provider, DLNAPlayerProvider) # for type checking
369 await self.provider._device_disconnect(self)
370 raise PlayerUnavailableError from err
371 finally:
372 self.force_poll = False
373