/
/
/
1"""
2MusicAssistant Player Queues Controller.
3
4Handles all logic to PLAY Media Items, provided by Music Providers to supported players.
5
6It is loosely coupled to the MusicAssistant Music Controller and Player Controller.
7A Music Assistant Player always has a PlayerQueue associated with it
8which holds the queue items and state.
9
10The PlayerQueue is in that case the active source of the player,
11but it can also be something else, hence the loose coupling.
12"""
13
14from __future__ import annotations
15
16import asyncio
17import random
18import time
19from contextlib import suppress
20from types import NoneType
21from typing import TYPE_CHECKING, Any, TypedDict, cast
22
23import shortuuid
24from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
25from music_assistant_models.enums import (
26 ConfigEntryType,
27 ContentType,
28 EventType,
29 MediaType,
30 PlaybackState,
31 PlayerFeature,
32 ProviderFeature,
33 QueueOption,
34 RepeatMode,
35)
36from music_assistant_models.errors import (
37 AudioError,
38 InvalidCommand,
39 InvalidDataError,
40 MediaNotFoundError,
41 MusicAssistantError,
42 PlayerUnavailableError,
43 QueueEmpty,
44 UnsupportedFeaturedException,
45)
46from music_assistant_models.media_items import (
47 Album,
48 Artist,
49 Audiobook,
50 BrowseFolder,
51 ItemMapping,
52 MediaItemType,
53 PlayableMediaItemType,
54 Playlist,
55 Podcast,
56 PodcastEpisode,
57 Track,
58 UniqueList,
59 media_from_dict,
60)
61from music_assistant_models.playback_progress_report import MediaItemPlaybackProgressReport
62from music_assistant_models.player_queue import PlayerQueue
63from music_assistant_models.queue_item import QueueItem
64
65from music_assistant.constants import (
66 ATTR_ANNOUNCEMENT_IN_PROGRESS,
67 MASS_LOGO_ONLINE,
68 VERBOSE_LOG_LEVEL,
69)
70from music_assistant.controllers.webserver.helpers.auth_middleware import get_current_user
71from music_assistant.helpers.api import api_command
72from music_assistant.helpers.audio import get_stream_details, get_stream_dsp_details
73from music_assistant.helpers.throttle_retry import BYPASS_THROTTLER
74from music_assistant.helpers.util import get_changed_keys, percentage
75from music_assistant.models.core_controller import CoreController
76from music_assistant.models.player import Player, PlayerMedia
77
78if TYPE_CHECKING:
79 from collections.abc import Iterator
80
81 from music_assistant_models.auth import User
82 from music_assistant_models.media_items.metadata import MediaItemImage
83
84 from music_assistant import MusicAssistant
85 from music_assistant.models.player import Player
86
87
# Config keys: which items are selected when an (in-library) artist/album is played.
CONF_DEFAULT_ENQUEUE_SELECT_ARTIST = "default_enqueue_select_artist"
CONF_DEFAULT_ENQUEUE_SELECT_ALBUM = "default_enqueue_select_album"

# Default values for the two "select" settings above.
ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE = "all_tracks"
ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE = "all_tracks"

# Config keys holding the default enqueue option per media type.
# play_media looks these up as f"default_enqueue_option_{media_type.value}"
# when the caller did not pass an explicit option.
CONF_DEFAULT_ENQUEUE_OPTION_ARTIST = "default_enqueue_option_artist"
CONF_DEFAULT_ENQUEUE_OPTION_ALBUM = "default_enqueue_option_album"
CONF_DEFAULT_ENQUEUE_OPTION_TRACK = "default_enqueue_option_track"
CONF_DEFAULT_ENQUEUE_OPTION_RADIO = "default_enqueue_option_radio"
CONF_DEFAULT_ENQUEUE_OPTION_PLAYLIST = "default_enqueue_option_playlist"
CONF_DEFAULT_ENQUEUE_OPTION_AUDIOBOOK = "default_enqueue_option_audiobook"
CONF_DEFAULT_ENQUEUE_OPTION_PODCAST = "default_enqueue_option_podcast"
CONF_DEFAULT_ENQUEUE_OPTION_PODCAST_EPISODE = "default_enqueue_option_podcast_episode"
CONF_DEFAULT_ENQUEUE_OPTION_FOLDER = "default_enqueue_option_folder"
# NOTE(review): no config entry is registered for this key in get_config_entries.
CONF_DEFAULT_ENQUEUE_OPTION_UNKNOWN = "default_enqueue_option_unknown"
RADIO_TRACK_MAX_DURATION_SECS = 20 * 60  # 20 minutes
# NOTE(review): presumably cache category ids for persisted queue state/items -
# their usage is outside this part of the file, confirm there.
CACHE_CATEGORY_PLAYER_QUEUE_STATE = 0
CACHE_CATEGORY_PLAYER_QUEUE_ITEMS = 1
107
108
class CompareState(TypedDict):
    """Simple object where we store the (previous) state of a queue.

    Used for compare actions: the controller keeps one of these per queue
    (see ``PlayerQueuesController._prev_states``).
    """

    queue_id: str
    state: PlaybackState
    # ids (not the objects) of the current and the upcoming queue item
    current_item_id: str | None
    next_item_id: str | None
    current_item: QueueItem | None
    elapsed_time: int
    # last_playing_elapsed_time: elapsed time from the last PLAYING state update
    # used to determine if a track was fully played when transitioning to idle
    last_playing_elapsed_time: int
    stream_title: str | None
    codec_type: ContentType | None
    output_formats: list[str] | None
