/
/
/
1"""Deezer music provider support for MusicAssistant."""
2
3import hashlib
4import uuid
5from asyncio import TaskGroup
6from collections.abc import AsyncGenerator
7from dataclasses import dataclass
8from math import ceil
9from typing import Any, Literal, cast
10
11import deezer
12from aiohttp import ClientSession, ClientTimeout
13from Crypto.Cipher import Blowfish
14from deezer import exceptions as deezer_exceptions
15from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
16from music_assistant_models.enums import (
17 AlbumType,
18 ConfigEntryType,
19 ContentType,
20 ExternalID,
21 ImageType,
22 MediaType,
23 ProviderFeature,
24 StreamType,
25)
26from music_assistant_models.errors import InvalidDataError, LoginFailed, MediaNotFoundError
27from music_assistant_models.media_items import (
28 Album,
29 Artist,
30 AudioFormat,
31 ItemMapping,
32 MediaItemImage,
33 MediaItemMetadata,
34 MediaItemType,
35 Playlist,
36 ProviderMapping,
37 RecommendationFolder,
38 SearchResults,
39 Track,
40 UniqueList,
41)
42from music_assistant_models.provider import ProviderManifest
43from music_assistant_models.streamdetails import StreamDetails
44
45from music_assistant import MusicAssistant
46from music_assistant.controllers.cache import use_cache
47from music_assistant.helpers.app_vars import app_var # type: ignore[attr-defined]
48from music_assistant.helpers.auth import AuthenticationHelper
49from music_assistant.helpers.datetime import utc_timestamp
50from music_assistant.helpers.util import infer_album_type, parse_title_and_version
51from music_assistant.models import ProviderInstanceType
52from music_assistant.models.music_provider import MusicProvider
53
54from .gw_client import GWClient
55
56SUPPORTED_FEATURES = {
57 ProviderFeature.LIBRARY_ARTISTS,
58 ProviderFeature.LIBRARY_ALBUMS,
59 ProviderFeature.LIBRARY_TRACKS,
60 ProviderFeature.LIBRARY_PLAYLISTS,
61 ProviderFeature.LIBRARY_ALBUMS_EDIT,
62 ProviderFeature.LIBRARY_TRACKS_EDIT,
63 ProviderFeature.LIBRARY_ARTISTS_EDIT,
64 ProviderFeature.LIBRARY_PLAYLISTS_EDIT,
65 ProviderFeature.ALBUM_METADATA,
66 ProviderFeature.TRACK_METADATA,
67 ProviderFeature.ARTIST_METADATA,
68 ProviderFeature.ARTIST_ALBUMS,
69 ProviderFeature.ARTIST_TOPTRACKS,
70 ProviderFeature.BROWSE,
71 ProviderFeature.SEARCH,
72 ProviderFeature.PLAYLIST_TRACKS_EDIT,
73 ProviderFeature.PLAYLIST_CREATE,
74 ProviderFeature.RECOMMENDATIONS,
75 ProviderFeature.SIMILAR_TRACKS,
76}
77
78
79@dataclass
80class DeezerCredentials:
81 """Class for storing credentials."""
82
83 app_id: int
84 app_secret: str
85 access_token: str
86
87
88CONF_ACCESS_TOKEN = "access_token"
89CONF_ARL_TOKEN = "arl_token"
90CONF_ACTION_AUTH = "auth"
91DEEZER_AUTH_URL = "https://connect.deezer.com/oauth/auth.php"
92RELAY_URL = "https://deezer.oauth.jonathanbangert.com/"
93DEEZER_PERMS = "basic_access,email,offline_access,manage_library,\
94manage_community,delete_library,listening_history"
95DEEZER_APP_ID = app_var(6)
96DEEZER_APP_SECRET = app_var(7)
97
98# Virtual playlist IDs for dynamic Deezer content
99FLOW_PLAYLIST_ID = "flow"
100RECOMMENDED_TRACKS_PLAYLIST_ID = "recommended_tracks"
101TOP_CHARTS_PLAYLIST_ID = "top_charts"
102RADIO_PLAYLIST_PREFIX = "radio_"
103MOOD_FLOW_PREFIX = "mood_flow_"
104
105# Curated Deezer radio station IDs
106CURATED_RADIO_IDS = [
107 37151, # Hits
108 38305, # The '80s
109 38295, # The '70s
110 31061, # Pop
111 37765, # Rock classics
112 30901, # Metal
113 30991, # Hip Hop
114 30771, # Indie
115 30621, # Electronic
116 31031, # Jazz
117 30661, # Classical
118 36791, # Latin Music
119 38225, # Focus
120 39041, # Happy Hour
121]
122
123
124async def get_access_token(
125 app_id: str, app_secret: str, code: str, http_session: ClientSession
126) -> str:
127 """Update the access_token."""
128 response = await http_session.post(
129 "https://connect.deezer.com/oauth/access_token.php",
130 params={"code": code, "app_id": app_id, "secret": app_secret},
131 ssl=False,
132 )
133 if response.status != 200:
134 msg = f"HTTP Error {response.status}: {response.reason}"
135 raise ConnectionError(msg)
136 response_text = await response.text()
137 try:
138 return response_text.split("=")[1].split("&")[0]
139 except Exception as error:
140 msg = "Invalid auth code"
141 raise LoginFailed(msg) from error
142
143
144async def setup(
145 mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
146) -> ProviderInstanceType:
147 """Initialize provider(instance) with given configuration."""
148 return DeezerProvider(mass, manifest, config, SUPPORTED_FEATURES)
149
150
151async def get_config_entries(
152 mass: MusicAssistant,
153 instance_id: str | None = None, # noqa: ARG001
154 action: str | None = None,
155 values: dict[str, ConfigValueType] | None = None,
156) -> tuple[ConfigEntry, ...]:
157 """Return Config entries to setup this provider."""
158 # Action is to launch oauth flow
159 if action == CONF_ACTION_AUTH:
160 # Use the AuthenticationHelper to authenticate
161 if not values or "session_id" not in values:
162 raise InvalidDataError("session_id not found in values")
163 async with AuthenticationHelper(mass, cast("str", values["session_id"])) as auth_helper:
164 url = f"{DEEZER_AUTH_URL}?app_id={DEEZER_APP_ID}&redirect_uri={RELAY_URL}\
165&perms={DEEZER_PERMS}&state={auth_helper.callback_url}"
166 code = (await auth_helper.authenticate(url))["code"]
167 values[CONF_ACCESS_TOKEN] = await get_access_token(
168 DEEZER_APP_ID, DEEZER_APP_SECRET, code, mass.http_session
169 )
170
171 return (
172 ConfigEntry(
173 key=CONF_ACCESS_TOKEN,
174 type=ConfigEntryType.SECURE_STRING,
175 label="Access token",
176 required=True,
177 action=CONF_ACTION_AUTH,
178 description="You need to authenticate on Deezer.",
179 action_label="Authenticate with Deezer",
180 value=values.get(CONF_ACCESS_TOKEN) if values else None,
181 ),
182 ConfigEntry(
183 key=CONF_ARL_TOKEN,
184 type=ConfigEntryType.SECURE_STRING,
185 label="Arl token",
186 required=True,
187 description="See https://www.dumpmedia.com/deezplus/deezer-arl.html",
188 value=values.get(CONF_ARL_TOKEN) if values else None,
189 ),
190 )
191
192
193class DeezerProvider(MusicProvider):
194 """Deezer provider support."""
195
196 client: deezer.Client
197 gw_client: GWClient
198 credentials: DeezerCredentials
199 user: deezer.User
200
201 async def handle_async_init(self) -> None:
202 """Handle async init of the Deezer provider."""
203 self.credentials = DeezerCredentials(
204 app_id=DEEZER_APP_ID,
205 app_secret=DEEZER_APP_SECRET,
206 access_token=cast("str", self.config.get_value(CONF_ACCESS_TOKEN)),
207 )
208
209 self.client = deezer.Client(
210 app_id=self.credentials.app_id,
211 app_secret=self.credentials.app_secret,
212 access_token=self.credentials.access_token,
213 )
214
215 self.user = await self.client.get_user()
216
217 self.gw_client = GWClient(
218 self.mass.http_session,
219 str(self.config.get_value(CONF_ACCESS_TOKEN)),
220 str(self.config.get_value(CONF_ARL_TOKEN)),
221 )
222 await self.gw_client.setup()
223
224 # Cached wrappers for dynamic Deezer content (ensures consistent data across calls)
225 @use_cache(3600) # Cache for 1 hour
226 async def _get_flow_tracks(self) -> list[deezer.Track]:
227 """Get cached Flow tracks."""
228 return list(await self.client.get_user_flow())
229
230 @use_cache(3600) # Cache for 1 hour
231 async def _get_recommended_tracks(self) -> list[deezer.Track]:
232 """Get cached recommended tracks."""
233 return list(await self.client.get_user_recommended_tracks())
234
235 @use_cache(3600) # Cache for 1 hour
236 async def _get_chart_tracks(self) -> list[deezer.Track]:
237 """Get cached chart tracks."""
238 chart = await self.client.get_chart()
239 return list(chart.tracks[:100]) if chart.tracks else []
240
241 @use_cache(3600) # Cache for 1 hour
242 async def _get_mood_flow_tracks(self, config_id: str) -> list[dict[str, Any]]:
243 """Get cached mood/genre Flow tracks from the GW API.
244
245 :param config_id: The Flow config identifier (e.g. "happy", "chill", "genre-rock").
246 """
247 return await self.gw_client.get_user_radio(config_id)
248
249 @use_cache(3600 * 24) # Cache for 24 hours
250 async def _get_available_flows(self) -> list[tuple[str, str, str | None]]:
251 """Discover available mood/genre Flow variants from the Deezer home page.
252
253 Genre flows have config_ids starting with 'genre-'.
254 Returns a list of (config_id, display_name, cover_url) tuples.
255 """
256 items = await self.gw_client.get_home_flows()
257 flows: list[tuple[str, str, str | None]] = []
258 for item in items:
259 config_id = item["data"]["id"]
260 if config_id == "default":
261 continue
262 title = f"Flow: {item['title']}"
263 cover_url = None
264 if pictures := item.get("pictures"):
265 cover_url = f"https://e-cdns-images.dzcdn.net/images/misc/{pictures[0]['md5']}/264x264-000000-80-0-0.jpg"
266 flows.append((config_id, title, cover_url))
267 return flows
268
269 @use_cache(3600 * 24 * 7) # Cache for 7 days
270 async def search(
271 self, search_query: str, media_types: list[MediaType], limit: int = 5
272 ) -> SearchResults:
273 """Perform search on music provider.
274
275 :param search_query: Search query.
276 :param media_types: A list of media_types to include. All types if None.
277 """
278 # Create a task for each media_type
279 tasks: dict[MediaType, Any] = {}
280
281 async with TaskGroup() as taskgroup:
282 for media_type in media_types:
283 if media_type == MediaType.TRACK:
284 tasks[MediaType.TRACK] = taskgroup.create_task(
285 self.search_and_parse_tracks(
286 query=search_query,
287 limit=limit,
288 user_country=self.gw_client.user_country,
289 )
290 )
291 elif media_type == MediaType.ARTIST:
292 tasks[MediaType.ARTIST] = taskgroup.create_task(
293 self.search_and_parse_artists(query=search_query, limit=limit)
294 )
295 elif media_type == MediaType.ALBUM:
296 tasks[MediaType.ALBUM] = taskgroup.create_task(
297 self.search_and_parse_albums(query=search_query, limit=limit)
298 )
299 elif media_type == MediaType.PLAYLIST:
300 tasks[MediaType.PLAYLIST] = taskgroup.create_task(
301 self.search_and_parse_playlists(query=search_query, limit=limit)
302 )
303
304 results = SearchResults()
305
306 for media_type, task in tasks.items():
307 if media_type == MediaType.ARTIST:
308 results.artists = task.result()
309 elif media_type == MediaType.ALBUM:
310 results.albums = task.result()
311 elif media_type == MediaType.TRACK:
312 results.tracks = task.result()
313 elif media_type == MediaType.PLAYLIST:
314 results.playlists = task.result()
315
316 return results
317
318 async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
319 """Retrieve all library artists from Deezer."""
320 async for artist in await self.client.get_user_artists():
321 yield self.parse_artist(artist=artist)
322
323 async def get_library_albums(self) -> AsyncGenerator[Album, None]:
324 """Retrieve all library albums from Deezer."""
325 async for album in await self.client.get_user_albums():
326 yield self.parse_album(album=album)
327
328 async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
329 """Retrieve all library playlists from Deezer."""
330 async for playlist in await self.user.get_playlists():
331 yield self.parse_playlist(playlist=playlist)
332
333 async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
334 """Retrieve all library tracks from Deezer."""
335 async for track in await self.client.get_user_tracks():
336 yield self.parse_track(track=track, user_country=self.gw_client.user_country)
337
338 @use_cache(3600 * 24 * 30) # Cache for 30 days
339 async def get_artist(self, prov_artist_id: str) -> Artist:
340 """Get full artist details by id."""
341 try:
342 return self.parse_artist(
343 artist=await self.client.get_artist(artist_id=int(prov_artist_id))
344 )
345 except deezer_exceptions.DeezerErrorResponse as error:
346 self.logger.warning("Failed getting artist: %s", error)
347 raise MediaNotFoundError(f"Artist {prov_artist_id} not found on Deezer") from error
348
349 @use_cache(3600 * 24 * 30) # Cache for 30 days
350 async def get_album(self, prov_album_id: str) -> Album:
351 """Get full album details by id."""
352 try:
353 return self.parse_album(album=await self.client.get_album(album_id=int(prov_album_id)))
354 except deezer_exceptions.DeezerErrorResponse as error:
355 self.logger.warning("Failed getting album: %s", error)
356 raise MediaNotFoundError(f"Album {prov_album_id} not found on Deezer") from error
357
358 @use_cache(3600 * 24 * 30) # Cache for 30 days
359 async def get_playlist(self, prov_playlist_id: str) -> Playlist:
360 """Get full playlist details by id."""
361 # Handle virtual playlists (Flow, Recommended tracks, Top Charts, Radios)
362 if prov_playlist_id == FLOW_PLAYLIST_ID:
363 flow_tracks = await self._get_flow_tracks()
364 flow_cover = None
365 if flow_tracks and hasattr(flow_tracks[0], "album"):
366 flow_cover = getattr(flow_tracks[0].album, "cover_medium", None)
367 return self._create_virtual_playlist(FLOW_PLAYLIST_ID, "Flow", image_url=flow_cover)
368 if prov_playlist_id == RECOMMENDED_TRACKS_PLAYLIST_ID:
369 rec_tracks = await self._get_recommended_tracks()
370 rec_cover = None
371 if rec_tracks and hasattr(rec_tracks[0], "album"):
372 rec_cover = getattr(rec_tracks[0].album, "cover_medium", None)
373 return self._create_virtual_playlist(
374 RECOMMENDED_TRACKS_PLAYLIST_ID, "Recommended tracks", image_url=rec_cover
375 )
376 if prov_playlist_id == TOP_CHARTS_PLAYLIST_ID:
377 chart_tracks = await self._get_chart_tracks()
378 chart_cover = None
379 if chart_tracks and hasattr(chart_tracks[0], "album"):
380 chart_cover = getattr(chart_tracks[0].album, "cover_medium", None)
381 return self._create_virtual_playlist(
382 TOP_CHARTS_PLAYLIST_ID, "Top Charts", image_url=chart_cover
383 )
384 if prov_playlist_id.startswith(RADIO_PLAYLIST_PREFIX):
385 radio_id = int(prov_playlist_id.replace(RADIO_PLAYLIST_PREFIX, ""))
386 try:
387 radio = await self.client.get_radio(radio_id)
388 return self._create_virtual_playlist(
389 prov_playlist_id,
390 f"Radio: {radio.title}",
391 image_url=getattr(radio, "picture_medium", None),
392 )
393 except Exception as err:
394 self.logger.warning("Failed getting radio %s: %s", radio_id, err)
395 raise MediaNotFoundError(f"Radio {prov_playlist_id} not found on Deezer") from err
396 if prov_playlist_id.startswith(MOOD_FLOW_PREFIX):
397 config_id = prov_playlist_id.removeprefix(MOOD_FLOW_PREFIX)
398 all_flows = await self._get_available_flows()
399 flow_info = {cid: (name, cover) for cid, name, cover in all_flows}
400 name, cover_url = flow_info.get(config_id, (f"Flow: {config_id}", None))
401 return self._create_virtual_playlist(prov_playlist_id, name, image_url=cover_url)
402 try:
403 return self.parse_playlist(
404 playlist=await self.client.get_playlist(playlist_id=int(prov_playlist_id)),
405 )
406 except deezer_exceptions.DeezerErrorResponse as error:
407 self.logger.warning("Failed getting playlist: %s", error)
408 raise MediaNotFoundError(f"Album {prov_playlist_id} not found on Deezer") from error
409
410 @use_cache(3600 * 24 * 30) # Cache for 30 days
411 async def get_track(self, prov_track_id: str) -> Track:
412 """Get full track details by id."""
413 try:
414 return self.parse_track(
415 track=await self.client.get_track(track_id=int(prov_track_id)),
416 user_country=self.gw_client.user_country,
417 )
418 except deezer_exceptions.DeezerErrorResponse as error:
419 self.logger.warning("Failed getting track: %s", error)
420 raise MediaNotFoundError(f"Album {prov_track_id} not found on Deezer") from error
421
422 @use_cache(3600 * 24 * 30) # Cache for 30 days
423 async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
424 """Get all tracks in an album."""
425 album = await self.client.get_album(album_id=int(prov_album_id))
426 return [
427 self.parse_track(
428 track=deezer_track,
429 user_country=self.gw_client.user_country,
430 # TODO: doesn't Deezer have disc and track number in the api ?
431 position=0,
432 )
433 for deezer_track in await album.get_tracks()
434 ]
435
436 async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
437 """Get playlist tracks."""
438 if page > 0:
439 # paging not supported, we always return the whole list at once
440 return []
441
442 # Virtual playlists use their own cached wrappers (not double-cached)
443 if prov_playlist_id == FLOW_PLAYLIST_ID:
444 return self._parse_tracks_list(await self._get_flow_tracks())
445
446 if prov_playlist_id == RECOMMENDED_TRACKS_PLAYLIST_ID:
447 return self._parse_tracks_list(await self._get_recommended_tracks())
448
449 if prov_playlist_id == TOP_CHARTS_PLAYLIST_ID:
450 return self._parse_tracks_list(await self._get_chart_tracks())
451
452 if prov_playlist_id.startswith(RADIO_PLAYLIST_PREFIX):
453 radio_id = int(prov_playlist_id.replace(RADIO_PLAYLIST_PREFIX, ""))
454 try:
455 radio = await self.client.get_radio(radio_id)
456 return self._parse_tracks_list(list(await radio.get_tracks()))
457 except Exception as err:
458 self.logger.debug("Failed to get radio tracks %s: %s", radio_id, err)
459 return []
460
461 if prov_playlist_id.startswith(MOOD_FLOW_PREFIX):
462 config_id = prov_playlist_id.removeprefix(MOOD_FLOW_PREFIX)
463 gw_tracks = await self._get_mood_flow_tracks(config_id)
464 return [await self.get_track(track["SNG_ID"]) for track in gw_tracks]
465
466 # Regular Deezer playlists (cached separately)
467 return await self._get_regular_playlist_tracks(prov_playlist_id)
468
469 @use_cache(3600 * 3) # Cache for 3 hours
470 async def _get_regular_playlist_tracks(self, prov_playlist_id: str) -> list[Track]:
471 """Get tracks for regular Deezer playlists (cached)."""
472 playlist = await self.client.get_playlist(int(prov_playlist_id))
473 playlist_tracks = await playlist.get_tracks()
474 return self._parse_tracks_list(list(playlist_tracks))
475
476 def _parse_tracks_list(self, tracks: list[deezer.Track]) -> list[Track]:
477 """Parse a list of Deezer tracks to Music Assistant tracks."""
478 return [
479 self.parse_track(
480 track=track,
481 user_country=self.gw_client.user_country,
482 position=index,
483 )
484 for index, track in enumerate(tracks, 1)
485 ]
486
487 @use_cache(3600 * 24 * 7) # Cache for 7 days
488 async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
489 """Get albums by an artist."""
490 artist = await self.client.get_artist(artist_id=int(prov_artist_id))
491 return [self.parse_album(album=album) async for album in await artist.get_albums()]
492
493 @use_cache(3600 * 24 * 7) # Cache for 7 days
494 async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
495 """Get top 50 tracks of an artist."""
496 artist = await self.client.get_artist(artist_id=int(prov_artist_id))
497 return [
498 self.parse_track(track=track, user_country=self.gw_client.user_country)
499 async for track in await artist.get_top(limit=50)
500 ]
501
502 async def library_add(self, item: MediaItemType) -> bool:
503 """Add an item to the provider's library/favorites."""
504 result = False
505 if item.media_type == MediaType.ARTIST:
506 result = bool(
507 await self.client.add_user_artist(
508 artist_id=int(item.item_id),
509 )
510 )
511 elif item.media_type == MediaType.ALBUM:
512 result = bool(
513 await self.client.add_user_album(
514 album_id=int(item.item_id),
515 )
516 )
517 elif item.media_type == MediaType.TRACK:
518 result = bool(
519 await self.client.add_user_track(
520 track_id=int(item.item_id),
521 )
522 )
523 elif item.media_type == MediaType.PLAYLIST:
524 result = bool(
525 await self.client.add_user_playlist(
526 playlist_id=int(item.item_id),
527 )
528 )
529 else:
530 raise NotImplementedError
531 return result
532
533 async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool:
534 """Remove an item from the provider's library/favorites."""
535 result = False
536 if media_type == MediaType.ARTIST:
537 result = bool(
538 await self.client.remove_user_artist(
539 artist_id=int(prov_item_id),
540 )
541 )
542 elif media_type == MediaType.ALBUM:
543 result = bool(
544 await self.client.remove_user_album(
545 album_id=int(prov_item_id),
546 )
547 )
548 elif media_type == MediaType.TRACK:
549 result = bool(
550 await self.client.remove_user_track(
551 track_id=int(prov_item_id),
552 )
553 )
554 elif media_type == MediaType.PLAYLIST:
555 result = bool(
556 await self.client.remove_user_playlist(
557 playlist_id=int(prov_item_id),
558 )
559 )
560 else:
561 raise NotImplementedError
562 return result
563
564 @use_cache(3600)
565 async def recommendations(self) -> list[RecommendationFolder]:
566 """Get Deezer's recommendations including Flow and personalized content."""
567 result: list[RecommendationFolder] = []
568
569 # Made for you - combines Flow, Recommended tracks, and recommended playlists
570 # Get covers from first track's album for each virtual playlist
571 flow_cover = None
572 flow_tracks = await self._get_flow_tracks()
573 if flow_tracks and hasattr(flow_tracks[0], "album"):
574 flow_cover = getattr(flow_tracks[0].album, "cover_medium", None)
575
576 recommended_cover = None
577 recommended_tracks = await self._get_recommended_tracks()
578 if recommended_tracks and hasattr(recommended_tracks[0], "album"):
579 recommended_cover = getattr(recommended_tracks[0].album, "cover_medium", None)
580
581 chart_tracks = await self._get_chart_tracks()
582 chart_cover = None
583 if chart_tracks and hasattr(chart_tracks[0], "album"):
584 chart_cover = getattr(chart_tracks[0].album, "cover_medium", None)
585
586 made_for_you_items: list[Playlist] = [
587 # Flow - personalized endless radio
588 self._create_virtual_playlist(FLOW_PLAYLIST_ID, "Flow", image_url=flow_cover),
589 # Recommended tracks
590 self._create_virtual_playlist(
591 RECOMMENDED_TRACKS_PLAYLIST_ID, "Recommended tracks", image_url=recommended_cover
592 ),
593 # Top Charts - global top tracks
594 self._create_virtual_playlist(
595 TOP_CHARTS_PLAYLIST_ID, "Top Charts", image_url=chart_cover
596 ),
597 ]
598 # Add recommended playlists from Deezer
599 for playlist in await self.client.get_user_recommended_playlists():
600 made_for_you_items.append(self.parse_playlist(playlist=playlist))
601
602 result.append(
603 RecommendationFolder(
604 item_id="made_for_you",
605 provider=self.instance_id,
606 name="Made for you",
607 items=UniqueList(made_for_you_items),
608 )
609 )
610
611 # Recommended albums
612 try:
613 recommended_albums = list(await self.client.get_user_recommended_albums())
614 if recommended_albums:
615 result.append(
616 RecommendationFolder(
617 item_id="recommended_albums",
618 provider=self.instance_id,
619 name="Recommended albums",
620 items=UniqueList(
621 [self.parse_album(album=album) for album in recommended_albums]
622 ),
623 )
624 )
625 except deezer_exceptions.DeezerErrorResponse as err:
626 self.logger.debug("Failed to get recommended albums: %s", err)
627
628 # Recommended artists
629 try:
630 recommended_artists = list(await self.client.get_user_recommended_artists())
631 if recommended_artists:
632 result.append(
633 RecommendationFolder(
634 item_id="recommended_artists",
635 provider=self.instance_id,
636 name="Recommended artists",
637 items=UniqueList(
638 [self.parse_artist(artist=artist) for artist in recommended_artists]
639 ),
640 )
641 )
642 except deezer_exceptions.DeezerErrorResponse as err:
643 self.logger.debug("Failed to get recommended artists: %s", err)
644
645 # Deezer Mood and Genre Flows - personalized playlists (dynamically discovered)
646 all_flows = await self._get_available_flows()
647 mood_flows = [(c, n, img) for c, n, img in all_flows if not c.startswith("genre-")]
648 genre_flows = [(c, n, img) for c, n, img in all_flows if c.startswith("genre-")]
649 for folder_id, folder_name, flows in [
650 ("mood_flows", "Deezer Mood Flows", mood_flows),
651 ("genre_flows", "Deezer Genre Flows", genre_flows),
652 ]:
653 flow_playlists = [
654 self._create_virtual_playlist(
655 item_id=f"{MOOD_FLOW_PREFIX}{config_id}",
656 name=display_name,
657 image_url=cover_url,
658 )
659 for config_id, display_name, cover_url in flows
660 ]
661 if flow_playlists:
662 result.append(
663 RecommendationFolder(
664 item_id=folder_id,
665 provider=self.instance_id,
666 name=folder_name,
667 items=UniqueList(flow_playlists),
668 )
669 )
670
671 # Deezer Radios - curated selection (as virtual playlists in one folder)
672 radio_playlists: list[Playlist] = []
673 for radio_id in CURATED_RADIO_IDS:
674 try:
675 radio = await self.client.get_radio(radio_id)
676 radio_playlists.append(
677 self._create_virtual_playlist(
678 item_id=f"{RADIO_PLAYLIST_PREFIX}{radio_id}",
679 name=f"Radio: {radio.title}",
680 image_url=getattr(radio, "picture_medium", None),
681 )
682 )
683 except Exception as err:
684 self.logger.debug("Failed to load radio %s: %s", radio_id, err)
685
686 if radio_playlists:
687 result.append(
688 RecommendationFolder(
689 item_id="radios",
690 provider=self.instance_id,
691 name="Deezer Radios",
692 items=UniqueList(radio_playlists),
693 )
694 )
695
696 return result
697
698 async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
699 """Add track(s) to playlist."""
700 playlist = await self.client.get_playlist(int(prov_playlist_id))
701 await playlist.add_tracks(tracks=[int(i) for i in prov_track_ids])
702
703 async def remove_playlist_tracks(
704 self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
705 ) -> None:
706 """Remove track(s) from playlist."""
707 playlist_track_ids = []
708 for track in await self.get_playlist_tracks(prov_playlist_id, 0):
709 if track.position in positions_to_remove:
710 playlist_track_ids.append(int(track.item_id))
711 if len(playlist_track_ids) == len(positions_to_remove):
712 break
713 playlist = await self.client.get_playlist(int(prov_playlist_id))
714 await playlist.delete_tracks(playlist_track_ids)
715
716 async def create_playlist(self, name: str) -> Playlist:
717 """Create a new playlist on provider with given name."""
718 playlist_id = await self.client.create_playlist(playlist_name=name)
719 playlist = await self.client.get_playlist(playlist_id)
720 return self.parse_playlist(playlist=playlist)
721
722 @use_cache(3600 * 24) # Cache for 24 hours
723 async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]:
724 """Retrieve a dynamic list of tracks based on the provided item."""
725 endpoint = "song.getSearchTrackMix"
726 tracks = (await self.gw_client._gw_api_call(endpoint, args={"SNG_ID": prov_track_id}))[
727 "results"
728 ]["data"][:limit]
729 return [await self.get_track(track["SNG_ID"]) for track in tracks]
730
731 async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
732 """Return the content details for the given track when it will be streamed."""
733 url_details, song_data = await self.gw_client.get_deezer_track_urls(item_id)
734 url = url_details["sources"][0]["url"]
735 return StreamDetails(
736 item_id=item_id,
737 provider=self.instance_id,
738 audio_format=AudioFormat(
739 content_type=ContentType.try_parse(url_details["format"].split("_")[0])
740 ),
741 stream_type=StreamType.CUSTOM,
742 duration=int(song_data["DURATION"]),
743 # Due to track replacement, the track ID of the stream may be different from the ID
744 # that is stored. We need the proper track ID to decrypt the stream, so store it
745 # separately so we can use it later on.
746 data={"url": url, "format": url_details["format"], "track_id": song_data["SNG_ID"]},
747 size=int(song_data[f"FILESIZE_{url_details['format']}"]),
748 can_seek=True,
749 allow_seek=True,
750 )
751
752 async def get_audio_stream(
753 self, streamdetails: StreamDetails, seek_position: int = 0
754 ) -> AsyncGenerator[bytes, None]:
755 """Return the audio stream for the provider item."""
756 blowfish_key = self.get_blowfish_key(streamdetails.data["track_id"])
757 chunk_index = 0
758 timeout = ClientTimeout(total=None, connect=30, sock_read=600)
759 headers: dict[str, str] = {}
760 # if seek_position and streamdetails.size:
761 # chunk_count = ceil(streamdetails.size / 2048)
762 # chunk_index = int(chunk_count / streamdetails.duration) * seek_position
763 # skip_bytes = chunk_index * 2048
764 # headers["Range"] = f"bytes={skip_bytes}-"
765
766 # NOTE: Seek with using the Range header is not working properly
767 # causing malformed audio so this is a temporary patch
768 # by just skipping chunks
769 if seek_position and streamdetails.size and streamdetails.duration:
770 chunk_count = ceil(streamdetails.size / 2048)
771 skip_chunks = int(chunk_count / streamdetails.duration) * seek_position
772 else:
773 skip_chunks = 0
774
775 buffer = bytearray()
776 streamdetails.data["start_ts"] = utc_timestamp()
777 streamdetails.data["stream_id"] = uuid.uuid1()
778 self.mass.create_task(self.gw_client.log_listen(next_track=streamdetails.item_id))
779 async with self.mass.http_session.get(
780 streamdetails.data["url"], headers=headers, timeout=timeout
781 ) as resp:
782 async for chunk in resp.content.iter_chunked(2048):
783 buffer += chunk
784 if len(buffer) >= 2048:
785 if chunk_index >= skip_chunks or chunk_index == 0:
786 if chunk_index % 3 > 0:
787 yield bytes(buffer[:2048])
788 else:
789 yield self.decrypt_chunk(bytes(buffer[:2048]), blowfish_key)
790
791 chunk_index += 1
792 del buffer[:2048]
793 yield bytes(buffer)
794
795 async def on_streamed(
796 self,
797 streamdetails: StreamDetails,
798 ) -> None:
799 """Handle callback when an item completed streaming."""
800 await self.gw_client.log_listen(last_track=streamdetails)
801
802 ### PARSING METADATA FUNCTIONS ###
803
804 def parse_metadata_track(self, track: deezer.Track) -> MediaItemMetadata:
805 """Parse the track metadata."""
806 metadata = MediaItemMetadata()
807 if hasattr(track, "preview"):
808 metadata.preview = track.preview
809 if hasattr(track, "explicit_lyrics"):
810 metadata.explicit = track.explicit_lyrics
811 if hasattr(track, "rank"):
812 metadata.popularity = track.rank
813 if hasattr(track, "album") and hasattr(track.album, "cover_big"):
814 metadata.add_image(
815 MediaItemImage(
816 type=ImageType.THUMB,
817 path=track.album.cover_big,
818 provider=self.instance_id,
819 remotely_accessible=True,
820 )
821 )
822 return metadata
823
824 def parse_metadata_album(self, album: deezer.Album) -> MediaItemMetadata:
825 """Parse the album metadata."""
826 return MediaItemMetadata(
827 explicit=album.explicit_lyrics,
828 images=UniqueList(
829 [
830 MediaItemImage(
831 type=ImageType.THUMB,
832 path=album.cover_big,
833 provider=self.instance_id,
834 remotely_accessible=True,
835 )
836 ]
837 ),
838 )
839
840 def parse_metadata_artist(self, artist: deezer.Artist) -> MediaItemMetadata:
841 """Parse the artist metadata."""
842 return MediaItemMetadata(
843 images=UniqueList(
844 [
845 MediaItemImage(
846 type=ImageType.THUMB,
847 path=artist.picture_big,
848 provider=self.instance_id,
849 remotely_accessible=True,
850 )
851 ]
852 ),
853 )
854
855 ### PARSING FUNCTIONS ###
856 def parse_artist(self, artist: deezer.Artist) -> Artist:
857 """Parse the deezer-python artist to a Music Assistant artist."""
858 return Artist(
859 item_id=str(artist.id),
860 provider=self.instance_id,
861 name=artist.name,
862 media_type=MediaType.ARTIST,
863 provider_mappings={
864 ProviderMapping(
865 item_id=str(artist.id),
866 provider_domain=self.domain,
867 provider_instance=self.instance_id,
868 url=getattr(artist, "link", None), # Sometimes the API doesn't return a link
869 )
870 },
871 metadata=self.parse_metadata_artist(artist=artist),
872 )
873
874 def parse_album(self, album: deezer.Album) -> Album:
875 """Parse the deezer-python album to a Music Assistant album."""
876 name, version = parse_title_and_version(album.title)
877 return Album(
878 album_type=self.get_album_type(album),
879 item_id=str(album.id),
880 provider=self.instance_id,
881 name=name,
882 version=version,
883 year=album.release_date.year if getattr(album, "release_date", None) else None,
884 artists=UniqueList(
885 [
886 ItemMapping(
887 media_type=MediaType.ARTIST,
888 item_id=str(album.artist.id),
889 provider=self.instance_id,
890 name=album.artist.name,
891 )
892 ]
893 ),
894 media_type=MediaType.ALBUM,
895 provider_mappings={
896 ProviderMapping(
897 item_id=str(album.id),
898 provider_domain=self.domain,
899 provider_instance=self.instance_id,
900 url=getattr(album, "link", None),
901 )
902 },
903 metadata=self.parse_metadata_album(album=album),
904 )
905
906 def parse_playlist(self, playlist: deezer.Playlist) -> Playlist:
907 """Parse the deezer-python playlist to a Music Assistant playlist."""
908 creator = self.get_playlist_creator(playlist)
909 is_editable = creator.id == self.user.id
910 return Playlist(
911 item_id=str(playlist.id),
912 provider=self.instance_id,
913 name=playlist.title,
914 media_type=MediaType.PLAYLIST,
915 provider_mappings={
916 ProviderMapping(
917 item_id=str(playlist.id),
918 provider_domain=self.domain,
919 provider_instance=self.instance_id,
920 url=getattr(playlist, "link", None),
921 is_unique=is_editable, # user-owned playlists are unique
922 )
923 },
924 metadata=MediaItemMetadata(
925 images=UniqueList(
926 [
927 MediaItemImage(
928 type=ImageType.THUMB,
929 path=playlist.picture_big,
930 provider=self.instance_id,
931 remotely_accessible=True,
932 )
933 ]
934 ),
935 ),
936 is_editable=is_editable,
937 owner=creator.name,
938 )
939
940 def get_playlist_creator(self, playlist: deezer.Playlist) -> deezer.User:
941 """On playlists, the creator is called creator, elsewhere it's called user."""
942 if hasattr(playlist, "creator"):
943 return playlist.creator
944 return playlist.user
945
946 def _create_virtual_playlist(
947 self,
948 item_id: str,
949 name: str,
950 image_url: str | None = None,
951 ) -> Playlist:
952 """Create a virtual playlist for Flow, Recommended tracks, or Radios.
953
954 :param item_id: The unique identifier (e.g., "flow", "radio_37151").
955 :param name: Display name for the playlist.
956 :param image_url: Optional image URL.
957 """
958 images: UniqueList[MediaItemImage] = UniqueList()
959 if image_url:
960 images.append(
961 MediaItemImage(
962 type=ImageType.THUMB,
963 path=image_url,
964 provider=self.instance_id,
965 remotely_accessible=True,
966 )
967 )
968 return Playlist(
969 item_id=item_id,
970 provider=self.instance_id,
971 name=name,
972 media_type=MediaType.PLAYLIST,
973 provider_mappings={
974 ProviderMapping(
975 item_id=item_id,
976 provider_domain=self.domain,
977 provider_instance=self.instance_id,
978 )
979 },
980 metadata=MediaItemMetadata(images=images) if images else MediaItemMetadata(),
981 is_editable=False,
982 owner="Deezer",
983 )
984
985 def parse_track(self, track: deezer.Track, user_country: str, position: int = 0) -> Track:
986 """Parse the deezer-python track to a Music Assistant track."""
987 if hasattr(track, "artist"):
988 artist = ItemMapping(
989 media_type=MediaType.ARTIST,
990 item_id=str(getattr(track.artist, "id", f"deezer-{track.artist.name}")),
991 provider=self.instance_id,
992 name=track.artist.name,
993 )
994 else:
995 artist = None
996 if hasattr(track, "album"):
997 album = ItemMapping(
998 media_type=MediaType.ALBUM,
999 item_id=str(track.album.id),
1000 provider=self.instance_id,
1001 name=track.album.title,
1002 )
1003 else:
1004 album = None
1005
1006 name, version = parse_title_and_version(track.title)
1007 item = Track(
1008 item_id=str(track.id),
1009 provider=self.instance_id,
1010 name=name,
1011 version=version,
1012 sort_name=self.get_short_title(track),
1013 duration=track.duration,
1014 artists=UniqueList([artist]) if artist else UniqueList(),
1015 album=album,
1016 provider_mappings={
1017 ProviderMapping(
1018 item_id=str(track.id),
1019 provider_domain=self.domain,
1020 provider_instance=self.instance_id,
1021 available=self.track_available(track=track, user_country=user_country),
1022 url=getattr(track, "link", None),
1023 )
1024 },
1025 metadata=self.parse_metadata_track(track=track),
1026 track_number=getattr(track, "track_position", position),
1027 position=position,
1028 disc_number=getattr(track, "disk_number", 0),
1029 )
1030 if isrc := getattr(track, "isrc", None):
1031 item.external_ids.add((ExternalID.ISRC, isrc))
1032 return item
1033
1034 def get_short_title(self, track: deezer.Track) -> str:
1035 """Short names only returned, if available."""
1036 if hasattr(track, "title_short"):
1037 return str(track.title_short)
1038 return str(track.title)
1039
1040 def get_album_type(self, album: deezer.Album) -> AlbumType:
1041 """Read and convert the Deezer album type."""
1042 # Get provider's basic type first
1043 provider_type = AlbumType.UNKNOWN
1044 if hasattr(album, "record_type"):
1045 match album.record_type:
1046 case "album":
1047 provider_type = AlbumType.ALBUM
1048 case "single":
1049 provider_type = AlbumType.SINGLE
1050 case "ep":
1051 provider_type = AlbumType.EP
1052 case "compile":
1053 provider_type = AlbumType.COMPILATION
1054
1055 # Try inference - override if it finds something more specific
1056 inferred_type = infer_album_type(album.title, "")
1057 if inferred_type in (AlbumType.SOUNDTRACK, AlbumType.LIVE):
1058 return inferred_type
1059
1060 # Otherwise use provider type
1061 return provider_type
1062
1063 ### SEARCH AND PARSE FUNCTIONS ###
1064 async def search_and_parse_tracks(
1065 self, query: str, user_country: str, limit: int = 20
1066 ) -> list[Track]:
1067 """Search for tracks and parse them."""
1068 deezer_tracks = await self.client.search(query=query, limit=limit)
1069 tracks = []
1070 for index, track in enumerate(deezer_tracks):
1071 tracks.append(self.parse_track(track, user_country))
1072 if index == limit:
1073 return tracks
1074 return tracks
1075
1076 async def search_and_parse_artists(self, query: str, limit: int = 20) -> list[Artist]:
1077 """Search for artists and parse them."""
1078 deezer_artist = await self.client.search_artists(query=query, limit=limit)
1079 artists = []
1080 for index, artist in enumerate(deezer_artist):
1081 artists.append(self.parse_artist(artist))
1082 if index == limit:
1083 return artists
1084 return artists
1085
1086 async def search_and_parse_albums(self, query: str, limit: int = 20) -> list[Album]:
1087 """Search for album and parse them."""
1088 deezer_albums = await self.client.search_albums(query=query, limit=limit)
1089 albums = []
1090 for index, album in enumerate(deezer_albums):
1091 albums.append(self.parse_album(album))
1092 if index == limit:
1093 return albums
1094 return albums
1095
1096 async def search_and_parse_playlists(self, query: str, limit: int = 20) -> list[Playlist]:
1097 """Search for playlists and parse them."""
1098 deezer_playlists = await self.client.search_playlists(query=query, limit=limit)
1099 playlists = []
1100 for index, playlist in enumerate(deezer_playlists):
1101 playlists.append(self.parse_playlist(playlist))
1102 if index == limit:
1103 return playlists
1104 return playlists
1105
1106 ### OTHER FUNCTIONS ###
1107
1108 async def get_track_content_type(
1109 self, gw_client: GWClient, track_id: str
1110 ) -> Literal[ContentType.FLAC, ContentType.MP3]:
1111 """Get a tracks contentType."""
1112 song_data = await gw_client.get_song_data(track_id)
1113 if song_data["results"]["FILESIZE_FLAC"]:
1114 return ContentType.FLAC
1115
1116 if song_data["results"]["FILESIZE_MP3_320"] or song_data["results"]["FILESIZE_MP3_128"]:
1117 return ContentType.MP3
1118
1119 msg = "Unsupported contenttype"
1120 raise NotImplementedError(msg)
1121
1122 def track_available(self, track: deezer.Track, user_country: str) -> bool:
1123 """Check if a given track is available in the users country."""
1124 if hasattr(track, "available_countries"):
1125 return user_country in track.available_countries
1126 return True
1127
1128 def _md5(self, data: str, data_type: str = "ascii") -> str:
1129 md5sum = hashlib.md5()
1130 md5sum.update(data.encode(data_type))
1131 return md5sum.hexdigest()
1132
1133 def get_blowfish_key(self, track_id: str) -> str:
1134 """Get blowfish key to decrypt a chunk of a track."""
1135 secret = app_var(5)
1136 id_md5 = self._md5(track_id)
1137 return "".join(
1138 chr(ord(id_md5[i]) ^ ord(id_md5[i + 16]) ^ ord(secret[i])) for i in range(16)
1139 )
1140
1141 def decrypt_chunk(self, chunk: bytes, blowfish_key: str) -> bytes:
1142 """Decrypt a given chunk using the blow fish key."""
1143 cipher = Blowfish.new(
1144 blowfish_key.encode("ascii"),
1145 Blowfish.MODE_CBC,
1146 b"\x00\x01\x02\x03\x04\x05\x06\x07",
1147 )
1148 return cipher.decrypt(chunk) # type: ignore[no-any-return,unused-ignore]
1149