/
/
/
1"""
2MusicAssistant Player Queues Controller.
3
4Handles all logic to PLAY Media Items, provided by Music Providers to supported players.
5
6It is loosely coupled to the MusicAssistant Music Controller and Player Controller.
7A Music Assistant Player always has a PlayerQueue associated with it
8which holds the queue items and state.
9
10The PlayerQueue is in that case the active source of the player,
11but it can also be something else, hence the loose coupling.
12"""
13
14from __future__ import annotations
15
16import asyncio
17import random
18import time
19from contextlib import suppress
20from types import NoneType
21from typing import TYPE_CHECKING, Any, TypedDict, cast
22
23import shortuuid
24from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
25from music_assistant_models.enums import (
26 ConfigEntryType,
27 ContentType,
28 EventType,
29 MediaType,
30 PlaybackState,
31 ProviderFeature,
32 QueueOption,
33 RepeatMode,
34)
35from music_assistant_models.errors import (
36 AudioError,
37 InvalidCommand,
38 InvalidDataError,
39 MediaNotFoundError,
40 MusicAssistantError,
41 PlayerUnavailableError,
42 QueueEmpty,
43 UnsupportedFeaturedException,
44)
45from music_assistant_models.media_items import (
46 Album,
47 Artist,
48 Audiobook,
49 BrowseFolder,
50 Genre,
51 ItemMapping,
52 MediaItemType,
53 PlayableMediaItemType,
54 Playlist,
55 Podcast,
56 PodcastEpisode,
57 Track,
58 UniqueList,
59 media_from_dict,
60)
61from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport
62from music_assistant_models.player_queue import PlayerQueue
63from music_assistant_models.queue_item import QueueItem
64
65from music_assistant.constants import (
66 ATTR_ANNOUNCEMENT_IN_PROGRESS,
67 MASS_LOGO_ONLINE,
68 PLAYBACK_REPORT_INTERVAL_SECONDS,
69 PLAYLIST_MEDIA_TYPES,
70 VERBOSE_LOG_LEVEL,
71 PlaylistPlayableItem,
72)
73from music_assistant.controllers.players.controller import IN_QUEUE_COMMAND
74from music_assistant.controllers.webserver.helpers.auth_middleware import get_current_user
75from music_assistant.helpers.api import api_command
76from music_assistant.helpers.audio import get_stream_details, get_stream_dsp_details
77from music_assistant.helpers.throttle_retry import BYPASS_THROTTLER
78from music_assistant.helpers.util import get_changed_keys, percentage
79from music_assistant.models.core_controller import CoreController
80from music_assistant.models.player import Player, PlayerMedia
81
82if TYPE_CHECKING:
83 from collections.abc import Iterator
84
85 from music_assistant_models.auth import User
86 from music_assistant_models.media_items.metadata import MediaItemImage
87
88 from music_assistant import MusicAssistant
89 from music_assistant.models.player import Player
90
# Config keys: which items to select when an (in-library) artist/album is played.
CONF_DEFAULT_ENQUEUE_SELECT_ARTIST = "default_enqueue_select_artist"
CONF_DEFAULT_ENQUEUE_SELECT_ALBUM = "default_enqueue_select_album"

# Default values for the two selection settings above.
ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE = "all_tracks"
ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE = "all_tracks"

# Config keys holding the default enqueue option per media type.
# play_media builds the key to look up as f"default_enqueue_option_{media_type.value}",
# so the string values here must keep that exact prefix.
CONF_DEFAULT_ENQUEUE_OPTION_ARTIST = "default_enqueue_option_artist"
CONF_DEFAULT_ENQUEUE_OPTION_ALBUM = "default_enqueue_option_album"
CONF_DEFAULT_ENQUEUE_OPTION_TRACK = "default_enqueue_option_track"
CONF_DEFAULT_ENQUEUE_OPTION_GENRE = "default_enqueue_option_genre"
CONF_DEFAULT_ENQUEUE_OPTION_RADIO = "default_enqueue_option_radio"
CONF_DEFAULT_ENQUEUE_OPTION_PLAYLIST = "default_enqueue_option_playlist"
CONF_DEFAULT_ENQUEUE_OPTION_AUDIOBOOK = "default_enqueue_option_audiobook"
CONF_DEFAULT_ENQUEUE_OPTION_PODCAST = "default_enqueue_option_podcast"
CONF_DEFAULT_ENQUEUE_OPTION_PODCAST_EPISODE = "default_enqueue_option_podcast_episode"
CONF_DEFAULT_ENQUEUE_OPTION_FOLDER = "default_enqueue_option_folder"
# NOTE(review): no config entry is registered for this key in get_config_entries - confirm usage.
CONF_DEFAULT_ENQUEUE_OPTION_UNKNOWN = "default_enqueue_option_unknown"
RADIO_TRACK_MAX_DURATION_SECS = 20 * 60  # 20 minutes
# presumably cache category ids for persisted queue state / queue items - TODO confirm
CACHE_CATEGORY_PLAYER_QUEUE_STATE = 0
CACHE_CATEGORY_PLAYER_QUEUE_ITEMS = 1
111
112
class CompareState(TypedDict):
    """Simple object where we store the (previous) state of a queue.

    Used for compare actions.
    """

    # id of the queue this snapshot belongs to
    queue_id: str
    # playback state of the queue at the time of the snapshot
    state: PlaybackState
    # presumably the queue_item_id of the current/next item (None when absent) - TODO confirm
    current_item_id: str | None
    next_item_id: str | None
    current_item: QueueItem | None
    # presumably expressed in seconds - TODO confirm
    elapsed_time: int
    # last_playing_elapsed_time: elapsed time from the last PLAYING state update
    # used to determine if a track was fully played when transitioning to idle
    last_playing_elapsed_time: int
    stream_title: str | None
    codec_type: ContentType | None
    output_formats: list[str] | None
