music-assistant-server

15 KBPY
helpers.py
15 KB407 lines • python
1"""Helper module for parsing the Youtube Music API.
2
3This helpers file is an async wrapper around the excellent ytmusicapi package.
4While the ytmusicapi package does an excellent job at parsing the Youtube Music results,
5it is unfortunately not async, which is required for Music Assistant to run smoothly.
6This also nicely separates the parsing logic from the Youtube Music provider logic.
7"""
8
9import asyncio
10from http.cookies import SimpleCookie
11from time import time
12
13import ytmusicapi
14
15from music_assistant.providers.ytmusic.constants import YTMRecommendationIcons
16
17
async def get_artist(
    prov_artist_id: str, headers: dict[str, str], language: str = "en"
) -> dict[str, str]:
    """Fetch artist details through ytmusicapi without blocking the event loop.

    The blocking lookup runs in a worker thread. When the artist lookup raises
    ``KeyError`` the id is retried as a user channel; when that raises
    ``KeyError`` as well, a placeholder artist named "Unknown" is returned.
    """

    def _lookup_artist():
        client = ytmusicapi.YTMusic(auth=headers, language=language)
        try:
            details = client.get_artist(channelId=prov_artist_id)
            # The response may carry a different channel id and does not include
            # the one we asked for, so pin the requested id onto the result.
            details["channelId"] = prov_artist_id
            return details
        except KeyError:
            try:
                profile = client.get_user(channelId=prov_artist_id)
                return {"channelId": prov_artist_id, "name": profile["name"]}
            except KeyError:
                return {"channelId": prov_artist_id, "name": "Unknown"}

    return await asyncio.to_thread(_lookup_artist)
38
39
async def get_album(prov_album_id: str, language: str = "en") -> dict[str, str]:
    """Async wrapper around the ytmusicapi get_album function.

    The blocking ytmusicapi calls run in a worker thread. When the album exposes
    an ``audioPlaylistId``, that playlist is fetched as well and its per-track
    ``videoId``, ``isAvailable`` and ``likeStatus`` are copied onto the album
    tracks with the same title.

    Args:
        prov_album_id: Browse id of the album.
        language: Language passed to the ytmusicapi client.

    Returns:
        The album dict from ytmusicapi, with track info patched where possible.
    """

    def _get_album():
        ytm = ytmusicapi.YTMusic(language=language)
        album = ytm.get_album(browseId=prov_album_id)
        if "audioPlaylistId" not in album:
            # Nothing to reconcile against: return the album we already have
            # instead of requesting the very same album a second time.
            return album
        # Track id's from album tracks do not match with actual album tracks. E.g. a track
        # points to the videoId of the original version, while we want the album version
        album_playlist = ytm.get_playlist(playlistId=album["audioPlaylistId"], limit=None)
        album_tracks = album.get("tracks", [])
        playlist_tracks = album_playlist.get("tracks", [])
        # Basic sanity check: only trust the playlist when the track counts agree
        if len(playlist_tracks) != len(album_tracks):
            return album
        # Move the correct track info to the album tracks (matched by title)
        playlist_tracks_by_title = {t.get("title"): t for t in playlist_tracks}
        for album_track in album_tracks:
            if playlist_track := playlist_tracks_by_title.get(album_track.get("title")):
                album_track["videoId"] = playlist_track["videoId"]
                album_track["isAvailable"] = playlist_track.get("isAvailable", True)
                album_track["likeStatus"] = playlist_track.get("likeStatus", "INDIFFERENT")
        return album

    return await asyncio.to_thread(_get_album)
64
65
async def get_playlist(
    prov_playlist_id: str,
    headers: dict[str, str],
    language: str = "en",
    user: str | None = None,
    limit=None,
) -> dict[str, str]:
    """Fetch a playlist through ytmusicapi in a worker thread.

    The returned dict gets a ``checksum`` entry (from ``get_playlist_checksum``)
    and is guaranteed to carry a non-empty ``id``.
    """

    def _fetch_playlist():
        client = ytmusicapi.YTMusic(auth=headers, language=language, user=user)
        result = client.get_playlist(playlistId=prov_playlist_id, limit=limit)
        result["checksum"] = get_playlist_checksum(result)
        # In some edge cases the response lacks an id; fall back to the requested one
        if not result.get("id"):
            result["id"] = prov_playlist_id
        return result

    return await asyncio.to_thread(_fetch_playlist)