127
128
129class PlayerQueuesController(CoreController):
130 """Controller holding all logic to enqueue music for players."""
131
132 domain: str = "player_queues"
133
134 def __init__(self, mass: MusicAssistant) -> None:
135 """Initialize core controller."""
136 super().__init__(mass)
137 self._queues: dict[str, PlayerQueue] = {}
138 self._queue_items: dict[str, list[QueueItem]] = {}
139 self._prev_states: dict[str, CompareState] = {}
140 self._transitioning_players: set[str] = set()
141 self.manifest.name = "Player Queues controller"
142 self.manifest.description = (
143 "Music Assistant's core controller which manages the queues for all players."
144 )
145 self.manifest.icon = "playlist-music"
146
147 async def close(self) -> None:
148 """Cleanup on exit."""
149 # stop all playback
150 for queue in self.all():
151 if queue.state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
152 await self.stop(queue.queue_id)
153
154 async def get_config_entries(
155 self,
156 action: str | None = None,
157 values: dict[str, ConfigValueType] | None = None,
158 ) -> tuple[ConfigEntry, ...]:
159 """Return all Config Entries for this core module (if any)."""
160 enqueue_options = [ConfigValueOption(x.name, x.value) for x in QueueOption]
161 return (
162 ConfigEntry(
163 key=CONF_DEFAULT_ENQUEUE_SELECT_ARTIST,
164 type=ConfigEntryType.STRING,
165 default_value=ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE,
166 label="Items to select when you play a (in-library) artist.",
167 options=[
168 ConfigValueOption(
169 title="Only in-library tracks",
170 value="library_tracks",
171 ),
172 ConfigValueOption(
173 title="All tracks from all albums in the library",
174 value="library_album_tracks",
175 ),
176 ConfigValueOption(
177 title="All (top) tracks from (all) streaming provider(s)",
178 value="all_tracks",
179 ),
180 ConfigValueOption(
181 title="All tracks from all albums from (all) streaming provider(s)",
182 value="all_album_tracks",
183 ),
184 ],
185 ),
186 ConfigEntry(
187 key=CONF_DEFAULT_ENQUEUE_SELECT_ALBUM,
188 type=ConfigEntryType.STRING,
189 default_value=ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE,
190 label="Items to select when you play a (in-library) album.",
191 options=[
192 ConfigValueOption(
193 title="Only in-library tracks",
194 value="library_tracks",
195 ),
196 ConfigValueOption(
197 title="All tracks for album on (streaming) provider",
198 value="all_tracks",
199 ),
200 ],
201 ),
202 ConfigEntry(
203 key=CONF_DEFAULT_ENQUEUE_OPTION_ARTIST,
204 type=ConfigEntryType.STRING,
205 default_value=QueueOption.REPLACE.value,
206 label="Default enqueue option for Artist item(s).",
207 options=enqueue_options,
208 description="Define the default enqueue action for this mediatype.",
209 ),
210 ConfigEntry(
211 key=CONF_DEFAULT_ENQUEUE_OPTION_ALBUM,
212 type=ConfigEntryType.STRING,
213 default_value=QueueOption.REPLACE.value,
214 label="Default enqueue option for Album item(s).",
215 options=enqueue_options,
216 description="Define the default enqueue action for this mediatype.",
217 ),
218 ConfigEntry(
219 key=CONF_DEFAULT_ENQUEUE_OPTION_TRACK,
220 type=ConfigEntryType.STRING,
221 default_value=QueueOption.PLAY.value,
222 label="Default enqueue option for Track item(s).",
223 options=enqueue_options,
224 description="Define the default enqueue action for this mediatype.",
225 ),
226 ConfigEntry(
227 key=CONF_DEFAULT_ENQUEUE_OPTION_RADIO,
228 type=ConfigEntryType.STRING,
229 default_value=QueueOption.REPLACE.value,
230 label="Default enqueue option for Radio item(s).",
231 options=enqueue_options,
232 description="Define the default enqueue action for this mediatype.",
233 ),
234 ConfigEntry(
235 key=CONF_DEFAULT_ENQUEUE_OPTION_PLAYLIST,
236 type=ConfigEntryType.STRING,
237 default_value=QueueOption.REPLACE.value,
238 label="Default enqueue option for Playlist item(s).",
239 options=enqueue_options,
240 description="Define the default enqueue action for this mediatype.",
241 ),
242 ConfigEntry(
243 key=CONF_DEFAULT_ENQUEUE_OPTION_AUDIOBOOK,
244 type=ConfigEntryType.STRING,
245 default_value=QueueOption.REPLACE.value,
246 label="Default enqueue option for Audiobook item(s).",
247 options=enqueue_options,
248 hidden=True,
249 ),
250 ConfigEntry(
251 key=CONF_DEFAULT_ENQUEUE_OPTION_PODCAST,
252 type=ConfigEntryType.STRING,
253 default_value=QueueOption.REPLACE.value,
254 label="Default enqueue option for Podcast item(s).",
255 options=enqueue_options,
256 hidden=True,
257 ),
258 ConfigEntry(
259 key=CONF_DEFAULT_ENQUEUE_OPTION_PODCAST_EPISODE,
260 type=ConfigEntryType.STRING,
261 default_value=QueueOption.REPLACE.value,
262 label="Default enqueue option for Podcast-episode item(s).",
263 options=enqueue_options,
264 hidden=True,
265 ),
266 ConfigEntry(
267 key=CONF_DEFAULT_ENQUEUE_OPTION_FOLDER,
268 type=ConfigEntryType.STRING,
269 default_value=QueueOption.REPLACE.value,
270 label="Default enqueue option for Folder item(s).",
271 options=enqueue_options,
272 hidden=True,
273 ),
274 )
275
276 def __iter__(self) -> Iterator[PlayerQueue]:
277 """Iterate over (available) players."""
278 return iter(self._queues.values())
279
280 @api_command("player_queues/all")
281 def all(self) -> tuple[PlayerQueue, ...]:
282 """Return all registered PlayerQueues."""
283 return tuple(self._queues.values())
284
285 @api_command("player_queues/get")
286 def get(self, queue_id: str) -> PlayerQueue | None:
287 """Return PlayerQueue by queue_id or None if not found."""
288 return self._queues.get(queue_id)
289
290 @api_command("player_queues/items")
291 def items(self, queue_id: str, limit: int = 500, offset: int = 0) -> list[QueueItem]:
292 """Return all QueueItems for given PlayerQueue."""
293 if queue_id not in self._queue_items:
294 return []
295
296 return self._queue_items[queue_id][offset : offset + limit]
297
298 @api_command("player_queues/get_active_queue")
299 def get_active_queue(self, player_id: str) -> PlayerQueue | None:
300 """Return the current active/synced queue for a player."""
301 if player := self.mass.players.get(player_id):
302 return self.mass.players.get_active_queue(player)
303 return None
304
305 # Queue commands
306
307 @api_command("player_queues/shuffle")
308 async def set_shuffle(self, queue_id: str, shuffle_enabled: bool) -> None:
309 """Configure shuffle setting on the the queue."""
310 queue = self._queues[queue_id]
311 if queue.shuffle_enabled == shuffle_enabled:
312 return # no change
313 queue.shuffle_enabled = shuffle_enabled
314 queue_items = self._queue_items[queue_id]
315 cur_index = queue.index_in_buffer or queue.current_index
316 if cur_index is not None:
317 next_index = cur_index + 1
318 next_items = queue_items[next_index:]
319 else:
320 next_items = []
321 next_index = 0
322 if not shuffle_enabled:
323 # shuffle disabled, try to restore original sort order of the remaining items
324 next_items.sort(key=lambda x: x.sort_index, reverse=False)
325 await self.load(
326 queue_id=queue_id,
327 queue_items=next_items,
328 insert_at_index=next_index,
329 keep_remaining=False,
330 shuffle=shuffle_enabled,
331 )
332
333 @api_command("player_queues/dont_stop_the_music")
334 def set_dont_stop_the_music(self, queue_id: str, dont_stop_the_music_enabled: bool) -> None:
335 """Configure Don't stop the music setting on the queue."""
336 providers_available_with_similar_tracks = any(
337 ProviderFeature.SIMILAR_TRACKS in provider.supported_features
338 for provider in self.mass.music.providers
339 )
340 if dont_stop_the_music_enabled and not providers_available_with_similar_tracks:
341 raise UnsupportedFeaturedException(
342 "Don't stop the music is not supported by any of the available music providers"
343 )
344 queue = self._queues[queue_id]
345 queue.dont_stop_the_music_enabled = dont_stop_the_music_enabled
346 self.signal_update(queue_id=queue_id)
347 # if this happens to be the last track in the queue, fill the radio source
348 if (
349 queue.dont_stop_the_music_enabled
350 and queue.enqueued_media_items
351 and queue.current_index is not None
352 and (queue.items - queue.current_index) <= 1
353 ):
354 queue.radio_source = queue.enqueued_media_items
355 task_id = f"fill_radio_tracks_{queue_id}"
356 self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id)
357
358 @api_command("player_queues/repeat")
359 def set_repeat(self, queue_id: str, repeat_mode: RepeatMode) -> None:
360 """Configure repeat setting on the the queue."""
361 queue = self._queues[queue_id]
362 if queue.repeat_mode == repeat_mode:
363 return # no change
364 queue.repeat_mode = repeat_mode
365 self.signal_update(queue_id)
366 if (
367 queue.state == PlaybackState.PLAYING
368 and queue.index_in_buffer is not None
369 and queue.index_in_buffer == queue.current_index
370 ):
371 # if the queue is playing,
372 # ensure to (re)queue the next track because it might have changed
373 # note that we only do this if the player has loaded the current track
374 # if not, we wait until it has loaded to prevent conflicts
375 if next_item := self.get_next_item(queue_id, queue.index_in_buffer):
376 self._enqueue_next_item(queue_id, next_item)
377
    @api_command("player_queues/play_media")
    async def play_media(
        self,
        queue_id: str,
        media: MediaItemType | ItemMapping | str | list[MediaItemType | ItemMapping | str],
        option: QueueOption | None = None,
        radio_mode: bool = False,
        start_item: PlayableMediaItemType | str | None = None,
        username: str | None = None,
    ) -> None:
        """Play media item(s) on the given queue.

        The given media is resolved into playable items, wrapped into QueueItems
        and loaded into the queue according to the (given or configured) enqueue
        option. The command is ignored (with a warning) while an announcement is
        in progress on the player.

        :param queue_id: The queue_id of the queue to play media on.
        :param media: Media that should be played (MediaItem(s) and/or uri's).
        :param option: Which enqueue mode to use. When omitted, the configured
            default for the media type of the (first resolvable) item is used.
        :param radio_mode: Enable radio mode for the given item(s).
        :param start_item: Optional item to start the playlist or album from.
        :param username: The username of the user requesting the playback.
            Setting the username allows for overriding the logged-in user
            to account for playback history per user when the play_media is
            called from a shared context (like a web hook or automation).
        :raises PlayerUnavailableError: If the queue is not registered.
        :raises MediaNotFoundError: If no playable (available) items were found.
        """
        # ruff: noqa: PLR0915
        # we use a contextvar to bypass the throttler for this asyncio task/context
        # this makes sure that playback has priority over other requests that may be
        # happening in the background
        BYPASS_THROTTLER.set(True)
        if not (queue := self.get(queue_id)):
            raise PlayerUnavailableError(f"Queue {queue_id} is not available")
        # always fetch the underlying player so we can raise early if its not available
        queue_player = self.mass.players.get(queue_id, True)
        assert queue_player is not None  # for type checking
        if queue_player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
            self.logger.warning("Ignore queue command: An announcement is in progress")
            return

        # save the user requesting the playback
        playback_user: User | None
        if username and (user := await self.mass.webserver.auth.get_user_by_username(username)):
            playback_user = user
        else:
            # no (known) username given, fall back to the current user (if any)
            playback_user = get_current_user()
        queue.userid = playback_user.user_id if playback_user else None

        # a single item or list of items may be provided
        media_list = media if isinstance(media, list) else [media]

        # clear queue if needed
        if option == QueueOption.REPLACE:
            self.clear(queue_id)
        # Clear the 'enqueued media item' list when a new queue is requested
        if option not in (QueueOption.ADD, QueueOption.NEXT):
            queue.enqueued_media_items.clear()

        media_items: list[MediaItemType] = []
        radio_source: list[MediaItemType] = []
        # resolve all media items
        for item in media_list:
            try:
                # parse provided uri into a MA MediaItem or Basic QueueItem from URL
                media_item: MediaItemType | ItemMapping | BrowseFolder
                if isinstance(item, str):
                    media_item = await self.mass.music.get_item_by_uri(item)
                elif isinstance(item, dict):  # type: ignore[unreachable]
                    # TODO: Investigate why the API parser sometimes passes raw dicts instead of
                    # converting them to MediaItem objects. The parse_value function in api.py
                    # should handle dict-to-object conversion, but dicts are slipping through
                    # in some cases. This is defensive handling for that parser bug.
                    media_item = media_from_dict(item)  # type: ignore[unreachable]
                    self.logger.debug("Converted to: %s", type(media_item))
                else:
                    # item is MediaItemType | ItemMapping at this point
                    media_item = item

                # Save requested media item to play on the queue so we can use it as a source
                # for Don't stop the music. Use FIFO list to keep track of the last 10 played items
                # Skip ItemMapping and BrowseFolder - only queue full MediaItemType objects
                if not isinstance(
                    media_item, (ItemMapping, BrowseFolder)
                ) and media_item.media_type in (
                    MediaType.TRACK,
                    MediaType.ALBUM,
                    MediaType.PLAYLIST,
                    MediaType.ARTIST,
                ):
                    queue.enqueued_media_items.append(media_item)
                    if len(queue.enqueued_media_items) > 10:
                        queue.enqueued_media_items.pop(0)

                # handle default enqueue option if needed
                # (once resolved here, the option also applies to the remaining items)
                if option is None:
                    config_value = await self.mass.config.get_core_config_value(
                        self.domain,
                        f"default_enqueue_option_{media_item.media_type.value}",
                        return_type=str,
                    )
                    option = QueueOption(config_value)
                    if option == QueueOption.REPLACE:
                        self.clear(queue_id, skip_stop=True)

                # collect media_items to play
                if radio_mode:
                    # Type guard for mypy - only add full MediaItemType to radio_source
                    if not isinstance(media_item, (ItemMapping, BrowseFolder)):
                        radio_source.append(media_item)
                else:
                    # Convert start_item to string URI if needed
                    start_item_uri: str | None = None
                    if isinstance(start_item, str):
                        start_item_uri = start_item
                    elif start_item is not None:
                        start_item_uri = start_item.uri
                    media_items += await self._resolve_media_items(
                        media_item, start_item_uri, queue_id=queue_id
                    )

            except MusicAssistantError as err:
                # invalid MA uri or item not found error
                # (best effort: skip this item and continue with the next one)
                self.logger.warning("Skipping %s: %s", item, str(err))

        # overwrite or append radio source items
        if option not in (QueueOption.ADD, QueueOption.NEXT):
            queue.radio_source = radio_source
        else:
            queue.radio_source += radio_source
        # Use collected media items to calculate the radio if radio mode is on
        if radio_mode:
            radio_tracks = await self._get_radio_tracks(
                queue_id=queue_id, is_initial_radio_mode=True
            )
            media_items = list(radio_tracks)

        # only add valid/available items
        queue_items: list[QueueItem] = []
        for x in media_items:
            if not x or not x.available:
                continue
            queue_items.append(
                QueueItem.from_media_item(queue_id, cast("PlayableMediaItemType", x))
            )

        if not queue_items:
            raise MediaNotFoundError("No playable items found")

        # load the items into the queue
        # while playing/paused the buffered index takes precedence as insert reference
        if queue.state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
            cur_index = queue.index_in_buffer or queue.current_index or 0
        else:
            cur_index = queue.current_index or 0
        insert_at_index = cur_index + 1
        # Radio modes are already shuffled in a pattern we would like to keep.
        shuffle = queue.shuffle_enabled and len(queue_items) > 1 and not radio_mode

        # handle replace: clear all items and replace with the new items
        if option == QueueOption.REPLACE:
            await self.load(
                queue_id,
                queue_items=queue_items,
                keep_remaining=False,
                keep_played=False,
                shuffle=shuffle,
            )
            await self.play_index(queue_id, 0)
            return
        # handle next: add item(s) in the index next to the playing/loaded/buffered index
        if option == QueueOption.NEXT:
            await self.load(
                queue_id,
                queue_items=queue_items,
                insert_at_index=insert_at_index,
                shuffle=shuffle,
            )
            return
        # handle replace next: like next, but the remaining items are not kept
        if option == QueueOption.REPLACE_NEXT:
            await self.load(
                queue_id,
                queue_items=queue_items,
                insert_at_index=insert_at_index,
                keep_remaining=False,
                shuffle=shuffle,
            )
            return
        # handle play: replace current loaded/playing index with new item(s)
        if option == QueueOption.PLAY:
            await self.load(
                queue_id,
                queue_items=queue_items,
                insert_at_index=insert_at_index,
                shuffle=shuffle,
            )
            # clamp to the last item, the queue may be shorter than the insert index
            next_index = min(insert_at_index, len(self._queue_items[queue_id]) - 1)
            await self.play_index(queue_id, next_index)
            return
        # handle add: add/append item(s) to the remaining queue items
        if option == QueueOption.ADD:
            await self.load(
                queue_id=queue_id,
                queue_items=queue_items,
                insert_at_index=insert_at_index
                if queue.shuffle_enabled
                else len(self._queue_items[queue_id]) + 1,
                shuffle=queue.shuffle_enabled,
            )
            # handle edgecase, queue is empty and items are only added (not played)
            # mark first item as new index
            if queue.current_index is None:
                queue.current_index = 0
                queue.current_item = self.get_item(queue_id, 0)
                queue.items = len(queue_items)
                self.signal_update(queue_id)
