/
/
/
1"""API client wrapper for Yandex Music."""
2
3from __future__ import annotations
4
5import asyncio
6import base64
7import hashlib
8import hmac
9import logging
10import re
11import time
12from collections.abc import Awaitable, Callable
13from datetime import UTC, datetime
14from typing import TYPE_CHECKING, Any, TypeVar, cast
15
16from music_assistant_models.errors import (
17 LoginFailed,
18 ProviderUnavailableError,
19 ResourceTemporarilyUnavailable,
20)
21from yandex_music import Album as YandexAlbum
22from yandex_music import Artist as YandexArtist
23from yandex_music import ClientAsync, MixLink, Search, TrackShort
24from yandex_music import Playlist as YandexPlaylist
25from yandex_music import Track as YandexTrack
26from yandex_music.exceptions import BadRequestError, NetworkError, UnauthorizedError
27from yandex_music.utils.sign_request import DEFAULT_SIGN_KEY
28
29if TYPE_CHECKING:
30 from yandex_music import DownloadInfo
31 from yandex_music.feed.feed import Feed
32 from yandex_music.landing.chart_info import ChartInfo
33 from yandex_music.landing.landing import Landing
34 from yandex_music.landing.landing_list import LandingList
35 from yandex_music.rotor.dashboard import Dashboard
36 from yandex_music.rotor.station_result import StationResult
37
38from .constants import DEFAULT_LIMIT, ROTOR_STATION_MY_WAVE
39
# Codec list sent as the ``codecs`` query parameter of the get-file-info request; the same
# string (with commas stripped) is also part of the signed payload of that request.
# get-file-info with quality=lossless returns FLAC; default /tracks/.../download-info often does not
# Prefer flac-mp4/aac-mp4 (Yandex API moved to these formats around 2025)
GET_FILE_INFO_CODECS = "flac-mp4,flac,aac-mp4,aac,he-aac,mp3,he-aac-mp4"

# Module-level logger used by every method of the client wrapper.
LOGGER = logging.getLogger(__name__)

