/
/
/
1"""
2MusicAssistant Player Queues Controller.
3
4Handles all logic to PLAY Media Items, provided by Music Providers to supported players.
5
6It is loosely coupled to the MusicAssistant Music Controller and Player Controller.
7A Music Assistant Player always has a PlayerQueue associated with it
8which holds the queue items and state.
9
10The PlayerQueue is in that case the active source of the player,
11but it can also be something else, hence the loose coupling.
12"""
13
14from __future__ import annotations
15
16import asyncio
17import random
18import time
19from contextlib import suppress
20from types import NoneType
21from typing import TYPE_CHECKING, Any, TypedDict, cast
22
23import shortuuid
24from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
25from music_assistant_models.enums import (
26 ConfigEntryType,
27 ContentType,
28 EventType,
29 MediaType,
30 PlaybackState,
31 ProviderFeature,
32 QueueOption,
33 RepeatMode,
34)
35from music_assistant_models.errors import (
36 AudioError,
37 InvalidCommand,
38 InvalidDataError,
39 MediaNotFoundError,
40 MusicAssistantError,
41 PlayerUnavailableError,
42 QueueEmpty,
43 UnsupportedFeaturedException,
44)
45from music_assistant_models.media_items import (
46 Album,
47 Artist,
48 Audiobook,
49 BrowseFolder,
50 ItemMapping,
51 MediaItemType,
52 PlayableMediaItemType,
53 Playlist,
54 Podcast,
55 PodcastEpisode,
56 Track,
57 UniqueList,
58 media_from_dict,
59)
60from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport
61from music_assistant_models.player_queue import PlayerQueue
62from music_assistant_models.queue_item import QueueItem
63
64from music_assistant.constants import (
65 ATTR_ANNOUNCEMENT_IN_PROGRESS,
66 MASS_LOGO_ONLINE,
67 PLAYLIST_MEDIA_TYPES,
68 VERBOSE_LOG_LEVEL,
69 PlaylistPlayableItem,
70)
71from music_assistant.controllers.players.controller import IN_QUEUE_COMMAND
72from music_assistant.controllers.webserver.helpers.auth_middleware import get_current_user
73from music_assistant.helpers.api import api_command
74from music_assistant.helpers.audio import get_stream_details, get_stream_dsp_details
75from music_assistant.helpers.throttle_retry import BYPASS_THROTTLER
76from music_assistant.helpers.util import get_changed_keys, percentage
77from music_assistant.models.core_controller import CoreController
78from music_assistant.models.player import Player, PlayerMedia
79
80if TYPE_CHECKING:
81 from collections.abc import Iterator
82
83 from music_assistant_models.auth import User
84 from music_assistant_models.media_items.metadata import MediaItemImage
85
86 from music_assistant import MusicAssistant
87 from music_assistant.models.player import Player
88
89CONF_DEFAULT_ENQUEUE_SELECT_ARTIST = "default_enqueue_select_artist"
90CONF_DEFAULT_ENQUEUE_SELECT_ALBUM = "default_enqueue_select_album"
91
92ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE = "all_tracks"
93ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE = "all_tracks"
94
95CONF_DEFAULT_ENQUEUE_OPTION_ARTIST = "default_enqueue_option_artist"
96CONF_DEFAULT_ENQUEUE_OPTION_ALBUM = "default_enqueue_option_album"
97CONF_DEFAULT_ENQUEUE_OPTION_TRACK = "default_enqueue_option_track"
98CONF_DEFAULT_ENQUEUE_OPTION_RADIO = "default_enqueue_option_radio"
99CONF_DEFAULT_ENQUEUE_OPTION_PLAYLIST = "default_enqueue_option_playlist"
100CONF_DEFAULT_ENQUEUE_OPTION_AUDIOBOOK = "default_enqueue_option_audiobook"
101CONF_DEFAULT_ENQUEUE_OPTION_PODCAST = "default_enqueue_option_podcast"
102CONF_DEFAULT_ENQUEUE_OPTION_PODCAST_EPISODE = "default_enqueue_option_podcast_episode"
103CONF_DEFAULT_ENQUEUE_OPTION_FOLDER = "default_enqueue_option_folder"
104CONF_DEFAULT_ENQUEUE_OPTION_UNKNOWN = "default_enqueue_option_unknown"
105RADIO_TRACK_MAX_DURATION_SECS = 20 * 60 # 20 minutes
106CACHE_CATEGORY_PLAYER_QUEUE_STATE = 0
107CACHE_CATEGORY_PLAYER_QUEUE_ITEMS = 1
108
109
110class CompareState(TypedDict):
111 """Simple object where we store the (previous) state of a queue.
112
113 Used for compare actions.
114 """
115
116 queue_id: str
117 state: PlaybackState
118 current_item_id: str | None
119 next_item_id: str | None
120 current_item: QueueItem | None
121 elapsed_time: int
122 # last_playing_elapsed_time: elapsed time from the last PLAYING state update
123 # used to determine if a track was fully played when transitioning to idle
124 last_playing_elapsed_time: int
125 stream_title: str | None
126 codec_type: ContentType | None
127 output_formats: list[str] | None
128
129
130class PlayerQueuesController(CoreController):
131 """Controller holding all logic to enqueue music for players."""
132
133 domain: str = "player_queues"
134
135 def __init__(self, mass: MusicAssistant) -> None:
136 """Initialize core controller."""
137 super().__init__(mass)
138 self._queues: dict[str, PlayerQueue] = {}
139 self._queue_items: dict[str, list[QueueItem]] = {}
140 self._prev_states: dict[str, CompareState] = {}
141 self._transitioning_players: set[str] = set()
142 self.manifest.name = "Player Queues controller"
143 self.manifest.description = (
144 "Music Assistant's core controller which manages the queues for all players."
145 )
146 self.manifest.icon = "playlist-music"
147
148 async def close(self) -> None:
149 """Cleanup on exit."""
150 # stop all playback
151 for queue in self.all():
152 if queue.state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
153 await self.stop(queue.queue_id)
154
155 async def get_config_entries(
156 self,
157 action: str | None = None,
158 values: dict[str, ConfigValueType] | None = None,
159 ) -> tuple[ConfigEntry, ...]:
160 """Return all Config Entries for this core module (if any)."""
161 enqueue_options = [ConfigValueOption(x.name, x.value) for x in QueueOption]
162 return (
163 ConfigEntry(
164 key=CONF_DEFAULT_ENQUEUE_SELECT_ARTIST,
165 type=ConfigEntryType.STRING,
166 default_value=ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE,
167 label="Items to select when you play a (in-library) artist.",
168 options=[
169 ConfigValueOption(
170 title="Only in-library tracks",
171 value="library_tracks",
172 ),
173 ConfigValueOption(
174 title="All tracks from all albums in the library",
175 value="library_album_tracks",
176 ),
177 ConfigValueOption(
178 title="All (top) tracks from (all) streaming provider(s)",
179 value="all_tracks",
180 ),
181 ConfigValueOption(
182 title="All tracks from all albums from (all) streaming provider(s)",
183 value="all_album_tracks",
184 ),
185 ],
186 ),
187 ConfigEntry(
188 key=CONF_DEFAULT_ENQUEUE_SELECT_ALBUM,
189 type=ConfigEntryType.STRING,
190 default_value=ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE,
191 label="Items to select when you play a (in-library) album.",
192 options=[
193 ConfigValueOption(
194 title="Only in-library tracks",
195 value="library_tracks",
196 ),
197 ConfigValueOption(
198 title="All tracks for album on (streaming) provider",
199 value="all_tracks",
200 ),
201 ],
202 ),
203 ConfigEntry(
204 key=CONF_DEFAULT_ENQUEUE_OPTION_ARTIST,
205 type=ConfigEntryType.STRING,
206 default_value=QueueOption.REPLACE.value,
207 label="Default enqueue option for Artist item(s).",
208 options=enqueue_options,
209 description="Define the default enqueue action for this mediatype.",
210 ),
211 ConfigEntry(
212 key=CONF_DEFAULT_ENQUEUE_OPTION_ALBUM,
213 type=ConfigEntryType.STRING,
214 default_value=QueueOption.REPLACE.value,
215 label="Default enqueue option for Album item(s).",
216 options=enqueue_options,
217 description="Define the default enqueue action for this mediatype.",
218 ),
219 ConfigEntry(
220 key=CONF_DEFAULT_ENQUEUE_OPTION_TRACK,
221 type=ConfigEntryType.STRING,
222 default_value=QueueOption.PLAY.value,
223 label="Default enqueue option for Track item(s).",
224 options=enqueue_options,
225 description="Define the default enqueue action for this mediatype.",
226 ),
227 ConfigEntry(
228 key=CONF_DEFAULT_ENQUEUE_OPTION_RADIO,
229 type=ConfigEntryType.STRING,
230 default_value=QueueOption.REPLACE.value,
231 label="Default enqueue option for Radio item(s).",
232 options=enqueue_options,
233 description="Define the default enqueue action for this mediatype.",
234 ),
235 ConfigEntry(
236 key=CONF_DEFAULT_ENQUEUE_OPTION_PLAYLIST,
237 type=ConfigEntryType.STRING,
238 default_value=QueueOption.REPLACE.value,
239 label="Default enqueue option for Playlist item(s).",
240 options=enqueue_options,
241 description="Define the default enqueue action for this mediatype.",
242 ),
243 ConfigEntry(
244 key=CONF_DEFAULT_ENQUEUE_OPTION_AUDIOBOOK,
245 type=ConfigEntryType.STRING,
246 default_value=QueueOption.REPLACE.value,
247 label="Default enqueue option for Audiobook item(s).",
248 options=enqueue_options,
249 hidden=True,
250 ),
251 ConfigEntry(
252 key=CONF_DEFAULT_ENQUEUE_OPTION_PODCAST,
253 type=ConfigEntryType.STRING,
254 default_value=QueueOption.REPLACE.value,
255 label="Default enqueue option for Podcast item(s).",
256 options=enqueue_options,
257 hidden=True,
258 ),
259 ConfigEntry(
260 key=CONF_DEFAULT_ENQUEUE_OPTION_PODCAST_EPISODE,
261 type=ConfigEntryType.STRING,
262 default_value=QueueOption.REPLACE.value,
263 label="Default enqueue option for Podcast-episode item(s).",
264 options=enqueue_options,
265 hidden=True,
266 ),
267 ConfigEntry(
268 key=CONF_DEFAULT_ENQUEUE_OPTION_FOLDER,
269 type=ConfigEntryType.STRING,
270 default_value=QueueOption.REPLACE.value,
271 label="Default enqueue option for Folder item(s).",
272 options=enqueue_options,
273 hidden=True,
274 ),
275 )
276
277 def __iter__(self) -> Iterator[PlayerQueue]:
278 """Iterate over (available) players."""
279 return iter(self._queues.values())
280
281 @api_command("player_queues/all")
282 def all(self) -> tuple[PlayerQueue, ...]:
283 """Return all registered PlayerQueues."""
284 return tuple(self._queues.values())
285
286 @api_command("player_queues/get")
287 def get(self, queue_id: str) -> PlayerQueue | None:
288 """Return PlayerQueue by queue_id or None if not found."""
289 return self._queues.get(queue_id)
290
291 @api_command("player_queues/items")
292 def items(self, queue_id: str, limit: int = 500, offset: int = 0) -> list[QueueItem]:
293 """Return all QueueItems for given PlayerQueue."""
294 if queue_id not in self._queue_items:
295 return []
296
297 return self._queue_items[queue_id][offset : offset + limit]
298
299 @api_command("player_queues/get_active_queue")
300 def get_active_queue(self, player_id: str) -> PlayerQueue | None:
301 """Return the current active/synced queue for a player."""
302 if player := self.mass.players.get_player(player_id):
303 return self.mass.players.get_active_queue(player)
304 return None
305
306 # Queue commands
307
308 @api_command("player_queues/shuffle")
309 async def set_shuffle(self, queue_id: str, shuffle_enabled: bool) -> None:
310 """Configure shuffle setting on the the queue."""
311 queue = self._queues[queue_id]
312 if queue.shuffle_enabled == shuffle_enabled:
313 return # no change
314 queue.shuffle_enabled = shuffle_enabled
315 queue_items = self._queue_items[queue_id]
316 cur_index = (
317 queue.index_in_buffer if queue.index_in_buffer is not None else queue.current_index
318 )
319 if cur_index is not None:
320 next_index = cur_index + 1
321 next_items = queue_items[next_index:]
322 else:
323 next_items = []
324 next_index = 0
325 if not shuffle_enabled:
326 # shuffle disabled, try to restore original sort order of the remaining items
327 next_items.sort(key=lambda x: x.sort_index, reverse=False)
328 await self.load(
329 queue_id=queue_id,
330 queue_items=next_items,
331 insert_at_index=next_index,
332 keep_remaining=False,
333 shuffle=shuffle_enabled,
334 )
335
336 @api_command("player_queues/dont_stop_the_music")
337 def set_dont_stop_the_music(self, queue_id: str, dont_stop_the_music_enabled: bool) -> None:
338 """Configure Don't stop the music setting on the queue."""
339 providers_available_with_similar_tracks = any(
340 ProviderFeature.SIMILAR_TRACKS in provider.supported_features
341 for provider in self.mass.music.providers
342 )
343 if dont_stop_the_music_enabled and not providers_available_with_similar_tracks:
344 raise UnsupportedFeaturedException(
345 "Don't stop the music is not supported by any of the available music providers"
346 )
347 queue = self._queues[queue_id]
348 queue.dont_stop_the_music_enabled = dont_stop_the_music_enabled
349 self.signal_update(queue_id=queue_id)
350 # if this happens to be the last track in the queue, fill the radio source
351 if (
352 queue.dont_stop_the_music_enabled
353 and queue.enqueued_media_items
354 and queue.current_index is not None
355 and (queue.items - queue.current_index) <= 1
356 ):
357 queue.radio_source = queue.enqueued_media_items
358 task_id = f"fill_radio_tracks_{queue_id}"
359 self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id)
360
361 @api_command("player_queues/repeat")
362 def set_repeat(self, queue_id: str, repeat_mode: RepeatMode) -> None:
363 """Configure repeat setting on the the queue."""
364 queue = self._queues[queue_id]
365 if queue.repeat_mode == repeat_mode:
366 return # no change
367 queue.repeat_mode = repeat_mode
368 self.signal_update(queue_id)
369 if (
370 queue.state == PlaybackState.PLAYING
371 and queue.index_in_buffer is not None
372 and queue.index_in_buffer == queue.current_index
373 ):
374 # if the queue is playing,
375 # ensure to (re)queue the next track because it might have changed
376 # note that we only do this if the player has loaded the current track
377 # if not, we wait until it has loaded to prevent conflicts
378 if next_item := self.get_next_item(queue_id, queue.index_in_buffer):
379 self._enqueue_next_item(queue_id, next_item)
380
381 @api_command("player_queues/play_media")
382 async def play_media(
383 self,
384 queue_id: str,
385 media: MediaItemType | ItemMapping | str | list[MediaItemType | ItemMapping | str],
386 option: QueueOption | None = None,
387 radio_mode: bool = False,
388 start_item: PlayableMediaItemType | str | None = None,
389 username: str | None = None,
390 ) -> None:
391 """Play media item(s) on the given queue.
392
393 :param queue_id: The queue_id of the queue to play media on.
394 :param media: Media that should be played (MediaItem(s) and/or uri's).
395 :param option: Which enqueue mode to use.
396 :param radio_mode: Enable radio mode for the given item(s).
397 :param start_item: Optional item to start the playlist or album from.
398 :param username: The username of the user requesting the playback.
399 Setting the username allows for overriding the logged-in user
400 to account for playback history per user when the play_media is
401 called from a shared context (like a web hook or automation).
402 """
403 # ruff: noqa: PLR0915
404 # we use a contextvar to bypass the throttler for this asyncio task/context
405 # this makes sure that playback has priority over other requests that may be
406 # happening in the background
407 BYPASS_THROTTLER.set(True)
408 if not (queue := self.get(queue_id)):
409 raise PlayerUnavailableError(f"Queue {queue_id} is not available")
410 # always fetch the underlying player so we can raise early if its not available
411 queue_player = self.mass.players.get_player(queue_id, True)
412 assert queue_player is not None # for type checking
413 if queue_player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
414 self.logger.warning("Ignore queue command: An announcement is in progress")
415 return
416
417 # save the user requesting the playback
418 playback_user: User | None
419 if username and (user := await self.mass.webserver.auth.get_user_by_username(username)):
420 playback_user = user
421 else:
422 playback_user = get_current_user()
423 queue.userid = playback_user.user_id if playback_user else None
424
425 # a single item or list of items may be provided
426 media_list = media if isinstance(media, list) else [media]
427
428 # clear queue if needed
429 if option == QueueOption.REPLACE:
430 self.clear(queue_id)
431 # Clear the 'enqueued media item' list when a new queue is requested
432 if option not in (QueueOption.ADD, QueueOption.NEXT):
433 queue.enqueued_media_items.clear()
434
435 media_items: list[MediaItemType] = []
436 radio_source: list[MediaItemType] = []
437 # resolve all media items
438 for item in media_list:
439 try:
440 # parse provided uri into a MA MediaItem or Basic QueueItem from URL
441 media_item: MediaItemType | ItemMapping | BrowseFolder
442 if isinstance(item, str):
443 media_item = await self.mass.music.get_item_by_uri(item)
444 elif isinstance(item, dict): # type: ignore[unreachable]
445 # TODO: Investigate why the API parser sometimes passes raw dicts instead of
446 # converting them to MediaItem objects. The parse_value function in api.py
447 # should handle dict-to-object conversion, but dicts are slipping through
448 # in some cases. This is defensive handling for that parser bug.
449 media_item = media_from_dict(item) # type: ignore[unreachable]
450 self.logger.debug("Converted to: %s", type(media_item))
451 else:
452 # item is MediaItemType | ItemMapping at this point
453 media_item = item
454
455 # Save requested media item to play on the queue so we can use it as a source
456 # for Don't stop the music. Use FIFO list to keep track of the last 10 played items
457 # Skip ItemMapping and BrowseFolder - only queue full MediaItemType objects
458 if not isinstance(
459 media_item, (ItemMapping, BrowseFolder)
460 ) and media_item.media_type in (
461 MediaType.TRACK,
462 MediaType.ALBUM,
463 MediaType.PLAYLIST,
464 MediaType.ARTIST,
465 ):
466 queue.enqueued_media_items.append(media_item)
467 if len(queue.enqueued_media_items) > 10:
468 queue.enqueued_media_items.pop(0)
469
470 # handle default enqueue option if needed
471 if option is None:
472 config_value = await self.mass.config.get_core_config_value(
473 self.domain,
474 f"default_enqueue_option_{media_item.media_type.value}",
475 return_type=str,
476 )
477 option = QueueOption(config_value)
478 if option == QueueOption.REPLACE:
479 self.clear(queue_id, skip_stop=True)
480
481 # collect media_items to play
482 if radio_mode:
483 # Type guard for mypy - only add full MediaItemType to radio_source
484 if not isinstance(media_item, (ItemMapping, BrowseFolder)):
485 radio_source.append(media_item)
486 else:
487 # Convert start_item to string URI if needed
488 start_item_uri: str | None = None
489 if isinstance(start_item, str):
490 start_item_uri = start_item
491 elif start_item is not None:
492 start_item_uri = start_item.uri
493 media_items += await self._resolve_media_items(
494 media_item, start_item_uri, queue_id=queue_id
495 )
496
497 except MusicAssistantError as err:
498 # invalid MA uri or item not found error
499 self.logger.warning("Skipping %s: %s", item, str(err))
500
501 # overwrite or append radio source items
502 if option not in (QueueOption.ADD, QueueOption.NEXT):
503 queue.radio_source = radio_source
504 else:
505 queue.radio_source += radio_source
506 # Use collected media items to calculate the radio if radio mode is on
507 if radio_mode:
508 radio_tracks = await self._get_radio_tracks(
509 queue_id=queue_id, is_initial_radio_mode=True
510 )
511 media_items = list(radio_tracks)
512
513 # only add valid/available items
514 queue_items: list[QueueItem] = []
515 for x in media_items:
516 if not x or not x.available:
517 continue
518 queue_items.append(
519 QueueItem.from_media_item(queue_id, cast("PlayableMediaItemType", x))
520 )
521
522 if not queue_items:
523 raise MediaNotFoundError("No playable items found")
524
525 # load the items into the queue
526 if queue.state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
527 cur_index = (
528 queue.index_in_buffer
529 if queue.index_in_buffer is not None
530 else (queue.current_index if queue.current_index is not None else 0)
531 )
532 else:
533 cur_index = queue.current_index or 0
534 insert_at_index = cur_index + 1
535 # Radio modes are already shuffled in a pattern we would like to keep.
536 shuffle = queue.shuffle_enabled and len(queue_items) > 1 and not radio_mode
537
538 # handle replace: clear all items and replace with the new items
539 if option == QueueOption.REPLACE:
540 await self.load(
541 queue_id,
542 queue_items=queue_items,
543 keep_remaining=False,
544 keep_played=False,
545 shuffle=shuffle,
546 )
547 await self.play_index(queue_id, 0)
548 return
549 # handle next: add item(s) in the index next to the playing/loaded/buffered index
550 if option == QueueOption.NEXT:
551 await self.load(
552 queue_id,
553 queue_items=queue_items,
554 insert_at_index=insert_at_index,
555 shuffle=shuffle,
556 )
557 return
558 if option == QueueOption.REPLACE_NEXT:
559 await self.load(
560 queue_id,
561 queue_items=queue_items,
562 insert_at_index=insert_at_index,
563 keep_remaining=False,
564 shuffle=shuffle,
565 )
566 return
567 # handle play: replace current loaded/playing index with new item(s)
568 if option == QueueOption.PLAY:
569 await self.load(
570 queue_id,
571 queue_items=queue_items,
572 insert_at_index=insert_at_index,
573 shuffle=shuffle,
574 )
575 next_index = min(insert_at_index, len(self._queue_items[queue_id]) - 1)
576 await self.play_index(queue_id, next_index)
577 return
578 # handle add: add/append item(s) to the remaining queue items
579 if option == QueueOption.ADD:
580 await self.load(
581 queue_id=queue_id,
582 queue_items=queue_items,
583 insert_at_index=insert_at_index
584 if queue.shuffle_enabled
585 else len(self._queue_items[queue_id]) + 1,
586 shuffle=queue.shuffle_enabled,
587 )
588 # handle edgecase, queue is empty and items are only added (not played)
589 # mark first item as new index
590 if queue.current_index is None:
591 queue.current_index = 0
592 queue.current_item = self.get_item(queue_id, 0)
593 queue.items = len(queue_items)
594 self.signal_update(queue_id)
595
596 @api_command("player_queues/move_item")
597 def move_item(self, queue_id: str, queue_item_id: str, pos_shift: int = 1) -> None:
598 """
599 Move queue item x up/down the queue.
600
601 - queue_id: id of the queue to process this request.
602 - queue_item_id: the item_id of the queueitem that needs to be moved.
603 - pos_shift: move item x positions down if positive value
604 - pos_shift: move item x positions up if negative value
605 - pos_shift: move item to top of queue as next item if 0.
606 """
607 queue = self._queues[queue_id]
608 item_index = self.index_by_id(queue_id, queue_item_id)
609 if item_index is None:
610 raise InvalidDataError(f"Item {queue_item_id} not found in queue")
611 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
612 msg = f"{item_index} is already played/buffered"
613 raise IndexError(msg)
614
615 queue_items = self._queue_items[queue_id]
616 queue_items = queue_items.copy()
617
618 if pos_shift == 0 and queue.state == PlaybackState.PLAYING:
619 new_index = (queue.current_index or 0) + 1
620 elif pos_shift == 0:
621 new_index = queue.current_index or 0
622 else:
623 new_index = item_index + pos_shift
624 if (new_index < (queue.current_index or 0)) or (new_index > len(queue_items)):
625 return
626 # move the item in the list
627 queue_items.insert(new_index, queue_items.pop(item_index))
628 self.update_items(queue_id, queue_items)
629
630 @api_command("player_queues/move_item_end")
631 def move_item_end(self, queue_id: str, queue_item_id: str) -> None:
632 """
633 Move queue item to the end the queue.
634
635 - queue_id: id of the queue to process this request.
636 - queue_item_id: the item_id of the queueitem that needs to be moved.
637 """
638 queue = self._queues[queue_id]
639 item_index = self.index_by_id(queue_id, queue_item_id)
640 if item_index is None:
641 raise InvalidDataError(f"Item {queue_item_id} not found in queue")
642 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
643 msg = f"{item_index} is already played/buffered"
644 raise IndexError(msg)
645
646 queue_items = self._queue_items[queue_id]
647 if item_index == (len(queue_items) - 1):
648 return
649 queue_items = queue_items.copy()
650
651 new_index = len(self._queue_items[queue_id]) - 1
652
653 # move the item in the list
654 queue_items.insert(new_index, queue_items.pop(item_index))
655 self.update_items(queue_id, queue_items)
656
657 @api_command("player_queues/delete_item")
658 def delete_item(self, queue_id: str, item_id_or_index: int | str) -> None:
659 """Delete item (by id or index) from the queue."""
660 if isinstance(item_id_or_index, str):
661 item_index = self.index_by_id(queue_id, item_id_or_index)
662 if item_index is None:
663 raise InvalidDataError(f"Item {item_id_or_index} not found in queue")
664 else:
665 item_index = item_id_or_index
666 queue = self._queues[queue_id]
667 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
668 # ignore request if track already loaded in the buffer
669 # the frontend should guard so this is just in case
670 self.logger.warning("delete requested for item already loaded in buffer")
671 return
672 queue_items = self._queue_items[queue_id]
673 queue_items.pop(item_index)
674 self.update_items(queue_id, queue_items)
675
676 @api_command("player_queues/clear")
677 def clear(self, queue_id: str, skip_stop: bool = False) -> None:
678 """Clear all items in the queue."""
679 queue = self._queues[queue_id]
680 queue.radio_source = []
681 if queue.state != PlaybackState.IDLE and not skip_stop:
682 self.mass.create_task(self.stop(queue_id))
683 queue.current_index = None
684 queue.current_item = None
685 queue.elapsed_time = 0
686 queue.elapsed_time_last_updated = time.time()
687 queue.index_in_buffer = None
688 self.update_items(queue_id, [])
689
690 @api_command("player_queues/save_as_playlist")
691 async def save_as_playlist(self, queue_id: str, name: str) -> Playlist:
692 """Save the current queue items as a new playlist.
693
694 :param queue_id: The queue_id of the queue to save.
695 :param name: The name for the new playlist.
696 """
697 if not self.get(queue_id):
698 raise PlayerUnavailableError(f"Queue {queue_id} is not available")
699 queue_items = self._queue_items.get(queue_id, [])
700 if not queue_items:
701 raise QueueEmpty("Cannot save an empty queue as a playlist.")
702 # collect URIs from queue items that are playlist-compatible
703 uris: list[str] = []
704 for item in queue_items:
705 if item.uri and item.media_type in PLAYLIST_MEDIA_TYPES:
706 uris.append(item.uri)
707 if not uris:
708 raise InvalidDataError("No valid items in queue to save as playlist.")
709 playlist = await self.mass.music.playlists.create_playlist(name)
710 await self.mass.music.playlists.add_playlist_tracks(playlist.item_id, uris)
711 return playlist
712
713 @api_command("player_queues/stop")
714 async def stop(self, queue_id: str) -> None:
715 """
716 Handle STOP command for given queue.
717
718 - queue_id: queue_id of the playerqueue to handle the command.
719 """
720 queue_player = self.mass.players.get_player(queue_id, True)
721 if queue_player is None:
722 raise PlayerUnavailableError(f"Player {queue_id} is not available")
723 if (queue := self.get(queue_id)) and queue.active:
724 if queue.state == PlaybackState.PLAYING:
725 queue.resume_pos = int(queue.corrected_elapsed_time)
726 # Set context to prevent circular call, then forward the actual command to the player
727 token = IN_QUEUE_COMMAND.set(True)
728 try:
729 await self.mass.players.cmd_stop(queue_id)
730 finally:
731 IN_QUEUE_COMMAND.reset(token)
732
733 @api_command("player_queues/play")
734 async def play(self, queue_id: str) -> None:
735 """
736 Handle PLAY command for given queue.
737
738 - queue_id: queue_id of the playerqueue to handle the command.
739 """
740 queue_player = self.mass.players.get_player(queue_id, True)
741 if queue_player is None:
742 raise PlayerUnavailableError(f"Player {queue_id} is not available")
743 if (
744 (queue := self._queues.get(queue_id))
745 and queue.active
746 and queue.state == PlaybackState.PAUSED
747 ):
748 # forward the actual play/unpause command to the player
749 await queue_player.play()
750 return
751 # player is not paused, perform resume instead
752 await self.resume(queue_id)
753
754 @api_command("player_queues/pause")
755 async def pause(self, queue_id: str) -> None:
756 """Handle PAUSE command for given queue.
757
758 - queue_id: queue_id of the playerqueue to handle the command.
759 """
760 if not (queue := self._queues.get(queue_id)):
761 return
762 queue_active = queue.active
763 if queue.active and queue.state == PlaybackState.PLAYING:
764 queue.resume_pos = int(queue.corrected_elapsed_time)
765 # forward the actual command to the player controller
766 # Set context to prevent circular call, then forward the actual command to the player
767 token = IN_QUEUE_COMMAND.set(True)
768 try:
769 await self.mass.players.cmd_pause(queue_id)
770 finally:
771 IN_QUEUE_COMMAND.reset(token)
772
773 async def _watch_pause(player: Player) -> None:
774 count = 0
775 # wait for pause
776 while count < 5 and player.state.playback_state == PlaybackState.PLAYING:
777 count += 1
778 await asyncio.sleep(1)
779 # wait for unpause
780 if player.state.playback_state != PlaybackState.PAUSED:
781 return
782 count = 0
783 while count < 30 and player.state.playback_state == PlaybackState.PAUSED:
784 count += 1
785 await asyncio.sleep(1)
786 # if player is still paused when the limit is reached, send stop
787 if player.state.playback_state == PlaybackState.PAUSED:
788 await self.stop(queue_id)
789
790 # we auto stop a player from paused when its paused for 30 seconds
791 if (
792 queue_active
793 and (queue_player := self.mass.players.get_player(queue_id))
794 and not queue_player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS)
795 ):
796 self.mass.create_task(_watch_pause(queue_player))
797
798 @api_command("player_queues/play_pause")
799 async def play_pause(self, queue_id: str) -> None:
800 """Toggle play/pause on given playerqueue.
801
802 - queue_id: queue_id of the queue to handle the command.
803 """
804 if (queue := self._queues.get(queue_id)) and queue.state == PlaybackState.PLAYING:
805 await self.pause(queue_id)
806 return
807 await self.play(queue_id)
808
809 @api_command("player_queues/next")
810 async def next(self, queue_id: str) -> None:
811 """Handle NEXT TRACK command for given queue.
812
813 - queue_id: queue_id of the queue to handle the command.
814 """
815 if (queue := self.get(queue_id)) is None or not queue.active:
816 raise InvalidCommand(f"Queue {queue_id} is not active")
817 idx = self._queues[queue_id].current_index
818 if idx is None:
819 self.logger.warning("Queue %s has no current index", queue.display_name)
820 return
821 attempts = 5
822 while attempts:
823 try:
824 if (next_index := self._get_next_index(queue_id, idx, True)) is not None:
825 await self.play_index(queue_id, next_index, debounce=True)
826 break
827 except MediaNotFoundError:
828 self.logger.warning(
829 "Failed to fetch next track for queue %s - trying next item",
830 queue.display_name,
831 )
832 idx += 1
833 attempts -= 1
834
835 @api_command("player_queues/previous")
836 async def previous(self, queue_id: str) -> None:
837 """Handle PREVIOUS TRACK command for given queue.
838
839 - queue_id: queue_id of the queue to handle the command.
840 """
841 if (queue := self.get(queue_id)) is None or not queue.active:
842 raise InvalidCommand(f"Queue {queue_id} is not active")
843 current_index = self._queues[queue_id].current_index
844 if current_index is None:
845 return
846 next_index = int(current_index)
847 # restart current track if current track has played longer than 4
848 # otherwise skip to previous track
849 if self._queues[queue_id].elapsed_time < 5:
850 next_index = max(current_index - 1, 0)
851 await self.play_index(queue_id, next_index, debounce=True)
852
853 @api_command("player_queues/skip")
854 async def skip(self, queue_id: str, seconds: int = 10) -> None:
855 """Handle SKIP command for given queue.
856
857 - queue_id: queue_id of the queue to handle the command.
858 - seconds: number of seconds to skip in track. Use negative value to skip back.
859 """
860 if (queue := self.get(queue_id)) is None or not queue.active:
861 raise InvalidCommand(f"Queue {queue_id} is not active")
862 await self.seek(queue_id, int(self._queues[queue_id].elapsed_time + seconds))
863
864 @api_command("player_queues/seek")
865 async def seek(self, queue_id: str, position: int = 10) -> None:
866 """Handle SEEK command for given queue.
867
868 - queue_id: queue_id of the queue to handle the command.
869 - position: position in seconds to seek to in the current playing item.
870 """
871 if (queue := self.get(queue_id)) is None or not queue.active:
872 raise InvalidCommand(f"Queue {queue_id} is not active")
873 queue_player = self.mass.players.get_player(queue_id, True)
874 if queue_player is None:
875 raise PlayerUnavailableError(f"Player {queue_id} is not available")
876 if not queue.current_item:
877 raise InvalidCommand(f"Queue {queue_player.state.name} has no item(s) loaded.")
878 if not queue.current_item.duration:
879 raise InvalidCommand("Can not seek items without duration.")
880 position = max(0, int(position))
881 if position > queue.current_item.duration:
882 raise InvalidCommand("Can not seek outside of duration range.")
883 if queue.current_index is None:
884 raise InvalidCommand(f"Queue {queue_player.state.name} has no current index.")
885 await self.play_index(queue_id, queue.current_index, seek_position=position)
886
887 @api_command("player_queues/resume")
888 async def resume(self, queue_id: str, fade_in: bool | None = None) -> None:
889 """Handle RESUME command for given queue.
890
891 - queue_id: queue_id of the queue to handle the command.
892 """
893 queue = self._queues[queue_id]
894 queue_items = self._queue_items[queue_id]
895 resume_item = queue.current_item
896 if queue.state == PlaybackState.PLAYING:
897 # resume requested while already playing,
898 # use current position as resume position
899 resume_pos = queue.corrected_elapsed_time
900 fade_in = False
901 else:
902 resume_pos = queue.resume_pos or queue.elapsed_time
903
904 if not resume_item and queue.current_index is not None and len(queue_items) > 0:
905 resume_item = self.get_item(queue_id, queue.current_index)
906 resume_pos = 0
907 elif not resume_item and queue.current_index is None and len(queue_items) > 0:
908 # items available in queue but no previous track, start at 0
909 resume_item = self.get_item(queue_id, 0)
910 resume_pos = 0
911
912 if resume_item is not None:
913 queue_player = self.mass.players.get_player(queue_id)
914 if queue_player is None:
915 raise PlayerUnavailableError(f"Player {queue_id} is not available")
916 if (
917 fade_in is None
918 and queue_player.state.playback_state == PlaybackState.IDLE
919 and (time.time() - queue.elapsed_time_last_updated) > 60
920 ):
921 # enable fade in effect if the player is idle for a while
922 fade_in = resume_pos > 0
923 if resume_item.media_type == MediaType.RADIO:
924 # we're not able to skip in online radio so this is pointless
925 resume_pos = 0
926 await self.play_index(
927 queue_id, resume_item.queue_item_id, int(resume_pos), fade_in or False
928 )
929 else:
930 # Queue is empty, try to resume from playlog
931 if await self._try_resume_from_playlog(queue):
932 return
933 msg = f"Resume queue requested but queue {queue.display_name} is empty"
934 raise QueueEmpty(msg)
935
936 @api_command("player_queues/play_index")
937 async def play_index(
938 self,
939 queue_id: str,
940 index: int | str,
941 seek_position: int = 0,
942 fade_in: bool = False,
943 debounce: bool = False,
944 ) -> None:
945 """Play item at index (or item_id) X in queue."""
946 queue = self._queues[queue_id]
947 queue.resume_pos = 0
948 if isinstance(index, str):
949 temp_index = self.index_by_id(queue_id, index)
950 if temp_index is None:
951 raise InvalidDataError(f"Item {index} not found in queue")
952 index = temp_index
953 # At this point index is guaranteed to be int
954 queue.current_index = index
955 # update current item and elapsed time and signal update
956 # this way the UI knows immediately that a new item is loading
957 queue.current_item = self.get_item(queue_id, index)
958 queue.elapsed_time = seek_position
959 queue.elapsed_time_last_updated = time.time()
960 self.signal_update(queue_id)
961 queue.index_in_buffer = index
962 queue.flow_mode_stream_log = []
963 target_player = self.mass.players.get_player(queue_id)
964 if target_player is None:
965 raise PlayerUnavailableError(f"Player {queue_id} is not available")
966 queue.next_item_id_enqueued = None
967 # always update session id when we start a new playback session
968 queue.session_id = shortuuid.random(length=8)
969 # handle resume point of audiobook(chapter) or podcast(episode)
970 if (
971 not seek_position
972 and (queue_item := self.get_item(queue_id, index))
973 and (resume_position_ms := getattr(queue_item.media_item, "resume_position_ms", 0))
974 ):
975 seek_position = max(0, int((resume_position_ms - 500) / 1000))
976
977 # send play_media request to player
978 # NOTE that we debounce this a bit to account for someone hitting the next button
979 # like a madman. This will prevent the player from being overloaded with requests.
980 async def _play_index(index: int, debounce: bool) -> None:
981 for attempt in range(5):
982 try:
983 queue_item = self.get_item(queue_id, index)
984 if not queue_item:
985 continue # guard
986 await self._load_item(
987 queue_item,
988 self._get_next_index(queue_id, index),
989 is_start=True,
990 seek_position=seek_position if attempt == 0 else 0,
991 fade_in=fade_in if attempt == 0 else False,
992 )
993 # if we reach this point, loading the item succeeded, break the loop
994 queue.current_index = index
995 queue.current_item = queue_item
996 break
997 except (MediaNotFoundError, AudioError):
998 # the requested index can not be played.
999 if queue_item:
1000 self.logger.warning(
1001 "Skipping unplayable item %s (%s)",
1002 queue_item.name,
1003 queue_item.uri,
1004 )
1005 queue_item.available = False
1006 next_index = self._get_next_index(queue_id, index, allow_repeat=False)
1007 if next_index is None:
1008 raise MediaNotFoundError("No next item available")
1009 index = next_index
1010 else:
1011 # all attempts to find a playable item failed
1012 raise MediaNotFoundError("No playable item found to start playback")
1013
1014 # work out if we need to use flow mode
1015 flow_mode = target_player.flow_mode and queue_item.media_type not in (
1016 # don't use flow mode for duration-less streams
1017 MediaType.RADIO,
1018 MediaType.PLUGIN_SOURCE,
1019 )
1020 await asyncio.sleep(0.5 if debounce else 0.1)
1021 queue.flow_mode = flow_mode
1022 await self.mass.players.play_media(
1023 player_id=queue_id,
1024 media=await self.player_media_from_queue_item(queue_item, flow_mode),
1025 )
1026 queue.current_index = index
1027 queue.current_item = queue_item
1028 await asyncio.sleep(2)
1029 self._transitioning_players.discard(queue_id)
1030
1031 # we set a flag to notify the update logic that we're transitioning to a new track
1032 self._transitioning_players.add(queue_id)
1033
1034 # we debounce the play_index command to handle the case where someone
1035 # is spamming next/previous on the player
1036 task_id = f"play_index_{queue_id}"
1037 if existing_task := self.mass.get_task(task_id):
1038 existing_task.cancel()
1039 with suppress(asyncio.CancelledError):
1040 await existing_task
1041 task = self.mass.create_task(
1042 _play_index,
1043 index,
1044 debounce,
1045 task_id=task_id,
1046 )
1047 await task
1048 self.signal_update(queue_id)
1049
1050 @api_command("player_queues/transfer")
1051 async def transfer_queue(
1052 self,
1053 source_queue_id: str,
1054 target_queue_id: str,
1055 auto_play: bool | None = None,
1056 ) -> None:
1057 """Transfer queue to another queue."""
1058 if not (source_queue := self.get(source_queue_id)):
1059 raise PlayerUnavailableError(f"Queue {source_queue_id} is not available")
1060 if not (target_queue := self.get(target_queue_id)):
1061 raise PlayerUnavailableError(f"Queue {target_queue_id} is not available")
1062 if auto_play is None:
1063 auto_play = source_queue.state == PlaybackState.PLAYING
1064
1065 target_player = self.mass.players.get_player(target_queue_id)
1066 if target_player is None:
1067 raise PlayerUnavailableError(f"Player {target_queue_id} is not available")
1068 if target_player.state.active_group or target_player.state.synced_to:
1069 # edge case: the user wants to move playback from the group as a whole, to a single
1070 # player in the group or it is grouped and the command targeted at the single player.
1071 # We need to dissolve the group first.
1072 group_id = target_player.state.active_group or target_player.state.synced_to
1073 assert group_id is not None # checked in if condition above
1074 await self.mass.players.cmd_ungroup(group_id)
1075 await asyncio.sleep(3)
1076
1077 source_items = self._queue_items[source_queue_id]
1078 target_queue.repeat_mode = source_queue.repeat_mode
1079 target_queue.shuffle_enabled = source_queue.shuffle_enabled
1080 target_queue.dont_stop_the_music_enabled = source_queue.dont_stop_the_music_enabled
1081 target_queue.radio_source = source_queue.radio_source
1082 target_queue.enqueued_media_items = source_queue.enqueued_media_items
1083 target_queue.resume_pos = int(source_queue.elapsed_time)
1084 target_queue.current_index = source_queue.current_index
1085 if source_queue.current_item:
1086 target_queue.current_item = source_queue.current_item
1087 target_queue.current_item.queue_id = target_queue_id
1088 self.clear(source_queue_id)
1089
1090 await self.load(target_queue_id, source_items, keep_remaining=False, keep_played=False)
1091 for item in source_items:
1092 item.queue_id = target_queue_id
1093 self.update_items(target_queue_id, source_items)
1094 if auto_play:
1095 await self.resume(target_queue_id)
1096
1097 # Interaction with player
1098
1099 async def on_player_register(self, player: Player) -> None:
1100 """Register PlayerQueue for given player/queue id."""
1101 queue_id = player.player_id
1102 queue: PlayerQueue | None = None
1103 queue_items: list[QueueItem] = []
1104 # try to restore previous state
1105 if prev_state := await self.mass.cache.get(
1106 key=queue_id,
1107 provider=self.domain,
1108 category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
1109 ):
1110 try:
1111 queue = PlayerQueue.from_dict(prev_state)
1112 prev_items = await self.mass.cache.get(
1113 key=queue_id,
1114 provider=self.domain,
1115 category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
1116 default=[],
1117 )
1118 queue_items = []
1119 for idx, item_data in enumerate(prev_items):
1120 qi = QueueItem.from_cache(item_data)
1121 if not qi.media_item:
1122 # Skip items with missing media_item - this can happen if
1123 # MA was killed during shutdown while cache was being written
1124 self.logger.debug(
1125 "Skipping queue item %s (index %d) restored from cache "
1126 "without media_item",
1127 qi.name,
1128 idx,
1129 )
1130 continue
1131 queue_items.append(qi)
1132 if queue.enqueued_media_items:
1133 # we need to restore the MediaItem objects for the enqueued media items
1134 # Items from cache may be dicts that need deserialization
1135 restored_enqueued_items: list[MediaItemType] = []
1136 cached_items: list[dict[str, Any] | MediaItemType] = cast(
1137 "list[dict[str, Any] | MediaItemType]", queue.enqueued_media_items
1138 )
1139 for item in cached_items:
1140 if isinstance(item, dict):
1141 restored_item = media_from_dict(item)
1142 restored_enqueued_items.append(cast("MediaItemType", restored_item))
1143 else:
1144 restored_enqueued_items.append(item)
1145 queue.enqueued_media_items = restored_enqueued_items
1146 except Exception as err:
1147 self.logger.warning(
1148 "Failed to restore the queue(items) for %s - %s",
1149 player.state.name,
1150 str(err),
1151 )
1152 # Reset to clean state on failure
1153 queue = None
1154 queue_items = []
1155 if queue is None:
1156 queue = PlayerQueue(
1157 queue_id=queue_id,
1158 active=False,
1159 display_name=player.state.name,
1160 available=player.state.available,
1161 dont_stop_the_music_enabled=False,
1162 items=0,
1163 )
1164
1165 self._queues[queue_id] = queue
1166 self._queue_items[queue_id] = queue_items
1167 # always call update to calculate state etc
1168 self.on_player_update(player, {})
1169 self.mass.signal_event(EventType.QUEUE_ADDED, object_id=queue_id, data=queue)
1170
1171 def on_player_update(
1172 self,
1173 player: Player,
1174 changed_values: dict[str, tuple[Any, Any]],
1175 ) -> None:
1176 """
1177 Call when a PlayerQueue needs to be updated (e.g. when player updates).
1178
1179 NOTE: This is called every second if the player is playing.
1180 """
1181 queue_id = player.player_id
1182 if (queue := self._queues.get(queue_id)) is None:
1183 # race condition
1184 return
1185 if player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
1186 # do nothing while the announcement is in progress
1187 return
1188 # determine if this queue is currently active for this player
1189 queue.active = player.state.active_source in (queue.queue_id, None)
1190 if not queue.active and queue_id not in self._prev_states:
1191 queue.state = PlaybackState.IDLE
1192 # return early if the queue is not active and we have no previous state
1193 return
1194 if queue.queue_id in self._transitioning_players:
1195 # we're currently transitioning to a new track,
1196 # ignore updates from the player during this time
1197 return
1198
1199 # queue is active and preflight checks passed, update the queue details
1200 self._update_queue_from_player(player)
1201
1202 def on_player_remove(self, player_id: str, permanent: bool) -> None:
1203 """Call when a player is removed from the registry."""
1204 if permanent:
1205 # if the player is permanently removed, we also remove the cached queue data
1206 self.mass.create_task(
1207 self.mass.cache.delete(
1208 key=player_id,
1209 provider=self.domain,
1210 category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
1211 )
1212 )
1213 self.mass.create_task(
1214 self.mass.cache.delete(
1215 key=player_id,
1216 provider=self.domain,
1217 category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
1218 )
1219 )
1220 self._queues.pop(player_id, None)
1221 self._queue_items.pop(player_id, None)
1222
1223 async def load_next_queue_item(
1224 self,
1225 queue_id: str,
1226 current_item_id: str,
1227 ) -> QueueItem:
1228 """
1229 Call when a player wants the next queue item to play.
1230
1231 Raises QueueEmpty if there are no more tracks left.
1232 """
1233 queue = self.get(queue_id)
1234 if not queue:
1235 msg = f"PlayerQueue {queue_id} is not available"
1236 raise PlayerUnavailableError(msg)
1237 cur_index = self.index_by_id(queue_id, current_item_id)
1238 if cur_index is None:
1239 # this is just a guard for bad data
1240 raise QueueEmpty("Invalid item id for queue given.")
1241 next_item: QueueItem | None = None
1242 idx = 0
1243 while True:
1244 next_index = self._get_next_index(queue_id, cur_index + idx)
1245 if next_index is None:
1246 raise QueueEmpty("No more tracks left in the queue.")
1247 queue_item = self.get_item(queue_id, next_index)
1248 if queue_item is None:
1249 raise QueueEmpty("No more tracks left in the queue.")
1250 if idx >= 10:
1251 # we only allow 10 retries to prevent infinite loops
1252 raise QueueEmpty("No more (playable) tracks left in the queue.")
1253 try:
1254 await self._load_item(queue_item, next_index)
1255 # we're all set, this is our next item
1256 next_item = queue_item
1257 break
1258 except (MediaNotFoundError, AudioError):
1259 # No stream details found, skip this QueueItem
1260 self.logger.warning(
1261 "Skipping unplayable item %s (%s)", queue_item.name, queue_item.uri
1262 )
1263 queue_item.available = False
1264 idx += 1
1265 if idx != 0:
1266 # we skipped some items, signal a queue items update
1267 self.update_items(queue_id, self._queue_items[queue_id])
1268 if next_item is None:
1269 raise QueueEmpty("No more (playable) tracks left in the queue.")
1270
1271 return next_item
1272
1273 async def _load_item(
1274 self,
1275 queue_item: QueueItem,
1276 next_index: int | None,
1277 is_start: bool = False,
1278 seek_position: int = 0,
1279 fade_in: bool = False,
1280 ) -> None:
1281 """Try to load the stream details for the given queue item."""
1282 queue_id = queue_item.queue_id
1283 queue = self._queues[queue_id]
1284
1285 # we use a contextvar to bypass the throttler for this asyncio task/context
1286 # this makes sure that playback has priority over other requests that may be
1287 # happening in the background
1288 BYPASS_THROTTLER.set(True)
1289
1290 self.logger.debug(
1291 "(pre)loading (next) item for queue %s...",
1292 queue.display_name,
1293 )
1294
1295 if not queue_item.available:
1296 raise MediaNotFoundError(f"Item {queue_item.uri} is not available")
1297
1298 # work out if we are playing an album and if we should prefer album
1299 # loudness
1300 next_track_from_same_album = (
1301 next_index is not None
1302 and (next_item := self.get_item(queue_id, next_index))
1303 and (
1304 queue_item.media_item
1305 and hasattr(queue_item.media_item, "album")
1306 and queue_item.media_item.album
1307 and next_item.media_item
1308 and hasattr(next_item.media_item, "album")
1309 and next_item.media_item.album
1310 and queue_item.media_item.album.item_id == next_item.media_item.album.item_id
1311 )
1312 )
1313 current_index = self.index_by_id(queue_id, queue_item.queue_item_id)
1314 if current_index is None:
1315 previous_track_from_same_album = False
1316 else:
1317 previous_index = max(current_index - 1, 0)
1318 previous_track_from_same_album = (
1319 previous_index > 0
1320 and (previous_item := self.get_item(queue_id, previous_index)) is not None
1321 and previous_item.media_item is not None
1322 and hasattr(previous_item.media_item, "album")
1323 and previous_item.media_item.album is not None
1324 and queue_item.media_item is not None
1325 and hasattr(queue_item.media_item, "album")
1326 and queue_item.media_item.album is not None
1327 and queue_item.media_item.album.item_id == previous_item.media_item.album.item_id
1328 )
1329 playing_album_tracks = next_track_from_same_album or previous_track_from_same_album
1330 if queue_item.media_item and isinstance(queue_item.media_item, Track):
1331 album = queue_item.media_item.album
1332 # prefer the full library media item so we have all metadata and provider(quality) info
1333 # always request the full library item as there might be other qualities available
1334 if library_item := await self.mass.music.get_library_item_by_prov_id(
1335 queue_item.media_item.media_type,
1336 queue_item.media_item.item_id,
1337 queue_item.media_item.provider,
1338 ):
1339 queue_item.media_item = cast("Track", library_item)
1340 elif not queue_item.media_item.image or queue_item.media_item.provider.startswith(
1341 "ytmusic"
1342 ):
1343 # Youtube Music has poor thumbs by default, so we always fetch the full item
1344 # this also catches the case where they have an unavailable item in a listing
1345 fetched_item = await self.mass.music.get_item_by_uri(queue_item.uri)
1346 queue_item.media_item = cast("Track", fetched_item)
1347
1348 # ensure we got the full (original) album set
1349 if album and (
1350 library_album := await self.mass.music.get_library_item_by_prov_id(
1351 album.media_type,
1352 album.item_id,
1353 album.provider,
1354 )
1355 ):
1356 queue_item.media_item.album = cast("Album", library_album)
1357 elif album:
1358 # Restore original album if we have no better alternative from the library
1359 queue_item.media_item.album = album
1360 # prefer album image over track image
1361 if queue_item.media_item.album and queue_item.media_item.album.image:
1362 org_images: list[MediaItemImage] = queue_item.media_item.metadata.images or []
1363 queue_item.media_item.metadata.images = UniqueList(
1364 [
1365 queue_item.media_item.album.image,
1366 *org_images,
1367 ]
1368 )
1369 # Fetch the streamdetails, which could raise in case of an unplayable item.
1370 # For example, YT Music returns Radio Items that are not playable.
1371 queue_item.streamdetails = await get_stream_details(
1372 mass=self.mass,
1373 queue_item=queue_item,
1374 seek_position=seek_position,
1375 fade_in=fade_in,
1376 prefer_album_loudness=bool(playing_album_tracks),
1377 )
1378
1379 def track_loaded_in_buffer(self, queue_id: str, item_id: str) -> None:
1380 """Call when a player has (started) loading a track in the buffer."""
1381 queue = self.get(queue_id)
1382 if not queue:
1383 msg = f"PlayerQueue {queue_id} is not available"
1384 raise PlayerUnavailableError(msg)
1385 # store the index of the item that is currently (being) loaded in the buffer
1386 # which helps us a bit to determine how far the player has buffered ahead
1387 queue.index_in_buffer = self.index_by_id(queue_id, item_id)
1388 self.logger.debug("PlayerQueue %s loaded item %s in buffer", queue.display_name, item_id)
1389 self.signal_update(queue_id)
1390 # preload next streamdetails
1391 self._preload_next_item(queue_id, item_id)
1392
1393 # Main queue manipulation methods
1394
1395 async def load(
1396 self,
1397 queue_id: str,
1398 queue_items: list[QueueItem],
1399 insert_at_index: int = 0,
1400 keep_remaining: bool = True,
1401 keep_played: bool = True,
1402 shuffle: bool = False,
1403 ) -> None:
1404 """Load new items at index.
1405
1406 - queue_id: id of the queue to process this request.
1407 - queue_items: a list of QueueItems
1408 - insert_at_index: insert the item(s) at this index
1409 - keep_remaining: keep the remaining items after the insert
1410 - shuffle: (re)shuffle the items after insert index
1411 """
1412 prev_items = self._queue_items[queue_id][:insert_at_index] if keep_played else []
1413 next_items = queue_items
1414
1415 # if keep_remaining, append the old 'next' items
1416 if keep_remaining:
1417 next_items += self._queue_items[queue_id][insert_at_index:]
1418
1419 # we set the original insert order as attribute so we can un-shuffle
1420 for index, item in enumerate(next_items):
1421 item.sort_index += insert_at_index + index
1422 # (re)shuffle the final batch if needed
1423 if shuffle:
1424 next_items = await _smart_shuffle(next_items)
1425 self.update_items(queue_id, prev_items + next_items)
1426
1427 def update_items(self, queue_id: str, queue_items: list[QueueItem]) -> None:
1428 """Update the existing queue items, mostly caused by reordering."""
1429 self._queue_items[queue_id] = queue_items
1430 queue = self._queues[queue_id]
1431 queue.items = len(self._queue_items[queue_id])
1432 # to track if the queue items changed we set a timestamp
1433 # this is a simple way to detect changes in the list of items
1434 # without having to compare the entire list
1435 queue.items_last_updated = time.time()
1436 self.signal_update(queue_id, True)
1437 if (
1438 queue.state == PlaybackState.PLAYING
1439 and queue.index_in_buffer is not None
1440 and queue.index_in_buffer == queue.current_index
1441 ):
1442 # if the queue is playing,
1443 # ensure to (re)queue the next track because it might have changed
1444 # note that we only do this if the player has loaded the current track
1445 # if not, we wait until it has loaded to prevent conflicts
1446 if next_item := self.get_next_item(queue_id, queue.index_in_buffer):
1447 self._enqueue_next_item(queue_id, next_item)
1448
1449 # Helper methods
1450
1451 def get_item(self, queue_id: str, item_id_or_index: int | str | None) -> QueueItem | None:
1452 """Get queue item by index or item_id."""
1453 if item_id_or_index is None:
1454 return None
1455 if (queue_items := self._queue_items.get(queue_id)) is None:
1456 return None
1457 if isinstance(item_id_or_index, int) and len(queue_items) > item_id_or_index:
1458 return queue_items[item_id_or_index]
1459 if isinstance(item_id_or_index, str):
1460 return next((x for x in queue_items if x.queue_item_id == item_id_or_index), None)
1461 return None
1462
1463 def signal_update(self, queue_id: str, items_changed: bool = False) -> None:
1464 """Signal state changed of given queue."""
1465 queue = self._queues[queue_id]
1466 if items_changed:
1467 self.mass.signal_event(EventType.QUEUE_ITEMS_UPDATED, object_id=queue_id, data=queue)
1468 # save items in cache - only cache items with valid media_item
1469 cache_data = [
1470 x.to_cache() for x in self._queue_items[queue_id] if x.media_item is not None
1471 ]
1472 self.mass.create_task(
1473 self.mass.cache.set(
1474 key=queue_id,
1475 data=cache_data,
1476 provider=self.domain,
1477 category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
1478 )
1479 )
1480 # always send the base event
1481 self.mass.signal_event(EventType.QUEUE_UPDATED, object_id=queue_id, data=queue)
1482 # also signal update to the player itself so it can update its current_media
1483 self.mass.players.trigger_player_update(queue_id)
1484 # save state
1485 self.mass.create_task(
1486 self.mass.cache.set(
1487 key=queue_id,
1488 data=queue.to_cache(),
1489 provider=self.domain,
1490 category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
1491 )
1492 )
1493
1494 def index_by_id(self, queue_id: str, queue_item_id: str) -> int | None:
1495 """Get index by queue_item_id."""
1496 queue_items = self._queue_items[queue_id]
1497 for index, item in enumerate(queue_items):
1498 if item.queue_item_id == queue_item_id:
1499 return index
1500 return None
1501
1502 async def player_media_from_queue_item(
1503 self, queue_item: QueueItem, flow_mode: bool
1504 ) -> PlayerMedia:
1505 """Parse PlayerMedia from QueueItem."""
1506 queue = self._queues[queue_item.queue_id]
1507 if flow_mode:
1508 duration = None
1509 elif queue_item.streamdetails:
1510 # prefer netto duration
1511 # when seeking, the player only receives the remaining duration
1512 duration = queue_item.streamdetails.duration or queue_item.duration
1513 if duration and queue_item.streamdetails.seek_position:
1514 duration = duration - queue_item.streamdetails.seek_position
1515 else:
1516 duration = queue_item.duration
1517 if queue.session_id is None:
1518 # handle error or return early
1519 raise InvalidDataError("Queue session_id is None")
1520 media = PlayerMedia(
1521 uri=queue_item.uri,
1522 media_type=MediaType.FLOW_STREAM if flow_mode else queue_item.media_type,
1523 title="Music Assistant" if flow_mode else queue_item.name,
1524 image_url=MASS_LOGO_ONLINE,
1525 duration=duration,
1526 source_id=queue_item.queue_id,
1527 queue_item_id=queue_item.queue_item_id,
1528 custom_data={
1529 "session_id": queue.session_id,
1530 "original_uri": queue_item.uri,
1531 "flow_mode": flow_mode,
1532 },
1533 )
1534 if not flow_mode and queue_item.media_item:
1535 media.title = queue_item.media_item.name
1536 media.artist = getattr(queue_item.media_item, "artist_str", "")
1537 media.album = (
1538 album.name if (album := getattr(queue_item.media_item, "album", None)) else ""
1539 )
1540 if queue_item.image:
1541 # the image format needs to be 500x500 jpeg for maximum compatibility with players
1542 # we prefer the imageproxy on the streamserver here because this request is sent
1543 # to the player itself which may not be able to reach the regular webserver
1544 media.image_url = self.mass.metadata.get_image_url(
1545 queue_item.image, size=500, prefer_stream_server=True
1546 )
1547 return media
1548
1549 async def get_artist_tracks(self, artist: Artist) -> list[Track]:
1550 """Return tracks for given artist, based on user preference."""
1551 artist_items_conf = self.mass.config.get_raw_core_config_value(
1552 self.domain,
1553 CONF_DEFAULT_ENQUEUE_SELECT_ARTIST,
1554 ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE,
1555 )
1556 self.logger.info(
1557 "Fetching tracks to play for artist %s",
1558 artist.name,
1559 )
1560 if artist_items_conf in ("library_tracks", "all_tracks"):
1561 all_items = await self.mass.music.artists.tracks(
1562 artist.item_id,
1563 artist.provider,
1564 in_library_only=artist_items_conf == "library_tracks",
1565 )
1566 random.shuffle(all_items)
1567 return all_items
1568 if artist_items_conf in ("library_album_tracks", "all_album_tracks"):
1569 all_tracks: list[Track] = []
1570 for library_album in await self.mass.music.artists.albums(
1571 artist.item_id,
1572 artist.provider,
1573 in_library_only=artist_items_conf == "library_album_tracks",
1574 ):
1575 for album_track in await self.mass.music.albums.tracks(
1576 library_album.item_id, library_album.provider
1577 ):
1578 if album_track not in all_tracks:
1579 all_tracks.append(album_track)
1580 random.shuffle(all_tracks)
1581 return all_tracks
1582 return []
1583
1584 async def get_album_tracks(self, album: Album, start_item: str | None) -> list[Track]:
1585 """Return tracks for given album, based on user preference."""
1586 album_items_conf = self.mass.config.get_raw_core_config_value(
1587 self.domain,
1588 CONF_DEFAULT_ENQUEUE_SELECT_ALBUM,
1589 ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE,
1590 )
1591 result: list[Track] = []
1592 start_item_found = False
1593 self.logger.info(
1594 "Fetching tracks to play for album %s",
1595 album.name,
1596 )
1597 for album_track in await self.mass.music.albums.tracks(
1598 item_id=album.item_id,
1599 provider_instance_id_or_domain=album.provider,
1600 in_library_only=album_items_conf == "library_tracks",
1601 ):
1602 if not album_track.available:
1603 continue
1604 if start_item in (album_track.item_id, album_track.uri):
1605 start_item_found = True
1606 if start_item is not None and not start_item_found:
1607 continue
1608 result.append(album_track)
1609 return result
1610
1611 async def get_playlist_tracks(
1612 self, playlist: Playlist, start_item: str | None
1613 ) -> list[PlaylistPlayableItem]:
1614 """Return tracks for given playlist, based on user preference."""
1615 result: list[PlaylistPlayableItem] = []
1616 start_item_found = False
1617 self.logger.info(
1618 "Fetching tracks to play for playlist %s",
1619 playlist.name,
1620 )
1621 # TODO: Handle other sort options etc.
1622 async for playlist_track in self.mass.music.playlists.tracks(
1623 playlist.item_id, playlist.provider
1624 ):
1625 if not playlist_track.available:
1626 continue
1627 if start_item in (playlist_track.item_id, playlist_track.uri):
1628 start_item_found = True
1629 if start_item is not None and not start_item_found:
1630 continue
1631 result.append(playlist_track)
1632 return result
1633
1634 async def get_audiobook_resume_point(
1635 self, audio_book: Audiobook, chapter: str | int | None = None, userid: str | None = None
1636 ) -> int:
1637 """Return resume point (in milliseconds) for given audio book."""
1638 self.logger.debug(
1639 "Fetching resume point to play for audio book %s",
1640 audio_book.name,
1641 )
1642 if chapter is not None:
1643 # user explicitly selected a chapter to play
1644 start_chapter = int(chapter) if isinstance(chapter, str) else chapter
1645 if chapters := audio_book.metadata.chapters:
1646 if _chapter := next((x for x in chapters if x.position == start_chapter), None):
1647 return int(_chapter.start * 1000)
1648 raise InvalidDataError(
1649 f"Unable to resolve chapter to play for Audiobook {audio_book.name}"
1650 )
1651 full_played, resume_position_ms = await self.mass.music.get_resume_position(
1652 audio_book, userid=userid
1653 )
1654 return 0 if full_played else resume_position_ms
1655
1656 async def get_next_podcast_episodes(
1657 self,
1658 podcast: Podcast | None,
1659 episode: PodcastEpisode | str | None,
1660 userid: str | None = None,
1661 ) -> UniqueList[PodcastEpisode]:
1662 """Return (next) episode(s) and resume point for given podcast."""
1663 if podcast is None and isinstance(episode, str | NoneType):
1664 raise InvalidDataError("Either podcast or episode must be provided")
1665 if podcast is None:
1666 # single podcast episode requested
1667 assert isinstance(episode, PodcastEpisode) # checked above
1668 self.logger.debug(
1669 "Fetching resume point to play for Podcast episode %s",
1670 episode.name,
1671 )
1672 (
1673 fully_played,
1674 resume_position_ms,
1675 ) = await self.mass.music.get_resume_position(episode, userid=userid)
1676 episode.fully_played = fully_played
1677 episode.resume_position_ms = 0 if fully_played else resume_position_ms
1678 return UniqueList([episode])
1679 # podcast with optional start episode requested
1680 self.logger.debug(
1681 "Fetching episode(s) and resume point to play for Podcast %s",
1682 podcast.name,
1683 )
1684 all_episodes = [
1685 x async for x in self.mass.music.podcasts.episodes(podcast.item_id, podcast.provider)
1686 ]
1687 all_episodes.sort(key=lambda x: x.position)
1688 # if a episode was provided, a user explicitly selected a episode to play
1689 # so we need to find the index of the episode in the list
1690 resolved_episode: PodcastEpisode | None = None
1691 if isinstance(episode, PodcastEpisode):
1692 resolved_episode = next((x for x in all_episodes if x.uri == episode.uri), None)
1693 if resolved_episode:
1694 # ensure we have accurate resume info
1695 (
1696 fully_played,
1697 resume_position_ms,
1698 ) = await self.mass.music.get_resume_position(resolved_episode, userid=userid)
1699 resolved_episode.resume_position_ms = 0 if fully_played else resume_position_ms
1700 elif isinstance(episode, str):
1701 resolved_episode = next(
1702 (x for x in all_episodes if episode in (x.uri, x.item_id)), None
1703 )
1704 if resolved_episode:
1705 # ensure we have accurate resume info
1706 (
1707 fully_played,
1708 resume_position_ms,
1709 ) = await self.mass.music.get_resume_position(resolved_episode, userid=userid)
1710 resolved_episode.resume_position_ms = 0 if fully_played else resume_position_ms
1711 else:
1712 # get first episode that is not fully played
1713 for ep in all_episodes:
1714 if ep.fully_played:
1715 continue
1716 # ensure we have accurate resume info
1717 (
1718 fully_played,
1719 resume_position_ms,
1720 ) = await self.mass.music.get_resume_position(ep, userid=userid)
1721 if fully_played:
1722 continue
1723 ep.resume_position_ms = resume_position_ms
1724 resolved_episode = ep
1725 break
1726 else:
1727 # no episodes found that are not fully played, so we start at the beginning
1728 resolved_episode = next((x for x in all_episodes), None)
1729 if resolved_episode is None:
1730 raise InvalidDataError(f"Unable to resolve episode to play for Podcast {podcast.name}")
1731 # get the index of the episode
1732 episode_index = all_episodes.index(resolved_episode)
1733 # return the (remaining) episode(s) to play
1734 return UniqueList(all_episodes[episode_index:])
1735
1736 def _get_next_index(
1737 self,
1738 queue_id: str,
1739 cur_index: int | None,
1740 is_skip: bool = False,
1741 allow_repeat: bool = True,
1742 ) -> int | None:
1743 """
1744 Return the next index for the queue, accounting for repeat settings.
1745
1746 Will return None if there are no (more) items in the queue.
1747 """
1748 queue = self._queues[queue_id]
1749 queue_items = self._queue_items[queue_id]
1750 if not queue_items or cur_index is None:
1751 # queue is empty
1752 return None
1753 # handle repeat single track
1754 if queue.repeat_mode == RepeatMode.ONE and not is_skip:
1755 return cur_index if allow_repeat else None
1756 # handle cur_index is last index of the queue
1757 if cur_index >= (len(queue_items) - 1):
1758 if allow_repeat and queue.repeat_mode == RepeatMode.ALL:
1759 # if repeat all is enabled, we simply start again from the beginning
1760 return 0
1761 return None
1762 # all other: just the next index
1763 return cur_index + 1
1764
1765 def get_next_item(self, queue_id: str, cur_index: int | str) -> QueueItem | None:
1766 """Return next QueueItem for given queue."""
1767 index: int
1768 if isinstance(cur_index, str):
1769 resolved_index = self.index_by_id(queue_id, cur_index)
1770 if resolved_index is None:
1771 return None # guard
1772 index = resolved_index
1773 else:
1774 index = cur_index
1775 # At this point index is guaranteed to be int
1776 for skip in range(5):
1777 if (next_index := self._get_next_index(queue_id, index + skip)) is None:
1778 break
1779 next_item = self.get_item(queue_id, next_index)
1780 if next_item is None:
1781 continue
1782 if not next_item.available:
1783 # ensure that we skip unavailable items (set by load_next track logic)
1784 continue
1785 return next_item
1786 return None
1787
1788 async def _fill_radio_tracks(self, queue_id: str) -> None:
1789 """Fill a Queue with (additional) Radio tracks."""
1790 self.logger.debug(
1791 "Filling radio tracks for queue %s",
1792 queue_id,
1793 )
1794 tracks = await self._get_radio_tracks(queue_id=queue_id, is_initial_radio_mode=False)
1795 # fill queue - filter out unavailable items
1796 queue_items = [QueueItem.from_media_item(queue_id, x) for x in tracks if x.available]
1797 await self.load(
1798 queue_id,
1799 queue_items,
1800 insert_at_index=len(self._queue_items[queue_id]) + 1,
1801 )
1802
1803 def _enqueue_next_item(self, queue_id: str, next_item: QueueItem | None) -> None:
1804 """Enqueue the next item on the player."""
1805 if not next_item:
1806 # no next item, nothing to do...
1807 return
1808
1809 queue = self._queues[queue_id]
1810 if queue.flow_mode:
1811 # ignore this for flow mode
1812 return
1813
1814 async def _enqueue_next_item_on_player(next_item: QueueItem) -> None:
1815 await self.mass.players.enqueue_next_media(
1816 player_id=queue_id,
1817 media=await self.player_media_from_queue_item(next_item, False),
1818 )
1819 if queue.next_item_id_enqueued != next_item.queue_item_id:
1820 queue.next_item_id_enqueued = next_item.queue_item_id
1821 self.logger.debug(
1822 "Enqueued next track %s on queue %s",
1823 next_item.name,
1824 self._queues[queue_id].display_name,
1825 )
1826
1827 task_id = f"enqueue_next_item_{queue_id}"
1828 self.mass.call_later(0.5, _enqueue_next_item_on_player, next_item, task_id=task_id)
1829
1830 def _preload_next_item(self, queue_id: str, item_id_in_buffer: str) -> None:
1831 """
1832 Preload the streamdetails for the next item in the queue/buffer.
1833
1834 This basically ensures the item is playable and fetches the stream details.
1835 If an error occurs, the item will be skipped and the next item will be loaded.
1836 """
1837 queue = self._queues[queue_id]
1838
1839 async def _preload_streamdetails(item_id_in_buffer: str) -> None:
1840 try:
1841 # wait for the item that was loaded in the buffer is the actually playing item
1842 # this prevents a race condition when we preload the next item too soon
1843 # while the player is actually preloading the previously enqueued item.
1844 retries = 120
1845 while retries > 0:
1846 if not queue.current_item:
1847 return # guard
1848 if queue.current_item.queue_item_id == item_id_in_buffer:
1849 break
1850 retries -= 1
1851 await asyncio.sleep(1)
1852 if next_item := await self.load_next_queue_item(queue_id, item_id_in_buffer):
1853 self.logger.debug(
1854 "Preloaded next item %s for queue %s",
1855 next_item.name,
1856 queue.display_name,
1857 )
1858 # enqueue the next item on the player
1859 self._enqueue_next_item(queue_id, next_item)
1860
1861 except QueueEmpty:
1862 return
1863
1864 if not (current_item := self.get_item(queue_id, item_id_in_buffer)):
1865 # this should not happen, but guard anyways
1866 return
1867 if current_item.media_type == MediaType.RADIO or not current_item.duration:
1868 # radio items or no duration, nothing to do
1869 return
1870
1871 task_id = f"preload_next_item_{queue_id}"
1872 self.mass.create_task(
1873 _preload_streamdetails,
1874 item_id_in_buffer,
1875 task_id=task_id,
1876 abort_existing=True,
1877 )
1878
1879 async def _resolve_media_items(
1880 self,
1881 media_item: MediaItemType | ItemMapping | BrowseFolder,
1882 start_item: str | None = None,
1883 userid: str | None = None,
1884 queue_id: str | None = None,
1885 ) -> list[MediaItemType]:
1886 """Resolve/unwrap media items to enqueue."""
1887 # resolve Itemmapping to full media item
1888 if isinstance(media_item, ItemMapping):
1889 if media_item.uri is None:
1890 raise InvalidDataError("ItemMapping has no URI")
1891 media_item = await self.mass.music.get_item_by_uri(media_item.uri)
1892 if media_item.media_type == MediaType.PLAYLIST:
1893 media_item = cast("Playlist", media_item)
1894 self.mass.create_task(
1895 self.mass.music.mark_item_played(
1896 media_item, userid=userid, queue_id=queue_id, user_initiated=True
1897 )
1898 )
1899 return list(await self.get_playlist_tracks(media_item, start_item))
1900 if media_item.media_type == MediaType.ARTIST:
1901 media_item = cast("Artist", media_item)
1902 self.mass.create_task(
1903 self.mass.music.mark_item_played(media_item, queue_id=queue_id, user_initiated=True)
1904 )
1905 return list(await self.get_artist_tracks(media_item))
1906 if media_item.media_type == MediaType.ALBUM:
1907 media_item = cast("Album", media_item)
1908 self.mass.create_task(
1909 self.mass.music.mark_item_played(
1910 media_item, userid=userid, queue_id=queue_id, user_initiated=True
1911 )
1912 )
1913 return list(await self.get_album_tracks(media_item, start_item))
1914 if media_item.media_type == MediaType.AUDIOBOOK:
1915 media_item = cast("Audiobook", media_item)
1916 # ensure we grab the correct/latest resume point info
1917 media_item.resume_position_ms = await self.get_audiobook_resume_point(
1918 media_item, start_item, userid=userid
1919 )
1920 return [media_item]
1921 if media_item.media_type == MediaType.PODCAST:
1922 media_item = cast("Podcast", media_item)
1923 self.mass.create_task(
1924 self.mass.music.mark_item_played(
1925 media_item, userid=userid, queue_id=queue_id, user_initiated=True
1926 )
1927 )
1928 return list(await self.get_next_podcast_episodes(media_item, start_item, userid=userid))
1929 if media_item.media_type == MediaType.PODCAST_EPISODE:
1930 media_item = cast("PodcastEpisode", media_item)
1931 return list(await self.get_next_podcast_episodes(None, media_item, userid=userid))
1932 if media_item.media_type == MediaType.FOLDER:
1933 media_item = cast("BrowseFolder", media_item)
1934 return list(await self._get_folder_tracks(media_item))
1935 # all other: single track or radio item
1936 return [cast("MediaItemType", media_item)]
1937
1938 async def _try_resume_from_playlog(self, queue: PlayerQueue) -> bool:
1939 """Try to resume playback from playlog when queue is empty.
1940
1941 Attempts to find user-initiated recently played items in the following order:
1942 1. By userid AND queue_id
1943 2. By queue_id only
1944 3. By userid only (if available)
1945 4. Any recently played item
1946
1947 :param queue: The queue to resume playback on.
1948 :return: True if playback was started, False otherwise.
1949 """
1950 # Try different filter combinations in order of specificity
1951 filter_attempts: list[tuple[str | None, str | None, str]] = []
1952 if queue.userid:
1953 filter_attempts.append((queue.userid, queue.queue_id, "userid + queue_id match"))
1954 filter_attempts.append((None, queue.queue_id, "queue_id match"))
1955 if queue.userid:
1956 filter_attempts.append((queue.userid, None, "userid match"))
1957 filter_attempts.append((None, None, "any recent item"))
1958
1959 for userid, queue_id, match_type in filter_attempts:
1960 items = await self.mass.music.recently_played(
1961 limit=5,
1962 fully_played_only=False,
1963 user_initiated_only=True,
1964 userid=userid,
1965 queue_id=queue_id,
1966 )
1967 for item in items:
1968 if not item.uri:
1969 continue
1970 try:
1971 await self.play_media(queue.queue_id, item)
1972 self.logger.info(
1973 "Resumed queue %s from playlog (%s)", queue.display_name, match_type
1974 )
1975 return True
1976 except MusicAssistantError as err:
1977 self.logger.debug("Failed to resume with item %s: %s", item.name, err)
1978 continue
1979
1980 return False
1981
1982 async def _get_radio_tracks(
1983 self, queue_id: str, is_initial_radio_mode: bool = False
1984 ) -> list[Track]:
1985 """Call the registered music providers for dynamic tracks."""
1986 queue = self._queues[queue_id]
1987 queue_track_items: list[Track] = [
1988 q.media_item
1989 for q in self._queue_items[queue_id]
1990 if q.media_item and isinstance(q.media_item, Track)
1991 ]
1992 if not queue.radio_source:
1993 # this may happen during race conditions as this method is called delayed
1994 return []
1995 self.logger.info(
1996 "Fetching radio tracks for queue %s based on: %s",
1997 queue.display_name,
1998 ", ".join([x.name for x in queue.radio_source]),
1999 )
2000
2001 # Get user's preferred provider instances for steering provider selection
2002 preferred_provider_instances: list[str] | None = None
2003 if (
2004 queue.userid
2005 and (playback_user := await self.mass.webserver.auth.get_user(queue.userid))
2006 and playback_user.provider_filter
2007 ):
2008 preferred_provider_instances = playback_user.provider_filter
2009
2010 available_base_tracks: list[Track] = []
2011 base_track_sample_size = 5
2012 # Some providers have very deterministic similar track algorithms when providing
2013 # a single track item. When we have a radio mode based on 1 track and we have to
2014 # refill the queue (ie not initial radio mode), we use the play history as base tracks
2015 if (
2016 len(queue.radio_source) == 1
2017 and queue.radio_source[0].media_type == MediaType.TRACK
2018 and not is_initial_radio_mode
2019 ):
2020 available_base_tracks = queue_track_items
2021 else:
2022 # Grab all the available base tracks based on the selected source items.
2023 # shuffle the source items, just in case
2024 for radio_item in random.sample(queue.radio_source, len(queue.radio_source)):
2025 ctrl = self.mass.music.get_controller(radio_item.media_type)
2026 try:
2027 available_base_tracks += [
2028 track
2029 for track in await ctrl.radio_mode_base_tracks(
2030 radio_item, # type: ignore[arg-type]
2031 preferred_provider_instances,
2032 )
2033 # Avoid duplicate base tracks
2034 if track not in available_base_tracks
2035 ]
2036 except UnsupportedFeaturedException as err:
2037 self.logger.debug(
2038 "Skip loading radio items for %s: %s ",
2039 radio_item.uri,
2040 str(err),
2041 )
2042 if not available_base_tracks:
2043 raise UnsupportedFeaturedException("Radio mode not available for source items")
2044
2045 # Sample tracks from the base tracks, which will be used to calculate the dynamic ones
2046 base_tracks = random.sample(
2047 available_base_tracks,
2048 min(base_track_sample_size, len(available_base_tracks)),
2049 )
2050 # Use a set to avoid duplicate dynamic tracks
2051 dynamic_tracks: set[Track] = set()
2052 # Use base tracks + Trackcontroller to obtain similar tracks for every base Track
2053 for allow_lookup in (False, True):
2054 if dynamic_tracks:
2055 break
2056 for base_track in base_tracks:
2057 try:
2058 _similar_tracks = await self.mass.music.tracks.similar_tracks(
2059 base_track.item_id,
2060 base_track.provider,
2061 allow_lookup=allow_lookup,
2062 preferred_provider_instances=preferred_provider_instances,
2063 )
2064 except MediaNotFoundError:
2065 # Some providers don't have similar tracks for all items. For example,
2066 # Tidal can sometimes return a 404 when the 'similar_tracks' endpoint is called.
2067 # in that case, just skip the track.
2068 self.logger.debug("Similar tracks not found for track %s", base_track.name)
2069 continue
2070 for track in _similar_tracks:
2071 if (
2072 track not in base_tracks
2073 # Exclude tracks we have already played / queued
2074 and track not in queue_track_items
2075 # Ignore tracks that are too long for radio mode, e.g. mixes
2076 and track.duration <= RADIO_TRACK_MAX_DURATION_SECS
2077 ):
2078 dynamic_tracks.add(track)
2079 if len(dynamic_tracks) >= 50:
2080 break
2081 queue_tracks: list[Track] = []
2082 dynamic_tracks_list = list(dynamic_tracks)
2083 # Only include the sampled base tracks when the radio mode is first initialized
2084 if is_initial_radio_mode:
2085 queue_tracks += [base_tracks[0]]
2086 # Exhaust base tracks with the pattern of BDDBDDBDD (1 base track + 2 dynamic tracks)
2087 if len(base_tracks) > 1:
2088 for base_track in base_tracks[1:]:
2089 queue_tracks += [base_track]
2090 if len(dynamic_tracks_list) > 2:
2091 queue_tracks += random.sample(dynamic_tracks_list, 2)
2092 else:
2093 queue_tracks += dynamic_tracks_list
2094 # Add dynamic tracks to the queue, make sure to exclude already picked tracks
2095 remaining_dynamic_tracks = [t for t in dynamic_tracks_list if t not in queue_tracks]
2096 if remaining_dynamic_tracks:
2097 queue_tracks += random.sample(
2098 remaining_dynamic_tracks, min(len(remaining_dynamic_tracks), 25)
2099 )
2100 return queue_tracks
2101
2102 async def _get_folder_tracks(self, folder: BrowseFolder) -> list[Track]:
2103 """Fetch (playable) tracks for given browse folder."""
2104 self.logger.info(
2105 "Fetching tracks to play for folder %s",
2106 folder.name,
2107 )
2108 tracks: list[Track] = []
2109 for item in await self.mass.music.browse(folder.path):
2110 if not item.is_playable:
2111 continue
2112 # recursively fetch tracks from all media types
2113 resolved = await self._resolve_media_items(item)
2114 tracks += [x for x in resolved if isinstance(x, Track)]
2115
2116 return tracks
2117
2118 def _update_queue_from_player(
2119 self,
2120 player: Player,
2121 ) -> None:
2122 """Update the Queue when the player state changed."""
2123 queue_id = player.player_id
2124 queue = self._queues[queue_id]
2125
2126 # basic properties
2127 queue.display_name = player.state.name
2128 queue.available = player.state.available
2129 queue.items = len(self._queue_items[queue_id])
2130
2131 queue.state = (
2132 player.state.playback_state or PlaybackState.IDLE
2133 if queue.active
2134 else PlaybackState.IDLE
2135 )
2136 # update current item/index from player report
2137 if queue.active and queue.state in (
2138 PlaybackState.PLAYING,
2139 PlaybackState.PAUSED,
2140 ):
2141 # NOTE: If the queue is not playing (yet) we will not update the current index
2142 # to ensure we keep the previously known current index
2143 if queue.flow_mode:
2144 # flow mode active, the player is playing one long stream
2145 # so we need to calculate the current index and elapsed time
2146 current_index, elapsed_time = self._get_flow_queue_stream_index(queue, player)
2147 elif item_id := self._parse_player_current_item_id(queue_id, player):
2148 # normal mode, the player itself will report the current item
2149 elapsed_time = int(player.state.corrected_elapsed_time or 0)
2150 current_index = self.index_by_id(queue_id, item_id)
2151 else:
2152 # this may happen if the player is still transitioning between tracks
2153 # we ignore this for now and keep the current index as is
2154 return
2155
2156 # get current/next item based on current index
2157 queue.current_index = current_index
2158 queue.current_item = current_item = self.get_item(queue_id, current_index)
2159 queue.next_item = (
2160 self.get_next_item(queue_id, current_index)
2161 if current_item and current_index is not None
2162 else None
2163 )
2164
2165 # correct elapsed time when seeking
2166 if (
2167 not queue.flow_mode
2168 and current_item
2169 and current_item.streamdetails
2170 and current_item.streamdetails.seek_position
2171 ):
2172 elapsed_time += current_item.streamdetails.seek_position
2173 queue.elapsed_time = elapsed_time
2174 queue.elapsed_time_last_updated = time.time()
2175
2176 elif not queue.current_item and queue.current_index is not None:
2177 current_index = queue.current_index
2178 queue.current_item = current_item = self.get_item(queue_id, current_index)
2179 queue.next_item = (
2180 self.get_next_item(queue_id, current_index)
2181 if current_item and current_index is not None
2182 else None
2183 )
2184
2185 # This is enough to detect any changes in the DSPDetails
2186 # (so child count changed, or any output format changed)
2187 output_formats = []
2188 if output_format := player.extra_data.get("output_format"):
2189 output_formats.append(str(output_format))
2190 for child_id in player.state.group_members:
2191 if (child := self.mass.players.get_player(child_id)) and (
2192 output_format := child.extra_data.get("output_format")
2193 ):
2194 output_formats.append(str(output_format))
2195 else:
2196 output_formats.append("unknown")
2197
2198 # basic throttle: do not send state changed events if queue did not actually change
2199 prev_state: CompareState = self._prev_states.get(
2200 queue_id,
2201 CompareState(
2202 queue_id=queue_id,
2203 state=PlaybackState.IDLE,
2204 current_item_id=None,
2205 next_item_id=None,
2206 current_item=None,
2207 elapsed_time=0,
2208 last_playing_elapsed_time=0,
2209 stream_title=None,
2210 codec_type=None,
2211 output_formats=None,
2212 ),
2213 )
2214 # update last_playing_elapsed_time only when the player is actively playing
2215 # use corrected_elapsed_time which accounts for time since last update
2216 # this preserves the last known elapsed time when transitioning to idle/paused
2217 prev_playing_elapsed = prev_state["last_playing_elapsed_time"]
2218 prev_item_id = prev_state["current_item_id"]
2219 current_item_id = queue.current_item.queue_item_id if queue.current_item else None
2220 if queue.state == PlaybackState.PLAYING:
2221 current_elapsed = int(queue.corrected_elapsed_time)
2222 if current_item_id != prev_item_id:
2223 # new track started, reset the elapsed time tracker
2224 last_playing_elapsed_time = current_elapsed
2225 else:
2226 # same track, use the max of current and previous to handle timing issues
2227 last_playing_elapsed_time = max(current_elapsed, prev_playing_elapsed)
2228 else:
2229 last_playing_elapsed_time = prev_playing_elapsed
2230 new_state = CompareState(
2231 queue_id=queue_id,
2232 state=queue.state,
2233 current_item_id=queue.current_item.queue_item_id if queue.current_item else None,
2234 next_item_id=queue.next_item.queue_item_id if queue.next_item else None,
2235 current_item=queue.current_item,
2236 elapsed_time=int(queue.elapsed_time),
2237 last_playing_elapsed_time=last_playing_elapsed_time,
2238 stream_title=(
2239 queue.current_item.streamdetails.stream_title
2240 if queue.current_item and queue.current_item.streamdetails
2241 else None
2242 ),
2243 codec_type=(
2244 queue.current_item.streamdetails.audio_format.codec_type
2245 if queue.current_item and queue.current_item.streamdetails
2246 else None
2247 ),
2248 output_formats=output_formats,
2249 )
2250 changed_keys = get_changed_keys(dict(prev_state), dict(new_state))
2251 with suppress(KeyError):
2252 changed_keys.remove("next_item_id")
2253 with suppress(KeyError):
2254 changed_keys.remove("last_playing_elapsed_time")
2255
2256 # store the new state
2257 if queue.active:
2258 self._prev_states[queue_id] = new_state
2259 else:
2260 self._prev_states.pop(queue_id, None)
2261
2262 # return early if nothing changed
2263 if len(changed_keys) == 0:
2264 return
2265
2266 # signal update and store state
2267 send_update = True
2268 if changed_keys == {"elapsed_time"}:
2269 # only elapsed time changed, do not send full queue update
2270 send_update = False
2271 prev_time = prev_state.get("elapsed_time") or 0
2272 cur_time = new_state.get("elapsed_time") or 0
2273 if abs(cur_time - prev_time) > 2:
2274 # send dedicated event for time updates when seeking
2275 self.mass.signal_event(
2276 EventType.QUEUE_TIME_UPDATED,
2277 object_id=queue_id,
2278 data=queue.elapsed_time,
2279 )
2280 # also signal update to the player itself so it can update its current_media
2281 self.mass.players.trigger_player_update(queue_id)
2282
2283 if send_update:
2284 self.signal_update(queue_id)
2285
2286 if "output_formats" in changed_keys:
2287 # refresh DSP details since they may have changed
2288 dsp = get_stream_dsp_details(self.mass, queue_id)
2289 if queue.current_item and queue.current_item.streamdetails:
2290 queue.current_item.streamdetails.dsp = dsp
2291 if queue.next_item and queue.next_item.streamdetails:
2292 queue.next_item.streamdetails.dsp = dsp
2293
2294 # handle updating stream_metadata if needed
2295 if (
2296 queue.current_item
2297 and (streamdetails := queue.current_item.streamdetails)
2298 and streamdetails.stream_metadata_update_callback
2299 and (
2300 streamdetails.stream_metadata_last_updated is None
2301 or (
2302 time.time() - streamdetails.stream_metadata_last_updated
2303 >= streamdetails.stream_metadata_update_interval
2304 )
2305 )
2306 ):
2307 streamdetails.stream_metadata_last_updated = time.time()
2308 self.mass.create_task(
2309 streamdetails.stream_metadata_update_callback(
2310 streamdetails, int(queue.corrected_elapsed_time)
2311 )
2312 )
2313
2314 # handle sending a playback progress report
2315 # we do this every 30 seconds or when the state changes
2316 if (
2317 changed_keys.intersection({"state", "current_item_id"})
2318 or int(queue.elapsed_time) % 30 == 0
2319 ):
2320 self._handle_playback_progress_report(queue, prev_state, new_state)
2321
2322 # check if we need to clear the queue if we reached the end
2323 if "state" in changed_keys and queue.state == PlaybackState.IDLE:
2324 self._handle_end_of_queue(queue, prev_state, new_state)
2325
2326 # watch dynamic radio items refill if needed
2327 if "current_item_id" in changed_keys:
2328 # auto enable radio mode if dont stop the music is enabled
2329 if (
2330 queue.dont_stop_the_music_enabled
2331 and queue.enqueued_media_items
2332 and queue.current_index is not None
2333 and (queue.items - queue.current_index) <= 1
2334 ):
2335 # We have received the last item in the queue and Don't stop the music is enabled
2336 # set the played media item(s) as radio items (which will refill the queue)
2337 # note that this will fail if there are no media items for which we have
2338 # a dynamic radio source.
2339 self.logger.debug(
2340 "End of queue detected and Don't stop the music is enabled for %s"
2341 " - setting enqueued media items as radio source: %s",
2342 queue.display_name,
2343 ", ".join([x.uri for x in queue.enqueued_media_items]), # type: ignore[misc] # uri set in __post_init__
2344 )
2345 queue.radio_source = queue.enqueued_media_items
2346 # auto fill radio tracks if less than 5 tracks left in the queue
2347 if (
2348 queue.radio_source
2349 and queue.current_index is not None
2350 and (queue.items - queue.current_index) < 5
2351 ):
2352 task_id = f"fill_radio_tracks_{queue_id}"
2353 self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id)
2354
2355 def _get_flow_queue_stream_index(
2356 self, queue: PlayerQueue, player: Player
2357 ) -> tuple[int | None, int]:
2358 """Calculate current queue index and current track elapsed time when flow mode is active."""
2359 elapsed_time_queue_total = player.state.corrected_elapsed_time or 0
2360 if queue.current_index is None and not queue.flow_mode_stream_log:
2361 return queue.current_index, int(queue.elapsed_time)
2362
2363 # For each track that has been streamed/buffered to the player,
2364 # a playlog entry will be created with the queue item id
2365 # and the amount of seconds streamed. We traverse the playlog to figure
2366 # out where we are in the queue, accounting for actual streamed
2367 # seconds (and not duration) and skipped seconds. If a track has been repeated,
2368 # it will simply be in the playlog multiple times.
2369 played_time = 0.0
2370 queue_index: int | None = queue.current_index or 0
2371 track_time = 0.0
2372 for play_log_entry in queue.flow_mode_stream_log:
2373 queue_item_duration = (
2374 # NOTE: 'seconds_streamed' can actually be 0 if there was a stream error!
2375 play_log_entry.seconds_streamed
2376 if play_log_entry.seconds_streamed is not None
2377 else play_log_entry.duration or 3600 * 24 * 7
2378 )
2379 if elapsed_time_queue_total > (queue_item_duration + played_time):
2380 # total elapsed time is more than (streamed) track duration
2381 # this track has been fully played, move on.
2382 played_time += queue_item_duration
2383 else:
2384 # no more seconds left to divide, this is our track
2385 # account for any seeking by adding the skipped/seeked seconds
2386 queue_index = self.index_by_id(queue.queue_id, play_log_entry.queue_item_id)
2387 queue_item = self.get_item(queue.queue_id, queue_index)
2388 if queue_item and queue_item.streamdetails:
2389 track_sec_skipped = queue_item.streamdetails.seek_position
2390 else:
2391 track_sec_skipped = 0
2392 track_time = elapsed_time_queue_total + track_sec_skipped - played_time
2393 break
2394 if player.state.playback_state != PlaybackState.PLAYING:
2395 # if the player is not playing, we can't be sure that the elapsed time is correct
2396 # so we just return the queue index and the elapsed time
2397 return queue.current_index, int(queue.elapsed_time)
2398 return queue_index, int(track_time)
2399
2400 def _parse_player_current_item_id(self, queue_id: str, player: Player) -> str | None:
2401 """Parse QueueItem ID from Player's current url."""
2402 protocol_player = player
2403 if player.active_output_protocol and player.active_output_protocol != "native":
2404 protocol_player = self.mass.players.get_player(player.active_output_protocol) or player
2405 if not protocol_player.current_media:
2406 # YES, we use player.current_media on purpose here because we need the raw metadata
2407 return None
2408 # prefer queue_id and queue_item_id within the current media
2409 if (
2410 protocol_player.current_media.source_id == queue_id
2411 and protocol_player.current_media.queue_item_id
2412 ):
2413 return protocol_player.current_media.queue_item_id
2414 # special case for sonos players
2415 if protocol_player.current_media.uri and protocol_player.current_media.uri.startswith(
2416 f"mass:{queue_id}"
2417 ):
2418 if protocol_player.current_media.queue_item_id:
2419 return protocol_player.current_media.queue_item_id
2420 return protocol_player.current_media.uri.split(":")[-1]
2421 # try to extract the item id from a mass stream url
2422 if (
2423 protocol_player.current_media.uri
2424 and queue_id in protocol_player.current_media.uri
2425 and self.mass.streams.base_url in protocol_player.current_media.uri
2426 ):
2427 current_item_id = protocol_player.current_media.uri.rsplit("/")[-1].split(".")[0]
2428 if self.get_item(queue_id, current_item_id):
2429 return current_item_id
2430 # try to extract the item id from a queue_id/item_id combi
2431 if (
2432 protocol_player.current_media.uri
2433 and queue_id in protocol_player.current_media.uri
2434 and "/" in protocol_player.current_media.uri
2435 ):
2436 current_item_id = protocol_player.current_media.uri.split("/")[1]
2437 if self.get_item(queue_id, current_item_id):
2438 return current_item_id
2439
2440 return None
2441
2442 def _handle_end_of_queue(
2443 self, queue: PlayerQueue, prev_state: CompareState, new_state: CompareState
2444 ) -> None:
2445 """Check if the queue should be cleared after the current item."""
2446 # check if queue state changed to stopped (from playing/paused to idle)
2447 if not (
2448 prev_state["state"] in (PlaybackState.PLAYING, PlaybackState.PAUSED)
2449 and new_state["state"] == PlaybackState.IDLE
2450 ):
2451 return
2452 # check if no more items in the queue (next_item should be None at end of queue)
2453 if queue.next_item is not None:
2454 return
2455 # check if we had a previous item playing
2456 if prev_state["current_item_id"] is None:
2457 return
2458
2459 async def _clear_queue_delayed() -> None:
2460 for _ in range(5):
2461 await asyncio.sleep(1)
2462 if queue.state != PlaybackState.IDLE:
2463 return
2464 if queue.next_item is not None:
2465 return
2466 self.logger.info("End of queue reached, clearing items")
2467 self.clear(queue.queue_id)
2468
2469 # all checks passed, we stopped playback at the last (or single) track of the queue
2470 # now determine if the item was fully played before clearing
2471
2472 # For flow mode, check if the last track was fully streamed using the stream log
2473 # This is more reliable than elapsed_time which can be reset/incorrect
2474 if queue.flow_mode and queue.flow_mode_stream_log:
2475 last_log_entry = queue.flow_mode_stream_log[-1]
2476 if last_log_entry.seconds_streamed is not None:
2477 # The last track finished streaming, safe to clear queue
2478 self.mass.create_task(_clear_queue_delayed())
2479 return
2480
2481 # For non-flow mode, use prev_state values since queue state may have been updated/reset
2482 prev_item = prev_state["current_item"]
2483 if prev_item and (streamdetails := prev_item.streamdetails):
2484 duration = streamdetails.duration or prev_item.duration or 24 * 3600
2485 elif prev_item:
2486 duration = prev_item.duration or 24 * 3600
2487 else:
2488 # No current item means player has already cleared it, safe to clear queue
2489 self.mass.create_task(_clear_queue_delayed())
2490 return
2491
2492 # use last_playing_elapsed_time which preserves the elapsed time from when the player
2493 # was still playing (before transitioning to idle where elapsed_time may be reset to 0)
2494 seconds_played = int(prev_state["last_playing_elapsed_time"])
2495 # debounce this a bit to make sure we're not clearing the queue by accident
2496 # only clear if the last track was played to near completion (within 5 seconds of end)
2497 if seconds_played >= (duration or 3600) - 5:
2498 self.mass.create_task(_clear_queue_delayed())
2499
2500 def _handle_playback_progress_report(
2501 self, queue: PlayerQueue, prev_state: CompareState, new_state: CompareState
2502 ) -> None:
2503 """Handle playback progress report."""
2504 # detect change in current index to report that a item has been played
2505 prev_item_id = prev_state["current_item_id"]
2506 cur_item_id = new_state["current_item_id"]
2507 if prev_item_id is None and cur_item_id is None:
2508 return
2509
2510 if prev_item_id is not None and prev_item_id != cur_item_id:
2511 # we have a new item, so we need report the previous one
2512 is_current_item = False
2513 item_to_report = prev_state["current_item"]
2514 seconds_played = int(prev_state["elapsed_time"])
2515 else:
2516 # report on current item
2517 is_current_item = True
2518 item_to_report = self.get_item(queue.queue_id, cur_item_id) or new_state["current_item"]
2519 seconds_played = int(new_state["elapsed_time"])
2520
2521 if not item_to_report:
2522 return # guard against invalid items
2523
2524 if not (media_item := item_to_report.media_item):
2525 # only report on media items
2526 return
2527 assert media_item.uri is not None # uri is set in __post_init__
2528
2529 if item_to_report.streamdetails and item_to_report.streamdetails.stream_error:
2530 # Ignore items that had a stream error
2531 return
2532
2533 if item_to_report.streamdetails and item_to_report.streamdetails.duration:
2534 duration = int(item_to_report.streamdetails.duration)
2535 else:
2536 duration = int(item_to_report.duration or 3 * 3600)
2537
2538 if seconds_played < 5:
2539 # ignore items that have been played less than 5 seconds
2540 # this also filters out a bounce effect where the previous item
2541 # gets reported with 0 elapsed seconds after a new item starts playing
2542 return
2543
2544 # determine if item is fully played
2545 # for podcasts and audiobooks we account for the last 60 seconds
2546 percentage_played = percentage(seconds_played, duration)
2547 if not is_current_item and item_to_report.media_type in (
2548 MediaType.AUDIOBOOK,
2549 MediaType.PODCAST_EPISODE,
2550 ):
2551 fully_played = seconds_played >= duration - 60
2552 elif not is_current_item:
2553 # 90% of the track must be played to be considered fully played
2554 fully_played = percentage_played >= 90
2555 else:
2556 fully_played = seconds_played >= duration - 10
2557
2558 is_playing = is_current_item and queue.state == PlaybackState.PLAYING
2559 if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
2560 self.logger.debug(
2561 "%s %s '%s' (%s) - Fully played: %s - Progress: %s (%s/%ss)",
2562 queue.display_name,
2563 "is playing" if is_playing else "played",
2564 item_to_report.name,
2565 item_to_report.uri,
2566 fully_played,
2567 f"{percentage_played}%",
2568 seconds_played,
2569 duration,
2570 )
2571 # add entry to playlog - this also handles resume of podcasts/audiobooks
2572 self.mass.create_task(
2573 self.mass.music.mark_item_played(
2574 media_item,
2575 fully_played=fully_played,
2576 seconds_played=seconds_played,
2577 is_playing=is_playing,
2578 userid=queue.userid,
2579 queue_id=queue.queue_id,
2580 user_initiated=False,
2581 )
2582 )
2583
2584 album: Album | ItemMapping | None = getattr(media_item, "album", None)
2585 # signal 'media item played' event,
2586 # which is useful for plugins that want to do scrobbling
2587 artists: list[Artist | ItemMapping] = getattr(media_item, "artists", [])
2588 artists_names = [a.name for a in artists]
2589 self.mass.signal_event(
2590 EventType.MEDIA_ITEM_PLAYED,
2591 object_id=media_item.uri,
2592 data=MediaItemPlaybackProgressReport(
2593 uri=media_item.uri,
2594 media_type=media_item.media_type,
2595 name=media_item.name,
2596 version=getattr(media_item, "version", None),
2597 artist=(
2598 getattr(media_item, "artist_str", None) or artists_names[0]
2599 if artists_names
2600 else None
2601 ),
2602 artists=artists_names,
2603 artist_mbids=[a.mbid for a in artists if a.mbid] if artists else None,
2604 album=album.name if album else None,
2605 album_mbid=album.mbid if album else None,
2606 album_artist=(album.artist_str if isinstance(album, Album) else None),
2607 album_artist_mbids=(
2608 [a.mbid for a in album.artists if a.mbid] if isinstance(album, Album) else None
2609 ),
2610 image_url=(
2611 self.mass.metadata.get_image_url(
2612 item_to_report.media_item.image, prefer_proxy=False
2613 )
2614 if item_to_report.media_item.image
2615 else None
2616 ),
2617 duration=duration,
2618 mbid=(getattr(media_item, "mbid", None)),
2619 seconds_played=seconds_played,
2620 fully_played=fully_played,
2621 is_playing=is_playing,
2622 userid=queue.userid,
2623 ),
2624 )
2625
2626
2627async def _smart_shuffle(items: list[QueueItem]) -> list[QueueItem]:
2628 """Shuffle queue items, avoiding identical tracks next to each other.
2629
2630 Best-effort approach to prevent the same track from appearing adjacent.
2631 Does a random shuffle first, then makes a limited number of passes to
2632 swap adjacent duplicates with a random item further in the list.
2633
2634 :param items: List of queue items to shuffle.
2635 """
2636 if len(items) <= 2:
2637 return random.sample(items, len(items)) if len(items) == 2 else items
2638
2639 # Start with a random shuffle
2640 shuffled = random.sample(items, len(items))
2641
2642 # Make a few passes to fix adjacent duplicates
2643 max_passes = 3
2644 for _ in range(max_passes):
2645 swapped = False
2646 for i in range(len(shuffled) - 1):
2647 if shuffled[i].name == shuffled[i + 1].name:
2648 # Found adjacent duplicate - swap with random position at least 2 away
2649 swap_candidates = [j for j in range(len(shuffled)) if abs(j - i - 1) >= 2]
2650 if swap_candidates:
2651 swap_pos = random.choice(swap_candidates)
2652 shuffled[i + 1], shuffled[swap_pos] = shuffled[swap_pos], shuffled[i + 1]
2653 swapped = True
2654 if not swapped:
2655 break
2656 # Yield to event loop between passes
2657 await asyncio.sleep(0)
2658
2659 return shuffled
2660