/
/
/
1"""
2MusicAssistant Player Queues Controller.
3
4Handles all logic to PLAY Media Items, provided by Music Providers to supported players.
5
6It is loosely coupled to the MusicAssistant Music Controller and Player Controller.
7A Music Assistant Player always has a PlayerQueue associated with it
8which holds the queue items and state.
9
10The PlayerQueue is in that case the active source of the player,
11but it can also be something else, hence the loose coupling.
12"""
13
14from __future__ import annotations
15
16import asyncio
17import random
18import time
19from contextlib import suppress
20from types import NoneType
21from typing import TYPE_CHECKING, Any, TypedDict, cast
22
23import shortuuid
24from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
25from music_assistant_models.enums import (
26 ConfigEntryType,
27 ContentType,
28 EventType,
29 MediaType,
30 PlaybackState,
31 ProviderFeature,
32 QueueOption,
33 RepeatMode,
34)
35from music_assistant_models.errors import (
36 AudioError,
37 InvalidCommand,
38 InvalidDataError,
39 MediaNotFoundError,
40 MusicAssistantError,
41 PlayerUnavailableError,
42 QueueEmpty,
43 UnsupportedFeaturedException,
44)
45from music_assistant_models.media_items import (
46 Album,
47 Artist,
48 Audiobook,
49 BrowseFolder,
50 ItemMapping,
51 MediaItemType,
52 PlayableMediaItemType,
53 Playlist,
54 Podcast,
55 PodcastEpisode,
56 Track,
57 UniqueList,
58 media_from_dict,
59)
60from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport
61from music_assistant_models.player_queue import PlayerQueue
62from music_assistant_models.queue_item import QueueItem
63
64from music_assistant.constants import (
65 ATTR_ANNOUNCEMENT_IN_PROGRESS,
66 MASS_LOGO_ONLINE,
67 PLAYBACK_REPORT_INTERVAL_SECONDS,
68 PLAYLIST_MEDIA_TYPES,
69 VERBOSE_LOG_LEVEL,
70 PlaylistPlayableItem,
71)
72from music_assistant.controllers.players.controller import IN_QUEUE_COMMAND
73from music_assistant.controllers.webserver.helpers.auth_middleware import get_current_user
74from music_assistant.helpers.api import api_command
75from music_assistant.helpers.audio import get_stream_details, get_stream_dsp_details
76from music_assistant.helpers.throttle_retry import BYPASS_THROTTLER
77from music_assistant.helpers.util import get_changed_keys, percentage
78from music_assistant.models.core_controller import CoreController
79from music_assistant.models.player import Player, PlayerMedia
80
81if TYPE_CHECKING:
82 from collections.abc import Iterator
83
84 from music_assistant_models.auth import User
85 from music_assistant_models.media_items.metadata import MediaItemImage
86
87 from music_assistant import MusicAssistant
88 from music_assistant.models.player import Player
89
# Config keys: which items are selected when an (in-library) artist/album is played.
CONF_DEFAULT_ENQUEUE_SELECT_ARTIST = "default_enqueue_select_artist"
CONF_DEFAULT_ENQUEUE_SELECT_ALBUM = "default_enqueue_select_album"

# Default values for the two selection config entries above.
ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE = "all_tracks"
ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE = "all_tracks"

# Config keys: default enqueue option per media type.
# play_media builds the key to look up as f"default_enqueue_option_{media_type.value}",
# so these strings have to follow that naming pattern.
CONF_DEFAULT_ENQUEUE_OPTION_ARTIST = "default_enqueue_option_artist"
CONF_DEFAULT_ENQUEUE_OPTION_ALBUM = "default_enqueue_option_album"
CONF_DEFAULT_ENQUEUE_OPTION_TRACK = "default_enqueue_option_track"
CONF_DEFAULT_ENQUEUE_OPTION_RADIO = "default_enqueue_option_radio"
CONF_DEFAULT_ENQUEUE_OPTION_PLAYLIST = "default_enqueue_option_playlist"
CONF_DEFAULT_ENQUEUE_OPTION_AUDIOBOOK = "default_enqueue_option_audiobook"
CONF_DEFAULT_ENQUEUE_OPTION_PODCAST = "default_enqueue_option_podcast"
CONF_DEFAULT_ENQUEUE_OPTION_PODCAST_EPISODE = "default_enqueue_option_podcast_episode"
CONF_DEFAULT_ENQUEUE_OPTION_FOLDER = "default_enqueue_option_folder"
CONF_DEFAULT_ENQUEUE_OPTION_UNKNOWN = "default_enqueue_option_unknown"
# Upper bound (in seconds) for a radio track's duration.
# NOTE(review): where this limit is applied is outside the visible part of the file.
RADIO_TRACK_MAX_DURATION_SECS = 20 * 60  # 20 minutes
# NOTE(review): presumably cache category ids for persisted queue state and queue items;
# their usage is outside the visible part of the file - confirm.
CACHE_CATEGORY_PLAYER_QUEUE_STATE = 0
CACHE_CATEGORY_PLAYER_QUEUE_ITEMS = 1
109
110
class CompareState(TypedDict):
    """Simple object where we store the (previous) state of a queue.

    Used for compare actions: a snapshot like this is kept per queue so a newer
    snapshot can be compared against it to see which values changed.
    """

    # id of the queue this snapshot belongs to
    queue_id: str
    # playback state of the queue at snapshot time
    state: PlaybackState
    # item ids of the current and upcoming queue item (None when there is none)
    current_item_id: str | None
    next_item_id: str | None
    # the current queue item itself (None when nothing is loaded)
    current_item: QueueItem | None
    # elapsed time of the current item (presumably whole seconds - confirm)
    elapsed_time: int
    # last_playing_elapsed_time: elapsed time from the last PLAYING state update
    # used to determine if a track was fully played when transitioning to idle
    last_playing_elapsed_time: int
    # NOTE(review): the three fields below are not populated in the visible part of
    # the file; their names suggest stream metadata of the current item - confirm.
    stream_title: str | None
    codec_type: ContentType | None
    output_formats: list[str] | None
