/
/
/
1"""
2MusicAssistant Player Queues Controller.
3
4Handles all logic to PLAY Media Items, provided by Music Providers to supported players.
5
6It is loosely coupled to the MusicAssistant Music Controller and Player Controller.
7A Music Assistant Player always has a PlayerQueue associated with it
8which holds the queue items and state.
9
10The PlayerQueue is in that case the active source of the player,
11but it can also be something else, hence the loose coupling.
12"""
13
14from __future__ import annotations
15
16import asyncio
17import random
18import time
19from contextlib import suppress
20from types import NoneType
21from typing import TYPE_CHECKING, Any, TypedDict, cast
22
23import shortuuid
24from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
25from music_assistant_models.enums import (
26 ConfigEntryType,
27 ContentType,
28 EventType,
29 MediaType,
30 PlaybackState,
31 PlayerFeature,
32 ProviderFeature,
33 QueueOption,
34 RepeatMode,
35)
36from music_assistant_models.errors import (
37 AudioError,
38 InvalidCommand,
39 InvalidDataError,
40 MediaNotFoundError,
41 MusicAssistantError,
42 PlayerUnavailableError,
43 QueueEmpty,
44 UnsupportedFeaturedException,
45)
46from music_assistant_models.media_items import (
47 Album,
48 Artist,
49 Audiobook,
50 BrowseFolder,
51 ItemMapping,
52 MediaItemType,
53 PlayableMediaItemType,
54 Playlist,
55 Podcast,
56 PodcastEpisode,
57 Track,
58 UniqueList,
59 media_from_dict,
60)
61from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport
62from music_assistant_models.player_queue import PlayerQueue
63from music_assistant_models.queue_item import QueueItem
64
65from music_assistant.constants import (
66 ATTR_ANNOUNCEMENT_IN_PROGRESS,
67 MASS_LOGO_ONLINE,
68 VERBOSE_LOG_LEVEL,
69 PlaylistPlayableItem,
70)
71from music_assistant.controllers.webserver.helpers.auth_middleware import get_current_user
72from music_assistant.helpers.api import api_command
73from music_assistant.helpers.audio import get_stream_details, get_stream_dsp_details
74from music_assistant.helpers.throttle_retry import BYPASS_THROTTLER
75from music_assistant.helpers.util import get_changed_keys, percentage
76from music_assistant.models.core_controller import CoreController
77from music_assistant.models.player import Player, PlayerMedia
78
79if TYPE_CHECKING:
80 from collections.abc import Iterator
81
82 from music_assistant_models.auth import User
83 from music_assistant_models.media_items.metadata import MediaItemImage
84
85 from music_assistant import MusicAssistant
86 from music_assistant.models.player import Player
87
# Config keys: which items to select when an (in-library) artist or album is played.
CONF_DEFAULT_ENQUEUE_SELECT_ARTIST = "default_enqueue_select_artist"
CONF_DEFAULT_ENQUEUE_SELECT_ALBUM = "default_enqueue_select_album"

# Default values for the two selection settings above.
ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE = "all_tracks"
ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE = "all_tracks"

# Config keys: default enqueue option per media type.
# play_media looks these up as f"default_enqueue_option_{media_type.value}",
# so every key has to keep the "default_enqueue_option_" prefix.
CONF_DEFAULT_ENQUEUE_OPTION_ARTIST = "default_enqueue_option_artist"
CONF_DEFAULT_ENQUEUE_OPTION_ALBUM = "default_enqueue_option_album"
CONF_DEFAULT_ENQUEUE_OPTION_TRACK = "default_enqueue_option_track"
CONF_DEFAULT_ENQUEUE_OPTION_RADIO = "default_enqueue_option_radio"
CONF_DEFAULT_ENQUEUE_OPTION_PLAYLIST = "default_enqueue_option_playlist"
CONF_DEFAULT_ENQUEUE_OPTION_AUDIOBOOK = "default_enqueue_option_audiobook"
CONF_DEFAULT_ENQUEUE_OPTION_PODCAST = "default_enqueue_option_podcast"
CONF_DEFAULT_ENQUEUE_OPTION_PODCAST_EPISODE = "default_enqueue_option_podcast_episode"
CONF_DEFAULT_ENQUEUE_OPTION_FOLDER = "default_enqueue_option_folder"
CONF_DEFAULT_ENQUEUE_OPTION_UNKNOWN = "default_enqueue_option_unknown"

RADIO_TRACK_MAX_DURATION_SECS = 20 * 60  # 20 minutes

# Category ids - presumably the cache categories for persisted queue state/items
# (their use is not visible in this part of the file).
CACHE_CATEGORY_PLAYER_QUEUE_STATE = 0
CACHE_CATEGORY_PLAYER_QUEUE_ITEMS = 1
107
108
class CompareState(TypedDict):
    """Snapshot of a queue's (previous) state.

    Kept per queue so a new state can be compared against the previous one.
    """

    queue_id: str
    state: PlaybackState
    current_item_id: str | None
    next_item_id: str | None
    current_item: QueueItem | None
    elapsed_time: int
    # elapsed time taken from the last update that had the PLAYING state;
    # used to decide whether a track was fully played when the queue goes idle
    last_playing_elapsed_time: int
    stream_title: str | None
    codec_type: ContentType | None
    output_formats: list[str] | None
127
128
129class PlayerQueuesController(CoreController):
130 """Controller holding all logic to enqueue music for players."""
131
132 domain: str = "player_queues"
133
def __init__(self, mass: MusicAssistant) -> None:
    """Set up the controller with empty in-memory state and fill in its manifest.

    :param mass: The MusicAssistant instance, handed on to the base controller.
    """
    super().__init__(mass)
    # manifest details describing this controller
    self.manifest.name = "Player Queues controller"
    self.manifest.description = (
        "Music Assistant's core controller which manages the queues for all players."
    )
    self.manifest.icon = "playlist-music"
    # in-memory bookkeeping, keyed by queue_id (except the transitioning set)
    self._queues: dict[str, PlayerQueue] = {}
    self._queue_items: dict[str, list[QueueItem]] = {}
    self._prev_states: dict[str, CompareState] = {}
    self._transitioning_players: set[str] = set()
146
async def close(self) -> None:
    """Stop every queue that is currently playing or paused (cleanup on exit)."""
    for player_queue in self.all():
        if player_queue.state not in (PlaybackState.PLAYING, PlaybackState.PAUSED):
            # nothing to stop for this queue
            continue
        await self.stop(player_queue.queue_id)
