/
/
/
1"""Base (ABC) MediaType specific controller."""
2
3from __future__ import annotations
4
5import asyncio
6import logging
7from abc import ABCMeta, abstractmethod
8from collections.abc import Iterable
9from contextlib import suppress
10from datetime import datetime
11from typing import TYPE_CHECKING, Any, TypeVar, cast, final
12
13from music_assistant_models.enums import EventType, ExternalID, MediaType, ProviderFeature
14from music_assistant_models.errors import (
15 InsufficientPermissions,
16 MediaNotFoundError,
17 ProviderUnavailableError,
18)
19from music_assistant_models.media_items import (
20 AudioFormat,
21 ItemMapping,
22 MediaItemType,
23 ProviderMapping,
24 Track,
25)
26
27from music_assistant.constants import (
28 DB_TABLE_GENRE_MEDIA_ITEM_MAPPING,
29 DB_TABLE_PLAYLOG,
30 DB_TABLE_PROVIDER_MAPPINGS,
31 MASS_LOGGER_NAME,
32)
33from music_assistant.controllers.webserver.helpers.auth_middleware import get_current_user
34from music_assistant.helpers.compare import compare_media_item, create_safe_string
35from music_assistant.helpers.database import UNSET
36from music_assistant.helpers.json import json_loads, serialize_to_json
37from music_assistant.helpers.util import guard_single_request, parse_optional_bool
38
39if TYPE_CHECKING:
40 from collections.abc import AsyncGenerator, Mapping
41
42 from music_assistant import MusicAssistant
43 from music_assistant.models.music_provider import MusicProvider
44
45
# Module-level type variable standing for a concrete media item type;
# it is bounded by MediaItemType (given as a forward reference string).
ItemCls = TypeVar("ItemCls", bound="MediaItemType")
47
48
# Field/column names grouped under the label "JSON keys".
# NOTE(review): presumably these are the columns stored as JSON text that need
# decoding when a database row is parsed - the consumer is not visible here, confirm.
JSON_KEYS = (
    "artists",
    "track_album",
    "metadata",
    "provider_mappings",
    "external_ids",
    "narrators",
    "authors",
    "genre_aliases",
)
59
# Maps a sort key name to the SQL ordering fragment (column + direction) it stands for.
# NOTE(review): presumably looked up with the `order_by` argument of the query
# helpers - the lookup itself is not visible here, confirm.
SORT_KEYS = {
    # sqlite has no builtin support for natural sorting,
    # so we have to use an additional column for this.
    # this also improves searching and sorting performance
    "name": "search_name ASC",
    "name_desc": "search_name DESC",
    "duration": "duration ASC",
    "duration_desc": "duration DESC",
    "sort_name": "search_sort_name ASC",
    "sort_name_desc": "search_sort_name DESC",
    "timestamp_added": "timestamp_added ASC",
    "timestamp_added_desc": "timestamp_added DESC",
    "timestamp_modified": "timestamp_modified ASC",
    "timestamp_modified_desc": "timestamp_modified DESC",
    "last_played": "last_played ASC",
    "last_played_desc": "last_played DESC",
    "play_count": "play_count ASC",
    "play_count_desc": "play_count DESC",
    "year": "year ASC",
    "year_desc": "year DESC",
    "position": "position ASC",
    "position_desc": "position DESC",
    "artist_name": "artists.search_name ASC",
    "artist_name_desc": "artists.search_name DESC",
    "random": "RANDOM()",
    "random_play_count": "RANDOM(), play_count ASC",
}
87
88
class MediaControllerBase[ItemCls: "MediaItemType"](metaclass=ABCMeta):
    """
    Base model for controller managing a MediaType.

    Concrete controllers provide the class attributes declared below and
    implement the abstract methods of this class.
    """

    # the MediaType handled by the concrete controller
    media_type: MediaType
    # model class whose ``from_dict`` is used to build items from database rows
    item_cls: type[MediaItemType]
    # name of the database table that holds the library items of this media type
    db_table: str
95
def __init__(self, mass: MusicAssistant) -> None:
    """
    Set up the controller: logger, base SQL query, API commands and the add-lock.

    :param mass: The MusicAssistant instance this controller is attached to.
    """
    self.mass = mass
    table = self.db_table
    media_type_value = self.media_type.value
    self.logger = logging.getLogger(f"{MASS_LOGGER_NAME}.music.{media_type_value}")
    # base SELECT: every column of the item table plus the item's provider
    # mappings, aggregated into one JSON array by a correlated subquery
    self.base_query = f"""
        SELECT
            {table}.*,
            (SELECT JSON_GROUP_ARRAY(
                json_object(
                'item_id', provider_mappings.provider_item_id,
                'provider_domain', provider_mappings.provider_domain,
                'provider_instance', provider_mappings.provider_instance,
                'available', provider_mappings.available,
                'audio_format', json(provider_mappings.audio_format),
                'url', provider_mappings.url,
                'details', provider_mappings.details,
                'in_library', provider_mappings.in_library,
                'is_unique', provider_mappings.is_unique
                )) FROM provider_mappings WHERE provider_mappings.item_id = {table}.item_id
                AND provider_mappings.media_type = '{media_type_value}') AS provider_mappings
        FROM {table} """  # noqa: E501
    # register (base) api handlers
    api_base = f"{self.media_type}s"
    self.api_base = api_base
    register = self.mass.register_api_command
    register(f"music/{api_base}/count", self.library_count)
    register(f"music/{api_base}/library_items", self.library_items)
    register(f"music/{api_base}/get", self.get)
    # Backward compatibility alias - prefer the generic "get" endpoint
    register(f"music/{api_base}/get_{self.media_type}", self.get, alias=True)
    # mutating commands are restricted to the admin role
    register(f"music/{api_base}/update", self.update_item_in_library, required_role="admin")
    register(f"music/{api_base}/remove", self.remove_item_from_library, required_role="admin")
    # held while a new library record is being inserted (see add_item_to_library)
    self._db_add_lock = asyncio.Lock()
