/
/
/
"""
MusicAssistant Player Queues Controller.

Handles all logic to PLAY Media Items, provided by Music Providers to supported players.

It is loosely coupled to the MusicAssistant Music Controller and Player Controller.
A Music Assistant Player always has a PlayerQueue associated with it
which holds the queue items and state.

The PlayerQueue is in that case the active source of the player,
but it can also be something else, hence the loose coupling.
"""
13
14from __future__ import annotations
15
16import asyncio
17import random
18import time
19from contextlib import suppress
20from types import NoneType
21from typing import TYPE_CHECKING, Any, TypedDict, cast
22
23import shortuuid
24from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
25from music_assistant_models.enums import (
26 ConfigEntryType,
27 ContentType,
28 EventType,
29 MediaType,
30 PlaybackState,
31 ProviderFeature,
32 QueueOption,
33 RepeatMode,
34)
35from music_assistant_models.errors import (
36 AudioError,
37 InvalidCommand,
38 InvalidDataError,
39 MediaNotFoundError,
40 MusicAssistantError,
41 PlayerUnavailableError,
42 QueueEmpty,
43 UnsupportedFeaturedException,
44)
45from music_assistant_models.media_items import (
46 Album,
47 Artist,
48 Audiobook,
49 BrowseFolder,
50 ItemMapping,
51 MediaItemType,
52 PlayableMediaItemType,
53 Playlist,
54 Podcast,
55 PodcastEpisode,
56 Track,
57 UniqueList,
58 media_from_dict,
59)
60from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport
61from music_assistant_models.player_queue import PlayerQueue
62from music_assistant_models.queue_item import QueueItem
63
64from music_assistant.constants import (
65 ATTR_ANNOUNCEMENT_IN_PROGRESS,
66 MASS_LOGO_ONLINE,
67 PLAYLIST_MEDIA_TYPES,
68 VERBOSE_LOG_LEVEL,
69 PlaylistPlayableItem,
70)
71from music_assistant.controllers.players.controller import IN_QUEUE_COMMAND
72from music_assistant.controllers.webserver.helpers.auth_middleware import get_current_user
73from music_assistant.helpers.api import api_command
74from music_assistant.helpers.audio import get_stream_details, get_stream_dsp_details
75from music_assistant.helpers.throttle_retry import BYPASS_THROTTLER
76from music_assistant.helpers.util import get_changed_keys, percentage
77from music_assistant.models.core_controller import CoreController
78from music_assistant.models.player import Player, PlayerMedia
79
80if TYPE_CHECKING:
81 from collections.abc import Iterator
82
83 from music_assistant_models.auth import User
84 from music_assistant_models.media_items.metadata import MediaItemImage
85
86 from music_assistant import MusicAssistant
87 from music_assistant.models.player import Player
88
# Config keys (and their defaults) that decide which items get selected
# when an (in-library) artist or album is played.
CONF_DEFAULT_ENQUEUE_SELECT_ARTIST = "default_enqueue_select_artist"
ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE = "all_tracks"
CONF_DEFAULT_ENQUEUE_SELECT_ALBUM = "default_enqueue_select_album"
ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE = "all_tracks"

# Config keys holding the default enqueue option per media type.
# play_media builds the key as "default_enqueue_option_<media_type value>",
# so the suffix of each key has to stay in sync with the media type values.
CONF_DEFAULT_ENQUEUE_OPTION_ARTIST = "default_enqueue_option_artist"
CONF_DEFAULT_ENQUEUE_OPTION_ALBUM = "default_enqueue_option_album"
CONF_DEFAULT_ENQUEUE_OPTION_TRACK = "default_enqueue_option_track"
CONF_DEFAULT_ENQUEUE_OPTION_RADIO = "default_enqueue_option_radio"
CONF_DEFAULT_ENQUEUE_OPTION_PLAYLIST = "default_enqueue_option_playlist"
CONF_DEFAULT_ENQUEUE_OPTION_AUDIOBOOK = "default_enqueue_option_audiobook"
CONF_DEFAULT_ENQUEUE_OPTION_PODCAST = "default_enqueue_option_podcast"
CONF_DEFAULT_ENQUEUE_OPTION_PODCAST_EPISODE = "default_enqueue_option_podcast_episode"
CONF_DEFAULT_ENQUEUE_OPTION_FOLDER = "default_enqueue_option_folder"
CONF_DEFAULT_ENQUEUE_OPTION_UNKNOWN = "default_enqueue_option_unknown"

# Upper bound (in seconds) for a radio track: 20 minutes.
RADIO_TRACK_MAX_DURATION_SECS = 20 * 60

# Cache category identifiers for the persisted queue state and queue items
# (NOTE(review): usage is outside the visible part of this file - confirm).
CACHE_CATEGORY_PLAYER_QUEUE_STATE = 0
CACHE_CATEGORY_PLAYER_QUEUE_ITEMS = 1
108
109
class CompareState(TypedDict):
    """Snapshot of the (previous) state of a queue.

    A snapshot is kept so a later state can be compared against it.
    """

    # identity and playback state of the queue
    queue_id: str
    state: PlaybackState
    # current and upcoming item
    current_item_id: str | None
    next_item_id: str | None
    current_item: QueueItem | None
    # playback position
    elapsed_time: int
    # elapsed time from the last PLAYING state update,
    # used to determine if a track was fully played when transitioning to idle
    last_playing_elapsed_time: int
    # stream related details
    stream_title: str | None
    codec_type: ContentType | None
    output_formats: list[str] | None
