# music-assistant-server: helpers.py (Python, 10.9 KB, 329 lines)
1"""Helpers/utilities for the Internet Archive provider."""
2
3from __future__ import annotations
4
5import json
6import re
7from typing import TYPE_CHECKING, Any
8from urllib.parse import quote
9
10import aiohttp
11from music_assistant_models.errors import (
12    InvalidDataError,
13    MediaNotFoundError,
14    ResourceTemporarilyUnavailable,
15)
16
17from .constants import (
18    IA_DETAILS_URL,
19    IA_DOWNLOAD_URL,
20    IA_METADATA_URL,
21    IA_SEARCH_URL,
22    PREFERRED_AUDIO_FORMATS,
23    SUPPORTED_AUDIO_FORMATS,
24)
25
26if TYPE_CHECKING:
27    from music_assistant import MusicAssistant
28
29
class InternetArchiveClient:
    """
    Client for communicating with the Internet Archive API.

    Uses the shared ``mass.http_session`` for all requests and offers helpers
    for searching, fetching item metadata, selecting the audio files of an
    item and building download/details URLs.
    """

    # Trailing quality markers stripped (in this order) when grouping files
    # that belong to the same track.
    _QUALITY_SUFFIX_PATTERNS = (
        r"_320kb$",
        r"_256kb$",
        r"_192kb$",
        r"_128kb$",
        r"_64kb$",
        r"_vbr$",
        r"_original$",
        r"_sample$",
        r"_preview$",
    )

    def __init__(self, mass: MusicAssistant) -> None:
        """Initialize the Internet Archive client."""
        self.mass = mass

    @staticmethod
    def _parse_retry_after(value: str | None, default: int = 60) -> int:
        """
        Convert a ``Retry-After`` header value to a number of seconds.

        The header may carry either delay-seconds or an HTTP-date. Only the
        numeric form is interpreted; anything else (missing header, a date,
        garbage, a negative number) falls back to ``default`` instead of
        raising.

        Args:
            value: Raw header value, or None when the header is absent
            default: Seconds to use when the value cannot be interpreted

        Returns:
            Non-negative backoff time in seconds
        """
        if value is None:
            return default
        try:
            seconds = int(value)
        except (TypeError, ValueError):
            return default
        return seconds if seconds >= 0 else default

    @staticmethod
    def _build_search_query(query: str, mediatype: str | None, collection: str | None) -> str:
        """
        Combine the free-form query with optional field filters.

        Args:
            query: Search query string
            mediatype: Optional media type to restrict results to
            collection: Optional collection to restrict results to

        Returns:
            ``query`` unchanged when no filter is given; otherwise the query
            wrapped in parentheses and AND-ed with ``mediatype:``/``collection:``
            clauses (or just the clauses when ``query`` is blank)
        """
        filters: list[str] = []
        if mediatype:
            filters.append(f"mediatype:{mediatype}")
        if collection:
            filters.append(f"collection:{collection}")
        if not filters:
            return query

        base = query.strip()
        # Parenthesize so an OR inside the caller's query cannot escape the filters.
        clauses = [f"({base})"] if base else []
        clauses.extend(filters)
        return " AND ".join(clauses)

    async def _get_json(self, url: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
        """
        Make a GET request and return JSON response with proper error handling.

        Args:
            url: URL to request
            params: Optional query parameters

        Returns:
            The decoded JSON object

        Raises:
            ResourceTemporarilyUnavailable: On HTTP 429, HTTP 5xx, timeouts and
                any aiohttp client error (including other non-success statuses
                reported by ``raise_for_status()``)
            MediaNotFoundError: On HTTP 404
            InvalidDataError: When the body is not valid JSON or not a JSON object
        """
        try:
            async with self.mass.http_session.get(
                url, params=params, timeout=aiohttp.ClientTimeout(total=30)
            ) as response:
                if response.status == 429:
                    # Rate limited - let throttler handle this
                    backoff_time = self._parse_retry_after(response.headers.get("Retry-After"))
                    raise ResourceTemporarilyUnavailable(
                        "Internet Archive rate limit exceeded", backoff_time=backoff_time
                    )

                if response.status == 404:
                    raise MediaNotFoundError("Item not found on Internet Archive")

                if response.status >= 500:
                    raise ResourceTemporarilyUnavailable(
                        "Internet Archive server error", backoff_time=30
                    )

                response.raise_for_status()
                json_data = await response.json()

                if not isinstance(json_data, dict):
                    raise InvalidDataError(f"Expected JSON object, got {type(json_data).__name__}")

                return json_data

        except aiohttp.ClientError as err:
            raise ResourceTemporarilyUnavailable(f"Network error: {err}") from err
        except TimeoutError as err:
            raise ResourceTemporarilyUnavailable(f"Request timeout: {err}") from err
        except json.JSONDecodeError as err:
            raise InvalidDataError(f"Invalid JSON response: {err}") from err

    async def search(
        self,
        query: str,
        mediatype: str | None = None,
        collection: str | None = None,
        rows: int = 50,
        page: int = 1,
        sort: str | None = None,
    ) -> dict[str, Any]:
        """
        Search the Internet Archive using the advanced search API.

        Args:
            query: Search query string
            mediatype: Optional media type filter (e.g., 'audio')
            collection: Optional collection filter (e.g., 'etree')
            rows: Number of results per page (max 200)
            page: Page number for pagination
            sort: Sort order (e.g., 'downloads desc', 'date desc')

        Returns:
            Search response dictionary containing results and metadata
        """
        params: dict[str, Any] = {
            "output": "json",
            "rows": min(rows, 200),  # IA limits to 200 per request
            "page": page,
            # The mediatype/collection filters are expressed as part of the query.
            "q": self._build_search_query(query, mediatype, collection),
        }
        if sort:
            params["sort"] = sort

        return await self._get_json(IA_SEARCH_URL, params)

    async def get_metadata(self, identifier: str) -> dict[str, Any]:
        """Get metadata for a specific Internet Archive item."""
        url = f"{IA_METADATA_URL}/{identifier}"
        return await self._get_json(url)

    async def get_files(self, identifier: str) -> list[dict[str, Any]]:
        """
        Get file list for an Internet Archive item.

        Returns an empty list when the metadata has no (or a null) ``files`` entry.
        """
        metadata = await self.get_metadata(identifier)
        return list(metadata.get("files") or [])

    async def get_audio_files(self, identifier: str) -> list[dict[str, Any]]:
        """
        Get audio files for an item with format preference and deduplication.

        Filters for supported audio formats, removes derivative low-quality files,
        deduplicates by base filename, and selects the best quality format for
        each unique track.

        Args:
            identifier: Internet Archive item identifier

        Returns:
            List of audio file information dictionaries, sorted by filename
            for proper track ordering
        """
        files = await self.get_files(identifier)
        files_by_basename: dict[str, list[dict[str, Any]]] = {}

        for file_info in files:
            filename = file_info.get("name", "")
            file_format = file_info.get("format", "").lower()

            if not self._is_supported_audio_format(file_format):
                continue
            if self._is_derivative_file(file_info, filename):
                continue

            # Group every encoding of the same track under one key.
            base_name = self._get_base_filename(filename)
            files_by_basename.setdefault(base_name, []).append(file_info)

        preferred_files: list[dict[str, Any]] = []
        for format_versions in files_by_basename.values():
            best_file = self._select_best_audio_format(format_versions)
            if best_file:
                preferred_files.append(best_file)

        return sorted(preferred_files, key=lambda x: x.get("name", ""))

    def _is_supported_audio_format(self, file_format: str) -> bool:
        """
        Check if the file format is a supported audio format.

        The check is a substring match of each SUPPORTED_AUDIO_FORMATS entry
        against ``file_format`` (which the caller has already lower-cased).
        """
        return any(fmt in file_format for fmt in SUPPORTED_AUDIO_FORMATS)

    def _is_derivative_file(self, file_info: dict[str, Any], filename: str) -> bool:
        """
        Check if a file is a derivative (low-quality) version.

        A file counts as such only when its ``source`` is "derivative" AND its
        name contains one of the low-quality markers.
        """
        return file_info.get("source", "") == "derivative" and any(
            skip in filename.lower() for skip in ("_64kb", "_vbr", "_sample", "_preview")
        )

    def _get_base_filename(self, filename: str) -> str:
        """Extract base filename without extension and quality indicators for deduplication."""
        # Remove extension first
        base = filename.rsplit(".", 1)[0] if "." in filename else filename

        # Strip the patterns one after another (not as a single alternation) so
        # that a later pattern can still match what an earlier removal exposed.
        for pattern in self._QUALITY_SUFFIX_PATTERNS:
            base = re.sub(pattern, "", base, flags=re.IGNORECASE)

        return base

    def _select_best_audio_format(
        self, format_versions: list[dict[str, Any]]
    ) -> dict[str, Any] | None:
        """
        Select the best audio format from available versions.

        Prefers higher quality formats based on PREFERRED_AUDIO_FORMATS ordering.
        Falls back to first available if no preferred format is found.

        Args:
            format_versions: List of file info dictionaries for the same track

        Returns:
            Best quality file info dictionary, or None if no valid files
        """
        for preferred_format in PREFERRED_AUDIO_FORMATS:
            for file_info in format_versions:
                if preferred_format in file_info.get("format", "").lower():
                    return file_info
        return format_versions[0] if format_versions else None

    def get_download_url(self, identifier: str, filename: str) -> str:
        """
        Get download URL for a specific file.

        Args:
            identifier: Internet Archive item identifier
            filename: Name of the file to download (percent-encoded here)

        Returns:
            Full download URL for the file
        """
        return f"{IA_DOWNLOAD_URL}/{identifier}/{quote(filename)}"

    def get_item_url(self, identifier: str) -> str:
        """
        Get the details page URL for an Internet Archive item.

        Args:
            identifier: Internet Archive item identifier

        Returns:
            Full URL to the item's details page
        """
        return f"{IA_DETAILS_URL}/{identifier}"
233
234
def parse_duration(duration_str: str | float | None) -> int | None:
    """
    Parse duration string to seconds.

    Handles various duration formats commonly found in Internet Archive metadata:
    - "1:23:45" (hours:minutes:seconds)
    - "12:34" (minutes:seconds)
    - "123" (seconds only)

    Fractional seconds are truncated. A bare int/float is accepted defensively
    and treated like its string form.

    Args:
        duration_str: Duration string (or number) to parse

    Returns:
        Duration in seconds, or None if parsing fails
    """
    if not duration_str:
        return None
    if isinstance(duration_str, (int, float)):
        duration_str = str(duration_str)
    try:
        if ":" in duration_str:
            parts = duration_str.split(":")
            if len(parts) == 3:  # h:m:s
                hours, minutes, seconds = map(float, parts)
                return int(hours * 3600 + minutes * 60 + seconds)
            if len(parts) == 2:  # m:s
                minutes, seconds = map(float, parts)
                return int(minutes * 60 + seconds)
            return None
        return int(float(duration_str))
    except (ValueError, TypeError, OverflowError):
        # OverflowError: int() of an infinite float (e.g. "inf" or "1e999").
        return None
265
266
def clean_text(text: str | list[str] | None) -> str:
    """
    Normalize a metadata text field to a single stripped string.

    The field may arrive as a plain string or as a list of strings. For a
    list, the first entry that is a string and is non-empty after stripping
    is used.

    Args:
        text: Text to clean (string, list of strings, or None)

    Returns:
        Cleaned text string, or empty string if no valid text found
    """
    if not text:
        return ""
    if not isinstance(text, list):
        return text.strip()

    # Lazily strip the string entries and take the first one with content.
    stripped_entries = (entry.strip() for entry in text if isinstance(entry, str))
    return next((value for value in stripped_entries if value), "")
288
289
def extract_year(date_str: str | list[str] | None) -> int | None:
    """
    Pull a 4-digit year out of an Internet Archive date value.

    The value is first normalized with ``clean_text``; the first standalone
    4-digit number starting with 19 or 20 found in the result is returned.

    Args:
        date_str: Date string or list to extract year from

    Returns:
        4-digit year as integer, or None if extraction fails
    """
    normalized = clean_text(date_str)
    if not normalized:
        return None
    try:
        found = re.search(r"\b(19\d{2}|20\d{2})\b", normalized)
        if found is None:
            return None
        return int(found.group(1))
    except (ValueError, TypeError):
        return None
311
312
def get_image_url(identifier: str, filename: str | None = None) -> str | None:
    """
    Build the image URL for an Internet Archive item.

    When a filename is given it is percent-encoded and used as the image
    path; otherwise the item's ``__ia_thumb.jpg`` file is referenced.

    Args:
        identifier: Internet Archive item identifier
        filename: Optional specific image filename

    Returns:
        Full URL to the image, or None if identifier is missing
    """
    if not identifier:
        return None
    image_name = quote(filename) if filename else "__ia_thumb.jpg"
    return f"{IA_DOWNLOAD_URL}/{identifier}/{image_name}"
329