/
/
/
1"""Base (ABC) MediaType specific controller."""
2
3from __future__ import annotations
4
5import asyncio
6import logging
7from abc import ABCMeta, abstractmethod
8from collections.abc import Iterable
9from contextlib import suppress
10from datetime import datetime
11from typing import TYPE_CHECKING, Any, TypeVar, cast, final
12
13from music_assistant_models.enums import EventType, ExternalID, MediaType, ProviderFeature
14from music_assistant_models.errors import (
15 InsufficientPermissions,
16 MediaNotFoundError,
17 ProviderUnavailableError,
18)
19from music_assistant_models.media_items import ItemMapping, MediaItemType, ProviderMapping, Track
20
21from music_assistant.constants import DB_TABLE_PLAYLOG, DB_TABLE_PROVIDER_MAPPINGS, MASS_LOGGER_NAME
22from music_assistant.controllers.webserver.helpers.auth_middleware import get_current_user
23from music_assistant.helpers.compare import compare_media_item, create_safe_string
24from music_assistant.helpers.json import json_loads, serialize_to_json
25from music_assistant.helpers.util import guard_single_request
26
27if TYPE_CHECKING:
28 from collections.abc import AsyncGenerator, Mapping
29
30 from music_assistant import MusicAssistant
31 from music_assistant.models.music_provider import MusicProvider
32
33
# Module-level type variable restricted to MediaItemType (or a subtype of it).
ItemCls = TypeVar("ItemCls", bound="MediaItemType")
35
36
# Row keys that `_parse_db_row` decodes with `json_loads`
# (only when the key is present in the row and its value is non-empty).
JSON_KEYS = (
    "artists",
    "track_album",
    "metadata",
    "provider_mappings",
    "external_ids",
    "narrators",
    "authors",
)
46
# Maps the accepted `order_by` option names to the SQL expression that
# `_build_final_query` appends after ORDER BY; unknown names add no ORDER BY clause.
SORT_KEYS = {
    # sqlite has no builtin support for natural sorting,
    # so we have to use an additional column for this;
    # this also improves searching and sorting performance
    "name": "search_name ASC",
    "name_desc": "search_name DESC",
    "duration": "duration ASC",
    "duration_desc": "duration DESC",
    "sort_name": "search_sort_name ASC",
    "sort_name_desc": "search_sort_name DESC",
    "timestamp_added": "timestamp_added ASC",
    "timestamp_added_desc": "timestamp_added DESC",
    "timestamp_modified": "timestamp_modified ASC",
    "timestamp_modified_desc": "timestamp_modified DESC",
    "last_played": "last_played ASC",
    "last_played_desc": "last_played DESC",
    "play_count": "play_count ASC",
    "play_count_desc": "play_count DESC",
    "year": "year ASC",
    "year_desc": "year DESC",
    "position": "position ASC",
    "position_desc": "position DESC",
    "artist_name": "artists.search_name ASC",
    "artist_name_desc": "artists.search_name DESC",
    "random": "RANDOM()",
    "random_play_count": "RANDOM(), play_count ASC",
}
74
75
76class MediaControllerBase[ItemCls: "MediaItemType"](metaclass=ABCMeta):
77 """Base model for controller managing a MediaType."""
78
79 media_type: MediaType
80 item_cls: type[MediaItemType]
81 db_table: str
82
    def __init__(self, mass: MusicAssistant) -> None:
        """
        Initialize class.

        Builds the base SELECT query, creates the logger and registers the
        (base) API commands for this media type.

        :param mass: The MusicAssistant instance this controller is attached to.
        """
        self.mass = mass
        # base SELECT shared by all library queries: every column of the media table
        # plus a JSON array holding the provider mappings of the row (for this media type)
        self.base_query = f"""
            SELECT
                {self.db_table}.*,
                (SELECT JSON_GROUP_ARRAY(
                    json_object(
                    'item_id', provider_mappings.provider_item_id,
                    'provider_domain', provider_mappings.provider_domain,
                    'provider_instance', provider_mappings.provider_instance,
                    'available', provider_mappings.available,
                    'audio_format', json(provider_mappings.audio_format),
                    'url', provider_mappings.url,
                    'details', provider_mappings.details,
                    'in_library', provider_mappings.in_library,
                    'is_unique', provider_mappings.is_unique
                )) FROM provider_mappings WHERE provider_mappings.item_id = {self.db_table}.item_id
                    AND provider_mappings.media_type = '{self.media_type.value}') AS provider_mappings
            FROM {self.db_table} """  # noqa: E501
        self.logger = logging.getLogger(f"{MASS_LOGGER_NAME}.music.{self.media_type.value}")
        # register (base) api handlers
        self.api_base = api_base = f"{self.media_type}s"
        self.mass.register_api_command(f"music/{api_base}/count", self.library_count)
        self.mass.register_api_command(f"music/{api_base}/library_items", self.library_items)
        self.mass.register_api_command(f"music/{api_base}/get", self.get)
        # Backward compatibility alias - prefer the generic "get" endpoint
        self.mass.register_api_command(
            f"music/{api_base}/get_{self.media_type}", self.get, alias=True
        )
        # update and remove are registered with required_role="admin"
        self.mass.register_api_command(
            f"music/{api_base}/update", self.update_item_in_library, required_role="admin"
        )
        self.mass.register_api_command(
            f"music/{api_base}/remove", self.remove_item_from_library, required_role="admin"
        )
        # held by add_item_to_library while a new record is inserted
        self._db_add_lock = asyncio.Lock()
