music-assistant-server

23.6 KBPY
artists.py
23.6 KB535 lines • python
1"""Manage MediaItems of type Artist."""
2
3from __future__ import annotations
4
5import asyncio
6import contextlib
7from typing import TYPE_CHECKING, Any, cast
8
9from music_assistant_models.enums import AlbumType, MediaType, ProviderFeature
10from music_assistant_models.errors import (
11    MediaNotFoundError,
12    MusicAssistantError,
13    ProviderUnavailableError,
14)
15from music_assistant_models.media_items import Album, Artist, ItemMapping, ProviderMapping, Track
16
17from music_assistant.constants import (
18    DB_TABLE_ALBUM_ARTISTS,
19    DB_TABLE_ARTISTS,
20    DB_TABLE_TRACK_ARTISTS,
21    VARIOUS_ARTISTS_MBID,
22    VARIOUS_ARTISTS_NAME,
23)
24from music_assistant.controllers.media.base import MediaControllerBase
25from music_assistant.helpers.compare import compare_artist, compare_strings, create_safe_string
26from music_assistant.helpers.database import UNSET
27from music_assistant.helpers.json import serialize_to_json
28
29if TYPE_CHECKING:
30    from music_assistant import MusicAssistant
31    from music_assistant.models.music_provider import MusicProvider
32
33
34class ArtistsController(MediaControllerBase[Artist]):
35    """Controller managing MediaItems of type Artist."""
36
37    db_table = DB_TABLE_ARTISTS
38    media_type = MediaType.ARTIST
39    item_cls = Artist
40
41    def __init__(self, mass: MusicAssistant) -> None:
42        """Initialize class."""
43        super().__init__(mass)
44        self._db_add_lock = asyncio.Lock()
45        # register (extra) api handlers
46        api_base = self.api_base
47        self.mass.register_api_command(f"music/{api_base}/artist_albums", self.albums)
48        self.mass.register_api_command(f"music/{api_base}/artist_tracks", self.tracks)
49
50    async def library_count(
51        self, favorite_only: bool = False, album_artists_only: bool = False
52    ) -> int:
53        """Return the total number of items in the library."""
54        sql_query = f"SELECT item_id FROM {self.db_table}"
55        query_parts: list[str] = []
56        if favorite_only:
57            query_parts.append("favorite = 1")
58        if album_artists_only:
59            query_parts.append(
60                f"item_id in (select {DB_TABLE_ALBUM_ARTISTS}.artist_id "
61                f"FROM {DB_TABLE_ALBUM_ARTISTS})"
62            )
63        if query_parts:
64            sql_query += f" WHERE {' AND '.join(query_parts)}"
65        return await self.mass.music.database.get_count_from_query(sql_query)
66
67    async def library_items(
68        self,
69        favorite: bool | None = None,
70        search: str | None = None,
71        limit: int = 500,
72        offset: int = 0,
73        order_by: str = "sort_name",
74        provider: str | list[str] | None = None,
75        album_artists_only: bool = False,
76        **kwargs: Any,
77    ) -> list[Artist]:
78        """Get in-database (album) artists.
79
80        :param favorite: Filter by favorite status.
81        :param search: Filter by search query.
82        :param limit: Maximum number of items to return.
83        :param offset: Number of items to skip.
84        :param order_by: Order by field (e.g. 'sort_name', 'timestamp_added').
85        :param provider: Filter by provider instance ID (single string or list).
86        :param album_artists_only: Only return artists that have albums.
87        """
88        extra_query_params: dict[str, Any] = {}
89        extra_query_parts: list[str] = []
90        if album_artists_only:
91            extra_query_parts.append(
92                f"artists.item_id in (select {DB_TABLE_ALBUM_ARTISTS}.artist_id "
93                f"from {DB_TABLE_ALBUM_ARTISTS})"
94            )
95        return await self.get_library_items_by_query(
96            favorite=favorite,
97            search=search,
98            limit=limit,
99            offset=offset,
100            order_by=order_by,
101            provider_filter=self._ensure_provider_filter(provider),
102            extra_query_parts=extra_query_parts,
103            extra_query_params=extra_query_params,
104            in_library_only=True,
105        )
106
107    async def tracks(
108        self,
109        item_id: str,
110        provider_instance_id_or_domain: str,
111        in_library_only: bool = False,
112        provider_filter: str | list[str] | None = None,
113    ) -> list[Track]:
114        """Return all/top tracks for an artist."""
115        if provider_filter and provider_instance_id_or_domain != "library":
116            raise MusicAssistantError("Cannot use provider_filter with specific provider request")
117        if isinstance(provider_filter, str):
118            provider_filter = [provider_filter]
119        # always check if we have a library item for this artist
120        library_artist = await self.get_library_item_by_prov_id(
121            item_id, provider_instance_id_or_domain
122        )
123        if not library_artist:
124            return await self.get_provider_artist_toptracks(item_id, provider_instance_id_or_domain)
125        db_items = await self.get_library_artist_tracks(library_artist.item_id)
126        result: list[Track] = db_items
127        if in_library_only and not provider_filter:
128            # return in-library items only
129            return result
130        # return all (unique) items from all providers
131        # initialize unique_ids with db_items to prevent duplicates
132        unique_ids: set[str] = {f"{item.name}.{item.version}" for item in db_items}
133        unique_providers = self.mass.music.get_unique_providers()
134        for provider_mapping in library_artist.provider_mappings:
135            if provider_mapping.provider_instance not in unique_providers:
136                continue
137            if provider_filter and provider_mapping.provider_instance not in provider_filter:
138                continue
139            provider_tracks = await self.get_provider_artist_toptracks(
140                provider_mapping.item_id, provider_mapping.provider_instance
141            )
142            for provider_track in provider_tracks:
143                unique_id = f"{provider_track.name}.{provider_track.version}"
144                if unique_id in unique_ids:
145                    continue
146                unique_ids.add(unique_id)
147                # prefer db item
148                if db_item := await self.mass.music.tracks.get_library_item_by_prov_id(
149                    provider_track.item_id, provider_track.provider
150                ):
151                    result.append(db_item)
152                elif not in_library_only:
153                    result.append(provider_track)
154        return result
155
156    async def albums(
157        self,
158        item_id: str,
159        provider_instance_id_or_domain: str,
160        in_library_only: bool = False,
161    ) -> list[Album]:
162        """Return (all/most popular) albums for an artist."""
163        # always check if we have a library item for this artist
164        library_artist = await self.get_library_item_by_prov_id(
165            item_id, provider_instance_id_or_domain
166        )
167        if not library_artist:
168            return await self.get_provider_artist_albums(item_id, provider_instance_id_or_domain)
169        db_items = await self.get_library_artist_albums(library_artist.item_id)
170        result: list[Album] = db_items
171        if in_library_only:
172            # return in-library items only
173            return result
174        # return all (unique) items from all providers
175        # initialize unique_ids with db_items to prevent duplicates
176        unique_ids: set[str] = {f"{item.name}.{item.version}" for item in db_items}
177        unique_providers = self.mass.music.get_unique_providers()
178        for provider_mapping in library_artist.provider_mappings:
179            if provider_mapping.provider_instance not in unique_providers:
180                continue
181            provider_albums = await self.get_provider_artist_albums(
182                provider_mapping.item_id, provider_mapping.provider_instance
183            )
184            for provider_album in provider_albums:
185                unique_id = f"{provider_album.name}.{provider_album.version}"
186                if unique_id in unique_ids:
187                    continue
188                unique_ids.add(unique_id)
189                # prefer db item
190                if db_item := await self.mass.music.albums.get_library_item_by_prov_id(
191                    provider_album.item_id, provider_album.provider
192                ):
193                    result.append(db_item)
194                elif not in_library_only:
195                    result.append(provider_album)
196        return result
197
198    async def remove_item_from_library(self, item_id: str | int, recursive: bool = True) -> None:
199        """Delete record from the database."""
200        db_id = int(item_id)  # ensure integer
201
202        # recursively also remove artist albums
203        for db_row in await self.mass.music.database.get_rows_from_query(
204            f"SELECT album_id FROM {DB_TABLE_ALBUM_ARTISTS} WHERE artist_id = :artist_id",
205            {"artist_id": db_id},
206            limit=5000,
207        ):
208            if not recursive:
209                raise MusicAssistantError("Artist still has albums linked")
210            with contextlib.suppress(MediaNotFoundError):
211                await self.mass.music.albums.remove_item_from_library(db_row["album_id"])
212        # recursively also remove artist tracks
213        for db_row in await self.mass.music.database.get_rows_from_query(
214            f"SELECT track_id FROM {DB_TABLE_TRACK_ARTISTS} WHERE artist_id = :artist_id",
215            {"artist_id": db_id},
216            limit=5000,
217        ):
218            if not recursive:
219                raise MusicAssistantError("Artist still has tracks linked")
220            with contextlib.suppress(MediaNotFoundError):
221                await self.mass.music.tracks.remove_item_from_library(db_row["track_id"])
222
223        # delete the artist itself from db
224        # this will raise if the item still has references and recursive is false
225        await super().remove_item_from_library(db_id)
226
227    async def get_provider_artist_toptracks(
228        self,
229        item_id: str,
230        provider_instance_id_or_domain: str,
231    ) -> list[Track]:
232        """Return top tracks for an artist on given provider."""
233        assert provider_instance_id_or_domain != "library"
234        if not (prov := self.mass.get_provider(provider_instance_id_or_domain)):
235            return []
236        prov = cast("MusicProvider", prov)
237        if ProviderFeature.ARTIST_TOPTRACKS in prov.supported_features:
238            return await prov.get_artist_toptracks(item_id)
239        # fallback implementation using the library db
240        if db_artist := await self.mass.music.artists.get_library_item_by_prov_id(
241            item_id,
242            provider_instance_id_or_domain,
243        ):
244            db_artist_id = int(db_artist.item_id)  # ensure integer
245            subquery = f"SELECT track_id FROM {DB_TABLE_TRACK_ARTISTS} WHERE artist_id = :artist_id"
246            query = f"tracks.item_id in ({subquery})"
247            return await self.mass.music.tracks.get_library_items_by_query(
248                extra_query_parts=[query],
249                extra_query_params={"artist_id": db_artist_id},
250                provider_filter=[provider_instance_id_or_domain],
251            )
252        return []
253
254    async def get_library_artist_tracks(
255        self,
256        item_id: str | int,
257    ) -> list[Track]:
258        """Return all tracks for an artist in the library/db."""
259        db_id = int(item_id)  # ensure integer
260        subquery = f"SELECT track_id FROM {DB_TABLE_TRACK_ARTISTS} WHERE artist_id = :artist_id"
261        query = f"tracks.item_id in ({subquery})"
262        return await self.mass.music.tracks.get_library_items_by_query(
263            extra_query_parts=[query],
264            extra_query_params={"artist_id": db_id},
265        )
266
267    async def get_provider_artist_albums(
268        self,
269        item_id: str,
270        provider_instance_id_or_domain: str,
271    ) -> list[Album]:
272        """Return albums for an artist on given provider."""
273        assert provider_instance_id_or_domain != "library"
274        if not (prov := self.mass.get_provider(provider_instance_id_or_domain)):
275            return []
276        prov = cast("MusicProvider", prov)
277        if ProviderFeature.ARTIST_ALBUMS in prov.supported_features:
278            return await prov.get_artist_albums(item_id)
279        # fallback implementation using the db
280        if db_artist := await self.mass.music.artists.get_library_item_by_prov_id(
281            item_id,
282            provider_instance_id_or_domain,
283        ):
284            db_artist_id = int(db_artist.item_id)  # ensure integer
285            subquery = f"SELECT album_id FROM {DB_TABLE_ALBUM_ARTISTS} WHERE artist_id = :artist_id"
286            query = f"albums.item_id in ({subquery})"
287            return await self.mass.music.albums.get_library_items_by_query(
288                extra_query_parts=[query],
289                extra_query_params={"artist_id": db_artist_id},
290                provider_filter=[provider_instance_id_or_domain],
291            )
292        return []
293
294    async def get_library_artist_albums(
295        self,
296        item_id: str | int,
297    ) -> list[Album]:
298        """Return all in-library albums for an artist."""
299        db_id = int(item_id)  # ensure integer
300        subquery = f"SELECT album_id FROM {DB_TABLE_ALBUM_ARTISTS} WHERE artist_id = :artist_id"
301        query = f"albums.item_id in ({subquery})"
302        return await self.mass.music.albums.get_library_items_by_query(
303            extra_query_parts=[query],
304            extra_query_params={"artist_id": db_id},
305        )
306
307    async def _add_library_item(
308        self, item: Artist | ItemMapping, overwrite_existing: bool = False
309    ) -> int:
310        """Add a new item record to the database."""
311        # If item is an ItemMapping, convert it
312        if isinstance(item, ItemMapping):
313            item = self.artist_from_item_mapping(item)
314        # enforce various artists name + id
315        if compare_strings(item.name, VARIOUS_ARTISTS_NAME):
316            item.mbid = VARIOUS_ARTISTS_MBID
317        if item.mbid == VARIOUS_ARTISTS_MBID:
318            item.name = VARIOUS_ARTISTS_NAME
319        # no existing item matched: insert item
320        db_id = await self.mass.music.database.insert(
321            self.db_table,
322            {
323                "name": item.name,
324                "sort_name": item.sort_name,
325                "favorite": item.favorite,
326                "external_ids": serialize_to_json(item.external_ids),
327                "metadata": serialize_to_json(item.metadata),
328                "search_name": create_safe_string(item.name, True, True),
329                "search_sort_name": create_safe_string(item.sort_name or "", True, True),
330                "timestamp_added": int(item.date_added.timestamp()) if item.date_added else UNSET,
331            },
332        )
333        # update/set provider_mappings table
334        await self.set_provider_mappings(db_id, item.provider_mappings)
335        self.logger.debug("added %s to database (id: %s)", item.name, db_id)
336        return db_id
337
338    async def _update_library_item(
339        self, item_id: str | int, update: Artist | ItemMapping, overwrite: bool = False
340    ) -> None:
341        """Update existing record in the database."""
342        db_id = int(item_id)  # ensure integer
343        cur_item = await self.get_library_item(db_id)
344        if isinstance(update, ItemMapping):
345            # NOTE that artist is the only mediatype where its accepted we
346            # receive an itemmapping from streaming providers
347            update = self.artist_from_item_mapping(update)
348            metadata = cur_item.metadata
349        else:
350            metadata = update.metadata if overwrite else cur_item.metadata.update(update.metadata)
351        cur_item.external_ids.update(update.external_ids)
352        # enforce various artists name + id
353        mbid = cur_item.mbid
354        if (not mbid or overwrite) and getattr(update, "mbid", None):
355            if compare_strings(update.name, VARIOUS_ARTISTS_NAME):
356                update.mbid = VARIOUS_ARTISTS_MBID
357            if update.mbid == VARIOUS_ARTISTS_MBID:
358                update.name = VARIOUS_ARTISTS_NAME
359
360        name = update.name if overwrite else cur_item.name
361        sort_name = update.sort_name if overwrite else cur_item.sort_name or update.sort_name
362        await self.mass.music.database.update(
363            self.db_table,
364            {"item_id": db_id},
365            {
366                "name": name,
367                "sort_name": sort_name,
368                "external_ids": serialize_to_json(
369                    update.external_ids if overwrite else cur_item.external_ids
370                ),
371                "metadata": serialize_to_json(metadata),
372                "search_name": create_safe_string(name, True, True),
373                "search_sort_name": create_safe_string(sort_name or "", True, True),
374                "timestamp_added": int(update.date_added.timestamp())
375                if update.date_added
376                else UNSET,
377            },
378        )
379        self.logger.debug("updated %s in database: %s", update.name, db_id)
380        # update/set provider_mappings table
381        provider_mappings = (
382            update.provider_mappings
383            if overwrite
384            else {*update.provider_mappings, *cur_item.provider_mappings}
385        )
386        await self.set_provider_mappings(db_id, provider_mappings, overwrite)
387        self.logger.debug("updated %s in database: (id %s)", update.name, db_id)
388
389    async def radio_mode_base_tracks(
390        self,
391        item: Artist,
392        preferred_provider_instances: list[str] | None = None,
393    ) -> list[Track]:
394        """
395        Get the list of base tracks from the controller used to calculate the dynamic radio.
396
397        :param item: The Artist to get base tracks for.
398        :param preferred_provider_instances: List of preferred provider instance IDs to use.
399        """
400        return await self.tracks(
401            item.item_id,
402            item.provider,
403            in_library_only=False,
404        )
405
406    async def match_provider(
407        self, db_artist: Artist, provider: MusicProvider, strict: bool = True
408    ) -> list[ProviderMapping]:
409        """
410        Try to find match on (streaming) provider for the provided (database) artist.
411
412        This is used to link objects of different providers/qualities together.
413        """
414        self.logger.debug("Trying to match artist %s on provider %s", db_artist.name, provider.name)
415        matches: list[ProviderMapping] = []
416        # try to get a match with some reference tracks of this artist
417        ref_tracks = await self.mass.music.artists.tracks(db_artist.item_id, db_artist.provider)
418        if len(ref_tracks) < 10:
419            # fetch reference tracks from provider(s) attached to the artist
420            for provider_mapping in db_artist.provider_mappings:
421                with contextlib.suppress(ProviderUnavailableError, MediaNotFoundError):
422                    ref_tracks += await self.mass.music.artists.tracks(
423                        provider_mapping.item_id, provider_mapping.provider_instance
424                    )
425        for ref_track in ref_tracks:
426            search_str = f"{db_artist.name} - {ref_track.name}"
427            search_results = await self.mass.music.tracks.search(search_str, provider.domain)
428            for search_result_item in search_results:
429                if not compare_strings(search_result_item.name, ref_track.name, strict=strict):
430                    continue
431                # get matching artist from track
432                for search_item_artist in search_result_item.artists:
433                    if not compare_strings(search_item_artist.name, db_artist.name, strict=strict):
434                        continue
435                    # 100% track match
436                    # get full artist details so we have all metadata
437                    prov_artist = await self.get_provider_item(
438                        search_item_artist.item_id,
439                        search_item_artist.provider,
440                        fallback=search_item_artist,
441                    )
442                    # 100% match
443                    matches.extend(prov_artist.provider_mappings)
444                    if matches:
445                        return matches
446        # try to get a match with some reference albums of this artist
447        ref_albums = await self.mass.music.artists.albums(db_artist.item_id, db_artist.provider)
448        if len(ref_albums) < 10:
449            # fetch reference albums from provider(s) attached to the artist
450            for provider_mapping in db_artist.provider_mappings:
451                with contextlib.suppress(ProviderUnavailableError, MediaNotFoundError):
452                    ref_albums += await self.mass.music.artists.albums(
453                        provider_mapping.item_id, provider_mapping.provider_instance
454                    )
455        for ref_album in ref_albums:
456            if ref_album.album_type == AlbumType.COMPILATION:
457                continue
458            if not ref_album.artists:
459                continue
460            search_str = f"{db_artist.name} - {ref_album.name}"
461            search_result_albums = await self.mass.music.albums.search(search_str, provider.domain)
462            for search_result_album in search_result_albums:
463                if not search_result_album.artists:
464                    continue
465                if not compare_strings(search_result_album.name, ref_album.name, strict=strict):
466                    continue
467                # artist must match 100%
468                if not compare_artist(db_artist, search_result_album.artists[0], strict=strict):
469                    continue
470                # 100% match
471                # get full artist details so we have all metadata
472                prov_artist = await self.get_provider_item(
473                    search_result_album.artists[0].item_id,
474                    search_result_album.artists[0].provider,
475                    fallback=search_result_album.artists[0],
476                )
477                matches.extend(prov_artist.provider_mappings)
478                if matches:
479                    return matches
480        if not matches:
481            self.logger.debug(
482                "Could not find match for Artist %s on provider %s",
483                db_artist.name,
484                provider.name,
485            )
486        return matches
487
488    async def match_providers(self, db_artist: Artist) -> None:
489        """Try to find matching artists on all providers for the provided (database) item_id.
490
491        This is used to link objects of different providers together.
492        """
493        if db_artist.provider != "library":
494            return  # Matching only supported for database items
495
496        # try to find match on all providers
497
498        cur_provider_domains = {
499            x.provider_domain for x in db_artist.provider_mappings if x.available
500        }
501        for provider in self.mass.music.providers:
502            if provider.domain in cur_provider_domains:
503                continue
504            if ProviderFeature.SEARCH not in provider.supported_features:
505                continue
506            if not provider.library_supported(MediaType.ARTIST):
507                continue
508            if not provider.is_streaming_provider:
509                # matching on unique providers is pointless as they push (all) their content to MA
510                continue
511            if match := await self.match_provider(db_artist, provider):
512                # 100% match, we update the db with the additional provider mapping(s)
513                await self.add_provider_mappings(db_artist.item_id, match)
514                cur_provider_domains.add(provider.domain)
515
516    def artist_from_item_mapping(self, item: ItemMapping) -> Artist:
517        """Create an Artist object from an ItemMapping object."""
518        domain, instance_id = None, None
519        if prov := self.mass.get_provider(item.provider):
520            domain = prov.domain
521            instance_id = prov.instance_id
522        return Artist.from_dict(
523            {
524                **item.to_dict(),
525                "provider_mappings": [
526                    {
527                        "item_id": item.item_id,
528                        "provider_domain": domain,
529                        "provider_instance": instance_id,
530                        "available": item.available,
531                    }
532                ],
533            }
534        )
535