/
/
/
1"""VBAN protocol receiver plugin for Music Assistant."""
2
3from __future__ import annotations
4
5import asyncio
6import re
7from collections.abc import AsyncGenerator
8from contextlib import suppress
9from typing import TYPE_CHECKING, cast
10
11from aiovban.asyncio.util import BackPressureStrategy
12from aiovban.enums import VBANSampleRate
13from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption
14from music_assistant_models.enums import ConfigEntryType, ContentType, ProviderFeature, StreamType
15from music_assistant_models.errors import SetupFailedError
16from music_assistant_models.media_items import AudioFormat
17from music_assistant_models.streamdetails import StreamMetadata
18
19from music_assistant.constants import CONF_BIND_IP, CONF_BIND_PORT, CONF_ENTRY_WARN_PREVIEW
20from music_assistant.helpers.util import get_ip_addresses
21from music_assistant.models.plugin import PluginProvider, PluginSource
22
23from .vban import AsyncVBANClientMod
24
25if TYPE_CHECKING:
26 from aiovban.asyncio.device import VBANDevice
27 from aiovban.asyncio.streams import VBANIncomingStream
28 from music_assistant_models.config_entries import ConfigValueType, ProviderConfig
29 from music_assistant_models.provider import ProviderManifest
30
31 from music_assistant.mass import MusicAssistant
32 from music_assistant.models import ProviderInstanceType
33
# Default values offered for the configuration entries defined in get_config_entries().
DEFAULT_UDP_PORT = 6980
DEFAULT_PCM_AUDIO_FORMAT = "S16LE"
DEFAULT_PCM_SAMPLE_RATE = 44100
DEFAULT_AUDIO_CHANNELS = 2

# Keys under which the plugin-specific settings are stored in the provider config.
CONF_VBAN_STREAM_NAME = "vban_stream_name"
CONF_SENDER_HOST = "sender_host"
CONF_PCM_AUDIO_FORMAT = "audio_format"
CONF_PCM_SAMPLE_RATE = "sample_rate"
CONF_AUDIO_CHANNELS = "audio_channels"
CONF_VBAN_QUEUE_STRATEGY = "vban_queue_strategy"
CONF_VBAN_QUEUE_SIZE = "vban_queue_size"

# Maps the label shown in the config UI to the aiovban back-pressure strategy.
# The first entry is also used as the default choice in get_config_entries().
VBAN_QUEUE_STRATEGIES = {
    "Clear entire queue": BackPressureStrategy.DROP,
    "Clear the oldest half of the queue": BackPressureStrategy.DRAIN_OLDEST,
    "Remove single oldest queue entry": BackPressureStrategy.POP,
}

# Provider features handed to PluginProvider.__init__ by VBANReceiverProvider.
SUPPORTED_FEATURES = {ProviderFeature.AUDIO_SOURCE}
54
55
def _get_supported_pcm_formats() -> dict[str, int]:
    """Return the little-endian PCM formats found among the ``ContentType`` members.

    Scans the ``ContentType`` member names for ``PCM_<S or F><two digits>LE`` and
    maps the short format name (for example ``"S16LE"``) to the integer value of
    the two digits (``16``), which callers use as the bit depth.
    """
    # "[SF]" rather than "[S|F]": inside a character class "|" is a literal pipe,
    # not an alternation. fullmatch() additionally rejects names that merely
    # start with a PCM format and carry trailing characters.
    pcm_member_name = re.compile(r"PCM_([SF](\d{2})LE)")
    return {
        found.group(1): int(found.group(2))
        for member_name in ContentType.__members__
        if (found := pcm_member_name.fullmatch(member_name))
    }
63
64
def _get_vban_sample_rates() -> list[int]:
    """Return the sample rates encoded in the ``VBANSampleRate`` member names.

    The rate is taken from the second underscore-separated part of each member
    name and converted to ``int``.
    """
    # NOTE(review): assumes every member name looks like "<PREFIX>_<rate>" - confirm
    # against aiovban.enums.VBANSampleRate.
    rates: list[int] = []
    for member_name in VBANSampleRate.__members__:
        name_parts = member_name.split("_")
        rates.append(int(name_parts[1]))
    return rates