129
130
131class PlayerQueuesController(CoreController):
132 """Controller holding all logic to enqueue music for players."""
133
134 domain: str = "player_queues"
135
136 def __init__(self, mass: MusicAssistant) -> None:
137 """Initialize core controller."""
138 super().__init__(mass)
139 self._queues: dict[str, PlayerQueue] = {}
140 self._queue_items: dict[str, list[QueueItem]] = {}
141 self._prev_states: dict[str, CompareState] = {}
142 self._transitioning_players: set[str] = set()
143 self.manifest.name = "Player Queues controller"
144 self.manifest.description = (
145 "Music Assistant's core controller which manages the queues for all players."
146 )
147 self.manifest.icon = "playlist-music"
148
149 async def close(self) -> None:
150 """Cleanup on exit."""
151 # stop all playback
152 for queue in self.all():
153 if queue.state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
154 await self.stop(queue.queue_id)
155
156 async def get_config_entries(
157 self,
158 action: str | None = None,
159 values: dict[str, ConfigValueType] | None = None,
160 ) -> tuple[ConfigEntry, ...]:
161 """Return all Config Entries for this core module (if any)."""
162 enqueue_options = [ConfigValueOption(x.name, x.value) for x in QueueOption]
163 return (
164 ConfigEntry(
165 key=CONF_DEFAULT_ENQUEUE_SELECT_ARTIST,
166 type=ConfigEntryType.STRING,
167 default_value=ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE,
168 label="Items to select when you play a (in-library) artist.",
169 options=[
170 ConfigValueOption(
171 title="Only in-library tracks",
172 value="library_tracks",
173 ),
174 ConfigValueOption(
175 title="All tracks from all albums in the library",
176 value="library_album_tracks",
177 ),
178 ConfigValueOption(
179 title="All (top) tracks from (all) streaming provider(s)",
180 value="all_tracks",
181 ),
182 ConfigValueOption(
183 title="All tracks from all albums from (all) streaming provider(s)",
184 value="all_album_tracks",
185 ),
186 ],
187 ),
188 ConfigEntry(
189 key=CONF_DEFAULT_ENQUEUE_SELECT_ALBUM,
190 type=ConfigEntryType.STRING,
191 default_value=ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE,
192 label="Items to select when you play a (in-library) album.",
193 options=[
194 ConfigValueOption(
195 title="Only in-library tracks",
196 value="library_tracks",
197 ),
198 ConfigValueOption(
199 title="All tracks for album on (streaming) provider",
200 value="all_tracks",
201 ),
202 ],
203 ),
204 ConfigEntry(
205 key=CONF_DEFAULT_ENQUEUE_OPTION_ARTIST,
206 type=ConfigEntryType.STRING,
207 default_value=QueueOption.REPLACE.value,
208 label="Default enqueue option for Artist item(s).",
209 options=enqueue_options,
210 description="Define the default enqueue action for this mediatype.",
211 ),
212 ConfigEntry(
213 key=CONF_DEFAULT_ENQUEUE_OPTION_ALBUM,
214 type=ConfigEntryType.STRING,
215 default_value=QueueOption.REPLACE.value,
216 label="Default enqueue option for Album item(s).",
217 options=enqueue_options,
218 description="Define the default enqueue action for this mediatype.",
219 ),
220 ConfigEntry(
221 key=CONF_DEFAULT_ENQUEUE_OPTION_TRACK,
222 type=ConfigEntryType.STRING,
223 default_value=QueueOption.PLAY.value,
224 label="Default enqueue option for Track item(s).",
225 options=enqueue_options,
226 description="Define the default enqueue action for this mediatype.",
227 ),
228 ConfigEntry(
229 key=CONF_DEFAULT_ENQUEUE_OPTION_RADIO,
230 type=ConfigEntryType.STRING,
231 default_value=QueueOption.REPLACE.value,
232 label="Default enqueue option for Radio item(s).",
233 options=enqueue_options,
234 description="Define the default enqueue action for this mediatype.",
235 ),
236 ConfigEntry(
237 key=CONF_DEFAULT_ENQUEUE_OPTION_PLAYLIST,
238 type=ConfigEntryType.STRING,
239 default_value=QueueOption.REPLACE.value,
240 label="Default enqueue option for Playlist item(s).",
241 options=enqueue_options,
242 description="Define the default enqueue action for this mediatype.",
243 ),
244 ConfigEntry(
245 key=CONF_DEFAULT_ENQUEUE_OPTION_AUDIOBOOK,
246 type=ConfigEntryType.STRING,
247 default_value=QueueOption.REPLACE.value,
248 label="Default enqueue option for Audiobook item(s).",
249 options=enqueue_options,
250 hidden=True,
251 ),
252 ConfigEntry(
253 key=CONF_DEFAULT_ENQUEUE_OPTION_PODCAST,
254 type=ConfigEntryType.STRING,
255 default_value=QueueOption.REPLACE.value,
256 label="Default enqueue option for Podcast item(s).",
257 options=enqueue_options,
258 hidden=True,
259 ),
260 ConfigEntry(
261 key=CONF_DEFAULT_ENQUEUE_OPTION_PODCAST_EPISODE,
262 type=ConfigEntryType.STRING,
263 default_value=QueueOption.REPLACE.value,
264 label="Default enqueue option for Podcast-episode item(s).",
265 options=enqueue_options,
266 hidden=True,
267 ),
268 ConfigEntry(
269 key=CONF_DEFAULT_ENQUEUE_OPTION_FOLDER,
270 type=ConfigEntryType.STRING,
271 default_value=QueueOption.REPLACE.value,
272 label="Default enqueue option for Folder item(s).",
273 options=enqueue_options,
274 hidden=True,
275 ),
276 )
277
278 def __iter__(self) -> Iterator[PlayerQueue]:
279 """Iterate over (available) players."""
280 return iter(self._queues.values())
281
282 @api_command("player_queues/all")
283 def all(self) -> tuple[PlayerQueue, ...]:
284 """Return all registered PlayerQueues."""
285 return tuple(self._queues.values())
286
287 @api_command("player_queues/get")
288 def get(self, queue_id: str) -> PlayerQueue | None:
289 """Return PlayerQueue by queue_id or None if not found."""
290 return self._queues.get(queue_id)
291
292 @api_command("player_queues/items")
293 def items(self, queue_id: str, limit: int = 500, offset: int = 0) -> list[QueueItem]:
294 """Return all QueueItems for given PlayerQueue."""
295 if queue_id not in self._queue_items:
296 return []
297
298 return self._queue_items[queue_id][offset : offset + limit]
299
300 @api_command("player_queues/get_active_queue")
301 def get_active_queue(self, player_id: str) -> PlayerQueue | None:
302 """Return the current active/synced queue for a player."""
303 if player := self.mass.players.get_player(player_id):
304 return self.mass.players.get_active_queue(player)
305 return None
306
307 # Queue commands
308
309 @api_command("player_queues/shuffle")
310 async def set_shuffle(self, queue_id: str, shuffle_enabled: bool) -> None:
311 """Configure shuffle setting on the the queue."""
312 queue = self._queues[queue_id]
313 if queue.shuffle_enabled == shuffle_enabled:
314 return # no change
315 queue.shuffle_enabled = shuffle_enabled
316 queue_items = self._queue_items[queue_id]
317 cur_index = (
318 queue.index_in_buffer if queue.index_in_buffer is not None else queue.current_index
319 )
320 if cur_index is not None:
321 next_index = cur_index + 1
322 next_items = queue_items[next_index:]
323 else:
324 next_items = []
325 next_index = 0
326 if not shuffle_enabled:
327 # shuffle disabled, try to restore original sort order of the remaining items
328 next_items.sort(key=lambda x: x.sort_index, reverse=False)
329 await self.load(
330 queue_id=queue_id,
331 queue_items=next_items,
332 insert_at_index=next_index,
333 keep_remaining=False,
334 shuffle=shuffle_enabled,
335 )
336
337 @api_command("player_queues/dont_stop_the_music")
338 def set_dont_stop_the_music(self, queue_id: str, dont_stop_the_music_enabled: bool) -> None:
339 """Configure Don't stop the music setting on the queue."""
340 providers_available_with_similar_tracks = any(
341 ProviderFeature.SIMILAR_TRACKS in provider.supported_features
342 for provider in self.mass.music.providers
343 )
344 if dont_stop_the_music_enabled and not providers_available_with_similar_tracks:
345 raise UnsupportedFeaturedException(
346 "Don't stop the music is not supported by any of the available music providers"
347 )
348 queue = self._queues[queue_id]
349 queue.dont_stop_the_music_enabled = dont_stop_the_music_enabled
350 self.signal_update(queue_id=queue_id)
351 # if this happens to be the last track in the queue, fill the radio source
352 if (
353 queue.dont_stop_the_music_enabled
354 and queue.enqueued_media_items
355 and queue.current_index is not None
356 and (queue.items - queue.current_index) <= 1
357 ):
358 queue.radio_source = queue.enqueued_media_items
359 task_id = f"fill_radio_tracks_{queue_id}"
360 self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id)
361
362 @api_command("player_queues/repeat")
363 def set_repeat(self, queue_id: str, repeat_mode: RepeatMode) -> None:
364 """Configure repeat setting on the the queue."""
365 queue = self._queues[queue_id]
366 if queue.repeat_mode == repeat_mode:
367 return # no change
368 queue.repeat_mode = repeat_mode
369 self.signal_update(queue_id)
370 if (
371 queue.state == PlaybackState.PLAYING
372 and queue.index_in_buffer is not None
373 and queue.index_in_buffer == queue.current_index
374 ):
375 # if the queue is playing,
376 # ensure to (re)queue the next track because it might have changed
377 # note that we only do this if the player has loaded the current track
378 # if not, we wait until it has loaded to prevent conflicts
379 if next_item := self.get_next_item(queue_id, queue.index_in_buffer):
380 self._enqueue_next_item(queue_id, next_item)
381
    @api_command("player_queues/play_media")
    async def play_media(
        self,
        queue_id: str,
        media: MediaItemType | ItemMapping | str | list[MediaItemType | ItemMapping | str],
        option: QueueOption | None = None,
        radio_mode: bool = False,
        start_item: PlayableMediaItemType | str | None = None,
        username: str | None = None,
    ) -> None:
        """Play media item(s) on the given queue.

        :param queue_id: The queue_id of the queue to play media on.
        :param media: Media that should be played (MediaItem(s) and/or uri's).
        :param option: Which enqueue mode to use. When omitted, the configured default
            for the media type of the first resolvable item is used.
        :param radio_mode: Enable radio mode for the given item(s).
        :param start_item: Optional item to start the playlist or album from.
        :param username: The username of the user requesting the playback.
            Setting the username allows for overriding the logged-in user
            to account for playback history per user when the play_media is
            called from a shared context (like a web hook or automation).
        :raises PlayerUnavailableError: When no queue exists for queue_id.
        :raises MediaNotFoundError: When none of the given media resolves to an
            available, playable item.
        """
        # ruff: noqa: PLR0915
        # we use a contextvar to bypass the throttler for this asyncio task/context
        # this makes sure that playback has priority over other requests that may be
        # happening in the background
        BYPASS_THROTTLER.set(True)
        if not (queue := self.get(queue_id)):
            raise PlayerUnavailableError(f"Queue {queue_id} is not available")
        # always fetch the underlying player so we can raise early if its not available
        queue_player = self.mass.players.get_player(queue_id, True)
        assert queue_player is not None  # for type checking
        if queue_player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
            self.logger.warning("Ignore queue command: An announcement is in progress")
            return

        # save the user requesting the playback
        # (an explicitly given, known username wins over the current user)
        playback_user: User | None
        if username and (user := await self.mass.webserver.auth.get_user_by_username(username)):
            playback_user = user
        else:
            playback_user = get_current_user()
        queue.userid = playback_user.user_id if playback_user else None

        # a single item or list of items may be provided
        media_list = media if isinstance(media, list) else [media]

        # clear queue if needed
        if option == QueueOption.REPLACE:
            self.clear(queue_id)
        # Clear the 'enqueued media item' list when a new queue is requested
        # (note: this also applies when option is still None at this point)
        if option not in (QueueOption.ADD, QueueOption.NEXT):
            queue.enqueued_media_items.clear()

        media_items: list[MediaItemType] = []
        radio_source: list[MediaItemType] = []
        # resolve all media items
        for item in media_list:
            try:
                # parse provided uri into a MA MediaItem or Basic QueueItem from URL
                media_item: MediaItemType | ItemMapping | BrowseFolder
                if isinstance(item, str):
                    media_item = await self.mass.music.get_item_by_uri(item)
                elif isinstance(item, dict):  # type: ignore[unreachable]
                    # TODO: Investigate why the API parser sometimes passes raw dicts instead of
                    # converting them to MediaItem objects. The parse_value function in api.py
                    # should handle dict-to-object conversion, but dicts are slipping through
                    # in some cases. This is defensive handling for that parser bug.
                    media_item = media_from_dict(item)  # type: ignore[unreachable]
                    self.logger.debug("Converted to: %s", type(media_item))
                else:
                    # item is MediaItemType | ItemMapping at this point
                    media_item = item

                # Save requested media item to play on the queue so we can use it as a source
                # for Don't stop the music. Use FIFO list to keep track of the last 10 played items
                # Skip ItemMapping and BrowseFolder - only queue full MediaItemType objects
                if not isinstance(
                    media_item, (ItemMapping, BrowseFolder)
                ) and media_item.media_type in (
                    MediaType.TRACK,
                    MediaType.ALBUM,
                    MediaType.PLAYLIST,
                    MediaType.ARTIST,
                ):
                    queue.enqueued_media_items.append(media_item)
                    if len(queue.enqueued_media_items) > 10:
                        queue.enqueued_media_items.pop(0)

                # handle default enqueue option if needed
                # (resolved once, from the first item that gets this far; later items
                # reuse the option because it is no longer None)
                if option is None:
                    config_value = await self.mass.config.get_core_config_value(
                        self.domain,
                        f"default_enqueue_option_{media_item.media_type.value}",
                        return_type=str,
                    )
                    option = QueueOption(config_value)
                    if option == QueueOption.REPLACE:
                        self.clear(queue_id, skip_stop=True)

                # collect media_items to play
                if radio_mode:
                    # Type guard for mypy - only add full MediaItemType to radio_source
                    if not isinstance(media_item, (ItemMapping, BrowseFolder)):
                        radio_source.append(media_item)
                else:
                    # Convert start_item to string URI if needed
                    start_item_uri: str | None = None
                    if isinstance(start_item, str):
                        start_item_uri = start_item
                    elif start_item is not None:
                        start_item_uri = start_item.uri
                    media_items += await self._resolve_media_items(
                        media_item, start_item_uri, queue_id=queue_id
                    )

            except MusicAssistantError as err:
                # invalid MA uri or item not found error
                # (the item is skipped; the remaining items are still processed)
                self.logger.warning("Skipping %s: %s", item, str(err))

        # overwrite or append radio source items
        if option not in (QueueOption.ADD, QueueOption.NEXT):
            queue.radio_source = radio_source
        else:
            queue.radio_source += radio_source
        # Use collected media items to calculate the radio if radio mode is on
        if radio_mode:
            radio_tracks = await self._get_radio_tracks(
                queue_id=queue_id, is_initial_radio_mode=True
            )
            media_items = list(radio_tracks)

        # only add valid/available items
        queue_items: list[QueueItem] = []
        for x in media_items:
            if not x or not x.available:
                continue
            queue_items.append(
                QueueItem.from_media_item(queue_id, cast("PlayableMediaItemType", x))
            )

        if not queue_items:
            raise MediaNotFoundError("No playable items found")

        # load the items into the queue
        # while playing/paused, insert relative to the buffered index (when known),
        # otherwise relative to the current index (0 when the queue has none)
        if queue.state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
            cur_index = (
                queue.index_in_buffer
                if queue.index_in_buffer is not None
                else (queue.current_index if queue.current_index is not None else 0)
            )
        else:
            cur_index = queue.current_index or 0
        insert_at_index = cur_index + 1
        # Radio modes are already shuffled in a pattern we would like to keep.
        shuffle = queue.shuffle_enabled and len(queue_items) > 1 and not radio_mode

        # handle replace: clear all items and replace with the new items
        if option == QueueOption.REPLACE:
            await self.load(
                queue_id,
                queue_items=queue_items,
                keep_remaining=False,
                keep_played=False,
                shuffle=shuffle,
            )
            await self.play_index(queue_id, 0)
            return
        # handle next: add item(s) in the index next to the playing/loaded/buffered index
        if option == QueueOption.NEXT:
            await self.load(
                queue_id,
                queue_items=queue_items,
                insert_at_index=insert_at_index,
                shuffle=shuffle,
            )
            return
        # handle replace_next: like next, but the remaining items are not kept
        if option == QueueOption.REPLACE_NEXT:
            await self.load(
                queue_id,
                queue_items=queue_items,
                insert_at_index=insert_at_index,
                keep_remaining=False,
                shuffle=shuffle,
            )
            return
        # handle play: replace current loaded/playing index with new item(s)
        if option == QueueOption.PLAY:
            await self.load(
                queue_id,
                queue_items=queue_items,
                insert_at_index=insert_at_index,
                shuffle=shuffle,
            )
            # clamp to the last item in case the insert index is past the end
            next_index = min(insert_at_index, len(self._queue_items[queue_id]) - 1)
            await self.play_index(queue_id, next_index)
            return
        # handle add: add/append item(s) to the remaining queue items
        if option == QueueOption.ADD:
            await self.load(
                queue_id=queue_id,
                queue_items=queue_items,
                insert_at_index=insert_at_index
                if queue.shuffle_enabled
                else len(self._queue_items[queue_id]) + 1,
                shuffle=queue.shuffle_enabled,
            )
            # handle edgecase, queue is empty and items are only added (not played)
            # mark first item as new index
            if queue.current_index is None:
                queue.current_index = 0
                queue.current_item = self.get_item(queue_id, 0)
                queue.items = len(queue_items)
                self.signal_update(queue_id)
