/
/
/
1"""
Apple Music musicprovider support for MusicAssistant.

TODO MUSIC_APP_TOKEN expires after 6 months so should have a distribution mechanism outside
    compulsory application updates. It is only a semi-private key in JWT format so could be
    refreshed daily by a GitHub action and downloaded by the provider on each initialise.
TODO Widevine keys can be obtained dynamically from Apple Music API rather than copied into Docker
    build. This is undocumented but @maxlyth has a working example.
TODO MUSIC_USER_TOKEN must be refreshed (~min 180 days) and needs mechanism to prompt user to
    re-authenticate in browser.
TODO Current provider ignores private tracks that are not available in the storefront catalog as
    streamable url is derived from the catalog id. It is undocumented but @maxlyth has a working
    example to get a streamable url from the library id.
14"""
15
16from __future__ import annotations
17
18import base64
19import json
20import os
21import pathlib
22import re
23import time
24from collections.abc import Sequence
25from typing import TYPE_CHECKING, Any
26
27import aiofiles
28from aiohttp import web
29from aiohttp.client_exceptions import ClientError
30from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
31from music_assistant_models.enums import (
32 AlbumType,
33 ConfigEntryType,
34 ContentType,
35 ExternalID,
36 ImageType,
37 MediaType,
38 ProviderFeature,
39 StreamType,
40)
41from music_assistant_models.errors import (
42 LoginFailed,
43 MediaNotFoundError,
44 MusicAssistantError,
45 ResourceTemporarilyUnavailable,
46)
47from music_assistant_models.media_items import (
48 Album,
49 Artist,
50 AudioFormat,
51 BrowseFolder,
52 ItemMapping,
53 MediaItemImage,
54 MediaItemType,
55 Playlist,
56 ProviderMapping,
57 SearchResults,
58 Track,
59 UniqueList,
60)
61from music_assistant_models.streamdetails import StreamDetails
62from pywidevine import PSSH, Cdm, Device, DeviceTypes
63from pywidevine.license_protocol_pb2 import WidevinePsshData
64from shortuuid import uuid
65
66from music_assistant.controllers.cache import use_cache
67from music_assistant.helpers.app_vars import app_var
68from music_assistant.helpers.auth import AuthenticationHelper
69from music_assistant.helpers.json import json_loads
70from music_assistant.helpers.playlists import fetch_playlist
71from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
72from music_assistant.helpers.util import infer_album_type, parse_title_and_version
73from music_assistant.models.music_provider import MusicProvider
74from music_assistant.providers.apple_music.helpers import browse_playlists
75
76if TYPE_CHECKING:
77 from collections.abc import AsyncGenerator
78
79 from music_assistant_models.config_entries import ProviderConfig
80 from music_assistant_models.provider import ProviderManifest
81
82 from music_assistant import MusicAssistant
83 from music_assistant.models import ProviderInstanceType
84
85
# Set of provider features handed to AppleMusicProvider when setup() creates the instance.
SUPPORTED_FEATURES = {
    ProviderFeature.LIBRARY_ARTISTS,
    ProviderFeature.LIBRARY_ALBUMS,
    ProviderFeature.LIBRARY_TRACKS,
    ProviderFeature.LIBRARY_PLAYLISTS,
    ProviderFeature.BROWSE,
    ProviderFeature.SEARCH,
    ProviderFeature.ARTIST_ALBUMS,
    ProviderFeature.ARTIST_TOPTRACKS,
    ProviderFeature.SIMILAR_TRACKS,
    ProviderFeature.LIBRARY_ALBUMS_EDIT,
    ProviderFeature.LIBRARY_ARTISTS_EDIT,
    ProviderFeature.LIBRARY_PLAYLISTS_EDIT,
    ProviderFeature.LIBRARY_TRACKS_EDIT,
    ProviderFeature.FAVORITE_ALBUMS_EDIT,
    ProviderFeature.FAVORITE_TRACKS_EDIT,
    ProviderFeature.FAVORITE_PLAYLISTS_EDIT,
}
104
# Bundled MusicKit app token; sent as the Bearer token when get_config_entries probes the API.
MUSIC_APP_TOKEN = app_var(8)
# Directory and file names of the Widevine files read in AppleMusicProvider.handle_async_init.
WIDEVINE_BASE_PATH = "/usr/local/bin/widevine_cdm"
DECRYPT_CLIENT_ID_FILENAME = "client_id.bin"
DECRYPT_PRIVATE_KEY_FILENAME = "private_key.pem"
# NOTE(review): not referenced in the visible part of the file; presumably a fallback name.
UNKNOWN_PLAYLIST_NAME = "Unknown Apple Music Playlist"
# Keys of the config entries returned by get_config_entries.
CONF_MUSIC_APP_TOKEN = "music_app_token"
CONF_MUSIC_USER_TOKEN = "music_user_token"
CONF_MUSIC_USER_MANUAL_TOKEN = "music_user_manual_token"
CONF_MUSIC_USER_TOKEN_TIMESTAMP = "music_user_token_timestamp"
# NOTE(review): not referenced in the visible part of the file; presumably the cache category
# under which decryption keys are stored - confirm against its users.
CACHE_CATEGORY_DECRYPT_KEY = 1
115
116
async def setup(
    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
    """Create the Apple Music provider instance for the given configuration."""
    provider = AppleMusicProvider(mass, manifest, config, SUPPORTED_FEATURES)
    return provider
122
123
async def get_config_entries(
    mass: MusicAssistant,
    instance_id: str | None = None,
    action: str | None = None,
    values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
    """
    Return Config entries to setup this provider.

    The bundled MusicKit app token is first probed against the Apple Music API. When the
    API accepts it, the token is stored in ``values`` and the app token field is hidden.
    When ``action`` is ``CONF_ACTION_AUTH`` (and the bundled token was accepted), the MusicKit
    browser flow is served from temporary webserver routes and the resulting user token and
    its timestamp are stored in ``values``.

    :param mass: MusicAssistant instance; its HTTP session and webserver are used.
    :param instance_id: id of an existing provider instance (None if new instance setup).
    :param action: [optional] action key called from config entries UI.
    :param values: the (intermediate) raw values for config entries sent with the action.
        May be None, in which case an empty mapping is used.
    :raises LoginFailed: if the MusicKit authentication flow fails with an unexpected error.
    """
    # ruff: noqa: ARG001
    # ``values`` defaults to None but is read and written below, so normalise it once.
    if values is None:
        values = {}

    def validate_user_token(token: object) -> bool:
        """Return True if the token is a string that ends like a Music User Token."""
        if not isinstance(token, str):
            return False
        return re.search(r"[a-zA-Z0-9=/+]{32,}==$", token) is not None

    # Check the bundled app token with an API call, otherwise display a config field
    default_app_token_valid = False
    async with mass.http_session.get(
        "https://api.music.apple.com/v1/test",
        headers={"Authorization": f"Bearer {MUSIC_APP_TOKEN}"},
        ssl=True,
        timeout=10,
    ) as response:
        if response.status == 200:
            values[CONF_MUSIC_APP_TOKEN] = f"{MUSIC_APP_TOKEN}"
            default_app_token_valid = True

    # Action is to launch MusicKit flow
    if action == "CONF_ACTION_AUTH" and default_app_token_valid:
        callback_method = "POST"
        session_id = values["session_id"]
        async with AuthenticationHelper(mass, session_id, callback_method) as auth_helper:
            callback_url = auth_helper.callback_url
            flow_base_path = f"apple_music_auth/{session_id}/"
            flow_timeout = 600
            parent_file_path = pathlib.Path(__file__).parent.resolve()
            base_url = f"{mass.webserver.base_url}/{flow_base_path}"
            flow_base_url = f"{base_url}index.html"

            async def serve_mk_auth_page(request: web.Request) -> web.FileResponse:
                """Serve the static MusicKit wrapper page."""
                auth_html_path = parent_file_path.joinpath("musickit_auth/musickit_wrapper.html")
                return web.FileResponse(
                    auth_html_path,
                    headers={"content-type": "text/html"},
                )

            async def serve_mk_auth_css(request: web.Request) -> web.FileResponse:
                """Serve the stylesheet of the MusicKit wrapper page."""
                auth_css_path = parent_file_path.joinpath("musickit_auth/musickit_wrapper.css")
                return web.FileResponse(
                    auth_css_path,
                    headers={
                        "content-type": "text/css",
                    },
                )

            async def serve_mk_glue(request: web.Request) -> web.Response:
                """Serve the generated script that passes the flow parameters to the page."""
                # Read with .get() so a missing value yields an empty token / zero timestamp
                # instead of a KeyError inside the request handler.
                stored_user_token = values.get(CONF_MUSIC_USER_TOKEN)
                user_token = stored_user_token if validate_user_token(stored_user_token) else ""
                user_token_timestamp = values.get(CONF_MUSIC_USER_TOKEN_TIMESTAMP, 0)
                return_html = f"""
                const return_url='{callback_url}';
                const base_url='{base_url}';
                const app_token='{values[CONF_MUSIC_APP_TOKEN]}';
                const callback_method='{callback_method}';
                const user_token='{user_token}';
                const user_token_timestamp='{user_token_timestamp}';
                const flow_timeout={max(flow_timeout - 10, 60)};
                const flow_start_time={int(time.time())};
                const mass_version='{mass.version}';
                """
                return web.Response(
                    body=return_html,
                    headers={
                        "content-type": "text/javascript",
                    },
                )

            routes = {
                f"/{flow_base_path}index.html": serve_mk_auth_page,
                f"/{flow_base_path}index.css": serve_mk_auth_css,
                f"/{flow_base_path}index.js": serve_mk_glue,
            }
            for route_path, handler in routes.items():
                mass.webserver.register_dynamic_route(route_path, handler)

            try:
                result = await auth_helper.authenticate(flow_base_url, flow_timeout)
                values[CONF_MUSIC_USER_TOKEN] = result["music-user-token"]
                values[CONF_MUSIC_USER_TOKEN_TIMESTAMP] = result["music-user-token-timestamp"]
            except KeyError:
                # no music-user-token param was found so likely user cancelled the auth;
                # deliberately keep the previous values.
                pass
            except Exception as error:
                raise LoginFailed(f"Failed to authenticate with Apple '{error}'.") from error
            finally:
                # The routes only exist for the duration of the flow.
                for route_path in routes:
                    mass.webserver.unregister_dynamic_route(route_path)

    # Only offer the stored user token while its timestamp is less than 150 days old.
    token_timestamp = values.get(CONF_MUSIC_USER_TOKEN_TIMESTAMP)
    user_token_is_fresh = isinstance(token_timestamp, int) and token_timestamp > (
        time.time() - (3600 * 24 * 150)
    )

    return (
        ConfigEntry(
            key=CONF_MUSIC_APP_TOKEN,
            type=ConfigEntryType.SECURE_STRING,
            label="MusicKit App Token",
            hidden=default_app_token_valid,
            required=True,
            value=values.get(CONF_MUSIC_APP_TOKEN),
        ),
        ConfigEntry(
            key=CONF_MUSIC_USER_TOKEN,
            type=ConfigEntryType.SECURE_STRING,
            label="Music User Token",
            required=False,
            action="CONF_ACTION_AUTH",
            description="Authenticate with Apple Music to retrieve a valid music user token.",
            action_label="Authenticate with Apple Music",
            value=values.get(CONF_MUSIC_USER_TOKEN) if user_token_is_fresh else None,
        ),
        ConfigEntry(
            key=CONF_MUSIC_USER_MANUAL_TOKEN,
            type=ConfigEntryType.SECURE_STRING,
            label="Manual Music User Token",
            required=False,
            advanced=True,
            description=(
                "Authenticate with a manual Music User Token in case the Authentication flow"
                " is unsupported (e.g. when using child accounts)."
            ),
            help_link="https://www.music-assistant.io/music-providers/apple-music/",
            value=values.get(CONF_MUSIC_USER_MANUAL_TOKEN),
        ),
        ConfigEntry(
            key=CONF_MUSIC_USER_TOKEN_TIMESTAMP,
            type=ConfigEntryType.INTEGER,
            description="Timestamp music user token was updated.",
            label="Music User Token Timestamp",
            hidden=True,
            required=True,
            default_value=0,
            value=values.get(CONF_MUSIC_USER_TOKEN_TIMESTAMP) if values else 0,
        ),
    )
280
281
class AppleMusicProvider(MusicProvider):
    """Implementation of an Apple Music MusicProvider."""

    # Music user token: the manually configured token if set, otherwise the token stored by
    # the authentication flow (assigned in handle_async_init).
    _music_user_token: str | None = None
    # MusicKit app token read from the provider config (assigned in handle_async_init).
    _music_app_token: str | None = None
    # Storefront used to build the "catalog/<storefront>/..." endpoints.
    _storefront: str | None = None
    # Raw contents of the Widevine client id and private key files.
    _decrypt_client_id: bytes | None = None
    _decrypt_private_key: bytes | None = None
    # Random id that is regenerated on every provider initialization.
    _session_id: str | None = None
    # rate limiter needs to be specified on provider-level,
    # so make it an instance attribute
    # NOTE(review): the assignment below is in the class body, so the ThrottlerManager is a
    # class attribute, not an instance attribute as the comment above states - confirm
    # which one is intended.
    throttler = ThrottlerManager(rate_limit=1, period=2, initial_backoff=15)
294
295 async def handle_async_init(self) -> None:
296 """Handle async initialization of the provider."""
297 self._music_user_token = self.config.get_value(
298 CONF_MUSIC_USER_MANUAL_TOKEN
299 ) or self.config.get_value(CONF_MUSIC_USER_TOKEN)
300 self._music_app_token = self.config.get_value(CONF_MUSIC_APP_TOKEN)
301 self._storefront = await self._get_user_storefront()
302 # create random session id to use for decryption keys
303 # to invalidate cached keys on each provider initialization
304 self._session_id = str(uuid())
305 async with aiofiles.open(
306 os.path.join(WIDEVINE_BASE_PATH, DECRYPT_CLIENT_ID_FILENAME), "rb"
307 ) as _file:
308 self._decrypt_client_id = await _file.read()
309 async with aiofiles.open(
310 os.path.join(WIDEVINE_BASE_PATH, DECRYPT_PRIVATE_KEY_FILENAME), "rb"
311 ) as _file:
312 self._decrypt_private_key = await _file.read()
313
314 @use_cache()
315 async def search(
316 self, search_query: str, media_types: list[MediaType] | None, limit: int = 5
317 ) -> SearchResults:
318 """Perform search on musicprovider.
319
320 :param search_query: Search query.
321 :param media_types: A list of media_types to include. All types if None.
322 :param limit: Number of items to return in the search (per type).
323 """
324 endpoint = f"catalog/{self._storefront}/search"
325 # Apple music has a limit of 25 items for the search endpoint
326 limit = min(limit, 25)
327 searchresult = SearchResults()
328 searchtypes = []
329 if MediaType.ARTIST in media_types:
330 searchtypes.append("artists")
331 if MediaType.ALBUM in media_types:
332 searchtypes.append("albums")
333 if MediaType.TRACK in media_types:
334 searchtypes.append("songs")
335 if MediaType.PLAYLIST in media_types:
336 searchtypes.append("playlists")
337 if not searchtypes:
338 return searchresult
339 searchtype = ",".join(searchtypes)
340 search_query = search_query.replace("'", "")
341 response = await self._get_data(endpoint, term=search_query, types=searchtype, limit=limit)
342 if "artists" in response["results"]:
343 searchresult.artists += [
344 self._parse_artist(item) for item in response["results"]["artists"]["data"]
345 ]
346 if "albums" in response["results"]:
347 searchresult.albums += [
348 self._parse_album(item) for item in response["results"]["albums"]["data"]
349 ]
350 if "songs" in response["results"]:
351 searchresult.tracks += [
352 self._parse_track(item) for item in response["results"]["songs"]["data"]
353 ]
354 if "playlists" in response["results"]:
355 searchresult.playlists += [
356 self._parse_playlist(item) for item in response["results"]["playlists"]["data"]
357 ]
358 return searchresult
359
360 async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
361 """Browse Apple Music with support for playlist folders."""
362 if not path or "://" not in path:
363 return await super().browse(path)
364 sub_path = path.split("://", 1)[1]
365 path_parts = [part for part in sub_path.split("/") if part]
366 if path_parts and path_parts[0] == "playlists":
367 return await browse_playlists(self, path, path_parts)
368 return await super().browse(path)
369
370 async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
371 """Retrieve library artists from the provider."""
372 endpoint = "me/library/artists"
373 for item in await self._get_all_items(endpoint, include="catalog", extend="editorialNotes"):
374 if item and item["id"]:
375 yield self._parse_artist(item)
376
377 async def get_library_albums(self) -> AsyncGenerator[Album, None]:
378 """Retrieve library albums from the provider."""
379 endpoint = "me/library/albums"
380 album_items = await self._get_all_items(
381 endpoint, include="catalog,artists", extend="editorialNotes"
382 )
383 album_catalog_item_ids = [
384 item["id"]
385 for item in album_items
386 if item and item["id"] and not self.is_library_id(item["id"])
387 ]
388 album_library_item_ids = [
389 item["id"]
390 for item in album_items
391 if item and item["id"] and self.is_library_id(item["id"])
392 ]
393 rating_catalog_response = await self._get_ratings(album_catalog_item_ids, MediaType.ALBUM)
394 rating_library_response = await self._get_ratings(album_library_item_ids, MediaType.ALBUM)
395 for item in album_items:
396 if item and item["id"]:
397 is_favourite = (
398 rating_catalog_response.get(item["id"])
399 if not self.is_library_id(item["id"])
400 else rating_library_response.get(item["id"])
401 )
402 album = self._parse_album(item, is_favourite)
403 if album:
404 yield album
405
406 async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
407 """Retrieve library tracks from the provider."""
408 endpoint = "me/library/songs"
409 song_catalog_ids = []
410 library_only_tracks = []
411 for item in await self._get_all_items(endpoint):
412 catalog_id = item.get("attributes", {}).get("playParams", {}).get("catalogId")
413 if not catalog_id:
414 # Track is library-only (private/uploaded), use library ID instead
415 library_only_tracks.append(item)
416 else:
417 song_catalog_ids.append(catalog_id)
418 # Obtain catalog info per 150 songs, the documented limit of 300 results in a 504 timeout
419 max_limit = 150
420 for i in range(0, len(song_catalog_ids), max_limit):
421 catalog_ids = song_catalog_ids[i : i + max_limit]
422 catalog_endpoint = f"catalog/{self._storefront}/songs"
423 response = await self._get_data(
424 catalog_endpoint, ids=",".join(catalog_ids), include="artists,albums"
425 )
426 # Fetch ratings for this batch
427 rating_response = await self._get_ratings(catalog_ids, MediaType.TRACK)
428 for item in response["data"]:
429 is_favourite = rating_response.get(item["id"])
430 track = self._parse_track(item, is_favourite)
431 yield track
432 # Yield library-only tracks using their library metadata
433 library_ids = [item["id"] for item in library_only_tracks if item and item["id"]]
434 library_rating_response = await self._get_ratings(library_ids, MediaType.TRACK)
435 for item in library_only_tracks:
436 is_favourite = library_rating_response.get(item["id"])
437 yield self._parse_track(item, is_favourite)
438
439 async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
440 """Retrieve playlists from the provider."""
441 endpoint = "me/library/playlists"
442 playlist_items = await self._get_all_items(endpoint)
443 playlist_library_item_ids = [
444 item["id"]
445 for item in playlist_items
446 if item and item["id"] and self.is_library_id(item["id"])
447 ]
448 rating_library_response = await self._get_ratings(
449 playlist_library_item_ids, MediaType.PLAYLIST
450 )
451 for item in playlist_items:
452 is_favourite = rating_library_response.get(item["id"])
453 # Prefer catalog information over library information in case of public playlists
454 if item["attributes"]["hasCatalog"]:
455 yield await self.get_playlist(
456 item["attributes"]["playParams"]["globalId"], is_favourite
457 )
458 elif item and item["id"]:
459 yield self._parse_playlist(item, is_favourite)
460
461 @use_cache()
462 async def get_artist(self, prov_artist_id) -> Artist:
463 """Get full artist details by id."""
464 endpoint = f"catalog/{self._storefront}/artists/{prov_artist_id}"
465 response = await self._get_data(endpoint, extend="editorialNotes")
466 return self._parse_artist(response["data"][0])
467
468 @use_cache()
469 async def get_album(self, prov_album_id) -> Album:
470 """Get full album details by id."""
471 endpoint = f"catalog/{self._storefront}/albums/{prov_album_id}"
472 response = await self._get_data(endpoint, include="artists")
473 rating_response = await self._get_ratings([prov_album_id], MediaType.ALBUM)
474 is_favourite = rating_response.get(prov_album_id)
475 return self._parse_album(response["data"][0], is_favourite)
476
477 @use_cache()
478 async def get_track(self, prov_track_id) -> Track:
479 """Get full track details by id."""
480 endpoint = f"catalog/{self._storefront}/songs/{prov_track_id}"
481 response = await self._get_data(endpoint, include="artists,albums")
482 rating_response = await self._get_ratings([prov_track_id], MediaType.TRACK)
483 is_favourite = rating_response.get(prov_track_id)
484 return self._parse_track(response["data"][0], is_favourite)
485
486 @use_cache()
487 async def get_playlist(self, prov_playlist_id, is_favourite: bool = False) -> Playlist:
488 """Get full playlist details by id."""
489 if not self.is_library_id(prov_playlist_id):
490 endpoint = f"catalog/{self._storefront}/playlists/{prov_playlist_id}"
491 else:
492 endpoint = f"me/library/playlists/{prov_playlist_id}"
493 response = await self._get_data(endpoint)
494 return self._parse_playlist(response["data"][0], is_favourite)
495
496 @use_cache()
497 async def get_album_tracks(self, prov_album_id) -> list[Track]:
498 """Get all album tracks for given album id."""
499 endpoint = f"catalog/{self._storefront}/albums/{prov_album_id}/tracks"
500 response = await self._get_data(endpoint, include="artists")
501 # Including albums results in a 504 error, so we need to fetch the album separately
502 album = await self.get_album(prov_album_id)
503 track_ids = [track_obj["id"] for track_obj in response["data"] if "id" in track_obj]
504 rating_response = await self._get_ratings(track_ids, MediaType.TRACK)
505 tracks = []
506 for track_obj in response["data"]:
507 if "id" not in track_obj:
508 continue
509 track = self._parse_track(track_obj, rating_response.get(track_obj["id"]))
510 track.album = album
511 tracks.append(track)
512 return tracks
513
514 @use_cache(3600 * 3) # cache for 3 hours
515 async def get_playlist_tracks(self, prov_playlist_id, page: int = 0) -> list[Track]:
516 """Get all playlist tracks for given playlist id."""
517 if self._is_catalog_id(prov_playlist_id):
518 endpoint = f"catalog/{self._storefront}/playlists/{prov_playlist_id}/tracks"
519 else:
520 endpoint = f"me/library/playlists/{prov_playlist_id}/tracks"
521 result = []
522 page_size = 100
523 offset = page * page_size
524 response = await self._get_data(
525 endpoint, include="artists,catalog", limit=page_size, offset=offset
526 )
527 if not response or "data" not in response:
528 return result
529 playlist_track_ids = [track["id"] for track in response["data"] if track and track["id"]]
530 rating_response = await self._get_ratings(playlist_track_ids, MediaType.TRACK)
531 for index, track in enumerate(response["data"]):
532 if track and track["id"]:
533 is_favourite = rating_response.get(track["id"])
534 parsed_track = self._parse_track(track, is_favourite)
535 parsed_track.position = offset + index + 1
536 result.append(parsed_track)
537 return result
538
539 @use_cache(3600 * 24 * 7) # cache for 7 days
540 async def get_artist_albums(self, prov_artist_id) -> list[Album]:
541 """Get a list of all albums for the given artist."""
542 endpoint = f"catalog/{self._storefront}/artists/{prov_artist_id}/albums"
543 try:
544 response = await self._get_all_items(endpoint)
545 except MediaNotFoundError:
546 # Some artists do not have albums, return empty list
547 self.logger.info("No albums found for artist %s", prov_artist_id)
548 return []
549 album_ids = [album["id"] for album in response if album["id"]]
550 rating_response = await self._get_ratings(album_ids, MediaType.ALBUM)
551 albums = []
552 for album in response:
553 if not album["id"]:
554 continue
555 is_favourite = rating_response.get(album["id"])
556 parsed_album = self._parse_album(album, is_favourite)
557 if parsed_album:
558 albums.append(parsed_album)
559 return albums
560
561 @use_cache(3600 * 24 * 7) # cache for 7 days
562 async def get_artist_toptracks(self, prov_artist_id) -> list[Track]:
563 """Get a list of 10 most popular tracks for the given artist."""
564 endpoint = f"catalog/{self._storefront}/artists/{prov_artist_id}/view/top-songs"
565 try:
566 response = await self._get_data(endpoint)
567 except MediaNotFoundError:
568 # Some artists do not have top tracks, return empty list
569 self.logger.info("No top tracks found for artist %s", prov_artist_id)
570 return []
571 track_ids = [track["id"] for track in response["data"] if track["id"]]
572 rating_response = await self._get_ratings(track_ids, MediaType.TRACK)
573 tracks = []
574 for track in response["data"]:
575 if not track["id"]:
576 continue
577 is_favourite = rating_response.get(track["id"])
578 tracks.append(self._parse_track(track, is_favourite))
579 return tracks
580
581 async def library_add(self, item: MediaItemType) -> None:
582 """Add item to library."""
583 item_type = self._translate_media_type_to_apple_type(item.media_type)
584 kwargs = {
585 f"ids[{item_type}]": item.item_id,
586 }
587 await self._post_data("me/library/", **kwargs)
588
589 async def library_remove(self, prov_item_id, media_type: MediaType) -> None:
590 """Remove item from library."""
591 self.logger.warning(
592 "Deleting items from your library is not yet supported by the Apple Music API. "
593 f"Skipping deletion of {media_type} - {prov_item_id}."
594 )
595
596 async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]):
597 """Add track(s) to playlist."""
598 endpoint = f"me/library/playlists/{prov_playlist_id}/tracks"
599 data = {
600 "data": [
601 {
602 "id": track_id,
603 "type": "library-songs" if self.is_library_id(track_id) else "songs",
604 }
605 for track_id in prov_track_ids
606 ]
607 }
608 await self._post_data(endpoint, data=data)
609
610 async def remove_playlist_tracks(
611 self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
612 ) -> None:
613 """Remove track(s) from playlist."""
614 self.logger.warning(
615 "Removing tracks from playlists is not supported by the Apple Music "
616 "API. Make sure to delete them using the Apple Music app."
617 )
618
619 @use_cache(3600 * 24) # cache for 24 hours
620 async def get_similar_tracks(self, prov_track_id, limit=25) -> list[Track]:
621 """Retrieve a dynamic list of tracks based on the provided item."""
622 # Note, Apple music does not have an official endpoint for similar tracks.
623 # We will use the next-tracks endpoint to get a list of tracks that are similar to the
624 # provided track. However, Apple music only provides 2 tracks at a time, so we will
625 # need to call the endpoint multiple times. Therefore, set a limit to 6 to prevent
626 # flooding the apple music api.
627 limit = 6
628 endpoint = f"me/stations/next-tracks/ra.{prov_track_id}"
629 found_tracks = []
630 while len(found_tracks) < limit:
631 response = await self._post_data(endpoint, include="artists")
632 if not response or "data" not in response:
633 break
634 track_ids = [track["id"] for track in response["data"] if track and track["id"]]
635 rating_response = await self._get_ratings(track_ids, MediaType.TRACK)
636 for track in response["data"]:
637 if track and track["id"]:
638 is_favourite = rating_response.get(track["id"])
639 found_tracks.append(self._parse_track(track, is_favourite))
640 return found_tracks
641
642 async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
643 """Return the content details for the given track when it will be streamed."""
644 stream_metadata = await self._fetch_song_stream_metadata(item_id)
645 if self.is_library_id(item_id):
646 # Library items are not encrypted and do not need decryption keys
647 try:
648 stream_url = stream_metadata["assets"][0]["URL"]
649 except (KeyError, IndexError, TypeError) as exc:
650 raise MediaNotFoundError(
651 f"Failed to extract stream URL for library track {item_id}: {exc}"
652 ) from exc
653 return StreamDetails(
654 item_id=item_id,
655 provider=self.instance_id,
656 path=stream_url,
657 stream_type=StreamType.HTTP,
658 audio_format=AudioFormat(content_type=ContentType.UNKNOWN),
659 can_seek=True,
660 allow_seek=True,
661 )
662 # Continue to obtain decryption keys for catalog items
663 license_url = stream_metadata["hls-key-server-url"]
664 stream_url, uri = await self._parse_stream_url_and_uri(stream_metadata["assets"])
665 if not stream_url or not uri:
666 raise MediaNotFoundError("No stream URL found for song.")
667 key_id = base64.b64decode(uri.split(",")[1])
668 return StreamDetails(
669 item_id=item_id,
670 provider=self.instance_id,
671 audio_format=AudioFormat(content_type=ContentType.MP4, codec_type=ContentType.AAC),
672 stream_type=StreamType.ENCRYPTED_HTTP,
673 decryption_key=await self._get_decryption_key(license_url, key_id, uri, item_id),
674 path=stream_url,
675 can_seek=True,
676 allow_seek=True,
677 )
678
679 async def set_favorite(self, prov_item_id: str, media_type: MediaType, favorite: bool) -> None:
680 """Set the favorite status of an item."""
681 data = {
682 "type": "ratings",
683 "attributes": {
684 "value": 1 if favorite else -1,
685 },
686 }
687 item_type = self._translate_media_type_to_apple_type(media_type)
688 if self._is_catalog_id(prov_item_id):
689 endpoint = f"me/ratings/{item_type}/{prov_item_id}"
690 else:
691 endpoint = f"me/ratings/library-{item_type}/{prov_item_id}"
692 await self._put_data(endpoint, data=data)
693
694 def _parse_artist(self, artist_obj: dict[str, Any]) -> Artist:
695 """Parse artist object to generic layout."""
696 relationships = artist_obj.get("relationships", {})
697 if (
698 artist_obj.get("type") == "library-artists"
699 and relationships.get("catalog", {}).get("data", []) != []
700 ):
701 artist_id = relationships["catalog"]["data"][0]["id"]
702 attributes = relationships["catalog"]["data"][0]["attributes"]
703 elif "attributes" in artist_obj:
704 artist_id = artist_obj["id"]
705 attributes = artist_obj["attributes"]
706 else:
707 artist_id = artist_obj["id"]
708 self.logger.debug("No attributes found for artist %s", artist_obj)
709 # No more details available other than the id, return an ItemMapping
710 return ItemMapping(
711 media_type=MediaType.ARTIST,
712 provider=self.instance_id,
713 item_id=artist_id,
714 name=artist_id,
715 )
716 artist = Artist(
717 item_id=artist_id,
718 name=attributes.get("name"),
719 provider=self.domain,
720 provider_mappings={
721 ProviderMapping(
722 item_id=artist_id,
723 provider_domain=self.domain,
724 provider_instance=self.instance_id,
725 url=attributes.get("url"),
726 )
727 },
728 )
729 if artwork := attributes.get("artwork"):
730 artist.metadata.add_image(
731 MediaItemImage(
732 provider=self.instance_id,
733 type=ImageType.THUMB,
734 path=artwork["url"].format(w=artwork["width"], h=artwork["height"]),
735 remotely_accessible=True,
736 )
737 )
738 if genres := attributes.get("genreNames"):
739 artist.metadata.genres = set(genres)
740 if notes := attributes.get("editorialNotes"):
741 artist.metadata.description = notes.get("standard") or notes.get("short")
742 return artist
743
744 def _parse_album(
745 self, album_obj: dict, is_favourite: bool | None = None
746 ) -> Album | ItemMapping | None:
747 """Parse album object to generic layout."""
748 relationships = album_obj.get("relationships", {})
749 response_type = album_obj.get("type")
750 if (
751 response_type == "library-albums"
752 and relationships["catalog"]["data"] != []
753 and "attributes" in relationships["catalog"]["data"][0]
754 ):
755 album_id = relationships.get("catalog", {})["data"][0]["id"]
756 attributes = relationships.get("catalog", {})["data"][0]["attributes"]
757 elif "attributes" in album_obj:
758 album_id = album_obj["id"]
759 attributes = album_obj["attributes"]
760 else:
761 album_id = album_obj["id"]
762 # No more details available other than the id, return an ItemMapping
763 return ItemMapping(
764 media_type=MediaType.ALBUM,
765 provider=self.instance_id,
766 item_id=album_id,
767 name=album_id,
768 )
769 is_available_in_catalog = attributes.get("url") is not None
770 if not is_available_in_catalog:
771 self.logger.debug(
772 "Skipping album %s. Album is not available in the Apple Music catalog.",
773 attributes.get("name"),
774 )
775 return None
776 name, version = parse_title_and_version(attributes["name"])
777 album = Album(
778 item_id=album_id,
779 provider=self.domain,
780 name=name,
781 version=version,
782 provider_mappings={
783 ProviderMapping(
784 item_id=album_id,
785 provider_domain=self.domain,
786 provider_instance=self.instance_id,
787 url=attributes.get("url"),
788 available=attributes.get("playParams", {}).get("id") is not None,
789 )
790 },
791 )
792 if artists := relationships.get("artists"):
793 album.artists = UniqueList([self._parse_artist(artist) for artist in artists["data"]])
794 elif artist_name := attributes.get("artistName"):
795 album.artists = UniqueList(
796 [
797 ItemMapping(
798 media_type=MediaType.ARTIST,
799 provider=self.instance_id,
800 item_id=artist_name,
801 name=artist_name,
802 )
803 ]
804 )
805 if release_date := attributes.get("releaseDate"):
806 album.year = int(release_date.split("-")[0])
807 if genres := attributes.get("genreNames"):
808 album.metadata.genres = set(genres)
809 if artwork := attributes.get("artwork"):
810 album.metadata.add_image(
811 MediaItemImage(
812 provider=self.instance_id,
813 type=ImageType.THUMB,
814 path=artwork["url"].format(w=artwork["width"], h=artwork["height"]),
815 remotely_accessible=True,
816 )
817 )
818 if album_copyright := attributes.get("copyright"):
819 album.metadata.copyright = album_copyright
820 if record_label := attributes.get("recordLabel"):
821 album.metadata.label = record_label
822 if upc := attributes.get("upc"):
823 album.external_ids.add((ExternalID.BARCODE, "0" + upc))
824 if notes := attributes.get("editorialNotes"):
825 album.metadata.description = notes.get("standard") or notes.get("short")
826 if content_rating := attributes.get("contentRating"):
827 album.metadata.explicit = content_rating == "explicit"
828 album_type = AlbumType.ALBUM
829 if attributes.get("isSingle"):
830 album_type = AlbumType.SINGLE
831 elif attributes.get("isCompilation"):
832 album_type = AlbumType.COMPILATION
833 album.album_type = album_type
834
835 # Try inference - override if it finds something more specific
836 # Apple Music doesn't seem to have version field
837 inferred_type = infer_album_type(album.name, "")
838 if inferred_type in (AlbumType.SOUNDTRACK, AlbumType.LIVE):
839 album.album_type = inferred_type
840 album.favorite = is_favourite or False
841 return album
842
def _parse_track(
    self,
    track_obj: dict[str, Any],
    is_favourite: bool | None = None,
) -> Track:
    """Convert an Apple Music song object into a generic Track.

    For a ``library-songs`` object with a non-empty ``catalog`` relationship,
    the id and attributes of the first catalog entry are used. Otherwise the
    object's own id is used together with its attributes (or an empty dict
    when it has none). Relationships are always read from ``track_obj`` itself.

    :param track_obj: Raw song object as returned by the API.
    :param is_favourite: Favourite flag for the track; falsy values become ``False``.
    :return: The populated Track.
    """
    relationships = track_obj.get("relationships", {})
    # The catalog relationship is only inspected for library songs.
    catalog_entries = (
        relationships.get("catalog", {}).get("data", [])
        if track_obj.get("type") == "library-songs"
        else []
    )
    if catalog_entries != []:
        # Library track with catalog version available
        catalog_entry = catalog_entries[0]
        track_id = catalog_entry["id"]
        attributes = catalog_entry["attributes"]
    else:
        # Catalog track, library-only track, or an object without attributes
        track_id = track_obj["id"]
        attributes = track_obj.get("attributes", {})

    title, version = parse_title_and_version(attributes.get("name", ""))
    mapping = ProviderMapping(
        item_id=track_id,
        provider_domain=self.domain,
        provider_instance=self.instance_id,
        audio_format=AudioFormat(content_type=ContentType.AAC),
        url=attributes.get("url"),
        available=attributes.get("playParams", {}).get("id") is not None,
    )
    track = Track(
        item_id=track_id,
        provider=self.domain,
        name=title,
        version=version,
        duration=attributes.get("durationInMillis", 0) / 1000,
        provider_mappings={mapping},
    )

    disc_number = attributes.get("discNumber")
    if disc_number:
        track.disc_number = disc_number
    track_number = attributes.get("trackNumber")
    if track_number:
        track.track_number = track_number

    # Prefer catalog information over library information for artists.
    # For compilations it picks the wrong artists
    if "artists" in relationships:
        artist_entries = relationships["artists"]["data"]
        track.artists = [self._parse_artist(entry) for entry in artist_entries]
    else:
        # 'Similar tracks' do not provide full artist details
        artist_name = attributes.get("artistName")
        if artist_name:
            track.artists = [
                ItemMapping(
                    media_type=MediaType.ARTIST,
                    item_id=artist_name,
                    provider=self.instance_id,
                    name=artist_name,
                )
            ]

    # Only the first related album is attached to the track.
    albums = relationships.get("albums")
    if albums and "data" in albums and len(albums["data"]) > 0:
        track.album = self._parse_album(albums["data"][0])

    artwork = attributes.get("artwork")
    if artwork:
        # The artwork url is a template with {w}/{h} placeholders.
        image_path = artwork["url"].format(w=artwork["width"], h=artwork["height"])
        track.metadata.add_image(
            MediaItemImage(
                provider=self.instance_id,
                type=ImageType.THUMB,
                path=image_path,
                remotely_accessible=True,
            )
        )

    genres = attributes.get("genreNames")
    if genres:
        track.metadata.genres = set(genres)
    composers = attributes.get("composerName")
    if composers:
        # Composer names arrive as one comma-separated string.
        track.metadata.performers = set(composers.split(", "))
    isrc = attributes.get("isrc")
    if isrc:
        track.external_ids.add((ExternalID.ISRC, isrc))

    track.favorite = is_favourite or False
    return track