120
121 @final
122 async def add_item_to_library(
123 self,
124 item: ItemCls,
125 overwrite_existing: bool = False,
126 ) -> ItemCls:
127 """Add item to library and return the new (or updated) database item."""
128 new_item = False
129 # check for existing item first
130 if library_id := await self._get_library_item_by_match(item):
131 # update existing item
132 await self._update_library_item(library_id, item, overwrite=overwrite_existing)
133 else:
134 # actually add a new item in the library db
135 self.mass.music.match_provider_instances(item)
136 async with self._db_add_lock:
137 library_id = await self._add_library_item(item)
138 new_item = True
139 # return final library_item
140 library_item = await self.get_library_item(library_id)
141 self.mass.signal_event(
142 EventType.MEDIA_ITEM_ADDED if new_item else EventType.MEDIA_ITEM_UPDATED,
143 library_item.uri,
144 library_item,
145 )
146 return library_item
147
148 @final
149 async def _get_library_item_by_match(self, item: ItemCls | ItemMapping) -> int | None:
150 if item.provider == "library":
151 return int(item.item_id)
152 # search by provider mappings if item is ItemMapping
153 if isinstance(item, ItemMapping):
154 if cur_item := await self.get_library_item_by_prov_id(item.item_id, item.provider):
155 return int(cur_item.item_id)
156
157 # for all other items that are MediaItemType, check provider_mappings if it exists
158 provider_mappings = getattr(item, "provider_mappings", None)
159 if provider_mappings:
160 if cur_item := await self.get_library_item_by_prov_mappings(provider_mappings):
161 return int(cur_item.item_id)
162 if cur_item := await self.get_library_item_by_external_ids(item.external_ids):
163 # existing item match by external id
164 # Double check external IDs - if MBID exists, regards that as overriding
165 if compare_media_item(item, cur_item):
166 return int(cur_item.item_id)
167 # search by (exact) name match
168 query = f"{self.db_table}.name = :name OR {self.db_table}.sort_name = :sort_name"
169 query_params = {"name": item.name, "sort_name": item.sort_name}
170 for db_item in await self.get_library_items_by_query(
171 extra_query_parts=[query], extra_query_params=query_params
172 ):
173 if compare_media_item(db_item, item, True):
174 return int(db_item.item_id)
175 return None
176
177 @final
178 async def update_item_in_library(
179 self, item_id: str | int, update: ItemCls, overwrite: bool = False
180 ) -> ItemCls:
181 """Update existing library record in the library database."""
182 self.mass.music.match_provider_instances(update)
183 await self._update_library_item(item_id, update, overwrite=overwrite)
184 # return the updated object
185 library_item = await self.get_library_item(item_id)
186 self.mass.signal_event(
187 EventType.MEDIA_ITEM_UPDATED,
188 library_item.uri,
189 library_item,
190 )
191 return library_item
192
193 async def remove_item_from_library(self, item_id: str | int, recursive: bool = True) -> None:
194 """Delete library record from the database."""
195 db_id = int(item_id) # ensure integer
196 library_item = await self.get_library_item(db_id)
197 assert library_item, f"Item does not exist: {db_id}"
198 # delete item
199 await self.mass.music.database.delete(
200 self.db_table,
201 {"item_id": db_id},
202 )
203 # update provider_mappings table
204 await self.mass.music.database.delete(
205 DB_TABLE_PROVIDER_MAPPINGS,
206 {"media_type": self.media_type.value, "item_id": db_id},
207 )
208 # cleanup playlog table
209 await self.mass.music.database.delete(
210 DB_TABLE_PLAYLOG,
211 {
212 "media_type": self.media_type.value,
213 "item_id": db_id,
214 "provider": "library",
215 },
216 )
217 for prov_mapping in library_item.provider_mappings:
218 await self.mass.music.database.delete(
219 DB_TABLE_PLAYLOG,
220 {
221 "media_type": self.media_type.value,
222 "item_id": prov_mapping.item_id,
223 "provider": prov_mapping.provider_instance,
224 },
225 )
226 # NOTE: this does not delete any references to this item in other records,
227 # this is handled/overridden in the mediatype specific controllers
228 self.mass.signal_event(EventType.MEDIA_ITEM_DELETED, library_item.uri, library_item)
229 self.logger.debug("deleted item with id %s from database", db_id)
230
231 async def library_count(self, favorite_only: bool = False) -> int:
232 """Return the total number of items in the library."""
233 if favorite_only:
234 sql_query = f"SELECT item_id FROM {self.db_table} WHERE favorite = 1"
235 return await self.mass.music.database.get_count_from_query(sql_query)
236 return await self.mass.music.database.get_count(self.db_table)
237
238 async def library_items(
239 self,
240 favorite: bool | None = None,
241 search: str | None = None,
242 limit: int = 500,
243 offset: int = 0,
244 order_by: str = "sort_name",
245 provider: str | list[str] | None = None,
246 ) -> list[ItemCls]:
247 """
248 Get the library items for this mediatype.
249
250 :param favorite: Filter by favorite status.
251 :param search: Filter by search query.
252 :param limit: Maximum number of items to return.
253 :param offset: Number of items to skip.
254 :param order_by: Order by field (e.g. 'sort_name', 'timestamp_added').
255 :param provider: Filter by provider instance ID (single string or list).
256 """
257 return await self.get_library_items_by_query(
258 favorite=favorite,
259 search=search,
260 limit=limit,
261 offset=offset,
262 order_by=order_by,
263 provider_filter=self._ensure_provider_filter(provider),
264 )
265
266 async def iter_library_items(
267 self,
268 favorite: bool | None = None,
269 search: str | None = None,
270 order_by: str = "sort_name",
271 provider: str | list[str] | None = None,
272 ) -> AsyncGenerator[ItemCls, None]:
273 """Iterate all in-database items."""
274 limit: int = 500
275 offset: int = 0
276 if provider is not None:
277 provider_filter = provider if isinstance(provider, list) else [provider]
278 else:
279 provider_filter = None
280 while True:
281 next_items = await self.get_library_items_by_query(
282 favorite=favorite,
283 search=search,
284 limit=limit,
285 offset=offset,
286 order_by=order_by,
287 provider_filter=provider_filter,
288 )
289 for item in next_items:
290 yield item
291 if len(next_items) < limit:
292 break
293 offset += limit
294
295 async def get(
296 self,
297 item_id: str,
298 provider_instance_id_or_domain: str,
299 ) -> ItemCls:
300 """Return (full) details for a single media item."""
301 # always prefer the full library item if we have it
302 if library_item := await self.get_library_item_by_prov_id(
303 item_id,
304 provider_instance_id_or_domain,
305 ):
306 # schedule a refresh of the metadata on access of the item
307 # e.g. the item is being played or opened in the UI
308 assert library_item.uri is not None
309 self.mass.metadata.schedule_update_metadata(library_item.uri)
310 return library_item
311 # grab full details from the provider
312 return await self.get_provider_item(
313 item_id,
314 provider_instance_id_or_domain,
315 )
316
317 async def search(
318 self,
319 search_query: str,
320 provider_instance_id_or_domain: str,
321 limit: int = 25,
322 ) -> list[ItemCls]:
323 """Search database or provider with given query."""
324 # create safe search string
325 search_query = search_query.replace("/", " ").replace("'", "")
326 if provider_instance_id_or_domain == "library":
327 return await self.library_items(search=search_query, limit=limit)
328 if not (prov := self.mass.get_provider(provider_instance_id_or_domain)):
329 return []
330 prov = cast("MusicProvider", prov)
331 if ProviderFeature.SEARCH not in prov.supported_features:
332 return []
333 if not prov.library_supported(self.media_type):
334 # assume library supported also means that this mediatype is supported
335 return []
336 searchresult = await prov.search(
337 search_query,
338 [self.media_type],
339 limit,
340 )
341 match self.media_type:
342 case MediaType.ARTIST:
343 return cast("list[ItemCls]", searchresult.artists)
344 case MediaType.ALBUM:
345 return cast("list[ItemCls]", searchresult.albums)
346 case MediaType.TRACK:
347 return cast("list[ItemCls]", searchresult.tracks)
348 case MediaType.PLAYLIST:
349 return cast("list[ItemCls]", searchresult.playlists)
350 case MediaType.AUDIOBOOK:
351 return cast("list[ItemCls]", searchresult.audiobooks)
352 case MediaType.PODCAST:
353 return cast("list[ItemCls]", searchresult.podcasts)
354 case MediaType.RADIO:
355 return cast("list[ItemCls]", searchresult.radio)
356 case _:
357 return []
358
359 async def get_library_item(self, item_id: int | str) -> ItemCls:
360 """Get single library item by id."""
361 db_id = int(item_id) # ensure integer
362 extra_query = f"WHERE {self.db_table}.item_id = :item_id"
363 for db_item in await self.get_library_items_by_query(
364 extra_query_parts=[extra_query],
365 extra_query_params={"item_id": db_id},
366 ):
367 return db_item
368 msg = f"{self.media_type.value} not found in library: {db_id}"
369 raise MediaNotFoundError(msg)
370
371 async def get_library_item_by_prov_id(
372 self,
373 item_id: str,
374 provider_instance_id_or_domain: str,
375 ) -> ItemCls | None:
376 """Get the library item for the given provider_instance."""
377 assert item_id
378 assert provider_instance_id_or_domain
379 if provider_instance_id_or_domain == "library":
380 return await self.get_library_item(item_id)
381 for item in await self.get_library_items_by_prov_id(
382 provider_instance_id_or_domain=provider_instance_id_or_domain,
383 provider_item_id=item_id,
384 ):
385 return item
386 return None
387
388 @final
389 async def get_library_item_by_prov_mappings(
390 self,
391 provider_mappings: Iterable[ProviderMapping],
392 ) -> ItemCls | None:
393 """Get the library item for the given provider_instance."""
394 # always prefer provider instance first
395 for mapping in provider_mappings:
396 for item in await self.get_library_items_by_prov_id(
397 provider_instance=mapping.provider_instance,
398 provider_item_id=mapping.item_id,
399 ):
400 return item
401 # check by domain too
402 for mapping in provider_mappings:
403 for item in await self.get_library_items_by_prov_id(
404 provider_domain=mapping.provider_domain,
405 provider_item_id=mapping.item_id,
406 ):
407 return item
408 return None
409
410 @final
411 async def get_library_item_by_external_id(
412 self, external_id: str, external_id_type: ExternalID | None = None
413 ) -> ItemCls | None:
414 """Get the library item for the given external id."""
415 query = f"{self.db_table}.external_ids LIKE :external_id_str"
416 if external_id_type:
417 external_id_str = f'%"{external_id_type}","{external_id}"%'
418 else:
419 external_id_str = f'%"{external_id}"%'
420 for item in await self.get_library_items_by_query(
421 extra_query_parts=[query],
422 extra_query_params={"external_id_str": external_id_str},
423 ):
424 return item
425 return None
426
427 @final
428 async def get_library_item_by_external_ids(
429 self, external_ids: set[tuple[ExternalID, str]]
430 ) -> ItemCls | None:
431 """Get the library item for (one of) the given external ids."""
432 for external_id_type, external_id in external_ids:
433 if match := await self.get_library_item_by_external_id(external_id, external_id_type):
434 return match
435 return None
436
437 @final
438 async def get_library_items_by_prov_id(
439 self,
440 provider_domain: str | None = None,
441 provider_instance: str | None = None,
442 provider_instance_id_or_domain: str | None = None,
443 provider_item_id: str | None = None,
444 limit: int = 500,
445 offset: int = 0,
446 ) -> list[ItemCls]:
447 """Fetch all records from library for given provider."""
448 assert provider_instance_id_or_domain != "library"
449 assert provider_domain != "library"
450 assert provider_instance != "library"
451 subquery_parts: list[str] = []
452 query_params: dict[str, Any] = {}
453 if provider_instance:
454 query_params = {"prov_id": provider_instance}
455 subquery_parts.append("provider_mappings.provider_instance = :prov_id")
456 elif provider_domain:
457 query_params = {"prov_id": provider_domain}
458 subquery_parts.append("provider_mappings.provider_domain = :prov_id")
459 else:
460 query_params = {"prov_id": provider_instance_id_or_domain}
461 subquery_parts.append(
462 "(provider_mappings.provider_instance = :prov_id "
463 "OR provider_mappings.provider_domain = :prov_id)"
464 )
465 if provider_item_id:
466 subquery_parts.append("provider_mappings.provider_item_id = :item_id")
467 query_params["item_id"] = provider_item_id
468 subquery = f"SELECT item_id FROM provider_mappings WHERE {' AND '.join(subquery_parts)}"
469 query = f"WHERE {self.db_table}.item_id IN ({subquery})"
470 return await self.get_library_items_by_query(
471 limit=limit,
472 offset=offset,
473 extra_query_parts=[query],
474 extra_query_params=query_params,
475 )
476
477 @final
478 async def iter_library_items_by_prov_id(
479 self,
480 provider_instance_id_or_domain: str,
481 provider_item_id: str | None = None,
482 ) -> AsyncGenerator[ItemCls, None]:
483 """Iterate all records from database for given provider."""
484 limit: int = 500
485 offset: int = 0
486 while True:
487 next_items = await self.get_library_items_by_prov_id(
488 provider_instance_id_or_domain=provider_instance_id_or_domain,
489 provider_item_id=provider_item_id,
490 limit=limit,
491 offset=offset,
492 )
493 for item in next_items:
494 yield item
495 if len(next_items) < limit:
496 break
497 offset += limit
498
499 @final
500 async def set_favorite(self, item_id: str | int, favorite: bool) -> None:
501 """Set the favorite bool on a database item."""
502 db_id = int(item_id) # ensure integer
503 library_item = await self.get_library_item(db_id)
504 if library_item.favorite == favorite:
505 return
506 match = {"item_id": db_id}
507 await self.mass.music.database.update(self.db_table, match, {"favorite": favorite})
508 library_item = await self.get_library_item(db_id)
509 self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, library_item.uri, library_item)
510
511 @guard_single_request # type: ignore[type-var] # TODO: fix typing for MediaControllerBase
512 @final
513 async def get_provider_item(
514 self,
515 item_id: str,
516 provider_instance_id_or_domain: str,
517 force_refresh: bool = False,
518 fallback: ItemMapping | ItemCls | None = None,
519 ) -> ItemCls:
520 """Return item details for the given provider item id."""
521 if provider_instance_id_or_domain == "library":
522 return await self.get_library_item(item_id)
523 if not (provider := self.mass.get_provider(provider_instance_id_or_domain)):
524 raise ProviderUnavailableError(f"{provider_instance_id_or_domain} is not available")
525 if provider := self.mass.get_provider(provider_instance_id_or_domain):
526 provider = cast("MusicProvider", provider)
527 with suppress(MediaNotFoundError):
528 async with self.mass.cache.handle_refresh(force_refresh):
529 return cast("ItemCls", await provider.get_item(self.media_type, item_id))
530 # if we reach this point all possibilities failed and the item could not be found.
531 # There is a possibility that the (streaming) provider changed the id of the item
532 # so we return the previous details (if we have any) marked as unavailable, so
533 # at least we have the possibility to sort out the new id through matching logic.
534 fallback = fallback or await self.get_library_item_by_prov_id(
535 item_id, provider_instance_id_or_domain
536 )
537 if (
538 fallback
539 and isinstance(fallback, ItemMapping)
540 and (fallback_provider := self.mass.get_provider(fallback.provider))
541 ):
542 # fallback is a ItemMapping, try to convert to full item
543 with suppress(LookupError, TypeError, ValueError):
544 return cast(
545 "ItemCls",
546 self.item_cls.from_dict(
547 {
548 **fallback.to_dict(),
549 "provider_mappings": [
550 {
551 "item_id": fallback.item_id,
552 "provider_domain": fallback_provider.domain,
553 "provider_instance": fallback_provider.instance_id,
554 "available": fallback.available,
555 }
556 ],
557 }
558 ),
559 )
560 if fallback:
561 # simply return the fallback item
562 return cast("ItemCls", fallback)
563 # all options exhausted, we really can not find this item
564 msg = (
565 f"{self.media_type.value}://{item_id} not "
566 f"found on provider {provider_instance_id_or_domain}"
567 )
568 raise MediaNotFoundError(msg)
569
570 @final
571 async def add_provider_mapping(
572 self, item_id: str | int, provider_mapping: ProviderMapping
573 ) -> None:
574 """Add provider mapping to existing library item."""
575 await self.add_provider_mappings(item_id, [provider_mapping])
576
    @final
    async def add_provider_mappings(
        self, item_id: str | int, provider_mappings: Iterable[ProviderMapping]
    ) -> None:
        """
        Add provider mappings to existing library item.

        Mappings that are already present on the item are ignored. When one of the
        new mappings currently belongs to a different library item, that other item
        is removed from the library (merge) before the mappings are stored.

        :param item_id: The library item ID to add mappings to.
        :param provider_mappings: The provider mappings to add.
        """
        db_id = int(item_id)  # ensure integer
        library_item = await self.get_library_item(db_id)
        new_mappings: set[ProviderMapping] = set()
        for provider_mapping in provider_mappings:
            # ignore if the mapping is already present
            if provider_mapping not in library_item.provider_mappings:
                new_mappings.add(provider_mapping)
        if not new_mappings:
            # nothing new to add: no database write and no event
            return
        # handle special case where the user wants to merge 2 library items
        for mapping in new_mappings:
            if _library_item := await self.get_library_item_by_prov_id(
                mapping.item_id, mapping.provider_instance
            ):
                if _library_item.item_id != library_item.item_id:
                    # merging items
                    self.logger.debug(
                        "merging item id %s into item id %s based on provider mapping %s/%s",
                        _library_item.item_id,
                        library_item.item_id,
                        mapping.provider_instance,
                        mapping.item_id,
                    )
                    await self.remove_item_from_library(_library_item.item_id, recursive=True)
                    # only the first conflicting item is removed
                    break
        # store the combined set of mappings and announce the update
        library_item.provider_mappings.update(new_mappings)
        self.mass.music.match_provider_instances(library_item)
        await self.set_provider_mappings(db_id, library_item.provider_mappings)
        self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, library_item.uri, library_item)
