/
/
/
1"""Streaming operations for Yandex Music."""
2
3from __future__ import annotations
4
5import asyncio
6from collections.abc import AsyncGenerator
7from typing import TYPE_CHECKING, Any
8
9from aiohttp import ClientPayloadError
10from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
11from music_assistant_models.enums import ContentType, StreamType
12from music_assistant_models.errors import MediaNotFoundError
13from music_assistant_models.media_items import AudioFormat
14from music_assistant_models.streamdetails import StreamDetails
15
16from .constants import (
17 CONF_QUALITY,
18 QUALITY_EFFICIENT,
19 QUALITY_HIGH,
20 QUALITY_SUPERB,
21 RADIO_TRACK_ID_SEP,
22)
23
24if TYPE_CHECKING:
25 from yandex_music import DownloadInfo
26
27 from .provider import YandexMusicProvider
28
29
class YandexMusicStreamingManager:
    """Manages Yandex Music streaming operations."""

    def __init__(self, provider: YandexMusicProvider) -> None:
        """Initialize streaming manager.

        :param provider: The Yandex Music provider instance.
        """
        self.provider = provider
        self.client = provider.client
        self.mass = provider.mass
        self.logger = provider.logger

    def _track_id_from_item_id(self, item_id: str) -> str:
        """Extract API track ID from item_id (may be track_id@station_id for My Wave).

        :param item_id: Plain track ID or composite ID joined by RADIO_TRACK_ID_SEP.
        :return: The part of item_id before the first separator (or item_id itself).
        """
        if RADIO_TRACK_ID_SEP in item_id:
            return item_id.partition(RADIO_TRACK_ID_SEP)[0]
        return item_id

    async def get_stream_details(self, item_id: str) -> StreamDetails:
        """Get stream details for a track.

        When the configured quality asks for lossless audio, the get-file-info
        endpoint is tried first; if it yields no usable FLAC stream the method
        falls back to the regular download-info endpoint.

        :param item_id: Track ID or composite track_id@station_id for My Wave.
        :return: StreamDetails for the track (item_id preserved for on_streamed).
        :raises MediaNotFoundError: If stream URL cannot be obtained.
        """
        track_id = self._track_id_from_item_id(item_id)
        track = await self.provider.get_track(item_id)
        if not track:
            raise MediaNotFoundError(f"Track {item_id} not found")

        quality = self.provider.config.get_value(CONF_QUALITY)
        quality_str = str(quality) if quality is not None else None
        preferred_normalized = (quality_str or "").strip().lower()

        # "lossless" is the legacy config value kept for backward compatibility
        # (exact match only).
        want_lossless = preferred_normalized in (QUALITY_SUPERB, "superb", "lossless")

        # When user wants lossless, try get-file-info first (FLAC; download-info often MP3 only)
        if want_lossless:
            lossless_details = await self._lossless_stream_details(
                item_id, track_id, track.duration
            )
            if lossless_details is not None:
                return lossless_details

        # Default: use /tracks/.../download-info and select best quality
        download_infos = await self.client.get_track_download_info(track_id, get_direct_links=True)
        if not download_infos:
            raise MediaNotFoundError(f"No stream info available for track {item_id}")

        codecs_available = [
            (getattr(i, "codec", None), getattr(i, "bitrate_in_kbps", None)) for i in download_infos
        ]
        self.logger.debug(
            "Stream quality for track %s: config quality=%s, available codecs=%s",
            track_id,
            quality_str,
            codecs_available,
        )
        selected_info = self._select_best_quality(download_infos, quality_str)

        if not selected_info or not selected_info.direct_link:
            raise MediaNotFoundError(f"No stream URL available for track {item_id}")

        self.logger.debug(
            "Stream selected for track %s: codec=%s, bitrate=%s",
            track_id,
            getattr(selected_info, "codec", None),
            getattr(selected_info, "bitrate_in_kbps", None),
        )

        bitrate = selected_info.bitrate_in_kbps or 0

        return StreamDetails(
            item_id=item_id,
            provider=self.provider.instance_id,
            audio_format=self._build_audio_format(selected_info.codec, bit_rate=bitrate),
            stream_type=StreamType.HTTP,
            duration=track.duration,
            path=selected_info.direct_link,
            can_seek=True,
            allow_seek=True,
        )

    async def _lossless_stream_details(
        self, item_id: str, track_id: str, duration: Any
    ) -> StreamDetails | None:
        """Try to build StreamDetails for a FLAC stream via get-file-info.

        :param item_id: Original item ID, stored on the returned StreamDetails.
        :param track_id: API track ID used for the get-file-info request.
        :param duration: Track duration passed through to StreamDetails.
        :return: StreamDetails, or None when the caller should fall back to download-info.
        """
        self.logger.debug("Requesting lossless via get-file-info for track %s", track_id)
        file_info = await self.client.get_track_file_info_lossless(track_id)
        if not file_info:
            return None

        url = file_info.get("url")
        codec = file_info.get("codec") or ""
        if not url or codec.lower() not in ("flac", "flac-mp4"):
            return None

        audio_format = self._build_audio_format(codec)

        if file_info.get("needs_decryption", False):
            key = file_info.get("key")
            if not key:
                # Without a key the payload cannot be decoded; handing the URL out
                # as a plain HTTP stream would play garbage, so fall back instead.
                self.logger.warning(
                    "Encrypted %s stream for track %s has no decryption key; "
                    "falling back to download-info",
                    codec,
                    track_id,
                )
                return None

            self.logger.info(
                "Streaming encrypted %s for track %s - will decrypt on-the-fly",
                codec,
                track_id,
            )
            # Return StreamType.CUSTOM for streaming decryption.
            # can_seek=False: provider always streams from position 0;
            # allow_seek=True: ffmpeg handles seek with -ss input flag.
            return StreamDetails(
                item_id=item_id,
                provider=self.provider.instance_id,
                audio_format=audio_format,
                stream_type=StreamType.CUSTOM,
                duration=duration,
                data={
                    "encrypted_url": url,
                    "decryption_key": key,
                    "codec": codec,
                },
                can_seek=False,
                allow_seek=True,
            )

        # Unencrypted URL, use directly
        self.logger.debug(
            "Unencrypted stream for track %s: codec=%s",
            track_id,
            codec,
        )
        return StreamDetails(
            item_id=item_id,
            provider=self.provider.instance_id,
            audio_format=audio_format,
            stream_type=StreamType.HTTP,
            duration=duration,
            path=url,
            can_seek=True,
            allow_seek=True,
        )

    @staticmethod
    def _first_with_codec(infos: list[Any], codecs: tuple[str, ...]) -> Any | None:
        """Return the first info matching the earliest codec in ``codecs``.

        Codecs are tried in the given priority order; within one codec the
        order of ``infos`` decides. Comparison is case-insensitive.

        :param infos: Objects exposing a ``codec`` attribute.
        :param codecs: Lower-case codec names, most preferred first.
        :return: Matching info or None.
        """
        for wanted in codecs:
            for info in infos:
                if info.codec and info.codec.lower() == wanted:
                    return info
        return None

    def _select_best_quality(
        self, download_infos: list[Any], preferred_quality: str | None
    ) -> DownloadInfo | None:
        """Select the best quality download info based on user preference.

        :param download_infos: List of DownloadInfo objects.
        :param preferred_quality: User's quality preference (efficient/high/balanced/superb).
        :return: Best matching DownloadInfo or None.
        """
        if not download_infos:
            return None

        preferred_normalized = (preferred_quality or "").strip().lower()
        aac_then_mp3 = ("aac-mp4", "aac", "he-aac-mp4", "he-aac", "mp3")

        # Highest bitrate first; unknown bitrates sort last.
        sorted_infos = sorted(
            download_infos,
            key=lambda x: x.bitrate_in_kbps or 0,
            reverse=True,
        )

        # Superb: Prefer FLAC (backward compatibility with "lossless")
        if preferred_normalized == QUALITY_SUPERB or "lossless" in preferred_normalized:
            # Note: flac-mp4 typically comes from get-file-info API, not download-info,
            # but we check here for forward compatibility in case the API changes.
            flac = self._first_with_codec(sorted_infos, ("flac-mp4", "flac"))
            if flac is not None:
                return flac
            self.logger.warning(
                "Superb quality (FLAC) requested but not available; using best available"
            )
            return sorted_infos[0]

        # Efficient: Prefer lowest bitrate AAC/MP3
        if preferred_normalized == QUALITY_EFFICIENT:
            # Lowest bitrate first; unknown bitrates are pushed towards the end.
            sorted_infos_asc = sorted(
                download_infos,
                key=lambda x: x.bitrate_in_kbps or 999,
            )
            # Prefer AAC for efficiency, then MP3 (include MP4 container variants)
            efficient = self._first_with_codec(sorted_infos_asc, aac_then_mp3)
            return efficient if efficient is not None else sorted_infos_asc[0]

        # High: Prefer high bitrate MP3 (~320kbps)
        if preferred_normalized == QUALITY_HIGH:
            # sorted_infos is bitrate-descending, so the first MP3 is the
            # >= 256 kbps one whenever such an MP3 exists, else the best MP3.
            mp3 = self._first_with_codec(sorted_infos, ("mp3",))
            if mp3 is not None:
                return mp3

            # If no MP3, use highest available (excluding FLAC)
            for info in sorted_infos:
                if info.codec and info.codec.lower() not in ("flac", "flac-mp4"):
                    return info

            # Last resort: highest available
            return sorted_infos[0]

        # Balanced (default): Prefer ~192kbps AAC, or medium quality MP3
        # Look for bitrate around 192kbps (within range 128-256)
        balanced_infos = [
            info
            for info in sorted_infos
            if info.bitrate_in_kbps and 128 <= info.bitrate_in_kbps <= 256
        ]
        if balanced_infos:
            # Prefer AAC over MP3 at similar bitrate (include MP4 container variants)
            balanced = self._first_with_codec(balanced_infos, aac_then_mp3)
            return balanced if balanced is not None else balanced_infos[0]

        # Fallback to highest available if no balanced option
        return sorted_infos[0]

    def _get_content_type(self, codec: str | None) -> tuple[ContentType, ContentType]:
        """Determine container and codec type from Yandex API codec string.

        Yandex API returns codec strings like "flac-mp4" (FLAC in MP4 container),
        "aac-mp4" (AAC in MP4 container), or plain "flac", "mp3", "aac".

        :param codec: Codec string from Yandex API.
        :return: Tuple of (content_type/container, codec_type).
        """
        unknown = (ContentType.UNKNOWN, ContentType.UNKNOWN)
        if not codec:
            return unknown

        # For MP4 variants the codec lives inside an MP4 container; for plain
        # single-codec formats the codec is implied by content_type, so
        # codec_type stays UNKNOWN.
        by_codec = {
            "flac-mp4": (ContentType.MP4, ContentType.FLAC),
            "aac-mp4": (ContentType.MP4, ContentType.AAC),
            "he-aac-mp4": (ContentType.MP4, ContentType.AAC),
            "flac": (ContentType.FLAC, ContentType.UNKNOWN),
            "mp3": (ContentType.MP3, ContentType.UNKNOWN),
            "mpeg": (ContentType.MP3, ContentType.UNKNOWN),
            "aac": (ContentType.AAC, ContentType.UNKNOWN),
            "he-aac": (ContentType.AAC, ContentType.UNKNOWN),
        }
        return by_codec.get(codec.lower(), unknown)

    def _get_audio_params(self, codec: str | None) -> tuple[int, int]:
        """Return (sample_rate, bit_depth) defaults based on codec string.

        The Yandex get-file-info API does not return sample rate or bit depth,
        so we use codec-based defaults. These values help the core select the
        correct PCM output format and avoid unnecessary resampling.

        :param codec: Codec string from Yandex API (e.g. "flac-mp4", "flac", "mp3").
        :return: Tuple of (sample_rate, bit_depth).
        """
        if codec and codec.lower() == "flac-mp4":
            return 48000, 24
        # CD-quality defaults for all other codecs
        return 44100, 16

    def _build_audio_format(self, codec: str | None, bit_rate: int = 0) -> AudioFormat:
        """Build AudioFormat with content type and codec-based audio params.

        :param codec: Codec string from Yandex API (e.g. "flac-mp4", "flac", "mp3").
        :param bit_rate: Bitrate in kbps (0 for variable/unknown).
        :return: Configured AudioFormat instance.
        """
        content_type, codec_type = self._get_content_type(codec)
        sample_rate, bit_depth = self._get_audio_params(codec)
        return AudioFormat(
            content_type=content_type,
            codec_type=codec_type,
            bit_rate=bit_rate,
            sample_rate=sample_rate,
            bit_depth=bit_depth,
        )

    @staticmethod
    def _plan_resume(
        bytes_delivered: int, range_honored: bool, block_size: int = 16
    ) -> tuple[int, int]:
        """Work out where AES-CTR decryption must (re)start for a response body.

        :param bytes_delivered: Decrypted bytes already handed to the caller.
        :param range_honored: True when the body starts at the block-aligned
            offset ``(bytes_delivered // block_size) * block_size``; False when
            the server sent the whole file from byte 0 instead.
        :param block_size: Cipher block size in bytes.
        :return: Tuple of (initial counter block index, leading decrypted bytes to discard).
        """
        if not range_honored:
            # Body restarts at byte 0: decrypt from block 0 and drop everything
            # the caller has already received.
            return 0, bytes_delivered
        return bytes_delivered // block_size, bytes_delivered % block_size

    async def get_audio_stream(
        self, streamdetails: StreamDetails, seek_position: int = 0
    ) -> AsyncGenerator[bytes, None]:
        """Return the audio stream for the provider item with on-the-fly decryption.

        Downloads and decrypts the encrypted stream chunk-by-chunk without buffering.
        On connection drop, reconnects using a Range header and resumes AES-CTR
        decryption from the correct block boundary (up to 3 retries). If the server
        ignores the Range header and resends the file from the start, decryption
        restarts at block 0 and the already delivered bytes are discarded.

        :param streamdetails: Stream details containing encrypted URL and key.
        :param seek_position: Always 0 (seeking delegated to ffmpeg via allow_seek=True).
        :return: Async generator yielding decrypted audio bytes.
        :raises MediaNotFoundError: If the key is invalid, the HTTP request fails,
            or the stream keeps dropping after all retries.
        """
        encrypted_url: str = streamdetails.data["encrypted_url"]
        key_hex: str = streamdetails.data["decryption_key"]
        try:
            key_bytes = bytes.fromhex(key_hex)
        except (TypeError, ValueError) as err:
            raise MediaNotFoundError("Decryption key is not a valid hex string") from err
        if len(key_bytes) not in (16, 24, 32):
            raise MediaNotFoundError(f"Unsupported AES key length: {len(key_bytes)} bytes")

        block_size = 16  # AES-CTR block size in bytes
        max_retries = 3
        bytes_yielded = 0  # total decrypted bytes delivered to caller

        for attempt in range(max_retries + 1):
            if attempt > 0:
                await asyncio.sleep(min(2**attempt, 8))  # 2s, 4s, 8s

            # Align resume position to AES-CTR block boundary
            range_start = (bytes_yielded // block_size) * block_size
            headers = {"Range": f"bytes={range_start}-"} if range_start > 0 else {}

            try:
                async with self.mass.http_session.get(encrypted_url, headers=headers) as response:
                    try:
                        response.raise_for_status()
                    except Exception as err:
                        raise MediaNotFoundError(
                            f"Failed to fetch encrypted stream: {err}"
                        ) from err

                    # Only a 206 reply guarantees the body starts at range_start.
                    range_honored = range_start == 0 or response.status == 206
                    if not range_honored:
                        self.logger.warning(
                            "Server ignored Range request for encrypted stream; "
                            "restarting from byte 0 and skipping %d bytes",
                            bytes_yielded,
                        )
                    first_block, carry_skip = self._plan_resume(
                        bytes_yielded, range_honored, block_size
                    )

                    # AES-CTR: original nonce is 0x00..00, so counter = block number
                    nonce = first_block.to_bytes(block_size, "big")
                    decryptor = Cipher(algorithms.AES(key_bytes), modes.CTR(nonce)).decryptor()

                    async for chunk in response.content.iter_chunked(65536):
                        decrypted = decryptor.update(chunk)
                        if carry_skip > 0:
                            # Drop bytes the caller already received.
                            skip = min(carry_skip, len(decrypted))
                            decrypted = decrypted[skip:]
                            carry_skip -= skip
                        if decrypted:
                            bytes_yielded += len(decrypted)
                            yield decrypted

                    final = decryptor.finalize()
                    if final:
                        bytes_yielded += len(final)
                        yield final
                    return  # stream completed normally

            except asyncio.CancelledError:
                raise  # propagate cancellation immediately, do not retry
            except ClientPayloadError as err:
                if attempt < max_retries:
                    self.logger.warning(
                        "Encrypted stream dropped at %d bytes (attempt %d/%d): %s - retrying",
                        bytes_yielded,
                        attempt + 1,
                        max_retries,
                        err,
                    )
                else:
                    raise MediaNotFoundError(
                        "Encrypted stream ended early after retries were exhausted"
                    ) from err
396