# Result type of the callables handed to _call_with_retry() / _call_no_retry().
_T = TypeVar("_T")
47
48
49class YandexMusicClient:
50 """Wrapper around yandex-music-api ClientAsync."""
51
52 def __init__(self, token: str, base_url: str | None = None) -> None:
53 """Initialize the Yandex Music client.
54
55 :param token: Yandex Music OAuth token.
56 :param base_url: Optional API base URL (defaults to Yandex Music API).
57 """
58 self._token = token
59 self._base_url = base_url
60 self._client: ClientAsync | None = None
61 self._user_id: int | None = None
62 self._last_reconnect_at: float = -30.0 # allow first reconnect immediately
63 self._reconnect_lock = asyncio.Lock()
64
65 @property
66 def user_id(self) -> int:
67 """Return the user ID."""
68 if self._user_id is None:
69 raise ProviderUnavailableError("Client not initialized, call connect() first")
70 return self._user_id
71
72 async def connect(self) -> bool:
73 """Initialize the client and verify token validity.
74
75 :return: True if connection was successful.
76 :raises LoginFailed: If the token is invalid.
77 """
78 try:
79 self._client = await ClientAsync(self._token, base_url=self._base_url).init()
80 if self._client.me is None or self._client.me.account is None:
81 raise LoginFailed("Failed to get account info")
82 self._user_id = self._client.me.account.uid
83 LOGGER.debug("Connected to Yandex Music as user %s", self._user_id)
84 return True
85 except UnauthorizedError as err:
86 raise LoginFailed("Invalid Yandex Music token") from err
87 except NetworkError as err:
88 msg = "Network error connecting to Yandex Music"
89 raise ResourceTemporarilyUnavailable(msg) from err
90
91 async def disconnect(self) -> None:
92 """Disconnect the client."""
93 self._client = None
94 self._user_id = None
95
96 async def _ensure_connected(self) -> ClientAsync:
97 """Ensure the client is connected, attempting reconnect if needed."""
98 if self._client is not None:
99 return self._client
100 async with self._reconnect_lock:
101 # Re-check after acquiring lock â another task may have connected already
102 if self._client is not None:
103 return self._client # type: ignore[unreachable]
104 LOGGER.info("Client disconnected, attempting to reconnect...")
105 try:
106 await self.connect()
107 except LoginFailed:
108 raise
109 except Exception as err:
110 raise ProviderUnavailableError("Client not connected and reconnect failed") from err
111 return cast("ClientAsync", self._client)
112
113 def _is_connection_error(self, err: Exception) -> bool:
114 """Return True if the exception indicates a connection or server drop."""
115 if isinstance(err, NetworkError):
116 return True
117 msg = str(err).lower()
118 return "disconnect" in msg or "connection" in msg or "timeout" in msg
119
120 async def _reconnect(self) -> None:
121 """Disconnect and connect again to recover from Server disconnected / connection errors.
122
123 Enforces a 30-second cooldown between reconnect attempts to avoid hammering Yandex
124 and triggering rate limiting. A lock ensures concurrent callers don't bypass the cooldown.
125 """
126 async with self._reconnect_lock:
127 now = time.monotonic()
128 if now - self._last_reconnect_at < 30.0:
129 raise ProviderUnavailableError("Reconnect cooldown active, skipping")
130 self._last_reconnect_at = now
131 await self.disconnect()
132 await self.connect()
133
134 async def _call_with_retry(self, func: Callable[[ClientAsync], Awaitable[_T]]) -> _T:
135 """Execute an async API call with one reconnect attempt on connection error.
136
137 :param func: Async callable that takes a ClientAsync and returns a result.
138 :return: The result of the API call.
139 """
140 client = await self._ensure_connected()
141 try:
142 return await func(client)
143 except Exception as err:
144 if not self._is_connection_error(err):
145 raise
146 LOGGER.warning("Connection error, reconnecting and retrying: %s", err)
147 try:
148 await self._reconnect()
149 except Exception as recon_err:
150 raise ProviderUnavailableError("Reconnect failed") from recon_err
151 client = cast("ClientAsync", self._client)
152 return await func(client)
153
154 async def _call_no_retry(self, func: Callable[[ClientAsync], Awaitable[_T]]) -> _T:
155 """Execute an async API call without reconnect retry on call failure.
156
157 Used for fire-and-forget calls (e.g. rotor feedback) where a failed request
158 should be silently dropped rather than triggering a reconnect cycle that
159 could cause rate limiting. Note: _ensure_connected() is still called to
160 establish the initial connection if needed; only the reconnect-on-error
161 path is skipped.
162
163 :param func: Async callable that takes a ClientAsync and returns a result.
164 :return: The result of the API call.
165 """
166 client = await self._ensure_connected()
167 return await func(client)
168
169 # Rotor (radio station) methods
170
171 async def get_rotor_station_tracks(
172 self,
173 station_id: str,
174 queue: str | int | None = None,
175 ) -> tuple[list[YandexTrack], str | None]:
176 """Get tracks from a rotor station (e.g. user:onyourwave or track:1234).
177
178 :param station_id: Station ID (e.g. ROTOR_STATION_MY_WAVE or "track:1234" for similar).
179 :param queue: Optional track ID for pagination (first track of previous batch).
180 :return: Tuple of (list of track objects, batch_id for feedback or None).
181 """
182 try:
183 result = await self._call_with_retry(
184 lambda c: c.rotor_station_tracks(station_id, settings2=True, queue=queue)
185 )
186 except BadRequestError as err:
187 LOGGER.warning("Error fetching rotor station %s tracks: %s", station_id, err)
188 return ([], None)
189 except (NetworkError, ProviderUnavailableError) as err:
190 LOGGER.warning("Error fetching rotor station tracks: %s", err)
191 return ([], None)
192
193 if not result or not result.sequence:
194 return ([], result.batch_id if result else None)
195 track_ids = []
196 for seq in result.sequence:
197 if seq.track is None:
198 continue
199 tid = getattr(seq.track, "id", None) or getattr(seq.track, "track_id", None)
200 if tid is not None:
201 track_ids.append(str(tid))
202 if not track_ids:
203 return ([], result.batch_id if result else None)
204 try:
205 full_tracks = await self.get_tracks(track_ids)
206 except ResourceTemporarilyUnavailable as err:
207 LOGGER.warning("Error fetching rotor station track details: %s", err)
208 return ([], result.batch_id if result else None)
209 order_map = {str(t.id): t for t in full_tracks if hasattr(t, "id") and t.id}
210 ordered = [order_map[tid] for tid in track_ids if tid in order_map]
211 return (ordered, result.batch_id if result else None)
212
213 async def get_my_wave_tracks(
214 self, queue: str | int | None = None
215 ) -> tuple[list[YandexTrack], str | None]:
216 """Get tracks from the My Wave radio station.
217
218 :param queue: Optional track ID of the last track from the previous batch (API uses it for
219 pagination; do not pass batch_id).
220 :return: Tuple of (list of track objects, batch_id for feedback).
221 """
222 return await self.get_rotor_station_tracks(ROTOR_STATION_MY_WAVE, queue=queue)
223
224 async def send_rotor_station_feedback(
225 self,
226 station_id: str,
227 feedback_type: str,
228 *,
229 batch_id: str | None = None,
230 track_id: str | None = None,
231 total_played_seconds: int | None = None,
232 ) -> bool:
233 """Send rotor station feedback for My Wave recommendations.
234
235 Used to report radioStarted, trackStarted, trackFinished, skip so that
236 Yandex can improve subsequent recommendations.
237
238 :param station_id: Station ID (e.g. ROTOR_STATION_MY_WAVE).
239 :param feedback_type: One of 'radioStarted', 'trackStarted', 'trackFinished', 'skip'.
240 :param batch_id: Optional batch ID from the last get_my_wave_tracks response.
241 :param track_id: Track ID (required for trackStarted, trackFinished, skip).
242 :param total_played_seconds: Seconds played (for trackFinished, skip).
243 :return: True if the request succeeded.
244 """
245 payload: dict[str, Any] = {
246 "type": feedback_type,
247 "timestamp": datetime.now(UTC).isoformat().replace("+00:00", "Z"),
248 }
249 if feedback_type == "radioStarted":
250 payload["from"] = "YandexMusicDesktopAppWindows"
251 if track_id is not None:
252 payload["trackId"] = track_id
253 if total_played_seconds is not None:
254 payload["totalPlayedSeconds"] = total_played_seconds
255 if batch_id is not None:
256 payload["batchId"] = batch_id
257
258 async def _post(c: ClientAsync) -> bool:
259 url = f"{c.base_url}/rotor/station/{station_id}/feedback"
260 await c._request.post(url, payload)
261 return True
262
263 try:
264 result = await self._call_no_retry(_post)
265 LOGGER.debug(
266 "Rotor feedback %s track_id=%s total_played_seconds=%s",
267 feedback_type,
268 track_id,
269 total_played_seconds,
270 )
271 return result
272 except BadRequestError as err:
273 LOGGER.warning("Rotor feedback %s failed: %s", feedback_type, err)
274 return False
275 except (NetworkError, ProviderUnavailableError) as err:
276 LOGGER.warning("Rotor feedback %s failed: %s", feedback_type, err)
277 return False
278
279 # Library methods
280
281 async def get_liked_tracks(self) -> list[TrackShort]:
282 """Get user's liked tracks sorted by timestamp (most recent first).
283
284 :return: List of liked track objects sorted in reverse chronological order.
285 """
286 try:
287 result = await self._call_with_retry(lambda c: c.users_likes_tracks())
288 if result is None:
289 return []
290 tracks = result.tracks or []
291 # Sort by timestamp in descending order (most recently liked first)
292 # TrackShort objects have a timestamp field containing the date the track was liked
293 return sorted(
294 tracks,
295 key=lambda t: getattr(t, "timestamp", datetime.min.replace(tzinfo=UTC)),
296 reverse=True,
297 )
298 except BadRequestError as err:
299 LOGGER.error("Error fetching liked tracks: %s", err)
300 raise ResourceTemporarilyUnavailable("Failed to fetch liked tracks") from err
301 except (NetworkError, ProviderUnavailableError) as err:
302 LOGGER.error("Error fetching liked tracks: %s", err)
303 raise ResourceTemporarilyUnavailable("Failed to fetch liked tracks") from err
304
305 async def get_liked_albums(self, batch_size: int = 50) -> list[YandexAlbum]:
306 """Get user's liked albums with full details (including cover art).
307
308 The users_likes_albums endpoint returns minimal album data without
309 cover_uri, so we fetch full album details in batches afterwards.
310
311 :return: List of liked album objects with full details.
312 """
313 try:
314 result = await self._call_with_retry(lambda c: c.users_likes_albums())
315 except BadRequestError as err:
316 LOGGER.error("Error fetching liked albums: %s", err)
317 raise ResourceTemporarilyUnavailable("Failed to fetch liked albums") from err
318 except (NetworkError, ProviderUnavailableError) as err:
319 LOGGER.error("Error fetching liked albums: %s", err)
320 raise ResourceTemporarilyUnavailable("Failed to fetch liked albums") from err
321
322 if result is None:
323 return []
324 album_ids = [
325 str(like.album.id) for like in result if like.album is not None and like.album.id
326 ]
327 if not album_ids:
328 return []
329 # Fetch full album details in batches to get cover_uri and other metadata
330 full_albums: list[YandexAlbum] = []
331 for i in range(0, len(album_ids), batch_size):
332 batch = album_ids[i : i + batch_size]
333 try:
334 batch_result = await self._call_with_retry(
335 lambda c, _b=batch: c.albums(_b) # type: ignore[misc]
336 )
337 if batch_result:
338 full_albums.extend(batch_result)
339 except (BadRequestError, NetworkError, ProviderUnavailableError) as batch_err:
340 LOGGER.warning("Error fetching album details batch: %s", batch_err)
341 # Fall back to minimal data for this batch
342 batch_set = set(batch)
343 for like in result:
344 if like.album is not None and like.album.id and str(like.album.id) in batch_set:
345 full_albums.append(like.album)
346 return full_albums
347
348 async def get_liked_artists(self) -> list[YandexArtist]:
349 """Get user's liked artists.
350
351 :return: List of liked artist objects.
352 """
353 try:
354 result = await self._call_with_retry(lambda c: c.users_likes_artists())
355 if result is None:
356 return []
357 return [like.artist for like in result if like.artist is not None]
358 except BadRequestError as err:
359 LOGGER.error("Error fetching liked artists: %s", err)
360 raise ResourceTemporarilyUnavailable("Failed to fetch liked artists") from err
361 except (NetworkError, ProviderUnavailableError) as err:
362 LOGGER.error("Error fetching liked artists: %s", err)
363 raise ResourceTemporarilyUnavailable("Failed to fetch liked artists") from err
364
365 async def get_user_playlists(self) -> list[YandexPlaylist]:
366 """Get user's playlists.
367
368 :return: List of playlist objects.
369 """
370 try:
371 result = await self._call_with_retry(lambda c: c.users_playlists_list())
372 if result is None:
373 return []
374 return list(result)
375 except BadRequestError as err:
376 LOGGER.error("Error fetching playlists: %s", err)
377 raise ResourceTemporarilyUnavailable("Failed to fetch playlists") from err
378 except (NetworkError, ProviderUnavailableError) as err:
379 LOGGER.error("Error fetching playlists: %s", err)
380 raise ResourceTemporarilyUnavailable("Failed to fetch playlists") from err
381
382 async def get_liked_playlists(self) -> list[YandexPlaylist]:
383 """Get user's liked/saved editorial playlists.
384
385 :return: List of liked playlist objects.
386 """
387 try:
388 result = await self._call_with_retry(lambda c: c.users_likes_playlists())
389 if result is None:
390 return []
391 playlists = []
392 for like in result:
393 if like.playlist is not None:
394 playlists.append(like.playlist)
395 return playlists
396 except BadRequestError as err:
397 LOGGER.error("Error fetching liked playlists: %s", err)
398 raise ResourceTemporarilyUnavailable("Failed to fetch liked playlists") from err
399 except (NetworkError, ProviderUnavailableError) as err:
400 LOGGER.error("Error fetching liked playlists: %s", err)
401 raise ResourceTemporarilyUnavailable("Failed to fetch liked playlists") from err
402
403 # Search
404
405 async def search(
406 self,
407 query: str,
408 search_type: str = "all",
409 limit: int = DEFAULT_LIMIT,
410 ) -> Search | None:
411 """Search for tracks, albums, artists, or playlists.
412
413 :param query: Search query string.
414 :param search_type: Type of search ('all', 'track', 'album', 'artist', 'playlist').
415 :param limit: Maximum number of results per type.
416 :return: Search results object.
417 """
418 try:
419 return await self._call_with_retry(
420 lambda c: c.search(query, type_=search_type, page=0, nocorrect=False)
421 )
422 except BadRequestError as err:
423 LOGGER.error("Search error: %s", err)
424 raise ResourceTemporarilyUnavailable("Search failed") from err
425 except (NetworkError, ProviderUnavailableError) as err:
426 LOGGER.error("Search error: %s", err)
427 raise ResourceTemporarilyUnavailable("Search failed") from err
428
429 # Get single items
430
431 async def get_track(self, track_id: str) -> YandexTrack | None:
432 """Get a single track by ID.
433
434 :param track_id: Track ID.
435 :return: Track object or None if not found.
436 """
437 try:
438 tracks = await self._call_with_retry(lambda c: c.tracks([track_id]))
439 return tracks[0] if tracks else None
440 except (BadRequestError, NetworkError, ProviderUnavailableError) as err:
441 LOGGER.error("Error fetching track %s: %s", track_id, err)
442 return None
443
444 async def get_track_lyrics(self, track_id: str) -> tuple[str | None, bool]:
445 """Get lyrics for a track.
446
447 Fetches lyrics from Yandex Music API. Returns the lyrics text and whether
448 it's in synced LRC format (with timestamps) or plain text.
449
450 Note: This method fetches the track first to check lyrics_available. If you
451 already have the YandexTrack object, use get_track_lyrics_from_track() to
452 avoid a redundant API call.
453
454 :param track_id: Track ID.
455 :return: Tuple of (lyrics_text, is_synced). Returns (None, False) if unavailable.
456 """
457 try:
458 tracks = await self._call_with_retry(lambda c: c.tracks([track_id]))
459 if not tracks:
460 return None, False
461
462 return await self.get_track_lyrics_from_track(tracks[0])
463
464 except (BadRequestError, NetworkError, ProviderUnavailableError) as err:
465 LOGGER.debug("Error fetching lyrics for track %s: %s", track_id, err)
466 return None, False
467 except Exception as err:
468 # Catch any other errors (e.g., geo-restrictions, API changes)
469 LOGGER.debug("Unexpected error fetching lyrics for track %s: %s", track_id, err)
470 return None, False
471
472 async def get_track_lyrics_from_track(self, track: YandexTrack) -> tuple[str | None, bool]:
473 """Get lyrics for an already-fetched track.
474
475 Avoids the extra tracks([track_id]) API call when the YandexTrack object
476 is already available.
477
478 :param track: YandexTrack object (already fetched).
479 :return: Tuple of (lyrics_text, is_synced). Returns (None, False) if unavailable.
480 """
481 track_id = getattr(track, "id", None) or getattr(track, "track_id", "unknown")
482 try:
483 if not getattr(track, "lyrics_available", False):
484 LOGGER.debug("Lyrics not available for track %s", track_id)
485 return None, False
486
487 track_lyrics = await track.get_lyrics_async()
488 if not track_lyrics:
489 LOGGER.debug("Failed to get lyrics metadata for track %s", track_id)
490 return None, False
491
492 lyrics_text = await track_lyrics.fetch_lyrics_async()
493 if not lyrics_text:
494 return None, False
495
496 # Check if it's LRC format (synced lyrics have timestamps like [00:12.34])
497 # Use re.search without ^ so metadata lines like [ar:Artist] don't prevent detection
498 is_synced = bool(re.search(r"\[\d{2}:\d{2}(?:\.\d{2,3})?\]", lyrics_text))
499 return lyrics_text, is_synced
500
501 except (BadRequestError, NetworkError, ProviderUnavailableError) as err:
502 LOGGER.debug("Error fetching lyrics for track %s: %s", track_id, err)
503 return None, False
504 except Exception as err:
505 # Catch any other errors (e.g., geo-restrictions, API changes)
506 LOGGER.debug("Unexpected error fetching lyrics for track %s: %s", track_id, err)
507 return None, False
508
509 async def get_tracks(self, track_ids: list[str]) -> list[YandexTrack]:
510 """Get multiple tracks by IDs.
511
512 :param track_ids: List of track IDs.
513 :return: List of track objects.
514 :raises ResourceTemporarilyUnavailable: On network errors after retry.
515 """
516 try:
517 result = await self._call_with_retry(lambda c: c.tracks(track_ids))
518 return result or []
519 except BadRequestError as err:
520 LOGGER.error("Error fetching tracks: %s", err)
521 return []
522 except (NetworkError, ProviderUnavailableError) as err:
523 LOGGER.error("Error fetching tracks (retry failed): %s", err)
524 raise ResourceTemporarilyUnavailable("Failed to fetch tracks") from err
525
526 async def get_album(self, album_id: str) -> YandexAlbum | None:
527 """Get a single album by ID.
528
529 :param album_id: Album ID.
530 :return: Album object or None if not found.
531 """
532 try:
533 albums = await self._call_with_retry(lambda c: c.albums([album_id]))
534 return albums[0] if albums else None
535 except (BadRequestError, NetworkError, ProviderUnavailableError) as err:
536 LOGGER.error("Error fetching album %s: %s", album_id, err)
537 return None
538
539 async def get_album_with_tracks(self, album_id: str) -> YandexAlbum | None:
540 """Get an album with its tracks.
541
542 Uses the same semantics as the web client: albums/{id}/with-tracks
543 with resumeStream, richTracks, withListeningFinished when the library
544 passes them through.
545
546 :param album_id: Album ID.
547 :return: Album object with tracks or None if not found.
548 """
549
550 async def _fetch(c: ClientAsync) -> YandexAlbum | None:
551 try:
552 return await c.albums_with_tracks(
553 album_id,
554 resumeStream=True,
555 richTracks=True,
556 withListeningFinished=True,
557 )
558 except TypeError:
559 # Older yandex-music may not accept these kwargs
560 return await c.albums_with_tracks(album_id)
561
562 try:
563 return await self._call_with_retry(_fetch)
564 except (BadRequestError, NetworkError, ProviderUnavailableError) as err:
565 LOGGER.error("Error fetching album with tracks %s: %s", album_id, err)
566 return None
567
568 async def get_artist(self, artist_id: str) -> YandexArtist | None:
569 """Get a single artist by ID.
570
571 :param artist_id: Artist ID.
572 :return: Artist object or None if not found.
573 """
574 try:
575 artists = await self._call_with_retry(lambda c: c.artists([artist_id]))
576 return artists[0] if artists else None
577 except (BadRequestError, NetworkError, ProviderUnavailableError) as err:
578 LOGGER.error("Error fetching artist %s: %s", artist_id, err)
579 return None
580
581 async def get_artist_albums(
582 self, artist_id: str, limit: int = DEFAULT_LIMIT
583 ) -> list[YandexAlbum]:
584 """Get artist's albums.
585
586 :param artist_id: Artist ID.
587 :param limit: Maximum number of albums.
588 :return: List of album objects.
589 """
590 try:
591 result = await self._call_with_retry(
592 lambda c: c.artists_direct_albums(artist_id, page=0, page_size=limit)
593 )
594 if result is None:
595 return []
596 return result.albums or []
597 except (BadRequestError, NetworkError, ProviderUnavailableError) as err:
598 LOGGER.error("Error fetching artist albums %s: %s", artist_id, err)
599 return []
600
601 async def get_artist_tracks(
602 self, artist_id: str, limit: int = DEFAULT_LIMIT
603 ) -> list[YandexTrack]:
604 """Get artist's top tracks.
605
606 :param artist_id: Artist ID.
607 :param limit: Maximum number of tracks.
608 :return: List of track objects.
609 """
610 try:
611 result = await self._call_with_retry(
612 lambda c: c.artists_tracks(artist_id, page=0, page_size=limit)
613 )
614 if result is None:
615 return []
616 return result.tracks or []
617 except (BadRequestError, NetworkError, ProviderUnavailableError) as err:
618 LOGGER.error("Error fetching artist tracks %s: %s", artist_id, err)
619 return []
620
621 async def get_playlist(self, user_id: str, playlist_id: str) -> YandexPlaylist | None:
622 """Get a playlist by ID.
623
624 :param user_id: User ID (owner of the playlist).
625 :param playlist_id: Playlist ID (kind).
626 :return: Playlist object or None if not found.
627 :raises ResourceTemporarilyUnavailable: On network errors.
628 """
629 try:
630 result = await self._call_with_retry(
631 lambda c: c.users_playlists(kind=int(playlist_id), user_id=user_id)
632 )
633 if isinstance(result, list):
634 return result[0] if result else None
635 return result
636 except BadRequestError as err:
637 LOGGER.error("Error fetching playlist %s/%s: %s", user_id, playlist_id, err)
638 return None
639 except (NetworkError, ProviderUnavailableError) as err:
640 LOGGER.warning("Network error fetching playlist %s/%s: %s", user_id, playlist_id, err)
641 raise ResourceTemporarilyUnavailable("Failed to fetch playlist") from err
642
643 # Streaming
644
645 async def get_track_download_info(
646 self, track_id: str, get_direct_links: bool = True
647 ) -> list[DownloadInfo]:
648 """Get download info for a track.
649
650 :param track_id: Track ID.
651 :param get_direct_links: Whether to get direct download links.
652 :return: List of download info objects.
653 """
654 try:
655 result = await self._call_with_retry(
656 lambda c: c.tracks_download_info(track_id, get_direct_links=get_direct_links)
657 )
658 return result or []
659 except (BadRequestError, NetworkError, ProviderUnavailableError) as err:
660 LOGGER.error("Error fetching download info for track %s: %s", track_id, err)
661 return []
662
663 async def get_track_file_info_lossless(self, track_id: str) -> dict[str, Any] | None:
664 """Request lossless stream via get-file-info (quality=lossless).
665
666 The /tracks/{id}/download-info endpoint often returns only MP3; get-file-info
667 with quality=lossless and codecs=flac,... returns FLAC when available.
668
669 Uses manual sign calculation matching yandex-music-downloader-realflac.
670 Uses _call_with_retry for automatic reconnection on transient failures.
671
672 :param track_id: Track ID.
673 :return: Parsed downloadInfo dict (url, codec, urls, ...) or None on error.
674 """
675
676 def _build_signed_params(client: ClientAsync) -> tuple[str, dict[str, Any]]:
677 """Build URL and signed params using current client and timestamp.
678
679 Called on each attempt by _call_with_retry, so the HMAC signature
680 is recomputed with a fresh timestamp on every retry.
681 """
682 timestamp = int(time.time())
683 params = {
684 "ts": timestamp,
685 "trackId": track_id,
686 "quality": "lossless",
687 "codecs": GET_FILE_INFO_CODECS,
688 "transports": "encraw",
689 }
690 # Build sign string explicitly matching Yandex API specification:
691 # concatenate ts + trackId + quality + codecs (commas stripped) + transports.
692 # Comma stripping matches yandex-music-downloader-realflac reference implementation
693 # (see get_file_info signing in that project).
694 codecs_for_sign = GET_FILE_INFO_CODECS.replace(",", "")
695 param_string = f"{timestamp}{track_id}lossless{codecs_for_sign}encraw"
696 hmac_sign = hmac.new(
697 DEFAULT_SIGN_KEY.encode(),
698 param_string.encode(),
699 hashlib.sha256,
700 )
701 # SHA-256 (32 bytes) -> base64 = 44 chars with "=" padding.
702 # Yandex API expects exactly 43 chars (one "=" removed).
703 # Matches yandex-music-downloader-realflac reference implementation.
704 params["sign"] = base64.b64encode(hmac_sign.digest()).decode()[:-1]
705 url = f"{client.base_url}/get-file-info"
706 return url, params
707
708 def _parse_file_info_result(raw: dict[str, Any] | None) -> dict[str, Any] | None:
709 if not raw or not isinstance(raw, dict):
710 return None
711 download_info = raw.get("download_info")
712 if not download_info or not download_info.get("url"):
713 return None
714
715 result = cast("dict[str, Any]", download_info)
716
717 if "key" in download_info:
718 result["needs_decryption"] = True
719 LOGGER.debug(
720 "Encrypted URL received for track %s, will require decryption",
721 track_id,
722 )
723 else:
724 result["needs_decryption"] = False
725
726 return result
727
728 async def _do_request(c: ClientAsync) -> dict[str, Any] | None:
729 url, params = _build_signed_params(c)
730 return await c._request.get(url, params=params) # type: ignore[no-any-return]
731
732 try:
733 result = await self._call_with_retry(_do_request)
734 parsed = _parse_file_info_result(result)
735 if parsed:
736 LOGGER.debug(
737 "get-file-info lossless for track %s: Success, codec=%s",
738 track_id,
739 parsed.get("codec"),
740 )
741 return parsed
742 except (BadRequestError, NetworkError) as err:
743 LOGGER.debug(
744 "get-file-info lossless for track %s: %s %s",
745 track_id,
746 type(err).__name__,
747 getattr(err, "message", str(err)) or repr(err),
748 )
749 except UnauthorizedError as err:
750 LOGGER.debug(
751 "get-file-info lossless for track %s: UnauthorizedError %s",
752 track_id,
753 getattr(err, "message", str(err)) or repr(err),
754 )
755 except Exception as err:
756 LOGGER.warning(
757 "get-file-info lossless for track %s: Unexpected error: %s",
758 track_id,
759 err,
760 exc_info=True,
761 )
762
763 return None
764
765 # Discovery / recommendations
766
767 async def get_feed(self) -> Feed | None:
768 """Get personalized feed with generated playlists (Playlist of the Day, etc.).
769
770 :return: Feed object with generated_playlists, or None on error.
771 """
772 try:
773 return await self._call_with_retry(lambda c: c.feed())
774 except (BadRequestError, NetworkError, ProviderUnavailableError) as err:
775 LOGGER.debug("Error fetching feed: %s", err)
776 return None
777
778 async def get_chart(self, chart_option: str = "") -> ChartInfo | None:
779 """Get chart data.
780
781 :param chart_option: Optional chart variant (e.g. 'world', 'russia').
782 :return: ChartInfo object or None on error.
783 """
784 try:
785 return await self._call_with_retry(lambda c: c.chart(chart_option))
786 except (BadRequestError, NetworkError, ProviderUnavailableError) as err:
787 LOGGER.debug("Error fetching chart: %s", err)
788 return None
789
790 async def get_new_releases(self) -> LandingList | None:
791 """Get new album releases.
792
793 :return: LandingList with new_releases (list of album IDs) or None on error.
794 """
795 try:
796 return await self._call_with_retry(lambda c: c.new_releases())
797 except (BadRequestError, NetworkError, ProviderUnavailableError) as err:
798 LOGGER.debug("Error fetching new releases: %s", err)
799 return None
800
801 async def get_new_playlists(self) -> LandingList | None:
802 """Get new editorial playlists.
803
804 :return: LandingList with new_playlists (list of PlaylistId) or None on error.
805 """
806 try:
807 return await self._call_with_retry(lambda c: c.new_playlists())
808 except (BadRequestError, NetworkError, ProviderUnavailableError) as err:
809 LOGGER.debug("Error fetching new playlists: %s", err)
810 return None
811
812 async def get_albums(self, album_ids: list[str]) -> list[YandexAlbum]:
813 """Get multiple albums by IDs.
814
815 :param album_ids: List of album IDs.
816 :return: List of album objects.
817 """
818 try:
819 result = await self._call_with_retry(lambda c: c.albums(album_ids))
820 return result or []
821 except (BadRequestError, NetworkError, ProviderUnavailableError) as err:
822 LOGGER.debug("Error fetching albums: %s", err)
823 return []
824
825 async def get_playlists(self, playlist_ids: list[str]) -> list[YandexPlaylist]:
826 """Get multiple playlists by IDs (format: 'uid:kind').
827
828 :param playlist_ids: List of playlist IDs in 'uid:kind' format.
829 :return: List of playlist objects.
830 """
831 try:
832 result = await self._call_with_retry(lambda c: c.playlists_list(playlist_ids))
833 return result or []
834 except (BadRequestError, NetworkError, ProviderUnavailableError) as err:
835 LOGGER.debug("Error fetching playlists: %s", err)
836 return []
837
async def get_tag_playlists(self, tag_id: str) -> list[YandexPlaylist]:
    """Resolve a tag (mood, era, activity, genre, etc.) to its full playlists.

    Tags back curated collections such as 'chill', '80s', 'workout' or 'rock'.
    The tag lookup only yields playlist IDs, so a second request loads the
    complete playlist objects.

    :param tag_id: Tag identifier (e.g. 'chill', '80s', 'workout', 'rock').
    :return: List of playlist objects with full details; empty when the tag has
        no playlists or a handled error occurs.
    """
    try:
        tag_info = await self._call_with_retry(lambda client: client.tags(tag_id))
        if not (tag_info and tag_info.ids):
            LOGGER.debug("No playlists found for tag: %s", tag_id)
            return []

        # get_playlists() takes 'uid:kind' strings rather than PlaylistId objects.
        wanted = [f"{entry.uid}:{entry.kind}" for entry in tag_info.ids]
        playlists = await self.get_playlists(wanted)
    except BadRequestError as exc:
        LOGGER.debug("Tag %s not found: %s", tag_id, exc)
        return []
    except (NetworkError, ProviderUnavailableError) as exc:
        LOGGER.debug("Error fetching tag %s playlists: %s", tag_id, exc)
        return []
    return playlists
