music-assistant-server

3.7 KBPY
vban.py
3.7 KB113 lines • python
1"""VBAN subclasses to workaround issues in aiovban 0.6.3."""
2
3from __future__ import annotations
4
5import asyncio
6import logging
7import os
8from dataclasses import dataclass
9from typing import TYPE_CHECKING, Any
10
11from aiovban.asyncio import AsyncVBANClient
12from aiovban.packet import VBANPacket
13from aiovban.packet.headers import VBANHeaderException
14
15if TYPE_CHECKING:
16    from . import VBANReceiverProvider
17
18logger = logging.getLogger(__name__)
19_aiovban_log_level = os.environ.get("AIOVBAN_LOG_LEVEL", "info").upper()
20logging.getLogger("aiovban.asyncio.aiovban.asyncio.util").setLevel(_aiovban_log_level)
21
22
23class VBANListenerProtocolMod(asyncio.DatagramProtocol):
24    """VBANListenerProcotol workaround."""
25
26    def __init__(self, client: AsyncVBANClientMod) -> None:
27        """Initialize."""
28        # WORKAROUND: each instance gets it's own Future.
29        self.done: asyncio.Future[Any] = asyncio.get_event_loop().create_future()
30        self._background_tasks: set[asyncio.Task[Any]] = set()
31        self._client = client
32
33    def error_received(self, exc: Exception) -> None:
34        """Handle error."""
35        self.done.set_exception(exc)
36
37    def connection_lost(self, exc: Exception | None) -> None:
38        """Handle lost connection."""
39        if self.done.done():
40            return
41        # WORKAROUND: handle exc properly.
42        if exc:
43            self.done.set_exception(exc)
44        else:
45            self.done.set_result(None)
46
47    def connection_made(self, transport) -> None:  # type: ignore[no-untyped-def]
48        """Handle connection made."""
49        logger.debug(f"Connection made to {transport}")
50
51    def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
52        """Handle received datagram."""
53        sender_ip, sender_port = addr
54
55        if self._client.quick_reject(sender_ip) or not self._client.active_player:
56            return
57
58        try:
59            packet = VBANPacket.unpack(data)
60        except VBANHeaderException as exc:
61            logger.error(f"Error unpacking packet: {exc}")
62            return
63        except ValueError as exc:
64            # Handle odd packet sent when Voicemeeter start/stops stream
65            error_msg = "6000 is not a valid VBANSampleRate"
66            if str(exc) == error_msg:
67                return
68            raise
69
70        task = asyncio.create_task(self._client.process_packet(sender_ip, sender_port, packet))
71        self._background_tasks.add(task)
72        task.add_done_callback(self._background_tasks.discard)
73
74
75@dataclass
76class AsyncVBANClientMod(AsyncVBANClient):  # type: ignore[misc]
77    """AsyncVBANClient workaround."""
78
79    _controller: VBANReceiverProvider | None = None
80
81    @property
82    def active_player(self) -> bool:
83        """Report the active player status."""
84        return False if not self._controller else self._controller.active_player
85
86    async def listen(
87        self,
88        address: str = "0.0.0.0",
89        port: int = 6980,
90        loop: asyncio.AbstractEventLoop | None = None,
91        controller: VBANReceiverProvider | None = None,
92    ) -> None:
93        """Create UDP listener."""
94        loop = loop or asyncio.get_running_loop()
95        self._controller = controller
96
97        # Create a socket and set the options
98        self._transport, proto = await loop.create_datagram_endpoint(
99            lambda: VBANListenerProtocolMod(self),
100            local_addr=(address, port),
101            allow_broadcast=not self.ignore_audio_streams,
102        )
103
104        # WORKAROUND: await, not return.
105        await proto.done
106
107    def close(self) -> None:
108        """Close down the connection."""
109        self._controller = None
110        if self._transport:
111            self._transport.close()
112            self._transport = None  # type: ignore[assignment]
113