music-assistant-server

7.1 KBPY
video.py
7.1 KB188 lines • python
1"""Video service for nicovideo."""
2
3from __future__ import annotations
4
5from typing import TYPE_CHECKING
6from urllib.parse import urljoin
7
8from music_assistant_models.errors import InvalidDataError, UnplayableMediaError
9
10from music_assistant.providers.nicovideo.constants import (
11    DOMAND_BID_COOKIE_NAME,
12    NICOVIDEO_USER_AGENT,
13    SENSITIVE_CONTENTS,
14)
15from music_assistant.providers.nicovideo.converters.stream import (
16    StreamConversionData,
17)
18from music_assistant.providers.nicovideo.services.base import NicovideoBaseService
19
20if TYPE_CHECKING:
21    from music_assistant_models.media_items import Track
22    from music_assistant_models.streamdetails import StreamDetails
23    from niconico.objects.video.watch import WatchData, WatchMediaDomandAudio
24
25    from music_assistant.providers.nicovideo.services.manager import NicovideoServiceManager
26
27
28class NicovideoVideoService(NicovideoBaseService):
29    """Handles video and stream related operations for nicovideo."""
30
31    def __init__(self, service_manager: NicovideoServiceManager) -> None:
32        """Initialize NicovideoVideoService with reference to parent service manager."""
33        super().__init__(service_manager)
34
35    async def get_user_videos(
36        self, user_id: str, page: int = 1, page_size: int = 50
37    ) -> list[Track]:
38        """Get user videos and convert as Track list."""
39        user_video_data = await self.service_manager._call_with_throttler(
40            self.niconico_py_client.user.get_user_videos,
41            user_id,
42            page=page,
43            page_size=page_size,
44            sensitive_contents=SENSITIVE_CONTENTS,
45        )
46        if not user_video_data or not user_video_data.items:
47            return []
48        tracks = []
49        for item in user_video_data.items:
50            track = self.converter_manager.track.convert_by_essential_video(item.essential)
51            if track:
52                tracks.append(track)
53        return tracks
54
55    async def get_video(self, video_id: str) -> Track | None:
56        """Get video details using WatchData and convert as Track."""
57        watch_data = await self.service_manager._call_with_throttler(
58            self.niconico_py_client.video.watch.get_watch_data, video_id
59        )
60
61        if watch_data:
62            return self.converter_manager.track.convert_by_watch_data(watch_data)
63
64        return None
65
66    async def get_stream_details(self, video_id: str) -> StreamDetails:
67        """Get StreamDetails for a video using WatchData and converter."""
68        conversion_data = await self._prepare_conversion_data(video_id)
69        return self.converter_manager.stream.convert_from_conversion_data(conversion_data)
70
71    async def _prepare_conversion_data(self, video_id: str) -> StreamConversionData:
72        """Prepare StreamConversionData for a video."""
73        # 1. Fetch watch data
74        watch_data = await self.service_manager._call_with_throttler(
75            self.niconico_py_client.video.watch.get_watch_data, video_id
76        )
77        if not watch_data:
78            raise UnplayableMediaError("Failed to fetch watch data")
79
80        # 2. Select best available audio
81        selected_audio = self._select_best_audio(watch_data)
82
83        # 3. Get HLS URL for selected audio
84        hls_url = await self._get_hls_url(watch_data, selected_audio)
85
86        # 4. Get domand_bid for ffmpeg headers
87        domand_bid = self.niconico_py_client.session.cookies.get(DOMAND_BID_COOKIE_NAME)
88        if not domand_bid:
89            raise UnplayableMediaError("Failed to fetch domand_bid")
90
91        # 5. Fetch HLS playlist text
92        playlist_text = await self._fetch_media_playlist_text(hls_url, domand_bid)
93
94        # 6. Return conversion data
95        return StreamConversionData(
96            watch_data=watch_data,
97            selected_audio=selected_audio,
98            hls_url=hls_url,
99            domand_bid=domand_bid,
100            hls_playlist_text=playlist_text,
101        )
102
103    def _select_best_audio(self, watch_data: WatchData) -> WatchMediaDomandAudio:
104        """Select the best available audio from WatchData."""
105        best_audio = None
106        best_quality = -1
107        for audio in watch_data.media.domand.audios:
108            if audio.is_available and audio.quality_level > best_quality:
109                best_audio = audio
110                best_quality = audio.quality_level
111
112        if not best_audio:
113            raise UnplayableMediaError("No available audio found")
114
115        return best_audio
116
117    async def _get_hls_url(
118        self, watch_data: WatchData, selected_audio: WatchMediaDomandAudio
119    ) -> str:
120        """Get HLS URL for selected audio."""
121        # Create outputs list with selected audio ID only (audio-only)
122        outputs = [selected_audio.id_]
123
124        hls_url = await self.service_manager._call_with_throttler(
125            self.niconico_py_client.video.watch.get_hls_content_url,
126            watch_data,
127            [outputs],  # list[list[str]] format
128        )
129        if not hls_url:
130            raise UnplayableMediaError("Failed to get HLS content URL")
131
132        return str(hls_url)
133
134    async def _fetch_media_playlist_text(self, hls_url: str, domand_bid: str) -> str:
135        """Fetch media playlist text from HLS stream.
136
137        Args:
138            hls_url: URL to the HLS playlist (master or media)
139            domand_bid: Authentication cookie value
140
141        Returns:
142            Media playlist text (not parsed)
143        """
144        headers = {
145            "User-Agent": NICOVIDEO_USER_AGENT,
146            "Cookie": f"{DOMAND_BID_COOKIE_NAME}={domand_bid}",
147        }
148        session = self.service_manager.provider.mass.http_session
149
150        # Fetch master playlist
151        async with session.get(hls_url, headers=headers) as response:
152            response.raise_for_status()
153            master_playlist_text = await response.text()
154
155        # Check if this is already a media playlist (has #EXTINF)
156        if "#EXTINF:" in master_playlist_text:
157            return master_playlist_text
158
159        # Extract media playlist URL from master playlist
160        media_playlist_url = self._extract_media_playlist_url(master_playlist_text, hls_url)
161
162        # Fetch media playlist
163        async with session.get(media_playlist_url, headers=headers) as response:
164            response.raise_for_status()
165            return await response.text()
166
167    def _extract_media_playlist_url(self, master_playlist: str, base_url: str) -> str:
168        """Extract media playlist URL from master playlist.
169
170        Args:
171            master_playlist: Master playlist text
172            base_url: Base URL for resolving relative URLs
173
174        Returns:
175            Absolute URL to media playlist
176        """
177        lines = master_playlist.split("\n")
178        for i, line in enumerate(lines):
179            # Look for stream info line followed by URL
180            if line.startswith("#EXT-X-STREAM-INF:"):
181                if i + 1 < len(lines):
182                    media_url = lines[i + 1].strip()
183                    if media_url and not media_url.startswith("#"):
184                        # Resolve relative URL if needed
185                        return urljoin(base_url, media_url)
186        msg = f"No media playlist URL found in master playlist from {base_url}"
187        raise InvalidDataError(msg)
188