128
129
130class PlayerQueuesController(CoreController):
131 """Controller holding all logic to enqueue music for players."""
132
133 domain: str = "player_queues"
134
135 def __init__(self, mass: MusicAssistant) -> None:
136 """Initialize core controller."""
137 super().__init__(mass)
138 self._queues: dict[str, PlayerQueue] = {}
139 self._queue_items: dict[str, list[QueueItem]] = {}
140 self._prev_states: dict[str, CompareState] = {}
141 self._transitioning_players: set[str] = set()
142 self.manifest.name = "Player Queues controller"
143 self.manifest.description = (
144 "Music Assistant's core controller which manages the queues for all players."
145 )
146 self.manifest.icon = "playlist-music"
147
148 async def close(self) -> None:
149 """Cleanup on exit."""
150 # stop all playback
151 for queue in self.all():
152 if queue.state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
153 await self.stop(queue.queue_id)
154
155 async def get_config_entries(
156 self,
157 action: str | None = None,
158 values: dict[str, ConfigValueType] | None = None,
159 ) -> tuple[ConfigEntry, ...]:
160 """Return all Config Entries for this core module (if any)."""
161 enqueue_options = [ConfigValueOption(x.name, x.value) for x in QueueOption]
162 return (
163 ConfigEntry(
164 key=CONF_DEFAULT_ENQUEUE_SELECT_ARTIST,
165 type=ConfigEntryType.STRING,
166 default_value=ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE,
167 label="Items to select when you play a (in-library) artist.",
168 options=[
169 ConfigValueOption(
170 title="Only in-library tracks",
171 value="library_tracks",
172 ),
173 ConfigValueOption(
174 title="All tracks from all albums in the library",
175 value="library_album_tracks",
176 ),
177 ConfigValueOption(
178 title="All (top) tracks from (all) streaming provider(s)",
179 value="all_tracks",
180 ),
181 ConfigValueOption(
182 title="All tracks from all albums from (all) streaming provider(s)",
183 value="all_album_tracks",
184 ),
185 ],
186 ),
187 ConfigEntry(
188 key=CONF_DEFAULT_ENQUEUE_SELECT_ALBUM,
189 type=ConfigEntryType.STRING,
190 default_value=ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE,
191 label="Items to select when you play a (in-library) album.",
192 options=[
193 ConfigValueOption(
194 title="Only in-library tracks",
195 value="library_tracks",
196 ),
197 ConfigValueOption(
198 title="All tracks for album on (streaming) provider",
199 value="all_tracks",
200 ),
201 ],
202 ),
203 ConfigEntry(
204 key=CONF_DEFAULT_ENQUEUE_OPTION_ARTIST,
205 type=ConfigEntryType.STRING,
206 default_value=QueueOption.REPLACE.value,
207 label="Default enqueue option for Artist item(s).",
208 options=enqueue_options,
209 description="Define the default enqueue action for this mediatype.",
210 ),
211 ConfigEntry(
212 key=CONF_DEFAULT_ENQUEUE_OPTION_ALBUM,
213 type=ConfigEntryType.STRING,
214 default_value=QueueOption.REPLACE.value,
215 label="Default enqueue option for Album item(s).",
216 options=enqueue_options,
217 description="Define the default enqueue action for this mediatype.",
218 ),
219 ConfigEntry(
220 key=CONF_DEFAULT_ENQUEUE_OPTION_TRACK,
221 type=ConfigEntryType.STRING,
222 default_value=QueueOption.PLAY.value,
223 label="Default enqueue option for Track item(s).",
224 options=enqueue_options,
225 description="Define the default enqueue action for this mediatype.",
226 ),
227 ConfigEntry(
228 key=CONF_DEFAULT_ENQUEUE_OPTION_RADIO,
229 type=ConfigEntryType.STRING,
230 default_value=QueueOption.REPLACE.value,
231 label="Default enqueue option for Radio item(s).",
232 options=enqueue_options,
233 description="Define the default enqueue action for this mediatype.",
234 ),
235 ConfigEntry(
236 key=CONF_DEFAULT_ENQUEUE_OPTION_PLAYLIST,
237 type=ConfigEntryType.STRING,
238 default_value=QueueOption.REPLACE.value,
239 label="Default enqueue option for Playlist item(s).",
240 options=enqueue_options,
241 description="Define the default enqueue action for this mediatype.",
242 ),
243 ConfigEntry(
244 key=CONF_DEFAULT_ENQUEUE_OPTION_AUDIOBOOK,
245 type=ConfigEntryType.STRING,
246 default_value=QueueOption.REPLACE.value,
247 label="Default enqueue option for Audiobook item(s).",
248 options=enqueue_options,
249 hidden=True,
250 ),
251 ConfigEntry(
252 key=CONF_DEFAULT_ENQUEUE_OPTION_PODCAST,
253 type=ConfigEntryType.STRING,
254 default_value=QueueOption.REPLACE.value,
255 label="Default enqueue option for Podcast item(s).",
256 options=enqueue_options,
257 hidden=True,
258 ),
259 ConfigEntry(
260 key=CONF_DEFAULT_ENQUEUE_OPTION_PODCAST_EPISODE,
261 type=ConfigEntryType.STRING,
262 default_value=QueueOption.REPLACE.value,
263 label="Default enqueue option for Podcast-episode item(s).",
264 options=enqueue_options,
265 hidden=True,
266 ),
267 ConfigEntry(
268 key=CONF_DEFAULT_ENQUEUE_OPTION_FOLDER,
269 type=ConfigEntryType.STRING,
270 default_value=QueueOption.REPLACE.value,
271 label="Default enqueue option for Folder item(s).",
272 options=enqueue_options,
273 hidden=True,
274 ),
275 )
276
277 def __iter__(self) -> Iterator[PlayerQueue]:
278 """Iterate over (available) players."""
279 return iter(self._queues.values())
280
281 @api_command("player_queues/all")
282 def all(self) -> tuple[PlayerQueue, ...]:
283 """Return all registered PlayerQueues."""
284 return tuple(self._queues.values())
285
286 @api_command("player_queues/get")
287 def get(self, queue_id: str) -> PlayerQueue | None:
288 """Return PlayerQueue by queue_id or None if not found."""
289 return self._queues.get(queue_id)
290
291 @api_command("player_queues/items")
292 def items(self, queue_id: str, limit: int = 500, offset: int = 0) -> list[QueueItem]:
293 """Return all QueueItems for given PlayerQueue."""
294 if queue_id not in self._queue_items:
295 return []
296
297 return self._queue_items[queue_id][offset : offset + limit]
298
299 @api_command("player_queues/get_active_queue")
300 def get_active_queue(self, player_id: str) -> PlayerQueue | None:
301 """Return the current active/synced queue for a player."""
302 if player := self.mass.players.get_player(player_id):
303 return self.mass.players.get_active_queue(player)
304 return None
305
306 # Queue commands
307
308 @api_command("player_queues/shuffle")
309 async def set_shuffle(self, queue_id: str, shuffle_enabled: bool) -> None:
310 """Configure shuffle setting on the the queue."""
311 queue = self._queues[queue_id]
312 if queue.shuffle_enabled == shuffle_enabled:
313 return # no change
314 queue.shuffle_enabled = shuffle_enabled
315 queue_items = self._queue_items[queue_id]
316 cur_index = queue.index_in_buffer or queue.current_index
317 if cur_index is not None:
318 next_index = cur_index + 1
319 next_items = queue_items[next_index:]
320 else:
321 next_items = []
322 next_index = 0
323 if not shuffle_enabled:
324 # shuffle disabled, try to restore original sort order of the remaining items
325 next_items.sort(key=lambda x: x.sort_index, reverse=False)
326 await self.load(
327 queue_id=queue_id,
328 queue_items=next_items,
329 insert_at_index=next_index,
330 keep_remaining=False,
331 shuffle=shuffle_enabled,
332 )
333
334 @api_command("player_queues/dont_stop_the_music")
335 def set_dont_stop_the_music(self, queue_id: str, dont_stop_the_music_enabled: bool) -> None:
336 """Configure Don't stop the music setting on the queue."""
337 providers_available_with_similar_tracks = any(
338 ProviderFeature.SIMILAR_TRACKS in provider.supported_features
339 for provider in self.mass.music.providers
340 )
341 if dont_stop_the_music_enabled and not providers_available_with_similar_tracks:
342 raise UnsupportedFeaturedException(
343 "Don't stop the music is not supported by any of the available music providers"
344 )
345 queue = self._queues[queue_id]
346 queue.dont_stop_the_music_enabled = dont_stop_the_music_enabled
347 self.signal_update(queue_id=queue_id)
348 # if this happens to be the last track in the queue, fill the radio source
349 if (
350 queue.dont_stop_the_music_enabled
351 and queue.enqueued_media_items
352 and queue.current_index is not None
353 and (queue.items - queue.current_index) <= 1
354 ):
355 queue.radio_source = queue.enqueued_media_items
356 task_id = f"fill_radio_tracks_{queue_id}"
357 self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id)
358
359 @api_command("player_queues/repeat")
360 def set_repeat(self, queue_id: str, repeat_mode: RepeatMode) -> None:
361 """Configure repeat setting on the the queue."""
362 queue = self._queues[queue_id]
363 if queue.repeat_mode == repeat_mode:
364 return # no change
365 queue.repeat_mode = repeat_mode
366 self.signal_update(queue_id)
367 if (
368 queue.state == PlaybackState.PLAYING
369 and queue.index_in_buffer is not None
370 and queue.index_in_buffer == queue.current_index
371 ):
372 # if the queue is playing,
373 # ensure to (re)queue the next track because it might have changed
374 # note that we only do this if the player has loaded the current track
375 # if not, we wait until it has loaded to prevent conflicts
376 if next_item := self.get_next_item(queue_id, queue.index_in_buffer):
377 self._enqueue_next_item(queue_id, next_item)
378
    @api_command("player_queues/play_media")
    async def play_media(
        self,
        queue_id: str,
        media: MediaItemType | ItemMapping | str | list[MediaItemType | ItemMapping | str],
        option: QueueOption | None = None,
        radio_mode: bool = False,
        start_item: PlayableMediaItemType | str | None = None,
        username: str | None = None,
    ) -> None:
        """Play media item(s) on the given queue.

        The provided media is resolved into queue items, which are then loaded
        into the queue according to the (given or configured default) enqueue option.
        The command is ignored (with a warning) while an announcement is in progress
        on the queue's player.

        :param queue_id: The queue_id of the queue to play media on.
        :param media: Media that should be played (MediaItem(s) and/or uri's).
        :param option: Which enqueue mode to use. When omitted, the configured default
            for the media type of the item is used.
        :param radio_mode: Enable radio mode for the given item(s).
        :param start_item: Optional item to start the playlist or album from.
        :param username: The username of the user requesting the playback.
            Setting the username allows for overriding the logged-in user
            to account for playback history per user when the play_media is
            called from a shared context (like a web hook or automation).
        :raises PlayerUnavailableError: If no queue is registered for queue_id.
        :raises MediaNotFoundError: If none of the given media resolved to a playable item.
        """
        # ruff: noqa: PLR0915
        # we use a contextvar to bypass the throttler for this asyncio task/context
        # this makes sure that playback has priority over other requests that may be
        # happening in the background
        BYPASS_THROTTLER.set(True)
        if not (queue := self.get(queue_id)):
            raise PlayerUnavailableError(f"Queue {queue_id} is not available")
        # always fetch the underlying player so we can raise early if its not available
        queue_player = self.mass.players.get_player(queue_id, True)
        assert queue_player is not None  # for type checking
        if queue_player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
            self.logger.warning("Ignore queue command: An announcement is in progress")
            return

        # save the user requesting the playback
        # (an explicitly given, known username wins over the current user)
        playback_user: User | None
        if username and (user := await self.mass.webserver.auth.get_user_by_username(username)):
            playback_user = user
        else:
            playback_user = get_current_user()
        queue.userid = playback_user.user_id if playback_user else None

        # a single item or list of items may be provided
        media_list = media if isinstance(media, list) else [media]

        # clear queue if needed
        if option == QueueOption.REPLACE:
            self.clear(queue_id)
        # Clear the 'enqueued media item' list when a new queue is requested
        # (note: an option of None also takes this branch)
        if option not in (QueueOption.ADD, QueueOption.NEXT):
            queue.enqueued_media_items.clear()

        media_items: list[MediaItemType] = []
        radio_source: list[MediaItemType] = []
        # resolve all media items
        for item in media_list:
            try:
                # parse provided uri into a MA MediaItem or Basic QueueItem from URL
                media_item: MediaItemType | ItemMapping | BrowseFolder
                if isinstance(item, str):
                    media_item = await self.mass.music.get_item_by_uri(item)
                elif isinstance(item, dict):  # type: ignore[unreachable]
                    # TODO: Investigate why the API parser sometimes passes raw dicts instead of
                    # converting them to MediaItem objects. The parse_value function in api.py
                    # should handle dict-to-object conversion, but dicts are slipping through
                    # in some cases. This is defensive handling for that parser bug.
                    media_item = media_from_dict(item)  # type: ignore[unreachable]
                    self.logger.debug("Converted to: %s", type(media_item))
                else:
                    # item is MediaItemType | ItemMapping at this point
                    media_item = item

                # Save requested media item to play on the queue so we can use it as a source
                # for Don't stop the music. Use FIFO list to keep track of the last 10 played items
                # Skip ItemMapping and BrowseFolder - only queue full MediaItemType objects
                if not isinstance(
                    media_item, (ItemMapping, BrowseFolder)
                ) and media_item.media_type in (
                    MediaType.TRACK,
                    MediaType.ALBUM,
                    MediaType.PLAYLIST,
                    MediaType.ARTIST,
                ):
                    queue.enqueued_media_items.append(media_item)
                    if len(queue.enqueued_media_items) > 10:
                        queue.enqueued_media_items.pop(0)

                # handle default enqueue option if needed
                # (only resolved once: after this the option is no longer None,
                # so the first item reaching this point decides the default)
                if option is None:
                    config_value = await self.mass.config.get_core_config_value(
                        self.domain,
                        f"default_enqueue_option_{media_item.media_type.value}",
                        return_type=str,
                    )
                    option = QueueOption(config_value)
                    if option == QueueOption.REPLACE:
                        # skip_stop: only the items are cleared, no stop command is sent
                        self.clear(queue_id, skip_stop=True)

                # collect media_items to play
                if radio_mode:
                    # Type guard for mypy - only add full MediaItemType to radio_source
                    if not isinstance(media_item, (ItemMapping, BrowseFolder)):
                        radio_source.append(media_item)
                else:
                    # Convert start_item to string URI if needed
                    start_item_uri: str | None = None
                    if isinstance(start_item, str):
                        start_item_uri = start_item
                    elif start_item is not None:
                        start_item_uri = start_item.uri
                    media_items += await self._resolve_media_items(
                        media_item, start_item_uri, queue_id=queue_id
                    )

            except MusicAssistantError as err:
                # invalid MA uri or item not found error
                # (the item is skipped, the remaining items are still processed)
                self.logger.warning("Skipping %s: %s", item, str(err))

        # overwrite or append radio source items
        if option not in (QueueOption.ADD, QueueOption.NEXT):
            queue.radio_source = radio_source
        else:
            queue.radio_source += radio_source
        # Use collected media items to calculate the radio if radio mode is on
        if radio_mode:
            radio_tracks = await self._get_radio_tracks(
                queue_id=queue_id, is_initial_radio_mode=True
            )
            media_items = list(radio_tracks)

        # only add valid/available items
        queue_items: list[QueueItem] = []
        for x in media_items:
            if not x or not x.available:
                continue
            queue_items.append(
                QueueItem.from_media_item(queue_id, cast("PlayableMediaItemType", x))
            )

        if not queue_items:
            raise MediaNotFoundError("No playable items found")

        # load the items into the queue
        # while playing/paused the buffered index takes precedence over the current index
        if queue.state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
            cur_index = queue.index_in_buffer or queue.current_index or 0
        else:
            cur_index = queue.current_index or 0
        insert_at_index = cur_index + 1
        # Radio modes are already shuffled in a pattern we would like to keep.
        shuffle = queue.shuffle_enabled and len(queue_items) > 1 and not radio_mode

        # handle replace: clear all items and replace with the new items
        if option == QueueOption.REPLACE:
            await self.load(
                queue_id,
                queue_items=queue_items,
                keep_remaining=False,
                keep_played=False,
                shuffle=shuffle,
            )
            await self.play_index(queue_id, 0)
            return
        # handle next: add item(s) in the index next to the playing/loaded/buffered index
        if option == QueueOption.NEXT:
            await self.load(
                queue_id,
                queue_items=queue_items,
                insert_at_index=insert_at_index,
                shuffle=shuffle,
            )
            return
        # handle replace next: same insert position as next, but with keep_remaining=False
        if option == QueueOption.REPLACE_NEXT:
            await self.load(
                queue_id,
                queue_items=queue_items,
                insert_at_index=insert_at_index,
                keep_remaining=False,
                shuffle=shuffle,
            )
            return
        # handle play: replace current loaded/playing index with new item(s)
        if option == QueueOption.PLAY:
            await self.load(
                queue_id,
                queue_items=queue_items,
                insert_at_index=insert_at_index,
                shuffle=shuffle,
            )
            # clamp to the last index of the queue as it is after loading
            next_index = min(insert_at_index, len(self._queue_items[queue_id]) - 1)
            await self.play_index(queue_id, next_index)
            return
        # handle add: add/append item(s) to the remaining queue items
        if option == QueueOption.ADD:
            await self.load(
                queue_id=queue_id,
                queue_items=queue_items,
                insert_at_index=insert_at_index
                if queue.shuffle_enabled
                else len(self._queue_items[queue_id]) + 1,
                shuffle=queue.shuffle_enabled,
            )
            # handle edgecase, queue is empty and items are only added (not played)
            # mark first item as new index
            if queue.current_index is None:
                queue.current_index = 0
                queue.current_item = self.get_item(queue_id, 0)
                queue.items = len(queue_items)
                self.signal_update(queue_id)
