/
/
/
1"""MixIn for NicovideoMusicProvider: track-related methods."""
2
3from __future__ import annotations
4
5from collections.abc import AsyncGenerator
6from typing import TYPE_CHECKING, override
7
8import shortuuid
9from aiohttp import web
10from music_assistant_models.enums import ContentType, MediaType
11from music_assistant_models.errors import MediaNotFoundError
12from music_assistant_models.media_items import (
13 AudioFormat,
14 Track,
15)
16
17from music_assistant.controllers.cache import use_cache
18from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
19from music_assistant.providers.nicovideo.converters.stream import NicovideoStreamData
20from music_assistant.providers.nicovideo.helpers.hls_seek_optimizer import (
21 HLSSeekOptimizer,
22)
23from music_assistant.providers.nicovideo.provider_mixins.base import (
24 NicovideoMusicProviderMixinBase,
25)
26
27if TYPE_CHECKING:
28 from music_assistant_models.streamdetails import StreamDetails
29
30
class NicovideoMusicProviderTrackMixin(NicovideoMusicProviderMixinBase):
    """Mixin supplying track lookup, stream details and audio streaming for nicovideo."""

    @override
    @use_cache(3600 * 24 * 14)  # Cache for 14 days
    async def get_track(self, prov_track_id: str) -> Track:
        """Return the full track for the given provider track id.

        Raises:
            MediaNotFoundError: If the video service returns a falsy result for the id.
        """
        video = await self.service_manager.video.get_video(prov_track_id)
        if video:
            return video
        raise MediaNotFoundError(f"Track with id {prov_track_id} not found on nicovideo.")

    @override
    async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
        """Return the stream details for a track, as produced by the video service.

        Raises:
            MediaNotFoundError: If ``media_type`` is anything other than ``MediaType.TRACK``.
        """
        if media_type is MediaType.TRACK:
            return await self.service_manager.video.get_stream_details(item_id)
        raise MediaNotFoundError(f"Media type {media_type} is not supported for stream details")

    @override
    async def get_audio_stream(
        self, streamdetails: StreamDetails, seek_position: int = 0
    ) -> AsyncGenerator[bytes, None]:
        """Stream audio via ffmpeg from a temporarily served, dynamically built HLS playlist.

        A playlist generated for the requested seek position is exposed on a
        one-off route of the streams server; ffmpeg reads it from there and the
        route is removed again once streaming ends.

        Args:
            streamdetails: Stream details whose ``data`` field holds a NicovideoStreamData.
            seek_position: Position to seek to in seconds.

        Yields:
            Audio data bytes produced by ffmpeg.

        Raises:
            TypeError: If ``streamdetails.data`` is not a NicovideoStreamData.
        """
        stream_data = streamdetails.data
        if not isinstance(stream_data, NicovideoStreamData):
            msg = f"Invalid stream data type: {type(stream_data)}"
            raise TypeError(msg)

        stream_context = HLSSeekOptimizer(stream_data).create_stream_context(seek_position)

        # Expose the generated playlist under a random, single-use path.
        route_path = f"/nicovideo_m3u8/{shortuuid.random(20)}.m3u8"
        playlist_url = f"{self.mass.streams.base_url}{route_path}"

        async def _serve_hls_playlist(_request: web.Request) -> web.Response:
            """Respond with the generated HLS playlist (.m3u8) text."""
            return web.Response(
                text=stream_context.dynamic_playlist_text,
                content_type="application/vnd.apple.mpegurl",
            )

        unregister = self.mass.streams.register_dynamic_route(route_path, _serve_hls_playlist)

        try:
            source_format = streamdetails.audio_format
            # Output goes into a NUT container with the source's audio parameters.
            nut_format = AudioFormat(
                content_type=ContentType.NUT,
                sample_rate=source_format.sample_rate,
                bit_depth=source_format.bit_depth,
                channels=source_format.channels,
            )
            async for chunk in get_ffmpeg_stream(
                audio_input=playlist_url,
                input_format=source_format,
                output_format=nut_format,
                extra_input_args=stream_context.extra_input_args,
            ):
                yield chunk
        finally:
            # Always drop the temporary route, including on error or early close.
            unregister()
100