864
async def get_landing_tags(self) -> list[tuple[str, str]]:
    """Discover available tag slugs from the landing mixes block.

    Uses the landing("mixes") API which returns MixLink entities
    containing tag URLs (e.g., /tag/chill/) and display titles.
    Editorial post entries (/post/ URLs) are filtered out, as are entries
    with a missing URL or a bare "/tag/" URL that carries no slug.

    :return: List of (tag_slug, title) tuples for real tag entries only;
        empty when the landing has no blocks or a handled error occurs.
    """
    try:
        landing: Landing | None = await self._call_with_retry(lambda c: c.landing("mixes"))
    except (BadRequestError, NetworkError, ProviderUnavailableError) as err:
        LOGGER.debug("Error fetching landing tags: %s", err)
        return []
    if not landing or not landing.blocks:
        return []

    tag_prefix = "/tag/"
    tags: list[tuple[str, str]] = []
    for block in landing.blocks:
        for entity in block.entities or []:
            if entity.type != "mix-link" or not isinstance(entity.data, MixLink):
                continue
            url = entity.data.url  # e.g., "/tag/chill/" or "/post/..."
            # Only /tag/ URLs are real tags; a missing URL must not crash the scan.
            if not url or not url.startswith(tag_prefix):
                continue
            # Drop the prefix first so a bare "/tag/" cannot produce the slug "tag".
            slug = url.removeprefix(tag_prefix).strip("/").split("/")[-1]
            if slug:
                tags.append((slug, entity.data.title))
    return tags