84
85
async def get_track(
    prov_track_id: str, headers: dict[str, str], language: str = "en"
) -> dict[str, str] | None:
    """Fetch a song through ytmusicapi's get_song and flatten it into a track dict.

    Runs the blocking call in a worker thread. Returns ``None`` when the
    response has no ``videoDetails`` (a video that no longer exists).
    """

    def _fetch_song():
        client = ytmusicapi.YTMusic(auth=headers, language=language)
        song = client.get_song(videoId=prov_track_id)
        if "videoDetails" not in song:
            # video that no longer exists
            return None
        details = song["videoDetails"]
        track = {
            "videoId": details["videoId"],
            "title": details["title"],
            "artists": [{"channelId": details["channelId"], "name": details["author"]}],
            "duration": details["lengthSeconds"],
        }
        microformat = song["microformat"]["microformatDataRenderer"]
        track["thumbnails"] = microformat["thumbnail"]["thumbnails"]
        # Append the thumbnails listed under videoDetails, when there are any
        if extra_thumbs := details.get("thumbnail", {}).get("thumbnails"):
            track["thumbnails"] = track["thumbnails"] + extra_thumbs
        track["isAvailable"] = song["playabilityStatus"]["status"] == "OK"
        return track

    return await asyncio.to_thread(_fetch_song)
116
117
async def get_podcast(
    prov_podcast_id: str, headers: dict[str, str], language: str = "en"
) -> dict[str, str] | None:
    """Fetch a podcast through ytmusicapi's get_podcast in a worker thread.

    The requested id is stored under ``podcastId`` when the response lacks it.
    """

    def _fetch_podcast():
        client = ytmusicapi.YTMusic(auth=headers, language=language)
        podcast = client.get_podcast(playlistId=prov_podcast_id)
        # Only fills the key in when it is absent; an existing value is kept
        podcast.setdefault("podcastId", prov_podcast_id)
        return podcast

    return await asyncio.to_thread(_fetch_podcast)
131
132
async def get_podcast_episode(
    prov_episode_id: str, headers: dict[str, str], language: str = "en"
) -> dict[str, str] | None:
    """Fetch a podcast episode through ytmusicapi's get_episode in a worker thread.

    The requested id is stored under ``videoId`` when the response lacks it.
    """

    def _fetch_episode():
        client = ytmusicapi.YTMusic(auth=headers, language=language)
        details = client.get_episode(videoId=prov_episode_id)
        # Only fills the key in when it is absent; an existing value is kept
        details.setdefault("videoId", prov_episode_id)
        return details

    return await asyncio.to_thread(_fetch_episode)
146
147
async def get_library_artists(
    headers: dict[str, str], language: str = "en", user: str | None = None
) -> dict[str, str]:
    """Fetch the library subscriptions through ytmusicapi in a worker thread.

    Each entry is normalised in place: ``browseId`` is renamed to ``id`` and
    ``artist`` is renamed to ``name``.
    """

    def _fetch_subscriptions():
        client = ytmusicapi.YTMusic(auth=headers, language=language, user=user)
        subscriptions = client.get_library_subscriptions(limit=9999)
        # Rename the keys so every entry looks like a uniform artist object
        for entry in subscriptions:
            entry["id"] = entry.pop("browseId")
            entry["name"] = entry.pop("artist")
        return subscriptions

    return await asyncio.to_thread(_fetch_subscriptions)
165
166
async def get_library_albums(
    headers: dict[str, str], language: str = "en", user: str | None = None
) -> dict[str, str]:
    """Fetch the library albums through ytmusicapi in a worker thread.

    Requests up to 9999 albums and returns the ytmusicapi result unchanged.
    """

    def _fetch_albums():
        client = ytmusicapi.YTMusic(auth=headers, language=language, user=user)
        albums = client.get_library_albums(limit=9999)
        return albums

    return await asyncio.to_thread(_fetch_albums)
