/
/
/
1"""RadioBrowser musicprovider support for MusicAssistant."""
2
3from __future__ import annotations
4
5from collections.abc import AsyncGenerator, Sequence
6from typing import TYPE_CHECKING, cast
7
8from music_assistant_models.config_entries import ConfigEntry
9from music_assistant_models.enums import (
10 ConfigEntryType,
11 ContentType,
12 ImageType,
13 LinkType,
14 MediaType,
15 ProviderFeature,
16 StreamType,
17)
18from music_assistant_models.errors import MediaNotFoundError, ProviderUnavailableError
19from music_assistant_models.media_items import (
20 AudioFormat,
21 BrowseFolder,
22 MediaItemImage,
23 MediaItemLink,
24 MediaItemType,
25 ProviderMapping,
26 Radio,
27 SearchResults,
28 UniqueList,
29)
30from music_assistant_models.streamdetails import StreamDetails
31from radios import FilterBy, Order, RadioBrowser, RadioBrowserError, Station
32
33from music_assistant.constants import (
34 CONF_ENTRY_LIBRARY_SYNC_BACK,
35 CONF_ENTRY_LIBRARY_SYNC_RADIOS,
36 CONF_ENTRY_PROVIDER_SYNC_INTERVAL_RADIOS,
37)
38from music_assistant.controllers.cache import use_cache
39from music_assistant.models.music_provider import MusicProvider
40
41SUPPORTED_FEATURES = {
42 ProviderFeature.SEARCH,
43 ProviderFeature.BROWSE,
44 ProviderFeature.LIBRARY_RADIOS,
45 ProviderFeature.LIBRARY_RADIOS_EDIT,
46}
47
48if TYPE_CHECKING:
49 from music_assistant_models.config_entries import ConfigValueType, ProviderConfig
50 from music_assistant_models.provider import ProviderManifest
51
52 from music_assistant.mass import MusicAssistant
53 from music_assistant.models import ProviderInstanceType
54
55CONF_STORED_RADIOS = "stored_radios"
56
57CONF_ENTRY_LIBRARY_SYNC_RADIOS_HIDDEN = ConfigEntry.from_dict(
58 {
59 **CONF_ENTRY_LIBRARY_SYNC_RADIOS.to_dict(),
60 "hidden": True,
61 "default_value": "import_only",
62 }
63)
64CONF_ENTRY_PROVIDER_SYNC_INTERVAL_RADIOS_HIDDEN = ConfigEntry.from_dict(
65 {
66 **CONF_ENTRY_PROVIDER_SYNC_INTERVAL_RADIOS.to_dict(),
67 "hidden": True,
68 "default_value": 180,
69 }
70)
71CONF_ENTRY_LIBRARY_SYNC_BACK_HIDDEN = ConfigEntry.from_dict(
72 {
73 **CONF_ENTRY_LIBRARY_SYNC_BACK.to_dict(),
74 "hidden": True,
75 "default_value": True,
76 }
77)
78
79
80async def setup(
81 mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
82) -> ProviderInstanceType:
83 """Initialize provider(instance) with given configuration."""
84 return RadioBrowserProvider(mass, manifest, config, SUPPORTED_FEATURES)
85
86
87async def get_config_entries(
88 mass: MusicAssistant,
89 instance_id: str | None = None,
90 action: str | None = None,
91 values: dict[str, ConfigValueType] | None = None,
92) -> tuple[ConfigEntry, ...]:
93 """
94 Return Config entries to setup this provider.
95 instance_id: id of an existing provider instance (None if new instance setup).
96 action: [optional] action key called from config entries UI.
97 values: the (intermediate) raw values for config entries sent with the action.
98 """
99 # ruff: noqa: ARG001 D205
100 return (
101 ConfigEntry(
102 # RadioBrowser doesn't support a library feature at all
103 # but MA users like to favorite their radio stations and
104 # have that included in backups so we store it in the config.
105 key=CONF_STORED_RADIOS,
106 type=ConfigEntryType.STRING,
107 multi_value=True,
108 label=CONF_STORED_RADIOS,
109 default_value=[],
110 required=False,
111 hidden=True,
112 ),
113 # hide some of the default (dynamic) entries for library management
114 CONF_ENTRY_LIBRARY_SYNC_RADIOS_HIDDEN,
115 CONF_ENTRY_PROVIDER_SYNC_INTERVAL_RADIOS_HIDDEN,
116 CONF_ENTRY_LIBRARY_SYNC_BACK_HIDDEN,
117 )
118
119
120class RadioBrowserProvider(MusicProvider):
121 """Provider implementation for RadioBrowser."""
122
123 async def handle_async_init(self) -> None:
124 """Handle async initialization of the provider."""
125 self.radios = RadioBrowser(
126 session=self.mass.http_session, user_agent=f"MusicAssistant/{self.mass.version}"
127 )
128 try:
129 await self.radios.stats()
130 except RadioBrowserError as err:
131 raise ProviderUnavailableError(f"RadioBrowser API unavailable: {err}") from err
132
133 @use_cache(3600 * 24 * 14) # Cache for 14 days
134 async def search(
135 self, search_query: str, media_types: list[MediaType], limit: int = 10
136 ) -> SearchResults:
137 """Perform search on musicprovider."""
138 result = SearchResults()
139 if MediaType.RADIO not in media_types:
140 return result
141
142 try:
143 searchresult = await self.radios.search(name=search_query, limit=limit)
144 result.radio = [await self._parse_radio(item) for item in searchresult]
145 except RadioBrowserError as err:
146 self.logger.warning("RadioBrowser search failed for query '%s': %s", search_query, err)
147
148 return result
149
150 async def browse(self, path: str) -> Sequence[MediaItemType | BrowseFolder]:
151 """Browse this provider's items."""
152 path_parts = [] if "://" not in path else path.split("://")[1].split("/")
153
154 subpath = path_parts[0] if len(path_parts) > 0 else ""
155 subsubpath = path_parts[1] if len(path_parts) > 1 else ""
156 subsubsubpath = path_parts[2] if len(path_parts) > 2 else ""
157
158 if not subpath:
159 return [
160 BrowseFolder(
161 item_id="popularity",
162 provider=self.domain,
163 path=path + "popularity",
164 name="",
165 translation_key="radiobrowser_by_popularity",
166 ),
167 BrowseFolder(
168 item_id="category",
169 provider=self.domain,
170 path=path + "category",
171 name="",
172 translation_key="radiobrowser_by_category",
173 ),
174 ]
175
176 if subpath == "popularity":
177 if not subsubpath:
178 return [
179 BrowseFolder(
180 item_id="popular",
181 provider=self.domain,
182 path=path + "/popular",
183 name="",
184 translation_key="radiobrowser_by_clicks",
185 ),
186 BrowseFolder(
187 item_id="votes",
188 provider=self.domain,
189 path=path + "/votes",
190 name="",
191 translation_key="radiobrowser_by_votes",
192 ),
193 ]
194
195 if subsubpath == "popular":
196 return await self.get_by_popularity()
197
198 if subsubpath == "votes":
199 return await self.get_by_votes()
200
201 if subpath == "category":
202 if not subsubpath:
203 return [
204 BrowseFolder(
205 item_id="country",
206 provider=self.domain,
207 path=path + "/country",
208 name="",
209 translation_key="radiobrowser_by_country",
210 ),
211 BrowseFolder(
212 item_id="language",
213 provider=self.domain,
214 path=path + "/language",
215 name="",
216 translation_key="radiobrowser_by_language",
217 ),
218 BrowseFolder(
219 item_id="tag",
220 provider=self.domain,
221 path=path + "/tag",
222 name="",
223 translation_key="radiobrowser_by_tag",
224 ),
225 ]
226
227 if subsubpath == "country":
228 if subsubsubpath:
229 return await self.get_by_country(subsubsubpath)
230 return await self.get_country_folders(path)
231
232 if subsubpath == "language":
233 if subsubsubpath:
234 return await self.get_by_language(subsubsubpath)
235 return await self.get_language_folders(path)
236
237 if subsubpath == "tag":
238 if subsubsubpath:
239 return await self.get_by_tag(subsubsubpath)
240 return await self.get_tag_folders(path)
241
242 return []
243
244 async def get_library_radios(self) -> AsyncGenerator[Radio, None]:
245 """Retrieve library/subscribed radio stations from the provider."""
246 stored_radios = self.config.get_value(CONF_STORED_RADIOS)
247 if TYPE_CHECKING:
248 stored_radios = cast("list[str]", stored_radios)
249 for item in stored_radios:
250 try:
251 yield await self.get_radio(item)
252 except MediaNotFoundError:
253 self.logger.warning("Radio station %s no longer exists", item)
254
255 async def library_add(self, item: MediaItemType) -> bool:
256 """Add item to provider's library. Return true on success."""
257 stored_radios = self.config.get_value(CONF_STORED_RADIOS)
258 if TYPE_CHECKING:
259 stored_radios = cast("list[str]", stored_radios)
260 if item.item_id in stored_radios:
261 return False
262 self.logger.debug("Adding radio %s to stored radios", item.item_id)
263 stored_radios = [*stored_radios, item.item_id]
264 self._update_config_value(CONF_STORED_RADIOS, stored_radios)
265 return True
266
267 async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool:
268 """Remove item from provider's library. Return true on success."""
269 stored_radios = self.config.get_value(CONF_STORED_RADIOS)
270 if TYPE_CHECKING:
271 stored_radios = cast("list[str]", stored_radios)
272 if prov_item_id not in stored_radios:
273 return False
274 self.logger.debug("Removing radio %s from stored radios", prov_item_id)
275 stored_radios = [x for x in stored_radios if x != prov_item_id]
276 self._update_config_value(CONF_STORED_RADIOS, stored_radios)
277 return True
278
279 @use_cache(3600 * 6) # Cache for 6 hours
280 async def get_by_popularity(self) -> Sequence[Radio]:
281 """Get radio stations by popularity."""
282 try:
283 stations = await self.radios.stations(
284 hide_broken=True,
285 limit=1000,
286 order=Order.CLICK_COUNT,
287 reverse=True,
288 )
289 return [await self._parse_radio(station) for station in stations]
290 except RadioBrowserError as err:
291 raise ProviderUnavailableError(f"Failed to fetch popular stations: {err}") from err
292
293 @use_cache(3600 * 6) # Cache for 6 hours
294 async def get_by_votes(self) -> Sequence[Radio]:
295 """Get radio stations by votes."""
296 try:
297 stations = await self.radios.stations(
298 hide_broken=True,
299 limit=1000,
300 order=Order.VOTES,
301 reverse=True,
302 )
303 return [await self._parse_radio(station) for station in stations]
304 except RadioBrowserError as err:
305 raise ProviderUnavailableError(f"Failed to fetch stations by votes: {err}") from err
306
307 @use_cache(3600 * 24 * 7) # Cache for 7 days
308 async def get_country_folders(self, base_path: str) -> list[BrowseFolder]:
309 """Get a list of country names as BrowseFolder."""
310 try:
311 countries = await self.radios.countries(order=Order.NAME, hide_broken=True, limit=1000)
312 except RadioBrowserError as err:
313 raise ProviderUnavailableError(f"Failed to fetch countries: {err}") from err
314
315 items: list[BrowseFolder] = []
316 for country in countries:
317 folder = BrowseFolder(
318 item_id=country.code.lower(),
319 provider=self.domain,
320 path=base_path + "/" + country.code.lower(),
321 name=country.name,
322 )
323 if country.favicon and country.favicon.strip():
324 folder.image = MediaItemImage(
325 type=ImageType.THUMB,
326 path=country.favicon,
327 provider=self.instance_id,
328 remotely_accessible=True,
329 )
330 items.append(folder)
331 return items
332
333 @use_cache(3600 * 24 * 7) # Cache for 7 days
334 async def get_language_folders(self, base_path: str) -> list[BrowseFolder]:
335 """Get a list of language names as BrowseFolder."""
336 try:
337 languages = await self.radios.languages(
338 order=Order.STATION_COUNT, reverse=True, hide_broken=True, limit=1000
339 )
340 except RadioBrowserError as err:
341 raise ProviderUnavailableError(f"Failed to fetch languages: {err}") from err
342
343 return [
344 BrowseFolder(
345 item_id=language.name,
346 provider=self.domain,
347 path=base_path + "/" + language.name,
348 name=language.name,
349 )
350 for language in languages
351 ]
352
353 @use_cache(3600 * 24 * 7) # Cache for 7 days
354 async def get_tag_folders(self, base_path: str) -> list[BrowseFolder]:
355 """Get a list of tag names as BrowseFolder."""
356 try:
357 tags = await self.radios.tags(
358 hide_broken=True,
359 order=Order.STATION_COUNT,
360 reverse=True,
361 limit=100,
362 )
363 except RadioBrowserError as err:
364 raise ProviderUnavailableError(f"Failed to fetch tags: {err}") from err
365
366 tags.sort(key=lambda tag: tag.name)
367 return [
368 BrowseFolder(
369 item_id=tag.name,
370 provider=self.domain,
371 path=base_path + "/" + tag.name,
372 name=tag.name.title(),
373 )
374 for tag in tags
375 ]
376
377 @use_cache(3600 * 24) # Cache for 1 day
378 async def get_by_country(self, country_code: str) -> list[Radio]:
379 """Get radio stations by country."""
380 try:
381 stations = await self.radios.stations(
382 filter_by=FilterBy.COUNTRY_CODE_EXACT,
383 filter_term=country_code,
384 hide_broken=True,
385 limit=1000,
386 order=Order.CLICK_COUNT,
387 reverse=True,
388 )
389 return [await self._parse_radio(station) for station in stations]
390 except RadioBrowserError as err:
391 raise ProviderUnavailableError(
392 f"Failed to fetch stations for country {country_code}: {err}"
393 ) from err
394
395 @use_cache(3600 * 24) # Cache for 1 day
396 async def get_by_language(self, language: str) -> list[Radio]:
397 """Get radio stations by language."""
398 try:
399 stations = await self.radios.stations(
400 filter_by=FilterBy.LANGUAGE_EXACT,
401 filter_term=language,
402 hide_broken=True,
403 limit=1000,
404 order=Order.CLICK_COUNT,
405 reverse=True,
406 )
407 return [await self._parse_radio(station) for station in stations]
408 except RadioBrowserError as err:
409 raise ProviderUnavailableError(
410 f"Failed to fetch stations for language {language}: {err}"
411 ) from err
412
413 @use_cache(3600 * 24) # Cache for 1 day
414 async def get_by_tag(self, tag: str) -> list[Radio]:
415 """Get radio stations by tag."""
416 try:
417 stations = await self.radios.stations(
418 filter_by=FilterBy.TAG_EXACT,
419 filter_term=tag,
420 hide_broken=True,
421 limit=1000,
422 order=Order.CLICK_COUNT,
423 reverse=True,
424 )
425 return [await self._parse_radio(station) for station in stations]
426 except RadioBrowserError as err:
427 raise ProviderUnavailableError(
428 f"Failed to fetch stations for tag {tag}: {err}"
429 ) from err
430
431 @use_cache(3600 * 24 * 14) # Cache for 14 days
432 async def get_radio(self, prov_radio_id: str) -> Radio:
433 """Get radio station details."""
434 try:
435 radio = await self.radios.station(uuid=prov_radio_id)
436 if not radio:
437 raise MediaNotFoundError(f"Radio station {prov_radio_id} not found")
438 return await self._parse_radio(radio)
439 except RadioBrowserError as err:
440 raise ProviderUnavailableError(
441 f"Failed to fetch radio station {prov_radio_id}: {err}"
442 ) from err
443
444 async def _parse_radio(self, radio_obj: Station) -> Radio:
445 """Parse Radio object from json obj returned from api."""
446 radio = Radio(
447 item_id=radio_obj.uuid,
448 provider=self.domain,
449 name=radio_obj.name,
450 provider_mappings={
451 ProviderMapping(
452 item_id=radio_obj.uuid,
453 provider_domain=self.domain,
454 provider_instance=self.instance_id,
455 )
456 },
457 )
458 radio.metadata.popularity = radio_obj.click_count
459 radio.metadata.links = {MediaItemLink(type=LinkType.WEBSITE, url=radio_obj.homepage)}
460 radio.metadata.images = UniqueList(
461 [
462 MediaItemImage(
463 type=ImageType.THUMB,
464 path=radio_obj.favicon,
465 provider=self.instance_id,
466 remotely_accessible=True,
467 )
468 ]
469 )
470 return radio
471
472 async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
473 """Get streamdetails for a radio station."""
474 try:
475 stream = await self.radios.station(uuid=item_id)
476 if not stream:
477 raise MediaNotFoundError(f"Radio station {item_id} not found")
478
479 await self.radios.station_click(uuid=item_id)
480
481 return StreamDetails(
482 provider=self.domain,
483 item_id=item_id,
484 audio_format=AudioFormat(
485 content_type=ContentType.try_parse(stream.codec),
486 ),
487 media_type=MediaType.RADIO,
488 stream_type=StreamType.HTTP,
489 path=stream.url_resolved or stream.url,
490 can_seek=False,
491 allow_seek=False,
492 )
493 except RadioBrowserError as err:
494 raise ProviderUnavailableError(
495 f"Failed to get stream details for {item_id}: {err}"
496 ) from err
497