music-assistant-server

6.2 KBPY
user.py
6.2 KB168 lines • python
1"""User adapter for nicovideo."""
2
3from __future__ import annotations
4
5from typing import TYPE_CHECKING
6
7from music_assistant.providers.nicovideo.constants import SENSITIVE_CONTENTS
8from music_assistant.providers.nicovideo.services.base import NicovideoBaseService
9
10if TYPE_CHECKING:
11    from typing import Literal
12
13    from music_assistant_models.media_items import Artist, Track
14
15    from music_assistant.providers.nicovideo.services.manager import NicovideoServiceManager
16
17# Import at runtime for isinstance checks
18from niconico.objects.video import EssentialVideo
19
20
21class NicovideoUserService(NicovideoBaseService):
22    """Get user details from nicovideo."""
23
24    def __init__(self, service_manager: NicovideoServiceManager) -> None:
25        """Initialize NicovideoUserService with reference to parent service manager."""
26        super().__init__(service_manager)
27
28    async def get_user(self, user_id: str) -> Artist | None:
29        """Get user details as Artist."""
30        user = await self.service_manager._call_with_throttler(
31            self.niconico_py_client.user.get_user, user_id
32        )
33        return self.converter_manager.artist.convert_by_owner_or_user(user) if user else None
34
35    async def get_recommendations(
36        self,
37        recipe_id: Literal[
38            "video_watch_recommendation", "video_recommendation_recommend", "video_top_recommend"
39        ] = "video_watch_recommendation",
40        limit: int = 25,
41    ) -> list[Track]:
42        """Get recommendations from nicovideo."""
43        recommendations = await self.service_manager._call_with_throttler(
44            self.niconico_py_client.user.get_recommendations,
45            recipe_id,
46            limit=limit,
47            sensitive_contents=SENSITIVE_CONTENTS,
48        )
49        if not recommendations or not recommendations.items:
50            return []
51
52        tracks = []
53        for item in recommendations.items:
54            # Only process video content, skip user recommendations
55            if item.content_type != "video":
56                continue
57
58            # Type check to ensure content is EssentialVideo
59            if isinstance(item.content, EssentialVideo):
60                track = self.converter_manager.track.convert_by_essential_video(item.content)
61                if track:
62                    tracks.append(track)
63        return tracks
64
65    async def get_similar_tracks(self, track_id: str, limit: int = 25) -> list[Track]:
66        """Get tracks similar to the given track."""
67        recommendation_api_item = await self.service_manager._call_with_throttler(
68            self.niconico_py_client.user.get_recommendations,
69            "video_watch_recommendation",
70            video_id=track_id,
71            limit=limit,
72            sensitive_contents=SENSITIVE_CONTENTS,
73        )
74        if not recommendation_api_item or not recommendation_api_item.items:
75            return []
76
77        tracks = []
78        for item in recommendation_api_item.items:
79            # Only process video content
80            if item.content_type != "video":
81                continue
82
83            # Type check to ensure content is EssentialVideo
84            if isinstance(item.content, EssentialVideo):
85                track = self.converter_manager.track.convert_by_essential_video(item.content)
86                if track:
87                    tracks.append(track)
88        return tracks
89
90    async def get_like_history(self, limit: int = 25) -> list[Track]:
91        """Get user's like history from nicovideo."""
92        # Calculate page_size based on limit
93        page_size = min(limit, 25)  # API max is 25 for like history
94        like_history = await self.service_manager._call_with_throttler(
95            self.niconico_py_client.video.get_like_history,
96            page_size=page_size,
97            page=1,
98        )
99        if not like_history or not like_history.items:
100            return []
101
102        tracks = []
103        for item in like_history.items:
104            track = self.converter_manager.track.convert_by_essential_video(item.video)
105            if track:
106                tracks.append(track)
107        return tracks
108
109    async def get_user_history(self, limit: int = 30) -> list[Track]:
110        """Get user's history from nicovideo."""
111        # Calculate page_size based on limit
112        page_size = min(limit, 100)  # API max is 100
113        history = await self.service_manager._call_with_throttler(
114            self.niconico_py_client.video.get_history,
115            page_size=page_size,
116            page=1,
117        )
118        if not history or not history.items:
119            return []
120
121        tracks = []
122        for item in history.items:
123            track = self.converter_manager.track.convert_by_essential_video(item.video)
124            if track:
125                tracks.append(track)
126        return tracks
127
128    async def get_following_activities(self, limit: int = 50) -> list[Track]:
129        """Get latest activities from followed users."""
130        feed_data = await self.service_manager._call_with_throttler(
131            self.niconico_py_client.user.get_following_activities,
132            endpoint="video",
133            context="header_timeline",
134            cursor=None,
135        )
136
137        if not feed_data:
138            return []
139
140        # Convert activities directly to tracks using lightweight conversion
141        tracks = []
142        for activity in feed_data.activities:
143            if activity.content and activity.content.video and "video" in activity.kind.lower():
144                track = self.converter_manager.track.convert_by_activity(activity)
145                if track:
146                    tracks.append(track)
147                if len(tracks) >= limit:
148                    break
149
150        return tracks
151
152    async def get_own_followings(self) -> list[Artist]:
153        """Get users the current user is following and convert them to Artists."""
154        followings_data = await self.service_manager._call_with_throttler(
155            self.niconico_py_client.user.get_own_followings,
156            page_size=25,
157            page=1,
158        )
159
160        if not followings_data or not followings_data.items:
161            return []
162
163        artists = []
164        for user in followings_data.items:
165            artist = self.converter_manager.artist.convert_by_owner_or_user(user)
166            artists.append(artist)
167        return artists
168