589
590 @api_command("player_queues/move_item")
591 def move_item(self, queue_id: str, queue_item_id: str, pos_shift: int = 1) -> None:
592 """
593 Move queue item x up/down the queue.
594
595 - queue_id: id of the queue to process this request.
596 - queue_item_id: the item_id of the queueitem that needs to be moved.
597 - pos_shift: move item x positions down if positive value
598 - pos_shift: move item x positions up if negative value
599 - pos_shift: move item to top of queue as next item if 0.
600 """
601 queue = self._queues[queue_id]
602 item_index = self.index_by_id(queue_id, queue_item_id)
603 if item_index is None:
604 raise InvalidDataError(f"Item {queue_item_id} not found in queue")
605 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
606 msg = f"{item_index} is already played/buffered"
607 raise IndexError(msg)
608
609 queue_items = self._queue_items[queue_id]
610 queue_items = queue_items.copy()
611
612 if pos_shift == 0 and queue.state == PlaybackState.PLAYING:
613 new_index = (queue.current_index or 0) + 1
614 elif pos_shift == 0:
615 new_index = queue.current_index or 0
616 else:
617 new_index = item_index + pos_shift
618 if (new_index < (queue.current_index or 0)) or (new_index > len(queue_items)):
619 return
620 # move the item in the list
621 queue_items.insert(new_index, queue_items.pop(item_index))
622 self.update_items(queue_id, queue_items)
623
624 @api_command("player_queues/move_item_end")
625 def move_item_end(self, queue_id: str, queue_item_id: str) -> None:
626 """
627 Move queue item to the end the queue.
628
629 - queue_id: id of the queue to process this request.
630 - queue_item_id: the item_id of the queueitem that needs to be moved.
631 """
632 queue = self._queues[queue_id]
633 item_index = self.index_by_id(queue_id, queue_item_id)
634 if item_index is None:
635 raise InvalidDataError(f"Item {queue_item_id} not found in queue")
636 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
637 msg = f"{item_index} is already played/buffered"
638 raise IndexError(msg)
639
640 queue_items = self._queue_items[queue_id]
641 if item_index == (len(queue_items) - 1):
642 return
643 queue_items = queue_items.copy()
644
645 new_index = len(self._queue_items[queue_id]) - 1
646
647 # move the item in the list
648 queue_items.insert(new_index, queue_items.pop(item_index))
649 self.update_items(queue_id, queue_items)
650
651 @api_command("player_queues/delete_item")
652 def delete_item(self, queue_id: str, item_id_or_index: int | str) -> None:
653 """Delete item (by id or index) from the queue."""
654 if isinstance(item_id_or_index, str):
655 item_index = self.index_by_id(queue_id, item_id_or_index)
656 if item_index is None:
657 raise InvalidDataError(f"Item {item_id_or_index} not found in queue")
658 else:
659 item_index = item_id_or_index
660 queue = self._queues[queue_id]
661 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
662 # ignore request if track already loaded in the buffer
663 # the frontend should guard so this is just in case
664 self.logger.warning("delete requested for item already loaded in buffer")
665 return
666 queue_items = self._queue_items[queue_id]
667 queue_items.pop(item_index)
668 self.update_items(queue_id, queue_items)
669
670 @api_command("player_queues/clear")
671 def clear(self, queue_id: str, skip_stop: bool = False) -> None:
672 """Clear all items in the queue."""
673 queue = self._queues[queue_id]
674 queue.radio_source = []
675 if queue.state != PlaybackState.IDLE and not skip_stop:
676 self.mass.create_task(self.stop(queue_id))
677 queue.current_index = None
678 queue.current_item = None
679 queue.elapsed_time = 0
680 queue.elapsed_time_last_updated = time.time()
681 queue.index_in_buffer = None
682 self.update_items(queue_id, [])
683
684 @api_command("player_queues/save_as_playlist")
685 async def save_as_playlist(self, queue_id: str, name: str) -> Playlist:
686 """Save the current queue items as a new playlist.
687
688 :param queue_id: The queue_id of the queue to save.
689 :param name: The name for the new playlist.
690 """
691 if not self.get(queue_id):
692 raise PlayerUnavailableError(f"Queue {queue_id} is not available")
693 queue_items = self._queue_items.get(queue_id, [])
694 if not queue_items:
695 raise QueueEmpty("Cannot save an empty queue as a playlist.")
696 # collect URIs from queue items that are playlist-compatible
697 uris: list[str] = []
698 for item in queue_items:
699 if item.uri and item.media_type in PLAYLIST_MEDIA_TYPES:
700 uris.append(item.uri)
701 if not uris:
702 raise InvalidDataError("No valid items in queue to save as playlist.")
703 playlist = await self.mass.music.playlists.create_playlist(name)
704 await self.mass.music.playlists.add_playlist_tracks(playlist.item_id, uris)
705 return playlist
706
707 @api_command("player_queues/stop")
708 async def stop(self, queue_id: str) -> None:
709 """
710 Handle STOP command for given queue.
711
712 - queue_id: queue_id of the playerqueue to handle the command.
713 """
714 queue_player = self.mass.players.get_player(queue_id, True)
715 if queue_player is None:
716 raise PlayerUnavailableError(f"Player {queue_id} is not available")
717 if (queue := self.get(queue_id)) and queue.active:
718 if queue.state == PlaybackState.PLAYING:
719 queue.resume_pos = int(queue.corrected_elapsed_time)
720 # Set context to prevent circular call, then forward the actual command to the player
721 token = IN_QUEUE_COMMAND.set(True)
722 try:
723 await self.mass.players.cmd_stop(queue_id)
724 finally:
725 IN_QUEUE_COMMAND.reset(token)
726
727 @api_command("player_queues/play")
728 async def play(self, queue_id: str) -> None:
729 """
730 Handle PLAY command for given queue.
731
732 - queue_id: queue_id of the playerqueue to handle the command.
733 """
734 queue_player = self.mass.players.get_player(queue_id, True)
735 if queue_player is None:
736 raise PlayerUnavailableError(f"Player {queue_id} is not available")
737 if (
738 (queue := self._queues.get(queue_id))
739 and queue.active
740 and queue.state == PlaybackState.PAUSED
741 ):
742 # forward the actual play/unpause command to the player
743 await queue_player.play()
744 return
745 # player is not paused, perform resume instead
746 await self.resume(queue_id)
747
748 @api_command("player_queues/pause")
749 async def pause(self, queue_id: str) -> None:
750 """Handle PAUSE command for given queue.
751
752 - queue_id: queue_id of the playerqueue to handle the command.
753 """
754 if not (queue := self._queues.get(queue_id)):
755 return
756 queue_active = queue.active
757 if queue.active and queue.state == PlaybackState.PLAYING:
758 queue.resume_pos = int(queue.corrected_elapsed_time)
759 # forward the actual command to the player controller
760 # Set context to prevent circular call, then forward the actual command to the player
761 token = IN_QUEUE_COMMAND.set(True)
762 try:
763 await self.mass.players.cmd_pause(queue_id)
764 finally:
765 IN_QUEUE_COMMAND.reset(token)
766
767 async def _watch_pause(player: Player) -> None:
768 count = 0
769 # wait for pause
770 while count < 5 and player.state.playback_state == PlaybackState.PLAYING:
771 count += 1
772 await asyncio.sleep(1)
773 # wait for unpause
774 if player.state.playback_state != PlaybackState.PAUSED:
775 return
776 count = 0
777 while count < 30 and player.state.playback_state == PlaybackState.PAUSED:
778 count += 1
779 await asyncio.sleep(1)
780 # if player is still paused when the limit is reached, send stop
781 if player.state.playback_state == PlaybackState.PAUSED:
782 await self.stop(queue_id)
783
784 # we auto stop a player from paused when its paused for 30 seconds
785 if (
786 queue_active
787 and (queue_player := self.mass.players.get_player(queue_id))
788 and not queue_player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS)
789 ):
790 self.mass.create_task(_watch_pause(queue_player))
791
792 @api_command("player_queues/play_pause")
793 async def play_pause(self, queue_id: str) -> None:
794 """Toggle play/pause on given playerqueue.
795
796 - queue_id: queue_id of the queue to handle the command.
797 """
798 if (queue := self._queues.get(queue_id)) and queue.state == PlaybackState.PLAYING:
799 await self.pause(queue_id)
800 return
801 await self.play(queue_id)
802
803 @api_command("player_queues/next")
804 async def next(self, queue_id: str) -> None:
805 """Handle NEXT TRACK command for given queue.
806
807 - queue_id: queue_id of the queue to handle the command.
808 """
809 if (queue := self.get(queue_id)) is None or not queue.active:
810 raise InvalidCommand(f"Queue {queue_id} is not active")
811 idx = self._queues[queue_id].current_index
812 if idx is None:
813 self.logger.warning("Queue %s has no current index", queue.display_name)
814 return
815 attempts = 5
816 while attempts:
817 try:
818 if (next_index := self._get_next_index(queue_id, idx, True)) is not None:
819 await self.play_index(queue_id, next_index, debounce=True)
820 break
821 except MediaNotFoundError:
822 self.logger.warning(
823 "Failed to fetch next track for queue %s - trying next item",
824 queue.display_name,
825 )
826 idx += 1
827 attempts -= 1
828
829 @api_command("player_queues/previous")
830 async def previous(self, queue_id: str) -> None:
831 """Handle PREVIOUS TRACK command for given queue.
832
833 - queue_id: queue_id of the queue to handle the command.
834 """
835 if (queue := self.get(queue_id)) is None or not queue.active:
836 raise InvalidCommand(f"Queue {queue_id} is not active")
837 current_index = self._queues[queue_id].current_index
838 if current_index is None:
839 return
840 next_index = int(current_index)
841 # restart current track if current track has played longer than 4
842 # otherwise skip to previous track
843 if self._queues[queue_id].elapsed_time < 5:
844 next_index = max(current_index - 1, 0)
845 await self.play_index(queue_id, next_index, debounce=True)
846
847 @api_command("player_queues/skip")
848 async def skip(self, queue_id: str, seconds: int = 10) -> None:
849 """Handle SKIP command for given queue.
850
851 - queue_id: queue_id of the queue to handle the command.
852 - seconds: number of seconds to skip in track. Use negative value to skip back.
853 """
854 if (queue := self.get(queue_id)) is None or not queue.active:
855 raise InvalidCommand(f"Queue {queue_id} is not active")
856 await self.seek(queue_id, int(self._queues[queue_id].elapsed_time + seconds))
857
@api_command("player_queues/seek")
async def seek(self, queue_id: str, position: int = 10) -> None:
    """Handle SEEK command for given queue.

    Seeking is implemented by (re)starting the current index at the requested position.

    - queue_id: queue_id of the queue to handle the command.
    - position: position in seconds to seek to in the current playing item.

    Raises InvalidCommand when the queue is inactive, has no current item/index,
    the item has no duration or the position lies beyond the item's duration.
    Raises PlayerUnavailableError when the player can not be found.
    """
    queue = self.get(queue_id)
    if queue is None or not queue.active:
        raise InvalidCommand(f"Queue {queue_id} is not active")
    player = self.mass.players.get_player(queue_id, True)
    if player is None:
        raise PlayerUnavailableError(f"Player {queue_id} is not available")
    current_item = queue.current_item
    if not current_item:
        raise InvalidCommand(f"Queue {player.state.name} has no item(s) loaded.")
    if not current_item.duration:
        raise InvalidCommand("Can not seek items without duration.")
    # negative positions are clamped to the start of the item
    position = max(0, int(position))
    if position > current_item.duration:
        raise InvalidCommand("Can not seek outside of duration range.")
    if queue.current_index is None:
        raise InvalidCommand(f"Queue {player.state.name} has no current index.")
    await self.play_index(queue_id, queue.current_index, seek_position=position)
880
@api_command("player_queues/resume")
async def resume(self, queue_id: str, fade_in: bool | None = None) -> None:
    """Handle RESUME command for given queue.

    - queue_id: queue_id of the queue to handle the command.
    - fade_in: explicitly enable/disable the fade-in effect. When left at None,
      fade-in is enabled automatically if the player has been idle for more than
      60 seconds and we resume at a position greater than 0.

    Raises QueueEmpty if there is nothing to resume (and resuming from the
    playlog did not succeed either).
    """
    queue = self._queues[queue_id]
    queue_items = self._queue_items[queue_id]
    resume_item = queue.current_item

    if queue.state == PlaybackState.PLAYING:
        # resume requested while already playing,
        # continue from the current position without fading in
        resume_pos = queue.corrected_elapsed_time
        fade_in = False
    else:
        resume_pos = queue.resume_pos or queue.elapsed_time

    if not resume_item and len(queue_items) > 0:
        # no current item known but the queue holds items:
        # pick the item at the current index, or the first item if there is no index
        fallback_index = queue.current_index if queue.current_index is not None else 0
        resume_item = self.get_item(queue_id, fallback_index)
        resume_pos = 0

    if resume_item is None:
        # Queue is empty, try to resume from playlog
        if await self._try_resume_from_playlog(queue):
            return
        raise QueueEmpty(f"Resume queue requested but queue {queue.display_name} is empty")

    queue_player = self.mass.players.get_player(queue_id)
    if queue_player is None:
        raise PlayerUnavailableError(f"Player {queue_id} is not available")
    if fade_in is None and queue_player.state.playback_state == PlaybackState.IDLE:
        idle_seconds = time.time() - queue.elapsed_time_last_updated
        if idle_seconds > 60:
            # enable fade in effect if the player is idle for a while
            fade_in = resume_pos > 0
    if resume_item.media_type == MediaType.RADIO:
        # we're not able to skip in online radio so a resume position is pointless
        resume_pos = 0
    await self.play_index(
        queue_id,
        resume_item.queue_item_id,
        int(resume_pos),
        bool(fade_in),
    )
929
@api_command("player_queues/play_index")
async def play_index(
    self,
    queue_id: str,
    index: int | str,
    seek_position: int = 0,
    fade_in: bool = False,
    debounce: bool = False,
) -> None:
    """Play item at index (or item_id) X in queue.

    - queue_id: queue_id of the queue to handle the command.
    - index: index (int) or queue_item_id (str) of the item to play.
    - seek_position: position in seconds to start playback at.
    - fade_in: fade in the audio at the start of playback.
    - debounce: wait a bit longer before sending the command to the player,
      to absorb rapid successive next/previous commands.

    Raises InvalidDataError when the given item id is not found in the queue,
    PlayerUnavailableError when the player is not available and
    MediaNotFoundError when no playable item could be found.
    """
    queue = self._queues[queue_id]
    queue.resume_pos = 0
    if isinstance(index, str):
        temp_index = self.index_by_id(queue_id, index)
        if temp_index is None:
            raise InvalidDataError(f"Item {index} not found in queue")
        index = temp_index
    # At this point index is guaranteed to be int
    queue.current_index = index
    # update current item and elapsed time and signal update
    # this way the UI knows immediately that a new item is loading
    queue.current_item = self.get_item(queue_id, index)
    queue.elapsed_time = seek_position
    queue.elapsed_time_last_updated = time.time()
    self.signal_update(queue_id)
    queue.index_in_buffer = index
    queue.flow_mode_stream_log = []
    target_player = self.mass.players.get_player(queue_id)
    if target_player is None:
        raise PlayerUnavailableError(f"Player {queue_id} is not available")
    queue.next_item_id_enqueued = None
    # always update session id when we start a new playback session
    queue.session_id = shortuuid.random(length=8)
    # handle resume point of audiobook(chapter) or podcast(episode)
    if (
        not seek_position
        and (queue_item := self.get_item(queue_id, index))
        and (resume_position_ms := getattr(queue_item.media_item, "resume_position_ms", 0))
    ):
        # resume point is stored in milliseconds; start half a second earlier
        seek_position = max(0, int((resume_position_ms - 500) / 1000))

    # send play_media request to player
    # NOTE that we debounce this a bit to account for someone hitting the next button
    # like a madman. This will prevent the player from being overloaded with requests.
    async def _play_index(index: int, debounce: bool) -> None:
        try:
            for attempt in range(5):
                try:
                    queue_item = self.get_item(queue_id, index)
                    if not queue_item:
                        continue  # guard
                    await self._load_item(
                        queue_item,
                        self._get_next_index(queue_id, index),
                        is_start=True,
                        # seek/fade only apply to the originally requested item
                        seek_position=seek_position if attempt == 0 else 0,
                        fade_in=fade_in if attempt == 0 else False,
                    )
                    # if we reach this point, loading the item succeeded, break the loop
                    queue.current_index = index
                    queue.current_item = queue_item
                    break
                except (MediaNotFoundError, AudioError):
                    # the requested index can not be played.
                    if queue_item:
                        self.logger.warning(
                            "Skipping unplayable item %s (%s)",
                            queue_item.name,
                            queue_item.uri,
                        )
                        queue_item.available = False
                    next_index = self._get_next_index(queue_id, index, allow_repeat=False)
                    if next_index is None:
                        raise MediaNotFoundError("No next item available")
                    index = next_index
            else:
                # all attempts to find a playable item failed
                raise MediaNotFoundError("No playable item found to start playback")

            # work out if we need to use flow mode
            flow_mode = target_player.flow_mode and queue_item.media_type not in (
                # don't use flow mode for duration-less streams
                MediaType.RADIO,
                MediaType.PLUGIN_SOURCE,
            )
            await asyncio.sleep(0.5 if debounce else 0.1)
            queue.flow_mode = flow_mode
            await self.mass.players.play_media(
                player_id=queue_id,
                media=await self.player_media_from_queue_item(queue_item, flow_mode),
            )
            queue.current_index = index
            queue.current_item = queue_item
            await asyncio.sleep(2)
            self._transitioning_players.discard(queue_id)
        except Exception:
            # Starting playback failed: release the transitioning flag, otherwise
            # player updates for this queue would be ignored from now on.
            # Cancellation (CancelledError is not an Exception) is deliberately
            # not handled here: a cancelled task is superseded by a newer
            # play_index call which owns the flag from that point.
            self._transitioning_players.discard(queue_id)
            raise

    # we set a flag to notify the update logic that we're transitioning to a new track
    self._transitioning_players.add(queue_id)

    # we debounce the play_index command to handle the case where someone
    # is spamming next/previous on the player
    task_id = f"play_index_{queue_id}"
    if existing_task := self.mass.get_task(task_id):
        existing_task.cancel()
        with suppress(asyncio.CancelledError):
            await existing_task
    task = self.mass.create_task(
        _play_index,
        index,
        debounce,
        task_id=task_id,
    )
    await task
    self.signal_update(queue_id)