596
597 @api_command("player_queues/move_item")
598 def move_item(self, queue_id: str, queue_item_id: str, pos_shift: int = 1) -> None:
599 """
600 Move queue item x up/down the queue.
601
602 - queue_id: id of the queue to process this request.
603 - queue_item_id: the item_id of the queueitem that needs to be moved.
604 - pos_shift: move item x positions down if positive value
605 - pos_shift: move item x positions up if negative value
606 - pos_shift: move item to top of queue as next item if 0.
607 """
608 queue = self._queues[queue_id]
609 item_index = self.index_by_id(queue_id, queue_item_id)
610 if item_index is None:
611 raise InvalidDataError(f"Item {queue_item_id} not found in queue")
612 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
613 msg = f"{item_index} is already played/buffered"
614 raise IndexError(msg)
615
616 queue_items = self._queue_items[queue_id]
617 queue_items = queue_items.copy()
618
619 if pos_shift == 0 and queue.state == PlaybackState.PLAYING:
620 new_index = (queue.current_index or 0) + 1
621 elif pos_shift == 0:
622 new_index = queue.current_index or 0
623 else:
624 new_index = item_index + pos_shift
625 if (new_index < (queue.current_index or 0)) or (new_index > len(queue_items)):
626 return
627 # move the item in the list
628 queue_items.insert(new_index, queue_items.pop(item_index))
629 self.update_items(queue_id, queue_items)
630
631 @api_command("player_queues/move_item_end")
632 def move_item_end(self, queue_id: str, queue_item_id: str) -> None:
633 """
634 Move queue item to the end the queue.
635
636 - queue_id: id of the queue to process this request.
637 - queue_item_id: the item_id of the queueitem that needs to be moved.
638 """
639 queue = self._queues[queue_id]
640 item_index = self.index_by_id(queue_id, queue_item_id)
641 if item_index is None:
642 raise InvalidDataError(f"Item {queue_item_id} not found in queue")
643 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
644 msg = f"{item_index} is already played/buffered"
645 raise IndexError(msg)
646
647 queue_items = self._queue_items[queue_id]
648 if item_index == (len(queue_items) - 1):
649 return
650 queue_items = queue_items.copy()
651
652 new_index = len(self._queue_items[queue_id]) - 1
653
654 # move the item in the list
655 queue_items.insert(new_index, queue_items.pop(item_index))
656 self.update_items(queue_id, queue_items)
657
658 @api_command("player_queues/delete_item")
659 def delete_item(self, queue_id: str, item_id_or_index: int | str) -> None:
660 """Delete item (by id or index) from the queue."""
661 if isinstance(item_id_or_index, str):
662 item_index = self.index_by_id(queue_id, item_id_or_index)
663 if item_index is None:
664 raise InvalidDataError(f"Item {item_id_or_index} not found in queue")
665 else:
666 item_index = item_id_or_index
667 queue = self._queues[queue_id]
668 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
669 # ignore request if track already loaded in the buffer
670 # the frontend should guard so this is just in case
671 self.logger.warning("delete requested for item already loaded in buffer")
672 return
673 queue_items = self._queue_items[queue_id]
674 queue_items.pop(item_index)
675 self.update_items(queue_id, queue_items)
676
677 @api_command("player_queues/clear")
678 def clear(self, queue_id: str, skip_stop: bool = False) -> None:
679 """Clear all items in the queue."""
680 queue = self._queues[queue_id]
681 queue.radio_source = []
682 if queue.state != PlaybackState.IDLE and not skip_stop:
683 self.mass.create_task(self.stop(queue_id))
684 queue.current_index = None
685 queue.current_item = None
686 queue.elapsed_time = 0
687 queue.elapsed_time_last_updated = time.time()
688 queue.index_in_buffer = None
689 self.update_items(queue_id, [])
690
691 @api_command("player_queues/save_as_playlist")
692 async def save_as_playlist(self, queue_id: str, name: str) -> Playlist:
693 """Save the current queue items as a new playlist.
694
695 :param queue_id: The queue_id of the queue to save.
696 :param name: The name for the new playlist.
697 """
698 if not self.get(queue_id):
699 raise PlayerUnavailableError(f"Queue {queue_id} is not available")
700 queue_items = self._queue_items.get(queue_id, [])
701 if not queue_items:
702 raise QueueEmpty("Cannot save an empty queue as a playlist.")
703 # collect URIs from queue items that are playlist-compatible
704 uris: list[str] = []
705 for item in queue_items:
706 if item.uri and item.media_type in PLAYLIST_MEDIA_TYPES:
707 uris.append(item.uri)
708 if not uris:
709 raise InvalidDataError("No valid items in queue to save as playlist.")
710 playlist = await self.mass.music.playlists.create_playlist(name)
711 await self.mass.music.playlists.add_playlist_tracks(playlist.item_id, uris)
712 return playlist
713
714 @api_command("player_queues/stop")
715 async def stop(self, queue_id: str) -> None:
716 """
717 Handle STOP command for given queue.
718
719 - queue_id: queue_id of the playerqueue to handle the command.
720 """
721 queue_player = self.mass.players.get_player(queue_id, True)
722 if queue_player is None:
723 raise PlayerUnavailableError(f"Player {queue_id} is not available")
724 if (queue := self.get(queue_id)) and queue.active:
725 if queue.state == PlaybackState.PLAYING:
726 queue.resume_pos = int(queue.corrected_elapsed_time)
727 # Set context to prevent circular call, then forward the actual command to the player
728 token = IN_QUEUE_COMMAND.set(True)
729 try:
730 await self.mass.players.cmd_stop(queue_id)
731 finally:
732 IN_QUEUE_COMMAND.reset(token)
733
734 @api_command("player_queues/play")
735 async def play(self, queue_id: str) -> None:
736 """
737 Handle PLAY command for given queue.
738
739 - queue_id: queue_id of the playerqueue to handle the command.
740 """
741 queue_player = self.mass.players.get_player(queue_id, True)
742 if queue_player is None:
743 raise PlayerUnavailableError(f"Player {queue_id} is not available")
744 if (
745 (queue := self._queues.get(queue_id))
746 and queue.active
747 and queue.state == PlaybackState.PAUSED
748 ):
749 # forward the actual play/unpause command to the player
750 await queue_player.play()
751 return
752 # player is not paused, perform resume instead
753 await self.resume(queue_id)
754
755 @api_command("player_queues/pause")
756 async def pause(self, queue_id: str) -> None:
757 """Handle PAUSE command for given queue.
758
759 - queue_id: queue_id of the playerqueue to handle the command.
760 """
761 if not (queue := self._queues.get(queue_id)):
762 return
763 queue_active = queue.active
764 if queue.active and queue.state == PlaybackState.PLAYING:
765 queue.resume_pos = int(queue.corrected_elapsed_time)
766 # forward the actual command to the player controller
767 # Set context to prevent circular call, then forward the actual command to the player
768 token = IN_QUEUE_COMMAND.set(True)
769 try:
770 await self.mass.players.cmd_pause(queue_id)
771 finally:
772 IN_QUEUE_COMMAND.reset(token)
773
774 async def _watch_pause(player: Player) -> None:
775 count = 0
776 # wait for pause
777 while count < 5 and player.state.playback_state == PlaybackState.PLAYING:
778 count += 1
779 await asyncio.sleep(1)
780 # wait for unpause
781 if player.state.playback_state != PlaybackState.PAUSED:
782 return
783 count = 0
784 while count < 30 and player.state.playback_state == PlaybackState.PAUSED:
785 count += 1
786 await asyncio.sleep(1)
787 # if player is still paused when the limit is reached, send stop
788 if player.state.playback_state == PlaybackState.PAUSED:
789 await self.stop(queue_id)
790
791 # we auto stop a player from paused when its paused for 30 seconds
792 if (
793 queue_active
794 and (queue_player := self.mass.players.get_player(queue_id))
795 and not queue_player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS)
796 ):
797 self.mass.create_task(_watch_pause(queue_player))
798
799 @api_command("player_queues/play_pause")
800 async def play_pause(self, queue_id: str) -> None:
801 """Toggle play/pause on given playerqueue.
802
803 - queue_id: queue_id of the queue to handle the command.
804 """
805 if (queue := self._queues.get(queue_id)) and queue.state == PlaybackState.PLAYING:
806 await self.pause(queue_id)
807 return
808 await self.play(queue_id)
809
    @api_command("player_queues/next")
    async def next(self, queue_id: str) -> None:
        """Handle NEXT TRACK command for given queue.

        - queue_id: queue_id of the queue to handle the command.

        Raises InvalidCommand when the queue does not exist or is not active.
        A queue without a current index only logs a warning.
        """
        if (queue := self.get(queue_id)) is None or not queue.active:
            raise InvalidCommand(f"Queue {queue_id} is not active")
        idx = self._queues[queue_id].current_index
        if idx is None:
            self.logger.warning("Queue %s has no current index", queue.display_name)
            return
        # retry budget: every MediaNotFoundError uses one attempt and moves the
        # starting point one position further down the queue
        attempts = 5
        while attempts:
            try:
                if (next_index := self._get_next_index(queue_id, idx, True)) is not None:
                    await self.play_index(queue_id, next_index, debounce=True)
                # done: either playback of the next index was requested or there is
                # no next index
                break
            except MediaNotFoundError:
                self.logger.warning(
                    "Failed to fetch next track for queue %s - trying next item",
                    queue.display_name,
                )
                idx += 1
                attempts -= 1
835
836 @api_command("player_queues/previous")
837 async def previous(self, queue_id: str) -> None:
838 """Handle PREVIOUS TRACK command for given queue.
839
840 - queue_id: queue_id of the queue to handle the command.
841 """
842 if (queue := self.get(queue_id)) is None or not queue.active:
843 raise InvalidCommand(f"Queue {queue_id} is not active")
844 current_index = self._queues[queue_id].current_index
845 if current_index is None:
846 return
847 next_index = int(current_index)
848 # restart current track if current track has played longer than 4
849 # otherwise skip to previous track
850 if self._queues[queue_id].elapsed_time < 5:
851 next_index = max(current_index - 1, 0)
852 await self.play_index(queue_id, next_index, debounce=True)
853
@api_command("player_queues/skip")
async def skip(self, queue_id: str, seconds: int = 10) -> None:
    """Jump forward or backward within the current item of the given queue.

    - queue_id: queue_id of the queue to handle the command.
    - seconds: number of seconds to skip in track. Use negative value to skip back.

    Raises InvalidCommand when the queue is unknown or not active.
    """
    queue = self.get(queue_id)
    if queue is None or not queue.active:
        raise InvalidCommand(f"Queue {queue_id} is not active")
    # the target is relative to the elapsed time of the queue; seek() validates it
    target_position = int(self._queues[queue_id].elapsed_time + seconds)
    await self.seek(queue_id, target_position)
864
@api_command("player_queues/seek")
async def seek(self, queue_id: str, position: int = 10) -> None:
    """Seek to an absolute position in the current item of the given queue.

    Seeking is implemented by (re)starting the current index with a seek position.

    - queue_id: queue_id of the queue to handle the command.
    - position: position in seconds to seek to in the current playing item.
      Negative values are clamped to 0.

    Raises InvalidCommand when the queue is not active, has no current
    item/index, the item has no duration or the position exceeds the duration.
    Raises PlayerUnavailableError when the player can not be found.
    """
    queue = self.get(queue_id)
    if queue is None or not queue.active:
        raise InvalidCommand(f"Queue {queue_id} is not active")
    queue_player = self.mass.players.get_player(queue_id, True)
    if queue_player is None:
        raise PlayerUnavailableError(f"Player {queue_id} is not available")
    current_item = queue.current_item
    if not current_item:
        raise InvalidCommand(f"Queue {queue_player.state.name} has no item(s) loaded.")
    if not current_item.duration:
        raise InvalidCommand("Can not seek items without duration.")
    position = max(0, int(position))
    if position > current_item.duration:
        raise InvalidCommand("Can not seek outside of duration range.")
    current_index = queue.current_index
    if current_index is None:
        raise InvalidCommand(f"Queue {queue_player.state.name} has no current index.")
    await self.play_index(queue_id, current_index, seek_position=position)
887
@api_command("player_queues/resume")
async def resume(self, queue_id: str, fade_in: bool | None = None) -> None:
    """Resume playback of the given queue.

    - queue_id: queue_id of the queue to handle the command.
    - fade_in: explicitly enable/disable the fade-in effect. When left at None
      it is enabled only for a non-zero resume position on a player that has
      been idle for more than 60 seconds. It is always disabled when the queue
      is already playing.

    Raises PlayerUnavailableError when the player can not be found and
    QueueEmpty when there is no item to resume (and resuming from the playlog
    did not succeed either).
    """
    queue = self._queues[queue_id]
    queue_items = self._queue_items[queue_id]
    resume_item = queue.current_item
    if queue.state == PlaybackState.PLAYING:
        # resume requested while already playing,
        # use current position as resume position
        resume_pos = queue.corrected_elapsed_time
        fade_in = False
    else:
        resume_pos = queue.resume_pos or queue.elapsed_time

    if not resume_item and queue_items:
        # no current item known: fall back to the item at the current index,
        # or to the very first item when there is no current index either
        fallback_index = 0 if queue.current_index is None else queue.current_index
        resume_item = self.get_item(queue_id, fallback_index)
        resume_pos = 0

    if resume_item is None:
        # Queue is empty, try to resume from playlog
        if await self._try_resume_from_playlog(queue):
            return
        msg = f"Resume queue requested but queue {queue.display_name} is empty"
        raise QueueEmpty(msg)

    queue_player = self.mass.players.get_player(queue_id)
    if queue_player is None:
        raise PlayerUnavailableError(f"Player {queue_id} is not available")
    idle_for_a_while = (
        queue_player.state.playback_state == PlaybackState.IDLE
        and (time.time() - queue.elapsed_time_last_updated) > 60
    )
    if fade_in is None and idle_for_a_while:
        # enable fade in effect if the player is idle for a while
        fade_in = resume_pos > 0
    if resume_item.media_type == MediaType.RADIO:
        # we're not able to skip in online radio so this is pointless
        resume_pos = 0
    await self.play_index(queue_id, resume_item.queue_item_id, int(resume_pos), fade_in or False)
936
@api_command("player_queues/play_index")
async def play_index(
    self,
    queue_id: str,
    index: int | str,
    seek_position: int = 0,
    fade_in: bool = False,
    debounce: bool = False,
) -> None:
    """Play item at index (or item_id) X in queue.

    - queue_id: queue_id of the queue to handle the command.
    - index: index (int) or queue_item_id (str) of the item to play.
    - seek_position: position in seconds to start playback at. When 0 and the
      media item carries a `resume_position_ms`, that resume point is used.
    - fade_in: apply the fade-in effect on the first load attempt.
    - debounce: wait a bit longer (0.5s instead of 0.1s) before sending the
      play request, so rapid next/previous commands can supersede each other.

    Raises InvalidDataError when an item id is not found in the queue,
    PlayerUnavailableError when the player can not be found and
    MediaNotFoundError when no playable item could be found.
    """
    queue = self._queues[queue_id]
    queue.resume_pos = 0
    if isinstance(index, str):
        temp_index = self.index_by_id(queue_id, index)
        if temp_index is None:
            raise InvalidDataError(f"Item {index} not found in queue")
        index = temp_index
    # At this point index is guaranteed to be int
    queue.current_index = index
    # update current item and elapsed time and signal update
    # this way the UI knows immediately that a new item is loading
    queue.current_item = self.get_item(queue_id, index)
    queue.elapsed_time = seek_position
    queue.elapsed_time_last_updated = time.time()
    self.signal_update(queue_id)
    queue.index_in_buffer = index
    queue.flow_mode_stream_log = []
    target_player = self.mass.players.get_player(queue_id)
    if target_player is None:
        raise PlayerUnavailableError(f"Player {queue_id} is not available")
    queue.next_item_id_enqueued = None
    # always update session id when we start a new playback session
    queue.session_id = shortuuid.random(length=8)
    # handle resume point of audiobook(chapter) or podcast(episode)
    if (
        not seek_position
        and (queue_item := self.get_item(queue_id, index))
        and (resume_position_ms := getattr(queue_item.media_item, "resume_position_ms", 0))
    ):
        seek_position = max(0, int((resume_position_ms - 500) / 1000))

    # send play_media request to player
    # NOTE that we debounce this a bit to account for someone hitting the next button
    # like a madman. This will prevent the player from being overloaded with requests.
    async def _play_index(index: int, debounce: bool) -> None:
        try:
            for attempt in range(5):
                try:
                    queue_item = self.get_item(queue_id, index)
                    if not queue_item:
                        continue  # guard
                    await self._load_item(
                        queue_item,
                        self._get_next_index(queue_id, index),
                        is_start=True,
                        # seek/fade only apply to the originally requested item
                        seek_position=seek_position if attempt == 0 else 0,
                        fade_in=fade_in if attempt == 0 else False,
                    )
                    # if we reach this point, loading the item succeeded, break the loop
                    queue.current_index = index
                    queue.current_item = queue_item
                    break
                except (MediaNotFoundError, AudioError):
                    # the requested index can not be played.
                    if queue_item:
                        self.logger.warning(
                            "Skipping unplayable item %s (%s)",
                            queue_item.name,
                            queue_item.uri,
                        )
                        queue_item.available = False
                    next_index = self._get_next_index(queue_id, index, allow_repeat=False)
                    if next_index is None:
                        raise MediaNotFoundError("No next item available")
                    index = next_index
            else:
                # all attempts to find a playable item failed
                raise MediaNotFoundError("No playable item found to start playback")

            # work out if we need to use flow mode
            flow_mode = target_player.flow_mode and queue_item.media_type not in (
                # don't use flow mode for duration-less streams
                MediaType.RADIO,
                MediaType.PLUGIN_SOURCE,
            )
            await asyncio.sleep(0.5 if debounce else 0.1)
            queue.flow_mode = flow_mode
            await self.mass.players.play_media(
                player_id=queue_id,
                media=await self.player_media_from_queue_item(queue_item, flow_mode),
            )
            queue.current_index = index
            queue.current_item = queue_item
            await asyncio.sleep(2)
        except Exception:
            # Playback could not be started: clear the transitioning flag, otherwise
            # player updates for this queue would be ignored from now on.
            # A cancellation (asyncio.CancelledError is not an Exception subclass)
            # deliberately keeps the flag, because the superseding play_index
            # request has already (re)set it for its own transition.
            self._transitioning_players.discard(queue_id)
            raise
        self._transitioning_players.discard(queue_id)

    # we set a flag to notify the update logic that we're transitioning to a new track
    self._transitioning_players.add(queue_id)

    # we debounce the play_index command to handle the case where someone
    # is spamming next/previous on the player
    task_id = f"play_index_{queue_id}"
    if existing_task := self.mass.get_task(task_id):
        existing_task.cancel()
        with suppress(asyncio.CancelledError):
            await existing_task
    task = self.mass.create_task(
        _play_index,
        index,
        debounce,
        task_id=task_id,
    )
    await task
    self.signal_update(queue_id)
