/
/
/
1"""RadioBrowser musicprovider support for MusicAssistant."""
2
3from __future__ import annotations
4
5from collections.abc import AsyncGenerator, Sequence
6from typing import TYPE_CHECKING, cast
7
8from music_assistant_models.config_entries import ConfigEntry
9from music_assistant_models.enums import (
10 ConfigEntryType,
11 ContentType,
12 ImageType,
13 LinkType,
14 MediaType,
15 ProviderFeature,
16 StreamType,
17)
18from music_assistant_models.errors import MediaNotFoundError, ProviderUnavailableError
19from music_assistant_models.media_items import (
20 AudioFormat,
21 BrowseFolder,
22 MediaItemImage,
23 MediaItemLink,
24 MediaItemType,
25 ProviderMapping,
26 Radio,
27 SearchResults,
28 UniqueList,
29)
30from music_assistant_models.streamdetails import StreamDetails
31from radios import FilterBy, Order, RadioBrowser, RadioBrowserError, Station
32
33from music_assistant.constants import (
34 CONF_ENTRY_LIBRARY_SYNC_BACK,
35 CONF_ENTRY_LIBRARY_SYNC_RADIOS,
36 CONF_ENTRY_PROVIDER_SYNC_INTERVAL_RADIOS,
37)
38from music_assistant.controllers.cache import use_cache
39from music_assistant.models.music_provider import MusicProvider
40
# Capabilities this provider advertises; handed to the provider instance in setup().
SUPPORTED_FEATURES = {
    # discovery
    ProviderFeature.BROWSE,
    ProviderFeature.SEARCH,
    # favourites (kept in the provider config, see CONF_STORED_RADIOS)
    ProviderFeature.LIBRARY_RADIOS_EDIT,
    ProviderFeature.LIBRARY_RADIOS,
}
47
48if TYPE_CHECKING:
49 from music_assistant_models.config_entries import ConfigValueType, ProviderConfig
50 from music_assistant_models.provider import ProviderManifest
51
52 from music_assistant.mass import MusicAssistant
53 from music_assistant.models import ProviderInstanceType
54
# Config key under which the ids of the favourited stations are persisted.
CONF_STORED_RADIOS = "stored_radios"


def _hidden_entry(entry: ConfigEntry, default_value: ConfigValueType) -> ConfigEntry:
    """Return a copy of *entry* that is hidden and uses *default_value* as its default."""
    overridden = {
        **entry.to_dict(),
        "hidden": True,
        "default_value": default_value,
    }
    return ConfigEntry.from_dict(overridden)