921
def _parse_playlist(
    self, playlist_obj: dict[str, Any], is_favourite: bool | None = None
) -> Playlist:
    """Convert an Apple Music playlist object into a generic Playlist.

    The playlist id is the ``globalId`` from ``playParams`` when present,
    otherwise the id of the object itself. The ``canEdit`` attribute drives
    both ``is_editable`` and the ``is_unique`` flag of the provider mapping.

    :param playlist_obj: Raw playlist object; must contain ``attributes``
        with a ``playParams`` entry.
    :param is_favourite: Favourite flag for the playlist; falsy values become ``False``.
    :return: The populated Playlist.
    """
    attrs = playlist_obj["attributes"]
    playlist_id = attrs["playParams"].get("globalId") or playlist_obj["id"]
    editable = attrs.get("canEdit", False)

    mapping = ProviderMapping(
        item_id=playlist_id,
        provider_domain=self.domain,
        provider_instance=self.instance_id,
        url=attrs.get("url"),
        is_unique=editable,  # user-owned playlists are unique
    )
    playlist = Playlist(
        item_id=playlist_id,
        provider=self.instance_id,
        name=attrs.get("name", UNKNOWN_PLAYLIST_NAME),
        owner=attrs.get("curatorName", "me"),
        provider_mappings={mapping},
        is_editable=editable,
    )

    artwork = attrs.get("artwork")
    if artwork:
        image_url = artwork["url"]
        # Only fill in the size placeholders when both dimensions are known.
        if artwork["width"] and artwork["height"]:
            image_url = image_url.format(w=artwork["width"], h=artwork["height"])
        playlist.metadata.add_image(
            MediaItemImage(
                provider=self.instance_id,
                type=ImageType.THUMB,
                path=image_url,
                remotely_accessible=True,
            )
        )

    description = attrs.get("description")
    if description:
        playlist.metadata.description = description.get("standard")

    playlist.favorite = is_favourite or False
    return playlist