1043
@api_command("player_queues/transfer")
async def transfer_queue(
    self,
    source_queue_id: str,
    target_queue_id: str,
    auto_play: bool | None = None,
) -> None:
    """Transfer queue to another queue.

    - source_queue_id: queue_id of the queue to take the items and settings from.
    - target_queue_id: queue_id of the queue that receives the items and settings.
    - auto_play: start playback on the target after the transfer. When left at
      None, playback is started only if the source queue is currently playing.

    Raises PlayerUnavailableError when one of the queues or the target player
    is not available.
    """
    source_queue = self.get(source_queue_id)
    if not source_queue:
        raise PlayerUnavailableError(f"Queue {source_queue_id} is not available")
    target_queue = self.get(target_queue_id)
    if not target_queue:
        raise PlayerUnavailableError(f"Queue {target_queue_id} is not available")
    if auto_play is None:
        auto_play = source_queue.state == PlaybackState.PLAYING

    target_player = self.mass.players.get_player(target_queue_id)
    if target_player is None:
        raise PlayerUnavailableError(f"Player {target_queue_id} is not available")
    group_id = target_player.state.active_group or target_player.state.synced_to
    if group_id:
        # edge case: the user wants to move playback from the group as a whole, to a single
        # player in the group or it is grouped and the command targeted at the single player.
        # We need to dissolve the group first.
        await self.mass.players.cmd_ungroup(group_id)
        await asyncio.sleep(3)

    source_items = self._queue_items[source_queue_id]
    # copy the queue settings over to the target
    for setting in (
        "repeat_mode",
        "shuffle_enabled",
        "dont_stop_the_music_enabled",
        "radio_source",
        "enqueued_media_items",
    ):
        setattr(target_queue, setting, getattr(source_queue, setting))
    # copy the playback position so the target can pick up where the source is
    target_queue.resume_pos = int(source_queue.elapsed_time)
    target_queue.current_index = source_queue.current_index
    if source_queue.current_item:
        target_queue.current_item = source_queue.current_item
        target_queue.current_item.queue_id = target_queue_id
    self.clear(source_queue_id)

    await self.load(target_queue_id, source_items, keep_remaining=False, keep_played=False)
    # the moved items now belong to the target queue
    for moved_item in source_items:
        moved_item.queue_id = target_queue_id
    self.update_items(target_queue_id, source_items)
    if auto_play:
        await self.resume(target_queue_id)
1090
1091 # Interaction with player
1092
async def on_player_register(self, player: Player) -> None:
    """Register PlayerQueue for given player/queue id.

    Tries to restore the queue state and its items from the cache; if nothing
    is cached (or restoring fails) a fresh, inactive queue is created.
    Signals QUEUE_ADDED once the queue is registered.
    """
    queue_id = player.player_id
    queue: PlayerQueue | None = None
    queue_items: list[QueueItem] = []
    cache = self.mass.cache
    # try to restore previous state
    prev_state = await cache.get(
        key=queue_id,
        provider=self.domain,
        category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
    )
    if prev_state:
        try:
            queue = PlayerQueue.from_dict(prev_state)
            prev_items = await cache.get(
                key=queue_id,
                provider=self.domain,
                category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
                default=[],
            )
            queue_items = []
            for position, raw_item in enumerate(prev_items):
                restored = QueueItem.from_cache(raw_item)
                if restored.media_item:
                    queue_items.append(restored)
                    continue
                # Items with missing media_item are dropped - this can happen if
                # MA was killed during shutdown while cache was being written
                self.logger.debug(
                    "Skipping queue item %s (index %d) restored from cache "
                    "without media_item",
                    restored.name,
                    position,
                )
            if queue.enqueued_media_items:
                # the enqueued media items may come back from the cache as plain
                # dicts, those need to be deserialized into MediaItem objects again
                cached_items: list[dict[str, Any] | MediaItemType] = cast(
                    "list[dict[str, Any] | MediaItemType]", queue.enqueued_media_items
                )
                queue.enqueued_media_items = [
                    cast("MediaItemType", media_from_dict(entry))
                    if isinstance(entry, dict)
                    else entry
                    for entry in cached_items
                ]
        except Exception as err:
            self.logger.warning(
                "Failed to restore the queue(items) for %s - %s",
                player.state.name,
                str(err),
            )
            # Reset to clean state on failure
            queue = None
            queue_items = []

    if queue is None:
        # nothing (usable) in the cache, start with an empty queue
        queue = PlayerQueue(
            queue_id=queue_id,
            active=False,
            display_name=player.state.name,
            available=player.state.available,
            dont_stop_the_music_enabled=False,
            items=0,
        )

    self._queues[queue_id] = queue
    self._queue_items[queue_id] = queue_items
    # always call update to calculate state etc
    self.on_player_update(player, {})
    self.mass.signal_event(EventType.QUEUE_ADDED, object_id=queue_id, data=queue)
1164
def on_player_update(
    self,
    player: Player,
    changed_values: dict[str, tuple[Any, Any]],
) -> None:
    """
    Call when a PlayerQueue needs to be updated (e.g. when player updates).

    NOTE: This is called every second if the player is playing.
    """
    queue_id = player.player_id
    queue = self._queues.get(queue_id)
    if queue is None:
        # race condition: no queue registered (anymore) for this player
        return
    if player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
        # do nothing while the announcement is in progress
        return
    # the queue is the active source when the player reports
    # either this queue or no source at all
    active_source = player.state.active_source
    queue.active = active_source is None or active_source == queue.queue_id
    if not queue.active and queue_id not in self._prev_states:
        # inactive queue without a previous state: mark idle and stop here
        queue.state = PlaybackState.IDLE
        return
    # while transitioning to a new track, updates from the player are ignored;
    # otherwise the preflight checks passed and the queue details are updated
    if queue.queue_id not in self._transitioning_players:
        self._update_queue_from_player(player)
1195
def on_player_remove(self, player_id: str, permanent: bool) -> None:
    """Call when a player is removed from the registry.

    - player_id: id of the player (which is also the queue id).
    - permanent: when True, the cached queue state and queue items are deleted too.
    """
    if permanent:
        # if the player is permanently removed, we also remove the cached queue data
        for cache_category in (
            CACHE_CATEGORY_PLAYER_QUEUE_STATE,
            CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
        ):
            self.mass.create_task(
                self.mass.cache.delete(
                    key=player_id,
                    provider=self.domain,
                    category=cache_category,
                )
            )
    # drop the in-memory queue and its items
    for registry in (self._queues, self._queue_items):
        registry.pop(player_id, None)
1216
async def load_next_queue_item(
    self,
    queue_id: str,
    current_item_id: str,
) -> QueueItem:
    """
    Call when a player wants the next queue item to play.

    Items that fail to load are marked unavailable and skipped.

    Raises PlayerUnavailableError if the queue is not available.
    Raises QueueEmpty if there are no more tracks left.
    """
    if not self.get(queue_id):
        raise PlayerUnavailableError(f"PlayerQueue {queue_id} is not available")
    cur_index = self.index_by_id(queue_id, current_item_id)
    if cur_index is None:
        # this is just a guard for bad data
        raise QueueEmpty("Invalid item id for queue given.")

    skipped = 0
    while True:
        next_index = self._get_next_index(queue_id, cur_index + skipped)
        candidate = None if next_index is None else self.get_item(queue_id, next_index)
        if candidate is None:
            raise QueueEmpty("No more tracks left in the queue.")
        if skipped >= 10:
            # we only allow 10 retries to prevent infinite loops
            raise QueueEmpty("No more (playable) tracks left in the queue.")
        try:
            await self._load_item(candidate, next_index)
        except (MediaNotFoundError, AudioError):
            # No stream details found, skip this QueueItem
            self.logger.warning(
                "Skipping unplayable item %s (%s)", candidate.name, candidate.uri
            )
            candidate.available = False
            skipped += 1
        else:
            # we're all set, this is our next item
            break

    if skipped:
        # we skipped some items, signal a queue items update
        self.update_items(queue_id, self._queue_items[queue_id])
    return candidate
1266
async def _load_item(
    self,
    queue_item: QueueItem,
    next_index: int | None,
    is_start: bool = False,
    seek_position: int = 0,
    fade_in: bool = False,
) -> None:
    """Try to load the stream details for the given queue item.

    - queue_item: the queue item to (pre)load.
    - next_index: index of the item that follows queue_item (if any), used to
      detect whether we're playing consecutive tracks of the same album.
    - is_start: not used within this method.
    - seek_position: passed on to the stream details request.
    - fade_in: passed on to the stream details request.

    Raises MediaNotFoundError when the item is marked as unavailable.
    Fetching the stream details may raise as well for unplayable items.
    """
    queue_id = queue_item.queue_id
    queue = self._queues[queue_id]

    # we use a contextvar to bypass the throttler for this asyncio task/context
    # this makes sure that playback has priority over other requests that may be
    # happening in the background
    BYPASS_THROTTLER.set(True)

    self.logger.debug(
        "(pre)loading (next) item for queue %s...",
        queue.display_name,
    )

    if not queue_item.available:
        raise MediaNotFoundError(f"Item {queue_item.uri} is not available")

    def _album_of(item: QueueItem | None) -> Any:
        """Return the album of the item's media item (None if there is none)."""
        media_item = item.media_item if item else None
        return getattr(media_item, "album", None) if media_item else None

    # NOTE: captured before the media item is (possibly) replaced below
    current_album = _album_of(queue_item)

    def _shares_album(other_index: int | None) -> bool:
        """Return True if the item at other_index is from the same album."""
        if other_index is None or not current_album:
            return False
        other_album = _album_of(self.get_item(queue_id, other_index))
        return bool(other_album) and current_album.item_id == other_album.item_id

    # work out if we are playing an album and if we should prefer album
    # loudness
    current_index = self.index_by_id(queue_id, queue_item.queue_item_id)
    # only an item past the first position has a previous item; guarding on
    # the current index (not the previous one) makes sure that the item at
    # index 1 is compared against the item at index 0
    previous_index = current_index - 1 if current_index else None
    playing_album_tracks = _shares_album(next_index) or _shares_album(previous_index)

    if queue_item.media_item and isinstance(queue_item.media_item, Track):
        album = queue_item.media_item.album
        # prefer the full library media item so we have all metadata and provider(quality) info
        # always request the full library item as there might be other qualities available
        if library_item := await self.mass.music.get_library_item_by_prov_id(
            queue_item.media_item.media_type,
            queue_item.media_item.item_id,
            queue_item.media_item.provider,
        ):
            queue_item.media_item = cast("Track", library_item)
        elif not queue_item.media_item.image or queue_item.media_item.provider.startswith(
            "ytmusic"
        ):
            # Youtube Music has poor thumbs by default, so we always fetch the full item
            # this also catches the case where they have an unavailable item in a listing
            fetched_item = await self.mass.music.get_item_by_uri(queue_item.uri)
            queue_item.media_item = cast("Track", fetched_item)

        # ensure we got the full (original) album set
        if album and (
            library_album := await self.mass.music.get_library_item_by_prov_id(
                album.media_type,
                album.item_id,
                album.provider,
            )
        ):
            queue_item.media_item.album = cast("Album", library_album)
        elif album:
            # Restore original album if we have no better alternative from the library
            queue_item.media_item.album = album
        # prefer album image over track image
        if queue_item.media_item.album and queue_item.media_item.album.image:
            org_images: list[MediaItemImage] = queue_item.media_item.metadata.images or []
            queue_item.media_item.metadata.images = UniqueList(
                [
                    queue_item.media_item.album.image,
                    *org_images,
                ]
            )
    # Fetch the streamdetails, which could raise in case of an unplayable item.
    # For example, YT Music returns Radio Items that are not playable.
    queue_item.streamdetails = await get_stream_details(
        mass=self.mass,
        queue_item=queue_item,
        seek_position=seek_position,
        fade_in=fade_in,
        prefer_album_loudness=bool(playing_album_tracks),
    )