131
132
133class PlayerQueuesController(CoreController):
134 """Controller holding all logic to enqueue music for players."""
135
136 domain: str = "player_queues"
137
138 def __init__(self, mass: MusicAssistant) -> None:
139 """Initialize core controller."""
140 super().__init__(mass)
141 self._queues: dict[str, PlayerQueue] = {}
142 self._queue_items: dict[str, list[QueueItem]] = {}
143 self._prev_states: dict[str, CompareState] = {}
144 self._transitioning_players: set[str] = set()
145 self.manifest.name = "Player Queues controller"
146 self.manifest.description = (
147 "Music Assistant's core controller which manages the queues for all players."
148 )
149 self.manifest.icon = "playlist-music"
150
151 async def close(self) -> None:
152 """Cleanup on exit."""
153 # stop all playback
154 for queue in self.all():
155 if queue.state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
156 await self.stop(queue.queue_id)
157
158 async def get_config_entries(
159 self,
160 action: str | None = None,
161 values: dict[str, ConfigValueType] | None = None,
162 ) -> tuple[ConfigEntry, ...]:
163 """Return all Config Entries for this core module (if any)."""
164 enqueue_options = [ConfigValueOption(x.name, x.value) for x in QueueOption]
165 return (
166 ConfigEntry(
167 key=CONF_DEFAULT_ENQUEUE_SELECT_ARTIST,
168 type=ConfigEntryType.STRING,
169 default_value=ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE,
170 label="Items to select when you play a (in-library) artist.",
171 options=[
172 ConfigValueOption(
173 title="Only in-library tracks",
174 value="library_tracks",
175 ),
176 ConfigValueOption(
177 title="All tracks from all albums in the library",
178 value="library_album_tracks",
179 ),
180 ConfigValueOption(
181 title="All (top) tracks from (all) streaming provider(s)",
182 value="all_tracks",
183 ),
184 ConfigValueOption(
185 title="All tracks from all albums from (all) streaming provider(s)",
186 value="all_album_tracks",
187 ),
188 ],
189 ),
190 ConfigEntry(
191 key=CONF_DEFAULT_ENQUEUE_SELECT_ALBUM,
192 type=ConfigEntryType.STRING,
193 default_value=ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE,
194 label="Items to select when you play a (in-library) album.",
195 options=[
196 ConfigValueOption(
197 title="Only in-library tracks",
198 value="library_tracks",
199 ),
200 ConfigValueOption(
201 title="All tracks for album on (streaming) provider",
202 value="all_tracks",
203 ),
204 ],
205 ),
206 ConfigEntry(
207 key=CONF_DEFAULT_ENQUEUE_OPTION_ARTIST,
208 type=ConfigEntryType.STRING,
209 default_value=QueueOption.REPLACE.value,
210 label="Default enqueue option for Artist item(s).",
211 options=enqueue_options,
212 description="Define the default enqueue action for this mediatype.",
213 ),
214 ConfigEntry(
215 key=CONF_DEFAULT_ENQUEUE_OPTION_ALBUM,
216 type=ConfigEntryType.STRING,
217 default_value=QueueOption.REPLACE.value,
218 label="Default enqueue option for Album item(s).",
219 options=enqueue_options,
220 description="Define the default enqueue action for this mediatype.",
221 ),
222 ConfigEntry(
223 key=CONF_DEFAULT_ENQUEUE_OPTION_TRACK,
224 type=ConfigEntryType.STRING,
225 default_value=QueueOption.PLAY.value,
226 label="Default enqueue option for Track item(s).",
227 options=enqueue_options,
228 description="Define the default enqueue action for this mediatype.",
229 ),
230 ConfigEntry(
231 key=CONF_DEFAULT_ENQUEUE_OPTION_GENRE,
232 type=ConfigEntryType.STRING,
233 default_value=QueueOption.REPLACE.value,
234 label="Default enqueue option for Genre item(s).",
235 options=enqueue_options,
236 description="Define the default enqueue action for this mediatype.",
237 ),
238 ConfigEntry(
239 key=CONF_DEFAULT_ENQUEUE_OPTION_RADIO,
240 type=ConfigEntryType.STRING,
241 default_value=QueueOption.REPLACE.value,
242 label="Default enqueue option for Radio item(s).",
243 options=enqueue_options,
244 description="Define the default enqueue action for this mediatype.",
245 ),
246 ConfigEntry(
247 key=CONF_DEFAULT_ENQUEUE_OPTION_PLAYLIST,
248 type=ConfigEntryType.STRING,
249 default_value=QueueOption.REPLACE.value,
250 label="Default enqueue option for Playlist item(s).",
251 options=enqueue_options,
252 description="Define the default enqueue action for this mediatype.",
253 ),
254 ConfigEntry(
255 key=CONF_DEFAULT_ENQUEUE_OPTION_AUDIOBOOK,
256 type=ConfigEntryType.STRING,
257 default_value=QueueOption.REPLACE.value,
258 label="Default enqueue option for Audiobook item(s).",
259 options=enqueue_options,
260 hidden=True,
261 ),
262 ConfigEntry(
263 key=CONF_DEFAULT_ENQUEUE_OPTION_PODCAST,
264 type=ConfigEntryType.STRING,
265 default_value=QueueOption.REPLACE.value,
266 label="Default enqueue option for Podcast item(s).",
267 options=enqueue_options,
268 hidden=True,
269 ),
270 ConfigEntry(
271 key=CONF_DEFAULT_ENQUEUE_OPTION_PODCAST_EPISODE,
272 type=ConfigEntryType.STRING,
273 default_value=QueueOption.REPLACE.value,
274 label="Default enqueue option for Podcast-episode item(s).",
275 options=enqueue_options,
276 hidden=True,
277 ),
278 ConfigEntry(
279 key=CONF_DEFAULT_ENQUEUE_OPTION_FOLDER,
280 type=ConfigEntryType.STRING,
281 default_value=QueueOption.REPLACE.value,
282 label="Default enqueue option for Folder item(s).",
283 options=enqueue_options,
284 hidden=True,
285 ),
286 )
287
288 def __iter__(self) -> Iterator[PlayerQueue]:
289 """Iterate over (available) players."""
290 return iter(self._queues.values())
291
292 @api_command("player_queues/all")
293 def all(self) -> tuple[PlayerQueue, ...]:
294 """Return all registered PlayerQueues."""
295 return tuple(self._queues.values())
296
297 @api_command("player_queues/get")
298 def get(self, queue_id: str) -> PlayerQueue | None:
299 """Return PlayerQueue by queue_id or None if not found."""
300 return self._queues.get(queue_id)
301
302 @api_command("player_queues/items")
303 def items(self, queue_id: str, limit: int = 500, offset: int = 0) -> list[QueueItem]:
304 """Return all QueueItems for given PlayerQueue."""
305 if queue_id not in self._queue_items:
306 return []
307
308 return self._queue_items[queue_id][offset : offset + limit]
309
310 @api_command("player_queues/get_active_queue")
311 def get_active_queue(self, player_id: str) -> PlayerQueue | None:
312 """Return the current active/synced queue for a player."""
313 if player := self.mass.players.get_player(player_id):
314 return self.mass.players.get_active_queue(player)
315 return None
316
317 # Queue commands
318
319 @api_command("player_queues/shuffle")
320 async def set_shuffle(self, queue_id: str, shuffle_enabled: bool) -> None:
321 """Configure shuffle setting on the the queue."""
322 queue = self._queues[queue_id]
323 if queue.shuffle_enabled == shuffle_enabled:
324 return # no change
325 queue.shuffle_enabled = shuffle_enabled
326 queue_items = self._queue_items[queue_id]
327 cur_index = (
328 queue.index_in_buffer if queue.index_in_buffer is not None else queue.current_index
329 )
330 if cur_index is not None:
331 next_index = cur_index + 1
332 next_items = queue_items[next_index:]
333 else:
334 next_items = []
335 next_index = 0
336 if not shuffle_enabled:
337 # shuffle disabled, try to restore original sort order of the remaining items
338 next_items.sort(key=lambda x: x.sort_index, reverse=False)
339 await self.load(
340 queue_id=queue_id,
341 queue_items=next_items,
342 insert_at_index=next_index,
343 keep_remaining=False,
344 shuffle=shuffle_enabled,
345 )
346
347 @api_command("player_queues/dont_stop_the_music")
348 def set_dont_stop_the_music(self, queue_id: str, dont_stop_the_music_enabled: bool) -> None:
349 """Configure Don't stop the music setting on the queue."""
350 providers_available_with_similar_tracks = any(
351 ProviderFeature.SIMILAR_TRACKS in provider.supported_features
352 for provider in self.mass.music.providers
353 )
354 if dont_stop_the_music_enabled and not providers_available_with_similar_tracks:
355 raise UnsupportedFeaturedException(
356 "Don't stop the music is not supported by any of the available music providers"
357 )
358 queue = self._queues[queue_id]
359 queue.dont_stop_the_music_enabled = dont_stop_the_music_enabled
360 self.signal_update(queue_id=queue_id)
361 # if this happens to be the last track in the queue, fill the radio source
362 if (
363 queue.dont_stop_the_music_enabled
364 and queue.enqueued_media_items
365 and queue.current_index is not None
366 and (queue.items - queue.current_index) <= 1
367 ):
368 queue.radio_source = queue.enqueued_media_items
369 task_id = f"fill_radio_tracks_{queue_id}"
370 self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id)
371
372 @api_command("player_queues/repeat")
373 def set_repeat(self, queue_id: str, repeat_mode: RepeatMode) -> None:
374 """Configure repeat setting on the the queue."""
375 queue = self._queues[queue_id]
376 if queue.repeat_mode == repeat_mode:
377 return # no change
378 queue.repeat_mode = repeat_mode
379 self.signal_update(queue_id)
380 if (
381 queue.state == PlaybackState.PLAYING
382 and queue.index_in_buffer is not None
383 and queue.index_in_buffer == queue.current_index
384 ):
385 # if the queue is playing,
386 # ensure to (re)queue the next track because it might have changed
387 # note that we only do this if the player has loaded the current track
388 # if not, we wait until it has loaded to prevent conflicts
389 if next_item := self.get_next_item(queue_id, queue.index_in_buffer):
390 self._enqueue_next_item(queue_id, next_item)
391
392 @api_command("player_queues/set_playback_speed")
393 async def set_playback_speed(
394 self, queue_id: str, speed: float, queue_item_id: str | None = None
395 ) -> None:
396 """
397 Set the playback speed for the given queue item.
398
399 If queue_item_id is not provided,
400 the speed will be set for the current item in the queue.
401
402 :param queue_id: queue_id of the queue to configure.
403 :param speed: playback speed multiplier (0.5 to 2.0). 1.0 = normal speed.
404 """
405 if not (0.5 <= speed <= 2.0):
406 raise InvalidDataError(f"Playback speed must be between 0.5 and 2.0, got {speed}")
407 queue = self._queues[queue_id]
408 if not queue.current_item:
409 raise QueueEmpty("Cannot set playback speed: queue is empty")
410 queue_item_id = queue_item_id or queue.current_item.queue_item_id
411 queue_item = self.get_item(queue_id, queue_item_id)
412 if not queue_item:
413 raise InvalidDataError(f"Queue item {queue_item_id} not found in queue")
414 if not queue_item.duration or queue_item.media_type == MediaType.RADIO:
415 raise InvalidCommand("Cannot set playback speed for items with unknown duration")
416 current_speed = float(queue_item.extra_attributes.get("playback_speed") or 1.0)
417 if abs(current_speed - speed) < 0.001:
418 return # no change
419 # use extra_attributes of the queue item to store the playback speed
420 queue_item.extra_attributes["playback_speed"] = speed
421 self.signal_update(queue_id)
422 if queue.state == PlaybackState.PLAYING:
423 await self.resume(queue_id)
424
    @api_command("player_queues/play_media")
    async def play_media(
        self,
        queue_id: str,
        media: MediaItemType | ItemMapping | str | list[MediaItemType | ItemMapping | str],
        option: QueueOption | None = None,
        radio_mode: bool = False,
        start_item: PlayableMediaItemType | str | None = None,
        username: str | None = None,
    ) -> None:
        """Play media item(s) on the given queue.

        Resolves the given media into playable items, converts them to
        QueueItems and loads them into the queue according to the enqueue
        option. The command is ignored (with a warning) while an announcement
        is in progress on the player.

        :param queue_id: The queue_id of the queue to play media on.
        :param media: Media that should be played (MediaItem(s) and/or uri's).
        :param option: Which enqueue mode to use. When omitted, the configured
            default for the media type of the first resolvable item is used.
        :param radio_mode: Enable radio mode for the given item(s).
        :param start_item: Optional item to start the playlist or album from.
        :param username: The username of the user requesting the playback.
            Setting the username allows for overriding the logged-in user
            to account for playback history per user when the play_media is
            called from a shared context (like a web hook or automation).
        :raises PlayerUnavailableError: If the queue is not available.
        :raises MediaNotFoundError: If no playable items could be resolved.
        """
        # ruff: noqa: PLR0915
        # we use a contextvar to bypass the throttler for this asyncio task/context
        # this makes sure that playback has priority over other requests that may be
        # happening in the background
        BYPASS_THROTTLER.set(True)
        if not (queue := self.get(queue_id)):
            raise PlayerUnavailableError(f"Queue {queue_id} is not available")
        # always fetch the underlying player so we can raise early if its not available
        queue_player = self.mass.players.get_player(queue_id, True)
        assert queue_player is not None  # for type checking
        if queue_player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
            self.logger.warning("Ignore queue command: An announcement is in progress")
            return

        # save the user requesting the playback
        # (an explicitly given, known username wins over the current user)
        playback_user: User | None
        if username and (user := await self.mass.webserver.auth.get_user_by_username(username)):
            playback_user = user
        else:
            playback_user = get_current_user()
        queue.userid = playback_user.user_id if playback_user else None

        # a single item or list of items may be provided
        media_list = media if isinstance(media, list) else [media]

        # clear queue if needed
        if option == QueueOption.REPLACE:
            self.clear(queue_id)
        # Clear the 'enqueued media item' list when a new queue is requested
        # (note: this also applies when option is still None at this point)
        if option not in (QueueOption.ADD, QueueOption.NEXT):
            queue.enqueued_media_items.clear()

        media_items: list[MediaItemType] = []
        radio_source: list[MediaItemType] = []
        # resolve all media items
        for item in media_list:
            try:
                # parse provided uri into a MA MediaItem or Basic QueueItem from URL
                media_item: MediaItemType | ItemMapping | BrowseFolder
                if isinstance(item, str):
                    media_item = await self.mass.music.get_item_by_uri(item)
                elif isinstance(item, dict):  # type: ignore[unreachable]
                    # TODO: Investigate why the API parser sometimes passes raw dicts instead of
                    # converting them to MediaItem objects. The parse_value function in api.py
                    # should handle dict-to-object conversion, but dicts are slipping through
                    # in some cases. This is defensive handling for that parser bug.
                    media_item = media_from_dict(item)  # type: ignore[unreachable]
                    self.logger.debug("Converted to: %s", type(media_item))
                else:
                    # item is MediaItemType | ItemMapping at this point
                    media_item = item

                # Save requested media item to play on the queue so we can use it as a source
                # for Don't stop the music. Use FIFO list to keep track of the last 10 played items
                # Skip ItemMapping and BrowseFolder - only queue full MediaItemType objects
                if not isinstance(
                    media_item, (ItemMapping, BrowseFolder)
                ) and media_item.media_type in (
                    MediaType.TRACK,
                    MediaType.ALBUM,
                    MediaType.PLAYLIST,
                    MediaType.ARTIST,
                ):
                    queue.enqueued_media_items.append(media_item)
                    if len(queue.enqueued_media_items) > 10:
                        queue.enqueued_media_items.pop(0)

                # handle default enqueue option if needed
                # (resolved once, from the media type of the first item that gets this far)
                if option is None:
                    config_value = await self.mass.config.get_core_config_value(
                        self.domain,
                        f"default_enqueue_option_{media_item.media_type.value}",
                        return_type=str,
                    )
                    option = QueueOption(config_value)
                    if option == QueueOption.REPLACE:
                        self.clear(queue_id, skip_stop=True)

                # collect media_items to play
                if radio_mode:
                    # Type guard for mypy - only add full MediaItemType to radio_source
                    if not isinstance(media_item, (ItemMapping, BrowseFolder)):
                        radio_source.append(media_item)
                else:
                    # Convert start_item to string URI if needed
                    start_item_uri: str | None = None
                    if isinstance(start_item, str):
                        start_item_uri = start_item
                    elif start_item is not None:
                        start_item_uri = start_item.uri
                    media_items += await self._resolve_media_items(
                        media_item, start_item_uri, queue_id=queue_id
                    )

            except MusicAssistantError as err:
                # invalid MA uri or item not found error
                # (the item is skipped; the remaining items are still processed)
                self.logger.warning("Skipping %s: %s", item, str(err))

        # overwrite or append radio source items
        if option not in (QueueOption.ADD, QueueOption.NEXT):
            queue.radio_source = radio_source
        else:
            queue.radio_source += radio_source
        # Use collected media items to calculate the radio if radio mode is on
        if radio_mode:
            radio_tracks = await self._get_radio_tracks(
                queue_id=queue_id, is_initial_radio_mode=True
            )
            media_items = list(radio_tracks)

        # only add valid/available items
        queue_items: list[QueueItem] = []
        for x in media_items:
            if not x or not x.available:
                continue
            queue_items.append(
                QueueItem.from_media_item(queue_id, cast("PlayableMediaItemType", x))
            )

        if not queue_items:
            raise MediaNotFoundError("No playable items found")

        # load the items into the queue
        # while playing/paused the buffered index takes precedence over the current index
        if queue.state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
            cur_index = (
                queue.index_in_buffer
                if queue.index_in_buffer is not None
                else (queue.current_index if queue.current_index is not None else 0)
            )
        else:
            cur_index = queue.current_index or 0
        insert_at_index = cur_index + 1
        # Radio modes are already shuffled in a pattern we would like to keep.
        shuffle = queue.shuffle_enabled and len(queue_items) > 1 and not radio_mode

        # handle replace: clear all items and replace with the new items
        if option == QueueOption.REPLACE:
            await self.load(
                queue_id,
                queue_items=queue_items,
                keep_remaining=False,
                keep_played=False,
                shuffle=shuffle,
            )
            await self.play_index(queue_id, 0)
            return
        # handle next: add item(s) in the index next to the playing/loaded/buffered index
        if option == QueueOption.NEXT:
            await self.load(
                queue_id,
                queue_items=queue_items,
                insert_at_index=insert_at_index,
                shuffle=shuffle,
            )
            return
        # handle replace next: like next, but the remaining items are not kept
        if option == QueueOption.REPLACE_NEXT:
            await self.load(
                queue_id,
                queue_items=queue_items,
                insert_at_index=insert_at_index,
                keep_remaining=False,
                shuffle=shuffle,
            )
            return
        # handle play: replace current loaded/playing index with new item(s)
        if option == QueueOption.PLAY:
            await self.load(
                queue_id,
                queue_items=queue_items,
                insert_at_index=insert_at_index,
                shuffle=shuffle,
            )
            # clamp to the last valid index of the (re)loaded queue
            next_index = min(insert_at_index, len(self._queue_items[queue_id]) - 1)
            await self.play_index(queue_id, next_index)
            return
        # handle add: add/append item(s) to the remaining queue items
        if option == QueueOption.ADD:
            await self.load(
                queue_id=queue_id,
                queue_items=queue_items,
                insert_at_index=insert_at_index
                if queue.shuffle_enabled
                else len(self._queue_items[queue_id]) + 1,
                shuffle=queue.shuffle_enabled,
            )
            # handle edgecase, queue is empty and items are only added (not played)
            # mark first item as new index
            if queue.current_index is None:
                queue.current_index = 0
                queue.current_item = self.get_item(queue_id, 0)
                queue.items = len(queue_items)
                self.signal_update(queue_id)