588
589 @api_command("player_queues/move_item")
590 def move_item(self, queue_id: str, queue_item_id: str, pos_shift: int = 1) -> None:
591 """
592 Move queue item x up/down the queue.
593
594 - queue_id: id of the queue to process this request.
595 - queue_item_id: the item_id of the queueitem that needs to be moved.
596 - pos_shift: move item x positions down if positive value
597 - pos_shift: move item x positions up if negative value
598 - pos_shift: move item to top of queue as next item if 0.
599 """
600 queue = self._queues[queue_id]
601 item_index = self.index_by_id(queue_id, queue_item_id)
602 if item_index is None:
603 raise InvalidDataError(f"Item {queue_item_id} not found in queue")
604 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
605 msg = f"{item_index} is already played/buffered"
606 raise IndexError(msg)
607
608 queue_items = self._queue_items[queue_id]
609 queue_items = queue_items.copy()
610
611 if pos_shift == 0 and queue.state == PlaybackState.PLAYING:
612 new_index = (queue.current_index or 0) + 1
613 elif pos_shift == 0:
614 new_index = queue.current_index or 0
615 else:
616 new_index = item_index + pos_shift
617 if (new_index < (queue.current_index or 0)) or (new_index > len(queue_items)):
618 return
619 # move the item in the list
620 queue_items.insert(new_index, queue_items.pop(item_index))
621 self.update_items(queue_id, queue_items)
622
623 @api_command("player_queues/move_item_end")
624 def move_item_end(self, queue_id: str, queue_item_id: str) -> None:
625 """
626 Move queue item to the end the queue.
627
628 - queue_id: id of the queue to process this request.
629 - queue_item_id: the item_id of the queueitem that needs to be moved.
630 """
631 queue = self._queues[queue_id]
632 item_index = self.index_by_id(queue_id, queue_item_id)
633 if item_index is None:
634 raise InvalidDataError(f"Item {queue_item_id} not found in queue")
635 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
636 msg = f"{item_index} is already played/buffered"
637 raise IndexError(msg)
638
639 queue_items = self._queue_items[queue_id]
640 if item_index == (len(queue_items) - 1):
641 return
642 queue_items = queue_items.copy()
643
644 new_index = len(self._queue_items[queue_id]) - 1
645
646 # move the item in the list
647 queue_items.insert(new_index, queue_items.pop(item_index))
648 self.update_items(queue_id, queue_items)
649
650 @api_command("player_queues/delete_item")
651 def delete_item(self, queue_id: str, item_id_or_index: int | str) -> None:
652 """Delete item (by id or index) from the queue."""
653 if isinstance(item_id_or_index, str):
654 item_index = self.index_by_id(queue_id, item_id_or_index)
655 if item_index is None:
656 raise InvalidDataError(f"Item {item_id_or_index} not found in queue")
657 else:
658 item_index = item_id_or_index
659 queue = self._queues[queue_id]
660 if queue.index_in_buffer is not None and item_index <= queue.index_in_buffer:
661 # ignore request if track already loaded in the buffer
662 # the frontend should guard so this is just in case
663 self.logger.warning("delete requested for item already loaded in buffer")
664 return
665 queue_items = self._queue_items[queue_id]
666 queue_items.pop(item_index)
667 self.update_items(queue_id, queue_items)
668
669 @api_command("player_queues/clear")
670 def clear(self, queue_id: str, skip_stop: bool = False) -> None:
671 """Clear all items in the queue."""
672 queue = self._queues[queue_id]
673 queue.radio_source = []
674 if queue.state != PlaybackState.IDLE and not skip_stop:
675 self.mass.create_task(self.stop(queue_id))
676 queue.current_index = None
677 queue.current_item = None
678 queue.elapsed_time = 0
679 queue.elapsed_time_last_updated = time.time()
680 queue.index_in_buffer = None
681 self.update_items(queue_id, [])
682
683 @api_command("player_queues/stop")
684 async def stop(self, queue_id: str) -> None:
685 """
686 Handle STOP command for given queue.
687
688 - queue_id: queue_id of the playerqueue to handle the command.
689 """
690 queue_player = self.mass.players.get(queue_id, True)
691 if queue_player is None:
692 raise PlayerUnavailableError(f"Player {queue_id} is not available")
693 if (queue := self.get(queue_id)) and queue.active:
694 if queue.state == PlaybackState.PLAYING:
695 queue.resume_pos = int(queue.corrected_elapsed_time)
696 # forward the actual command to the player
697 if temp_player := self.mass.players.get(queue_id):
698 await temp_player.stop()
699
700 @api_command("player_queues/play")
701 async def play(self, queue_id: str) -> None:
702 """
703 Handle PLAY command for given queue.
704
705 - queue_id: queue_id of the playerqueue to handle the command.
706 """
707 queue_player = self.mass.players.get(queue_id, True)
708 if queue_player is None:
709 raise PlayerUnavailableError(f"Player {queue_id} is not available")
710 if (
711 (queue := self._queues.get(queue_id))
712 and queue.active
713 and queue.state == PlaybackState.PAUSED
714 ):
715 # forward the actual play/unpause command to the player
716 await queue_player.play()
717 return
718 # player is not paused, perform resume instead
719 await self.resume(queue_id)
720
721 @api_command("player_queues/pause")
722 async def pause(self, queue_id: str) -> None:
723 """Handle PAUSE command for given queue.
724
725 - queue_id: queue_id of the playerqueue to handle the command.
726 """
727 if queue := self._queues.get(queue_id):
728 if queue.state == PlaybackState.PLAYING:
729 queue.resume_pos = int(queue.corrected_elapsed_time)
730 # forward the actual command to the player controller
731 queue_player = self.mass.players.get(queue_id)
732 assert queue_player is not None # for type checking
733 if not (self.mass.players.get_player_provider(queue_id)):
734 return # guard
735
736 if PlayerFeature.PAUSE not in queue_player.supported_features:
737 # if player does not support pause, we need to send stop
738 await queue_player.stop()
739 return
740 await queue_player.pause()
741
742 async def _watch_pause() -> None:
743 count = 0
744 # wait for pause
745 while count < 5 and queue_player.playback_state == PlaybackState.PLAYING:
746 count += 1
747 await asyncio.sleep(1)
748 # wait for unpause
749 if queue_player.playback_state != PlaybackState.PAUSED:
750 return
751 count = 0
752 while count < 30 and queue_player.playback_state == PlaybackState.PAUSED:
753 count += 1
754 await asyncio.sleep(1)
755 # if player is still paused when the limit is reached, send stop
756 if queue_player.playback_state == PlaybackState.PAUSED:
757 await queue_player.stop()
758
759 # we auto stop a player from paused when its paused for 30 seconds
760 if not queue_player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
761 self.mass.create_task(_watch_pause())
762
763 @api_command("player_queues/play_pause")
764 async def play_pause(self, queue_id: str) -> None:
765 """Toggle play/pause on given playerqueue.
766
767 - queue_id: queue_id of the queue to handle the command.
768 """
769 if (queue := self._queues.get(queue_id)) and queue.state == PlaybackState.PLAYING:
770 await self.pause(queue_id)
771 return
772 await self.play(queue_id)
773
774 @api_command("player_queues/next")
775 async def next(self, queue_id: str) -> None:
776 """Handle NEXT TRACK command for given queue.
777
778 - queue_id: queue_id of the queue to handle the command.
779 """
780 if (queue := self.get(queue_id)) is None or not queue.active:
781 # TODO: forward to underlying player if not active
782 return
783 idx = self._queues[queue_id].current_index
784 if idx is None:
785 self.logger.warning("Queue %s has no current index", queue.display_name)
786 return
787 attempts = 5
788 while attempts:
789 try:
790 if (next_index := self._get_next_index(queue_id, idx, True)) is not None:
791 await self.play_index(queue_id, next_index, debounce=True)
792 break
793 except MediaNotFoundError:
794 self.logger.warning(
795 "Failed to fetch next track for queue %s - trying next item",
796 queue.display_name,
797 )
798 idx += 1
799 attempts -= 1
800
801 @api_command("player_queues/previous")
802 async def previous(self, queue_id: str) -> None:
803 """Handle PREVIOUS TRACK command for given queue.
804
805 - queue_id: queue_id of the queue to handle the command.
806 """
807 if (queue := self.get(queue_id)) is None or not queue.active:
808 # TODO: forward to underlying player if not active
809 return
810 current_index = self._queues[queue_id].current_index
811 if current_index is None:
812 return
813 next_index = int(current_index)
814 # restart current track if current track has played longer than 4
815 # otherwise skip to previous track
816 if self._queues[queue_id].elapsed_time < 5:
817 next_index = max(current_index - 1, 0)
818 await self.play_index(queue_id, next_index, debounce=True)
819
820 @api_command("player_queues/skip")
821 async def skip(self, queue_id: str, seconds: int = 10) -> None:
822 """Handle SKIP command for given queue.
823
824 - queue_id: queue_id of the queue to handle the command.
825 - seconds: number of seconds to skip in track. Use negative value to skip back.
826 """
827 if (queue := self.get(queue_id)) is None or not queue.active:
828 # TODO: forward to underlying player if not active
829 return
830 await self.seek(queue_id, int(self._queues[queue_id].elapsed_time + seconds))
831
832 @api_command("player_queues/seek")
833 async def seek(self, queue_id: str, position: int = 10) -> None:
834 """Handle SEEK command for given queue.
835
836 - queue_id: queue_id of the queue to handle the command.
837 - position: position in seconds to seek to in the current playing item.
838 """
839 if not (queue := self.get(queue_id)):
840 return
841 queue_player = self.mass.players.get(queue_id, True)
842 if queue_player is None:
843 raise PlayerUnavailableError(f"Player {queue_id} is not available")
844 if not queue.current_item:
845 raise InvalidCommand(f"Queue {queue_player.display_name} has no item(s) loaded.")
846 if not queue.current_item.duration:
847 raise InvalidCommand("Can not seek items without duration.")
848 position = max(0, int(position))
849 if position > queue.current_item.duration:
850 raise InvalidCommand("Can not seek outside of duration range.")
851 if queue.current_index is None:
852 raise InvalidCommand(f"Queue {queue_player.display_name} has no current index.")
853 await self.play_index(queue_id, queue.current_index, seek_position=position)
854
@api_command("player_queues/resume")
async def resume(self, queue_id: str, fade_in: bool | None = None) -> None:
    """Resume playback of the given queue where it left off.

    - queue_id: queue_id of the queue to handle the command.
    - fade_in: explicitly enable/disable the fade-in effect. When left at None,
      fade-in is enabled if the player is idle, the queue position was last
      updated more than 60 seconds ago and the resume position is above 0.
      It is always disabled when the queue is already playing.

    Raises PlayerUnavailableError if no player is returned for the queue_id and
    QueueEmpty if there is nothing to resume (and resuming from the playlog
    did not succeed either).
    """
    queue = self._queues[queue_id]
    queue_items = self._queue_items[queue_id]
    resume_item = queue.current_item
    if queue.state != PlaybackState.PLAYING:
        resume_pos = queue.resume_pos or queue.elapsed_time
    else:
        # resume requested while already playing,
        # use current position as resume position
        resume_pos = queue.corrected_elapsed_time
        fade_in = False

    if not resume_item and len(queue_items) > 0:
        # no current item known: fall back to the item at the current index or,
        # if there is no previous track at all, to the very first item
        start_index = 0 if queue.current_index is None else queue.current_index
        resume_item = self.get_item(queue_id, start_index)
        resume_pos = 0

    if resume_item is None:
        # Queue is empty, try to resume from playlog
        if await self._try_resume_from_playlog(queue):
            return
        msg = f"Resume queue requested but queue {queue.display_name} is empty"
        raise QueueEmpty(msg)

    queue_player = self.mass.players.get(queue_id)
    if queue_player is None:
        raise PlayerUnavailableError(f"Player {queue_id} is not available")
    if fade_in is None and queue_player.playback_state == PlaybackState.IDLE:
        seconds_since_update = time.time() - queue.elapsed_time_last_updated
        if seconds_since_update > 60:
            # enable fade in effect if the player is idle for a while
            fade_in = resume_pos > 0
    if resume_item.media_type == MediaType.RADIO:
        # we're not able to skip in online radio so this is pointless
        resume_pos = 0
    await self.play_index(
        queue_id,
        resume_item.queue_item_id,
        seek_position=int(resume_pos),
        fade_in=bool(fade_in),
    )
903
@api_command("player_queues/play_index")
async def play_index(
    self,
    queue_id: str,
    index: int | str,
    seek_position: int = 0,
    fade_in: bool = False,
    debounce: bool = False,
) -> None:
    """Play item at index (or item_id) X in queue.

    - queue_id: queue_id of the queue to handle the command.
    - index: index (int) or queue_item_id (str) of the item to play.
    - seek_position: position in seconds to start playback at. When left at 0
      and the media item carries a resume_position_ms, that resume point is used.
    - fade_in: apply a fade-in (only on the first load attempt).
    - debounce: wait a bit longer before the play request is sent to the player.

    Items that can not be loaded are flagged as unavailable and the next item is
    tried instead (5 attempts at most).

    Raises InvalidDataError if the item id can not be found in the queue,
    PlayerUnavailableError if no player is returned for the queue_id and
    MediaNotFoundError if no playable item could be found.
    """
    queue = self._queues[queue_id]
    queue.resume_pos = 0
    if isinstance(index, str):
        temp_index = self.index_by_id(queue_id, index)
        if temp_index is None:
            raise InvalidDataError(f"Item {index} not found in queue")
        index = temp_index
    # At this point index is guaranteed to be int
    queue.current_index = index
    # update current item and elapsed time and signal update
    # this way the UI knows immediately that a new item is loading
    queue.current_item = self.get_item(queue_id, index)
    queue.elapsed_time = seek_position
    queue.elapsed_time_last_updated = time.time()
    self.signal_update(queue_id)
    queue.index_in_buffer = index
    queue.flow_mode_stream_log = []
    target_player = self.mass.players.get(queue_id)
    if target_player is None:
        raise PlayerUnavailableError(f"Player {queue_id} is not available")
    queue.next_item_id_enqueued = None
    # always update session id when we start a new playback session
    queue.session_id = shortuuid.random(length=8)
    # handle resume point of audiobook(chapter) or podcast(episode)
    if (
        not seek_position
        and (queue_item := self.get_item(queue_id, index))
        and (resume_position_ms := getattr(queue_item.media_item, "resume_position_ms", 0))
    ):
        seek_position = max(0, int((resume_position_ms - 500) / 1000))

    # send play_media request to player
    # NOTE that we debounce this a bit to account for someone hitting the next button
    # like a madman. This will prevent the player from being overloaded with requests.
    async def _play_index(index: int, debounce: bool) -> None:
        try:
            for attempt in range(5):
                try:
                    queue_item = self.get_item(queue_id, index)
                    if not queue_item:
                        continue  # guard
                    await self._load_item(
                        queue_item,
                        self._get_next_index(queue_id, index),
                        is_start=True,
                        # seek/fade only apply to the item that was actually requested
                        seek_position=seek_position if attempt == 0 else 0,
                        fade_in=fade_in if attempt == 0 else False,
                    )
                    # if we reach this point, loading the item succeeded, break the loop
                    queue.current_index = index
                    queue.current_item = queue_item
                    break
                except (MediaNotFoundError, AudioError):
                    # the requested index can not be played.
                    if queue_item:
                        self.logger.warning(
                            "Skipping unplayable item %s (%s)",
                            queue_item.name,
                            queue_item.uri,
                        )
                        queue_item.available = False
                    next_index = self._get_next_index(queue_id, index, allow_repeat=False)
                    if next_index is None:
                        raise MediaNotFoundError("No next item available")
                    index = next_index
            else:
                # all attempts to find a playable item failed
                raise MediaNotFoundError("No playable item found to start playback")

            # work out if we need to use flow mode
            flow_mode = target_player.flow_mode and queue_item.media_type not in (
                # don't use flow mode for duration-less streams
                MediaType.RADIO,
                MediaType.PLUGIN_SOURCE,
            )
            await asyncio.sleep(0.5 if debounce else 0.1)
            queue.flow_mode = flow_mode
            await self.mass.players.play_media(
                player_id=queue_id,
                media=await self.player_media_from_queue_item(queue_item, flow_mode),
            )
            queue.current_index = index
            queue.current_item = queue_item
            await asyncio.sleep(2)
        except Exception:
            # Starting playback failed: lift the transition flag again, otherwise
            # on_player_update keeps ignoring the updates of this player.
            # A cancellation (CancelledError is not an Exception) is deliberately
            # not handled here: the play_index call that cancelled us has already
            # set the flag for its own transition and must keep it.
            self._transitioning_players.discard(queue_id)
            raise
        self._transitioning_players.discard(queue_id)

    # we set a flag to notify the update logic that we're transitioning to a new track
    self._transitioning_players.add(queue_id)

    # we debounce the play_index command to handle the case where someone
    # is spamming next/previous on the player
    task_id = f"play_index_{queue_id}"
    if existing_task := self.mass.get_task(task_id):
        existing_task.cancel()
        with suppress(asyncio.CancelledError):
            await existing_task
    task = self.mass.create_task(
        _play_index,
        index,
        debounce,
        task_id=task_id,
    )
    await task
    self.signal_update(queue_id)