1372
def track_loaded_in_buffer(self, queue_id: str, item_id: str) -> None:
    """Call when a player has (started) loading a track in the buffer.

    - queue_id: id of the queue the item belongs to.
    - item_id: queue_item_id of the item that is being loaded.

    Raises PlayerUnavailableError if the queue is not available.
    """
    queue = self.get(queue_id)
    if not queue:
        raise PlayerUnavailableError(f"PlayerQueue {queue_id} is not available")
    # remember which item is currently (being) loaded in the buffer,
    # which helps us a bit to determine how far the player has buffered ahead
    buffered_index = self.index_by_id(queue_id, item_id)
    queue.index_in_buffer = buffered_index
    self.logger.debug("PlayerQueue %s loaded item %s in buffer", queue.display_name, item_id)
    self.signal_update(queue_id)
    # preload next streamdetails
    self._preload_next_item(queue_id, item_id)
1386
1387 # Main queue manipulation methods
1388
async def load(
    self,
    queue_id: str,
    queue_items: list[QueueItem],
    insert_at_index: int = 0,
    keep_remaining: bool = True,
    keep_played: bool = True,
    shuffle: bool = False,
) -> None:
    """Load new items at index.

    The given list of queue_items itself is left untouched (the items in it
    do get their sort_index updated).

    - queue_id: id of the queue to process this request.
    - queue_items: a list of QueueItems
    - insert_at_index: insert the item(s) at this index
    - keep_remaining: keep the remaining items after the insert
    - keep_played: keep the items that are positioned before the insert index
    - shuffle: (re)shuffle the items after insert index
    """
    existing_items = self._queue_items[queue_id]
    prev_items = existing_items[:insert_at_index] if keep_played else []
    # build a new list instead of extending queue_items in place,
    # so the list object passed in by the caller is not modified
    next_items = list(queue_items)

    # if keep_remaining, append the old 'next' items
    if keep_remaining:
        next_items.extend(existing_items[insert_at_index:])

    # we set the original insert order as attribute so we can un-shuffle
    # NOTE(review): the sort_index is incremented (not assigned), so for kept
    # 'remaining' items the value accumulates on every load - confirm intended
    for index, item in enumerate(next_items):
        item.sort_index += insert_at_index + index
    # (re)shuffle the final batch if needed
    if shuffle:
        next_items = await _smart_shuffle(next_items)
    self.update_items(queue_id, prev_items + next_items)
1420
def update_items(self, queue_id: str, queue_items: list[QueueItem]) -> None:
    """Update the existing queue items, mostly caused by reordering.

    Stores the new list of items for the queue, signals the change and, if the
    queue is playing, makes sure the (possibly changed) next item gets enqueued.
    """
    self._queue_items[queue_id] = queue_items
    queue = self._queues[queue_id]
    queue.items = len(queue_items)
    # a timestamp is a simple way to let consumers detect changes in the
    # list of items without having to compare the entire list
    queue.items_last_updated = time.time()
    self.signal_update(queue_id, True)

    if queue.state != PlaybackState.PLAYING:
        return
    buffered_index = queue.index_in_buffer
    if buffered_index is None or buffered_index != queue.current_index:
        # the player has not loaded the current track (yet),
        # wait until it has loaded to prevent conflicts
        return
    # the queue is playing and the current track is loaded:
    # (re)queue the next track because it might have changed
    next_item = self.get_next_item(queue_id, buffered_index)
    if next_item:
        self._enqueue_next_item(queue_id, next_item)
1442
1443 # Helper methods
1444
def get_item(self, queue_id: str, item_id_or_index: int | str | None) -> QueueItem | None:
    """Get queue item by index or item_id.

    - queue_id: id of the queue to look in.
    - item_id_or_index: position (int) or queue_item_id (str) of the item.

    Returns None for an unknown queue, an unknown item id or an index that is
    outside of the queue (this includes negative indexes).
    """
    if item_id_or_index is None:
        return None
    queue_items = self._queue_items.get(queue_id)
    if queue_items is None:
        return None
    if isinstance(item_id_or_index, str):
        return next((x for x in queue_items if x.queue_item_id == item_id_or_index), None)
    if isinstance(item_id_or_index, int) and 0 <= item_id_or_index < len(queue_items):
        # bounds are checked on both ends: a negative value is not a valid
        # queue position and must not wrap around to the end of the list
        return queue_items[item_id_or_index]
    return None
1456
def signal_update(self, queue_id: str, items_changed: bool = False) -> None:
    """Signal state changed of given queue.

    - queue_id: id of the queue that changed.
    - items_changed: the list of queue items changed too; additionally signal
      QUEUE_ITEMS_UPDATED and store the items in the cache.
    """
    queue = self._queues[queue_id]
    cache = self.mass.cache
    if items_changed:
        self.mass.signal_event(EventType.QUEUE_ITEMS_UPDATED, object_id=queue_id, data=queue)
        # save items in cache - only cache items with valid media_item
        cacheable_items = [
            item.to_cache()
            for item in self._queue_items[queue_id]
            if item.media_item is not None
        ]
        store_items = cache.set(
            key=queue_id,
            data=cacheable_items,
            provider=self.domain,
            category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
        )
        self.mass.create_task(store_items)
    # always send the base event
    self.mass.signal_event(EventType.QUEUE_UPDATED, object_id=queue_id, data=queue)
    # also signal update to the player itself so it can update its current_media
    self.mass.players.trigger_player_update(queue_id)
    # save state
    store_state = cache.set(
        key=queue_id,
        data=queue.to_cache(),
        provider=self.domain,
        category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
    )
    self.mass.create_task(store_state)
1487
def index_by_id(self, queue_id: str, queue_item_id: str) -> int | None:
    """Get index by queue_item_id.

    Returns the position of the first item with the given queue_item_id,
    or None if the queue holds no such item.
    """
    return next(
        (
            position
            for position, candidate in enumerate(self._queue_items[queue_id])
            if candidate.queue_item_id == queue_item_id
        ),
        None,
    )
1495
async def player_media_from_queue_item(
    self, queue_item: QueueItem, flow_mode: bool
) -> PlayerMedia:
    """Parse PlayerMedia from QueueItem.

    - queue_item: the queue item to create the PlayerMedia for.
    - flow_mode: create the media for a flow stream: generic title, no
      duration and no item specific metadata.

    Raises InvalidDataError if the queue has no session id.
    """
    queue = self._queues[queue_item.queue_id]
    duration = None
    if not flow_mode:
        streamdetails = queue_item.streamdetails
        if streamdetails:
            # prefer netto duration
            duration = streamdetails.duration or queue_item.duration
            if duration and streamdetails.seek_position:
                # when seeking, the player only receives the remaining duration
                duration -= streamdetails.seek_position
        else:
            duration = queue_item.duration
    if queue.session_id is None:
        raise InvalidDataError("Queue session_id is None")

    if flow_mode:
        media_type = MediaType.FLOW_STREAM
        title = "Music Assistant"
    else:
        media_type = queue_item.media_type
        title = queue_item.name
    custom_data = {
        "session_id": queue.session_id,
        "original_uri": queue_item.uri,
        "flow_mode": flow_mode,
    }
    media = PlayerMedia(
        uri=queue_item.uri,
        media_type=media_type,
        title=title,
        image_url=MASS_LOGO_ONLINE,
        duration=duration,
        source_id=queue_item.queue_id,
        queue_item_id=queue_item.queue_item_id,
        custom_data=custom_data,
    )
    if flow_mode or not queue_item.media_item:
        return media

    # enrich with the metadata of the actual media item
    media_item = queue_item.media_item
    media.title = media_item.name
    media.artist = getattr(media_item, "artist_str", "")
    album = getattr(media_item, "album", None)
    media.album = album.name if album else ""
    if queue_item.image:
        # the image format needs to be 500x500 jpeg for maximum compatibility with players
        # we prefer the imageproxy on the streamserver here because this request is sent
        # to the player itself which may not be able to reach the regular webserver
        media.image_url = self.mass.metadata.get_image_url(
            queue_item.image, size=500, prefer_stream_server=True
        )
    return media
1542
async def get_artist_tracks(self, artist: Artist) -> list[Track]:
    """Return tracks for given artist, based on user preference.

    Depending on the configured selection this returns either the artist's
    tracks or the tracks of the artist's albums (both optionally limited to
    the library), in random order. Any other selection yields an empty list.
    """
    selection = self.mass.config.get_raw_core_config_value(
        self.domain,
        CONF_DEFAULT_ENQUEUE_SELECT_ARTIST,
        ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE,
    )
    self.logger.info(
        "Fetching tracks to play for artist %s",
        artist.name,
    )
    if selection in ("library_tracks", "all_tracks"):
        artist_tracks = await self.mass.music.artists.tracks(
            artist.item_id,
            artist.provider,
            in_library_only=selection == "library_tracks",
        )
        random.shuffle(artist_tracks)
        return artist_tracks
    if selection not in ("library_album_tracks", "all_album_tracks"):
        return []

    artist_albums = await self.mass.music.artists.albums(
        artist.item_id,
        artist.provider,
        in_library_only=selection == "library_album_tracks",
    )
    collected: list[Track] = []
    for artist_album in artist_albums:
        tracks_of_album = await self.mass.music.albums.tracks(
            artist_album.item_id, artist_album.provider
        )
        for candidate in tracks_of_album:
            # a track can appear on more than one album, only add it once
            if candidate in collected:
                continue
            collected.append(candidate)
    random.shuffle(collected)
    return collected
1577
async def get_album_tracks(self, album: Album, start_item: str | None) -> list[Track]:
    """Return tracks for given album, based on user preference.

    - album: the album to get the tracks for.
    - start_item: optional item_id or uri of the track to start at; available
      tracks positioned before this track are left out.

    Unavailable tracks are never returned.
    """
    selection = self.mass.config.get_raw_core_config_value(
        self.domain,
        CONF_DEFAULT_ENQUEUE_SELECT_ALBUM,
        ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE,
    )
    self.logger.info(
        "Fetching tracks to play for album %s",
        album.name,
    )
    tracks_of_album = await self.mass.music.albums.tracks(
        item_id=album.item_id,
        provider_instance_id_or_domain=album.provider,
        in_library_only=selection == "library_tracks",
    )
    selected: list[Track] = []
    # with a start item given, tracks are skipped until that item is reached
    skipping = start_item is not None
    for track in tracks_of_album:
        if not track.available:
            continue
        if skipping and start_item in (track.item_id, track.uri):
            skipping = False
        if not skipping:
            selected.append(track)
    return selected
1604
async def get_playlist_tracks(
    self, playlist: Playlist, start_item: str | None
) -> list[PlaylistPlayableItem]:
    """Return the playable tracks for given playlist.

    - playlist: the playlist to get the tracks for.
    - start_item: optional item_id or uri of the track to start at; available
      tracks positioned before this track are left out.

    Unavailable tracks are never returned.
    """
    self.logger.info(
        "Fetching tracks to play for playlist %s",
        playlist.name,
    )
    selected: list[PlaylistPlayableItem] = []
    # with a start item given, tracks are skipped until that item is reached
    skipping = start_item is not None
    # TODO: Handle other sort options etc.
    async for track in self.mass.music.playlists.tracks(playlist.item_id, playlist.provider):
        if not track.available:
            continue
        if skipping and start_item in (track.item_id, track.uri):
            skipping = False
        if not skipping:
            selected.append(track)
    return selected
1627
async def get_audiobook_resume_point(
    self, audio_book: Audiobook, chapter: str | int | None = None, userid: str | None = None
) -> int:
    """Return resume point (in milliseconds) for given audio book.

    - audio_book: the audiobook to get the resume point for.
    - chapter: optional chapter (position) to start at; when given, the start
      of that chapter is returned.
    - userid: passed on when looking up the stored resume position.

    Raises InvalidDataError if a chapter was given that can not be resolved.
    """
    self.logger.debug(
        "Fetching resume point to play for audio book %s",
        audio_book.name,
    )
    if chapter is None:
        # no explicit chapter: continue where we left off,
        # unless the book was already fully played
        full_played, resume_position_ms = await self.mass.music.get_resume_position(
            audio_book, userid=userid
        )
        if full_played:
            return 0
        return resume_position_ms

    # user explicitly selected a chapter to play
    wanted_position = int(chapter) if isinstance(chapter, str) else chapter
    selected = next(
        (
            candidate
            for candidate in (audio_book.metadata.chapters or ())
            if candidate.position == wanted_position
        ),
        None,
    )
    if selected:
        # chapter start is multiplied by 1000 to get milliseconds
        return int(selected.start * 1000)
    raise InvalidDataError(f"Unable to resolve chapter to play for Audiobook {audio_book.name}")