153
async def get_config_entries(
    self,
    action: str | None = None,
    values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
    """Build the Config Entries exposed by this core module.

    Neither ``action`` nor ``values`` is consulted; the same set of entries is
    produced on every call.

    :param action: Not used by this controller.
    :param values: Not used by this controller.
    :return: The artist and album "items to select" entries, followed by one
        "default enqueue option" entry per media type.
    """
    # a single options list (one option per QueueOption member) shared by all
    # "default enqueue option" entries
    enqueue_options = [ConfigValueOption(opt.name, opt.value) for opt in QueueOption]

    def _enqueue_option_entry(
        key: str, label: str, default: QueueOption, *, hidden: bool = False
    ) -> ConfigEntry:
        """Create one "default enqueue option" entry."""
        # visible entries carry a description, hidden entries only the hidden flag
        extra: dict[str, Any] = (
            {"hidden": True}
            if hidden
            else {"description": "Define the default enqueue action for this mediatype."}
        )
        return ConfigEntry(
            key=key,
            type=ConfigEntryType.STRING,
            default_value=default.value,
            label=label,
            options=enqueue_options,
            **extra,
        )

    select_artist_entry = ConfigEntry(
        key=CONF_DEFAULT_ENQUEUE_SELECT_ARTIST,
        type=ConfigEntryType.STRING,
        default_value=ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE,
        label="Items to select when you play a (in-library) artist.",
        options=[
            ConfigValueOption(title="Only in-library tracks", value="library_tracks"),
            ConfigValueOption(
                title="All tracks from all albums in the library",
                value="library_album_tracks",
            ),
            ConfigValueOption(
                title="All (top) tracks from (all) streaming provider(s)",
                value="all_tracks",
            ),
            ConfigValueOption(
                title="All tracks from all albums from (all) streaming provider(s)",
                value="all_album_tracks",
            ),
        ],
    )
    select_album_entry = ConfigEntry(
        key=CONF_DEFAULT_ENQUEUE_SELECT_ALBUM,
        type=ConfigEntryType.STRING,
        default_value=ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE,
        label="Items to select when you play a (in-library) album.",
        options=[
            ConfigValueOption(title="Only in-library tracks", value="library_tracks"),
            ConfigValueOption(
                title="All tracks for album on (streaming) provider",
                value="all_tracks",
            ),
        ],
    )

    return (
        select_artist_entry,
        select_album_entry,
        _enqueue_option_entry(
            CONF_DEFAULT_ENQUEUE_OPTION_ARTIST,
            "Default enqueue option for Artist item(s).",
            QueueOption.REPLACE,
        ),
        _enqueue_option_entry(
            CONF_DEFAULT_ENQUEUE_OPTION_ALBUM,
            "Default enqueue option for Album item(s).",
            QueueOption.REPLACE,
        ),
        _enqueue_option_entry(
            CONF_DEFAULT_ENQUEUE_OPTION_TRACK,
            "Default enqueue option for Track item(s).",
            QueueOption.PLAY,
        ),
        _enqueue_option_entry(
            CONF_DEFAULT_ENQUEUE_OPTION_RADIO,
            "Default enqueue option for Radio item(s).",
            QueueOption.REPLACE,
        ),
        _enqueue_option_entry(
            CONF_DEFAULT_ENQUEUE_OPTION_PLAYLIST,
            "Default enqueue option for Playlist item(s).",
            QueueOption.REPLACE,
        ),
        _enqueue_option_entry(
            CONF_DEFAULT_ENQUEUE_OPTION_AUDIOBOOK,
            "Default enqueue option for Audiobook item(s).",
            QueueOption.REPLACE,
            hidden=True,
        ),
        _enqueue_option_entry(
            CONF_DEFAULT_ENQUEUE_OPTION_PODCAST,
            "Default enqueue option for Podcast item(s).",
            QueueOption.REPLACE,
            hidden=True,
        ),
        _enqueue_option_entry(
            CONF_DEFAULT_ENQUEUE_OPTION_PODCAST_EPISODE,
            "Default enqueue option for Podcast-episode item(s).",
            QueueOption.REPLACE,
            hidden=True,
        ),
        _enqueue_option_entry(
            CONF_DEFAULT_ENQUEUE_OPTION_FOLDER,
            "Default enqueue option for Folder item(s).",
            QueueOption.REPLACE,
            hidden=True,
        ),
    )
275
def __iter__(self) -> Iterator[PlayerQueue]:
    """Return an iterator over all registered PlayerQueues."""
    registered_queues = self._queues.values()
    return iter(registered_queues)
279
@api_command("player_queues/all")
def all(self) -> tuple[PlayerQueue, ...]:
    """Return a tuple snapshot of every registered PlayerQueue."""
    return (*self._queues.values(),)
284
@api_command("player_queues/get")
def get(self, queue_id: str) -> PlayerQueue | None:
    """Look up a PlayerQueue by its queue_id.

    :param queue_id: The id of the queue to look up.
    :return: The matching PlayerQueue, or None when no such queue is registered.
    """
    try:
        return self._queues[queue_id]
    except KeyError:
        return None
289
@api_command("player_queues/items")
def items(self, queue_id: str, limit: int = 500, offset: int = 0) -> list[QueueItem]:
    """Return a page of QueueItems for the given PlayerQueue.

    :param queue_id: The id of the queue whose items are requested.
    :param limit: Maximum number of items to return.
    :param offset: Index of the first item to return.
    :return: The requested slice of items; an empty list for an unknown queue.
    """
    queue_items = self._queue_items.get(queue_id)
    if queue_items is None:
        return []
    page_end = offset + limit
    return queue_items[offset:page_end]
297
@api_command("player_queues/get_active_queue")
def get_active_queue(self, player_id: str) -> PlayerQueue | None:
    """Return the current active/synced queue for a player.

    :param player_id: The id of the player to resolve the active queue for.
    :return: Whatever the player controller reports as the active queue, or
        None when the player itself cannot be found.
    """
    player = self.mass.players.get(player_id)
    if not player:
        return None
    return self.mass.players.get_active_queue(player)
304
305 # Queue commands
306
@api_command("player_queues/shuffle")
async def set_shuffle(self, queue_id: str, shuffle_enabled: bool) -> None:
    """Configure the shuffle setting on the queue.

    Only the items after the buffered/current index are reloaded into the
    queue; when shuffle is switched off they are first put back in
    ``sort_index`` order.

    :param queue_id: The id of the queue to configure.
    :param shuffle_enabled: The new shuffle setting.
    """
    queue = self._queues[queue_id]
    if queue.shuffle_enabled == shuffle_enabled:
        # setting unchanged, nothing to do
        return
    queue.shuffle_enabled = shuffle_enabled
    all_items = self._queue_items[queue_id]
    anchor_index = queue.index_in_buffer or queue.current_index
    if anchor_index is None:
        upcoming_start = 0
        upcoming_items: list[QueueItem] = []
    else:
        upcoming_start = anchor_index + 1
        upcoming_items = all_items[upcoming_start:]
    if not shuffle_enabled:
        # shuffle got disabled: try to restore the original order of what remains
        upcoming_items = sorted(upcoming_items, key=lambda item: item.sort_index)
    await self.load(
        queue_id=queue_id,
        queue_items=upcoming_items,
        insert_at_index=upcoming_start,
        keep_remaining=False,
        shuffle=shuffle_enabled,
    )
332
@api_command("player_queues/dont_stop_the_music")
def set_dont_stop_the_music(self, queue_id: str, dont_stop_the_music_enabled: bool) -> None:
    """Configure the "Don't stop the music" setting on the queue.

    :param queue_id: The id of the queue to configure.
    :param dont_stop_the_music_enabled: The new value of the setting.
    :raises UnsupportedFeaturedException: When enabling is requested but none of
        the music providers lists the SIMILAR_TRACKS feature.
    """
    similar_tracks_supported = any(
        ProviderFeature.SIMILAR_TRACKS in provider.supported_features
        for provider in self.mass.music.providers
    )
    if dont_stop_the_music_enabled and not similar_tracks_supported:
        raise UnsupportedFeaturedException(
            "Don't stop the music is not supported by any of the available music providers"
        )
    queue = self._queues[queue_id]
    queue.dont_stop_the_music_enabled = dont_stop_the_music_enabled
    self.signal_update(queue_id=queue_id)

    # the radio source only needs filling right away when the queue is
    # (nearly) at its last track
    on_last_track = (
        queue.dont_stop_the_music_enabled
        and queue.enqueued_media_items
        and queue.current_index is not None
        and (queue.items - queue.current_index) <= 1
    )
    if not on_last_track:
        return
    queue.radio_source = queue.enqueued_media_items
    self.mass.call_later(
        5,
        self._fill_radio_tracks,
        queue_id,
        task_id=f"fill_radio_tracks_{queue_id}",
    )
357
@api_command("player_queues/repeat")
def set_repeat(self, queue_id: str, repeat_mode: RepeatMode) -> None:
    """Configure the repeat setting on the queue.

    :param queue_id: The id of the queue to configure.
    :param repeat_mode: The new repeat mode.
    """
    queue = self._queues[queue_id]
    if queue.repeat_mode == repeat_mode:
        # setting unchanged, nothing to do
        return
    queue.repeat_mode = repeat_mode
    self.signal_update(queue_id)

    # While playing, the next track may have changed, so it is (re)queued.
    # This is only done once the player has loaded the current track;
    # otherwise we wait until it has loaded to prevent conflicts.
    if queue.state != PlaybackState.PLAYING:
        return
    if queue.index_in_buffer is None or queue.index_in_buffer != queue.current_index:
        return
    next_item = self.get_next_item(queue_id, queue.index_in_buffer)
    if next_item:
        self._enqueue_next_item(queue_id, next_item)
377
@api_command("player_queues/play_media")
async def play_media(
    self,
    queue_id: str,
    media: MediaItemType | ItemMapping | str | list[MediaItemType | ItemMapping | str],
    option: QueueOption | None = None,
    radio_mode: bool = False,
    start_item: PlayableMediaItemType | str | None = None,
    username: str | None = None,
) -> None:
    """Play media item(s) on the given queue.

    :param queue_id: The queue_id of the queue to play media on.
    :param media: Media that should be played (MediaItem(s) and/or uri's).
    :param option: Which enqueue mode to use. When omitted, the configured
        default for the media type of the first resolved item is used.
    :param radio_mode: Enable radio mode for the given item(s).
    :param start_item: Optional item to start the playlist or album from.
    :param username: The username of the user requesting the playback.
        Setting the username allows for overriding the logged-in user
        to account for playback history per user when the play_media is
        called from a shared context (like a web hook or automation).
    :raises PlayerUnavailableError: When no queue exists for ``queue_id``.
    :raises MediaNotFoundError: When no playable items could be resolved.
    """
    # ruff: noqa: PLR0915
    # A contextvar bypasses the throttler for this asyncio task/context, so
    # playback gets priority over requests running in the background.
    BYPASS_THROTTLER.set(True)
    queue = self.get(queue_id)
    if not queue:
        raise PlayerUnavailableError(f"Queue {queue_id} is not available")
    # always fetch the underlying player so we can raise early if its not available
    queue_player = self.mass.players.get(queue_id, True)
    assert queue_player is not None  # for type checking
    if queue_player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
        self.logger.warning("Ignore queue command: An announcement is in progress")
        return

    # remember which user requested the playback: the explicitly named user
    # when it can be found, the current (logged-in) user otherwise
    playback_user: User | None = None
    if username:
        playback_user = await self.mass.webserver.auth.get_user_by_username(username)
    if not playback_user:
        playback_user = get_current_user()
    queue.userid = playback_user.user_id if playback_user else None

    # accept both a single item and a list of items
    requested_media = media if isinstance(media, list) else [media]

    # clear queue if needed
    if option == QueueOption.REPLACE:
        self.clear(queue_id)
    # reset the 'enqueued media item' list when a new queue is requested
    if option not in (QueueOption.ADD, QueueOption.NEXT):
        queue.enqueued_media_items.clear()

    media_items: list[MediaItemType] = []
    radio_source: list[MediaItemType] = []
    # resolve all media items
    for requested in requested_media:
        try:
            # turn the provided value into a MA MediaItem (or basic item from URL)
            media_item: MediaItemType | ItemMapping | BrowseFolder
            if isinstance(requested, str):
                media_item = await self.mass.music.get_item_by_uri(requested)
            elif isinstance(requested, dict):  # type: ignore[unreachable]
                # TODO: Investigate why the API parser sometimes passes raw dicts instead
                # of converting them to MediaItem objects. The parse_value function in
                # api.py should handle dict-to-object conversion, but dicts are slipping
                # through in some cases. This is defensive handling for that parser bug.
                media_item = media_from_dict(requested)  # type: ignore[unreachable]
                self.logger.debug("Converted to: %s", type(media_item))
            else:
                # already a MediaItemType | ItemMapping
                media_item = requested

            is_full_media_item = not isinstance(media_item, (ItemMapping, BrowseFolder))

            # Keep the requested item as a possible source for Don't stop the music.
            # A FIFO list holds the last 10 requested items; ItemMapping and
            # BrowseFolder objects are skipped.
            if is_full_media_item and media_item.media_type in (
                MediaType.TRACK,
                MediaType.ALBUM,
                MediaType.PLAYLIST,
                MediaType.ARTIST,
            ):
                queue.enqueued_media_items.append(media_item)
                if len(queue.enqueued_media_items) > 10:
                    queue.enqueued_media_items.pop(0)

            # fall back to the configured default enqueue option if none was given
            if option is None:
                configured_option = await self.mass.config.get_core_config_value(
                    self.domain,
                    f"default_enqueue_option_{media_item.media_type.value}",
                    return_type=str,
                )
                option = QueueOption(configured_option)
                if option == QueueOption.REPLACE:
                    self.clear(queue_id, skip_stop=True)

            # collect media_items to play
            if radio_mode:
                # only full media items can serve as a radio source
                if is_full_media_item:
                    radio_source.append(media_item)
            else:
                # _resolve_media_items is given the start item as a string uri
                start_item_uri: str | None = None
                if isinstance(start_item, str):
                    start_item_uri = start_item
                elif start_item is not None:
                    start_item_uri = start_item.uri
                media_items += await self._resolve_media_items(
                    media_item, start_item_uri, queue_id=queue_id
                )

        except MusicAssistantError as err:
            # invalid MA uri or item not found error
            self.logger.warning("Skipping %s: %s", requested, str(err))

    # overwrite or append radio source items
    if option in (QueueOption.ADD, QueueOption.NEXT):
        queue.radio_source += radio_source
    else:
        queue.radio_source = radio_source
    # in radio mode the tracks to play are derived from the radio source
    if radio_mode:
        radio_tracks = await self._get_radio_tracks(
            queue_id=queue_id, is_initial_radio_mode=True
        )
        media_items = list(radio_tracks)

    # only add valid/available items
    queue_items: list[QueueItem] = [
        QueueItem.from_media_item(queue_id, cast("PlayableMediaItemType", candidate))
        for candidate in media_items
        if candidate and candidate.available
    ]
    if not queue_items:
        raise MediaNotFoundError("No playable items found")

    # determine the position right after the playing/loaded/buffered index
    if queue.state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
        cur_index = queue.index_in_buffer or queue.current_index or 0
    else:
        cur_index = queue.current_index or 0
    insert_at_index = cur_index + 1
    # Radio modes are already shuffled in a pattern we would like to keep.
    shuffle = queue.shuffle_enabled and len(queue_items) > 1 and not radio_mode

    if option == QueueOption.REPLACE:
        # replace: drop all existing items and start playing the new ones
        await self.load(
            queue_id,
            queue_items=queue_items,
            keep_remaining=False,
            keep_played=False,
            shuffle=shuffle,
        )
        await self.play_index(queue_id, 0)
    elif option == QueueOption.NEXT:
        # next: insert right after the playing/loaded/buffered index
        await self.load(
            queue_id,
            queue_items=queue_items,
            insert_at_index=insert_at_index,
            shuffle=shuffle,
        )
    elif option == QueueOption.REPLACE_NEXT:
        # replace next: insert after the current index without keeping the remainder
        await self.load(
            queue_id,
            queue_items=queue_items,
            insert_at_index=insert_at_index,
            keep_remaining=False,
            shuffle=shuffle,
        )
    elif option == QueueOption.PLAY:
        # play: insert after the current index and start playing the new item(s)
        await self.load(
            queue_id,
            queue_items=queue_items,
            insert_at_index=insert_at_index,
            shuffle=shuffle,
        )
        next_index = min(insert_at_index, len(self._queue_items[queue_id]) - 1)
        await self.play_index(queue_id, next_index)
    elif option == QueueOption.ADD:
        # add: append to the remaining queue items (or insert after the current
        # index when shuffle is enabled)
        if queue.shuffle_enabled:
            add_at_index = insert_at_index
        else:
            add_at_index = len(self._queue_items[queue_id]) + 1
        await self.load(
            queue_id=queue_id,
            queue_items=queue_items,
            insert_at_index=add_at_index,
            shuffle=queue.shuffle_enabled,
        )
        # edge case: the queue was empty and items were only added (not played),
        # so the first item is marked as the current index
        if queue.current_index is None:
            queue.current_index = 0
            queue.current_item = self.get_item(queue_id, 0)
            queue.items = len(queue_items)
            self.signal_update(queue_id)
588
@api_command("player_queues/move_item")
def move_item(self, queue_id: str, queue_item_id: str, pos_shift: int = 1) -> None:
    """Move a queue item up or down the queue.

    :param queue_id: Id of the queue to process this request.
    :param queue_item_id: The item_id of the queue item that needs to be moved.
    :param pos_shift: Positive: move the item that many positions down.
        Negative: move it that many positions up.
        0: move the item to the top of the queue as next item.
    :raises InvalidDataError: When the item is not part of the queue.
    :raises IndexError: When the item is already played/buffered.
    """
    queue = self._queues[queue_id]
    item_index = self.index_by_id(queue_id, queue_item_id)
    if item_index is None:
        raise InvalidDataError(f"Item {queue_item_id} not found in queue")
    if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
        msg = f"{item_index} is already played/buffered"
        raise IndexError(msg)

    # reorder a copy; the result is handed to update_items afterwards
    reordered = list(self._queue_items[queue_id])

    if pos_shift != 0:
        target_index = item_index + pos_shift
    elif queue.state == PlaybackState.PLAYING:
        # slot right after the playing item
        target_index = (queue.current_index or 0) + 1
    else:
        target_index = queue.current_index or 0

    # silently ignore targets before the current index or beyond the end
    if not ((queue.current_index or 0) <= target_index <= len(reordered)):
        return
    moved_item = reordered.pop(item_index)
    reordered.insert(target_index, moved_item)
    self.update_items(queue_id, reordered)
622
@api_command("player_queues/move_item_end")
def move_item_end(self, queue_id: str, queue_item_id: str) -> None:
    """Move a queue item to the end of the queue.

    :param queue_id: Id of the queue to process this request.
    :param queue_item_id: The item_id of the queue item that needs to be moved.
    :raises InvalidDataError: When the item is not part of the queue.
    :raises IndexError: When the item is already played/buffered.
    """
    queue = self._queues[queue_id]
    item_index = self.index_by_id(queue_id, queue_item_id)
    if item_index is None:
        raise InvalidDataError(f"Item {queue_item_id} not found in queue")
    if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
        msg = f"{item_index} is already played/buffered"
        raise IndexError(msg)

    current_items = self._queue_items[queue_id]
    last_index = len(current_items) - 1
    if item_index == last_index:
        # already the last item
        return

    # reorder a copy: take the item out and put it back as the last element
    reordered = list(current_items)
    reordered.append(reordered.pop(item_index))
    self.update_items(queue_id, reordered)
649
@api_command("player_queues/delete_item")
def delete_item(self, queue_id: str, item_id_or_index: int | str) -> None:
    """Delete an item (by id or index) from the queue.

    A request for an item that is already loaded in the buffer is ignored with
    a warning (the frontend should guard against this, so it is a safety net).

    :param queue_id: Id of the queue to process this request.
    :param item_id_or_index: Either the queue_item_id (str) or the position
        (int) of the item to remove.
    :raises InvalidDataError: When the item id is not part of the queue, or the
        index does not point at an existing item.
    """
    if isinstance(item_id_or_index, str):
        item_index = self.index_by_id(queue_id, item_id_or_index)
        if item_index is None:
            raise InvalidDataError(f"Item {item_id_or_index} not found in queue")
    else:
        item_index = item_id_or_index
    queue = self._queues[queue_id]
    # work on a copy (like move_item/move_item_end) so the live list is only
    # replaced through update_items
    queue_items = self._queue_items[queue_id].copy()
    # reject indexes that do not address an existing item; a negative index
    # would otherwise silently delete from the tail of the queue
    if not 0 <= item_index < len(queue_items):
        raise InvalidDataError(f"Index {item_index} is out of range for queue")
    if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
        # ignore request if track already loaded in the buffer
        # the frontend should guard so this is just in case
        self.logger.warning("delete requested for item already loaded in buffer")
        return
    del queue_items[item_index]
    self.update_items(queue_id, queue_items)
668
@api_command("player_queues/clear")
def clear(self, queue_id: str, skip_stop: bool = False) -> None:
    """Clear all items in the queue and reset its position bookkeeping.

    :param queue_id: Id of the queue to clear.
    :param skip_stop: When True, no stop task is scheduled even if the queue
        is not idle.
    """
    queue = self._queues[queue_id]
    queue.radio_source = []
    if not (queue.state == PlaybackState.IDLE or skip_stop):
        # the stop itself runs as a background task
        self.mass.create_task(self.stop(queue_id))
    # reset the position/progress state of the queue
    queue.current_index = None
    queue.current_item = None
    queue.elapsed_time = 0
    queue.elapsed_time_last_updated = time.time()
    queue.index_in_buffer = None
    self.update_items(queue_id, [])
682
@api_command("player_queues/stop")
async def stop(self, queue_id: str) -> None:
    """Handle the STOP command for the given queue.

    Nothing is forwarded to the player unless the queue exists and is active.

    :param queue_id: queue_id of the playerqueue to handle the command.
    :raises PlayerUnavailableError: When the player lookup yields no player.
    """
    if self.mass.players.get(queue_id, True) is None:
        raise PlayerUnavailableError(f"Player {queue_id} is not available")
    queue = self.get(queue_id)
    if not (queue and queue.active):
        return
    if queue.state == PlaybackState.PLAYING:
        # remember the position so playback can be resumed from here later
        queue.resume_pos = int(queue.corrected_elapsed_time)
    # forward the actual command to the player
    player = self.mass.players.get(queue_id)
    if player:
        await player.stop()
699
@api_command("player_queues/play")
async def play(self, queue_id: str) -> None:
    """Handle the PLAY command for the given queue.

    An active, paused queue is unpaused on the player itself; in every other
    case a resume of the queue is performed instead.

    :param queue_id: queue_id of the playerqueue to handle the command.
    :raises PlayerUnavailableError: When the player lookup yields no player.
    """
    queue_player = self.mass.players.get(queue_id, True)
    if queue_player is None:
        raise PlayerUnavailableError(f"Player {queue_id} is not available")
    queue = self._queues.get(queue_id)
    is_paused_queue = queue and queue.active and queue.state == PlaybackState.PAUSED
    if not is_paused_queue:
        # player is not paused, perform resume instead
        await self.resume(queue_id)
        return
    # forward the actual play/unpause command to the player
    await queue_player.play()
720
@api_command("player_queues/pause")
async def pause(self, queue_id: str) -> None:
    """Handle the PAUSE command for the given queue.

    If the queue is playing, its current position is stored as resume position
    first. Players without the PAUSE feature are sent a stop instead. After a
    successful pause a background watcher stops the player when it stays paused
    for too long (skipped while an announcement is in progress).

    :param queue_id: queue_id of the playerqueue to handle the command.
    :raises PlayerUnavailableError: When no player can be found for the queue.
    """
    if (queue := self._queues.get(queue_id)) and queue.state == PlaybackState.PLAYING:
        queue.resume_pos = int(queue.corrected_elapsed_time)
    # forward the actual command to the player controller
    queue_player = self.mass.players.get(queue_id)
    if queue_player is None:
        # raise explicitly instead of asserting: an assert is stripped under -O
        # and the sibling commands report a missing player with this same error
        raise PlayerUnavailableError(f"Player {queue_id} is not available")
    if not self.mass.players.get_player_provider(queue_id):
        return  # guard

    if PlayerFeature.PAUSE not in queue_player.supported_features:
        # if player does not support pause, we need to send stop
        await queue_player.stop()
        return
    await queue_player.pause()

    async def _watch_pause() -> None:
        """Stop the player if it is still paused after the grace period."""
        count = 0
        # wait (max 5 x 1 second) for the player to leave the PLAYING state
        while count < 5 and queue_player.playback_state == PlaybackState.PLAYING:
            count += 1
            await asyncio.sleep(1)
        # only keep watching when the player actually ended up paused
        if queue_player.playback_state != PlaybackState.PAUSED:
            return
        count = 0
        # wait (max 30 x 1 second) for the player to be unpaused
        while count < 30 and queue_player.playback_state == PlaybackState.PAUSED:
            count += 1
            await asyncio.sleep(1)
        # if player is still paused when the limit is reached, send stop
        if queue_player.playback_state == PlaybackState.PAUSED:
            await queue_player.stop()

    # we auto stop a player from paused when its paused for 30 seconds
    if not queue_player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
        self.mass.create_task(_watch_pause())
762
@api_command("player_queues/play_pause")
async def play_pause(self, queue_id: str) -> None:
    """Toggle play/pause on the given playerqueue.

    A queue that is currently playing is paused; in any other case (including
    an unknown queue) the play command is issued.

    :param queue_id: queue_id of the queue to handle the command.
    """
    queue = self._queues.get(queue_id)
    if queue and queue.state == PlaybackState.PLAYING:
        await self.pause(queue_id)
    else:
        await self.play(queue_id)
773
@api_command("player_queues/next")
async def next(self, queue_id: str) -> None:
    """Handle the NEXT TRACK command for the given queue.

    When the next track can not be fetched (MediaNotFoundError), the item
    after it is tried, for at most 5 attempts in total.

    :param queue_id: queue_id of the queue to handle the command.
    """
    queue = self.get(queue_id)
    if queue is None or not queue.active:
        # TODO: forward to underlying player if not active
        return
    idx = self._queues[queue_id].current_index
    if idx is None:
        self.logger.warning("Queue %s has no current index", queue.display_name)
        return
    for _attempt in range(5):
        try:
            next_index = self._get_next_index(queue_id, idx, True)
            if next_index is not None:
                await self.play_index(queue_id, next_index, debounce=True)
            # done, whether or not there was a next index to play
            return
        except MediaNotFoundError:
            self.logger.warning(
                "Failed to fetch next track for queue %s - trying next item",
                queue.display_name,
            )
            idx += 1
800
@api_command("player_queues/previous")
async def previous(self, queue_id: str) -> None:
    """Handle the PREVIOUS TRACK command for the given queue.

    When the elapsed time of the current track is below 5 seconds the previous
    track is played (never going below index 0); otherwise the current track
    is restarted.

    :param queue_id: queue_id of the queue to handle the command.
    """
    queue = self.get(queue_id)
    if queue is None or not queue.active:
        # TODO: forward to underlying player if not active
        return
    current_index = queue.current_index
    if current_index is None:
        return
    if queue.elapsed_time < 5:
        # barely started: go back one track
        target_index = max(current_index - 1, 0)
    else:
        # restart the current track
        target_index = int(current_index)
    await self.play_index(queue_id, target_index, debounce=True)
819
@api_command("player_queues/skip")
async def skip(self, queue_id: str, seconds: int = 10) -> None:
    """Handle the SKIP command for the given queue.

    :param queue_id: queue_id of the queue to handle the command.
    :param seconds: Number of seconds to skip in the track. Use a negative
        value to skip back.
    """
    queue = self.get(queue_id)
    if queue is None or not queue.active:
        # TODO: forward to underlying player if not active
        return
    # a skip is a seek relative to the queue's elapsed time
    target_position = int(queue.elapsed_time + seconds)
    await self.seek(queue_id, target_position)
831
@api_command("player_queues/seek")
async def seek(self, queue_id: str, position: int = 10) -> None:
    """Handle the SEEK command for the given queue.

    Seeking (re)plays the current index from the requested position. Nothing
    happens for an unknown queue.

    :param queue_id: queue_id of the queue to handle the command.
    :param position: Position in seconds to seek to in the current playing
        item. Negative values are clamped to 0.
    :raises PlayerUnavailableError: When the player lookup yields no player.
    :raises InvalidCommand: When the queue has no current item or index, the
        item has no duration, or the position lies beyond the duration.
    """
    queue = self.get(queue_id)
    if not queue:
        return
    queue_player = self.mass.players.get(queue_id, True)
    if queue_player is None:
        raise PlayerUnavailableError(f"Player {queue_id} is not available")
    current_item = queue.current_item
    if not current_item:
        raise InvalidCommand(f"Queue {queue_player.display_name} has no item(s) loaded.")
    if not current_item.duration:
        raise InvalidCommand("Can not seek items without duration.")
    position = max(0, int(position))
    if position > current_item.duration:
        raise InvalidCommand("Can not seek outside of duration range.")
    if queue.current_index is None:
        raise InvalidCommand(f"Queue {queue_player.display_name} has no current index.")
    await self.play_index(queue_id, queue.current_index, seek_position=position)
854
@api_command("player_queues/resume")
async def resume(self, queue_id: str, fade_in: bool | None = None) -> None:
    """Handle the RESUME command for the given queue.

    :param queue_id: queue_id of the queue to handle the command.
    :param fade_in: Whether to fade in. When left at None, a fade in is enabled
        if the player is idle, the queue's elapsed time was last updated more
        than 60 seconds ago and the resume position is above 0.
    :raises PlayerUnavailableError: When no player can be found for the queue.
    :raises QueueEmpty: When there is nothing to resume, not even from the playlog.
    """
    queue = self._queues[queue_id]
    queue_items = self._queue_items[queue_id]
    resume_item = queue.current_item
    if queue.state != PlaybackState.PLAYING:
        resume_pos = queue.resume_pos or queue.elapsed_time
    else:
        # resume requested while already playing,
        # use current position as resume position
        resume_pos = queue.corrected_elapsed_time
        fade_in = False

    if not resume_item and len(queue_items) > 0:
        # no current item, but the queue holds items: pick the one at the
        # current index, or the first one if there was no previous track,
        # and start it from the beginning
        fallback_index = queue.current_index if queue.current_index is not None else 0
        resume_item = self.get_item(queue_id, fallback_index)
        resume_pos = 0

    if resume_item is None:
        # Queue is empty, try to resume from playlog
        if await self._try_resume_from_playlog(queue):
            return
        msg = f"Resume queue requested but queue {queue.display_name} is empty"
        raise QueueEmpty(msg)

    queue_player = self.mass.players.get(queue_id)
    if queue_player is None:
        raise PlayerUnavailableError(f"Player {queue_id} is not available")
    idle_for_a_while = (
        fade_in is None
        and queue_player.playback_state == PlaybackState.IDLE
        and (time.time() - queue.elapsed_time_last_updated) > 60
    )
    if idle_for_a_while:
        # enable fade in effect if the player is idle for a while
        fade_in = resume_pos > 0
    if resume_item.media_type == MediaType.RADIO:
        # we're not able to skip in online radio so this is pointless
        resume_pos = 0
    await self.play_index(
        queue_id, resume_item.queue_item_id, int(resume_pos), fade_in or False
    )
903
@api_command("player_queues/play_index")
async def play_index(
    self,
    queue_id: str,
    index: int | str,
    seek_position: int = 0,
    fade_in: bool = False,
    debounce: bool = False,
) -> None:
    """Play item at index (or item_id) X in queue.

    - queue_id: queue_id of the queue to handle the command.
    - index: index (int) or queue_item_id (str) of the item to play.
    - seek_position: position (in seconds) to start playback at. When 0 and
      the media item carries a resume_position_ms, that resume point is used.
    - fade_in: request a fade-in for the first load attempt.
    - debounce: wait a bit longer before sending the play request, to absorb
      rapid next/previous presses.

    Raises InvalidDataError if an item id can not be resolved,
    PlayerUnavailableError if the player can not be found and
    MediaNotFoundError if no playable item could be loaded.
    """
    queue = self._queues[queue_id]
    queue.resume_pos = 0
    if isinstance(index, str):
        temp_index = self.index_by_id(queue_id, index)
        if temp_index is None:
            raise InvalidDataError(f"Item {index} not found in queue")
        index = temp_index
    # At this point index is guaranteed to be int
    queue.current_index = index
    # update current item and elapsed time and signal update
    # this way the UI knows immediately that a new item is loading
    queue.current_item = self.get_item(queue_id, index)
    queue.elapsed_time = seek_position
    queue.elapsed_time_last_updated = time.time()
    self.signal_update(queue_id)
    queue.index_in_buffer = index
    queue.flow_mode_stream_log = []
    target_player = self.mass.players.get(queue_id)
    if target_player is None:
        raise PlayerUnavailableError(f"Player {queue_id} is not available")
    queue.next_item_id_enqueued = None
    # always update session id when we start a new playback session
    queue.session_id = shortuuid.random(length=8)
    # handle resume point of audiobook(chapter) or podcast(episode)
    if (
        not seek_position
        and (queue_item := self.get_item(queue_id, index))
        and (resume_position_ms := getattr(queue_item.media_item, "resume_position_ms", 0))
    ):
        seek_position = max(0, int((resume_position_ms - 500) / 1000))

    # send play_media request to player
    # NOTE that we debounce this a bit to account for someone hitting the next button
    # like a madman. This will prevent the player from being overloaded with requests.
    async def _play_index(index: int, debounce: bool) -> None:
        # try up to 5 items: when an item turns out to be unplayable,
        # move on to the item that follows it
        for attempt in range(5):
            try:
                queue_item = self.get_item(queue_id, index)
                if not queue_item:
                    continue  # guard
                await self._load_item(
                    queue_item,
                    self._get_next_index(queue_id, index),
                    is_start=True,
                    # seek position and fade-in only apply to the requested item
                    seek_position=seek_position if attempt == 0 else 0,
                    fade_in=fade_in if attempt == 0 else False,
                )
                # if we reach this point, loading the item succeeded, break the loop
                queue.current_index = index
                queue.current_item = queue_item
                break
            except (MediaNotFoundError, AudioError):
                # the requested index can not be played.
                if queue_item:
                    self.logger.warning(
                        "Skipping unplayable item %s (%s)",
                        queue_item.name,
                        queue_item.uri,
                    )
                    queue_item.available = False
                next_index = self._get_next_index(queue_id, index, allow_repeat=False)
                if next_index is None:
                    raise MediaNotFoundError("No next item available")
                index = next_index
        else:
            # all attempts to find a playable item failed
            raise MediaNotFoundError("No playable item found to start playback")

        # work out if we need to use flow mode
        flow_mode = target_player.flow_mode and queue_item.media_type not in (
            # don't use flow mode for duration-less streams
            MediaType.RADIO,
            MediaType.PLUGIN_SOURCE,
        )
        await asyncio.sleep(0.5 if debounce else 0.1)
        queue.flow_mode = flow_mode
        await self.mass.players.play_media(
            player_id=queue_id,
            media=await self.player_media_from_queue_item(queue_item, flow_mode),
        )
        queue.current_index = index
        queue.current_item = queue_item
        # keep the transition flag set for a short while after the play request
        await asyncio.sleep(2)
        self._transitioning_players.discard(queue_id)

    # we set a flag to notify the update logic that we're transitioning to a new track
    self._transitioning_players.add(queue_id)

    # we debounce the play_index command to handle the case where someone
    # is spamming next/previous on the player
    task_id = f"play_index_{queue_id}"
    if existing_task := self.mass.get_task(task_id):
        existing_task.cancel()
        with suppress(asyncio.CancelledError):
            await existing_task
    task = self.mass.create_task(
        _play_index,
        index,
        debounce,
        task_id=task_id,
    )
    try:
        await task
    except Exception:
        # Starting playback failed: clear the transition flag again, otherwise
        # on_player_update would keep ignoring updates for this player.
        # Cancellation (asyncio.CancelledError is not an Exception subclass) is
        # deliberately not handled here: in that case a newer play_index call
        # has taken over and owns the flag.
        self._transitioning_players.discard(queue_id)
        raise
    self.signal_update(queue_id)
1017
@api_command("player_queues/transfer")
async def transfer_queue(
    self,
    source_queue_id: str,
    target_queue_id: str,
    auto_play: bool | None = None,
) -> None:
    """Transfer queue to another queue.

    Copies the queue settings, position and items of the source queue to the
    target queue, clears the source queue and optionally resumes playback on
    the target.

    - source_queue_id: queue to take the items and state from.
    - target_queue_id: queue that receives the items and state.
    - auto_play: resume playback on the target afterwards; when None,
      playback is resumed only if the source queue was playing.

    Raises PlayerUnavailableError when either queue or the target player
    can not be found.
    """
    source_queue = self.get(source_queue_id)
    if not source_queue:
        raise PlayerUnavailableError(f"Queue {source_queue_id} is not available")
    target_queue = self.get(target_queue_id)
    if not target_queue:
        raise PlayerUnavailableError(f"Queue {target_queue_id} is not available")
    if auto_play is None:
        auto_play = source_queue.state == PlaybackState.PLAYING

    target_player = self.mass.players.get(target_queue_id)
    if target_player is None:
        raise PlayerUnavailableError(f"Player {target_queue_id} is not available")
    group_id = target_player.active_group or target_player.synced_to
    if group_id:
        # edge case: the user wants to move playback from the group as a whole, to a single
        # player in the group or it is grouped and the command targeted at the single player.
        # We need to dissolve the group first.
        await self.mass.players.cmd_ungroup(group_id)
        await asyncio.sleep(3)

    # grab the items before the source queue gets cleared below
    source_items = self._queue_items[source_queue_id]
    # carry over the queue settings one by one
    for setting in (
        "repeat_mode",
        "shuffle_enabled",
        "dont_stop_the_music_enabled",
        "radio_source",
        "enqueued_media_items",
    ):
        setattr(target_queue, setting, getattr(source_queue, setting))
    target_queue.resume_pos = int(source_queue.elapsed_time)
    target_queue.current_index = source_queue.current_index
    if source_queue.current_item:
        target_queue.current_item = source_queue.current_item
        target_queue.current_item.queue_id = target_queue_id
    self.clear(source_queue_id)

    await self.load(target_queue_id, source_items, keep_remaining=False, keep_played=False)
    # the items now belong to the target queue
    for moved_item in source_items:
        moved_item.queue_id = target_queue_id
    self.update_items(target_queue_id, source_items)
    if auto_play:
        await self.resume(target_queue_id)
1064
1065 # Interaction with player
1066
async def on_player_register(self, player: Player) -> None:
    """Create (or restore from cache) the PlayerQueue for a newly registered player.

    The queue id equals the player id. A previously cached queue state and
    its items are restored when available; if restoring fails for whatever
    reason, a fresh empty queue is created instead. Finally the queue is
    stored, its state is calculated and a QUEUE_ADDED event is signalled.
    """
    queue_id = player.player_id
    queue: PlayerQueue | None = None
    queue_items: list[QueueItem] = []
    # try to restore previous state
    prev_state = await self.mass.cache.get(
        key=queue_id,
        provider=self.domain,
        category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
    )
    if prev_state:
        try:
            queue = PlayerQueue.from_dict(prev_state)
            cached_queue_items = await self.mass.cache.get(
                key=queue_id,
                provider=self.domain,
                category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
                default=[],
            )
            queue_items = []
            for position, raw_item in enumerate(cached_queue_items):
                restored = QueueItem.from_cache(raw_item)
                if restored.media_item:
                    queue_items.append(restored)
                    continue
                # Skip items with missing media_item - this can happen if
                # MA was killed during shutdown while cache was being written
                self.logger.debug(
                    "Skipping queue item %s (index %d) restored from cache "
                    "without media_item",
                    restored.name,
                    position,
                )
            if queue.enqueued_media_items:
                # we need to restore the MediaItem objects for the enqueued media items
                # Items from cache may be dicts that need deserialization
                raw_enqueued: list[dict[str, Any] | MediaItemType] = cast(
                    "list[dict[str, Any] | MediaItemType]", queue.enqueued_media_items
                )
                queue.enqueued_media_items = [
                    cast("MediaItemType", media_from_dict(entry))
                    if isinstance(entry, dict)
                    else entry
                    for entry in raw_enqueued
                ]
        except Exception as err:
            self.logger.warning(
                "Failed to restore the queue(items) for %s - %s",
                player.display_name,
                str(err),
            )
            # Reset to clean state on failure
            queue = None
            queue_items = []
    if queue is None:
        # nothing (usable) in the cache: start with an empty, inactive queue
        queue = PlayerQueue(
            queue_id=queue_id,
            active=False,
            display_name=player.display_name,
            available=player.available,
            dont_stop_the_music_enabled=False,
            items=0,
        )

    self._queues[queue_id] = queue
    self._queue_items[queue_id] = queue_items
    # always call update to calculate state etc
    self.on_player_update(player, {})
    self.mass.signal_event(EventType.QUEUE_ADDED, object_id=queue_id, data=queue)
1138
def on_player_update(
    self,
    player: Player,
    changed_values: dict[str, tuple[Any, Any]],
) -> None:
    """
    Call when a PlayerQueue needs to be updated (e.g. when player updates).

    Runs a couple of preflight checks (queue known, no announcement in
    progress, queue active or previously tracked, not transitioning to a
    new track) and only then syncs the queue details from the player.

    NOTE: This is called every second if the player is playing.
    """
    queue_id = player.player_id
    queue = self._queues.get(queue_id)
    if queue is None:
        # race condition
        return
    if player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
        # do nothing while the announcement is in progress
        return
    # determine if this queue is currently active for this player
    queue.active = player.active_source in (queue.queue_id, None)
    if not (queue.active or queue_id in self._prev_states):
        queue.state = PlaybackState.IDLE
        # return early if the queue is not active and we have no previous state
        return
    if queue.queue_id in self._transitioning_players:
        # we're currently transitioning to a new track,
        # ignore updates from the player during this time
        return

    # queue is active and preflight checks passed, update the queue details
    self._update_queue_from_player(player)
1169
def on_player_remove(self, player_id: str, permanent: bool) -> None:
    """Drop the queue (and its items) of a player that left the registry.

    - player_id: id of the removed player (equals the queue id).
    - permanent: when True, the cached queue state and items are deleted too.
    """
    if permanent:
        # if the player is permanently removed, we also remove the cached queue data
        for cache_category in (
            CACHE_CATEGORY_PLAYER_QUEUE_STATE,
            CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
        ):
            removal = self.mass.cache.delete(
                key=player_id,
                provider=self.domain,
                category=cache_category,
            )
            self.mass.create_task(removal)
    # forget the in-memory queue; a missing entry is not an error
    for registry in (self._queues, self._queue_items):
        registry.pop(player_id, None)
1190
async def load_next_queue_item(
    self,
    queue_id: str,
    current_item_id: str,
) -> QueueItem:
    """
    Call when a player wants the next queue item to play.

    Starting from the item that follows current_item_id, items are tried one
    by one until one can be loaded. Items that fail to load are flagged as
    unavailable and skipped (at most 10 of them).

    - queue_id: id of the queue to get the next item for.
    - current_item_id: queue_item_id of the item that is currently playing.

    Raises PlayerUnavailableError if the queue does not exist.
    Raises QueueEmpty if there are no more tracks left.
    """
    queue = self.get(queue_id)
    if not queue:
        msg = f"PlayerQueue {queue_id} is not available"
        raise PlayerUnavailableError(msg)
    cur_index = self.index_by_id(queue_id, current_item_id)
    if cur_index is None:
        # this is just a guard for bad data
        raise QueueEmpty("Invalid item id for queue given.")
    next_item: QueueItem | None = None
    # number of unplayable items skipped so far
    idx = 0
    while True:
        next_index = self._get_next_index(queue_id, cur_index + idx)
        if next_index is None:
            raise QueueEmpty("No more tracks left in the queue.")
        queue_item = self.get_item(queue_id, next_index)
        if queue_item is None:
            raise QueueEmpty("No more tracks left in the queue.")
        if idx >= 10:
            # we only allow 10 retries to prevent infinite loops
            raise QueueEmpty("No more (playable) tracks left in the queue.")
        try:
            # _load_item expects the index of the item that FOLLOWS the item
            # being loaded (it uses it to detect consecutive album tracks),
            # so resolve that index instead of passing the item's own index.
            await self._load_item(queue_item, self._get_next_index(queue_id, next_index))
            # we're all set, this is our next item
            next_item = queue_item
            break
        except (MediaNotFoundError, AudioError):
            # No stream details found, skip this QueueItem
            self.logger.warning(
                "Skipping unplayable item %s (%s)", queue_item.name, queue_item.uri
            )
            queue_item.available = False
            idx += 1
    if idx != 0:
        # we skipped some items, signal a queue items update
        self.update_items(queue_id, self._queue_items[queue_id])
    if next_item is None:
        raise QueueEmpty("No more (playable) tracks left in the queue.")

    return next_item
1240
async def _load_item(
    self,
    queue_item: QueueItem,
    next_index: int | None,
    is_start: bool = False,
    seek_position: int = 0,
    fade_in: bool = False,
) -> None:
    """Try to load the stream details for the given queue item.

    For tracks, the media item is first upgraded to the full library item
    (or the full provider item) and the album/album image are filled in.
    The resulting stream details are stored on the queue item.

    - queue_item: the item to (pre)load; it is updated in place.
    - next_index: index of the item that follows queue_item in the queue
      (None if there is none); used to detect consecutive album tracks.
    - is_start: not used within this method.
    - seek_position: forwarded to get_stream_details.
    - fade_in: forwarded to get_stream_details.

    Raises MediaNotFoundError if the item is flagged as unavailable; errors
    raised while fetching the stream details are propagated to the caller.
    """
    queue_id = queue_item.queue_id
    queue = self._queues[queue_id]

    # we use a contextvar to bypass the throttler for this asyncio task/context
    # this makes sure that playback has priority over other requests that may be
    # happening in the background
    BYPASS_THROTTLER.set(True)

    self.logger.debug(
        "(pre)loading (next) item for queue %s...",
        queue.display_name,
    )

    if not queue_item.available:
        raise MediaNotFoundError(f"Item {queue_item.uri} is not available")

    # work out if we are playing an album and if we should prefer album
    # loudness
    next_track_from_same_album = (
        next_index is not None
        and (next_item := self.get_item(queue_id, next_index))
        and (
            queue_item.media_item
            and hasattr(queue_item.media_item, "album")
            and queue_item.media_item.album
            and next_item.media_item
            and hasattr(next_item.media_item, "album")
            and next_item.media_item.album
            and queue_item.media_item.album.item_id == next_item.media_item.album.item_id
        )
    )
    current_index = self.index_by_id(queue_id, queue_item.queue_item_id)
    previous_track_from_same_album = (
        # only the very first item of the queue (index 0) has no previous item;
        # the item at index 1 must be compared against the item at index 0
        current_index is not None
        and current_index > 0
        and (previous_item := self.get_item(queue_id, current_index - 1)) is not None
        and previous_item.media_item is not None
        and hasattr(previous_item.media_item, "album")
        and previous_item.media_item.album is not None
        and queue_item.media_item is not None
        and hasattr(queue_item.media_item, "album")
        and queue_item.media_item.album is not None
        and queue_item.media_item.album.item_id == previous_item.media_item.album.item_id
    )
    playing_album_tracks = next_track_from_same_album or previous_track_from_same_album
    if queue_item.media_item and isinstance(queue_item.media_item, Track):
        # remember the album as listed on the queue item, the media item
        # may get replaced below
        album = queue_item.media_item.album
        # prefer the full library media item so we have all metadata and provider(quality) info
        # always request the full library item as there might be other qualities available
        if library_item := await self.mass.music.get_library_item_by_prov_id(
            queue_item.media_item.media_type,
            queue_item.media_item.item_id,
            queue_item.media_item.provider,
        ):
            queue_item.media_item = cast("Track", library_item)
        elif not queue_item.media_item.image or queue_item.media_item.provider.startswith(
            "ytmusic"
        ):
            # Youtube Music has poor thumbs by default, so we always fetch the full item
            # this also catches the case where they have an unavailable item in a listing
            fetched_item = await self.mass.music.get_item_by_uri(queue_item.uri)
            queue_item.media_item = cast("Track", fetched_item)

        # ensure we got the full (original) album set
        if album and (
            library_album := await self.mass.music.get_library_item_by_prov_id(
                album.media_type,
                album.item_id,
                album.provider,
            )
        ):
            queue_item.media_item.album = cast("Album", library_album)
        elif album:
            # Restore original album if we have no better alternative from the library
            queue_item.media_item.album = album
        # prefer album image over track image
        if queue_item.media_item.album and queue_item.media_item.album.image:
            org_images: list[MediaItemImage] = queue_item.media_item.metadata.images or []
            queue_item.media_item.metadata.images = UniqueList(
                [
                    queue_item.media_item.album.image,
                    *org_images,
                ]
            )
    # Fetch the streamdetails, which could raise in case of an unplayable item.
    # For example, YT Music returns Radio Items that are not playable.
    queue_item.streamdetails = await get_stream_details(
        mass=self.mass,
        queue_item=queue_item,
        seek_position=seek_position,
        fade_in=fade_in,
        prefer_album_loudness=bool(playing_album_tracks),
    )
1346
def track_loaded_in_buffer(self, queue_id: str, item_id: str) -> None:
    """Register that a player has (started) loading a track in its buffer.

    - queue_id: id of the queue the item belongs to.
    - item_id: queue_item_id of the item that is being buffered.

    Raises PlayerUnavailableError if the queue does not exist.
    """
    queue = self.get(queue_id)
    if not queue:
        raise PlayerUnavailableError(f"PlayerQueue {queue_id} is not available")
    # store the index of the item that is currently (being) loaded in the buffer
    # which helps us a bit to determine how far the player has buffered ahead
    buffered_index = self.index_by_id(queue_id, item_id)
    queue.index_in_buffer = buffered_index
    self.logger.debug("PlayerQueue %s loaded item %s in buffer", queue.display_name, item_id)
    self.signal_update(queue_id)
    # preload next streamdetails
    self._preload_next_item(queue_id, item_id)
1360
1361 # Main queue manipulation methods
1362
async def load(
    self,
    queue_id: str,
    queue_items: list[QueueItem],
    insert_at_index: int = 0,
    keep_remaining: bool = True,
    keep_played: bool = True,
    shuffle: bool = False,
) -> None:
    """Load new items at index.

    - queue_id: id of the queue to process this request.
    - queue_items: a list of QueueItems (the list itself is left untouched)
    - insert_at_index: insert the item(s) at this index
    - keep_remaining: keep the remaining items after the insert
    - keep_played: keep the items that are located before the insert index
    - shuffle: (re)shuffle the items after insert index
    """
    prev_items = self._queue_items[queue_id][:insert_at_index] if keep_played else []
    # work on a copy: extending the list below must not leak into the
    # list object that the caller handed over
    next_items = list(queue_items)

    # if keep_remaining, append the old 'next' items
    if keep_remaining:
        next_items += self._queue_items[queue_id][insert_at_index:]

    # we set the original insert order as attribute so we can un-shuffle
    for index, item in enumerate(next_items):
        item.sort_index += insert_at_index + index
    # (re)shuffle the final batch if needed
    if shuffle:
        next_items = await _smart_shuffle(next_items)
    self.update_items(queue_id, prev_items + next_items)
1394
def update_items(self, queue_id: str, queue_items: list[QueueItem]) -> None:
    """Replace the items of a queue (e.g. after reordering) and notify listeners.

    - queue_id: id of the queue to update.
    - queue_items: the complete, new list of items for the queue.
    """
    self._queue_items[queue_id] = queue_items
    queue = self._queues[queue_id]
    queue.items = len(self._queue_items[queue_id])
    # to track if the queue items changed we set a timestamp
    # this is a simple way to detect changes in the list of items
    # without having to compare the entire list
    queue.items_last_updated = time.time()
    self.signal_update(queue_id, True)

    if queue.state != PlaybackState.PLAYING:
        return
    buffered_index = queue.index_in_buffer
    if buffered_index is None or buffered_index != queue.current_index:
        # the player has not loaded the current track (yet):
        # wait until it has loaded to prevent conflicts
        return
    # the queue is playing and the current track is loaded,
    # ensure to (re)queue the next track because it might have changed
    next_item = self.get_next_item(queue_id, queue.index_in_buffer)
    if next_item:
        self._enqueue_next_item(queue_id, next_item)
1416
1417 # Helper methods
1418
def get_item(self, queue_id: str, item_id_or_index: int | str | None) -> QueueItem | None:
    """Get queue item by index or item_id.

    - queue_id: id of the queue to look into.
    - item_id_or_index: position in the queue (int) or queue_item_id (str).

    Returns None when the queue is unknown, when no lookup value is given or
    when nothing matches. An index that is out of range - in either
    direction - yields None as well instead of raising an IndexError.
    """
    if item_id_or_index is None:
        return None
    queue_items = self._queue_items.get(queue_id)
    if queue_items is None:
        return None
    if isinstance(item_id_or_index, int):
        # a valid negative index keeps counting from the end of the queue,
        # anything outside of the list boundaries simply has no item
        if -len(queue_items) <= item_id_or_index < len(queue_items):
            return queue_items[item_id_or_index]
        return None
    if isinstance(item_id_or_index, str):
        return next((x for x in queue_items if x.queue_item_id == item_id_or_index), None)
    return None
1430
def signal_update(self, queue_id: str, items_changed: bool = False) -> None:
    """Broadcast that the given queue changed and persist it to the cache.

    - queue_id: id of the queue that changed.
    - items_changed: set when the list of queue items changed too; this
      additionally emits QUEUE_ITEMS_UPDATED and stores the items in the cache.
    """
    queue = self._queues[queue_id]
    if items_changed:
        self.mass.signal_event(EventType.QUEUE_ITEMS_UPDATED, object_id=queue_id, data=queue)
        # save items in cache - only cache items with valid media_item
        cacheable_items = [
            queue_item.to_cache()
            for queue_item in self._queue_items[queue_id]
            if queue_item.media_item is not None
        ]
        store_items = self.mass.cache.set(
            key=queue_id,
            data=cacheable_items,
            provider=self.domain,
            category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
        )
        self.mass.create_task(store_items)
    # always send the base event
    self.mass.signal_event(EventType.QUEUE_UPDATED, object_id=queue_id, data=queue)
    # also signal update to the player itself so it can update its current_media
    self.mass.players.trigger_player_update(queue_id)
    # save state
    store_state = self.mass.cache.set(
        key=queue_id,
        data=queue.to_cache(),
        provider=self.domain,
        category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
    )
    self.mass.create_task(store_state)
1461
def index_by_id(self, queue_id: str, queue_item_id: str) -> int | None:
    """Return the position of the item with the given queue_item_id.

    Returns None when the queue holds no item with that id.
    """
    positions = (
        position
        for position, candidate in enumerate(self._queue_items[queue_id])
        if candidate.queue_item_id == queue_item_id
    )
    # the first match wins
    return next(positions, None)
1469
async def player_media_from_queue_item(
    self, queue_item: QueueItem, flow_mode: bool
) -> PlayerMedia:
    """Build the PlayerMedia object that is sent to the player for a QueueItem.

    - queue_item: the item to create the PlayerMedia for.
    - flow_mode: when set, the media describes the flow stream (generic
      title, no duration) instead of the individual item.

    Raises InvalidDataError if the queue has no session id.
    """
    queue = self._queues[queue_item.queue_id]
    streamdetails = queue_item.streamdetails
    if flow_mode:
        duration = None
    elif streamdetails:
        # prefer netto duration
        # when seeking, the player only receives the remaining duration
        duration = streamdetails.duration or queue_item.duration
        if duration and streamdetails.seek_position:
            duration -= streamdetails.seek_position
    else:
        duration = queue_item.duration
    if queue.session_id is None:
        # handle error or return early
        raise InvalidDataError("Queue session_id is None")
    stream_url = await self.mass.streams.resolve_stream_url(
        queue.session_id, queue_item, flow_mode=flow_mode
    )
    media = PlayerMedia(
        uri=stream_url,
        media_type=MediaType.FLOW_STREAM if flow_mode else queue_item.media_type,
        title="Music Assistant" if flow_mode else queue_item.name,
        image_url=MASS_LOGO_ONLINE,
        duration=duration,
        source_id=queue_item.queue_id,
        queue_item_id=queue_item.queue_item_id,
    )
    if flow_mode or not queue_item.media_item:
        # nothing item-specific to add
        return media
    media_item = queue_item.media_item
    media.title = media_item.name
    media.artist = getattr(media_item, "artist_str", "")
    album = getattr(media_item, "album", None)
    media.album = album.name if album else ""
    if queue_item.image:
        # the image format needs to be 500x500 jpeg for maximum compatibility with players
        # we prefer the imageproxy on the streamserver here because this request is sent
        # to the player itself which may not be able to reach the regular webserver
        media.image_url = self.mass.metadata.get_image_url(
            queue_item.image, size=500, prefer_stream_server=True
        )
    return media
1513
async def get_artist_tracks(self, artist: Artist) -> list[Track]:
    """Collect the tracks to play for an artist, as configured by the user.

    Depending on the configured enqueue option, either the artist's tracks
    or the tracks of the artist's albums are returned, in random order.
    An unrecognized option yields an empty list.
    """
    artist_items_conf = self.mass.config.get_raw_core_config_value(
        self.domain,
        CONF_DEFAULT_ENQUEUE_SELECT_ARTIST,
        ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE,
    )
    self.logger.info(
        "Fetching tracks to play for artist %s",
        artist.name,
    )
    if artist_items_conf in ("library_tracks", "all_tracks"):
        artist_tracks = await self.mass.music.artists.tracks(
            artist.item_id,
            artist.provider,
            in_library_only=artist_items_conf == "library_tracks",
        )
        random.shuffle(artist_tracks)
        return artist_tracks
    if artist_items_conf not in ("library_album_tracks", "all_album_tracks"):
        return []
    # gather the tracks of all albums of the artist, without duplicates
    collected: list[Track] = []
    artist_albums = await self.mass.music.artists.albums(
        artist.item_id,
        artist.provider,
        in_library_only=artist_items_conf == "library_album_tracks",
    )
    for artist_album in artist_albums:
        tracks_of_album = await self.mass.music.albums.tracks(
            artist_album.item_id, artist_album.provider
        )
        for track in tracks_of_album:
            if track in collected:
                continue
            collected.append(track)
    random.shuffle(collected)
    return collected
1548
async def get_album_tracks(self, album: Album, start_item: str | None) -> list[Track]:
    """Collect the available tracks to play for an album, as configured by the user.

    - album: the album to fetch the tracks for.
    - start_item: optional item_id or uri of the track to start at; all
      tracks listed before it are left out.
    """
    album_items_conf = self.mass.config.get_raw_core_config_value(
        self.domain,
        CONF_DEFAULT_ENQUEUE_SELECT_ALBUM,
        ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE,
    )
    selected: list[Track] = []
    # as long as this is set, we're still looking for the requested start item
    awaiting_start_item = start_item is not None
    self.logger.info(
        "Fetching tracks to play for album %s",
        album.name,
    )
    tracks_of_album = await self.mass.music.albums.tracks(
        item_id=album.item_id,
        provider_instance_id_or_domain=album.provider,
        in_library_only=album_items_conf == "library_tracks",
    )
    for track in tracks_of_album:
        if not track.available:
            continue
        if awaiting_start_item:
            if start_item not in (track.item_id, track.uri):
                continue
            awaiting_start_item = False
        selected.append(track)
    return selected
1575
async def get_playlist_tracks(
    self, playlist: Playlist, start_item: str | None
) -> list[PlaylistPlayableItem]:
    """Collect the available items to play for a playlist.

    - playlist: the playlist to fetch the items for.
    - start_item: optional item_id or uri of the item to start at; all
      items listed before it are left out.
    """
    selected: list[PlaylistPlayableItem] = []
    # as long as this is set, we're still looking for the requested start item
    awaiting_start_item = start_item is not None
    self.logger.info(
        "Fetching tracks to play for playlist %s",
        playlist.name,
    )
    # TODO: Handle other sort options etc.
    async for entry in self.mass.music.playlists.tracks(playlist.item_id, playlist.provider):
        if not entry.available:
            continue
        if awaiting_start_item:
            if start_item not in (entry.item_id, entry.uri):
                continue
            awaiting_start_item = False
        selected.append(entry)
    return selected
1598
async def get_audiobook_resume_point(
    self, audio_book: Audiobook, chapter: str | int | None = None, userid: str | None = None
) -> int:
    """Work out the position (in milliseconds) to start playing an audio book at.

    - audio_book: the audio book that is about to be played.
    - chapter: optional chapter position, explicitly selected by the user;
      the start of that chapter is returned.
    - userid: passed on when looking up the stored resume position.

    Without a chapter, the stored resume position is returned (0 when the
    book was fully played). Raises InvalidDataError if the requested chapter
    can not be resolved.
    """
    self.logger.debug(
        "Fetching resume point to play for audio book %s",
        audio_book.name,
    )
    if chapter is None:
        # no explicit chapter: continue where playback was left off
        full_played, resume_position_ms = await self.mass.music.get_resume_position(
            audio_book, userid=userid
        )
        return 0 if full_played else resume_position_ms
    # user explicitly selected a chapter to play
    wanted_position = int(chapter) if isinstance(chapter, str) else chapter
    known_chapters = audio_book.metadata.chapters
    selected_chapter = (
        next((x for x in known_chapters if x.position == wanted_position), None)
        if known_chapters
        else None
    )
    if selected_chapter:
        # chapter start is converted to milliseconds
        return int(selected_chapter.start * 1000)
    raise InvalidDataError(
        f"Unable to resolve chapter to play for Audiobook {audio_book.name}"
    )
1620
async def get_next_podcast_episodes(
    self,
    podcast: Podcast | None,
    episode: PodcastEpisode | str | None,
    userid: str | None = None,
) -> UniqueList[PodcastEpisode]:
    """Return the episode(s) to play, with up-to-date resume information.

    - podcast: the podcast to play; None when a single episode is requested.
    - episode: the episode to start with, given as PodcastEpisode object or
      as uri/item_id string. When omitted, the first episode that is not
      fully played is used (or the first episode if all are played).
    - userid: passed on when looking up the stored resume position.

    For a podcast, the start episode and all episodes after it (sorted by
    position) are returned. Raises InvalidDataError when neither a podcast
    nor an episode object is given, or when no episode can be resolved.
    """
    if podcast is None and isinstance(episode, str | NoneType):
        raise InvalidDataError("Either podcast or episode must be provided")
    if podcast is None:
        # single podcast episode requested
        assert isinstance(episode, PodcastEpisode)  # checked above
        self.logger.debug(
            "Fetching resume point to play for Podcast episode %s",
            episode.name,
        )
        fully_played, resume_position_ms = await self.mass.music.get_resume_position(
            episode, userid=userid
        )
        episode.fully_played = fully_played
        episode.resume_position_ms = 0 if fully_played else resume_position_ms
        return UniqueList([episode])
    # podcast with optional start episode requested
    self.logger.debug(
        "Fetching episode(s) and resume point to play for Podcast %s",
        podcast.name,
    )
    all_episodes = [
        x async for x in self.mass.music.podcasts.episodes(podcast.item_id, podcast.provider)
    ]
    all_episodes.sort(key=lambda x: x.position)
    resolved_episode: PodcastEpisode | None = None
    if isinstance(episode, (PodcastEpisode, str)):
        # if a episode was provided, a user explicitly selected a episode to play
        # so we need to find that episode in the list
        if isinstance(episode, PodcastEpisode):
            resolved_episode = next((x for x in all_episodes if x.uri == episode.uri), None)
        else:
            resolved_episode = next(
                (x for x in all_episodes if episode in (x.uri, x.item_id)), None
            )
        if resolved_episode:
            # ensure we have accurate resume info
            fully_played, resume_position_ms = await self.mass.music.get_resume_position(
                resolved_episode, userid=userid
            )
            resolved_episode.resume_position_ms = 0 if fully_played else resume_position_ms
    else:
        # get first episode that is not fully played
        for candidate in all_episodes:
            if candidate.fully_played:
                continue
            # ensure we have accurate resume info
            fully_played, resume_position_ms = await self.mass.music.get_resume_position(
                candidate, userid=userid
            )
            if fully_played:
                continue
            candidate.resume_position_ms = resume_position_ms
            resolved_episode = candidate
            break
        else:
            # no episodes found that are not fully played, so we start at the beginning
            resolved_episode = all_episodes[0] if all_episodes else None
    if resolved_episode is None:
        raise InvalidDataError(f"Unable to resolve episode to play for Podcast {podcast.name}")
    # return the resolved episode and the episode(s) that come after it
    start_at = all_episodes.index(resolved_episode)
    return UniqueList(all_episodes[start_at:])
1700
def _get_next_index(
    self,
    queue_id: str,
    cur_index: int | None,
    is_skip: bool = False,
    allow_repeat: bool = True,
) -> int | None:
    """
    Determine which queue index follows ``cur_index``, honouring the repeat mode.

    :param queue_id: ID of the queue to inspect.
    :param cur_index: Index of the current item, None if there is no current item.
    :param is_skip: When True, repeat-one does not hand back the same index.
    :param allow_repeat: When False, the repeat modes never yield an index.
    :return: The next index, or None if there are no (more) items in the queue.
    """
    player_queue = self._queues[queue_id]
    items = self._queue_items[queue_id]
    if cur_index is None or not items:
        # nothing is current or the queue holds no items
        return None
    repeat_mode = player_queue.repeat_mode
    if not is_skip and repeat_mode == RepeatMode.ONE:
        # repeat single track: stay on the same index (when repeating is allowed)
        return cur_index if allow_repeat else None
    last_index = len(items) - 1
    if cur_index < last_index:
        # regular case: simply advance by one
        return cur_index + 1
    # cur_index is (at or beyond) the last index of the queue
    if allow_repeat and repeat_mode == RepeatMode.ALL:
        # repeat all: wrap around to the beginning
        return 0
    return None
1729
def get_next_item(self, queue_id: str, cur_index: int | str) -> QueueItem | None:
    """
    Return the QueueItem that follows the given position in the queue.

    :param queue_id: ID of the queue to inspect.
    :param cur_index: Position to start from: a numeric index or a queue item id.
    :return: The first available item after that position, None if there is none
        (or if the given queue item id can not be resolved to an index).
    """
    if isinstance(cur_index, str):
        start = self.index_by_id(queue_id, cur_index)
        if start is None:
            return None  # guard
    else:
        start = cur_index
    # probe at most 5 positions, stepping over missing/unavailable items
    # (items are flagged unavailable by the load_next track logic)
    offset = 0
    while offset < 5:
        candidate_index = self._get_next_index(queue_id, start + offset)
        if candidate_index is None:
            return None
        candidate = self.get_item(queue_id, candidate_index)
        if candidate is not None and candidate.available:
            return candidate
        offset += 1
    return None
1752
async def _fill_radio_tracks(self, queue_id: str) -> None:
    """
    Load a fresh batch of radio tracks into the given queue.

    Tracks that are flagged as unavailable are dropped before loading.

    :param queue_id: ID of the queue to (re)fill.
    """
    self.logger.debug("Filling radio tracks for queue %s", queue_id)
    radio_tracks = await self._get_radio_tracks(queue_id=queue_id, is_initial_radio_mode=False)
    new_items = []
    for track in radio_tracks:
        if track.available:
            new_items.append(QueueItem.from_media_item(queue_id, track))
    # the insert position is derived from the queue length after the tracks were fetched
    insert_position = len(self._queue_items[queue_id]) + 1
    await self.load(queue_id, new_items, insert_at_index=insert_position)
1767
def _enqueue_next_item(self, queue_id: str, next_item: QueueItem | None) -> None:
    """
    Schedule handing over the upcoming queue item to the player.

    Nothing is scheduled when there is no next item or when the queue is in flow mode.

    :param queue_id: ID of the queue; also used as the player id for the enqueue call.
    :param next_item: The item that should be played next, if any.
    """
    if not next_item or (queue := self._queues[queue_id]).flow_mode:
        # no next item, or flow mode is active: nothing to do
        return

    async def _enqueue_next_item_on_player(next_item: QueueItem) -> None:
        player_media = await self.player_media_from_queue_item(next_item, False)
        await self.mass.players.enqueue_next_media(player_id=queue_id, media=player_media)
        if queue.next_item_id_enqueued == next_item.queue_item_id:
            # this item was already registered as being enqueued
            return
        queue.next_item_id_enqueued = next_item.queue_item_id
        self.logger.debug(
            "Enqueued next track %s on queue %s",
            next_item.name,
            self._queues[queue_id].display_name,
        )

    # run delayed (0.5s); the task id is fixed per queue
    self.mass.call_later(
        0.5,
        _enqueue_next_item_on_player,
        next_item,
        task_id=f"enqueue_next_item_{queue_id}",
    )
1794
def _preload_next_item(self, queue_id: str, item_id_in_buffer: str) -> None:
    """
    Start a background task that prepares the item following the buffered one.

    The task loads the next queue item (this ensures the item is playable and
    fetches its stream details) and then enqueues it on the player.
    Nothing is started for radio items or items without a duration.

    :param queue_id: ID of the queue.
    :param item_id_in_buffer: Queue item id of the item that was loaded in the buffer.
    """
    queue = self._queues[queue_id]

    async def _preload_streamdetails(item_id_in_buffer: str) -> None:
        try:
            # Wait (for at most 120 rounds of 1 second) until the buffered item is the
            # item that is actually playing. This prevents a race condition when we
            # preload the next item too soon, while the player is actually still
            # preloading the previously enqueued item.
            for _ in range(120):
                playing_item = queue.current_item
                if not playing_item:
                    return  # guard
                if playing_item.queue_item_id == item_id_in_buffer:
                    break
                await asyncio.sleep(1)
            upcoming_item = await self.load_next_queue_item(queue_id, item_id_in_buffer)
            if not upcoming_item:
                return
            self.logger.debug(
                "Preloaded next item %s for queue %s",
                upcoming_item.name,
                queue.display_name,
            )
            # enqueue the next item on the player
            self._enqueue_next_item(queue_id, upcoming_item)
        except QueueEmpty:
            return

    buffered_item = self.get_item(queue_id, item_id_in_buffer)
    if not buffered_item:
        # this should not happen, but guard anyways
        return
    if buffered_item.media_type == MediaType.RADIO or not buffered_item.duration:
        # radio items or no duration, nothing to do
        return

    # a task that is still running under the same task id is aborted
    self.mass.create_task(
        _preload_streamdetails,
        item_id_in_buffer,
        task_id=f"preload_next_item_{queue_id}",
        abort_existing=True,
    )
1843
async def _resolve_media_items(
    self,
    media_item: MediaItemType | ItemMapping | BrowseFolder,
    start_item: str | None = None,
    userid: str | None = None,
    queue_id: str | None = None,
) -> list[MediaItemType]:
    """
    Resolve/unwrap a media item into the list of media items to enqueue.

    Playlists, artists, albums, podcasts and folders are expanded into their
    playable children. For playlists, artists, albums and podcasts a task is
    created that marks the container itself as played (user initiated).
    Audiobooks get their resume position refreshed. Every other item
    (single track or radio item) is returned as a one-item list.

    :param media_item: The item to resolve. An ItemMapping is first resolved
        to the full media item by its URI.
    :param start_item: Optional start item, passed on to the playlist, album,
        audiobook and podcast helpers.
    :param userid: Optional user id, attached to the 'played' entries and
        used for the resume position lookups.
    :param queue_id: Optional queue id, attached to the 'played' entries.
    :return: The resolved media items.
    :raises InvalidDataError: If an ItemMapping without URI is given.
    """
    # resolve Itemmapping to full media item
    if isinstance(media_item, ItemMapping):
        if media_item.uri is None:
            raise InvalidDataError("ItemMapping has no URI")
        media_item = await self.mass.music.get_item_by_uri(media_item.uri)
    if media_item.media_type == MediaType.PLAYLIST:
        media_item = cast("Playlist", media_item)
        self.mass.create_task(
            self.mass.music.mark_item_played(
                media_item, userid=userid, queue_id=queue_id, user_initiated=True
            )
        )
        return list(await self.get_playlist_tracks(media_item, start_item))
    if media_item.media_type == MediaType.ARTIST:
        media_item = cast("Artist", media_item)
        # pass the userid along, exactly like the playlist/album/podcast branches do,
        # so the 'played' entry of an artist is attributed to the same user
        self.mass.create_task(
            self.mass.music.mark_item_played(
                media_item, userid=userid, queue_id=queue_id, user_initiated=True
            )
        )
        return list(await self.get_artist_tracks(media_item))
    if media_item.media_type == MediaType.ALBUM:
        media_item = cast("Album", media_item)
        self.mass.create_task(
            self.mass.music.mark_item_played(
                media_item, userid=userid, queue_id=queue_id, user_initiated=True
            )
        )
        return list(await self.get_album_tracks(media_item, start_item))
    if media_item.media_type == MediaType.AUDIOBOOK:
        media_item = cast("Audiobook", media_item)
        # ensure we grab the correct/latest resume point info
        media_item.resume_position_ms = await self.get_audiobook_resume_point(
            media_item, start_item, userid=userid
        )
        return [media_item]
    if media_item.media_type == MediaType.PODCAST:
        media_item = cast("Podcast", media_item)
        self.mass.create_task(
            self.mass.music.mark_item_played(
                media_item, userid=userid, queue_id=queue_id, user_initiated=True
            )
        )
        return list(await self.get_next_podcast_episodes(media_item, start_item, userid=userid))
    if media_item.media_type == MediaType.PODCAST_EPISODE:
        media_item = cast("PodcastEpisode", media_item)
        # no podcast given: the helper is called with the episode only
        return list(await self.get_next_podcast_episodes(None, media_item, userid=userid))
    if media_item.media_type == MediaType.FOLDER:
        media_item = cast("BrowseFolder", media_item)
        return list(await self._get_folder_tracks(media_item))
    # all other: single track or radio item
    return [cast("MediaItemType", media_item)]
1902
async def _try_resume_from_playlog(self, queue: PlayerQueue) -> bool:
    """Try to start playback from the playlog (used when the queue is empty).

    User-initiated recently played items are looked up with the filters below,
    from most to least specific. The first item that can be played wins:
    1. userid AND queue_id (only if the queue has a userid)
    2. queue_id only
    3. userid only (only if the queue has a userid)
    4. any recently played item

    :param queue: The queue to resume playback on.
    :return: True if playback was started, False otherwise.
    """
    own_queue_id = queue.queue_id
    lookups: list[tuple[str | None, str | None, str]]
    if queue.userid:
        lookups = [
            (queue.userid, own_queue_id, "userid + queue_id match"),
            (None, own_queue_id, "queue_id match"),
            (queue.userid, None, "userid match"),
            (None, None, "any recent item"),
        ]
    else:
        lookups = [
            (None, own_queue_id, "queue_id match"),
            (None, None, "any recent item"),
        ]

    for user_filter, queue_filter, description in lookups:
        candidates = await self.mass.music.recently_played(
            limit=5,
            fully_played_only=False,
            user_initiated_only=True,
            userid=user_filter,
            queue_id=queue_filter,
        )
        for candidate in candidates:
            if not candidate.uri:
                continue
            try:
                await self.play_media(queue.queue_id, candidate)
            except MusicAssistantError as err:
                # this candidate can not be played, move on to the next one
                self.logger.debug("Failed to resume with item %s: %s", candidate.name, err)
            else:
                self.logger.info(
                    "Resumed queue %s from playlog (%s)", queue.display_name, description
                )
                return True

    return False
1946
async def _get_radio_tracks(
    self, queue_id: str, is_initial_radio_mode: bool = False
) -> list[Track]:
    """
    Call the registered music providers for dynamic (radio) tracks.

    Base tracks are collected from the queue's radio source or, when refilling a
    radio that is based on one single track, from the tracks already in the queue.
    A sample of (at most 5 of) those base tracks is used to look up similar
    tracks, which form the dynamic part of the result.

    :param queue_id: ID of the queue to fetch radio tracks for.
    :param is_initial_radio_mode: True when radio mode is being started. The sampled
        base tracks are then included in the result, interleaved with dynamic tracks.
    :return: The tracks to add to the queue (every dynamic track at most once);
        an empty list if the queue has no radio source.
    :raises UnsupportedFeaturedException: If no base tracks could be determined.
    """
    queue = self._queues[queue_id]
    queue_track_items: list[Track] = [
        q.media_item
        for q in self._queue_items[queue_id]
        if q.media_item and isinstance(q.media_item, Track)
    ]
    if not queue.radio_source:
        # this may happen during race conditions as this method is called delayed
        return []
    self.logger.info(
        "Fetching radio tracks for queue %s based on: %s",
        queue.display_name,
        ", ".join([x.name for x in queue.radio_source]),
    )

    # Get user's preferred provider instances for steering provider selection
    preferred_provider_instances: list[str] | None = None
    if (
        queue.userid
        and (playback_user := await self.mass.webserver.auth.get_user(queue.userid))
        and playback_user.provider_filter
    ):
        preferred_provider_instances = playback_user.provider_filter

    available_base_tracks: list[Track] = []
    base_track_sample_size = 5
    # Some providers have very deterministic similar track algorithms when providing
    # a single track item. When we have a radio mode based on 1 track and we have to
    # refill the queue (ie not initial radio mode), we use the play history as base tracks
    if (
        len(queue.radio_source) == 1
        and queue.radio_source[0].media_type == MediaType.TRACK
        and not is_initial_radio_mode
    ):
        available_base_tracks = queue_track_items
    else:
        # Grab all the available base tracks based on the selected source items.
        # shuffle the source items, just in case
        for radio_item in random.sample(queue.radio_source, len(queue.radio_source)):
            ctrl = self.mass.music.get_controller(radio_item.media_type)
            try:
                available_base_tracks += [
                    track
                    for track in await ctrl.radio_mode_base_tracks(
                        radio_item,  # type: ignore[arg-type]
                        preferred_provider_instances,
                    )
                    # Avoid duplicate base tracks
                    if track not in available_base_tracks
                ]
            except UnsupportedFeaturedException as err:
                self.logger.debug(
                    "Skip loading radio items for %s: %s ",
                    radio_item.uri,
                    str(err),
                )
    if not available_base_tracks:
        raise UnsupportedFeaturedException("Radio mode not available for source items")

    # Sample tracks from the base tracks, which will be used to calculate the dynamic ones
    base_tracks = random.sample(
        available_base_tracks,
        min(base_track_sample_size, len(available_base_tracks)),
    )
    # Use a set to avoid duplicate dynamic tracks
    dynamic_tracks: set[Track] = set()
    # Use base tracks + Trackcontroller to obtain similar tracks for every base Track.
    # The first pass runs without lookup; the second pass (with lookup) is only
    # done if the first pass did not yield any dynamic track.
    for allow_lookup in (False, True):
        if dynamic_tracks:
            break
        for base_track in base_tracks:
            try:
                _similar_tracks = await self.mass.music.tracks.similar_tracks(
                    base_track.item_id,
                    base_track.provider,
                    allow_lookup=allow_lookup,
                    preferred_provider_instances=preferred_provider_instances,
                )
            except MediaNotFoundError:
                # Some providers don't have similar tracks for all items. For example,
                # Tidal can sometimes return a 404 when the 'similar_tracks' endpoint is called.
                # in that case, just skip the track.
                self.logger.debug("Similar tracks not found for track %s", base_track.name)
                continue
            for track in _similar_tracks:
                if (
                    track not in base_tracks
                    # Exclude tracks we have already played / queued
                    and track not in queue_track_items
                    # Ignore tracks that are too long for radio mode, e.g. mixes
                    and track.duration <= RADIO_TRACK_MAX_DURATION_SECS
                ):
                    dynamic_tracks.add(track)
            if len(dynamic_tracks) >= 50:
                break

    # Hand out the dynamic tracks from a shuffled pool, without replacement.
    # Sampling from the full list for every base track would allow the same dynamic
    # track to be picked more than once, putting duplicates in the queue.
    dynamic_pool = list(dynamic_tracks)
    random.shuffle(dynamic_pool)
    queue_tracks: list[Track] = []
    # Only include the sampled base tracks when the radio mode is first initialized
    if is_initial_radio_mode:
        queue_tracks.append(base_tracks[0])
        # Exhaust the remaining base tracks, each followed by (up to) 2 dynamic tracks
        for base_track in base_tracks[1:]:
            queue_tracks.append(base_track)
            queue_tracks += dynamic_pool[:2]
            del dynamic_pool[:2]
    # Top up with (at most 25 of) the dynamic tracks that were not handed out yet
    queue_tracks += dynamic_pool[:25]
    return queue_tracks
2066
async def _get_folder_tracks(self, folder: BrowseFolder) -> list[Track]:
    """
    Collect the (playable) tracks found in the given browse folder.

    :param folder: The folder whose path is browsed.
    :return: All Track items found in the playable entries of the folder.
    """
    self.logger.info("Fetching tracks to play for folder %s", folder.name)
    collected: list[Track] = []
    entries = await self.mass.music.browse(folder.path)
    for entry in entries:
        if entry.is_playable:
            # unwrap every kind of media item; only the tracks of the result are kept
            unwrapped = await self._resolve_media_items(entry)
            collected.extend(found for found in unwrapped if isinstance(found, Track))
    return collected
2082
def _update_queue_from_player(
    self,
    player: Player,
) -> None:
    """
    Update the Queue when the player state changed.

    Copies the basic properties and playback state of the player onto the queue,
    determines the current/next item and elapsed time, and compares the result
    with the previously stored state of the queue. Depending on what changed,
    update events are signalled, a playback progress report is handled, the
    end-of-queue handling is invoked and a refill of radio tracks is scheduled.

    :param player: The player that changed; its player_id is used as queue id.
    """
    queue_id = player.player_id
    queue = self._queues[queue_id]

    # basic properties
    queue.display_name = player.display_name
    queue.available = player.available
    queue.items = len(self._queue_items[queue_id])

    # an inactive queue (or a player without playback state) is always reported as idle
    queue.state = (
        player.playback_state or PlaybackState.IDLE if queue.active else PlaybackState.IDLE
    )
    # update current item/index from player report
    if queue.active and queue.state in (
        PlaybackState.PLAYING,
        PlaybackState.PAUSED,
    ):
        # NOTE: If the queue is not playing (yet) we will not update the current index
        # to ensure we keep the previously known current index
        if queue.flow_mode:
            # flow mode active, the player is playing one long stream
            # so we need to calculate the current index and elapsed time
            current_index, elapsed_time = self._get_flow_queue_stream_index(queue, player)
        elif item_id := self._parse_player_current_item_id(queue_id, player):
            # normal mode, the player itself will report the current item
            elapsed_time = int(player.corrected_elapsed_time or 0)
            current_index = self.index_by_id(queue_id, item_id)
        else:
            # this may happen if the player is still transitioning between tracks
            # we ignore this for now and keep the current index as is
            return

        # get current/next item based on current index
        queue.current_index = current_index
        queue.current_item = current_item = self.get_item(queue_id, current_index)
        queue.next_item = (
            self.get_next_item(queue_id, current_index)
            if current_item and current_index is not None
            else None
        )

        # correct elapsed time when seeking
        if (
            not queue.flow_mode
            and current_item
            and current_item.streamdetails
            and current_item.streamdetails.seek_position
        ):
            elapsed_time += current_item.streamdetails.seek_position
        queue.elapsed_time = elapsed_time
        queue.elapsed_time_last_updated = time.time()

    elif not queue.current_item and queue.current_index is not None:
        # not playing/paused: (re)populate the current/next item from the known index
        current_index = queue.current_index
        queue.current_item = current_item = self.get_item(queue_id, current_index)
        queue.next_item = (
            self.get_next_item(queue_id, current_index)
            if current_item and current_index is not None
            else None
        )

    # This is enough to detect any changes in the DSPDetails
    # (so child count changed, or any output format changed)
    output_formats = []
    if output_format := player.extra_data.get("output_format"):
        output_formats.append(str(output_format))
    for child_id in player.group_members:
        if (child := self.mass.players.get(child_id)) and (
            output_format := child.extra_data.get("output_format")
        ):
            output_formats.append(str(output_format))
        else:
            # child player not found or it has no output format (yet)
            output_formats.append("unknown")

    # basic throttle: do not send state changed events if queue did not actually change
    # (an all-empty idle state is used if no previous state was stored for this queue)
    prev_state: CompareState = self._prev_states.get(
        queue_id,
        CompareState(
            queue_id=queue_id,
            state=PlaybackState.IDLE,
            current_item_id=None,
            next_item_id=None,
            current_item=None,
            elapsed_time=0,
            last_playing_elapsed_time=0,
            stream_title=None,
            codec_type=None,
            output_formats=None,
        ),
    )
    # update last_playing_elapsed_time only when the player is actively playing
    # use corrected_elapsed_time which accounts for time since last update
    # this preserves the last known elapsed time when transitioning to idle/paused
    prev_playing_elapsed = prev_state["last_playing_elapsed_time"]
    prev_item_id = prev_state["current_item_id"]
    current_item_id = queue.current_item.queue_item_id if queue.current_item else None
    if queue.state == PlaybackState.PLAYING:
        current_elapsed = int(queue.corrected_elapsed_time)
        if current_item_id != prev_item_id:
            # new track started, reset the elapsed time tracker
            last_playing_elapsed_time = current_elapsed
        else:
            # same track, use the max of current and previous to handle timing issues
            last_playing_elapsed_time = max(current_elapsed, prev_playing_elapsed)
    else:
        last_playing_elapsed_time = prev_playing_elapsed
    new_state = CompareState(
        queue_id=queue_id,
        state=queue.state,
        current_item_id=queue.current_item.queue_item_id if queue.current_item else None,
        next_item_id=queue.next_item.queue_item_id if queue.next_item else None,
        current_item=queue.current_item,
        elapsed_time=int(queue.elapsed_time),
        last_playing_elapsed_time=last_playing_elapsed_time,
        stream_title=(
            queue.current_item.streamdetails.stream_title
            if queue.current_item and queue.current_item.streamdetails
            else None
        ),
        codec_type=(
            queue.current_item.streamdetails.audio_format.codec_type
            if queue.current_item and queue.current_item.streamdetails
            else None
        ),
        output_formats=output_formats,
    )
    changed_keys = get_changed_keys(dict(prev_state), dict(new_state))
    # a change of only the next item id or of the elapsed-time tracker
    # is not treated as a change of the queue below
    with suppress(KeyError):
        changed_keys.remove("next_item_id")
    with suppress(KeyError):
        changed_keys.remove("last_playing_elapsed_time")

    # store the new state
    # (a state is only kept while the queue is active)
    if queue.active:
        self._prev_states[queue_id] = new_state
    else:
        self._prev_states.pop(queue_id, None)

    # return early if nothing changed
    if len(changed_keys) == 0:
        return

    # signal update and store state
    send_update = True
    if changed_keys == {"elapsed_time"}:
        # only elapsed time changed, do not send full queue update
        send_update = False
        prev_time = prev_state.get("elapsed_time") or 0
        cur_time = new_state.get("elapsed_time") or 0
        if abs(cur_time - prev_time) > 2:
            # send dedicated event for time updates when seeking
            self.mass.signal_event(
                EventType.QUEUE_TIME_UPDATED,
                object_id=queue_id,
                data=queue.elapsed_time,
            )
            # also signal update to the player itself so it can update its current_media
            self.mass.players.trigger_player_update(queue_id)

    if send_update:
        self.signal_update(queue_id)

    if "output_formats" in changed_keys:
        # refresh DSP details since they may have changed
        dsp = get_stream_dsp_details(self.mass, queue_id)
        if queue.current_item and queue.current_item.streamdetails:
            queue.current_item.streamdetails.dsp = dsp
        if queue.next_item and queue.next_item.streamdetails:
            queue.next_item.streamdetails.dsp = dsp

    # handle updating stream_metadata if needed
    # (the callback is invoked if it never ran, or if the update interval has passed)
    if (
        queue.current_item
        and (streamdetails := queue.current_item.streamdetails)
        and streamdetails.stream_metadata_update_callback
        and (
            streamdetails.stream_metadata_last_updated is None
            or (
                time.time() - streamdetails.stream_metadata_last_updated
                >= streamdetails.stream_metadata_update_interval
            )
        )
    ):
        streamdetails.stream_metadata_last_updated = time.time()
        self.mass.create_task(
            streamdetails.stream_metadata_update_callback(
                streamdetails, int(queue.corrected_elapsed_time)
            )
        )

    # handle sending a playback progress report
    # we do this every 30 seconds or when the state changes
    if (
        changed_keys.intersection({"state", "current_item_id"})
        or int(queue.elapsed_time) % 30 == 0
    ):
        self._handle_playback_progress_report(queue, prev_state, new_state)

    # check if we need to clear the queue if we reached the end
    if "state" in changed_keys and queue.state == PlaybackState.IDLE:
        self._handle_end_of_queue(queue, prev_state, new_state)

    # watch dynamic radio items refill if needed
    if "current_item_id" in changed_keys:
        # auto enable radio mode if dont stop the music is enabled
        if (
            queue.dont_stop_the_music_enabled
            and queue.enqueued_media_items
            and queue.current_index is not None
            and (queue.items - queue.current_index) <= 1
        ):
            # We have received the last item in the queue and Don't stop the music is enabled
            # set the played media item(s) as radio items (which will refill the queue)
            # note that this will fail if there are no media items for which we have
            # a dynamic radio source.
            self.logger.debug(
                "End of queue detected and Don't stop the music is enabled for %s"
                " - setting enqueued media items as radio source: %s",
                queue.display_name,
                ", ".join([x.uri for x in queue.enqueued_media_items]),  # type: ignore[misc]  # uri set in __post_init__
            )
            queue.radio_source = queue.enqueued_media_items
        # auto fill radio tracks if less than 5 tracks left in the queue
        if (
            queue.radio_source
            and queue.current_index is not None
            and (queue.items - queue.current_index) < 5
        ):
            # the refill runs delayed (5 seconds) under a task id that is fixed per queue
            task_id = f"fill_radio_tracks_{queue_id}"
            self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id)
