/
/
/
1"""Track converter for nicovideo objects."""
2
from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING

from music_assistant_models.enums import ImageType, LinkType, MediaType
from music_assistant_models.media_items import (
    Artist,
    AudioFormat,
    ItemMapping,
    MediaItemImage,
    MediaItemLink,
    MediaItemMetadata,
    Track,
)
from music_assistant_models.unique_list import UniqueList
from niconico.objects.video import EssentialVideo, Owner, VideoThumbnail

from music_assistant.providers.nicovideo.converters.base import NicovideoConverterBase
from music_assistant.providers.nicovideo.helpers import create_audio_format
23
24if TYPE_CHECKING:
25 from niconico.objects.nvapi import Activity
26 from niconico.objects.video.watch import WatchData, WatchVideo, WatchVideoThumbnail
27
28
29class NicovideoTrackConverter(NicovideoConverterBase):
30 """Handles track conversion for nicovideo."""
31
32 def convert_by_activity(self, activity: Activity) -> Track | None:
33 """Convert an Activity object from feed into a Track.
34
35 This is a lightweight conversion optimized for feed display,
36 using only the information available in the activity data.
37 Missing information like view counts and detailed metadata
38 will be absent, but this is acceptable for feed listings.
39 """
40 content = activity.content
41
42 # Only process video content
43 if content.type_ != "video" or not content.video:
44 return None
45
46 # Create audio format with minimal info
47 audio_format = create_audio_format()
48
49 # Build artists from actor information using ItemMapping
50 artists_list: UniqueList[Artist | ItemMapping] = UniqueList()
51 if activity.actor.id_ and activity.actor.name:
52 artist_mapping = ItemMapping(
53 item_id=activity.actor.id_,
54 provider=self.provider.domain,
55 name=activity.actor.name,
56 )
57 artists_list.append(artist_mapping)
58
59 # Create track with available information
60 return Track(
61 item_id=content.id_,
62 provider=self.provider.instance_id,
63 name=content.title,
64 duration=content.video.duration,
65 artists=artists_list,
66 # Assume playable if duration > 0 (we don't have payment info here)
67 is_playable=content.video.duration > 0,
68 metadata=self._create_track_metadata(
69 video_id=content.id_,
70 release_date_str=content.started_at,
71 thumbnail_url=activity.thumbnail_url,
72 ),
73 provider_mappings=self.helper.create_provider_mapping(
74 item_id=content.id_,
75 url_path="watch",
76 # We don't have availability info, so default to True if playable
77 available=content.video.duration > 0,
78 audio_format=audio_format,
79 ),
80 )
81
82 def convert_by_essential_video(self, video: EssentialVideo) -> Track | None:
83 """Convert an EssentialVideo object into a Track."""
84 # Skip muted videos
85 if video.is_muted:
86 return None
87
88 # Calculate popularity using standard formula
89 popularity = self.helper.calculate_popularity(
90 mylist_count=video.count.mylist,
91 like_count=video.count.like,
92 )
93
94 # Since EssentialVideo doesn't have detailed audio format info, we use defaults
95 audio_format = create_audio_format()
96
97 # Build artists using artist converter (prefer full Artist over ItemMapping)
98 artists_list: UniqueList[Artist | ItemMapping] = UniqueList()
99 if video.owner.id_ is not None:
100 artist_obj = self.converter_manager.artist.convert_by_owner_or_user(video.owner)
101 artists_list.append(artist_obj)
102
103 # Create base track with enhanced metadata
104 return Track(
105 item_id=video.id_,
106 provider=self.provider.instance_id,
107 name=video.title,
108 duration=video.duration,
109 artists=artists_list,
110 # Videos that cannot be played will have a duration of 0.
111 is_playable=video.duration > 0 and not video.is_payment_required,
112 metadata=self._create_track_metadata(
113 video_id=video.id_,
114 description=video.short_description,
115 explicit=video.require_sensitive_masking,
116 release_date_str=video.registered_at,
117 popularity=popularity,
118 thumbnail=video.thumbnail,
119 ),
120 provider_mappings=self.helper.create_provider_mapping(
121 item_id=video.id_,
122 url_path="watch",
123 available=self.is_video_available(video),
124 audio_format=audio_format,
125 ),
126 )
127
128 def convert_by_watch_data(self, watch_data: WatchData) -> Track | None:
129 """Convert a WatchData object into a Track."""
130 video = watch_data.video
131
132 # Skip deleted, private, or muted videos
133 if video.is_deleted or video.is_private:
134 return None
135
136 # Calculate popularity using standard formula
137 popularity = self.helper.calculate_popularity(
138 mylist_count=video.count.mylist,
139 like_count=video.count.like,
140 )
141
142 # Create owner object for artist conversion based on channel vs user video
143 if watch_data.channel:
144 # Channel video case
145 owner = Owner(
146 ownerType="channel",
147 type="channel",
148 visibility="visible",
149 id=watch_data.channel.id_,
150 name=watch_data.channel.name,
151 iconUrl=watch_data.channel.thumbnail.url if watch_data.channel.thumbnail else None,
152 )
153 else:
154 # User video case
155 owner = Owner(
156 ownerType="user",
157 type="user",
158 visibility="visible",
159 id=str(watch_data.owner.id_) if watch_data.owner else None,
160 name=watch_data.owner.nickname if watch_data.owner else None,
161 iconUrl=watch_data.owner.icon_url if watch_data.owner else None,
162 )
163
164 # Create audio format from watch data
165 audio_format = self._create_audio_format_from_watch_data(watch_data)
166
167 # Build artists using artist converter (avoid adding if owner id is missing)
168 artists_list: UniqueList[Artist | ItemMapping] = UniqueList()
169 if owner.id_ is not None:
170 artist_obj = self.converter_manager.artist.convert_by_owner_or_user(owner)
171 artists_list.append(artist_obj)
172
173 # Create base track with enhanced metadata
174 track = Track(
175 item_id=video.id_,
176 provider=self.provider.instance_id,
177 name=video.title,
178 duration=video.duration,
179 artists=artists_list,
180 # Videos that cannot be played will have a duration of 0.
181 is_playable=video.duration > 0 and not video.is_authentication_required,
182 metadata=self._create_track_metadata_from_watch_video(
183 video=video,
184 watch_data=watch_data,
185 popularity=popularity,
186 ),
187 provider_mappings=self.helper.create_provider_mapping(
188 item_id=video.id_,
189 url_path="watch",
190 available=self.is_video_available(video),
191 audio_format=audio_format,
192 ),
193 )
194
195 # Add album information if series data is available (prefer full Album over ItemMapping)
196 if watch_data.series is not None:
197 track.album = self.converter_manager.album.convert_by_series(
198 watch_data.series,
199 artists_list=artists_list,
200 )
201
202 return track
203
204 def _create_audio_format_from_watch_data(self, watch_data: WatchData) -> AudioFormat | None:
205 """Create AudioFormat from WatchData audio information.
206
207 Args:
208 watch_data: WatchData object containing media information.
209
210 Returns:
211 AudioFormat object if audio information is available, None otherwise.
212 """
213 if (
214 not watch_data.media
215 or not watch_data.media.domand
216 or not watch_data.media.domand.audios
217 ):
218 return None
219
220 # Use the first available audio stream (typically the highest quality)
221 audio = watch_data.media.domand.audios[0]
222
223 if not audio.is_available:
224 return None
225
226 return create_audio_format(
227 sample_rate=audio.sampling_rate,
228 bit_rate=audio.bit_rate,
229 )
230
231 def _create_track_metadata_from_watch_video(
232 self,
233 video: WatchVideo,
234 watch_data: WatchData,
235 *,
236 popularity: int | None = None,
237 ) -> MediaItemMetadata:
238 """Create track metadata from WatchVideo object."""
239 metadata = MediaItemMetadata()
240
241 if video.description:
242 metadata.description = video.description
243
244 if video.registered_at:
245 try:
246 # Handle both direct ISO format and Z-suffixed format
247 if video.registered_at.endswith("Z"):
248 clean_date_str = video.registered_at.replace("Z", "+00:00")
249 metadata.release_date = datetime.fromisoformat(clean_date_str)
250 else:
251 metadata.release_date = datetime.fromisoformat(video.registered_at)
252 except (ValueError, AttributeError) as err:
253 # Log debug message for date parsing failures to help with troubleshooting
254 self.logger.debug(
255 "Failed to convert release date '%s': %s", video.registered_at, err
256 )
257
258 if popularity is not None:
259 metadata.popularity = popularity
260
261 # Add tag information as genres
262 if watch_data.tag and watch_data.tag.items:
263 # Extract tag names from tag items and create genres set
264 tag_names: list[str] = []
265 for tag_item in watch_data.tag.items:
266 tag_names.append(tag_item.name)
267
268 if tag_names:
269 metadata.genres = set(tag_names)
270
271 # Add thumbnail images
272 if video.thumbnail:
273 metadata.images = self._convert_watch_video_thumbnails(video.thumbnail)
274
275 # Add video link
276 metadata.links = {
277 MediaItemLink(
278 type=LinkType.WEBSITE,
279 url=f"https://www.nicovideo.jp/watch/{video.id_}",
280 )
281 }
282
283 return metadata
284
285 def _convert_watch_video_thumbnails(
286 self, thumbnail: WatchVideoThumbnail
287 ) -> UniqueList[MediaItemImage]:
288 """Convert WatchVideo thumbnails into multiple image sizes."""
289 images: UniqueList[MediaItemImage] = UniqueList()
290
291 def _add_thumbnail_image(url: str) -> None:
292 images.append(
293 MediaItemImage(
294 type=ImageType.THUMB,
295 path=url,
296 provider=self.provider.instance_id,
297 remotely_accessible=True,
298 )
299 )
300
301 # Add main thumbnail URLs
302 if thumbnail.url:
303 _add_thumbnail_image(thumbnail.url)
304 if thumbnail.middle_url:
305 _add_thumbnail_image(thumbnail.middle_url)
306 if thumbnail.large_url:
307 _add_thumbnail_image(thumbnail.large_url)
308
309 return images
310
311 def _create_track_metadata(
312 self,
313 video_id: str,
314 *,
315 description: str | None = None,
316 explicit: bool | None = None,
317 release_date_str: str | None = None,
318 popularity: int | None = None,
319 thumbnail: VideoThumbnail | None = None,
320 thumbnail_url: str | None = None,
321 ) -> MediaItemMetadata:
322 """Create track metadata with common fields."""
323 metadata = MediaItemMetadata()
324
325 if description:
326 metadata.description = description
327
328 if explicit is not None:
329 metadata.explicit = explicit
330
331 if release_date_str:
332 try:
333 # Handle both direct ISO format and Z-suffixed format
334 if release_date_str.endswith("Z"):
335 clean_date_str = release_date_str.replace("Z", "+00:00")
336 metadata.release_date = datetime.fromisoformat(clean_date_str)
337 else:
338 metadata.release_date = datetime.fromisoformat(release_date_str)
339 except (ValueError, AttributeError) as err:
340 # Log debug message for date parsing failures to help with troubleshooting
341 self.logger.debug("Failed to convert release date '%s': %s", release_date_str, err)
342
343 if popularity is not None:
344 metadata.popularity = popularity
345
346 # Add thumbnail images with enhanced support
347 if thumbnail:
348 # Use enhanced thumbnail parsing for multiple sizes
349 metadata.images = self._convert_video_thumbnails(thumbnail)
350 elif thumbnail_url:
351 # Fallback to single thumbnail URL
352 metadata.images = UniqueList(
353 [
354 MediaItemImage(
355 type=ImageType.THUMB,
356 path=thumbnail_url,
357 provider=self.provider.instance_id,
358 remotely_accessible=True,
359 )
360 ]
361 )
362
363 # Add video link
364 metadata.links = {
365 MediaItemLink(
366 type=LinkType.WEBSITE,
367 url=f"https://www.nicovideo.jp/watch/{video_id}",
368 )
369 }
370
371 return metadata
372
373 def _convert_video_thumbnails(self, thumbnail: VideoThumbnail) -> UniqueList[MediaItemImage]:
374 """Convert video thumbnails into multiple image sizes."""
375 images: UniqueList[MediaItemImage] = UniqueList()
376
377 # nhd_url is the largest size, use it as primary
378 if thumbnail.nhd_url:
379 images.append(
380 MediaItemImage(
381 type=ImageType.THUMB,
382 path=thumbnail.nhd_url,
383 provider=self.provider.instance_id,
384 remotely_accessible=True,
385 )
386 )
387
388 # large_url as secondary (if different from nhd_url)
389 if thumbnail.large_url and thumbnail.large_url != thumbnail.nhd_url:
390 images.append(
391 MediaItemImage(
392 type=ImageType.THUMB,
393 path=thumbnail.large_url,
394 provider=self.provider.instance_id,
395 remotely_accessible=True,
396 )
397 )
398
399 # middle_url and listing_url are same size, skip them if nhd_url exists
400 # Only add if nhd_url is not available
401 if not thumbnail.nhd_url and thumbnail.middle_url:
402 images.append(
403 MediaItemImage(
404 type=ImageType.THUMB,
405 path=thumbnail.middle_url,
406 provider=self.provider.instance_id,
407 remotely_accessible=True,
408 )
409 )
410
411 return images
412
413 def is_video_available(self, video: EssentialVideo | WatchVideo) -> bool:
414 """Check if a video is available for playback.
415
416 Args:
417 video: Either EssentialVideo or WatchVideo object.
418
419 Returns:
420 True if the video is available for playback, False otherwise.
421 """
422 # Common check: duration must be greater than 0
423 if video.duration <= 0:
424 return False
425
426 # Type-specific availability checks
427 if isinstance(video, EssentialVideo):
428 return not video.is_payment_required and not video.is_muted
429 # WatchVideo
430 return not video.is_deleted
431