/
/
/
1"""
2MusicAssistant Player Queues Controller.
3
4Handles all logic to PLAY Media Items, provided by Music Providers to supported players.
5
6It is loosely coupled to the MusicAssistant Music Controller and Player Controller.
7A Music Assistant Player always has a PlayerQueue associated with it
8which holds the queue items and state.
9
10The PlayerQueue is in that case the active source of the player,
11but it can also be something else, hence the loose coupling.
12"""
13
14from __future__ import annotations
15
16import asyncio
17import random
18import time
19from contextlib import suppress
20from types import NoneType
21from typing import TYPE_CHECKING, Any, TypedDict, cast
22
23import shortuuid
24from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
25from music_assistant_models.enums import (
26 ConfigEntryType,
27 ContentType,
28 EventType,
29 MediaType,
30 PlaybackState,
31 ProviderFeature,
32 QueueOption,
33 RepeatMode,
34)
35from music_assistant_models.errors import (
36 AudioError,
37 InvalidCommand,
38 InvalidDataError,
39 MediaNotFoundError,
40 MusicAssistantError,
41 PlayerUnavailableError,
42 QueueEmpty,
43 UnsupportedFeaturedException,
44)
45from music_assistant_models.media_items import (
46 Album,
47 Artist,
48 Audiobook,
49 BrowseFolder,
50 Genre,
51 ItemMapping,
52 MediaItemType,
53 PlayableMediaItemType,
54 Playlist,
55 Podcast,
56 PodcastEpisode,
57 Track,
58 UniqueList,
59 media_from_dict,
60)
61from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport
62from music_assistant_models.player_queue import PlayerQueue
63from music_assistant_models.queue_item import QueueItem
64
65from music_assistant.constants import (
66 ATTR_ANNOUNCEMENT_IN_PROGRESS,
67 MASS_LOGO_ONLINE,
68 PLAYBACK_REPORT_INTERVAL_SECONDS,
69 PLAYLIST_MEDIA_TYPES,
70 VERBOSE_LOG_LEVEL,
71 PlaylistPlayableItem,
72)
73from music_assistant.controllers.players.controller import IN_QUEUE_COMMAND
74from music_assistant.controllers.webserver.helpers.auth_middleware import get_current_user
75from music_assistant.helpers.api import api_command
76from music_assistant.helpers.audio import get_stream_details, get_stream_dsp_details
77from music_assistant.helpers.throttle_retry import BYPASS_THROTTLER
78from music_assistant.helpers.util import get_changed_keys, percentage
79from music_assistant.models.core_controller import CoreController
80from music_assistant.models.player import Player, PlayerMedia
81
82if TYPE_CHECKING:
83 from collections.abc import Iterator
84
85 from music_assistant_models.auth import User
86 from music_assistant_models.media_items.metadata import MediaItemImage
87
88 from music_assistant import MusicAssistant
89 from music_assistant.models.player import Player
90
91CONF_DEFAULT_ENQUEUE_SELECT_ARTIST = "default_enqueue_select_artist"
92CONF_DEFAULT_ENQUEUE_SELECT_ALBUM = "default_enqueue_select_album"
93
94ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE = "all_tracks"
95ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE = "all_tracks"
96
97CONF_DEFAULT_ENQUEUE_OPTION_ARTIST = "default_enqueue_option_artist"
98CONF_DEFAULT_ENQUEUE_OPTION_ALBUM = "default_enqueue_option_album"
99CONF_DEFAULT_ENQUEUE_OPTION_TRACK = "default_enqueue_option_track"
100CONF_DEFAULT_ENQUEUE_OPTION_GENRE = "default_enqueue_option_genre"
101CONF_DEFAULT_ENQUEUE_OPTION_RADIO = "default_enqueue_option_radio"
102CONF_DEFAULT_ENQUEUE_OPTION_PLAYLIST = "default_enqueue_option_playlist"
103CONF_DEFAULT_ENQUEUE_OPTION_AUDIOBOOK = "default_enqueue_option_audiobook"
104CONF_DEFAULT_ENQUEUE_OPTION_PODCAST = "default_enqueue_option_podcast"
105CONF_DEFAULT_ENQUEUE_OPTION_PODCAST_EPISODE = "default_enqueue_option_podcast_episode"
106CONF_DEFAULT_ENQUEUE_OPTION_FOLDER = "default_enqueue_option_folder"
107CONF_DEFAULT_ENQUEUE_OPTION_UNKNOWN = "default_enqueue_option_unknown"
108RADIO_TRACK_MAX_DURATION_SECS = 20 * 60 # 20 minutes
109CACHE_CATEGORY_PLAYER_QUEUE_STATE = 0
110CACHE_CATEGORY_PLAYER_QUEUE_ITEMS = 1
111
112
113class CompareState(TypedDict):
114 """Simple object where we store the (previous) state of a queue.
115
116 Used for compare actions.
117 """
118
119 queue_id: str
120 state: PlaybackState
121 current_item_id: str | None
122 next_item_id: str | None
123 current_item: QueueItem | None
124 elapsed_time: int
125 # last_playing_elapsed_time: elapsed time from the last PLAYING state update
126 # used to determine if a track was fully played when transitioning to idle
127 last_playing_elapsed_time: int
128 stream_title: str | None
129 codec_type: ContentType | None
130 output_formats: list[str] | None
131
132
133class PlayerQueuesController(CoreController):
134 """Controller holding all logic to enqueue music for players."""
135
136 domain: str = "player_queues"
137
138 def __init__(self, mass: MusicAssistant) -> None:
139 """Initialize core controller."""
140 super().__init__(mass)
141 self._queues: dict[str, PlayerQueue] = {}
142 self._queue_items: dict[str, list[QueueItem]] = {}
143 self._prev_states: dict[str, CompareState] = {}
144 self._transitioning_players: set[str] = set()
145 self.manifest.name = "Player Queues controller"
146 self.manifest.description = (
147 "Music Assistant's core controller which manages the queues for all players."
148 )
149 self.manifest.icon = "playlist-music"
150
151 async def close(self) -> None:
152 """Cleanup on exit."""
153 # stop all playback
154 for queue in self.all():
155 if queue.state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
156 await self.stop(queue.queue_id)
157
158 async def get_config_entries(
159 self,
160 action: str | None = None,
161 values: dict[str, ConfigValueType] | None = None,
162 ) -> tuple[ConfigEntry, ...]:
163 """Return all Config Entries for this core module (if any)."""
164 enqueue_options = [ConfigValueOption(x.name, x.value) for x in QueueOption]
165 return (
166 ConfigEntry(
167 key=CONF_DEFAULT_ENQUEUE_SELECT_ARTIST,
168 type=ConfigEntryType.STRING,
169 default_value=ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE,
170 label="Items to select when you play a (in-library) artist.",
171 options=[
172 ConfigValueOption(
173 title="Only in-library tracks",
174 value="library_tracks",
175 ),
176 ConfigValueOption(
177 title="All tracks from all albums in the library",
178 value="library_album_tracks",
179 ),
180 ConfigValueOption(
181 title="All (top) tracks from (all) streaming provider(s)",
182 value="all_tracks",
183 ),
184 ConfigValueOption(
185 title="All tracks from all albums from (all) streaming provider(s)",
186 value="all_album_tracks",
187 ),
188 ],
189 ),
190 ConfigEntry(
191 key=CONF_DEFAULT_ENQUEUE_SELECT_ALBUM,
192 type=ConfigEntryType.STRING,
193 default_value=ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE,
194 label="Items to select when you play a (in-library) album.",
195 options=[
196 ConfigValueOption(
197 title="Only in-library tracks",
198 value="library_tracks",
199 ),
200 ConfigValueOption(
201 title="All tracks for album on (streaming) provider",
202 value="all_tracks",
203 ),
204 ],
205 ),
206 ConfigEntry(
207 key=CONF_DEFAULT_ENQUEUE_OPTION_ARTIST,
208 type=ConfigEntryType.STRING,
209 default_value=QueueOption.REPLACE.value,
210 label="Default enqueue option for Artist item(s).",
211 options=enqueue_options,
212 description="Define the default enqueue action for this mediatype.",
213 ),
214 ConfigEntry(
215 key=CONF_DEFAULT_ENQUEUE_OPTION_ALBUM,
216 type=ConfigEntryType.STRING,
217 default_value=QueueOption.REPLACE.value,
218 label="Default enqueue option for Album item(s).",
219 options=enqueue_options,
220 description="Define the default enqueue action for this mediatype.",
221 ),
222 ConfigEntry(
223 key=CONF_DEFAULT_ENQUEUE_OPTION_TRACK,
224 type=ConfigEntryType.STRING,
225 default_value=QueueOption.PLAY.value,
226 label="Default enqueue option for Track item(s).",
227 options=enqueue_options,
228 description="Define the default enqueue action for this mediatype.",
229 ),
230 ConfigEntry(
231 key=CONF_DEFAULT_ENQUEUE_OPTION_GENRE,
232 type=ConfigEntryType.STRING,
233 default_value=QueueOption.REPLACE.value,
234 label="Default enqueue option for Genre item(s).",
235 options=enqueue_options,
236 description="Define the default enqueue action for this mediatype.",
237 ),
238 ConfigEntry(
239 key=CONF_DEFAULT_ENQUEUE_OPTION_RADIO,
240 type=ConfigEntryType.STRING,
241 default_value=QueueOption.REPLACE.value,
242 label="Default enqueue option for Radio item(s).",
243 options=enqueue_options,
244 description="Define the default enqueue action for this mediatype.",
245 ),
246 ConfigEntry(
247 key=CONF_DEFAULT_ENQUEUE_OPTION_PLAYLIST,
248 type=ConfigEntryType.STRING,
249 default_value=QueueOption.REPLACE.value,
250 label="Default enqueue option for Playlist item(s).",
251 options=enqueue_options,
252 description="Define the default enqueue action for this mediatype.",
253 ),
254 ConfigEntry(
255 key=CONF_DEFAULT_ENQUEUE_OPTION_AUDIOBOOK,
256 type=ConfigEntryType.STRING,
257 default_value=QueueOption.REPLACE.value,
258 label="Default enqueue option for Audiobook item(s).",
259 options=enqueue_options,
260 hidden=True,
261 ),
262 ConfigEntry(
263 key=CONF_DEFAULT_ENQUEUE_OPTION_PODCAST,
264 type=ConfigEntryType.STRING,
265 default_value=QueueOption.REPLACE.value,
266 label="Default enqueue option for Podcast item(s).",
267 options=enqueue_options,
268 hidden=True,
269 ),
270 ConfigEntry(
271 key=CONF_DEFAULT_ENQUEUE_OPTION_PODCAST_EPISODE,
272 type=ConfigEntryType.STRING,
273 default_value=QueueOption.REPLACE.value,
274 label="Default enqueue option for Podcast-episode item(s).",
275 options=enqueue_options,
276 hidden=True,
277 ),
278 ConfigEntry(
279 key=CONF_DEFAULT_ENQUEUE_OPTION_FOLDER,
280 type=ConfigEntryType.STRING,
281 default_value=QueueOption.REPLACE.value,
282 label="Default enqueue option for Folder item(s).",
283 options=enqueue_options,
284 hidden=True,
285 ),
286 )
287
288 def __iter__(self) -> Iterator[PlayerQueue]:
289 """Iterate over (available) players."""
290 return iter(self._queues.values())
291
292 @api_command("player_queues/all")
293 def all(self) -> tuple[PlayerQueue, ...]:
294 """Return all registered PlayerQueues."""
295 return tuple(self._queues.values())
296
297 @api_command("player_queues/get")
298 def get(self, queue_id: str) -> PlayerQueue | None:
299 """Return PlayerQueue by queue_id or None if not found."""
300 return self._queues.get(queue_id)
301
302 @api_command("player_queues/items")
303 def items(self, queue_id: str, limit: int = 500, offset: int = 0) -> list[QueueItem]:
304 """Return all QueueItems for given PlayerQueue."""
305 if queue_id not in self._queue_items:
306 return []
307
308 return self._queue_items[queue_id][offset : offset + limit]
309
310 @api_command("player_queues/get_active_queue")
311 def get_active_queue(self, player_id: str) -> PlayerQueue | None:
312 """Return the current active/synced queue for a player."""
313 if player := self.mass.players.get_player(player_id):
314 return self.mass.players.get_active_queue(player)
315 return None
316
317 # Queue commands
318
319 @api_command("player_queues/shuffle")
320 async def set_shuffle(self, queue_id: str, shuffle_enabled: bool) -> None:
321 """Configure shuffle setting on the the queue."""
322 queue = self._queues[queue_id]
323 if queue.shuffle_enabled == shuffle_enabled:
324 return # no change
325 queue.shuffle_enabled = shuffle_enabled
326 queue_items = self._queue_items[queue_id]
327 cur_index = (
328 queue.index_in_buffer if queue.index_in_buffer is not None else queue.current_index
329 )
330 if cur_index is not None:
331 next_index = cur_index + 1
332 next_items = queue_items[next_index:]
333 else:
334 next_items = []
335 next_index = 0
336 if not shuffle_enabled:
337 # shuffle disabled, try to restore original sort order of the remaining items
338 next_items.sort(key=lambda x: x.sort_index, reverse=False)
339 await self.load(
340 queue_id=queue_id,
341 queue_items=next_items,
342 insert_at_index=next_index,
343 keep_remaining=False,
344 shuffle=shuffle_enabled,
345 )
346
347 @api_command("player_queues/dont_stop_the_music")
348 def set_dont_stop_the_music(self, queue_id: str, dont_stop_the_music_enabled: bool) -> None:
349 """Configure Don't stop the music setting on the queue."""
350 providers_available_with_similar_tracks = any(
351 ProviderFeature.SIMILAR_TRACKS in provider.supported_features
352 for provider in self.mass.music.providers
353 )
354 if dont_stop_the_music_enabled and not providers_available_with_similar_tracks:
355 raise UnsupportedFeaturedException(
356 "Don't stop the music is not supported by any of the available music providers"
357 )
358 queue = self._queues[queue_id]
359 queue.dont_stop_the_music_enabled = dont_stop_the_music_enabled
360 self.signal_update(queue_id=queue_id)
361 # if this happens to be the last track in the queue, fill the radio source
362 if (
363 queue.dont_stop_the_music_enabled
364 and queue.enqueued_media_items
365 and queue.current_index is not None
366 and (queue.items - queue.current_index) <= 1
367 ):
368 queue.radio_source = queue.enqueued_media_items
369 task_id = f"fill_radio_tracks_{queue_id}"
370 self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id)
371
372 @api_command("player_queues/repeat")
373 def set_repeat(self, queue_id: str, repeat_mode: RepeatMode) -> None:
374 """Configure repeat setting on the the queue."""
375 queue = self._queues[queue_id]
376 if queue.repeat_mode == repeat_mode:
377 return # no change
378 queue.repeat_mode = repeat_mode
379 self.signal_update(queue_id)
380 if (
381 queue.state == PlaybackState.PLAYING
382 and queue.index_in_buffer is not None
383 and queue.index_in_buffer == queue.current_index
384 ):
385 # if the queue is playing,
386 # ensure to (re)queue the next track because it might have changed
387 # note that we only do this if the player has loaded the current track
388 # if not, we wait until it has loaded to prevent conflicts
389 if next_item := self.get_next_item(queue_id, queue.index_in_buffer):
390 self._enqueue_next_item(queue_id, next_item)
391
392 @api_command("player_queues/set_playback_speed")
393 async def set_playback_speed(
394 self, queue_id: str, speed: float, queue_item_id: str | None = None
395 ) -> None:
396 """
397 Set the playback speed for the given queue item.
398
399 If queue_item_id is not provided,
400 the speed will be set for the current item in the queue.
401
402 :param queue_id: queue_id of the queue to configure.
403 :param speed: playback speed multiplier (0.5 to 2.0). 1.0 = normal speed.
404 """
405 if not (0.5 <= speed <= 2.0):
406 raise InvalidDataError(f"Playback speed must be between 0.5 and 2.0, got {speed}")
407 queue = self._queues[queue_id]
408 if not queue.current_item:
409 raise QueueEmpty("Cannot set playback speed: queue is empty")
410 queue_item_id = queue_item_id or queue.current_item.queue_item_id
411 queue_item = self.get_item(queue_id, queue_item_id)
412 if not queue_item:
413 raise InvalidDataError(f"Queue item {queue_item_id} not found in queue")
414 if not queue_item.duration or queue_item.media_type == MediaType.RADIO:
415 raise InvalidCommand("Cannot set playback speed for items with unknown duration")
416 current_speed = float(queue_item.extra_attributes.get("playback_speed") or 1.0)
417 if abs(current_speed - speed) < 0.001:
418 return # no change
419 # use extra_attributes of the queue item to store the playback speed
420 queue_item.extra_attributes["playback_speed"] = speed
421 self.signal_update(queue_id)
422 if queue.state == PlaybackState.PLAYING:
423 await self.resume(queue_id)
424
425 @api_command("player_queues/play_media")
426 async def play_media(
427 self,
428 queue_id: str,
429 media: MediaItemType | ItemMapping | str | list[MediaItemType | ItemMapping | str],
430 option: QueueOption | None = None,
431 radio_mode: bool = False,
432 start_item: PlayableMediaItemType | str | None = None,
433 username: str | None = None,
434 ) -> None:
435 """Play media item(s) on the given queue.
436
437 :param queue_id: The queue_id of the queue to play media on.
438 :param media: Media that should be played (MediaItem(s) and/or uri's).
439 :param option: Which enqueue mode to use.
440 :param radio_mode: Enable radio mode for the given item(s).
441 :param start_item: Optional item to start the playlist or album from.
442 :param username: The username of the user requesting the playback.
443 Setting the username allows for overriding the logged-in user
444 to account for playback history per user when the play_media is
445 called from a shared context (like a web hook or automation).
446 """
447 # ruff: noqa: PLR0915
448 # we use a contextvar to bypass the throttler for this asyncio task/context
449 # this makes sure that playback has priority over other requests that may be
450 # happening in the background
451 BYPASS_THROTTLER.set(True)
452 if not (queue := self.get(queue_id)):
453 raise PlayerUnavailableError(f"Queue {queue_id} is not available")
454 # always fetch the underlying player so we can raise early if its not available
455 queue_player = self.mass.players.get_player(queue_id, True)
456 assert queue_player is not None # for type checking
457 if queue_player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
458 self.logger.warning("Ignore queue command: An announcement is in progress")
459 return
460
461 # save the user requesting the playback
462 playback_user: User | None
463 if username and (user := await self.mass.webserver.auth.get_user_by_username(username)):
464 playback_user = user
465 else:
466 playback_user = get_current_user()
467 queue.userid = playback_user.user_id if playback_user else None
468
469 # a single item or list of items may be provided
470 media_list = media if isinstance(media, list) else [media]
471
472 # clear queue if needed
473 if option == QueueOption.REPLACE:
474 self.clear(queue_id)
475 # Clear the 'enqueued media item' list when a new queue is requested
476 if option not in (QueueOption.ADD, QueueOption.NEXT):
477 queue.enqueued_media_items.clear()
478
479 media_items: list[MediaItemType] = []
480 radio_source: list[MediaItemType] = []
481 # resolve all media items
482 for item in media_list:
483 try:
484 # parse provided uri into a MA MediaItem or Basic QueueItem from URL
485 media_item: MediaItemType | ItemMapping | BrowseFolder
486 if isinstance(item, str):
487 media_item = await self.mass.music.get_item_by_uri(item)
488 elif isinstance(item, dict): # type: ignore[unreachable]
489 # TODO: Investigate why the API parser sometimes passes raw dicts instead of
490 # converting them to MediaItem objects. The parse_value function in api.py
491 # should handle dict-to-object conversion, but dicts are slipping through
492 # in some cases. This is defensive handling for that parser bug.
493 media_item = media_from_dict(item) # type: ignore[unreachable]
494 self.logger.debug("Converted to: %s", type(media_item))
495 else:
496 # item is MediaItemType | ItemMapping at this point
497 media_item = item
498
499 # Save requested media item to play on the queue so we can use it as a source
500 # for Don't stop the music. Use FIFO list to keep track of the last 10 played items
501 # Skip ItemMapping and BrowseFolder - only queue full MediaItemType objects
502 if not isinstance(
503 media_item, (ItemMapping, BrowseFolder)
504 ) and media_item.media_type in (
505 MediaType.TRACK,
506 MediaType.ALBUM,
507 MediaType.PLAYLIST,
508 MediaType.ARTIST,
509 ):
510 queue.enqueued_media_items.append(media_item)
511 if len(queue.enqueued_media_items) > 10:
512 queue.enqueued_media_items.pop(0)
513
514 # handle default enqueue option if needed
515 if option is None:
516 config_value = await self.mass.config.get_core_config_value(
517 self.domain,
518 f"default_enqueue_option_{media_item.media_type.value}",
519 return_type=str,
520 )
521 option = QueueOption(config_value)
522 if option == QueueOption.REPLACE:
523 self.clear(queue_id, skip_stop=True)
524
525 # collect media_items to play
526 if radio_mode:
527 # Type guard for mypy - only add full MediaItemType to radio_source
528 if not isinstance(media_item, (ItemMapping, BrowseFolder)):
529 radio_source.append(media_item)
530 else:
531 # Convert start_item to string URI if needed
532 start_item_uri: str | None = None
533 if isinstance(start_item, str):
534 start_item_uri = start_item
535 elif start_item is not None:
536 start_item_uri = start_item.uri
537 media_items += await self._resolve_media_items(
538 media_item, start_item_uri, queue_id=queue_id
539 )
540
541 except MusicAssistantError as err:
542 # invalid MA uri or item not found error
543 self.logger.warning("Skipping %s: %s", item, str(err))
544
545 # overwrite or append radio source items
546 if option not in (QueueOption.ADD, QueueOption.NEXT):
547 queue.radio_source = radio_source
548 else:
549 queue.radio_source += radio_source
550 # Use collected media items to calculate the radio if radio mode is on
551 if radio_mode:
552 radio_tracks = await self._get_radio_tracks(
553 queue_id=queue_id, is_initial_radio_mode=True
554 )
555 media_items = list(radio_tracks)
556
557 # only add valid/available items
558 queue_items: list[QueueItem] = []
559 for x in media_items:
560 if not x or not x.available:
561 continue
562 queue_items.append(
563 QueueItem.from_media_item(queue_id, cast("PlayableMediaItemType", x))
564 )
565
566 if not queue_items:
567 raise MediaNotFoundError("No playable items found")
568
569 # load the items into the queue
570 if queue.state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
571 cur_index = (
572 queue.index_in_buffer
573 if queue.index_in_buffer is not None
574 else (queue.current_index if queue.current_index is not None else 0)
575 )
576 else:
577 cur_index = queue.current_index or 0
578 insert_at_index = cur_index + 1
579 # Radio modes are already shuffled in a pattern we would like to keep.
580 shuffle = queue.shuffle_enabled and len(queue_items) > 1 and not radio_mode
581
582 # handle replace: clear all items and replace with the new items
583 if option == QueueOption.REPLACE:
584 await self.load(
585 queue_id,
586 queue_items=queue_items,
587 keep_remaining=False,
588 keep_played=False,
589 shuffle=shuffle,
590 )
591 await self.play_index(queue_id, 0)
592 return
593 # handle next: add item(s) in the index next to the playing/loaded/buffered index
594 if option == QueueOption.NEXT:
595 await self.load(
596 queue_id,
597 queue_items=queue_items,
598 insert_at_index=insert_at_index,
599 shuffle=shuffle,
600 )
601 return
602 if option == QueueOption.REPLACE_NEXT:
603 await self.load(
604 queue_id,
605 queue_items=queue_items,
606 insert_at_index=insert_at_index,
607 keep_remaining=False,
608 shuffle=shuffle,
609 )
610 return
611 # handle play: replace current loaded/playing index with new item(s)
612 if option == QueueOption.PLAY:
613 await self.load(
614 queue_id,
615 queue_items=queue_items,
616 insert_at_index=insert_at_index,
617 shuffle=shuffle,
618 )
619 next_index = min(insert_at_index, len(self._queue_items[queue_id]) - 1)
620 await self.play_index(queue_id, next_index)
621 return
622 # handle add: add/append item(s) to the remaining queue items
623 if option == QueueOption.ADD:
624 await self.load(
625 queue_id=queue_id,
626 queue_items=queue_items,
627 insert_at_index=insert_at_index
628 if queue.shuffle_enabled
629 else len(self._queue_items[queue_id]) + 1,
630 shuffle=queue.shuffle_enabled,
631 )
632 # handle edgecase, queue is empty and items are only added (not played)
633 # mark first item as new index
634 if queue.current_index is None:
635 queue.current_index = 0
636 queue.current_item = self.get_item(queue_id, 0)
637 queue.items = len(queue_items)
638 self.signal_update(queue_id)
639
640 @api_command("player_queues/move_item")
641 def move_item(self, queue_id: str, queue_item_id: str, pos_shift: int = 1) -> None:
642 """
643 Move queue item x up/down the queue.
644
645 - queue_id: id of the queue to process this request.
646 - queue_item_id: the item_id of the queueitem that needs to be moved.
647 - pos_shift: move item x positions down if positive value
648 - pos_shift: move item x positions up if negative value
649 - pos_shift: move item to top of queue as next item if 0.
650 """
651 queue = self._queues[queue_id]
652 item_index = self.index_by_id(queue_id, queue_item_id)
653 if item_index is None:
654 raise InvalidDataError(f"Item {queue_item_id} not found in queue")
655 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
656 msg = f"{item_index} is already played/buffered"
657 raise IndexError(msg)
658
659 queue_items = self._queue_items[queue_id]
660 queue_items = queue_items.copy()
661
662 if pos_shift == 0 and queue.state == PlaybackState.PLAYING:
663 new_index = (queue.current_index or 0) + 1
664 elif pos_shift == 0:
665 new_index = queue.current_index or 0
666 else:
667 new_index = item_index + pos_shift
668 if (new_index < (queue.current_index or 0)) or (new_index > len(queue_items)):
669 return
670 # move the item in the list
671 queue_items.insert(new_index, queue_items.pop(item_index))
672 self.update_items(queue_id, queue_items)
673
674 @api_command("player_queues/move_item_end")
675 def move_item_end(self, queue_id: str, queue_item_id: str) -> None:
676 """
677 Move queue item to the end the queue.
678
679 - queue_id: id of the queue to process this request.
680 - queue_item_id: the item_id of the queueitem that needs to be moved.
681 """
682 queue = self._queues[queue_id]
683 item_index = self.index_by_id(queue_id, queue_item_id)
684 if item_index is None:
685 raise InvalidDataError(f"Item {queue_item_id} not found in queue")
686 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
687 msg = f"{item_index} is already played/buffered"
688 raise IndexError(msg)
689
690 queue_items = self._queue_items[queue_id]
691 if item_index == (len(queue_items) - 1):
692 return
693 queue_items = queue_items.copy()
694
695 new_index = len(self._queue_items[queue_id]) - 1
696
697 # move the item in the list
698 queue_items.insert(new_index, queue_items.pop(item_index))
699 self.update_items(queue_id, queue_items)
700
701 @api_command("player_queues/delete_item")
702 def delete_item(self, queue_id: str, item_id_or_index: int | str) -> None:
703 """Delete item (by id or index) from the queue."""
704 if isinstance(item_id_or_index, str):
705 item_index = self.index_by_id(queue_id, item_id_or_index)
706 if item_index is None:
707 raise InvalidDataError(f"Item {item_id_or_index} not found in queue")
708 else:
709 item_index = item_id_or_index
710 queue = self._queues[queue_id]
711 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
712 # ignore request if track already loaded in the buffer
713 # the frontend should guard so this is just in case
714 self.logger.warning("delete requested for item already loaded in buffer")
715 return
716 queue_items = self._queue_items[queue_id]
717 queue_items.pop(item_index)
718 self.update_items(queue_id, queue_items)
719
720 @api_command("player_queues/clear")
721 def clear(self, queue_id: str, skip_stop: bool = False) -> None:
722 """Clear all items in the queue."""
723 queue = self._queues[queue_id]
724 queue.radio_source = []
725 if queue.state != PlaybackState.IDLE and not skip_stop:
726 self.mass.create_task(self.stop(queue_id))
727 queue.current_index = None
728 queue.current_item = None
729 queue.elapsed_time = 0
730 queue.elapsed_time_last_updated = time.time()
731 queue.index_in_buffer = None
732 self.mass.create_task(self.mass.streams.cleanup_queue_audio_data(queue_id))
733 self.update_items(queue_id, [])
734
735 @api_command("player_queues/save_as_playlist")
736 async def save_as_playlist(self, queue_id: str, name: str) -> Playlist:
737 """Save the current queue items as a new playlist.
738
739 :param queue_id: The queue_id of the queue to save.
740 :param name: The name for the new playlist.
741 """
742 if not self.get(queue_id):
743 raise PlayerUnavailableError(f"Queue {queue_id} is not available")
744 queue_items = self._queue_items.get(queue_id, [])
745 if not queue_items:
746 raise QueueEmpty("Cannot save an empty queue as a playlist.")
747 # collect URIs from queue items that are playlist-compatible
748 uris: list[str] = []
749 for item in queue_items:
750 if item.uri and item.media_type in PLAYLIST_MEDIA_TYPES:
751 uris.append(item.uri)
752 if not uris:
753 raise InvalidDataError("No valid items in queue to save as playlist.")
754 playlist = await self.mass.music.playlists.create_playlist(name)
755 await self.mass.music.playlists.add_playlist_tracks(playlist.item_id, uris)
756 return playlist
757
758 @api_command("player_queues/stop")
759 async def stop(self, queue_id: str) -> None:
760 """
761 Handle STOP command for given queue.
762
763 - queue_id: queue_id of the playerqueue to handle the command.
764 """
765 queue_player = self.mass.players.get_player(queue_id, True)
766 if queue_player is None:
767 raise PlayerUnavailableError(f"Player {queue_id} is not available")
768 if (queue := self.get(queue_id)) and queue.active:
769 if queue.state == PlaybackState.PLAYING:
770 queue.resume_pos = int(queue.corrected_elapsed_time)
771 # Set context to prevent circular call, then forward the actual command to the player
772 token = IN_QUEUE_COMMAND.set(True)
773 try:
774 await self.mass.players.cmd_stop(queue_id)
775 finally:
776 IN_QUEUE_COMMAND.reset(token)
777 self.mass.create_task(self.mass.streams.cleanup_queue_audio_data(queue_id))
778
779 @api_command("player_queues/play")
780 async def play(self, queue_id: str) -> None:
781 """
782 Handle PLAY command for given queue.
783
784 - queue_id: queue_id of the playerqueue to handle the command.
785 """
786 queue_player = self.mass.players.get_player(queue_id, True)
787 if queue_player is None:
788 raise PlayerUnavailableError(f"Player {queue_id} is not available")
789 if (
790 (queue := self._queues.get(queue_id))
791 and queue.active
792 and queue.state == PlaybackState.PAUSED
793 ):
794 # forward the actual play/unpause command to the player
795 await queue_player.play()
796 return
797 # player is not paused, perform resume instead
798 await self.resume(queue_id)
799
800 @api_command("player_queues/pause")
801 async def pause(self, queue_id: str) -> None:
802 """Handle PAUSE command for given queue.
803
804 - queue_id: queue_id of the playerqueue to handle the command.
805 """
806 if not (queue := self._queues.get(queue_id)):
807 return
808 queue_active = queue.active
809 if queue.active and queue.state == PlaybackState.PLAYING:
810 queue.resume_pos = int(queue.corrected_elapsed_time)
811 # forward the actual command to the player controller
812 # Set context to prevent circular call, then forward the actual command to the player
813 token = IN_QUEUE_COMMAND.set(True)
814 try:
815 await self.mass.players.cmd_pause(queue_id)
816 finally:
817 IN_QUEUE_COMMAND.reset(token)
818
819 async def _watch_pause(player: Player) -> None:
820 count = 0
821 # wait for pause
822 while count < 5 and player.state.playback_state == PlaybackState.PLAYING:
823 count += 1
824 await asyncio.sleep(1)
825 # wait for unpause
826 if player.state.playback_state != PlaybackState.PAUSED:
827 return
828 count = 0
829 while count < 30 and player.state.playback_state == PlaybackState.PAUSED:
830 count += 1
831 await asyncio.sleep(1)
832 # if player is still paused when the limit is reached, send stop
833 if player.state.playback_state == PlaybackState.PAUSED:
834 await self.stop(queue_id)
835
836 # we auto stop a player from paused when its paused for 30 seconds
837 if (
838 queue_active
839 and (queue_player := self.mass.players.get_player(queue_id))
840 and not queue_player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS)
841 ):
842 self.mass.create_task(_watch_pause(queue_player))
843
844 @api_command("player_queues/play_pause")
845 async def play_pause(self, queue_id: str) -> None:
846 """Toggle play/pause on given playerqueue.
847
848 - queue_id: queue_id of the queue to handle the command.
849 """
850 if (queue := self._queues.get(queue_id)) and queue.state == PlaybackState.PLAYING:
851 await self.pause(queue_id)
852 return
853 await self.play(queue_id)
854
855 @api_command("player_queues/next")
856 async def next(self, queue_id: str) -> None:
857 """Handle NEXT TRACK command for given queue.
858
859 - queue_id: queue_id of the queue to handle the command.
860 """
861 if (queue := self.get(queue_id)) is None or not queue.active:
862 raise InvalidCommand(f"Queue {queue_id} is not active")
863 idx = self._queues[queue_id].current_index
864 if idx is None:
865 self.logger.warning("Queue %s has no current index", queue.display_name)
866 return
867 attempts = 5
868 while attempts:
869 try:
870 if (next_index := self._get_next_index(queue_id, idx, True)) is not None:
871 await self.play_index(queue_id, next_index, debounce=True)
872 break
873 except MediaNotFoundError:
874 self.logger.warning(
875 "Failed to fetch next track for queue %s - trying next item",
876 queue.display_name,
877 )
878 idx += 1
879 attempts -= 1
880
881 @api_command("player_queues/previous")
882 async def previous(self, queue_id: str) -> None:
883 """Handle PREVIOUS TRACK command for given queue.
884
885 - queue_id: queue_id of the queue to handle the command.
886 """
887 if (queue := self.get(queue_id)) is None or not queue.active:
888 raise InvalidCommand(f"Queue {queue_id} is not active")
889 current_index = self._queues[queue_id].current_index
890 if current_index is None:
891 return
892 next_index = int(current_index)
893 # restart current track if current track has played longer than 4
894 # otherwise skip to previous track
895 if self._queues[queue_id].elapsed_time < 5:
896 next_index = max(current_index - 1, 0)
897 await self.play_index(queue_id, next_index, debounce=True)
898
899 @api_command("player_queues/skip")
900 async def skip(self, queue_id: str, seconds: int = 10) -> None:
901 """Handle SKIP command for given queue.
902
903 - queue_id: queue_id of the queue to handle the command.
904 - seconds: number of seconds to skip in track. Use negative value to skip back.
905 """
906 if (queue := self.get(queue_id)) is None or not queue.active:
907 raise InvalidCommand(f"Queue {queue_id} is not active")
908 await self.seek(queue_id, int(self._queues[queue_id].elapsed_time + seconds))
909
910 @api_command("player_queues/seek")
911 async def seek(self, queue_id: str, position: int = 10) -> None:
912 """Handle SEEK command for given queue.
913
914 - queue_id: queue_id of the queue to handle the command.
915 - position: position in seconds to seek to in the current playing item.
916 """
917 if (queue := self.get(queue_id)) is None or not queue.active:
918 raise InvalidCommand(f"Queue {queue_id} is not active")
919 queue_player = self.mass.players.get_player(queue_id, True)
920 if queue_player is None:
921 raise PlayerUnavailableError(f"Player {queue_id} is not available")
922 if not queue.current_item:
923 raise InvalidCommand(f"Queue {queue_player.state.name} has no item(s) loaded.")
924 if not queue.current_item.duration:
925 raise InvalidCommand("Can not seek items without duration.")
926 position = max(0, int(position))
927 if position > queue.current_item.duration:
928 raise InvalidCommand("Can not seek outside of duration range.")
929 if queue.current_index is None:
930 raise InvalidCommand(f"Queue {queue_player.state.name} has no current index.")
931 await self.play_index(queue_id, queue.current_index, seek_position=position)
932
933 @api_command("player_queues/resume")
934 async def resume(self, queue_id: str, fade_in: bool | None = None) -> None:
935 """Handle RESUME command for given queue.
936
937 - queue_id: queue_id of the queue to handle the command.
938 """
939 queue = self._queues[queue_id]
940 queue_items = self._queue_items[queue_id]
941 resume_item = queue.current_item
942 if queue.state == PlaybackState.PLAYING:
943 # resume requested while already playing,
944 # use current position as resume position
945 resume_pos = queue.corrected_elapsed_time
946 fade_in = False
947 else:
948 resume_pos = queue.resume_pos or queue.elapsed_time
949
950 if not resume_item and queue.current_index is not None and len(queue_items) > 0:
951 resume_item = self.get_item(queue_id, queue.current_index)
952 resume_pos = 0
953 elif not resume_item and queue.current_index is None and len(queue_items) > 0:
954 # items available in queue but no previous track, start at 0
955 resume_item = self.get_item(queue_id, 0)
956 resume_pos = 0
957
958 if resume_item is not None:
959 queue_player = self.mass.players.get_player(queue_id)
960 if queue_player is None:
961 raise PlayerUnavailableError(f"Player {queue_id} is not available")
962 if (
963 fade_in is None
964 and queue_player.state.playback_state == PlaybackState.IDLE
965 and (time.time() - queue.elapsed_time_last_updated) > 60
966 ):
967 # enable fade in effect if the player is idle for a while
968 fade_in = resume_pos > 0
969 if resume_item.media_type == MediaType.RADIO:
970 # we're not able to skip in online radio so this is pointless
971 resume_pos = 0
972 await self.play_index(
973 queue_id, resume_item.queue_item_id, int(resume_pos), fade_in or False
974 )
975 else:
976 # Queue is empty, try to resume from playlog
977 if await self._try_resume_from_playlog(queue):
978 return
979 msg = f"Resume queue requested but queue {queue.display_name} is empty"
980 raise QueueEmpty(msg)
981
982 @api_command("player_queues/play_index")
983 async def play_index(
984 self,
985 queue_id: str,
986 index: int | str,
987 seek_position: int = 0,
988 fade_in: bool = False,
989 debounce: bool = False,
990 ) -> None:
991 """Play item at index (or item_id) X in queue."""
992 queue = self._queues[queue_id]
993 queue.resume_pos = 0
994 if isinstance(index, str):
995 temp_index = self.index_by_id(queue_id, index)
996 if temp_index is None:
997 raise InvalidDataError(f"Item {index} not found in queue")
998 index = temp_index
999 # At this point index is guaranteed to be int
1000 queue.current_index = index
1001 # update current item and elapsed time and signal update
1002 # this way the UI knows immediately that a new item is loading
1003 queue.current_item = self.get_item(queue_id, index)
1004 queue.elapsed_time = seek_position
1005 queue.elapsed_time_last_updated = time.time()
1006 self.signal_update(queue_id)
1007 queue.index_in_buffer = index
1008 queue.flow_mode_stream_log = []
1009 target_player = self.mass.players.get_player(queue_id)
1010 if target_player is None:
1011 raise PlayerUnavailableError(f"Player {queue_id} is not available")
1012 queue.next_item_id_enqueued = None
1013 # always update session id when we start a new playback session
1014 queue.session_id = shortuuid.random(length=8)
1015 # handle resume point of audiobook(chapter) or podcast(episode)
1016 if (
1017 not seek_position
1018 and (queue_item := self.get_item(queue_id, index))
1019 and (resume_position_ms := getattr(queue_item.media_item, "resume_position_ms", 0))
1020 ):
1021 seek_position = max(0, int((resume_position_ms - 500) / 1000))
1022
1023 # send play_media request to player
1024 # NOTE that we debounce this a bit to account for someone hitting the next button
1025 # like a madman. This will prevent the player from being overloaded with requests.
1026 async def _play_index(index: int, debounce: bool) -> None:
1027 for attempt in range(5):
1028 try:
1029 queue_item = self.get_item(queue_id, index)
1030 if not queue_item:
1031 continue # guard
1032 await self._load_item(
1033 queue_item,
1034 self._get_next_index(queue_id, index),
1035 is_start=True,
1036 seek_position=seek_position if attempt == 0 else 0,
1037 fade_in=fade_in if attempt == 0 else False,
1038 )
1039 # if we reach this point, loading the item succeeded, break the loop
1040 queue.current_index = index
1041 queue.current_item = queue_item
1042 break
1043 except (MediaNotFoundError, AudioError):
1044 # the requested index can not be played.
1045 if queue_item:
1046 self.logger.warning(
1047 "Skipping unplayable item %s (%s)",
1048 queue_item.name,
1049 queue_item.uri,
1050 )
1051 queue_item.available = False
1052 next_index = self._get_next_index(queue_id, index, allow_repeat=False)
1053 if next_index is None:
1054 raise MediaNotFoundError("No next item available")
1055 index = next_index
1056 else:
1057 # all attempts to find a playable item failed
1058 raise MediaNotFoundError("No playable item found to start playback")
1059
1060 # Select and set the output protocol before evaluating flow_mode,
1061 # since flow_mode depends on the active output protocol
1062 self.mass.players.select_output_protocol(queue_id)
1063 # work out if we need to use flow mode
1064 flow_mode = target_player.flow_mode and queue_item.media_type not in (
1065 # don't use flow mode for duration-less streams
1066 MediaType.RADIO,
1067 MediaType.PLUGIN_SOURCE,
1068 )
1069 await asyncio.sleep(0.5 if debounce else 0.1)
1070 queue.flow_mode = flow_mode
1071 await self.mass.players.play_media(
1072 player_id=queue_id,
1073 media=await self.player_media_from_queue_item(queue_item, flow_mode),
1074 )
1075 queue.current_index = index
1076 queue.current_item = queue_item
1077 await asyncio.sleep(2)
1078 self._transitioning_players.discard(queue_id)
1079
1080 # we set a flag to notify the update logic that we're transitioning to a new track
1081 self._transitioning_players.add(queue_id)
1082
1083 # we debounce the play_index command to handle the case where someone
1084 # is spamming next/previous on the player
1085 task_id = f"play_index_{queue_id}"
1086 if existing_task := self.mass.get_task(task_id):
1087 existing_task.cancel()
1088 with suppress(asyncio.CancelledError):
1089 await existing_task
1090 task = self.mass.create_task(
1091 _play_index,
1092 index,
1093 debounce,
1094 task_id=task_id,
1095 )
1096 await task
1097 self.signal_update(queue_id)
1098
1099 @api_command("player_queues/transfer")
1100 async def transfer_queue(
1101 self,
1102 source_queue_id: str,
1103 target_queue_id: str,
1104 auto_play: bool | None = None,
1105 ) -> None:
1106 """Transfer queue to another queue."""
1107 if not (source_queue := self.get(source_queue_id)):
1108 raise PlayerUnavailableError(f"Queue {source_queue_id} is not available")
1109 if not (target_queue := self.get(target_queue_id)):
1110 raise PlayerUnavailableError(f"Queue {target_queue_id} is not available")
1111 if auto_play is None:
1112 auto_play = source_queue.state == PlaybackState.PLAYING
1113
1114 target_player = self.mass.players.get_player(target_queue_id)
1115 if target_player is None:
1116 raise PlayerUnavailableError(f"Player {target_queue_id} is not available")
1117 if target_player.state.active_group or target_player.state.synced_to:
1118 # edge case: the user wants to move playback from the group as a whole, to a single
1119 # player in the group or it is grouped and the command targeted at the single player.
1120 # We need to dissolve the group first.
1121 group_id = target_player.state.active_group or target_player.state.synced_to
1122 assert group_id is not None # checked in if condition above
1123 await self.mass.players.cmd_ungroup(group_id)
1124 await asyncio.sleep(3)
1125
1126 source_items = self._queue_items[source_queue_id]
1127 target_queue.repeat_mode = source_queue.repeat_mode
1128 target_queue.shuffle_enabled = source_queue.shuffle_enabled
1129 target_queue.dont_stop_the_music_enabled = source_queue.dont_stop_the_music_enabled
1130 target_queue.radio_source = source_queue.radio_source
1131 target_queue.enqueued_media_items = source_queue.enqueued_media_items
1132 target_queue.resume_pos = int(source_queue.elapsed_time)
1133 target_queue.current_index = source_queue.current_index
1134 if source_queue.current_item:
1135 target_queue.current_item = source_queue.current_item
1136 target_queue.current_item.queue_id = target_queue_id
1137 self.clear(source_queue_id)
1138
1139 await self.load(target_queue_id, source_items, keep_remaining=False, keep_played=False)
1140 for item in source_items:
1141 item.queue_id = target_queue_id
1142 self.update_items(target_queue_id, source_items)
1143 if auto_play:
1144 await self.resume(target_queue_id)
1145
1146 # Interaction with player
1147
1148 async def on_player_register(self, player: Player) -> None:
1149 """Register PlayerQueue for given player/queue id."""
1150 queue_id = player.player_id
1151 queue: PlayerQueue | None = None
1152 queue_items: list[QueueItem] = []
1153 # try to restore previous state
1154 if prev_state := await self.mass.cache.get(
1155 key=queue_id,
1156 provider=self.domain,
1157 category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
1158 ):
1159 try:
1160 queue = PlayerQueue.from_dict(prev_state)
1161 prev_items = await self.mass.cache.get(
1162 key=queue_id,
1163 provider=self.domain,
1164 category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
1165 default=[],
1166 )
1167 queue_items = []
1168 for idx, item_data in enumerate(prev_items):
1169 qi = QueueItem.from_cache(item_data)
1170 if not qi.media_item:
1171 # Skip items with missing media_item - this can happen if
1172 # MA was killed during shutdown while cache was being written
1173 self.logger.debug(
1174 "Skipping queue item %s (index %d) restored from cache "
1175 "without media_item",
1176 qi.name,
1177 idx,
1178 )
1179 continue
1180 queue_items.append(qi)
1181 if queue.enqueued_media_items:
1182 # we need to restore the MediaItem objects for the enqueued media items
1183 # Items from cache may be dicts that need deserialization
1184 restored_enqueued_items: list[MediaItemType] = []
1185 cached_items: list[dict[str, Any] | MediaItemType] = cast(
1186 "list[dict[str, Any] | MediaItemType]", queue.enqueued_media_items
1187 )
1188 for item in cached_items:
1189 if isinstance(item, dict):
1190 restored_item = media_from_dict(item)
1191 restored_enqueued_items.append(cast("MediaItemType", restored_item))
1192 else:
1193 restored_enqueued_items.append(item)
1194 queue.enqueued_media_items = restored_enqueued_items
1195 except Exception as err:
1196 self.logger.warning(
1197 "Failed to restore the queue(items) for %s - %s",
1198 player.state.name,
1199 str(err),
1200 )
1201 # Reset to clean state on failure
1202 queue = None
1203 queue_items = []
1204 if queue is None:
1205 queue = PlayerQueue(
1206 queue_id=queue_id,
1207 active=False,
1208 display_name=player.state.name,
1209 available=player.state.available,
1210 dont_stop_the_music_enabled=False,
1211 items=0,
1212 )
1213
1214 self._queues[queue_id] = queue
1215 self._queue_items[queue_id] = queue_items
1216 # always call update to calculate state etc
1217 self.on_player_update(player, {})
1218 self.mass.signal_event(EventType.QUEUE_ADDED, object_id=queue_id, data=queue)
1219
1220 def on_player_update(
1221 self,
1222 player: Player,
1223 changed_values: dict[str, tuple[Any, Any]],
1224 ) -> None:
1225 """
1226 Call when a PlayerQueue needs to be updated (e.g. when player updates).
1227
1228 NOTE: This is called every second if the player is playing.
1229 """
1230 queue_id = player.player_id
1231 if (queue := self._queues.get(queue_id)) is None:
1232 # race condition
1233 return
1234 if player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
1235 # do nothing while the announcement is in progress
1236 return
1237 # determine if this queue is currently active for this player
1238 queue.active = player.state.active_source in (queue.queue_id, None)
1239 if not queue.active and queue_id not in self._prev_states:
1240 queue.state = PlaybackState.IDLE
1241 # return early if the queue is not active and we have no previous state
1242 return
1243 if queue.queue_id in self._transitioning_players:
1244 # we're currently transitioning to a new track,
1245 # ignore updates from the player during this time
1246 return
1247
1248 # queue is active and preflight checks passed, update the queue details
1249 self._update_queue_from_player(player)
1250
1251 def on_player_remove(self, player_id: str, permanent: bool) -> None:
1252 """Call when a player is removed from the registry."""
1253 if permanent:
1254 # if the player is permanently removed, we also remove the cached queue data
1255 self.mass.create_task(
1256 self.mass.cache.delete(
1257 key=player_id,
1258 provider=self.domain,
1259 category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
1260 )
1261 )
1262 self.mass.create_task(
1263 self.mass.cache.delete(
1264 key=player_id,
1265 provider=self.domain,
1266 category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
1267 )
1268 )
1269 self._queues.pop(player_id, None)
1270 self._queue_items.pop(player_id, None)
1271
1272 async def load_next_queue_item(
1273 self,
1274 queue_id: str,
1275 current_item_id: str,
1276 ) -> QueueItem:
1277 """
1278 Call when a player wants the next queue item to play.
1279
1280 Raises QueueEmpty if there are no more tracks left.
1281 """
1282 queue = self.get(queue_id)
1283 if not queue:
1284 msg = f"PlayerQueue {queue_id} is not available"
1285 raise PlayerUnavailableError(msg)
1286 cur_index = self.index_by_id(queue_id, current_item_id)
1287 if cur_index is None:
1288 # this is just a guard for bad data
1289 raise QueueEmpty("Invalid item id for queue given.")
1290 next_item: QueueItem | None = None
1291 idx = 0
1292 while True:
1293 next_index = self._get_next_index(queue_id, cur_index + idx)
1294 if next_index is None:
1295 raise QueueEmpty("No more tracks left in the queue.")
1296 queue_item = self.get_item(queue_id, next_index)
1297 if queue_item is None:
1298 raise QueueEmpty("No more tracks left in the queue.")
1299 if idx >= 10:
1300 # we only allow 10 retries to prevent infinite loops
1301 raise QueueEmpty("No more (playable) tracks left in the queue.")
1302 try:
1303 await self._load_item(queue_item, next_index)
1304 # we're all set, this is our next item
1305 next_item = queue_item
1306 break
1307 except (MediaNotFoundError, AudioError):
1308 # No stream details found, skip this QueueItem
1309 self.logger.warning(
1310 "Skipping unplayable item %s (%s)", queue_item.name, queue_item.uri
1311 )
1312 queue_item.available = False
1313 idx += 1
1314 if idx != 0:
1315 # we skipped some items, signal a queue items update
1316 self.update_items(queue_id, self._queue_items[queue_id])
1317 if next_item is None:
1318 raise QueueEmpty("No more (playable) tracks left in the queue.")
1319
1320 return next_item
1321
1322 async def _load_item(
1323 self,
1324 queue_item: QueueItem,
1325 next_index: int | None,
1326 is_start: bool = False,
1327 seek_position: int = 0,
1328 fade_in: bool = False,
1329 ) -> None:
1330 """Try to load the stream details for the given queue item."""
1331 queue_id = queue_item.queue_id
1332 queue = self._queues[queue_id]
1333
1334 # we use a contextvar to bypass the throttler for this asyncio task/context
1335 # this makes sure that playback has priority over other requests that may be
1336 # happening in the background
1337 BYPASS_THROTTLER.set(True)
1338
1339 self.logger.debug(
1340 "(pre)loading (next) item for queue %s...",
1341 queue.display_name,
1342 )
1343
1344 if not queue_item.available:
1345 raise MediaNotFoundError(f"Item {queue_item.uri} is not available")
1346
1347 # work out if we are playing an album and if we should prefer album
1348 # loudness
1349 next_track_from_same_album = (
1350 next_index is not None
1351 and (next_item := self.get_item(queue_id, next_index))
1352 and (
1353 queue_item.media_item
1354 and hasattr(queue_item.media_item, "album")
1355 and queue_item.media_item.album
1356 and next_item.media_item
1357 and hasattr(next_item.media_item, "album")
1358 and next_item.media_item.album
1359 and queue_item.media_item.album.item_id == next_item.media_item.album.item_id
1360 )
1361 )
1362 current_index = self.index_by_id(queue_id, queue_item.queue_item_id)
1363 if current_index is None:
1364 previous_track_from_same_album = False
1365 else:
1366 previous_index = max(current_index - 1, 0)
1367 previous_track_from_same_album = (
1368 previous_index > 0
1369 and (previous_item := self.get_item(queue_id, previous_index)) is not None
1370 and previous_item.media_item is not None
1371 and hasattr(previous_item.media_item, "album")
1372 and previous_item.media_item.album is not None
1373 and queue_item.media_item is not None
1374 and hasattr(queue_item.media_item, "album")
1375 and queue_item.media_item.album is not None
1376 and queue_item.media_item.album.item_id == previous_item.media_item.album.item_id
1377 )
1378 playing_album_tracks = next_track_from_same_album or previous_track_from_same_album
1379 if queue_item.media_item and isinstance(queue_item.media_item, Track):
1380 album = queue_item.media_item.album
1381 # prefer the full library media item so we have all metadata and provider(quality) info
1382 # always request the full library item as there might be other qualities available
1383 if library_item := await self.mass.music.get_library_item_by_prov_id(
1384 queue_item.media_item.media_type,
1385 queue_item.media_item.item_id,
1386 queue_item.media_item.provider,
1387 ):
1388 queue_item.media_item = cast("Track", library_item)
1389 elif not queue_item.media_item.image or queue_item.media_item.provider.startswith(
1390 "ytmusic"
1391 ):
1392 # Youtube Music has poor thumbs by default, so we always fetch the full item
1393 # this also catches the case where they have an unavailable item in a listing
1394 fetched_item = await self.mass.music.get_item_by_uri(queue_item.uri)
1395 queue_item.media_item = cast("Track", fetched_item)
1396
1397 # ensure we got the full (original) album set
1398 if album and (
1399 library_album := await self.mass.music.get_library_item_by_prov_id(
1400 album.media_type,
1401 album.item_id,
1402 album.provider,
1403 )
1404 ):
1405 queue_item.media_item.album = cast("Album", library_album)
1406 elif album:
1407 # Restore original album if we have no better alternative from the library
1408 queue_item.media_item.album = album
1409 # prefer album image over track image
1410 if queue_item.media_item.album and queue_item.media_item.album.image:
1411 org_images: list[MediaItemImage] = queue_item.media_item.metadata.images or []
1412 queue_item.media_item.metadata.images = UniqueList(
1413 [
1414 queue_item.media_item.album.image,
1415 *org_images,
1416 ]
1417 )
1418 # Fetch the streamdetails, which could raise in case of an unplayable item.
1419 # For example, YT Music returns Radio Items that are not playable.
1420 queue_item.streamdetails = await get_stream_details(
1421 mass=self.mass,
1422 queue_item=queue_item,
1423 seek_position=seek_position,
1424 fade_in=fade_in,
1425 prefer_album_loudness=bool(playing_album_tracks),
1426 )
1427
1428 def track_loaded_in_buffer(self, queue_id: str, item_id: str) -> None:
1429 """Call when a player has (started) loading a track in the buffer."""
1430 queue = self.get(queue_id)
1431 if not queue:
1432 msg = f"PlayerQueue {queue_id} is not available"
1433 raise PlayerUnavailableError(msg)
1434 # store the index of the item that is currently (being) loaded in the buffer
1435 # which helps us a bit to determine how far the player has buffered ahead
1436 current_index = self.index_by_id(queue_id, item_id)
1437 queue.index_in_buffer = current_index
1438 self.logger.debug("PlayerQueue %s loaded item %s in buffer", queue.display_name, item_id)
1439 self.signal_update(queue_id)
1440 # preload next streamdetails
1441 self._preload_next_item(queue_id, item_id)
1442 # clean up stale audio buffers for old queue items to prevent memory leaks
1443 if current_index is not None:
1444 self.mass.create_task(
1445 self.mass.streams.cleanup_stale_queue_buffers(queue_id, current_index)
1446 )
1447
1448 # Main queue manipulation methods
1449
1450 async def load(
1451 self,
1452 queue_id: str,
1453 queue_items: list[QueueItem],
1454 insert_at_index: int = 0,
1455 keep_remaining: bool = True,
1456 keep_played: bool = True,
1457 shuffle: bool = False,
1458 ) -> None:
1459 """Load new items at index.
1460
1461 - queue_id: id of the queue to process this request.
1462 - queue_items: a list of QueueItems
1463 - insert_at_index: insert the item(s) at this index
1464 - keep_remaining: keep the remaining items after the insert
1465 - shuffle: (re)shuffle the items after insert index
1466 """
1467 prev_items = self._queue_items[queue_id][:insert_at_index] if keep_played else []
1468 next_items = queue_items
1469
1470 # if keep_remaining, append the old 'next' items
1471 if keep_remaining:
1472 next_items += self._queue_items[queue_id][insert_at_index:]
1473
1474 # we set the original insert order as attribute so we can un-shuffle
1475 for index, item in enumerate(next_items):
1476 item.sort_index += insert_at_index + index
1477 # (re)shuffle the final batch if needed
1478 if shuffle:
1479 next_items = await _smart_shuffle(next_items)
1480 self.update_items(queue_id, prev_items + next_items)
1481
1482 def update_items(self, queue_id: str, queue_items: list[QueueItem]) -> None:
1483 """Update the existing queue items, mostly caused by reordering."""
1484 self._queue_items[queue_id] = queue_items
1485 queue = self._queues[queue_id]
1486 queue.items = len(self._queue_items[queue_id])
1487 # to track if the queue items changed we set a timestamp
1488 # this is a simple way to detect changes in the list of items
1489 # without having to compare the entire list
1490 queue.items_last_updated = time.time()
1491 self.signal_update(queue_id, True)
1492 if (
1493 queue.state == PlaybackState.PLAYING
1494 and queue.index_in_buffer is not None
1495 and queue.index_in_buffer == queue.current_index
1496 ):
1497 # if the queue is playing,
1498 # ensure to (re)queue the next track because it might have changed
1499 # note that we only do this if the player has loaded the current track
1500 # if not, we wait until it has loaded to prevent conflicts
1501 if next_item := self.get_next_item(queue_id, queue.index_in_buffer):
1502 self._enqueue_next_item(queue_id, next_item)
1503
1504 # Helper methods
1505
1506 def get_item(self, queue_id: str, item_id_or_index: int | str | None) -> QueueItem | None:
1507 """Get queue item by index or item_id."""
1508 if item_id_or_index is None:
1509 return None
1510 if (queue_items := self._queue_items.get(queue_id)) is None:
1511 return None
1512 if isinstance(item_id_or_index, int) and len(queue_items) > item_id_or_index:
1513 return queue_items[item_id_or_index]
1514 if isinstance(item_id_or_index, str):
1515 return next((x for x in queue_items if x.queue_item_id == item_id_or_index), None)
1516 return None
1517
1518 def signal_update(self, queue_id: str, items_changed: bool = False) -> None:
1519 """Signal state changed of given queue."""
1520 queue = self._queues[queue_id]
1521 if items_changed:
1522 self.mass.signal_event(EventType.QUEUE_ITEMS_UPDATED, object_id=queue_id, data=queue)
1523 # save items in cache - only cache items with valid media_item
1524 cache_data = [
1525 x.to_cache() for x in self._queue_items[queue_id] if x.media_item is not None
1526 ]
1527 self.mass.create_task(
1528 self.mass.cache.set(
1529 key=queue_id,
1530 data=cache_data,
1531 provider=self.domain,
1532 category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
1533 )
1534 )
1535 # always send the base event
1536 self.mass.signal_event(EventType.QUEUE_UPDATED, object_id=queue_id, data=queue)
1537 # also signal update to the player itself so it can update its current_media
1538 self.mass.players.trigger_player_update(queue_id)
1539 # save state
1540 self.mass.create_task(
1541 self.mass.cache.set(
1542 key=queue_id,
1543 data=queue.to_cache(),
1544 provider=self.domain,
1545 category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
1546 )
1547 )
1548
1549 def index_by_id(self, queue_id: str, queue_item_id: str) -> int | None:
1550 """Get index by queue_item_id."""
1551 queue_items = self._queue_items[queue_id]
1552 for index, item in enumerate(queue_items):
1553 if item.queue_item_id == queue_item_id:
1554 return index
1555 return None
1556
1557 async def player_media_from_queue_item(
1558 self, queue_item: QueueItem, flow_mode: bool
1559 ) -> PlayerMedia:
1560 """Parse PlayerMedia from QueueItem."""
1561 queue = self._queues[queue_item.queue_id]
1562 if flow_mode:
1563 duration = None
1564 elif queue_item.streamdetails:
1565 # prefer netto duration
1566 # when seeking, the player only receives the remaining duration
1567 duration = queue_item.streamdetails.duration or queue_item.duration
1568 if duration and queue_item.streamdetails.seek_position:
1569 duration = duration - queue_item.streamdetails.seek_position
1570 else:
1571 duration = queue_item.duration
1572 if queue.session_id is None:
1573 # handle error or return early
1574 raise InvalidDataError("Queue session_id is None")
1575 media = PlayerMedia(
1576 uri=queue_item.uri,
1577 media_type=MediaType.FLOW_STREAM if flow_mode else queue_item.media_type,
1578 title="Music Assistant" if flow_mode else queue_item.name,
1579 image_url=MASS_LOGO_ONLINE,
1580 duration=duration,
1581 source_id=queue_item.queue_id,
1582 queue_item_id=queue_item.queue_item_id,
1583 custom_data={
1584 "session_id": queue.session_id,
1585 "original_uri": queue_item.uri,
1586 "flow_mode": flow_mode,
1587 },
1588 )
1589 if not flow_mode and queue_item.media_item:
1590 media.title = queue_item.media_item.name
1591 media.artist = getattr(queue_item.media_item, "artist_str", "")
1592 media.album = (
1593 album.name if (album := getattr(queue_item.media_item, "album", None)) else ""
1594 )
1595 if queue_item.image:
1596 # the image format needs to be 500x500 jpeg for maximum compatibility with players
1597 # we prefer the imageproxy on the streamserver here because this request is sent
1598 # to the player itself which may not be able to reach the regular webserver
1599 media.image_url = self.mass.metadata.get_image_url(
1600 queue_item.image, size=500, prefer_stream_server=True
1601 )
1602 return media
1603
1604 async def get_artist_tracks(self, artist: Artist) -> list[Track]:
1605 """Return tracks for given artist, based on user preference."""
1606 artist_items_conf = self.mass.config.get_raw_core_config_value(
1607 self.domain,
1608 CONF_DEFAULT_ENQUEUE_SELECT_ARTIST,
1609 ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE,
1610 )
1611 self.logger.info(
1612 "Fetching tracks to play for artist %s",
1613 artist.name,
1614 )
1615 if artist_items_conf in ("library_tracks", "all_tracks"):
1616 all_items = await self.mass.music.artists.tracks(
1617 artist.item_id,
1618 artist.provider,
1619 in_library_only=artist_items_conf == "library_tracks",
1620 )
1621 random.shuffle(all_items)
1622 return all_items
1623 if artist_items_conf in ("library_album_tracks", "all_album_tracks"):
1624 all_tracks: list[Track] = []
1625 for library_album in await self.mass.music.artists.albums(
1626 artist.item_id,
1627 artist.provider,
1628 in_library_only=artist_items_conf == "library_album_tracks",
1629 ):
1630 for album_track in await self.mass.music.albums.tracks(
1631 library_album.item_id, library_album.provider
1632 ):
1633 if album_track not in all_tracks:
1634 all_tracks.append(album_track)
1635 random.shuffle(all_tracks)
1636 return all_tracks
1637 return []
1638
1639 async def get_album_tracks(self, album: Album, start_item: str | None) -> list[Track]:
1640 """Return tracks for given album, based on user preference."""
1641 album_items_conf = self.mass.config.get_raw_core_config_value(
1642 self.domain,
1643 CONF_DEFAULT_ENQUEUE_SELECT_ALBUM,
1644 ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE,
1645 )
1646 result: list[Track] = []
1647 start_item_found = False
1648 self.logger.info(
1649 "Fetching tracks to play for album %s",
1650 album.name,
1651 )
1652 for album_track in await self.mass.music.albums.tracks(
1653 item_id=album.item_id,
1654 provider_instance_id_or_domain=album.provider,
1655 in_library_only=album_items_conf == "library_tracks",
1656 ):
1657 if not album_track.available:
1658 continue
1659 if start_item in (album_track.item_id, album_track.uri):
1660 start_item_found = True
1661 if start_item is not None and not start_item_found:
1662 continue
1663 result.append(album_track)
1664 return result
1665
1666 async def get_genre_tracks(self, genre: Genre, start_item: str | None) -> list[Track]:
1667 """Return tracks for given genre, based on alias mappings.
1668
1669 Limits results to avoid loading thousands of tracks for broad genres.
1670 Directly mapped tracks are fetched with random ordering, then supplemented
1671 with tracks from a limited set of mapped albums and artists.
1672 """
1673 result: list[Track] = []
1674 start_item_found = False
1675 self.logger.info(
1676 "Fetching tracks to play for genre %s",
1677 genre.name,
1678 )
1679 tracks, albums, artists = await self.mass.music.genres.mapped_media(
1680 genre,
1681 track_limit=25,
1682 album_limit=5,
1683 artist_limit=5,
1684 order_by="random",
1685 )
1686
1687 for genre_track in tracks:
1688 if not genre_track.available:
1689 continue
1690 if start_item in (genre_track.item_id, genre_track.uri):
1691 start_item_found = True
1692 if start_item is not None and not start_item_found:
1693 continue
1694 result.append(genre_track)
1695
1696 for album in albums:
1697 album_tracks = await self.get_album_tracks(album, None)
1698 result.extend(album_tracks[:5])
1699
1700 for artist in artists:
1701 artist_tracks = await self.get_artist_tracks(artist)
1702 result.extend(artist_tracks[:5])
1703 return result
1704
1705 async def get_playlist_tracks(
1706 self, playlist: Playlist, start_item: str | None
1707 ) -> list[PlaylistPlayableItem]:
1708 """Return tracks for given playlist, based on user preference."""
1709 result: list[PlaylistPlayableItem] = []
1710 start_item_found = False
1711 self.logger.info(
1712 "Fetching tracks to play for playlist %s",
1713 playlist.name,
1714 )
1715 # TODO: Handle other sort options etc.
1716 async for playlist_track in self.mass.music.playlists.tracks(
1717 playlist.item_id, playlist.provider
1718 ):
1719 if not playlist_track.available:
1720 continue
1721 if start_item in (playlist_track.item_id, playlist_track.uri):
1722 start_item_found = True
1723 if start_item is not None and not start_item_found:
1724 continue
1725 result.append(playlist_track)
1726 return result
1727
1728 async def get_audiobook_resume_point(
1729 self, audio_book: Audiobook, chapter: str | int | None = None, userid: str | None = None
1730 ) -> int:
1731 """Return resume point (in milliseconds) for given audio book."""
1732 self.logger.debug(
1733 "Fetching resume point to play for audio book %s",
1734 audio_book.name,
1735 )
1736 if chapter is not None:
1737 # user explicitly selected a chapter to play
1738 start_chapter = int(chapter) if isinstance(chapter, str) else chapter
1739 if chapters := audio_book.metadata.chapters:
1740 if _chapter := next((x for x in chapters if x.position == start_chapter), None):
1741 return int(_chapter.start * 1000)
1742 raise InvalidDataError(
1743 f"Unable to resolve chapter to play for Audiobook {audio_book.name}"
1744 )
1745 full_played, resume_position_ms = await self.mass.music.get_resume_position(
1746 audio_book, userid=userid
1747 )
1748 return 0 if full_played else resume_position_ms
1749
1750 async def get_next_podcast_episodes(
1751 self,
1752 podcast: Podcast | None,
1753 episode: PodcastEpisode | str | None,
1754 userid: str | None = None,
1755 ) -> UniqueList[PodcastEpisode]:
1756 """Return (next) episode(s) and resume point for given podcast."""
1757 if podcast is None and isinstance(episode, str | NoneType):
1758 raise InvalidDataError("Either podcast or episode must be provided")
1759 if podcast is None:
1760 # single podcast episode requested
1761 assert isinstance(episode, PodcastEpisode) # checked above
1762 self.logger.debug(
1763 "Fetching resume point to play for Podcast episode %s",
1764 episode.name,
1765 )
1766 (
1767 fully_played,
1768 resume_position_ms,
1769 ) = await self.mass.music.get_resume_position(episode, userid=userid)
1770 episode.fully_played = fully_played
1771 episode.resume_position_ms = 0 if fully_played else resume_position_ms
1772 return UniqueList([episode])
1773 # podcast with optional start episode requested
1774 self.logger.debug(
1775 "Fetching episode(s) and resume point to play for Podcast %s",
1776 podcast.name,
1777 )
1778 all_episodes = [
1779 x async for x in self.mass.music.podcasts.episodes(podcast.item_id, podcast.provider)
1780 ]
1781 all_episodes.sort(key=lambda x: x.position)
1782 # if a episode was provided, a user explicitly selected a episode to play
1783 # so we need to find the index of the episode in the list
1784 resolved_episode: PodcastEpisode | None = None
1785 if isinstance(episode, PodcastEpisode):
1786 resolved_episode = next((x for x in all_episodes if x.uri == episode.uri), None)
1787 if resolved_episode:
1788 # ensure we have accurate resume info
1789 (
1790 fully_played,
1791 resume_position_ms,
1792 ) = await self.mass.music.get_resume_position(resolved_episode, userid=userid)
1793 resolved_episode.resume_position_ms = 0 if fully_played else resume_position_ms
1794 elif isinstance(episode, str):
1795 resolved_episode = next(
1796 (x for x in all_episodes if episode in (x.uri, x.item_id)), None
1797 )
1798 if resolved_episode:
1799 # ensure we have accurate resume info
1800 (
1801 fully_played,
1802 resume_position_ms,
1803 ) = await self.mass.music.get_resume_position(resolved_episode, userid=userid)
1804 resolved_episode.resume_position_ms = 0 if fully_played else resume_position_ms
1805 else:
1806 # get first episode that is not fully played
1807 for ep in all_episodes:
1808 if ep.fully_played:
1809 continue
1810 # ensure we have accurate resume info
1811 (
1812 fully_played,
1813 resume_position_ms,
1814 ) = await self.mass.music.get_resume_position(ep, userid=userid)
1815 if fully_played:
1816 continue
1817 ep.resume_position_ms = resume_position_ms
1818 resolved_episode = ep
1819 break
1820 else:
1821 # no episodes found that are not fully played, so we start at the beginning
1822 resolved_episode = next((x for x in all_episodes), None)
1823 if resolved_episode is None:
1824 raise InvalidDataError(f"Unable to resolve episode to play for Podcast {podcast.name}")
1825 # get the index of the episode
1826 episode_index = all_episodes.index(resolved_episode)
1827 # return the (remaining) episode(s) to play
1828 return UniqueList(all_episodes[episode_index:])
1829
1830 def _get_next_index(
1831 self,
1832 queue_id: str,
1833 cur_index: int | None,
1834 is_skip: bool = False,
1835 allow_repeat: bool = True,
1836 ) -> int | None:
1837 """
1838 Return the next index for the queue, accounting for repeat settings.
1839
1840 Will return None if there are no (more) items in the queue.
1841 """
1842 queue = self._queues[queue_id]
1843 queue_items = self._queue_items[queue_id]
1844 if not queue_items or cur_index is None:
1845 # queue is empty
1846 return None
1847 # handle repeat single track
1848 if queue.repeat_mode == RepeatMode.ONE and not is_skip:
1849 return cur_index if allow_repeat else None
1850 # handle cur_index is last index of the queue
1851 if cur_index >= (len(queue_items) - 1):
1852 if allow_repeat and queue.repeat_mode == RepeatMode.ALL:
1853 # if repeat all is enabled, we simply start again from the beginning
1854 return 0
1855 return None
1856 # all other: just the next index
1857 return cur_index + 1
1858
1859 def get_next_item(self, queue_id: str, cur_index: int | str) -> QueueItem | None:
1860 """Return next QueueItem for given queue."""
1861 index: int
1862 if isinstance(cur_index, str):
1863 resolved_index = self.index_by_id(queue_id, cur_index)
1864 if resolved_index is None:
1865 return None # guard
1866 index = resolved_index
1867 else:
1868 index = cur_index
1869 # At this point index is guaranteed to be int
1870 for skip in range(5):
1871 if (next_index := self._get_next_index(queue_id, index + skip)) is None:
1872 break
1873 next_item = self.get_item(queue_id, next_index)
1874 if next_item is None:
1875 continue
1876 if not next_item.available:
1877 # ensure that we skip unavailable items (set by load_next track logic)
1878 continue
1879 return next_item
1880 return None
1881
1882 async def _fill_radio_tracks(self, queue_id: str) -> None:
1883 """Fill a Queue with (additional) Radio tracks."""
1884 self.logger.debug(
1885 "Filling radio tracks for queue %s",
1886 queue_id,
1887 )
1888 tracks = await self._get_radio_tracks(queue_id=queue_id, is_initial_radio_mode=False)
1889 # fill queue - filter out unavailable items
1890 queue_items = [QueueItem.from_media_item(queue_id, x) for x in tracks if x.available]
1891 await self.load(
1892 queue_id,
1893 queue_items,
1894 insert_at_index=len(self._queue_items[queue_id]) + 1,
1895 )
1896
1897 def _enqueue_next_item(self, queue_id: str, next_item: QueueItem | None) -> None:
1898 """Enqueue the next item on the player."""
1899 if not next_item:
1900 # no next item, nothing to do...
1901 return
1902
1903 queue = self._queues[queue_id]
1904 if queue.flow_mode:
1905 # ignore this for flow mode
1906 return
1907
1908 async def _enqueue_next_item_on_player(next_item: QueueItem) -> None:
1909 await self.mass.players.enqueue_next_media(
1910 player_id=queue_id,
1911 media=await self.player_media_from_queue_item(next_item, False),
1912 )
1913 if queue.next_item_id_enqueued != next_item.queue_item_id:
1914 queue.next_item_id_enqueued = next_item.queue_item_id
1915 self.logger.debug(
1916 "Enqueued next track %s on queue %s",
1917 next_item.name,
1918 self._queues[queue_id].display_name,
1919 )
1920
1921 task_id = f"enqueue_next_item_{queue_id}"
1922 self.mass.call_later(0.5, _enqueue_next_item_on_player, next_item, task_id=task_id)
1923
1924 def _preload_next_item(self, queue_id: str, item_id_in_buffer: str) -> None:
1925 """
1926 Preload the streamdetails for the next item in the queue/buffer.
1927
1928 This basically ensures the item is playable and fetches the stream details.
1929 If an error occurs, the item will be skipped and the next item will be loaded.
1930 """
1931 queue = self._queues[queue_id]
1932
1933 async def _preload_streamdetails(item_id_in_buffer: str) -> None:
1934 try:
1935 # wait for the item that was loaded in the buffer is the actually playing item
1936 # this prevents a race condition when we preload the next item too soon
1937 # while the player is actually preloading the previously enqueued item.
1938 retries = 120
1939 while retries > 0:
1940 if not queue.current_item:
1941 return # guard
1942 if queue.current_item.queue_item_id == item_id_in_buffer:
1943 break
1944 retries -= 1
1945 await asyncio.sleep(1)
1946 if next_item := await self.load_next_queue_item(queue_id, item_id_in_buffer):
1947 self.logger.debug(
1948 "Preloaded next item %s for queue %s",
1949 next_item.name,
1950 queue.display_name,
1951 )
1952 # enqueue the next item on the player
1953 self._enqueue_next_item(queue_id, next_item)
1954
1955 except QueueEmpty:
1956 return
1957
1958 if not (current_item := self.get_item(queue_id, item_id_in_buffer)):
1959 # this should not happen, but guard anyways
1960 return
1961 if current_item.media_type == MediaType.RADIO or not current_item.duration:
1962 # radio items or no duration, nothing to do
1963 return
1964
1965 task_id = f"preload_next_item_{queue_id}"
1966 self.mass.create_task(
1967 _preload_streamdetails,
1968 item_id_in_buffer,
1969 task_id=task_id,
1970 abort_existing=True,
1971 )
1972
1973 async def _resolve_media_items(
1974 self,
1975 media_item: MediaItemType | ItemMapping | BrowseFolder,
1976 start_item: str | None = None,
1977 userid: str | None = None,
1978 queue_id: str | None = None,
1979 ) -> list[MediaItemType]:
1980 """Resolve/unwrap media items to enqueue."""
1981 # resolve Itemmapping to full media item
1982 if isinstance(media_item, ItemMapping):
1983 if media_item.uri is None:
1984 raise InvalidDataError("ItemMapping has no URI")
1985 media_item = await self.mass.music.get_item_by_uri(media_item.uri)
1986 if media_item.media_type == MediaType.PLAYLIST:
1987 media_item = cast("Playlist", media_item)
1988 self.mass.create_task(
1989 self.mass.music.mark_item_played(
1990 media_item, userid=userid, queue_id=queue_id, user_initiated=True
1991 )
1992 )
1993 return list(await self.get_playlist_tracks(media_item, start_item))
1994 if media_item.media_type == MediaType.ARTIST:
1995 media_item = cast("Artist", media_item)
1996 self.mass.create_task(
1997 self.mass.music.mark_item_played(media_item, queue_id=queue_id, user_initiated=True)
1998 )
1999 return list(await self.get_artist_tracks(media_item))
2000 if media_item.media_type == MediaType.ALBUM:
2001 media_item = cast("Album", media_item)
2002 self.mass.create_task(
2003 self.mass.music.mark_item_played(
2004 media_item, userid=userid, queue_id=queue_id, user_initiated=True
2005 )
2006 )
2007 return list(await self.get_album_tracks(media_item, start_item))
2008 if media_item.media_type == MediaType.GENRE:
2009 media_item = cast("Genre", media_item)
2010 self.mass.create_task(
2011 self.mass.music.mark_item_played(
2012 media_item, userid=userid, queue_id=queue_id, user_initiated=True
2013 )
2014 )
2015 return list(await self.get_genre_tracks(media_item, start_item))
2016 if media_item.media_type == MediaType.AUDIOBOOK:
2017 media_item = cast("Audiobook", media_item)
2018 # ensure we grab the correct/latest resume point info
2019 media_item.resume_position_ms = await self.get_audiobook_resume_point(
2020 media_item, start_item, userid=userid
2021 )
2022 return [media_item]
2023 if media_item.media_type == MediaType.PODCAST:
2024 media_item = cast("Podcast", media_item)
2025 self.mass.create_task(
2026 self.mass.music.mark_item_played(
2027 media_item, userid=userid, queue_id=queue_id, user_initiated=True
2028 )
2029 )
2030 return list(await self.get_next_podcast_episodes(media_item, start_item, userid=userid))
2031 if media_item.media_type == MediaType.PODCAST_EPISODE:
2032 media_item = cast("PodcastEpisode", media_item)
2033 return list(await self.get_next_podcast_episodes(None, media_item, userid=userid))
2034 if media_item.media_type == MediaType.FOLDER:
2035 media_item = cast("BrowseFolder", media_item)
2036 return list(await self._get_folder_tracks(media_item))
2037 # all other: single track or radio item
2038 return [cast("MediaItemType", media_item)]
2039
2040 async def _try_resume_from_playlog(self, queue: PlayerQueue) -> bool:
2041 """Try to resume playback from playlog when queue is empty.
2042
2043 Attempts to find user-initiated recently played items in the following order:
2044 1. By userid AND queue_id
2045 2. By queue_id only
2046 3. By userid only (if available)
2047 4. Any recently played item
2048
2049 :param queue: The queue to resume playback on.
2050 :return: True if playback was started, False otherwise.
2051 """
2052 # Try different filter combinations in order of specificity
2053 filter_attempts: list[tuple[str | None, str | None, str]] = []
2054 if queue.userid:
2055 filter_attempts.append((queue.userid, queue.queue_id, "userid + queue_id match"))
2056 filter_attempts.append((None, queue.queue_id, "queue_id match"))
2057 if queue.userid:
2058 filter_attempts.append((queue.userid, None, "userid match"))
2059 filter_attempts.append((None, None, "any recent item"))
2060
2061 for userid, queue_id, match_type in filter_attempts:
2062 items = await self.mass.music.recently_played(
2063 limit=5,
2064 fully_played_only=False,
2065 user_initiated_only=True,
2066 userid=userid,
2067 queue_id=queue_id,
2068 )
2069 for item in items:
2070 if not item.uri:
2071 continue
2072 try:
2073 await self.play_media(queue.queue_id, item)
2074 self.logger.info(
2075 "Resumed queue %s from playlog (%s)", queue.display_name, match_type
2076 )
2077 return True
2078 except MusicAssistantError as err:
2079 self.logger.debug("Failed to resume with item %s: %s", item.name, err)
2080 continue
2081
2082 return False
2083
2084 async def _get_radio_tracks(
2085 self, queue_id: str, is_initial_radio_mode: bool = False
2086 ) -> list[Track]:
2087 """Call the registered music providers for dynamic tracks."""
2088 queue = self._queues[queue_id]
2089 queue_track_items: list[Track] = [
2090 q.media_item
2091 for q in self._queue_items[queue_id]
2092 if q.media_item and isinstance(q.media_item, Track)
2093 ]
2094 if not queue.radio_source:
2095 # this may happen during race conditions as this method is called delayed
2096 return []
2097 self.logger.info(
2098 "Fetching radio tracks for queue %s based on: %s",
2099 queue.display_name,
2100 ", ".join([x.name for x in queue.radio_source]),
2101 )
2102
2103 # Get user's preferred provider instances for steering provider selection
2104 preferred_provider_instances: list[str] | None = None
2105 if (
2106 queue.userid
2107 and (playback_user := await self.mass.webserver.auth.get_user(queue.userid))
2108 and playback_user.provider_filter
2109 ):
2110 preferred_provider_instances = playback_user.provider_filter
2111
2112 available_base_tracks: list[Track] = []
2113 base_track_sample_size = 5
2114 # Some providers have very deterministic similar track algorithms when providing
2115 # a single track item. When we have a radio mode based on 1 track and we have to
2116 # refill the queue (ie not initial radio mode), we use the play history as base tracks
2117 if (
2118 len(queue.radio_source) == 1
2119 and queue.radio_source[0].media_type == MediaType.TRACK
2120 and not is_initial_radio_mode
2121 ):
2122 available_base_tracks = queue_track_items
2123 else:
2124 # Grab all the available base tracks based on the selected source items.
2125 # shuffle the source items, just in case
2126 for radio_item in random.sample(queue.radio_source, len(queue.radio_source)):
2127 ctrl = self.mass.music.get_controller(radio_item.media_type)
2128 try:
2129 available_base_tracks += [
2130 track
2131 for track in await ctrl.radio_mode_base_tracks(
2132 radio_item, # type: ignore[arg-type]
2133 preferred_provider_instances,
2134 )
2135 # Avoid duplicate base tracks
2136 if track not in available_base_tracks
2137 ]
2138 except UnsupportedFeaturedException as err:
2139 self.logger.debug(
2140 "Skip loading radio items for %s: %s ",
2141 radio_item.uri,
2142 str(err),
2143 )
2144 if not available_base_tracks:
2145 raise UnsupportedFeaturedException("Radio mode not available for source items")
2146
2147 # Sample tracks from the base tracks, which will be used to calculate the dynamic ones
2148 base_tracks = random.sample(
2149 available_base_tracks,
2150 min(base_track_sample_size, len(available_base_tracks)),
2151 )
2152 # Use a set to avoid duplicate dynamic tracks
2153 dynamic_tracks: set[Track] = set()
2154 # Use base tracks + Trackcontroller to obtain similar tracks for every base Track
2155 for allow_lookup in (False, True):
2156 if dynamic_tracks:
2157 break
2158 for base_track in base_tracks:
2159 try:
2160 _similar_tracks = await self.mass.music.tracks.similar_tracks(
2161 base_track.item_id,
2162 base_track.provider,
2163 allow_lookup=allow_lookup,
2164 preferred_provider_instances=preferred_provider_instances,
2165 )
2166 except MediaNotFoundError:
2167 # Some providers don't have similar tracks for all items. For example,
2168 # Tidal can sometimes return a 404 when the 'similar_tracks' endpoint is called.
2169 # in that case, just skip the track.
2170 self.logger.debug("Similar tracks not found for track %s", base_track.name)
2171 continue
2172 for track in _similar_tracks:
2173 if (
2174 track not in base_tracks
2175 # Exclude tracks we have already played / queued
2176 and track not in queue_track_items
2177 # Ignore tracks that are too long for radio mode, e.g. mixes
2178 and track.duration <= RADIO_TRACK_MAX_DURATION_SECS
2179 ):
2180 dynamic_tracks.add(track)
2181 if len(dynamic_tracks) >= 50:
2182 break
2183 queue_tracks: list[Track] = []
2184 dynamic_tracks_list = list(dynamic_tracks)
2185 # Only include the sampled base tracks when the radio mode is first initialized
2186 if is_initial_radio_mode:
2187 queue_tracks += [base_tracks[0]]
2188 # Exhaust base tracks with the pattern of BDDBDDBDD (1 base track + 2 dynamic tracks)
2189 if len(base_tracks) > 1:
2190 for base_track in base_tracks[1:]:
2191 queue_tracks += [base_track]
2192 if len(dynamic_tracks_list) > 2:
2193 queue_tracks += random.sample(dynamic_tracks_list, 2)
2194 else:
2195 queue_tracks += dynamic_tracks_list
2196 # Add dynamic tracks to the queue, make sure to exclude already picked tracks
2197 remaining_dynamic_tracks = [t for t in dynamic_tracks_list if t not in queue_tracks]
2198 if remaining_dynamic_tracks:
2199 queue_tracks += random.sample(
2200 remaining_dynamic_tracks, min(len(remaining_dynamic_tracks), 25)
2201 )
2202 return queue_tracks
2203
2204 async def _get_folder_tracks(self, folder: BrowseFolder) -> list[Track]:
2205 """Fetch (playable) tracks for given browse folder."""
2206 self.logger.info(
2207 "Fetching tracks to play for folder %s",
2208 folder.name,
2209 )
2210 tracks: list[Track] = []
2211 for item in await self.mass.music.browse(folder.path):
2212 if not item.is_playable:
2213 continue
2214 # recursively fetch tracks from all media types
2215 resolved = await self._resolve_media_items(item)
2216 tracks += [x for x in resolved if isinstance(x, Track)]
2217
2218 return tracks
2219
2220 def _update_queue_from_player(
2221 self,
2222 player: Player,
2223 ) -> None:
2224 """Update the Queue when the player state changed."""
2225 queue_id = player.player_id
2226 queue = self._queues[queue_id]
2227
2228 # basic properties
2229 queue.display_name = player.state.name
2230 queue.available = player.state.available
2231 queue.items = len(self._queue_items[queue_id])
2232
2233 queue.state = (
2234 player.state.playback_state or PlaybackState.IDLE
2235 if queue.active
2236 else PlaybackState.IDLE
2237 )
2238 # update current item/index from player report
2239 if queue.active and queue.state in (
2240 PlaybackState.PLAYING,
2241 PlaybackState.PAUSED,
2242 ):
2243 # NOTE: If the queue is not playing (yet) we will not update the current index
2244 # to ensure we keep the previously known current index
2245 if queue.flow_mode:
2246 # flow mode active, the player is playing one long stream
2247 # so we need to calculate the current index and elapsed time
2248 current_index, elapsed_time = self._get_flow_queue_stream_index(queue, player)
2249 elif item_id := self._parse_player_current_item_id(queue_id, player):
2250 # normal mode, the player itself will report the current item
2251 elapsed_time = int(player.state.corrected_elapsed_time or 0)
2252 current_index = self.index_by_id(queue_id, item_id)
2253 else:
2254 # this may happen if the player is still transitioning between tracks
2255 # we ignore this for now and keep the current index as is
2256 return
2257
2258 # get current/next item based on current index
2259 queue.current_index = current_index
2260 queue.current_item = current_item = self.get_item(queue_id, current_index)
2261 queue.next_item = (
2262 self.get_next_item(queue_id, current_index)
2263 if current_item and current_index is not None
2264 else None
2265 )
2266
2267 # correct elapsed time when seeking
2268 if (
2269 not queue.flow_mode
2270 and current_item
2271 and current_item.streamdetails
2272 and current_item.streamdetails.seek_position
2273 ):
2274 elapsed_time += current_item.streamdetails.seek_position
2275 queue.elapsed_time = elapsed_time
2276 queue.elapsed_time_last_updated = time.time()
2277
2278 elif not queue.current_item and queue.current_index is not None:
2279 current_index = queue.current_index
2280 queue.current_item = current_item = self.get_item(queue_id, current_index)
2281 queue.next_item = (
2282 self.get_next_item(queue_id, current_index)
2283 if current_item and current_index is not None
2284 else None
2285 )
2286
2287 # This is enough to detect any changes in the DSPDetails
2288 # (so child count changed, or any output format changed)
2289 output_formats = []
2290 if output_format := player.extra_data.get("output_format"):
2291 output_formats.append(str(output_format))
2292 for child_id in player.state.group_members:
2293 if (child := self.mass.players.get_player(child_id)) and (
2294 output_format := child.extra_data.get("output_format")
2295 ):
2296 output_formats.append(str(output_format))
2297 else:
2298 output_formats.append("unknown")
2299
2300 # basic throttle: do not send state changed events if queue did not actually change
2301 prev_state: CompareState = self._prev_states.get(
2302 queue_id,
2303 CompareState(
2304 queue_id=queue_id,
2305 state=PlaybackState.IDLE,
2306 current_item_id=None,
2307 next_item_id=None,
2308 current_item=None,
2309 elapsed_time=0,
2310 last_playing_elapsed_time=0,
2311 stream_title=None,
2312 codec_type=None,
2313 output_formats=None,
2314 ),
2315 )
2316 # update last_playing_elapsed_time only when the player is actively playing
2317 # use corrected_elapsed_time which accounts for time since last update
2318 # this preserves the last known elapsed time when transitioning to idle/paused
2319 prev_playing_elapsed = prev_state["last_playing_elapsed_time"]
2320 prev_item_id = prev_state["current_item_id"]
2321 current_item_id = queue.current_item.queue_item_id if queue.current_item else None
2322 if queue.state == PlaybackState.PLAYING:
2323 current_elapsed = int(queue.corrected_elapsed_time)
2324 if current_item_id != prev_item_id:
2325 # new track started, reset the elapsed time tracker
2326 last_playing_elapsed_time = current_elapsed
2327 else:
2328 # same track, use the max of current and previous to handle timing issues
2329 last_playing_elapsed_time = max(current_elapsed, prev_playing_elapsed)
2330 else:
2331 last_playing_elapsed_time = prev_playing_elapsed
2332 new_state = CompareState(
2333 queue_id=queue_id,
2334 state=queue.state,
2335 current_item_id=queue.current_item.queue_item_id if queue.current_item else None,
2336 next_item_id=queue.next_item.queue_item_id if queue.next_item else None,
2337 current_item=queue.current_item,
2338 elapsed_time=int(queue.elapsed_time),
2339 last_playing_elapsed_time=last_playing_elapsed_time,
2340 stream_title=(
2341 queue.current_item.streamdetails.stream_title
2342 if queue.current_item and queue.current_item.streamdetails
2343 else None
2344 ),
2345 codec_type=(
2346 queue.current_item.streamdetails.audio_format.codec_type
2347 if queue.current_item and queue.current_item.streamdetails
2348 else None
2349 ),
2350 output_formats=output_formats,
2351 )
2352 changed_keys = get_changed_keys(dict(prev_state), dict(new_state))
2353 with suppress(KeyError):
2354 changed_keys.remove("next_item_id")
2355 with suppress(KeyError):
2356 changed_keys.remove("last_playing_elapsed_time")
2357
2358 # store the new state
2359 if queue.active:
2360 self._prev_states[queue_id] = new_state
2361 else:
2362 self._prev_states.pop(queue_id, None)
2363
2364 # return early if nothing changed
2365 if len(changed_keys) == 0:
2366 return
2367
2368 # signal update and store state
2369 send_update = True
2370 if changed_keys == {"elapsed_time"}:
2371 # only elapsed time changed, do not send full queue update
2372 send_update = False
2373 prev_time = prev_state.get("elapsed_time") or 0
2374 cur_time = new_state.get("elapsed_time") or 0
2375 if abs(cur_time - prev_time) > 2:
2376 # send dedicated event for time updates when seeking
2377 self.mass.signal_event(
2378 EventType.QUEUE_TIME_UPDATED,
2379 object_id=queue_id,
2380 data=queue.elapsed_time,
2381 )
2382 # also signal update to the player itself so it can update its current_media
2383 self.mass.players.trigger_player_update(queue_id)
2384
2385 if send_update:
2386 self.signal_update(queue_id)
2387
2388 if "output_formats" in changed_keys:
2389 # refresh DSP details since they may have changed
2390 dsp = get_stream_dsp_details(self.mass, queue_id)
2391 if queue.current_item and queue.current_item.streamdetails:
2392 queue.current_item.streamdetails.dsp = dsp
2393 if queue.next_item and queue.next_item.streamdetails:
2394 queue.next_item.streamdetails.dsp = dsp
2395
2396 # handle updating stream_metadata if needed
2397 if (
2398 queue.current_item
2399 and (streamdetails := queue.current_item.streamdetails)
2400 and streamdetails.stream_metadata_update_callback
2401 and (
2402 streamdetails.stream_metadata_last_updated is None
2403 or (
2404 time.time() - streamdetails.stream_metadata_last_updated
2405 >= streamdetails.stream_metadata_update_interval
2406 )
2407 )
2408 ):
2409 streamdetails.stream_metadata_last_updated = time.time()
2410 self.mass.create_task(
2411 streamdetails.stream_metadata_update_callback(
2412 streamdetails, int(queue.corrected_elapsed_time)
2413 )
2414 )
2415
2416 # handle sending a playback progress report
2417 # we do this every 30 seconds or when the state changes
2418 if (
2419 changed_keys.intersection({"state", "current_item_id"})
2420 or int(queue.elapsed_time) % PLAYBACK_REPORT_INTERVAL_SECONDS == 0
2421 ):
2422 self._handle_playback_progress_report(queue, prev_state, new_state)
2423
2424 # check if we need to clear the queue if we reached the end
2425 if "state" in changed_keys and queue.state == PlaybackState.IDLE:
2426 self._handle_end_of_queue(queue, prev_state, new_state)
2427
2428 # watch dynamic radio items refill if needed
2429 if "current_item_id" in changed_keys:
2430 # auto enable radio mode if dont stop the music is enabled
2431 if (
2432 queue.dont_stop_the_music_enabled
2433 and queue.enqueued_media_items
2434 and queue.current_index is not None
2435 and (queue.items - queue.current_index) <= 1
2436 ):
2437 # We have received the last item in the queue and Don't stop the music is enabled
2438 # set the played media item(s) as radio items (which will refill the queue)
2439 # note that this will fail if there are no media items for which we have
2440 # a dynamic radio source.
2441 self.logger.debug(
2442 "End of queue detected and Don't stop the music is enabled for %s"
2443 " - setting enqueued media items as radio source: %s",
2444 queue.display_name,
2445 ", ".join([x.uri for x in queue.enqueued_media_items]), # type: ignore[misc] # uri set in __post_init__
2446 )
2447 queue.radio_source = queue.enqueued_media_items
2448 # auto fill radio tracks if less than 5 tracks left in the queue
2449 if (
2450 queue.radio_source
2451 and queue.current_index is not None
2452 and (queue.items - queue.current_index) < 5
2453 ):
2454 task_id = f"fill_radio_tracks_{queue_id}"
2455 self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id)
2456
2457 def _get_flow_queue_stream_index(
2458 self, queue: PlayerQueue, player: Player
2459 ) -> tuple[int | None, int]:
2460 """Calculate current queue index and current track elapsed time when flow mode is active."""
2461 elapsed_time_queue_total = player.state.corrected_elapsed_time or 0
2462 if queue.current_index is None and not queue.flow_mode_stream_log:
2463 return queue.current_index, int(queue.elapsed_time)
2464
2465 # For each track that has been streamed/buffered to the player,
2466 # a playlog entry will be created with the queue item id
2467 # and the amount of seconds streamed. We traverse the playlog to figure
2468 # out where we are in the queue, accounting for actual streamed
2469 # seconds (and not duration) and skipped seconds. If a track has been repeated,
2470 # it will simply be in the playlog multiple times.
2471 played_time = 0.0
2472 queue_index: int | None = queue.current_index or 0
2473 track_time = 0.0
2474 for play_log_entry in queue.flow_mode_stream_log:
2475 queue_item_duration = (
2476 # NOTE: 'seconds_streamed' can actually be 0 if there was a stream error!
2477 play_log_entry.seconds_streamed
2478 if play_log_entry.seconds_streamed is not None
2479 else play_log_entry.duration or 3600 * 24 * 7
2480 )
2481 if elapsed_time_queue_total > (queue_item_duration + played_time):
2482 # total elapsed time is more than (streamed) track duration
2483 # this track has been fully played, move on.
2484 played_time += queue_item_duration
2485 else:
2486 # no more seconds left to divide, this is our track
2487 # account for any seeking by adding the skipped/seeked seconds
2488 queue_index = self.index_by_id(queue.queue_id, play_log_entry.queue_item_id)
2489 queue_item = self.get_item(queue.queue_id, queue_index)
2490 if queue_item and queue_item.streamdetails:
2491 track_sec_skipped = queue_item.streamdetails.seek_position
2492 else:
2493 track_sec_skipped = 0
2494 track_time = elapsed_time_queue_total + track_sec_skipped - played_time
2495 break
2496 if player.state.playback_state != PlaybackState.PLAYING:
2497 # if the player is not playing, we can't be sure that the elapsed time is correct
2498 # so we just return the queue index and the elapsed time
2499 return queue.current_index, int(queue.elapsed_time)
2500 return queue_index, int(track_time)
2501
2502 def _parse_player_current_item_id(self, queue_id: str, player: Player) -> str | None:
2503 """Parse QueueItem ID from Player's current url."""
2504 protocol_player = player
2505 if player.active_output_protocol and player.active_output_protocol != "native":
2506 protocol_player = self.mass.players.get_player(player.active_output_protocol) or player
2507 if not protocol_player.current_media:
2508 # YES, we use player.current_media on purpose here because we need the raw metadata
2509 return None
2510 # prefer queue_id and queue_item_id within the current media
2511 if (
2512 protocol_player.current_media.source_id == queue_id
2513 and protocol_player.current_media.queue_item_id
2514 ):
2515 return protocol_player.current_media.queue_item_id
2516 # special case for sonos players
2517 if protocol_player.current_media.uri and protocol_player.current_media.uri.startswith(
2518 f"mass:{queue_id}"
2519 ):
2520 if protocol_player.current_media.queue_item_id:
2521 return protocol_player.current_media.queue_item_id
2522 current_item_id = protocol_player.current_media.uri.split(":")[-1]
2523 if self.get_item(queue_id, current_item_id):
2524 return current_item_id
2525 return None
2526 # try to extract the item id from a mass stream url
2527 if (
2528 protocol_player.current_media.uri
2529 and queue_id in protocol_player.current_media.uri
2530 and self.mass.streams.base_url in protocol_player.current_media.uri
2531 ):
2532 current_item_id = protocol_player.current_media.uri.rsplit("/")[-1].split(".")[0]
2533 if self.get_item(queue_id, current_item_id):
2534 return current_item_id
2535 # try to extract the item id from a queue_id/item_id combi
2536 if (
2537 protocol_player.current_media.uri
2538 and queue_id in protocol_player.current_media.uri
2539 and "/" in protocol_player.current_media.uri
2540 ):
2541 current_item_id = protocol_player.current_media.uri.split("/")[1]
2542 if self.get_item(queue_id, current_item_id):
2543 return current_item_id
2544
2545 return None
2546
2547 def _handle_end_of_queue(
2548 self, queue: PlayerQueue, prev_state: CompareState, new_state: CompareState
2549 ) -> None:
2550 """Check if the queue should be cleared after the current item."""
2551 # check if queue state changed to stopped (from playing/paused to idle)
2552 if not (
2553 prev_state["state"] in (PlaybackState.PLAYING, PlaybackState.PAUSED)
2554 and new_state["state"] == PlaybackState.IDLE
2555 ):
2556 return
2557 # check if no more items in the queue (next_item should be None at end of queue)
2558 if queue.next_item is not None:
2559 return
2560 # check if we had a previous item playing
2561 if prev_state["current_item_id"] is None:
2562 return
2563
2564 async def _clear_queue_delayed() -> None:
2565 for _ in range(5):
2566 await asyncio.sleep(1)
2567 if queue.state != PlaybackState.IDLE:
2568 return
2569 if queue.next_item is not None:
2570 return
2571 self.logger.info("End of queue reached, clearing items")
2572 self.clear(queue.queue_id)
2573
2574 # all checks passed, we stopped playback at the last (or single) track of the queue
2575 # now determine if the item was fully played before clearing
2576
2577 # For flow mode, check if the last track was fully streamed using the stream log
2578 # This is more reliable than elapsed_time which can be reset/incorrect
2579 if queue.flow_mode and queue.flow_mode_stream_log:
2580 last_log_entry = queue.flow_mode_stream_log[-1]
2581 if last_log_entry.seconds_streamed is not None:
2582 # The last track finished streaming, safe to clear queue
2583 self.mass.create_task(_clear_queue_delayed())
2584 return
2585
2586 # For non-flow mode, use prev_state values since queue state may have been updated/reset
2587 prev_item = prev_state["current_item"]
2588 if prev_item and (streamdetails := prev_item.streamdetails):
2589 duration = streamdetails.duration or prev_item.duration or 24 * 3600
2590 elif prev_item:
2591 duration = prev_item.duration or 24 * 3600
2592 else:
2593 # No current item means player has already cleared it, safe to clear queue
2594 self.mass.create_task(_clear_queue_delayed())
2595 return
2596
2597 # use last_playing_elapsed_time which preserves the elapsed time from when the player
2598 # was still playing (before transitioning to idle where elapsed_time may be reset to 0)
2599 seconds_played = int(prev_state["last_playing_elapsed_time"])
2600 # debounce this a bit to make sure we're not clearing the queue by accident
2601 # only clear if the last track was played to near completion (within 5 seconds of end)
2602 if seconds_played >= (duration or 3600) - 5:
2603 self.mass.create_task(_clear_queue_delayed())
2604
2605 def _handle_playback_progress_report(
2606 self, queue: PlayerQueue, prev_state: CompareState, new_state: CompareState
2607 ) -> None:
2608 """Handle playback progress report."""
2609 # detect change in current index to report that a item has been played
2610 prev_item_id = prev_state["current_item_id"]
2611 cur_item_id = new_state["current_item_id"]
2612 if prev_item_id is None and cur_item_id is None:
2613 return
2614
2615 if prev_item_id is not None and prev_item_id != cur_item_id:
2616 # we have a new item, so we need report the previous one
2617 is_current_item = False
2618 item_to_report = prev_state["current_item"]
2619 seconds_played = int(prev_state["elapsed_time"])
2620 else:
2621 # report on current item
2622 is_current_item = True
2623 item_to_report = self.get_item(queue.queue_id, cur_item_id) or new_state["current_item"]
2624 seconds_played = int(new_state["elapsed_time"])
2625
2626 if not item_to_report:
2627 return # guard against invalid items
2628
2629 if not (media_item := item_to_report.media_item):
2630 # only report on media items
2631 return
2632 assert media_item.uri is not None # uri is set in __post_init__
2633
2634 if item_to_report.streamdetails and item_to_report.streamdetails.stream_error:
2635 # Ignore items that had a stream error
2636 return
2637
2638 if item_to_report.streamdetails and item_to_report.streamdetails.duration:
2639 duration = int(item_to_report.streamdetails.duration)
2640 else:
2641 duration = int(item_to_report.duration or 3 * 3600)
2642
2643 if seconds_played < 5:
2644 # ignore items that have been played less than 5 seconds
2645 # this also filters out a bounce effect where the previous item
2646 # gets reported with 0 elapsed seconds after a new item starts playing
2647 return
2648
2649 # determine if item is fully played
2650 # for podcasts and audiobooks we account for the last 60 seconds
2651 percentage_played = percentage(seconds_played, duration)
2652 if not is_current_item and item_to_report.media_type in (
2653 MediaType.AUDIOBOOK,
2654 MediaType.PODCAST_EPISODE,
2655 ):
2656 fully_played = seconds_played >= duration - 60
2657 elif not is_current_item:
2658 # 90% of the track must be played to be considered fully played
2659 fully_played = percentage_played >= 90
2660 else:
2661 fully_played = seconds_played >= duration - 10
2662
2663 is_playing = is_current_item and queue.state == PlaybackState.PLAYING
2664 if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
2665 self.logger.debug(
2666 "%s %s '%s' (%s) - Fully played: %s - Progress: %s (%s/%ss)",
2667 queue.display_name,
2668 "is playing" if is_playing else "played",
2669 item_to_report.name,
2670 item_to_report.uri,
2671 fully_played,
2672 f"{percentage_played}%",
2673 seconds_played,
2674 duration,
2675 )
2676 # add entry to playlog - this also handles resume of podcasts/audiobooks
2677 self.mass.create_task(
2678 self.mass.music.mark_item_played(
2679 media_item,
2680 fully_played=fully_played,
2681 seconds_played=seconds_played,
2682 is_playing=is_playing,
2683 userid=queue.userid,
2684 queue_id=queue.queue_id,
2685 user_initiated=False,
2686 )
2687 )
2688
2689 album: Album | ItemMapping | None = getattr(media_item, "album", None)
2690 # signal 'media item played' event,
2691 # which is useful for plugins that want to do scrobbling
2692 artists: list[Artist | ItemMapping] = getattr(media_item, "artists", [])
2693 artists_names = [a.name for a in artists]
2694 self.mass.signal_event(
2695 EventType.MEDIA_ITEM_PLAYED,
2696 object_id=media_item.uri,
2697 data=MediaItemPlaybackProgressReport(
2698 uri=media_item.uri,
2699 media_type=media_item.media_type,
2700 name=media_item.name,
2701 version=getattr(media_item, "version", None),
2702 artist=(
2703 getattr(media_item, "artist_str", None) or artists_names[0]
2704 if artists_names
2705 else None
2706 ),
2707 artists=artists_names,
2708 artist_mbids=[a.mbid for a in artists if a.mbid] if artists else None,
2709 album=album.name if album else None,
2710 album_mbid=album.mbid if album else None,
2711 album_artist=(album.artist_str if isinstance(album, Album) else None),
2712 album_artist_mbids=(
2713 [a.mbid for a in album.artists if a.mbid] if isinstance(album, Album) else None
2714 ),
2715 image_url=(
2716 self.mass.metadata.get_image_url(
2717 item_to_report.media_item.image, prefer_proxy=False
2718 )
2719 if item_to_report.media_item.image
2720 else None
2721 ),
2722 duration=duration,
2723 mbid=(getattr(media_item, "mbid", None)),
2724 seconds_played=seconds_played,
2725 fully_played=fully_played,
2726 is_playing=is_playing,
2727 userid=queue.userid,
2728 ),
2729 )
2730
2731
2732async def _smart_shuffle(items: list[QueueItem]) -> list[QueueItem]:
2733 """Shuffle queue items, avoiding identical tracks next to each other.
2734
2735 Best-effort approach to prevent the same track from appearing adjacent.
2736 Does a random shuffle first, then makes a limited number of passes to
2737 swap adjacent duplicates with a random item further in the list.
2738
2739 :param items: List of queue items to shuffle.
2740 """
2741 if len(items) <= 2:
2742 return random.sample(items, len(items)) if len(items) == 2 else items
2743
2744 # Start with a random shuffle
2745 shuffled = random.sample(items, len(items))
2746
2747 # Make a few passes to fix adjacent duplicates
2748 max_passes = 3
2749 for _ in range(max_passes):
2750 swapped = False
2751 for i in range(len(shuffled) - 1):
2752 if shuffled[i].name == shuffled[i + 1].name:
2753 # Found adjacent duplicate - swap with random position at least 2 away
2754 swap_candidates = [j for j in range(len(shuffled)) if abs(j - i - 1) >= 2]
2755 if swap_candidates:
2756 swap_pos = random.choice(swap_candidates)
2757 shuffled[i + 1], shuffled[swap_pos] = shuffled[swap_pos], shuffled[i + 1]
2758 swapped = True
2759 if not swapped:
2760 break
2761 # Yield to event loop between passes
2762 await asyncio.sleep(0)
2763
2764 return shuffled
2765