/
/
/
1"""
BBC Sounds music provider support for MusicAssistant.

TODO: implement seeking of live streams
TODO: watch for settings changes
TODO: add podcast menu to non-UK menu
FIXME: skipping in non-live radio shows restarts the stream but keeps the seek time
8"""
9
10from __future__ import annotations
11
12import asyncio
13from collections.abc import AsyncGenerator
14from datetime import timedelta
15from typing import TYPE_CHECKING, Literal
16
17from music_assistant_models.config_entries import (
18 ConfigEntry,
19 ConfigValueOption,
20 ConfigValueType,
21 ProviderConfig,
22)
23from music_assistant_models.enums import ConfigEntryType, ImageType, MediaType, ProviderFeature
24from music_assistant_models.errors import LoginFailed, MusicAssistantError
25from music_assistant_models.media_items import (
26 BrowseFolder,
27 ItemMapping,
28 MediaItemImage,
29 MediaItemMetadata,
30 MediaItemType,
31 Podcast,
32 PodcastEpisode,
33 ProviderMapping,
34 Radio,
35 RecommendationFolder,
36 SearchResults,
37 Track,
38)
39from music_assistant_models.streamdetails import StreamMetadata
40from music_assistant_models.unique_list import UniqueList
41
42import music_assistant.helpers.datetime as dt
43from music_assistant.constants import CONF_PASSWORD, CONF_USERNAME
44from music_assistant.controllers.cache import use_cache
45from music_assistant.helpers.datetime import LOCAL_TIMEZONE
46from music_assistant.models.music_provider import MusicProvider
47from music_assistant.providers.bbc_sounds.adaptor import Adaptor
48
49if TYPE_CHECKING:
50 from collections.abc import Sequence
51
52 from music_assistant_models.provider import ProviderManifest
53 from music_assistant_models.streamdetails import StreamDetails
54 from sounds.models import SoundsTypes
55
56 from music_assistant.mass import MusicAssistant
57 from music_assistant.models import ProviderInstanceType
58
59from sounds import (
60 Container,
61 LiveStation,
62 Menu,
63 MenuRecommendationOptions,
64 PlayStatus,
65 RadioShow,
66 Segment,
67 SoundsClient,
68 exceptions,
69)
70from sounds import PodcastEpisode as SoundsPodcastEpisode
71
72SUPPORTED_FEATURES = {
73 ProviderFeature.BROWSE,
74 ProviderFeature.RECOMMENDATIONS,
75 ProviderFeature.SEARCH,
76}
77
78FEATURES = {"now_playing": True, "catchup_segments": True, "check_blank_image": False}
79
80type _StreamTypes = Literal["hls", "dash"]
81
82
async def setup(
    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
    """Create a new provider instance and run its async initialisation.

    :param mass: The running MusicAssistant instance.
    :param manifest: The manifest of this provider.
    :param config: The stored configuration for this provider instance.
    """
    provider = BBCSoundsProvider(mass, manifest, config, SUPPORTED_FEATURES)
    await provider.handle_async_init()
    return provider
90
91
async def get_config_entries(
    mass: MusicAssistant,
    instance_id: str | None = None,
    action: str | None = None,
    values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
    """
    Return the config entries needed to set up this provider.

    :param mass: The running MusicAssistant instance.
    :param instance_id: id of an existing provider instance (None if new instance setup).
    :param action: [optional] action key called from config entries UI.
    :param values: the (intermediate) raw values for config entries sent with the action.
    """
    # ruff: noqa: ARG001

    intro = ConfigEntry(
        key=_Constants.CONF_INTRO,
        type=ConfigEntryType.LABEL,
        label="A BBC Sounds account is optional, "
        "but some UK-only content may not work without it",
    )
    # Credentials are optional, so neither field is marked as required.
    username = ConfigEntry(
        key=CONF_USERNAME,
        type=ConfigEntryType.STRING,
        label="Email or username",
        required=False,
    )
    password = ConfigEntry(
        key=CONF_PASSWORD,
        type=ConfigEntryType.SECURE_STRING,
        label="Password",
        required=False,
    )
    show_local = ConfigEntry(
        key=_Constants.CONF_SHOW_LOCAL,
        advanced=True,
        type=ConfigEntryType.BOOLEAN,
        label="Show local radio stations?",
        default_value=False,
    )
    stream_format_options = [
        ConfigValueOption("HLS", _Constants.CONF_STREAM_FORMAT_HLS),
        ConfigValueOption("MPEG-DASH", _Constants.CONF_STREAM_FORMAT_DASH),
    ]
    stream_format = ConfigEntry(
        key=_Constants.CONF_STREAM_FORMAT,
        advanced=True,
        label="Preferred stream format",
        type=ConfigEntryType.STRING,
        options=stream_format_options,
        default_value=_Constants.CONF_STREAM_FORMAT_HLS,
    )
    return (intro, username, password, show_local, stream_format)
151
152
class _Constants:
    """Static values shared by the BBC Sounds provider."""

    # --- Images ---
    # This is the image id that is shown when there's no track image
    BLANK_IMAGE_NAME: str = "p0bqcdzf"
    DEFAULT_IMAGE_SIZE = 1280

    # --- Playback ---
    TRACK_DURATION_THRESHOLD: int = 5 * 60  # 5 minutes
    # Used as the stream metadata update interval (unit presumably seconds - TODO confirm)
    NOW_PLAYING_REFRESH_TIME: int = 5

    # --- Stream formats ---
    HLS: Literal["hls"] = "hls"
    DASH: Literal["dash"] = "dash"

    # --- Config entry keys and values ---
    CONF_SHOW_LOCAL: str = "show_local"
    CONF_INTRO: str = "intro"
    CONF_STREAM_FORMAT: str = "stream_format"
    CONF_STREAM_FORMAT_HLS: str = HLS
    CONF_STREAM_FORMAT_DASH: str = DASH

    # --- Cache expirations ---
    DEFAULT_EXPIRATION = 30 * 24 * 60 * 60  # 30 days
    SHORT_EXPIRATION = 3 * 60 * 60  # 3 hours
168
169
class BBCSoundsProvider(MusicProvider):
    """A MusicProvider class to interact with the BBC Sounds API via auntie-sounds."""

    client: SoundsClient
    menu: Menu | None = None
    current_task: asyncio.Task[None] | None = None

    # Base URL of the icons used by the statically defined browse folders
    _ICON_BASE_URL = (
        "https://cdn.jsdelivr.net/gh/kieranhogg/auntie-sounds@main/src/sounds/icons/solid"
    )

    async def handle_async_init(self) -> None:
        """Handle async initialization of the provider.

        Creates the Sounds client, reads the provider configuration and, when
        both a username and a password are configured, makes sure the client
        has an authenticated session.

        :raises LoginFailed: If the Sounds client rejects the configured credentials.
        """
        self.client = SoundsClient(
            session=self.mass.http_session,
            logger=self.logger,
            timezone=LOCAL_TIMEZONE,
        )

        self.show_local_stations: bool = bool(
            self.config.get_value(_Constants.CONF_SHOW_LOCAL, False)
        )
        self.stream_format: _StreamTypes = (
            _Constants.DASH
            if self.config.get_value(_Constants.CONF_STREAM_FORMAT) == _Constants.DASH
            else _Constants.HLS
        )
        self.adaptor = Adaptor(self)

        # If we have an account, authenticate. Testing shows all features work without auth
        # but BBC will be disabling BBC Sounds from outside the UK at some point
        username = self.config.get_value(CONF_USERNAME)
        password = self.config.get_value(CONF_PASSWORD)
        if not (username and password):
            return

        if self.client.auth.is_logged_in:
            # Check if we need to reauth
            try:
                await self.client.personal.get_experience_menu()
            except (exceptions.UnauthorisedError, exceptions.APIResponseError):
                await self.client.auth.renew_session()
            else:
                # The existing session is still valid, nothing more to do
                return

        try:
            await self.client.auth.authenticate(username=str(username), password=str(password))
        except exceptions.LoginFailedError as e:
            # Chain the original error so the root cause stays in the traceback
            raise LoginFailed(e) from e

    async def loaded_in_mass(self) -> None:
        """Do post-loaded actions.

        Fetches the menu for logged-in UK listeners when no menu (or only an
        empty one) is held yet.
        """
        if self.menu and self.menu.sub_items:
            return
        is_uk_listener = await self.client.auth.is_uk_listener
        if self.client.auth.is_logged_in and is_uk_listener:
            await self._fetch_menu()

    def _get_provider_mapping(self, item_id: str) -> ProviderMapping:
        """Build the ProviderMapping that links an item id to this provider instance."""
        return ProviderMapping(
            item_id=item_id,
            provider_domain=self.domain,
            provider_instance=self.instance_id,
        )

    async def _fetch_menu(self) -> None:
        """Fetch the menu (without recommendations) from the API and store it."""
        self.logger.debug("No cached menu, fetching from API")
        self.menu = await self.client.personal.get_experience_menu(
            recommendations=MenuRecommendationOptions.EXCLUDE
        )

    def _stream_error(self, item_id: str, media_type: MediaType) -> MusicAssistantError:
        """Create (but do not raise) the error used when no stream details are available."""
        return MusicAssistantError(f"Couldn't get stream details for {item_id} ({media_type})")

    def _icon_folder(self, item_id: str, name: str, icon: str) -> BrowseFolder:
        """Create a top-level browse folder with one of the bundled icons.

        :param item_id: Folder id, also used as the path below the provider root.
        :param name: Display name of the folder.
        :param icon: Icon file name (without extension) below the icon base URL.
        """
        return BrowseFolder(
            item_id=item_id,
            provider=self.domain,
            name=name,
            path=f"{self.domain}://{item_id}",
            image=MediaItemImage(
                path=f"{self._ICON_BASE_URL}/{icon}.png",
                remotely_accessible=True,
                provider=self.domain,
                type=ImageType.THUMB,
            ),
        )

    @property
    def is_streaming_provider(self) -> bool:
        """Return True as the provider is a streaming provider."""
        return True

    @use_cache(expiration=_Constants.DEFAULT_EXPIRATION)
    async def get_track(self, prov_track_id: str) -> Track:
        """Get full track details by id.

        :raises MusicAssistantError: If the adaptor does not produce a Track.
        """
        episode_info = await self.client.streaming.get_by_pid(
            pid=prov_track_id, stream_format=self.stream_format
        )
        track = await self.adaptor.new_object(episode_info, force_type=Track)
        if not isinstance(track, Track):
            raise MusicAssistantError(f"Incorrect track returned for {prov_track_id}")
        return track

    @use_cache(expiration=_Constants.DEFAULT_EXPIRATION)
    async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode:
        """Get full podcast episode details by id.

        :raises MusicAssistantError: If the adaptor does not produce a PodcastEpisode.
        """
        # If we are requesting a previously-aired radio show, we lose access to the
        # schedule time. The best we can find out from the API is original release
        # date, so the stream title loses access to the air date
        self.logger.debug("Getting podcast episode for %s", prov_episode_id)
        episode = await self.client.streaming.get_podcast_episode(prov_episode_id)
        ma_episode = await self.adaptor.new_object(episode, force_type=PodcastEpisode)
        if not isinstance(ma_episode, PodcastEpisode):
            raise MusicAssistantError(f"Incorrect format for podcast episode {prov_episode_id}")
        return ma_episode

    async def _get_playable_stream_details(
        self, item_id: str, media_type: MediaType
    ) -> StreamDetails:
        """Get stream details for an on-demand item (track or podcast episode).

        :raises MusicAssistantError: If no stream details could be created.
        """
        episode_info = await self.client.streaming.get_by_pid(
            item_id, include_stream=True, stream_format=self.stream_format
        )
        stream_details = await self.adaptor.new_streamable_object(episode_info)
        if not stream_details:
            raise self._stream_error(item_id, media_type)

        if episode_info and FEATURES["catchup_segments"]:
            # The metadata callback needs the episode id to look up its segments
            stream_details.data = {"vpid": episode_info.id}
            stream_details.stream_metadata_update_callback = self._update_on_demand_stream_metadata
            stream_details.stream_metadata_update_interval = _Constants.NOW_PLAYING_REFRESH_TIME
        return stream_details

    async def _get_station_stream_details(self, item_id: str) -> StreamDetails:
        """Get stream details for a live station.

        :raises MusicAssistantError: If the station or its stream cannot be found.
        """
        self.logger.debug("Getting stream details for station %s", item_id)
        station = await self.client.stations.get_station(
            item_id, include_stream=True, stream_format=self.stream_format
        )
        if not station:
            raise MusicAssistantError(f"Couldn't get stream details for station {item_id}")

        self.logger.debug("Found station: %s", station)
        if not station.stream:
            raise MusicAssistantError(f"No stream found for {item_id}")

        stream_details = await self.adaptor.new_streamable_object(station)
        if not stream_details:
            raise self._stream_error(item_id, MediaType.RADIO)

        if FEATURES["now_playing"]:
            stream_details.stream_metadata_update_callback = self._update_live_stream_metadata
            stream_details.stream_metadata_update_interval = _Constants.NOW_PLAYING_REFRESH_TIME
        return stream_details

    async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
        """Get streamdetails for a track/radio."""
        self.logger.debug("Getting stream details for %s (%s)", item_id, media_type)
        if media_type in (MediaType.PODCAST_EPISODE, MediaType.TRACK):
            return await self._get_playable_stream_details(item_id, media_type)
        # Every other media type is treated as a live station
        return await self._get_station_stream_details(item_id)

    async def _get_programme_segments(self, vpid: str) -> list[Segment] | None:
        """Get on demand segments from cache or API.

        :return: The list of segments, or None when the lookup yields no list.
        """
        cache_key = f"programme_segments_{vpid}"
        # False is the "not cached" marker so it cannot be confused with a cached value
        segments = await self.mass.cache.get(provider=self.domain, key=cache_key, default=False)
        if segments is False:
            segments = await self.client.streaming.get_show_segments(vpid)
            await self.mass.cache.set(provider=self.domain, key=cache_key, data=segments)
        return segments if isinstance(segments, list) else None

    @staticmethod
    def _segment_covers(segment: Segment, elapsed_time: int) -> bool:
        """Return True if elapsed_time lies within the segment's start/end offset."""
        offset = segment.offset
        if not offset:
            return False
        start, end = offset.get("start"), offset.get("end")
        if start is None or end is None:
            # Incomplete offsets can never match a playback position
            return False
        return int(start) <= elapsed_time < int(end)

    async def _update_on_demand_stream_metadata(
        self, stream_details: StreamDetails, elapsed_time: int
    ) -> None:
        """Get the currently playing segment (song) for on-demand episodes.

        Called by the callback function in StreamDetails. The passed
        stream_details object is updated in place.
        """
        self.logger.debug("Updating on-demand stream metadata")
        if not stream_details or not stream_details.stream_metadata:
            return
        # segments API required vpid which is not the same as pid
        data = stream_details.data
        vpid = data.get("vpid") if isinstance(data, dict) else None
        if not vpid:
            return

        segments = await self._get_programme_segments(vpid=vpid)
        if not segments:
            return

        segment = next((s for s in segments if self._segment_covers(s, elapsed_time)), None)
        if segment:
            # Currently playing segment found, update metadata
            stream_details.stream_metadata = self.now_playing_to_stream_metadata(segment)
            return

        # No segment found for current time, reset to main episode info. The
        # metadata has to be copied onto the object we were given: rebinding the
        # local name would not be visible to the caller.
        episode_details = await self._get_playable_stream_details(
            item_id=stream_details.item_id, media_type=stream_details.media_type
        )
        if episode_details.stream_metadata:
            stream_details.stream_metadata = episode_details.stream_metadata

    def now_playing_to_stream_metadata(self, now_playing: Segment) -> StreamMetadata:
        """Convert now playing segment to StreamMetadata."""
        titles = now_playing.titles
        image_url = now_playing.image_url
        if image_url and _Constants.BLANK_IMAGE_NAME in image_url:
            # The placeholder image carries no information, so drop it
            image_url = None
        return StreamMetadata(
            title=titles.get("secondary", ""),
            artist=titles.get("primary", ""),
            image_url=image_url,
        )

    async def _update_live_stream_metadata(
        self, stream_details: StreamDetails, elapsed_time: int
    ) -> None:
        """Get the currently playing song for live radio streams.

        Falls back to the station's programme display when no song is playing.
        The passed stream_details object is updated in place.
        """
        self.logger.debug("Updating live stream metadata")
        if not stream_details or not stream_details.stream_metadata:
            return

        station_id = stream_details.item_id
        if not station_id:
            return

        now_playing = await self.client.schedules.currently_playing_song(station_id)
        if now_playing:
            self.logger.debug("Now playing for %s: %s", station_id, now_playing)
            stream_details.stream_metadata = self.now_playing_to_stream_metadata(now_playing)
            return

        self.logger.debug("No song playing on %s, fetching station info", station_id)
        station = await self.client.stations.get_station(station_id)
        if not station:
            return
        programme = await self._station_programme_display(station=station)
        if programme:
            # Never store None: the guard at the top of this callback would then
            # skip every following metadata update for this stream
            stream_details.stream_metadata = programme

    @use_cache(expiration=_Constants.DEFAULT_EXPIRATION)
    async def _vod_programme_display(self, pid: str) -> StreamMetadata | None:
        """Get display metadata (title only) for an on-demand programme."""
        episode = await self.client.streaming.get_by_pid(pid=pid, stream_format=self.stream_format)
        if isinstance(episode, (SoundsPodcastEpisode, RadioShow)) and episode.titles:
            return StreamMetadata(title=episode.titles.get("secondary", ""))
        return None

    # NOTE(review): this result is cached with the default expiration while the
    # argument is a station object - confirm how use_cache derives its key here.
    @use_cache(expiration=_Constants.DEFAULT_EXPIRATION)
    async def _station_programme_display(self, station: LiveStation) -> StreamMetadata | None:
        """Get display metadata built from the station's own titles."""
        if not station or not station.titles:
            return None
        title = f"{station.titles.get('secondary')} • {station.titles.get('primary')}"
        return StreamMetadata(title=title, artist=None, image_url=station.image_url)

    async def _station_list(self, include_local: bool = False) -> list[Radio]:
        """Get list of stations as Radios."""
        radio_list: list[Radio] = []
        for station in await self.client.stations.get_stations(include_local=include_local):
            if not (station and station.item_id):
                continue
            station_info = await self._station_programme_display(station=station)
            network = station.network
            logo_url = network.logo_url if network else None
            images = (
                UniqueList(
                    [
                        MediaItemImage(
                            type=ImageType.THUMB,
                            provider=self.domain,
                            path=logo_url,
                            remotely_accessible=True,
                        ),
                    ]
                )
                if logo_url
                else None
            )
            radio_list.append(
                Radio(
                    item_id=station.item_id,
                    name=(
                        network.short_title
                        if network and network.short_title
                        else "Unknown station"
                    ),
                    provider=self.domain,
                    metadata=MediaItemMetadata(
                        description=station_info.title if station_info else None,
                        images=images,
                    ),
                    provider_mappings={self._get_provider_mapping(station.item_id)},
                )
            )
        return radio_list

    async def _render_browse_items(
        self,
        items: Sequence[SoundsTypes],
        path_parts: list[str] | None = None,
    ) -> list[MediaItemType | ItemMapping | BrowseFolder]:
        """Render Sounds objects to browse items, dropping everything that can't be rendered."""
        rendered: list[MediaItemType | ItemMapping | BrowseFolder] = []
        for item in items:
            if item is None:
                continue
            new_item = await self._render_browse_item(item, path_parts)
            if new_item is not None:
                rendered.append(new_item)
        return rendered

    async def _get_category(
        self, category_name: str
    ) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
        """Get the browsable items of a category."""
        category = await self.client.streaming.get_category(category=category_name)
        if category is None or not category.sub_items:
            return []
        return await self._render_browse_items(category.sub_items)

    async def _get_collection(
        self, pid: str
    ) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
        """Get the browsable items of a collection."""
        collection = await self.client.streaming.get_collection(pid=pid)
        if not collection or not collection.sub_items:
            return []
        return await self._render_browse_items(collection.sub_items)

    async def _get_menu(
        self, path_parts: list[str] | None = None
    ) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
        """Get the root menu: full for logged-in UK listeners, slim otherwise."""
        if self.client.auth.is_logged_in and await self.client.auth.is_uk_listener:
            return await self._get_full_menu(path_parts=path_parts)
        return await self._get_slim_menu(path_parts=path_parts)

    async def _get_full_menu(
        self, path_parts: list[str] | None = None
    ) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
        """Get the menu from the API, extended with the station schedules folder.

        :raises MusicAssistantError: If no (non-empty) menu could be obtained.
        """
        if not self.menu:
            await self._fetch_menu()
        if not self.menu or not self.menu.sub_items:
            raise MusicAssistantError("Menu API response is empty or invalid")
        menu_items = await self._render_browse_items(self.menu.sub_items, path_parts)

        # The Sounds default menu doesn't include listings as they are linked elsewhere
        menu_items.insert(1, self._icon_folder("stations", "Schedule and Programmes", "latest"))
        return menu_items

    async def _get_slim_menu(
        self, path_parts: list[str] | None
    ) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
        """Get the static menu that needs no menu data from the API."""
        return [
            self._icon_folder("listen_live", "Listen Live", "listen_live"),
            self._icon_folder("stations", "Schedules and Programmes", "latest"),
        ]

    async def _render_browse_item(
        self,
        item: SoundsTypes,
        path_parts: list[str] | None = None,
    ) -> BrowseFolder | Track | Podcast | PodcastEpisode | RecommendationFolder | Radio | None:
        """Convert a Sounds object to a browse item, or None if it has no browsable form."""
        new_item = await self.adaptor.new_object(item, path_parts=path_parts)
        if isinstance(
            new_item,
            BrowseFolder | Track | Podcast | PodcastEpisode | RecommendationFolder | Radio,
        ):
            return new_item
        return None

    async def _get_subpath_menu(
        self, sub_path: str
    ) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
        """Get the items below a first-level menu path.

        "listen_live" lists the stations (plus local stations when enabled); any
        other path is looked up in the cached menu of a logged-in user.
        """
        if sub_path == "listen_live":
            # Served straight from the stations API so it also works without a
            # cached menu (e.g. the slim menu shown to non-UK accounts)
            items = await self._render_browse_items(await self.client.stations.get_stations())
            # Check if we need to append local stations
            if self.show_local_stations:
                items += await self._render_browse_items(
                    await self.client.stations.get_local_stations()
                )
            return items

        if not (self.client.auth.is_logged_in and self.menu):
            return []
        sub_menu = self.menu.get(sub_path)
        if not sub_menu or not isinstance(sub_menu, Container):
            return []
        if sub_menu.sub_items:
            # We have some sub-items, so let's show those
            return await self._render_browse_items(sub_menu.sub_items)
        return await self._render_browse_items([sub_menu])

    async def _get_station_schedule_menu(
        self,
        show_local: bool,
        path_parts: list[str],
        sub_sub_path: str,
        sub_sub_sub_path: str,
    ) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
        """Get the station schedule menu for the requested depth.

        :param show_local: Whether local stations are included in the station list.
        :param path_parts: The parts of the path being browsed, used to build child paths.
        :param sub_sub_path: The station id, or "" to list the stations.
        :param sub_sub_sub_path: The schedule date, or "" to list the dates.
        """
        if sub_sub_sub_path:
            # Lookup a date schedule (fetched once, then logged)
            schedule = await self.client.schedules.get_schedule(
                station_id=sub_sub_path,
                date=sub_sub_sub_path,
            )
            self.logger.debug(schedule)
            if not schedule or not schedule.sub_items:
                return []
            return await self._render_browse_items(schedule.sub_items, path_parts)

        if sub_sub_path:
            # Date listings for a station. Maximum is 30 days prior
            named_days = {0: ("today", "Today"), 1: ("yesterday", "Yesterday")}
            # One reference time so every folder is relative to the same day
            today = dt.now()
            date_folders = []
            for days_ago in range(30):
                date_string = (today - timedelta(days=days_ago)).strftime("%Y-%m-%d")
                item_id, name = named_days.get(days_ago, (date_string, date_string))
                date_folders.append(
                    BrowseFolder(
                        item_id=item_id,
                        name=name,
                        provider=self.domain,
                        path="/".join([*path_parts, date_string]),
                    )
                )
            return date_folders

        return [
            BrowseFolder(
                item_id=station.item_id,
                provider=self.domain,
                name=station.name,
                path="/".join([*path_parts, station.item_id]),
                image=(
                    MediaItemImage(
                        type=ImageType.THUMB,
                        path=station.metadata.images[0].path,
                        provider=self.domain,
                    )
                    if station.metadata.images
                    else None
                ),
            )
            for station in await self._station_list(include_local=show_local)
        ]

    async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
        """Browse this provider's items.

        :param path: The path to browse, (e.g. provider_id://artists).
        :raises MusicAssistantError: If the path does not belong to this provider.
        """
        self.logger.debug("Browsing path: %s", path)
        if not path.startswith(f"{self.domain}://"):
            raise MusicAssistantError(f"Invalid path for {self.domain} provider: {path}")
        raw_parts = path.split("://", 1)[1].split("/")
        self.logger.debug("Path parts: %s", raw_parts)
        # Missing levels are represented by empty strings
        sub_path, sub_sub_path, sub_sub_sub_path = (*raw_parts, "", "", "")[:3]
        path_parts = [f"{self.domain}:/", *[part for part in raw_parts if part]]

        if sub_path == "":
            return await self._get_menu()
        if sub_path == "categories" and sub_sub_path:
            return await self._get_category(sub_sub_path)
        if sub_path == "collections" and sub_sub_path:
            return await self._get_collection(sub_sub_path)
        if sub_path == "stations":
            return await self._get_station_schedule_menu(
                self.show_local_stations, path_parts, sub_sub_path, sub_sub_sub_path
            )
        return await self._get_subpath_menu(sub_path)

    async def search(
        self, search_query: str, media_types: list[MediaType] | None, limit: int = 5
    ) -> SearchResults:
        """Perform search for BBC Sounds stations.

        :param search_query: The text to search for.
        :param media_types: The media types to include, None for all.
        :param limit: Maximum number of items to return per media type.
        """
        results = SearchResults()
        search_result = await self.client.streaming.search(search_query)
        self.logger.debug(search_result)
        if media_types is None or MediaType.RADIO in media_types:
            radios = [await self.adaptor.new_object(radio) for radio in search_result.stations]
            results.radio = [radio for radio in radios if isinstance(radio, Radio)][:limit]
        if (
            media_types is None
            or MediaType.TRACK in media_types
            or MediaType.PODCAST_EPISODE in media_types
        ):
            episodes = [await self.adaptor.new_object(track) for track in search_result.episodes]
            # Exact type match, subclasses of Track are not included
            results.tracks = [track for track in episodes if type(track) is Track][:limit]

        if media_types is None or MediaType.PODCAST in media_types:
            podcasts = [await self.adaptor.new_object(show) for show in search_result.shows]
            results.podcasts = [
                podcast for podcast in podcasts if isinstance(podcast, Podcast)
            ][:limit]

        return results

    @use_cache(expiration=_Constants.DEFAULT_EXPIRATION)
    async def get_podcast(self, prov_podcast_id: str) -> Podcast:
        """Get full podcast details by id.

        :raises MusicAssistantError: If the adaptor does not produce a Podcast.
        """
        self.logger.debug("Getting podcast for %s", prov_podcast_id)
        podcast = await self.client.streaming.get_podcast(pid=prov_podcast_id)
        ma_podcast = await self.adaptor.new_object(source_obj=podcast, force_type=Podcast)
        if not isinstance(ma_podcast, Podcast):
            raise MusicAssistantError("Incorrect format for podcast")
        return ma_podcast

    async def get_podcast_episodes(
        self,
        prov_podcast_id: str,
    ) -> AsyncGenerator[PodcastEpisode, None]:
        """Get all PodcastEpisodes for given podcast id."""
        podcast_episodes = await self.client.streaming.get_podcast_episodes(prov_podcast_id)
        for episode in podcast_episodes or []:
            this_episode = await self.adaptor.new_object(
                source_obj=episode, force_type=PodcastEpisode
            )
            if isinstance(this_episode, PodcastEpisode):
                yield this_episode

    @use_cache(expiration=_Constants.SHORT_EXPIRATION)
    async def recommendations(self) -> list[RecommendationFolder]:
        """Get available recommendations.

        Returns an empty list when the client is not logged in.
        """
        if not self.client.auth.is_logged_in:
            return []

        self.logger.debug("Getting recommendations from API")
        menu = await self.client.personal.get_experience_menu(
            recommendations=MenuRecommendationOptions.ONLY
        )
        if not menu or not menu.sub_items:
            return []

        folders: list[RecommendationFolder] = []
        for recommendation in menu.sub_items:
            # recommendation is a RecommendedMenuItem
            folder = await self.adaptor.new_object(recommendation, force_type=RecommendationFolder)
            if isinstance(folder, RecommendationFolder):
                folders.append(folder)
        return folders

    async def get_radio(self, prov_radio_id: str) -> Radio:
        """Get full radio details by id.

        :raises MusicAssistantError: If the station is unknown or can't be converted to a Radio.
        """
        self.logger.debug("Getting radio for %s", prov_radio_id)
        station = await self.client.stations.get_station(prov_radio_id, include_stream=True)
        if not station:
            raise MusicAssistantError(f"No station found: {prov_radio_id}")

        ma_radio = await self.adaptor.new_object(station, force_type=Radio)
        if isinstance(ma_radio, Radio):
            return ma_radio

        self.logger.debug("%s %s %s", station, ma_radio, type(ma_radio))
        raise MusicAssistantError("No valid radio stream found")

    async def on_played(
        self,
        media_type: MediaType,
        prov_item_id: str,
        fully_played: bool,
        position: int,
        media_item: MediaItemType,
        is_playing: bool = False,
    ) -> None:
        """Handle callback when a (playable) media item has been played.

        Reports the play status of on-demand items to the Sounds API; radio
        playback is not reported. API errors are logged, not raised.
        """
        if media_type == MediaType.RADIO:
            return

        # Handle Sounds API play status updates
        if is_playing:
            action = PlayStatus.STARTED if position < 30 else PlayStatus.HEARTBEAT
        elif fully_played:
            action = PlayStatus.ENDED
        else:
            action = PlayStatus.PAUSED

        try:
            # Use the provider's own id: media_item may not carry it
            # (presumably e.g. a library item - verify against the caller)
            success = await self.client.streaming.update_play_status(
                pid=prov_item_id, elapsed_time=position, action=action
            )
            self.logger.debug("Updated play status: %s", success)
        except exceptions.APIResponseError as err:
            self.logger.error("Error updating play status: %s", err)