/
/
/
1"""HEOS Player Provider implementation."""
2
3from __future__ import annotations
4
5import logging
6
7from music_assistant_models.errors import SetupFailedError
8from music_assistant_models.player import PlayerSource
9from pyheos import Heos, HeosError, HeosOptions, MediaItem, PlayerUpdateResult, const
10
11from music_assistant.constants import CONF_ENABLED, CONF_IP_ADDRESS, VERBOSE_LOG_LEVEL
12from music_assistant.models.player_provider import PlayerProvider
13from music_assistant.providers.heos.constants import HEOS_PASSIVE_SOURCES
14
15from .player import HeosPlayer
16
17
18class HeosPlayerProvider(PlayerProvider):
19 """Player provided for Denon HEOS."""
20
21 _heos: Heos
22 _music_source_list: list[PlayerSource] = []
23 _input_source_list: list[MediaItem] = []
24 _discovery_running: bool = False
25
26 async def handle_async_init(self) -> None:
27 """Handle async initialization of the provider."""
28 if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
29 logging.getLogger("pyheos").setLevel(logging.DEBUG)
30 else:
31 logging.getLogger("pyheos").setLevel(self.logger.level + 10)
32
33 self._heos = Heos(
34 HeosOptions(
35 str(self.config.get_value(CONF_IP_ADDRESS)),
36 auto_reconnect=True,
37 )
38 )
39
40 try:
41 await self._heos.connect()
42
43 self._heos.add_on_controller_event(self._handle_controller_event)
44 except HeosError as e:
45 self.logger.error(f"Failed to connect to HEOS controller: {e}")
46 raise SetupFailedError("Failed to connect to HEOS controller") from e
47
48 # Initialize library values
49 try:
50 # Populate source lists
51 await self._populate_sources()
52 # NOTE: players are discovered via discovery method (called automatically by core)
53 except HeosError as e:
54 self.logger.error(f"Unexpected error setting up HEOS controller: {e}")
55 raise SetupFailedError("Unexpected error setting up HEOS controller") from e
56
57 async def _handle_controller_event(
58 self, event: str, result: PlayerUpdateResult | None = None
59 ) -> None:
60 self.logger.debug("Controller event received: %s", event)
61
62 if event == const.EVENT_GROUPS_CHANGED:
63 for player in self.mass.players.all(provider_filter=self.instance_id):
64 assert isinstance(player, HeosPlayer) # for type checking
65 await player.build_group_list()
66
67 if event == const.EVENT_PLAYERS_CHANGED:
68 if result is None:
69 return
70
71 for removed_player_id in result.removed_player_ids:
72 await self.mass.players.unregister(str(removed_player_id))
73
74 for new_player_id in result.added_player_ids:
75 try:
76 device = await self._heos.get_player_info(new_player_id)
77 heos_player = HeosPlayer(self, device)
78
79 await heos_player.setup()
80 except HeosError as e:
81 self.logger.error(
82 "Error adding new HEOS player with id %s: %s", new_player_id, e
83 )
84 continue
85
86 async def _populate_sources(self) -> None:
87 """Build source list based on data from controller."""
88 self._input_source_list = list(await self._heos.get_input_sources())
89
90 music_sources = await self._heos.get_music_sources()
91 for source_id, source in music_sources.items():
92 self._music_source_list.append(
93 PlayerSource(
94 id=str(source_id),
95 name=source.name,
96 passive=source_id in HEOS_PASSIVE_SOURCES or not source.available,
97 can_play_pause=True, # All sources support play/pause
98 can_next_previous=source_id == 1024, # TODO: properly check
99 )
100 )
101
102 @property
103 def music_source_list(self) -> list[PlayerSource]:
104 """Get mapped music source list from controller info."""
105 return self._music_source_list
106
107 @property
108 def input_source_list(self) -> list[MediaItem]:
109 """Get input list from controller info. This represents all inputs across all players."""
110 return self._input_source_list
111
112 async def unload(self, is_removed: bool = False) -> None:
113 """Handle unload/close of the provider."""
114 self._heos.dispatcher.disconnect_all() # Remove all event connections
115 await self._heos.disconnect()
116
117 for player in self.players:
118 self.logger.debug("Unloading player %s", player.name)
119 await self.mass.players.unregister(player.player_id)
120
121 async def discover_players(self) -> None:
122 """Discover players for this provider."""
123 if self._discovery_running:
124 return # discovery already running
125 try:
126 self._discovery_running = True
127 self.logger.debug("Discovering HEOS players")
128 devices = await self._heos.get_players()
129 already_registered = {p.player_id for p in self.players}
130 for device in devices.values():
131 player_id = str(device.player_id)
132 if player_id in already_registered:
133 continue # already registered
134 # ignore disabled players in discovery
135 player_enabled = self.mass.config.get_raw_player_config_value(
136 player_id, CONF_ENABLED, default=True
137 )
138 if not player_enabled:
139 continue
140 self.logger.info("Discovered new HEOS player: %s (%s)", device.name, player_id)
141
142 heos_player = HeosPlayer(self, device)
143 await heos_player.setup()
144 finally:
145 self._discovery_running = False
146 # reschedule discovery
147 task_id = f"discover_players_{self.instance_id}"
148 self.mass.call_later(600, self.discover_players, task_id=task_id)
149