896
async def get_mixes_waves(self) -> list[dict[str, Any]] | None:
    """Get AI Wave Set stations from the /landing-blocks/mixes-waves endpoint.

    The result is structured mix data: categories holding station items, each
    with station_id, title, seeds, and visual metadata.

    :return: List of mix category dicts, or None on error.
    """
    mix_categories = await self._get_landing_waves("mixes-waves")
    return mix_categories
906
async def get_waves_landing(self) -> list[dict[str, Any]] | None:
    """Get featured wave stations from the /landing-blocks/waves endpoint.

    Returns Yandex-curated wave categories with station items — the "Waves"
    ("Волны") landing page content, separate from the full rotor/stations/list
    and from the AI mixes-waves sets.

    :return: List of wave category dicts, or None on error.
    """
    wave_categories = await self._get_landing_waves("waves")
    return wave_categories
917
918 async def _get_landing_waves(self, block: str) -> list[dict[str, Any]] | None:
919 """Fetch wave categories from a /landing-blocks/<block> endpoint.
920
921 Note: Response keys are auto-converted from camelCase to snake_case
922 by the yandex-music library's JSON parser.
923
924 :param block: Block name, e.g. 'waves' or 'mixes-waves'.
925 :return: List of wave category dicts, or None on error.
926 """
927
928 async def _get(c: ClientAsync) -> dict[str, Any]:
929 url = f"{c.base_url}/landing-blocks/{block}"
930 return await c._request.get(url) # type: ignore[no-any-return]
931
932 try:
933 result = await self._call_with_retry(_get)
934 if result and isinstance(result, dict):
935 waves = result.get("waves", [])
936 LOGGER.debug(
937 "landing-blocks/%s returned %d categories",
938 block,
939 len(waves) if isinstance(waves, list) else -1,
940 )
941 return waves if isinstance(waves, list) else []
942 return None
943 except (BadRequestError, NetworkError, ProviderUnavailableError) as err:
944 LOGGER.debug("Error fetching landing-blocks/%s: %s", block, err)
945 return None
946
async def get_wave_stations(
    self, language: str | None = None
) -> list[tuple[str, str, str, str | None]]:
    """List the available rotor wave stations together with their category.

    Calls rotor_stations_list() — equivalent to the rotor/stations/list API endpoint.
    Personal stations (type 'user') and 'local-language' stations are left out.

    :param language: Language for station names (e.g. 'ru', 'en'). Defaults to API default.
    :return: List of (station_id, category, name, image_url) tuples,
        e.g. ('genre:rock', 'genre', 'Rock', 'https://...'); empty on a handled error.
    """
    try:
        station_results: list[StationResult] = await self._call_with_retry(
            lambda client: client.rotor_stations_list(language)
        )
    except (BadRequestError, NetworkError, ProviderUnavailableError) as exc:
        LOGGER.warning("Error fetching wave stations: %s", exc)
        return []

    # Personal stations are skipped because My Wave is handled separately;
    # local-language stations because Yandex returns overlapping tracks across them.
    excluded_categories = ("user", "local-language")

    collected: list[tuple[str, str, str, str | None]] = []
    for entry in station_results or []:
        station = entry.station
        if station is None or station.id is None:
            continue
        category, tag = station.id.type, station.id.tag
        if not (category and tag) or category in excluded_categories:
            continue

        display_name = station.name or entry.rup_title or tag

        # Prefer the full image; fall back to the icon image when there is one.
        cover = station.full_image_url
        if not cover and station.icon:
            cover = station.icon.image_url

        image_url: str | None = None
        if cover:
            absolute = cover if cover.startswith("http") else f"https://{cover}"
            # Yandex avatar URIs use '%%' as a size placeholder. Without one the
            # size is appended as a suffix, since these URLs return HTTP 400
            # when no size component is present.
            if "%%" in absolute:
                image_url = absolute.replace("%%", "400x400")
            else:
                image_url = f"{absolute}/400x400"

        collected.append((f"{category}:{tag}", category, display_name, image_url))
    return collected
