/
/
/
1"""
2nicovideo playlist mixin for Music Assistant.
3
4In this section, "Mylist" on niconico is treated as a playlist.
5"""
6
7from __future__ import annotations
8
9from collections.abc import AsyncGenerator
10from typing import override
11
12from music_assistant_models.errors import MediaNotFoundError
13from music_assistant_models.media_items import Playlist, Track # noqa: TC002 - used in @use_cache
14
15from music_assistant.controllers.cache import use_cache
16from music_assistant.providers.nicovideo.provider_mixins.base import (
17 NicovideoMusicProviderMixinBase,
18)
19
20
21class NicovideoMusicProviderPlaylistMixin(NicovideoMusicProviderMixinBase):
22 """Mixin class for handling playlist-related operations in NicovideoMusicProvider."""
23
24 @override
25 @use_cache(3600 * 24 * 14) # Cache for 14 days
26 async def get_playlist(self, prov_playlist_id: str) -> Playlist:
27 """Get full playlist details by id."""
28 playlist_with_tracks = await self.service_manager.mylist.get_mylist_or_own_mylist(
29 prov_playlist_id, page_size=500
30 )
31 if not playlist_with_tracks:
32 raise MediaNotFoundError(f"Playlist with id {prov_playlist_id} not found on nicovideo.")
33 return playlist_with_tracks.playlist
34
35 @override
36 @use_cache(3600 * 3) # Cache for 3 hours
37 async def get_playlist_tracks(
38 self,
39 prov_playlist_id: str,
40 page: int = 0,
41 ) -> list[Track]:
42 """Get all playlist tracks for given playlist id."""
43 playlist_with_tracks = await self.service_manager.mylist.get_mylist_or_own_mylist(
44 prov_playlist_id, page_size=500, page=page + 1
45 )
46
47 return playlist_with_tracks.tracks if playlist_with_tracks else []
48
49 @override
50 async def get_library_playlists(
51 self,
52 ) -> AsyncGenerator[Playlist, None]:
53 """Retrieve library playlists from the provider."""
54 # Get own mylists (editable playlists)
55 own_mylists = await self.service_manager.mylist.get_own_mylists()
56 for mylist in own_mylists:
57 yield mylist
58 # Following mylists are not included in simplified config
59 return
60
61 @override
62 async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
63 """Add track(s) to playlist."""
64 for track_id in prov_track_ids:
65 success = await self.service_manager.mylist.add_mylist_item(prov_playlist_id, track_id)
66 if success:
67 self.logger.debug(
68 "Successfully added track %s to playlist %s",
69 track_id,
70 prov_playlist_id,
71 )
72 else:
73 self.logger.warning(
74 "Failed to add track %s to playlist %s", track_id, prov_playlist_id
75 )
76
77 @override
78 async def remove_playlist_tracks(
79 self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
80 ) -> None:
81 """Remove track(s) from playlist."""
82 # Get current playlist tracks to find track IDs at the specified positions
83 # Note: NicoNico's mylist does not allow duplicate entries of the same video_id
84 # within a single playlist. Therefore, mapping from 1-based positions to
85 # video_id is safe and uniquely identifies the target items.
86 playlist_tracks = await self.get_playlist_tracks(prov_playlist_id)
87
88 # Extract track IDs to remove based on positions
89 # Note: positions_to_remove uses 1-based indexing, so convert to 0-based
90 track_ids_to_remove = []
91 for position in positions_to_remove:
92 index = position - 1 # Convert from 1-based to 0-based indexing
93 if 0 <= index < len(playlist_tracks):
94 track_ids_to_remove.append(playlist_tracks[index].item_id)
95
96 if not track_ids_to_remove:
97 self.logger.warning(
98 "No valid tracks found to remove from playlist %s", prov_playlist_id
99 )
100 return
101
102 success = await self.service_manager.mylist.remove_mylist_items(
103 prov_playlist_id, track_ids_to_remove
104 )
105 if success:
106 self.logger.debug(
107 "Successfully removed %d tracks from playlist %s",
108 len(track_ids_to_remove),
109 prov_playlist_id,
110 )
111 else:
112 self.logger.warning("Failed to remove tracks from playlist %s", prov_playlist_id)
113
114 @override
115 async def create_playlist(self, name: str) -> Playlist:
116 """Create a new playlist on provider with given name."""
117 # Create a new mylist using niconico.py
118 create_result = await self.service_manager.mylist.create_mylist(
119 name, description="Created by Music Assistant", is_public=False
120 )
121
122 if not create_result:
123 raise MediaNotFoundError(f"Failed to create playlist '{name}' on nicovideo.")
124
125 # Get the created mylist details
126 mylist_id = str(create_result.mylist.id_)
127 playlist_with_tracks = await self.service_manager.mylist.get_own_mylist(
128 mylist_id, page_size=1
129 )
130
131 if not playlist_with_tracks:
132 raise MediaNotFoundError(
133 f"Failed to retrieve created playlist '{name}' from nicovideo."
134 )
135
136 self.logger.info("Successfully created playlist '%s' with ID %s", name, mylist_id)
137 return playlist_with_tracks.playlist
138