/
/
/
1"""Manage MediaItems of type Artist."""
2
3from __future__ import annotations
4
5import asyncio
6import contextlib
7from typing import TYPE_CHECKING, Any, cast
8
9from music_assistant_models.enums import AlbumType, MediaType, ProviderFeature
10from music_assistant_models.errors import (
11 MediaNotFoundError,
12 MusicAssistantError,
13 ProviderUnavailableError,
14)
15from music_assistant_models.media_items import Album, Artist, ItemMapping, ProviderMapping, Track
16
17from music_assistant.constants import (
18 DB_TABLE_ALBUM_ARTISTS,
19 DB_TABLE_ARTISTS,
20 DB_TABLE_TRACK_ARTISTS,
21 VARIOUS_ARTISTS_MBID,
22 VARIOUS_ARTISTS_NAME,
23)
24from music_assistant.controllers.media.base import MediaControllerBase
25from music_assistant.helpers.compare import compare_artist, compare_strings, create_safe_string
26from music_assistant.helpers.database import UNSET
27from music_assistant.helpers.json import serialize_to_json
28
29if TYPE_CHECKING:
30 from music_assistant import MusicAssistant
31 from music_assistant.models.music_provider import MusicProvider
32
33
34class ArtistsController(MediaControllerBase[Artist]):
35 """Controller managing MediaItems of type Artist."""
36
# Table name used by this controller; library_count() selects from it and
# _add_library_item()/_update_library_item() insert into / update it.
37 db_table = DB_TABLE_ARTISTS
# Media type this controller is responsible for.
38 media_type = MediaType.ARTIST
# Item class of this controller (presumably used by the base class to build
# items from database rows — confirm in MediaControllerBase).
39 item_cls = Artist
40
def __init__(self, mass: MusicAssistant) -> None:
    """Initialize the controller and register the artist-specific API commands.

    :param mass: The MusicAssistant instance, passed on to the base class.
    """
    super().__init__(mass)
    self._db_add_lock = asyncio.Lock()
    # register (extra) api handlers
    api_base = self.api_base
    register = self.mass.register_api_command
    register(f"music/{api_base}/artist_albums", self.albums)
    register(f"music/{api_base}/artist_tracks", self.tracks)
49
async def library_count(
    self, favorite_only: bool = False, album_artists_only: bool = False
) -> int:
    """Return the number of artists in the library.

    :param favorite_only: Only count artists whose ``favorite`` column is 1.
    :param album_artists_only: Only count artists that are referenced from the
        album-artists table.
    """
    # every (enabled, clause) pair contributes one condition to the WHERE part
    candidates = (
        (favorite_only, "favorite = 1"),
        (
            album_artists_only,
            f"item_id in (select {DB_TABLE_ALBUM_ARTISTS}.artist_id "
            f"FROM {DB_TABLE_ALBUM_ARTISTS})",
        ),
    )
    query_parts: list[str] = [clause for enabled, clause in candidates if enabled]
    sql_query = f"SELECT item_id FROM {self.db_table}"
    if query_parts:
        sql_query += f" WHERE {' AND '.join(query_parts)}"
    return await self.mass.music.database.get_count_from_query(sql_query)
66
async def library_items(
    self,
    favorite: bool | None = None,
    search: str | None = None,
    limit: int = 500,
    offset: int = 0,
    order_by: str = "sort_name",
    provider: str | list[str] | None = None,
    album_artists_only: bool = False,
) -> list[Artist]:
    """Return the (album) artists that are stored in the library database.

    :param favorite: Filter by favorite status.
    :param search: Filter by search query.
    :param limit: Maximum number of items to return.
    :param offset: Number of items to skip.
    :param order_by: Order by field (e.g. 'sort_name', 'timestamp_added').
    :param provider: Filter by provider instance ID (single string or list).
    :param album_artists_only: Only return artists that are referenced from the
        album-artists table.
    """
    album_artist_clause = (
        f"artists.item_id in (select {DB_TABLE_ALBUM_ARTISTS}.artist_id "
        f"from {DB_TABLE_ALBUM_ARTISTS})"
    )
    extra_query_parts: list[str] = [album_artist_clause] if album_artists_only else []
    # no named query parameters are needed for the clause above
    extra_query_params: dict[str, Any] = {}
    return await self.get_library_items_by_query(
        favorite=favorite,
        search=search,
        limit=limit,
        offset=offset,
        order_by=order_by,
        provider_filter=self._ensure_provider_filter(provider),
        extra_query_parts=extra_query_parts,
        extra_query_params=extra_query_params,
    )