996
async def get_dashboard_stations(self) -> list[tuple[str, str, str | None]]:
    """List the personalized recommended stations for the current user.

    Calls rotor_stations_dashboard() — returns user-specific stations based
    on listening history, unlike rotor_stations_list() which is non-personalized.
    Stations of type 'user' are left out.

    :return: List of (station_id, name, image_url) tuples,
        e.g. ('genre:rock', 'Rock', 'https://...'); empty when the dashboard
        has no stations or a handled error occurs.
    """
    try:
        dashboard: Dashboard | None = await self._call_with_retry(
            lambda client: client.rotor_stations_dashboard()
        )
    except (BadRequestError, NetworkError, ProviderUnavailableError) as exc:
        LOGGER.warning("Error fetching dashboard stations: %s", exc)
        return []

    if not (dashboard and dashboard.stations):
        return []

    collected: list[tuple[str, str, str | None]] = []
    for entry in dashboard.stations:
        station = entry.station
        if station is None or station.id is None:
            continue
        category, tag = station.id.type, station.id.tag
        if not (category and tag) or category == "user":
            continue

        display_name = station.name or entry.rup_title or tag

        # Prefer the full image; fall back to the icon image when there is one.
        cover = station.full_image_url
        if not cover and station.icon:
            cover = station.icon.image_url

        image_url: str | None = None
        if cover:
            absolute = cover if cover.startswith("http") else f"https://{cover}"
            # '%%' is substituted with the size; without it the size is appended.
            if "%%" in absolute:
                image_url = absolute.replace("%%", "400x400")
            else:
                image_url = f"{absolute}/400x400"

        collected.append((f"{category}:{tag}", display_name, image_url))
    return collected
