/
/
/
1"""Emby Music Provider for MusicAssistant."""
2
3from __future__ import annotations
4
5import hashlib
6import socket
7from asyncio import TaskGroup
8from collections.abc import AsyncGenerator
9from typing import TYPE_CHECKING, Any
10from urllib.parse import urljoin
11
12from aiohttp import ClientResponseError
13from music_assistant_models.config_entries import (
14 ConfigEntry,
15 ConfigValueType,
16 ProviderConfig,
17)
18from music_assistant_models.enums import (
19 ConfigEntryType,
20 MediaType,
21 ProviderFeature,
22 StreamType,
23)
24from music_assistant_models.errors import (
25 LoginFailed,
26 MediaNotFoundError,
27 ProviderPermissionDenied,
28)
29from music_assistant_models.media_items import (
30 Album,
31 Artist,
32 AudioFormat,
33 Playlist,
34 SearchResults,
35 Track,
36)
37from music_assistant_models.streamdetails import StreamDetails
38
39from music_assistant.controllers.cache import use_cache
40from music_assistant.mass import MusicAssistant
41from music_assistant.models import ProviderInstanceType
42from music_assistant.models.music_provider import MusicProvider
43from music_assistant.providers.emby.const import (
44 ALBUM_FIELDS,
45 ARTIST_FIELDS,
46 AUTH_ACCESS_TOKEN,
47 AUTH_USER,
48 ITEM_KEY_COLLECTION_TYPE,
49 ITEM_KEY_ID,
50 ITEM_KEY_MEDIA_STREAMS,
51 ITEM_LIMIT,
52 ITEMS,
53 SUPPORTED_CONTAINER_FORMATS,
54 TRACK_FIELDS,
55)
56from music_assistant.providers.emby.parsers import (
57 parse_album,
58 parse_artist,
59 parse_playlist,
60 parse_track,
61)
62
63if TYPE_CHECKING:
64 from music_assistant_models.provider import ProviderManifest
65
66from music_assistant.constants import (
67 APPLICATION_NAME,
68 CONF_IP_ADDRESS,
69 CONF_PASSWORD,
70 CONF_USERNAME,
71)
72
# Capabilities passed to the EmbyProvider constructor by ``setup``.
SUPPORTED_FEATURES = {
    # library synchronisation
    ProviderFeature.LIBRARY_ARTISTS,
    ProviderFeature.LIBRARY_ALBUMS,
    ProviderFeature.LIBRARY_TRACKS,
    ProviderFeature.LIBRARY_PLAYLISTS,
    # discovery
    ProviderFeature.BROWSE,
    ProviderFeature.SEARCH,
    # per-item lookups
    ProviderFeature.ARTIST_ALBUMS,
    ProviderFeature.ARTIST_TOPTRACKS,
    ProviderFeature.SIMILAR_TRACKS,
}
84
85
async def setup(
    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
    """Create the Emby provider instance for the given manifest and configuration."""
    provider = EmbyProvider(mass, manifest, config, SUPPORTED_FEATURES)
    return provider
91
92
async def get_config_entries(
    mass: MusicAssistant,
    instance_id: str | None = None,
    action: str | None = None,
    values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
    """Return the config entries needed to set up this provider.

    The entries are static (server url, username, optional password); none of
    the arguments influence the result.
    """
    # ruff: noqa: ARG001
    server_entry = ConfigEntry(
        key=CONF_IP_ADDRESS,
        type=ConfigEntryType.STRING,
        label="Server",
        required=True,
        description="The url of the Emby server to connect to.",
    )
    username_entry = ConfigEntry(
        key=CONF_USERNAME,
        type=ConfigEntryType.STRING,
        label="Username",
        required=True,
        description="The username to authenticate to the remote server.",
    )
    # The password is optional: it is not marked as required.
    password_entry = ConfigEntry(
        key=CONF_PASSWORD,
        type=ConfigEntryType.SECURE_STRING,
        label="Password",
        required=False,
        description="The password to authenticate to the remote server.",
    )
    return (server_entry, username_entry, password_entry)
124
125
class EmbyProvider(MusicProvider):
    """Provider for an Emby music library (uses Emby REST API).

    After ``handle_async_init`` has run, the instance holds the server base url,
    the shared HTTP session, the access token, the user id and the headers that
    are sent with every subsequent request.
    """

    async def handle_async_init(self) -> None:
        """Authenticate against the Emby server and prepare the request headers.

        Raises:
            LoginFailed: the server answered 401, or the response lacks the
                access token, the user object or the user id.
            ProviderPermissionDenied: the server answered 403.
            MediaNotFoundError: the authentication endpoint answered 404.
            ClientResponseError: any other HTTP error status.
        """
        username = str(self.config.get_value(CONF_USERNAME))
        password = str(self.config.get_value(CONF_PASSWORD) or "")
        # Normalise to exactly one trailing slash so urljoin appends to the path
        # instead of replacing its last segment.
        self._base_url = str(self.config.get_value(CONF_IP_ADDRESS)).rstrip("/") + "/"
        self._session = self.mass.http_session

        # Stable device id: derived from the server id and the username only.
        self._device_id = hashlib.sha256(f"{self.mass.server_id}+{username}".encode()).hexdigest()
        self._device_name = socket.gethostname()

        # Authenticate against Emby /Users/AuthenticateByName.
        auth_url = urljoin(self._base_url, "Users/AuthenticateByName")
        payload = {"Username": username, "Pw": password}
        headers = {
            "Accept": "application/json",
            "X-Emby-Authorization": self._authorization_header(),
        }
        try:
            async with self._session.post(auth_url, json=payload, headers=headers) as resp:
                resp.raise_for_status()
                data = await resp.json()
        except ClientResponseError as err:
            self._raise_for_http_error(err, "Authentication endpoint not found")
            raise

        # Store token and user id.
        token = data.get(AUTH_ACCESS_TOKEN)
        user = data.get(AUTH_USER)
        if not token or not user:
            raise LoginFailed("Authentication failed: missing token/user in response")
        user_id = user.get(ITEM_KEY_ID) if isinstance(user, dict) else None
        if not user_id:
            # Without a user id every later "Users/{id}/..." request would be malformed.
            raise LoginFailed("Authentication failed: missing user id in response")
        self._token = token
        self._user_id = user_id
        self._headers = {
            "Accept": "application/json",
            "X-Emby-Token": self._token,
            "X-Emby-Authorization": self._authorization_header(self._token),
        }

    def _authorization_header(self, token: str | None = None) -> str:
        """Build the ``X-Emby-Authorization`` header value.

        The ``Token`` field is appended only when ``token`` is given.
        """
        parts = [
            f'MediaBrowser Client="{APPLICATION_NAME}"',
            f'Device="{self._device_name}"',
            f'DeviceId="{self._device_id}"',
            f'Version="{self.mass.version}"',
        ]
        if token:
            parts.append(f'Token="{token}"')
        return ", ".join(parts)

    @staticmethod
    def _raise_for_http_error(err: ClientResponseError, not_found_message: str) -> None:
        """Raise the provider error matching a 401/403/404 response.

        Returns normally for every other status so the caller can re-raise the
        original exception.
        """
        if err.status == 401:
            raise LoginFailed("Unauthorized: invalid credentials") from err
        if err.status == 403:
            raise ProviderPermissionDenied("Forbidden: insufficient permissions") from err
        if err.status == 404:
            raise MediaNotFoundError(not_found_message) from err

    @property
    def is_streaming_provider(self) -> bool:
        """Report whether this is a streaming provider (always False here)."""
        return False

    async def _get(self, path: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
        """Perform an authenticated GET on ``path`` and return the decoded JSON body.

        Raises:
            LoginFailed: the server answered 401.
            ProviderPermissionDenied: the server answered 403.
            MediaNotFoundError: the server answered 404.
            ClientResponseError: any other HTTP error status.
        """
        url = urljoin(self._base_url, path.lstrip("/"))
        try:
            async with self._session.get(url, headers=self._headers, params=params) as resp:
                resp.raise_for_status()
                return await resp.json()  # type: ignore[no-any-return]
        except ClientResponseError as err:
            self._raise_for_http_error(err, f"Item {path} not found")
            raise

    async def _iter_library_items(
        self, include_types: str, fields: list[str] | None = None
    ) -> AsyncGenerator[dict[str, Any], None]:
        """Yield the raw items of the given type from every music library.

        Each library is queried page by page until the server returns an empty
        page. The next offset is advanced by the number of items actually
        received, so a page shorter than ``ITEM_LIMIT`` never causes items to be
        skipped.
        """
        for library in await self._get_music_libraries():
            base_params: dict[str, Any] = {
                "ParentId": library[ITEM_KEY_ID],
                "IncludeItemTypes": include_types,
                "EnableUserData": "true",
                "Recursive": "true",
            }
            if fields:
                base_params["Fields"] = ",".join(fields)
            start_index = 0
            while True:
                page_params = {
                    **base_params,
                    "StartIndex": str(start_index),
                    "Limit": str(ITEM_LIMIT),
                }
                resp = await self._get(f"Users/{self._user_id}/Items", params=page_params)
                items = resp.get(ITEMS, [])
                if not items:
                    break
                for item in items:
                    yield item
                start_index += len(items)

    async def _search_items(
        self, search_query: str, include_types: str, fields: list[str], limit: int
    ) -> list[dict[str, Any]]:
        """Return the raw items of ``include_types`` matching ``search_query``."""
        params = {
            "SearchTerm": search_query,
            "IncludeItemTypes": include_types,
            "EnableUserData": "true",
            "Fields": ",".join(fields or []),
            "Limit": str(limit),
            "Recursive": "true",
        }
        resp = await self._get(f"Users/{self._user_id}/Items", params=params)
        return resp.get(ITEMS, [])  # type: ignore[no-any-return]

    async def _search_track(self, search_query: str, limit: int) -> list[Track]:
        """Search the library for tracks."""
        items = await self._search_items(search_query, "Audio", TRACK_FIELDS, limit)
        return [parse_track(self.instance_id, self, item) for item in items]

    async def _search_album(self, search_query: str, limit: int) -> list[Album]:
        """Search the library for albums.

        When the query contains " - ", only the part after the first " - " is
        used as the search term.
        """
        albumname = search_query.split(" - ", 1)[1] if " - " in search_query else search_query
        items = await self._search_items(albumname, "MusicAlbum", ALBUM_FIELDS, limit)
        return [parse_album(self.instance_id, self, item) for item in items]

    async def _search_artist(self, search_query: str, limit: int) -> list[Artist]:
        """Search the library for artists."""
        items = await self._search_items(search_query, "MusicArtist", ARTIST_FIELDS, limit)
        return [parse_artist(self.instance_id, self, item) for item in items]

    async def _search_playlist(self, search_query: str, limit: int) -> list[Playlist]:
        """Search the library for playlists."""
        items = await self._search_items(search_query, "Playlist", [], limit)
        return [parse_playlist(self.instance_id, self, item) for item in items]

    @use_cache(60 * 15)
    async def search(
        self,
        search_query: str,
        media_types: list[MediaType],
        limit: int = 20,
    ) -> SearchResults:
        """Search for media items in the Emby library.

        One request per requested media type is issued, all running
        concurrently in a task group.
        """
        async with TaskGroup() as tg:
            artists = (
                tg.create_task(self._search_artist(search_query, limit))
                if MediaType.ARTIST in media_types
                else None
            )
            albums = (
                tg.create_task(self._search_album(search_query, limit))
                if MediaType.ALBUM in media_types
                else None
            )
            tracks = (
                tg.create_task(self._search_track(search_query, limit))
                if MediaType.TRACK in media_types
                else None
            )
            playlists = (
                tg.create_task(self._search_playlist(search_query, limit))
                if MediaType.PLAYLIST in media_types
                else None
            )

        # Leaving the task group guarantees every created task has finished.
        search_results = SearchResults()
        if artists is not None:
            search_results.artists = artists.result()
        if albums is not None:
            search_results.albums = albums.result()
        if tracks is not None:
            search_results.tracks = tracks.result()
        if playlists is not None:
            search_results.playlists = playlists.result()
        return search_results

    async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
        """Yield all artists from the music library."""
        async for item in self._iter_library_items("MusicArtist", ARTIST_FIELDS):
            yield parse_artist(self.instance_id, self, item)

    async def get_library_albums(self) -> AsyncGenerator[Album, None]:
        """Yield all albums from the music library."""
        async for item in self._iter_library_items("MusicAlbum", ALBUM_FIELDS):
            yield parse_album(self.instance_id, self, item)

    async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
        """Yield all tracks from the music library.

        Items without any media streams are skipped.
        """
        async for item in self._iter_library_items("Audio", TRACK_FIELDS):
            if not item.get(ITEM_KEY_MEDIA_STREAMS):
                continue
            yield parse_track(self.instance_id, self, item)

    async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
        """Yield all playlists from the music library."""
        async for item in self._iter_library_items("Playlist"):
            yield parse_playlist(self.instance_id, self, item)

    @use_cache(3600)
    async def get_album(self, prov_album_id: str) -> Album:
        """Get album by provider album id."""
        album = await self._get(
            f"Users/{self._user_id}/Items/{prov_album_id}",
            params={
                "EnableUserData": "true",
                "Fields": ",".join(ALBUM_FIELDS),
                "Recursive": "true",
            },
        )
        return parse_album(self.instance_id, self, album)

    @use_cache(3600)
    async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
        """Get tracks for a given album by provider album id.

        A single request is made, so at most ``ITEM_LIMIT`` tracks are returned.
        """
        params = {
            "ParentId": prov_album_id,
            "IncludeItemTypes": "Audio",
            "EnableUserData": "true",
            "Fields": ",".join(TRACK_FIELDS),
            "Limit": str(ITEM_LIMIT),
            "Recursive": "true",
        }
        resp = await self._get(f"Users/{self._user_id}/Items", params=params)
        return [parse_track(self.instance_id, self, item) for item in resp.get(ITEMS, [])]

    @use_cache(60 * 15)
    async def get_artist(self, prov_artist_id: str) -> Artist:
        """Get artist by provider artist id."""
        artist_data = await self._get(
            f"Users/{self._user_id}/Items/{prov_artist_id}",
            params={"EnableUserData": "true", "Fields": ",".join(ARTIST_FIELDS)},
        )
        return parse_artist(self.instance_id, self, artist_data)

    @use_cache(3600)
    async def get_artist_toptracks(self, prov_artist_id: str, limit: int = 25) -> list[Track]:
        """Get top tracks for a given artist, ordered by descending play count."""
        params = {
            "ArtistIds": prov_artist_id,
            "IncludeItemTypes": "Audio",
            "EnableUserData": "true",
            "Fields": ",".join(TRACK_FIELDS),
            "Recursive": "true",
            "Limit": str(limit),
            "SortBy": "PlayCount",
            "SortOrder": "Descending",
        }
        resp = await self._get(f"Users/{self._user_id}/Items", params=params)
        return [parse_track(self.instance_id, self, item) for item in resp.get(ITEMS, [])]

    @use_cache(60 * 15)
    async def get_track(self, prov_track_id: str) -> Track:
        """Get track by provider track id."""
        track = await self._get(
            f"Users/{self._user_id}/Items/{prov_track_id}",
            params={"EnableUserData": "true", "Fields": ",".join(TRACK_FIELDS)},
        )
        return parse_track(self.instance_id, self, track)

    @use_cache(60 * 15)
    async def get_playlist(self, prov_playlist_id: str) -> Playlist:
        """Get playlist by provider playlist id."""
        playlist = await self._get(
            f"Users/{self._user_id}/Items/{prov_playlist_id}",
            params={"EnableUserData": "true"},
        )
        return parse_playlist(self.instance_id, self, playlist)

    @use_cache(3600)
    async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
        """Get one page of tracks for a given playlist by provider playlist id.

        ``page`` is zero-based and each page holds up to ``ITEM_LIMIT`` tracks.
        Track positions are 1-based and continue across pages.
        """
        offset = page * ITEM_LIMIT
        params = {
            "ParentId": prov_playlist_id,
            "IncludeItemTypes": "Audio",
            "EnableUserData": "true",
            "Fields": ",".join(TRACK_FIELDS),
            "Limit": str(ITEM_LIMIT),
            "StartIndex": str(offset),
        }
        resp = await self._get(f"Users/{self._user_id}/Items", params=params)
        result: list[Track] = []
        for position, item in enumerate(resp.get(ITEMS, []), offset + 1):
            if track := parse_track(self.instance_id, self, item):
                track.position = position
                result.append(track)
        return result

    @use_cache(3600)
    async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
        """Get albums for a given artist by provider artist id."""
        params = {
            "AlbumArtistIds": prov_artist_id,
            "IncludeItemTypes": "MusicAlbum",
            "Fields": ",".join(ALBUM_FIELDS),
            "EnableUserData": "true",
            "Recursive": "true",
        }
        resp = await self._get(f"Users/{self._user_id}/Items", params=params)
        return [parse_album(self.instance_id, self, album) for album in resp.get(ITEMS, [])]

    async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
        """Get stream details for given item id and media type.

        The stream url points at the server's ``Audio/{id}/universal`` endpoint
        and carries the access token as the ``api_key`` query parameter.
        ``media_type`` is not used.
        """
        # Function-scope import keeps this method self-contained.
        from urllib.parse import urlencode

        track = await self.get_track(item_id)
        url = urljoin(self._base_url, f"Audio/{track.item_id}/universal")
        # urlencode escapes reserved characters in the values; commas are kept
        # literal so the container list stays readable.
        query = urlencode(
            {"Container": ",".join(SUPPORTED_CONTAINER_FORMATS), "api_key": self._token},
            safe=",",
        )
        duration = getattr(track, "duration", None)
        return StreamDetails(
            item_id=track.item_id,
            provider=self.instance_id,
            audio_format=AudioFormat(),
            stream_type=StreamType.HTTP,
            duration=int(duration) if duration else 0,
            path=f"{url}?{query}",
            can_seek=True,
            allow_seek=True,
        )

    @use_cache(3600)
    async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]:
        """Get tracks the server reports as similar to the given track."""
        resp = await self._get(
            f"Items/{prov_track_id}/Similar",
            params={"Limit": str(limit), "Fields": ",".join(TRACK_FIELDS)},
        )
        return [parse_track(self.instance_id, self, t) for t in resp.get(ITEMS, [])]

    async def _get_music_libraries(self) -> list[dict[str, Any]]:
        """Return the media folders whose collection type is "music".

        The comparison is case-insensitive; folders with a missing or null
        collection type are ignored.
        """
        resp = await self._get("Library/MediaFolders")
        return [
            library
            for library in resp.get(ITEMS, [])
            if str(library.get(ITEM_KEY_COLLECTION_TYPE) or "").lower() == "music"
        ]
509