/
/
/
1"""Streaming operations for Tidal."""
2
3from __future__ import annotations
4
5from typing import TYPE_CHECKING
6
7from music_assistant_models.enums import ContentType, ExternalID, StreamType
8from music_assistant_models.errors import MediaNotFoundError
9from music_assistant_models.media_items import AudioFormat
10from music_assistant_models.streamdetails import StreamDetails
11
12from .constants import CACHE_CATEGORY_ISRC_MAP, CONF_QUALITY
13
14if TYPE_CHECKING:
15 from music_assistant_models.media_items import Track
16
17 from .provider import TidalProvider
18
19
20class TidalStreamingManager:
21 """Manages Tidal streaming operations."""
22
23 def __init__(self, provider: TidalProvider):
24 """Initialize streaming manager."""
25 self.provider = provider
26 self.api = provider.api
27 self.mass = provider.mass
28
29 async def get_stream_details(self, item_id: str) -> StreamDetails:
30 """Get stream details for a track."""
31 # 1. Try direct lookup
32 try:
33 track = await self.provider.get_track(item_id)
34 except MediaNotFoundError:
35 # 2. Fallback to ISRC lookup
36 if isrc_track := await self._get_track_by_isrc(item_id):
37 track = isrc_track
38 else:
39 raise MediaNotFoundError(f"Track {item_id} not found")
40
41 quality = self.provider.config.get_value(CONF_QUALITY)
42
43 # 3. Get playback info
44 async with self.api.throttler.bypass():
45 api_result = await self.api.get(
46 f"tracks/{track.item_id}/playbackinfopostpaywall",
47 params={
48 "playbackmode": "STREAM",
49 "assetpresentation": "FULL",
50 "audioquality": quality,
51 },
52 )
53
54 stream_data = api_result[0] if isinstance(api_result, tuple) else api_result
55
56 # 4. Parse stream URL
57 manifest_type = stream_data.get("manifestMimeType", "")
58 if "dash+xml" in manifest_type and "manifest" in stream_data:
59 url = f"data:application/dash+xml;base64,{stream_data['manifest']}"
60 else:
61 urls = stream_data.get("urls", [])
62 if not urls:
63 raise MediaNotFoundError("No stream URL found")
64 url = urls[0]
65
66 # 5. Determine format
67 audio_quality = stream_data.get("audioQuality")
68 if audio_quality in ("HIRES_LOSSLESS", "HI_RES_LOSSLESS", "LOSSLESS"):
69 content_type = ContentType.FLAC
70 elif codec := stream_data.get("codec"):
71 content_type = ContentType.try_parse(codec)
72 else:
73 content_type = ContentType.MP4
74
75 return StreamDetails(
76 item_id=track.item_id,
77 provider=self.provider.instance_id,
78 audio_format=AudioFormat(
79 content_type=content_type,
80 sample_rate=stream_data.get("sampleRate", 44100),
81 bit_depth=stream_data.get("bitDepth", 16),
82 channels=2,
83 ),
84 stream_type=StreamType.HTTP,
85 duration=track.duration,
86 path=url,
87 can_seek=True,
88 allow_seek=True,
89 )
90
91 async def _get_track_by_isrc(self, item_id: str) -> Track | None:
92 """Lookup track by ISRC with caching."""
93 # Check cache
94 if cached_id := await self.mass.cache.get(
95 item_id, provider=self.provider.instance_id, category=CACHE_CATEGORY_ISRC_MAP
96 ):
97 try:
98 return await self.provider.get_track(cached_id)
99 except MediaNotFoundError:
100 await self.mass.cache.delete(
101 item_id, provider=self.provider.instance_id, category=CACHE_CATEGORY_ISRC_MAP
102 )
103
104 # Get library item to find ISRC
105 lib_track = await self.mass.music.tracks.get_library_item_by_prov_id(
106 item_id, self.provider.instance_id
107 )
108 if not lib_track:
109 return None
110
111 isrc = next((x[1] for x in lib_track.external_ids if x[0] == ExternalID.ISRC), None)
112 if not isrc:
113 return None
114
115 # Lookup by ISRC
116 api_result = await self.api.get(
117 "/tracks", params={"filter[isrc]": isrc}, base_url=self.api.OPEN_API_URL
118 )
119 data = api_result[0] if isinstance(api_result, tuple) else api_result
120
121 data_items = data.get("data", [])
122 if not data_items:
123 return None
124
125 track_id = str(data_items[0]["id"])
126
127 # Cache result
128 await self.mass.cache.set(
129 key=item_id,
130 data=track_id,
131 provider=self.provider.instance_id,
132 category=CACHE_CATEGORY_ISRC_MAP,
133 persistent=True,
134 expiration=86400 * 90,
135 )
136
137 return await self.provider.get_track(track_id)
138