# music-assistant-server: tidal_page_parser.py (Python, 422 lines, 16 KB)
1"""Parser for Tidal page structures with lazy loading."""
2
from __future__ import annotations

import json
import time
from typing import TYPE_CHECKING, Any

from music_assistant_models.enums import MediaType

from .constants import CACHE_CATEGORY_RECOMMENDATIONS
from .parsers import parse_album, parse_artist, parse_playlist, parse_track

if TYPE_CHECKING:
    from collections.abc import Callable

    from music_assistant_models.media_items import Album, Artist, Playlist, Track

    from .provider import TidalProvider
18
19
class TidalPageParser:
    """Parser for Tidal page structures with lazy loading.

    ``parse_page_structure`` only indexes the modules found on a page and keeps
    their raw payload; the media items of a module are parsed on demand by
    ``get_module_items``.
    """

    def __init__(self, provider: TidalProvider) -> None:
        """Initialize the parser with the Tidal provider instance."""
        self.provider = provider
        self.logger = provider.logger
        # Per-type content index; overwritten from cached data in ``from_cache``.
        self._content_map: dict[str, dict[str, Any]] = {
            "MIX": {},
            "PLAYLIST": {},
            "ALBUM": {},
            "TRACK": {},
            "ARTIST": {},
        }
        # One entry per module, in page order (see ``parse_page_structure``).
        self._module_map: list[dict[str, Any]] = []
        self._page_path: str | None = None
        # Unix timestamp (seconds) of the last parse; 0 means "never parsed".
        self._parsed_at: int = 0

    def parse_page_structure(self, page_data: dict[str, Any], page_path: str) -> None:
        """Parse Tidal page structure into indexed modules.

        Resets the module map and records, for every module of every row, its
        title, type, raw payload, running module index and row index.

        Args:
            page_data: Page payload; modules are read from ``rows[*].modules``.
            page_path: Path of the page, stored for later reference.
        """
        self._page_path = page_path
        self._parsed_at = int(time.time())
        self._module_map = []

        # ``or []`` also covers keys that are present but null in the payload.
        for row_idx, row in enumerate(page_data.get("rows") or []):
            if not isinstance(row, dict):
                continue
            for module in row.get("modules") or []:
                if not isinstance(module, dict):
                    continue
                # Store basic module info for later (lazy) processing.
                self._module_map.append(
                    {
                        "title": module.get("title", ""),
                        "type": module.get("type", ""),
                        "raw_data": module,
                        # The map length before appending is the running module index.
                        "module_idx": len(self._module_map),
                        "row_idx": row_idx,
                    }
                )

    def get_module_items(
        self, module_info: dict[str, Any]
    ) -> tuple[list[Playlist | Album | Track | Artist], MediaType]:
        """Extract media items from a module with simplified type handling.

        Args:
            module_info: Module entry as stored by ``parse_page_structure``.

        Returns:
            The parsed media items and the media type that occurs most often
            among them (``MediaType.PLAYLIST`` when nothing was parsed).
        """
        result: list[Playlist | Album | Track | Artist] = []
        type_counts: dict[MediaType, int] = {
            MediaType.PLAYLIST: 0,
            MediaType.ALBUM: 0,
            MediaType.TRACK: 0,
            MediaType.ARTIST: 0,
        }

        module_data = module_info.get("raw_data") or {}
        module_type = module_data.get("type", "")

        self.logger.debug(
            "Processing module type: %s, title: %s",
            module_type,
            module_data.get("title", "Unknown"),
        )

        # Process module based on type
        self._process_module_by_type(module_data, module_type, result, type_counts)

        # Determine the primary content type based on counts
        primary_type = self._determine_primary_type(type_counts)

        self._log_module_results(module_data, result, type_counts)

        return result, primary_type

    def _process_module_by_type(
        self,
        module_data: dict[str, Any],
        module_type: str,
        result: list[Playlist | Album | Track | Artist],
        type_counts: dict[MediaType, int],
    ) -> None:
        """Process module content based on module type.

        Parsed items are appended to ``result`` and counted in ``type_counts``.
        """
        # Extract paged list if present (most modules have this); tolerate nulls.
        paged_list = module_data.get("pagedList") or {}
        items = paged_list.get("items") or []

        # Highlight modules keep their content outside the paged list.
        if module_type == "HIGHLIGHT_MODULE":
            self._process_highlight_module(module_data, result, type_counts)
            return

        list_handlers = {
            "PLAYLIST_LIST": self._process_playlist_list,
            "TRACK_LIST": self._process_track_list,
            "ALBUM_LIST": self._process_album_list,
            "ARTIST_LIST": self._process_artist_list,
            "MIX_LIST": self._process_mix_list,
        }
        handler = list_handlers.get(module_type) if isinstance(module_type, str) else None
        # Generic fallback for other module types
        (handler or self._process_generic_items)(items, result, type_counts)

    def _parse_items_into(
        self,
        items: list[dict[str, Any]],
        parse_one: Callable[[dict[str, Any]], Playlist | Album | Track | Artist],
        media_type: MediaType,
        error_message: str,
        result: list[Playlist | Album | Track | Artist],
        type_counts: dict[MediaType, int],
    ) -> None:
        """Parse every dict in ``items`` with ``parse_one`` and record the outcome.

        Non-dict entries are skipped. A ``KeyError``, ``ValueError`` or
        ``TypeError`` raised for one item is logged as a warning using
        ``error_message`` (a ``%s`` logging format) and does not stop the loop.
        """
        for item in items:
            if not isinstance(item, dict):
                continue
            try:
                result.append(parse_one(item))
                type_counts[media_type] += 1
            except (KeyError, ValueError, TypeError) as err:
                self.logger.warning(error_message, err)

    def _process_playlist_list(
        self,
        items: list[dict[str, Any]],
        result: list[Playlist | Album | Track | Artist],
        type_counts: dict[MediaType, int],
    ) -> None:
        """Process items from a PLAYLIST_LIST module.

        Items carrying a ``mixId`` or ``mixType`` key are parsed as mixes.
        """
        self._parse_items_into(
            items,
            lambda entry: parse_playlist(
                self.provider, entry, is_mix="mixId" in entry or "mixType" in entry
            ),
            MediaType.PLAYLIST,
            "Error parsing playlist: %s",
            result,
            type_counts,
        )

    def _process_track_list(
        self,
        items: list[dict[str, Any]],
        result: list[Playlist | Album | Track | Artist],
        type_counts: dict[MediaType, int],
    ) -> None:
        """Process items from a TRACK_LIST module."""
        self._parse_items_into(
            items,
            lambda entry: parse_track(self.provider, entry),
            MediaType.TRACK,
            "Error parsing track: %s",
            result,
            type_counts,
        )

    def _process_album_list(
        self,
        items: list[dict[str, Any]],
        result: list[Playlist | Album | Track | Artist],
        type_counts: dict[MediaType, int],
    ) -> None:
        """Process items from an ALBUM_LIST module."""
        self._parse_items_into(
            items,
            lambda entry: parse_album(self.provider, entry),
            MediaType.ALBUM,
            "Error parsing album: %s",
            result,
            type_counts,
        )

    def _process_artist_list(
        self,
        items: list[dict[str, Any]],
        result: list[Playlist | Album | Track | Artist],
        type_counts: dict[MediaType, int],
    ) -> None:
        """Process items from an ARTIST_LIST module."""
        self._parse_items_into(
            items,
            lambda entry: parse_artist(self.provider, entry),
            MediaType.ARTIST,
            "Error parsing artist: %s",
            result,
            type_counts,
        )

    def _process_mix_list(
        self,
        items: list[dict[str, Any]],
        result: list[Playlist | Album | Track | Artist],
        type_counts: dict[MediaType, int],
    ) -> None:
        """Process items from a MIX_LIST module (mixes are counted as playlists)."""
        self._parse_items_into(
            items,
            lambda entry: parse_playlist(self.provider, entry, is_mix=True),
            MediaType.PLAYLIST,
            "Error parsing mix: %s",
            result,
            type_counts,
        )

    def _process_generic_items(
        self,
        items: list[dict[str, Any]],
        result: list[Playlist | Album | Track | Artist],
        type_counts: dict[MediaType, int],
    ) -> None:
        """Process items with generic type detection.

        Each dict item is handed to ``_parse_item``, which detects its type from
        the item structure; non-dict items are skipped.
        """
        for item in items:
            if not isinstance(item, dict):
                continue
            try:
                parsed_item = self._parse_item(item, type_counts)
            except (KeyError, ValueError, TypeError) as err:
                self.logger.warning("Error parsing generic item: %s", err)
                continue
            if parsed_item:
                result.append(parsed_item)

    def _log_module_results(
        self,
        module_data: dict[str, Any],
        result: list[Playlist | Album | Track | Artist],
        type_counts: dict[MediaType, int],
    ) -> None:
        """Log detailed module processing results."""
        self.logger.debug(
            "Module '%s' processed: %d items (%d playlists, %d albums, %d tracks, %d artists)",
            module_data.get("title", "Unknown"),
            len(result),
            type_counts[MediaType.PLAYLIST],
            type_counts[MediaType.ALBUM],
            type_counts[MediaType.TRACK],
            type_counts[MediaType.ARTIST],
        )

    def _determine_primary_type(self, type_counts: dict[MediaType, int]) -> MediaType:
        """Determine the primary media type based on item counts.

        Returns the type with the highest positive count (the first one in dict
        order on ties) and ``MediaType.PLAYLIST`` when no count is positive.
        """
        if type_counts:
            # ``max`` returns the first maximal entry, so ties keep dict order.
            top_type, top_count = max(type_counts.items(), key=lambda pair: pair[1])
            if top_count > 0:
                return top_type
        return MediaType.PLAYLIST  # Default

    def _process_highlight_module(
        self,
        module_data: dict[str, Any],
        result: list[Playlist | Album | Track | Artist],
        type_counts: dict[MediaType, int],
    ) -> None:
        """Process highlights from a HIGHLIGHT_MODULE.

        Every dict entry under ``highlight`` contributes its ``item`` payload,
        parsed with the entry's ``type`` as a hint.
        """
        for highlight in module_data.get("highlight") or []:
            if not isinstance(highlight, dict):
                continue
            highlight_item = highlight.get("item", {})
            if not isinstance(highlight_item, dict):
                continue
            highlight_type = highlight.get("type", "")
            if parsed_item := self._parse_item(highlight_item, type_counts, highlight_type):
                result.append(parsed_item)

    def _process_paged_list(
        self,
        module_data: dict[str, Any],
        module_type: str,
        result: list[Playlist | Album | Track | Artist],
        type_counts: dict[MediaType, int],
    ) -> None:
        """Process items from a paged list module.

        Items without an explicit ``type`` fall back to the type implied by the
        module type (for example ``ALBUM_LIST`` implies ``ALBUM``).
        """
        paged_list = module_data.get("pagedList") or {}
        items = paged_list.get("items") or []

        # Handle module-specific type inference; ARTIST_LIST is included so this
        # agrees with the module types handled by ``_process_module_by_type``.
        inferred_type: str | None = None
        if module_type in {"ALBUM_LIST", "TRACK_LIST", "PLAYLIST_LIST", "MIX_LIST", "ARTIST_LIST"}:
            inferred_type = module_type.replace("_LIST", "")

        for item in items:
            if not item or not isinstance(item, dict):
                continue

            # Use inferred type if no explicit type
            item_type = item.get("type", inferred_type)
            if parsed_item := self._parse_item(item, type_counts, item_type):
                result.append(parsed_item)

    @staticmethod
    def _infer_item_type(item: dict[str, Any]) -> str:
        """Infer the type name of an untyped item from its keys ("" if unknown)."""
        if "mixId" in item or "mixType" in item:
            return "MIX"
        if "uuid" in item:
            return "PLAYLIST"
        if "id" in item and "duration" in item and "album" in item:
            return "TRACK"
        if "id" in item and "numberOfTracks" in item and "artists" in item:
            return "ALBUM"
        if "id" in item and "picture" in item and "name" in item and "album" not in item:
            return "ARTIST"
        return ""

    @staticmethod
    def _guess_unlabeled_type(item: dict[str, Any]) -> str:
        """Last-resort guess for items whose type label is missing or unknown."""
        if "uuid" in item:
            return "PLAYLIST"
        if "id" in item and "title" in item and "duration" in item:
            return "TRACK"
        if "id" in item and "title" in item and "numberOfTracks" in item:
            return "ALBUM"
        return ""

    def _parse_item(
        self,
        item: dict[str, Any],
        type_counts: dict[MediaType, int],
        item_type: str = "",
    ) -> Playlist | Album | Track | Artist | None:
        """Parse a single item from Tidal data into a media item.

        Args:
            item: Dictionary containing item data, either the item itself or a
                ``{"type": ..., "item": ...}`` wrapper around it
            type_counts: Dictionary to track counts by media type
            item_type: Optional item type hint

        Returns:
            Parsed media item or None if parsing failed
        """
        # Handle nested item structure
        if not item_type and isinstance(item, dict) and "type" in item and "item" in item:
            item_type = item["type"]
            item = item["item"]

        # The wrapped payload may be null or otherwise malformed; the key checks
        # below need a dict, so bail out instead of raising.
        if not isinstance(item, dict):
            self.logger.debug("Skipping %s item with non-dict payload: %r", item_type, item)
            return None

        # If no explicit type, try to infer from structure
        if not item_type:
            item_type = self._infer_item_type(item)

        known_types = ("MIX", "PLAYLIST", "ALBUM", "TRACK", "ARTIST")
        kind = item_type if item_type in known_types else self._guess_unlabeled_type(item)
        if not kind:
            self.logger.warning("Unknown item type, could not parse: %s", item)
            return None

        media_item: Playlist | Album | Track | Artist
        try:
            if kind == "MIX":
                media_item = parse_playlist(self.provider, item, is_mix=True)
                counted_as = MediaType.PLAYLIST
            elif kind == "PLAYLIST":
                media_item = parse_playlist(self.provider, item)
                counted_as = MediaType.PLAYLIST
            elif kind == "ALBUM":
                media_item = parse_album(self.provider, item)
                counted_as = MediaType.ALBUM
            elif kind == "TRACK":
                media_item = parse_track(self.provider, item)
                counted_as = MediaType.TRACK
            else:
                media_item = parse_artist(self.provider, item)
                counted_as = MediaType.ARTIST
            type_counts[counted_as] += 1
            return media_item
        # JSONDecodeError and UnicodeError are ValueError subclasses, so this
        # handler must come before the generic ValueError one to be reachable.
        except (json.JSONDecodeError, UnicodeError) as err:
            self.logger.debug("JSON/Unicode error parsing %s item: %s", item_type, err)
            return None
        except (KeyError, ValueError, TypeError) as err:
            self.logger.debug("Error parsing %s item: %s", item_type, err)
            return None
        except AttributeError as err:
            self.logger.debug("Attribute error parsing %s item: %s", item_type, err)
            return None

    @classmethod
    async def from_cache(cls, provider: TidalProvider, page_path: str) -> TidalPageParser | None:
        """Create a parser instance from cached data if available and valid.

        Args:
            provider: Tidal provider whose cache is queried.
            page_path: Page path, used as the cache key.

        Returns:
            A parser restored from the cached entry, or None when the cache has
            no (non-empty) entry for ``page_path``.
        """
        cached_data = await provider.mass.cache.get(
            page_path,
            provider=provider.instance_id,
            category=CACHE_CATEGORY_RECOMMENDATIONS,
        )
        if not cached_data:
            return None

        parser = cls(provider)
        parser._page_path = page_path
        parser._module_map = cached_data.get("module_map") or []
        # Keep the default per-type keys when the cache entry has no content map,
        # so ``content_stats`` reports the same keys as for a fresh parser.
        parser._content_map = cached_data.get("content_map") or parser._content_map
        parser._parsed_at = cached_data.get("parsed_at") or 0

        return parser

    @property
    def content_stats(self) -> dict[str, int | float]:
        """Get statistics about the parsed content.

        Returns:
            The number of indexed modules, the minutes elapsed since the last
            parse, and one ``<type>_count`` entry per content map key.
        """
        stats: dict[str, int | float] = {
            "modules": len(self._module_map),
            "cache_age_minutes": (time.time() - self._parsed_at) / 60,
        }
        stats.update(
            {f"{media_type.lower()}_count": len(items) for media_type, items in self._content_map.items()}
        )
        return stats
422