/
/
/
1"""
2Apple Music musicprovider support for MusicAssistant.
3
4TODO MUSIC_APP_TOKEN expires after 6 months so should have a distribution mechanism outside
5 compulsory application updates. It is only a semi-private key in JWT format so could be refreshed
6 daily by a GitHub action and downloaded by the provider each initialise.
7TODO Widevine keys can be obtained dynamically from Apple Music API rather than copied into Docker
8 build. This is undocumented but @maxlyth has a working example.
9TODO MUSIC_USER_TOKEN must be refreshed (~min 180 days) and needs mechanism to prompt user to
10 re-authenticate in browser.
11TODO Current provider ignores private tracks that are not available in the storefront catalog as
12 streamable url is derived from the catalog id. It is undocumented but @maxlyth has a working
13 example to get a streamable url from the library id.
14"""
15
16from __future__ import annotations
17
18import base64
19import json
20import os
21import pathlib
22import re
23import time
24from collections.abc import Sequence
25from typing import TYPE_CHECKING, Any
26
27import aiofiles
28from aiohttp import web
29from aiohttp.client_exceptions import ClientError
30from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
31from music_assistant_models.enums import (
32 AlbumType,
33 ConfigEntryType,
34 ContentType,
35 ExternalID,
36 ImageType,
37 MediaType,
38 ProviderFeature,
39 StreamType,
40)
41from music_assistant_models.errors import (
42 LoginFailed,
43 MediaNotFoundError,
44 MusicAssistantError,
45 ResourceTemporarilyUnavailable,
46)
47from music_assistant_models.media_items import (
48 Album,
49 Artist,
50 AudioFormat,
51 BrowseFolder,
52 ItemMapping,
53 MediaItemImage,
54 MediaItemType,
55 Playlist,
56 ProviderMapping,
57 SearchResults,
58 Track,
59 UniqueList,
60)
61from music_assistant_models.streamdetails import StreamDetails
62from pywidevine import PSSH, Cdm, Device, DeviceTypes
63from pywidevine.license_protocol_pb2 import WidevinePsshData
64from shortuuid import uuid
65
66from music_assistant.controllers.cache import use_cache
67from music_assistant.helpers.app_vars import app_var
68from music_assistant.helpers.auth import AuthenticationHelper
69from music_assistant.helpers.json import json_loads
70from music_assistant.helpers.playlists import fetch_playlist
71from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
72from music_assistant.helpers.util import infer_album_type, parse_title_and_version
73from music_assistant.models.music_provider import MusicProvider
74from music_assistant.providers.apple_music.helpers import browse_playlists
75
76if TYPE_CHECKING:
77 from collections.abc import AsyncGenerator
78
79 from music_assistant_models.config_entries import ProviderConfig
80 from music_assistant_models.provider import ProviderManifest
81
82 from music_assistant import MusicAssistant
83 from music_assistant.models import ProviderInstanceType
84
85
# Provider features advertised by this provider; setup() passes this set to
# the AppleMusicProvider constructor.
SUPPORTED_FEATURES = {
    ProviderFeature.LIBRARY_ARTISTS,
    ProviderFeature.LIBRARY_ALBUMS,
    ProviderFeature.LIBRARY_TRACKS,
    ProviderFeature.LIBRARY_PLAYLISTS,
    ProviderFeature.BROWSE,
    ProviderFeature.SEARCH,
    ProviderFeature.ARTIST_ALBUMS,
    ProviderFeature.ARTIST_TOPTRACKS,
    ProviderFeature.SIMILAR_TRACKS,
    ProviderFeature.LIBRARY_ALBUMS_EDIT,
    ProviderFeature.LIBRARY_ARTISTS_EDIT,
    ProviderFeature.LIBRARY_PLAYLISTS_EDIT,
    ProviderFeature.LIBRARY_TRACKS_EDIT,
    ProviderFeature.FAVORITE_ALBUMS_EDIT,
    ProviderFeature.FAVORITE_TRACKS_EDIT,
    ProviderFeature.FAVORITE_PLAYLISTS_EDIT,
}
104
# Default MusicKit app token, obtained through the app_var helper. It is sent as the
# Bearer token when get_config_entries() probes the Apple Music API.
MUSIC_APP_TOKEN = app_var(8)
# Directory and file names of the Widevine client id / private key that are read
# during provider initialization.
WIDEVINE_BASE_PATH = "/usr/local/bin/widevine_cdm"
DECRYPT_CLIENT_ID_FILENAME = "client_id.bin"
DECRYPT_PRIVATE_KEY_FILENAME = "private_key.pem"
# NOTE(review): not referenced in the visible part of this file — presumably a
# fallback name for playlists without a title; confirm against its usage.
UNKNOWN_PLAYLIST_NAME = "Unknown Apple Music Playlist"
# Keys of the provider's config entries.
CONF_MUSIC_APP_TOKEN = "music_app_token"
CONF_MUSIC_USER_TOKEN = "music_user_token"
CONF_MUSIC_USER_MANUAL_TOKEN = "music_user_manual_token"
CONF_MUSIC_USER_TOKEN_TIMESTAMP = "music_user_token_timestamp"
# NOTE(review): presumably the cache category under which decryption keys are
# stored; its usage is outside the visible part of this file — confirm.
CACHE_CATEGORY_DECRYPT_KEY = 1
# Upper bound applied to the width and height substituted into artwork URL templates.
MAX_ARTWORK_DIMENSION = 1000
116
117
async def setup(
    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
    """Create the Apple Music provider instance for the supplied configuration."""
    provider = AppleMusicProvider(mass, manifest, config, SUPPORTED_FEATURES)
    return provider
123
124
async def get_config_entries(
    mass: MusicAssistant,
    instance_id: str | None = None,
    action: str | None = None,
    values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
    """
    Return Config entries to setup this provider.

    instance_id: id of an existing provider instance (None if new instance setup).
    action: [optional] action key called from config entries UI.
    values: the (intermediate) raw values for config entries sent with the action.

    Raises LoginFailed when the MusicKit authentication flow fails for any reason
    other than the callback result lacking the expected token keys.
    """
    # `values` is optional, but it is both written to and read from below, so
    # normalise it to a dict once instead of guarding every access.
    if values is None:
        values = {}

    def validate_user_token(token: object) -> bool:
        """Return True when token is a string that ends in a base64-looking run."""
        if not isinstance(token, str):
            return False
        return bool(re.search(r"[a-zA-Z0-9=/+]{32,}==$", token))

    # Check whether the bundled app token is accepted by the Apple Music API;
    # if it is not, the app token config field is shown to the user instead.
    default_app_token_valid = False
    async with (
        mass.http_session.get(
            "https://api.music.apple.com/v1/test",
            headers={"Authorization": f"Bearer {MUSIC_APP_TOKEN}"},
            ssl=True,
            timeout=10,
        ) as response,
    ):
        if response.status == 200:
            values[CONF_MUSIC_APP_TOKEN] = f"{MUSIC_APP_TOKEN}"
            default_app_token_valid = True

    # Action is to launch MusicKit flow
    if action == "CONF_ACTION_AUTH" and default_app_token_valid:
        callback_method = "POST"
        async with AuthenticationHelper(mass, values["session_id"], callback_method) as auth_helper:
            callback_url = auth_helper.callback_url
            flow_base_path = f"apple_music_auth/{values['session_id']}/"
            flow_timeout = 600
            parent_file_path = pathlib.Path(__file__).parent.resolve()
            base_url = f"{mass.webserver.base_url}/{flow_base_path}"
            flow_base_url = f"{base_url}index.html"

            async def serve_mk_auth_page(request: web.Request) -> web.Response:
                """Serve the static HTML wrapper page of the MusicKit auth flow."""
                auth_html_path = parent_file_path.joinpath("musickit_auth/musickit_wrapper.html")
                return web.FileResponse(
                    auth_html_path,
                    headers={"content-type": "text/html"},
                )

            async def serve_mk_auth_css(request: web.Request) -> web.Response:
                """Serve the stylesheet of the MusicKit auth page."""
                auth_css_path = parent_file_path.joinpath("musickit_auth/musickit_wrapper.css")
                return web.FileResponse(
                    auth_css_path,
                    headers={
                        "content-type": "text/css",
                    },
                )

            async def serve_mk_glue(request: web.Request) -> web.Response:
                """Serve the generated JavaScript constants consumed by the auth page."""
                # Use .get() so a missing token/timestamp yields an empty value
                # instead of a KeyError inside the request handler.
                stored_user_token = values.get(CONF_MUSIC_USER_TOKEN)
                user_token = stored_user_token if validate_user_token(stored_user_token) else ""
                user_token_timestamp = values.get(CONF_MUSIC_USER_TOKEN_TIMESTAMP, 0)
                return_html = f"""
                const return_url='{callback_url}';
                const base_url='{base_url}';
                const app_token='{values[CONF_MUSIC_APP_TOKEN]}';
                const callback_method='{callback_method}';
                const user_token='{user_token}';
                const user_token_timestamp='{user_token_timestamp}';
                const flow_timeout={max(flow_timeout - 10, 60)};
                const flow_start_time={int(time.time())};
                const mass_version='{mass.version}';
                """
                return web.Response(
                    body=return_html,
                    headers={
                        "content-type": "text/javascript",
                    },
                )

            mass.webserver.register_dynamic_route(
                f"/{flow_base_path}index.html", serve_mk_auth_page
            )
            mass.webserver.register_dynamic_route(f"/{flow_base_path}index.css", serve_mk_auth_css)
            mass.webserver.register_dynamic_route(f"/{flow_base_path}index.js", serve_mk_glue)

            try:
                result = await auth_helper.authenticate(flow_base_url, flow_timeout)
                values[CONF_MUSIC_USER_TOKEN] = result["music-user-token"]
                values[CONF_MUSIC_USER_TOKEN_TIMESTAMP] = result["music-user-token-timestamp"]
            except KeyError:
                # no music-user-token URL param was found so likely user cancelled the auth
                pass
            except Exception as error:
                # Chain the original exception so the root cause stays in the traceback.
                raise LoginFailed(f"Failed to authenticate with Apple '{error}'.") from error
            finally:
                # Always remove the temporary routes, whatever the outcome of the flow.
                mass.webserver.unregister_dynamic_route(f"/{flow_base_path}index.html")
                mass.webserver.unregister_dynamic_route(f"/{flow_base_path}index.css")
                mass.webserver.unregister_dynamic_route(f"/{flow_base_path}index.js")

    # Only offer the stored user token back to the UI when its timestamp is an
    # int and less than 150 days old; otherwise the field is presented empty.
    token_timestamp = values.get(CONF_MUSIC_USER_TOKEN_TIMESTAMP)
    user_token_is_fresh = isinstance(token_timestamp, int) and (
        token_timestamp > (time.time() - (3600 * 24 * 150))
    )

    # ruff: noqa: ARG001
    return (
        ConfigEntry(
            key=CONF_MUSIC_APP_TOKEN,
            type=ConfigEntryType.SECURE_STRING,
            label="MusicKit App Token",
            hidden=default_app_token_valid,
            required=True,
            value=values.get(CONF_MUSIC_APP_TOKEN) if values else None,
        ),
        ConfigEntry(
            key=CONF_MUSIC_USER_TOKEN,
            type=ConfigEntryType.SECURE_STRING,
            label="Music User Token",
            required=False,
            action="CONF_ACTION_AUTH",
            description="Authenticate with Apple Music to retrieve a valid music user token.",
            action_label="Authenticate with Apple Music",
            value=values.get(CONF_MUSIC_USER_TOKEN) if user_token_is_fresh else None,
        ),
        ConfigEntry(
            key=CONF_MUSIC_USER_MANUAL_TOKEN,
            type=ConfigEntryType.SECURE_STRING,
            label="Manual Music User Token",
            required=False,
            advanced=True,
            description=(
                "Authenticate with a manual Music User Token in case the Authentication flow"
                " is unsupported (e.g. when using child accounts)."
            ),
            help_link="https://www.music-assistant.io/music-providers/apple-music/",
            value=values.get(CONF_MUSIC_USER_MANUAL_TOKEN),
        ),
        ConfigEntry(
            key=CONF_MUSIC_USER_TOKEN_TIMESTAMP,
            type=ConfigEntryType.INTEGER,
            description="Timestamp music user token was updated.",
            label="Music User Token Timestamp",
            hidden=True,
            required=True,
            default_value=0,
            value=values.get(CONF_MUSIC_USER_TOKEN_TIMESTAMP) if values else 0,
        ),
    )
281
282
283class AppleMusicProvider(MusicProvider):
284 """Implementation of an Apple Music MusicProvider."""
285
    # User token sent to the Apple Music API; handle_async_init takes the manually
    # configured token when set, otherwise the one obtained through the auth flow.
    _music_user_token: str | None = None
    # App token read from the provider config in handle_async_init.
    _music_app_token: str | None = None
    # Storefront returned by _get_user_storefront(); used to build catalog endpoints.
    _storefront: str | None = None
    # Raw contents of the Widevine client id and private key files.
    _decrypt_client_id: bytes | None = None
    _decrypt_private_key: bytes | None = None
    # Random id that is regenerated on every provider initialization.
    _session_id: str | None = None
    # rate limiter needs to be specified on provider-level,
    # so make it an instance attribute
    # NOTE(review): this assignment sits in the class body, so the manager is shared
    # by all instances of the class unless it is rebound elsewhere — confirm that
    # this matches the intent described above.
    throttler = ThrottlerManager(rate_limit=1, period=2, initial_backoff=15)
295
296 async def handle_async_init(self) -> None:
297 """Handle async initialization of the provider."""
298 self._music_user_token = self.config.get_value(
299 CONF_MUSIC_USER_MANUAL_TOKEN
300 ) or self.config.get_value(CONF_MUSIC_USER_TOKEN)
301 self._music_app_token = self.config.get_value(CONF_MUSIC_APP_TOKEN)
302 self._storefront = await self._get_user_storefront()
303 # create random session id to use for decryption keys
304 # to invalidate cached keys on each provider initialization
305 self._session_id = str(uuid())
306 async with aiofiles.open(
307 os.path.join(WIDEVINE_BASE_PATH, DECRYPT_CLIENT_ID_FILENAME), "rb"
308 ) as _file:
309 self._decrypt_client_id = await _file.read()
310 async with aiofiles.open(
311 os.path.join(WIDEVINE_BASE_PATH, DECRYPT_PRIVATE_KEY_FILENAME), "rb"
312 ) as _file:
313 self._decrypt_private_key = await _file.read()
314
315 @use_cache()
316 async def search(
317 self, search_query: str, media_types: list[MediaType] | None, limit: int = 5
318 ) -> SearchResults:
319 """Perform search on musicprovider.
320
321 :param search_query: Search query.
322 :param media_types: A list of media_types to include. All types if None.
323 :param limit: Number of items to return in the search (per type).
324 """
325 endpoint = f"catalog/{self._storefront}/search"
326 # Apple music has a limit of 25 items for the search endpoint
327 limit = min(limit, 25)
328 searchresult = SearchResults()
329 searchtypes = []
330 if MediaType.ARTIST in media_types:
331 searchtypes.append("artists")
332 if MediaType.ALBUM in media_types:
333 searchtypes.append("albums")
334 if MediaType.TRACK in media_types:
335 searchtypes.append("songs")
336 if MediaType.PLAYLIST in media_types:
337 searchtypes.append("playlists")
338 if not searchtypes:
339 return searchresult
340 searchtype = ",".join(searchtypes)
341 search_query = search_query.replace("'", "")
342 response = await self._get_data(endpoint, term=search_query, types=searchtype, limit=limit)
343 if "artists" in response["results"]:
344 searchresult.artists += [
345 self._parse_artist(item) for item in response["results"]["artists"]["data"]
346 ]
347 if "albums" in response["results"]:
348 searchresult.albums += [
349 self._parse_album(item) for item in response["results"]["albums"]["data"]
350 ]
351 if "songs" in response["results"]:
352 searchresult.tracks += [
353 self._parse_track(item) for item in response["results"]["songs"]["data"]
354 ]
355 if "playlists" in response["results"]:
356 searchresult.playlists += [
357 self._parse_playlist(item) for item in response["results"]["playlists"]["data"]
358 ]
359 return searchresult
360
361 async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
362 """Browse Apple Music with support for playlist folders."""
363 if not path or "://" not in path:
364 return await super().browse(path)
365 sub_path = path.split("://", 1)[1]
366 path_parts = [part for part in sub_path.split("/") if part]
367 if path_parts and path_parts[0] == "playlists":
368 return await browse_playlists(self, path, path_parts)
369 return await super().browse(path)
370
371 async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
372 """Retrieve library artists from the provider."""
373 endpoint = "me/library/artists"
374 for item in await self._get_all_items(endpoint, include="catalog", extend="editorialNotes"):
375 if item and item["id"]:
376 yield self._parse_artist(item)
377
378 async def get_library_albums(self) -> AsyncGenerator[Album, None]:
379 """Retrieve library albums from the provider."""
380 endpoint = "me/library/albums"
381 album_items = await self._get_all_items(
382 endpoint, include="catalog,artists", extend="editorialNotes"
383 )
384 album_catalog_item_ids = [
385 item["id"]
386 for item in album_items
387 if item and item["id"] and not self.is_library_id(item["id"])
388 ]
389 album_library_item_ids = [
390 item["id"]
391 for item in album_items
392 if item and item["id"] and self.is_library_id(item["id"])
393 ]
394 rating_catalog_response = await self._get_ratings(album_catalog_item_ids, MediaType.ALBUM)
395 rating_library_response = await self._get_ratings(album_library_item_ids, MediaType.ALBUM)
396 for item in album_items:
397 if item and item["id"]:
398 is_favourite = (
399 rating_catalog_response.get(item["id"])
400 if not self.is_library_id(item["id"])
401 else rating_library_response.get(item["id"])
402 )
403 album = self._parse_album(item, is_favourite)
404 if album:
405 yield album
406
407 async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
408 """Retrieve library tracks from the provider."""
409 endpoint = "me/library/songs"
410 song_catalog_ids = []
411 library_only_tracks = []
412 for item in await self._get_all_items(endpoint):
413 catalog_id = item.get("attributes", {}).get("playParams", {}).get("catalogId")
414 if not catalog_id:
415 # Track is library-only (private/uploaded), use library ID instead
416 library_only_tracks.append(item)
417 else:
418 song_catalog_ids.append(catalog_id)
419 # Obtain catalog info per 150 songs, the documented limit of 300 results in a 504 timeout
420 max_limit = 150
421 for i in range(0, len(song_catalog_ids), max_limit):
422 catalog_ids = song_catalog_ids[i : i + max_limit]
423 catalog_endpoint = f"catalog/{self._storefront}/songs"
424 response = await self._get_data(
425 catalog_endpoint, ids=",".join(catalog_ids), include="artists,albums"
426 )
427 # Fetch ratings for this batch
428 rating_response = await self._get_ratings(catalog_ids, MediaType.TRACK)
429 for item in response["data"]:
430 is_favourite = rating_response.get(item["id"])
431 track = self._parse_track(item, is_favourite)
432 yield track
433 # Yield library-only tracks using their library metadata
434 library_ids = [item["id"] for item in library_only_tracks if item and item["id"]]
435 library_rating_response = await self._get_ratings(library_ids, MediaType.TRACK)
436 for item in library_only_tracks:
437 is_favourite = library_rating_response.get(item["id"])
438 yield self._parse_track(item, is_favourite)
439
440 async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
441 """Retrieve playlists from the provider."""
442 endpoint = "me/library/playlists"
443 playlist_items = await self._get_all_items(endpoint)
444 playlist_library_item_ids = [
445 item["id"]
446 for item in playlist_items
447 if item and item["id"] and self.is_library_id(item["id"])
448 ]
449 rating_library_response = await self._get_ratings(
450 playlist_library_item_ids, MediaType.PLAYLIST
451 )
452 for item in playlist_items:
453 is_favourite = rating_library_response.get(item["id"])
454 # Prefer catalog information over library information in case of public playlists
455 if item["attributes"]["hasCatalog"]:
456 yield await self.get_playlist(
457 item["attributes"]["playParams"]["globalId"], is_favourite
458 )
459 elif item and item["id"]:
460 yield self._parse_playlist(item, is_favourite)
461
462 @use_cache()
463 async def get_artist(self, prov_artist_id) -> Artist:
464 """Get full artist details by id."""
465 endpoint = f"catalog/{self._storefront}/artists/{prov_artist_id}"
466 response = await self._get_data(endpoint, extend="editorialNotes")
467 return self._parse_artist(response["data"][0])
468
469 @use_cache()
470 async def get_album(self, prov_album_id) -> Album:
471 """Get full album details by id."""
472 endpoint = f"catalog/{self._storefront}/albums/{prov_album_id}"
473 response = await self._get_data(endpoint, include="artists")
474 rating_response = await self._get_ratings([prov_album_id], MediaType.ALBUM)
475 is_favourite = rating_response.get(prov_album_id)
476 return self._parse_album(response["data"][0], is_favourite)
477
478 @use_cache()
479 async def get_track(self, prov_track_id) -> Track:
480 """Get full track details by id."""
481 endpoint = f"catalog/{self._storefront}/songs/{prov_track_id}"
482 response = await self._get_data(endpoint, include="artists,albums")
483 rating_response = await self._get_ratings([prov_track_id], MediaType.TRACK)
484 is_favourite = rating_response.get(prov_track_id)
485 return self._parse_track(response["data"][0], is_favourite)
486
487 @use_cache()
488 async def get_playlist(self, prov_playlist_id, is_favourite: bool = False) -> Playlist:
489 """Get full playlist details by id."""
490 if not self.is_library_id(prov_playlist_id):
491 endpoint = f"catalog/{self._storefront}/playlists/{prov_playlist_id}"
492 else:
493 endpoint = f"me/library/playlists/{prov_playlist_id}"
494 response = await self._get_data(endpoint)
495 return self._parse_playlist(response["data"][0], is_favourite)
496
497 @use_cache()
498 async def get_album_tracks(self, prov_album_id) -> list[Track]:
499 """Get all album tracks for given album id."""
500 endpoint = f"catalog/{self._storefront}/albums/{prov_album_id}/tracks"
501 response = await self._get_data(endpoint, include="artists")
502 # Including albums results in a 504 error, so we need to fetch the album separately
503 album = await self.get_album(prov_album_id)
504 track_ids = [track_obj["id"] for track_obj in response["data"] if "id" in track_obj]
505 rating_response = await self._get_ratings(track_ids, MediaType.TRACK)
506 tracks = []
507 for track_obj in response["data"]:
508 if "id" not in track_obj:
509 continue
510 track = self._parse_track(track_obj, rating_response.get(track_obj["id"]))
511 track.album = album
512 tracks.append(track)
513 return tracks
514
515 @use_cache(3600 * 3) # cache for 3 hours
516 async def get_playlist_tracks(self, prov_playlist_id, page: int = 0) -> list[Track]:
517 """Get all playlist tracks for given playlist id."""
518 if self._is_catalog_id(prov_playlist_id):
519 endpoint = f"catalog/{self._storefront}/playlists/{prov_playlist_id}/tracks"
520 else:
521 endpoint = f"me/library/playlists/{prov_playlist_id}/tracks"
522 result = []
523 page_size = 100
524 offset = page * page_size
525 response = await self._get_data(
526 endpoint, include="artists,catalog", limit=page_size, offset=offset
527 )
528 if not response or "data" not in response:
529 return result
530 playlist_track_ids = [track["id"] for track in response["data"] if track and track["id"]]
531 rating_response = await self._get_ratings(playlist_track_ids, MediaType.TRACK)
532 for index, track in enumerate(response["data"]):
533 if track and track["id"]:
534 is_favourite = rating_response.get(track["id"])
535 parsed_track = self._parse_track(track, is_favourite)
536 parsed_track.position = offset + index + 1
537 result.append(parsed_track)
538 return result
539
540 @use_cache(3600 * 24 * 7) # cache for 7 days
541 async def get_artist_albums(self, prov_artist_id) -> list[Album]:
542 """Get a list of all albums for the given artist."""
543 endpoint = f"catalog/{self._storefront}/artists/{prov_artist_id}/albums"
544 try:
545 response = await self._get_all_items(endpoint)
546 except MediaNotFoundError:
547 # Some artists do not have albums, return empty list
548 self.logger.info("No albums found for artist %s", prov_artist_id)
549 return []
550 album_ids = [album["id"] for album in response if album["id"]]
551 rating_response = await self._get_ratings(album_ids, MediaType.ALBUM)
552 albums = []
553 for album in response:
554 if not album["id"]:
555 continue
556 is_favourite = rating_response.get(album["id"])
557 parsed_album = self._parse_album(album, is_favourite)
558 if parsed_album:
559 albums.append(parsed_album)
560 return albums
561
562 @use_cache(3600 * 24 * 7) # cache for 7 days
563 async def get_artist_toptracks(self, prov_artist_id) -> list[Track]:
564 """Get a list of 10 most popular tracks for the given artist."""
565 endpoint = f"catalog/{self._storefront}/artists/{prov_artist_id}/view/top-songs"
566 try:
567 response = await self._get_data(endpoint)
568 except MediaNotFoundError:
569 # Some artists do not have top tracks, return empty list
570 self.logger.info("No top tracks found for artist %s", prov_artist_id)
571 return []
572 track_ids = [track["id"] for track in response["data"] if track["id"]]
573 rating_response = await self._get_ratings(track_ids, MediaType.TRACK)
574 tracks = []
575 for track in response["data"]:
576 if not track["id"]:
577 continue
578 is_favourite = rating_response.get(track["id"])
579 tracks.append(self._parse_track(track, is_favourite))
580 return tracks
581
582 async def library_add(self, item: MediaItemType) -> None:
583 """Add item to library."""
584 item_type = self._translate_media_type_to_apple_type(item.media_type)
585 kwargs = {
586 f"ids[{item_type}]": item.item_id,
587 }
588 await self._post_data("me/library/", **kwargs)
589
590 async def library_remove(self, prov_item_id, media_type: MediaType) -> None:
591 """Remove item from library."""
592 self.logger.warning(
593 "Deleting items from your library is not yet supported by the Apple Music API. "
594 f"Skipping deletion of {media_type} - {prov_item_id}."
595 )
596
597 async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]):
598 """Add track(s) to playlist."""
599 endpoint = f"me/library/playlists/{prov_playlist_id}/tracks"
600 data = {
601 "data": [
602 {
603 "id": track_id,
604 "type": "library-songs" if self.is_library_id(track_id) else "songs",
605 }
606 for track_id in prov_track_ids
607 ]
608 }
609 await self._post_data(endpoint, data=data)
610
611 async def remove_playlist_tracks(
612 self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
613 ) -> None:
614 """Remove track(s) from playlist."""
615 self.logger.warning(
616 "Removing tracks from playlists is not supported by the Apple Music "
617 "API. Make sure to delete them using the Apple Music app."
618 )
619
620 @use_cache(3600 * 24) # cache for 24 hours
621 async def get_similar_tracks(self, prov_track_id, limit=25) -> list[Track]:
622 """Retrieve a dynamic list of tracks based on the provided item."""
623 # Note, Apple music does not have an official endpoint for similar tracks.
624 # We will use the next-tracks endpoint to get a list of tracks that are similar to the
625 # provided track. However, Apple music only provides 2 tracks at a time, so we will
626 # need to call the endpoint multiple times. Therefore, set a limit to 6 to prevent
627 # flooding the apple music api.
628 limit = 6
629 endpoint = f"me/stations/next-tracks/ra.{prov_track_id}"
630 found_tracks = []
631 while len(found_tracks) < limit:
632 response = await self._post_data(endpoint, include="artists")
633 if not response or "data" not in response:
634 break
635 track_ids = [track["id"] for track in response["data"] if track and track["id"]]
636 rating_response = await self._get_ratings(track_ids, MediaType.TRACK)
637 for track in response["data"]:
638 if track and track["id"]:
639 is_favourite = rating_response.get(track["id"])
640 found_tracks.append(self._parse_track(track, is_favourite))
641 return found_tracks
642
643 async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
644 """Return the content details for the given track when it will be streamed."""
645 stream_metadata = await self._fetch_song_stream_metadata(item_id)
646 if self.is_library_id(item_id):
647 # Library items are not encrypted and do not need decryption keys
648 try:
649 stream_url = stream_metadata["assets"][0]["URL"]
650 except (KeyError, IndexError, TypeError) as exc:
651 raise MediaNotFoundError(
652 f"Failed to extract stream URL for library track {item_id}: {exc}"
653 ) from exc
654 return StreamDetails(
655 item_id=item_id,
656 provider=self.instance_id,
657 path=stream_url,
658 stream_type=StreamType.HTTP,
659 audio_format=AudioFormat(content_type=ContentType.UNKNOWN),
660 can_seek=True,
661 allow_seek=True,
662 )
663 # Continue to obtain decryption keys for catalog items
664 license_url = stream_metadata["hls-key-server-url"]
665 stream_url, uri = await self._parse_stream_url_and_uri(stream_metadata["assets"])
666 if not stream_url or not uri:
667 raise MediaNotFoundError("No stream URL found for song.")
668 key_id = base64.b64decode(uri.split(",")[1])
669 return StreamDetails(
670 item_id=item_id,
671 provider=self.instance_id,
672 audio_format=AudioFormat(content_type=ContentType.MP4, codec_type=ContentType.AAC),
673 stream_type=StreamType.ENCRYPTED_HTTP,
674 decryption_key=await self._get_decryption_key(license_url, key_id, uri, item_id),
675 path=stream_url,
676 can_seek=True,
677 allow_seek=True,
678 )
679
680 async def set_favorite(self, prov_item_id: str, media_type: MediaType, favorite: bool) -> None:
681 """Set the favorite status of an item."""
682 data = {
683 "type": "ratings",
684 "attributes": {
685 "value": 1 if favorite else -1,
686 },
687 }
688 item_type = self._translate_media_type_to_apple_type(media_type)
689 if self._is_catalog_id(prov_item_id):
690 endpoint = f"me/ratings/{item_type}/{prov_item_id}"
691 else:
692 endpoint = f"me/ratings/library-{item_type}/{prov_item_id}"
693 await self._put_data(endpoint, data=data)
694
695 def _parse_artist(self, artist_obj: dict[str, Any]) -> Artist:
696 """Parse artist object to generic layout."""
697 relationships = artist_obj.get("relationships", {})
698 if (
699 artist_obj.get("type") == "library-artists"
700 and relationships.get("catalog", {}).get("data", []) != []
701 ):
702 artist_id = relationships["catalog"]["data"][0]["id"]
703 attributes = relationships["catalog"]["data"][0]["attributes"]
704 elif "attributes" in artist_obj:
705 artist_id = artist_obj["id"]
706 attributes = artist_obj["attributes"]
707 else:
708 artist_id = artist_obj["id"]
709 self.logger.debug("No attributes found for artist %s", artist_obj)
710 # No more details available other than the id, return an ItemMapping
711 return ItemMapping(
712 media_type=MediaType.ARTIST,
713 provider=self.instance_id,
714 item_id=artist_id,
715 name=artist_id,
716 )
717 artist = Artist(
718 item_id=artist_id,
719 name=attributes.get("name"),
720 provider=self.domain,
721 provider_mappings={
722 ProviderMapping(
723 item_id=artist_id,
724 provider_domain=self.domain,
725 provider_instance=self.instance_id,
726 url=attributes.get("url"),
727 )
728 },
729 )
730 if artwork := attributes.get("artwork"):
731 artist.metadata.add_image(
732 MediaItemImage(
733 provider=self.instance_id,
734 type=ImageType.THUMB,
735 path=artwork["url"].format(
736 w=min(artwork["width"], MAX_ARTWORK_DIMENSION),
737 h=min(artwork["height"], MAX_ARTWORK_DIMENSION),
738 ),
739 remotely_accessible=True,
740 )
741 )
742 if genres := attributes.get("genreNames"):
743 artist.metadata.genres = set(genres)
744 if notes := attributes.get("editorialNotes"):
745 artist.metadata.description = notes.get("standard") or notes.get("short")
746 return artist
747
748 def _parse_album(
749 self, album_obj: dict, is_favourite: bool | None = None
750 ) -> Album | ItemMapping | None:
751 """Parse album object to generic layout."""
752 relationships = album_obj.get("relationships", {})
753 response_type = album_obj.get("type")
754 if (
755 response_type == "library-albums"
756 and relationships["catalog"]["data"] != []
757 and "attributes" in relationships["catalog"]["data"][0]
758 ):
759 album_id = relationships.get("catalog", {})["data"][0]["id"]
760 attributes = relationships.get("catalog", {})["data"][0]["attributes"]
761 elif "attributes" in album_obj:
762 album_id = album_obj["id"]
763 attributes = album_obj["attributes"]
764 else:
765 album_id = album_obj["id"]
766 # No more details available other than the id, return an ItemMapping
767 return ItemMapping(
768 media_type=MediaType.ALBUM,
769 provider=self.instance_id,
770 item_id=album_id,
771 name=album_id,
772 )
773 is_available_in_catalog = attributes.get("url") is not None
774 if not is_available_in_catalog:
775 self.logger.debug(
776 "Skipping album %s. Album is not available in the Apple Music catalog.",
777 attributes.get("name"),
778 )
779 return None
780 name, version = parse_title_and_version(attributes["name"])
781 album = Album(
782 item_id=album_id,
783 provider=self.domain,
784 name=name,
785 version=version,
786 provider_mappings={
787 ProviderMapping(
788 item_id=album_id,
789 provider_domain=self.domain,
790 provider_instance=self.instance_id,
791 url=attributes.get("url"),
792 available=attributes.get("playParams", {}).get("id") is not None,
793 )
794 },
795 )
796 if artists := relationships.get("artists"):
797 album.artists = UniqueList([self._parse_artist(artist) for artist in artists["data"]])
798 elif artist_name := attributes.get("artistName"):
799 album.artists = UniqueList(
800 [
801 ItemMapping(
802 media_type=MediaType.ARTIST,
803 provider=self.instance_id,
804 item_id=artist_name,
805 name=artist_name,
806 )
807 ]
808 )
809 if release_date := attributes.get("releaseDate"):
810 album.year = int(release_date.split("-")[0])
811 if genres := attributes.get("genreNames"):
812 album.metadata.genres = set(genres)
813 if artwork := attributes.get("artwork"):
814 album.metadata.add_image(
815 MediaItemImage(
816 provider=self.instance_id,
817 type=ImageType.THUMB,
818 path=artwork["url"].format(
819 w=min(artwork["width"], MAX_ARTWORK_DIMENSION),
820 h=min(artwork["height"], MAX_ARTWORK_DIMENSION),
821 ),
822 remotely_accessible=True,
823 )
824 )
825 if album_copyright := attributes.get("copyright"):
826 album.metadata.copyright = album_copyright
827 if record_label := attributes.get("recordLabel"):
828 album.metadata.label = record_label
829 if upc := attributes.get("upc"):
830 album.external_ids.add((ExternalID.BARCODE, "0" + upc))
831 if notes := attributes.get("editorialNotes"):
832 album.metadata.description = notes.get("standard") or notes.get("short")
833 if content_rating := attributes.get("contentRating"):
834 album.metadata.explicit = content_rating == "explicit"
835 album_type = AlbumType.ALBUM
836 if attributes.get("isSingle"):
837 album_type = AlbumType.SINGLE
838 elif attributes.get("isCompilation"):
839 album_type = AlbumType.COMPILATION
840 album.album_type = album_type
841
842 # Try inference - override if it finds something more specific
843 # Apple Music doesn't seem to have version field
844 inferred_type = infer_album_type(album.name, "")
845 if inferred_type in (AlbumType.SOUNDTRACK, AlbumType.LIVE):
846 album.album_type = inferred_type
847 album.favorite = is_favourite or False
848 return album
849
850 def _parse_track(
851 self,
852 track_obj: dict[str, Any],
853 is_favourite: bool | None = None,
854 ) -> Track:
855 """Parse track object to generic layout."""
856 relationships = track_obj.get("relationships", {})
857 if (
858 track_obj.get("type") == "library-songs"
859 and relationships.get("catalog", {}).get("data", []) != []
860 ):
861 # Library track with catalog version available
862 track_id = relationships.get("catalog", {})["data"][0]["id"]
863 attributes = relationships.get("catalog", {})["data"][0]["attributes"]
864 elif "attributes" in track_obj:
865 # Catalog track or library-only track
866 track_id = track_obj["id"]
867 attributes = track_obj["attributes"]
868 else:
869 track_id = track_obj["id"]
870 attributes = {}
871 name, version = parse_title_and_version(attributes.get("name", ""))
872 track = Track(
873 item_id=track_id,
874 provider=self.domain,
875 name=name,
876 version=version,
877 duration=attributes.get("durationInMillis", 0) / 1000,
878 provider_mappings={
879 ProviderMapping(
880 item_id=track_id,
881 provider_domain=self.domain,
882 provider_instance=self.instance_id,
883 audio_format=AudioFormat(content_type=ContentType.AAC),
884 url=attributes.get("url"),
885 available=attributes.get("playParams", {}).get("id") is not None,
886 )
887 },
888 )
889 if disc_number := attributes.get("discNumber"):
890 track.disc_number = disc_number
891 if track_number := attributes.get("trackNumber"):
892 track.track_number = track_number
893 # Prefer catalog information over library information for artists.
894 # For compilations it picks the wrong artists
895 if "artists" in relationships:
896 artists = relationships["artists"]
897 track.artists = [self._parse_artist(artist) for artist in artists["data"]]
898 # 'Similar tracks' do not provide full artist details
899 elif artist_name := attributes.get("artistName"):
900 track.artists = [
901 ItemMapping(
902 media_type=MediaType.ARTIST,
903 item_id=artist_name,
904 provider=self.instance_id,
905 name=artist_name,
906 )
907 ]
908 if albums := relationships.get("albums"):
909 if "data" in albums and len(albums["data"]) > 0:
910 track.album = self._parse_album(albums["data"][0])
911 if artwork := attributes.get("artwork"):
912 track.metadata.add_image(
913 MediaItemImage(
914 provider=self.instance_id,
915 type=ImageType.THUMB,
916 path=artwork["url"].format(
917 w=min(artwork["width"], MAX_ARTWORK_DIMENSION),
918 h=min(artwork["height"], MAX_ARTWORK_DIMENSION),
919 ),
920 remotely_accessible=True,
921 )
922 )
923 if genres := attributes.get("genreNames"):
924 track.metadata.genres = set(genres)
925 if composers := attributes.get("composerName"):
926 track.metadata.performers = set(composers.split(", "))
927 if isrc := attributes.get("isrc"):
928 track.external_ids.add((ExternalID.ISRC, isrc))
929 track.favorite = is_favourite or False
930 return track
931
932 def _parse_playlist(
933 self, playlist_obj: dict[str, Any], is_favourite: bool | None = None
934 ) -> Playlist:
935 """Parse Apple Music playlist object to generic layout."""
936 attributes = playlist_obj["attributes"]
937 playlist_id = attributes["playParams"].get("globalId") or playlist_obj["id"]
938 is_editable = attributes.get("canEdit", False)
939 playlist = Playlist(
940 item_id=playlist_id,
941 provider=self.instance_id,
942 name=attributes.get("name", UNKNOWN_PLAYLIST_NAME),
943 owner=attributes.get("curatorName", "me"),
944 provider_mappings={
945 ProviderMapping(
946 item_id=playlist_id,
947 provider_domain=self.domain,
948 provider_instance=self.instance_id,
949 url=attributes.get("url"),
950 is_unique=is_editable, # user-owned playlists are unique
951 )
952 },
953 is_editable=is_editable,
954 )
955 if artwork := attributes.get("artwork"):
956 url = artwork["url"]
957 if artwork["width"] and artwork["height"]:
958 url = url.format(
959 w=min(artwork["width"], MAX_ARTWORK_DIMENSION),
960 h=min(artwork["height"], MAX_ARTWORK_DIMENSION),
961 )
962 playlist.metadata.add_image(
963 MediaItemImage(
964 provider=self.instance_id,
965 type=ImageType.THUMB,
966 path=url,
967 remotely_accessible=True,
968 )
969 )
970 if description := attributes.get("description"):
971 playlist.metadata.description = description.get("standard")
972 playlist.favorite = is_favourite or False
973 return playlist
974
975 async def _get_all_items(self, endpoint, key="data", **kwargs) -> list[dict]:
976 """Get all items from a paged list."""
977 limit = 50
978 offset = 0
979 all_items = []
980 while True:
981 kwargs["limit"] = limit
982 kwargs["offset"] = offset
983 result = await self._get_data(endpoint, **kwargs)
984 if key not in result:
985 break
986 all_items += result[key]
987 if not result.get("next"):
988 break
989 offset += limit
990 return all_items
991
992 @throttle_with_retries
993 async def _get_data(self, endpoint, **kwargs) -> dict[str, Any]:
994 """Get data from api."""
995 url = f"https://api.music.apple.com/v1/{endpoint}"
996 headers = {"Authorization": f"Bearer {self._music_app_token}"}
997 headers["Music-User-Token"] = self._music_user_token
998 async with (
999 self.mass.http_session.get(
1000 url, headers=headers, params=kwargs, ssl=True, timeout=120
1001 ) as response,
1002 ):
1003 if response.status == 404 and "limit" in kwargs and "offset" in kwargs:
1004 return {}
1005 # Convert HTTP errors to exceptions
1006 if response.status == 404:
1007 raise MediaNotFoundError(f"{endpoint} not found")
1008 if response.status == 504:
1009 # See if we can get more info from the response on occasional timeouts
1010 self.logger.debug(
1011 "Apple Music API Timeout: url=%s, params=%s, response_headers=%s",
1012 url,
1013 kwargs,
1014 response.headers,
1015 )
1016 raise ResourceTemporarilyUnavailable("Apple Music API Timeout")
1017 if response.status == 429:
1018 # Debug this for now to see if the response headers give us info about the
1019 # backoff time. There is no documentation on this.
1020 self.logger.debug("Apple Music Rate Limiter. Headers: %s", response.headers)
1021 raise ResourceTemporarilyUnavailable("Apple Music Rate Limiter")
1022 if response.status == 500:
1023 raise MusicAssistantError("Unexpected server error when calling Apple Music")
1024 response.raise_for_status()
1025 return await response.json(loads=json_loads)
1026
1027 @throttle_with_retries
1028 async def _delete_data(self, endpoint, data=None, **kwargs) -> None:
1029 """Delete data from api."""
1030 url = f"https://api.music.apple.com/v1/{endpoint}"
1031 headers = {"Authorization": f"Bearer {self._music_app_token}"}
1032 headers["Music-User-Token"] = self._music_user_token
1033 async with (
1034 self.mass.http_session.delete(
1035 url, headers=headers, params=kwargs, json=data, ssl=True, timeout=120
1036 ) as response,
1037 ):
1038 # Convert HTTP errors to exceptions
1039 if response.status == 404:
1040 raise MediaNotFoundError(f"{endpoint} not found")
1041 if response.status == 429:
1042 # Debug this for now to see if the response headers give us info about the
1043 # backoff time. There is no documentation on this.
1044 self.logger.debug("Apple Music Rate Limiter. Headers: %s", response.headers)
1045 raise ResourceTemporarilyUnavailable("Apple Music Rate Limiter")
1046 response.raise_for_status()
1047
1048 async def _put_data(self, endpoint, data=None, **kwargs) -> str:
1049 """Put data on api."""
1050 url = f"https://api.music.apple.com/v1/{endpoint}"
1051 headers = {"Authorization": f"Bearer {self._music_app_token}"}
1052 headers["Music-User-Token"] = self._music_user_token
1053 async with (
1054 self.mass.http_session.put(
1055 url, headers=headers, params=kwargs, json=data, ssl=True, timeout=120
1056 ) as response,
1057 ):
1058 # Convert HTTP errors to exceptions
1059 if response.status == 404:
1060 raise MediaNotFoundError(f"{endpoint} not found")
1061 if response.status == 429:
1062 # Debug this for now to see if the response headers give us info about the
1063 # backoff time. There is no documentation on this.
1064 self.logger.debug("Apple Music Rate Limiter. Headers: %s", response.headers)
1065 raise ResourceTemporarilyUnavailable("Apple Music Rate Limiter")
1066 response.raise_for_status()
1067 if response.content_length:
1068 return await response.json(loads=json_loads)
1069 return {}
1070
1071 @throttle_with_retries
1072 async def _post_data(self, endpoint, data=None, **kwargs) -> str:
1073 """Post data on api."""
1074 url = f"https://api.music.apple.com/v1/{endpoint}"
1075 headers = {"Authorization": f"Bearer {self._music_app_token}"}
1076 headers["Music-User-Token"] = self._music_user_token
1077 async with (
1078 self.mass.http_session.post(
1079 url, headers=headers, params=kwargs, json=data, ssl=True, timeout=120
1080 ) as response,
1081 ):
1082 # Convert HTTP errors to exceptions
1083 if response.status == 404:
1084 raise MediaNotFoundError(f"{endpoint} not found")
1085 if response.status == 429:
1086 # Debug this for now to see if the response headers give us info about the
1087 # backoff time. There is no documentation on this.
1088 self.logger.debug("Apple Music Rate Limiter. Headers: %s", response.headers)
1089 raise ResourceTemporarilyUnavailable("Apple Music Rate Limiter")
1090 response.raise_for_status()
1091 return await response.json(loads=json_loads)
1092
1093 async def _get_user_storefront(self) -> str:
1094 """Get the user's storefront."""
1095 locale = self.mass.metadata.locale.replace("_", "-")
1096 language = locale.split("-")[0]
1097 result = await self._get_data("me/storefront", l=language)
1098 return result["data"][0]["id"]
1099
1100 async def _get_ratings(self, item_ids: list[str], media_type: MediaType) -> dict[str, bool]:
1101 """Get ratings (aka favorites) for a list of item ids."""
1102 if media_type == MediaType.ARTIST:
1103 raise NotImplementedError(
1104 "Ratings are not available for artist in the Apple Music API."
1105 )
1106 if len(item_ids) == 0:
1107 return {}
1108 apple_type = self._translate_media_type_to_apple_type(media_type)
1109 endpoint = apple_type if not self.is_library_id(item_ids[0]) else f"library-{apple_type}"
1110 # Apple Music limits to 200 ids per request
1111 max_ids_per_request = 200
1112 results = {}
1113 for i in range(0, len(item_ids), max_ids_per_request):
1114 batch_ids = item_ids[i : i + max_ids_per_request]
1115 response = await self._get_data(
1116 f"me/ratings/{endpoint}",
1117 ids=",".join(batch_ids),
1118 )
1119 results.update(
1120 {
1121 item["id"]: bool(item["attributes"].get("value", False) == 1)
1122 for item in response.get("data", [])
1123 }
1124 )
1125 return results
1126
1127 def _translate_media_type_to_apple_type(self, media_type: MediaType) -> str:
1128 """Translate MediaType to Apple Music endpoint string."""
1129 match media_type:
1130 case MediaType.ARTIST:
1131 return "artists"
1132 case MediaType.ALBUM:
1133 return "albums"
1134 case MediaType.TRACK:
1135 return "songs"
1136 case MediaType.PLAYLIST:
1137 return "playlists"
1138 raise MusicAssistantError(f"Unsupported media type: {media_type}")
1139
1140 def is_library_id(self, library_id) -> bool:
1141 """Check a library ID matches known format."""
1142 if not isinstance(library_id, str):
1143 return False
1144 valid = re.findall(r"^(?:[ailp]\.)[a-zA-Z0-9]+$", library_id)
1145 return bool(valid)
1146
1147 def _is_catalog_id(self, catalog_id: str) -> bool:
1148 """Check if input is a catalog id, or a library id."""
1149 return catalog_id.isnumeric() or catalog_id.startswith("pl.")
1150
1151 async def _fetch_song_stream_metadata(self, song_id: str) -> str:
1152 """Get the stream URL for a song from Apple Music."""
1153 playback_url = "https://play.music.apple.com/WebObjects/MZPlay.woa/wa/webPlayback"
1154 data = {}
1155 self.logger.debug("_fetch_song_stream_metadata: Check if Library ID: %s", song_id)
1156 if self.is_library_id(song_id):
1157 data["universalLibraryId"] = song_id
1158 data["isLibrary"] = True
1159 else:
1160 data["salableAdamId"] = song_id
1161 for retry in (True, False):
1162 try:
1163 async with self.mass.http_session.post(
1164 playback_url, headers=self._get_decryption_headers(), json=data, ssl=True
1165 ) as response:
1166 response.raise_for_status()
1167 content = await response.json(loads=json_loads)
1168 if content.get("failureType"):
1169 message = content.get("failureMessage")
1170 raise MediaNotFoundError(f"Failed to get song stream metadata: {message}")
1171 return content["songList"][0]
1172 except (MediaNotFoundError, ClientError) as exc:
1173 if retry:
1174 self.logger.warning("Failed to get song stream metadata: %s", exc)
1175 continue
1176 raise
1177 raise MediaNotFoundError(f"Failed to get song stream metadata for {song_id}")
1178
1179 async def _parse_stream_url_and_uri(self, stream_assets: list[dict]) -> str:
1180 """Parse the Stream URL and Key URI from the song."""
1181 ctrp256_urls = [asset["URL"] for asset in stream_assets if asset["flavor"] == "28:ctrp256"]
1182 if len(ctrp256_urls) == 0:
1183 raise MediaNotFoundError("No ctrp256 URL found for song.")
1184 playlist_url = ctrp256_urls[0]
1185 playlist_items = await fetch_playlist(self.mass, ctrp256_urls[0], raise_on_hls=False)
1186 # Apple returns a HLS (substream) playlist but instead of chunks,
1187 # each item is just the whole file. So we simply grab the first playlist item.
1188 playlist_item = playlist_items[0]
1189 # path is relative, stitch it together
1190 base_path = playlist_url.rsplit("/", 1)[0]
1191 track_url = base_path + "/" + playlist_items[0].path
1192 key = playlist_item.key
1193 return (track_url, key)
1194
1195 def _get_decryption_headers(self):
1196 """Get headers for decryption requests."""
1197 return {
1198 "authorization": f"Bearer {self._music_app_token}",
1199 "media-user-token": self._music_user_token,
1200 "connection": "keep-alive",
1201 "accept": "application/json",
1202 "origin": "https://music.apple.com",
1203 "referer": "https://music.apple.com/",
1204 "accept-encoding": "gzip, deflate, br",
1205 "content-type": "application/json;charset=utf-8",
1206 "user-agent": (
1207 "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)"
1208 " Chrome/110.0.0.0 Safari/537.36"
1209 ),
1210 }
1211
1212 async def _get_decryption_key(
1213 self, license_url: str, key_id: bytes, uri: str, item_id: str
1214 ) -> str:
1215 """Get the decryption key for a song."""
1216 if decryption_key := await self.mass.cache.get(
1217 key=item_id,
1218 provider=self.instance_id,
1219 category=CACHE_CATEGORY_DECRYPT_KEY,
1220 checksum=self._session_id,
1221 ):
1222 self.logger.debug("Decryption key for %s found in cache.", item_id)
1223 return decryption_key
1224 pssh = self._get_pssh(key_id)
1225 device = Device(
1226 client_id=self._decrypt_client_id,
1227 private_key=self._decrypt_private_key,
1228 type_=DeviceTypes.ANDROID,
1229 security_level=3,
1230 flags={},
1231 )
1232 cdm = Cdm.from_device(device)
1233 session_id = cdm.open()
1234 challenge = cdm.get_license_challenge(session_id, pssh)
1235 track_license = await self._get_license(challenge, license_url, uri, item_id)
1236 cdm.parse_license(session_id, track_license)
1237 key = next(key for key in cdm.get_keys(session_id) if key.type == "CONTENT")
1238 if not key:
1239 raise MediaNotFoundError("Unable to get decryption key for song %s.", item_id)
1240 cdm.close(session_id)
1241 decryption_key = key.key.hex()
1242 self.mass.create_task(
1243 self.mass.cache.set(
1244 key=item_id,
1245 data=decryption_key,
1246 expiration=3600,
1247 provider=self.instance_id,
1248 category=CACHE_CATEGORY_DECRYPT_KEY,
1249 checksum=self._session_id,
1250 )
1251 )
1252 return decryption_key
1253
1254 def _get_pssh(self, key_id: bytes) -> PSSH:
1255 """Get the PSSH for a song."""
1256 pssh_data = WidevinePsshData()
1257 pssh_data.algorithm = 1
1258 pssh_data.key_ids.append(key_id)
1259 init_data = base64.b64encode(pssh_data.SerializeToString()).decode("utf-8")
1260 return PSSH.new(system_id=PSSH.SystemId.Widevine, init_data=init_data)
1261
1262 async def _get_license(self, challenge: bytes, license_url: str, uri: str, item_id: str) -> str:
1263 """Get the license for a song based on the challenge."""
1264 challenge_b64 = base64.b64encode(challenge).decode("utf-8")
1265 data = {
1266 "challenge": challenge_b64,
1267 "key-system": "com.widevine.alpha",
1268 "uri": uri,
1269 "adamId": item_id,
1270 "isLibrary": False,
1271 "user-initiated": True,
1272 }
1273 async with self.mass.http_session.post(
1274 license_url, data=json.dumps(data), headers=self._get_decryption_headers(), ssl=False
1275 ) as response:
1276 response.raise_for_status()
1277 content = await response.json(loads=json_loads)
1278 track_license = content.get("license")
1279 if not track_license:
1280 raise MediaNotFoundError("No license found for song %s.", item_id)
1281 return track_license
1282