133
@final
async def add_item_to_library(
    self,
    item: ItemCls,
    overwrite_existing: bool = False,
) -> ItemCls:
    """
    Add an item to the library, or update the matching record if one exists.

    :param item: The media item to store.
    :param overwrite_existing: Passed on as ``overwrite`` when an existing record is updated.
    :return: The library item as read back from the database.
    """
    existing_id = await self._get_library_item_by_match(item)
    if existing_id:
        # a matching record is already present: update it in place
        await self._update_library_item(existing_id, item, overwrite=overwrite_existing)
        library_id = existing_id
        event_type = EventType.MEDIA_ITEM_UPDATED
    else:
        # no match: insert a new record while holding the add-lock
        self.mass.music.match_provider_instances(item)
        async with self._db_add_lock:
            library_id = await self._add_library_item(item)
        event_type = EventType.MEDIA_ITEM_ADDED
    # read the final record back and announce it
    stored_item = await self.get_library_item(library_id)
    self.mass.signal_event(event_type, stored_item.uri, stored_item)
    return stored_item
160
@final
async def _get_library_item_by_match(self, item: ItemCls | ItemMapping) -> int | None:
    """
    Find the database id of the library item that corresponds to the given item.

    Lookups are tried in this order: library id, provider mapping(s),
    external ids, exact name / sort name.

    :param item: The media item (or item mapping) to match.
    :return: The database id of the matching library item, or None if nothing matched.
    """
    if item.provider == "library":
        # the item already is a library item: its id is the database id
        return int(item.item_id)
    if isinstance(item, ItemMapping):
        # an ItemMapping carries a single provider reference to look up
        by_prov_id = await self.get_library_item_by_prov_id(item.item_id, item.provider)
        if by_prov_id:
            return int(by_prov_id.item_id)

    # items that carry provider mappings are looked up through those
    mappings = getattr(item, "provider_mappings", None)
    if mappings:
        by_mapping = await self.get_library_item_by_prov_mappings(mappings)
        if by_mapping:
            return int(by_mapping.item_id)
    by_external_id = await self.get_library_item_by_external_ids(item.external_ids)
    # an external id hit only counts when the two items also compare as equal
    if by_external_id and compare_media_item(item, by_external_id):
        return int(by_external_id.item_id)
    # last resort: (exact) name match, confirmed by comparing the items
    name_clause = f"{self.db_table}.name = :name OR {self.db_table}.sort_name = :sort_name"
    candidates = await self.get_library_items_by_query(
        extra_query_parts=[name_clause],
        extra_query_params={"name": item.name, "sort_name": item.sort_name},
    )
    for candidate in candidates:
        if compare_media_item(candidate, item, True):
            return int(candidate.item_id)
    return None
189
@final
async def update_item_in_library(
    self, item_id: str | int, update: ItemCls, overwrite: bool = False
) -> ItemCls:
    """
    Update an existing library record and return the stored result.

    :param item_id: Database id of the library item to update.
    :param update: The item holding the new details.
    :param overwrite: Passed on to the controller specific update routine.
    :return: The library item as read back after the update.
    """
    self.mass.music.match_provider_instances(update)
    await self._update_library_item(item_id, update, overwrite=overwrite)
    # read the record back so the caller and the event get the stored state
    updated_item = await self.get_library_item(item_id)
    self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, updated_item.uri, updated_item)
    return updated_item
205
async def remove_item_from_library(self, item_id: str | int, recursive: bool = True) -> None:
    """
    Delete a library record plus its provider mappings and playlog entries.

    :param item_id: Database id of the library item to delete.
    :param recursive: Not used by this base implementation.
    """
    db_id = int(item_id)  # ensure integer
    library_item = await self.get_library_item(db_id)
    assert library_item, f"Item does not exist: {db_id}"
    database = self.mass.music.database
    media_type = self.media_type.value
    # the item record itself
    await database.delete(self.db_table, {"item_id": db_id})
    # all provider mappings that point to the item
    await database.delete(
        DB_TABLE_PROVIDER_MAPPINGS,
        {"media_type": media_type, "item_id": db_id},
    )
    # playlog entries: first the library entry, then one per provider mapping
    playlog_refs = [(db_id, "library")]
    playlog_refs.extend(
        (prov_mapping.item_id, prov_mapping.provider_instance)
        for prov_mapping in library_item.provider_mappings
    )
    for ref_item_id, ref_provider in playlog_refs:
        await database.delete(
            DB_TABLE_PLAYLOG,
            {
                "media_type": media_type,
                "item_id": ref_item_id,
                "provider": ref_provider,
            },
        )
    # NOTE: this does not delete any references to this item in other records,
    # this is handled/overridden in the mediatype specific controllers
    self.mass.signal_event(EventType.MEDIA_ITEM_DELETED, library_item.uri, library_item)
    self.logger.debug("deleted item with id %s from database", db_id)
243
async def library_count(self, favorite_only: bool = False) -> int:
    """
    Count the items in the library table of this media type.

    :param favorite_only: Only count the rows flagged as favorite.
    """
    database = self.mass.music.database
    if not favorite_only:
        return await database.get_count(self.db_table)
    favorites_query = f"SELECT item_id FROM {self.db_table} WHERE favorite = 1"
    return await database.get_count_from_query(favorites_query)
250
async def library_items(
    self,
    favorite: bool | None = None,
    search: str | None = None,
    limit: int = 500,
    offset: int = 0,
    order_by: str = "sort_name",
    provider: str | list[str] | None = None,
    genre: int | list[int] | None = None,
    **kwargs: Any,
) -> list[ItemCls]:
    """
    Return the library items of this mediatype, filtered and paged.

    :param favorite: Filter by favorite status.
    :param search: Filter by search query.
    :param limit: Maximum number of items to return.
    :param offset: Number of items to skip.
    :param order_by: Order by field (e.g. 'sort_name', 'timestamp_added').
    :param provider: Filter by provider instance ID (single string or list).
    :param genre: Filter by genre id(s).
    :param kwargs: Accepted but not used.
    """
    query_options: dict[str, Any] = {
        "favorite": favorite,
        "search": search,
        "limit": limit,
        "offset": offset,
        "order_by": order_by,
        "provider_filter": self._ensure_provider_filter(provider),
        "genre_ids": genre,
        # this endpoint only ever returns items that are in the library
        "in_library_only": True,
    }
    return await self.get_library_items_by_query(**query_options)