1017
@api_command("player_queues/transfer")
async def transfer_queue(
    self,
    source_queue_id: str,
    target_queue_id: str,
    auto_play: bool | None = None,
) -> None:
    """Move the contents and settings of one queue to another queue.

    - source_queue_id: queue_id of the queue to take the items from (gets cleared).
    - target_queue_id: queue_id of the queue that receives the items.
    - auto_play: resume playback on the target queue after the transfer.
      When None, playback is resumed if the source queue is currently playing.

    Raises PlayerUnavailableError if one of the queues or the target player
    is not available.
    """
    source_queue = self.get(source_queue_id)
    if not source_queue:
        raise PlayerUnavailableError(f"Queue {source_queue_id} is not available")
    target_queue = self.get(target_queue_id)
    if not target_queue:
        raise PlayerUnavailableError(f"Queue {target_queue_id} is not available")
    if auto_play is None:
        auto_play = source_queue.state == PlaybackState.PLAYING

    target_player = self.mass.players.get(target_queue_id)
    if target_player is None:
        raise PlayerUnavailableError(f"Player {target_queue_id} is not available")
    group_id = target_player.active_group or target_player.synced_to
    if group_id:
        # edge case: the user wants to move playback from the group as a whole, to a single
        # player in the group or it is grouped and the command targeted at the single player.
        # We need to dissolve the group first.
        await self.mass.players.cmd_ungroup(group_id)
        await asyncio.sleep(3)

    # grab a reference to the items before the source queue gets cleared
    items_to_move = self._queue_items[source_queue_id]
    # copy the queue settings and playback position to the target queue
    target_queue.repeat_mode = source_queue.repeat_mode
    target_queue.shuffle_enabled = source_queue.shuffle_enabled
    target_queue.dont_stop_the_music_enabled = source_queue.dont_stop_the_music_enabled
    target_queue.radio_source = source_queue.radio_source
    target_queue.enqueued_media_items = source_queue.enqueued_media_items
    target_queue.resume_pos = int(source_queue.elapsed_time)
    target_queue.current_index = source_queue.current_index
    moved_current_item = source_queue.current_item
    if moved_current_item:
        target_queue.current_item = moved_current_item
        target_queue.current_item.queue_id = target_queue_id
    self.clear(source_queue_id)

    await self.load(target_queue_id, items_to_move, keep_remaining=False, keep_played=False)
    # the items now belong to the target queue
    for moved_item in items_to_move:
        moved_item.queue_id = target_queue_id
    self.update_items(target_queue_id, items_to_move)
    if auto_play:
        await self.resume(target_queue_id)
1064
1065 # Interaction with player
1066
async def on_player_register(self, player: Player) -> None:
    """Create (or restore from cache) the PlayerQueue of a registered player.

    The player_id of the player is used as queue_id. Cached queue state and
    items are restored when present; if restoring fails, a warning is logged
    and a new, empty queue is created instead.
    """
    queue_id = player.player_id
    queue: PlayerQueue | None = None
    queue_items: list[QueueItem] = []
    # try to restore previous state
    prev_state = await self.mass.cache.get(
        key=queue_id,
        provider=self.domain,
        category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
    )
    if prev_state:
        try:
            queue = PlayerQueue.from_dict(prev_state)
            cached_queue_items = await self.mass.cache.get(
                key=queue_id,
                provider=self.domain,
                category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
                default=[],
            )
            queue_items = []
            for position, raw_item in enumerate(cached_queue_items):
                restored = QueueItem.from_cache(raw_item)
                if restored.media_item:
                    queue_items.append(restored)
                    continue
                # Items with missing media_item are dropped - this can happen if
                # MA was killed during shutdown while cache was being written
                self.logger.debug(
                    "Skipping queue item %s (index %d) restored from cache "
                    "without media_item",
                    restored.name,
                    position,
                )
            if queue.enqueued_media_items:
                # the enqueued media items from the cache may still be plain dicts,
                # those need to be deserialized into MediaItem objects again
                cached_media: list[dict[str, Any] | MediaItemType] = cast(
                    "list[dict[str, Any] | MediaItemType]", queue.enqueued_media_items
                )
                queue.enqueued_media_items = [
                    cast("MediaItemType", media_from_dict(entry))
                    if isinstance(entry, dict)
                    else entry
                    for entry in cached_media
                ]
        except Exception as err:
            self.logger.warning(
                "Failed to restore the queue(items) for %s - %s",
                player.display_name,
                str(err),
            )
            # Reset to clean state on failure
            queue = None
            queue_items = []
    if queue is None:
        # nothing (usable) in the cache: start with a fresh queue
        queue = PlayerQueue(
            queue_id=queue_id,
            active=False,
            display_name=player.display_name,
            available=player.available,
            dont_stop_the_music_enabled=False,
            items=0,
        )

    self._queues[queue_id] = queue
    self._queue_items[queue_id] = queue_items
    # always call update to calculate state etc
    self.on_player_update(player, {})
    self.mass.signal_event(EventType.QUEUE_ADDED, object_id=queue_id, data=queue)
1138
def on_player_update(
    self,
    player: Player,
    changed_values: dict[str, tuple[Any, Any]],
) -> None:
    """
    Refresh the PlayerQueue of a player (e.g. when the player updates).

    The update is skipped while an announcement is in progress on the player,
    while the queue is transitioning to a new track, and when the queue is
    neither active nor has a previous state stored.

    NOTE: This is called every second if the player is playing.
    """
    queue_id = player.player_id
    queue = self._queues.get(queue_id)
    if queue is None:
        # race condition
        return
    if player.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
        # do nothing while the announcement is in progress
        return
    # determine if this queue is currently active for this player
    queue.active = player.active_source in (queue.queue_id, None)
    if not (queue.active or queue_id in self._prev_states):
        # return early if the queue is not active and we have no previous state
        queue.state = PlaybackState.IDLE
        return
    # while we're transitioning to a new track,
    # updates from the player are ignored
    if queue.queue_id not in self._transitioning_players:
        # queue is active and preflight checks passed, update the queue details
        self._update_queue_from_player(player)
1169
def on_player_remove(self, player_id: str, permanent: bool) -> None:
    """Drop the queue of a player that is removed from the registry.

    - player_id: id of the removed player (which is also the queue_id).
    - permanent: also delete the cached queue state and queue items.
    """
    if permanent:
        # if the player is permanently removed, we also remove the cached queue data
        for cache_category in (
            CACHE_CATEGORY_PLAYER_QUEUE_STATE,
            CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
        ):
            delete_from_cache = self.mass.cache.delete(
                key=player_id,
                provider=self.domain,
                category=cache_category,
            )
            self.mass.create_task(delete_from_cache)
    # forget the in-memory queue (if any)
    self._queues.pop(player_id, None)
    self._queue_items.pop(player_id, None)
1190
async def load_next_queue_item(
    self,
    queue_id: str,
    current_item_id: str,
) -> QueueItem:
    """
    Resolve and (pre)load the queue item that follows the given item.

    Items that can not be loaded are flagged as unavailable and skipped,
    after 10 skipped items the search is given up.

    Raises PlayerUnavailableError if the queue is not available and
    QueueEmpty if there are no more (playable) tracks left.
    """
    if not self.get(queue_id):
        msg = f"PlayerQueue {queue_id} is not available"
        raise PlayerUnavailableError(msg)
    cur_index = self.index_by_id(queue_id, current_item_id)
    if cur_index is None:
        # this is just a guard for bad data
        raise QueueEmpty("Invalid item id for queue given.")
    candidate: QueueItem | None = None
    # `skipped` counts the items that turned out to be unplayable so far
    for skipped in range(11):
        next_index = self._get_next_index(queue_id, cur_index + skipped)
        if next_index is None:
            raise QueueEmpty("No more tracks left in the queue.")
        candidate = self.get_item(queue_id, next_index)
        if candidate is None:
            raise QueueEmpty("No more tracks left in the queue.")
        if skipped >= 10:
            # we only allow 10 retries to prevent infinite loops
            raise QueueEmpty("No more (playable) tracks left in the queue.")
        try:
            await self._load_item(candidate, next_index)
        except (MediaNotFoundError, AudioError):
            # No stream details found, skip this QueueItem
            self.logger.warning(
                "Skipping unplayable item %s (%s)", candidate.name, candidate.uri
            )
            candidate.available = False
            continue
        # we're all set, this is our next item
        break
    if skipped != 0:
        # we skipped some items, signal a queue items update
        self.update_items(queue_id, self._queue_items[queue_id])
    if candidate is None:
        raise QueueEmpty("No more (playable) tracks left in the queue.")

    return candidate
1240
async def _load_item(
    self,
    queue_item: QueueItem,
    next_index: int | None,
    is_start: bool = False,
    seek_position: int = 0,
    fade_in: bool = False,
) -> None:
    """Try to load the stream details for the given queue item.

    - queue_item: the item to load, its media_item and streamdetails are updated in place.
    - next_index: index of the item that will follow this item (if any), used to
      detect if consecutive tracks of the same album are being played.
    - is_start: not used within this method.
    - seek_position: passed on to get_stream_details.
    - fade_in: passed on to get_stream_details.

    Raises MediaNotFoundError if the item is flagged as unavailable.
    Fetching the streamdetails may raise as well for unplayable items.
    """
    queue_id = queue_item.queue_id
    queue = self._queues[queue_id]

    # we use a contextvar to bypass the throttler for this asyncio task/context
    # this makes sure that playback has priority over other requests that may be
    # happening in the background
    BYPASS_THROTTLER.set(True)

    self.logger.debug(
        "(pre)loading (next) item for queue %s...",
        queue.display_name,
    )

    if not queue_item.available:
        raise MediaNotFoundError(f"Item {queue_item.uri} is not available")

    # work out if we are playing an album and if we should prefer album
    # loudness
    next_track_from_same_album = (
        next_index is not None
        and (next_item := self.get_item(queue_id, next_index))
        and (
            queue_item.media_item
            and hasattr(queue_item.media_item, "album")
            and queue_item.media_item.album
            and next_item.media_item
            and hasattr(next_item.media_item, "album")
            and next_item.media_item.album
            and queue_item.media_item.album.item_id == next_item.media_item.album.item_id
        )
    )
    current_index = self.index_by_id(queue_id, queue_item.queue_item_id)
    previous_track_from_same_album = (
        # only an item at index 1 or higher has a predecessor in the queue;
        # the predecessor itself may very well be the item at index 0
        current_index is not None
        and current_index > 0
        and (previous_item := self.get_item(queue_id, current_index - 1)) is not None
        and previous_item.media_item is not None
        and hasattr(previous_item.media_item, "album")
        and previous_item.media_item.album is not None
        and queue_item.media_item is not None
        and hasattr(queue_item.media_item, "album")
        and queue_item.media_item.album is not None
        and queue_item.media_item.album.item_id == previous_item.media_item.album.item_id
    )
    playing_album_tracks = next_track_from_same_album or previous_track_from_same_album
    if queue_item.media_item and isinstance(queue_item.media_item, Track):
        # remember the original album, the media item may get replaced below
        album = queue_item.media_item.album
        # prefer the full library media item so we have all metadata and provider(quality) info
        # always request the full library item as there might be other qualities available
        if library_item := await self.mass.music.get_library_item_by_prov_id(
            queue_item.media_item.media_type,
            queue_item.media_item.item_id,
            queue_item.media_item.provider,
        ):
            queue_item.media_item = cast("Track", library_item)
        elif not queue_item.media_item.image or queue_item.media_item.provider.startswith(
            "ytmusic"
        ):
            # Youtube Music has poor thumbs by default, so we always fetch the full item
            # this also catches the case where they have an unavailable item in a listing
            fetched_item = await self.mass.music.get_item_by_uri(queue_item.uri)
            queue_item.media_item = cast("Track", fetched_item)

        # ensure we got the full (original) album set
        if album and (
            library_album := await self.mass.music.get_library_item_by_prov_id(
                album.media_type,
                album.item_id,
                album.provider,
            )
        ):
            queue_item.media_item.album = cast("Album", library_album)
        elif album:
            # Restore original album if we have no better alternative from the library
            queue_item.media_item.album = album
        # prefer album image over track image
        if queue_item.media_item.album and queue_item.media_item.album.image:
            org_images: list[MediaItemImage] = queue_item.media_item.metadata.images or []
            queue_item.media_item.metadata.images = UniqueList(
                [
                    queue_item.media_item.album.image,
                    *org_images,
                ]
            )
    # Fetch the streamdetails, which could raise in case of an unplayable item.
    # For example, YT Music returns Radio Items that are not playable.
    queue_item.streamdetails = await get_stream_details(
        mass=self.mass,
        queue_item=queue_item,
        seek_position=seek_position,
        fade_in=fade_in,
        prefer_album_loudness=bool(playing_album_tracks),
    )
1346
def track_loaded_in_buffer(self, queue_id: str, item_id: str) -> None:
    """Register that a player has (started) loading the given item in its buffer.

    Raises PlayerUnavailableError if the queue is not available.
    """
    queue = self.get(queue_id)
    if not queue:
        raise PlayerUnavailableError(f"PlayerQueue {queue_id} is not available")
    # store the index of the item that is currently (being) loaded in the buffer
    # which helps us a bit to determine how far the player has buffered ahead
    buffered_index = self.index_by_id(queue_id, item_id)
    queue.index_in_buffer = buffered_index
    self.logger.debug("PlayerQueue %s loaded item %s in buffer", queue.display_name, item_id)
    self.signal_update(queue_id)
    # preload next streamdetails
    self._preload_next_item(queue_id, item_id)
