music-assistant-server

20.2 KBPY
__init__.py
20.2 KB548 lines • python
1"""Nugs.net musicprovider support for MusicAssistant."""
2
3from __future__ import annotations
4
5from collections.abc import AsyncGenerator
6from datetime import UTC, datetime
7from time import time
8from typing import TYPE_CHECKING, Any
9
10from aiohttp import ClientTimeout
11from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
12from music_assistant_models.enums import (
13    ConfigEntryType,
14    ContentType,
15    ImageType,
16    MediaType,
17    ProviderFeature,
18    StreamType,
19)
20from music_assistant_models.errors import (
21    InvalidDataError,
22    LoginFailed,
23    MediaNotFoundError,
24    ResourceTemporarilyUnavailable,
25)
26from music_assistant_models.media_items import (
27    Album,
28    Artist,
29    AudioFormat,
30    ItemMapping,
31    MediaItemImage,
32    MediaItemMetadata,
33    Playlist,
34    ProviderMapping,
35    RecommendationFolder,
36    Track,
37    UniqueList,
38)
39from music_assistant_models.streamdetails import StreamDetails
40
41from music_assistant.constants import CONF_PASSWORD, CONF_USERNAME
42from music_assistant.controllers.cache import use_cache
43from music_assistant.helpers.json import json_loads
44from music_assistant.helpers.util import infer_album_type, parse_title_and_version
45from music_assistant.models.music_provider import MusicProvider
46
47if TYPE_CHECKING:
48    from music_assistant_models.config_entries import ProviderConfig
49    from music_assistant_models.provider import ProviderManifest
50
51    from music_assistant.mass import MusicAssistant
52    from music_assistant.models import ProviderInstanceType
53
# Feature set handed to NugsProvider when setup() creates the provider instance.
SUPPORTED_FEATURES = {
    # library sync
    ProviderFeature.LIBRARY_ARTISTS,
    ProviderFeature.LIBRARY_ALBUMS,
    ProviderFeature.LIBRARY_PLAYLISTS,
    # catalog lookups
    ProviderFeature.ARTIST_ALBUMS,
    # discovery
    ProviderFeature.BROWSE,
    ProviderFeature.RECOMMENDATIONS,
}
62
63
async def setup(
    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
    """Create the Nugs.net provider instance for the given configuration."""
    provider = NugsProvider(mass, manifest, config, SUPPORTED_FEATURES)
    return provider
69
70
async def get_config_entries(
    mass: MusicAssistant,
    instance_id: str | None = None,
    action: str | None = None,
    values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
    """
    Return the config entries required to set up this provider.

    The entries are static (a username and a password), so none of the
    arguments influence the result.

    instance_id: id of an existing provider instance (None if new instance setup).
    action: [optional] action key called from config entries UI.
    values: the (intermediate) raw values for config entries sent with the action.
    """
    # ruff: noqa: ARG001
    username_entry = ConfigEntry(
        key=CONF_USERNAME,
        type=ConfigEntryType.STRING,
        label="Username",
        required=True,
    )
    password_entry = ConfigEntry(
        key=CONF_PASSWORD,
        type=ConfigEntryType.SECURE_STRING,
        label="Password",
        required=True,
    )
    return (username_entry, password_entry)
99
100
101class NugsProvider(MusicProvider):
102    """Provider implementation for Nugs.net."""
103
104    _auth_token: str | None = None
105    _token_expiry: float = 0
106
107    async def handle_async_init(self) -> None:
108        """Handle async initialization of the provider."""
109        await self.login()
110
111    async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
112        """Retrieve library artists from nugs.net."""
113        artist_data = await self._get_all_items("stash", "artists/favorite/")
114        for item in artist_data:
115            if item and item["id"]:
116                yield self._parse_artist(item)
117
118    async def get_library_albums(self) -> AsyncGenerator[Album, None]:
119        """Retrieve library albums from the provider."""
120        album_data = await self._get_all_items("stash", "releases/favorite")
121        for item in album_data:
122            if item and item["id"]:
123                yield self._parse_album(item)
124
125    async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
126        """Retrieve playlists from the provider."""
127        playlist_data = await self._get_all_items("stash", "playlists/")
128        for item in playlist_data:
129            if item and item["id"]:
130                yield self._parse_playlist(item)
131
132    @use_cache(3600 * 24 * 14)  # Cache for 14 days
133    async def get_artist(self, prov_artist_id: str) -> Artist:
134        """Get artist details by id."""
135        endpoint = f"/releases/recent?limit=1&artistIds={prov_artist_id}"
136        artist_response = await self._get_data("catalog", endpoint)
137        artist_data = artist_response["items"][0]["artist"]
138        return self._parse_artist(artist_data)
139
140    @use_cache(3600 * 24 * 14)  # Cache for 14 days
141    async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
142        """Get a list of all albums for the given artist."""
143        params = {
144            "artistIds": prov_artist_id,
145            "contentType": "any",
146        }
147        return [
148            self._parse_album(item)
149            for item in await self._get_all_items("catalog", "releases/recent", **params)
150            if (item and item["id"])
151        ]
152
153    @use_cache(3600 * 24 * 14)  # Cache for 14 days
154    async def get_album(self, prov_album_id: str) -> Album:
155        """Get album details by id."""
156        endpoint = f"shows/{prov_album_id}"
157        response = await self._get_data("catalog", endpoint)
158        return self._parse_album(response["Response"])
159
160    @use_cache(3600 * 24 * 14)  # Cache for 14 days
161    async def get_playlist(self, prov_playlist_id: str) -> Playlist:
162        """Get full playlist details by id."""
163        endpoint = f"playlists/{prov_playlist_id}"
164        response = await self._get_data("stash", endpoint)
165        return self._parse_playlist(response["items"])
166
167    @use_cache(3600 * 24 * 14)  # Cache for 14 days
168    async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
169        """Get all album tracks for given album id."""
170        endpoint = f"shows/{prov_album_id}"
171        response = await self._get_data("catalog", endpoint)
172        album_data = response["Response"]
173        artist = await self.get_artist(album_data["artistID"])
174        album = self._get_item_mapping(
175            MediaType.ALBUM, album_data["containerID"], album_data["containerInfo"]
176        )
177        image = f"https://api.livedownloads.com{album_data['img']['url']}"
178        return [
179            self._parse_track(item, artist=artist, album=album, image_url=image)
180            for item in album_data["tracks"]
181            if item["trackID"]
182        ]
183
184    @use_cache(3600)  # Cache for 1 hour
185    async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
186        """Get playlist tracks."""
187        result: list[Track] = []
188        if page > 0:
189            # paging not yet supported
190            return []
191        endpoint = f"/playlists/{prov_playlist_id}/playlist-tracks/all"
192        nugs_result = await self._get_data("stash", endpoint)
193        for index, item in enumerate(nugs_result["items"], 1):
194            track = self._parse_track(item)
195            track.position = index
196            result.append(track)
197        return result
198
199    async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
200        """Return the content details for the given track when it will be streamed."""
201        stream_url = await self._get_stream_url(item_id)
202        return StreamDetails(
203            item_id=item_id,
204            provider=self.instance_id,
205            audio_format=AudioFormat(
206                content_type=ContentType.UNKNOWN,
207            ),
208            stream_type=StreamType.HTTP,
209            path=stream_url,
210        )
211
212    @use_cache(3600 * 4)  # Cache for 4 hours
213    async def recommendations(self) -> list[RecommendationFolder]:
214        """Get this provider's recommendations."""
215        popular = "releases/popular"
216        recom_shows = "me/releases/recommendations"
217        recent = "releases/recent"
218
219        popular_folder = RecommendationFolder(
220            name="Most Popular",
221            item_id="nugs_popular_shows",
222            provider=self.instance_id,
223        )
224        recommended_folder = RecommendationFolder(
225            name="Recommended Shows",
226            item_id="nugs_recommended_shows",
227            provider=self.instance_id,
228        )
229        recent_folder = RecommendationFolder(
230            name="Recent Shows",
231            item_id="nugs_recent_shows",
232            provider=self.instance_id,
233        )
234        popular_data = await self._get_data("catalog", popular, limit=20)
235        for item in popular_data["items"]:
236            endpoint = f"shows/{item['id']}"
237            response = await self._get_data("catalog", endpoint)
238            popular_folder.items.append(self._parse_album(response["Response"]))
239        recommended_data = await self._get_data("catalog", recom_shows)
240        for item in recommended_data["items"]:
241            recommended_folder.items.append(self._parse_album(item))
242        recent_data = await self._get_data("catalog", recent, limit=50)
243        for item in recent_data["items"]:
244            recent_folder.items.append(self._parse_album(item))
245
246        return [
247            popular_folder,
248            recommended_folder,
249            recent_folder,
250        ]
251
252    def _parse_artist(self, artist_obj: dict[str, Any]) -> Artist:
253        """Parse nugs artist object to generic layout."""
254        artist_id = artist_obj.get("artistID") or artist_obj.get("id")
255        artist_name = artist_obj.get("artistName") or artist_obj.get("name")
256        artist = Artist(
257            item_id=str(artist_id),
258            provider=self.instance_id,
259            name=str(artist_name),
260            provider_mappings={
261                ProviderMapping(
262                    item_id=str(artist_id),
263                    provider_domain=self.domain,
264                    provider_instance=self.instance_id,
265                    url=f"https://catalog.nugs.net/api/v1/artists?ids={artist_id}",
266                )
267            },
268        )
269        if artist_obj.get("avatarImage"):
270            artist.metadata.add_image(
271                MediaItemImage(
272                    type=ImageType.THUMB,
273                    path=artist_obj["avatarImage"]["url"],
274                    provider=self.instance_id,
275                    remotely_accessible=True,
276                )
277            )
278        return artist
279
280    def _parse_album(self, album_obj: dict[str, Any]) -> Album:
281        """Parse nugs release/show/album object to generic album layout."""
282        item_id = album_obj.get("releaseId") or album_obj.get("id") or album_obj.get("containerID")
283        title = album_obj.get("title") or album_obj.get("containerInfo")
284        name, version = parse_title_and_version(str(title))
285        album = Album(
286            item_id=str(item_id),
287            provider=self.instance_id,
288            name=name,
289            version=version,
290            provider_mappings={
291                ProviderMapping(
292                    item_id=str(item_id),
293                    provider_domain=self.domain,
294                    provider_instance=self.instance_id,
295                )
296            },
297        )
298
299        artist_obj = album_obj.get("artist", False) or {
300            "id": album_obj["artistID"],
301            "name": album_obj["artistName"],
302        }
303        if artist_obj.get("name") and artist_obj.get("id"):
304            album.artists.append(self._parse_artist(artist_obj))
305
306        path: str | None = None
307        if album_obj.get("image"):
308            path = album_obj["image"]["url"]
309        if album_obj.get("img"):
310            path = f"https://api.livedownloads.com{album_obj['img']['url']}"
311        if path:
312            album.metadata.add_image(
313                MediaItemImage(
314                    type=ImageType.THUMB,
315                    path=path,
316                    provider=self.instance_id,
317                    remotely_accessible=True,
318                )
319            )
320        year = album_obj.get("performanceDateYear", False)
321        if not year:
322            date = album_obj.get("performanceDate", False) or album_obj.get(
323                "albumreleaseDate", False
324            )
325            if date:
326                year = date.split("-")[0]
327        if year:
328            album.year = int(year)
329
330        # No album type info in this provider so try and infer it
331        album.album_type = infer_album_type(album.name, album.version)
332
333        return album
334
335    def _parse_playlist(self, playlist_obj: dict[str, Any]) -> Playlist:
336        """Parse nugs playlist object to generic layout."""
337        return Playlist(
338            item_id=playlist_obj["id"],
339            provider=self.instance_id,
340            name=playlist_obj["name"],
341            provider_mappings={
342                ProviderMapping(
343                    item_id=playlist_obj["id"],
344                    provider_domain=self.domain,
345                    provider_instance=self.instance_id,
346                )
347            },
348            metadata=MediaItemMetadata(
349                images=UniqueList(
350                    [
351                        MediaItemImage(
352                            type=ImageType.THUMB,
353                            path=playlist_obj["imageUrl"],
354                            provider=self.instance_id,
355                            remotely_accessible=True,
356                        )
357                    ]
358                ),
359            ),
360            is_editable=False,
361        )
362
363    def _parse_track(
364        self,
365        track_obj: dict[str, Any],
366        artist: Artist | None = None,
367        album: Album | ItemMapping | None = None,
368        image_url: str | None = None,
369    ) -> Track:
370        """Parse response from inconsistent nugs.net APIs to a Track model object."""
371        track_id = (
372            track_obj.get("trackId") or track_obj.get("trackID") or track_obj.get("trackLabel")
373        )
374        track_name = track_obj.get("name") or track_obj.get("songTitle")
375        name, version = parse_title_and_version(str(track_name))
376
377        track = Track(
378            item_id=str(track_id),
379            provider=self.instance_id,
380            name=name,
381            version=version,
382            provider_mappings={
383                ProviderMapping(
384                    item_id=str(track_id),
385                    provider_domain=self.domain,
386                    provider_instance=self.instance_id,
387                    available=True,
388                )
389            },
390        )
391
392        if artist:
393            track.artists.append(artist)
394        if (
395            track_obj.get("artist")
396            and isinstance(track_obj.get("artist"), dict)
397            and track_obj["artist"].get("id")
398        ):
399            track.artists.append(
400                self._get_item_mapping(
401                    MediaType.ARTIST, track_obj["artist"]["id"], track_obj["artist"]["name"]
402                )
403            )
404        if not track.artists:
405            msg = "Track is missing artists"
406            raise InvalidDataError(msg)
407
408        if album:
409            track.album = album
410        if image_url is None and track_obj.get("image"):
411            image_url = track_obj["image"]["url"]
412        if image_url:
413            track.metadata.add_image(
414                MediaItemImage(
415                    type=ImageType.THUMB,
416                    path=image_url,
417                    provider=self.instance_id,
418                    remotely_accessible=True,
419                )
420            )
421        duration = track_obj.get("durationSeconds") or track_obj.get("totalRunningTime")
422        if duration:
423            track.duration = int(duration)
424        return track
425
426    async def _get_stream_url(self, item_id: str) -> Any:
427        subscription_info = await self._get_data("subscription", "")
428        dt_start = datetime.strptime(subscription_info["startedAt"], "%m/%d/%Y %H:%M:%S").replace(
429            tzinfo=UTC
430        )
431        dt_end = datetime.strptime(subscription_info["endsAt"], "%m/%d/%Y %H:%M:%S").replace(
432            tzinfo=UTC
433        )
434        user_info = await self._get_data("user", "")
435        url = "https://streamapi.nugs.net/bigriver/subplayer.aspx"
436        timeout = ClientTimeout(total=120)
437        params = {
438            "platformID": -1,
439            "app": 1,
440            "HLS": 1,
441            "orgn": "websdk",
442            "method": "subPlayer",
443            "trackId": item_id,
444            "subCostplanIDAccessList": subscription_info["plan"]["id"],
445            "startDateStamp": int(dt_start.timestamp()),
446            "endDateStamp": int(dt_end.timestamp()),
447            "nn_userID": user_info["userId"],
448            "subscriptionID": subscription_info["legacySubscriptionId"],
449        }
450        async with (
451            self.mass.http_session.get(url, params=params, ssl=True, timeout=timeout) as response,
452        ):
453            response.raise_for_status()
454            content = await response.text()
455            stream = json_loads(content)
456            if not stream.get("streamLink"):
457                raise MediaNotFoundError("No stream found for song %s.", item_id)
458            return stream["streamLink"]
459
460    def _get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping:
461        return ItemMapping(
462            media_type=media_type,
463            item_id=key,
464            provider=self.instance_id,
465            name=name,
466        )
467
468    async def login(self) -> Any:
469        """Login to nugs.net and return the token."""
470        if self._auth_token and (self._token_expiry > time()):
471            return self._auth_token
472        if not self.config.get_value(CONF_USERNAME) or not self.config.get_value(CONF_PASSWORD):
473            msg = "Invalid login credentials"
474            raise LoginFailed(msg)
475        login_data = {
476            "username": self.config.get_value(CONF_USERNAME),
477            "password": self.config.get_value(CONF_PASSWORD),
478            "scope": "offline_access nugsnet:api nugsnet:legacyapi openid profile email",
479            "grant_type": "password",
480            "client_id": "Eg7HuH873H65r5rt325UytR5429",
481        }
482        token = None
483        url = "https://id.nugs.net/connect/token"
484        timeout = ClientTimeout(total=120)
485        async with (
486            self.mass.http_session.post(
487                url, data=login_data, ssl=True, timeout=timeout
488            ) as response,
489        ):
490            # Handle errors
491            if response.status == 401:
492                raise LoginFailed("Invalid Nugs.net username or password")
493            # handle temporary server error
494            if response.status in (502, 503):
495                raise ResourceTemporarilyUnavailable(backoff_time=30)
496            response.raise_for_status()
497            token = await response.json()
498            self._auth_token = token["access_token"]
499            self._token_expiry = time() + token["expires_in"]
500        return token["access_token"]
501
502    async def _get_data(self, nugs_api: str, endpoint: str, **kwargs: Any) -> Any:
503        """Return the requested data from one of various nugs.net API."""
504        headers = {}
505        url: str | None = None
506        timeout = ClientTimeout(total=120)
507        tokeninfo = kwargs.pop("tokeninfo", None)
508        if tokeninfo is None:
509            tokeninfo = await self.login()
510        headers = {"Authorization": f"Bearer {tokeninfo}"}
511        if nugs_api == "catalog":
512            url = f"https://catalog.nugs.net/api/v1/{endpoint}"
513        if nugs_api == "stash":
514            url = f"https://stash.nugs.net/api/v1/me/{endpoint}"
515        if nugs_api == "subscription":
516            url = "https://subscriptions.nugs.net/api/v1/me/subscriptions"
517        if nugs_api == "user":
518            url = "https://stash.nugs.net/api/v1/stash"
519        if not url:
520            raise MediaNotFoundError(f"{nugs_api} not found")
521        async with (
522            self.mass.http_session.get(
523                url, headers=headers, params=kwargs, ssl=True, timeout=timeout
524            ) as response,
525        ):
526            if response.status == 404:
527                raise MediaNotFoundError(f"{url} not found")
528            response.raise_for_status()
529            return await response.json()
530
531    async def _get_all_items(
532        self, nugs_api: str, endpoint: str, **kwargs: Any
533    ) -> list[dict[str, Any]]:
534        limit = 100
535        offset = 0
536        total = 0
537        all_items = []
538        while True:
539            kwargs["limit"] = limit
540            kwargs["offset"] = offset
541            result = await self._get_data(nugs_api, endpoint, **kwargs)
542            total = result["total"]
543            all_items += result["items"]
544            if total <= offset + limit:
545                break
546            offset += limit
547        return all_items
548