/
/
/
1"""Manage MediaItems of type Artist."""
2
3from __future__ import annotations
4
5import asyncio
6import contextlib
7from typing import TYPE_CHECKING, Any, cast
8
9from music_assistant_models.enums import AlbumType, MediaType, ProviderFeature
10from music_assistant_models.errors import (
11 MediaNotFoundError,
12 MusicAssistantError,
13 ProviderUnavailableError,
14)
15from music_assistant_models.media_items import Album, Artist, ItemMapping, ProviderMapping, Track
16
17from music_assistant.constants import (
18 DB_TABLE_ALBUM_ARTISTS,
19 DB_TABLE_ARTISTS,
20 DB_TABLE_TRACK_ARTISTS,
21 VARIOUS_ARTISTS_MBID,
22 VARIOUS_ARTISTS_NAME,
23)
24from music_assistant.controllers.media.base import MediaControllerBase
25from music_assistant.helpers.compare import compare_artist, compare_strings, create_safe_string
26from music_assistant.helpers.database import UNSET
27from music_assistant.helpers.json import serialize_to_json
28
29if TYPE_CHECKING:
30 from music_assistant import MusicAssistant
31 from music_assistant.models.music_provider import MusicProvider
32
33
class ArtistsController(MediaControllerBase[Artist]):
    """Controller managing MediaItems of type Artist.

    Specializes the generic media controller base for artists by binding the
    artist database table, media type and item class.
    """

    # database table holding the artist records
    db_table = DB_TABLE_ARTISTS
    # media type this controller is responsible for
    media_type = MediaType.ARTIST
    # model class of the items this controller works with
    item_cls = Artist
40
def __init__(self, mass: MusicAssistant) -> None:
    """Set up the controller and expose the artist-specific API commands.

    :param mass: The MusicAssistant instance, handed on to the base controller.
    """
    super().__init__(mass)
    self._db_add_lock = asyncio.Lock()
    # expose the additional (artist-only) api handlers
    prefix = self.api_base
    register = self.mass.register_api_command
    register(f"music/{prefix}/artist_albums", self.albums)
    register(f"music/{prefix}/artist_tracks", self.tracks)
49
async def library_count(
    self, favorite_only: bool = False, album_artists_only: bool = False
) -> int:
    """Count the artists stored in the library.

    :param favorite_only: Only count artists flagged as favorite.
    :param album_artists_only: Only count artists referenced in the album-artists table.
    """
    conditions: list[str] = []
    if favorite_only:
        conditions.append("favorite = 1")
    if album_artists_only:
        conditions.append(
            f"item_id in (select {DB_TABLE_ALBUM_ARTISTS}.artist_id "
            f"FROM {DB_TABLE_ALBUM_ARTISTS})"
        )
    statement = f"SELECT item_id FROM {self.db_table}"
    if conditions:
        # all active filters must hold at the same time
        statement += f" WHERE {' AND '.join(conditions)}"
    return await self.mass.music.database.get_count_from_query(statement)
66
async def library_items(
    self,
    favorite: bool | None = None,
    search: str | None = None,
    limit: int = 500,
    offset: int = 0,
    order_by: str = "sort_name",
    provider: str | list[str] | None = None,
    album_artists_only: bool = False,
    **kwargs: Any,
) -> list[Artist]:
    """Return the (album) artists that are stored in the library database.

    :param favorite: Filter by favorite status.
    :param search: Filter by search query.
    :param limit: Maximum number of items to return.
    :param offset: Number of items to skip.
    :param order_by: Order by field (e.g. 'sort_name', 'timestamp_added').
    :param provider: Filter by provider instance ID (single string or list).
    :param album_artists_only: Only return artists that have albums.
    :param kwargs: Accepted for call compatibility; not used by this method.
    """
    # optional restriction to artists referenced in the album-artists table
    query_parts: list[str] = (
        [
            f"artists.item_id in (select {DB_TABLE_ALBUM_ARTISTS}.artist_id "
            f"from {DB_TABLE_ALBUM_ARTISTS})"
        ]
        if album_artists_only
        else []
    )
    query_params: dict[str, Any] = {}
    return await self.get_library_items_by_query(
        favorite=favorite,
        search=search,
        limit=limit,
        offset=offset,
        order_by=order_by,
        provider_filter=self._ensure_provider_filter(provider),
        extra_query_parts=query_parts,
        extra_query_params=query_params,
        in_library_only=True,
    )