1050
@api_command("player_queues/transfer")
async def transfer_queue(
    self,
    source_queue_id: str,
    target_queue_id: str,
    auto_play: bool | None = None,
) -> None:
    """Move the items and playback settings of one queue to another queue.

    - source_queue_id: queue_id of the queue to take the items/settings from.
      This queue is cleared as part of the transfer.
    - target_queue_id: queue_id of the queue that receives the items/settings.
    - auto_play: resume playback on the target queue after the transfer.
      Defaults (None) to whether the source queue is currently playing.

    Raises PlayerUnavailableError when either queue or the target player
    is not available.
    """
    source_queue = self.get(source_queue_id)
    if not source_queue:
        raise PlayerUnavailableError(f"Queue {source_queue_id} is not available")
    target_queue = self.get(target_queue_id)
    if not target_queue:
        raise PlayerUnavailableError(f"Queue {target_queue_id} is not available")
    if auto_play is None:
        auto_play = source_queue.state == PlaybackState.PLAYING

    target_player = self.mass.players.get_player(target_queue_id)
    if target_player is None:
        raise PlayerUnavailableError(f"Player {target_queue_id} is not available")
    group_id = target_player.state.active_group or target_player.state.synced_to
    if group_id:
        # edge case: the user wants to move playback from the group as a whole, to a single
        # player in the group or it is grouped and the command targeted at the single player.
        # We need to dissolve the group first.
        await self.mass.players.cmd_ungroup(group_id)
        await asyncio.sleep(3)

    source_items = self._queue_items[source_queue_id]
    # copy the playback settings over to the target queue
    for setting in (
        "repeat_mode",
        "shuffle_enabled",
        "dont_stop_the_music_enabled",
        "radio_source",
        "enqueued_media_items",
    ):
        setattr(target_queue, setting, getattr(source_queue, setting))
    # continue on the target where the source queue left off
    target_queue.resume_pos = int(source_queue.elapsed_time)
    target_queue.current_index = source_queue.current_index
    if source_queue.current_item:
        target_queue.current_item = source_queue.current_item
        target_queue.current_item.queue_id = target_queue_id
    self.clear(source_queue_id)

    await self.load(target_queue_id, source_items, keep_remaining=False, keep_played=False)
    # the items now belong to the target queue
    for moved_item in source_items:
        moved_item.queue_id = target_queue_id
    self.update_items(target_queue_id, source_items)
    if auto_play:
        await self.resume(target_queue_id)
1097
1098 # Interaction with player
1099
async def on_player_register(self, player: Player) -> None:
    """Create (or restore from cache) the PlayerQueue that belongs to a player.

    The queue uses the player_id as queue_id. Previously cached queue state and
    items are restored when present; when restoring fails for whatever reason,
    a warning is logged and a fresh, empty queue is registered instead.
    Finally the queue is updated from the player and QUEUE_ADDED is signalled.
    """
    queue_id = player.player_id
    queue: PlayerQueue | None = None
    queue_items: list[QueueItem] = []
    # try to restore previous state
    cached_state = await self.mass.cache.get(
        key=queue_id,
        provider=self.domain,
        category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
    )
    if cached_state:
        try:
            queue = PlayerQueue.from_dict(cached_state)
            cached_queue_items = await self.mass.cache.get(
                key=queue_id,
                provider=self.domain,
                category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
                default=[],
            )
            queue_items = []
            for position, raw_item in enumerate(cached_queue_items):
                restored = QueueItem.from_cache(raw_item)
                if restored.media_item:
                    queue_items.append(restored)
                    continue
                # Skip items with missing media_item - this can happen if
                # MA was killed during shutdown while cache was being written
                self.logger.debug(
                    "Skipping queue item %s (index %d) restored from cache "
                    "without media_item",
                    restored.name,
                    position,
                )
            if queue.enqueued_media_items:
                # we need to restore the MediaItem objects for the enqueued media items
                # Items from cache may be dicts that need deserialization
                raw_enqueued: list[dict[str, Any] | MediaItemType] = cast(
                    "list[dict[str, Any] | MediaItemType]", queue.enqueued_media_items
                )
                queue.enqueued_media_items = [
                    cast("MediaItemType", media_from_dict(entry))
                    if isinstance(entry, dict)
                    else entry
                    for entry in raw_enqueued
                ]
        except Exception as err:
            self.logger.warning(
                "Failed to restore the queue(items) for %s - %s",
                player.state.name,
                str(err),
            )
            # Reset to clean state on failure
            queue = None
            queue_items = []
    if queue is None:
        # nothing (usable) in the cache: start with a fresh, inactive queue
        queue = PlayerQueue(
            queue_id=queue_id,
            active=False,
            display_name=player.state.name,
            available=player.state.available,
            dont_stop_the_music_enabled=False,
            items=0,
        )

    self._queues[queue_id] = queue
    self._queue_items[queue_id] = queue_items
    # always call update to calculate state etc
    self.on_player_update(player, {})
    self.mass.signal_event(EventType.QUEUE_ADDED, object_id=queue_id, data=queue)
1171
def on_player_update(
    self,
    player: Player,
    changed_values: dict[str, tuple[Any, Any]],
) -> None:
    """
    Refresh the PlayerQueue of a player after the player reported an update.

    The update is ignored while an announcement is in progress, while the queue
    is transitioning to a new track, or when the queue is not the active source
    and no previous state is known for it.

    NOTE: This is called every second if the player is playing.
    """
    queue_id = player.player_id
    queue = self._queues.get(queue_id)
    if queue is None:
        # race condition: the queue is not (or no longer) registered
        return
    if player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
        # do nothing while the announcement is in progress
        return
    # the queue is active when the player's active source is this queue (or unset)
    queue.active = player.state.active_source in (queue.queue_id, None)
    if not (queue.active or queue_id in self._prev_states):
        # return early if the queue is not active and we have no previous state
        queue.state = PlaybackState.IDLE
        return
    if queue.queue_id in self._transitioning_players:
        # we're currently transitioning to a new track,
        # ignore updates from the player during this time
        return

    # queue is active and preflight checks passed, update the queue details
    self._update_queue_from_player(player)
1202
def on_player_remove(self, player_id: str, permanent: bool) -> None:
    """Drop the queue of a player that got removed from the registry.

    - player_id: id of the removed player (equals the queue_id).
    - permanent: when True, the cached queue state and items are deleted too.
    """
    if permanent:
        # if the player is permanently removed, we also remove the cached queue data
        for cache_category in (
            CACHE_CATEGORY_PLAYER_QUEUE_STATE,
            CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
        ):
            self.mass.create_task(
                self.mass.cache.delete(
                    key=player_id,
                    provider=self.domain,
                    category=cache_category,
                )
            )
    # forget the in-memory queue (no error when it is unknown)
    self._queues.pop(player_id, None)
    self._queue_items.pop(player_id, None)
1223
async def load_next_queue_item(
    self,
    queue_id: str,
    current_item_id: str,
) -> QueueItem:
    """
    Call when a player wants the next queue item to play.

    Starting after the item identified by `current_item_id`, candidates are
    tried until one can be loaded. Items that fail to load are marked as
    unavailable and skipped (at most 10 of them).

    - queue_id: queue_id of the queue to get the next item for.
    - current_item_id: queue_item_id of the item the player is currently at.

    Raises PlayerUnavailableError if the queue is not available.
    Raises QueueEmpty if there are no more (playable) tracks left.
    """
    queue = self.get(queue_id)
    if not queue:
        msg = f"PlayerQueue {queue_id} is not available"
        raise PlayerUnavailableError(msg)
    cur_index = self.index_by_id(queue_id, current_item_id)
    if cur_index is None:
        # this is just a guard for bad data
        raise QueueEmpty("Invalid item id for queue given.")
    skipped = 0
    while True:
        next_index = self._get_next_index(queue_id, cur_index + skipped)
        if next_index is None:
            raise QueueEmpty("No more tracks left in the queue.")
        queue_item = self.get_item(queue_id, next_index)
        if queue_item is None:
            raise QueueEmpty("No more tracks left in the queue.")
        if skipped >= 10:
            # we only allow 10 retries to prevent infinite loops
            raise QueueEmpty("No more (playable) tracks left in the queue.")
        try:
            # _load_item expects the index of the item that FOLLOWS the item being
            # loaded (it uses it to detect consecutive tracks of the same album),
            # so pass the successor of the candidate and not the candidate itself.
            await self._load_item(queue_item, self._get_next_index(queue_id, next_index))
            # we're all set, this is our next item
            break
        except (MediaNotFoundError, AudioError):
            # No stream details found, skip this QueueItem
            self.logger.warning(
                "Skipping unplayable item %s (%s)", queue_item.name, queue_item.uri
            )
            queue_item.available = False
            skipped += 1
    if skipped:
        # we skipped some items, signal a queue items update
        self.update_items(queue_id, self._queue_items[queue_id])
    return queue_item
1273
async def _load_item(
    self,
    queue_item: QueueItem,
    next_index: int | None,
    is_start: bool = False,
    seek_position: int = 0,
    fade_in: bool = False,
) -> None:
    """Try to load the stream details for the given queue item.

    For tracks, the media item is first replaced by the full library item
    (or re-fetched by uri) and the album/album image are restored on it.
    The resulting stream details are stored on `queue_item.streamdetails`.

    - queue_item: the item to load.
    - next_index: index of the item that follows `queue_item` in the queue
      (or None), used to detect consecutive tracks of the same album.
    - is_start: not used within this method body.
    - seek_position: passed on to the stream details request.
    - fade_in: passed on to the stream details request.

    Raises MediaNotFoundError if the item is flagged unavailable; fetching the
    stream details may raise as well for an unplayable item.
    """
    queue_id = queue_item.queue_id
    queue = self._queues[queue_id]

    # we use a contextvar to bypass the throttler for this asyncio task/context
    # this makes sure that playback has priority over other requests that may be
    # happening in the background
    BYPASS_THROTTLER.set(True)

    self.logger.debug(
        "(pre)loading (next) item for queue %s...",
        queue.display_name,
    )

    if not queue_item.available:
        raise MediaNotFoundError(f"Item {queue_item.uri} is not available")

    # work out if we are playing an album and if we should prefer album
    # loudness
    next_track_from_same_album = (
        next_index is not None
        and (next_item := self.get_item(queue_id, next_index))
        and (
            queue_item.media_item
            and hasattr(queue_item.media_item, "album")
            and queue_item.media_item.album
            and next_item.media_item
            and hasattr(next_item.media_item, "album")
            and next_item.media_item.album
            and queue_item.media_item.album.item_id == next_item.media_item.album.item_id
        )
    )
    current_index = self.index_by_id(queue_id, queue_item.queue_item_id)
    # only the very first item (index 0) has no previous item; the previous
    # item of index 1 is the (perfectly valid) item at index 0
    previous_track_from_same_album = (
        current_index is not None
        and current_index > 0
        and (previous_item := self.get_item(queue_id, current_index - 1)) is not None
        and previous_item.media_item is not None
        and hasattr(previous_item.media_item, "album")
        and previous_item.media_item.album is not None
        and queue_item.media_item is not None
        and hasattr(queue_item.media_item, "album")
        and queue_item.media_item.album is not None
        and queue_item.media_item.album.item_id == previous_item.media_item.album.item_id
    )
    playing_album_tracks = next_track_from_same_album or previous_track_from_same_album
    if queue_item.media_item and isinstance(queue_item.media_item, Track):
        # remember the album as known right now, the media item may get replaced below
        album = queue_item.media_item.album
        # prefer the full library media item so we have all metadata and provider(quality) info
        # always request the full library item as there might be other qualities available
        if library_item := await self.mass.music.get_library_item_by_prov_id(
            queue_item.media_item.media_type,
            queue_item.media_item.item_id,
            queue_item.media_item.provider,
        ):
            queue_item.media_item = cast("Track", library_item)
        elif not queue_item.media_item.image or queue_item.media_item.provider.startswith(
            "ytmusic"
        ):
            # Youtube Music has poor thumbs by default, so we always fetch the full item
            # this also catches the case where they have an unavailable item in a listing
            fetched_item = await self.mass.music.get_item_by_uri(queue_item.uri)
            queue_item.media_item = cast("Track", fetched_item)

        # ensure we got the full (original) album set
        if album and (
            library_album := await self.mass.music.get_library_item_by_prov_id(
                album.media_type,
                album.item_id,
                album.provider,
            )
        ):
            queue_item.media_item.album = cast("Album", library_album)
        elif album:
            # Restore original album if we have no better alternative from the library
            queue_item.media_item.album = album
        # prefer album image over track image
        if queue_item.media_item.album and queue_item.media_item.album.image:
            org_images: list[MediaItemImage] = queue_item.media_item.metadata.images or []
            queue_item.media_item.metadata.images = UniqueList(
                [
                    queue_item.media_item.album.image,
                    *org_images,
                ]
            )
    # Fetch the streamdetails, which could raise in case of an unplayable item.
    # For example, YT Music returns Radio Items that are not playable.
    queue_item.streamdetails = await get_stream_details(
        mass=self.mass,
        queue_item=queue_item,
        seek_position=seek_position,
        fade_in=fade_in,
        prefer_album_loudness=bool(playing_album_tracks),
    )
