music-assistant-server

11 KBPY
__init__.py
11 KB317 lines • python
1"""Allows scrobbling of tracks with the help of PyLast."""
2
3import asyncio
4import logging
5import time
6from collections.abc import Callable
7from typing import TYPE_CHECKING, cast
8
9import pylast
10from music_assistant_models.config_entries import (
11    ConfigEntry,
12    ConfigValueOption,
13    ConfigValueType,
14    ProviderConfig,
15)
16from music_assistant_models.constants import SECURE_STRING_SUBSTITUTE
17from music_assistant_models.enums import ConfigEntryType, EventType, ProviderFeature
18from music_assistant_models.errors import LoginFailed, SetupFailedError
19from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport
20from music_assistant_models.provider import ProviderManifest
21
22from music_assistant.constants import MASS_LOGGER_NAME
23from music_assistant.helpers.auth import AuthenticationHelper
24from music_assistant.helpers.scrobbler import (
25    ScrobblerConfig,
26    ScrobblerHelper,
27    create_scrobble_users_config_entry,
28)
29from music_assistant.mass import MusicAssistant
30from music_assistant.models import ProviderInstanceType
31from music_assistant.models.plugin import PluginProvider
32
33SUPPORTED_FEATURES: set[ProviderFeature] = (
34    set()
35)  # we don't have any special supported features (yet)
36
37
38async def setup(
39    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
40) -> ProviderInstanceType:
41    """Initialize provider(instance) with given configuration."""
42    provider = LastFMScrobbleProvider(mass, manifest, config, SUPPORTED_FEATURES)
43    pylast.logger.setLevel(provider.logger.level)
44
45    # httpcore is very spammy on debug without providing useful information 99% of the time
46    if provider.logger.level == logging.DEBUG:
47        logging.getLogger("httpcore").setLevel(logging.INFO)
48    else:
49        logging.getLogger("httpcore").setLevel(logging.WARNING)
50
51    return provider
52
53
54class LastFMScrobbleProvider(PluginProvider):
55    """Plugin provider to support scrobbling of tracks."""
56
57    network: pylast._Network
58    _on_unload: list[Callable[[], None]]
59
60    async def handle_async_init(self) -> None:
61        """Handle async setup."""
62        self._on_unload: list[Callable[[], None]] = []
63
64        if not self.config.get_value(CONF_API_KEY) or not self.config.get_value(CONF_API_SECRET):
65            raise SetupFailedError("API Key and Secret need to be set")
66
67        if not self.config.get_value(CONF_SESSION_KEY):
68            self.logger.info("No session key available, don't forget to authenticate!")
69            return
70        # creating the network instance is (potentially) blocking IO
71        # so run it in an executor thread to be safe
72        self.network = await asyncio.to_thread(get_network, self._get_network_config())
73
74    async def loaded_in_mass(self) -> None:
75        """Call after the provider has been loaded."""
76        await super().loaded_in_mass()
77
78        # subscribe to media_item_played event
79        handler = LastFMEventHandler(self.network, self.logger, self.config)
80        self._on_unload.append(
81            self.mass.subscribe(handler._on_mass_media_item_played, EventType.MEDIA_ITEM_PLAYED)
82        )
83
84    async def unload(self, is_removed: bool = False) -> None:
85        """Handle unload/close of the provider.
86
87        Called when provider is deregistered (e.g. MA exiting or config reloading).
88        """
89        for unload_cb in self._on_unload:
90            unload_cb()
91
92    def _get_network_config(self) -> dict[str, ConfigValueType]:
93        return {
94            CONF_API_KEY: self.config.get_value(CONF_API_KEY),
95            CONF_API_SECRET: self.config.get_value(CONF_API_SECRET),
96            CONF_PROVIDER: self.config.get_value(CONF_PROVIDER),
97            CONF_USERNAME: self.config.get_value(CONF_USERNAME),
98            CONF_SESSION_KEY: self.config.get_value(CONF_SESSION_KEY),
99        }
100
101
102class LastFMEventHandler(ScrobblerHelper):
103    """Handles the event handling."""
104
105    network: pylast._Network
106
107    def __init__(
108        self, network: pylast._Network, logger: logging.Logger, config: ProviderConfig
109    ) -> None:
110        """Initialize."""
111        super().__init__(logger, ScrobblerConfig.create_from_config(config))
112        self.network = network
113
114    async def _update_now_playing(self, report: MediaItemPlaybackProgressReport) -> None:
115        # the lastfm client is not async friendly,
116        # so we need to run it in a executor thread
117        await asyncio.to_thread(
118            self.network.update_now_playing,
119            report.artist,
120            self.get_name(report),
121            report.album,
122            duration=report.duration,
123            mbid=report.mbid,
124        )
125
126    async def _scrobble(self, report: MediaItemPlaybackProgressReport) -> None:
127        # the listenbrainz client is not async friendly,
128        # so we need to run it in a executor thread
129        # NOTE: album artist and track number are not available without an extra API call
130        # so they won't be scrobbled
131        await asyncio.to_thread(
132            self.network.scrobble,
133            report.artist or "unknown artist",
134            self.get_name(report),
135            int(time.time()),
136            report.album,
137            duration=report.duration,
138            mbid=report.mbid,
139        )
140
141
142# configuration keys
143CONF_API_KEY = "_api_key"
144CONF_API_SECRET = "_api_secret"
145CONF_SESSION_KEY = "_api_session_key"
146CONF_USERNAME = "_username"
147CONF_PROVIDER = "_provider"
148
149# configuration actions
150CONF_ACTION_AUTH = "_auth"
151
152# available networks
153CONF_OPTION_LASTFM: str = "lastfm"
154CONF_OPTION_LIBREFM: str = "librefm"
155
156
157async def get_config_entries(
158    mass: MusicAssistant,
159    instance_id: str | None = None,  # noqa: ARG001
160    action: str | None = None,
161    values: dict[str, ConfigValueType] | None = None,
162) -> tuple[ConfigEntry, ...]:
163    """
164    Return Config entries to setup this provider.
165
166    instance_id: id of an existing provider instance (None if new instance setup).
167    action: [optional] action key called from config entries UI.
168    values: the (intermediate) raw values for config entries sent with the action.
169    """
170    logger = logging.getLogger(MASS_LOGGER_NAME).getChild("lastfm")
171
172    provider: str = CONF_OPTION_LASTFM
173    if values is not None and values.get(CONF_PROVIDER) is not None:
174        provider = str(values.get(CONF_PROVIDER))
175
176    # collect all config entries to show
177    entries: list[ConfigEntry] = ScrobblerConfig.get_shared_config_entries(values)
178    entries += [
179        ConfigEntry(
180            key=CONF_PROVIDER,
181            type=ConfigEntryType.STRING,
182            label="Provider",
183            required=True,
184            description="The endpoint to use, defaults to Last.fm",
185            options=[
186                ConfigValueOption(title="Last.FM", value=CONF_OPTION_LASTFM),
187                ConfigValueOption(title="LibreFM", value=CONF_OPTION_LIBREFM),
188            ],
189            default_value=provider,
190            value=provider,
191        ),
192        ConfigEntry(
193            key=CONF_API_KEY,
194            type=ConfigEntryType.SECURE_STRING,
195            label="API Key",
196            required=True,
197            value=values.get(CONF_API_KEY) if values else None,
198        ),
199        ConfigEntry(
200            key=CONF_API_SECRET,
201            type=ConfigEntryType.SECURE_STRING,
202            label="Shared secret",
203            required=True,
204            value=values.get(CONF_API_SECRET) if values else None,
205        ),
206        # add user selection entry
207        await create_scrobble_users_config_entry(mass),
208    ]
209
210    # early return so we can assume values are present
211    if values is None:
212        return tuple(entries)
213
214    if action == CONF_ACTION_AUTH and values.get("session_id") is not None:
215        session_id = str(values.get("session_id"))
216
217        async with AuthenticationHelper(mass, session_id) as auth_helper:
218            network = get_network(values)
219            skg = pylast.SessionKeyGenerator(network)
220
221            # pylast says it does web auth, but actually does desktop auth
222            # so we need to do some URL juggling ourselves
223            # to get a proper web auth flow with a callback
224            url = (
225                f"{network.homepage}/api/auth/"
226                f"?api_key={network.api_key}"
227                f"&cb={auth_helper.callback_url}"
228            )
229
230            logger.info("authenticating on %s", url)
231            response = await auth_helper.authenticate(url)
232            if response.get("token") is None:
233                raise LoginFailed(f"no token available in {provider} response")
234
235            session_key, username = skg.get_web_auth_session_key_username(
236                url, str(response.get("token"))
237            )
238            values[CONF_USERNAME] = username
239            values[CONF_SESSION_KEY] = session_key
240
241            entries += [
242                ConfigEntry(
243                    key="save_reminder",
244                    type=ConfigEntryType.ALERT,
245                    required=False,
246                    default_value=None,
247                    label=f"Successfully logged in as {username}, "
248                    "don't forget to hit save to complete the setup",
249                ),
250            ]
251
252    if values is None or not values.get(CONF_SESSION_KEY):
253        # unable to use the encrypted values during an action
254        # so we make sure fresh credentials need to be entered
255        values[CONF_API_KEY] = None
256        values[CONF_API_SECRET] = None
257        entries += [
258            ConfigEntry(
259                key=CONF_ACTION_AUTH,
260                type=ConfigEntryType.ACTION,
261                label=f"Authorize with {provider}",
262                action=CONF_ACTION_AUTH,
263            ),
264        ]
265
266    entries += [
267        ConfigEntry(
268            key=CONF_USERNAME,
269            type=ConfigEntryType.STRING,
270            label="Logged in user",
271            hidden=True,
272            value=values.get(CONF_USERNAME) if values else None,
273        ),
274        ConfigEntry(
275            key=CONF_SESSION_KEY,
276            type=ConfigEntryType.SECURE_STRING,
277            label="Session key",
278            hidden=True,
279            required=False,
280            value=values.get(CONF_SESSION_KEY) if values else None,
281        ),
282    ]
283
284    return tuple(entries)
285
286
287def get_network(config: dict[str, ConfigValueType]) -> pylast._Network:
288    """Create a network instance."""
289    key = config.get(CONF_API_KEY)
290    secret = config.get(CONF_API_SECRET)
291    session_key = config.get(CONF_SESSION_KEY)
292    username = config.get(CONF_USERNAME)
293
294    assert key
295    assert key != SECURE_STRING_SUBSTITUTE
296    assert secret
297    assert secret != SECURE_STRING_SUBSTITUTE
298
299    if not key or not secret:
300        raise SetupFailedError("API Key and Secret need to be set")
301
302    provider: str = str(config.get(CONF_PROVIDER))
303
304    if TYPE_CHECKING:
305        key = cast("str", key)
306        secret = cast("str", secret)
307        session_key = cast("str", session_key)
308        username = cast("str", username)
309
310    match provider.lower():
311        case "lastfm":
312            return pylast.LastFMNetwork(key, secret, username=username, session_key=session_key)
313        case "librefm":
314            return pylast.LibreFMNetwork(key, secret, username=username, session_key=session_key)
315        case _:
316            raise SetupFailedError(f"unknown provider {provider} configured")
317