961
962 async def _get_all_items(self, endpoint, key="data", **kwargs) -> list[dict]:
963 """Get all items from a paged list."""
964 limit = 50
965 offset = 0
966 all_items = []
967 while True:
968 kwargs["limit"] = limit
969 kwargs["offset"] = offset
970 result = await self._get_data(endpoint, **kwargs)
971 if key not in result:
972 break
973 all_items += result[key]
974 if not result.get("next"):
975 break
976 offset += limit
977 return all_items
978
@throttle_with_retries
async def _get_data(self, endpoint, **kwargs) -> dict[str, Any]:
    """Send an authenticated GET request to the Apple Music API.

    :param endpoint: Path appended to ``https://api.music.apple.com/v1/``.
    :param kwargs: Sent as query parameters.
    :return: The decoded JSON body, or ``{}`` for a 404 on a paged request
        (one that has both ``limit`` and ``offset``).
    :raises MediaNotFoundError: On a 404 for a non-paged request.
    :raises ResourceTemporarilyUnavailable: On a 504 or 429 response.
    :raises MusicAssistantError: On a 500 response.
    """
    url = f"https://api.music.apple.com/v1/{endpoint}"
    headers = {
        "Authorization": f"Bearer {self._music_app_token}",
        "Music-User-Token": self._music_user_token,
    }
    request = self.mass.http_session.get(
        url, headers=headers, params=kwargs, ssl=True, timeout=120
    )
    async with request as response:
        status = response.status
        # Convert HTTP errors to exceptions
        if status == 404:
            if "limit" in kwargs and "offset" in kwargs:
                return {}
            raise MediaNotFoundError(f"{endpoint} not found")
        if status == 504:
            # See if we can get more info from the response on occasional timeouts
            self.logger.debug(
                "Apple Music API Timeout: url=%s, params=%s, response_headers=%s",
                url,
                kwargs,
                response.headers,
            )
            raise ResourceTemporarilyUnavailable("Apple Music API Timeout")
        if status == 429:
            # Debug this for now to see if the response headers give us info about the
            # backoff time. There is no documentation on this.
            self.logger.debug("Apple Music Rate Limiter. Headers: %s", response.headers)
            raise ResourceTemporarilyUnavailable("Apple Music Rate Limiter")
        if status == 500:
            raise MusicAssistantError("Unexpected server error when calling Apple Music")
        response.raise_for_status()
        return await response.json(loads=json_loads)