2317
def _get_flow_queue_stream_index(
    self, queue: PlayerQueue, player: Player
) -> tuple[int | None, int]:
    """
    Work out the current queue index and track position while flow mode is active.

    :param queue: The queue that is being streamed in flow mode.
    :param player: The player that plays the flow stream.
    :return: Tuple of (queue index, elapsed seconds within the current track).
        The values already known on the queue are returned unchanged if there is
        nothing to calculate from, or if the player is not playing.
    """
    total_elapsed = player.corrected_elapsed_time or 0
    if queue.current_index is None and not queue.flow_mode_stream_log:
        return queue.current_index, int(queue.elapsed_time)

    # Each track that was streamed/buffered to the player has an entry in the stream
    # log, holding the queue item id and the amount of seconds streamed. Walk the log
    # and subtract the (actually streamed) seconds of every finished entry from the
    # total elapsed time, until the entry that is playing right now is reached.
    # A repeated track simply shows up in the log multiple times.
    consumed = 0.0
    resolved_index: int | None = queue.current_index or 0
    position_in_track = 0.0
    for entry in queue.flow_mode_stream_log:
        if entry.seconds_streamed is not None:
            # NOTE: 'seconds_streamed' can actually be 0 if there was a stream error!
            entry_length = entry.seconds_streamed
        else:
            entry_length = entry.duration or 3600 * 24 * 7
        if total_elapsed > (entry_length + consumed):
            # this entry has been played completely, move on to the next one
            consumed += entry_length
            continue
        # no more seconds left to divide, this is our track
        resolved_index = self.index_by_id(queue.queue_id, entry.queue_item_id)
        item = self.get_item(queue.queue_id, resolved_index)
        # account for any seeking by adding the skipped/seeked seconds
        seeked = item.streamdetails.seek_position if item and item.streamdetails else 0
        position_in_track = total_elapsed + seeked - consumed
        break
    if player.playback_state != PlaybackState.PLAYING:
        # if the player is not playing, we can't be sure that the elapsed time is correct
        # so we just return the queue index and the elapsed time
        return queue.current_index, int(queue.elapsed_time)
    return resolved_index, int(position_in_track)