639
640 @api_command("player_queues/move_item")
641 def move_item(self, queue_id: str, queue_item_id: str, pos_shift: int = 1) -> None:
642 """
643 Move queue item x up/down the queue.
644
645 - queue_id: id of the queue to process this request.
646 - queue_item_id: the item_id of the queueitem that needs to be moved.
647 - pos_shift: move item x positions down if positive value
648 - pos_shift: move item x positions up if negative value
649 - pos_shift: move item to top of queue as next item if 0.
650 """
651 queue = self._queues[queue_id]
652 item_index = self.index_by_id(queue_id, queue_item_id)
653 if item_index is None:
654 raise InvalidDataError(f"Item {queue_item_id} not found in queue")
655 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
656 msg = f"{item_index} is already played/buffered"
657 raise IndexError(msg)
658
659 queue_items = self._queue_items[queue_id]
660 queue_items = queue_items.copy()
661
662 if pos_shift == 0 and queue.state == PlaybackState.PLAYING:
663 new_index = (queue.current_index or 0) + 1
664 elif pos_shift == 0:
665 new_index = queue.current_index or 0
666 else:
667 new_index = item_index + pos_shift
668 if (new_index < (queue.current_index or 0)) or (new_index > len(queue_items)):
669 return
670 # move the item in the list
671 queue_items.insert(new_index, queue_items.pop(item_index))
672 self.update_items(queue_id, queue_items)
673
674 @api_command("player_queues/move_item_end")
675 def move_item_end(self, queue_id: str, queue_item_id: str) -> None:
676 """
677 Move queue item to the end the queue.
678
679 - queue_id: id of the queue to process this request.
680 - queue_item_id: the item_id of the queueitem that needs to be moved.
681 """
682 queue = self._queues[queue_id]
683 item_index = self.index_by_id(queue_id, queue_item_id)
684 if item_index is None:
685 raise InvalidDataError(f"Item {queue_item_id} not found in queue")
686 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
687 msg = f"{item_index} is already played/buffered"
688 raise IndexError(msg)
689
690 queue_items = self._queue_items[queue_id]
691 if item_index == (len(queue_items) - 1):
692 return
693 queue_items = queue_items.copy()
694
695 new_index = len(self._queue_items[queue_id]) - 1
696
697 # move the item in the list
698 queue_items.insert(new_index, queue_items.pop(item_index))
699 self.update_items(queue_id, queue_items)
700
701 @api_command("player_queues/delete_item")
702 def delete_item(self, queue_id: str, item_id_or_index: int | str) -> None:
703 """Delete item (by id or index) from the queue."""
704 if isinstance(item_id_or_index, str):
705 item_index = self.index_by_id(queue_id, item_id_or_index)
706 if item_index is None:
707 raise InvalidDataError(f"Item {item_id_or_index} not found in queue")
708 else:
709 item_index = item_id_or_index
710 queue = self._queues[queue_id]
711 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
712 # ignore request if track already loaded in the buffer
713 # the frontend should guard so this is just in case
714 self.logger.warning("delete requested for item already loaded in buffer")
715 return
716 queue_items = self._queue_items[queue_id]
717 queue_items.pop(item_index)
718 self.update_items(queue_id, queue_items)
719
720 @api_command("player_queues/clear")
721 def clear(self, queue_id: str, skip_stop: bool = False) -> None:
722 """Clear all items in the queue."""
723 queue = self._queues[queue_id]
724 queue.radio_source = []
725 if queue.state != PlaybackState.IDLE and not skip_stop:
726 self.mass.create_task(self.stop(queue_id))
727 queue.current_index = None
728 queue.current_item = None
729 queue.elapsed_time = 0
730 queue.elapsed_time_last_updated = time.time()
731 queue.index_in_buffer = None
732 self.mass.create_task(self.mass.streams.cleanup_queue_audio_data(queue_id))
733 self.update_items(queue_id, [])
734
735 @api_command("player_queues/save_as_playlist")
736 async def save_as_playlist(self, queue_id: str, name: str) -> Playlist:
737 """Save the current queue items as a new playlist.
738
739 :param queue_id: The queue_id of the queue to save.
740 :param name: The name for the new playlist.
741 """
742 if not self.get(queue_id):
743 raise PlayerUnavailableError(f"Queue {queue_id} is not available")
744 queue_items = self._queue_items.get(queue_id, [])
745 if not queue_items:
746 raise QueueEmpty("Cannot save an empty queue as a playlist.")
747 # collect URIs from queue items that are playlist-compatible
748 uris: list[str] = []
749 for item in queue_items:
750 if item.uri and item.media_type in PLAYLIST_MEDIA_TYPES:
751 uris.append(item.uri)
752 if not uris:
753 raise InvalidDataError("No valid items in queue to save as playlist.")
754 playlist = await self.mass.music.playlists.create_playlist(name)
755 await self.mass.music.playlists.add_playlist_tracks(playlist.item_id, uris)
756 return playlist
757
    @api_command("player_queues/stop")
    async def stop(self, queue_id: str) -> None:
        """
        Handle STOP command for given queue.

        Stores the playback position when the queue is active and playing,
        forwards the stop command to the player controller and schedules
        cleanup of the queue's audio data.

        - queue_id: queue_id of the playerqueue to handle the command.

        Raises PlayerUnavailableError when no player is returned for the queue.
        """
        queue_player = self.mass.players.get_player(queue_id, True)
        if queue_player is None:
            raise PlayerUnavailableError(f"Player {queue_id} is not available")
        # NOTE(review): indentation was lost in this copy of the file; the forwarding
        # below is assumed to be unconditional (as in pause) - confirm against the repo.
        if (queue := self.get(queue_id)) and queue.active:
            if queue.state == PlaybackState.PLAYING:
                # remember the position (presumably used when resuming - TODO confirm)
                queue.resume_pos = int(queue.corrected_elapsed_time)
        # Set context to prevent circular call, then forward the actual command to the player
        token = IN_QUEUE_COMMAND.set(True)
        try:
            await self.mass.players.cmd_stop(queue_id)
        finally:
            # always restore the previous context value, also when the command fails
            IN_QUEUE_COMMAND.reset(token)
        self.mass.create_task(self.mass.streams.cleanup_queue_audio_data(queue_id))
778
779 @api_command("player_queues/play")
780 async def play(self, queue_id: str) -> None:
781 """
782 Handle PLAY command for given queue.
783
784 - queue_id: queue_id of the playerqueue to handle the command.
785 """
786 queue_player = self.mass.players.get_player(queue_id, True)
787 if queue_player is None:
788 raise PlayerUnavailableError(f"Player {queue_id} is not available")
789 if (
790 (queue := self._queues.get(queue_id))
791 and queue.active
792 and queue.state == PlaybackState.PAUSED
793 ):
794 # forward the actual play/unpause command to the player
795 await queue_player.play()
796 return
797 # player is not paused, perform resume instead
798 await self.resume(queue_id)
799
800 @api_command("player_queues/pause")
801 async def pause(self, queue_id: str) -> None:
802 """Handle PAUSE command for given queue.
803
804 - queue_id: queue_id of the playerqueue to handle the command.
805 """
806 if not (queue := self._queues.get(queue_id)):
807 return
808 queue_active = queue.active
809 if queue.active and queue.state == PlaybackState.PLAYING:
810 queue.resume_pos = int(queue.corrected_elapsed_time)
811 # forward the actual command to the player controller
812 # Set context to prevent circular call, then forward the actual command to the player
813 token = IN_QUEUE_COMMAND.set(True)
814 try:
815 await self.mass.players.cmd_pause(queue_id)
816 finally:
817 IN_QUEUE_COMMAND.reset(token)
818
819 async def _watch_pause(player: Player) -> None:
820 count = 0
821 # wait for pause
822 while count < 5 and player.state.playback_state == PlaybackState.PLAYING:
823 count += 1
824 await asyncio.sleep(1)
825 # wait for unpause
826 if player.state.playback_state != PlaybackState.PAUSED:
827 return
828 count = 0
829 while count < 30 and player.state.playback_state == PlaybackState.PAUSED:
830 count += 1
831 await asyncio.sleep(1)
832 # if player is still paused when the limit is reached, send stop
833 if player.state.playback_state == PlaybackState.PAUSED:
834 await self.stop(queue_id)
835
836 # we auto stop a player from paused when its paused for 30 seconds
837 if (
838 queue_active
839 and (queue_player := self.mass.players.get_player(queue_id))
840 and not queue_player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS)
841 ):
842 self.mass.create_task(_watch_pause(queue_player))
843
@api_command("player_queues/play_pause")
async def play_pause(self, queue_id: str) -> None:
    """Toggle play/pause on given playerqueue.

    Pauses when the queue is known and currently playing, plays otherwise.

    - queue_id: queue_id of the queue to handle the command.
    """
    queue = self._queues.get(queue_id)
    if queue and queue.state == PlaybackState.PLAYING:
        await self.pause(queue_id)
    else:
        await self.play(queue_id)
854
@api_command("player_queues/next")
async def next(self, queue_id: str) -> None:
    """Handle NEXT TRACK command for given queue.

    Makes up to 5 attempts; every attempt that fails with MediaNotFoundError
    moves the starting point one position further into the queue.

    - queue_id: queue_id of the queue to handle the command.
    """
    queue = self.get(queue_id)
    if queue is None or not queue.active:
        raise InvalidCommand(f"Queue {queue_id} is not active")
    start_index = self._queues[queue_id].current_index
    if start_index is None:
        self.logger.warning("Queue %s has no current index", queue.display_name)
        return
    for offset in range(5):
        try:
            next_index = self._get_next_index(queue_id, start_index + offset, True)
            if next_index is not None:
                await self.play_index(queue_id, next_index, debounce=True)
            # either playback was started or there is no next item: we're done
            return
        except MediaNotFoundError:
            self.logger.warning(
                "Failed to fetch next track for queue %s - trying next item",
                queue.display_name,
            )
880
@api_command("player_queues/previous")
async def previous(self, queue_id: str) -> None:
    """Handle PREVIOUS TRACK command for given queue.

    - queue_id: queue_id of the queue to handle the command.
    """
    queue = self.get(queue_id)
    if queue is None or not queue.active:
        raise InvalidCommand(f"Queue {queue_id} is not active")
    queue_state = self._queues[queue_id]
    current_index = queue_state.current_index
    if current_index is None:
        return
    if queue_state.elapsed_time < 5:
        # the current track only just started: skip to the previous track
        target_index = max(current_index - 1, 0)
    else:
        # the current track played for 5 seconds or more: restart it
        target_index = int(current_index)
    await self.play_index(queue_id, target_index, debounce=True)
898
@api_command("player_queues/skip")
async def skip(self, queue_id: str, seconds: int = 10) -> None:
    """Handle SKIP command for given queue.

    - queue_id: queue_id of the queue to handle the command.
    - seconds: number of seconds to skip in track. Use negative value to skip back.
    """
    queue = self.get(queue_id)
    if queue is None or not queue.active:
        raise InvalidCommand(f"Queue {queue_id} is not active")
    # the seek target is relative to the elapsed time of the queue
    target_position = int(self._queues[queue_id].elapsed_time + seconds)
    await self.seek(queue_id, target_position)
909
@api_command("player_queues/seek")
async def seek(self, queue_id: str, position: int = 10) -> None:
    """Handle SEEK command for given queue.

    Seeking is performed by (re)starting the current queue item at the
    requested position.

    - queue_id: queue_id of the queue to handle the command.
    - position: position in seconds to seek to in the current playing item.
    """
    queue = self.get(queue_id)
    if queue is None or not queue.active:
        raise InvalidCommand(f"Queue {queue_id} is not active")
    queue_player = self.mass.players.get_player(queue_id, True)
    if queue_player is None:
        raise PlayerUnavailableError(f"Player {queue_id} is not available")
    current_item = queue.current_item
    if not current_item:
        raise InvalidCommand(f"Queue {queue_player.state.name} has no item(s) loaded.")
    if not current_item.duration:
        raise InvalidCommand("Can not seek items without duration.")
    # negative positions are clamped to the start of the item
    position = max(0, int(position))
    if position > current_item.duration:
        raise InvalidCommand("Can not seek outside of duration range.")
    current_index = queue.current_index
    if current_index is None:
        raise InvalidCommand(f"Queue {queue_player.state.name} has no current index.")
    await self.play_index(queue_id, current_index, seek_position=position)
932
@api_command("player_queues/resume")
async def resume(self, queue_id: str, fade_in: bool | None = None) -> None:
    """Handle RESUME command for given queue.

    - queue_id: queue_id of the queue to handle the command.
    - fade_in: force the fade-in effect on/off. When None, fade-in is enabled
      if the player is idle, the elapsed time was last updated more than
      60 seconds ago and playback resumes at a position other than the start.
    """
    queue = self._queues[queue_id]
    queue_items = self._queue_items[queue_id]
    resume_item = queue.current_item
    if queue.state == PlaybackState.PLAYING:
        # resume requested while already playing,
        # use current position as resume position
        resume_pos = queue.corrected_elapsed_time
        fade_in = False
    else:
        resume_pos = queue.resume_pos or queue.elapsed_time

    if not resume_item and len(queue_items) > 0:
        # no current item: fall back to the item at the current index or,
        # if there is no previous track at all, to the first item of the queue
        fallback_index = queue.current_index if queue.current_index is not None else 0
        resume_item = self.get_item(queue_id, fallback_index)
        resume_pos = 0

    if resume_item is None:
        # Queue is empty, try to resume from playlog
        if await self._try_resume_from_playlog(queue):
            return
        msg = f"Resume queue requested but queue {queue.display_name} is empty"
        raise QueueEmpty(msg)

    queue_player = self.mass.players.get_player(queue_id)
    if queue_player is None:
        raise PlayerUnavailableError(f"Player {queue_id} is not available")
    idle_for_a_while = (
        fade_in is None
        and queue_player.state.playback_state == PlaybackState.IDLE
        and (time.time() - queue.elapsed_time_last_updated) > 60
    )
    if idle_for_a_while:
        # enable fade in effect if the player is idle for a while
        fade_in = resume_pos > 0
    if resume_item.media_type == MediaType.RADIO:
        # we're not able to skip in online radio so this is pointless
        resume_pos = 0
    await self.play_index(queue_id, resume_item.queue_item_id, int(resume_pos), fade_in or False)
981
@api_command("player_queues/play_index")
async def play_index(
    self,
    queue_id: str,
    index: int | str,
    seek_position: int = 0,
    fade_in: bool = False,
    debounce: bool = False,
) -> None:
    """Play item at index (or item_id) X in queue.

    - queue_id: queue_id of the queue to handle the command.
    - index: index (int) or queue_item_id (str) of the item to play.
    - seek_position: position (in seconds) to start playback at.
    - fade_in: apply the fade-in effect on the first attempt.
    - debounce: wait a bit longer before the play request is sent to the
      player, to absorb rapidly repeated commands.

    Raises InvalidDataError if an item id is given that is not in the queue,
    PlayerUnavailableError if the player can not be found and
    MediaNotFoundError if no playable item could be found.
    """
    queue = self._queues[queue_id]
    queue.resume_pos = 0
    if isinstance(index, str):
        temp_index = self.index_by_id(queue_id, index)
        if temp_index is None:
            raise InvalidDataError(f"Item {index} not found in queue")
        index = temp_index
    # At this point index is guaranteed to be int
    queue.current_index = index
    # update current item and elapsed time and signal update
    # this way the UI knows immediately that a new item is loading
    queue.current_item = self.get_item(queue_id, index)
    queue.elapsed_time = seek_position
    queue.elapsed_time_last_updated = time.time()
    self.signal_update(queue_id)
    queue.index_in_buffer = index
    queue.flow_mode_stream_log = []
    target_player = self.mass.players.get_player(queue_id)
    if target_player is None:
        raise PlayerUnavailableError(f"Player {queue_id} is not available")
    queue.next_item_id_enqueued = None
    # always update session id when we start a new playback session
    queue.session_id = shortuuid.random(length=8)
    # handle resume point of audiobook(chapter) or podcast(episode)
    if (
        not seek_position
        and (queue_item := self.get_item(queue_id, index))
        and (resume_position_ms := getattr(queue_item.media_item, "resume_position_ms", 0))
    ):
        seek_position = max(0, int((resume_position_ms - 500) / 1000))

    # send play_media request to player
    # NOTE that we debounce this a bit to account for someone hitting the next button
    # like a madman. This will prevent the player from being overloaded with requests.
    async def _play_index(index: int, debounce: bool) -> None:
        for attempt in range(5):
            try:
                queue_item = self.get_item(queue_id, index)
                if not queue_item:
                    continue  # guard
                await self._load_item(
                    queue_item,
                    self._get_next_index(queue_id, index),
                    is_start=True,
                    # seek position and fade-in only apply to the requested item,
                    # not to a fallback item picked in a later attempt
                    seek_position=seek_position if attempt == 0 else 0,
                    fade_in=fade_in if attempt == 0 else False,
                )
                # if we reach this point, loading the item succeeded, break the loop
                queue.current_index = index
                queue.current_item = queue_item
                break
            except (MediaNotFoundError, AudioError):
                # the requested index can not be played.
                if queue_item:
                    self.logger.warning(
                        "Skipping unplayable item %s (%s)",
                        queue_item.name,
                        queue_item.uri,
                    )
                    queue_item.available = False
                next_index = self._get_next_index(queue_id, index, allow_repeat=False)
                if next_index is None:
                    raise MediaNotFoundError("No next item available")
                index = next_index
        else:
            # all attempts to find a playable item failed
            raise MediaNotFoundError("No playable item found to start playback")

        # work out if we need to use flow mode
        flow_mode = target_player.flow_mode and queue_item.media_type not in (
            # don't use flow mode for duration-less streams
            MediaType.RADIO,
            MediaType.PLUGIN_SOURCE,
        )
        await asyncio.sleep(0.5 if debounce else 0.1)
        queue.flow_mode = flow_mode
        await self.mass.players.play_media(
            player_id=queue_id,
            media=await self.player_media_from_queue_item(queue_item, flow_mode),
        )
        queue.current_index = index
        queue.current_item = queue_item
        await asyncio.sleep(2)
        self._transitioning_players.discard(queue_id)

    # we set a flag to notify the update logic that we're transitioning to a new track
    self._transitioning_players.add(queue_id)

    # we debounce the play_index command to handle the case where someone
    # is spamming next/previous on the player
    task_id = f"play_index_{queue_id}"
    if existing_task := self.mass.get_task(task_id):
        existing_task.cancel()
        with suppress(asyncio.CancelledError):
            await existing_task
    task = self.mass.create_task(
        _play_index,
        index,
        debounce,
        task_id=task_id,
    )
    try:
        await task
    except Exception:
        # starting playback failed: clear the transitioning flag, otherwise updates
        # from the player would be ignored until the next successful playback.
        # Cancellation (asyncio.CancelledError is not an Exception subclass) is
        # deliberately left alone: the newer play_index call that cancelled this
        # task has set the flag again and is responsible for clearing it.
        self._transitioning_players.discard(queue_id)
        raise
    self.signal_update(queue_id)
