/
/
/
1"""Allows scrobbling of tracks with the help of PyLast."""
2
3import asyncio
4import logging
5import time
6from collections.abc import Callable
7from typing import TYPE_CHECKING, cast
8
9import pylast
10from music_assistant_models.config_entries import (
11 ConfigEntry,
12 ConfigValueOption,
13 ConfigValueType,
14 ProviderConfig,
15)
16from music_assistant_models.constants import SECURE_STRING_SUBSTITUTE
17from music_assistant_models.enums import ConfigEntryType, EventType, ProviderFeature
18from music_assistant_models.errors import LoginFailed, SetupFailedError
19from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport
20from music_assistant_models.provider import ProviderManifest
21
22from music_assistant.constants import MASS_LOGGER_NAME
23from music_assistant.helpers.auth import AuthenticationHelper
24from music_assistant.helpers.scrobbler import (
25 ScrobblerConfig,
26 ScrobblerHelper,
27 create_scrobble_users_config_entry,
28)
29from music_assistant.mass import MusicAssistant
30from music_assistant.models import ProviderInstanceType
31from music_assistant.models.plugin import PluginProvider
32
33SUPPORTED_FEATURES: set[ProviderFeature] = (
34 set()
35) # we don't have any special supported features (yet)
36
37
38async def setup(
39 mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
40) -> ProviderInstanceType:
41 """Initialize provider(instance) with given configuration."""
42 provider = LastFMScrobbleProvider(mass, manifest, config, SUPPORTED_FEATURES)
43 pylast.logger.setLevel(provider.logger.level)
44
45 # httpcore is very spammy on debug without providing useful information 99% of the time
46 if provider.logger.level == logging.DEBUG:
47 logging.getLogger("httpcore").setLevel(logging.INFO)
48 else:
49 logging.getLogger("httpcore").setLevel(logging.WARNING)
50
51 return provider
52
53
54class LastFMScrobbleProvider(PluginProvider):
55 """Plugin provider to support scrobbling of tracks."""
56
57 network: pylast._Network
58 _on_unload: list[Callable[[], None]]
59
60 async def handle_async_init(self) -> None:
61 """Handle async setup."""
62 self._on_unload: list[Callable[[], None]] = []
63
64 if not self.config.get_value(CONF_API_KEY) or not self.config.get_value(CONF_API_SECRET):
65 raise SetupFailedError("API Key and Secret need to be set")
66
67 if not self.config.get_value(CONF_SESSION_KEY):
68 self.logger.info("No session key available, don't forget to authenticate!")
69 return
70 # creating the network instance is (potentially) blocking IO
71 # so run it in an executor thread to be safe
72 self.network = await asyncio.to_thread(get_network, self._get_network_config())
73
74 async def loaded_in_mass(self) -> None:
75 """Call after the provider has been loaded."""
76 await super().loaded_in_mass()
77
78 # subscribe to media_item_played event
79 handler = LastFMEventHandler(self.network, self.logger, self.config)
80 self._on_unload.append(
81 self.mass.subscribe(handler._on_mass_media_item_played, EventType.MEDIA_ITEM_PLAYED)
82 )
83
84 async def unload(self, is_removed: bool = False) -> None:
85 """Handle unload/close of the provider.
86
87 Called when provider is deregistered (e.g. MA exiting or config reloading).
88 """
89 for unload_cb in self._on_unload:
90 unload_cb()
91
92 def _get_network_config(self) -> dict[str, ConfigValueType]:
93 return {
94 CONF_API_KEY: self.config.get_value(CONF_API_KEY),
95 CONF_API_SECRET: self.config.get_value(CONF_API_SECRET),
96 CONF_PROVIDER: self.config.get_value(CONF_PROVIDER),
97 CONF_USERNAME: self.config.get_value(CONF_USERNAME),
98 CONF_SESSION_KEY: self.config.get_value(CONF_SESSION_KEY),
99 }
100
101
102class LastFMEventHandler(ScrobblerHelper):
103 """Handles the event handling."""
104
105 network: pylast._Network
106
107 def __init__(
108 self, network: pylast._Network, logger: logging.Logger, config: ProviderConfig
109 ) -> None:
110 """Initialize."""
111 super().__init__(logger, ScrobblerConfig.create_from_config(config))
112 self.network = network
113
114 async def _update_now_playing(self, report: MediaItemPlaybackProgressReport) -> None:
115 # the lastfm client is not async friendly,
116 # so we need to run it in a executor thread
117 await asyncio.to_thread(
118 self.network.update_now_playing,
119 report.artist,
120 self.get_name(report),
121 report.album,
122 duration=report.duration,
123 mbid=report.mbid,
124 )
125
126 async def _scrobble(self, report: MediaItemPlaybackProgressReport) -> None:
127 # the listenbrainz client is not async friendly,
128 # so we need to run it in a executor thread
129 # NOTE: album artist and track number are not available without an extra API call
130 # so they won't be scrobbled
131 await asyncio.to_thread(
132 self.network.scrobble,
133 report.artist or "unknown artist",
134 self.get_name(report),
135 int(time.time()),
136 report.album,
137 duration=report.duration,
138 mbid=report.mbid,
139 )
140
141
142# configuration keys
143CONF_API_KEY = "_api_key"
144CONF_API_SECRET = "_api_secret"
145CONF_SESSION_KEY = "_api_session_key"
146CONF_USERNAME = "_username"
147CONF_PROVIDER = "_provider"
148
149# configuration actions
150CONF_ACTION_AUTH = "_auth"
151
152# available networks
153CONF_OPTION_LASTFM: str = "lastfm"
154CONF_OPTION_LIBREFM: str = "librefm"
155
156
157async def get_config_entries(
158 mass: MusicAssistant,
159 instance_id: str | None = None, # noqa: ARG001
160 action: str | None = None,
161 values: dict[str, ConfigValueType] | None = None,
162) -> tuple[ConfigEntry, ...]:
163 """
164 Return Config entries to setup this provider.
165
166 instance_id: id of an existing provider instance (None if new instance setup).
167 action: [optional] action key called from config entries UI.
168 values: the (intermediate) raw values for config entries sent with the action.
169 """
170 logger = logging.getLogger(MASS_LOGGER_NAME).getChild("lastfm")
171
172 provider: str = CONF_OPTION_LASTFM
173 if values is not None and values.get(CONF_PROVIDER) is not None:
174 provider = str(values.get(CONF_PROVIDER))
175
176 # collect all config entries to show
177 entries: list[ConfigEntry] = ScrobblerConfig.get_shared_config_entries(values)
178 entries += [
179 ConfigEntry(
180 key=CONF_PROVIDER,
181 type=ConfigEntryType.STRING,
182 label="Provider",
183 required=True,
184 description="The endpoint to use, defaults to Last.fm",
185 options=[
186 ConfigValueOption(title="Last.FM", value=CONF_OPTION_LASTFM),
187 ConfigValueOption(title="LibreFM", value=CONF_OPTION_LIBREFM),
188 ],
189 default_value=provider,
190 value=provider,
191 ),
192 ConfigEntry(
193 key=CONF_API_KEY,
194 type=ConfigEntryType.SECURE_STRING,
195 label="API Key",
196 required=True,
197 value=values.get(CONF_API_KEY) if values else None,
198 ),
199 ConfigEntry(
200 key=CONF_API_SECRET,
201 type=ConfigEntryType.SECURE_STRING,
202 label="Shared secret",
203 required=True,
204 value=values.get(CONF_API_SECRET) if values else None,
205 ),
206 # add user selection entry
207 await create_scrobble_users_config_entry(mass),
208 ]
209
210 # early return so we can assume values are present
211 if values is None:
212 return tuple(entries)
213
214 if action == CONF_ACTION_AUTH and values.get("session_id") is not None:
215 session_id = str(values.get("session_id"))
216
217 async with AuthenticationHelper(mass, session_id) as auth_helper:
218 network = get_network(values)
219 skg = pylast.SessionKeyGenerator(network)
220
221 # pylast says it does web auth, but actually does desktop auth
222 # so we need to do some URL juggling ourselves
223 # to get a proper web auth flow with a callback
224 url = (
225 f"{network.homepage}/api/auth/"
226 f"?api_key={network.api_key}"
227 f"&cb={auth_helper.callback_url}"
228 )
229
230 logger.info("authenticating on %s", url)
231 response = await auth_helper.authenticate(url)
232 if response.get("token") is None:
233 raise LoginFailed(f"no token available in {provider} response")
234
235 session_key, username = skg.get_web_auth_session_key_username(
236 url, str(response.get("token"))
237 )
238 values[CONF_USERNAME] = username
239 values[CONF_SESSION_KEY] = session_key
240
241 entries += [
242 ConfigEntry(
243 key="save_reminder",
244 type=ConfigEntryType.ALERT,
245 required=False,
246 default_value=None,
247 label=f"Successfully logged in as {username}, "
248 "don't forget to hit save to complete the setup",
249 ),
250 ]
251
252 if values is None or not values.get(CONF_SESSION_KEY):
253 # unable to use the encrypted values during an action
254 # so we make sure fresh credentials need to be entered
255 values[CONF_API_KEY] = None
256 values[CONF_API_SECRET] = None
257 entries += [
258 ConfigEntry(
259 key=CONF_ACTION_AUTH,
260 type=ConfigEntryType.ACTION,
261 label=f"Authorize with {provider}",
262 action=CONF_ACTION_AUTH,
263 ),
264 ]
265
266 entries += [
267 ConfigEntry(
268 key=CONF_USERNAME,
269 type=ConfigEntryType.STRING,
270 label="Logged in user",
271 hidden=True,
272 value=values.get(CONF_USERNAME) if values else None,
273 ),
274 ConfigEntry(
275 key=CONF_SESSION_KEY,
276 type=ConfigEntryType.SECURE_STRING,
277 label="Session key",
278 hidden=True,
279 required=False,
280 value=values.get(CONF_SESSION_KEY) if values else None,
281 ),
282 ]
283
284 return tuple(entries)
285
286
287def get_network(config: dict[str, ConfigValueType]) -> pylast._Network:
288 """Create a network instance."""
289 key = config.get(CONF_API_KEY)
290 secret = config.get(CONF_API_SECRET)
291 session_key = config.get(CONF_SESSION_KEY)
292 username = config.get(CONF_USERNAME)
293
294 assert key
295 assert key != SECURE_STRING_SUBSTITUTE
296 assert secret
297 assert secret != SECURE_STRING_SUBSTITUTE
298
299 if not key or not secret:
300 raise SetupFailedError("API Key and Secret need to be set")
301
302 provider: str = str(config.get(CONF_PROVIDER))
303
304 if TYPE_CHECKING:
305 key = cast("str", key)
306 secret = cast("str", secret)
307 session_key = cast("str", session_key)
308 username = cast("str", username)
309
310 match provider.lower():
311 case "lastfm":
312 return pylast.LastFMNetwork(key, secret, username=username, session_key=session_key)
313 case "librefm":
314 return pylast.LibreFMNetwork(key, secret, username=username, session_key=session_key)
315 case _:
316 raise SetupFailedError(f"unknown provider {provider} configured")
317