/
/
/
1"""
2MusicAssistant Player Queues Controller.
3
4Handles all logic to PLAY Media Items, provided by Music Providers to supported players.
5
6It is loosely coupled to the MusicAssistant Music Controller and Player Controller.
7A Music Assistant Player always has a PlayerQueue associated with it
8which holds the queue items and state.
9
10The PlayerQueue is in that case the active source of the player,
11but it can also be something else, hence the loose coupling.
12"""
13
14from __future__ import annotations
15
16import asyncio
17import random
18import time
19from contextlib import suppress
20from types import NoneType
21from typing import TYPE_CHECKING, Any, TypedDict, cast
22
23import shortuuid
24from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
25from music_assistant_models.enums import (
26 ConfigEntryType,
27 ContentType,
28 EventType,
29 MediaType,
30 PlaybackState,
31 ProviderFeature,
32 QueueOption,
33 RepeatMode,
34)
35from music_assistant_models.errors import (
36 AudioError,
37 InvalidCommand,
38 InvalidDataError,
39 MediaNotFoundError,
40 MusicAssistantError,
41 PlayerUnavailableError,
42 QueueEmpty,
43 UnsupportedFeaturedException,
44)
45from music_assistant_models.media_items import (
46 Album,
47 Artist,
48 Audiobook,
49 BrowseFolder,
50 Genre,
51 ItemMapping,
52 MediaItemType,
53 PlayableMediaItemType,
54 Playlist,
55 Podcast,
56 PodcastEpisode,
57 Track,
58 UniqueList,
59 media_from_dict,
60)
61from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport
62from music_assistant_models.player_queue import PlayerQueue
63from music_assistant_models.queue_item import QueueItem
64
65from music_assistant.constants import (
66 ATTR_ANNOUNCEMENT_IN_PROGRESS,
67 MASS_LOGO_ONLINE,
68 PLAYBACK_REPORT_INTERVAL_SECONDS,
69 PLAYLIST_MEDIA_TYPES,
70 VERBOSE_LOG_LEVEL,
71 PlaylistPlayableItem,
72)
73from music_assistant.controllers.players.controller import IN_QUEUE_COMMAND
74from music_assistant.controllers.webserver.helpers.auth_middleware import get_current_user
75from music_assistant.helpers.api import api_command
76from music_assistant.helpers.audio import get_stream_details, get_stream_dsp_details
77from music_assistant.helpers.throttle_retry import BYPASS_THROTTLER
78from music_assistant.helpers.util import get_changed_keys, percentage
79from music_assistant.models.core_controller import CoreController
80from music_assistant.models.player import Player, PlayerMedia
81
82if TYPE_CHECKING:
83 from collections.abc import Iterator
84
85 from music_assistant_models.auth import User
86 from music_assistant_models.media_items.metadata import MediaItemImage
87
88 from music_assistant import MusicAssistant
89 from music_assistant.models.player import Player
90
91CONF_DEFAULT_ENQUEUE_SELECT_ARTIST = "default_enqueue_select_artist"
92CONF_DEFAULT_ENQUEUE_SELECT_ALBUM = "default_enqueue_select_album"
93
94ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE = "all_tracks"
95ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE = "all_tracks"
96
97CONF_DEFAULT_ENQUEUE_OPTION_ARTIST = "default_enqueue_option_artist"
98CONF_DEFAULT_ENQUEUE_OPTION_ALBUM = "default_enqueue_option_album"
99CONF_DEFAULT_ENQUEUE_OPTION_TRACK = "default_enqueue_option_track"
100CONF_DEFAULT_ENQUEUE_OPTION_GENRE = "default_enqueue_option_genre"
101CONF_DEFAULT_ENQUEUE_OPTION_RADIO = "default_enqueue_option_radio"
102CONF_DEFAULT_ENQUEUE_OPTION_PLAYLIST = "default_enqueue_option_playlist"
103CONF_DEFAULT_ENQUEUE_OPTION_AUDIOBOOK = "default_enqueue_option_audiobook"
104CONF_DEFAULT_ENQUEUE_OPTION_PODCAST = "default_enqueue_option_podcast"
105CONF_DEFAULT_ENQUEUE_OPTION_PODCAST_EPISODE = "default_enqueue_option_podcast_episode"
106CONF_DEFAULT_ENQUEUE_OPTION_FOLDER = "default_enqueue_option_folder"
107CONF_DEFAULT_ENQUEUE_OPTION_UNKNOWN = "default_enqueue_option_unknown"
108RADIO_TRACK_MAX_DURATION_SECS = 20 * 60 # 20 minutes
109CACHE_CATEGORY_PLAYER_QUEUE_STATE = 0
110CACHE_CATEGORY_PLAYER_QUEUE_ITEMS = 1
111
112
113class CompareState(TypedDict):
114 """Simple object where we store the (previous) state of a queue.
115
116 Used for compare actions.
117 """
118
119 queue_id: str
120 state: PlaybackState
121 current_item_id: str | None
122 next_item_id: str | None
123 current_item: QueueItem | None
124 elapsed_time: int
125 # last_playing_elapsed_time: elapsed time from the last PLAYING state update
126 # used to determine if a track was fully played when transitioning to idle
127 last_playing_elapsed_time: int
128 stream_title: str | None
129 codec_type: ContentType | None
130 output_formats: list[str] | None
131
132
133class PlayerQueuesController(CoreController):
134 """Controller holding all logic to enqueue music for players."""
135
136 domain: str = "player_queues"
137
138 def __init__(self, mass: MusicAssistant) -> None:
139 """Initialize core controller."""
140 super().__init__(mass)
141 self._queues: dict[str, PlayerQueue] = {}
142 self._queue_items: dict[str, list[QueueItem]] = {}
143 self._prev_states: dict[str, CompareState] = {}
144 self._transitioning_players: set[str] = set()
145 self.manifest.name = "Player Queues controller"
146 self.manifest.description = (
147 "Music Assistant's core controller which manages the queues for all players."
148 )
149 self.manifest.icon = "playlist-music"
150
151 async def close(self) -> None:
152 """Cleanup on exit."""
153 # stop all playback
154 for queue in self.all():
155 if queue.state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
156 await self.stop(queue.queue_id)
157
158 async def get_config_entries(
159 self,
160 action: str | None = None,
161 values: dict[str, ConfigValueType] | None = None,
162 ) -> tuple[ConfigEntry, ...]:
163 """Return all Config Entries for this core module (if any)."""
164 enqueue_options = [ConfigValueOption(x.name, x.value) for x in QueueOption]
165 return (
166 ConfigEntry(
167 key=CONF_DEFAULT_ENQUEUE_SELECT_ARTIST,
168 type=ConfigEntryType.STRING,
169 default_value=ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE,
170 label="Items to select when you play a (in-library) artist.",
171 options=[
172 ConfigValueOption(
173 title="Only in-library tracks",
174 value="library_tracks",
175 ),
176 ConfigValueOption(
177 title="All tracks from all albums in the library",
178 value="library_album_tracks",
179 ),
180 ConfigValueOption(
181 title="All (top) tracks from (all) streaming provider(s)",
182 value="all_tracks",
183 ),
184 ConfigValueOption(
185 title="All tracks from all albums from (all) streaming provider(s)",
186 value="all_album_tracks",
187 ),
188 ],
189 ),
190 ConfigEntry(
191 key=CONF_DEFAULT_ENQUEUE_SELECT_ALBUM,
192 type=ConfigEntryType.STRING,
193 default_value=ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE,
194 label="Items to select when you play a (in-library) album.",
195 options=[
196 ConfigValueOption(
197 title="Only in-library tracks",
198 value="library_tracks",
199 ),
200 ConfigValueOption(
201 title="All tracks for album on (streaming) provider",
202 value="all_tracks",
203 ),
204 ],
205 ),
206 ConfigEntry(
207 key=CONF_DEFAULT_ENQUEUE_OPTION_ARTIST,
208 type=ConfigEntryType.STRING,
209 default_value=QueueOption.REPLACE.value,
210 label="Default enqueue option for Artist item(s).",
211 options=enqueue_options,
212 description="Define the default enqueue action for this mediatype.",
213 ),
214 ConfigEntry(
215 key=CONF_DEFAULT_ENQUEUE_OPTION_ALBUM,
216 type=ConfigEntryType.STRING,
217 default_value=QueueOption.REPLACE.value,
218 label="Default enqueue option for Album item(s).",
219 options=enqueue_options,
220 description="Define the default enqueue action for this mediatype.",
221 ),
222 ConfigEntry(
223 key=CONF_DEFAULT_ENQUEUE_OPTION_TRACK,
224 type=ConfigEntryType.STRING,
225 default_value=QueueOption.PLAY.value,
226 label="Default enqueue option for Track item(s).",
227 options=enqueue_options,
228 description="Define the default enqueue action for this mediatype.",
229 ),
230 ConfigEntry(
231 key=CONF_DEFAULT_ENQUEUE_OPTION_GENRE,
232 type=ConfigEntryType.STRING,
233 default_value=QueueOption.REPLACE.value,
234 label="Default enqueue option for Genre item(s).",
235 options=enqueue_options,
236 description="Define the default enqueue action for this mediatype.",
237 ),
238 ConfigEntry(
239 key=CONF_DEFAULT_ENQUEUE_OPTION_RADIO,
240 type=ConfigEntryType.STRING,
241 default_value=QueueOption.REPLACE.value,
242 label="Default enqueue option for Radio item(s).",
243 options=enqueue_options,
244 description="Define the default enqueue action for this mediatype.",
245 ),
246 ConfigEntry(
247 key=CONF_DEFAULT_ENQUEUE_OPTION_PLAYLIST,
248 type=ConfigEntryType.STRING,
249 default_value=QueueOption.REPLACE.value,
250 label="Default enqueue option for Playlist item(s).",
251 options=enqueue_options,
252 description="Define the default enqueue action for this mediatype.",
253 ),
254 ConfigEntry(
255 key=CONF_DEFAULT_ENQUEUE_OPTION_AUDIOBOOK,
256 type=ConfigEntryType.STRING,
257 default_value=QueueOption.REPLACE.value,
258 label="Default enqueue option for Audiobook item(s).",
259 options=enqueue_options,
260 hidden=True,
261 ),
262 ConfigEntry(
263 key=CONF_DEFAULT_ENQUEUE_OPTION_PODCAST,
264 type=ConfigEntryType.STRING,
265 default_value=QueueOption.REPLACE.value,
266 label="Default enqueue option for Podcast item(s).",
267 options=enqueue_options,
268 hidden=True,
269 ),
270 ConfigEntry(
271 key=CONF_DEFAULT_ENQUEUE_OPTION_PODCAST_EPISODE,
272 type=ConfigEntryType.STRING,
273 default_value=QueueOption.REPLACE.value,
274 label="Default enqueue option for Podcast-episode item(s).",
275 options=enqueue_options,
276 hidden=True,
277 ),
278 ConfigEntry(
279 key=CONF_DEFAULT_ENQUEUE_OPTION_FOLDER,
280 type=ConfigEntryType.STRING,
281 default_value=QueueOption.REPLACE.value,
282 label="Default enqueue option for Folder item(s).",
283 options=enqueue_options,
284 hidden=True,
285 ),
286 )
287
288 def __iter__(self) -> Iterator[PlayerQueue]:
289 """Iterate over (available) players."""
290 return iter(self._queues.values())
291
292 @api_command("player_queues/all")
293 def all(self) -> tuple[PlayerQueue, ...]:
294 """Return all registered PlayerQueues."""
295 return tuple(self._queues.values())
296
297 @api_command("player_queues/get")
298 def get(self, queue_id: str) -> PlayerQueue | None:
299 """Return PlayerQueue by queue_id or None if not found."""
300 return self._queues.get(queue_id)
301
302 @api_command("player_queues/items")
303 def items(self, queue_id: str, limit: int = 500, offset: int = 0) -> list[QueueItem]:
304 """Return all QueueItems for given PlayerQueue."""
305 if queue_id not in self._queue_items:
306 return []
307
308 return self._queue_items[queue_id][offset : offset + limit]
309
310 @api_command("player_queues/get_active_queue")
311 def get_active_queue(self, player_id: str) -> PlayerQueue | None:
312 """Return the current active/synced queue for a player."""
313 if player := self.mass.players.get_player(player_id):
314 return self.mass.players.get_active_queue(player)
315 return None
316
317 # Queue commands
318
319 @api_command("player_queues/shuffle")
320 async def set_shuffle(self, queue_id: str, shuffle_enabled: bool) -> None:
321 """Configure shuffle setting on the the queue."""
322 queue = self._queues[queue_id]
323 if queue.shuffle_enabled == shuffle_enabled:
324 return # no change
325 queue.shuffle_enabled = shuffle_enabled
326 queue_items = self._queue_items[queue_id]
327 cur_index = (
328 queue.index_in_buffer if queue.index_in_buffer is not None else queue.current_index
329 )
330 if cur_index is not None:
331 next_index = cur_index + 1
332 next_items = queue_items[next_index:]
333 else:
334 next_items = []
335 next_index = 0
336 if not shuffle_enabled:
337 # shuffle disabled, try to restore original sort order of the remaining items
338 next_items.sort(key=lambda x: x.sort_index, reverse=False)
339 await self.load(
340 queue_id=queue_id,
341 queue_items=next_items,
342 insert_at_index=next_index,
343 keep_remaining=False,
344 shuffle=shuffle_enabled,
345 )
346
347 @api_command("player_queues/dont_stop_the_music")
348 def set_dont_stop_the_music(self, queue_id: str, dont_stop_the_music_enabled: bool) -> None:
349 """Configure Don't stop the music setting on the queue."""
350 providers_available_with_similar_tracks = any(
351 ProviderFeature.SIMILAR_TRACKS in provider.supported_features
352 for provider in self.mass.music.providers
353 )
354 if dont_stop_the_music_enabled and not providers_available_with_similar_tracks:
355 raise UnsupportedFeaturedException(
356 "Don't stop the music is not supported by any of the available music providers"
357 )
358 queue = self._queues[queue_id]
359 queue.dont_stop_the_music_enabled = dont_stop_the_music_enabled
360 self.signal_update(queue_id=queue_id)
361 # if this happens to be the last track in the queue, fill the radio source
362 if (
363 queue.dont_stop_the_music_enabled
364 and queue.enqueued_media_items
365 and queue.current_index is not None
366 and (queue.items - queue.current_index) <= 1
367 ):
368 queue.radio_source = queue.enqueued_media_items
369 task_id = f"fill_radio_tracks_{queue_id}"
370 self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id)
371
372 @api_command("player_queues/repeat")
373 def set_repeat(self, queue_id: str, repeat_mode: RepeatMode) -> None:
374 """Configure repeat setting on the the queue."""
375 queue = self._queues[queue_id]
376 if queue.repeat_mode == repeat_mode:
377 return # no change
378 queue.repeat_mode = repeat_mode
379 self.signal_update(queue_id)
380 if (
381 queue.state == PlaybackState.PLAYING
382 and queue.index_in_buffer is not None
383 and queue.index_in_buffer == queue.current_index
384 ):
385 # if the queue is playing,
386 # ensure to (re)queue the next track because it might have changed
387 # note that we only do this if the player has loaded the current track
388 # if not, we wait until it has loaded to prevent conflicts
389 if next_item := self.get_next_item(queue_id, queue.index_in_buffer):
390 self._enqueue_next_item(queue_id, next_item)
391
392 @api_command("player_queues/set_playback_speed")
393 async def set_playback_speed(
394 self, queue_id: str, speed: float, queue_item_id: str | None = None
395 ) -> None:
396 """
397 Set the playback speed for the given queue item.
398
399 If queue_item_id is not provided,
400 the speed will be set for the current item in the queue.
401
402 :param queue_id: queue_id of the queue to configure.
403 :param speed: playback speed multiplier (0.5 to 2.0). 1.0 = normal speed.
404 """
405 if not (0.5 <= speed <= 2.0):
406 raise InvalidDataError(f"Playback speed must be between 0.5 and 2.0, got {speed}")
407 queue = self._queues[queue_id]
408 if not queue.current_item:
409 raise QueueEmpty("Cannot set playback speed: queue is empty")
410 queue_item_id = queue_item_id or queue.current_item.queue_item_id
411 queue_item = self.get_item(queue_id, queue_item_id)
412 if not queue_item:
413 raise InvalidDataError(f"Queue item {queue_item_id} not found in queue")
414 if not queue_item.duration or queue_item.media_type == MediaType.RADIO:
415 raise InvalidCommand("Cannot set playback speed for items with unknown duration")
416 current_speed = float(queue_item.extra_attributes.get("playback_speed") or 1.0)
417 if abs(current_speed - speed) < 0.001:
418 return # no change
419 # use extra_attributes of the queue item to store the playback speed
420 queue_item.extra_attributes["playback_speed"] = speed
421 self.signal_update(queue_id)
422 if queue.state == PlaybackState.PLAYING:
423 await self.resume(queue_id)
424
425 @api_command("player_queues/play_media")
426 async def play_media(
427 self,
428 queue_id: str,
429 media: MediaItemType | ItemMapping | str | list[MediaItemType | ItemMapping | str],
430 option: QueueOption | None = None,
431 radio_mode: bool = False,
432 start_item: PlayableMediaItemType | str | None = None,
433 username: str | None = None,
434 ) -> None:
435 """Play media item(s) on the given queue.
436
437 :param queue_id: The queue_id of the queue to play media on.
438 :param media: Media that should be played (MediaItem(s) and/or uri's).
439 :param option: Which enqueue mode to use.
440 :param radio_mode: Enable radio mode for the given item(s).
441 :param start_item: Optional item to start the playlist or album from.
442 :param username: The username of the user requesting the playback.
443 Setting the username allows for overriding the logged-in user
444 to account for playback history per user when the play_media is
445 called from a shared context (like a web hook or automation).
446 """
447 # ruff: noqa: PLR0915
448 # we use a contextvar to bypass the throttler for this asyncio task/context
449 # this makes sure that playback has priority over other requests that may be
450 # happening in the background
451 BYPASS_THROTTLER.set(True)
452 if not (queue := self.get(queue_id)):
453 raise PlayerUnavailableError(f"Queue {queue_id} is not available")
454 # always fetch the underlying player so we can raise early if its not available
455 queue_player = self.mass.players.get_player(queue_id, True)
456 assert queue_player is not None # for type checking
457 if queue_player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
458 self.logger.warning("Ignore queue command: An announcement is in progress")
459 return
460
461 # save the user requesting the playback
462 playback_user: User | None
463 if username and (user := await self.mass.webserver.auth.get_user_by_username(username)):
464 playback_user = user
465 else:
466 playback_user = get_current_user()
467 queue.userid = playback_user.user_id if playback_user else None
468
469 # a single item or list of items may be provided
470 media_list = media if isinstance(media, list) else [media]
471
472 # clear queue if needed
473 if option == QueueOption.REPLACE:
474 self.clear(queue_id)
475 # Clear the 'enqueued media item' list when a new queue is requested
476 if option not in (QueueOption.ADD, QueueOption.NEXT):
477 queue.enqueued_media_items.clear()
478
479 media_items: list[MediaItemType] = []
480 radio_source: list[MediaItemType] = []
481 # resolve all media items
482 for item in media_list:
483 try:
484 # parse provided uri into a MA MediaItem or Basic QueueItem from URL
485 media_item: MediaItemType | ItemMapping | BrowseFolder
486 if isinstance(item, str):
487 media_item = await self.mass.music.get_item_by_uri(item)
488 elif isinstance(item, dict): # type: ignore[unreachable]
489 # TODO: Investigate why the API parser sometimes passes raw dicts instead of
490 # converting them to MediaItem objects. The parse_value function in api.py
491 # should handle dict-to-object conversion, but dicts are slipping through
492 # in some cases. This is defensive handling for that parser bug.
493 media_item = media_from_dict(item) # type: ignore[unreachable]
494 self.logger.debug("Converted to: %s", type(media_item))
495 else:
496 # item is MediaItemType | ItemMapping at this point
497 media_item = item
498
499 # Save requested media item to play on the queue so we can use it as a source
500 # for Don't stop the music. Use FIFO list to keep track of the last 10 played items
501 # Skip ItemMapping and BrowseFolder - only queue full MediaItemType objects
502 if not isinstance(
503 media_item, (ItemMapping, BrowseFolder)
504 ) and media_item.media_type in (
505 MediaType.TRACK,
506 MediaType.ALBUM,
507 MediaType.PLAYLIST,
508 MediaType.ARTIST,
509 ):
510 queue.enqueued_media_items.append(media_item)
511 if len(queue.enqueued_media_items) > 10:
512 queue.enqueued_media_items.pop(0)
513
514 # handle default enqueue option if needed
515 if option is None:
516 config_value = await self.mass.config.get_core_config_value(
517 self.domain,
518 f"default_enqueue_option_{media_item.media_type.value}",
519 return_type=str,
520 )
521 option = QueueOption(config_value)
522 if option == QueueOption.REPLACE:
523 self.clear(queue_id, skip_stop=True)
524
525 # collect media_items to play
526 if radio_mode:
527 # Type guard for mypy - only add full MediaItemType to radio_source
528 if not isinstance(media_item, (ItemMapping, BrowseFolder)):
529 radio_source.append(media_item)
530 else:
531 # Convert start_item to string URI if needed
532 start_item_uri: str | None = None
533 if isinstance(start_item, str):
534 start_item_uri = start_item
535 elif start_item is not None:
536 start_item_uri = start_item.uri
537 media_items += await self._resolve_media_items(
538 media_item, start_item_uri, queue_id=queue_id
539 )
540
541 except MusicAssistantError as err:
542 # invalid MA uri or item not found error
543 self.logger.warning("Skipping %s: %s", item, str(err))
544
545 # overwrite or append radio source items
546 if option not in (QueueOption.ADD, QueueOption.NEXT):
547 queue.radio_source = radio_source
548 else:
549 queue.radio_source += radio_source
550 # Use collected media items to calculate the radio if radio mode is on
551 if radio_mode:
552 radio_tracks = await self._get_radio_tracks(
553 queue_id=queue_id, is_initial_radio_mode=True
554 )
555 media_items = list(radio_tracks)
556
557 # only add valid/available items
558 queue_items: list[QueueItem] = []
559 for x in media_items:
560 if not x or not x.available:
561 continue
562 queue_items.append(
563 QueueItem.from_media_item(queue_id, cast("PlayableMediaItemType", x))
564 )
565
566 if not queue_items:
567 raise MediaNotFoundError("No playable items found")
568
569 # load the items into the queue
570 if queue.state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
571 cur_index = (
572 queue.index_in_buffer
573 if queue.index_in_buffer is not None
574 else (queue.current_index if queue.current_index is not None else 0)
575 )
576 else:
577 cur_index = queue.current_index or 0
578 insert_at_index = cur_index + 1
579 # Radio modes are already shuffled in a pattern we would like to keep.
580 shuffle = queue.shuffle_enabled and len(queue_items) > 1 and not radio_mode
581
582 # handle replace: clear all items and replace with the new items
583 if option == QueueOption.REPLACE:
584 await self.load(
585 queue_id,
586 queue_items=queue_items,
587 keep_remaining=False,
588 keep_played=False,
589 shuffle=shuffle,
590 )
591 await self.play_index(queue_id, 0)
592 return
593 # handle next: add item(s) in the index next to the playing/loaded/buffered index
594 if option == QueueOption.NEXT:
595 await self.load(
596 queue_id,
597 queue_items=queue_items,
598 insert_at_index=insert_at_index,
599 shuffle=shuffle,
600 )
601 return
602 if option == QueueOption.REPLACE_NEXT:
603 await self.load(
604 queue_id,
605 queue_items=queue_items,
606 insert_at_index=insert_at_index,
607 keep_remaining=False,
608 shuffle=shuffle,
609 )
610 return
611 # handle play: replace current loaded/playing index with new item(s)
612 if option == QueueOption.PLAY:
613 await self.load(
614 queue_id,
615 queue_items=queue_items,
616 insert_at_index=insert_at_index,
617 shuffle=shuffle,
618 )
619 next_index = min(insert_at_index, len(self._queue_items[queue_id]) - 1)
620 await self.play_index(queue_id, next_index)
621 return
622 # handle add: add/append item(s) to the remaining queue items
623 if option == QueueOption.ADD:
624 await self.load(
625 queue_id=queue_id,
626 queue_items=queue_items,
627 insert_at_index=insert_at_index
628 if queue.shuffle_enabled
629 else len(self._queue_items[queue_id]) + 1,
630 shuffle=queue.shuffle_enabled,
631 )
632 # handle edgecase, queue is empty and items are only added (not played)
633 # mark first item as new index
634 if queue.current_index is None:
635 queue.current_index = 0
636 queue.current_item = self.get_item(queue_id, 0)
637 queue.items = len(queue_items)
638 self.signal_update(queue_id)
639
640 @api_command("player_queues/move_item")
641 def move_item(self, queue_id: str, queue_item_id: str, pos_shift: int = 1) -> None:
642 """
643 Move queue item x up/down the queue.
644
645 - queue_id: id of the queue to process this request.
646 - queue_item_id: the item_id of the queueitem that needs to be moved.
647 - pos_shift: move item x positions down if positive value
648 - pos_shift: move item x positions up if negative value
649 - pos_shift: move item to top of queue as next item if 0.
650 """
651 queue = self._queues[queue_id]
652 item_index = self.index_by_id(queue_id, queue_item_id)
653 if item_index is None:
654 raise InvalidDataError(f"Item {queue_item_id} not found in queue")
655 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
656 msg = f"{item_index} is already played/buffered"
657 raise IndexError(msg)
658
659 queue_items = self._queue_items[queue_id]
660 queue_items = queue_items.copy()
661
662 if pos_shift == 0 and queue.state == PlaybackState.PLAYING:
663 new_index = (queue.current_index or 0) + 1
664 elif pos_shift == 0:
665 new_index = queue.current_index or 0
666 else:
667 new_index = item_index + pos_shift
668 if (new_index < (queue.current_index or 0)) or (new_index > len(queue_items)):
669 return
670 # move the item in the list
671 queue_items.insert(new_index, queue_items.pop(item_index))
672 self.update_items(queue_id, queue_items)
673
674 @api_command("player_queues/move_item_end")
675 def move_item_end(self, queue_id: str, queue_item_id: str) -> None:
676 """
677 Move queue item to the end the queue.
678
679 - queue_id: id of the queue to process this request.
680 - queue_item_id: the item_id of the queueitem that needs to be moved.
681 """
682 queue = self._queues[queue_id]
683 item_index = self.index_by_id(queue_id, queue_item_id)
684 if item_index is None:
685 raise InvalidDataError(f"Item {queue_item_id} not found in queue")
686 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
687 msg = f"{item_index} is already played/buffered"
688 raise IndexError(msg)
689
690 queue_items = self._queue_items[queue_id]
691 if item_index == (len(queue_items) - 1):
692 return
693 queue_items = queue_items.copy()
694
695 new_index = len(self._queue_items[queue_id]) - 1
696
697 # move the item in the list
698 queue_items.insert(new_index, queue_items.pop(item_index))
699 self.update_items(queue_id, queue_items)
700
701 @api_command("player_queues/delete_item")
702 def delete_item(self, queue_id: str, item_id_or_index: int | str) -> None:
703 """Delete item (by id or index) from the queue."""
704 if isinstance(item_id_or_index, str):
705 item_index = self.index_by_id(queue_id, item_id_or_index)
706 if item_index is None:
707 raise InvalidDataError(f"Item {item_id_or_index} not found in queue")
708 else:
709 item_index = item_id_or_index
710 queue = self._queues[queue_id]
711 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
712 # ignore request if track already loaded in the buffer
713 # the frontend should guard so this is just in case
714 self.logger.warning("delete requested for item already loaded in buffer")
715 return
716 queue_items = self._queue_items[queue_id]
717 queue_items.pop(item_index)
718 self.update_items(queue_id, queue_items)
719
720 @api_command("player_queues/clear")
721 def clear(self, queue_id: str, skip_stop: bool = False) -> None:
722 """Clear all items in the queue."""
723 queue = self._queues[queue_id]
724 queue.radio_source = []
725 if queue.state != PlaybackState.IDLE and not skip_stop:
726 self.mass.create_task(self.stop(queue_id))
727 queue.current_index = None
728 queue.current_item = None
729 queue.elapsed_time = 0
730 queue.elapsed_time_last_updated = time.time()
731 queue.index_in_buffer = None
732 self.mass.create_task(self.mass.streams.cleanup_queue_audio_data(queue_id))
733 self.update_items(queue_id, [])
734
735 @api_command("player_queues/save_as_playlist")
736 async def save_as_playlist(self, queue_id: str, name: str) -> Playlist:
737 """Save the current queue items as a new playlist.
738
739 :param queue_id: The queue_id of the queue to save.
740 :param name: The name for the new playlist.
741 """
742 if not self.get(queue_id):
743 raise PlayerUnavailableError(f"Queue {queue_id} is not available")
744 queue_items = self._queue_items.get(queue_id, [])
745 if not queue_items:
746 raise QueueEmpty("Cannot save an empty queue as a playlist.")
747 # collect URIs from queue items that are playlist-compatible
748 uris: list[str] = []
749 for item in queue_items:
750 if item.uri and item.media_type in PLAYLIST_MEDIA_TYPES:
751 uris.append(item.uri)
752 if not uris:
753 raise InvalidDataError("No valid items in queue to save as playlist.")
754 playlist = await self.mass.music.playlists.create_playlist(name)
755 await self.mass.music.playlists.add_playlist_tracks(playlist.item_id, uris)
756 return playlist
757
758 @api_command("player_queues/stop")
759 async def stop(self, queue_id: str) -> None:
760 """
761 Handle STOP command for given queue.
762
763 - queue_id: queue_id of the playerqueue to handle the command.
764 """
765 queue_player = self.mass.players.get_player(queue_id, True)
766 if queue_player is None:
767 raise PlayerUnavailableError(f"Player {queue_id} is not available")
768 if (queue := self.get(queue_id)) and queue.active:
769 if queue.state == PlaybackState.PLAYING:
770 queue.resume_pos = int(queue.corrected_elapsed_time)
771 # Set context to prevent circular call, then forward the actual command to the player
772 token = IN_QUEUE_COMMAND.set(True)
773 try:
774 await self.mass.players.cmd_stop(queue_id)
775 finally:
776 IN_QUEUE_COMMAND.reset(token)
777 self.mass.create_task(self.mass.streams.cleanup_queue_audio_data(queue_id))
778
779 @api_command("player_queues/play")
780 async def play(self, queue_id: str) -> None:
781 """
782 Handle PLAY command for given queue.
783
784 - queue_id: queue_id of the playerqueue to handle the command.
785 """
786 queue_player = self.mass.players.get_player(queue_id, True)
787 if queue_player is None:
788 raise PlayerUnavailableError(f"Player {queue_id} is not available")
789 if (
790 (queue := self._queues.get(queue_id))
791 and queue.active
792 and queue.state == PlaybackState.PAUSED
793 ):
794 # forward the actual play/unpause command to the player
795 await queue_player.play()
796 return
797 # player is not paused, perform resume instead
798 await self.resume(queue_id)
799
800 @api_command("player_queues/pause")
801 async def pause(self, queue_id: str) -> None:
802 """Handle PAUSE command for given queue.
803
804 - queue_id: queue_id of the playerqueue to handle the command.
805 """
806 if not (queue := self._queues.get(queue_id)):
807 return
808 queue_active = queue.active
809 if queue.active and queue.state == PlaybackState.PLAYING:
810 queue.resume_pos = int(queue.corrected_elapsed_time)
811 # forward the actual command to the player controller
812 # Set context to prevent circular call, then forward the actual command to the player
813 token = IN_QUEUE_COMMAND.set(True)
814 try:
815 await self.mass.players.cmd_pause(queue_id)
816 finally:
817 IN_QUEUE_COMMAND.reset(token)
818
819 async def _watch_pause(player: Player) -> None:
820 count = 0
821 # wait for pause
822 while count < 5 and player.state.playback_state == PlaybackState.PLAYING:
823 count += 1
824 await asyncio.sleep(1)
825 # wait for unpause
826 if player.state.playback_state != PlaybackState.PAUSED:
827 return
828 count = 0
829 while count < 30 and player.state.playback_state == PlaybackState.PAUSED:
830 count += 1
831 await asyncio.sleep(1)
832 # if player is still paused when the limit is reached, send stop
833 if player.state.playback_state == PlaybackState.PAUSED:
834 await self.stop(queue_id)
835
836 # we auto stop a player from paused when its paused for 30 seconds
837 if (
838 queue_active
839 and (queue_player := self.mass.players.get_player(queue_id))
840 and not queue_player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS)
841 ):
842 self.mass.create_task(_watch_pause(queue_player))
843
844 @api_command("player_queues/play_pause")
845 async def play_pause(self, queue_id: str) -> None:
846 """Toggle play/pause on given playerqueue.
847
848 - queue_id: queue_id of the queue to handle the command.
849 """
850 if (queue := self._queues.get(queue_id)) and queue.state == PlaybackState.PLAYING:
851 await self.pause(queue_id)
852 return
853 await self.play(queue_id)
854
855 @api_command("player_queues/next")
856 async def next(self, queue_id: str) -> None:
857 """Handle NEXT TRACK command for given queue.
858
859 - queue_id: queue_id of the queue to handle the command.
860 """
861 if (queue := self.get(queue_id)) is None or not queue.active:
862 raise InvalidCommand(f"Queue {queue_id} is not active")
863 idx = self._queues[queue_id].current_index
864 if idx is None:
865 self.logger.warning("Queue %s has no current index", queue.display_name)
866 return
867 attempts = 5
868 while attempts:
869 try:
870 if (next_index := self._get_next_index(queue_id, idx, True)) is not None:
871 await self.play_index(queue_id, next_index, debounce=True)
872 break
873 except MediaNotFoundError:
874 self.logger.warning(
875 "Failed to fetch next track for queue %s - trying next item",
876 queue.display_name,
877 )
878 idx += 1
879 attempts -= 1
880
881 @api_command("player_queues/previous")
882 async def previous(self, queue_id: str) -> None:
883 """Handle PREVIOUS TRACK command for given queue.
884
885 - queue_id: queue_id of the queue to handle the command.
886 """
887 if (queue := self.get(queue_id)) is None or not queue.active:
888 raise InvalidCommand(f"Queue {queue_id} is not active")
889 current_index = self._queues[queue_id].current_index
890 if current_index is None:
891 return
892 next_index = int(current_index)
893 # restart current track if current track has played longer than 4
894 # otherwise skip to previous track
895 if self._queues[queue_id].elapsed_time < 5:
896 next_index = max(current_index - 1, 0)
897 await self.play_index(queue_id, next_index, debounce=True)
898
899 @api_command("player_queues/skip")
900 async def skip(self, queue_id: str, seconds: int = 10) -> None:
901 """Handle SKIP command for given queue.
902
903 - queue_id: queue_id of the queue to handle the command.
904 - seconds: number of seconds to skip in track. Use negative value to skip back.
905 """
906 if (queue := self.get(queue_id)) is None or not queue.active:
907 raise InvalidCommand(f"Queue {queue_id} is not active")
908 await self.seek(queue_id, int(self._queues[queue_id].elapsed_time + seconds))
909
910 @api_command("player_queues/seek")
911 async def seek(self, queue_id: str, position: int = 10) -> None:
912 """Handle SEEK command for given queue.
913
914 - queue_id: queue_id of the queue to handle the command.
915 - position: position in seconds to seek to in the current playing item.
916 """
917 if (queue := self.get(queue_id)) is None or not queue.active:
918 raise InvalidCommand(f"Queue {queue_id} is not active")
919 queue_player = self.mass.players.get_player(queue_id, True)
920 if queue_player is None:
921 raise PlayerUnavailableError(f"Player {queue_id} is not available")
922 if not queue.current_item:
923 raise InvalidCommand(f"Queue {queue_player.state.name} has no item(s) loaded.")
924 if not queue.current_item.duration:
925 raise InvalidCommand("Can not seek items without duration.")
926 position = max(0, int(position))
927 if position > queue.current_item.duration:
928 raise InvalidCommand("Can not seek outside of duration range.")
929 if queue.current_index is None:
930 raise InvalidCommand(f"Queue {queue_player.state.name} has no current index.")
931 await self.play_index(queue_id, queue.current_index, seek_position=position)
932
933 @api_command("player_queues/resume")
934 async def resume(self, queue_id: str, fade_in: bool | None = None) -> None:
935 """Handle RESUME command for given queue.
936
937 - queue_id: queue_id of the queue to handle the command.
938 """
939 queue = self._queues[queue_id]
940 queue_items = self._queue_items[queue_id]
941 resume_item = queue.current_item
942 if queue.state == PlaybackState.PLAYING:
943 # resume requested while already playing,
944 # use current position as resume position
945 resume_pos = queue.corrected_elapsed_time
946 fade_in = False
947 else:
948 resume_pos = queue.resume_pos or queue.elapsed_time
949
950 if not resume_item and queue.current_index is not None and len(queue_items) > 0:
951 resume_item = self.get_item(queue_id, queue.current_index)
952 resume_pos = 0
953 elif not resume_item and queue.current_index is None and len(queue_items) > 0:
954 # items available in queue but no previous track, start at 0
955 resume_item = self.get_item(queue_id, 0)
956 resume_pos = 0
957
958 if resume_item is not None:
959 queue_player = self.mass.players.get_player(queue_id)
960 if queue_player is None:
961 raise PlayerUnavailableError(f"Player {queue_id} is not available")
962 if (
963 fade_in is None
964 and queue_player.state.playback_state == PlaybackState.IDLE
965 and (time.time() - queue.elapsed_time_last_updated) > 60
966 ):
967 # enable fade in effect if the player is idle for a while
968 fade_in = resume_pos > 0
969 if resume_item.media_type == MediaType.RADIO:
970 # we're not able to skip in online radio so this is pointless
971 resume_pos = 0
972 await self.play_index(
973 queue_id, resume_item.queue_item_id, int(resume_pos), fade_in or False
974 )
975 else:
976 # Queue is empty, try to resume from playlog
977 if await self._try_resume_from_playlog(queue):
978 return
979 msg = f"Resume queue requested but queue {queue.display_name} is empty"
980 raise QueueEmpty(msg)
981
982 @api_command("player_queues/play_index")
983 async def play_index(
984 self,
985 queue_id: str,
986 index: int | str,
987 seek_position: int = 0,
988 fade_in: bool = False,
989 debounce: bool = False,
990 ) -> None:
991 """Play item at index (or item_id) X in queue."""
992 queue = self._queues[queue_id]
993 queue.resume_pos = 0
994 if isinstance(index, str):
995 temp_index = self.index_by_id(queue_id, index)
996 if temp_index is None:
997 raise InvalidDataError(f"Item {index} not found in queue")
998 index = temp_index
999 # At this point index is guaranteed to be int
1000 queue.current_index = index
1001 # update current item and elapsed time and signal update
1002 # this way the UI knows immediately that a new item is loading
1003 queue.current_item = self.get_item(queue_id, index)
1004 queue.elapsed_time = seek_position
1005 queue.elapsed_time_last_updated = time.time()
1006 self.signal_update(queue_id)
1007 queue.index_in_buffer = index
1008 queue.flow_mode_stream_log = []
1009 target_player = self.mass.players.get_player(queue_id)
1010 if target_player is None:
1011 raise PlayerUnavailableError(f"Player {queue_id} is not available")
1012 queue.next_item_id_enqueued = None
1013 # always update session id when we start a new playback session
1014 queue.session_id = shortuuid.random(length=8)
1015 # handle resume point of audiobook(chapter) or podcast(episode)
1016 if (
1017 not seek_position
1018 and (queue_item := self.get_item(queue_id, index))
1019 and (resume_position_ms := getattr(queue_item.media_item, "resume_position_ms", 0))
1020 ):
1021 seek_position = max(0, int((resume_position_ms - 500) / 1000))
1022
1023 # send play_media request to player
1024 # NOTE that we debounce this a bit to account for someone hitting the next button
1025 # like a madman. This will prevent the player from being overloaded with requests.
1026 async def _play_index(index: int, debounce: bool) -> None:
1027 for attempt in range(5):
1028 try:
1029 queue_item = self.get_item(queue_id, index)
1030 if not queue_item:
1031 continue # guard
1032 await self._load_item(
1033 queue_item,
1034 self._get_next_index(queue_id, index),
1035 is_start=True,
1036 seek_position=seek_position if attempt == 0 else 0,
1037 fade_in=fade_in if attempt == 0 else False,
1038 )
1039 # if we reach this point, loading the item succeeded, break the loop
1040 queue.current_index = index
1041 queue.current_item = queue_item
1042 break
1043 except (MediaNotFoundError, AudioError):
1044 # the requested index can not be played.
1045 if queue_item:
1046 self.logger.warning(
1047 "Skipping unplayable item %s (%s)",
1048 queue_item.name,
1049 queue_item.uri,
1050 )
1051 queue_item.available = False
1052 next_index = self._get_next_index(queue_id, index, allow_repeat=False)
1053 if next_index is None:
1054 raise MediaNotFoundError("No next item available")
1055 index = next_index
1056 else:
1057 # all attempts to find a playable item failed
1058 raise MediaNotFoundError("No playable item found to start playback")
1059
1060 # Reset flow_mode - the streams controller will set it if flow mode is used.
1061 queue.flow_mode = False
1062 await asyncio.sleep(0.5 if debounce else 0.1)
1063 await self.mass.players.play_media(
1064 player_id=queue_id,
1065 media=await self.player_media_from_queue_item(queue_item),
1066 )
1067 queue.current_index = index
1068 queue.current_item = queue_item
1069 await asyncio.sleep(2)
1070 self._transitioning_players.discard(queue_id)
1071
1072 # we set a flag to notify the update logic that we're transitioning to a new track
1073 self._transitioning_players.add(queue_id)
1074
1075 # we debounce the play_index command to handle the case where someone
1076 # is spamming next/previous on the player
1077 task_id = f"play_index_{queue_id}"
1078 if existing_task := self.mass.get_task(task_id):
1079 existing_task.cancel()
1080 with suppress(asyncio.CancelledError):
1081 await existing_task
1082 task = self.mass.create_task(
1083 _play_index,
1084 index,
1085 debounce,
1086 task_id=task_id,
1087 )
1088 await task
1089 self.signal_update(queue_id)
1090
1091 @api_command("player_queues/transfer")
1092 async def transfer_queue(
1093 self,
1094 source_queue_id: str,
1095 target_queue_id: str,
1096 auto_play: bool | None = None,
1097 ) -> None:
1098 """Transfer queue to another queue."""
1099 if not (source_queue := self.get(source_queue_id)):
1100 raise PlayerUnavailableError(f"Queue {source_queue_id} is not available")
1101 if not (target_queue := self.get(target_queue_id)):
1102 raise PlayerUnavailableError(f"Queue {target_queue_id} is not available")
1103 if auto_play is None:
1104 auto_play = source_queue.state == PlaybackState.PLAYING
1105
1106 target_player = self.mass.players.get_player(target_queue_id)
1107 if target_player is None:
1108 raise PlayerUnavailableError(f"Player {target_queue_id} is not available")
1109 if target_player.state.active_group or target_player.state.synced_to:
1110 # edge case: the user wants to move playback from the group as a whole, to a single
1111 # player in the group or it is grouped and the command targeted at the single player.
1112 # We need to dissolve the group first.
1113 group_id = target_player.state.active_group or target_player.state.synced_to
1114 assert group_id is not None # checked in if condition above
1115 await self.mass.players.cmd_ungroup(group_id)
1116 await asyncio.sleep(3)
1117
1118 source_items = self._queue_items[source_queue_id]
1119 target_queue.repeat_mode = source_queue.repeat_mode
1120 target_queue.shuffle_enabled = source_queue.shuffle_enabled
1121 target_queue.dont_stop_the_music_enabled = source_queue.dont_stop_the_music_enabled
1122 target_queue.radio_source = source_queue.radio_source
1123 target_queue.enqueued_media_items = source_queue.enqueued_media_items
1124 target_queue.resume_pos = int(source_queue.elapsed_time)
1125 target_queue.current_index = source_queue.current_index
1126 if source_queue.current_item:
1127 target_queue.current_item = source_queue.current_item
1128 target_queue.current_item.queue_id = target_queue_id
1129 self.clear(source_queue_id)
1130
1131 await self.load(target_queue_id, source_items, keep_remaining=False, keep_played=False)
1132 for item in source_items:
1133 item.queue_id = target_queue_id
1134 self.update_items(target_queue_id, source_items)
1135 if auto_play:
1136 await self.resume(target_queue_id)
1137
1138 # Interaction with player
1139
1140 async def on_player_register(self, player: Player) -> None:
1141 """Register PlayerQueue for given player/queue id."""
1142 queue_id = player.player_id
1143 queue: PlayerQueue | None = None
1144 queue_items: list[QueueItem] = []
1145 # try to restore previous state
1146 if prev_state := await self.mass.cache.get(
1147 key=queue_id,
1148 provider=self.domain,
1149 category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
1150 ):
1151 try:
1152 queue = PlayerQueue.from_dict(prev_state)
1153 prev_items = await self.mass.cache.get(
1154 key=queue_id,
1155 provider=self.domain,
1156 category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
1157 default=[],
1158 )
1159 queue_items = []
1160 for idx, item_data in enumerate(prev_items):
1161 qi = QueueItem.from_cache(item_data)
1162 if not qi.media_item:
1163 # Skip items with missing media_item - this can happen if
1164 # MA was killed during shutdown while cache was being written
1165 self.logger.debug(
1166 "Skipping queue item %s (index %d) restored from cache "
1167 "without media_item",
1168 qi.name,
1169 idx,
1170 )
1171 continue
1172 queue_items.append(qi)
1173 if queue.enqueued_media_items:
1174 # we need to restore the MediaItem objects for the enqueued media items
1175 # Items from cache may be dicts that need deserialization
1176 restored_enqueued_items: list[MediaItemType] = []
1177 cached_items: list[dict[str, Any] | MediaItemType] = cast(
1178 "list[dict[str, Any] | MediaItemType]", queue.enqueued_media_items
1179 )
1180 for item in cached_items:
1181 if isinstance(item, dict):
1182 restored_item = media_from_dict(item)
1183 restored_enqueued_items.append(cast("MediaItemType", restored_item))
1184 else:
1185 restored_enqueued_items.append(item)
1186 queue.enqueued_media_items = restored_enqueued_items
1187 except Exception as err:
1188 self.logger.warning(
1189 "Failed to restore the queue(items) for %s - %s",
1190 player.state.name,
1191 str(err),
1192 )
1193 # Reset to clean state on failure
1194 queue = None
1195 queue_items = []
1196 if queue is None:
1197 queue = PlayerQueue(
1198 queue_id=queue_id,
1199 active=False,
1200 display_name=player.state.name,
1201 available=player.state.available,
1202 dont_stop_the_music_enabled=False,
1203 items=0,
1204 )
1205
1206 self._queues[queue_id] = queue
1207 self._queue_items[queue_id] = queue_items
1208 # always call update to calculate state etc
1209 self.on_player_update(player, {})
1210 self.mass.signal_event(EventType.QUEUE_ADDED, object_id=queue_id, data=queue)
1211
1212 def on_player_update(
1213 self,
1214 player: Player,
1215 changed_values: dict[str, tuple[Any, Any]],
1216 ) -> None:
1217 """
1218 Call when a PlayerQueue needs to be updated (e.g. when player updates).
1219
1220 NOTE: This is called every second if the player is playing.
1221 """
1222 queue_id = player.player_id
1223 if (queue := self._queues.get(queue_id)) is None:
1224 # race condition
1225 return
1226 if player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
1227 # do nothing while the announcement is in progress
1228 return
1229 # determine if this queue is currently active for this player
1230 queue.active = player.state.active_source in (queue.queue_id, None)
1231 if not queue.active and queue_id not in self._prev_states:
1232 queue.state = PlaybackState.IDLE
1233 # return early if the queue is not active and we have no previous state
1234 return
1235 if queue.queue_id in self._transitioning_players:
1236 # we're currently transitioning to a new track,
1237 # ignore updates from the player during this time
1238 return
1239
1240 # queue is active and preflight checks passed, update the queue details
1241 self._update_queue_from_player(player)
1242
1243 def on_player_remove(self, player_id: str, permanent: bool) -> None:
1244 """Call when a player is removed from the registry."""
1245 if permanent:
1246 # if the player is permanently removed, we also remove the cached queue data
1247 self.mass.create_task(
1248 self.mass.cache.delete(
1249 key=player_id,
1250 provider=self.domain,
1251 category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
1252 )
1253 )
1254 self.mass.create_task(
1255 self.mass.cache.delete(
1256 key=player_id,
1257 provider=self.domain,
1258 category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
1259 )
1260 )
1261 self._queues.pop(player_id, None)
1262 self._queue_items.pop(player_id, None)
1263
1264 async def load_next_queue_item(
1265 self,
1266 queue_id: str,
1267 current_item_id: str,
1268 ) -> QueueItem:
1269 """
1270 Call when a player wants the next queue item to play.
1271
1272 Raises QueueEmpty if there are no more tracks left.
1273 """
1274 queue = self.get(queue_id)
1275 if not queue:
1276 msg = f"PlayerQueue {queue_id} is not available"
1277 raise PlayerUnavailableError(msg)
1278 cur_index = self.index_by_id(queue_id, current_item_id)
1279 if cur_index is None:
1280 # this is just a guard for bad data
1281 raise QueueEmpty("Invalid item id for queue given.")
1282 next_item: QueueItem | None = None
1283 idx = 0
1284 while True:
1285 next_index = self._get_next_index(queue_id, cur_index + idx)
1286 if next_index is None:
1287 raise QueueEmpty("No more tracks left in the queue.")
1288 queue_item = self.get_item(queue_id, next_index)
1289 if queue_item is None:
1290 raise QueueEmpty("No more tracks left in the queue.")
1291 if idx >= 10:
1292 # we only allow 10 retries to prevent infinite loops
1293 raise QueueEmpty("No more (playable) tracks left in the queue.")
1294 try:
1295 await self._load_item(queue_item, next_index)
1296 # we're all set, this is our next item
1297 next_item = queue_item
1298 break
1299 except (MediaNotFoundError, AudioError):
1300 # No stream details found, skip this QueueItem
1301 self.logger.warning(
1302 "Skipping unplayable item %s (%s)", queue_item.name, queue_item.uri
1303 )
1304 queue_item.available = False
1305 idx += 1
1306 if idx != 0:
1307 # we skipped some items, signal a queue items update
1308 self.update_items(queue_id, self._queue_items[queue_id])
1309 if next_item is None:
1310 raise QueueEmpty("No more (playable) tracks left in the queue.")
1311
1312 return next_item
1313
1314 async def _load_item(
1315 self,
1316 queue_item: QueueItem,
1317 next_index: int | None,
1318 is_start: bool = False,
1319 seek_position: int = 0,
1320 fade_in: bool = False,
1321 ) -> None:
1322 """Try to load the stream details for the given queue item."""
1323 queue_id = queue_item.queue_id
1324 queue = self._queues[queue_id]
1325
1326 # we use a contextvar to bypass the throttler for this asyncio task/context
1327 # this makes sure that playback has priority over other requests that may be
1328 # happening in the background
1329 BYPASS_THROTTLER.set(True)
1330
1331 self.logger.debug(
1332 "(pre)loading (next) item for queue %s...",
1333 queue.display_name,
1334 )
1335
1336 if not queue_item.available:
1337 raise MediaNotFoundError(f"Item {queue_item.uri} is not available")
1338
1339 # work out if we are playing an album and if we should prefer album
1340 # loudness
1341 next_track_from_same_album = (
1342 next_index is not None
1343 and (next_item := self.get_item(queue_id, next_index))
1344 and (
1345 queue_item.media_item
1346 and hasattr(queue_item.media_item, "album")
1347 and queue_item.media_item.album
1348 and next_item.media_item
1349 and hasattr(next_item.media_item, "album")
1350 and next_item.media_item.album
1351 and queue_item.media_item.album.item_id == next_item.media_item.album.item_id
1352 )
1353 )
1354 current_index = self.index_by_id(queue_id, queue_item.queue_item_id)
1355 if current_index is None:
1356 previous_track_from_same_album = False
1357 else:
1358 previous_index = max(current_index - 1, 0)
1359 previous_track_from_same_album = (
1360 previous_index > 0
1361 and (previous_item := self.get_item(queue_id, previous_index)) is not None
1362 and previous_item.media_item is not None
1363 and hasattr(previous_item.media_item, "album")
1364 and previous_item.media_item.album is not None
1365 and queue_item.media_item is not None
1366 and hasattr(queue_item.media_item, "album")
1367 and queue_item.media_item.album is not None
1368 and queue_item.media_item.album.item_id == previous_item.media_item.album.item_id
1369 )
1370 playing_album_tracks = next_track_from_same_album or previous_track_from_same_album
1371 if queue_item.media_item and isinstance(queue_item.media_item, Track):
1372 album = queue_item.media_item.album
1373 # prefer the full library media item so we have all metadata and provider(quality) info
1374 # always request the full library item as there might be other qualities available
1375 if library_item := await self.mass.music.get_library_item_by_prov_id(
1376 queue_item.media_item.media_type,
1377 queue_item.media_item.item_id,
1378 queue_item.media_item.provider,
1379 ):
1380 queue_item.media_item = cast("Track", library_item)
1381 elif not queue_item.media_item.image or queue_item.media_item.provider.startswith(
1382 "ytmusic"
1383 ):
1384 # Youtube Music has poor thumbs by default, so we always fetch the full item
1385 # this also catches the case where they have an unavailable item in a listing
1386 fetched_item = await self.mass.music.get_item_by_uri(queue_item.uri)
1387 queue_item.media_item = cast("Track", fetched_item)
1388
1389 # ensure we got the full (original) album set
1390 if album and (
1391 library_album := await self.mass.music.get_library_item_by_prov_id(
1392 album.media_type,
1393 album.item_id,
1394 album.provider,
1395 )
1396 ):
1397 queue_item.media_item.album = cast("Album", library_album)
1398 elif album:
1399 # Restore original album if we have no better alternative from the library
1400 queue_item.media_item.album = album
1401 # prefer album image over track image
1402 if queue_item.media_item.album and queue_item.media_item.album.image:
1403 org_images: list[MediaItemImage] = queue_item.media_item.metadata.images or []
1404 queue_item.media_item.metadata.images = UniqueList(
1405 [
1406 queue_item.media_item.album.image,
1407 *org_images,
1408 ]
1409 )
1410 # Fetch the streamdetails, which could raise in case of an unplayable item.
1411 # For example, YT Music returns Radio Items that are not playable.
1412 queue_item.streamdetails = await get_stream_details(
1413 mass=self.mass,
1414 queue_item=queue_item,
1415 seek_position=seek_position,
1416 fade_in=fade_in,
1417 prefer_album_loudness=bool(playing_album_tracks),
1418 )
1419
1420 def track_loaded_in_buffer(self, queue_id: str, item_id: str) -> None:
1421 """Call when a player has (started) loading a track in the buffer."""
1422 queue = self.get(queue_id)
1423 if not queue:
1424 msg = f"PlayerQueue {queue_id} is not available"
1425 raise PlayerUnavailableError(msg)
1426 # store the index of the item that is currently (being) loaded in the buffer
1427 # which helps us a bit to determine how far the player has buffered ahead
1428 current_index = self.index_by_id(queue_id, item_id)
1429 queue.index_in_buffer = current_index
1430 self.logger.debug("PlayerQueue %s loaded item %s in buffer", queue.display_name, item_id)
1431 self.signal_update(queue_id)
1432 # preload next streamdetails
1433 self._preload_next_item(queue_id, item_id)
1434 # clean up stale audio buffers for old queue items to prevent memory leaks
1435 if current_index is not None:
1436 self.mass.create_task(
1437 self.mass.streams.cleanup_stale_queue_buffers(queue_id, current_index)
1438 )
1439
1440 # Main queue manipulation methods
1441
1442 async def load(
1443 self,
1444 queue_id: str,
1445 queue_items: list[QueueItem],
1446 insert_at_index: int = 0,
1447 keep_remaining: bool = True,
1448 keep_played: bool = True,
1449 shuffle: bool = False,
1450 ) -> None:
1451 """Load new items at index.
1452
1453 - queue_id: id of the queue to process this request.
1454 - queue_items: a list of QueueItems
1455 - insert_at_index: insert the item(s) at this index
1456 - keep_remaining: keep the remaining items after the insert
1457 - shuffle: (re)shuffle the items after insert index
1458 """
1459 prev_items = self._queue_items[queue_id][:insert_at_index] if keep_played else []
1460 next_items = queue_items
1461
1462 # if keep_remaining, append the old 'next' items
1463 if keep_remaining:
1464 next_items += self._queue_items[queue_id][insert_at_index:]
1465
1466 # we set the original insert order as attribute so we can un-shuffle
1467 for index, item in enumerate(next_items):
1468 item.sort_index += insert_at_index + index
1469 # (re)shuffle the final batch if needed
1470 if shuffle:
1471 next_items = await _smart_shuffle(next_items)
1472 self.update_items(queue_id, prev_items + next_items)
1473
1474 def update_items(self, queue_id: str, queue_items: list[QueueItem]) -> None:
1475 """Update the existing queue items, mostly caused by reordering."""
1476 self._queue_items[queue_id] = queue_items
1477 queue = self._queues[queue_id]
1478 queue.items = len(self._queue_items[queue_id])
1479 # to track if the queue items changed we set a timestamp
1480 # this is a simple way to detect changes in the list of items
1481 # without having to compare the entire list
1482 queue.items_last_updated = time.time()
1483 self.signal_update(queue_id, True)
1484 if (
1485 queue.state == PlaybackState.PLAYING
1486 and queue.index_in_buffer is not None
1487 and queue.index_in_buffer == queue.current_index
1488 ):
1489 # if the queue is playing,
1490 # ensure to (re)queue the next track because it might have changed
1491 # note that we only do this if the player has loaded the current track
1492 # if not, we wait until it has loaded to prevent conflicts
1493 if next_item := self.get_next_item(queue_id, queue.index_in_buffer):
1494 self._enqueue_next_item(queue_id, next_item)
1495
1496 # Helper methods
1497
1498 def get_item(self, queue_id: str, item_id_or_index: int | str | None) -> QueueItem | None:
1499 """Get queue item by index or item_id."""
1500 if item_id_or_index is None:
1501 return None
1502 if (queue_items := self._queue_items.get(queue_id)) is None:
1503 return None
1504 if isinstance(item_id_or_index, int) and len(queue_items) > item_id_or_index:
1505 return queue_items[item_id_or_index]
1506 if isinstance(item_id_or_index, str):
1507 return next((x for x in queue_items if x.queue_item_id == item_id_or_index), None)
1508 return None
1509
1510 def signal_update(self, queue_id: str, items_changed: bool = False) -> None:
1511 """Signal state changed of given queue."""
1512 queue = self._queues[queue_id]
1513 if items_changed:
1514 self.mass.signal_event(EventType.QUEUE_ITEMS_UPDATED, object_id=queue_id, data=queue)
1515 # save items in cache - only cache items with valid media_item
1516 cache_data = [
1517 x.to_cache() for x in self._queue_items[queue_id] if x.media_item is not None
1518 ]
1519 self.mass.create_task(
1520 self.mass.cache.set(
1521 key=queue_id,
1522 data=cache_data,
1523 provider=self.domain,
1524 category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
1525 )
1526 )
1527 # always send the base event
1528 self.mass.signal_event(EventType.QUEUE_UPDATED, object_id=queue_id, data=queue)
1529 # also signal update to the player itself so it can update its current_media
1530 self.mass.players.trigger_player_update(queue_id)
1531 # save state
1532 self.mass.create_task(
1533 self.mass.cache.set(
1534 key=queue_id,
1535 data=queue.to_cache(),
1536 provider=self.domain,
1537 category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
1538 )
1539 )
1540
1541 def index_by_id(self, queue_id: str, queue_item_id: str) -> int | None:
1542 """Get index by queue_item_id."""
1543 queue_items = self._queue_items[queue_id]
1544 for index, item in enumerate(queue_items):
1545 if item.queue_item_id == queue_item_id:
1546 return index
1547 return None
1548
1549 async def player_media_from_queue_item(self, queue_item: QueueItem) -> PlayerMedia:
1550 """
1551 Parse PlayerMedia from QueueItem.
1552
1553 :param queue_item: The queue item to create media from.
1554 """
1555 queue = self._queues[queue_item.queue_id]
1556 if queue_item.streamdetails:
1557 # prefer netto duration
1558 # when seeking, the player only receives the remaining duration
1559 duration = queue_item.streamdetails.duration or queue_item.duration
1560 if duration and queue_item.streamdetails.seek_position:
1561 duration = duration - queue_item.streamdetails.seek_position
1562 else:
1563 duration = queue_item.duration
1564 if queue.session_id is None:
1565 # handle error or return early
1566 raise InvalidDataError("Queue session_id is None")
1567 media = PlayerMedia(
1568 uri=queue_item.uri,
1569 media_type=queue_item.media_type,
1570 title=queue_item.name,
1571 image_url=MASS_LOGO_ONLINE,
1572 duration=duration,
1573 source_id=queue_item.queue_id,
1574 queue_item_id=queue_item.queue_item_id,
1575 custom_data={
1576 "session_id": queue.session_id,
1577 "original_uri": queue_item.uri,
1578 },
1579 )
1580 if queue_item.media_item:
1581 media.title = queue_item.media_item.name
1582 media.artist = getattr(queue_item.media_item, "artist_str", "")
1583 media.album = (
1584 album.name if (album := getattr(queue_item.media_item, "album", None)) else ""
1585 )
1586 if queue_item.image:
1587 # the image format needs to be 500x500 jpeg for maximum compatibility with players
1588 # we prefer the imageproxy on the streamserver here because this request is sent
1589 # to the player itself which may not be able to reach the regular webserver
1590 media.image_url = self.mass.metadata.get_image_url(
1591 queue_item.image, size=500, prefer_stream_server=True
1592 )
1593 return media
1594
1595 async def get_artist_tracks(self, artist: Artist) -> list[Track]:
1596 """Return tracks for given artist, based on user preference."""
1597 artist_items_conf = self.mass.config.get_raw_core_config_value(
1598 self.domain,
1599 CONF_DEFAULT_ENQUEUE_SELECT_ARTIST,
1600 ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE,
1601 )
1602 self.logger.info(
1603 "Fetching tracks to play for artist %s",
1604 artist.name,
1605 )
1606 if artist_items_conf in ("library_tracks", "all_tracks"):
1607 all_items = await self.mass.music.artists.tracks(
1608 artist.item_id,
1609 artist.provider,
1610 in_library_only=artist_items_conf == "library_tracks",
1611 )
1612 random.shuffle(all_items)
1613 return all_items
1614 if artist_items_conf in ("library_album_tracks", "all_album_tracks"):
1615 all_tracks: list[Track] = []
1616 for library_album in await self.mass.music.artists.albums(
1617 artist.item_id,
1618 artist.provider,
1619 in_library_only=artist_items_conf == "library_album_tracks",
1620 ):
1621 for album_track in await self.mass.music.albums.tracks(
1622 library_album.item_id, library_album.provider
1623 ):
1624 if album_track not in all_tracks:
1625 all_tracks.append(album_track)
1626 random.shuffle(all_tracks)
1627 return all_tracks
1628 return []
1629
1630 async def get_album_tracks(self, album: Album, start_item: str | None) -> list[Track]:
1631 """Return tracks for given album, based on user preference."""
1632 album_items_conf = self.mass.config.get_raw_core_config_value(
1633 self.domain,
1634 CONF_DEFAULT_ENQUEUE_SELECT_ALBUM,
1635 ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE,
1636 )
1637 result: list[Track] = []
1638 start_item_found = False
1639 self.logger.info(
1640 "Fetching tracks to play for album %s",
1641 album.name,
1642 )
1643 for album_track in await self.mass.music.albums.tracks(
1644 item_id=album.item_id,
1645 provider_instance_id_or_domain=album.provider,
1646 in_library_only=album_items_conf == "library_tracks",
1647 ):
1648 if not album_track.available:
1649 continue
1650 if start_item in (album_track.item_id, album_track.uri):
1651 start_item_found = True
1652 if start_item is not None and not start_item_found:
1653 continue
1654 result.append(album_track)
1655 return result
1656
1657 async def get_genre_tracks(self, genre: Genre, start_item: str | None) -> list[Track]:
1658 """Return tracks for given genre, based on alias mappings.
1659
1660 Limits results to avoid loading thousands of tracks for broad genres.
1661 Directly mapped tracks are fetched with random ordering, then supplemented
1662 with tracks from a limited set of mapped albums and artists.
1663 """
1664 result: list[Track] = []
1665 start_item_found = False
1666 self.logger.info(
1667 "Fetching tracks to play for genre %s",
1668 genre.name,
1669 )
1670 tracks, albums, artists = await self.mass.music.genres.mapped_media(
1671 genre,
1672 track_limit=25,
1673 album_limit=5,
1674 artist_limit=5,
1675 order_by="random",
1676 )
1677
1678 for genre_track in tracks:
1679 if not genre_track.available:
1680 continue
1681 if start_item in (genre_track.item_id, genre_track.uri):
1682 start_item_found = True
1683 if start_item is not None and not start_item_found:
1684 continue
1685 result.append(genre_track)
1686
1687 for album in albums:
1688 album_tracks = await self.get_album_tracks(album, None)
1689 result.extend(album_tracks[:5])
1690
1691 for artist in artists:
1692 artist_tracks = await self.get_artist_tracks(artist)
1693 result.extend(artist_tracks[:5])
1694 return result
1695
1696 async def get_playlist_tracks(
1697 self, playlist: Playlist, start_item: str | None
1698 ) -> list[PlaylistPlayableItem]:
1699 """Return tracks for given playlist, based on user preference."""
1700 result: list[PlaylistPlayableItem] = []
1701 start_item_found = False
1702 self.logger.info(
1703 "Fetching tracks to play for playlist %s",
1704 playlist.name,
1705 )
1706 # TODO: Handle other sort options etc.
1707 async for playlist_track in self.mass.music.playlists.tracks(
1708 playlist.item_id, playlist.provider
1709 ):
1710 if not playlist_track.available:
1711 continue
1712 if start_item in (playlist_track.item_id, playlist_track.uri):
1713 start_item_found = True
1714 if start_item is not None and not start_item_found:
1715 continue
1716 result.append(playlist_track)
1717 return result
1718
1719 async def get_audiobook_resume_point(
1720 self, audio_book: Audiobook, chapter: str | int | None = None, userid: str | None = None
1721 ) -> int:
1722 """Return resume point (in milliseconds) for given audio book."""
1723 self.logger.debug(
1724 "Fetching resume point to play for audio book %s",
1725 audio_book.name,
1726 )
1727 if chapter is not None:
1728 # user explicitly selected a chapter to play
1729 start_chapter = int(chapter) if isinstance(chapter, str) else chapter
1730 if chapters := audio_book.metadata.chapters:
1731 if _chapter := next((x for x in chapters if x.position == start_chapter), None):
1732 return int(_chapter.start * 1000)
1733 raise InvalidDataError(
1734 f"Unable to resolve chapter to play for Audiobook {audio_book.name}"
1735 )
1736 full_played, resume_position_ms = await self.mass.music.get_resume_position(
1737 audio_book, userid=userid
1738 )
1739 return 0 if full_played else resume_position_ms
1740
1741 async def get_next_podcast_episodes(
1742 self,
1743 podcast: Podcast | None,
1744 episode: PodcastEpisode | str | None,
1745 userid: str | None = None,
1746 ) -> UniqueList[PodcastEpisode]:
1747 """Return (next) episode(s) and resume point for given podcast."""
1748 if podcast is None and isinstance(episode, str | NoneType):
1749 raise InvalidDataError("Either podcast or episode must be provided")
1750 if podcast is None:
1751 # single podcast episode requested
1752 assert isinstance(episode, PodcastEpisode) # checked above
1753 self.logger.debug(
1754 "Fetching resume point to play for Podcast episode %s",
1755 episode.name,
1756 )
1757 (
1758 fully_played,
1759 resume_position_ms,
1760 ) = await self.mass.music.get_resume_position(episode, userid=userid)
1761 episode.fully_played = fully_played
1762 episode.resume_position_ms = 0 if fully_played else resume_position_ms
1763 return UniqueList([episode])
1764 # podcast with optional start episode requested
1765 self.logger.debug(
1766 "Fetching episode(s) and resume point to play for Podcast %s",
1767 podcast.name,
1768 )
1769 all_episodes = [
1770 x async for x in self.mass.music.podcasts.episodes(podcast.item_id, podcast.provider)
1771 ]
1772 all_episodes.sort(key=lambda x: x.position)
1773 # if a episode was provided, a user explicitly selected a episode to play
1774 # so we need to find the index of the episode in the list
1775 resolved_episode: PodcastEpisode | None = None
1776 if isinstance(episode, PodcastEpisode):
1777 resolved_episode = next((x for x in all_episodes if x.uri == episode.uri), None)
1778 if resolved_episode:
1779 # ensure we have accurate resume info
1780 (
1781 fully_played,
1782 resume_position_ms,
1783 ) = await self.mass.music.get_resume_position(resolved_episode, userid=userid)
1784 resolved_episode.resume_position_ms = 0 if fully_played else resume_position_ms
1785 elif isinstance(episode, str):
1786 resolved_episode = next(
1787 (x for x in all_episodes if episode in (x.uri, x.item_id)), None
1788 )
1789 if resolved_episode:
1790 # ensure we have accurate resume info
1791 (
1792 fully_played,
1793 resume_position_ms,
1794 ) = await self.mass.music.get_resume_position(resolved_episode, userid=userid)
1795 resolved_episode.resume_position_ms = 0 if fully_played else resume_position_ms
1796 else:
1797 # get first episode that is not fully played
1798 for ep in all_episodes:
1799 if ep.fully_played:
1800 continue
1801 # ensure we have accurate resume info
1802 (
1803 fully_played,
1804 resume_position_ms,
1805 ) = await self.mass.music.get_resume_position(ep, userid=userid)
1806 if fully_played:
1807 continue
1808 ep.resume_position_ms = resume_position_ms
1809 resolved_episode = ep
1810 break
1811 else:
1812 # no episodes found that are not fully played, so we start at the beginning
1813 resolved_episode = next((x for x in all_episodes), None)
1814 if resolved_episode is None:
1815 raise InvalidDataError(f"Unable to resolve episode to play for Podcast {podcast.name}")
1816 # get the index of the episode
1817 episode_index = all_episodes.index(resolved_episode)
1818 # return the (remaining) episode(s) to play
1819 return UniqueList(all_episodes[episode_index:])
1820
1821 def _get_next_index(
1822 self,
1823 queue_id: str,
1824 cur_index: int | None,
1825 is_skip: bool = False,
1826 allow_repeat: bool = True,
1827 ) -> int | None:
1828 """
1829 Return the next index for the queue, accounting for repeat settings.
1830
1831 Will return None if there are no (more) items in the queue.
1832 """
1833 queue = self._queues[queue_id]
1834 queue_items = self._queue_items[queue_id]
1835 if not queue_items or cur_index is None:
1836 # queue is empty
1837 return None
1838 # handle repeat single track
1839 if queue.repeat_mode == RepeatMode.ONE and not is_skip:
1840 return cur_index if allow_repeat else None
1841 # handle cur_index is last index of the queue
1842 if cur_index >= (len(queue_items) - 1):
1843 if allow_repeat and queue.repeat_mode == RepeatMode.ALL:
1844 # if repeat all is enabled, we simply start again from the beginning
1845 return 0
1846 return None
1847 # all other: just the next index
1848 return cur_index + 1
1849
1850 def get_next_item(self, queue_id: str, cur_index: int | str) -> QueueItem | None:
1851 """Return next QueueItem for given queue."""
1852 index: int
1853 if isinstance(cur_index, str):
1854 resolved_index = self.index_by_id(queue_id, cur_index)
1855 if resolved_index is None:
1856 return None # guard
1857 index = resolved_index
1858 else:
1859 index = cur_index
1860 # At this point index is guaranteed to be int
1861 for skip in range(5):
1862 if (next_index := self._get_next_index(queue_id, index + skip)) is None:
1863 break
1864 next_item = self.get_item(queue_id, next_index)
1865 if next_item is None:
1866 continue
1867 if not next_item.available:
1868 # ensure that we skip unavailable items (set by load_next track logic)
1869 continue
1870 return next_item
1871 return None
1872
1873 async def _fill_radio_tracks(self, queue_id: str) -> None:
1874 """Fill a Queue with (additional) Radio tracks."""
1875 self.logger.debug(
1876 "Filling radio tracks for queue %s",
1877 queue_id,
1878 )
1879 tracks = await self._get_radio_tracks(queue_id=queue_id, is_initial_radio_mode=False)
1880 # fill queue - filter out unavailable items
1881 queue_items = [QueueItem.from_media_item(queue_id, x) for x in tracks if x.available]
1882 await self.load(
1883 queue_id,
1884 queue_items,
1885 insert_at_index=len(self._queue_items[queue_id]) + 1,
1886 )
1887
1888 def _enqueue_next_item(self, queue_id: str, next_item: QueueItem | None) -> None:
1889 """Enqueue the next item on the player."""
1890 if not next_item:
1891 # no next item, nothing to do...
1892 return
1893
1894 queue = self._queues[queue_id]
1895 if queue.flow_mode:
1896 # ignore this for flow mode
1897 return
1898
1899 async def _enqueue_next_item_on_player(next_item: QueueItem) -> None:
1900 await self.mass.players.enqueue_next_media(
1901 player_id=queue_id,
1902 media=await self.player_media_from_queue_item(next_item),
1903 )
1904 if queue.next_item_id_enqueued != next_item.queue_item_id:
1905 queue.next_item_id_enqueued = next_item.queue_item_id
1906 self.logger.debug(
1907 "Enqueued next track %s on queue %s",
1908 next_item.name,
1909 self._queues[queue_id].display_name,
1910 )
1911
1912 task_id = f"enqueue_next_item_{queue_id}"
1913 self.mass.call_later(0.5, _enqueue_next_item_on_player, next_item, task_id=task_id)
1914
1915 def _preload_next_item(self, queue_id: str, item_id_in_buffer: str) -> None:
1916 """
1917 Preload the streamdetails for the next item in the queue/buffer.
1918
1919 This basically ensures the item is playable and fetches the stream details.
1920 If an error occurs, the item will be skipped and the next item will be loaded.
1921 """
1922 queue = self._queues[queue_id]
1923
1924 async def _preload_streamdetails(item_id_in_buffer: str) -> None:
1925 try:
1926 # wait for the item that was loaded in the buffer is the actually playing item
1927 # this prevents a race condition when we preload the next item too soon
1928 # while the player is actually preloading the previously enqueued item.
1929 retries = 120
1930 while retries > 0:
1931 if not queue.current_item:
1932 return # guard
1933 if queue.current_item.queue_item_id == item_id_in_buffer:
1934 break
1935 retries -= 1
1936 await asyncio.sleep(1)
1937 if next_item := await self.load_next_queue_item(queue_id, item_id_in_buffer):
1938 self.logger.debug(
1939 "Preloaded next item %s for queue %s",
1940 next_item.name,
1941 queue.display_name,
1942 )
1943 # enqueue the next item on the player
1944 self._enqueue_next_item(queue_id, next_item)
1945
1946 except QueueEmpty:
1947 return
1948
1949 if not (current_item := self.get_item(queue_id, item_id_in_buffer)):
1950 # this should not happen, but guard anyways
1951 return
1952 if current_item.media_type == MediaType.RADIO or not current_item.duration:
1953 # radio items or no duration, nothing to do
1954 return
1955
1956 task_id = f"preload_next_item_{queue_id}"
1957 self.mass.create_task(
1958 _preload_streamdetails,
1959 item_id_in_buffer,
1960 task_id=task_id,
1961 abort_existing=True,
1962 )
1963
1964 async def _resolve_media_items(
1965 self,
1966 media_item: MediaItemType | ItemMapping | BrowseFolder,
1967 start_item: str | None = None,
1968 userid: str | None = None,
1969 queue_id: str | None = None,
1970 ) -> list[MediaItemType]:
1971 """Resolve/unwrap media items to enqueue."""
1972 # resolve Itemmapping to full media item
1973 if isinstance(media_item, ItemMapping):
1974 if media_item.uri is None:
1975 raise InvalidDataError("ItemMapping has no URI")
1976 media_item = await self.mass.music.get_item_by_uri(media_item.uri)
1977 if media_item.media_type == MediaType.PLAYLIST:
1978 media_item = cast("Playlist", media_item)
1979 self.mass.create_task(
1980 self.mass.music.mark_item_played(
1981 media_item, userid=userid, queue_id=queue_id, user_initiated=True
1982 )
1983 )
1984 return list(await self.get_playlist_tracks(media_item, start_item))
1985 if media_item.media_type == MediaType.ARTIST:
1986 media_item = cast("Artist", media_item)
1987 self.mass.create_task(
1988 self.mass.music.mark_item_played(media_item, queue_id=queue_id, user_initiated=True)
1989 )
1990 return list(await self.get_artist_tracks(media_item))
1991 if media_item.media_type == MediaType.ALBUM:
1992 media_item = cast("Album", media_item)
1993 self.mass.create_task(
1994 self.mass.music.mark_item_played(
1995 media_item, userid=userid, queue_id=queue_id, user_initiated=True
1996 )
1997 )
1998 return list(await self.get_album_tracks(media_item, start_item))
1999 if media_item.media_type == MediaType.GENRE:
2000 media_item = cast("Genre", media_item)
2001 self.mass.create_task(
2002 self.mass.music.mark_item_played(
2003 media_item, userid=userid, queue_id=queue_id, user_initiated=True
2004 )
2005 )
2006 return list(await self.get_genre_tracks(media_item, start_item))
2007 if media_item.media_type == MediaType.AUDIOBOOK:
2008 media_item = cast("Audiobook", media_item)
2009 # ensure we grab the correct/latest resume point info
2010 media_item.resume_position_ms = await self.get_audiobook_resume_point(
2011 media_item, start_item, userid=userid
2012 )
2013 return [media_item]
2014 if media_item.media_type == MediaType.PODCAST:
2015 media_item = cast("Podcast", media_item)
2016 self.mass.create_task(
2017 self.mass.music.mark_item_played(
2018 media_item, userid=userid, queue_id=queue_id, user_initiated=True
2019 )
2020 )
2021 return list(await self.get_next_podcast_episodes(media_item, start_item, userid=userid))
2022 if media_item.media_type == MediaType.PODCAST_EPISODE:
2023 media_item = cast("PodcastEpisode", media_item)
2024 return list(await self.get_next_podcast_episodes(None, media_item, userid=userid))
2025 if media_item.media_type == MediaType.FOLDER:
2026 media_item = cast("BrowseFolder", media_item)
2027 return list(await self._get_folder_tracks(media_item))
2028 # all other: single track or radio item
2029 return [cast("MediaItemType", media_item)]
2030
2031 async def _try_resume_from_playlog(self, queue: PlayerQueue) -> bool:
2032 """Try to resume playback from playlog when queue is empty.
2033
2034 Attempts to find user-initiated recently played items in the following order:
2035 1. By userid AND queue_id
2036 2. By queue_id only
2037 3. By userid only (if available)
2038 4. Any recently played item
2039
2040 :param queue: The queue to resume playback on.
2041 :return: True if playback was started, False otherwise.
2042 """
2043 # Try different filter combinations in order of specificity
2044 filter_attempts: list[tuple[str | None, str | None, str]] = []
2045 if queue.userid:
2046 filter_attempts.append((queue.userid, queue.queue_id, "userid + queue_id match"))
2047 filter_attempts.append((None, queue.queue_id, "queue_id match"))
2048 if queue.userid:
2049 filter_attempts.append((queue.userid, None, "userid match"))
2050 filter_attempts.append((None, None, "any recent item"))
2051
2052 for userid, queue_id, match_type in filter_attempts:
2053 items = await self.mass.music.recently_played(
2054 limit=5,
2055 fully_played_only=False,
2056 user_initiated_only=True,
2057 userid=userid,
2058 queue_id=queue_id,
2059 )
2060 for item in items:
2061 if not item.uri:
2062 continue
2063 try:
2064 await self.play_media(queue.queue_id, item)
2065 self.logger.info(
2066 "Resumed queue %s from playlog (%s)", queue.display_name, match_type
2067 )
2068 return True
2069 except MusicAssistantError as err:
2070 self.logger.debug("Failed to resume with item %s: %s", item.name, err)
2071 continue
2072
2073 return False
2074
2075 async def _get_radio_tracks(
2076 self, queue_id: str, is_initial_radio_mode: bool = False
2077 ) -> list[Track]:
2078 """Call the registered music providers for dynamic tracks."""
2079 queue = self._queues[queue_id]
2080 queue_track_items: list[Track] = [
2081 q.media_item
2082 for q in self._queue_items[queue_id]
2083 if q.media_item and isinstance(q.media_item, Track)
2084 ]
2085 if not queue.radio_source:
2086 # this may happen during race conditions as this method is called delayed
2087 return []
2088 self.logger.info(
2089 "Fetching radio tracks for queue %s based on: %s",
2090 queue.display_name,
2091 ", ".join([x.name for x in queue.radio_source]),
2092 )
2093
2094 # Get user's preferred provider instances for steering provider selection
2095 preferred_provider_instances: list[str] | None = None
2096 if (
2097 queue.userid
2098 and (playback_user := await self.mass.webserver.auth.get_user(queue.userid))
2099 and playback_user.provider_filter
2100 ):
2101 preferred_provider_instances = playback_user.provider_filter
2102
2103 available_base_tracks: list[Track] = []
2104 base_track_sample_size = 5
2105 # Some providers have very deterministic similar track algorithms when providing
2106 # a single track item. When we have a radio mode based on 1 track and we have to
2107 # refill the queue (ie not initial radio mode), we use the play history as base tracks
2108 if (
2109 len(queue.radio_source) == 1
2110 and queue.radio_source[0].media_type == MediaType.TRACK
2111 and not is_initial_radio_mode
2112 ):
2113 available_base_tracks = queue_track_items
2114 else:
2115 # Grab all the available base tracks based on the selected source items.
2116 # shuffle the source items, just in case
2117 for radio_item in random.sample(queue.radio_source, len(queue.radio_source)):
2118 ctrl = self.mass.music.get_controller(radio_item.media_type)
2119 try:
2120 available_base_tracks += [
2121 track
2122 for track in await ctrl.radio_mode_base_tracks(
2123 radio_item, # type: ignore[arg-type]
2124 preferred_provider_instances,
2125 )
2126 # Avoid duplicate base tracks
2127 if track not in available_base_tracks
2128 ]
2129 except UnsupportedFeaturedException as err:
2130 self.logger.debug(
2131 "Skip loading radio items for %s: %s ",
2132 radio_item.uri,
2133 str(err),
2134 )
2135 if not available_base_tracks:
2136 raise UnsupportedFeaturedException("Radio mode not available for source items")
2137
2138 # Sample tracks from the base tracks, which will be used to calculate the dynamic ones
2139 base_tracks = random.sample(
2140 available_base_tracks,
2141 min(base_track_sample_size, len(available_base_tracks)),
2142 )
2143 # Use a set to avoid duplicate dynamic tracks
2144 dynamic_tracks: set[Track] = set()
2145 # Use base tracks + Trackcontroller to obtain similar tracks for every base Track
2146 for allow_lookup in (False, True):
2147 if dynamic_tracks:
2148 break
2149 for base_track in base_tracks:
2150 try:
2151 _similar_tracks = await self.mass.music.tracks.similar_tracks(
2152 base_track.item_id,
2153 base_track.provider,
2154 allow_lookup=allow_lookup,
2155 preferred_provider_instances=preferred_provider_instances,
2156 )
2157 except MediaNotFoundError:
2158 # Some providers don't have similar tracks for all items. For example,
2159 # Tidal can sometimes return a 404 when the 'similar_tracks' endpoint is called.
2160 # in that case, just skip the track.
2161 self.logger.debug("Similar tracks not found for track %s", base_track.name)
2162 continue
2163 for track in _similar_tracks:
2164 if (
2165 track not in base_tracks
2166 # Exclude tracks we have already played / queued
2167 and track not in queue_track_items
2168 # Ignore tracks that are too long for radio mode, e.g. mixes
2169 and track.duration <= RADIO_TRACK_MAX_DURATION_SECS
2170 ):
2171 dynamic_tracks.add(track)
2172 if len(dynamic_tracks) >= 50:
2173 break
2174 queue_tracks: list[Track] = []
2175 dynamic_tracks_list = list(dynamic_tracks)
2176 # Only include the sampled base tracks when the radio mode is first initialized
2177 if is_initial_radio_mode:
2178 queue_tracks += [base_tracks[0]]
2179 # Exhaust base tracks with the pattern of BDDBDDBDD (1 base track + 2 dynamic tracks)
2180 if len(base_tracks) > 1:
2181 for base_track in base_tracks[1:]:
2182 queue_tracks += [base_track]
2183 if len(dynamic_tracks_list) > 2:
2184 queue_tracks += random.sample(dynamic_tracks_list, 2)
2185 else:
2186 queue_tracks += dynamic_tracks_list
2187 # Add dynamic tracks to the queue, make sure to exclude already picked tracks
2188 remaining_dynamic_tracks = [t for t in dynamic_tracks_list if t not in queue_tracks]
2189 if remaining_dynamic_tracks:
2190 queue_tracks += random.sample(
2191 remaining_dynamic_tracks, min(len(remaining_dynamic_tracks), 25)
2192 )
2193 return queue_tracks
2194
2195 async def _get_folder_tracks(self, folder: BrowseFolder) -> list[Track]:
2196 """Fetch (playable) tracks for given browse folder."""
2197 self.logger.info(
2198 "Fetching tracks to play for folder %s",
2199 folder.name,
2200 )
2201 tracks: list[Track] = []
2202 for item in await self.mass.music.browse(folder.path):
2203 if not item.is_playable:
2204 continue
2205 # recursively fetch tracks from all media types
2206 resolved = await self._resolve_media_items(item)
2207 tracks += [x for x in resolved if isinstance(x, Track)]
2208
2209 return tracks
2210
2211 def _update_queue_from_player(
2212 self,
2213 player: Player,
2214 ) -> None:
2215 """Update the Queue when the player state changed."""
2216 queue_id = player.player_id
2217 queue = self._queues[queue_id]
2218
2219 # basic properties
2220 queue.display_name = player.state.name
2221 queue.available = player.state.available
2222 queue.items = len(self._queue_items[queue_id])
2223
2224 queue.state = (
2225 player.state.playback_state or PlaybackState.IDLE
2226 if queue.active
2227 else PlaybackState.IDLE
2228 )
2229 # update current item/index from player report
2230 if queue.active and queue.state in (
2231 PlaybackState.PLAYING,
2232 PlaybackState.PAUSED,
2233 ):
2234 # NOTE: If the queue is not playing (yet) we will not update the current index
2235 # to ensure we keep the previously known current index
2236 if queue.flow_mode:
2237 # flow mode active, the player is playing one long stream
2238 # so we need to calculate the current index and elapsed time
2239 current_index, elapsed_time = self._get_flow_queue_stream_index(queue, player)
2240 elif item_id := self._parse_player_current_item_id(queue_id, player):
2241 # normal mode, the player itself will report the current item
2242 elapsed_time = int(player.state.corrected_elapsed_time or 0)
2243 current_index = self.index_by_id(queue_id, item_id)
2244 else:
2245 # this may happen if the player is still transitioning between tracks
2246 # we ignore this for now and keep the current index as is
2247 return
2248
2249 # get current/next item based on current index
2250 queue.current_index = current_index
2251 queue.current_item = current_item = self.get_item(queue_id, current_index)
2252 queue.next_item = (
2253 self.get_next_item(queue_id, current_index)
2254 if current_item and current_index is not None
2255 else None
2256 )
2257
2258 # correct elapsed time when seeking
2259 if (
2260 not queue.flow_mode
2261 and current_item
2262 and current_item.streamdetails
2263 and current_item.streamdetails.seek_position
2264 ):
2265 elapsed_time += current_item.streamdetails.seek_position
2266 queue.elapsed_time = elapsed_time
2267 queue.elapsed_time_last_updated = time.time()
2268
2269 elif not queue.current_item and queue.current_index is not None:
2270 current_index = queue.current_index
2271 queue.current_item = current_item = self.get_item(queue_id, current_index)
2272 queue.next_item = (
2273 self.get_next_item(queue_id, current_index)
2274 if current_item and current_index is not None
2275 else None
2276 )
2277
2278 # This is enough to detect any changes in the DSPDetails
2279 # (so child count changed, or any output format changed)
2280 output_formats = []
2281 if output_format := player.extra_data.get("output_format"):
2282 output_formats.append(str(output_format))
2283 for child_id in player.state.group_members:
2284 if (child := self.mass.players.get_player(child_id)) and (
2285 output_format := child.extra_data.get("output_format")
2286 ):
2287 output_formats.append(str(output_format))
2288 else:
2289 output_formats.append("unknown")
2290
2291 # basic throttle: do not send state changed events if queue did not actually change
2292 prev_state: CompareState = self._prev_states.get(
2293 queue_id,
2294 CompareState(
2295 queue_id=queue_id,
2296 state=PlaybackState.IDLE,
2297 current_item_id=None,
2298 next_item_id=None,
2299 current_item=None,
2300 elapsed_time=0,
2301 last_playing_elapsed_time=0,
2302 stream_title=None,
2303 codec_type=None,
2304 output_formats=None,
2305 ),
2306 )
2307 # update last_playing_elapsed_time only when the player is actively playing
2308 # use corrected_elapsed_time which accounts for time since last update
2309 # this preserves the last known elapsed time when transitioning to idle/paused
2310 prev_playing_elapsed = prev_state["last_playing_elapsed_time"]
2311 prev_item_id = prev_state["current_item_id"]
2312 current_item_id = queue.current_item.queue_item_id if queue.current_item else None
2313 if queue.state == PlaybackState.PLAYING:
2314 current_elapsed = int(queue.corrected_elapsed_time)
2315 if current_item_id != prev_item_id:
2316 # new track started, reset the elapsed time tracker
2317 last_playing_elapsed_time = current_elapsed
2318 else:
2319 # same track, use the max of current and previous to handle timing issues
2320 last_playing_elapsed_time = max(current_elapsed, prev_playing_elapsed)
2321 else:
2322 last_playing_elapsed_time = prev_playing_elapsed
2323 new_state = CompareState(
2324 queue_id=queue_id,
2325 state=queue.state,
2326 current_item_id=queue.current_item.queue_item_id if queue.current_item else None,
2327 next_item_id=queue.next_item.queue_item_id if queue.next_item else None,
2328 current_item=queue.current_item,
2329 elapsed_time=int(queue.elapsed_time),
2330 last_playing_elapsed_time=last_playing_elapsed_time,
2331 stream_title=(
2332 queue.current_item.streamdetails.stream_title
2333 if queue.current_item and queue.current_item.streamdetails
2334 else None
2335 ),
2336 codec_type=(
2337 queue.current_item.streamdetails.audio_format.codec_type
2338 if queue.current_item and queue.current_item.streamdetails
2339 else None
2340 ),
2341 output_formats=output_formats,
2342 )
2343 changed_keys = get_changed_keys(dict(prev_state), dict(new_state))
2344 with suppress(KeyError):
2345 changed_keys.remove("next_item_id")
2346 with suppress(KeyError):
2347 changed_keys.remove("last_playing_elapsed_time")
2348
2349 # store the new state
2350 if queue.active:
2351 self._prev_states[queue_id] = new_state
2352 else:
2353 self._prev_states.pop(queue_id, None)
2354
2355 # return early if nothing changed
2356 if len(changed_keys) == 0:
2357 return
2358
2359 # signal update and store state
2360 send_update = True
2361 if changed_keys == {"elapsed_time"}:
2362 # only elapsed time changed, do not send full queue update
2363 send_update = False
2364 prev_time = prev_state.get("elapsed_time") or 0
2365 cur_time = new_state.get("elapsed_time") or 0
2366 if abs(cur_time - prev_time) > 2:
2367 # send dedicated event for time updates when seeking
2368 self.mass.signal_event(
2369 EventType.QUEUE_TIME_UPDATED,
2370 object_id=queue_id,
2371 data=queue.elapsed_time,
2372 )
2373 # also signal update to the player itself so it can update its current_media
2374 self.mass.players.trigger_player_update(queue_id)
2375
2376 if send_update:
2377 self.signal_update(queue_id)
2378
2379 if "output_formats" in changed_keys:
2380 # refresh DSP details since they may have changed
2381 dsp = get_stream_dsp_details(self.mass, queue_id)
2382 if queue.current_item and queue.current_item.streamdetails:
2383 queue.current_item.streamdetails.dsp = dsp
2384 if queue.next_item and queue.next_item.streamdetails:
2385 queue.next_item.streamdetails.dsp = dsp
2386
2387 # handle updating stream_metadata if needed
2388 if (
2389 queue.current_item
2390 and (streamdetails := queue.current_item.streamdetails)
2391 and streamdetails.stream_metadata_update_callback
2392 and (
2393 streamdetails.stream_metadata_last_updated is None
2394 or (
2395 time.time() - streamdetails.stream_metadata_last_updated
2396 >= streamdetails.stream_metadata_update_interval
2397 )
2398 )
2399 ):
2400 streamdetails.stream_metadata_last_updated = time.time()
2401 self.mass.create_task(
2402 streamdetails.stream_metadata_update_callback(
2403 streamdetails, int(queue.corrected_elapsed_time)
2404 )
2405 )
2406
2407 # handle sending a playback progress report
2408 # we do this every 30 seconds or when the state changes
2409 if (
2410 changed_keys.intersection({"state", "current_item_id"})
2411 or int(queue.elapsed_time) % PLAYBACK_REPORT_INTERVAL_SECONDS == 0
2412 ):
2413 self._handle_playback_progress_report(queue, prev_state, new_state)
2414
2415 # check if we need to clear the queue if we reached the end
2416 if "state" in changed_keys and queue.state == PlaybackState.IDLE:
2417 self._handle_end_of_queue(queue, prev_state, new_state)
2418
2419 # watch dynamic radio items refill if needed
2420 if "current_item_id" in changed_keys:
2421 # auto enable radio mode if dont stop the music is enabled
2422 if (
2423 queue.dont_stop_the_music_enabled
2424 and queue.enqueued_media_items
2425 and queue.current_index is not None
2426 and (queue.items - queue.current_index) <= 1
2427 ):
2428 # We have received the last item in the queue and Don't stop the music is enabled
2429 # set the played media item(s) as radio items (which will refill the queue)
2430 # note that this will fail if there are no media items for which we have
2431 # a dynamic radio source.
2432 self.logger.debug(
2433 "End of queue detected and Don't stop the music is enabled for %s"
2434 " - setting enqueued media items as radio source: %s",
2435 queue.display_name,
2436 ", ".join([x.uri for x in queue.enqueued_media_items]), # type: ignore[misc] # uri set in __post_init__
2437 )
2438 queue.radio_source = queue.enqueued_media_items
2439 # auto fill radio tracks if less than 5 tracks left in the queue
2440 if (
2441 queue.radio_source
2442 and queue.current_index is not None
2443 and (queue.items - queue.current_index) < 5
2444 ):
2445 task_id = f"fill_radio_tracks_{queue_id}"
2446 self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id)
2447
2448 def _get_flow_queue_stream_index(
2449 self, queue: PlayerQueue, player: Player
2450 ) -> tuple[int | None, int]:
2451 """Calculate current queue index and current track elapsed time when flow mode is active."""
2452 elapsed_time_queue_total = player.state.corrected_elapsed_time or 0
2453 if queue.current_index is None and not queue.flow_mode_stream_log:
2454 return queue.current_index, int(queue.elapsed_time)
2455
2456 # For each track that has been streamed/buffered to the player,
2457 # a playlog entry will be created with the queue item id
2458 # and the amount of seconds streamed. We traverse the playlog to figure
2459 # out where we are in the queue, accounting for actual streamed
2460 # seconds (and not duration) and skipped seconds. If a track has been repeated,
2461 # it will simply be in the playlog multiple times.
2462 played_time = 0.0
2463 queue_index: int | None = queue.current_index or 0
2464 track_time = 0.0
2465 for play_log_entry in queue.flow_mode_stream_log:
2466 queue_item_duration = (
2467 # NOTE: 'seconds_streamed' can actually be 0 if there was a stream error!
2468 play_log_entry.seconds_streamed
2469 if play_log_entry.seconds_streamed is not None
2470 else play_log_entry.duration or 3600 * 24 * 7
2471 )
2472 if elapsed_time_queue_total > (queue_item_duration + played_time):
2473 # total elapsed time is more than (streamed) track duration
2474 # this track has been fully played, move on.
2475 played_time += queue_item_duration
2476 else:
2477 # no more seconds left to divide, this is our track
2478 # account for any seeking by adding the skipped/seeked seconds
2479 queue_index = self.index_by_id(queue.queue_id, play_log_entry.queue_item_id)
2480 queue_item = self.get_item(queue.queue_id, queue_index)
2481 if queue_item and queue_item.streamdetails:
2482 track_sec_skipped = queue_item.streamdetails.seek_position
2483 else:
2484 track_sec_skipped = 0
2485 track_time = elapsed_time_queue_total + track_sec_skipped - played_time
2486 break
2487 if player.state.playback_state != PlaybackState.PLAYING:
2488 # if the player is not playing, we can't be sure that the elapsed time is correct
2489 # so we just return the queue index and the elapsed time
2490 return queue.current_index, int(queue.elapsed_time)
2491 return queue_index, int(track_time)
2492
2493 def _parse_player_current_item_id(self, queue_id: str, player: Player) -> str | None:
2494 """Parse QueueItem ID from Player's current url."""
2495 protocol_player = player
2496 if player.active_output_protocol and player.active_output_protocol != "native":
2497 protocol_player = self.mass.players.get_player(player.active_output_protocol) or player
2498 if not protocol_player.current_media:
2499 # YES, we use player.current_media on purpose here because we need the raw metadata
2500 return None
2501 # prefer queue_id and queue_item_id within the current media
2502 if (
2503 protocol_player.current_media.source_id == queue_id
2504 and protocol_player.current_media.queue_item_id
2505 ):
2506 return protocol_player.current_media.queue_item_id
2507 # special case for sonos players
2508 if protocol_player.current_media.uri and protocol_player.current_media.uri.startswith(
2509 f"mass:{queue_id}"
2510 ):
2511 if protocol_player.current_media.queue_item_id:
2512 return protocol_player.current_media.queue_item_id
2513 current_item_id = protocol_player.current_media.uri.split(":")[-1]
2514 if self.get_item(queue_id, current_item_id):
2515 return current_item_id
2516 return None
2517 # try to extract the item id from a mass stream url
2518 if (
2519 protocol_player.current_media.uri
2520 and queue_id in protocol_player.current_media.uri
2521 and self.mass.streams.base_url in protocol_player.current_media.uri
2522 ):
2523 current_item_id = protocol_player.current_media.uri.rsplit("/")[-1].split(".")[0]
2524 if self.get_item(queue_id, current_item_id):
2525 return current_item_id
2526 # try to extract the item id from a queue_id/item_id combi
2527 if (
2528 protocol_player.current_media.uri
2529 and queue_id in protocol_player.current_media.uri
2530 and "/" in protocol_player.current_media.uri
2531 ):
2532 current_item_id = protocol_player.current_media.uri.split("/")[1]
2533 if self.get_item(queue_id, current_item_id):
2534 return current_item_id
2535
2536 return None
2537
2538 def _handle_end_of_queue(
2539 self, queue: PlayerQueue, prev_state: CompareState, new_state: CompareState
2540 ) -> None:
2541 """Check if the queue should be cleared after the current item."""
2542 # check if queue state changed to stopped (from playing/paused to idle)
2543 if not (
2544 prev_state["state"] in (PlaybackState.PLAYING, PlaybackState.PAUSED)
2545 and new_state["state"] == PlaybackState.IDLE
2546 ):
2547 return
2548 # check if no more items in the queue (next_item should be None at end of queue)
2549 if queue.next_item is not None:
2550 return
2551 # check if we had a previous item playing
2552 if prev_state["current_item_id"] is None:
2553 return
2554
2555 async def _clear_queue_delayed() -> None:
2556 for _ in range(5):
2557 await asyncio.sleep(1)
2558 if queue.state != PlaybackState.IDLE:
2559 return
2560 if queue.next_item is not None:
2561 return
2562 self.logger.info("End of queue reached, clearing items")
2563 self.clear(queue.queue_id)
2564
2565 # all checks passed, we stopped playback at the last (or single) track of the queue
2566 # now determine if the item was fully played before clearing
2567
2568 # For flow mode, check if the last track was fully streamed using the stream log
2569 # This is more reliable than elapsed_time which can be reset/incorrect
2570 if queue.flow_mode and queue.flow_mode_stream_log:
2571 last_log_entry = queue.flow_mode_stream_log[-1]
2572 if last_log_entry.seconds_streamed is not None:
2573 # The last track finished streaming, safe to clear queue
2574 self.mass.create_task(_clear_queue_delayed())
2575 return
2576
2577 # For non-flow mode, use prev_state values since queue state may have been updated/reset
2578 prev_item = prev_state["current_item"]
2579 if prev_item and (streamdetails := prev_item.streamdetails):
2580 duration = streamdetails.duration or prev_item.duration or 24 * 3600
2581 elif prev_item:
2582 duration = prev_item.duration or 24 * 3600
2583 else:
2584 # No current item means player has already cleared it, safe to clear queue
2585 self.mass.create_task(_clear_queue_delayed())
2586 return
2587
2588 # use last_playing_elapsed_time which preserves the elapsed time from when the player
2589 # was still playing (before transitioning to idle where elapsed_time may be reset to 0)
2590 seconds_played = int(prev_state["last_playing_elapsed_time"])
2591 # debounce this a bit to make sure we're not clearing the queue by accident
2592 # only clear if the last track was played to near completion (within 5 seconds of end)
2593 if seconds_played >= (duration or 3600) - 5:
2594 self.mass.create_task(_clear_queue_delayed())
2595
2596 def _handle_playback_progress_report(
2597 self, queue: PlayerQueue, prev_state: CompareState, new_state: CompareState
2598 ) -> None:
2599 """Handle playback progress report."""
2600 # detect change in current index to report that a item has been played
2601 prev_item_id = prev_state["current_item_id"]
2602 cur_item_id = new_state["current_item_id"]
2603 if prev_item_id is None and cur_item_id is None:
2604 return
2605
2606 if prev_item_id is not None and prev_item_id != cur_item_id:
2607 # we have a new item, so we need report the previous one
2608 is_current_item = False
2609 item_to_report = prev_state["current_item"]
2610 seconds_played = int(prev_state["elapsed_time"])
2611 else:
2612 # report on current item
2613 is_current_item = True
2614 item_to_report = self.get_item(queue.queue_id, cur_item_id) or new_state["current_item"]
2615 seconds_played = int(new_state["elapsed_time"])
2616
2617 if not item_to_report:
2618 return # guard against invalid items
2619
2620 if not (media_item := item_to_report.media_item):
2621 # only report on media items
2622 return
2623 assert media_item.uri is not None # uri is set in __post_init__
2624
2625 if item_to_report.streamdetails and item_to_report.streamdetails.stream_error:
2626 # Ignore items that had a stream error
2627 return
2628
2629 if item_to_report.streamdetails and item_to_report.streamdetails.duration:
2630 duration = int(item_to_report.streamdetails.duration)
2631 else:
2632 duration = int(item_to_report.duration or 3 * 3600)
2633
2634 if seconds_played < 5:
2635 # ignore items that have been played less than 5 seconds
2636 # this also filters out a bounce effect where the previous item
2637 # gets reported with 0 elapsed seconds after a new item starts playing
2638 return
2639
2640 # determine if item is fully played
2641 # for podcasts and audiobooks we account for the last 60 seconds
2642 percentage_played = percentage(seconds_played, duration)
2643 if not is_current_item and item_to_report.media_type in (
2644 MediaType.AUDIOBOOK,
2645 MediaType.PODCAST_EPISODE,
2646 ):
2647 fully_played = seconds_played >= duration - 60
2648 elif not is_current_item:
2649 # 90% of the track must be played to be considered fully played
2650 fully_played = percentage_played >= 90
2651 else:
2652 fully_played = seconds_played >= duration - 10
2653
2654 is_playing = is_current_item and queue.state == PlaybackState.PLAYING
2655 if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
2656 self.logger.debug(
2657 "%s %s '%s' (%s) - Fully played: %s - Progress: %s (%s/%ss)",
2658 queue.display_name,
2659 "is playing" if is_playing else "played",
2660 item_to_report.name,
2661 item_to_report.uri,
2662 fully_played,
2663 f"{percentage_played}%",
2664 seconds_played,
2665 duration,
2666 )
2667 # add entry to playlog - this also handles resume of podcasts/audiobooks
2668 self.mass.create_task(
2669 self.mass.music.mark_item_played(
2670 media_item,
2671 fully_played=fully_played,
2672 seconds_played=seconds_played,
2673 is_playing=is_playing,
2674 userid=queue.userid,
2675 queue_id=queue.queue_id,
2676 user_initiated=False,
2677 )
2678 )
2679
2680 album: Album | ItemMapping | None = getattr(media_item, "album", None)
2681 # signal 'media item played' event,
2682 # which is useful for plugins that want to do scrobbling
2683 artists: list[Artist | ItemMapping] = getattr(media_item, "artists", [])
2684 artists_names = [a.name for a in artists]
2685 self.mass.signal_event(
2686 EventType.MEDIA_ITEM_PLAYED,
2687 object_id=media_item.uri,
2688 data=MediaItemPlaybackProgressReport(
2689 uri=media_item.uri,
2690 media_type=media_item.media_type,
2691 name=media_item.name,
2692 version=getattr(media_item, "version", None),
2693 artist=(
2694 getattr(media_item, "artist_str", None) or artists_names[0]
2695 if artists_names
2696 else None
2697 ),
2698 artists=artists_names,
2699 artist_mbids=[a.mbid for a in artists if a.mbid] if artists else None,
2700 album=album.name if album else None,
2701 album_mbid=album.mbid if album else None,
2702 album_artist=(album.artist_str if isinstance(album, Album) else None),
2703 album_artist_mbids=(
2704 [a.mbid for a in album.artists if a.mbid] if isinstance(album, Album) else None
2705 ),
2706 image_url=(
2707 self.mass.metadata.get_image_url(
2708 item_to_report.media_item.image, prefer_proxy=False
2709 )
2710 if item_to_report.media_item.image
2711 else None
2712 ),
2713 duration=duration,
2714 mbid=(getattr(media_item, "mbid", None)),
2715 seconds_played=seconds_played,
2716 fully_played=fully_played,
2717 is_playing=is_playing,
2718 userid=queue.userid,
2719 ),
2720 )
2721
2722
2723async def _smart_shuffle(items: list[QueueItem]) -> list[QueueItem]:
2724 """Shuffle queue items, avoiding identical tracks next to each other.
2725
2726 Best-effort approach to prevent the same track from appearing adjacent.
2727 Does a random shuffle first, then makes a limited number of passes to
2728 swap adjacent duplicates with a random item further in the list.
2729
2730 :param items: List of queue items to shuffle.
2731 """
2732 if len(items) <= 2:
2733 return random.sample(items, len(items)) if len(items) == 2 else items
2734
2735 # Start with a random shuffle
2736 shuffled = random.sample(items, len(items))
2737
2738 # Make a few passes to fix adjacent duplicates
2739 max_passes = 3
2740 for _ in range(max_passes):
2741 swapped = False
2742 for i in range(len(shuffled) - 1):
2743 if shuffled[i].name == shuffled[i + 1].name:
2744 # Found adjacent duplicate - swap with random position at least 2 away
2745 swap_candidates = [j for j in range(len(shuffled)) if abs(j - i - 1) >= 2]
2746 if swap_candidates:
2747 swap_pos = random.choice(swap_candidates)
2748 shuffled[i + 1], shuffled[swap_pos] = shuffled[swap_pos], shuffled[i + 1]
2749 swapped = True
2750 if not swapped:
2751 break
2752 # Yield to event loop between passes
2753 await asyncio.sleep(0)
2754
2755 return shuffled
2756