104
async def tracks(
    self,
    item_id: str,
    provider_instance_id_or_domain: str,
    in_library_only: bool = False,
    provider_filter: str | list[str] | None = None,
) -> list[Track]:
    """Return all/top tracks for an artist.

    Without a library item for the artist, the top tracks of the given provider
    are returned. Otherwise the library tracks come first, followed by the
    (de-duplicated) top tracks of the providers mapped to the library artist.

    :param item_id: Id of the artist on the given provider (or in the library).
    :param provider_instance_id_or_domain: Provider instance id / domain, or "library".
    :param in_library_only: Only return tracks that exist in the library.
    :param provider_filter: Restrict the provider lookups to these provider
        instance(s); only allowed for "library" requests.
    :raises MusicAssistantError: if ``provider_filter`` is combined with a
        non-library request.
    """
    if provider_filter and provider_instance_id_or_domain != "library":
        raise MusicAssistantError("Cannot use provider_filter with specific provider request")
    if isinstance(provider_filter, str):
        provider_filter = [provider_filter]
    # a library item (if any) always takes precedence
    library_artist = await self.get_library_item_by_prov_id(
        item_id, provider_instance_id_or_domain
    )
    if not library_artist:
        return await self.get_provider_artist_toptracks(item_id, provider_instance_id_or_domain)
    result: list[Track] = await self.get_library_artist_tracks(library_artist.item_id)
    if in_library_only and not provider_filter:
        # nothing to look up on the providers
        return result
    # name + version identifies a track; seeded with the library tracks so
    # provider results never duplicate them
    seen: set[str] = {f"{track.name}.{track.version}" for track in result}
    unique_providers = self.mass.music.get_unique_providers()
    for mapping in library_artist.provider_mappings:
        instance = mapping.provider_instance
        if instance not in unique_providers or (
            provider_filter and instance not in provider_filter
        ):
            continue
        for candidate in await self.get_provider_artist_toptracks(mapping.item_id, instance):
            key = f"{candidate.name}.{candidate.version}"
            if key in seen:
                continue
            seen.add(key)
            # prefer the library version of the track when there is one
            library_track = await self.mass.music.tracks.get_library_item_by_prov_id(
                candidate.item_id, candidate.provider
            )
            if library_track:
                result.append(library_track)
            elif not in_library_only:
                result.append(candidate)
    return result
153
async def albums(
    self,
    item_id: str,
    provider_instance_id_or_domain: str,
    in_library_only: bool = False,
) -> list[Album]:
    """Return (all/most popular) albums for an artist.

    Without a library item for the artist, the albums of the given provider are
    returned. Otherwise the library albums come first, followed by the
    (de-duplicated) albums of the providers mapped to the library artist.

    :param item_id: Id of the artist on the given provider (or in the library).
    :param provider_instance_id_or_domain: Provider instance id / domain, or "library".
    :param in_library_only: Only return albums that exist in the library.
    """
    # a library item (if any) always takes precedence
    library_artist = await self.get_library_item_by_prov_id(
        item_id, provider_instance_id_or_domain
    )
    if not library_artist:
        return await self.get_provider_artist_albums(item_id, provider_instance_id_or_domain)
    result: list[Album] = await self.get_library_artist_albums(library_artist.item_id)
    if in_library_only:
        # nothing to look up on the providers
        return result
    # name + version identifies an album; seeded with the library albums so
    # provider results never duplicate them
    seen: set[str] = {f"{album.name}.{album.version}" for album in result}
    unique_providers = self.mass.music.get_unique_providers()
    for mapping in library_artist.provider_mappings:
        if mapping.provider_instance not in unique_providers:
            continue
        for candidate in await self.get_provider_artist_albums(
            mapping.item_id, mapping.provider_instance
        ):
            key = f"{candidate.name}.{candidate.version}"
            if key in seen:
                continue
            seen.add(key)
            # prefer the library version of the album when there is one
            library_album = await self.mass.music.albums.get_library_item_by_prov_id(
                candidate.item_id, candidate.provider
            )
            if library_album:
                result.append(library_album)
            elif not in_library_only:
                result.append(candidate)
    return result