1013
@throttle_with_retries
async def _delete_data(self, endpoint, data=None, **kwargs) -> None:
    """Send an authenticated DELETE request to the Apple Music API.

    :param endpoint: Path appended to ``https://api.music.apple.com/v1/``.
    :param data: Optional JSON body of the request.
    :param kwargs: Sent as query parameters.
    :raises MediaNotFoundError: On a 404 response.
    :raises ResourceTemporarilyUnavailable: On a 429 response.
    """
    url = f"https://api.music.apple.com/v1/{endpoint}"
    headers = {
        "Authorization": f"Bearer {self._music_app_token}",
        "Music-User-Token": self._music_user_token,
    }
    request = self.mass.http_session.delete(
        url, headers=headers, params=kwargs, json=data, ssl=True, timeout=120
    )
    async with request as response:
        status = response.status
        # Convert HTTP errors to exceptions
        if status == 404:
            raise MediaNotFoundError(f"{endpoint} not found")
        if status == 429:
            # Debug this for now to see if the response headers give us info about the
            # backoff time. There is no documentation on this.
            self.logger.debug("Apple Music Rate Limiter. Headers: %s", response.headers)
            raise ResourceTemporarilyUnavailable("Apple Music Rate Limiter")
        response.raise_for_status()
1034
1035 async def _put_data(self, endpoint, data=None, **kwargs) -> str:
1036 """Put data on api."""
1037 url = f"https://api.music.apple.com/v1/{endpoint}"
1038 headers = {"Authorization": f"Bearer {self._music_app_token}"}
1039 headers["Music-User-Token"] = self._music_user_token
1040 async with (
1041 self.mass.http_session.put(
1042 url, headers=headers, params=kwargs, json=data, ssl=True, timeout=120
1043 ) as response,
1044 ):
1045 # Convert HTTP errors to exceptions
1046 if response.status == 404:
1047 raise MediaNotFoundError(f"{endpoint} not found")
1048 if response.status == 429:
1049 # Debug this for now to see if the response headers give us info about the
1050 # backoff time. There is no documentation on this.
1051 self.logger.debug("Apple Music Rate Limiter. Headers: %s", response.headers)
1052 raise ResourceTemporarilyUnavailable("Apple Music Rate Limiter")
1053 response.raise_for_status()
1054 if response.content_length:
1055 return await response.json(loads=json_loads)
1056 return {}
1057
@throttle_with_retries
async def _post_data(self, endpoint, data=None, **kwargs) -> dict[str, Any]:
    """Send an authenticated POST request to the Apple Music API.

    :param endpoint: Path appended to ``https://api.music.apple.com/v1/``.
    :param data: Optional JSON body of the request.
    :param kwargs: Sent as query parameters.
    :return: The decoded JSON body, or ``{}`` when the response body is empty.
    :raises MediaNotFoundError: On a 404 response.
    :raises ResourceTemporarilyUnavailable: On a 429 response.
    """
    url = f"https://api.music.apple.com/v1/{endpoint}"
    headers = {
        "Authorization": f"Bearer {self._music_app_token}",
        "Music-User-Token": self._music_user_token,
    }
    async with self.mass.http_session.post(
        url, headers=headers, params=kwargs, json=data, ssl=True, timeout=120
    ) as response:
        # Convert HTTP errors to exceptions
        if response.status == 404:
            raise MediaNotFoundError(f"{endpoint} not found")
        if response.status == 429:
            # Debug this for now to see if the response headers give us info about the
            # backoff time. There is no documentation on this.
            self.logger.debug("Apple Music Rate Limiter. Headers: %s", response.headers)
            raise ResourceTemporarilyUnavailable("Apple Music Rate Limiter")
        response.raise_for_status()
        # Successful replies without a body cannot be decoded as JSON, so
        # return an empty dict for them (mirrors _put_data). The body is read
        # explicitly instead of trusting Content-Length, which is absent on
        # chunked responses; json() below reuses the body that was just read.
        if not (await response.read()).strip():
            return {}
        return await response.json(loads=json_loads)