1095
@api_command("player_queues/transfer")
async def transfer_queue(
    self,
    source_queue_id: str,
    target_queue_id: str,
    auto_play: bool | None = None,
) -> None:
    """Transfer queue to another queue.

    - source_queue_id: queue_id of the queue to take the items and settings from.
    - target_queue_id: queue_id of the queue that receives the items and settings.
    - auto_play: resume playback on the target queue after the transfer.
      When None, playback is resumed if the source queue is playing.
    """
    source_queue = self.get(source_queue_id)
    if not source_queue:
        raise PlayerUnavailableError(f"Queue {source_queue_id} is not available")
    target_queue = self.get(target_queue_id)
    if not target_queue:
        raise PlayerUnavailableError(f"Queue {target_queue_id} is not available")
    if auto_play is None:
        auto_play = source_queue.state == PlaybackState.PLAYING

    target_player = self.mass.players.get_player(target_queue_id)
    if target_player is None:
        raise PlayerUnavailableError(f"Player {target_queue_id} is not available")
    # edge case: the user wants to move playback from the group as a whole, to a single
    # player in the group or it is grouped and the command targeted at the single player.
    # We need to dissolve the group first.
    group_id = target_player.state.active_group or target_player.state.synced_to
    if group_id:
        await self.mass.players.cmd_ungroup(group_id)
        await asyncio.sleep(3)

    source_items = self._queue_items[source_queue_id]
    # copy the queue settings over from the source queue
    for setting in (
        "repeat_mode",
        "shuffle_enabled",
        "dont_stop_the_music_enabled",
        "radio_source",
        "enqueued_media_items",
    ):
        setattr(target_queue, setting, getattr(source_queue, setting))
    target_queue.resume_pos = int(source_queue.elapsed_time)
    target_queue.current_index = source_queue.current_index
    if source_queue.current_item:
        target_queue.current_item = source_queue.current_item
        target_queue.current_item.queue_id = target_queue_id
    self.clear(source_queue_id)

    await self.load(target_queue_id, source_items, keep_remaining=False, keep_played=False)
    # the transferred items now belong to the target queue
    for transferred_item in source_items:
        transferred_item.queue_id = target_queue_id
    self.update_items(target_queue_id, source_items)
    if auto_play:
        await self.resume(target_queue_id)
1142
1143 # Interaction with player
1144
async def on_player_register(self, player: Player) -> None:
    """Register PlayerQueue for given player/queue id.

    The previous queue state and items are restored from the cache when
    available; if that fails (or nothing is cached) a fresh, empty queue
    is created.
    """
    queue_id = player.player_id
    queue: PlayerQueue | None = None
    queue_items: list[QueueItem] = []
    # try to restore previous state
    prev_state = await self.mass.cache.get(
        key=queue_id,
        provider=self.domain,
        category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
    )
    if prev_state:
        try:
            queue = PlayerQueue.from_dict(prev_state)
            cached_queue_items = await self.mass.cache.get(
                key=queue_id,
                provider=self.domain,
                category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
                default=[],
            )
            queue_items = []
            for position, raw_item in enumerate(cached_queue_items):
                restored = QueueItem.from_cache(raw_item)
                if restored.media_item:
                    queue_items.append(restored)
                    continue
                # Skip items with missing media_item - this can happen if
                # MA was killed during shutdown while cache was being written
                self.logger.debug(
                    "Skipping queue item %s (index %d) restored from cache "
                    "without media_item",
                    restored.name,
                    position,
                )
            if queue.enqueued_media_items:
                # we need to restore the MediaItem objects for the enqueued media items
                # Items from cache may be dicts that need deserialization
                cached_enqueued = cast(
                    "list[dict[str, Any] | MediaItemType]", queue.enqueued_media_items
                )
                queue.enqueued_media_items = [
                    cast("MediaItemType", media_from_dict(entry))
                    if isinstance(entry, dict)
                    else entry
                    for entry in cached_enqueued
                ]
        except Exception as err:
            self.logger.warning(
                "Failed to restore the queue(items) for %s - %s",
                player.state.name,
                str(err),
            )
            # Reset to clean state on failure
            queue = None
            queue_items = []
    if queue is None:
        queue = PlayerQueue(
            queue_id=queue_id,
            active=False,
            display_name=player.state.name,
            available=player.state.available,
            dont_stop_the_music_enabled=False,
            items=0,
        )

    self._queues[queue_id] = queue
    self._queue_items[queue_id] = queue_items
    # always call update to calculate state etc
    self.on_player_update(player, {})
    self.mass.signal_event(EventType.QUEUE_ADDED, object_id=queue_id, data=queue)
1216
def on_player_update(
    self,
    player: Player,
    changed_values: dict[str, tuple[Any, Any]],
) -> None:
    """
    Call when a PlayerQueue needs to be updated (e.g. when player updates).

    NOTE: This is called every second if the player is playing.
    """
    queue_id = player.player_id
    queue = self._queues.get(queue_id)
    if queue is None:
        # race condition
        return
    if player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
        # do nothing while the announcement is in progress
        return
    # the queue is the active one when the player has this queue (or nothing) as source
    queue.active = player.state.active_source in (queue.queue_id, None)
    if not (queue.active or queue_id in self._prev_states):
        # return early if the queue is not active and we have no previous state
        queue.state = PlaybackState.IDLE
        return
    # while transitioning to a new track, updates from the player are ignored
    if queue.queue_id not in self._transitioning_players:
        # queue is active and preflight checks passed, update the queue details
        self._update_queue_from_player(player)
1247
def on_player_remove(self, player_id: str, permanent: bool) -> None:
    """Call when a player is removed from the registry.

    - player_id: id of the player (and queue) that got removed.
    - permanent: also delete the cached queue state and queue items.
    """
    if permanent:
        # if the player is permanently removed, we also remove the cached queue data
        for cache_category in (
            CACHE_CATEGORY_PLAYER_QUEUE_STATE,
            CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
        ):
            delete_request = self.mass.cache.delete(
                key=player_id,
                provider=self.domain,
                category=cache_category,
            )
            self.mass.create_task(delete_request)
    # drop the in-memory queue data (no-op for an unknown player)
    for registry in (self._queues, self._queue_items):
        registry.pop(player_id, None)
1268
async def load_next_queue_item(
    self,
    queue_id: str,
    current_item_id: str,
) -> QueueItem:
    """
    Call when a player wants the next queue item to play.

    Items that can not be loaded are marked unavailable and skipped.

    Raises PlayerUnavailableError if the queue does not exist.
    Raises QueueEmpty if there are no more tracks left.
    """
    if not self.get(queue_id):
        msg = f"PlayerQueue {queue_id} is not available"
        raise PlayerUnavailableError(msg)
    cur_index = self.index_by_id(queue_id, current_item_id)
    if cur_index is None:
        # this is just a guard for bad data
        raise QueueEmpty("Invalid item id for queue given.")
    skipped = 0
    while True:
        next_index = self._get_next_index(queue_id, cur_index + skipped)
        if next_index is None:
            raise QueueEmpty("No more tracks left in the queue.")
        candidate = self.get_item(queue_id, next_index)
        if candidate is None:
            raise QueueEmpty("No more tracks left in the queue.")
        if skipped >= 10:
            # we only allow 10 retries to prevent infinite loops
            raise QueueEmpty("No more (playable) tracks left in the queue.")
        try:
            await self._load_item(candidate, next_index)
        except (MediaNotFoundError, AudioError):
            # No stream details found, skip this QueueItem
            self.logger.warning(
                "Skipping unplayable item %s (%s)", candidate.name, candidate.uri
            )
            candidate.available = False
            skipped += 1
            continue
        # we're all set, this is our next item
        break
    if skipped:
        # we skipped some items, signal a queue items update
        self.update_items(queue_id, self._queue_items[queue_id])
    return candidate
1318
async def _load_item(
    self,
    queue_item: QueueItem,
    next_index: int | None,
    is_start: bool = False,
    seek_position: int = 0,
    fade_in: bool = False,
) -> None:
    """Try to load the stream details for the given queue item.

    - queue_item: the queue item to (pre)load.
    - next_index: index of the item that follows queue_item (if any), used to
      work out whether consecutive tracks of the same album are being played.
    - is_start: accepted for callers that start a new playback session;
      not used within this method.
    - seek_position: seek position passed on to the stream details request.
    - fade_in: fade-in flag passed on to the stream details request.

    Raises MediaNotFoundError if the item is marked as unavailable.
    Fetching the stream details may raise for an unplayable item.
    """
    queue_id = queue_item.queue_id
    queue = self._queues[queue_id]

    # we use a contextvar to bypass the throttler for this asyncio task/context
    # this makes sure that playback has priority over other requests that may be
    # happening in the background
    BYPASS_THROTTLER.set(True)

    self.logger.debug(
        "(pre)loading (next) item for queue %s...",
        queue.display_name,
    )

    if not queue_item.available:
        raise MediaNotFoundError(f"Item {queue_item.uri} is not available")

    # work out if we are playing an album and if we should prefer album
    # loudness
    next_track_from_same_album = (
        next_index is not None
        and (next_item := self.get_item(queue_id, next_index))
        and (
            queue_item.media_item
            and hasattr(queue_item.media_item, "album")
            and queue_item.media_item.album
            and next_item.media_item
            and hasattr(next_item.media_item, "album")
            and next_item.media_item.album
            and queue_item.media_item.album.item_id == next_item.media_item.album.item_id
        )
    )
    current_index = self.index_by_id(queue_id, queue_item.queue_item_id)
    # only look back when there is an item in front of the current one.
    # NOTE: the check is on the current index (not the previous index), so the
    # first item of the queue (index 0) counts as previous track for index 1.
    previous_track_from_same_album = (
        current_index is not None
        and current_index > 0
        and (previous_item := self.get_item(queue_id, current_index - 1)) is not None
        and previous_item.media_item is not None
        and hasattr(previous_item.media_item, "album")
        and previous_item.media_item.album is not None
        and queue_item.media_item is not None
        and hasattr(queue_item.media_item, "album")
        and queue_item.media_item.album is not None
        and queue_item.media_item.album.item_id == previous_item.media_item.album.item_id
    )
    playing_album_tracks = next_track_from_same_album or previous_track_from_same_album
    if queue_item.media_item and isinstance(queue_item.media_item, Track):
        # remember the original album, the media item may get replaced below
        album = queue_item.media_item.album
        # prefer the full library media item so we have all metadata and provider(quality) info
        # always request the full library item as there might be other qualities available
        if library_item := await self.mass.music.get_library_item_by_prov_id(
            queue_item.media_item.media_type,
            queue_item.media_item.item_id,
            queue_item.media_item.provider,
        ):
            queue_item.media_item = cast("Track", library_item)
        elif not queue_item.media_item.image or queue_item.media_item.provider.startswith(
            "ytmusic"
        ):
            # Youtube Music has poor thumbs by default, so we always fetch the full item
            # this also catches the case where they have an unavailable item in a listing
            fetched_item = await self.mass.music.get_item_by_uri(queue_item.uri)
            queue_item.media_item = cast("Track", fetched_item)

        # ensure we got the full (original) album set
        if album and (
            library_album := await self.mass.music.get_library_item_by_prov_id(
                album.media_type,
                album.item_id,
                album.provider,
            )
        ):
            queue_item.media_item.album = cast("Album", library_album)
        elif album:
            # Restore original album if we have no better alternative from the library
            queue_item.media_item.album = album
        # prefer album image over track image
        if queue_item.media_item.album and queue_item.media_item.album.image:
            org_images: list[MediaItemImage] = queue_item.media_item.metadata.images or []
            queue_item.media_item.metadata.images = UniqueList(
                [
                    queue_item.media_item.album.image,
                    *org_images,
                ]
            )
    # Fetch the streamdetails, which could raise in case of an unplayable item.
    # For example, YT Music returns Radio Items that are not playable.
    queue_item.streamdetails = await get_stream_details(
        mass=self.mass,
        queue_item=queue_item,
        seek_position=seek_position,
        fade_in=fade_in,
        prefer_album_loudness=bool(playing_album_tracks),
    )
