/
/
/
1"""Zvuk Music provider implementation."""
2
3from __future__ import annotations
4
5from typing import TYPE_CHECKING
6
7from music_assistant_models.enums import ContentType, MediaType, StreamType
8from music_assistant_models.errors import (
9 InvalidDataError,
10 LoginFailed,
11 MediaNotFoundError,
12 ProviderUnavailableError,
13)
14from music_assistant_models.media_items import (
15 Album,
16 Artist,
17 AudioFormat,
18 ItemMapping,
19 MediaItemType,
20 Playlist,
21 SearchResults,
22 Track,
23)
24from music_assistant_models.streamdetails import StreamDetails
25from zvuk_music.enums import Quality
26from zvuk_music.exceptions import QualityNotAvailableError, SubscriptionRequiredError
27
28from music_assistant.controllers.cache import use_cache
29from music_assistant.models.music_provider import MusicProvider
30
31from .api_client import ZvukMusicClient
32from .constants import (
33 CONF_QUALITY,
34 CONF_TOKEN,
35 DEFAULT_LIMIT,
36 PLAYLIST_TRACKS_PAGE_SIZE,
37 QUALITY_LOSSLESS,
38)
39from .parsers import parse_album, parse_artist, parse_playlist, parse_track
40
41if TYPE_CHECKING:
42 from collections.abc import AsyncGenerator
43
44
class ZvukMusicProvider(MusicProvider):
    """Implementation of a Zvuk Music MusicProvider.

    Wraps a ZvukMusicClient and converts its results into Music Assistant
    media items through the parse_* helpers. Items that fail to parse with
    InvalidDataError are logged at debug level and skipped.
    """

    # API client; set by handle_async_init and cleared again by unload.
    _client: ZvukMusicClient | None = None

    @property
    def client(self) -> ZvukMusicClient:
        """Return the Zvuk Music client.

        :return: The client created during initialization.
        :raises ProviderUnavailableError: If no client has been created yet.
        """
        if self._client is None:
            raise ProviderUnavailableError("Provider not initialized")
        return self._client

    async def handle_async_init(self) -> None:
        """Handle async initialization of the provider.

        :raises LoginFailed: If no token is configured.
        """
        token = self.config.get_value(CONF_TOKEN)
        if not token:
            raise LoginFailed("No Zvuk Music token provided")

        self._client = ZvukMusicClient(str(token))
        await self._client.connect()
        self.logger.info("Successfully connected to Zvuk Music")

    async def unload(self, is_removed: bool = False) -> None:
        """Handle unload/close of the provider.

        :param is_removed: Whether the provider is being removed.
        """
        # Detach the client before disconnecting so the provider never keeps a
        # reference to it when disconnect() fails, and always run the base
        # class unload (any disconnect error still propagates afterwards).
        client, self._client = self._client, None
        try:
            if client:
                await client.disconnect()
        finally:
            await super().unload(is_removed)

    def get_item_mapping(self, media_type: MediaType | str, key: str, name: str) -> ItemMapping:
        """Create a generic item mapping.

        :param media_type: The media type (enum member or its string value).
        :param key: The item ID.
        :param name: The item name.
        :return: An ItemMapping instance bound to this provider instance.
        """
        if isinstance(media_type, str):
            media_type = MediaType(media_type)
        return ItemMapping(
            media_type=media_type,
            item_id=key,
            provider=self.instance_id,
            name=name,
        )

    # Search

    @use_cache(3600 * 24 * 14)
    async def search(
        self, search_query: str, media_types: list[MediaType], limit: int = 5
    ) -> SearchResults:
        """Perform search on Zvuk Music.

        :param search_query: The search query.
        :param media_types: List of media types to search for.
        :param limit: Maximum number of results per type.
        :return: SearchResults with found items.
        """
        result = SearchResults()

        search_result = await self.client.search(
            search_query,
            limit=limit,
            search_tracks=MediaType.TRACK in media_types,
            search_artists=MediaType.ARTIST in media_types,
            search_releases=MediaType.ALBUM in media_types,
            search_playlists=MediaType.PLAYLIST in media_types,
        )
        if not search_result:
            return result

        # Each section collects into a local list and assigns it once, rather
        # than rebuilding the result list for every parsed item.

        # Parse tracks
        if MediaType.TRACK in media_types and search_result.tracks:
            tracks: list[Track] = []
            for track in search_result.tracks.items[:limit]:
                try:
                    tracks.append(parse_track(self, track))
                except InvalidDataError as err:
                    self.logger.debug("Error parsing track: %s", err)
            result.tracks = tracks

        # Parse albums (Zvuk releases)
        if MediaType.ALBUM in media_types and search_result.releases:
            albums: list[Album] = []
            for release in search_result.releases.items[:limit]:
                try:
                    albums.append(parse_album(self, release))
                except InvalidDataError as err:
                    self.logger.debug("Error parsing album: %s", err)
            result.albums = albums

        # Parse artists
        if MediaType.ARTIST in media_types and search_result.artists:
            artists: list[Artist] = []
            for artist in search_result.artists.items[:limit]:
                try:
                    artists.append(parse_artist(self, artist))
                except InvalidDataError as err:
                    self.logger.debug("Error parsing artist: %s", err)
            result.artists = artists

        # Parse playlists
        if MediaType.PLAYLIST in media_types and search_result.playlists:
            playlists: list[Playlist] = []
            for playlist in search_result.playlists.items[:limit]:
                try:
                    playlists.append(parse_playlist(self, playlist))
                except InvalidDataError as err:
                    self.logger.debug("Error parsing playlist: %s", err)
            result.playlists = playlists

        return result

    # Get single items

    @use_cache(3600 * 24 * 30)
    async def get_artist(self, prov_artist_id: str) -> Artist:
        """Get artist details by ID.

        :param prov_artist_id: The provider artist ID.
        :return: Artist object.
        :raises MediaNotFoundError: If artist not found.
        """
        artist = await self.client.get_artist(prov_artist_id)
        if not artist:
            raise MediaNotFoundError(f"Artist {prov_artist_id} not found")
        return parse_artist(self, artist)

    @use_cache(3600 * 24 * 30)
    async def get_album(self, prov_album_id: str) -> Album:
        """Get album details by ID.

        :param prov_album_id: The provider album ID.
        :return: Album object.
        :raises MediaNotFoundError: If album not found.
        """
        release = await self.client.get_release(prov_album_id)
        if not release:
            raise MediaNotFoundError(f"Album {prov_album_id} not found")
        return parse_album(self, release)

    @use_cache(3600 * 24 * 30)
    async def get_track(self, prov_track_id: str) -> Track:
        """Get track details by ID.

        :param prov_track_id: The provider track ID.
        :return: Track object.
        :raises MediaNotFoundError: If track not found.
        """
        track = await self.client.get_track(prov_track_id)
        if not track:
            raise MediaNotFoundError(f"Track {prov_track_id} not found")
        return parse_track(self, track)

    @use_cache(3600 * 24 * 30)
    async def get_playlist(self, prov_playlist_id: str) -> Playlist:
        """Get playlist details by ID.

        :param prov_playlist_id: The provider playlist ID.
        :return: Playlist object.
        :raises MediaNotFoundError: If playlist not found.
        """
        playlist = await self.client.get_playlist(prov_playlist_id)
        if not playlist:
            raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found")
        return parse_playlist(self, playlist)

    # Get related items

    @use_cache(3600 * 24 * 30)
    async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
        """Get album tracks.

        Every track is assigned disc number 1 and a track number equal to its
        1-based position in the release's track list.

        :param prov_album_id: The provider album ID.
        :return: List of Track objects (empty if the release or its tracks are missing).
        """
        release = await self.client.get_release(prov_album_id)
        if not release or not release.tracks:
            return []

        tracks = []
        for number, track in enumerate(release.tracks, start=1):
            try:
                parsed_track = parse_track(self, track)
                parsed_track.disc_number = 1
                parsed_track.track_number = number
                tracks.append(parsed_track)
            except InvalidDataError as err:
                self.logger.debug("Error parsing album track: %s", err)
        return tracks

    @use_cache(3600 * 3)
    async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
        """Get playlist tracks.

        :param prov_playlist_id: The provider playlist ID.
        :param page: Page number for pagination (page size is PLAYLIST_TRACKS_PAGE_SIZE).
        :return: List of Track objects for the requested page.
        """
        offset = page * PLAYLIST_TRACKS_PAGE_SIZE
        simple_tracks = await self.client.get_playlist_tracks(
            prov_playlist_id, limit=PLAYLIST_TRACKS_PAGE_SIZE, offset=offset
        )
        if not simple_tracks:
            return []

        # Fetch full track details from SimpleTrack IDs
        track_ids = [str(t.id) for t in simple_tracks if t.id]
        if not track_ids:
            return []

        full_tracks = await self.client.get_tracks(track_ids)
        tracks = []
        for track in full_tracks:
            try:
                tracks.append(parse_track(self, track))
            except InvalidDataError as err:
                self.logger.debug("Error parsing playlist track: %s", err)
        return tracks

    @use_cache(3600 * 24 * 7)
    async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
        """Get artist's albums.

        :param prov_artist_id: The provider artist ID.
        :return: List of Album objects.
        """
        artists = await self.client.get_artist_releases(prov_artist_id, limit=DEFAULT_LIMIT)
        if not artists:
            return []

        result = []
        for artist in artists:
            # A missing/empty release list is treated as "no albums".
            for release in artist.releases or []:
                try:
                    result.append(parse_album(self, release))
                except InvalidDataError as err:
                    self.logger.debug("Error parsing artist album: %s", err)
        return result

    @use_cache(3600 * 24 * 7)
    async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
        """Get artist's top tracks.

        :param prov_artist_id: The provider artist ID.
        :return: List of Track objects.
        """
        artists = await self.client.get_artist_top_tracks(prov_artist_id, limit=DEFAULT_LIMIT)
        if not artists:
            return []

        result = []
        for artist in artists:
            # A missing/empty popular-tracks list is treated as "no tracks".
            for track in artist.popular_tracks or []:
                try:
                    result.append(parse_track(self, track))
                except InvalidDataError as err:
                    self.logger.debug("Error parsing artist track: %s", err)
        return result

    # Library methods

    async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
        """Retrieve library artists from Zvuk Music.

        Artist IDs come from the user's collection; full artist objects are
        then fetched in batches of DEFAULT_LIMIT.
        """
        collection = await self.client.get_collection()
        if not collection or not collection.artists:
            return

        artist_ids = [str(item.id) for item in collection.artists if item.id]
        for start in range(0, len(artist_ids), DEFAULT_LIMIT):
            batch_ids = artist_ids[start : start + DEFAULT_LIMIT]
            artists = await self.client.get_artists(batch_ids)
            for artist in artists:
                try:
                    yield parse_artist(self, artist)
                except InvalidDataError as err:
                    self.logger.debug("Error parsing library artist: %s", err)

    async def get_library_albums(self) -> AsyncGenerator[Album, None]:
        """Retrieve library albums from Zvuk Music.

        Release IDs come from the user's collection; full release objects are
        then fetched in batches of DEFAULT_LIMIT.
        """
        collection = await self.client.get_collection()
        if not collection or not collection.releases:
            return

        release_ids = [str(item.id) for item in collection.releases if item.id]
        for start in range(0, len(release_ids), DEFAULT_LIMIT):
            batch_ids = release_ids[start : start + DEFAULT_LIMIT]
            releases = await self.client.get_releases(batch_ids)
            for release in releases:
                try:
                    yield parse_album(self, release)
                except InvalidDataError as err:
                    self.logger.debug("Error parsing library album: %s", err)

    async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
        """Retrieve library tracks (liked tracks) from Zvuk Music."""
        tracks = await self.client.get_liked_tracks()
        # Same empty-result guard the other library generators use.
        if not tracks:
            return

        for track in tracks:
            try:
                yield parse_track(self, track)
            except InvalidDataError as err:
                self.logger.debug("Error parsing library track: %s", err)

    async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
        """Retrieve library playlists from Zvuk Music.

        Playlist IDs come from the user's playlists; full playlist objects are
        then fetched in batches of DEFAULT_LIMIT.
        """
        collection_items = await self.client.get_user_playlists()
        if not collection_items:
            return

        playlist_ids = [str(item.id) for item in collection_items if item.id]
        for start in range(0, len(playlist_ids), DEFAULT_LIMIT):
            batch_ids = playlist_ids[start : start + DEFAULT_LIMIT]
            playlists = await self.client.get_playlists(batch_ids)
            for playlist in playlists:
                try:
                    yield parse_playlist(self, playlist)
                except InvalidDataError as err:
                    self.logger.debug("Error parsing library playlist: %s", err)

    # Library edit methods

    async def library_add(self, item: MediaItemType) -> bool:
        """Add item to library.

        :param item: The media item to add.
        :return: Result of the client's like call; False if the item has no ID
            for this provider or its media type is not handled.
        """
        prov_item_id = self._get_provider_item_id(item)
        if not prov_item_id:
            return False

        if item.media_type == MediaType.TRACK:
            return await self.client.like_track(prov_item_id)
        if item.media_type == MediaType.ALBUM:
            return await self.client.like_release(prov_item_id)
        if item.media_type == MediaType.ARTIST:
            return await self.client.like_artist(prov_item_id)
        if item.media_type == MediaType.PLAYLIST:
            return await self.client.like_playlist(prov_item_id)
        return False

    async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool:
        """Remove item from library.

        :param prov_item_id: The provider item ID.
        :param media_type: The media type.
        :return: Result of the client's unlike call; False if the media type
            is not handled.
        """
        if media_type == MediaType.TRACK:
            return await self.client.unlike_track(prov_item_id)
        if media_type == MediaType.ALBUM:
            return await self.client.unlike_release(prov_item_id)
        if media_type == MediaType.ARTIST:
            return await self.client.unlike_artist(prov_item_id)
        if media_type == MediaType.PLAYLIST:
            return await self.client.unlike_playlist(prov_item_id)
        return False

    def _get_provider_item_id(self, item: MediaItemType) -> str | None:
        """Get provider item ID from media item.

        Prefers the provider mapping that belongs to this instance and falls
        back to the item's own ID when the item itself comes from this instance.

        :param item: The media item to inspect.
        :return: The item ID for this provider instance, or None if there is none.
        """
        for mapping in item.provider_mappings:
            if mapping.provider_instance == self.instance_id:
                return mapping.item_id
        return item.item_id if item.provider == self.instance_id else None

    # Playlist management

    async def create_playlist(self, name: str) -> Playlist:
        """Create a new playlist.

        :param name: Playlist name.
        :return: The created Playlist object.
        :raises MediaNotFoundError: If the new playlist cannot be fetched back.
        """
        playlist_id = await self.client.create_playlist(name)
        playlist = await self.client.get_playlist(playlist_id)
        if not playlist:
            raise MediaNotFoundError(f"Created playlist {playlist_id} not found")
        return parse_playlist(self, playlist)

    async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
        """Add tracks to a playlist.

        :param prov_playlist_id: The provider playlist ID.
        :param prov_track_ids: List of track IDs to add.
        """
        await self.client.add_tracks_to_playlist(prov_playlist_id, prov_track_ids)

    async def remove_playlist_tracks(
        self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
    ) -> None:
        """Remove tracks from a playlist by position.

        The playlist is rewritten with the IDs of all tracks whose index is
        not listed in positions_to_remove.

        :param prov_playlist_id: The provider playlist ID.
        :param positions_to_remove: Tuple of track positions (0-based) to remove.
        """
        # Fetch current tracks and filter out the ones at given positions.
        # NOTE(review): only the first 10000 tracks are fetched, so a longer
        # playlist would presumably be truncated by the update - confirm.
        simple_tracks = await self.client.get_playlist_tracks(prov_playlist_id, limit=10000)
        remove_positions = set(positions_to_remove)
        remaining_ids = [
            str(t.id) for i, t in enumerate(simple_tracks) if t.id and i not in remove_positions
        ]
        await self.client.update_playlist(prov_playlist_id, remaining_ids)

    # Streaming

    @staticmethod
    def _audio_params(quality: Quality | None) -> tuple[ContentType, int]:
        """Map a Zvuk quality tier to the content type and bit rate to report.

        :param quality: The quality tier the stream URL was obtained for.
        :return: Tuple of (content type, bit rate); anything other than FLAC or
            HIGH is reported as MP3 at 128.
        """
        if quality == Quality.FLAC:
            return ContentType.FLAC, 0
        if quality == Quality.HIGH:
            return ContentType.MP3, 320
        return ContentType.MP3, 128

    def _pick_stream_url(self, stream: object) -> tuple[Quality | None, str | None]:
        """Select a stream URL according to the configured quality preference.

        :param stream: Stream entry returned by the client's get_stream_urls.
        :return: Tuple of (quality the URL belongs to, URL); the URL may be
            empty when nothing usable is available.
        """
        quality_pref = self.config.get_value(CONF_QUALITY)
        quality_str = str(quality_pref) if quality_pref is not None else QUALITY_LOSSLESS

        # Fallback chain: lossless tries FLAC -> HIGH -> MID, otherwise HIGH -> MID.
        if quality_str == QUALITY_LOSSLESS:
            candidates: tuple[Quality, ...] = (Quality.FLAC, Quality.HIGH, Quality.MID)
        else:
            candidates = (Quality.HIGH, Quality.MID)

        selected: Quality | None = None
        url: str | None = None
        for quality in candidates:
            try:
                url = stream.get_url(quality)  # type: ignore[attr-defined]
            except (SubscriptionRequiredError, QualityNotAvailableError):
                continue
            # The first tier that does not raise wins, as before.
            selected = quality
            break

        # Ultimate fallback
        if not url:
            selected, url = stream.get_best_available()  # type: ignore[attr-defined]
        return selected, url

    async def get_stream_details(
        self, item_id: str, media_type: MediaType = MediaType.TRACK
    ) -> StreamDetails:
        """Get stream details for a track.

        :param item_id: The track ID.
        :param media_type: The media type (should be TRACK).
        :return: StreamDetails for the track.
        :raises MediaNotFoundError: If stream URL cannot be obtained.
        """
        streams = await self.client.get_stream_urls(item_id)
        if not streams:
            raise MediaNotFoundError(f"No stream info available for track {item_id}")

        quality, url = self._pick_stream_url(streams[0])
        if not url:
            raise MediaNotFoundError(f"No stream URL available for track {item_id}")
        content_type, bitrate = self._audio_params(quality)

        # zvuk-music Stream model (get_stream_urls) has no duration; only expire and URLs.
        # Fetch track for duration so StreamDetails can expose it (e.g. for progress/seeking).
        track = await self.client.get_track(item_id)
        raw_duration = getattr(track, "duration", None)
        duration: int | None = int(raw_duration) if raw_duration is not None else None

        return StreamDetails(
            item_id=item_id,
            provider=self.instance_id,
            audio_format=AudioFormat(
                content_type=content_type,
                bit_rate=bitrate,
            ),
            stream_type=StreamType.HTTP,
            path=url,
            duration=duration,
            allow_seek=True,
            can_seek=True,
        )
537