/
/
/
1"""Manage MediaItems of type Artist."""
2
3from __future__ import annotations
4
5import asyncio
6import contextlib
7from typing import TYPE_CHECKING, Any, cast
8
9from music_assistant_models.enums import AlbumType, MediaType, ProviderFeature
10from music_assistant_models.errors import (
11 MediaNotFoundError,
12 MusicAssistantError,
13 ProviderUnavailableError,
14)
15from music_assistant_models.media_items import Album, Artist, ItemMapping, ProviderMapping, Track
16
17from music_assistant.constants import (
18 DB_TABLE_ALBUM_ARTISTS,
19 DB_TABLE_ARTISTS,
20 DB_TABLE_TRACK_ARTISTS,
21 VARIOUS_ARTISTS_MBID,
22 VARIOUS_ARTISTS_NAME,
23)
24from music_assistant.controllers.media.base import MediaControllerBase
25from music_assistant.helpers.compare import compare_artist, compare_strings, create_safe_string
26from music_assistant.helpers.database import UNSET
27from music_assistant.helpers.json import serialize_to_json
28
29if TYPE_CHECKING:
30 from music_assistant import MusicAssistant
31 from music_assistant.models.music_provider import MusicProvider
32
33
34class ArtistsController(MediaControllerBase[Artist]):
35 """Controller managing MediaItems of type Artist."""
36
37 db_table = DB_TABLE_ARTISTS
38 media_type = MediaType.ARTIST
39 item_cls = Artist
40
41 def __init__(self, mass: MusicAssistant) -> None:
42 """Initialize class."""
43 super().__init__(mass)
44 self._db_add_lock = asyncio.Lock()
45 # register (extra) api handlers
46 api_base = self.api_base
47 self.mass.register_api_command(f"music/{api_base}/artist_albums", self.albums)
48 self.mass.register_api_command(f"music/{api_base}/artist_tracks", self.tracks)
49
50 async def library_count(
51 self, favorite_only: bool = False, album_artists_only: bool = False
52 ) -> int:
53 """Return the total number of items in the library."""
54 sql_query = f"SELECT item_id FROM {self.db_table}"
55 query_parts: list[str] = []
56 if favorite_only:
57 query_parts.append("favorite = 1")
58 if album_artists_only:
59 query_parts.append(
60 f"item_id in (select {DB_TABLE_ALBUM_ARTISTS}.artist_id "
61 f"FROM {DB_TABLE_ALBUM_ARTISTS})"
62 )
63 if query_parts:
64 sql_query += f" WHERE {' AND '.join(query_parts)}"
65 return await self.mass.music.database.get_count_from_query(sql_query)
66
67 async def library_items(
68 self,
69 favorite: bool | None = None,
70 search: str | None = None,
71 limit: int = 500,
72 offset: int = 0,
73 order_by: str = "sort_name",
74 provider: str | list[str] | None = None,
75 genre: int | list[int] | None = None,
76 album_artists_only: bool = False,
77 **kwargs: Any,
78 ) -> list[Artist]:
79 """Get in-database (album) artists.
80
81 :param favorite: Filter by favorite status.
82 :param search: Filter by search query.
83 :param limit: Maximum number of items to return.
84 :param offset: Number of items to skip.
85 :param order_by: Order by field (e.g. 'sort_name', 'timestamp_added').
86 :param provider: Filter by provider instance ID (single string or list).
87 :param album_artists_only: Only return artists that have albums.
88 :param genre: Filter by genre id(s).
89 """
90 extra_query_params: dict[str, Any] = {}
91 extra_query_parts: list[str] = []
92 if album_artists_only:
93 extra_query_parts.append(
94 f"artists.item_id in (select {DB_TABLE_ALBUM_ARTISTS}.artist_id "
95 f"from {DB_TABLE_ALBUM_ARTISTS})"
96 )
97 return await self.get_library_items_by_query(
98 favorite=favorite,
99 search=search,
100 genre_ids=genre,
101 limit=limit,
102 offset=offset,
103 order_by=order_by,
104 provider_filter=self._ensure_provider_filter(provider),
105 extra_query_parts=extra_query_parts,
106 extra_query_params=extra_query_params,
107 in_library_only=True,
108 )
109
110 async def tracks(
111 self,
112 item_id: str,
113 provider_instance_id_or_domain: str,
114 in_library_only: bool = False,
115 provider_filter: str | list[str] | None = None,
116 ) -> list[Track]:
117 """Return all/top tracks for an artist."""
118 if provider_filter and provider_instance_id_or_domain != "library":
119 raise MusicAssistantError("Cannot use provider_filter with specific provider request")
120 if isinstance(provider_filter, str):
121 provider_filter = [provider_filter]
122 # always check if we have a library item for this artist
123 library_artist = await self.get_library_item_by_prov_id(
124 item_id, provider_instance_id_or_domain
125 )
126 if not library_artist:
127 return await self.get_provider_artist_toptracks(item_id, provider_instance_id_or_domain)
128 db_items = await self.get_library_artist_tracks(library_artist.item_id)
129 result: list[Track] = db_items
130 if in_library_only and not provider_filter:
131 # return in-library items only
132 return result
133 # return all (unique) items from all providers
134 # initialize unique_ids with db_items to prevent duplicates
135 unique_ids: set[str] = {f"{item.name}.{item.version}" for item in db_items}
136 unique_providers = self.mass.music.get_unique_providers()
137 for provider_mapping in library_artist.provider_mappings:
138 if provider_mapping.provider_instance not in unique_providers:
139 continue
140 if provider_filter and provider_mapping.provider_instance not in provider_filter:
141 continue
142 provider_tracks = await self.get_provider_artist_toptracks(
143 provider_mapping.item_id, provider_mapping.provider_instance
144 )
145 for provider_track in provider_tracks:
146 unique_id = f"{provider_track.name}.{provider_track.version}"
147 if unique_id in unique_ids:
148 continue
149 unique_ids.add(unique_id)
150 # prefer db item
151 if db_item := await self.mass.music.tracks.get_library_item_by_prov_id(
152 provider_track.item_id, provider_track.provider
153 ):
154 result.append(db_item)
155 elif not in_library_only:
156 result.append(provider_track)
157 return result
158
159 async def albums(
160 self,
161 item_id: str,
162 provider_instance_id_or_domain: str,
163 in_library_only: bool = False,
164 ) -> list[Album]:
165 """Return (all/most popular) albums for an artist."""
166 # always check if we have a library item for this artist
167 library_artist = await self.get_library_item_by_prov_id(
168 item_id, provider_instance_id_or_domain
169 )
170 if not library_artist:
171 return await self.get_provider_artist_albums(item_id, provider_instance_id_or_domain)
172 db_items = await self.get_library_artist_albums(library_artist.item_id)
173 result: list[Album] = db_items
174 if in_library_only:
175 # return in-library items only
176 return result
177 # return all (unique) items from all providers
178 # initialize unique_ids with db_items to prevent duplicates
179 unique_ids: set[str] = {f"{item.name}.{item.version}" for item in db_items}
180 unique_providers = self.mass.music.get_unique_providers()
181 for provider_mapping in library_artist.provider_mappings:
182 if provider_mapping.provider_instance not in unique_providers:
183 continue
184 provider_albums = await self.get_provider_artist_albums(
185 provider_mapping.item_id, provider_mapping.provider_instance
186 )
187 for provider_album in provider_albums:
188 unique_id = f"{provider_album.name}.{provider_album.version}"
189 if unique_id in unique_ids:
190 continue
191 unique_ids.add(unique_id)
192 # prefer db item
193 if db_item := await self.mass.music.albums.get_library_item_by_prov_id(
194 provider_album.item_id, provider_album.provider
195 ):
196 result.append(db_item)
197 elif not in_library_only:
198 result.append(provider_album)
199 return result
200
201 async def remove_item_from_library(self, item_id: str | int, recursive: bool = True) -> None:
202 """Delete record from the database."""
203 db_id = int(item_id) # ensure integer
204
205 # recursively also remove artist albums
206 for db_row in await self.mass.music.database.get_rows_from_query(
207 f"SELECT album_id FROM {DB_TABLE_ALBUM_ARTISTS} WHERE artist_id = :artist_id",
208 {"artist_id": db_id},
209 limit=5000,
210 ):
211 if not recursive:
212 raise MusicAssistantError("Artist still has albums linked")
213 with contextlib.suppress(MediaNotFoundError):
214 await self.mass.music.albums.remove_item_from_library(db_row["album_id"])
215 # recursively also remove artist tracks
216 for db_row in await self.mass.music.database.get_rows_from_query(
217 f"SELECT track_id FROM {DB_TABLE_TRACK_ARTISTS} WHERE artist_id = :artist_id",
218 {"artist_id": db_id},
219 limit=5000,
220 ):
221 if not recursive:
222 raise MusicAssistantError("Artist still has tracks linked")
223 with contextlib.suppress(MediaNotFoundError):
224 await self.mass.music.tracks.remove_item_from_library(db_row["track_id"])
225
226 # delete the artist itself from db
227 # this will raise if the item still has references and recursive is false
228 await super().remove_item_from_library(db_id)
229
230 async def get_provider_artist_toptracks(
231 self,
232 item_id: str,
233 provider_instance_id_or_domain: str,
234 ) -> list[Track]:
235 """Return top tracks for an artist on given provider."""
236 assert provider_instance_id_or_domain != "library"
237 if not (prov := self.mass.get_provider(provider_instance_id_or_domain)):
238 return []
239 prov = cast("MusicProvider", prov)
240 if ProviderFeature.ARTIST_TOPTRACKS in prov.supported_features:
241 return await prov.get_artist_toptracks(item_id)
242 # fallback implementation using the library db
243 if db_artist := await self.mass.music.artists.get_library_item_by_prov_id(
244 item_id,
245 provider_instance_id_or_domain,
246 ):
247 db_artist_id = int(db_artist.item_id) # ensure integer
248 subquery = f"SELECT track_id FROM {DB_TABLE_TRACK_ARTISTS} WHERE artist_id = :artist_id"
249 query = f"tracks.item_id in ({subquery})"
250 return await self.mass.music.tracks.get_library_items_by_query(
251 extra_query_parts=[query],
252 extra_query_params={"artist_id": db_artist_id},
253 provider_filter=[provider_instance_id_or_domain],
254 )
255 return []
256
257 async def get_library_artist_tracks(
258 self,
259 item_id: str | int,
260 ) -> list[Track]:
261 """Return all tracks for an artist in the library/db."""
262 db_id = int(item_id) # ensure integer
263 subquery = f"SELECT track_id FROM {DB_TABLE_TRACK_ARTISTS} WHERE artist_id = :artist_id"
264 query = f"tracks.item_id in ({subquery})"
265 return await self.mass.music.tracks.get_library_items_by_query(
266 extra_query_parts=[query],
267 extra_query_params={"artist_id": db_id},
268 )
269
270 async def get_provider_artist_albums(
271 self,
272 item_id: str,
273 provider_instance_id_or_domain: str,
274 ) -> list[Album]:
275 """Return albums for an artist on given provider."""
276 assert provider_instance_id_or_domain != "library"
277 if not (prov := self.mass.get_provider(provider_instance_id_or_domain)):
278 return []
279 prov = cast("MusicProvider", prov)
280 if ProviderFeature.ARTIST_ALBUMS in prov.supported_features:
281 return await prov.get_artist_albums(item_id)
282 # fallback implementation using the db
283 if db_artist := await self.mass.music.artists.get_library_item_by_prov_id(
284 item_id,
285 provider_instance_id_or_domain,
286 ):
287 db_artist_id = int(db_artist.item_id) # ensure integer
288 subquery = f"SELECT album_id FROM {DB_TABLE_ALBUM_ARTISTS} WHERE artist_id = :artist_id"
289 query = f"albums.item_id in ({subquery})"
290 return await self.mass.music.albums.get_library_items_by_query(
291 extra_query_parts=[query],
292 extra_query_params={"artist_id": db_artist_id},
293 provider_filter=[provider_instance_id_or_domain],
294 )
295 return []
296
297 async def get_library_artist_albums(
298 self,
299 item_id: str | int,
300 ) -> list[Album]:
301 """Return all in-library albums for an artist."""
302 db_id = int(item_id) # ensure integer
303 subquery = f"SELECT album_id FROM {DB_TABLE_ALBUM_ARTISTS} WHERE artist_id = :artist_id"
304 query = f"albums.item_id in ({subquery})"
305 return await self.mass.music.albums.get_library_items_by_query(
306 extra_query_parts=[query],
307 extra_query_params={"artist_id": db_id},
308 )
309
310 async def _add_library_item(
311 self, item: Artist | ItemMapping, overwrite_existing: bool = False
312 ) -> int:
313 """Add a new item record to the database."""
314 # If item is an ItemMapping, convert it
315 if isinstance(item, ItemMapping):
316 item = self.artist_from_item_mapping(item)
317 # enforce various artists name + id
318 if compare_strings(item.name, VARIOUS_ARTISTS_NAME):
319 item.mbid = VARIOUS_ARTISTS_MBID
320 if item.mbid == VARIOUS_ARTISTS_MBID:
321 item.name = VARIOUS_ARTISTS_NAME
322 # no existing item matched: insert item
323 db_id = await self.mass.music.database.insert(
324 self.db_table,
325 {
326 "name": item.name,
327 "sort_name": item.sort_name,
328 "favorite": item.favorite,
329 "external_ids": serialize_to_json(item.external_ids),
330 "metadata": serialize_to_json(item.metadata),
331 "search_name": create_safe_string(item.name, True, True),
332 "search_sort_name": create_safe_string(item.sort_name or "", True, True),
333 "timestamp_added": int(item.date_added.timestamp()) if item.date_added else UNSET,
334 },
335 )
336 # update/set provider_mappings table
337 await self.set_provider_mappings(db_id, item.provider_mappings)
338 self.logger.debug("added %s to database (id: %s)", item.name, db_id)
339 return db_id
340
341 async def _update_library_item(
342 self, item_id: str | int, update: Artist | ItemMapping, overwrite: bool = False
343 ) -> None:
344 """Update existing record in the database."""
345 db_id = int(item_id) # ensure integer
346 cur_item = await self.get_library_item(db_id)
347 if isinstance(update, ItemMapping):
348 # NOTE that artist is the only mediatype where its accepted we
349 # receive an itemmapping from streaming providers
350 update = self.artist_from_item_mapping(update)
351 metadata = cur_item.metadata
352 else:
353 metadata = update.metadata if overwrite else cur_item.metadata.update(update.metadata)
354 cur_item.external_ids.update(update.external_ids)
355 # enforce various artists name + id
356 mbid = cur_item.mbid
357 if (not mbid or overwrite) and getattr(update, "mbid", None):
358 if compare_strings(update.name, VARIOUS_ARTISTS_NAME):
359 update.mbid = VARIOUS_ARTISTS_MBID
360 if update.mbid == VARIOUS_ARTISTS_MBID:
361 update.name = VARIOUS_ARTISTS_NAME
362
363 name = update.name if overwrite else cur_item.name
364 sort_name = update.sort_name if overwrite else cur_item.sort_name or update.sort_name
365 await self.mass.music.database.update(
366 self.db_table,
367 {"item_id": db_id},
368 {
369 "name": name,
370 "sort_name": sort_name,
371 "external_ids": serialize_to_json(
372 update.external_ids if overwrite else cur_item.external_ids
373 ),
374 "metadata": serialize_to_json(metadata),
375 "search_name": create_safe_string(name, True, True),
376 "search_sort_name": create_safe_string(sort_name or "", True, True),
377 "timestamp_added": int(update.date_added.timestamp())
378 if update.date_added
379 else UNSET,
380 },
381 )
382 self.logger.debug("updated %s in database: %s", update.name, db_id)
383 # update/set provider_mappings table
384 provider_mappings = (
385 update.provider_mappings
386 if overwrite
387 else {*update.provider_mappings, *cur_item.provider_mappings}
388 )
389 await self.set_provider_mappings(db_id, provider_mappings, overwrite)
390 self.logger.debug("updated %s in database: (id %s)", update.name, db_id)
391
392 async def radio_mode_base_tracks(
393 self,
394 item: Artist,
395 preferred_provider_instances: list[str] | None = None,
396 ) -> list[Track]:
397 """
398 Get the list of base tracks from the controller used to calculate the dynamic radio.
399
400 :param item: The Artist to get base tracks for.
401 :param preferred_provider_instances: List of preferred provider instance IDs to use.
402 """
403 return await self.tracks(
404 item.item_id,
405 item.provider,
406 in_library_only=False,
407 )
408
409 async def match_provider(
410 self, db_artist: Artist, provider: MusicProvider, strict: bool = True
411 ) -> list[ProviderMapping]:
412 """
413 Try to find match on (streaming) provider for the provided (database) artist.
414
415 This is used to link objects of different providers/qualities together.
416 """
417 self.logger.debug("Trying to match artist %s on provider %s", db_artist.name, provider.name)
418 matches: list[ProviderMapping] = []
419 # try to get a match with some reference tracks of this artist
420 ref_tracks = await self.mass.music.artists.tracks(db_artist.item_id, db_artist.provider)
421 if len(ref_tracks) < 10:
422 # fetch reference tracks from provider(s) attached to the artist
423 for provider_mapping in db_artist.provider_mappings:
424 with contextlib.suppress(ProviderUnavailableError, MediaNotFoundError):
425 ref_tracks += await self.mass.music.artists.tracks(
426 provider_mapping.item_id, provider_mapping.provider_instance
427 )
428 for ref_track in ref_tracks:
429 search_str = f"{db_artist.name} - {ref_track.name}"
430 search_results = await self.mass.music.tracks.search(search_str, provider.domain)
431 for search_result_item in search_results:
432 if not compare_strings(search_result_item.name, ref_track.name, strict=strict):
433 continue
434 # get matching artist from track
435 for search_item_artist in search_result_item.artists:
436 if not compare_strings(search_item_artist.name, db_artist.name, strict=strict):
437 continue
438 # 100% track match
439 # get full artist details so we have all metadata
440 prov_artist = await self.get_provider_item(
441 search_item_artist.item_id,
442 search_item_artist.provider,
443 fallback=search_item_artist,
444 )
445 # 100% match
446 matches.extend(prov_artist.provider_mappings)
447 if matches:
448 return matches
449 # try to get a match with some reference albums of this artist
450 ref_albums = await self.mass.music.artists.albums(db_artist.item_id, db_artist.provider)
451 if len(ref_albums) < 10:
452 # fetch reference albums from provider(s) attached to the artist
453 for provider_mapping in db_artist.provider_mappings:
454 with contextlib.suppress(ProviderUnavailableError, MediaNotFoundError):
455 ref_albums += await self.mass.music.artists.albums(
456 provider_mapping.item_id, provider_mapping.provider_instance
457 )
458 for ref_album in ref_albums:
459 if ref_album.album_type == AlbumType.COMPILATION:
460 continue
461 if not ref_album.artists:
462 continue
463 search_str = f"{db_artist.name} - {ref_album.name}"
464 search_result_albums = await self.mass.music.albums.search(search_str, provider.domain)
465 for search_result_album in search_result_albums:
466 if not search_result_album.artists:
467 continue
468 if not compare_strings(search_result_album.name, ref_album.name, strict=strict):
469 continue
470 # artist must match 100%
471 if not compare_artist(db_artist, search_result_album.artists[0], strict=strict):
472 continue
473 # 100% match
474 # get full artist details so we have all metadata
475 prov_artist = await self.get_provider_item(
476 search_result_album.artists[0].item_id,
477 search_result_album.artists[0].provider,
478 fallback=search_result_album.artists[0],
479 )
480 matches.extend(prov_artist.provider_mappings)
481 if matches:
482 return matches
483 if not matches:
484 self.logger.debug(
485 "Could not find match for Artist %s on provider %s",
486 db_artist.name,
487 provider.name,
488 )
489 return matches
490
491 async def match_providers(self, db_artist: Artist) -> None:
492 """Try to find matching artists on all providers for the provided (database) item_id.
493
494 This is used to link objects of different providers together.
495 """
496 if db_artist.provider != "library":
497 return # Matching only supported for database items
498
499 # try to find match on all providers
500
501 cur_provider_domains = {
502 x.provider_domain for x in db_artist.provider_mappings if x.available
503 }
504 for provider in self.mass.music.providers:
505 if provider.domain in cur_provider_domains:
506 continue
507 if ProviderFeature.SEARCH not in provider.supported_features:
508 continue
509 if not provider.library_supported(MediaType.ARTIST):
510 continue
511 if not provider.is_streaming_provider:
512 # matching on unique providers is pointless as they push (all) their content to MA
513 continue
514 if match := await self.match_provider(db_artist, provider):
515 # 100% match, we update the db with the additional provider mapping(s)
516 await self.add_provider_mappings(db_artist.item_id, match)
517 cur_provider_domains.add(provider.domain)
518
519 def artist_from_item_mapping(self, item: ItemMapping) -> Artist:
520 """Create an Artist object from an ItemMapping object."""
521 domain, instance_id = None, None
522 if prov := self.mass.get_provider(item.provider):
523 domain = prov.domain
524 instance_id = prov.instance_id
525 return Artist.from_dict(
526 {
527 **item.to_dict(),
528 "provider_mappings": [
529 {
530 "item_id": item.item_id,
531 "provider_domain": domain,
532 "provider_instance": instance_id,
533 "available": item.available,
534 }
535 ],
536 }
537 )
538