/
/
/
1"""DLNA Player Provider."""
2
3import asyncio
4import logging
5from ipaddress import IPv4Address
6
7from async_upnp_client.aiohttp import AiohttpSessionRequester
8from async_upnp_client.client import UpnpRequester
9from async_upnp_client.client_factory import UpnpFactory
10from async_upnp_client.search import async_search
11from async_upnp_client.utils import CaseInsensitiveDict
12from music_assistant_models.player import DeviceInfo
13
14from music_assistant.constants import CONF_PLAYERS, VERBOSE_LOG_LEVEL
15from music_assistant.helpers.util import TaskManager
16from music_assistant.models.player_provider import PlayerProvider
17
18from .constants import CONF_NETWORK_SCAN
19from .helpers import DLNANotifyServer
20from .player import DLNAPlayer
21
22
class DLNAPlayerProvider(PlayerProvider):
    """DLNA Player provider.

    Runs periodic SSDP searches, wraps every discovered MediaRenderer device
    in a DLNAPlayer (keyed by the device UDN) and tears the connections down
    again when the provider is unloaded.
    """

    # Known players keyed by UDN. Deliberately annotation-only: the dict is
    # created per instance in handle_async_init so that a reloaded provider
    # never shares (or inherits) players through a class-level mutable default.
    dlnaplayers: dict[str, DLNAPlayer]
    _discovery_running: bool = False
    # Set by unload() so that no further discovery runs are started/scheduled.
    _discovery_stopped: bool = False
    # Handle of the pending "run discovery again" timer, if one is armed.
    _discovery_timer: asyncio.TimerHandle | None = None

    lock: asyncio.Lock
    requester: UpnpRequester
    upnp_factory: UpnpFactory
    notify_server: DLNANotifyServer

    async def handle_async_init(self) -> None:
        """Handle async initialization of the provider."""
        self.dlnaplayers = {}
        self._discovery_stopped = False
        self.lock = asyncio.Lock()
        # Only let the async_upnp_client logger emit debug output when verbose
        # logging is enabled; otherwise raise its threshold 10 above our own level.
        if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
            logging.getLogger("async_upnp_client").setLevel(logging.DEBUG)
        else:
            logging.getLogger("async_upnp_client").setLevel(self.logger.level + 10)
        self.requester = AiohttpSessionRequester(self.mass.http_session, with_sleep=True)
        self.upnp_factory = UpnpFactory(self.requester, non_strict=True)
        self.notify_server = DLNANotifyServer(self.requester, self.mass)

    async def unload(self, is_removed: bool = False) -> None:
        """
        Handle unload/close of the provider.

        Called when provider is deregistered (e.g. MA exiting or config reloading).

        Stops the periodic discovery, unregisters the NOTIFY route and
        disconnects every known player concurrently.
        """
        # Stop the discovery loop first: without this the pending timer would
        # keep re-running discovery on a provider that is already unloaded.
        self._discovery_stopped = True
        if self._discovery_timer is not None:
            self._discovery_timer.cancel()
            self._discovery_timer = None

        self.mass.streams.unregister_dynamic_route("/notify", "NOTIFY")

        async with TaskManager(self.mass) as tg:
            for dlna_player in self.dlnaplayers.values():
                tg.create_task(self._device_disconnect(dlna_player))

    async def discover_players(self, use_multicast: bool = False) -> None:
        """
        Discover DLNA players on the network.

        Runs one SSDP search and hands every MediaRenderer found to
        _device_discovered. Once the search has finished (or failed) a new run
        is scheduled 300 seconds later with `use_multicast` flipped, unless the
        provider has been unloaded in the meantime.

        :param use_multicast: When True and the network scan option is enabled,
            send the search to 255.255.255.255:1900 instead of using the
            library's default search target.
        """
        if self._discovery_running or self._discovery_stopped:
            return
        try:
            self._discovery_running = True
            self.logger.debug("DLNA discovery started...")
            allow_network_scan = self.config.get_value(CONF_NETWORK_SCAN)
            # UDNs already handled during this run (a device may answer multiple times)
            discovered_devices: set[str] = set()

            async def on_response(discovery_info: CaseInsensitiveDict) -> None:
                """Process discovered device from ssdp search."""
                ssdp_st: str | None = discovery_info.get("st", discovery_info.get("nt"))
                if not ssdp_st or "MediaRenderer" not in ssdp_st:
                    # we're only interested in MediaRenderer devices
                    return

                ssdp_usn: str = discovery_info.get("usn") or ""
                ssdp_udn: str | None = discovery_info.get("_udn")
                if not ssdp_udn and ssdp_usn.startswith("uuid:"):
                    # USN looks like "uuid:<id>::<type>"; the part before "::" is the UDN
                    ssdp_udn = ssdp_usn.split("::")[0]

                description_url: str | None = discovery_info.get("location")
                if not ssdp_udn or not description_url:
                    # malformed response: without an UDN and a description url
                    # there is nothing we can set up, so ignore it
                    return

                if ssdp_udn in discovered_devices:
                    # already processed this device
                    return
                discovered_devices.add(ssdp_udn)

                await self._device_discovered(ssdp_udn, description_url)

            # we alternate between a regular search and (if network scan is
            # enabled) a search sent to the broadcast address
            if allow_network_scan and use_multicast:
                await async_search(on_response, target=(str(IPv4Address("255.255.255.255")), 1900))
            else:
                await async_search(on_response)

        finally:
            self._discovery_running = False

            def reschedule() -> None:
                self._discovery_timer = None
                self.mass.create_task(self.discover_players(use_multicast=not use_multicast))

            # reschedule self once finished (but never after unload)
            if not self._discovery_stopped:
                if self._discovery_timer is not None:
                    # never keep more than one pending re-run around
                    self._discovery_timer.cancel()
                self._discovery_timer = self.mass.loop.call_later(300, reschedule)

    async def _device_disconnect(self, dlna_player: DLNAPlayer) -> None:
        """
        Destroy connections to the device now that it's not available.

        Also call when removing this entity from MA to clean up connections.
        """
        async with dlna_player.lock:
            if not dlna_player.device:
                self.logger.debug("Disconnecting from device that's not connected")
                return

            self.logger.debug("Disconnecting from %s", dlna_player.device.name)

            dlna_player.device.on_event = None
            # detach the device from the player before awaiting the unsubscribe
            old_device = dlna_player.device
            dlna_player.device = None
            dlna_player.set_available(False)
            await old_device.async_unsubscribe_services()

    async def _device_discovered(self, udn: str, description_url: str) -> None:
        """
        Handle discovered DLNA player.

        :param udn: Unique device name of the discovered device (used as player id).
        :param description_url: Location of the device description as reported by SSDP.
        """
        async with self.lock:
            if dlna_player := self.dlnaplayers.get(udn):
                # existing player
                if dlna_player.description_url == description_url and dlna_player.available:
                    # nothing to do, device is already connected
                    return
                # update description url to newly discovered one
                dlna_player.description_url = description_url
            else:
                # new player detected, setup our DLNAPlayer wrapper
                conf_key = f"{CONF_PLAYERS}/{udn}/enabled"
                enabled = self.mass.config.get(conf_key, True)
                # ignore disabled players
                if not enabled:
                    self.logger.debug("Ignoring disabled player: %s", udn)
                    return

                dlna_player = DLNAPlayer(
                    provider=self,
                    player_id=udn,
                    description_url=description_url,
                )
                # will be updated later when device connects
                dlna_player._attr_device_info = DeviceInfo(
                    model="unknown",
                    manufacturer="unknown",
                )
                self.dlnaplayers[udn] = dlna_player

            # Setup will return False if the device should be ignored (e.g., passive speaker)
            if not await dlna_player.setup():
                # Drop the player from our registry; this applies both to a
                # player that was just added and to one that was already known.
                self.dlnaplayers.pop(udn, None)