283
async def iter_library_items(
    self,
    favorite: bool | None = None,
    search: str | None = None,
    order_by: str = "sort_name",
    provider: str | list[str] | None = None,
    genre: int | list[int] | None = None,
    library_items_only: bool = True,
) -> AsyncGenerator[ItemCls, None]:
    """
    Yield all matching in-database items, fetched in pages of 500.

    :param favorite: Filter by favorite status.
    :param search: Filter by search query.
    :param order_by: Order by field.
    :param provider: Provider filter (a single value is wrapped into a list).
    :param genre: Filter by genre id(s).
    :param library_items_only: Passed on as ``in_library_only``.
    """
    page_size = 500
    if provider is None:
        provider_filter = None
    elif isinstance(provider, list):
        provider_filter = provider
    else:
        provider_filter = [provider]
    position = 0
    has_more = True
    while has_more:
        page = await self.get_library_items_by_query(
            favorite=favorite,
            search=search,
            genre_ids=genre,
            limit=page_size,
            offset=position,
            order_by=order_by,
            provider_filter=provider_filter,
            in_library_only=library_items_only,
        )
        for item in page:
            yield item
        # a page that is not full means there is nothing left to fetch
        has_more = len(page) >= page_size
        position += page_size
316
async def get(
    self,
    item_id: str,
    provider_instance_id_or_domain: str,
) -> ItemCls:
    """
    Return (full) details for a single media item.

    The library item is preferred when one exists for the given provider id;
    otherwise the details are fetched through ``get_provider_item``.

    :param item_id: The id of the item (as known by the given provider).
    :param provider_instance_id_or_domain: Provider instance id or domain.
    """
    library_item = await self.get_library_item_by_prov_id(
        item_id,
        provider_instance_id_or_domain,
    )
    if not library_item:
        # not in the library: grab full details from the provider
        return await self.get_provider_item(
            item_id,
            provider_instance_id_or_domain,
        )
    # schedule a refresh of the metadata on access of the item
    # e.g. the item is being played or opened in the UI
    assert library_item.uri is not None
    self.mass.metadata.schedule_update_metadata(library_item.uri)
    return library_item
338
async def search(
    self,
    search_query: str,
    provider_instance_id_or_domain: str,
    limit: int = 25,
) -> list[ItemCls]:
    """
    Search the library or a provider for items of this media type.

    :param search_query: The search string.
    :param provider_instance_id_or_domain: "library" or a provider instance id/domain.
    :param limit: Maximum number of results.
    :return: The matching items; an empty list when the provider is unknown
        or does not support searching this media type.
    """
    # create safe search string
    safe_query = search_query.replace("/", " ").replace("'", "")
    if provider_instance_id_or_domain == "library":
        return await self.library_items(search=safe_query, limit=limit)
    provider = self.mass.get_provider(provider_instance_id_or_domain)
    if not provider:
        return []
    music_provider = cast("MusicProvider", provider)
    can_search = ProviderFeature.SEARCH in music_provider.supported_features
    # assume library supported also means that this mediatype is supported
    if not can_search or not music_provider.library_supported(self.media_type):
        return []
    results = await music_provider.search(
        safe_query,
        [self.media_type],
        limit,
    )
    # name of the search result attribute that holds this media type's items
    result_attr = {
        MediaType.ARTIST: "artists",
        MediaType.ALBUM: "albums",
        MediaType.TRACK: "tracks",
        MediaType.PLAYLIST: "playlists",
        MediaType.AUDIOBOOK: "audiobooks",
        MediaType.PODCAST: "podcasts",
        MediaType.RADIO: "radio",
    }.get(self.media_type)
    if result_attr is None:
        return []
    return cast("list[ItemCls]", getattr(results, result_attr))
380
async def get_library_item(self, item_id: int | str) -> ItemCls:
    """
    Get a single library item by its database id.

    :param item_id: Database id (converted to int).
    :raises MediaNotFoundError: When no library item has this id.
    """
    db_id = int(item_id)  # ensure integer
    matches = await self.get_library_items_by_query(
        extra_query_parts=[f"WHERE {self.db_table}.item_id = :item_id"],
        extra_query_params={"item_id": db_id},
        in_library_only=False,
    )
    if matches:
        return matches[0]
    raise MediaNotFoundError(f"{self.media_type.value} not found in library: {db_id}")
393
async def get_library_item_by_prov_id(
    self,
    item_id: str,
    provider_instance_id_or_domain: str,
) -> ItemCls | None:
    """
    Get the library item that is linked to the given provider item.

    :param item_id: The id of the item as known by the provider.
    :param provider_instance_id_or_domain: Provider instance id or domain ("library" allowed).
    :return: The first matching library item, or None when there is no match.
    """
    assert item_id
    assert provider_instance_id_or_domain
    if provider_instance_id_or_domain == "library":
        # the id already is a library id
        return await self.get_library_item(item_id)
    matches = await self.get_library_items_by_prov_id(
        provider_instance_id_or_domain=provider_instance_id_or_domain,
        provider_item_id=item_id,
    )
    for match in matches:
        return match
    return None
410
@final
async def get_library_item_by_prov_mappings(
    self,
    provider_mappings: Iterable[ProviderMapping],
) -> ItemCls | None:
    """
    Get the library item that is linked to (one of) the given provider mappings.

    All mappings are first tried by provider instance; only when none of them
    matches, they are tried again by provider domain.

    :param provider_mappings: The provider mappings to look up.
    :return: The first matching library item, or None when there is no match.
    """
    # materialize once: the mappings are walked twice below, which would
    # silently skip the second pass for a one-shot iterator/generator
    mappings = tuple(provider_mappings)
    # always prefer provider instance first
    for mapping in mappings:
        for item in await self.get_library_items_by_prov_id(
            provider_instance=mapping.provider_instance,
            provider_item_id=mapping.item_id,
        ):
            return item
    # check by domain too
    for mapping in mappings:
        for item in await self.get_library_items_by_prov_id(
            provider_domain=mapping.provider_domain,
            provider_item_id=mapping.item_id,
        ):
            return item
    return None
