/
/
/
1"""Streaming operations for Tidal."""
2
3from __future__ import annotations
4
5from sqlite3 import OperationalError
6from typing import TYPE_CHECKING
7
8from music_assistant_models.enums import ContentType, ExternalID, StreamType
9from music_assistant_models.errors import MediaNotFoundError
10from music_assistant_models.media_items import AudioFormat
11from music_assistant_models.streamdetails import StreamDetails
12
13from .constants import CACHE_CATEGORY_ISRC_MAP, CONF_QUALITY
14
15if TYPE_CHECKING:
16 from music_assistant_models.media_items import Track
17
18 from .provider import TidalProvider
19
20
21class TidalStreamingManager:
22 """Manages Tidal streaming operations."""
23
24 def __init__(self, provider: TidalProvider):
25 """Initialize streaming manager."""
26 self.provider = provider
27 self.api = provider.api
28 self.mass = provider.mass
29
30 async def get_stream_details(self, item_id: str) -> StreamDetails:
31 """Get stream details for a track."""
32 # 1. Try direct lookup
33 try:
34 track = await self.provider.get_track(item_id)
35 except MediaNotFoundError:
36 # 2. Fallback to ISRC lookup
37 if isrc_track := await self._get_track_by_isrc(item_id):
38 track = isrc_track
39 else:
40 raise MediaNotFoundError(f"Track {item_id} not found")
41
42 quality = self.provider.config.get_value(CONF_QUALITY)
43
44 # 3. Get playback info
45 async with self.api.throttler.bypass():
46 api_result = await self.api.get(
47 f"tracks/{track.item_id}/playbackinfopostpaywall",
48 params={
49 "playbackmode": "STREAM",
50 "assetpresentation": "FULL",
51 "audioquality": quality,
52 },
53 )
54
55 stream_data = api_result[0] if isinstance(api_result, tuple) else api_result
56
57 # 4. Parse stream URL
58 manifest_type = stream_data.get("manifestMimeType", "")
59 if "dash+xml" in manifest_type and "manifest" in stream_data:
60 url = f"data:application/dash+xml;base64,{stream_data['manifest']}"
61 else:
62 urls = stream_data.get("urls", [])
63 if not urls:
64 raise MediaNotFoundError("No stream URL found")
65 url = urls[0]
66
67 # 5. Determine format
68 audio_quality = stream_data.get("audioQuality")
69 if audio_quality in ("HIRES_LOSSLESS", "HI_RES_LOSSLESS", "LOSSLESS"):
70 content_type = ContentType.FLAC
71 elif codec := stream_data.get("codec"):
72 content_type = ContentType.try_parse(codec)
73 else:
74 content_type = ContentType.MP4
75
76 resolved_audio_format = AudioFormat(
77 content_type=content_type,
78 sample_rate=stream_data.get("sampleRate", 44100),
79 bit_depth=stream_data.get("bitDepth", 16),
80 channels=2,
81 )
82
83 # Never block or fail playback on DB issues.
84 self.mass.create_task(
85 self._async_update_provider_mapping_audio_format(
86 provider_track_id=track.item_id,
87 resolved_audio_format=resolved_audio_format,
88 )
89 )
90
91 return StreamDetails(
92 item_id=track.item_id,
93 provider=self.provider.instance_id,
94 audio_format=resolved_audio_format,
95 stream_type=StreamType.HTTP,
96 duration=track.duration,
97 path=url,
98 can_seek=True,
99 allow_seek=True,
100 )
101
102 async def _async_update_provider_mapping_audio_format(
103 self,
104 provider_track_id: str,
105 resolved_audio_format: AudioFormat,
106 ) -> None:
107 """Persist resolved audio format on the provider mapping (best-effort)."""
108 try:
109 lib_track = await self.mass.music.tracks.get_library_item_by_prov_id(
110 provider_track_id, self.provider.instance_id
111 )
112 if not lib_track:
113 return
114
115 cur_mapping = next(
116 (
117 m
118 for m in lib_track.provider_mappings
119 if m.provider_instance == self.provider.instance_id
120 and m.item_id == provider_track_id
121 ),
122 None,
123 )
124 if not cur_mapping or cur_mapping.audio_format == resolved_audio_format:
125 return
126
127 await self.mass.music.tracks.update_provider_mapping(
128 item_id=lib_track.item_id,
129 provider_instance_id=self.provider.instance_id,
130 provider_item_id=provider_track_id,
131 audio_format=resolved_audio_format,
132 )
133 except (MediaNotFoundError, OperationalError, AssertionError) as err:
134 self.provider.logger.debug(
135 "Failed to persist audio_format on provider mapping for Tidal track %s "
136 "(provider_instance=%s): %s",
137 provider_track_id,
138 self.provider.instance_id,
139 err,
140 )
141 except Exception:
142 self.provider.logger.exception(
143 "Unexpected error while persisting audio_format on provider mapping for "
144 "Tidal track %s (provider_instance=%s)",
145 provider_track_id,
146 self.provider.instance_id,
147 )
148
149 async def _get_track_by_isrc(self, item_id: str) -> Track | None:
150 """Lookup track by ISRC with caching."""
151 # Check cache
152 if cached_id := await self.mass.cache.get(
153 item_id, provider=self.provider.instance_id, category=CACHE_CATEGORY_ISRC_MAP
154 ):
155 try:
156 return await self.provider.get_track(cached_id)
157 except MediaNotFoundError:
158 await self.mass.cache.delete(
159 item_id, provider=self.provider.instance_id, category=CACHE_CATEGORY_ISRC_MAP
160 )
161
162 # Get library item to find ISRC
163 lib_track = await self.mass.music.tracks.get_library_item_by_prov_id(
164 item_id, self.provider.instance_id
165 )
166 if not lib_track:
167 return None
168
169 isrc = next((x[1] for x in lib_track.external_ids if x[0] == ExternalID.ISRC), None)
170 if not isrc:
171 return None
172
173 # Lookup by ISRC
174 api_result = await self.api.get(
175 "/tracks", params={"filter[isrc]": isrc}, base_url=self.api.OPEN_API_URL
176 )
177 data = api_result[0] if isinstance(api_result, tuple) else api_result
178
179 data_items = data.get("data", [])
180 if not data_items:
181 return None
182
183 track_id = str(data_items[0]["id"])
184
185 # Cache result
186 await self.mass.cache.set(
187 key=item_id,
188 data=track_id,
189 provider=self.provider.instance_id,
190 category=CACHE_CATEGORY_ISRC_MAP,
191 persistent=True,
192 expiration=86400 * 90,
193 )
194
195 return await self.provider.get_track(track_id)
196