/
/
/
1"""Qobuz musicprovider support for MusicAssistant."""
2
3from __future__ import annotations
4
5import datetime
6import hashlib
7import time
8from contextlib import suppress
9from typing import TYPE_CHECKING, Any, cast
10
11from aiohttp import client_exceptions
12from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
13from music_assistant_models.enums import (
14 AlbumType,
15 ConfigEntryType,
16 ContentType,
17 ExternalID,
18 ImageType,
19 MediaType,
20 ProviderFeature,
21 StreamType,
22)
23from music_assistant_models.errors import (
24 InvalidDataError,
25 LoginFailed,
26 MediaNotFoundError,
27 ResourceTemporarilyUnavailable,
28)
29from music_assistant_models.media_items import (
30 Album,
31 Artist,
32 AudioFormat,
33 MediaItemImage,
34 MediaItemType,
35 Playlist,
36 ProviderMapping,
37 SearchResults,
38 Track,
39)
40from music_assistant_models.streamdetails import StreamDetails
41
42from music_assistant.constants import (
43 CONF_PASSWORD,
44 CONF_USERNAME,
45 VARIOUS_ARTISTS_MBID,
46 VARIOUS_ARTISTS_NAME,
47)
48from music_assistant.controllers.cache import use_cache
49from music_assistant.helpers.app_vars import app_var # type: ignore[attr-defined]
50from music_assistant.helpers.json import json_loads
51from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
52from music_assistant.helpers.util import (
53 infer_album_type,
54 lock,
55 parse_title_and_version,
56 try_parse_int,
57)
58from music_assistant.models.music_provider import MusicProvider
59
60if TYPE_CHECKING:
61 from collections.abc import AsyncGenerator
62
63 from music_assistant_models.config_entries import ProviderConfig
64 from music_assistant_models.provider import ProviderManifest
65
66 from music_assistant import MusicAssistant
67 from music_assistant.models import ProviderInstanceType
68
69
# Features handed to the QobuzProvider constructor in setup().
SUPPORTED_FEATURES = {
    ProviderFeature.LIBRARY_ARTISTS,
    ProviderFeature.LIBRARY_ALBUMS,
    ProviderFeature.LIBRARY_TRACKS,
    ProviderFeature.LIBRARY_PLAYLISTS,
    ProviderFeature.LIBRARY_ARTISTS_EDIT,
    ProviderFeature.LIBRARY_ALBUMS_EDIT,
    ProviderFeature.LIBRARY_PLAYLISTS_EDIT,
    ProviderFeature.LIBRARY_TRACKS_EDIT,
    ProviderFeature.PLAYLIST_TRACKS_EDIT,
    ProviderFeature.PLAYLIST_CREATE,
    ProviderFeature.BROWSE,
    ProviderFeature.SEARCH,
    ProviderFeature.ARTIST_ALBUMS,
    ProviderFeature.ARTIST_TOPTRACKS,
}

# Qobuz artist id that _parse_artist replaces with the generic
# VARIOUS_ARTISTS_NAME / VARIOUS_ARTISTS_MBID values.
VARIOUS_ARTISTS_ID = "145383"

