/
/
/
1"""
2MusicAssistant Player Queues Controller.
3
4Handles all logic to PLAY Media Items, provided by Music Providers to supported players.
5
6It is loosely coupled to the MusicAssistant Music Controller and Player Controller.
7A Music Assistant Player always has a PlayerQueue associated with it
8which holds the queue items and state.
9
10The PlayerQueue is in that case the active source of the player,
11but it can also be something else, hence the loose coupling.
12"""
13
14from __future__ import annotations
15
16import asyncio
17import random
18import time
19from contextlib import suppress
20from types import NoneType
21from typing import TYPE_CHECKING, Any, TypedDict, cast
22
23import shortuuid
24from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
25from music_assistant_models.enums import (
26 ConfigEntryType,
27 ContentType,
28 EventType,
29 MediaType,
30 PlaybackState,
31 PlayerFeature,
32 ProviderFeature,
33 QueueOption,
34 RepeatMode,
35)
36from music_assistant_models.errors import (
37 AudioError,
38 InvalidCommand,
39 InvalidDataError,
40 MediaNotFoundError,
41 MusicAssistantError,
42 PlayerUnavailableError,
43 QueueEmpty,
44 UnsupportedFeaturedException,
45)
46from music_assistant_models.media_items import (
47 Album,
48 Artist,
49 Audiobook,
50 BrowseFolder,
51 ItemMapping,
52 MediaItemType,
53 PlayableMediaItemType,
54 Playlist,
55 Podcast,
56 PodcastEpisode,
57 Track,
58 UniqueList,
59 media_from_dict,
60)
61from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport
62from music_assistant_models.player_queue import PlayerQueue
63from music_assistant_models.queue_item import QueueItem
64
65from music_assistant.constants import (
66 ATTR_ANNOUNCEMENT_IN_PROGRESS,
67 MASS_LOGO_ONLINE,
68 PLAYLIST_MEDIA_TYPES,
69 VERBOSE_LOG_LEVEL,
70 PlaylistPlayableItem,
71)
72from music_assistant.controllers.webserver.helpers.auth_middleware import get_current_user
73from music_assistant.helpers.api import api_command
74from music_assistant.helpers.audio import get_stream_details, get_stream_dsp_details
75from music_assistant.helpers.throttle_retry import BYPASS_THROTTLER
76from music_assistant.helpers.util import get_changed_keys, percentage
77from music_assistant.models.core_controller import CoreController
78from music_assistant.models.player import Player, PlayerMedia
79
80if TYPE_CHECKING:
81 from collections.abc import Iterator
82
83 from music_assistant_models.auth import User
84 from music_assistant_models.media_items.metadata import MediaItemImage
85
86 from music_assistant import MusicAssistant
87 from music_assistant.models.player import Player
88
89CONF_DEFAULT_ENQUEUE_SELECT_ARTIST = "default_enqueue_select_artist"
90CONF_DEFAULT_ENQUEUE_SELECT_ALBUM = "default_enqueue_select_album"
91
92ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE = "all_tracks"
93ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE = "all_tracks"
94
95CONF_DEFAULT_ENQUEUE_OPTION_ARTIST = "default_enqueue_option_artist"
96CONF_DEFAULT_ENQUEUE_OPTION_ALBUM = "default_enqueue_option_album"
97CONF_DEFAULT_ENQUEUE_OPTION_TRACK = "default_enqueue_option_track"
98CONF_DEFAULT_ENQUEUE_OPTION_RADIO = "default_enqueue_option_radio"
99CONF_DEFAULT_ENQUEUE_OPTION_PLAYLIST = "default_enqueue_option_playlist"
100CONF_DEFAULT_ENQUEUE_OPTION_AUDIOBOOK = "default_enqueue_option_audiobook"
101CONF_DEFAULT_ENQUEUE_OPTION_PODCAST = "default_enqueue_option_podcast"
102CONF_DEFAULT_ENQUEUE_OPTION_PODCAST_EPISODE = "default_enqueue_option_podcast_episode"
103CONF_DEFAULT_ENQUEUE_OPTION_FOLDER = "default_enqueue_option_folder"
104CONF_DEFAULT_ENQUEUE_OPTION_UNKNOWN = "default_enqueue_option_unknown"
105RADIO_TRACK_MAX_DURATION_SECS = 20 * 60 # 20 minutes
106CACHE_CATEGORY_PLAYER_QUEUE_STATE = 0
107CACHE_CATEGORY_PLAYER_QUEUE_ITEMS = 1
108
109
110class CompareState(TypedDict):
111 """Simple object where we store the (previous) state of a queue.
112
113 Used for compare actions.
114 """
115
116 queue_id: str
117 state: PlaybackState
118 current_item_id: str | None
119 next_item_id: str | None
120 current_item: QueueItem | None
121 elapsed_time: int
122 # last_playing_elapsed_time: elapsed time from the last PLAYING state update
123 # used to determine if a track was fully played when transitioning to idle
124 last_playing_elapsed_time: int
125 stream_title: str | None
126 codec_type: ContentType | None
127 output_formats: list[str] | None
128
129
130class PlayerQueuesController(CoreController):
131 """Controller holding all logic to enqueue music for players."""
132
133 domain: str = "player_queues"
134
135 def __init__(self, mass: MusicAssistant) -> None:
136 """Initialize core controller."""
137 super().__init__(mass)
138 self._queues: dict[str, PlayerQueue] = {}
139 self._queue_items: dict[str, list[QueueItem]] = {}
140 self._prev_states: dict[str, CompareState] = {}
141 self._transitioning_players: set[str] = set()
142 self.manifest.name = "Player Queues controller"
143 self.manifest.description = (
144 "Music Assistant's core controller which manages the queues for all players."
145 )
146 self.manifest.icon = "playlist-music"
147
148 async def close(self) -> None:
149 """Cleanup on exit."""
150 # stop all playback
151 for queue in self.all():
152 if queue.state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
153 await self.stop(queue.queue_id)
154
155 async def get_config_entries(
156 self,
157 action: str | None = None,
158 values: dict[str, ConfigValueType] | None = None,
159 ) -> tuple[ConfigEntry, ...]:
160 """Return all Config Entries for this core module (if any)."""
161 enqueue_options = [ConfigValueOption(x.name, x.value) for x in QueueOption]
162 return (
163 ConfigEntry(
164 key=CONF_DEFAULT_ENQUEUE_SELECT_ARTIST,
165 type=ConfigEntryType.STRING,
166 default_value=ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE,
167 label="Items to select when you play a (in-library) artist.",
168 options=[
169 ConfigValueOption(
170 title="Only in-library tracks",
171 value="library_tracks",
172 ),
173 ConfigValueOption(
174 title="All tracks from all albums in the library",
175 value="library_album_tracks",
176 ),
177 ConfigValueOption(
178 title="All (top) tracks from (all) streaming provider(s)",
179 value="all_tracks",
180 ),
181 ConfigValueOption(
182 title="All tracks from all albums from (all) streaming provider(s)",
183 value="all_album_tracks",
184 ),
185 ],
186 ),
187 ConfigEntry(
188 key=CONF_DEFAULT_ENQUEUE_SELECT_ALBUM,
189 type=ConfigEntryType.STRING,
190 default_value=ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE,
191 label="Items to select when you play a (in-library) album.",
192 options=[
193 ConfigValueOption(
194 title="Only in-library tracks",
195 value="library_tracks",
196 ),
197 ConfigValueOption(
198 title="All tracks for album on (streaming) provider",
199 value="all_tracks",
200 ),
201 ],
202 ),
203 ConfigEntry(
204 key=CONF_DEFAULT_ENQUEUE_OPTION_ARTIST,
205 type=ConfigEntryType.STRING,
206 default_value=QueueOption.REPLACE.value,
207 label="Default enqueue option for Artist item(s).",
208 options=enqueue_options,
209 description="Define the default enqueue action for this mediatype.",
210 ),
211 ConfigEntry(
212 key=CONF_DEFAULT_ENQUEUE_OPTION_ALBUM,
213 type=ConfigEntryType.STRING,
214 default_value=QueueOption.REPLACE.value,
215 label="Default enqueue option for Album item(s).",
216 options=enqueue_options,
217 description="Define the default enqueue action for this mediatype.",
218 ),
219 ConfigEntry(
220 key=CONF_DEFAULT_ENQUEUE_OPTION_TRACK,
221 type=ConfigEntryType.STRING,
222 default_value=QueueOption.PLAY.value,
223 label="Default enqueue option for Track item(s).",
224 options=enqueue_options,
225 description="Define the default enqueue action for this mediatype.",
226 ),
227 ConfigEntry(
228 key=CONF_DEFAULT_ENQUEUE_OPTION_RADIO,
229 type=ConfigEntryType.STRING,
230 default_value=QueueOption.REPLACE.value,
231 label="Default enqueue option for Radio item(s).",
232 options=enqueue_options,
233 description="Define the default enqueue action for this mediatype.",
234 ),
235 ConfigEntry(
236 key=CONF_DEFAULT_ENQUEUE_OPTION_PLAYLIST,
237 type=ConfigEntryType.STRING,
238 default_value=QueueOption.REPLACE.value,
239 label="Default enqueue option for Playlist item(s).",
240 options=enqueue_options,
241 description="Define the default enqueue action for this mediatype.",
242 ),
243 ConfigEntry(
244 key=CONF_DEFAULT_ENQUEUE_OPTION_AUDIOBOOK,
245 type=ConfigEntryType.STRING,
246 default_value=QueueOption.REPLACE.value,
247 label="Default enqueue option for Audiobook item(s).",
248 options=enqueue_options,
249 hidden=True,
250 ),
251 ConfigEntry(
252 key=CONF_DEFAULT_ENQUEUE_OPTION_PODCAST,
253 type=ConfigEntryType.STRING,
254 default_value=QueueOption.REPLACE.value,
255 label="Default enqueue option for Podcast item(s).",
256 options=enqueue_options,
257 hidden=True,
258 ),
259 ConfigEntry(
260 key=CONF_DEFAULT_ENQUEUE_OPTION_PODCAST_EPISODE,
261 type=ConfigEntryType.STRING,
262 default_value=QueueOption.REPLACE.value,
263 label="Default enqueue option for Podcast-episode item(s).",
264 options=enqueue_options,
265 hidden=True,
266 ),
267 ConfigEntry(
268 key=CONF_DEFAULT_ENQUEUE_OPTION_FOLDER,
269 type=ConfigEntryType.STRING,
270 default_value=QueueOption.REPLACE.value,
271 label="Default enqueue option for Folder item(s).",
272 options=enqueue_options,
273 hidden=True,
274 ),
275 )
276
277 def __iter__(self) -> Iterator[PlayerQueue]:
278 """Iterate over (available) players."""
279 return iter(self._queues.values())
280
281 @api_command("player_queues/all")
282 def all(self) -> tuple[PlayerQueue, ...]:
283 """Return all registered PlayerQueues."""
284 return tuple(self._queues.values())
285
286 @api_command("player_queues/get")
287 def get(self, queue_id: str) -> PlayerQueue | None:
288 """Return PlayerQueue by queue_id or None if not found."""
289 return self._queues.get(queue_id)
290
291 @api_command("player_queues/items")
292 def items(self, queue_id: str, limit: int = 500, offset: int = 0) -> list[QueueItem]:
293 """Return all QueueItems for given PlayerQueue."""
294 if queue_id not in self._queue_items:
295 return []
296
297 return self._queue_items[queue_id][offset : offset + limit]
298
299 @api_command("player_queues/get_active_queue")
300 def get_active_queue(self, player_id: str) -> PlayerQueue | None:
301 """Return the current active/synced queue for a player."""
302 if player := self.mass.players.get(player_id):
303 return self.mass.players.get_active_queue(player)
304 return None
305
306 # Queue commands
307
308 @api_command("player_queues/shuffle")
309 async def set_shuffle(self, queue_id: str, shuffle_enabled: bool) -> None:
310 """Configure shuffle setting on the the queue."""
311 queue = self._queues[queue_id]
312 if queue.shuffle_enabled == shuffle_enabled:
313 return # no change
314 queue.shuffle_enabled = shuffle_enabled
315 queue_items = self._queue_items[queue_id]
316 cur_index = queue.index_in_buffer or queue.current_index
317 if cur_index is not None:
318 next_index = cur_index + 1
319 next_items = queue_items[next_index:]
320 else:
321 next_items = []
322 next_index = 0
323 if not shuffle_enabled:
324 # shuffle disabled, try to restore original sort order of the remaining items
325 next_items.sort(key=lambda x: x.sort_index, reverse=False)
326 await self.load(
327 queue_id=queue_id,
328 queue_items=next_items,
329 insert_at_index=next_index,
330 keep_remaining=False,
331 shuffle=shuffle_enabled,
332 )
333
334 @api_command("player_queues/dont_stop_the_music")
335 def set_dont_stop_the_music(self, queue_id: str, dont_stop_the_music_enabled: bool) -> None:
336 """Configure Don't stop the music setting on the queue."""
337 providers_available_with_similar_tracks = any(
338 ProviderFeature.SIMILAR_TRACKS in provider.supported_features
339 for provider in self.mass.music.providers
340 )
341 if dont_stop_the_music_enabled and not providers_available_with_similar_tracks:
342 raise UnsupportedFeaturedException(
343 "Don't stop the music is not supported by any of the available music providers"
344 )
345 queue = self._queues[queue_id]
346 queue.dont_stop_the_music_enabled = dont_stop_the_music_enabled
347 self.signal_update(queue_id=queue_id)
348 # if this happens to be the last track in the queue, fill the radio source
349 if (
350 queue.dont_stop_the_music_enabled
351 and queue.enqueued_media_items
352 and queue.current_index is not None
353 and (queue.items - queue.current_index) <= 1
354 ):
355 queue.radio_source = queue.enqueued_media_items
356 task_id = f"fill_radio_tracks_{queue_id}"
357 self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id)
358
359 @api_command("player_queues/repeat")
360 def set_repeat(self, queue_id: str, repeat_mode: RepeatMode) -> None:
361 """Configure repeat setting on the the queue."""
362 queue = self._queues[queue_id]
363 if queue.repeat_mode == repeat_mode:
364 return # no change
365 queue.repeat_mode = repeat_mode
366 self.signal_update(queue_id)
367 if (
368 queue.state == PlaybackState.PLAYING
369 and queue.index_in_buffer is not None
370 and queue.index_in_buffer == queue.current_index
371 ):
372 # if the queue is playing,
373 # ensure to (re)queue the next track because it might have changed
374 # note that we only do this if the player has loaded the current track
375 # if not, we wait until it has loaded to prevent conflicts
376 if next_item := self.get_next_item(queue_id, queue.index_in_buffer):
377 self._enqueue_next_item(queue_id, next_item)
378
379 @api_command("player_queues/play_media")
380 async def play_media(
381 self,
382 queue_id: str,
383 media: MediaItemType | ItemMapping | str | list[MediaItemType | ItemMapping | str],
384 option: QueueOption | None = None,
385 radio_mode: bool = False,
386 start_item: PlayableMediaItemType | str | None = None,
387 username: str | None = None,
388 ) -> None:
389 """Play media item(s) on the given queue.
390
391 :param queue_id: The queue_id of the queue to play media on.
392 :param media: Media that should be played (MediaItem(s) and/or uri's).
393 :param option: Which enqueue mode to use.
394 :param radio_mode: Enable radio mode for the given item(s).
395 :param start_item: Optional item to start the playlist or album from.
396 :param username: The username of the user requesting the playback.
397 Setting the username allows for overriding the logged-in user
398 to account for playback history per user when the play_media is
399 called from a shared context (like a web hook or automation).
400 """
401 # ruff: noqa: PLR0915
402 # we use a contextvar to bypass the throttler for this asyncio task/context
403 # this makes sure that playback has priority over other requests that may be
404 # happening in the background
405 BYPASS_THROTTLER.set(True)
406 if not (queue := self.get(queue_id)):
407 raise PlayerUnavailableError(f"Queue {queue_id} is not available")
408 # always fetch the underlying player so we can raise early if its not available
409 queue_player = self.mass.players.get(queue_id, True)
410 assert queue_player is not None # for type checking
411 if queue_player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
412 self.logger.warning("Ignore queue command: An announcement is in progress")
413 return
414
415 # save the user requesting the playback
416 playback_user: User | None
417 if username and (user := await self.mass.webserver.auth.get_user_by_username(username)):
418 playback_user = user
419 else:
420 playback_user = get_current_user()
421 queue.userid = playback_user.user_id if playback_user else None
422
423 # a single item or list of items may be provided
424 media_list = media if isinstance(media, list) else [media]
425
426 # clear queue if needed
427 if option == QueueOption.REPLACE:
428 self.clear(queue_id)
429 # Clear the 'enqueued media item' list when a new queue is requested
430 if option not in (QueueOption.ADD, QueueOption.NEXT):
431 queue.enqueued_media_items.clear()
432
433 media_items: list[MediaItemType] = []
434 radio_source: list[MediaItemType] = []
435 # resolve all media items
436 for item in media_list:
437 try:
438 # parse provided uri into a MA MediaItem or Basic QueueItem from URL
439 media_item: MediaItemType | ItemMapping | BrowseFolder
440 if isinstance(item, str):
441 media_item = await self.mass.music.get_item_by_uri(item)
442 elif isinstance(item, dict): # type: ignore[unreachable]
443 # TODO: Investigate why the API parser sometimes passes raw dicts instead of
444 # converting them to MediaItem objects. The parse_value function in api.py
445 # should handle dict-to-object conversion, but dicts are slipping through
446 # in some cases. This is defensive handling for that parser bug.
447 media_item = media_from_dict(item) # type: ignore[unreachable]
448 self.logger.debug("Converted to: %s", type(media_item))
449 else:
450 # item is MediaItemType | ItemMapping at this point
451 media_item = item
452
453 # Save requested media item to play on the queue so we can use it as a source
454 # for Don't stop the music. Use FIFO list to keep track of the last 10 played items
455 # Skip ItemMapping and BrowseFolder - only queue full MediaItemType objects
456 if not isinstance(
457 media_item, (ItemMapping, BrowseFolder)
458 ) and media_item.media_type in (
459 MediaType.TRACK,
460 MediaType.ALBUM,
461 MediaType.PLAYLIST,
462 MediaType.ARTIST,
463 ):
464 queue.enqueued_media_items.append(media_item)
465 if len(queue.enqueued_media_items) > 10:
466 queue.enqueued_media_items.pop(0)
467
468 # handle default enqueue option if needed
469 if option is None:
470 config_value = await self.mass.config.get_core_config_value(
471 self.domain,
472 f"default_enqueue_option_{media_item.media_type.value}",
473 return_type=str,
474 )
475 option = QueueOption(config_value)
476 if option == QueueOption.REPLACE:
477 self.clear(queue_id, skip_stop=True)
478
479 # collect media_items to play
480 if radio_mode:
481 # Type guard for mypy - only add full MediaItemType to radio_source
482 if not isinstance(media_item, (ItemMapping, BrowseFolder)):
483 radio_source.append(media_item)
484 else:
485 # Convert start_item to string URI if needed
486 start_item_uri: str | None = None
487 if isinstance(start_item, str):
488 start_item_uri = start_item
489 elif start_item is not None:
490 start_item_uri = start_item.uri
491 media_items += await self._resolve_media_items(
492 media_item, start_item_uri, queue_id=queue_id
493 )
494
495 except MusicAssistantError as err:
496 # invalid MA uri or item not found error
497 self.logger.warning("Skipping %s: %s", item, str(err))
498
499 # overwrite or append radio source items
500 if option not in (QueueOption.ADD, QueueOption.NEXT):
501 queue.radio_source = radio_source
502 else:
503 queue.radio_source += radio_source
504 # Use collected media items to calculate the radio if radio mode is on
505 if radio_mode:
506 radio_tracks = await self._get_radio_tracks(
507 queue_id=queue_id, is_initial_radio_mode=True
508 )
509 media_items = list(radio_tracks)
510
511 # only add valid/available items
512 queue_items: list[QueueItem] = []
513 for x in media_items:
514 if not x or not x.available:
515 continue
516 queue_items.append(
517 QueueItem.from_media_item(queue_id, cast("PlayableMediaItemType", x))
518 )
519
520 if not queue_items:
521 raise MediaNotFoundError("No playable items found")
522
523 # load the items into the queue
524 if queue.state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
525 cur_index = queue.index_in_buffer or queue.current_index or 0
526 else:
527 cur_index = queue.current_index or 0
528 insert_at_index = cur_index + 1
529 # Radio modes are already shuffled in a pattern we would like to keep.
530 shuffle = queue.shuffle_enabled and len(queue_items) > 1 and not radio_mode
531
532 # handle replace: clear all items and replace with the new items
533 if option == QueueOption.REPLACE:
534 await self.load(
535 queue_id,
536 queue_items=queue_items,
537 keep_remaining=False,
538 keep_played=False,
539 shuffle=shuffle,
540 )
541 await self.play_index(queue_id, 0)
542 return
543 # handle next: add item(s) in the index next to the playing/loaded/buffered index
544 if option == QueueOption.NEXT:
545 await self.load(
546 queue_id,
547 queue_items=queue_items,
548 insert_at_index=insert_at_index,
549 shuffle=shuffle,
550 )
551 return
552 if option == QueueOption.REPLACE_NEXT:
553 await self.load(
554 queue_id,
555 queue_items=queue_items,
556 insert_at_index=insert_at_index,
557 keep_remaining=False,
558 shuffle=shuffle,
559 )
560 return
561 # handle play: replace current loaded/playing index with new item(s)
562 if option == QueueOption.PLAY:
563 await self.load(
564 queue_id,
565 queue_items=queue_items,
566 insert_at_index=insert_at_index,
567 shuffle=shuffle,
568 )
569 next_index = min(insert_at_index, len(self._queue_items[queue_id]) - 1)
570 await self.play_index(queue_id, next_index)
571 return
572 # handle add: add/append item(s) to the remaining queue items
573 if option == QueueOption.ADD:
574 await self.load(
575 queue_id=queue_id,
576 queue_items=queue_items,
577 insert_at_index=insert_at_index
578 if queue.shuffle_enabled
579 else len(self._queue_items[queue_id]) + 1,
580 shuffle=queue.shuffle_enabled,
581 )
582 # handle edgecase, queue is empty and items are only added (not played)
583 # mark first item as new index
584 if queue.current_index is None:
585 queue.current_index = 0
586 queue.current_item = self.get_item(queue_id, 0)
587 queue.items = len(queue_items)
588 self.signal_update(queue_id)
589
590 @api_command("player_queues/move_item")
591 def move_item(self, queue_id: str, queue_item_id: str, pos_shift: int = 1) -> None:
592 """
593 Move queue item x up/down the queue.
594
595 - queue_id: id of the queue to process this request.
596 - queue_item_id: the item_id of the queueitem that needs to be moved.
597 - pos_shift: move item x positions down if positive value
598 - pos_shift: move item x positions up if negative value
599 - pos_shift: move item to top of queue as next item if 0.
600 """
601 queue = self._queues[queue_id]
602 item_index = self.index_by_id(queue_id, queue_item_id)
603 if item_index is None:
604 raise InvalidDataError(f"Item {queue_item_id} not found in queue")
605 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
606 msg = f"{item_index} is already played/buffered"
607 raise IndexError(msg)
608
609 queue_items = self._queue_items[queue_id]
610 queue_items = queue_items.copy()
611
612 if pos_shift == 0 and queue.state == PlaybackState.PLAYING:
613 new_index = (queue.current_index or 0) + 1
614 elif pos_shift == 0:
615 new_index = queue.current_index or 0
616 else:
617 new_index = item_index + pos_shift
618 if (new_index < (queue.current_index or 0)) or (new_index > len(queue_items)):
619 return
620 # move the item in the list
621 queue_items.insert(new_index, queue_items.pop(item_index))
622 self.update_items(queue_id, queue_items)
623
624 @api_command("player_queues/move_item_end")
625 def move_item_end(self, queue_id: str, queue_item_id: str) -> None:
626 """
627 Move queue item to the end the queue.
628
629 - queue_id: id of the queue to process this request.
630 - queue_item_id: the item_id of the queueitem that needs to be moved.
631 """
632 queue = self._queues[queue_id]
633 item_index = self.index_by_id(queue_id, queue_item_id)
634 if item_index is None:
635 raise InvalidDataError(f"Item {queue_item_id} not found in queue")
636 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
637 msg = f"{item_index} is already played/buffered"
638 raise IndexError(msg)
639
640 queue_items = self._queue_items[queue_id]
641 if item_index == (len(queue_items) - 1):
642 return
643 queue_items = queue_items.copy()
644
645 new_index = len(self._queue_items[queue_id]) - 1
646
647 # move the item in the list
648 queue_items.insert(new_index, queue_items.pop(item_index))
649 self.update_items(queue_id, queue_items)
650
651 @api_command("player_queues/delete_item")
652 def delete_item(self, queue_id: str, item_id_or_index: int | str) -> None:
653 """Delete item (by id or index) from the queue."""
654 if isinstance(item_id_or_index, str):
655 item_index = self.index_by_id(queue_id, item_id_or_index)
656 if item_index is None:
657 raise InvalidDataError(f"Item {item_id_or_index} not found in queue")
658 else:
659 item_index = item_id_or_index
660 queue = self._queues[queue_id]
661 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
662 # ignore request if track already loaded in the buffer
663 # the frontend should guard so this is just in case
664 self.logger.warning("delete requested for item already loaded in buffer")
665 return
666 queue_items = self._queue_items[queue_id]
667 queue_items.pop(item_index)
668 self.update_items(queue_id, queue_items)
669
670 @api_command("player_queues/clear")
671 def clear(self, queue_id: str, skip_stop: bool = False) -> None:
672 """Clear all items in the queue."""
673 queue = self._queues[queue_id]
674 queue.radio_source = []
675 if queue.state != PlaybackState.IDLE and not skip_stop:
676 self.mass.create_task(self.stop(queue_id))
677 queue.current_index = None
678 queue.current_item = None
679 queue.elapsed_time = 0
680 queue.elapsed_time_last_updated = time.time()
681 queue.index_in_buffer = None
682 self.update_items(queue_id, [])
683
684 @api_command("player_queues/save_as_playlist")
685 async def save_as_playlist(self, queue_id: str, name: str) -> Playlist:
686 """Save the current queue items as a new playlist.
687
688 :param queue_id: The queue_id of the queue to save.
689 :param name: The name for the new playlist.
690 """
691 if not self.get(queue_id):
692 raise PlayerUnavailableError(f"Queue {queue_id} is not available")
693 queue_items = self._queue_items.get(queue_id, [])
694 if not queue_items:
695 raise QueueEmpty("Cannot save an empty queue as a playlist.")
696 # collect URIs from queue items that are playlist-compatible
697 uris: list[str] = []
698 for item in queue_items:
699 if item.uri and item.media_type in PLAYLIST_MEDIA_TYPES:
700 uris.append(item.uri)
701 if not uris:
702 raise InvalidDataError("No valid items in queue to save as playlist.")
703 playlist = await self.mass.music.playlists.create_playlist(name)
704 await self.mass.music.playlists.add_playlist_tracks(playlist.item_id, uris)
705 return playlist
706
707 @api_command("player_queues/stop")
708 async def stop(self, queue_id: str) -> None:
709 """
710 Handle STOP command for given queue.
711
712 - queue_id: queue_id of the playerqueue to handle the command.
713 """
714 queue_player = self.mass.players.get(queue_id, True)
715 if queue_player is None:
716 raise PlayerUnavailableError(f"Player {queue_id} is not available")
717 if (queue := self.get(queue_id)) and queue.active:
718 if queue.state == PlaybackState.PLAYING:
719 queue.resume_pos = int(queue.corrected_elapsed_time)
720 # forward the actual command to the player
721 if temp_player := self.mass.players.get(queue_id):
722 await temp_player.stop()
723
724 @api_command("player_queues/play")
725 async def play(self, queue_id: str) -> None:
726 """
727 Handle PLAY command for given queue.
728
729 - queue_id: queue_id of the playerqueue to handle the command.
730 """
731 queue_player = self.mass.players.get(queue_id, True)
732 if queue_player is None:
733 raise PlayerUnavailableError(f"Player {queue_id} is not available")
734 if (
735 (queue := self._queues.get(queue_id))
736 and queue.active
737 and queue.state == PlaybackState.PAUSED
738 ):
739 # forward the actual play/unpause command to the player
740 await queue_player.play()
741 return
742 # player is not paused, perform resume instead
743 await self.resume(queue_id)
744
745 @api_command("player_queues/pause")
746 async def pause(self, queue_id: str) -> None:
747 """Handle PAUSE command for given queue.
748
749 - queue_id: queue_id of the playerqueue to handle the command.
750 """
751 if queue := self._queues.get(queue_id):
752 if queue.state == PlaybackState.PLAYING:
753 queue.resume_pos = int(queue.corrected_elapsed_time)
754 # forward the actual command to the player controller
755 queue_player = self.mass.players.get(queue_id)
756 assert queue_player is not None # for type checking
757 if not (self.mass.players.get_player_provider(queue_id)):
758 return # guard
759
760 if PlayerFeature.PAUSE not in queue_player.supported_features:
761 # if player does not support pause, we need to send stop
762 await queue_player.stop()
763 return
764 await queue_player.pause()
765
766 async def _watch_pause() -> None:
767 count = 0
768 # wait for pause
769 while count < 5 and queue_player.playback_state == PlaybackState.PLAYING:
770 count += 1
771 await asyncio.sleep(1)
772 # wait for unpause
773 if queue_player.playback_state != PlaybackState.PAUSED:
774 return
775 count = 0
776 while count < 30 and queue_player.playback_state == PlaybackState.PAUSED:
777 count += 1
778 await asyncio.sleep(1)
779 # if player is still paused when the limit is reached, send stop
780 if queue_player.playback_state == PlaybackState.PAUSED:
781 await queue_player.stop()
782
783 # we auto stop a player from paused when its paused for 30 seconds
784 if not queue_player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
785 self.mass.create_task(_watch_pause())
786
787 @api_command("player_queues/play_pause")
788 async def play_pause(self, queue_id: str) -> None:
789 """Toggle play/pause on given playerqueue.
790
791 - queue_id: queue_id of the queue to handle the command.
792 """
793 if (queue := self._queues.get(queue_id)) and queue.state == PlaybackState.PLAYING:
794 await self.pause(queue_id)
795 return
796 await self.play(queue_id)
797
798 @api_command("player_queues/next")
799 async def next(self, queue_id: str) -> None:
800 """Handle NEXT TRACK command for given queue.
801
802 - queue_id: queue_id of the queue to handle the command.
803 """
804 if (queue := self.get(queue_id)) is None or not queue.active:
805 # TODO: forward to underlying player if not active
806 return
807 idx = self._queues[queue_id].current_index
808 if idx is None:
809 self.logger.warning("Queue %s has no current index", queue.display_name)
810 return
811 attempts = 5
812 while attempts:
813 try:
814 if (next_index := self._get_next_index(queue_id, idx, True)) is not None:
815 await self.play_index(queue_id, next_index, debounce=True)
816 break
817 except MediaNotFoundError:
818 self.logger.warning(
819 "Failed to fetch next track for queue %s - trying next item",
820 queue.display_name,
821 )
822 idx += 1
823 attempts -= 1
824
825 @api_command("player_queues/previous")
826 async def previous(self, queue_id: str) -> None:
827 """Handle PREVIOUS TRACK command for given queue.
828
829 - queue_id: queue_id of the queue to handle the command.
830 """
831 if (queue := self.get(queue_id)) is None or not queue.active:
832 # TODO: forward to underlying player if not active
833 return
834 current_index = self._queues[queue_id].current_index
835 if current_index is None:
836 return
837 next_index = int(current_index)
838 # restart current track if current track has played longer than 4
839 # otherwise skip to previous track
840 if self._queues[queue_id].elapsed_time < 5:
841 next_index = max(current_index - 1, 0)
842 await self.play_index(queue_id, next_index, debounce=True)
843
844 @api_command("player_queues/skip")
845 async def skip(self, queue_id: str, seconds: int = 10) -> None:
846 """Handle SKIP command for given queue.
847
848 - queue_id: queue_id of the queue to handle the command.
849 - seconds: number of seconds to skip in track. Use negative value to skip back.
850 """
851 if (queue := self.get(queue_id)) is None or not queue.active:
852 # TODO: forward to underlying player if not active
853 return
854 await self.seek(queue_id, int(self._queues[queue_id].elapsed_time + seconds))
855
856 @api_command("player_queues/seek")
857 async def seek(self, queue_id: str, position: int = 10) -> None:
858 """Handle SEEK command for given queue.
859
860 - queue_id: queue_id of the queue to handle the command.
861 - position: position in seconds to seek to in the current playing item.
862 """
863 if not (queue := self.get(queue_id)):
864 return
865 queue_player = self.mass.players.get(queue_id, True)
866 if queue_player is None:
867 raise PlayerUnavailableError(f"Player {queue_id} is not available")
868 if not queue.current_item:
869 raise InvalidCommand(f"Queue {queue_player.display_name} has no item(s) loaded.")
870 if not queue.current_item.duration:
871 raise InvalidCommand("Can not seek items without duration.")
872 position = max(0, int(position))
873 if position > queue.current_item.duration:
874 raise InvalidCommand("Can not seek outside of duration range.")
875 if queue.current_index is None:
876 raise InvalidCommand(f"Queue {queue_player.display_name} has no current index.")
877 await self.play_index(queue_id, queue.current_index, seek_position=position)
878
879 @api_command("player_queues/resume")
880 async def resume(self, queue_id: str, fade_in: bool | None = None) -> None:
881 """Handle RESUME command for given queue.
882
883 - queue_id: queue_id of the queue to handle the command.
884 """
885 queue = self._queues[queue_id]
886 queue_items = self._queue_items[queue_id]
887 resume_item = queue.current_item
888 if queue.state == PlaybackState.PLAYING:
889 # resume requested while already playing,
890 # use current position as resume position
891 resume_pos = queue.corrected_elapsed_time
892 fade_in = False
893 else:
894 resume_pos = queue.resume_pos or queue.elapsed_time
895
896 if not resume_item and queue.current_index is not None and len(queue_items) > 0:
897 resume_item = self.get_item(queue_id, queue.current_index)
898 resume_pos = 0
899 elif not resume_item and queue.current_index is None and len(queue_items) > 0:
900 # items available in queue but no previous track, start at 0
901 resume_item = self.get_item(queue_id, 0)
902 resume_pos = 0
903
904 if resume_item is not None:
905 queue_player = self.mass.players.get(queue_id)
906 if queue_player is None:
907 raise PlayerUnavailableError(f"Player {queue_id} is not available")
908 if (
909 fade_in is None
910 and queue_player.playback_state == PlaybackState.IDLE
911 and (time.time() - queue.elapsed_time_last_updated) > 60
912 ):
913 # enable fade in effect if the player is idle for a while
914 fade_in = resume_pos > 0
915 if resume_item.media_type == MediaType.RADIO:
916 # we're not able to skip in online radio so this is pointless
917 resume_pos = 0
918 await self.play_index(
919 queue_id, resume_item.queue_item_id, int(resume_pos), fade_in or False
920 )
921 else:
922 # Queue is empty, try to resume from playlog
923 if await self._try_resume_from_playlog(queue):
924 return
925 msg = f"Resume queue requested but queue {queue.display_name} is empty"
926 raise QueueEmpty(msg)
927
928 @api_command("player_queues/play_index")
929 async def play_index(
930 self,
931 queue_id: str,
932 index: int | str,
933 seek_position: int = 0,
934 fade_in: bool = False,
935 debounce: bool = False,
936 ) -> None:
937 """Play item at index (or item_id) X in queue."""
938 queue = self._queues[queue_id]
939 queue.resume_pos = 0
940 if isinstance(index, str):
941 temp_index = self.index_by_id(queue_id, index)
942 if temp_index is None:
943 raise InvalidDataError(f"Item {index} not found in queue")
944 index = temp_index
945 # At this point index is guaranteed to be int
946 queue.current_index = index
947 # update current item and elapsed time and signal update
948 # this way the UI knows immediately that a new item is loading
949 queue.current_item = self.get_item(queue_id, index)
950 queue.elapsed_time = seek_position
951 queue.elapsed_time_last_updated = time.time()
952 self.signal_update(queue_id)
953 queue.index_in_buffer = index
954 queue.flow_mode_stream_log = []
955 target_player = self.mass.players.get(queue_id)
956 if target_player is None:
957 raise PlayerUnavailableError(f"Player {queue_id} is not available")
958 queue.next_item_id_enqueued = None
959 # always update session id when we start a new playback session
960 queue.session_id = shortuuid.random(length=8)
961 # handle resume point of audiobook(chapter) or podcast(episode)
962 if (
963 not seek_position
964 and (queue_item := self.get_item(queue_id, index))
965 and (resume_position_ms := getattr(queue_item.media_item, "resume_position_ms", 0))
966 ):
967 seek_position = max(0, int((resume_position_ms - 500) / 1000))
968
969 # send play_media request to player
970 # NOTE that we debounce this a bit to account for someone hitting the next button
971 # like a madman. This will prevent the player from being overloaded with requests.
972 async def _play_index(index: int, debounce: bool) -> None:
973 for attempt in range(5):
974 try:
975 queue_item = self.get_item(queue_id, index)
976 if not queue_item:
977 continue # guard
978 await self._load_item(
979 queue_item,
980 self._get_next_index(queue_id, index),
981 is_start=True,
982 seek_position=seek_position if attempt == 0 else 0,
983 fade_in=fade_in if attempt == 0 else False,
984 )
985 # if we reach this point, loading the item succeeded, break the loop
986 queue.current_index = index
987 queue.current_item = queue_item
988 break
989 except (MediaNotFoundError, AudioError):
990 # the requested index can not be played.
991 if queue_item:
992 self.logger.warning(
993 "Skipping unplayable item %s (%s)",
994 queue_item.name,
995 queue_item.uri,
996 )
997 queue_item.available = False
998 next_index = self._get_next_index(queue_id, index, allow_repeat=False)
999 if next_index is None:
1000 raise MediaNotFoundError("No next item available")
1001 index = next_index
1002 else:
1003 # all attempts to find a playable item failed
1004 raise MediaNotFoundError("No playable item found to start playback")
1005
1006 # work out if we need to use flow mode
1007 flow_mode = target_player.flow_mode and queue_item.media_type not in (
1008 # don't use flow mode for duration-less streams
1009 MediaType.RADIO,
1010 MediaType.PLUGIN_SOURCE,
1011 )
1012 await asyncio.sleep(0.5 if debounce else 0.1)
1013 queue.flow_mode = flow_mode
1014 await self.mass.players.play_media(
1015 player_id=queue_id,
1016 media=await self.player_media_from_queue_item(queue_item, flow_mode),
1017 )
1018 queue.current_index = index
1019 queue.current_item = queue_item
1020 await asyncio.sleep(2)
1021 self._transitioning_players.discard(queue_id)
1022
1023 # we set a flag to notify the update logic that we're transitioning to a new track
1024 self._transitioning_players.add(queue_id)
1025
1026 # we debounce the play_index command to handle the case where someone
1027 # is spamming next/previous on the player
1028 task_id = f"play_index_{queue_id}"
1029 if existing_task := self.mass.get_task(task_id):
1030 existing_task.cancel()
1031 with suppress(asyncio.CancelledError):
1032 await existing_task
1033 task = self.mass.create_task(
1034 _play_index,
1035 index,
1036 debounce,
1037 task_id=task_id,
1038 )
1039 await task
1040 self.signal_update(queue_id)
1041
1042 @api_command("player_queues/transfer")
1043 async def transfer_queue(
1044 self,
1045 source_queue_id: str,
1046 target_queue_id: str,
1047 auto_play: bool | None = None,
1048 ) -> None:
1049 """Transfer queue to another queue."""
1050 if not (source_queue := self.get(source_queue_id)):
1051 raise PlayerUnavailableError(f"Queue {source_queue_id} is not available")
1052 if not (target_queue := self.get(target_queue_id)):
1053 raise PlayerUnavailableError(f"Queue {target_queue_id} is not available")
1054 if auto_play is None:
1055 auto_play = source_queue.state == PlaybackState.PLAYING
1056
1057 target_player = self.mass.players.get(target_queue_id)
1058 if target_player is None:
1059 raise PlayerUnavailableError(f"Player {target_queue_id} is not available")
1060 if target_player.active_group or target_player.synced_to:
1061 # edge case: the user wants to move playback from the group as a whole, to a single
1062 # player in the group or it is grouped and the command targeted at the single player.
1063 # We need to dissolve the group first.
1064 group_id = target_player.active_group or target_player.synced_to
1065 assert group_id is not None # checked in if condition above
1066 await self.mass.players.cmd_ungroup(group_id)
1067 await asyncio.sleep(3)
1068
1069 source_items = self._queue_items[source_queue_id]
1070 target_queue.repeat_mode = source_queue.repeat_mode
1071 target_queue.shuffle_enabled = source_queue.shuffle_enabled
1072 target_queue.dont_stop_the_music_enabled = source_queue.dont_stop_the_music_enabled
1073 target_queue.radio_source = source_queue.radio_source
1074 target_queue.enqueued_media_items = source_queue.enqueued_media_items
1075 target_queue.resume_pos = int(source_queue.elapsed_time)
1076 target_queue.current_index = source_queue.current_index
1077 if source_queue.current_item:
1078 target_queue.current_item = source_queue.current_item
1079 target_queue.current_item.queue_id = target_queue_id
1080 self.clear(source_queue_id)
1081
1082 await self.load(target_queue_id, source_items, keep_remaining=False, keep_played=False)
1083 for item in source_items:
1084 item.queue_id = target_queue_id
1085 self.update_items(target_queue_id, source_items)
1086 if auto_play:
1087 await self.resume(target_queue_id)
1088
1089 # Interaction with player
1090
1091 async def on_player_register(self, player: Player) -> None:
1092 """Register PlayerQueue for given player/queue id."""
1093 queue_id = player.player_id
1094 queue: PlayerQueue | None = None
1095 queue_items: list[QueueItem] = []
1096 # try to restore previous state
1097 if prev_state := await self.mass.cache.get(
1098 key=queue_id,
1099 provider=self.domain,
1100 category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
1101 ):
1102 try:
1103 queue = PlayerQueue.from_dict(prev_state)
1104 prev_items = await self.mass.cache.get(
1105 key=queue_id,
1106 provider=self.domain,
1107 category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
1108 default=[],
1109 )
1110 queue_items = []
1111 for idx, item_data in enumerate(prev_items):
1112 qi = QueueItem.from_cache(item_data)
1113 if not qi.media_item:
1114 # Skip items with missing media_item - this can happen if
1115 # MA was killed during shutdown while cache was being written
1116 self.logger.debug(
1117 "Skipping queue item %s (index %d) restored from cache "
1118 "without media_item",
1119 qi.name,
1120 idx,
1121 )
1122 continue
1123 queue_items.append(qi)
1124 if queue.enqueued_media_items:
1125 # we need to restore the MediaItem objects for the enqueued media items
1126 # Items from cache may be dicts that need deserialization
1127 restored_enqueued_items: list[MediaItemType] = []
1128 cached_items: list[dict[str, Any] | MediaItemType] = cast(
1129 "list[dict[str, Any] | MediaItemType]", queue.enqueued_media_items
1130 )
1131 for item in cached_items:
1132 if isinstance(item, dict):
1133 restored_item = media_from_dict(item)
1134 restored_enqueued_items.append(cast("MediaItemType", restored_item))
1135 else:
1136 restored_enqueued_items.append(item)
1137 queue.enqueued_media_items = restored_enqueued_items
1138 except Exception as err:
1139 self.logger.warning(
1140 "Failed to restore the queue(items) for %s - %s",
1141 player.display_name,
1142 str(err),
1143 )
1144 # Reset to clean state on failure
1145 queue = None
1146 queue_items = []
1147 if queue is None:
1148 queue = PlayerQueue(
1149 queue_id=queue_id,
1150 active=False,
1151 display_name=player.display_name,
1152 available=player.available,
1153 dont_stop_the_music_enabled=False,
1154 items=0,
1155 )
1156
1157 self._queues[queue_id] = queue
1158 self._queue_items[queue_id] = queue_items
1159 # always call update to calculate state etc
1160 self.on_player_update(player, {})
1161 self.mass.signal_event(EventType.QUEUE_ADDED, object_id=queue_id, data=queue)
1162
1163 def on_player_update(
1164 self,
1165 player: Player,
1166 changed_values: dict[str, tuple[Any, Any]],
1167 ) -> None:
1168 """
1169 Call when a PlayerQueue needs to be updated (e.g. when player updates).
1170
1171 NOTE: This is called every second if the player is playing.
1172 """
1173 queue_id = player.player_id
1174 if (queue := self._queues.get(queue_id)) is None:
1175 # race condition
1176 return
1177 if player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
1178 # do nothing while the announcement is in progress
1179 return
1180 # determine if this queue is currently active for this player
1181 queue.active = player.active_source in (queue.queue_id, None)
1182 if not queue.active and queue_id not in self._prev_states:
1183 queue.state = PlaybackState.IDLE
1184 # return early if the queue is not active and we have no previous state
1185 return
1186 if queue.queue_id in self._transitioning_players:
1187 # we're currently transitioning to a new track,
1188 # ignore updates from the player during this time
1189 return
1190
1191 # queue is active and preflight checks passed, update the queue details
1192 self._update_queue_from_player(player)
1193
1194 def on_player_remove(self, player_id: str, permanent: bool) -> None:
1195 """Call when a player is removed from the registry."""
1196 if permanent:
1197 # if the player is permanently removed, we also remove the cached queue data
1198 self.mass.create_task(
1199 self.mass.cache.delete(
1200 key=player_id,
1201 provider=self.domain,
1202 category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
1203 )
1204 )
1205 self.mass.create_task(
1206 self.mass.cache.delete(
1207 key=player_id,
1208 provider=self.domain,
1209 category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
1210 )
1211 )
1212 self._queues.pop(player_id, None)
1213 self._queue_items.pop(player_id, None)
1214
1215 async def load_next_queue_item(
1216 self,
1217 queue_id: str,
1218 current_item_id: str,
1219 ) -> QueueItem:
1220 """
1221 Call when a player wants the next queue item to play.
1222
1223 Raises QueueEmpty if there are no more tracks left.
1224 """
1225 queue = self.get(queue_id)
1226 if not queue:
1227 msg = f"PlayerQueue {queue_id} is not available"
1228 raise PlayerUnavailableError(msg)
1229 cur_index = self.index_by_id(queue_id, current_item_id)
1230 if cur_index is None:
1231 # this is just a guard for bad data
1232 raise QueueEmpty("Invalid item id for queue given.")
1233 next_item: QueueItem | None = None
1234 idx = 0
1235 while True:
1236 next_index = self._get_next_index(queue_id, cur_index + idx)
1237 if next_index is None:
1238 raise QueueEmpty("No more tracks left in the queue.")
1239 queue_item = self.get_item(queue_id, next_index)
1240 if queue_item is None:
1241 raise QueueEmpty("No more tracks left in the queue.")
1242 if idx >= 10:
1243 # we only allow 10 retries to prevent infinite loops
1244 raise QueueEmpty("No more (playable) tracks left in the queue.")
1245 try:
1246 await self._load_item(queue_item, next_index)
1247 # we're all set, this is our next item
1248 next_item = queue_item
1249 break
1250 except (MediaNotFoundError, AudioError):
1251 # No stream details found, skip this QueueItem
1252 self.logger.warning(
1253 "Skipping unplayable item %s (%s)", queue_item.name, queue_item.uri
1254 )
1255 queue_item.available = False
1256 idx += 1
1257 if idx != 0:
1258 # we skipped some items, signal a queue items update
1259 self.update_items(queue_id, self._queue_items[queue_id])
1260 if next_item is None:
1261 raise QueueEmpty("No more (playable) tracks left in the queue.")
1262
1263 return next_item
1264
1265 async def _load_item(
1266 self,
1267 queue_item: QueueItem,
1268 next_index: int | None,
1269 is_start: bool = False,
1270 seek_position: int = 0,
1271 fade_in: bool = False,
1272 ) -> None:
1273 """Try to load the stream details for the given queue item."""
1274 queue_id = queue_item.queue_id
1275 queue = self._queues[queue_id]
1276
1277 # we use a contextvar to bypass the throttler for this asyncio task/context
1278 # this makes sure that playback has priority over other requests that may be
1279 # happening in the background
1280 BYPASS_THROTTLER.set(True)
1281
1282 self.logger.debug(
1283 "(pre)loading (next) item for queue %s...",
1284 queue.display_name,
1285 )
1286
1287 if not queue_item.available:
1288 raise MediaNotFoundError(f"Item {queue_item.uri} is not available")
1289
1290 # work out if we are playing an album and if we should prefer album
1291 # loudness
1292 next_track_from_same_album = (
1293 next_index is not None
1294 and (next_item := self.get_item(queue_id, next_index))
1295 and (
1296 queue_item.media_item
1297 and hasattr(queue_item.media_item, "album")
1298 and queue_item.media_item.album
1299 and next_item.media_item
1300 and hasattr(next_item.media_item, "album")
1301 and next_item.media_item.album
1302 and queue_item.media_item.album.item_id == next_item.media_item.album.item_id
1303 )
1304 )
1305 current_index = self.index_by_id(queue_id, queue_item.queue_item_id)
1306 if current_index is None:
1307 previous_track_from_same_album = False
1308 else:
1309 previous_index = max(current_index - 1, 0)
1310 previous_track_from_same_album = (
1311 previous_index > 0
1312 and (previous_item := self.get_item(queue_id, previous_index)) is not None
1313 and previous_item.media_item is not None
1314 and hasattr(previous_item.media_item, "album")
1315 and previous_item.media_item.album is not None
1316 and queue_item.media_item is not None
1317 and hasattr(queue_item.media_item, "album")
1318 and queue_item.media_item.album is not None
1319 and queue_item.media_item.album.item_id == previous_item.media_item.album.item_id
1320 )
1321 playing_album_tracks = next_track_from_same_album or previous_track_from_same_album
1322 if queue_item.media_item and isinstance(queue_item.media_item, Track):
1323 album = queue_item.media_item.album
1324 # prefer the full library media item so we have all metadata and provider(quality) info
1325 # always request the full library item as there might be other qualities available
1326 if library_item := await self.mass.music.get_library_item_by_prov_id(
1327 queue_item.media_item.media_type,
1328 queue_item.media_item.item_id,
1329 queue_item.media_item.provider,
1330 ):
1331 queue_item.media_item = cast("Track", library_item)
1332 elif not queue_item.media_item.image or queue_item.media_item.provider.startswith(
1333 "ytmusic"
1334 ):
1335 # Youtube Music has poor thumbs by default, so we always fetch the full item
1336 # this also catches the case where they have an unavailable item in a listing
1337 fetched_item = await self.mass.music.get_item_by_uri(queue_item.uri)
1338 queue_item.media_item = cast("Track", fetched_item)
1339
1340 # ensure we got the full (original) album set
1341 if album and (
1342 library_album := await self.mass.music.get_library_item_by_prov_id(
1343 album.media_type,
1344 album.item_id,
1345 album.provider,
1346 )
1347 ):
1348 queue_item.media_item.album = cast("Album", library_album)
1349 elif album:
1350 # Restore original album if we have no better alternative from the library
1351 queue_item.media_item.album = album
1352 # prefer album image over track image
1353 if queue_item.media_item.album and queue_item.media_item.album.image:
1354 org_images: list[MediaItemImage] = queue_item.media_item.metadata.images or []
1355 queue_item.media_item.metadata.images = UniqueList(
1356 [
1357 queue_item.media_item.album.image,
1358 *org_images,
1359 ]
1360 )
1361 # Fetch the streamdetails, which could raise in case of an unplayable item.
1362 # For example, YT Music returns Radio Items that are not playable.
1363 queue_item.streamdetails = await get_stream_details(
1364 mass=self.mass,
1365 queue_item=queue_item,
1366 seek_position=seek_position,
1367 fade_in=fade_in,
1368 prefer_album_loudness=bool(playing_album_tracks),
1369 )
1370
1371 def track_loaded_in_buffer(self, queue_id: str, item_id: str) -> None:
1372 """Call when a player has (started) loading a track in the buffer."""
1373 queue = self.get(queue_id)
1374 if not queue:
1375 msg = f"PlayerQueue {queue_id} is not available"
1376 raise PlayerUnavailableError(msg)
1377 # store the index of the item that is currently (being) loaded in the buffer
1378 # which helps us a bit to determine how far the player has buffered ahead
1379 queue.index_in_buffer = self.index_by_id(queue_id, item_id)
1380 self.logger.debug("PlayerQueue %s loaded item %s in buffer", queue.display_name, item_id)
1381 self.signal_update(queue_id)
1382 # preload next streamdetails
1383 self._preload_next_item(queue_id, item_id)
1384
1385 # Main queue manipulation methods
1386
1387 async def load(
1388 self,
1389 queue_id: str,
1390 queue_items: list[QueueItem],
1391 insert_at_index: int = 0,
1392 keep_remaining: bool = True,
1393 keep_played: bool = True,
1394 shuffle: bool = False,
1395 ) -> None:
1396 """Load new items at index.
1397
1398 - queue_id: id of the queue to process this request.
1399 - queue_items: a list of QueueItems
1400 - insert_at_index: insert the item(s) at this index
1401 - keep_remaining: keep the remaining items after the insert
1402 - shuffle: (re)shuffle the items after insert index
1403 """
1404 prev_items = self._queue_items[queue_id][:insert_at_index] if keep_played else []
1405 next_items = queue_items
1406
1407 # if keep_remaining, append the old 'next' items
1408 if keep_remaining:
1409 next_items += self._queue_items[queue_id][insert_at_index:]
1410
1411 # we set the original insert order as attribute so we can un-shuffle
1412 for index, item in enumerate(next_items):
1413 item.sort_index += insert_at_index + index
1414 # (re)shuffle the final batch if needed
1415 if shuffle:
1416 next_items = await _smart_shuffle(next_items)
1417 self.update_items(queue_id, prev_items + next_items)
1418
1419 def update_items(self, queue_id: str, queue_items: list[QueueItem]) -> None:
1420 """Update the existing queue items, mostly caused by reordering."""
1421 self._queue_items[queue_id] = queue_items
1422 queue = self._queues[queue_id]
1423 queue.items = len(self._queue_items[queue_id])
1424 # to track if the queue items changed we set a timestamp
1425 # this is a simple way to detect changes in the list of items
1426 # without having to compare the entire list
1427 queue.items_last_updated = time.time()
1428 self.signal_update(queue_id, True)
1429 if (
1430 queue.state == PlaybackState.PLAYING
1431 and queue.index_in_buffer is not None
1432 and queue.index_in_buffer == queue.current_index
1433 ):
1434 # if the queue is playing,
1435 # ensure to (re)queue the next track because it might have changed
1436 # note that we only do this if the player has loaded the current track
1437 # if not, we wait until it has loaded to prevent conflicts
1438 if next_item := self.get_next_item(queue_id, queue.index_in_buffer):
1439 self._enqueue_next_item(queue_id, next_item)
1440
1441 # Helper methods
1442
1443 def get_item(self, queue_id: str, item_id_or_index: int | str | None) -> QueueItem | None:
1444 """Get queue item by index or item_id."""
1445 if item_id_or_index is None:
1446 return None
1447 if (queue_items := self._queue_items.get(queue_id)) is None:
1448 return None
1449 if isinstance(item_id_or_index, int) and len(queue_items) > item_id_or_index:
1450 return queue_items[item_id_or_index]
1451 if isinstance(item_id_or_index, str):
1452 return next((x for x in queue_items if x.queue_item_id == item_id_or_index), None)
1453 return None
1454
1455 def signal_update(self, queue_id: str, items_changed: bool = False) -> None:
1456 """Signal state changed of given queue."""
1457 queue = self._queues[queue_id]
1458 if items_changed:
1459 self.mass.signal_event(EventType.QUEUE_ITEMS_UPDATED, object_id=queue_id, data=queue)
1460 # save items in cache - only cache items with valid media_item
1461 cache_data = [
1462 x.to_cache() for x in self._queue_items[queue_id] if x.media_item is not None
1463 ]
1464 self.mass.create_task(
1465 self.mass.cache.set(
1466 key=queue_id,
1467 data=cache_data,
1468 provider=self.domain,
1469 category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
1470 )
1471 )
1472 # always send the base event
1473 self.mass.signal_event(EventType.QUEUE_UPDATED, object_id=queue_id, data=queue)
1474 # also signal update to the player itself so it can update its current_media
1475 self.mass.players.trigger_player_update(queue_id)
1476 # save state
1477 self.mass.create_task(
1478 self.mass.cache.set(
1479 key=queue_id,
1480 data=queue.to_cache(),
1481 provider=self.domain,
1482 category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
1483 )
1484 )
1485
1486 def index_by_id(self, queue_id: str, queue_item_id: str) -> int | None:
1487 """Get index by queue_item_id."""
1488 queue_items = self._queue_items[queue_id]
1489 for index, item in enumerate(queue_items):
1490 if item.queue_item_id == queue_item_id:
1491 return index
1492 return None
1493
1494 async def player_media_from_queue_item(
1495 self, queue_item: QueueItem, flow_mode: bool
1496 ) -> PlayerMedia:
1497 """Parse PlayerMedia from QueueItem."""
1498 queue = self._queues[queue_item.queue_id]
1499 if flow_mode:
1500 duration = None
1501 elif queue_item.streamdetails:
1502 # prefer netto duration
1503 # when seeking, the player only receives the remaining duration
1504 duration = queue_item.streamdetails.duration or queue_item.duration
1505 if duration and queue_item.streamdetails.seek_position:
1506 duration = duration - queue_item.streamdetails.seek_position
1507 else:
1508 duration = queue_item.duration
1509 if queue.session_id is None:
1510 # handle error or return early
1511 raise InvalidDataError("Queue session_id is None")
1512 media = PlayerMedia(
1513 uri=await self.mass.streams.resolve_stream_url(
1514 queue.session_id, queue_item, flow_mode=flow_mode
1515 ),
1516 media_type=MediaType.FLOW_STREAM if flow_mode else queue_item.media_type,
1517 title="Music Assistant" if flow_mode else queue_item.name,
1518 image_url=MASS_LOGO_ONLINE,
1519 duration=duration,
1520 source_id=queue_item.queue_id,
1521 queue_item_id=queue_item.queue_item_id,
1522 )
1523 if not flow_mode and queue_item.media_item:
1524 media.title = queue_item.media_item.name
1525 media.artist = getattr(queue_item.media_item, "artist_str", "")
1526 media.album = (
1527 album.name if (album := getattr(queue_item.media_item, "album", None)) else ""
1528 )
1529 if queue_item.image:
1530 # the image format needs to be 500x500 jpeg for maximum compatibility with players
1531 # we prefer the imageproxy on the streamserver here because this request is sent
1532 # to the player itself which may not be able to reach the regular webserver
1533 media.image_url = self.mass.metadata.get_image_url(
1534 queue_item.image, size=500, prefer_stream_server=True
1535 )
1536 return media
1537
1538 async def get_artist_tracks(self, artist: Artist) -> list[Track]:
1539 """Return tracks for given artist, based on user preference."""
1540 artist_items_conf = self.mass.config.get_raw_core_config_value(
1541 self.domain,
1542 CONF_DEFAULT_ENQUEUE_SELECT_ARTIST,
1543 ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE,
1544 )
1545 self.logger.info(
1546 "Fetching tracks to play for artist %s",
1547 artist.name,
1548 )
1549 if artist_items_conf in ("library_tracks", "all_tracks"):
1550 all_items = await self.mass.music.artists.tracks(
1551 artist.item_id,
1552 artist.provider,
1553 in_library_only=artist_items_conf == "library_tracks",
1554 )
1555 random.shuffle(all_items)
1556 return all_items
1557 if artist_items_conf in ("library_album_tracks", "all_album_tracks"):
1558 all_tracks: list[Track] = []
1559 for library_album in await self.mass.music.artists.albums(
1560 artist.item_id,
1561 artist.provider,
1562 in_library_only=artist_items_conf == "library_album_tracks",
1563 ):
1564 for album_track in await self.mass.music.albums.tracks(
1565 library_album.item_id, library_album.provider
1566 ):
1567 if album_track not in all_tracks:
1568 all_tracks.append(album_track)
1569 random.shuffle(all_tracks)
1570 return all_tracks
1571 return []
1572
1573 async def get_album_tracks(self, album: Album, start_item: str | None) -> list[Track]:
1574 """Return tracks for given album, based on user preference."""
1575 album_items_conf = self.mass.config.get_raw_core_config_value(
1576 self.domain,
1577 CONF_DEFAULT_ENQUEUE_SELECT_ALBUM,
1578 ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE,
1579 )
1580 result: list[Track] = []
1581 start_item_found = False
1582 self.logger.info(
1583 "Fetching tracks to play for album %s",
1584 album.name,
1585 )
1586 for album_track in await self.mass.music.albums.tracks(
1587 item_id=album.item_id,
1588 provider_instance_id_or_domain=album.provider,
1589 in_library_only=album_items_conf == "library_tracks",
1590 ):
1591 if not album_track.available:
1592 continue
1593 if start_item in (album_track.item_id, album_track.uri):
1594 start_item_found = True
1595 if start_item is not None and not start_item_found:
1596 continue
1597 result.append(album_track)
1598 return result
1599
1600 async def get_playlist_tracks(
1601 self, playlist: Playlist, start_item: str | None
1602 ) -> list[PlaylistPlayableItem]:
1603 """Return tracks for given playlist, based on user preference."""
1604 result: list[PlaylistPlayableItem] = []
1605 start_item_found = False
1606 self.logger.info(
1607 "Fetching tracks to play for playlist %s",
1608 playlist.name,
1609 )
1610 # TODO: Handle other sort options etc.
1611 async for playlist_track in self.mass.music.playlists.tracks(
1612 playlist.item_id, playlist.provider
1613 ):
1614 if not playlist_track.available:
1615 continue
1616 if start_item in (playlist_track.item_id, playlist_track.uri):
1617 start_item_found = True
1618 if start_item is not None and not start_item_found:
1619 continue
1620 result.append(playlist_track)
1621 return result
1622
1623 async def get_audiobook_resume_point(
1624 self, audio_book: Audiobook, chapter: str | int | None = None, userid: str | None = None
1625 ) -> int:
1626 """Return resume point (in milliseconds) for given audio book."""
1627 self.logger.debug(
1628 "Fetching resume point to play for audio book %s",
1629 audio_book.name,
1630 )
1631 if chapter is not None:
1632 # user explicitly selected a chapter to play
1633 start_chapter = int(chapter) if isinstance(chapter, str) else chapter
1634 if chapters := audio_book.metadata.chapters:
1635 if _chapter := next((x for x in chapters if x.position == start_chapter), None):
1636 return int(_chapter.start * 1000)
1637 raise InvalidDataError(
1638 f"Unable to resolve chapter to play for Audiobook {audio_book.name}"
1639 )
1640 full_played, resume_position_ms = await self.mass.music.get_resume_position(
1641 audio_book, userid=userid
1642 )
1643 return 0 if full_played else resume_position_ms
1644
1645 async def get_next_podcast_episodes(
1646 self,
1647 podcast: Podcast | None,
1648 episode: PodcastEpisode | str | None,
1649 userid: str | None = None,
1650 ) -> UniqueList[PodcastEpisode]:
1651 """Return (next) episode(s) and resume point for given podcast."""
1652 if podcast is None and isinstance(episode, str | NoneType):
1653 raise InvalidDataError("Either podcast or episode must be provided")
1654 if podcast is None:
1655 # single podcast episode requested
1656 assert isinstance(episode, PodcastEpisode) # checked above
1657 self.logger.debug(
1658 "Fetching resume point to play for Podcast episode %s",
1659 episode.name,
1660 )
1661 (
1662 fully_played,
1663 resume_position_ms,
1664 ) = await self.mass.music.get_resume_position(episode, userid=userid)
1665 episode.fully_played = fully_played
1666 episode.resume_position_ms = 0 if fully_played else resume_position_ms
1667 return UniqueList([episode])
1668 # podcast with optional start episode requested
1669 self.logger.debug(
1670 "Fetching episode(s) and resume point to play for Podcast %s",
1671 podcast.name,
1672 )
1673 all_episodes = [
1674 x async for x in self.mass.music.podcasts.episodes(podcast.item_id, podcast.provider)
1675 ]
1676 all_episodes.sort(key=lambda x: x.position)
1677 # if a episode was provided, a user explicitly selected a episode to play
1678 # so we need to find the index of the episode in the list
1679 resolved_episode: PodcastEpisode | None = None
1680 if isinstance(episode, PodcastEpisode):
1681 resolved_episode = next((x for x in all_episodes if x.uri == episode.uri), None)
1682 if resolved_episode:
1683 # ensure we have accurate resume info
1684 (
1685 fully_played,
1686 resume_position_ms,
1687 ) = await self.mass.music.get_resume_position(resolved_episode, userid=userid)
1688 resolved_episode.resume_position_ms = 0 if fully_played else resume_position_ms
1689 elif isinstance(episode, str):
1690 resolved_episode = next(
1691 (x for x in all_episodes if episode in (x.uri, x.item_id)), None
1692 )
1693 if resolved_episode:
1694 # ensure we have accurate resume info
1695 (
1696 fully_played,
1697 resume_position_ms,
1698 ) = await self.mass.music.get_resume_position(resolved_episode, userid=userid)
1699 resolved_episode.resume_position_ms = 0 if fully_played else resume_position_ms
1700 else:
1701 # get first episode that is not fully played
1702 for ep in all_episodes:
1703 if ep.fully_played:
1704 continue
1705 # ensure we have accurate resume info
1706 (
1707 fully_played,
1708 resume_position_ms,
1709 ) = await self.mass.music.get_resume_position(ep, userid=userid)
1710 if fully_played:
1711 continue
1712 ep.resume_position_ms = resume_position_ms
1713 resolved_episode = ep
1714 break
1715 else:
1716 # no episodes found that are not fully played, so we start at the beginning
1717 resolved_episode = next((x for x in all_episodes), None)
1718 if resolved_episode is None:
1719 raise InvalidDataError(f"Unable to resolve episode to play for Podcast {podcast.name}")
1720 # get the index of the episode
1721 episode_index = all_episodes.index(resolved_episode)
1722 # return the (remaining) episode(s) to play
1723 return UniqueList(all_episodes[episode_index:])
1724
1725 def _get_next_index(
1726 self,
1727 queue_id: str,
1728 cur_index: int | None,
1729 is_skip: bool = False,
1730 allow_repeat: bool = True,
1731 ) -> int | None:
1732 """
1733 Return the next index for the queue, accounting for repeat settings.
1734
1735 Will return None if there are no (more) items in the queue.
1736 """
1737 queue = self._queues[queue_id]
1738 queue_items = self._queue_items[queue_id]
1739 if not queue_items or cur_index is None:
1740 # queue is empty
1741 return None
1742 # handle repeat single track
1743 if queue.repeat_mode == RepeatMode.ONE and not is_skip:
1744 return cur_index if allow_repeat else None
1745 # handle cur_index is last index of the queue
1746 if cur_index >= (len(queue_items) - 1):
1747 if allow_repeat and queue.repeat_mode == RepeatMode.ALL:
1748 # if repeat all is enabled, we simply start again from the beginning
1749 return 0
1750 return None
1751 # all other: just the next index
1752 return cur_index + 1
1753
1754 def get_next_item(self, queue_id: str, cur_index: int | str) -> QueueItem | None:
1755 """Return next QueueItem for given queue."""
1756 index: int
1757 if isinstance(cur_index, str):
1758 resolved_index = self.index_by_id(queue_id, cur_index)
1759 if resolved_index is None:
1760 return None # guard
1761 index = resolved_index
1762 else:
1763 index = cur_index
1764 # At this point index is guaranteed to be int
1765 for skip in range(5):
1766 if (next_index := self._get_next_index(queue_id, index + skip)) is None:
1767 break
1768 next_item = self.get_item(queue_id, next_index)
1769 if next_item is None:
1770 continue
1771 if not next_item.available:
1772 # ensure that we skip unavailable items (set by load_next track logic)
1773 continue
1774 return next_item
1775 return None
1776
1777 async def _fill_radio_tracks(self, queue_id: str) -> None:
1778 """Fill a Queue with (additional) Radio tracks."""
1779 self.logger.debug(
1780 "Filling radio tracks for queue %s",
1781 queue_id,
1782 )
1783 tracks = await self._get_radio_tracks(queue_id=queue_id, is_initial_radio_mode=False)
1784 # fill queue - filter out unavailable items
1785 queue_items = [QueueItem.from_media_item(queue_id, x) for x in tracks if x.available]
1786 await self.load(
1787 queue_id,
1788 queue_items,
1789 insert_at_index=len(self._queue_items[queue_id]) + 1,
1790 )
1791
1792 def _enqueue_next_item(self, queue_id: str, next_item: QueueItem | None) -> None:
1793 """Enqueue the next item on the player."""
1794 if not next_item:
1795 # no next item, nothing to do...
1796 return
1797
1798 queue = self._queues[queue_id]
1799 if queue.flow_mode:
1800 # ignore this for flow mode
1801 return
1802
1803 async def _enqueue_next_item_on_player(next_item: QueueItem) -> None:
1804 await self.mass.players.enqueue_next_media(
1805 player_id=queue_id,
1806 media=await self.player_media_from_queue_item(next_item, False),
1807 )
1808 if queue.next_item_id_enqueued != next_item.queue_item_id:
1809 queue.next_item_id_enqueued = next_item.queue_item_id
1810 self.logger.debug(
1811 "Enqueued next track %s on queue %s",
1812 next_item.name,
1813 self._queues[queue_id].display_name,
1814 )
1815
1816 task_id = f"enqueue_next_item_{queue_id}"
1817 self.mass.call_later(0.5, _enqueue_next_item_on_player, next_item, task_id=task_id)
1818
1819 def _preload_next_item(self, queue_id: str, item_id_in_buffer: str) -> None:
1820 """
1821 Preload the streamdetails for the next item in the queue/buffer.
1822
1823 This basically ensures the item is playable and fetches the stream details.
1824 If an error occurs, the item will be skipped and the next item will be loaded.
1825 """
1826 queue = self._queues[queue_id]
1827
1828 async def _preload_streamdetails(item_id_in_buffer: str) -> None:
1829 try:
1830 # wait for the item that was loaded in the buffer is the actually playing item
1831 # this prevents a race condition when we preload the next item too soon
1832 # while the player is actually preloading the previously enqueued item.
1833 retries = 120
1834 while retries > 0:
1835 if not queue.current_item:
1836 return # guard
1837 if queue.current_item.queue_item_id == item_id_in_buffer:
1838 break
1839 retries -= 1
1840 await asyncio.sleep(1)
1841 if next_item := await self.load_next_queue_item(queue_id, item_id_in_buffer):
1842 self.logger.debug(
1843 "Preloaded next item %s for queue %s",
1844 next_item.name,
1845 queue.display_name,
1846 )
1847 # enqueue the next item on the player
1848 self._enqueue_next_item(queue_id, next_item)
1849
1850 except QueueEmpty:
1851 return
1852
1853 if not (current_item := self.get_item(queue_id, item_id_in_buffer)):
1854 # this should not happen, but guard anyways
1855 return
1856 if current_item.media_type == MediaType.RADIO or not current_item.duration:
1857 # radio items or no duration, nothing to do
1858 return
1859
1860 task_id = f"preload_next_item_{queue_id}"
1861 self.mass.create_task(
1862 _preload_streamdetails,
1863 item_id_in_buffer,
1864 task_id=task_id,
1865 abort_existing=True,
1866 )
1867
1868 async def _resolve_media_items(
1869 self,
1870 media_item: MediaItemType | ItemMapping | BrowseFolder,
1871 start_item: str | None = None,
1872 userid: str | None = None,
1873 queue_id: str | None = None,
1874 ) -> list[MediaItemType]:
1875 """Resolve/unwrap media items to enqueue."""
1876 # resolve Itemmapping to full media item
1877 if isinstance(media_item, ItemMapping):
1878 if media_item.uri is None:
1879 raise InvalidDataError("ItemMapping has no URI")
1880 media_item = await self.mass.music.get_item_by_uri(media_item.uri)
1881 if media_item.media_type == MediaType.PLAYLIST:
1882 media_item = cast("Playlist", media_item)
1883 self.mass.create_task(
1884 self.mass.music.mark_item_played(
1885 media_item, userid=userid, queue_id=queue_id, user_initiated=True
1886 )
1887 )
1888 return list(await self.get_playlist_tracks(media_item, start_item))
1889 if media_item.media_type == MediaType.ARTIST:
1890 media_item = cast("Artist", media_item)
1891 self.mass.create_task(
1892 self.mass.music.mark_item_played(media_item, queue_id=queue_id, user_initiated=True)
1893 )
1894 return list(await self.get_artist_tracks(media_item))
1895 if media_item.media_type == MediaType.ALBUM:
1896 media_item = cast("Album", media_item)
1897 self.mass.create_task(
1898 self.mass.music.mark_item_played(
1899 media_item, userid=userid, queue_id=queue_id, user_initiated=True
1900 )
1901 )
1902 return list(await self.get_album_tracks(media_item, start_item))
1903 if media_item.media_type == MediaType.AUDIOBOOK:
1904 media_item = cast("Audiobook", media_item)
1905 # ensure we grab the correct/latest resume point info
1906 media_item.resume_position_ms = await self.get_audiobook_resume_point(
1907 media_item, start_item, userid=userid
1908 )
1909 return [media_item]
1910 if media_item.media_type == MediaType.PODCAST:
1911 media_item = cast("Podcast", media_item)
1912 self.mass.create_task(
1913 self.mass.music.mark_item_played(
1914 media_item, userid=userid, queue_id=queue_id, user_initiated=True
1915 )
1916 )
1917 return list(await self.get_next_podcast_episodes(media_item, start_item, userid=userid))
1918 if media_item.media_type == MediaType.PODCAST_EPISODE:
1919 media_item = cast("PodcastEpisode", media_item)
1920 return list(await self.get_next_podcast_episodes(None, media_item, userid=userid))
1921 if media_item.media_type == MediaType.FOLDER:
1922 media_item = cast("BrowseFolder", media_item)
1923 return list(await self._get_folder_tracks(media_item))
1924 # all other: single track or radio item
1925 return [cast("MediaItemType", media_item)]
1926
1927 async def _try_resume_from_playlog(self, queue: PlayerQueue) -> bool:
1928 """Try to resume playback from playlog when queue is empty.
1929
1930 Attempts to find user-initiated recently played items in the following order:
1931 1. By userid AND queue_id
1932 2. By queue_id only
1933 3. By userid only (if available)
1934 4. Any recently played item
1935
1936 :param queue: The queue to resume playback on.
1937 :return: True if playback was started, False otherwise.
1938 """
1939 # Try different filter combinations in order of specificity
1940 filter_attempts: list[tuple[str | None, str | None, str]] = []
1941 if queue.userid:
1942 filter_attempts.append((queue.userid, queue.queue_id, "userid + queue_id match"))
1943 filter_attempts.append((None, queue.queue_id, "queue_id match"))
1944 if queue.userid:
1945 filter_attempts.append((queue.userid, None, "userid match"))
1946 filter_attempts.append((None, None, "any recent item"))
1947
1948 for userid, queue_id, match_type in filter_attempts:
1949 items = await self.mass.music.recently_played(
1950 limit=5,
1951 fully_played_only=False,
1952 user_initiated_only=True,
1953 userid=userid,
1954 queue_id=queue_id,
1955 )
1956 for item in items:
1957 if not item.uri:
1958 continue
1959 try:
1960 await self.play_media(queue.queue_id, item)
1961 self.logger.info(
1962 "Resumed queue %s from playlog (%s)", queue.display_name, match_type
1963 )
1964 return True
1965 except MusicAssistantError as err:
1966 self.logger.debug("Failed to resume with item %s: %s", item.name, err)
1967 continue
1968
1969 return False
1970
1971 async def _get_radio_tracks(
1972 self, queue_id: str, is_initial_radio_mode: bool = False
1973 ) -> list[Track]:
1974 """Call the registered music providers for dynamic tracks."""
1975 queue = self._queues[queue_id]
1976 queue_track_items: list[Track] = [
1977 q.media_item
1978 for q in self._queue_items[queue_id]
1979 if q.media_item and isinstance(q.media_item, Track)
1980 ]
1981 if not queue.radio_source:
1982 # this may happen during race conditions as this method is called delayed
1983 return []
1984 self.logger.info(
1985 "Fetching radio tracks for queue %s based on: %s",
1986 queue.display_name,
1987 ", ".join([x.name for x in queue.radio_source]),
1988 )
1989
1990 # Get user's preferred provider instances for steering provider selection
1991 preferred_provider_instances: list[str] | None = None
1992 if (
1993 queue.userid
1994 and (playback_user := await self.mass.webserver.auth.get_user(queue.userid))
1995 and playback_user.provider_filter
1996 ):
1997 preferred_provider_instances = playback_user.provider_filter
1998
1999 available_base_tracks: list[Track] = []
2000 base_track_sample_size = 5
2001 # Some providers have very deterministic similar track algorithms when providing
2002 # a single track item. When we have a radio mode based on 1 track and we have to
2003 # refill the queue (ie not initial radio mode), we use the play history as base tracks
2004 if (
2005 len(queue.radio_source) == 1
2006 and queue.radio_source[0].media_type == MediaType.TRACK
2007 and not is_initial_radio_mode
2008 ):
2009 available_base_tracks = queue_track_items
2010 else:
2011 # Grab all the available base tracks based on the selected source items.
2012 # shuffle the source items, just in case
2013 for radio_item in random.sample(queue.radio_source, len(queue.radio_source)):
2014 ctrl = self.mass.music.get_controller(radio_item.media_type)
2015 try:
2016 available_base_tracks += [
2017 track
2018 for track in await ctrl.radio_mode_base_tracks(
2019 radio_item, # type: ignore[arg-type]
2020 preferred_provider_instances,
2021 )
2022 # Avoid duplicate base tracks
2023 if track not in available_base_tracks
2024 ]
2025 except UnsupportedFeaturedException as err:
2026 self.logger.debug(
2027 "Skip loading radio items for %s: %s ",
2028 radio_item.uri,
2029 str(err),
2030 )
2031 if not available_base_tracks:
2032 raise UnsupportedFeaturedException("Radio mode not available for source items")
2033
2034 # Sample tracks from the base tracks, which will be used to calculate the dynamic ones
2035 base_tracks = random.sample(
2036 available_base_tracks,
2037 min(base_track_sample_size, len(available_base_tracks)),
2038 )
2039 # Use a set to avoid duplicate dynamic tracks
2040 dynamic_tracks: set[Track] = set()
2041 # Use base tracks + Trackcontroller to obtain similar tracks for every base Track
2042 for allow_lookup in (False, True):
2043 if dynamic_tracks:
2044 break
2045 for base_track in base_tracks:
2046 try:
2047 _similar_tracks = await self.mass.music.tracks.similar_tracks(
2048 base_track.item_id,
2049 base_track.provider,
2050 allow_lookup=allow_lookup,
2051 preferred_provider_instances=preferred_provider_instances,
2052 )
2053 except MediaNotFoundError:
2054 # Some providers don't have similar tracks for all items. For example,
2055 # Tidal can sometimes return a 404 when the 'similar_tracks' endpoint is called.
2056 # in that case, just skip the track.
2057 self.logger.debug("Similar tracks not found for track %s", base_track.name)
2058 continue
2059 for track in _similar_tracks:
2060 if (
2061 track not in base_tracks
2062 # Exclude tracks we have already played / queued
2063 and track not in queue_track_items
2064 # Ignore tracks that are too long for radio mode, e.g. mixes
2065 and track.duration <= RADIO_TRACK_MAX_DURATION_SECS
2066 ):
2067 dynamic_tracks.add(track)
2068 if len(dynamic_tracks) >= 50:
2069 break
2070 queue_tracks: list[Track] = []
2071 dynamic_tracks_list = list(dynamic_tracks)
2072 # Only include the sampled base tracks when the radio mode is first initialized
2073 if is_initial_radio_mode:
2074 queue_tracks += [base_tracks[0]]
2075 # Exhaust base tracks with the pattern of BDDBDDBDD (1 base track + 2 dynamic tracks)
2076 if len(base_tracks) > 1:
2077 for base_track in base_tracks[1:]:
2078 queue_tracks += [base_track]
2079 if len(dynamic_tracks_list) > 2:
2080 queue_tracks += random.sample(dynamic_tracks_list, 2)
2081 else:
2082 queue_tracks += dynamic_tracks_list
2083 # Add dynamic tracks to the queue, make sure to exclude already picked tracks
2084 remaining_dynamic_tracks = [t for t in dynamic_tracks_list if t not in queue_tracks]
2085 if remaining_dynamic_tracks:
2086 queue_tracks += random.sample(
2087 remaining_dynamic_tracks, min(len(remaining_dynamic_tracks), 25)
2088 )
2089 return queue_tracks
2090
2091 async def _get_folder_tracks(self, folder: BrowseFolder) -> list[Track]:
2092 """Fetch (playable) tracks for given browse folder."""
2093 self.logger.info(
2094 "Fetching tracks to play for folder %s",
2095 folder.name,
2096 )
2097 tracks: list[Track] = []
2098 for item in await self.mass.music.browse(folder.path):
2099 if not item.is_playable:
2100 continue
2101 # recursively fetch tracks from all media types
2102 resolved = await self._resolve_media_items(item)
2103 tracks += [x for x in resolved if isinstance(x, Track)]
2104
2105 return tracks
2106
2107 def _update_queue_from_player(
2108 self,
2109 player: Player,
2110 ) -> None:
2111 """Update the Queue when the player state changed."""
2112 queue_id = player.player_id
2113 queue = self._queues[queue_id]
2114
2115 # basic properties
2116 queue.display_name = player.display_name
2117 queue.available = player.available
2118 queue.items = len(self._queue_items[queue_id])
2119
2120 queue.state = (
2121 player.playback_state or PlaybackState.IDLE if queue.active else PlaybackState.IDLE
2122 )
2123 # update current item/index from player report
2124 if queue.active and queue.state in (
2125 PlaybackState.PLAYING,
2126 PlaybackState.PAUSED,
2127 ):
2128 # NOTE: If the queue is not playing (yet) we will not update the current index
2129 # to ensure we keep the previously known current index
2130 if queue.flow_mode:
2131 # flow mode active, the player is playing one long stream
2132 # so we need to calculate the current index and elapsed time
2133 current_index, elapsed_time = self._get_flow_queue_stream_index(queue, player)
2134 elif item_id := self._parse_player_current_item_id(queue_id, player):
2135 # normal mode, the player itself will report the current item
2136 elapsed_time = int(player.corrected_elapsed_time or 0)
2137 current_index = self.index_by_id(queue_id, item_id)
2138 else:
2139 # this may happen if the player is still transitioning between tracks
2140 # we ignore this for now and keep the current index as is
2141 return
2142
2143 # get current/next item based on current index
2144 queue.current_index = current_index
2145 queue.current_item = current_item = self.get_item(queue_id, current_index)
2146 queue.next_item = (
2147 self.get_next_item(queue_id, current_index)
2148 if current_item and current_index is not None
2149 else None
2150 )
2151
2152 # correct elapsed time when seeking
2153 if (
2154 not queue.flow_mode
2155 and current_item
2156 and current_item.streamdetails
2157 and current_item.streamdetails.seek_position
2158 ):
2159 elapsed_time += current_item.streamdetails.seek_position
2160 queue.elapsed_time = elapsed_time
2161 queue.elapsed_time_last_updated = time.time()
2162
2163 elif not queue.current_item and queue.current_index is not None:
2164 current_index = queue.current_index
2165 queue.current_item = current_item = self.get_item(queue_id, current_index)
2166 queue.next_item = (
2167 self.get_next_item(queue_id, current_index)
2168 if current_item and current_index is not None
2169 else None
2170 )
2171
2172 # This is enough to detect any changes in the DSPDetails
2173 # (so child count changed, or any output format changed)
2174 output_formats = []
2175 if output_format := player.extra_data.get("output_format"):
2176 output_formats.append(str(output_format))
2177 for child_id in player.group_members:
2178 if (child := self.mass.players.get(child_id)) and (
2179 output_format := child.extra_data.get("output_format")
2180 ):
2181 output_formats.append(str(output_format))
2182 else:
2183 output_formats.append("unknown")
2184
2185 # basic throttle: do not send state changed events if queue did not actually change
2186 prev_state: CompareState = self._prev_states.get(
2187 queue_id,
2188 CompareState(
2189 queue_id=queue_id,
2190 state=PlaybackState.IDLE,
2191 current_item_id=None,
2192 next_item_id=None,
2193 current_item=None,
2194 elapsed_time=0,
2195 last_playing_elapsed_time=0,
2196 stream_title=None,
2197 codec_type=None,
2198 output_formats=None,
2199 ),
2200 )
2201 # update last_playing_elapsed_time only when the player is actively playing
2202 # use corrected_elapsed_time which accounts for time since last update
2203 # this preserves the last known elapsed time when transitioning to idle/paused
2204 prev_playing_elapsed = prev_state["last_playing_elapsed_time"]
2205 prev_item_id = prev_state["current_item_id"]
2206 current_item_id = queue.current_item.queue_item_id if queue.current_item else None
2207 if queue.state == PlaybackState.PLAYING:
2208 current_elapsed = int(queue.corrected_elapsed_time)
2209 if current_item_id != prev_item_id:
2210 # new track started, reset the elapsed time tracker
2211 last_playing_elapsed_time = current_elapsed
2212 else:
2213 # same track, use the max of current and previous to handle timing issues
2214 last_playing_elapsed_time = max(current_elapsed, prev_playing_elapsed)
2215 else:
2216 last_playing_elapsed_time = prev_playing_elapsed
2217 new_state = CompareState(
2218 queue_id=queue_id,
2219 state=queue.state,
2220 current_item_id=queue.current_item.queue_item_id if queue.current_item else None,
2221 next_item_id=queue.next_item.queue_item_id if queue.next_item else None,
2222 current_item=queue.current_item,
2223 elapsed_time=int(queue.elapsed_time),
2224 last_playing_elapsed_time=last_playing_elapsed_time,
2225 stream_title=(
2226 queue.current_item.streamdetails.stream_title
2227 if queue.current_item and queue.current_item.streamdetails
2228 else None
2229 ),
2230 codec_type=(
2231 queue.current_item.streamdetails.audio_format.codec_type
2232 if queue.current_item and queue.current_item.streamdetails
2233 else None
2234 ),
2235 output_formats=output_formats,
2236 )
2237 changed_keys = get_changed_keys(dict(prev_state), dict(new_state))
2238 with suppress(KeyError):
2239 changed_keys.remove("next_item_id")
2240 with suppress(KeyError):
2241 changed_keys.remove("last_playing_elapsed_time")
2242
2243 # store the new state
2244 if queue.active:
2245 self._prev_states[queue_id] = new_state
2246 else:
2247 self._prev_states.pop(queue_id, None)
2248
2249 # return early if nothing changed
2250 if len(changed_keys) == 0:
2251 return
2252
2253 # signal update and store state
2254 send_update = True
2255 if changed_keys == {"elapsed_time"}:
2256 # only elapsed time changed, do not send full queue update
2257 send_update = False
2258 prev_time = prev_state.get("elapsed_time") or 0
2259 cur_time = new_state.get("elapsed_time") or 0
2260 if abs(cur_time - prev_time) > 2:
2261 # send dedicated event for time updates when seeking
2262 self.mass.signal_event(
2263 EventType.QUEUE_TIME_UPDATED,
2264 object_id=queue_id,
2265 data=queue.elapsed_time,
2266 )
2267 # also signal update to the player itself so it can update its current_media
2268 self.mass.players.trigger_player_update(queue_id)
2269
2270 if send_update:
2271 self.signal_update(queue_id)
2272
2273 if "output_formats" in changed_keys:
2274 # refresh DSP details since they may have changed
2275 dsp = get_stream_dsp_details(self.mass, queue_id)
2276 if queue.current_item and queue.current_item.streamdetails:
2277 queue.current_item.streamdetails.dsp = dsp
2278 if queue.next_item and queue.next_item.streamdetails:
2279 queue.next_item.streamdetails.dsp = dsp
2280
2281 # handle updating stream_metadata if needed
2282 if (
2283 queue.current_item
2284 and (streamdetails := queue.current_item.streamdetails)
2285 and streamdetails.stream_metadata_update_callback
2286 and (
2287 streamdetails.stream_metadata_last_updated is None
2288 or (
2289 time.time() - streamdetails.stream_metadata_last_updated
2290 >= streamdetails.stream_metadata_update_interval
2291 )
2292 )
2293 ):
2294 streamdetails.stream_metadata_last_updated = time.time()
2295 self.mass.create_task(
2296 streamdetails.stream_metadata_update_callback(
2297 streamdetails, int(queue.corrected_elapsed_time)
2298 )
2299 )
2300
2301 # handle sending a playback progress report
2302 # we do this every 30 seconds or when the state changes
2303 if (
2304 changed_keys.intersection({"state", "current_item_id"})
2305 or int(queue.elapsed_time) % 30 == 0
2306 ):
2307 self._handle_playback_progress_report(queue, prev_state, new_state)
2308
2309 # check if we need to clear the queue if we reached the end
2310 if "state" in changed_keys and queue.state == PlaybackState.IDLE:
2311 self._handle_end_of_queue(queue, prev_state, new_state)
2312
2313 # watch dynamic radio items refill if needed
2314 if "current_item_id" in changed_keys:
2315 # auto enable radio mode if dont stop the music is enabled
2316 if (
2317 queue.dont_stop_the_music_enabled
2318 and queue.enqueued_media_items
2319 and queue.current_index is not None
2320 and (queue.items - queue.current_index) <= 1
2321 ):
2322 # We have received the last item in the queue and Don't stop the music is enabled
2323 # set the played media item(s) as radio items (which will refill the queue)
2324 # note that this will fail if there are no media items for which we have
2325 # a dynamic radio source.
2326 self.logger.debug(
2327 "End of queue detected and Don't stop the music is enabled for %s"
2328 " - setting enqueued media items as radio source: %s",
2329 queue.display_name,
2330 ", ".join([x.uri for x in queue.enqueued_media_items]), # type: ignore[misc] # uri set in __post_init__
2331 )
2332 queue.radio_source = queue.enqueued_media_items
2333 # auto fill radio tracks if less than 5 tracks left in the queue
2334 if (
2335 queue.radio_source
2336 and queue.current_index is not None
2337 and (queue.items - queue.current_index) < 5
2338 ):
2339 task_id = f"fill_radio_tracks_{queue_id}"
2340 self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id)
2341
2342 def _get_flow_queue_stream_index(
2343 self, queue: PlayerQueue, player: Player
2344 ) -> tuple[int | None, int]:
2345 """Calculate current queue index and current track elapsed time when flow mode is active."""
2346 elapsed_time_queue_total = player.corrected_elapsed_time or 0
2347 if queue.current_index is None and not queue.flow_mode_stream_log:
2348 return queue.current_index, int(queue.elapsed_time)
2349
2350 # For each track that has been streamed/buffered to the player,
2351 # a playlog entry will be created with the queue item id
2352 # and the amount of seconds streamed. We traverse the playlog to figure
2353 # out where we are in the queue, accounting for actual streamed
2354 # seconds (and not duration) and skipped seconds. If a track has been repeated,
2355 # it will simply be in the playlog multiple times.
2356 played_time = 0.0
2357 queue_index: int | None = queue.current_index or 0
2358 track_time = 0.0
2359 for play_log_entry in queue.flow_mode_stream_log:
2360 queue_item_duration = (
2361 # NOTE: 'seconds_streamed' can actually be 0 if there was a stream error!
2362 play_log_entry.seconds_streamed
2363 if play_log_entry.seconds_streamed is not None
2364 else play_log_entry.duration or 3600 * 24 * 7
2365 )
2366 if elapsed_time_queue_total > (queue_item_duration + played_time):
2367 # total elapsed time is more than (streamed) track duration
2368 # this track has been fully played, move on.
2369 played_time += queue_item_duration
2370 else:
2371 # no more seconds left to divide, this is our track
2372 # account for any seeking by adding the skipped/seeked seconds
2373 queue_index = self.index_by_id(queue.queue_id, play_log_entry.queue_item_id)
2374 queue_item = self.get_item(queue.queue_id, queue_index)
2375 if queue_item and queue_item.streamdetails:
2376 track_sec_skipped = queue_item.streamdetails.seek_position
2377 else:
2378 track_sec_skipped = 0
2379 track_time = elapsed_time_queue_total + track_sec_skipped - played_time
2380 break
2381 if player.playback_state != PlaybackState.PLAYING:
2382 # if the player is not playing, we can't be sure that the elapsed time is correct
2383 # so we just return the queue index and the elapsed time
2384 return queue.current_index, int(queue.elapsed_time)
2385 return queue_index, int(track_time)
2386
2387 def _parse_player_current_item_id(self, queue_id: str, player: Player) -> str | None:
2388 """Parse QueueItem ID from Player's current url."""
2389 if not player._current_media:
2390 # YES, we use player._current_media on purpose here because we need the raw metadata
2391 return None
2392 # prefer queue_id and queue_item_id within the current media
2393 if player._current_media.source_id == queue_id and player._current_media.queue_item_id:
2394 return player._current_media.queue_item_id
2395 # special case for sonos players
2396 if player._current_media.uri and player._current_media.uri.startswith(f"mass:{queue_id}"):
2397 if player._current_media.queue_item_id:
2398 return player._current_media.queue_item_id
2399 return player._current_media.uri.split(":")[-1]
2400 # try to extract the item id from a mass stream url
2401 if (
2402 player._current_media.uri
2403 and queue_id in player._current_media.uri
2404 and self.mass.streams.base_url in player._current_media.uri
2405 ):
2406 current_item_id = player._current_media.uri.rsplit("/")[-1].split(".")[0]
2407 if self.get_item(queue_id, current_item_id):
2408 return current_item_id
2409 # try to extract the item id from a queue_id/item_id combi
2410 if (
2411 player._current_media.uri
2412 and queue_id in player._current_media.uri
2413 and "/" in player._current_media.uri
2414 ):
2415 current_item_id = player._current_media.uri.split("/")[1]
2416 if self.get_item(queue_id, current_item_id):
2417 return current_item_id
2418
2419 return None
2420
2421 def _handle_end_of_queue(
2422 self, queue: PlayerQueue, prev_state: CompareState, new_state: CompareState
2423 ) -> None:
2424 """Check if the queue should be cleared after the current item."""
2425 # check if queue state changed to stopped (from playing/paused to idle)
2426 if not (
2427 prev_state["state"] in (PlaybackState.PLAYING, PlaybackState.PAUSED)
2428 and new_state["state"] == PlaybackState.IDLE
2429 ):
2430 return
2431 # check if no more items in the queue (next_item should be None at end of queue)
2432 if queue.next_item is not None:
2433 return
2434 # check if we had a previous item playing
2435 if prev_state["current_item_id"] is None:
2436 return
2437
2438 async def _clear_queue_delayed() -> None:
2439 for _ in range(5):
2440 await asyncio.sleep(1)
2441 if queue.state != PlaybackState.IDLE:
2442 return
2443 if queue.next_item is not None:
2444 return
2445 self.logger.info("End of queue reached, clearing items")
2446 self.clear(queue.queue_id)
2447
2448 # all checks passed, we stopped playback at the last (or single) track of the queue
2449 # now determine if the item was fully played before clearing
2450
2451 # For flow mode, check if the last track was fully streamed using the stream log
2452 # This is more reliable than elapsed_time which can be reset/incorrect
2453 if queue.flow_mode and queue.flow_mode_stream_log:
2454 last_log_entry = queue.flow_mode_stream_log[-1]
2455 if last_log_entry.seconds_streamed is not None:
2456 # The last track finished streaming, safe to clear queue
2457 self.mass.create_task(_clear_queue_delayed())
2458 return
2459
2460 # For non-flow mode, use prev_state values since queue state may have been updated/reset
2461 prev_item = prev_state["current_item"]
2462 if prev_item and (streamdetails := prev_item.streamdetails):
2463 duration = streamdetails.duration or prev_item.duration or 24 * 3600
2464 elif prev_item:
2465 duration = prev_item.duration or 24 * 3600
2466 else:
2467 # No current item means player has already cleared it, safe to clear queue
2468 self.mass.create_task(_clear_queue_delayed())
2469 return
2470
2471 # use last_playing_elapsed_time which preserves the elapsed time from when the player
2472 # was still playing (before transitioning to idle where elapsed_time may be reset to 0)
2473 seconds_played = int(prev_state["last_playing_elapsed_time"])
2474 # debounce this a bit to make sure we're not clearing the queue by accident
2475 # only clear if the last track was played to near completion (within 5 seconds of end)
2476 if seconds_played >= (duration or 3600) - 5:
2477 self.mass.create_task(_clear_queue_delayed())
2478
2479 def _handle_playback_progress_report(
2480 self, queue: PlayerQueue, prev_state: CompareState, new_state: CompareState
2481 ) -> None:
2482 """Handle playback progress report."""
2483 # detect change in current index to report that a item has been played
2484 prev_item_id = prev_state["current_item_id"]
2485 cur_item_id = new_state["current_item_id"]
2486 if prev_item_id is None and cur_item_id is None:
2487 return
2488
2489 if prev_item_id is not None and prev_item_id != cur_item_id:
2490 # we have a new item, so we need report the previous one
2491 is_current_item = False
2492 item_to_report = prev_state["current_item"]
2493 seconds_played = int(prev_state["elapsed_time"])
2494 else:
2495 # report on current item
2496 is_current_item = True
2497 item_to_report = self.get_item(queue.queue_id, cur_item_id) or new_state["current_item"]
2498 seconds_played = int(new_state["elapsed_time"])
2499
2500 if not item_to_report:
2501 return # guard against invalid items
2502
2503 if not (media_item := item_to_report.media_item):
2504 # only report on media items
2505 return
2506 assert media_item.uri is not None # uri is set in __post_init__
2507
2508 if item_to_report.streamdetails and item_to_report.streamdetails.stream_error:
2509 # Ignore items that had a stream error
2510 return
2511
2512 if item_to_report.streamdetails and item_to_report.streamdetails.duration:
2513 duration = int(item_to_report.streamdetails.duration)
2514 else:
2515 duration = int(item_to_report.duration or 3 * 3600)
2516
2517 if seconds_played < 5:
2518 # ignore items that have been played less than 5 seconds
2519 # this also filters out a bounce effect where the previous item
2520 # gets reported with 0 elapsed seconds after a new item starts playing
2521 return
2522
2523 # determine if item is fully played
2524 # for podcasts and audiobooks we account for the last 60 seconds
2525 percentage_played = percentage(seconds_played, duration)
2526 if not is_current_item and item_to_report.media_type in (
2527 MediaType.AUDIOBOOK,
2528 MediaType.PODCAST_EPISODE,
2529 ):
2530 fully_played = seconds_played >= duration - 60
2531 elif not is_current_item:
2532 # 90% of the track must be played to be considered fully played
2533 fully_played = percentage_played >= 90
2534 else:
2535 fully_played = seconds_played >= duration - 10
2536
2537 is_playing = is_current_item and queue.state == PlaybackState.PLAYING
2538 if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
2539 self.logger.debug(
2540 "%s %s '%s' (%s) - Fully played: %s - Progress: %s (%s/%ss)",
2541 queue.display_name,
2542 "is playing" if is_playing else "played",
2543 item_to_report.name,
2544 item_to_report.uri,
2545 fully_played,
2546 f"{percentage_played}%",
2547 seconds_played,
2548 duration,
2549 )
2550 # add entry to playlog - this also handles resume of podcasts/audiobooks
2551 self.mass.create_task(
2552 self.mass.music.mark_item_played(
2553 media_item,
2554 fully_played=fully_played,
2555 seconds_played=seconds_played,
2556 is_playing=is_playing,
2557 userid=queue.userid,
2558 queue_id=queue.queue_id,
2559 user_initiated=False,
2560 )
2561 )
2562
2563 album: Album | ItemMapping | None = getattr(media_item, "album", None)
2564 # signal 'media item played' event,
2565 # which is useful for plugins that want to do scrobbling
2566 artists: list[Artist | ItemMapping] = getattr(media_item, "artists", [])
2567 artists_names = [a.name for a in artists]
2568 self.mass.signal_event(
2569 EventType.MEDIA_ITEM_PLAYED,
2570 object_id=media_item.uri,
2571 data=MediaItemPlaybackProgressReport(
2572 uri=media_item.uri,
2573 media_type=media_item.media_type,
2574 name=media_item.name,
2575 version=getattr(media_item, "version", None),
2576 artist=(
2577 getattr(media_item, "artist_str", None) or artists_names[0]
2578 if artists_names
2579 else None
2580 ),
2581 artists=artists_names,
2582 artist_mbids=[a.mbid for a in artists if a.mbid] if artists else None,
2583 album=album.name if album else None,
2584 album_mbid=album.mbid if album else None,
2585 album_artist=(album.artist_str if isinstance(album, Album) else None),
2586 album_artist_mbids=(
2587 [a.mbid for a in album.artists if a.mbid] if isinstance(album, Album) else None
2588 ),
2589 image_url=(
2590 self.mass.metadata.get_image_url(
2591 item_to_report.media_item.image, prefer_proxy=False
2592 )
2593 if item_to_report.media_item.image
2594 else None
2595 ),
2596 duration=duration,
2597 mbid=(getattr(media_item, "mbid", None)),
2598 seconds_played=seconds_played,
2599 fully_played=fully_played,
2600 is_playing=is_playing,
2601 userid=queue.userid,
2602 ),
2603 )
2604
2605
2606async def _smart_shuffle(items: list[QueueItem]) -> list[QueueItem]:
2607 """Shuffle queue items, avoiding identical tracks next to each other.
2608
2609 Best-effort approach to prevent the same track from appearing adjacent.
2610 Does a random shuffle first, then makes a limited number of passes to
2611 swap adjacent duplicates with a random item further in the list.
2612
2613 :param items: List of queue items to shuffle.
2614 """
2615 if len(items) <= 2:
2616 return random.sample(items, len(items)) if len(items) == 2 else items
2617
2618 # Start with a random shuffle
2619 shuffled = random.sample(items, len(items))
2620
2621 # Make a few passes to fix adjacent duplicates
2622 max_passes = 3
2623 for _ in range(max_passes):
2624 swapped = False
2625 for i in range(len(shuffled) - 1):
2626 if shuffled[i].name == shuffled[i + 1].name:
2627 # Found adjacent duplicate - swap with random position at least 2 away
2628 swap_candidates = [j for j in range(len(shuffled)) if abs(j - i - 1) >= 2]
2629 if swap_candidates:
2630 swap_pos = random.choice(swap_candidates)
2631 shuffled[i + 1], shuffled[swap_pos] = shuffled[swap_pos], shuffled[i + 1]
2632 swapped = True
2633 if not swapped:
2634 break
2635 # Yield to event loop between passes
2636 await asyncio.sleep(0)
2637
2638 return shuffled
2639