1079
1080 async def _get_user_storefront(self) -> str:
1081 """Get the user's storefront."""
1082 locale = self.mass.metadata.locale.replace("_", "-")
1083 language = locale.split("-")[0]
1084 result = await self._get_data("me/storefront", l=language)
1085 return result["data"][0]["id"]
1086
async def _get_ratings(self, item_ids: list[str], media_type: MediaType) -> dict[str, bool]:
    """Fetch the ratings (aka favorites) for a list of item ids.

    The first id decides whether the library or the catalog ratings endpoint
    is queried for the whole list.

    :param item_ids: Ids to look up.
    :param media_type: Media type of the ids; artists are not supported.
    :return: Mapping of each returned item id to ``True`` when its rating value is 1.
    :raises NotImplementedError: When ``media_type`` is ``MediaType.ARTIST``.
    """
    if media_type == MediaType.ARTIST:
        raise NotImplementedError(
            "Ratings are not available for artist in the Apple Music API."
        )
    if len(item_ids) == 0:
        return {}
    apple_type = self._translate_media_type_to_apple_type(media_type)
    endpoint = f"library-{apple_type}" if self.is_library_id(item_ids[0]) else apple_type
    # Apple Music limits to 200 ids per request
    chunk_size = 200
    ratings = {}
    for start in range(0, len(item_ids), chunk_size):
        chunk = item_ids[start : start + chunk_size]
        response = await self._get_data(f"me/ratings/{endpoint}", ids=",".join(chunk))
        page = {}
        for item in response.get("data", []):
            page[item["id"]] = bool(item["attributes"].get("value", False) == 1)
        ratings.update(page)
    return ratings
