music-assistant-server

43.9 KBPY
base.py
43.9 KB1,099 lines • python
1"""Base (ABC) MediaType specific controller."""
2
3from __future__ import annotations
4
5import asyncio
6import logging
7from abc import ABCMeta, abstractmethod
8from collections.abc import Iterable
9from contextlib import suppress
10from datetime import datetime
11from typing import TYPE_CHECKING, Any, TypeVar, cast, final
12
13from music_assistant_models.enums import EventType, ExternalID, MediaType, ProviderFeature
14from music_assistant_models.errors import (
15    InsufficientPermissions,
16    MediaNotFoundError,
17    ProviderUnavailableError,
18)
19from music_assistant_models.media_items import (
20    AudioFormat,
21    ItemMapping,
22    MediaItemType,
23    ProviderMapping,
24    Track,
25)
26
27from music_assistant.constants import DB_TABLE_PLAYLOG, DB_TABLE_PROVIDER_MAPPINGS, MASS_LOGGER_NAME
28from music_assistant.controllers.webserver.helpers.auth_middleware import get_current_user
29from music_assistant.helpers.compare import compare_media_item, create_safe_string
30from music_assistant.helpers.database import UNSET
31from music_assistant.helpers.json import json_loads, serialize_to_json
32from music_assistant.helpers.util import guard_single_request
33
34if TYPE_CHECKING:
35    from collections.abc import AsyncGenerator, Mapping
36
37    from music_assistant import MusicAssistant
38    from music_assistant.models.music_provider import MusicProvider
39
40
# Module-level type variable bound to MediaItemType.
# NOTE: MediaControllerBase declares its own (PEP 695) type parameter with the same name.
ItemCls = TypeVar("ItemCls", bound="MediaItemType")
42
43
# Field/column names grouped under the label "JSON".
# NOTE(review): presumably these are the columns whose values are stored as
# JSON and need (de)serialization - the usage is not visible here, confirm
# against the code that reads this constant.
JSON_KEYS = (
    "artists",
    "track_album",
    "metadata",
    "provider_mappings",
    "external_ids",
    "narrators",
    "authors",
)
53
# Mapping of sort key -> SQL ordering expression.
# NOTE(review): presumably looked up with the `order_by` argument of the
# library queries (its default "sort_name" is one of the keys) - confirm
# against the query builder.
SORT_KEYS = {
    # sqlite has no builtin support for natural sorting
    # so we have to use an additional column for this
    # this also improves searching and sorting performance
    "name": "search_name ASC",
    "name_desc": "search_name DESC",
    "duration": "duration ASC",
    "duration_desc": "duration DESC",
    "sort_name": "search_sort_name ASC",
    "sort_name_desc": "search_sort_name DESC",
    "timestamp_added": "timestamp_added ASC",
    "timestamp_added_desc": "timestamp_added DESC",
    "timestamp_modified": "timestamp_modified ASC",
    "timestamp_modified_desc": "timestamp_modified DESC",
    "last_played": "last_played ASC",
    "last_played_desc": "last_played DESC",
    "play_count": "play_count ASC",
    "play_count_desc": "play_count DESC",
    "year": "year ASC",
    "year_desc": "year DESC",
    "position": "position ASC",
    "position_desc": "position DESC",
    "artist_name": "artists.search_name ASC",
    "artist_name_desc": "artists.search_name DESC",
    "random": "RANDOM()",
    "random_play_count": "RANDOM(), play_count ASC",
}
81
82
83class MediaControllerBase[ItemCls: "MediaItemType"](metaclass=ABCMeta):
84    """Base model for controller managing a MediaType."""
85
86    media_type: MediaType
87    item_cls: type[MediaItemType]
88    db_table: str
89
    def __init__(self, mass: MusicAssistant) -> None:
        """
        Initialize the controller and register its base API commands.

        :param mass: The MusicAssistant instance this controller is attached to.
        """
        self.mass = mass
        # Base SELECT for this media type: every column of `db_table` plus a
        # `provider_mappings` column that aggregates (as a JSON array of objects)
        # the rows of the provider_mappings table matching the item id and
        # this controller's media type.
        self.base_query = f"""
        SELECT
            {self.db_table}.*,
            (SELECT JSON_GROUP_ARRAY(
                json_object(
                'item_id', provider_mappings.provider_item_id,
                    'provider_domain', provider_mappings.provider_domain,
                        'provider_instance', provider_mappings.provider_instance,
                        'available', provider_mappings.available,
                        'audio_format', json(provider_mappings.audio_format),
                        'url', provider_mappings.url,
                        'details', provider_mappings.details,
                        'in_library', provider_mappings.in_library,
                        'is_unique', provider_mappings.is_unique
                )) FROM provider_mappings WHERE provider_mappings.item_id = {self.db_table}.item_id
                    AND provider_mappings.media_type = '{self.media_type.value}') AS provider_mappings
            FROM {self.db_table} """  # noqa: E501
        # logger name is namespaced per media type
        self.logger = logging.getLogger(f"{MASS_LOGGER_NAME}.music.{self.media_type.value}")
        # register (base) api handlers
        # the api base is the media type with an "s" appended (plural form)
        self.api_base = api_base = f"{self.media_type}s"
        self.mass.register_api_command(f"music/{api_base}/count", self.library_count)
        self.mass.register_api_command(f"music/{api_base}/library_items", self.library_items)
        self.mass.register_api_command(f"music/{api_base}/get", self.get)
        # Backward compatibility alias - prefer the generic "get" endpoint
        self.mass.register_api_command(
            f"music/{api_base}/get_{self.media_type}", self.get, alias=True
        )
        # mutating commands are registered with the "admin" role requirement
        self.mass.register_api_command(
            f"music/{api_base}/update", self.update_item_in_library, required_role="admin"
        )
        self.mass.register_api_command(
            f"music/{api_base}/remove", self.remove_item_from_library, required_role="admin"
        )
        # lock held while inserting a new item (see add_item_to_library)
        self._db_add_lock = asyncio.Lock()