1649
async def get_next_podcast_episodes(
    self,
    podcast: Podcast | None,
    episode: PodcastEpisode | str | None,
    userid: str | None = None,
) -> UniqueList[PodcastEpisode]:
    """
    Resolve the podcast episode(s) to play, including their resume position.

    :param podcast: The podcast to play, or None when only a single episode is requested.
    :param episode: Episode to start with; either a PodcastEpisode object,
        a string matched against the uri or item_id of the podcast's episodes, or None.
    :param userid: Optional user id forwarded to the resume position lookups.
    :return: The single requested episode (no podcast given), or the selected
        episode followed by all later episodes of the podcast.
    :raises InvalidDataError: If neither podcast nor episode object is given,
        or if no episode could be resolved for the podcast.
    """
    if podcast is None:
        if isinstance(episode, str | NoneType):
            raise InvalidDataError("Either podcast or episode must be provided")
        # single podcast episode requested
        assert isinstance(episode, PodcastEpisode)  # checked above
        self.logger.debug(
            "Fetching resume point to play for Podcast episode %s",
            episode.name,
        )
        finished, position_ms = await self.mass.music.get_resume_position(episode, userid=userid)
        episode.fully_played = finished
        episode.resume_position_ms = 0 if finished else position_ms
        return UniqueList([episode])

    # podcast with optional start episode requested
    self.logger.debug(
        "Fetching episode(s) and resume point to play for Podcast %s",
        podcast.name,
    )
    episodes = sorted(
        [x async for x in self.mass.music.podcasts.episodes(podcast.item_id, podcast.provider)],
        key=lambda x: x.position,
    )

    selected: PodcastEpisode | None = None
    # an episode object or string means the user explicitly picked where to start
    explicitly_selected = True
    if isinstance(episode, PodcastEpisode):
        selected = next((x for x in episodes if x.uri == episode.uri), None)
    elif isinstance(episode, str):
        selected = next((x for x in episodes if episode in (x.uri, x.item_id)), None)
    else:
        explicitly_selected = False

    if explicitly_selected:
        if selected:
            # ensure we have accurate resume info
            finished, position_ms = await self.mass.music.get_resume_position(
                selected, userid=userid
            )
            selected.resume_position_ms = 0 if finished else position_ms
    else:
        # no explicit selection: pick the first episode that is not fully played
        for candidate in episodes:
            if candidate.fully_played:
                continue
            # ensure we have accurate resume info
            finished, position_ms = await self.mass.music.get_resume_position(
                candidate, userid=userid
            )
            if finished:
                continue
            candidate.resume_position_ms = position_ms
            selected = candidate
            break
        else:
            # everything was played already, so we start at the beginning
            selected = episodes[0] if episodes else None

    if selected is None:
        raise InvalidDataError(f"Unable to resolve episode to play for Podcast {podcast.name}")
    # return the selected episode plus all episodes that follow it
    first_index = episodes.index(selected)
    return UniqueList(episodes[first_index:])
1729
1730 def _get_next_index(
1731 self,
1732 queue_id: str,
1733 cur_index: int | None,
1734 is_skip: bool = False,
1735 allow_repeat: bool = True,
1736 ) -> int | None:
1737 """
1738 Return the next index for the queue, accounting for repeat settings.
1739
1740 Will return None if there are no (more) items in the queue.
1741 """
1742 queue = self._queues[queue_id]
1743 queue_items = self._queue_items[queue_id]
1744 if not queue_items or cur_index is None:
1745 # queue is empty
1746 return None
1747 # handle repeat single track
1748 if queue.repeat_mode == RepeatMode.ONE and not is_skip:
1749 return cur_index if allow_repeat else None
1750 # handle cur_index is last index of the queue
1751 if cur_index >= (len(queue_items) - 1):
1752 if allow_repeat and queue.repeat_mode == RepeatMode.ALL:
1753 # if repeat all is enabled, we simply start again from the beginning
1754 return 0
1755 return None
1756 # all other: just the next index
1757 return cur_index + 1
1758
def get_next_item(self, queue_id: str, cur_index: int | str) -> QueueItem | None:
    """
    Return the next playable QueueItem for the given queue.

    :param queue_id: Id of the queue to inspect.
    :param cur_index: Current position, either as index or as queue item id.
    :return: The first available item found within (at most) 5 lookups after
        the current position, or None if there is none.
    """
    if isinstance(cur_index, str):
        start_index = self.index_by_id(queue_id, cur_index)
        if start_index is None:
            return None  # guard
    else:
        start_index = cur_index

    for offset in range(5):
        candidate_index = self._get_next_index(queue_id, start_index + offset)
        if candidate_index is None:
            # nothing left in the queue
            return None
        candidate = self.get_item(queue_id, candidate_index)
        # skip missing and unavailable items (set by load_next track logic)
        if candidate is not None and candidate.available:
            return candidate
    return None
1781
1782 async def _fill_radio_tracks(self, queue_id: str) -> None:
1783 """Fill a Queue with (additional) Radio tracks."""
1784 self.logger.debug(
1785 "Filling radio tracks for queue %s",
1786 queue_id,
1787 )
1788 tracks = await self._get_radio_tracks(queue_id=queue_id, is_initial_radio_mode=False)
1789 # fill queue - filter out unavailable items
1790 queue_items = [QueueItem.from_media_item(queue_id, x) for x in tracks if x.available]
1791 await self.load(
1792 queue_id,
1793 queue_items,
1794 insert_at_index=len(self._queue_items[queue_id]) + 1,
1795 )
1796
1797 def _enqueue_next_item(self, queue_id: str, next_item: QueueItem | None) -> None:
1798 """Enqueue the next item on the player."""
1799 if not next_item:
1800 # no next item, nothing to do...
1801 return
1802
1803 queue = self._queues[queue_id]
1804 if queue.flow_mode:
1805 # ignore this for flow mode
1806 return
1807
1808 async def _enqueue_next_item_on_player(next_item: QueueItem) -> None:
1809 await self.mass.players.enqueue_next_media(
1810 player_id=queue_id,
1811 media=await self.player_media_from_queue_item(next_item, False),
1812 )
1813 if queue.next_item_id_enqueued != next_item.queue_item_id:
1814 queue.next_item_id_enqueued = next_item.queue_item_id
1815 self.logger.debug(
1816 "Enqueued next track %s on queue %s",
1817 next_item.name,
1818 self._queues[queue_id].display_name,
1819 )
1820
1821 task_id = f"enqueue_next_item_{queue_id}"
1822 self.mass.call_later(0.5, _enqueue_next_item_on_player, next_item, task_id=task_id)
1823
1824 def _preload_next_item(self, queue_id: str, item_id_in_buffer: str) -> None:
1825 """
1826 Preload the streamdetails for the next item in the queue/buffer.
1827
1828 This basically ensures the item is playable and fetches the stream details.
1829 If an error occurs, the item will be skipped and the next item will be loaded.
1830 """
1831 queue = self._queues[queue_id]
1832
1833 async def _preload_streamdetails(item_id_in_buffer: str) -> None:
1834 try:
1835 # wait for the item that was loaded in the buffer is the actually playing item
1836 # this prevents a race condition when we preload the next item too soon
1837 # while the player is actually preloading the previously enqueued item.
1838 retries = 120
1839 while retries > 0:
1840 if not queue.current_item:
1841 return # guard
1842 if queue.current_item.queue_item_id == item_id_in_buffer:
1843 break
1844 retries -= 1
1845 await asyncio.sleep(1)
1846 if next_item := await self.load_next_queue_item(queue_id, item_id_in_buffer):
1847 self.logger.debug(
1848 "Preloaded next item %s for queue %s",
1849 next_item.name,
1850 queue.display_name,
1851 )
1852 # enqueue the next item on the player
1853 self._enqueue_next_item(queue_id, next_item)
1854
1855 except QueueEmpty:
1856 return
1857
1858 if not (current_item := self.get_item(queue_id, item_id_in_buffer)):
1859 # this should not happen, but guard anyways
1860 return
1861 if current_item.media_type == MediaType.RADIO or not current_item.duration:
1862 # radio items or no duration, nothing to do
1863 return
1864
1865 task_id = f"preload_next_item_{queue_id}"
1866 self.mass.create_task(
1867 _preload_streamdetails,
1868 item_id_in_buffer,
1869 task_id=task_id,
1870 abort_existing=True,
1871 )
1872
async def _resolve_media_items(
    self,
    media_item: MediaItemType | ItemMapping | BrowseFolder,
    start_item: str | None = None,
    userid: str | None = None,
    queue_id: str | None = None,
) -> list[MediaItemType]:
    """
    Resolve/unwrap a media item into the list of items to enqueue.

    Playlists, artists, albums, podcasts and folders are expanded into their
    playable items; audiobooks and podcast episodes get their resume position
    set; everything else is returned as a single-item list.

    :param media_item: The item to resolve. An ItemMapping is first resolved
        to the full media item by its uri.
    :param start_item: Optional item to start with (playlist/album start track,
        audiobook chapter or podcast episode).
    :param userid: Optional user id used for play history and resume lookups.
    :param queue_id: Optional queue id stored with the play history entry.
    :return: List of media items to enqueue.
    :raises InvalidDataError: If an ItemMapping without uri is given.
    """
    # resolve ItemMapping to full media item
    if isinstance(media_item, ItemMapping):
        if media_item.uri is None:
            raise InvalidDataError("ItemMapping has no URI")
        media_item = await self.mass.music.get_item_by_uri(media_item.uri)

    def _mark_played(item: MediaItemType) -> None:
        # Register the (container) item as user-initiated play in the background.
        # The userid is passed for every media type, artists included, so that
        # the entry is attributed to the requesting user in all cases.
        self.mass.create_task(
            self.mass.music.mark_item_played(
                item, userid=userid, queue_id=queue_id, user_initiated=True
            )
        )

    media_type = media_item.media_type
    if media_type == MediaType.PLAYLIST:
        media_item = cast("Playlist", media_item)
        _mark_played(media_item)
        return list(await self.get_playlist_tracks(media_item, start_item))
    if media_type == MediaType.ARTIST:
        media_item = cast("Artist", media_item)
        _mark_played(media_item)
        return list(await self.get_artist_tracks(media_item))
    if media_type == MediaType.ALBUM:
        media_item = cast("Album", media_item)
        _mark_played(media_item)
        return list(await self.get_album_tracks(media_item, start_item))
    if media_type == MediaType.AUDIOBOOK:
        media_item = cast("Audiobook", media_item)
        # ensure we grab the correct/latest resume point info
        media_item.resume_position_ms = await self.get_audiobook_resume_point(
            media_item, start_item, userid=userid
        )
        return [media_item]
    if media_type == MediaType.PODCAST:
        media_item = cast("Podcast", media_item)
        _mark_played(media_item)
        return list(await self.get_next_podcast_episodes(media_item, start_item, userid=userid))
    if media_type == MediaType.PODCAST_EPISODE:
        media_item = cast("PodcastEpisode", media_item)
        return list(await self.get_next_podcast_episodes(None, media_item, userid=userid))
    if media_type == MediaType.FOLDER:
        media_item = cast("BrowseFolder", media_item)
        return list(await self._get_folder_tracks(media_item))
    # all other: single track or radio item
    return [cast("MediaItemType", media_item)]