177
178
async def get_library_playlists(
    headers: dict[str, str], language: str = "en", user: str | None = None
) -> dict[str, str]:
    """Fetch the library playlists through ytmusicapi in a worker thread.

    Each entry is normalised in place: ``playlistId`` is renamed to ``id`` and a
    ``checksum`` (from ``get_playlist_checksum``) is added.
    """

    def _fetch_playlists():
        client = ytmusicapi.YTMusic(auth=headers, language=language, user=user)
        entries = client.get_library_playlists(limit=9999)
        # Rename the key so every entry looks like a uniform playlist object
        for entry in entries:
            entry["id"] = entry.pop("playlistId")
            entry["checksum"] = get_playlist_checksum(entry)
        return entries

    return await asyncio.to_thread(_fetch_playlists)
195
196
async def get_library_tracks(
    headers: dict[str, str], language: str = "en", user: str | None = None
) -> dict[str, str]:
    """Fetch the library songs through ytmusicapi in a worker thread.

    Requests up to 9999 songs and returns the ytmusicapi result unchanged.
    """

    def _fetch_songs():
        client = ytmusicapi.YTMusic(auth=headers, language=language, user=user)
        songs = client.get_library_songs(limit=9999)
        return songs

    return await asyncio.to_thread(_fetch_songs)
207
208
async def get_library_podcasts(
    headers: dict[str, str], language: str = "en", user: str | None = None
) -> dict[str, str]:
    """Fetch the library podcasts through ytmusicapi in a worker thread.

    Passes ``limit=None`` and returns the ytmusicapi result unchanged.
    """

    def _fetch_podcasts():
        client = ytmusicapi.YTMusic(auth=headers, language=language, user=user)
        podcasts = client.get_library_podcasts(limit=None)
        return podcasts

    return await asyncio.to_thread(_fetch_podcasts)
219
220
async def library_add_remove_artist(
    headers: dict[str, str], prov_artist_id: str, add: bool = True, user: str | None = None
) -> bool:
    """Add or remove an artist to the user's library.

    Subscribes to the artist channel when ``add`` is truthy and unsubscribes
    otherwise. The blocking ytmusicapi call runs in a worker thread.

    Args:
        headers: Auth headers passed to the ytmusicapi client.
        prov_artist_id: Channel id of the artist.
        add: Subscribe when truthy, unsubscribe when falsy.
        user: Optional user passed to the ytmusicapi client.

    Returns:
        True when the ytmusicapi response contains an ``actions`` key.
    """

    def _library_add_remove_artist():
        ytm = ytmusicapi.YTMusic(auth=headers, user=user)
        # A plain if/else covers both cases; the previous trailing
        # ``return None`` could never be reached.
        if add:
            response = ytm.subscribe_artists(channelIds=[prov_artist_id])
        else:
            response = ytm.unsubscribe_artists(channelIds=[prov_artist_id])
        return "actions" in response

    return await asyncio.to_thread(_library_add_remove_artist)
235
236
async def library_add_remove_album(
    headers: dict[str, str], prov_item_id: str, add: bool = True, user: str | None = None
) -> bool:
    """Add or remove an album to the user's library.

    The album is looked up first to obtain its ``audioPlaylistId``; that
    playlist is then rated "LIKE" (add) or "INDIFFERENT" (remove). The blocking
    rating call runs in a worker thread.

    Args:
        headers: Auth headers passed to the ytmusicapi client.
        prov_item_id: Browse id of the album.
        add: Rate "LIKE" when truthy, "INDIFFERENT" when falsy.
        user: Optional user passed to the ytmusicapi client.

    Returns:
        The value returned by ytmusicapi's ``rate_playlist``.
        NOTE(review): this is the raw response rather than the annotated bool
        (the playlist variant returns ``"actions" in response``) - confirm
        what callers expect before aligning the two.

    Raises:
        KeyError: If the album has no ``audioPlaylistId``.
    """
    album = await get_album(prov_album_id=prov_item_id)

    def _library_add_remove_album():
        ytm = ytmusicapi.YTMusic(auth=headers, user=user)
        playlist_id = album["audioPlaylistId"]
        # A single conditional covers both cases; the previous trailing
        # ``return None`` could never be reached.
        rating = "LIKE" if add else "INDIFFERENT"
        return ytm.rate_playlist(playlist_id, rating)

    return await asyncio.to_thread(_library_add_remove_album)
