music-assistant-server

12.6 KBPY
__init__.py
12.6 KB313 lines • python
1"""VBAN protocol receiver plugin for Music Assistant."""
2
3from __future__ import annotations
4
5import asyncio
6import re
7from collections.abc import AsyncGenerator
8from contextlib import suppress
9from typing import TYPE_CHECKING, cast
10
11from aiovban.asyncio.util import BackPressureStrategy
12from aiovban.enums import VBANSampleRate
13from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption
14from music_assistant_models.enums import ConfigEntryType, ContentType, ProviderFeature, StreamType
15from music_assistant_models.errors import SetupFailedError
16from music_assistant_models.media_items import AudioFormat
17from music_assistant_models.streamdetails import StreamMetadata
18
19from music_assistant.constants import CONF_BIND_IP, CONF_BIND_PORT, CONF_ENTRY_WARN_PREVIEW
20from music_assistant.helpers.util import get_ip_addresses
21from music_assistant.models.plugin import PluginProvider, PluginSource
22
23from .vban import AsyncVBANClientMod
24
25if TYPE_CHECKING:
26    from aiovban.asyncio.device import VBANDevice
27    from aiovban.asyncio.streams import VBANIncomingStream
28    from music_assistant_models.config_entries import ConfigValueType, ProviderConfig
29    from music_assistant_models.provider import ProviderManifest
30
31    from music_assistant.mass import MusicAssistant
32    from music_assistant.models import ProviderInstanceType
33
# Default values offered by the config entries below.
DEFAULT_UDP_PORT = 6980
DEFAULT_PCM_AUDIO_FORMAT = "S16LE"
DEFAULT_PCM_SAMPLE_RATE = 44100
DEFAULT_AUDIO_CHANNELS = 2

# Config entry keys that are specific to this plugin.
CONF_VBAN_STREAM_NAME = "vban_stream_name"
CONF_SENDER_HOST = "sender_host"
CONF_PCM_AUDIO_FORMAT = "audio_format"
CONF_PCM_SAMPLE_RATE = "sample_rate"
CONF_AUDIO_CHANNELS = "audio_channels"
CONF_VBAN_QUEUE_STRATEGY = "vban_queue_strategy"
CONF_VBAN_QUEUE_SIZE = "vban_queue_size"

# UI label -> back-pressure strategy for the receive queue.
# The first label doubles as the default value of the queue strategy entry.
VBAN_QUEUE_STRATEGIES = {
    "Clear entire queue": BackPressureStrategy.DROP,
    "Clear the oldest half of the queue": BackPressureStrategy.DRAIN_OLDEST,
    "Remove single oldest queue entry": BackPressureStrategy.POP,
}

# Feature set handed to the PluginProvider base class on construction.
SUPPORTED_FEATURES = {ProviderFeature.AUDIO_SOURCE}
54
55
def _get_supported_pcm_formats() -> dict[str, int]:
    """
    Return supported PCM formats.

    Scans the ContentType member names for ``PCM_<S or F><two digits>LE`` and
    returns a mapping of the format name (e.g. "S16LE") to its bit depth (e.g. 16).
    """
    # [SF] instead of [S|F]: inside a character class "|" is a literal pipe,
    # not an alternation. fullmatch() also keeps out names with trailing text.
    pattern = re.compile(r"PCM_([SF](\d{2})LE)")
    pcm_formats: dict[str, int] = {}
    for member_name in ContentType.__members__:
        if match := pattern.fullmatch(member_name):
            pcm_formats[match.group(1)] = int(match.group(2))
    return pcm_formats
63
64
def _get_vban_sample_rates() -> list[int]:
    """Collect the numeric rate encoded in every VBANSampleRate member name."""
    sample_rates: list[int] = []
    for member_name in VBANSampleRate.__members__:
        # The second "_"-separated field of the member name is parsed as the rate.
        rate_field = member_name.split("_")[1]
        sample_rates.append(int(rate_field))
    return sample_rates
