music-assistant-server

17.2 KBPY
__init__.py
17.2 KB453 lines • python
1"""The Musicbrainz Metadata provider for Music Assistant.
2
At this time only used for retrieval of IDs, but to be expanded to fetch metadata too.
4"""
5
6from __future__ import annotations
7
8import re
9from contextlib import suppress
10from dataclasses import dataclass, field
11from typing import TYPE_CHECKING, Any, cast
12
13from mashumaro import DataClassDictMixin
14from mashumaro.exceptions import MissingField
15from music_assistant_models.enums import ExternalID, ProviderFeature
16from music_assistant_models.errors import InvalidDataError, ResourceTemporarilyUnavailable
17
18from music_assistant.controllers.cache import use_cache
19from music_assistant.helpers.compare import compare_strings
20from music_assistant.helpers.json import json_loads
21from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
22from music_assistant.helpers.util import parse_title_and_version
23from music_assistant.models.metadata_provider import MetadataProvider
24
25if TYPE_CHECKING:
26    from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
27    from music_assistant_models.media_items import Album, Track
28    from music_assistant_models.provider import ProviderManifest
29
30    from music_assistant.mass import MusicAssistant
31    from music_assistant.models import ProviderInstanceType
32
33
# Characters that carry special meaning in a Lucene query string. The single
# capture group lets callers backslash-escape each hit via ``re.sub``.
LUCENE_SPECIAL = r'([+\-&|!(){}\[\]\^"~*?:\\\/])'