1360
1361 # Main queue manipulation methods
1362
async def load(
    self,
    queue_id: str,
    queue_items: list[QueueItem],
    insert_at_index: int = 0,
    keep_remaining: bool = True,
    keep_played: bool = True,
    shuffle: bool = False,
) -> None:
    """Load new items at index.

    - queue_id: id of the queue to process this request.
    - queue_items: a list of QueueItems (the list itself is left untouched)
    - insert_at_index: insert the item(s) at this index
    - keep_remaining: keep the remaining items after the insert
    - keep_played: keep the items that are positioned before the insert index
    - shuffle: (re)shuffle the items after insert index
    """
    existing_items = self._queue_items[queue_id]
    prev_items = existing_items[:insert_at_index] if keep_played else []
    # build a new list: extending `queue_items` in place would silently
    # modify the list that is owned by the caller
    next_items = list(queue_items)

    # if keep_remaining, append the old 'next' items
    if keep_remaining:
        next_items.extend(existing_items[insert_at_index:])

    # we set the original insert order as attribute so we can un-shuffle
    for index, item in enumerate(next_items):
        item.sort_index += insert_at_index + index
    # (re)shuffle the final batch if needed
    if shuffle:
        next_items = await _smart_shuffle(next_items)
    self.update_items(queue_id, prev_items + next_items)
1394
def update_items(self, queue_id: str, queue_items: list[QueueItem]) -> None:
    """Replace the items of the queue (e.g. after reordering) and signal the change.

    If the queue is playing and the player has the current item in its buffer,
    the next item is enqueued again as it might have changed.
    """
    self._queue_items[queue_id] = queue_items
    queue = self._queues[queue_id]
    queue.items = len(self._queue_items[queue_id])
    # to track if the queue items changed we set a timestamp
    # this is a simple way to detect changes in the list of items
    # without having to compare the entire list
    queue.items_last_updated = time.time()
    self.signal_update(queue_id, True)
    if queue.state != PlaybackState.PLAYING:
        return
    buffered_index = queue.index_in_buffer
    if buffered_index is None or buffered_index != queue.current_index:
        # we only (re)queue the next track if the player has loaded the current
        # track, if not, we wait until it has loaded to prevent conflicts
        return
    # the queue is playing,
    # ensure to (re)queue the next track because it might have changed
    next_item = self.get_next_item(queue_id, buffered_index)
    if next_item:
        self._enqueue_next_item(queue_id, next_item)
1416
1417 # Helper methods
1418
def get_item(self, queue_id: str, item_id_or_index: int | str | None) -> QueueItem | None:
    """Get queue item by index or item_id.

    - queue_id: id of the queue to look in.
    - item_id_or_index: position in the queue (int) or queue_item_id (str).

    Returns None if the queue is unknown, the item id is not found or the
    index lies outside of the queue. Negative indexes are considered out of
    range (no wrap-around to the end of the queue).
    """
    if item_id_or_index is None:
        return None
    queue_items = self._queue_items.get(queue_id)
    if queue_items is None:
        return None
    if isinstance(item_id_or_index, str):
        return next((x for x in queue_items if x.queue_item_id == item_id_or_index), None)
    # check both bounds: without the lower bound a negative index would either
    # wrap around to the end of the queue or raise an IndexError
    if isinstance(item_id_or_index, int) and 0 <= item_id_or_index < len(queue_items):
        return queue_items[item_id_or_index]
    return None
1430
def signal_update(self, queue_id: str, items_changed: bool = False) -> None:
    """Broadcast a state change of the given queue and persist it in the cache.

    - queue_id: id of the queue that changed.
    - items_changed: the list of queue items changed as well, in that case
      an additional QUEUE_ITEMS_UPDATED event is sent and the items are cached.
    """
    queue = self._queues[queue_id]
    if items_changed:
        self.mass.signal_event(EventType.QUEUE_ITEMS_UPDATED, object_id=queue_id, data=queue)
        # save items in cache - only cache items with valid media_item
        cacheable_items = [
            queue_item.to_cache()
            for queue_item in self._queue_items[queue_id]
            if queue_item.media_item is not None
        ]
        store_items = self.mass.cache.set(
            key=queue_id,
            data=cacheable_items,
            provider=self.domain,
            category=CACHE_CATEGORY_PLAYER_QUEUE_ITEMS,
        )
        self.mass.create_task(store_items)
    # always send the base event
    self.mass.signal_event(EventType.QUEUE_UPDATED, object_id=queue_id, data=queue)
    # also signal update to the player itself so it can update its current_media
    self.mass.players.trigger_player_update(queue_id)
    # save state
    store_state = self.mass.cache.set(
        key=queue_id,
        data=queue.to_cache(),
        provider=self.domain,
        category=CACHE_CATEGORY_PLAYER_QUEUE_STATE,
    )
    self.mass.create_task(store_state)
1461
def index_by_id(self, queue_id: str, queue_item_id: str) -> int | None:
    """Return the position of the (first) item with the given queue_item_id.

    Returns None if the queue has no item with that id.
    """
    positions = (
        position
        for position, queue_item in enumerate(self._queue_items[queue_id])
        if queue_item.queue_item_id == queue_item_id
    )
    return next(positions, None)
1469
async def player_media_from_queue_item(
    self, queue_item: QueueItem, flow_mode: bool
) -> PlayerMedia:
    """Build the PlayerMedia object that is sent to the player for a QueueItem.

    - queue_item: the queue item to create the PlayerMedia for.
    - flow_mode: create the media for a flow stream, in that case no duration
      and no item specific title/artist/album are set.

    Raises InvalidDataError if the queue has no session_id.
    """
    queue = self._queues[queue_item.queue_id]
    streamdetails = queue_item.streamdetails
    if flow_mode:
        duration = None
    elif not streamdetails:
        duration = queue_item.duration
    else:
        # prefer netto duration
        # when seeking, the player only receives the remaining duration
        duration = streamdetails.duration or queue_item.duration
        if duration and streamdetails.seek_position:
            duration -= streamdetails.seek_position
    if queue.session_id is None:
        # handle error or return early
        raise InvalidDataError("Queue session_id is None")
    stream_url = await self.mass.streams.resolve_stream_url(
        queue.session_id, queue_item, flow_mode=flow_mode
    )
    if flow_mode:
        media_type = MediaType.FLOW_STREAM
        title = "Music Assistant"
    else:
        media_type = queue_item.media_type
        title = queue_item.name
    media = PlayerMedia(
        uri=stream_url,
        media_type=media_type,
        title=title,
        image_url=MASS_LOGO_ONLINE,
        duration=duration,
        source_id=queue_item.queue_id,
        queue_item_id=queue_item.queue_item_id,
    )
    if flow_mode or not queue_item.media_item:
        return media
    # enrich the media with the details of the actual media item
    media.title = queue_item.media_item.name
    media.artist = getattr(queue_item.media_item, "artist_str", "")
    album = getattr(queue_item.media_item, "album", None)
    media.album = album.name if album else ""
    if queue_item.image:
        # the image format needs to be 500x500 jpeg for maximum compatibility with players
        # we prefer the imageproxy on the streamserver here because this request is sent
        # to the player itself which may not be able to reach the regular webserver
        media.image_url = self.mass.metadata.get_image_url(
            queue_item.image, size=500, prefer_stream_server=True
        )
    return media
1513
async def get_artist_tracks(self, artist: Artist) -> list[Track]:
    """Return the tracks to play for the given artist, in random order.

    The core config value CONF_DEFAULT_ENQUEUE_SELECT_ARTIST selects the source:
    the tracks of the artist ("library_tracks"/"all_tracks") or the tracks of
    all albums of the artist ("library_album_tracks"/"all_album_tracks").
    Any other value results in an empty list.
    """
    selection = self.mass.config.get_raw_core_config_value(
        self.domain,
        CONF_DEFAULT_ENQUEUE_SELECT_ARTIST,
        ENQUEUE_SELECT_ARTIST_DEFAULT_VALUE,
    )
    self.logger.info("Fetching tracks to play for artist %s", artist.name)
    if selection in ("library_tracks", "all_tracks"):
        artist_tracks = await self.mass.music.artists.tracks(
            artist.item_id,
            artist.provider,
            in_library_only=selection == "library_tracks",
        )
        random.shuffle(artist_tracks)
        return artist_tracks
    if selection not in ("library_album_tracks", "all_album_tracks"):
        return []
    collected: list[Track] = []
    artist_albums = await self.mass.music.artists.albums(
        artist.item_id,
        artist.provider,
        in_library_only=selection == "library_album_tracks",
    )
    for artist_album in artist_albums:
        tracks_of_album = await self.mass.music.albums.tracks(
            artist_album.item_id, artist_album.provider
        )
        for album_track in tracks_of_album:
            # a track may be present on multiple albums, only add it once
            if album_track in collected:
                continue
            collected.append(album_track)
    random.shuffle(collected)
    return collected
1548
async def get_album_tracks(self, album: Album, start_item: str | None) -> list[Track]:
    """Return the available tracks of the given album, based on user preference.

    - album: the album to get the tracks for.
    - start_item: item_id or uri of the track to start with, available tracks
      that are positioned before this track are left out.

    Only library tracks are returned when the core config value
    CONF_DEFAULT_ENQUEUE_SELECT_ALBUM is set to "library_tracks".
    """
    selection = self.mass.config.get_raw_core_config_value(
        self.domain,
        CONF_DEFAULT_ENQUEUE_SELECT_ALBUM,
        ENQUEUE_SELECT_ALBUM_DEFAULT_VALUE,
    )
    self.logger.info("Fetching tracks to play for album %s", album.name)
    album_tracks = await self.mass.music.albums.tracks(
        item_id=album.item_id,
        provider_instance_id_or_domain=album.provider,
        in_library_only=selection == "library_tracks",
    )
    result: list[Track] = []
    # as long as the start item is not reached, tracks are left out
    waiting_for_start = start_item is not None
    for album_track in album_tracks:
        if not album_track.available:
            continue
        if waiting_for_start and start_item in (album_track.item_id, album_track.uri):
            waiting_for_start = False
        if not waiting_for_start:
            result.append(album_track)
    return result
1575
async def get_playlist_tracks(self, playlist: Playlist, start_item: str | None) -> list[Track]:
    """Return the available tracks of the given playlist.

    - playlist: the playlist to get the tracks for.
    - start_item: item_id or uri of the track to start with, available tracks
      that are positioned before this track are left out.
    """
    self.logger.info("Fetching tracks to play for playlist %s", playlist.name)
    result: list[Track] = []
    # as long as the start item is not reached, tracks are left out
    waiting_for_start = start_item is not None
    # TODO: Handle other sort options etc.
    async for playlist_track in self.mass.music.playlists.tracks(
        playlist.item_id, playlist.provider
    ):
        if not playlist_track.available:
            continue
        if waiting_for_start and start_item in (playlist_track.item_id, playlist_track.uri):
            waiting_for_start = False
        if not waiting_for_start:
            result.append(playlist_track)
    return result
1596
async def get_audiobook_resume_point(
    self, audio_book: Audiobook, chapter: str | int | None = None, userid: str | None = None
) -> int:
    """Return the position (in milliseconds) to start playback of an audio book at.

    - audio_book: the audio book to get the resume point for.
    - chapter: position of an explicitly selected chapter, in that case the
      start of that chapter is returned.
    - userid: passed on when the stored resume position is looked up.

    Without a chapter the stored resume position is returned, or 0 if the
    audio book was fully played.
    Raises InvalidDataError if the selected chapter can not be resolved.
    """
    self.logger.debug(
        "Fetching resume point to play for audio book %s",
        audio_book.name,
    )
    if chapter is None:
        full_played, resume_position_ms = await self.mass.music.get_resume_position(
            audio_book, userid=userid
        )
        return 0 if full_played else resume_position_ms
    # user explicitly selected a chapter to play
    start_chapter = int(chapter) if isinstance(chapter, str) else chapter
    chapters = audio_book.metadata.chapters
    selected = None
    if chapters:
        selected = next((x for x in chapters if x.position == start_chapter), None)
    if selected:
        # chapter start is multiplied by 1000 to get milliseconds
        return int(selected.start * 1000)
    raise InvalidDataError(
        f"Unable to resolve chapter to play for Audiobook {audio_book.name}"
    )