1424
def track_loaded_in_buffer(self, queue_id: str, item_id: str) -> None:
    """Call when a player has (started) loading a track in the buffer.

    - queue_id: queue_id of the queue the item belongs to.
    - item_id: queue_item_id of the item that is being loaded.

    Raises PlayerUnavailableError if the queue does not exist.
    """
    queue = self.get(queue_id)
    if not queue:
        msg = f"PlayerQueue {queue_id} is not available"
        raise PlayerUnavailableError(msg)
    # store the index of the item that is currently (being) loaded in the buffer
    # which helps us a bit to determine how far the player has buffered ahead
    buffered_index = self.index_by_id(queue_id, item_id)
    queue.index_in_buffer = buffered_index
    self.logger.debug("PlayerQueue %s loaded item %s in buffer", queue.display_name, item_id)
    self.signal_update(queue_id)
    # preload next streamdetails
    self._preload_next_item(queue_id, item_id)
    if buffered_index is None:
        return
    # clean up stale audio buffers for old queue items to prevent memory leaks
    cleanup = self.mass.streams.cleanup_stale_queue_buffers(queue_id, buffered_index)
    self.mass.create_task(cleanup)
1444
1445 # Main queue manipulation methods
1446
async def load(
    self,
    queue_id: str,
    queue_items: list[QueueItem],
    insert_at_index: int = 0,
    keep_remaining: bool = True,
    keep_played: bool = True,
    shuffle: bool = False,
) -> None:
    """Load new items at index.

    The list passed as queue_items is never modified; the resulting queue is
    always built as a new list.

    - queue_id: id of the queue to process this request.
    - queue_items: a list of QueueItems
    - insert_at_index: insert the item(s) at this index
    - keep_remaining: keep the remaining items after the insert
    - keep_played: keep the items located before the insert index
    - shuffle: (re)shuffle the items after insert index
    """
    current_items = self._queue_items[queue_id]
    prev_items = current_items[:insert_at_index] if keep_played else []
    # work on a copy so the list owned by the caller is not extended in place
    next_items = list(queue_items)

    # if keep_remaining, append the old 'next' items
    if keep_remaining:
        next_items.extend(current_items[insert_at_index:])

    # we set the original insert order as attribute so we can un-shuffle
    for index, item in enumerate(next_items):
        item.sort_index += insert_at_index + index
    # (re)shuffle the final batch if needed
    if shuffle:
        next_items = await _smart_shuffle(next_items)
    self.update_items(queue_id, prev_items + next_items)
1478
def update_items(self, queue_id: str, queue_items: list[QueueItem]) -> None:
    """Update the existing queue items, mostly caused by reordering.

    - queue_id: id of the queue to store the items for.
    - queue_items: the complete (new) list of items of the queue.
    """
    self._queue_items[queue_id] = queue_items
    queue = self._queues[queue_id]
    queue.items = len(queue_items)
    # to track if the queue items changed we set a timestamp
    # this is a simple way to detect changes in the list of items
    # without having to compare the entire list
    queue.items_last_updated = time.time()
    self.signal_update(queue_id, True)
    # if the queue is playing,
    # ensure to (re)queue the next track because it might have changed
    # note that we only do this if the player has loaded the current track
    # if not, we wait until it has loaded to prevent conflicts
    current_track_is_buffered = (
        queue.state == PlaybackState.PLAYING
        and queue.index_in_buffer is not None
        and queue.index_in_buffer == queue.current_index
    )
    if not current_track_is_buffered:
        return
    upcoming_item = self.get_next_item(queue_id, queue.index_in_buffer)
    if upcoming_item:
        self._enqueue_next_item(queue_id, upcoming_item)
1500
1501 # Helper methods
1502
def get_item(self, queue_id: str, item_id_or_index: int | str | None) -> QueueItem | None:
    """Get queue item by index or item_id.

    - queue_id: id of the queue to look into.
    - item_id_or_index: position (int) or queue_item_id (str) of the item.

    Returns None if the queue is unknown or no item matches. An index that is
    out of range (in either direction) also results in None.
    """
    if item_id_or_index is None:
        return None
    queue_items = self._queue_items.get(queue_id)
    if queue_items is None:
        return None
    if isinstance(item_id_or_index, str):
        return next((x for x in queue_items if x.queue_item_id == item_id_or_index), None)
    if isinstance(item_id_or_index, int):
        # negative indexes keep their regular meaning (counting from the end),
        # but one that points before the first item must not raise IndexError
        if -len(queue_items) <= item_id_or_index < len(queue_items):
            return queue_items[item_id_or_index]
    return None
1514
def signal_update(self, queue_id: str, items_changed: bool = False) -> None:
    """Signal state changed of given queue.

    - queue_id: id of the queue that changed.
    - items_changed: also signal (and cache) the changed queue items.
    """
    queue = self._queues[queue_id]

    def _store_in_cache(category: Any, data: Any) -> None:
        # writing to the cache is scheduled as a task
        self.mass.create_task(
            self.mass.cache.set(
                key=queue_id,
                data=data,
                provider=self.domain,
                category=category,
            )
        )

    if items_changed:
        self.mass.signal_event(EventType.QUEUE_ITEMS_UPDATED, object_id=queue_id, data=queue)
        # save items in cache - only cache items with valid media_item
        cacheable_items = [
            item.to_cache()
            for item in self._queue_items[queue_id]
            if item.media_item is not None
        ]
        _store_in_cache(CACHE_CATEGORY_PLAYER_QUEUE_ITEMS, cacheable_items)
    # always send the base event
    self.mass.signal_event(EventType.QUEUE_UPDATED, object_id=queue_id, data=queue)
    # also signal update to the player itself so it can update its current_media
    self.mass.players.trigger_player_update(queue_id)
    # save state
    _store_in_cache(CACHE_CATEGORY_PLAYER_QUEUE_STATE, queue.to_cache())
1545
def index_by_id(self, queue_id: str, queue_item_id: str) -> int | None:
    """Get index by queue_item_id.

    Returns the position of the first item with the given id,
    or None if the queue holds no such item.
    """
    matching_positions = (
        position
        for position, candidate in enumerate(self._queue_items[queue_id])
        if candidate.queue_item_id == queue_item_id
    )
    return next(matching_positions, None)
1553
async def player_media_from_queue_item(
    self, queue_item: QueueItem, flow_mode: bool
) -> PlayerMedia:
    """Parse PlayerMedia from QueueItem.

    - queue_item: the queue item to create the PlayerMedia for.
    - flow_mode: create the media for a flow stream (generic title, no duration).

    Raises InvalidDataError if the queue has no session id.
    """
    queue = self._queues[queue_item.queue_id]
    if flow_mode:
        duration = None
    elif streamdetails := queue_item.streamdetails:
        # prefer netto duration
        # when seeking, the player only receives the remaining duration
        duration = streamdetails.duration or queue_item.duration
        if duration and streamdetails.seek_position:
            duration -= streamdetails.seek_position
    else:
        duration = queue_item.duration
    if queue.session_id is None:
        # handle error or return early
        raise InvalidDataError("Queue session_id is None")
    if flow_mode:
        media_type, title = MediaType.FLOW_STREAM, "Music Assistant"
    else:
        media_type, title = queue_item.media_type, queue_item.name
    media = PlayerMedia(
        uri=queue_item.uri,
        media_type=media_type,
        title=title,
        image_url=MASS_LOGO_ONLINE,
        duration=duration,
        source_id=queue_item.queue_id,
        queue_item_id=queue_item.queue_item_id,
        custom_data={
            "session_id": queue.session_id,
            "original_uri": queue_item.uri,
            "flow_mode": flow_mode,
        },
    )
    media_item = queue_item.media_item
    if not flow_mode and media_item:
        # use the details of the media item itself
        media.title = media_item.name
        media.artist = getattr(media_item, "artist_str", "")
        album = getattr(media_item, "album", None)
        media.album = album.name if album else ""
    if queue_item.image:
        # the image format needs to be 500x500 jpeg for maximum compatibility with players
        # we prefer the imageproxy on the streamserver here because this request is sent
        # to the player itself which may not be able to reach the regular webserver
        media.image_url = self.mass.metadata.get_image_url(
            queue_item.image, size=500, prefer_stream_server=True
        )
    return media
1600
async def get_artist_tracks(self, artist: Artist) -> list[Track]:
    """Return tracks for given artist, based on user preference.

    The configured selection decides whether the tracks of the artist or the
    tracks of the artist's albums are returned, and whether only library
    items are considered. The result is returned in random order; an
    unrecognized selection yields an empty list.
    """
    selection = self.mass.config.get_raw_core_config_value(
        self.domain,
        CONF_DEFAULT_ENQUEUE_SELECT_ARTIST,
        ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE,
    )
    self.logger.info(
        "Fetching tracks to play for artist %s",
        artist.name,
    )
    if selection in ("library_tracks", "all_tracks"):
        artist_tracks = await self.mass.music.artists.tracks(
            artist.item_id,
            artist.provider,
            in_library_only=selection == "library_tracks",
        )
        random.shuffle(artist_tracks)
        return artist_tracks
    if selection not in ("library_album_tracks", "all_album_tracks"):
        return []
    artist_albums = await self.mass.music.artists.albums(
        artist.item_id,
        artist.provider,
        in_library_only=selection == "library_album_tracks",
    )
    collected: list[Track] = []
    for artist_album in artist_albums:
        tracks_of_album = await self.mass.music.albums.tracks(
            artist_album.item_id, artist_album.provider
        )
        for candidate in tracks_of_album:
            # a track may be part of multiple albums, only add it once
            if candidate in collected:
                continue
            collected.append(candidate)
    random.shuffle(collected)
    return collected
1635
async def get_album_tracks(self, album: Album, start_item: str | None) -> list[Track]:
    """Return tracks for given album, based on user preference.

    - album: the album to get the tracks for.
    - start_item: item_id or uri of the track to start at; available tracks
      listed before this track are left out. None returns all available tracks.
    """
    selection = self.mass.config.get_raw_core_config_value(
        self.domain,
        CONF_DEFAULT_ENQUEUE_SELECT_ALBUM,
        ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE,
    )
    self.logger.info(
        "Fetching tracks to play for album %s",
        album.name,
    )
    all_album_tracks = await self.mass.music.albums.tracks(
        item_id=album.item_id,
        provider_instance_id_or_domain=album.provider,
        in_library_only=selection == "library_tracks",
    )
    # as long as the start item is not seen, tracks are skipped
    awaiting_start_item = start_item is not None
    selected: list[Track] = []
    for candidate in all_album_tracks:
        if not candidate.available:
            continue
        if awaiting_start_item and start_item in (candidate.item_id, candidate.uri):
            awaiting_start_item = False
        if not awaiting_start_item:
            selected.append(candidate)
    return selected
1662
async def get_genre_tracks(self, genre: Genre, start_item: str | None) -> list[Track]:
    """Return tracks for given genre, based on alias mappings.

    Limits results to avoid loading thousands of tracks for broad genres.
    Directly mapped tracks are fetched with random ordering, then supplemented
    with (at most 5) tracks of each album and artist from a limited set of
    mapped albums and artists.

    - genre: the genre to get the tracks for.
    - start_item: item_id or uri of the directly mapped track to start at;
      available mapped tracks listed before it are left out.
    """
    self.logger.info(
        "Fetching tracks to play for genre %s",
        genre.name,
    )
    mapped_tracks, mapped_albums, mapped_artists = await self.mass.music.genres.mapped_media(
        genre,
        track_limit=25,
        album_limit=5,
        artist_limit=5,
        order_by="random",
    )

    # as long as the start item is not seen, mapped tracks are skipped
    awaiting_start_item = start_item is not None
    selected: list[Track] = []
    for candidate in mapped_tracks:
        if not candidate.available:
            continue
        if awaiting_start_item and start_item in (candidate.item_id, candidate.uri):
            awaiting_start_item = False
        if not awaiting_start_item:
            selected.append(candidate)

    for mapped_album in mapped_albums:
        tracks_of_album = await self.get_album_tracks(mapped_album, None)
        selected += tracks_of_album[:5]

    for mapped_artist in mapped_artists:
        tracks_of_artist = await self.get_artist_tracks(mapped_artist)
        selected += tracks_of_artist[:5]
    return selected
1701
async def get_playlist_tracks(
    self, playlist: Playlist, start_item: str | None
) -> list[PlaylistPlayableItem]:
    """
    Collect the playable items of a playlist.

    Unavailable items are always left out. When ``start_item`` is given (an item id
    or uri), every item listed before the first matching item is dropped as well.

    :param playlist: The playlist to fetch the items for.
    :param start_item: Optional item id or uri of the item to start from.
    :return: The playlist items to enqueue, in playlist order.
    """
    self.logger.info("Fetching tracks to play for playlist %s", playlist.name)
    # without a start item there is nothing to wait for: collect from the first item
    collecting = start_item is None
    selected: list[PlaylistPlayableItem] = []
    # TODO: Handle other sort options etc.
    async for candidate in self.mass.music.playlists.tracks(playlist.item_id, playlist.provider):
        if not candidate.available:
            continue
        if not collecting and start_item in (candidate.item_id, candidate.uri):
            collecting = True
        if collecting:
            selected.append(candidate)
    return selected
1724
async def get_audiobook_resume_point(
    self, audio_book: Audiobook, chapter: str | int | None = None, userid: str | None = None
) -> int:
    """
    Return resume point (in milliseconds) for given audio book.

    :param audio_book: The audiobook that is about to be played.
    :param chapter: Optional chapter position explicitly selected by the user,
        either as int or as a string holding an integer.
    :param userid: Optional user id, forwarded to the resume position lookup.
    :return: Start of the requested chapter, or otherwise the stored resume position
        (0 when the audiobook is reported as fully played).
    :raises InvalidDataError: When a chapter was requested that can not be resolved.
    """
    self.logger.debug(
        "Fetching resume point to play for audio book %s",
        audio_book.name,
    )
    if chapter is not None:
        # user explicitly selected a chapter to play
        try:
            start_chapter = int(chapter) if isinstance(chapter, str) else chapter
        except ValueError as err:
            # a non-numeric chapter reference can never match a chapter position:
            # report it like any other unresolvable chapter instead of leaking ValueError
            raise InvalidDataError(
                f"Unable to resolve chapter to play for Audiobook {audio_book.name}"
            ) from err
        if chapters := audio_book.metadata.chapters:
            if _chapter := next((x for x in chapters if x.position == start_chapter), None):
                return int(_chapter.start * 1000)
        raise InvalidDataError(
            f"Unable to resolve chapter to play for Audiobook {audio_book.name}"
        )
    # no chapter requested: continue where the user left off
    full_played, resume_position_ms = await self.mass.music.get_resume_position(
        audio_book, userid=userid
    )
    return 0 if full_played else resume_position_ms