1041
1042 # Library modifications
1043
async def like_track(self, track_id: str) -> bool:
    """Add a track to liked tracks.

    :param track_id: Track ID to like.
    :return: True if the API call reported success; False if it reported
        failure or the request failed with a handled error.
    """
    try:
        result = await self._call_with_retry(lambda c: c.users_likes_tracks_add(track_id))
    except (BadRequestError, NetworkError, ProviderUnavailableError) as err:
        LOGGER.error("Error liking track %s: %s", track_id, err)
        return False
    # The yandex-music like actions are documented to return a bool, so an
    # "is not None" check would report an explicit False as success.
    return bool(result)
1056
async def unlike_track(self, track_id: str) -> bool:
    """Remove a track from liked tracks.

    :param track_id: Track ID to unlike.
    :return: True if the API call reported success; False if it reported
        failure or the request failed with a handled error.
    """
    try:
        result = await self._call_with_retry(lambda c: c.users_likes_tracks_remove(track_id))
    except (BadRequestError, NetworkError, ProviderUnavailableError) as err:
        LOGGER.error("Error unliking track %s: %s", track_id, err)
        return False
    # The yandex-music like actions are documented to return a bool, so an
    # "is not None" check would report an explicit False as success.
    return bool(result)
1069
async def like_album(self, album_id: str) -> bool:
    """Add an album to liked albums.

    :param album_id: Album ID to like.
    :return: True if the API call reported success; False if it reported
        failure or the request failed with a handled error.
    """
    try:
        result = await self._call_with_retry(lambda c: c.users_likes_albums_add(album_id))
    except (BadRequestError, NetworkError, ProviderUnavailableError) as err:
        LOGGER.error("Error liking album %s: %s", album_id, err)
        return False
    # The yandex-music like actions are documented to return a bool, so an
    # "is not None" check would report an explicit False as success.
    return bool(result)
