/
/
/
1"""Deezer music provider support for MusicAssistant."""
2
3import hashlib
4import uuid
5from asyncio import TaskGroup
6from collections.abc import AsyncGenerator
7from dataclasses import dataclass
8from math import ceil
9from typing import Any, Literal, cast
10
11import deezer
12from aiohttp import ClientSession, ClientTimeout
13from Crypto.Cipher import Blowfish
14from deezer import exceptions as deezer_exceptions
15from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
16from music_assistant_models.enums import (
17 AlbumType,
18 ConfigEntryType,
19 ContentType,
20 ExternalID,
21 ImageType,
22 MediaType,
23 ProviderFeature,
24 StreamType,
25)
26from music_assistant_models.errors import InvalidDataError, LoginFailed, MediaNotFoundError
27from music_assistant_models.media_items import (
28 Album,
29 Artist,
30 AudioFormat,
31 ItemMapping,
32 MediaItemImage,
33 MediaItemMetadata,
34 MediaItemType,
35 Playlist,
36 ProviderMapping,
37 RecommendationFolder,
38 SearchResults,
39 Track,
40 UniqueList,
41)
42from music_assistant_models.provider import ProviderManifest
43from music_assistant_models.streamdetails import StreamDetails
44
45from music_assistant import MusicAssistant
46from music_assistant.controllers.cache import use_cache
47from music_assistant.helpers.app_vars import app_var # type: ignore[attr-defined]
48from music_assistant.helpers.auth import AuthenticationHelper
49from music_assistant.helpers.datetime import utc_timestamp
50from music_assistant.helpers.util import infer_album_type, parse_title_and_version
51from music_assistant.models import ProviderInstanceType
52from music_assistant.models.music_provider import MusicProvider
53
54from .gw_client import GWClient
55
# Set of provider features that setup() hands to the DeezerProvider constructor.
SUPPORTED_FEATURES = {
    # Library listing
    ProviderFeature.LIBRARY_ARTISTS,
    ProviderFeature.LIBRARY_ALBUMS,
    ProviderFeature.LIBRARY_TRACKS,
    ProviderFeature.LIBRARY_PLAYLISTS,
    # Library editing
    ProviderFeature.LIBRARY_ALBUMS_EDIT,
    ProviderFeature.LIBRARY_TRACKS_EDIT,
    ProviderFeature.LIBRARY_ARTISTS_EDIT,
    ProviderFeature.LIBRARY_PLAYLISTS_EDIT,
    # Metadata and artist details
    ProviderFeature.ALBUM_METADATA,
    ProviderFeature.TRACK_METADATA,
    ProviderFeature.ARTIST_METADATA,
    ProviderFeature.ARTIST_ALBUMS,
    ProviderFeature.ARTIST_TOPTRACKS,
    # Discovery and playlist management
    ProviderFeature.BROWSE,
    ProviderFeature.SEARCH,
    ProviderFeature.PLAYLIST_TRACKS_EDIT,
    ProviderFeature.PLAYLIST_CREATE,
    ProviderFeature.RECOMMENDATIONS,
    ProviderFeature.SIMILAR_TRACKS,
}
77
78
@dataclass
class DeezerCredentials:
    """Class for storing credentials.

    The three values are forwarded to ``deezer.Client`` by
    ``DeezerProvider.handle_async_init``.
    """

    # Deezer application id (populated from DEEZER_APP_ID).
    app_id: int
    # Deezer application secret (populated from DEEZER_APP_SECRET).
    app_secret: str
    # OAuth access token read from the provider configuration.
    access_token: str
86
87
# Config entry keys and the config action that launches the OAuth flow.
CONF_ACCESS_TOKEN = "access_token"
CONF_ARL_TOKEN = "arl_token"
CONF_ACTION_AUTH = "auth"
# Deezer authorization page and the URL sent to it as `redirect_uri`.
DEEZER_AUTH_URL = "https://connect.deezer.com/oauth/auth.php"
RELAY_URL = "https://deezer.oauth.jonathanbangert.com/"
# Comma-separated permission list sent as the `perms` query parameter.
# The backslash continues the string literal, so the next line must stay at column 0.
DEEZER_PERMS = "basic_access,email,offline_access,manage_library,\
manage_community,delete_library,listening_history"
# Application credentials resolved through app_var().
DEEZER_APP_ID = app_var(6)
DEEZER_APP_SECRET = app_var(7)

# Virtual playlist IDs for dynamic Deezer content
FLOW_PLAYLIST_ID = "flow"
RECOMMENDED_TRACKS_PLAYLIST_ID = "recommended_tracks"
TOP_CHARTS_PLAYLIST_ID = "top_charts"
# Radio playlists use this prefix followed by the numeric radio id (e.g. "radio_37151").
RADIO_PLAYLIST_PREFIX = "radio_"

# Curated Deezer radio station IDs
CURATED_RADIO_IDS = [
    37151,  # Hits
    38305,  # The '80s
    38295,  # The '70s
    31061,  # Pop
    37765,  # Rock classics
    30901,  # Metal
    30991,  # Hip Hop
    30771,  # Indie
    30621,  # Electronic
    31031,  # Jazz
    30661,  # Classical
    36791,  # Latin Music
    38225,  # Focus
    39041,  # Happy Hour
]
121
122
async def get_access_token(
    app_id: str, app_secret: str, code: str, http_session: ClientSession
) -> str:
    """Exchange an OAuth authorization code for a Deezer access token.

    :param app_id: Deezer application id.
    :param app_secret: Deezer application secret.
    :param code: Authorization code obtained from the OAuth flow.
    :param http_session: Session used to perform the HTTP request.
    :raises ConnectionError: If the endpoint answers with a non-200 status.
    :raises LoginFailed: If no token can be extracted from the response body.
    """
    # NOTE(review): ssl=False turns off certificate verification for a request that
    # carries the app secret - confirm this is still required.
    # The context manager releases the connection on every path, including the
    # early ConnectionError below.
    async with http_session.post(
        "https://connect.deezer.com/oauth/access_token.php",
        params={"code": code, "app_id": app_id, "secret": app_secret},
        ssl=False,
    ) as response:
        if response.status != 200:
            msg = f"HTTP Error {response.status}: {response.reason}"
            raise ConnectionError(msg)
        response_text = await response.text()
    try:
        # The token is the text between the first "=" and the following "&".
        return response_text.split("=")[1].split("&")[0]
    except IndexError as error:
        # No "=" in the body means the endpoint did not return a token.
        msg = "Invalid auth code"
        raise LoginFailed(msg) from error