432
@final
async def get_library_item_by_external_id(
    self, external_id: str, external_id_type: ExternalID | None = None
) -> ItemCls | None:
    """
    Get the library item that carries the given external id.

    The lookup is a LIKE match on the ``external_ids`` column of the item table.

    :param external_id: The external id value to look for.
    :param external_id_type: When given, the type is made part of the match pattern.
    :return: The first matching library item, or None when there is no match.
    """
    if external_id_type:
        like_pattern = f'%"{external_id_type}","{external_id}"%'
    else:
        like_pattern = f'%"{external_id}"%'
    matches = await self.get_library_items_by_query(
        extra_query_parts=[f"{self.db_table}.external_ids LIKE :external_id_str"],
        extra_query_params={"external_id_str": like_pattern},
    )
    for match in matches:
        return match
    return None
449
@final
async def get_library_item_by_external_ids(
    self, external_ids: set[tuple[ExternalID, str]]
) -> ItemCls | None:
    """
    Get the library item that carries (one of) the given external ids.

    :param external_ids: (type, value) pairs, tried one by one.
    :return: The library item of the first pair that gives a match, else None.
    """
    for id_type, id_value in external_ids:
        found = await self.get_library_item_by_external_id(id_value, id_type)
        if found:
            return found
    return None
459
@final
async def get_library_items_by_prov_id(
    self,
    provider_domain: str | None = None,
    provider_instance: str | None = None,
    provider_instance_id_or_domain: str | None = None,
    provider_item_id: str | None = None,
    limit: int = 500,
    offset: int = 0,
) -> list[ItemCls]:
    """
    Fetch the library items that have a provider mapping for the given provider.

    The provider is selected by the first of these that is set:
    ``provider_instance``, ``provider_domain``, ``provider_instance_id_or_domain``
    (the last one matches on either instance or domain).

    :param provider_domain: Match on provider domain.
    :param provider_instance: Match on provider instance id.
    :param provider_instance_id_or_domain: Match on instance id or domain.
    :param provider_item_id: Optionally restrict to this provider item id.
    :param limit: Maximum number of items to return.
    :param offset: Number of items to skip.
    """
    # "library" is not a provider in the provider_mappings table
    for provider_ref in (provider_instance_id_or_domain, provider_domain, provider_instance):
        assert provider_ref != "library"
    if provider_instance:
        prov_id = provider_instance
        prov_clause = "provider_mappings.provider_instance = :prov_id"
    elif provider_domain:
        prov_id = provider_domain
        prov_clause = "provider_mappings.provider_domain = :prov_id"
    else:
        prov_id = provider_instance_id_or_domain
        prov_clause = (
            "(provider_mappings.provider_instance = :prov_id "
            "OR provider_mappings.provider_domain = :prov_id)"
        )
    conditions: list[str] = [prov_clause]
    params: dict[str, Any] = {"prov_id": prov_id}
    if provider_item_id:
        conditions.append("provider_mappings.provider_item_id = :item_id")
        params["item_id"] = provider_item_id
    mapped_ids_query = f"SELECT item_id FROM provider_mappings WHERE {' AND '.join(conditions)}"
    return await self.get_library_items_by_query(
        limit=limit,
        offset=offset,
        extra_query_parts=[f"WHERE {self.db_table}.item_id IN ({mapped_ids_query})"],
        extra_query_params=params,
        in_library_only=False,
    )
500
@final
async def iter_library_items_by_prov_id(
    self,
    provider_instance_id_or_domain: str,
    provider_item_id: str | None = None,
) -> AsyncGenerator[ItemCls, None]:
    """
    Yield all library items for the given provider, fetched in pages of 500.

    :param provider_instance_id_or_domain: Provider instance id or domain.
    :param provider_item_id: Optionally restrict to this provider item id.
    """
    page_size = 500
    position = 0
    has_more = True
    while has_more:
        page = await self.get_library_items_by_prov_id(
            provider_instance_id_or_domain=provider_instance_id_or_domain,
            provider_item_id=provider_item_id,
            limit=page_size,
            offset=position,
        )
        for item in page:
            yield item
        # a page that is not full means there is nothing left to fetch
        has_more = len(page) >= page_size
        position += page_size
522
@final
async def set_favorite(self, item_id: str | int, favorite: bool) -> None:
    """
    Set the favorite flag of a library item.

    Nothing is written (and no event is sent) when the flag already has the
    requested value.

    :param item_id: Database id of the library item.
    :param favorite: The new value of the favorite flag.
    """
    db_id = int(item_id)  # ensure integer
    current_item = await self.get_library_item(db_id)
    if current_item.favorite == favorite:
        return
    await self.mass.music.database.update(
        self.db_table, {"item_id": db_id}, {"favorite": favorite}
    )
    # read the item back so the event carries the stored state
    refreshed_item = await self.get_library_item(db_id)
    self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, refreshed_item.uri, refreshed_item)