1746
async def get_next_podcast_episodes(
    self,
    podcast: Podcast | None,
    episode: PodcastEpisode | str | None,
    userid: str | None = None,
) -> UniqueList[PodcastEpisode]:
    """
    Return the episode(s) to play, with up-to-date resume information.

    Without a podcast, the given (full) episode is returned on its own. With a podcast,
    the episodes are sorted by position and returned starting at either the explicitly
    requested episode or the first episode that is not fully played yet (falling back
    to the very first episode when everything has been played).

    :param podcast: The podcast to play, or None to play a single episode.
    :param episode: Episode object, or episode uri/item id, to start with (optional
        when a podcast is given).
    :param userid: Optional user id, forwarded to the resume position lookup.
    :raises InvalidDataError: When neither a podcast nor a full episode is given, or
        when no episode to start with could be resolved.
    """
    if podcast is None:
        if isinstance(episode, str | NoneType):
            raise InvalidDataError("Either podcast or episode must be provided")
        # single podcast episode requested
        assert isinstance(episode, PodcastEpisode)  # checked above
        self.logger.debug("Fetching resume point to play for Podcast episode %s", episode.name)
        was_played, position_ms = await self.mass.music.get_resume_position(
            episode, userid=userid
        )
        episode.fully_played = was_played
        episode.resume_position_ms = 0 if was_played else position_ms
        return UniqueList([episode])

    # podcast with optional start episode requested
    self.logger.debug(
        "Fetching episode(s) and resume point to play for Podcast %s", podcast.name
    )
    ordered = [
        ep async for ep in self.mass.music.podcasts.episodes(podcast.item_id, podcast.provider)
    ]
    ordered.sort(key=lambda ep: ep.position)

    target: PodcastEpisode | None = None
    if isinstance(episode, PodcastEpisode | str):
        # the user explicitly selected an episode: locate it in the episode list
        if isinstance(episode, PodcastEpisode):
            target = next((ep for ep in ordered if ep.uri == episode.uri), None)
        else:
            target = next((ep for ep in ordered if episode in (ep.uri, ep.item_id)), None)
        if target:
            # ensure we have accurate resume info
            was_played, position_ms = await self.mass.music.get_resume_position(
                target, userid=userid
            )
            target.resume_position_ms = 0 if was_played else position_ms
    else:
        # no explicit selection: pick the first episode that is not fully played
        for candidate in ordered:
            if candidate.fully_played:
                continue
            # ensure we have accurate resume info
            was_played, position_ms = await self.mass.music.get_resume_position(
                candidate, userid=userid
            )
            if was_played:
                continue
            candidate.resume_position_ms = position_ms
            target = candidate
            break
        if target is None:
            # every episode has been played already, so we start at the beginning
            target = ordered[0] if ordered else None

    if target is None:
        raise InvalidDataError(f"Unable to resolve episode to play for Podcast {podcast.name}")
    # return the (remaining) episode(s) to play
    first_index = ordered.index(target)
    return UniqueList(ordered[first_index:])
1826
def _get_next_index(
    self,
    queue_id: str,
    cur_index: int | None,
    is_skip: bool = False,
    allow_repeat: bool = True,
) -> int | None:
    """
    Determine the queue index that follows ``cur_index``, honouring the repeat mode.

    :param queue_id: ID of the queue to inspect.
    :param cur_index: Index to continue from (None when there is no current index).
    :param is_skip: True when the user skips; repeat-one is then ignored.
    :param allow_repeat: When False, neither repeat-one nor repeat-all yields an index.
    :return: The next index, or None if there are no (more) items in the queue.
    """
    queue = self._queues[queue_id]
    item_count = len(self._queue_items[queue_id])
    if item_count == 0 or cur_index is None:
        # nothing to continue from
        return None
    if queue.repeat_mode == RepeatMode.ONE and not is_skip:
        # repeating a single track means staying on the same index
        return cur_index if allow_repeat else None
    if cur_index < item_count - 1:
        # regular case: simply move on to the following item
        return cur_index + 1
    # cur_index is (at or beyond) the last item: wrap around only for repeat all
    if allow_repeat and queue.repeat_mode == RepeatMode.ALL:
        return 0
    return None
1855
def get_next_item(self, queue_id: str, cur_index: int | str) -> QueueItem | None:
    """
    Return the QueueItem that follows the given position in the queue.

    :param queue_id: ID of the queue to inspect.
    :param cur_index: Current position, either as index or as queue item id.
    :return: The first available item found within 5 lookups, otherwise None.
    """
    start_index: int
    if isinstance(cur_index, str):
        looked_up = self.index_by_id(queue_id, cur_index)
        if looked_up is None:
            return None  # guard: unknown queue item id
        start_index = looked_up
    else:
        start_index = cur_index

    offset = 0
    while offset < 5:
        candidate_index = self._get_next_index(queue_id, start_index + offset)
        offset += 1
        if candidate_index is None:
            # no (more) items in the queue
            return None
        candidate = self.get_item(queue_id, candidate_index)
        # skip missing and unavailable items (set by load_next track logic)
        if candidate is not None and candidate.available:
            return candidate
    return None
1878
async def _fill_radio_tracks(self, queue_id: str) -> None:
    """
    Extend a radio queue with freshly fetched radio tracks.

    Only available tracks are turned into queue items; they are loaded with an
    insert index just past the current number of queue items.
    """
    self.logger.debug("Filling radio tracks for queue %s", queue_id)
    radio_tracks = await self._get_radio_tracks(queue_id=queue_id, is_initial_radio_mode=False)
    # fill queue - filter out unavailable items
    playable_items = []
    for radio_track in radio_tracks:
        if radio_track.available:
            playable_items.append(QueueItem.from_media_item(queue_id, radio_track))
    insert_position = len(self._queue_items[queue_id]) + 1
    await self.load(queue_id, playable_items, insert_at_index=insert_position)
1893
def _enqueue_next_item(self, queue_id: str, next_item: QueueItem | None) -> None:
    """
    Schedule enqueuing of the next item on the player.

    Does nothing when there is no next item or when the queue is in flow mode.
    The actual enqueue call is scheduled with a 0.5 second delay under a
    per-queue task id.
    """
    if not next_item:
        # no next item, nothing to do...
        return
    queue = self._queues[queue_id]
    if queue.flow_mode:
        # ignore this for flow mode
        return

    async def _enqueue_next_item_on_player(item: QueueItem) -> None:
        player_media = await self.player_media_from_queue_item(item, False)
        await self.mass.players.enqueue_next_media(player_id=queue_id, media=player_media)
        if queue.next_item_id_enqueued == item.queue_item_id:
            # already registered as the enqueued item
            return
        queue.next_item_id_enqueued = item.queue_item_id
        self.logger.debug(
            "Enqueued next track %s on queue %s",
            item.name,
            self._queues[queue_id].display_name,
        )

    self.mass.call_later(
        0.5,
        _enqueue_next_item_on_player,
        next_item,
        task_id=f"enqueue_next_item_{queue_id}",
    )
1920
def _preload_next_item(self, queue_id: str, item_id_in_buffer: str) -> None:
    """
    Start a background task that preloads the item following the buffered one.

    Loading the next queue item ensures it is playable and has its stream details;
    the loaded item is then handed to the player as next item. Nothing is scheduled
    for unknown items, radio items or items without a duration.
    """
    queue = self._queues[queue_id]
    buffered_item = self.get_item(queue_id, item_id_in_buffer)
    if not buffered_item:
        # this should not happen, but guard anyways
        return
    if buffered_item.media_type == MediaType.RADIO or not buffered_item.duration:
        # radio items or no duration, nothing to do
        return

    async def _preload_streamdetails(buffered_item_id: str) -> None:
        try:
            # Wait (up to 120 x 1 second) until the buffered item is the item that is
            # actually playing. This prevents a race condition where we preload the
            # next item too soon, while the player is still preloading the previously
            # enqueued item.
            for _ in range(120):
                if not queue.current_item:
                    return  # guard
                if queue.current_item.queue_item_id == buffered_item_id:
                    break
                await asyncio.sleep(1)
            preloaded = await self.load_next_queue_item(queue_id, buffered_item_id)
            if preloaded:
                self.logger.debug(
                    "Preloaded next item %s for queue %s",
                    preloaded.name,
                    queue.display_name,
                )
                # enqueue the next item on the player
                self._enqueue_next_item(queue_id, preloaded)
        except QueueEmpty:
            return

    self.mass.create_task(
        _preload_streamdetails,
        item_id_in_buffer,
        task_id=f"preload_next_item_{queue_id}",
        abort_existing=True,
    )
1969
async def _resolve_media_items(
    self,
    media_item: MediaItemType | ItemMapping | BrowseFolder,
    start_item: str | None = None,
    userid: str | None = None,
    queue_id: str | None = None,
) -> list[MediaItemType]:
    """
    Resolve/unwrap media items to enqueue.

    Container items (playlist, artist, album, genre, podcast, folder) are expanded
    into their playable items; for playlist, artist, album, genre and podcast a
    background task also marks the container itself as (user initiated) played.

    :param media_item: The item to resolve; an ItemMapping is first resolved by uri.
    :param start_item: Optional item (id/uri, or chapter for audiobooks) to start at.
    :param userid: Optional id of the user that requested playback.
    :param queue_id: Optional id of the queue the items are resolved for.
    :return: The playable media items to enqueue.
    :raises InvalidDataError: When an ItemMapping without uri is given.
    """
    # resolve Itemmapping to full media item
    if isinstance(media_item, ItemMapping):
        if media_item.uri is None:
            raise InvalidDataError("ItemMapping has no URI")
        media_item = await self.mass.music.get_item_by_uri(media_item.uri)
    if media_item.media_type == MediaType.PLAYLIST:
        media_item = cast("Playlist", media_item)
        self.mass.create_task(
            self.mass.music.mark_item_played(
                media_item, userid=userid, queue_id=queue_id, user_initiated=True
            )
        )
        return list(await self.get_playlist_tracks(media_item, start_item))
    if media_item.media_type == MediaType.ARTIST:
        media_item = cast("Artist", media_item)
        # pass the userid along, exactly like all other container types do,
        # so the play is attributed to the user that requested it
        self.mass.create_task(
            self.mass.music.mark_item_played(
                media_item, userid=userid, queue_id=queue_id, user_initiated=True
            )
        )
        return list(await self.get_artist_tracks(media_item))
    if media_item.media_type == MediaType.ALBUM:
        media_item = cast("Album", media_item)
        self.mass.create_task(
            self.mass.music.mark_item_played(
                media_item, userid=userid, queue_id=queue_id, user_initiated=True
            )
        )
        return list(await self.get_album_tracks(media_item, start_item))
    if media_item.media_type == MediaType.GENRE:
        media_item = cast("Genre", media_item)
        self.mass.create_task(
            self.mass.music.mark_item_played(
                media_item, userid=userid, queue_id=queue_id, user_initiated=True
            )
        )
        return list(await self.get_genre_tracks(media_item, start_item))
    if media_item.media_type == MediaType.AUDIOBOOK:
        media_item = cast("Audiobook", media_item)
        # ensure we grab the correct/latest resume point info
        media_item.resume_position_ms = await self.get_audiobook_resume_point(
            media_item, start_item, userid=userid
        )
        return [media_item]
    if media_item.media_type == MediaType.PODCAST:
        media_item = cast("Podcast", media_item)
        self.mass.create_task(
            self.mass.music.mark_item_played(
                media_item, userid=userid, queue_id=queue_id, user_initiated=True
            )
        )
        return list(await self.get_next_podcast_episodes(media_item, start_item, userid=userid))
    if media_item.media_type == MediaType.PODCAST_EPISODE:
        media_item = cast("PodcastEpisode", media_item)
        return list(await self.get_next_podcast_episodes(None, media_item, userid=userid))
    if media_item.media_type == MediaType.FOLDER:
        media_item = cast("BrowseFolder", media_item)
        return list(await self._get_folder_tracks(media_item))
    # all other: single track or radio item
    return [cast("MediaItemType", media_item)]
2036
async def _try_resume_from_playlog(self, queue: PlayerQueue) -> bool:
    """Try to start playback from the playlog (used when the queue is empty).

    User-initiated recently played items are looked up with decreasing specificity:
    1. By userid AND queue_id (only if the queue has a userid)
    2. By queue_id only
    3. By userid only (only if the queue has a userid)
    4. Any recently played item

    The first item (with a uri) that can be played wins.

    :param queue: The queue to resume playback on.
    :return: True if playback was started, False otherwise.
    """
    queue_user = queue.userid
    # (requires a user, userid filter, queue_id filter, description) - most specific first
    lookup_plan: tuple[tuple[bool, str | None, str | None, str], ...] = (
        (True, queue_user, queue.queue_id, "userid + queue_id match"),
        (False, None, queue.queue_id, "queue_id match"),
        (True, queue_user, None, "userid match"),
        (False, None, None, "any recent item"),
    )

    for requires_user, userid, queue_id, match_type in lookup_plan:
        if requires_user and not queue_user:
            continue
        recent_items = await self.mass.music.recently_played(
            limit=5,
            fully_played_only=False,
            user_initiated_only=True,
            userid=userid,
            queue_id=queue_id,
        )
        for recent_item in recent_items:
            if not recent_item.uri:
                continue
            try:
                await self.play_media(queue.queue_id, recent_item)
            except MusicAssistantError as err:
                self.logger.debug("Failed to resume with item %s: %s", recent_item.name, err)
                continue
            self.logger.info("Resumed queue %s from playlog (%s)", queue.display_name, match_type)
            return True

    return False