1113
def _translate_media_type_to_apple_type(self, media_type: MediaType) -> str:
    """Return the Apple Music endpoint name for a MediaType.

    :raises MusicAssistantError: When the media type has no Apple Music equivalent here.
    """
    if media_type == MediaType.ARTIST:
        return "artists"
    if media_type == MediaType.ALBUM:
        return "albums"
    if media_type == MediaType.TRACK:
        return "songs"
    if media_type == MediaType.PLAYLIST:
        return "playlists"
    raise MusicAssistantError(f"Unsupported media type: {media_type}")
1126
def is_library_id(self, library_id) -> bool:
    """Tell whether a value looks like a library id.

    A library id is a string made of one of the letters ``a``, ``i``, ``l``
    or ``p``, a dot, and one or more ASCII letters or digits. Anything that
    is not a string is rejected.
    """
    if isinstance(library_id, str):
        return re.search(r"^(?:[ailp]\.)[a-zA-Z0-9]+$", library_id) is not None
    return False
1133
1134 def _is_catalog_id(self, catalog_id: str) -> bool:
1135 """Check if input is a catalog id, or a library id."""
1136 return catalog_id.isnumeric() or catalog_id.startswith("pl.")
1137
async def _fetch_song_stream_metadata(self, song_id: str) -> dict[str, Any]:
    """Get the stream metadata for a song from the Apple Music web playback endpoint.

    The request is attempted at most twice: a first failure is logged and
    retried, a second failure is raised to the caller.

    :param song_id: Library id or catalog id of the song.
    :return: The first entry of the ``songList`` in the playback response.
    :raises MediaNotFoundError: When the response reports a failure or
        contains no song entry.
    :raises ClientError: When the HTTP request fails on the second attempt.
    """
    playback_url = "https://play.music.apple.com/WebObjects/MZPlay.woa/wa/webPlayback"
    self.logger.debug("_fetch_song_stream_metadata: Check if Library ID: %s", song_id)
    # Library ids and catalog ids are sent under different request keys.
    if self.is_library_id(song_id):
        data = {"universalLibraryId": song_id, "isLibrary": True}
    else:
        data = {"salableAdamId": song_id}
    for can_retry in (True, False):
        try:
            async with self.mass.http_session.post(
                playback_url, headers=self._get_decryption_headers(), json=data, ssl=True
            ) as response:
                response.raise_for_status()
                content = await response.json(loads=json_loads)
                if content.get("failureType"):
                    message = content.get("failureMessage")
                    raise MediaNotFoundError(f"Failed to get song stream metadata: {message}")
                # A missing or empty song list is reported like any other
                # lookup failure, so it is also covered by the retry below.
                song_list = content.get("songList")
                if not song_list:
                    raise MediaNotFoundError(
                        f"Failed to get song stream metadata for {song_id}"
                    )
                return song_list[0]
        except (MediaNotFoundError, ClientError) as exc:
            if can_retry:
                self.logger.warning("Failed to get song stream metadata: %s", exc)
                continue
            raise
    # Not reachable in practice: the second attempt either returns or raises.
    raise MediaNotFoundError(f"Failed to get song stream metadata for {song_id}")