534
@guard_single_request  # type: ignore[type-var]  # TODO: fix typing for MediaControllerBase
@final
async def get_provider_item(
    self,
    item_id: str,
    provider_instance_id_or_domain: str,
    force_refresh: bool = False,
    fallback: ItemMapping | ItemCls | None = None,
) -> ItemCls:
    """
    Return item details for the given provider item id.

    :param item_id: The id of the item as known by the provider.
    :param provider_instance_id_or_domain: Provider instance id or domain ("library" allowed).
    :param force_refresh: Passed to the cache's ``handle_refresh`` context.
    :param fallback: Item (mapping) to return when the provider does not know the item.
    :raises ProviderUnavailableError: When the provider can not be resolved.
    :raises MediaNotFoundError: When the item is not found and there is no fallback.
    """
    if provider_instance_id_or_domain == "library":
        return await self.get_library_item(item_id)
    # resolve the provider once; a second lookup would be redundant
    provider = self.mass.get_provider(provider_instance_id_or_domain)
    if not provider:
        raise ProviderUnavailableError(f"{provider_instance_id_or_domain} is not available")
    music_provider = cast("MusicProvider", provider)
    # a MediaNotFoundError from the provider falls through to the fallback logic
    with suppress(MediaNotFoundError):
        async with self.mass.cache.handle_refresh(force_refresh):
            return cast("ItemCls", await music_provider.get_item(self.media_type, item_id))
    # if we reach this point all possibilities failed and the item could not be found.
    # There is a possibility that the (streaming) provider changed the id of the item
    # so we return the previous details (if we have any) marked as unavailable, so
    # at least we have the possibility to sort out the new id through matching logic.
    fallback = fallback or await self.get_library_item_by_prov_id(
        item_id, provider_instance_id_or_domain
    )
    if (
        fallback
        and isinstance(fallback, ItemMapping)
        and (fallback_provider := self.mass.get_provider(fallback.provider))
    ):
        # fallback is a ItemMapping, try to convert to full item
        # (conversion errors are ignored, the plain fallback is returned below)
        with suppress(LookupError, TypeError, ValueError):
            return cast(
                "ItemCls",
                self.item_cls.from_dict(
                    {
                        **fallback.to_dict(),
                        "provider_mappings": [
                            {
                                "item_id": fallback.item_id,
                                "provider_domain": fallback_provider.domain,
                                "provider_instance": fallback_provider.instance_id,
                                "available": fallback.available,
                            }
                        ],
                    }
                ),
            )
    if fallback:
        # simply return the fallback item
        return cast("ItemCls", fallback)
    # all options exhausted, we really can not find this item
    msg = (
        f"{self.media_type.value}://{item_id} not "
        f"found on provider {provider_instance_id_or_domain}"
    )
    raise MediaNotFoundError(msg)
593
@final
async def add_provider_mapping(
    self, item_id: str | int, provider_mapping: ProviderMapping
) -> None:
    """
    Add a single provider mapping to an existing library item.

    :param item_id: The library item ID to add the mapping to.
    :param provider_mapping: The provider mapping to add.
    """
    # delegate to the bulk variant with a one-element list
    single_mapping = [provider_mapping]
    await self.add_provider_mappings(item_id, single_mapping)
600
@final
async def add_provider_mappings(
    self, item_id: str | int, provider_mappings: Iterable[ProviderMapping]
) -> None:
    """
    Add provider mappings to existing library item.

    Mappings that are already present on the item are ignored. When one of the
    new mappings currently belongs to a different library item, that other
    item is removed from the library (the two items are merged).

    :param item_id: The library item ID to add mappings to.
    :param provider_mappings: The provider mappings to add.
    """
    db_id = int(item_id)  # ensure integer
    library_item = await self.get_library_item(db_id)
    new_mappings: set[ProviderMapping] = set()
    for provider_mapping in provider_mappings:
        # ignore if the mapping is already present
        if provider_mapping not in library_item.provider_mappings:
            new_mappings.add(provider_mapping)
    if not new_mappings:
        # nothing new: no database write and no event
        return
    # handle special case where the user wants to merge 2 library items
    for mapping in new_mappings:
        if _library_item := await self.get_library_item_by_prov_id(
            mapping.item_id, mapping.provider_instance
        ):
            if _library_item.item_id != library_item.item_id:
                # merging items
                self.logger.debug(
                    "merging item id %s into item id %s based on provider mapping %s/%s",
                    _library_item.item_id,
                    library_item.item_id,
                    mapping.provider_instance,
                    mapping.item_id,
                )
                await self.remove_item_from_library(_library_item.item_id, recursive=True)
                # only the first conflicting item is merged
                break
    library_item.provider_mappings.update(new_mappings)
    self.mass.music.match_provider_instances(library_item)
    # persist the combined set of mappings, then announce the change
    await self.set_provider_mappings(db_id, library_item.provider_mappings)
    self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, library_item.uri, library_item)
640
@final
async def update_provider_mapping(
    self,
    item_id: str | int,
    provider_instance_id: str,
    provider_item_id: str,
    *,
    available: bool | Any = UNSET,
    in_library: bool | Any = UNSET,
    is_unique: bool | None | Any = UNSET,
    url: str | None | Any = UNSET,
    details: str | None | Any = UNSET,
    audio_format: AudioFormat | Any = UNSET,
) -> None:
    """
    Update fields of an existing provider mapping of a library item.

    Only the fields that are passed (not left at UNSET) are written. For
    ``available`` and ``in_library`` a value of None is treated as not passed.

    :param item_id: Database id of the library item.
    :param provider_instance_id: Provider instance id of the mapping.
    :param provider_item_id: Provider item id of the mapping.
    :raises MediaNotFoundError: When the item has no such provider mapping.
    """
    db_id = int(item_id)  # ensure integer
    library_item = await self.get_library_item(db_id)

    # the mapping is identified strictly by provider instance + provider item id
    mapping_exists = any(
        mapping.provider_instance == provider_instance_id
        and mapping.item_id == provider_item_id
        for mapping in library_item.provider_mappings
    )
    if not mapping_exists:
        raise MediaNotFoundError(
            f"Provider mapping {provider_instance_id}/{provider_item_id} "
            f"not found for item {db_id}"
        )

    updates: dict[str, Any] = {}
    # guard against nulls for NOT NULL columns: None counts as "not provided"
    if available is not UNSET and available is not None:
        updates["available"] = bool(available)
    if in_library is not UNSET and in_library is not None:
        updates["in_library"] = bool(in_library)
    # these columns accept None, so only UNSET means "leave untouched"
    nullable_values = {"is_unique": is_unique, "url": url, "details": details}
    for column, value in nullable_values.items():
        if value is not UNSET:
            updates[column] = value
    if audio_format is not UNSET:
        updates["audio_format"] = serialize_to_json(audio_format)

    if not updates:
        return

    await self.mass.music.database.update(
        DB_TABLE_PROVIDER_MAPPINGS,
        {
            "media_type": self.media_type.value,
            "item_id": db_id,
            "provider_instance": provider_instance_id,
            "provider_item_id": provider_item_id,
        },
        updates,
    )

    # Re-fetch the updated item so the event payload reflects persisted DB state.
    refreshed_item = await self.get_library_item(db_id)
    self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, refreshed_item.uri, refreshed_item)
