/
/
/
1"""Manage MediaItems of type Album."""
2
3from __future__ import annotations
4
5import contextlib
6from collections.abc import Iterable
7from typing import TYPE_CHECKING, Any, cast
8
9from music_assistant_models.enums import AlbumType, MediaType, ProviderFeature
10from music_assistant_models.errors import InvalidDataError, MediaNotFoundError, MusicAssistantError
11from music_assistant_models.media_items import (
12 Album,
13 Artist,
14 ItemMapping,
15 MediaItemImage,
16 ProviderMapping,
17 Track,
18 UniqueList,
19)
20
21from music_assistant.constants import DB_TABLE_ALBUM_ARTISTS, DB_TABLE_ALBUM_TRACKS, DB_TABLE_ALBUMS
22from music_assistant.controllers.media.base import MediaControllerBase
23from music_assistant.controllers.webserver.helpers.auth_middleware import get_current_user
24from music_assistant.helpers.compare import (
25 compare_album,
26 compare_artists,
27 compare_media_item,
28 create_safe_string,
29 loose_compare_strings,
30)
31from music_assistant.helpers.database import UNSET
32from music_assistant.helpers.json import serialize_to_json
33from music_assistant.models.music_provider import MusicProvider
34
35if TYPE_CHECKING:
36 from music_assistant import MusicAssistant
37
38
39class AlbumsController(MediaControllerBase[Album]):
40 """Controller managing MediaItems of type Album."""
41
42 db_table = DB_TABLE_ALBUMS
43 media_type = MediaType.ALBUM
44 item_cls = Album
45
46 def __init__(self, mass: MusicAssistant) -> None:
47 """Initialize class."""
48 super().__init__(mass)
49 self.base_query = """
50 SELECT
51 albums.*,
52 (SELECT JSON_GROUP_ARRAY(
53 json_object(
54 'item_id', album_pm.provider_item_id,
55 'provider_domain', album_pm.provider_domain,
56 'provider_instance', album_pm.provider_instance,
57 'available', album_pm.available,
58 'audio_format', json(album_pm.audio_format),
59 'url', album_pm.url,
60 'details', album_pm.details,
61 'in_library', album_pm.in_library,
62 'is_unique', album_pm.is_unique
63 )) FROM provider_mappings album_pm WHERE album_pm.item_id = albums.item_id AND album_pm.media_type = 'album') AS provider_mappings,
64 (SELECT JSON_GROUP_ARRAY(
65 json_object(
66 'item_id', artists.item_id,
67 'provider', 'library',
68 'name', artists.name,
69 'sort_name', artists.sort_name,
70 'media_type', 'artist'
71 )) FROM artists JOIN album_artists on album_artists.album_id = albums.item_id WHERE artists.item_id = album_artists.artist_id) AS artists
72 FROM albums""" # noqa: E501
73 # register (extra) api handlers
74 api_base = self.api_base
75 self.mass.register_api_command(f"music/{api_base}/album_tracks", self.tracks)
76 self.mass.register_api_command(f"music/{api_base}/album_versions", self.versions)
77
78 async def get(
79 self,
80 item_id: str,
81 provider_instance_id_or_domain: str,
82 recursive: bool = True,
83 ) -> Album:
84 """Return (full) details for a single media item."""
85 album = await super().get(
86 item_id,
87 provider_instance_id_or_domain,
88 )
89 if not recursive:
90 return album
91
92 # append artist details to full album item (resolve ItemMappings)
93 album_artists: UniqueList[Artist | ItemMapping] = UniqueList()
94 for artist in album.artists:
95 if not isinstance(artist, ItemMapping):
96 album_artists.append(artist)
97 continue
98 with contextlib.suppress(MediaNotFoundError):
99 album_artists.append(
100 await self.mass.music.artists.get(
101 artist.item_id,
102 artist.provider,
103 )
104 )
105 album.artists = album_artists
106 return album
107
108 async def library_items(
109 self,
110 favorite: bool | None = None,
111 search: str | None = None,
112 limit: int = 500,
113 offset: int = 0,
114 order_by: str = "sort_name",
115 provider: str | list[str] | None = None,
116 album_types: list[AlbumType] | None = None,
117 **kwargs: Any,
118 ) -> list[Album]:
119 """Get in-database albums.
120
121 :param favorite: Filter by favorite status.
122 :param search: Filter by search query.
123 :param limit: Maximum number of items to return.
124 :param offset: Number of items to skip.
125 :param order_by: Order by field (e.g. 'sort_name', 'timestamp_added').
126 :param provider: Filter by provider instance ID (single string or list).
127 :param album_types: Filter by album types.
128 """
129 extra_query_params: dict[str, Any] = {}
130 extra_query_parts: list[str] = []
131 extra_join_parts: list[str] = []
132 artist_table_joined = False
133 # optional album type filter
134 if album_types:
135 extra_query_parts.append("albums.album_type IN :album_types")
136 extra_query_params["album_types"] = [x.value for x in album_types]
137 if order_by and "artist_name" in order_by:
138 # join artist table to allow sorting on artist name
139 extra_join_parts.append(
140 "JOIN album_artists ON album_artists.album_id = albums.item_id "
141 "JOIN artists ON artists.item_id = album_artists.artist_id "
142 )
143 artist_table_joined = True
144 if search and " - " in search:
145 # handle combined artist + title search
146 artist_str, title_str = search.split(" - ", 1)
147 search = None
148 title_str = create_safe_string(title_str, True, True)
149 artist_str = create_safe_string(artist_str, True, True)
150 extra_query_parts.append("albums.search_name LIKE :search_title")
151 extra_query_params["search_title"] = f"%{title_str}%"
152 # use join with artists table to filter on artist name
153 extra_join_parts.append(
154 "JOIN album_artists ON album_artists.album_id = albums.item_id "
155 "JOIN artists ON artists.item_id = album_artists.artist_id "
156 "AND artists.search_name LIKE :search_artist"
157 if not artist_table_joined
158 else "AND artists.search_name LIKE :search_artist"
159 )
160 artist_table_joined = True
161 extra_query_params["search_artist"] = f"%{artist_str}%"
162 result = await self.get_library_items_by_query(
163 favorite=favorite,
164 search=search,
165 limit=limit,
166 offset=offset,
167 order_by=order_by,
168 provider_filter=self._ensure_provider_filter(provider),
169 extra_query_parts=extra_query_parts,
170 extra_query_params=extra_query_params,
171 extra_join_parts=extra_join_parts,
172 in_library_only=True,
173 )
174
175 # Calculate how many more items we need to reach the original limit
176 remaining_limit = limit - len(result)
177
178 if search and len(result) < 25 and not offset and remaining_limit > 0:
179 # append artist items to result
180 search = create_safe_string(search, True, True)
181 extra_join_parts.append(
182 "JOIN album_artists ON album_artists.album_id = albums.item_id "
183 "JOIN artists ON artists.item_id = album_artists.artist_id "
184 "AND artists.search_name LIKE :search_artist"
185 if not artist_table_joined
186 else "AND artists.search_name LIKE :search_artist"
187 )
188 extra_query_params["search_artist"] = f"%{search}%"
189 existing_uris = {item.uri for item in result}
190
191 for album in await self.get_library_items_by_query(
192 favorite=favorite,
193 search=None,
194 limit=remaining_limit,
195 order_by=order_by,
196 provider_filter=self._ensure_provider_filter(provider),
197 extra_query_parts=extra_query_parts,
198 extra_query_params=extra_query_params,
199 extra_join_parts=extra_join_parts,
200 in_library_only=True,
201 ):
202 # prevent duplicates (when artist is also in the title)
203 if album.uri not in existing_uris:
204 result.append(album)
205 # Stop if we've reached the original limit
206 if len(result) >= limit:
207 break
208 return result
209
210 async def library_count(
211 self, favorite_only: bool = False, album_types: list[AlbumType] | None = None
212 ) -> int:
213 """Return the total number of items in the library."""
214 sql_query = f"SELECT item_id FROM {self.db_table}"
215 query_parts: list[str] = []
216 query_params: dict[str, Any] = {}
217 if favorite_only:
218 query_parts.append("favorite = 1")
219 if album_types:
220 query_parts.append("albums.album_type IN :album_types")
221 query_params["album_types"] = [x.value for x in album_types]
222 if query_parts:
223 sql_query += f" WHERE {' AND '.join(query_parts)}"
224 return await self.mass.music.database.get_count_from_query(sql_query, query_params)
225
226 async def remove_item_from_library(self, item_id: str | int, recursive: bool = True) -> None:
227 """Delete item from the library(database)."""
228 db_id = int(item_id) # ensure integer
229 # recursively also remove album tracks
230 for db_track in await self.get_library_album_tracks(db_id):
231 if not recursive:
232 raise MusicAssistantError("Album still has tracks linked")
233 with contextlib.suppress(MediaNotFoundError):
234 await self.mass.music.tracks.remove_item_from_library(db_track.item_id)
235 # delete entry(s) from albumtracks table
236 await self.mass.music.database.delete(DB_TABLE_ALBUM_TRACKS, {"album_id": db_id})
237 # delete entry(s) from album artists table
238 await self.mass.music.database.delete(DB_TABLE_ALBUM_ARTISTS, {"album_id": db_id})
239 # delete the album itself from db
240 # this will raise if the item still has references and recursive is false
241 await super().remove_item_from_library(item_id)
242
243 async def tracks(
244 self,
245 item_id: str,
246 provider_instance_id_or_domain: str,
247 in_library_only: bool = False,
248 ) -> list[Track]:
249 """Return album tracks for the given provider album id."""
250 # always check if we have a library item for this album
251 library_album = await self.get_library_item_by_prov_id(
252 item_id, provider_instance_id_or_domain
253 )
254 if not library_album:
255 album_tracks = await self._get_provider_album_tracks(
256 item_id, provider_instance_id_or_domain
257 )
258 if album_tracks and not album_tracks[0].image:
259 # set album image from provider album if not present on tracks
260 prov_album = await self.get_provider_item(item_id, provider_instance_id_or_domain)
261 if prov_album.image:
262 for track in album_tracks:
263 if not track.image:
264 track.metadata.add_image(prov_album.image)
265 return album_tracks
266
267 db_items = await self.get_library_album_tracks(library_album.item_id)
268 result: list[Track] = list(db_items)
269 if in_library_only:
270 # return in-library items only
271 return sorted(db_items, key=lambda x: (x.disc_number, x.track_number))
272
273 # return all (unique) items from all providers
274 # because we are returning the items from all providers combined,
275 # we need to make sure that we don't return duplicates
276 unique_ids: set[str] = {f"{x.disc_number}.{x.track_number}" for x in db_items}
277 unique_ids.update({f"{x.name.lower()}.{x.version.lower()}" for x in db_items})
278 for db_item in db_items:
279 unique_ids.update(x.item_id for x in db_item.provider_mappings)
280 user = get_current_user()
281 user_provider_filter = user.provider_filter if user and user.provider_filter else None
282 for provider_mapping in library_album.provider_mappings:
283 if (
284 user_provider_filter
285 and provider_mapping.provider_instance not in user_provider_filter
286 ):
287 continue
288 provider_tracks = await self._get_provider_album_tracks(
289 provider_mapping.item_id, provider_mapping.provider_instance
290 )
291 for provider_track in provider_tracks:
292 # In some cases (looking at you YTM) the disc/track number is not obtained from
293 # library_tracks. Ensure to update the disc/track number when interacting with
294 # album tracks
295 db_track = next(
296 (
297 x
298 for x in db_items
299 if x.sort_name == provider_track.sort_name
300 and x.version == provider_track.version
301 ),
302 None,
303 )
304 if (
305 db_track
306 and db_track.track_number == 0
307 and db_track.track_number != provider_track.track_number
308 ):
309 await self._set_album_track(
310 db_id=int(library_album.item_id),
311 db_track_id=int(db_track.item_id),
312 track=provider_track,
313 )
314 if provider_track.item_id in unique_ids:
315 continue
316 unique_id = f"{provider_track.disc_number}.{provider_track.track_number}"
317 if unique_id in unique_ids:
318 continue
319 unique_id = f"{provider_track.name.lower()}.{provider_track.version.lower()}"
320 if unique_id in unique_ids:
321 continue
322 unique_ids.add(unique_id)
323 provider_track.album = library_album
324 # always prefer album image
325 album_images = [library_album.image] if library_album.image else []
326 track_images: list[MediaItemImage] = provider_track.metadata.images or []
327 provider_track.metadata.images = UniqueList(album_images + track_images)
328 result.append(provider_track)
329 # NOTE: we need to return the results sorted on disc/track here
330 # to ensure the correct order at playback
331 return sorted(result, key=lambda x: (x.disc_number, x.track_number))
332
333 async def versions(
334 self,
335 item_id: str,
336 provider_instance_id_or_domain: str,
337 ) -> UniqueList[Album]:
338 """Return all versions of an album we can find on all providers."""
339 album = await self.get_provider_item(item_id, provider_instance_id_or_domain)
340 search_query = f"{album.artists[0].name} - {album.name}" if album.artists else album.name
341 result: UniqueList[Album] = UniqueList()
342 for provider_id in self.mass.music.get_unique_providers():
343 provider = self.mass.get_provider(provider_id)
344 if not provider or not isinstance(provider, MusicProvider):
345 continue
346 if not provider.library_supported(MediaType.ALBUM):
347 continue
348 result.extend(
349 prov_item
350 for prov_item in await self.search(search_query, provider_id)
351 if loose_compare_strings(album.name, prov_item.name)
352 and compare_artists(prov_item.artists, album.artists, any_match=True)
353 # make sure that the 'base' version is NOT included
354 and not album.provider_mappings.intersection(prov_item.provider_mappings)
355 )
356 return result
357
358 async def get_library_album_tracks(
359 self,
360 item_id: str | int,
361 ) -> list[Track]:
362 """Return in-database album tracks for the given database album."""
363 db_id = int(item_id) # ensure integer
364 return await self.mass.music.tracks.get_library_items_by_query(
365 extra_query_parts=["WHERE album_tracks.album_id = :album_id"],
366 extra_query_params={"album_id": db_id},
367 )
368
369 async def add_item_mapping_as_album_to_library(self, item: ItemMapping) -> Album:
370 """
371 Add an ItemMapping as an Album to the library.
372
373 This is only used in special occasions as is basically adds an album
374 to the db without a lot of mandatory data, such as artists.
375 """
376 album = self.album_from_item_mapping(item)
377 return await self.add_item_to_library(album)
378
379 async def _add_library_item(self, item: Album, overwrite_existing: bool = False) -> int:
380 """Add a new record to the database."""
381 if not isinstance(item, Album): # TODO: Remove this once the codebase is fully typed
382 msg = "Not a valid Album object (ItemMapping can not be added to db)" # type: ignore[unreachable]
383 raise InvalidDataError(msg)
384 db_id = await self.mass.music.database.insert(
385 self.db_table,
386 {
387 "name": item.name,
388 "sort_name": item.sort_name,
389 "version": item.version,
390 "favorite": item.favorite,
391 "album_type": item.album_type,
392 "year": item.year,
393 "metadata": serialize_to_json(item.metadata),
394 "external_ids": serialize_to_json(item.external_ids),
395 "search_name": create_safe_string(item.name, True, True),
396 "search_sort_name": create_safe_string(item.sort_name or "", True, True),
397 "timestamp_added": int(item.date_added.timestamp()) if item.date_added else UNSET,
398 },
399 )
400 # update/set provider_mappings table
401 await self.set_provider_mappings(db_id, item.provider_mappings)
402 # set track artist(s)
403 await self._set_album_artists(db_id, item.artists)
404 self.logger.debug("added %s to database (id: %s)", item.name, db_id)
405 return db_id
406
407 async def _update_library_item(
408 self, item_id: str | int, update: Album, overwrite: bool = False
409 ) -> None:
410 """Update existing record in the database."""
411 db_id = int(item_id) # ensure integer
412 cur_item = await self.get_library_item(db_id)
413 metadata = update.metadata if overwrite else cur_item.metadata.update(update.metadata)
414 if getattr(update, "album_type", AlbumType.UNKNOWN) != AlbumType.UNKNOWN:
415 album_type = update.album_type
416 else:
417 album_type = cur_item.album_type
418 cur_item.external_ids.update(update.external_ids)
419 name = update.name if overwrite else cur_item.name
420 sort_name = update.sort_name if overwrite else cur_item.sort_name or update.sort_name
421 await self.mass.music.database.update(
422 self.db_table,
423 {"item_id": db_id},
424 {
425 "name": name,
426 "sort_name": sort_name,
427 "version": update.version if overwrite else cur_item.version or update.version,
428 "year": update.year if overwrite else cur_item.year or update.year,
429 "album_type": album_type.value,
430 "metadata": serialize_to_json(metadata),
431 "external_ids": serialize_to_json(
432 update.external_ids if overwrite else cur_item.external_ids
433 ),
434 "search_name": create_safe_string(name, True, True),
435 "search_sort_name": create_safe_string(sort_name or "", True, True),
436 "timestamp_added": int(update.date_added.timestamp())
437 if update.date_added
438 else UNSET,
439 },
440 )
441 # update/set provider_mappings table
442 provider_mappings = (
443 update.provider_mappings
444 if overwrite
445 else {*update.provider_mappings, *cur_item.provider_mappings}
446 )
447 await self.set_provider_mappings(db_id, provider_mappings, overwrite)
448 # set album artist(s)
449 artists = update.artists if overwrite else cur_item.artists + update.artists
450 await self._set_album_artists(db_id, artists, overwrite=overwrite)
451 self.logger.debug("updated %s in database: (id %s)", update.name, db_id)
452
453 async def _get_provider_album_tracks(
454 self, item_id: str, provider_instance_id_or_domain: str
455 ) -> list[Track]:
456 """Return album tracks for the given provider album id."""
457 if prov := self.mass.get_provider(provider_instance_id_or_domain):
458 prov = cast("MusicProvider", prov)
459 return await prov.get_album_tracks(item_id)
460 return []
461
462 async def radio_mode_base_tracks(
463 self,
464 item: Album,
465 preferred_provider_instances: list[str] | None = None,
466 ) -> list[Track]:
467 """
468 Get the list of base tracks from the controller used to calculate the dynamic radio.
469
470 :param item: The Album to get base tracks for.
471 :param preferred_provider_instances: List of preferred provider instance IDs to use.
472 """
473 return await self.tracks(item.item_id, item.provider, in_library_only=False)
474
475 async def _set_album_artists(
476 self,
477 db_id: int,
478 artists: Iterable[Artist | ItemMapping],
479 overwrite: bool = False,
480 ) -> None:
481 """Store Album Artists."""
482 if overwrite:
483 # on overwrite, clear the album_artists table first
484 await self.mass.music.database.delete(
485 DB_TABLE_ALBUM_ARTISTS,
486 {
487 "album_id": db_id,
488 },
489 )
490 for artist in artists:
491 await self._set_album_artist(db_id, artist=artist, overwrite=overwrite)
492
493 async def _set_album_artist(
494 self, db_id: int, artist: Artist | ItemMapping, overwrite: bool = False
495 ) -> ItemMapping:
496 """Store Album Artist info."""
497 db_artist: Artist | ItemMapping | None = None
498 if artist.provider == "library":
499 db_artist = artist
500 elif existing := await self.mass.music.artists.get_library_item_by_prov_id(
501 artist.item_id, artist.provider
502 ):
503 db_artist = existing
504
505 if not db_artist or overwrite:
506 # Convert ItemMapping to Artist if needed
507 artist_to_add = (
508 self.mass.music.artists.artist_from_item_mapping(artist)
509 if isinstance(artist, ItemMapping)
510 else artist
511 )
512 db_artist = await self.mass.music.artists.add_item_to_library(
513 artist_to_add, overwrite_existing=overwrite
514 )
515 # write (or update) record in album_artists table
516 await self.mass.music.database.insert_or_replace(
517 DB_TABLE_ALBUM_ARTISTS,
518 {
519 "album_id": db_id,
520 "artist_id": int(db_artist.item_id),
521 },
522 )
523 return ItemMapping.from_item(db_artist)
524
525 async def _set_album_track(self, db_id: int, db_track_id: int, track: Track) -> None:
526 """Store Album Track info."""
527 # write (or update) record in album_tracks table
528 await self.mass.music.database.insert_or_replace(
529 DB_TABLE_ALBUM_TRACKS,
530 {
531 "album_id": db_id,
532 "track_id": db_track_id,
533 "track_number": track.track_number,
534 "disc_number": track.disc_number,
535 },
536 )
537
538 async def match_provider(
539 self, db_album: Album, provider: MusicProvider, strict: bool = True
540 ) -> list[ProviderMapping]:
541 """
542 Try to find match on (streaming) provider for the provided (database) album.
543
544 This is used to link objects of different providers/qualities together.
545 """
546 self.logger.debug("Trying to match album %s on provider %s", db_album.name, provider.name)
547 matches: list[ProviderMapping] = []
548 artist_name = db_album.artists[0].name
549 search_str = f"{artist_name} - {db_album.name}"
550 search_result = await self.search(search_str, provider.instance_id)
551 for search_result_item in search_result:
552 if not search_result_item.available:
553 continue
554 if not compare_media_item(db_album, search_result_item, strict=strict):
555 continue
556 # we must fetch the full album version, search results can be simplified objects
557 prov_album = await self.get_provider_item(
558 search_result_item.item_id,
559 search_result_item.provider,
560 fallback=search_result_item,
561 )
562 if compare_album(db_album, prov_album, strict=strict):
563 # 100% match
564 matches.extend(prov_album.provider_mappings)
565 if not matches:
566 self.logger.debug(
567 "Could not find match for Album %s on provider %s",
568 db_album.name,
569 provider.name,
570 )
571 return matches
572
573 async def match_providers(self, db_album: Album) -> None:
574 """Try to find match on all (streaming) providers for the provided (database) album.
575
576 This is used to link objects of different providers/qualities together.
577 """
578 if db_album.provider != "library":
579 return # Matching only supported for database items
580 if not db_album.artists:
581 return # guard
582
583 # try to find match on all providers
584 processed_domains = set()
585 for provider in self.mass.music.providers:
586 if provider.domain in processed_domains:
587 continue
588 if ProviderFeature.SEARCH not in provider.supported_features:
589 continue
590 if not provider.library_supported(MediaType.ALBUM):
591 continue
592 if not provider.is_streaming_provider:
593 # matching on unique providers is pointless as they push (all) their content to MA
594 continue
595 if match := await self.match_provider(db_album, provider):
596 # 100% match, we update the db with the additional provider mapping(s)
597 await self.add_provider_mappings(db_album.item_id, match)
598 processed_domains.add(provider.domain)
599
600 def album_from_item_mapping(self, item: ItemMapping) -> Album:
601 """Create an Album object from an ItemMapping object."""
602 domain, instance_id = None, None
603 if prov := self.mass.get_provider(item.provider):
604 domain = prov.domain
605 instance_id = prov.instance_id
606 return Album.from_dict(
607 {
608 **item.to_dict(),
609 "provider_mappings": [
610 {
611 "item_id": item.item_id,
612 "provider_domain": domain,
613 "provider_instance": instance_id,
614 "available": item.available,
615 }
616 ],
617 }
618 )
619