616
617 @final
618 async def remove_provider_mapping(
619 self, item_id: str | int, provider_instance_id: str, provider_item_id: str
620 ) -> None:
621 """Remove provider mapping(s) from item."""
622 db_id = int(item_id) # ensure integer
623 try:
624 library_item = await self.get_library_item(db_id)
625 except MediaNotFoundError:
626 # edge case: already deleted / race condition
627 return
628
629 # update provider_mappings table
630 await self.mass.music.database.delete(
631 DB_TABLE_PROVIDER_MAPPINGS,
632 {
633 "media_type": self.media_type.value,
634 "item_id": db_id,
635 "provider_instance": provider_instance_id,
636 "provider_item_id": provider_item_id,
637 },
638 )
639 # cleanup playlog table
640 await self.mass.music.database.delete(
641 DB_TABLE_PLAYLOG,
642 {
643 "media_type": self.media_type.value,
644 "item_id": provider_item_id,
645 "provider": provider_instance_id,
646 },
647 )
648 library_item.provider_mappings = {
649 x
650 for x in library_item.provider_mappings
651 if not (x.provider_instance == provider_instance_id and x.item_id == provider_item_id)
652 }
653 if library_item.provider_mappings:
654 self.logger.debug(
655 "removed provider_mapping %s/%s from item id %s",
656 provider_instance_id,
657 provider_item_id,
658 db_id,
659 )
660 self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, library_item.uri, library_item)
661 else:
662 # remove item if it has no more providers
663 with suppress(AssertionError):
664 await self.remove_item_from_library(db_id)
665
666 @final
667 async def remove_provider_mappings(self, item_id: str | int, provider_instance_id: str) -> None:
668 """Remove all provider mappings from an item."""
669 db_id = int(item_id) # ensure integer
670 try:
671 library_item = await self.get_library_item(db_id)
672 except MediaNotFoundError:
673 # edge case: already deleted / race condition
674 library_item = None
675 # update provider_mappings table
676 await self.mass.music.database.delete(
677 DB_TABLE_PROVIDER_MAPPINGS,
678 {
679 "media_type": self.media_type.value,
680 "item_id": db_id,
681 "provider_instance": provider_instance_id,
682 },
683 )
684 if library_item is None:
685 return
686 # update the item's provider mappings (and check if we still have any)
687 library_item.provider_mappings = {
688 x for x in library_item.provider_mappings if x.provider_instance != provider_instance_id
689 }
690 if library_item.provider_mappings:
691 self.logger.debug(
692 "removed all provider mappings for provider %s from item id %s",
693 provider_instance_id,
694 db_id,
695 )
696 self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, library_item.uri, library_item)
697 else:
698 # remove item if it has no more providers
699 with suppress(AssertionError):
700 await self.remove_item_from_library(db_id)
701
702 @final
703 async def set_provider_mappings(
704 self,
705 item_id: str | int,
706 provider_mappings: Iterable[ProviderMapping],
707 overwrite: bool = False,
708 ) -> None:
709 """Update the provider_items table for the media item."""
710 db_id = int(item_id) # ensure integer
711 if overwrite:
712 # on overwrite, clear the provider_mappings table first
713 # this is done for filesystem provider changing the path (and thus item_id)
714 await self.mass.music.database.delete(
715 DB_TABLE_PROVIDER_MAPPINGS,
716 {"media_type": self.media_type.value, "item_id": db_id},
717 )
718 for provider_mapping in provider_mappings:
719 prov_map_obj = {
720 "media_type": self.media_type.value,
721 "item_id": db_id,
722 "provider_domain": provider_mapping.provider_domain,
723 "provider_instance": provider_mapping.provider_instance,
724 "provider_item_id": provider_mapping.item_id,
725 "available": provider_mapping.available,
726 "audio_format": serialize_to_json(provider_mapping.audio_format),
727 }
728 for key in ("url", "details", "in_library", "is_unique"):
729 if (value := getattr(provider_mapping, key, None)) is not None:
730 prov_map_obj[key] = value
731 await self.mass.music.database.upsert(
732 DB_TABLE_PROVIDER_MAPPINGS,
733 prov_map_obj,
734 )
735
    @abstractmethod
    async def _add_library_item(
        self,
        item: ItemCls,
        overwrite_existing: bool = False,
    ) -> int:
        """
        Add a new item to the library and return the database id.

        Called by ``add_item_to_library`` (while holding the add-lock) when no
        existing library record matched the item.

        :param item: The media item to store.
        :param overwrite_existing: Not passed by ``add_item_to_library``, so it
            keeps its default there.
        """
