/
/
/
1"""Tests for webserver authentication and user management."""
2
3import asyncio
4import hashlib
5import logging
6import pathlib
7from collections.abc import AsyncGenerator
8from datetime import timedelta
9from sqlite3 import IntegrityError
10
11import pytest
12from music_assistant_models.auth import AuthProviderType, UserRole
13from music_assistant_models.errors import InvalidDataError
14
15from music_assistant.constants import HOMEASSISTANT_SYSTEM_USER
16from music_assistant.controllers.config import ConfigController
17from music_assistant.controllers.webserver.auth import AuthenticationManager
18from music_assistant.controllers.webserver.controller import WebserverController
19from music_assistant.controllers.webserver.helpers.auth_middleware import (
20 set_current_token,
21 set_current_user,
22)
23from music_assistant.controllers.webserver.helpers.auth_providers import BuiltinLoginProvider
24from music_assistant.helpers.datetime import utc
25from music_assistant.mass import MusicAssistant
26
27
28@pytest.fixture
29async def mass_minimal(tmp_path: pathlib.Path) -> AsyncGenerator[MusicAssistant, None]:
30 """Create a minimal Music Assistant instance for auth testing without starting the webserver.
31
32 :param tmp_path: Temporary directory for test data.
33 """
34 storage_path = tmp_path / "data"
35 cache_path = tmp_path / "cache"
36 storage_path.mkdir(parents=True)
37 cache_path.mkdir(parents=True)
38
39 # Suppress aiosqlite debug logging
40 logging.getLogger("aiosqlite").level = logging.INFO
41
42 mass_instance = MusicAssistant(str(storage_path), str(cache_path))
43
44 # Initialize the minimum required for auth testing
45 mass_instance.loop = asyncio.get_running_loop()
46 # Use id() as fallback since _thread_id is a private attribute that may not exist
47 mass_instance.loop_thread_id = (
48 getattr(mass_instance.loop, "_thread_id", None)
49 if hasattr(mass_instance.loop, "_thread_id")
50 else id(mass_instance.loop)
51 )
52
53 # Create config controller
54 mass_instance.config = ConfigController(mass_instance)
55 await mass_instance.config.setup()
56
57 # Create webserver controller (but don't start the actual server)
58 webserver = WebserverController(mass_instance)
59 mass_instance.webserver = webserver
60
61 # Get webserver config and manually set it (avoids starting the server)
62 webserver_config = await mass_instance.config.get_core_config("webserver")
63 webserver.config = webserver_config
64
65 # Setup auth manager only (not the full webserver with routes/sockets)
66 await webserver.auth.setup()
67
68 try:
69 yield mass_instance
70 finally:
71 # Cleanup
72 await webserver.auth.close()
73 await mass_instance.config.close()
74
75
76@pytest.fixture
77async def auth_manager(mass_minimal: MusicAssistant) -> AuthenticationManager:
78 """Get authentication manager from mass instance.
79
80 :param mass_minimal: Minimal MusicAssistant instance.
81 """
82 return mass_minimal.webserver.auth
83
84
85async def test_auth_manager_initialization(auth_manager: AuthenticationManager) -> None:
86 """Test that the authentication manager initializes correctly.
87
88 :param auth_manager: AuthenticationManager instance.
89 """
90 assert auth_manager is not None
91 assert auth_manager.database is not None
92 assert "builtin" in auth_manager.login_providers
93 assert isinstance(auth_manager.login_providers["builtin"], BuiltinLoginProvider)
94
95
96async def test_has_users_initially_empty(auth_manager: AuthenticationManager) -> None:
97 """Test that has_users returns False when no users exist.
98
99 :param auth_manager: AuthenticationManager instance.
100 """
101 has_users = auth_manager.has_users
102 assert has_users is False
103
104
105async def test_create_user(auth_manager: AuthenticationManager) -> None:
106 """Test creating a new user.
107
108 :param auth_manager: AuthenticationManager instance.
109 """
110 user = await auth_manager.create_user(
111 username="testuser",
112 role=UserRole.USER,
113 display_name="Test User",
114 )
115
116 assert user is not None
117 assert user.username == "testuser"
118 assert user.role == UserRole.USER
119 assert user.display_name == "Test User"
120 assert user.enabled is True
121 assert user.user_id is not None
122
123 # Verify user exists in database
124 has_users = auth_manager.has_users
125 assert has_users is True
126
127
128async def test_get_user(auth_manager: AuthenticationManager) -> None:
129 """Test retrieving a user by ID.
130
131 :param auth_manager: AuthenticationManager instance.
132 """
133 # Create a user first
134 created_user = await auth_manager.create_user(username="getuser", role=UserRole.USER)
135
136 # Set current user for authorization (get_user requires admin role)
137 admin_user = await auth_manager.create_user(username="admin", role=UserRole.ADMIN)
138 set_current_user(admin_user)
139
140 # Retrieve the user
141 retrieved_user = await auth_manager.get_user(created_user.user_id)
142
143 assert retrieved_user is not None
144 assert retrieved_user.user_id == created_user.user_id
145 assert retrieved_user.username == created_user.username
146
147
148async def test_create_user_with_builtin_provider(auth_manager: AuthenticationManager) -> None:
149 """Test creating a user with built-in authentication.
150
151 :param auth_manager: AuthenticationManager instance.
152 """
153 builtin_provider = auth_manager.login_providers.get("builtin")
154 assert builtin_provider is not None
155 assert isinstance(builtin_provider, BuiltinLoginProvider)
156
157 user = await builtin_provider.create_user_with_password(
158 username="testuser2",
159 password="testpassword123",
160 role=UserRole.USER,
161 )
162
163 assert user is not None
164 assert user.username == "testuser2"
165
166
167async def test_authenticate_with_password(auth_manager: AuthenticationManager) -> None:
168 """Test authenticating with username and password.
169
170 :param auth_manager: AuthenticationManager instance.
171 """
172 builtin_provider = auth_manager.login_providers.get("builtin")
173 assert builtin_provider is not None
174 assert isinstance(builtin_provider, BuiltinLoginProvider)
175
176 # Create user with password
177 await builtin_provider.create_user_with_password(
178 username="authtest",
179 password="secure_password_123",
180 role=UserRole.USER,
181 )
182
183 # Test successful authentication
184 result = await auth_manager.authenticate_with_credentials(
185 "builtin",
186 {"username": "authtest", "password": "secure_password_123"},
187 )
188
189 assert result.success is True
190 assert result.user is not None
191 assert result.user.username == "authtest"
192 # Note: Built-in provider doesn't auto-generate access token on login,
193 # that's done by the web login flow. We just verify authentication succeeds.
194
195 # Test failed authentication with wrong password
196 result = await auth_manager.authenticate_with_credentials(
197 "builtin",
198 {"username": "authtest", "password": "wrong_password"},
199 )
200
201 assert result.success is False
202 assert result.user is None
203 assert result.error is not None
204
205
206async def test_create_token(auth_manager: AuthenticationManager) -> None:
207 """Test creating access tokens.
208
209 :param auth_manager: AuthenticationManager instance.
210 """
211 user = await auth_manager.create_user(username="tokenuser", role=UserRole.USER)
212
213 # Create short-lived token
214 short_token = await auth_manager.create_token(user, "Test Device", is_long_lived=False)
215 assert short_token is not None
216 assert len(short_token) > 0
217
218 # Create long-lived token
219 long_token = await auth_manager.create_token(user, "API Key", is_long_lived=True)
220 assert long_token is not None
221 assert len(long_token) > 0
222 assert long_token != short_token
223
224
225async def test_authenticate_with_token(auth_manager: AuthenticationManager) -> None:
226 """Test authenticating with an access token.
227
228 :param auth_manager: AuthenticationManager instance.
229 """
230 user = await auth_manager.create_user(username="tokenauth", role=UserRole.USER)
231 token = await auth_manager.create_token(user, "Test Token", is_long_lived=False)
232
233 # Authenticate with token
234 authenticated_user = await auth_manager.authenticate_with_token(token)
235
236 assert authenticated_user is not None
237 assert authenticated_user.user_id == user.user_id
238 assert authenticated_user.username == user.username
239
240
241async def test_token_expiration(auth_manager: AuthenticationManager) -> None:
242 """Test that expired tokens are rejected.
243
244 :param auth_manager: AuthenticationManager instance.
245 """
246 user = await auth_manager.create_user(username="expireuser", role=UserRole.USER)
247 token = await auth_manager.create_token(user, "Expire Test", is_long_lived=False)
248
249 # Hash the token to look it up
250 token_hash = hashlib.sha256(token.encode()).hexdigest()
251 token_row = await auth_manager.database.get_row("auth_tokens", {"token_hash": token_hash})
252 assert token_row is not None
253
254 # Manually expire the token by setting expires_at in the past
255 past_time = utc() - timedelta(days=1)
256 await auth_manager.database.update(
257 "auth_tokens",
258 {"token_id": token_row["token_id"]},
259 {"expires_at": past_time.isoformat()},
260 )
261
262 # Try to authenticate with expired token
263 authenticated_user = await auth_manager.authenticate_with_token(token)
264 assert authenticated_user is None
265
266
267async def test_update_user_profile(auth_manager: AuthenticationManager) -> None:
268 """Test updating user profile information.
269
270 :param auth_manager: AuthenticationManager instance.
271 """
272 user = await auth_manager.create_user(
273 username="updateuser",
274 role=UserRole.USER,
275 display_name="Original Name",
276 )
277
278 # Update user profile
279 updated_user = await auth_manager.update_user(
280 user,
281 display_name="New Name",
282 avatar_url="https://example.com/avatar.jpg",
283 )
284
285 assert updated_user is not None
286 assert updated_user.display_name == "New Name"
287 assert updated_user.avatar_url == "https://example.com/avatar.jpg"
288 assert updated_user.username == user.username
289
290
291async def test_change_password(auth_manager: AuthenticationManager) -> None:
292 """Test changing user password.
293
294 :param auth_manager: AuthenticationManager instance.
295 """
296 builtin_provider = auth_manager.login_providers.get("builtin")
297 assert builtin_provider is not None
298 assert isinstance(builtin_provider, BuiltinLoginProvider)
299
300 # Create user with password
301 user = await builtin_provider.create_user_with_password(
302 username="pwdchange",
303 password="old_password_123",
304 role=UserRole.USER,
305 )
306
307 # Change password
308 success = await builtin_provider.change_password(
309 user,
310 "old_password_123",
311 "new_password_456",
312 )
313 assert success is True
314
315 # Verify old password no longer works
316 result = await auth_manager.authenticate_with_credentials(
317 "builtin",
318 {"username": "pwdchange", "password": "old_password_123"},
319 )
320 assert result.success is False
321
322 # Verify new password works
323 result = await auth_manager.authenticate_with_credentials(
324 "builtin",
325 {"username": "pwdchange", "password": "new_password_456"},
326 )
327 assert result.success is True
328
329
330async def test_revoke_token(auth_manager: AuthenticationManager) -> None:
331 """Test revoking an access token.
332
333 :param auth_manager: AuthenticationManager instance.
334 """
335 user = await auth_manager.create_user(username="revokeuser", role=UserRole.USER)
336 token = await auth_manager.create_token(user, "Revoke Test", is_long_lived=False)
337
338 # Set current user context for authorization
339 set_current_user(user)
340
341 # Get token_id
342 token_id = await auth_manager.get_token_id_from_token(token)
343 assert token_id is not None
344
345 # Token should work before revocation
346 authenticated_user = await auth_manager.authenticate_with_token(token)
347 assert authenticated_user is not None
348
349 # Revoke the token
350 await auth_manager.revoke_token(token_id)
351
352 # Token should not work after revocation
353 authenticated_user = await auth_manager.authenticate_with_token(token)
354 assert authenticated_user is None
355
356
357async def test_list_users(auth_manager: AuthenticationManager) -> None:
358 """Test listing all users (admin only).
359
360 :param auth_manager: AuthenticationManager instance.
361 """
362 # Create admin user and set as current
363 admin = await auth_manager.create_user(username="listadmin", role=UserRole.ADMIN)
364 set_current_user(admin)
365
366 # Create some test users
367 await auth_manager.create_user(username="user1", role=UserRole.USER)
368 await auth_manager.create_user(username="user2", role=UserRole.USER)
369
370 # List all users
371 users = await auth_manager.list_users()
372
373 # Should not include system users
374 usernames = [u.username for u in users]
375 assert "listadmin" in usernames
376 assert "user1" in usernames
377 assert "user2" in usernames
378
379
380async def test_disable_enable_user(auth_manager: AuthenticationManager) -> None:
381 """Test disabling and enabling user accounts.
382
383 :param auth_manager: AuthenticationManager instance.
384 """
385 # Create admin and regular user
386 admin = await auth_manager.create_user(username="disableadmin", role=UserRole.ADMIN)
387 user = await auth_manager.create_user(username="disableuser", role=UserRole.USER)
388
389 # Set admin as current user
390 set_current_user(admin)
391
392 # Disable the user
393 await auth_manager.disable_user(user.user_id)
394
395 # Verify user is disabled
396 disabled_user = await auth_manager.get_user(user.user_id)
397 assert disabled_user is None # get_user filters out disabled users
398
399 # Enable the user
400 await auth_manager.enable_user(user.user_id)
401
402 # Verify user is enabled
403 enabled_user = await auth_manager.get_user(user.user_id)
404 assert enabled_user is not None
405
406
407async def test_cannot_disable_own_account(auth_manager: AuthenticationManager) -> None:
408 """Test that users cannot disable their own account.
409
410 :param auth_manager: AuthenticationManager instance.
411 """
412 admin = await auth_manager.create_user(username="selfadmin", role=UserRole.ADMIN)
413 set_current_user(admin)
414
415 # Try to disable own account
416 with pytest.raises(InvalidDataError, match="Cannot disable your own account"):
417 await auth_manager.disable_user(admin.user_id)
418
419
420async def test_user_preferences(auth_manager: AuthenticationManager) -> None:
421 """Test updating user preferences.
422
423 :param auth_manager: AuthenticationManager instance.
424 """
425 user = await auth_manager.create_user(username="prefuser", role=UserRole.USER)
426
427 # Update preferences
428 preferences = {"theme": "dark", "language": "en"}
429 updated_user = await auth_manager.update_user_preferences(user, preferences)
430
431 assert updated_user is not None
432 assert updated_user.preferences == preferences
433
434
435async def test_link_user_to_provider(auth_manager: AuthenticationManager) -> None:
436 """Test linking user to authentication provider.
437
438 :param auth_manager: AuthenticationManager instance.
439 """
440 user = await auth_manager.create_user(username="linkuser", role=UserRole.USER)
441
442 # Link to provider
443 link = await auth_manager.link_user_to_provider(
444 user,
445 AuthProviderType.HOME_ASSISTANT,
446 "ha_user_123",
447 )
448
449 assert link is not None
450 assert link.user_id == user.user_id
451 assert link.provider_type == AuthProviderType.HOME_ASSISTANT
452 assert link.provider_user_id == "ha_user_123"
453
454 # Retrieve user by provider link
455 retrieved_user = await auth_manager.get_user_by_provider_link(
456 AuthProviderType.HOME_ASSISTANT,
457 "ha_user_123",
458 )
459
460 assert retrieved_user is not None
461 assert retrieved_user.user_id == user.user_id
462
463
464async def test_homeassistant_system_user(auth_manager: AuthenticationManager) -> None:
465 """Test Home Assistant system user creation.
466
467 :param auth_manager: AuthenticationManager instance.
468 """
469 # Get or create system user
470 system_user = await auth_manager.get_homeassistant_system_user()
471
472 assert system_user is not None
473 assert system_user.username == HOMEASSISTANT_SYSTEM_USER
474 assert system_user.display_name == "Home Assistant Integration"
475 assert system_user.role == UserRole.USER
476
477 # Getting it again should return the same user
478 system_user2 = await auth_manager.get_homeassistant_system_user()
479 assert system_user2.user_id == system_user.user_id
480
481
482async def test_homeassistant_system_user_token(auth_manager: AuthenticationManager) -> None:
483 """Test Home Assistant system user token creation.
484
485 :param auth_manager: AuthenticationManager instance.
486 """
487 # Get or create token
488 token1 = await auth_manager.get_homeassistant_system_user_token()
489 assert token1 is not None
490
491 # Getting it again should create a new token (old one is replaced)
492 token2 = await auth_manager.get_homeassistant_system_user_token()
493 assert token2 is not None
494 assert token2 != token1
495
496 # Old token should not work
497 user1 = await auth_manager.authenticate_with_token(token1)
498 assert user1 is None
499
500 # New token should work
501 user2 = await auth_manager.authenticate_with_token(token2)
502 assert user2 is not None
503
504
505async def test_update_user_role(auth_manager: AuthenticationManager) -> None:
506 """Test updating user role (admin only).
507
508 :param auth_manager: AuthenticationManager instance.
509 """
510 admin = await auth_manager.create_user(username="roleadmin", role=UserRole.ADMIN)
511 user = await auth_manager.create_user(username="roleuser", role=UserRole.USER)
512
513 # Update role
514 success = await auth_manager.update_user_role(user.user_id, UserRole.ADMIN, admin)
515 assert success is True
516
517 # Verify role was updated
518 set_current_user(admin)
519 updated_user = await auth_manager.get_user(user.user_id)
520 assert updated_user is not None
521 assert updated_user.role == UserRole.ADMIN
522
523
524async def test_delete_user(auth_manager: AuthenticationManager) -> None:
525 """Test deleting a user account.
526
527 :param auth_manager: AuthenticationManager instance.
528 """
529 admin = await auth_manager.create_user(username="deleteadmin", role=UserRole.ADMIN)
530 user = await auth_manager.create_user(username="deleteuser", role=UserRole.USER)
531
532 # Set admin as current user
533 set_current_user(admin)
534
535 # Delete the user
536 await auth_manager.delete_user(user.user_id)
537
538 # Verify user is deleted
539 deleted_user = await auth_manager.get_user(user.user_id)
540 assert deleted_user is None
541
542
543async def test_cannot_delete_own_account(auth_manager: AuthenticationManager) -> None:
544 """Test that users cannot delete their own account.
545
546 :param auth_manager: AuthenticationManager instance.
547 """
548 admin = await auth_manager.create_user(username="selfdeleteadmin", role=UserRole.ADMIN)
549 set_current_user(admin)
550
551 # Try to delete own account
552 with pytest.raises(InvalidDataError, match="Cannot delete your own account"):
553 await auth_manager.delete_user(admin.user_id)
554
555
556async def test_get_user_tokens(auth_manager: AuthenticationManager) -> None:
557 """Test getting user's tokens.
558
559 :param auth_manager: AuthenticationManager instance.
560 """
561 user = await auth_manager.create_user(username="tokensuser", role=UserRole.USER)
562 set_current_user(user)
563
564 # Create some tokens
565 await auth_manager.create_token(user, "Device 1", is_long_lived=False)
566 await auth_manager.create_token(user, "Device 2", is_long_lived=True)
567
568 # Get user tokens
569 tokens = await auth_manager.get_user_tokens(user.user_id)
570
571 assert len(tokens) == 2
572 token_names = [t.name for t in tokens]
573 assert "Device 1" in token_names
574 assert "Device 2" in token_names
575
576
577async def test_get_login_providers(auth_manager: AuthenticationManager) -> None:
578 """Test getting available login providers.
579
580 :param auth_manager: AuthenticationManager instance.
581 """
582 providers = await auth_manager.get_login_providers()
583
584 assert len(providers) > 0
585 assert any(p["provider_id"] == "builtin" for p in providers)
586
587
588async def test_create_user_with_api(auth_manager: AuthenticationManager) -> None:
589 """Test creating user via API command.
590
591 :param auth_manager: AuthenticationManager instance.
592 """
593 # Create admin user and set as current
594 admin = await auth_manager.create_user(username="apiadmin", role=UserRole.ADMIN)
595 set_current_user(admin)
596
597 # Create user via API
598 user = await auth_manager.create_user_with_api(
599 username="apiuser",
600 password="password123",
601 role="user",
602 display_name="API User",
603 )
604
605 assert user is not None
606 assert user.username == "apiuser"
607 assert user.role == UserRole.USER
608 assert user.display_name == "API User"
609
610
611async def test_create_user_api_validation(auth_manager: AuthenticationManager) -> None:
612 """Test validation in create_user_with_api.
613
614 :param auth_manager: AuthenticationManager instance.
615 """
616 admin = await auth_manager.create_user(username="validadmin", role=UserRole.ADMIN)
617 set_current_user(admin)
618
619 # Test username too short
620 with pytest.raises(InvalidDataError, match="Username must be at least 2 characters"):
621 await auth_manager.create_user_with_api(
622 username="a",
623 password="password123",
624 )
625
626 # Test 2-character username is accepted (minimum allowed)
627 user_2char = await auth_manager.create_user_with_api(
628 username="ab",
629 password="password123",
630 )
631 assert user_2char.username == "ab"
632
633 # Test password too short
634 with pytest.raises(InvalidDataError, match="Password must be at least 8 characters"):
635 await auth_manager.create_user_with_api(
636 username="validuser",
637 password="short",
638 )
639
640
641async def test_logout(auth_manager: AuthenticationManager) -> None:
642 """Test logout functionality.
643
644 :param auth_manager: AuthenticationManager instance.
645 """
646 user = await auth_manager.create_user(username="logoutuser", role=UserRole.USER)
647 token = await auth_manager.create_token(user, "Logout Test", is_long_lived=False)
648
649 # Set current user and token
650 set_current_user(user)
651 set_current_token(token)
652
653 # Token should work before logout
654 authenticated_user = await auth_manager.authenticate_with_token(token)
655 assert authenticated_user is not None
656
657 # Logout
658 await auth_manager.logout()
659
660 # Token should not work after logout
661 authenticated_user = await auth_manager.authenticate_with_token(token)
662 assert authenticated_user is None
663
664
665async def test_token_sliding_expiration(auth_manager: AuthenticationManager) -> None:
666 """Test that short-lived tokens auto-renew on use.
667
668 :param auth_manager: AuthenticationManager instance.
669 """
670 user = await auth_manager.create_user(username="slideuser", role=UserRole.USER)
671 token = await auth_manager.create_token(user, "Slide Test", is_long_lived=False)
672
673 # Get initial expiration
674 token_hash = hashlib.sha256(token.encode()).hexdigest()
675 token_row = await auth_manager.database.get_row("auth_tokens", {"token_hash": token_hash})
676 assert token_row is not None
677 initial_expires_at = token_row["expires_at"]
678
679 # Use the token (authenticate)
680 authenticated_user = await auth_manager.authenticate_with_token(token)
681 assert authenticated_user is not None
682
683 # Check that expiration was updated
684 token_row = await auth_manager.database.get_row("auth_tokens", {"token_hash": token_hash})
685 assert token_row is not None
686 updated_expires_at = token_row["expires_at"]
687
688 # Expiration should have been extended
689 assert updated_expires_at != initial_expires_at
690
691
692async def test_long_lived_token_no_auto_renewal(auth_manager: AuthenticationManager) -> None:
693 """Test that long-lived tokens do NOT auto-renew on use.
694
695 :param auth_manager: AuthenticationManager instance.
696 """
697 user = await auth_manager.create_user(username="longuser", role=UserRole.USER)
698 token = await auth_manager.create_token(user, "Long Test", is_long_lived=True)
699
700 # Get initial expiration
701 token_hash = hashlib.sha256(token.encode()).hexdigest()
702 token_row = await auth_manager.database.get_row("auth_tokens", {"token_hash": token_hash})
703 assert token_row is not None
704 initial_expires_at = token_row["expires_at"]
705
706 # Use the token (authenticate)
707 authenticated_user = await auth_manager.authenticate_with_token(token)
708 assert authenticated_user is not None
709
710 # Check that expiration was NOT updated
711 token_row = await auth_manager.database.get_row("auth_tokens", {"token_hash": token_hash})
712 assert token_row is not None
713 updated_expires_at = token_row["expires_at"]
714
715 # Expiration should remain the same for long-lived tokens
716 assert updated_expires_at == initial_expires_at
717
718
719async def test_username_case_insensitive_creation(auth_manager: AuthenticationManager) -> None:
720 """Test that usernames are normalized to lowercase on creation.
721
722 :param auth_manager: AuthenticationManager instance.
723 """
724 # Create user with mixed case username
725 user = await auth_manager.create_user(
726 username="TestUser",
727 role=UserRole.USER,
728 display_name="Test User",
729 )
730
731 # Username should be stored in lowercase
732 assert user.username == "testuser"
733
734
735async def test_username_case_insensitive_duplicate_prevention(
736 auth_manager: AuthenticationManager,
737) -> None:
738 """Test that duplicate usernames with different cases are prevented.
739
740 :param auth_manager: AuthenticationManager instance.
741 """
742 # Create user with lowercase username
743 await auth_manager.create_user(username="admin", role=UserRole.USER)
744
745 # Try to create user with same username but different case should fail
746 # (SQLite UNIQUE constraint violation)
747 with pytest.raises(IntegrityError, match="UNIQUE constraint failed"):
748 await auth_manager.create_user(username="Admin", role=UserRole.USER)
749
750
751async def test_username_case_insensitive_login(auth_manager: AuthenticationManager) -> None:
752 """Test that login works with any case variation of username.
753
754 :param auth_manager: AuthenticationManager instance.
755 """
756 builtin_provider = auth_manager.login_providers.get("builtin")
757 assert builtin_provider is not None
758 assert isinstance(builtin_provider, BuiltinLoginProvider)
759
760 # Create user with lowercase username
761 await builtin_provider.create_user_with_password(
762 username="testadmin",
763 password="SecurePassword123",
764 role=UserRole.ADMIN,
765 )
766
767 # Test login with lowercase
768 result = await auth_manager.authenticate_with_credentials(
769 "builtin",
770 {"username": "testadmin", "password": "SecurePassword123"},
771 )
772 assert result.success is True
773 assert result.user is not None
774 assert result.user.username == "testadmin"
775
776 # Test login with uppercase
777 result = await auth_manager.authenticate_with_credentials(
778 "builtin",
779 {"username": "TESTADMIN", "password": "SecurePassword123"},
780 )
781 assert result.success is True
782 assert result.user is not None
783 assert result.user.username == "testadmin"
784
785 # Test login with mixed case
786 result = await auth_manager.authenticate_with_credentials(
787 "builtin",
788 {"username": "TestAdmin", "password": "SecurePassword123"},
789 )
790 assert result.success is True
791 assert result.user is not None
792 assert result.user.username == "testadmin"
793
794
795async def test_username_case_insensitive_lookup(auth_manager: AuthenticationManager) -> None:
796 """Test that user lookup by username is case-insensitive.
797
798 :param auth_manager: AuthenticationManager instance.
799 """
800 # Create user with lowercase username
801 created_user = await auth_manager.create_user(username="lookupuser", role=UserRole.USER)
802
803 # Lookup with lowercase
804 user1 = await auth_manager.get_user_by_username("lookupuser")
805 assert user1 is not None
806 assert user1.user_id == created_user.user_id
807
808 # Lookup with uppercase
809 user2 = await auth_manager.get_user_by_username("LOOKUPUSER")
810 assert user2 is not None
811 assert user2.user_id == created_user.user_id
812
813 # Lookup with mixed case
814 user3 = await auth_manager.get_user_by_username("LookUpUser")
815 assert user3 is not None
816 assert user3.user_id == created_user.user_id
817
818
819async def test_username_update_normalizes(auth_manager: AuthenticationManager) -> None:
820 """Test that updating username normalizes it to lowercase.
821
822 :param auth_manager: AuthenticationManager instance.
823 """
824 user = await auth_manager.create_user(username="originaluser", role=UserRole.USER)
825
826 # Update username with mixed case
827 updated_user = await auth_manager.update_user(user, username="UpdatedUser")
828
829 # Username should be normalized to lowercase
830 assert updated_user is not None
831 assert updated_user.username == "updateduser"
832
833
834async def test_link_user_to_provider_idempotent(auth_manager: AuthenticationManager) -> None:
835 """Test that linking user to provider is idempotent.
836
837 This tests the fix for the bug where re-linking a user would cause
838 IntegrityError due to UNIQUE constraint on (provider_type, provider_user_id).
839
840 :param auth_manager: AuthenticationManager instance.
841 """
842 user = await auth_manager.create_user(username="hauser", role=UserRole.USER)
843
844 # Link user to Home Assistant provider for the first time
845 link1 = await auth_manager.link_user_to_provider(
846 user,
847 AuthProviderType.HOME_ASSISTANT,
848 "ha_user_456",
849 )
850
851 assert link1 is not None
852 assert link1.user_id == user.user_id
853 assert link1.provider_type == AuthProviderType.HOME_ASSISTANT
854 assert link1.provider_user_id == "ha_user_456"
855
856 # Linking the same user again should return existing link without error
857 link2 = await auth_manager.link_user_to_provider(
858 user,
859 AuthProviderType.HOME_ASSISTANT,
860 "ha_user_456",
861 )
862
863 assert link2 is not None
864 assert link2.link_id == link1.link_id # Should be same link
865 assert link2.user_id == user.user_id
866 assert link2.provider_type == AuthProviderType.HOME_ASSISTANT
867 assert link2.provider_user_id == "ha_user_456"
868
869
870async def test_ingress_auth_existing_username(auth_manager: AuthenticationManager) -> None:
871 """Test HA ingress auth when username exists but isn't linked to HA provider.
872
873 This tests the scenario where a user is created during setup, and then
874 tries to login via HA ingress with the same username.
875
876 :param auth_manager: AuthenticationManager instance.
877 """
878 # Simulate user created during initial setup
879 existing_user = await auth_manager.create_user(
880 username="admin",
881 role=UserRole.ADMIN,
882 display_name="Admin User",
883 )
884
885 # Now simulate HA ingress trying to auto-create a user with same username
886 # This should find the existing user and link it instead of creating new one
887 user = await auth_manager.get_user_by_username("admin")
888 assert user is not None
889 assert user.user_id == existing_user.user_id
890
891 # Link the existing user to HA provider (what ingress flow would do)
892 link = await auth_manager.link_user_to_provider(
893 user,
894 AuthProviderType.HOME_ASSISTANT,
895 "ha_admin_123",
896 )
897
898 assert link is not None
899 assert link.user_id == existing_user.user_id
900
901 # Verify we can retrieve user by provider link
902 retrieved_user = await auth_manager.get_user_by_provider_link(
903 AuthProviderType.HOME_ASSISTANT,
904 "ha_admin_123",
905 )
906
907 assert retrieved_user is not None
908 assert retrieved_user.user_id == existing_user.user_id
909 assert retrieved_user.username == "admin"
910