/
/
/
1"""
2MusicAssistant Player Queues Controller.
3
4Handles all logic to PLAY Media Items, provided by Music Providers to supported players.
5
6It is loosely coupled to the MusicAssistant Music Controller and Player Controller.
7A Music Assistant Player always has a PlayerQueue associated with it
8which holds the queue items and state.
9
10The PlayerQueue is in that case the active source of the player,
11but it can also be something else, hence the loose coupling.
12"""
13
14from __future__ import annotations
15
16import asyncio
17import random
18import time
19from contextlib import suppress
20from types import NoneType
21from typing import TYPE_CHECKING, Any, TypedDict, cast
22
23import shortuuid
24from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
25from music_assistant_models.enums import (
26 ConfigEntryType,
27 ContentType,
28 EventType,
29 MediaType,
30 PlaybackState,
31 ProviderFeature,
32 QueueOption,
33 RepeatMode,
34)
35from music_assistant_models.errors import (
36 AudioError,
37 InvalidCommand,
38 InvalidDataError,
39 MediaNotFoundError,
40 MusicAssistantError,
41 PlayerUnavailableError,
42 QueueEmpty,
43 UnsupportedFeaturedException,
44)
45from music_assistant_models.media_items import (
46 Album,
47 Artist,
48 Audiobook,
49 BrowseFolder,
50 Genre,
51 ItemMapping,
52 MediaItemType,
53 PlayableMediaItemType,
54 Playlist,
55 Podcast,
56 PodcastEpisode,
57 Track,
58 UniqueList,
59 media_from_dict,
60)
61from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport
62from music_assistant_models.player_queue import PlayerQueue
63from music_assistant_models.queue_item import QueueItem
64
65from music_assistant.constants import (
66 ATTR_ANNOUNCEMENT_IN_PROGRESS,
67 MASS_LOGO_ONLINE,
68 PLAYBACK_REPORT_INTERVAL_SECONDS,
69 PLAYLIST_MEDIA_TYPES,
70 VERBOSE_LOG_LEVEL,
71 PlaylistPlayableItem,
72)
73from music_assistant.controllers.players.controller import IN_QUEUE_COMMAND
74from music_assistant.controllers.webserver.helpers.auth_middleware import get_current_user
75from music_assistant.helpers.api import api_command
76from music_assistant.helpers.audio import get_stream_details, get_stream_dsp_details
77from music_assistant.helpers.throttle_retry import BYPASS_THROTTLER
78from music_assistant.helpers.util import get_changed_keys, percentage
79from music_assistant.models.core_controller import CoreController
80from music_assistant.models.player import Player, PlayerMedia
81
82if TYPE_CHECKING:
83 from collections.abc import Iterator
84
85 from music_assistant_models.auth import User
86 from music_assistant_models.media_items.metadata import MediaItemImage
87
88 from music_assistant import MusicAssistant
89 from music_assistant.models.player import Player
90
91CONF_DEFAULT_ENQUEUE_SELECT_ARTIST = "default_enqueue_select_artist"
92CONF_DEFAULT_ENQUEUE_SELECT_ALBUM = "default_enqueue_select_album"
93
94ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE = "all_tracks"
95ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE = "all_tracks"
96
97CONF_DEFAULT_ENQUEUE_OPTION_ARTIST = "default_enqueue_option_artist"
98CONF_DEFAULT_ENQUEUE_OPTION_ALBUM = "default_enqueue_option_album"
99CONF_DEFAULT_ENQUEUE_OPTION_TRACK = "default_enqueue_option_track"
100CONF_DEFAULT_ENQUEUE_OPTION_GENRE = "default_enqueue_option_genre"
101CONF_DEFAULT_ENQUEUE_OPTION_RADIO = "default_enqueue_option_radio"
102CONF_DEFAULT_ENQUEUE_OPTION_PLAYLIST = "default_enqueue_option_playlist"
103CONF_DEFAULT_ENQUEUE_OPTION_AUDIOBOOK = "default_enqueue_option_audiobook"
104CONF_DEFAULT_ENQUEUE_OPTION_PODCAST = "default_enqueue_option_podcast"
105CONF_DEFAULT_ENQUEUE_OPTION_PODCAST_EPISODE = "default_enqueue_option_podcast_episode"
106CONF_DEFAULT_ENQUEUE_OPTION_FOLDER = "default_enqueue_option_folder"
107CONF_DEFAULT_ENQUEUE_OPTION_UNKNOWN = "default_enqueue_option_unknown"
108RADIO_TRACK_MAX_DURATION_SECS = 20 * 60 # 20 minutes
109CACHE_CATEGORY_PLAYER_QUEUE_STATE = 0
110CACHE_CATEGORY_PLAYER_QUEUE_ITEMS = 1
111
112
113class CompareState(TypedDict):
114 """Simple object where we store the (previous) state of a queue.
115
116 Used for compare actions.
117 """
118
119 queue_id: str
120 state: PlaybackState
121 current_item_id: str | None
122 next_item_id: str | None
123 current_item: QueueItem | None
124 elapsed_time: int
125 # last_playing_elapsed_time: elapsed time from the last PLAYING state update
126 # used to determine if a track was fully played when transitioning to idle
127 last_playing_elapsed_time: int
128 stream_title: str | None
129 codec_type: ContentType | None
130 output_formats: list[str] | None
131
132
133class PlayerQueuesController(CoreController):
134 """Controller holding all logic to enqueue music for players."""
135
136 domain: str = "player_queues"
137
138 def __init__(self, mass: MusicAssistant) -> None:
139 """Initialize core controller."""
140 super().__init__(mass)
141 self._queues: dict[str, PlayerQueue] = {}
142 self._queue_items: dict[str, list[QueueItem]] = {}
143 self._prev_states: dict[str, CompareState] = {}
144 self._transitioning_players: set[str] = set()
145 self.manifest.name = "Player Queues controller"
146 self.manifest.description = (
147 "Music Assistant's core controller which manages the queues for all players."
148 )
149 self.manifest.icon = "playlist-music"
150
151 async def close(self) -> None:
152 """Cleanup on exit."""
153 # stop all playback
154 for queue in self.all():
155 if queue.state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
156 await self.stop(queue.queue_id)
157
158 async def get_config_entries(
159 self,
160 action: str | None = None,
161 values: dict[str, ConfigValueType] | None = None,
162 ) -> tuple[ConfigEntry, ...]:
163 """Return all Config Entries for this core module (if any)."""
164 enqueue_options = [ConfigValueOption(x.name, x.value) for x in QueueOption]
165 return (
166 ConfigEntry(
167 key=CONF_DEFAULT_ENQUEUE_SELECT_ARTIST,
168 type=ConfigEntryType.STRING,
169 default_value=ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE,
170 label="Items to select when you play a (in-library) artist.",
171 options=[
172 ConfigValueOption(
173 title="Only in-library tracks",
174 value="library_tracks",
175 ),
176 ConfigValueOption(
177 title="All tracks from all albums in the library",
178 value="library_album_tracks",
179 ),
180 ConfigValueOption(
181 title="All (top) tracks from (all) streaming provider(s)",
182 value="all_tracks",
183 ),
184 ConfigValueOption(
185 title="All tracks from all albums from (all) streaming provider(s)",
186 value="all_album_tracks",
187 ),
188 ],
189 ),
190 ConfigEntry(
191 key=CONF_DEFAULT_ENQUEUE_SELECT_ALBUM,
192 type=ConfigEntryType.STRING,
193 default_value=ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE,
194 label="Items to select when you play a (in-library) album.",
195 options=[
196 ConfigValueOption(
197 title="Only in-library tracks",
198 value="library_tracks",
199 ),
200 ConfigValueOption(
201 title="All tracks for album on (streaming) provider",
202 value="all_tracks",
203 ),
204 ],
205 ),
206 ConfigEntry(
207 key=CONF_DEFAULT_ENQUEUE_OPTION_ARTIST,
208 type=ConfigEntryType.STRING,
209 default_value=QueueOption.REPLACE.value,
210 label="Default enqueue option for Artist item(s).",
211 options=enqueue_options,
212 description="Define the default enqueue action for this mediatype.",
213 ),
214 ConfigEntry(
215 key=CONF_DEFAULT_ENQUEUE_OPTION_ALBUM,
216 type=ConfigEntryType.STRING,
217 default_value=QueueOption.REPLACE.value,
218 label="Default enqueue option for Album item(s).",
219 options=enqueue_options,
220 description="Define the default enqueue action for this mediatype.",
221 ),
222 ConfigEntry(
223 key=CONF_DEFAULT_ENQUEUE_OPTION_TRACK,
224 type=ConfigEntryType.STRING,
225 default_value=QueueOption.PLAY.value,
226 label="Default enqueue option for Track item(s).",
227 options=enqueue_options,
228 description="Define the default enqueue action for this mediatype.",
229 ),
230 ConfigEntry(
231 key=CONF_DEFAULT_ENQUEUE_OPTION_GENRE,
232 type=ConfigEntryType.STRING,
233 default_value=QueueOption.REPLACE.value,
234 label="Default enqueue option for Genre item(s).",
235 options=enqueue_options,
236 description="Define the default enqueue action for this mediatype.",
237 ),
238 ConfigEntry(
239 key=CONF_DEFAULT_ENQUEUE_OPTION_RADIO,
240 type=ConfigEntryType.STRING,
241 default_value=QueueOption.REPLACE.value,
242 label="Default enqueue option for Radio item(s).",
243 options=enqueue_options,
244 description="Define the default enqueue action for this mediatype.",
245 ),
246 ConfigEntry(
247 key=CONF_DEFAULT_ENQUEUE_OPTION_PLAYLIST,
248 type=ConfigEntryType.STRING,
249 default_value=QueueOption.REPLACE.value,
250 label="Default enqueue option for Playlist item(s).",
251 options=enqueue_options,
252 description="Define the default enqueue action for this mediatype.",
253 ),
254 ConfigEntry(
255 key=CONF_DEFAULT_ENQUEUE_OPTION_AUDIOBOOK,
256 type=ConfigEntryType.STRING,
257 default_value=QueueOption.REPLACE.value,
258 label="Default enqueue option for Audiobook item(s).",
259 options=enqueue_options,
260 hidden=True,
261 ),
262 ConfigEntry(
263 key=CONF_DEFAULT_ENQUEUE_OPTION_PODCAST,
264 type=ConfigEntryType.STRING,
265 default_value=QueueOption.REPLACE.value,
266 label="Default enqueue option for Podcast item(s).",
267 options=enqueue_options,
268 hidden=True,
269 ),
270 ConfigEntry(
271 key=CONF_DEFAULT_ENQUEUE_OPTION_PODCAST_EPISODE,
272 type=ConfigEntryType.STRING,
273 default_value=QueueOption.REPLACE.value,
274 label="Default enqueue option for Podcast-episode item(s).",
275 options=enqueue_options,
276 hidden=True,
277 ),
278 ConfigEntry(
279 key=CONF_DEFAULT_ENQUEUE_OPTION_FOLDER,
280 type=ConfigEntryType.STRING,
281 default_value=QueueOption.REPLACE.value,
282 label="Default enqueue option for Folder item(s).",
283 options=enqueue_options,
284 hidden=True,
285 ),
286 )
287
288 def __iter__(self) -> Iterator[PlayerQueue]:
289 """Iterate over (available) players."""
290 return iter(self._queues.values())
291
292 @api_command("player_queues/all")
293 def all(self) -> tuple[PlayerQueue, ...]:
294 """Return all registered PlayerQueues."""
295 return tuple(self._queues.values())
296
297 @api_command("player_queues/get")
298 def get(self, queue_id: str) -> PlayerQueue | None:
299 """Return PlayerQueue by queue_id or None if not found."""
300 return self._queues.get(queue_id)
301
302 @api_command("player_queues/items")
303 def items(self, queue_id: str, limit: int = 500, offset: int = 0) -> list[QueueItem]:
304 """Return all QueueItems for given PlayerQueue."""
305 if queue_id not in self._queue_items:
306 return []
307
308 return self._queue_items[queue_id][offset : offset + limit]
309
310 @api_command("player_queues/get_active_queue")
311 def get_active_queue(self, player_id: str) -> PlayerQueue | None:
312 """Return the current active/synced queue for a player."""
313 if player := self.mass.players.get_player(player_id):
314 return self.mass.players.get_active_queue(player)
315 return None
316
317 # Queue commands
318
319 @api_command("player_queues/shuffle")
320 async def set_shuffle(self, queue_id: str, shuffle_enabled: bool) -> None:
321 """Configure shuffle setting on the the queue."""
322 queue = self._queues[queue_id]
323 if queue.shuffle_enabled == shuffle_enabled:
324 return # no change
325 queue.shuffle_enabled = shuffle_enabled
326 queue_items = self._queue_items[queue_id]
327 cur_index = (
328 queue.index_in_buffer if queue.index_in_buffer is not None else queue.current_index
329 )
330 if cur_index is not None:
331 next_index = cur_index + 1
332 next_items = queue_items[next_index:]
333 else:
334 next_items = []
335 next_index = 0
336 if not shuffle_enabled:
337 # shuffle disabled, try to restore original sort order of the remaining items
338 next_items.sort(key=lambda x: x.sort_index, reverse=False)
339 await self.load(
340 queue_id=queue_id,
341 queue_items=next_items,
342 insert_at_index=next_index,
343 keep_remaining=False,
344 shuffle=shuffle_enabled,
345 )
346
347 @api_command("player_queues/dont_stop_the_music")
348 def set_dont_stop_the_music(self, queue_id: str, dont_stop_the_music_enabled: bool) -> None:
349 """Configure Don't stop the music setting on the queue."""
350 providers_available_with_similar_tracks = any(
351 ProviderFeature.SIMILAR_TRACKS in provider.supported_features
352 for provider in self.mass.music.providers
353 )
354 if dont_stop_the_music_enabled and not providers_available_with_similar_tracks:
355 raise UnsupportedFeaturedException(
356 "Don't stop the music is not supported by any of the available music providers"
357 )
358 queue = self._queues[queue_id]
359 queue.dont_stop_the_music_enabled = dont_stop_the_music_enabled
360 self.signal_update(queue_id=queue_id)
361 # if this happens to be the last track in the queue, fill the radio source
362 if (
363 queue.dont_stop_the_music_enabled
364 and queue.enqueued_media_items
365 and queue.current_index is not None
366 and (queue.items - queue.current_index) <= 1
367 ):
368 queue.radio_source = queue.enqueued_media_items
369 task_id = f"fill_radio_tracks_{queue_id}"
370 self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id)
371
372 @api_command("player_queues/repeat")
373 def set_repeat(self, queue_id: str, repeat_mode: RepeatMode) -> None:
374 """Configure repeat setting on the the queue."""
375 queue = self._queues[queue_id]
376 if queue.repeat_mode == repeat_mode:
377 return # no change
378 queue.repeat_mode = repeat_mode
379 self.signal_update(queue_id)
380 if (
381 queue.state == PlaybackState.PLAYING
382 and queue.index_in_buffer is not None
383 and queue.index_in_buffer == queue.current_index
384 ):
385 # if the queue is playing,
386 # ensure to (re)queue the next track because it might have changed
387 # note that we only do this if the player has loaded the current track
388 # if not, we wait until it has loaded to prevent conflicts
389 if next_item := self.get_next_item(queue_id, queue.index_in_buffer):
390 self._enqueue_next_item(queue_id, next_item)
391
392 @api_command("player_queues/play_media")
393 async def play_media(
394 self,
395 queue_id: str,
396 media: MediaItemType | ItemMapping | str | list[MediaItemType | ItemMapping | str],
397 option: QueueOption | None = None,
398 radio_mode: bool = False,
399 start_item: PlayableMediaItemType | str | None = None,
400 username: str | None = None,
401 ) -> None:
402 """Play media item(s) on the given queue.
403
404 :param queue_id: The queue_id of the queue to play media on.
405 :param media: Media that should be played (MediaItem(s) and/or uri's).
406 :param option: Which enqueue mode to use.
407 :param radio_mode: Enable radio mode for the given item(s).
408 :param start_item: Optional item to start the playlist or album from.
409 :param username: The username of the user requesting the playback.
410 Setting the username allows for overriding the logged-in user
411 to account for playback history per user when the play_media is
412 called from a shared context (like a web hook or automation).
413 """
414 # ruff: noqa: PLR0915
415 # we use a contextvar to bypass the throttler for this asyncio task/context
416 # this makes sure that playback has priority over other requests that may be
417 # happening in the background
418 BYPASS_THROTTLER.set(True)
419 if not (queue := self.get(queue_id)):
420 raise PlayerUnavailableError(f"Queue {queue_id} is not available")
421 # always fetch the underlying player so we can raise early if its not available
422 queue_player = self.mass.players.get_player(queue_id, True)
423 assert queue_player is not None # for type checking
424 if queue_player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
425 self.logger.warning("Ignore queue command: An announcement is in progress")
426 return
427
428 # save the user requesting the playback
429 playback_user: User | None
430 if username and (user := await self.mass.webserver.auth.get_user_by_username(username)):
431 playback_user = user
432 else:
433 playback_user = get_current_user()
434 queue.userid = playback_user.user_id if playback_user else None
435
436 # a single item or list of items may be provided
437 media_list = media if isinstance(media, list) else [media]
438
439 # clear queue if needed
440 if option == QueueOption.REPLACE:
441 self.clear(queue_id)
442 # Clear the 'enqueued media item' list when a new queue is requested
443 if option not in (QueueOption.ADD, QueueOption.NEXT):
444 queue.enqueued_media_items.clear()
445
446 media_items: list[MediaItemType] = []
447 radio_source: list[MediaItemType] = []
448 # resolve all media items
449 for item in media_list:
450 try:
451 # parse provided uri into a MA MediaItem or Basic QueueItem from URL
452 media_item: MediaItemType | ItemMapping | BrowseFolder
453 if isinstance(item, str):
454 media_item = await self.mass.music.get_item_by_uri(item)
455 elif isinstance(item, dict): # type: ignore[unreachable]
456 # TODO: Investigate why the API parser sometimes passes raw dicts instead of
457 # converting them to MediaItem objects. The parse_value function in api.py
458 # should handle dict-to-object conversion, but dicts are slipping through
459 # in some cases. This is defensive handling for that parser bug.
460 media_item = media_from_dict(item) # type: ignore[unreachable]
461 self.logger.debug("Converted to: %s", type(media_item))
462 else:
463 # item is MediaItemType | ItemMapping at this point
464 media_item = item
465
466 # Save requested media item to play on the queue so we can use it as a source
467 # for Don't stop the music. Use FIFO list to keep track of the last 10 played items
468 # Skip ItemMapping and BrowseFolder - only queue full MediaItemType objects
469 if not isinstance(
470 media_item, (ItemMapping, BrowseFolder)
471 ) and media_item.media_type in (
472 MediaType.TRACK,
473 MediaType.ALBUM,
474 MediaType.PLAYLIST,
475 MediaType.ARTIST,
476 ):
477 queue.enqueued_media_items.append(media_item)
478 if len(queue.enqueued_media_items) > 10:
479 queue.enqueued_media_items.pop(0)
480
481 # handle default enqueue option if needed
482 if option is None:
483 config_value = await self.mass.config.get_core_config_value(
484 self.domain,
485 f"default_enqueue_option_{media_item.media_type.value}",
486 return_type=str,
487 )
488 option = QueueOption(config_value)
489 if option == QueueOption.REPLACE:
490 self.clear(queue_id, skip_stop=True)
491
492 # collect media_items to play
493 if radio_mode:
494 # Type guard for mypy - only add full MediaItemType to radio_source
495 if not isinstance(media_item, (ItemMapping, BrowseFolder)):
496 radio_source.append(media_item)
497 else:
498 # Convert start_item to string URI if needed
499 start_item_uri: str | None = None
500 if isinstance(start_item, str):
501 start_item_uri = start_item
502 elif start_item is not None:
503 start_item_uri = start_item.uri
504 media_items += await self._resolve_media_items(
505 media_item, start_item_uri, queue_id=queue_id
506 )
507
508 except MusicAssistantError as err:
509 # invalid MA uri or item not found error
510 self.logger.warning("Skipping %s: %s", item, str(err))
511
512 # overwrite or append radio source items
513 if option not in (QueueOption.ADD, QueueOption.NEXT):
514 queue.radio_source = radio_source
515 else:
516 queue.radio_source += radio_source
517 # Use collected media items to calculate the radio if radio mode is on
518 if radio_mode:
519 radio_tracks = await self._get_radio_tracks(
520 queue_id=queue_id, is_initial_radio_mode=True
521 )
522 media_items = list(radio_tracks)
523
524 # only add valid/available items
525 queue_items: list[QueueItem] = []
526 for x in media_items:
527 if not x or not x.available:
528 continue
529 queue_items.append(
530 QueueItem.from_media_item(queue_id, cast("PlayableMediaItemType", x))
531 )
532
533 if not queue_items:
534 raise MediaNotFoundError("No playable items found")
535
536 # load the items into the queue
537 if queue.state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
538 cur_index = (
539 queue.index_in_buffer
540 if queue.index_in_buffer is not None
541 else (queue.current_index if queue.current_index is not None else 0)
542 )
543 else:
544 cur_index = queue.current_index or 0
545 insert_at_index = cur_index + 1
546 # Radio modes are already shuffled in a pattern we would like to keep.
547 shuffle = queue.shuffle_enabled and len(queue_items) > 1 and not radio_mode
548
549 # handle replace: clear all items and replace with the new items
550 if option == QueueOption.REPLACE:
551 await self.load(
552 queue_id,
553 queue_items=queue_items,
554 keep_remaining=False,
555 keep_played=False,
556 shuffle=shuffle,
557 )
558 await self.play_index(queue_id, 0)
559 return
560 # handle next: add item(s) in the index next to the playing/loaded/buffered index
561 if option == QueueOption.NEXT:
562 await self.load(
563 queue_id,
564 queue_items=queue_items,
565 insert_at_index=insert_at_index,
566 shuffle=shuffle,
567 )
568 return
569 if option == QueueOption.REPLACE_NEXT:
570 await self.load(
571 queue_id,
572 queue_items=queue_items,
573 insert_at_index=insert_at_index,
574 keep_remaining=False,
575 shuffle=shuffle,
576 )
577 return
578 # handle play: replace current loaded/playing index with new item(s)
579 if option == QueueOption.PLAY:
580 await self.load(
581 queue_id,
582 queue_items=queue_items,
583 insert_at_index=insert_at_index,
584 shuffle=shuffle,
585 )
586 next_index = min(insert_at_index, len(self._queue_items[queue_id]) - 1)
587 await self.play_index(queue_id, next_index)
588 return
589 # handle add: add/append item(s) to the remaining queue items
590 if option == QueueOption.ADD:
591 await self.load(
592 queue_id=queue_id,
593 queue_items=queue_items,
594 insert_at_index=insert_at_index
595 if queue.shuffle_enabled
596 else len(self._queue_items[queue_id]) + 1,
597 shuffle=queue.shuffle_enabled,
598 )
599 # handle edgecase, queue is empty and items are only added (not played)
600 # mark first item as new index
601 if queue.current_index is None:
602 queue.current_index = 0
603 queue.current_item = self.get_item(queue_id, 0)
604 queue.items = len(queue_items)
605 self.signal_update(queue_id)
606
607 @api_command("player_queues/move_item")
608 def move_item(self, queue_id: str, queue_item_id: str, pos_shift: int = 1) -> None:
609 """
610 Move queue item x up/down the queue.
611
612 - queue_id: id of the queue to process this request.
613 - queue_item_id: the item_id of the queueitem that needs to be moved.
614 - pos_shift: move item x positions down if positive value
615 - pos_shift: move item x positions up if negative value
616 - pos_shift: move item to top of queue as next item if 0.
617 """
618 queue = self._queues[queue_id]
619 item_index = self.index_by_id(queue_id, queue_item_id)
620 if item_index is None:
621 raise InvalidDataError(f"Item {queue_item_id} not found in queue")
622 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
623 msg = f"{item_index} is already played/buffered"
624 raise IndexError(msg)
625
626 queue_items = self._queue_items[queue_id]
627 queue_items = queue_items.copy()
628
629 if pos_shift == 0 and queue.state == PlaybackState.PLAYING:
630 new_index = (queue.current_index or 0) + 1
631 elif pos_shift == 0:
632 new_index = queue.current_index or 0
633 else:
634 new_index = item_index + pos_shift
635 if (new_index < (queue.current_index or 0)) or (new_index > len(queue_items)):
636 return
637 # move the item in the list
638 queue_items.insert(new_index, queue_items.pop(item_index))
639 self.update_items(queue_id, queue_items)
640
641 @api_command("player_queues/move_item_end")
642 def move_item_end(self, queue_id: str, queue_item_id: str) -> None:
643 """
644 Move queue item to the end the queue.
645
646 - queue_id: id of the queue to process this request.
647 - queue_item_id: the item_id of the queueitem that needs to be moved.
648 """
649 queue = self._queues[queue_id]
650 item_index = self.index_by_id(queue_id, queue_item_id)
651 if item_index is None:
652 raise InvalidDataError(f"Item {queue_item_id} not found in queue")
653 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
654 msg = f"{item_index} is already played/buffered"
655 raise IndexError(msg)
656
657 queue_items = self._queue_items[queue_id]
658 if item_index == (len(queue_items) - 1):
659 return
660 queue_items = queue_items.copy()
661
662 new_index = len(self._queue_items[queue_id]) - 1
663
664 # move the item in the list
665 queue_items.insert(new_index, queue_items.pop(item_index))
666 self.update_items(queue_id, queue_items)
667
668 @api_command("player_queues/delete_item")
669 def delete_item(self, queue_id: str, item_id_or_index: int | str) -> None:
670 """Delete item (by id or index) from the queue."""
671 if isinstance(item_id_or_index, str):
672 item_index = self.index_by_id(queue_id, item_id_or_index)
673 if item_index is None:
674 raise InvalidDataError(f"Item {item_id_or_index} not found in queue")
675 else:
676 item_index = item_id_or_index
677 queue = self._queues[queue_id]
678 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
679 # ignore request if track already loaded in the buffer
680 # the frontend should guard so this is just in case
681 self.logger.warning("delete requested for item already loaded in buffer")
682 return
683 queue_items = self._queue_items[queue_id]
684 queue_items.pop(item_index)
685 self.update_items(queue_id, queue_items)
686
687 @api_command("player_queues/clear")
688 def clear(self, queue_id: str, skip_stop: bool = False) -> None:
689 """Clear all items in the queue."""
690 queue = self._queues[queue_id]
691 queue.radio_source = []
692 if queue.state != PlaybackState.IDLE and not skip_stop:
693 self.mass.create_task(self.stop(queue_id))
694 queue.current_index = None
695 queue.current_item = None
696 queue.elapsed_time = 0
697 queue.elapsed_time_last_updated = time.time()
698 queue.index_in_buffer = None
699 self.update_items(queue_id, [])
700
701 @api_command("player_queues/save_as_playlist")
702 async def save_as_playlist(self, queue_id: str, name: str) -> Playlist:
703 """Save the current queue items as a new playlist.
704
705 :param queue_id: The queue_id of the queue to save.
706 :param name: The name for the new playlist.
707 """
708 if not self.get(queue_id):
709 raise PlayerUnavailableError(f"Queue {queue_id} is not available")
710 queue_items = self._queue_items.get(queue_id, [])
711 if not queue_items:
712 raise QueueEmpty("Cannot save an empty queue as a playlist.")
713 # collect URIs from queue items that are playlist-compatible
714 uris: list[str] = []
715 for item in queue_items:
716 if item.uri and item.media_type in PLAYLIST_MEDIA_TYPES:
717 uris.append(item.uri)
718 if not uris:
719 raise InvalidDataError("No valid items in queue to save as playlist.")
720 playlist = await self.mass.music.playlists.create_playlist(name)
721 await self.mass.music.playlists.add_playlist_tracks(playlist.item_id, uris)
722 return playlist
723
724 @api_command("player_queues/stop")
725 async def stop(self, queue_id: str) -> None:
726 """
727 Handle STOP command for given queue.
728
729 - queue_id: queue_id of the playerqueue to handle the command.
730 """
731 queue_player = self.mass.players.get_player(queue_id, True)
732 if queue_player is None:
733 raise PlayerUnavailableError(f"Player {queue_id} is not available")
734 if (queue := self.get(queue_id)) and queue.active:
735 if queue.state == PlaybackState.PLAYING:
736 queue.resume_pos = int(queue.corrected_elapsed_time)
737 # Set context to prevent circular call, then forward the actual command to the player
738 token = IN_QUEUE_COMMAND.set(True)
739 try:
740 await self.mass.players.cmd_stop(queue_id)
741 finally:
742 IN_QUEUE_COMMAND.reset(token)
743
744 @api_command("player_queues/play")
745 async def play(self, queue_id: str) -> None:
746 """
747 Handle PLAY command for given queue.
748
749 - queue_id: queue_id of the playerqueue to handle the command.
750 """
751 queue_player = self.mass.players.get_player(queue_id, True)
752 if queue_player is None:
753 raise PlayerUnavailableError(f"Player {queue_id} is not available")
754 if (
755 (queue := self._queues.get(queue_id))
756 and queue.active
757 and queue.state == PlaybackState.PAUSED
758 ):
759 # forward the actual play/unpause command to the player
760 await queue_player.play()
761 return
762 # player is not paused, perform resume instead
763 await self.resume(queue_id)
764
765 @api_command("player_queues/pause")
766 async def pause(self, queue_id: str) -> None:
767 """Handle PAUSE command for given queue.
768
769 - queue_id: queue_id of the playerqueue to handle the command.
770 """
771 if not (queue := self._queues.get(queue_id)):
772 return
773 queue_active = queue.active
774 if queue.active and queue.state == PlaybackState.PLAYING:
775 queue.resume_pos = int(queue.corrected_elapsed_time)
776 # forward the actual command to the player controller
777 # Set context to prevent circular call, then forward the actual command to the player
778 token = IN_QUEUE_COMMAND.set(True)
779 try:
780 await self.mass.players.cmd_pause(queue_id)
781 finally:
782 IN_QUEUE_COMMAND.reset(token)
783
784 async def _watch_pause(player: Player) -> None:
785 count = 0
786 # wait for pause
787 while count < 5 and player.state.playback_state == PlaybackState.PLAYING:
788 count += 1
789 await asyncio.sleep(1)
790 # wait for unpause
791 if player.state.playback_state != PlaybackState.PAUSED:
792 return
793 count = 0
794 while count < 30 and player.state.playback_state == PlaybackState.PAUSED:
795 count += 1
796 await asyncio.sleep(1)
797 # if player is still paused when the limit is reached, send stop
798 if player.state.playback_state == PlaybackState.PAUSED:
799 await self.stop(queue_id)
800
801 # we auto stop a player from paused when its paused for 30 seconds
802 if (
803 queue_active
804 and (queue_player := self.mass.players.get_player(queue_id))
805 and not queue_player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS)
806 ):
807 self.mass.create_task(_watch_pause(queue_player))
808
809 @api_command("player_queues/play_pause")
810 async def play_pause(self, queue_id: str) -> None:
811 """Toggle play/pause on given playerqueue.
812
813 - queue_id: queue_id of the queue to handle the command.
814 """
815 if (queue := self._queues.get(queue_id)) and queue.state == PlaybackState.PLAYING:
816 await self.pause(queue_id)
817 return
818 await self.play(queue_id)
819
820 @api_command("player_queues/next")
821 async def next(self, queue_id: str) -> None:
822 """Handle NEXT TRACK command for given queue.
823
824 - queue_id: queue_id of the queue to handle the command.
825 """
826 if (queue := self.get(queue_id)) is None or not queue.active:
827 raise InvalidCommand(f"Queue {queue_id} is not active")
828 idx = self._queues[queue_id].current_index
829 if idx is None:
830 self.logger.warning("Queue %s has no current index", queue.display_name)
831 return
832 attempts = 5
833 while attempts:
834 try:
835 if (next_index := self._get_next_index(queue_id, idx, True)) is not None:
836 await self.play_index(queue_id, next_index, debounce=True)
837 break
838 except MediaNotFoundError:
839 self.logger.warning(
840 "Failed to fetch next track for queue %s - trying next item",
841 queue.display_name,
842 )
843 idx += 1
844 attempts -= 1
845
846 @api_command("player_queues/previous")
847 async def previous(self, queue_id: str) -> None:
848 """Handle PREVIOUS TRACK command for given queue.
849
850 - queue_id: queue_id of the queue to handle the command.
851 """
852 if (queue := self.get(queue_id)) is None or not queue.active:
853 raise InvalidCommand(f"Queue {queue_id} is not active")
854 current_index = self._queues[queue_id].current_index
855 if current_index is None:
856 return
857 next_index = int(current_index)
858 # restart current track if current track has played longer than 4
859 # otherwise skip to previous track
860 if self._queues[queue_id].elapsed_time < 5:
861 next_index = max(current_index - 1, 0)
862 await self.play_index(queue_id, next_index, debounce=True)
863
864 @api_command("player_queues/skip")
865 async def skip(self, queue_id: str, seconds: int = 10) -> None:
866 """Handle SKIP command for given queue.
867
868 - queue_id: queue_id of the queue to handle the command.
869 - seconds: number of seconds to skip in track. Use negative value to skip back.
870 """
871 if (queue := self.get(queue_id)) is None or not queue.active:
872 raise InvalidCommand(f"Queue {queue_id} is not active")
873 await self.seek(queue_id, int(self._queues[queue_id].elapsed_time + seconds))
874
875 @api_command("player_queues/seek")
876 async def seek(self, queue_id: str, position: int = 10) -> None:
877 """Handle SEEK command for given queue.
878
879 - queue_id: queue_id of the queue to handle the command.
880 - position: position in seconds to seek to in the current playing item.
881 """
882 if (queue := self.get(queue_id)) is None or not queue.active:
883 raise InvalidCommand(f"Queue {queue_id} is not active")
884 queue_player = self.mass.players.get_player(queue_id, True)
885 if queue_player is None:
886 raise PlayerUnavailableError(f"Player {queue_id} is not available")
887 if not queue.current_item:
888 raise InvalidCommand(f"Queue {queue_player.state.name} has no item(s) loaded.")
889 if not queue.current_item.duration:
890 raise InvalidCommand("Can not seek items without duration.")
891 position = max(0, int(position))
892 if position > queue.current_item.duration:
893 raise InvalidCommand("Can not seek outside of duration range.")
894 if queue.current_index is None:
895 raise InvalidCommand(f"Queue {queue_player.state.name} has no current index.")
896 await self.play_index(queue_id, queue.current_index, seek_position=position)
897
898 @api_command("player_queues/resume")
899 async def resume(self, queue_id: str, fade_in: bool | None = None) -> None:
900 """Handle RESUME command for given queue.
901
902 - queue_id: queue_id of the queue to handle the command.
903 """
904 queue = self._queues[queue_id]
905 queue_items = self._queue_items[queue_id]
906 resume_item = queue.current_item
907 if queue.state == PlaybackState.PLAYING:
908 # resume requested while already playing,
909 # use current position as resume position
910 resume_pos = queue.corrected_elapsed_time
911 fade_in = False
912 else:
913 resume_pos = queue.resume_pos or queue.elapsed_time
914
915 if not resume_item and queue.current_index is not None and len(queue_items) > 0:
916 resume_item = self.get_item(queue_id, queue.current_index)
917 resume_pos = 0
918 elif not resume_item and queue.current_index is None and len(queue_items) > 0:
919 # items available in queue but no previous track, start at 0
920 resume_item = self.get_item(queue_id, 0)
921 resume_pos = 0
922
923 if resume_item is not None:
924 queue_player = self.mass.players.get_player(queue_id)
925 if queue_player is None:
926 raise PlayerUnavailableError(f"Player {queue_id} is not available")
927 if (
928 fade_in is None
929 and queue_player.state.playback_state == PlaybackState.IDLE
930 and (time.time() - queue.elapsed_time_last_updated) > 60
931 ):
932 # enable fade in effect if the player is idle for a while
933 fade_in = resume_pos > 0
934 if resume_item.media_type == MediaType.RADIO:
935 # we're not able to skip in online radio so this is pointless
936 resume_pos = 0
937 await self.play_index(
938 queue_id, resume_item.queue_item_id, int(resume_pos), fade_in or False
939 )
940 else:
941 # Queue is empty, try to resume from playlog
942 if await self._try_resume_from_playlog(queue):
943 return
944 msg = f"Resume queue requested but queue {queue.display_name} is empty"
945 raise QueueEmpty(msg)
946
947 @api_command("player_queues/play_index")
948 async def play_index(
949 self,
950 queue_id: str,
951 index: int | str,
952 seek_position: int = 0,
953 fade_in: bool = False,
954 debounce: bool = False,
955 ) -> None:
956 """Play item at index (or item_id) X in queue."""
957 queue = self._queues[queue_id]
958 queue.resume_pos = 0
959 if isinstance(index, str):
960 temp_index = self.index_by_id(queue_id, index)
961 if temp_index is None:
962 raise InvalidDataError(f"Item {index} not found in queue")
963 index = temp_index
964 # At this point index is guaranteed to be int
965 queue.current_index = index
966 # update current item and elapsed time and signal update
967 # this way the UI knows immediately that a new item is loading
968 queue.current_item = self.get_item(queue_id, index)
969 queue.elapsed_time = seek_position
970 queue.elapsed_time_last_updated = time.time()
971 self.signal_update(queue_id)
972 queue.index_in_buffer = index
973 queue.flow_mode_stream_log = []
974 target_player = self.mass.players.get_player(queue_id)
975 if target_player is None:
976 raise PlayerUnavailableError(f"Player {queue_id} is not available")
977 queue.next_item_id_enqueued = None
978 # always update session id when we start a new playback session
979 queue.session_id = shortuuid.random(length=8)
980 # handle resume point of audiobook(chapter) or podcast(episode)
981 if (
982 not seek_position
983 and (queue_item := self.get_item(queue_id, index))
984 and (resume_position_ms := getattr(queue_item.media_item, "resume_position_ms", 0))
985 ):
986 seek_position = max(0, int((resume_position_ms - 500) / 1000))
987
988 # send play_media request to player
989 # NOTE that we debounce this a bit to account for someone hitting the next button
990 # like a madman. This will prevent the player from being overloaded with requests.
991 async def _play_index(index: int, debounce: bool) -> None:
992 for attempt in range(5):
993 try:
994 queue_item = self.get_item(queue_id, index)
995 if not queue_item:
996 continue # guard
997 await self._load_item(
998 queue_item,
999 self._get_next_index(queue_id, index),
1000 is_start=True,
1001 seek_position=seek_position if attempt == 0 else 0,
1002 fade_in=fade_in if attempt == 0 else False,
1003 )
1004 # if we reach this point, loading the item succeeded, break the loop
1005 queue.current_index = index
1006 queue.current_item = queue_item
1007 break
1008 except (MediaNotFoundError, AudioError):
1009 # the requested index can not be played.
1010 if queue_item:
1011 self.logger.warning(
1012 "Skipping unplayable item %s (%s)",
1013 queue_item.name,
1014 queue_item.uri,
1015 )
1016 queue_item.available = False
1017 next_index = self._get_next_index(queue_id, index, allow_repeat=False)
1018 if next_index is None:
1019 raise MediaNotFoundError("No next item available")
1020 index = next_index
1021 else:
1022 # all attempts to find a playable item failed
1023 raise MediaNotFoundError("No playable item found to start playback")
1024
1025 # work out if we need to use flow mode
1026 flow_mode = target_player.flow_mode and queue_item.media_type not in (
1027 # don't use flow mode for duration-less streams
1028 MediaType.RADIO,
1029 MediaType.PLUGIN_SOURCE,
1030 )
1031 await asyncio.sleep(0.5 if debounce else 0.1)
1032 queue.flow_mode = flow_mode
1033 await self.mass.players.play_media(
1034 player_id=queue_id,
1035 media=await self.player_media_from_queue_item(queue_item, flow_mode),
1036 )
1037 queue.current_index = index
1038 queue.current_item = queue_item
1039 await asyncio.sleep(2)
1040 self._transitioning_players.discard(queue_id)
1041
1042 # we set a flag to notify the update logic that we're transitioning to a new track
1043 self._transitioning_players.add(queue_id)
1044
1045 # we debounce the play_index command to handle the case where someone
1046 # is spamming next/previous on the player
1047 task_id = f"play_index_{queue_id}"
1048 if existing_task := self.mass.get_task(task_id):
1049 existing_task.cancel()
1050 with suppress(asyncio.CancelledError):
1051 await existing_task
1052 task = self.mass.create_task(
1053 _play_index,
1054 index,
1055 debounce,
1056 task_id=task_id,
1057 )
1058 await task
1059 self.signal_update(queue_id)
1060
1061 @api_command("player_queues/transfer")
1062 async def transfer_queue(
1063 self,
1064 source_queue_id: str,
1065 target_queue_id: str,
1066 auto_play: bool | None = None,
1067 ) -> None:
1068 """Transfer queue to another queue."""
1069 if not (source_queue := self.get(source_queue_id)):
1070 raise PlayerUnavailableError(f"Queue {source_queue_id} is not available")
1071 if not (target_queue := self.get(target_queue_id)):
1072 raise PlayerUnavailableError(f"Queue {target_queue_id} is not available")
1073 if auto_play is None:
1074 auto_play = source_queue.state == PlaybackState.PLAYING
1075
1076 target_player = self.mass.players.get_player(target_queue_id)
1077 if target_player is None:
1078 raise PlayerUnavailableError(f"Player {target_queue_id} is not available")
1079 if target_player.state.active_group or target_player.state.synced_to:
1080 # edge case: the user wants to move playback from the group as a whole, to a single
1081 # player in the group or it is grouped and the command targeted at the single player.
1082 # We need to dissolve the group first.
1083 group_id = target_player.state.active_group or target_player.state.synced_to
1084 assert group_id is not None # checked in if condition above
1085 await self.mass.players.cmd_ungroup(group_id)
1086 await asyncio.sleep(3)
1087
1088 source_items = self._queue_items[source_queue_id]
1089 target_queue.repeat_mode = source_queue.repeat_mode
1090 target_queue.shuffle_enabled = source_queue.shuffle_enabled
1091 target_queue.dont_stop_the_music_enabled = source_queue.dont_stop_the_music_enabled
1092 target_queue.radio_source = source_queue.radio_source
1093 target_queue.enqueued_media_items = source_queue.enqueued_media_items
1094 target_queue.resume_pos = int(source_queue.elapsed_time)
1095 target_queue.current_index = source_queue.current_index
1096 if source_queue.current_item:
1097 target_queue.current_item = source_queue.current_item
1098 target_queue.current_item.queue_id = target_queue_id
1099 self.clear(source_queue_id)
1100
1101 await self.load(target_queue_id, source_items, keep_remaining=False, keep_played=False)
1102 for item in source_items:
1103 item.queue_id = target_queue_id
1104 self.update_items(target_queue_id, source_items)
1105 if auto_play:
1106 await self.resume(target_queue_id)
1107
1108 # Interaction with player
1109
1110 async def on_player_register(self, player: Player) -> None:
1111 """Register PlayerQueue for given player/queue id."""
1112 queue_id = player.player_id
1113 queue: PlayerQueue | None = None
1114 queue_items: list[QueueItem] = []
1115 # try to restore previous state
1116 if prev_state := await self.mass.cache.get(
1117 key=queue_id,
1118 provider=self.domain,
1119 category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
1120 ):
1121 try:
1122 queue = PlayerQueue.from_dict(prev_state)
1123 prev_items = await self.mass.cache.get(
1124 key=queue_id,
1125 provider=self.domain,
1126 category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
1127 default=[],
1128 )
1129 queue_items = []
1130 for idx, item_data in enumerate(prev_items):
1131 qi = QueueItem.from_cache(item_data)
1132 if not qi.media_item:
1133 # Skip items with missing media_item - this can happen if
1134 # MA was killed during shutdown while cache was being written
1135 self.logger.debug(
1136 "Skipping queue item %s (index %d) restored from cache "
1137 "without media_item",
1138 qi.name,
1139 idx,
1140 )
1141 continue
1142 queue_items.append(qi)
1143 if queue.enqueued_media_items:
1144 # we need to restore the MediaItem objects for the enqueued media items
1145 # Items from cache may be dicts that need deserialization
1146 restored_enqueued_items: list[MediaItemType] = []
1147 cached_items: list[dict[str, Any] | MediaItemType] = cast(
1148 "list[dict[str, Any] | MediaItemType]", queue.enqueued_media_items
1149 )
1150 for item in cached_items:
1151 if isinstance(item, dict):
1152 restored_item = media_from_dict(item)
1153 restored_enqueued_items.append(cast("MediaItemType", restored_item))
1154 else:
1155 restored_enqueued_items.append(item)
1156 queue.enqueued_media_items = restored_enqueued_items
1157 except Exception as err:
1158 self.logger.warning(
1159 "Failed to restore the queue(items) for %s - %s",
1160 player.state.name,
1161 str(err),
1162 )
1163 # Reset to clean state on failure
1164 queue = None
1165 queue_items = []
1166 if queue is None:
1167 queue = PlayerQueue(
1168 queue_id=queue_id,
1169 active=False,
1170 display_name=player.state.name,
1171 available=player.state.available,
1172 dont_stop_the_music_enabled=False,
1173 items=0,
1174 )
1175
1176 self._queues[queue_id] = queue
1177 self._queue_items[queue_id] = queue_items
1178 # always call update to calculate state etc
1179 self.on_player_update(player, {})
1180 self.mass.signal_event(EventType.QUEUE_ADDED, object_id=queue_id, data=queue)
1181
1182 def on_player_update(
1183 self,
1184 player: Player,
1185 changed_values: dict[str, tuple[Any, Any]],
1186 ) -> None:
1187 """
1188 Call when a PlayerQueue needs to be updated (e.g. when player updates).
1189
1190 NOTE: This is called every second if the player is playing.
1191 """
1192 queue_id = player.player_id
1193 if (queue := self._queues.get(queue_id)) is None:
1194 # race condition
1195 return
1196 if player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
1197 # do nothing while the announcement is in progress
1198 return
1199 # determine if this queue is currently active for this player
1200 queue.active = player.state.active_source in (queue.queue_id, None)
1201 if not queue.active and queue_id not in self._prev_states:
1202 queue.state = PlaybackState.IDLE
1203 # return early if the queue is not active and we have no previous state
1204 return
1205 if queue.queue_id in self._transitioning_players:
1206 # we're currently transitioning to a new track,
1207 # ignore updates from the player during this time
1208 return
1209
1210 # queue is active and preflight checks passed, update the queue details
1211 self._update_queue_from_player(player)
1212
1213 def on_player_remove(self, player_id: str, permanent: bool) -> None:
1214 """Call when a player is removed from the registry."""
1215 if permanent:
1216 # if the player is permanently removed, we also remove the cached queue data
1217 self.mass.create_task(
1218 self.mass.cache.delete(
1219 key=player_id,
1220 provider=self.domain,
1221 category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
1222 )
1223 )
1224 self.mass.create_task(
1225 self.mass.cache.delete(
1226 key=player_id,
1227 provider=self.domain,
1228 category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
1229 )
1230 )
1231 self._queues.pop(player_id, None)
1232 self._queue_items.pop(player_id, None)
1233
1234 async def load_next_queue_item(
1235 self,
1236 queue_id: str,
1237 current_item_id: str,
1238 ) -> QueueItem:
1239 """
1240 Call when a player wants the next queue item to play.
1241
1242 Raises QueueEmpty if there are no more tracks left.
1243 """
1244 queue = self.get(queue_id)
1245 if not queue:
1246 msg = f"PlayerQueue {queue_id} is not available"
1247 raise PlayerUnavailableError(msg)
1248 cur_index = self.index_by_id(queue_id, current_item_id)
1249 if cur_index is None:
1250 # this is just a guard for bad data
1251 raise QueueEmpty("Invalid item id for queue given.")
1252 next_item: QueueItem | None = None
1253 idx = 0
1254 while True:
1255 next_index = self._get_next_index(queue_id, cur_index + idx)
1256 if next_index is None:
1257 raise QueueEmpty("No more tracks left in the queue.")
1258 queue_item = self.get_item(queue_id, next_index)
1259 if queue_item is None:
1260 raise QueueEmpty("No more tracks left in the queue.")
1261 if idx >= 10:
1262 # we only allow 10 retries to prevent infinite loops
1263 raise QueueEmpty("No more (playable) tracks left in the queue.")
1264 try:
1265 await self._load_item(queue_item, next_index)
1266 # we're all set, this is our next item
1267 next_item = queue_item
1268 break
1269 except (MediaNotFoundError, AudioError):
1270 # No stream details found, skip this QueueItem
1271 self.logger.warning(
1272 "Skipping unplayable item %s (%s)", queue_item.name, queue_item.uri
1273 )
1274 queue_item.available = False
1275 idx += 1
1276 if idx != 0:
1277 # we skipped some items, signal a queue items update
1278 self.update_items(queue_id, self._queue_items[queue_id])
1279 if next_item is None:
1280 raise QueueEmpty("No more (playable) tracks left in the queue.")
1281
1282 return next_item
1283
1284 async def _load_item(
1285 self,
1286 queue_item: QueueItem,
1287 next_index: int | None,
1288 is_start: bool = False,
1289 seek_position: int = 0,
1290 fade_in: bool = False,
1291 ) -> None:
1292 """Try to load the stream details for the given queue item."""
1293 queue_id = queue_item.queue_id
1294 queue = self._queues[queue_id]
1295
1296 # we use a contextvar to bypass the throttler for this asyncio task/context
1297 # this makes sure that playback has priority over other requests that may be
1298 # happening in the background
1299 BYPASS_THROTTLER.set(True)
1300
1301 self.logger.debug(
1302 "(pre)loading (next) item for queue %s...",
1303 queue.display_name,
1304 )
1305
1306 if not queue_item.available:
1307 raise MediaNotFoundError(f"Item {queue_item.uri} is not available")
1308
1309 # work out if we are playing an album and if we should prefer album
1310 # loudness
1311 next_track_from_same_album = (
1312 next_index is not None
1313 and (next_item := self.get_item(queue_id, next_index))
1314 and (
1315 queue_item.media_item
1316 and hasattr(queue_item.media_item, "album")
1317 and queue_item.media_item.album
1318 and next_item.media_item
1319 and hasattr(next_item.media_item, "album")
1320 and next_item.media_item.album
1321 and queue_item.media_item.album.item_id == next_item.media_item.album.item_id
1322 )
1323 )
1324 current_index = self.index_by_id(queue_id, queue_item.queue_item_id)
1325 if current_index is None:
1326 previous_track_from_same_album = False
1327 else:
1328 previous_index = max(current_index - 1, 0)
1329 previous_track_from_same_album = (
1330 previous_index > 0
1331 and (previous_item := self.get_item(queue_id, previous_index)) is not None
1332 and previous_item.media_item is not None
1333 and hasattr(previous_item.media_item, "album")
1334 and previous_item.media_item.album is not None
1335 and queue_item.media_item is not None
1336 and hasattr(queue_item.media_item, "album")
1337 and queue_item.media_item.album is not None
1338 and queue_item.media_item.album.item_id == previous_item.media_item.album.item_id
1339 )
1340 playing_album_tracks = next_track_from_same_album or previous_track_from_same_album
1341 if queue_item.media_item and isinstance(queue_item.media_item, Track):
1342 album = queue_item.media_item.album
1343 # prefer the full library media item so we have all metadata and provider(quality) info
1344 # always request the full library item as there might be other qualities available
1345 if library_item := await self.mass.music.get_library_item_by_prov_id(
1346 queue_item.media_item.media_type,
1347 queue_item.media_item.item_id,
1348 queue_item.media_item.provider,
1349 ):
1350 queue_item.media_item = cast("Track", library_item)
1351 elif not queue_item.media_item.image or queue_item.media_item.provider.startswith(
1352 "ytmusic"
1353 ):
1354 # Youtube Music has poor thumbs by default, so we always fetch the full item
1355 # this also catches the case where they have an unavailable item in a listing
1356 fetched_item = await self.mass.music.get_item_by_uri(queue_item.uri)
1357 queue_item.media_item = cast("Track", fetched_item)
1358
1359 # ensure we got the full (original) album set
1360 if album and (
1361 library_album := await self.mass.music.get_library_item_by_prov_id(
1362 album.media_type,
1363 album.item_id,
1364 album.provider,
1365 )
1366 ):
1367 queue_item.media_item.album = cast("Album", library_album)
1368 elif album:
1369 # Restore original album if we have no better alternative from the library
1370 queue_item.media_item.album = album
1371 # prefer album image over track image
1372 if queue_item.media_item.album and queue_item.media_item.album.image:
1373 org_images: list[MediaItemImage] = queue_item.media_item.metadata.images or []
1374 queue_item.media_item.metadata.images = UniqueList(
1375 [
1376 queue_item.media_item.album.image,
1377 *org_images,
1378 ]
1379 )
1380 # Fetch the streamdetails, which could raise in case of an unplayable item.
1381 # For example, YT Music returns Radio Items that are not playable.
1382 queue_item.streamdetails = await get_stream_details(
1383 mass=self.mass,
1384 queue_item=queue_item,
1385 seek_position=seek_position,
1386 fade_in=fade_in,
1387 prefer_album_loudness=bool(playing_album_tracks),
1388 )
1389
1390 def track_loaded_in_buffer(self, queue_id: str, item_id: str) -> None:
1391 """Call when a player has (started) loading a track in the buffer."""
1392 queue = self.get(queue_id)
1393 if not queue:
1394 msg = f"PlayerQueue {queue_id} is not available"
1395 raise PlayerUnavailableError(msg)
1396 # store the index of the item that is currently (being) loaded in the buffer
1397 # which helps us a bit to determine how far the player has buffered ahead
1398 queue.index_in_buffer = self.index_by_id(queue_id, item_id)
1399 self.logger.debug("PlayerQueue %s loaded item %s in buffer", queue.display_name, item_id)
1400 self.signal_update(queue_id)
1401 # preload next streamdetails
1402 self._preload_next_item(queue_id, item_id)
1403
1404 # Main queue manipulation methods
1405
1406 async def load(
1407 self,
1408 queue_id: str,
1409 queue_items: list[QueueItem],
1410 insert_at_index: int = 0,
1411 keep_remaining: bool = True,
1412 keep_played: bool = True,
1413 shuffle: bool = False,
1414 ) -> None:
1415 """Load new items at index.
1416
1417 - queue_id: id of the queue to process this request.
1418 - queue_items: a list of QueueItems
1419 - insert_at_index: insert the item(s) at this index
1420 - keep_remaining: keep the remaining items after the insert
1421 - shuffle: (re)shuffle the items after insert index
1422 """
1423 prev_items = self._queue_items[queue_id][:insert_at_index] if keep_played else []
1424 next_items = queue_items
1425
1426 # if keep_remaining, append the old 'next' items
1427 if keep_remaining:
1428 next_items += self._queue_items[queue_id][insert_at_index:]
1429
1430 # we set the original insert order as attribute so we can un-shuffle
1431 for index, item in enumerate(next_items):
1432 item.sort_index += insert_at_index + index
1433 # (re)shuffle the final batch if needed
1434 if shuffle:
1435 next_items = await _smart_shuffle(next_items)
1436 self.update_items(queue_id, prev_items + next_items)
1437
1438 def update_items(self, queue_id: str, queue_items: list[QueueItem]) -> None:
1439 """Update the existing queue items, mostly caused by reordering."""
1440 self._queue_items[queue_id] = queue_items
1441 queue = self._queues[queue_id]
1442 queue.items = len(self._queue_items[queue_id])
1443 # to track if the queue items changed we set a timestamp
1444 # this is a simple way to detect changes in the list of items
1445 # without having to compare the entire list
1446 queue.items_last_updated = time.time()
1447 self.signal_update(queue_id, True)
1448 if (
1449 queue.state == PlaybackState.PLAYING
1450 and queue.index_in_buffer is not None
1451 and queue.index_in_buffer == queue.current_index
1452 ):
1453 # if the queue is playing,
1454 # ensure to (re)queue the next track because it might have changed
1455 # note that we only do this if the player has loaded the current track
1456 # if not, we wait until it has loaded to prevent conflicts
1457 if next_item := self.get_next_item(queue_id, queue.index_in_buffer):
1458 self._enqueue_next_item(queue_id, next_item)
1459
1460 # Helper methods
1461
1462 def get_item(self, queue_id: str, item_id_or_index: int | str | None) -> QueueItem | None:
1463 """Get queue item by index or item_id."""
1464 if item_id_or_index is None:
1465 return None
1466 if (queue_items := self._queue_items.get(queue_id)) is None:
1467 return None
1468 if isinstance(item_id_or_index, int) and len(queue_items) > item_id_or_index:
1469 return queue_items[item_id_or_index]
1470 if isinstance(item_id_or_index, str):
1471 return next((x for x in queue_items if x.queue_item_id == item_id_or_index), None)
1472 return None
1473
1474 def signal_update(self, queue_id: str, items_changed: bool = False) -> None:
1475 """Signal state changed of given queue."""
1476 queue = self._queues[queue_id]
1477 if items_changed:
1478 self.mass.signal_event(EventType.QUEUE_ITEMS_UPDATED, object_id=queue_id, data=queue)
1479 # save items in cache - only cache items with valid media_item
1480 cache_data = [
1481 x.to_cache() for x in self._queue_items[queue_id] if x.media_item is not None
1482 ]
1483 self.mass.create_task(
1484 self.mass.cache.set(
1485 key=queue_id,
1486 data=cache_data,
1487 provider=self.domain,
1488 category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
1489 )
1490 )
1491 # always send the base event
1492 self.mass.signal_event(EventType.QUEUE_UPDATED, object_id=queue_id, data=queue)
1493 # also signal update to the player itself so it can update its current_media
1494 self.mass.players.trigger_player_update(queue_id)
1495 # save state
1496 self.mass.create_task(
1497 self.mass.cache.set(
1498 key=queue_id,
1499 data=queue.to_cache(),
1500 provider=self.domain,
1501 category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
1502 )
1503 )
1504
1505 def index_by_id(self, queue_id: str, queue_item_id: str) -> int | None:
1506 """Get index by queue_item_id."""
1507 queue_items = self._queue_items[queue_id]
1508 for index, item in enumerate(queue_items):
1509 if item.queue_item_id == queue_item_id:
1510 return index
1511 return None
1512
1513 async def player_media_from_queue_item(
1514 self, queue_item: QueueItem, flow_mode: bool
1515 ) -> PlayerMedia:
1516 """Parse PlayerMedia from QueueItem."""
1517 queue = self._queues[queue_item.queue_id]
1518 if flow_mode:
1519 duration = None
1520 elif queue_item.streamdetails:
1521 # prefer netto duration
1522 # when seeking, the player only receives the remaining duration
1523 duration = queue_item.streamdetails.duration or queue_item.duration
1524 if duration and queue_item.streamdetails.seek_position:
1525 duration = duration - queue_item.streamdetails.seek_position
1526 else:
1527 duration = queue_item.duration
1528 if queue.session_id is None:
1529 # handle error or return early
1530 raise InvalidDataError("Queue session_id is None")
1531 media = PlayerMedia(
1532 uri=queue_item.uri,
1533 media_type=MediaType.FLOW_STREAM if flow_mode else queue_item.media_type,
1534 title="Music Assistant" if flow_mode else queue_item.name,
1535 image_url=MASS_LOGO_ONLINE,
1536 duration=duration,
1537 source_id=queue_item.queue_id,
1538 queue_item_id=queue_item.queue_item_id,
1539 custom_data={
1540 "session_id": queue.session_id,
1541 "original_uri": queue_item.uri,
1542 "flow_mode": flow_mode,
1543 },
1544 )
1545 if not flow_mode and queue_item.media_item:
1546 media.title = queue_item.media_item.name
1547 media.artist = getattr(queue_item.media_item, "artist_str", "")
1548 media.album = (
1549 album.name if (album := getattr(queue_item.media_item, "album", None)) else ""
1550 )
1551 if queue_item.image:
1552 # the image format needs to be 500x500 jpeg for maximum compatibility with players
1553 # we prefer the imageproxy on the streamserver here because this request is sent
1554 # to the player itself which may not be able to reach the regular webserver
1555 media.image_url = self.mass.metadata.get_image_url(
1556 queue_item.image, size=500, prefer_stream_server=True
1557 )
1558 return media
1559
1560 async def get_artist_tracks(self, artist: Artist) -> list[Track]:
1561 """Return tracks for given artist, based on user preference."""
1562 artist_items_conf = self.mass.config.get_raw_core_config_value(
1563 self.domain,
1564 CONF_DEFAULT_ENQUEUE_SELECT_ARTIST,
1565 ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE,
1566 )
1567 self.logger.info(
1568 "Fetching tracks to play for artist %s",
1569 artist.name,
1570 )
1571 if artist_items_conf in ("library_tracks", "all_tracks"):
1572 all_items = await self.mass.music.artists.tracks(
1573 artist.item_id,
1574 artist.provider,
1575 in_library_only=artist_items_conf == "library_tracks",
1576 )
1577 random.shuffle(all_items)
1578 return all_items
1579 if artist_items_conf in ("library_album_tracks", "all_album_tracks"):
1580 all_tracks: list[Track] = []
1581 for library_album in await self.mass.music.artists.albums(
1582 artist.item_id,
1583 artist.provider,
1584 in_library_only=artist_items_conf == "library_album_tracks",
1585 ):
1586 for album_track in await self.mass.music.albums.tracks(
1587 library_album.item_id, library_album.provider
1588 ):
1589 if album_track not in all_tracks:
1590 all_tracks.append(album_track)
1591 random.shuffle(all_tracks)
1592 return all_tracks
1593 return []
1594
1595 async def get_album_tracks(self, album: Album, start_item: str | None) -> list[Track]:
1596 """Return tracks for given album, based on user preference."""
1597 album_items_conf = self.mass.config.get_raw_core_config_value(
1598 self.domain,
1599 CONF_DEFAULT_ENQUEUE_SELECT_ALBUM,
1600 ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE,
1601 )
1602 result: list[Track] = []
1603 start_item_found = False
1604 self.logger.info(
1605 "Fetching tracks to play for album %s",
1606 album.name,
1607 )
1608 for album_track in await self.mass.music.albums.tracks(
1609 item_id=album.item_id,
1610 provider_instance_id_or_domain=album.provider,
1611 in_library_only=album_items_conf == "library_tracks",
1612 ):
1613 if not album_track.available:
1614 continue
1615 if start_item in (album_track.item_id, album_track.uri):
1616 start_item_found = True
1617 if start_item is not None and not start_item_found:
1618 continue
1619 result.append(album_track)
1620 return result
1621
1622 async def get_genre_tracks(self, genre: Genre, start_item: str | None) -> list[Track]:
1623 """Return tracks for given genre, based on alias mappings.
1624
1625 Limits results to avoid loading thousands of tracks for broad genres.
1626 Directly mapped tracks are fetched with random ordering, then supplemented
1627 with tracks from a limited set of mapped albums and artists.
1628 """
1629 result: list[Track] = []
1630 start_item_found = False
1631 self.logger.info(
1632 "Fetching tracks to play for genre %s",
1633 genre.name,
1634 )
1635 tracks, albums, artists = await self.mass.music.genres.mapped_media(
1636 genre,
1637 track_limit=25,
1638 album_limit=5,
1639 artist_limit=5,
1640 order_by="random",
1641 )
1642
1643 for genre_track in tracks:
1644 if not genre_track.available:
1645 continue
1646 if start_item in (genre_track.item_id, genre_track.uri):
1647 start_item_found = True
1648 if start_item is not None and not start_item_found:
1649 continue
1650 result.append(genre_track)
1651
1652 for album in albums:
1653 album_tracks = await self.get_album_tracks(album, None)
1654 result.extend(album_tracks[:5])
1655
1656 for artist in artists:
1657 artist_tracks = await self.get_artist_tracks(artist)
1658 result.extend(artist_tracks[:5])
1659 return result
1660
1661 async def get_playlist_tracks(
1662 self, playlist: Playlist, start_item: str | None
1663 ) -> list[PlaylistPlayableItem]:
1664 """Return tracks for given playlist, based on user preference."""
1665 result: list[PlaylistPlayableItem] = []
1666 start_item_found = False
1667 self.logger.info(
1668 "Fetching tracks to play for playlist %s",
1669 playlist.name,
1670 )
1671 # TODO: Handle other sort options etc.
1672 async for playlist_track in self.mass.music.playlists.tracks(
1673 playlist.item_id, playlist.provider
1674 ):
1675 if not playlist_track.available:
1676 continue
1677 if start_item in (playlist_track.item_id, playlist_track.uri):
1678 start_item_found = True
1679 if start_item is not None and not start_item_found:
1680 continue
1681 result.append(playlist_track)
1682 return result
1683
1684 async def get_audiobook_resume_point(
1685 self, audio_book: Audiobook, chapter: str | int | None = None, userid: str | None = None
1686 ) -> int:
1687 """Return resume point (in milliseconds) for given audio book."""
1688 self.logger.debug(
1689 "Fetching resume point to play for audio book %s",
1690 audio_book.name,
1691 )
1692 if chapter is not None:
1693 # user explicitly selected a chapter to play
1694 start_chapter = int(chapter) if isinstance(chapter, str) else chapter
1695 if chapters := audio_book.metadata.chapters:
1696 if _chapter := next((x for x in chapters if x.position == start_chapter), None):
1697 return int(_chapter.start * 1000)
1698 raise InvalidDataError(
1699 f"Unable to resolve chapter to play for Audiobook {audio_book.name}"
1700 )
1701 full_played, resume_position_ms = await self.mass.music.get_resume_position(
1702 audio_book, userid=userid
1703 )
1704 return 0 if full_played else resume_position_ms
1705
1706 async def get_next_podcast_episodes(
1707 self,
1708 podcast: Podcast | None,
1709 episode: PodcastEpisode | str | None,
1710 userid: str | None = None,
1711 ) -> UniqueList[PodcastEpisode]:
1712 """Return (next) episode(s) and resume point for given podcast."""
1713 if podcast is None and isinstance(episode, str | NoneType):
1714 raise InvalidDataError("Either podcast or episode must be provided")
1715 if podcast is None:
1716 # single podcast episode requested
1717 assert isinstance(episode, PodcastEpisode) # checked above
1718 self.logger.debug(
1719 "Fetching resume point to play for Podcast episode %s",
1720 episode.name,
1721 )
1722 (
1723 fully_played,
1724 resume_position_ms,
1725 ) = await self.mass.music.get_resume_position(episode, userid=userid)
1726 episode.fully_played = fully_played
1727 episode.resume_position_ms = 0 if fully_played else resume_position_ms
1728 return UniqueList([episode])
1729 # podcast with optional start episode requested
1730 self.logger.debug(
1731 "Fetching episode(s) and resume point to play for Podcast %s",
1732 podcast.name,
1733 )
1734 all_episodes = [
1735 x async for x in self.mass.music.podcasts.episodes(podcast.item_id, podcast.provider)
1736 ]
1737 all_episodes.sort(key=lambda x: x.position)
1738 # if a episode was provided, a user explicitly selected a episode to play
1739 # so we need to find the index of the episode in the list
1740 resolved_episode: PodcastEpisode | None = None
1741 if isinstance(episode, PodcastEpisode):
1742 resolved_episode = next((x for x in all_episodes if x.uri == episode.uri), None)
1743 if resolved_episode:
1744 # ensure we have accurate resume info
1745 (
1746 fully_played,
1747 resume_position_ms,
1748 ) = await self.mass.music.get_resume_position(resolved_episode, userid=userid)
1749 resolved_episode.resume_position_ms = 0 if fully_played else resume_position_ms
1750 elif isinstance(episode, str):
1751 resolved_episode = next(
1752 (x for x in all_episodes if episode in (x.uri, x.item_id)), None
1753 )
1754 if resolved_episode:
1755 # ensure we have accurate resume info
1756 (
1757 fully_played,
1758 resume_position_ms,
1759 ) = await self.mass.music.get_resume_position(resolved_episode, userid=userid)
1760 resolved_episode.resume_position_ms = 0 if fully_played else resume_position_ms
1761 else:
1762 # get first episode that is not fully played
1763 for ep in all_episodes:
1764 if ep.fully_played:
1765 continue
1766 # ensure we have accurate resume info
1767 (
1768 fully_played,
1769 resume_position_ms,
1770 ) = await self.mass.music.get_resume_position(ep, userid=userid)
1771 if fully_played:
1772 continue
1773 ep.resume_position_ms = resume_position_ms
1774 resolved_episode = ep
1775 break
1776 else:
1777 # no episodes found that are not fully played, so we start at the beginning
1778 resolved_episode = next((x for x in all_episodes), None)
1779 if resolved_episode is None:
1780 raise InvalidDataError(f"Unable to resolve episode to play for Podcast {podcast.name}")
1781 # get the index of the episode
1782 episode_index = all_episodes.index(resolved_episode)
1783 # return the (remaining) episode(s) to play
1784 return UniqueList(all_episodes[episode_index:])
1785
1786 def _get_next_index(
1787 self,
1788 queue_id: str,
1789 cur_index: int | None,
1790 is_skip: bool = False,
1791 allow_repeat: bool = True,
1792 ) -> int | None:
1793 """
1794 Return the next index for the queue, accounting for repeat settings.
1795
1796 Will return None if there are no (more) items in the queue.
1797 """
1798 queue = self._queues[queue_id]
1799 queue_items = self._queue_items[queue_id]
1800 if not queue_items or cur_index is None:
1801 # queue is empty
1802 return None
1803 # handle repeat single track
1804 if queue.repeat_mode == RepeatMode.ONE and not is_skip:
1805 return cur_index if allow_repeat else None
1806 # handle cur_index is last index of the queue
1807 if cur_index >= (len(queue_items) - 1):
1808 if allow_repeat and queue.repeat_mode == RepeatMode.ALL:
1809 # if repeat all is enabled, we simply start again from the beginning
1810 return 0
1811 return None
1812 # all other: just the next index
1813 return cur_index + 1
1814
1815 def get_next_item(self, queue_id: str, cur_index: int | str) -> QueueItem | None:
1816 """Return next QueueItem for given queue."""
1817 index: int
1818 if isinstance(cur_index, str):
1819 resolved_index = self.index_by_id(queue_id, cur_index)
1820 if resolved_index is None:
1821 return None # guard
1822 index = resolved_index
1823 else:
1824 index = cur_index
1825 # At this point index is guaranteed to be int
1826 for skip in range(5):
1827 if (next_index := self._get_next_index(queue_id, index + skip)) is None:
1828 break
1829 next_item = self.get_item(queue_id, next_index)
1830 if next_item is None:
1831 continue
1832 if not next_item.available:
1833 # ensure that we skip unavailable items (set by load_next track logic)
1834 continue
1835 return next_item
1836 return None
1837
1838 async def _fill_radio_tracks(self, queue_id: str) -> None:
1839 """Fill a Queue with (additional) Radio tracks."""
1840 self.logger.debug(
1841 "Filling radio tracks for queue %s",
1842 queue_id,
1843 )
1844 tracks = await self._get_radio_tracks(queue_id=queue_id, is_initial_radio_mode=False)
1845 # fill queue - filter out unavailable items
1846 queue_items = [QueueItem.from_media_item(queue_id, x) for x in tracks if x.available]
1847 await self.load(
1848 queue_id,
1849 queue_items,
1850 insert_at_index=len(self._queue_items[queue_id]) + 1,
1851 )
1852
1853 def _enqueue_next_item(self, queue_id: str, next_item: QueueItem | None) -> None:
1854 """Enqueue the next item on the player."""
1855 if not next_item:
1856 # no next item, nothing to do...
1857 return
1858
1859 queue = self._queues[queue_id]
1860 if queue.flow_mode:
1861 # ignore this for flow mode
1862 return
1863
1864 async def _enqueue_next_item_on_player(next_item: QueueItem) -> None:
1865 await self.mass.players.enqueue_next_media(
1866 player_id=queue_id,
1867 media=await self.player_media_from_queue_item(next_item, False),
1868 )
1869 if queue.next_item_id_enqueued != next_item.queue_item_id:
1870 queue.next_item_id_enqueued = next_item.queue_item_id
1871 self.logger.debug(
1872 "Enqueued next track %s on queue %s",
1873 next_item.name,
1874 self._queues[queue_id].display_name,
1875 )
1876
1877 task_id = f"enqueue_next_item_{queue_id}"
1878 self.mass.call_later(0.5, _enqueue_next_item_on_player, next_item, task_id=task_id)
1879
1880 def _preload_next_item(self, queue_id: str, item_id_in_buffer: str) -> None:
1881 """
1882 Preload the streamdetails for the next item in the queue/buffer.
1883
1884 This basically ensures the item is playable and fetches the stream details.
1885 If an error occurs, the item will be skipped and the next item will be loaded.
1886 """
1887 queue = self._queues[queue_id]
1888
1889 async def _preload_streamdetails(item_id_in_buffer: str) -> None:
1890 try:
1891 # wait for the item that was loaded in the buffer is the actually playing item
1892 # this prevents a race condition when we preload the next item too soon
1893 # while the player is actually preloading the previously enqueued item.
1894 retries = 120
1895 while retries > 0:
1896 if not queue.current_item:
1897 return # guard
1898 if queue.current_item.queue_item_id == item_id_in_buffer:
1899 break
1900 retries -= 1
1901 await asyncio.sleep(1)
1902 if next_item := await self.load_next_queue_item(queue_id, item_id_in_buffer):
1903 self.logger.debug(
1904 "Preloaded next item %s for queue %s",
1905 next_item.name,
1906 queue.display_name,
1907 )
1908 # enqueue the next item on the player
1909 self._enqueue_next_item(queue_id, next_item)
1910
1911 except QueueEmpty:
1912 return
1913
1914 if not (current_item := self.get_item(queue_id, item_id_in_buffer)):
1915 # this should not happen, but guard anyways
1916 return
1917 if current_item.media_type == MediaType.RADIO or not current_item.duration:
1918 # radio items or no duration, nothing to do
1919 return
1920
1921 task_id = f"preload_next_item_{queue_id}"
1922 self.mass.create_task(
1923 _preload_streamdetails,
1924 item_id_in_buffer,
1925 task_id=task_id,
1926 abort_existing=True,
1927 )
1928
1929 async def _resolve_media_items(
1930 self,
1931 media_item: MediaItemType | ItemMapping | BrowseFolder,
1932 start_item: str | None = None,
1933 userid: str | None = None,
1934 queue_id: str | None = None,
1935 ) -> list[MediaItemType]:
1936 """Resolve/unwrap media items to enqueue."""
1937 # resolve Itemmapping to full media item
1938 if isinstance(media_item, ItemMapping):
1939 if media_item.uri is None:
1940 raise InvalidDataError("ItemMapping has no URI")
1941 media_item = await self.mass.music.get_item_by_uri(media_item.uri)
1942 if media_item.media_type == MediaType.PLAYLIST:
1943 media_item = cast("Playlist", media_item)
1944 self.mass.create_task(
1945 self.mass.music.mark_item_played(
1946 media_item, userid=userid, queue_id=queue_id, user_initiated=True
1947 )
1948 )
1949 return list(await self.get_playlist_tracks(media_item, start_item))
1950 if media_item.media_type == MediaType.ARTIST:
1951 media_item = cast("Artist", media_item)
1952 self.mass.create_task(
1953 self.mass.music.mark_item_played(media_item, queue_id=queue_id, user_initiated=True)
1954 )
1955 return list(await self.get_artist_tracks(media_item))
1956 if media_item.media_type == MediaType.ALBUM:
1957 media_item = cast("Album", media_item)
1958 self.mass.create_task(
1959 self.mass.music.mark_item_played(
1960 media_item, userid=userid, queue_id=queue_id, user_initiated=True
1961 )
1962 )
1963 return list(await self.get_album_tracks(media_item, start_item))
1964 if media_item.media_type == MediaType.GENRE:
1965 media_item = cast("Genre", media_item)
1966 self.mass.create_task(
1967 self.mass.music.mark_item_played(
1968 media_item, userid=userid, queue_id=queue_id, user_initiated=True
1969 )
1970 )
1971 return list(await self.get_genre_tracks(media_item, start_item))
1972 if media_item.media_type == MediaType.AUDIOBOOK:
1973 media_item = cast("Audiobook", media_item)
1974 # ensure we grab the correct/latest resume point info
1975 media_item.resume_position_ms = await self.get_audiobook_resume_point(
1976 media_item, start_item, userid=userid
1977 )
1978 return [media_item]
1979 if media_item.media_type == MediaType.PODCAST:
1980 media_item = cast("Podcast", media_item)
1981 self.mass.create_task(
1982 self.mass.music.mark_item_played(
1983 media_item, userid=userid, queue_id=queue_id, user_initiated=True
1984 )
1985 )
1986 return list(await self.get_next_podcast_episodes(media_item, start_item, userid=userid))
1987 if media_item.media_type == MediaType.PODCAST_EPISODE:
1988 media_item = cast("PodcastEpisode", media_item)
1989 return list(await self.get_next_podcast_episodes(None, media_item, userid=userid))
1990 if media_item.media_type == MediaType.FOLDER:
1991 media_item = cast("BrowseFolder", media_item)
1992 return list(await self._get_folder_tracks(media_item))
1993 # all other: single track or radio item
1994 return [cast("MediaItemType", media_item)]
1995
1996 async def _try_resume_from_playlog(self, queue: PlayerQueue) -> bool:
1997 """Try to resume playback from playlog when queue is empty.
1998
1999 Attempts to find user-initiated recently played items in the following order:
2000 1. By userid AND queue_id
2001 2. By queue_id only
2002 3. By userid only (if available)
2003 4. Any recently played item
2004
2005 :param queue: The queue to resume playback on.
2006 :return: True if playback was started, False otherwise.
2007 """
2008 # Try different filter combinations in order of specificity
2009 filter_attempts: list[tuple[str | None, str | None, str]] = []
2010 if queue.userid:
2011 filter_attempts.append((queue.userid, queue.queue_id, "userid + queue_id match"))
2012 filter_attempts.append((None, queue.queue_id, "queue_id match"))
2013 if queue.userid:
2014 filter_attempts.append((queue.userid, None, "userid match"))
2015 filter_attempts.append((None, None, "any recent item"))
2016
2017 for userid, queue_id, match_type in filter_attempts:
2018 items = await self.mass.music.recently_played(
2019 limit=5,
2020 fully_played_only=False,
2021 user_initiated_only=True,
2022 userid=userid,
2023 queue_id=queue_id,
2024 )
2025 for item in items:
2026 if not item.uri:
2027 continue
2028 try:
2029 await self.play_media(queue.queue_id, item)
2030 self.logger.info(
2031 "Resumed queue %s from playlog (%s)", queue.display_name, match_type
2032 )
2033 return True
2034 except MusicAssistantError as err:
2035 self.logger.debug("Failed to resume with item %s: %s", item.name, err)
2036 continue
2037
2038 return False
2039
2040 async def _get_radio_tracks(
2041 self, queue_id: str, is_initial_radio_mode: bool = False
2042 ) -> list[Track]:
2043 """Call the registered music providers for dynamic tracks."""
2044 queue = self._queues[queue_id]
2045 queue_track_items: list[Track] = [
2046 q.media_item
2047 for q in self._queue_items[queue_id]
2048 if q.media_item and isinstance(q.media_item, Track)
2049 ]
2050 if not queue.radio_source:
2051 # this may happen during race conditions as this method is called delayed
2052 return []
2053 self.logger.info(
2054 "Fetching radio tracks for queue %s based on: %s",
2055 queue.display_name,
2056 ", ".join([x.name for x in queue.radio_source]),
2057 )
2058
2059 # Get user's preferred provider instances for steering provider selection
2060 preferred_provider_instances: list[str] | None = None
2061 if (
2062 queue.userid
2063 and (playback_user := await self.mass.webserver.auth.get_user(queue.userid))
2064 and playback_user.provider_filter
2065 ):
2066 preferred_provider_instances = playback_user.provider_filter
2067
2068 available_base_tracks: list[Track] = []
2069 base_track_sample_size = 5
2070 # Some providers have very deterministic similar track algorithms when providing
2071 # a single track item. When we have a radio mode based on 1 track and we have to
2072 # refill the queue (ie not initial radio mode), we use the play history as base tracks
2073 if (
2074 len(queue.radio_source) == 1
2075 and queue.radio_source[0].media_type == MediaType.TRACK
2076 and not is_initial_radio_mode
2077 ):
2078 available_base_tracks = queue_track_items
2079 else:
2080 # Grab all the available base tracks based on the selected source items.
2081 # shuffle the source items, just in case
2082 for radio_item in random.sample(queue.radio_source, len(queue.radio_source)):
2083 ctrl = self.mass.music.get_controller(radio_item.media_type)
2084 try:
2085 available_base_tracks += [
2086 track
2087 for track in await ctrl.radio_mode_base_tracks(
2088 radio_item, # type: ignore[arg-type]
2089 preferred_provider_instances,
2090 )
2091 # Avoid duplicate base tracks
2092 if track not in available_base_tracks
2093 ]
2094 except UnsupportedFeaturedException as err:
2095 self.logger.debug(
2096 "Skip loading radio items for %s: %s ",
2097 radio_item.uri,
2098 str(err),
2099 )
2100 if not available_base_tracks:
2101 raise UnsupportedFeaturedException("Radio mode not available for source items")
2102
2103 # Sample tracks from the base tracks, which will be used to calculate the dynamic ones
2104 base_tracks = random.sample(
2105 available_base_tracks,
2106 min(base_track_sample_size, len(available_base_tracks)),
2107 )
2108 # Use a set to avoid duplicate dynamic tracks
2109 dynamic_tracks: set[Track] = set()
2110 # Use base tracks + Trackcontroller to obtain similar tracks for every base Track
2111 for allow_lookup in (False, True):
2112 if dynamic_tracks:
2113 break
2114 for base_track in base_tracks:
2115 try:
2116 _similar_tracks = await self.mass.music.tracks.similar_tracks(
2117 base_track.item_id,
2118 base_track.provider,
2119 allow_lookup=allow_lookup,
2120 preferred_provider_instances=preferred_provider_instances,
2121 )
2122 except MediaNotFoundError:
2123 # Some providers don't have similar tracks for all items. For example,
2124 # Tidal can sometimes return a 404 when the 'similar_tracks' endpoint is called.
2125 # in that case, just skip the track.
2126 self.logger.debug("Similar tracks not found for track %s", base_track.name)
2127 continue
2128 for track in _similar_tracks:
2129 if (
2130 track not in base_tracks
2131 # Exclude tracks we have already played / queued
2132 and track not in queue_track_items
2133 # Ignore tracks that are too long for radio mode, e.g. mixes
2134 and track.duration <= RADIO_TRACK_MAX_DURATION_SECS
2135 ):
2136 dynamic_tracks.add(track)
2137 if len(dynamic_tracks) >= 50:
2138 break
2139 queue_tracks: list[Track] = []
2140 dynamic_tracks_list = list(dynamic_tracks)
2141 # Only include the sampled base tracks when the radio mode is first initialized
2142 if is_initial_radio_mode:
2143 queue_tracks += [base_tracks[0]]
2144 # Exhaust base tracks with the pattern of BDDBDDBDD (1 base track + 2 dynamic tracks)
2145 if len(base_tracks) > 1:
2146 for base_track in base_tracks[1:]:
2147 queue_tracks += [base_track]
2148 if len(dynamic_tracks_list) > 2:
2149 queue_tracks += random.sample(dynamic_tracks_list, 2)
2150 else:
2151 queue_tracks += dynamic_tracks_list
2152 # Add dynamic tracks to the queue, make sure to exclude already picked tracks
2153 remaining_dynamic_tracks = [t for t in dynamic_tracks_list if t not in queue_tracks]
2154 if remaining_dynamic_tracks:
2155 queue_tracks += random.sample(
2156 remaining_dynamic_tracks, min(len(remaining_dynamic_tracks), 25)
2157 )
2158 return queue_tracks
2159
2160 async def _get_folder_tracks(self, folder: BrowseFolder) -> list[Track]:
2161 """Fetch (playable) tracks for given browse folder."""
2162 self.logger.info(
2163 "Fetching tracks to play for folder %s",
2164 folder.name,
2165 )
2166 tracks: list[Track] = []
2167 for item in await self.mass.music.browse(folder.path):
2168 if not item.is_playable:
2169 continue
2170 # recursively fetch tracks from all media types
2171 resolved = await self._resolve_media_items(item)
2172 tracks += [x for x in resolved if isinstance(x, Track)]
2173
2174 return tracks
2175
2176 def _update_queue_from_player(
2177 self,
2178 player: Player,
2179 ) -> None:
2180 """Update the Queue when the player state changed."""
2181 queue_id = player.player_id
2182 queue = self._queues[queue_id]
2183
2184 # basic properties
2185 queue.display_name = player.state.name
2186 queue.available = player.state.available
2187 queue.items = len(self._queue_items[queue_id])
2188
2189 queue.state = (
2190 player.state.playback_state or PlaybackState.IDLE
2191 if queue.active
2192 else PlaybackState.IDLE
2193 )
2194 # update current item/index from player report
2195 if queue.active and queue.state in (
2196 PlaybackState.PLAYING,
2197 PlaybackState.PAUSED,
2198 ):
2199 # NOTE: If the queue is not playing (yet) we will not update the current index
2200 # to ensure we keep the previously known current index
2201 if queue.flow_mode:
2202 # flow mode active, the player is playing one long stream
2203 # so we need to calculate the current index and elapsed time
2204 current_index, elapsed_time = self._get_flow_queue_stream_index(queue, player)
2205 elif item_id := self._parse_player_current_item_id(queue_id, player):
2206 # normal mode, the player itself will report the current item
2207 elapsed_time = int(player.state.corrected_elapsed_time or 0)
2208 current_index = self.index_by_id(queue_id, item_id)
2209 else:
2210 # this may happen if the player is still transitioning between tracks
2211 # we ignore this for now and keep the current index as is
2212 return
2213
2214 # get current/next item based on current index
2215 queue.current_index = current_index
2216 queue.current_item = current_item = self.get_item(queue_id, current_index)
2217 queue.next_item = (
2218 self.get_next_item(queue_id, current_index)
2219 if current_item and current_index is not None
2220 else None
2221 )
2222
2223 # correct elapsed time when seeking
2224 if (
2225 not queue.flow_mode
2226 and current_item
2227 and current_item.streamdetails
2228 and current_item.streamdetails.seek_position
2229 ):
2230 elapsed_time += current_item.streamdetails.seek_position
2231 queue.elapsed_time = elapsed_time
2232 queue.elapsed_time_last_updated = time.time()
2233
2234 elif not queue.current_item and queue.current_index is not None:
2235 current_index = queue.current_index
2236 queue.current_item = current_item = self.get_item(queue_id, current_index)
2237 queue.next_item = (
2238 self.get_next_item(queue_id, current_index)
2239 if current_item and current_index is not None
2240 else None
2241 )
2242
2243 # This is enough to detect any changes in the DSPDetails
2244 # (so child count changed, or any output format changed)
2245 output_formats = []
2246 if output_format := player.extra_data.get("output_format"):
2247 output_formats.append(str(output_format))
2248 for child_id in player.state.group_members:
2249 if (child := self.mass.players.get_player(child_id)) and (
2250 output_format := child.extra_data.get("output_format")
2251 ):
2252 output_formats.append(str(output_format))
2253 else:
2254 output_formats.append("unknown")
2255
2256 # basic throttle: do not send state changed events if queue did not actually change
2257 prev_state: CompareState = self._prev_states.get(
2258 queue_id,
2259 CompareState(
2260 queue_id=queue_id,
2261 state=PlaybackState.IDLE,
2262 current_item_id=None,
2263 next_item_id=None,
2264 current_item=None,
2265 elapsed_time=0,
2266 last_playing_elapsed_time=0,
2267 stream_title=None,
2268 codec_type=None,
2269 output_formats=None,
2270 ),
2271 )
2272 # update last_playing_elapsed_time only when the player is actively playing
2273 # use corrected_elapsed_time which accounts for time since last update
2274 # this preserves the last known elapsed time when transitioning to idle/paused
2275 prev_playing_elapsed = prev_state["last_playing_elapsed_time"]
2276 prev_item_id = prev_state["current_item_id"]
2277 current_item_id = queue.current_item.queue_item_id if queue.current_item else None
2278 if queue.state == PlaybackState.PLAYING:
2279 current_elapsed = int(queue.corrected_elapsed_time)
2280 if current_item_id != prev_item_id:
2281 # new track started, reset the elapsed time tracker
2282 last_playing_elapsed_time = current_elapsed
2283 else:
2284 # same track, use the max of current and previous to handle timing issues
2285 last_playing_elapsed_time = max(current_elapsed, prev_playing_elapsed)
2286 else:
2287 last_playing_elapsed_time = prev_playing_elapsed
2288 new_state = CompareState(
2289 queue_id=queue_id,
2290 state=queue.state,
2291 current_item_id=queue.current_item.queue_item_id if queue.current_item else None,
2292 next_item_id=queue.next_item.queue_item_id if queue.next_item else None,
2293 current_item=queue.current_item,
2294 elapsed_time=int(queue.elapsed_time),
2295 last_playing_elapsed_time=last_playing_elapsed_time,
2296 stream_title=(
2297 queue.current_item.streamdetails.stream_title
2298 if queue.current_item and queue.current_item.streamdetails
2299 else None
2300 ),
2301 codec_type=(
2302 queue.current_item.streamdetails.audio_format.codec_type
2303 if queue.current_item and queue.current_item.streamdetails
2304 else None
2305 ),
2306 output_formats=output_formats,
2307 )
2308 changed_keys = get_changed_keys(dict(prev_state), dict(new_state))
2309 with suppress(KeyError):
2310 changed_keys.remove("next_item_id")
2311 with suppress(KeyError):
2312 changed_keys.remove("last_playing_elapsed_time")
2313
2314 # store the new state
2315 if queue.active:
2316 self._prev_states[queue_id] = new_state
2317 else:
2318 self._prev_states.pop(queue_id, None)
2319
2320 # return early if nothing changed
2321 if len(changed_keys) == 0:
2322 return
2323
2324 # signal update and store state
2325 send_update = True
2326 if changed_keys == {"elapsed_time"}:
2327 # only elapsed time changed, do not send full queue update
2328 send_update = False
2329 prev_time = prev_state.get("elapsed_time") or 0
2330 cur_time = new_state.get("elapsed_time") or 0
2331 if abs(cur_time - prev_time) > 2:
2332 # send dedicated event for time updates when seeking
2333 self.mass.signal_event(
2334 EventType.QUEUE_TIME_UPDATED,
2335 object_id=queue_id,
2336 data=queue.elapsed_time,
2337 )
2338 # also signal update to the player itself so it can update its current_media
2339 self.mass.players.trigger_player_update(queue_id)
2340
2341 if send_update:
2342 self.signal_update(queue_id)
2343
2344 if "output_formats" in changed_keys:
2345 # refresh DSP details since they may have changed
2346 dsp = get_stream_dsp_details(self.mass, queue_id)
2347 if queue.current_item and queue.current_item.streamdetails:
2348 queue.current_item.streamdetails.dsp = dsp
2349 if queue.next_item and queue.next_item.streamdetails:
2350 queue.next_item.streamdetails.dsp = dsp
2351
2352 # handle updating stream_metadata if needed
2353 if (
2354 queue.current_item
2355 and (streamdetails := queue.current_item.streamdetails)
2356 and streamdetails.stream_metadata_update_callback
2357 and (
2358 streamdetails.stream_metadata_last_updated is None
2359 or (
2360 time.time() - streamdetails.stream_metadata_last_updated
2361 >= streamdetails.stream_metadata_update_interval
2362 )
2363 )
2364 ):
2365 streamdetails.stream_metadata_last_updated = time.time()
2366 self.mass.create_task(
2367 streamdetails.stream_metadata_update_callback(
2368 streamdetails, int(queue.corrected_elapsed_time)
2369 )
2370 )
2371
2372 # handle sending a playback progress report
2373 # we do this every 30 seconds or when the state changes
2374 if (
2375 changed_keys.intersection({"state", "current_item_id"})
2376 or int(queue.elapsed_time) % PLAYBACK_REPORT_INTERVAL_SECONDS == 0
2377 ):
2378 self._handle_playback_progress_report(queue, prev_state, new_state)
2379
2380 # check if we need to clear the queue if we reached the end
2381 if "state" in changed_keys and queue.state == PlaybackState.IDLE:
2382 self._handle_end_of_queue(queue, prev_state, new_state)
2383
2384 # watch dynamic radio items refill if needed
2385 if "current_item_id" in changed_keys:
2386 # auto enable radio mode if dont stop the music is enabled
2387 if (
2388 queue.dont_stop_the_music_enabled
2389 and queue.enqueued_media_items
2390 and queue.current_index is not None
2391 and (queue.items - queue.current_index) <= 1
2392 ):
2393 # We have received the last item in the queue and Don't stop the music is enabled
2394 # set the played media item(s) as radio items (which will refill the queue)
2395 # note that this will fail if there are no media items for which we have
2396 # a dynamic radio source.
2397 self.logger.debug(
2398 "End of queue detected and Don't stop the music is enabled for %s"
2399 " - setting enqueued media items as radio source: %s",
2400 queue.display_name,
2401 ", ".join([x.uri for x in queue.enqueued_media_items]), # type: ignore[misc] # uri set in __post_init__
2402 )
2403 queue.radio_source = queue.enqueued_media_items
2404 # auto fill radio tracks if less than 5 tracks left in the queue
2405 if (
2406 queue.radio_source
2407 and queue.current_index is not None
2408 and (queue.items - queue.current_index) < 5
2409 ):
2410 task_id = f"fill_radio_tracks_{queue_id}"
2411 self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id)
2412
2413 def _get_flow_queue_stream_index(
2414 self, queue: PlayerQueue, player: Player
2415 ) -> tuple[int | None, int]:
2416 """Calculate current queue index and current track elapsed time when flow mode is active."""
2417 elapsed_time_queue_total = player.state.corrected_elapsed_time or 0
2418 if queue.current_index is None and not queue.flow_mode_stream_log:
2419 return queue.current_index, int(queue.elapsed_time)
2420
2421 # For each track that has been streamed/buffered to the player,
2422 # a playlog entry will be created with the queue item id
2423 # and the amount of seconds streamed. We traverse the playlog to figure
2424 # out where we are in the queue, accounting for actual streamed
2425 # seconds (and not duration) and skipped seconds. If a track has been repeated,
2426 # it will simply be in the playlog multiple times.
2427 played_time = 0.0
2428 queue_index: int | None = queue.current_index or 0
2429 track_time = 0.0
2430 for play_log_entry in queue.flow_mode_stream_log:
2431 queue_item_duration = (
2432 # NOTE: 'seconds_streamed' can actually be 0 if there was a stream error!
2433 play_log_entry.seconds_streamed
2434 if play_log_entry.seconds_streamed is not None
2435 else play_log_entry.duration or 3600 * 24 * 7
2436 )
2437 if elapsed_time_queue_total > (queue_item_duration + played_time):
2438 # total elapsed time is more than (streamed) track duration
2439 # this track has been fully played, move on.
2440 played_time += queue_item_duration
2441 else:
2442 # no more seconds left to divide, this is our track
2443 # account for any seeking by adding the skipped/seeked seconds
2444 queue_index = self.index_by_id(queue.queue_id, play_log_entry.queue_item_id)
2445 queue_item = self.get_item(queue.queue_id, queue_index)
2446 if queue_item and queue_item.streamdetails:
2447 track_sec_skipped = queue_item.streamdetails.seek_position
2448 else:
2449 track_sec_skipped = 0
2450 track_time = elapsed_time_queue_total + track_sec_skipped - played_time
2451 break
2452 if player.state.playback_state != PlaybackState.PLAYING:
2453 # if the player is not playing, we can't be sure that the elapsed time is correct
2454 # so we just return the queue index and the elapsed time
2455 return queue.current_index, int(queue.elapsed_time)
2456 return queue_index, int(track_time)
2457
2458 def _parse_player_current_item_id(self, queue_id: str, player: Player) -> str | None:
2459 """Parse QueueItem ID from Player's current url."""
2460 protocol_player = player
2461 if player.active_output_protocol and player.active_output_protocol != "native":
2462 protocol_player = self.mass.players.get_player(player.active_output_protocol) or player
2463 if not protocol_player.current_media:
2464 # YES, we use player.current_media on purpose here because we need the raw metadata
2465 return None
2466 # prefer queue_id and queue_item_id within the current media
2467 if (
2468 protocol_player.current_media.source_id == queue_id
2469 and protocol_player.current_media.queue_item_id
2470 ):
2471 return protocol_player.current_media.queue_item_id
2472 # special case for sonos players
2473 if protocol_player.current_media.uri and protocol_player.current_media.uri.startswith(
2474 f"mass:{queue_id}"
2475 ):
2476 if protocol_player.current_media.queue_item_id:
2477 return protocol_player.current_media.queue_item_id
2478 return protocol_player.current_media.uri.split(":")[-1]
2479 # try to extract the item id from a mass stream url
2480 if (
2481 protocol_player.current_media.uri
2482 and queue_id in protocol_player.current_media.uri
2483 and self.mass.streams.base_url in protocol_player.current_media.uri
2484 ):
2485 current_item_id = protocol_player.current_media.uri.rsplit("/")[-1].split(".")[0]
2486 if self.get_item(queue_id, current_item_id):
2487 return current_item_id
2488 # try to extract the item id from a queue_id/item_id combi
2489 if (
2490 protocol_player.current_media.uri
2491 and queue_id in protocol_player.current_media.uri
2492 and "/" in protocol_player.current_media.uri
2493 ):
2494 current_item_id = protocol_player.current_media.uri.split("/")[1]
2495 if self.get_item(queue_id, current_item_id):
2496 return current_item_id
2497
2498 return None
2499
2500 def _handle_end_of_queue(
2501 self, queue: PlayerQueue, prev_state: CompareState, new_state: CompareState
2502 ) -> None:
2503 """Check if the queue should be cleared after the current item."""
2504 # check if queue state changed to stopped (from playing/paused to idle)
2505 if not (
2506 prev_state["state"] in (PlaybackState.PLAYING, PlaybackState.PAUSED)
2507 and new_state["state"] == PlaybackState.IDLE
2508 ):
2509 return
2510 # check if no more items in the queue (next_item should be None at end of queue)
2511 if queue.next_item is not None:
2512 return
2513 # check if we had a previous item playing
2514 if prev_state["current_item_id"] is None:
2515 return
2516
2517 async def _clear_queue_delayed() -> None:
2518 for _ in range(5):
2519 await asyncio.sleep(1)
2520 if queue.state != PlaybackState.IDLE:
2521 return
2522 if queue.next_item is not None:
2523 return
2524 self.logger.info("End of queue reached, clearing items")
2525 self.clear(queue.queue_id)
2526
2527 # all checks passed, we stopped playback at the last (or single) track of the queue
2528 # now determine if the item was fully played before clearing
2529
2530 # For flow mode, check if the last track was fully streamed using the stream log
2531 # This is more reliable than elapsed_time which can be reset/incorrect
2532 if queue.flow_mode and queue.flow_mode_stream_log:
2533 last_log_entry = queue.flow_mode_stream_log[-1]
2534 if last_log_entry.seconds_streamed is not None:
2535 # The last track finished streaming, safe to clear queue
2536 self.mass.create_task(_clear_queue_delayed())
2537 return
2538
2539 # For non-flow mode, use prev_state values since queue state may have been updated/reset
2540 prev_item = prev_state["current_item"]
2541 if prev_item and (streamdetails := prev_item.streamdetails):
2542 duration = streamdetails.duration or prev_item.duration or 24 * 3600
2543 elif prev_item:
2544 duration = prev_item.duration or 24 * 3600
2545 else:
2546 # No current item means player has already cleared it, safe to clear queue
2547 self.mass.create_task(_clear_queue_delayed())
2548 return
2549
2550 # use last_playing_elapsed_time which preserves the elapsed time from when the player
2551 # was still playing (before transitioning to idle where elapsed_time may be reset to 0)
2552 seconds_played = int(prev_state["last_playing_elapsed_time"])
2553 # debounce this a bit to make sure we're not clearing the queue by accident
2554 # only clear if the last track was played to near completion (within 5 seconds of end)
2555 if seconds_played >= (duration or 3600) - 5:
2556 self.mass.create_task(_clear_queue_delayed())
2557
2558 def _handle_playback_progress_report(
2559 self, queue: PlayerQueue, prev_state: CompareState, new_state: CompareState
2560 ) -> None:
2561 """Handle playback progress report."""
2562 # detect change in current index to report that a item has been played
2563 prev_item_id = prev_state["current_item_id"]
2564 cur_item_id = new_state["current_item_id"]
2565 if prev_item_id is None and cur_item_id is None:
2566 return
2567
2568 if prev_item_id is not None and prev_item_id != cur_item_id:
2569 # we have a new item, so we need report the previous one
2570 is_current_item = False
2571 item_to_report = prev_state["current_item"]
2572 seconds_played = int(prev_state["elapsed_time"])
2573 else:
2574 # report on current item
2575 is_current_item = True
2576 item_to_report = self.get_item(queue.queue_id, cur_item_id) or new_state["current_item"]
2577 seconds_played = int(new_state["elapsed_time"])
2578
2579 if not item_to_report:
2580 return # guard against invalid items
2581
2582 if not (media_item := item_to_report.media_item):
2583 # only report on media items
2584 return
2585 assert media_item.uri is not None # uri is set in __post_init__
2586
2587 if item_to_report.streamdetails and item_to_report.streamdetails.stream_error:
2588 # Ignore items that had a stream error
2589 return
2590
2591 if item_to_report.streamdetails and item_to_report.streamdetails.duration:
2592 duration = int(item_to_report.streamdetails.duration)
2593 else:
2594 duration = int(item_to_report.duration or 3 * 3600)
2595
2596 if seconds_played < 5:
2597 # ignore items that have been played less than 5 seconds
2598 # this also filters out a bounce effect where the previous item
2599 # gets reported with 0 elapsed seconds after a new item starts playing
2600 return
2601
2602 # determine if item is fully played
2603 # for podcasts and audiobooks we account for the last 60 seconds
2604 percentage_played = percentage(seconds_played, duration)
2605 if not is_current_item and item_to_report.media_type in (
2606 MediaType.AUDIOBOOK,
2607 MediaType.PODCAST_EPISODE,
2608 ):
2609 fully_played = seconds_played >= duration - 60
2610 elif not is_current_item:
2611 # 90% of the track must be played to be considered fully played
2612 fully_played = percentage_played >= 90
2613 else:
2614 fully_played = seconds_played >= duration - 10
2615
2616 is_playing = is_current_item and queue.state == PlaybackState.PLAYING
2617 if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
2618 self.logger.debug(
2619 "%s %s '%s' (%s) - Fully played: %s - Progress: %s (%s/%ss)",
2620 queue.display_name,
2621 "is playing" if is_playing else "played",
2622 item_to_report.name,
2623 item_to_report.uri,
2624 fully_played,
2625 f"{percentage_played}%",
2626 seconds_played,
2627 duration,
2628 )
2629 # add entry to playlog - this also handles resume of podcasts/audiobooks
2630 self.mass.create_task(
2631 self.mass.music.mark_item_played(
2632 media_item,
2633 fully_played=fully_played,
2634 seconds_played=seconds_played,
2635 is_playing=is_playing,
2636 userid=queue.userid,
2637 queue_id=queue.queue_id,
2638 user_initiated=False,
2639 )
2640 )
2641
2642 album: Album | ItemMapping | None = getattr(media_item, "album", None)
2643 # signal 'media item played' event,
2644 # which is useful for plugins that want to do scrobbling
2645 artists: list[Artist | ItemMapping] = getattr(media_item, "artists", [])
2646 artists_names = [a.name for a in artists]
2647 self.mass.signal_event(
2648 EventType.MEDIA_ITEM_PLAYED,
2649 object_id=media_item.uri,
2650 data=MediaItemPlaybackProgressReport(
2651 uri=media_item.uri,
2652 media_type=media_item.media_type,
2653 name=media_item.name,
2654 version=getattr(media_item, "version", None),
2655 artist=(
2656 getattr(media_item, "artist_str", None) or artists_names[0]
2657 if artists_names
2658 else None
2659 ),
2660 artists=artists_names,
2661 artist_mbids=[a.mbid for a in artists if a.mbid] if artists else None,
2662 album=album.name if album else None,
2663 album_mbid=album.mbid if album else None,
2664 album_artist=(album.artist_str if isinstance(album, Album) else None),
2665 album_artist_mbids=(
2666 [a.mbid for a in album.artists if a.mbid] if isinstance(album, Album) else None
2667 ),
2668 image_url=(
2669 self.mass.metadata.get_image_url(
2670 item_to_report.media_item.image, prefer_proxy=False
2671 )
2672 if item_to_report.media_item.image
2673 else None
2674 ),
2675 duration=duration,
2676 mbid=(getattr(media_item, "mbid", None)),
2677 seconds_played=seconds_played,
2678 fully_played=fully_played,
2679 is_playing=is_playing,
2680 userid=queue.userid,
2681 ),
2682 )
2683
2684
2685async def _smart_shuffle(items: list[QueueItem]) -> list[QueueItem]:
2686 """Shuffle queue items, avoiding identical tracks next to each other.
2687
2688 Best-effort approach to prevent the same track from appearing adjacent.
2689 Does a random shuffle first, then makes a limited number of passes to
2690 swap adjacent duplicates with a random item further in the list.
2691
2692 :param items: List of queue items to shuffle.
2693 """
2694 if len(items) <= 2:
2695 return random.sample(items, len(items)) if len(items) == 2 else items
2696
2697 # Start with a random shuffle
2698 shuffled = random.sample(items, len(items))
2699
2700 # Make a few passes to fix adjacent duplicates
2701 max_passes = 3
2702 for _ in range(max_passes):
2703 swapped = False
2704 for i in range(len(shuffled) - 1):
2705 if shuffled[i].name == shuffled[i + 1].name:
2706 # Found adjacent duplicate - swap with random position at least 2 away
2707 swap_candidates = [j for j in range(len(shuffled)) if abs(j - i - 1) >= 2]
2708 if swap_candidates:
2709 swap_pos = random.choice(swap_candidates)
2710 shuffled[i + 1], shuffled[swap_pos] = shuffled[swap_pos], shuffled[i + 1]
2711 swapped = True
2712 if not swapped:
2713 break
2714 # Yield to event loop between passes
2715 await asyncio.sleep(0)
2716
2717 return shuffled
2718