/
/
/
1"""
2MusicAssistant Player Queues Controller.
3
4Handles all logic to PLAY Media Items, provided by Music Providers to supported players.
5
6It is loosely coupled to the MusicAssistant Music Controller and Player Controller.
7A Music Assistant Player always has a PlayerQueue associated with it
8which holds the queue items and state.
9
10The PlayerQueue is in that case the active source of the player,
11but it can also be something else, hence the loose coupling.
12"""
13
14from __future__ import annotations
15
16import asyncio
17import random
18import time
19from contextlib import suppress
20from types import NoneType
21from typing import TYPE_CHECKING, Any, TypedDict, cast
22
23import shortuuid
24from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
25from music_assistant_models.enums import (
26 ConfigEntryType,
27 ContentType,
28 EventType,
29 MediaType,
30 PlaybackState,
31 ProviderFeature,
32 QueueOption,
33 RepeatMode,
34)
35from music_assistant_models.errors import (
36 AudioError,
37 InvalidCommand,
38 InvalidDataError,
39 MediaNotFoundError,
40 MusicAssistantError,
41 PlayerUnavailableError,
42 QueueEmpty,
43 UnsupportedFeaturedException,
44)
45from music_assistant_models.media_items import (
46 Album,
47 Artist,
48 Audiobook,
49 BrowseFolder,
50 Genre,
51 ItemMapping,
52 MediaItemType,
53 PlayableMediaItemType,
54 Playlist,
55 Podcast,
56 PodcastEpisode,
57 Track,
58 UniqueList,
59 media_from_dict,
60)
61from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport
62from music_assistant_models.player_queue import PlayerQueue
63from music_assistant_models.queue_item import QueueItem
64
65from music_assistant.constants import (
66 ATTR_ANNOUNCEMENT_IN_PROGRESS,
67 MASS_LOGO_ONLINE,
68 PLAYBACK_REPORT_INTERVAL_SECONDS,
69 PLAYLIST_MEDIA_TYPES,
70 VERBOSE_LOG_LEVEL,
71 PlaylistPlayableItem,
72)
73from music_assistant.controllers.players.controller import IN_QUEUE_COMMAND
74from music_assistant.controllers.webserver.helpers.auth_middleware import get_current_user
75from music_assistant.helpers.api import api_command
76from music_assistant.helpers.audio import get_stream_details, get_stream_dsp_details
77from music_assistant.helpers.throttle_retry import BYPASS_THROTTLER
78from music_assistant.helpers.util import get_changed_keys, percentage
79from music_assistant.models.core_controller import CoreController
80from music_assistant.models.player import Player, PlayerMedia
81
82if TYPE_CHECKING:
83 from collections.abc import Iterator
84
85 from music_assistant_models.auth import User
86 from music_assistant_models.media_items.metadata import MediaItemImage
87
88 from music_assistant import MusicAssistant
89 from music_assistant.models.player import Player
90
91CONF_DEFAULT_ENQUEUE_SELECT_ARTIST = "default_enqueue_select_artist"
92CONF_DEFAULT_ENQUEUE_SELECT_ALBUM = "default_enqueue_select_album"
93
94ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE = "all_tracks"
95ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE = "all_tracks"
96
97CONF_DEFAULT_ENQUEUE_OPTION_ARTIST = "default_enqueue_option_artist"
98CONF_DEFAULT_ENQUEUE_OPTION_ALBUM = "default_enqueue_option_album"
99CONF_DEFAULT_ENQUEUE_OPTION_TRACK = "default_enqueue_option_track"
100CONF_DEFAULT_ENQUEUE_OPTION_GENRE = "default_enqueue_option_genre"
101CONF_DEFAULT_ENQUEUE_OPTION_RADIO = "default_enqueue_option_radio"
102CONF_DEFAULT_ENQUEUE_OPTION_PLAYLIST = "default_enqueue_option_playlist"
103CONF_DEFAULT_ENQUEUE_OPTION_AUDIOBOOK = "default_enqueue_option_audiobook"
104CONF_DEFAULT_ENQUEUE_OPTION_PODCAST = "default_enqueue_option_podcast"
105CONF_DEFAULT_ENQUEUE_OPTION_PODCAST_EPISODE = "default_enqueue_option_podcast_episode"
106CONF_DEFAULT_ENQUEUE_OPTION_FOLDER = "default_enqueue_option_folder"
107CONF_DEFAULT_ENQUEUE_OPTION_UNKNOWN = "default_enqueue_option_unknown"
108RADIO_TRACK_MAX_DURATION_SECS = 20 * 60 # 20 minutes
109CACHE_CATEGORY_PLAYER_QUEUE_STATE = 0
110CACHE_CATEGORY_PLAYER_QUEUE_ITEMS = 1
111
112
113class CompareState(TypedDict):
114 """Simple object where we store the (previous) state of a queue.
115
116 Used for compare actions.
117 """
118
119 queue_id: str
120 state: PlaybackState
121 current_item_id: str | None
122 next_item_id: str | None
123 current_item: QueueItem | None
124 elapsed_time: int
125 # last_playing_elapsed_time: elapsed time from the last PLAYING state update
126 # used to determine if a track was fully played when transitioning to idle
127 last_playing_elapsed_time: int
128 stream_title: str | None
129 codec_type: ContentType | None
130 output_formats: list[str] | None
131
132
133class PlayerQueuesController(CoreController):
134 """Controller holding all logic to enqueue music for players."""
135
136 domain: str = "player_queues"
137
138 def __init__(self, mass: MusicAssistant) -> None:
139 """Initialize core controller."""
140 super().__init__(mass)
141 self._queues: dict[str, PlayerQueue] = {}
142 self._queue_items: dict[str, list[QueueItem]] = {}
143 self._prev_states: dict[str, CompareState] = {}
144 self._transitioning_players: set[str] = set()
145 self.manifest.name = "Player Queues controller"
146 self.manifest.description = (
147 "Music Assistant's core controller which manages the queues for all players."
148 )
149 self.manifest.icon = "playlist-music"
150
151 async def close(self) -> None:
152 """Cleanup on exit."""
153 # stop all playback
154 for queue in self.all():
155 if queue.state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
156 await self.stop(queue.queue_id)
157
158 async def get_config_entries(
159 self,
160 action: str | None = None,
161 values: dict[str, ConfigValueType] | None = None,
162 ) -> tuple[ConfigEntry, ...]:
163 """Return all Config Entries for this core module (if any)."""
164 enqueue_options = [ConfigValueOption(x.name, x.value) for x in QueueOption]
165 return (
166 ConfigEntry(
167 key=CONF_DEFAULT_ENQUEUE_SELECT_ARTIST,
168 type=ConfigEntryType.STRING,
169 default_value=ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE,
170 label="Items to select when you play a (in-library) artist.",
171 options=[
172 ConfigValueOption(
173 title="Only in-library tracks",
174 value="library_tracks",
175 ),
176 ConfigValueOption(
177 title="All tracks from all albums in the library",
178 value="library_album_tracks",
179 ),
180 ConfigValueOption(
181 title="All (top) tracks from (all) streaming provider(s)",
182 value="all_tracks",
183 ),
184 ConfigValueOption(
185 title="All tracks from all albums from (all) streaming provider(s)",
186 value="all_album_tracks",
187 ),
188 ],
189 ),
190 ConfigEntry(
191 key=CONF_DEFAULT_ENQUEUE_SELECT_ALBUM,
192 type=ConfigEntryType.STRING,
193 default_value=ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE,
194 label="Items to select when you play a (in-library) album.",
195 options=[
196 ConfigValueOption(
197 title="Only in-library tracks",
198 value="library_tracks",
199 ),
200 ConfigValueOption(
201 title="All tracks for album on (streaming) provider",
202 value="all_tracks",
203 ),
204 ],
205 ),
206 ConfigEntry(
207 key=CONF_DEFAULT_ENQUEUE_OPTION_ARTIST,
208 type=ConfigEntryType.STRING,
209 default_value=QueueOption.REPLACE.value,
210 label="Default enqueue option for Artist item(s).",
211 options=enqueue_options,
212 description="Define the default enqueue action for this mediatype.",
213 ),
214 ConfigEntry(
215 key=CONF_DEFAULT_ENQUEUE_OPTION_ALBUM,
216 type=ConfigEntryType.STRING,
217 default_value=QueueOption.REPLACE.value,
218 label="Default enqueue option for Album item(s).",
219 options=enqueue_options,
220 description="Define the default enqueue action for this mediatype.",
221 ),
222 ConfigEntry(
223 key=CONF_DEFAULT_ENQUEUE_OPTION_TRACK,
224 type=ConfigEntryType.STRING,
225 default_value=QueueOption.PLAY.value,
226 label="Default enqueue option for Track item(s).",
227 options=enqueue_options,
228 description="Define the default enqueue action for this mediatype.",
229 ),
230 ConfigEntry(
231 key=CONF_DEFAULT_ENQUEUE_OPTION_GENRE,
232 type=ConfigEntryType.STRING,
233 default_value=QueueOption.REPLACE.value,
234 label="Default enqueue option for Genre item(s).",
235 options=enqueue_options,
236 description="Define the default enqueue action for this mediatype.",
237 ),
238 ConfigEntry(
239 key=CONF_DEFAULT_ENQUEUE_OPTION_RADIO,
240 type=ConfigEntryType.STRING,
241 default_value=QueueOption.REPLACE.value,
242 label="Default enqueue option for Radio item(s).",
243 options=enqueue_options,
244 description="Define the default enqueue action for this mediatype.",
245 ),
246 ConfigEntry(
247 key=CONF_DEFAULT_ENQUEUE_OPTION_PLAYLIST,
248 type=ConfigEntryType.STRING,
249 default_value=QueueOption.REPLACE.value,
250 label="Default enqueue option for Playlist item(s).",
251 options=enqueue_options,
252 description="Define the default enqueue action for this mediatype.",
253 ),
254 ConfigEntry(
255 key=CONF_DEFAULT_ENQUEUE_OPTION_AUDIOBOOK,
256 type=ConfigEntryType.STRING,
257 default_value=QueueOption.REPLACE.value,
258 label="Default enqueue option for Audiobook item(s).",
259 options=enqueue_options,
260 hidden=True,
261 ),
262 ConfigEntry(
263 key=CONF_DEFAULT_ENQUEUE_OPTION_PODCAST,
264 type=ConfigEntryType.STRING,
265 default_value=QueueOption.REPLACE.value,
266 label="Default enqueue option for Podcast item(s).",
267 options=enqueue_options,
268 hidden=True,
269 ),
270 ConfigEntry(
271 key=CONF_DEFAULT_ENQUEUE_OPTION_PODCAST_EPISODE,
272 type=ConfigEntryType.STRING,
273 default_value=QueueOption.REPLACE.value,
274 label="Default enqueue option for Podcast-episode item(s).",
275 options=enqueue_options,
276 hidden=True,
277 ),
278 ConfigEntry(
279 key=CONF_DEFAULT_ENQUEUE_OPTION_FOLDER,
280 type=ConfigEntryType.STRING,
281 default_value=QueueOption.REPLACE.value,
282 label="Default enqueue option for Folder item(s).",
283 options=enqueue_options,
284 hidden=True,
285 ),
286 )
287
288 def __iter__(self) -> Iterator[PlayerQueue]:
289 """Iterate over (available) players."""
290 return iter(self._queues.values())
291
292 @api_command("player_queues/all")
293 def all(self) -> tuple[PlayerQueue, ...]:
294 """Return all registered PlayerQueues."""
295 return tuple(self._queues.values())
296
297 @api_command("player_queues/get")
298 def get(self, queue_id: str) -> PlayerQueue | None:
299 """Return PlayerQueue by queue_id or None if not found."""
300 return self._queues.get(queue_id)
301
302 @api_command("player_queues/items")
303 def items(self, queue_id: str, limit: int = 500, offset: int = 0) -> list[QueueItem]:
304 """Return all QueueItems for given PlayerQueue."""
305 if queue_id not in self._queue_items:
306 return []
307
308 return self._queue_items[queue_id][offset : offset + limit]
309
310 @api_command("player_queues/get_active_queue")
311 def get_active_queue(self, player_id: str) -> PlayerQueue | None:
312 """Return the current active/synced queue for a player."""
313 if player := self.mass.players.get_player(player_id):
314 return self.mass.players.get_active_queue(player)
315 return None
316
317 # Queue commands
318
319 @api_command("player_queues/shuffle")
320 async def set_shuffle(self, queue_id: str, shuffle_enabled: bool) -> None:
321 """Configure shuffle setting on the the queue."""
322 queue = self._queues[queue_id]
323 if queue.shuffle_enabled == shuffle_enabled:
324 return # no change
325 queue.shuffle_enabled = shuffle_enabled
326 queue_items = self._queue_items[queue_id]
327 cur_index = (
328 queue.index_in_buffer if queue.index_in_buffer is not None else queue.current_index
329 )
330 if cur_index is not None:
331 next_index = cur_index + 1
332 next_items = queue_items[next_index:]
333 else:
334 next_items = []
335 next_index = 0
336 if not shuffle_enabled:
337 # shuffle disabled, try to restore original sort order of the remaining items
338 next_items.sort(key=lambda x: x.sort_index, reverse=False)
339 await self.load(
340 queue_id=queue_id,
341 queue_items=next_items,
342 insert_at_index=next_index,
343 keep_remaining=False,
344 shuffle=shuffle_enabled,
345 )
346
347 @api_command("player_queues/dont_stop_the_music")
348 def set_dont_stop_the_music(self, queue_id: str, dont_stop_the_music_enabled: bool) -> None:
349 """Configure Don't stop the music setting on the queue."""
350 providers_available_with_similar_tracks = any(
351 ProviderFeature.SIMILAR_TRACKS in provider.supported_features
352 for provider in self.mass.music.providers
353 )
354 if dont_stop_the_music_enabled and not providers_available_with_similar_tracks:
355 raise UnsupportedFeaturedException(
356 "Don't stop the music is not supported by any of the available music providers"
357 )
358 queue = self._queues[queue_id]
359 queue.dont_stop_the_music_enabled = dont_stop_the_music_enabled
360 self.signal_update(queue_id=queue_id)
361 # if this happens to be the last track in the queue, fill the radio source
362 if (
363 queue.dont_stop_the_music_enabled
364 and queue.enqueued_media_items
365 and queue.current_index is not None
366 and (queue.items - queue.current_index) <= 1
367 ):
368 queue.radio_source = queue.enqueued_media_items
369 task_id = f"fill_radio_tracks_{queue_id}"
370 self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id)
371
372 @api_command("player_queues/repeat")
373 def set_repeat(self, queue_id: str, repeat_mode: RepeatMode) -> None:
374 """Configure repeat setting on the the queue."""
375 queue = self._queues[queue_id]
376 if queue.repeat_mode == repeat_mode:
377 return # no change
378 queue.repeat_mode = repeat_mode
379 self.signal_update(queue_id)
380 if (
381 queue.state == PlaybackState.PLAYING
382 and queue.index_in_buffer is not None
383 and queue.index_in_buffer == queue.current_index
384 ):
385 # if the queue is playing,
386 # ensure to (re)queue the next track because it might have changed
387 # note that we only do this if the player has loaded the current track
388 # if not, we wait until it has loaded to prevent conflicts
389 if next_item := self.get_next_item(queue_id, queue.index_in_buffer):
390 self._enqueue_next_item(queue_id, next_item)
391
392 @api_command("player_queues/set_playback_speed")
393 async def set_playback_speed(self, queue_id: str, speed: float) -> None:
394 """Set the playback speed for the given queue.
395
396 :param queue_id: queue_id of the queue to configure.
397 :param speed: playback speed multiplier (0.5 to 2.0). 1.0 = normal speed.
398 """
399 if not (0.5 <= speed <= 2.0):
400 raise InvalidDataError(f"Playback speed must be between 0.5 and 2.0, got {speed}")
401 queue = self._queues[queue_id]
402 current_speed = float(queue.extra_attributes.get("playback_speed") or 1.0)
403 if abs(current_speed - speed) < 0.001:
404 return # no change
405 queue.extra_attributes["playback_speed"] = speed
406 self.signal_update(queue_id)
407 if queue.state == PlaybackState.PLAYING:
408 await self.resume(queue_id)
409
410 @api_command("player_queues/play_media")
411 async def play_media(
412 self,
413 queue_id: str,
414 media: MediaItemType | ItemMapping | str | list[MediaItemType | ItemMapping | str],
415 option: QueueOption | None = None,
416 radio_mode: bool = False,
417 start_item: PlayableMediaItemType | str | None = None,
418 username: str | None = None,
419 ) -> None:
420 """Play media item(s) on the given queue.
421
422 :param queue_id: The queue_id of the queue to play media on.
423 :param media: Media that should be played (MediaItem(s) and/or uri's).
424 :param option: Which enqueue mode to use.
425 :param radio_mode: Enable radio mode for the given item(s).
426 :param start_item: Optional item to start the playlist or album from.
427 :param username: The username of the user requesting the playback.
428 Setting the username allows for overriding the logged-in user
429 to account for playback history per user when the play_media is
430 called from a shared context (like a web hook or automation).
431 """
432 # ruff: noqa: PLR0915
433 # we use a contextvar to bypass the throttler for this asyncio task/context
434 # this makes sure that playback has priority over other requests that may be
435 # happening in the background
436 BYPASS_THROTTLER.set(True)
437 if not (queue := self.get(queue_id)):
438 raise PlayerUnavailableError(f"Queue {queue_id} is not available")
439 # always fetch the underlying player so we can raise early if its not available
440 queue_player = self.mass.players.get_player(queue_id, True)
441 assert queue_player is not None # for type checking
442 if queue_player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
443 self.logger.warning("Ignore queue command: An announcement is in progress")
444 return
445
446 # save the user requesting the playback
447 playback_user: User | None
448 if username and (user := await self.mass.webserver.auth.get_user_by_username(username)):
449 playback_user = user
450 else:
451 playback_user = get_current_user()
452 queue.userid = playback_user.user_id if playback_user else None
453
454 # a single item or list of items may be provided
455 media_list = media if isinstance(media, list) else [media]
456
457 # clear queue if needed
458 if option == QueueOption.REPLACE:
459 self.clear(queue_id)
460 # Clear the 'enqueued media item' list when a new queue is requested
461 if option not in (QueueOption.ADD, QueueOption.NEXT):
462 queue.enqueued_media_items.clear()
463
464 media_items: list[MediaItemType] = []
465 radio_source: list[MediaItemType] = []
466 # resolve all media items
467 for item in media_list:
468 try:
469 # parse provided uri into a MA MediaItem or Basic QueueItem from URL
470 media_item: MediaItemType | ItemMapping | BrowseFolder
471 if isinstance(item, str):
472 media_item = await self.mass.music.get_item_by_uri(item)
473 elif isinstance(item, dict): # type: ignore[unreachable]
474 # TODO: Investigate why the API parser sometimes passes raw dicts instead of
475 # converting them to MediaItem objects. The parse_value function in api.py
476 # should handle dict-to-object conversion, but dicts are slipping through
477 # in some cases. This is defensive handling for that parser bug.
478 media_item = media_from_dict(item) # type: ignore[unreachable]
479 self.logger.debug("Converted to: %s", type(media_item))
480 else:
481 # item is MediaItemType | ItemMapping at this point
482 media_item = item
483
484 # Save requested media item to play on the queue so we can use it as a source
485 # for Don't stop the music. Use FIFO list to keep track of the last 10 played items
486 # Skip ItemMapping and BrowseFolder - only queue full MediaItemType objects
487 if not isinstance(
488 media_item, (ItemMapping, BrowseFolder)
489 ) and media_item.media_type in (
490 MediaType.TRACK,
491 MediaType.ALBUM,
492 MediaType.PLAYLIST,
493 MediaType.ARTIST,
494 ):
495 queue.enqueued_media_items.append(media_item)
496 if len(queue.enqueued_media_items) > 10:
497 queue.enqueued_media_items.pop(0)
498
499 # handle default enqueue option if needed
500 if option is None:
501 config_value = await self.mass.config.get_core_config_value(
502 self.domain,
503 f"default_enqueue_option_{media_item.media_type.value}",
504 return_type=str,
505 )
506 option = QueueOption(config_value)
507 if option == QueueOption.REPLACE:
508 self.clear(queue_id, skip_stop=True)
509
510 # collect media_items to play
511 if radio_mode:
512 # Type guard for mypy - only add full MediaItemType to radio_source
513 if not isinstance(media_item, (ItemMapping, BrowseFolder)):
514 radio_source.append(media_item)
515 else:
516 # Convert start_item to string URI if needed
517 start_item_uri: str | None = None
518 if isinstance(start_item, str):
519 start_item_uri = start_item
520 elif start_item is not None:
521 start_item_uri = start_item.uri
522 media_items += await self._resolve_media_items(
523 media_item, start_item_uri, queue_id=queue_id
524 )
525
526 except MusicAssistantError as err:
527 # invalid MA uri or item not found error
528 self.logger.warning("Skipping %s: %s", item, str(err))
529
530 # overwrite or append radio source items
531 if option not in (QueueOption.ADD, QueueOption.NEXT):
532 queue.radio_source = radio_source
533 else:
534 queue.radio_source += radio_source
535 # Use collected media items to calculate the radio if radio mode is on
536 if radio_mode:
537 radio_tracks = await self._get_radio_tracks(
538 queue_id=queue_id, is_initial_radio_mode=True
539 )
540 media_items = list(radio_tracks)
541
542 # only add valid/available items
543 queue_items: list[QueueItem] = []
544 for x in media_items:
545 if not x or not x.available:
546 continue
547 queue_items.append(
548 QueueItem.from_media_item(queue_id, cast("PlayableMediaItemType", x))
549 )
550
551 if not queue_items:
552 raise MediaNotFoundError("No playable items found")
553
554 # load the items into the queue
555 if queue.state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
556 cur_index = (
557 queue.index_in_buffer
558 if queue.index_in_buffer is not None
559 else (queue.current_index if queue.current_index is not None else 0)
560 )
561 else:
562 cur_index = queue.current_index or 0
563 insert_at_index = cur_index + 1
564 # Radio modes are already shuffled in a pattern we would like to keep.
565 shuffle = queue.shuffle_enabled and len(queue_items) > 1 and not radio_mode
566
567 # handle replace: clear all items and replace with the new items
568 if option == QueueOption.REPLACE:
569 await self.load(
570 queue_id,
571 queue_items=queue_items,
572 keep_remaining=False,
573 keep_played=False,
574 shuffle=shuffle,
575 )
576 await self.play_index(queue_id, 0)
577 return
578 # handle next: add item(s) in the index next to the playing/loaded/buffered index
579 if option == QueueOption.NEXT:
580 await self.load(
581 queue_id,
582 queue_items=queue_items,
583 insert_at_index=insert_at_index,
584 shuffle=shuffle,
585 )
586 return
587 if option == QueueOption.REPLACE_NEXT:
588 await self.load(
589 queue_id,
590 queue_items=queue_items,
591 insert_at_index=insert_at_index,
592 keep_remaining=False,
593 shuffle=shuffle,
594 )
595 return
596 # handle play: replace current loaded/playing index with new item(s)
597 if option == QueueOption.PLAY:
598 await self.load(
599 queue_id,
600 queue_items=queue_items,
601 insert_at_index=insert_at_index,
602 shuffle=shuffle,
603 )
604 next_index = min(insert_at_index, len(self._queue_items[queue_id]) - 1)
605 await self.play_index(queue_id, next_index)
606 return
607 # handle add: add/append item(s) to the remaining queue items
608 if option == QueueOption.ADD:
609 await self.load(
610 queue_id=queue_id,
611 queue_items=queue_items,
612 insert_at_index=insert_at_index
613 if queue.shuffle_enabled
614 else len(self._queue_items[queue_id]) + 1,
615 shuffle=queue.shuffle_enabled,
616 )
617 # handle edgecase, queue is empty and items are only added (not played)
618 # mark first item as new index
619 if queue.current_index is None:
620 queue.current_index = 0
621 queue.current_item = self.get_item(queue_id, 0)
622 queue.items = len(queue_items)
623 self.signal_update(queue_id)
624
625 @api_command("player_queues/move_item")
626 def move_item(self, queue_id: str, queue_item_id: str, pos_shift: int = 1) -> None:
627 """
628 Move queue item x up/down the queue.
629
630 - queue_id: id of the queue to process this request.
631 - queue_item_id: the item_id of the queueitem that needs to be moved.
632 - pos_shift: move item x positions down if positive value
633 - pos_shift: move item x positions up if negative value
634 - pos_shift: move item to top of queue as next item if 0.
635 """
636 queue = self._queues[queue_id]
637 item_index = self.index_by_id(queue_id, queue_item_id)
638 if item_index is None:
639 raise InvalidDataError(f"Item {queue_item_id} not found in queue")
640 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
641 msg = f"{item_index} is already played/buffered"
642 raise IndexError(msg)
643
644 queue_items = self._queue_items[queue_id]
645 queue_items = queue_items.copy()
646
647 if pos_shift == 0 and queue.state == PlaybackState.PLAYING:
648 new_index = (queue.current_index or 0) + 1
649 elif pos_shift == 0:
650 new_index = queue.current_index or 0
651 else:
652 new_index = item_index + pos_shift
653 if (new_index < (queue.current_index or 0)) or (new_index > len(queue_items)):
654 return
655 # move the item in the list
656 queue_items.insert(new_index, queue_items.pop(item_index))
657 self.update_items(queue_id, queue_items)
658
659 @api_command("player_queues/move_item_end")
660 def move_item_end(self, queue_id: str, queue_item_id: str) -> None:
661 """
662 Move queue item to the end the queue.
663
664 - queue_id: id of the queue to process this request.
665 - queue_item_id: the item_id of the queueitem that needs to be moved.
666 """
667 queue = self._queues[queue_id]
668 item_index = self.index_by_id(queue_id, queue_item_id)
669 if item_index is None:
670 raise InvalidDataError(f"Item {queue_item_id} not found in queue")
671 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
672 msg = f"{item_index} is already played/buffered"
673 raise IndexError(msg)
674
675 queue_items = self._queue_items[queue_id]
676 if item_index == (len(queue_items) - 1):
677 return
678 queue_items = queue_items.copy()
679
680 new_index = len(self._queue_items[queue_id]) - 1
681
682 # move the item in the list
683 queue_items.insert(new_index, queue_items.pop(item_index))
684 self.update_items(queue_id, queue_items)
685
686 @api_command("player_queues/delete_item")
687 def delete_item(self, queue_id: str, item_id_or_index: int | str) -> None:
688 """Delete item (by id or index) from the queue."""
689 if isinstance(item_id_or_index, str):
690 item_index = self.index_by_id(queue_id, item_id_or_index)
691 if item_index is None:
692 raise InvalidDataError(f"Item {item_id_or_index} not found in queue")
693 else:
694 item_index = item_id_or_index
695 queue = self._queues[queue_id]
696 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
697 # ignore request if track already loaded in the buffer
698 # the frontend should guard so this is just in case
699 self.logger.warning("delete requested for item already loaded in buffer")
700 return
701 queue_items = self._queue_items[queue_id]
702 queue_items.pop(item_index)
703 self.update_items(queue_id, queue_items)
704
705 @api_command("player_queues/clear")
706 def clear(self, queue_id: str, skip_stop: bool = False) -> None:
707 """Clear all items in the queue."""
708 queue = self._queues[queue_id]
709 queue.radio_source = []
710 if queue.state != PlaybackState.IDLE and not skip_stop:
711 self.mass.create_task(self.stop(queue_id))
712 queue.current_index = None
713 queue.current_item = None
714 queue.elapsed_time = 0
715 queue.elapsed_time_last_updated = time.time()
716 queue.index_in_buffer = None
717 self.mass.create_task(self.mass.streams.cleanup_queue_audio_data(queue_id))
718 self.update_items(queue_id, [])
719
720 @api_command("player_queues/save_as_playlist")
721 async def save_as_playlist(self, queue_id: str, name: str) -> Playlist:
722 """Save the current queue items as a new playlist.
723
724 :param queue_id: The queue_id of the queue to save.
725 :param name: The name for the new playlist.
726 """
727 if not self.get(queue_id):
728 raise PlayerUnavailableError(f"Queue {queue_id} is not available")
729 queue_items = self._queue_items.get(queue_id, [])
730 if not queue_items:
731 raise QueueEmpty("Cannot save an empty queue as a playlist.")
732 # collect URIs from queue items that are playlist-compatible
733 uris: list[str] = []
734 for item in queue_items:
735 if item.uri and item.media_type in PLAYLIST_MEDIA_TYPES:
736 uris.append(item.uri)
737 if not uris:
738 raise InvalidDataError("No valid items in queue to save as playlist.")
739 playlist = await self.mass.music.playlists.create_playlist(name)
740 await self.mass.music.playlists.add_playlist_tracks(playlist.item_id, uris)
741 return playlist
742
743 @api_command("player_queues/stop")
744 async def stop(self, queue_id: str) -> None:
745 """
746 Handle STOP command for given queue.
747
748 - queue_id: queue_id of the playerqueue to handle the command.
749 """
750 queue_player = self.mass.players.get_player(queue_id, True)
751 if queue_player is None:
752 raise PlayerUnavailableError(f"Player {queue_id} is not available")
753 if (queue := self.get(queue_id)) and queue.active:
754 if queue.state == PlaybackState.PLAYING:
755 queue.resume_pos = int(queue.corrected_elapsed_time)
756 # Set context to prevent circular call, then forward the actual command to the player
757 token = IN_QUEUE_COMMAND.set(True)
758 try:
759 await self.mass.players.cmd_stop(queue_id)
760 finally:
761 IN_QUEUE_COMMAND.reset(token)
762 self.mass.create_task(self.mass.streams.cleanup_queue_audio_data(queue_id))
763
764 @api_command("player_queues/play")
765 async def play(self, queue_id: str) -> None:
766 """
767 Handle PLAY command for given queue.
768
769 - queue_id: queue_id of the playerqueue to handle the command.
770 """
771 queue_player = self.mass.players.get_player(queue_id, True)
772 if queue_player is None:
773 raise PlayerUnavailableError(f"Player {queue_id} is not available")
774 if (
775 (queue := self._queues.get(queue_id))
776 and queue.active
777 and queue.state == PlaybackState.PAUSED
778 ):
779 # forward the actual play/unpause command to the player
780 await queue_player.play()
781 return
782 # player is not paused, perform resume instead
783 await self.resume(queue_id)
784
785 @api_command("player_queues/pause")
786 async def pause(self, queue_id: str) -> None:
787 """Handle PAUSE command for given queue.
788
789 - queue_id: queue_id of the playerqueue to handle the command.
790 """
791 if not (queue := self._queues.get(queue_id)):
792 return
793 queue_active = queue.active
794 if queue.active and queue.state == PlaybackState.PLAYING:
795 queue.resume_pos = int(queue.corrected_elapsed_time)
796 # forward the actual command to the player controller
797 # Set context to prevent circular call, then forward the actual command to the player
798 token = IN_QUEUE_COMMAND.set(True)
799 try:
800 await self.mass.players.cmd_pause(queue_id)
801 finally:
802 IN_QUEUE_COMMAND.reset(token)
803
804 async def _watch_pause(player: Player) -> None:
805 count = 0
806 # wait for pause
807 while count < 5 and player.state.playback_state == PlaybackState.PLAYING:
808 count += 1
809 await asyncio.sleep(1)
810 # wait for unpause
811 if player.state.playback_state != PlaybackState.PAUSED:
812 return
813 count = 0
814 while count < 30 and player.state.playback_state == PlaybackState.PAUSED:
815 count += 1
816 await asyncio.sleep(1)
817 # if player is still paused when the limit is reached, send stop
818 if player.state.playback_state == PlaybackState.PAUSED:
819 await self.stop(queue_id)
820
821 # we auto stop a player from paused when its paused for 30 seconds
822 if (
823 queue_active
824 and (queue_player := self.mass.players.get_player(queue_id))
825 and not queue_player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS)
826 ):
827 self.mass.create_task(_watch_pause(queue_player))
828
829 @api_command("player_queues/play_pause")
830 async def play_pause(self, queue_id: str) -> None:
831 """Toggle play/pause on given playerqueue.
832
833 - queue_id: queue_id of the queue to handle the command.
834 """
835 if (queue := self._queues.get(queue_id)) and queue.state == PlaybackState.PLAYING:
836 await self.pause(queue_id)
837 return
838 await self.play(queue_id)
839
840 @api_command("player_queues/next")
841 async def next(self, queue_id: str) -> None:
842 """Handle NEXT TRACK command for given queue.
843
844 - queue_id: queue_id of the queue to handle the command.
845 """
846 if (queue := self.get(queue_id)) is None or not queue.active:
847 raise InvalidCommand(f"Queue {queue_id} is not active")
848 idx = self._queues[queue_id].current_index
849 if idx is None:
850 self.logger.warning("Queue %s has no current index", queue.display_name)
851 return
852 attempts = 5
853 while attempts:
854 try:
855 if (next_index := self._get_next_index(queue_id, idx, True)) is not None:
856 await self.play_index(queue_id, next_index, debounce=True)
857 break
858 except MediaNotFoundError:
859 self.logger.warning(
860 "Failed to fetch next track for queue %s - trying next item",
861 queue.display_name,
862 )
863 idx += 1
864 attempts -= 1
865
866 @api_command("player_queues/previous")
867 async def previous(self, queue_id: str) -> None:
868 """Handle PREVIOUS TRACK command for given queue.
869
870 - queue_id: queue_id of the queue to handle the command.
871 """
872 if (queue := self.get(queue_id)) is None or not queue.active:
873 raise InvalidCommand(f"Queue {queue_id} is not active")
874 current_index = self._queues[queue_id].current_index
875 if current_index is None:
876 return
877 next_index = int(current_index)
878 # restart current track if current track has played longer than 4
879 # otherwise skip to previous track
880 if self._queues[queue_id].elapsed_time < 5:
881 next_index = max(current_index - 1, 0)
882 await self.play_index(queue_id, next_index, debounce=True)
883
884 @api_command("player_queues/skip")
885 async def skip(self, queue_id: str, seconds: int = 10) -> None:
886 """Handle SKIP command for given queue.
887
888 - queue_id: queue_id of the queue to handle the command.
889 - seconds: number of seconds to skip in track. Use negative value to skip back.
890 """
891 if (queue := self.get(queue_id)) is None or not queue.active:
892 raise InvalidCommand(f"Queue {queue_id} is not active")
893 await self.seek(queue_id, int(self._queues[queue_id].elapsed_time + seconds))
894
895 @api_command("player_queues/seek")
896 async def seek(self, queue_id: str, position: int = 10) -> None:
897 """Handle SEEK command for given queue.
898
899 - queue_id: queue_id of the queue to handle the command.
900 - position: position in seconds to seek to in the current playing item.
901 """
902 if (queue := self.get(queue_id)) is None or not queue.active:
903 raise InvalidCommand(f"Queue {queue_id} is not active")
904 queue_player = self.mass.players.get_player(queue_id, True)
905 if queue_player is None:
906 raise PlayerUnavailableError(f"Player {queue_id} is not available")
907 if not queue.current_item:
908 raise InvalidCommand(f"Queue {queue_player.state.name} has no item(s) loaded.")
909 if not queue.current_item.duration:
910 raise InvalidCommand("Can not seek items without duration.")
911 position = max(0, int(position))
912 if position > queue.current_item.duration:
913 raise InvalidCommand("Can not seek outside of duration range.")
914 if queue.current_index is None:
915 raise InvalidCommand(f"Queue {queue_player.state.name} has no current index.")
916 await self.play_index(queue_id, queue.current_index, seek_position=position)
917
918 @api_command("player_queues/resume")
919 async def resume(self, queue_id: str, fade_in: bool | None = None) -> None:
920 """Handle RESUME command for given queue.
921
922 - queue_id: queue_id of the queue to handle the command.
923 """
924 queue = self._queues[queue_id]
925 queue_items = self._queue_items[queue_id]
926 resume_item = queue.current_item
927 if queue.state == PlaybackState.PLAYING:
928 # resume requested while already playing,
929 # use current position as resume position
930 resume_pos = queue.corrected_elapsed_time
931 fade_in = False
932 else:
933 resume_pos = queue.resume_pos or queue.elapsed_time
934
935 if not resume_item and queue.current_index is not None and len(queue_items) > 0:
936 resume_item = self.get_item(queue_id, queue.current_index)
937 resume_pos = 0
938 elif not resume_item and queue.current_index is None and len(queue_items) > 0:
939 # items available in queue but no previous track, start at 0
940 resume_item = self.get_item(queue_id, 0)
941 resume_pos = 0
942
943 if resume_item is not None:
944 queue_player = self.mass.players.get_player(queue_id)
945 if queue_player is None:
946 raise PlayerUnavailableError(f"Player {queue_id} is not available")
947 if (
948 fade_in is None
949 and queue_player.state.playback_state == PlaybackState.IDLE
950 and (time.time() - queue.elapsed_time_last_updated) > 60
951 ):
952 # enable fade in effect if the player is idle for a while
953 fade_in = resume_pos > 0
954 if resume_item.media_type == MediaType.RADIO:
955 # we're not able to skip in online radio so this is pointless
956 resume_pos = 0
957 await self.play_index(
958 queue_id, resume_item.queue_item_id, int(resume_pos), fade_in or False
959 )
960 else:
961 # Queue is empty, try to resume from playlog
962 if await self._try_resume_from_playlog(queue):
963 return
964 msg = f"Resume queue requested but queue {queue.display_name} is empty"
965 raise QueueEmpty(msg)
966
967 @api_command("player_queues/play_index")
968 async def play_index(
969 self,
970 queue_id: str,
971 index: int | str,
972 seek_position: int = 0,
973 fade_in: bool = False,
974 debounce: bool = False,
975 ) -> None:
976 """Play item at index (or item_id) X in queue."""
977 queue = self._queues[queue_id]
978 queue.resume_pos = 0
979 if isinstance(index, str):
980 temp_index = self.index_by_id(queue_id, index)
981 if temp_index is None:
982 raise InvalidDataError(f"Item {index} not found in queue")
983 index = temp_index
984 # At this point index is guaranteed to be int
985 queue.current_index = index
986 # update current item and elapsed time and signal update
987 # this way the UI knows immediately that a new item is loading
988 queue.current_item = self.get_item(queue_id, index)
989 queue.elapsed_time = seek_position
990 queue.elapsed_time_last_updated = time.time()
991 self.signal_update(queue_id)
992 queue.index_in_buffer = index
993 queue.flow_mode_stream_log = []
994 target_player = self.mass.players.get_player(queue_id)
995 if target_player is None:
996 raise PlayerUnavailableError(f"Player {queue_id} is not available")
997 queue.next_item_id_enqueued = None
998 # always update session id when we start a new playback session
999 queue.session_id = shortuuid.random(length=8)
1000 # handle resume point of audiobook(chapter) or podcast(episode)
1001 if (
1002 not seek_position
1003 and (queue_item := self.get_item(queue_id, index))
1004 and (resume_position_ms := getattr(queue_item.media_item, "resume_position_ms", 0))
1005 ):
1006 seek_position = max(0, int((resume_position_ms - 500) / 1000))
1007
1008 # send play_media request to player
1009 # NOTE that we debounce this a bit to account for someone hitting the next button
1010 # like a madman. This will prevent the player from being overloaded with requests.
1011 async def _play_index(index: int, debounce: bool) -> None:
1012 for attempt in range(5):
1013 try:
1014 queue_item = self.get_item(queue_id, index)
1015 if not queue_item:
1016 continue # guard
1017 await self._load_item(
1018 queue_item,
1019 self._get_next_index(queue_id, index),
1020 is_start=True,
1021 seek_position=seek_position if attempt == 0 else 0,
1022 fade_in=fade_in if attempt == 0 else False,
1023 )
1024 # if we reach this point, loading the item succeeded, break the loop
1025 queue.current_index = index
1026 queue.current_item = queue_item
1027 break
1028 except (MediaNotFoundError, AudioError):
1029 # the requested index can not be played.
1030 if queue_item:
1031 self.logger.warning(
1032 "Skipping unplayable item %s (%s)",
1033 queue_item.name,
1034 queue_item.uri,
1035 )
1036 queue_item.available = False
1037 next_index = self._get_next_index(queue_id, index, allow_repeat=False)
1038 if next_index is None:
1039 raise MediaNotFoundError("No next item available")
1040 index = next_index
1041 else:
1042 # all attempts to find a playable item failed
1043 raise MediaNotFoundError("No playable item found to start playback")
1044
1045 # work out if we need to use flow mode
1046 flow_mode = target_player.flow_mode and queue_item.media_type not in (
1047 # don't use flow mode for duration-less streams
1048 MediaType.RADIO,
1049 MediaType.PLUGIN_SOURCE,
1050 )
1051 await asyncio.sleep(0.5 if debounce else 0.1)
1052 queue.flow_mode = flow_mode
1053 await self.mass.players.play_media(
1054 player_id=queue_id,
1055 media=await self.player_media_from_queue_item(queue_item, flow_mode),
1056 )
1057 queue.current_index = index
1058 queue.current_item = queue_item
1059 await asyncio.sleep(2)
1060 self._transitioning_players.discard(queue_id)
1061
1062 # we set a flag to notify the update logic that we're transitioning to a new track
1063 self._transitioning_players.add(queue_id)
1064
1065 # we debounce the play_index command to handle the case where someone
1066 # is spamming next/previous on the player
1067 task_id = f"play_index_{queue_id}"
1068 if existing_task := self.mass.get_task(task_id):
1069 existing_task.cancel()
1070 with suppress(asyncio.CancelledError):
1071 await existing_task
1072 task = self.mass.create_task(
1073 _play_index,
1074 index,
1075 debounce,
1076 task_id=task_id,
1077 )
1078 await task
1079 self.signal_update(queue_id)
1080
1081 @api_command("player_queues/transfer")
1082 async def transfer_queue(
1083 self,
1084 source_queue_id: str,
1085 target_queue_id: str,
1086 auto_play: bool | None = None,
1087 ) -> None:
1088 """Transfer queue to another queue."""
1089 if not (source_queue := self.get(source_queue_id)):
1090 raise PlayerUnavailableError(f"Queue {source_queue_id} is not available")
1091 if not (target_queue := self.get(target_queue_id)):
1092 raise PlayerUnavailableError(f"Queue {target_queue_id} is not available")
1093 if auto_play is None:
1094 auto_play = source_queue.state == PlaybackState.PLAYING
1095
1096 target_player = self.mass.players.get_player(target_queue_id)
1097 if target_player is None:
1098 raise PlayerUnavailableError(f"Player {target_queue_id} is not available")
1099 if target_player.state.active_group or target_player.state.synced_to:
1100 # edge case: the user wants to move playback from the group as a whole, to a single
1101 # player in the group or it is grouped and the command targeted at the single player.
1102 # We need to dissolve the group first.
1103 group_id = target_player.state.active_group or target_player.state.synced_to
1104 assert group_id is not None # checked in if condition above
1105 await self.mass.players.cmd_ungroup(group_id)
1106 await asyncio.sleep(3)
1107
1108 source_items = self._queue_items[source_queue_id]
1109 target_queue.repeat_mode = source_queue.repeat_mode
1110 target_queue.shuffle_enabled = source_queue.shuffle_enabled
1111 target_queue.dont_stop_the_music_enabled = source_queue.dont_stop_the_music_enabled
1112 target_queue.radio_source = source_queue.radio_source
1113 target_queue.enqueued_media_items = source_queue.enqueued_media_items
1114 target_queue.resume_pos = int(source_queue.elapsed_time)
1115 target_queue.current_index = source_queue.current_index
1116 if source_queue.current_item:
1117 target_queue.current_item = source_queue.current_item
1118 target_queue.current_item.queue_id = target_queue_id
1119 self.clear(source_queue_id)
1120
1121 await self.load(target_queue_id, source_items, keep_remaining=False, keep_played=False)
1122 for item in source_items:
1123 item.queue_id = target_queue_id
1124 self.update_items(target_queue_id, source_items)
1125 if auto_play:
1126 await self.resume(target_queue_id)
1127
1128 # Interaction with player
1129
1130 async def on_player_register(self, player: Player) -> None:
1131 """Register PlayerQueue for given player/queue id."""
1132 queue_id = player.player_id
1133 queue: PlayerQueue | None = None
1134 queue_items: list[QueueItem] = []
1135 # try to restore previous state
1136 if prev_state := await self.mass.cache.get(
1137 key=queue_id,
1138 provider=self.domain,
1139 category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
1140 ):
1141 try:
1142 queue = PlayerQueue.from_dict(prev_state)
1143 prev_items = await self.mass.cache.get(
1144 key=queue_id,
1145 provider=self.domain,
1146 category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
1147 default=[],
1148 )
1149 queue_items = []
1150 for idx, item_data in enumerate(prev_items):
1151 qi = QueueItem.from_cache(item_data)
1152 if not qi.media_item:
1153 # Skip items with missing media_item - this can happen if
1154 # MA was killed during shutdown while cache was being written
1155 self.logger.debug(
1156 "Skipping queue item %s (index %d) restored from cache "
1157 "without media_item",
1158 qi.name,
1159 idx,
1160 )
1161 continue
1162 queue_items.append(qi)
1163 if queue.enqueued_media_items:
1164 # we need to restore the MediaItem objects for the enqueued media items
1165 # Items from cache may be dicts that need deserialization
1166 restored_enqueued_items: list[MediaItemType] = []
1167 cached_items: list[dict[str, Any] | MediaItemType] = cast(
1168 "list[dict[str, Any] | MediaItemType]", queue.enqueued_media_items
1169 )
1170 for item in cached_items:
1171 if isinstance(item, dict):
1172 restored_item = media_from_dict(item)
1173 restored_enqueued_items.append(cast("MediaItemType", restored_item))
1174 else:
1175 restored_enqueued_items.append(item)
1176 queue.enqueued_media_items = restored_enqueued_items
1177 except Exception as err:
1178 self.logger.warning(
1179 "Failed to restore the queue(items) for %s - %s",
1180 player.state.name,
1181 str(err),
1182 )
1183 # Reset to clean state on failure
1184 queue = None
1185 queue_items = []
1186 if queue is None:
1187 queue = PlayerQueue(
1188 queue_id=queue_id,
1189 active=False,
1190 display_name=player.state.name,
1191 available=player.state.available,
1192 dont_stop_the_music_enabled=False,
1193 items=0,
1194 )
1195
1196 self._queues[queue_id] = queue
1197 self._queue_items[queue_id] = queue_items
1198 # always call update to calculate state etc
1199 self.on_player_update(player, {})
1200 self.mass.signal_event(EventType.QUEUE_ADDED, object_id=queue_id, data=queue)
1201
1202 def on_player_update(
1203 self,
1204 player: Player,
1205 changed_values: dict[str, tuple[Any, Any]],
1206 ) -> None:
1207 """
1208 Call when a PlayerQueue needs to be updated (e.g. when player updates).
1209
1210 NOTE: This is called every second if the player is playing.
1211 """
1212 queue_id = player.player_id
1213 if (queue := self._queues.get(queue_id)) is None:
1214 # race condition
1215 return
1216 if player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
1217 # do nothing while the announcement is in progress
1218 return
1219 # determine if this queue is currently active for this player
1220 queue.active = player.state.active_source in (queue.queue_id, None)
1221 if not queue.active and queue_id not in self._prev_states:
1222 queue.state = PlaybackState.IDLE
1223 # return early if the queue is not active and we have no previous state
1224 return
1225 if queue.queue_id in self._transitioning_players:
1226 # we're currently transitioning to a new track,
1227 # ignore updates from the player during this time
1228 return
1229
1230 # queue is active and preflight checks passed, update the queue details
1231 self._update_queue_from_player(player)
1232
1233 def on_player_remove(self, player_id: str, permanent: bool) -> None:
1234 """Call when a player is removed from the registry."""
1235 if permanent:
1236 # if the player is permanently removed, we also remove the cached queue data
1237 self.mass.create_task(
1238 self.mass.cache.delete(
1239 key=player_id,
1240 provider=self.domain,
1241 category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
1242 )
1243 )
1244 self.mass.create_task(
1245 self.mass.cache.delete(
1246 key=player_id,
1247 provider=self.domain,
1248 category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
1249 )
1250 )
1251 self._queues.pop(player_id, None)
1252 self._queue_items.pop(player_id, None)
1253
1254 async def load_next_queue_item(
1255 self,
1256 queue_id: str,
1257 current_item_id: str,
1258 ) -> QueueItem:
1259 """
1260 Call when a player wants the next queue item to play.
1261
1262 Raises QueueEmpty if there are no more tracks left.
1263 """
1264 queue = self.get(queue_id)
1265 if not queue:
1266 msg = f"PlayerQueue {queue_id} is not available"
1267 raise PlayerUnavailableError(msg)
1268 cur_index = self.index_by_id(queue_id, current_item_id)
1269 if cur_index is None:
1270 # this is just a guard for bad data
1271 raise QueueEmpty("Invalid item id for queue given.")
1272 next_item: QueueItem | None = None
1273 idx = 0
1274 while True:
1275 next_index = self._get_next_index(queue_id, cur_index + idx)
1276 if next_index is None:
1277 raise QueueEmpty("No more tracks left in the queue.")
1278 queue_item = self.get_item(queue_id, next_index)
1279 if queue_item is None:
1280 raise QueueEmpty("No more tracks left in the queue.")
1281 if idx >= 10:
1282 # we only allow 10 retries to prevent infinite loops
1283 raise QueueEmpty("No more (playable) tracks left in the queue.")
1284 try:
1285 await self._load_item(queue_item, next_index)
1286 # we're all set, this is our next item
1287 next_item = queue_item
1288 break
1289 except (MediaNotFoundError, AudioError):
1290 # No stream details found, skip this QueueItem
1291 self.logger.warning(
1292 "Skipping unplayable item %s (%s)", queue_item.name, queue_item.uri
1293 )
1294 queue_item.available = False
1295 idx += 1
1296 if idx != 0:
1297 # we skipped some items, signal a queue items update
1298 self.update_items(queue_id, self._queue_items[queue_id])
1299 if next_item is None:
1300 raise QueueEmpty("No more (playable) tracks left in the queue.")
1301
1302 return next_item
1303
1304 async def _load_item(
1305 self,
1306 queue_item: QueueItem,
1307 next_index: int | None,
1308 is_start: bool = False,
1309 seek_position: int = 0,
1310 fade_in: bool = False,
1311 ) -> None:
1312 """Try to load the stream details for the given queue item."""
1313 queue_id = queue_item.queue_id
1314 queue = self._queues[queue_id]
1315
1316 # we use a contextvar to bypass the throttler for this asyncio task/context
1317 # this makes sure that playback has priority over other requests that may be
1318 # happening in the background
1319 BYPASS_THROTTLER.set(True)
1320
1321 self.logger.debug(
1322 "(pre)loading (next) item for queue %s...",
1323 queue.display_name,
1324 )
1325
1326 if not queue_item.available:
1327 raise MediaNotFoundError(f"Item {queue_item.uri} is not available")
1328
1329 # work out if we are playing an album and if we should prefer album
1330 # loudness
1331 next_track_from_same_album = (
1332 next_index is not None
1333 and (next_item := self.get_item(queue_id, next_index))
1334 and (
1335 queue_item.media_item
1336 and hasattr(queue_item.media_item, "album")
1337 and queue_item.media_item.album
1338 and next_item.media_item
1339 and hasattr(next_item.media_item, "album")
1340 and next_item.media_item.album
1341 and queue_item.media_item.album.item_id == next_item.media_item.album.item_id
1342 )
1343 )
1344 current_index = self.index_by_id(queue_id, queue_item.queue_item_id)
1345 if current_index is None:
1346 previous_track_from_same_album = False
1347 else:
1348 previous_index = max(current_index - 1, 0)
1349 previous_track_from_same_album = (
1350 previous_index > 0
1351 and (previous_item := self.get_item(queue_id, previous_index)) is not None
1352 and previous_item.media_item is not None
1353 and hasattr(previous_item.media_item, "album")
1354 and previous_item.media_item.album is not None
1355 and queue_item.media_item is not None
1356 and hasattr(queue_item.media_item, "album")
1357 and queue_item.media_item.album is not None
1358 and queue_item.media_item.album.item_id == previous_item.media_item.album.item_id
1359 )
1360 playing_album_tracks = next_track_from_same_album or previous_track_from_same_album
1361 if queue_item.media_item and isinstance(queue_item.media_item, Track):
1362 album = queue_item.media_item.album
1363 # prefer the full library media item so we have all metadata and provider(quality) info
1364 # always request the full library item as there might be other qualities available
1365 if library_item := await self.mass.music.get_library_item_by_prov_id(
1366 queue_item.media_item.media_type,
1367 queue_item.media_item.item_id,
1368 queue_item.media_item.provider,
1369 ):
1370 queue_item.media_item = cast("Track", library_item)
1371 elif not queue_item.media_item.image or queue_item.media_item.provider.startswith(
1372 "ytmusic"
1373 ):
1374 # Youtube Music has poor thumbs by default, so we always fetch the full item
1375 # this also catches the case where they have an unavailable item in a listing
1376 fetched_item = await self.mass.music.get_item_by_uri(queue_item.uri)
1377 queue_item.media_item = cast("Track", fetched_item)
1378
1379 # ensure we got the full (original) album set
1380 if album and (
1381 library_album := await self.mass.music.get_library_item_by_prov_id(
1382 album.media_type,
1383 album.item_id,
1384 album.provider,
1385 )
1386 ):
1387 queue_item.media_item.album = cast("Album", library_album)
1388 elif album:
1389 # Restore original album if we have no better alternative from the library
1390 queue_item.media_item.album = album
1391 # prefer album image over track image
1392 if queue_item.media_item.album and queue_item.media_item.album.image:
1393 org_images: list[MediaItemImage] = queue_item.media_item.metadata.images or []
1394 queue_item.media_item.metadata.images = UniqueList(
1395 [
1396 queue_item.media_item.album.image,
1397 *org_images,
1398 ]
1399 )
1400 # Fetch the streamdetails, which could raise in case of an unplayable item.
1401 # For example, YT Music returns Radio Items that are not playable.
1402 queue_item.streamdetails = await get_stream_details(
1403 mass=self.mass,
1404 queue_item=queue_item,
1405 seek_position=seek_position,
1406 fade_in=fade_in,
1407 prefer_album_loudness=bool(playing_album_tracks),
1408 )
1409
1410 def track_loaded_in_buffer(self, queue_id: str, item_id: str) -> None:
1411 """Call when a player has (started) loading a track in the buffer."""
1412 queue = self.get(queue_id)
1413 if not queue:
1414 msg = f"PlayerQueue {queue_id} is not available"
1415 raise PlayerUnavailableError(msg)
1416 # store the index of the item that is currently (being) loaded in the buffer
1417 # which helps us a bit to determine how far the player has buffered ahead
1418 current_index = self.index_by_id(queue_id, item_id)
1419 queue.index_in_buffer = current_index
1420 self.logger.debug("PlayerQueue %s loaded item %s in buffer", queue.display_name, item_id)
1421 self.signal_update(queue_id)
1422 # preload next streamdetails
1423 self._preload_next_item(queue_id, item_id)
1424 # clean up stale audio buffers for old queue items to prevent memory leaks
1425 if current_index is not None:
1426 self.mass.create_task(
1427 self.mass.streams.cleanup_stale_queue_buffers(queue_id, current_index)
1428 )
1429
1430 # Main queue manipulation methods
1431
1432 async def load(
1433 self,
1434 queue_id: str,
1435 queue_items: list[QueueItem],
1436 insert_at_index: int = 0,
1437 keep_remaining: bool = True,
1438 keep_played: bool = True,
1439 shuffle: bool = False,
1440 ) -> None:
1441 """Load new items at index.
1442
1443 - queue_id: id of the queue to process this request.
1444 - queue_items: a list of QueueItems
1445 - insert_at_index: insert the item(s) at this index
1446 - keep_remaining: keep the remaining items after the insert
1447 - shuffle: (re)shuffle the items after insert index
1448 """
1449 prev_items = self._queue_items[queue_id][:insert_at_index] if keep_played else []
1450 next_items = queue_items
1451
1452 # if keep_remaining, append the old 'next' items
1453 if keep_remaining:
1454 next_items += self._queue_items[queue_id][insert_at_index:]
1455
1456 # we set the original insert order as attribute so we can un-shuffle
1457 for index, item in enumerate(next_items):
1458 item.sort_index += insert_at_index + index
1459 # (re)shuffle the final batch if needed
1460 if shuffle:
1461 next_items = await _smart_shuffle(next_items)
1462 self.update_items(queue_id, prev_items + next_items)
1463
1464 def update_items(self, queue_id: str, queue_items: list[QueueItem]) -> None:
1465 """Update the existing queue items, mostly caused by reordering."""
1466 self._queue_items[queue_id] = queue_items
1467 queue = self._queues[queue_id]
1468 queue.items = len(self._queue_items[queue_id])
1469 # to track if the queue items changed we set a timestamp
1470 # this is a simple way to detect changes in the list of items
1471 # without having to compare the entire list
1472 queue.items_last_updated = time.time()
1473 self.signal_update(queue_id, True)
1474 if (
1475 queue.state == PlaybackState.PLAYING
1476 and queue.index_in_buffer is not None
1477 and queue.index_in_buffer == queue.current_index
1478 ):
1479 # if the queue is playing,
1480 # ensure to (re)queue the next track because it might have changed
1481 # note that we only do this if the player has loaded the current track
1482 # if not, we wait until it has loaded to prevent conflicts
1483 if next_item := self.get_next_item(queue_id, queue.index_in_buffer):
1484 self._enqueue_next_item(queue_id, next_item)
1485
1486 # Helper methods
1487
1488 def get_item(self, queue_id: str, item_id_or_index: int | str | None) -> QueueItem | None:
1489 """Get queue item by index or item_id."""
1490 if item_id_or_index is None:
1491 return None
1492 if (queue_items := self._queue_items.get(queue_id)) is None:
1493 return None
1494 if isinstance(item_id_or_index, int) and len(queue_items) > item_id_or_index:
1495 return queue_items[item_id_or_index]
1496 if isinstance(item_id_or_index, str):
1497 return next((x for x in queue_items if x.queue_item_id == item_id_or_index), None)
1498 return None
1499
1500 def signal_update(self, queue_id: str, items_changed: bool = False) -> None:
1501 """Signal state changed of given queue."""
1502 queue = self._queues[queue_id]
1503 if items_changed:
1504 self.mass.signal_event(EventType.QUEUE_ITEMS_UPDATED, object_id=queue_id, data=queue)
1505 # save items in cache - only cache items with valid media_item
1506 cache_data = [
1507 x.to_cache() for x in self._queue_items[queue_id] if x.media_item is not None
1508 ]
1509 self.mass.create_task(
1510 self.mass.cache.set(
1511 key=queue_id,
1512 data=cache_data,
1513 provider=self.domain,
1514 category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
1515 )
1516 )
1517 # always send the base event
1518 self.mass.signal_event(EventType.QUEUE_UPDATED, object_id=queue_id, data=queue)
1519 # also signal update to the player itself so it can update its current_media
1520 self.mass.players.trigger_player_update(queue_id)
1521 # save state
1522 self.mass.create_task(
1523 self.mass.cache.set(
1524 key=queue_id,
1525 data=queue.to_cache(),
1526 provider=self.domain,
1527 category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
1528 )
1529 )
1530
1531 def index_by_id(self, queue_id: str, queue_item_id: str) -> int | None:
1532 """Get index by queue_item_id."""
1533 queue_items = self._queue_items[queue_id]
1534 for index, item in enumerate(queue_items):
1535 if item.queue_item_id == queue_item_id:
1536 return index
1537 return None
1538
1539 async def player_media_from_queue_item(
1540 self, queue_item: QueueItem, flow_mode: bool
1541 ) -> PlayerMedia:
1542 """Parse PlayerMedia from QueueItem."""
1543 queue = self._queues[queue_item.queue_id]
1544 if flow_mode:
1545 duration = None
1546 elif queue_item.streamdetails:
1547 # prefer netto duration
1548 # when seeking, the player only receives the remaining duration
1549 duration = queue_item.streamdetails.duration or queue_item.duration
1550 if duration and queue_item.streamdetails.seek_position:
1551 duration = duration - queue_item.streamdetails.seek_position
1552 else:
1553 duration = queue_item.duration
1554 if queue.session_id is None:
1555 # handle error or return early
1556 raise InvalidDataError("Queue session_id is None")
1557 media = PlayerMedia(
1558 uri=queue_item.uri,
1559 media_type=MediaType.FLOW_STREAM if flow_mode else queue_item.media_type,
1560 title="Music Assistant" if flow_mode else queue_item.name,
1561 image_url=MASS_LOGO_ONLINE,
1562 duration=duration,
1563 source_id=queue_item.queue_id,
1564 queue_item_id=queue_item.queue_item_id,
1565 custom_data={
1566 "session_id": queue.session_id,
1567 "original_uri": queue_item.uri,
1568 "flow_mode": flow_mode,
1569 },
1570 )
1571 if not flow_mode and queue_item.media_item:
1572 media.title = queue_item.media_item.name
1573 media.artist = getattr(queue_item.media_item, "artist_str", "")
1574 media.album = (
1575 album.name if (album := getattr(queue_item.media_item, "album", None)) else ""
1576 )
1577 if queue_item.image:
1578 # the image format needs to be 500x500 jpeg for maximum compatibility with players
1579 # we prefer the imageproxy on the streamserver here because this request is sent
1580 # to the player itself which may not be able to reach the regular webserver
1581 media.image_url = self.mass.metadata.get_image_url(
1582 queue_item.image, size=500, prefer_stream_server=True
1583 )
1584 return media
1585
1586 async def get_artist_tracks(self, artist: Artist) -> list[Track]:
1587 """Return tracks for given artist, based on user preference."""
1588 artist_items_conf = self.mass.config.get_raw_core_config_value(
1589 self.domain,
1590 CONF_DEFAULT_ENQUEUE_SELECT_ARTIST,
1591 ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE,
1592 )
1593 self.logger.info(
1594 "Fetching tracks to play for artist %s",
1595 artist.name,
1596 )
1597 if artist_items_conf in ("library_tracks", "all_tracks"):
1598 all_items = await self.mass.music.artists.tracks(
1599 artist.item_id,
1600 artist.provider,
1601 in_library_only=artist_items_conf == "library_tracks",
1602 )
1603 random.shuffle(all_items)
1604 return all_items
1605 if artist_items_conf in ("library_album_tracks", "all_album_tracks"):
1606 all_tracks: list[Track] = []
1607 for library_album in await self.mass.music.artists.albums(
1608 artist.item_id,
1609 artist.provider,
1610 in_library_only=artist_items_conf == "library_album_tracks",
1611 ):
1612 for album_track in await self.mass.music.albums.tracks(
1613 library_album.item_id, library_album.provider
1614 ):
1615 if album_track not in all_tracks:
1616 all_tracks.append(album_track)
1617 random.shuffle(all_tracks)
1618 return all_tracks
1619 return []
1620
1621 async def get_album_tracks(self, album: Album, start_item: str | None) -> list[Track]:
1622 """Return tracks for given album, based on user preference."""
1623 album_items_conf = self.mass.config.get_raw_core_config_value(
1624 self.domain,
1625 CONF_DEFAULT_ENQUEUE_SELECT_ALBUM,
1626 ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE,
1627 )
1628 result: list[Track] = []
1629 start_item_found = False
1630 self.logger.info(
1631 "Fetching tracks to play for album %s",
1632 album.name,
1633 )
1634 for album_track in await self.mass.music.albums.tracks(
1635 item_id=album.item_id,
1636 provider_instance_id_or_domain=album.provider,
1637 in_library_only=album_items_conf == "library_tracks",
1638 ):
1639 if not album_track.available:
1640 continue
1641 if start_item in (album_track.item_id, album_track.uri):
1642 start_item_found = True
1643 if start_item is not None and not start_item_found:
1644 continue
1645 result.append(album_track)
1646 return result
1647
1648 async def get_genre_tracks(self, genre: Genre, start_item: str | None) -> list[Track]:
1649 """Return tracks for given genre, based on alias mappings.
1650
1651 Limits results to avoid loading thousands of tracks for broad genres.
1652 Directly mapped tracks are fetched with random ordering, then supplemented
1653 with tracks from a limited set of mapped albums and artists.
1654 """
1655 result: list[Track] = []
1656 start_item_found = False
1657 self.logger.info(
1658 "Fetching tracks to play for genre %s",
1659 genre.name,
1660 )
1661 tracks, albums, artists = await self.mass.music.genres.mapped_media(
1662 genre,
1663 track_limit=25,
1664 album_limit=5,
1665 artist_limit=5,
1666 order_by="random",
1667 )
1668
1669 for genre_track in tracks:
1670 if not genre_track.available:
1671 continue
1672 if start_item in (genre_track.item_id, genre_track.uri):
1673 start_item_found = True
1674 if start_item is not None and not start_item_found:
1675 continue
1676 result.append(genre_track)
1677
1678 for album in albums:
1679 album_tracks = await self.get_album_tracks(album, None)
1680 result.extend(album_tracks[:5])
1681
1682 for artist in artists:
1683 artist_tracks = await self.get_artist_tracks(artist)
1684 result.extend(artist_tracks[:5])
1685 return result
1686
1687 async def get_playlist_tracks(
1688 self, playlist: Playlist, start_item: str | None
1689 ) -> list[PlaylistPlayableItem]:
1690 """Return tracks for given playlist, based on user preference."""
1691 result: list[PlaylistPlayableItem] = []
1692 start_item_found = False
1693 self.logger.info(
1694 "Fetching tracks to play for playlist %s",
1695 playlist.name,
1696 )
1697 # TODO: Handle other sort options etc.
1698 async for playlist_track in self.mass.music.playlists.tracks(
1699 playlist.item_id, playlist.provider
1700 ):
1701 if not playlist_track.available:
1702 continue
1703 if start_item in (playlist_track.item_id, playlist_track.uri):
1704 start_item_found = True
1705 if start_item is not None and not start_item_found:
1706 continue
1707 result.append(playlist_track)
1708 return result
1709
1710 async def get_audiobook_resume_point(
1711 self, audio_book: Audiobook, chapter: str | int | None = None, userid: str | None = None
1712 ) -> int:
1713 """Return resume point (in milliseconds) for given audio book."""
1714 self.logger.debug(
1715 "Fetching resume point to play for audio book %s",
1716 audio_book.name,
1717 )
1718 if chapter is not None:
1719 # user explicitly selected a chapter to play
1720 start_chapter = int(chapter) if isinstance(chapter, str) else chapter
1721 if chapters := audio_book.metadata.chapters:
1722 if _chapter := next((x for x in chapters if x.position == start_chapter), None):
1723 return int(_chapter.start * 1000)
1724 raise InvalidDataError(
1725 f"Unable to resolve chapter to play for Audiobook {audio_book.name}"
1726 )
1727 full_played, resume_position_ms = await self.mass.music.get_resume_position(
1728 audio_book, userid=userid
1729 )
1730 return 0 if full_played else resume_position_ms
1731
1732 async def get_next_podcast_episodes(
1733 self,
1734 podcast: Podcast | None,
1735 episode: PodcastEpisode | str | None,
1736 userid: str | None = None,
1737 ) -> UniqueList[PodcastEpisode]:
1738 """Return (next) episode(s) and resume point for given podcast."""
1739 if podcast is None and isinstance(episode, str | NoneType):
1740 raise InvalidDataError("Either podcast or episode must be provided")
1741 if podcast is None:
1742 # single podcast episode requested
1743 assert isinstance(episode, PodcastEpisode) # checked above
1744 self.logger.debug(
1745 "Fetching resume point to play for Podcast episode %s",
1746 episode.name,
1747 )
1748 (
1749 fully_played,
1750 resume_position_ms,
1751 ) = await self.mass.music.get_resume_position(episode, userid=userid)
1752 episode.fully_played = fully_played
1753 episode.resume_position_ms = 0 if fully_played else resume_position_ms
1754 return UniqueList([episode])
1755 # podcast with optional start episode requested
1756 self.logger.debug(
1757 "Fetching episode(s) and resume point to play for Podcast %s",
1758 podcast.name,
1759 )
1760 all_episodes = [
1761 x async for x in self.mass.music.podcasts.episodes(podcast.item_id, podcast.provider)
1762 ]
1763 all_episodes.sort(key=lambda x: x.position)
1764 # if a episode was provided, a user explicitly selected a episode to play
1765 # so we need to find the index of the episode in the list
1766 resolved_episode: PodcastEpisode | None = None
1767 if isinstance(episode, PodcastEpisode):
1768 resolved_episode = next((x for x in all_episodes if x.uri == episode.uri), None)
1769 if resolved_episode:
1770 # ensure we have accurate resume info
1771 (
1772 fully_played,
1773 resume_position_ms,
1774 ) = await self.mass.music.get_resume_position(resolved_episode, userid=userid)
1775 resolved_episode.resume_position_ms = 0 if fully_played else resume_position_ms
1776 elif isinstance(episode, str):
1777 resolved_episode = next(
1778 (x for x in all_episodes if episode in (x.uri, x.item_id)), None
1779 )
1780 if resolved_episode:
1781 # ensure we have accurate resume info
1782 (
1783 fully_played,
1784 resume_position_ms,
1785 ) = await self.mass.music.get_resume_position(resolved_episode, userid=userid)
1786 resolved_episode.resume_position_ms = 0 if fully_played else resume_position_ms
1787 else:
1788 # get first episode that is not fully played
1789 for ep in all_episodes:
1790 if ep.fully_played:
1791 continue
1792 # ensure we have accurate resume info
1793 (
1794 fully_played,
1795 resume_position_ms,
1796 ) = await self.mass.music.get_resume_position(ep, userid=userid)
1797 if fully_played:
1798 continue
1799 ep.resume_position_ms = resume_position_ms
1800 resolved_episode = ep
1801 break
1802 else:
1803 # no episodes found that are not fully played, so we start at the beginning
1804 resolved_episode = next((x for x in all_episodes), None)
1805 if resolved_episode is None:
1806 raise InvalidDataError(f"Unable to resolve episode to play for Podcast {podcast.name}")
1807 # get the index of the episode
1808 episode_index = all_episodes.index(resolved_episode)
1809 # return the (remaining) episode(s) to play
1810 return UniqueList(all_episodes[episode_index:])
1811
1812 def _get_next_index(
1813 self,
1814 queue_id: str,
1815 cur_index: int | None,
1816 is_skip: bool = False,
1817 allow_repeat: bool = True,
1818 ) -> int | None:
1819 """
1820 Return the next index for the queue, accounting for repeat settings.
1821
1822 Will return None if there are no (more) items in the queue.
1823 """
1824 queue = self._queues[queue_id]
1825 queue_items = self._queue_items[queue_id]
1826 if not queue_items or cur_index is None:
1827 # queue is empty
1828 return None
1829 # handle repeat single track
1830 if queue.repeat_mode == RepeatMode.ONE and not is_skip:
1831 return cur_index if allow_repeat else None
1832 # handle cur_index is last index of the queue
1833 if cur_index >= (len(queue_items) - 1):
1834 if allow_repeat and queue.repeat_mode == RepeatMode.ALL:
1835 # if repeat all is enabled, we simply start again from the beginning
1836 return 0
1837 return None
1838 # all other: just the next index
1839 return cur_index + 1
1840
1841 def get_next_item(self, queue_id: str, cur_index: int | str) -> QueueItem | None:
1842 """Return next QueueItem for given queue."""
1843 index: int
1844 if isinstance(cur_index, str):
1845 resolved_index = self.index_by_id(queue_id, cur_index)
1846 if resolved_index is None:
1847 return None # guard
1848 index = resolved_index
1849 else:
1850 index = cur_index
1851 # At this point index is guaranteed to be int
1852 for skip in range(5):
1853 if (next_index := self._get_next_index(queue_id, index + skip)) is None:
1854 break
1855 next_item = self.get_item(queue_id, next_index)
1856 if next_item is None:
1857 continue
1858 if not next_item.available:
1859 # ensure that we skip unavailable items (set by load_next track logic)
1860 continue
1861 return next_item
1862 return None
1863
1864 async def _fill_radio_tracks(self, queue_id: str) -> None:
1865 """Fill a Queue with (additional) Radio tracks."""
1866 self.logger.debug(
1867 "Filling radio tracks for queue %s",
1868 queue_id,
1869 )
1870 tracks = await self._get_radio_tracks(queue_id=queue_id, is_initial_radio_mode=False)
1871 # fill queue - filter out unavailable items
1872 queue_items = [QueueItem.from_media_item(queue_id, x) for x in tracks if x.available]
1873 await self.load(
1874 queue_id,
1875 queue_items,
1876 insert_at_index=len(self._queue_items[queue_id]) + 1,
1877 )
1878
1879 def _enqueue_next_item(self, queue_id: str, next_item: QueueItem | None) -> None:
1880 """Enqueue the next item on the player."""
1881 if not next_item:
1882 # no next item, nothing to do...
1883 return
1884
1885 queue = self._queues[queue_id]
1886 if queue.flow_mode:
1887 # ignore this for flow mode
1888 return
1889
1890 async def _enqueue_next_item_on_player(next_item: QueueItem) -> None:
1891 await self.mass.players.enqueue_next_media(
1892 player_id=queue_id,
1893 media=await self.player_media_from_queue_item(next_item, False),
1894 )
1895 if queue.next_item_id_enqueued != next_item.queue_item_id:
1896 queue.next_item_id_enqueued = next_item.queue_item_id
1897 self.logger.debug(
1898 "Enqueued next track %s on queue %s",
1899 next_item.name,
1900 self._queues[queue_id].display_name,
1901 )
1902
1903 task_id = f"enqueue_next_item_{queue_id}"
1904 self.mass.call_later(0.5, _enqueue_next_item_on_player, next_item, task_id=task_id)
1905
1906 def _preload_next_item(self, queue_id: str, item_id_in_buffer: str) -> None:
1907 """
1908 Preload the streamdetails for the next item in the queue/buffer.
1909
1910 This basically ensures the item is playable and fetches the stream details.
1911 If an error occurs, the item will be skipped and the next item will be loaded.
1912 """
1913 queue = self._queues[queue_id]
1914
1915 async def _preload_streamdetails(item_id_in_buffer: str) -> None:
1916 try:
1917 # wait for the item that was loaded in the buffer is the actually playing item
1918 # this prevents a race condition when we preload the next item too soon
1919 # while the player is actually preloading the previously enqueued item.
1920 retries = 120
1921 while retries > 0:
1922 if not queue.current_item:
1923 return # guard
1924 if queue.current_item.queue_item_id == item_id_in_buffer:
1925 break
1926 retries -= 1
1927 await asyncio.sleep(1)
1928 if next_item := await self.load_next_queue_item(queue_id, item_id_in_buffer):
1929 self.logger.debug(
1930 "Preloaded next item %s for queue %s",
1931 next_item.name,
1932 queue.display_name,
1933 )
1934 # enqueue the next item on the player
1935 self._enqueue_next_item(queue_id, next_item)
1936
1937 except QueueEmpty:
1938 return
1939
1940 if not (current_item := self.get_item(queue_id, item_id_in_buffer)):
1941 # this should not happen, but guard anyways
1942 return
1943 if current_item.media_type == MediaType.RADIO or not current_item.duration:
1944 # radio items or no duration, nothing to do
1945 return
1946
1947 task_id = f"preload_next_item_{queue_id}"
1948 self.mass.create_task(
1949 _preload_streamdetails,
1950 item_id_in_buffer,
1951 task_id=task_id,
1952 abort_existing=True,
1953 )
1954
1955 async def _resolve_media_items(
1956 self,
1957 media_item: MediaItemType | ItemMapping | BrowseFolder,
1958 start_item: str | None = None,
1959 userid: str | None = None,
1960 queue_id: str | None = None,
1961 ) -> list[MediaItemType]:
1962 """Resolve/unwrap media items to enqueue."""
1963 # resolve Itemmapping to full media item
1964 if isinstance(media_item, ItemMapping):
1965 if media_item.uri is None:
1966 raise InvalidDataError("ItemMapping has no URI")
1967 media_item = await self.mass.music.get_item_by_uri(media_item.uri)
1968 if media_item.media_type == MediaType.PLAYLIST:
1969 media_item = cast("Playlist", media_item)
1970 self.mass.create_task(
1971 self.mass.music.mark_item_played(
1972 media_item, userid=userid, queue_id=queue_id, user_initiated=True
1973 )
1974 )
1975 return list(await self.get_playlist_tracks(media_item, start_item))
1976 if media_item.media_type == MediaType.ARTIST:
1977 media_item = cast("Artist", media_item)
1978 self.mass.create_task(
1979 self.mass.music.mark_item_played(media_item, queue_id=queue_id, user_initiated=True)
1980 )
1981 return list(await self.get_artist_tracks(media_item))
1982 if media_item.media_type == MediaType.ALBUM:
1983 media_item = cast("Album", media_item)
1984 self.mass.create_task(
1985 self.mass.music.mark_item_played(
1986 media_item, userid=userid, queue_id=queue_id, user_initiated=True
1987 )
1988 )
1989 return list(await self.get_album_tracks(media_item, start_item))
1990 if media_item.media_type == MediaType.GENRE:
1991 media_item = cast("Genre", media_item)
1992 self.mass.create_task(
1993 self.mass.music.mark_item_played(
1994 media_item, userid=userid, queue_id=queue_id, user_initiated=True
1995 )
1996 )
1997 return list(await self.get_genre_tracks(media_item, start_item))
1998 if media_item.media_type == MediaType.AUDIOBOOK:
1999 media_item = cast("Audiobook", media_item)
2000 # ensure we grab the correct/latest resume point info
2001 media_item.resume_position_ms = await self.get_audiobook_resume_point(
2002 media_item, start_item, userid=userid
2003 )
2004 return [media_item]
2005 if media_item.media_type == MediaType.PODCAST:
2006 media_item = cast("Podcast", media_item)
2007 self.mass.create_task(
2008 self.mass.music.mark_item_played(
2009 media_item, userid=userid, queue_id=queue_id, user_initiated=True
2010 )
2011 )
2012 return list(await self.get_next_podcast_episodes(media_item, start_item, userid=userid))
2013 if media_item.media_type == MediaType.PODCAST_EPISODE:
2014 media_item = cast("PodcastEpisode", media_item)
2015 return list(await self.get_next_podcast_episodes(None, media_item, userid=userid))
2016 if media_item.media_type == MediaType.FOLDER:
2017 media_item = cast("BrowseFolder", media_item)
2018 return list(await self._get_folder_tracks(media_item))
2019 # all other: single track or radio item
2020 return [cast("MediaItemType", media_item)]
2021
2022 async def _try_resume_from_playlog(self, queue: PlayerQueue) -> bool:
2023 """Try to resume playback from playlog when queue is empty.
2024
2025 Attempts to find user-initiated recently played items in the following order:
2026 1. By userid AND queue_id
2027 2. By queue_id only
2028 3. By userid only (if available)
2029 4. Any recently played item
2030
2031 :param queue: The queue to resume playback on.
2032 :return: True if playback was started, False otherwise.
2033 """
2034 # Try different filter combinations in order of specificity
2035 filter_attempts: list[tuple[str | None, str | None, str]] = []
2036 if queue.userid:
2037 filter_attempts.append((queue.userid, queue.queue_id, "userid + queue_id match"))
2038 filter_attempts.append((None, queue.queue_id, "queue_id match"))
2039 if queue.userid:
2040 filter_attempts.append((queue.userid, None, "userid match"))
2041 filter_attempts.append((None, None, "any recent item"))
2042
2043 for userid, queue_id, match_type in filter_attempts:
2044 items = await self.mass.music.recently_played(
2045 limit=5,
2046 fully_played_only=False,
2047 user_initiated_only=True,
2048 userid=userid,
2049 queue_id=queue_id,
2050 )
2051 for item in items:
2052 if not item.uri:
2053 continue
2054 try:
2055 await self.play_media(queue.queue_id, item)
2056 self.logger.info(
2057 "Resumed queue %s from playlog (%s)", queue.display_name, match_type
2058 )
2059 return True
2060 except MusicAssistantError as err:
2061 self.logger.debug("Failed to resume with item %s: %s", item.name, err)
2062 continue
2063
2064 return False
2065
2066 async def _get_radio_tracks(
2067 self, queue_id: str, is_initial_radio_mode: bool = False
2068 ) -> list[Track]:
2069 """Call the registered music providers for dynamic tracks."""
2070 queue = self._queues[queue_id]
2071 queue_track_items: list[Track] = [
2072 q.media_item
2073 for q in self._queue_items[queue_id]
2074 if q.media_item and isinstance(q.media_item, Track)
2075 ]
2076 if not queue.radio_source:
2077 # this may happen during race conditions as this method is called delayed
2078 return []
2079 self.logger.info(
2080 "Fetching radio tracks for queue %s based on: %s",
2081 queue.display_name,
2082 ", ".join([x.name for x in queue.radio_source]),
2083 )
2084
2085 # Get user's preferred provider instances for steering provider selection
2086 preferred_provider_instances: list[str] | None = None
2087 if (
2088 queue.userid
2089 and (playback_user := await self.mass.webserver.auth.get_user(queue.userid))
2090 and playback_user.provider_filter
2091 ):
2092 preferred_provider_instances = playback_user.provider_filter
2093
2094 available_base_tracks: list[Track] = []
2095 base_track_sample_size = 5
2096 # Some providers have very deterministic similar track algorithms when providing
2097 # a single track item. When we have a radio mode based on 1 track and we have to
2098 # refill the queue (ie not initial radio mode), we use the play history as base tracks
2099 if (
2100 len(queue.radio_source) == 1
2101 and queue.radio_source[0].media_type == MediaType.TRACK
2102 and not is_initial_radio_mode
2103 ):
2104 available_base_tracks = queue_track_items
2105 else:
2106 # Grab all the available base tracks based on the selected source items.
2107 # shuffle the source items, just in case
2108 for radio_item in random.sample(queue.radio_source, len(queue.radio_source)):
2109 ctrl = self.mass.music.get_controller(radio_item.media_type)
2110 try:
2111 available_base_tracks += [
2112 track
2113 for track in await ctrl.radio_mode_base_tracks(
2114 radio_item, # type: ignore[arg-type]
2115 preferred_provider_instances,
2116 )
2117 # Avoid duplicate base tracks
2118 if track not in available_base_tracks
2119 ]
2120 except UnsupportedFeaturedException as err:
2121 self.logger.debug(
2122 "Skip loading radio items for %s: %s ",
2123 radio_item.uri,
2124 str(err),
2125 )
2126 if not available_base_tracks:
2127 raise UnsupportedFeaturedException("Radio mode not available for source items")
2128
2129 # Sample tracks from the base tracks, which will be used to calculate the dynamic ones
2130 base_tracks = random.sample(
2131 available_base_tracks,
2132 min(base_track_sample_size, len(available_base_tracks)),
2133 )
2134 # Use a set to avoid duplicate dynamic tracks
2135 dynamic_tracks: set[Track] = set()
2136 # Use base tracks + Trackcontroller to obtain similar tracks for every base Track
2137 for allow_lookup in (False, True):
2138 if dynamic_tracks:
2139 break
2140 for base_track in base_tracks:
2141 try:
2142 _similar_tracks = await self.mass.music.tracks.similar_tracks(
2143 base_track.item_id,
2144 base_track.provider,
2145 allow_lookup=allow_lookup,
2146 preferred_provider_instances=preferred_provider_instances,
2147 )
2148 except MediaNotFoundError:
2149 # Some providers don't have similar tracks for all items. For example,
2150 # Tidal can sometimes return a 404 when the 'similar_tracks' endpoint is called.
2151 # in that case, just skip the track.
2152 self.logger.debug("Similar tracks not found for track %s", base_track.name)
2153 continue
2154 for track in _similar_tracks:
2155 if (
2156 track not in base_tracks
2157 # Exclude tracks we have already played / queued
2158 and track not in queue_track_items
2159 # Ignore tracks that are too long for radio mode, e.g. mixes
2160 and track.duration <= RADIO_TRACK_MAX_DURATION_SECS
2161 ):
2162 dynamic_tracks.add(track)
2163 if len(dynamic_tracks) >= 50:
2164 break
2165 queue_tracks: list[Track] = []
2166 dynamic_tracks_list = list(dynamic_tracks)
2167 # Only include the sampled base tracks when the radio mode is first initialized
2168 if is_initial_radio_mode:
2169 queue_tracks += [base_tracks[0]]
2170 # Exhaust base tracks with the pattern of BDDBDDBDD (1 base track + 2 dynamic tracks)
2171 if len(base_tracks) > 1:
2172 for base_track in base_tracks[1:]:
2173 queue_tracks += [base_track]
2174 if len(dynamic_tracks_list) > 2:
2175 queue_tracks += random.sample(dynamic_tracks_list, 2)
2176 else:
2177 queue_tracks += dynamic_tracks_list
2178 # Add dynamic tracks to the queue, make sure to exclude already picked tracks
2179 remaining_dynamic_tracks = [t for t in dynamic_tracks_list if t not in queue_tracks]
2180 if remaining_dynamic_tracks:
2181 queue_tracks += random.sample(
2182 remaining_dynamic_tracks, min(len(remaining_dynamic_tracks), 25)
2183 )
2184 return queue_tracks
2185
2186 async def _get_folder_tracks(self, folder: BrowseFolder) -> list[Track]:
2187 """Fetch (playable) tracks for given browse folder."""
2188 self.logger.info(
2189 "Fetching tracks to play for folder %s",
2190 folder.name,
2191 )
2192 tracks: list[Track] = []
2193 for item in await self.mass.music.browse(folder.path):
2194 if not item.is_playable:
2195 continue
2196 # recursively fetch tracks from all media types
2197 resolved = await self._resolve_media_items(item)
2198 tracks += [x for x in resolved if isinstance(x, Track)]
2199
2200 return tracks
2201
2202 def _update_queue_from_player(
2203 self,
2204 player: Player,
2205 ) -> None:
2206 """Update the Queue when the player state changed."""
2207 queue_id = player.player_id
2208 queue = self._queues[queue_id]
2209
2210 # basic properties
2211 queue.display_name = player.state.name
2212 queue.available = player.state.available
2213 queue.items = len(self._queue_items[queue_id])
2214
2215 queue.state = (
2216 player.state.playback_state or PlaybackState.IDLE
2217 if queue.active
2218 else PlaybackState.IDLE
2219 )
2220 # update current item/index from player report
2221 if queue.active and queue.state in (
2222 PlaybackState.PLAYING,
2223 PlaybackState.PAUSED,
2224 ):
2225 # NOTE: If the queue is not playing (yet) we will not update the current index
2226 # to ensure we keep the previously known current index
2227 if queue.flow_mode:
2228 # flow mode active, the player is playing one long stream
2229 # so we need to calculate the current index and elapsed time
2230 current_index, elapsed_time = self._get_flow_queue_stream_index(queue, player)
2231 elif item_id := self._parse_player_current_item_id(queue_id, player):
2232 # normal mode, the player itself will report the current item
2233 elapsed_time = int(player.state.corrected_elapsed_time or 0)
2234 current_index = self.index_by_id(queue_id, item_id)
2235 else:
2236 # this may happen if the player is still transitioning between tracks
2237 # we ignore this for now and keep the current index as is
2238 return
2239
2240 # get current/next item based on current index
2241 queue.current_index = current_index
2242 queue.current_item = current_item = self.get_item(queue_id, current_index)
2243 queue.next_item = (
2244 self.get_next_item(queue_id, current_index)
2245 if current_item and current_index is not None
2246 else None
2247 )
2248
2249 # correct elapsed time when seeking
2250 if (
2251 not queue.flow_mode
2252 and current_item
2253 and current_item.streamdetails
2254 and current_item.streamdetails.seek_position
2255 ):
2256 elapsed_time += current_item.streamdetails.seek_position
2257 queue.elapsed_time = elapsed_time
2258 queue.elapsed_time_last_updated = time.time()
2259
2260 elif not queue.current_item and queue.current_index is not None:
2261 current_index = queue.current_index
2262 queue.current_item = current_item = self.get_item(queue_id, current_index)
2263 queue.next_item = (
2264 self.get_next_item(queue_id, current_index)
2265 if current_item and current_index is not None
2266 else None
2267 )
2268
2269 # This is enough to detect any changes in the DSPDetails
2270 # (so child count changed, or any output format changed)
2271 output_formats = []
2272 if output_format := player.extra_data.get("output_format"):
2273 output_formats.append(str(output_format))
2274 for child_id in player.state.group_members:
2275 if (child := self.mass.players.get_player(child_id)) and (
2276 output_format := child.extra_data.get("output_format")
2277 ):
2278 output_formats.append(str(output_format))
2279 else:
2280 output_formats.append("unknown")
2281
2282 # basic throttle: do not send state changed events if queue did not actually change
2283 prev_state: CompareState = self._prev_states.get(
2284 queue_id,
2285 CompareState(
2286 queue_id=queue_id,
2287 state=PlaybackState.IDLE,
2288 current_item_id=None,
2289 next_item_id=None,
2290 current_item=None,
2291 elapsed_time=0,
2292 last_playing_elapsed_time=0,
2293 stream_title=None,
2294 codec_type=None,
2295 output_formats=None,
2296 ),
2297 )
2298 # update last_playing_elapsed_time only when the player is actively playing
2299 # use corrected_elapsed_time which accounts for time since last update
2300 # this preserves the last known elapsed time when transitioning to idle/paused
2301 prev_playing_elapsed = prev_state["last_playing_elapsed_time"]
2302 prev_item_id = prev_state["current_item_id"]
2303 current_item_id = queue.current_item.queue_item_id if queue.current_item else None
2304 if queue.state == PlaybackState.PLAYING:
2305 current_elapsed = int(queue.corrected_elapsed_time)
2306 if current_item_id != prev_item_id:
2307 # new track started, reset the elapsed time tracker
2308 last_playing_elapsed_time = current_elapsed
2309 else:
2310 # same track, use the max of current and previous to handle timing issues
2311 last_playing_elapsed_time = max(current_elapsed, prev_playing_elapsed)
2312 else:
2313 last_playing_elapsed_time = prev_playing_elapsed
2314 new_state = CompareState(
2315 queue_id=queue_id,
2316 state=queue.state,
2317 current_item_id=queue.current_item.queue_item_id if queue.current_item else None,
2318 next_item_id=queue.next_item.queue_item_id if queue.next_item else None,
2319 current_item=queue.current_item,
2320 elapsed_time=int(queue.elapsed_time),
2321 last_playing_elapsed_time=last_playing_elapsed_time,
2322 stream_title=(
2323 queue.current_item.streamdetails.stream_title
2324 if queue.current_item and queue.current_item.streamdetails
2325 else None
2326 ),
2327 codec_type=(
2328 queue.current_item.streamdetails.audio_format.codec_type
2329 if queue.current_item and queue.current_item.streamdetails
2330 else None
2331 ),
2332 output_formats=output_formats,
2333 )
2334 changed_keys = get_changed_keys(dict(prev_state), dict(new_state))
2335 with suppress(KeyError):
2336 changed_keys.remove("next_item_id")
2337 with suppress(KeyError):
2338 changed_keys.remove("last_playing_elapsed_time")
2339
2340 # store the new state
2341 if queue.active:
2342 self._prev_states[queue_id] = new_state
2343 else:
2344 self._prev_states.pop(queue_id, None)
2345
2346 # return early if nothing changed
2347 if len(changed_keys) == 0:
2348 return
2349
2350 # signal update and store state
2351 send_update = True
2352 if changed_keys == {"elapsed_time"}:
2353 # only elapsed time changed, do not send full queue update
2354 send_update = False
2355 prev_time = prev_state.get("elapsed_time") or 0
2356 cur_time = new_state.get("elapsed_time") or 0
2357 if abs(cur_time - prev_time) > 2:
2358 # send dedicated event for time updates when seeking
2359 self.mass.signal_event(
2360 EventType.QUEUE_TIME_UPDATED,
2361 object_id=queue_id,
2362 data=queue.elapsed_time,
2363 )
2364 # also signal update to the player itself so it can update its current_media
2365 self.mass.players.trigger_player_update(queue_id)
2366
2367 if send_update:
2368 self.signal_update(queue_id)
2369
2370 if "output_formats" in changed_keys:
2371 # refresh DSP details since they may have changed
2372 dsp = get_stream_dsp_details(self.mass, queue_id)
2373 if queue.current_item and queue.current_item.streamdetails:
2374 queue.current_item.streamdetails.dsp = dsp
2375 if queue.next_item and queue.next_item.streamdetails:
2376 queue.next_item.streamdetails.dsp = dsp
2377
2378 # handle updating stream_metadata if needed
2379 if (
2380 queue.current_item
2381 and (streamdetails := queue.current_item.streamdetails)
2382 and streamdetails.stream_metadata_update_callback
2383 and (
2384 streamdetails.stream_metadata_last_updated is None
2385 or (
2386 time.time() - streamdetails.stream_metadata_last_updated
2387 >= streamdetails.stream_metadata_update_interval
2388 )
2389 )
2390 ):
2391 streamdetails.stream_metadata_last_updated = time.time()
2392 self.mass.create_task(
2393 streamdetails.stream_metadata_update_callback(
2394 streamdetails, int(queue.corrected_elapsed_time)
2395 )
2396 )
2397
2398 # handle sending a playback progress report
2399 # we do this every 30 seconds or when the state changes
2400 if (
2401 changed_keys.intersection({"state", "current_item_id"})
2402 or int(queue.elapsed_time) % PLAYBACK_REPORT_INTERVAL_SECONDS == 0
2403 ):
2404 self._handle_playback_progress_report(queue, prev_state, new_state)
2405
2406 # check if we need to clear the queue if we reached the end
2407 if "state" in changed_keys and queue.state == PlaybackState.IDLE:
2408 self._handle_end_of_queue(queue, prev_state, new_state)
2409
2410 # watch dynamic radio items refill if needed
2411 if "current_item_id" in changed_keys:
2412 # auto enable radio mode if dont stop the music is enabled
2413 if (
2414 queue.dont_stop_the_music_enabled
2415 and queue.enqueued_media_items
2416 and queue.current_index is not None
2417 and (queue.items - queue.current_index) <= 1
2418 ):
2419 # We have received the last item in the queue and Don't stop the music is enabled
2420 # set the played media item(s) as radio items (which will refill the queue)
2421 # note that this will fail if there are no media items for which we have
2422 # a dynamic radio source.
2423 self.logger.debug(
2424 "End of queue detected and Don't stop the music is enabled for %s"
2425 " - setting enqueued media items as radio source: %s",
2426 queue.display_name,
2427 ", ".join([x.uri for x in queue.enqueued_media_items]), # type: ignore[misc] # uri set in __post_init__
2428 )
2429 queue.radio_source = queue.enqueued_media_items
2430 # auto fill radio tracks if less than 5 tracks left in the queue
2431 if (
2432 queue.radio_source
2433 and queue.current_index is not None
2434 and (queue.items - queue.current_index) < 5
2435 ):
2436 task_id = f"fill_radio_tracks_{queue_id}"
2437 self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id)
2438
2439 def _get_flow_queue_stream_index(
2440 self, queue: PlayerQueue, player: Player
2441 ) -> tuple[int | None, int]:
2442 """Calculate current queue index and current track elapsed time when flow mode is active."""
2443 elapsed_time_queue_total = player.state.corrected_elapsed_time or 0
2444 if queue.current_index is None and not queue.flow_mode_stream_log:
2445 return queue.current_index, int(queue.elapsed_time)
2446
2447 # For each track that has been streamed/buffered to the player,
2448 # a playlog entry will be created with the queue item id
2449 # and the amount of seconds streamed. We traverse the playlog to figure
2450 # out where we are in the queue, accounting for actual streamed
2451 # seconds (and not duration) and skipped seconds. If a track has been repeated,
2452 # it will simply be in the playlog multiple times.
2453 played_time = 0.0
2454 queue_index: int | None = queue.current_index or 0
2455 track_time = 0.0
2456 for play_log_entry in queue.flow_mode_stream_log:
2457 queue_item_duration = (
2458 # NOTE: 'seconds_streamed' can actually be 0 if there was a stream error!
2459 play_log_entry.seconds_streamed
2460 if play_log_entry.seconds_streamed is not None
2461 else play_log_entry.duration or 3600 * 24 * 7
2462 )
2463 if elapsed_time_queue_total > (queue_item_duration + played_time):
2464 # total elapsed time is more than (streamed) track duration
2465 # this track has been fully played, move on.
2466 played_time += queue_item_duration
2467 else:
2468 # no more seconds left to divide, this is our track
2469 # account for any seeking by adding the skipped/seeked seconds
2470 queue_index = self.index_by_id(queue.queue_id, play_log_entry.queue_item_id)
2471 queue_item = self.get_item(queue.queue_id, queue_index)
2472 if queue_item and queue_item.streamdetails:
2473 track_sec_skipped = queue_item.streamdetails.seek_position
2474 else:
2475 track_sec_skipped = 0
2476 track_time = elapsed_time_queue_total + track_sec_skipped - played_time
2477 break
2478 if player.state.playback_state != PlaybackState.PLAYING:
2479 # if the player is not playing, we can't be sure that the elapsed time is correct
2480 # so we just return the queue index and the elapsed time
2481 return queue.current_index, int(queue.elapsed_time)
2482 return queue_index, int(track_time)
2483
2484 def _parse_player_current_item_id(self, queue_id: str, player: Player) -> str | None:
2485 """Parse QueueItem ID from Player's current url."""
2486 protocol_player = player
2487 if player.active_output_protocol and player.active_output_protocol != "native":
2488 protocol_player = self.mass.players.get_player(player.active_output_protocol) or player
2489 if not protocol_player.current_media:
2490 # YES, we use player.current_media on purpose here because we need the raw metadata
2491 return None
2492 # prefer queue_id and queue_item_id within the current media
2493 if (
2494 protocol_player.current_media.source_id == queue_id
2495 and protocol_player.current_media.queue_item_id
2496 ):
2497 return protocol_player.current_media.queue_item_id
2498 # special case for sonos players
2499 if protocol_player.current_media.uri and protocol_player.current_media.uri.startswith(
2500 f"mass:{queue_id}"
2501 ):
2502 if protocol_player.current_media.queue_item_id:
2503 return protocol_player.current_media.queue_item_id
2504 current_item_id = protocol_player.current_media.uri.split(":")[-1]
2505 if self.get_item(queue_id, current_item_id):
2506 return current_item_id
2507 return None
2508 # try to extract the item id from a mass stream url
2509 if (
2510 protocol_player.current_media.uri
2511 and queue_id in protocol_player.current_media.uri
2512 and self.mass.streams.base_url in protocol_player.current_media.uri
2513 ):
2514 current_item_id = protocol_player.current_media.uri.rsplit("/")[-1].split(".")[0]
2515 if self.get_item(queue_id, current_item_id):
2516 return current_item_id
2517 # try to extract the item id from a queue_id/item_id combi
2518 if (
2519 protocol_player.current_media.uri
2520 and queue_id in protocol_player.current_media.uri
2521 and "/" in protocol_player.current_media.uri
2522 ):
2523 current_item_id = protocol_player.current_media.uri.split("/")[1]
2524 if self.get_item(queue_id, current_item_id):
2525 return current_item_id
2526
2527 return None
2528
2529 def _handle_end_of_queue(
2530 self, queue: PlayerQueue, prev_state: CompareState, new_state: CompareState
2531 ) -> None:
2532 """Check if the queue should be cleared after the current item."""
2533 # check if queue state changed to stopped (from playing/paused to idle)
2534 if not (
2535 prev_state["state"] in (PlaybackState.PLAYING, PlaybackState.PAUSED)
2536 and new_state["state"] == PlaybackState.IDLE
2537 ):
2538 return
2539 # check if no more items in the queue (next_item should be None at end of queue)
2540 if queue.next_item is not None:
2541 return
2542 # check if we had a previous item playing
2543 if prev_state["current_item_id"] is None:
2544 return
2545
2546 async def _clear_queue_delayed() -> None:
2547 for _ in range(5):
2548 await asyncio.sleep(1)
2549 if queue.state != PlaybackState.IDLE:
2550 return
2551 if queue.next_item is not None:
2552 return
2553 self.logger.info("End of queue reached, clearing items")
2554 self.clear(queue.queue_id)
2555
2556 # all checks passed, we stopped playback at the last (or single) track of the queue
2557 # now determine if the item was fully played before clearing
2558
2559 # For flow mode, check if the last track was fully streamed using the stream log
2560 # This is more reliable than elapsed_time which can be reset/incorrect
2561 if queue.flow_mode and queue.flow_mode_stream_log:
2562 last_log_entry = queue.flow_mode_stream_log[-1]
2563 if last_log_entry.seconds_streamed is not None:
2564 # The last track finished streaming, safe to clear queue
2565 self.mass.create_task(_clear_queue_delayed())
2566 return
2567
2568 # For non-flow mode, use prev_state values since queue state may have been updated/reset
2569 prev_item = prev_state["current_item"]
2570 if prev_item and (streamdetails := prev_item.streamdetails):
2571 duration = streamdetails.duration or prev_item.duration or 24 * 3600
2572 elif prev_item:
2573 duration = prev_item.duration or 24 * 3600
2574 else:
2575 # No current item means player has already cleared it, safe to clear queue
2576 self.mass.create_task(_clear_queue_delayed())
2577 return
2578
2579 # use last_playing_elapsed_time which preserves the elapsed time from when the player
2580 # was still playing (before transitioning to idle where elapsed_time may be reset to 0)
2581 seconds_played = int(prev_state["last_playing_elapsed_time"])
2582 # debounce this a bit to make sure we're not clearing the queue by accident
2583 # only clear if the last track was played to near completion (within 5 seconds of end)
2584 if seconds_played >= (duration or 3600) - 5:
2585 self.mass.create_task(_clear_queue_delayed())
2586
2587 def _handle_playback_progress_report(
2588 self, queue: PlayerQueue, prev_state: CompareState, new_state: CompareState
2589 ) -> None:
2590 """Handle playback progress report."""
2591 # detect change in current index to report that a item has been played
2592 prev_item_id = prev_state["current_item_id"]
2593 cur_item_id = new_state["current_item_id"]
2594 if prev_item_id is None and cur_item_id is None:
2595 return
2596
2597 if prev_item_id is not None and prev_item_id != cur_item_id:
2598 # we have a new item, so we need report the previous one
2599 is_current_item = False
2600 item_to_report = prev_state["current_item"]
2601 seconds_played = int(prev_state["elapsed_time"])
2602 else:
2603 # report on current item
2604 is_current_item = True
2605 item_to_report = self.get_item(queue.queue_id, cur_item_id) or new_state["current_item"]
2606 seconds_played = int(new_state["elapsed_time"])
2607
2608 if not item_to_report:
2609 return # guard against invalid items
2610
2611 if not (media_item := item_to_report.media_item):
2612 # only report on media items
2613 return
2614 assert media_item.uri is not None # uri is set in __post_init__
2615
2616 if item_to_report.streamdetails and item_to_report.streamdetails.stream_error:
2617 # Ignore items that had a stream error
2618 return
2619
2620 if item_to_report.streamdetails and item_to_report.streamdetails.duration:
2621 duration = int(item_to_report.streamdetails.duration)
2622 else:
2623 duration = int(item_to_report.duration or 3 * 3600)
2624
2625 if seconds_played < 5:
2626 # ignore items that have been played less than 5 seconds
2627 # this also filters out a bounce effect where the previous item
2628 # gets reported with 0 elapsed seconds after a new item starts playing
2629 return
2630
2631 # determine if item is fully played
2632 # for podcasts and audiobooks we account for the last 60 seconds
2633 percentage_played = percentage(seconds_played, duration)
2634 if not is_current_item and item_to_report.media_type in (
2635 MediaType.AUDIOBOOK,
2636 MediaType.PODCAST_EPISODE,
2637 ):
2638 fully_played = seconds_played >= duration - 60
2639 elif not is_current_item:
2640 # 90% of the track must be played to be considered fully played
2641 fully_played = percentage_played >= 90
2642 else:
2643 fully_played = seconds_played >= duration - 10
2644
2645 is_playing = is_current_item and queue.state == PlaybackState.PLAYING
2646 if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
2647 self.logger.debug(
2648 "%s %s '%s' (%s) - Fully played: %s - Progress: %s (%s/%ss)",
2649 queue.display_name,
2650 "is playing" if is_playing else "played",
2651 item_to_report.name,
2652 item_to_report.uri,
2653 fully_played,
2654 f"{percentage_played}%",
2655 seconds_played,
2656 duration,
2657 )
2658 # add entry to playlog - this also handles resume of podcasts/audiobooks
2659 self.mass.create_task(
2660 self.mass.music.mark_item_played(
2661 media_item,
2662 fully_played=fully_played,
2663 seconds_played=seconds_played,
2664 is_playing=is_playing,
2665 userid=queue.userid,
2666 queue_id=queue.queue_id,
2667 user_initiated=False,
2668 )
2669 )
2670
2671 album: Album | ItemMapping | None = getattr(media_item, "album", None)
2672 # signal 'media item played' event,
2673 # which is useful for plugins that want to do scrobbling
2674 artists: list[Artist | ItemMapping] = getattr(media_item, "artists", [])
2675 artists_names = [a.name for a in artists]
2676 self.mass.signal_event(
2677 EventType.MEDIA_ITEM_PLAYED,
2678 object_id=media_item.uri,
2679 data=MediaItemPlaybackProgressReport(
2680 uri=media_item.uri,
2681 media_type=media_item.media_type,
2682 name=media_item.name,
2683 version=getattr(media_item, "version", None),
2684 artist=(
2685 getattr(media_item, "artist_str", None) or artists_names[0]
2686 if artists_names
2687 else None
2688 ),
2689 artists=artists_names,
2690 artist_mbids=[a.mbid for a in artists if a.mbid] if artists else None,
2691 album=album.name if album else None,
2692 album_mbid=album.mbid if album else None,
2693 album_artist=(album.artist_str if isinstance(album, Album) else None),
2694 album_artist_mbids=(
2695 [a.mbid for a in album.artists if a.mbid] if isinstance(album, Album) else None
2696 ),
2697 image_url=(
2698 self.mass.metadata.get_image_url(
2699 item_to_report.media_item.image, prefer_proxy=False
2700 )
2701 if item_to_report.media_item.image
2702 else None
2703 ),
2704 duration=duration,
2705 mbid=(getattr(media_item, "mbid", None)),
2706 seconds_played=seconds_played,
2707 fully_played=fully_played,
2708 is_playing=is_playing,
2709 userid=queue.userid,
2710 ),
2711 )
2712
2713
2714async def _smart_shuffle(items: list[QueueItem]) -> list[QueueItem]:
2715 """Shuffle queue items, avoiding identical tracks next to each other.
2716
2717 Best-effort approach to prevent the same track from appearing adjacent.
2718 Does a random shuffle first, then makes a limited number of passes to
2719 swap adjacent duplicates with a random item further in the list.
2720
2721 :param items: List of queue items to shuffle.
2722 """
2723 if len(items) <= 2:
2724 return random.sample(items, len(items)) if len(items) == 2 else items
2725
2726 # Start with a random shuffle
2727 shuffled = random.sample(items, len(items))
2728
2729 # Make a few passes to fix adjacent duplicates
2730 max_passes = 3
2731 for _ in range(max_passes):
2732 swapped = False
2733 for i in range(len(shuffled) - 1):
2734 if shuffled[i].name == shuffled[i + 1].name:
2735 # Found adjacent duplicate - swap with random position at least 2 away
2736 swap_candidates = [j for j in range(len(shuffled)) if abs(j - i - 1) >= 2]
2737 if swap_candidates:
2738 swap_pos = random.choice(swap_candidates)
2739 shuffled[i + 1], shuffled[swap_pos] = shuffled[swap_pos], shuffled[i + 1]
2740 swapped = True
2741 if not swapped:
2742 break
2743 # Yield to event loop between passes
2744 await asyncio.sleep(0)
2745
2746 return shuffled
2747