2362
def _parse_player_current_item_id(self, queue_id: str, player: Player) -> str | None:
    """
    Extract the QueueItem ID from the media the player currently reports.

    :param queue_id: ID of the queue the item must belong to.
    :param player: The player to inspect.
    :return: The queue item id, or None if it can not be determined.
    """
    # YES, we use player._current_media on purpose here because we need the raw metadata
    media = player._current_media
    if not media:
        return None
    # prefer queue_id and queue_item_id within the current media
    if media.source_id == queue_id and media.queue_item_id:
        return media.queue_item_id
    uri = media.uri
    if not uri:
        # every remaining strategy needs an uri
        return None
    # special case for sonos players
    if uri.startswith(f"mass:{queue_id}"):
        return media.queue_item_id or uri.split(":")[-1]
    if queue_id not in uri:
        return None
    # try to extract the item id from a mass stream url
    if self.mass.streams.base_url in uri:
        stream_item_id = uri.rsplit("/")[-1].split(".")[0]
        if self.get_item(queue_id, stream_item_id):
            return stream_item_id
    # try to extract the item id from a queue_id/item_id combi
    if "/" in uri:
        combi_item_id = uri.split("/")[1]
        if self.get_item(queue_id, combi_item_id):
            return combi_item_id

    return None
2396
def _handle_end_of_queue(
    self, queue: PlayerQueue, prev_state: CompareState, new_state: CompareState
) -> None:
    """
    Check if the queue should be cleared after the current item.

    The queue is only cleared when playback went from playing/paused to idle,
    there is no next item, an item was playing before and that item appears to
    have been played (or streamed) to the end. Clearing happens in a delayed task
    that re-checks the queue for 5 seconds first.

    :param queue: The queue to check.
    :param prev_state: The state of the queue before the change.
    :param new_state: The state of the queue after the change.
    """
    # check if queue state changed to stopped (from playing/paused to idle)
    if not (
        prev_state["state"] in (PlaybackState.PLAYING, PlaybackState.PAUSED)
        and new_state["state"] == PlaybackState.IDLE
    ):
        return
    # check if no more items in the queue (next_item should be None at end of queue)
    if queue.next_item is not None:
        return
    # check if we had a previous item playing
    if prev_state["current_item_id"] is None:
        return

    async def _clear_queue_delayed() -> None:
        # re-check once per second for 5 seconds; bail out as soon as the queue
        # is no longer idle or has a next item again
        for _ in range(5):
            await asyncio.sleep(1)
            if queue.state != PlaybackState.IDLE:
                return
            if queue.next_item is not None:
                return
        self.logger.info("End of queue reached, clearing items")
        self.clear(queue.queue_id)

    # all checks passed, we stopped playback at the last (or single) track of the queue
    # now determine if the item was fully played before clearing

    # For flow mode, check if the last track was fully streamed using the stream log
    # This is more reliable than elapsed_time which can be reset/incorrect
    if queue.flow_mode and queue.flow_mode_stream_log:
        last_log_entry = queue.flow_mode_stream_log[-1]
        if last_log_entry.seconds_streamed is not None:
            # The last track finished streaming, safe to clear queue
            self.mass.create_task(_clear_queue_delayed())
        return

    # For non-flow mode, use prev_state values since queue state may have been updated/reset
    prev_item = prev_state["current_item"]
    # duration: streamdetails first, then the item itself, else a 24 hour fallback
    if prev_item and (streamdetails := prev_item.streamdetails):
        duration = streamdetails.duration or prev_item.duration or 24 * 3600
    elif prev_item:
        duration = prev_item.duration or 24 * 3600
    else:
        # No current item means player has already cleared it, safe to clear queue
        self.mass.create_task(_clear_queue_delayed())
        return

    # use last_playing_elapsed_time which preserves the elapsed time from when the player
    # was still playing (before transitioning to idle where elapsed_time may be reset to 0)
    seconds_played = int(prev_state["last_playing_elapsed_time"])
    # debounce this a bit to make sure we're not clearing the queue by accident
    # only clear if the last track was played to near completion (within 5 seconds of end)
    if seconds_played >= (duration or 3600) - 5:
        self.mass.create_task(_clear_queue_delayed())
