music-assistant-server

55.2 KBPY
__init__.py
55.2 KB1,269 lines • python
1"""
2Apple Music musicprovider support for MusicAssistant.
3
4TODO MUSIC_APP_TOKEN expires after 6 months so should have a distribution mechanism outside
  compulsory application updates. It is only a semi-private key in JWT format so could be refreshed
6  daily by a GitHub action and downloaded by the provider each initialise.
7TODO Widevine keys can be obtained dynamically from Apple Music API rather than copied into Docker
8  build. This is undocumented but @maxlyth has a working example.
9TODO MUSIC_USER_TOKEN must be refreshed (~min 180 days) and needs mechanism to prompt user to
10  re-authenticate in browser.
11TODO Current provider ignores private tracks that are not available in the storefront catalog as
  streamable url is derived from the catalog id. It is undocumented but @maxlyth has a working
13  example to get a streamable url from the library id.
14"""
15
16from __future__ import annotations
17
18import base64
19import json
20import os
21import pathlib
22import re
23import time
24from collections.abc import Sequence
25from typing import TYPE_CHECKING, Any
26
27import aiofiles
28from aiohttp import web
29from aiohttp.client_exceptions import ClientError
30from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
31from music_assistant_models.enums import (
32    AlbumType,
33    ConfigEntryType,
34    ContentType,
35    ExternalID,
36    ImageType,
37    MediaType,
38    ProviderFeature,
39    StreamType,
40)
41from music_assistant_models.errors import (
42    LoginFailed,
43    MediaNotFoundError,
44    MusicAssistantError,
45    ResourceTemporarilyUnavailable,
46)
47from music_assistant_models.media_items import (
48    Album,
49    Artist,
50    AudioFormat,
51    BrowseFolder,
52    ItemMapping,
53    MediaItemImage,
54    MediaItemType,
55    Playlist,
56    ProviderMapping,
57    SearchResults,
58    Track,
59    UniqueList,
60)
61from music_assistant_models.streamdetails import StreamDetails
62from pywidevine import PSSH, Cdm, Device, DeviceTypes
63from pywidevine.license_protocol_pb2 import WidevinePsshData
64from shortuuid import uuid
65
66from music_assistant.controllers.cache import use_cache
67from music_assistant.helpers.app_vars import app_var
68from music_assistant.helpers.auth import AuthenticationHelper
69from music_assistant.helpers.json import json_loads
70from music_assistant.helpers.playlists import fetch_playlist
71from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
72from music_assistant.helpers.util import infer_album_type, parse_title_and_version
73from music_assistant.models.music_provider import MusicProvider
74from music_assistant.providers.apple_music.helpers import browse_playlists
75
76if TYPE_CHECKING:
77    from collections.abc import AsyncGenerator
78
79    from music_assistant_models.config_entries import ProviderConfig
80    from music_assistant_models.provider import ProviderManifest
81
82    from music_assistant import MusicAssistant
83    from music_assistant.models import ProviderInstanceType
84
85
# Provider features advertised by this provider; passed to AppleMusicProvider in setup().
SUPPORTED_FEATURES = {
    ProviderFeature.LIBRARY_ARTISTS,
    ProviderFeature.LIBRARY_ALBUMS,
    ProviderFeature.LIBRARY_TRACKS,
    ProviderFeature.LIBRARY_PLAYLISTS,
    ProviderFeature.BROWSE,
    ProviderFeature.SEARCH,
    ProviderFeature.ARTIST_ALBUMS,
    ProviderFeature.ARTIST_TOPTRACKS,
    ProviderFeature.SIMILAR_TRACKS,
    ProviderFeature.LIBRARY_ALBUMS_EDIT,
    ProviderFeature.LIBRARY_ARTISTS_EDIT,
    ProviderFeature.LIBRARY_PLAYLISTS_EDIT,
    ProviderFeature.LIBRARY_TRACKS_EDIT,
    ProviderFeature.FAVORITE_ALBUMS_EDIT,
    ProviderFeature.FAVORITE_TRACKS_EDIT,
    ProviderFeature.FAVORITE_PLAYLISTS_EDIT,
}