709
@final
async def remove_provider_mapping(
    self, item_id: str | int, provider_instance_id: str, provider_item_id: str
) -> None:
    """
    Remove a single provider mapping from a library item.

    Deletes the mapping row and the playlog entries of that provider item.
    When the item has no provider mappings left afterwards, the library item
    itself is removed.

    :param item_id: Database id of the library item.
    :param provider_instance_id: Provider instance id of the mapping to remove.
    :param provider_item_id: Provider item id of the mapping to remove.
    """
    db_id = int(item_id)  # ensure integer
    try:
        library_item = await self.get_library_item(db_id)
    except MediaNotFoundError:
        # edge case: already deleted / race condition
        return

    # update provider_mappings table
    await self.mass.music.database.delete(
        DB_TABLE_PROVIDER_MAPPINGS,
        {
            "media_type": self.media_type.value,
            "item_id": db_id,
            "provider_instance": provider_instance_id,
            "provider_item_id": provider_item_id,
        },
    )
    # cleanup playlog table
    await self.mass.music.database.delete(
        DB_TABLE_PLAYLOG,
        {
            "media_type": self.media_type.value,
            "item_id": provider_item_id,
            "provider": provider_instance_id,
        },
    )
    # mirror the deletion on the in-memory item
    library_item.provider_mappings = {
        x
        for x in library_item.provider_mappings
        if not (x.provider_instance == provider_instance_id and x.item_id == provider_item_id)
    }
    if library_item.provider_mappings:
        self.logger.debug(
            "removed provider_mapping %s/%s from item id %s",
            provider_instance_id,
            provider_item_id,
            db_id,
        )
        self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, library_item.uri, library_item)
    else:
        # remove item if it has no more providers.
        # An item that vanished in the meantime surfaces as MediaNotFoundError
        # (raised by get_library_item), so that must be suppressed as well.
        with suppress(AssertionError, MediaNotFoundError):
            await self.remove_item_from_library(db_id)
758
@final
async def remove_provider_mappings(self, item_id: str | int, provider_instance_id: str) -> None:
    """
    Remove all provider mappings of one provider instance from a library item.

    The mapping rows are deleted even when the library item itself can not be
    found. When the item has no provider mappings left afterwards, the library
    item itself is removed.

    :param item_id: Database id of the library item.
    :param provider_instance_id: Provider instance id whose mappings are removed.
    """
    db_id = int(item_id)  # ensure integer
    try:
        library_item = await self.get_library_item(db_id)
    except MediaNotFoundError:
        # edge case: already deleted / race condition
        library_item = None
    # update provider_mappings table
    await self.mass.music.database.delete(
        DB_TABLE_PROVIDER_MAPPINGS,
        {
            "media_type": self.media_type.value,
            "item_id": db_id,
            "provider_instance": provider_instance_id,
        },
    )
    if library_item is None:
        return
    # update the item's provider mappings (and check if we still have any)
    library_item.provider_mappings = {
        x for x in library_item.provider_mappings if x.provider_instance != provider_instance_id
    }
    if library_item.provider_mappings:
        self.logger.debug(
            "removed all provider mappings for provider %s from item id %s",
            provider_instance_id,
            db_id,
        )
        self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, library_item.uri, library_item)
    else:
        # remove item if it has no more providers.
        # An item that vanished in the meantime surfaces as MediaNotFoundError
        # (raised by get_library_item), so that must be suppressed as well.
        with suppress(AssertionError, MediaNotFoundError):
            await self.remove_item_from_library(db_id)
794
@final
async def set_provider_mappings(
    self,
    item_id: str | int,
    provider_mappings: Iterable[ProviderMapping],
    overwrite: bool = False,
) -> None:
    """
    Write the given provider mappings of a media item to the provider mappings table.

    :param item_id: Database id of the library item.
    :param provider_mappings: The mappings to upsert.
    :param overwrite: Delete all existing mappings of the item first.
    """
    db_id = int(item_id)  # ensure integer
    database = self.mass.music.database
    media_type = self.media_type.value
    if overwrite:
        # on overwrite, clear the provider_mappings table first
        # this is done for filesystem provider changing the path (and thus item_id)
        await database.delete(
            DB_TABLE_PROVIDER_MAPPINGS,
            {"media_type": media_type, "item_id": db_id},
        )
    # columns that are only written when the mapping has a value for them
    optional_columns = ("url", "details", "in_library", "is_unique")
    for mapping in provider_mappings:
        record = {
            "media_type": media_type,
            "item_id": db_id,
            "provider_domain": mapping.provider_domain,
            "provider_instance": mapping.provider_instance,
            "provider_item_id": mapping.item_id,
            "available": mapping.available,
            "audio_format": serialize_to_json(mapping.audio_format),
        }
        for column in optional_columns:
            value = getattr(mapping, column, None)
            if value is None:
                continue
            record[column] = value
        await database.upsert(DB_TABLE_PROVIDER_MAPPINGS, record)
828
829 @abstractmethod
830 async def _add_library_item(
831 self,
832 item: ItemCls,
833 overwrite_existing: bool = False,
834 ) -> int:
835 """Add artist to library and return the database id."""
836
837 @abstractmethod
838 async def _update_library_item(
839 self, item_id: str | int, update: ItemCls, overwrite: bool = False
840 ) -> None:
841 """Update existing library record in the database."""
842
@abstractmethod
async def match_providers(self, db_item: ItemCls) -> None:
    """
    Look for a match of the given (database) item on all (streaming) providers.

    Matching links the objects of different providers/qualities together.

    :param db_item: The database item to find matches for.
    """
850
@abstractmethod
async def radio_mode_base_tracks(
    self,
    item: ItemCls,
    preferred_provider_instances: list[str] | None = None,
) -> list[Track]:
    """
    Return the base tracks that the dynamic radio for the given item is calculated from.

    :param item: The MediaItem to get base tracks for.
    :param preferred_provider_instances: List of preferred provider instance IDs to use.
        When provided, these providers will be tried first before falling back to others.
    """