2080
async def _get_radio_tracks(
    self, queue_id: str, is_initial_radio_mode: bool = False
) -> list[Track]:
    """
    Call the registered music providers for dynamic tracks.

    Base tracks are taken from the queue's radio source items, or from the tracks
    already in the queue when a radio based on one single track is refilled.
    A random sample of at most 5 base tracks is then used to look up similar
    ("dynamic") tracks, which make up the bulk of the result.

    :param queue_id: ID of the queue to fetch radio tracks for.
    :param is_initial_radio_mode: True when radio mode is first started (the sampled
        base tracks are then included in the result), False when refilling.
    :return: The tracks to add to the queue; empty when the queue has no radio source.
    :raises UnsupportedFeaturedException: When no base tracks could be determined.
    """
    queue = self._queues[queue_id]
    # all tracks currently in the queue: used as play history and to avoid duplicates
    queue_track_items: list[Track] = [
        q.media_item
        for q in self._queue_items[queue_id]
        if q.media_item and isinstance(q.media_item, Track)
    ]
    if not queue.radio_source:
        # this may happen during race conditions as this method is called delayed
        return []
    self.logger.info(
        "Fetching radio tracks for queue %s based on: %s",
        queue.display_name,
        ", ".join([x.name for x in queue.radio_source]),
    )

    # Get user's preferred provider instances for steering provider selection
    preferred_provider_instances: list[str] | None = None
    if (
        queue.userid
        and (playback_user := await self.mass.webserver.auth.get_user(queue.userid))
        and playback_user.provider_filter
    ):
        preferred_provider_instances = playback_user.provider_filter

    available_base_tracks: list[Track] = []
    base_track_sample_size = 5
    # Some providers have very deterministic similar track algorithms when providing
    # a single track item. When we have a radio mode based on 1 track and we have to
    # refill the queue (ie not initial radio mode), we use the play history as base tracks
    if (
        len(queue.radio_source) == 1
        and queue.radio_source[0].media_type == MediaType.TRACK
        and not is_initial_radio_mode
    ):
        available_base_tracks = queue_track_items
    else:
        # Grab all the available base tracks based on the selected source items.
        # shuffle the source items, just in case
        for radio_item in random.sample(queue.radio_source, len(queue.radio_source)):
            ctrl = self.mass.music.get_controller(radio_item.media_type)
            try:
                available_base_tracks += [
                    track
                    for track in await ctrl.radio_mode_base_tracks(
                        radio_item,  # type: ignore[arg-type]
                        preferred_provider_instances,
                    )
                    # Avoid duplicate base tracks
                    if track not in available_base_tracks
                ]
            except UnsupportedFeaturedException as err:
                # this source item can not provide base tracks: try the other sources
                self.logger.debug(
                    "Skip loading radio items for %s: %s ",
                    radio_item.uri,
                    str(err),
                )
    if not available_base_tracks:
        raise UnsupportedFeaturedException("Radio mode not available for source items")

    # Sample tracks from the base tracks, which will be used to calculate the dynamic ones
    base_tracks = random.sample(
        available_base_tracks,
        min(base_track_sample_size, len(available_base_tracks)),
    )
    # Use a set to avoid duplicate dynamic tracks
    dynamic_tracks: set[Track] = set()
    # Use base tracks + Trackcontroller to obtain similar tracks for every base Track
    # The second pass (allow_lookup=True) only runs when the first pass found nothing.
    for allow_lookup in (False, True):
        if dynamic_tracks:
            break
        for base_track in base_tracks:
            try:
                _similar_tracks = await self.mass.music.tracks.similar_tracks(
                    base_track.item_id,
                    base_track.provider,
                    allow_lookup=allow_lookup,
                    preferred_provider_instances=preferred_provider_instances,
                )
            except MediaNotFoundError:
                # Some providers don't have similar tracks for all items. For example,
                # Tidal can sometimes return a 404 when the 'similar_tracks' endpoint is called.
                # in that case, just skip the track.
                self.logger.debug("Similar tracks not found for track %s", base_track.name)
                continue
            for track in _similar_tracks:
                if (
                    track not in base_tracks
                    # Exclude tracks we have already played / queued
                    and track not in queue_track_items
                    # Ignore tracks that are too long for radio mode, e.g. mixes
                    and track.duration <= RADIO_TRACK_MAX_DURATION_SECS
                ):
                    dynamic_tracks.add(track)
            # stop querying further base tracks once we have plenty of dynamic tracks
            if len(dynamic_tracks) >= 50:
                break
    queue_tracks: list[Track] = []
    dynamic_tracks_list = list(dynamic_tracks)
    # Only include the sampled base tracks when the radio mode is first initialized
    if is_initial_radio_mode:
        queue_tracks += [base_tracks[0]]
        # Exhaust base tracks with the pattern of BDDBDDBDD (1 base track + 2 dynamic tracks)
        if len(base_tracks) > 1:
            for base_track in base_tracks[1:]:
                queue_tracks += [base_track]
                if len(dynamic_tracks_list) > 2:
                    queue_tracks += random.sample(dynamic_tracks_list, 2)
                else:
                    queue_tracks += dynamic_tracks_list
    # Add dynamic tracks to the queue, make sure to exclude already picked tracks
    remaining_dynamic_tracks = [t for t in dynamic_tracks_list if t not in queue_tracks]
    if remaining_dynamic_tracks:
        # cap the random selection of remaining dynamic tracks at 25
        queue_tracks += random.sample(
            remaining_dynamic_tracks, min(len(remaining_dynamic_tracks), 25)
        )
    return queue_tracks
2200
async def _get_folder_tracks(self, folder: BrowseFolder) -> list[Track]:
    """
    Collect the tracks found in a browse folder.

    Every playable item in the folder is resolved (recursively, for all media types)
    and only the resulting Track items are kept.
    """
    self.logger.info("Fetching tracks to play for folder %s", folder.name)
    collected: list[Track] = []
    folder_items = await self.mass.music.browse(folder.path)
    for entry in folder_items:
        if entry.is_playable:
            # recursively fetch tracks from all media types
            resolved_items = await self._resolve_media_items(entry)
            collected.extend(media for media in resolved_items if isinstance(media, Track))
    return collected
2216
def _update_queue_from_player(
    self,
    player: Player,
) -> None:
    """
    Update the Queue when the player state changed.

    Syncs the queue (state, current/next item, elapsed time) with what the player
    reports, compares the result with the previously stored state and, only when
    something changed, signals updates and runs the follow-up handlers (stream
    metadata refresh, playback progress report, end-of-queue handling and radio
    refill).

    :param player: The player that reported a state change; its player_id is used
        as the queue id.
    """
    queue_id = player.player_id
    queue = self._queues[queue_id]

    # basic properties
    queue.display_name = player.state.name
    queue.available = player.state.available
    queue.items = len(self._queue_items[queue_id])

    # the queue only mirrors the player's playback state while it is active,
    # otherwise it is considered idle
    queue.state = (
        player.state.playback_state or PlaybackState.IDLE
        if queue.active
        else PlaybackState.IDLE
    )
    # update current item/index from player report
    if queue.active and queue.state in (
        PlaybackState.PLAYING,
        PlaybackState.PAUSED,
    ):
        # NOTE: If the queue is not playing (yet) we will not update the current index
        # to ensure we keep the previously known current index
        if queue.flow_mode:
            # flow mode active, the player is playing one long stream
            # so we need to calculate the current index and elapsed time
            current_index, elapsed_time = self._get_flow_queue_stream_index(queue, player)
        elif item_id := self._parse_player_current_item_id(queue_id, player):
            # normal mode, the player itself will report the current item
            elapsed_time = int(player.state.corrected_elapsed_time or 0)
            current_index = self.index_by_id(queue_id, item_id)
        else:
            # this may happen if the player is still transitioning between tracks
            # we ignore this for now and keep the current index as is
            return

        # get current/next item based on current index
        queue.current_index = current_index
        queue.current_item = current_item = self.get_item(queue_id, current_index)
        queue.next_item = (
            self.get_next_item(queue_id, current_index)
            if current_item and current_index is not None
            else None
        )

        # correct elapsed time when seeking
        if (
            not queue.flow_mode
            and current_item
            and current_item.streamdetails
            and current_item.streamdetails.seek_position
        ):
            elapsed_time += current_item.streamdetails.seek_position
        queue.elapsed_time = elapsed_time
        queue.elapsed_time_last_updated = time.time()

    elif not queue.current_item and queue.current_index is not None:
        # not playing/paused: restore current/next item from the known current index
        current_index = queue.current_index
        queue.current_item = current_item = self.get_item(queue_id, current_index)
        queue.next_item = (
            self.get_next_item(queue_id, current_index)
            if current_item and current_index is not None
            else None
        )

    # This is enough to detect any changes in the DSPDetails
    # (so child count changed, or any output format changed)
    output_formats = []
    if output_format := player.extra_data.get("output_format"):
        output_formats.append(str(output_format))
    for child_id in player.state.group_members:
        if (child := self.mass.players.get_player(child_id)) and (
            output_format := child.extra_data.get("output_format")
        ):
            output_formats.append(str(output_format))
        else:
            output_formats.append("unknown")

    # basic throttle: do not send state changed events if queue did not actually change
    # (an "empty" idle state is used for comparison when no previous state is stored)
    prev_state: CompareState = self._prev_states.get(
        queue_id,
        CompareState(
            queue_id=queue_id,
            state=PlaybackState.IDLE,
            current_item_id=None,
            next_item_id=None,
            current_item=None,
            elapsed_time=0,
            last_playing_elapsed_time=0,
            stream_title=None,
            codec_type=None,
            output_formats=None,
        ),
    )
    # update last_playing_elapsed_time only when the player is actively playing
    # use corrected_elapsed_time which accounts for time since last update
    # this preserves the last known elapsed time when transitioning to idle/paused
    prev_playing_elapsed = prev_state["last_playing_elapsed_time"]
    prev_item_id = prev_state["current_item_id"]
    current_item_id = queue.current_item.queue_item_id if queue.current_item else None
    if queue.state == PlaybackState.PLAYING:
        current_elapsed = int(queue.corrected_elapsed_time)
        if current_item_id != prev_item_id:
            # new track started, reset the elapsed time tracker
            last_playing_elapsed_time = current_elapsed
        else:
            # same track, use the max of current and previous to handle timing issues
            last_playing_elapsed_time = max(current_elapsed, prev_playing_elapsed)
    else:
        last_playing_elapsed_time = prev_playing_elapsed
    new_state = CompareState(
        queue_id=queue_id,
        state=queue.state,
        current_item_id=queue.current_item.queue_item_id if queue.current_item else None,
        next_item_id=queue.next_item.queue_item_id if queue.next_item else None,
        current_item=queue.current_item,
        elapsed_time=int(queue.elapsed_time),
        last_playing_elapsed_time=last_playing_elapsed_time,
        stream_title=(
            queue.current_item.streamdetails.stream_title
            if queue.current_item and queue.current_item.streamdetails
            else None
        ),
        codec_type=(
            queue.current_item.streamdetails.audio_format.codec_type
            if queue.current_item and queue.current_item.streamdetails
            else None
        ),
        output_formats=output_formats,
    )
    changed_keys = get_changed_keys(dict(prev_state), dict(new_state))
    # next_item_id and last_playing_elapsed_time are tracked in the state,
    # but a change in (only) these keys is not treated as a queue change
    with suppress(KeyError):
        changed_keys.remove("next_item_id")
    with suppress(KeyError):
        changed_keys.remove("last_playing_elapsed_time")

    # store the new state
    if queue.active:
        self._prev_states[queue_id] = new_state
    else:
        self._prev_states.pop(queue_id, None)

    # return early if nothing changed
    if len(changed_keys) == 0:
        return

    # signal update and store state
    send_update = True
    if changed_keys == {"elapsed_time"}:
        # only elapsed time changed, do not send full queue update
        send_update = False
        prev_time = prev_state.get("elapsed_time") or 0
        cur_time = new_state.get("elapsed_time") or 0
        if abs(cur_time - prev_time) > 2:
            # send dedicated event for time updates when seeking
            self.mass.signal_event(
                EventType.QUEUE_TIME_UPDATED,
                object_id=queue_id,
                data=queue.elapsed_time,
            )
            # also signal update to the player itself so it can update its current_media
            self.mass.players.trigger_player_update(queue_id)

    if send_update:
        self.signal_update(queue_id)

    if "output_formats" in changed_keys:
        # refresh DSP details since they may have changed
        dsp = get_stream_dsp_details(self.mass, queue_id)
        if queue.current_item and queue.current_item.streamdetails:
            queue.current_item.streamdetails.dsp = dsp
        if queue.next_item and queue.next_item.streamdetails:
            queue.next_item.streamdetails.dsp = dsp

    # handle updating stream_metadata if needed
    # (callback registered, and never updated before or update interval elapsed)
    if (
        queue.current_item
        and (streamdetails := queue.current_item.streamdetails)
        and streamdetails.stream_metadata_update_callback
        and (
            streamdetails.stream_metadata_last_updated is None
            or (
                time.time() - streamdetails.stream_metadata_last_updated
                >= streamdetails.stream_metadata_update_interval
            )
        )
    ):
        streamdetails.stream_metadata_last_updated = time.time()
        self.mass.create_task(
            streamdetails.stream_metadata_update_callback(
                streamdetails, int(queue.corrected_elapsed_time)
            )
        )

    # handle sending a playback progress report
    # we do this every 30 seconds or when the state changes
    if (
        changed_keys.intersection({"state", "current_item_id"})
        or int(queue.elapsed_time) % PLAYBACK_REPORT_INTERVAL_SECONDS == 0
    ):
        self._handle_playback_progress_report(queue, prev_state, new_state)

    # check if we need to clear the queue if we reached the end
    if "state" in changed_keys and queue.state == PlaybackState.IDLE:
        self._handle_end_of_queue(queue, prev_state, new_state)

    # watch dynamic radio items refill if needed
    if "current_item_id" in changed_keys:
        # auto enable radio mode if dont stop the music is enabled
        if (
            queue.dont_stop_the_music_enabled
            and queue.enqueued_media_items
            and queue.current_index is not None
            and (queue.items - queue.current_index) <= 1
        ):
            # We have received the last item in the queue and Don't stop the music is enabled
            # set the played media item(s) as radio items (which will refill the queue)
            # note that this will fail if there are no media items for which we have
            # a dynamic radio source.
            self.logger.debug(
                "End of queue detected and Don't stop the music is enabled for %s"
                " - setting enqueued media items as radio source: %s",
                queue.display_name,
                ", ".join([x.uri for x in queue.enqueued_media_items]),  # type: ignore[misc] # uri set in __post_init__
            )
            queue.radio_source = queue.enqueued_media_items
        # auto fill radio tracks if less than 5 tracks left in the queue
        if (
            queue.radio_source
            and queue.current_index is not None
            and (queue.items - queue.current_index) < 5
        ):
            # the refill is scheduled with a 5 second delay under a per-queue task id
            task_id = f"fill_radio_tracks_{queue_id}"
            self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id)
