/
/
/
1"""Base (ABC) MediaType specific controller."""
2
3from __future__ import annotations
4
5import asyncio
6import logging
7from abc import ABCMeta, abstractmethod
8from collections.abc import Iterable
9from contextlib import suppress
10from datetime import datetime
11from typing import TYPE_CHECKING, Any, TypeVar, cast, final
12
13from music_assistant_models.enums import EventType, ExternalID, MediaType, ProviderFeature
14from music_assistant_models.errors import (
15 InsufficientPermissions,
16 MediaNotFoundError,
17 ProviderUnavailableError,
18)
19from music_assistant_models.media_items import (
20 AudioFormat,
21 ItemMapping,
22 MediaItemType,
23 ProviderMapping,
24 Track,
25)
26
27from music_assistant.constants import DB_TABLE_PLAYLOG, DB_TABLE_PROVIDER_MAPPINGS, MASS_LOGGER_NAME
28from music_assistant.controllers.webserver.helpers.auth_middleware import get_current_user
29from music_assistant.helpers.compare import compare_media_item, create_safe_string
30from music_assistant.helpers.database import UNSET
31from music_assistant.helpers.json import json_loads, serialize_to_json
32from music_assistant.helpers.util import guard_single_request
33
34if TYPE_CHECKING:
35 from collections.abc import AsyncGenerator, Mapping
36
37 from music_assistant import MusicAssistant
38 from music_assistant.models.music_provider import MusicProvider
39
40
41ItemCls = TypeVar("ItemCls", bound="MediaItemType")
42
43
44JSON_KEYS = (
45 "artists",
46 "track_album",
47 "metadata",
48 "provider_mappings",
49 "external_ids",
50 "narrators",
51 "authors",
52)
53
54SORT_KEYS = {
55 # sqlite has no builtin support for natural sorting
56 # so we have use an additional column for this
57 # this also improves searching and sorting performance
58 "name": "search_name ASC",
59 "name_desc": "search_name DESC",
60 "duration": "duration ASC",
61 "duration_desc": "duration DESC",
62 "sort_name": "search_sort_name ASC",
63 "sort_name_desc": "search_sort_name DESC",
64 "timestamp_added": "timestamp_added ASC",
65 "timestamp_added_desc": "timestamp_added DESC",
66 "timestamp_modified": "timestamp_modified ASC",
67 "timestamp_modified_desc": "timestamp_modified DESC",
68 "last_played": "last_played ASC",
69 "last_played_desc": "last_played DESC",
70 "play_count": "play_count ASC",
71 "play_count_desc": "play_count DESC",
72 "year": "year ASC",
73 "year_desc": "year DESC",
74 "position": "position ASC",
75 "position_desc": "position DESC",
76 "artist_name": "artists.search_name ASC",
77 "artist_name_desc": "artists.search_name DESC",
78 "random": "RANDOM()",
79 "random_play_count": "RANDOM(), play_count ASC",
80}
81
82
83class MediaControllerBase[ItemCls: "MediaItemType"](metaclass=ABCMeta):
84 """Base model for controller managing a MediaType."""
85
86 media_type: MediaType
87 item_cls: type[MediaItemType]
88 db_table: str
89
90 def __init__(self, mass: MusicAssistant) -> None:
91 """Initialize class."""
92 self.mass = mass
93 self.base_query = f"""
94 SELECT
95 {self.db_table}.*,
96 (SELECT JSON_GROUP_ARRAY(
97 json_object(
98 'item_id', provider_mappings.provider_item_id,
99 'provider_domain', provider_mappings.provider_domain,
100 'provider_instance', provider_mappings.provider_instance,
101 'available', provider_mappings.available,
102 'audio_format', json(provider_mappings.audio_format),
103 'url', provider_mappings.url,
104 'details', provider_mappings.details,
105 'in_library', provider_mappings.in_library,
106 'is_unique', provider_mappings.is_unique
107 )) FROM provider_mappings WHERE provider_mappings.item_id = {self.db_table}.item_id
108 AND provider_mappings.media_type = '{self.media_type.value}') AS provider_mappings
109 FROM {self.db_table} """ # noqa: E501
110 self.logger = logging.getLogger(f"{MASS_LOGGER_NAME}.music.{self.media_type.value}")
111 # register (base) api handlers
112 self.api_base = api_base = f"{self.media_type}s"
113 self.mass.register_api_command(f"music/{api_base}/count", self.library_count)
114 self.mass.register_api_command(f"music/{api_base}/library_items", self.library_items)
115 self.mass.register_api_command(f"music/{api_base}/get", self.get)
116 # Backward compatibility alias - prefer the generic "get" endpoint
117 self.mass.register_api_command(
118 f"music/{api_base}/get_{self.media_type}", self.get, alias=True
119 )
120 self.mass.register_api_command(
121 f"music/{api_base}/update", self.update_item_in_library, required_role="admin"
122 )
123 self.mass.register_api_command(
124 f"music/{api_base}/remove", self.remove_item_from_library, required_role="admin"
125 )
126 self._db_add_lock = asyncio.Lock()
127
128 @final
129 async def add_item_to_library(
130 self,
131 item: ItemCls,
132 overwrite_existing: bool = False,
133 ) -> ItemCls:
134 """Add item to library and return the new (or updated) database item."""
135 new_item = False
136 # check for existing item first
137 if library_id := await self._get_library_item_by_match(item):
138 # update existing item
139 await self._update_library_item(library_id, item, overwrite=overwrite_existing)
140 else:
141 # actually add a new item in the library db
142 self.mass.music.match_provider_instances(item)
143 async with self._db_add_lock:
144 library_id = await self._add_library_item(item)
145 new_item = True
146 # return final library_item
147 library_item = await self.get_library_item(library_id)
148 self.mass.signal_event(
149 EventType.MEDIA_ITEM_ADDED if new_item else EventType.MEDIA_ITEM_UPDATED,
150 library_item.uri,
151 library_item,
152 )
153 return library_item
154
155 @final
156 async def _get_library_item_by_match(self, item: ItemCls | ItemMapping) -> int | None:
157 if item.provider == "library":
158 return int(item.item_id)
159 # search by provider mappings if item is ItemMapping
160 if isinstance(item, ItemMapping):
161 if cur_item := await self.get_library_item_by_prov_id(item.item_id, item.provider):
162 return int(cur_item.item_id)
163
164 # for all other items that are MediaItemType, check provider_mappings if it exists
165 provider_mappings = getattr(item, "provider_mappings", None)
166 if provider_mappings:
167 if cur_item := await self.get_library_item_by_prov_mappings(provider_mappings):
168 return int(cur_item.item_id)
169 if cur_item := await self.get_library_item_by_external_ids(item.external_ids):
170 # existing item match by external id
171 # Double check external IDs - if MBID exists, regards that as overriding
172 if compare_media_item(item, cur_item):
173 return int(cur_item.item_id)
174 # search by (exact) name match
175 query = f"{self.db_table}.name = :name OR {self.db_table}.sort_name = :sort_name"
176 query_params = {"name": item.name, "sort_name": item.sort_name}
177 for db_item in await self.get_library_items_by_query(
178 extra_query_parts=[query], extra_query_params=query_params
179 ):
180 if compare_media_item(db_item, item, True):
181 return int(db_item.item_id)
182 return None
183
184 @final
185 async def update_item_in_library(
186 self, item_id: str | int, update: ItemCls, overwrite: bool = False
187 ) -> ItemCls:
188 """Update existing library record in the library database."""
189 self.mass.music.match_provider_instances(update)
190 await self._update_library_item(item_id, update, overwrite=overwrite)
191 # return the updated object
192 library_item = await self.get_library_item(item_id)
193 self.mass.signal_event(
194 EventType.MEDIA_ITEM_UPDATED,
195 library_item.uri,
196 library_item,
197 )
198 return library_item
199
200 async def remove_item_from_library(self, item_id: str | int, recursive: bool = True) -> None:
201 """Delete library record from the database."""
202 db_id = int(item_id) # ensure integer
203 library_item = await self.get_library_item(db_id)
204 assert library_item, f"Item does not exist: {db_id}"
205 # delete item
206 await self.mass.music.database.delete(
207 self.db_table,
208 {"item_id": db_id},
209 )
210 # update provider_mappings table
211 await self.mass.music.database.delete(
212 DB_TABLE_PROVIDER_MAPPINGS,
213 {"media_type": self.media_type.value, "item_id": db_id},
214 )
215 # cleanup playlog table
216 await self.mass.music.database.delete(
217 DB_TABLE_PLAYLOG,
218 {
219 "media_type": self.media_type.value,
220 "item_id": db_id,
221 "provider": "library",
222 },
223 )
224 for prov_mapping in library_item.provider_mappings:
225 await self.mass.music.database.delete(
226 DB_TABLE_PLAYLOG,
227 {
228 "media_type": self.media_type.value,
229 "item_id": prov_mapping.item_id,
230 "provider": prov_mapping.provider_instance,
231 },
232 )
233 # NOTE: this does not delete any references to this item in other records,
234 # this is handled/overridden in the mediatype specific controllers
235 self.mass.signal_event(EventType.MEDIA_ITEM_DELETED, library_item.uri, library_item)
236 self.logger.debug("deleted item with id %s from database", db_id)
237
238 async def library_count(self, favorite_only: bool = False) -> int:
239 """Return the total number of items in the library."""
240 if favorite_only:
241 sql_query = f"SELECT item_id FROM {self.db_table} WHERE favorite = 1"
242 return await self.mass.music.database.get_count_from_query(sql_query)
243 return await self.mass.music.database.get_count(self.db_table)
244
245 async def library_items(
246 self,
247 favorite: bool | None = None,
248 search: str | None = None,
249 limit: int = 500,
250 offset: int = 0,
251 order_by: str = "sort_name",
252 provider: str | list[str] | None = None,
253 **kwargs: Any,
254 ) -> list[ItemCls]:
255 """
256 Get the library items for this mediatype.
257
258 :param favorite: Filter by favorite status.
259 :param search: Filter by search query.
260 :param limit: Maximum number of items to return.
261 :param offset: Number of items to skip.
262 :param order_by: Order by field (e.g. 'sort_name', 'timestamp_added').
263 :param provider: Filter by provider instance ID (single string or list).
264 """
265 return await self.get_library_items_by_query(
266 favorite=favorite,
267 search=search,
268 limit=limit,
269 offset=offset,
270 order_by=order_by,
271 provider_filter=self._ensure_provider_filter(provider),
272 in_library_only=True,
273 )
274
275 async def iter_library_items(
276 self,
277 favorite: bool | None = None,
278 search: str | None = None,
279 order_by: str = "sort_name",
280 provider: str | list[str] | None = None,
281 library_items_only: bool = True,
282 ) -> AsyncGenerator[ItemCls, None]:
283 """Iterate all in-database items."""
284 limit: int = 500
285 offset: int = 0
286 if provider is not None:
287 provider_filter = provider if isinstance(provider, list) else [provider]
288 else:
289 provider_filter = None
290 while True:
291 next_items = await self.get_library_items_by_query(
292 favorite=favorite,
293 search=search,
294 limit=limit,
295 offset=offset,
296 order_by=order_by,
297 provider_filter=provider_filter,
298 in_library_only=library_items_only,
299 )
300 for item in next_items:
301 yield item
302 if len(next_items) < limit:
303 break
304 offset += limit
305
306 async def get(
307 self,
308 item_id: str,
309 provider_instance_id_or_domain: str,
310 ) -> ItemCls:
311 """Return (full) details for a single media item."""
312 # always prefer the full library item if we have it
313 if library_item := await self.get_library_item_by_prov_id(
314 item_id,
315 provider_instance_id_or_domain,
316 ):
317 # schedule a refresh of the metadata on access of the item
318 # e.g. the item is being played or opened in the UI
319 assert library_item.uri is not None
320 self.mass.metadata.schedule_update_metadata(library_item.uri)
321 return library_item
322 # grab full details from the provider
323 return await self.get_provider_item(
324 item_id,
325 provider_instance_id_or_domain,
326 )
327
328 async def search(
329 self,
330 search_query: str,
331 provider_instance_id_or_domain: str,
332 limit: int = 25,
333 ) -> list[ItemCls]:
334 """Search database or provider with given query."""
335 # create safe search string
336 search_query = search_query.replace("/", " ").replace("'", "")
337 if provider_instance_id_or_domain == "library":
338 return await self.library_items(search=search_query, limit=limit)
339 if not (prov := self.mass.get_provider(provider_instance_id_or_domain)):
340 return []
341 prov = cast("MusicProvider", prov)
342 if ProviderFeature.SEARCH not in prov.supported_features:
343 return []
344 if not prov.library_supported(self.media_type):
345 # assume library supported also means that this mediatype is supported
346 return []
347 searchresult = await prov.search(
348 search_query,
349 [self.media_type],
350 limit,
351 )
352 match self.media_type:
353 case MediaType.ARTIST:
354 return cast("list[ItemCls]", searchresult.artists)
355 case MediaType.ALBUM:
356 return cast("list[ItemCls]", searchresult.albums)
357 case MediaType.TRACK:
358 return cast("list[ItemCls]", searchresult.tracks)
359 case MediaType.PLAYLIST:
360 return cast("list[ItemCls]", searchresult.playlists)
361 case MediaType.AUDIOBOOK:
362 return cast("list[ItemCls]", searchresult.audiobooks)
363 case MediaType.PODCAST:
364 return cast("list[ItemCls]", searchresult.podcasts)
365 case MediaType.RADIO:
366 return cast("list[ItemCls]", searchresult.radio)
367 case _:
368 return []
369
370 async def get_library_item(self, item_id: int | str) -> ItemCls:
371 """Get single library item by id."""
372 db_id = int(item_id) # ensure integer
373 extra_query = f"WHERE {self.db_table}.item_id = :item_id"
374 for db_item in await self.get_library_items_by_query(
375 extra_query_parts=[extra_query],
376 extra_query_params={"item_id": db_id},
377 in_library_only=False,
378 ):
379 return db_item
380 msg = f"{self.media_type.value} not found in library: {db_id}"
381 raise MediaNotFoundError(msg)
382
383 async def get_library_item_by_prov_id(
384 self,
385 item_id: str,
386 provider_instance_id_or_domain: str,
387 ) -> ItemCls | None:
388 """Get the library item for the given provider_instance."""
389 assert item_id
390 assert provider_instance_id_or_domain
391 if provider_instance_id_or_domain == "library":
392 return await self.get_library_item(item_id)
393 for item in await self.get_library_items_by_prov_id(
394 provider_instance_id_or_domain=provider_instance_id_or_domain,
395 provider_item_id=item_id,
396 ):
397 return item
398 return None
399
400 @final
401 async def get_library_item_by_prov_mappings(
402 self,
403 provider_mappings: Iterable[ProviderMapping],
404 ) -> ItemCls | None:
405 """Get the library item for the given provider_instance."""
406 # always prefer provider instance first
407 for mapping in provider_mappings:
408 for item in await self.get_library_items_by_prov_id(
409 provider_instance=mapping.provider_instance,
410 provider_item_id=mapping.item_id,
411 ):
412 return item
413 # check by domain too
414 for mapping in provider_mappings:
415 for item in await self.get_library_items_by_prov_id(
416 provider_domain=mapping.provider_domain,
417 provider_item_id=mapping.item_id,
418 ):
419 return item
420 return None
421
422 @final
423 async def get_library_item_by_external_id(
424 self, external_id: str, external_id_type: ExternalID | None = None
425 ) -> ItemCls | None:
426 """Get the library item for the given external id."""
427 query = f"{self.db_table}.external_ids LIKE :external_id_str"
428 if external_id_type:
429 external_id_str = f'%"{external_id_type}","{external_id}"%'
430 else:
431 external_id_str = f'%"{external_id}"%'
432 for item in await self.get_library_items_by_query(
433 extra_query_parts=[query],
434 extra_query_params={"external_id_str": external_id_str},
435 ):
436 return item
437 return None
438
439 @final
440 async def get_library_item_by_external_ids(
441 self, external_ids: set[tuple[ExternalID, str]]
442 ) -> ItemCls | None:
443 """Get the library item for (one of) the given external ids."""
444 for external_id_type, external_id in external_ids:
445 if match := await self.get_library_item_by_external_id(external_id, external_id_type):
446 return match
447 return None
448
449 @final
450 async def get_library_items_by_prov_id(
451 self,
452 provider_domain: str | None = None,
453 provider_instance: str | None = None,
454 provider_instance_id_or_domain: str | None = None,
455 provider_item_id: str | None = None,
456 limit: int = 500,
457 offset: int = 0,
458 ) -> list[ItemCls]:
459 """Fetch all records from library for given provider."""
460 assert provider_instance_id_or_domain != "library"
461 assert provider_domain != "library"
462 assert provider_instance != "library"
463 subquery_parts: list[str] = []
464 query_params: dict[str, Any] = {}
465 if provider_instance:
466 query_params = {"prov_id": provider_instance}
467 subquery_parts.append("provider_mappings.provider_instance = :prov_id")
468 elif provider_domain:
469 query_params = {"prov_id": provider_domain}
470 subquery_parts.append("provider_mappings.provider_domain = :prov_id")
471 else:
472 query_params = {"prov_id": provider_instance_id_or_domain}
473 subquery_parts.append(
474 "(provider_mappings.provider_instance = :prov_id "
475 "OR provider_mappings.provider_domain = :prov_id)"
476 )
477 if provider_item_id:
478 subquery_parts.append("provider_mappings.provider_item_id = :item_id")
479 query_params["item_id"] = provider_item_id
480 subquery = f"SELECT item_id FROM provider_mappings WHERE {' AND '.join(subquery_parts)}"
481 query = f"WHERE {self.db_table}.item_id IN ({subquery})"
482 return await self.get_library_items_by_query(
483 limit=limit,
484 offset=offset,
485 extra_query_parts=[query],
486 extra_query_params=query_params,
487 in_library_only=False,
488 )
489
490 @final
491 async def iter_library_items_by_prov_id(
492 self,
493 provider_instance_id_or_domain: str,
494 provider_item_id: str | None = None,
495 ) -> AsyncGenerator[ItemCls, None]:
496 """Iterate all records from database for given provider."""
497 limit: int = 500
498 offset: int = 0
499 while True:
500 next_items = await self.get_library_items_by_prov_id(
501 provider_instance_id_or_domain=provider_instance_id_or_domain,
502 provider_item_id=provider_item_id,
503 limit=limit,
504 offset=offset,
505 )
506 for item in next_items:
507 yield item
508 if len(next_items) < limit:
509 break
510 offset += limit
511
512 @final
513 async def set_favorite(self, item_id: str | int, favorite: bool) -> None:
514 """Set the favorite bool on a database item."""
515 db_id = int(item_id) # ensure integer
516 library_item = await self.get_library_item(db_id)
517 if library_item.favorite == favorite:
518 return
519 match = {"item_id": db_id}
520 await self.mass.music.database.update(self.db_table, match, {"favorite": favorite})
521 library_item = await self.get_library_item(db_id)
522 self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, library_item.uri, library_item)
523
524 @guard_single_request # type: ignore[type-var] # TODO: fix typing for MediaControllerBase
525 @final
526 async def get_provider_item(
527 self,
528 item_id: str,
529 provider_instance_id_or_domain: str,
530 force_refresh: bool = False,
531 fallback: ItemMapping | ItemCls | None = None,
532 ) -> ItemCls:
533 """Return item details for the given provider item id."""
534 if provider_instance_id_or_domain == "library":
535 return await self.get_library_item(item_id)
536 if not (provider := self.mass.get_provider(provider_instance_id_or_domain)):
537 raise ProviderUnavailableError(f"{provider_instance_id_or_domain} is not available")
538 if provider := self.mass.get_provider(provider_instance_id_or_domain):
539 provider = cast("MusicProvider", provider)
540 with suppress(MediaNotFoundError):
541 async with self.mass.cache.handle_refresh(force_refresh):
542 return cast("ItemCls", await provider.get_item(self.media_type, item_id))
543 # if we reach this point all possibilities failed and the item could not be found.
544 # There is a possibility that the (streaming) provider changed the id of the item
545 # so we return the previous details (if we have any) marked as unavailable, so
546 # at least we have the possibility to sort out the new id through matching logic.
547 fallback = fallback or await self.get_library_item_by_prov_id(
548 item_id, provider_instance_id_or_domain
549 )
550 if (
551 fallback
552 and isinstance(fallback, ItemMapping)
553 and (fallback_provider := self.mass.get_provider(fallback.provider))
554 ):
555 # fallback is a ItemMapping, try to convert to full item
556 with suppress(LookupError, TypeError, ValueError):
557 return cast(
558 "ItemCls",
559 self.item_cls.from_dict(
560 {
561 **fallback.to_dict(),
562 "provider_mappings": [
563 {
564 "item_id": fallback.item_id,
565 "provider_domain": fallback_provider.domain,
566 "provider_instance": fallback_provider.instance_id,
567 "available": fallback.available,
568 }
569 ],
570 }
571 ),
572 )
573 if fallback:
574 # simply return the fallback item
575 return cast("ItemCls", fallback)
576 # all options exhausted, we really can not find this item
577 msg = (
578 f"{self.media_type.value}://{item_id} not "
579 f"found on provider {provider_instance_id_or_domain}"
580 )
581 raise MediaNotFoundError(msg)
582
583 @final
584 async def add_provider_mapping(
585 self, item_id: str | int, provider_mapping: ProviderMapping
586 ) -> None:
587 """Add provider mapping to existing library item."""
588 await self.add_provider_mappings(item_id, [provider_mapping])
589
590 @final
591 async def add_provider_mappings(
592 self, item_id: str | int, provider_mappings: Iterable[ProviderMapping]
593 ) -> None:
594 """
595 Add provider mappings to existing library item.
596
597 :param item_id: The library item ID to add mappings to.
598 :param provider_mappings: The provider mappings to add.
599 """
600 db_id = int(item_id) # ensure integer
601 library_item = await self.get_library_item(db_id)
602 new_mappings: set[ProviderMapping] = set()
603 for provider_mapping in provider_mappings:
604 # ignore if the mapping is already present
605 if provider_mapping not in library_item.provider_mappings:
606 new_mappings.add(provider_mapping)
607 if not new_mappings:
608 return
609 # handle special case where the user wants to merge 2 library items
610 for mapping in new_mappings:
611 if _library_item := await self.get_library_item_by_prov_id(
612 mapping.item_id, mapping.provider_instance
613 ):
614 if _library_item.item_id != library_item.item_id:
615 # merging items
616 self.logger.debug(
617 "merging item id %s into item id %s based on provider mapping %s/%s",
618 _library_item.item_id,
619 library_item.item_id,
620 mapping.provider_instance,
621 mapping.item_id,
622 )
623 await self.remove_item_from_library(_library_item.item_id, recursive=True)
624 break
625 library_item.provider_mappings.update(new_mappings)
626 self.mass.music.match_provider_instances(library_item)
627 await self.set_provider_mappings(db_id, library_item.provider_mappings)
628 self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, library_item.uri, library_item)
629
630 @final
631 async def update_provider_mapping(
632 self,
633 item_id: str | int,
634 provider_instance_id: str,
635 provider_item_id: str,
636 *,
637 available: bool | Any = UNSET,
638 in_library: bool | Any = UNSET,
639 is_unique: bool | None | Any = UNSET,
640 url: str | None | Any = UNSET,
641 details: str | None | Any = UNSET,
642 audio_format: AudioFormat | Any = UNSET,
643 ) -> None:
644 """Update an existing provider mapping for a library item."""
645 db_id = int(item_id) # ensure integer
646 library_item = await self.get_library_item(db_id)
647
648 # find the current mapping (strictly by provider instance + provider item id)
649 cur_mapping: ProviderMapping | None = None
650 for mapping in library_item.provider_mappings:
651 if (
652 mapping.provider_instance == provider_instance_id
653 and mapping.item_id == provider_item_id
654 ):
655 cur_mapping = mapping
656 break
657 if cur_mapping is None:
658 msg = (
659 f"Provider mapping {provider_instance_id}/{provider_item_id} "
660 f"not found for item {db_id}"
661 )
662 raise MediaNotFoundError(msg)
663
664 # guard against nulls for NOT NULL columns
665 if available is None:
666 available = UNSET
667 if in_library is None:
668 in_library = UNSET
669
670 updates: dict[str, Any] = {}
671 if available is not UNSET:
672 updates["available"] = bool(available)
673 if in_library is not UNSET:
674 updates["in_library"] = bool(in_library)
675 if is_unique is not UNSET:
676 updates["is_unique"] = is_unique
677 if url is not UNSET:
678 updates["url"] = url
679 if details is not UNSET:
680 updates["details"] = details
681 if audio_format is not UNSET:
682 updates["audio_format"] = serialize_to_json(audio_format)
683
684 if not updates:
685 return
686
687 match = {
688 "media_type": self.media_type.value,
689 "item_id": db_id,
690 "provider_instance": provider_instance_id,
691 "provider_item_id": provider_item_id,
692 }
693 await self.mass.music.database.update(DB_TABLE_PROVIDER_MAPPINGS, match, updates)
694
695 # Re-fetch the updated item so the event payload reflects persisted DB state.
696 updated_item = await self.get_library_item(db_id)
697 self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, updated_item.uri, updated_item)
698
699 @final
700 async def remove_provider_mapping(
701 self, item_id: str | int, provider_instance_id: str, provider_item_id: str
702 ) -> None:
703 """Remove provider mapping(s) from item."""
704 db_id = int(item_id) # ensure integer
705 try:
706 library_item = await self.get_library_item(db_id)
707 except MediaNotFoundError:
708 # edge case: already deleted / race condition
709 return
710
711 # update provider_mappings table
712 await self.mass.music.database.delete(
713 DB_TABLE_PROVIDER_MAPPINGS,
714 {
715 "media_type": self.media_type.value,
716 "item_id": db_id,
717 "provider_instance": provider_instance_id,
718 "provider_item_id": provider_item_id,
719 },
720 )
721 # cleanup playlog table
722 await self.mass.music.database.delete(
723 DB_TABLE_PLAYLOG,
724 {
725 "media_type": self.media_type.value,
726 "item_id": provider_item_id,
727 "provider": provider_instance_id,
728 },
729 )
730 library_item.provider_mappings = {
731 x
732 for x in library_item.provider_mappings
733 if not (x.provider_instance == provider_instance_id and x.item_id == provider_item_id)
734 }
735 if library_item.provider_mappings:
736 self.logger.debug(
737 "removed provider_mapping %s/%s from item id %s",
738 provider_instance_id,
739 provider_item_id,
740 db_id,
741 )
742 self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, library_item.uri, library_item)
743 else:
744 # remove item if it has no more providers
745 with suppress(AssertionError):
746 await self.remove_item_from_library(db_id)
747
748 @final
749 async def remove_provider_mappings(self, item_id: str | int, provider_instance_id: str) -> None:
750 """Remove all provider mappings from an item."""
751 db_id = int(item_id) # ensure integer
752 try:
753 library_item = await self.get_library_item(db_id)
754 except MediaNotFoundError:
755 # edge case: already deleted / race condition
756 library_item = None
757 # update provider_mappings table
758 await self.mass.music.database.delete(
759 DB_TABLE_PROVIDER_MAPPINGS,
760 {
761 "media_type": self.media_type.value,
762 "item_id": db_id,
763 "provider_instance": provider_instance_id,
764 },
765 )
766 if library_item is None:
767 return
768 # update the item's provider mappings (and check if we still have any)
769 library_item.provider_mappings = {
770 x for x in library_item.provider_mappings if x.provider_instance != provider_instance_id
771 }
772 if library_item.provider_mappings:
773 self.logger.debug(
774 "removed all provider mappings for provider %s from item id %s",
775 provider_instance_id,
776 db_id,
777 )
778 self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, library_item.uri, library_item)
779 else:
780 # remove item if it has no more providers
781 with suppress(AssertionError):
782 await self.remove_item_from_library(db_id)
783
784 @final
785 async def set_provider_mappings(
786 self,
787 item_id: str | int,
788 provider_mappings: Iterable[ProviderMapping],
789 overwrite: bool = False,
790 ) -> None:
791 """Update the provider_items table for the media item."""
792 db_id = int(item_id) # ensure integer
793 if overwrite:
794 # on overwrite, clear the provider_mappings table first
795 # this is done for filesystem provider changing the path (and thus item_id)
796 await self.mass.music.database.delete(
797 DB_TABLE_PROVIDER_MAPPINGS,
798 {"media_type": self.media_type.value, "item_id": db_id},
799 )
800 for provider_mapping in provider_mappings:
801 prov_map_obj = {
802 "media_type": self.media_type.value,
803 "item_id": db_id,
804 "provider_domain": provider_mapping.provider_domain,
805 "provider_instance": provider_mapping.provider_instance,
806 "provider_item_id": provider_mapping.item_id,
807 "available": provider_mapping.available,
808 "audio_format": serialize_to_json(provider_mapping.audio_format),
809 }
810 for key in ("url", "details", "in_library", "is_unique"):
811 if (value := getattr(provider_mapping, key, None)) is not None:
812 prov_map_obj[key] = value
813 await self.mass.music.database.upsert(
814 DB_TABLE_PROVIDER_MAPPINGS,
815 prov_map_obj,
816 )
817
818 @abstractmethod
819 async def _add_library_item(
820 self,
821 item: ItemCls,
822 overwrite_existing: bool = False,
823 ) -> int:
824 """Add artist to library and return the database id."""
825
826 @abstractmethod
827 async def _update_library_item(
828 self, item_id: str | int, update: ItemCls, overwrite: bool = False
829 ) -> None:
830 """Update existing library record in the database."""
831
832 @abstractmethod
833 async def match_providers(self, db_item: ItemCls) -> None:
834 """
835 Try to find match on all (streaming) providers for the provided (database) item.
836
837 This is used to link objects of different providers/qualities together.
838 """
839
840 @abstractmethod
841 async def radio_mode_base_tracks(
842 self,
843 item: ItemCls,
844 preferred_provider_instances: list[str] | None = None,
845 ) -> list[Track]:
846 """
847 Get the list of base tracks from the controller used to calculate the dynamic radio.
848
849 :param item: The MediaItem to get base tracks for.
850 :param preferred_provider_instances: List of preferred provider instance IDs to use.
851 When provided, these providers will be tried first before falling back to others.
852 """
853
854 @final
855 async def get_library_items_by_query(
856 self,
857 favorite: bool | None = None,
858 search: str | None = None,
859 limit: int = 500,
860 offset: int = 0,
861 order_by: str | None = None,
862 provider_filter: list[str] | None = None,
863 extra_query_parts: list[str] | None = None,
864 extra_query_params: dict[str, Any] | None = None,
865 extra_join_parts: list[str] | None = None,
866 in_library_only: bool = False,
867 ) -> list[ItemCls]:
868 """Fetch MediaItem records from database by building the query."""
869 query_params = dict(extra_query_params) if extra_query_params else {}
870 query_parts: list[str] = list(extra_query_parts) if extra_query_parts else []
871 join_parts: list[str] = list(extra_join_parts) if extra_join_parts else []
872 search = self._preprocess_search(search, query_params)
873 # create special performant random query
874 if order_by and order_by.startswith("random"):
875 self._apply_random_subquery(
876 query_parts=query_parts,
877 query_params=query_params,
878 join_parts=join_parts,
879 favorite=favorite,
880 search=search,
881 provider_filter=provider_filter,
882 limit=limit,
883 in_library_only=in_library_only,
884 )
885 else:
886 # apply filters
887 self._apply_filters(
888 query_parts=query_parts,
889 query_params=query_params,
890 join_parts=join_parts,
891 favorite=favorite,
892 search=search,
893 provider_filter=provider_filter,
894 in_library_only=in_library_only,
895 )
896 # build and execute final query
897 sql_query = self._build_final_query(query_parts, join_parts, order_by)
898
899 return [
900 cast("ItemCls", self.item_cls.from_dict(self._parse_db_row(db_row)))
901 for db_row in await self.mass.music.database.get_rows_from_query(
902 sql_query, query_params, limit=limit, offset=offset
903 )
904 ]
905
906 @final
907 def _preprocess_search(self, search: str | None, query_params: dict[str, Any]) -> str | None:
908 """Preprocess search string and add to query params."""
909 if search:
910 search = create_safe_string(search, True, True)
911 query_params["search"] = f"%{search}%"
912 return search
913
914 @final
915 @staticmethod
916 def _clean_query_parts(query_parts: list[str]) -> list[str]:
917 """Clean the query parts list by removing duplicate where statements."""
918 return [x[5:] if x.lower().startswith("where ") else x for x in query_parts]
919
920 @final
921 def _apply_random_subquery(
922 self,
923 query_parts: list[str],
924 query_params: dict[str, Any],
925 join_parts: list[str],
926 favorite: bool | None,
927 search: str | None,
928 provider_filter: list[str] | None,
929 limit: int,
930 in_library_only: bool = False,
931 ) -> None:
932 """Build a fast random subquery with all filters applied."""
933 sub_query_parts = query_parts.copy()
934 sub_join_parts = join_parts.copy()
935
936 # Apply all filters to the subquery
937 self._apply_filters(
938 query_parts=sub_query_parts,
939 query_params=query_params,
940 join_parts=sub_join_parts,
941 favorite=favorite,
942 search=search,
943 provider_filter=provider_filter,
944 in_library_only=in_library_only,
945 )
946
947 # Build the subquery
948 sub_query = f"SELECT {self.db_table}.item_id FROM {self.db_table}"
949
950 if sub_join_parts:
951 sub_query += f" {' '.join(sub_join_parts)}"
952
953 if sub_query_parts:
954 sub_query += " WHERE " + " AND ".join(self._clean_query_parts(sub_query_parts))
955
956 sub_query += f" ORDER BY RANDOM() LIMIT {limit}"
957
958 # The query now only consists of the random subquery, which applies all filters
959 # within itself
960 query_parts.clear()
961 query_parts.append(f"{self.db_table}.item_id in ({sub_query})")
962 join_parts.clear()
963
964 @final
965 def _apply_filters(
966 self,
967 query_parts: list[str],
968 query_params: dict[str, Any],
969 join_parts: list[str],
970 favorite: bool | None,
971 search: str | None,
972 provider_filter: list[str] | None,
973 in_library_only: bool = False,
974 ) -> None:
975 """Apply search, favorite, and provider filters."""
976 # handle search
977 if search:
978 query_parts.append(f"{self.db_table}.search_name LIKE :search")
979 # handle favorite filter
980 if favorite is not None:
981 query_parts.append(f"{self.db_table}.favorite = :favorite")
982 query_params["favorite"] = favorite
983 # Apply the provider filter
984 if provider_filter:
985 provider_conditions = []
986 for idx, prov in enumerate(provider_filter):
987 param_name = f"provider_filter_{idx}"
988 provider_conditions.append(f"provider_mappings.provider_instance = :{param_name}")
989 query_params[param_name] = prov
990 query_params["provider_media_type"] = self.media_type.value
991 in_library_clause = "AND provider_mappings.in_library = 1 " if in_library_only else ""
992 join_parts.append(
993 f"JOIN provider_mappings ON provider_mappings.item_id = {self.db_table}.item_id "
994 "AND provider_mappings.media_type = :provider_media_type "
995 f"{in_library_clause}"
996 f"AND ({' OR '.join(provider_conditions)})"
997 )
998 elif in_library_only:
999 query_params["provider_media_type"] = self.media_type.value
1000 join_parts.append(
1001 f"JOIN provider_mappings ON provider_mappings.item_id = {self.db_table}.item_id "
1002 "AND provider_mappings.media_type = :provider_media_type "
1003 "AND provider_mappings.in_library = 1"
1004 )
1005
1006 @final
1007 def _build_final_query(
1008 self,
1009 query_parts: list[str],
1010 join_parts: list[str],
1011 order_by: str | None,
1012 ) -> str:
1013 """Build the final SQL query string."""
1014 sql_query = self.base_query
1015
1016 # Add joins
1017 if join_parts:
1018 sql_query += f" {' '.join(join_parts)} "
1019
1020 # Add where clauses
1021 if query_parts:
1022 # prevent duplicate where statement
1023 sql_query += " WHERE " + " AND ".join(self._clean_query_parts(query_parts))
1024
1025 # Add grouping and ordering
1026 sql_query += f" GROUP BY {self.db_table}.item_id"
1027
1028 if order_by:
1029 if sort_key := SORT_KEYS.get(order_by):
1030 sql_query += f" ORDER BY {sort_key}"
1031
1032 return sql_query
1033
1034 @final
1035 @staticmethod
1036 def _parse_db_row(db_row: Mapping[str, Any]) -> dict[str, Any]:
1037 """Parse raw db Mapping into a dict."""
1038 db_row_dict = dict(db_row)
1039 db_row_dict["provider"] = "library"
1040 db_row_dict["favorite"] = bool(db_row_dict["favorite"])
1041 db_row_dict["item_id"] = str(db_row_dict["item_id"])
1042 db_row_dict["date_added"] = datetime.fromtimestamp(
1043 db_row_dict["timestamp_added"]
1044 ).isoformat()
1045
1046 for key in JSON_KEYS:
1047 if key not in db_row_dict:
1048 continue
1049 if not (raw_value := db_row_dict[key]):
1050 continue
1051 db_row_dict[key] = json_loads(raw_value)
1052
1053 # copy track_album --> album
1054 if track_album := db_row_dict.get("track_album"):
1055 db_row_dict["album"] = track_album
1056 db_row_dict["disc_number"] = track_album["disc_number"]
1057 db_row_dict["track_number"] = track_album["track_number"]
1058 # always prefer album image over track image
1059 if (album_images := track_album.get("images")) and (
1060 album_thumb := next((x for x in album_images if x["type"] == "thumb"), None)
1061 ):
1062 # copy album image to itemmapping single image
1063 db_row_dict["image"] = album_thumb
1064 if db_row_dict["metadata"].get("images"):
1065 # merge album image with existing images
1066 db_row_dict["metadata"]["images"] = [
1067 album_thumb,
1068 *db_row_dict["metadata"]["images"],
1069 ]
1070 else:
1071 db_row_dict["metadata"]["images"] = [album_thumb]
1072 return db_row_dict
1073
1074 @final
1075 def _ensure_provider_filter(
1076 self,
1077 provider: str | list[str] | None,
1078 ) -> list[str] | None:
1079 """Ensure the provider filter respects the current user's provider filter."""
1080 # Apply user provider filter if needed
1081 user = get_current_user()
1082 user_provider_filter = user.provider_filter if user and user.provider_filter else None
1083 final_provider_filter: list[str] | None = None
1084 if user_provider_filter:
1085 # User has a provider filter set
1086 if provider:
1087 # Explicit provider filter provided - validate against user's allowed providers
1088 requested_providers = [provider] if isinstance(provider, str) else provider
1089 # Only include providers that are in both the user's filter and the requested list
1090 final_provider_filter = [
1091 p for p in requested_providers if p in user_provider_filter
1092 ]
1093 if not final_provider_filter:
1094 # No overlap - user requested providers they don't have access to
1095 raise InsufficientPermissions(
1096 "User does not have permission to access the requested provider(s)."
1097 )
1098 else:
1099 # No explicit filter - use user's provider filter
1100 final_provider_filter = user_provider_filter
1101 elif provider is not None:
1102 # No user filter - use the provided filter as is
1103 final_provider_filter = [provider] if isinstance(provider, str) else provider
1104 return final_provider_filter
1105
1106 @final
1107 def _select_provider_id(self, library_item: ItemCls) -> tuple[str, str]:
1108 """Select the correct provider id to use for fetching the item."""
1109 user = get_current_user()
1110 user_provider_filter = user.provider_filter if user and user.provider_filter else None
1111 # prefer user provider filter if available
1112 for mapping in library_item.provider_mappings:
1113 if user_provider_filter and mapping.provider_instance not in user_provider_filter:
1114 continue
1115 return (mapping.provider_instance, mapping.item_id)
1116 # fallback to first mapping
1117 mapping = next(iter(library_item.provider_mappings))
1118 return (mapping.provider_instance, mapping.item_id)
1119