253
254
async def library_add_remove_playlist(
    headers: dict[str, str], prov_item_id: str, add: bool = True, user: str | None = None
) -> bool:
    """Add or remove a playlist to the user's library.

    Rates the playlist "LIKE" when ``add`` is truthy and "INDIFFERENT"
    otherwise. The blocking ytmusicapi call runs in a worker thread.

    Args:
        headers: Auth headers passed to the ytmusicapi client.
        prov_item_id: Id of the playlist.
        add: Rate "LIKE" when truthy, "INDIFFERENT" when falsy.
        user: Optional user passed to the ytmusicapi client.

    Returns:
        True when the ytmusicapi response contains an ``actions`` key.
    """

    def _library_add_remove_playlist():
        ytm = ytmusicapi.YTMusic(auth=headers, user=user)
        # A single conditional covers both cases; the previous trailing
        # ``return None`` could never be reached.
        rating = "LIKE" if add else "INDIFFERENT"
        return "actions" in ytm.rate_playlist(prov_item_id, rating)

    return await asyncio.to_thread(_library_add_remove_playlist)
269
270
async def add_remove_playlist_tracks(
    headers: dict[str, str],
    prov_playlist_id: str,
    prov_track_ids: list[str],
    add: bool,
    user: str | None = None,
) -> bool:
    """Async wrapper around adding/removing tracks to a playlist.

    Calls ytmusicapi's ``add_playlist_items`` when ``add`` is truthy and
    ``remove_playlist_items`` otherwise, in a worker thread.

    Args:
        headers: Auth headers passed to the ytmusicapi client.
        prov_playlist_id: Id of the playlist to modify.
        prov_track_ids: Passed as ``videoIds`` when adding and as ``videos``
            when removing.
            NOTE(review): confirm the item shape ``remove_playlist_items``
            expects for ``videos`` against the ytmusicapi documentation.
        add: Add the tracks when truthy, remove them when falsy.
        user: Optional user passed to the ytmusicapi client.

    Returns:
        The value returned by the ytmusicapi call that was made.
    """

    def _add_playlist_tracks():
        ytm = ytmusicapi.YTMusic(auth=headers, user=user)
        # A plain if/else covers both cases; the previous trailing
        # ``return None`` could never be reached.
        if add:
            return ytm.add_playlist_items(playlistId=prov_playlist_id, videoIds=prov_track_ids)
        return ytm.remove_playlist_items(playlistId=prov_playlist_id, videos=prov_track_ids)

    return await asyncio.to_thread(_add_playlist_tracks)
289
290
async def get_song_radio_tracks(
    headers: dict[str, str], prov_item_id: str, limit=25, user: str | None = None
) -> dict[str, str]:
    """Fetch the radio (watch playlist) for a song through ytmusicapi.

    Runs in a worker thread. Every track in the result is normalised in place:
    ``thumbnail`` is renamed to ``thumbnails`` and a ``duration`` in seconds is
    derived from ``length`` via ``get_sec``.
    """

    def _fetch_radio():
        client = ytmusicapi.YTMusic(auth=headers, user=user)
        radio = client.get_watch_playlist(
            videoId=prov_item_id,
            playlistId=f"RDAMVM{prov_item_id}",
            limit=limit,
            radio=True,
        )
        # Smooth out naming inconsistencies so the tracks are easier to parse
        for entry in radio["tracks"]:
            if entry.get("thumbnail"):
                entry["thumbnails"] = entry.pop("thumbnail")
            if length := entry.get("length"):
                entry["duration"] = get_sec(length)
        return radio

    return await asyncio.to_thread(_fetch_radio)