1931
1932 async def _try_resume_from_playlog(self, queue: PlayerQueue) -> bool:
1933 """Try to resume playback from playlog when queue is empty.
1934
1935 Attempts to find user-initiated recently played items in the following order:
1936 1. By userid AND queue_id
1937 2. By queue_id only
1938 3. By userid only (if available)
1939 4. Any recently played item
1940
1941 :param queue: The queue to resume playback on.
1942 :return: True if playback was started, False otherwise.
1943 """
1944 # Try different filter combinations in order of specificity
1945 filter_attempts: list[tuple[str | None, str | None, str]] = []
1946 if queue.userid:
1947 filter_attempts.append((queue.userid, queue.queue_id, "userid + queue_id match"))
1948 filter_attempts.append((None, queue.queue_id, "queue_id match"))
1949 if queue.userid:
1950 filter_attempts.append((queue.userid, None, "userid match"))
1951 filter_attempts.append((None, None, "any recent item"))
1952
1953 for userid, queue_id, match_type in filter_attempts:
1954 items = await self.mass.music.recently_played(
1955 limit=5,
1956 fully_played_only=False,
1957 user_initiated_only=True,
1958 userid=userid,
1959 queue_id=queue_id,
1960 )
1961 for item in items:
1962 if not item.uri:
1963 continue
1964 try:
1965 await self.play_media(queue.queue_id, item)
1966 self.logger.info(
1967 "Resumed queue %s from playlog (%s)", queue.display_name, match_type
1968 )
1969 return True
1970 except MusicAssistantError as err:
1971 self.logger.debug("Failed to resume with item %s: %s", item.name, err)
1972 continue
1973
1974 return False
1975
1976 async def _get_radio_tracks(
1977 self, queue_id: str, is_initial_radio_mode: bool = False
1978 ) -> list[Track]:
1979 """Call the registered music providers for dynamic tracks."""
1980 queue = self._queues[queue_id]
1981 queue_track_items: list[Track] = [
1982 q.media_item
1983 for q in self._queue_items[queue_id]
1984 if q.media_item and isinstance(q.media_item, Track)
1985 ]
1986 if not queue.radio_source:
1987 # this may happen during race conditions as this method is called delayed
1988 return []
1989 self.logger.info(
1990 "Fetching radio tracks for queue %s based on: %s",
1991 queue.display_name,
1992 ", ".join([x.name for x in queue.radio_source]),
1993 )
1994
1995 # Get user's preferred provider instances for steering provider selection
1996 preferred_provider_instances: list[str] | None = None
1997 if (
1998 queue.userid
1999 and (playback_user := await self.mass.webserver.auth.get_user(queue.userid))
2000 and playback_user.provider_filter
2001 ):
2002 preferred_provider_instances = playback_user.provider_filter
2003
2004 available_base_tracks: list[Track] = []
2005 base_track_sample_size = 5
2006 # Some providers have very deterministic similar track algorithms when providing
2007 # a single track item. When we have a radio mode based on 1 track and we have to
2008 # refill the queue (ie not initial radio mode), we use the play history as base tracks
2009 if (
2010 len(queue.radio_source) == 1
2011 and queue.radio_source[0].media_type == MediaType.TRACK
2012 and not is_initial_radio_mode
2013 ):
2014 available_base_tracks = queue_track_items
2015 else:
2016 # Grab all the available base tracks based on the selected source items.
2017 # shuffle the source items, just in case
2018 for radio_item in random.sample(queue.radio_source, len(queue.radio_source)):
2019 ctrl = self.mass.music.get_controller(radio_item.media_type)
2020 try:
2021 available_base_tracks += [
2022 track
2023 for track in await ctrl.radio_mode_base_tracks(
2024 radio_item, # type: ignore[arg-type]
2025 preferred_provider_instances,
2026 )
2027 # Avoid duplicate base tracks
2028 if track not in available_base_tracks
2029 ]
2030 except UnsupportedFeaturedException as err:
2031 self.logger.debug(
2032 "Skip loading radio items for %s: %s ",
2033 radio_item.uri,
2034 str(err),
2035 )
2036 if not available_base_tracks:
2037 raise UnsupportedFeaturedException("Radio mode not available for source items")
2038
2039 # Sample tracks from the base tracks, which will be used to calculate the dynamic ones
2040 base_tracks = random.sample(
2041 available_base_tracks,
2042 min(base_track_sample_size, len(available_base_tracks)),
2043 )
2044 # Use a set to avoid duplicate dynamic tracks
2045 dynamic_tracks: set[Track] = set()
2046 # Use base tracks + Trackcontroller to obtain similar tracks for every base Track
2047 for allow_lookup in (False, True):
2048 if dynamic_tracks:
2049 break
2050 for base_track in base_tracks:
2051 try:
2052 _similar_tracks = await self.mass.music.tracks.similar_tracks(
2053 base_track.item_id,
2054 base_track.provider,
2055 allow_lookup=allow_lookup,
2056 preferred_provider_instances=preferred_provider_instances,
2057 )
2058 except MediaNotFoundError:
2059 # Some providers don't have similar tracks for all items. For example,
2060 # Tidal can sometimes return a 404 when the 'similar_tracks' endpoint is called.
2061 # in that case, just skip the track.
2062 self.logger.debug("Similar tracks not found for track %s", base_track.name)
2063 continue
2064 for track in _similar_tracks:
2065 if (
2066 track not in base_tracks
2067 # Exclude tracks we have already played / queued
2068 and track not in queue_track_items
2069 # Ignore tracks that are too long for radio mode, e.g. mixes
2070 and track.duration <= RADIO_TRACK_MAX_DURATION_SECS
2071 ):
2072 dynamic_tracks.add(track)
2073 if len(dynamic_tracks) >= 50:
2074 break
2075 queue_tracks: list[Track] = []
2076 dynamic_tracks_list = list(dynamic_tracks)
2077 # Only include the sampled base tracks when the radio mode is first initialized
2078 if is_initial_radio_mode:
2079 queue_tracks += [base_tracks[0]]
2080 # Exhaust base tracks with the pattern of BDDBDDBDD (1 base track + 2 dynamic tracks)
2081 if len(base_tracks) > 1:
2082 for base_track in base_tracks[1:]:
2083 queue_tracks += [base_track]
2084 if len(dynamic_tracks_list) > 2:
2085 queue_tracks += random.sample(dynamic_tracks_list, 2)
2086 else:
2087 queue_tracks += dynamic_tracks_list
2088 # Add dynamic tracks to the queue, make sure to exclude already picked tracks
2089 remaining_dynamic_tracks = [t for t in dynamic_tracks_list if t not in queue_tracks]
2090 if remaining_dynamic_tracks:
2091 queue_tracks += random.sample(
2092 remaining_dynamic_tracks, min(len(remaining_dynamic_tracks), 25)
2093 )
2094 return queue_tracks
2095
2096 async def _get_folder_tracks(self, folder: BrowseFolder) -> list[Track]:
2097 """Fetch (playable) tracks for given browse folder."""
2098 self.logger.info(
2099 "Fetching tracks to play for folder %s",
2100 folder.name,
2101 )
2102 tracks: list[Track] = []
2103 for item in await self.mass.music.browse(folder.path):
2104 if not item.is_playable:
2105 continue
2106 # recursively fetch tracks from all media types
2107 resolved = await self._resolve_media_items(item)
2108 tracks += [x for x in resolved if isinstance(x, Track)]
2109
2110 return tracks
2111
def _update_queue_from_player(
    self,
    player: Player,
) -> None:
    """
    Update the Queue when the player state changed.

    Syncs the queue object with the player's state, compares the result with
    the previously stored CompareState and, based on the changed keys, signals
    updates and triggers follow-up handling (progress report, end of queue,
    radio refill).

    :param player: The player whose state changed; its player_id is used as queue id.
    """
    queue_id = player.player_id
    queue = self._queues[queue_id]

    # basic properties
    queue.display_name = player.state.name
    queue.available = player.state.available
    queue.items = len(self._queue_items[queue_id])

    # an inactive queue (or a player without playback state) is reported as idle
    queue.state = (
        player.state.playback_state or PlaybackState.IDLE
        if queue.active
        else PlaybackState.IDLE
    )
    # update current item/index from player report
    if queue.active and queue.state in (
        PlaybackState.PLAYING,
        PlaybackState.PAUSED,
    ):
        # NOTE: If the queue is not playing (yet) we will not update the current index
        # to ensure we keep the previously known current index
        if queue.flow_mode:
            # flow mode active, the player is playing one long stream
            # so we need to calculate the current index and elapsed time
            current_index, elapsed_time = self._get_flow_queue_stream_index(queue, player)
        elif item_id := self._parse_player_current_item_id(queue_id, player):
            # normal mode, the player itself will report the current item
            elapsed_time = int(player.state.corrected_elapsed_time or 0)
            current_index = self.index_by_id(queue_id, item_id)
        else:
            # this may happen if the player is still transitioning between tracks
            # we ignore this for now and keep the current index as is
            return

        # get current/next item based on current index
        queue.current_index = current_index
        queue.current_item = current_item = self.get_item(queue_id, current_index)
        queue.next_item = (
            self.get_next_item(queue_id, current_index)
            if current_item and current_index is not None
            else None
        )

        # correct elapsed time when seeking
        if (
            not queue.flow_mode
            and current_item
            and current_item.streamdetails
            and current_item.streamdetails.seek_position
        ):
            elapsed_time += current_item.streamdetails.seek_position
        queue.elapsed_time = elapsed_time
        queue.elapsed_time_last_updated = time.time()

    elif not queue.current_item and queue.current_index is not None:
        # not playing/paused: restore current/next item from the known current index
        current_index = queue.current_index
        queue.current_item = current_item = self.get_item(queue_id, current_index)
        queue.next_item = (
            self.get_next_item(queue_id, current_index)
            if current_item and current_index is not None
            else None
        )

    # This is enough to detect any changes in the DSPDetails
    # (so child count changed, or any output format changed)
    output_formats = []
    if output_format := player.extra_data.get("output_format"):
        output_formats.append(str(output_format))
    for child_id in player.state.group_members:
        if (child := self.mass.players.get_player(child_id)) and (
            output_format := child.extra_data.get("output_format")
        ):
            output_formats.append(str(output_format))
        else:
            # child player not found or no output format known for it
            output_formats.append("unknown")

    # basic throttle: do not send state changed events if queue did not actually change
    # (when no previous state is stored, an idle/empty baseline is used for comparison)
    prev_state: CompareState = self._prev_states.get(
        queue_id,
        CompareState(
            queue_id=queue_id,
            state=PlaybackState.IDLE,
            current_item_id=None,
            next_item_id=None,
            current_item=None,
            elapsed_time=0,
            last_playing_elapsed_time=0,
            stream_title=None,
            codec_type=None,
            output_formats=None,
        ),
    )
    # update last_playing_elapsed_time only when the player is actively playing
    # use corrected_elapsed_time which accounts for time since last update
    # this preserves the last known elapsed time when transitioning to idle/paused
    prev_playing_elapsed = prev_state["last_playing_elapsed_time"]
    prev_item_id = prev_state["current_item_id"]
    current_item_id = queue.current_item.queue_item_id if queue.current_item else None
    if queue.state == PlaybackState.PLAYING:
        current_elapsed = int(queue.corrected_elapsed_time)
        if current_item_id != prev_item_id:
            # new track started, reset the elapsed time tracker
            last_playing_elapsed_time = current_elapsed
        else:
            # same track, use the max of current and previous to handle timing issues
            last_playing_elapsed_time = max(current_elapsed, prev_playing_elapsed)
    else:
        last_playing_elapsed_time = prev_playing_elapsed
    new_state = CompareState(
        queue_id=queue_id,
        state=queue.state,
        current_item_id=queue.current_item.queue_item_id if queue.current_item else None,
        next_item_id=queue.next_item.queue_item_id if queue.next_item else None,
        current_item=queue.current_item,
        elapsed_time=int(queue.elapsed_time),
        last_playing_elapsed_time=last_playing_elapsed_time,
        stream_title=(
            queue.current_item.streamdetails.stream_title
            if queue.current_item and queue.current_item.streamdetails
            else None
        ),
        codec_type=(
            queue.current_item.streamdetails.audio_format.codec_type
            if queue.current_item and queue.current_item.streamdetails
            else None
        ),
        output_formats=output_formats,
    )
    changed_keys = get_changed_keys(dict(prev_state), dict(new_state))
    # changes to these two keys alone never count as a queue change
    with suppress(KeyError):
        changed_keys.remove("next_item_id")
    with suppress(KeyError):
        changed_keys.remove("last_playing_elapsed_time")

    # store the new state
    if queue.active:
        self._prev_states[queue_id] = new_state
    else:
        self._prev_states.pop(queue_id, None)

    # return early if nothing changed
    if len(changed_keys) == 0:
        return

    # signal update and store state
    send_update = True
    if changed_keys == {"elapsed_time"}:
        # only elapsed time changed, do not send full queue update
        send_update = False
        prev_time = prev_state.get("elapsed_time") or 0
        cur_time = new_state.get("elapsed_time") or 0
        if abs(cur_time - prev_time) > 2:
            # send dedicated event for time updates when seeking
            self.mass.signal_event(
                EventType.QUEUE_TIME_UPDATED,
                object_id=queue_id,
                data=queue.elapsed_time,
            )
            # also signal update to the player itself so it can update its current_media
            self.mass.players.trigger_player_update(queue_id)

    if send_update:
        self.signal_update(queue_id)

    if "output_formats" in changed_keys:
        # refresh DSP details since they may have changed
        dsp = get_stream_dsp_details(self.mass, queue_id)
        if queue.current_item and queue.current_item.streamdetails:
            queue.current_item.streamdetails.dsp = dsp
        if queue.next_item and queue.next_item.streamdetails:
            queue.next_item.streamdetails.dsp = dsp

    # handle updating stream_metadata if needed
    # (callback present and never invoked yet, or the update interval has elapsed)
    if (
        queue.current_item
        and (streamdetails := queue.current_item.streamdetails)
        and streamdetails.stream_metadata_update_callback
        and (
            streamdetails.stream_metadata_last_updated is None
            or (
                time.time() - streamdetails.stream_metadata_last_updated
                >= streamdetails.stream_metadata_update_interval
            )
        )
    ):
        streamdetails.stream_metadata_last_updated = time.time()
        self.mass.create_task(
            streamdetails.stream_metadata_update_callback(
                streamdetails, int(queue.corrected_elapsed_time)
            )
        )

    # handle sending a playback progress report
    # we do this every 30 seconds or when the state changes
    if (
        changed_keys.intersection({"state", "current_item_id"})
        or int(queue.elapsed_time) % 30 == 0
    ):
        self._handle_playback_progress_report(queue, prev_state, new_state)

    # check if we need to clear the queue if we reached the end
    if "state" in changed_keys and queue.state == PlaybackState.IDLE:
        self._handle_end_of_queue(queue, prev_state, new_state)

    # watch dynamic radio items refill if needed
    if "current_item_id" in changed_keys:
        # auto enable radio mode if dont stop the music is enabled
        if (
            queue.dont_stop_the_music_enabled
            and queue.enqueued_media_items
            and queue.current_index is not None
            and (queue.items - queue.current_index) <= 1
        ):
            # We have received the last item in the queue and Don't stop the music is enabled
            # set the played media item(s) as radio items (which will refill the queue)
            # note that this will fail if there are no media items for which we have
            # a dynamic radio source.
            self.logger.debug(
                "End of queue detected and Don't stop the music is enabled for %s"
                " - setting enqueued media items as radio source: %s",
                queue.display_name,
                ", ".join([x.uri for x in queue.enqueued_media_items]),  # type: ignore[misc]  # uri set in __post_init__
            )
            queue.radio_source = queue.enqueued_media_items
        # auto fill radio tracks if less than 5 tracks left in the queue
        if (
            queue.radio_source
            and queue.current_index is not None
            and (queue.items - queue.current_index) < 5
        ):
            # the refill is scheduled via call_later (5) under a per-queue task id
            task_id = f"fill_radio_tracks_{queue_id}"
            self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id)
