/
/
/
"""Nugs.net music provider support for MusicAssistant."""
2
3from __future__ import annotations
4
5from collections.abc import AsyncGenerator
6from datetime import UTC, datetime
7from time import time
8from typing import TYPE_CHECKING, Any
9
10from aiohttp import ClientTimeout
11from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
12from music_assistant_models.enums import (
13 ConfigEntryType,
14 ContentType,
15 ImageType,
16 MediaType,
17 ProviderFeature,
18 StreamType,
19)
20from music_assistant_models.errors import (
21 InvalidDataError,
22 LoginFailed,
23 MediaNotFoundError,
24 ResourceTemporarilyUnavailable,
25)
26from music_assistant_models.media_items import (
27 Album,
28 Artist,
29 AudioFormat,
30 ItemMapping,
31 MediaItemImage,
32 MediaItemMetadata,
33 Playlist,
34 ProviderMapping,
35 RecommendationFolder,
36 Track,
37 UniqueList,
38)
39from music_assistant_models.streamdetails import StreamDetails
40
41from music_assistant.constants import CONF_PASSWORD, CONF_USERNAME
42from music_assistant.controllers.cache import use_cache
43from music_assistant.helpers.json import json_loads
44from music_assistant.helpers.util import infer_album_type, parse_title_and_version
45from music_assistant.models.music_provider import MusicProvider
46
47if TYPE_CHECKING:
48 from music_assistant_models.config_entries import ProviderConfig
49 from music_assistant_models.provider import ProviderManifest
50
51 from music_assistant.mass import MusicAssistant
52 from music_assistant.models import ProviderInstanceType
53
# Features this provider advertises; handed to NugsProvider by setup().
SUPPORTED_FEATURES = {
    # browsing and recommendations
    ProviderFeature.BROWSE,
    ProviderFeature.RECOMMENDATIONS,
    # library sync
    ProviderFeature.LIBRARY_ARTISTS,
    ProviderFeature.LIBRARY_ALBUMS,
    ProviderFeature.LIBRARY_PLAYLISTS,
    # catalog lookups
    ProviderFeature.ARTIST_ALBUMS,
}
62
63
async def setup(
    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
    """Create the Nugs.net provider instance for the given configuration."""
    provider = NugsProvider(mass, manifest, config, SUPPORTED_FEATURES)
    return provider
69
70
async def get_config_entries(
    mass: MusicAssistant,
    instance_id: str | None = None,
    action: str | None = None,
    values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
    """
    Return the config entries needed to set up this provider.

    The arguments are accepted for interface compatibility but are not used here.

    instance_id: id of an existing provider instance (None if new instance setup).
    action: [optional] action key called from config entries UI.
    values: the (intermediate) raw values for config entries sent with the action.
    """
    # ruff: noqa: ARG001
    username_entry = ConfigEntry(
        key=CONF_USERNAME,
        type=ConfigEntryType.STRING,
        label="Username",
        required=True,
    )
    password_entry = ConfigEntry(
        key=CONF_PASSWORD,
        type=ConfigEntryType.SECURE_STRING,
        label="Password",
        required=True,
    )
    return (username_entry, password_entry)
99
100
class NugsProvider(MusicProvider):
    """Provider implementation for Nugs.net.

    Talks to several nugs.net HTTP APIs (catalog, stash, subscriptions and the
    stream API) using a bearer token obtained through ``login()``.
    """

    # Access token cached by login(); None until the first successful login.
    _auth_token: str | None = None
    # Epoch timestamp (seconds) after which login() requests a fresh token.
    _token_expiry: float = 0

    async def handle_async_init(self) -> None:
        """Handle async initialization of the provider by logging in."""
        await self.login()

    async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
        """Retrieve library (favorite) artists from nugs.net."""
        artist_data = await self._get_all_items("stash", "artists/favorite/")
        for item in artist_data:
            # skip empty entries and entries without an id
            if item and item.get("id"):
                yield self._parse_artist(item)

    async def get_library_albums(self) -> AsyncGenerator[Album, None]:
        """Retrieve library (favorite) albums from the provider."""
        album_data = await self._get_all_items("stash", "releases/favorite")
        for item in album_data:
            if item and item.get("id"):
                yield self._parse_album(item)

    async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
        """Retrieve playlists from the provider."""
        playlist_data = await self._get_all_items("stash", "playlists/")
        for item in playlist_data:
            if item and item.get("id"):
                yield self._parse_playlist(item)

    @use_cache(3600 * 24 * 14)  # Cache for 14 days
    async def get_artist(self, prov_artist_id: str) -> Artist:
        """Get artist details by id.

        The artist object is taken from the artist's most recent release.

        Raises MediaNotFoundError when no release is returned for the artist.
        """
        endpoint = f"/releases/recent?limit=1&artistIds={prov_artist_id}"
        artist_response = await self._get_data("catalog", endpoint)
        releases = artist_response.get("items") or []
        if not releases:
            # previously surfaced as a bare IndexError
            raise MediaNotFoundError(f"Artist {prov_artist_id} not found")
        return self._parse_artist(releases[0]["artist"])

    @use_cache(3600 * 24 * 14)  # Cache for 14 days
    async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
        """Get a list of all albums for the given artist."""
        params = {
            "artistIds": prov_artist_id,
            "contentType": "any",
        }
        releases = await self._get_all_items("catalog", "releases/recent", **params)
        return [self._parse_album(item) for item in releases if item and item.get("id")]

    @use_cache(3600 * 24 * 14)  # Cache for 14 days
    async def get_album(self, prov_album_id: str) -> Album:
        """Get album details by id."""
        response = await self._get_data("catalog", f"shows/{prov_album_id}")
        return self._parse_album(response["Response"])

    @use_cache(3600 * 24 * 14)  # Cache for 14 days
    async def get_playlist(self, prov_playlist_id: str) -> Playlist:
        """Get full playlist details by id."""
        response = await self._get_data("stash", f"playlists/{prov_playlist_id}")
        # NOTE(review): the playlist object is read from the "items" key of the
        # response - confirm against the stash API.
        return self._parse_playlist(response["items"])

    @use_cache(3600 * 24 * 14)  # Cache for 14 days
    async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
        """Get all album tracks for given album id.

        Every returned track shares the album's artist, an ItemMapping for the
        album and (when available) the album image.
        """
        response = await self._get_data("catalog", f"shows/{prov_album_id}")
        album_data = response["Response"]
        artist = await self.get_artist(album_data["artistID"])
        album = self._get_item_mapping(
            MediaType.ALBUM, album_data["containerID"], album_data["containerInfo"]
        )
        # Without an album image, _parse_track falls back to the track's own image.
        image: str | None = None
        if album_data.get("img"):
            image = f"https://api.livedownloads.com{album_data['img']['url']}"
        return [
            self._parse_track(item, artist=artist, album=album, image_url=image)
            for item in album_data["tracks"]
            if item.get("trackID")
        ]

    @use_cache(3600)  # Cache for 1 hour
    async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
        """Get playlist tracks.

        All tracks are returned on page 0; any later page yields an empty list.
        """
        if page > 0:
            # paging not yet supported
            return []
        endpoint = f"/playlists/{prov_playlist_id}/playlist-tracks/all"
        nugs_result = await self._get_data("stash", endpoint)
        result: list[Track] = []
        for index, item in enumerate(nugs_result["items"], 1):
            track = self._parse_track(item)
            track.position = index  # 1-based position within the playlist
            result.append(track)
        return result

    async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
        """Return the content details for the given track when it will be streamed."""
        stream_url = await self._get_stream_url(item_id)
        return StreamDetails(
            item_id=item_id,
            provider=self.instance_id,
            audio_format=AudioFormat(
                content_type=ContentType.UNKNOWN,
            ),
            stream_type=StreamType.HTTP,
            path=stream_url,
        )

    @use_cache(3600 * 4)  # Cache for 4 hours
    async def recommendations(self) -> list[RecommendationFolder]:
        """Get this provider's recommendations.

        Returns three folders: most popular, recommended and recent shows.
        """
        popular_folder = RecommendationFolder(
            name="Most Popular",
            item_id="nugs_popular_shows",
            provider=self.instance_id,
        )
        recommended_folder = RecommendationFolder(
            name="Recommended Shows",
            item_id="nugs_recommended_shows",
            provider=self.instance_id,
        )
        recent_folder = RecommendationFolder(
            name="Recent Shows",
            item_id="nugs_recent_shows",
            provider=self.instance_id,
        )

        # Popular items are re-fetched one by one through the shows endpoint
        # before being parsed.
        popular_data = await self._get_data("catalog", "releases/popular", limit=20)
        for item in popular_data["items"]:
            response = await self._get_data("catalog", f"shows/{item['id']}")
            popular_folder.items.append(self._parse_album(response["Response"]))

        recommended_data = await self._get_data("catalog", "me/releases/recommendations")
        for item in recommended_data["items"]:
            recommended_folder.items.append(self._parse_album(item))

        recent_data = await self._get_data("catalog", "releases/recent", limit=50)
        for item in recent_data["items"]:
            recent_folder.items.append(self._parse_album(item))

        return [
            popular_folder,
            recommended_folder,
            recent_folder,
        ]

    def _parse_artist(self, artist_obj: dict[str, Any]) -> Artist:
        """Parse nugs artist object to generic layout.

        Accepts both key spellings used in this file:
        ``artistID``/``artistName`` and ``id``/``name``.
        """
        artist_id = artist_obj.get("artistID") or artist_obj.get("id")
        artist_name = artist_obj.get("artistName") or artist_obj.get("name")
        artist = Artist(
            item_id=str(artist_id),
            provider=self.instance_id,
            name=str(artist_name),
            provider_mappings={
                ProviderMapping(
                    item_id=str(artist_id),
                    provider_domain=self.domain,
                    provider_instance=self.instance_id,
                    url=f"https://catalog.nugs.net/api/v1/artists?ids={artist_id}",
                )
            },
        )
        if artist_obj.get("avatarImage"):
            artist.metadata.add_image(
                MediaItemImage(
                    type=ImageType.THUMB,
                    path=artist_obj["avatarImage"]["url"],
                    provider=self.instance_id,
                    remotely_accessible=True,
                )
            )
        return artist

    def _parse_album(self, album_obj: dict[str, Any]) -> Album:
        """Parse nugs release/show/album object to generic album layout.

        Missing artist, image or year information is skipped rather than
        treated as an error.
        """
        item_id = album_obj.get("releaseId") or album_obj.get("id") or album_obj.get("containerID")
        title = album_obj.get("title") or album_obj.get("containerInfo")
        name, version = parse_title_and_version(str(title))
        album = Album(
            item_id=str(item_id),
            provider=self.instance_id,
            name=name,
            version=version,
            provider_mappings={
                ProviderMapping(
                    item_id=str(item_id),
                    provider_domain=self.domain,
                    provider_instance=self.instance_id,
                )
            },
        )

        # Prefer the nested artist object; otherwise build one from the flat
        # keys. .get() keeps a missing key from raising before the guard below.
        artist_obj = album_obj.get("artist") or {
            "id": album_obj.get("artistID"),
            "name": album_obj.get("artistName"),
        }
        if artist_obj.get("name") and artist_obj.get("id"):
            album.artists.append(self._parse_artist(artist_obj))

        # "img" (relative path) takes precedence over "image" when both exist.
        path: str | None = None
        if album_obj.get("image"):
            path = album_obj["image"]["url"]
        if album_obj.get("img"):
            path = f"https://api.livedownloads.com{album_obj['img']['url']}"
        if path:
            album.metadata.add_image(
                MediaItemImage(
                    type=ImageType.THUMB,
                    path=path,
                    provider=self.instance_id,
                    remotely_accessible=True,
                )
            )

        year = album_obj.get("performanceDateYear")
        if not year:
            date = album_obj.get("performanceDate") or album_obj.get("albumreleaseDate")
            if date:
                # take the leading component of a dash separated date
                year = str(date).split("-")[0]
        if year:
            try:
                album.year = int(year)
            except (TypeError, ValueError):
                # an unparsable date should not break parsing of the whole album
                pass

        # No album type info in this provider so try and infer it
        album.album_type = infer_album_type(album.name, album.version)

        return album

    def _parse_playlist(self, playlist_obj: dict[str, Any]) -> Playlist:
        """Parse nugs playlist object to generic layout.

        Playlists are always created as non-editable.
        """
        # stringify the id, like the artist/album/track parsers do
        playlist_id = str(playlist_obj["id"])
        playlist = Playlist(
            item_id=playlist_id,
            provider=self.instance_id,
            name=playlist_obj["name"],
            provider_mappings={
                ProviderMapping(
                    item_id=playlist_id,
                    provider_domain=self.domain,
                    provider_instance=self.instance_id,
                )
            },
            is_editable=False,
        )
        image_url = playlist_obj.get("imageUrl")
        if image_url:
            playlist.metadata.add_image(
                MediaItemImage(
                    type=ImageType.THUMB,
                    path=image_url,
                    provider=self.instance_id,
                    remotely_accessible=True,
                )
            )
        return playlist

    def _parse_track(
        self,
        track_obj: dict[str, Any],
        artist: Artist | None = None,
        album: Album | ItemMapping | None = None,
        image_url: str | None = None,
    ) -> Track:
        """Parse response from inconsistent nugs.net APIs to a Track model object.

        artist: [optional] artist to attach in addition to any artist in track_obj.
        album: [optional] album (or mapping) to attach to the track.
        image_url: [optional] image to use; defaults to the track's own image.

        Raises InvalidDataError when no artist can be determined.
        """
        track_id = (
            track_obj.get("trackId") or track_obj.get("trackID") or track_obj.get("trackLabel")
        )
        track_name = track_obj.get("name") or track_obj.get("songTitle")
        name, version = parse_title_and_version(str(track_name))

        track = Track(
            item_id=str(track_id),
            provider=self.instance_id,
            name=name,
            version=version,
            provider_mappings={
                ProviderMapping(
                    item_id=str(track_id),
                    provider_domain=self.domain,
                    provider_instance=self.instance_id,
                    available=True,
                )
            },
        )

        if artist:
            track.artists.append(artist)
        embedded_artist = track_obj.get("artist")
        if embedded_artist and isinstance(embedded_artist, dict) and embedded_artist.get("id"):
            track.artists.append(
                self._get_item_mapping(
                    MediaType.ARTIST, embedded_artist["id"], embedded_artist["name"]
                )
            )
        if not track.artists:
            msg = "Track is missing artists"
            raise InvalidDataError(msg)

        if album:
            track.album = album
        if image_url is None and track_obj.get("image"):
            image_url = track_obj["image"]["url"]
        if image_url:
            track.metadata.add_image(
                MediaItemImage(
                    type=ImageType.THUMB,
                    path=image_url,
                    provider=self.instance_id,
                    remotely_accessible=True,
                )
            )
        duration = track_obj.get("durationSeconds") or track_obj.get("totalRunningTime")
        if duration:
            track.duration = int(duration)
        return track

    async def _get_stream_url(self, item_id: str) -> Any:
        """Resolve the stream link for a track.

        Subscription and user details are fetched first because the stream API
        expects them as query parameters.

        Raises MediaNotFoundError when the response carries no ``streamLink``.
        """
        date_format = "%m/%d/%Y %H:%M:%S"
        subscription_info = await self._get_data("subscription", "")
        # the timestamps carry no timezone information; they are treated as UTC
        dt_start = datetime.strptime(subscription_info["startedAt"], date_format).replace(
            tzinfo=UTC
        )
        dt_end = datetime.strptime(subscription_info["endsAt"], date_format).replace(tzinfo=UTC)
        user_info = await self._get_data("user", "")
        url = "https://streamapi.nugs.net/bigriver/subplayer.aspx"
        timeout = ClientTimeout(total=120)
        params = {
            "platformID": -1,
            "app": 1,
            "HLS": 1,
            "orgn": "websdk",
            "method": "subPlayer",
            "trackId": item_id,
            "subCostplanIDAccessList": subscription_info["plan"]["id"],
            "startDateStamp": int(dt_start.timestamp()),
            "endDateStamp": int(dt_end.timestamp()),
            "nn_userID": user_info["userId"],
            "subscriptionID": subscription_info["legacySubscriptionId"],
        }
        async with self.mass.http_session.get(
            url, params=params, ssl=True, timeout=timeout
        ) as response:
            response.raise_for_status()
            content = await response.text()
            stream = json_loads(content)
            if not stream.get("streamLink"):
                # exceptions do not %-format their arguments, so build the message here
                raise MediaNotFoundError(f"No stream found for song {item_id}.")
            return stream["streamLink"]

    def _get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping:
        """Build an ItemMapping for this provider instance."""
        return ItemMapping(
            media_type=media_type,
            item_id=key,
            provider=self.instance_id,
            name=name,
        )

    async def login(self) -> Any:
        """Login to nugs.net and return the token.

        A cached token is returned while it has not yet expired.

        Raises LoginFailed when credentials are missing or rejected (HTTP 401)
        and ResourceTemporarilyUnavailable on HTTP 502/503.
        """
        if self._auth_token and (self._token_expiry > time()):
            return self._auth_token
        username = self.config.get_value(CONF_USERNAME)
        password = self.config.get_value(CONF_PASSWORD)
        if not username or not password:
            msg = "Invalid login credentials"
            raise LoginFailed(msg)
        login_data = {
            "username": username,
            "password": password,
            "scope": "offline_access nugsnet:api nugsnet:legacyapi openid profile email",
            "grant_type": "password",
            "client_id": "Eg7HuH873H65r5rt325UytR5429",
        }
        url = "https://id.nugs.net/connect/token"
        timeout = ClientTimeout(total=120)
        async with self.mass.http_session.post(
            url, data=login_data, ssl=True, timeout=timeout
        ) as response:
            # Handle errors
            if response.status == 401:
                raise LoginFailed("Invalid Nugs.net username or password")
            # handle temporary server error
            if response.status in (502, 503):
                raise ResourceTemporarilyUnavailable(backoff_time=30)
            response.raise_for_status()
            token = await response.json()
            self._auth_token = token["access_token"]
            self._token_expiry = time() + token["expires_in"]
            return token["access_token"]

    async def _get_data(self, nugs_api: str, endpoint: str, **kwargs: Any) -> Any:
        """Return the requested data from one of various nugs.net API.

        nugs_api: one of "catalog", "stash", "subscription" or "user".
        endpoint: path appended to the catalog/stash base url (ignored otherwise).
        kwargs: sent as query parameters; an optional "tokeninfo" entry is
            removed and used as bearer token instead of calling login().

        Raises MediaNotFoundError for an unknown nugs_api or an HTTP 404.
        """
        timeout = ClientTimeout(total=120)
        tokeninfo = kwargs.pop("tokeninfo", None)
        if tokeninfo is None:
            tokeninfo = await self.login()
        headers = {"Authorization": f"Bearer {tokeninfo}"}
        api_urls = {
            "catalog": f"https://catalog.nugs.net/api/v1/{endpoint}",
            "stash": f"https://stash.nugs.net/api/v1/me/{endpoint}",
            "subscription": "https://subscriptions.nugs.net/api/v1/me/subscriptions",
            "user": "https://stash.nugs.net/api/v1/stash",
        }
        url = api_urls.get(nugs_api)
        if not url:
            raise MediaNotFoundError(f"{nugs_api} not found")
        async with self.mass.http_session.get(
            url, headers=headers, params=kwargs, ssl=True, timeout=timeout
        ) as response:
            if response.status == 404:
                raise MediaNotFoundError(f"{url} not found")
            response.raise_for_status()
            return await response.json()

    async def _get_all_items(
        self, nugs_api: str, endpoint: str, **kwargs: Any
    ) -> list[dict[str, Any]]:
        """Collect the items of every page of a paged endpoint.

        Pages of 100 items are requested until the reported "total" has been
        covered or a page comes back empty.
        """
        limit = 100
        offset = 0
        all_items: list[dict[str, Any]] = []
        while True:
            kwargs["limit"] = limit
            kwargs["offset"] = offset
            result = await self._get_data(nugs_api, endpoint, **kwargs)
            items = result.get("items") or []
            all_items += items
            # An empty page ends the loop even when "total" claims more, so an
            # inconsistent total cannot cause a long run of useless requests.
            if not items or result.get("total", 0) <= offset + limit:
                break
            offset += limit
        return all_items
548