312
313
async def search(
    query: str, ytm_filter: str | None = None, limit: int = 20, language: str = "en"
) -> list[dict]:
    """Run a ytmusicapi search in a worker thread and normalise the results.

    Artist and playlist results get uniform ``id`` (and, for artists, ``name``)
    keys; the source keys they were taken from are removed. At most ``limit``
    results are returned.
    """

    def _run_search():
        client = ytmusicapi.YTMusic(language=language)
        hits = client.search(query=query, filter=ytm_filter, limit=limit)
        for hit in hits:
            kind = hit["resultType"]
            if kind == "artist":
                if "artists" in hit and len(hit["artists"]) > 0:
                    # Take id and name from the first listed artist
                    lead = hit["artists"][0]
                    hit["id"] = lead["id"]
                    hit["name"] = lead["name"]
                    del hit["artists"]
                else:
                    hit["id"] = hit.pop("browseId")
                    hit["name"] = hit.pop("artist")
            elif kind == "playlist":
                # Prefer playlistId, fall back to browseId
                for source_key in ("playlistId", "browseId"):
                    if source_key in hit:
                        hit["id"] = hit.pop(source_key)
                        break
        return hits[:limit]

    return await asyncio.to_thread(_run_search)
344
345
def get_playlist_checksum(playlist_obj: dict) -> str:
    """Try to calculate a checksum so we can detect changes in a playlist.

    The first of ``duration_seconds``, ``trackCount`` and ``count`` that holds a
    value is used. The value is always returned as a string so the result
    matches the declared return type, whatever type the source value has.

    Args:
        playlist_obj: Playlist dict to derive the checksum from.

    Returns:
        The stringified value of the first usable key, or the current Unix
        timestamp (whole seconds) as a string when none of the keys is usable.
    """
    for key in ("duration_seconds", "trackCount", "count"):
        value = playlist_obj.get(key)
        # A key that is present but None carries no information; try the next one
        if value is not None:
            return str(value)
    return str(int(time()))
352
353
def is_brand_account(username: str) -> bool:
    """Tell whether the username looks like a brand-account id.

    That is the case when it is exactly 21 characters long and consists of
    digits only.
    """
    if len(username) != 21:
        return False
    return username.isdigit()
357
358
def get_sec(time_str):
    """Convert a "H:M:S" or "M:S" string to a number of seconds.

    Any other number of colon-separated fields yields 0.
    """
    fields = time_str.split(":")
    if len(fields) not in (2, 3):
        return 0
    seconds = 0
    # Each field further to the right is worth 60 times less than the previous one
    for field in fields:
        seconds = seconds * 60 + int(field)
    return seconds
367
368
def convert_to_netscape(raw_cookie_str: str, domain: str) -> str:
    """Render a raw cookie string as Netscape cookie-file text, so yt-dl can use it.

    Any "https://" in ``domain`` is removed. The output starts with the
    Netscape header line, followed by one tab-separated line per parsed cookie.
    """
    host = domain.replace("https://", "")
    jar = SimpleCookie()
    jar.load(rawdata=raw_cookie_str)
    rows = ["# Netscape HTTP Cookie File\n"]
    rows.extend(
        f"{host}\tTRUE\t/\tTRUE\t0\t{morsel.key}\t{morsel.value}\n" for morsel in jar.values()
    )
    return "".join(rows)
378
379
async def get_home(
    headers: dict[str, str], language: str = "en", user: str | None = None, limit: int = 3
) -> dict[str, str]:
    """Fetch the home page recommendations through ytmusicapi in a worker thread.

    ``limit`` is forwarded to ytmusicapi's ``get_home``; the result is returned
    unchanged.
    """

    def _fetch_home():
        client = ytmusicapi.YTMusic(auth=headers, language=language, user=user)
        sections = client.get_home(limit=limit)
        return sections

    return await asyncio.to_thread(_fetch_home)
390
391
def determine_recommendation_icon(name: str) -> str:
    """Pick the icon for a recommendation from keywords in its name.

    The name is matched case-insensitively against a fixed keyword list; the
    first keyword found anywhere in the name decides the icon. Names without a
    matching keyword get the default icon.
    """
    lowered = name.lower()
    # Order matters: earlier keywords take precedence over later ones
    keyword_icons = (
        ("listen again", YTMRecommendationIcons.LISTEN_AGAIN),
        ("continue", YTMRecommendationIcons.CONTINUE_WATCHING),
        ("your mix", YTMRecommendationIcons.YOUR_MIX),
        ("new", YTMRecommendationIcons.NEW_RELEASES),
        ("recommended", YTMRecommendationIcons.RECOMMENDED),
    )
    for keyword, icon in keyword_icons:
        if keyword in lowered:
            return icon
    return YTMRecommendationIcons.DEFAULT
407