1379
def track_loaded_in_buffer(self, queue_id: str, item_id: str) -> None:
    """Register that a player (started) buffering the given queue item.

    - queue_id: queue_id of the queue the item belongs to.
    - item_id: queue_item_id of the item that is being loaded in the buffer.

    Raises PlayerUnavailableError if the queue is not available.
    """
    queue = self.get(queue_id)
    if not queue:
        raise PlayerUnavailableError(f"PlayerQueue {queue_id} is not available")
    # store the index of the item that is currently (being) loaded in the buffer
    # which helps us a bit to determine how far the player has buffered ahead
    buffered_index = self.index_by_id(queue_id, item_id)
    queue.index_in_buffer = buffered_index
    self.logger.debug("PlayerQueue %s loaded item %s in buffer", queue.display_name, item_id)
    self.signal_update(queue_id)
    # preload next streamdetails
    self._preload_next_item(queue_id, item_id)
1393
1394 # Main queue manipulation methods
1395
async def load(
    self,
    queue_id: str,
    queue_items: list[QueueItem],
    insert_at_index: int = 0,
    keep_remaining: bool = True,
    keep_played: bool = True,
    shuffle: bool = False,
) -> None:
    """Load new items at index.

    The passed `queue_items` list itself is left untouched; the resulting
    item list is handed to `update_items`.

    - queue_id: id of the queue to process this request.
    - queue_items: a list of QueueItems
    - insert_at_index: insert the item(s) at this index
    - keep_remaining: keep the remaining items after the insert
    - keep_played: keep the existing items before the insert index
    - shuffle: (re)shuffle the items after insert index
    """
    existing_items = self._queue_items[queue_id]
    prev_items = existing_items[:insert_at_index] if keep_played else []
    # work on a copy so the list owned by the caller is never extended in place
    next_items = list(queue_items)

    # if keep_remaining, append the old 'next' items
    if keep_remaining:
        next_items += existing_items[insert_at_index:]

    # offset the sort_index of every item by its new position in the queue
    # (original author's note: this insert order is what un-shuffle relies on)
    for index, item in enumerate(next_items):
        item.sort_index += insert_at_index + index
    # (re)shuffle the final batch if needed
    if shuffle:
        next_items = await _smart_shuffle(next_items)
    self.update_items(queue_id, prev_items + next_items)
1427
def update_items(self, queue_id: str, queue_items: list[QueueItem]) -> None:
    """Replace the items of a queue (e.g. after reordering) and signal the change.

    - queue_id: id of the queue to update.
    - queue_items: the complete, new list of items of the queue.
    """
    self._queue_items[queue_id] = queue_items
    queue = self._queues[queue_id]
    queue.items = len(queue_items)
    # to track if the queue items changed we set a timestamp
    # this is a simple way to detect changes in the list of items
    # without having to compare the entire list
    queue.items_last_updated = time.time()
    self.signal_update(queue_id, True)
    if queue.state != PlaybackState.PLAYING:
        return
    buffered_index = queue.index_in_buffer
    if buffered_index is None or buffered_index != queue.current_index:
        # the player has not loaded the current track (yet):
        # wait until it has loaded to prevent conflicts
        return
    # the queue is playing and the player has loaded the current track,
    # ensure to (re)queue the next track because it might have changed
    upcoming_item = self.get_next_item(queue_id, buffered_index)
    if upcoming_item:
        self._enqueue_next_item(queue_id, upcoming_item)
1449
1450 # Helper methods
1451
def get_item(self, queue_id: str, item_id_or_index: int | str | None) -> QueueItem | None:
    """Get queue item by index or item_id.

    - queue_id: id of the queue to look in.
    - item_id_or_index: position (int) or queue_item_id (str) of the item.

    Returns None when the queue is unknown, the item can not be found or the
    index lies outside the item list. Negative indexes count from the end of
    the list (regular sequence indexing) as long as they are within range.
    """
    if item_id_or_index is None:
        return None
    queue_items = self._queue_items.get(queue_id)
    if queue_items is None:
        return None
    if isinstance(item_id_or_index, str):
        return next((x for x in queue_items if x.queue_item_id == item_id_or_index), None)
    if isinstance(item_id_or_index, int):
        size = len(queue_items)
        # guard both ends: an out-of-range negative index (e.g. -1 on an empty
        # queue) must yield None instead of raising IndexError
        if -size <= item_id_or_index < size:
            return queue_items[item_id_or_index]
    return None
1463
def signal_update(self, queue_id: str, items_changed: bool = False) -> None:
    """Broadcast and persist the (changed) state of the given queue.

    - queue_id: id of the queue that changed.
    - items_changed: also signal QUEUE_ITEMS_UPDATED and store the items in the cache.
    """
    queue = self._queues[queue_id]
    if items_changed:
        self.mass.signal_event(EventType.QUEUE_ITEMS_UPDATED, object_id=queue_id, data=queue)
        # save items in cache - only cache items with valid media_item
        cacheable_items = []
        for queue_item in self._queue_items[queue_id]:
            if queue_item.media_item is not None:
                cacheable_items.append(queue_item.to_cache())
        store_items = self.mass.cache.set(
            key=queue_id,
            data=cacheable_items,
            provider=self.domain,
            category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
        )
        self.mass.create_task(store_items)
    # always send the base event
    self.mass.signal_event(EventType.QUEUE_UPDATED, object_id=queue_id, data=queue)
    # also signal update to the player itself so it can update its current_media
    self.mass.players.trigger_player_update(queue_id)
    # save state
    store_state = self.mass.cache.set(
        key=queue_id,
        data=queue.to_cache(),
        provider=self.domain,
        category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
    )
    self.mass.create_task(store_state)
1494
def index_by_id(self, queue_id: str, queue_item_id: str) -> int | None:
    """Return the position of the item with the given queue_item_id.

    Returns None when the queue holds no item with that id.
    """
    matching_positions = (
        position
        for position, candidate in enumerate(self._queue_items[queue_id])
        if candidate.queue_item_id == queue_item_id
    )
    # only the first match counts
    return next(matching_positions, None)
1502
async def player_media_from_queue_item(
    self, queue_item: QueueItem, flow_mode: bool
) -> PlayerMedia:
    """Build the PlayerMedia object that is sent to a player for a QueueItem.

    - queue_item: the queue item to describe.
    - flow_mode: describe a generic flow stream (no duration, generic title
      and logo) instead of the individual item.

    Raises InvalidDataError if the queue has no session_id.
    """
    queue = self._queues[queue_item.queue_id]
    if flow_mode:
        duration = None
    elif not queue_item.streamdetails:
        duration = queue_item.duration
    else:
        # prefer netto duration
        # when seeking, the player only receives the remaining duration
        details = queue_item.streamdetails
        duration = details.duration or queue_item.duration
        if duration and details.seek_position:
            duration = duration - details.seek_position
    if queue.session_id is None:
        # handle error or return early
        raise InvalidDataError("Queue session_id is None")
    custom_data = {
        "session_id": queue.session_id,
        "original_uri": queue_item.uri,
        "flow_mode": flow_mode,
    }
    media = PlayerMedia(
        uri=queue_item.uri,
        media_type=MediaType.FLOW_STREAM if flow_mode else queue_item.media_type,
        title="Music Assistant" if flow_mode else queue_item.name,
        image_url=MASS_LOGO_ONLINE,
        duration=duration,
        source_id=queue_item.queue_id,
        queue_item_id=queue_item.queue_item_id,
        custom_data=custom_data,
    )
    if flow_mode or not queue_item.media_item:
        return media
    # enrich with the details of the actual media item
    media_item = queue_item.media_item
    media.title = media_item.name
    media.artist = getattr(media_item, "artist_str", "")
    album = getattr(media_item, "album", None)
    media.album = album.name if album else ""
    if queue_item.image:
        # the image format needs to be 500x500 jpeg for maximum compatibility with players
        # we prefer the imageproxy on the streamserver here because this request is sent
        # to the player itself which may not be able to reach the regular webserver
        media.image_url = self.mass.metadata.get_image_url(
            queue_item.image, size=500, prefer_stream_server=True
        )
    return media
1549
async def get_artist_tracks(self, artist: Artist) -> list[Track]:
    """Collect the tracks to play for an artist, in random order.

    Which tracks are selected depends on the configured enqueue preference:
    the artist's (library or all) tracks, or the tracks of the artist's
    (library or all) albums. Any other preference yields an empty list.
    """
    selection = self.mass.config.get_raw_core_config_value(
        self.domain,
        CONF_DEFAULT_ENQUEUE_SELECT_ARTIST,
        ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE,
    )
    self.logger.info(
        "Fetching tracks to play for artist %s",
        artist.name,
    )
    if selection in ("library_tracks", "all_tracks"):
        artist_tracks = await self.mass.music.artists.tracks(
            artist.item_id,
            artist.provider,
            in_library_only=selection == "library_tracks",
        )
        random.shuffle(artist_tracks)
        return artist_tracks
    if selection not in ("library_album_tracks", "all_album_tracks"):
        return []
    unique_tracks: list[Track] = []
    artist_albums = await self.mass.music.artists.albums(
        artist.item_id,
        artist.provider,
        in_library_only=selection == "library_album_tracks",
    )
    for artist_album in artist_albums:
        tracks_of_album = await self.mass.music.albums.tracks(
            artist_album.item_id, artist_album.provider
        )
        for candidate in tracks_of_album:
            # the same track may be returned more than once, keep the first only
            if candidate not in unique_tracks:
                unique_tracks.append(candidate)
    random.shuffle(unique_tracks)
    return unique_tracks
1584
async def get_album_tracks(self, album: Album, start_item: str | None) -> list[Track]:
    """Collect the available tracks to play for an album.

    - album: the album to get the tracks for.
    - start_item: item_id or uri of the track to start at; tracks before it
      are left out. With None, all available tracks are returned.
    """
    selection = self.mass.config.get_raw_core_config_value(
        self.domain,
        CONF_DEFAULT_ENQUEUE_SELECT_ALBUM,
        ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE,
    )
    selected: list[Track] = []
    # while True, we are still looking for the requested start item
    awaiting_start = start_item is not None
    self.logger.info(
        "Fetching tracks to play for album %s",
        album.name,
    )
    tracks_of_album = await self.mass.music.albums.tracks(
        item_id=album.item_id,
        provider_instance_id_or_domain=album.provider,
        in_library_only=selection == "library_tracks",
    )
    for candidate in tracks_of_album:
        if not candidate.available:
            continue
        if awaiting_start and start_item in (candidate.item_id, candidate.uri):
            awaiting_start = False
        if awaiting_start:
            continue
        selected.append(candidate)
    return selected
1611
async def get_playlist_tracks(
    self, playlist: Playlist, start_item: str | None
) -> list[PlaylistPlayableItem]:
    """Collect the available items to play for a playlist.

    - playlist: the playlist to get the items for.
    - start_item: item_id or uri of the item to start at; items before it
      are left out. With None, all available items are returned.
    """
    selected: list[PlaylistPlayableItem] = []
    # while True, we are still looking for the requested start item
    awaiting_start = start_item is not None
    self.logger.info(
        "Fetching tracks to play for playlist %s",
        playlist.name,
    )
    # TODO: Handle other sort options etc.
    async for candidate in self.mass.music.playlists.tracks(playlist.item_id, playlist.provider):
        if not candidate.available:
            continue
        if awaiting_start and start_item in (candidate.item_id, candidate.uri):
            awaiting_start = False
        if awaiting_start:
            continue
        selected.append(candidate)
    return selected
1634
async def get_audiobook_resume_point(
    self, audio_book: Audiobook, chapter: str | int | None = None, userid: str | None = None
) -> int:
    """Return resume point (in milliseconds) for given audio book.

    - audio_book: the audiobook to get the resume point for.
    - chapter: position of an explicitly requested chapter; its start is
      returned as resume point.
    - userid: passed on when looking up the stored resume position.

    Without a chapter, the stored resume position is returned (0 when the
    book was fully played).

    Raises InvalidDataError if the requested chapter can not be resolved.
    """
    self.logger.debug(
        "Fetching resume point to play for audio book %s",
        audio_book.name,
    )
    if chapter is None:
        resume_info = await self.mass.music.get_resume_position(audio_book, userid=userid)
        full_played, resume_position_ms = resume_info
        if full_played:
            return 0
        return resume_position_ms
    # user explicitly selected a chapter to play
    wanted_position = int(chapter) if isinstance(chapter, str) else chapter
    known_chapters = audio_book.metadata.chapters
    if known_chapters:
        for candidate in known_chapters:
            if candidate.position != wanted_position:
                continue
            # only the first chapter with a matching position is considered
            if candidate:
                # chapter start is multiplied by 1000 to get milliseconds
                return int(candidate.start * 1000)
            break
    raise InvalidDataError(f"Unable to resolve chapter to play for Audiobook {audio_book.name}")