2454
def _handle_playback_progress_report(
    self, queue: PlayerQueue, prev_state: CompareState, new_state: CompareState
) -> None:
    """Report playback progress of a queue item to the playlog and the event bus.

    Compares the previous and the new queue state. If the current item id changed,
    the previous item is reported as (partially) played, otherwise the progress of
    the current item is reported. Nothing is reported for items without a media
    item, items that had a stream error or items played for less than 5 seconds.

    :param queue: The queue both state snapshots belong to.
    :param prev_state: State snapshot from before the update.
    :param new_state: State snapshot from after the update.
    """
    previous_id = prev_state["current_item_id"]
    current_id = new_state["current_item_id"]
    if previous_id is None and current_id is None:
        return

    # a changed item id means the previous item needs to be reported,
    # in every other case we report on the item that is current
    is_current_item = previous_id is None or previous_id == current_id
    if is_current_item:
        report_item = self.get_item(queue.queue_id, current_id) or new_state["current_item"]
        seconds_played = int(new_state["elapsed_time"])
    else:
        report_item = prev_state["current_item"]
        seconds_played = int(prev_state["elapsed_time"])

    if not report_item:
        # guard against invalid items
        return

    media_item = report_item.media_item
    if not media_item:
        # there is nothing to report for items without a media item
        return
    assert media_item.uri is not None  # uri is set in __post_init__

    stream = report_item.streamdetails
    if stream and stream.stream_error:
        # items that had a stream error are not reported
        return

    # prefer the duration of the stream, fall back to the item (or 3 hours)
    if stream and stream.duration:
        duration = int(stream.duration)
    else:
        duration = int(report_item.duration or 3 * 3600)

    if seconds_played < 5:
        # skip items played for less than 5 seconds, this also filters out
        # the bounce effect where the previous item gets reported with
        # 0 elapsed seconds right after a new item started playing
        return

    # work out whether the item counts as fully played
    percentage_played = percentage(seconds_played, duration)
    if is_current_item:
        fully_played = seconds_played >= duration - 10
    elif report_item.media_type in (MediaType.AUDIOBOOK, MediaType.PODCAST_EPISODE):
        # podcasts and audiobooks: account for the last 60 seconds
        fully_played = seconds_played >= duration - 60
    else:
        # other items must be played for 90% to be considered fully played
        fully_played = percentage_played >= 90

    is_playing = is_current_item and queue.state == PlaybackState.PLAYING
    if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
        state_label = "is playing" if is_playing else "played"
        self.logger.debug(
            "%s %s '%s' (%s) - Fully played: %s - Progress: %s (%s/%ss)",
            queue.display_name,
            state_label,
            report_item.name,
            report_item.uri,
            fully_played,
            f"{percentage_played}%",
            seconds_played,
            duration,
        )

    # add entry to playlog - this also handles resume of podcasts/audiobooks
    playlog_update = self.mass.music.mark_item_played(
        media_item,
        fully_played=fully_played,
        seconds_played=seconds_played,
        is_playing=is_playing,
        userid=queue.userid,
        queue_id=queue.queue_id,
        user_initiated=False,
    )
    self.mass.create_task(playlog_update)

    # collect the (optional) artist and album details of the media item
    artists: list[Artist | ItemMapping] = getattr(media_item, "artists", [])
    artist_names = [artist.name for artist in artists]
    if artist_names:
        primary_artist = getattr(media_item, "artist_str", None) or artist_names[0]
    else:
        primary_artist = None
    artist_mbids = [artist.mbid for artist in artists if artist.mbid] if artists else None

    album: Album | ItemMapping | None = getattr(media_item, "album", None)
    if isinstance(album, Album):
        album_artist = album.artist_str
        album_artist_mbids = [artist.mbid for artist in album.artists if artist.mbid]
    else:
        album_artist = None
        album_artist_mbids = None

    image = media_item.image
    image_url = self.mass.metadata.get_image_url(image, prefer_proxy=False) if image else None

    # signal 'media item played' event,
    # which is useful for plugins that want to do scrobbling
    report = MediaItemPlaybackProgressReport(
        uri=media_item.uri,
        media_type=media_item.media_type,
        name=media_item.name,
        version=getattr(media_item, "version", None),
        artist=primary_artist,
        artists=artist_names,
        artist_mbids=artist_mbids,
        album=album.name if album else None,
        album_mbid=album.mbid if album else None,
        album_artist=album_artist,
        album_artist_mbids=album_artist_mbids,
        image_url=image_url,
        duration=duration,
        mbid=getattr(media_item, "mbid", None),
        seconds_played=seconds_played,
        fully_played=fully_played,
        is_playing=is_playing,
        userid=queue.userid,
    )
    self.mass.signal_event(
        EventType.MEDIA_ITEM_PLAYED,
        object_id=media_item.uri,
        data=report,
    )
