/
/
/
1"""Tests for player grouping logic (independent of protocols).
2
3This module tests the core grouping behavior including:
4- can_group_with filtering logic
5- Group member inclusion/exclusion
6- Sync leader behavior
7- Group state transitions
8- Cache invalidation
9"""
10
11from __future__ import annotations
12
13from unittest.mock import MagicMock
14
15import pytest
16from music_assistant_models.enums import PlaybackState, PlayerFeature
17
18from music_assistant.controllers.players import PlayerController
19from tests.common import MockPlayer, MockProvider
20
21
22@pytest.fixture
23def mock_mass() -> MagicMock:
24 """Create a mock MusicAssistant instance."""
25 mass = MagicMock()
26 mass.closing = False
27 mass.config = MagicMock()
28 mass.config.get = MagicMock(return_value=[])
29 mass.config.get_raw_player_config_value = MagicMock(return_value="auto")
30 # Return "GLOBAL" for log level config (standard default)
31 mass.config.get_raw_core_config_value = MagicMock(return_value="GLOBAL")
32 mass.config.set = MagicMock()
33 mass.signal_event = MagicMock()
34 mass.get_providers = MagicMock(return_value=[])
35 return mass
36
37
38@pytest.fixture
39def controller(mock_mass: MagicMock) -> PlayerController:
40 """Create a PlayerController instance."""
41 return PlayerController(mock_mass)
42
43
44class TestCanGroupWithBasics:
45 """Test basic can_group_with filtering logic."""
46
47 def test_ungrouped_players_can_group(self, mock_mass: MagicMock) -> None:
48 """Test that two ungrouped players can group with each other."""
49 controller = PlayerController(mock_mass)
50 provider = MockProvider("test_provider", instance_id="test", mass=mock_mass)
51
52 player_a = MockPlayer(provider, "player_a", "Player A")
53 player_a._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
54 # Use explicit player IDs instead of provider instance ID for simpler test
55 player_a._attr_can_group_with = {"player_b"}
56
57 player_b = MockPlayer(provider, "player_b", "Player B")
58 player_b._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
59 player_b._attr_can_group_with = {"player_a"}
60
61 controller._players = {"player_a": player_a, "player_b": player_b}
62 mock_mass.players = controller
63
64 # Trigger state calculation
65 player_a.update_state(signal_event=False)
66 player_b.update_state(signal_event=False)
67
68 # Both players should be able to group with each other
69 assert "player_b" in player_a.state.can_group_with
70 assert "player_a" in player_b.state.can_group_with
71
72 def test_unavailable_players_excluded(self, mock_mass: MagicMock) -> None:
73 """Test that unavailable players are excluded from can_group_with."""
74 controller = PlayerController(mock_mass)
75 provider = MockProvider("test_provider", instance_id="test", mass=mock_mass)
76
77 player_a = MockPlayer(provider, "player_a", "Player A")
78 player_a._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
79 player_a._attr_can_group_with = {"player_b"}
80
81 player_b = MockPlayer(provider, "player_b", "Player B")
82 player_b._attr_available = False # Mark as unavailable
83
84 controller._players = {"player_a": player_a, "player_b": player_b}
85 mock_mass.players = controller
86
87 # Trigger state calculation
88 player_a.update_state(signal_event=False)
89 player_b.update_state(signal_event=False)
90
91 # Unavailable player should be excluded
92 assert "player_b" not in player_a.state.can_group_with
93
94 def test_playing_players_with_different_source_excluded(self, mock_mass: MagicMock) -> None:
95 """Test that players playing different sources are NOT excluded (behavior changed).
96
97 Note: Previously, players with different active sources were excluded from grouping,
98 but this was removed as it was difficult to track reliably.
99 """
100 controller = PlayerController(mock_mass)
101 provider = MockProvider("test_provider", instance_id="test", mass=mock_mass)
102
103 player_a = MockPlayer(provider, "player_a", "Player A")
104 player_a._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
105 player_a._attr_can_group_with = {"player_b"}
106 player_a._attr_playback_state = PlaybackState.PLAYING
107 player_a._attr_active_source = "player_a"
108
109 player_b = MockPlayer(provider, "player_b", "Player B")
110 player_b._attr_playback_state = PlaybackState.PLAYING
111 player_b._attr_active_source = "player_b" # Different source
112
113 controller._players = {"player_a": player_a, "player_b": player_b}
114 mock_mass.players = controller
115
116 # Trigger state calculation
117 player_a.update_state(signal_event=False)
118 player_b.update_state(signal_event=False)
119
120 # Player with different active source is now ALLOWED (behavior changed)
121 assert "player_b" in player_a.state.can_group_with
122
123
124class TestSyncedPlayers:
125 """Test behavior with synced/grouped players."""
126
127 def test_sync_leader_excludes_itself_from_members_can_group_with(
128 self, mock_mass: MagicMock
129 ) -> None:
130 """Test that sync leader doesn't appear in its members' can_group_with."""
131 controller = PlayerController(mock_mass)
132 provider = MockProvider("test_provider", instance_id="test", mass=mock_mass)
133
134 leader = MockPlayer(provider, "leader", "Leader")
135 leader._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
136 leader._attr_can_group_with = {"member"}
137 leader._attr_group_members = ["leader", "member"]
138
139 member = MockPlayer(provider, "member", "Member")
140
141 controller._players = {"leader": leader, "member": member}
142 mock_mass.players = controller
143
144 # Trigger synced_to calculation
145 leader.update_state(signal_event=False)
146 member.update_state(signal_event=False)
147
148 # Member is synced, so can_group_with should be empty
149 assert member.state.can_group_with == set()
150
151 def test_group_members_included_in_leader_can_group_with(self, mock_mass: MagicMock) -> None:
152 """
153 Test that group members appear in sync leader's can_group_with.
154
155 This allows ungrouping members from the leader.
156 """
157 controller = PlayerController(mock_mass)
158 provider = MockProvider("test_provider", instance_id="test", mass=mock_mass)
159
160 leader = MockPlayer(provider, "leader", "Leader")
161 leader._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
162 leader._attr_can_group_with = {"member_a", "member_b"}
163 leader._attr_group_members = ["leader", "member_a", "member_b"]
164
165 member_a = MockPlayer(provider, "member_a", "Member A")
166 member_b = MockPlayer(provider, "member_b", "Member B")
167
168 controller._players = {
169 "leader": leader,
170 "member_a": member_a,
171 "member_b": member_b,
172 }
173 mock_mass.players = controller
174
175 # Trigger synced_to calculation
176 leader.update_state(signal_event=False)
177 member_a.update_state(signal_event=False)
178 member_b.update_state(signal_event=False)
179
180 # Leader should be able to see its own members (for ungrouping)
181 assert "member_a" in leader.state.can_group_with
182 assert "member_b" in leader.state.can_group_with
183
184
185class TestSyncLeaderBehavior:
186 """Test sync leader specific behavior."""
187
188 def test_sync_leader_excluded_from_can_group_with(self, mock_mass: MagicMock) -> None:
189 """Test that players with group members (sync leaders) are excluded."""
190 controller = PlayerController(mock_mass)
191 provider = MockProvider("test_provider", instance_id="test", mass=mock_mass)
192
193 leader = MockPlayer(provider, "leader", "Leader")
194 leader._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
195 leader._attr_can_group_with = {"member", "other"}
196 leader._attr_group_members = ["leader", "member"]
197 leader._attr_playback_state = PlaybackState.PLAYING # Make it playing so it gets excluded
198
199 member = MockPlayer(provider, "member", "Member")
200
201 other = MockPlayer(provider, "other", "Other")
202 other._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
203 other._attr_can_group_with = {"leader", "member"}
204
205 controller._players = {"leader": leader, "member": member, "other": other}
206 mock_mass.players = controller
207
208 # Trigger synced_to calculation
209 leader.update_state(signal_event=False)
210 member.update_state(signal_event=False)
211 other.update_state(signal_event=False)
212
213 # Leader should NOT appear in other's can_group_with (has group members)
214 assert "leader" not in other.state.can_group_with
215
216
217class TestCircularDependency:
218 """Test that circular dependencies are avoided."""
219
220 def test_no_circular_dependency_in_synced_to(self, mock_mass: MagicMock) -> None:
221 """
222 Test that synced_to calculation doesn't cause circular dependency.
223
224 Regression test for: synced_to calling group_members causing infinite recursion.
225 """
226 controller = PlayerController(mock_mass)
227 provider = MockProvider("test_provider", instance_id="test", mass=mock_mass)
228
229 leader = MockPlayer(provider, "leader", "Leader")
230 leader._attr_group_members = ["leader", "member"]
231
232 member = MockPlayer(provider, "member", "Member")
233
234 controller._players = {"leader": leader, "member": member}
235 mock_mass.players = controller
236
237 # Mark players as initialized so they are returned by all_players()
238 leader.set_initialized()
239 member.set_initialized()
240
241 # Trigger synced_to calculation via update_state
242 leader.update_state(signal_event=False)
243 member.update_state(signal_event=False)
244
245 # This should not cause infinite recursion
246 assert member.state.synced_to == "leader"
247 assert leader.state.synced_to is None
248
249
250class TestCacheInvalidation:
251 """Test that caches are invalidated correctly."""
252
253 def test_can_group_with_cache_cleared_on_update_state(self, mock_mass: MagicMock) -> None:
254 """Test that can_group_with cache is cleared when update_state is called."""
255 controller = PlayerController(mock_mass)
256 provider = MockProvider("test_provider", instance_id="test", mass=mock_mass)
257
258 player_a = MockPlayer(provider, "player_a", "Player A")
259 player_a._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
260 player_a._attr_can_group_with = {"player_b"}
261
262 player_b = MockPlayer(provider, "player_b", "Player B")
263
264 controller._players = {"player_a": player_a, "player_b": player_b}
265 mock_mass.players = controller
266
267 # Update state after setting attributes and registering with controller
268 player_a.update_state(signal_event=False)
269 player_b.update_state(signal_event=False)
270
271 # Get can_group_with to populate cache
272 initial = player_a.state.can_group_with
273 assert "player_b" in initial
274
275 # Modify underlying data
276 player_a._attr_can_group_with = set()
277
278 # Cache should still have old value
279 assert player_a.state.can_group_with == initial
280
281 # Clear cache via update_state
282 player_a.update_state(signal_event=False)
283
284 # Cache should be cleared, new value should be returned
285 assert player_a.state.can_group_with == set()
286
287
288class TestProviderInstanceIdExpansion:
289 """Test expansion of provider instance IDs in can_group_with."""
290
291 def test_provider_instance_id_expands_to_all_players(self, mock_mass: MagicMock) -> None:
292 """Test that provider instance IDs expand to all available players from that provider."""
293 controller = PlayerController(mock_mass)
294 provider = MockProvider("test_provider", instance_id="test", mass=mock_mass)
295
296 player_a = MockPlayer(provider, "player_a", "Player A")
297 player_a._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
298 player_a._attr_can_group_with = {"test"} # Provider instance ID
299
300 player_b = MockPlayer(provider, "player_b", "Player B")
301 player_c = MockPlayer(provider, "player_c", "Player C")
302
303 controller._players = {
304 "player_a": player_a,
305 "player_b": player_b,
306 "player_c": player_c,
307 }
308 mock_mass.players = controller
309 # Set up get_provider to return the provider for instance ID
310 mock_mass.get_provider = MagicMock(return_value=provider)
311
312 # Mark players as initialized so they are returned by all_players()
313 player_a.set_initialized()
314 player_b.set_initialized()
315 player_c.set_initialized()
316
317 # Trigger state calculation
318 player_a.update_state(signal_event=False)
319 player_b.update_state(signal_event=False)
320 player_c.update_state(signal_event=False)
321
322 # Provider instance ID should expand to include all players from that provider
323 can_group = player_a.state.can_group_with
324 assert "player_b" in can_group
325 assert "player_c" in can_group
326
327
328if __name__ == "__main__":
329 pytest.main([__file__, "-v"])
330