/
/
/
1"""
2MusicAssistant Player Queues Controller.
3
4Handles all logic to PLAY Media Items, provided by Music Providers to supported players.
5
6It is loosely coupled to the MusicAssistant Music Controller and Player Controller.
7A Music Assistant Player always has a PlayerQueue associated with it
8which holds the queue items and state.
9
10The PlayerQueue is in that case the active source of the player,
11but it can also be something else, hence the loose coupling.
12"""
13
14from __future__ import annotations
15
16import asyncio
17import random
18import time
19from contextlib import suppress
20from types import NoneType
21from typing import TYPE_CHECKING, Any, TypedDict, cast
22
23import shortuuid
24from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
25from music_assistant_models.enums import (
26 ConfigEntryType,
27 ContentType,
28 EventType,
29 MediaType,
30 PlaybackState,
31 ProviderFeature,
32 QueueOption,
33 RepeatMode,
34)
35from music_assistant_models.errors import (
36 AudioError,
37 InvalidCommand,
38 InvalidDataError,
39 MediaNotFoundError,
40 MusicAssistantError,
41 PlayerUnavailableError,
42 QueueEmpty,
43 UnsupportedFeaturedException,
44)
45from music_assistant_models.media_items import (
46 Album,
47 Artist,
48 Audiobook,
49 BrowseFolder,
50 Genre,
51 ItemMapping,
52 MediaItemType,
53 PlayableMediaItemType,
54 Playlist,
55 Podcast,
56 PodcastEpisode,
57 Track,
58 UniqueList,
59 media_from_dict,
60)
61from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport
62from music_assistant_models.player_queue import PlayerQueue
63from music_assistant_models.queue_item import QueueItem
64
65from music_assistant.constants import (
66 ATTR_ANNOUNCEMENT_IN_PROGRESS,
67 MASS_LOGO_ONLINE,
68 PLAYBACK_REPORT_INTERVAL_SECONDS,
69 PLAYLIST_MEDIA_TYPES,
70 VERBOSE_LOG_LEVEL,
71 PlaylistPlayableItem,
72)
73from music_assistant.controllers.players.controller import IN_QUEUE_COMMAND
74from music_assistant.controllers.webserver.helpers.auth_middleware import get_current_user
75from music_assistant.helpers.api import api_command
76from music_assistant.helpers.audio import get_stream_details, get_stream_dsp_details
77from music_assistant.helpers.throttle_retry import BYPASS_THROTTLER
78from music_assistant.helpers.util import get_changed_keys, percentage
79from music_assistant.models.core_controller import CoreController
80from music_assistant.models.player import Player, PlayerMedia
81
82if TYPE_CHECKING:
83 from collections.abc import Iterator
84
85 from music_assistant_models.auth import User
86 from music_assistant_models.media_items.metadata import MediaItemImage
87
88 from music_assistant import MusicAssistant
89 from music_assistant.models.player import Player
90
91CONF_DEFAULT_ENQUEUE_SELECT_ARTIST = "default_enqueue_select_artist"
92CONF_DEFAULT_ENQUEUE_SELECT_ALBUM = "default_enqueue_select_album"
93
94ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE = "all_tracks"
95ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE = "all_tracks"
96
97CONF_DEFAULT_ENQUEUE_OPTION_ARTIST = "default_enqueue_option_artist"
98CONF_DEFAULT_ENQUEUE_OPTION_ALBUM = "default_enqueue_option_album"
99CONF_DEFAULT_ENQUEUE_OPTION_TRACK = "default_enqueue_option_track"
100CONF_DEFAULT_ENQUEUE_OPTION_GENRE = "default_enqueue_option_genre"
101CONF_DEFAULT_ENQUEUE_OPTION_RADIO = "default_enqueue_option_radio"
102CONF_DEFAULT_ENQUEUE_OPTION_PLAYLIST = "default_enqueue_option_playlist"
103CONF_DEFAULT_ENQUEUE_OPTION_AUDIOBOOK = "default_enqueue_option_audiobook"
104CONF_DEFAULT_ENQUEUE_OPTION_PODCAST = "default_enqueue_option_podcast"
105CONF_DEFAULT_ENQUEUE_OPTION_PODCAST_EPISODE = "default_enqueue_option_podcast_episode"
106CONF_DEFAULT_ENQUEUE_OPTION_FOLDER = "default_enqueue_option_folder"
107CONF_DEFAULT_ENQUEUE_OPTION_UNKNOWN = "default_enqueue_option_unknown"
108RADIO_TRACK_MAX_DURATION_SECS = 20 * 60 # 20 minutes
109CACHE_CATEGORY_PLAYER_QUEUE_STATE = 0
110CACHE_CATEGORY_PLAYER_QUEUE_ITEMS = 1
111
112
113class CompareState(TypedDict):
114 """Simple object where we store the (previous) state of a queue.
115
116 Used for compare actions.
117 """
118
119 queue_id: str
120 state: PlaybackState
121 current_item_id: str | None
122 next_item_id: str | None
123 current_item: QueueItem | None
124 elapsed_time: int
125 # last_playing_elapsed_time: elapsed time from the last PLAYING state update
126 # used to determine if a track was fully played when transitioning to idle
127 last_playing_elapsed_time: int
128 stream_title: str | None
129 codec_type: ContentType | None
130 output_formats: list[str] | None
131
132
133class PlayerQueuesController(CoreController):
134 """Controller holding all logic to enqueue music for players."""
135
136 domain: str = "player_queues"
137
138 def __init__(self, mass: MusicAssistant) -> None:
139 """Initialize core controller."""
140 super().__init__(mass)
141 self._queues: dict[str, PlayerQueue] = {}
142 self._queue_items: dict[str, list[QueueItem]] = {}
143 self._prev_states: dict[str, CompareState] = {}
144 self._transitioning_players: set[str] = set()
145 self.manifest.name = "Player Queues controller"
146 self.manifest.description = (
147 "Music Assistant's core controller which manages the queues for all players."
148 )
149 self.manifest.icon = "playlist-music"
150
151 async def close(self) -> None:
152 """Cleanup on exit."""
153 # stop all playback
154 for queue in self.all():
155 if queue.state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
156 await self.stop(queue.queue_id)
157
158 async def get_config_entries(
159 self,
160 action: str | None = None,
161 values: dict[str, ConfigValueType] | None = None,
162 ) -> tuple[ConfigEntry, ...]:
163 """Return all Config Entries for this core module (if any)."""
164 enqueue_options = [ConfigValueOption(x.name, x.value) for x in QueueOption]
165 return (
166 ConfigEntry(
167 key=CONF_DEFAULT_ENQUEUE_SELECT_ARTIST,
168 type=ConfigEntryType.STRING,
169 default_value=ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE,
170 label="Items to select when you play a (in-library) artist.",
171 options=[
172 ConfigValueOption(
173 title="Only in-library tracks",
174 value="library_tracks",
175 ),
176 ConfigValueOption(
177 title="All tracks from all albums in the library",
178 value="library_album_tracks",
179 ),
180 ConfigValueOption(
181 title="All (top) tracks from (all) streaming provider(s)",
182 value="all_tracks",
183 ),
184 ConfigValueOption(
185 title="All tracks from all albums from (all) streaming provider(s)",
186 value="all_album_tracks",
187 ),
188 ],
189 ),
190 ConfigEntry(
191 key=CONF_DEFAULT_ENQUEUE_SELECT_ALBUM,
192 type=ConfigEntryType.STRING,
193 default_value=ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE,
194 label="Items to select when you play a (in-library) album.",
195 options=[
196 ConfigValueOption(
197 title="Only in-library tracks",
198 value="library_tracks",
199 ),
200 ConfigValueOption(
201 title="All tracks for album on (streaming) provider",
202 value="all_tracks",
203 ),
204 ],
205 ),
206 ConfigEntry(
207 key=CONF_DEFAULT_ENQUEUE_OPTION_ARTIST,
208 type=ConfigEntryType.STRING,
209 default_value=QueueOption.REPLACE.value,
210 label="Default enqueue option for Artist item(s).",
211 options=enqueue_options,
212 description="Define the default enqueue action for this mediatype.",
213 ),
214 ConfigEntry(
215 key=CONF_DEFAULT_ENQUEUE_OPTION_ALBUM,
216 type=ConfigEntryType.STRING,
217 default_value=QueueOption.REPLACE.value,
218 label="Default enqueue option for Album item(s).",
219 options=enqueue_options,
220 description="Define the default enqueue action for this mediatype.",
221 ),
222 ConfigEntry(
223 key=CONF_DEFAULT_ENQUEUE_OPTION_TRACK,
224 type=ConfigEntryType.STRING,
225 default_value=QueueOption.PLAY.value,
226 label="Default enqueue option for Track item(s).",
227 options=enqueue_options,
228 description="Define the default enqueue action for this mediatype.",
229 ),
230 ConfigEntry(
231 key=CONF_DEFAULT_ENQUEUE_OPTION_GENRE,
232 type=ConfigEntryType.STRING,
233 default_value=QueueOption.REPLACE.value,
234 label="Default enqueue option for Genre item(s).",
235 options=enqueue_options,
236 description="Define the default enqueue action for this mediatype.",
237 ),
238 ConfigEntry(
239 key=CONF_DEFAULT_ENQUEUE_OPTION_RADIO,
240 type=ConfigEntryType.STRING,
241 default_value=QueueOption.REPLACE.value,
242 label="Default enqueue option for Radio item(s).",
243 options=enqueue_options,
244 description="Define the default enqueue action for this mediatype.",
245 ),
246 ConfigEntry(
247 key=CONF_DEFAULT_ENQUEUE_OPTION_PLAYLIST,
248 type=ConfigEntryType.STRING,
249 default_value=QueueOption.REPLACE.value,
250 label="Default enqueue option for Playlist item(s).",
251 options=enqueue_options,
252 description="Define the default enqueue action for this mediatype.",
253 ),
254 ConfigEntry(
255 key=CONF_DEFAULT_ENQUEUE_OPTION_AUDIOBOOK,
256 type=ConfigEntryType.STRING,
257 default_value=QueueOption.REPLACE.value,
258 label="Default enqueue option for Audiobook item(s).",
259 options=enqueue_options,
260 hidden=True,
261 ),
262 ConfigEntry(
263 key=CONF_DEFAULT_ENQUEUE_OPTION_PODCAST,
264 type=ConfigEntryType.STRING,
265 default_value=QueueOption.REPLACE.value,
266 label="Default enqueue option for Podcast item(s).",
267 options=enqueue_options,
268 hidden=True,
269 ),
270 ConfigEntry(
271 key=CONF_DEFAULT_ENQUEUE_OPTION_PODCAST_EPISODE,
272 type=ConfigEntryType.STRING,
273 default_value=QueueOption.REPLACE.value,
274 label="Default enqueue option for Podcast-episode item(s).",
275 options=enqueue_options,
276 hidden=True,
277 ),
278 ConfigEntry(
279 key=CONF_DEFAULT_ENQUEUE_OPTION_FOLDER,
280 type=ConfigEntryType.STRING,
281 default_value=QueueOption.REPLACE.value,
282 label="Default enqueue option for Folder item(s).",
283 options=enqueue_options,
284 hidden=True,
285 ),
286 )
287
288 def __iter__(self) -> Iterator[PlayerQueue]:
289 """Iterate over (available) players."""
290 return iter(self._queues.values())
291
292 @api_command("player_queues/all")
293 def all(self) -> tuple[PlayerQueue, ...]:
294 """Return all registered PlayerQueues."""
295 return tuple(self._queues.values())
296
297 @api_command("player_queues/get")
298 def get(self, queue_id: str) -> PlayerQueue | None:
299 """Return PlayerQueue by queue_id or None if not found."""
300 return self._queues.get(queue_id)
301
302 @api_command("player_queues/items")
303 def items(self, queue_id: str, limit: int = 500, offset: int = 0) -> list[QueueItem]:
304 """Return all QueueItems for given PlayerQueue."""
305 if queue_id not in self._queue_items:
306 return []
307
308 return self._queue_items[queue_id][offset : offset + limit]
309
310 @api_command("player_queues/get_active_queue")
311 def get_active_queue(self, player_id: str) -> PlayerQueue | None:
312 """Return the current active/synced queue for a player."""
313 if player := self.mass.players.get_player(player_id):
314 return self.mass.players.get_active_queue(player)
315 return None
316
317 # Queue commands
318
319 @api_command("player_queues/shuffle")
320 async def set_shuffle(self, queue_id: str, shuffle_enabled: bool) -> None:
321 """Configure shuffle setting on the the queue."""
322 queue = self._queues[queue_id]
323 if queue.shuffle_enabled == shuffle_enabled:
324 return # no change
325 queue.shuffle_enabled = shuffle_enabled
326 queue_items = self._queue_items[queue_id]
327 cur_index = (
328 queue.index_in_buffer if queue.index_in_buffer is not None else queue.current_index
329 )
330 if cur_index is not None:
331 next_index = cur_index + 1
332 next_items = queue_items[next_index:]
333 else:
334 next_items = []
335 next_index = 0
336 if not shuffle_enabled:
337 # shuffle disabled, try to restore original sort order of the remaining items
338 next_items.sort(key=lambda x: x.sort_index, reverse=False)
339 await self.load(
340 queue_id=queue_id,
341 queue_items=next_items,
342 insert_at_index=next_index,
343 keep_remaining=False,
344 shuffle=shuffle_enabled,
345 )
346
347 @api_command("player_queues/dont_stop_the_music")
348 def set_dont_stop_the_music(self, queue_id: str, dont_stop_the_music_enabled: bool) -> None:
349 """Configure Don't stop the music setting on the queue."""
350 providers_available_with_similar_tracks = any(
351 ProviderFeature.SIMILAR_TRACKS in provider.supported_features
352 for provider in self.mass.music.providers
353 )
354 if dont_stop_the_music_enabled and not providers_available_with_similar_tracks:
355 raise UnsupportedFeaturedException(
356 "Don't stop the music is not supported by any of the available music providers"
357 )
358 queue = self._queues[queue_id]
359 queue.dont_stop_the_music_enabled = dont_stop_the_music_enabled
360 self.signal_update(queue_id=queue_id)
361 # if this happens to be the last track in the queue, fill the radio source
362 if (
363 queue.dont_stop_the_music_enabled
364 and queue.enqueued_media_items
365 and queue.current_index is not None
366 and (queue.items - queue.current_index) <= 1
367 ):
368 queue.radio_source = queue.enqueued_media_items
369 task_id = f"fill_radio_tracks_{queue_id}"
370 self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id)
371
372 @api_command("player_queues/repeat")
373 def set_repeat(self, queue_id: str, repeat_mode: RepeatMode) -> None:
374 """Configure repeat setting on the the queue."""
375 queue = self._queues[queue_id]
376 if queue.repeat_mode == repeat_mode:
377 return # no change
378 queue.repeat_mode = repeat_mode
379 self.signal_update(queue_id)
380 if (
381 queue.state == PlaybackState.PLAYING
382 and queue.index_in_buffer is not None
383 and queue.index_in_buffer == queue.current_index
384 ):
385 # if the queue is playing,
386 # ensure to (re)queue the next track because it might have changed
387 # note that we only do this if the player has loaded the current track
388 # if not, we wait until it has loaded to prevent conflicts
389 if next_item := self.get_next_item(queue_id, queue.index_in_buffer):
390 self._enqueue_next_item(queue_id, next_item)
391
392 @api_command("player_queues/set_playback_speed")
393 async def set_playback_speed(self, queue_id: str, speed: float) -> None:
394 """Set the playback speed for the given queue.
395
396 :param queue_id: queue_id of the queue to configure.
397 :param speed: playback speed multiplier (0.5 to 2.0). 1.0 = normal speed.
398 """
399 if not (0.5 <= speed <= 2.0):
400 raise InvalidDataError(f"Playback speed must be between 0.5 and 2.0, got {speed}")
401 queue = self._queues[queue_id]
402 current_speed = float(queue.extra_attributes.get("playback_speed") or 1.0)
403 if abs(current_speed - speed) < 0.001:
404 return # no change
405 queue.extra_attributes["playback_speed"] = speed
406 self.signal_update(queue_id)
407 if queue.state == PlaybackState.PLAYING:
408 await self.resume(queue_id)
409
410 @api_command("player_queues/play_media")
411 async def play_media(
412 self,
413 queue_id: str,
414 media: MediaItemType | ItemMapping | str | list[MediaItemType | ItemMapping | str],
415 option: QueueOption | None = None,
416 radio_mode: bool = False,
417 start_item: PlayableMediaItemType | str | None = None,
418 username: str | None = None,
419 ) -> None:
420 """Play media item(s) on the given queue.
421
422 :param queue_id: The queue_id of the queue to play media on.
423 :param media: Media that should be played (MediaItem(s) and/or uri's).
424 :param option: Which enqueue mode to use.
425 :param radio_mode: Enable radio mode for the given item(s).
426 :param start_item: Optional item to start the playlist or album from.
427 :param username: The username of the user requesting the playback.
428 Setting the username allows for overriding the logged-in user
429 to account for playback history per user when the play_media is
430 called from a shared context (like a web hook or automation).
431 """
432 # ruff: noqa: PLR0915
433 # we use a contextvar to bypass the throttler for this asyncio task/context
434 # this makes sure that playback has priority over other requests that may be
435 # happening in the background
436 BYPASS_THROTTLER.set(True)
437 if not (queue := self.get(queue_id)):
438 raise PlayerUnavailableError(f"Queue {queue_id} is not available")
439 # always fetch the underlying player so we can raise early if its not available
440 queue_player = self.mass.players.get_player(queue_id, True)
441 assert queue_player is not None # for type checking
442 if queue_player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
443 self.logger.warning("Ignore queue command: An announcement is in progress")
444 return
445
446 # save the user requesting the playback
447 playback_user: User | None
448 if username and (user := await self.mass.webserver.auth.get_user_by_username(username)):
449 playback_user = user
450 else:
451 playback_user = get_current_user()
452 queue.userid = playback_user.user_id if playback_user else None
453
454 # a single item or list of items may be provided
455 media_list = media if isinstance(media, list) else [media]
456
457 # clear queue if needed
458 if option == QueueOption.REPLACE:
459 self.clear(queue_id)
460 # Clear the 'enqueued media item' list when a new queue is requested
461 if option not in (QueueOption.ADD, QueueOption.NEXT):
462 queue.enqueued_media_items.clear()
463
464 media_items: list[MediaItemType] = []
465 radio_source: list[MediaItemType] = []
466 # resolve all media items
467 for item in media_list:
468 try:
469 # parse provided uri into a MA MediaItem or Basic QueueItem from URL
470 media_item: MediaItemType | ItemMapping | BrowseFolder
471 if isinstance(item, str):
472 media_item = await self.mass.music.get_item_by_uri(item)
473 elif isinstance(item, dict): # type: ignore[unreachable]
474 # TODO: Investigate why the API parser sometimes passes raw dicts instead of
475 # converting them to MediaItem objects. The parse_value function in api.py
476 # should handle dict-to-object conversion, but dicts are slipping through
477 # in some cases. This is defensive handling for that parser bug.
478 media_item = media_from_dict(item) # type: ignore[unreachable]
479 self.logger.debug("Converted to: %s", type(media_item))
480 else:
481 # item is MediaItemType | ItemMapping at this point
482 media_item = item
483
484 # Save requested media item to play on the queue so we can use it as a source
485 # for Don't stop the music. Use FIFO list to keep track of the last 10 played items
486 # Skip ItemMapping and BrowseFolder - only queue full MediaItemType objects
487 if not isinstance(
488 media_item, (ItemMapping, BrowseFolder)
489 ) and media_item.media_type in (
490 MediaType.TRACK,
491 MediaType.ALBUM,
492 MediaType.PLAYLIST,
493 MediaType.ARTIST,
494 ):
495 queue.enqueued_media_items.append(media_item)
496 if len(queue.enqueued_media_items) > 10:
497 queue.enqueued_media_items.pop(0)
498
499 # handle default enqueue option if needed
500 if option is None:
501 config_value = await self.mass.config.get_core_config_value(
502 self.domain,
503 f"default_enqueue_option_{media_item.media_type.value}",
504 return_type=str,
505 )
506 option = QueueOption(config_value)
507 if option == QueueOption.REPLACE:
508 self.clear(queue_id, skip_stop=True)
509
510 # collect media_items to play
511 if radio_mode:
512 # Type guard for mypy - only add full MediaItemType to radio_source
513 if not isinstance(media_item, (ItemMapping, BrowseFolder)):
514 radio_source.append(media_item)
515 else:
516 # Convert start_item to string URI if needed
517 start_item_uri: str | None = None
518 if isinstance(start_item, str):
519 start_item_uri = start_item
520 elif start_item is not None:
521 start_item_uri = start_item.uri
522 media_items += await self._resolve_media_items(
523 media_item, start_item_uri, queue_id=queue_id
524 )
525
526 except MusicAssistantError as err:
527 # invalid MA uri or item not found error
528 self.logger.warning("Skipping %s: %s", item, str(err))
529
530 # overwrite or append radio source items
531 if option not in (QueueOption.ADD, QueueOption.NEXT):
532 queue.radio_source = radio_source
533 else:
534 queue.radio_source += radio_source
535 # Use collected media items to calculate the radio if radio mode is on
536 if radio_mode:
537 radio_tracks = await self._get_radio_tracks(
538 queue_id=queue_id, is_initial_radio_mode=True
539 )
540 media_items = list(radio_tracks)
541
542 # only add valid/available items
543 queue_items: list[QueueItem] = []
544 for x in media_items:
545 if not x or not x.available:
546 continue
547 queue_items.append(
548 QueueItem.from_media_item(queue_id, cast("PlayableMediaItemType", x))
549 )
550
551 if not queue_items:
552 raise MediaNotFoundError("No playable items found")
553
554 # load the items into the queue
555 if queue.state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
556 cur_index = (
557 queue.index_in_buffer
558 if queue.index_in_buffer is not None
559 else (queue.current_index if queue.current_index is not None else 0)
560 )
561 else:
562 cur_index = queue.current_index or 0
563 insert_at_index = cur_index + 1
564 # Radio modes are already shuffled in a pattern we would like to keep.
565 shuffle = queue.shuffle_enabled and len(queue_items) > 1 and not radio_mode
566
567 # handle replace: clear all items and replace with the new items
568 if option == QueueOption.REPLACE:
569 await self.load(
570 queue_id,
571 queue_items=queue_items,
572 keep_remaining=False,
573 keep_played=False,
574 shuffle=shuffle,
575 )
576 await self.play_index(queue_id, 0)
577 return
578 # handle next: add item(s) in the index next to the playing/loaded/buffered index
579 if option == QueueOption.NEXT:
580 await self.load(
581 queue_id,
582 queue_items=queue_items,
583 insert_at_index=insert_at_index,
584 shuffle=shuffle,
585 )
586 return
587 if option == QueueOption.REPLACE_NEXT:
588 await self.load(
589 queue_id,
590 queue_items=queue_items,
591 insert_at_index=insert_at_index,
592 keep_remaining=False,
593 shuffle=shuffle,
594 )
595 return
596 # handle play: replace current loaded/playing index with new item(s)
597 if option == QueueOption.PLAY:
598 await self.load(
599 queue_id,
600 queue_items=queue_items,
601 insert_at_index=insert_at_index,
602 shuffle=shuffle,
603 )
604 next_index = min(insert_at_index, len(self._queue_items[queue_id]) - 1)
605 await self.play_index(queue_id, next_index)
606 return
607 # handle add: add/append item(s) to the remaining queue items
608 if option == QueueOption.ADD:
609 await self.load(
610 queue_id=queue_id,
611 queue_items=queue_items,
612 insert_at_index=insert_at_index
613 if queue.shuffle_enabled
614 else len(self._queue_items[queue_id]) + 1,
615 shuffle=queue.shuffle_enabled,
616 )
617 # handle edgecase, queue is empty and items are only added (not played)
618 # mark first item as new index
619 if queue.current_index is None:
620 queue.current_index = 0
621 queue.current_item = self.get_item(queue_id, 0)
622 queue.items = len(queue_items)
623 self.signal_update(queue_id)
624
625 @api_command("player_queues/move_item")
626 def move_item(self, queue_id: str, queue_item_id: str, pos_shift: int = 1) -> None:
627 """
628 Move queue item x up/down the queue.
629
630 - queue_id: id of the queue to process this request.
631 - queue_item_id: the item_id of the queueitem that needs to be moved.
632 - pos_shift: move item x positions down if positive value
633 - pos_shift: move item x positions up if negative value
634 - pos_shift: move item to top of queue as next item if 0.
635 """
636 queue = self._queues[queue_id]
637 item_index = self.index_by_id(queue_id, queue_item_id)
638 if item_index is None:
639 raise InvalidDataError(f"Item {queue_item_id} not found in queue")
640 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
641 msg = f"{item_index} is already played/buffered"
642 raise IndexError(msg)
643
644 queue_items = self._queue_items[queue_id]
645 queue_items = queue_items.copy()
646
647 if pos_shift == 0 and queue.state == PlaybackState.PLAYING:
648 new_index = (queue.current_index or 0) + 1
649 elif pos_shift == 0:
650 new_index = queue.current_index or 0
651 else:
652 new_index = item_index + pos_shift
653 if (new_index < (queue.current_index or 0)) or (new_index > len(queue_items)):
654 return
655 # move the item in the list
656 queue_items.insert(new_index, queue_items.pop(item_index))
657 self.update_items(queue_id, queue_items)
658
659 @api_command("player_queues/move_item_end")
660 def move_item_end(self, queue_id: str, queue_item_id: str) -> None:
661 """
662 Move queue item to the end the queue.
663
664 - queue_id: id of the queue to process this request.
665 - queue_item_id: the item_id of the queueitem that needs to be moved.
666 """
667 queue = self._queues[queue_id]
668 item_index = self.index_by_id(queue_id, queue_item_id)
669 if item_index is None:
670 raise InvalidDataError(f"Item {queue_item_id} not found in queue")
671 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
672 msg = f"{item_index} is already played/buffered"
673 raise IndexError(msg)
674
675 queue_items = self._queue_items[queue_id]
676 if item_index == (len(queue_items) - 1):
677 return
678 queue_items = queue_items.copy()
679
680 new_index = len(self._queue_items[queue_id]) - 1
681
682 # move the item in the list
683 queue_items.insert(new_index, queue_items.pop(item_index))
684 self.update_items(queue_id, queue_items)
685
686 @api_command("player_queues/delete_item")
687 def delete_item(self, queue_id: str, item_id_or_index: int | str) -> None:
688 """Delete item (by id or index) from the queue."""
689 if isinstance(item_id_or_index, str):
690 item_index = self.index_by_id(queue_id, item_id_or_index)
691 if item_index is None:
692 raise InvalidDataError(f"Item {item_id_or_index} not found in queue")
693 else:
694 item_index = item_id_or_index
695 queue = self._queues[queue_id]
696 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
697 # ignore request if track already loaded in the buffer
698 # the frontend should guard so this is just in case
699 self.logger.warning("delete requested for item already loaded in buffer")
700 return
701 queue_items = self._queue_items[queue_id]
702 queue_items.pop(item_index)
703 self.update_items(queue_id, queue_items)
704
705 @api_command("player_queues/clear")
706 def clear(self, queue_id: str, skip_stop: bool = False) -> None:
707 """Clear all items in the queue."""
708 queue = self._queues[queue_id]
709 queue.radio_source = []
710 if queue.state != PlaybackState.IDLE and not skip_stop:
711 self.mass.create_task(self.stop(queue_id))
712 queue.current_index = None
713 queue.current_item = None
714 queue.elapsed_time = 0
715 queue.elapsed_time_last_updated = time.time()
716 queue.index_in_buffer = None
717 self.update_items(queue_id, [])
718
719 @api_command("player_queues/save_as_playlist")
720 async def save_as_playlist(self, queue_id: str, name: str) -> Playlist:
721 """Save the current queue items as a new playlist.
722
723 :param queue_id: The queue_id of the queue to save.
724 :param name: The name for the new playlist.
725 """
726 if not self.get(queue_id):
727 raise PlayerUnavailableError(f"Queue {queue_id} is not available")
728 queue_items = self._queue_items.get(queue_id, [])
729 if not queue_items:
730 raise QueueEmpty("Cannot save an empty queue as a playlist.")
731 # collect URIs from queue items that are playlist-compatible
732 uris: list[str] = []
733 for item in queue_items:
734 if item.uri and item.media_type in PLAYLIST_MEDIA_TYPES:
735 uris.append(item.uri)
736 if not uris:
737 raise InvalidDataError("No valid items in queue to save as playlist.")
738 playlist = await self.mass.music.playlists.create_playlist(name)
739 await self.mass.music.playlists.add_playlist_tracks(playlist.item_id, uris)
740 return playlist
741
742 @api_command("player_queues/stop")
743 async def stop(self, queue_id: str) -> None:
744 """
745 Handle STOP command for given queue.
746
747 - queue_id: queue_id of the playerqueue to handle the command.
748 """
749 queue_player = self.mass.players.get_player(queue_id, True)
750 if queue_player is None:
751 raise PlayerUnavailableError(f"Player {queue_id} is not available")
752 if (queue := self.get(queue_id)) and queue.active:
753 if queue.state == PlaybackState.PLAYING:
754 queue.resume_pos = int(queue.corrected_elapsed_time)
755 # Set context to prevent circular call, then forward the actual command to the player
756 token = IN_QUEUE_COMMAND.set(True)
757 try:
758 await self.mass.players.cmd_stop(queue_id)
759 finally:
760 IN_QUEUE_COMMAND.reset(token)
761
762 @api_command("player_queues/play")
763 async def play(self, queue_id: str) -> None:
764 """
765 Handle PLAY command for given queue.
766
767 - queue_id: queue_id of the playerqueue to handle the command.
768 """
769 queue_player = self.mass.players.get_player(queue_id, True)
770 if queue_player is None:
771 raise PlayerUnavailableError(f"Player {queue_id} is not available")
772 if (
773 (queue := self._queues.get(queue_id))
774 and queue.active
775 and queue.state == PlaybackState.PAUSED
776 ):
777 # forward the actual play/unpause command to the player
778 await queue_player.play()
779 return
780 # player is not paused, perform resume instead
781 await self.resume(queue_id)
782
783 @api_command("player_queues/pause")
784 async def pause(self, queue_id: str) -> None:
785 """Handle PAUSE command for given queue.
786
787 - queue_id: queue_id of the playerqueue to handle the command.
788 """
789 if not (queue := self._queues.get(queue_id)):
790 return
791 queue_active = queue.active
792 if queue.active and queue.state == PlaybackState.PLAYING:
793 queue.resume_pos = int(queue.corrected_elapsed_time)
794 # forward the actual command to the player controller
795 # Set context to prevent circular call, then forward the actual command to the player
796 token = IN_QUEUE_COMMAND.set(True)
797 try:
798 await self.mass.players.cmd_pause(queue_id)
799 finally:
800 IN_QUEUE_COMMAND.reset(token)
801
802 async def _watch_pause(player: Player) -> None:
803 count = 0
804 # wait for pause
805 while count < 5 and player.state.playback_state == PlaybackState.PLAYING:
806 count += 1
807 await asyncio.sleep(1)
808 # wait for unpause
809 if player.state.playback_state != PlaybackState.PAUSED:
810 return
811 count = 0
812 while count < 30 and player.state.playback_state == PlaybackState.PAUSED:
813 count += 1
814 await asyncio.sleep(1)
815 # if player is still paused when the limit is reached, send stop
816 if player.state.playback_state == PlaybackState.PAUSED:
817 await self.stop(queue_id)
818
819 # we auto stop a player from paused when its paused for 30 seconds
820 if (
821 queue_active
822 and (queue_player := self.mass.players.get_player(queue_id))
823 and not queue_player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS)
824 ):
825 self.mass.create_task(_watch_pause(queue_player))
826
827 @api_command("player_queues/play_pause")
828 async def play_pause(self, queue_id: str) -> None:
829 """Toggle play/pause on given playerqueue.
830
831 - queue_id: queue_id of the queue to handle the command.
832 """
833 if (queue := self._queues.get(queue_id)) and queue.state == PlaybackState.PLAYING:
834 await self.pause(queue_id)
835 return
836 await self.play(queue_id)
837
838 @api_command("player_queues/next")
839 async def next(self, queue_id: str) -> None:
840 """Handle NEXT TRACK command for given queue.
841
842 - queue_id: queue_id of the queue to handle the command.
843 """
844 if (queue := self.get(queue_id)) is None or not queue.active:
845 raise InvalidCommand(f"Queue {queue_id} is not active")
846 idx = self._queues[queue_id].current_index
847 if idx is None:
848 self.logger.warning("Queue %s has no current index", queue.display_name)
849 return
850 attempts = 5
851 while attempts:
852 try:
853 if (next_index := self._get_next_index(queue_id, idx, True)) is not None:
854 await self.play_index(queue_id, next_index, debounce=True)
855 break
856 except MediaNotFoundError:
857 self.logger.warning(
858 "Failed to fetch next track for queue %s - trying next item",
859 queue.display_name,
860 )
861 idx += 1
862 attempts -= 1
863
864 @api_command("player_queues/previous")
865 async def previous(self, queue_id: str) -> None:
866 """Handle PREVIOUS TRACK command for given queue.
867
868 - queue_id: queue_id of the queue to handle the command.
869 """
870 if (queue := self.get(queue_id)) is None or not queue.active:
871 raise InvalidCommand(f"Queue {queue_id} is not active")
872 current_index = self._queues[queue_id].current_index
873 if current_index is None:
874 return
875 next_index = int(current_index)
876 # restart current track if current track has played longer than 4
877 # otherwise skip to previous track
878 if self._queues[queue_id].elapsed_time < 5:
879 next_index = max(current_index - 1, 0)
880 await self.play_index(queue_id, next_index, debounce=True)
881
882 @api_command("player_queues/skip")
883 async def skip(self, queue_id: str, seconds: int = 10) -> None:
884 """Handle SKIP command for given queue.
885
886 - queue_id: queue_id of the queue to handle the command.
887 - seconds: number of seconds to skip in track. Use negative value to skip back.
888 """
889 if (queue := self.get(queue_id)) is None or not queue.active:
890 raise InvalidCommand(f"Queue {queue_id} is not active")
891 await self.seek(queue_id, int(self._queues[queue_id].elapsed_time + seconds))
892
893 @api_command("player_queues/seek")
894 async def seek(self, queue_id: str, position: int = 10) -> None:
895 """Handle SEEK command for given queue.
896
897 - queue_id: queue_id of the queue to handle the command.
898 - position: position in seconds to seek to in the current playing item.
899 """
900 if (queue := self.get(queue_id)) is None or not queue.active:
901 raise InvalidCommand(f"Queue {queue_id} is not active")
902 queue_player = self.mass.players.get_player(queue_id, True)
903 if queue_player is None:
904 raise PlayerUnavailableError(f"Player {queue_id} is not available")
905 if not queue.current_item:
906 raise InvalidCommand(f"Queue {queue_player.state.name} has no item(s) loaded.")
907 if not queue.current_item.duration:
908 raise InvalidCommand("Can not seek items without duration.")
909 position = max(0, int(position))
910 if position > queue.current_item.duration:
911 raise InvalidCommand("Can not seek outside of duration range.")
912 if queue.current_index is None:
913 raise InvalidCommand(f"Queue {queue_player.state.name} has no current index.")
914 await self.play_index(queue_id, queue.current_index, seek_position=position)
915
916 @api_command("player_queues/resume")
917 async def resume(self, queue_id: str, fade_in: bool | None = None) -> None:
918 """Handle RESUME command for given queue.
919
920 - queue_id: queue_id of the queue to handle the command.
921 """
922 queue = self._queues[queue_id]
923 queue_items = self._queue_items[queue_id]
924 resume_item = queue.current_item
925 if queue.state == PlaybackState.PLAYING:
926 # resume requested while already playing,
927 # use current position as resume position
928 resume_pos = queue.corrected_elapsed_time
929 fade_in = False
930 else:
931 resume_pos = queue.resume_pos or queue.elapsed_time
932
933 if not resume_item and queue.current_index is not None and len(queue_items) > 0:
934 resume_item = self.get_item(queue_id, queue.current_index)
935 resume_pos = 0
936 elif not resume_item and queue.current_index is None and len(queue_items) > 0:
937 # items available in queue but no previous track, start at 0
938 resume_item = self.get_item(queue_id, 0)
939 resume_pos = 0
940
941 if resume_item is not None:
942 queue_player = self.mass.players.get_player(queue_id)
943 if queue_player is None:
944 raise PlayerUnavailableError(f"Player {queue_id} is not available")
945 if (
946 fade_in is None
947 and queue_player.state.playback_state == PlaybackState.IDLE
948 and (time.time() - queue.elapsed_time_last_updated) > 60
949 ):
950 # enable fade in effect if the player is idle for a while
951 fade_in = resume_pos > 0
952 if resume_item.media_type == MediaType.RADIO:
953 # we're not able to skip in online radio so this is pointless
954 resume_pos = 0
955 await self.play_index(
956 queue_id, resume_item.queue_item_id, int(resume_pos), fade_in or False
957 )
958 else:
959 # Queue is empty, try to resume from playlog
960 if await self._try_resume_from_playlog(queue):
961 return
962 msg = f"Resume queue requested but queue {queue.display_name} is empty"
963 raise QueueEmpty(msg)
964
965 @api_command("player_queues/play_index")
966 async def play_index(
967 self,
968 queue_id: str,
969 index: int | str,
970 seek_position: int = 0,
971 fade_in: bool = False,
972 debounce: bool = False,
973 ) -> None:
974 """Play item at index (or item_id) X in queue."""
975 queue = self._queues[queue_id]
976 queue.resume_pos = 0
977 if isinstance(index, str):
978 temp_index = self.index_by_id(queue_id, index)
979 if temp_index is None:
980 raise InvalidDataError(f"Item {index} not found in queue")
981 index = temp_index
982 # At this point index is guaranteed to be int
983 queue.current_index = index
984 # update current item and elapsed time and signal update
985 # this way the UI knows immediately that a new item is loading
986 queue.current_item = self.get_item(queue_id, index)
987 queue.elapsed_time = seek_position
988 queue.elapsed_time_last_updated = time.time()
989 self.signal_update(queue_id)
990 queue.index_in_buffer = index
991 queue.flow_mode_stream_log = []
992 target_player = self.mass.players.get_player(queue_id)
993 if target_player is None:
994 raise PlayerUnavailableError(f"Player {queue_id} is not available")
995 queue.next_item_id_enqueued = None
996 # always update session id when we start a new playback session
997 queue.session_id = shortuuid.random(length=8)
998 # handle resume point of audiobook(chapter) or podcast(episode)
999 if (
1000 not seek_position
1001 and (queue_item := self.get_item(queue_id, index))
1002 and (resume_position_ms := getattr(queue_item.media_item, "resume_position_ms", 0))
1003 ):
1004 seek_position = max(0, int((resume_position_ms - 500) / 1000))
1005
1006 # send play_media request to player
1007 # NOTE that we debounce this a bit to account for someone hitting the next button
1008 # like a madman. This will prevent the player from being overloaded with requests.
1009 async def _play_index(index: int, debounce: bool) -> None:
1010 for attempt in range(5):
1011 try:
1012 queue_item = self.get_item(queue_id, index)
1013 if not queue_item:
1014 continue # guard
1015 await self._load_item(
1016 queue_item,
1017 self._get_next_index(queue_id, index),
1018 is_start=True,
1019 seek_position=seek_position if attempt == 0 else 0,
1020 fade_in=fade_in if attempt == 0 else False,
1021 )
1022 # if we reach this point, loading the item succeeded, break the loop
1023 queue.current_index = index
1024 queue.current_item = queue_item
1025 break
1026 except (MediaNotFoundError, AudioError):
1027 # the requested index can not be played.
1028 if queue_item:
1029 self.logger.warning(
1030 "Skipping unplayable item %s (%s)",
1031 queue_item.name,
1032 queue_item.uri,
1033 )
1034 queue_item.available = False
1035 next_index = self._get_next_index(queue_id, index, allow_repeat=False)
1036 if next_index is None:
1037 raise MediaNotFoundError("No next item available")
1038 index = next_index
1039 else:
1040 # all attempts to find a playable item failed
1041 raise MediaNotFoundError("No playable item found to start playback")
1042
1043 # work out if we need to use flow mode
1044 flow_mode = target_player.flow_mode and queue_item.media_type not in (
1045 # don't use flow mode for duration-less streams
1046 MediaType.RADIO,
1047 MediaType.PLUGIN_SOURCE,
1048 )
1049 await asyncio.sleep(0.5 if debounce else 0.1)
1050 queue.flow_mode = flow_mode
1051 await self.mass.players.play_media(
1052 player_id=queue_id,
1053 media=await self.player_media_from_queue_item(queue_item, flow_mode),
1054 )
1055 queue.current_index = index
1056 queue.current_item = queue_item
1057 await asyncio.sleep(2)
1058 self._transitioning_players.discard(queue_id)
1059
1060 # we set a flag to notify the update logic that we're transitioning to a new track
1061 self._transitioning_players.add(queue_id)
1062
1063 # we debounce the play_index command to handle the case where someone
1064 # is spamming next/previous on the player
1065 task_id = f"play_index_{queue_id}"
1066 if existing_task := self.mass.get_task(task_id):
1067 existing_task.cancel()
1068 with suppress(asyncio.CancelledError):
1069 await existing_task
1070 task = self.mass.create_task(
1071 _play_index,
1072 index,
1073 debounce,
1074 task_id=task_id,
1075 )
1076 await task
1077 self.signal_update(queue_id)
1078
1079 @api_command("player_queues/transfer")
1080 async def transfer_queue(
1081 self,
1082 source_queue_id: str,
1083 target_queue_id: str,
1084 auto_play: bool | None = None,
1085 ) -> None:
1086 """Transfer queue to another queue."""
1087 if not (source_queue := self.get(source_queue_id)):
1088 raise PlayerUnavailableError(f"Queue {source_queue_id} is not available")
1089 if not (target_queue := self.get(target_queue_id)):
1090 raise PlayerUnavailableError(f"Queue {target_queue_id} is not available")
1091 if auto_play is None:
1092 auto_play = source_queue.state == PlaybackState.PLAYING
1093
1094 target_player = self.mass.players.get_player(target_queue_id)
1095 if target_player is None:
1096 raise PlayerUnavailableError(f"Player {target_queue_id} is not available")
1097 if target_player.state.active_group or target_player.state.synced_to:
1098 # edge case: the user wants to move playback from the group as a whole, to a single
1099 # player in the group or it is grouped and the command targeted at the single player.
1100 # We need to dissolve the group first.
1101 group_id = target_player.state.active_group or target_player.state.synced_to
1102 assert group_id is not None # checked in if condition above
1103 await self.mass.players.cmd_ungroup(group_id)
1104 await asyncio.sleep(3)
1105
1106 source_items = self._queue_items[source_queue_id]
1107 target_queue.repeat_mode = source_queue.repeat_mode
1108 target_queue.shuffle_enabled = source_queue.shuffle_enabled
1109 target_queue.dont_stop_the_music_enabled = source_queue.dont_stop_the_music_enabled
1110 target_queue.radio_source = source_queue.radio_source
1111 target_queue.enqueued_media_items = source_queue.enqueued_media_items
1112 target_queue.resume_pos = int(source_queue.elapsed_time)
1113 target_queue.current_index = source_queue.current_index
1114 if source_queue.current_item:
1115 target_queue.current_item = source_queue.current_item
1116 target_queue.current_item.queue_id = target_queue_id
1117 self.clear(source_queue_id)
1118
1119 await self.load(target_queue_id, source_items, keep_remaining=False, keep_played=False)
1120 for item in source_items:
1121 item.queue_id = target_queue_id
1122 self.update_items(target_queue_id, source_items)
1123 if auto_play:
1124 await self.resume(target_queue_id)
1125
1126 # Interaction with player
1127
1128 async def on_player_register(self, player: Player) -> None:
1129 """Register PlayerQueue for given player/queue id."""
1130 queue_id = player.player_id
1131 queue: PlayerQueue | None = None
1132 queue_items: list[QueueItem] = []
1133 # try to restore previous state
1134 if prev_state := await self.mass.cache.get(
1135 key=queue_id,
1136 provider=self.domain,
1137 category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
1138 ):
1139 try:
1140 queue = PlayerQueue.from_dict(prev_state)
1141 prev_items = await self.mass.cache.get(
1142 key=queue_id,
1143 provider=self.domain,
1144 category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
1145 default=[],
1146 )
1147 queue_items = []
1148 for idx, item_data in enumerate(prev_items):
1149 qi = QueueItem.from_cache(item_data)
1150 if not qi.media_item:
1151 # Skip items with missing media_item - this can happen if
1152 # MA was killed during shutdown while cache was being written
1153 self.logger.debug(
1154 "Skipping queue item %s (index %d) restored from cache "
1155 "without media_item",
1156 qi.name,
1157 idx,
1158 )
1159 continue
1160 queue_items.append(qi)
1161 if queue.enqueued_media_items:
1162 # we need to restore the MediaItem objects for the enqueued media items
1163 # Items from cache may be dicts that need deserialization
1164 restored_enqueued_items: list[MediaItemType] = []
1165 cached_items: list[dict[str, Any] | MediaItemType] = cast(
1166 "list[dict[str, Any] | MediaItemType]", queue.enqueued_media_items
1167 )
1168 for item in cached_items:
1169 if isinstance(item, dict):
1170 restored_item = media_from_dict(item)
1171 restored_enqueued_items.append(cast("MediaItemType", restored_item))
1172 else:
1173 restored_enqueued_items.append(item)
1174 queue.enqueued_media_items = restored_enqueued_items
1175 except Exception as err:
1176 self.logger.warning(
1177 "Failed to restore the queue(items) for %s - %s",
1178 player.state.name,
1179 str(err),
1180 )
1181 # Reset to clean state on failure
1182 queue = None
1183 queue_items = []
1184 if queue is None:
1185 queue = PlayerQueue(
1186 queue_id=queue_id,
1187 active=False,
1188 display_name=player.state.name,
1189 available=player.state.available,
1190 dont_stop_the_music_enabled=False,
1191 items=0,
1192 )
1193
1194 self._queues[queue_id] = queue
1195 self._queue_items[queue_id] = queue_items
1196 # always call update to calculate state etc
1197 self.on_player_update(player, {})
1198 self.mass.signal_event(EventType.QUEUE_ADDED, object_id=queue_id, data=queue)
1199
1200 def on_player_update(
1201 self,
1202 player: Player,
1203 changed_values: dict[str, tuple[Any, Any]],
1204 ) -> None:
1205 """
1206 Call when a PlayerQueue needs to be updated (e.g. when player updates).
1207
1208 NOTE: This is called every second if the player is playing.
1209 """
1210 queue_id = player.player_id
1211 if (queue := self._queues.get(queue_id)) is None:
1212 # race condition
1213 return
1214 if player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
1215 # do nothing while the announcement is in progress
1216 return
1217 # determine if this queue is currently active for this player
1218 queue.active = player.state.active_source in (queue.queue_id, None)
1219 if not queue.active and queue_id not in self._prev_states:
1220 queue.state = PlaybackState.IDLE
1221 # return early if the queue is not active and we have no previous state
1222 return
1223 if queue.queue_id in self._transitioning_players:
1224 # we're currently transitioning to a new track,
1225 # ignore updates from the player during this time
1226 return
1227
1228 # queue is active and preflight checks passed, update the queue details
1229 self._update_queue_from_player(player)
1230
1231 def on_player_remove(self, player_id: str, permanent: bool) -> None:
1232 """Call when a player is removed from the registry."""
1233 if permanent:
1234 # if the player is permanently removed, we also remove the cached queue data
1235 self.mass.create_task(
1236 self.mass.cache.delete(
1237 key=player_id,
1238 provider=self.domain,
1239 category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
1240 )
1241 )
1242 self.mass.create_task(
1243 self.mass.cache.delete(
1244 key=player_id,
1245 provider=self.domain,
1246 category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
1247 )
1248 )
1249 self._queues.pop(player_id, None)
1250 self._queue_items.pop(player_id, None)
1251
1252 async def load_next_queue_item(
1253 self,
1254 queue_id: str,
1255 current_item_id: str,
1256 ) -> QueueItem:
1257 """
1258 Call when a player wants the next queue item to play.
1259
1260 Raises QueueEmpty if there are no more tracks left.
1261 """
1262 queue = self.get(queue_id)
1263 if not queue:
1264 msg = f"PlayerQueue {queue_id} is not available"
1265 raise PlayerUnavailableError(msg)
1266 cur_index = self.index_by_id(queue_id, current_item_id)
1267 if cur_index is None:
1268 # this is just a guard for bad data
1269 raise QueueEmpty("Invalid item id for queue given.")
1270 next_item: QueueItem | None = None
1271 idx = 0
1272 while True:
1273 next_index = self._get_next_index(queue_id, cur_index + idx)
1274 if next_index is None:
1275 raise QueueEmpty("No more tracks left in the queue.")
1276 queue_item = self.get_item(queue_id, next_index)
1277 if queue_item is None:
1278 raise QueueEmpty("No more tracks left in the queue.")
1279 if idx >= 10:
1280 # we only allow 10 retries to prevent infinite loops
1281 raise QueueEmpty("No more (playable) tracks left in the queue.")
1282 try:
1283 await self._load_item(queue_item, next_index)
1284 # we're all set, this is our next item
1285 next_item = queue_item
1286 break
1287 except (MediaNotFoundError, AudioError):
1288 # No stream details found, skip this QueueItem
1289 self.logger.warning(
1290 "Skipping unplayable item %s (%s)", queue_item.name, queue_item.uri
1291 )
1292 queue_item.available = False
1293 idx += 1
1294 if idx != 0:
1295 # we skipped some items, signal a queue items update
1296 self.update_items(queue_id, self._queue_items[queue_id])
1297 if next_item is None:
1298 raise QueueEmpty("No more (playable) tracks left in the queue.")
1299
1300 return next_item
1301
1302 async def _load_item(
1303 self,
1304 queue_item: QueueItem,
1305 next_index: int | None,
1306 is_start: bool = False,
1307 seek_position: int = 0,
1308 fade_in: bool = False,
1309 ) -> None:
1310 """Try to load the stream details for the given queue item."""
1311 queue_id = queue_item.queue_id
1312 queue = self._queues[queue_id]
1313
1314 # we use a contextvar to bypass the throttler for this asyncio task/context
1315 # this makes sure that playback has priority over other requests that may be
1316 # happening in the background
1317 BYPASS_THROTTLER.set(True)
1318
1319 self.logger.debug(
1320 "(pre)loading (next) item for queue %s...",
1321 queue.display_name,
1322 )
1323
1324 if not queue_item.available:
1325 raise MediaNotFoundError(f"Item {queue_item.uri} is not available")
1326
1327 # work out if we are playing an album and if we should prefer album
1328 # loudness
1329 next_track_from_same_album = (
1330 next_index is not None
1331 and (next_item := self.get_item(queue_id, next_index))
1332 and (
1333 queue_item.media_item
1334 and hasattr(queue_item.media_item, "album")
1335 and queue_item.media_item.album
1336 and next_item.media_item
1337 and hasattr(next_item.media_item, "album")
1338 and next_item.media_item.album
1339 and queue_item.media_item.album.item_id == next_item.media_item.album.item_id
1340 )
1341 )
1342 current_index = self.index_by_id(queue_id, queue_item.queue_item_id)
1343 if current_index is None:
1344 previous_track_from_same_album = False
1345 else:
1346 previous_index = max(current_index - 1, 0)
1347 previous_track_from_same_album = (
1348 previous_index > 0
1349 and (previous_item := self.get_item(queue_id, previous_index)) is not None
1350 and previous_item.media_item is not None
1351 and hasattr(previous_item.media_item, "album")
1352 and previous_item.media_item.album is not None
1353 and queue_item.media_item is not None
1354 and hasattr(queue_item.media_item, "album")
1355 and queue_item.media_item.album is not None
1356 and queue_item.media_item.album.item_id == previous_item.media_item.album.item_id
1357 )
1358 playing_album_tracks = next_track_from_same_album or previous_track_from_same_album
1359 if queue_item.media_item and isinstance(queue_item.media_item, Track):
1360 album = queue_item.media_item.album
1361 # prefer the full library media item so we have all metadata and provider(quality) info
1362 # always request the full library item as there might be other qualities available
1363 if library_item := await self.mass.music.get_library_item_by_prov_id(
1364 queue_item.media_item.media_type,
1365 queue_item.media_item.item_id,
1366 queue_item.media_item.provider,
1367 ):
1368 queue_item.media_item = cast("Track", library_item)
1369 elif not queue_item.media_item.image or queue_item.media_item.provider.startswith(
1370 "ytmusic"
1371 ):
1372 # Youtube Music has poor thumbs by default, so we always fetch the full item
1373 # this also catches the case where they have an unavailable item in a listing
1374 fetched_item = await self.mass.music.get_item_by_uri(queue_item.uri)
1375 queue_item.media_item = cast("Track", fetched_item)
1376
1377 # ensure we got the full (original) album set
1378 if album and (
1379 library_album := await self.mass.music.get_library_item_by_prov_id(
1380 album.media_type,
1381 album.item_id,
1382 album.provider,
1383 )
1384 ):
1385 queue_item.media_item.album = cast("Album", library_album)
1386 elif album:
1387 # Restore original album if we have no better alternative from the library
1388 queue_item.media_item.album = album
1389 # prefer album image over track image
1390 if queue_item.media_item.album and queue_item.media_item.album.image:
1391 org_images: list[MediaItemImage] = queue_item.media_item.metadata.images or []
1392 queue_item.media_item.metadata.images = UniqueList(
1393 [
1394 queue_item.media_item.album.image,
1395 *org_images,
1396 ]
1397 )
1398 # Fetch the streamdetails, which could raise in case of an unplayable item.
1399 # For example, YT Music returns Radio Items that are not playable.
1400 queue_item.streamdetails = await get_stream_details(
1401 mass=self.mass,
1402 queue_item=queue_item,
1403 seek_position=seek_position,
1404 fade_in=fade_in,
1405 prefer_album_loudness=bool(playing_album_tracks),
1406 )
1407
1408 def track_loaded_in_buffer(self, queue_id: str, item_id: str) -> None:
1409 """Call when a player has (started) loading a track in the buffer."""
1410 queue = self.get(queue_id)
1411 if not queue:
1412 msg = f"PlayerQueue {queue_id} is not available"
1413 raise PlayerUnavailableError(msg)
1414 # store the index of the item that is currently (being) loaded in the buffer
1415 # which helps us a bit to determine how far the player has buffered ahead
1416 queue.index_in_buffer = self.index_by_id(queue_id, item_id)
1417 self.logger.debug("PlayerQueue %s loaded item %s in buffer", queue.display_name, item_id)
1418 self.signal_update(queue_id)
1419 # preload next streamdetails
1420 self._preload_next_item(queue_id, item_id)
1421
1422 # Main queue manipulation methods
1423
1424 async def load(
1425 self,
1426 queue_id: str,
1427 queue_items: list[QueueItem],
1428 insert_at_index: int = 0,
1429 keep_remaining: bool = True,
1430 keep_played: bool = True,
1431 shuffle: bool = False,
1432 ) -> None:
1433 """Load new items at index.
1434
1435 - queue_id: id of the queue to process this request.
1436 - queue_items: a list of QueueItems
1437 - insert_at_index: insert the item(s) at this index
1438 - keep_remaining: keep the remaining items after the insert
1439 - shuffle: (re)shuffle the items after insert index
1440 """
1441 prev_items = self._queue_items[queue_id][:insert_at_index] if keep_played else []
1442 next_items = queue_items
1443
1444 # if keep_remaining, append the old 'next' items
1445 if keep_remaining:
1446 next_items += self._queue_items[queue_id][insert_at_index:]
1447
1448 # we set the original insert order as attribute so we can un-shuffle
1449 for index, item in enumerate(next_items):
1450 item.sort_index += insert_at_index + index
1451 # (re)shuffle the final batch if needed
1452 if shuffle:
1453 next_items = await _smart_shuffle(next_items)
1454 self.update_items(queue_id, prev_items + next_items)
1455
1456 def update_items(self, queue_id: str, queue_items: list[QueueItem]) -> None:
1457 """Update the existing queue items, mostly caused by reordering."""
1458 self._queue_items[queue_id] = queue_items
1459 queue = self._queues[queue_id]
1460 queue.items = len(self._queue_items[queue_id])
1461 # to track if the queue items changed we set a timestamp
1462 # this is a simple way to detect changes in the list of items
1463 # without having to compare the entire list
1464 queue.items_last_updated = time.time()
1465 self.signal_update(queue_id, True)
1466 if (
1467 queue.state == PlaybackState.PLAYING
1468 and queue.index_in_buffer is not None
1469 and queue.index_in_buffer == queue.current_index
1470 ):
1471 # if the queue is playing,
1472 # ensure to (re)queue the next track because it might have changed
1473 # note that we only do this if the player has loaded the current track
1474 # if not, we wait until it has loaded to prevent conflicts
1475 if next_item := self.get_next_item(queue_id, queue.index_in_buffer):
1476 self._enqueue_next_item(queue_id, next_item)
1477
1478 # Helper methods
1479
1480 def get_item(self, queue_id: str, item_id_or_index: int | str | None) -> QueueItem | None:
1481 """Get queue item by index or item_id."""
1482 if item_id_or_index is None:
1483 return None
1484 if (queue_items := self._queue_items.get(queue_id)) is None:
1485 return None
1486 if isinstance(item_id_or_index, int) and len(queue_items) > item_id_or_index:
1487 return queue_items[item_id_or_index]
1488 if isinstance(item_id_or_index, str):
1489 return next((x for x in queue_items if x.queue_item_id == item_id_or_index), None)
1490 return None
1491
1492 def signal_update(self, queue_id: str, items_changed: bool = False) -> None:
1493 """Signal state changed of given queue."""
1494 queue = self._queues[queue_id]
1495 if items_changed:
1496 self.mass.signal_event(EventType.QUEUE_ITEMS_UPDATED, object_id=queue_id, data=queue)
1497 # save items in cache - only cache items with valid media_item
1498 cache_data = [
1499 x.to_cache() for x in self._queue_items[queue_id] if x.media_item is not None
1500 ]
1501 self.mass.create_task(
1502 self.mass.cache.set(
1503 key=queue_id,
1504 data=cache_data,
1505 provider=self.domain,
1506 category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
1507 )
1508 )
1509 # always send the base event
1510 self.mass.signal_event(EventType.QUEUE_UPDATED, object_id=queue_id, data=queue)
1511 # also signal update to the player itself so it can update its current_media
1512 self.mass.players.trigger_player_update(queue_id)
1513 # save state
1514 self.mass.create_task(
1515 self.mass.cache.set(
1516 key=queue_id,
1517 data=queue.to_cache(),
1518 provider=self.domain,
1519 category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
1520 )
1521 )
1522
1523 def index_by_id(self, queue_id: str, queue_item_id: str) -> int | None:
1524 """Get index by queue_item_id."""
1525 queue_items = self._queue_items[queue_id]
1526 for index, item in enumerate(queue_items):
1527 if item.queue_item_id == queue_item_id:
1528 return index
1529 return None
1530
1531 async def player_media_from_queue_item(
1532 self, queue_item: QueueItem, flow_mode: bool
1533 ) -> PlayerMedia:
1534 """Parse PlayerMedia from QueueItem."""
1535 queue = self._queues[queue_item.queue_id]
1536 if flow_mode:
1537 duration = None
1538 elif queue_item.streamdetails:
1539 # prefer netto duration
1540 # when seeking, the player only receives the remaining duration
1541 duration = queue_item.streamdetails.duration or queue_item.duration
1542 if duration and queue_item.streamdetails.seek_position:
1543 duration = duration - queue_item.streamdetails.seek_position
1544 else:
1545 duration = queue_item.duration
1546 if queue.session_id is None:
1547 # handle error or return early
1548 raise InvalidDataError("Queue session_id is None")
1549 media = PlayerMedia(
1550 uri=queue_item.uri,
1551 media_type=MediaType.FLOW_STREAM if flow_mode else queue_item.media_type,
1552 title="Music Assistant" if flow_mode else queue_item.name,
1553 image_url=MASS_LOGO_ONLINE,
1554 duration=duration,
1555 source_id=queue_item.queue_id,
1556 queue_item_id=queue_item.queue_item_id,
1557 custom_data={
1558 "session_id": queue.session_id,
1559 "original_uri": queue_item.uri,
1560 "flow_mode": flow_mode,
1561 },
1562 )
1563 if not flow_mode and queue_item.media_item:
1564 media.title = queue_item.media_item.name
1565 media.artist = getattr(queue_item.media_item, "artist_str", "")
1566 media.album = (
1567 album.name if (album := getattr(queue_item.media_item, "album", None)) else ""
1568 )
1569 if queue_item.image:
1570 # the image format needs to be 500x500 jpeg for maximum compatibility with players
1571 # we prefer the imageproxy on the streamserver here because this request is sent
1572 # to the player itself which may not be able to reach the regular webserver
1573 media.image_url = self.mass.metadata.get_image_url(
1574 queue_item.image, size=500, prefer_stream_server=True
1575 )
1576 return media
1577
1578 async def get_artist_tracks(self, artist: Artist) -> list[Track]:
1579 """Return tracks for given artist, based on user preference."""
1580 artist_items_conf = self.mass.config.get_raw_core_config_value(
1581 self.domain,
1582 CONF_DEFAULT_ENQUEUE_SELECT_ARTIST,
1583 ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE,
1584 )
1585 self.logger.info(
1586 "Fetching tracks to play for artist %s",
1587 artist.name,
1588 )
1589 if artist_items_conf in ("library_tracks", "all_tracks"):
1590 all_items = await self.mass.music.artists.tracks(
1591 artist.item_id,
1592 artist.provider,
1593 in_library_only=artist_items_conf == "library_tracks",
1594 )
1595 random.shuffle(all_items)
1596 return all_items
1597 if artist_items_conf in ("library_album_tracks", "all_album_tracks"):
1598 all_tracks: list[Track] = []
1599 for library_album in await self.mass.music.artists.albums(
1600 artist.item_id,
1601 artist.provider,
1602 in_library_only=artist_items_conf == "library_album_tracks",
1603 ):
1604 for album_track in await self.mass.music.albums.tracks(
1605 library_album.item_id, library_album.provider
1606 ):
1607 if album_track not in all_tracks:
1608 all_tracks.append(album_track)
1609 random.shuffle(all_tracks)
1610 return all_tracks
1611 return []
1612
1613 async def get_album_tracks(self, album: Album, start_item: str | None) -> list[Track]:
1614 """Return tracks for given album, based on user preference."""
1615 album_items_conf = self.mass.config.get_raw_core_config_value(
1616 self.domain,
1617 CONF_DEFAULT_ENQUEUE_SELECT_ALBUM,
1618 ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE,
1619 )
1620 result: list[Track] = []
1621 start_item_found = False
1622 self.logger.info(
1623 "Fetching tracks to play for album %s",
1624 album.name,
1625 )
1626 for album_track in await self.mass.music.albums.tracks(
1627 item_id=album.item_id,
1628 provider_instance_id_or_domain=album.provider,
1629 in_library_only=album_items_conf == "library_tracks",
1630 ):
1631 if not album_track.available:
1632 continue
1633 if start_item in (album_track.item_id, album_track.uri):
1634 start_item_found = True
1635 if start_item is not None and not start_item_found:
1636 continue
1637 result.append(album_track)
1638 return result
1639
1640 async def get_genre_tracks(self, genre: Genre, start_item: str | None) -> list[Track]:
1641 """Return tracks for given genre, based on alias mappings.
1642
1643 Limits results to avoid loading thousands of tracks for broad genres.
1644 Directly mapped tracks are fetched with random ordering, then supplemented
1645 with tracks from a limited set of mapped albums and artists.
1646 """
1647 result: list[Track] = []
1648 start_item_found = False
1649 self.logger.info(
1650 "Fetching tracks to play for genre %s",
1651 genre.name,
1652 )
1653 tracks, albums, artists = await self.mass.music.genres.mapped_media(
1654 genre,
1655 track_limit=25,
1656 album_limit=5,
1657 artist_limit=5,
1658 order_by="random",
1659 )
1660
1661 for genre_track in tracks:
1662 if not genre_track.available:
1663 continue
1664 if start_item in (genre_track.item_id, genre_track.uri):
1665 start_item_found = True
1666 if start_item is not None and not start_item_found:
1667 continue
1668 result.append(genre_track)
1669
1670 for album in albums:
1671 album_tracks = await self.get_album_tracks(album, None)
1672 result.extend(album_tracks[:5])
1673
1674 for artist in artists:
1675 artist_tracks = await self.get_artist_tracks(artist)
1676 result.extend(artist_tracks[:5])
1677 return result
1678
1679 async def get_playlist_tracks(
1680 self, playlist: Playlist, start_item: str | None
1681 ) -> list[PlaylistPlayableItem]:
1682 """Return tracks for given playlist, based on user preference."""
1683 result: list[PlaylistPlayableItem] = []
1684 start_item_found = False
1685 self.logger.info(
1686 "Fetching tracks to play for playlist %s",
1687 playlist.name,
1688 )
1689 # TODO: Handle other sort options etc.
1690 async for playlist_track in self.mass.music.playlists.tracks(
1691 playlist.item_id, playlist.provider
1692 ):
1693 if not playlist_track.available:
1694 continue
1695 if start_item in (playlist_track.item_id, playlist_track.uri):
1696 start_item_found = True
1697 if start_item is not None and not start_item_found:
1698 continue
1699 result.append(playlist_track)
1700 return result
1701
1702 async def get_audiobook_resume_point(
1703 self, audio_book: Audiobook, chapter: str | int | None = None, userid: str | None = None
1704 ) -> int:
1705 """Return resume point (in milliseconds) for given audio book."""
1706 self.logger.debug(
1707 "Fetching resume point to play for audio book %s",
1708 audio_book.name,
1709 )
1710 if chapter is not None:
1711 # user explicitly selected a chapter to play
1712 start_chapter = int(chapter) if isinstance(chapter, str) else chapter
1713 if chapters := audio_book.metadata.chapters:
1714 if _chapter := next((x for x in chapters if x.position == start_chapter), None):
1715 return int(_chapter.start * 1000)
1716 raise InvalidDataError(
1717 f"Unable to resolve chapter to play for Audiobook {audio_book.name}"
1718 )
1719 full_played, resume_position_ms = await self.mass.music.get_resume_position(
1720 audio_book, userid=userid
1721 )
1722 return 0 if full_played else resume_position_ms
1723
1724 async def get_next_podcast_episodes(
1725 self,
1726 podcast: Podcast | None,
1727 episode: PodcastEpisode | str | None,
1728 userid: str | None = None,
1729 ) -> UniqueList[PodcastEpisode]:
1730 """Return (next) episode(s) and resume point for given podcast."""
1731 if podcast is None and isinstance(episode, str | NoneType):
1732 raise InvalidDataError("Either podcast or episode must be provided")
1733 if podcast is None:
1734 # single podcast episode requested
1735 assert isinstance(episode, PodcastEpisode) # checked above
1736 self.logger.debug(
1737 "Fetching resume point to play for Podcast episode %s",
1738 episode.name,
1739 )
1740 (
1741 fully_played,
1742 resume_position_ms,
1743 ) = await self.mass.music.get_resume_position(episode, userid=userid)
1744 episode.fully_played = fully_played
1745 episode.resume_position_ms = 0 if fully_played else resume_position_ms
1746 return UniqueList([episode])
1747 # podcast with optional start episode requested
1748 self.logger.debug(
1749 "Fetching episode(s) and resume point to play for Podcast %s",
1750 podcast.name,
1751 )
1752 all_episodes = [
1753 x async for x in self.mass.music.podcasts.episodes(podcast.item_id, podcast.provider)
1754 ]
1755 all_episodes.sort(key=lambda x: x.position)
1756 # if a episode was provided, a user explicitly selected a episode to play
1757 # so we need to find the index of the episode in the list
1758 resolved_episode: PodcastEpisode | None = None
1759 if isinstance(episode, PodcastEpisode):
1760 resolved_episode = next((x for x in all_episodes if x.uri == episode.uri), None)
1761 if resolved_episode:
1762 # ensure we have accurate resume info
1763 (
1764 fully_played,
1765 resume_position_ms,
1766 ) = await self.mass.music.get_resume_position(resolved_episode, userid=userid)
1767 resolved_episode.resume_position_ms = 0 if fully_played else resume_position_ms
1768 elif isinstance(episode, str):
1769 resolved_episode = next(
1770 (x for x in all_episodes if episode in (x.uri, x.item_id)), None
1771 )
1772 if resolved_episode:
1773 # ensure we have accurate resume info
1774 (
1775 fully_played,
1776 resume_position_ms,
1777 ) = await self.mass.music.get_resume_position(resolved_episode, userid=userid)
1778 resolved_episode.resume_position_ms = 0 if fully_played else resume_position_ms
1779 else:
1780 # get first episode that is not fully played
1781 for ep in all_episodes:
1782 if ep.fully_played:
1783 continue
1784 # ensure we have accurate resume info
1785 (
1786 fully_played,
1787 resume_position_ms,
1788 ) = await self.mass.music.get_resume_position(ep, userid=userid)
1789 if fully_played:
1790 continue
1791 ep.resume_position_ms = resume_position_ms
1792 resolved_episode = ep
1793 break
1794 else:
1795 # no episodes found that are not fully played, so we start at the beginning
1796 resolved_episode = next((x for x in all_episodes), None)
1797 if resolved_episode is None:
1798 raise InvalidDataError(f"Unable to resolve episode to play for Podcast {podcast.name}")
1799 # get the index of the episode
1800 episode_index = all_episodes.index(resolved_episode)
1801 # return the (remaining) episode(s) to play
1802 return UniqueList(all_episodes[episode_index:])
1803
1804 def _get_next_index(
1805 self,
1806 queue_id: str,
1807 cur_index: int | None,
1808 is_skip: bool = False,
1809 allow_repeat: bool = True,
1810 ) -> int | None:
1811 """
1812 Return the next index for the queue, accounting for repeat settings.
1813
1814 Will return None if there are no (more) items in the queue.
1815 """
1816 queue = self._queues[queue_id]
1817 queue_items = self._queue_items[queue_id]
1818 if not queue_items or cur_index is None:
1819 # queue is empty
1820 return None
1821 # handle repeat single track
1822 if queue.repeat_mode == RepeatMode.ONE and not is_skip:
1823 return cur_index if allow_repeat else None
1824 # handle cur_index is last index of the queue
1825 if cur_index >= (len(queue_items) - 1):
1826 if allow_repeat and queue.repeat_mode == RepeatMode.ALL:
1827 # if repeat all is enabled, we simply start again from the beginning
1828 return 0
1829 return None
1830 # all other: just the next index
1831 return cur_index + 1
1832
1833 def get_next_item(self, queue_id: str, cur_index: int | str) -> QueueItem | None:
1834 """Return next QueueItem for given queue."""
1835 index: int
1836 if isinstance(cur_index, str):
1837 resolved_index = self.index_by_id(queue_id, cur_index)
1838 if resolved_index is None:
1839 return None # guard
1840 index = resolved_index
1841 else:
1842 index = cur_index
1843 # At this point index is guaranteed to be int
1844 for skip in range(5):
1845 if (next_index := self._get_next_index(queue_id, index + skip)) is None:
1846 break
1847 next_item = self.get_item(queue_id, next_index)
1848 if next_item is None:
1849 continue
1850 if not next_item.available:
1851 # ensure that we skip unavailable items (set by load_next track logic)
1852 continue
1853 return next_item
1854 return None
1855
1856 async def _fill_radio_tracks(self, queue_id: str) -> None:
1857 """Fill a Queue with (additional) Radio tracks."""
1858 self.logger.debug(
1859 "Filling radio tracks for queue %s",
1860 queue_id,
1861 )
1862 tracks = await self._get_radio_tracks(queue_id=queue_id, is_initial_radio_mode=False)
1863 # fill queue - filter out unavailable items
1864 queue_items = [QueueItem.from_media_item(queue_id, x) for x in tracks if x.available]
1865 await self.load(
1866 queue_id,
1867 queue_items,
1868 insert_at_index=len(self._queue_items[queue_id]) + 1,
1869 )
1870
1871 def _enqueue_next_item(self, queue_id: str, next_item: QueueItem | None) -> None:
1872 """Enqueue the next item on the player."""
1873 if not next_item:
1874 # no next item, nothing to do...
1875 return
1876
1877 queue = self._queues[queue_id]
1878 if queue.flow_mode:
1879 # ignore this for flow mode
1880 return
1881
1882 async def _enqueue_next_item_on_player(next_item: QueueItem) -> None:
1883 await self.mass.players.enqueue_next_media(
1884 player_id=queue_id,
1885 media=await self.player_media_from_queue_item(next_item, False),
1886 )
1887 if queue.next_item_id_enqueued != next_item.queue_item_id:
1888 queue.next_item_id_enqueued = next_item.queue_item_id
1889 self.logger.debug(
1890 "Enqueued next track %s on queue %s",
1891 next_item.name,
1892 self._queues[queue_id].display_name,
1893 )
1894
1895 task_id = f"enqueue_next_item_{queue_id}"
1896 self.mass.call_later(0.5, _enqueue_next_item_on_player, next_item, task_id=task_id)
1897
1898 def _preload_next_item(self, queue_id: str, item_id_in_buffer: str) -> None:
1899 """
1900 Preload the streamdetails for the next item in the queue/buffer.
1901
1902 This basically ensures the item is playable and fetches the stream details.
1903 If an error occurs, the item will be skipped and the next item will be loaded.
1904 """
1905 queue = self._queues[queue_id]
1906
1907 async def _preload_streamdetails(item_id_in_buffer: str) -> None:
1908 try:
1909 # wait for the item that was loaded in the buffer is the actually playing item
1910 # this prevents a race condition when we preload the next item too soon
1911 # while the player is actually preloading the previously enqueued item.
1912 retries = 120
1913 while retries > 0:
1914 if not queue.current_item:
1915 return # guard
1916 if queue.current_item.queue_item_id == item_id_in_buffer:
1917 break
1918 retries -= 1
1919 await asyncio.sleep(1)
1920 if next_item := await self.load_next_queue_item(queue_id, item_id_in_buffer):
1921 self.logger.debug(
1922 "Preloaded next item %s for queue %s",
1923 next_item.name,
1924 queue.display_name,
1925 )
1926 # enqueue the next item on the player
1927 self._enqueue_next_item(queue_id, next_item)
1928
1929 except QueueEmpty:
1930 return
1931
1932 if not (current_item := self.get_item(queue_id, item_id_in_buffer)):
1933 # this should not happen, but guard anyways
1934 return
1935 if current_item.media_type == MediaType.RADIO or not current_item.duration:
1936 # radio items or no duration, nothing to do
1937 return
1938
1939 task_id = f"preload_next_item_{queue_id}"
1940 self.mass.create_task(
1941 _preload_streamdetails,
1942 item_id_in_buffer,
1943 task_id=task_id,
1944 abort_existing=True,
1945 )
1946
1947 async def _resolve_media_items(
1948 self,
1949 media_item: MediaItemType | ItemMapping | BrowseFolder,
1950 start_item: str | None = None,
1951 userid: str | None = None,
1952 queue_id: str | None = None,
1953 ) -> list[MediaItemType]:
1954 """Resolve/unwrap media items to enqueue."""
1955 # resolve Itemmapping to full media item
1956 if isinstance(media_item, ItemMapping):
1957 if media_item.uri is None:
1958 raise InvalidDataError("ItemMapping has no URI")
1959 media_item = await self.mass.music.get_item_by_uri(media_item.uri)
1960 if media_item.media_type == MediaType.PLAYLIST:
1961 media_item = cast("Playlist", media_item)
1962 self.mass.create_task(
1963 self.mass.music.mark_item_played(
1964 media_item, userid=userid, queue_id=queue_id, user_initiated=True
1965 )
1966 )
1967 return list(await self.get_playlist_tracks(media_item, start_item))
1968 if media_item.media_type == MediaType.ARTIST:
1969 media_item = cast("Artist", media_item)
1970 self.mass.create_task(
1971 self.mass.music.mark_item_played(media_item, queue_id=queue_id, user_initiated=True)
1972 )
1973 return list(await self.get_artist_tracks(media_item))
1974 if media_item.media_type == MediaType.ALBUM:
1975 media_item = cast("Album", media_item)
1976 self.mass.create_task(
1977 self.mass.music.mark_item_played(
1978 media_item, userid=userid, queue_id=queue_id, user_initiated=True
1979 )
1980 )
1981 return list(await self.get_album_tracks(media_item, start_item))
1982 if media_item.media_type == MediaType.GENRE:
1983 media_item = cast("Genre", media_item)
1984 self.mass.create_task(
1985 self.mass.music.mark_item_played(
1986 media_item, userid=userid, queue_id=queue_id, user_initiated=True
1987 )
1988 )
1989 return list(await self.get_genre_tracks(media_item, start_item))
1990 if media_item.media_type == MediaType.AUDIOBOOK:
1991 media_item = cast("Audiobook", media_item)
1992 # ensure we grab the correct/latest resume point info
1993 media_item.resume_position_ms = await self.get_audiobook_resume_point(
1994 media_item, start_item, userid=userid
1995 )
1996 return [media_item]
1997 if media_item.media_type == MediaType.PODCAST:
1998 media_item = cast("Podcast", media_item)
1999 self.mass.create_task(
2000 self.mass.music.mark_item_played(
2001 media_item, userid=userid, queue_id=queue_id, user_initiated=True
2002 )
2003 )
2004 return list(await self.get_next_podcast_episodes(media_item, start_item, userid=userid))
2005 if media_item.media_type == MediaType.PODCAST_EPISODE:
2006 media_item = cast("PodcastEpisode", media_item)
2007 return list(await self.get_next_podcast_episodes(None, media_item, userid=userid))
2008 if media_item.media_type == MediaType.FOLDER:
2009 media_item = cast("BrowseFolder", media_item)
2010 return list(await self._get_folder_tracks(media_item))
2011 # all other: single track or radio item
2012 return [cast("MediaItemType", media_item)]
2013
2014 async def _try_resume_from_playlog(self, queue: PlayerQueue) -> bool:
2015 """Try to resume playback from playlog when queue is empty.
2016
2017 Attempts to find user-initiated recently played items in the following order:
2018 1. By userid AND queue_id
2019 2. By queue_id only
2020 3. By userid only (if available)
2021 4. Any recently played item
2022
2023 :param queue: The queue to resume playback on.
2024 :return: True if playback was started, False otherwise.
2025 """
2026 # Try different filter combinations in order of specificity
2027 filter_attempts: list[tuple[str | None, str | None, str]] = []
2028 if queue.userid:
2029 filter_attempts.append((queue.userid, queue.queue_id, "userid + queue_id match"))
2030 filter_attempts.append((None, queue.queue_id, "queue_id match"))
2031 if queue.userid:
2032 filter_attempts.append((queue.userid, None, "userid match"))
2033 filter_attempts.append((None, None, "any recent item"))
2034
2035 for userid, queue_id, match_type in filter_attempts:
2036 items = await self.mass.music.recently_played(
2037 limit=5,
2038 fully_played_only=False,
2039 user_initiated_only=True,
2040 userid=userid,
2041 queue_id=queue_id,
2042 )
2043 for item in items:
2044 if not item.uri:
2045 continue
2046 try:
2047 await self.play_media(queue.queue_id, item)
2048 self.logger.info(
2049 "Resumed queue %s from playlog (%s)", queue.display_name, match_type
2050 )
2051 return True
2052 except MusicAssistantError as err:
2053 self.logger.debug("Failed to resume with item %s: %s", item.name, err)
2054 continue
2055
2056 return False
2057
2058 async def _get_radio_tracks(
2059 self, queue_id: str, is_initial_radio_mode: bool = False
2060 ) -> list[Track]:
2061 """Call the registered music providers for dynamic tracks."""
2062 queue = self._queues[queue_id]
2063 queue_track_items: list[Track] = [
2064 q.media_item
2065 for q in self._queue_items[queue_id]
2066 if q.media_item and isinstance(q.media_item, Track)
2067 ]
2068 if not queue.radio_source:
2069 # this may happen during race conditions as this method is called delayed
2070 return []
2071 self.logger.info(
2072 "Fetching radio tracks for queue %s based on: %s",
2073 queue.display_name,
2074 ", ".join([x.name for x in queue.radio_source]),
2075 )
2076
2077 # Get user's preferred provider instances for steering provider selection
2078 preferred_provider_instances: list[str] | None = None
2079 if (
2080 queue.userid
2081 and (playback_user := await self.mass.webserver.auth.get_user(queue.userid))
2082 and playback_user.provider_filter
2083 ):
2084 preferred_provider_instances = playback_user.provider_filter
2085
2086 available_base_tracks: list[Track] = []
2087 base_track_sample_size = 5
2088 # Some providers have very deterministic similar track algorithms when providing
2089 # a single track item. When we have a radio mode based on 1 track and we have to
2090 # refill the queue (ie not initial radio mode), we use the play history as base tracks
2091 if (
2092 len(queue.radio_source) == 1
2093 and queue.radio_source[0].media_type == MediaType.TRACK
2094 and not is_initial_radio_mode
2095 ):
2096 available_base_tracks = queue_track_items
2097 else:
2098 # Grab all the available base tracks based on the selected source items.
2099 # shuffle the source items, just in case
2100 for radio_item in random.sample(queue.radio_source, len(queue.radio_source)):
2101 ctrl = self.mass.music.get_controller(radio_item.media_type)
2102 try:
2103 available_base_tracks += [
2104 track
2105 for track in await ctrl.radio_mode_base_tracks(
2106 radio_item, # type: ignore[arg-type]
2107 preferred_provider_instances,
2108 )
2109 # Avoid duplicate base tracks
2110 if track not in available_base_tracks
2111 ]
2112 except UnsupportedFeaturedException as err:
2113 self.logger.debug(
2114 "Skip loading radio items for %s: %s ",
2115 radio_item.uri,
2116 str(err),
2117 )
2118 if not available_base_tracks:
2119 raise UnsupportedFeaturedException("Radio mode not available for source items")
2120
2121 # Sample tracks from the base tracks, which will be used to calculate the dynamic ones
2122 base_tracks = random.sample(
2123 available_base_tracks,
2124 min(base_track_sample_size, len(available_base_tracks)),
2125 )
2126 # Use a set to avoid duplicate dynamic tracks
2127 dynamic_tracks: set[Track] = set()
2128 # Use base tracks + Trackcontroller to obtain similar tracks for every base Track
2129 for allow_lookup in (False, True):
2130 if dynamic_tracks:
2131 break
2132 for base_track in base_tracks:
2133 try:
2134 _similar_tracks = await self.mass.music.tracks.similar_tracks(
2135 base_track.item_id,
2136 base_track.provider,
2137 allow_lookup=allow_lookup,
2138 preferred_provider_instances=preferred_provider_instances,
2139 )
2140 except MediaNotFoundError:
2141 # Some providers don't have similar tracks for all items. For example,
2142 # Tidal can sometimes return a 404 when the 'similar_tracks' endpoint is called.
2143 # in that case, just skip the track.
2144 self.logger.debug("Similar tracks not found for track %s", base_track.name)
2145 continue
2146 for track in _similar_tracks:
2147 if (
2148 track not in base_tracks
2149 # Exclude tracks we have already played / queued
2150 and track not in queue_track_items
2151 # Ignore tracks that are too long for radio mode, e.g. mixes
2152 and track.duration <= RADIO_TRACK_MAX_DURATION_SECS
2153 ):
2154 dynamic_tracks.add(track)
2155 if len(dynamic_tracks) >= 50:
2156 break
2157 queue_tracks: list[Track] = []
2158 dynamic_tracks_list = list(dynamic_tracks)
2159 # Only include the sampled base tracks when the radio mode is first initialized
2160 if is_initial_radio_mode:
2161 queue_tracks += [base_tracks[0]]
2162 # Exhaust base tracks with the pattern of BDDBDDBDD (1 base track + 2 dynamic tracks)
2163 if len(base_tracks) > 1:
2164 for base_track in base_tracks[1:]:
2165 queue_tracks += [base_track]
2166 if len(dynamic_tracks_list) > 2:
2167 queue_tracks += random.sample(dynamic_tracks_list, 2)
2168 else:
2169 queue_tracks += dynamic_tracks_list
2170 # Add dynamic tracks to the queue, make sure to exclude already picked tracks
2171 remaining_dynamic_tracks = [t for t in dynamic_tracks_list if t not in queue_tracks]
2172 if remaining_dynamic_tracks:
2173 queue_tracks += random.sample(
2174 remaining_dynamic_tracks, min(len(remaining_dynamic_tracks), 25)
2175 )
2176 return queue_tracks
2177
2178 async def _get_folder_tracks(self, folder: BrowseFolder) -> list[Track]:
2179 """Fetch (playable) tracks for given browse folder."""
2180 self.logger.info(
2181 "Fetching tracks to play for folder %s",
2182 folder.name,
2183 )
2184 tracks: list[Track] = []
2185 for item in await self.mass.music.browse(folder.path):
2186 if not item.is_playable:
2187 continue
2188 # recursively fetch tracks from all media types
2189 resolved = await self._resolve_media_items(item)
2190 tracks += [x for x in resolved if isinstance(x, Track)]
2191
2192 return tracks
2193
2194 def _update_queue_from_player(
2195 self,
2196 player: Player,
2197 ) -> None:
2198 """Update the Queue when the player state changed."""
2199 queue_id = player.player_id
2200 queue = self._queues[queue_id]
2201
2202 # basic properties
2203 queue.display_name = player.state.name
2204 queue.available = player.state.available
2205 queue.items = len(self._queue_items[queue_id])
2206
2207 queue.state = (
2208 player.state.playback_state or PlaybackState.IDLE
2209 if queue.active
2210 else PlaybackState.IDLE
2211 )
2212 # update current item/index from player report
2213 if queue.active and queue.state in (
2214 PlaybackState.PLAYING,
2215 PlaybackState.PAUSED,
2216 ):
2217 # NOTE: If the queue is not playing (yet) we will not update the current index
2218 # to ensure we keep the previously known current index
2219 if queue.flow_mode:
2220 # flow mode active, the player is playing one long stream
2221 # so we need to calculate the current index and elapsed time
2222 current_index, elapsed_time = self._get_flow_queue_stream_index(queue, player)
2223 elif item_id := self._parse_player_current_item_id(queue_id, player):
2224 # normal mode, the player itself will report the current item
2225 elapsed_time = int(player.state.corrected_elapsed_time or 0)
2226 current_index = self.index_by_id(queue_id, item_id)
2227 else:
2228 # this may happen if the player is still transitioning between tracks
2229 # we ignore this for now and keep the current index as is
2230 return
2231
2232 # get current/next item based on current index
2233 queue.current_index = current_index
2234 queue.current_item = current_item = self.get_item(queue_id, current_index)
2235 queue.next_item = (
2236 self.get_next_item(queue_id, current_index)
2237 if current_item and current_index is not None
2238 else None
2239 )
2240
2241 # correct elapsed time when seeking
2242 if (
2243 not queue.flow_mode
2244 and current_item
2245 and current_item.streamdetails
2246 and current_item.streamdetails.seek_position
2247 ):
2248 elapsed_time += current_item.streamdetails.seek_position
2249 queue.elapsed_time = elapsed_time
2250 queue.elapsed_time_last_updated = time.time()
2251
2252 elif not queue.current_item and queue.current_index is not None:
2253 current_index = queue.current_index
2254 queue.current_item = current_item = self.get_item(queue_id, current_index)
2255 queue.next_item = (
2256 self.get_next_item(queue_id, current_index)
2257 if current_item and current_index is not None
2258 else None
2259 )
2260
2261 # This is enough to detect any changes in the DSPDetails
2262 # (so child count changed, or any output format changed)
2263 output_formats = []
2264 if output_format := player.extra_data.get("output_format"):
2265 output_formats.append(str(output_format))
2266 for child_id in player.state.group_members:
2267 if (child := self.mass.players.get_player(child_id)) and (
2268 output_format := child.extra_data.get("output_format")
2269 ):
2270 output_formats.append(str(output_format))
2271 else:
2272 output_formats.append("unknown")
2273
2274 # basic throttle: do not send state changed events if queue did not actually change
2275 prev_state: CompareState = self._prev_states.get(
2276 queue_id,
2277 CompareState(
2278 queue_id=queue_id,
2279 state=PlaybackState.IDLE,
2280 current_item_id=None,
2281 next_item_id=None,
2282 current_item=None,
2283 elapsed_time=0,
2284 last_playing_elapsed_time=0,
2285 stream_title=None,
2286 codec_type=None,
2287 output_formats=None,
2288 ),
2289 )
2290 # update last_playing_elapsed_time only when the player is actively playing
2291 # use corrected_elapsed_time which accounts for time since last update
2292 # this preserves the last known elapsed time when transitioning to idle/paused
2293 prev_playing_elapsed = prev_state["last_playing_elapsed_time"]
2294 prev_item_id = prev_state["current_item_id"]
2295 current_item_id = queue.current_item.queue_item_id if queue.current_item else None
2296 if queue.state == PlaybackState.PLAYING:
2297 current_elapsed = int(queue.corrected_elapsed_time)
2298 if current_item_id != prev_item_id:
2299 # new track started, reset the elapsed time tracker
2300 last_playing_elapsed_time = current_elapsed
2301 else:
2302 # same track, use the max of current and previous to handle timing issues
2303 last_playing_elapsed_time = max(current_elapsed, prev_playing_elapsed)
2304 else:
2305 last_playing_elapsed_time = prev_playing_elapsed
2306 new_state = CompareState(
2307 queue_id=queue_id,
2308 state=queue.state,
2309 current_item_id=queue.current_item.queue_item_id if queue.current_item else None,
2310 next_item_id=queue.next_item.queue_item_id if queue.next_item else None,
2311 current_item=queue.current_item,
2312 elapsed_time=int(queue.elapsed_time),
2313 last_playing_elapsed_time=last_playing_elapsed_time,
2314 stream_title=(
2315 queue.current_item.streamdetails.stream_title
2316 if queue.current_item and queue.current_item.streamdetails
2317 else None
2318 ),
2319 codec_type=(
2320 queue.current_item.streamdetails.audio_format.codec_type
2321 if queue.current_item and queue.current_item.streamdetails
2322 else None
2323 ),
2324 output_formats=output_formats,
2325 )
2326 changed_keys = get_changed_keys(dict(prev_state), dict(new_state))
2327 with suppress(KeyError):
2328 changed_keys.remove("next_item_id")
2329 with suppress(KeyError):
2330 changed_keys.remove("last_playing_elapsed_time")
2331
2332 # store the new state
2333 if queue.active:
2334 self._prev_states[queue_id] = new_state
2335 else:
2336 self._prev_states.pop(queue_id, None)
2337
2338 # return early if nothing changed
2339 if len(changed_keys) == 0:
2340 return
2341
2342 # signal update and store state
2343 send_update = True
2344 if changed_keys == {"elapsed_time"}:
2345 # only elapsed time changed, do not send full queue update
2346 send_update = False
2347 prev_time = prev_state.get("elapsed_time") or 0
2348 cur_time = new_state.get("elapsed_time") or 0
2349 if abs(cur_time - prev_time) > 2:
2350 # send dedicated event for time updates when seeking
2351 self.mass.signal_event(
2352 EventType.QUEUE_TIME_UPDATED,
2353 object_id=queue_id,
2354 data=queue.elapsed_time,
2355 )
2356 # also signal update to the player itself so it can update its current_media
2357 self.mass.players.trigger_player_update(queue_id)
2358
2359 if send_update:
2360 self.signal_update(queue_id)
2361
2362 if "output_formats" in changed_keys:
2363 # refresh DSP details since they may have changed
2364 dsp = get_stream_dsp_details(self.mass, queue_id)
2365 if queue.current_item and queue.current_item.streamdetails:
2366 queue.current_item.streamdetails.dsp = dsp
2367 if queue.next_item and queue.next_item.streamdetails:
2368 queue.next_item.streamdetails.dsp = dsp
2369
2370 # handle updating stream_metadata if needed
2371 if (
2372 queue.current_item
2373 and (streamdetails := queue.current_item.streamdetails)
2374 and streamdetails.stream_metadata_update_callback
2375 and (
2376 streamdetails.stream_metadata_last_updated is None
2377 or (
2378 time.time() - streamdetails.stream_metadata_last_updated
2379 >= streamdetails.stream_metadata_update_interval
2380 )
2381 )
2382 ):
2383 streamdetails.stream_metadata_last_updated = time.time()
2384 self.mass.create_task(
2385 streamdetails.stream_metadata_update_callback(
2386 streamdetails, int(queue.corrected_elapsed_time)
2387 )
2388 )
2389
2390 # handle sending a playback progress report
2391 # we do this every 30 seconds or when the state changes
2392 if (
2393 changed_keys.intersection({"state", "current_item_id"})
2394 or int(queue.elapsed_time) % PLAYBACK_REPORT_INTERVAL_SECONDS == 0
2395 ):
2396 self._handle_playback_progress_report(queue, prev_state, new_state)
2397
2398 # check if we need to clear the queue if we reached the end
2399 if "state" in changed_keys and queue.state == PlaybackState.IDLE:
2400 self._handle_end_of_queue(queue, prev_state, new_state)
2401
2402 # watch dynamic radio items refill if needed
2403 if "current_item_id" in changed_keys:
2404 # auto enable radio mode if dont stop the music is enabled
2405 if (
2406 queue.dont_stop_the_music_enabled
2407 and queue.enqueued_media_items
2408 and queue.current_index is not None
2409 and (queue.items - queue.current_index) <= 1
2410 ):
2411 # We have received the last item in the queue and Don't stop the music is enabled
2412 # set the played media item(s) as radio items (which will refill the queue)
2413 # note that this will fail if there are no media items for which we have
2414 # a dynamic radio source.
2415 self.logger.debug(
2416 "End of queue detected and Don't stop the music is enabled for %s"
2417 " - setting enqueued media items as radio source: %s",
2418 queue.display_name,
2419 ", ".join([x.uri for x in queue.enqueued_media_items]), # type: ignore[misc] # uri set in __post_init__
2420 )
2421 queue.radio_source = queue.enqueued_media_items
2422 # auto fill radio tracks if less than 5 tracks left in the queue
2423 if (
2424 queue.radio_source
2425 and queue.current_index is not None
2426 and (queue.items - queue.current_index) < 5
2427 ):
2428 task_id = f"fill_radio_tracks_{queue_id}"
2429 self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id)
2430
2431 def _get_flow_queue_stream_index(
2432 self, queue: PlayerQueue, player: Player
2433 ) -> tuple[int | None, int]:
2434 """Calculate current queue index and current track elapsed time when flow mode is active."""
2435 elapsed_time_queue_total = player.state.corrected_elapsed_time or 0
2436 if queue.current_index is None and not queue.flow_mode_stream_log:
2437 return queue.current_index, int(queue.elapsed_time)
2438
2439 # For each track that has been streamed/buffered to the player,
2440 # a playlog entry will be created with the queue item id
2441 # and the amount of seconds streamed. We traverse the playlog to figure
2442 # out where we are in the queue, accounting for actual streamed
2443 # seconds (and not duration) and skipped seconds. If a track has been repeated,
2444 # it will simply be in the playlog multiple times.
2445 played_time = 0.0
2446 queue_index: int | None = queue.current_index or 0
2447 track_time = 0.0
2448 for play_log_entry in queue.flow_mode_stream_log:
2449 queue_item_duration = (
2450 # NOTE: 'seconds_streamed' can actually be 0 if there was a stream error!
2451 play_log_entry.seconds_streamed
2452 if play_log_entry.seconds_streamed is not None
2453 else play_log_entry.duration or 3600 * 24 * 7
2454 )
2455 if elapsed_time_queue_total > (queue_item_duration + played_time):
2456 # total elapsed time is more than (streamed) track duration
2457 # this track has been fully played, move on.
2458 played_time += queue_item_duration
2459 else:
2460 # no more seconds left to divide, this is our track
2461 # account for any seeking by adding the skipped/seeked seconds
2462 queue_index = self.index_by_id(queue.queue_id, play_log_entry.queue_item_id)
2463 queue_item = self.get_item(queue.queue_id, queue_index)
2464 if queue_item and queue_item.streamdetails:
2465 track_sec_skipped = queue_item.streamdetails.seek_position
2466 else:
2467 track_sec_skipped = 0
2468 track_time = elapsed_time_queue_total + track_sec_skipped - played_time
2469 break
2470 if player.state.playback_state != PlaybackState.PLAYING:
2471 # if the player is not playing, we can't be sure that the elapsed time is correct
2472 # so we just return the queue index and the elapsed time
2473 return queue.current_index, int(queue.elapsed_time)
2474 return queue_index, int(track_time)
2475
2476 def _parse_player_current_item_id(self, queue_id: str, player: Player) -> str | None:
2477 """Parse QueueItem ID from Player's current url."""
2478 protocol_player = player
2479 if player.active_output_protocol and player.active_output_protocol != "native":
2480 protocol_player = self.mass.players.get_player(player.active_output_protocol) or player
2481 if not protocol_player.current_media:
2482 # YES, we use player.current_media on purpose here because we need the raw metadata
2483 return None
2484 # prefer queue_id and queue_item_id within the current media
2485 if (
2486 protocol_player.current_media.source_id == queue_id
2487 and protocol_player.current_media.queue_item_id
2488 ):
2489 return protocol_player.current_media.queue_item_id
2490 # special case for sonos players
2491 if protocol_player.current_media.uri and protocol_player.current_media.uri.startswith(
2492 f"mass:{queue_id}"
2493 ):
2494 if protocol_player.current_media.queue_item_id:
2495 return protocol_player.current_media.queue_item_id
2496 return protocol_player.current_media.uri.split(":")[-1]
2497 # try to extract the item id from a mass stream url
2498 if (
2499 protocol_player.current_media.uri
2500 and queue_id in protocol_player.current_media.uri
2501 and self.mass.streams.base_url in protocol_player.current_media.uri
2502 ):
2503 current_item_id = protocol_player.current_media.uri.rsplit("/")[-1].split(".")[0]
2504 if self.get_item(queue_id, current_item_id):
2505 return current_item_id
2506 # try to extract the item id from a queue_id/item_id combi
2507 if (
2508 protocol_player.current_media.uri
2509 and queue_id in protocol_player.current_media.uri
2510 and "/" in protocol_player.current_media.uri
2511 ):
2512 current_item_id = protocol_player.current_media.uri.split("/")[1]
2513 if self.get_item(queue_id, current_item_id):
2514 return current_item_id
2515
2516 return None
2517
2518 def _handle_end_of_queue(
2519 self, queue: PlayerQueue, prev_state: CompareState, new_state: CompareState
2520 ) -> None:
2521 """Check if the queue should be cleared after the current item."""
2522 # check if queue state changed to stopped (from playing/paused to idle)
2523 if not (
2524 prev_state["state"] in (PlaybackState.PLAYING, PlaybackState.PAUSED)
2525 and new_state["state"] == PlaybackState.IDLE
2526 ):
2527 return
2528 # check if no more items in the queue (next_item should be None at end of queue)
2529 if queue.next_item is not None:
2530 return
2531 # check if we had a previous item playing
2532 if prev_state["current_item_id"] is None:
2533 return
2534
2535 async def _clear_queue_delayed() -> None:
2536 for _ in range(5):
2537 await asyncio.sleep(1)
2538 if queue.state != PlaybackState.IDLE:
2539 return
2540 if queue.next_item is not None:
2541 return
2542 self.logger.info("End of queue reached, clearing items")
2543 self.clear(queue.queue_id)
2544
2545 # all checks passed, we stopped playback at the last (or single) track of the queue
2546 # now determine if the item was fully played before clearing
2547
2548 # For flow mode, check if the last track was fully streamed using the stream log
2549 # This is more reliable than elapsed_time which can be reset/incorrect
2550 if queue.flow_mode and queue.flow_mode_stream_log:
2551 last_log_entry = queue.flow_mode_stream_log[-1]
2552 if last_log_entry.seconds_streamed is not None:
2553 # The last track finished streaming, safe to clear queue
2554 self.mass.create_task(_clear_queue_delayed())
2555 return
2556
2557 # For non-flow mode, use prev_state values since queue state may have been updated/reset
2558 prev_item = prev_state["current_item"]
2559 if prev_item and (streamdetails := prev_item.streamdetails):
2560 duration = streamdetails.duration or prev_item.duration or 24 * 3600
2561 elif prev_item:
2562 duration = prev_item.duration or 24 * 3600
2563 else:
2564 # No current item means player has already cleared it, safe to clear queue
2565 self.mass.create_task(_clear_queue_delayed())
2566 return
2567
2568 # use last_playing_elapsed_time which preserves the elapsed time from when the player
2569 # was still playing (before transitioning to idle where elapsed_time may be reset to 0)
2570 seconds_played = int(prev_state["last_playing_elapsed_time"])
2571 # debounce this a bit to make sure we're not clearing the queue by accident
2572 # only clear if the last track was played to near completion (within 5 seconds of end)
2573 if seconds_played >= (duration or 3600) - 5:
2574 self.mass.create_task(_clear_queue_delayed())
2575
2576 def _handle_playback_progress_report(
2577 self, queue: PlayerQueue, prev_state: CompareState, new_state: CompareState
2578 ) -> None:
2579 """Handle playback progress report."""
2580 # detect change in current index to report that a item has been played
2581 prev_item_id = prev_state["current_item_id"]
2582 cur_item_id = new_state["current_item_id"]
2583 if prev_item_id is None and cur_item_id is None:
2584 return
2585
2586 if prev_item_id is not None and prev_item_id != cur_item_id:
2587 # we have a new item, so we need report the previous one
2588 is_current_item = False
2589 item_to_report = prev_state["current_item"]
2590 seconds_played = int(prev_state["elapsed_time"])
2591 else:
2592 # report on current item
2593 is_current_item = True
2594 item_to_report = self.get_item(queue.queue_id, cur_item_id) or new_state["current_item"]
2595 seconds_played = int(new_state["elapsed_time"])
2596
2597 if not item_to_report:
2598 return # guard against invalid items
2599
2600 if not (media_item := item_to_report.media_item):
2601 # only report on media items
2602 return
2603 assert media_item.uri is not None # uri is set in __post_init__
2604
2605 if item_to_report.streamdetails and item_to_report.streamdetails.stream_error:
2606 # Ignore items that had a stream error
2607 return
2608
2609 if item_to_report.streamdetails and item_to_report.streamdetails.duration:
2610 duration = int(item_to_report.streamdetails.duration)
2611 else:
2612 duration = int(item_to_report.duration or 3 * 3600)
2613
2614 if seconds_played < 5:
2615 # ignore items that have been played less than 5 seconds
2616 # this also filters out a bounce effect where the previous item
2617 # gets reported with 0 elapsed seconds after a new item starts playing
2618 return
2619
2620 # determine if item is fully played
2621 # for podcasts and audiobooks we account for the last 60 seconds
2622 percentage_played = percentage(seconds_played, duration)
2623 if not is_current_item and item_to_report.media_type in (
2624 MediaType.AUDIOBOOK,
2625 MediaType.PODCAST_EPISODE,
2626 ):
2627 fully_played = seconds_played >= duration - 60
2628 elif not is_current_item:
2629 # 90% of the track must be played to be considered fully played
2630 fully_played = percentage_played >= 90
2631 else:
2632 fully_played = seconds_played >= duration - 10
2633
2634 is_playing = is_current_item and queue.state == PlaybackState.PLAYING
2635 if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
2636 self.logger.debug(
2637 "%s %s '%s' (%s) - Fully played: %s - Progress: %s (%s/%ss)",
2638 queue.display_name,
2639 "is playing" if is_playing else "played",
2640 item_to_report.name,
2641 item_to_report.uri,
2642 fully_played,
2643 f"{percentage_played}%",
2644 seconds_played,
2645 duration,
2646 )
2647 # add entry to playlog - this also handles resume of podcasts/audiobooks
2648 self.mass.create_task(
2649 self.mass.music.mark_item_played(
2650 media_item,
2651 fully_played=fully_played,
2652 seconds_played=seconds_played,
2653 is_playing=is_playing,
2654 userid=queue.userid,
2655 queue_id=queue.queue_id,
2656 user_initiated=False,
2657 )
2658 )
2659
2660 album: Album | ItemMapping | None = getattr(media_item, "album", None)
2661 # signal 'media item played' event,
2662 # which is useful for plugins that want to do scrobbling
2663 artists: list[Artist | ItemMapping] = getattr(media_item, "artists", [])
2664 artists_names = [a.name for a in artists]
2665 self.mass.signal_event(
2666 EventType.MEDIA_ITEM_PLAYED,
2667 object_id=media_item.uri,
2668 data=MediaItemPlaybackProgressReport(
2669 uri=media_item.uri,
2670 media_type=media_item.media_type,
2671 name=media_item.name,
2672 version=getattr(media_item, "version", None),
2673 artist=(
2674 getattr(media_item, "artist_str", None) or artists_names[0]
2675 if artists_names
2676 else None
2677 ),
2678 artists=artists_names,
2679 artist_mbids=[a.mbid for a in artists if a.mbid] if artists else None,
2680 album=album.name if album else None,
2681 album_mbid=album.mbid if album else None,
2682 album_artist=(album.artist_str if isinstance(album, Album) else None),
2683 album_artist_mbids=(
2684 [a.mbid for a in album.artists if a.mbid] if isinstance(album, Album) else None
2685 ),
2686 image_url=(
2687 self.mass.metadata.get_image_url(
2688 item_to_report.media_item.image, prefer_proxy=False
2689 )
2690 if item_to_report.media_item.image
2691 else None
2692 ),
2693 duration=duration,
2694 mbid=(getattr(media_item, "mbid", None)),
2695 seconds_played=seconds_played,
2696 fully_played=fully_played,
2697 is_playing=is_playing,
2698 userid=queue.userid,
2699 ),
2700 )
2701
2702
2703async def _smart_shuffle(items: list[QueueItem]) -> list[QueueItem]:
2704 """Shuffle queue items, avoiding identical tracks next to each other.
2705
2706 Best-effort approach to prevent the same track from appearing adjacent.
2707 Does a random shuffle first, then makes a limited number of passes to
2708 swap adjacent duplicates with a random item further in the list.
2709
2710 :param items: List of queue items to shuffle.
2711 """
2712 if len(items) <= 2:
2713 return random.sample(items, len(items)) if len(items) == 2 else items
2714
2715 # Start with a random shuffle
2716 shuffled = random.sample(items, len(items))
2717
2718 # Make a few passes to fix adjacent duplicates
2719 max_passes = 3
2720 for _ in range(max_passes):
2721 swapped = False
2722 for i in range(len(shuffled) - 1):
2723 if shuffled[i].name == shuffled[i + 1].name:
2724 # Found adjacent duplicate - swap with random position at least 2 away
2725 swap_candidates = [j for j in range(len(shuffled)) if abs(j - i - 1) >= 2]
2726 if swap_candidates:
2727 swap_pos = random.choice(swap_candidates)
2728 shuffled[i + 1], shuffled[swap_pos] = shuffled[swap_pos], shuffled[i + 1]
2729 swapped = True
2730 if not swapped:
2731 break
2732 # Yield to event loop between passes
2733 await asyncio.sleep(0)
2734
2735 return shuffled
2736