1656
async def get_next_podcast_episodes(
    self,
    podcast: Podcast | None,
    episode: PodcastEpisode | str | None,
    userid: str | None = None,
) -> UniqueList[PodcastEpisode]:
    """
    Return the episode(s) to play, with resume info applied to the first one.

    :param podcast: The podcast to play, or None to play one single episode.
    :param episode: Episode (object, uri or item id) to start at; required
        as PodcastEpisode object when no podcast is given.
    :param userid: Optional user id used for the resume position lookup.
    :return: The start episode followed by all remaining episodes of the podcast,
        or only the given episode when no podcast was provided.
    :raises InvalidDataError: If neither podcast nor episode object is given,
        or if no start episode could be resolved.
    """
    fetch_resume = self.mass.music.get_resume_position

    if podcast is None:
        if episode is None or isinstance(episode, str):
            raise InvalidDataError("Either podcast or episode must be provided")
        # single podcast episode requested
        assert isinstance(episode, PodcastEpisode)  # checked above
        self.logger.debug(
            "Fetching resume point to play for Podcast episode %s",
            episode.name,
        )
        fully_played, resume_position_ms = await fetch_resume(episode, userid=userid)
        episode.fully_played = fully_played
        episode.resume_position_ms = 0 if fully_played else resume_position_ms
        return UniqueList([episode])

    # podcast with optional start episode requested
    self.logger.debug(
        "Fetching episode(s) and resume point to play for Podcast %s",
        podcast.name,
    )
    all_episodes = []
    async for fetched in self.mass.music.podcasts.episodes(podcast.item_id, podcast.provider):
        all_episodes.append(fetched)
    all_episodes.sort(key=lambda x: x.position)

    start_episode: PodcastEpisode | None = None
    if isinstance(episode, (PodcastEpisode, str)):
        # the user explicitly selected an episode: look it up in the sorted list,
        # either by uri (episode object) or by uri/item id (plain string)
        if isinstance(episode, PodcastEpisode):
            matches = (x for x in all_episodes if x.uri == episode.uri)
        else:
            matches = (x for x in all_episodes if episode in (x.uri, x.item_id))
        start_episode = next(matches, None)
        if start_episode:
            # ensure we have accurate resume info
            fully_played, resume_position_ms = await fetch_resume(start_episode, userid=userid)
            start_episode.resume_position_ms = 0 if fully_played else resume_position_ms
    else:
        # no explicit selection: take the first episode that is not fully played
        for candidate in all_episodes:
            if candidate.fully_played:
                continue
            # ensure we have accurate resume info
            fully_played, resume_position_ms = await fetch_resume(candidate, userid=userid)
            if not fully_played:
                candidate.resume_position_ms = resume_position_ms
                start_episode = candidate
                break
        else:
            # everything has been played already, so we start at the beginning
            start_episode = all_episodes[0] if all_episodes else None

    if start_episode is None:
        raise InvalidDataError(f"Unable to resolve episode to play for Podcast {podcast.name}")
    # return the start episode and everything that comes after it
    start_index = all_episodes.index(start_episode)
    return UniqueList(all_episodes[start_index:])
1736
def _get_next_index(
    self,
    queue_id: str,
    cur_index: int | None,
    is_skip: bool = False,
    allow_repeat: bool = True,
) -> int | None:
    """
    Determine the index that follows cur_index, honoring the repeat mode.

    :param queue_id: The queue to look at.
    :param cur_index: The index to continue from.
    :param is_skip: True when the user skips; repeat-one is ignored in that case.
    :param allow_repeat: When False, repeat modes never yield an index.
    :return: The next index, or None if there are no (more) items in the queue.
    """
    queue = self._queues[queue_id]
    items = self._queue_items[queue_id]
    if cur_index is None or not items:
        # nothing to continue from
        return None
    if not is_skip and queue.repeat_mode == RepeatMode.ONE:
        # repeat single track: stay on the same index (if repeating is allowed)
        return cur_index if allow_repeat else None
    if cur_index < len(items) - 1:
        # regular case: simply advance by one
        return cur_index + 1
    # cur_index is (at or beyond) the last index of the queue:
    # only repeat-all lets us start over from the beginning
    if allow_repeat and queue.repeat_mode == RepeatMode.ALL:
        return 0
    return None
1765
def get_next_item(self, queue_id: str, cur_index: int | str) -> QueueItem | None:
    """
    Return the next (available) QueueItem for the given queue.

    :param queue_id: The queue to look at.
    :param cur_index: Current position, either as index or as queue item id.
    :return: The next available item, or None if there is none
        (or the given item id is unknown).
    """
    if isinstance(cur_index, str):
        start_index = self.index_by_id(queue_id, cur_index)
        if start_index is None:
            return None  # guard
    else:
        start_index = cur_index
    # look ahead from (at most) 5 consecutive positions to hop over unusable items
    for base_index in range(start_index, start_index + 5):
        next_index = self._get_next_index(queue_id, base_index)
        if next_index is None:
            return None
        candidate = self.get_item(queue_id, next_index)
        # unavailable items (flagged by the load_next track logic) are skipped
        if candidate is not None and candidate.available:
            return candidate
    return None
1788
async def _fill_radio_tracks(self, queue_id: str) -> None:
    """
    Fetch (additional) radio tracks and load them into the given queue.

    :param queue_id: The queue to fill.
    """
    self.logger.debug("Filling radio tracks for queue %s", queue_id)
    radio_tracks = await self._get_radio_tracks(queue_id=queue_id, is_initial_radio_mode=False)
    # only tracks that are marked available make it into the queue
    new_items = []
    for radio_track in radio_tracks:
        if radio_track.available:
            new_items.append(QueueItem.from_media_item(queue_id, radio_track))
    insert_position = len(self._queue_items[queue_id]) + 1
    await self.load(queue_id, new_items, insert_at_index=insert_position)
1803
1804 def _enqueue_next_item(self, queue_id: str, next_item: QueueItem | None) -> None:
1805 """Enqueue the next item on the player."""
1806 if not next_item:
1807 # no next item, nothing to do...
1808 return
1809
1810 queue = self._queues[queue_id]
1811 if queue.flow_mode:
1812 # ignore this for flow mode
1813 return
1814
1815 async def _enqueue_next_item_on_player(next_item: QueueItem) -> None:
1816 await self.mass.players.enqueue_next_media(
1817 player_id=queue_id,
1818 media=await self.player_media_from_queue_item(next_item, False),
1819 )
1820 if queue.next_item_id_enqueued != next_item.queue_item_id:
1821 queue.next_item_id_enqueued = next_item.queue_item_id
1822 self.logger.debug(
1823 "Enqueued next track %s on queue %s",
1824 next_item.name,
1825 self._queues[queue_id].display_name,
1826 )
1827
1828 task_id = f"enqueue_next_item_{queue_id}"
1829 self.mass.call_later(0.5, _enqueue_next_item_on_player, next_item, task_id=task_id)
1830
def _preload_next_item(self, queue_id: str, item_id_in_buffer: str) -> None:
    """
    Start a background task that prepares the item following the buffered one.

    The task loads the next queue item and enqueues it on the player. Nothing is
    started for unknown items, radio items or items without a duration.

    :param queue_id: The queue to preload for.
    :param item_id_in_buffer: Queue item id of the item that is in the player's buffer.
    """
    queue = self._queues[queue_id]

    async def _preload_streamdetails(item_id_in_buffer: str) -> None:
        try:
            # Give the buffered item up to 120 checks (one per second) to become
            # the item that is actually playing. This prevents a race condition
            # where we would preload the next item too soon, while the player is
            # still busy preloading the previously enqueued item.
            for _ in range(120):
                playing_item = queue.current_item
                if not playing_item:
                    return  # guard
                if playing_item.queue_item_id == item_id_in_buffer:
                    break
                await asyncio.sleep(1)
            upcoming = await self.load_next_queue_item(queue_id, item_id_in_buffer)
            if upcoming:
                self.logger.debug(
                    "Preloaded next item %s for queue %s",
                    upcoming.name,
                    queue.display_name,
                )
                # enqueue the next item on the player
                self._enqueue_next_item(queue_id, upcoming)
        except QueueEmpty:
            return

    buffered_item = self.get_item(queue_id, item_id_in_buffer)
    if not buffered_item:
        # this should not happen, but guard anyways
        return
    if buffered_item.media_type == MediaType.RADIO or not buffered_item.duration:
        # radio items or no duration, nothing to do
        return

    # a newly started preload task replaces a still running one for this queue
    self.mass.create_task(
        _preload_streamdetails,
        item_id_in_buffer,
        task_id=f"preload_next_item_{queue_id}",
        abort_existing=True,
    )
1879
async def _resolve_media_items(
    self,
    media_item: MediaItemType | ItemMapping | BrowseFolder,
    start_item: str | None = None,
    userid: str | None = None,
    queue_id: str | None = None,
) -> list[MediaItemType]:
    """
    Resolve/unwrap media items to enqueue.

    Container items (playlist, artist, album, podcast, folder) are expanded into
    their playable children; playlists, artists, albums and podcasts are also
    marked as played (user initiated) in a background task.

    :param media_item: The item to resolve; an ItemMapping is resolved by uri first.
    :param start_item: Optional item to start at (playlist/album track, audiobook
        chapter or podcast episode).
    :param userid: Optional user id, used for the play log and resume lookups.
    :param queue_id: Optional queue id that is stored with the play log entry.
    :return: The list of media items to enqueue.
    :raises InvalidDataError: If an ItemMapping without uri is given.
    """
    # resolve Itemmapping to full media item
    if isinstance(media_item, ItemMapping):
        if media_item.uri is None:
            raise InvalidDataError("ItemMapping has no URI")
        media_item = await self.mass.music.get_item_by_uri(media_item.uri)
    if media_item.media_type == MediaType.PLAYLIST:
        media_item = cast("Playlist", media_item)
        self.mass.create_task(
            self.mass.music.mark_item_played(
                media_item, userid=userid, queue_id=queue_id, user_initiated=True
            )
        )
        return list(await self.get_playlist_tracks(media_item, start_item))
    if media_item.media_type == MediaType.ARTIST:
        media_item = cast("Artist", media_item)
        # pass the userid along, consistent with the playlist/album/podcast branches,
        # so the play log entry is attributed to the requesting user
        self.mass.create_task(
            self.mass.music.mark_item_played(
                media_item, userid=userid, queue_id=queue_id, user_initiated=True
            )
        )
        return list(await self.get_artist_tracks(media_item))
    if media_item.media_type == MediaType.ALBUM:
        media_item = cast("Album", media_item)
        self.mass.create_task(
            self.mass.music.mark_item_played(
                media_item, userid=userid, queue_id=queue_id, user_initiated=True
            )
        )
        return list(await self.get_album_tracks(media_item, start_item))
    if media_item.media_type == MediaType.AUDIOBOOK:
        media_item = cast("Audiobook", media_item)
        # ensure we grab the correct/latest resume point info
        # (start_item is interpreted as the chapter to start at)
        media_item.resume_position_ms = await self.get_audiobook_resume_point(
            media_item, start_item, userid=userid
        )
        return [media_item]
    if media_item.media_type == MediaType.PODCAST:
        media_item = cast("Podcast", media_item)
        self.mass.create_task(
            self.mass.music.mark_item_played(
                media_item, userid=userid, queue_id=queue_id, user_initiated=True
            )
        )
        return list(await self.get_next_podcast_episodes(media_item, start_item, userid=userid))
    if media_item.media_type == MediaType.PODCAST_EPISODE:
        media_item = cast("PodcastEpisode", media_item)
        return list(await self.get_next_podcast_episodes(None, media_item, userid=userid))
    if media_item.media_type == MediaType.FOLDER:
        media_item = cast("BrowseFolder", media_item)
        return list(await self._get_folder_tracks(media_item))
    # all other: single track or radio item
    return [cast("MediaItemType", media_item)]
