music-assistant-server

55.7 KBPY
__init__.py
55.7 KB1,282 lines • python
1"""
2Apple Music musicprovider support for MusicAssistant.
3
4TODO MUSIC_APP_TOKEN expires after 6 months so should have a distribution mechanism outside
  compulsory application updates. It is only a semi-private key in JWT format so could be refreshed
6  daily by a GitHub action and downloaded by the provider each initialise.
7TODO Widevine keys can be obtained dynamically from Apple Music API rather than copied into Docker
8  build. This is undocumented but @maxlyth has a working example.
9TODO MUSIC_USER_TOKEN must be refreshed (~min 180 days) and needs mechanism to prompt user to
10  re-authenticate in browser.
11TODO Current provider ignores private tracks that are not available in the storefront catalog as
  streamable url is derived from the catalog id. It is undocumented but @maxlyth has a working
13  example to get a streamable url from the library id.
14"""
15
16from __future__ import annotations
17
18import base64
19import json
20import os
21import pathlib
22import re
23import time
24from collections.abc import Sequence
25from typing import TYPE_CHECKING, Any
26
27import aiofiles
28from aiohttp import web
29from aiohttp.client_exceptions import ClientError
30from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
31from music_assistant_models.enums import (
32    AlbumType,
33    ConfigEntryType,
34    ContentType,
35    ExternalID,
36    ImageType,
37    MediaType,
38    ProviderFeature,
39    StreamType,
40)
41from music_assistant_models.errors import (
42    LoginFailed,
43    MediaNotFoundError,
44    MusicAssistantError,
45    ResourceTemporarilyUnavailable,
46)
47from music_assistant_models.media_items import (
48    Album,
49    Artist,
50    AudioFormat,
51    BrowseFolder,
52    ItemMapping,
53    MediaItemImage,
54    MediaItemType,
55    Playlist,
56    ProviderMapping,
57    SearchResults,
58    Track,
59    UniqueList,
60)
61from music_assistant_models.streamdetails import StreamDetails
62from pywidevine import PSSH, Cdm, Device, DeviceTypes
63from pywidevine.license_protocol_pb2 import WidevinePsshData
64from shortuuid import uuid
65
66from music_assistant.controllers.cache import use_cache
67from music_assistant.helpers.app_vars import app_var
68from music_assistant.helpers.auth import AuthenticationHelper
69from music_assistant.helpers.json import json_loads
70from music_assistant.helpers.playlists import fetch_playlist
71from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
72from music_assistant.helpers.util import infer_album_type, parse_title_and_version
73from music_assistant.models.music_provider import MusicProvider
74from music_assistant.providers.apple_music.helpers import browse_playlists
75
76if TYPE_CHECKING:
77    from collections.abc import AsyncGenerator
78
79    from music_assistant_models.config_entries import ProviderConfig
80    from music_assistant_models.provider import ProviderManifest
81
82    from music_assistant import MusicAssistant
83    from music_assistant.models import ProviderInstanceType
84
85
# Provider features handed to AppleMusicProvider when setup() creates the instance.
SUPPORTED_FEATURES = {
    # library listing
    ProviderFeature.LIBRARY_ARTISTS,
    ProviderFeature.LIBRARY_ALBUMS,
    ProviderFeature.LIBRARY_TRACKS,
    ProviderFeature.LIBRARY_PLAYLISTS,
    # browsing, search and catalog lookups
    ProviderFeature.BROWSE,
    ProviderFeature.SEARCH,
    ProviderFeature.ARTIST_ALBUMS,
    ProviderFeature.ARTIST_TOPTRACKS,
    ProviderFeature.SIMILAR_TRACKS,
    # library editing
    ProviderFeature.LIBRARY_ALBUMS_EDIT,
    ProviderFeature.LIBRARY_ARTISTS_EDIT,
    ProviderFeature.LIBRARY_PLAYLISTS_EDIT,
    ProviderFeature.LIBRARY_TRACKS_EDIT,
    # favorite editing
    ProviderFeature.FAVORITE_ALBUMS_EDIT,
    ProviderFeature.FAVORITE_TRACKS_EDIT,
    ProviderFeature.FAVORITE_PLAYLISTS_EDIT,
}
104
# App token obtained through app_var(); get_config_entries sends it as the Bearer
# token when probing the Apple Music API.
MUSIC_APP_TOKEN = app_var(8)
# Directory holding the two Widevine files below; both are read in handle_async_init.
WIDEVINE_BASE_PATH = "/usr/local/bin/widevine_cdm"
DECRYPT_CLIENT_ID_FILENAME = "client_id.bin"
DECRYPT_PRIVATE_KEY_FILENAME = "private_key.pem"
# NOTE(review): presumably a fallback display name for playlists without a name;
# its usage is outside the visible part of the file - confirm.
UNKNOWN_PLAYLIST_NAME = "Unknown Apple Music Playlist"
# Keys of the config entries returned by get_config_entries.
CONF_MUSIC_APP_TOKEN = "music_app_token"
CONF_MUSIC_USER_TOKEN = "music_user_token"
CONF_MUSIC_USER_MANUAL_TOKEN = "music_user_manual_token"
CONF_MUSIC_USER_TOKEN_TIMESTAMP = "music_user_token_timestamp"
# NOTE(review): looks like the cache category used for decryption keys - confirm
# against the caching code further down the file.
CACHE_CATEGORY_DECRYPT_KEY = 1
# NOTE(review): presumably the maximum artwork size (in pixels) requested from
# Apple - usage is outside the visible part of the file, confirm.
MAX_ARTWORK_DIMENSION = 1000
116
117
async def setup(
    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
    """Create the Apple Music provider instance for the given configuration."""
    provider = AppleMusicProvider(mass, manifest, config, SUPPORTED_FEATURES)
    return provider
123
124
async def get_config_entries(
    mass: MusicAssistant,
    instance_id: str | None = None,
    action: str | None = None,
    values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
    """
    Return Config entries to setup this provider.

    instance_id: id of an existing provider instance (None if new instance setup).
    action: [optional] action key called from config entries UI.
    values: the (intermediate) raw values for config entries sent with the action.

    Raises LoginFailed when the MusicKit authentication flow fails with anything
    other than a missing token in the callback result.
    """
    # `values` defaults to None but is both written to and read from below.
    # Normalise it once so none of those accesses can fail on a missing dict.
    # A dict passed in by the caller is still updated in place, as before.
    if values is None:
        values = {}

    def validate_user_token(token: Any) -> bool:
        """Return True if token is a string ending in a base64-looking run with '==' padding."""
        if not isinstance(token, str):
            return False
        return re.search(r"[a-zA-Z0-9=/+]{32,}==$", token) is not None

    # Probe the Apple Music API with the default app token. Only when that token is
    # not accepted is the app token config field shown to the user.
    default_app_token_valid = False
    async with mass.http_session.get(
        "https://api.music.apple.com/v1/test",
        headers={"Authorization": f"Bearer {MUSIC_APP_TOKEN}"},
        ssl=True,
        timeout=10,
    ) as response:
        if response.status == 200:
            values[CONF_MUSIC_APP_TOKEN] = f"{MUSIC_APP_TOKEN}"
            default_app_token_valid = True

    # Action is to launch MusicKit flow
    if action == "CONF_ACTION_AUTH" and default_app_token_valid:
        callback_method = "POST"
        async with AuthenticationHelper(mass, values["session_id"], callback_method) as auth_helper:
            callback_url = auth_helper.callback_url
            flow_base_path = f"apple_music_auth/{values['session_id']}/"
            flow_timeout = 600
            parent_file_path = pathlib.Path(__file__).parent.resolve()
            base_url = f"{mass.webserver.base_url}/{flow_base_path}"
            flow_base_url = f"{base_url}index.html"

            async def serve_mk_auth_page(request: web.Request) -> web.FileResponse:
                """Serve the static HTML wrapper page of the MusicKit auth flow."""
                auth_html_path = parent_file_path.joinpath("musickit_auth/musickit_wrapper.html")
                return web.FileResponse(
                    auth_html_path,
                    headers={"content-type": "text/html"},
                )

            async def serve_mk_auth_css(request: web.Request) -> web.FileResponse:
                """Serve the stylesheet of the MusicKit auth page."""
                auth_css_path = parent_file_path.joinpath("musickit_auth/musickit_wrapper.css")
                return web.FileResponse(
                    auth_css_path,
                    headers={
                        "content-type": "text/css",
                    },
                )

            async def serve_mk_glue(request: web.Request) -> web.Response:
                """Serve the generated script that hands the flow parameters to the auth page."""
                # Read the stored token leniently: on a first-time setup the keys may be
                # absent, which must not turn this request into a server error.
                stored_user_token = values.get(CONF_MUSIC_USER_TOKEN)
                user_token = stored_user_token if validate_user_token(stored_user_token) else ""
                # 0 mirrors the default_value of the timestamp config entry below
                user_token_timestamp = values.get(CONF_MUSIC_USER_TOKEN_TIMESTAMP, 0)
                # NOTE(review): the values are interpolated into JavaScript string
                # literals without escaping - confirm they can never contain a quote.
                return_html = f"""
                const return_url='{callback_url}';
                const base_url='{base_url}';
                const app_token='{values[CONF_MUSIC_APP_TOKEN]}';
                const callback_method='{callback_method}';
                const user_token='{user_token}';
                const user_token_timestamp='{user_token_timestamp}';
                const flow_timeout={max(flow_timeout - 10, 60)};
                const flow_start_time={int(time.time())};
                const mass_version='{mass.version}';
                """
                return web.Response(
                    body=return_html,
                    headers={
                        "content-type": "text/javascript",
                    },
                )

            # the three temporary routes that make up the auth page
            routes = {
                f"/{flow_base_path}index.html": serve_mk_auth_page,
                f"/{flow_base_path}index.css": serve_mk_auth_css,
                f"/{flow_base_path}index.js": serve_mk_glue,
            }
            for route_path, handler in routes.items():
                mass.webserver.register_dynamic_route(route_path, handler)

            try:
                result = await auth_helper.authenticate(flow_base_url, flow_timeout)
                values[CONF_MUSIC_USER_TOKEN] = result["music-user-token"]
                values[CONF_MUSIC_USER_TOKEN_TIMESTAMP] = result["music-user-token-timestamp"]
            except KeyError:
                # no music-user-token URL param was found so likely user cancelled the auth
                pass
            except Exception as error:
                # keep the original exception attached as the cause for debugging
                raise LoginFailed(f"Failed to authenticate with Apple '{error}'.") from error
            finally:
                # always remove the temporary routes, whether the flow succeeded or not
                for route_path in routes:
                    mass.webserver.unregister_dynamic_route(route_path)

    # Only pre-fill the stored user token when its timestamp is an int that is less
    # than 150 days old; otherwise the field is left empty.
    token_timestamp = values.get(CONF_MUSIC_USER_TOKEN_TIMESTAMP)
    user_token_is_recent = isinstance(token_timestamp, int) and (
        token_timestamp > (time.time() - (3600 * 24 * 150))
    )

    # ruff: noqa: ARG001
    return (
        ConfigEntry(
            key=CONF_MUSIC_APP_TOKEN,
            type=ConfigEntryType.SECURE_STRING,
            label="MusicKit App Token",
            hidden=default_app_token_valid,
            required=True,
            value=values.get(CONF_MUSIC_APP_TOKEN),
        ),
        ConfigEntry(
            key=CONF_MUSIC_USER_TOKEN,
            type=ConfigEntryType.SECURE_STRING,
            label="Music User Token",
            required=False,
            action="CONF_ACTION_AUTH",
            description="Authenticate with Apple Music to retrieve a valid music user token.",
            action_label="Authenticate with Apple Music",
            value=values.get(CONF_MUSIC_USER_TOKEN) if user_token_is_recent else None,
        ),
        ConfigEntry(
            key=CONF_MUSIC_USER_MANUAL_TOKEN,
            type=ConfigEntryType.SECURE_STRING,
            label="Manual Music User Token",
            required=False,
            advanced=True,
            description=(
                "Authenticate with a manual Music User Token in case the Authentication flow"
                " is unsupported (e.g. when using child accounts)."
            ),
            help_link="https://www.music-assistant.io/music-providers/apple-music/",
            value=values.get(CONF_MUSIC_USER_MANUAL_TOKEN),
        ),
        ConfigEntry(
            key=CONF_MUSIC_USER_TOKEN_TIMESTAMP,
            type=ConfigEntryType.INTEGER,
            description="Timestamp music user token was updated.",
            label="Music User Token Timestamp",
            hidden=True,
            required=True,
            default_value=0,
            value=values.get(CONF_MUSIC_USER_TOKEN_TIMESTAMP) if values else 0,
        ),
    )
281
282
283class AppleMusicProvider(MusicProvider):
284    """Implementation of an Apple Music MusicProvider."""
285
    # Tokens and storefront; all three are assigned in handle_async_init.
    _music_user_token: str | None = None
    _music_app_token: str | None = None
    _storefront: str | None = None
    # Raw bytes of the Widevine client id / private key files, read in handle_async_init.
    _decrypt_client_id: bytes | None = None
    _decrypt_private_key: bytes | None = None
    # Random id regenerated on every handle_async_init call.
    _session_id: str | None = None
    # rate limiter needs to be specified on provider-level.
    # NOTE(review): the throttler is created in the class body, so it is a class
    # attribute rather than a per-instance one - confirm this is intended.
    throttler = ThrottlerManager(rate_limit=1, period=2, initial_backoff=15)
295
296    async def handle_async_init(self) -> None:
297        """Handle async initialization of the provider."""
298        self._music_user_token = self.config.get_value(
299            CONF_MUSIC_USER_MANUAL_TOKEN
300        ) or self.config.get_value(CONF_MUSIC_USER_TOKEN)
301        self._music_app_token = self.config.get_value(CONF_MUSIC_APP_TOKEN)
302        self._storefront = await self._get_user_storefront()
303        # create random session id to use for decryption keys
304        # to invalidate cached keys on each provider initialization
305        self._session_id = str(uuid())
306        async with aiofiles.open(
307            os.path.join(WIDEVINE_BASE_PATH, DECRYPT_CLIENT_ID_FILENAME), "rb"
308        ) as _file:
309            self._decrypt_client_id = await _file.read()
310        async with aiofiles.open(
311            os.path.join(WIDEVINE_BASE_PATH, DECRYPT_PRIVATE_KEY_FILENAME), "rb"
312        ) as _file:
313            self._decrypt_private_key = await _file.read()
314
315    @use_cache()
316    async def search(
317        self, search_query: str, media_types: list[MediaType] | None, limit: int = 5
318    ) -> SearchResults:
319        """Perform search on musicprovider.
320
321        :param search_query: Search query.
322        :param media_types: A list of media_types to include. All types if None.
323        :param limit: Number of items to return in the search (per type).
324        """
325        endpoint = f"catalog/{self._storefront}/search"
326        # Apple music has a limit of 25 items for the search endpoint
327        limit = min(limit, 25)
328        searchresult = SearchResults()
329        searchtypes = []
330        if MediaType.ARTIST in media_types:
331            searchtypes.append("artists")
332        if MediaType.ALBUM in media_types:
333            searchtypes.append("albums")
334        if MediaType.TRACK in media_types:
335            searchtypes.append("songs")
336        if MediaType.PLAYLIST in media_types:
337            searchtypes.append("playlists")
338        if not searchtypes:
339            return searchresult
340        searchtype = ",".join(searchtypes)
341        search_query = search_query.replace("'", "")
342        response = await self._get_data(endpoint, term=search_query, types=searchtype, limit=limit)
343        if "artists" in response["results"]:
344            searchresult.artists += [
345                self._parse_artist(item) for item in response["results"]["artists"]["data"]
346            ]
347        if "albums" in response["results"]:
348            searchresult.albums += [
349                self._parse_album(item) for item in response["results"]["albums"]["data"]
350            ]
351        if "songs" in response["results"]:
352            searchresult.tracks += [
353                self._parse_track(item) for item in response["results"]["songs"]["data"]
354            ]
355        if "playlists" in response["results"]:
356            searchresult.playlists += [
357                self._parse_playlist(item) for item in response["results"]["playlists"]["data"]
358            ]
359        return searchresult
360
361    async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
362        """Browse Apple Music with support for playlist folders."""
363        if not path or "://" not in path:
364            return await super().browse(path)
365        sub_path = path.split("://", 1)[1]
366        path_parts = [part for part in sub_path.split("/") if part]
367        if path_parts and path_parts[0] == "playlists":
368            return await browse_playlists(self, path, path_parts)
369        return await super().browse(path)
370
371    async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
372        """Retrieve library artists from the provider."""
373        endpoint = "me/library/artists"
374        for item in await self._get_all_items(endpoint, include="catalog", extend="editorialNotes"):
375            if item and item["id"]:
376                yield self._parse_artist(item)
377
378    async def get_library_albums(self) -> AsyncGenerator[Album, None]:
379        """Retrieve library albums from the provider."""
380        endpoint = "me/library/albums"
381        album_items = await self._get_all_items(
382            endpoint, include="catalog,artists", extend="editorialNotes"
383        )
384        album_catalog_item_ids = [
385            item["id"]
386            for item in album_items
387            if item and item["id"] and not self.is_library_id(item["id"])
388        ]
389        album_library_item_ids = [
390            item["id"]
391            for item in album_items
392            if item and item["id"] and self.is_library_id(item["id"])
393        ]
394        rating_catalog_response = await self._get_ratings(album_catalog_item_ids, MediaType.ALBUM)
395        rating_library_response = await self._get_ratings(album_library_item_ids, MediaType.ALBUM)
396        for item in album_items:
397            if item and item["id"]:
398                is_favourite = (
399                    rating_catalog_response.get(item["id"])
400                    if not self.is_library_id(item["id"])
401                    else rating_library_response.get(item["id"])
402                )
403                album = self._parse_album(item, is_favourite)
404                if album:
405                    yield album
406
407    async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
408        """Retrieve library tracks from the provider."""
409        endpoint = "me/library/songs"
410        song_catalog_ids = []
411        library_only_tracks = []
412        for item in await self._get_all_items(endpoint):
413            catalog_id = item.get("attributes", {}).get("playParams", {}).get("catalogId")
414            if not catalog_id:
415                # Track is library-only (private/uploaded), use library ID instead
416                library_only_tracks.append(item)
417            else:
418                song_catalog_ids.append(catalog_id)
419        # Obtain catalog info per 150 songs, the documented limit of 300 results in a 504 timeout
420        max_limit = 150
421        for i in range(0, len(song_catalog_ids), max_limit):
422            catalog_ids = song_catalog_ids[i : i + max_limit]
423            catalog_endpoint = f"catalog/{self._storefront}/songs"
424            response = await self._get_data(
425                catalog_endpoint, ids=",".join(catalog_ids), include="artists,albums"
426            )
427            # Fetch ratings for this batch
428            rating_response = await self._get_ratings(catalog_ids, MediaType.TRACK)
429            for item in response["data"]:
430                is_favourite = rating_response.get(item["id"])
431                track = self._parse_track(item, is_favourite)
432                yield track
433        # Yield library-only tracks using their library metadata
434        library_ids = [item["id"] for item in library_only_tracks if item and item["id"]]
435        library_rating_response = await self._get_ratings(library_ids, MediaType.TRACK)
436        for item in library_only_tracks:
437            is_favourite = library_rating_response.get(item["id"])
438            yield self._parse_track(item, is_favourite)
439
440    async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
441        """Retrieve playlists from the provider."""
442        endpoint = "me/library/playlists"
443        playlist_items = await self._get_all_items(endpoint)
444        playlist_library_item_ids = [
445            item["id"]
446            for item in playlist_items
447            if item and item["id"] and self.is_library_id(item["id"])
448        ]
449        rating_library_response = await self._get_ratings(
450            playlist_library_item_ids, MediaType.PLAYLIST
451        )
452        for item in playlist_items:
453            is_favourite = rating_library_response.get(item["id"])
454            # Prefer catalog information over library information in case of public playlists
455            if item["attributes"]["hasCatalog"]:
456                yield await self.get_playlist(
457                    item["attributes"]["playParams"]["globalId"], is_favourite
458                )
459            elif item and item["id"]:
460                yield self._parse_playlist(item, is_favourite)
461
462    @use_cache()
463    async def get_artist(self, prov_artist_id) -> Artist:
464        """Get full artist details by id."""
465        endpoint = f"catalog/{self._storefront}/artists/{prov_artist_id}"
466        response = await self._get_data(endpoint, extend="editorialNotes")
467        return self._parse_artist(response["data"][0])
468
469    @use_cache()
470    async def get_album(self, prov_album_id) -> Album:
471        """Get full album details by id."""
472        endpoint = f"catalog/{self._storefront}/albums/{prov_album_id}"
473        response = await self._get_data(endpoint, include="artists")
474        rating_response = await self._get_ratings([prov_album_id], MediaType.ALBUM)
475        is_favourite = rating_response.get(prov_album_id)
476        return self._parse_album(response["data"][0], is_favourite)
477
478    @use_cache()
479    async def get_track(self, prov_track_id) -> Track:
480        """Get full track details by id."""
481        endpoint = f"catalog/{self._storefront}/songs/{prov_track_id}"
482        response = await self._get_data(endpoint, include="artists,albums")
483        rating_response = await self._get_ratings([prov_track_id], MediaType.TRACK)
484        is_favourite = rating_response.get(prov_track_id)
485        return self._parse_track(response["data"][0], is_favourite)
486
487    @use_cache()
488    async def get_playlist(self, prov_playlist_id, is_favourite: bool = False) -> Playlist:
489        """Get full playlist details by id."""
490        if not self.is_library_id(prov_playlist_id):
491            endpoint = f"catalog/{self._storefront}/playlists/{prov_playlist_id}"
492        else:
493            endpoint = f"me/library/playlists/{prov_playlist_id}"
494        response = await self._get_data(endpoint)
495        return self._parse_playlist(response["data"][0], is_favourite)
496
497    @use_cache()
498    async def get_album_tracks(self, prov_album_id) -> list[Track]:
499        """Get all album tracks for given album id."""
500        endpoint = f"catalog/{self._storefront}/albums/{prov_album_id}/tracks"
501        response = await self._get_data(endpoint, include="artists")
502        # Including albums results in a 504 error, so we need to fetch the album separately
503        album = await self.get_album(prov_album_id)
504        track_ids = [track_obj["id"] for track_obj in response["data"] if "id" in track_obj]
505        rating_response = await self._get_ratings(track_ids, MediaType.TRACK)
506        tracks = []
507        for track_obj in response["data"]:
508            if "id" not in track_obj:
509                continue
510            track = self._parse_track(track_obj, rating_response.get(track_obj["id"]))
511            track.album = album
512            tracks.append(track)
513        return tracks
514
515    @use_cache(3600 * 3)  # cache for 3 hours
516    async def get_playlist_tracks(self, prov_playlist_id, page: int = 0) -> list[Track]:
517        """Get all playlist tracks for given playlist id."""
518        if self._is_catalog_id(prov_playlist_id):
519            endpoint = f"catalog/{self._storefront}/playlists/{prov_playlist_id}/tracks"
520        else:
521            endpoint = f"me/library/playlists/{prov_playlist_id}/tracks"
522        result = []
523        page_size = 100
524        offset = page * page_size
525        response = await self._get_data(
526            endpoint, include="artists,catalog", limit=page_size, offset=offset
527        )
528        if not response or "data" not in response:
529            return result
530        playlist_track_ids = [track["id"] for track in response["data"] if track and track["id"]]
531        rating_response = await self._get_ratings(playlist_track_ids, MediaType.TRACK)
532        for index, track in enumerate(response["data"]):
533            if track and track["id"]:
534                is_favourite = rating_response.get(track["id"])
535                parsed_track = self._parse_track(track, is_favourite)
536                parsed_track.position = offset + index + 1
537                result.append(parsed_track)
538        return result
539
540    @use_cache(3600 * 24 * 7)  # cache for 7 days
541    async def get_artist_albums(self, prov_artist_id) -> list[Album]:
542        """Get a list of all albums for the given artist."""
543        endpoint = f"catalog/{self._storefront}/artists/{prov_artist_id}/albums"
544        try:
545            response = await self._get_all_items(endpoint)
546        except MediaNotFoundError:
547            # Some artists do not have albums, return empty list
548            self.logger.info("No albums found for artist %s", prov_artist_id)
549            return []
550        album_ids = [album["id"] for album in response if album["id"]]
551        rating_response = await self._get_ratings(album_ids, MediaType.ALBUM)
552        albums = []
553        for album in response:
554            if not album["id"]:
555                continue
556            is_favourite = rating_response.get(album["id"])
557            parsed_album = self._parse_album(album, is_favourite)
558            if parsed_album:
559                albums.append(parsed_album)
560        return albums
561
562    @use_cache(3600 * 24 * 7)  # cache for 7 days
563    async def get_artist_toptracks(self, prov_artist_id) -> list[Track]:
564        """Get a list of 10 most popular tracks for the given artist."""
565        endpoint = f"catalog/{self._storefront}/artists/{prov_artist_id}/view/top-songs"
566        try:
567            response = await self._get_data(endpoint)
568        except MediaNotFoundError:
569            # Some artists do not have top tracks, return empty list
570            self.logger.info("No top tracks found for artist %s", prov_artist_id)
571            return []
572        track_ids = [track["id"] for track in response["data"] if track["id"]]
573        rating_response = await self._get_ratings(track_ids, MediaType.TRACK)
574        tracks = []
575        for track in response["data"]:
576            if not track["id"]:
577                continue
578            is_favourite = rating_response.get(track["id"])
579            tracks.append(self._parse_track(track, is_favourite))
580        return tracks
581
582    async def library_add(self, item: MediaItemType) -> None:
583        """Add item to library."""
584        item_type = self._translate_media_type_to_apple_type(item.media_type)
585        kwargs = {
586            f"ids[{item_type}]": item.item_id,
587        }
588        await self._post_data("me/library/", **kwargs)
589
590    async def library_remove(self, prov_item_id, media_type: MediaType) -> None:
591        """Remove item from library."""
592        self.logger.warning(
593            "Deleting items from your library is not yet supported by the Apple Music API. "
594            f"Skipping deletion of {media_type} - {prov_item_id}."
595        )
596
597    async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]):
598        """Add track(s) to playlist."""
599        endpoint = f"me/library/playlists/{prov_playlist_id}/tracks"
600        data = {
601            "data": [
602                {
603                    "id": track_id,
604                    "type": "library-songs" if self.is_library_id(track_id) else "songs",
605                }
606                for track_id in prov_track_ids
607            ]
608        }
609        await self._post_data(endpoint, data=data)
610
611    async def remove_playlist_tracks(
612        self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
613    ) -> None:
614        """Remove track(s) from playlist."""
615        self.logger.warning(
616            "Removing tracks from playlists is not supported by the Apple Music "
617            "API. Make sure to delete them using the Apple Music app."
618        )
619
620    @use_cache(3600 * 24)  # cache for 24 hours
621    async def get_similar_tracks(self, prov_track_id, limit=25) -> list[Track]:
622        """Retrieve a dynamic list of tracks based on the provided item."""
623        # Note, Apple music does not have an official endpoint for similar tracks.
624        # We will use the next-tracks endpoint to get a list of tracks that are similar to the
625        # provided track. However, Apple music only provides 2 tracks at a time, so we will
626        # need to call the endpoint multiple times. Therefore, set a limit to 6 to prevent
627        # flooding the apple music api.
628        limit = 6
629        endpoint = f"me/stations/next-tracks/ra.{prov_track_id}"
630        found_tracks = []
631        while len(found_tracks) < limit:
632            response = await self._post_data(endpoint, include="artists")
633            if not response or "data" not in response:
634                break
635            track_ids = [track["id"] for track in response["data"] if track and track["id"]]
636            rating_response = await self._get_ratings(track_ids, MediaType.TRACK)
637            for track in response["data"]:
638                if track and track["id"]:
639                    is_favourite = rating_response.get(track["id"])
640                    found_tracks.append(self._parse_track(track, is_favourite))
641        return found_tracks
642
643    async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
644        """Return the content details for the given track when it will be streamed."""
645        stream_metadata = await self._fetch_song_stream_metadata(item_id)
646        if self.is_library_id(item_id):
647            # Library items are not encrypted and do not need decryption keys
648            try:
649                stream_url = stream_metadata["assets"][0]["URL"]
650            except (KeyError, IndexError, TypeError) as exc:
651                raise MediaNotFoundError(
652                    f"Failed to extract stream URL for library track {item_id}: {exc}"
653                ) from exc
654            return StreamDetails(
655                item_id=item_id,
656                provider=self.instance_id,
657                path=stream_url,
658                stream_type=StreamType.HTTP,
659                audio_format=AudioFormat(content_type=ContentType.UNKNOWN),
660                can_seek=True,
661                allow_seek=True,
662            )
663        # Continue to obtain decryption keys for catalog items
664        license_url = stream_metadata["hls-key-server-url"]
665        stream_url, uri = await self._parse_stream_url_and_uri(stream_metadata["assets"])
666        if not stream_url or not uri:
667            raise MediaNotFoundError("No stream URL found for song.")
668        key_id = base64.b64decode(uri.split(",")[1])
669        return StreamDetails(
670            item_id=item_id,
671            provider=self.instance_id,
672            audio_format=AudioFormat(content_type=ContentType.MP4, codec_type=ContentType.AAC),
673            stream_type=StreamType.ENCRYPTED_HTTP,
674            decryption_key=await self._get_decryption_key(license_url, key_id, uri, item_id),
675            path=stream_url,
676            can_seek=True,
677            allow_seek=True,
678        )
679
680    async def set_favorite(self, prov_item_id: str, media_type: MediaType, favorite: bool) -> None:
681        """Set the favorite status of an item."""
682        data = {
683            "type": "ratings",
684            "attributes": {
685                "value": 1 if favorite else -1,
686            },
687        }
688        item_type = self._translate_media_type_to_apple_type(media_type)
689        if self._is_catalog_id(prov_item_id):
690            endpoint = f"me/ratings/{item_type}/{prov_item_id}"
691        else:
692            endpoint = f"me/ratings/library-{item_type}/{prov_item_id}"
693        await self._put_data(endpoint, data=data)
694
695    def _parse_artist(self, artist_obj: dict[str, Any]) -> Artist:
696        """Parse artist object to generic layout."""
697        relationships = artist_obj.get("relationships", {})
698        if (
699            artist_obj.get("type") == "library-artists"
700            and relationships.get("catalog", {}).get("data", []) != []
701        ):
702            artist_id = relationships["catalog"]["data"][0]["id"]
703            attributes = relationships["catalog"]["data"][0]["attributes"]
704        elif "attributes" in artist_obj:
705            artist_id = artist_obj["id"]
706            attributes = artist_obj["attributes"]
707        else:
708            artist_id = artist_obj["id"]
709            self.logger.debug("No attributes found for artist %s", artist_obj)
710            # No more details available other than the id, return an ItemMapping
711            return ItemMapping(
712                media_type=MediaType.ARTIST,
713                provider=self.instance_id,
714                item_id=artist_id,
715                name=artist_id,
716            )
717        artist = Artist(
718            item_id=artist_id,
719            name=attributes.get("name"),
720            provider=self.domain,
721            provider_mappings={
722                ProviderMapping(
723                    item_id=artist_id,
724                    provider_domain=self.domain,
725                    provider_instance=self.instance_id,
726                    url=attributes.get("url"),
727                )
728            },
729        )
730        if artwork := attributes.get("artwork"):
731            artist.metadata.add_image(
732                MediaItemImage(
733                    provider=self.instance_id,
734                    type=ImageType.THUMB,
735                    path=artwork["url"].format(
736                        w=min(artwork["width"], MAX_ARTWORK_DIMENSION),
737                        h=min(artwork["height"], MAX_ARTWORK_DIMENSION),
738                    ),
739                    remotely_accessible=True,
740                )
741            )
742        if genres := attributes.get("genreNames"):
743            artist.metadata.genres = set(genres)
744        if notes := attributes.get("editorialNotes"):
745            artist.metadata.description = notes.get("standard") or notes.get("short")
746        return artist
747
748    def _parse_album(
749        self, album_obj: dict, is_favourite: bool | None = None
750    ) -> Album | ItemMapping | None:
751        """Parse album object to generic layout."""
752        relationships = album_obj.get("relationships", {})
753        response_type = album_obj.get("type")
754        if (
755            response_type == "library-albums"
756            and relationships["catalog"]["data"] != []
757            and "attributes" in relationships["catalog"]["data"][0]
758        ):
759            album_id = relationships.get("catalog", {})["data"][0]["id"]
760            attributes = relationships.get("catalog", {})["data"][0]["attributes"]
761        elif "attributes" in album_obj:
762            album_id = album_obj["id"]
763            attributes = album_obj["attributes"]
764        else:
765            album_id = album_obj["id"]
766            # No more details available other than the id, return an ItemMapping
767            return ItemMapping(
768                media_type=MediaType.ALBUM,
769                provider=self.instance_id,
770                item_id=album_id,
771                name=album_id,
772            )
773        is_available_in_catalog = attributes.get("url") is not None
774        if not is_available_in_catalog:
775            self.logger.debug(
776                "Skipping album %s. Album is not available in the Apple Music catalog.",
777                attributes.get("name"),
778            )
779            return None
780        name, version = parse_title_and_version(attributes["name"])
781        album = Album(
782            item_id=album_id,
783            provider=self.domain,
784            name=name,
785            version=version,
786            provider_mappings={
787                ProviderMapping(
788                    item_id=album_id,
789                    provider_domain=self.domain,
790                    provider_instance=self.instance_id,
791                    url=attributes.get("url"),
792                    available=attributes.get("playParams", {}).get("id") is not None,
793                )
794            },
795        )
796        if artists := relationships.get("artists"):
797            album.artists = UniqueList([self._parse_artist(artist) for artist in artists["data"]])
798        elif artist_name := attributes.get("artistName"):
799            album.artists = UniqueList(
800                [
801                    ItemMapping(
802                        media_type=MediaType.ARTIST,
803                        provider=self.instance_id,
804                        item_id=artist_name,
805                        name=artist_name,
806                    )
807                ]
808            )
809        if release_date := attributes.get("releaseDate"):
810            album.year = int(release_date.split("-")[0])
811        if genres := attributes.get("genreNames"):
812            album.metadata.genres = set(genres)
813        if artwork := attributes.get("artwork"):
814            album.metadata.add_image(
815                MediaItemImage(
816                    provider=self.instance_id,
817                    type=ImageType.THUMB,
818                    path=artwork["url"].format(
819                        w=min(artwork["width"], MAX_ARTWORK_DIMENSION),
820                        h=min(artwork["height"], MAX_ARTWORK_DIMENSION),
821                    ),
822                    remotely_accessible=True,
823                )
824            )
825        if album_copyright := attributes.get("copyright"):
826            album.metadata.copyright = album_copyright
827        if record_label := attributes.get("recordLabel"):
828            album.metadata.label = record_label
829        if upc := attributes.get("upc"):
830            album.external_ids.add((ExternalID.BARCODE, "0" + upc))
831        if notes := attributes.get("editorialNotes"):
832            album.metadata.description = notes.get("standard") or notes.get("short")
833        if content_rating := attributes.get("contentRating"):
834            album.metadata.explicit = content_rating == "explicit"
835        album_type = AlbumType.ALBUM
836        if attributes.get("isSingle"):
837            album_type = AlbumType.SINGLE
838        elif attributes.get("isCompilation"):
839            album_type = AlbumType.COMPILATION
840        album.album_type = album_type
841
842        # Try inference - override if it finds something more specific
843        # Apple Music doesn't seem to have version field
844        inferred_type = infer_album_type(album.name, "")
845        if inferred_type in (AlbumType.SOUNDTRACK, AlbumType.LIVE):
846            album.album_type = inferred_type
847        album.favorite = is_favourite or False
848        return album
849
850    def _parse_track(
851        self,
852        track_obj: dict[str, Any],
853        is_favourite: bool | None = None,
854    ) -> Track:
855        """Parse track object to generic layout."""
856        relationships = track_obj.get("relationships", {})
857        if (
858            track_obj.get("type") == "library-songs"
859            and relationships.get("catalog", {}).get("data", []) != []
860        ):
861            # Library track with catalog version available
862            track_id = relationships.get("catalog", {})["data"][0]["id"]
863            attributes = relationships.get("catalog", {})["data"][0]["attributes"]
864        elif "attributes" in track_obj:
865            # Catalog track or library-only track
866            track_id = track_obj["id"]
867            attributes = track_obj["attributes"]
868        else:
869            track_id = track_obj["id"]
870            attributes = {}
871        name, version = parse_title_and_version(attributes.get("name", ""))
872        track = Track(
873            item_id=track_id,
874            provider=self.domain,
875            name=name,
876            version=version,
877            duration=attributes.get("durationInMillis", 0) / 1000,
878            provider_mappings={
879                ProviderMapping(
880                    item_id=track_id,
881                    provider_domain=self.domain,
882                    provider_instance=self.instance_id,
883                    audio_format=AudioFormat(content_type=ContentType.AAC),
884                    url=attributes.get("url"),
885                    available=attributes.get("playParams", {}).get("id") is not None,
886                )
887            },
888        )
889        if disc_number := attributes.get("discNumber"):
890            track.disc_number = disc_number
891        if track_number := attributes.get("trackNumber"):
892            track.track_number = track_number
893        # Prefer catalog information over library information for artists.
894        # For compilations it picks the wrong artists
895        if "artists" in relationships:
896            artists = relationships["artists"]
897            track.artists = [self._parse_artist(artist) for artist in artists["data"]]
898        # 'Similar tracks' do not provide full artist details
899        elif artist_name := attributes.get("artistName"):
900            track.artists = [
901                ItemMapping(
902                    media_type=MediaType.ARTIST,
903                    item_id=artist_name,
904                    provider=self.instance_id,
905                    name=artist_name,
906                )
907            ]
908        if albums := relationships.get("albums"):
909            if "data" in albums and len(albums["data"]) > 0:
910                track.album = self._parse_album(albums["data"][0])
911        if artwork := attributes.get("artwork"):
912            track.metadata.add_image(
913                MediaItemImage(
914                    provider=self.instance_id,
915                    type=ImageType.THUMB,
916                    path=artwork["url"].format(
917                        w=min(artwork["width"], MAX_ARTWORK_DIMENSION),
918                        h=min(artwork["height"], MAX_ARTWORK_DIMENSION),
919                    ),
920                    remotely_accessible=True,
921                )
922            )
923        if genres := attributes.get("genreNames"):
924            track.metadata.genres = set(genres)
925        if composers := attributes.get("composerName"):
926            track.metadata.performers = set(composers.split(", "))
927        if isrc := attributes.get("isrc"):
928            track.external_ids.add((ExternalID.ISRC, isrc))
929        track.favorite = is_favourite or False
930        return track
931
932    def _parse_playlist(
933        self, playlist_obj: dict[str, Any], is_favourite: bool | None = None
934    ) -> Playlist:
935        """Parse Apple Music playlist object to generic layout."""
936        attributes = playlist_obj["attributes"]
937        playlist_id = attributes["playParams"].get("globalId") or playlist_obj["id"]
938        is_editable = attributes.get("canEdit", False)
939        playlist = Playlist(
940            item_id=playlist_id,
941            provider=self.instance_id,
942            name=attributes.get("name", UNKNOWN_PLAYLIST_NAME),
943            owner=attributes.get("curatorName", "me"),
944            provider_mappings={
945                ProviderMapping(
946                    item_id=playlist_id,
947                    provider_domain=self.domain,
948                    provider_instance=self.instance_id,
949                    url=attributes.get("url"),
950                    is_unique=is_editable,  # user-owned playlists are unique
951                )
952            },
953            is_editable=is_editable,
954        )
955        if artwork := attributes.get("artwork"):
956            url = artwork["url"]
957            if artwork["width"] and artwork["height"]:
958                url = url.format(
959                    w=min(artwork["width"], MAX_ARTWORK_DIMENSION),
960                    h=min(artwork["height"], MAX_ARTWORK_DIMENSION),
961                )
962            playlist.metadata.add_image(
963                MediaItemImage(
964                    provider=self.instance_id,
965                    type=ImageType.THUMB,
966                    path=url,
967                    remotely_accessible=True,
968                )
969            )
970        if description := attributes.get("description"):
971            playlist.metadata.description = description.get("standard")
972        playlist.favorite = is_favourite or False
973        return playlist
974
975    async def _get_all_items(self, endpoint, key="data", **kwargs) -> list[dict]:
976        """Get all items from a paged list."""
977        limit = 50
978        offset = 0
979        all_items = []
980        while True:
981            kwargs["limit"] = limit
982            kwargs["offset"] = offset
983            result = await self._get_data(endpoint, **kwargs)
984            if key not in result:
985                break
986            all_items += result[key]
987            if not result.get("next"):
988                break
989            offset += limit
990        return all_items
991
992    @throttle_with_retries
993    async def _get_data(self, endpoint, **kwargs) -> dict[str, Any]:
994        """Get data from api."""
995        url = f"https://api.music.apple.com/v1/{endpoint}"
996        headers = {"Authorization": f"Bearer {self._music_app_token}"}
997        headers["Music-User-Token"] = self._music_user_token
998        async with (
999            self.mass.http_session.get(
1000                url, headers=headers, params=kwargs, ssl=True, timeout=120
1001            ) as response,
1002        ):
1003            if response.status == 404 and "limit" in kwargs and "offset" in kwargs:
1004                return {}
1005            # Convert HTTP errors to exceptions
1006            if response.status == 404:
1007                raise MediaNotFoundError(f"{endpoint} not found")
1008            if response.status == 504:
1009                # See if we can get more info from the response on occasional timeouts
1010                self.logger.debug(
1011                    "Apple Music API Timeout: url=%s, params=%s, response_headers=%s",
1012                    url,
1013                    kwargs,
1014                    response.headers,
1015                )
1016                raise ResourceTemporarilyUnavailable("Apple Music API Timeout")
1017            if response.status == 429:
1018                # Debug this for now to see if the response headers give us info about the
1019                # backoff time. There is no documentation on this.
1020                self.logger.debug("Apple Music Rate Limiter. Headers: %s", response.headers)
1021                raise ResourceTemporarilyUnavailable("Apple Music Rate Limiter")
1022            if response.status == 500:
1023                raise MusicAssistantError("Unexpected server error when calling Apple Music")
1024            response.raise_for_status()
1025            return await response.json(loads=json_loads)
1026
1027    @throttle_with_retries
1028    async def _delete_data(self, endpoint, data=None, **kwargs) -> None:
1029        """Delete data from api."""
1030        url = f"https://api.music.apple.com/v1/{endpoint}"
1031        headers = {"Authorization": f"Bearer {self._music_app_token}"}
1032        headers["Music-User-Token"] = self._music_user_token
1033        async with (
1034            self.mass.http_session.delete(
1035                url, headers=headers, params=kwargs, json=data, ssl=True, timeout=120
1036            ) as response,
1037        ):
1038            # Convert HTTP errors to exceptions
1039            if response.status == 404:
1040                raise MediaNotFoundError(f"{endpoint} not found")
1041            if response.status == 429:
1042                # Debug this for now to see if the response headers give us info about the
1043                # backoff time. There is no documentation on this.
1044                self.logger.debug("Apple Music Rate Limiter. Headers: %s", response.headers)
1045                raise ResourceTemporarilyUnavailable("Apple Music Rate Limiter")
1046            response.raise_for_status()
1047
1048    async def _put_data(self, endpoint, data=None, **kwargs) -> str:
1049        """Put data on api."""
1050        url = f"https://api.music.apple.com/v1/{endpoint}"
1051        headers = {"Authorization": f"Bearer {self._music_app_token}"}
1052        headers["Music-User-Token"] = self._music_user_token
1053        async with (
1054            self.mass.http_session.put(
1055                url, headers=headers, params=kwargs, json=data, ssl=True, timeout=120
1056            ) as response,
1057        ):
1058            # Convert HTTP errors to exceptions
1059            if response.status == 404:
1060                raise MediaNotFoundError(f"{endpoint} not found")
1061            if response.status == 429:
1062                # Debug this for now to see if the response headers give us info about the
1063                # backoff time. There is no documentation on this.
1064                self.logger.debug("Apple Music Rate Limiter. Headers: %s", response.headers)
1065                raise ResourceTemporarilyUnavailable("Apple Music Rate Limiter")
1066            response.raise_for_status()
1067            if response.content_length:
1068                return await response.json(loads=json_loads)
1069            return {}
1070
1071    @throttle_with_retries
1072    async def _post_data(self, endpoint, data=None, **kwargs) -> str:
1073        """Post data on api."""
1074        url = f"https://api.music.apple.com/v1/{endpoint}"
1075        headers = {"Authorization": f"Bearer {self._music_app_token}"}
1076        headers["Music-User-Token"] = self._music_user_token
1077        async with (
1078            self.mass.http_session.post(
1079                url, headers=headers, params=kwargs, json=data, ssl=True, timeout=120
1080            ) as response,
1081        ):
1082            # Convert HTTP errors to exceptions
1083            if response.status == 404:
1084                raise MediaNotFoundError(f"{endpoint} not found")
1085            if response.status == 429:
1086                # Debug this for now to see if the response headers give us info about the
1087                # backoff time. There is no documentation on this.
1088                self.logger.debug("Apple Music Rate Limiter. Headers: %s", response.headers)
1089                raise ResourceTemporarilyUnavailable("Apple Music Rate Limiter")
1090            response.raise_for_status()
1091            return await response.json(loads=json_loads)
1092
1093    async def _get_user_storefront(self) -> str:
1094        """Get the user's storefront."""
1095        locale = self.mass.metadata.locale.replace("_", "-")
1096        language = locale.split("-")[0]
1097        result = await self._get_data("me/storefront", l=language)
1098        return result["data"][0]["id"]
1099
1100    async def _get_ratings(self, item_ids: list[str], media_type: MediaType) -> dict[str, bool]:
1101        """Get ratings (aka favorites) for a list of item ids."""
1102        if media_type == MediaType.ARTIST:
1103            raise NotImplementedError(
1104                "Ratings are not available for artist in the Apple Music API."
1105            )
1106        if len(item_ids) == 0:
1107            return {}
1108        apple_type = self._translate_media_type_to_apple_type(media_type)
1109        endpoint = apple_type if not self.is_library_id(item_ids[0]) else f"library-{apple_type}"
1110        # Apple Music limits to 200 ids per request
1111        max_ids_per_request = 200
1112        results = {}
1113        for i in range(0, len(item_ids), max_ids_per_request):
1114            batch_ids = item_ids[i : i + max_ids_per_request]
1115            response = await self._get_data(
1116                f"me/ratings/{endpoint}",
1117                ids=",".join(batch_ids),
1118            )
1119            results.update(
1120                {
1121                    item["id"]: bool(item["attributes"].get("value", False) == 1)
1122                    for item in response.get("data", [])
1123                }
1124            )
1125        return results
1126
1127    def _translate_media_type_to_apple_type(self, media_type: MediaType) -> str:
1128        """Translate MediaType to Apple Music endpoint string."""
1129        match media_type:
1130            case MediaType.ARTIST:
1131                return "artists"
1132            case MediaType.ALBUM:
1133                return "albums"
1134            case MediaType.TRACK:
1135                return "songs"
1136            case MediaType.PLAYLIST:
1137                return "playlists"
1138        raise MusicAssistantError(f"Unsupported media type: {media_type}")
1139
1140    def is_library_id(self, library_id) -> bool:
1141        """Check a library ID matches known format."""
1142        if not isinstance(library_id, str):
1143            return False
1144        valid = re.findall(r"^(?:[ailp]\.)[a-zA-Z0-9]+$", library_id)
1145        return bool(valid)
1146
1147    def _is_catalog_id(self, catalog_id: str) -> bool:
1148        """Check if input is a catalog id, or a library id."""
1149        return catalog_id.isnumeric() or catalog_id.startswith("pl.")
1150
1151    async def _fetch_song_stream_metadata(self, song_id: str) -> str:
1152        """Get the stream URL for a song from Apple Music."""
1153        playback_url = "https://play.music.apple.com/WebObjects/MZPlay.woa/wa/webPlayback"
1154        data = {}
1155        self.logger.debug("_fetch_song_stream_metadata: Check if Library ID: %s", song_id)
1156        if self.is_library_id(song_id):
1157            data["universalLibraryId"] = song_id
1158            data["isLibrary"] = True
1159        else:
1160            data["salableAdamId"] = song_id
1161        for retry in (True, False):
1162            try:
1163                async with self.mass.http_session.post(
1164                    playback_url, headers=self._get_decryption_headers(), json=data, ssl=True
1165                ) as response:
1166                    response.raise_for_status()
1167                    content = await response.json(loads=json_loads)
1168                    if content.get("failureType"):
1169                        message = content.get("failureMessage")
1170                        raise MediaNotFoundError(f"Failed to get song stream metadata: {message}")
1171                    return content["songList"][0]
1172            except (MediaNotFoundError, ClientError) as exc:
1173                if retry:
1174                    self.logger.warning("Failed to get song stream metadata: %s", exc)
1175                    continue
1176                raise
1177        raise MediaNotFoundError(f"Failed to get song stream metadata for {song_id}")
1178
1179    async def _parse_stream_url_and_uri(self, stream_assets: list[dict]) -> str:
1180        """Parse the Stream URL and Key URI from the song."""
1181        ctrp256_urls = [asset["URL"] for asset in stream_assets if asset["flavor"] == "28:ctrp256"]
1182        if len(ctrp256_urls) == 0:
1183            raise MediaNotFoundError("No ctrp256 URL found for song.")
1184        playlist_url = ctrp256_urls[0]
1185        playlist_items = await fetch_playlist(self.mass, ctrp256_urls[0], raise_on_hls=False)
1186        # Apple returns a HLS (substream) playlist but instead of chunks,
1187        # each item is just the whole file. So we simply grab the first playlist item.
1188        playlist_item = playlist_items[0]
1189        # path is relative, stitch it together
1190        base_path = playlist_url.rsplit("/", 1)[0]
1191        track_url = base_path + "/" + playlist_items[0].path
1192        key = playlist_item.key
1193        return (track_url, key)
1194
1195    def _get_decryption_headers(self):
1196        """Get headers for decryption requests."""
1197        return {
1198            "authorization": f"Bearer {self._music_app_token}",
1199            "media-user-token": self._music_user_token,
1200            "connection": "keep-alive",
1201            "accept": "application/json",
1202            "origin": "https://music.apple.com",
1203            "referer": "https://music.apple.com/",
1204            "accept-encoding": "gzip, deflate, br",
1205            "content-type": "application/json;charset=utf-8",
1206            "user-agent": (
1207                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)"
1208                " Chrome/110.0.0.0 Safari/537.36"
1209            ),
1210        }
1211
1212    async def _get_decryption_key(
1213        self, license_url: str, key_id: bytes, uri: str, item_id: str
1214    ) -> str:
1215        """Get the decryption key for a song."""
1216        if decryption_key := await self.mass.cache.get(
1217            key=item_id,
1218            provider=self.instance_id,
1219            category=CACHE_CATEGORY_DECRYPT_KEY,
1220            checksum=self._session_id,
1221        ):
1222            self.logger.debug("Decryption key for %s found in cache.", item_id)
1223            return decryption_key
1224        pssh = self._get_pssh(key_id)
1225        device = Device(
1226            client_id=self._decrypt_client_id,
1227            private_key=self._decrypt_private_key,
1228            type_=DeviceTypes.ANDROID,
1229            security_level=3,
1230            flags={},
1231        )
1232        cdm = Cdm.from_device(device)
1233        session_id = cdm.open()
1234        challenge = cdm.get_license_challenge(session_id, pssh)
1235        track_license = await self._get_license(challenge, license_url, uri, item_id)
1236        cdm.parse_license(session_id, track_license)
1237        key = next(key for key in cdm.get_keys(session_id) if key.type == "CONTENT")
1238        if not key:
1239            raise MediaNotFoundError("Unable to get decryption key for song %s.", item_id)
1240        cdm.close(session_id)
1241        decryption_key = key.key.hex()
1242        self.mass.create_task(
1243            self.mass.cache.set(
1244                key=item_id,
1245                data=decryption_key,
1246                expiration=3600,
1247                provider=self.instance_id,
1248                category=CACHE_CATEGORY_DECRYPT_KEY,
1249                checksum=self._session_id,
1250            )
1251        )
1252        return decryption_key
1253
1254    def _get_pssh(self, key_id: bytes) -> PSSH:
1255        """Get the PSSH for a song."""
1256        pssh_data = WidevinePsshData()
1257        pssh_data.algorithm = 1
1258        pssh_data.key_ids.append(key_id)
1259        init_data = base64.b64encode(pssh_data.SerializeToString()).decode("utf-8")
1260        return PSSH.new(system_id=PSSH.SystemId.Widevine, init_data=init_data)
1261
1262    async def _get_license(self, challenge: bytes, license_url: str, uri: str, item_id: str) -> str:
1263        """Get the license for a song based on the challenge."""
1264        challenge_b64 = base64.b64encode(challenge).decode("utf-8")
1265        data = {
1266            "challenge": challenge_b64,
1267            "key-system": "com.widevine.alpha",
1268            "uri": uri,
1269            "adamId": item_id,
1270            "isLibrary": False,
1271            "user-initiated": True,
1272        }
1273        async with self.mass.http_session.post(
1274            license_url, data=json.dumps(data), headers=self._get_decryption_headers(), ssl=False
1275        ) as response:
1276            response.raise_for_status()
1277            content = await response.json(loads=json_loads)
1278            track_license = content.get("license")
1279            if not track_license:
1280                raise MediaNotFoundError("No license found for song %s.", item_id)
1281            return track_license
1282