864
@final
async def get_library_items_by_query(  # noqa: PLR0913
    self,
    favorite: bool | None = None,
    search: str | None = None,
    limit: int = 500,
    offset: int = 0,
    order_by: str | None = None,
    provider_filter: list[str] | None = None,
    extra_query_parts: list[str] | None = None,
    extra_query_params: dict[str, Any] | None = None,
    extra_join_parts: list[str] | None = None,
    genre_ids: int | list[int] | None = None,
    in_library_only: bool = False,
) -> list[ItemCls]:
    """
    Build the query from the given filters, run it and return the parsed items.

    The ``extra_*`` arguments are copied, the caller's lists/dict are not modified.

    :param favorite: Filter by favorite status.
    :param search: Filter by search query.
    :param limit: Maximum number of rows to fetch.
    :param offset: Number of rows to skip.
    :param order_by: Sort key; a key starting with "random" uses the random subquery.
    :param provider_filter: Provider filter values.
    :param extra_query_parts: Additional query (WHERE) parts.
    :param extra_query_params: Additional query parameters.
    :param extra_join_parts: Additional JOIN parts.
    :param genre_ids: Filter by genre id(s).
    :param in_library_only: Passed on to the filter helpers.
    """
    # work on copies so the caller's containers stay untouched
    params: dict[str, Any] = {**extra_query_params} if extra_query_params else {}
    where_parts: list[str] = [*extra_query_parts] if extra_query_parts else []
    joins: list[str] = [*extra_join_parts] if extra_join_parts else []
    safe_search = self._preprocess_search(search, params)
    genre_filter = self._preprocess_genre_ids(genre_ids)
    # arguments shared by both filter helpers
    filter_args: dict[str, Any] = {
        "query_parts": where_parts,
        "query_params": params,
        "join_parts": joins,
        "favorite": favorite,
        "search": safe_search,
        "genre_ids": genre_filter,
        "provider_filter": provider_filter,
        "in_library_only": in_library_only,
    }
    use_random = bool(order_by) and order_by.startswith("random")
    if use_random:
        # create special performant random query
        self._apply_random_subquery(limit=limit, **filter_args)
    else:
        # apply filters
        self._apply_filters(**filter_args)
    # build and execute final query
    sql_query = self._build_final_query(where_parts, joins, order_by)
    db_rows = await self.mass.music.database.get_rows_from_query(
        sql_query, params, limit=limit, offset=offset
    )
    items: list[ItemCls] = []
    for db_row in db_rows:
        parsed_row = self._parse_db_row(db_row)
        items.append(cast("ItemCls", self.item_cls.from_dict(parsed_row)))
    return items
920
921 @property
922 def _search_filter_clause(self) -> str:
923 """Return the SQL WHERE clause fragment used for search filtering."""
924 return f"{self.db_table}.search_name LIKE :search"
925
926 @final
927 def _preprocess_search(self, search: str | None, query_params: dict[str, Any]) -> str | None:
928 """Preprocess search string and add to query params."""
929 if search:
930 search = create_safe_string(search, True, True)
931 query_params["search"] = f"%{search}%"
932 return search
933
934 @final
935 @staticmethod
936 def _preprocess_genre_ids(genre_ids: int | list[int] | None) -> list[int] | None:
937 if genre_ids is None:
938 return None
939 if isinstance(genre_ids, list):
940 normalized = [int(x) for x in genre_ids]
941 else:
942 normalized = [int(genre_ids)]
943 return normalized or None
944
945 @final
946 @staticmethod
947 def _clean_query_parts(query_parts: list[str]) -> list[str]:
948 """Clean the query parts list by removing duplicate where statements."""
949 return [x[5:] if x.lower().startswith("where ") else x for x in query_parts]
950
@final
def _apply_random_subquery(
    self,
    query_parts: list[str],
    query_params: dict[str, Any],
    join_parts: list[str],
    favorite: bool | None,
    search: str | None,
    genre_ids: list[int] | None,
    provider_filter: list[str] | None,
    limit: int,
    in_library_only: bool = False,
) -> None:
    """
    Replace the given query/join parts with a single filtered random subquery.

    All filters (including the parts passed in) are applied inside a subquery
    that selects random item ids; ``query_parts`` and ``join_parts`` are then
    rewritten in place so the outer query only references that subquery.

    :param query_parts: WHERE fragments; replaced in place by the subquery clause.
    :param query_params: Named query parameters; filled in place by the filters.
    :param join_parts: JOIN fragments; emptied in place.
    :param limit: Number of random ids the subquery selects.
    """
    # the filters operate on private copies that only feed the subquery
    inner_where = list(query_parts)
    inner_joins = list(join_parts)
    self._apply_filters(
        query_parts=inner_where,
        query_params=query_params,
        join_parts=inner_joins,
        favorite=favorite,
        search=search,
        genre_ids=genre_ids,
        provider_filter=provider_filter,
        in_library_only=in_library_only,
    )

    # assemble the subquery from space-separated fragments
    table = self.db_table
    fragments = [f"SELECT {table}.item_id FROM {table}"]
    if inner_joins:
        fragments.append(" ".join(inner_joins))
    if inner_where:
        fragments.append("WHERE " + " AND ".join(self._clean_query_parts(inner_where)))
    fragments.append(f"ORDER BY RANDOM() LIMIT {limit}")
    sub_query = " ".join(fragments)

    # the outer query now consists of nothing but the random subquery,
    # which already carries every filter and join
    query_parts[:] = [f"{table}.item_id in ({sub_query})"]
    del join_parts[:]