1165
async def _parse_stream_url_and_uri(self, stream_assets: list[dict]) -> tuple[str, Any]:
    """Resolve the track URL and key URI from the stream assets of a song.

    :param stream_assets: Asset entries of the song; the first one whose
        ``flavor`` is ``28:ctrp256`` is used.
    :return: Tuple of the absolute track URL and the ``key`` of the first
        playlist item.
    :raises MediaNotFoundError: When no ctrp256 asset exists or its playlist is empty.
    """
    playlist_url = next(
        (asset["URL"] for asset in stream_assets if asset.get("flavor") == "28:ctrp256"),
        None,
    )
    if playlist_url is None:
        raise MediaNotFoundError("No ctrp256 URL found for song.")
    playlist_items = await fetch_playlist(self.mass, playlist_url, raise_on_hls=False)
    if not playlist_items:
        raise MediaNotFoundError("No playlist item found in ctrp256 playlist for song.")
    # Apple returns a HLS (substream) playlist but instead of chunks,
    # each item is just the whole file. So we simply grab the first playlist item.
    playlist_item = playlist_items[0]
    # path is relative, stitch it together
    base_path = playlist_url.rsplit("/", 1)[0]
    track_url = base_path + "/" + playlist_item.path
    return (track_url, playlist_item.key)