1618
async def get_next_podcast_episodes(
    self,
    podcast: Podcast | None,
    episode: PodcastEpisode | str | None,
    userid: str | None = None,
) -> UniqueList[PodcastEpisode]:
    """Return (next) episode(s) and resume point for given podcast.

    - podcast: the podcast to play, None if only a single episode is requested.
    - episode: the episode to start with (object, uri or item_id). When omitted,
      playback starts at the first episode that is not fully played (or at the
      first episode if all episodes are fully played).
    - userid: passed on when the stored resume position is looked up.

    Returns the episode to start with (with its resume position set) followed by
    all episodes after it. Without a podcast, only the given episode is returned.
    Raises InvalidDataError if neither a podcast nor an episode object is given
    or if no episode to play can be resolved.
    """
    if podcast is None and isinstance(episode, str | NoneType):
        raise InvalidDataError("Either podcast or episode must be provided")
    if podcast is None:
        # single podcast episode requested
        assert isinstance(episode, PodcastEpisode)  # checked above
        self.logger.debug(
            "Fetching resume point to play for Podcast episode %s",
            episode.name,
        )
        fully_played, resume_position_ms = await self.mass.music.get_resume_position(
            episode, userid=userid
        )
        episode.fully_played = fully_played
        episode.resume_position_ms = 0 if fully_played else resume_position_ms
        return UniqueList([episode])
    # podcast with optional start episode requested
    self.logger.debug(
        "Fetching episode(s) and resume point to play for Podcast %s",
        podcast.name,
    )
    all_episodes = [
        x async for x in self.mass.music.podcasts.episodes(podcast.item_id, podcast.provider)
    ]
    all_episodes.sort(key=lambda x: x.position)
    resolved_episode: PodcastEpisode | None = None
    if isinstance(episode, (PodcastEpisode, str)):
        # if a episode was provided, a user explicitly selected a episode to play
        # so we need to find that episode in the list
        if isinstance(episode, PodcastEpisode):
            resolved_episode = next((x for x in all_episodes if x.uri == episode.uri), None)
        else:
            resolved_episode = next(
                (x for x in all_episodes if episode in (x.uri, x.item_id)), None
            )
        if resolved_episode:
            # ensure we have accurate resume info
            fully_played, resume_position_ms = await self.mass.music.get_resume_position(
                resolved_episode, userid=userid
            )
            resolved_episode.resume_position_ms = 0 if fully_played else resume_position_ms
    else:
        # get first episode that is not fully played
        for candidate in all_episodes:
            if candidate.fully_played:
                continue
            # ensure we have accurate resume info
            fully_played, resume_position_ms = await self.mass.music.get_resume_position(
                candidate, userid=userid
            )
            if fully_played:
                continue
            candidate.resume_position_ms = resume_position_ms
            resolved_episode = candidate
            break
        else:
            # no episodes found that are not fully played, so we start at the beginning
            resolved_episode = all_episodes[0] if all_episodes else None
    if resolved_episode is None:
        raise InvalidDataError(f"Unable to resolve episode to play for Podcast {podcast.name}")
    # return the (remaining) episode(s) to play, starting at the resolved episode
    start_position = all_episodes.index(resolved_episode)
    return UniqueList(all_episodes[start_position:])
1698
def _get_next_index(
    self,
    queue_id: str,
    cur_index: int | None,
    is_skip: bool = False,
    allow_repeat: bool = True,
) -> int | None:
    """
    Determine which queue index follows ``cur_index``, honouring repeat settings.

    :param queue_id: ID of the queue to inspect.
    :param cur_index: Index of the current item, or None when it is unknown.
    :param is_skip: When True, repeat-one does not pin the index to the current item.
    :param allow_repeat: When False, positions that could only continue
        through a repeat mode yield None instead.
    :return: The next index, or None if there are no (more) items in the queue.
    """
    queue = self._queues[queue_id]
    items = self._queue_items[queue_id]
    if cur_index is None or not items:
        # nothing queued (or no known position): there is no next index
        return None
    repeat_mode = queue.repeat_mode
    if repeat_mode == RepeatMode.ONE and not is_skip:
        # repeat single track: stay on the current index if repeating is allowed
        return cur_index if allow_repeat else None
    last_index = len(items) - 1
    if cur_index < last_index:
        # regular case: simply advance one position
        return cur_index + 1
    # current index is the last one (or beyond): only repeat-all wraps around
    wraps_around = allow_repeat and repeat_mode == RepeatMode.ALL
    return 0 if wraps_around else None
1727
def get_next_item(self, queue_id: str, cur_index: int | str) -> QueueItem | None:
    """
    Look up the QueueItem that should follow the given position.

    :param queue_id: ID of the queue to inspect.
    :param cur_index: Current position, either a numeric index or a queue item id.
    :return: The first upcoming item that exists and is available, or None.
    """
    if isinstance(cur_index, str):
        start_index = self.index_by_id(queue_id, cur_index)
        if start_index is None:
            # unknown item id, nothing to resolve
            return None
    else:
        start_index = cur_index
    # look ahead a limited number of positions, stepping over
    # missing or unavailable items (set by load_next track logic)
    for offset in range(5):
        candidate_index = self._get_next_index(queue_id, start_index + offset)
        if candidate_index is None:
            return None
        candidate = self.get_item(queue_id, candidate_index)
        if candidate is not None and candidate.available:
            return candidate
    return None
1750
async def _fill_radio_tracks(self, queue_id: str) -> None:
    """
    Fetch (additional) radio tracks and load them into the queue.

    :param queue_id: ID of the queue to refill.
    """
    self.logger.debug("Filling radio tracks for queue %s", queue_id)
    radio_tracks = await self._get_radio_tracks(queue_id=queue_id, is_initial_radio_mode=False)
    # unavailable tracks are dropped before they are turned into queue items
    new_items = [
        QueueItem.from_media_item(queue_id, track) for track in radio_tracks if track.available
    ]
    # the insert position is derived from the number of items currently in the queue
    insert_position = len(self._queue_items[queue_id]) + 1
    await self.load(queue_id, new_items, insert_at_index=insert_position)
1765
def _enqueue_next_item(self, queue_id: str, next_item: QueueItem | None) -> None:
    """
    Schedule handing the upcoming queue item over to the player.

    Nothing is scheduled when there is no next item or when the queue is in flow mode.

    :param queue_id: ID of the queue (also used as the player id).
    :param next_item: The item to enqueue on the player, if any.
    """
    if not next_item:
        return
    queue = self._queues[queue_id]
    if queue.flow_mode:
        # per-item enqueueing does not apply to flow mode
        return

    async def _push_to_player(item: QueueItem) -> None:
        await self.mass.players.enqueue_next_media(
            player_id=queue_id,
            media=await self.player_media_from_queue_item(item, False),
        )
        if queue.next_item_id_enqueued == item.queue_item_id:
            # this item was already registered as enqueued
            return
        queue.next_item_id_enqueued = item.queue_item_id
        self.logger.debug(
            "Enqueued next track %s on queue %s",
            item.name,
            self._queues[queue_id].display_name,
        )

    # run slightly delayed; the task id is shared per queue
    self.mass.call_later(
        0.5,
        _push_to_player,
        next_item,
        task_id=f"enqueue_next_item_{queue_id}",
    )
1792
def _preload_next_item(self, queue_id: str, item_id_in_buffer: str) -> None:
    """
    Start a background task that loads and enqueues the item after the buffered one.

    The task first waits (polling once per second, at most 120 times) until the
    buffered item is reported as the queue's current item, then loads the next
    queue item and hands it to the player. No task is started for unknown items,
    radio items or items without a duration.

    :param queue_id: ID of the queue to preload for.
    :param item_id_in_buffer: Queue item id of the item currently in the player's buffer.
    """
    queue = self._queues[queue_id]

    async def _preload_streamdetails(item_id_in_buffer: str) -> None:
        try:
            # Wait until the buffered item is the one actually playing. This avoids
            # a race where the next item is preloaded while the player is still
            # busy preloading the previously enqueued item.
            for _ in range(120):
                playing = queue.current_item
                if not playing:
                    return  # guard
                if playing.queue_item_id == item_id_in_buffer:
                    break
                await asyncio.sleep(1)
            next_item = await self.load_next_queue_item(queue_id, item_id_in_buffer)
            if next_item:
                self.logger.debug(
                    "Preloaded next item %s for queue %s",
                    next_item.name,
                    queue.display_name,
                )
                # hand the preloaded item over to the player
                self._enqueue_next_item(queue_id, next_item)
        except QueueEmpty:
            return

    buffered_item = self.get_item(queue_id, item_id_in_buffer)
    if not buffered_item:
        # should not happen, guard anyway
        return
    if buffered_item.media_type == MediaType.RADIO or not buffered_item.duration:
        # radio items or items without a duration: nothing to preload
        return

    # a newer preload request for the same queue replaces a pending one
    self.mass.create_task(
        _preload_streamdetails,
        item_id_in_buffer,
        task_id=f"preload_next_item_{queue_id}",
        abort_existing=True,
    )
1841
async def _resolve_media_items(
    self,
    media_item: MediaItemType | ItemMapping | BrowseFolder,
    start_item: str | None = None,
    userid: str | None = None,
    queue_id: str | None = None,
) -> list[MediaItemType]:
    """
    Resolve/unwrap a media item into the list of items to enqueue.

    Playlists, artists, albums, podcasts and folders are expanded into their
    playable children. Audiobooks and podcast episodes are returned with their
    resume position refreshed. Playing a playlist, artist, album or podcast is
    also recorded (in a background task) as a user initiated play for the
    given user and queue.

    :param media_item: The item to resolve; an ItemMapping is first looked up by its URI.
    :param start_item: Optional item to start at, forwarded to the playlist,
        album, audiobook and podcast resolvers.
    :param userid: Optional ID of the user on whose behalf the media is played.
    :param queue_id: Optional ID of the queue the media is played on.
    :return: The resolved media items, in the order they should be enqueued.
    :raises InvalidDataError: If an ItemMapping without a URI is given.
    """
    # resolve ItemMapping to the full media item
    if isinstance(media_item, ItemMapping):
        if media_item.uri is None:
            raise InvalidDataError("ItemMapping has no URI")
        media_item = await self.mass.music.get_item_by_uri(media_item.uri)
    if media_item.media_type == MediaType.PLAYLIST:
        media_item = cast("Playlist", media_item)
        self.mass.create_task(
            self.mass.music.mark_item_played(
                media_item, userid=userid, queue_id=queue_id, user_initiated=True
            )
        )
        return list(await self.get_playlist_tracks(media_item, start_item))
    if media_item.media_type == MediaType.ARTIST:
        media_item = cast("Artist", media_item)
        # pass the userid along, consistent with the playlist/album/podcast branches,
        # so the play is attributed to the user that requested it
        self.mass.create_task(
            self.mass.music.mark_item_played(
                media_item, userid=userid, queue_id=queue_id, user_initiated=True
            )
        )
        return list(await self.get_artist_tracks(media_item))
    if media_item.media_type == MediaType.ALBUM:
        media_item = cast("Album", media_item)
        self.mass.create_task(
            self.mass.music.mark_item_played(
                media_item, userid=userid, queue_id=queue_id, user_initiated=True
            )
        )
        return list(await self.get_album_tracks(media_item, start_item))
    if media_item.media_type == MediaType.AUDIOBOOK:
        media_item = cast("Audiobook", media_item)
        # ensure we grab the correct/latest resume point info
        media_item.resume_position_ms = await self.get_audiobook_resume_point(
            media_item, start_item, userid=userid
        )
        return [media_item]
    if media_item.media_type == MediaType.PODCAST:
        media_item = cast("Podcast", media_item)
        self.mass.create_task(
            self.mass.music.mark_item_played(
                media_item, userid=userid, queue_id=queue_id, user_initiated=True
            )
        )
        return list(await self.get_next_podcast_episodes(media_item, start_item, userid=userid))
    if media_item.media_type == MediaType.PODCAST_EPISODE:
        media_item = cast("PodcastEpisode", media_item)
        # a single episode: no parent podcast is passed
        return list(await self.get_next_podcast_episodes(None, media_item, userid=userid))
    if media_item.media_type == MediaType.FOLDER:
        media_item = cast("BrowseFolder", media_item)
        return list(await self._get_folder_tracks(media_item))
    # all other: single track or radio item
    return [cast("MediaItemType", media_item)]
1900
async def _try_resume_from_playlog(self, queue: PlayerQueue) -> bool:
    """Try to resume playback from the playlog when the queue is empty.

    User-initiated recently played items are looked up with progressively
    looser filters, in this order:
    1. userid AND queue_id (only when the queue has a userid)
    2. queue_id only
    3. userid only (only when the queue has a userid)
    4. any recently played item

    :param queue: The queue to resume playback on.
    :return: True if playback was started, False otherwise.
    """
    user = queue.userid
    # start with the filters that never need a user ...
    attempts: list[tuple[str | None, str | None, str]] = [
        (None, queue.queue_id, "queue_id match"),
        (None, None, "any recent item"),
    ]
    if user:
        # ... and slot the user specific ones in front of their generic counterpart
        attempts.insert(0, (user, queue.queue_id, "userid + queue_id match"))
        attempts.insert(2, (user, None, "userid match"))

    for userid, queue_id, match_type in attempts:
        recent_items = await self.mass.music.recently_played(
            limit=5,
            fully_played_only=False,
            user_initiated_only=True,
            userid=userid,
            queue_id=queue_id,
        )
        for item in recent_items:
            if not item.uri:
                continue
            try:
                await self.play_media(queue.queue_id, item)
            except MusicAssistantError as err:
                # this item could not be played, try the next candidate
                self.logger.debug("Failed to resume with item %s: %s", item.name, err)
            else:
                self.logger.info(
                    "Resumed queue %s from playlog (%s)", queue.display_name, match_type
                )
                return True

    return False
