/
/
/
1"""Per-player Plex remote control instances."""
2
3from __future__ import annotations
4
5import asyncio
6import logging
7import platform
8import re
9import time
10import uuid
11from collections.abc import Callable
12from typing import TYPE_CHECKING, Any
13from urllib.parse import urlparse
14
15from aiohttp import ClientTimeout, web
16from music_assistant_models.enums import (
17 EventType,
18 PlayerFeature,
19 PlayerType,
20 QueueOption,
21 RepeatMode,
22)
23from plexapi.playqueue import PlayQueue
24
25from .gdm import PlexGDMAdvertiser
26
27if TYPE_CHECKING:
28 from music_assistant_models.event import MassEvent
29
30 from music_assistant.providers.plex import PlexProvider
31
32
33LOGGER = logging.getLogger(__name__)
34
35
class PlayerRemoteInstance:
    """Single remote control instance for one MA player.

    Pairs a :class:`PlexRemoteControlServer` (HTTP command endpoint) with a
    :class:`PlexGDMAdvertiser` (discovery broadcast) for one Music Assistant
    player. Both are only created when ``remote_control`` is enabled.
    """

    def __init__(
        self,
        plex_provider: PlexProvider,
        ma_player_id: str,
        player_name: str,
        port: int,
        device_class: str = "speaker",
        remote_control: bool = False,
    ) -> None:
        """Initialize player remote instance.

        :param plex_provider: Plex provider instance.
        :param ma_player_id: Music Assistant player ID.
        :param player_name: Display name for the player.
        :param port: Port for the remote control server.
        :param device_class: Device class (speaker, phone, tablet, stb, tv, pc, cloud).
        :param remote_control: Whether to enable remote control.
        """
        self.plex_provider = plex_provider
        self.plex_server = plex_provider._plex_server
        self.ma_player_id = ma_player_id
        self.player_name = player_name
        self.port = port
        self.device_class = device_class
        self.remote_control = remote_control

        # uuid5 is deterministic, so the same provider instance + player always
        # yields the same client identifier.
        self.client_id = str(
            uuid.uuid5(
                uuid.NAMESPACE_DNS,
                f"music-assistant-plex-{plex_provider.instance_id}-{ma_player_id}",
            )
        )

        # Always define both attributes (even with remote control disabled) so
        # reading them can never raise AttributeError.
        self.server: PlexRemoteControlServer | None = None
        self.gdm: PlexGDMAdvertiser | None = None

    async def start(self) -> None:
        """Start this player's remote control.

        Does nothing when remote control is disabled. Otherwise starts the HTTP
        command server first and the GDM advertiser second.
        """
        if not self.remote_control:
            return

        LOGGER.info(
            f"Created PlexServer for '{self.player_name}' with client ID: {self.client_id}"
        )

        # HTTP endpoint that receives the Plex remote control commands
        self.server = PlexRemoteControlServer(
            plex_provider=self.plex_provider,
            port=self.port,
            client_id=self.client_id,
            ma_player_id=self.ma_player_id,
            device_class=self.device_class,
        )
        LOGGER.info(
            f"Remote control server for '{self.player_name}' bound to MA player: "
            f"{self.ma_player_id}"
        )

        await self.server.start()

        # Advertise via GDM only once the HTTP server is up
        mass_version = self.plex_provider.mass.version
        self.gdm = PlexGDMAdvertiser(
            instance_id=self.client_id,
            port=self.port,
            publish_ip=str(self.plex_provider.mass.streams.publish_ip),
            name=self.player_name,
            product="Music Assistant",
            # "0.0.0" is replaced by a fixed "1.0.0" placeholder
            version=mass_version if mass_version != "0.0.0" else "1.0.0",
        )
        self.gdm.start()

        LOGGER.info(f"Player '{self.player_name}' is now discoverable on port {self.port}")

    async def stop(self) -> None:
        """Stop this player's remote control.

        The references are dropped before shutting down, so a repeated call is a
        no-op, and the HTTP server is stopped even if stopping GDM raises.
        """
        if not self.remote_control:
            return

        gdm, self.gdm = self.gdm, None
        server, self.server = self.server, None

        try:
            if gdm:
                await gdm.stop()
        finally:
            if server:
                await server.stop()

        LOGGER.info(f"Stopped remote control for player '{self.player_name}'")