195
async def remove_item_from_library(self, item_id: str | int, recursive: bool = True) -> None:
    """Delete an artist from the library database.

    Albums and tracks linked to the artist are removed first (at most 5000 rows
    of each kind are looked up per call).

    :param item_id: Database id of the artist.
    :param recursive: When False, raise as soon as a linked album or track is
        found instead of removing it.
    :raises MusicAssistantError: if the artist still has albums or tracks linked
        and ``recursive`` is False.
    """
    db_id = int(item_id)  # ensure integer
    music = self.mass.music

    # linked albums go first
    album_rows = await music.database.get_rows_from_query(
        f"SELECT album_id FROM {DB_TABLE_ALBUM_ARTISTS} WHERE artist_id = :artist_id",
        {"artist_id": db_id},
        limit=5000,
    )
    for album_row in album_rows:
        if not recursive:
            raise MusicAssistantError("Artist still has albums linked")
        # an album that is already gone is not an error
        with contextlib.suppress(MediaNotFoundError):
            await music.albums.remove_item_from_library(album_row["album_id"])

    # then the linked tracks
    track_rows = await music.database.get_rows_from_query(
        f"SELECT track_id FROM {DB_TABLE_TRACK_ARTISTS} WHERE artist_id = :artist_id",
        {"artist_id": db_id},
        limit=5000,
    )
    for track_row in track_rows:
        if not recursive:
            raise MusicAssistantError("Artist still has tracks linked")
        # a track that is already gone is not an error
        with contextlib.suppress(MediaNotFoundError):
            await music.tracks.remove_item_from_library(track_row["track_id"])

    # finally delete the artist record itself through the base class
    await super().remove_item_from_library(db_id)
224
async def get_provider_artist_toptracks(
    self,
    item_id: str,
    provider_instance_id_or_domain: str,
) -> list[Track]:
    """Return the top tracks of an artist on the given provider.

    Providers that support ``ProviderFeature.ARTIST_TOPTRACKS`` are asked
    directly; for other providers the library tracks linked to the artist
    (filtered on that provider) are returned instead.

    :param item_id: Id of the artist on the provider.
    :param provider_instance_id_or_domain: Provider instance id or domain;
        must not be "library".
    :returns: The tracks found, or an empty list when the provider is not
        available or the artist is not in the library.
    """
    assert provider_instance_id_or_domain != "library"
    prov = self.mass.get_provider(provider_instance_id_or_domain)
    if not prov:
        return []
    prov = cast("MusicProvider", prov)
    if ProviderFeature.ARTIST_TOPTRACKS in prov.supported_features:
        return await prov.get_artist_toptracks(item_id)
    # fallback implementation using the library db
    db_artist = await self.mass.music.artists.get_library_item_by_prov_id(
        item_id,
        provider_instance_id_or_domain,
    )
    if not db_artist:
        return []
    artist_db_id = int(db_artist.item_id)  # ensure integer
    linked_track_ids = (
        f"SELECT track_id FROM {DB_TABLE_TRACK_ARTISTS} WHERE artist_id = :artist_id"
    )
    return await self.mass.music.tracks.get_library_items_by_query(
        extra_query_parts=[f"tracks.item_id in ({linked_track_ids})"],
        extra_query_params={"artist_id": artist_db_id},
        provider_filter=[provider_instance_id_or_domain],
    )
251
async def get_library_artist_tracks(
    self,
    item_id: str | int,
) -> list[Track]:
    """Return all library tracks that are linked to the given library artist.

    :param item_id: Database id of the artist.
    """
    linked_track_ids = (
        f"SELECT track_id FROM {DB_TABLE_TRACK_ARTISTS} WHERE artist_id = :artist_id"
    )
    return await self.mass.music.tracks.get_library_items_by_query(
        extra_query_parts=[f"tracks.item_id in ({linked_track_ids})"],
        # int() makes sure the query parameter is an integer
        extra_query_params={"artist_id": int(item_id)},
    )