106
async def tracks(
    self,
    item_id: str,
    provider_instance_id_or_domain: str,
    in_library_only: bool = False,
    provider_filter: str | list[str] | None = None,
) -> list[Track]:
    """Return all/top tracks for an artist.

    When the artist is not known in the library, the top tracks of the given
    provider are returned as-is. Otherwise the library tracks are returned,
    extended with the top tracks of the artist's mapped providers, where
    tracks sharing the same name + version are listed only once and a
    library version of a track is preferred over the provider version.

    :param item_id: Id of the artist on the given provider (or in the library).
    :param provider_instance_id_or_domain: Provider instance id/domain or 'library'.
    :param in_library_only: Only return tracks that exist in the library.
    :param provider_filter: Provider instance id(s) to restrict the lookup to;
        only allowed when the request targets 'library'.
    :raises MusicAssistantError: If provider_filter is combined with a
        non-library request.
    """
    if provider_filter and provider_instance_id_or_domain != "library":
        raise MusicAssistantError("Cannot use provider_filter with specific provider request")
    allowed_instances = [provider_filter] if isinstance(provider_filter, str) else provider_filter
    # the library artist (if any) is always the starting point
    library_artist = await self.get_library_item_by_prov_id(
        item_id, provider_instance_id_or_domain
    )
    if not library_artist:
        return await self.get_provider_artist_toptracks(item_id, provider_instance_id_or_domain)
    collected: list[Track] = await self.get_library_artist_tracks(library_artist.item_id)
    if in_library_only and not allowed_instances:
        # nothing but the in-library items was asked for
        return collected
    # seed the de-duplication keys with the library tracks
    seen_keys: set[str] = {f"{track.name}.{track.version}" for track in collected}
    unique_instances = self.mass.music.get_unique_providers()
    for mapping in library_artist.provider_mappings:
        instance = mapping.provider_instance
        if instance not in unique_instances:
            continue
        if allowed_instances and instance not in allowed_instances:
            continue
        for candidate in await self.get_provider_artist_toptracks(mapping.item_id, instance):
            key = f"{candidate.name}.{candidate.version}"
            if key in seen_keys:
                continue
            seen_keys.add(key)
            # a library version of the track wins over the provider version
            library_track = await self.mass.music.tracks.get_library_item_by_prov_id(
                candidate.item_id, candidate.provider
            )
            if library_track:
                collected.append(library_track)
            elif not in_library_only:
                collected.append(candidate)
    return collected
155
async def albums(
    self,
    item_id: str,
    provider_instance_id_or_domain: str,
    in_library_only: bool = False,
) -> list[Album]:
    """Return (all/most popular) albums for an artist.

    When the artist is not known in the library, the albums of the given
    provider are returned as-is. Otherwise the library albums are returned,
    optionally extended with the albums of the artist's mapped providers,
    where albums sharing the same name + version are listed only once and a
    library version of an album is preferred over the provider version.

    :param item_id: Id of the artist on the given provider (or in the library).
    :param provider_instance_id_or_domain: Provider instance id/domain or 'library'.
    :param in_library_only: Only return albums that exist in the library.
    """
    # the library artist (if any) is always the starting point
    library_artist = await self.get_library_item_by_prov_id(
        item_id, provider_instance_id_or_domain
    )
    if not library_artist:
        return await self.get_provider_artist_albums(item_id, provider_instance_id_or_domain)
    collected: list[Album] = await self.get_library_artist_albums(library_artist.item_id)
    if in_library_only:
        # nothing but the in-library items was asked for
        return collected
    # seed the de-duplication keys with the library albums
    seen_keys: set[str] = {f"{album.name}.{album.version}" for album in collected}
    unique_instances = self.mass.music.get_unique_providers()
    for mapping in library_artist.provider_mappings:
        instance = mapping.provider_instance
        if instance not in unique_instances:
            continue
        for candidate in await self.get_provider_artist_albums(mapping.item_id, instance):
            key = f"{candidate.name}.{candidate.version}"
            if key in seen_keys:
                continue
            seen_keys.add(key)
            # a library version of the album wins over the provider version
            library_album = await self.mass.music.albums.get_library_item_by_prov_id(
                candidate.item_id, candidate.provider
            )
            if library_album:
                collected.append(library_album)
            else:
                # in_library_only is always False here (handled by the early return)
                collected.append(candidate)
    return collected