68
69
async def setup(
    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
    """Create the VBAN receiver provider instance for the given configuration."""
    return VBANReceiverProvider(mass=mass, manifest=manifest, config=config)
75
76
async def get_config_entries(
    mass: MusicAssistant,  # noqa: ARG001
    instance_id: str | None = None,  # noqa: ARG001
    action: str | None = None,  # noqa: ARG001
    values: dict[str, ConfigValueType] | None = None,  # noqa: ARG001
) -> tuple[ConfigEntry, ...]:
    """
    Return Config entries to setup this provider.

    instance_id: id of an existing provider instance (None if new instance setup).
    action: [optional] action key called from config entries UI.
    values: the (intermediate) raw values for config entries sent with the action.
    """
    ip_addresses = await get_ip_addresses()
    # De-duplicate while keeping a stable order ("0.0.0.0" first). Iterating a
    # set of strings here would make the option order vary between runs.
    bind_ip_options = list(dict.fromkeys(("0.0.0.0", *ip_addresses)))

    def _validate_stream_name(config_value: str) -> bool:
        """Return True if the stream name is ASCII-only and at most 16 characters."""
        return config_value.isascii() and len(config_value) <= 16

    return (
        CONF_ENTRY_WARN_PREVIEW,
        ConfigEntry(
            key=CONF_BIND_PORT,
            type=ConfigEntryType.INTEGER,
            default_value=DEFAULT_UDP_PORT,
            label="Receiver: UDP Port",
            description="The UDP port the VBAN receiver will listen on for connections. "
            "Make sure that this server can be reached "
            "on the given IP and UDP port by remote VBAN senders.",
        ),
        ConfigEntry(
            key=CONF_VBAN_STREAM_NAME,
            type=ConfigEntryType.STRING,
            label="Sender: VBAN Stream Name",
            default_value="Network AUX",
            description="Max 16 ASCII chars.\n"
            "The VBAN stream name to expect from the remote VBAN sender.\n"
            "This MUST match what the remote VBAN sender has set for the session name "
            "otherwise audio streaming will not work.",
            required=True,
            validate=_validate_stream_name,  # type: ignore[arg-type]
        ),
        ConfigEntry(
            key=CONF_SENDER_HOST,
            type=ConfigEntryType.STRING,
            default_value="127.0.0.1",
            label="Sender: VBAN Sender hostname/IP address",
            description="The hostname/IP Address of the remote VBAN SENDER.",
            required=True,
        ),
        ConfigEntry(
            key=CONF_PCM_AUDIO_FORMAT,
            type=ConfigEntryType.STRING,
            default_value=DEFAULT_PCM_AUDIO_FORMAT,
            options=[ConfigValueOption(x, x) for x in _get_supported_pcm_formats()],
            label="PCM audio format",
            description="The VBAN PCM audio format to expect from the remote VBAN sender. "
            "This MUST match what the remote VBAN sender has set otherwise audio streaming "
            "will not work.",
            required=True,
        ),
        ConfigEntry(
            key=CONF_PCM_SAMPLE_RATE,
            type=ConfigEntryType.INTEGER,
            default_value=DEFAULT_PCM_SAMPLE_RATE,
            options=[ConfigValueOption(str(x), x) for x in _get_vban_sample_rates()],
            label="PCM sample rate",
            description="The VBAN PCM sample rate to expect from the remote VBAN sender. "
            "This MUST match what the remote VBAN sender has set otherwise audio streaming "
            "will not work.",
            required=True,
        ),
        ConfigEntry(
            key=CONF_AUDIO_CHANNELS,
            type=ConfigEntryType.INTEGER,
            default_value=DEFAULT_AUDIO_CHANNELS,
            # Channel counts 1 through 8.
            options=[ConfigValueOption(str(x), x) for x in range(1, 9)],
            label="Channels",
            description="The number of audio channels",
            required=True,
        ),
        ConfigEntry(
            key=CONF_BIND_IP,
            type=ConfigEntryType.STRING,
            default_value="0.0.0.0",
            options=[ConfigValueOption(x, x) for x in bind_ip_options],
            label="Receiver: Bind to IP/interface",
            description="Start the VBAN receiver on this specific interface. \n"
            "Use 0.0.0.0 to bind to all interfaces, which is the default. \n"
            "This is an advanced setting that should normally "
            "not be adjusted in regular setups.",
            advanced=True,
            required=True,
        ),
        ConfigEntry(
            key=CONF_VBAN_QUEUE_STRATEGY,
            type=ConfigEntryType.STRING,
            # The first strategy label is the default.
            default_value=next(iter(VBAN_QUEUE_STRATEGIES)),
            options=[ConfigValueOption(x, x) for x in VBAN_QUEUE_STRATEGIES],
            label="Receiver: VBAN queue strategy",
            description="What should happen if the receiving queue fills up?",
            advanced=True,
            required=True,
        ),
        ConfigEntry(
            key=CONF_VBAN_QUEUE_SIZE,
            type=ConfigEntryType.INTEGER,
            default_value=AsyncVBANClientMod.default_queue_size,
            label="Receiver: VBAN packets queue size",
            description="This can be increased if MA is running on a very low power device, "
            "otherwise this should not need to be changed.",
            advanced=True,
            required=True,
        ),
    )
196
197
class VBANReceiverProvider(PluginProvider):
    """
    Implementation of a VBAN protocol receiver plugin.

    Starts a VBAN listener on the configured IP/port and exposes the configured
    incoming stream as a PluginSource that yields raw PCM chunks.
    """

    def __init__(
        self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
    ) -> None:
        """Read the provider configuration and build the plugin source details."""
        super().__init__(mass, manifest, config, SUPPORTED_FEATURES)
        self._bind_port: int = cast("int", self.config.get_value(CONF_BIND_PORT))
        self._bind_ip: str = cast("str", self.config.get_value(CONF_BIND_IP))
        self._sender_host: str = cast("str", self.config.get_value(CONF_SENDER_HOST))
        self._vban_stream_name: str = cast("str", self.config.get_value(CONF_VBAN_STREAM_NAME))
        self._pcm_audio_format: str = cast("str", self.config.get_value(CONF_PCM_AUDIO_FORMAT))
        self._pcm_sample_rate: int = cast("int", self.config.get_value(CONF_PCM_SAMPLE_RATE))
        self._audio_channels: int = cast("int", self.config.get_value(CONF_AUDIO_CHANNELS))
        # The config stores the UI label; translate it to the strategy enum.
        self._vban_queue_strategy: BackPressureStrategy = VBAN_QUEUE_STRATEGIES[
            cast("str", self.config.get_value(CONF_VBAN_QUEUE_STRATEGY))
        ]
        self._vban_queue_size: int = cast("int", self.config.get_value(CONF_VBAN_QUEUE_SIZE))

        self._vban_receiver: AsyncVBANClientMod | None = None
        self._vban_sender: VBANDevice | None = None
        self._vban_stream: VBANIncomingStream | None = None
        # Created in handle_async_init(); declared here so unload() and
        # get_audio_stream() never hit a missing attribute.
        self._udp_socket_task: asyncio.Task[object] | None = None

        self._source_details = PluginSource(
            id=self.instance_id,
            name=f"{self.manifest.name}: {self._vban_stream_name}",
            passive=False,
            can_play_pause=False,
            can_seek=False,
            can_next_previous=False,
            audio_format=AudioFormat(
                # The configured format name (e.g. "S16LE") lower-cased is the ContentType value.
                content_type=ContentType(self._pcm_audio_format.lower()),
                codec_type=ContentType(self._pcm_audio_format.lower()),
                sample_rate=self._pcm_sample_rate,
                bit_depth=_get_supported_pcm_formats()[self._pcm_audio_format],
                channels=self._audio_channels,
            ),
            metadata=StreamMetadata(
                title=self._vban_stream_name,
                artist=self._sender_host,
            ),
            stream_type=StreamType.CUSTOM,
        )

    @property
    def instance_name_postfix(self) -> str | None:
        """Return a (default) instance name postfix for this provider instance."""
        return self._vban_stream_name

    async def handle_async_init(self) -> None:
        """
        Handle async initialization of the provider.

        Starts the listener task and registers the configured sender and stream.

        Raises SetupFailedError when the listener task fails right away
        (presumably when the socket cannot be bound).
        """
        receiver = AsyncVBANClientMod(
            default_queue_size=self._vban_queue_size, ignore_audio_streams=False
        )
        self._vban_receiver = receiver
        listen_task = asyncio.create_task(
            receiver.listen(address=self._bind_ip, port=self._bind_port, controller=self)
        )
        self._udp_socket_task = listen_task

        self._vban_sender = receiver.register_device(self._sender_host)
        if self._vban_sender:
            self._vban_stream = self._vban_sender.receive_stream(
                self._vban_stream_name, back_pressure_strategy=self._vban_queue_strategy
            )

        # create_task() only schedules listen(); errors raised inside the
        # coroutine end up in the task, never at the create_task() call site.
        # Yield once so the listener gets its first run, then report an
        # immediate failure as a setup error.
        await asyncio.sleep(0)
        if listen_task.done() and not listen_task.cancelled():
            err = listen_task.exception()
            if err is not None:
                # Nothing is running, so drop the references to make unload() a no-op.
                self._vban_receiver = None
                self._vban_sender = None
                self._vban_stream = None
                self._udp_socket_task = None
                raise SetupFailedError(f"Failed to start VBAN receiver plugin: {err}") from err

    async def unload(self, is_removed: bool = False) -> None:
        """
        Handle close/cleanup of the provider.

        Closes the receiver, waits for the listener task to end and drops
        all VBAN references.
        """
        self.logger.debug("Unloading plugin")
        if self._vban_receiver:
            self.logger.debug("Closing UDP transport")
            self._vban_receiver.close()
        if self._udp_socket_task is not None:
            with suppress(asyncio.CancelledError):
                try:
                    await self._udp_socket_task
                except OSError as err:
                    # A listener that died with a socket error must not abort the unload.
                    self.logger.warning("VBAN listener task ended with an error: %s", err)

        self._vban_receiver = None
        self._vban_sender = None
        self._vban_stream = None
        self._udp_socket_task = None
        await asyncio.sleep(0.1)

    def get_source(self) -> PluginSource:
        """Get (audio)source details for this plugin."""
        return self._source_details

    @property
    def active_player(self) -> bool:
        """Report the active player status."""
        return bool(self._source_details.in_use_by)

    async def get_audio_stream(self, player_id: str) -> AsyncGenerator[bytes, None]:
        """
        Yield raw PCM chunks from the VBANIncomingStream queue.

        Stops when the source is no longer in use, the stream is gone,
        the listener task has finished or the stream queue was shut down.
        """
        self.logger.debug(
            "Getting VBAN PCM audio stream for Player: %s//Stream: %s//Config: %s",
            player_id,
            self._vban_stream_name,
            self._source_details.audio_format.output_format_str,
        )
        while (
            self._source_details.in_use_by
            and self._vban_stream
            and self._udp_socket_task is not None
            and not self._udp_socket_task.done()
        ):
            try:
                packet = await self._vban_stream.get_packet()
            # NOTE(review): asyncio.QueueShutDown only exists on Python 3.13+ - confirm
            # the minimum supported Python version.
            except asyncio.QueueShutDown:  # type: ignore[attr-defined]
                self.logger.error(
                    "Found VBANIncomingStream queue shut down when attempting to get VBAN packet"
                )
                break

            yield packet.body.data
313