127
128    @final
129    async def add_item_to_library(
130        self,
131        item: ItemCls,
132        overwrite_existing: bool = False,
133    ) -> ItemCls:
134        """Add item to library and return the new (or updated) database item."""
135        new_item = False
136        # check for existing item first
137        if library_id := await self._get_library_item_by_match(item):
138            # update existing item
139            await self._update_library_item(library_id, item, overwrite=overwrite_existing)
140        else:
141            # actually add a new item in the library db
142            self.mass.music.match_provider_instances(item)
143            async with self._db_add_lock:
144                library_id = await self._add_library_item(item)
145                new_item = True
146        # return final library_item
147        library_item = await self.get_library_item(library_id)
148        self.mass.signal_event(
149            EventType.MEDIA_ITEM_ADDED if new_item else EventType.MEDIA_ITEM_UPDATED,
150            library_item.uri,
151            library_item,
152        )
153        return library_item
154
155    @final
156    async def _get_library_item_by_match(self, item: ItemCls | ItemMapping) -> int | None:
157        if item.provider == "library":
158            return int(item.item_id)
159        # search by provider mappings if item is ItemMapping
160        if isinstance(item, ItemMapping):
161            if cur_item := await self.get_library_item_by_prov_id(item.item_id, item.provider):
162                return int(cur_item.item_id)
163
164        # for all other items that are MediaItemType, check provider_mappings if it exists
165        provider_mappings = getattr(item, "provider_mappings", None)
166        if provider_mappings:
167            if cur_item := await self.get_library_item_by_prov_mappings(provider_mappings):
168                return int(cur_item.item_id)
169        if cur_item := await self.get_library_item_by_external_ids(item.external_ids):
170            # existing item match by external id
171            # Double check external IDs - if MBID exists, regards that as overriding
172            if compare_media_item(item, cur_item):
173                return int(cur_item.item_id)
174        # search by (exact) name match
175        query = f"{self.db_table}.name = :name OR {self.db_table}.sort_name = :sort_name"
176        query_params = {"name": item.name, "sort_name": item.sort_name}
177        for db_item in await self.get_library_items_by_query(
178            extra_query_parts=[query], extra_query_params=query_params
179        ):
180            if compare_media_item(db_item, item, True):
181                return int(db_item.item_id)
182        return None
183
184    @final
185    async def update_item_in_library(
186        self, item_id: str | int, update: ItemCls, overwrite: bool = False
187    ) -> ItemCls:
188        """Update existing library record in the library database."""
189        self.mass.music.match_provider_instances(update)
190        await self._update_library_item(item_id, update, overwrite=overwrite)
191        # return the updated object
192        library_item = await self.get_library_item(item_id)
193        self.mass.signal_event(
194            EventType.MEDIA_ITEM_UPDATED,
195            library_item.uri,
196            library_item,
197        )
198        return library_item
199
200    async def remove_item_from_library(self, item_id: str | int, recursive: bool = True) -> None:
201        """Delete library record from the database."""
202        db_id = int(item_id)  # ensure integer
203        library_item = await self.get_library_item(db_id)
204        assert library_item, f"Item does not exist: {db_id}"
205        # delete item
206        await self.mass.music.database.delete(
207            self.db_table,
208            {"item_id": db_id},
209        )
210        # update provider_mappings table
211        await self.mass.music.database.delete(
212            DB_TABLE_PROVIDER_MAPPINGS,
213            {"media_type": self.media_type.value, "item_id": db_id},
214        )
215        # cleanup playlog table
216        await self.mass.music.database.delete(
217            DB_TABLE_PLAYLOG,
218            {
219                "media_type": self.media_type.value,
220                "item_id": db_id,
221                "provider": "library",
222            },
223        )
224        for prov_mapping in library_item.provider_mappings:
225            await self.mass.music.database.delete(
226                DB_TABLE_PLAYLOG,
227                {
228                    "media_type": self.media_type.value,
229                    "item_id": prov_mapping.item_id,
230                    "provider": prov_mapping.provider_instance,
231                },
232            )
233        # NOTE: this does not delete any references to this item in other records,
234        # this is handled/overridden in the mediatype specific controllers
235        self.mass.signal_event(EventType.MEDIA_ITEM_DELETED, library_item.uri, library_item)
236        self.logger.debug("deleted item with id %s from database", db_id)
237
238    async def library_count(self, favorite_only: bool = False) -> int:
239        """Return the total number of items in the library."""
240        if favorite_only:
241            sql_query = f"SELECT item_id FROM {self.db_table} WHERE favorite = 1"
242            return await self.mass.music.database.get_count_from_query(sql_query)
243        return await self.mass.music.database.get_count(self.db_table)
244
245    async def library_items(
246        self,
247        favorite: bool | None = None,
248        search: str | None = None,
249        limit: int = 500,
250        offset: int = 0,
251        order_by: str = "sort_name",
252        provider: str | list[str] | None = None,
253    ) -> list[ItemCls]:
254        """
255        Get the library items for this mediatype.
256
257        :param favorite: Filter by favorite status.
258        :param search: Filter by search query.
259        :param limit: Maximum number of items to return.
260        :param offset: Number of items to skip.
261        :param order_by: Order by field (e.g. 'sort_name', 'timestamp_added').
262        :param provider: Filter by provider instance ID (single string or list).
263        """
264        return await self.get_library_items_by_query(
265            favorite=favorite,
266            search=search,
267            limit=limit,
268            offset=offset,
269            order_by=order_by,
270            provider_filter=self._ensure_provider_filter(provider),
271        )
272
273    async def iter_library_items(
274        self,
275        favorite: bool | None = None,
276        search: str | None = None,
277        order_by: str = "sort_name",
278        provider: str | list[str] | None = None,
279    ) -> AsyncGenerator[ItemCls, None]:
280        """Iterate all in-database items."""
281        limit: int = 500
282        offset: int = 0
283        if provider is not None:
284            provider_filter = provider if isinstance(provider, list) else [provider]
285        else:
286            provider_filter = None
287        while True:
288            next_items = await self.get_library_items_by_query(
289                favorite=favorite,
290                search=search,
291                limit=limit,
292                offset=offset,
293                order_by=order_by,
294                provider_filter=provider_filter,
295            )
296            for item in next_items:
297                yield item
298            if len(next_items) < limit:
299                break
300            offset += limit
301
302    async def get(
303        self,
304        item_id: str,
305        provider_instance_id_or_domain: str,
306    ) -> ItemCls:
307        """Return (full) details for a single media item."""
308        # always prefer the full library item if we have it
309        if library_item := await self.get_library_item_by_prov_id(
310            item_id,
311            provider_instance_id_or_domain,
312        ):
313            # schedule a refresh of the metadata on access of the item
314            # e.g. the item is being played or opened in the UI
315            assert library_item.uri is not None
316            self.mass.metadata.schedule_update_metadata(library_item.uri)
317            return library_item
318        # grab full details from the provider
319        return await self.get_provider_item(
320            item_id,
321            provider_instance_id_or_domain,
322        )
323
324    async def search(
325        self,
326        search_query: str,
327        provider_instance_id_or_domain: str,
328        limit: int = 25,
329    ) -> list[ItemCls]:
330        """Search database or provider with given query."""
331        # create safe search string
332        search_query = search_query.replace("/", " ").replace("'", "")
333        if provider_instance_id_or_domain == "library":
334            return await self.library_items(search=search_query, limit=limit)
335        if not (prov := self.mass.get_provider(provider_instance_id_or_domain)):
336            return []
337        prov = cast("MusicProvider", prov)
338        if ProviderFeature.SEARCH not in prov.supported_features:
339            return []
340        if not prov.library_supported(self.media_type):
341            # assume library supported also means that this mediatype is supported
342            return []
343        searchresult = await prov.search(
344            search_query,
345            [self.media_type],
346            limit,
347        )
348        match self.media_type:
349            case MediaType.ARTIST:
350                return cast("list[ItemCls]", searchresult.artists)
351            case MediaType.ALBUM:
352                return cast("list[ItemCls]", searchresult.albums)
353            case MediaType.TRACK:
354                return cast("list[ItemCls]", searchresult.tracks)
355            case MediaType.PLAYLIST:
356                return cast("list[ItemCls]", searchresult.playlists)
357            case MediaType.AUDIOBOOK:
358                return cast("list[ItemCls]", searchresult.audiobooks)
359            case MediaType.PODCAST:
360                return cast("list[ItemCls]", searchresult.podcasts)
361            case MediaType.RADIO:
362                return cast("list[ItemCls]", searchresult.radio)
363            case _:
364                return []
365
366    async def get_library_item(self, item_id: int | str) -> ItemCls:
367        """Get single library item by id."""
368        db_id = int(item_id)  # ensure integer
369        extra_query = f"WHERE {self.db_table}.item_id = :item_id"
370        for db_item in await self.get_library_items_by_query(
371            extra_query_parts=[extra_query],
372            extra_query_params={"item_id": db_id},
373        ):
374            return db_item
375        msg = f"{self.media_type.value} not found in library: {db_id}"
376        raise MediaNotFoundError(msg)
377
378    async def get_library_item_by_prov_id(
379        self,
380        item_id: str,
381        provider_instance_id_or_domain: str,
382    ) -> ItemCls | None:
383        """Get the library item for the given provider_instance."""
384        assert item_id
385        assert provider_instance_id_or_domain
386        if provider_instance_id_or_domain == "library":
387            return await self.get_library_item(item_id)
388        for item in await self.get_library_items_by_prov_id(
389            provider_instance_id_or_domain=provider_instance_id_or_domain,
390            provider_item_id=item_id,
391        ):
392            return item
393        return None
394
395    @final
396    async def get_library_item_by_prov_mappings(
397        self,
398        provider_mappings: Iterable[ProviderMapping],
399    ) -> ItemCls | None:
400        """Get the library item for the given provider_instance."""
401        # always prefer provider instance first
402        for mapping in provider_mappings:
403            for item in await self.get_library_items_by_prov_id(
404                provider_instance=mapping.provider_instance,
405                provider_item_id=mapping.item_id,
406            ):
407                return item
408        # check by domain too
409        for mapping in provider_mappings:
410            for item in await self.get_library_items_by_prov_id(
411                provider_domain=mapping.provider_domain,
412                provider_item_id=mapping.item_id,
413            ):
414                return item
415        return None
416
417    @final
418    async def get_library_item_by_external_id(
419        self, external_id: str, external_id_type: ExternalID | None = None
420    ) -> ItemCls | None:
421        """Get the library item for the given external id."""
422        query = f"{self.db_table}.external_ids LIKE :external_id_str"
423        if external_id_type:
424            external_id_str = f'%"{external_id_type}","{external_id}"%'
425        else:
426            external_id_str = f'%"{external_id}"%'
427        for item in await self.get_library_items_by_query(
428            extra_query_parts=[query],
429            extra_query_params={"external_id_str": external_id_str},
430        ):
431            return item
432        return None
433
434    @final
435    async def get_library_item_by_external_ids(
436        self, external_ids: set[tuple[ExternalID, str]]
437    ) -> ItemCls | None:
438        """Get the library item for (one of) the given external ids."""
439        for external_id_type, external_id in external_ids:
440            if match := await self.get_library_item_by_external_id(external_id, external_id_type):
441                return match
442        return None
443
444    @final
445    async def get_library_items_by_prov_id(
446        self,
447        provider_domain: str | None = None,
448        provider_instance: str | None = None,
449        provider_instance_id_or_domain: str | None = None,
450        provider_item_id: str | None = None,
451        limit: int = 500,
452        offset: int = 0,
453    ) -> list[ItemCls]:
454        """Fetch all records from library for given provider."""
455        assert provider_instance_id_or_domain != "library"
456        assert provider_domain != "library"
457        assert provider_instance != "library"
458        subquery_parts: list[str] = []
459        query_params: dict[str, Any] = {}
460        if provider_instance:
461            query_params = {"prov_id": provider_instance}
462            subquery_parts.append("provider_mappings.provider_instance = :prov_id")
463        elif provider_domain:
464            query_params = {"prov_id": provider_domain}
465            subquery_parts.append("provider_mappings.provider_domain = :prov_id")
466        else:
467            query_params = {"prov_id": provider_instance_id_or_domain}
468            subquery_parts.append(
469                "(provider_mappings.provider_instance = :prov_id "
470                "OR provider_mappings.provider_domain = :prov_id)"
471            )
472        if provider_item_id:
473            subquery_parts.append("provider_mappings.provider_item_id = :item_id")
474            query_params["item_id"] = provider_item_id
475        subquery = f"SELECT item_id FROM provider_mappings WHERE {' AND '.join(subquery_parts)}"
476        query = f"WHERE {self.db_table}.item_id IN ({subquery})"
477        return await self.get_library_items_by_query(
478            limit=limit,
479            offset=offset,
480            extra_query_parts=[query],
481            extra_query_params=query_params,
482        )
483
484    @final
485    async def iter_library_items_by_prov_id(
486        self,
487        provider_instance_id_or_domain: str,
488        provider_item_id: str | None = None,
489    ) -> AsyncGenerator[ItemCls, None]:
490        """Iterate all records from database for given provider."""
491        limit: int = 500
492        offset: int = 0
493        while True:
494            next_items = await self.get_library_items_by_prov_id(
495                provider_instance_id_or_domain=provider_instance_id_or_domain,
496                provider_item_id=provider_item_id,
497                limit=limit,
498                offset=offset,
499            )
500            for item in next_items:
501                yield item
502            if len(next_items) < limit:
503                break
504            offset += limit
505
506    @final
507    async def set_favorite(self, item_id: str | int, favorite: bool) -> None:
508        """Set the favorite bool on a database item."""
509        db_id = int(item_id)  # ensure integer
510        library_item = await self.get_library_item(db_id)
511        if library_item.favorite == favorite:
512            return
513        match = {"item_id": db_id}
514        await self.mass.music.database.update(self.db_table, match, {"favorite": favorite})
515        library_item = await self.get_library_item(db_id)
516        self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, library_item.uri, library_item)
517
518    @guard_single_request  # type: ignore[type-var]  # TODO: fix typing for MediaControllerBase
519    @final
520    async def get_provider_item(
521        self,
522        item_id: str,
523        provider_instance_id_or_domain: str,
524        force_refresh: bool = False,
525        fallback: ItemMapping | ItemCls | None = None,
526    ) -> ItemCls:
527        """Return item details for the given provider item id."""
528        if provider_instance_id_or_domain == "library":
529            return await self.get_library_item(item_id)
530        if not (provider := self.mass.get_provider(provider_instance_id_or_domain)):
531            raise ProviderUnavailableError(f"{provider_instance_id_or_domain} is not available")
532        if provider := self.mass.get_provider(provider_instance_id_or_domain):
533            provider = cast("MusicProvider", provider)
534            with suppress(MediaNotFoundError):
535                async with self.mass.cache.handle_refresh(force_refresh):
536                    return cast("ItemCls", await provider.get_item(self.media_type, item_id))
537        # if we reach this point all possibilities failed and the item could not be found.
538        # There is a possibility that the (streaming) provider changed the id of the item
539        # so we return the previous details (if we have any) marked as unavailable, so
540        # at least we have the possibility to sort out the new id through matching logic.
541        fallback = fallback or await self.get_library_item_by_prov_id(
542            item_id, provider_instance_id_or_domain
543        )
544        if (
545            fallback
546            and isinstance(fallback, ItemMapping)
547            and (fallback_provider := self.mass.get_provider(fallback.provider))
548        ):
549            # fallback is a ItemMapping, try to convert to full item
550            with suppress(LookupError, TypeError, ValueError):
551                return cast(
552                    "ItemCls",
553                    self.item_cls.from_dict(
554                        {
555                            **fallback.to_dict(),
556                            "provider_mappings": [
557                                {
558                                    "item_id": fallback.item_id,
559                                    "provider_domain": fallback_provider.domain,
560                                    "provider_instance": fallback_provider.instance_id,
561                                    "available": fallback.available,
562                                }
563                            ],
564                        }
565                    ),
566                )
567        if fallback:
568            # simply return the fallback item
569            return cast("ItemCls", fallback)
570        # all options exhausted, we really can not find this item
571        msg = (
572            f"{self.media_type.value}://{item_id} not "
573            f"found on provider {provider_instance_id_or_domain}"
574        )
575        raise MediaNotFoundError(msg)
576
577    @final
578    async def add_provider_mapping(
579        self, item_id: str | int, provider_mapping: ProviderMapping
580    ) -> None:
581        """Add provider mapping to existing library item."""
582        await self.add_provider_mappings(item_id, [provider_mapping])
583
584    @final
585    async def add_provider_mappings(
586        self, item_id: str | int, provider_mappings: Iterable[ProviderMapping]
587    ) -> None:
588        """
589        Add provider mappings to existing library item.
590
591        :param item_id: The library item ID to add mappings to.
592        :param provider_mappings: The provider mappings to add.
593        """
594        db_id = int(item_id)  # ensure integer
595        library_item = await self.get_library_item(db_id)
596        new_mappings: set[ProviderMapping] = set()
597        for provider_mapping in provider_mappings:
598            # ignore if the mapping is already present
599            if provider_mapping not in library_item.provider_mappings:
600                new_mappings.add(provider_mapping)
601        if not new_mappings:
602            return
603        # handle special case where the user wants to merge 2 library items
604        for mapping in new_mappings:
605            if _library_item := await self.get_library_item_by_prov_id(
606                mapping.item_id, mapping.provider_instance
607            ):
608                if _library_item.item_id != library_item.item_id:
609                    # merging items
610                    self.logger.debug(
611                        "merging item id %s into item id %s based on provider mapping %s/%s",
612                        _library_item.item_id,
613                        library_item.item_id,
614                        mapping.provider_instance,
615                        mapping.item_id,
616                    )
617                    await self.remove_item_from_library(_library_item.item_id, recursive=True)
618                    break
619        library_item.provider_mappings.update(new_mappings)
620        self.mass.music.match_provider_instances(library_item)
621        await self.set_provider_mappings(db_id, library_item.provider_mappings)
622        self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, library_item.uri, library_item)
623
624    @final
625    async def update_provider_mapping(
626        self,
627        item_id: str | int,
628        provider_instance_id: str,
629        provider_item_id: str,
630        *,
631        available: bool | Any = UNSET,
632        in_library: bool | Any = UNSET,
633        is_unique: bool | None | Any = UNSET,
634        url: str | None | Any = UNSET,
635        details: str | None | Any = UNSET,
636        audio_format: AudioFormat | Any = UNSET,
637    ) -> None:
638        """Update an existing provider mapping for a library item."""
639        db_id = int(item_id)  # ensure integer
640        library_item = await self.get_library_item(db_id)
641
642        # find the current mapping (strictly by provider instance + provider item id)
643        cur_mapping: ProviderMapping | None = None
644        for mapping in library_item.provider_mappings:
645            if (
646                mapping.provider_instance == provider_instance_id
647                and mapping.item_id == provider_item_id
648            ):
649                cur_mapping = mapping
650                break
651        if cur_mapping is None:
652            msg = (
653                f"Provider mapping {provider_instance_id}/{provider_item_id} "
654                f"not found for item {db_id}"
655            )
656            raise MediaNotFoundError(msg)
657
658        # guard against nulls for NOT NULL columns
659        if available is None:
660            available = UNSET
661        if in_library is None:
662            in_library = UNSET
663
664        updates: dict[str, Any] = {}
665        if available is not UNSET:
666            updates["available"] = bool(available)
667        if in_library is not UNSET:
668            updates["in_library"] = bool(in_library)
669        if is_unique is not UNSET:
670            updates["is_unique"] = is_unique
671        if url is not UNSET:
672            updates["url"] = url
673        if details is not UNSET:
674            updates["details"] = details
675        if audio_format is not UNSET:
676            updates["audio_format"] = serialize_to_json(audio_format)
677
678        if not updates:
679            return
680
681        match = {
682            "media_type": self.media_type.value,
683            "item_id": db_id,
684            "provider_instance": provider_instance_id,
685            "provider_item_id": provider_item_id,
686        }
687        await self.mass.music.database.update(DB_TABLE_PROVIDER_MAPPINGS, match, updates)
688
689        # Re-fetch the updated item so the event payload reflects persisted DB state.
690        updated_item = await self.get_library_item(db_id)
691        self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, updated_item.uri, updated_item)
692
693    @final
694    async def remove_provider_mapping(
695        self, item_id: str | int, provider_instance_id: str, provider_item_id: str
696    ) -> None:
697        """Remove provider mapping(s) from item."""
698        db_id = int(item_id)  # ensure integer
699        try:
700            library_item = await self.get_library_item(db_id)
701        except MediaNotFoundError:
702            # edge case: already deleted / race condition
703            return
704
705        # update provider_mappings table
706        await self.mass.music.database.delete(
707            DB_TABLE_PROVIDER_MAPPINGS,
708            {
709                "media_type": self.media_type.value,
710                "item_id": db_id,
711                "provider_instance": provider_instance_id,
712                "provider_item_id": provider_item_id,
713            },
714        )
715        # cleanup playlog table
716        await self.mass.music.database.delete(
717            DB_TABLE_PLAYLOG,
718            {
719                "media_type": self.media_type.value,
720                "item_id": provider_item_id,
721                "provider": provider_instance_id,
722            },
723        )
724        library_item.provider_mappings = {
725            x
726            for x in library_item.provider_mappings
727            if not (x.provider_instance == provider_instance_id and x.item_id == provider_item_id)
728        }
729        if library_item.provider_mappings:
730            self.logger.debug(
731                "removed provider_mapping %s/%s from item id %s",
732                provider_instance_id,
733                provider_item_id,
734                db_id,
735            )
736            self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, library_item.uri, library_item)
737        else:
738            # remove item if it has no more providers
739            with suppress(AssertionError):
740                await self.remove_item_from_library(db_id)
741
742    @final
743    async def remove_provider_mappings(self, item_id: str | int, provider_instance_id: str) -> None:
744        """Remove all provider mappings from an item."""
745        db_id = int(item_id)  # ensure integer
746        try:
747            library_item = await self.get_library_item(db_id)
748        except MediaNotFoundError:
749            # edge case: already deleted / race condition
750            library_item = None
751        # update provider_mappings table
752        await self.mass.music.database.delete(
753            DB_TABLE_PROVIDER_MAPPINGS,
754            {
755                "media_type": self.media_type.value,
756                "item_id": db_id,
757                "provider_instance": provider_instance_id,
758            },
759        )
760        if library_item is None:
761            return
762        # update the item's provider mappings (and check if we still have any)
763        library_item.provider_mappings = {
764            x for x in library_item.provider_mappings if x.provider_instance != provider_instance_id
765        }
766        if library_item.provider_mappings:
767            self.logger.debug(
768                "removed all provider mappings for provider %s from item id %s",
769                provider_instance_id,
770                db_id,
771            )
772            self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, library_item.uri, library_item)
773        else:
774            # remove item if it has no more providers
775            with suppress(AssertionError):
776                await self.remove_item_from_library(db_id)
777
778    @final
779    async def set_provider_mappings(
780        self,
781        item_id: str | int,
782        provider_mappings: Iterable[ProviderMapping],
783        overwrite: bool = False,
784    ) -> None:
785        """Update the provider_items table for the media item."""
786        db_id = int(item_id)  # ensure integer
787        if overwrite:
788            # on overwrite, clear the provider_mappings table first
789            # this is done for filesystem provider changing the path (and thus item_id)
790            await self.mass.music.database.delete(
791                DB_TABLE_PROVIDER_MAPPINGS,
792                {"media_type": self.media_type.value, "item_id": db_id},
793            )
794        for provider_mapping in provider_mappings:
795            prov_map_obj = {
796                "media_type": self.media_type.value,
797                "item_id": db_id,
798                "provider_domain": provider_mapping.provider_domain,
799                "provider_instance": provider_mapping.provider_instance,
800                "provider_item_id": provider_mapping.item_id,
801                "available": provider_mapping.available,
802                "audio_format": serialize_to_json(provider_mapping.audio_format),
803            }
804            for key in ("url", "details", "in_library", "is_unique"):
805                if (value := getattr(provider_mapping, key, None)) is not None:
806                    prov_map_obj[key] = value
807            await self.mass.music.database.upsert(
808                DB_TABLE_PROVIDER_MAPPINGS,
809                prov_map_obj,
810            )
811
812    @abstractmethod
    async def _add_library_item(
        self,
        item: ItemCls,
        overwrite_existing: bool = False,
    ) -> int:
        """
        Add the given item to the library and return the database id.

        Abstract: every subclass has to provide the implementation.

        :param item: The media item to add.
        :param overwrite_existing: Presumably controls whether an already existing
            record gets overwritten; not visible here, confirm in the subclasses.
        """