743
    @abstractmethod
    async def _update_library_item(
        self, item_id: str | int, update: ItemCls, overwrite: bool = False
    ) -> None:
        """
        Update existing library record in the database.

        Called by ``add_item_to_library`` (for a matched record) and by
        ``update_item_in_library``.

        :param item_id: Database id of the record to update.
        :param update: The item carrying the new details.
        :param overwrite: Overwrite flag passed through by the callers.
        """
749
    @abstractmethod
    async def match_providers(self, db_item: ItemCls) -> None:
        """
        Try to find match on all (streaming) providers for the provided (database) item.

        This is used to link objects of different providers/qualities together.

        :param db_item: The library (database) item to find provider matches for.
        """
757
    @abstractmethod
    async def radio_mode_base_tracks(
        self,
        item: ItemCls,
        preferred_provider_instances: list[str] | None = None,
    ) -> list[Track]:
        """
        Get the list of base tracks from the controller used to calculate the dynamic radio.

        :param item: The MediaItem to get base tracks for.
        :param preferred_provider_instances: List of preferred provider instance IDs to use.
            When provided, these providers will be tried first before falling back to others.
        :return: The base tracks for the given item.
        """
771
772 @final
773 async def get_library_items_by_query(
774 self,
775 favorite: bool | None = None,
776 search: str | None = None,
777 limit: int = 500,
778 offset: int = 0,
779 order_by: str | None = None,
780 provider_filter: list[str] | None = None,
781 extra_query_parts: list[str] | None = None,
782 extra_query_params: dict[str, Any] | None = None,
783 extra_join_parts: list[str] | None = None,
784 ) -> list[ItemCls]:
785 """Fetch MediaItem records from database by building the query."""
786 query_params = extra_query_params or {}
787 query_parts: list[str] = extra_query_parts or []
788 join_parts: list[str] = extra_join_parts or []
789 search = self._preprocess_search(search, query_params)
790 # create special performant random query
791 if order_by and order_by.startswith("random"):
792 self._apply_random_subquery(
793 query_parts=query_parts,
794 query_params=query_params,
795 join_parts=join_parts,
796 favorite=favorite,
797 search=search,
798 provider_filter=provider_filter,
799 limit=limit,
800 )
801 else:
802 # apply filters
803 self._apply_filters(
804 query_parts=query_parts,
805 query_params=query_params,
806 join_parts=join_parts,
807 favorite=favorite,
808 search=search,
809 provider_filter=provider_filter,
810 )
811 # build and execute final query
812 sql_query = self._build_final_query(query_parts, join_parts, order_by)
813
814 return [
815 cast("ItemCls", self.item_cls.from_dict(self._parse_db_row(db_row)))
816 for db_row in await self.mass.music.database.get_rows_from_query(
817 sql_query, query_params, limit=limit, offset=offset
818 )
819 ]
820
821 @final
822 def _preprocess_search(self, search: str | None, query_params: dict[str, Any]) -> str | None:
823 """Preprocess search string and add to query params."""
824 if search:
825 search = create_safe_string(search, True, True)
826 query_params["search"] = f"%{search}%"
827 return search
828
829 @final
830 @staticmethod
831 def _clean_query_parts(query_parts: list[str]) -> list[str]:
832 """Clean the query parts list by removing duplicate where statements."""
833 return [x[5:] if x.lower().startswith("where ") else x for x in query_parts]
834
835 @final
836 def _apply_random_subquery(
837 self,
838 query_parts: list[str],
839 query_params: dict[str, Any],
840 join_parts: list[str],
841 favorite: bool | None,
842 search: str | None,
843 provider_filter: list[str] | None,
844 limit: int,
845 ) -> None:
846 """Build a fast random subquery with all filters applied."""
847 sub_query_parts = query_parts.copy()
848 sub_join_parts = join_parts.copy()
849
850 # Apply all filters to the subquery
851 self._apply_filters(
852 query_parts=sub_query_parts,
853 query_params=query_params,
854 join_parts=sub_join_parts,
855 favorite=favorite,
856 search=search,
857 provider_filter=provider_filter,
858 )
859
860 # Build the subquery
861 sub_query = f"SELECT {self.db_table}.item_id FROM {self.db_table}"
862
863 if sub_join_parts:
864 sub_query += f" {' '.join(sub_join_parts)}"
865
866 if sub_query_parts:
867 sub_query += " WHERE " + " AND ".join(self._clean_query_parts(sub_query_parts))
868
869 sub_query += f" ORDER BY RANDOM() LIMIT {limit}"
870
871 # The query now only consists of the random subquery, which applies all filters
872 # within itself
873 query_parts.clear()
874 query_parts.append(f"{self.db_table}.item_id in ({sub_query})")
875 join_parts.clear()
876
@final
def _apply_filters(
    self,
    query_parts: list[str],
    query_params: dict[str, Any],
    join_parts: list[str],
    favorite: bool | None,
    search: str | None,
    provider_filter: list[str] | None,
) -> None:
    """Append the search, favorite and provider conditions to the query.

    All three containers are mutated in place; nothing is returned.

    :param query_parts: WHERE conditions, extended with the search/favorite clauses.
    :param query_params: Named SQL parameters, extended with the values bound here.
    :param join_parts: JOIN clauses, extended with the provider mappings join.
    :param favorite: When not None, restrict to rows with this favorite flag.
    :param search: When truthy, add a LIKE condition on the search_name column.
    :param provider_filter: When non-empty, restrict to these provider instances.
    """
    # NOTE(review): only the ``:search`` placeholder is added here, its value is
    # not stored in query_params by this method - presumably the caller binds it.
    if search:
        query_parts.append(f"{self.db_table}.search_name LIKE :search")

    if favorite is not None:
        query_parts.append(f"{self.db_table}.favorite = :favorite")
        query_params["favorite"] = favorite

    if not provider_filter:
        return

    # one uniquely named parameter per requested provider instance
    bindings = {
        f"provider_filter_{position}": instance
        for position, instance in enumerate(provider_filter)
    }
    query_params.update(bindings)
    query_params["provider_media_type"] = self.media_type.value

    any_requested_provider = " OR ".join(
        f"provider_mappings.provider_instance = :{name}" for name in bindings
    )
    join_parts.append(
        f"JOIN provider_mappings ON provider_mappings.item_id = {self.db_table}.item_id "
        "AND provider_mappings.media_type = :provider_media_type "
        "AND provider_mappings.in_library = 1 "
        f"AND ({any_requested_provider})"
    )
