music-assistant-server

6.6 KBPY
helpers.py
6.6 KB218 lines • python
1"""Helper functions for Podcast Index provider."""
2
3from __future__ import annotations
4
5import hashlib
6import time
7from datetime import UTC, datetime
8from typing import TYPE_CHECKING, Any
9
10import aiohttp
11from music_assistant_models.enums import ContentType, ImageType, MediaType
12from music_assistant_models.errors import (
13    InvalidDataError,
14    LoginFailed,
15    ProviderUnavailableError,
16)
17from music_assistant_models.media_items import (
18    AudioFormat,
19    ItemMapping,
20    MediaItemImage,
21    Podcast,
22    PodcastEpisode,
23    ProviderMapping,
24    UniqueList,
25)
26
27from .constants import API_BASE_URL
28
29if TYPE_CHECKING:
30    from music_assistant.mass import MusicAssistant
31
32
async def make_api_request(
    mass: MusicAssistant,
    api_key: str,
    api_secret: str,
    endpoint: str,
    params: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """
    Make authenticated request to Podcast Index API.

    The request is signed with a SHA1 hash of API key, secret, and the current
    Unix timestamp, sent in the X-Auth-Key / X-Auth-Date / Authorization headers.

    :param mass: MusicAssistant instance; its shared ``http_session`` performs the GET.
    :param api_key: API key, sent as-is and included in the signature.
    :param api_secret: API secret, only used as input to the signature hash.
    :param endpoint: Path appended to ``API_BASE_URL``.
    :param params: Optional query parameters (``None`` is sent as no parameters).
    :return: The decoded JSON object returned by the API.
    :raises LoginFailed: If the API answers with HTTP 401.
    :raises InvalidDataError: If the body is not a JSON object, or its ``status``
        field is not "true" (the API's ``description`` is used as message if present).
    :raises ProviderUnavailableError: On connection failures, timeouts, any other
        HTTP error status, or any other aiohttp client error.
    """
    # Prepare authentication headers
    auth_date = str(int(time.time()))
    auth_hash = hashlib.sha1(f"{api_key}{api_secret}{auth_date}".encode()).hexdigest()

    headers = {
        "X-Auth-Key": api_key,
        "X-Auth-Date": auth_date,
        "Authorization": auth_hash,
    }

    url = f"{API_BASE_URL}/{endpoint}"

    try:
        async with mass.http_session.get(url, headers=headers, params=params or {}) as response:
            response.raise_for_status()

            try:
                data = await response.json()
            except (aiohttp.ContentTypeError, ValueError) as err:
                # ContentTypeError: unexpected mime type.
                # ValueError: covers json.JSONDecodeError / UnicodeDecodeError raised
                # when the body itself is malformed despite a JSON content type.
                raise InvalidDataError("Invalid JSON response from API") from err

    except aiohttp.ClientConnectorError as err:
        raise ProviderUnavailableError(f"Failed to connect to Podcast Index API: {err}") from err
    except (aiohttp.ServerTimeoutError, TimeoutError) as err:
        # A total-request timeout surfaces as the builtin/asyncio TimeoutError,
        # which is not an aiohttp.ClientError, so it is caught explicitly.
        raise ProviderUnavailableError(f"Podcast Index API timeout: {err}") from err
    except aiohttp.ClientResponseError as err:
        if err.status == 401:
            raise LoginFailed(f"Authentication failed: {err.status}") from err
        raise ProviderUnavailableError(f"API request failed: {err.status}") from err
    except aiohttp.ClientError as err:
        # Remaining transport-level failures (disconnects, payload errors, ...).
        raise ProviderUnavailableError(f"API request failed: {err}") from err

    # Valid JSON that is not an object (e.g. a list or string) has no ``.get``.
    if not isinstance(data, dict):
        raise InvalidDataError("Invalid JSON response from API")

    # ``status`` is compared case-insensitively via str() so that both the string
    # "true" and a JSON boolean true are accepted.
    if str(data.get("status")).lower() != "true":
        raise InvalidDataError(data.get("description") or "API error")

    return data
81
82
def parse_podcast_from_feed(
    feed_data: dict[str, Any], instance_id: str, domain: str
) -> Podcast | None:
    """Parse podcast from API feed data.

    :param feed_data: A single feed object as returned by the API.
    :param instance_id: Provider instance id stored on the podcast, its mapping and image.
    :param domain: Provider domain stored on the provider mapping.
    :return: The parsed Podcast, or None when the feed has no ``url`` or no ``id``.
    """
    feed_url = feed_data.get("url")
    podcast_id = feed_data.get("id")

    if not feed_url or not podcast_id:
        return None

    item_id = str(podcast_id)

    # Use ``or`` rather than a ``.get`` default: the default only applies when the
    # key is absent, not when the API sends an explicit null or empty string.
    podcast = Podcast(
        item_id=item_id,
        name=feed_data.get("title") or "Unknown Podcast",
        publisher=feed_data.get("author") or feed_data.get("ownerName") or "Unknown",
        provider=instance_id,
        provider_mappings={
            ProviderMapping(
                item_id=item_id,
                provider_domain=domain,
                provider_instance=instance_id,
                url=feed_url,
            )
        },
    )

    # Add metadata
    podcast.metadata.description = feed_data.get("description") or ""
    podcast.metadata.explicit = bool(feed_data.get("explicit", False))

    # Set episode count only if provided (and numeric); a malformed count must
    # not abort parsing of an otherwise valid feed.
    episode_count = feed_data.get("episodeCount")
    if episode_count is not None:
        try:
            podcast.total_episodes = int(episode_count)
        except (TypeError, ValueError):
            pass  # keep the model's default

    # Add image - prefer 'image' field, fallback to 'artwork'
    image_url = feed_data.get("image") or feed_data.get("artwork")
    if image_url:
        podcast.metadata.add_image(
            MediaItemImage(
                type=ImageType.THUMB,
                path=image_url,
                provider=instance_id,
                remotely_accessible=True,
            )
        )

    # Add categories as genres - categories is a dict {id: name}
    categories = feed_data.get("categories")
    if categories and isinstance(categories, dict):
        podcast.metadata.genres = set(categories.values())

    # Add language
    language = feed_data.get("language")
    if language:
        podcast.metadata.languages = UniqueList([language])

    return podcast
140
141
142def parse_episode_from_data(
143    episode_data: dict[str, Any],
144    podcast_id: str,
145    episode_idx: int,
146    instance_id: str,
147    domain: str,
148    podcast_name: str | None = None,
149) -> PodcastEpisode | None:
150    """Parse episode from API episode data."""
151    episode_api_id = episode_data.get("id")
152    if not episode_api_id:
153        return None
154
155    episode_id = f"{podcast_id}|{episode_api_id}"
156
157    position = episode_data.get("episode")
158    if position is None:
159        position = episode_idx + 1
160
161    if podcast_name is None:
162        podcast_name = episode_data.get("feedTitle") or "Unknown Podcast"
163
164    raw_duration = episode_data.get("duration")
165    try:
166        duration = int(raw_duration) if raw_duration is not None else 0
167    except (ValueError, TypeError):
168        duration = 0
169
170    episode = PodcastEpisode(
171        item_id=episode_id,
172        provider=instance_id,
173        name=episode_data.get("title", "Unknown Episode"),
174        duration=duration,
175        position=position,
176        podcast=ItemMapping(
177            item_id=podcast_id,
178            provider=instance_id,
179            name=podcast_name,
180            media_type=MediaType.PODCAST,
181        ),
182        provider_mappings={
183            ProviderMapping(
184                item_id=episode_id,
185                provider_domain=domain,
186                provider_instance=instance_id,
187                available=True,
188                audio_format=AudioFormat(
189                    content_type=ContentType.try_parse(
190                        episode_data.get("enclosureType") or "audio/mpeg"
191                    ),
192                ),
193                url=episode_data.get("enclosureUrl"),
194            )
195        },
196    )
197
198    # Add metadata
199    episode.metadata.description = episode_data.get("description", "")
200    episode.metadata.explicit = bool(episode_data.get("explicit", 0))
201
202    date_published = episode_data.get("datePublished")
203    if date_published:
204        episode.metadata.release_date = datetime.fromtimestamp(date_published, tz=UTC)
205
206    image_url = episode_data.get("image") or episode_data.get("feedImage")
207    if image_url:
208        episode.metadata.add_image(
209            MediaItemImage(
210                type=ImageType.THUMB,
211                path=image_url,
212                provider=instance_id,
213                remotely_accessible=True,
214            )
215        )
216
217    return episode
218