819
820    @abstractmethod
    async def _update_library_item(
        self, item_id: str | int, update: ItemCls, overwrite: bool = False
    ) -> None:
        """
        Update existing library record in the database.

        Abstract: every subclass has to provide the implementation.

        :param item_id: Id of the library record to update.
        :param update: Media item carrying the new values.
        :param overwrite: Presumably replaces the stored values instead of merging
            them; not visible here, confirm in the subclasses.
        """
825
826    @abstractmethod
    async def match_providers(self, db_item: ItemCls) -> None:
        """
        Try to find match on all (streaming) providers for the provided (database) item.

        This is used to link objects of different providers/qualities together.

        Abstract: every subclass has to provide the implementation.

        :param db_item: The (database) item to find provider matches for.
        """
833
834    @abstractmethod
    async def radio_mode_base_tracks(
        self,
        item: ItemCls,
        preferred_provider_instances: list[str] | None = None,
    ) -> list[Track]:
        """
        Get the list of base tracks from the controller used to calculate the dynamic radio.

        Abstract: every subclass has to provide the implementation.

        :param item: The MediaItem to get base tracks for.
        :param preferred_provider_instances: List of preferred provider instance IDs to use.
            When provided, these providers will be tried first before falling back to others.
        :return: The base tracks for the given item.
        """