909
@final
def _build_final_query(
    self,
    query_parts: list[str],
    join_parts: list[str],
    order_by: str | None,
) -> str:
    """Assemble the complete SQL statement on top of the base query.

    :param query_parts: WHERE conditions, combined with AND (skipped when empty).
    :param join_parts: JOIN clauses, inserted right after the base query.
    :param order_by: Key into SORT_KEYS; unknown or empty keys add no ORDER BY.
    :return: The SQL query string, always grouped by the item id of the table.
    """
    fragments = [self.base_query]

    if join_parts:
        fragments.append(f" {' '.join(join_parts)} ")

    if query_parts:
        # the parts are passed through _clean_query_parts first
        # (per the original comment: to prevent a duplicate where statement)
        conditions = self._clean_query_parts(query_parts)
        fragments.append(" WHERE " + " AND ".join(conditions))

    # grouping comes before the (optional) ordering
    fragments.append(f" GROUP BY {self.db_table}.item_id")

    if order_by and (sort_key := SORT_KEYS.get(order_by)):
        fragments.append(f" ORDER BY {sort_key}")

    return "".join(fragments)
937
@final
@staticmethod
def _parse_db_row(db_row: Mapping[str, Any]) -> dict[str, Any]:
    """Convert a raw database row into a plain dict with decoded values.

    The row is copied, so the given mapping itself is left untouched.

    :param db_row: Row mapping; must contain ``favorite``, ``item_id`` and
        ``timestamp_added``.
    :return: Dict with ``provider`` set to "library", normalized ``favorite``,
        ``item_id`` and ``date_added`` values, decoded JSON columns and, when
        ``track_album`` is present, the album details copied to the top level.
    """
    parsed = dict(db_row)
    parsed["provider"] = "library"
    parsed["favorite"] = bool(parsed["favorite"])
    parsed["item_id"] = str(parsed["item_id"])
    # no tz is passed, so the ISO string is derived from a naive datetime
    added_at = datetime.fromtimestamp(parsed["timestamp_added"])
    parsed["date_added"] = added_at.isoformat()

    # decode the JSON columns; absent or empty columns are left as they are
    for json_key in JSON_KEYS:
        encoded = parsed.get(json_key)
        if encoded:
            parsed[json_key] = json_loads(encoded)

    track_album = parsed.get("track_album")
    if not track_album:
        return parsed

    # copy track_album --> album
    parsed["album"] = track_album
    parsed["disc_number"] = track_album["disc_number"]
    parsed["track_number"] = track_album["track_number"]

    album_images = track_album.get("images")
    if not album_images:
        return parsed
    album_thumb = next(
        (candidate for candidate in album_images if candidate["type"] == "thumb"),
        None,
    )
    if not album_thumb:
        return parsed

    # always prefer album image over track image:
    # the thumb becomes the single image and goes first in the metadata images
    parsed["image"] = album_thumb
    metadata = parsed["metadata"]
    own_images = metadata.get("images")
    metadata["images"] = [album_thumb, *own_images] if own_images else [album_thumb]
    return parsed