# Hidden variants of the generic library-management entries, each with a fixed default.
CONF_ENTRY_LIBRARY_SYNC_RADIOS_HIDDEN = _hidden_entry(
    CONF_ENTRY_LIBRARY_SYNC_RADIOS, "import_only"
)
CONF_ENTRY_PROVIDER_SYNC_INTERVAL_RADIOS_HIDDEN = _hidden_entry(
    CONF_ENTRY_PROVIDER_SYNC_INTERVAL_RADIOS, 180
)
CONF_ENTRY_LIBRARY_SYNC_BACK_HIDDEN = _hidden_entry(CONF_ENTRY_LIBRARY_SYNC_BACK, True)
78
79
async def setup(
    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
    """Create the RadioBrowser provider instance for the given configuration."""
    provider = RadioBrowserProvider(mass, manifest, config, SUPPORTED_FEATURES)
    return provider
85
86
async def get_config_entries(
    mass: MusicAssistant,
    instance_id: str | None = None,
    action: str | None = None,
    values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
    """
    Return Config entries to setup this provider.

    None of the arguments are used by this provider; the same entries are
    returned regardless of their values.

    instance_id: id of an existing provider instance (None if new instance setup).
    action: [optional] action key called from config entries UI.
    values: the (intermediate) raw values for config entries sent with the action.
    """
    # ruff: noqa: ARG001 D205
    # RadioBrowser doesn't support a library feature at all
    # but MA users like to favorite their radio stations and
    # have that included in backups so we store it in the config.
    stored_radios_entry = ConfigEntry(
        key=CONF_STORED_RADIOS,
        type=ConfigEntryType.STRING,
        multi_value=True,
        label=CONF_STORED_RADIOS,
        default_value=[],
        required=False,
        hidden=True,
    )
    # hide some of the default (dynamic) entries for library management
    hidden_library_entries = (
        CONF_ENTRY_LIBRARY_SYNC_RADIOS_HIDDEN,
        CONF_ENTRY_PROVIDER_SYNC_INTERVAL_RADIOS_HIDDEN,
        CONF_ENTRY_LIBRARY_SYNC_BACK_HIDDEN,
    )
    return (stored_radios_entry, *hidden_library_entries)
118
119
class RadioBrowserProvider(MusicProvider):
    """Provider implementation for RadioBrowser.

    Stations can be searched and browsed (by popularity, country, language or
    tag). The ids of favourited stations are kept in the provider config under
    ``CONF_STORED_RADIOS``.
    """

    async def handle_async_init(self) -> None:
        """Handle async initialization of the provider.

        Creates the RadioBrowser client, verifies the API answers and, when no
        stations are stored in the config yet, copies the radiobrowser entries
        found in the music database into the config.

        Raises:
            ProviderUnavailableError: if the RadioBrowser API cannot be reached.
        """
        self.radios = RadioBrowser(
            session=self.mass.http_session, user_agent=f"MusicAssistant/{self.mass.version}"
        )
        try:
            await self.radios.stats()
        except RadioBrowserError as err:
            raise ProviderUnavailableError(f"RadioBrowser API unavailable: {err}") from err

        # copy the radiobrowser items that were added to the library
        # TODO: remove this logic after version 2.3.0 or later
        if not self.config.get_value(CONF_STORED_RADIOS) and self.mass.music.database:
            async for db_row in self.mass.music.database.iter_items(
                "provider_mappings",
                {"media_type": "radio", "provider_domain": "radiobrowser"},
            ):
                prov_item_id = db_row["provider_item_id"]
                try:
                    radio = await self.get_radio(prov_item_id)
                except MediaNotFoundError:
                    # A station that vanished from RadioBrowser must not prevent
                    # the provider from starting; skip it like get_library_radios does.
                    self.logger.warning("Radio station %s no longer exists", prov_item_id)
                    continue
                await self.library_add(radio)

    # NOTE(review): when the lookup fails, an empty result is returned below; if
    # use_cache also stores that empty result, a transient outage would be
    # remembered for the full cache lifetime - confirm against use_cache.
    @use_cache(3600 * 24 * 14)  # Cache for 14 days
    async def search(
        self, search_query: str, media_types: list[MediaType], limit: int = 10
    ) -> SearchResults:
        """Perform search on musicprovider.

        Only ``MediaType.RADIO`` is served; for any other request an empty
        result is returned. API errors are logged and yield an empty result.
        """
        result = SearchResults()
        if MediaType.RADIO not in media_types:
            return result

        try:
            searchresult = await self.radios.search(name=search_query, limit=limit)
            result.radio = [await self._parse_radio(item) for item in searchresult]
        except RadioBrowserError as err:
            self.logger.warning("RadioBrowser search failed for query '%s': %s", search_query, err)

        return result

    def _translated_folder(self, item_id: str, path: str, translation_key: str) -> BrowseFolder:
        """Build a nameless BrowseFolder whose label comes from *translation_key*."""
        return BrowseFolder(
            item_id=item_id,
            provider=self.domain,
            path=path,
            name="",
            translation_key=translation_key,
        )

    async def browse(self, path: str) -> Sequence[MediaItemType | BrowseFolder]:
        """Browse this provider's items.

        The part of *path* after ``://`` is split on ``/`` into at most three
        levels: ``popularity/{popular,votes}`` and
        ``category/{country,language,tag}[/<value>]``. Unknown paths yield an
        empty list.
        """
        path_parts = [] if "://" not in path else path.split("://")[1].split("/")
        # pad with empty strings so the three levels can always be unpacked
        subpath, subsubpath, subsubsubpath = (*path_parts, "", "", "")[:3]

        if not subpath:
            # root level: `path` is appended to directly, without a separator
            return [
                self._translated_folder(
                    "popularity", path + "popularity", "radiobrowser_by_popularity"
                ),
                self._translated_folder("category", path + "category", "radiobrowser_by_category"),
            ]

        if subpath == "popularity":
            if not subsubpath:
                return [
                    self._translated_folder("popular", path + "/popular", "radiobrowser_by_clicks"),
                    self._translated_folder("votes", path + "/votes", "radiobrowser_by_votes"),
                ]

            if subsubpath == "popular":
                return await self.get_by_popularity()

            if subsubpath == "votes":
                return await self.get_by_votes()

        if subpath == "category":
            if not subsubpath:
                return [
                    self._translated_folder("country", path + "/country", "radiobrowser_by_country"),
                    self._translated_folder(
                        "language", path + "/language", "radiobrowser_by_language"
                    ),
                    self._translated_folder("tag", path + "/tag", "radiobrowser_by_tag"),
                ]

            if subsubpath == "country":
                if subsubsubpath:
                    return await self.get_by_country(subsubsubpath)
                return await self.get_country_folders(path)

            if subsubpath == "language":
                if subsubsubpath:
                    return await self.get_by_language(subsubsubpath)
                return await self.get_language_folders(path)

            if subsubpath == "tag":
                if subsubsubpath:
                    return await self.get_by_tag(subsubsubpath)
                return await self.get_tag_folders(path)

        return []

    async def get_library_radios(self) -> AsyncGenerator[Radio, None]:
        """Retrieve library/subscribed radio stations from the provider.

        Yields one Radio per station id stored in the config; ids that can no
        longer be resolved are logged and skipped.
        """
        stored_radios = self.config.get_value(CONF_STORED_RADIOS)
        if TYPE_CHECKING:
            stored_radios = cast("list[str]", stored_radios)
        for item in stored_radios:
            try:
                yield await self.get_radio(item)
            except MediaNotFoundError:
                self.logger.warning("Radio station %s no longer exists", item)

    async def library_add(self, item: MediaItemType) -> bool:
        """Add item to provider's library. Return true on success.

        Returns False when the item's id is already stored.
        """
        stored_radios = self.config.get_value(CONF_STORED_RADIOS)
        if TYPE_CHECKING:
            stored_radios = cast("list[str]", stored_radios)
        if item.item_id in stored_radios:
            return False
        self.logger.debug("Adding radio %s to stored radios", item.item_id)
        # build a new list instead of mutating the value held by the config
        stored_radios = [*stored_radios, item.item_id]
        self._update_config_value(CONF_STORED_RADIOS, stored_radios)
        return True

    async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool:
        """Remove item from provider's library. Return true on success.

        Returns False when the id is not stored; *media_type* is not used.
        """
        stored_radios = self.config.get_value(CONF_STORED_RADIOS)
        if TYPE_CHECKING:
            stored_radios = cast("list[str]", stored_radios)
        if prov_item_id not in stored_radios:
            return False
        self.logger.debug("Removing radio %s from stored radios", prov_item_id)
        stored_radios = [x for x in stored_radios if x != prov_item_id]
        self._update_config_value(CONF_STORED_RADIOS, stored_radios)
        return True

    async def _fetch_stations(self, description: str, **query: object) -> list[Radio]:
        """Fetch up to 1000 non-broken stations in descending order and parse them.

        *description* completes the error message ("Failed to fetch <description>");
        *query* carries the ordering and optional filter arguments that are
        forwarded to ``RadioBrowser.stations``.

        Raises:
            ProviderUnavailableError: if the RadioBrowser API call fails.
        """
        try:
            stations = await self.radios.stations(
                hide_broken=True,
                limit=1000,
                reverse=True,
                **query,
            )
        except RadioBrowserError as err:
            raise ProviderUnavailableError(f"Failed to fetch {description}: {err}") from err
        return [await self._parse_radio(station) for station in stations]

    @use_cache(3600 * 6)  # Cache for 6 hours
    async def get_by_popularity(self) -> Sequence[Radio]:
        """Get radio stations by popularity (click count, highest first)."""
        return await self._fetch_stations("popular stations", order=Order.CLICK_COUNT)

    @use_cache(3600 * 6)  # Cache for 6 hours
    async def get_by_votes(self) -> Sequence[Radio]:
        """Get radio stations by votes (highest first)."""
        return await self._fetch_stations("stations by votes", order=Order.VOTES)

    @use_cache(3600 * 24 * 7)  # Cache for 7 days
    async def get_country_folders(self, base_path: str) -> list[BrowseFolder]:
        """Get a list of country names as BrowseFolder.

        Each folder is keyed by the lower-cased country code and placed below
        *base_path*.

        Raises:
            ProviderUnavailableError: if the RadioBrowser API call fails.
        """
        try:
            countries = await self.radios.countries(order=Order.NAME, hide_broken=True, limit=1000)
        except RadioBrowserError as err:
            raise ProviderUnavailableError(f"Failed to fetch countries: {err}") from err

        items: list[BrowseFolder] = []
        for country in countries:
            folder = BrowseFolder(
                item_id=country.code.lower(),
                provider=self.domain,
                path=base_path + "/" + country.code.lower(),
                name=country.name,
            )
            # only attach an image when the favicon is a non-blank string
            if country.favicon and country.favicon.strip():
                folder.image = MediaItemImage(
                    type=ImageType.THUMB,
                    path=country.favicon,
                    provider=self.instance_id,
                    remotely_accessible=True,
                )
            items.append(folder)
        return items

    @use_cache(3600 * 24 * 7)  # Cache for 7 days
    async def get_language_folders(self, base_path: str) -> list[BrowseFolder]:
        """Get a list of language names as BrowseFolder.

        Folders are ordered by station count, highest first.

        Raises:
            ProviderUnavailableError: if the RadioBrowser API call fails.
        """
        try:
            languages = await self.radios.languages(
                order=Order.STATION_COUNT, reverse=True, hide_broken=True, limit=1000
            )
        except RadioBrowserError as err:
            raise ProviderUnavailableError(f"Failed to fetch languages: {err}") from err

        return [
            BrowseFolder(
                item_id=language.name,
                provider=self.domain,
                path=base_path + "/" + language.name,
                name=language.name,
            )
            for language in languages
        ]

    @use_cache(3600 * 24 * 7)  # Cache for 7 days
    async def get_tag_folders(self, base_path: str) -> list[BrowseFolder]:
        """Get a list of tag names as BrowseFolder.

        The 100 tags with the most stations are requested and then presented
        sorted by name.

        Raises:
            ProviderUnavailableError: if the RadioBrowser API call fails.
        """
        try:
            tags = await self.radios.tags(
                hide_broken=True,
                order=Order.STATION_COUNT,
                reverse=True,
                limit=100,
            )
        except RadioBrowserError as err:
            raise ProviderUnavailableError(f"Failed to fetch tags: {err}") from err

        tags.sort(key=lambda tag: tag.name)
        return [
            BrowseFolder(
                item_id=tag.name,
                provider=self.domain,
                path=base_path + "/" + tag.name,
                name=tag.name.title(),
            )
            for tag in tags
        ]

    @use_cache(3600 * 24)  # Cache for 1 day
    async def get_by_country(self, country_code: str) -> list[Radio]:
        """Get radio stations by country (exact country code match)."""
        return await self._fetch_stations(
            f"stations for country {country_code}",
            filter_by=FilterBy.COUNTRY_CODE_EXACT,
            filter_term=country_code,
            order=Order.CLICK_COUNT,
        )

    @use_cache(3600 * 24)  # Cache for 1 day
    async def get_by_language(self, language: str) -> list[Radio]:
        """Get radio stations by language (exact language match)."""
        return await self._fetch_stations(
            f"stations for language {language}",
            filter_by=FilterBy.LANGUAGE_EXACT,
            filter_term=language,
            order=Order.CLICK_COUNT,
        )

    @use_cache(3600 * 24)  # Cache for 1 day
    async def get_by_tag(self, tag: str) -> list[Radio]:
        """Get radio stations by tag (exact tag match)."""
        return await self._fetch_stations(
            f"stations for tag {tag}",
            filter_by=FilterBy.TAG_EXACT,
            filter_term=tag,
            order=Order.CLICK_COUNT,
        )

    @use_cache(3600 * 24 * 14)  # Cache for 14 days
    async def get_radio(self, prov_radio_id: str) -> Radio:
        """Get radio station details.

        Raises:
            MediaNotFoundError: if the API returns no station for the id.
            ProviderUnavailableError: if the RadioBrowser API call fails.
        """
        try:
            radio = await self.radios.station(uuid=prov_radio_id)
        except RadioBrowserError as err:
            raise ProviderUnavailableError(
                f"Failed to fetch radio station {prov_radio_id}: {err}"
            ) from err
        if not radio:
            raise MediaNotFoundError(f"Radio station {prov_radio_id} not found")
        return await self._parse_radio(radio)

    async def _parse_radio(self, radio_obj: Station) -> Radio:
        """Parse Radio object from the Station object returned from the api.

        A website link and a thumbnail are only attached when the station
        actually provides a non-blank homepage / favicon.
        """
        radio = Radio(
            item_id=radio_obj.uuid,
            provider=self.domain,
            name=radio_obj.name,
            provider_mappings={
                ProviderMapping(
                    item_id=radio_obj.uuid,
                    provider_domain=self.domain,
                    provider_instance=self.instance_id,
                )
            },
        )
        radio.metadata.popularity = radio_obj.click_count
        # same non-blank guard as used for country favicons in get_country_folders
        if radio_obj.homepage and radio_obj.homepage.strip():
            radio.metadata.links = {MediaItemLink(type=LinkType.WEBSITE, url=radio_obj.homepage)}
        if radio_obj.favicon and radio_obj.favicon.strip():
            radio.metadata.images = UniqueList(
                [
                    MediaItemImage(
                        type=ImageType.THUMB,
                        path=radio_obj.favicon,
                        provider=self.instance_id,
                        remotely_accessible=True,
                    )
                ]
            )
        return radio

    async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
        """Get streamdetails for a radio station.

        Also reports a click for the station to RadioBrowser; that report is
        best-effort and never blocks playback. *media_type* is not used.

        Raises:
            MediaNotFoundError: if the API returns no station for the id.
            ProviderUnavailableError: if the station lookup fails.
        """
        try:
            stream = await self.radios.station(uuid=item_id)
        except RadioBrowserError as err:
            raise ProviderUnavailableError(
                f"Failed to get stream details for {item_id}: {err}"
            ) from err
        if not stream:
            raise MediaNotFoundError(f"Radio station {item_id} not found")

        try:
            await self.radios.station_click(uuid=item_id)
        except RadioBrowserError as err:
            # the click only feeds RadioBrowser's statistics; the stream url is
            # already known at this point, so do not fail playback over it
            self.logger.debug("Failed to register click for station %s: %s", item_id, err)

        return StreamDetails(
            provider=self.domain,
            item_id=item_id,
            audio_format=AudioFormat(
                content_type=ContentType.try_parse(stream.codec),
            ),
            media_type=MediaType.RADIO,
            stream_type=StreamType.HTTP,
            # prefer the resolved url, fall back to the raw one
            path=stream.url_resolved or stream.url,
            can_seek=False,
            allow_seek=False,
        )
506