# Config key of the "Stream Quality" entry; its value is a Qobuz format id
# stored as a string ("27", "7", "6" or "5").
CONF_QUALITY = "quality"
90
91
async def setup(
    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
    """Create the Qobuz provider instance for the given configuration.

    :param mass: The MusicAssistant instance the provider is attached to.
    :param manifest: Manifest describing this provider.
    :param config: Stored configuration of this provider instance.
    :return: A new QobuzProvider advertising SUPPORTED_FEATURES.
    """
    provider = QobuzProvider(mass, manifest, config, SUPPORTED_FEATURES)
    return provider
97
98
async def get_config_entries(
    mass: MusicAssistant,
    instance_id: str | None = None,
    action: str | None = None,
    values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
    """
    Describe the settings needed to set up this provider.

    None of the arguments are used; the same three entries are always returned.

    :param mass: The MusicAssistant instance.
    :param instance_id: Id of an existing provider instance (None for a new setup).
    :param action: Optional action key called from the config entries UI.
    :param values: The (intermediate) raw values sent along with the action.
    :return: Entries for username, password and maximum stream quality.
    """
    # ruff: noqa: ARG001
    username_entry = ConfigEntry(
        key=CONF_USERNAME,
        type=ConfigEntryType.STRING,
        label="Username",
        required=True,
    )
    password_entry = ConfigEntry(
        key=CONF_PASSWORD,
        type=ConfigEntryType.SECURE_STRING,
        label="Password",
        required=True,
    )
    # (title, value) pairs; the value is the Qobuz format id as a string
    quality_choices = (
        ("Hi-Res 192kHz/24 bit", "27"),
        ("Hi-Res 96kHz/24 bit", "7"),
        ("CD Quality 44.1kHz/16 bit", "6"),
        ("MP3 320kbps", "5"),
    )
    quality_entry = ConfigEntry(
        key=CONF_QUALITY,
        type=ConfigEntryType.STRING,
        label="Stream Quality",
        description="Maximum streaming quality. Lower quality will be used "
        "if selected quality is unavailable.",
        default_value="27",
        options=[ConfigValueOption(title, value) for title, value in quality_choices],
    )
    return (username_entry, password_entry, quality_entry)
141
142
class QobuzProvider(MusicProvider):
    """Provider for the Qobuz music service."""

    # Login response stored by _auth_token; None until a login has succeeded.
    _user_auth_info: dict[str, Any] | None = None
    # Rate limiter used for the API calls of this provider.
    # NOTE(review): this is assigned in the class body, so it is a class
    # attribute rather than an instance attribute - confirm that is intended.
    throttler = ThrottlerManager(rate_limit=1, period=2)
150
151 async def handle_async_init(self) -> None:
152 """Handle async initialization of the provider."""
153 if not self.config.get_value(CONF_USERNAME) or not self.config.get_value(CONF_PASSWORD):
154 msg = "Invalid login credentials"
155 raise LoginFailed(msg)
156 # try to get a token, raise if that fails
157 token = await self._auth_token()
158 if not token:
159 msg = f"Login failed for user {self.config.get_value(CONF_USERNAME)}"
160 raise LoginFailed(msg)
161
162 @use_cache(3600 * 24 * 14) # Cache for 14 days
163 async def search(
164 self, search_query: str, media_types: list[MediaType], limit: int = 5
165 ) -> SearchResults:
166 """Perform search on musicprovider.
167
168 :param search_query: Search query.
169 :param media_types: A list of media_types to include. All types if None.
170 :param limit: Number of items to return in the search (per type).
171 """
172 result = SearchResults()
173 media_types = [
174 x
175 for x in media_types
176 if x in (MediaType.ARTIST, MediaType.ALBUM, MediaType.TRACK, MediaType.PLAYLIST)
177 ]
178 if not media_types:
179 return result
180 params: dict[str, Any] = {"query": search_query, "limit": limit}
181 if len(media_types) == 1:
182 # qobuz does not support multiple searchtypes, falls back to all if no type given
183 if media_types[0] == MediaType.ARTIST:
184 params["type"] = "artists"
185 if media_types[0] == MediaType.ALBUM:
186 params["type"] = "albums"
187 if media_types[0] == MediaType.TRACK:
188 params["type"] = "tracks"
189 if media_types[0] == MediaType.PLAYLIST:
190 params["type"] = "playlists"
191 if searchresult := await self._get_data("catalog/search", **params):
192 if "artists" in searchresult and MediaType.ARTIST in media_types:
193 result.artists = [
194 self._parse_artist(item)
195 for item in searchresult["artists"]["items"]
196 if (item and item["id"])
197 ]
198 if "albums" in searchresult and MediaType.ALBUM in media_types:
199 result.albums = [
200 await self._parse_album(item)
201 for item in searchresult["albums"]["items"]
202 if (item and item["id"])
203 ]
204 if "tracks" in searchresult and MediaType.TRACK in media_types:
205 result.tracks = [
206 await self._parse_track(item)
207 for item in searchresult["tracks"]["items"]
208 if (item and item["id"])
209 ]
210 if "playlists" in searchresult and MediaType.PLAYLIST in media_types:
211 result.playlists = [
212 self._parse_playlist(item)
213 for item in searchresult["playlists"]["items"]
214 if (item and item["id"])
215 ]
216 return result
217
218 async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
219 """Retrieve all library artists from Qobuz."""
220 endpoint = "favorite/getUserFavorites"
221 for item in await self._get_all_items(endpoint, key="artists", type="artists"):
222 if item and item["id"]:
223 yield self._parse_artist(item)
224
225 async def get_library_albums(self) -> AsyncGenerator[Album, None]:
226 """Retrieve all library albums from Qobuz."""
227 endpoint = "favorite/getUserFavorites"
228 for item in await self._get_all_items(endpoint, key="albums", type="albums"):
229 if item and item["id"]:
230 yield await self._parse_album(item)
231
232 async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
233 """Retrieve library tracks from Qobuz."""
234 endpoint = "favorite/getUserFavorites"
235 for item in await self._get_all_items(endpoint, key="tracks", type="tracks"):
236 if item and item["id"]:
237 yield await self._parse_track(item)
238
239 async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
240 """Retrieve all library playlists from the provider."""
241 endpoint = "playlist/getUserPlaylists"
242 for item in await self._get_all_items(endpoint, key="playlists"):
243 if item and item["id"]:
244 yield self._parse_playlist(item)
245
246 @use_cache(3600 * 24 * 30) # Cache for 30 days
247 async def get_artist(self, prov_artist_id: str) -> Artist:
248 """Get full artist details by id."""
249 params: dict[str, Any] = {"artist_id": prov_artist_id}
250 artist_obj = await self._get_data("artist/get", **params)
251 if artist_obj and artist_obj.get("id"):
252 return self._parse_artist(artist_obj)
253 msg = f"Item {prov_artist_id} not found"
254 raise MediaNotFoundError(msg)
255
256 @use_cache(3600 * 24 * 30) # Cache for 30 days
257 async def get_album(self, prov_album_id: str) -> Album:
258 """Get full album details by id."""
259 params: dict[str, Any] = {"album_id": prov_album_id}
260 album_obj = await self._get_data("album/get", **params)
261 if album_obj and album_obj.get("id"):
262 return await self._parse_album(album_obj)
263 msg = f"Item {prov_album_id} not found"
264 raise MediaNotFoundError(msg)
265
266 @use_cache(3600 * 24 * 30) # Cache for 30 days
267 async def get_track(self, prov_track_id: str) -> Track:
268 """Get full track details by id."""
269 params: dict[str, Any] = {"track_id": prov_track_id}
270 track_obj = await self._get_data("track/get", **params)
271 if track_obj and track_obj.get("id"):
272 return await self._parse_track(track_obj)
273 msg = f"Item {prov_track_id} not found"
274 raise MediaNotFoundError(msg)
275
276 @use_cache(3600 * 24 * 30) # Cache for 30 days
277 async def get_playlist(self, prov_playlist_id: str) -> Playlist:
278 """Get full playlist details by id."""
279 params: dict[str, Any] = {"playlist_id": prov_playlist_id}
280 playlist_obj = await self._get_data("playlist/get", **params)
281 if playlist_obj and playlist_obj.get("id"):
282 return self._parse_playlist(playlist_obj)
283 msg = f"Item {prov_playlist_id} not found"
284 raise MediaNotFoundError(msg)
285
286 async def create_playlist(self, name: str) -> Playlist:
287 """Create a new playlist on Qobuz with the given name."""
288 playlist_obj = await self._get_data(
289 "playlist/create",
290 name=name,
291 description="",
292 is_public=0,
293 is_collaborative=0,
294 )
295 if not playlist_obj or not playlist_obj.get("id"):
296 msg = f"Failed to create playlist: {name}"
297 raise InvalidDataError(msg)
298 return self._parse_playlist(playlist_obj)
299
300 @use_cache(3600 * 24 * 30) # Cache for 30 days
301 async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
302 """Get all album tracks for given album id."""
303 params = {"album_id": prov_album_id}
304 return [
305 await self._parse_track(item)
306 for item in await self._get_all_items("album/get", **params, key="tracks")
307 if (item and item["id"])
308 ]
309
310 @use_cache(3600 * 3) # Cache for 3 hours
311 async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
312 """Get playlist tracks."""
313 result: list[Track] = []
314 page_size = 100
315 offset = page * page_size
316 qobuz_result = await self._get_data(
317 "playlist/get",
318 key="tracks",
319 playlist_id=prov_playlist_id,
320 extra="tracks",
321 offset=offset,
322 limit=page_size,
323 )
324 if not qobuz_result:
325 return result
326
327 for index, track_obj in enumerate(qobuz_result["tracks"]["items"], 1):
328 if not (track_obj and track_obj["id"]):
329 continue
330 track = await self._parse_track(track_obj)
331 track.position = index + offset
332 result.append(track)
333 return result
334
335 @use_cache(3600 * 24 * 14) # Cache for 14 days
336 async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
337 """Get a list of albums for the given artist."""
338 result = await self._get_data(
339 "artist/get",
340 artist_id=prov_artist_id,
341 extra="albums",
342 offset=0,
343 limit=100,
344 )
345 if not result:
346 return []
347 return [
348 await self._parse_album(item)
349 for item in result["albums"]["items"]
350 if (item and item["id"] and str(item["artist"]["id"]) == prov_artist_id)
351 ]
352
353 @use_cache(3600 * 24 * 14) # Cache for 14 days
354 async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
355 """Get a list of most popular tracks for the given artist."""
356 result = await self._get_data(
357 "artist/get",
358 artist_id=prov_artist_id,
359 extra="playlists",
360 offset=0,
361 limit=25,
362 )
363 if result and result.get("playlists"):
364 return [
365 await self._parse_track(item)
366 for item in result["playlists"][0]["tracks"]["items"]
367 if (item and item["id"])
368 ]
369 # fallback to search
370 artist = await self.get_artist(prov_artist_id)
371 searchresult = await self._get_data(
372 "catalog/search", query=artist.name, limit=25, type="tracks"
373 )
374 if not searchresult:
375 return []
376
377 return [
378 await self._parse_track(item)
379 for item in searchresult["tracks"]["items"]
380 if (
381 item
382 and item["id"]
383 and "performer" in item
384 and str(item["performer"]["id"]) == str(prov_artist_id)
385 )
386 ]
387
    async def get_similar_artists(self, prov_artist_id: str) -> None:
        """Get similar artists for given artist.

        Not implemented: the body is empty, so None is always returned.
        """
        # Example request for the endpoint that could back this method:
        # https://www.qobuz.com/api.json/0.2/artist/getSimilarArtists?artist_id=220020&offset=0&limit=3
391
392 async def library_add(self, item: MediaItemType) -> bool:
393 """Add item to library."""
394 result = None
395 if item.media_type == MediaType.ARTIST:
396 result = await self._get_data("favorite/create", artist_id=item.item_id)
397 elif item.media_type == MediaType.ALBUM:
398 result = await self._get_data("favorite/create", album_ids=item.item_id)
399 elif item.media_type == MediaType.TRACK:
400 result = await self._get_data("favorite/create", track_ids=item.item_id)
401 elif item.media_type == MediaType.PLAYLIST:
402 result = await self._get_data("playlist/subscribe", playlist_id=item.item_id)
403 return result is not None
404
405 async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool:
406 """Remove item from library."""
407 result = None
408 if media_type == MediaType.ARTIST:
409 result = await self._get_data("favorite/delete", artist_ids=prov_item_id)
410 elif media_type == MediaType.ALBUM:
411 result = await self._get_data("favorite/delete", album_ids=prov_item_id)
412 elif media_type == MediaType.TRACK:
413 result = await self._get_data("favorite/delete", track_ids=prov_item_id)
414 elif media_type == MediaType.PLAYLIST:
415 playlist = await self.get_playlist(prov_item_id)
416 if playlist.is_editable:
417 result = await self._get_data("playlist/delete", playlist_id=prov_item_id)
418 else:
419 result = await self._get_data("playlist/unsubscribe", playlist_id=prov_item_id)
420 return result is not None
421
422 async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
423 """Add track(s) to playlist."""
424 await self._get_data(
425 "playlist/addTracks",
426 playlist_id=prov_playlist_id,
427 track_ids=",".join(prov_track_ids),
428 playlist_track_ids=",".join(prov_track_ids),
429 )
430
431 async def remove_playlist_tracks(
432 self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
433 ) -> None:
434 """Remove track(s) from playlist."""
435 playlist_track_ids = set()
436 for pos in positions_to_remove:
437 idx = pos - 1
438 qobuz_result = await self._get_data(
439 "playlist/get",
440 key="tracks",
441 playlist_id=prov_playlist_id,
442 extra="tracks",
443 offset=idx,
444 limit=1,
445 )
446 if not qobuz_result:
447 continue
448 playlist_track_id = qobuz_result["tracks"]["items"][0]["playlist_track_id"]
449 playlist_track_ids.add(str(playlist_track_id))
450
451 await self._get_data(
452 "playlist/deleteTracks",
453 playlist_id=prov_playlist_id,
454 playlist_track_ids=",".join(playlist_track_ids),
455 )
456
457 async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
458 """Return the content details for the given track when it will be streamed."""
459 max_quality = int(cast("str", self.config.get_value(CONF_QUALITY)) or "27")
460 # Quality order from highest to lowest
461 quality_order = [27, 7, 6, 5]
462 # Only try qualities up to the user's maximum setting
463 allowed_qualities = [q for q in quality_order if q <= max_quality]
464
465 streamdata: dict[str, Any] | None = None
466 for format_id in allowed_qualities:
467 # it seems that simply requesting for highest available quality does not work
468 # from time to time the api response is empty for this request ?!
469 result = await self._get_data(
470 "track/getFileUrl",
471 sign_request=True,
472 format_id=format_id,
473 track_id=item_id,
474 intent="stream",
475 )
476 if result and result.get("url"):
477 streamdata = result
478 break
479 if not streamdata:
480 msg = f"Unable to retrieve stream details for {item_id}"
481 raise MediaNotFoundError(msg)
482 if streamdata["mime_type"] == "audio/mpeg":
483 content_type = ContentType.MPEG
484 elif streamdata["mime_type"] == "audio/flac":
485 content_type = ContentType.FLAC
486 else:
487 msg = f"Unsupported mime type for {item_id}"
488 raise MediaNotFoundError(msg)
489 self.mass.create_task(self._report_playback_started(streamdata))
490 return StreamDetails(
491 item_id=str(item_id),
492 provider=self.instance_id,
493 audio_format=AudioFormat(
494 content_type=content_type,
495 sample_rate=int(streamdata["sampling_rate"] * 1000),
496 bit_depth=streamdata["bit_depth"],
497 ),
498 stream_type=StreamType.HTTP,
499 duration=streamdata["duration"],
500 data=streamdata, # we need these details for reporting playback
501 path=streamdata["url"],
502 can_seek=True,
503 allow_seek=True,
504 )
505
506 async def _report_playback_started(self, streamdata: dict[str, Any]) -> None:
507 """Report playback start to qobuz."""
508 # TODO: need to figure out if the streamed track is purchased by user
509 # https://www.qobuz.com/api.json/0.2/purchase/getUserPurchasesIds?limit=5000&user_id=xxxxxxx
510 # {"albums":{"total":0,"items":[]},
511 # "tracks":{"total":0,"items":[]},"user":{"id":xxxx,"login":"xxxxx"}}
512 assert self._user_auth_info is not None # for type checking
513 device_id = self._user_auth_info["user"]["device"]["id"]
514 credential_id = self._user_auth_info["user"]["credential"]["id"]
515 user_id = self._user_auth_info["user"]["id"]
516 format_id = streamdata["format_id"]
517 timestamp = int(time.time())
518 events = [
519 {
520 "online": True,
521 "sample": False,
522 "intent": "stream",
523 "device_id": device_id,
524 "track_id": streamdata["track_id"],
525 "purchase": False,
526 "date": timestamp,
527 "credential_id": credential_id,
528 "user_id": user_id,
529 "local": False,
530 "format_id": format_id,
531 }
532 ]
533 async with self.throttler.bypass():
534 await self._post_data("track/reportStreamingStart", data=events)
535
536 async def on_streamed(
537 self,
538 streamdetails: StreamDetails,
539 ) -> None:
540 """Handle callback when an item completed streaming."""
541 if self._user_auth_info is None:
542 msg = "User auth info not available"
543 raise LoginFailed(msg)
544 user_id = self._user_auth_info["user"]["id"]
545 async with self.throttler.bypass():
546 await self._get_data(
547 "/track/reportStreamingEnd",
548 user_id=user_id,
549 track_id=str(streamdetails.item_id),
550 duration=try_parse_int(streamdetails.seconds_streamed),
551 )
552
553 def _parse_artist(self, artist_obj: dict[str, Any]) -> Artist:
554 """Parse qobuz artist object to generic layout."""
555 artist = Artist(
556 item_id=str(artist_obj["id"]),
557 provider=self.domain,
558 name=artist_obj["name"],
559 provider_mappings={
560 ProviderMapping(
561 item_id=str(artist_obj["id"]),
562 provider_domain=self.domain,
563 provider_instance=self.instance_id,
564 url=f"https://open.qobuz.com/artist/{artist_obj['id']}",
565 )
566 },
567 )
568 if artist.item_id == VARIOUS_ARTISTS_ID:
569 artist.mbid = VARIOUS_ARTISTS_MBID
570 artist.name = VARIOUS_ARTISTS_NAME
571 if img := self.__get_image(artist_obj):
572 artist.metadata.add_image(
573 MediaItemImage(
574 type=ImageType.THUMB,
575 path=img,
576 provider=self.instance_id,
577 remotely_accessible=True,
578 )
579 )
580 if artist_obj.get("biography"):
581 artist.metadata.description = artist_obj["biography"].get("content")
582 return artist
583
584 async def _parse_album(
585 self, album_obj: dict[str, Any], artist_obj: dict[str, Any] | None = None
586 ) -> Album:
587 """Parse qobuz album object to generic layout."""
588 if not artist_obj and "artist" not in album_obj:
589 # artist missing in album info, return full abum instead
590 return await self.get_album(album_obj["id"])
591 name, version = parse_title_and_version(album_obj["title"], album_obj.get("version"))
592 album = Album(
593 item_id=str(album_obj["id"]),
594 provider=self.domain,
595 name=name,
596 version=version,
597 provider_mappings={
598 ProviderMapping(
599 item_id=str(album_obj["id"]),
600 provider_domain=self.domain,
601 provider_instance=self.instance_id,
602 available=album_obj["streamable"] and album_obj["displayable"],
603 audio_format=AudioFormat(
604 content_type=ContentType.FLAC,
605 sample_rate=album_obj["maximum_sampling_rate"] * 1000,
606 bit_depth=album_obj["maximum_bit_depth"],
607 ),
608 url=f"https://open.qobuz.com/album/{album_obj['id']}",
609 )
610 },
611 )
612 album.external_ids.add((ExternalID.BARCODE, album_obj["upc"]))
613 album.artists.append(self._parse_artist(artist_obj or album_obj["artist"]))
614 if (
615 album_obj.get("product_type", "") == "single"
616 or album_obj.get("release_type", "") == "single"
617 ):
618 album.album_type = AlbumType.SINGLE
619 elif (
620 album_obj.get("product_type", "") == "compilation" or "Various" in album.artists[0].name
621 ):
622 album.album_type = AlbumType.COMPILATION
623 elif (
624 album_obj.get("product_type", "") == "album"
625 or album_obj.get("release_type", "") == "album"
626 ):
627 album.album_type = AlbumType.ALBUM
628
629 # Try inference - override if it finds something more specific
630 inferred_type = infer_album_type(name, version)
631 if inferred_type in (AlbumType.SOUNDTRACK, AlbumType.LIVE):
632 album.album_type = inferred_type
633
634 if "genre" in album_obj:
635 album.metadata.genres = {album_obj["genre"]["name"]}
636 if img := self.__get_image(album_obj):
637 album.metadata.add_image(
638 MediaItemImage(
639 provider=self.instance_id,
640 type=ImageType.THUMB,
641 path=img,
642 remotely_accessible=True,
643 )
644 )
645 if "label" in album_obj:
646 album.metadata.label = album_obj["label"]["name"]
647 if released_at := album_obj.get("released_at"):
648 with suppress(ValueError):
649 album.year = datetime.datetime.fromtimestamp(released_at).year
650 if album_obj.get("copyright"):
651 album.metadata.copyright = album_obj["copyright"]
652 if album_obj.get("description"):
653 album.metadata.description = album_obj["description"]
654 if album_obj.get("parental_warning"):
655 album.metadata.explicit = True
656 return album
657
658 async def _parse_track(self, track_obj: dict[str, Any]) -> Track:
659 """Parse qobuz track object to generic layout."""
660 name, version = parse_title_and_version(track_obj["title"], track_obj.get("version"))
661 track = Track(
662 item_id=str(track_obj["id"]),
663 provider=self.domain,
664 name=name,
665 version=version,
666 duration=track_obj["duration"],
667 provider_mappings={
668 ProviderMapping(
669 item_id=str(track_obj["id"]),
670 provider_domain=self.domain,
671 provider_instance=self.instance_id,
672 available=track_obj["streamable"] and track_obj["displayable"],
673 audio_format=AudioFormat(
674 content_type=ContentType.FLAC,
675 sample_rate=track_obj["maximum_sampling_rate"] * 1000,
676 bit_depth=track_obj["maximum_bit_depth"],
677 ),
678 url=f"https://open.qobuz.com/track/{track_obj['id']}",
679 )
680 },
681 disc_number=track_obj.get("media_number", 0),
682 track_number=track_obj.get("track_number", 0),
683 )
684 if isrc := track_obj.get("isrc"):
685 track.external_ids.add((ExternalID.ISRC, isrc))
686 if track_obj.get("performer") and "Various " not in track_obj["performer"]:
687 artist = self._parse_artist(track_obj["performer"])
688 if artist:
689 track.artists.append(artist)
690 # try to grab artist from album
691 if not track.artists and (
692 track_obj.get("album")
693 and track_obj["album"].get("artist")
694 and "Various " not in track_obj["album"]["artist"]
695 ):
696 artist = self._parse_artist(track_obj["album"]["artist"])
697 if artist:
698 track.artists.append(artist)
699 if not track.artists:
700 # last resort: parse from performers string
701 for performer_str in track_obj["performers"].split(" - "):
702 role = performer_str.split(", ")[1]
703 name = performer_str.split(", ")[0]
704 if "artist" in role.lower():
705 artist = Artist(
706 item_id=name,
707 provider=self.domain,
708 name=name,
709 provider_mappings={
710 ProviderMapping(
711 item_id=name,
712 provider_domain=self.domain,
713 provider_instance=self.instance_id,
714 )
715 },
716 )
717 track.artists.append(artist)
718 # TODO: fix grabbing composer from details
719
720 if "album" in track_obj:
721 album = await self._parse_album(track_obj["album"])
722 if album:
723 track.album = album
724 if track_obj.get("performers"):
725 track.metadata.performers = {x.strip() for x in track_obj["performers"].split("-")}
726 if track_obj.get("copyright"):
727 track.metadata.copyright = track_obj["copyright"]
728 if track_obj.get("parental_warning"):
729 track.metadata.explicit = True
730 if img := self.__get_image(track_obj):
731 track.metadata.add_image(
732 MediaItemImage(
733 type=ImageType.THUMB,
734 path=img,
735 provider=self.instance_id,
736 remotely_accessible=True,
737 )
738 )
739 return track
740
741 def _parse_playlist(self, playlist_obj: dict[str, Any]) -> Playlist:
742 """Parse qobuz playlist object to generic layout."""
743 if self._user_auth_info is None:
744 msg = "User auth info not available"
745 raise LoginFailed(msg)
746
747 is_editable = (
748 playlist_obj["owner"]["id"] == self._user_auth_info["user"]["id"]
749 or playlist_obj["is_collaborative"]
750 )
751 playlist = Playlist(
752 item_id=str(playlist_obj["id"]),
753 provider=self.instance_id,
754 name=playlist_obj["name"],
755 owner=playlist_obj["owner"]["name"],
756 provider_mappings={
757 ProviderMapping(
758 item_id=str(playlist_obj["id"]),
759 provider_domain=self.domain,
760 provider_instance=self.instance_id,
761 url=f"https://open.qobuz.com/playlist/{playlist_obj['id']}",
762 is_unique=is_editable, # user-owned playlists are unique
763 )
764 },
765 is_editable=is_editable,
766 )
767 if img := self.__get_image(playlist_obj):
768 playlist.metadata.add_image(
769 MediaItemImage(
770 type=ImageType.THUMB,
771 path=img,
772 provider=self.instance_id,
773 remotely_accessible=True,
774 )
775 )
776 return playlist
777
778 @lock
779 async def _auth_token(self) -> str | None:
780 """Login to qobuz and store the token."""
781 if self._user_auth_info:
782 return str(self._user_auth_info["user_auth_token"])
783 params: dict[str, Any] = {
784 "username": self.config.get_value(CONF_USERNAME),
785 "password": self.config.get_value(CONF_PASSWORD),
786 "device_manufacturer_id": "music_assistant",
787 }
788 details = await self._get_data("user/login", **params)
789 if details and "user" in details:
790 self._user_auth_info = details
791 self.logger.info(
792 "Successfully logged in to Qobuz as %s", details["user"]["display_name"]
793 )
794 self.mass.metadata.set_default_preferred_language(details["user"]["country_code"])
795 return str(details["user_auth_token"])
796 return None
797
798 async def _get_all_items(
799 self, endpoint: str, key: str = "tracks", **kwargs: Any
800 ) -> list[dict[str, Any]]:
801 """Get all items from a paged list."""
802 limit = 50
803 offset = 0
804 all_items: list[dict[str, Any]] = []
805 while True:
806 kwargs["limit"] = limit
807 kwargs["offset"] = offset
808 result = await self._get_data(endpoint, **kwargs)
809 offset += limit
810 if not result:
811 break
812 if not result.get(key) or not result[key].get("items"):
813 break
814 for item in result[key]["items"]:
815 all_items.append(item)
816 if len(result[key]["items"]) < limit:
817 break
818 return all_items
819
820 @throttle_with_retries
821 async def _get_data(
822 self, endpoint: str, sign_request: bool = False, **kwargs: Any
823 ) -> dict[str, Any] | None:
824 """Get data from api."""
825 self.logger.debug("Handling GET request to %s", endpoint)
826 url = f"http://www.qobuz.com/api.json/0.2/{endpoint}"
827 headers = {"X-App-Id": app_var(0)}
828 locale = self.mass.metadata.locale.replace("_", "-")
829 language = locale.split("-")[0]
830 headers["Accept-Language"] = f"{locale}, {language};q=0.9, *;q=0.5"
831 if endpoint != "user/login":
832 auth_token = await self._auth_token()
833 if not auth_token:
834 self.logger.debug("Not logged in")
835 return None
836 headers["X-User-Auth-Token"] = auth_token
837 if sign_request:
838 signing_data = "".join(endpoint.split("/"))
839 keys = list(kwargs.keys())
840 keys.sort()
841 for key in keys:
842 signing_data += f"{key}{kwargs[key]}"
843 request_ts = str(time.time())
844 request_sig = signing_data + request_ts + app_var(1)
845 request_sig = str(hashlib.md5(request_sig.encode()).hexdigest())
846 kwargs["request_ts"] = request_ts
847 kwargs["request_sig"] = request_sig
848 kwargs["app_id"] = app_var(0)
849 kwargs["user_auth_token"] = await self._auth_token()
850 async with (
851 self.mass.http_session.get(url, headers=headers, params=kwargs) as response,
852 ):
853 # handle rate limiter
854 if response.status == 429:
855 backoff_time = int(response.headers.get("Retry-After", 0))
856 raise ResourceTemporarilyUnavailable("Rate Limiter", backoff_time=backoff_time)
857 # handle temporary server error
858 if response.status in (502, 503):
859 raise ResourceTemporarilyUnavailable(backoff_time=30)
860 # handle 404 not found, convert to MediaNotFoundError
861 if response.status == 404:
862 raise MediaNotFoundError(f"{endpoint} not found")
863 response.raise_for_status()
864 try:
865 return cast("dict[str, Any]", await response.json(loads=json_loads))
866 except client_exceptions.ContentTypeError as err:
867 text = err.message or await response.text() or err.status
868 msg = f"Error while handling {endpoint}: {text}"
869 raise InvalidDataError(msg)
870
871 @throttle_with_retries
872 async def _post_data(
873 self,
874 endpoint: str,
875 params: dict[str, Any] | None = None,
876 data: dict[str, Any] | list[dict[str, Any]] | None = None,
877 ) -> dict[str, Any]:
878 """Post data to api."""
879 self.logger.debug("Handling POST request to %s", endpoint)
880 if not params:
881 params = {}
882 if not data:
883 data = {}
884 url = f"http://www.qobuz.com/api.json/0.2/{endpoint}"
885 params["app_id"] = app_var(0)
886 auth_token = await self._auth_token()
887 if auth_token is None:
888 msg = "Authentication token is required"
889 raise LoginFailed(msg)
890 params["user_auth_token"] = auth_token
891 async with self.mass.http_session.post(
892 url, params=params, json=data, ssl=False
893 ) as response:
894 # handle rate limiter
895 if response.status == 429:
896 backoff_time = int(response.headers.get("Retry-After", 0))
897 raise ResourceTemporarilyUnavailable("Rate Limiter", backoff_time=backoff_time)
898 # handle temporary server error
899 if response.status in (502, 503):
900 raise ResourceTemporarilyUnavailable(backoff_time=30)
901 # handle 404 not found, convert to MediaNotFoundError
902 if response.status == 404:
903 raise MediaNotFoundError(f"{endpoint} not found")
904 response.raise_for_status()
905 return cast("dict[str, Any]", await response.json(loads=json_loads))
906
907 def __get_image(self, obj: dict[str, Any]) -> str | None:
908 """Try to parse image from Qobuz media object."""
909 if obj.get("image"):
910 for key in ["extralarge", "large", "medium", "small"]:
911 if obj["image"].get(key):
912 img_value: str = obj["image"][key]
913 if "2a96cbd8b46e442fc41c2b86b821562f" in img_value:
914 continue
915 return img_value
916 if obj.get("images300"):
917 # playlists seem to use this strange format
918 return str(obj["images300"][0])
919 if obj.get("album"):
920 return self.__get_image(obj["album"])
921 if obj.get("artist"):
922 return self.__get_image(obj["artist"])
923 return None
924