197
async def remove_item_from_library(self, item_id: str | int, recursive: bool = True) -> None:
    """Delete an artist record from the database.

    :param item_id: Database id of the artist.
    :param recursive: Also remove the albums and tracks linked to the artist.
    :raises MusicAssistantError: If the artist still has albums or tracks
        linked and ``recursive`` is False.
    """
    artist_id = int(item_id)  # ensure integer
    database = self.mass.music.database
    params = {"artist_id": artist_id}

    # linked albums go first
    album_rows = await database.get_rows_from_query(
        f"SELECT album_id FROM {DB_TABLE_ALBUM_ARTISTS} WHERE artist_id = :artist_id",
        params,
        limit=5000,
    )
    for album_row in album_rows:
        if not recursive:
            raise MusicAssistantError("Artist still has albums linked")
        # an album may already be gone, that is not an error here
        with contextlib.suppress(MediaNotFoundError):
            await self.mass.music.albums.remove_item_from_library(album_row["album_id"])

    # then the linked tracks
    track_rows = await database.get_rows_from_query(
        f"SELECT track_id FROM {DB_TABLE_TRACK_ARTISTS} WHERE artist_id = :artist_id",
        params,
        limit=5000,
    )
    for track_row in track_rows:
        if not recursive:
            raise MusicAssistantError("Artist still has tracks linked")
        # a track may already be gone, that is not an error here
        with contextlib.suppress(MediaNotFoundError):
            await self.mass.music.tracks.remove_item_from_library(track_row["track_id"])

    # finally delete the artist record itself
    await super().remove_item_from_library(artist_id)
226
async def get_provider_artist_toptracks(
    self,
    item_id: str,
    provider_instance_id_or_domain: str,
) -> list[Track]:
    """Return the top tracks for an artist on the given provider.

    Providers that support the ARTIST_TOPTRACKS feature are asked directly;
    for other providers the library tracks linked to the artist (filtered on
    that provider) are returned instead.

    :param item_id: Id of the artist on the provider.
    :param provider_instance_id_or_domain: Provider instance id or domain (not 'library').
    :return: The tracks found, or an empty list if the provider or artist is unknown.
    """
    assert provider_instance_id_or_domain != "library"
    provider = self.mass.get_provider(provider_instance_id_or_domain)
    if not provider:
        return []
    music_provider = cast("MusicProvider", provider)
    if ProviderFeature.ARTIST_TOPTRACKS in music_provider.supported_features:
        return await music_provider.get_artist_toptracks(item_id)
    # no native support: fall back to what the library db knows
    library_artist = await self.mass.music.artists.get_library_item_by_prov_id(
        item_id,
        provider_instance_id_or_domain,
    )
    if not library_artist:
        return []
    artist_db_id = int(library_artist.item_id)  # ensure integer
    linked_ids = f"SELECT track_id FROM {DB_TABLE_TRACK_ARTISTS} WHERE artist_id = :artist_id"
    return await self.mass.music.tracks.get_library_items_by_query(
        extra_query_parts=[f"tracks.item_id in ({linked_ids})"],
        extra_query_params={"artist_id": artist_db_id},
        provider_filter=[provider_instance_id_or_domain],
    )
253
async def get_library_artist_tracks(
    self,
    item_id: str | int,
) -> list[Track]:
    """Return all library/db tracks that are linked to an artist.

    :param item_id: Database id of the artist.
    """
    artist_db_id = int(item_id)  # ensure integer
    linked_ids = f"SELECT track_id FROM {DB_TABLE_TRACK_ARTISTS} WHERE artist_id = :artist_id"
    return await self.mass.music.tracks.get_library_items_by_query(
        extra_query_parts=[f"tracks.item_id in ({linked_ids})"],
        extra_query_params={"artist_id": artist_db_id},
    )