# we don't have any special supported features (yet)
SUPPORTED_FEATURES: set[ProviderFeature] = set()
39
40
async def setup(
    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
    """
    Create the provider instance for the given configuration.

    mass: the running MusicAssistant instance.
    manifest: the manifest describing this provider.
    config: the stored configuration of this provider instance.
    """
    provider = MusicbrainzProvider(mass, manifest, config, SUPPORTED_FEATURES)
    return provider
46
47
async def get_config_entries(
    mass: MusicAssistant,
    instance_id: str | None = None,
    action: str | None = None,
    values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
    """
    Return the Config entries needed to set up this provider.

    All arguments are accepted for interface compatibility only; none is used.

    instance_id: id of an existing provider instance (None if new instance setup).
    action: [optional] action key called from config entries UI.
    values: the (intermediate) raw values for config entries sent with the action.
    """
    # ruff: noqa: ARG001
    # this provider has nothing to configure (yet), hence the empty tuple
    no_entries: tuple[ConfigEntry, ...] = ()
    return no_entries
63
64
def replace_hyphens(
    data: dict[str, Any] | list[dict[str, Any]] | Any,
) -> dict[str, Any] | list[dict[str, Any]] | Any:
    """
    Return a copy of ``data`` in which every hyphen in a dict key is an underscore.

    Dicts and lists are walked recursively (values are never altered, only keys);
    any other value is handed back untouched.
    """
    if isinstance(data, list):
        return [replace_hyphens(entry) for entry in data]

    if not isinstance(data, dict):
        # scalars (and any other type) are returned as-is
        return data

    converted: dict[str, Any] = {}
    for raw_key, raw_value in data.items():
        converted[raw_key.replace("-", "_")] = replace_hyphens(raw_value)
    return converted
76
77
@dataclass
class MusicBrainzTag(DataClassDictMixin):
    """Model for a (basic) Tag object as received from the MusicBrainz API.

    Both fields are declared without a default, so both must be present
    in the data this model is built from.
    """

    # NOTE(review): presumably the number of times this tag was applied - confirm
    count: int
    # the tag text itself
    name: str
84
85
@dataclass
class MusicBrainzAlias(DataClassDictMixin):
    """Model for a (basic) Alias object from MusicBrainz.

    Field names use underscores; raw API data is expected to have had its
    hyphenated keys converted first (see ``replace_hyphens``).
    """

    # required fields
    name: str
    sort_name: str

    # optional fields
    locale: str | None = None
    type: str | None = None
    primary: bool | None = None
    begin_date: str | None = None
    end_date: str | None = None
99
100
@dataclass
class MusicBrainzArtist(DataClassDictMixin):
    """Model for a (basic) Artist object from MusicBrainz."""

    # required fields
    id: str
    name: str
    sort_name: str

    # optional fields
    aliases: list[MusicBrainzAlias] | None = None
    tags: list[MusicBrainzTag] | None = None

    @classmethod
    def from_raw(cls, data: Any) -> MusicBrainzArtist:
        """
        Instantiate object from raw api data.

        data: the artist object as returned by the API (hyphenated keys allowed).

        Hyphens in all (nested) keys are replaced by underscores before the
        data is handed to ``from_dict``. Raises mashumaro's ``MissingField``
        when a required field is absent.
        """
        alt_data = replace_hyphens(data)
        if TYPE_CHECKING:
            # narrowing for the type checker only, no runtime effect
            alt_data = cast("dict[str, Any]", alt_data)
        # build through ``cls`` so a subclass gets an instance of its own type
        return cls.from_dict(alt_data)
120
121
@dataclass
class MusicBrainzArtistCredit(DataClassDictMixin):
    """Model for a (basic) ArtistCredit object from MusicBrainz.

    Couples a credit name with the (nested) artist object it refers to.
    """

    # NOTE(review): presumably the artist name as credited, which may differ
    # from ``artist.name`` - confirm
    name: str
    # the credited artist itself
    artist: MusicBrainzArtist
128
129
@dataclass
class MusicBrainzReleaseGroup(DataClassDictMixin):
    """Model for a (basic) ReleaseGroup object from MusicBrainz."""

    # required fields
    id: str
    title: str

    # optional fields
    primary_type: str | None = None
    primary_type_id: str | None = None
    secondary_types: list[str] | None = None
    secondary_type_ids: list[str] | None = None
    artist_credit: list[MusicBrainzArtistCredit] | None = None

    @classmethod
    def from_raw(cls, data: Any) -> MusicBrainzReleaseGroup:
        """
        Instantiate object from raw api data.

        data: the release-group object as returned by the API (hyphenated keys allowed).

        Hyphens in all (nested) keys are replaced by underscores before the
        data is handed to ``from_dict``. Raises mashumaro's ``MissingField``
        when a required field is absent.
        """
        alt_data = replace_hyphens(data)
        if TYPE_CHECKING:
            # narrowing for the type checker only, no runtime effect
            alt_data = cast("dict[str, Any]", alt_data)
        # build through ``cls`` so a subclass gets an instance of its own type
        return cls.from_dict(alt_data)
151
152
@dataclass
class MusicBrainzTrack(DataClassDictMixin):
    """Model for a (basic) Track object from MusicBrainz."""

    # required fields
    id: str
    number: str  # kept as a string, exactly as delivered
    title: str

    # optional fields
    length: int | None = None

    @classmethod
    def from_raw(cls, data: Any) -> MusicBrainzTrack:
        """
        Instantiate object from raw api data.

        data: the track object as returned by the API (hyphenated keys allowed).

        Hyphens in all (nested) keys are replaced by underscores before the
        data is handed to ``from_dict``. Raises mashumaro's ``MissingField``
        when a required field is absent.
        """
        alt_data = replace_hyphens(data)
        if TYPE_CHECKING:
            # narrowing for the type checker only, no runtime effect
            alt_data = cast("dict[str, Any]", alt_data)
        # build through ``cls`` so a subclass gets an instance of its own type
        return cls.from_dict(alt_data)
169
170
@dataclass
class MusicBrainzMedia(DataClassDictMixin):
    """Model for a (basic) Media object from MusicBrainz.

    ``format`` and ``track`` are required; the numeric fields fall back to 0
    when they are not supplied.
    """

    # required fields
    format: str
    track: list[MusicBrainzTrack]
    # optional fields (default to 0)
    position: int = 0
    track_count: int = 0
    track_offset: int = 0
180
181
@dataclass
class MusicBrainzRelease(DataClassDictMixin):
    """Model for a (basic) Release object from MusicBrainz.

    NOTE(review): every field without a default (including ``count`` and
    ``release_group``) must be present in the data or ``from_dict`` fails;
    confirm that the responses this model is built from always carry them.
    """

    # required fields
    id: str
    status_id: str
    count: int
    title: str
    status: str
    artist_credit: list[MusicBrainzArtistCredit]
    release_group: MusicBrainzReleaseGroup
    track_count: int = 0

    # optional fields
    media: list[MusicBrainzMedia] = field(default_factory=list)
    date: str | None = None
    country: str | None = None
    disambiguation: str | None = None  # version
    # TODO (if needed): release-events

    @classmethod
    def from_raw(cls, data: Any) -> MusicBrainzRelease:
        """
        Instantiate object from raw api data.

        data: the release object as returned by the API (hyphenated keys allowed).

        Hyphens in all (nested) keys are replaced by underscores before the
        data is handed to ``from_dict``. Raises mashumaro's ``MissingField``
        when a required field is absent.
        """
        alt_data = replace_hyphens(data)
        if TYPE_CHECKING:
            # narrowing for the type checker only, no runtime effect
            alt_data = cast("dict[str, Any]", alt_data)
        # build through ``cls`` so a subclass gets an instance of its own type
        return cls.from_dict(alt_data)
209
210
@dataclass
class MusicBrainzRecording(DataClassDictMixin):
    """Model for a (basic) Recording object as received from the MusicBrainz API."""

    # required fields
    id: str
    title: str
    # defaults to an empty list when the API data carries no credits
    artist_credit: list[MusicBrainzArtistCredit] = field(default_factory=list)
    # optional fields
    length: int | None = None
    first_release_date: str | None = None
    isrcs: list[str] | None = None
    tags: list[MusicBrainzTag] | None = None
    disambiguation: str | None = None  # version (e.g. live, karaoke etc.)

    @classmethod
    def from_raw(cls, data: Any) -> MusicBrainzRecording:
        """
        Instantiate object from raw api data.

        data: the recording object as returned by the API (hyphenated keys allowed).

        Hyphens in all (nested) keys are replaced by underscores before the
        data is handed to ``from_dict``. Raises mashumaro's ``MissingField``
        when a required field is absent.
        """
        alt_data = replace_hyphens(data)
        if TYPE_CHECKING:
            # narrowing for the type checker only, no runtime effect
            alt_data = cast("dict[str, Any]", alt_data)
        # build through ``cls`` so a subclass gets an instance of its own type
        return cls.from_dict(alt_data)
232
233
class MusicbrainzProvider(MetadataProvider):
    """The Musicbrainz Metadata provider.

    Queries the MusicBrainz web service (via the Music Assistant mirror, see
    ``get_data``) to resolve artists, release groups, releases and recordings.
    """

    # shared by all instances; consumed by the ``throttle_with_retries`` decorator
    throttler = ThrottlerManager(rate_limit=5, period=1)

    async def handle_async_init(self) -> None:
        """Handle async initialization of the provider."""
        self.cache = self.mass.cache

    @staticmethod
    def _escape_lucene(value: str) -> str:
        """Backslash-escape all Lucene special characters in ``value``."""
        return re.sub(LUCENE_SPECIAL, r"\\\1", value)

    @staticmethod
    def _match_search_artist(
        item: dict[str, Any], artistname: str, strict: bool
    ) -> MusicBrainzArtist | None:
        """Return the first credited artist of a search item matching ``artistname``."""
        for credit in item.get("artist-credit") or []:
            artist = credit.get("artist")
            if not artist:
                continue
            # the artist may match on its own name or on any of its aliases
            candidates = [artist.get("name")]
            candidates.extend(alias.get("name") for alias in artist.get("aliases") or [])
            if any(
                name is not None and compare_strings(name, artistname, strict)
                for name in candidates
            ):
                # stop at the first hit: credits are listed primary artist first
                return MusicBrainzArtist.from_raw(artist)
        return None

    @staticmethod
    def _match_search_release_group(
        item: dict[str, Any], albumname: str, strict: bool
    ) -> MusicBrainzReleaseGroup | None:
        """Return the release group of the first release of a search item matching ``albumname``."""
        for release in item.get("releases") or []:
            release_group = release.get("release-group")
            if not release_group:
                # without a release group there is nothing we could return
                continue
            # either the release title or the release-group title may match
            titles = (release.get("title"), release_group.get("title"))
            if any(
                title is not None and compare_strings(title, albumname, strict)
                for title in titles
            ):
                return MusicBrainzReleaseGroup.from_raw(release_group)
        return None

    def _match_recording(
        self,
        item: dict[str, Any],
        artistname: str,
        albumname: str,
        searchtrack: str,
        trackversion: str | None,
        strict: bool,
    ) -> tuple[MusicBrainzArtist, MusicBrainzReleaseGroup, MusicBrainzRecording] | None:
        """Check one recording search item against the wanted track, artist and album."""
        # compare track title
        title = item.get("title")
        if title is None or not compare_strings(title, searchtrack, strict):
            return None
        # compare track version if needed (only when it is not part of the title)
        if (
            trackversion
            and trackversion not in searchtrack
            and not compare_strings(item.get("disambiguation"), trackversion, strict)
        ):
            return None
        # match (primary) track artist
        artist_match = self._match_search_artist(item, artistname, strict)
        if artist_match is None:
            return None
        # match album/release
        album_match = self._match_search_release_group(item, albumname, strict)
        if album_match is None:
            return None
        # if we reach this point, we got a match on recording,
        # artist and release(group)
        return (artist_match, album_match, MusicBrainzRecording.from_raw(item))

    @staticmethod
    def _match_artist_credit(
        artist_credits: list[MusicBrainzArtistCredit], artistname: str
    ) -> MusicBrainzArtist | None:
        """Return the first credited artist whose name or alias matches ``artistname``.

        All credits are tried with strict comparison first; only when none
        matches is the whole list retried with non-strict comparison.
        """
        for strict in (True, False):
            for artist_credit in artist_credits:
                artist = artist_credit.artist
                if compare_strings(artist.name, artistname, strict):
                    return artist
                if any(
                    compare_strings(alias.name, artistname, strict)
                    for alias in artist.aliases or []
                ):
                    return artist
        return None

    async def search(
        self, artistname: str, albumname: str, trackname: str, trackversion: str | None = None
    ) -> tuple[MusicBrainzArtist, MusicBrainzReleaseGroup, MusicBrainzRecording] | None:
        """
        Search MusicBrainz details by providing the artist, album and track name.

        Returns a (artist, release group, recording) tuple for the first
        recording that matches on title, artist and album, or None when nothing
        matches. Incomplete result items are skipped instead of raising.

        NOTE: The MusicBrainz objects returned are simplified objects without the optional data.
        """
        trackname, trackversion = parse_title_and_version(trackname, trackversion)
        searchartist = self._escape_lucene(artistname)
        searchalbum = self._escape_lucene(albumname)
        # the version is sometimes appended to the title and sometimes stored
        # in disambiguation, so we try both
        searchtracks: list[str] = []
        if trackversion:
            searchtracks.append(f"{trackname} ({trackversion})")
        searchtracks.append(trackname)
        # first pass compares strictly, second pass loosens the comparison
        for strict in (True, False):
            for searchtrack in searchtracks:
                searchstr = self._escape_lucene(searchtrack)
                result = await self.get_data(
                    "recording",
                    query=f'"{searchstr}" AND artist:"{searchartist}" AND release:"{searchalbum}"',
                )
                if not result or "recordings" not in result:
                    continue
                for item in result["recordings"] or []:
                    match = self._match_recording(
                        item, artistname, albumname, searchtrack, trackversion, strict
                    )
                    if match is not None:
                        return match

        return None

    async def get_artist_details(self, artist_id: str) -> MusicBrainzArtist:
        """
        Get (full) Artist details by providing a MusicBrainz artist id.

        Raises InvalidDataError when the lookup yields no data or the data
        lacks a required field.
        """
        endpoint = (
            f"artist/{artist_id}?inc=aliases+annotation+tags+ratings+genres+url-rels+work-rels"
        )
        result = await self.get_data(endpoint)
        if not result:
            msg = "Invalid MusicBrainz Artist ID provided"
            raise InvalidDataError(msg)
        # make sure the id is always available to the model
        result.setdefault("id", artist_id)
        # TODO: Parse all the optional data like relations and such
        try:
            return MusicBrainzArtist.from_raw(result)
        except MissingField as err:
            raise InvalidDataError from err

    async def get_recording_details(self, recording_id: str) -> MusicBrainzRecording:
        """
        Get Recording details by providing a MusicBrainz Recording Id.

        Raises InvalidDataError when the lookup yields no data or the data
        lacks a required field.
        """
        result = await self.get_data(f"recording/{recording_id}?inc=artists+releases")
        if not result:
            msg = "Invalid MusicBrainz recording ID provided"
            raise InvalidDataError(msg)
        # make sure the id is always available to the model
        result.setdefault("id", recording_id)
        try:
            return MusicBrainzRecording.from_raw(result)
        except MissingField as err:
            raise InvalidDataError from err

    async def get_release_details(self, album_id: str) -> MusicBrainzRelease:
        """
        Get Release/Album details by providing a MusicBrainz Album id.

        Raises InvalidDataError when the lookup yields no data or the data
        lacks a required field.
        """
        endpoint = f"release/{album_id}?inc=artist-credits+aliases+labels"
        result = await self.get_data(endpoint)
        if not result:
            msg = "Invalid MusicBrainz Album ID provided"
            raise InvalidDataError(msg)
        # make sure the id is always available to the model
        result.setdefault("id", album_id)
        try:
            return MusicBrainzRelease.from_raw(result)
        except MissingField as err:
            raise InvalidDataError from err

    async def get_releasegroup_details(self, releasegroup_id: str) -> MusicBrainzReleaseGroup:
        """
        Get ReleaseGroup details by providing a MusicBrainz ReleaseGroup id.

        Raises InvalidDataError when the lookup yields no data or the data
        lacks a required field.
        """
        endpoint = f"release-group/{releasegroup_id}?inc=artists+aliases"
        result = await self.get_data(endpoint)
        if not result:
            msg = "Invalid MusicBrainz ReleaseGroup ID provided"
            raise InvalidDataError(msg)
        # make sure the id is always available to the model
        result.setdefault("id", releasegroup_id)
        try:
            return MusicBrainzReleaseGroup.from_raw(result)
        except MissingField as err:
            raise InvalidDataError from err

    async def get_artist_details_by_album(
        self, artistname: str, ref_album: Album
    ) -> MusicBrainzArtist | None:
        """
        Get musicbrainz artist details by providing the artist name and a reference album.

        The release-group id of the album is preferred; the release (album) id
        is only consulted when the album has no release-group id. Returns None
        when the album carries neither id, the lookup fails or no credited
        artist matches.

        MusicBrainzArtist object that is returned does not contain the optional data.
        """
        result: MusicBrainzRelease | MusicBrainzReleaseGroup | None = None
        if mb_id := ref_album.get_external_id(ExternalID.MB_RELEASEGROUP):
            with suppress(InvalidDataError):
                result = await self.get_releasegroup_details(mb_id)
        elif mb_id := ref_album.get_external_id(ExternalID.MB_ALBUM):
            with suppress(InvalidDataError):
                result = await self.get_release_details(mb_id)
        else:
            return None
        if not (result and result.artist_credit):
            return None
        return self._match_artist_credit(result.artist_credit, artistname)

    async def get_artist_details_by_track(
        self, artistname: str, ref_track: Track
    ) -> MusicBrainzArtist | None:
        """
        Get musicbrainz artist details by providing the artist name and a reference track.

        Returns None when the track has no MusicBrainz id, the lookup fails or
        no credited artist matches.

        MusicBrainzArtist object that is returned does not contain the optional data.
        """
        if not ref_track.mbid:
            return None
        result: MusicBrainzRecording | None = None
        with suppress(InvalidDataError):
            result = await self.get_recording_details(ref_track.mbid)
        if not (result and result.artist_credit):
            return None
        return self._match_artist_credit(result.artist_credit, artistname)

    async def get_artist_details_by_resource_url(
        self, resource_url: str
    ) -> MusicBrainzArtist | None:
        """
        Get musicbrainz artist details by providing a resource URL (e.g. Spotify share URL).

        Returns the artist of the first relation that carries one, or None
        when the URL is unknown or has no artist relation.

        MusicBrainzArtist object that is returned does not contain the optional data.
        """
        result = await self.get_data("url", resource=resource_url, inc="artist-rels")
        if not result:
            return None
        for relation in result.get("relations") or []:
            if artist := relation.get("artist"):
                return MusicBrainzArtist.from_raw(artist)
        return None

    @use_cache(86400 * 30)  # Cache for 30 days
    @throttle_with_retries
    async def get_data(self, endpoint: str, **kwargs: str) -> Any:
        """
        Get data from api.

        endpoint: path below ``/ws/2/`` (may already contain a query string).
        kwargs: extra query parameters; ``fmt=json`` is always added.

        Returns the decoded JSON body, or None for a 400, 401 or 404 response.
        Raises ResourceTemporarilyUnavailable for 429, 502 and 503 responses;
        any other error status is raised through ``raise_for_status``.
        """
        url = f"https://musicbrainz-mirror.music-assistant.io/ws/2/{endpoint}"
        headers = {
            "User-Agent": f"Music Assistant/{self.mass.version} (https://music-assistant.io)"
        }
        kwargs["fmt"] = "json"
        async with self.mass.http_session.get(url, headers=headers, params=kwargs) as response:
            # handle rate limiter
            if response.status == 429:
                # Retry-After may be an HTTP-date instead of a number of seconds;
                # fall back to no explicit backoff rather than crashing on int()
                try:
                    backoff_time = int(response.headers.get("Retry-After", 0))
                except (TypeError, ValueError):
                    backoff_time = 0
                raise ResourceTemporarilyUnavailable("Rate Limiter", backoff_time=backoff_time)
            # handle temporary server error
            if response.status in (502, 503):
                raise ResourceTemporarilyUnavailable(backoff_time=30)
            # handle 404 not found
            if response.status in (400, 401, 404):
                return None
            response.raise_for_status()
            return await response.json(loads=json_loads)
453