music-assistant-server

15.6 KBPY
track.py
15.6 KB431 lines • python
1"""Track converter for nicovideo objects."""
2
3from __future__ import annotations
4
5from datetime import datetime
6from typing import TYPE_CHECKING
7
8from music_assistant_models.enums import ImageType, LinkType
9from music_assistant_models.media_items import (
10    Artist,
11    AudioFormat,
12    ItemMapping,
13    MediaItemImage,
14    MediaItemLink,
15    MediaItemMetadata,
16    Track,
17)
18from music_assistant_models.unique_list import UniqueList
19from niconico.objects.video import EssentialVideo, Owner, VideoThumbnail
20
21from music_assistant.providers.nicovideo.converters.base import NicovideoConverterBase
22from music_assistant.providers.nicovideo.helpers import create_audio_format
23
24if TYPE_CHECKING:
25    from niconico.objects.nvapi import Activity
26    from niconico.objects.video.watch import WatchData, WatchVideo, WatchVideoThumbnail
27
28
class NicovideoTrackConverter(NicovideoConverterBase):
    """Handles track conversion for nicovideo.

    Converts the video representations used by this provider (feed ``Activity``,
    ``EssentialVideo`` and ``WatchData``) into Music Assistant ``Track`` objects.
    """

    def convert_by_activity(self, activity: Activity) -> Track | None:
        """Convert an Activity object from feed into a Track.

        This is a lightweight conversion optimized for feed display,
        using only the information available in the activity data.
        Missing information like view counts and detailed metadata
        will be absent, but this is acceptable for feed listings.

        Args:
            activity: Feed activity whose content may describe a video.

        Returns:
            The converted Track, or None when the activity content is not a video.
        """
        content = activity.content

        # Only process video content
        if content.type_ != "video" or not content.video:
            return None

        # The feed carries no payment/availability info, so a positive duration is
        # used as the only signal for both "playable" and "available".
        has_duration = content.video.duration > 0

        # Build artists from actor information using ItemMapping
        artists_list: UniqueList[Artist | ItemMapping] = UniqueList()
        actor = activity.actor
        if actor.id_ and actor.name:
            # NOTE(review): this mapping uses provider.domain while the Track below
            # uses provider.instance_id — confirm the difference is intentional.
            artists_list.append(
                ItemMapping(
                    item_id=actor.id_,
                    provider=self.provider.domain,
                    name=actor.name,
                )
            )

        # Create track with available information
        return Track(
            item_id=content.id_,
            provider=self.provider.instance_id,
            name=content.title,
            duration=content.video.duration,
            artists=artists_list,
            is_playable=has_duration,
            metadata=self._create_track_metadata(
                video_id=content.id_,
                release_date_str=content.started_at,
                thumbnail_url=activity.thumbnail_url,
            ),
            provider_mappings=self.helper.create_provider_mapping(
                item_id=content.id_,
                url_path="watch",
                available=has_duration,
                # Create audio format with minimal info
                audio_format=create_audio_format(),
            ),
        )

    def convert_by_essential_video(self, video: EssentialVideo) -> Track | None:
        """Convert an EssentialVideo object into a Track.

        Args:
            video: Video summary object to convert.

        Returns:
            The converted Track, or None when the video is muted.
        """
        # Skip muted videos
        if video.is_muted:
            return None

        # Calculate popularity using standard formula
        popularity = self.helper.calculate_popularity(
            mylist_count=video.count.mylist,
            like_count=video.count.like,
        )

        # Build artists using artist converter (prefer full Artist over ItemMapping)
        artists_list: UniqueList[Artist | ItemMapping] = UniqueList()
        if video.owner.id_ is not None:
            artists_list.append(self.converter_manager.artist.convert_by_owner_or_user(video.owner))

        # Create base track with enhanced metadata
        return Track(
            item_id=video.id_,
            provider=self.provider.instance_id,
            name=video.title,
            duration=video.duration,
            artists=artists_list,
            # Videos that cannot be played will have a duration of 0.
            is_playable=video.duration > 0 and not video.is_payment_required,
            metadata=self._create_track_metadata(
                video_id=video.id_,
                description=video.short_description,
                explicit=video.require_sensitive_masking,
                release_date_str=video.registered_at,
                popularity=popularity,
                thumbnail=video.thumbnail,
            ),
            provider_mappings=self.helper.create_provider_mapping(
                item_id=video.id_,
                url_path="watch",
                available=self.is_video_available(video),
                # EssentialVideo doesn't have detailed audio format info, so use defaults
                audio_format=create_audio_format(),
            ),
        )

    def convert_by_watch_data(self, watch_data: WatchData) -> Track | None:
        """Convert a WatchData object into a Track.

        Args:
            watch_data: Watch-page data containing the video plus optional channel,
                owner, series, tag and media information.

        Returns:
            The converted Track (with ``album`` set when series data is present),
            or None when the video is deleted or private.
        """
        video = watch_data.video

        # Skip deleted or private videos
        if video.is_deleted or video.is_private:
            return None

        # Calculate popularity using standard formula
        popularity = self.helper.calculate_popularity(
            mylist_count=video.count.mylist,
            like_count=video.count.like,
        )

        # Create owner object for artist conversion based on channel vs user video
        owner = self._owner_from_watch_data(watch_data)

        # Build artists using artist converter (avoid adding if owner id is missing)
        artists_list: UniqueList[Artist | ItemMapping] = UniqueList()
        if owner.id_ is not None:
            artists_list.append(self.converter_manager.artist.convert_by_owner_or_user(owner))

        # Create base track with enhanced metadata
        track = Track(
            item_id=video.id_,
            provider=self.provider.instance_id,
            name=video.title,
            duration=video.duration,
            artists=artists_list,
            # Videos that cannot be played will have a duration of 0.
            is_playable=video.duration > 0 and not video.is_authentication_required,
            metadata=self._create_track_metadata_from_watch_video(
                video=video,
                watch_data=watch_data,
                popularity=popularity,
            ),
            provider_mappings=self.helper.create_provider_mapping(
                item_id=video.id_,
                url_path="watch",
                available=self.is_video_available(video),
                audio_format=self._create_audio_format_from_watch_data(watch_data),
            ),
        )

        # Add album information if series data is available (prefer full Album over ItemMapping)
        if watch_data.series is not None:
            track.album = self.converter_manager.album.convert_by_series(
                watch_data.series,
                artists_list=artists_list,
            )

        return track

    def _owner_from_watch_data(self, watch_data: WatchData) -> Owner:
        """Build an Owner from the channel (preferred) or user part of WatchData."""
        channel = watch_data.channel
        if channel:
            # Channel video case
            return Owner(
                ownerType="channel",
                type="channel",
                visibility="visible",
                id=channel.id_,
                name=channel.name,
                iconUrl=channel.thumbnail.url if channel.thumbnail else None,
            )

        # User video case; every field falls back to None when no owner is present
        user = watch_data.owner
        return Owner(
            ownerType="user",
            type="user",
            visibility="visible",
            id=str(user.id_) if user else None,
            name=user.nickname if user else None,
            iconUrl=user.icon_url if user else None,
        )

    def _create_audio_format_from_watch_data(self, watch_data: WatchData) -> AudioFormat | None:
        """Create AudioFormat from WatchData audio information.

        Args:
            watch_data: WatchData object containing media information.

        Returns:
            AudioFormat built from the first audio stream flagged as available,
            or None when there is no media/domand/audio data or no stream is
            available.
        """
        media = watch_data.media
        if not media or not media.domand or not media.domand.audios:
            return None

        # Use the first *available* audio stream. Scanning the list (instead of only
        # inspecting index 0) keeps format info when the leading entry is unavailable.
        # NOTE(review): presumably the list is ordered best quality first — confirm.
        audio = next((item for item in media.domand.audios if item.is_available), None)
        if audio is None:
            return None

        return create_audio_format(
            sample_rate=audio.sampling_rate,
            bit_rate=audio.bit_rate,
        )

    def _parse_release_date(self, release_date_str: str) -> datetime | None:
        """Parse an ISO-format timestamp, accepting a trailing ``Z`` suffix.

        Args:
            release_date_str: Timestamp string to parse.

        Returns:
            The parsed datetime, or None (after a debug log entry) when the
            value cannot be parsed.
        """
        try:
            # Handle both direct ISO format and Z-suffixed format
            if release_date_str.endswith("Z"):
                # Only the trailing designator is rewritten to an explicit UTC offset
                return datetime.fromisoformat(release_date_str[:-1] + "+00:00")
            return datetime.fromisoformat(release_date_str)
        except (ValueError, AttributeError) as err:
            # Log debug message for date parsing failures to help with troubleshooting
            self.logger.debug("Failed to convert release date '%s': %s", release_date_str, err)
            return None

    def _thumb_image(self, url: str) -> MediaItemImage:
        """Wrap a thumbnail URL in a remotely accessible THUMB MediaItemImage."""
        return MediaItemImage(
            type=ImageType.THUMB,
            path=url,
            provider=self.provider.instance_id,
            remotely_accessible=True,
        )

    @staticmethod
    def _watch_page_links(video_id: str) -> set[MediaItemLink]:
        """Return the link set pointing at the video's nicovideo watch page."""
        return {
            MediaItemLink(
                type=LinkType.WEBSITE,
                url=f"https://www.nicovideo.jp/watch/{video_id}",
            )
        }

    def _create_track_metadata_from_watch_video(
        self,
        video: WatchVideo,
        watch_data: WatchData,
        *,
        popularity: int | None = None,
    ) -> MediaItemMetadata:
        """Create track metadata from WatchVideo object.

        Args:
            video: Watch-page video supplying description, date, thumbnail and id.
            watch_data: Enclosing watch data; its tags are used as genres.
            popularity: Optional precomputed popularity value.

        Returns:
            Metadata with description, release date, popularity, genres and images
            filled in where available, and always a link to the watch page.
        """
        metadata = MediaItemMetadata()

        if video.description:
            metadata.description = video.description

        if video.registered_at:
            release_date = self._parse_release_date(video.registered_at)
            if release_date is not None:
                metadata.release_date = release_date

        if popularity is not None:
            metadata.popularity = popularity

        # Add tag information as genres
        if watch_data.tag and watch_data.tag.items:
            metadata.genres = {tag_item.name for tag_item in watch_data.tag.items}

        # Add thumbnail images
        if video.thumbnail:
            metadata.images = self._convert_watch_video_thumbnails(video.thumbnail)

        # Add video link
        metadata.links = self._watch_page_links(video.id_)

        return metadata

    def _convert_watch_video_thumbnails(
        self, thumbnail: WatchVideoThumbnail
    ) -> UniqueList[MediaItemImage]:
        """Convert WatchVideo thumbnails into multiple image sizes.

        Args:
            thumbnail: Thumbnail object exposing ``url``, ``middle_url`` and ``large_url``.

        Returns:
            Images for every non-empty URL, appended in that attribute order.
        """
        # NOTE(review): _convert_video_thumbnails lists the largest image first, while
        # this keeps url -> middle_url -> large_url; confirm which order consumers expect.
        images: UniqueList[MediaItemImage] = UniqueList()
        for url in (thumbnail.url, thumbnail.middle_url, thumbnail.large_url):
            if url:
                images.append(self._thumb_image(url))
        return images

    def _create_track_metadata(
        self,
        video_id: str,
        *,
        description: str | None = None,
        explicit: bool | None = None,
        release_date_str: str | None = None,
        popularity: int | None = None,
        thumbnail: VideoThumbnail | None = None,
        thumbnail_url: str | None = None,
    ) -> MediaItemMetadata:
        """Create track metadata with common fields.

        Args:
            video_id: Video id used to build the watch-page link.
            description: Optional description text.
            explicit: Optional explicit-content flag.
            release_date_str: Optional ISO-format timestamp (a ``Z`` suffix is accepted).
            popularity: Optional precomputed popularity value.
            thumbnail: Optional multi-size thumbnail object; takes precedence
                over ``thumbnail_url``.
            thumbnail_url: Optional single thumbnail URL used as a fallback.

        Returns:
            Metadata with the supplied fields set, and always a link to the watch page.
        """
        metadata = MediaItemMetadata()

        if description:
            metadata.description = description

        if explicit is not None:
            metadata.explicit = explicit

        if release_date_str:
            release_date = self._parse_release_date(release_date_str)
            if release_date is not None:
                metadata.release_date = release_date

        if popularity is not None:
            metadata.popularity = popularity

        # Add thumbnail images with enhanced support
        if thumbnail:
            # Use enhanced thumbnail parsing for multiple sizes
            metadata.images = self._convert_video_thumbnails(thumbnail)
        elif thumbnail_url:
            # Fallback to single thumbnail URL
            metadata.images = UniqueList([self._thumb_image(thumbnail_url)])

        # Add video link
        metadata.links = self._watch_page_links(video_id)

        return metadata

    def _convert_video_thumbnails(self, thumbnail: VideoThumbnail) -> UniqueList[MediaItemImage]:
        """Convert video thumbnails into multiple image sizes.

        Args:
            thumbnail: Thumbnail object exposing ``nhd_url``, ``large_url`` and ``middle_url``.

        Returns:
            Images in priority order: ``nhd_url`` first, then ``large_url`` when it
            differs from ``nhd_url``, and ``middle_url`` only when ``nhd_url`` is missing.
        """
        images: UniqueList[MediaItemImage] = UniqueList()

        # nhd_url is the largest size, use it as primary
        if thumbnail.nhd_url:
            images.append(self._thumb_image(thumbnail.nhd_url))

        # large_url as secondary (if different from nhd_url)
        if thumbnail.large_url and thumbnail.large_url != thumbnail.nhd_url:
            images.append(self._thumb_image(thumbnail.large_url))

        # middle_url and listing_url are same size, skip them if nhd_url exists
        # Only add if nhd_url is not available
        if not thumbnail.nhd_url and thumbnail.middle_url:
            images.append(self._thumb_image(thumbnail.middle_url))

        return images

    def is_video_available(self, video: EssentialVideo | WatchVideo) -> bool:
        """Check if a video is available for playback.

        Args:
            video: Either EssentialVideo or WatchVideo object.

        Returns:
            True if the video is available for playback, False otherwise.
            A non-positive duration is always unavailable; an EssentialVideo must
            additionally be neither payment-required nor muted, and any other
            video (WatchVideo) must not be deleted.
        """
        # Common check: duration must be greater than 0
        if video.duration <= 0:
            return False

        # Type-specific availability checks
        if isinstance(video, EssentialVideo):
            return not video.is_payment_required and not video.is_muted
        # WatchVideo
        return not video.is_deleted
431