/
/
/
1"""Various helpers and utils for the DLNA Player Provider."""
2
3from __future__ import annotations
4
5from typing import TYPE_CHECKING
6
7from aiohttp.web import Request, Response
8from async_upnp_client.const import HttpRequest
9from async_upnp_client.event_handler import UpnpEventHandler, UpnpNotifyServer
10
11if TYPE_CHECKING:
12 from async_upnp_client.client import UpnpRequester
13
14 from music_assistant import MusicAssistant
15
16
17class DLNANotifyServer(UpnpNotifyServer): # type: ignore[misc,unused-ignore]
18 """Notify server for async_upnp_client which uses the MA webserver."""
19
20 def __init__(
21 self,
22 requester: UpnpRequester,
23 mass: MusicAssistant,
24 ) -> None:
25 """Initialize."""
26 self.mass = mass
27 self.event_handler = UpnpEventHandler(self, requester)
28 self.mass.streams.register_dynamic_route("/notify", self._handle_request, method="NOTIFY")
29
30 async def _handle_request(self, request: Request) -> Response:
31 """Handle incoming requests."""
32 if request.method != "NOTIFY":
33 return Response(status=405)
34
35 # transform aiohttp request to async_upnp_client request
36 http_request = HttpRequest(
37 method=request.method,
38 url=str(request.url),
39 headers=request.headers,
40 body=await request.text(),
41 )
42
43 status = await self.event_handler.handle_notify(http_request)
44
45 return Response(status=status)
46
47 @property
48 def callback_url(self) -> str:
49 """Return callback URL on which we are callable."""
50 return f"{self.mass.streams.base_url}/notify"
51