/
/
/
1"""Base (ABC) MediaType specific controller."""
2
3from __future__ import annotations
4
5import asyncio
6import logging
7from abc import ABCMeta, abstractmethod
8from collections.abc import Iterable
9from contextlib import suppress
10from datetime import datetime
11from typing import TYPE_CHECKING, Any, TypeVar, cast, final
12
13from music_assistant_models.enums import EventType, ExternalID, MediaType, ProviderFeature
14from music_assistant_models.errors import (
15 InsufficientPermissions,
16 MediaNotFoundError,
17 ProviderUnavailableError,
18)
19from music_assistant_models.media_items import (
20 AudioFormat,
21 ItemMapping,
22 MediaItemType,
23 ProviderMapping,
24 Track,
25)
26
27from music_assistant.constants import DB_TABLE_PLAYLOG, DB_TABLE_PROVIDER_MAPPINGS, MASS_LOGGER_NAME
28from music_assistant.controllers.webserver.helpers.auth_middleware import get_current_user
29from music_assistant.helpers.compare import compare_media_item, create_safe_string
30from music_assistant.helpers.database import UNSET
31from music_assistant.helpers.json import json_loads, serialize_to_json
32from music_assistant.helpers.util import guard_single_request
33
34if TYPE_CHECKING:
35 from collections.abc import AsyncGenerator, Mapping
36
37 from music_assistant import MusicAssistant
38 from music_assistant.models.music_provider import MusicProvider
39
40
41ItemCls = TypeVar("ItemCls", bound="MediaItemType")
42
43
44JSON_KEYS = (
45 "artists",
46 "track_album",
47 "metadata",
48 "provider_mappings",
49 "external_ids",
50 "narrators",
51 "authors",
52)
53
54SORT_KEYS = {
55 # sqlite has no builtin support for natural sorting
56 # so we have use an additional column for this
57 # this also improves searching and sorting performance
58 "name": "search_name ASC",
59 "name_desc": "search_name DESC",
60 "duration": "duration ASC",
61 "duration_desc": "duration DESC",
62 "sort_name": "search_sort_name ASC",
63 "sort_name_desc": "search_sort_name DESC",
64 "timestamp_added": "timestamp_added ASC",
65 "timestamp_added_desc": "timestamp_added DESC",
66 "timestamp_modified": "timestamp_modified ASC",
67 "timestamp_modified_desc": "timestamp_modified DESC",
68 "last_played": "last_played ASC",
69 "last_played_desc": "last_played DESC",
70 "play_count": "play_count ASC",
71 "play_count_desc": "play_count DESC",
72 "year": "year ASC",
73 "year_desc": "year DESC",
74 "position": "position ASC",
75 "position_desc": "position DESC",
76 "artist_name": "artists.search_name ASC",
77 "artist_name_desc": "artists.search_name DESC",
78 "random": "RANDOM()",
79 "random_play_count": "RANDOM(), play_count ASC",
80}
81
82
83class MediaControllerBase[ItemCls: "MediaItemType"](metaclass=ABCMeta):
84 """Base model for controller managing a MediaType."""
85
86 media_type: MediaType
87 item_cls: type[MediaItemType]
88 db_table: str
89
90 def __init__(self, mass: MusicAssistant) -> None:
91 """Initialize class."""
92 self.mass = mass
93 self.base_query = f"""
94 SELECT
95 {self.db_table}.*,
96 (SELECT JSON_GROUP_ARRAY(
97 json_object(
98 'item_id', provider_mappings.provider_item_id,
99 'provider_domain', provider_mappings.provider_domain,
100 'provider_instance', provider_mappings.provider_instance,
101 'available', provider_mappings.available,
102 'audio_format', json(provider_mappings.audio_format),
103 'url', provider_mappings.url,
104 'details', provider_mappings.details,
105 'in_library', provider_mappings.in_library,
106 'is_unique', provider_mappings.is_unique
107 )) FROM provider_mappings WHERE provider_mappings.item_id = {self.db_table}.item_id
108 AND provider_mappings.media_type = '{self.media_type.value}') AS provider_mappings
109 FROM {self.db_table} """ # noqa: E501
110 self.logger = logging.getLogger(f"{MASS_LOGGER_NAME}.music.{self.media_type.value}")
111 # register (base) api handlers
112 self.api_base = api_base = f"{self.media_type}s"
113 self.mass.register_api_command(f"music/{api_base}/count", self.library_count)
114 self.mass.register_api_command(f"music/{api_base}/library_items", self.library_items)
115 self.mass.register_api_command(f"music/{api_base}/get", self.get)
116 # Backward compatibility alias - prefer the generic "get" endpoint
117 self.mass.register_api_command(
118 f"music/{api_base}/get_{self.media_type}", self.get, alias=True
119 )
120 self.mass.register_api_command(
121 f"music/{api_base}/update", self.update_item_in_library, required_role="admin"
122 )
123 self.mass.register_api_command(
124 f"music/{api_base}/remove", self.remove_item_from_library, required_role="admin"
125 )
126 self._db_add_lock = asyncio.Lock()
127
128 @final
129 async def add_item_to_library(
130 self,
131 item: ItemCls,
132 overwrite_existing: bool = False,
133 ) -> ItemCls:
134 """Add item to library and return the new (or updated) database item."""
135 new_item = False
136 # check for existing item first
137 if library_id := await self._get_library_item_by_match(item):
138 # update existing item
139 await self._update_library_item(library_id, item, overwrite=overwrite_existing)
140 else:
141 # actually add a new item in the library db
142 self.mass.music.match_provider_instances(item)
143 async with self._db_add_lock:
144 library_id = await self._add_library_item(item)
145 new_item = True
146 # return final library_item
147 library_item = await self.get_library_item(library_id)
148 self.mass.signal_event(
149 EventType.MEDIA_ITEM_ADDED if new_item else EventType.MEDIA_ITEM_UPDATED,
150 library_item.uri,
151 library_item,
152 )
153 return library_item
154
155 @final
156 async def _get_library_item_by_match(self, item: ItemCls | ItemMapping) -> int | None:
157 if item.provider == "library":
158 return int(item.item_id)
159 # search by provider mappings if item is ItemMapping
160 if isinstance(item, ItemMapping):
161 if cur_item := await self.get_library_item_by_prov_id(item.item_id, item.provider):
162 return int(cur_item.item_id)
163
164 # for all other items that are MediaItemType, check provider_mappings if it exists
165 provider_mappings = getattr(item, "provider_mappings", None)
166 if provider_mappings:
167 if cur_item := await self.get_library_item_by_prov_mappings(provider_mappings):
168 return int(cur_item.item_id)
169 if cur_item := await self.get_library_item_by_external_ids(item.external_ids):
170 # existing item match by external id
171 # Double check external IDs - if MBID exists, regards that as overriding
172 if compare_media_item(item, cur_item):
173 return int(cur_item.item_id)
174 # search by (exact) name match
175 query = f"{self.db_table}.name = :name OR {self.db_table}.sort_name = :sort_name"
176 query_params = {"name": item.name, "sort_name": item.sort_name}
177 for db_item in await self.get_library_items_by_query(
178 extra_query_parts=[query], extra_query_params=query_params
179 ):
180 if compare_media_item(db_item, item, True):
181 return int(db_item.item_id)
182 return None
183
184 @final
185 async def update_item_in_library(
186 self, item_id: str | int, update: ItemCls, overwrite: bool = False
187 ) -> ItemCls:
188 """Update existing library record in the library database."""
189 self.mass.music.match_provider_instances(update)
190 await self._update_library_item(item_id, update, overwrite=overwrite)
191 # return the updated object
192 library_item = await self.get_library_item(item_id)
193 self.mass.signal_event(
194 EventType.MEDIA_ITEM_UPDATED,
195 library_item.uri,
196 library_item,
197 )
198 return library_item
199
200 async def remove_item_from_library(self, item_id: str | int, recursive: bool = True) -> None:
201 """Delete library record from the database."""
202 db_id = int(item_id) # ensure integer
203 library_item = await self.get_library_item(db_id)
204 assert library_item, f"Item does not exist: {db_id}"
205 # delete item
206 await self.mass.music.database.delete(
207 self.db_table,
208 {"item_id": db_id},
209 )
210 # update provider_mappings table
211 await self.mass.music.database.delete(
212 DB_TABLE_PROVIDER_MAPPINGS,
213 {"media_type": self.media_type.value, "item_id": db_id},
214 )
215 # cleanup playlog table
216 await self.mass.music.database.delete(
217 DB_TABLE_PLAYLOG,
218 {
219 "media_type": self.media_type.value,
220 "item_id": db_id,
221 "provider": "library",
222 },
223 )
224 for prov_mapping in library_item.provider_mappings:
225 await self.mass.music.database.delete(
226 DB_TABLE_PLAYLOG,
227 {
228 "media_type": self.media_type.value,
229 "item_id": prov_mapping.item_id,
230 "provider": prov_mapping.provider_instance,
231 },
232 )
233 # NOTE: this does not delete any references to this item in other records,
234 # this is handled/overridden in the mediatype specific controllers
235 self.mass.signal_event(EventType.MEDIA_ITEM_DELETED, library_item.uri, library_item)
236 self.logger.debug("deleted item with id %s from database", db_id)
237
238 async def library_count(self, favorite_only: bool = False) -> int:
239 """Return the total number of items in the library."""
240 if favorite_only:
241 sql_query = f"SELECT item_id FROM {self.db_table} WHERE favorite = 1"
242 return await self.mass.music.database.get_count_from_query(sql_query)
243 return await self.mass.music.database.get_count(self.db_table)
244
245 async def library_items(
246 self,
247 favorite: bool | None = None,
248 search: str | None = None,
249 limit: int = 500,
250 offset: int = 0,
251 order_by: str = "sort_name",
252 provider: str | list[str] | None = None,
253 ) -> list[ItemCls]:
254 """
255 Get the library items for this mediatype.
256
257 :param favorite: Filter by favorite status.
258 :param search: Filter by search query.
259 :param limit: Maximum number of items to return.
260 :param offset: Number of items to skip.
261 :param order_by: Order by field (e.g. 'sort_name', 'timestamp_added').
262 :param provider: Filter by provider instance ID (single string or list).
263 """
264 return await self.get_library_items_by_query(
265 favorite=favorite,
266 search=search,
267 limit=limit,
268 offset=offset,
269 order_by=order_by,
270 provider_filter=self._ensure_provider_filter(provider),
271 )
272
273 async def iter_library_items(
274 self,
275 favorite: bool | None = None,
276 search: str | None = None,
277 order_by: str = "sort_name",
278 provider: str | list[str] | None = None,
279 ) -> AsyncGenerator[ItemCls, None]:
280 """Iterate all in-database items."""
281 limit: int = 500
282 offset: int = 0
283 if provider is not None:
284 provider_filter = provider if isinstance(provider, list) else [provider]
285 else:
286 provider_filter = None
287 while True:
288 next_items = await self.get_library_items_by_query(
289 favorite=favorite,
290 search=search,
291 limit=limit,
292 offset=offset,
293 order_by=order_by,
294 provider_filter=provider_filter,
295 )
296 for item in next_items:
297 yield item
298 if len(next_items) < limit:
299 break
300 offset += limit
301
302 async def get(
303 self,
304 item_id: str,
305 provider_instance_id_or_domain: str,
306 ) -> ItemCls:
307 """Return (full) details for a single media item."""
308 # always prefer the full library item if we have it
309 if library_item := await self.get_library_item_by_prov_id(
310 item_id,
311 provider_instance_id_or_domain,
312 ):
313 # schedule a refresh of the metadata on access of the item
314 # e.g. the item is being played or opened in the UI
315 assert library_item.uri is not None
316 self.mass.metadata.schedule_update_metadata(library_item.uri)
317 return library_item
318 # grab full details from the provider
319 return await self.get_provider_item(
320 item_id,
321 provider_instance_id_or_domain,
322 )
323
324 async def search(
325 self,
326 search_query: str,
327 provider_instance_id_or_domain: str,
328 limit: int = 25,
329 ) -> list[ItemCls]:
330 """Search database or provider with given query."""
331 # create safe search string
332 search_query = search_query.replace("/", " ").replace("'", "")
333 if provider_instance_id_or_domain == "library":
334 return await self.library_items(search=search_query, limit=limit)
335 if not (prov := self.mass.get_provider(provider_instance_id_or_domain)):
336 return []
337 prov = cast("MusicProvider", prov)
338 if ProviderFeature.SEARCH not in prov.supported_features:
339 return []
340 if not prov.library_supported(self.media_type):
341 # assume library supported also means that this mediatype is supported
342 return []
343 searchresult = await prov.search(
344 search_query,
345 [self.media_type],
346 limit,
347 )
348 match self.media_type:
349 case MediaType.ARTIST:
350 return cast("list[ItemCls]", searchresult.artists)
351 case MediaType.ALBUM:
352 return cast("list[ItemCls]", searchresult.albums)
353 case MediaType.TRACK:
354 return cast("list[ItemCls]", searchresult.tracks)
355 case MediaType.PLAYLIST:
356 return cast("list[ItemCls]", searchresult.playlists)
357 case MediaType.AUDIOBOOK:
358 return cast("list[ItemCls]", searchresult.audiobooks)
359 case MediaType.PODCAST:
360 return cast("list[ItemCls]", searchresult.podcasts)
361 case MediaType.RADIO:
362 return cast("list[ItemCls]", searchresult.radio)
363 case _:
364 return []
365
366 async def get_library_item(self, item_id: int | str) -> ItemCls:
367 """Get single library item by id."""
368 db_id = int(item_id) # ensure integer
369 extra_query = f"WHERE {self.db_table}.item_id = :item_id"
370 for db_item in await self.get_library_items_by_query(
371 extra_query_parts=[extra_query],
372 extra_query_params={"item_id": db_id},
373 ):
374 return db_item
375 msg = f"{self.media_type.value} not found in library: {db_id}"
376 raise MediaNotFoundError(msg)
377
378 async def get_library_item_by_prov_id(
379 self,
380 item_id: str,
381 provider_instance_id_or_domain: str,
382 ) -> ItemCls | None:
383 """Get the library item for the given provider_instance."""
384 assert item_id
385 assert provider_instance_id_or_domain
386 if provider_instance_id_or_domain == "library":
387 return await self.get_library_item(item_id)
388 for item in await self.get_library_items_by_prov_id(
389 provider_instance_id_or_domain=provider_instance_id_or_domain,
390 provider_item_id=item_id,
391 ):
392 return item
393 return None
394
395 @final
396 async def get_library_item_by_prov_mappings(
397 self,
398 provider_mappings: Iterable[ProviderMapping],
399 ) -> ItemCls | None:
400 """Get the library item for the given provider_instance."""
401 # always prefer provider instance first
402 for mapping in provider_mappings:
403 for item in await self.get_library_items_by_prov_id(
404 provider_instance=mapping.provider_instance,
405 provider_item_id=mapping.item_id,
406 ):
407 return item
408 # check by domain too
409 for mapping in provider_mappings:
410 for item in await self.get_library_items_by_prov_id(
411 provider_domain=mapping.provider_domain,
412 provider_item_id=mapping.item_id,
413 ):
414 return item
415 return None
416
417 @final
418 async def get_library_item_by_external_id(
419 self, external_id: str, external_id_type: ExternalID | None = None
420 ) -> ItemCls | None:
421 """Get the library item for the given external id."""
422 query = f"{self.db_table}.external_ids LIKE :external_id_str"
423 if external_id_type:
424 external_id_str = f'%"{external_id_type}","{external_id}"%'
425 else:
426 external_id_str = f'%"{external_id}"%'
427 for item in await self.get_library_items_by_query(
428 extra_query_parts=[query],
429 extra_query_params={"external_id_str": external_id_str},
430 ):
431 return item
432 return None
433
434 @final
435 async def get_library_item_by_external_ids(
436 self, external_ids: set[tuple[ExternalID, str]]
437 ) -> ItemCls | None:
438 """Get the library item for (one of) the given external ids."""
439 for external_id_type, external_id in external_ids:
440 if match := await self.get_library_item_by_external_id(external_id, external_id_type):
441 return match
442 return None
443
444 @final
445 async def get_library_items_by_prov_id(
446 self,
447 provider_domain: str | None = None,
448 provider_instance: str | None = None,
449 provider_instance_id_or_domain: str | None = None,
450 provider_item_id: str | None = None,
451 limit: int = 500,
452 offset: int = 0,
453 ) -> list[ItemCls]:
454 """Fetch all records from library for given provider."""
455 assert provider_instance_id_or_domain != "library"
456 assert provider_domain != "library"
457 assert provider_instance != "library"
458 subquery_parts: list[str] = []
459 query_params: dict[str, Any] = {}
460 if provider_instance:
461 query_params = {"prov_id": provider_instance}
462 subquery_parts.append("provider_mappings.provider_instance = :prov_id")
463 elif provider_domain:
464 query_params = {"prov_id": provider_domain}
465 subquery_parts.append("provider_mappings.provider_domain = :prov_id")
466 else:
467 query_params = {"prov_id": provider_instance_id_or_domain}
468 subquery_parts.append(
469 "(provider_mappings.provider_instance = :prov_id "
470 "OR provider_mappings.provider_domain = :prov_id)"
471 )
472 if provider_item_id:
473 subquery_parts.append("provider_mappings.provider_item_id = :item_id")
474 query_params["item_id"] = provider_item_id
475 subquery = f"SELECT item_id FROM provider_mappings WHERE {' AND '.join(subquery_parts)}"
476 query = f"WHERE {self.db_table}.item_id IN ({subquery})"
477 return await self.get_library_items_by_query(
478 limit=limit,
479 offset=offset,
480 extra_query_parts=[query],
481 extra_query_params=query_params,
482 )
483
484 @final
485 async def iter_library_items_by_prov_id(
486 self,
487 provider_instance_id_or_domain: str,
488 provider_item_id: str | None = None,
489 ) -> AsyncGenerator[ItemCls, None]:
490 """Iterate all records from database for given provider."""
491 limit: int = 500
492 offset: int = 0
493 while True:
494 next_items = await self.get_library_items_by_prov_id(
495 provider_instance_id_or_domain=provider_instance_id_or_domain,
496 provider_item_id=provider_item_id,
497 limit=limit,
498 offset=offset,
499 )
500 for item in next_items:
501 yield item
502 if len(next_items) < limit:
503 break
504 offset += limit
505
506 @final
507 async def set_favorite(self, item_id: str | int, favorite: bool) -> None:
508 """Set the favorite bool on a database item."""
509 db_id = int(item_id) # ensure integer
510 library_item = await self.get_library_item(db_id)
511 if library_item.favorite == favorite:
512 return
513 match = {"item_id": db_id}
514 await self.mass.music.database.update(self.db_table, match, {"favorite": favorite})
515 library_item = await self.get_library_item(db_id)
516 self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, library_item.uri, library_item)
517
518 @guard_single_request # type: ignore[type-var] # TODO: fix typing for MediaControllerBase
519 @final
520 async def get_provider_item(
521 self,
522 item_id: str,
523 provider_instance_id_or_domain: str,
524 force_refresh: bool = False,
525 fallback: ItemMapping | ItemCls | None = None,
526 ) -> ItemCls:
527 """Return item details for the given provider item id."""
528 if provider_instance_id_or_domain == "library":
529 return await self.get_library_item(item_id)
530 if not (provider := self.mass.get_provider(provider_instance_id_or_domain)):
531 raise ProviderUnavailableError(f"{provider_instance_id_or_domain} is not available")
532 if provider := self.mass.get_provider(provider_instance_id_or_domain):
533 provider = cast("MusicProvider", provider)
534 with suppress(MediaNotFoundError):
535 async with self.mass.cache.handle_refresh(force_refresh):
536 return cast("ItemCls", await provider.get_item(self.media_type, item_id))
537 # if we reach this point all possibilities failed and the item could not be found.
538 # There is a possibility that the (streaming) provider changed the id of the item
539 # so we return the previous details (if we have any) marked as unavailable, so
540 # at least we have the possibility to sort out the new id through matching logic.
541 fallback = fallback or await self.get_library_item_by_prov_id(
542 item_id, provider_instance_id_or_domain
543 )
544 if (
545 fallback
546 and isinstance(fallback, ItemMapping)
547 and (fallback_provider := self.mass.get_provider(fallback.provider))
548 ):
549 # fallback is a ItemMapping, try to convert to full item
550 with suppress(LookupError, TypeError, ValueError):
551 return cast(
552 "ItemCls",
553 self.item_cls.from_dict(
554 {
555 **fallback.to_dict(),
556 "provider_mappings": [
557 {
558 "item_id": fallback.item_id,
559 "provider_domain": fallback_provider.domain,
560 "provider_instance": fallback_provider.instance_id,
561 "available": fallback.available,
562 }
563 ],
564 }
565 ),
566 )
567 if fallback:
568 # simply return the fallback item
569 return cast("ItemCls", fallback)
570 # all options exhausted, we really can not find this item
571 msg = (
572 f"{self.media_type.value}://{item_id} not "
573 f"found on provider {provider_instance_id_or_domain}"
574 )
575 raise MediaNotFoundError(msg)
576
577 @final
578 async def add_provider_mapping(
579 self, item_id: str | int, provider_mapping: ProviderMapping
580 ) -> None:
581 """Add provider mapping to existing library item."""
582 await self.add_provider_mappings(item_id, [provider_mapping])
583
584 @final
585 async def add_provider_mappings(
586 self, item_id: str | int, provider_mappings: Iterable[ProviderMapping]
587 ) -> None:
588 """
589 Add provider mappings to existing library item.
590
591 :param item_id: The library item ID to add mappings to.
592 :param provider_mappings: The provider mappings to add.
593 """
594 db_id = int(item_id) # ensure integer
595 library_item = await self.get_library_item(db_id)
596 new_mappings: set[ProviderMapping] = set()
597 for provider_mapping in provider_mappings:
598 # ignore if the mapping is already present
599 if provider_mapping not in library_item.provider_mappings:
600 new_mappings.add(provider_mapping)
601 if not new_mappings:
602 return
603 # handle special case where the user wants to merge 2 library items
604 for mapping in new_mappings:
605 if _library_item := await self.get_library_item_by_prov_id(
606 mapping.item_id, mapping.provider_instance
607 ):
608 if _library_item.item_id != library_item.item_id:
609 # merging items
610 self.logger.debug(
611 "merging item id %s into item id %s based on provider mapping %s/%s",
612 _library_item.item_id,
613 library_item.item_id,
614 mapping.provider_instance,
615 mapping.item_id,
616 )
617 await self.remove_item_from_library(_library_item.item_id, recursive=True)
618 break
619 library_item.provider_mappings.update(new_mappings)
620 self.mass.music.match_provider_instances(library_item)
621 await self.set_provider_mappings(db_id, library_item.provider_mappings)
622 self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, library_item.uri, library_item)
623
624 @final
625 async def update_provider_mapping(
626 self,
627 item_id: str | int,
628 provider_instance_id: str,
629 provider_item_id: str,
630 *,
631 available: bool | Any = UNSET,
632 in_library: bool | Any = UNSET,
633 is_unique: bool | None | Any = UNSET,
634 url: str | None | Any = UNSET,
635 details: str | None | Any = UNSET,
636 audio_format: AudioFormat | Any = UNSET,
637 ) -> None:
638 """Update an existing provider mapping for a library item."""
639 db_id = int(item_id) # ensure integer
640 library_item = await self.get_library_item(db_id)
641
642 # find the current mapping (strictly by provider instance + provider item id)
643 cur_mapping: ProviderMapping | None = None
644 for mapping in library_item.provider_mappings:
645 if (
646 mapping.provider_instance == provider_instance_id
647 and mapping.item_id == provider_item_id
648 ):
649 cur_mapping = mapping
650 break
651 if cur_mapping is None:
652 msg = (
653 f"Provider mapping {provider_instance_id}/{provider_item_id} "
654 f"not found for item {db_id}"
655 )
656 raise MediaNotFoundError(msg)
657
658 # guard against nulls for NOT NULL columns
659 if available is None:
660 available = UNSET
661 if in_library is None:
662 in_library = UNSET
663
664 updates: dict[str, Any] = {}
665 if available is not UNSET:
666 updates["available"] = bool(available)
667 if in_library is not UNSET:
668 updates["in_library"] = bool(in_library)
669 if is_unique is not UNSET:
670 updates["is_unique"] = is_unique
671 if url is not UNSET:
672 updates["url"] = url
673 if details is not UNSET:
674 updates["details"] = details
675 if audio_format is not UNSET:
676 updates["audio_format"] = serialize_to_json(audio_format)
677
678 if not updates:
679 return
680
681 match = {
682 "media_type": self.media_type.value,
683 "item_id": db_id,
684 "provider_instance": provider_instance_id,
685 "provider_item_id": provider_item_id,
686 }
687 await self.mass.music.database.update(DB_TABLE_PROVIDER_MAPPINGS, match, updates)
688
689 # Re-fetch the updated item so the event payload reflects persisted DB state.
690 updated_item = await self.get_library_item(db_id)
691 self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, updated_item.uri, updated_item)
692
693 @final
694 async def remove_provider_mapping(
695 self, item_id: str | int, provider_instance_id: str, provider_item_id: str
696 ) -> None:
697 """Remove provider mapping(s) from item."""
698 db_id = int(item_id) # ensure integer
699 try:
700 library_item = await self.get_library_item(db_id)
701 except MediaNotFoundError:
702 # edge case: already deleted / race condition
703 return
704
705 # update provider_mappings table
706 await self.mass.music.database.delete(
707 DB_TABLE_PROVIDER_MAPPINGS,
708 {
709 "media_type": self.media_type.value,
710 "item_id": db_id,
711 "provider_instance": provider_instance_id,
712 "provider_item_id": provider_item_id,
713 },
714 )
715 # cleanup playlog table
716 await self.mass.music.database.delete(
717 DB_TABLE_PLAYLOG,
718 {
719 "media_type": self.media_type.value,
720 "item_id": provider_item_id,
721 "provider": provider_instance_id,
722 },
723 )
724 library_item.provider_mappings = {
725 x
726 for x in library_item.provider_mappings
727 if not (x.provider_instance == provider_instance_id and x.item_id == provider_item_id)
728 }
729 if library_item.provider_mappings:
730 self.logger.debug(
731 "removed provider_mapping %s/%s from item id %s",
732 provider_instance_id,
733 provider_item_id,
734 db_id,
735 )
736 self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, library_item.uri, library_item)
737 else:
738 # remove item if it has no more providers
739 with suppress(AssertionError):
740 await self.remove_item_from_library(db_id)
741
742 @final
743 async def remove_provider_mappings(self, item_id: str | int, provider_instance_id: str) -> None:
744 """Remove all provider mappings from an item."""
745 db_id = int(item_id) # ensure integer
746 try:
747 library_item = await self.get_library_item(db_id)
748 except MediaNotFoundError:
749 # edge case: already deleted / race condition
750 library_item = None
751 # update provider_mappings table
752 await self.mass.music.database.delete(
753 DB_TABLE_PROVIDER_MAPPINGS,
754 {
755 "media_type": self.media_type.value,
756 "item_id": db_id,
757 "provider_instance": provider_instance_id,
758 },
759 )
760 if library_item is None:
761 return
762 # update the item's provider mappings (and check if we still have any)
763 library_item.provider_mappings = {
764 x for x in library_item.provider_mappings if x.provider_instance != provider_instance_id
765 }
766 if library_item.provider_mappings:
767 self.logger.debug(
768 "removed all provider mappings for provider %s from item id %s",
769 provider_instance_id,
770 db_id,
771 )
772 self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, library_item.uri, library_item)
773 else:
774 # remove item if it has no more providers
775 with suppress(AssertionError):
776 await self.remove_item_from_library(db_id)
777
778 @final
779 async def set_provider_mappings(
780 self,
781 item_id: str | int,
782 provider_mappings: Iterable[ProviderMapping],
783 overwrite: bool = False,
784 ) -> None:
785 """Update the provider_items table for the media item."""
786 db_id = int(item_id) # ensure integer
787 if overwrite:
788 # on overwrite, clear the provider_mappings table first
789 # this is done for filesystem provider changing the path (and thus item_id)
790 await self.mass.music.database.delete(
791 DB_TABLE_PROVIDER_MAPPINGS,
792 {"media_type": self.media_type.value, "item_id": db_id},
793 )
794 for provider_mapping in provider_mappings:
795 prov_map_obj = {
796 "media_type": self.media_type.value,
797 "item_id": db_id,
798 "provider_domain": provider_mapping.provider_domain,
799 "provider_instance": provider_mapping.provider_instance,
800 "provider_item_id": provider_mapping.item_id,
801 "available": provider_mapping.available,
802 "audio_format": serialize_to_json(provider_mapping.audio_format),
803 }
804 for key in ("url", "details", "in_library", "is_unique"):
805 if (value := getattr(provider_mapping, key, None)) is not None:
806 prov_map_obj[key] = value
807 await self.mass.music.database.upsert(
808 DB_TABLE_PROVIDER_MAPPINGS,
809 prov_map_obj,
810 )
811
812 @abstractmethod
813 async def _add_library_item(
814 self,
815 item: ItemCls,
816 overwrite_existing: bool = False,
817 ) -> int:
818 """Add artist to library and return the database id."""
819
820 @abstractmethod
821 async def _update_library_item(
822 self, item_id: str | int, update: ItemCls, overwrite: bool = False
823 ) -> None:
824 """Update existing library record in the database."""
825
826 @abstractmethod
827 async def match_providers(self, db_item: ItemCls) -> None:
828 """
829 Try to find match on all (streaming) providers for the provided (database) item.
830
831 This is used to link objects of different providers/qualities together.
832 """
833
834 @abstractmethod
835 async def radio_mode_base_tracks(
836 self,
837 item: ItemCls,
838 preferred_provider_instances: list[str] | None = None,
839 ) -> list[Track]:
840 """
841 Get the list of base tracks from the controller used to calculate the dynamic radio.
842
843 :param item: The MediaItem to get base tracks for.
844 :param preferred_provider_instances: List of preferred provider instance IDs to use.
845 When provided, these providers will be tried first before falling back to others.
846 """
847
848 @final
849 async def get_library_items_by_query(
850 self,
851 favorite: bool | None = None,
852 search: str | None = None,
853 limit: int = 500,
854 offset: int = 0,
855 order_by: str | None = None,
856 provider_filter: list[str] | None = None,
857 extra_query_parts: list[str] | None = None,
858 extra_query_params: dict[str, Any] | None = None,
859 extra_join_parts: list[str] | None = None,
860 ) -> list[ItemCls]:
861 """Fetch MediaItem records from database by building the query."""
862 query_params = dict(extra_query_params) if extra_query_params else {}
863 query_parts: list[str] = list(extra_query_parts) if extra_query_parts else []
864 join_parts: list[str] = list(extra_join_parts) if extra_join_parts else []
865 search = self._preprocess_search(search, query_params)
866 # create special performant random query
867 if order_by and order_by.startswith("random"):
868 self._apply_random_subquery(
869 query_parts=query_parts,
870 query_params=query_params,
871 join_parts=join_parts,
872 favorite=favorite,
873 search=search,
874 provider_filter=provider_filter,
875 limit=limit,
876 )
877 else:
878 # apply filters
879 self._apply_filters(
880 query_parts=query_parts,
881 query_params=query_params,
882 join_parts=join_parts,
883 favorite=favorite,
884 search=search,
885 provider_filter=provider_filter,
886 )
887 # build and execute final query
888 sql_query = self._build_final_query(query_parts, join_parts, order_by)
889
890 return [
891 cast("ItemCls", self.item_cls.from_dict(self._parse_db_row(db_row)))
892 for db_row in await self.mass.music.database.get_rows_from_query(
893 sql_query, query_params, limit=limit, offset=offset
894 )
895 ]
896
897 @final
898 def _preprocess_search(self, search: str | None, query_params: dict[str, Any]) -> str | None:
899 """Preprocess search string and add to query params."""
900 if search:
901 search = create_safe_string(search, True, True)
902 query_params["search"] = f"%{search}%"
903 return search
904
905 @final
906 @staticmethod
907 def _clean_query_parts(query_parts: list[str]) -> list[str]:
908 """Clean the query parts list by removing duplicate where statements."""
909 return [x[5:] if x.lower().startswith("where ") else x for x in query_parts]
910
911 @final
912 def _apply_random_subquery(
913 self,
914 query_parts: list[str],
915 query_params: dict[str, Any],
916 join_parts: list[str],
917 favorite: bool | None,
918 search: str | None,
919 provider_filter: list[str] | None,
920 limit: int,
921 ) -> None:
922 """Build a fast random subquery with all filters applied."""
923 sub_query_parts = query_parts.copy()
924 sub_join_parts = join_parts.copy()
925
926 # Apply all filters to the subquery
927 self._apply_filters(
928 query_parts=sub_query_parts,
929 query_params=query_params,
930 join_parts=sub_join_parts,
931 favorite=favorite,
932 search=search,
933 provider_filter=provider_filter,
934 )
935
936 # Build the subquery
937 sub_query = f"SELECT {self.db_table}.item_id FROM {self.db_table}"
938
939 if sub_join_parts:
940 sub_query += f" {' '.join(sub_join_parts)}"
941
942 if sub_query_parts:
943 sub_query += " WHERE " + " AND ".join(self._clean_query_parts(sub_query_parts))
944
945 sub_query += f" ORDER BY RANDOM() LIMIT {limit}"
946
947 # The query now only consists of the random subquery, which applies all filters
948 # within itself
949 query_parts.clear()
950 query_parts.append(f"{self.db_table}.item_id in ({sub_query})")
951 join_parts.clear()
952
953 @final
954 def _apply_filters(
955 self,
956 query_parts: list[str],
957 query_params: dict[str, Any],
958 join_parts: list[str],
959 favorite: bool | None,
960 search: str | None,
961 provider_filter: list[str] | None,
962 ) -> None:
963 """Apply search, favorite, and provider filters."""
964 # handle search
965 if search:
966 query_parts.append(f"{self.db_table}.search_name LIKE :search")
967 # handle favorite filter
968 if favorite is not None:
969 query_parts.append(f"{self.db_table}.favorite = :favorite")
970 query_params["favorite"] = favorite
971 # Apply the provider filter
972 if provider_filter:
973 provider_conditions = []
974 for idx, prov in enumerate(provider_filter):
975 param_name = f"provider_filter_{idx}"
976 provider_conditions.append(f"provider_mappings.provider_instance = :{param_name}")
977 query_params[param_name] = prov
978 query_params["provider_media_type"] = self.media_type.value
979 join_parts.append(
980 f"JOIN provider_mappings ON provider_mappings.item_id = {self.db_table}.item_id "
981 "AND provider_mappings.media_type = :provider_media_type "
982 "AND provider_mappings.in_library = 1 "
983 f"AND ({' OR '.join(provider_conditions)})"
984 )
985
986 @final
987 def _build_final_query(
988 self,
989 query_parts: list[str],
990 join_parts: list[str],
991 order_by: str | None,
992 ) -> str:
993 """Build the final SQL query string."""
994 sql_query = self.base_query
995
996 # Add joins
997 if join_parts:
998 sql_query += f" {' '.join(join_parts)} "
999
1000 # Add where clauses
1001 if query_parts:
1002 # prevent duplicate where statement
1003 sql_query += " WHERE " + " AND ".join(self._clean_query_parts(query_parts))
1004
1005 # Add grouping and ordering
1006 sql_query += f" GROUP BY {self.db_table}.item_id"
1007
1008 if order_by:
1009 if sort_key := SORT_KEYS.get(order_by):
1010 sql_query += f" ORDER BY {sort_key}"
1011
1012 return sql_query
1013
1014 @final
1015 @staticmethod
1016 def _parse_db_row(db_row: Mapping[str, Any]) -> dict[str, Any]:
1017 """Parse raw db Mapping into a dict."""
1018 db_row_dict = dict(db_row)
1019 db_row_dict["provider"] = "library"
1020 db_row_dict["favorite"] = bool(db_row_dict["favorite"])
1021 db_row_dict["item_id"] = str(db_row_dict["item_id"])
1022 db_row_dict["date_added"] = datetime.fromtimestamp(
1023 db_row_dict["timestamp_added"]
1024 ).isoformat()
1025
1026 for key in JSON_KEYS:
1027 if key not in db_row_dict:
1028 continue
1029 if not (raw_value := db_row_dict[key]):
1030 continue
1031 db_row_dict[key] = json_loads(raw_value)
1032
1033 # copy track_album --> album
1034 if track_album := db_row_dict.get("track_album"):
1035 db_row_dict["album"] = track_album
1036 db_row_dict["disc_number"] = track_album["disc_number"]
1037 db_row_dict["track_number"] = track_album["track_number"]
1038 # always prefer album image over track image
1039 if (album_images := track_album.get("images")) and (
1040 album_thumb := next((x for x in album_images if x["type"] == "thumb"), None)
1041 ):
1042 # copy album image to itemmapping single image
1043 db_row_dict["image"] = album_thumb
1044 if db_row_dict["metadata"].get("images"):
1045 # merge album image with existing images
1046 db_row_dict["metadata"]["images"] = [
1047 album_thumb,
1048 *db_row_dict["metadata"]["images"],
1049 ]
1050 else:
1051 db_row_dict["metadata"]["images"] = [album_thumb]
1052 return db_row_dict
1053
1054 @final
1055 def _ensure_provider_filter(
1056 self,
1057 provider: str | list[str] | None,
1058 ) -> list[str] | None:
1059 """Ensure the provider filter respects the current user's provider filter."""
1060 # Apply user provider filter if needed
1061 user = get_current_user()
1062 user_provider_filter = user.provider_filter if user and user.provider_filter else None
1063 final_provider_filter: list[str] | None = None
1064 if user_provider_filter:
1065 # User has a provider filter set
1066 if provider:
1067 # Explicit provider filter provided - validate against user's allowed providers
1068 requested_providers = [provider] if isinstance(provider, str) else provider
1069 # Only include providers that are in both the user's filter and the requested list
1070 final_provider_filter = [
1071 p for p in requested_providers if p in user_provider_filter
1072 ]
1073 if not final_provider_filter:
1074 # No overlap - user requested providers they don't have access to
1075 raise InsufficientPermissions(
1076 "User does not have permission to access the requested provider(s)."
1077 )
1078 else:
1079 # No explicit filter - use user's provider filter
1080 final_provider_filter = user_provider_filter
1081 elif provider is not None:
1082 # No user filter - use the provided filter as is
1083 final_provider_filter = [provider] if isinstance(provider, str) else provider
1084 return final_provider_filter
1085
1086 @final
1087 def _select_provider_id(self, library_item: ItemCls) -> tuple[str, str]:
1088 """Select the correct provider id to use for fetching the item."""
1089 user = get_current_user()
1090 user_provider_filter = user.provider_filter if user and user.provider_filter else None
1091 # prefer user provider filter if available
1092 for mapping in library_item.provider_mappings:
1093 if user_provider_filter and mapping.provider_instance not in user_provider_filter:
1094 continue
1095 return (mapping.provider_instance, mapping.item_id)
1096 # fallback to first mapping
1097 mapping = next(iter(library_item.provider_mappings))
1098 return (mapping.provider_instance, mapping.item_id)
1099