264
async def get_provider_artist_albums(
    self,
    item_id: str,
    provider_instance_id_or_domain: str,
) -> list[Album]:
    """Return the albums of an artist on the given provider.

    Providers that support ``ProviderFeature.ARTIST_ALBUMS`` are asked
    directly; for other providers the library albums linked to the artist
    (filtered on that provider) are returned instead.

    :param item_id: Id of the artist on the provider.
    :param provider_instance_id_or_domain: Provider instance id or domain;
        must not be "library".
    :returns: The albums found, or an empty list when the provider is not
        available or the artist is not in the library.
    """
    assert provider_instance_id_or_domain != "library"
    prov = self.mass.get_provider(provider_instance_id_or_domain)
    if not prov:
        return []
    prov = cast("MusicProvider", prov)
    if ProviderFeature.ARTIST_ALBUMS in prov.supported_features:
        return await prov.get_artist_albums(item_id)
    # fallback implementation using the db
    db_artist = await self.mass.music.artists.get_library_item_by_prov_id(
        item_id,
        provider_instance_id_or_domain,
    )
    if not db_artist:
        return []
    artist_db_id = int(db_artist.item_id)  # ensure integer
    linked_album_ids = (
        f"SELECT album_id FROM {DB_TABLE_ALBUM_ARTISTS} WHERE artist_id = :artist_id"
    )
    return await self.mass.music.albums.get_library_items_by_query(
        extra_query_parts=[f"albums.item_id in ({linked_album_ids})"],
        extra_query_params={"artist_id": artist_db_id},
        provider_filter=[provider_instance_id_or_domain],
    )
291
async def get_library_artist_albums(
    self,
    item_id: str | int,
) -> list[Album]:
    """Return all library albums that are linked to the given library artist.

    :param item_id: Database id of the artist.
    """
    linked_album_ids = (
        f"SELECT album_id FROM {DB_TABLE_ALBUM_ARTISTS} WHERE artist_id = :artist_id"
    )
    return await self.mass.music.albums.get_library_items_by_query(
        extra_query_parts=[f"albums.item_id in ({linked_album_ids})"],
        # int() makes sure the query parameter is an integer
        extra_query_params={"artist_id": int(item_id)},
    )
304
async def _add_library_item(
    self, item: Artist | ItemMapping, overwrite_existing: bool = False
) -> int:
    """Insert a new artist record into the database and return its id.

    :param item: The artist to store; an ItemMapping is converted to an Artist first.
    :param overwrite_existing: Not used by this implementation.
    :returns: The database id of the inserted record.
    """
    artist = self.artist_from_item_mapping(item) if isinstance(item, ItemMapping) else item
    # enforce various artists name + id: a matching name gets the fixed mbid,
    # and the fixed mbid in turn dictates the name
    if compare_strings(artist.name, VARIOUS_ARTISTS_NAME):
        artist.mbid = VARIOUS_ARTISTS_MBID
    if artist.mbid == VARIOUS_ARTISTS_MBID:
        artist.name = VARIOUS_ARTISTS_NAME
    new_row = {
        "name": artist.name,
        "sort_name": artist.sort_name,
        "favorite": artist.favorite,
        "external_ids": serialize_to_json(artist.external_ids),
        "metadata": serialize_to_json(artist.metadata),
        "search_name": create_safe_string(artist.name, True, True),
        "search_sort_name": create_safe_string(artist.sort_name or "", True, True),
        # UNSET is passed when the item carries no date_added
        "timestamp_added": int(artist.date_added.timestamp()) if artist.date_added else UNSET,
    }
    db_id = await self.mass.music.database.insert(self.db_table, new_row)
    # update/set provider_mappings table
    await self.set_provider_mappings(db_id, artist.provider_mappings)
    self.logger.debug("added %s to database (id: %s)", artist.name, db_id)
    return db_id