266
async def get_provider_artist_albums(
    self,
    item_id: str,
    provider_instance_id_or_domain: str,
) -> list[Album]:
    """Return the albums for an artist on the given provider.

    Providers that support the ARTIST_ALBUMS feature are asked directly; for
    other providers the library albums linked to the artist (filtered on that
    provider) are returned instead.

    :param item_id: Id of the artist on the provider.
    :param provider_instance_id_or_domain: Provider instance id or domain (not 'library').
    :return: The albums found, or an empty list if the provider or artist is unknown.
    """
    assert provider_instance_id_or_domain != "library"
    provider = self.mass.get_provider(provider_instance_id_or_domain)
    if not provider:
        return []
    music_provider = cast("MusicProvider", provider)
    if ProviderFeature.ARTIST_ALBUMS in music_provider.supported_features:
        return await music_provider.get_artist_albums(item_id)
    # no native support: fall back to what the library db knows
    library_artist = await self.mass.music.artists.get_library_item_by_prov_id(
        item_id,
        provider_instance_id_or_domain,
    )
    if not library_artist:
        return []
    artist_db_id = int(library_artist.item_id)  # ensure integer
    linked_ids = f"SELECT album_id FROM {DB_TABLE_ALBUM_ARTISTS} WHERE artist_id = :artist_id"
    return await self.mass.music.albums.get_library_items_by_query(
        extra_query_parts=[f"albums.item_id in ({linked_ids})"],
        extra_query_params={"artist_id": artist_db_id},
        provider_filter=[provider_instance_id_or_domain],
    )
293
async def get_library_artist_albums(
    self,
    item_id: str | int,
) -> list[Album]:
    """Return all in-library albums that are linked to an artist.

    :param item_id: Database id of the artist.
    """
    artist_db_id = int(item_id)  # ensure integer
    linked_ids = f"SELECT album_id FROM {DB_TABLE_ALBUM_ARTISTS} WHERE artist_id = :artist_id"
    return await self.mass.music.albums.get_library_items_by_query(
        extra_query_parts=[f"albums.item_id in ({linked_ids})"],
        extra_query_params={"artist_id": artist_db_id},
    )
306
async def _add_library_item(
    self, item: Artist | ItemMapping, overwrite_existing: bool = False
) -> int:
    """Insert a new artist record into the database.

    :param item: The artist to add; an ItemMapping is converted to an Artist first.
    :param overwrite_existing: Not used by this implementation.
    :return: The database id of the inserted record.
    """
    artist = self.artist_from_item_mapping(item) if isinstance(item, ItemMapping) else item
    # 'various artists' always gets the fixed name + musicbrainz id
    if compare_strings(artist.name, VARIOUS_ARTISTS_NAME):
        artist.mbid = VARIOUS_ARTISTS_MBID
    if artist.mbid == VARIOUS_ARTISTS_MBID:
        artist.name = VARIOUS_ARTISTS_NAME
    added_at = int(artist.date_added.timestamp()) if artist.date_added else UNSET
    record = {
        "name": artist.name,
        "sort_name": artist.sort_name,
        "favorite": artist.favorite,
        "external_ids": serialize_to_json(artist.external_ids),
        "metadata": serialize_to_json(artist.metadata),
        "search_name": create_safe_string(artist.name, True, True),
        "search_sort_name": create_safe_string(artist.sort_name or "", True, True),
        "timestamp_added": added_at,
    }
    new_id = await self.mass.music.database.insert(self.db_table, record)
    # store the provider mappings that belong to the new record
    await self.set_provider_mappings(new_id, artist.provider_mappings)
    self.logger.debug("added %s to database (id: %s)", artist.name, new_id)
    return new_id
