music-assistant-server

6.7 KBPY
api_client.py
6.7 KB202 lines • python
1"""API Client for Tidal."""
2
3from __future__ import annotations
4
5import json
6from typing import TYPE_CHECKING, Any, cast
7
8from music_assistant_models.errors import (
9    LoginFailed,
10    MediaNotFoundError,
11    ResourceTemporarilyUnavailable,
12)
13
14from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
15
16if TYPE_CHECKING:
17    from collections.abc import AsyncGenerator
18
19    from aiohttp import ClientResponse
20
21    from .provider import TidalProvider
22
23
class TidalAPIClient:
    """Client for interacting with Tidal API.

    All requests go through the provider's shared HTTP session. Each request is
    sent with the provider's bearer token, an ``Accept-Language`` header derived
    from the configured metadata locale and, when available, the ``sessionId``
    and ``countryCode`` query parameters. HTTP error statuses are translated
    into Music Assistant error types.
    """

    BASE_URL: str = "https://api.tidal.com/v1"
    BASE_URL_V2: str = "https://api.tidal.com/v2"
    OPEN_API_URL: str = "https://openapi.tidal.com/v2"

    # Backoff (in seconds) used for HTTP 429 responses that carry no usable
    # Retry-After header.
    DEFAULT_RETRY_AFTER: int = 30

    # Define throttler here for use by the client
    # NOTE(review): presumably looked up by ``throttle_with_retries`` on the
    # instance - confirm against the helper's implementation.
    throttler = ThrottlerManager(rate_limit=1, period=2)

    def __init__(self, provider: TidalProvider):
        """Initialize API client.

        :param provider: The owning Tidal provider; its auth manager, logger and
            ``mass`` instance are reused by this client.
        """
        self.provider = provider
        self.auth = provider.auth
        self.logger = provider.logger
        self.mass = provider.mass

    async def get(
        self, endpoint: str, **kwargs: Any
    ) -> dict[str, Any] | tuple[dict[str, Any], str]:
        """Get data from Tidal API.

        :param endpoint: Path relative to the base URL.
        :param kwargs: Forwarded to ``_request`` (e.g. ``params``, ``headers``,
            ``base_url``, ``return_etag``).
        :return: The parsed response, or ``(response, etag)`` when
            ``return_etag=True`` was passed.
        """
        return await self._request("GET", endpoint, **kwargs)

    async def get_data(self, endpoint: str, **kwargs: Any) -> dict[str, Any]:
        """Get data from Tidal API, discarding headers/ETags.

        Same as ``get`` but always returns only the parsed response body, even
        when ``return_etag=True`` was passed.
        """
        result = await self.get(endpoint, **kwargs)
        return result[0] if isinstance(result, tuple) else result

    @staticmethod
    def _attach_body(kwargs: dict[str, Any], data: dict[str, Any] | None, as_form: bool) -> None:
        """Store ``data`` in ``kwargs`` as either a form-encoded or a JSON body."""
        if not as_form:
            kwargs["json"] = data
            return
        # Work on a copy so the caller's headers mapping is never mutated.
        headers = dict(kwargs.get("headers") or {})
        headers["Content-Type"] = "application/x-www-form-urlencoded"
        kwargs["headers"] = headers
        kwargs["data"] = data

    async def post(
        self,
        endpoint: str,
        data: dict[str, Any] | None = None,
        as_form: bool = False,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Send POST data to Tidal API.

        :param endpoint: Path relative to the base URL.
        :param data: Payload to send.
        :param as_form: Send ``data`` form-urlencoded instead of as JSON.
        :param kwargs: Forwarded to ``_request``.
        """
        self._attach_body(kwargs, data, as_form)
        return cast("dict[str, Any]", await self._request("POST", endpoint, **kwargs))

    async def put(
        self,
        endpoint: str,
        data: dict[str, Any] | None = None,
        as_form: bool = False,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Send PUT data to Tidal API.

        :param endpoint: Path relative to the base URL.
        :param data: Payload to send.
        :param as_form: Send ``data`` form-urlencoded instead of as JSON.
        :param kwargs: Forwarded to ``_request``.
        """
        # Special handling for mixes which use V2
        if "mixes" in endpoint and "base_url" not in kwargs:
            kwargs["base_url"] = self.BASE_URL_V2

        self._attach_body(kwargs, data, as_form)
        return cast("dict[str, Any]", await self._request("PUT", endpoint, **kwargs))

    async def delete(
        self, endpoint: str, data: dict[str, Any] | None = None, **kwargs: Any
    ) -> dict[str, Any]:
        """Delete data from Tidal API.

        :param endpoint: Path relative to the base URL.
        :param data: Optional payload, passed as the request's ``json`` argument.
        :param kwargs: Forwarded to ``_request``.
        """
        kwargs["json"] = data
        return cast("dict[str, Any]", await self._request("DELETE", endpoint, **kwargs))

    @throttle_with_retries  # type: ignore[type-var]
    async def _request(
        self, method: str, endpoint: str, **kwargs: Any
    ) -> dict[str, Any] | tuple[dict[str, Any], str]:
        """Handle API requests internally.

        Recognised keyword arguments (all others are passed through to the HTTP
        session's ``request`` call):

        - ``base_url``: overrides ``BASE_URL``.
        - ``headers``: extra request headers.
        - ``params``: extra query parameters.
        - ``return_etag``: also return the response's ``ETag`` header.

        :raises LoginFailed: If no valid token could be obtained.
        """
        if not await self.auth.ensure_valid_token():
            raise LoginFailed("Failed to authenticate with Tidal")

        # Prepare URL
        base_url = kwargs.pop("base_url", self.BASE_URL)
        url = f"{base_url}/{endpoint}"

        # Prepare Headers (copied, so the caller's mapping is left untouched)
        headers = dict(kwargs.pop("headers", None) or {})
        headers["Authorization"] = f"Bearer {self.auth.access_token}"

        locale = self.mass.metadata.locale.replace("_", "-")
        language = locale.split("-")[0]
        headers["Accept-Language"] = f"{locale}, {language};q=0.9, *;q=0.5"

        # Prepare Params (copied, so the caller's mapping is left untouched)
        params = dict(kwargs.pop("params", None) or {})
        if self.auth.session_id:
            params["sessionId"] = self.auth.session_id
        if self.auth.country_code:
            params["countryCode"] = self.auth.country_code

        # Extract special handling flags
        return_etag = kwargs.pop("return_etag", False)

        self.logger.debug("Making %s request to Tidal API: %s", method, endpoint)

        async with self.mass.http_session.request(
            method, url, headers=headers, params=params, **kwargs
        ) as response:
            return await self._handle_response(response, return_etag)

    @classmethod
    def _parse_retry_after(cls, value: str | None) -> int:
        """Return the Retry-After delay in seconds, or the default when unusable."""
        if value is None:
            return cls.DEFAULT_RETRY_AFTER
        try:
            return int(value)
        except (TypeError, ValueError):
            # Retry-After may also be an HTTP-date; fall back to the default
            # instead of failing with an unrelated ValueError.
            return cls.DEFAULT_RETRY_AFTER

    async def _handle_response(
        self, response: ClientResponse, return_etag: bool = False
    ) -> dict[str, Any] | tuple[dict[str, Any], str]:
        """Handle API response and common error conditions.

        :param response: The HTTP response to interpret.
        :param return_etag: When true, return ``(data, etag)`` where ``etag`` is
            the ``ETag`` response header (empty string when absent).
        :return: The decoded JSON body, or ``{"success": True}`` for a 204 /
            zero-length response.
        :raises LoginFailed: On HTTP 401.
        :raises MediaNotFoundError: On HTTP 404.
        :raises ResourceTemporarilyUnavailable: On HTTP 429 (with a backoff
            time), on any other status >= 400, or when the body is not valid
            JSON.
        """
        status = response.status
        if status == 401:
            raise LoginFailed("Authentication failed")
        if status == 404:
            raise MediaNotFoundError(f"Item not found: {response.url}")
        if status == 429:
            retry_after = self._parse_retry_after(response.headers.get("Retry-After"))
            raise ResourceTemporarilyUnavailable(
                "Tidal Rate limit reached", backoff_time=retry_after
            )
        if status >= 400:
            text = await response.text()
            self.logger.error("API error: %s - %s", status, text)
            raise ResourceTemporarilyUnavailable("API error")

        data: dict[str, Any]
        if status == 204 or response.content_length == 0:
            # Nothing to decode; report success with a synthetic payload.
            data = {"success": True}
        else:
            try:
                data = await response.json()
            except json.JSONDecodeError as err:
                raise ResourceTemporarilyUnavailable("Failed to parse response") from err

        if return_etag:
            return data, response.headers.get("ETag", "")
        return data

    async def paginate(
        self,
        endpoint: str,
        item_key: str = "items",
        limit: int = 50,
        cursor_based: bool = False,
        **kwargs: Any,
    ) -> AsyncGenerator[Any, None]:
        """Paginate through all items from a Tidal API endpoint.

        Pages are requested until a page contains no items or, in cursor mode,
        until the response carries no ``cursor`` value.

        :param endpoint: Path relative to the base URL.
        :param item_key: Key in each response that holds the page's items.
        :param limit: Page size sent as the ``limit`` query parameter.
        :param cursor_based: Page with a ``cursor`` parameter instead of
            ``offset``.
        :param kwargs: Forwarded to ``get``. A ``params`` mapping is merged into
            the query of every page; it may override ``limit`` and may provide
            the starting ``offset`` (or ``cursor`` in cursor mode).
        """
        # Take the caller's params out of kwargs once, before the loop, so they
        # are applied to every page and not only the first one.
        extra_params: dict[str, Any] = dict(kwargs.pop("params", None) or {})

        offset = 0
        cursor = None
        # The paging position is owned by this loop; a caller-supplied value is
        # only used as the starting point so that paging always advances.
        if cursor_based:
            cursor = extra_params.pop("cursor", None)
        else:
            offset = int(extra_params.pop("offset", 0))

        while True:
            params: dict[str, Any] = {"limit": limit, **extra_params}
            if cursor_based:
                if cursor:
                    params["cursor"] = cursor
            else:
                params["offset"] = offset

            api_result = await self.get(endpoint, params=params, **kwargs)
            response = api_result[0] if isinstance(api_result, tuple) else api_result

            items = response.get(item_key, [])
            if not items:
                break

            for item in items:
                yield item

            if cursor_based:
                cursor = response.get("cursor")
                if not cursor:
                    break
            else:
                offset += len(items)
202