141
142
async def setup(
    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
    """Create the Deezer provider instance for the given configuration."""
    provider = DeezerProvider(mass, manifest, config, SUPPORTED_FEATURES)
    return provider
148
149
async def get_config_entries(
    mass: MusicAssistant,
    instance_id: str | None = None,  # noqa: ARG001
    action: str | None = None,
    values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
    """Return the config entries needed to set up this provider.

    When ``action`` is the auth action, the OAuth flow is run first and the
    resulting access token is written into ``values``.

    :param mass: The MusicAssistant instance.
    :param instance_id: Unused.
    :param action: Optional action key; only the auth action is handled.
    :param values: Current config values; must contain ``session_id`` for the auth action.
    :raises InvalidDataError: If the auth action is requested without a ``session_id``.
    """
    if action == CONF_ACTION_AUTH:
        # The authentication helper needs the frontend session id.
        if not values or "session_id" not in values:
            raise InvalidDataError("session_id not found in values")
        session_id = cast("str", values["session_id"])
        async with AuthenticationHelper(mass, session_id) as auth_helper:
            url = (
                f"{DEEZER_AUTH_URL}?app_id={DEEZER_APP_ID}&redirect_uri={RELAY_URL}"
                f"&perms={DEEZER_PERMS}&state={auth_helper.callback_url}"
            )
            auth_result = await auth_helper.authenticate(url)
            code = auth_result["code"]
            values[CONF_ACCESS_TOKEN] = await get_access_token(
                DEEZER_APP_ID, DEEZER_APP_SECRET, code, mass.http_session
            )

    # Missing or empty values fall back to None for every entry.
    current = values or {}
    access_token_entry = ConfigEntry(
        key=CONF_ACCESS_TOKEN,
        type=ConfigEntryType.SECURE_STRING,
        label="Access token",
        required=True,
        action=CONF_ACTION_AUTH,
        description="You need to authenticate on Deezer.",
        action_label="Authenticate with Deezer",
        value=current.get(CONF_ACCESS_TOKEN),
    )
    arl_token_entry = ConfigEntry(
        key=CONF_ARL_TOKEN,
        type=ConfigEntryType.SECURE_STRING,
        label="Arl token",
        required=True,
        description="See https://www.dumpmedia.com/deezplus/deezer-arl.html",
        value=current.get(CONF_ARL_TOKEN),
    )
    return (access_token_entry, arl_token_entry)
190
191
192class DeezerProvider(MusicProvider):
193 """Deezer provider support."""
194
195 client: deezer.Client
196 gw_client: GWClient
197 credentials: DeezerCredentials
198 user: deezer.User
199
async def handle_async_init(self) -> None:
    """Set up the Deezer API client, the logged-in user and the gateway client."""
    access_token = self.config.get_value(CONF_ACCESS_TOKEN)

    credentials = DeezerCredentials(
        app_id=DEEZER_APP_ID,
        app_secret=DEEZER_APP_SECRET,
        access_token=cast("str", access_token),
    )
    self.credentials = credentials

    self.client = deezer.Client(
        app_id=credentials.app_id,
        app_secret=credentials.app_secret,
        access_token=credentials.access_token,
    )
    self.user = await self.client.get_user()

    # The gateway client receives both the OAuth token and the ARL token.
    arl_token = self.config.get_value(CONF_ARL_TOKEN)
    self.gw_client = GWClient(self.mass.http_session, str(access_token), str(arl_token))
    await self.gw_client.setup()
222
223 # Cached wrappers for dynamic Deezer content (ensures consistent data across calls)
@use_cache(3600)  # Cache for 1 hour
async def _get_flow_tracks(self) -> list[deezer.Track]:
    """Return the user's Flow tracks as a list (cached)."""
    flow = await self.client.get_user_flow()
    return list(flow)
228
@use_cache(3600)  # Cache for 1 hour
async def _get_recommended_tracks(self) -> list[deezer.Track]:
    """Return the user's recommended tracks as a list (cached)."""
    recommended = await self.client.get_user_recommended_tracks()
    return list(recommended)
233
@use_cache(3600)  # Cache for 1 hour
async def _get_chart_tracks(self) -> list[deezer.Track]:
    """Return at most the first 100 chart tracks as a list (cached)."""
    chart = await self.client.get_chart()
    if not chart.tracks:
        return []
    return list(chart.tracks[:100])
239
240 @use_cache(3600 * 24 * 7) # Cache for 7 days
241 async def search(
242 self, search_query: str, media_types: list[MediaType], limit: int = 5
243 ) -> SearchResults:
244 """Perform search on music provider.
245
246 :param search_query: Search query.
247 :param media_types: A list of media_types to include. All types if None.
248 """
249 # Create a task for each media_type
250 tasks: dict[MediaType, Any] = {}
251
252 async with TaskGroup() as taskgroup:
253 for media_type in media_types:
254 if media_type == MediaType.TRACK:
255 tasks[MediaType.TRACK] = taskgroup.create_task(
256 self.search_and_parse_tracks(
257 query=search_query,
258 limit=limit,
259 user_country=self.gw_client.user_country,
260 )
261 )
262 elif media_type == MediaType.ARTIST:
263 tasks[MediaType.ARTIST] = taskgroup.create_task(
264 self.search_and_parse_artists(query=search_query, limit=limit)
265 )
266 elif media_type == MediaType.ALBUM:
267 tasks[MediaType.ALBUM] = taskgroup.create_task(
268 self.search_and_parse_albums(query=search_query, limit=limit)
269 )
270 elif media_type == MediaType.PLAYLIST:
271 tasks[MediaType.PLAYLIST] = taskgroup.create_task(
272 self.search_and_parse_playlists(query=search_query, limit=limit)
273 )
274
275 results = SearchResults()
276
277 for media_type, task in tasks.items():
278 if media_type == MediaType.ARTIST:
279 results.artists = task.result()
280 elif media_type == MediaType.ALBUM:
281 results.albums = task.result()
282 elif media_type == MediaType.TRACK:
283 results.tracks = task.result()
284 elif media_type == MediaType.PLAYLIST:
285 results.playlists = task.result()
286
287 return results
288
async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
    """Yield every artist of the user's Deezer library."""
    library_artists = await self.client.get_user_artists()
    async for deezer_artist in library_artists:
        yield self.parse_artist(artist=deezer_artist)
293
async def get_library_albums(self) -> AsyncGenerator[Album, None]:
    """Yield every album of the user's Deezer library."""
    library_albums = await self.client.get_user_albums()
    async for deezer_album in library_albums:
        yield self.parse_album(album=deezer_album)
298
async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
    """Yield every playlist of the logged-in Deezer user."""
    user_playlists = await self.user.get_playlists()
    async for deezer_playlist in user_playlists:
        yield self.parse_playlist(playlist=deezer_playlist)
303
async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
    """Yield every track of the user's Deezer library."""
    library_tracks = await self.client.get_user_tracks()
    async for deezer_track in library_tracks:
        yield self.parse_track(
            track=deezer_track,
            user_country=self.gw_client.user_country,
        )
308
@use_cache(3600 * 24 * 30)  # Cache for 30 days
async def get_artist(self, prov_artist_id: str) -> Artist:
    """Fetch one artist from Deezer and convert it to a Music Assistant artist.

    :param prov_artist_id: Deezer artist id as a string.
    :raises MediaNotFoundError: If Deezer answers with an error response.
    """
    try:
        deezer_artist = await self.client.get_artist(artist_id=int(prov_artist_id))
        return self.parse_artist(artist=deezer_artist)
    except deezer_exceptions.DeezerErrorResponse as error:
        self.logger.warning("Failed getting artist: %s", error)
        raise MediaNotFoundError(f"Artist {prov_artist_id} not found on Deezer") from error
319
@use_cache(3600 * 24 * 30)  # Cache for 30 days
async def get_album(self, prov_album_id: str) -> Album:
    """Fetch one album from Deezer and convert it to a Music Assistant album.

    :param prov_album_id: Deezer album id as a string.
    :raises MediaNotFoundError: If Deezer answers with an error response.
    """
    try:
        deezer_album = await self.client.get_album(album_id=int(prov_album_id))
        return self.parse_album(album=deezer_album)
    except deezer_exceptions.DeezerErrorResponse as error:
        self.logger.warning("Failed getting album: %s", error)
        raise MediaNotFoundError(f"Album {prov_album_id} not found on Deezer") from error
328
@use_cache(3600 * 24 * 30)  # Cache for 30 days
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
    """Get full playlist details by id.

    Besides numeric Deezer playlist ids this accepts the virtual ids for Flow,
    Recommended tracks and Top Charts, and radio ids of the form
    ``RADIO_PLAYLIST_PREFIX + <radio id>``.

    :param prov_playlist_id: Deezer playlist id or one of the virtual ids.
    :raises MediaNotFoundError: If the radio or playlist cannot be retrieved.
    """
    # Virtual playlists backed by a cached track list: id -> (display name, loader).
    virtual_playlists = {
        FLOW_PLAYLIST_ID: ("Flow", self._get_flow_tracks),
        RECOMMENDED_TRACKS_PLAYLIST_ID: ("Recommended tracks", self._get_recommended_tracks),
        TOP_CHARTS_PLAYLIST_ID: ("Top Charts", self._get_chart_tracks),
    }
    if prov_playlist_id in virtual_playlists:
        name, load_tracks = virtual_playlists[prov_playlist_id]
        tracks = await load_tracks()
        # Use the album cover of the first track as the playlist image, if any.
        cover = None
        if tracks and hasattr(tracks[0], "album"):
            cover = getattr(tracks[0].album, "cover_medium", None)
        return self._create_virtual_playlist(prov_playlist_id, name, image_url=cover)

    if prov_playlist_id.startswith(RADIO_PLAYLIST_PREFIX):
        try:
            # Parse inside the try so a malformed id is reported as "not found"
            # instead of escaping as a bare ValueError.
            radio_id = int(prov_playlist_id.removeprefix(RADIO_PLAYLIST_PREFIX))
            radio = await self.client.get_radio(radio_id)
            return self._create_virtual_playlist(
                prov_playlist_id,
                f"Radio: {radio.title}",
                image_url=getattr(radio, "picture_medium", None),
            )
        except Exception as err:
            self.logger.warning("Failed getting radio %s: %s", prov_playlist_id, err)
            raise MediaNotFoundError(f"Radio {prov_playlist_id} not found on Deezer") from err

    try:
        return self.parse_playlist(
            playlist=await self.client.get_playlist(playlist_id=int(prov_playlist_id)),
        )
    except deezer_exceptions.DeezerErrorResponse as error:
        self.logger.warning("Failed getting playlist: %s", error)
        raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found on Deezer") from error
374
@use_cache(3600 * 24 * 30)  # Cache for 30 days
async def get_track(self, prov_track_id: str) -> Track:
    """Get full track details by id.

    :param prov_track_id: Deezer track id as a string.
    :raises MediaNotFoundError: If Deezer answers with an error response.
    """
    try:
        return self.parse_track(
            track=await self.client.get_track(track_id=int(prov_track_id)),
            user_country=self.gw_client.user_country,
        )
    except deezer_exceptions.DeezerErrorResponse as error:
        self.logger.warning("Failed getting track: %s", error)
        raise MediaNotFoundError(f"Track {prov_track_id} not found on Deezer") from error
386
@use_cache(3600 * 24 * 30)  # Cache for 30 days
async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
    """Return all tracks of the given album as Music Assistant tracks."""
    album = await self.client.get_album(album_id=int(prov_album_id))
    album_tracks = await album.get_tracks()
    parsed: list[Track] = []
    for deezer_track in album_tracks:
        parsed.append(
            self.parse_track(
                track=deezer_track,
                user_country=self.gw_client.user_country,
                # TODO: doesn't Deezer have disc and track number in the api ?
                position=0,
            )
        )
    return parsed
400
async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
    """Get playlist tracks.

    :param prov_playlist_id: Deezer playlist id or one of the virtual playlist ids.
    :param page: Page index; only page 0 returns data.
    :returns: The parsed tracks, or an empty list for later pages and for
        radios that cannot be loaded.
    """
    if page > 0:
        # paging not supported, we always return the whole list at once
        return []

    # Virtual playlists use their own cached wrappers (not double-cached)
    if prov_playlist_id == FLOW_PLAYLIST_ID:
        return self._parse_tracks_list(await self._get_flow_tracks())

    if prov_playlist_id == RECOMMENDED_TRACKS_PLAYLIST_ID:
        return self._parse_tracks_list(await self._get_recommended_tracks())

    if prov_playlist_id == TOP_CHARTS_PLAYLIST_ID:
        return self._parse_tracks_list(await self._get_chart_tracks())

    if prov_playlist_id.startswith(RADIO_PLAYLIST_PREFIX):
        try:
            # Parse inside the try so a malformed radio id is handled like any
            # other radio failure instead of raising ValueError.
            radio_id = int(prov_playlist_id.removeprefix(RADIO_PLAYLIST_PREFIX))
            radio = await self.client.get_radio(radio_id)
            return self._parse_tracks_list(list(await radio.get_tracks()))
        except Exception as err:
            self.logger.debug("Failed to get radio tracks %s: %s", prov_playlist_id, err)
            return []

    # Regular Deezer playlists (cached separately)
    return await self._get_regular_playlist_tracks(prov_playlist_id)
428
@use_cache(3600 * 3)  # Cache for 3 hours
async def _get_regular_playlist_tracks(self, prov_playlist_id: str) -> list[Track]:
    """Load and parse the tracks of a regular (numeric id) Deezer playlist."""
    deezer_playlist = await self.client.get_playlist(int(prov_playlist_id))
    deezer_tracks = list(await deezer_playlist.get_tracks())
    return self._parse_tracks_list(deezer_tracks)
435
def _parse_tracks_list(self, tracks: list[deezer.Track]) -> list[Track]:
    """Convert Deezer tracks to Music Assistant tracks, numbering positions from 1."""
    parsed: list[Track] = []
    for position, deezer_track in enumerate(tracks, start=1):
        parsed.append(
            self.parse_track(
                track=deezer_track,
                user_country=self.gw_client.user_country,
                position=position,
            )
        )
    return parsed
446
@use_cache(3600 * 24 * 7)  # Cache for 7 days
async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
    """Return the albums of the given artist as Music Assistant albums."""
    artist = await self.client.get_artist(artist_id=int(prov_artist_id))
    artist_albums = await artist.get_albums()
    parsed: list[Album] = []
    async for deezer_album in artist_albums:
        parsed.append(self.parse_album(album=deezer_album))
    return parsed
452
@use_cache(3600 * 24 * 7)  # Cache for 7 days
async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
    """Return the top tracks of the given artist (requested with limit=50)."""
    artist = await self.client.get_artist(artist_id=int(prov_artist_id))
    top_tracks = await artist.get_top(limit=50)
    parsed: list[Track] = []
    async for deezer_track in top_tracks:
        parsed.append(
            self.parse_track(track=deezer_track, user_country=self.gw_client.user_country)
        )
    return parsed
461
462 async def library_add(self, item: MediaItemType) -> bool:
463 """Add an item to the provider's library/favorites."""
464 result = False
465 if item.media_type == MediaType.ARTIST:
466 result = bool(
467 await self.client.add_user_artist(
468 artist_id=int(item.item_id),
469 )
470 )
471 elif item.media_type == MediaType.ALBUM:
472 result = bool(
473 await self.client.add_user_album(
474 album_id=int(item.item_id),
475 )
476 )
477 elif item.media_type == MediaType.TRACK:
478 result = bool(
479 await self.client.add_user_track(
480 track_id=int(item.item_id),
481 )
482 )
483 elif item.media_type == MediaType.PLAYLIST:
484 result = bool(
485 await self.client.add_user_playlist(
486 playlist_id=int(item.item_id),
487 )
488 )
489 else:
490 raise NotImplementedError
491 return result
492
493 async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool:
494 """Remove an item from the provider's library/favorites."""
495 result = False
496 if media_type == MediaType.ARTIST:
497 result = bool(
498 await self.client.remove_user_artist(
499 artist_id=int(prov_item_id),
500 )
501 )
502 elif media_type == MediaType.ALBUM:
503 result = bool(
504 await self.client.remove_user_album(
505 album_id=int(prov_item_id),
506 )
507 )
508 elif media_type == MediaType.TRACK:
509 result = bool(
510 await self.client.remove_user_track(
511 track_id=int(prov_item_id),
512 )
513 )
514 elif media_type == MediaType.PLAYLIST:
515 result = bool(
516 await self.client.remove_user_playlist(
517 playlist_id=int(prov_item_id),
518 )
519 )
520 else:
521 raise NotImplementedError
522 return result
523
@use_cache(3600)
async def recommendations(self) -> list[RecommendationFolder]:
    """Build the recommendation folders shown for Deezer.

    Folders are returned in this order: "Made for you" (always), then
    "Recommended albums", "Recommended artists" and "Deezer Radios", each
    only when it has at least one item.
    """

    def first_album_cover(tracks: list[deezer.Track]) -> str | None:
        # Medium album cover of the first track, or None when unavailable.
        if tracks and hasattr(tracks[0], "album"):
            return getattr(tracks[0].album, "cover_medium", None)
        return None

    def folder(item_id: str, name: str, items: list) -> RecommendationFolder:
        return RecommendationFolder(
            item_id=item_id,
            provider=self.instance_id,
            name=name,
            items=UniqueList(items),
        )

    folders: list[RecommendationFolder] = []

    # Made for you: the three virtual playlists followed by Deezer's recommended playlists.
    flow_cover = first_album_cover(await self._get_flow_tracks())
    recommended_cover = first_album_cover(await self._get_recommended_tracks())
    chart_cover = first_album_cover(await self._get_chart_tracks())
    made_for_you: list[Playlist] = [
        self._create_virtual_playlist(FLOW_PLAYLIST_ID, "Flow", image_url=flow_cover),
        self._create_virtual_playlist(
            RECOMMENDED_TRACKS_PLAYLIST_ID, "Recommended tracks", image_url=recommended_cover
        ),
        self._create_virtual_playlist(
            TOP_CHARTS_PLAYLIST_ID, "Top Charts", image_url=chart_cover
        ),
    ]
    suggested_playlists = await self.client.get_user_recommended_playlists()
    for deezer_playlist in suggested_playlists:
        made_for_you.append(self.parse_playlist(playlist=deezer_playlist))
    folders.append(folder("made_for_you", "Made for you", made_for_you))

    # Recommended albums
    suggested_albums = list(await self.client.get_user_recommended_albums())
    if suggested_albums:
        parsed_albums = [self.parse_album(album=entry) for entry in suggested_albums]
        folders.append(folder("recommended_albums", "Recommended albums", parsed_albums))

    # Recommended artists
    suggested_artists = list(await self.client.get_user_recommended_artists())
    if suggested_artists:
        parsed_artists = [self.parse_artist(artist=entry) for entry in suggested_artists]
        folders.append(folder("recommended_artists", "Recommended artists", parsed_artists))

    # Curated radios, exposed as virtual playlists; a failing radio is skipped.
    radios: list[Playlist] = []
    for radio_id in CURATED_RADIO_IDS:
        try:
            radio = await self.client.get_radio(radio_id)
            radios.append(
                self._create_virtual_playlist(
                    item_id=f"{RADIO_PLAYLIST_PREFIX}{radio_id}",
                    name=f"Radio: {radio.title}",
                    image_url=getattr(radio, "picture_medium", None),
                )
            )
        except Exception as err:
            self.logger.debug("Failed to load radio %s: %s", radio_id, err)
    if radios:
        folders.append(folder("radios", "Deezer Radios", radios))

    return folders
625
async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
    """Append the given tracks to a Deezer playlist."""
    target = await self.client.get_playlist(int(prov_playlist_id))
    numeric_ids = [int(track_id) for track_id in prov_track_ids]
    await target.add_tracks(tracks=numeric_ids)
630
async def remove_playlist_tracks(
    self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
) -> None:
    """Delete the tracks at the given positions from a Deezer playlist.

    Positions are resolved to track ids through ``get_playlist_tracks``.
    """
    wanted = len(positions_to_remove)
    ids_to_delete: list[int] = []
    for entry in await self.get_playlist_tracks(prov_playlist_id, 0):
        if entry.position not in positions_to_remove:
            continue
        ids_to_delete.append(int(entry.item_id))
        # Stop scanning once every requested position has been resolved.
        if len(ids_to_delete) == wanted:
            break
    target = await self.client.get_playlist(int(prov_playlist_id))
    await target.delete_tracks(ids_to_delete)
643
async def create_playlist(self, name: str) -> Playlist:
    """Create a Deezer playlist with the given name and return it parsed."""
    new_id = await self.client.create_playlist(playlist_name=name)
    created = await self.client.get_playlist(new_id)
    return self.parse_playlist(playlist=created)
649
@use_cache(3600 * 24)  # Cache for 24 hours
async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]:
    """Return up to ``limit`` tracks from the gateway's track mix for the given track."""
    response = await self.gw_client._gw_api_call(
        "song.getSearchTrackMix", args={"SNG_ID": prov_track_id}
    )
    mix_entries = response["results"]["data"][:limit]
    similar: list[Track] = []
    # Each entry is resolved to a full track one after the other.
    for entry in mix_entries:
        similar.append(await self.get_track(entry["SNG_ID"]))
    return similar
658
async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
    """Build the stream details for a track from the gateway's url and song data.

    :param item_id: Deezer track id.
    :param media_type: Not used by this implementation.
    """
    url_details, song_data = await self.gw_client.get_deezer_track_urls(item_id)
    url = url_details["sources"][0]["url"]
    stream_format = url_details["format"]
    # Only the part of the format name before the first "_" is used as content type.
    audio_format = AudioFormat(content_type=ContentType.try_parse(stream_format.split("_")[0]))
    # Due to track replacement, the track ID of the stream may be different from the ID
    # that is stored. We need the proper track ID to decrypt the stream, so store it
    # separately so we can use it later on.
    stream_data = {"url": url, "format": stream_format, "track_id": song_data["SNG_ID"]}
    return StreamDetails(
        item_id=item_id,
        provider=self.instance_id,
        audio_format=audio_format,
        stream_type=StreamType.CUSTOM,
        duration=int(song_data["DURATION"]),
        data=stream_data,
        size=int(song_data[f"FILESIZE_{stream_format}"]),
        can_seek=True,
        allow_seek=True,
    )
679
async def get_audio_stream(
    self, streamdetails: StreamDetails, seek_position: int = 0
) -> AsyncGenerator[bytes, None]:
    """Return the audio stream for the provider item.

    The download is processed in 2048-byte chunks: every chunk whose index is
    a multiple of 3 goes through ``decrypt_chunk``, the others are passed on
    unchanged. Any trailing data shorter than one chunk is yielded as-is.

    :param streamdetails: Stream details; ``data`` must hold ``url`` and ``track_id``.
    :param seek_position: Position in seconds to start from (0 = beginning).
    """
    blowfish_key = self.get_blowfish_key(streamdetails.data["track_id"])
    chunk_index = 0
    timeout = ClientTimeout(total=None, connect=30, sock_read=600)
    headers: dict[str, str] = {}

    # NOTE: Seek with using the Range header is not working properly
    # causing malformed audio so this is a temporary patch
    # by just skipping chunks
    if seek_position and streamdetails.size and streamdetails.duration:
        chunk_count = ceil(streamdetails.size / 2048)
        # Multiply before truncating: rounding chunks-per-second down first
        # makes the error grow with the seek distance.
        skip_chunks = int(chunk_count * seek_position / streamdetails.duration)
    else:
        skip_chunks = 0

    buffer = bytearray()
    streamdetails.data["start_ts"] = utc_timestamp()
    streamdetails.data["stream_id"] = uuid.uuid1()
    self.mass.create_task(self.gw_client.log_listen(next_track=streamdetails.item_id))
    async with self.mass.http_session.get(
        streamdetails.data["url"], headers=headers, timeout=timeout
    ) as resp:
        async for chunk in resp.content.iter_chunked(2048):
            buffer += chunk
            if len(buffer) >= 2048:
                # Chunk 0 is emitted even when seeking.
                # NOTE(review): presumably it carries the stream header - confirm.
                if chunk_index >= skip_chunks or chunk_index == 0:
                    if chunk_index % 3 > 0:
                        yield bytes(buffer[:2048])
                    else:
                        yield self.decrypt_chunk(bytes(buffer[:2048]), blowfish_key)

                chunk_index += 1
                del buffer[:2048]
    yield bytes(buffer)
722
async def on_streamed(
    self,
    streamdetails: StreamDetails,
) -> None:
    """Report the finished stream to the gateway client's listen log."""
    gateway = self.gw_client
    await gateway.log_listen(last_track=streamdetails)
729
730 ### PARSING METADATA FUNCTIONS ###
731
def parse_metadata_track(self, track: deezer.Track) -> MediaItemMetadata:
    """Collect preview, explicit flag, popularity and album cover of a track.

    Each field is only set when the Deezer track carries the source attribute.
    """
    metadata = MediaItemMetadata()
    # (attribute on the Deezer track, attribute on the metadata object)
    copied_fields = (
        ("preview", "preview"),
        ("explicit_lyrics", "explicit"),
        ("rank", "popularity"),
    )
    for source_attr, target_attr in copied_fields:
        if hasattr(track, source_attr):
            setattr(metadata, target_attr, getattr(track, source_attr))

    has_cover = hasattr(track, "album") and hasattr(track.album, "cover_big")
    if has_cover:
        thumb = MediaItemImage(
            type=ImageType.THUMB,
            path=track.album.cover_big,
            provider=self.instance_id,
            remotely_accessible=True,
        )
        metadata.add_image(thumb)
    return metadata
751
def parse_metadata_album(self, album: deezer.Album) -> MediaItemMetadata:
    """Build album metadata from the explicit flag and the big cover image."""
    explicit = album.explicit_lyrics
    cover = MediaItemImage(
        type=ImageType.THUMB,
        path=album.cover_big,
        provider=self.instance_id,
        remotely_accessible=True,
    )
    return MediaItemMetadata(explicit=explicit, images=UniqueList([cover]))
767
def parse_metadata_artist(self, artist: deezer.Artist) -> MediaItemMetadata:
    """Build artist metadata holding the artist's big picture as thumbnail."""
    picture = MediaItemImage(
        type=ImageType.THUMB,
        path=artist.picture_big,
        provider=self.instance_id,
        remotely_accessible=True,
    )
    return MediaItemMetadata(images=UniqueList([picture]))
782
783 ### PARSING FUNCTIONS ###
def parse_artist(self, artist: deezer.Artist) -> Artist:
    """Convert a deezer-python artist into a Music Assistant artist."""
    artist_id = str(artist.id)
    mapping = ProviderMapping(
        item_id=artist_id,
        provider_domain=self.domain,
        provider_instance=self.instance_id,
        url=getattr(artist, "link", None),  # Sometimes the API doesn't return a link
    )
    return Artist(
        item_id=artist_id,
        provider=self.instance_id,
        name=artist.name,
        media_type=MediaType.ARTIST,
        provider_mappings={mapping},
        metadata=self.parse_metadata_artist(artist=artist),
    )
801
def parse_album(self, album: deezer.Album) -> Album:
    """Convert a deezer-python album into a Music Assistant album."""
    title, version = parse_title_and_version(album.title)
    album_type = self.get_album_type(album)
    album_id = str(album.id)
    # The release date may be missing or empty; the year is then left unset.
    release_date = getattr(album, "release_date", None)
    album_artist = ItemMapping(
        media_type=MediaType.ARTIST,
        item_id=str(album.artist.id),
        provider=self.instance_id,
        name=album.artist.name,
    )
    mapping = ProviderMapping(
        item_id=album_id,
        provider_domain=self.domain,
        provider_instance=self.instance_id,
        url=getattr(album, "link", None),
    )
    return Album(
        album_type=album_type,
        item_id=album_id,
        provider=self.instance_id,
        name=title,
        version=version,
        year=release_date.year if release_date else None,
        artists=UniqueList([album_artist]),
        media_type=MediaType.ALBUM,
        provider_mappings={mapping},
        metadata=self.parse_metadata_album(album=album),
    )
833
def parse_playlist(self, playlist: deezer.Playlist) -> Playlist:
    """Convert a deezer-python playlist into a Music Assistant playlist.

    A playlist is editable only when its creator is the logged-in user.
    """
    owner = self.get_playlist_creator(playlist)
    owned_by_user = owner.id == self.user.id
    playlist_id = str(playlist.id)
    mapping = ProviderMapping(
        item_id=playlist_id,
        provider_domain=self.domain,
        provider_instance=self.instance_id,
        url=getattr(playlist, "link", None),
        is_unique=owned_by_user,  # user-owned playlists are unique
    )
    picture = MediaItemImage(
        type=ImageType.THUMB,
        path=playlist.picture_big,
        provider=self.instance_id,
        remotely_accessible=True,
    )
    return Playlist(
        item_id=playlist_id,
        provider=self.instance_id,
        name=playlist.title,
        media_type=MediaType.PLAYLIST,
        provider_mappings={mapping},
        metadata=MediaItemMetadata(images=UniqueList([picture])),
        is_editable=owned_by_user,
        owner=owner.name,
    )
867
def get_playlist_creator(self, playlist: deezer.Playlist) -> deezer.User:
    """Return the playlist's ``creator`` when present, otherwise its ``user``."""
    return playlist.creator if hasattr(playlist, "creator") else playlist.user
873
def _create_virtual_playlist(
    self,
    item_id: str,
    name: str,
    image_url: str | None = None,
) -> Playlist:
    """Build a non-editable playlist object owned by "Deezer" for virtual content.

    :param item_id: The unique identifier (e.g., "flow", "radio_37151").
    :param name: Display name for the playlist.
    :param image_url: Optional thumbnail URL; no image is attached when empty.
    """
    thumbnails: UniqueList[MediaItemImage] = UniqueList()
    if image_url:
        thumbnail = MediaItemImage(
            type=ImageType.THUMB,
            path=image_url,
            provider=self.instance_id,
            remotely_accessible=True,
        )
        thumbnails.append(thumbnail)

    if thumbnails:
        metadata = MediaItemMetadata(images=thumbnails)
    else:
        metadata = MediaItemMetadata()

    mapping = ProviderMapping(
        item_id=item_id,
        provider_domain=self.domain,
        provider_instance=self.instance_id,
    )
    return Playlist(
        item_id=item_id,
        provider=self.instance_id,
        name=name,
        media_type=MediaType.PLAYLIST,
        provider_mappings={mapping},
        metadata=metadata,
        is_editable=False,
        owner="Deezer",
    )
912
def parse_track(self, track: deezer.Track, user_country: str, position: int = 0) -> Track:
    """Convert a deezer-python track into a Music Assistant track.

    :param track: The Deezer track to convert.
    :param user_country: Country passed on to ``track_available``.
    :param position: Position within a list; also the fallback track number.
    """
    artist_mapping = None
    if hasattr(track, "artist"):
        artist_mapping = ItemMapping(
            media_type=MediaType.ARTIST,
            # Artists without an id get one derived from their name.
            item_id=str(getattr(track.artist, "id", f"deezer-{track.artist.name}")),
            provider=self.instance_id,
            name=track.artist.name,
        )

    album_mapping = None
    if hasattr(track, "album"):
        album_mapping = ItemMapping(
            media_type=MediaType.ALBUM,
            item_id=str(track.album.id),
            provider=self.instance_id,
            name=track.album.title,
        )

    title, version = parse_title_and_version(track.title)
    track_id = str(track.id)
    parsed = Track(
        item_id=track_id,
        provider=self.instance_id,
        name=title,
        version=version,
        sort_name=self.get_short_title(track),
        duration=track.duration,
        artists=UniqueList([artist_mapping]) if artist_mapping else UniqueList(),
        album=album_mapping,
        provider_mappings={
            ProviderMapping(
                item_id=track_id,
                provider_domain=self.domain,
                provider_instance=self.instance_id,
                available=self.track_available(track=track, user_country=user_country),
                url=getattr(track, "link", None),
            )
        },
        metadata=self.parse_metadata_track(track=track),
        track_number=getattr(track, "track_position", position),
        position=position,
        disc_number=getattr(track, "disk_number", 0),
    )
    isrc = getattr(track, "isrc", None)
    if isrc:
        parsed.external_ids.add((ExternalID.ISRC, isrc))
    return parsed
961
def get_short_title(self, track: deezer.Track) -> str:
    """Return the track's short title, or its full title when no short one exists."""
    title = track.title_short if hasattr(track, "title_short") else track.title
    return str(title)
967
def get_album_type(self, album: deezer.Album) -> AlbumType:
    """Determine the Music Assistant album type for a Deezer album.

    The Deezer ``record_type`` is translated first. A SOUNDTRACK or LIVE type
    inferred from the album title takes precedence over it.
    """
    # Deezer record types and their Music Assistant counterparts.
    record_type_map = {
        "album": AlbumType.ALBUM,
        "single": AlbumType.SINGLE,
        "ep": AlbumType.EP,
        "compile": AlbumType.COMPILATION,
    }
    if hasattr(album, "record_type"):
        declared_type = record_type_map.get(album.record_type, AlbumType.UNKNOWN)
    else:
        declared_type = AlbumType.UNKNOWN

    # Title-based inference only wins when it yields something more specific.
    inferred = infer_album_type(album.title, "")
    return inferred if inferred in (AlbumType.SOUNDTRACK, AlbumType.LIVE) else declared_type
990
991 ### SEARCH AND PARSE FUNCTIONS ###
async def search_and_parse_tracks(
    self, query: str, user_country: str, limit: int = 20
) -> list[Track]:
    """Search for tracks and parse them into Music Assistant tracks.

    :param query: The search query.
    :param user_country: Country of the user, used to flag track availability.
    :param limit: Maximum number of tracks to return.
    :return: At most ``limit`` parsed tracks (empty when ``limit`` is not positive).
    """
    if limit <= 0:
        return []
    deezer_tracks = await self.client.search(query=query, limit=limit)
    tracks: list[Track] = []
    for track in deezer_tracks:
        tracks.append(self.parse_track(track, user_country))
        # The search result is not assumed to honour `limit`, so cap it here.
        # Compare the collected count (not a zero-based index) so exactly
        # `limit` items are returned, and stop before pulling another item.
        if len(tracks) >= limit:
            break
    return tracks
1003
async def search_and_parse_artists(self, query: str, limit: int = 20) -> list[Artist]:
    """Search for artists and parse them into Music Assistant artists.

    :param query: The search query.
    :param limit: Maximum number of artists to return.
    :return: At most ``limit`` parsed artists (empty when ``limit`` is not positive).
    """
    if limit <= 0:
        return []
    deezer_artists = await self.client.search_artists(query=query, limit=limit)
    artists: list[Artist] = []
    for artist in deezer_artists:
        artists.append(self.parse_artist(artist))
        # The search result is not assumed to honour `limit`, so cap it here.
        # Compare the collected count (not a zero-based index) so exactly
        # `limit` items are returned, and stop before pulling another item.
        if len(artists) >= limit:
            break
    return artists
1013
async def search_and_parse_albums(self, query: str, limit: int = 20) -> list[Album]:
    """Search for albums and parse them into Music Assistant albums.

    :param query: The search query.
    :param limit: Maximum number of albums to return.
    :return: At most ``limit`` parsed albums (empty when ``limit`` is not positive).
    """
    if limit <= 0:
        return []
    deezer_albums = await self.client.search_albums(query=query, limit=limit)
    albums: list[Album] = []
    for album in deezer_albums:
        albums.append(self.parse_album(album))
        # The search result is not assumed to honour `limit`, so cap it here.
        # Compare the collected count (not a zero-based index) so exactly
        # `limit` items are returned, and stop before pulling another item.
        if len(albums) >= limit:
            break
    return albums
1023
async def search_and_parse_playlists(self, query: str, limit: int = 20) -> list[Playlist]:
    """Search for playlists and parse them into Music Assistant playlists.

    :param query: The search query.
    :param limit: Maximum number of playlists to return.
    :return: At most ``limit`` parsed playlists (empty when ``limit`` is not positive).
    """
    if limit <= 0:
        return []
    deezer_playlists = await self.client.search_playlists(query=query, limit=limit)
    playlists: list[Playlist] = []
    for playlist in deezer_playlists:
        playlists.append(self.parse_playlist(playlist))
        # The search result is not assumed to honour `limit`, so cap it here.
        # Compare the collected count (not a zero-based index) so exactly
        # `limit` items are returned, and stop before pulling another item.
        if len(playlists) >= limit:
            break
    return playlists
1033
1034 ### OTHER FUNCTIONS ###
1035
async def get_track_content_type(
    self, gw_client: GWClient, track_id: str
) -> Literal[ContentType.FLAC, ContentType.MP3]:
    """Determine the content type of a track.

    FLAC is preferred when a FLAC file size is reported; otherwise MP3 is
    returned when a 320 or 128 MP3 file size is reported.

    :param gw_client: Client used to fetch the song data.
    :param track_id: Id of the track to look up.
    :raises NotImplementedError: If none of the checked file sizes is set.
    """
    results = (await gw_client.get_song_data(track_id))["results"]
    # NOTE(review): these are plain truthiness checks; presumably an
    # unavailable format is reported as a falsy value - confirm against GWClient.
    if results["FILESIZE_FLAC"]:
        return ContentType.FLAC

    has_mp3 = results["FILESIZE_MP3_320"] or results["FILESIZE_MP3_128"]
    if not has_mp3:
        msg = "Unsupported contenttype"
        raise NotImplementedError(msg)
    return ContentType.MP3
1049
def track_available(self, track: deezer.Track, user_country: str) -> bool:
    """Tell whether a track can be played in the user's country.

    Tracks that carry no ``available_countries`` are treated as available.
    """
    if not hasattr(track, "available_countries"):
        return True
    return user_country in track.available_countries
1055
def _md5(self, data: str, data_type: str = "ascii") -> str:
    """Return the hexadecimal MD5 digest of ``data`` encoded as ``data_type``."""
    return hashlib.md5(data.encode(data_type)).hexdigest()
1060
def get_blowfish_key(self, track_id: str) -> str:
    """Derive the 16-character blowfish key used to decrypt a track's chunks.

    Each key character is the XOR of the matching characters from the first
    half of the track id's MD5 hex digest, its second half, and the secret.
    """
    secret = app_var(5)
    digest = self._md5(track_id)
    key_chars = []
    for idx in range(16):
        mixed = ord(digest[idx]) ^ ord(digest[idx + 16]) ^ ord(secret[idx])
        key_chars.append(chr(mixed))
    return "".join(key_chars)
1068
def decrypt_chunk(self, chunk: bytes, blowfish_key: str) -> bytes:
    """Decrypt a chunk with Blowfish in CBC mode.

    :param chunk: The encrypted bytes.
    :param blowfish_key: The key, encoded as ASCII before use.
    """
    key_bytes = blowfish_key.encode("ascii")
    # The initialisation vector is a fixed constant.
    init_vector = b"\x00\x01\x02\x03\x04\x05\x06\x07"
    cipher = Blowfish.new(key_bytes, Blowfish.MODE_CBC, init_vector)
    return cipher.decrypt(chunk)  # type: ignore[no-any-return,unused-ignore]
1077