1944
async def _get_radio_tracks(
    self, queue_id: str, is_initial_radio_mode: bool = False
) -> list[Track]:
    """
    Call the registered music providers for dynamic (radio) tracks.

    A set of base tracks is determined from the queue's radio source, up to
    five of them are sampled, and similar tracks are requested for each
    sampled base track. The result mixes base tracks (initial radio mode
    only) with randomly sampled similar tracks.

    :param queue_id: ID of the queue to fetch radio tracks for.
    :param is_initial_radio_mode: True when radio mode is being started; in
        that case the sampled base tracks are included in the result.
    :return: The tracks to add to the queue; empty when the queue has no radio source.
    :raises UnsupportedFeaturedException: If no base tracks could be determined.
    """
    queue = self._queues[queue_id]
    # all Track media items currently in the queue (used as play history / exclusion list)
    queue_track_items: list[Track] = [
        q.media_item
        for q in self._queue_items[queue_id]
        if q.media_item and isinstance(q.media_item, Track)
    ]
    if not queue.radio_source:
        # this may happen during race conditions as this method is called delayed
        return []
    self.logger.info(
        "Fetching radio tracks for queue %s based on: %s",
        queue.display_name,
        ", ".join([x.name for x in queue.radio_source]),
    )

    # Get user's preferred provider instances for steering provider selection
    preferred_provider_instances: list[str] | None = None
    if (
        queue.userid
        and (playback_user := await self.mass.webserver.auth.get_user(queue.userid))
        and playback_user.provider_filter
    ):
        preferred_provider_instances = playback_user.provider_filter

    available_base_tracks: list[Track] = []
    base_track_sample_size = 5
    # Some providers have very deterministic similar track algorithms when providing
    # a single track item. When we have a radio mode based on 1 track and we have to
    # refill the queue (ie not initial radio mode), we use the play history as base tracks
    if (
        len(queue.radio_source) == 1
        and queue.radio_source[0].media_type == MediaType.TRACK
        and not is_initial_radio_mode
    ):
        available_base_tracks = queue_track_items
    else:
        # Grab all the available base tracks based on the selected source items.
        # shuffle the source items, just in case
        for radio_item in random.sample(queue.radio_source, len(queue.radio_source)):
            ctrl = self.mass.music.get_controller(radio_item.media_type)
            try:
                available_base_tracks += [
                    track
                    for track in await ctrl.radio_mode_base_tracks(
                        radio_item,  # type: ignore[arg-type]
                        preferred_provider_instances,
                    )
                    # Avoid duplicate base tracks
                    if track not in available_base_tracks
                ]
            except UnsupportedFeaturedException as err:
                # this source item cannot provide base tracks; try the others
                self.logger.debug(
                    "Skip loading radio items for %s: %s ",
                    radio_item.uri,
                    str(err),
                )
    if not available_base_tracks:
        raise UnsupportedFeaturedException("Radio mode not available for source items")

    # Sample tracks from the base tracks, which will be used to calculate the dynamic ones
    base_tracks = random.sample(
        available_base_tracks,
        min(base_track_sample_size, len(available_base_tracks)),
    )
    # Use a set to avoid duplicate dynamic tracks
    dynamic_tracks: set[Track] = set()
    # Use base tracks + Trackcontroller to obtain similar tracks for every base Track
    # First pass runs with allow_lookup=False; the second pass (allow_lookup=True)
    # only runs when the first pass produced no dynamic tracks at all.
    for allow_lookup in (False, True):
        if dynamic_tracks:
            break
        for base_track in base_tracks:
            try:
                _similar_tracks = await self.mass.music.tracks.similar_tracks(
                    base_track.item_id,
                    base_track.provider,
                    allow_lookup=allow_lookup,
                    preferred_provider_instances=preferred_provider_instances,
                )
            except MediaNotFoundError:
                # Some providers don't have similar tracks for all items. For example,
                # Tidal can sometimes return a 404 when the 'similar_tracks' endpoint is called.
                # in that case, just skip the track.
                self.logger.debug("Similar tracks not found for track %s", base_track.name)
                continue
            for track in _similar_tracks:
                if (
                    track not in base_tracks
                    # Exclude tracks we have already played / queued
                    and track not in queue_track_items
                    # Ignore tracks that are too long for radio mode, e.g. mixes
                    and track.duration <= RADIO_TRACK_MAX_DURATION_SECS
                ):
                    dynamic_tracks.add(track)
            # stop asking for more similar tracks once 50 candidates are collected
            if len(dynamic_tracks) >= 50:
                break
    queue_tracks: list[Track] = []
    dynamic_tracks_list = list(dynamic_tracks)
    # Only include the sampled base tracks when the radio mode is first initialized
    if is_initial_radio_mode:
        queue_tracks += [base_tracks[0]]
        # Exhaust base tracks with the pattern of BDDBDDBDD (1 base track + 2 dynamic tracks)
        if len(base_tracks) > 1:
            for base_track in base_tracks[1:]:
                queue_tracks += [base_track]
                # NOTE(review): each iteration samples from the full dynamic list, so the
                # same dynamic track may be picked for more than one base track - confirm
                # whether duplicates in the resulting list are acceptable.
                if len(dynamic_tracks_list) > 2:
                    queue_tracks += random.sample(dynamic_tracks_list, 2)
                else:
                    queue_tracks += dynamic_tracks_list
    # Add dynamic tracks to the queue, make sure to exclude already picked tracks
    remaining_dynamic_tracks = [t for t in dynamic_tracks_list if t not in queue_tracks]
    if remaining_dynamic_tracks:
        # at most 25 of the remaining dynamic tracks are added, in random order
        queue_tracks += random.sample(
            remaining_dynamic_tracks, min(len(remaining_dynamic_tracks), 25)
        )
    return queue_tracks
2064
async def _get_folder_tracks(self, folder: BrowseFolder) -> list[Track]:
    """
    Collect the tracks found by resolving every playable item of a browse folder.

    :param folder: The browse folder whose path is browsed.
    :return: All Track items obtained from the folder's playable entries.
    """
    self.logger.info("Fetching tracks to play for folder %s", folder.name)
    folder_tracks: list[Track] = []
    for entry in await self.mass.music.browse(folder.path):
        if entry.is_playable:
            # each playable entry is resolved like any other media item;
            # only Track results are kept
            resolved_items = await self._resolve_media_items(entry)
            folder_tracks.extend(x for x in resolved_items if isinstance(x, Track))
    return folder_tracks
2080
def _update_queue_from_player(
    self,
    player: Player,
) -> None:
    """
    Update the Queue when the player state changed.

    Copies basic properties from the player onto the queue, derives the
    current/next item and elapsed time, compares the result with the
    previously stored state and - only when something changed - signals
    updates and runs the follow-up handlers (DSP refresh, stream metadata
    refresh, playback progress report, end-of-queue handling, radio refill).

    :param player: The player whose state changed; its player_id is used as queue id.
    """
    queue_id = player.player_id
    queue = self._queues[queue_id]

    # basic properties
    queue.display_name = player.display_name
    queue.available = player.available
    queue.items = len(self._queue_items[queue_id])

    # an inactive queue is always reported as idle, regardless of the player state
    queue.state = (
        player.playback_state or PlaybackState.IDLE if queue.active else PlaybackState.IDLE
    )
    # update current item/index from player report
    if queue.active and queue.state in (
        PlaybackState.PLAYING,
        PlaybackState.PAUSED,
    ):
        # NOTE: If the queue is not playing (yet) we will not update the current index
        # to ensure we keep the previously known current index
        if queue.flow_mode:
            # flow mode active, the player is playing one long stream
            # so we need to calculate the current index and elapsed time
            current_index, elapsed_time = self._get_flow_queue_stream_index(queue, player)
        elif item_id := self._parse_player_current_item_id(queue_id, player):
            # normal mode, the player itself will report the current item
            elapsed_time = int(player.corrected_elapsed_time or 0)
            current_index = self.index_by_id(queue_id, item_id)
        else:
            # this may happen if the player is still transitioning between tracks
            # we ignore this for now and keep the current index as is
            return

        # get current/next item based on current index
        queue.current_index = current_index
        queue.current_item = current_item = self.get_item(queue_id, current_index)
        queue.next_item = (
            self.get_next_item(queue_id, current_index)
            if current_item and current_index is not None
            else None
        )

        # correct elapsed time when seeking
        if (
            not queue.flow_mode
            and current_item
            and current_item.streamdetails
            and current_item.streamdetails.seek_position
        ):
            elapsed_time += current_item.streamdetails.seek_position
        queue.elapsed_time = elapsed_time
        queue.elapsed_time_last_updated = time.time()

    elif not queue.current_item and queue.current_index is not None:
        # not playing/paused: restore current/next item from the known index if missing
        current_index = queue.current_index
        queue.current_item = current_item = self.get_item(queue_id, current_index)
        queue.next_item = (
            self.get_next_item(queue_id, current_index)
            if current_item and current_index is not None
            else None
        )

    # This is enough to detect any changes in the DSPDetails
    # (so child count changed, or any output format changed)
    output_formats = []
    if output_format := player.extra_data.get("output_format"):
        output_formats.append(str(output_format))
    for child_id in player.group_members:
        if (child := self.mass.players.get(child_id)) and (
            output_format := child.extra_data.get("output_format")
        ):
            output_formats.append(str(output_format))
        else:
            # child not found or no output format known for it
            output_formats.append("unknown")

    # basic throttle: do not send state changed events if queue did not actually change
    # (an idle placeholder state is used when no previous state is stored)
    prev_state: CompareState = self._prev_states.get(
        queue_id,
        CompareState(
            queue_id=queue_id,
            state=PlaybackState.IDLE,
            current_item_id=None,
            next_item_id=None,
            current_item=None,
            elapsed_time=0,
            last_playing_elapsed_time=0,
            stream_title=None,
            codec_type=None,
            output_formats=None,
        ),
    )
    # update last_playing_elapsed_time only when the player is actively playing
    # use corrected_elapsed_time which accounts for time since last update
    # this preserves the last known elapsed time when transitioning to idle/paused
    prev_playing_elapsed = prev_state["last_playing_elapsed_time"]
    prev_item_id = prev_state["current_item_id"]
    current_item_id = queue.current_item.queue_item_id if queue.current_item else None
    if queue.state == PlaybackState.PLAYING:
        current_elapsed = int(queue.corrected_elapsed_time)
        if current_item_id != prev_item_id:
            # new track started, reset the elapsed time tracker
            last_playing_elapsed_time = current_elapsed
        else:
            # same track, use the max of current and previous to handle timing issues
            last_playing_elapsed_time = max(current_elapsed, prev_playing_elapsed)
    else:
        last_playing_elapsed_time = prev_playing_elapsed
    new_state = CompareState(
        queue_id=queue_id,
        state=queue.state,
        current_item_id=queue.current_item.queue_item_id if queue.current_item else None,
        next_item_id=queue.next_item.queue_item_id if queue.next_item else None,
        current_item=queue.current_item,
        elapsed_time=int(queue.elapsed_time),
        last_playing_elapsed_time=last_playing_elapsed_time,
        stream_title=(
            queue.current_item.streamdetails.stream_title
            if queue.current_item and queue.current_item.streamdetails
            else None
        ),
        codec_type=(
            queue.current_item.streamdetails.audio_format.codec_type
            if queue.current_item and queue.current_item.streamdetails
            else None
        ),
        output_formats=output_formats,
    )
    changed_keys = get_changed_keys(dict(prev_state), dict(new_state))
    # changes to these two keys alone never count as a queue change
    with suppress(KeyError):
        changed_keys.remove("next_item_id")
    with suppress(KeyError):
        changed_keys.remove("last_playing_elapsed_time")

    # store the new state
    if queue.active:
        self._prev_states[queue_id] = new_state
    else:
        self._prev_states.pop(queue_id, None)

    # return early if nothing changed
    if len(changed_keys) == 0:
        return

    # signal update and store state
    send_update = True
    if changed_keys == {"elapsed_time"}:
        # only elapsed time changed, do not send full queue update
        send_update = False
        prev_time = prev_state.get("elapsed_time") or 0
        cur_time = new_state.get("elapsed_time") or 0
        if abs(cur_time - prev_time) > 2:
            # send dedicated event for time updates when seeking
            self.mass.signal_event(
                EventType.QUEUE_TIME_UPDATED,
                object_id=queue_id,
                data=queue.elapsed_time,
            )
            # also signal update to the player itself so it can update its current_media
            self.mass.players.trigger_player_update(queue_id)

    if send_update:
        self.signal_update(queue_id)

    if "output_formats" in changed_keys:
        # refresh DSP details since they may have changed
        dsp = get_stream_dsp_details(self.mass, queue_id)
        if queue.current_item and queue.current_item.streamdetails:
            queue.current_item.streamdetails.dsp = dsp
        if queue.next_item and queue.next_item.streamdetails:
            queue.next_item.streamdetails.dsp = dsp

    # handle updating stream_metadata if needed
    # (only when a callback is set and the update interval has elapsed, or it never ran)
    if (
        queue.current_item
        and (streamdetails := queue.current_item.streamdetails)
        and streamdetails.stream_metadata_update_callback
        and (
            streamdetails.stream_metadata_last_updated is None
            or (
                time.time() - streamdetails.stream_metadata_last_updated
                >= streamdetails.stream_metadata_update_interval
            )
        )
    ):
        streamdetails.stream_metadata_last_updated = time.time()
        self.mass.create_task(
            streamdetails.stream_metadata_update_callback(
                streamdetails, int(queue.corrected_elapsed_time)
            )
        )

    # handle sending a playback progress report
    # we do this every 30 seconds or when the state changes
    if (
        changed_keys.intersection({"state", "current_item_id"})
        or int(queue.elapsed_time) % 30 == 0
    ):
        self._handle_playback_progress_report(queue, prev_state, new_state)

    # check if we need to clear the queue if we reached the end
    if "state" in changed_keys and queue.state == PlaybackState.IDLE:
        self._handle_end_of_queue(queue, prev_state, new_state)

    # watch dynamic radio items refill if needed
    if "current_item_id" in changed_keys:
        # auto enable radio mode if dont stop the music is enabled
        if (
            queue.dont_stop_the_music_enabled
            and queue.enqueued_media_items
            and queue.current_index is not None
            and (queue.items - queue.current_index) <= 1
        ):
            # We have received the last item in the queue and Don't stop the music is enabled
            # set the played media item(s) as radio items (which will refill the queue)
            # note that this will fail if there are no media items for which we have
            # a dynamic radio source.
            self.logger.debug(
                "End of queue detected and Don't stop the music is enabled for %s"
                " - setting enqueued media items as radio source: %s",
                queue.display_name,
                ", ".join([x.uri for x in queue.enqueued_media_items]),  # type: ignore[misc]  # uri set in __post_init__
            )
            queue.radio_source = queue.enqueued_media_items
        # auto fill radio tracks if less than 5 tracks left in the queue
        if (
            queue.radio_source
            and queue.current_index is not None
            and (queue.items - queue.current_index) < 5
        ):
            # the refill is scheduled with a 5 second delay under a per-queue task id
            task_id = f"fill_radio_tracks_{queue_id}"
            self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id)