# Default MusicKit app token; get_config_entries() sends it as Bearer token to the
# Apple Music API to check whether it is (still) accepted.
# NOTE(review): app_var(8) presumably resolves a bundled application variable by index - confirm.
MUSIC_APP_TOKEN = app_var(8)
# Directory and file names of the Widevine client id / private key that are read in
# AppleMusicProvider.handle_async_init().
WIDEVINE_BASE_PATH = "/usr/local/bin/widevine_cdm"
DECRYPT_CLIENT_ID_FILENAME = "client_id.bin"
DECRYPT_PRIVATE_KEY_FILENAME = "private_key.pem"
# NOTE(review): presumably the fallback name for playlists without a name - confirm at usage.
UNKNOWN_PLAYLIST_NAME = "Unknown Apple Music Playlist"
# Keys of the config entries returned by get_config_entries().
CONF_MUSIC_APP_TOKEN = "music_app_token"
CONF_MUSIC_USER_TOKEN = "music_user_token"
CONF_MUSIC_USER_MANUAL_TOKEN = "music_user_manual_token"
CONF_MUSIC_USER_TOKEN_TIMESTAMP = "music_user_token_timestamp"
# NOTE(review): presumably the cache category used for decryption keys - confirm at usage.
CACHE_CATEGORY_DECRYPT_KEY = 1
115
116
async def setup(
    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
    """Create the Apple Music provider (instance) for the given configuration."""
    provider = AppleMusicProvider(mass, manifest, config, SUPPORTED_FEATURES)
    return provider
122
123
async def get_config_entries(
    mass: MusicAssistant,
    instance_id: str | None = None,
    action: str | None = None,
    values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
    """
    Return Config entries to setup this provider.

    The default MusicKit app token is first checked against the Apple Music API. When it
    is accepted it is stored in ``values`` and the app token field is hidden. When
    ``action`` is ``CONF_ACTION_AUTH`` (and the app token is valid) the MusicKit
    authentication page is served from temporary webserver routes and the resulting
    music user token and its timestamp are stored in ``values``.

    :param mass: MusicAssistant instance (provides the http session and the webserver).
    :param instance_id: id of an existing provider instance (None if new instance setup).
    :param action: [optional] action key called from config entries UI.
    :param values: the (intermediate) raw values for config entries sent with the action.
        Updated in place when given, treated as empty when None.
    :raises LoginFailed: if the authentication flow fails with an unexpected error.
    """
    if values is None:
        # No values were sent: use an empty mapping so that the lookups and assignments
        # below do not fail on None.
        values = {}

    def validate_user_token(token: object) -> bool:
        """Return True if the token is a string that looks like a music user token."""
        if not isinstance(token, str):
            return False
        return re.search(r"[a-zA-Z0-9=/+]{32,}==$", token) is not None

    # Check the default app token with an API call; if it is not accepted the app token
    # config field is displayed instead of hidden.
    default_app_token_valid = False
    async with mass.http_session.get(
        "https://api.music.apple.com/v1/test",
        headers={"Authorization": f"Bearer {MUSIC_APP_TOKEN}"},
        ssl=True,
        timeout=10,
    ) as response:
        if response.status == 200:
            values[CONF_MUSIC_APP_TOKEN] = f"{MUSIC_APP_TOKEN}"
            default_app_token_valid = True

    # Action is to launch MusicKit flow
    if action == "CONF_ACTION_AUTH" and default_app_token_valid:
        callback_method = "POST"
        async with AuthenticationHelper(mass, values["session_id"], callback_method) as auth_helper:
            callback_url = auth_helper.callback_url
            flow_base_path = f"apple_music_auth/{values['session_id']}/"
            flow_timeout = 600
            parent_file_path = pathlib.Path(__file__).parent.resolve()
            base_url = f"{mass.webserver.base_url}/{flow_base_path}"
            flow_base_url = f"{base_url}index.html"

            async def serve_mk_auth_page(request: web.Request) -> web.Response:
                auth_html_path = parent_file_path.joinpath("musickit_auth/musickit_wrapper.html")
                return web.FileResponse(
                    auth_html_path,
                    headers={"content-type": "text/html"},
                )

            async def serve_mk_auth_css(request: web.Request) -> web.Response:
                auth_css_path = parent_file_path.joinpath("musickit_auth/musickit_wrapper.css")
                return web.FileResponse(
                    auth_css_path,
                    headers={
                        "content-type": "text/css",
                    },
                )

            async def serve_mk_glue(request: web.Request) -> web.Response:
                # Only pass a previously stored user token to the page when it looks valid;
                # a missing token or timestamp must not make this handler fail.
                stored_user_token = values.get(CONF_MUSIC_USER_TOKEN)
                user_token = stored_user_token if validate_user_token(stored_user_token) else ""
                user_token_timestamp = values.get(CONF_MUSIC_USER_TOKEN_TIMESTAMP, 0)
                return_html = f"""
                const return_url='{callback_url}';
                const base_url='{base_url}';
                const app_token='{values[CONF_MUSIC_APP_TOKEN]}';
                const callback_method='{callback_method}';
                const user_token='{user_token}';
                const user_token_timestamp='{user_token_timestamp}';
                const flow_timeout={max([flow_timeout - 10, 60])};
                const flow_start_time={int(time.time())};
                const mass_version='{mass.version}';
                """
                return web.Response(
                    body=return_html,
                    headers={
                        "content-type": "text/javascript",
                    },
                )

            # temporary routes that serve the MusicKit authentication page
            flow_routes = {
                f"/{flow_base_path}index.html": serve_mk_auth_page,
                f"/{flow_base_path}index.css": serve_mk_auth_css,
                f"/{flow_base_path}index.js": serve_mk_glue,
            }
            for route_path, route_handler in flow_routes.items():
                mass.webserver.register_dynamic_route(route_path, route_handler)

            try:
                result = await auth_helper.authenticate(flow_base_url, flow_timeout)
                values[CONF_MUSIC_USER_TOKEN] = result["music-user-token"]
                values[CONF_MUSIC_USER_TOKEN_TIMESTAMP] = result["music-user-token-timestamp"]
            except KeyError:
                # no music-user-token URL param was found so likely user cancelled the auth
                pass
            except Exception as error:
                # keep the original error as cause so the traceback is not lost
                raise LoginFailed(f"Failed to authenticate with Apple '{error}'.") from error
            finally:
                for route_path in flow_routes:
                    mass.webserver.unregister_dynamic_route(route_path)

    # The stored user token is only offered as value while its timestamp is an int that is
    # less than 150 days old.
    max_user_token_age = 3600 * 24 * 150
    user_token_timestamp = values.get(CONF_MUSIC_USER_TOKEN_TIMESTAMP)
    user_token_is_recent = isinstance(user_token_timestamp, int) and (
        user_token_timestamp > (time.time() - max_user_token_age)
    )

    # ruff: noqa: ARG001
    return (
        ConfigEntry(
            key=CONF_MUSIC_APP_TOKEN,
            type=ConfigEntryType.SECURE_STRING,
            label="MusicKit App Token",
            hidden=default_app_token_valid,
            required=True,
            value=values.get(CONF_MUSIC_APP_TOKEN),
        ),
        ConfigEntry(
            key=CONF_MUSIC_USER_TOKEN,
            type=ConfigEntryType.SECURE_STRING,
            label="Music User Token",
            required=False,
            action="CONF_ACTION_AUTH",
            description="Authenticate with Apple Music to retrieve a valid music user token.",
            action_label="Authenticate with Apple Music",
            value=values.get(CONF_MUSIC_USER_TOKEN) if user_token_is_recent else None,
        ),
        ConfigEntry(
            key=CONF_MUSIC_USER_MANUAL_TOKEN,
            type=ConfigEntryType.SECURE_STRING,
            label="Manual Music User Token",
            required=False,
            advanced=True,
            description=(
                "Authenticate with a manual Music User Token in case the Authentication flow"
                " is unsupported (e.g. when using child accounts)."
            ),
            help_link="https://www.music-assistant.io/music-providers/apple-music/",
            value=values.get(CONF_MUSIC_USER_MANUAL_TOKEN),
        ),
        ConfigEntry(
            key=CONF_MUSIC_USER_TOKEN_TIMESTAMP,
            type=ConfigEntryType.INTEGER,
            description="Timestamp music user token was updated.",
            label="Music User Token Timestamp",
            hidden=True,
            required=True,
            default_value=0,
            value=values.get(CONF_MUSIC_USER_TOKEN_TIMESTAMP) if values else 0,
        ),
    )
280
281
282class AppleMusicProvider(MusicProvider):
283    """Implementation of an Apple Music MusicProvider."""
284
    # Music user token; set in handle_async_init() from the manual token config value,
    # falling back to the token stored by the authentication flow.
    _music_user_token: str | None = None
    # MusicKit app token; read from the provider config in handle_async_init().
    _music_app_token: str | None = None
    # Storefront returned by _get_user_storefront(); used to build the catalog endpoints.
    _storefront: str | None = None
    # Raw contents of the Widevine client id and private key files (see handle_async_init()).
    _decrypt_client_id: bytes | None = None
    _decrypt_private_key: bytes | None = None
    # Random id that is regenerated on every handle_async_init() call.
    _session_id: str | None = None
    # rate limiter needs to be specified on provider-level,
    # so make it an instance attribute
    # NOTE(review): as written this is assigned in the class body, so the ThrottlerManager
    # object is shared by all instances of this class - confirm that this is intended.
    throttler = ThrottlerManager(rate_limit=1, period=2, initial_backoff=15)
294
295    async def handle_async_init(self) -> None:
296        """Handle async initialization of the provider."""
297        self._music_user_token = self.config.get_value(
298            CONF_MUSIC_USER_MANUAL_TOKEN
299        ) or self.config.get_value(CONF_MUSIC_USER_TOKEN)
300        self._music_app_token = self.config.get_value(CONF_MUSIC_APP_TOKEN)
301        self._storefront = await self._get_user_storefront()
302        # create random session id to use for decryption keys
303        # to invalidate cached keys on each provider initialization
304        self._session_id = str(uuid())
305        async with aiofiles.open(
306            os.path.join(WIDEVINE_BASE_PATH, DECRYPT_CLIENT_ID_FILENAME), "rb"
307        ) as _file:
308            self._decrypt_client_id = await _file.read()
309        async with aiofiles.open(
310            os.path.join(WIDEVINE_BASE_PATH, DECRYPT_PRIVATE_KEY_FILENAME), "rb"
311        ) as _file:
312            self._decrypt_private_key = await _file.read()
313
314    @use_cache()
315    async def search(
316        self, search_query: str, media_types: list[MediaType] | None, limit: int = 5
317    ) -> SearchResults:
318        """Perform search on musicprovider.
319
320        :param search_query: Search query.
321        :param media_types: A list of media_types to include. All types if None.
322        :param limit: Number of items to return in the search (per type).
323        """
324        endpoint = f"catalog/{self._storefront}/search"
325        # Apple music has a limit of 25 items for the search endpoint
326        limit = min(limit, 25)
327        searchresult = SearchResults()
328        searchtypes = []
329        if MediaType.ARTIST in media_types:
330            searchtypes.append("artists")
331        if MediaType.ALBUM in media_types:
332            searchtypes.append("albums")
333        if MediaType.TRACK in media_types:
334            searchtypes.append("songs")
335        if MediaType.PLAYLIST in media_types:
336            searchtypes.append("playlists")
337        if not searchtypes:
338            return searchresult
339        searchtype = ",".join(searchtypes)
340        search_query = search_query.replace("'", "")
341        response = await self._get_data(endpoint, term=search_query, types=searchtype, limit=limit)
342        if "artists" in response["results"]:
343            searchresult.artists += [
344                self._parse_artist(item) for item in response["results"]["artists"]["data"]
345            ]
346        if "albums" in response["results"]:
347            searchresult.albums += [
348                self._parse_album(item) for item in response["results"]["albums"]["data"]
349            ]
350        if "songs" in response["results"]:
351            searchresult.tracks += [
352                self._parse_track(item) for item in response["results"]["songs"]["data"]
353            ]
354        if "playlists" in response["results"]:
355            searchresult.playlists += [
356                self._parse_playlist(item) for item in response["results"]["playlists"]["data"]
357            ]
358        return searchresult
359
360    async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
361        """Browse Apple Music with support for playlist folders."""
362        if not path or "://" not in path:
363            return await super().browse(path)
364        sub_path = path.split("://", 1)[1]
365        path_parts = [part for part in sub_path.split("/") if part]
366        if path_parts and path_parts[0] == "playlists":
367            return await browse_playlists(self, path, path_parts)
368        return await super().browse(path)
369
370    async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
371        """Retrieve library artists from the provider."""
372        endpoint = "me/library/artists"
373        for item in await self._get_all_items(endpoint, include="catalog", extend="editorialNotes"):
374            if item and item["id"]:
375                yield self._parse_artist(item)
376
377    async def get_library_albums(self) -> AsyncGenerator[Album, None]:
378        """Retrieve library albums from the provider."""
379        endpoint = "me/library/albums"
380        album_items = await self._get_all_items(
381            endpoint, include="catalog,artists", extend="editorialNotes"
382        )
383        album_catalog_item_ids = [
384            item["id"]
385            for item in album_items
386            if item and item["id"] and not self.is_library_id(item["id"])
387        ]
388        album_library_item_ids = [
389            item["id"]
390            for item in album_items
391            if item and item["id"] and self.is_library_id(item["id"])
392        ]
393        rating_catalog_response = await self._get_ratings(album_catalog_item_ids, MediaType.ALBUM)
394        rating_library_response = await self._get_ratings(album_library_item_ids, MediaType.ALBUM)
395        for item in album_items:
396            if item and item["id"]:
397                is_favourite = (
398                    rating_catalog_response.get(item["id"])
399                    if not self.is_library_id(item["id"])
400                    else rating_library_response.get(item["id"])
401                )
402                album = self._parse_album(item, is_favourite)
403                if album:
404                    yield album
405
406    async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
407        """Retrieve library tracks from the provider."""
408        endpoint = "me/library/songs"
409        song_catalog_ids = []
410        library_only_tracks = []
411        for item in await self._get_all_items(endpoint):
412            catalog_id = item.get("attributes", {}).get("playParams", {}).get("catalogId")
413            if not catalog_id:
414                # Track is library-only (private/uploaded), use library ID instead
415                library_only_tracks.append(item)
416            else:
417                song_catalog_ids.append(catalog_id)
418        # Obtain catalog info per 150 songs, the documented limit of 300 results in a 504 timeout
419        max_limit = 150
420        for i in range(0, len(song_catalog_ids), max_limit):
421            catalog_ids = song_catalog_ids[i : i + max_limit]
422            catalog_endpoint = f"catalog/{self._storefront}/songs"
423            response = await self._get_data(
424                catalog_endpoint, ids=",".join(catalog_ids), include="artists,albums"
425            )
426            # Fetch ratings for this batch
427            rating_response = await self._get_ratings(catalog_ids, MediaType.TRACK)
428            for item in response["data"]:
429                is_favourite = rating_response.get(item["id"])
430                track = self._parse_track(item, is_favourite)
431                yield track
432        # Yield library-only tracks using their library metadata
433        library_ids = [item["id"] for item in library_only_tracks if item and item["id"]]
434        library_rating_response = await self._get_ratings(library_ids, MediaType.TRACK)
435        for item in library_only_tracks:
436            is_favourite = library_rating_response.get(item["id"])
437            yield self._parse_track(item, is_favourite)
438
439    async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
440        """Retrieve playlists from the provider."""
441        endpoint = "me/library/playlists"
442        playlist_items = await self._get_all_items(endpoint)
443        playlist_library_item_ids = [
444            item["id"]
445            for item in playlist_items
446            if item and item["id"] and self.is_library_id(item["id"])
447        ]
448        rating_library_response = await self._get_ratings(
449            playlist_library_item_ids, MediaType.PLAYLIST
450        )
451        for item in playlist_items:
452            is_favourite = rating_library_response.get(item["id"])
453            # Prefer catalog information over library information in case of public playlists
454            if item["attributes"]["hasCatalog"]:
455                yield await self.get_playlist(
456                    item["attributes"]["playParams"]["globalId"], is_favourite
457                )
458            elif item and item["id"]:
459                yield self._parse_playlist(item, is_favourite)
460
461    @use_cache()
462    async def get_artist(self, prov_artist_id) -> Artist:
463        """Get full artist details by id."""
464        endpoint = f"catalog/{self._storefront}/artists/{prov_artist_id}"
465        response = await self._get_data(endpoint, extend="editorialNotes")
466        return self._parse_artist(response["data"][0])
467
468    @use_cache()
469    async def get_album(self, prov_album_id) -> Album:
470        """Get full album details by id."""
471        endpoint = f"catalog/{self._storefront}/albums/{prov_album_id}"
472        response = await self._get_data(endpoint, include="artists")
473        rating_response = await self._get_ratings([prov_album_id], MediaType.ALBUM)
474        is_favourite = rating_response.get(prov_album_id)
475        return self._parse_album(response["data"][0], is_favourite)
476
477    @use_cache()
478    async def get_track(self, prov_track_id) -> Track:
479        """Get full track details by id."""
480        endpoint = f"catalog/{self._storefront}/songs/{prov_track_id}"
481        response = await self._get_data(endpoint, include="artists,albums")
482        rating_response = await self._get_ratings([prov_track_id], MediaType.TRACK)
483        is_favourite = rating_response.get(prov_track_id)
484        return self._parse_track(response["data"][0], is_favourite)
485
486    @use_cache()
487    async def get_playlist(self, prov_playlist_id, is_favourite: bool = False) -> Playlist:
488        """Get full playlist details by id."""
489        if not self.is_library_id(prov_playlist_id):
490            endpoint = f"catalog/{self._storefront}/playlists/{prov_playlist_id}"
491        else:
492            endpoint = f"me/library/playlists/{prov_playlist_id}"
493        response = await self._get_data(endpoint)
494        return self._parse_playlist(response["data"][0], is_favourite)
495
496    @use_cache()
497    async def get_album_tracks(self, prov_album_id) -> list[Track]:
498        """Get all album tracks for given album id."""
499        endpoint = f"catalog/{self._storefront}/albums/{prov_album_id}/tracks"
500        response = await self._get_data(endpoint, include="artists")
501        # Including albums results in a 504 error, so we need to fetch the album separately
502        album = await self.get_album(prov_album_id)
503        track_ids = [track_obj["id"] for track_obj in response["data"] if "id" in track_obj]
504        rating_response = await self._get_ratings(track_ids, MediaType.TRACK)
505        tracks = []
506        for track_obj in response["data"]:
507            if "id" not in track_obj:
508                continue
509            track = self._parse_track(track_obj, rating_response.get(track_obj["id"]))
510            track.album = album
511            tracks.append(track)
512        return tracks
513
514    @use_cache(3600 * 3)  # cache for 3 hours
515    async def get_playlist_tracks(self, prov_playlist_id, page: int = 0) -> list[Track]:
516        """Get all playlist tracks for given playlist id."""
517        if self._is_catalog_id(prov_playlist_id):
518            endpoint = f"catalog/{self._storefront}/playlists/{prov_playlist_id}/tracks"
519        else:
520            endpoint = f"me/library/playlists/{prov_playlist_id}/tracks"
521        result = []
522        page_size = 100
523        offset = page * page_size
524        response = await self._get_data(
525            endpoint, include="artists,catalog", limit=page_size, offset=offset
526        )
527        if not response or "data" not in response:
528            return result
529        playlist_track_ids = [track["id"] for track in response["data"] if track and track["id"]]
530        rating_response = await self._get_ratings(playlist_track_ids, MediaType.TRACK)
531        for index, track in enumerate(response["data"]):
532            if track and track["id"]:
533                is_favourite = rating_response.get(track["id"])
534                parsed_track = self._parse_track(track, is_favourite)
535                parsed_track.position = offset + index + 1
536                result.append(parsed_track)
537        return result
538
539    @use_cache(3600 * 24 * 7)  # cache for 7 days
540    async def get_artist_albums(self, prov_artist_id) -> list[Album]:
541        """Get a list of all albums for the given artist."""
542        endpoint = f"catalog/{self._storefront}/artists/{prov_artist_id}/albums"
543        try:
544            response = await self._get_all_items(endpoint)
545        except MediaNotFoundError:
546            # Some artists do not have albums, return empty list
547            self.logger.info("No albums found for artist %s", prov_artist_id)
548            return []
549        album_ids = [album["id"] for album in response if album["id"]]
550        rating_response = await self._get_ratings(album_ids, MediaType.ALBUM)
551        albums = []
552        for album in response:
553            if not album["id"]:
554                continue
555            is_favourite = rating_response.get(album["id"])
556            parsed_album = self._parse_album(album, is_favourite)
557            if parsed_album:
558                albums.append(parsed_album)
559        return albums
560
561    @use_cache(3600 * 24 * 7)  # cache for 7 days
562    async def get_artist_toptracks(self, prov_artist_id) -> list[Track]:
563        """Get a list of 10 most popular tracks for the given artist."""
564        endpoint = f"catalog/{self._storefront}/artists/{prov_artist_id}/view/top-songs"
565        try:
566            response = await self._get_data(endpoint)
567        except MediaNotFoundError:
568            # Some artists do not have top tracks, return empty list
569            self.logger.info("No top tracks found for artist %s", prov_artist_id)
570            return []
571        track_ids = [track["id"] for track in response["data"] if track["id"]]
572        rating_response = await self._get_ratings(track_ids, MediaType.TRACK)
573        tracks = []
574        for track in response["data"]:
575            if not track["id"]:
576                continue
577            is_favourite = rating_response.get(track["id"])
578            tracks.append(self._parse_track(track, is_favourite))
579        return tracks
580
581    async def library_add(self, item: MediaItemType) -> None:
582        """Add item to library."""
583        item_type = self._translate_media_type_to_apple_type(item.media_type)
584        kwargs = {
585            f"ids[{item_type}]": item.item_id,
586        }
587        await self._post_data("me/library/", **kwargs)
588
589    async def library_remove(self, prov_item_id, media_type: MediaType) -> None:
590        """Remove item from library."""
591        self.logger.warning(
592            "Deleting items from your library is not yet supported by the Apple Music API. "
593            f"Skipping deletion of {media_type} - {prov_item_id}."
594        )
595
596    async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]):
597        """Add track(s) to playlist."""
598        endpoint = f"me/library/playlists/{prov_playlist_id}/tracks"
599        data = {
600            "data": [
601                {
602                    "id": track_id,
603                    "type": "library-songs" if self.is_library_id(track_id) else "songs",
604                }
605                for track_id in prov_track_ids
606            ]
607        }
608        await self._post_data(endpoint, data=data)
609
610    async def remove_playlist_tracks(
611        self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
612    ) -> None:
613        """Remove track(s) from playlist."""
614        self.logger.warning(
615            "Removing tracks from playlists is not supported by the Apple Music "
616            "API. Make sure to delete them using the Apple Music app."
617        )
618
619    @use_cache(3600 * 24)  # cache for 24 hours
620    async def get_similar_tracks(self, prov_track_id, limit=25) -> list[Track]:
621        """Retrieve a dynamic list of tracks based on the provided item."""
622        # Note, Apple music does not have an official endpoint for similar tracks.
623        # We will use the next-tracks endpoint to get a list of tracks that are similar to the
624        # provided track. However, Apple music only provides 2 tracks at a time, so we will
625        # need to call the endpoint multiple times. Therefore, set a limit to 6 to prevent
626        # flooding the apple music api.
627        limit = 6
628        endpoint = f"me/stations/next-tracks/ra.{prov_track_id}"
629        found_tracks = []
630        while len(found_tracks) < limit:
631            response = await self._post_data(endpoint, include="artists")
632            if not response or "data" not in response:
633                break
634            track_ids = [track["id"] for track in response["data"] if track and track["id"]]
635            rating_response = await self._get_ratings(track_ids, MediaType.TRACK)
636            for track in response["data"]:
637                if track and track["id"]:
638                    is_favourite = rating_response.get(track["id"])
639                    found_tracks.append(self._parse_track(track, is_favourite))
640        return found_tracks
641
642    async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
643        """Return the content details for the given track when it will be streamed."""
644        stream_metadata = await self._fetch_song_stream_metadata(item_id)
645        if self.is_library_id(item_id):
646            # Library items are not encrypted and do not need decryption keys
647            try:
648                stream_url = stream_metadata["assets"][0]["URL"]
649            except (KeyError, IndexError, TypeError) as exc:
650                raise MediaNotFoundError(
651                    f"Failed to extract stream URL for library track {item_id}: {exc}"
652                ) from exc
653            return StreamDetails(
654                item_id=item_id,
655                provider=self.instance_id,
656                path=stream_url,
657                stream_type=StreamType.HTTP,
658                audio_format=AudioFormat(content_type=ContentType.UNKNOWN),
659                can_seek=True,
660                allow_seek=True,
661            )
662        # Continue to obtain decryption keys for catalog items
663        license_url = stream_metadata["hls-key-server-url"]
664        stream_url, uri = await self._parse_stream_url_and_uri(stream_metadata["assets"])
665        if not stream_url or not uri:
666            raise MediaNotFoundError("No stream URL found for song.")
667        key_id = base64.b64decode(uri.split(",")[1])
668        return StreamDetails(
669            item_id=item_id,
670            provider=self.instance_id,
671            audio_format=AudioFormat(content_type=ContentType.MP4, codec_type=ContentType.AAC),
672            stream_type=StreamType.ENCRYPTED_HTTP,
673            decryption_key=await self._get_decryption_key(license_url, key_id, uri, item_id),
674            path=stream_url,
675            can_seek=True,
676            allow_seek=True,
677        )
678
679    async def set_favorite(self, prov_item_id: str, media_type: MediaType, favorite: bool) -> None:
680        """Set the favorite status of an item."""
681        data = {
682            "type": "ratings",
683            "attributes": {
684                "value": 1 if favorite else -1,
685            },
686        }
687        item_type = self._translate_media_type_to_apple_type(media_type)
688        if self._is_catalog_id(prov_item_id):
689            endpoint = f"me/ratings/{item_type}/{prov_item_id}"
690        else:
691            endpoint = f"me/ratings/library-{item_type}/{prov_item_id}"
692        await self._put_data(endpoint, data=data)
693
694    def _parse_artist(self, artist_obj: dict[str, Any]) -> Artist:
695        """Parse artist object to generic layout."""
696        relationships = artist_obj.get("relationships", {})
697        if (
698            artist_obj.get("type") == "library-artists"
699            and relationships.get("catalog", {}).get("data", []) != []
700        ):
701            artist_id = relationships["catalog"]["data"][0]["id"]
702            attributes = relationships["catalog"]["data"][0]["attributes"]
703        elif "attributes" in artist_obj:
704            artist_id = artist_obj["id"]
705            attributes = artist_obj["attributes"]
706        else:
707            artist_id = artist_obj["id"]
708            self.logger.debug("No attributes found for artist %s", artist_obj)
709            # No more details available other than the id, return an ItemMapping
710            return ItemMapping(
711                media_type=MediaType.ARTIST,
712                provider=self.instance_id,
713                item_id=artist_id,
714                name=artist_id,
715            )
716        artist = Artist(
717            item_id=artist_id,
718            name=attributes.get("name"),
719            provider=self.domain,
720            provider_mappings={
721                ProviderMapping(
722                    item_id=artist_id,
723                    provider_domain=self.domain,
724                    provider_instance=self.instance_id,
725                    url=attributes.get("url"),
726                )
727            },
728        )
729        if artwork := attributes.get("artwork"):
730            artist.metadata.add_image(
731                MediaItemImage(
732                    provider=self.instance_id,
733                    type=ImageType.THUMB,
734                    path=artwork["url"].format(w=artwork["width"], h=artwork["height"]),
735                    remotely_accessible=True,
736                )
737            )
738        if genres := attributes.get("genreNames"):
739            artist.metadata.genres = set(genres)
740        if notes := attributes.get("editorialNotes"):
741            artist.metadata.description = notes.get("standard") or notes.get("short")
742        return artist
743
744    def _parse_album(
745        self, album_obj: dict, is_favourite: bool | None = None
746    ) -> Album | ItemMapping | None:
747        """Parse album object to generic layout."""
748        relationships = album_obj.get("relationships", {})
749        response_type = album_obj.get("type")
750        if (
751            response_type == "library-albums"
752            and relationships["catalog"]["data"] != []
753            and "attributes" in relationships["catalog"]["data"][0]
754        ):
755            album_id = relationships.get("catalog", {})["data"][0]["id"]
756            attributes = relationships.get("catalog", {})["data"][0]["attributes"]
757        elif "attributes" in album_obj:
758            album_id = album_obj["id"]
759            attributes = album_obj["attributes"]
760        else:
761            album_id = album_obj["id"]
762            # No more details available other than the id, return an ItemMapping
763            return ItemMapping(
764                media_type=MediaType.ALBUM,
765                provider=self.instance_id,
766                item_id=album_id,
767                name=album_id,
768            )
769        is_available_in_catalog = attributes.get("url") is not None
770        if not is_available_in_catalog:
771            self.logger.debug(
772                "Skipping album %s. Album is not available in the Apple Music catalog.",
773                attributes.get("name"),
774            )
775            return None
776        name, version = parse_title_and_version(attributes["name"])
777        album = Album(
778            item_id=album_id,
779            provider=self.domain,
780            name=name,
781            version=version,
782            provider_mappings={
783                ProviderMapping(
784                    item_id=album_id,
785                    provider_domain=self.domain,
786                    provider_instance=self.instance_id,
787                    url=attributes.get("url"),
788                    available=attributes.get("playParams", {}).get("id") is not None,
789                )
790            },
791        )
792        if artists := relationships.get("artists"):
793            album.artists = UniqueList([self._parse_artist(artist) for artist in artists["data"]])
794        elif artist_name := attributes.get("artistName"):
795            album.artists = UniqueList(
796                [
797                    ItemMapping(
798                        media_type=MediaType.ARTIST,
799                        provider=self.instance_id,
800                        item_id=artist_name,
801                        name=artist_name,
802                    )
803                ]
804            )
805        if release_date := attributes.get("releaseDate"):
806            album.year = int(release_date.split("-")[0])
807        if genres := attributes.get("genreNames"):
808            album.metadata.genres = set(genres)
809        if artwork := attributes.get("artwork"):
810            album.metadata.add_image(
811                MediaItemImage(
812                    provider=self.instance_id,
813                    type=ImageType.THUMB,
814                    path=artwork["url"].format(w=artwork["width"], h=artwork["height"]),
815                    remotely_accessible=True,
816                )
817            )
818        if album_copyright := attributes.get("copyright"):
819            album.metadata.copyright = album_copyright
820        if record_label := attributes.get("recordLabel"):
821            album.metadata.label = record_label
822        if upc := attributes.get("upc"):
823            album.external_ids.add((ExternalID.BARCODE, "0" + upc))
824        if notes := attributes.get("editorialNotes"):
825            album.metadata.description = notes.get("standard") or notes.get("short")
826        if content_rating := attributes.get("contentRating"):
827            album.metadata.explicit = content_rating == "explicit"
828        album_type = AlbumType.ALBUM
829        if attributes.get("isSingle"):
830            album_type = AlbumType.SINGLE
831        elif attributes.get("isCompilation"):
832            album_type = AlbumType.COMPILATION
833        album.album_type = album_type
834
835        # Try inference - override if it finds something more specific
836        # Apple Music doesn't seem to have version field
837        inferred_type = infer_album_type(album.name, "")
838        if inferred_type in (AlbumType.SOUNDTRACK, AlbumType.LIVE):
839            album.album_type = inferred_type
840        album.favorite = is_favourite or False
841        return album
842
843    def _parse_track(
844        self,
845        track_obj: dict[str, Any],
846        is_favourite: bool | None = None,
847    ) -> Track:
848        """Parse track object to generic layout."""
849        relationships = track_obj.get("relationships", {})
850        if (
851            track_obj.get("type") == "library-songs"
852            and relationships.get("catalog", {}).get("data", []) != []
853        ):
854            # Library track with catalog version available
855            track_id = relationships.get("catalog", {})["data"][0]["id"]
856            attributes = relationships.get("catalog", {})["data"][0]["attributes"]
857        elif "attributes" in track_obj:
858            # Catalog track or library-only track
859            track_id = track_obj["id"]
860            attributes = track_obj["attributes"]
861        else:
862            track_id = track_obj["id"]
863            attributes = {}
864        name, version = parse_title_and_version(attributes.get("name", ""))
865        track = Track(
866            item_id=track_id,
867            provider=self.domain,
868            name=name,
869            version=version,
870            duration=attributes.get("durationInMillis", 0) / 1000,
871            provider_mappings={
872                ProviderMapping(
873                    item_id=track_id,
874                    provider_domain=self.domain,
875                    provider_instance=self.instance_id,
876                    audio_format=AudioFormat(content_type=ContentType.AAC),
877                    url=attributes.get("url"),
878                    available=attributes.get("playParams", {}).get("id") is not None,
879                )
880            },
881        )
882        if disc_number := attributes.get("discNumber"):
883            track.disc_number = disc_number
884        if track_number := attributes.get("trackNumber"):
885            track.track_number = track_number
886        # Prefer catalog information over library information for artists.
887        # For compilations it picks the wrong artists
888        if "artists" in relationships:
889            artists = relationships["artists"]
890            track.artists = [self._parse_artist(artist) for artist in artists["data"]]
891        # 'Similar tracks' do not provide full artist details
892        elif artist_name := attributes.get("artistName"):
893            track.artists = [
894                ItemMapping(
895                    media_type=MediaType.ARTIST,
896                    item_id=artist_name,
897                    provider=self.instance_id,
898                    name=artist_name,
899                )
900            ]
901        if albums := relationships.get("albums"):
902            if "data" in albums and len(albums["data"]) > 0:
903                track.album = self._parse_album(albums["data"][0])
904        if artwork := attributes.get("artwork"):
905            track.metadata.add_image(
906                MediaItemImage(
907                    provider=self.instance_id,
908                    type=ImageType.THUMB,
909                    path=artwork["url"].format(w=artwork["width"], h=artwork["height"]),
910                    remotely_accessible=True,
911                )
912            )
913        if genres := attributes.get("genreNames"):
914            track.metadata.genres = set(genres)
915        if composers := attributes.get("composerName"):
916            track.metadata.performers = set(composers.split(", "))
917        if isrc := attributes.get("isrc"):
918            track.external_ids.add((ExternalID.ISRC, isrc))
919        track.favorite = is_favourite or False
920        return track
921
922    def _parse_playlist(
923        self, playlist_obj: dict[str, Any], is_favourite: bool | None = None
924    ) -> Playlist:
925        """Parse Apple Music playlist object to generic layout."""
926        attributes = playlist_obj["attributes"]
927        playlist_id = attributes["playParams"].get("globalId") or playlist_obj["id"]
928        is_editable = attributes.get("canEdit", False)
929        playlist = Playlist(
930            item_id=playlist_id,
931            provider=self.instance_id,
932            name=attributes.get("name", UNKNOWN_PLAYLIST_NAME),
933            owner=attributes.get("curatorName", "me"),
934            provider_mappings={
935                ProviderMapping(
936                    item_id=playlist_id,
937                    provider_domain=self.domain,
938                    provider_instance=self.instance_id,
939                    url=attributes.get("url"),
940                    is_unique=is_editable,  # user-owned playlists are unique
941                )
942            },
943            is_editable=is_editable,
944        )
945        if artwork := attributes.get("artwork"):
946            url = artwork["url"]
947            if artwork["width"] and artwork["height"]:
948                url = url.format(w=artwork["width"], h=artwork["height"])
949            playlist.metadata.add_image(
950                MediaItemImage(
951                    provider=self.instance_id,
952                    type=ImageType.THUMB,
953                    path=url,
954                    remotely_accessible=True,
955                )
956            )
957        if description := attributes.get("description"):
958            playlist.metadata.description = description.get("standard")
959        playlist.favorite = is_favourite or False
960        return playlist
961
962    async def _get_all_items(self, endpoint, key="data", **kwargs) -> list[dict]:
963        """Get all items from a paged list."""
964        limit = 50
965        offset = 0
966        all_items = []
967        while True:
968            kwargs["limit"] = limit
969            kwargs["offset"] = offset
970            result = await self._get_data(endpoint, **kwargs)
971            if key not in result:
972                break
973            all_items += result[key]
974            if not result.get("next"):
975                break
976            offset += limit
977        return all_items
978
979    @throttle_with_retries
980    async def _get_data(self, endpoint, **kwargs) -> dict[str, Any]:
981        """Get data from api."""
982        url = f"https://api.music.apple.com/v1/{endpoint}"
983        headers = {"Authorization": f"Bearer {self._music_app_token}"}
984        headers["Music-User-Token"] = self._music_user_token
985        async with (
986            self.mass.http_session.get(
987                url, headers=headers, params=kwargs, ssl=True, timeout=120
988            ) as response,
989        ):
990            if response.status == 404 and "limit" in kwargs and "offset" in kwargs:
991                return {}
992            # Convert HTTP errors to exceptions
993            if response.status == 404:
994                raise MediaNotFoundError(f"{endpoint} not found")
995            if response.status == 504:
996                # See if we can get more info from the response on occasional timeouts
997                self.logger.debug(
998                    "Apple Music API Timeout: url=%s, params=%s, response_headers=%s",
999                    url,
1000                    kwargs,
1001                    response.headers,
1002                )
1003                raise ResourceTemporarilyUnavailable("Apple Music API Timeout")
1004            if response.status == 429:
1005                # Debug this for now to see if the response headers give us info about the
1006                # backoff time. There is no documentation on this.
1007                self.logger.debug("Apple Music Rate Limiter. Headers: %s", response.headers)
1008                raise ResourceTemporarilyUnavailable("Apple Music Rate Limiter")
1009            if response.status == 500:
1010                raise MusicAssistantError("Unexpected server error when calling Apple Music")
1011            response.raise_for_status()
1012            return await response.json(loads=json_loads)
1013
1014    @throttle_with_retries
1015    async def _delete_data(self, endpoint, data=None, **kwargs) -> None:
1016        """Delete data from api."""
1017        url = f"https://api.music.apple.com/v1/{endpoint}"
1018        headers = {"Authorization": f"Bearer {self._music_app_token}"}
1019        headers["Music-User-Token"] = self._music_user_token
1020        async with (
1021            self.mass.http_session.delete(
1022                url, headers=headers, params=kwargs, json=data, ssl=True, timeout=120
1023            ) as response,
1024        ):
1025            # Convert HTTP errors to exceptions
1026            if response.status == 404:
1027                raise MediaNotFoundError(f"{endpoint} not found")
1028            if response.status == 429:
1029                # Debug this for now to see if the response headers give us info about the
1030                # backoff time. There is no documentation on this.
1031                self.logger.debug("Apple Music Rate Limiter. Headers: %s", response.headers)
1032                raise ResourceTemporarilyUnavailable("Apple Music Rate Limiter")
1033            response.raise_for_status()
1034
1035    async def _put_data(self, endpoint, data=None, **kwargs) -> str:
1036        """Put data on api."""
1037        url = f"https://api.music.apple.com/v1/{endpoint}"
1038        headers = {"Authorization": f"Bearer {self._music_app_token}"}
1039        headers["Music-User-Token"] = self._music_user_token
1040        async with (
1041            self.mass.http_session.put(
1042                url, headers=headers, params=kwargs, json=data, ssl=True, timeout=120
1043            ) as response,
1044        ):
1045            # Convert HTTP errors to exceptions
1046            if response.status == 404:
1047                raise MediaNotFoundError(f"{endpoint} not found")
1048            if response.status == 429:
1049                # Debug this for now to see if the response headers give us info about the
1050                # backoff time. There is no documentation on this.
1051                self.logger.debug("Apple Music Rate Limiter. Headers: %s", response.headers)
1052                raise ResourceTemporarilyUnavailable("Apple Music Rate Limiter")
1053            response.raise_for_status()
1054            if response.content_length:
1055                return await response.json(loads=json_loads)
1056            return {}
1057
1058    @throttle_with_retries
1059    async def _post_data(self, endpoint, data=None, **kwargs) -> str:
1060        """Post data on api."""
1061        url = f"https://api.music.apple.com/v1/{endpoint}"
1062        headers = {"Authorization": f"Bearer {self._music_app_token}"}
1063        headers["Music-User-Token"] = self._music_user_token
1064        async with (
1065            self.mass.http_session.post(
1066                url, headers=headers, params=kwargs, json=data, ssl=True, timeout=120
1067            ) as response,
1068        ):
1069            # Convert HTTP errors to exceptions
1070            if response.status == 404:
1071                raise MediaNotFoundError(f"{endpoint} not found")
1072            if response.status == 429:
1073                # Debug this for now to see if the response headers give us info about the
1074                # backoff time. There is no documentation on this.
1075                self.logger.debug("Apple Music Rate Limiter. Headers: %s", response.headers)
1076                raise ResourceTemporarilyUnavailable("Apple Music Rate Limiter")
1077            response.raise_for_status()
1078            return await response.json(loads=json_loads)
1079
1080    async def _get_user_storefront(self) -> str:
1081        """Get the user's storefront."""
1082        locale = self.mass.metadata.locale.replace("_", "-")
1083        language = locale.split("-")[0]
1084        result = await self._get_data("me/storefront", l=language)
1085        return result["data"][0]["id"]
1086
1087    async def _get_ratings(self, item_ids: list[str], media_type: MediaType) -> dict[str, bool]:
1088        """Get ratings (aka favorites) for a list of item ids."""
1089        if media_type == MediaType.ARTIST:
1090            raise NotImplementedError(
1091                "Ratings are not available for artist in the Apple Music API."
1092            )
1093        if len(item_ids) == 0:
1094            return {}
1095        apple_type = self._translate_media_type_to_apple_type(media_type)
1096        endpoint = apple_type if not self.is_library_id(item_ids[0]) else f"library-{apple_type}"
1097        # Apple Music limits to 200 ids per request
1098        max_ids_per_request = 200
1099        results = {}
1100        for i in range(0, len(item_ids), max_ids_per_request):
1101            batch_ids = item_ids[i : i + max_ids_per_request]
1102            response = await self._get_data(
1103                f"me/ratings/{endpoint}",
1104                ids=",".join(batch_ids),
1105            )
1106            results.update(
1107                {
1108                    item["id"]: bool(item["attributes"].get("value", False) == 1)
1109                    for item in response.get("data", [])
1110                }
1111            )
1112        return results
1113
1114    def _translate_media_type_to_apple_type(self, media_type: MediaType) -> str:
1115        """Translate MediaType to Apple Music endpoint string."""
1116        match media_type:
1117            case MediaType.ARTIST:
1118                return "artists"
1119            case MediaType.ALBUM:
1120                return "albums"
1121            case MediaType.TRACK:
1122                return "songs"
1123            case MediaType.PLAYLIST:
1124                return "playlists"
1125        raise MusicAssistantError(f"Unsupported media type: {media_type}")
1126
1127    def is_library_id(self, library_id) -> bool:
1128        """Check a library ID matches known format."""
1129        if not isinstance(library_id, str):
1130            return False
1131        valid = re.findall(r"^(?:[a|i|l|p]{1}\.|pl\.u\-)[a-zA-Z0-9]+$", library_id)
1132        return bool(valid)
1133
1134    def _is_catalog_id(self, catalog_id: str) -> bool:
1135        """Check if input is a catalog id, or a library id."""
1136        return catalog_id.isnumeric() or catalog_id.startswith("pl.")
1137
1138    async def _fetch_song_stream_metadata(self, song_id: str) -> str:
1139        """Get the stream URL for a song from Apple Music."""
1140        playback_url = "https://play.music.apple.com/WebObjects/MZPlay.woa/wa/webPlayback"
1141        data = {}
1142        self.logger.debug("_fetch_song_stream_metadata: Check if Library ID: %s", song_id)
1143        if self.is_library_id(song_id):
1144            data["universalLibraryId"] = song_id
1145            data["isLibrary"] = True
1146        else:
1147            data["salableAdamId"] = song_id
1148        for retry in (True, False):
1149            try:
1150                async with self.mass.http_session.post(
1151                    playback_url, headers=self._get_decryption_headers(), json=data, ssl=True
1152                ) as response:
1153                    response.raise_for_status()
1154                    content = await response.json(loads=json_loads)
1155                    if content.get("failureType"):
1156                        message = content.get("failureMessage")
1157                        raise MediaNotFoundError(f"Failed to get song stream metadata: {message}")
1158                    return content["songList"][0]
1159            except (MediaNotFoundError, ClientError) as exc:
1160                if retry:
1161                    self.logger.warning("Failed to get song stream metadata: %s", exc)
1162                    continue
1163                raise
1164        raise MediaNotFoundError(f"Failed to get song stream metadata for {song_id}")
1165
1166    async def _parse_stream_url_and_uri(self, stream_assets: list[dict]) -> str:
1167        """Parse the Stream URL and Key URI from the song."""
1168        ctrp256_urls = [asset["URL"] for asset in stream_assets if asset["flavor"] == "28:ctrp256"]
1169        if len(ctrp256_urls) == 0:
1170            raise MediaNotFoundError("No ctrp256 URL found for song.")
1171        playlist_url = ctrp256_urls[0]
1172        playlist_items = await fetch_playlist(self.mass, ctrp256_urls[0], raise_on_hls=False)
1173        # Apple returns a HLS (substream) playlist but instead of chunks,
1174        # each item is just the whole file. So we simply grab the first playlist item.
1175        playlist_item = playlist_items[0]
1176        # path is relative, stitch it together
1177        base_path = playlist_url.rsplit("/", 1)[0]
1178        track_url = base_path + "/" + playlist_items[0].path
1179        key = playlist_item.key
1180        return (track_url, key)
1181
1182    def _get_decryption_headers(self):
1183        """Get headers for decryption requests."""
1184        return {
1185            "authorization": f"Bearer {self._music_app_token}",
1186            "media-user-token": self._music_user_token,
1187            "connection": "keep-alive",
1188            "accept": "application/json",
1189            "origin": "https://music.apple.com",
1190            "referer": "https://music.apple.com/",
1191            "accept-encoding": "gzip, deflate, br",
1192            "content-type": "application/json;charset=utf-8",
1193            "user-agent": (
1194                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)"
1195                " Chrome/110.0.0.0 Safari/537.36"
1196            ),
1197        }
1198
1199    async def _get_decryption_key(
1200        self, license_url: str, key_id: bytes, uri: str, item_id: str
1201    ) -> str:
1202        """Get the decryption key for a song."""
1203        if decryption_key := await self.mass.cache.get(
1204            key=item_id,
1205            provider=self.instance_id,
1206            category=CACHE_CATEGORY_DECRYPT_KEY,
1207            checksum=self._session_id,
1208        ):
1209            self.logger.debug("Decryption key for %s found in cache.", item_id)
1210            return decryption_key
1211        pssh = self._get_pssh(key_id)
1212        device = Device(
1213            client_id=self._decrypt_client_id,
1214            private_key=self._decrypt_private_key,
1215            type_=DeviceTypes.ANDROID,
1216            security_level=3,
1217            flags={},
1218        )
1219        cdm = Cdm.from_device(device)
1220        session_id = cdm.open()
1221        challenge = cdm.get_license_challenge(session_id, pssh)
1222        track_license = await self._get_license(challenge, license_url, uri, item_id)
1223        cdm.parse_license(session_id, track_license)
1224        key = next(key for key in cdm.get_keys(session_id) if key.type == "CONTENT")
1225        if not key:
1226            raise MediaNotFoundError("Unable to get decryption key for song %s.", item_id)
1227        cdm.close(session_id)
1228        decryption_key = key.key.hex()
1229        self.mass.create_task(
1230            self.mass.cache.set(
1231                key=item_id,
1232                data=decryption_key,
1233                expiration=3600,
1234                provider=self.instance_id,
1235                category=CACHE_CATEGORY_DECRYPT_KEY,
1236                checksum=self._session_id,
1237            )
1238        )
1239        return decryption_key
1240
1241    def _get_pssh(self, key_id: bytes) -> PSSH:
1242        """Get the PSSH for a song."""
1243        pssh_data = WidevinePsshData()
1244        pssh_data.algorithm = 1
1245        pssh_data.key_ids.append(key_id)
1246        init_data = base64.b64encode(pssh_data.SerializeToString()).decode("utf-8")
1247        return PSSH.new(system_id=PSSH.SystemId.Widevine, init_data=init_data)
1248
1249    async def _get_license(self, challenge: bytes, license_url: str, uri: str, item_id: str) -> str:
1250        """Get the license for a song based on the challenge."""
1251        challenge_b64 = base64.b64encode(challenge).decode("utf-8")
1252        data = {
1253            "challenge": challenge_b64,
1254            "key-system": "com.widevine.alpha",
1255            "uri": uri,
1256            "adamId": item_id,
1257            "isLibrary": False,
1258            "user-initiated": True,
1259        }
1260        async with self.mass.http_session.post(
1261            license_url, data=json.dumps(data), headers=self._get_decryption_headers(), ssl=False
1262        ) as response:
1263            response.raise_for_status()
1264            content = await response.json(loads=json_loads)
1265            track_license = content.get("license")
1266            if not track_license:
1267                raise MediaNotFoundError("No license found for song %s.", item_id)
1268            return track_license
1269