music-assistant-server

3.3 KBPY
core.py
3.3 KB85 lines • python
1"""
2NicovideoMusicProviderCoreMixin: Core functionality not belonging to specific domains.
3
4This mixin handles core functionality that doesn't belong to any specific feature area:
5- Instance management (adapter, config)
6- Authentication and session management
7- Provider lifecycle management (initialization/cleanup)
8- Basic provider properties
9"""
10
11from __future__ import annotations
12
13from typing import Any, override
14
15from music_assistant_models.errors import LoginFailed
16
17from music_assistant.providers.nicovideo.config import NicovideoConfig
18from music_assistant.providers.nicovideo.provider_mixins.base import (
19    NicovideoMusicProviderMixinBase,
20)
21from music_assistant.providers.nicovideo.services.manager import NicovideoServiceManager
22
23
class NicovideoMusicProviderCoreMixin(NicovideoMusicProviderMixinBase):
    """Core mixin: owns the config helper, the service manager and the lifecycle hooks."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Run the parent initializer, then create the config helper and service manager.

        All positional and keyword arguments are forwarded unchanged to the
        parent initializer.
        """
        super().__init__(*args, **kwargs)
        # The config helper must exist first: the service manager takes it as an argument.
        self._nicovideo_config = NicovideoConfig(self)
        self._service_manager = NicovideoServiceManager(self, self.nicovideo_config)

    @property
    @override
    def nicovideo_config(self) -> NicovideoConfig:
        """Return the config helper created in ``__init__``."""
        return self._nicovideo_config

    @property
    @override
    def service_manager(self) -> NicovideoServiceManager:
        """Return the service manager created in ``__init__``."""
        return self._service_manager

    @property
    @override
    def is_streaming_provider(self) -> bool:
        """Report whether this provider is a streaming provider (always True)."""
        # Streaming providers answer True; local file based providers would answer False.
        return True

    @override
    async def handle_async_init_for_mixin(self) -> None:
        """Perform the asynchronous part of provider initialization.

        When the config holds either a user session or both a mail address and
        a password, a login is attempted and, on success, the periodic relogin
        task is started. Without credentials the provider initializes without
        logging in.

        Raises:
            LoginFailed: credentials were configured but the login attempt
                reported failure.
            Exception: any other error raised during initialization is logged
                and re-raised unchanged.
        """
        try:
            auth_settings = self.nicovideo_config.auth

            # Guard: nothing to log in with, so initialization is already complete.
            if not (
                auth_settings.user_session or (auth_settings.mail and auth_settings.password)
            ):
                self.logger.debug("nicovideo provider initialized successfully without login")
                return

            # Credentials are present, so a failed login is treated as fatal.
            if not await self.service_manager.auth.try_login():
                raise LoginFailed("Login failed with provided credentials")

            self.service_manager.auth.start_periodic_relogin_task()
            self.logger.debug("nicovideo provider initialized successfully with login")
        except Exception as err:
            # Log every failure (including LoginFailed) before handing it to the caller.
            self.logger.error("Failed to initialize nicovideo provider: %s", err)
            raise

    @override
    async def unload_for_mixin(self, is_removed: bool = False) -> None:
        """Stop the periodic relogin task when the provider is unloaded.

        This is best-effort: any exception raised while stopping the task or
        logging success is reported as a warning and not propagated.

        Args:
            is_removed: Accepted for interface compatibility; not used here.
        """
        try:
            self.service_manager.auth.stop_periodic_relogin_task()
            self.logger.debug("nicovideo provider unloaded successfully")
        except Exception as err:
            self.logger.warning("Error during nicovideo provider unload: %s", err)
85