/
/
/
1"""Video service for nicovideo."""
2
3from __future__ import annotations
4
5from typing import TYPE_CHECKING
6from urllib.parse import urljoin
7
8from music_assistant_models.errors import InvalidDataError, UnplayableMediaError
9
10from music_assistant.providers.nicovideo.constants import (
11 DOMAND_BID_COOKIE_NAME,
12 NICOVIDEO_USER_AGENT,
13 SENSITIVE_CONTENTS,
14)
15from music_assistant.providers.nicovideo.converters.stream import (
16 StreamConversionData,
17)
18from music_assistant.providers.nicovideo.services.base import NicovideoBaseService
19
20if TYPE_CHECKING:
21 from music_assistant_models.media_items import Track
22 from music_assistant_models.streamdetails import StreamDetails
23 from niconico.objects.video.watch import WatchData, WatchMediaDomandAudio
24
25 from music_assistant.providers.nicovideo.services.manager import NicovideoServiceManager
26
27
28class NicovideoVideoService(NicovideoBaseService):
29 """Handles video and stream related operations for nicovideo."""
30
31 def __init__(self, service_manager: NicovideoServiceManager) -> None:
32 """Initialize NicovideoVideoService with reference to parent service manager."""
33 super().__init__(service_manager)
34
35 async def get_user_videos(
36 self, user_id: str, page: int = 1, page_size: int = 50
37 ) -> list[Track]:
38 """Get user videos and convert as Track list."""
39 user_video_data = await self.service_manager._call_with_throttler(
40 self.niconico_py_client.user.get_user_videos,
41 user_id,
42 page=page,
43 page_size=page_size,
44 sensitive_contents=SENSITIVE_CONTENTS,
45 )
46 if not user_video_data or not user_video_data.items:
47 return []
48 tracks = []
49 for item in user_video_data.items:
50 track = self.converter_manager.track.convert_by_essential_video(item.essential)
51 if track:
52 tracks.append(track)
53 return tracks
54
55 async def get_video(self, video_id: str) -> Track | None:
56 """Get video details using WatchData and convert as Track."""
57 watch_data = await self.service_manager._call_with_throttler(
58 self.niconico_py_client.video.watch.get_watch_data, video_id
59 )
60
61 if watch_data:
62 return self.converter_manager.track.convert_by_watch_data(watch_data)
63
64 return None
65
66 async def get_stream_details(self, video_id: str) -> StreamDetails:
67 """Get StreamDetails for a video using WatchData and converter."""
68 conversion_data = await self._prepare_conversion_data(video_id)
69 return self.converter_manager.stream.convert_from_conversion_data(conversion_data)
70
71 async def _prepare_conversion_data(self, video_id: str) -> StreamConversionData:
72 """Prepare StreamConversionData for a video."""
73 # 1. Fetch watch data
74 watch_data = await self.service_manager._call_with_throttler(
75 self.niconico_py_client.video.watch.get_watch_data, video_id
76 )
77 if not watch_data:
78 raise UnplayableMediaError("Failed to fetch watch data")
79
80 # 2. Select best available audio
81 selected_audio = self._select_best_audio(watch_data)
82
83 # 3. Get HLS URL for selected audio
84 hls_url = await self._get_hls_url(watch_data, selected_audio)
85
86 # 4. Get domand_bid for ffmpeg headers
87 domand_bid = self.niconico_py_client.session.cookies.get(DOMAND_BID_COOKIE_NAME)
88 if not domand_bid:
89 raise UnplayableMediaError("Failed to fetch domand_bid")
90
91 # 5. Fetch HLS playlist text
92 playlist_text = await self._fetch_media_playlist_text(hls_url, domand_bid)
93
94 # 6. Return conversion data
95 return StreamConversionData(
96 watch_data=watch_data,
97 selected_audio=selected_audio,
98 hls_url=hls_url,
99 domand_bid=domand_bid,
100 hls_playlist_text=playlist_text,
101 )
102
103 def _select_best_audio(self, watch_data: WatchData) -> WatchMediaDomandAudio:
104 """Select the best available audio from WatchData."""
105 best_audio = None
106 best_quality = -1
107 for audio in watch_data.media.domand.audios:
108 if audio.is_available and audio.quality_level > best_quality:
109 best_audio = audio
110 best_quality = audio.quality_level
111
112 if not best_audio:
113 raise UnplayableMediaError("No available audio found")
114
115 return best_audio
116
117 async def _get_hls_url(
118 self, watch_data: WatchData, selected_audio: WatchMediaDomandAudio
119 ) -> str:
120 """Get HLS URL for selected audio."""
121 # Create outputs list with selected audio ID only (audio-only)
122 outputs = [selected_audio.id_]
123
124 hls_url = await self.service_manager._call_with_throttler(
125 self.niconico_py_client.video.watch.get_hls_content_url,
126 watch_data,
127 [outputs], # list[list[str]] format
128 )
129 if not hls_url:
130 raise UnplayableMediaError("Failed to get HLS content URL")
131
132 return str(hls_url)
133
134 async def _fetch_media_playlist_text(self, hls_url: str, domand_bid: str) -> str:
135 """Fetch media playlist text from HLS stream.
136
137 Args:
138 hls_url: URL to the HLS playlist (master or media)
139 domand_bid: Authentication cookie value
140
141 Returns:
142 Media playlist text (not parsed)
143 """
144 headers = {
145 "User-Agent": NICOVIDEO_USER_AGENT,
146 "Cookie": f"{DOMAND_BID_COOKIE_NAME}={domand_bid}",
147 }
148 session = self.service_manager.provider.mass.http_session
149
150 # Fetch master playlist
151 async with session.get(hls_url, headers=headers) as response:
152 response.raise_for_status()
153 master_playlist_text = await response.text()
154
155 # Check if this is already a media playlist (has #EXTINF)
156 if "#EXTINF:" in master_playlist_text:
157 return master_playlist_text
158
159 # Extract media playlist URL from master playlist
160 media_playlist_url = self._extract_media_playlist_url(master_playlist_text, hls_url)
161
162 # Fetch media playlist
163 async with session.get(media_playlist_url, headers=headers) as response:
164 response.raise_for_status()
165 return await response.text()
166
167 def _extract_media_playlist_url(self, master_playlist: str, base_url: str) -> str:
168 """Extract media playlist URL from master playlist.
169
170 Args:
171 master_playlist: Master playlist text
172 base_url: Base URL for resolving relative URLs
173
174 Returns:
175 Absolute URL to media playlist
176 """
177 lines = master_playlist.split("\n")
178 for i, line in enumerate(lines):
179 # Look for stream info line followed by URL
180 if line.startswith("#EXT-X-STREAM-INF:"):
181 if i + 1 < len(lines):
182 media_url = lines[i + 1].strip()
183 if media_url and not media_url.startswith("#"):
184 # Resolve relative URL if needed
185 return urljoin(base_url, media_url)
186 msg = f"No media playlist URL found in master playlist from {base_url}"
187 raise InvalidDataError(msg)
188