music-assistant-server

36.9 KBPY
__init__.py
36.9 KB924 lines • python
1"""Qobuz musicprovider support for MusicAssistant."""
2
3from __future__ import annotations
4
5import datetime
6import hashlib
7import time
8from contextlib import suppress
9from typing import TYPE_CHECKING, Any, cast
10
11from aiohttp import client_exceptions
12from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
13from music_assistant_models.enums import (
14    AlbumType,
15    ConfigEntryType,
16    ContentType,
17    ExternalID,
18    ImageType,
19    MediaType,
20    ProviderFeature,
21    StreamType,
22)
23from music_assistant_models.errors import (
24    InvalidDataError,
25    LoginFailed,
26    MediaNotFoundError,
27    ResourceTemporarilyUnavailable,
28)
29from music_assistant_models.media_items import (
30    Album,
31    Artist,
32    AudioFormat,
33    MediaItemImage,
34    MediaItemType,
35    Playlist,
36    ProviderMapping,
37    SearchResults,
38    Track,
39)
40from music_assistant_models.streamdetails import StreamDetails
41
42from music_assistant.constants import (
43    CONF_PASSWORD,
44    CONF_USERNAME,
45    VARIOUS_ARTISTS_MBID,
46    VARIOUS_ARTISTS_NAME,
47)
48from music_assistant.controllers.cache import use_cache
49from music_assistant.helpers.app_vars import app_var  # type: ignore[attr-defined]
50from music_assistant.helpers.json import json_loads
51from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
52from music_assistant.helpers.util import (
53    infer_album_type,
54    lock,
55    parse_title_and_version,
56    try_parse_int,
57)
58from music_assistant.models.music_provider import MusicProvider
59
60if TYPE_CHECKING:
61    from collections.abc import AsyncGenerator
62
63    from music_assistant_models.config_entries import ProviderConfig
64    from music_assistant_models.provider import ProviderManifest
65
66    from music_assistant import MusicAssistant
67    from music_assistant.models import ProviderInstanceType
68
69
70SUPPORTED_FEATURES = {
71    ProviderFeature.LIBRARY_ARTISTS,
72    ProviderFeature.LIBRARY_ALBUMS,
73    ProviderFeature.LIBRARY_TRACKS,
74    ProviderFeature.LIBRARY_PLAYLISTS,
75    ProviderFeature.LIBRARY_ARTISTS_EDIT,
76    ProviderFeature.LIBRARY_ALBUMS_EDIT,
77    ProviderFeature.LIBRARY_PLAYLISTS_EDIT,
78    ProviderFeature.LIBRARY_TRACKS_EDIT,
79    ProviderFeature.PLAYLIST_TRACKS_EDIT,
80    ProviderFeature.PLAYLIST_CREATE,
81    ProviderFeature.BROWSE,
82    ProviderFeature.SEARCH,
83    ProviderFeature.ARTIST_ALBUMS,
84    ProviderFeature.ARTIST_TOPTRACKS,
85}
86
87VARIOUS_ARTISTS_ID = "145383"
88
89CONF_QUALITY = "quality"
90
91
92async def setup(
93    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
94) -> ProviderInstanceType:
95    """Initialize provider(instance) with given configuration."""
96    return QobuzProvider(mass, manifest, config, SUPPORTED_FEATURES)
97
98
99async def get_config_entries(
100    mass: MusicAssistant,
101    instance_id: str | None = None,
102    action: str | None = None,
103    values: dict[str, ConfigValueType] | None = None,
104) -> tuple[ConfigEntry, ...]:
105    """
106    Return Config entries to setup this provider.
107
108    instance_id: id of an existing provider instance (None if new instance setup).
109    action: [optional] action key called from config entries UI.
110    values: the (intermediate) raw values for config entries sent with the action.
111    """
112    # ruff: noqa: ARG001
113    return (
114        ConfigEntry(
115            key=CONF_USERNAME,
116            type=ConfigEntryType.STRING,
117            label="Username",
118            required=True,
119        ),
120        ConfigEntry(
121            key=CONF_PASSWORD,
122            type=ConfigEntryType.SECURE_STRING,
123            label="Password",
124            required=True,
125        ),
126        ConfigEntry(
127            key=CONF_QUALITY,
128            type=ConfigEntryType.STRING,
129            label="Stream Quality",
130            description="Maximum streaming quality. Lower quality will be used "
131            "if selected quality is unavailable.",
132            default_value="27",
133            options=[
134                ConfigValueOption("Hi-Res 192kHz/24 bit", "27"),
135                ConfigValueOption("Hi-Res 96kHz/24 bit", "7"),
136                ConfigValueOption("CD Quality 44.1kHz/16 bit", "6"),
137                ConfigValueOption("MP3 320kbps", "5"),
138            ],
139        ),
140    )
141
142
143class QobuzProvider(MusicProvider):
144    """Provider for the Qobux music service."""
145
146    _user_auth_info: dict[str, Any] | None = None
147    # rate limiter needs to be specified on provider-level,
148    # so make it an instance attribute
149    throttler = ThrottlerManager(rate_limit=1, period=2)
150
151    async def handle_async_init(self) -> None:
152        """Handle async initialization of the provider."""
153        if not self.config.get_value(CONF_USERNAME) or not self.config.get_value(CONF_PASSWORD):
154            msg = "Invalid login credentials"
155            raise LoginFailed(msg)
156        # try to get a token, raise if that fails
157        token = await self._auth_token()
158        if not token:
159            msg = f"Login failed for user {self.config.get_value(CONF_USERNAME)}"
160            raise LoginFailed(msg)
161
162    @use_cache(3600 * 24 * 14)  # Cache for 14 days
163    async def search(
164        self, search_query: str, media_types: list[MediaType], limit: int = 5
165    ) -> SearchResults:
166        """Perform search on musicprovider.
167
168        :param search_query: Search query.
169        :param media_types: A list of media_types to include. All types if None.
170        :param limit: Number of items to return in the search (per type).
171        """
172        result = SearchResults()
173        media_types = [
174            x
175            for x in media_types
176            if x in (MediaType.ARTIST, MediaType.ALBUM, MediaType.TRACK, MediaType.PLAYLIST)
177        ]
178        if not media_types:
179            return result
180        params: dict[str, Any] = {"query": search_query, "limit": limit}
181        if len(media_types) == 1:
182            # qobuz does not support multiple searchtypes, falls back to all if no type given
183            if media_types[0] == MediaType.ARTIST:
184                params["type"] = "artists"
185            if media_types[0] == MediaType.ALBUM:
186                params["type"] = "albums"
187            if media_types[0] == MediaType.TRACK:
188                params["type"] = "tracks"
189            if media_types[0] == MediaType.PLAYLIST:
190                params["type"] = "playlists"
191        if searchresult := await self._get_data("catalog/search", **params):
192            if "artists" in searchresult and MediaType.ARTIST in media_types:
193                result.artists = [
194                    self._parse_artist(item)
195                    for item in searchresult["artists"]["items"]
196                    if (item and item["id"])
197                ]
198            if "albums" in searchresult and MediaType.ALBUM in media_types:
199                result.albums = [
200                    await self._parse_album(item)
201                    for item in searchresult["albums"]["items"]
202                    if (item and item["id"])
203                ]
204            if "tracks" in searchresult and MediaType.TRACK in media_types:
205                result.tracks = [
206                    await self._parse_track(item)
207                    for item in searchresult["tracks"]["items"]
208                    if (item and item["id"])
209                ]
210            if "playlists" in searchresult and MediaType.PLAYLIST in media_types:
211                result.playlists = [
212                    self._parse_playlist(item)
213                    for item in searchresult["playlists"]["items"]
214                    if (item and item["id"])
215                ]
216        return result
217
218    async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
219        """Retrieve all library artists from Qobuz."""
220        endpoint = "favorite/getUserFavorites"
221        for item in await self._get_all_items(endpoint, key="artists", type="artists"):
222            if item and item["id"]:
223                yield self._parse_artist(item)
224
225    async def get_library_albums(self) -> AsyncGenerator[Album, None]:
226        """Retrieve all library albums from Qobuz."""
227        endpoint = "favorite/getUserFavorites"
228        for item in await self._get_all_items(endpoint, key="albums", type="albums"):
229            if item and item["id"]:
230                yield await self._parse_album(item)
231
232    async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
233        """Retrieve library tracks from Qobuz."""
234        endpoint = "favorite/getUserFavorites"
235        for item in await self._get_all_items(endpoint, key="tracks", type="tracks"):
236            if item and item["id"]:
237                yield await self._parse_track(item)
238
239    async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
240        """Retrieve all library playlists from the provider."""
241        endpoint = "playlist/getUserPlaylists"
242        for item in await self._get_all_items(endpoint, key="playlists"):
243            if item and item["id"]:
244                yield self._parse_playlist(item)
245
246    @use_cache(3600 * 24 * 30)  # Cache for 30 days
247    async def get_artist(self, prov_artist_id: str) -> Artist:
248        """Get full artist details by id."""
249        params: dict[str, Any] = {"artist_id": prov_artist_id}
250        artist_obj = await self._get_data("artist/get", **params)
251        if artist_obj and artist_obj.get("id"):
252            return self._parse_artist(artist_obj)
253        msg = f"Item {prov_artist_id} not found"
254        raise MediaNotFoundError(msg)
255
256    @use_cache(3600 * 24 * 30)  # Cache for 30 days
257    async def get_album(self, prov_album_id: str) -> Album:
258        """Get full album details by id."""
259        params: dict[str, Any] = {"album_id": prov_album_id}
260        album_obj = await self._get_data("album/get", **params)
261        if album_obj and album_obj.get("id"):
262            return await self._parse_album(album_obj)
263        msg = f"Item {prov_album_id} not found"
264        raise MediaNotFoundError(msg)
265
266    @use_cache(3600 * 24 * 30)  # Cache for 30 days
267    async def get_track(self, prov_track_id: str) -> Track:
268        """Get full track details by id."""
269        params: dict[str, Any] = {"track_id": prov_track_id}
270        track_obj = await self._get_data("track/get", **params)
271        if track_obj and track_obj.get("id"):
272            return await self._parse_track(track_obj)
273        msg = f"Item {prov_track_id} not found"
274        raise MediaNotFoundError(msg)
275
276    @use_cache(3600 * 24 * 30)  # Cache for 30 days
277    async def get_playlist(self, prov_playlist_id: str) -> Playlist:
278        """Get full playlist details by id."""
279        params: dict[str, Any] = {"playlist_id": prov_playlist_id}
280        playlist_obj = await self._get_data("playlist/get", **params)
281        if playlist_obj and playlist_obj.get("id"):
282            return self._parse_playlist(playlist_obj)
283        msg = f"Item {prov_playlist_id} not found"
284        raise MediaNotFoundError(msg)
285
286    async def create_playlist(self, name: str) -> Playlist:
287        """Create a new playlist on Qobuz with the given name."""
288        playlist_obj = await self._get_data(
289            "playlist/create",
290            name=name,
291            description="",
292            is_public=0,
293            is_collaborative=0,
294        )
295        if not playlist_obj or not playlist_obj.get("id"):
296            msg = f"Failed to create playlist: {name}"
297            raise InvalidDataError(msg)
298        return self._parse_playlist(playlist_obj)
299
300    @use_cache(3600 * 24 * 30)  # Cache for 30 days
301    async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
302        """Get all album tracks for given album id."""
303        params = {"album_id": prov_album_id}
304        return [
305            await self._parse_track(item)
306            for item in await self._get_all_items("album/get", **params, key="tracks")
307            if (item and item["id"])
308        ]
309
310    @use_cache(3600 * 3)  # Cache for 3 hours
311    async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
312        """Get playlist tracks."""
313        result: list[Track] = []
314        page_size = 100
315        offset = page * page_size
316        qobuz_result = await self._get_data(
317            "playlist/get",
318            key="tracks",
319            playlist_id=prov_playlist_id,
320            extra="tracks",
321            offset=offset,
322            limit=page_size,
323        )
324        if not qobuz_result:
325            return result
326
327        for index, track_obj in enumerate(qobuz_result["tracks"]["items"], 1):
328            if not (track_obj and track_obj["id"]):
329                continue
330            track = await self._parse_track(track_obj)
331            track.position = index + offset
332            result.append(track)
333        return result
334
335    @use_cache(3600 * 24 * 14)  # Cache for 14 days
336    async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
337        """Get a list of albums for the given artist."""
338        result = await self._get_data(
339            "artist/get",
340            artist_id=prov_artist_id,
341            extra="albums",
342            offset=0,
343            limit=100,
344        )
345        if not result:
346            return []
347        return [
348            await self._parse_album(item)
349            for item in result["albums"]["items"]
350            if (item and item["id"] and str(item["artist"]["id"]) == prov_artist_id)
351        ]
352
353    @use_cache(3600 * 24 * 14)  # Cache for 14 days
354    async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
355        """Get a list of most popular tracks for the given artist."""
356        result = await self._get_data(
357            "artist/get",
358            artist_id=prov_artist_id,
359            extra="playlists",
360            offset=0,
361            limit=25,
362        )
363        if result and result.get("playlists"):
364            return [
365                await self._parse_track(item)
366                for item in result["playlists"][0]["tracks"]["items"]
367                if (item and item["id"])
368            ]
369        # fallback to search
370        artist = await self.get_artist(prov_artist_id)
371        searchresult = await self._get_data(
372            "catalog/search", query=artist.name, limit=25, type="tracks"
373        )
374        if not searchresult:
375            return []
376
377        return [
378            await self._parse_track(item)
379            for item in searchresult["tracks"]["items"]
380            if (
381                item
382                and item["id"]
383                and "performer" in item
384                and str(item["performer"]["id"]) == str(prov_artist_id)
385            )
386        ]
387
388    async def get_similar_artists(self, prov_artist_id: str) -> None:
389        """Get similar artists for given artist."""
390        # https://www.qobuz.com/api.json/0.2/artist/getSimilarArtists?artist_id=220020&offset=0&limit=3
391
392    async def library_add(self, item: MediaItemType) -> bool:
393        """Add item to library."""
394        result = None
395        if item.media_type == MediaType.ARTIST:
396            result = await self._get_data("favorite/create", artist_id=item.item_id)
397        elif item.media_type == MediaType.ALBUM:
398            result = await self._get_data("favorite/create", album_ids=item.item_id)
399        elif item.media_type == MediaType.TRACK:
400            result = await self._get_data("favorite/create", track_ids=item.item_id)
401        elif item.media_type == MediaType.PLAYLIST:
402            result = await self._get_data("playlist/subscribe", playlist_id=item.item_id)
403        return result is not None
404
405    async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool:
406        """Remove item from library."""
407        result = None
408        if media_type == MediaType.ARTIST:
409            result = await self._get_data("favorite/delete", artist_ids=prov_item_id)
410        elif media_type == MediaType.ALBUM:
411            result = await self._get_data("favorite/delete", album_ids=prov_item_id)
412        elif media_type == MediaType.TRACK:
413            result = await self._get_data("favorite/delete", track_ids=prov_item_id)
414        elif media_type == MediaType.PLAYLIST:
415            playlist = await self.get_playlist(prov_item_id)
416            if playlist.is_editable:
417                result = await self._get_data("playlist/delete", playlist_id=prov_item_id)
418            else:
419                result = await self._get_data("playlist/unsubscribe", playlist_id=prov_item_id)
420        return result is not None
421
422    async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
423        """Add track(s) to playlist."""
424        await self._get_data(
425            "playlist/addTracks",
426            playlist_id=prov_playlist_id,
427            track_ids=",".join(prov_track_ids),
428            playlist_track_ids=",".join(prov_track_ids),
429        )
430
431    async def remove_playlist_tracks(
432        self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
433    ) -> None:
434        """Remove track(s) from playlist."""
435        playlist_track_ids = set()
436        for pos in positions_to_remove:
437            idx = pos - 1
438            qobuz_result = await self._get_data(
439                "playlist/get",
440                key="tracks",
441                playlist_id=prov_playlist_id,
442                extra="tracks",
443                offset=idx,
444                limit=1,
445            )
446            if not qobuz_result:
447                continue
448            playlist_track_id = qobuz_result["tracks"]["items"][0]["playlist_track_id"]
449            playlist_track_ids.add(str(playlist_track_id))
450
451        await self._get_data(
452            "playlist/deleteTracks",
453            playlist_id=prov_playlist_id,
454            playlist_track_ids=",".join(playlist_track_ids),
455        )
456
457    async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
458        """Return the content details for the given track when it will be streamed."""
459        max_quality = int(cast("str", self.config.get_value(CONF_QUALITY)) or "27")
460        # Quality order from highest to lowest
461        quality_order = [27, 7, 6, 5]
462        # Only try qualities up to the user's maximum setting
463        allowed_qualities = [q for q in quality_order if q <= max_quality]
464
465        streamdata: dict[str, Any] | None = None
466        for format_id in allowed_qualities:
467            # it seems that simply requesting for highest available quality does not work
468            # from time to time the api response is empty for this request ?!
469            result = await self._get_data(
470                "track/getFileUrl",
471                sign_request=True,
472                format_id=format_id,
473                track_id=item_id,
474                intent="stream",
475            )
476            if result and result.get("url"):
477                streamdata = result
478                break
479        if not streamdata:
480            msg = f"Unable to retrieve stream details for {item_id}"
481            raise MediaNotFoundError(msg)
482        if streamdata["mime_type"] == "audio/mpeg":
483            content_type = ContentType.MPEG
484        elif streamdata["mime_type"] == "audio/flac":
485            content_type = ContentType.FLAC
486        else:
487            msg = f"Unsupported mime type for {item_id}"
488            raise MediaNotFoundError(msg)
489        self.mass.create_task(self._report_playback_started(streamdata))
490        return StreamDetails(
491            item_id=str(item_id),
492            provider=self.instance_id,
493            audio_format=AudioFormat(
494                content_type=content_type,
495                sample_rate=int(streamdata["sampling_rate"] * 1000),
496                bit_depth=streamdata["bit_depth"],
497            ),
498            stream_type=StreamType.HTTP,
499            duration=streamdata["duration"],
500            data=streamdata,  # we need these details for reporting playback
501            path=streamdata["url"],
502            can_seek=True,
503            allow_seek=True,
504        )
505
506    async def _report_playback_started(self, streamdata: dict[str, Any]) -> None:
507        """Report playback start to qobuz."""
508        # TODO: need to figure out if the streamed track is purchased by user
509        # https://www.qobuz.com/api.json/0.2/purchase/getUserPurchasesIds?limit=5000&user_id=xxxxxxx
510        # {"albums":{"total":0,"items":[]},
511        # "tracks":{"total":0,"items":[]},"user":{"id":xxxx,"login":"xxxxx"}}
512        assert self._user_auth_info is not None  # for type checking
513        device_id = self._user_auth_info["user"]["device"]["id"]
514        credential_id = self._user_auth_info["user"]["credential"]["id"]
515        user_id = self._user_auth_info["user"]["id"]
516        format_id = streamdata["format_id"]
517        timestamp = int(time.time())
518        events = [
519            {
520                "online": True,
521                "sample": False,
522                "intent": "stream",
523                "device_id": device_id,
524                "track_id": streamdata["track_id"],
525                "purchase": False,
526                "date": timestamp,
527                "credential_id": credential_id,
528                "user_id": user_id,
529                "local": False,
530                "format_id": format_id,
531            }
532        ]
533        async with self.throttler.bypass():
534            await self._post_data("track/reportStreamingStart", data=events)
535
536    async def on_streamed(
537        self,
538        streamdetails: StreamDetails,
539    ) -> None:
540        """Handle callback when an item completed streaming."""
541        if self._user_auth_info is None:
542            msg = "User auth info not available"
543            raise LoginFailed(msg)
544        user_id = self._user_auth_info["user"]["id"]
545        async with self.throttler.bypass():
546            await self._get_data(
547                "/track/reportStreamingEnd",
548                user_id=user_id,
549                track_id=str(streamdetails.item_id),
550                duration=try_parse_int(streamdetails.seconds_streamed),
551            )
552
553    def _parse_artist(self, artist_obj: dict[str, Any]) -> Artist:
554        """Parse qobuz artist object to generic layout."""
555        artist = Artist(
556            item_id=str(artist_obj["id"]),
557            provider=self.domain,
558            name=artist_obj["name"],
559            provider_mappings={
560                ProviderMapping(
561                    item_id=str(artist_obj["id"]),
562                    provider_domain=self.domain,
563                    provider_instance=self.instance_id,
564                    url=f"https://open.qobuz.com/artist/{artist_obj['id']}",
565                )
566            },
567        )
568        if artist.item_id == VARIOUS_ARTISTS_ID:
569            artist.mbid = VARIOUS_ARTISTS_MBID
570            artist.name = VARIOUS_ARTISTS_NAME
571        if img := self.__get_image(artist_obj):
572            artist.metadata.add_image(
573                MediaItemImage(
574                    type=ImageType.THUMB,
575                    path=img,
576                    provider=self.instance_id,
577                    remotely_accessible=True,
578                )
579            )
580        if artist_obj.get("biography"):
581            artist.metadata.description = artist_obj["biography"].get("content")
582        return artist
583
584    async def _parse_album(
585        self, album_obj: dict[str, Any], artist_obj: dict[str, Any] | None = None
586    ) -> Album:
587        """Parse qobuz album object to generic layout."""
588        if not artist_obj and "artist" not in album_obj:
589            # artist missing in album info, return full abum instead
590            return await self.get_album(album_obj["id"])
591        name, version = parse_title_and_version(album_obj["title"], album_obj.get("version"))
592        album = Album(
593            item_id=str(album_obj["id"]),
594            provider=self.domain,
595            name=name,
596            version=version,
597            provider_mappings={
598                ProviderMapping(
599                    item_id=str(album_obj["id"]),
600                    provider_domain=self.domain,
601                    provider_instance=self.instance_id,
602                    available=album_obj["streamable"] and album_obj["displayable"],
603                    audio_format=AudioFormat(
604                        content_type=ContentType.FLAC,
605                        sample_rate=album_obj["maximum_sampling_rate"] * 1000,
606                        bit_depth=album_obj["maximum_bit_depth"],
607                    ),
608                    url=f"https://open.qobuz.com/album/{album_obj['id']}",
609                )
610            },
611        )
612        album.external_ids.add((ExternalID.BARCODE, album_obj["upc"]))
613        album.artists.append(self._parse_artist(artist_obj or album_obj["artist"]))
614        if (
615            album_obj.get("product_type", "") == "single"
616            or album_obj.get("release_type", "") == "single"
617        ):
618            album.album_type = AlbumType.SINGLE
619        elif (
620            album_obj.get("product_type", "") == "compilation" or "Various" in album.artists[0].name
621        ):
622            album.album_type = AlbumType.COMPILATION
623        elif (
624            album_obj.get("product_type", "") == "album"
625            or album_obj.get("release_type", "") == "album"
626        ):
627            album.album_type = AlbumType.ALBUM
628
629        # Try inference - override if it finds something more specific
630        inferred_type = infer_album_type(name, version)
631        if inferred_type in (AlbumType.SOUNDTRACK, AlbumType.LIVE):
632            album.album_type = inferred_type
633
634        if "genre" in album_obj:
635            album.metadata.genres = {album_obj["genre"]["name"]}
636        if img := self.__get_image(album_obj):
637            album.metadata.add_image(
638                MediaItemImage(
639                    provider=self.instance_id,
640                    type=ImageType.THUMB,
641                    path=img,
642                    remotely_accessible=True,
643                )
644            )
645        if "label" in album_obj:
646            album.metadata.label = album_obj["label"]["name"]
647        if released_at := album_obj.get("released_at"):
648            with suppress(ValueError):
649                album.year = datetime.datetime.fromtimestamp(released_at).year
650        if album_obj.get("copyright"):
651            album.metadata.copyright = album_obj["copyright"]
652        if album_obj.get("description"):
653            album.metadata.description = album_obj["description"]
654        if album_obj.get("parental_warning"):
655            album.metadata.explicit = True
656        return album
657
658    async def _parse_track(self, track_obj: dict[str, Any]) -> Track:
659        """Parse qobuz track object to generic layout."""
660        name, version = parse_title_and_version(track_obj["title"], track_obj.get("version"))
661        track = Track(
662            item_id=str(track_obj["id"]),
663            provider=self.domain,
664            name=name,
665            version=version,
666            duration=track_obj["duration"],
667            provider_mappings={
668                ProviderMapping(
669                    item_id=str(track_obj["id"]),
670                    provider_domain=self.domain,
671                    provider_instance=self.instance_id,
672                    available=track_obj["streamable"] and track_obj["displayable"],
673                    audio_format=AudioFormat(
674                        content_type=ContentType.FLAC,
675                        sample_rate=track_obj["maximum_sampling_rate"] * 1000,
676                        bit_depth=track_obj["maximum_bit_depth"],
677                    ),
678                    url=f"https://open.qobuz.com/track/{track_obj['id']}",
679                )
680            },
681            disc_number=track_obj.get("media_number", 0),
682            track_number=track_obj.get("track_number", 0),
683        )
684        if isrc := track_obj.get("isrc"):
685            track.external_ids.add((ExternalID.ISRC, isrc))
686        if track_obj.get("performer") and "Various " not in track_obj["performer"]:
687            artist = self._parse_artist(track_obj["performer"])
688            if artist:
689                track.artists.append(artist)
690        # try to grab artist from album
691        if not track.artists and (
692            track_obj.get("album")
693            and track_obj["album"].get("artist")
694            and "Various " not in track_obj["album"]["artist"]
695        ):
696            artist = self._parse_artist(track_obj["album"]["artist"])
697            if artist:
698                track.artists.append(artist)
699        if not track.artists:
700            # last resort: parse from performers string
701            for performer_str in track_obj["performers"].split(" - "):
702                role = performer_str.split(", ")[1]
703                name = performer_str.split(", ")[0]
704                if "artist" in role.lower():
705                    artist = Artist(
706                        item_id=name,
707                        provider=self.domain,
708                        name=name,
709                        provider_mappings={
710                            ProviderMapping(
711                                item_id=name,
712                                provider_domain=self.domain,
713                                provider_instance=self.instance_id,
714                            )
715                        },
716                    )
717                track.artists.append(artist)
718        # TODO: fix grabbing composer from details
719
720        if "album" in track_obj:
721            album = await self._parse_album(track_obj["album"])
722            if album:
723                track.album = album
724        if track_obj.get("performers"):
725            track.metadata.performers = {x.strip() for x in track_obj["performers"].split("-")}
726        if track_obj.get("copyright"):
727            track.metadata.copyright = track_obj["copyright"]
728        if track_obj.get("parental_warning"):
729            track.metadata.explicit = True
730        if img := self.__get_image(track_obj):
731            track.metadata.add_image(
732                MediaItemImage(
733                    type=ImageType.THUMB,
734                    path=img,
735                    provider=self.instance_id,
736                    remotely_accessible=True,
737                )
738            )
739        return track
740
741    def _parse_playlist(self, playlist_obj: dict[str, Any]) -> Playlist:
742        """Parse qobuz playlist object to generic layout."""
743        if self._user_auth_info is None:
744            msg = "User auth info not available"
745            raise LoginFailed(msg)
746
747        is_editable = (
748            playlist_obj["owner"]["id"] == self._user_auth_info["user"]["id"]
749            or playlist_obj["is_collaborative"]
750        )
751        playlist = Playlist(
752            item_id=str(playlist_obj["id"]),
753            provider=self.instance_id,
754            name=playlist_obj["name"],
755            owner=playlist_obj["owner"]["name"],
756            provider_mappings={
757                ProviderMapping(
758                    item_id=str(playlist_obj["id"]),
759                    provider_domain=self.domain,
760                    provider_instance=self.instance_id,
761                    url=f"https://open.qobuz.com/playlist/{playlist_obj['id']}",
762                    is_unique=is_editable,  # user-owned playlists are unique
763                )
764            },
765            is_editable=is_editable,
766        )
767        if img := self.__get_image(playlist_obj):
768            playlist.metadata.add_image(
769                MediaItemImage(
770                    type=ImageType.THUMB,
771                    path=img,
772                    provider=self.instance_id,
773                    remotely_accessible=True,
774                )
775            )
776        return playlist
777
778    @lock
779    async def _auth_token(self) -> str | None:
780        """Login to qobuz and store the token."""
781        if self._user_auth_info:
782            return str(self._user_auth_info["user_auth_token"])
783        params: dict[str, Any] = {
784            "username": self.config.get_value(CONF_USERNAME),
785            "password": self.config.get_value(CONF_PASSWORD),
786            "device_manufacturer_id": "music_assistant",
787        }
788        details = await self._get_data("user/login", **params)
789        if details and "user" in details:
790            self._user_auth_info = details
791            self.logger.info(
792                "Successfully logged in to Qobuz as %s", details["user"]["display_name"]
793            )
794            self.mass.metadata.set_default_preferred_language(details["user"]["country_code"])
795            return str(details["user_auth_token"])
796        return None
797
798    async def _get_all_items(
799        self, endpoint: str, key: str = "tracks", **kwargs: Any
800    ) -> list[dict[str, Any]]:
801        """Get all items from a paged list."""
802        limit = 50
803        offset = 0
804        all_items: list[dict[str, Any]] = []
805        while True:
806            kwargs["limit"] = limit
807            kwargs["offset"] = offset
808            result = await self._get_data(endpoint, **kwargs)
809            offset += limit
810            if not result:
811                break
812            if not result.get(key) or not result[key].get("items"):
813                break
814            for item in result[key]["items"]:
815                all_items.append(item)
816            if len(result[key]["items"]) < limit:
817                break
818        return all_items
819
820    @throttle_with_retries
821    async def _get_data(
822        self, endpoint: str, sign_request: bool = False, **kwargs: Any
823    ) -> dict[str, Any] | None:
824        """Get data from api."""
825        self.logger.debug("Handling GET request to %s", endpoint)
826        url = f"http://www.qobuz.com/api.json/0.2/{endpoint}"
827        headers = {"X-App-Id": app_var(0)}
828        locale = self.mass.metadata.locale.replace("_", "-")
829        language = locale.split("-")[0]
830        headers["Accept-Language"] = f"{locale}, {language};q=0.9, *;q=0.5"
831        if endpoint != "user/login":
832            auth_token = await self._auth_token()
833            if not auth_token:
834                self.logger.debug("Not logged in")
835                return None
836            headers["X-User-Auth-Token"] = auth_token
837        if sign_request:
838            signing_data = "".join(endpoint.split("/"))
839            keys = list(kwargs.keys())
840            keys.sort()
841            for key in keys:
842                signing_data += f"{key}{kwargs[key]}"
843            request_ts = str(time.time())
844            request_sig = signing_data + request_ts + app_var(1)
845            request_sig = str(hashlib.md5(request_sig.encode()).hexdigest())
846            kwargs["request_ts"] = request_ts
847            kwargs["request_sig"] = request_sig
848            kwargs["app_id"] = app_var(0)
849            kwargs["user_auth_token"] = await self._auth_token()
850        async with (
851            self.mass.http_session.get(url, headers=headers, params=kwargs) as response,
852        ):
853            # handle rate limiter
854            if response.status == 429:
855                backoff_time = int(response.headers.get("Retry-After", 0))
856                raise ResourceTemporarilyUnavailable("Rate Limiter", backoff_time=backoff_time)
857            # handle temporary server error
858            if response.status in (502, 503):
859                raise ResourceTemporarilyUnavailable(backoff_time=30)
860            # handle 404 not found, convert to MediaNotFoundError
861            if response.status == 404:
862                raise MediaNotFoundError(f"{endpoint} not found")
863            response.raise_for_status()
864            try:
865                return cast("dict[str, Any]", await response.json(loads=json_loads))
866            except client_exceptions.ContentTypeError as err:
867                text = err.message or await response.text() or err.status
868                msg = f"Error while handling {endpoint}: {text}"
869                raise InvalidDataError(msg)
870
871    @throttle_with_retries
872    async def _post_data(
873        self,
874        endpoint: str,
875        params: dict[str, Any] | None = None,
876        data: dict[str, Any] | list[dict[str, Any]] | None = None,
877    ) -> dict[str, Any]:
878        """Post data to api."""
879        self.logger.debug("Handling POST request to %s", endpoint)
880        if not params:
881            params = {}
882        if not data:
883            data = {}
884        url = f"http://www.qobuz.com/api.json/0.2/{endpoint}"
885        params["app_id"] = app_var(0)
886        auth_token = await self._auth_token()
887        if auth_token is None:
888            msg = "Authentication token is required"
889            raise LoginFailed(msg)
890        params["user_auth_token"] = auth_token
891        async with self.mass.http_session.post(
892            url, params=params, json=data, ssl=False
893        ) as response:
894            # handle rate limiter
895            if response.status == 429:
896                backoff_time = int(response.headers.get("Retry-After", 0))
897                raise ResourceTemporarilyUnavailable("Rate Limiter", backoff_time=backoff_time)
898            # handle temporary server error
899            if response.status in (502, 503):
900                raise ResourceTemporarilyUnavailable(backoff_time=30)
901            # handle 404 not found, convert to MediaNotFoundError
902            if response.status == 404:
903                raise MediaNotFoundError(f"{endpoint} not found")
904            response.raise_for_status()
905            return cast("dict[str, Any]", await response.json(loads=json_loads))
906
907    def __get_image(self, obj: dict[str, Any]) -> str | None:
908        """Try to parse image from Qobuz media object."""
909        if obj.get("image"):
910            for key in ["extralarge", "large", "medium", "small"]:
911                if obj["image"].get(key):
912                    img_value: str = obj["image"][key]
913                    if "2a96cbd8b46e442fc41c2b86b821562f" in img_value:
914                        continue
915                    return img_value
916        if obj.get("images300"):
917            # playlists seem to use this strange format
918            return str(obj["images300"][0])
919        if obj.get("album"):
920            return self.__get_image(obj["album"])
921        if obj.get("artist"):
922            return self.__get_image(obj["artist"])
923        return None
924