/
/
/
1"""Deezer music provider support for MusicAssistant."""
2
3import hashlib
4import uuid
5from asyncio import TaskGroup
6from collections.abc import AsyncGenerator
7from dataclasses import dataclass
8from math import ceil
9from typing import Any, Literal, cast
10
11import deezer
12from aiohttp import ClientSession, ClientTimeout
13from Crypto.Cipher import Blowfish
14from deezer import exceptions as deezer_exceptions
15from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
16from music_assistant_models.enums import (
17 AlbumType,
18 ConfigEntryType,
19 ContentType,
20 ExternalID,
21 ImageType,
22 MediaType,
23 ProviderFeature,
24 StreamType,
25)
26from music_assistant_models.errors import InvalidDataError, LoginFailed, MediaNotFoundError
27from music_assistant_models.media_items import (
28 Album,
29 Artist,
30 AudioFormat,
31 ItemMapping,
32 MediaItemImage,
33 MediaItemMetadata,
34 MediaItemType,
35 Playlist,
36 ProviderMapping,
37 RecommendationFolder,
38 SearchResults,
39 Track,
40 UniqueList,
41)
42from music_assistant_models.provider import ProviderManifest
43from music_assistant_models.streamdetails import StreamDetails
44
45from music_assistant import MusicAssistant
46from music_assistant.controllers.cache import use_cache
47from music_assistant.helpers.app_vars import app_var # type: ignore[attr-defined]
48from music_assistant.helpers.auth import AuthenticationHelper
49from music_assistant.helpers.datetime import utc_timestamp
50from music_assistant.helpers.util import infer_album_type, parse_title_and_version
51from music_assistant.models import ProviderInstanceType
52from music_assistant.models.music_provider import MusicProvider
53
54from .gw_client import GWClient
55
56SUPPORTED_FEATURES = {
57 ProviderFeature.LIBRARY_ARTISTS,
58 ProviderFeature.LIBRARY_ALBUMS,
59 ProviderFeature.LIBRARY_TRACKS,
60 ProviderFeature.LIBRARY_PLAYLISTS,
61 ProviderFeature.LIBRARY_ALBUMS_EDIT,
62 ProviderFeature.LIBRARY_TRACKS_EDIT,
63 ProviderFeature.LIBRARY_ARTISTS_EDIT,
64 ProviderFeature.LIBRARY_PLAYLISTS_EDIT,
65 ProviderFeature.ALBUM_METADATA,
66 ProviderFeature.TRACK_METADATA,
67 ProviderFeature.ARTIST_METADATA,
68 ProviderFeature.ARTIST_ALBUMS,
69 ProviderFeature.ARTIST_TOPTRACKS,
70 ProviderFeature.BROWSE,
71 ProviderFeature.SEARCH,
72 ProviderFeature.PLAYLIST_TRACKS_EDIT,
73 ProviderFeature.PLAYLIST_CREATE,
74 ProviderFeature.RECOMMENDATIONS,
75 ProviderFeature.SIMILAR_TRACKS,
76}
77
78
79@dataclass
80class DeezerCredentials:
81 """Class for storing credentials."""
82
83 app_id: int
84 app_secret: str
85 access_token: str
86
87
88CONF_ACCESS_TOKEN = "access_token"
89CONF_ARL_TOKEN = "arl_token"
90CONF_ACTION_AUTH = "auth"
91DEEZER_AUTH_URL = "https://connect.deezer.com/oauth/auth.php"
92RELAY_URL = "https://deezer.oauth.jonathanbangert.com/"
93DEEZER_PERMS = "basic_access,email,offline_access,manage_library,\
94manage_community,delete_library,listening_history"
95DEEZER_APP_ID = app_var(6)
96DEEZER_APP_SECRET = app_var(7)
97
98
99async def get_access_token(
100 app_id: str, app_secret: str, code: str, http_session: ClientSession
101) -> str:
102 """Update the access_token."""
103 response = await http_session.post(
104 "https://connect.deezer.com/oauth/access_token.php",
105 params={"code": code, "app_id": app_id, "secret": app_secret},
106 ssl=False,
107 )
108 if response.status != 200:
109 msg = f"HTTP Error {response.status}: {response.reason}"
110 raise ConnectionError(msg)
111 response_text = await response.text()
112 try:
113 return response_text.split("=")[1].split("&")[0]
114 except Exception as error:
115 msg = "Invalid auth code"
116 raise LoginFailed(msg) from error
117
118
119async def setup(
120 mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
121) -> ProviderInstanceType:
122 """Initialize provider(instance) with given configuration."""
123 return DeezerProvider(mass, manifest, config, SUPPORTED_FEATURES)
124
125
126async def get_config_entries(
127 mass: MusicAssistant,
128 instance_id: str | None = None, # noqa: ARG001
129 action: str | None = None,
130 values: dict[str, ConfigValueType] | None = None,
131) -> tuple[ConfigEntry, ...]:
132 """Return Config entries to setup this provider."""
133 # Action is to launch oauth flow
134 if action == CONF_ACTION_AUTH:
135 # Use the AuthenticationHelper to authenticate
136 if not values or "session_id" not in values:
137 raise InvalidDataError("session_id not found in values")
138 async with AuthenticationHelper(mass, cast("str", values["session_id"])) as auth_helper:
139 url = f"{DEEZER_AUTH_URL}?app_id={DEEZER_APP_ID}&redirect_uri={RELAY_URL}\
140&perms={DEEZER_PERMS}&state={auth_helper.callback_url}"
141 code = (await auth_helper.authenticate(url))["code"]
142 values[CONF_ACCESS_TOKEN] = await get_access_token(
143 DEEZER_APP_ID, DEEZER_APP_SECRET, code, mass.http_session
144 )
145
146 return (
147 ConfigEntry(
148 key=CONF_ACCESS_TOKEN,
149 type=ConfigEntryType.SECURE_STRING,
150 label="Access token",
151 required=True,
152 action=CONF_ACTION_AUTH,
153 description="You need to authenticate on Deezer.",
154 action_label="Authenticate with Deezer",
155 value=values.get(CONF_ACCESS_TOKEN) if values else None,
156 ),
157 ConfigEntry(
158 key=CONF_ARL_TOKEN,
159 type=ConfigEntryType.SECURE_STRING,
160 label="Arl token",
161 required=True,
162 description="See https://www.dumpmedia.com/deezplus/deezer-arl.html",
163 value=values.get(CONF_ARL_TOKEN) if values else None,
164 ),
165 )
166
167
168class DeezerProvider(MusicProvider):
169 """Deezer provider support."""
170
171 client: deezer.Client
172 gw_client: GWClient
173 credentials: DeezerCredentials
174 user: deezer.User
175
176 async def handle_async_init(self) -> None:
177 """Handle async init of the Deezer provider."""
178 self.credentials = DeezerCredentials(
179 app_id=DEEZER_APP_ID,
180 app_secret=DEEZER_APP_SECRET,
181 access_token=cast("str", self.config.get_value(CONF_ACCESS_TOKEN)),
182 )
183
184 self.client = deezer.Client(
185 app_id=self.credentials.app_id,
186 app_secret=self.credentials.app_secret,
187 access_token=self.credentials.access_token,
188 )
189
190 self.user = await self.client.get_user()
191
192 self.gw_client = GWClient(
193 self.mass.http_session,
194 str(self.config.get_value(CONF_ACCESS_TOKEN)),
195 str(self.config.get_value(CONF_ARL_TOKEN)),
196 )
197 await self.gw_client.setup()
198
199 @use_cache(3600 * 24 * 7) # Cache for 7 days
200 async def search(
201 self, search_query: str, media_types: list[MediaType], limit: int = 5
202 ) -> SearchResults:
203 """Perform search on music provider.
204
205 :param search_query: Search query.
206 :param media_types: A list of media_types to include. All types if None.
207 """
208 # Create a task for each media_type
209 tasks: dict[MediaType, Any] = {}
210
211 async with TaskGroup() as taskgroup:
212 for media_type in media_types:
213 if media_type == MediaType.TRACK:
214 tasks[MediaType.TRACK] = taskgroup.create_task(
215 self.search_and_parse_tracks(
216 query=search_query,
217 limit=limit,
218 user_country=self.gw_client.user_country,
219 )
220 )
221 elif media_type == MediaType.ARTIST:
222 tasks[MediaType.ARTIST] = taskgroup.create_task(
223 self.search_and_parse_artists(query=search_query, limit=limit)
224 )
225 elif media_type == MediaType.ALBUM:
226 tasks[MediaType.ALBUM] = taskgroup.create_task(
227 self.search_and_parse_albums(query=search_query, limit=limit)
228 )
229 elif media_type == MediaType.PLAYLIST:
230 tasks[MediaType.PLAYLIST] = taskgroup.create_task(
231 self.search_and_parse_playlists(query=search_query, limit=limit)
232 )
233
234 results = SearchResults()
235
236 for media_type, task in tasks.items():
237 if media_type == MediaType.ARTIST:
238 results.artists = task.result()
239 elif media_type == MediaType.ALBUM:
240 results.albums = task.result()
241 elif media_type == MediaType.TRACK:
242 results.tracks = task.result()
243 elif media_type == MediaType.PLAYLIST:
244 results.playlists = task.result()
245
246 return results
247
248 async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
249 """Retrieve all library artists from Deezer."""
250 async for artist in await self.client.get_user_artists():
251 yield self.parse_artist(artist=artist)
252
253 async def get_library_albums(self) -> AsyncGenerator[Album, None]:
254 """Retrieve all library albums from Deezer."""
255 async for album in await self.client.get_user_albums():
256 yield self.parse_album(album=album)
257
258 async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
259 """Retrieve all library playlists from Deezer."""
260 async for playlist in await self.user.get_playlists():
261 yield self.parse_playlist(playlist=playlist)
262
263 async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
264 """Retrieve all library tracks from Deezer."""
265 async for track in await self.client.get_user_tracks():
266 yield self.parse_track(track=track, user_country=self.gw_client.user_country)
267
268 @use_cache(3600 * 24 * 30) # Cache for 30 days
269 async def get_artist(self, prov_artist_id: str) -> Artist:
270 """Get full artist details by id."""
271 try:
272 return self.parse_artist(
273 artist=await self.client.get_artist(artist_id=int(prov_artist_id))
274 )
275 except deezer_exceptions.DeezerErrorResponse as error:
276 self.logger.warning("Failed getting artist: %s", error)
277 raise MediaNotFoundError(f"Artist {prov_artist_id} not found on Deezer") from error
278
279 @use_cache(3600 * 24 * 30) # Cache for 30 days
280 async def get_album(self, prov_album_id: str) -> Album:
281 """Get full album details by id."""
282 try:
283 return self.parse_album(album=await self.client.get_album(album_id=int(prov_album_id)))
284 except deezer_exceptions.DeezerErrorResponse as error:
285 self.logger.warning("Failed getting album: %s", error)
286 raise MediaNotFoundError(f"Album {prov_album_id} not found on Deezer") from error
287
288 @use_cache(3600 * 24 * 30) # Cache for 30 days
289 async def get_playlist(self, prov_playlist_id: str) -> Playlist:
290 """Get full playlist details by id."""
291 try:
292 return self.parse_playlist(
293 playlist=await self.client.get_playlist(playlist_id=int(prov_playlist_id)),
294 )
295 except deezer_exceptions.DeezerErrorResponse as error:
296 self.logger.warning("Failed getting playlist: %s", error)
297 raise MediaNotFoundError(f"Album {prov_playlist_id} not found on Deezer") from error
298
299 @use_cache(3600 * 24 * 30) # Cache for 30 days
300 async def get_track(self, prov_track_id: str) -> Track:
301 """Get full track details by id."""
302 try:
303 return self.parse_track(
304 track=await self.client.get_track(track_id=int(prov_track_id)),
305 user_country=self.gw_client.user_country,
306 )
307 except deezer_exceptions.DeezerErrorResponse as error:
308 self.logger.warning("Failed getting track: %s", error)
309 raise MediaNotFoundError(f"Album {prov_track_id} not found on Deezer") from error
310
311 @use_cache(3600 * 24 * 30) # Cache for 30 days
312 async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
313 """Get all tracks in an album."""
314 album = await self.client.get_album(album_id=int(prov_album_id))
315 return [
316 self.parse_track(
317 track=deezer_track,
318 user_country=self.gw_client.user_country,
319 # TODO: doesn't Deezer have disc and track number in the api ?
320 position=0,
321 )
322 for deezer_track in await album.get_tracks()
323 ]
324
325 @use_cache(3600 * 3) # Cache for 3 hours
326 async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
327 """Get playlist tracks."""
328 result: list[Track] = []
329 if page > 0:
330 # paging not supported, we always return the whole list at once
331 return []
332 # TODO: access the underlying paging on the deezer api (if possible))
333 playlist = await self.client.get_playlist(int(prov_playlist_id))
334 playlist_tracks = await playlist.get_tracks()
335 for index, deezer_track in enumerate(playlist_tracks, 1):
336 result.append(
337 self.parse_track(
338 track=deezer_track,
339 user_country=self.gw_client.user_country,
340 position=index,
341 )
342 )
343 return result
344
345 @use_cache(3600 * 24 * 7) # Cache for 7 days
346 async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
347 """Get albums by an artist."""
348 artist = await self.client.get_artist(artist_id=int(prov_artist_id))
349 return [self.parse_album(album=album) async for album in await artist.get_albums()]
350
351 @use_cache(3600 * 24 * 7) # Cache for 7 days
352 async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
353 """Get top 50 tracks of an artist."""
354 artist = await self.client.get_artist(artist_id=int(prov_artist_id))
355 return [
356 self.parse_track(track=track, user_country=self.gw_client.user_country)
357 async for track in await artist.get_top(limit=50)
358 ]
359
360 async def library_add(self, item: MediaItemType) -> bool:
361 """Add an item to the provider's library/favorites."""
362 result = False
363 if item.media_type == MediaType.ARTIST:
364 result = bool(
365 await self.client.add_user_artist(
366 artist_id=int(item.item_id),
367 )
368 )
369 elif item.media_type == MediaType.ALBUM:
370 result = bool(
371 await self.client.add_user_album(
372 album_id=int(item.item_id),
373 )
374 )
375 elif item.media_type == MediaType.TRACK:
376 result = bool(
377 await self.client.add_user_track(
378 track_id=int(item.item_id),
379 )
380 )
381 elif item.media_type == MediaType.PLAYLIST:
382 result = bool(
383 await self.client.add_user_playlist(
384 playlist_id=int(item.item_id),
385 )
386 )
387 else:
388 raise NotImplementedError
389 return result
390
391 async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool:
392 """Remove an item from the provider's library/favorites."""
393 result = False
394 if media_type == MediaType.ARTIST:
395 result = bool(
396 await self.client.remove_user_artist(
397 artist_id=int(prov_item_id),
398 )
399 )
400 elif media_type == MediaType.ALBUM:
401 result = bool(
402 await self.client.remove_user_album(
403 album_id=int(prov_item_id),
404 )
405 )
406 elif media_type == MediaType.TRACK:
407 result = bool(
408 await self.client.remove_user_track(
409 track_id=int(prov_item_id),
410 )
411 )
412 elif media_type == MediaType.PLAYLIST:
413 result = bool(
414 await self.client.remove_user_playlist(
415 playlist_id=int(prov_item_id),
416 )
417 )
418 else:
419 raise NotImplementedError
420 return result
421
422 @use_cache(3600) # Cache for 1 hour
423 async def recommendations(self) -> list[RecommendationFolder]:
424 """Get deezer's recommendations."""
425 return [
426 RecommendationFolder(
427 item_id="recommended_tracks",
428 provider=self.instance_id,
429 name="Recommended tracks",
430 translation_key="recommended_tracks",
431 items=UniqueList(
432 [
433 self.parse_track(track=track, user_country=self.gw_client.user_country)
434 for track in await self.client.get_user_recommended_tracks()
435 ]
436 ),
437 )
438 ]
439
440 async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
441 """Add track(s) to playlist."""
442 playlist = await self.client.get_playlist(int(prov_playlist_id))
443 await playlist.add_tracks(tracks=[int(i) for i in prov_track_ids])
444
445 async def remove_playlist_tracks(
446 self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
447 ) -> None:
448 """Remove track(s) from playlist."""
449 playlist_track_ids = []
450 for track in await self.get_playlist_tracks(prov_playlist_id, 0):
451 if track.position in positions_to_remove:
452 playlist_track_ids.append(int(track.item_id))
453 if len(playlist_track_ids) == len(positions_to_remove):
454 break
455 playlist = await self.client.get_playlist(int(prov_playlist_id))
456 await playlist.delete_tracks(playlist_track_ids)
457
458 async def create_playlist(self, name: str) -> Playlist:
459 """Create a new playlist on provider with given name."""
460 playlist_id = await self.client.create_playlist(playlist_name=name)
461 playlist = await self.client.get_playlist(playlist_id)
462 return self.parse_playlist(playlist=playlist)
463
464 @use_cache(3600 * 24) # Cache for 24 hours
465 async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]:
466 """Retrieve a dynamic list of tracks based on the provided item."""
467 endpoint = "song.getSearchTrackMix"
468 tracks = (await self.gw_client._gw_api_call(endpoint, args={"SNG_ID": prov_track_id}))[
469 "results"
470 ]["data"][:limit]
471 return [await self.get_track(track["SNG_ID"]) for track in tracks]
472
473 async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
474 """Return the content details for the given track when it will be streamed."""
475 url_details, song_data = await self.gw_client.get_deezer_track_urls(item_id)
476 url = url_details["sources"][0]["url"]
477 return StreamDetails(
478 item_id=item_id,
479 provider=self.instance_id,
480 audio_format=AudioFormat(
481 content_type=ContentType.try_parse(url_details["format"].split("_")[0])
482 ),
483 stream_type=StreamType.CUSTOM,
484 duration=int(song_data["DURATION"]),
485 # Due to track replacement, the track ID of the stream may be different from the ID
486 # that is stored. We need the proper track ID to decrypt the stream, so store it
487 # separately so we can use it later on.
488 data={"url": url, "format": url_details["format"], "track_id": song_data["SNG_ID"]},
489 size=int(song_data[f"FILESIZE_{url_details['format']}"]),
490 can_seek=True,
491 allow_seek=True,
492 )
493
494 async def get_audio_stream(
495 self, streamdetails: StreamDetails, seek_position: int = 0
496 ) -> AsyncGenerator[bytes, None]:
497 """Return the audio stream for the provider item."""
498 blowfish_key = self.get_blowfish_key(streamdetails.data["track_id"])
499 chunk_index = 0
500 timeout = ClientTimeout(total=None, connect=30, sock_read=600)
501 headers: dict[str, str] = {}
502 # if seek_position and streamdetails.size:
503 # chunk_count = ceil(streamdetails.size / 2048)
504 # chunk_index = int(chunk_count / streamdetails.duration) * seek_position
505 # skip_bytes = chunk_index * 2048
506 # headers["Range"] = f"bytes={skip_bytes}-"
507
508 # NOTE: Seek with using the Range header is not working properly
509 # causing malformed audio so this is a temporary patch
510 # by just skipping chunks
511 if seek_position and streamdetails.size and streamdetails.duration:
512 chunk_count = ceil(streamdetails.size / 2048)
513 skip_chunks = int(chunk_count / streamdetails.duration) * seek_position
514 else:
515 skip_chunks = 0
516
517 buffer = bytearray()
518 streamdetails.data["start_ts"] = utc_timestamp()
519 streamdetails.data["stream_id"] = uuid.uuid1()
520 self.mass.create_task(self.gw_client.log_listen(next_track=streamdetails.item_id))
521 async with self.mass.http_session.get(
522 streamdetails.data["url"], headers=headers, timeout=timeout
523 ) as resp:
524 async for chunk in resp.content.iter_chunked(2048):
525 buffer += chunk
526 if len(buffer) >= 2048:
527 if chunk_index >= skip_chunks or chunk_index == 0:
528 if chunk_index % 3 > 0:
529 yield bytes(buffer[:2048])
530 else:
531 yield self.decrypt_chunk(bytes(buffer[:2048]), blowfish_key)
532
533 chunk_index += 1
534 del buffer[:2048]
535 yield bytes(buffer)
536
537 async def on_streamed(
538 self,
539 streamdetails: StreamDetails,
540 ) -> None:
541 """Handle callback when an item completed streaming."""
542 await self.gw_client.log_listen(last_track=streamdetails)
543
544 ### PARSING METADATA FUNCTIONS ###
545
546 def parse_metadata_track(self, track: deezer.Track) -> MediaItemMetadata:
547 """Parse the track metadata."""
548 metadata = MediaItemMetadata()
549 if hasattr(track, "preview"):
550 metadata.preview = track.preview
551 if hasattr(track, "explicit_lyrics"):
552 metadata.explicit = track.explicit_lyrics
553 if hasattr(track, "rank"):
554 metadata.popularity = track.rank
555 if hasattr(track, "album") and hasattr(track.album, "cover_big"):
556 metadata.add_image(
557 MediaItemImage(
558 type=ImageType.THUMB,
559 path=track.album.cover_big,
560 provider=self.instance_id,
561 remotely_accessible=True,
562 )
563 )
564 return metadata
565
566 def parse_metadata_album(self, album: deezer.Album) -> MediaItemMetadata:
567 """Parse the album metadata."""
568 return MediaItemMetadata(
569 explicit=album.explicit_lyrics,
570 images=UniqueList(
571 [
572 MediaItemImage(
573 type=ImageType.THUMB,
574 path=album.cover_big,
575 provider=self.instance_id,
576 remotely_accessible=True,
577 )
578 ]
579 ),
580 )
581
582 def parse_metadata_artist(self, artist: deezer.Artist) -> MediaItemMetadata:
583 """Parse the artist metadata."""
584 return MediaItemMetadata(
585 images=UniqueList(
586 [
587 MediaItemImage(
588 type=ImageType.THUMB,
589 path=artist.picture_big,
590 provider=self.instance_id,
591 remotely_accessible=True,
592 )
593 ]
594 ),
595 )
596
597 ### PARSING FUNCTIONS ###
598 def parse_artist(self, artist: deezer.Artist) -> Artist:
599 """Parse the deezer-python artist to a Music Assistant artist."""
600 return Artist(
601 item_id=str(artist.id),
602 provider=self.instance_id,
603 name=artist.name,
604 media_type=MediaType.ARTIST,
605 provider_mappings={
606 ProviderMapping(
607 item_id=str(artist.id),
608 provider_domain=self.domain,
609 provider_instance=self.instance_id,
610 url=getattr(artist, "link", None), # Sometimes the API doesn't return a link
611 )
612 },
613 metadata=self.parse_metadata_artist(artist=artist),
614 )
615
616 def parse_album(self, album: deezer.Album) -> Album:
617 """Parse the deezer-python album to a Music Assistant album."""
618 name, version = parse_title_and_version(album.title)
619 return Album(
620 album_type=self.get_album_type(album),
621 item_id=str(album.id),
622 provider=self.instance_id,
623 name=name,
624 version=version,
625 year=album.release_date.year if getattr(album, "release_date", None) else None,
626 artists=UniqueList(
627 [
628 ItemMapping(
629 media_type=MediaType.ARTIST,
630 item_id=str(album.artist.id),
631 provider=self.instance_id,
632 name=album.artist.name,
633 )
634 ]
635 ),
636 media_type=MediaType.ALBUM,
637 provider_mappings={
638 ProviderMapping(
639 item_id=str(album.id),
640 provider_domain=self.domain,
641 provider_instance=self.instance_id,
642 url=getattr(album, "link", None),
643 )
644 },
645 metadata=self.parse_metadata_album(album=album),
646 )
647
648 def parse_playlist(self, playlist: deezer.Playlist) -> Playlist:
649 """Parse the deezer-python playlist to a Music Assistant playlist."""
650 creator = self.get_playlist_creator(playlist)
651 is_editable = creator.id == self.user.id
652 return Playlist(
653 item_id=str(playlist.id),
654 provider=self.instance_id,
655 name=playlist.title,
656 media_type=MediaType.PLAYLIST,
657 provider_mappings={
658 ProviderMapping(
659 item_id=str(playlist.id),
660 provider_domain=self.domain,
661 provider_instance=self.instance_id,
662 url=getattr(playlist, "link", None),
663 is_unique=is_editable, # user-owned playlists are unique
664 )
665 },
666 metadata=MediaItemMetadata(
667 images=UniqueList(
668 [
669 MediaItemImage(
670 type=ImageType.THUMB,
671 path=playlist.picture_big,
672 provider=self.instance_id,
673 remotely_accessible=True,
674 )
675 ]
676 ),
677 ),
678 is_editable=is_editable,
679 owner=creator.name,
680 )
681
682 def get_playlist_creator(self, playlist: deezer.Playlist) -> deezer.User:
683 """On playlists, the creator is called creator, elsewhere it's called user."""
684 if hasattr(playlist, "creator"):
685 return playlist.creator
686 return playlist.user
687
688 def parse_track(self, track: deezer.Track, user_country: str, position: int = 0) -> Track:
689 """Parse the deezer-python track to a Music Assistant track."""
690 if hasattr(track, "artist"):
691 artist = ItemMapping(
692 media_type=MediaType.ARTIST,
693 item_id=str(getattr(track.artist, "id", f"deezer-{track.artist.name}")),
694 provider=self.instance_id,
695 name=track.artist.name,
696 )
697 else:
698 artist = None
699 if hasattr(track, "album"):
700 album = ItemMapping(
701 media_type=MediaType.ALBUM,
702 item_id=str(track.album.id),
703 provider=self.instance_id,
704 name=track.album.title,
705 )
706 else:
707 album = None
708
709 name, version = parse_title_and_version(track.title)
710 item = Track(
711 item_id=str(track.id),
712 provider=self.instance_id,
713 name=name,
714 version=version,
715 sort_name=self.get_short_title(track),
716 duration=track.duration,
717 artists=UniqueList([artist]) if artist else UniqueList(),
718 album=album,
719 provider_mappings={
720 ProviderMapping(
721 item_id=str(track.id),
722 provider_domain=self.domain,
723 provider_instance=self.instance_id,
724 available=self.track_available(track=track, user_country=user_country),
725 url=getattr(track, "link", None),
726 )
727 },
728 metadata=self.parse_metadata_track(track=track),
729 track_number=getattr(track, "track_position", position),
730 position=position,
731 disc_number=getattr(track, "disk_number", 0),
732 )
733 if isrc := getattr(track, "isrc", None):
734 item.external_ids.add((ExternalID.ISRC, isrc))
735 return item
736
737 def get_short_title(self, track: deezer.Track) -> str:
738 """Short names only returned, if available."""
739 if hasattr(track, "title_short"):
740 return str(track.title_short)
741 return str(track.title)
742
743 def get_album_type(self, album: deezer.Album) -> AlbumType:
744 """Read and convert the Deezer album type."""
745 # Get provider's basic type first
746 provider_type = AlbumType.UNKNOWN
747 if hasattr(album, "record_type"):
748 match album.record_type:
749 case "album":
750 provider_type = AlbumType.ALBUM
751 case "single":
752 provider_type = AlbumType.SINGLE
753 case "ep":
754 provider_type = AlbumType.EP
755 case "compile":
756 provider_type = AlbumType.COMPILATION
757
758 # Try inference - override if it finds something more specific
759 inferred_type = infer_album_type(album.title, "")
760 if inferred_type in (AlbumType.SOUNDTRACK, AlbumType.LIVE):
761 return inferred_type
762
763 # Otherwise use provider type
764 return provider_type
765
766 ### SEARCH AND PARSE FUNCTIONS ###
767 async def search_and_parse_tracks(
768 self, query: str, user_country: str, limit: int = 20
769 ) -> list[Track]:
770 """Search for tracks and parse them."""
771 deezer_tracks = await self.client.search(query=query, limit=limit)
772 tracks = []
773 for index, track in enumerate(deezer_tracks):
774 tracks.append(self.parse_track(track, user_country))
775 if index == limit:
776 return tracks
777 return tracks
778
779 async def search_and_parse_artists(self, query: str, limit: int = 20) -> list[Artist]:
780 """Search for artists and parse them."""
781 deezer_artist = await self.client.search_artists(query=query, limit=limit)
782 artists = []
783 for index, artist in enumerate(deezer_artist):
784 artists.append(self.parse_artist(artist))
785 if index == limit:
786 return artists
787 return artists
788
789 async def search_and_parse_albums(self, query: str, limit: int = 20) -> list[Album]:
790 """Search for album and parse them."""
791 deezer_albums = await self.client.search_albums(query=query, limit=limit)
792 albums = []
793 for index, album in enumerate(deezer_albums):
794 albums.append(self.parse_album(album))
795 if index == limit:
796 return albums
797 return albums
798
799 async def search_and_parse_playlists(self, query: str, limit: int = 20) -> list[Playlist]:
800 """Search for playlists and parse them."""
801 deezer_playlists = await self.client.search_playlists(query=query, limit=limit)
802 playlists = []
803 for index, playlist in enumerate(deezer_playlists):
804 playlists.append(self.parse_playlist(playlist))
805 if index == limit:
806 return playlists
807 return playlists
808
809 ### OTHER FUNCTIONS ###
810
811 async def get_track_content_type(
812 self, gw_client: GWClient, track_id: str
813 ) -> Literal[ContentType.FLAC, ContentType.MP3]:
814 """Get a tracks contentType."""
815 song_data = await gw_client.get_song_data(track_id)
816 if song_data["results"]["FILESIZE_FLAC"]:
817 return ContentType.FLAC
818
819 if song_data["results"]["FILESIZE_MP3_320"] or song_data["results"]["FILESIZE_MP3_128"]:
820 return ContentType.MP3
821
822 msg = "Unsupported contenttype"
823 raise NotImplementedError(msg)
824
825 def track_available(self, track: deezer.Track, user_country: str) -> bool:
826 """Check if a given track is available in the users country."""
827 if hasattr(track, "available_countries"):
828 return user_country in track.available_countries
829 return True
830
831 def _md5(self, data: str, data_type: str = "ascii") -> str:
832 md5sum = hashlib.md5()
833 md5sum.update(data.encode(data_type))
834 return md5sum.hexdigest()
835
836 def get_blowfish_key(self, track_id: str) -> str:
837 """Get blowfish key to decrypt a chunk of a track."""
838 secret = app_var(5)
839 id_md5 = self._md5(track_id)
840 return "".join(
841 chr(ord(id_md5[i]) ^ ord(id_md5[i + 16]) ^ ord(secret[i])) for i in range(16)
842 )
843
844 def decrypt_chunk(self, chunk: bytes, blowfish_key: str) -> bytes:
845 """Decrypt a given chunk using the blow fish key."""
846 cipher = Blowfish.new(
847 blowfish_key.encode("ascii"),
848 Blowfish.MODE_CBC,
849 b"\x00\x01\x02\x03\x04\x05\x06\x07",
850 )
851 return cipher.decrypt(chunk) # type: ignore[no-any-return,unused-ignore]
852