1938
1939 async def _try_resume_from_playlog(self, queue: PlayerQueue) -> bool:
1940 """Try to resume playback from playlog when queue is empty.
1941
1942 Attempts to find user-initiated recently played items in the following order:
1943 1. By userid AND queue_id
1944 2. By queue_id only
1945 3. By userid only (if available)
1946 4. Any recently played item
1947
1948 :param queue: The queue to resume playback on.
1949 :return: True if playback was started, False otherwise.
1950 """
1951 # Try different filter combinations in order of specificity
1952 filter_attempts: list[tuple[str | None, str | None, str]] = []
1953 if queue.userid:
1954 filter_attempts.append((queue.userid, queue.queue_id, "userid + queue_id match"))
1955 filter_attempts.append((None, queue.queue_id, "queue_id match"))
1956 if queue.userid:
1957 filter_attempts.append((queue.userid, None, "userid match"))
1958 filter_attempts.append((None, None, "any recent item"))
1959
1960 for userid, queue_id, match_type in filter_attempts:
1961 items = await self.mass.music.recently_played(
1962 limit=5,
1963 fully_played_only=False,
1964 user_initiated_only=True,
1965 userid=userid,
1966 queue_id=queue_id,
1967 )
1968 for item in items:
1969 if not item.uri:
1970 continue
1971 try:
1972 await self.play_media(queue.queue_id, item)
1973 self.logger.info(
1974 "Resumed queue %s from playlog (%s)", queue.display_name, match_type
1975 )
1976 return True
1977 except MusicAssistantError as err:
1978 self.logger.debug("Failed to resume with item %s: %s", item.name, err)
1979 continue
1980
1981 return False
1982
async def _get_radio_tracks(
    self, queue_id: str, is_initial_radio_mode: bool = False
) -> list[Track]:
    """
    Call the registered music providers for dynamic tracks.

    Base tracks are collected from the queue's radio source (or from the tracks
    already in the queue), a sample of those is used to look up similar tracks,
    and the result is a mix of base tracks (initial radio mode only) and
    randomly sampled similar ("dynamic") tracks.

    :param queue_id: The queue to fetch radio tracks for.
    :param is_initial_radio_mode: True when radio mode is first started,
        False when the queue is refilled.
    :return: The tracks to add to the queue; empty if the queue has no radio source.
    :raises UnsupportedFeaturedException: If no base tracks could be collected.
    """
    queue = self._queues[queue_id]
    # all Track items currently in the queue, used as play history
    # and to exclude tracks that are already queued
    queue_track_items: list[Track] = [
        q.media_item
        for q in self._queue_items[queue_id]
        if q.media_item and isinstance(q.media_item, Track)
    ]
    if not queue.radio_source:
        # this may happen during race conditions as this method is called delayed
        return []
    self.logger.info(
        "Fetching radio tracks for queue %s based on: %s",
        queue.display_name,
        ", ".join([x.name for x in queue.radio_source]),
    )

    # Get user's preferred provider instances for steering provider selection
    preferred_provider_instances: list[str] | None = None
    if (
        queue.userid
        and (playback_user := await self.mass.webserver.auth.get_user(queue.userid))
        and playback_user.provider_filter
    ):
        preferred_provider_instances = playback_user.provider_filter

    available_base_tracks: list[Track] = []
    # maximum number of base tracks that are used for the similar tracks lookup
    base_track_sample_size = 5
    # Some providers have very deterministic similar track algorithms when providing
    # a single track item. When we have a radio mode based on 1 track and we have to
    # refill the queue (ie not initial radio mode), we use the play history as base tracks
    if (
        len(queue.radio_source) == 1
        and queue.radio_source[0].media_type == MediaType.TRACK
        and not is_initial_radio_mode
    ):
        available_base_tracks = queue_track_items
    else:
        # Grab all the available base tracks based on the selected source items.
        # shuffle the source items, just in case
        for radio_item in random.sample(queue.radio_source, len(queue.radio_source)):
            ctrl = self.mass.music.get_controller(radio_item.media_type)
            try:
                available_base_tracks += [
                    track
                    for track in await ctrl.radio_mode_base_tracks(
                        radio_item,  # type: ignore[arg-type]
                        preferred_provider_instances,
                    )
                    # Avoid duplicate base tracks
                    if track not in available_base_tracks
                ]
            except UnsupportedFeaturedException as err:
                # this source item can not provide base tracks, try the other ones
                self.logger.debug(
                    "Skip loading radio items for %s: %s ",
                    radio_item.uri,
                    str(err),
                )
    if not available_base_tracks:
        raise UnsupportedFeaturedException("Radio mode not available for source items")

    # Sample tracks from the base tracks, which will be used to calculate the dynamic ones
    base_tracks = random.sample(
        available_base_tracks,
        min(base_track_sample_size, len(available_base_tracks)),
    )
    # Use a set to avoid duplicate dynamic tracks
    dynamic_tracks: set[Track] = set()
    # Use base tracks + Trackcontroller to obtain similar tracks for every base Track
    # first pass runs with allow_lookup=False; the second pass (allow_lookup=True)
    # only runs when the first pass did not yield any dynamic tracks
    for allow_lookup in (False, True):
        if dynamic_tracks:
            break
        for base_track in base_tracks:
            try:
                _similar_tracks = await self.mass.music.tracks.similar_tracks(
                    base_track.item_id,
                    base_track.provider,
                    allow_lookup=allow_lookup,
                    preferred_provider_instances=preferred_provider_instances,
                )
            except MediaNotFoundError:
                # Some providers don't have similar tracks for all items. For example,
                # Tidal can sometimes return a 404 when the 'similar_tracks' endpoint is called.
                # in that case, just skip the track.
                self.logger.debug("Similar tracks not found for track %s", base_track.name)
                continue
            for track in _similar_tracks:
                if (
                    track not in base_tracks
                    # Exclude tracks we have already played / queued
                    and track not in queue_track_items
                    # Ignore tracks that are too long for radio mode, e.g. mixes
                    and track.duration <= RADIO_TRACK_MAX_DURATION_SECS
                ):
                    dynamic_tracks.add(track)
            # stop asking for more similar tracks once we collected 50 or more
            if len(dynamic_tracks) >= 50:
                break
    queue_tracks: list[Track] = []
    dynamic_tracks_list = list(dynamic_tracks)
    # Only include the sampled base tracks when the radio mode is first initialized
    if is_initial_radio_mode:
        queue_tracks += [base_tracks[0]]
        # Exhaust base tracks with the pattern of BDDBDDBDD (1 base track + 2 dynamic tracks)
        if len(base_tracks) > 1:
            for base_track in base_tracks[1:]:
                queue_tracks += [base_track]
                if len(dynamic_tracks_list) > 2:
                    queue_tracks += random.sample(dynamic_tracks_list, 2)
                else:
                    # 2 or less dynamic tracks available: add all of them
                    queue_tracks += dynamic_tracks_list
    # Add dynamic tracks to the queue, make sure to exclude already picked tracks
    remaining_dynamic_tracks = [t for t in dynamic_tracks_list if t not in queue_tracks]
    if remaining_dynamic_tracks:
        # add at most 25 of the remaining dynamic tracks, in random order
        queue_tracks += random.sample(
            remaining_dynamic_tracks, min(len(remaining_dynamic_tracks), 25)
        )
    return queue_tracks
2102
async def _get_folder_tracks(self, folder: BrowseFolder) -> list[Track]:
    """
    Collect the (playable) tracks within the given browse folder.

    :param folder: The folder to fetch the tracks for.
    :return: All Track items found by resolving the playable items of the folder.
    """
    self.logger.info("Fetching tracks to play for folder %s", folder.name)
    collected: list[Track] = []
    folder_items = await self.mass.music.browse(folder.path)
    for folder_item in folder_items:
        if folder_item.is_playable:
            # every playable item is resolved (recursively for containers),
            # only Track items end up in the result
            unwrapped = await self._resolve_media_items(folder_item)
            collected.extend(entry for entry in unwrapped if isinstance(entry, Track))
    return collected
2118
def _update_queue_from_player(
    self,
    player: Player,
) -> None:
    """
    Update the Queue when the player state changed.

    Syncs the queue's basic properties, current/next item and elapsed time with
    the player, compares the result with the previously stored state and, only if
    something changed, signals updates and runs the follow-up handlers (DSP details,
    stream metadata, playback progress report, end of queue, radio refill).

    :param player: The player whose state changed; its player_id is used as queue id.
    """
    queue_id = player.player_id
    queue = self._queues[queue_id]

    # basic properties
    queue.display_name = player.state.name
    queue.available = player.state.available
    queue.items = len(self._queue_items[queue_id])

    # an inactive queue is always IDLE, an active queue follows the player
    # (falling back to IDLE when the player reports no playback state)
    queue.state = (
        player.state.playback_state or PlaybackState.IDLE
        if queue.active
        else PlaybackState.IDLE
    )
    # update current item/index from player report
    if queue.active and queue.state in (
        PlaybackState.PLAYING,
        PlaybackState.PAUSED,
    ):
        # NOTE: If the queue is not playing (yet) we will not update the current index
        # to ensure we keep the previously known current index
        if queue.flow_mode:
            # flow mode active, the player is playing one long stream
            # so we need to calculate the current index and elapsed time
            current_index, elapsed_time = self._get_flow_queue_stream_index(queue, player)
        elif item_id := self._parse_player_current_item_id(queue_id, player):
            # normal mode, the player itself will report the current item
            elapsed_time = int(player.state.corrected_elapsed_time or 0)
            current_index = self.index_by_id(queue_id, item_id)
        else:
            # this may happen if the player is still transitioning between tracks
            # we ignore this for now and keep the current index as is
            return

        # get current/next item based on current index
        queue.current_index = current_index
        queue.current_item = current_item = self.get_item(queue_id, current_index)
        queue.next_item = (
            self.get_next_item(queue_id, current_index)
            if current_item and current_index is not None
            else None
        )

        # correct elapsed time when seeking
        if (
            not queue.flow_mode
            and current_item
            and current_item.streamdetails
            and current_item.streamdetails.seek_position
        ):
            elapsed_time += current_item.streamdetails.seek_position
        queue.elapsed_time = elapsed_time
        queue.elapsed_time_last_updated = time.time()

    elif not queue.current_item and queue.current_index is not None:
        # not playing/paused: only fill in a missing current/next item
        # from the already known current index
        current_index = queue.current_index
        queue.current_item = current_item = self.get_item(queue_id, current_index)
        queue.next_item = (
            self.get_next_item(queue_id, current_index)
            if current_item and current_index is not None
            else None
        )

    # This is enough to detect any changes in the DSPDetails
    # (so child count changed, or any output format changed)
    output_formats = []
    if output_format := player.extra_data.get("output_format"):
        output_formats.append(str(output_format))
    for child_id in player.state.group_members:
        if (child := self.mass.players.get_player(child_id)) and (
            output_format := child.extra_data.get("output_format")
        ):
            output_formats.append(str(output_format))
        else:
            # unknown child or child without output format
            output_formats.append("unknown")

    # basic throttle: do not send state changed events if queue did not actually change
    # (an idle/empty state is used for comparison if no previous state was stored)
    prev_state: CompareState = self._prev_states.get(
        queue_id,
        CompareState(
            queue_id=queue_id,
            state=PlaybackState.IDLE,
            current_item_id=None,
            next_item_id=None,
            current_item=None,
            elapsed_time=0,
            last_playing_elapsed_time=0,
            stream_title=None,
            codec_type=None,
            output_formats=None,
        ),
    )
    # update last_playing_elapsed_time only when the player is actively playing
    # use corrected_elapsed_time which accounts for time since last update
    # this preserves the last known elapsed time when transitioning to idle/paused
    prev_playing_elapsed = prev_state["last_playing_elapsed_time"]
    prev_item_id = prev_state["current_item_id"]
    current_item_id = queue.current_item.queue_item_id if queue.current_item else None
    if queue.state == PlaybackState.PLAYING:
        current_elapsed = int(queue.corrected_elapsed_time)
        if current_item_id != prev_item_id:
            # new track started, reset the elapsed time tracker
            last_playing_elapsed_time = current_elapsed
        else:
            # same track, use the max of current and previous to handle timing issues
            last_playing_elapsed_time = max(current_elapsed, prev_playing_elapsed)
    else:
        last_playing_elapsed_time = prev_playing_elapsed
    new_state = CompareState(
        queue_id=queue_id,
        state=queue.state,
        current_item_id=queue.current_item.queue_item_id if queue.current_item else None,
        next_item_id=queue.next_item.queue_item_id if queue.next_item else None,
        current_item=queue.current_item,
        elapsed_time=int(queue.elapsed_time),
        last_playing_elapsed_time=last_playing_elapsed_time,
        stream_title=(
            queue.current_item.streamdetails.stream_title
            if queue.current_item and queue.current_item.streamdetails
            else None
        ),
        codec_type=(
            queue.current_item.streamdetails.audio_format.codec_type
            if queue.current_item and queue.current_item.streamdetails
            else None
        ),
        output_formats=output_formats,
    )
    changed_keys = get_changed_keys(dict(prev_state), dict(new_state))
    # a change of only next_item_id and/or last_playing_elapsed_time
    # is not treated as a change of the queue
    with suppress(KeyError):
        changed_keys.remove("next_item_id")
    with suppress(KeyError):
        changed_keys.remove("last_playing_elapsed_time")

    # store the new state
    if queue.active:
        self._prev_states[queue_id] = new_state
    else:
        self._prev_states.pop(queue_id, None)

    # return early if nothing changed
    if len(changed_keys) == 0:
        return

    # signal update and store state
    send_update = True
    if changed_keys == {"elapsed_time"}:
        # only elapsed time changed, do not send full queue update
        send_update = False
        prev_time = prev_state.get("elapsed_time") or 0
        cur_time = new_state.get("elapsed_time") or 0
        if abs(cur_time - prev_time) > 2:
            # send dedicated event for time updates when seeking
            self.mass.signal_event(
                EventType.QUEUE_TIME_UPDATED,
                object_id=queue_id,
                data=queue.elapsed_time,
            )
            # also signal update to the player itself so it can update its current_media
            self.mass.players.trigger_player_update(queue_id)

    if send_update:
        self.signal_update(queue_id)

    if "output_formats" in changed_keys:
        # refresh DSP details since they may have changed
        dsp = get_stream_dsp_details(self.mass, queue_id)
        if queue.current_item and queue.current_item.streamdetails:
            queue.current_item.streamdetails.dsp = dsp
        if queue.next_item and queue.next_item.streamdetails:
            queue.next_item.streamdetails.dsp = dsp

    # handle updating stream_metadata if needed
    # (a callback is registered and it was never called or the update interval elapsed)
    if (
        queue.current_item
        and (streamdetails := queue.current_item.streamdetails)
        and streamdetails.stream_metadata_update_callback
        and (
            streamdetails.stream_metadata_last_updated is None
            or (
                time.time() - streamdetails.stream_metadata_last_updated
                >= streamdetails.stream_metadata_update_interval
            )
        )
    ):
        streamdetails.stream_metadata_last_updated = time.time()
        self.mass.create_task(
            streamdetails.stream_metadata_update_callback(
                streamdetails, int(queue.corrected_elapsed_time)
            )
        )

    # handle sending a playback progress report
    # we do this when the state or current item changes, or when the elapsed time
    # is a multiple of PLAYBACK_REPORT_INTERVAL_SECONDS
    if (
        changed_keys.intersection({"state", "current_item_id"})
        or int(queue.elapsed_time) % PLAYBACK_REPORT_INTERVAL_SECONDS == 0
    ):
        self._handle_playback_progress_report(queue, prev_state, new_state)

    # check if we need to clear the queue if we reached the end
    if "state" in changed_keys and queue.state == PlaybackState.IDLE:
        self._handle_end_of_queue(queue, prev_state, new_state)

    # watch dynamic radio items refill if needed
    if "current_item_id" in changed_keys:
        # auto enable radio mode if dont stop the music is enabled
        if (
            queue.dont_stop_the_music_enabled
            and queue.enqueued_media_items
            and queue.current_index is not None
            and (queue.items - queue.current_index) <= 1
        ):
            # We have received the last item in the queue and Don't stop the music is enabled
            # set the played media item(s) as radio items (which will refill the queue)
            # note that this will fail if there are no media items for which we have
            # a dynamic radio source.
            self.logger.debug(
                "End of queue detected and Don't stop the music is enabled for %s"
                " - setting enqueued media items as radio source: %s",
                queue.display_name,
                ", ".join([x.uri for x in queue.enqueued_media_items]),  # type: ignore[misc] # uri set in __post_init__
            )
            queue.radio_source = queue.enqueued_media_items
        # auto fill radio tracks if less than 5 tracks left in the queue
        if (
            queue.radio_source
            and queue.current_index is not None
            and (queue.items - queue.current_index) < 5
        ):
            # the refill is scheduled with a delay of 5 under a per-queue task id
            task_id = f"fill_radio_tracks_{queue_id}"
            self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id)