125
126
127class PlexRemoteControlServer:
128 """HTTP server to receive Plex remote control commands."""
129
130 def __init__(
131 self,
132 plex_provider: PlexProvider,
133 port: int = 32500,
134 client_id: str | None = None,
135 ma_player_id: str | None = None,
136 device_class: str = "speaker",
137 ) -> None:
138 """Initialize remote control server.
139
140 :param plex_provider: Plex provider instance.
141 :param port: Port for the HTTP server.
142 :param client_id: Unique client identifier.
143 :param ma_player_id: Music Assistant player ID.
144 :param device_class: Device class (speaker, phone, tablet, stb, tv, pc, cloud).
145 """
146 self.provider = plex_provider
147 self.plex_server = plex_provider._plex_server
148 self.port = port
149 self.client_id = client_id or plex_provider.instance_id
150 self.device_class = device_class
151 self.app = web.Application()
152 self.subscriptions: dict[str, dict[str, object]] = {}
153 self.runner: web.AppRunner | None = None
154 self.http_site: web.TCPSite | None = None
155
156 # Play queue tracking (Plex-specific state that doesn't exist in MA)
157 self.play_queue_id: str | None = None
158 self.play_queue_version: int = 1
159 # Map queue index to item ID
160 self.play_queue_item_ids: dict[int, int] = {}
161
162 # Track MA queue state to detect when we need to sync to Plex
163 self._last_synced_ma_queue_length: int = 0
164 self._last_synced_ma_queue_keys: list[str] = []
165
166 # Specific MA player this server controls (set by PlayerRemoteInstance)
167 self._ma_player_id = ma_player_id
168
169 # Store unsubscribe callbacks
170 self._unsub_callbacks: list[Callable[..., None]] = []
171
172 # Flag to prevent circular updates when we modify the queue ourselves
173 self._updating_from_plex = False
174
175 self.player = self.provider.mass.players.get_player(self._ma_player_id) # type: ignore[arg-type]
176
177 self.device_name = f"{self.player.display_name}" if self.player else "Music Assistant"
178
179 self.headers = {
180 "X-Plex-Device-Name": self.device_name,
181 "X-Plex-Session-Identifier": self.client_id,
182 "X-Plex-Client-Identifier": self.client_id,
183 "X-Plex-Product": "Music Assistant",
184 "X-Plex-Platform": "Music Assistant",
185 "X-Plex-Platform-Version": platform.release(),
186 }
187
188 self._setup_routes()
189
190 def _setup_routes(self) -> None:
191 """Set up all required endpoints."""
192 # Root endpoint
193 self.app.router.add_get("/", self.handle_root)
194
195 # Subscription management
196 self.app.router.add_get("/player/timeline/subscribe", self.handle_subscribe)
197 self.app.router.add_get("/player/timeline/unsubscribe", self.handle_unsubscribe)
198 self.app.router.add_get("/player/timeline/poll", self.handle_poll)
199
200 # Playback commands
201 self.app.router.add_get("/player/playback/playMedia", self.handle_play_media)
202 self.app.router.add_get("/player/playback/refreshPlayQueue", self.handle_refresh_play_queue)
203 self.app.router.add_get("/player/playback/createPlayQueue", self.handle_create_play_queue)
204 self.app.router.add_get("/player/playback/pause", self.handle_pause)
205 self.app.router.add_get("/player/playback/play", self.handle_play)
206 self.app.router.add_get("/player/playback/stop", self.handle_stop)
207 self.app.router.add_get("/player/playback/skipNext", self.handle_skip_next)
208 self.app.router.add_get("/player/playback/skipPrevious", self.handle_skip_previous)
209 self.app.router.add_get("/player/playback/stepForward", self.handle_step_forward)
210 self.app.router.add_get("/player/playback/stepBack", self.handle_step_back)
211 self.app.router.add_get("/player/playback/seekTo", self.handle_seek_to)
212 self.app.router.add_get("/player/playback/setParameters", self.handle_set_parameters)
213 self.app.router.add_get("/player/playback/skipTo", self.handle_skip_to)
214
215 # Resources endpoint
216 self.app.router.add_get("/resources", self.handle_resources)
217
218 # CORS OPTIONS handler (for all routes)
219 self.app.router.add_route("OPTIONS", "/{tail:.*}", self.handle_options)
220
221 # --- Catch-all fallback for debugging purposes ---
222 # self.app.router.add_route("*", "/{path_info:.*}", self.handle_unknown)
223
224 async def start(self) -> None:
225 """Start HTTP server and GDM advertising."""
226 self.runner = web.AppRunner(self.app)
227 await self.runner.setup()
228
229 # Start HTTP server
230 self.http_site = web.TCPSite(self.runner, "0.0.0.0", self.port)
231 await self.http_site.start()
232 LOGGER.info(f"Plex remote control server started on HTTP port {self.port}")
233
234 # Note: GDM advertising is handled by PlexProvider in __init__.py
235 # to avoid duplicate broadcasts
236
237 # Subscribe to player and queue events for state synchronization
238 if self._ma_player_id:
239 self._unsub_callbacks.append(
240 self.provider.mass.subscribe(
241 self._handle_player_event,
242 EventType.PLAYER_UPDATED,
243 id_filter=self._ma_player_id,
244 )
245 )
246 self._unsub_callbacks.append(
247 self.provider.mass.subscribe(
248 self._handle_queue_event,
249 EventType.QUEUE_UPDATED,
250 id_filter=self._ma_player_id,
251 )
252 )
253 self._unsub_callbacks.append(
254 self.provider.mass.subscribe(
255 self._handle_queue_event,
256 EventType.QUEUE_TIME_UPDATED,
257 id_filter=self._ma_player_id,
258 )
259 )
260 self._unsub_callbacks.append(
261 self.provider.mass.subscribe(
262 self._handle_queue_items_updated,
263 EventType.QUEUE_ITEMS_UPDATED,
264 id_filter=self._ma_player_id,
265 )
266 )
267
268 async def stop(self) -> None:
269 """Stop the HTTP server."""
270 # Unsubscribe from events
271 for unsub in self._unsub_callbacks:
272 unsub()
273 self._unsub_callbacks.clear()
274
275 # Stop HTTP server
276 if self.http_site:
277 await self.http_site.stop()
278 if self.runner:
279 await self.runner.cleanup()
280 LOGGER.info("Plex remote control server stopped")
281
282 async def handle_root(self, request: web.Request) -> web.Response:
283 """Handle root endpoint - return basic player info."""
284 # Get player name
285 player_name = "Music Assistant"
286 if self._ma_player_id:
287 player = self.provider.mass.players.get_player(self._ma_player_id)
288 if player:
289 player_name = player.display_name
290
291 xml = f"""<?xml version="1.0" encoding="UTF-8"?>
292<MediaContainer machineIdentifier="{self.client_id}" version="1.0">
293 <Player title="{player_name}" machineIdentifier="{self.client_id}"/>
294</MediaContainer>"""
295 return web.Response(
296 text=xml, content_type="text/xml", headers={"Access-Control-Allow-Origin": "*"}
297 )
298
299 async def handle_subscribe(self, request: web.Request) -> web.Response:
300 """Handle timeline subscription from controller."""
301 client_id = request.headers.get("X-Plex-Client-Identifier")
302 protocol = request.query.get("protocol", "http")
303 port = request.query.get("port")
304 command_id = int(request.query.get("commandID", 0))
305
306 if not client_id or not port:
307 return web.Response(status=400)
308
309 self.subscriptions[client_id] = {
310 "url": f"{protocol}://{request.remote}:{port}",
311 "command_id": command_id,
312 "last_update": time.time(),
313 }
314
315 LOGGER.info(f"Controller {client_id} subscribed for timeline updates")
316 await self._send_timeline(client_id)
317 return web.Response(status=200)
318
319 async def handle_unsubscribe(self, request: web.Request) -> web.Response:
320 """Handle unsubscribe request."""
321 client_id = request.headers.get("X-Plex-Client-Identifier")
322 if client_id in self.subscriptions:
323 del self.subscriptions[client_id]
324 LOGGER.info(f"Controller {client_id} unsubscribed")
325 return web.Response(status=200)
326
327 async def handle_poll(self, request: web.Request) -> web.Response:
328 """Handle timeline poll request."""
329 # Extract parameters
330 include_metadata = request.query.get("includeMetadata", "0") == "1"
331 command_id = request.query.get("commandID", "0")
332
333 # Update subscription timestamp if this client is subscribed
334 client_id = request.headers.get("X-Plex-Client-Identifier")
335 if client_id and client_id in self.subscriptions:
336 self.subscriptions[client_id]["last_update"] = time.time()
337
338 # Build timeline from current MA player state
339 timeline_xml = await self._build_timeline_xml(
340 include_metadata=include_metadata, command_id=command_id
341 )
342 return web.Response(
343 text=timeline_xml,
344 content_type="text/xml",
345 headers={
346 "X-Plex-Client-Identifier": self.client_id,
347 "Access-Control-Expose-Headers": "X-Plex-Client-Identifier",
348 "Access-Control-Allow-Origin": "*",
349 },
350 )
351
352 async def _ungroup_player_if_needed(self, player_id: str) -> None:
353 """Ungroup player before playback if it's part of a group/sync."""
354 player = self.provider.mass.players.get_player(player_id)
355 if not player or player.type == PlayerType.GROUP:
356 return
357
358 if not (player.state.synced_to or player.state.group_members or player.state.active_group):
359 return
360
361 LOGGER.debug("Ungrouping player %s before starting playback from Plex", player.display_name)
362 # Use set_members directly on the group to bypass static member check
363 if (
364 player.state.active_group
365 and (group := self.provider.mass.players.get_player(player.state.active_group))
366 and group.supports_feature(PlayerFeature.SET_MEMBERS)
367 ):
368 await group.set_members(player_ids_to_remove=[player_id])
369 elif (
370 player.state.synced_to
371 and (sync_leader := self.provider.mass.players.get_player(player.state.synced_to))
372 and sync_leader.supports_feature(PlayerFeature.SET_MEMBERS)
373 ):
374 await sync_leader.set_members(player_ids_to_remove=[player_id])
375 elif player.state.group_members and player.supports_feature(PlayerFeature.SET_MEMBERS):
376 await player.set_members(player_ids_to_remove=player.group_members)
377
378 async def handle_play_media(self, request: web.Request) -> web.Response:
379 """
380 Handle playMedia command from Plex controller.
381
382 Plexamp sends various parameters:
383 - key: The item to play (track, album, playlist, etc.)
384 - containerKey: The container context (play queue)
385 - offset: Starting position in milliseconds
386 - shuffle: Whether to shuffle
387 - repeat: Repeat mode
388 """
389 # Set flag to prevent circular updates
390 self._updating_from_plex = True
391 try:
392 key = request.query.get("key")
393 container_key = request.query.get("containerKey")
394 offset = int(request.query.get("offset", 0))
395 shuffle = request.query.get("shuffle", "0") == "1"
396
397 if not key:
398 return web.Response(
399 status=400, text="Missing required 'key' parameter for playMedia command"
400 )
401
402 LOGGER.info(
403 f"Received playMedia command - key: {key}, "
404 f"containerKey: {container_key}, offset: {offset}ms"
405 )
406
407 # Use the assigned player for this server instance
408 player_id = self._ma_player_id
409 if not player_id:
410 return web.Response(status=500, text="No player assigned to this server")
411
412 # Ungroup player if it's part of a group/sync
413 # User selected this specific player, so remove from any groups
414 await self._ungroup_player_if_needed(player_id)
415
416 if container_key and "/playQueues/" in container_key:
417 # Extract play queue ID from container key
418 queue_id_match = re.search(r"/playQueues/(\d+)", container_key)
419 if queue_id_match:
420 self.play_queue_id = queue_id_match.group(1)
421 self.play_queue_version = 1
422 LOGGER.info(f"Playing from queue: {container_key} starting at {key}")
423
424 await self._play_from_plex_queue(player_id, container_key, key, shuffle, offset)
425 else:
426 # Reset queue tracking if no valid queue ID found
427 self.play_queue_id = None
428 self.play_queue_item_ids = {}
429 # Fall back to single track
430 media = await self._resolve_plex_item(key)
431 await self.provider.mass.player_queues.play_media(
432 queue_id=player_id,
433 media=media, # type: ignore[arg-type]
434 option=QueueOption.REPLACE,
435 )
436 elif container_key:
437 # Playing from a regular container (album, playlist, artist) not a play queue
438 # Reset queue tracking
439 self.play_queue_id = None
440 self.play_queue_item_ids = {}
441
442 # The key is the specific track, containerKey is the collection
443 media_to_play = await self._resolve_plex_item(container_key)
444
445 # Queue the entire container
446 await self.provider.mass.player_queues.play_media(
447 queue_id=player_id,
448 media=media_to_play, # type: ignore[arg-type]
449 option=QueueOption.REPLACE,
450 )
451
452 else:
453 # Playing a single item, reset queue tracking
454 self.play_queue_id = None
455 self.play_queue_item_ids = {}
456
457 media = await self._resolve_plex_item(key)
458
459 # Replace the queue with this media
460 await self.provider.mass.player_queues.play_media(
461 queue_id=player_id,
462 media=media, # type: ignore[arg-type]
463 option=QueueOption.REPLACE,
464 )
465
466 # Set shuffle if requested
467 if shuffle:
468 await self.provider.mass.player_queues.set_shuffle(player_id, shuffle)
469
470 # Seek to offset if specified
471 if offset > 0:
472 await self._seek_to_offset_after_playback(player_id, offset)
473
474 await self._broadcast_timeline()
475 return web.Response(status=200)
476
477 except Exception as e:
478 LOGGER.exception(f"Error handling playMedia: {e}")
479 return web.Response(status=500, text=str(e))
480 finally:
481 # Clear flag after processing
482 self._updating_from_plex = False
483
484 def _reorder_tracks_for_playback(
485 self, tracks: list[Any], start_index: int
486 ) -> tuple[list[Any], dict[int, int]]:
487 """Reorder tracks to start from a specific index and update item ID mappings.
488
489 :param tracks: List of tracks to reorder.
490 :param start_index: Index of the track to start from.
491 :return: Tuple of (reordered tracks, updated item ID mappings).
492 """
493 if start_index <= 0 or start_index >= len(tracks):
494 # No reordering needed
495 return tracks, self.play_queue_item_ids
496
497 # Reorder: [selected track, tracks after it, tracks before it]
498 reordered_tracks = (
499 tracks[start_index:] # From selected to end
500 + tracks[:start_index] # From start to selected
501 )
502
503 # Update play queue item ID mappings to reflect new order
504 new_item_ids = {}
505 for new_idx, old_idx in enumerate(
506 list(range(start_index, len(tracks))) + list(range(start_index))
507 ):
508 if old_idx in self.play_queue_item_ids:
509 new_item_ids[new_idx] = self.play_queue_item_ids[old_idx]
510
511 LOGGER.info(f"Started playback from offset {start_index} (reordered queue)")
512 return reordered_tracks, new_item_ids
513
514 async def _seek_to_offset_after_playback(self, player_id: str, offset: int) -> None:
515 """Seek to the specified offset after playback starts.
516
517 :param player_id: The player ID to seek on.
518 :param offset: The offset in milliseconds.
519 """
520 # Wait for the queue to have items loaded before seeking
521 for _ in range(10): # Try up to 10 times (5 seconds total)
522 await asyncio.sleep(0.5)
523 queue = self.provider.mass.player_queues.get(player_id)
524 if queue and queue.current_item:
525 try:
526 await self.provider.mass.players.cmd_seek(player_id, offset // 1000)
527 # Wait briefly for player state to update
528 await asyncio.sleep(0.1)
529 break
530 except Exception as e:
531 LOGGER.debug(f"Could not seek to offset {offset}ms: {e}")
532 break
533 else:
534 LOGGER.warning("Queue not ready for seeking after timeout")
535
536 async def _play_from_plex_queue(
537 self,
538 player_id: str,
539 container_key: str,
540 starting_key: str | None,
541 shuffle: bool,
542 offset: int,
543 ) -> None:
544 """Fetch play queue from Plex and load tracks.
545
546 Starts playback immediately with the first track,
547 then loads remaining tracks in the background.
548 """
549 try:
550 LOGGER.info(f"Fetching play queue: {container_key}")
551
552 # Extract queue ID from container_key (e.g., "/playQueues/123" -> "123")
553 queue_id_match = re.search(r"/playQueues/(\d+)", container_key)
554 if not queue_id_match:
555 raise ValueError(f"Invalid container_key format: {container_key}")
556
557 queue_id = queue_id_match.group(1)
558
559 # Use plexapi to fetch the play queue
560 def fetch_queue() -> PlayQueue:
561 return PlayQueue.get(self.provider._plex_server, playQueueID=queue_id)
562
563 playqueue = await asyncio.to_thread(fetch_queue)
564
565 if playqueue and playqueue.items:
566 # Get selected item offset from PlayQueue - this tells us which track to start from
567 selected_offset = getattr(playqueue, "playQueueSelectedItemOffset", 0)
568 LOGGER.info(f"PlayQueue selected item offset: {selected_offset}")
569
570 # Track play queue item IDs
571 self.play_queue_item_ids = {}
572
573 # Fetch the first track to start playback immediately
574 first_item = (
575 playqueue.items[selected_offset]
576 if selected_offset < len(playqueue.items)
577 else playqueue.items[0]
578 )
579 first_track_key = first_item.key if hasattr(first_item, "key") else None
580 first_play_queue_item_id = (
581 first_item.playQueueItemID if hasattr(first_item, "playQueueItemID") else None
582 )
583
584 if not first_track_key:
585 LOGGER.error("No valid first track in play queue")
586 if starting_key:
587 track = await self.provider.get_track(starting_key)
588 await self.provider.mass.player_queues.play_media(
589 queue_id=player_id,
590 media=track,
591 option=QueueOption.REPLACE,
592 )
593 return
594
595 # Fetch and start playing the first track immediately
596 try:
597 first_track = await self.provider.get_track(first_track_key)
598 LOGGER.info(f"Starting playback with first track: {first_track.name}")
599
600 # Store first track's play queue item ID mapping
601 if first_play_queue_item_id:
602 self.play_queue_item_ids[0] = first_play_queue_item_id
603
604 # Start playback immediately with just the first track
605 await self.provider.mass.player_queues.play_media(
606 queue_id=player_id,
607 media=first_track,
608 option=QueueOption.REPLACE,
609 )
610
611 # Seek to offset if specified (do this before loading remaining tracks)
612 if offset > 0:
613 await self._seek_to_offset_after_playback(player_id, offset)
614
615 # Broadcast timeline update immediately
616 await self._broadcast_timeline()
617
618 # Now load the remaining tracks in the background
619 self.provider.mass.create_task(
620 self._load_remaining_queue_tracks(
621 player_id, playqueue, selected_offset, shuffle
622 )
623 )
624
625 except Exception as e:
626 LOGGER.exception(f"Error starting playback with first track: {e}")
627 # Fall back to single track
628 if starting_key:
629 track = await self.provider.get_track(starting_key)
630 await self.provider.mass.player_queues.play_media(
631 queue_id=player_id,
632 media=track,
633 option=QueueOption.REPLACE,
634 )
635 else:
636 LOGGER.error("Play queue is empty or could not be fetched")
637 # Fall back to single track
638 if starting_key:
639 track = await self.provider.get_track(starting_key)
640 await self.provider.mass.player_queues.play_media(
641 queue_id=player_id,
642 media=track,
643 option=QueueOption.REPLACE,
644 )
645
646 except Exception as e:
647 LOGGER.exception(f"Error playing from queue: {e}")
648 # Fall back to single track
649 if starting_key:
650 track = await self.provider.get_track(starting_key)
651 await self.provider.mass.player_queues.play_media(
652 queue_id=player_id,
653 media=track,
654 option=QueueOption.REPLACE,
655 )
656
657 async def _load_remaining_queue_tracks(
658 self,
659 player_id: str,
660 playqueue: PlayQueue,
661 selected_offset: int,
662 shuffle: bool,
663 ) -> None:
664 """Load remaining tracks from play queue in the background.
665
666 :param player_id: The Music Assistant player ID.
667 :param playqueue: The Plex play queue.
668 :param selected_offset: The offset of the track that's already playing.
669 :param shuffle: Whether shuffle is enabled.
670 """
671 try:
672 # Prepare to fetch all tracks except the first one
673 remaining_items = []
674
675 # Get items after selected track
676 for i in range(selected_offset + 1, len(playqueue.items)):
677 remaining_items.append((i, playqueue.items[i]))
678
679 # Get items before selected track (these will be added at the end)
680 for i in range(selected_offset):
681 remaining_items.append((i, playqueue.items[i]))
682
683 if not remaining_items:
684 LOGGER.debug("No remaining tracks to load")
685 return
686
687 # Fetch all remaining tracks concurrently
688 async def fetch_track(
689 plex_idx: int, item: Any
690 ) -> tuple[int, object | None, int | None]:
691 """Fetch a single track from Plex."""
692 track_key = item.key if hasattr(item, "key") else None
693 play_queue_item_id = (
694 item.playQueueItemID if hasattr(item, "playQueueItemID") else None
695 )
696
697 if track_key:
698 try:
699 track = await self.provider.get_track(track_key)
700 return plex_idx, track, play_queue_item_id
701 except Exception as e:
702 LOGGER.debug(f"Could not fetch track {track_key}: {e}")
703
704 return plex_idx, None, None
705
706 # Fetch all tracks in parallel
707 fetch_tasks = [fetch_track(idx, item) for idx, item in remaining_items]
708 results = await asyncio.gather(*fetch_tasks, return_exceptions=True)
709
710 # Process results and build track list
711 tracks_to_add: list[object] = []
712 for result in results:
713 if isinstance(result, Exception):
714 LOGGER.debug(f"Error fetching track: {result}")
715 continue
716
717 # result is guaranteed to be a tuple here after the Exception check
718 _plex_idx, track, play_queue_item_id = result # type: ignore[misc]
719 if track:
720 ma_idx = len(tracks_to_add) + 1 # +1 because first track is already queued
721 tracks_to_add.append(track)
722
723 # Store play queue item ID mapping
724 if play_queue_item_id:
725 self.play_queue_item_ids[ma_idx] = play_queue_item_id
726
727 if tracks_to_add:
728 LOGGER.info(f"Adding {len(tracks_to_add)} remaining tracks to queue")
729
730 # Add remaining tracks to the queue
731 await self.provider.mass.player_queues.play_media(
732 queue_id=player_id,
733 media=tracks_to_add, # type: ignore[arg-type]
734 option=QueueOption.ADD,
735 )
736
737 # Update tracked state to prevent sync loop
738 queue_items = self.provider.mass.player_queues.items(player_id)
739 synced_keys = []
740 for item in queue_items:
741 if item.media_item:
742 for mapping in item.media_item.provider_mappings:
743 if mapping.provider_instance == self.provider.instance_id:
744 synced_keys.append(mapping.item_id)
745 break
746 self._last_synced_ma_queue_length = len(synced_keys)
747 self._last_synced_ma_queue_keys = synced_keys
748
749 # Apply shuffle if requested (after all tracks are loaded)
750 if shuffle:
751 await self.provider.mass.player_queues.set_shuffle(player_id, shuffle)
752
753 LOGGER.info(
754 f"Successfully loaded {len(tracks_to_add)} remaining tracks "
755 f"(total queue: {len(synced_keys)} tracks)"
756 )
757 else:
758 LOGGER.warning("No valid remaining tracks found in play queue")
759
760 except Exception as e:
761 LOGGER.exception(f"Error loading remaining queue tracks: {e}")
762
763 async def _replace_entire_queue(self, player_id: str, playqueue: PlayQueue) -> None:
764 """Replace the entire queue when nothing is currently playing.
765
766 :param player_id: The Music Assistant player ID.
767 :param playqueue: The Plex play queue to load.
768 """
769 all_tracks = []
770 self.play_queue_item_ids = {}
771
772 for i, item in enumerate(playqueue.items):
773 track_key = item.key if hasattr(item, "key") else None
774 play_queue_item_id = item.playQueueItemID if hasattr(item, "playQueueItemID") else None
775
776 if track_key:
777 try:
778 track = await self.provider.get_track(track_key)
779 all_tracks.append(track)
780
781 if play_queue_item_id:
782 self.play_queue_item_ids[len(all_tracks) - 1] = play_queue_item_id
783 except Exception as e:
784 LOGGER.debug(f"Could not fetch track {track_key}: {e}")
785 continue
786
787 if all_tracks:
788 await self.provider.mass.player_queues.play_media(
789 queue_id=player_id,
790 media=all_tracks, # type: ignore[arg-type]
791 option=QueueOption.REPLACE,
792 )
793 LOGGER.info(f"Replaced queue with {len(all_tracks)} tracks")
794
795 async def _replace_remaining_queue(
796 self, player_id: str, playqueue: PlayQueue, current_index: int
797 ) -> None:
798 """Replace only items after the current track.
799
800 :param player_id: The Music Assistant player ID.
801 :param playqueue: The Plex play queue to load.
802 :param current_index: The current track index in the MA queue.
803 """
804 # Fetch tracks that come AFTER the current track in the Plex queue
805 remaining_tracks = []
806 new_item_mappings = {}
807
808 # Start from the track after current_index
809 for i in range(current_index + 1, len(playqueue.items)):
810 item = playqueue.items[i]
811 track_key = item.key if hasattr(item, "key") else None
812 play_queue_item_id = item.playQueueItemID if hasattr(item, "playQueueItemID") else None
813
814 if track_key:
815 try:
816 track = await self.provider.get_track(track_key)
817 remaining_tracks.append(track)
818
819 # Map relative to the current position
820 if play_queue_item_id:
821 new_item_mappings[current_index + 1 + len(remaining_tracks) - 1] = (
822 play_queue_item_id
823 )
824 except Exception as e:
825 LOGGER.debug(f"Could not fetch track {track_key}: {e}")
826 continue
827
828 # Replace items after current track
829 if remaining_tracks:
830 await self.provider.mass.player_queues.play_media(
831 queue_id=player_id,
832 media=remaining_tracks, # type: ignore[arg-type]
833 option=QueueOption.REPLACE_NEXT, # Replace everything after current
834 )
835 # Update mappings for the new items
836 self.play_queue_item_ids.update(new_item_mappings)
837
838 LOGGER.info(
839 f"Replaced {len(remaining_tracks)} tracks after current track "
840 f"(index {current_index})"
841 )
842 else:
843 # No tracks after current - clear remaining queue
844 LOGGER.debug("No tracks after current track in Plex queue")
845
846 # Rebuild complete item ID mappings from Plex queue
847 # Keep mappings for tracks from index 0 to current_index unchanged
848 for i, item in enumerate(playqueue.items):
849 play_queue_item_id = item.playQueueItemID if hasattr(item, "playQueueItemID") else None
850 if play_queue_item_id:
851 self.play_queue_item_ids[i] = play_queue_item_id
852
853 async def handle_refresh_play_queue(self, request: web.Request) -> web.Response:
854 """
855 Handle refreshPlayQueue command from Plex controller.
856
857 This is called when the play queue is modified (items added, removed, reordered).
858 We need to sync the entire updated queue state to MA while preserving playback.
859 """
860 try:
861 play_queue_id = request.query.get("playQueueID")
862
863 if not play_queue_id:
864 return web.Response(status=400, text="Missing 'playQueueID' parameter")
865
866 # Log all query parameters to understand what Plex sends
867 LOGGER.info(
868 f"Received refreshPlayQueue command - playQueueID: {play_queue_id}, "
869 f"params: {dict(request.query)}"
870 )
871
872 # Verify this is our active play queue
873 if self.play_queue_id != play_queue_id:
874 LOGGER.warning(
875 f"Refresh requested for queue {play_queue_id} but active queue is "
876 f"{self.play_queue_id}"
877 )
878 return web.Response(
879 status=409,
880 text=(
881 f"Requested playQueueID {play_queue_id} does not match "
882 f"active queue {self.play_queue_id}"
883 ),
884 )
885
886 # Update the play queue version (increments on each refresh)
887 self.play_queue_version += 1
888
889 # Use plexapi to fetch the updated play queue
890 def fetch_queue() -> PlayQueue:
891 return PlayQueue.get(self.provider._plex_server, playQueueID=play_queue_id)
892
893 playqueue = await asyncio.to_thread(fetch_queue)
894
895 if not playqueue or not playqueue.items:
896 LOGGER.error("Failed to refresh play queue - queue is empty or not found")
897 return web.Response(status=404, text="Play queue not found")
898
899 # Get current MA queue state
900 player_id = self._ma_player_id
901 if not player_id:
902 LOGGER.error("No player assigned to this server")
903 return web.Response(status=500, text="No player assigned")
904
905 # disable shuffle to avoid infinite loop
906 await self.provider.mass.player_queues.set_shuffle(player_id, False)
907 ma_queue = self.provider.mass.player_queues.get(player_id)
908 if not ma_queue:
909 LOGGER.error(f"MA queue not found for player {player_id}")
910 return web.Response(status=500, text="MA queue not found")
911
912 # Get current playback state
913 current_index = ma_queue.current_index
914
915 # Get MA queue item count
916 ma_queue_items = self.provider.mass.player_queues.items(player_id)
917 ma_queue_count = len(ma_queue_items) if ma_queue_items else 0
918
919 LOGGER.debug(
920 f"Queue refresh: Current index={current_index}, "
921 f"MA has {ma_queue_count} items, Plex has {len(playqueue.items)} items"
922 )
923
924 # If nothing is playing, replace the entire queue
925 if current_index is None:
926 LOGGER.debug("No track currently playing, replacing entire queue")
927 await self._replace_entire_queue(player_id, playqueue)
928 else:
929 # Something is playing - update only the remaining queue items
930 LOGGER.debug(
931 f"Track at index {current_index} is playing, "
932 f"replacing only items after current track"
933 )
934 await self._replace_remaining_queue(player_id, playqueue, current_index)
935
936 LOGGER.info(
937 f"Refreshed play queue {play_queue_id} - now has {len(playqueue.items)} items"
938 )
939
940 # Update tracked state to prevent sync loop
941 # Get what's actually in MA queue after the refresh
942 queue_items_after = self.provider.mass.player_queues.items(player_id)
943 synced_keys = []
944 for item in queue_items_after:
945 if item.media_item:
946 for mapping in item.media_item.provider_mappings:
947 if mapping.provider_instance == self.provider.instance_id:
948 synced_keys.append(mapping.item_id)
949 break
950 self._last_synced_ma_queue_length = len(synced_keys)
951 self._last_synced_ma_queue_keys = synced_keys
952
953 return web.Response(status=200)
954
955 except Exception as e:
956 LOGGER.exception(f"Error handling refreshPlayQueue: {e}")
957 return web.Response(status=500, text=str(e))
958
async def handle_create_play_queue(self, request: web.Request) -> web.Response:
    """Handle the createPlayQueue command sent by a Plex controller.

    Builds a Plex play queue from the ``uri`` query parameter, starts
    playback of its first track right away and schedules loading of the
    remaining tracks as a background task.

    :param request: Incoming request; reads the ``uri``, ``shuffle`` and
        ``continuous`` query parameters.
    :return: 200 on success, 400 when ``uri`` is missing, 500 otherwise.
    """
    try:
        query = request.query
        uri = query.get("uri")
        shuffle = query.get("shuffle", "0") == "1"
        continuous = query.get("continuous", "0") == "1"

        if not uri:
            return web.Response(status=400, text="Missing 'uri' parameter")

        LOGGER.info(f"Received createPlayQueue command - uri: {uri}, shuffle: {shuffle}")

        # This server instance controls exactly one MA player
        player_id = self._ma_player_id
        if not player_id:
            return web.Response(status=500, text="No player assigned to this server")

        def create_queue() -> PlayQueue:
            # Blocking plexapi calls: resolve the URI, then build the queue from it
            server = self.provider._plex_server
            return PlayQueue.create(
                server,
                server.fetchItem(uri),
                shuffle=int(shuffle),
                continuous=int(continuous),
            )

        playqueue = await asyncio.to_thread(create_queue)

        if not (playqueue and playqueue.items):
            LOGGER.error("Failed to create play queue or queue is empty")
            return web.Response(status=500, text="Failed to create play queue")

        # Remember the new Plex queue; version numbering restarts at 1
        self.play_queue_id = str(playqueue.playQueueID)
        self.play_queue_version = 1

        LOGGER.info(
            f"Created play queue {self.play_queue_id} with {len(playqueue.items)} items"
        )

        # Item ID mappings belong to the previous queue, start afresh
        self.play_queue_item_ids = {}
        first_item = playqueue.items[0]
        first_track_key = getattr(first_item, "key", None)
        first_play_queue_item_id = getattr(first_item, "playQueueItemID", None)

        if not first_track_key:
            LOGGER.error("No valid first track in created play queue")
            return web.Response(status=500, text="Failed to load tracks from play queue")

        try:
            # Only the first track is resolved up front so playback starts quickly
            first_track = await self.provider.get_track(first_track_key)
            LOGGER.info(f"Starting playback with first track: {first_track.name}")

            if first_play_queue_item_id:
                self.play_queue_item_ids[0] = first_play_queue_item_id

            await self.provider.mass.player_queues.play_media(
                queue_id=player_id,
                media=first_track,
                option=QueueOption.REPLACE,
            )

            # Everything after the first track is loaded in the background
            if len(playqueue.items) > 1:
                self.provider.mass.create_task(
                    self._load_remaining_queue_tracks(
                        player_id,
                        playqueue,
                        0,  # Selected offset is 0 since we started from the first track
                        shuffle,
                    )
                )

            await self._broadcast_timeline()
            return web.Response(status=200)

        except Exception as e:
            LOGGER.exception(f"Error starting playback with first track: {e}")
            return web.Response(status=500, text=f"Failed to start playback: {e}")

    except Exception as e:
        LOGGER.exception(f"Error handling createPlayQueue: {e}")
        return web.Response(status=500, text=str(e))
1057
async def _resolve_plex_item(self, key: str) -> object:
    """Turn a Plex key into the matching Music Assistant media item.

    Keys containing ``/library/metadata/`` are tried as track, then album,
    then artist; keys containing ``/playlists/`` are fetched as playlists.

    :param key: Plex key of the item to resolve.
    :return: The media item returned by the provider.
    :raises ValueError: If the key format is unknown or no lookup succeeds.
    """
    provider = self.provider

    if "/library/metadata/" not in key:
        if "/playlists/" in key:
            return await provider.get_playlist(key)
        raise ValueError(f"Unknown Plex key format: {key}")

    # A metadata key does not reveal the item type, so probe in order
    try:
        return await provider.get_track(key)
    except Exception as exc:
        LOGGER.debug(f"Failed to resolve Plex item as track for key '{key}': {exc}")

    try:
        return await provider.get_album(key)
    except Exception as exc:
        LOGGER.debug(f"Failed to resolve Plex item as album for key '{key}': {exc}")

    try:
        return await provider.get_artist(key)
    except Exception:
        # Last candidate failed too; hide the provider error from the caller
        raise ValueError(f"Could not resolve Plex item: {key}") from None
1085
async def handle_pause(self, request: web.Request) -> web.Response:
    """Pause the assigned player and push the updated timeline.

    :param request: Incoming request (not inspected).
    :return: Always a 200 response.
    """
    # Flag the change as Plex-initiated so our own event handlers skip it
    self._updating_from_plex = True
    try:
        player_id = self._ma_player_id
        if player_id:
            await self.provider.mass.players.cmd_pause(player_id)
        await self._broadcast_timeline()
    finally:
        self._updating_from_plex = False
    return web.Response(status=200)
1096
async def handle_play(self, request: web.Request) -> web.Response:
    """Resume playback on the assigned player and push the updated timeline.

    :param request: Incoming request (not inspected).
    :return: Always a 200 response.
    """
    # Flag the change as Plex-initiated so our own event handlers skip it
    self._updating_from_plex = True
    try:
        player_id = self._ma_player_id
        if player_id:
            # The player is ungrouped (when needed) before playback resumes
            await self._ungroup_player_if_needed(player_id)
            await self.provider.mass.players.cmd_play(player_id)
        await self._broadcast_timeline()
    finally:
        self._updating_from_plex = False
    return web.Response(status=200)
1109
async def handle_stop(self, request: web.Request) -> web.Response:
    """Stop playback by clearing the MA queue and forget the Plex play queue.

    :param request: Incoming request (not inspected).
    :return: Always a 200 response.
    """
    # Flag the change as Plex-initiated so our own event handlers skip it
    self._updating_from_plex = True
    try:
        player_id = self._ma_player_id
        if player_id:
            # Clearing the queue is what stops playback
            self.provider.mass.player_queues.clear(player_id)
            # The tracked Plex queue no longer describes anything
            self.play_queue_id, self.play_queue_item_ids = None, {}
        await self._broadcast_timeline()
    finally:
        self._updating_from_plex = False
    return web.Response(status=200)
1126
async def handle_skip_next(self, request: web.Request) -> web.Response:
    """Advance the assigned player's queue to the next item.

    :param request: Incoming request (not inspected).
    :return: Always a 200 response.
    """
    # Flag the change as Plex-initiated so our own event handlers skip it
    self._updating_from_plex = True
    try:
        player_id = self._ma_player_id
        if player_id:
            await self.provider.mass.player_queues.next(player_id)
        await self._broadcast_timeline()
    finally:
        self._updating_from_plex = False
    return web.Response(status=200)
1137
async def handle_skip_previous(self, request: web.Request) -> web.Response:
    """Move the assigned player's queue back to the previous item.

    :param request: Incoming request (not inspected).
    :return: Always a 200 response.
    """
    # Flag the change as Plex-initiated so our own event handlers skip it
    self._updating_from_plex = True
    try:
        player_id = self._ma_player_id
        if player_id:
            await self.provider.mass.player_queues.previous(player_id)
        await self._broadcast_timeline()
    finally:
        self._updating_from_plex = False
    return web.Response(status=200)
1148
async def handle_step_forward(self, request: web.Request) -> web.Response:
    """Jump 30 seconds ahead in the current track.

    The target position is capped at the track duration when one is known.

    :param request: Incoming request (not inspected).
    :return: Always a 200 response.
    """
    # Flag the change as Plex-initiated so our own event handlers skip it
    self._updating_from_plex = True
    try:
        player_id = self._ma_player_id
        queue = self.provider.mass.player_queues.get(player_id) if player_id else None
        if queue:
            target = queue.corrected_elapsed_time + 30
            current = queue.current_item
            media = current.media_item if current else None
            if media:
                # Unknown duration: fall back to the target itself (no cap)
                target = min(target, media.duration or target)
            await self.provider.mass.players.cmd_seek(player_id, int(target))
        # Give the player a moment to report its new position
        await asyncio.sleep(0.1)
        await self._broadcast_timeline()
    finally:
        self._updating_from_plex = False
    return web.Response(status=200)
1169
async def handle_step_back(self, request: web.Request) -> web.Response:
    """Jump 10 seconds back in the current track, never before position 0.

    :param request: Incoming request (not inspected).
    :return: Always a 200 response.
    """
    # Flag the change as Plex-initiated so our own event handlers skip it
    self._updating_from_plex = True
    try:
        player_id = self._ma_player_id
        queue = self.provider.mass.player_queues.get(player_id) if player_id else None
        if queue:
            target = max(0, queue.corrected_elapsed_time - 10)
            await self.provider.mass.players.cmd_seek(player_id, int(target))
        # Give the player a moment to report its new position
        await asyncio.sleep(0.1)
        await self._broadcast_timeline()
    finally:
        self._updating_from_plex = False
    return web.Response(status=200)
1186
async def handle_skip_to(self, request: web.Request) -> web.Response:
    """Jump to a specific item of the queue.

    The ``key`` query parameter is either a numeric play queue item ID
    (looked up in ``play_queue_item_ids``) or a Plex library path (matched
    against the provider mappings of the MA queue items).

    :param request: Incoming request; reads the ``key`` query parameter.
    :return: 200 on success, 400 when the player ID or key is missing,
        404 when the item cannot be found, 500 on unexpected errors.
    """
    key = request.query.get("key")
    if not (self._ma_player_id and key):
        return web.Response(status=400, text="Missing player ID or key")

    self._updating_from_plex = True
    try:
        if key.isdigit():
            # Numeric key: a play queue item ID
            play_queue_item_id = int(key)
            ma_index = next(
                (
                    idx
                    for idx, pq_item_id in self.play_queue_item_ids.items()
                    if pq_item_id == play_queue_item_id
                ),
                None,
            )

            if ma_index is None:
                LOGGER.warning(
                    f"Could not find MA queue index for play queue item ID: "
                    f"{play_queue_item_id}"
                )
                return web.Response(status=404, text="Queue item not found")

            LOGGER.info(
                f"Skipping to queue index {ma_index} (play queue item ID: {play_queue_item_id})"
            )
        else:
            # Non-numeric key: a library path such as "/library/metadata/856761"
            queue_items = self.provider.mass.player_queues.items(self._ma_player_id)
            if not queue_items:
                return web.Response(status=404, text="Queue is empty")

            # First queue item that has a mapping from this provider with this key
            ma_index = next(
                (
                    idx
                    for idx, item in enumerate(queue_items)
                    if item.media_item
                    and any(
                        mapping.provider_instance == self.provider.instance_id
                        and mapping.item_id == key
                        for mapping in item.media_item.provider_mappings
                    )
                ),
                None,
            )

            if ma_index is None:
                LOGGER.warning(f"Could not find track with key {key} in MA queue")
                return web.Response(status=404, text="Track not found in queue")

            LOGGER.info(f"Skipping to queue index {ma_index} (track key: {key})")

        await self.provider.mass.player_queues.play_index(self._ma_player_id, ma_index)

        await self._broadcast_timeline()
        return web.Response(status=200)

    except Exception as e:
        LOGGER.exception(f"Error handling skipTo: {e}")
        return web.Response(status=500, text=str(e))
    finally:
        self._updating_from_plex = False
1258
async def handle_seek_to(self, request: web.Request) -> web.Response:
    """Seek the assigned player to an absolute position.

    :param request: Incoming request; reads the ``offset`` query parameter
        (milliseconds, defaults to 0).
    :return: 200 on success, 400 when ``offset`` is not an integer.
    """
    # Validate before touching any state: a malformed offset is a client
    # error and must not surface as an unhandled exception (HTTP 500).
    try:
        offset_ms = int(request.query.get("offset", 0))
    except ValueError:
        return web.Response(status=400, text="Invalid 'offset' parameter")

    # Whole seconds; a negative offset is treated as the start of the track
    position_seconds = max(0, offset_ms) // 1000

    # Flag the change as Plex-initiated so our own event handlers skip it
    self._updating_from_plex = True
    try:
        if self._ma_player_id:
            await self.provider.mass.players.cmd_seek(self._ma_player_id, position_seconds)
        # Wait briefly for player state to update
        await asyncio.sleep(0.1)
        await self._broadcast_timeline()
        return web.Response(status=200)
    finally:
        self._updating_from_plex = False
1272
async def handle_set_parameters(self, request: web.Request) -> web.Response:
    """Apply volume, shuffle and repeat changes requested by a controller.

    All numeric parameters are validated before anything is applied, so a
    malformed request cannot leave the player partially updated.

    :param request: Incoming request; reads the optional ``volume``,
        ``shuffle`` and ``repeat`` query parameters.
    :return: 200 on success (also when no player is assigned), 400 when
        ``volume`` or ``repeat`` is not an integer.
    """
    player_id = self._ma_player_id
    if not player_id:
        return web.Response(status=200)

    query = request.query
    try:
        volume = int(query["volume"]) if "volume" in query else None
        repeat_value = int(query["repeat"]) if "repeat" in query else None
    except ValueError:
        return web.Response(status=400, text="Invalid 'volume' or 'repeat' parameter")

    # Plex repeat: 0=off, 1=repeat one, 2=repeat all; other values are ignored
    repeat_modes = {0: RepeatMode.OFF, 1: RepeatMode.ONE, 2: RepeatMode.ALL}

    # Flag the change as Plex-initiated so our own event handlers skip it
    self._updating_from_plex = True
    try:
        if volume is not None:
            # Keep the value inside the 0-100 range used for volume levels
            await self.provider.mass.players.cmd_volume_set(
                player_id, max(0, min(100, volume))
            )

        if "shuffle" in query:
            # Plex sends shuffle as "0" or "1"
            await self.provider.mass.player_queues.set_shuffle(
                player_id, query["shuffle"] == "1"
            )

        repeat_mode = repeat_modes.get(repeat_value)
        if repeat_mode is not None:
            self.provider.mass.player_queues.set_repeat(player_id, repeat_mode)

        await self._broadcast_timeline()
        return web.Response(status=200)
    finally:
        self._updating_from_plex = False
1308
async def handle_options(self, request: web.Request) -> web.Response:
    """Answer an OPTIONS (CORS preflight) request with permissive headers.

    :param request: Incoming request (not inspected).
    :return: A 200 response carrying the CORS headers.
    """
    cors_headers = {
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Methods": "GET, POST, OPTIONS",
        "Access-Control-Allow-Headers": "*",
    }
    return web.Response(status=200, headers=cors_headers)
1319
async def handle_resources(self, request: web.Request) -> web.Response:
    """Return the XML description of this player.

    :param request: Incoming request (not inspected).
    :return: A ``text/xml`` response containing a ``MediaContainer`` with a
        single ``Player`` element.
    """
    from xml.sax.saxutils import escape

    def xml_attr(value: object) -> str:
        """Escape a value for use inside a double-quoted XML attribute."""
        return escape(str(value), {'"': "&quot;"})

    player_name = "Music Assistant"
    state = "stopped"

    # One lookup serves both the display name and the playback state
    player = (
        self.provider.mass.players.get_player(self._ma_player_id)
        if self._ma_player_id
        else None
    )
    if player:
        player_name = player.display_name
        # Same attribute the timeline builders use for the playback state
        playback_state = player.playback_state
        if playback_state:
            state_value = (
                playback_state.value
                if hasattr(playback_state, "value")
                else str(playback_state)
            )
            if state_value in ("playing", "paused"):
                state = state_value

    local_ip = self.provider.mass.streams.publish_ip
    version = self.provider.mass.version if self.provider.mass.version != "0.0.0" else "1.0.0"

    # Dynamic values are escaped: a player name such as 'Kitchen & "Den"'
    # would otherwise produce malformed XML.
    xml = f"""<?xml version="1.0" encoding="UTF-8"?>
<MediaContainer>
  <Player title="{xml_attr(player_name)}"
          protocol="plex"
          protocolVersion="1"
          protocolCapabilities="timeline,playback,navigation,playqueues"
          machineIdentifier="{xml_attr(self.client_id)}"
          product="Music Assistant"
          platform="{xml_attr(platform.system())}"
          platformVersion="{xml_attr(platform.release())}"
          deviceClass="{xml_attr(self.device_class)}"
          state="{state}"
          address="{xml_attr(local_ip)}"
          port="{xml_attr(self.port)}"
          version="{xml_attr(version)}"
          provides="client,player,pubsub-player">
    <Connection protocol="http" address="{xml_attr(local_ip)}" port="{xml_attr(self.port)}"
                uri="http://{xml_attr(local_ip)}:{xml_attr(self.port)}" local="1"/>
  </Player>
</MediaContainer>"""
    return web.Response(
        text=xml, content_type="text/xml", headers={"Access-Control-Allow-Origin": "*"}
    )
1367
def _build_timeline_attributes(
    self,
    track: Any,
    state: str,
    duration: int,
    time: int,
    volume: int,
    shuffle: int,
    repeat: int,
    controllable: str,
    queue: Any | None,
) -> list[str]:
    """Assemble the XML attributes of a music timeline for a playing track.

    :param track: The current track media item.
    :param state: Playback state (playing, paused, etc.).
    :param duration: Track duration in milliseconds.
    :param time: Current playback time in milliseconds.
    :param volume: Volume level (0-100).
    :param shuffle: Shuffle state (0 or 1).
    :param repeat: Repeat mode (0=off, 1=one, 2=all).
    :param controllable: Controllable features string.
    :param queue: The MA queue object.
    :return: List of ``name="value"`` strings, or an empty list when the
        track has no mapping for this provider instance.
    """
    # The first mapping from this provider instance supplies the Plex key
    instance_id = self.provider.instance_id
    plex_mapping = next(
        (m for m in track.provider_mappings if m.provider_instance == instance_id),
        None,
    )
    key = None
    rating_key = None
    if plex_mapping is not None:
        key = plex_mapping.item_id
        # ratingKey is the last path segment of the key
        rating_key = key.rpartition("/")[2]

    if not key:
        return []

    # Describe the Plex server that hosts the track
    server_url = urlparse(self.provider._baseurl)
    default_port = 443 if server_url.scheme == "https" else 32400

    attrs = [
        f'state="{state}"',
        f'duration="{duration}"',
        f'time="{time}"',
        f'ratingKey="{rating_key}"',
        f'key="{key}"',
    ]

    # Play queue details are only reported while a Plex queue is tracked
    if self.play_queue_id and queue:
        index = queue.current_index
        if index is not None:
            # Unmapped positions fall back to a 1-based item ID
            play_queue_item_id = self.play_queue_item_ids.get(index, index + 1)
            attrs.append(f'playQueueItemID="{play_queue_item_id}"')
        attrs += [
            f'playQueueID="{self.play_queue_id}"',
            f'playQueueVersion="{self.play_queue_version}"',
            f'containerKey="/playQueues/{self.play_queue_id}"',
        ]

    attrs += [
        'type="music"',
        f'volume="{volume}"',
        f'shuffle="{shuffle}"',
        f'repeat="{repeat}"',
        f'controllable="{controllable}"',
        f'machineIdentifier="{self.provider._plex_server.machineIdentifier}"',
        f'address="{server_url.hostname}"',
        f'port="{server_url.port or default_port}"',
        f'protocol="{server_url.scheme}"',
    ]
    return attrs
1448
async def _build_timeline_xml(
    self, include_metadata: bool = False, command_id: str = "0"
) -> str:
    """Render the timeline XML for the current Music Assistant player state.

    :param include_metadata: Accepted for interface compatibility; not used
        by this implementation.
    :param command_id: Value of the ``commandID`` attribute on the container.
    :return: A ``MediaContainer`` element holding the music timeline followed
        by stopped video and photo timelines.
    """
    player_id = self._ma_player_id
    player = queue = None
    if player_id:
        player = self.provider.mass.players.get_player(player_id)
        queue = self.provider.mass.player_queues.get(player_id)

    # Features a controller may use while music is playing
    controllable = (
        "volume,repeat,skipPrevious,seekTo,stepBack,stepForward,stop,playPause,shuffle,skipNext"
    )

    current_media = queue.current_item.media_item if queue and queue.current_item else None

    # Translate the MA playback state into a Plex one
    state = "stopped"
    playback_state = player.playback_state if player else None
    if playback_state:
        state_value = (
            playback_state.value if hasattr(playback_state, "value") else str(playback_state)
        )
        if state_value in ("playing", "paused", "buffering"):
            state = state_value
        elif state_value == "idle" and current_media:
            # Idle with a loaded track is reported as paused
            state = "paused"

    # Groups report their group volume, single players their own level
    volume = 0
    if player:
        if player.type == PlayerType.GROUP or player.group_members:
            volume = int(player.group_volume)
        elif player.volume_level:
            volume = int(player.volume_level)

    # Shuffle is 0/1, repeat is 0=off, 1=one, 2=all
    shuffle = 0
    repeat = 0
    if queue:
        shuffle = 1 if queue.shuffle_enabled else 0
        repeat_value = getattr(getattr(queue, "repeat_mode", None), "value", None)
        if repeat_value == "one":
            repeat = 1
        elif repeat_value == "all":
            repeat = 2

    time = 0
    music_timeline = ""
    track = current_media if state in ("playing", "paused") else None
    if track:
        # Durations and positions are exchanged in milliseconds
        duration = round(track.duration * 1000) if track.duration else 0
        time = round(queue.corrected_elapsed_time * 1000)
        attrs = self._build_timeline_attributes(
            track, state, duration, time, volume, shuffle, repeat, controllable, queue
        )
        if attrs:
            music_timeline = f"<Timeline {' '.join(attrs)}/>"

    if not music_timeline:
        # No track, or a track without Plex mapping: basic timeline only
        music_timeline = (
            f'<Timeline state="{state}" time="{time}" type="music" volume="{volume}" '
            f'shuffle="{shuffle}" repeat="{repeat}" controllable="{controllable}"/>'
        )

    # Video and photo are always reported as stopped
    video_timeline = '<Timeline type="video" state="stopped"/>'
    photo_timeline = '<Timeline type="photo" state="stopped"/>'

    return (
        f'<MediaContainer commandID="{command_id}">'
        f"{music_timeline}{video_timeline}{photo_timeline}"
        f"</MediaContainer>"
    )
1559
async def _handle_player_event(self, event: MassEvent) -> None:
    """React to a player state change of the assigned MA player.

    Events for other players, and events seen while a Plex-initiated change
    is in progress, are ignored.

    :param event: The Music Assistant event that was raised.
    """
    player_id = self._ma_player_id
    if not player_id or event.object_id != player_id or self._updating_from_plex:
        return

    try:
        # The Plex server gets the timeline first (activity tracking) ...
        await self._send_timeline_to_server()
        # ... then every subscribed controller gets the current state
        await self._broadcast_timeline()
    except Exception as e:
        LOGGER.debug(f"Error handling player event: {e}")
1578
async def _handle_queue_event(self, event: MassEvent) -> None:
    """React to a queue change of the assigned MA player.

    Events for other players, and events seen while a Plex-initiated change
    is in progress, are ignored.

    :param event: The Music Assistant event that was raised.
    """
    player_id = self._ma_player_id
    if not player_id or event.object_id != player_id or self._updating_from_plex:
        return

    try:
        # The Plex server gets the timeline first (activity tracking) ...
        await self._send_timeline_to_server()
        # ... then every subscribed controller gets the current state
        await self._broadcast_timeline()
    except Exception as e:
        LOGGER.debug(f"Error handling queue event: {e}")
1597
async def _handle_queue_items_updated(self, event: MassEvent) -> None:
    """Mirror MA queue item changes (add/remove/reorder) to a Plex PlayQueue.

    Nothing happens for other players, while a Plex-initiated change is in
    progress, for an empty queue, or when the Plex keys in the queue equal
    the ones recorded at the last sync.

    :param event: The Music Assistant event that was raised.
    """
    player_id = self._ma_player_id
    if not player_id or event.object_id != player_id:
        return
    if self._updating_from_plex:
        return

    queue_items = self.provider.mass.player_queues.items(player_id)
    if not queue_items:
        return

    # One Plex key per queue item: the first mapping of this provider instance
    instance_id = self.provider.instance_id
    current_keys = []
    for item in queue_items:
        mappings = item.media_item.provider_mappings if item.media_item else ()
        plex_mapping = next((m for m in mappings if m.provider_instance == instance_id), None)
        if plex_mapping is not None:
            current_keys.append(plex_mapping.item_id)

    # Compare against what was last synced FROM Plex to avoid a sync loop
    unchanged = (
        len(current_keys) == self._last_synced_ma_queue_length
        and current_keys == self._last_synced_ma_queue_keys
    )
    if unchanged:
        LOGGER.debug("MA queue matches last synced state, skipping Plex sync")
        return

    LOGGER.info(
        f"MA queue changed: {self._last_synced_ma_queue_length} -> {len(current_keys)} items"
    )

    try:
        await self._create_plex_playqueue_from_ma()
        # Record the state that has just been pushed
        self._last_synced_ma_queue_length = len(current_keys)
        self._last_synced_ma_queue_keys = current_keys
    except Exception as e:
        LOGGER.debug(f"Error creating Plex PlayQueue: {e}")

    try:
        await self._broadcast_timeline()
    except Exception as e:
        LOGGER.debug(f"Error broadcasting timeline: {e}")
1649
async def _create_plex_playqueue_from_ma(self) -> None:
    """Create a new Plex PlayQueue that mirrors the current MA queue.

    Only queue items with a mapping for this provider instance that can be
    fetched from the Plex server take part. The MA queue index of each such
    item is remembered so that the start item and the
    ``play_queue_item_ids`` mapping stay aligned with the MA queue even
    when it also holds non-Plex items or items that failed to load.

    On success ``play_queue_id``, ``play_queue_version`` and
    ``play_queue_item_ids`` are replaced; failures are logged, not raised.
    """
    ma_queue = self.provider.mass.player_queues.get(self._ma_player_id)  # type: ignore[arg-type]
    queue_items = self.provider.mass.player_queues.items(self._ma_player_id)  # type: ignore[arg-type]

    if not ma_queue or not queue_items:
        return

    async def fetch_plex_item(plex_key: str) -> object | None:
        """Fetch a single Plex item in a worker thread, None on failure."""
        try:
            return await asyncio.to_thread(self.plex_server.fetchItem, plex_key)
        except Exception as e:
            LOGGER.debug(f"Failed to fetch Plex item {plex_key}: {e}")
            return None

    # Collect one fetch per Plex-backed queue item, remembering its MA index
    instance_id = self.provider.instance_id
    candidate_indices: list[int] = []
    fetch_tasks = []
    for ma_index, item in enumerate(queue_items):
        if not item.media_item:
            continue
        plex_key = next(
            (
                mapping.item_id
                for mapping in item.media_item.provider_mappings
                if mapping.provider_instance == instance_id
            ),
            None,
        )
        if plex_key:
            candidate_indices.append(ma_index)
            fetch_tasks.append(fetch_plex_item(plex_key))

    # Fetch all items concurrently; drop failures (None) and anything that
    # gather handed back as an exception object.
    fetched_items = (
        await asyncio.gather(*fetch_tasks, return_exceptions=True) if fetch_tasks else []
    )
    resolved = [
        (ma_index, plex_item)
        for ma_index, plex_item in zip(candidate_indices, fetched_items)
        if plex_item is not None and not isinstance(plex_item, BaseException)
    ]

    if not resolved:
        LOGGER.debug("No Plex tracks in MA queue, skipping PlayQueue creation")
        return

    plex_indices = [ma_index for ma_index, _ in resolved]
    plex_items = [plex_item for _, plex_item in resolved]

    # Select the currently playing track, looked up by its MA queue index
    start_item = None
    if ma_queue.current_index in plex_indices:
        start_item = plex_items[plex_indices.index(ma_queue.current_index)]

    def create_queue() -> PlayQueue:
        return PlayQueue.create(
            self.plex_server,
            items=plex_items,
            startItem=start_item,
            shuffle=0,  # Don't shuffle, plex_items is already in MA queue order
            continuous=1,
        )

    try:
        playqueue = await asyncio.to_thread(create_queue)

        if playqueue:
            self.play_queue_id = str(playqueue.playQueueID)
            self.play_queue_version = playqueue.playQueueVersion

            # Key the item IDs by MA queue index, which is how they are
            # looked up; assumes Plex keeps the order of the submitted items.
            self.play_queue_item_ids = {}
            for ma_index, pq_item in zip(plex_indices, playqueue.items):
                if hasattr(pq_item, "playQueueItemID"):
                    self.play_queue_item_ids[ma_index] = pq_item.playQueueItemID

            LOGGER.info(
                f"Created Plex PlayQueue {self.play_queue_id} with {len(plex_items)} tracks"
            )
    except Exception as e:
        LOGGER.exception(f"Error creating Plex PlayQueue: {e}")
1730
async def _send_timeline(self, client_id: str) -> None:
    """Post the current timeline to one subscribed controller.

    Does nothing when the client has no subscription. Delivery errors are
    logged at debug level and never raised.

    :param client_id: Identifier of the subscribed controller.
    """
    subscription = self.subscriptions.get(client_id)
    if not subscription:
        return

    timeline_xml = await self._build_timeline_xml()

    try:
        # Use the request as a context manager so the response is released
        # and its connection goes back to the pool; awaiting post() alone
        # leaves the response open.
        async with self.provider.mass.http_session.post(
            f"{subscription['url']}/:/timeline",
            data=timeline_xml,
            headers={
                "X-Plex-Client-Identifier": self.client_id,
                "Content-Type": "text/xml",
            },
            timeout=ClientTimeout(total=5),
        ):
            pass  # the response body is not needed
        # Update last_update timestamp on successful send
        subscription["last_update"] = time.time()
    except Exception as e:
        LOGGER.debug(f"Failed to send timeline to {client_id}: {e}")
1753
async def _send_timeline_to_server(self) -> None:
    """Report the current playback position to the Plex server.

    Nothing is sent without an assigned player, a current track, or a
    mapping of that track to this provider instance. Errors are logged at
    debug level and never raised.
    """
    player_id = self._ma_player_id
    if not player_id:
        return

    try:
        player = self.provider.mass.players.get_player(player_id)
        queue = self.provider.mass.player_queues.get(player_id)

        current_item = queue.current_item if (player and queue) else None
        track = current_item.media_item if current_item else None
        if not track:
            return

        # Plex key of the track: first mapping from this provider instance
        plex_key = next(
            (
                mapping.item_id
                for mapping in track.provider_mappings
                if mapping.provider_instance == self.provider.instance_id
            ),
            None,
        )
        if not plex_key:
            return

        # Anything other than playing/paused is reported as stopped
        playback_state = player.playback_state
        state_value = (
            playback_state.value if hasattr(playback_state, "value") else str(playback_state)
        )
        plex_state = state_value if state_value in ("playing", "paused") else "stopped"

        # Only Plex timeline data; times are in milliseconds
        params = {
            # e.g. "/library/metadata/12345" -> "12345"
            "ratingKey": plex_key.rpartition("/")[2],
            "key": plex_key,
            "state": plex_state,
            "time": str(round(queue.corrected_elapsed_time * 1000)),
            "duration": str(round(track.duration * 1000) if track.duration else 0),
        }

        # Play queue details are only added while a Plex queue is tracked
        if self.play_queue_id:
            params["containerKey"] = f"/playQueues/{self.play_queue_id}"
            index = queue.current_index
            if index is not None:
                # Unmapped positions fall back to a 1-based item ID
                params["playQueueItemID"] = str(self.play_queue_item_ids.get(index, index + 1))

        # Blocking plexapi call; the session headers identify this player instance
        await asyncio.to_thread(
            self.plex_server.query, "/:/timeline", params=params, headers=self.headers
        )

    except Exception as e:
        LOGGER.debug(f"Failed to send timeline to Plex server: {e}")
1838
async def _broadcast_timeline(self) -> None:
    """Push the timeline to every subscribed controller.

    Subscriptions whose ``last_update`` is more than 90 seconds old, or
    cannot be converted to a float, are dropped before sending.
    """
    current_time = time.time()

    def is_stale(client_id: str, sub: Any) -> bool:
        """Tell whether a subscription has expired or is unreadable."""
        try:
            return current_time - float(sub["last_update"]) > 90
        except (ValueError, TypeError):
            LOGGER.debug(f"Invalid last_update for client {client_id}, treating as stale")
            return True

    # Decide first, delete afterwards: the dict must not change while iterated
    expired = [cid for cid, sub in self.subscriptions.items() if is_stale(cid, sub)]
    for cid in expired:
        del self.subscriptions[cid]

    # One failing controller must not prevent delivery to the others
    await asyncio.gather(
        *(self._send_timeline(cid) for cid in list(self.subscriptions)),
        return_exceptions=True,
    )
1860
1861 # for debugging purposes only
1862 # async def handle_unknown(self, request: web.Request) -> web.Response:
1863 # """Catch-all handler for unexpected or unsupported paths."""
1864 # LOGGER.debug(
1865 # "Unhandled request: %s %s from %s",
1866 # request.method,
1867 # request.path,
1868 # request.remote,
1869 # )
1870 #
1871 # # You can log query/body if needed (be careful not to leak tokens)
1872 # if request.query:
1873 # LOGGER.debug("Query params for %s: %s", request.path, dict(request.query))
1874 # try:
1875 # data = await request.text()
1876 # if data:
1877 # LOGGER.debug("Body for %s: %s", request.path, data)
1878 # except Exception as e:
1879 # LOGGER.debug("Could not read request body: %s", e)
1880 #
1881 # return web.Response(status=404, text=f"Unhandled path: {request.path}")
1882