68
69
async def setup(
    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
    """Create the VBAN receiver provider instance for the given configuration.

    The returned provider is only constructed here; its listener is started
    later in ``handle_async_init``.
    """
    return VBANReceiverProvider(mass=mass, manifest=manifest, config=config)
75
76
async def get_config_entries(
    mass: MusicAssistant,  # noqa: ARG001
    instance_id: str | None = None,  # noqa: ARG001
    action: str | None = None,  # noqa: ARG001
    values: dict[str, ConfigValueType] | None = None,  # noqa: ARG001
) -> tuple[ConfigEntry, ...]:
    """
    Return Config entries to setup this provider.

    The entries cover the receiver side (bind IP/port, queue strategy and size)
    and the expected sender side (host, stream name, PCM format, sample rate
    and channel count).

    mass: the MusicAssistant instance (unused).
    instance_id: id of an existing provider instance (None if new instance setup).
    action: [optional] action key called from config entries UI.
    values: the (intermediate) raw values for config entries sent with the action.
    """
    ip_addresses = await get_ip_addresses()
    # Build the bind choices with a stable order ("0.0.0.0" first, then the local
    # addresses as reported) while dropping duplicates. A plain set would make
    # the option order vary between runs.
    bind_ip_choices = list(dict.fromkeys(("0.0.0.0", *ip_addresses)))

    def _validate_stream_name(config_value: str) -> bool:
        """Accept only ASCII stream names of at most 16 characters."""
        return config_value.isascii() and len(config_value) <= 16

    return (
        CONF_ENTRY_WARN_PREVIEW,
        ConfigEntry(
            key=CONF_BIND_PORT,
            type=ConfigEntryType.INTEGER,
            default_value=DEFAULT_UDP_PORT,
            label="Receiver: UDP Port",
            description="The UDP port the VBAN receiver will listen on for connections. "
            "Make sure that this server can be reached "
            "on the given IP and UDP port by remote VBAN senders.",
        ),
        ConfigEntry(
            key=CONF_VBAN_STREAM_NAME,
            type=ConfigEntryType.STRING,
            label="Sender: VBAN Stream Name",
            default_value="Network AUX",
            description="Max 16 ASCII chars.\n"
            "The VBAN stream name to expect from the remote VBAN sender.\n"
            "This MUST match what the remote VBAN sender has set for the session name "
            "otherwise audio streaming will not work.",
            required=True,
            validate=_validate_stream_name,  # type: ignore[arg-type]
        ),
        ConfigEntry(
            key=CONF_SENDER_HOST,
            type=ConfigEntryType.STRING,
            default_value="127.0.0.1",
            label="Sender: VBAN Sender hostname/IP address",
            description="The hostname/IP Address of the remote VBAN SENDER.",
            required=True,
        ),
        ConfigEntry(
            key=CONF_PCM_AUDIO_FORMAT,
            type=ConfigEntryType.STRING,
            default_value=DEFAULT_PCM_AUDIO_FORMAT,
            options=[ConfigValueOption(x, x) for x in _get_supported_pcm_formats()],
            label="PCM audio format",
            description="The VBAN PCM audio format to expect from the remote VBAN sender. "
            "This MUST match what the remote VBAN sender has set otherwise audio streaming "
            "will not work.",
            required=True,
        ),
        ConfigEntry(
            key=CONF_PCM_SAMPLE_RATE,
            type=ConfigEntryType.INTEGER,
            default_value=DEFAULT_PCM_SAMPLE_RATE,
            options=[ConfigValueOption(str(x), x) for x in _get_vban_sample_rates()],
            label="PCM sample rate",
            description="The VBAN PCM sample rate to expect from the remote VBAN sender. "
            "This MUST match what the remote VBAN sender has set otherwise audio streaming "
            "will not work.",
            required=True,
        ),
        ConfigEntry(
            key=CONF_AUDIO_CHANNELS,
            type=ConfigEntryType.INTEGER,
            default_value=DEFAULT_AUDIO_CHANNELS,
            # Channel counts 1 through 8.
            options=[ConfigValueOption(str(x), x) for x in range(1, 9)],
            label="Channels",
            description="The number of audio channels",
            required=True,
        ),
        ConfigEntry(
            key=CONF_BIND_IP,
            type=ConfigEntryType.STRING,
            default_value="0.0.0.0",
            options=[ConfigValueOption(x, x) for x in bind_ip_choices],
            label="Receiver: Bind to IP/interface",
            description="Start the VBAN receiver on this specific interface. \n"
            "Use 0.0.0.0 to bind to all interfaces, which is the default. \n"
            "This is an advanced setting that should normally "
            "not be adjusted in regular setups.",
            advanced=True,
            required=True,
        ),
        ConfigEntry(
            key=CONF_VBAN_QUEUE_STRATEGY,
            type=ConfigEntryType.STRING,
            # The first strategy in the mapping is the default.
            default_value=next(iter(VBAN_QUEUE_STRATEGIES)),
            options=[ConfigValueOption(x, x) for x in VBAN_QUEUE_STRATEGIES],
            label="Receiver: VBAN queue strategy",
            description="What should happen if the receiving queue fills up?",
            advanced=True,
            required=True,
        ),
        ConfigEntry(
            key=CONF_VBAN_QUEUE_SIZE,
            type=ConfigEntryType.INTEGER,
            default_value=AsyncVBANClientMod.default_queue_size,
            label="Receiver: VBAN packets queue size",
            description="This can be increased if MA is running on a very low power device, "
            "otherwise this should not need to be changed.",
            advanced=True,
            required=True,
        ),
    )
196
197
class VBANReceiverProvider(PluginProvider):
    """Plugin provider that exposes an incoming VBAN stream as an audio source.

    The provider starts a UDP listener (``AsyncVBANClientMod``), registers the
    configured sender host, subscribes to the configured stream name and hands
    the body data of every received packet to the player as raw PCM.
    """

    def __init__(
        self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
    ) -> None:
        """Read the provider configuration and build the plugin source description."""
        super().__init__(mass, manifest, config, SUPPORTED_FEATURES)
        self._bind_port: int = cast("int", self.config.get_value(CONF_BIND_PORT))
        self._bind_ip: str = cast("str", self.config.get_value(CONF_BIND_IP))
        self._sender_host: str = cast("str", self.config.get_value(CONF_SENDER_HOST))
        self._vban_stream_name: str = cast("str", self.config.get_value(CONF_VBAN_STREAM_NAME))
        self._pcm_audio_format: str = cast("str", self.config.get_value(CONF_PCM_AUDIO_FORMAT))
        self._pcm_sample_rate: int = cast("int", self.config.get_value(CONF_PCM_SAMPLE_RATE))
        self._audio_channels: int = cast("int", self.config.get_value(CONF_AUDIO_CHANNELS))
        # The config stores the UI label; translate it to the aiovban strategy.
        self._vban_queue_strategy: BackPressureStrategy = VBAN_QUEUE_STRATEGIES[
            cast("str", self.config.get_value(CONF_VBAN_QUEUE_STRATEGY))
        ]
        self._vban_queue_size: int = cast("int", self.config.get_value(CONF_VBAN_QUEUE_SIZE))

        # Runtime objects, created in handle_async_init() and dropped in unload().
        self._vban_receiver: AsyncVBANClientMod | None = None
        self._vban_sender: VBANDevice | None = None
        self._vban_stream: VBANIncomingStream | None = None
        # Initialised here so unload() and get_audio_stream() can check it at any
        # point in the lifecycle without risking an AttributeError.
        self._udp_socket_task: asyncio.Task[None] | None = None

        # The configured format name (e.g. "S16LE") lower-cased is the ContentType value.
        pcm_content_type = ContentType(self._pcm_audio_format.lower())
        self._source_details = PluginSource(
            id=self.instance_id,
            name=f"{self.manifest.name}: {self._vban_stream_name}",
            passive=False,
            can_play_pause=False,
            can_seek=False,
            can_next_previous=False,
            audio_format=AudioFormat(
                content_type=pcm_content_type,
                codec_type=pcm_content_type,
                sample_rate=self._pcm_sample_rate,
                bit_depth=_get_supported_pcm_formats()[self._pcm_audio_format],
                channels=self._audio_channels,
            ),
            metadata=StreamMetadata(
                title=self._vban_stream_name,
                artist=self._sender_host,
            ),
            stream_type=StreamType.CUSTOM,
        )

    @property
    def instance_name_postfix(self) -> str | None:
        """Return a (default) instance name postfix for this provider instance."""
        return self._vban_stream_name

    async def handle_async_init(self) -> None:
        """Start the UDP listener and subscribe to the configured VBAN stream.

        Raises:
            SetupFailedError: if the listener task has already failed with an
                ``OSError`` by the time it had its first chance to run.
        """
        receiver = AsyncVBANClientMod(
            default_queue_size=self._vban_queue_size, ignore_audio_streams=False
        )
        self._vban_receiver = receiver
        listener_task = asyncio.create_task(
            receiver.listen(address=self._bind_ip, port=self._bind_port, controller=self)
        )
        self._udp_socket_task = listener_task

        # create_task() only schedules the coroutine, so an error raised inside
        # listen() can never be caught around the create_task() call itself.
        # Yield to the event loop once so the listener gets a chance to run, then
        # turn an immediate failure into a setup error.
        # NOTE(review): assumes a bind error surfaces before listen() first
        # suspends - confirm against AsyncVBANClientMod.listen.
        await asyncio.sleep(0)
        if listener_task.done() and not listener_task.cancelled():
            err = listener_task.exception()
            if isinstance(err, OSError):
                raise SetupFailedError(f"Failed to start VBAN receiver plugin: {err}") from err

        self._vban_sender = receiver.register_device(self._sender_host)
        if self._vban_sender:
            self._vban_stream = self._vban_sender.receive_stream(
                self._vban_stream_name, back_pressure_strategy=self._vban_queue_strategy
            )

    async def unload(self, is_removed: bool = False) -> None:
        """Handle close/cleanup of the provider.

        Closes the receiver, waits for the listener task to end and drops all
        runtime references. A listener that ended with an ``OSError`` is logged
        instead of making the unload itself fail.
        """
        self.logger.debug("Unloading plugin")
        if self._vban_receiver:
            self.logger.debug("Closing UDP transport")
            self._vban_receiver.close()
            if self._udp_socket_task is not None:
                with suppress(asyncio.CancelledError):
                    try:
                        await self._udp_socket_task
                    except OSError as err:
                        self.logger.warning("VBAN listener task ended with an error: %s", err)

        self._vban_receiver = None
        self._vban_sender = None
        self._vban_stream = None
        self._udp_socket_task = None
        # NOTE(review): presumably gives the transport a moment to finish closing - confirm.
        await asyncio.sleep(0.1)

    def get_source(self) -> PluginSource:
        """Get (audio)source details for this plugin."""
        return self._source_details

    @property
    def active_player(self) -> bool:
        """Report whether the plugin source is currently in use by a player."""
        return bool(self._source_details.in_use_by)

    async def get_audio_stream(self, player_id: str) -> AsyncGenerator[bytes, None]:
        """Yield raw PCM chunks from the VBANIncomingStream queue.

        Streaming continues while the source is in use, a stream is registered
        and the listener task is still running.
        """
        self.logger.debug(
            "Getting VBAN PCM audio stream for Player: %s//Stream: %s//Config: %s",
            player_id,
            self._vban_stream_name,
            self._source_details.audio_format.output_format_str,
        )
        # asyncio.QueueShutDown only exists on Python 3.13+. Resolve it once; the
        # empty tuple turns the handler into a no-op on older interpreters instead
        # of raising AttributeError while another exception (e.g. a cancellation)
        # is propagating through the try block.
        queue_shutdown_errors = getattr(asyncio, "QueueShutDown", ())
        while (
            self._source_details.in_use_by
            and self._vban_stream
            and self._udp_socket_task is not None
            and not self._udp_socket_task.done()
        ):
            try:
                packet = await self._vban_stream.get_packet()
            except queue_shutdown_errors:
                self.logger.error(
                    "Found VBANIncomingStream queue shut down when attempting to get VBAN packet"
                )
                break

            yield packet.body.data
313