2453
def _get_flow_queue_stream_index(
    self, queue: PlayerQueue, player: Player
) -> tuple[int | None, int]:
    """
    Work out which queue item is playing (and for how long) while in flow mode.

    In flow mode the player plays one long stream, so its elapsed time covers all
    items streamed so far. The flow mode stream log is walked to find the entry in
    which the player's elapsed time falls.

    :param queue: The queue that is streaming in flow mode.
    :param player: The player that plays the flow stream.
    :return: Tuple of (queue index, elapsed seconds within that queue item).
    """
    total_elapsed = player.state.corrected_elapsed_time or 0
    if queue.current_index is None and not queue.flow_mode_stream_log:
        return queue.current_index, int(queue.elapsed_time)

    # For each track that has been streamed/buffered to the player,
    # a playlog entry will be created with the queue item id
    # and the amount of seconds streamed. We traverse the playlog to figure
    # out where we are in the queue, accounting for actual streamed
    # seconds (and not duration) and skipped seconds. If a track has been repeated,
    # it will simply be in the playlog multiple times.
    fallback_length = 3600 * 24 * 7
    consumed = 0.0
    resolved_index: int | None = queue.current_index or 0
    position_in_item = 0.0
    for log_entry in queue.flow_mode_stream_log:
        # NOTE: 'seconds_streamed' can actually be 0 if there was a stream error!
        if log_entry.seconds_streamed is not None:
            entry_length = log_entry.seconds_streamed
        else:
            entry_length = log_entry.duration or fallback_length
        if total_elapsed > (entry_length + consumed):
            # total elapsed time is more than (streamed) track duration
            # this track has been fully played, move on.
            consumed += entry_length
            continue
        # no more seconds left to divide, this is our track
        resolved_index = self.index_by_id(queue.queue_id, log_entry.queue_item_id)
        playing_item = self.get_item(queue.queue_id, resolved_index)
        # account for any seeking by adding the skipped/seeked seconds
        if playing_item and playing_item.streamdetails:
            seconds_skipped = playing_item.streamdetails.seek_position
        else:
            seconds_skipped = 0
        position_in_item = total_elapsed + seconds_skipped - consumed
        break
    if player.state.playback_state != PlaybackState.PLAYING:
        # if the player is not playing, we can't be sure that the elapsed time is correct
        # so we just return the queue index and the elapsed time
        return queue.current_index, int(queue.elapsed_time)
    return resolved_index, int(position_in_item)
2498
2499 def _parse_player_current_item_id(self, queue_id: str, player: Player) -> str | None:
2500 """Parse QueueItem ID from Player's current url."""
2501 protocol_player = player
2502 if player.active_output_protocol and player.active_output_protocol != "native":
2503 protocol_player = self.mass.players.get_player(player.active_output_protocol) or player
2504 if not protocol_player.current_media:
2505 # YES, we use player.current_media on purpose here because we need the raw metadata
2506 return None
2507 # prefer queue_id and queue_item_id within the current media
2508 if (
2509 protocol_player.current_media.source_id == queue_id
2510 and protocol_player.current_media.queue_item_id
2511 ):
2512 return protocol_player.current_media.queue_item_id
2513 # special case for sonos players
2514 if protocol_player.current_media.uri and protocol_player.current_media.uri.startswith(
2515 f"mass:{queue_id}"
2516 ):
2517 if protocol_player.current_media.queue_item_id:
2518 return protocol_player.current_media.queue_item_id
2519 current_item_id = protocol_player.current_media.uri.split(":")[-1]
2520 if self.get_item(queue_id, current_item_id):
2521 return current_item_id
2522 return None
2523 # try to extract the item id from a mass stream url
2524 if (
2525 protocol_player.current_media.uri
2526 and queue_id in protocol_player.current_media.uri
2527 and self.mass.streams.base_url in protocol_player.current_media.uri
2528 ):
2529 current_item_id = protocol_player.current_media.uri.rsplit("/")[-1].split(".")[0]
2530 if self.get_item(queue_id, current_item_id):
2531 return current_item_id
2532 # try to extract the item id from a queue_id/item_id combi
2533 if (
2534 protocol_player.current_media.uri
2535 and queue_id in protocol_player.current_media.uri
2536 and "/" in protocol_player.current_media.uri
2537 ):
2538 current_item_id = protocol_player.current_media.uri.split("/")[1]
2539 if self.get_item(queue_id, current_item_id):
2540 return current_item_id
2541
2542 return None
2543
def _handle_end_of_queue(
    self, queue: PlayerQueue, prev_state: CompareState, new_state: CompareState
) -> None:
    """Schedule clearing of the queue when playback ended on its final item.

    Nothing happens unless the state went from playing/paused to idle, the queue
    has no next item and an item was current before the transition. The actual
    clearing is delayed and re-validated, see the inner coroutine.

    :param queue: Queue to inspect and (possibly) clear.
    :param prev_state: Queue state snapshot taken before the update.
    :param new_state: Queue state snapshot taken after the update.
    """
    # only a transition from playing/paused to idle counts as 'stopped'
    was_active = prev_state["state"] in (PlaybackState.PLAYING, PlaybackState.PAUSED)
    if not was_active or new_state["state"] != PlaybackState.IDLE:
        return
    # a next item means we are not at the end of the queue;
    # no previous item id means nothing was playing before
    if queue.next_item is not None or prev_state["current_item_id"] is None:
        return

    async def _clear_queue_delayed() -> None:
        # re-check once per second (5 times) that the queue is still idle and
        # still has no next item; bail out as soon as either no longer holds
        checks_left = 5
        while checks_left:
            checks_left -= 1
            await asyncio.sleep(1)
            if queue.state != PlaybackState.IDLE or queue.next_item is not None:
                return
        self.logger.info("End of queue reached, clearing items")
        self.clear(queue.queue_id)

    # Playback stopped at the last (or only) item of the queue.
    # What remains is deciding whether that item was played out completely.

    # Flow mode: the stream log is more reliable than elapsed_time (which can be
    # reset/incorrect); a recorded 'seconds_streamed' on the last entry means the
    # last track finished streaming.
    if queue.flow_mode and queue.flow_mode_stream_log:
        if queue.flow_mode_stream_log[-1].seconds_streamed is not None:
            self.mass.create_task(_clear_queue_delayed())
            return

    # Non-flow mode: rely on prev_state because the queue itself may have been
    # updated/reset already.
    last_item = prev_state["current_item"]
    if not last_item:
        # no current item means the player already cleared it, safe to clear the queue
        self.mass.create_task(_clear_queue_delayed())
        return
    details = last_item.streamdetails
    if details:
        total_seconds = details.duration or last_item.duration or 24 * 3600
    else:
        total_seconds = last_item.duration or 24 * 3600

    # last_playing_elapsed_time keeps the elapsed time from while the player was
    # still playing (elapsed_time itself may be reset to 0 once idle)
    seconds_played = int(prev_state["last_playing_elapsed_time"])
    # debounce: only clear when the last track was played to within 5 seconds of its end
    if seconds_played >= total_seconds - 5:
        self.mass.create_task(_clear_queue_delayed())
2601
2602 def _handle_playback_progress_report(
2603 self, queue: PlayerQueue, prev_state: CompareState, new_state: CompareState
2604 ) -> None:
2605 """Handle playback progress report."""
2606 # detect change in current index to report that a item has been played
2607 prev_item_id = prev_state["current_item_id"]
2608 cur_item_id = new_state["current_item_id"]
2609 if prev_item_id is None and cur_item_id is None:
2610 return
2611
2612 if prev_item_id is not None and prev_item_id != cur_item_id:
2613 # we have a new item, so we need report the previous one
2614 is_current_item = False
2615 item_to_report = prev_state["current_item"]
2616 seconds_played = int(prev_state["elapsed_time"])
2617 else:
2618 # report on current item
2619 is_current_item = True
2620 item_to_report = self.get_item(queue.queue_id, cur_item_id) or new_state["current_item"]
2621 seconds_played = int(new_state["elapsed_time"])
2622
2623 if not item_to_report:
2624 return # guard against invalid items
2625
2626 if not (media_item := item_to_report.media_item):
2627 # only report on media items
2628 return
2629 assert media_item.uri is not None # uri is set in __post_init__
2630
2631 if item_to_report.streamdetails and item_to_report.streamdetails.stream_error:
2632 # Ignore items that had a stream error
2633 return
2634
2635 if item_to_report.streamdetails and item_to_report.streamdetails.duration:
2636 duration = int(item_to_report.streamdetails.duration)
2637 else:
2638 duration = int(item_to_report.duration or 3 * 3600)
2639
2640 if seconds_played < 5:
2641 # ignore items that have been played less than 5 seconds
2642 # this also filters out a bounce effect where the previous item
2643 # gets reported with 0 elapsed seconds after a new item starts playing
2644 return
2645
2646 # determine if item is fully played
2647 # for podcasts and audiobooks we account for the last 60 seconds
2648 percentage_played = percentage(seconds_played, duration)
2649 if not is_current_item and item_to_report.media_type in (
2650 MediaType.AUDIOBOOK,
2651 MediaType.PODCAST_EPISODE,
2652 ):
2653 fully_played = seconds_played >= duration - 60
2654 elif not is_current_item:
2655 # 90% of the track must be played to be considered fully played
2656 fully_played = percentage_played >= 90
2657 else:
2658 fully_played = seconds_played >= duration - 10
2659
2660 is_playing = is_current_item and queue.state == PlaybackState.PLAYING
2661 if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
2662 self.logger.debug(
2663 "%s %s '%s' (%s) - Fully played: %s - Progress: %s (%s/%ss)",
2664 queue.display_name,
2665 "is playing" if is_playing else "played",
2666 item_to_report.name,
2667 item_to_report.uri,
2668 fully_played,
2669 f"{percentage_played}%",
2670 seconds_played,
2671 duration,
2672 )
2673 # add entry to playlog - this also handles resume of podcasts/audiobooks
2674 self.mass.create_task(
2675 self.mass.music.mark_item_played(
2676 media_item,
2677 fully_played=fully_played,
2678 seconds_played=seconds_played,
2679 is_playing=is_playing,
2680 userid=queue.userid,
2681 queue_id=queue.queue_id,
2682 user_initiated=False,
2683 )
2684 )
2685
2686 album: Album | ItemMapping | None = getattr(media_item, "album", None)
2687 # signal 'media item played' event,
2688 # which is useful for plugins that want to do scrobbling
2689 artists: list[Artist | ItemMapping] = getattr(media_item, "artists", [])
2690 artists_names = [a.name for a in artists]
2691 self.mass.signal_event(
2692 EventType.MEDIA_ITEM_PLAYED,
2693 object_id=media_item.uri,
2694 data=MediaItemPlaybackProgressReport(
2695 uri=media_item.uri,
2696 media_type=media_item.media_type,
2697 name=media_item.name,
2698 version=getattr(media_item, "version", None),
2699 artist=(
2700 getattr(media_item, "artist_str", None) or artists_names[0]
2701 if artists_names
2702 else None
2703 ),
2704 artists=artists_names,
2705 artist_mbids=[a.mbid for a in artists if a.mbid] if artists else None,
2706 album=album.name if album else None,
2707 album_mbid=album.mbid if album else None,
2708 album_artist=(album.artist_str if isinstance(album, Album) else None),
2709 album_artist_mbids=(
2710 [a.mbid for a in album.artists if a.mbid] if isinstance(album, Album) else None
2711 ),
2712 image_url=(
2713 self.mass.metadata.get_image_url(
2714 item_to_report.media_item.image, prefer_proxy=False
2715 )
2716 if item_to_report.media_item.image
2717 else None
2718 ),
2719 duration=duration,
2720 mbid=(getattr(media_item, "mbid", None)),
2721 seconds_played=seconds_played,
2722 fully_played=fully_played,
2723 is_playing=is_playing,
2724 userid=queue.userid,
2725 ),
2726 )
2727
2728
2729async def _smart_shuffle(items: list[QueueItem]) -> list[QueueItem]:
2730 """Shuffle queue items, avoiding identical tracks next to each other.
2731
2732 Best-effort approach to prevent the same track from appearing adjacent.
2733 Does a random shuffle first, then makes a limited number of passes to
2734 swap adjacent duplicates with a random item further in the list.
2735
2736 :param items: List of queue items to shuffle.
2737 """
2738 if len(items) <= 2:
2739 return random.sample(items, len(items)) if len(items) == 2 else items
2740
2741 # Start with a random shuffle
2742 shuffled = random.sample(items, len(items))
2743
2744 # Make a few passes to fix adjacent duplicates
2745 max_passes = 3
2746 for _ in range(max_passes):
2747 swapped = False
2748 for i in range(len(shuffled) - 1):
2749 if shuffled[i].name == shuffled[i + 1].name:
2750 # Found adjacent duplicate - swap with random position at least 2 away
2751 swap_candidates = [j for j in range(len(shuffled)) if abs(j - i - 1) >= 2]
2752 if swap_candidates:
2753 swap_pos = random.choice(swap_candidates)
2754 shuffled[i + 1], shuffled[swap_pos] = shuffled[swap_pos], shuffled[i + 1]
2755 swapped = True
2756 if not swapped:
2757 break
2758 # Yield to event loop between passes
2759 await asyncio.sleep(0)
2760
2761 return shuffled
2762