996
997 @final
998 def _apply_filters(
999 self,
1000 query_parts: list[str],
1001 query_params: dict[str, Any],
1002 join_parts: list[str],
1003 favorite: bool | None,
1004 search: str | None,
1005 genre_ids: list[int] | None,
1006 provider_filter: list[str] | None,
1007 in_library_only: bool = False,
1008 ) -> None:
1009 """Apply search, favorite, and provider filters."""
1010 # handle search
1011 if search:
1012 query_parts.append(self._search_filter_clause)
1013 # handle favorite filter
1014 if favorite is not None:
1015 query_parts.append(f"{self.db_table}.favorite = :favorite")
1016 query_params["favorite"] = favorite
1017 # handle genre filter
1018 if genre_ids:
1019 query_params["genre_ids"] = genre_ids
1020 query_params["genre_media_type"] = self.media_type.value
1021 query_parts.append(
1022 f"EXISTS("
1023 f"SELECT 1 FROM {DB_TABLE_GENRE_MEDIA_ITEM_MAPPING} gm "
1024 f"WHERE gm.media_id = {self.db_table}.item_id "
1025 "AND gm.media_type = :genre_media_type "
1026 "AND gm.genre_id IN :genre_ids)"
1027 )
1028 # Apply the provider filter
1029 if provider_filter:
1030 provider_conditions = []
1031 for idx, prov in enumerate(provider_filter):
1032 param_name = f"provider_filter_{idx}"
1033 provider_conditions.append(f"provider_mappings.provider_instance = :{param_name}")
1034 query_params[param_name] = prov
1035 query_params["provider_media_type"] = self.media_type.value
1036 in_library_clause = "AND provider_mappings.in_library = 1 " if in_library_only else ""
1037 join_parts.append(
1038 f"JOIN provider_mappings ON provider_mappings.item_id = {self.db_table}.item_id "
1039 "AND provider_mappings.media_type = :provider_media_type "
1040 f"{in_library_clause}"
1041 f"AND ({' OR '.join(provider_conditions)})"
1042 )
1043 elif in_library_only:
1044 query_params["provider_media_type"] = self.media_type.value
1045 join_parts.append(
1046 f"JOIN provider_mappings ON provider_mappings.item_id = {self.db_table}.item_id "
1047 "AND provider_mappings.media_type = :provider_media_type "
1048 "AND provider_mappings.in_library = 1"
1049 )
1050
1051 @final
1052 def _build_final_query(
1053 self,
1054 query_parts: list[str],
1055 join_parts: list[str],
1056 order_by: str | None,
1057 ) -> str:
1058 """Build the final SQL query string."""
1059 sql_query = self.base_query
1060
1061 # Add joins
1062 if join_parts:
1063 sql_query += f" {' '.join(join_parts)} "
1064
1065 # Add where clauses
1066 if query_parts:
1067 # prevent duplicate where statement
1068 sql_query += " WHERE " + " AND ".join(self._clean_query_parts(query_parts))
1069
1070 # Add grouping and ordering
1071 sql_query += f" GROUP BY {self.db_table}.item_id"
1072
1073 if order_by:
1074 if sort_key := SORT_KEYS.get(order_by):
1075 sql_query += f" ORDER BY {sort_key}"
1076
1077 return sql_query
1078
@final
@staticmethod
def _parse_db_row(db_row: Mapping[str, Any]) -> dict[str, Any]:
    """
    Convert a raw database row into a plain dict.

    Sets ``provider`` to "library", casts ``favorite`` to bool and ``item_id`` to
    str, derives ``date_added`` from ``timestamp_added``, decodes the JSON columns
    and, for rows that carry a ``track_album``, copies the album details (and its
    thumb image, if any) onto the row.
    """
    parsed = dict(db_row)
    parsed.update(
        provider="library",
        favorite=bool(parsed["favorite"]),
        item_id=str(parsed["item_id"]),
        date_added=datetime.fromtimestamp(parsed["timestamp_added"]).isoformat(),
    )

    # decode the JSON columns that are present and non-empty
    for json_key in JSON_KEYS:
        if encoded := parsed.get(json_key):
            parsed[json_key] = json_loads(encoded)

    # "fully_played" is only part of some rows; parse it as an optional bool
    if "fully_played" in parsed:
        parsed["fully_played"] = parse_optional_bool(parsed["fully_played"])

    track_album = parsed.get("track_album")
    if not track_album:
        return parsed

    # copy track_album --> album
    parsed["album"] = track_album
    parsed["disc_number"] = track_album["disc_number"]
    parsed["track_number"] = track_album["track_number"]

    # always prefer the album image over the track image
    album_thumb = None
    for image in track_album.get("images") or ():
        if image["type"] == "thumb":
            album_thumb = image
            break
    if album_thumb:
        # the album thumb becomes the single image and leads the images list
        parsed["image"] = album_thumb
        metadata = parsed["metadata"]
        metadata["images"] = [album_thumb, *(metadata.get("images") or ())]
    return parsed
1122
@final
def _ensure_provider_filter(
    self,
    provider: str | list[str] | None,
) -> list[str] | None:
    """
    Combine the requested provider filter with the current user's provider filter.

    :param provider: A single provider instance ID, a list of them, or None.
    :return: The effective list of provider instance IDs, or None for no filter.
    :raises InsufficientPermissions: When the user has a provider filter and none
        of the requested providers is part of it.
    """
    user = get_current_user()
    allowed = (user.provider_filter or None) if user else None

    if not allowed:
        # no user restriction: pass the requested filter through as a list
        if provider is None:
            return None
        return [provider] if isinstance(provider, str) else provider

    if not provider:
        # nothing explicitly requested: the user's own filter applies
        return allowed

    # explicit request: keep only the providers the user is allowed to use
    requested = [provider] if isinstance(provider, str) else provider
    permitted = [instance_id for instance_id in requested if instance_id in allowed]
    if not permitted:
        # no overlap - user requested providers they don't have access to
        raise InsufficientPermissions(
            "User does not have permission to access the requested provider(s)."
        )
    return permitted
1154
@final
def _select_provider_id(self, library_item: ItemCls) -> tuple[str, str]:
    """
    Pick the (provider_instance, item_id) pair to use for fetching the item.

    The first mapping that passes the current user's provider filter wins; when
    no mapping passes, the first mapping of the item is used instead.
    """
    user = get_current_user()
    allowed = (user.provider_filter or None) if user else None
    mappings = library_item.provider_mappings

    # prefer a mapping the user's provider filter allows (any mapping if unfiltered)
    chosen = next(
        (
            candidate
            for candidate in mappings
            if not allowed or candidate.provider_instance in allowed
        ),
        None,
    )
    if chosen is None:
        # fallback to first mapping
        chosen = next(iter(mappings))
    return (chosen.provider_instance, chosen.item_id)
1168