337
async def _update_library_item(
    self, item_id: str | int, update: Artist | ItemMapping, overwrite: bool = False
) -> None:
    """Update an existing artist record in the database.

    :param item_id: Database id of the artist to update.
    :param update: The new details; an ItemMapping is converted to an Artist first.
    :param overwrite: Replace the stored values with those of ``update``
        instead of merging them into the current record.
    """
    db_id = int(item_id)  # ensure integer
    cur_item = await self.get_library_item(db_id)
    if isinstance(update, ItemMapping):
        # NOTE that artist is the only mediatype where its accepted we
        # receive an itemmapping from streaming providers
        update = self.artist_from_item_mapping(update)
        # keep the stored metadata as-is when only an ItemMapping was given
        metadata = cur_item.metadata
    elif overwrite:
        metadata = update.metadata
    else:
        # NOTE(review): relies on metadata.update() returning the merged object - confirm
        metadata = cur_item.metadata.update(update.metadata)
    cur_item.external_ids.update(update.external_ids)
    # enforce various artists name + id
    if (not cur_item.mbid or overwrite) and getattr(update, "mbid", None):
        if compare_strings(update.name, VARIOUS_ARTISTS_NAME):
            update.mbid = VARIOUS_ARTISTS_MBID
        if update.mbid == VARIOUS_ARTISTS_MBID:
            update.name = VARIOUS_ARTISTS_NAME

    name = update.name if overwrite else cur_item.name
    sort_name = update.sort_name if overwrite else cur_item.sort_name or update.sort_name
    await self.mass.music.database.update(
        self.db_table,
        {"item_id": db_id},
        {
            "name": name,
            "sort_name": sort_name,
            "external_ids": serialize_to_json(
                update.external_ids if overwrite else cur_item.external_ids
            ),
            "metadata": serialize_to_json(metadata),
            "search_name": create_safe_string(name, True, True),
            "search_sort_name": create_safe_string(sort_name or "", True, True),
            "timestamp_added": int(update.date_added.timestamp())
            if update.date_added
            else UNSET,
        },
    )
    # update/set provider_mappings table
    provider_mappings = (
        update.provider_mappings
        if overwrite
        else {*update.provider_mappings, *cur_item.provider_mappings}
    )
    await self.set_provider_mappings(db_id, provider_mappings, overwrite)
    # log once, after both the record and its provider mappings are written
    self.logger.debug("updated %s in database: (id %s)", update.name, db_id)
388
async def radio_mode_base_tracks(
    self,
    item: Artist,
    preferred_provider_instances: list[str] | None = None,
) -> list[Track]:
    """
    Return the base tracks from which the dynamic radio for an artist is calculated.

    :param item: The Artist to get base tracks for.
    :param preferred_provider_instances: List of preferred provider instance IDs
        (accepted, but not used by this implementation).
    """
    # the artist's regular track listing (not limited to the library) is the base
    base_tracks = await self.tracks(item.item_id, item.provider, in_library_only=False)
    return base_tracks