2315
def _get_flow_queue_stream_index(
    self, queue: PlayerQueue, player: Player
) -> tuple[int | None, int]:
    """
    Work out the current queue index and track position while flow mode is active.

    :param queue: The queue that is being streamed in flow mode.
    :param player: The player reporting the total elapsed time of the flow stream.
    :return: Tuple of (current queue index, elapsed seconds within the current track).
    """
    total_elapsed = player.corrected_elapsed_time or 0
    stream_log = queue.flow_mode_stream_log
    if queue.current_index is None and not stream_log:
        # nothing known and nothing streamed yet: report the queue's own values
        return queue.current_index, int(queue.elapsed_time)

    # Every track streamed/buffered to the player has an entry in the stream log
    # holding the queue item id and the number of seconds streamed. Walking the
    # log tells us which entry the total elapsed time falls into, based on the
    # seconds actually streamed (not the nominal duration). A repeated track
    # simply shows up in the log more than once.
    consumed = 0.0
    resolved_index: int | None = queue.current_index or 0
    position_in_track = 0.0
    for entry in stream_log:
        # NOTE: 'seconds_streamed' can actually be 0 if there was a stream error!
        if entry.seconds_streamed is not None:
            entry_length = entry.seconds_streamed
        else:
            entry_length = entry.duration or 3600 * 24 * 7
        if total_elapsed > (entry_length + consumed):
            # this entry has been played in full, carry on with the next one
            consumed += entry_length
            continue
        # the elapsed time falls within this entry: this is our track
        resolved_index = self.index_by_id(queue.queue_id, entry.queue_item_id)
        matched_item = self.get_item(queue.queue_id, resolved_index)
        # account for seeking by adding the skipped/seeked seconds
        if matched_item and matched_item.streamdetails:
            seconds_skipped = matched_item.streamdetails.seek_position
        else:
            seconds_skipped = 0
        position_in_track = total_elapsed + seconds_skipped - consumed
        break
    if player.playback_state != PlaybackState.PLAYING:
        # while not playing the elapsed time cannot be trusted,
        # so fall back to the values already known on the queue
        return queue.current_index, int(queue.elapsed_time)
    return resolved_index, int(position_in_track)
2360
def _parse_player_current_item_id(self, queue_id: str, player: Player) -> str | None:
    """
    Derive the id of the currently playing QueueItem from the player's current media.

    :param queue_id: ID of the queue the player is expected to be playing from.
    :param player: The player whose raw current media is inspected.
    :return: The queue item id, or None when it cannot be determined.
    """
    # YES, player._current_media is used on purpose here because we need the raw metadata
    media = player._current_media
    if not media:
        return None
    item_id = media.queue_item_id
    # prefer queue_id and queue_item_id within the current media
    if media.source_id == queue_id and item_id:
        return item_id
    uri = media.uri
    if not uri:
        # every remaining strategy needs a uri
        return None
    # special case for sonos players
    if uri.startswith(f"mass:{queue_id}"):
        return item_id or uri.split(":")[-1]
    if queue_id not in uri:
        return None
    candidates: list[str] = []
    if self.mass.streams.base_url in uri:
        # mass stream url: last path segment without its extension
        candidates.append(uri.rsplit("/")[-1].split(".")[0])
    if "/" in uri:
        # queue_id/item_id combi: the segment after the first slash
        candidates.append(uri.split("/")[1])
    # a candidate only counts when it is a known item of this queue
    for candidate in candidates:
        if self.get_item(queue_id, candidate):
            return candidate
    return None
2394
def _handle_end_of_queue(
    self, queue: PlayerQueue, prev_state: CompareState, new_state: CompareState
) -> None:
    """
    Clear the queue (delayed) when playback stopped at the end of its last item.

    :param queue: The queue whose state changed.
    :param prev_state: The compare state stored before this update.
    :param new_state: The compare state calculated for this update.
    """
    # only act on a transition from playing/paused to idle
    was_active = prev_state["state"] in (PlaybackState.PLAYING, PlaybackState.PAUSED)
    if not was_active or new_state["state"] != PlaybackState.IDLE:
        return
    # there must be no next item, and something must have been playing before
    if queue.next_item is not None or prev_state["current_item_id"] is None:
        return

    async def _clear_queue_delayed() -> None:
        # re-check once per second for 5 seconds; abort as soon as the
        # queue is no longer idle or a next item shows up
        for _ in range(5):
            await asyncio.sleep(1)
            if queue.state != PlaybackState.IDLE or queue.next_item is not None:
                return
        self.logger.info("End of queue reached, clearing items")
        self.clear(queue.queue_id)

    # All checks passed: playback stopped at the last (or single) track of the queue.
    # Now determine whether that item was fully played before clearing.

    # For flow mode the stream log tells whether the last track finished streaming,
    # which is more reliable than elapsed_time (that can be reset/incorrect).
    if queue.flow_mode and queue.flow_mode_stream_log:
        if queue.flow_mode_stream_log[-1].seconds_streamed is not None:
            self.mass.create_task(_clear_queue_delayed())
            return

    # Otherwise rely on prev_state values, the queue itself may already be updated/reset.
    prev_item = prev_state["current_item"]
    if not prev_item:
        # no previous item means the player already cleared it, safe to clear the queue
        self.mass.create_task(_clear_queue_delayed())
        return
    streamdetails = prev_item.streamdetails
    stream_duration = streamdetails.duration if streamdetails else None
    duration = stream_duration or prev_item.duration or 24 * 3600

    # last_playing_elapsed_time preserves the elapsed time from when the player was still
    # playing (elapsed_time itself may be reset to 0 on the transition to idle)
    seconds_played = int(prev_state["last_playing_elapsed_time"])
    # debounce: only clear when the last track played to within 5 seconds of its end
    if seconds_played >= duration - 5:
        self.mass.create_task(_clear_queue_delayed())
2452
2453 def _handle_playback_progress_report(
2454 self, queue: PlayerQueue, prev_state: CompareState, new_state: CompareState
2455 ) -> None:
2456 """Handle playback progress report."""
2457 # detect change in current index to report that a item has been played
2458 prev_item_id = prev_state["current_item_id"]
2459 cur_item_id = new_state["current_item_id"]
2460 if prev_item_id is None and cur_item_id is None:
2461 return
2462
2463 if prev_item_id is not None and prev_item_id != cur_item_id:
2464 # we have a new item, so we need report the previous one
2465 is_current_item = False
2466 item_to_report = prev_state["current_item"]
2467 seconds_played = int(prev_state["elapsed_time"])
2468 else:
2469 # report on current item
2470 is_current_item = True
2471 item_to_report = self.get_item(queue.queue_id, cur_item_id) or new_state["current_item"]
2472 seconds_played = int(new_state["elapsed_time"])
2473
2474 if not item_to_report:
2475 return # guard against invalid items
2476
2477 if not (media_item := item_to_report.media_item):
2478 # only report on media items
2479 return
2480 assert media_item.uri is not None # uri is set in __post_init__
2481
2482 if item_to_report.streamdetails and item_to_report.streamdetails.stream_error:
2483 # Ignore items that had a stream error
2484 return
2485
2486 if item_to_report.streamdetails and item_to_report.streamdetails.duration:
2487 duration = int(item_to_report.streamdetails.duration)
2488 else:
2489 duration = int(item_to_report.duration or 3 * 3600)
2490
2491 if seconds_played < 5:
2492 # ignore items that have been played less than 5 seconds
2493 # this also filters out a bounce effect where the previous item
2494 # gets reported with 0 elapsed seconds after a new item starts playing
2495 return
2496
2497 # determine if item is fully played
2498 # for podcasts and audiobooks we account for the last 60 seconds
2499 percentage_played = percentage(seconds_played, duration)
2500 if not is_current_item and item_to_report.media_type in (
2501 MediaType.AUDIOBOOK,
2502 MediaType.PODCAST_EPISODE,
2503 ):
2504 fully_played = seconds_played >= duration - 60
2505 elif not is_current_item:
2506 # 90% of the track must be played to be considered fully played
2507 fully_played = percentage_played >= 90
2508 else:
2509 fully_played = seconds_played >= duration - 10
2510
2511 is_playing = is_current_item and queue.state == PlaybackState.PLAYING
2512 if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
2513 self.logger.debug(
2514 "%s %s '%s' (%s) - Fully played: %s - Progress: %s (%s/%ss)",
2515 queue.display_name,
2516 "is playing" if is_playing else "played",
2517 item_to_report.name,
2518 item_to_report.uri,
2519 fully_played,
2520 f"{percentage_played}%",
2521 seconds_played,
2522 duration,
2523 )
2524 # add entry to playlog - this also handles resume of podcasts/audiobooks
2525 self.mass.create_task(
2526 self.mass.music.mark_item_played(
2527 media_item,
2528 fully_played=fully_played,
2529 seconds_played=seconds_played,
2530 is_playing=is_playing,
2531 userid=queue.userid,
2532 queue_id=queue.queue_id,
2533 user_initiated=False,
2534 )
2535 )
2536
2537 album: Album | ItemMapping | None = getattr(media_item, "album", None)
2538 # signal 'media item played' event,
2539 # which is useful for plugins that want to do scrobbling
2540 artists: list[Artist | ItemMapping] = getattr(media_item, "artists", [])
2541 artists_names = [a.name for a in artists]
2542 self.mass.signal_event(
2543 EventType.MEDIA_ITEM_PLAYED,
2544 object_id=media_item.uri,
2545 data=MediaItemPlaybackProgressReport(
2546 uri=media_item.uri,
2547 media_type=media_item.media_type,
2548 name=media_item.name,
2549 version=getattr(media_item, "version", None),
2550 artist=(
2551 getattr(media_item, "artist_str", None) or artists_names[0]
2552 if artists_names
2553 else None
2554 ),
2555 artists=artists_names,
2556 artist_mbids=[a.mbid for a in artists if a.mbid] if artists else None,
2557 album=album.name if album else None,
2558 album_mbid=album.mbid if album else None,
2559 album_artist=(album.artist_str if isinstance(album, Album) else None),
2560 album_artist_mbids=(
2561 [a.mbid for a in album.artists if a.mbid] if isinstance(album, Album) else None
2562 ),
2563 image_url=(
2564 self.mass.metadata.get_image_url(
2565 item_to_report.media_item.image, prefer_proxy=False
2566 )
2567 if item_to_report.media_item.image
2568 else None
2569 ),
2570 duration=duration,
2571 mbid=(getattr(media_item, "mbid", None)),
2572 seconds_played=seconds_played,
2573 fully_played=fully_played,
2574 is_playing=is_playing,
2575 userid=queue.userid,
2576 ),
2577 )
2578
2579
async def _smart_shuffle(items: list[QueueItem]) -> list[QueueItem]:
    """Shuffle queue items, avoiding identically named items next to each other.

    Best-effort approach: does a random shuffle first, then makes a limited
    number of passes in which the second item of each adjacent pair with the
    same name is swapped with a randomly chosen item elsewhere in the list.
    Adjacent duplicates can remain when no suitable swap position exists or
    when the passes run out.

    The given list is never modified; a new list is always returned.

    :param items: List of queue items to shuffle.
    """
    if len(items) < 2:
        # nothing to shuffle, but still return a new list like all other paths do
        return list(items)

    # Start with a random shuffle
    shuffled = random.sample(items, len(items))
    if len(shuffled) == 2:
        # two items can not be rearranged to separate duplicates
        return shuffled

    # Make a few passes to fix adjacent duplicates
    max_passes = 3
    for _ in range(max_passes):
        swapped = False
        # index based loop on purpose: the list is modified while walking it
        for i in range(len(shuffled) - 1):
            name = shuffled[i].name
            if shuffled[i + 1].name != name:
                continue
            # Found adjacent duplicate: the item at i stays in place and the item at
            # i + 1 is moved away. The target position must be at least 2 away from i
            # (otherwise the moved item lands next to its twin again) and it must hold
            # a differently named item (otherwise the swap would change nothing).
            swap_candidates = [
                j
                for j, candidate in enumerate(shuffled)
                if abs(j - i) >= 2 and candidate.name != name
            ]
            if swap_candidates:
                swap_pos = random.choice(swap_candidates)
                shuffled[i + 1], shuffled[swap_pos] = shuffled[swap_pos], shuffled[i + 1]
                swapped = True
        if not swapped:
            break
        # Yield to event loop between passes
        await asyncio.sleep(0)

    return shuffled
2613