1181
1182 def _get_decryption_headers(self):
1183 """Get headers for decryption requests."""
1184 return {
1185 "authorization": f"Bearer {self._music_app_token}",
1186 "media-user-token": self._music_user_token,
1187 "connection": "keep-alive",
1188 "accept": "application/json",
1189 "origin": "https://music.apple.com",
1190 "referer": "https://music.apple.com/",
1191 "accept-encoding": "gzip, deflate, br",
1192 "content-type": "application/json;charset=utf-8",
1193 "user-agent": (
1194 "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)"
1195 " Chrome/110.0.0.0 Safari/537.36"
1196 ),
1197 }
1198
async def _get_decryption_key(
    self, license_url: str, key_id: bytes, uri: str, item_id: str
) -> str:
    """Get the decryption key for a song.

    A cached key for ``item_id`` is returned when available. Otherwise a
    license is requested through a Widevine CDM session and the first key of
    type ``CONTENT`` is returned as a hex string and cached for 3600 seconds.

    :param license_url: URL of the license server.
    :param key_id: Key id used to build the PSSH.
    :param uri: Key URI forwarded to the license request.
    :param item_id: Id of the song; also used as the cache key.
    :return: The content key as a hex string.
    :raises MediaNotFoundError: When the license contains no content key.
    """
    if decryption_key := await self.mass.cache.get(
        key=item_id,
        provider=self.instance_id,
        category=CACHE_CATEGORY_DECRYPT_KEY,
        checksum=self._session_id,
    ):
        self.logger.debug("Decryption key for %s found in cache.", item_id)
        return decryption_key
    pssh = self._get_pssh(key_id)
    device = Device(
        client_id=self._decrypt_client_id,
        private_key=self._decrypt_private_key,
        type_=DeviceTypes.ANDROID,
        security_level=3,
        flags={},
    )
    cdm = Cdm.from_device(device)
    session_id = cdm.open()
    try:
        challenge = cdm.get_license_challenge(session_id, pssh)
        track_license = await self._get_license(challenge, license_url, uri, item_id)
        cdm.parse_license(session_id, track_license)
        # Use a default so a license without a content key yields None instead
        # of a StopIteration (which would surface as a RuntimeError in a coroutine).
        content_key = next(
            (key for key in cdm.get_keys(session_id) if key.type == "CONTENT"), None
        )
    finally:
        # Always release the CDM session, also when the license request fails.
        cdm.close(session_id)
    if content_key is None:
        raise MediaNotFoundError(f"Unable to get decryption key for song {item_id}.")
    decryption_key = content_key.key.hex()
    # Store the key in the background; the caller does not wait for the cache write.
    self.mass.create_task(
        self.mass.cache.set(
            key=item_id,
            data=decryption_key,
            expiration=3600,
            provider=self.instance_id,
            category=CACHE_CATEGORY_DECRYPT_KEY,
            checksum=self._session_id,
        )
    )
    return decryption_key
1240
def _get_pssh(self, key_id: bytes) -> PSSH:
    """Build a Widevine PSSH that holds a single key id.

    :param key_id: Key id to place in the PSSH data.
    :return: PSSH created from the base64 encoded, serialized PSSH data.
    """
    widevine_data = WidevinePsshData()
    widevine_data.algorithm = 1  # NOTE(review): presumably the AESCTR enum value - confirm
    widevine_data.key_ids.append(key_id)
    serialized = widevine_data.SerializeToString()
    encoded_init_data = base64.b64encode(serialized).decode("utf-8")
    return PSSH.new(system_id=PSSH.SystemId.Widevine, init_data=encoded_init_data)
1248
async def _get_license(self, challenge: bytes, license_url: str, uri: str, item_id: str) -> str:
    """Get the license for a song based on the challenge.

    :param challenge: License challenge; sent base64 encoded.
    :param license_url: URL of the license server.
    :param uri: Key URI of the song.
    :param item_id: Id of the song, sent as ``adamId``.
    :return: The ``license`` value of the server response.
    :raises MediaNotFoundError: When the response contains no license.
    """
    payload = {
        "challenge": base64.b64encode(challenge).decode("utf-8"),
        "key-system": "com.widevine.alpha",
        "uri": uri,
        "adamId": item_id,
        # NOTE(review): always False, also for library ids - confirm this is intended.
        "isLibrary": False,
        "user-initiated": True,
    }
    # The request carries the app and user tokens, so the server certificate
    # is verified like in every other request of this provider.
    async with self.mass.http_session.post(
        license_url,
        data=json.dumps(payload),
        headers=self._get_decryption_headers(),
        ssl=True,
    ) as response:
        response.raise_for_status()
        content = await response.json(loads=json_loads)
    track_license = content.get("license")
    if not track_license:
        raise MediaNotFoundError(f"No license found for song {item_id}.")
    return track_license
1269