/
/
/
1"""Helpers/utilities for the Internet Archive provider."""
2
3from __future__ import annotations
4
5import json
6import re
7from typing import TYPE_CHECKING, Any
8from urllib.parse import quote
9
10import aiohttp
11from music_assistant_models.errors import (
12 InvalidDataError,
13 MediaNotFoundError,
14 ResourceTemporarilyUnavailable,
15)
16
17from .constants import (
18 IA_DETAILS_URL,
19 IA_DOWNLOAD_URL,
20 IA_METADATA_URL,
21 IA_SEARCH_URL,
22 PREFERRED_AUDIO_FORMATS,
23 SUPPORTED_AUDIO_FORMATS,
24)
25
26if TYPE_CHECKING:
27 from music_assistant import MusicAssistant
28
29
class InternetArchiveClient:
    """Client for communicating with the Internet Archive API.

    Requests are issued through ``mass.http_session`` and responses are expected
    to be JSON objects.
    """

    # Suffixes (on the filename without extension) that only mark an encoding or
    # quality variant of the same track. They are applied one after another, in
    # this order, when computing the deduplication key.
    _QUALITY_SUFFIX_PATTERNS = tuple(
        re.compile(pattern, re.IGNORECASE)
        for pattern in (
            r"_320kb$",
            r"_256kb$",
            r"_192kb$",
            r"_128kb$",
            r"_64kb$",
            r"_vbr$",
            r"_original$",
            r"_sample$",
            r"_preview$",
        )
    )

    def __init__(self, mass: MusicAssistant) -> None:
        """Initialize the Internet Archive client."""
        self.mass = mass

    @staticmethod
    def _parse_retry_after(value: str | None, default: int = 60) -> int:
        """
        Convert a ``Retry-After`` header value into a back-off in seconds.

        The header may hold either a number of seconds or an HTTP-date. Only the
        numeric form is interpreted; a missing, non-numeric or negative value
        yields ``default`` instead of raising.

        Args:
            value: Raw header value, or None when the header is absent
            default: Back-off (seconds) to use when the value is unusable

        Returns:
            Back-off time in seconds
        """
        if value is None:
            return default
        try:
            seconds = int(value)
        except (TypeError, ValueError):
            return default
        return seconds if seconds >= 0 else default

    @staticmethod
    def _build_search_query(query: str, mediatype: str | None, collection: str | None) -> str:
        """
        Combine the free-text query with the optional field filters.

        Without filters the query is returned unchanged. Otherwise the query is
        parenthesised (so its own operators keep their precedence) and each
        filter is appended with ``AND``.

        Args:
            query: Search query string
            mediatype: Optional media type filter
            collection: Optional collection filter

        Returns:
            Query string to send as the ``q`` parameter
        """
        filters: list[str] = []
        if mediatype:
            filters.append(f"mediatype:({mediatype})")
        if collection:
            filters.append(f"collection:({collection})")
        if not filters:
            return query
        if query and query.strip():
            filters.insert(0, f"({query})")
        return " AND ".join(filters)

    async def _get_json(self, url: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
        """
        Make a GET request and return the JSON response with proper error handling.

        Args:
            url: URL to request
            params: Optional query-string parameters

        Returns:
            The response body decoded as a JSON object

        Raises:
            ResourceTemporarilyUnavailable: On HTTP 429, HTTP 5xx, any other
                aiohttp client error (including remaining error statuses) or a timeout
            MediaNotFoundError: On HTTP 404
            InvalidDataError: If the body is not valid JSON or is not a JSON object
        """
        try:
            async with self.mass.http_session.get(
                url, params=params, timeout=aiohttp.ClientTimeout(total=30)
            ) as response:
                if response.status == 429:
                    # Rate limited - let throttler handle this. Retry-After may be
                    # an HTTP-date, so it must not be fed to int() unguarded.
                    backoff_time = self._parse_retry_after(response.headers.get("Retry-After"))
                    raise ResourceTemporarilyUnavailable(
                        "Internet Archive rate limit exceeded", backoff_time=backoff_time
                    )

                if response.status == 404:
                    raise MediaNotFoundError("Item not found on Internet Archive")

                if response.status >= 500:
                    raise ResourceTemporarilyUnavailable(
                        "Internet Archive server error", backoff_time=30
                    )

                response.raise_for_status()
                json_data = await response.json()

                if not isinstance(json_data, dict):
                    raise InvalidDataError(f"Expected JSON object, got {type(json_data).__name__}")

                return json_data

        except aiohttp.ClientError as err:
            raise ResourceTemporarilyUnavailable(f"Network error: {err}") from err
        except TimeoutError as err:
            raise ResourceTemporarilyUnavailable(f"Request timeout: {err}") from err
        except json.JSONDecodeError as err:
            raise InvalidDataError(f"Invalid JSON response: {err}") from err

    async def search(
        self,
        query: str,
        mediatype: str | None = None,
        collection: str | None = None,
        rows: int = 50,
        page: int = 1,
        sort: str | None = None,
    ) -> dict[str, Any]:
        """
        Search the Internet Archive using the advanced search API.

        Args:
            query: Search query string
            mediatype: Optional media type filter (e.g., 'audio'), ANDed into the query
            collection: Optional collection filter (e.g., 'etree'), ANDed into the query
            rows: Number of results per page (max 200)
            page: Page number for pagination
            sort: Sort order (e.g., 'downloads desc', 'date desc')

        Returns:
            Search response dictionary containing results and metadata
        """
        params: dict[str, Any] = {
            "output": "json",
            "rows": min(rows, 200),  # IA limits to 200 per request
            "page": page,
            "q": self._build_search_query(query, mediatype, collection),
        }
        if sort:
            params["sort"] = sort

        return await self._get_json(IA_SEARCH_URL, params)

    async def get_metadata(self, identifier: str) -> dict[str, Any]:
        """Get metadata for a specific Internet Archive item."""
        url = f"{IA_METADATA_URL}/{identifier}"
        return await self._get_json(url)

    async def get_files(self, identifier: str) -> list[dict[str, Any]]:
        """Get file list for an Internet Archive item (empty if none is listed)."""
        metadata = await self.get_metadata(identifier)
        # "files" may be absent or an explicit null; both mean "no files".
        return list(metadata.get("files") or [])

    async def get_audio_files(self, identifier: str) -> list[dict[str, Any]]:
        """
        Get audio files for an item with format preference and deduplication.

        Filters for supported audio formats, removes derivative low-quality files,
        deduplicates by base filename, and selects the best quality format for
        each unique track.

        Args:
            identifier: Internet Archive item identifier

        Returns:
            List of audio file information dictionaries, sorted by filename
            for proper track ordering
        """
        files = await self.get_files(identifier)
        files_by_basename: dict[str, list[dict[str, Any]]] = {}

        for file_info in files:
            # Tolerate explicit nulls as well as missing keys.
            filename = file_info.get("name") or ""
            file_format = (file_info.get("format") or "").lower()

            if not self._is_supported_audio_format(file_format):
                continue
            if self._is_derivative_file(file_info, filename):
                continue

            base_name = self._get_base_filename(filename)
            files_by_basename.setdefault(base_name, []).append(file_info)

        preferred_files = [
            best_file
            for format_versions in files_by_basename.values()
            if (best_file := self._select_best_audio_format(format_versions))
        ]

        return sorted(preferred_files, key=lambda x: x.get("name") or "")

    def _is_supported_audio_format(self, file_format: str) -> bool:
        """Check if the (lower-cased) file format contains a supported audio format name."""
        return any(fmt in file_format for fmt in SUPPORTED_AUDIO_FORMATS)

    def _is_derivative_file(self, file_info: dict[str, Any], filename: str) -> bool:
        """Check if a file is a derivative (low-quality) version.

        A file counts only when its ``source`` is ``"derivative"`` AND its name
        carries one of the low-quality markers.
        """
        if file_info.get("source", "") != "derivative":
            return False
        lowered = filename.lower()
        return any(skip in lowered for skip in ("_64kb", "_vbr", "_sample", "_preview"))

    def _get_base_filename(self, filename: str) -> str:
        """Extract base filename without extension and quality indicators for deduplication."""
        # Remove extension first (rsplit leaves names without a dot untouched)
        base = filename.rsplit(".", 1)[0]

        # Remove common quality indicators from Internet Archive files
        for pattern in self._QUALITY_SUFFIX_PATTERNS:
            base = pattern.sub("", base)

        return base

    def _select_best_audio_format(
        self, format_versions: list[dict[str, Any]]
    ) -> dict[str, Any] | None:
        """
        Select the best audio format from available versions.

        Prefers higher quality formats based on PREFERRED_AUDIO_FORMATS ordering.
        Falls back to first available if no preferred format is found.

        Args:
            format_versions: List of file info dictionaries for the same track

        Returns:
            Best quality file info dictionary, or None if no valid files
        """
        for preferred_format in PREFERRED_AUDIO_FORMATS:
            for file_info in format_versions:
                if preferred_format in (file_info.get("format") or "").lower():
                    return file_info
        return format_versions[0] if format_versions else None

    def get_download_url(self, identifier: str, filename: str) -> str:
        """
        Get download URL for a specific file.

        Args:
            identifier: Internet Archive item identifier
            filename: Name of the file to download (percent-encoded, "/" is kept)

        Returns:
            Full download URL for the file
        """
        return f"{IA_DOWNLOAD_URL}/{identifier}/{quote(filename)}"

    def get_item_url(self, identifier: str) -> str:
        """
        Get the details page URL for an Internet Archive item.

        Args:
            identifier: Internet Archive item identifier

        Returns:
            Full URL to the item's details page
        """
        return f"{IA_DETAILS_URL}/{identifier}"
233
234
def parse_duration(duration_str: str | float | None) -> int | None:
    """
    Parse duration string to seconds.

    Handles various duration formats commonly found in Internet Archive metadata:
    - "1:23:45" (hours:minutes:seconds)
    - "12:34" (minutes:seconds)
    - "123" (seconds only)

    A plain number (int/float) is also accepted and truncated to whole seconds.

    Args:
        duration_str: Duration to parse

    Returns:
        Duration in seconds, or None if parsing fails (empty input, more than
        three ":"-separated fields, non-numeric fields, or a non-finite value)
    """
    # bool is an int subclass; it is never a meaningful duration.
    if isinstance(duration_str, bool) or not duration_str:
        return None
    try:
        if isinstance(duration_str, (int, float)):
            return int(duration_str)

        parts = [float(part) for part in duration_str.split(":")]
        if len(parts) == 3:  # h:m:s
            hours, minutes, seconds = parts
            return int(hours * 3600 + minutes * 60 + seconds)
        if len(parts) == 2:  # m:s
            minutes, seconds = parts
            return int(minutes * 60 + seconds)
        if len(parts) == 1:  # seconds only
            return int(parts[0])
        return None
    except (ValueError, TypeError, AttributeError, OverflowError):
        # OverflowError: int() of an infinite float such as "inf" or "1e999".
        # AttributeError: input that is neither a string nor a number.
        return None
265
266
def clean_text(text: str | list[str] | None) -> str:
    """
    Normalize an Internet Archive metadata text field to a single stripped string.

    Metadata values may arrive as a plain string or as a list of strings. For a
    list, the first string entry that is non-empty after stripping wins.

    Args:
        text: Text to clean (string, list of strings, or None)

    Returns:
        The stripped text, or an empty string when there is nothing usable
    """
    if not text:
        return ""
    if not isinstance(text, list):
        return text.strip()

    # Lazily strip the string entries and take the first one with content.
    stripped_entries = (entry.strip() for entry in text if isinstance(entry, str))
    return next((entry for entry in stripped_entries if entry), "")
288
289
def extract_year(date_str: str | list[str] | None) -> int | None:
    """
    Pull a 4-digit year out of an Internet Archive date value.

    The value is first normalized with ``clean_text``; the first standalone
    4-digit number starting with 19 or 20 is then taken as the year.

    Args:
        date_str: Date string or list to extract year from

    Returns:
        4-digit year as integer, or None if extraction fails
    """
    normalized = clean_text(date_str)
    if not normalized:
        return None
    try:
        found = re.search(r"\b(19\d{2}|20\d{2})\b", normalized)
        if found is None:
            return None
        return int(found.group(1))
    except (ValueError, TypeError):
        return None
311
312
def get_image_url(identifier: str, filename: str | None = None) -> str | None:
    """
    Build the image URL for an Internet Archive item.

    Args:
        identifier: Internet Archive item identifier
        filename: Optional specific image filename; when omitted the item's
            ``__ia_thumb.jpg`` is used

    Returns:
        Full URL to the image, or None if identifier is missing
    """
    if not identifier:
        return None
    # A caller-supplied filename is percent-encoded; the fixed thumbnail name is not.
    image_name = quote(filename) if filename else "__ia_thumb.jpg"
    return f"{IA_DOWNLOAD_URL}/{identifier}/{image_name}"
329