2355
def _get_flow_queue_stream_index(
    self, queue: PlayerQueue, player: Player
) -> tuple[int | None, int]:
    """
    Work out the current queue index and track position in flow mode.

    :param queue: The queue that is streamed as one continuous (flow) stream.
    :param player: The player that reports the total elapsed time of that stream.
    :return: Tuple of (queue index, elapsed seconds within the current track).
    """
    total_elapsed = player.state.corrected_elapsed_time or 0
    if queue.current_index is None and not queue.flow_mode_stream_log:
        return queue.current_index, int(queue.elapsed_time)

    # Every track that has been streamed/buffered to the player has an entry in
    # the stream log holding the queue item id and the amount of seconds streamed.
    # We walk that log, consuming the total elapsed time entry by entry, to find
    # the track we're in. Actual streamed seconds are used (not the duration) and
    # a repeated track simply shows up in the log more than once.
    consumed = 0.0
    resolved_index: int | None = queue.current_index or 0
    position_in_track = 0.0
    for log_entry in queue.flow_mode_stream_log:
        # NOTE: 'seconds_streamed' can actually be 0 if there was a stream error!
        if log_entry.seconds_streamed is not None:
            entry_length = log_entry.seconds_streamed
        else:
            entry_length = log_entry.duration or 3600 * 24 * 7
        if total_elapsed > (entry_length + consumed):
            # this entry has been played completely, move on to the next one
            consumed += entry_length
            continue
        # the elapsed time ends within this entry, so this is our track
        resolved_index = self.index_by_id(queue.queue_id, log_entry.queue_item_id)
        log_item = self.get_item(queue.queue_id, resolved_index)
        # account for any seeking by adding the skipped/seeked seconds
        if log_item and log_item.streamdetails:
            seek_offset = log_item.streamdetails.seek_position
        else:
            seek_offset = 0
        position_in_track = total_elapsed + seek_offset - consumed
        break
    if player.state.playback_state != PlaybackState.PLAYING:
        # when the player is not playing we can't be sure that the elapsed time
        # is correct, so the values already known by the queue are returned
        return queue.current_index, int(queue.elapsed_time)
    return resolved_index, int(position_in_track)
2400
2401 def _parse_player_current_item_id(self, queue_id: str, player: Player) -> str | None:
2402 """Parse QueueItem ID from Player's current url."""
2403 protocol_player = player
2404 if player.active_output_protocol and player.active_output_protocol != "native":
2405 protocol_player = self.mass.players.get_player(player.active_output_protocol) or player
2406 if not protocol_player.current_media:
2407 # YES, we use player.current_media on purpose here because we need the raw metadata
2408 return None
2409 # prefer queue_id and queue_item_id within the current media
2410 if (
2411 protocol_player.current_media.source_id == queue_id
2412 and protocol_player.current_media.queue_item_id
2413 ):
2414 return protocol_player.current_media.queue_item_id
2415 # special case for sonos players
2416 if protocol_player.current_media.uri and protocol_player.current_media.uri.startswith(
2417 f"mass:{queue_id}"
2418 ):
2419 if protocol_player.current_media.queue_item_id:
2420 return protocol_player.current_media.queue_item_id
2421 return protocol_player.current_media.uri.split(":")[-1]
2422 # try to extract the item id from a mass stream url
2423 if (
2424 protocol_player.current_media.uri
2425 and queue_id in protocol_player.current_media.uri
2426 and self.mass.streams.base_url in protocol_player.current_media.uri
2427 ):
2428 current_item_id = protocol_player.current_media.uri.rsplit("/")[-1].split(".")[0]
2429 if self.get_item(queue_id, current_item_id):
2430 return current_item_id
2431 # try to extract the item id from a queue_id/item_id combi
2432 if (
2433 protocol_player.current_media.uri
2434 and queue_id in protocol_player.current_media.uri
2435 and "/" in protocol_player.current_media.uri
2436 ):
2437 current_item_id = protocol_player.current_media.uri.split("/")[1]
2438 if self.get_item(queue_id, current_item_id):
2439 return current_item_id
2440
2441 return None
2442
def _handle_end_of_queue(
    self, queue: PlayerQueue, prev_state: CompareState, new_state: CompareState
) -> None:
    """
    Clear the queue (delayed) when playback stopped at the end of the queue.

    :param queue: The queue to check.
    :param prev_state: The compare state before the state change.
    :param new_state: The compare state after the state change.
    """
    # only a transition from playing/paused to idle counts as "stopped"
    was_playing = prev_state["state"] in (PlaybackState.PLAYING, PlaybackState.PAUSED)
    if not was_playing or new_state["state"] != PlaybackState.IDLE:
        return
    # at the end of the queue there is no next item
    if queue.next_item is not None:
        return
    # there must have been an item playing before
    if prev_state["current_item_id"] is None:
        return

    async def _clear_queue_delayed() -> None:
        # re-check once per second, 5 times in a row,
        # and bail out as soon as the queue is in use again
        checks_left = 5
        while checks_left > 0:
            checks_left -= 1
            await asyncio.sleep(1)
            if queue.state != PlaybackState.IDLE or queue.next_item is not None:
                return
        self.logger.info("End of queue reached, clearing items")
        self.clear(queue.queue_id)

    # all checks passed, we stopped playback at the last (or single) track of the queue
    # now determine if the item was fully played before clearing

    # For flow mode, the stream log tells us if the last track was fully streamed,
    # which is more reliable than elapsed_time (that can be reset/incorrect)
    stream_log = queue.flow_mode_stream_log
    if queue.flow_mode and stream_log and stream_log[-1].seconds_streamed is not None:
        # The last track finished streaming, safe to clear queue
        self.mass.create_task(_clear_queue_delayed())
        return

    # Otherwise rely on the prev_state values, since the queue state itself
    # may have been updated/reset in the meantime
    last_item = prev_state["current_item"]
    if not last_item:
        # No current item means player has already cleared it, safe to clear queue
        self.mass.create_task(_clear_queue_delayed())
        return
    last_streamdetails = last_item.streamdetails
    if last_streamdetails:
        duration = last_streamdetails.duration or last_item.duration or 24 * 3600
    else:
        duration = last_item.duration or 24 * 3600

    # last_playing_elapsed_time preserves the elapsed time from when the player
    # was still playing (before transitioning to idle where elapsed_time may be reset to 0)
    seconds_played = int(prev_state["last_playing_elapsed_time"])
    # debounce this a bit to make sure we're not clearing the queue by accident:
    # only clear if the last track was played to near completion (within 5 seconds of end)
    completion_threshold = (duration or 3600) - 5
    if seconds_played >= completion_threshold:
        self.mass.create_task(_clear_queue_delayed())
2500
2501 def _handle_playback_progress_report(
2502 self, queue: PlayerQueue, prev_state: CompareState, new_state: CompareState
2503 ) -> None:
2504 """Handle playback progress report."""
2505 # detect change in current index to report that a item has been played
2506 prev_item_id = prev_state["current_item_id"]
2507 cur_item_id = new_state["current_item_id"]
2508 if prev_item_id is None and cur_item_id is None:
2509 return
2510
2511 if prev_item_id is not None and prev_item_id != cur_item_id:
2512 # we have a new item, so we need report the previous one
2513 is_current_item = False
2514 item_to_report = prev_state["current_item"]
2515 seconds_played = int(prev_state["elapsed_time"])
2516 else:
2517 # report on current item
2518 is_current_item = True
2519 item_to_report = self.get_item(queue.queue_id, cur_item_id) or new_state["current_item"]
2520 seconds_played = int(new_state["elapsed_time"])
2521
2522 if not item_to_report:
2523 return # guard against invalid items
2524
2525 if not (media_item := item_to_report.media_item):
2526 # only report on media items
2527 return
2528 assert media_item.uri is not None # uri is set in __post_init__
2529
2530 if item_to_report.streamdetails and item_to_report.streamdetails.stream_error:
2531 # Ignore items that had a stream error
2532 return
2533
2534 if item_to_report.streamdetails and item_to_report.streamdetails.duration:
2535 duration = int(item_to_report.streamdetails.duration)
2536 else:
2537 duration = int(item_to_report.duration or 3 * 3600)
2538
2539 if seconds_played < 5:
2540 # ignore items that have been played less than 5 seconds
2541 # this also filters out a bounce effect where the previous item
2542 # gets reported with 0 elapsed seconds after a new item starts playing
2543 return
2544
2545 # determine if item is fully played
2546 # for podcasts and audiobooks we account for the last 60 seconds
2547 percentage_played = percentage(seconds_played, duration)
2548 if not is_current_item and item_to_report.media_type in (
2549 MediaType.AUDIOBOOK,
2550 MediaType.PODCAST_EPISODE,
2551 ):
2552 fully_played = seconds_played >= duration - 60
2553 elif not is_current_item:
2554 # 90% of the track must be played to be considered fully played
2555 fully_played = percentage_played >= 90
2556 else:
2557 fully_played = seconds_played >= duration - 10
2558
2559 is_playing = is_current_item and queue.state == PlaybackState.PLAYING
2560 if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
2561 self.logger.debug(
2562 "%s %s '%s' (%s) - Fully played: %s - Progress: %s (%s/%ss)",
2563 queue.display_name,
2564 "is playing" if is_playing else "played",
2565 item_to_report.name,
2566 item_to_report.uri,
2567 fully_played,
2568 f"{percentage_played}%",
2569 seconds_played,
2570 duration,
2571 )
2572 # add entry to playlog - this also handles resume of podcasts/audiobooks
2573 self.mass.create_task(
2574 self.mass.music.mark_item_played(
2575 media_item,
2576 fully_played=fully_played,
2577 seconds_played=seconds_played,
2578 is_playing=is_playing,
2579 userid=queue.userid,
2580 queue_id=queue.queue_id,
2581 user_initiated=False,
2582 )
2583 )
2584
2585 album: Album | ItemMapping | None = getattr(media_item, "album", None)
2586 # signal 'media item played' event,
2587 # which is useful for plugins that want to do scrobbling
2588 artists: list[Artist | ItemMapping] = getattr(media_item, "artists", [])
2589 artists_names = [a.name for a in artists]
2590 self.mass.signal_event(
2591 EventType.MEDIA_ITEM_PLAYED,
2592 object_id=media_item.uri,
2593 data=MediaItemPlaybackProgressReport(
2594 uri=media_item.uri,
2595 media_type=media_item.media_type,
2596 name=media_item.name,
2597 version=getattr(media_item, "version", None),
2598 artist=(
2599 getattr(media_item, "artist_str", None) or artists_names[0]
2600 if artists_names
2601 else None
2602 ),
2603 artists=artists_names,
2604 artist_mbids=[a.mbid for a in artists if a.mbid] if artists else None,
2605 album=album.name if album else None,
2606 album_mbid=album.mbid if album else None,
2607 album_artist=(album.artist_str if isinstance(album, Album) else None),
2608 album_artist_mbids=(
2609 [a.mbid for a in album.artists if a.mbid] if isinstance(album, Album) else None
2610 ),
2611 image_url=(
2612 self.mass.metadata.get_image_url(
2613 item_to_report.media_item.image, prefer_proxy=False
2614 )
2615 if item_to_report.media_item.image
2616 else None
2617 ),
2618 duration=duration,
2619 mbid=(getattr(media_item, "mbid", None)),
2620 seconds_played=seconds_played,
2621 fully_played=fully_played,
2622 is_playing=is_playing,
2623 userid=queue.userid,
2624 ),
2625 )
2626
2627
async def _smart_shuffle(items: list[QueueItem]) -> list[QueueItem]:
    """Shuffle queue items, avoiding identically named tracks next to each other.

    Best-effort approach: the items are shuffled randomly first, then a limited
    number of passes is made in which one item of each adjacent pair with the same
    name is swapped with a random, differently named item elsewhere in the list.
    Items are compared by their ``name`` only. The input list is never modified.

    :param items: List of queue items to shuffle.
    :return: A new list holding the same items in shuffled order.
    """
    # Always start from a fresh, randomly ordered copy so the caller's list is
    # never returned (or mutated), regardless of its size.
    shuffled = random.sample(items, len(items))
    if len(shuffled) < 3:
        # with fewer than 3 items there is no way to separate a duplicate pair
        return shuffled

    # Make a few passes to fix adjacent duplicates
    max_passes = 3
    for _ in range(max_passes):
        swapped = False
        for i in range(len(shuffled) - 1):
            dup_name = shuffled[i].name
            if dup_name != shuffled[i + 1].name:
                continue
            # Found an adjacent duplicate: move one of the twins away.
            # Prefer moving the second twin; if it has no valid target
            # (e.g. the pair sits at the end of a short list), move the first.
            for keep, move in ((i, i + 1), (i + 1, i)):
                # A valid target is at least 2 positions away from the twin that
                # stays put (so the moved twin does not land next to it again)
                # and holds a differently named item (so the swap really
                # separates the pair).
                targets = [
                    j
                    for j, other in enumerate(shuffled)
                    if abs(j - keep) >= 2 and other.name != dup_name
                ]
                if targets:
                    swap_pos = random.choice(targets)
                    shuffled[move], shuffled[swap_pos] = shuffled[swap_pos], shuffled[move]
                    swapped = True
                    break
        if not swapped:
            break
        # Yield to event loop between passes
        await asyncio.sleep(0)

    return shuffled
2661