music-assistant-server

5.4 KBPY
hls.py
5.4 KB165 lines • python
1"""
2RFC 8216-based HLS utilities.
3
4For simple variant stream selection from master playlists, use helpers.playlists.parse_m3u.
5"""
6
7from __future__ import annotations
8
9from dataclasses import dataclass, field
10
11from music_assistant_models.errors import InvalidDataError
12
13
14@dataclass
15class HLSMediaSegment:
16    """Single HLS media segment entry with associated metadata."""
17
18    extinf_line: str = ""
19    segment_url: str = ""
20    key_line: str | None = None
21    byterange_line: str | None = None
22    discontinuity: bool = False
23    map_line: str | None = None
24    program_date_time: str | None = None
25
26    @property
27    def duration(self) -> float:
28        """Extract duration in seconds from #EXTINF line."""
29        try:
30            duration_part = self.extinf_line.split("#EXTINF:")[1].split(",", 1)[0]
31            return float(duration_part.strip())
32        except (IndexError, ValueError):
33            return 0.0
34
35    @property
36    def title(self) -> str | None:
37        """Extract optional title from #EXTINF line."""
38        try:
39            parts = self.extinf_line.split("#EXTINF:")[1].split(",", 1)
40            if len(parts) == 2:
41                title = parts[1].strip()
42                return title if title else None
43            return None
44        except IndexError:
45            return None
46
47
48@dataclass
49class HLSMediaPlaylist:
50    """
51    HLS media playlist structure with headers, segments, and footers preserved.
52
53    Note: header_lines excludes EXT-X-KEY and EXT-X-MAP tags. Per RFC 8216, these
54    tags apply to subsequent segments until overridden, so they're stored per-segment
55    for easier manipulation.
56    """
57
58    header_lines: list[str] = field(default_factory=list)
59    segments: list[HLSMediaSegment] = field(default_factory=list)
60    footer_lines: list[str] = field(default_factory=list)
61
62
63class HLSMediaPlaylistParser:
64    """RFC 8216-based HLS media playlist parser."""
65
66    def __init__(self, hls_playlist_text: str) -> None:
67        """Initialize parser with playlist text."""
68        self.hls_playlist_text = hls_playlist_text
69        self.result = HLSMediaPlaylist()
70        self.working_segment = HLSMediaSegment()
71        self.segments_started = False
72
73    def parse(self) -> HLSMediaPlaylist:
74        """Parse HLS media playlist text into structured data.
75
76        Returns:
77            HLSMediaPlaylist object with extracted structure
78
79        Raises:
80            InvalidDataError: If playlist doesn't start with #EXTM3U or has invalid format
81        """
82        lines = [line.strip() for line in self.hls_playlist_text.split("\n") if line.strip()]
83
84        if not lines or not lines[0].startswith("#EXTM3U"):
85            msg = "Invalid HLS playlist: must start with #EXTM3U"
86            raise InvalidDataError(msg)
87
88        for line in lines:
89            self.process_line(line)
90
91        if not self.result.segments:
92            msg = "Invalid HLS playlist: no segments found"
93            raise InvalidDataError(msg)
94
95        return self.result
96
97    def process_line(self, line: str) -> None:
98        """Process a single line from the playlist."""
99        if line.startswith("#EXTINF:"):
100            self._on_extinf(line)
101        elif line.startswith("#EXT-X-KEY:"):
102            self._on_key_line(line)
103        elif line.startswith("#EXT-X-MAP:"):
104            self._on_map_line(line)
105        elif line.startswith("#EXT-X-PROGRAM-DATE-TIME:"):
106            self._on_program_date_time(line)
107        elif line.startswith("#EXT-X-BYTERANGE:"):
108            self._on_byterange(line)
109        elif line.startswith("#EXT-X-DISCONTINUITY"):
110            self._on_discontinuity()
111        elif line.startswith("#EXT"):
112            self._on_ext_tag(line)
113        elif line.startswith("#"):
114            pass
115        elif self.working_segment.extinf_line:
116            self._on_segment_url(line)
117
118    def _on_extinf(self, line: str) -> None:
119        """Handle #EXTINF tag."""
120        if self.working_segment.extinf_line:
121            msg = (
122                f"Malformed HLS playlist: #EXTINF '{line}' found without "
123                f"preceding segment URL for '{self.working_segment.extinf_line}'"
124            )
125            raise InvalidDataError(msg)
126        self.segments_started = True
127        self.working_segment.extinf_line = line
128
129    def _on_key_line(self, line: str) -> None:
130        """Handle #EXT-X-KEY tag."""
131        self.working_segment.key_line = line
132
133    def _on_map_line(self, line: str) -> None:
134        """Handle #EXT-X-MAP tag."""
135        self.working_segment.map_line = line
136
137    def _on_program_date_time(self, line: str) -> None:
138        """Handle #EXT-X-PROGRAM-DATE-TIME tag."""
139        self.working_segment.program_date_time = line
140
141    def _on_byterange(self, line: str) -> None:
142        """Handle #EXT-X-BYTERANGE tag."""
143        self.working_segment.byterange_line = line
144
145    def _on_discontinuity(self) -> None:
146        """Handle #EXT-X-DISCONTINUITY tag."""
147        self.working_segment.discontinuity = True
148
149    def _on_ext_tag(self, line: str) -> None:
150        """Handle other #EXT tags."""
151        if self.segments_started:
152            self.result.footer_lines.append(line)
153        else:
154            self.result.header_lines.append(line)
155
156    def _on_segment_url(self, line: str) -> None:
157        """Handle segment URL following #EXTINF."""
158        self.working_segment.segment_url = line
159        self.result.segments.append(self.working_segment)
160
161        self.working_segment = HLSMediaSegment(
162            key_line=self.working_segment.key_line,
163            map_line=self.working_segment.map_line,
164        )
165