335
async def _update_library_item(
    self, item_id: str | int, update: Artist | ItemMapping, overwrite: bool = False
) -> None:
    """Update an existing artist record in the database.

    :param item_id: Database id of the artist to update.
    :param update: The new details; an ItemMapping is converted to an Artist first.
    :param overwrite: When True the values of ``update`` replace the stored
        ones; otherwise they are merged into the stored item, whose own name
        and (if set) sort name are kept.
    """
    db_id = int(item_id)  # ensure integer
    cur_item = await self.get_library_item(db_id)
    if isinstance(update, ItemMapping):
        # NOTE that artist is the only mediatype where its accepted we
        # receive an itemmapping from streaming providers
        update = self.artist_from_item_mapping(update)
        metadata = cur_item.metadata
    else:
        # relies on metadata.update() returning the merged metadata object
        metadata = update.metadata if overwrite else cur_item.metadata.update(update.metadata)
    cur_item.external_ids.update(update.external_ids)
    # enforce various artists name + id
    # (mbid is read only after the external ids above have been merged)
    mbid = cur_item.mbid
    if (not mbid or overwrite) and getattr(update, "mbid", None):
        if compare_strings(update.name, VARIOUS_ARTISTS_NAME):
            update.mbid = VARIOUS_ARTISTS_MBID
        if update.mbid == VARIOUS_ARTISTS_MBID:
            update.name = VARIOUS_ARTISTS_NAME

    name = update.name if overwrite else cur_item.name
    sort_name = update.sort_name if overwrite else (cur_item.sort_name or update.sort_name)
    await self.mass.music.database.update(
        self.db_table,
        {"item_id": db_id},
        {
            "name": name,
            "sort_name": sort_name,
            "external_ids": serialize_to_json(
                update.external_ids if overwrite else cur_item.external_ids
            ),
            "metadata": serialize_to_json(metadata),
            "search_name": create_safe_string(name, True, True),
            "search_sort_name": create_safe_string(sort_name or "", True, True),
            # UNSET is passed when the update carries no date_added
            "timestamp_added": int(update.date_added.timestamp())
            if update.date_added
            else UNSET,
        },
    )
    # update/set provider_mappings table
    provider_mappings = (
        update.provider_mappings
        if overwrite
        else {*update.provider_mappings, *cur_item.provider_mappings}
    )
    await self.set_provider_mappings(db_id, provider_mappings, overwrite)
    # logged once, after both the record and its provider mappings are written
    self.logger.debug("updated %s in database: (id %s)", update.name, db_id)
386
async def radio_mode_base_tracks(
    self,
    item: Artist,
    preferred_provider_instances: list[str] | None = None,
) -> list[Track]:
    """
    Get the list of base tracks from the controller used to calculate the dynamic radio.

    For an artist these are the tracks returned by :meth:`tracks`, not
    restricted to the library.

    :param item: The Artist to get base tracks for.
    :param preferred_provider_instances: List of preferred provider instance IDs to use
        (not used by this implementation).
    """
    base_tracks = await self.tracks(
        item.item_id,
        item.provider,
        in_library_only=False,
    )
    return base_tracks