1082
async def unlike_album(self, album_id: str) -> bool:
    """Remove an album from liked albums.

    :param album_id: Album ID to unlike.
    :return: True if the API call reported success; False if it reported
        failure or the request failed with a handled error.
    """
    try:
        result = await self._call_with_retry(lambda c: c.users_likes_albums_remove(album_id))
    except (BadRequestError, NetworkError, ProviderUnavailableError) as err:
        LOGGER.error("Error unliking album %s: %s", album_id, err)
        return False
    # The yandex-music like actions are documented to return a bool, so an
    # "is not None" check would report an explicit False as success.
    return bool(result)
1095
async def like_artist(self, artist_id: str) -> bool:
    """Add an artist to liked artists.

    :param artist_id: Artist ID to like.
    :return: True if the API call reported success; False if it reported
        failure or the request failed with a handled error.
    """
    try:
        result = await self._call_with_retry(lambda c: c.users_likes_artists_add(artist_id))
    except (BadRequestError, NetworkError, ProviderUnavailableError) as err:
        LOGGER.error("Error liking artist %s: %s", artist_id, err)
        return False
    # The yandex-music like actions are documented to return a bool, so an
    # "is not None" check would report an explicit False as success.
    return bool(result)
1108
async def unlike_artist(self, artist_id: str) -> bool:
    """Remove an artist from liked artists.

    :param artist_id: Artist ID to unlike.
    :return: True if the API call reported success; False if it reported
        failure or the request failed with a handled error.
    """
    try:
        result = await self._call_with_retry(lambda c: c.users_likes_artists_remove(artist_id))
    except (BadRequestError, NetworkError, ProviderUnavailableError) as err:
        LOGGER.error("Error unliking artist %s: %s", artist_id, err)
        return False
    # The yandex-music like actions are documented to return a bool, so an
    # "is not None" check would report an explicit False as success.
    return bool(result)
1121