847
848    @final
849    async def get_library_items_by_query(
850        self,
851        favorite: bool | None = None,
852        search: str | None = None,
853        limit: int = 500,
854        offset: int = 0,
855        order_by: str | None = None,
856        provider_filter: list[str] | None = None,
857        extra_query_parts: list[str] | None = None,
858        extra_query_params: dict[str, Any] | None = None,
859        extra_join_parts: list[str] | None = None,
860    ) -> list[ItemCls]:
861        """Fetch MediaItem records from database by building the query."""
862        query_params = dict(extra_query_params) if extra_query_params else {}
863        query_parts: list[str] = list(extra_query_parts) if extra_query_parts else []
864        join_parts: list[str] = list(extra_join_parts) if extra_join_parts else []
865        search = self._preprocess_search(search, query_params)
866        # create special performant random query
867        if order_by and order_by.startswith("random"):
868            self._apply_random_subquery(
869                query_parts=query_parts,
870                query_params=query_params,
871                join_parts=join_parts,
872                favorite=favorite,
873                search=search,
874                provider_filter=provider_filter,
875                limit=limit,
876            )
877        else:
878            # apply filters
879            self._apply_filters(
880                query_parts=query_parts,
881                query_params=query_params,
882                join_parts=join_parts,
883                favorite=favorite,
884                search=search,
885                provider_filter=provider_filter,
886            )
887        # build and execute final query
888        sql_query = self._build_final_query(query_parts, join_parts, order_by)
889
890        return [
891            cast("ItemCls", self.item_cls.from_dict(self._parse_db_row(db_row)))
892            for db_row in await self.mass.music.database.get_rows_from_query(
893                sql_query, query_params, limit=limit, offset=offset
894            )
895        ]
896
897    @final
898    def _preprocess_search(self, search: str | None, query_params: dict[str, Any]) -> str | None:
899        """Preprocess search string and add to query params."""
900        if search:
901            search = create_safe_string(search, True, True)
902            query_params["search"] = f"%{search}%"
903        return search
904
905    @final
906    @staticmethod
907    def _clean_query_parts(query_parts: list[str]) -> list[str]:
908        """Clean the query parts list by removing duplicate where statements."""
909        return [x[5:] if x.lower().startswith("where ") else x for x in query_parts]
910
911    @final
912    def _apply_random_subquery(
913        self,
914        query_parts: list[str],
915        query_params: dict[str, Any],
916        join_parts: list[str],
917        favorite: bool | None,
918        search: str | None,
919        provider_filter: list[str] | None,
920        limit: int,
921    ) -> None:
922        """Build a fast random subquery with all filters applied."""
923        sub_query_parts = query_parts.copy()
924        sub_join_parts = join_parts.copy()
925
926        # Apply all filters to the subquery
927        self._apply_filters(
928            query_parts=sub_query_parts,
929            query_params=query_params,
930            join_parts=sub_join_parts,
931            favorite=favorite,
932            search=search,
933            provider_filter=provider_filter,
934        )
935
936        # Build the subquery
937        sub_query = f"SELECT {self.db_table}.item_id FROM {self.db_table}"
938
939        if sub_join_parts:
940            sub_query += f" {' '.join(sub_join_parts)}"
941
942        if sub_query_parts:
943            sub_query += " WHERE " + " AND ".join(self._clean_query_parts(sub_query_parts))
944
945        sub_query += f" ORDER BY RANDOM() LIMIT {limit}"
946
947        # The query now only consists of the random subquery, which applies all filters
948        # within itself
949        query_parts.clear()
950        query_parts.append(f"{self.db_table}.item_id in ({sub_query})")
951        join_parts.clear()
952
953    @final
954    def _apply_filters(
955        self,
956        query_parts: list[str],
957        query_params: dict[str, Any],
958        join_parts: list[str],
959        favorite: bool | None,
960        search: str | None,
961        provider_filter: list[str] | None,
962    ) -> None:
963        """Apply search, favorite, and provider filters."""
964        # handle search
965        if search:
966            query_parts.append(f"{self.db_table}.search_name LIKE :search")
967        # handle favorite filter
968        if favorite is not None:
969            query_parts.append(f"{self.db_table}.favorite = :favorite")
970            query_params["favorite"] = favorite
971        # Apply the provider filter
972        if provider_filter:
973            provider_conditions = []
974            for idx, prov in enumerate(provider_filter):
975                param_name = f"provider_filter_{idx}"
976                provider_conditions.append(f"provider_mappings.provider_instance = :{param_name}")
977                query_params[param_name] = prov
978            query_params["provider_media_type"] = self.media_type.value
979            join_parts.append(
980                f"JOIN provider_mappings ON provider_mappings.item_id = {self.db_table}.item_id "
981                "AND provider_mappings.media_type = :provider_media_type "
982                "AND provider_mappings.in_library = 1 "
983                f"AND ({' OR '.join(provider_conditions)})"
984            )
985
986    @final
987    def _build_final_query(
988        self,
989        query_parts: list[str],
990        join_parts: list[str],
991        order_by: str | None,
992    ) -> str:
993        """Build the final SQL query string."""
994        sql_query = self.base_query
995
996        # Add joins
997        if join_parts:
998            sql_query += f" {' '.join(join_parts)} "
999
1000        # Add where clauses
1001        if query_parts:
1002            # prevent duplicate where statement
1003            sql_query += " WHERE " + " AND ".join(self._clean_query_parts(query_parts))
1004
1005        # Add grouping and ordering
1006        sql_query += f" GROUP BY {self.db_table}.item_id"
1007
1008        if order_by:
1009            if sort_key := SORT_KEYS.get(order_by):
1010                sql_query += f" ORDER BY {sort_key}"
1011
1012        return sql_query
1013
1014    @final
1015    @staticmethod
1016    def _parse_db_row(db_row: Mapping[str, Any]) -> dict[str, Any]:
1017        """Parse raw db Mapping into a dict."""
1018        db_row_dict = dict(db_row)
1019        db_row_dict["provider"] = "library"
1020        db_row_dict["favorite"] = bool(db_row_dict["favorite"])
1021        db_row_dict["item_id"] = str(db_row_dict["item_id"])
1022        db_row_dict["date_added"] = datetime.fromtimestamp(
1023            db_row_dict["timestamp_added"]
1024        ).isoformat()
1025
1026        for key in JSON_KEYS:
1027            if key not in db_row_dict:
1028                continue
1029            if not (raw_value := db_row_dict[key]):
1030                continue
1031            db_row_dict[key] = json_loads(raw_value)
1032
1033        # copy track_album --> album
1034        if track_album := db_row_dict.get("track_album"):
1035            db_row_dict["album"] = track_album
1036            db_row_dict["disc_number"] = track_album["disc_number"]
1037            db_row_dict["track_number"] = track_album["track_number"]
1038            # always prefer album image over track image
1039            if (album_images := track_album.get("images")) and (
1040                album_thumb := next((x for x in album_images if x["type"] == "thumb"), None)
1041            ):
1042                # copy album image to itemmapping single image
1043                db_row_dict["image"] = album_thumb
1044                if db_row_dict["metadata"].get("images"):
1045                    # merge album image with existing images
1046                    db_row_dict["metadata"]["images"] = [
1047                        album_thumb,
1048                        *db_row_dict["metadata"]["images"],
1049                    ]
1050                else:
1051                    db_row_dict["metadata"]["images"] = [album_thumb]
1052        return db_row_dict
1053
1054    @final
1055    def _ensure_provider_filter(
1056        self,
1057        provider: str | list[str] | None,
1058    ) -> list[str] | None:
1059        """Ensure the provider filter respects the current user's provider filter."""
1060        # Apply user provider filter if needed
1061        user = get_current_user()
1062        user_provider_filter = user.provider_filter if user and user.provider_filter else None
1063        final_provider_filter: list[str] | None = None
1064        if user_provider_filter:
1065            # User has a provider filter set
1066            if provider:
1067                # Explicit provider filter provided - validate against user's allowed providers
1068                requested_providers = [provider] if isinstance(provider, str) else provider
1069                # Only include providers that are in both the user's filter and the requested list
1070                final_provider_filter = [
1071                    p for p in requested_providers if p in user_provider_filter
1072                ]
1073                if not final_provider_filter:
1074                    # No overlap - user requested providers they don't have access to
1075                    raise InsufficientPermissions(
1076                        "User does not have permission to access the requested provider(s)."
1077                    )
1078            else:
1079                # No explicit filter - use user's provider filter
1080                final_provider_filter = user_provider_filter
1081        elif provider is not None:
1082            # No user filter - use the provided filter as is
1083            final_provider_filter = [provider] if isinstance(provider, str) else provider
1084        return final_provider_filter
1085
1086    @final
1087    def _select_provider_id(self, library_item: ItemCls) -> tuple[str, str]:
1088        """Select the correct provider id to use for fetching the item."""
1089        user = get_current_user()
1090        user_provider_filter = user.provider_filter if user and user.provider_filter else None
1091        # prefer user provider filter if available
1092        for mapping in library_item.provider_mappings:
1093            if user_provider_filter and mapping.provider_instance not in user_provider_filter:
1094                continue
1095            return (mapping.provider_instance, mapping.item_id)
1096        # fallback to first mapping
1097        mapping = next(iter(library_item.provider_mappings))
1098        return (mapping.provider_instance, mapping.item_id)
1099