2580
2581
def _name_fits(seq: list[QueueItem], index: int, name: str, ignore: int) -> bool:
    """Return True if an item called ``name`` has no same-named neighbour at ``index``.

    :param seq: The list that is being rearranged.
    :param index: Position the item would occupy.
    :param name: Name of the item that would be placed at ``index``.
    :param ignore: Neighbouring position to disregard, because it is the other
        half of the pending swap and will hold a differently named item afterwards.
    """
    return all(
        seq[neighbour].name != name
        for neighbour in (index - 1, index + 1)
        if 0 <= neighbour < len(seq) and neighbour != ignore
    )


async def _smart_shuffle(items: list[QueueItem]) -> list[QueueItem]:
    """Shuffle queue items, avoiding identical tracks next to each other.

    Best-effort approach to prevent items with the same name from appearing
    adjacent. Does a random shuffle first, then makes a limited number of passes
    in which one half of every adjacent duplicate pair is swapped with a random
    item elsewhere in the list. A swap is only done if neither of the two moved
    items ends up next to an item with the same name, so a swap never introduces
    a new adjacent duplicate.

    :param items: List of queue items to shuffle. The list itself is not modified.
    :return: A new list with the same items in shuffled order.
    """
    # always hand back a new list, also for 0 or 1 items
    shuffled = random.sample(items, len(items))
    if len(shuffled) <= 2:
        # with 2 items or less there is nothing that can be rearranged
        return shuffled

    # Make a few passes to fix adjacent duplicates
    max_passes = 3
    for _ in range(max_passes):
        swapped = False
        for i in range(len(shuffled) - 1):
            dup_name = shuffled[i].name
            if shuffled[i + 1].name != dup_name:
                continue
            # Found adjacent duplicate - try to move the second item of the pair,
            # fall back to the first one (needed when the pair sits at the end
            # of the list and only positions before it are available).
            for pos in (i + 1, i):
                candidates = [
                    j
                    for j, other in enumerate(shuffled)
                    # swapping with a same-named item would change nothing
                    if other.name != dup_name
                    # the duplicate must not land next to a same-named item
                    and _name_fits(shuffled, j, dup_name, ignore=pos)
                    # and the item taking its place must not cause a new clash
                    and _name_fits(shuffled, pos, other.name, ignore=j)
                ]
                if candidates:
                    swap_pos = random.choice(candidates)
                    shuffled[pos], shuffled[swap_pos] = shuffled[swap_pos], shuffled[pos]
                    swapped = True
                    break
        if not swapped:
            break
        # Yield to event loop between passes
        await asyncio.sleep(0)

    return shuffled
2615