music-assistant-server

3.9 KBPY
track.py
3.9 KB100 lines • python
1"""MixIn for NicovideoMusicProvider: track-related methods."""
2
3from __future__ import annotations
4
5from collections.abc import AsyncGenerator
6from typing import TYPE_CHECKING, override
7
8import shortuuid
9from aiohttp import web
10from music_assistant_models.enums import ContentType, MediaType
11from music_assistant_models.errors import MediaNotFoundError
12from music_assistant_models.media_items import (
13    AudioFormat,
14    Track,
15)
16
17from music_assistant.controllers.cache import use_cache
18from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
19from music_assistant.providers.nicovideo.converters.stream import NicovideoStreamData
20from music_assistant.providers.nicovideo.helpers.hls_seek_optimizer import (
21    HLSSeekOptimizer,
22)
23from music_assistant.providers.nicovideo.provider_mixins.base import (
24    NicovideoMusicProviderMixinBase,
25)
26
27if TYPE_CHECKING:
28    from music_assistant_models.streamdetails import StreamDetails
29
30
31class NicovideoMusicProviderTrackMixin(NicovideoMusicProviderMixinBase):
32    """Track-related methods for NicovideoMusicProvider."""
33
34    @override
35    @use_cache(3600 * 24 * 14)  # Cache for 14 days
36    async def get_track(self, prov_track_id: str) -> Track:
37        """Get full track details by id."""
38        track = await self.service_manager.video.get_video(prov_track_id)
39        if not track:
40            raise MediaNotFoundError(f"Track with id {prov_track_id} not found on nicovideo.")
41        return track
42
43    @override
44    async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
45        """Get stream details (streaming URL and format) for given item."""
46        if media_type is not MediaType.TRACK:
47            raise MediaNotFoundError(f"Media type {media_type} is not supported for stream details")
48        return await self.service_manager.video.get_stream_details(item_id)
49
50    @override
51    async def get_audio_stream(
52        self, streamdetails: StreamDetails, seek_position: int = 0
53    ) -> AsyncGenerator[bytes, None]:
54        """Get audio stream with dynamic playlist generation for optimized seeking.
55
56        Args:
57            streamdetails: Stream details containing domand_bid and parsed_playlist in data field
58            seek_position: Position to seek to in seconds
59
60        Yields:
61            Audio data bytes
62        """
63        if not isinstance(streamdetails.data, NicovideoStreamData):
64            msg = f"Invalid stream data type: {type(streamdetails.data)}"
65            raise TypeError(msg)
66
67        hls_data = streamdetails.data
68        processor = HLSSeekOptimizer(hls_data)
69        optimized_context = processor.create_stream_context(seek_position)
70
71        # Register dynamic route to serve HLS playlist
72        route_id = shortuuid.random(20)
73        route_path = f"/nicovideo_m3u8/{route_id}.m3u8"
74        playlist_url = f"{self.mass.streams.base_url}{route_path}"
75
76        async def _serve_hls_playlist(_request: web.Request) -> web.Response:
77            """Serve dynamically generated HLS playlist (.m3u8) file for seeking."""
78            return web.Response(
79                text=optimized_context.dynamic_playlist_text,
80                content_type="application/vnd.apple.mpegurl",
81            )
82
83        unregister = self.mass.streams.register_dynamic_route(route_path, _serve_hls_playlist)
84
85        try:
86            async for chunk in get_ffmpeg_stream(
87                audio_input=playlist_url,
88                input_format=streamdetails.audio_format,
89                output_format=AudioFormat(
90                    content_type=ContentType.NUT,
91                    sample_rate=streamdetails.audio_format.sample_rate,
92                    bit_depth=streamdetails.audio_format.bit_depth,
93                    channels=streamdetails.audio_format.channels,
94                ),
95                extra_input_args=optimized_context.extra_input_args,
96            ):
97                yield chunk
98        finally:
99            unregister()
100