/
/
/
1"""Plex musicprovider support for MusicAssistant."""
2
3from __future__ import annotations
4
5import asyncio
6import logging
7from asyncio import Task, TaskGroup
8from collections.abc import Awaitable
9from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar, cast
10
11import plexapi.exceptions
12import requests
13from music_assistant_models.config_entries import (
14 ConfigEntry,
15 ConfigValueOption,
16 ConfigValueType,
17 ProviderConfig,
18)
19from music_assistant_models.enums import (
20 ConfigEntryType,
21 ContentType,
22 ImageType,
23 MediaType,
24 ProviderFeature,
25 StreamType,
26)
27from music_assistant_models.errors import (
28 InvalidDataError,
29 LoginFailed,
30 MediaNotFoundError,
31 SetupFailedError,
32)
33from music_assistant_models.media_items import (
34 Album,
35 Artist,
36 AudioFormat,
37 ItemMapping,
38 MediaItem,
39 MediaItemImage,
40 Playlist,
41 ProviderMapping,
42 RecommendationFolder,
43 SearchResults,
44 Track,
45 UniqueList,
46)
47from music_assistant_models.streamdetails import StreamDetails
48from plexapi.audio import Album as PlexAlbum
49from plexapi.audio import Artist as PlexArtist
50from plexapi.audio import Playlist as PlexPlaylist
51from plexapi.audio import Track as PlexTrack
52from plexapi.base import PlexObject
53from plexapi.myplex import MyPlexAccount, MyPlexPinLogin
54from plexapi.server import PlexServer
55
56from music_assistant.constants import UNKNOWN_ARTIST
57from music_assistant.controllers.cache import use_cache
58from music_assistant.helpers.auth import AuthenticationHelper
59from music_assistant.helpers.tags import async_parse_tags
60from music_assistant.helpers.util import parse_title_and_version
61from music_assistant.models.music_provider import MusicProvider
62from music_assistant.providers.plex.helpers import discover_local_servers, get_libraries
63
64if TYPE_CHECKING:
65 from collections.abc import AsyncGenerator, Callable, Coroutine
66
67 from music_assistant_models.provider import ProviderManifest
68 from plexapi.library import LibraryMediaTag as PlexCollection
69 from plexapi.library import MusicSection as PlexMusicSection
70 from plexapi.media import AudioStream as PlexAudioStream
71 from plexapi.media import Media as PlexMedia
72 from plexapi.media import MediaPart as PlexMediaPart
73
74 from music_assistant.mass import MusicAssistant
75 from music_assistant.models import ProviderInstanceType
76
77CONF_ACTION_AUTH_MYPLEX = "auth_myplex"
78CONF_ACTION_AUTH_LOCAL = "auth_local"
79CONF_ACTION_CLEAR_AUTH = "auth"
80CONF_ACTION_LIBRARY = "library"
81CONF_ACTION_GDM = "gdm"
82
83CONF_AUTH_TOKEN = "token"
84CONF_LIBRARY_ID = "library_id"
85CONF_LOCAL_SERVER_IP = "local_server_ip"
86CONF_LOCAL_SERVER_PORT = "local_server_port"
87CONF_LOCAL_SERVER_SSL = "local_server_ssl"
88CONF_LOCAL_SERVER_VERIFY_CERT = "local_server_verify_cert"
89CONF_IMPORT_COLLECTIONS = "import_collections"
90CONF_COLLECTION_PREFIX = "collection_prefix"
91CONF_PLEX_LIKE_RATING = "plex_like_rating"
92CONF_PLEX_FAVORITE_THRESHOLD = "plex_favorite_threshold"
93CONF_PLEX_UNLIKE_RATING = "plex_unlike_rating"
94CONF_HUB_ITEMS_LIMIT = "hub_items_limit"
95
96FAKE_ARTIST_PREFIX = "_fake://"
97
98AUTH_TOKEN_UNAUTH = "local_auth"
99
100SUPPORTED_FEATURES = {
101 ProviderFeature.LIBRARY_ARTISTS,
102 ProviderFeature.LIBRARY_ALBUMS,
103 ProviderFeature.LIBRARY_TRACKS,
104 ProviderFeature.LIBRARY_PLAYLISTS,
105 ProviderFeature.FAVORITE_ALBUMS_EDIT,
106 ProviderFeature.FAVORITE_TRACKS_EDIT,
107 ProviderFeature.BROWSE,
108 ProviderFeature.SEARCH,
109 ProviderFeature.ARTIST_ALBUMS,
110 ProviderFeature.ARTIST_TOPTRACKS,
111 ProviderFeature.SIMILAR_TRACKS,
112 ProviderFeature.RECOMMENDATIONS,
113}
114
115
116async def setup(
117 mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
118) -> ProviderInstanceType:
119 """Initialize provider(instance) with given configuration."""
120 if not config.get_value(CONF_AUTH_TOKEN):
121 msg = "Invalid login credentials"
122 raise LoginFailed(msg)
123
124 return PlexProvider(mass, manifest, config, SUPPORTED_FEATURES)
125
126
127async def get_config_entries( # noqa: PLR0915
128 mass: MusicAssistant,
129 instance_id: str | None = None,
130 action: str | None = None,
131 values: dict[str, ConfigValueType] | None = None,
132) -> tuple[ConfigEntry, ...]:
133 """
134 Return Config entries to setup this provider.
135
136 instance_id: id of an existing provider instance (None if new instance setup).
137 action: [optional] action key called from config entries UI.
138 values: the (intermediate) raw values for config entries sent with the action.
139 """
140 # handle action GDM discovery
141 if action == CONF_ACTION_GDM:
142 server_details = await discover_local_servers()
143 if server_details and server_details[0] and server_details[1]:
144 assert values
145 values[CONF_LOCAL_SERVER_IP] = server_details[0]
146 values[CONF_LOCAL_SERVER_PORT] = server_details[1]
147 values[CONF_LOCAL_SERVER_SSL] = False
148 values[CONF_LOCAL_SERVER_VERIFY_CERT] = False
149 else:
150 assert values
151 values[CONF_LOCAL_SERVER_IP] = "Discovery failed, please add IP manually"
152 values[CONF_LOCAL_SERVER_PORT] = 32400
153 values[CONF_LOCAL_SERVER_SSL] = False
154 values[CONF_LOCAL_SERVER_VERIFY_CERT] = True
155
156 # handle action clear authentication
157 if action == CONF_ACTION_CLEAR_AUTH:
158 assert values
159 values[CONF_AUTH_TOKEN] = None
160 values[CONF_LOCAL_SERVER_IP] = None
161 values[CONF_LOCAL_SERVER_PORT] = 32400
162 values[CONF_LOCAL_SERVER_SSL] = False
163 values[CONF_LOCAL_SERVER_VERIFY_CERT] = True
164
165 # handle action MyPlex auth
166 if action == CONF_ACTION_AUTH_MYPLEX:
167 assert values
168 values[CONF_AUTH_TOKEN] = None
169 async with AuthenticationHelper(mass, str(values["session_id"])) as auth_helper:
170 plex_auth = MyPlexPinLogin(headers={"X-Plex-Product": "Music Assistant"}, oauth=True)
171 # Generate the PIN/code by calling the Plex API
172 await asyncio.to_thread(plex_auth._getCode)
173 auth_url = plex_auth.oauthUrl(auth_helper.callback_url)
174 await auth_helper.authenticate(auth_url)
175 # After OAuth callback completes, Plex's backend needs time to propagate the token
176 # Use exponential backoff to check if token is ready
177 for attempt in range(10): # Max 10 attempts (~10 seconds total)
178 if await asyncio.to_thread(plex_auth.checkLogin):
179 break
180 # Exponential backoff: 0.1s, 0.2s, 0.4s, 0.8s, 1.6s, etc
181 await asyncio.sleep(0.1 * (2**attempt))
182 else:
183 # token still not available
184 msg = "Authentication to MyPlex failed: token not received"
185 raise LoginFailed(msg)
186 if not plex_auth.token:
187 msg = "Authentication to MyPlex failed"
188 raise LoginFailed(msg)
189 # set the retrieved token on the values object to pass along
190 values[CONF_AUTH_TOKEN] = plex_auth.token
191
192 # handle action Local auth (no MyPlex)
193 if action == CONF_ACTION_AUTH_LOCAL:
194 assert values
195 values[CONF_AUTH_TOKEN] = AUTH_TOKEN_UNAUTH
196
197 # collect all config entries to show
198 entries: list[ConfigEntry] = []
199
200 # show GDM discovery (if we do not yet have any server details)
201 if values is None or not values.get(CONF_LOCAL_SERVER_IP):
202 entries.append(
203 ConfigEntry(
204 key=CONF_ACTION_GDM,
205 type=ConfigEntryType.ACTION,
206 label="Use Plex GDM to discover local servers",
207 description='Enable "GDM" to discover local Plex servers automatically.',
208 action=CONF_ACTION_GDM,
209 action_label="Use Plex GDM to discover local servers",
210 )
211 )
212
213 # server details config entries (IP, port etc.)
214 entries += [
215 ConfigEntry(
216 key=CONF_LOCAL_SERVER_IP,
217 type=ConfigEntryType.STRING,
218 label="Local server IP",
219 description="The local server IP (e.g. 192.168.1.77)",
220 required=True,
221 value=cast("str", values.get(CONF_LOCAL_SERVER_IP)) if values else None,
222 ),
223 ConfigEntry(
224 key=CONF_LOCAL_SERVER_PORT,
225 type=ConfigEntryType.INTEGER,
226 label="Local server port",
227 description="The local server port (e.g. 32400)",
228 required=True,
229 default_value=32400,
230 value=cast("int", values.get(CONF_LOCAL_SERVER_PORT)) if values else None,
231 ),
232 ConfigEntry(
233 key=CONF_LOCAL_SERVER_SSL,
234 type=ConfigEntryType.BOOLEAN,
235 label="SSL (HTTPS)",
236 description="Connect to the local server using SSL (HTTPS)",
237 required=True,
238 default_value=False,
239 ),
240 ConfigEntry(
241 key=CONF_LOCAL_SERVER_VERIFY_CERT,
242 type=ConfigEntryType.BOOLEAN,
243 label="Verify certificate",
244 description="Verify local server SSL certificate",
245 required=True,
246 default_value=True,
247 depends_on=CONF_LOCAL_SERVER_SSL,
248 advanced=True,
249 ),
250 ConfigEntry(
251 key=CONF_AUTH_TOKEN,
252 type=ConfigEntryType.SECURE_STRING,
253 label=CONF_AUTH_TOKEN,
254 action=CONF_AUTH_TOKEN,
255 value=cast("str | None", values.get(CONF_AUTH_TOKEN)) if values else None,
256 hidden=True,
257 ),
258 ]
259
260 # config flow auth action/step to pick the library to use
261 # because this call is very slow, we only show/calculate the dropdown if we do
262 # not yet have this info or we/user invalidated it.
263 if values and values.get(CONF_AUTH_TOKEN):
264 conf_libraries = ConfigEntry(
265 key=CONF_LIBRARY_ID,
266 type=ConfigEntryType.STRING,
267 label="Library",
268 required=True,
269 description="The library to connect to (e.g. Music)",
270 depends_on=CONF_AUTH_TOKEN,
271 action=CONF_ACTION_LIBRARY,
272 action_label="Select Plex Music Library",
273 )
274 if action in (
275 CONF_ACTION_LIBRARY,
276 CONF_ACTION_AUTH_MYPLEX,
277 CONF_ACTION_AUTH_LOCAL,
278 ):
279 token = mass.config.decrypt_string(str(values.get(CONF_AUTH_TOKEN)))
280 server_http_ip = str(values.get(CONF_LOCAL_SERVER_IP))
281 server_http_port = str(values.get(CONF_LOCAL_SERVER_PORT))
282 server_http_ssl = bool(values.get(CONF_LOCAL_SERVER_SSL))
283 server_http_verify_cert = bool(values.get(CONF_LOCAL_SERVER_VERIFY_CERT))
284 if not (
285 libraries := await get_libraries(
286 mass,
287 token,
288 server_http_ssl,
289 server_http_ip,
290 server_http_port,
291 server_http_verify_cert,
292 instance_id,
293 )
294 ):
295 msg = "Unable to retrieve Servers and/or Music Libraries"
296 raise LoginFailed(msg)
297 conf_libraries.options = [
298 # use the same value for both the value and the title
299 # until we find out what plex uses as stable identifiers
300 ConfigValueOption(
301 title=x,
302 value=x,
303 )
304 for x in libraries
305 ]
306 # select first library as (default) value
307 conf_libraries.default_value = libraries[0]
308 conf_libraries.value = libraries[0]
309 entries.append(conf_libraries)
310
311 # show authentication options
312 if values is None or not values.get(CONF_AUTH_TOKEN):
313 entries.append(
314 ConfigEntry(
315 key=CONF_ACTION_AUTH_MYPLEX,
316 type=ConfigEntryType.ACTION,
317 label="Authenticate with MyPlex",
318 description="Authenticate with MyPlex to access your library.",
319 action=CONF_ACTION_AUTH_MYPLEX,
320 action_label="Authenticate with MyPlex",
321 )
322 )
323 entries.append(
324 ConfigEntry(
325 key=CONF_ACTION_AUTH_LOCAL,
326 type=ConfigEntryType.ACTION,
327 label="Authenticate locally",
328 description="Authenticate locally to access your library.",
329 action=CONF_ACTION_AUTH_LOCAL,
330 action_label="Authenticate locally",
331 )
332 )
333 else:
334 entries.append(
335 ConfigEntry(
336 key=CONF_ACTION_CLEAR_AUTH,
337 type=ConfigEntryType.ACTION,
338 label="Clear authentication",
339 description="Clear the current authentication details.",
340 action=CONF_ACTION_CLEAR_AUTH,
341 action_label="Clear authentication",
342 required=False,
343 )
344 )
345
346 # Collection import options (advanced settings)
347 entries.append(
348 ConfigEntry(
349 key=CONF_IMPORT_COLLECTIONS,
350 type=ConfigEntryType.BOOLEAN,
351 label="Import Collections",
352 description="Import collections (tracks, albums, or artists) as playlists",
353 default_value=False,
354 advanced=True,
355 )
356 )
357 entries.append(
358 ConfigEntry(
359 key=CONF_COLLECTION_PREFIX,
360 type=ConfigEntryType.STRING,
361 label="Collection Prefix",
362 description="Prefix to add to collection names when imported as playlists",
363 default_value="Collection: ",
364 depends_on=CONF_IMPORT_COLLECTIONS,
365 advanced=True,
366 )
367 )
368
369 # rating/favorite sync configuration
370 entries.append(
371 ConfigEntry(
372 key=CONF_PLEX_LIKE_RATING,
373 type=ConfigEntryType.FLOAT,
374 label="Plex rating when liking in Music Assistant",
375 description="When you like a track or album in Music Assistant, "
376 "set this rating value in Plex (0.0 = unrated, 10.0 = 5 stars).",
377 default_value=10.0,
378 range=(0, 10),
379 category="sync_options",
380 )
381 )
382 entries.append(
383 ConfigEntry(
384 key=CONF_PLEX_FAVORITE_THRESHOLD,
385 type=ConfigEntryType.FLOAT,
386 label="Minimum Plex rating to import as favorite",
387 description="Tracks and albums with a Plex rating at or above this threshold "
388 "will be imported as favorites in Music Assistant (0.0 = unrated, 10.0 = 5 stars).",
389 default_value=10.0,
390 range=(0, 10),
391 category="sync_options",
392 )
393 )
394 entries.append(
395 ConfigEntry(
396 key=CONF_PLEX_UNLIKE_RATING,
397 type=ConfigEntryType.FLOAT,
398 label="Plex rating when unliking in Music Assistant",
399 description="When you unlike a track or album in Music Assistant, "
400 "set this rating value in Plex (0.0 = unrated/clear rating, 10.0 = 5 stars).",
401 default_value=0.0,
402 range=(0, 10),
403 category="sync_options",
404 )
405 )
406
407 # Recommendation settings (advanced)
408 entries.append(
409 ConfigEntry(
410 key=CONF_HUB_ITEMS_LIMIT,
411 type=ConfigEntryType.INTEGER,
412 label="Items per hub",
413 description="Maximum number of items to load from each hub (default: 10)",
414 default_value=10,
415 advanced=True,
416 range=(1, 100),
417 )
418 )
419
420 # return all config entries
421 return tuple(entries)
422
423
424Param = ParamSpec("Param")
425RetType = TypeVar("RetType")
426PlexObjectT = TypeVar("PlexObjectT", bound=PlexObject)
427MediaItemT = TypeVar("MediaItemT", bound=MediaItem)
428
429
430class PlexProvider(MusicProvider):
431 """Provider for a plex music library."""
432
433 _plex_server: PlexServer = None
434 _plex_library: PlexMusicSection = None
435 _myplex_account: MyPlexAccount = None
436 _baseurl: str
437
438 async def handle_async_init(self) -> None:
439 """Set up the music provider by connecting to the server."""
440 # silence loggers
441 logging.getLogger("plexapi").setLevel(self.logger.level + 10)
442 # silence urllib3 InsecureRequestWarning when certificate verification is disabled
443 # this is expected when connecting to Plex servers using their wildcard certificates
444 # that don't validate against LAN IP addresses
445 logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR)
446 _, library_name = str(self.config.get_value(CONF_LIBRARY_ID)).split(" / ", 1)
447
448 def connect() -> PlexServer:
449 try:
450 session = requests.Session()
451 session.verify = (
452 self.config.get_value(CONF_LOCAL_SERVER_VERIFY_CERT)
453 if self.config.get_value(CONF_LOCAL_SERVER_SSL)
454 else False
455 )
456 # Add Music Assistant client identification headers
457 session.headers.update(
458 {
459 "X-Plex-Client-Identifier": self.instance_id,
460 "X-Plex-Product": "Music Assistant",
461 "X-Plex-Platform": "Music Assistant",
462 "X-Plex-Version": self.mass.version,
463 }
464 )
465 local_server_protocol = (
466 "https" if self.config.get_value(CONF_LOCAL_SERVER_SSL) else "http"
467 )
468 token = self.config.get_value(CONF_AUTH_TOKEN)
469 plex_url = (
470 f"{local_server_protocol}://{self.config.get_value(CONF_LOCAL_SERVER_IP)}"
471 f":{self.config.get_value(CONF_LOCAL_SERVER_PORT)}"
472 )
473 if token == AUTH_TOKEN_UNAUTH:
474 # Doing local connection, not via plex.tv.
475 plex_server = PlexServer(plex_url, session=session)
476 else:
477 plex_server = PlexServer(
478 plex_url,
479 token,
480 session=session,
481 )
482 # I don't think PlexAPI intends for this to be accessible, but we need it.
483 self._baseurl = plex_server._baseurl
484
485 except plexapi.exceptions.BadRequest as err:
486 if "Invalid token" in str(err):
487 # token invalid, invalidate the config
488 self.mass.create_task(
489 self.mass.config.remove_provider_config_value(
490 self.instance_id, CONF_AUTH_TOKEN
491 ),
492 )
493 msg = "Authentication failed"
494 raise LoginFailed(msg)
495 raise LoginFailed from err
496 return plex_server
497
498 self._myplex_account = await self.get_myplex_account_and_refresh_token(
499 str(self.config.get_value(CONF_AUTH_TOKEN))
500 )
501 try:
502 self._plex_server = await self._run_async(connect)
503 self._plex_library = await self._run_async(
504 self._plex_server.library.section, library_name
505 )
506 except requests.exceptions.ConnectionError as err:
507 raise SetupFailedError from err
508
509 @property
510 def is_streaming_provider(self) -> bool:
511 """
512 Return True if the provider is a streaming provider.
513
514 This literally means that the catalog is not the same as the library contents.
515 For local based providers (files, plex), the catalog is the same as the library content.
516 It also means that data is if this provider is NOT a streaming provider,
517 data cross instances is unique, the catalog and library differs per instance.
518
519 Setting this to True will only query one instance of the provider for search and lookups.
520 Setting this to False will query all instances of this provider for search and lookups.
521 """
522 return False
523
524 async def resolve_image(self, path: str) -> str | bytes:
525 """Return the full image URL including the auth token."""
526 return str(self._plex_server.url(path, True))
527
528 async def _run_async(
529 self, call: Callable[Param, RetType], *args: Param.args, **kwargs: Param.kwargs
530 ) -> RetType:
531 await self.get_myplex_account_and_refresh_token(str(self.config.get_value(CONF_AUTH_TOKEN)))
532 return await asyncio.to_thread(call, *args, **kwargs)
533
534 async def _get_data(self, key: str, cls: type[PlexObjectT]) -> PlexObjectT:
535 results = await self._run_async(self._plex_library.fetchItem, key, cls)
536 return cast("PlexObjectT", results)
537
538 def _get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping:
539 """Get item mapping for a given media type, key, and name."""
540 if not name:
541 self.logger.info(
542 "Received None or empty name for media item. Media type: %s, Key: %s",
543 media_type,
544 key,
545 )
546 name = "[Unknown]"
547
548 mapped_name, mapped_version = parse_title_and_version(name)
549
550 if not mapped_name:
551 self.logger.info(
552 "Failed to map name for media item. Media type: %s, Key: %s, Original name: %s",
553 media_type,
554 key,
555 name,
556 )
557 mapped_name = "[Unknown]"
558 if not mapped_version and media_type not in (MediaType.ALBUM, MediaType.TRACK):
559 mapped_version = ""
560
561 return ItemMapping(
562 media_type=media_type,
563 item_id=key,
564 provider=self.instance_id,
565 name=mapped_name,
566 version=mapped_version,
567 )
568
569 async def _get_or_create_artist_by_name(self, artist_name: str) -> Artist | ItemMapping:
570 if library_items := await self.mass.music.artists.get_library_items_by_query(
571 search=artist_name, provider_filter=[self.instance_id]
572 ):
573 return ItemMapping.from_item(library_items[0])
574
575 artist_id = FAKE_ARTIST_PREFIX + artist_name
576 return Artist(
577 item_id=artist_id,
578 name=artist_name or UNKNOWN_ARTIST,
579 provider=self.instance_id,
580 provider_mappings={
581 ProviderMapping(
582 item_id=str(artist_id),
583 provider_domain=self.domain,
584 provider_instance=self.instance_id,
585 )
586 },
587 )
588
589 async def _parse(self, plex_media: PlexObject) -> MediaItem | None:
590 if plex_media.type == "artist":
591 return await self._parse_artist(plex_media)
592 if plex_media.type == "album":
593 return await self._parse_album(plex_media)
594 if plex_media.type == "track":
595 return await self._parse_track(plex_media)
596 if plex_media.type == "playlist":
597 return await self._parse_playlist(plex_media)
598 return None
599
600 async def _search_track(self, search_query: str | None, limit: int) -> list[PlexTrack]:
601 return cast(
602 "list[PlexTrack]",
603 await self._run_async(self._plex_library.searchTracks, title=search_query, limit=limit),
604 )
605
606 async def _search_album(self, search_query: str, limit: int) -> list[PlexAlbum]:
607 return cast(
608 "list[PlexAlbum]",
609 await self._run_async(self._plex_library.searchAlbums, title=search_query, limit=limit),
610 )
611
612 async def _search_artist(self, search_query: str, limit: int) -> list[PlexArtist]:
613 return cast(
614 "list[PlexArtist]",
615 await self._run_async(
616 self._plex_library.searchArtists, title=search_query, limit=limit
617 ),
618 )
619
620 async def _search_playlist(self, search_query: str, limit: int) -> list[PlexPlaylist]:
621 return cast(
622 "list[PlexPlaylist]",
623 await self._run_async(self._plex_library.playlists, title=search_query, limit=limit),
624 )
625
626 async def _search_track_advanced(self, limit: int, **kwargs: Any) -> list[PlexTrack]:
627 return cast(
628 "list[PlexPlaylist]",
629 await self._run_async(self._plex_library.searchTracks, filters=kwargs, limit=limit),
630 )
631
632 async def _search_album_advanced(self, limit: int, **kwargs: Any) -> list[PlexAlbum]:
633 return cast(
634 "list[PlexPlaylist]",
635 await self._run_async(self._plex_library.searchAlbums, filters=kwargs, limit=limit),
636 )
637
638 async def _search_artist_advanced(self, limit: int, **kwargs: Any) -> list[PlexArtist]:
639 return cast(
640 "list[PlexPlaylist]",
641 await self._run_async(self._plex_library.searchArtists, filters=kwargs, limit=limit),
642 )
643
644 async def _search_playlist_advanced(self, limit: int, **kwargs: Any) -> list[PlexPlaylist]:
645 return cast(
646 "list[PlexPlaylist]",
647 await self._run_async(self._plex_library.playlists, filters=kwargs, limit=limit),
648 )
649
650 async def _search_and_parse(
651 self,
652 search_coro: Awaitable[list[PlexObjectT]],
653 parse_coro: Callable[[PlexObjectT], Coroutine[Any, Any, MediaItemT]],
654 ) -> list[MediaItemT]:
655 task_results: list[Task[MediaItemT]] = []
656 async with TaskGroup() as tg:
657 for item in await search_coro:
658 task_results.append(tg.create_task(parse_coro(item)))
659
660 results: list[MediaItemT] = []
661 for task in task_results:
662 results.append(task.result())
663
664 return results
665
666 async def _parse_album(self, plex_album: PlexAlbum) -> Album:
667 """Parse a Plex Album response to an Album model object."""
668 album_id = plex_album.key
669 album = Album(
670 item_id=album_id,
671 provider=self.instance_id,
672 name=plex_album.title or "[Unknown]",
673 provider_mappings={
674 ProviderMapping(
675 item_id=str(album_id),
676 provider_domain=self.domain,
677 provider_instance=self.instance_id,
678 url=plex_album.getWebURL(self._baseurl),
679 )
680 },
681 )
682 # Check if album rating meets the configured threshold for favorites
683 favorite_threshold = cast("float", self.config.get_value(CONF_PLEX_FAVORITE_THRESHOLD))
684 # Try to get the user rating - Plex stores ratings as 0.0-10.0
685 if hasattr(plex_album, "userRating") and plex_album.userRating is not None:
686 album.favorite = float(plex_album.userRating) >= favorite_threshold
687
688 if plex_album.year:
689 album.year = plex_album.year
690 if thumb := plex_album.firstAttr("thumb", "parentThumb", "grandparentThumb"):
691 album.metadata.images = UniqueList(
692 [
693 MediaItemImage(
694 type=ImageType.THUMB,
695 path=thumb,
696 provider=self.instance_id,
697 remotely_accessible=False,
698 )
699 ]
700 )
701 if plex_album.summary:
702 album.metadata.description = plex_album.summary
703
704 album.artists.append(
705 self._get_item_mapping(
706 MediaType.ARTIST,
707 plex_album.parentKey,
708 plex_album.parentTitle or UNKNOWN_ARTIST,
709 )
710 )
711 return album
712
713 async def _parse_artist(self, plex_artist: PlexArtist) -> Artist:
714 """Parse a Plex Artist response to Artist model object."""
715 artist_id = plex_artist.key
716 if not artist_id:
717 msg = "Artist does not have a valid ID"
718 raise InvalidDataError(msg)
719 artist = Artist(
720 item_id=artist_id,
721 name=plex_artist.title or UNKNOWN_ARTIST,
722 provider=self.instance_id,
723 provider_mappings={
724 ProviderMapping(
725 item_id=str(artist_id),
726 provider_domain=self.domain,
727 provider_instance=self.instance_id,
728 url=plex_artist.getWebURL(self._baseurl),
729 )
730 },
731 )
732 if plex_artist.summary:
733 artist.metadata.description = plex_artist.summary
734 if thumb := plex_artist.firstAttr("thumb", "parentThumb", "grandparentThumb"):
735 artist.metadata.images = UniqueList(
736 [
737 MediaItemImage(
738 type=ImageType.THUMB,
739 path=thumb,
740 provider=self.instance_id,
741 remotely_accessible=False,
742 )
743 ]
744 )
745 return artist
746
747 async def _parse_playlist(self, plex_playlist: PlexPlaylist) -> Playlist:
748 """Parse a Plex Playlist response to a Playlist object."""
749 playlist = Playlist(
750 item_id=plex_playlist.key,
751 provider=self.instance_id,
752 name=plex_playlist.title or "[Unknown]",
753 provider_mappings={
754 ProviderMapping(
755 item_id=plex_playlist.key,
756 provider_domain=self.domain,
757 provider_instance=self.instance_id,
758 url=plex_playlist.getWebURL(self._baseurl),
759 )
760 },
761 )
762 if plex_playlist.summary:
763 playlist.metadata.description = plex_playlist.summary
764 if thumb := plex_playlist.firstAttr("thumb", "parentThumb", "grandparentThumb"):
765 playlist.metadata.images = UniqueList(
766 [
767 MediaItemImage(
768 type=ImageType.THUMB,
769 path=thumb,
770 provider=self.instance_id,
771 remotely_accessible=False,
772 )
773 ]
774 )
775 playlist.is_editable = not plex_playlist.smart
776 return playlist
777
778 async def _parse_collection(self, plex_collection: PlexCollection) -> Playlist:
779 """Parse a Plex Collection response to a Playlist object."""
780 # Get the configured collection prefix
781 collection_prefix = str(self.config.get_value(CONF_COLLECTION_PREFIX) or "")
782
783 # Collections are imported as playlists with the configured prefix
784 playlist = Playlist(
785 item_id=f"collection:{plex_collection.key}",
786 provider=self.instance_id,
787 name=f"{collection_prefix}{plex_collection.title}",
788 provider_mappings={
789 ProviderMapping(
790 item_id=f"collection:{plex_collection.key}",
791 provider_domain=self.domain,
792 provider_instance=self.instance_id,
793 )
794 },
795 )
796 # Add collection poster/thumbnail if available
797 if thumb := plex_collection.firstAttr("thumb", "composite"):
798 playlist.metadata.images = UniqueList(
799 [
800 MediaItemImage(
801 type=ImageType.THUMB,
802 path=thumb,
803 provider=self.instance_id,
804 remotely_accessible=False,
805 )
806 ]
807 )
808 # Collections are not editable in Music Assistant
809 playlist.is_editable = False
810 return playlist
811
812 async def _parse_track(self, plex_track: PlexTrack) -> Track:
813 """Parse a Plex Track response to a Track model object."""
814 if plex_track.media:
815 available = True
816 content = plex_track.media[0].container
817 else:
818 # For Plex (local library provider), assume tracks are available by default
819 # even if media attribute is not populated in the initial response.
820 # This prevents tracks from being skipped during library sync.
821 available = True
822 content = None
823 track = Track(
824 item_id=plex_track.key,
825 provider=self.instance_id,
826 name=plex_track.title or "[Unknown]",
827 provider_mappings={
828 ProviderMapping(
829 item_id=plex_track.key,
830 provider_domain=self.domain,
831 provider_instance=self.instance_id,
832 available=available,
833 audio_format=AudioFormat(
834 content_type=(
835 ContentType.try_parse(content) if content else ContentType.UNKNOWN
836 ),
837 ),
838 url=plex_track.getWebURL(self._baseurl),
839 )
840 },
841 disc_number=plex_track.parentIndex or 0,
842 track_number=plex_track.trackNumber or 0,
843 )
844 # Check if track rating meets the configured threshold for favorites
845 favorite_threshold = cast("float", self.config.get_value(CONF_PLEX_FAVORITE_THRESHOLD))
846 # Try to get the user rating - Plex stores ratings as 0.0-10.0
847 if hasattr(plex_track, "userRating") and plex_track.userRating is not None:
848 track.favorite = float(plex_track.userRating) >= favorite_threshold
849
850 if plex_track.originalTitle and plex_track.originalTitle != plex_track.grandparentTitle:
851 # The artist of the track if different from the album's artist.
852 # For this kind of artist, we just know the name, so we create a fake artist,
853 # if it does not already exist.
854 track.artists.append(
855 await self._get_or_create_artist_by_name(plex_track.originalTitle or UNKNOWN_ARTIST)
856 )
857 elif plex_track.grandparentKey:
858 track.artists.append(
859 self._get_item_mapping(
860 MediaType.ARTIST,
861 plex_track.grandparentKey,
862 plex_track.grandparentTitle or UNKNOWN_ARTIST,
863 )
864 )
865 else:
866 msg = "No artist was found for track"
867 raise InvalidDataError(msg)
868
869 if thumb := plex_track.firstAttr("thumb", "parentThumb", "grandparentThumb"):
870 track.metadata.images = UniqueList(
871 [
872 MediaItemImage(
873 type=ImageType.THUMB,
874 path=thumb,
875 provider=self.instance_id,
876 remotely_accessible=False,
877 )
878 ]
879 )
880 if plex_track.parentKey:
881 track.album = self._get_item_mapping(
882 MediaType.ALBUM, plex_track.parentKey, plex_track.parentTitle
883 )
884 if plex_track.duration:
885 track.duration = int(plex_track.duration / 1000)
886 if plex_track.chapters:
887 pass # TODO!
888
889 return track
890
891 @use_cache(3600) # Cache for 1 hour
892 async def search(
893 self,
894 search_query: str,
895 media_types: list[MediaType],
896 limit: int = 20,
897 ) -> SearchResults:
898 """Perform search on the plex library.
899
900 :param search_query: Search query.
901 :param media_types: A list of media_types to include.
902 :param limit: Number of items to return in the search (per type).
903 """
904 artists = None
905 albums = None
906 tracks = None
907 playlists = None
908
909 async with TaskGroup() as tg:
910 if MediaType.ARTIST in media_types:
911 artists = tg.create_task(
912 self._search_and_parse(
913 self._search_artist(search_query, limit), self._parse_artist
914 )
915 )
916
917 if MediaType.ALBUM in media_types:
918 albums = tg.create_task(
919 self._search_and_parse(
920 self._search_album(search_query, limit), self._parse_album
921 )
922 )
923
924 if MediaType.TRACK in media_types:
925 tracks = tg.create_task(
926 self._search_and_parse(
927 self._search_track(search_query, limit), self._parse_track
928 )
929 )
930
931 if MediaType.PLAYLIST in media_types:
932 playlists = tg.create_task(
933 self._search_and_parse(
934 self._search_playlist(search_query, limit),
935 self._parse_playlist,
936 )
937 )
938
939 search_results = SearchResults()
940
941 if artists:
942 search_results.artists = artists.result()
943
944 if albums:
945 search_results.albums = albums.result()
946
947 if tracks:
948 search_results.tracks = tracks.result()
949
950 if playlists:
951 search_results.playlists = playlists.result()
952
953 return search_results
954
955 async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
956 """Retrieve all library artists from Plex Music."""
957 artists_obj = await self._run_async(self._plex_library.all)
958 for artist in artists_obj:
959 yield await self._parse_artist(artist)
960
961 async def get_library_albums(self) -> AsyncGenerator[Album, None]:
962 """Retrieve all library albums from Plex Music."""
963 albums_obj = await self._run_async(self._plex_library.albums)
964 for album in albums_obj:
965 yield await self._parse_album(album)
966
967 async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
968 """Retrieve all library playlists from the provider."""
969 playlists_obj = await self._run_async(self._plex_library.playlists)
970 for playlist in playlists_obj:
971 yield await self._parse_playlist(playlist)
972
973 # Import collections as playlists if enabled
974 if self.config.get_value(CONF_IMPORT_COLLECTIONS):
975 collections_obj = await self._run_async(self._plex_library.collections)
976 for collection in collections_obj:
977 yield await self._parse_collection(collection)
978
979 async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
980 """Retrieve library tracks from Plex Music."""
981 page_size = 500
982 offset = 0
983 while True:
984 batch = cast(
985 "list[PlexTrack]",
986 await self._run_async(
987 self._plex_library.searchTracks,
988 title=None,
989 container_size=page_size,
990 container_start=offset,
991 ),
992 )
993 if not batch:
994 break
995 for plex_track in batch:
996 yield await self._parse_track(plex_track)
997 offset += page_size
998
999 @use_cache(3600 * 3) # Cache for 3 hours
1000 async def get_album(self, prov_album_id: str) -> Album:
1001 """Get full album details by id."""
1002 if plex_album := await self._get_data(prov_album_id, PlexAlbum):
1003 return await self._parse_album(plex_album)
1004 msg = f"Item {prov_album_id} not found"
1005 raise MediaNotFoundError(msg)
1006
1007 @use_cache(3600 * 3) # Cache for 3 hours
1008 async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
1009 """Get album tracks for given album id."""
1010 plex_album: PlexAlbum = await self._get_data(prov_album_id, PlexAlbum)
1011 tracks = []
1012 for plex_track in await self._run_async(plex_album.tracks):
1013 track = await self._parse_track(
1014 plex_track,
1015 )
1016 tracks.append(track)
1017 return tracks
1018
1019 @use_cache(3600 * 3) # Cache for 3 hours
1020 async def get_artist(self, prov_artist_id: str) -> Artist:
1021 """Get full artist details by id."""
1022 if prov_artist_id.startswith(FAKE_ARTIST_PREFIX):
1023 # This artist does not exist in plex, so we can just load it from DB.
1024
1025 if db_artist := await self.mass.music.artists.get_library_item_by_prov_id(
1026 prov_artist_id, self.instance_id
1027 ):
1028 return db_artist
1029 msg = f"Artist not found: {prov_artist_id}"
1030 raise MediaNotFoundError(msg)
1031
1032 if plex_artist := await self._get_data(prov_artist_id, PlexArtist):
1033 return await self._parse_artist(plex_artist)
1034 msg = f"Item {prov_artist_id} not found"
1035 raise MediaNotFoundError(msg)
1036
1037 @use_cache(3600 * 3) # Cache for 3 hours
1038 async def get_track(self, prov_track_id: str) -> Track:
1039 """Get full track details by id."""
1040 if plex_track := await self._get_data(prov_track_id, PlexTrack):
1041 return await self._parse_track(plex_track)
1042 msg = f"Item {prov_track_id} not found"
1043 raise MediaNotFoundError(msg)
1044
1045 @use_cache(3600 * 3) # Cache for 3 hours
1046 async def get_playlist(self, prov_playlist_id: str) -> Playlist:
1047 """Get full playlist details by id."""
1048 # Check if this is a collection (collections have the format "collection:<key>")
1049 if prov_playlist_id.startswith("collection:"):
1050 # Extract the collection key
1051 collection_key = prov_playlist_id.replace("collection:", "")
1052 # Fetch the collection
1053 if plex_collection := await self._run_async(
1054 self._plex_library.fetchItem, collection_key
1055 ):
1056 return await self._parse_collection(plex_collection)
1057 msg = f"Collection {prov_playlist_id} not found"
1058 raise MediaNotFoundError(msg)
1059
1060 if plex_playlist := await self._get_data(prov_playlist_id, PlexPlaylist):
1061 return await self._parse_playlist(plex_playlist)
1062 msg = f"Item {prov_playlist_id} not found"
1063 raise MediaNotFoundError(msg)
1064
1065 @use_cache(3600 * 3) # Cache for 3 hours
1066 async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
1067 """Get playlist tracks."""
1068 result: list[Track] = []
1069 if page > 0:
1070 # paging not supported, we always return the whole list at once
1071 return []
1072
1073 # Check if this is a collection (collections have the format "collection:<key>")
1074 if prov_playlist_id.startswith("collection:"):
1075 # Extract the collection key
1076 collection_key = prov_playlist_id.replace("collection:", "")
1077 # Fetch the collection
1078 plex_collection = await self._run_async(self._plex_library.fetchItem, collection_key)
1079 if not plex_collection:
1080 msg = f"Collection {prov_playlist_id} not found"
1081 raise MediaNotFoundError(msg)
1082 if not (collection_items := await self._run_async(plex_collection.items)):
1083 return result
1084 # Collections can contain tracks, albums, or artists - we only want tracks
1085 for item in collection_items:
1086 if item.type == "track":
1087 if track := await self._parse_track(item):
1088 track.position = len(result) + 1
1089 result.append(track)
1090 elif item.type == "album":
1091 # If the collection contains albums, get all tracks from each album
1092 album_tracks = await self.get_album_tracks(item.key)
1093 for album_track in album_tracks:
1094 album_track.position = len(result) + 1
1095 result.append(album_track)
1096 return result
1097
1098 plex_playlist: PlexPlaylist = await self._get_data(prov_playlist_id, PlexPlaylist)
1099 if not (playlist_items := await self._run_async(plex_playlist.items)):
1100 return result
1101 for index, plex_track in enumerate(playlist_items, 1):
1102 if track := await self._parse_track(plex_track):
1103 track.position = index
1104 result.append(track)
1105 return result
1106
1107 @use_cache(3600 * 3) # Cache for 3 hours
1108 async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
1109 """Get a list of albums for the given artist."""
1110 if not prov_artist_id.startswith(FAKE_ARTIST_PREFIX):
1111 plex_artist = await self._get_data(prov_artist_id, PlexArtist)
1112 plex_albums = cast("list[PlexAlbum]", await self._run_async(plex_artist.albums))
1113 if plex_albums:
1114 albums = []
1115 for album_obj in plex_albums:
1116 albums.append(await self._parse_album(album_obj))
1117 return albums
1118 return []
1119
1120 @use_cache(3600 * 3) # Cache for 3 hours
1121 async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
1122 """Get top tracks for the given artist using Plex artist radio/station."""
1123 if prov_artist_id.startswith(FAKE_ARTIST_PREFIX):
1124 return []
1125
1126 try:
1127 plex_artist = await self._get_data(prov_artist_id, PlexArtist)
1128 # Get the artist radio station which contains top/popular tracks
1129 if station := await self._run_async(plex_artist.station):
1130 # Get tracks from the station
1131 station_tracks = await self._run_async(station.items)
1132 tracks = []
1133 for plex_track in station_tracks[:25]: # Limit to 25 top tracks
1134 if track := await self._parse_track(plex_track):
1135 tracks.append(track)
1136 self.logger.debug(
1137 "Retrieved %d top tracks for artist %s", len(tracks), prov_artist_id
1138 )
1139 return tracks
1140 self.logger.warning("No station available for artist %s", prov_artist_id)
1141 except Exception as err:
1142 self.logger.warning("Error getting top tracks for artist %s: %s", prov_artist_id, err)
1143 return []
1144
1145 @use_cache(3600 * 3) # Cache for 3 hours
1146 async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]:
1147 """Get similar tracks using Plex's sonicallySimilar feature."""
1148 try:
1149 plex_track = await self._get_data(prov_track_id, PlexTrack)
1150 # Get sonically similar tracks
1151 similar_tracks = await self._run_async(plex_track.sonicallySimilar, limit=limit)
1152 tracks = []
1153 for similar_track in similar_tracks:
1154 if track := await self._parse_track(similar_track):
1155 tracks.append(track)
1156 self.logger.debug(
1157 "Retrieved %d similar tracks for track %s", len(tracks), prov_track_id
1158 )
1159 return tracks
1160 except Exception as err:
1161 self.logger.warning("Error getting similar tracks for %s: %s", prov_track_id, err)
1162 return []
1163
1164 @use_cache(3600 * 3, cache_checksum="v2") # Cache for 3 hours
1165 async def recommendations(self) -> list[RecommendationFolder]:
1166 """Get recommendations from Plex hubs."""
1167 try:
1168 # Get the configured limit for items per hub
1169 limit_value = self.config.get_value(CONF_HUB_ITEMS_LIMIT)
1170 limit = int(limit_value) if isinstance(limit_value, (int, float, str)) else 10
1171
1172 # Fetch hubs from the music library section with count parameter
1173 # The section's hubs() method uses /hubs/sections/{key}?includeStations=1
1174 # We need to add the count parameter manually to limit items per hub
1175 key = f"/hubs/sections/{self._plex_library.key}?includeStations=1&count={limit}"
1176 hubs = await self._run_async(self._plex_library.fetchItems, key)
1177
1178 if not hubs:
1179 self.logger.debug("No hubs available from Plex")
1180 return []
1181
1182 self.logger.debug(
1183 "Fetching %d hubs (limit: %d items per hub)",
1184 len(hubs),
1185 limit,
1186 )
1187
1188 folders = []
1189 for hub in hubs:
1190 # Create a recommendation folder for each hub
1191 folder = RecommendationFolder(
1192 name=hub.title,
1193 item_id=f"{self.instance_id}_{hub.hubIdentifier}",
1194 provider=self.instance_id,
1195 icon="mdi-music",
1196 )
1197
1198 # Parse each item based on its type (limit to configured max)
1199 # Use _partialItems to respect the count limit from the hubs() call
1200 # rather than hub.items() which fetches ALL items if more is True
1201 # _partialItems is a cached property that's already loaded, so no need for async
1202 hub_items = hub._partialItems
1203 self.logger.debug(
1204 "Processing hub '%s' (%s) with %d partial items",
1205 hub.title,
1206 hub.hubIdentifier,
1207 len(hub_items),
1208 )
1209 for item in hub_items:
1210 try:
1211 # Skip items without type attribute
1212 if not hasattr(item, "type"):
1213 self.logger.debug(
1214 "Skipping item in hub '%s': no type attribute",
1215 hub.title,
1216 )
1217 continue
1218
1219 # Parse item based on its type
1220 if item.type == "track":
1221 folder.items.append(await self._parse_track(item))
1222 elif item.type == "album":
1223 folder.items.append(await self._parse_album(item))
1224 elif item.type == "artist":
1225 folder.items.append(await self._parse_artist(item))
1226 elif item.type == "playlist":
1227 folder.items.append(await self._parse_playlist(item))
1228 # Try to parse other types generically
1229 elif parsed_item := await self._parse(item):
1230 folder.items.append(parsed_item) # type: ignore[arg-type]
1231 else:
1232 self.logger.debug(
1233 "Skipping unsupported item type '%s' in hub '%s'",
1234 item.type,
1235 hub.title,
1236 )
1237 except Exception as err:
1238 self.logger.debug(
1239 "Failed to parse item (type: %s) in hub '%s': %s",
1240 getattr(item, "type", "unknown"),
1241 hub.title,
1242 str(err),
1243 )
1244 continue
1245
1246 # Only add folder if it has items
1247 if folder.items:
1248 folders.append(folder)
1249 self.logger.debug(
1250 "Added hub '%s' (%s) with %d items",
1251 hub.title,
1252 hub.hubIdentifier,
1253 len(folder.items),
1254 )
1255 else:
1256 self.logger.debug(
1257 "Skipping hub '%s' (%s): no items after parsing",
1258 hub.title,
1259 hub.hubIdentifier,
1260 )
1261
1262 self.logger.debug("Retrieved %d recommendation folders from Plex", len(folders))
1263 return folders
1264
1265 except Exception as err:
1266 self.logger.warning("Error getting recommendations from Plex: %s", err)
1267 return []
1268
1269 async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
1270 """Get streamdetails for a track."""
1271 plex_track = await self._get_data(item_id, PlexTrack)
1272 if not plex_track or not plex_track.media:
1273 msg = f"track {item_id} not found"
1274 raise MediaNotFoundError(msg)
1275
1276 media: PlexMedia = plex_track.media[0]
1277
1278 content_type = (
1279 ContentType.try_parse(media.container) if media.container else ContentType.UNKNOWN
1280 )
1281 media_part: PlexMediaPart = media.parts[0]
1282 audio_stream: PlexAudioStream = media_part.audioStreams()[0]
1283
1284 stream_details = StreamDetails(
1285 item_id=plex_track.key,
1286 provider=self.instance_id,
1287 audio_format=AudioFormat(
1288 content_type=content_type,
1289 channels=media.audioChannels,
1290 ),
1291 stream_type=StreamType.HTTP,
1292 duration=plex_track.duration,
1293 data=plex_track,
1294 can_seek=True,
1295 allow_seek=True,
1296 )
1297
1298 if content_type != ContentType.M4A:
1299 stream_details.path = self._plex_server.url(media_part.key, True)
1300 if audio_stream.samplingRate:
1301 stream_details.audio_format.sample_rate = audio_stream.samplingRate
1302 if audio_stream.bitDepth:
1303 stream_details.audio_format.bit_depth = audio_stream.bitDepth
1304
1305 else:
1306 url = plex_track.getStreamURL()
1307 media_info = await async_parse_tags(url)
1308 stream_details.path = url
1309 stream_details.audio_format.channels = media_info.channels
1310 stream_details.audio_format.content_type = ContentType.try_parse(media_info.format)
1311 stream_details.audio_format.sample_rate = media_info.sample_rate
1312 stream_details.audio_format.bit_depth = media_info.bits_per_sample
1313
1314 return stream_details
1315
1316 async def get_myplex_account_and_refresh_token(self, auth_token: str) -> MyPlexAccount:
1317 """Get a MyPlexAccount object and refresh the token if needed."""
1318 if auth_token == AUTH_TOKEN_UNAUTH:
1319 return self._myplex_account
1320
1321 def _refresh_plex_token() -> MyPlexAccount:
1322 if self._myplex_account is None:
1323 myplex_account = MyPlexAccount(token=auth_token)
1324 self._myplex_account = myplex_account
1325 self._myplex_account.ping()
1326 return self._myplex_account
1327
1328 return await asyncio.to_thread(_refresh_plex_token)
1329
1330 async def set_favorite(self, prov_item_id: str, media_type: MediaType, favorite: bool) -> None:
1331 """Set favorite status by setting rating in Plex."""
1332 if favorite:
1333 # Set like rating
1334 rating = cast("float", self.config.get_value(CONF_PLEX_LIKE_RATING))
1335 else:
1336 # Set unlike rating
1337 rating = cast("float", self.config.get_value(CONF_PLEX_UNLIKE_RATING))
1338
1339 if media_type == MediaType.TRACK:
1340 plex_track = await self._get_data(prov_item_id, PlexTrack)
1341 await self._run_async(plex_track.rate, rating)
1342 self.logger.debug(
1343 "Set Plex rating to %s for track with ID %s (ratingKey: %s)",
1344 rating,
1345 prov_item_id,
1346 plex_track.ratingKey,
1347 )
1348 elif media_type == MediaType.ALBUM:
1349 plex_album = await self._get_data(prov_item_id, PlexAlbum)
1350 await self._run_async(plex_album.rate, rating)
1351 self.logger.debug(
1352 "Set Plex rating to %s for album with ID %s (ratingKey: %s)",
1353 rating,
1354 prov_item_id,
1355 plex_album.ratingKey,
1356 )
1357