/
/
/
1"""iBroadcast support for MusicAssistant."""
2
3from __future__ import annotations
4
5from typing import TYPE_CHECKING, Any
6
7from aiohttp import ClientSession
8from ibroadcastaio import IBroadcastClient
9from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
10from music_assistant_models.enums import (
11 ConfigEntryType,
12 ContentType,
13 ImageType,
14 MediaType,
15 ProviderFeature,
16 StreamType,
17)
18from music_assistant_models.errors import InvalidDataError, LoginFailed
19from music_assistant_models.media_items import (
20 Album,
21 Artist,
22 AudioFormat,
23 ItemMapping,
24 MediaItemImage,
25 Playlist,
26 ProviderMapping,
27 Track,
28 UniqueList,
29)
30from music_assistant_models.streamdetails import StreamDetails
31
32from music_assistant.constants import (
33 CONF_PASSWORD,
34 CONF_USERNAME,
35 UNKNOWN_ARTIST,
36 VARIOUS_ARTISTS_MBID,
37 VARIOUS_ARTISTS_NAME,
38)
39from music_assistant.controllers.cache import use_cache
40from music_assistant.helpers.util import infer_album_type, parse_title_and_version
41from music_assistant.models.music_provider import MusicProvider
42
# Provider features implemented by this module; passed to the provider in setup().
SUPPORTED_FEATURES = {
    # library sync
    ProviderFeature.LIBRARY_ALBUMS,
    ProviderFeature.LIBRARY_ARTISTS,
    ProviderFeature.LIBRARY_PLAYLISTS,
    ProviderFeature.LIBRARY_TRACKS,
    # navigation
    ProviderFeature.ARTIST_ALBUMS,
    ProviderFeature.BROWSE,
}
51
52
53if TYPE_CHECKING:
54 from collections.abc import AsyncGenerator
55
56 from music_assistant_models.config_entries import ProviderConfig
57 from music_assistant_models.provider import ProviderManifest
58
59 from music_assistant.mass import MusicAssistant
60 from music_assistant.models import ProviderInstanceType
61
62
async def setup(
    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
    """Create the iBroadcast provider instance for the given configuration.

    Raises LoginFailed when the username or the password has no value in the config.
    """
    required_keys = (CONF_USERNAME, CONF_PASSWORD)
    # all() over a generator stops at the first missing value, username first
    credentials_present = all(config.get_value(key) for key in required_keys)
    if credentials_present:
        return IBroadcastProvider(mass, manifest, config, SUPPORTED_FEATURES)
    msg = "Invalid login credentials"
    raise LoginFailed(msg)
71
72
async def get_config_entries(
    mass: MusicAssistant,
    instance_id: str | None = None,
    action: str | None = None,
    values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
    """
    Return the config entries needed to set up this provider.

    The arguments are accepted but not used; the same two required entries
    (username and password) are always returned.

    instance_id: id of an existing provider instance (None if new instance setup).
    action: [optional] action key called from config entries UI.
    values: the (intermediate) raw values for config entries sent with the action.
    """
    # ruff: noqa: ARG001
    username_entry = ConfigEntry(
        key=CONF_USERNAME,
        type=ConfigEntryType.STRING,
        label="Username",
        required=True,
    )
    # the password uses the SECURE_STRING entry type
    password_entry = ConfigEntry(
        key=CONF_PASSWORD,
        type=ConfigEntryType.SECURE_STRING,
        label="Password",
        required=True,
    )
    return (username_entry, password_entry)
101
102
class IBroadcastProvider(MusicProvider):
    """Provider for iBroadcast.

    Library data is read through an ``IBroadcastClient``; the dictionaries it
    returns are converted into Music Assistant media items by the ``_parse_*``
    helpers of this class.
    """

    # Errors that mean "this single item could not be converted"; listing methods
    # log them and skip the item instead of aborting the whole listing.
    _PARSE_ERRORS = (KeyError, TypeError, InvalidDataError, IndexError)

    # Taken from the login response in handle_async_init().
    _user_id: str
    _token: str
    # Client used for every library lookup.
    _client: IBroadcastClient

    async def handle_async_init(self) -> None:
        """Set up the iBroadcast provider: log in and load the library."""
        # NOTE(review): the session is closed when this block exits while
        # self._client keeps a reference to it. Presumably every later client
        # call is answered from the data loaded by refresh_library() - confirm
        # against ibroadcastaio before relying on it.
        async with ClientSession() as session:
            self._client = IBroadcastClient(session)
            status = await self._client.login(
                self.config.get_value(CONF_USERNAME),
                self.config.get_value(CONF_PASSWORD),
            )
            self._user_id = status["user"]["id"]
            self._token = status["user"]["token"]

            # temporary call to refresh library until ibroadcast provides a detailed api
            await self._client.refresh_library()

    async def get_library_albums(self) -> AsyncGenerator[Album, None]:
        """Retrieve library albums from iBroadcast, skipping albums that fail to parse."""
        for album_obj in (await self._client.get_albums()).values():
            try:
                yield await self._parse_album(album_obj)
            except self._PARSE_ERRORS as error:
                self.logger.debug("Parse album failed: %s", album_obj, exc_info=error)

    @use_cache(3600 * 24 * 7)  # Cache for 7 days
    async def get_album(self, prov_album_id: str) -> Album:
        """Get full album details by id."""
        album_obj = await self._client.get_album(int(prov_album_id))
        return await self._parse_album(album_obj)

    async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
        """Retrieve all library artists from iBroadcast, skipping unparsable ones."""
        for artist_obj in (await self._client.get_artists()).values():
            try:
                yield await self._parse_artist(artist_obj)
            except self._PARSE_ERRORS as error:
                self.logger.debug("Parse artist failed: %s", artist_obj, exc_info=error)

    @use_cache(3600 * 24 * 7)  # Cache for 7 days
    async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
        """Get a list of albums for the given artist.

        The "Various Artists" id handed out by this provider is translated back to
        artist id 0, which is the value _parse_album() treats as various artists.
        """
        # Resolve the id once; int() on the Various Artists MBID would raise ValueError.
        artist_id = 0 if prov_artist_id == VARIOUS_ARTISTS_MBID else int(prov_artist_id)
        albums: list[Album] = []
        for album_obj in (await self._client.get_albums()).values():
            if album_obj["artist_id"] != artist_id:
                continue
            try:
                albums.append(await self._parse_album(album_obj))
            except self._PARSE_ERRORS as error:
                self.logger.debug("Parse album failed: %s", album_obj, exc_info=error)
        return albums

    @use_cache(3600 * 24 * 7)  # Cache for 7 days
    async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
        """Get album tracks for given album id."""
        album_obj = await self._client.get_album(int(prov_album_id))
        return await self._get_tracks(album_obj["tracks"])

    @use_cache(3600 * 24 * 7)  # Cache for 7 days
    async def get_track(self, prov_track_id: str) -> Track:
        """Get full track details by id."""
        track_obj = await self._client.get_track(int(prov_track_id))
        return await self._parse_track(track_obj)

    @use_cache(3600 * 24 * 7)  # Cache for 7 days
    async def get_artist(self, prov_artist_id: str) -> Artist:
        """Get full artist details by id.

        The "Various Artists" id is not numeric, so it is answered directly
        instead of being passed through int().
        """
        if prov_artist_id == VARIOUS_ARTISTS_MBID:
            return self._get_various_artists()
        artist_obj = await self._client.get_artist(int(prov_artist_id))
        return await self._parse_artist(artist_obj)

    async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
        """Retrieve library tracks from iBroadcast, skipping tracks that fail to parse."""
        for track_obj in (await self._client.get_tracks()).values():
            try:
                yield await self._parse_track(track_obj)
            except self._PARSE_ERRORS as error:
                self.logger.debug("Parse track failed: %s", track_obj, exc_info=error)

    def _get_various_artists(self) -> Artist:
        """Build the Artist item used for "Various Artists"."""
        return Artist(
            item_id=VARIOUS_ARTISTS_MBID,
            name=VARIOUS_ARTISTS_NAME,
            provider=self.instance_id,
            provider_mappings={
                ProviderMapping(
                    item_id=VARIOUS_ARTISTS_MBID,
                    provider_domain=self.domain,
                    provider_instance=self.instance_id,
                )
            },
        )

    def _get_artist_item_mapping(self, artist_id: str, artist_obj: dict[str, Any]) -> ItemMapping:
        """Create an artist ItemMapping from an artist id and its (possibly missing) object.

        Artist id 0, or a missing id combined with the name "Various Artists", maps
        to the Various Artists id. A missing artist object or name falls back to a
        placeholder name instead of raising.
        """
        name = artist_obj.get("name") if artist_obj else None
        # str() so that both the int 0 and the string "0" are recognised
        if str(artist_id) == "0" or (not artist_id and name == "Various Artists"):
            return self._get_item_mapping(
                MediaType.ARTIST, VARIOUS_ARTISTS_MBID, name or VARIOUS_ARTISTS_NAME
            )
        return self._get_item_mapping(MediaType.ARTIST, artist_id, name or UNKNOWN_ARTIST)

    def _get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping:
        """Create an ItemMapping for this provider instance."""
        return ItemMapping(
            media_type=media_type,
            # callers pass the ids as found in the iBroadcast objects; store them as str
            item_id=str(key),
            provider=self.instance_id,
            name=name,
        )

    async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
        """Retrieve playlists from iBroadcast, skipping unparsable ones."""
        for playlist_obj in (await self._client.get_playlists()).values():
            # Skip the auto generated playlist
            if playlist_obj["type"] in ("recently-played", "thumbsup"):
                continue
            try:
                yield await self._parse_playlist(playlist_obj)
            except self._PARSE_ERRORS as error:
                self.logger.debug("Parse playlist failed: %s", playlist_obj, exc_info=error)

    @use_cache(3600 * 24 * 7)  # Cache for 7 days
    async def get_playlist(self, prov_playlist_id: str) -> Playlist:
        """Get full playlist details by id.

        Raises InvalidDataError when the playlist object cannot be parsed.
        """
        playlist_obj = await self._client.get_playlist(int(prov_playlist_id))
        try:
            return await self._parse_playlist(playlist_obj)
        except self._PARSE_ERRORS as error:
            self.logger.debug("Parse playlist failed: %s", playlist_obj, exc_info=error)
            # There is no playlist to return here, so report the failure explicitly.
            msg = f"Unable to parse playlist {prov_playlist_id}"
            raise InvalidDataError(msg) from error

    @use_cache(3600)  # Cache for 1 hour
    async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
        """Get playlist tracks.

        All tracks are returned on page 0; every later page is empty.
        """
        if page > 0:
            return []
        playlist_obj = await self._client.get_playlist(int(prov_playlist_id))
        if "tracks" not in playlist_obj:
            return []
        return await self._get_tracks(playlist_obj["tracks"], is_playlist=True)

    async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
        """Return the content details for the given track when it will be streamed."""
        # How to buildup a stream url:
        # [streaming_server]/[url]?Expires=[now]&Signature=[user token]&file_id=[file ID]
        # &user_id=[user ID]&platform=[your app name]&version=[your app version]
        # See https://devguide.ibroadcast.com/?p=streaming-server
        url = await self._client.get_full_stream_url(int(item_id), "music-assistant")

        return StreamDetails(
            provider=self.instance_id,
            item_id=item_id,
            audio_format=AudioFormat(
                content_type=ContentType.UNKNOWN,
            ),
            stream_type=StreamType.HTTP,
            path=url,
            can_seek=True,
            allow_seek=True,
        )

    async def _get_tracks(self, track_ids: list[int], is_playlist: bool = False) -> list[Track]:
        """Retrieve a list of tracks based on provided track IDs.

        Unknown ids and tracks that fail to parse are skipped. For playlists the
        1-based index of the id in track_ids is stored as the track position.
        """
        tracks: list[Track] = []
        for position, track_id in enumerate(track_ids, 1):
            track_obj = await self._client.get_track(track_id)
            if track_obj is None:
                continue
            try:
                track = await self._parse_track(track_obj)
            except self._PARSE_ERRORS as error:
                self.logger.debug("Parse track failed: %s", track_obj, exc_info=error)
                continue
            if is_playlist:
                track.position = position
            tracks.append(track)
        return tracks

    async def _parse_artist(self, artist_obj: dict[str, Any]) -> Artist:
        """Parse an iBroadcast artist object to an Artist model object."""
        artist_id = artist_obj["artist_id"]
        artist = Artist(
            item_id=str(artist_id),
            name=artist_obj["name"],
            provider=self.instance_id,
            provider_mappings={
                ProviderMapping(
                    item_id=str(artist_id),
                    provider_domain=self.domain,
                    provider_instance=self.instance_id,
                    url=f"https://media.ibroadcast.com/?view=container&container_id={artist_id}&type=artists",
                )
            },
        )
        # Artwork
        if "artwork_id" in artist_obj:
            artist.metadata.images = UniqueList(
                [self._get_artwork_object(await self._client.get_artist_artwork_url(artist_id))]
            )
        return artist

    async def _parse_album(self, album_obj: dict[str, Any]) -> Album:
        """Parse an iBroadcast album object to an Album model object."""
        album_id = album_obj["album_id"]
        name, version = parse_title_and_version(album_obj["name"])
        album = Album(
            item_id=str(album_id),
            provider=self.instance_id,
            name=name,
            year=album_obj["year"],
            version=version,
            provider_mappings={
                ProviderMapping(
                    item_id=str(album_id),
                    provider_domain=self.domain,
                    provider_instance=self.instance_id,
                    audio_format=AudioFormat(content_type=ContentType.MPEG),
                    url=f"https://media.ibroadcast.com/?view=container&container_id={album_id}&type=albums",
                )
            },
        )
        artist_id = album_obj["artist_id"]
        if artist_id == 0:
            # artist id 0 is handled as "Various Artists"
            album.artists.append(self._get_various_artists())
        else:
            # look the artist up once; fall back to a placeholder name if it is unknown
            artist_obj = await self._client.get_artist(artist_id)
            artist_name = artist_obj["name"] if artist_obj else UNKNOWN_ARTIST
            album.artists.append(
                self._get_item_mapping(MediaType.ARTIST, artist_id, artist_name)
            )

        if album_obj.get("rating") == 5:
            album.favorite = True
        # iBroadcast doesn't seem to know album type - try inference
        album.album_type = infer_album_type(name, version)

        # Only set an image when the client returns an artwork url
        artwork_url = await self._client.get_album_artwork_url(album_id)
        if artwork_url:
            album.metadata.images = UniqueList([self._get_artwork_object(artwork_url)])
        return album

    def _get_artwork_object(self, url: str) -> MediaItemImage:
        """Wrap an artwork url in a remotely accessible thumbnail image."""
        return MediaItemImage(
            type=ImageType.THUMB,
            path=url,
            provider=self.instance_id,
            remotely_accessible=True,
        )

    async def _parse_track(self, track_obj: dict[str, Any]) -> Track:
        """Parse an iBroadcast track object to a Track model object.

        Raises InvalidDataError when the track ends up without any artist.
        """
        track_id = track_obj["track_id"]
        track = Track(
            item_id=str(track_id),
            provider=self.instance_id,
            name=track_obj["title"],
            provider_mappings={
                ProviderMapping(
                    item_id=str(track_id),
                    provider_domain=self.domain,
                    provider_instance=self.instance_id,
                    available=not track_obj["trashed"],
                    audio_format=AudioFormat(
                        content_type=ContentType.MPEG,
                    ),
                )
            },
        )
        # Tracks without an album id have no album; keep the name bound either way.
        album_id = track_obj["album_id"]
        album = await self._client.get_album(album_id) if album_id else None

        if track_obj.get("rating") == 5:
            track.favorite = True
        if "length" in track_obj and str(track_obj["length"]).isdigit():
            # int() so a digit-only string is stored as a number as well
            track.duration = int(track_obj["length"])
        # use the disc number if available
        if album and album["disc"] > 0:
            track.disc_number = album["disc"]
            track.track_number = int(track_obj["track"])
        # otherwise, track number might look like 201, meaning, disc 2, track 1
        elif track_obj["track"] > 99:
            combined_number = str(track_obj["track"])
            track.disc_number = int(combined_number[:1])
            track.track_number = int(combined_number[1:])
        # or just the track number and no disc number
        else:
            track.track_number = int(track_obj["track"])

        # Track artists
        if "artist_id" in track_obj:
            artist_id = track_obj["artist_id"]
            track.artists = UniqueList(
                [self._get_artist_item_mapping(artist_id, await self._client.get_artist(artist_id))]
            )
            # additional artists structure: 'artists_additional': [[artist id, phrase, type]]
            for additional_artist in track_obj.get("artists_additional") or []:
                additional_id = additional_artist[0]
                if not additional_id:
                    continue
                track.artists.append(
                    self._get_artist_item_mapping(
                        additional_id, await self._client.get_artist(additional_id)
                    )
                )
        # guard that track has valid artists
        if not track.artists:
            msg = "Track is missing artists"
            raise InvalidDataError(msg)

        # Artwork
        artwork_url = await self._client.get_track_artwork_url(track_id)
        if artwork_url:
            track.metadata.images = UniqueList([self._get_artwork_object(artwork_url)])

        # Genre
        genres: set[str] = set()
        if track_obj["genre"]:
            genres.add(track_obj["genre"])
        additional_genres = track_obj["genres_additional"]
        if isinstance(additional_genres, (list, tuple, set)):
            # a collection cannot be added to a set as one element; add its entries
            genres.update(genre for genre in additional_genres if genre)
        elif additional_genres:
            genres.add(additional_genres)
        track.metadata.genres = genres

        # album info
        if album:
            track.album = self._get_item_mapping(MediaType.ALBUM, album_id, album["name"])
        return track

    async def _parse_playlist(self, playlist_obj: dict[str, Any]) -> Playlist:
        """Parse an iBroadcast playlist object to a Playlist model object."""
        playlist_id = str(playlist_obj["playlist_id"])
        playlist = Playlist(
            item_id=playlist_id,
            provider=self.instance_id,
            name=playlist_obj["name"],
            provider_mappings={
                ProviderMapping(
                    item_id=playlist_id,
                    provider_domain=self.domain,
                    provider_instance=self.instance_id,
                )
            },
        )
        # Can be supported in future, the API has options available
        playlist.is_editable = False
        artwork_url = await self._client.get_playlist_artwork_url(int(playlist_id))
        if artwork_url:
            playlist.metadata.images = UniqueList([self._get_artwork_object(artwork_url)])
        if "description" in playlist_obj:
            playlist.metadata.description = playlist_obj["description"]
        return playlist
467