977
@final
def _ensure_provider_filter(
    self,
    provider: str | list[str] | None,
) -> list[str] | None:
    """Combine the requested provider(s) with the current user's provider filter.

    :param provider: A single provider, a list of providers, or None.
    :return: The list of providers to filter on, or None when neither the
        caller nor the current user restricts the providers.
    :raises InsufficientPermissions: If the user has a provider filter and
        none of the requested providers is part of it.
    """
    user = get_current_user()
    allowed = user.provider_filter if user and user.provider_filter else None

    if not allowed:
        # No user filter - use the provided filter as is
        if provider is None:
            return None
        return [provider] if isinstance(provider, str) else provider

    if not provider:
        # No explicit filter - the user's own provider filter applies
        return allowed

    # Explicit filter on top of a user filter - keep only the overlap
    requested = [provider] if isinstance(provider, str) else provider
    permitted = [candidate for candidate in requested if candidate in allowed]
    if permitted:
        return permitted
    # No overlap - user requested providers they don't have access to
    raise InsufficientPermissions(
        "User does not have permission to access the requested provider(s)."
    )
1009
@final
def _select_provider_id(self, library_item: ItemCls) -> tuple[str, str]:
    """Pick the provider mapping that should be used to fetch the item.

    The first mapping allowed by the current user's provider filter wins;
    without a user filter (or without any allowed mapping) the first mapping
    of the item is used.

    :param library_item: Item whose ``provider_mappings`` are inspected.
    :return: Tuple of (provider instance, item id on that provider).
    """
    user = get_current_user()
    allowed = user.provider_filter if user and user.provider_filter else None

    # prefer a mapping that passes the user provider filter (if any)
    chosen = next(
        (
            candidate
            for candidate in library_item.provider_mappings
            if not allowed or candidate.provider_instance in allowed
        ),
        None,
    )
    if chosen is None:
        # fallback to first mapping
        # NOTE(review): raises StopIteration when the item has no mappings at all
        chosen = next(iter(library_item.provider_mappings))
    return (chosen.provider_instance, chosen.item_id)
1023