405
async def match_provider(
    self, db_artist: Artist, provider: MusicProvider, strict: bool = True
) -> list[ProviderMapping]:
    """
    Try to find match on (streaming) provider for the provided (database) artist.

    This is used to link objects of different providers/qualities together.

    The artist is matched indirectly: the provider is searched for
    "<artist name> - <track name>" (and, if that yields nothing, for
    "<artist name> - <album name>") using reference tracks/albums of the
    artist, and the artist attached to a matching search result is taken
    as the match.

    :param db_artist: The (database) artist to find a match for.
    :param provider: The provider to search on.
    :param strict: Passed on to the name/artist comparison helpers.
    :return: The provider mappings of the matched artist, or an empty list
        when no match was found.
    """
    self.logger.debug("Trying to match artist %s on provider %s", db_artist.name, provider.name)
    matches: list[ProviderMapping] = []
    # try to get a match with some reference tracks of this artist
    ref_tracks = await self.mass.music.artists.tracks(db_artist.item_id, db_artist.provider)
    if len(ref_tracks) < 10:
        # fetch reference tracks from provider(s) attached to the artist
        for provider_mapping in db_artist.provider_mappings:
            # a provider that is unavailable or no longer has the artist is skipped
            with contextlib.suppress(ProviderUnavailableError, MediaNotFoundError):
                ref_tracks += await self.mass.music.artists.tracks(
                    provider_mapping.item_id, provider_mapping.provider_instance
                )
    for ref_track in ref_tracks:
        search_str = f"{db_artist.name} - {ref_track.name}"
        search_results = await self.mass.music.tracks.search(search_str, provider.domain)
        for search_result_item in search_results:
            # the track name of the search result must match the reference track
            if not compare_strings(search_result_item.name, ref_track.name, strict=strict):
                continue
            # get matching artist from track
            for search_item_artist in search_result_item.artists:
                if not compare_strings(search_item_artist.name, db_artist.name, strict=strict):
                    continue
                # 100% track match
                # get full artist details so we have all metadata
                prov_artist = await self.get_provider_item(
                    search_item_artist.item_id,
                    search_item_artist.provider,
                    fallback=search_item_artist,
                )
                # 100% match
                matches.extend(prov_artist.provider_mappings)
                # NOTE(review): nesting of this early return was reconstructed
                # from flattened source - confirm against the original file
                if matches:
                    return matches
    # try to get a match with some reference albums of this artist
    ref_albums = await self.mass.music.artists.albums(db_artist.item_id, db_artist.provider)
    if len(ref_albums) < 10:
        # fetch reference albums from provider(s) attached to the artist
        for provider_mapping in db_artist.provider_mappings:
            # a provider that is unavailable or no longer has the artist is skipped
            with contextlib.suppress(ProviderUnavailableError, MediaNotFoundError):
                ref_albums += await self.mass.music.artists.albums(
                    provider_mapping.item_id, provider_mapping.provider_instance
                )
    for ref_album in ref_albums:
        # compilations and albums without artist info are not used as reference
        if ref_album.album_type == AlbumType.COMPILATION:
            continue
        if not ref_album.artists:
            continue
        search_str = f"{db_artist.name} - {ref_album.name}"
        search_result_albums = await self.mass.music.albums.search(search_str, provider.domain)
        for search_result_album in search_result_albums:
            if not search_result_album.artists:
                continue
            if not compare_strings(search_result_album.name, ref_album.name, strict=strict):
                continue
            # artist must match 100%
            if not compare_artist(db_artist, search_result_album.artists[0], strict=strict):
                continue
            # 100% match
            # get full artist details so we have all metadata
            prov_artist = await self.get_provider_item(
                search_result_album.artists[0].item_id,
                search_result_album.artists[0].provider,
                fallback=search_result_album.artists[0],
            )
            matches.extend(prov_artist.provider_mappings)
            if matches:
                return matches
    if not matches:
        self.logger.debug(
            "Could not find match for Artist %s on provider %s",
            db_artist.name,
            provider.name,
        )
    return matches
487
async def match_providers(self, db_artist: Artist) -> None:
    """Try to find matching artists on all providers for the provided (database) artist.

    This is used to link objects of different providers together. Every match
    that is found is stored as additional provider mapping(s) on the artist.

    :param db_artist: The library artist to match; non-library items are ignored.
    """
    if db_artist.provider != "library":
        return  # Matching only supported for database items

    # domains for which the artist already has an available mapping
    linked_domains = {
        mapping.provider_domain for mapping in db_artist.provider_mappings if mapping.available
    }
    for candidate in self.mass.music.providers:
        # skip providers that are already linked, can not search, have no
        # artist library support or are no streaming provider
        # (matching on unique providers is pointless as they push (all) their content to MA)
        if (
            candidate.domain in linked_domains
            or ProviderFeature.SEARCH not in candidate.supported_features
            or not candidate.library_supported(MediaType.ARTIST)
            or not candidate.is_streaming_provider
        ):
            continue
        found = await self.match_provider(db_artist, candidate)
        if not found:
            continue
        # 100% match, we update the db with the additional provider mapping(s)
        await self.add_provider_mappings(db_artist.item_id, found)
        linked_domains.add(candidate.domain)
515
def artist_from_item_mapping(self, item: ItemMapping) -> Artist:
    """Build an Artist object out of an ItemMapping object.

    The resulting artist carries a single provider mapping that points to the
    provider of the ItemMapping; domain and instance id of that mapping are
    None when the provider can not be resolved.

    :param item: The ItemMapping to convert.
    """
    prov = self.mass.get_provider(item.provider)
    mapping = {
        "item_id": item.item_id,
        "provider_domain": prov.domain if prov else None,
        "provider_instance": prov.instance_id if prov else None,
        "available": item.available,
    }
    # the details of the mapping itself are taken over unchanged
    return Artist.from_dict({**item.to_dict(), "provider_mappings": [mapping]})
535