2348
2349 def _get_flow_queue_stream_index(
2350 self, queue: PlayerQueue, player: Player
2351 ) -> tuple[int | None, int]:
2352 """Calculate current queue index and current track elapsed time when flow mode is active."""
2353 elapsed_time_queue_total = player.state.corrected_elapsed_time or 0
2354 if queue.current_index is None and not queue.flow_mode_stream_log:
2355 return queue.current_index, int(queue.elapsed_time)
2356
2357 # For each track that has been streamed/buffered to the player,
2358 # a playlog entry will be created with the queue item id
2359 # and the amount of seconds streamed. We traverse the playlog to figure
2360 # out where we are in the queue, accounting for actual streamed
2361 # seconds (and not duration) and skipped seconds. If a track has been repeated,
2362 # it will simply be in the playlog multiple times.
2363 played_time = 0.0
2364 queue_index: int | None = queue.current_index or 0
2365 track_time = 0.0
2366 for play_log_entry in queue.flow_mode_stream_log:
2367 queue_item_duration = (
2368 # NOTE: 'seconds_streamed' can actually be 0 if there was a stream error!
2369 play_log_entry.seconds_streamed
2370 if play_log_entry.seconds_streamed is not None
2371 else play_log_entry.duration or 3600 * 24 * 7
2372 )
2373 if elapsed_time_queue_total > (queue_item_duration + played_time):
2374 # total elapsed time is more than (streamed) track duration
2375 # this track has been fully played, move on.
2376 played_time += queue_item_duration
2377 else:
2378 # no more seconds left to divide, this is our track
2379 # account for any seeking by adding the skipped/seeked seconds
2380 queue_index = self.index_by_id(queue.queue_id, play_log_entry.queue_item_id)
2381 queue_item = self.get_item(queue.queue_id, queue_index)
2382 if queue_item and queue_item.streamdetails:
2383 track_sec_skipped = queue_item.streamdetails.seek_position
2384 else:
2385 track_sec_skipped = 0
2386 track_time = elapsed_time_queue_total + track_sec_skipped - played_time
2387 break
2388 if player.state.playback_state != PlaybackState.PLAYING:
2389 # if the player is not playing, we can't be sure that the elapsed time is correct
2390 # so we just return the queue index and the elapsed time
2391 return queue.current_index, int(queue.elapsed_time)
2392 return queue_index, int(track_time)
2393
2394 def _parse_player_current_item_id(self, queue_id: str, player: Player) -> str | None:
2395 """Parse QueueItem ID from Player's current url."""
2396 protocol_player = player
2397 if player.active_output_protocol and player.active_output_protocol != "native":
2398 protocol_player = self.mass.players.get_player(player.active_output_protocol) or player
2399 if not protocol_player.current_media:
2400 # YES, we use player.current_media on purpose here because we need the raw metadata
2401 return None
2402 # prefer queue_id and queue_item_id within the current media
2403 if (
2404 protocol_player.current_media.source_id == queue_id
2405 and protocol_player.current_media.queue_item_id
2406 ):
2407 return protocol_player.current_media.queue_item_id
2408 # special case for sonos players
2409 if protocol_player.current_media.uri and protocol_player.current_media.uri.startswith(
2410 f"mass:{queue_id}"
2411 ):
2412 if protocol_player.current_media.queue_item_id:
2413 return protocol_player.current_media.queue_item_id
2414 return protocol_player.current_media.uri.split(":")[-1]
2415 # try to extract the item id from a mass stream url
2416 if (
2417 protocol_player.current_media.uri
2418 and queue_id in protocol_player.current_media.uri
2419 and self.mass.streams.base_url in protocol_player.current_media.uri
2420 ):
2421 current_item_id = protocol_player.current_media.uri.rsplit("/")[-1].split(".")[0]
2422 if self.get_item(queue_id, current_item_id):
2423 return current_item_id
2424 # try to extract the item id from a queue_id/item_id combi
2425 if (
2426 protocol_player.current_media.uri
2427 and queue_id in protocol_player.current_media.uri
2428 and "/" in protocol_player.current_media.uri
2429 ):
2430 current_item_id = protocol_player.current_media.uri.split("/")[1]
2431 if self.get_item(queue_id, current_item_id):
2432 return current_item_id
2433
2434 return None
2435
def _handle_end_of_queue(
    self, queue: PlayerQueue, prev_state: CompareState, new_state: CompareState
) -> None:
    """
    Schedule clearing of the queue when playback stopped at its last item.

    :param queue: The queue that changed state.
    :param prev_state: The compare state from before the change.
    :param new_state: The compare state after the change.
    """
    # only act on a transition from playing/paused to idle
    if (
        prev_state["state"] not in (PlaybackState.PLAYING, PlaybackState.PAUSED)
        or new_state["state"] != PlaybackState.IDLE
    ):
        return
    # there must be no more items in the queue (next_item should be None at end of queue)
    if queue.next_item is not None:
        return
    # and we must have had a previous item playing
    if prev_state["current_item_id"] is None:
        return

    async def _clear_queue_delayed() -> None:
        # re-check once per second for 5 seconds; abort as soon as the queue
        # is no longer idle or got a next item again
        for _ in range(5):
            await asyncio.sleep(1)
            if queue.state != PlaybackState.IDLE:
                return
            if queue.next_item is not None:
                return
        self.logger.info("End of queue reached, clearing items")
        self.clear(queue.queue_id)

    def _schedule_clear() -> None:
        self.mass.create_task(_clear_queue_delayed())

    # all checks passed, we stopped playback at the last (or single) track of the queue
    # now determine if the item was fully played before clearing

    # For flow mode, check if the last track was fully streamed using the stream log
    # This is more reliable than elapsed_time which can be reset/incorrect
    if queue.flow_mode and queue.flow_mode_stream_log:
        if queue.flow_mode_stream_log[-1].seconds_streamed is not None:
            # The last track finished streaming, safe to clear queue
            _schedule_clear()
            return

    # For non-flow mode, use prev_state values since queue state may have been updated/reset
    prev_item = prev_state["current_item"]
    if not prev_item:
        # No current item means player has already cleared it, safe to clear queue
        _schedule_clear()
        return
    streamdetails = prev_item.streamdetails
    # falls back to 24 hours when no duration is known, so this is never falsy
    duration = (
        (streamdetails.duration if streamdetails else None) or prev_item.duration or 24 * 3600
    )

    # use last_playing_elapsed_time which preserves the elapsed time from when the player
    # was still playing (before transitioning to idle where elapsed_time may be reset to 0)
    seconds_played = int(prev_state["last_playing_elapsed_time"])
    # debounce this a bit to make sure we're not clearing the queue by accident
    # only clear if the last track was played to near completion (within 5 seconds of end)
    if seconds_played >= duration - 5:
        _schedule_clear()
2493
def _handle_playback_progress_report(
    self, queue: PlayerQueue, prev_state: CompareState, new_state: CompareState
) -> None:
    """Report playback progress of a queue item to the playlog and to event listeners.

    When the current item id changed, the previous item is reported with the
    previous elapsed time; otherwise the current item is reported with the new
    elapsed time. Items without a media item, items whose stream details carry a
    stream error and items played for less than 5 seconds are not reported.

    :param queue: The queue the report is about.
    :param prev_state: State snapshot taken before the change.
    :param new_state: State snapshot taken after the change.
    """
    prev_item_id = prev_state["current_item_id"]
    cur_item_id = new_state["current_item_id"]
    if prev_item_id is None and cur_item_id is None:
        return

    # a changed item id means the previous item needs to be reported,
    # in every other case we report on the current item
    item_changed = prev_item_id is not None and prev_item_id != cur_item_id
    is_current_item = not item_changed
    if item_changed:
        item_to_report = prev_state["current_item"]
        source_state = prev_state
    else:
        item_to_report = self.get_item(queue.queue_id, cur_item_id) or new_state["current_item"]
        source_state = new_state
    seconds_played = int(source_state["elapsed_time"])

    # guard against invalid items and only report on media items
    if not item_to_report or not (media_item := item_to_report.media_item):
        return
    assert media_item.uri is not None  # uri is set in __post_init__

    details = item_to_report.streamdetails
    if details and details.stream_error:
        # items that had a stream error are ignored
        return

    # the duration from the stream details wins, 3 hours is the last resort
    duration = int((details.duration if details else None) or item_to_report.duration or 3 * 3600)

    if seconds_played < 5:
        # skip items that were played for less than 5 seconds, which also
        # filters out a bounce effect where the previous item gets reported
        # with 0 elapsed seconds after a new item starts playing
        return

    # work out whether the item counts as fully played
    percentage_played = percentage(seconds_played, duration)
    if is_current_item:
        fully_played = seconds_played >= duration - 10
    elif item_to_report.media_type in (MediaType.AUDIOBOOK, MediaType.PODCAST_EPISODE):
        # podcasts and audiobooks: take the last 60 seconds into account
        fully_played = seconds_played >= duration - 60
    else:
        # 90% of the track must be played to be considered fully played
        fully_played = percentage_played >= 90

    is_playing = is_current_item and queue.state == PlaybackState.PLAYING
    if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
        self.logger.debug(
            "%s %s '%s' (%s) - Fully played: %s - Progress: %s (%s/%ss)",
            queue.display_name,
            "is playing" if is_playing else "played",
            item_to_report.name,
            item_to_report.uri,
            fully_played,
            f"{percentage_played}%",
            seconds_played,
            duration,
        )

    # write the playlog entry - this also takes care of resume for podcasts/audiobooks
    playlog_update = self.mass.music.mark_item_played(
        media_item,
        fully_played=fully_played,
        seconds_played=seconds_played,
        is_playing=is_playing,
        userid=queue.userid,
        queue_id=queue.queue_id,
        user_initiated=False,
    )
    self.mass.create_task(playlog_update)

    # collect the details for the 'media item played' event,
    # which is useful for plugins that want to do scrobbling
    album: Album | ItemMapping | None = getattr(media_item, "album", None)
    artists: list[Artist | ItemMapping] = getattr(media_item, "artists", [])
    artists_names = [a.name for a in artists]
    if artists_names:
        primary_artist = getattr(media_item, "artist_str", None) or artists_names[0]
    else:
        primary_artist = None
    if isinstance(album, Album):
        album_artist = album.artist_str
        album_artist_mbids = [a.mbid for a in album.artists if a.mbid]
    else:
        album_artist = None
        album_artist_mbids = None
    image = media_item.image
    image_url = self.mass.metadata.get_image_url(image, prefer_proxy=False) if image else None

    report = MediaItemPlaybackProgressReport(
        uri=media_item.uri,
        media_type=media_item.media_type,
        name=media_item.name,
        version=getattr(media_item, "version", None),
        artist=primary_artist,
        artists=artists_names,
        artist_mbids=[a.mbid for a in artists if a.mbid] if artists else None,
        album=album.name if album else None,
        album_mbid=album.mbid if album else None,
        album_artist=album_artist,
        album_artist_mbids=album_artist_mbids,
        image_url=image_url,
        duration=duration,
        mbid=getattr(media_item, "mbid", None),
        seconds_played=seconds_played,
        fully_played=fully_played,
        is_playing=is_playing,
        userid=queue.userid,
    )
    self.mass.signal_event(EventType.MEDIA_ITEM_PLAYED, object_id=media_item.uri, data=report)
2619
2620
async def _smart_shuffle(items: list[QueueItem]) -> list[QueueItem]:
    """Shuffle queue items, avoiding identical tracks next to each other.

    Best-effort approach to prevent the same track from appearing adjacent.
    Does a random shuffle first, then makes a limited number of passes in which
    the second item of every adjacent same-name pair trades places with a random,
    differently named item that is not located next to the first item of the pair.

    Items are compared by their ``name`` attribute only. The given list is never
    modified and never returned as-is: the result is always a new list.

    :param items: List of queue items to shuffle.
    :return: New list holding the same items in shuffled order.
    """
    if len(items) < 2:
        # nothing to shuffle, but still hand out a copy so that the caller
        # never ends up with an alias of the list that was passed in
        return list(items)

    # Start with a random shuffle
    shuffled = random.sample(items, len(items))
    size = len(shuffled)
    if size == 2:
        return shuffled

    if len({item.name for item in shuffled}) < 2:
        # every item carries the same name: no swap could ever separate them
        return shuffled

    # Make a few passes to fix adjacent duplicates
    max_passes = 3
    for _ in range(max_passes):
        swapped = False
        for i in range(size - 1):
            name = shuffled[i].name
            if shuffled[i + 1].name != name:
                continue
            # shuffled[i + 1] duplicates shuffled[i]. Move it to a position that is
            # not adjacent to position i (so neither i - 1 nor i + 1) and let it trade
            # places with an item of a different name; a same-named partner would
            # leave the sequence of names unchanged.
            swap_candidates = [
                j
                for j, other in enumerate(shuffled)
                if abs(j - i) >= 2 and other.name != name
            ]
            if swap_candidates:
                swap_pos = random.choice(swap_candidates)
                shuffled[i + 1], shuffled[swap_pos] = shuffled[swap_pos], shuffled[i + 1]
                swapped = True
        if not swapped:
            break
        # Yield to event loop between passes
        await asyncio.sleep(0)

    return shuffled
2654