403
404 async def match_provider(
405 self, db_artist: Artist, provider: MusicProvider, strict: bool = True
406 ) -> list[ProviderMapping]:
407 """
408 Try to find match on (streaming) provider for the provided (database) artist.
409
410 This is used to link objects of different providers/qualities together.
411 """
# Parameters:
#   db_artist - the artist to find a counterpart for
#   provider  - the provider that is searched (by its domain)
#   strict    - forwarded to every compare_strings()/compare_artist() call below
# Returns the provider mappings of the matched provider artist, or an empty
# list when neither the track-based nor the album-based attempt found a match.
#
# NOTE(review): this copy of the file has lost its indentation, so the nesting
# level of the two `if matches: return matches` checks cannot be read from the
# text here — confirm it against the canonical source before restructuring.
412 self.logger.debug("Trying to match artist %s on provider %s", db_artist.name, provider.name)
413 matches: list[ProviderMapping] = []
# --- attempt 1: match through the artist's tracks ---
414 # try to get a match with some reference tracks of this artist
415 ref_tracks = await self.mass.music.artists.tracks(db_artist.item_id, db_artist.provider)
416 if len(ref_tracks) < 10:
417 # fetch reference tracks from provider(s) attached to the artist
418 for provider_mapping in db_artist.provider_mappings:
# an unavailable provider or a missing item simply contributes no tracks
419 with contextlib.suppress(ProviderUnavailableError, MediaNotFoundError):
420 ref_tracks += await self.mass.music.artists.tracks(
421 provider_mapping.item_id, provider_mapping.provider_instance
422 )
# search the target provider for "<artist name> - <track name>" and accept a
# result when its name matches the reference track and one of its artists
# matches the artist name
423 for ref_track in ref_tracks:
424 search_str = f"{db_artist.name} - {ref_track.name}"
425 search_results = await self.mass.music.tracks.search(search_str, provider.domain)
426 for search_result_item in search_results:
427 if not compare_strings(search_result_item.name, ref_track.name, strict=strict):
428 continue
429 # get matching artist from track
430 for search_item_artist in search_result_item.artists:
431 if not compare_strings(search_item_artist.name, db_artist.name, strict=strict):
432 continue
433 # 100% track match
434 # get full artist details so we have all metadata
435 prov_artist = await self.get_provider_item(
436 search_item_artist.item_id,
437 search_item_artist.provider,
438 fallback=search_item_artist,
439 )
440 # 100% match
441 matches.extend(prov_artist.provider_mappings)
442 if matches:
443 return matches
# --- attempt 2: match through the artist's albums ---
444 # try to get a match with some reference albums of this artist
445 ref_albums = await self.mass.music.artists.albums(db_artist.item_id, db_artist.provider)
446 if len(ref_albums) < 10:
447 # fetch reference albums from provider(s) attached to the artist
448 for provider_mapping in db_artist.provider_mappings:
# an unavailable provider or a missing item simply contributes no albums
449 with contextlib.suppress(ProviderUnavailableError, MediaNotFoundError):
450 ref_albums += await self.mass.music.artists.albums(
451 provider_mapping.item_id, provider_mapping.provider_instance
452 )
453 for ref_album in ref_albums:
# compilations and albums without artists are not used as a reference
454 if ref_album.album_type == AlbumType.COMPILATION:
455 continue
456 if not ref_album.artists:
457 continue
458 search_str = f"{db_artist.name} - {ref_album.name}"
459 search_result_albums = await self.mass.music.albums.search(search_str, provider.domain)
460 for search_result_album in search_result_albums:
461 if not search_result_album.artists:
462 continue
463 if not compare_strings(search_result_album.name, ref_album.name, strict=strict):
464 continue
# only the first artist of the search result album is compared
465 # artist must match 100%
466 if not compare_artist(db_artist, search_result_album.artists[0], strict=strict):
467 continue
468 # 100% match
469 # get full artist details so we have all metadata
470 prov_artist = await self.get_provider_item(
471 search_result_album.artists[0].item_id,
472 search_result_album.artists[0].provider,
473 fallback=search_result_album.artists[0],
474 )
475 matches.extend(prov_artist.provider_mappings)
476 if matches:
477 return matches
478 if not matches:
479 self.logger.debug(
480 "Could not find match for Artist %s on provider %s",
481 db_artist.name,
482 provider.name,
483 )
484 return matches
485
async def match_providers(self, db_artist: Artist) -> None:
    """Try to find matching artists on all providers for the provided (database) artist.

    This is used to link objects of different providers together. Matches are
    stored as additional provider mappings on the library artist.

    :param db_artist: The library artist to match; items of any other provider
        than "library" are ignored.
    """
    if db_artist.provider != "library":
        return  # Matching only supported for database items

    # domains that already have an available mapping need no matching
    cur_provider_domains = {
        mapping.provider_domain
        for mapping in db_artist.provider_mappings
        if mapping.available
    }
    for provider in self.mass.music.providers:
        # matching on unique providers is pointless as they push (all) their
        # content to MA, hence the streaming provider requirement
        is_candidate = (
            provider.domain not in cur_provider_domains
            and ProviderFeature.SEARCH in provider.supported_features
            and provider.library_supported(MediaType.ARTIST)
            and provider.is_streaming_provider
        )
        if not is_candidate:
            continue
        match = await self.match_provider(db_artist, provider)
        if not match:
            continue
        # 100% match, we update the db with the additional provider mapping(s)
        await self.add_provider_mappings(db_artist.item_id, match)
        cur_provider_domains.add(provider.domain)
513
def artist_from_item_mapping(self, item: ItemMapping) -> Artist:
    """Create an Artist object from an ItemMapping object.

    The artist gets a single provider mapping that points back to the item on
    its provider; domain and instance id stay ``None`` when the provider
    cannot be resolved.

    :param item: The ItemMapping to convert.
    """
    prov = self.mass.get_provider(item.provider)
    domain = prov.domain if prov else None
    instance_id = prov.instance_id if prov else None
    # start from a copy of the ItemMapping fields and set the mapping on top
    artist_dict = dict(item.to_dict())
    artist_dict["provider_mappings"] = [
        {
            "item_id": item.item_id,
            "provider_domain": domain,
            "provider_instance": instance_id,
            "available": item.available,
        }
    ]
    return Artist.from_dict(artist_dict)
533