/
/
/
1"""Tests for PlayerController high-level operations.
2
3This module tests:
4- cmd_set_members validation and execution
5- Group/ungroup commands
6- Player state management
7- Cache invalidation after grouping operations
8"""
9
10from __future__ import annotations
11
12import asyncio
13import contextlib
14from unittest.mock import MagicMock
15
16import pytest
17from music_assistant_models.enums import PlayerFeature, PlayerType
18from music_assistant_models.errors import UnsupportedFeaturedException
19from music_assistant_models.player import OutputProtocol
20
21from music_assistant.controllers.players import PlayerController
22from music_assistant.helpers.throttle_retry import Throttler
23from tests.common import MockPlayer, MockProvider
24
25
26@pytest.fixture
27def mock_mass() -> MagicMock:
28 """Create a mock MusicAssistant instance."""
29 mass = MagicMock()
30 mass.closing = False
31 mass.loop = None
32 mass.config = MagicMock()
33 mass.config.get = MagicMock(return_value=[])
34 mass.config.get_raw_player_config_value = MagicMock(return_value="auto")
35 # Return "GLOBAL" for log level config (standard default)
36 mass.config.get_raw_core_config_value = MagicMock(return_value="GLOBAL")
37 mass.config.set = MagicMock()
38 mass.signal_event = MagicMock()
39 mass.get_providers = MagicMock(return_value=[])
40 return mass
41
42
43@pytest.fixture
44def controller(mock_mass: MagicMock) -> PlayerController:
45 """Create a PlayerController instance."""
46 return PlayerController(mock_mass)
47
48
49class TestSetMembersValidation:
50 """Test cmd_set_members validation logic."""
51
52 def test_set_members_requires_feature(self, mock_mass: MagicMock) -> None:
53 """Test that set_members requires SET_MEMBERS feature."""
54 controller = PlayerController(mock_mass)
55 provider = MockProvider("test_provider", instance_id="test", mass=mock_mass)
56
57 leader = MockPlayer(provider, "leader", "Leader")
58 # Note: NOT adding SET_MEMBERS feature
59
60 member = MockPlayer(provider, "member", "Member")
61
62 controller._players = {"leader": leader, "member": member}
63 controller._player_throttlers = {
64 "leader": Throttler(1, 0.05),
65 "member": Throttler(1, 0.05),
66 }
67 mock_mass.players = controller
68
69 # Should raise exception because leader doesn't support SET_MEMBERS
70 with pytest.raises(UnsupportedFeaturedException):
71 asyncio.run(controller.cmd_set_members("leader", player_ids_to_add=["member"]))
72
73 def test_cannot_group_incompatible_players(self, mock_mass: MagicMock) -> None:
74 """Test that incompatible players cannot be grouped."""
75 controller = PlayerController(mock_mass)
76 provider_a = MockProvider("provider_a", instance_id="provider_a", mass=mock_mass)
77 provider_b = MockProvider("provider_b", instance_id="provider_b", mass=mock_mass)
78
79 player_a = MockPlayer(provider_a, "player_a", "Player A")
80 player_a._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
81 player_a._attr_can_group_with = {"provider_a"} # Only same provider
82
83 player_b = MockPlayer(provider_b, "player_b", "Player B")
84
85 controller._players = {"player_a": player_a, "player_b": player_b}
86 controller._player_throttlers = {
87 "player_a": Throttler(1, 0.05),
88 "player_b": Throttler(1, 0.05),
89 }
90 mock_mass.players = controller
91
92 # Should raise exception because players are incompatible
93 with pytest.raises(UnsupportedFeaturedException):
94 asyncio.run(controller.cmd_set_members("player_a", player_ids_to_add=["player_b"]))
95
96
97class TestCacheInvalidationAfterGrouping:
98 """Test that caches are invalidated after grouping operations."""
99
100 async def test_all_players_cache_cleared_after_set_members(self, mock_mass: MagicMock) -> None:
101 """
102 Test that all players' caches are cleared after set_members.
103
104 Regression test for: Stale can_group_with cache after grouping changes.
105 """
106 controller = PlayerController(mock_mass)
107 provider = MockProvider("test_provider", instance_id="test", mass=mock_mass)
108
109 leader = MockPlayer(provider, "leader", "Leader")
110 leader._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
111 leader._attr_can_group_with = {"test"}
112 leader._attr_group_members = []
113
114 member = MockPlayer(provider, "member", "Member")
115
116 other = MockPlayer(provider, "other", "Other")
117 other._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
118 other._attr_can_group_with = {"test"}
119
120 controller._players = {"leader": leader, "member": member, "other": other}
121 controller._player_throttlers = {
122 "leader": Throttler(1, 0.05),
123 "member": Throttler(1, 0.05),
124 "other": Throttler(1, 0.05),
125 }
126 mock_mass.players = controller
127
128 # Populate caches
129 _ = leader.state.can_group_with
130 _ = other.state.can_group_with
131
132 # Simulate grouping (normally done by provider's set_members implementation)
133 leader._attr_group_members = ["leader", "member"]
134
135 # Call set_members to trigger cache invalidation
136 await controller._handle_set_members_with_protocols(
137 leader, player_ids_to_add=["member"], player_ids_to_remove=[]
138 )
139
140 # Note: The actual cache clearing happens via trigger_player_update
141 # which schedules update_state to be called later
142 # In a real scenario, this would clear all players' caches
143
144
145class TestGroupUngroup:
146 """Test group and ungroup commands."""
147
148 async def test_group_command(self, mock_mass: MagicMock) -> None:
149 """Test the group command (cmd_group)."""
150 controller = PlayerController(mock_mass)
151 provider = MockProvider("test_provider", instance_id="test", mass=mock_mass)
152
153 leader = MockPlayer(provider, "leader", "Leader")
154 leader._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
155 leader._attr_can_group_with = {"member"} # Leader can group with member
156
157 member = MockPlayer(provider, "member", "Member")
158 # Make sure member is already powered on to skip power handling
159 member._attr_powered = True
160
161 controller._players = {"leader": leader, "member": member}
162 controller._player_throttlers = {
163 "leader": Throttler(1, 0.05),
164 "member": Throttler(1, 0.05),
165 }
166 mock_mass.players = controller
167
168 # Update state after modifying attributes and registering with controller
169 leader.update_state(signal_event=False)
170 member.update_state(signal_event=False)
171
172 # Track if set_members was called
173 set_members_called = False
174 original_set_members = leader.set_members
175
176 async def mock_set_members(
177 player_ids_to_add: list[str] | None = None,
178 player_ids_to_remove: list[str] | None = None,
179 ) -> None:
180 nonlocal set_members_called
181 set_members_called = True
182 # Call the original to update group_members
183 await original_set_members(player_ids_to_add, player_ids_to_remove)
184
185 leader.set_members = mock_set_members # type: ignore[method-assign]
186
187 # Mock power handling to skip power control (focus is on grouping logic)
188 async def mock_handle_cmd_power(player_id: str, powered: bool) -> None:
189 pass
190
191 controller._handle_cmd_power = mock_handle_cmd_power # type: ignore[method-assign]
192
193 # Execute group command
194 await controller.cmd_group("member", "leader")
195
196 # Verify set_members was called
197 assert set_members_called
198 # Verify member was added to leader's group
199 assert "member" in leader._attr_group_members
200
201
202class TestPlayerAvailability:
203 """Test player availability checks in grouping."""
204
205 def test_unavailable_player_rejected(self, mock_mass: MagicMock) -> None:
206 """Test that unavailable players are rejected when grouping."""
207 controller = PlayerController(mock_mass)
208 provider = MockProvider("test_provider", instance_id="test", mass=mock_mass)
209
210 leader = MockPlayer(provider, "leader", "Leader")
211 leader._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
212 leader._attr_can_group_with = {"test"}
213
214 member = MockPlayer(provider, "member", "Member")
215 member._attr_available = False # Mark as unavailable
216
217 controller._players = {"leader": leader, "member": member}
218 controller._player_throttlers = {
219 "leader": Throttler(1, 0.05),
220 "member": Throttler(1, 0.05),
221 }
222 mock_mass.players = controller
223
224 # Attempting to group with unavailable player should be handled
225 # (either silently ignored or raise exception depending on implementation)
226 # This should either skip the unavailable player or raise an exception
227 with contextlib.suppress(Exception):
228 asyncio.run(controller.cmd_set_members("leader", player_ids_to_add=["member"]))
229
230
231class TestSelectOutputProtocol:
232 """Test select_output_protocol method."""
233
234 def test_select_native_playback_when_no_linked_protocols(self, mock_mass: MagicMock) -> None:
235 """Test that native playback is selected when player has no linked protocols."""
236 controller = PlayerController(mock_mass)
237 provider = MockProvider("test_provider", instance_id="test", mass=mock_mass)
238
239 player = MockPlayer(provider, "test_player", "Test Player")
240 player._attr_supported_features.add(PlayerFeature.PLAY_MEDIA)
241
242 controller._players = {"test_player": player}
243 controller._player_throttlers = {"test_player": Throttler(1, 0.05)}
244 mock_mass.players = controller
245
246 player.update_state(signal_event=False)
247
248 # Execute select_output_protocol
249 target_player = controller.select_output_protocol("test_player")
250
251 # Should return the same player (native playback)
252 assert target_player.player_id == player.player_id
253 # Active protocol should be set to "native"
254 assert player.active_output_protocol == "native"
255
256 def test_select_preferred_protocol(self, mock_mass: MagicMock) -> None:
257 """Test that preferred protocol is selected when configured."""
258 controller = PlayerController(mock_mass)
259 provider = MockProvider("sonos", instance_id="sonos_instance", mass=mock_mass)
260 airplay_provider = MockProvider("airplay", instance_id="airplay_instance", mass=mock_mass)
261
262 # Create main player (e.g., Sonos)
263 main_player = MockPlayer(provider, "sonos_player", "Sonos Speaker")
264 main_player._attr_supported_features.add(PlayerFeature.PLAY_MEDIA)
265
266 # Create protocol player (e.g., AirPlay)
267 protocol_player = MockPlayer(
268 airplay_provider, "airplay_player", "Sonos via AirPlay", PlayerType.PROTOCOL
269 )
270 protocol_player._attr_supported_features.add(PlayerFeature.PLAY_MEDIA)
271 protocol_player._attr_available = True
272
273 controller._players = {
274 "sonos_player": main_player,
275 "airplay_player": protocol_player,
276 }
277 controller._player_throttlers = {
278 "sonos_player": Throttler(1, 0.05),
279 "airplay_player": Throttler(1, 0.05),
280 }
281 mock_mass.players = controller
282
283 # Set up linked protocols on main player
284 linked_protocol = OutputProtocol(
285 output_protocol_id="airplay_player",
286 name="AirPlay",
287 protocol_domain="airplay",
288 is_native=False,
289 priority=10,
290 available=True,
291 )
292 main_player.set_linked_output_protocols([linked_protocol])
293
294 main_player.update_state(signal_event=False)
295 protocol_player.update_state(signal_event=False)
296
297 # Configure preferred protocol to be airplay
298 mock_mass.config.get_raw_player_config_value = MagicMock(return_value="airplay_player")
299
300 # Execute select_output_protocol
301 target_player = controller.select_output_protocol("sonos_player")
302
303 # Should return the protocol player
304 assert target_player.player_id == "airplay_player"
305 # Active protocol should be set to the protocol player id
306 assert main_player.active_output_protocol == "airplay_player"
307
308 def test_select_protocol_sets_active_before_flow_mode_check(self, mock_mass: MagicMock) -> None:
309 """
310 Test that selecting protocol sets active_output_protocol before flow_mode is checked.
311
312 This is the core regression test for the timing issue where flow_mode
313 was evaluated before active_output_protocol was set.
314 """
315 controller = PlayerController(mock_mass)
316 provider = MockProvider("sonos", instance_id="sonos_instance", mass=mock_mass)
317 airplay_provider = MockProvider("airplay", instance_id="airplay_instance", mass=mock_mass)
318
319 # Create main player
320 main_player = MockPlayer(provider, "sonos_player", "Sonos Speaker")
321 main_player._attr_supported_features.add(PlayerFeature.PLAY_MEDIA)
322
323 # Create protocol player that requires flow mode
324 protocol_player = MockPlayer(
325 airplay_provider, "airplay_player", "Sonos via AirPlay", PlayerType.PROTOCOL
326 )
327 protocol_player._attr_supported_features.add(PlayerFeature.PLAY_MEDIA)
328 # AirPlay typically doesn't support enqueue, so flow mode would be needed
329 protocol_player._attr_available = True
330
331 controller._players = {
332 "sonos_player": main_player,
333 "airplay_player": protocol_player,
334 }
335 controller._player_throttlers = {
336 "sonos_player": Throttler(1, 0.05),
337 "airplay_player": Throttler(1, 0.05),
338 }
339 mock_mass.players = controller
340
341 # Set up linked protocols
342 linked_protocol = OutputProtocol(
343 output_protocol_id="airplay_player",
344 name="AirPlay",
345 protocol_domain="airplay",
346 is_native=False,
347 priority=10,
348 available=True,
349 )
350 main_player.set_linked_output_protocols([linked_protocol])
351
352 main_player.update_state(signal_event=False)
353 protocol_player.update_state(signal_event=False)
354
355 # Configure preferred protocol
356 mock_mass.config.get_raw_player_config_value = MagicMock(return_value="airplay_player")
357
358 # Verify active_output_protocol is not set before calling select_output_protocol
359 assert main_player.active_output_protocol is None
360
361 # Execute select_output_protocol
362 controller.select_output_protocol("sonos_player")
363
364 # Active protocol should now be set BEFORE any flow_mode check would occur
365 assert main_player.active_output_protocol == "airplay_player"
366
367 # Now when we check flow_mode, it should correctly consider the protocol player
368 # (This verifies the timing fix - flow_mode now uses the active protocol)
369 _ = main_player.flow_mode # This should not raise and should use protocol's flow_mode
370
371
372if __name__ == "__main__":
373 pytest.main([__file__, "-v"])
374