/
/
/
1"""Parser for Tidal page structures with lazy loading."""
2
3from __future__ import annotations
4
5import json
6import time
7from typing import TYPE_CHECKING, Any
8
9from music_assistant_models.enums import MediaType
10
11from .constants import CACHE_CATEGORY_RECOMMENDATIONS
12from .parsers import parse_album, parse_artist, parse_playlist, parse_track
13
14if TYPE_CHECKING:
15 from music_assistant_models.media_items import Album, Artist, Playlist, Track
16
17 from .provider import TidalProvider
18
19
20class TidalPageParser:
21 """Parser for Tidal page structures with lazy loading."""
22
23 def __init__(self, provider: TidalProvider) -> None:
24 """Initialize the parser with the Tidal provider instance."""
25 self.provider = provider
26 self.logger = provider.logger
27 self._content_map: dict[str, dict[str, Any]] = {
28 "MIX": {},
29 "PLAYLIST": {},
30 "ALBUM": {},
31 "TRACK": {},
32 "ARTIST": {},
33 }
34 self._module_map: list[dict[str, Any]] = []
35 self._page_path: str | None = None
36 self._parsed_at: int = 0
37
38 def parse_page_structure(self, page_data: dict[str, Any], page_path: str) -> None:
39 """Parse Tidal page structure into indexed modules."""
40 self._page_path = page_path
41 self._parsed_at = int(time.time())
42 self._module_map = []
43
44 # Extract modules from rows
45 module_idx = 0
46 for row_idx, row in enumerate(page_data.get("rows", [])):
47 for module in row.get("modules", []):
48 # Store basic module info for later processing
49 module_info = {
50 "title": module.get("title", ""),
51 "type": module.get("type", ""),
52 "raw_data": module,
53 "module_idx": module_idx,
54 "row_idx": row_idx,
55 }
56 self._module_map.append(module_info)
57 module_idx += 1
58
59 def get_module_items(
60 self, module_info: dict[str, Any]
61 ) -> tuple[list[Playlist | Album | Track | Artist], MediaType]:
62 """Extract media items from a module with simplified type handling."""
63 result: list[Playlist | Album | Track | Artist] = []
64 type_counts: dict[MediaType, int] = {
65 MediaType.PLAYLIST: 0,
66 MediaType.ALBUM: 0,
67 MediaType.TRACK: 0,
68 MediaType.ARTIST: 0,
69 }
70
71 module_data = module_info.get("raw_data", {})
72 module_type = module_data.get("type", "")
73
74 self.logger.debug(
75 "Processing module type: %s, title: %s",
76 module_type,
77 module_data.get("title", "Unknown"),
78 )
79
80 # Process module based on type
81 self._process_module_by_type(module_data, module_type, result, type_counts)
82
83 # Determine the primary content type based on counts
84 primary_type = self._determine_primary_type(type_counts)
85
86 self._log_module_results(module_data, result, type_counts)
87
88 return result, primary_type
89
90 def _process_module_by_type(
91 self,
92 module_data: dict[str, Any],
93 module_type: str,
94 result: list[Playlist | Album | Track | Artist],
95 type_counts: dict[MediaType, int],
96 ) -> None:
97 """Process module content based on module type."""
98 # Extract paged list if present (most modules have this)
99 paged_list = module_data.get("pagedList", {})
100 items = paged_list.get("items", [])
101
102 # Different module types have different content structures
103 if module_type == "PLAYLIST_LIST":
104 self._process_playlist_list(items, result, type_counts)
105 elif module_type == "TRACK_LIST":
106 self._process_track_list(items, result, type_counts)
107 elif module_type == "ALBUM_LIST":
108 self._process_album_list(items, result, type_counts)
109 elif module_type == "ARTIST_LIST":
110 self._process_artist_list(items, result, type_counts)
111 elif module_type == "MIX_LIST":
112 self._process_mix_list(items, result, type_counts)
113 elif module_type == "HIGHLIGHT_MODULE":
114 self._process_highlight_module(module_data, result, type_counts)
115 else:
116 # Generic fallback for other module types
117 self._process_generic_items(items, result, type_counts)
118
119 def _process_playlist_list(
120 self,
121 items: list[dict[str, Any]],
122 result: list[Playlist | Album | Track | Artist],
123 type_counts: dict[MediaType, int],
124 ) -> None:
125 """Process items from a PLAYLIST_LIST module."""
126 for item in items:
127 if isinstance(item, dict):
128 # Check if item appears to be a mix
129 is_mix = "mixId" in item or "mixType" in item
130
131 try:
132 playlist = parse_playlist(self.provider, item, is_mix=is_mix)
133 result.append(playlist)
134 type_counts[MediaType.PLAYLIST] += 1
135 except (KeyError, ValueError, TypeError) as err:
136 self.logger.warning("Error parsing playlist: %s", err)
137 else:
138 # Skip non-dict items
139 pass
140
141 def _process_track_list(
142 self,
143 items: list[dict[str, Any]],
144 result: list[Playlist | Album | Track | Artist],
145 type_counts: dict[MediaType, int],
146 ) -> None:
147 """Process items from a TRACK_LIST module."""
148 for item in items:
149 if isinstance(item, dict):
150 try:
151 track = parse_track(self.provider, item)
152 result.append(track)
153 type_counts[MediaType.TRACK] += 1
154 except (KeyError, ValueError, TypeError) as err:
155 self.logger.warning("Error parsing track: %s", err)
156 else:
157 # Skip non-dict items
158 pass
159
160 def _process_album_list(
161 self,
162 items: list[dict[str, Any]],
163 result: list[Playlist | Album | Track | Artist],
164 type_counts: dict[MediaType, int],
165 ) -> None:
166 """Process items from an ALBUM_LIST module."""
167 for item in items:
168 if isinstance(item, dict):
169 try:
170 album = parse_album(self.provider, item)
171 result.append(album)
172 type_counts[MediaType.ALBUM] += 1
173 except (KeyError, ValueError, TypeError) as err:
174 self.logger.warning("Error parsing album: %s", err)
175 else:
176 # Skip non-dict items
177 pass
178
179 def _process_artist_list(
180 self,
181 items: list[dict[str, Any]],
182 result: list[Playlist | Album | Track | Artist],
183 type_counts: dict[MediaType, int],
184 ) -> None:
185 """Process items from an ARTIST_LIST module."""
186 for item in items:
187 if isinstance(item, dict):
188 try:
189 artist = parse_artist(self.provider, item)
190 result.append(artist)
191 type_counts[MediaType.ARTIST] += 1
192 except (KeyError, ValueError, TypeError) as err:
193 self.logger.warning("Error parsing artist: %s", err)
194 else:
195 # Skip non-dict items
196 pass
197
198 def _process_mix_list(
199 self,
200 items: list[dict[str, Any]],
201 result: list[Playlist | Album | Track | Artist],
202 type_counts: dict[MediaType, int],
203 ) -> None:
204 """Process items from a MIX_LIST module."""
205 for item in items:
206 if isinstance(item, dict):
207 try:
208 mix = parse_playlist(self.provider, item, is_mix=True)
209 result.append(mix)
210 type_counts[MediaType.PLAYLIST] += 1
211 except (KeyError, ValueError, TypeError) as err:
212 self.logger.warning("Error parsing mix: %s", err)
213 else:
214 # Skip non-dict items
215 pass
216
217 def _process_generic_items(
218 self,
219 items: list[dict[str, Any]],
220 result: list[Playlist | Album | Track | Artist],
221 type_counts: dict[MediaType, int],
222 ) -> None:
223 """Process items with generic type detection."""
224 for item in items:
225 if isinstance(item, dict):
226 # Try to determine item type from structure
227 try:
228 parsed_item = self._parse_item(item, type_counts)
229 if parsed_item:
230 result.append(parsed_item)
231 except (KeyError, ValueError, TypeError) as err:
232 self.logger.warning("Error parsing generic item: %s", err)
233 else:
234 # Skip non-dict items
235 pass
236
237 def _log_module_results(
238 self,
239 module_data: dict[str, Any],
240 result: list[Playlist | Album | Track | Artist],
241 type_counts: dict[MediaType, int],
242 ) -> None:
243 """Log detailed module processing results."""
244 self.logger.debug(
245 "Module '%s' processed: %d items (%d playlists, %d albums, %d tracks, %d artists)",
246 module_data.get("title", "Unknown"),
247 len(result),
248 type_counts[MediaType.PLAYLIST],
249 type_counts[MediaType.ALBUM],
250 type_counts[MediaType.TRACK],
251 type_counts[MediaType.ARTIST],
252 )
253
254 def _determine_primary_type(self, type_counts: dict[MediaType, int]) -> MediaType:
255 """Determine the primary media type based on item counts."""
256 primary_type = MediaType.PLAYLIST # Default
257 max_count = 0
258 for media_type, count in type_counts.items():
259 if count > max_count:
260 max_count = count
261 primary_type = media_type
262 return primary_type
263
264 def _process_highlight_module(
265 self,
266 module_data: dict[str, Any],
267 result: list[Playlist | Album | Track | Artist],
268 type_counts: dict[MediaType, int],
269 ) -> None:
270 """Process highlights from a HIGHLIGHT_MODULE."""
271 highlights = module_data.get("highlight", [])
272 for highlight in highlights:
273 if isinstance(highlight, dict): # Make sure highlight is a dict
274 highlight_item = highlight.get("item", {})
275 highlight_type = highlight.get("type", "")
276 if isinstance(highlight_item, dict):
277 if parsed_item := self._parse_item(highlight_item, type_counts, highlight_type):
278 result.append(parsed_item)
279
280 def _process_paged_list(
281 self,
282 module_data: dict[str, Any],
283 module_type: str,
284 result: list[Playlist | Album | Track | Artist],
285 type_counts: dict[MediaType, int],
286 ) -> None:
287 """Process items from a paged list module."""
288 paged_list = module_data.get("pagedList", {})
289 items = paged_list.get("items", [])
290
291 # Handle module-specific type inference
292 inferred_type: str | None = None
293 if module_type in {"ALBUM_LIST", "TRACK_LIST", "PLAYLIST_LIST", "MIX_LIST"}:
294 inferred_type = module_type.replace("_LIST", "")
295
296 # Process each item
297 for item in items:
298 if not item or not isinstance(item, dict):
299 continue
300
301 # Use inferred type if no explicit type
302 item_type = item.get("type", inferred_type)
303 if parsed_item := self._parse_item(item, type_counts, item_type):
304 result.append(parsed_item)
305
306 def _parse_item(
307 self,
308 item: dict[str, Any],
309 type_counts: dict[MediaType, int],
310 item_type: str = "",
311 ) -> Playlist | Album | Track | Artist | None:
312 """Parse a single item from Tidal data into a media item.
313
314 Args:
315 item: Dictionary containing item data
316 type_counts: Dictionary to track counts by media type
317 item_type: Optional item type hint
318
319 Returns:
320 Parsed media item or None if parsing failed
321 """
322 # Handle nested item structure
323 if not item_type and isinstance(item, dict) and "type" in item and "item" in item:
324 item_type = item["type"]
325 item = item["item"]
326
327 # If no explicit type, try to infer from structure
328 if not item_type:
329 if "mixId" in item or "mixType" in item:
330 item_type = "MIX"
331 elif "uuid" in item:
332 item_type = "PLAYLIST"
333 elif "id" in item and "duration" in item and "album" in item:
334 item_type = "TRACK"
335 elif "id" in item and "numberOfTracks" in item and "artists" in item:
336 item_type = "ALBUM"
337 elif "id" in item and "picture" in item and "name" in item and "album" not in item:
338 item_type = "ARTIST"
339
340 # Parse based on detected type
341 try:
342 if item_type == "MIX":
343 media_item: Playlist | Album | Track | Artist = parse_playlist(
344 self.provider, item, is_mix=True
345 )
346 type_counts[MediaType.PLAYLIST] += 1
347 return media_item
348 if item_type == "PLAYLIST":
349 media_item = parse_playlist(self.provider, item)
350 type_counts[MediaType.PLAYLIST] += 1
351 return media_item
352 if item_type == "ALBUM":
353 media_item = parse_album(self.provider, item)
354 type_counts[MediaType.ALBUM] += 1
355 return media_item
356 if item_type == "TRACK":
357 media_item = parse_track(self.provider, item)
358 type_counts[MediaType.TRACK] += 1
359 return media_item
360 if item_type == "ARTIST":
361 media_item = parse_artist(self.provider, item)
362 type_counts[MediaType.ARTIST] += 1
363 return media_item
364 # Last resort - try to infer from structure for unlabeled items
365 if "uuid" in item:
366 media_item = parse_playlist(self.provider, item)
367 type_counts[MediaType.PLAYLIST] += 1
368 return media_item
369 if "id" in item and "title" in item and "duration" in item:
370 media_item = parse_track(self.provider, item)
371 type_counts[MediaType.TRACK] += 1
372 return media_item
373 if "id" in item and "title" in item and "numberOfTracks" in item:
374 media_item = parse_album(self.provider, item)
375 type_counts[MediaType.ALBUM] += 1
376 return media_item
377
378 self.logger.warning("Unknown item type, could not parse: %s", item)
379 return None
380
381 except (KeyError, ValueError, TypeError) as err:
382 self.logger.debug("Error parsing %s item: %s", item_type, err)
383 return None
384 except AttributeError as err:
385 self.logger.debug("Attribute error parsing %s item: %s", item_type, err)
386 return None
387 except (json.JSONDecodeError, UnicodeError) as err:
388 self.logger.debug("JSON/Unicode error parsing %s item: %s", item_type, err)
389 return None
390
391 @classmethod
392 async def from_cache(cls, provider: TidalProvider, page_path: str) -> TidalPageParser | None:
393 """Create a parser instance from cached data if available and valid."""
394 cached_data = await provider.mass.cache.get(
395 page_path,
396 provider=provider.instance_id,
397 category=CACHE_CATEGORY_RECOMMENDATIONS,
398 )
399 if not cached_data:
400 return None
401
402 parser = cls(provider)
403 parser._page_path = page_path
404 parser._module_map = cached_data.get("module_map", [])
405 parser._content_map = cached_data.get("content_map", {})
406 parser._parsed_at = cached_data.get("parsed_at", 0)
407
408 return parser
409
410 @property
411 def content_stats(self) -> dict[str, int | float]:
412 """Get statistics about the parsed content."""
413 stats = {
414 "modules": len(self._module_map),
415 "cache_age_minutes": (time.time() - self._parsed_at) / 60,
416 }
417
418 for media_type, items in self._content_map.items():
419 stats[f"{media_type.lower()}_count"] = len(items)
420
421 return stats
422