/
/
/
1"""Youtube Music support for MusicAssistant."""
2
3from __future__ import annotations
4
5import asyncio
6import importlib
7import logging
8import time
9from collections.abc import AsyncGenerator
10from contextlib import suppress
11from datetime import datetime
12from io import StringIO
13from typing import TYPE_CHECKING, Any
14from urllib.parse import parse_qs, unquote, urlparse
15
16from aiohttp import ClientConnectorError
17from duration_parser import parse as parse_str_duration
18from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
19from music_assistant_models.enums import (
20 AlbumType,
21 ConfigEntryType,
22 ContentType,
23 ImageType,
24 ProviderFeature,
25 StreamType,
26)
27from music_assistant_models.errors import (
28 InvalidDataError,
29 LoginFailed,
30 MediaNotFoundError,
31 SetupFailedError,
32 UnplayableMediaError,
33)
34from music_assistant_models.media_items import (
35 Album,
36 Artist,
37 AudioFormat,
38 ItemMapping,
39 MediaItemImage,
40 MediaItemType,
41 MediaType,
42 Playlist,
43 Podcast,
44 PodcastEpisode,
45 ProviderMapping,
46 RecommendationFolder,
47 SearchResults,
48 Track,
49 UniqueList,
50)
51from music_assistant_models.streamdetails import StreamDetails
52from ytmusicapi.constants import SUPPORTED_LANGUAGES
53from ytmusicapi.exceptions import YTMusicServerError
54from ytmusicapi.helpers import get_authorization, sapisid_from_cookie
55
56from music_assistant.constants import CONF_USERNAME, VERBOSE_LOG_LEVEL
57from music_assistant.controllers.cache import use_cache
58from music_assistant.helpers.util import infer_album_type, install_package, parse_title_and_version
59from music_assistant.models.music_provider import MusicProvider
60
61from .helpers import (
62 add_remove_playlist_tracks,
63 convert_to_netscape,
64 determine_recommendation_icon,
65 get_album,
66 get_artist,
67 get_home,
68 get_library_albums,
69 get_library_artists,
70 get_library_playlists,
71 get_library_podcasts,
72 get_library_tracks,
73 get_playlist,
74 get_podcast,
75 get_podcast_episode,
76 get_song_radio_tracks,
77 get_track,
78 is_brand_account,
79 library_add_remove_album,
80 library_add_remove_artist,
81 library_add_remove_playlist,
82 search,
83)
84
85if TYPE_CHECKING:
86 from music_assistant_models.config_entries import ProviderConfig
87 from music_assistant_models.provider import ProviderManifest
88
89 from music_assistant import MusicAssistant
90 from music_assistant.models import ProviderInstanceType
91
92
93CONF_COOKIE = "cookie"
94CONF_PO_TOKEN_SERVER_URL = "po_token_server_url"
95DEFAULT_PO_TOKEN_SERVER_URL = "http://127.0.0.1:4416"
96
97YTM_DOMAIN = "https://music.youtube.com"
98YTM_COOKIE_DOMAIN = ".youtube.com"
99YTM_BASE_URL = f"{YTM_DOMAIN}/youtubei/v1/"
100VARIOUS_ARTISTS_YTM_ID = "UCUTXlgdcKU5vfzFqHOWIvkA"
101# Playlist ID's are not unique across instances for lists like 'Liked videos', 'SuperMix' etc.
102# So we need to add a delimiter to make them unique
103YT_PLAYLIST_ID_DELIMITER = "ðµ"
104PODCAST_EPISODE_SPLITTER = "|"
105YT_LIKED_SONGS_PLAYLIST_ID = "LM"
106YT_PERSONAL_PLAYLISTS = (
107 YT_LIKED_SONGS_PLAYLIST_ID, # Liked songs
108 "SE", # Episodes for Later
109 "RDTMAK5uy_kset8DisdE7LSD4TNjEVvrKRTmG7a56sY", # SuperMix
110 "RDTMAK5uy_nGQKSMIkpr4o9VI_2i56pkGliD6FQRo50", # My Mix 1
111 "RDTMAK5uy_lz2owBgwWf1mjzyn_NbxzMViQzIg8IAIg", # My Mix 2
112 "RDTMAK5uy_k5UUl0lmrrfrjMpsT0CoMpdcBz1ruAO1k", # My Mix 3
113 "RDTMAK5uy_nTsa0Irmcu2li2-qHBoZxtrpG9HuC3k_Q", # My Mix 4
114 "RDTMAK5uy_lfZhS7zmIcmUhsKtkWylKzc0EN0LW90-s", # My Mix 5
115 "RDTMAK5uy_k78ni6Y4fyyl0r2eiKkBEICh9Q5wJdfXk", # My Mix 6
116 "RDTMAK5uy_lfhhWWw9v71CPrR7MRMHgZzbH6Vku9iJc", # My Mix 7
117 "RDTMAK5uy_n_5IN6hzAOwdCnM8D8rzrs3vDl12UcZpA", # Discover Mix
118 "RDTMAK5uy_lr0LWzGrq6FU9GIxWvFHTRPQD2LHMqlFA", # New Release Mix
119 "RDTMAK5uy_nilrsVWxrKskY0ZUpVZ3zpB0u4LwWTVJ4", # Replay Mix
120 "RDTMAK5uy_mZtXeU08kxXJOUhL0ETdAuZTh1z7aAFAo", # Archive Mix
121)
122DYNAMIC_PLAYLIST_TRACK_LIMIT = 300
123YTM_PREMIUM_CHECK_TRACK_ID = "dQw4w9WgXcQ"
124PACKAGES_TO_INSTALL = ("yt-dlp[default]", "bgutil-ytdlp-pot-provider")
125DEFAULT_STREAM_URL_EXPIRATION = 3600 # 1 hour
126
127SUPPORTED_FEATURES = {
128 ProviderFeature.LIBRARY_ARTISTS,
129 ProviderFeature.LIBRARY_ALBUMS,
130 ProviderFeature.LIBRARY_TRACKS,
131 ProviderFeature.LIBRARY_PLAYLISTS,
132 ProviderFeature.BROWSE,
133 ProviderFeature.SEARCH,
134 ProviderFeature.ARTIST_ALBUMS,
135 ProviderFeature.ARTIST_TOPTRACKS,
136 ProviderFeature.SIMILAR_TRACKS,
137 ProviderFeature.LIBRARY_PODCASTS,
138 ProviderFeature.RECOMMENDATIONS,
139}
140
141
142# TODO: fix disabled tests
143# ruff: noqa: PLW2901
144
145
146async def setup(
147 mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
148) -> ProviderInstanceType:
149 """Initialize provider(instance) with given configuration."""
150 return YoutubeMusicProvider(mass, manifest, config, SUPPORTED_FEATURES)
151
152
153async def get_config_entries(
154 mass: MusicAssistant, # noqa: ARG001
155 instance_id: str | None = None, # noqa: ARG001
156 action: str | None = None, # noqa: ARG001
157 values: dict[str, ConfigValueType] | None = None, # noqa: ARG001
158) -> tuple[ConfigEntry, ...]:
159 """
160 Return Config entries to setup this provider.
161
162 instance_id: id of an existing provider instance (None if new instance setup).
163 action: [optional] action key called from config entries UI.
164 values: the (intermediate) raw values for config entries sent with the action.
165 """
166 return (
167 ConfigEntry(
168 key=CONF_USERNAME, type=ConfigEntryType.STRING, label="Username", required=True
169 ),
170 ConfigEntry(
171 key=CONF_COOKIE,
172 type=ConfigEntryType.SECURE_STRING,
173 label="Login Cookie",
174 required=True,
175 description="The Login cookie you grabbed from an existing session, "
176 "see the documentation.",
177 ),
178 ConfigEntry(
179 key=CONF_PO_TOKEN_SERVER_URL,
180 type=ConfigEntryType.STRING,
181 default_value=DEFAULT_PO_TOKEN_SERVER_URL,
182 label="PO Token Server URL",
183 required=True,
184 description="The URL to the PO Token server. "
185 "Can be left as default for most people. \n\n"
186 "**Note that this does require you to have the "
187 "'YT Music PO Token Generator' addon installed!**",
188 ),
189 )
190
191
192class YoutubeMusicProvider(MusicProvider):
193 """Provider for Youtube Music."""
194
195 _headers = None
196 _context = None
197 _cookies = None
198 _cipher = None
199 _yt_user = None
200 _cookie = None
201 _yt_dlp_module = None
202
203 async def handle_async_init(self) -> None:
204 """Set up the YTMusic provider."""
205 logging.getLogger("yt_dlp").setLevel(self.logger.level + 10)
206 await self._install_packages()
207 self._cookie = self.config.get_value(CONF_COOKIE)
208 self._po_token_server_url = (
209 self.config.get_value(CONF_PO_TOKEN_SERVER_URL) or DEFAULT_PO_TOKEN_SERVER_URL
210 )
211 if not await self._verify_po_token_url():
212 raise LoginFailed(
213 "PO Token server URL is not reachable. "
214 "Make sure you have installed the YT Music PO Token Generator "
215 "and that it is running."
216 )
217 yt_username = self.config.get_value(CONF_USERNAME)
218 self._yt_user = yt_username if is_brand_account(yt_username) else None
219 # yt-dlp needs a netscape formatted cookie
220 self._netscape_cookie = convert_to_netscape(self._cookie, YTM_COOKIE_DOMAIN)
221 self._initialize_headers()
222 self._initialize_context()
223 self._cookies = {"CONSENT": "YES+1"}
224 # get default language (that is supported by YTM)
225 mass_locale = self.mass.metadata.locale
226 for lang_code in SUPPORTED_LANGUAGES:
227 if lang_code in (mass_locale, mass_locale.split("_")[0]):
228 self.language = lang_code
229 break
230 else:
231 self.language = "en"
232 if not await self._user_has_ytm_premium():
233 raise LoginFailed("User does not have Youtube Music Premium")
234
235 @use_cache(3600 * 24 * 7) # Cache for 7 days
236 async def search(
237 self, search_query: str, media_types=list[MediaType], limit: int = 5
238 ) -> SearchResults:
239 """Perform search on musicprovider.
240
241 :param search_query: Search query.
242 :param media_types: A list of media_types to include. All types if None.
243 :param limit: Number of items to return in the search (per type).
244 """
245 parsed_results = SearchResults()
246 ytm_filter = None
247 if len(media_types) == 1:
248 # YTM does not support multiple searchtypes, falls back to all if no type given
249 if media_types[0] == MediaType.ARTIST:
250 ytm_filter = "artists"
251 if media_types[0] == MediaType.ALBUM:
252 ytm_filter = "albums"
253 if media_types[0] == MediaType.TRACK:
254 ytm_filter = "songs"
255 if media_types[0] == MediaType.PLAYLIST:
256 ytm_filter = "playlists"
257 if media_types[0] == MediaType.RADIO:
258 # bit of an edge case but still good to handle
259 return parsed_results
260 results = await search(
261 query=search_query, ytm_filter=ytm_filter, limit=limit, language=self.language
262 )
263 parsed_results = SearchResults()
264 for result in results:
265 try:
266 if result["resultType"] == "artist" and MediaType.ARTIST in media_types:
267 parsed_results.artists.append(self._parse_artist(result))
268 elif result["resultType"] == "album" and MediaType.ALBUM in media_types:
269 parsed_results.albums.append(self._parse_album(result))
270 elif result["resultType"] == "playlist" and MediaType.PLAYLIST in media_types:
271 parsed_results.playlists.append(self._parse_playlist(result))
272 elif (
273 result["resultType"] in ("song", "video")
274 and MediaType.TRACK in media_types
275 and (track := self._parse_track(result))
276 ):
277 parsed_results.tracks.append(track)
278 except InvalidDataError:
279 pass # ignore invalid item
280 return parsed_results
281
282 async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
283 """Retrieve all library artists from Youtube Music."""
284 artists_obj = await get_library_artists(
285 headers=self._headers, language=self.language, user=self._yt_user
286 )
287 for artist in artists_obj:
288 yield self._parse_artist(artist)
289
290 async def get_library_albums(self) -> AsyncGenerator[Album, None]:
291 """Retrieve all library albums from Youtube Music."""
292 albums_obj = await get_library_albums(
293 headers=self._headers, language=self.language, user=self._yt_user
294 )
295 for album in albums_obj:
296 yield self._parse_album(album, album["browseId"])
297
298 async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
299 """Retrieve all library playlists from the provider."""
300 playlists_obj = await get_library_playlists(
301 headers=self._headers, language=self.language, user=self._yt_user
302 )
303 for playlist in playlists_obj:
304 yield self._parse_playlist(playlist)
305
306 async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
307 """Retrieve library tracks from Youtube Music."""
308 tracks_obj = await get_library_tracks(
309 headers=self._headers, language=self.language, user=self._yt_user
310 )
311 for track in tracks_obj:
312 # Library tracks sometimes do not have a valid artist id
313 # In that case, call the API for track details based on track id
314 try:
315 yield self._parse_track(track)
316 except InvalidDataError:
317 track = await self.get_track(track["videoId"])
318 yield track
319
320 async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
321 """Retrieve the library podcasts from Youtube Music."""
322 podcasts_obj = await get_library_podcasts(
323 headers=self._headers, language=self.language, user=self._yt_user
324 )
325 for podcast in podcasts_obj:
326 yield self._parse_podcast(podcast)
327
328 @use_cache(3600 * 24 * 30) # Cache for 30 days
329 async def get_album(self, prov_album_id) -> Album:
330 """Get full album details by id."""
331 if album_obj := await get_album(prov_album_id=prov_album_id, language=self.language):
332 return self._parse_album(album_obj=album_obj, album_id=prov_album_id)
333 msg = f"Item {prov_album_id} not found"
334 raise MediaNotFoundError(msg)
335
336 @use_cache(3600 * 24 * 30) # Cache for 30 days
337 async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
338 """Get album tracks for given album id."""
339 album_obj = await get_album(prov_album_id=prov_album_id, language=self.language)
340 if not album_obj.get("tracks"):
341 return []
342 tracks = []
343 for track_number, track_obj in enumerate(album_obj["tracks"], 1):
344 try:
345 track = self._parse_track(track_obj=track_obj, track_number=track_number)
346 except InvalidDataError:
347 continue
348 tracks.append(track)
349 return tracks
350
351 @use_cache(3600 * 24 * 30) # Cache for 30 days
352 async def get_artist(self, prov_artist_id) -> Artist:
353 """Get full artist details by id."""
354 if artist_obj := await get_artist(
355 prov_artist_id=prov_artist_id, headers=self._headers, language=self.language
356 ):
357 return self._parse_artist(artist_obj=artist_obj)
358 msg = f"Item {prov_artist_id} not found"
359 raise MediaNotFoundError(msg)
360
361 @use_cache(3600 * 24 * 30) # Cache for 30 days
362 async def get_track(self, prov_track_id) -> Track:
363 """Get full track details by id."""
364 if track_obj := await get_track(
365 prov_track_id=prov_track_id,
366 headers=self._headers,
367 language=self.language,
368 ):
369 return self._parse_track(track_obj)
370 msg = f"Item {prov_track_id} not found"
371 raise MediaNotFoundError(msg)
372
373 @use_cache(3600 * 24 * 7) # Cache for 7 days
374 async def get_playlist(self, prov_playlist_id) -> Playlist:
375 """Get full playlist details by id."""
376 # Grab the full playlist by default
377 limit = None
378 # Grab the playlist id from the full url in case of personal playlists
379 if YT_PLAYLIST_ID_DELIMITER in prov_playlist_id:
380 prov_playlist_id = prov_playlist_id.split(YT_PLAYLIST_ID_DELIMITER)[0]
381 if (
382 prov_playlist_id in YT_PERSONAL_PLAYLISTS
383 and prov_playlist_id != YT_LIKED_SONGS_PLAYLIST_ID
384 ):
385 # Personal playlists are dynamic and can result in endless tracks
386 # limit to avoid memory issues
387 limit = DYNAMIC_PLAYLIST_TRACK_LIMIT
388 if playlist_obj := await get_playlist(
389 prov_playlist_id=prov_playlist_id,
390 headers=self._headers,
391 language=self.language,
392 user=self._yt_user,
393 limit=limit,
394 ):
395 return self._parse_playlist(playlist_obj)
396 msg = f"Item {prov_playlist_id} not found"
397 raise MediaNotFoundError(msg)
398
399 @use_cache(3600 * 3) # Cache for 3 hours
400 async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
401 """Return playlist tracks for the given provider playlist id."""
402 if page > 0:
403 # paging not supported, we always return the whole list at once
404 return []
405 # Grab the full playlist by default
406 limit = None
407 # Grab the playlist id from the full url in case of personal playlists
408 if YT_PLAYLIST_ID_DELIMITER in prov_playlist_id:
409 prov_playlist_id = prov_playlist_id.split(YT_PLAYLIST_ID_DELIMITER)[0]
410 if (
411 prov_playlist_id in YT_PERSONAL_PLAYLISTS
412 and prov_playlist_id != YT_LIKED_SONGS_PLAYLIST_ID
413 ):
414 # Personal playlists are dynamic and can result in endless tracks
415 # limit to avoid memory issues
416 limit = DYNAMIC_PLAYLIST_TRACK_LIMIT
417 # Add a try to prevent MA from stopping syncing whenever we fail a single playlist
418 try:
419 playlist_obj = await get_playlist(
420 prov_playlist_id=prov_playlist_id,
421 headers=self._headers,
422 user=self._yt_user,
423 limit=limit,
424 )
425 except KeyError as ke:
426 self.logger.warning("Could not load playlist: %s: %s", prov_playlist_id, ke)
427 return []
428 if "tracks" not in playlist_obj:
429 return []
430 result = []
431 # TODO: figure out how to handle paging in YTM
432 for index, track_obj in enumerate(playlist_obj["tracks"], 1):
433 if track_obj["isAvailable"]:
434 # Playlist tracks sometimes do not have a valid artist id
435 # In that case, call the API for track details based on track id
436 try:
437 if track := self._parse_track(track_obj):
438 track.position = index
439 result.append(track)
440 except InvalidDataError:
441 if track := await self.get_track(track_obj["videoId"]):
442 track.position = index
443 result.append(track)
444 # YTM doesn't seem to support paging so we ignore offset and limit
445 return result
446
447 @use_cache(3600 * 24 * 7) # Cache for 7 days
448 async def get_artist_albums(self, prov_artist_id) -> list[Album]:
449 """Get a list of albums for the given artist."""
450 artist_obj = await get_artist(prov_artist_id=prov_artist_id, headers=self._headers)
451 if "albums" in artist_obj and "results" in artist_obj["albums"]:
452 albums = []
453 for album_obj in artist_obj["albums"]["results"]:
454 if "artists" not in album_obj:
455 album_obj["artists"] = [
456 {"id": artist_obj["channelId"], "name": artist_obj["name"]}
457 ]
458 albums.append(self._parse_album(album_obj, album_obj["browseId"]))
459 return albums
460 return []
461
462 @use_cache(3600 * 24 * 7) # Cache for 7 days
463 async def get_artist_toptracks(self, prov_artist_id) -> list[Track]:
464 """Get a list of 25 most popular tracks for the given artist."""
465 artist_obj = await get_artist(prov_artist_id=prov_artist_id, headers=self._headers)
466 if artist_obj.get("songs") and artist_obj["songs"].get("browseId"):
467 prov_playlist_id = artist_obj["songs"]["browseId"]
468 playlist_tracks = await self.get_playlist_tracks(prov_playlist_id)
469 return playlist_tracks[:25]
470 return []
471
472 @use_cache(3600 * 24 * 14) # Cache for 14 days
473 async def get_podcast(self, prov_podcast_id: str) -> Podcast:
474 """Get the full details of a Podcast."""
475 podcast_obj = await get_podcast(prov_podcast_id, headers=self._headers)
476 return self._parse_podcast(podcast_obj)
477
478 async def get_podcast_episodes(
479 self, prov_podcast_id: str
480 ) -> AsyncGenerator[PodcastEpisode, None]:
481 """Get all episodes from a podcast."""
482 podcast_obj = await get_podcast(prov_podcast_id, headers=self._headers)
483 podcast_obj["podcastId"] = prov_podcast_id
484 podcast = self._parse_podcast(podcast_obj)
485 for index, episode_obj in enumerate(podcast_obj.get("episodes", []), start=1):
486 episode = self._parse_podcast_episode(episode_obj, podcast)
487 ep_index = episode_obj.get("index") or index
488 episode.position = ep_index
489 yield episode
490
491 @use_cache(3600 * 3) # Cache for 3 hours
492 async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode:
493 """Get a single Podcast Episode."""
494 podcast_id, episode_id = prov_episode_id.split(PODCAST_EPISODE_SPLITTER)
495 podcast = await self.get_podcast(podcast_id)
496 episode_obj = await get_podcast_episode(episode_id, headers=self._headers)
497 episode = self._parse_podcast_episode(episode_obj, podcast)
498 episode.position = 0
499 return episode
500
501 async def library_add(self, item: MediaItemType) -> bool:
502 """Add an item to the library."""
503 result = False
504 if item.media_type == MediaType.ARTIST:
505 result = await library_add_remove_artist(
506 headers=self._headers, prov_artist_id=item.item_id, add=True, user=self._yt_user
507 )
508 elif item.media_type == MediaType.ALBUM:
509 result = await library_add_remove_album(
510 headers=self._headers, prov_item_id=item.item_id, add=True, user=self._yt_user
511 )
512 elif item.media_type == MediaType.PLAYLIST:
513 result = await library_add_remove_playlist(
514 headers=self._headers, prov_item_id=item.item_id, add=True, user=self._yt_user
515 )
516 elif item.media_type == MediaType.TRACK:
517 raise NotImplementedError
518 return result
519
520 async def library_remove(self, prov_item_id, media_type: MediaType):
521 """Remove an item from the library."""
522 result = False
523 try:
524 if media_type == MediaType.ARTIST:
525 result = await library_add_remove_artist(
526 headers=self._headers,
527 prov_artist_id=prov_item_id,
528 add=False,
529 user=self._yt_user,
530 )
531 elif media_type == MediaType.ALBUM:
532 result = await library_add_remove_album(
533 headers=self._headers, prov_item_id=prov_item_id, add=False, user=self._yt_user
534 )
535 elif media_type == MediaType.PLAYLIST:
536 result = await library_add_remove_playlist(
537 headers=self._headers, prov_item_id=prov_item_id, add=False, user=self._yt_user
538 )
539 elif media_type == MediaType.TRACK:
540 raise NotImplementedError
541 except YTMusicServerError as err:
542 # YTM raises if trying to remove an item that is not in the library
543 raise NotImplementedError(err) from err
544 return result
545
546 async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
547 """Add track(s) to playlist."""
548 # Grab the playlist id from the full url in case of personal playlists
549 if YT_PLAYLIST_ID_DELIMITER in prov_playlist_id:
550 prov_playlist_id = prov_playlist_id.split(YT_PLAYLIST_ID_DELIMITER)[0]
551 return await add_remove_playlist_tracks(
552 headers=self._headers,
553 prov_playlist_id=prov_playlist_id,
554 prov_track_ids=prov_track_ids,
555 add=True,
556 user=self._yt_user,
557 )
558
559 async def remove_playlist_tracks(
560 self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
561 ) -> None:
562 """Remove track(s) from playlist."""
563 # Grab the full playlist by default
564 limit = None
565 # Grab the playlist id from the full url in case of personal playlists
566 if YT_PLAYLIST_ID_DELIMITER in prov_playlist_id:
567 prov_playlist_id = prov_playlist_id.split(YT_PLAYLIST_ID_DELIMITER)[0]
568 if (
569 prov_playlist_id in YT_PERSONAL_PLAYLISTS
570 and prov_playlist_id != YT_LIKED_SONGS_PLAYLIST_ID
571 ):
572 # Personal playlists are dynamic and can result in endless tracks
573 # limit to avoid memory issues
574 limit = DYNAMIC_PLAYLIST_TRACK_LIMIT
575 playlist_obj = await get_playlist(
576 prov_playlist_id=prov_playlist_id, headers=self._headers, limit=limit
577 )
578 if "tracks" not in playlist_obj:
579 return None
580 tracks_to_delete = []
581 for index, track in enumerate(playlist_obj["tracks"]):
582 if index in positions_to_remove:
583 # YT needs both the videoId and the setVideoId in order to remove
584 # the track. Thus, we need to obtain the playlist details and
585 # grab the info from there.
586 tracks_to_delete.append(
587 {"videoId": track["videoId"], "setVideoId": track["setVideoId"]}
588 )
589
590 return await add_remove_playlist_tracks(
591 headers=self._headers,
592 prov_playlist_id=prov_playlist_id,
593 prov_track_ids=tracks_to_delete,
594 add=False,
595 user=self._yt_user,
596 )
597
598 @use_cache(3600 * 24) # Cache for 1 day
599 async def get_similar_tracks(self, prov_track_id, limit=25) -> list[Track]:
600 """Retrieve a dynamic list of tracks based on the provided item."""
601 result = []
602 result = await get_song_radio_tracks(
603 headers=self._headers, prov_item_id=prov_track_id, limit=limit, user=self._yt_user
604 )
605 if "tracks" in result:
606 tracks = []
607 for track in result["tracks"]:
608 # Playlist tracks sometimes do not have a valid artist id
609 # In that case, call the API for track details based on track id
610 try:
611 track = self._parse_track(track)
612 if track:
613 tracks.append(track)
614 except InvalidDataError:
615 if track := await self.get_track(track["videoId"]):
616 tracks.append(track)
617 return tracks
618 return []
619
620 async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
621 """Return the content details for the given track when it will be streamed."""
622 if media_type == MediaType.PODCAST_EPISODE:
623 item_id = item_id.split(PODCAST_EPISODE_SPLITTER)[1]
624 stream_format = await self._get_stream_format(item_id=item_id)
625 self.logger.debug("Found stream_format: %s for song %s", stream_format["format"], item_id)
626 url = stream_format["url"]
627 expiration = DEFAULT_STREAM_URL_EXPIRATION
628 if parsed := parse_qs(urlparse(url).query):
629 if expire_ts := parsed.get("expire", [None])[0]:
630 expiration = int(expire_ts) - int(time.time())
631 stream_details = StreamDetails(
632 provider=self.instance_id,
633 item_id=item_id,
634 audio_format=AudioFormat(
635 content_type=ContentType.try_parse(stream_format["audio_ext"]),
636 ),
637 stream_type=StreamType.HTTP,
638 path=url,
639 can_seek=True,
640 allow_seek=True,
641 expiration=expiration,
642 )
643 if (
644 stream_format.get("audio_channels")
645 and str(stream_format.get("audio_channels")).isdigit()
646 ):
647 stream_details.audio_format.channels = int(stream_format.get("audio_channels"))
648 if stream_format.get("asr"):
649 stream_details.audio_format.sample_rate = int(stream_format.get("asr"))
650 return stream_details
651
652 @use_cache(3600)
653 async def recommendations(self) -> list[RecommendationFolder]:
654 """Get available recommendations."""
655 recommendations = await get_home(self._headers, self.language, user=self._yt_user)
656 folders = []
657 for section in recommendations:
658 folder = RecommendationFolder(
659 name=section["title"],
660 item_id=f"{self.instance_id}_{section['title']}",
661 provider=self.instance_id,
662 icon=determine_recommendation_icon(section["title"]),
663 )
664 for recommended_item in section.get("contents", []):
665 if not recommended_item:
666 continue # yeah this seems to happen sometimes ?!
667 if recommended_item.get("videoId"):
668 # Probably a track
669 try:
670 track = self._parse_track(recommended_item)
671 folder.items.append(track)
672 except InvalidDataError:
673 self.logger.debug("Invalid track in recommendations: %s", recommended_item)
674 elif recommended_item.get("playlistId"):
675 # Probably a playlist
676 recommended_item["id"] = recommended_item["playlistId"]
677 del recommended_item["playlistId"]
678 folder.items.append(self._parse_playlist(recommended_item))
679 elif recommended_item.get("browseId"):
680 # Probably an album
681 folder.items.append(self._parse_album(recommended_item))
682 elif recommended_item.get("subscribers"):
683 # Probably artist
684 folder.items.append(self._parse_album(recommended_item))
685 else:
686 self.logger.warning(
687 "Unknown item type in recommendation folder: %s", recommended_item
688 )
689 continue
690 folders.append(folder)
691 return folders
692
693 async def _post_data(self, endpoint: str, data: dict[str, str], **kwargs):
694 """Post data to the given endpoint."""
695 url = f"{YTM_BASE_URL}{endpoint}"
696 data.update(self._context)
697 async with self.mass.http_session.post(
698 url,
699 headers=self._headers,
700 json=data,
701 ssl=False,
702 cookies=self._cookies,
703 ) as response:
704 return await response.json()
705
706 async def _get_data(self, url: str, params: dict | None = None):
707 """Get data from the given URL."""
708 async with self.mass.http_session.get(
709 url, headers=self._headers, params=params, cookies=self._cookies
710 ) as response:
711 return await response.text()
712
713 def _initialize_headers(self) -> dict[str, str]:
714 """Return headers to include in the requests."""
715 headers = {
716 "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:72.0) Gecko/20100101 Firefox/72.0", # noqa: E501
717 "Accept": "*/*",
718 "Accept-Language": "en-US,en;q=0.5",
719 "Content-Type": "application/json",
720 "X-Goog-AuthUser": "0",
721 "x-origin": YTM_DOMAIN,
722 "Cookie": self._cookie,
723 }
724 if "__Secure-3PAPISID" not in self._cookie:
725 raise LoginFailed(
726 "Invalid Cookie detected. Cookie is missing the __Secure-3PAPISID field. "
727 "Please ensure you are passing the correct cookie. "
728 "You can verify this by checking if the string "
729 "'__Secure-3PAPISID' is present in the cookie string."
730 )
731 sapisid = sapisid_from_cookie(self._cookie)
732 headers["Authorization"] = get_authorization(sapisid + " " + YTM_DOMAIN)
733 self._headers = headers
734
735 def _initialize_context(self) -> dict[str, str]:
736 """Return a dict to use as a context in requests."""
737 self._context = {
738 "context": {
739 "client": {"clientName": "WEB_REMIX", "clientVersion": "0.1"},
740 "user": {},
741 }
742 }
743
744 def _parse_album(self, album_obj: dict, album_id: str | None = None) -> Album:
745 """Parse a YT Album response to an Album model object."""
746 album_id = album_id or album_obj.get("id") or album_obj.get("browseId")
747
748 if not album_id:
749 raise InvalidDataError("Album ID is required but not found")
750
751 if "title" in album_obj:
752 name, version = parse_title_and_version(album_obj["title"])
753 elif "name" in album_obj:
754 name, version = parse_title_and_version(album_obj["name"])
755 else:
756 name, version = "", ""
757 album = Album(
758 item_id=album_id,
759 name=name,
760 version=version,
761 provider=self.instance_id,
762 provider_mappings={
763 ProviderMapping(
764 item_id=str(album_id),
765 provider_domain=self.domain,
766 provider_instance=self.instance_id,
767 url=f"{YTM_DOMAIN}/playlist?list={album_obj.get('audioPlaylistId')}",
768 )
769 },
770 favorite=album_obj.get("likeStatus", "INDIFFERENT") == "LIKE",
771 )
772 if album_obj.get("year") and album_obj["year"].isdigit():
773 album.year = album_obj["year"]
774 if "thumbnails" in album_obj:
775 album.metadata.images = UniqueList(self._parse_thumbnails(album_obj["thumbnails"]))
776 if description := album_obj.get("description"):
777 album.metadata.description = unquote(description)
778 if "isExplicit" in album_obj:
779 album.metadata.explicit = album_obj["isExplicit"]
780 if "artists" in album_obj:
781 album.artists = UniqueList(
782 [
783 self._get_artist_item_mapping(artist)
784 for artist in album_obj["artists"]
785 if artist.get("id")
786 or artist.get("channelId")
787 or artist.get("name") == "Various Artists"
788 ]
789 )
790 if "type" in album_obj:
791 if album_obj["type"] == "Single":
792 album_type = AlbumType.SINGLE
793 elif album_obj["type"] == "EP":
794 album_type = AlbumType.EP
795 elif album_obj["type"] == "Album":
796 album_type = AlbumType.ALBUM
797 else:
798 album_type = AlbumType.UNKNOWN
799 album.album_type = album_type
800
801 # Try inference - override if it finds something more specific
802 inferred_type = infer_album_type(name, version)
803 if inferred_type in (AlbumType.SOUNDTRACK, AlbumType.LIVE):
804 album.album_type = inferred_type
805
806 return album
807
808 def _parse_artist(self, artist_obj: dict) -> Artist:
809 """Parse a YT Artist response to Artist model object."""
810 artist_id = None
811 if "channelId" in artist_obj:
812 artist_id = artist_obj["channelId"]
813 elif artist_obj.get("id"):
814 artist_id = artist_obj["id"]
815 elif artist_obj["name"] == "Various Artists":
816 artist_id = VARIOUS_ARTISTS_YTM_ID
817 if not artist_id:
818 msg = "Artist does not have a valid ID"
819 raise InvalidDataError(msg)
820 artist = Artist(
821 item_id=artist_id,
822 name=artist_obj["name"],
823 provider=self.instance_id,
824 provider_mappings={
825 ProviderMapping(
826 item_id=str(artist_id),
827 provider_domain=self.domain,
828 provider_instance=self.instance_id,
829 url=f"{YTM_DOMAIN}/channel/{artist_id}",
830 )
831 },
832 favorite=artist_obj.get("likeStatus", "INDIFFERENT") == "LIKE",
833 )
834 if "description" in artist_obj:
835 artist.metadata.description = artist_obj["description"]
836 if artist_obj.get("thumbnails"):
837 artist.metadata.images = self._parse_thumbnails(artist_obj["thumbnails"])
838 return artist
839
840 def _parse_playlist(self, playlist_obj: dict) -> Playlist:
841 """Parse a YT Playlist response to a Playlist object."""
842 playlist_id = playlist_obj["id"]
843 playlist_name = playlist_obj["title"]
844 is_editable = playlist_obj.get("privacy", "") == "PRIVATE"
845 # Playlist ID's are not unique across instances for lists like 'Likes', 'Supermix', etc.
846 # So suffix with the instance id to make them unique
847 if playlist_id in YT_PERSONAL_PLAYLISTS:
848 playlist_id = f"{playlist_id}{YT_PLAYLIST_ID_DELIMITER}{self.instance_id}"
849 playlist_name = f"{playlist_name} ({self.name})"
850 playlist = Playlist(
851 item_id=playlist_id,
852 provider=self.instance_id,
853 name=playlist_name,
854 provider_mappings={
855 ProviderMapping(
856 item_id=playlist_id,
857 provider_domain=self.domain,
858 provider_instance=self.instance_id,
859 url=f"{YTM_DOMAIN}/playlist?list={playlist_id}",
860 is_unique=is_editable, # user-owned playlists are unique
861 )
862 },
863 is_editable=is_editable,
864 favorite=playlist_obj.get("likeStatus", "INDIFFERENT") == "LIKE",
865 )
866 if "description" in playlist_obj:
867 playlist.metadata.description = playlist_obj["description"]
868 if playlist_obj.get("thumbnails"):
869 playlist.metadata.images = self._parse_thumbnails(playlist_obj["thumbnails"])
870
871 if authors := playlist_obj.get("author"):
872 if isinstance(authors, str):
873 playlist.owner = authors
874 elif isinstance(authors, list):
875 playlist.owner = authors[0]["name"]
876 else:
877 playlist.owner = authors["name"]
878 else:
879 playlist.owner = self.name
880 return playlist
881
882 def _parse_track(self, track_obj: dict, track_number: int = 0) -> Track:
883 """Parse a YT Track response to a Track model object."""
884 if not track_obj.get("videoId"):
885 msg = "Track is missing videoId"
886 raise InvalidDataError(msg)
887 track_id = str(track_obj["videoId"])
888 name, version = parse_title_and_version(track_obj["title"])
889 track = Track(
890 item_id=track_id,
891 provider=self.instance_id,
892 name=name,
893 version=version,
894 provider_mappings={
895 ProviderMapping(
896 item_id=track_id,
897 provider_domain=self.domain,
898 provider_instance=self.instance_id,
899 available=track_obj.get("isAvailable", True),
900 url=f"{YTM_DOMAIN}/watch?v={track_id}",
901 audio_format=AudioFormat(
902 content_type=ContentType.M4A,
903 ),
904 )
905 },
906 favorite=track_obj.get("likeStatus", "INDIFFERENT") == "LIKE",
907 # Disc info is not available in YTM
908 disc_number=0,
909 # Track number is "sometimes" available in the track object, otherwise approach
910 # by counting album tracks when fetching full album details
911 track_number=track_obj.get("trackNumber") or track_number or 0,
912 )
913
914 if track_obj.get("artists"):
915 track.artists = [
916 self._get_artist_item_mapping(artist)
917 for artist in track_obj["artists"]
918 if artist.get("id")
919 or artist.get("channelId")
920 or artist.get("name") == "Various Artists"
921 ]
922 # guard that track has valid artists
923 if not track.artists:
924 msg = "Track is missing artists"
925 raise InvalidDataError(msg)
926 if track_obj.get("thumbnails"):
927 track.metadata.images = self._parse_thumbnails(track_obj["thumbnails"])
928 if (
929 track_obj.get("album")
930 and isinstance(track_obj.get("album"), dict)
931 and track_obj["album"].get("id")
932 ):
933 album = track_obj["album"]
934 track.album = self._get_item_mapping(MediaType.ALBUM, album["id"], album["name"])
935 if "isExplicit" in track_obj:
936 track.metadata.explicit = track_obj["isExplicit"]
937 if "duration" in track_obj and str(track_obj["duration"]).isdigit():
938 track.duration = int(track_obj["duration"])
939 elif "duration_seconds" in track_obj and str(track_obj["duration_seconds"]).isdigit():
940 track.duration = int(track_obj["duration_seconds"])
941 return track
942
943 def _parse_podcast(self, podcast_obj: dict) -> Podcast:
944 """Parse a YTM Podcast into a MA Podcast."""
945 podcast = Podcast(
946 item_id=podcast_obj["podcastId"],
947 name=podcast_obj["title"],
948 provider=self.instance_id,
949 provider_mappings={
950 ProviderMapping(
951 item_id=podcast_obj["podcastId"],
952 provider_domain=self.domain,
953 provider_instance=self.instance_id,
954 )
955 },
956 )
957 if description := podcast_obj.get("description"):
958 podcast.metadata.description = description
959 if author := podcast_obj.get("author"):
960 podcast.publisher = author["name"]
961 if thumbnails := podcast_obj.get("thumbnails"):
962 podcast.metadata.images = self._parse_thumbnails(thumbnails)
963 return podcast
964
965 def _parse_podcast_episode(self, episode_obj: dict, podcast: Podcast | None) -> PodcastEpisode:
966 """Parse a raw episode into a PodcastEpisode."""
967 episode_id = episode_obj.get("videoId")
968 if not episode_id:
969 msg = "Podcast episode is missing videoId"
970 raise InvalidDataError(msg)
971 item_id = f"{podcast.item_id}{PODCAST_EPISODE_SPLITTER}{episode_id}"
972 episode = PodcastEpisode(
973 item_id=item_id,
974 provider=self.instance_id,
975 name=episode_obj.get("title"),
976 podcast=podcast,
977 provider_mappings={
978 ProviderMapping(
979 item_id=item_id,
980 provider_domain=self.domain,
981 provider_instance=self.instance_id,
982 audio_format=AudioFormat(
983 content_type=ContentType.M4A,
984 ),
985 url=f"{YTM_DOMAIN}/watch?v={episode_id}",
986 )
987 },
988 )
989 if duration := episode_obj.get("duration"):
990 duration_sec = parse_str_duration(duration)
991 episode.duration = duration_sec
992 if description := episode_obj.get("description"):
993 episode.metadata.description = description
994 if thumbnails := episode_obj.get("thumbnails"):
995 episode.metadata.images = self._parse_thumbnails(thumbnails)
996 if release_date := episode_obj.get("date"):
997 with suppress(ValueError):
998 episode.metadata.release_date = datetime.fromisoformat(release_date)
999 return episode
1000
1001 async def _get_stream_format(self, item_id: str) -> dict[str, Any]:
1002 """Figure out the stream URL to use and return the highest quality."""
1003
1004 def _extract_best_stream_url_format() -> dict[str, Any]:
1005 if self._yt_dlp_module is None:
1006 self._yt_dlp_module = importlib.import_module("yt_dlp")
1007 yt_dlp = self._yt_dlp_module
1008 url = f"{YTM_DOMAIN}/watch?v={item_id}"
1009 ydl_opts = {
1010 "quiet": self.logger.level > logging.DEBUG,
1011 "verbose": self.logger.level == VERBOSE_LOG_LEVEL,
1012 "cookiefile": StringIO(self._netscape_cookie),
1013 # This enforces a player client and skips unnecessary scraping to increase speed
1014 "extractor_args": {
1015 "youtubepot-bgutilhttp": {
1016 "base_url": [self._po_token_server_url],
1017 # Disable new PO Token server behavior. Disable after this issue is fixed:
1018 # https://github.com/Brainicism/bgutil-ytdlp-pot-provider/issues/138
1019 "disable_innertube": "1",
1020 },
1021 "youtube": {
1022 "skip": ["translated_subs", "dash"],
1023 "player_client": ["web_music"],
1024 "player_skip": ["webpage"],
1025 },
1026 },
1027 }
1028 with yt_dlp.YoutubeDL(ydl_opts) as ydl:
1029 try:
1030 info = ydl.extract_info(url, download=False)
1031 except yt_dlp.utils.DownloadError as err:
1032 raise UnplayableMediaError(err) from err
1033 format_selector = ydl.build_format_selector("m4a/bestaudio")
1034 if not (stream_format := next(format_selector({"formats": info["formats"]})), None):
1035 raise UnplayableMediaError("No stream formats found")
1036 return stream_format
1037
1038 return await asyncio.to_thread(_extract_best_stream_url_format)
1039
1040 def _get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping:
1041 return ItemMapping(
1042 media_type=media_type,
1043 item_id=key,
1044 provider=self.instance_id,
1045 name=name,
1046 )
1047
1048 def _get_artist_item_mapping(self, artist_obj: dict) -> ItemMapping:
1049 artist_id = artist_obj.get("id") or artist_obj.get("channelId")
1050 if not artist_id and artist_obj["name"] == "Various Artists":
1051 artist_id = VARIOUS_ARTISTS_YTM_ID
1052 return self._get_item_mapping(MediaType.ARTIST, artist_id, artist_obj.get("name"))
1053
1054 async def _verify_po_token_url(self) -> bool:
1055 """Ping the PO Token server and verify the response."""
1056 url = f"{self._po_token_server_url}/ping"
1057 try:
1058 async with self.mass.http_session.get(url) as response:
1059 response.raise_for_status()
1060 self.logger.debug("PO Token server responded with %s", response.status)
1061 return response.status == 200
1062 except ClientConnectorError:
1063 return False
1064
1065 async def _user_has_ytm_premium(self) -> bool:
1066 """Check if the user has Youtube Music Premium."""
1067 stream_format = await self._get_stream_format(YTM_PREMIUM_CHECK_TRACK_ID)
1068 # Only premium users can stream the HQ stream of this song
1069 return stream_format["format_id"] == "141"
1070
1071 def _parse_thumbnails(self, thumbnails_obj: dict) -> list[MediaItemImage]:
1072 """Parse and YTM thumbnails to MediaItemImage."""
1073 result: list[MediaItemImage] = []
1074 processed_images = set()
1075 for img in sorted(thumbnails_obj, key=lambda w: w.get("width", 0), reverse=True):
1076 url: str = img["url"]
1077 url_base = url.split("=w")[0]
1078 width: int = img["width"]
1079 height: int = img["height"]
1080 image_ratio: float = width / height
1081 image_type = (
1082 ImageType.LANDSCAPE
1083 if "maxresdefault" in url or image_ratio > 2.0
1084 else ImageType.THUMB
1085 )
1086 if "=w" not in url and width < 500:
1087 continue
1088 # if the size is in the url, we can actually request a higher thumb
1089 if "=w" in url and width < 600:
1090 url = f"{url_base}=w600-h600-p"
1091 image_type = ImageType.THUMB
1092 if (url_base, image_type) in processed_images:
1093 continue
1094 processed_images.add((url_base, image_type))
1095 result.append(
1096 MediaItemImage(
1097 type=image_type,
1098 path=url,
1099 provider=self.instance_id,
1100 remotely_accessible=True,
1101 )
1102 )
1103 return result
1104
1105 async def _install_packages(self) -> None:
1106 """Install frequently changing packages dynamically."""
1107 # NOTE: Google breaks things quite often which requires us to update
1108 # some packages very frequently. Installing them dynamically prevents
1109 # us from having to update MA to ensure this provider works.
1110 for package_name in PACKAGES_TO_INSTALL:
1111 await install_package(package_name)
1112 # verify if the yt_dlp package is usable
1113 try:
1114 await asyncio.to_thread(importlib.import_module, "yt_dlp")
1115 except ImportError:
1116 raise SetupFailedError("Package yt_dlp failed to install")
1117