music-assistant-server

75.7 KBPY
player_remote.py
75.7 KB1,882 lines • python
1"""Per-player Plex remote control instances."""
2
3from __future__ import annotations
4
5import asyncio
6import logging
7import platform
8import re
9import time
10import uuid
11from collections.abc import Callable
12from typing import TYPE_CHECKING, Any
13from urllib.parse import urlparse
14
15from aiohttp import ClientTimeout, web
16from music_assistant_models.enums import (
17    EventType,
18    PlayerFeature,
19    PlayerType,
20    QueueOption,
21    RepeatMode,
22)
23from plexapi.playqueue import PlayQueue
24
25from .gdm import PlexGDMAdvertiser
26
27if TYPE_CHECKING:
28    from music_assistant_models.event import MassEvent
29
30    from music_assistant.providers.plex import PlexProvider
31
32
33LOGGER = logging.getLogger(__name__)
34
35
36class PlayerRemoteInstance:
37    """Single remote control instance for one MA player."""
38
39    def __init__(
40        self,
41        plex_provider: PlexProvider,
42        ma_player_id: str,
43        player_name: str,
44        port: int,
45        device_class: str = "speaker",
46        remote_control: bool = False,
47    ) -> None:
48        """Initialize player remote instance.
49
50        :param plex_provider: Plex provider instance.
51        :param ma_player_id: Music Assistant player ID.
52        :param player_name: Display name for the player.
53        :param port: Port for the remote control server.
54        :param device_class: Device class (speaker, phone, tablet, stb, tv, pc, cloud).
55        :param remote_control: Whether to enable remote control.
56        """
57        self.plex_provider = plex_provider
58        self.plex_server = plex_provider._plex_server
59        self.ma_player_id = ma_player_id
60        self.player_name = player_name
61        self.port = port
62        self.device_class = device_class
63        self.remote_control = remote_control
64
65        self.client_id = str(
66            uuid.uuid5(
67                uuid.NAMESPACE_DNS,
68                f"music-assistant-plex-{plex_provider.instance_id}-{ma_player_id}",
69            )
70        )
71
72        if self.remote_control:
73            # Remote control server
74            self.server: PlexRemoteControlServer | None = None
75            # GDM advertiser
76            self.gdm: PlexGDMAdvertiser | None = None
77
78    async def start(self) -> None:
79        """Start this player's remote control."""
80        if self.remote_control:
81            # Create player-specific PlexServer instance with unique client identification
82            LOGGER.info(
83                f"Created PlexServer for '{self.player_name}' with client ID: {self.client_id}"
84            )
85
86            self.server = PlexRemoteControlServer(
87                plex_provider=self.plex_provider,
88                port=self.port,
89                client_id=self.client_id,
90                ma_player_id=self.ma_player_id,
91                device_class=self.device_class,
92            )
93            LOGGER.info(
94                f"Remote control server for '{self.player_name}' bound to MA player: "
95                f"{self.ma_player_id}"
96            )
97
98            await self.server.start()
99
100            # Step 4: Start GDM broadcasting
101            self.gdm = PlexGDMAdvertiser(
102                instance_id=self.client_id,
103                port=self.port,
104                publish_ip=str(self.plex_provider.mass.streams.publish_ip),
105                name=self.player_name,
106                product="Music Assistant",
107                version=self.plex_provider.mass.version
108                if self.plex_provider.mass.version != "0.0.0"
109                else "1.0.0",
110            )
111            self.gdm.start()
112
113            LOGGER.info(f"Player '{self.player_name}' is now discoverable on port {self.port}")
114
115    async def stop(self) -> None:
116        """Stop this player's remote control."""
117        if self.remote_control:
118            if self.gdm:
119                await self.gdm.stop()
120
121            if self.server:
122                await self.server.stop()
123
124            LOGGER.info(f"Stopped remote control for player '{self.player_name}'")
125
126
127class PlexRemoteControlServer:
128    """HTTP server to receive Plex remote control commands."""
129
130    def __init__(
131        self,
132        plex_provider: PlexProvider,
133        port: int = 32500,
134        client_id: str | None = None,
135        ma_player_id: str | None = None,
136        device_class: str = "speaker",
137    ) -> None:
138        """Initialize remote control server.
139
140        :param plex_provider: Plex provider instance.
141        :param port: Port for the HTTP server.
142        :param client_id: Unique client identifier.
143        :param ma_player_id: Music Assistant player ID.
144        :param device_class: Device class (speaker, phone, tablet, stb, tv, pc, cloud).
145        """
146        self.provider = plex_provider
147        self.plex_server = plex_provider._plex_server
148        self.port = port
149        self.client_id = client_id or plex_provider.instance_id
150        self.device_class = device_class
151        self.app = web.Application()
152        self.subscriptions: dict[str, dict[str, object]] = {}
153        self.runner: web.AppRunner | None = None
154        self.http_site: web.TCPSite | None = None
155
156        # Play queue tracking (Plex-specific state that doesn't exist in MA)
157        self.play_queue_id: str | None = None
158        self.play_queue_version: int = 1
159        # Map queue index to item ID
160        self.play_queue_item_ids: dict[int, int] = {}
161
162        # Track MA queue state to detect when we need to sync to Plex
163        self._last_synced_ma_queue_length: int = 0
164        self._last_synced_ma_queue_keys: list[str] = []
165
166        # Specific MA player this server controls (set by PlayerRemoteInstance)
167        self._ma_player_id = ma_player_id
168
169        # Store unsubscribe callbacks
170        self._unsub_callbacks: list[Callable[..., None]] = []
171
172        # Flag to prevent circular updates when we modify the queue ourselves
173        self._updating_from_plex = False
174
175        self.player = self.provider.mass.players.get_player(self._ma_player_id)  # type: ignore[arg-type]
176
177        self.device_name = f"{self.player.display_name}" if self.player else "Music Assistant"
178
179        self.headers = {
180            "X-Plex-Device-Name": self.device_name,
181            "X-Plex-Session-Identifier": self.client_id,
182            "X-Plex-Client-Identifier": self.client_id,
183            "X-Plex-Product": "Music Assistant",
184            "X-Plex-Platform": "Music Assistant",
185            "X-Plex-Platform-Version": platform.release(),
186        }
187
188        self._setup_routes()
189
190    def _setup_routes(self) -> None:
191        """Set up all required endpoints."""
192        # Root endpoint
193        self.app.router.add_get("/", self.handle_root)
194
195        # Subscription management
196        self.app.router.add_get("/player/timeline/subscribe", self.handle_subscribe)
197        self.app.router.add_get("/player/timeline/unsubscribe", self.handle_unsubscribe)
198        self.app.router.add_get("/player/timeline/poll", self.handle_poll)
199
200        # Playback commands
201        self.app.router.add_get("/player/playback/playMedia", self.handle_play_media)
202        self.app.router.add_get("/player/playback/refreshPlayQueue", self.handle_refresh_play_queue)
203        self.app.router.add_get("/player/playback/createPlayQueue", self.handle_create_play_queue)
204        self.app.router.add_get("/player/playback/pause", self.handle_pause)
205        self.app.router.add_get("/player/playback/play", self.handle_play)
206        self.app.router.add_get("/player/playback/stop", self.handle_stop)
207        self.app.router.add_get("/player/playback/skipNext", self.handle_skip_next)
208        self.app.router.add_get("/player/playback/skipPrevious", self.handle_skip_previous)
209        self.app.router.add_get("/player/playback/stepForward", self.handle_step_forward)
210        self.app.router.add_get("/player/playback/stepBack", self.handle_step_back)
211        self.app.router.add_get("/player/playback/seekTo", self.handle_seek_to)
212        self.app.router.add_get("/player/playback/setParameters", self.handle_set_parameters)
213        self.app.router.add_get("/player/playback/skipTo", self.handle_skip_to)
214
215        # Resources endpoint
216        self.app.router.add_get("/resources", self.handle_resources)
217
218        # CORS OPTIONS handler (for all routes)
219        self.app.router.add_route("OPTIONS", "/{tail:.*}", self.handle_options)
220
221        # --- Catch-all fallback for debugging purposes ---
222        # self.app.router.add_route("*", "/{path_info:.*}", self.handle_unknown)
223
224    async def start(self) -> None:
225        """Start HTTP server and GDM advertising."""
226        self.runner = web.AppRunner(self.app)
227        await self.runner.setup()
228
229        # Start HTTP server
230        self.http_site = web.TCPSite(self.runner, "0.0.0.0", self.port)
231        await self.http_site.start()
232        LOGGER.info(f"Plex remote control server started on HTTP port {self.port}")
233
234        # Note: GDM advertising is handled by PlexProvider in __init__.py
235        # to avoid duplicate broadcasts
236
237        # Subscribe to player and queue events for state synchronization
238        if self._ma_player_id:
239            self._unsub_callbacks.append(
240                self.provider.mass.subscribe(
241                    self._handle_player_event,
242                    EventType.PLAYER_UPDATED,
243                    id_filter=self._ma_player_id,
244                )
245            )
246            self._unsub_callbacks.append(
247                self.provider.mass.subscribe(
248                    self._handle_queue_event,
249                    EventType.QUEUE_UPDATED,
250                    id_filter=self._ma_player_id,
251                )
252            )
253            self._unsub_callbacks.append(
254                self.provider.mass.subscribe(
255                    self._handle_queue_event,
256                    EventType.QUEUE_TIME_UPDATED,
257                    id_filter=self._ma_player_id,
258                )
259            )
260            self._unsub_callbacks.append(
261                self.provider.mass.subscribe(
262                    self._handle_queue_items_updated,
263                    EventType.QUEUE_ITEMS_UPDATED,
264                    id_filter=self._ma_player_id,
265                )
266            )
267
268    async def stop(self) -> None:
269        """Stop the HTTP server."""
270        # Unsubscribe from events
271        for unsub in self._unsub_callbacks:
272            unsub()
273        self._unsub_callbacks.clear()
274
275        # Stop HTTP server
276        if self.http_site:
277            await self.http_site.stop()
278        if self.runner:
279            await self.runner.cleanup()
280        LOGGER.info("Plex remote control server stopped")
281
282    async def handle_root(self, request: web.Request) -> web.Response:
283        """Handle root endpoint - return basic player info."""
284        # Get player name
285        player_name = "Music Assistant"
286        if self._ma_player_id:
287            player = self.provider.mass.players.get_player(self._ma_player_id)
288            if player:
289                player_name = player.display_name
290
291        xml = f"""<?xml version="1.0" encoding="UTF-8"?>
292<MediaContainer machineIdentifier="{self.client_id}" version="1.0">
293    <Player title="{player_name}" machineIdentifier="{self.client_id}"/>
294</MediaContainer>"""
295        return web.Response(
296            text=xml, content_type="text/xml", headers={"Access-Control-Allow-Origin": "*"}
297        )
298
299    async def handle_subscribe(self, request: web.Request) -> web.Response:
300        """Handle timeline subscription from controller."""
301        client_id = request.headers.get("X-Plex-Client-Identifier")
302        protocol = request.query.get("protocol", "http")
303        port = request.query.get("port")
304        command_id = int(request.query.get("commandID", 0))
305
306        if not client_id or not port:
307            return web.Response(status=400)
308
309        self.subscriptions[client_id] = {
310            "url": f"{protocol}://{request.remote}:{port}",
311            "command_id": command_id,
312            "last_update": time.time(),
313        }
314
315        LOGGER.info(f"Controller {client_id} subscribed for timeline updates")
316        await self._send_timeline(client_id)
317        return web.Response(status=200)
318
319    async def handle_unsubscribe(self, request: web.Request) -> web.Response:
320        """Handle unsubscribe request."""
321        client_id = request.headers.get("X-Plex-Client-Identifier")
322        if client_id in self.subscriptions:
323            del self.subscriptions[client_id]
324            LOGGER.info(f"Controller {client_id} unsubscribed")
325        return web.Response(status=200)
326
327    async def handle_poll(self, request: web.Request) -> web.Response:
328        """Handle timeline poll request."""
329        # Extract parameters
330        include_metadata = request.query.get("includeMetadata", "0") == "1"
331        command_id = request.query.get("commandID", "0")
332
333        # Update subscription timestamp if this client is subscribed
334        client_id = request.headers.get("X-Plex-Client-Identifier")
335        if client_id and client_id in self.subscriptions:
336            self.subscriptions[client_id]["last_update"] = time.time()
337
338        # Build timeline from current MA player state
339        timeline_xml = await self._build_timeline_xml(
340            include_metadata=include_metadata, command_id=command_id
341        )
342        return web.Response(
343            text=timeline_xml,
344            content_type="text/xml",
345            headers={
346                "X-Plex-Client-Identifier": self.client_id,
347                "Access-Control-Expose-Headers": "X-Plex-Client-Identifier",
348                "Access-Control-Allow-Origin": "*",
349            },
350        )
351
352    async def _ungroup_player_if_needed(self, player_id: str) -> None:
353        """Ungroup player before playback if it's part of a group/sync."""
354        player = self.provider.mass.players.get_player(player_id)
355        if not player or player.type == PlayerType.GROUP:
356            return
357
358        if not (player.state.synced_to or player.state.group_members or player.state.active_group):
359            return
360
361        LOGGER.debug("Ungrouping player %s before starting playback from Plex", player.display_name)
362        # Use set_members directly on the group to bypass static member check
363        if (
364            player.state.active_group
365            and (group := self.provider.mass.players.get_player(player.state.active_group))
366            and group.supports_feature(PlayerFeature.SET_MEMBERS)
367        ):
368            await group.set_members(player_ids_to_remove=[player_id])
369        elif (
370            player.state.synced_to
371            and (sync_leader := self.provider.mass.players.get_player(player.state.synced_to))
372            and sync_leader.supports_feature(PlayerFeature.SET_MEMBERS)
373        ):
374            await sync_leader.set_members(player_ids_to_remove=[player_id])
375        elif player.state.group_members and player.supports_feature(PlayerFeature.SET_MEMBERS):
376            await player.set_members(player_ids_to_remove=player.group_members)
377
378    async def handle_play_media(self, request: web.Request) -> web.Response:
379        """
380        Handle playMedia command from Plex controller.
381
382        Plexamp sends various parameters:
383        - key: The item to play (track, album, playlist, etc.)
384        - containerKey: The container context (play queue)
385        - offset: Starting position in milliseconds
386        - shuffle: Whether to shuffle
387        - repeat: Repeat mode
388        """
389        # Set flag to prevent circular updates
390        self._updating_from_plex = True
391        try:
392            key = request.query.get("key")
393            container_key = request.query.get("containerKey")
394            offset = int(request.query.get("offset", 0))
395            shuffle = request.query.get("shuffle", "0") == "1"
396
397            if not key:
398                return web.Response(
399                    status=400, text="Missing required 'key' parameter for playMedia command"
400                )
401
402            LOGGER.info(
403                f"Received playMedia command - key: {key}, "
404                f"containerKey: {container_key}, offset: {offset}ms"
405            )
406
407            # Use the assigned player for this server instance
408            player_id = self._ma_player_id
409            if not player_id:
410                return web.Response(status=500, text="No player assigned to this server")
411
412            # Ungroup player if it's part of a group/sync
413            # User selected this specific player, so remove from any groups
414            await self._ungroup_player_if_needed(player_id)
415
416            if container_key and "/playQueues/" in container_key:
417                # Extract play queue ID from container key
418                queue_id_match = re.search(r"/playQueues/(\d+)", container_key)
419                if queue_id_match:
420                    self.play_queue_id = queue_id_match.group(1)
421                    self.play_queue_version = 1
422                    LOGGER.info(f"Playing from queue: {container_key} starting at {key}")
423
424                    await self._play_from_plex_queue(player_id, container_key, key, shuffle, offset)
425                else:
426                    # Reset queue tracking if no valid queue ID found
427                    self.play_queue_id = None
428                    self.play_queue_item_ids = {}
429                    # Fall back to single track
430                    media = await self._resolve_plex_item(key)
431                    await self.provider.mass.player_queues.play_media(
432                        queue_id=player_id,
433                        media=media,  # type: ignore[arg-type]
434                        option=QueueOption.REPLACE,
435                    )
436            elif container_key:
437                # Playing from a regular container (album, playlist, artist) not a play queue
438                # Reset queue tracking
439                self.play_queue_id = None
440                self.play_queue_item_ids = {}
441
442                # The key is the specific track, containerKey is the collection
443                media_to_play = await self._resolve_plex_item(container_key)
444
445                # Queue the entire container
446                await self.provider.mass.player_queues.play_media(
447                    queue_id=player_id,
448                    media=media_to_play,  # type: ignore[arg-type]
449                    option=QueueOption.REPLACE,
450                )
451
452            else:
453                # Playing a single item, reset queue tracking
454                self.play_queue_id = None
455                self.play_queue_item_ids = {}
456
457                media = await self._resolve_plex_item(key)
458
459                # Replace the queue with this media
460                await self.provider.mass.player_queues.play_media(
461                    queue_id=player_id,
462                    media=media,  # type: ignore[arg-type]
463                    option=QueueOption.REPLACE,
464                )
465
466            # Set shuffle if requested
467            if shuffle:
468                await self.provider.mass.player_queues.set_shuffle(player_id, shuffle)
469
470            # Seek to offset if specified
471            if offset > 0:
472                await self._seek_to_offset_after_playback(player_id, offset)
473
474            await self._broadcast_timeline()
475            return web.Response(status=200)
476
477        except Exception as e:
478            LOGGER.exception(f"Error handling playMedia: {e}")
479            return web.Response(status=500, text=str(e))
480        finally:
481            # Clear flag after processing
482            self._updating_from_plex = False
483
484    def _reorder_tracks_for_playback(
485        self, tracks: list[Any], start_index: int
486    ) -> tuple[list[Any], dict[int, int]]:
487        """Reorder tracks to start from a specific index and update item ID mappings.
488
489        :param tracks: List of tracks to reorder.
490        :param start_index: Index of the track to start from.
491        :return: Tuple of (reordered tracks, updated item ID mappings).
492        """
493        if start_index <= 0 or start_index >= len(tracks):
494            # No reordering needed
495            return tracks, self.play_queue_item_ids
496
497        # Reorder: [selected track, tracks after it, tracks before it]
498        reordered_tracks = (
499            tracks[start_index:]  # From selected to end
500            + tracks[:start_index]  # From start to selected
501        )
502
503        # Update play queue item ID mappings to reflect new order
504        new_item_ids = {}
505        for new_idx, old_idx in enumerate(
506            list(range(start_index, len(tracks))) + list(range(start_index))
507        ):
508            if old_idx in self.play_queue_item_ids:
509                new_item_ids[new_idx] = self.play_queue_item_ids[old_idx]
510
511        LOGGER.info(f"Started playback from offset {start_index} (reordered queue)")
512        return reordered_tracks, new_item_ids
513
514    async def _seek_to_offset_after_playback(self, player_id: str, offset: int) -> None:
515        """Seek to the specified offset after playback starts.
516
517        :param player_id: The player ID to seek on.
518        :param offset: The offset in milliseconds.
519        """
520        # Wait for the queue to have items loaded before seeking
521        for _ in range(10):  # Try up to 10 times (5 seconds total)
522            await asyncio.sleep(0.5)
523            queue = self.provider.mass.player_queues.get(player_id)
524            if queue and queue.current_item:
525                try:
526                    await self.provider.mass.players.cmd_seek(player_id, offset // 1000)
527                    # Wait briefly for player state to update
528                    await asyncio.sleep(0.1)
529                    break
530                except Exception as e:
531                    LOGGER.debug(f"Could not seek to offset {offset}ms: {e}")
532                    break
533        else:
534            LOGGER.warning("Queue not ready for seeking after timeout")
535
536    async def _play_from_plex_queue(
537        self,
538        player_id: str,
539        container_key: str,
540        starting_key: str | None,
541        shuffle: bool,
542        offset: int,
543    ) -> None:
544        """Fetch play queue from Plex and load tracks.
545
546        Starts playback immediately with the first track,
547        then loads remaining tracks in the background.
548        """
549        try:
550            LOGGER.info(f"Fetching play queue: {container_key}")
551
552            # Extract queue ID from container_key (e.g., "/playQueues/123" -> "123")
553            queue_id_match = re.search(r"/playQueues/(\d+)", container_key)
554            if not queue_id_match:
555                raise ValueError(f"Invalid container_key format: {container_key}")
556
557            queue_id = queue_id_match.group(1)
558
559            # Use plexapi to fetch the play queue
560            def fetch_queue() -> PlayQueue:
561                return PlayQueue.get(self.provider._plex_server, playQueueID=queue_id)
562
563            playqueue = await asyncio.to_thread(fetch_queue)
564
565            if playqueue and playqueue.items:
566                # Get selected item offset from PlayQueue - this tells us which track to start from
567                selected_offset = getattr(playqueue, "playQueueSelectedItemOffset", 0)
568                LOGGER.info(f"PlayQueue selected item offset: {selected_offset}")
569
570                # Track play queue item IDs
571                self.play_queue_item_ids = {}
572
573                # Fetch the first track to start playback immediately
574                first_item = (
575                    playqueue.items[selected_offset]
576                    if selected_offset < len(playqueue.items)
577                    else playqueue.items[0]
578                )
579                first_track_key = first_item.key if hasattr(first_item, "key") else None
580                first_play_queue_item_id = (
581                    first_item.playQueueItemID if hasattr(first_item, "playQueueItemID") else None
582                )
583
584                if not first_track_key:
585                    LOGGER.error("No valid first track in play queue")
586                    if starting_key:
587                        track = await self.provider.get_track(starting_key)
588                        await self.provider.mass.player_queues.play_media(
589                            queue_id=player_id,
590                            media=track,
591                            option=QueueOption.REPLACE,
592                        )
593                    return
594
595                # Fetch and start playing the first track immediately
596                try:
597                    first_track = await self.provider.get_track(first_track_key)
598                    LOGGER.info(f"Starting playback with first track: {first_track.name}")
599
600                    # Store first track's play queue item ID mapping
601                    if first_play_queue_item_id:
602                        self.play_queue_item_ids[0] = first_play_queue_item_id
603
604                    # Start playback immediately with just the first track
605                    await self.provider.mass.player_queues.play_media(
606                        queue_id=player_id,
607                        media=first_track,
608                        option=QueueOption.REPLACE,
609                    )
610
611                    # Seek to offset if specified (do this before loading remaining tracks)
612                    if offset > 0:
613                        await self._seek_to_offset_after_playback(player_id, offset)
614
615                    # Broadcast timeline update immediately
616                    await self._broadcast_timeline()
617
618                    # Now load the remaining tracks in the background
619                    self.provider.mass.create_task(
620                        self._load_remaining_queue_tracks(
621                            player_id, playqueue, selected_offset, shuffle
622                        )
623                    )
624
625                except Exception as e:
626                    LOGGER.exception(f"Error starting playback with first track: {e}")
627                    # Fall back to single track
628                    if starting_key:
629                        track = await self.provider.get_track(starting_key)
630                        await self.provider.mass.player_queues.play_media(
631                            queue_id=player_id,
632                            media=track,
633                            option=QueueOption.REPLACE,
634                        )
635            else:
636                LOGGER.error("Play queue is empty or could not be fetched")
637                # Fall back to single track
638                if starting_key:
639                    track = await self.provider.get_track(starting_key)
640                    await self.provider.mass.player_queues.play_media(
641                        queue_id=player_id,
642                        media=track,
643                        option=QueueOption.REPLACE,
644                    )
645
646        except Exception as e:
647            LOGGER.exception(f"Error playing from queue: {e}")
648            # Fall back to single track
649            if starting_key:
650                track = await self.provider.get_track(starting_key)
651                await self.provider.mass.player_queues.play_media(
652                    queue_id=player_id,
653                    media=track,
654                    option=QueueOption.REPLACE,
655                )
656
657    async def _load_remaining_queue_tracks(
658        self,
659        player_id: str,
660        playqueue: PlayQueue,
661        selected_offset: int,
662        shuffle: bool,
663    ) -> None:
664        """Load remaining tracks from play queue in the background.
665
666        :param player_id: The Music Assistant player ID.
667        :param playqueue: The Plex play queue.
668        :param selected_offset: The offset of the track that's already playing.
669        :param shuffle: Whether shuffle is enabled.
670        """
671        try:
672            # Prepare to fetch all tracks except the first one
673            remaining_items = []
674
675            # Get items after selected track
676            for i in range(selected_offset + 1, len(playqueue.items)):
677                remaining_items.append((i, playqueue.items[i]))
678
679            # Get items before selected track (these will be added at the end)
680            for i in range(selected_offset):
681                remaining_items.append((i, playqueue.items[i]))
682
683            if not remaining_items:
684                LOGGER.debug("No remaining tracks to load")
685                return
686
687            # Fetch all remaining tracks concurrently
688            async def fetch_track(
689                plex_idx: int, item: Any
690            ) -> tuple[int, object | None, int | None]:
691                """Fetch a single track from Plex."""
692                track_key = item.key if hasattr(item, "key") else None
693                play_queue_item_id = (
694                    item.playQueueItemID if hasattr(item, "playQueueItemID") else None
695                )
696
697                if track_key:
698                    try:
699                        track = await self.provider.get_track(track_key)
700                        return plex_idx, track, play_queue_item_id
701                    except Exception as e:
702                        LOGGER.debug(f"Could not fetch track {track_key}: {e}")
703
704                return plex_idx, None, None
705
706            # Fetch all tracks in parallel
707            fetch_tasks = [fetch_track(idx, item) for idx, item in remaining_items]
708            results = await asyncio.gather(*fetch_tasks, return_exceptions=True)
709
710            # Process results and build track list
711            tracks_to_add: list[object] = []
712            for result in results:
713                if isinstance(result, Exception):
714                    LOGGER.debug(f"Error fetching track: {result}")
715                    continue
716
717                # result is guaranteed to be a tuple here after the Exception check
718                _plex_idx, track, play_queue_item_id = result  # type: ignore[misc]
719                if track:
720                    ma_idx = len(tracks_to_add) + 1  # +1 because first track is already queued
721                    tracks_to_add.append(track)
722
723                    # Store play queue item ID mapping
724                    if play_queue_item_id:
725                        self.play_queue_item_ids[ma_idx] = play_queue_item_id
726
727            if tracks_to_add:
728                LOGGER.info(f"Adding {len(tracks_to_add)} remaining tracks to queue")
729
730                # Add remaining tracks to the queue
731                await self.provider.mass.player_queues.play_media(
732                    queue_id=player_id,
733                    media=tracks_to_add,  # type: ignore[arg-type]
734                    option=QueueOption.ADD,
735                )
736
737                # Update tracked state to prevent sync loop
738                queue_items = self.provider.mass.player_queues.items(player_id)
739                synced_keys = []
740                for item in queue_items:
741                    if item.media_item:
742                        for mapping in item.media_item.provider_mappings:
743                            if mapping.provider_instance == self.provider.instance_id:
744                                synced_keys.append(mapping.item_id)
745                                break
746                self._last_synced_ma_queue_length = len(synced_keys)
747                self._last_synced_ma_queue_keys = synced_keys
748
749                # Apply shuffle if requested (after all tracks are loaded)
750                if shuffle:
751                    await self.provider.mass.player_queues.set_shuffle(player_id, shuffle)
752
753                LOGGER.info(
754                    f"Successfully loaded {len(tracks_to_add)} remaining tracks "
755                    f"(total queue: {len(synced_keys)} tracks)"
756                )
757            else:
758                LOGGER.warning("No valid remaining tracks found in play queue")
759
760        except Exception as e:
761            LOGGER.exception(f"Error loading remaining queue tracks: {e}")
762
763    async def _replace_entire_queue(self, player_id: str, playqueue: PlayQueue) -> None:
764        """Replace the entire queue when nothing is currently playing.
765
766        :param player_id: The Music Assistant player ID.
767        :param playqueue: The Plex play queue to load.
768        """
769        all_tracks = []
770        self.play_queue_item_ids = {}
771
772        for i, item in enumerate(playqueue.items):
773            track_key = item.key if hasattr(item, "key") else None
774            play_queue_item_id = item.playQueueItemID if hasattr(item, "playQueueItemID") else None
775
776            if track_key:
777                try:
778                    track = await self.provider.get_track(track_key)
779                    all_tracks.append(track)
780
781                    if play_queue_item_id:
782                        self.play_queue_item_ids[len(all_tracks) - 1] = play_queue_item_id
783                except Exception as e:
784                    LOGGER.debug(f"Could not fetch track {track_key}: {e}")
785                    continue
786
787        if all_tracks:
788            await self.provider.mass.player_queues.play_media(
789                queue_id=player_id,
790                media=all_tracks,  # type: ignore[arg-type]
791                option=QueueOption.REPLACE,
792            )
793            LOGGER.info(f"Replaced queue with {len(all_tracks)} tracks")
794
795    async def _replace_remaining_queue(
796        self, player_id: str, playqueue: PlayQueue, current_index: int
797    ) -> None:
798        """Replace only items after the current track.
799
800        :param player_id: The Music Assistant player ID.
801        :param playqueue: The Plex play queue to load.
802        :param current_index: The current track index in the MA queue.
803        """
804        # Fetch tracks that come AFTER the current track in the Plex queue
805        remaining_tracks = []
806        new_item_mappings = {}
807
808        # Start from the track after current_index
809        for i in range(current_index + 1, len(playqueue.items)):
810            item = playqueue.items[i]
811            track_key = item.key if hasattr(item, "key") else None
812            play_queue_item_id = item.playQueueItemID if hasattr(item, "playQueueItemID") else None
813
814            if track_key:
815                try:
816                    track = await self.provider.get_track(track_key)
817                    remaining_tracks.append(track)
818
819                    # Map relative to the current position
820                    if play_queue_item_id:
821                        new_item_mappings[current_index + 1 + len(remaining_tracks) - 1] = (
822                            play_queue_item_id
823                        )
824                except Exception as e:
825                    LOGGER.debug(f"Could not fetch track {track_key}: {e}")
826                    continue
827
828        # Replace items after current track
829        if remaining_tracks:
830            await self.provider.mass.player_queues.play_media(
831                queue_id=player_id,
832                media=remaining_tracks,  # type: ignore[arg-type]
833                option=QueueOption.REPLACE_NEXT,  # Replace everything after current
834            )
835            # Update mappings for the new items
836            self.play_queue_item_ids.update(new_item_mappings)
837
838            LOGGER.info(
839                f"Replaced {len(remaining_tracks)} tracks after current track "
840                f"(index {current_index})"
841            )
842        else:
843            # No tracks after current - clear remaining queue
844            LOGGER.debug("No tracks after current track in Plex queue")
845
846        # Rebuild complete item ID mappings from Plex queue
847        # Keep mappings for tracks from index 0 to current_index unchanged
848        for i, item in enumerate(playqueue.items):
849            play_queue_item_id = item.playQueueItemID if hasattr(item, "playQueueItemID") else None
850            if play_queue_item_id:
851                self.play_queue_item_ids[i] = play_queue_item_id
852
853    async def handle_refresh_play_queue(self, request: web.Request) -> web.Response:
854        """
855        Handle refreshPlayQueue command from Plex controller.
856
857        This is called when the play queue is modified (items added, removed, reordered).
858        We need to sync the entire updated queue state to MA while preserving playback.
859        """
860        try:
861            play_queue_id = request.query.get("playQueueID")
862
863            if not play_queue_id:
864                return web.Response(status=400, text="Missing 'playQueueID' parameter")
865
866            # Log all query parameters to understand what Plex sends
867            LOGGER.info(
868                f"Received refreshPlayQueue command - playQueueID: {play_queue_id}, "
869                f"params: {dict(request.query)}"
870            )
871
872            # Verify this is our active play queue
873            if self.play_queue_id != play_queue_id:
874                LOGGER.warning(
875                    f"Refresh requested for queue {play_queue_id} but active queue is "
876                    f"{self.play_queue_id}"
877                )
878                return web.Response(
879                    status=409,
880                    text=(
881                        f"Requested playQueueID {play_queue_id} does not match "
882                        f"active queue {self.play_queue_id}"
883                    ),
884                )
885
886            # Update the play queue version (increments on each refresh)
887            self.play_queue_version += 1
888
889            # Use plexapi to fetch the updated play queue
890            def fetch_queue() -> PlayQueue:
891                return PlayQueue.get(self.provider._plex_server, playQueueID=play_queue_id)
892
893            playqueue = await asyncio.to_thread(fetch_queue)
894
895            if not playqueue or not playqueue.items:
896                LOGGER.error("Failed to refresh play queue - queue is empty or not found")
897                return web.Response(status=404, text="Play queue not found")
898
899            # Get current MA queue state
900            player_id = self._ma_player_id
901            if not player_id:
902                LOGGER.error("No player assigned to this server")
903                return web.Response(status=500, text="No player assigned")
904
905            # disable shuffle to avoid infinite loop
906            await self.provider.mass.player_queues.set_shuffle(player_id, False)
907            ma_queue = self.provider.mass.player_queues.get(player_id)
908            if not ma_queue:
909                LOGGER.error(f"MA queue not found for player {player_id}")
910                return web.Response(status=500, text="MA queue not found")
911
912            # Get current playback state
913            current_index = ma_queue.current_index
914
915            # Get MA queue item count
916            ma_queue_items = self.provider.mass.player_queues.items(player_id)
917            ma_queue_count = len(ma_queue_items) if ma_queue_items else 0
918
919            LOGGER.debug(
920                f"Queue refresh: Current index={current_index}, "
921                f"MA has {ma_queue_count} items, Plex has {len(playqueue.items)} items"
922            )
923
924            # If nothing is playing, replace the entire queue
925            if current_index is None:
926                LOGGER.debug("No track currently playing, replacing entire queue")
927                await self._replace_entire_queue(player_id, playqueue)
928            else:
929                # Something is playing - update only the remaining queue items
930                LOGGER.debug(
931                    f"Track at index {current_index} is playing, "
932                    f"replacing only items after current track"
933                )
934                await self._replace_remaining_queue(player_id, playqueue, current_index)
935
936            LOGGER.info(
937                f"Refreshed play queue {play_queue_id} - now has {len(playqueue.items)} items"
938            )
939
940            # Update tracked state to prevent sync loop
941            # Get what's actually in MA queue after the refresh
942            queue_items_after = self.provider.mass.player_queues.items(player_id)
943            synced_keys = []
944            for item in queue_items_after:
945                if item.media_item:
946                    for mapping in item.media_item.provider_mappings:
947                        if mapping.provider_instance == self.provider.instance_id:
948                            synced_keys.append(mapping.item_id)
949                            break
950            self._last_synced_ma_queue_length = len(synced_keys)
951            self._last_synced_ma_queue_keys = synced_keys
952
953            return web.Response(status=200)
954
955        except Exception as e:
956            LOGGER.exception(f"Error handling refreshPlayQueue: {e}")
957            return web.Response(status=500, text=str(e))
958
959    async def handle_create_play_queue(self, request: web.Request) -> web.Response:
960        """
961        Handle createPlayQueue command from Plex controller.
962
963        Creates a new play queue from a URI (album, playlist, artist tracks, etc.)
964        and optionally applies shuffle.
965        """
966        try:
967            uri = request.query.get("uri")
968            shuffle = request.query.get("shuffle", "0") == "1"
969            continuous = request.query.get("continuous", "0") == "1"
970
971            if not uri:
972                return web.Response(status=400, text="Missing 'uri' parameter")
973
974            LOGGER.info(f"Received createPlayQueue command - uri: {uri}, shuffle: {shuffle}")
975
976            # Use the assigned player for this server instance
977            player_id = self._ma_player_id
978            if not player_id:
979                return web.Response(status=500, text="No player assigned to this server")
980
981            # Use plexapi to create play queue
982            def create_queue() -> PlayQueue:
983                # Fetch the item from URI first
984                item = self.provider._plex_server.fetchItem(uri)
985                # Create play queue from the item
986                return PlayQueue.create(
987                    self.provider._plex_server,
988                    item,
989                    shuffle=1 if shuffle else 0,
990                    continuous=1 if continuous else 0,
991                )
992
993            playqueue = await asyncio.to_thread(create_queue)
994
995            if playqueue and playqueue.items:
996                # Extract play queue ID from response
997                self.play_queue_id = str(playqueue.playQueueID)
998                self.play_queue_version = 1
999
1000                LOGGER.info(
1001                    f"Created play queue {self.play_queue_id} with {len(playqueue.items)} items"
1002                )
1003
1004                # Fetch the first track to start playback immediately
1005                self.play_queue_item_ids = {}
1006                first_item = playqueue.items[0]
1007                first_track_key = first_item.key if hasattr(first_item, "key") else None
1008                first_play_queue_item_id = (
1009                    first_item.playQueueItemID if hasattr(first_item, "playQueueItemID") else None
1010                )
1011
1012                if not first_track_key:
1013                    LOGGER.error("No valid first track in created play queue")
1014                    return web.Response(status=500, text="Failed to load tracks from play queue")
1015
1016                try:
1017                    # Fetch and start playing the first track immediately
1018                    first_track = await self.provider.get_track(first_track_key)
1019                    LOGGER.info(f"Starting playback with first track: {first_track.name}")
1020
1021                    # Store first track's play queue item ID mapping
1022                    if first_play_queue_item_id:
1023                        self.play_queue_item_ids[0] = first_play_queue_item_id
1024
1025                    # Start playback immediately with just the first track
1026                    await self.provider.mass.player_queues.play_media(
1027                        queue_id=player_id,
1028                        media=first_track,
1029                        option=QueueOption.REPLACE,
1030                    )
1031
1032                    # Now load the remaining tracks in the background
1033                    if len(playqueue.items) > 1:
1034                        self.provider.mass.create_task(
1035                            self._load_remaining_queue_tracks(
1036                                player_id,
1037                                playqueue,
1038                                0,  # Selected offset is 0 since we started from the first track
1039                                shuffle,
1040                            )
1041                        )
1042
1043                    # Broadcast timeline update
1044                    await self._broadcast_timeline()
1045                    return web.Response(status=200)
1046
1047                except Exception as e:
1048                    LOGGER.exception(f"Error starting playback with first track: {e}")
1049                    return web.Response(status=500, text=f"Failed to start playback: {e}")
1050            else:
1051                LOGGER.error("Failed to create play queue or queue is empty")
1052                return web.Response(status=500, text="Failed to create play queue")
1053
1054        except Exception as e:
1055            LOGGER.exception(f"Error handling createPlayQueue: {e}")
1056            return web.Response(status=500, text=str(e))
1057
1058    async def _resolve_plex_item(self, key: str) -> object:
1059        """Resolve a Plex key to a Music Assistant media item."""
1060        # Determine item type from the key format
1061        if "/library/metadata/" in key:
1062            # Could be track, album, or artist
1063            # Try to fetch as track first
1064            try:
1065                return await self.provider.get_track(key)
1066            except Exception as exc:
1067                LOGGER.debug(f"Failed to resolve Plex item as track for key '{key}': {exc}")
1068
1069            # Try as album
1070            try:
1071                return await self.provider.get_album(key)
1072            except Exception as exc:
1073                LOGGER.debug(f"Failed to resolve Plex item as album for key '{key}': {exc}")
1074
1075            # Try as artist
1076            try:
1077                return await self.provider.get_artist(key)
1078            except Exception:
1079                raise ValueError(f"Could not resolve Plex item: {key}") from None
1080
1081        elif "/playlists/" in key:
1082            return await self.provider.get_playlist(key)
1083        else:
1084            raise ValueError(f"Unknown Plex key format: {key}")
1085
1086    async def handle_pause(self, request: web.Request) -> web.Response:
1087        """Handle pause command (test-client.py line 98-101)."""
1088        self._updating_from_plex = True
1089        try:
1090            if self._ma_player_id:
1091                await self.provider.mass.players.cmd_pause(self._ma_player_id)
1092            await self._broadcast_timeline()
1093            return web.Response(status=200)
1094        finally:
1095            self._updating_from_plex = False
1096
1097    async def handle_play(self, request: web.Request) -> web.Response:
1098        """Handle play/resume command (test-client.py line 103-106)."""
1099        self._updating_from_plex = True
1100        try:
1101            if self._ma_player_id:
1102                # Ungroup player before resuming playback
1103                await self._ungroup_player_if_needed(self._ma_player_id)
1104                await self.provider.mass.players.cmd_play(self._ma_player_id)
1105            await self._broadcast_timeline()
1106            return web.Response(status=200)
1107        finally:
1108            self._updating_from_plex = False
1109
1110    async def handle_stop(self, request: web.Request) -> web.Response:
1111        """Handle stop command - stops playback and clears the queue."""
1112        self._updating_from_plex = True
1113        try:
1114            if self._ma_player_id:
1115                # Clear the queue (which also stops playback)
1116                self.provider.mass.player_queues.clear(self._ma_player_id)
1117
1118                # Reset play queue tracking since the queue is now cleared
1119                self.play_queue_id = None
1120                self.play_queue_item_ids = {}
1121
1122            await self._broadcast_timeline()
1123            return web.Response(status=200)
1124        finally:
1125            self._updating_from_plex = False
1126
1127    async def handle_skip_next(self, request: web.Request) -> web.Response:
1128        """Handle skip next command."""
1129        self._updating_from_plex = True
1130        try:
1131            if self._ma_player_id:
1132                await self.provider.mass.player_queues.next(self._ma_player_id)
1133            await self._broadcast_timeline()
1134            return web.Response(status=200)
1135        finally:
1136            self._updating_from_plex = False
1137
1138    async def handle_skip_previous(self, request: web.Request) -> web.Response:
1139        """Handle skip previous command."""
1140        self._updating_from_plex = True
1141        try:
1142            if self._ma_player_id:
1143                await self.provider.mass.player_queues.previous(self._ma_player_id)
1144            await self._broadcast_timeline()
1145            return web.Response(status=200)
1146        finally:
1147            self._updating_from_plex = False
1148
1149    async def handle_step_forward(self, request: web.Request) -> web.Response:
1150        """Handle step forward command (small skip forward)."""
1151        self._updating_from_plex = True
1152        try:
1153            if self._ma_player_id:
1154                queue = self.provider.mass.player_queues.get(self._ma_player_id)
1155                if queue:
1156                    # Step forward 30 seconds
1157                    new_position = queue.corrected_elapsed_time + 30
1158                    if queue.current_item and queue.current_item.media_item:
1159                        # Don't seek past the track duration
1160                        max_duration = queue.current_item.media_item.duration or new_position
1161                        new_position = min(new_position, max_duration)
1162                    await self.provider.mass.players.cmd_seek(self._ma_player_id, int(new_position))
1163                    # Wait briefly for player state to update
1164                    await asyncio.sleep(0.1)
1165            await self._broadcast_timeline()
1166            return web.Response(status=200)
1167        finally:
1168            self._updating_from_plex = False
1169
1170    async def handle_step_back(self, request: web.Request) -> web.Response:
1171        """Handle step back command (small skip backward)."""
1172        self._updating_from_plex = True
1173        try:
1174            if self._ma_player_id:
1175                queue = self.provider.mass.player_queues.get(self._ma_player_id)
1176                if queue:
1177                    # Step back 10 seconds
1178                    new_position = max(0, queue.corrected_elapsed_time - 10)
1179                    await self.provider.mass.players.cmd_seek(self._ma_player_id, int(new_position))
1180                    # Wait briefly for player state to update
1181                    await asyncio.sleep(0.1)
1182            await self._broadcast_timeline()
1183            return web.Response(status=200)
1184        finally:
1185            self._updating_from_plex = False
1186
1187    async def handle_skip_to(self, request: web.Request) -> web.Response:
1188        """Handle skip to specific queue item."""
1189        key = request.query.get("key")
1190        if not self._ma_player_id or not key:
1191            return web.Response(status=400, text="Missing player ID or key")
1192
1193        self._updating_from_plex = True
1194        try:
1195            ma_index = None
1196
1197            # Check if key is a play queue item ID (numeric) or a library path
1198            if key.isdigit():
1199                # Key is a play queue item ID
1200                play_queue_item_id = int(key)
1201
1202                # Find the MA queue index for this play queue item ID
1203                for idx, pq_item_id in self.play_queue_item_ids.items():
1204                    if pq_item_id == play_queue_item_id:
1205                        ma_index = idx
1206                        break
1207
1208                if ma_index is None:
1209                    LOGGER.warning(
1210                        f"Could not find MA queue index for play queue item ID: "
1211                        f"{play_queue_item_id}"
1212                    )
1213                    return web.Response(status=404, text="Queue item not found")
1214
1215                LOGGER.info(
1216                    f"Skipping to queue index {ma_index} (play queue item ID: {play_queue_item_id})"
1217                )
1218            else:
1219                # Key is a library path (e.g., "/library/metadata/856761")
1220                # Find the track in the MA queue by matching the Plex key
1221                queue_items = self.provider.mass.player_queues.items(self._ma_player_id)
1222                if not queue_items:
1223                    return web.Response(status=404, text="Queue is empty")
1224
1225                for idx, item in enumerate(queue_items):
1226                    if not item.media_item:
1227                        continue
1228
1229                    # Find Plex mapping for this track
1230                    for mapping in item.media_item.provider_mappings:
1231                        if (
1232                            mapping.provider_instance == self.provider.instance_id
1233                            and mapping.item_id == key
1234                        ):
1235                            ma_index = idx
1236                            break
1237
1238                    if ma_index is not None:
1239                        break
1240
1241                if ma_index is None:
1242                    LOGGER.warning(f"Could not find track with key {key} in MA queue")
1243                    return web.Response(status=404, text="Track not found in queue")
1244
1245                LOGGER.info(f"Skipping to queue index {ma_index} (track key: {key})")
1246
1247            # Skip to this index in the MA queue
1248            await self.provider.mass.player_queues.play_index(self._ma_player_id, ma_index)
1249
1250            await self._broadcast_timeline()
1251            return web.Response(status=200)
1252
1253        except Exception as e:
1254            LOGGER.exception(f"Error handling skipTo: {e}")
1255            return web.Response(status=500, text=str(e))
1256        finally:
1257            self._updating_from_plex = False
1258
1259    async def handle_seek_to(self, request: web.Request) -> web.Response:
1260        """Handle seek command."""
1261        self._updating_from_plex = True
1262        try:
1263            offset_ms = int(request.query.get("offset", 0))
1264            if self._ma_player_id:
1265                await self.provider.mass.players.cmd_seek(self._ma_player_id, int(offset_ms / 1000))
1266                # Wait briefly for player state to update
1267                await asyncio.sleep(0.1)
1268            await self._broadcast_timeline()
1269            return web.Response(status=200)
1270        finally:
1271            self._updating_from_plex = False
1272
1273    async def handle_set_parameters(self, request: web.Request) -> web.Response:
1274        """Handle parameter changes (volume, shuffle, repeat)."""
1275        if not self._ma_player_id:
1276            return web.Response(status=200)
1277
1278        self._updating_from_plex = True
1279        try:
1280            if "volume" in request.query:
1281                volume = int(request.query["volume"])
1282                await self.provider.mass.players.cmd_volume_set(self._ma_player_id, volume)
1283
1284            if "shuffle" in request.query:
1285                # Plex sends shuffle as "0" or "1"
1286                shuffle = request.query["shuffle"] == "1"
1287                await self.provider.mass.player_queues.set_shuffle(self._ma_player_id, shuffle)
1288
1289            if "repeat" in request.query:
1290                # Plex repeat: 0=off, 1=repeat one, 2=repeat all
1291                repeat_value = int(request.query["repeat"])
1292
1293                # Map Plex repeat to MA repeat mode
1294                if repeat_value == 0:
1295                    # Repeat off
1296                    self.provider.mass.player_queues.set_repeat(self._ma_player_id, RepeatMode.OFF)
1297                elif repeat_value == 1:
1298                    # Repeat one track
1299                    self.provider.mass.player_queues.set_repeat(self._ma_player_id, RepeatMode.ONE)
1300                elif repeat_value == 2:
1301                    # Repeat all
1302                    self.provider.mass.player_queues.set_repeat(self._ma_player_id, RepeatMode.ALL)
1303
1304            await self._broadcast_timeline()
1305            return web.Response(status=200)
1306        finally:
1307            self._updating_from_plex = False
1308
1309    async def handle_options(self, request: web.Request) -> web.Response:
1310        """Handle OPTIONS requests for CORS (like test-client.py)."""
1311        return web.Response(
1312            status=200,
1313            headers={
1314                "Access-Control-Allow-Origin": "*",
1315                "Access-Control-Allow-Methods": "GET, POST, OPTIONS",
1316                "Access-Control-Allow-Headers": "*",
1317            },
1318        )
1319
1320    async def handle_resources(self, request: web.Request) -> web.Response:
1321        """Return player information (matching test-client.py format exactly)."""
1322        # Get player name
1323        player_name = "Music Assistant"
1324        if self._ma_player_id:
1325            player = self.provider.mass.players.get_player(self._ma_player_id)
1326            if player:
1327                player_name = player.display_name
1328
1329        # Get player state
1330        state = "stopped"
1331        if self._ma_player_id:
1332            player = self.provider.mass.players.get_player(self._ma_player_id)
1333            if player and player.state:
1334                state_value = (
1335                    player.state.value if hasattr(player.state, "value") else str(player.state)
1336                )
1337                if state_value in ["playing", "paused"]:
1338                    state = state_value
1339
1340        local_ip = self.provider.mass.streams.publish_ip
1341        version = self.provider.mass.version if self.provider.mass.version != "0.0.0" else "1.0.0"
1342
1343        # Match test-client.py format exactly
1344        xml = f"""<?xml version="1.0" encoding="UTF-8"?>
1345<MediaContainer>
1346    <Player title="{player_name}"
1347            protocol="plex"
1348            protocolVersion="1"
1349            protocolCapabilities="timeline,playback,navigation,playqueues"
1350            machineIdentifier="{self.client_id}"
1351            product="Music Assistant"
1352            platform="{platform.system()}"
1353            platformVersion="{platform.release()}"
1354            deviceClass="{self.device_class}"
1355            state="{state}"
1356            address="{local_ip}"
1357            port="{self.port}"
1358            version="{version}"
1359            provides="client,player,pubsub-player">
1360        <Connection protocol="http" address="{local_ip}" port="{self.port}"
1361                    uri="http://{local_ip}:{self.port}" local="1"/>
1362    </Player>
1363</MediaContainer>"""
1364        return web.Response(
1365            text=xml, content_type="text/xml", headers={"Access-Control-Allow-Origin": "*"}
1366        )
1367
1368    def _build_timeline_attributes(
1369        self,
1370        track: Any,
1371        state: str,
1372        duration: int,
1373        time: int,
1374        volume: int,
1375        shuffle: int,
1376        repeat: int,
1377        controllable: str,
1378        queue: Any | None,
1379    ) -> list[str]:
1380        """Build timeline attributes for a playing track.
1381
1382        :param track: The current track media item.
1383        :param state: Playback state (playing, paused, etc.).
1384        :param duration: Track duration in milliseconds.
1385        :param time: Current playback time in milliseconds.
1386        :param volume: Volume level (0-100).
1387        :param shuffle: Shuffle state (0 or 1).
1388        :param repeat: Repeat mode (0=off, 1=one, 2=all).
1389        :param controllable: Controllable features string.
1390        :param queue: The MA queue object.
1391        :return: List of timeline attribute strings.
1392        """
1393        # Get Plex key and ratingKey
1394        key = None
1395        rating_key = None
1396        for mapping in track.provider_mappings:
1397            if mapping.provider_instance == self.provider.instance_id:
1398                key = mapping.item_id
1399                rating_key = key.split("/")[-1]
1400                break
1401
1402        if not key:
1403            return []
1404
1405        # Server identification
1406        plex_url = urlparse(self.provider._baseurl)
1407        machine_identifier = self.provider._plex_server.machineIdentifier
1408        address = plex_url.hostname
1409        port = plex_url.port or (443 if plex_url.scheme == "https" else 32400)
1410        protocol = plex_url.scheme
1411
1412        # Build timeline attributes
1413        attrs = [
1414            f'state="{state}"',
1415            f'duration="{duration}"',
1416            f'time="{time}"',
1417            f'ratingKey="{rating_key}"',
1418            f'key="{key}"',
1419        ]
1420
1421        # Add play queue info if available
1422        if self.play_queue_id and queue:
1423            if queue.current_index is not None:
1424                play_queue_item_id = self.play_queue_item_ids.get(
1425                    queue.current_index, queue.current_index + 1
1426                )
1427                attrs.append(f'playQueueItemID="{play_queue_item_id}"')
1428            attrs.append(f'playQueueID="{self.play_queue_id}"')
1429            attrs.append(f'playQueueVersion="{self.play_queue_version}"')
1430            attrs.append(f'containerKey="/playQueues/{self.play_queue_id}"')
1431
1432        # Add standard attributes
1433        attrs.extend(
1434            [
1435                'type="music"',
1436                f'volume="{volume}"',
1437                f'shuffle="{shuffle}"',
1438                f'repeat="{repeat}"',
1439                f'controllable="{controllable}"',
1440                f'machineIdentifier="{machine_identifier}"',
1441                f'address="{address}"',
1442                f'port="{port}"',
1443                f'protocol="{protocol}"',
1444            ]
1445        )
1446
1447        return attrs
1448
1449    async def _build_timeline_xml(
1450        self, include_metadata: bool = False, command_id: str = "0"
1451    ) -> str:
1452        """Build timeline XML from current Music Assistant player state."""
1453        player_id = self._ma_player_id
1454
1455        # Get MA player and queue
1456        player = self.provider.mass.players.get_player(player_id) if player_id else None
1457        queue = self.provider.mass.player_queues.get(player_id) if player_id else None
1458
1459        # Controllable features for music
1460        controllable = (
1461            "volume,repeat,skipPrevious,seekTo,stepBack,stepForward,stop,playPause,shuffle,skipNext"
1462        )
1463
1464        # Map MA playback state to Plex state (stopped, paused, playing, buffering, error)
1465        state = "stopped"
1466        if player and player.playback_state:
1467            state_value = (
1468                player.playback_state.value
1469                if hasattr(player.playback_state, "value")
1470                else str(player.playback_state)
1471            )
1472
1473            # Map MA states to Plex states
1474            if state_value == "playing":
1475                state = "playing"
1476            elif state_value == "paused":
1477                state = "paused"
1478            elif state_value == "buffering":
1479                state = "buffering"
1480            elif state_value == "idle":
1481                # Idle with a current track = paused, idle without track = stopped
1482                state = (
1483                    "paused"
1484                    if queue and queue.current_item and queue.current_item.media_item
1485                    else "stopped"
1486                )
1487            else:
1488                state = "stopped"
1489
1490        # Get volume (0-100) - use group_volume for groups, volume_level for others
1491        volume = 0
1492        if player:
1493            volume = (
1494                int(player.group_volume)
1495                if (player.type == PlayerType.GROUP or player.group_members)
1496                else (int(player.volume_level) if player.volume_level else 0)
1497            )
1498
1499        # Get shuffle (0/1) and repeat (0=off, 1=one, 2=all)
1500        shuffle = 0
1501        repeat = 0
1502        if queue:
1503            shuffle = 1 if queue.shuffle_enabled else 0
1504            if hasattr(queue, "repeat_mode"):
1505                repeat_mode = queue.repeat_mode
1506                if hasattr(repeat_mode, "value"):
1507                    repeat_value = repeat_mode.value
1508                    if repeat_value == "one":
1509                        repeat = 1
1510                    elif repeat_value == "all":
1511                        repeat = 2
1512
1513        # Build music timeline
1514        if (
1515            state in ["playing", "paused"]
1516            and queue
1517            and queue.current_item
1518            and queue.current_item.media_item
1519        ):
1520            track = queue.current_item.media_item
1521
1522            # Duration in milliseconds
1523            duration = round(track.duration * 1000) if track.duration else 0
1524
1525            # Current playback time in milliseconds
1526            time = round(queue.corrected_elapsed_time * 1000)
1527
1528            # Build timeline attributes
1529            attrs = self._build_timeline_attributes(
1530                track, state, duration, time, volume, shuffle, repeat, controllable, queue
1531            )
1532
1533            if attrs:
1534                music_timeline = f"<Timeline {' '.join(attrs)}/>"
1535            else:
1536                # No Plex mapping, send basic timeline with actual state
1537                music_timeline = (
1538                    f'<Timeline state="{state}" time="{time}" type="music" volume="{volume}" '
1539                    f'shuffle="{shuffle}" repeat="{repeat}" controllable="{controllable}"/>'
1540                )
1541        else:
1542            # No current track - send stopped state with time=0
1543            time = 0
1544            music_timeline = (
1545                f'<Timeline state="{state}" time="{time}" type="music" volume="{volume}" '
1546                f'shuffle="{shuffle}" repeat="{repeat}" controllable="{controllable}"/>'
1547            )
1548
1549        # Video and photo timelines (always stopped for music player)
1550        video_timeline = '<Timeline type="video" state="stopped"/>'
1551        photo_timeline = '<Timeline type="photo" state="stopped"/>'
1552
1553        # Combine all timelines
1554        return (
1555            f'<MediaContainer commandID="{command_id}">'
1556            f"{music_timeline}{video_timeline}{photo_timeline}"
1557            f"</MediaContainer>"
1558        )
1559
1560    async def _handle_player_event(self, event: MassEvent) -> None:
1561        """Handle player state change events."""
1562        if not self._ma_player_id or event.object_id != self._ma_player_id:
1563            return
1564
1565        # Skip if we're the ones making the changes
1566        if self._updating_from_plex:
1567            return
1568
1569        try:
1570            # Send timeline to Plex server (for activity tracking)
1571            await self._send_timeline_to_server()
1572
1573            # Broadcast timeline to subscribed controllers
1574            # Timeline will be built from current MA player state
1575            await self._broadcast_timeline()
1576        except Exception as e:
1577            LOGGER.debug(f"Error handling player event: {e}")
1578
1579    async def _handle_queue_event(self, event: MassEvent) -> None:
1580        """Handle queue change events."""
1581        if not self._ma_player_id or event.object_id != self._ma_player_id:
1582            return
1583
1584        # Skip if we're the ones making the changes
1585        if self._updating_from_plex:
1586            return
1587
1588        try:
1589            # Send timeline to Plex server (for activity tracking)
1590            await self._send_timeline_to_server()
1591
1592            # Broadcast timeline to subscribed controllers
1593            # Timeline will be built from current MA player state
1594            await self._broadcast_timeline()
1595        except Exception as e:
1596            LOGGER.debug(f"Error handling queue event: {e}")
1597
1598    async def _handle_queue_items_updated(self, event: MassEvent) -> None:
1599        """Handle queue items being added/removed/reordered."""
1600        if not self._ma_player_id or event.object_id != self._ma_player_id:
1601            return
1602
1603        # Skip if we're the ones making the changes
1604        if self._updating_from_plex:
1605            return
1606
1607        # Get current MA queue state
1608        queue_items = self.provider.mass.player_queues.items(self._ma_player_id)
1609        if not queue_items:
1610            return
1611
1612        current_keys = []
1613        for item in queue_items:
1614            if not item.media_item:
1615                continue
1616            # Find Plex mapping
1617            for mapping in item.media_item.provider_mappings:
1618                if mapping.provider_instance == self.provider.instance_id:
1619                    current_keys.append(mapping.item_id)
1620                    break
1621
1622        # Check if queue actually changed from what we last synced FROM Plex
1623        if (
1624            len(current_keys) == self._last_synced_ma_queue_length
1625            and current_keys == self._last_synced_ma_queue_keys
1626        ):
1627            # Queue hasn't changed from last sync, skip
1628            LOGGER.debug("MA queue matches last synced state, skipping Plex sync")
1629            return
1630
1631        LOGGER.info(
1632            f"MA queue changed: {self._last_synced_ma_queue_length} -> {len(current_keys)} items"
1633        )
1634
1635        # (Re)create Plex PlayQueue from MA queue
1636        try:
1637            await self._create_plex_playqueue_from_ma()
1638            # Update tracked state
1639            self._last_synced_ma_queue_length = len(current_keys)
1640            self._last_synced_ma_queue_keys = current_keys
1641        except Exception as e:
1642            LOGGER.debug(f"Error creating Plex PlayQueue: {e}")
1643
1644        # Broadcast timeline update
1645        try:
1646            await self._broadcast_timeline()
1647        except Exception as e:
1648            LOGGER.debug(f"Error broadcasting timeline: {e}")
1649
1650    async def _create_plex_playqueue_from_ma(self) -> None:
1651        """Create a new Plex PlayQueue from current MA queue."""
1652        ma_queue = self.provider.mass.player_queues.get(self._ma_player_id)  # type: ignore[arg-type]
1653        queue_items = self.provider.mass.player_queues.items(self._ma_player_id)  # type: ignore[arg-type]
1654
1655        if not ma_queue or not queue_items:
1656            return
1657
1658        # Fetch Plex items for all tracks in MA queue
1659        async def fetch_plex_item(plex_key: str) -> object | None:
1660            """Fetch a single Plex item."""
1661            try:
1662
1663                def fetch_item() -> object:
1664                    return self.plex_server.fetchItem(plex_key)
1665
1666                return await asyncio.to_thread(fetch_item)
1667            except Exception as e:
1668                LOGGER.debug(f"Failed to fetch Plex item {plex_key}: {e}")
1669                return None
1670
1671        # Collect all fetch tasks
1672        fetch_tasks = []
1673        for item in queue_items:
1674            if not item.media_item:
1675                continue
1676
1677            # Find Plex mapping
1678            plex_key = None
1679            for mapping in item.media_item.provider_mappings:
1680                if mapping.provider_instance == self.provider.instance_id:
1681                    plex_key = mapping.item_id
1682                    break
1683
1684            if plex_key:
1685                fetch_tasks.append(fetch_plex_item(plex_key))
1686
1687        # Fetch all items concurrently
1688        plex_items = []
1689        if fetch_tasks:
1690            fetched_items = await asyncio.gather(*fetch_tasks, return_exceptions=True)
1691            plex_items = [item for item in fetched_items if item is not None]
1692
1693        if not plex_items:
1694            LOGGER.debug("No Plex tracks in MA queue, skipping PlayQueue creation")
1695            return
1696
1697        # Determine which track should be selected (currently playing)
1698        start_item = None
1699        if ma_queue.current_index is not None and ma_queue.current_index < len(plex_items):
1700            start_item = plex_items[ma_queue.current_index]
1701
1702        # Create Plex PlayQueue - don't pass shuffle since MA queue is already in desired order
1703        def create_queue() -> PlayQueue:
1704            return PlayQueue.create(
1705                self.plex_server,
1706                items=plex_items,
1707                startItem=start_item,
1708                shuffle=0,  # Don't shuffle, plex_items is already in MA queue order
1709                continuous=1,
1710            )
1711
1712        try:
1713            playqueue = await asyncio.to_thread(create_queue)
1714
1715            if playqueue:
1716                self.play_queue_id = str(playqueue.playQueueID)
1717                self.play_queue_version = playqueue.playQueueVersion
1718
1719                # Build item ID mappings
1720                self.play_queue_item_ids = {}
1721                for i, item in enumerate(playqueue.items):
1722                    if hasattr(item, "playQueueItemID"):
1723                        self.play_queue_item_ids[i] = item.playQueueItemID
1724
1725                LOGGER.info(
1726                    f"Created Plex PlayQueue {self.play_queue_id} with {len(plex_items)} tracks"
1727                )
1728        except Exception as e:
1729            LOGGER.exception(f"Error creating Plex PlayQueue: {e}")
1730
1731    async def _send_timeline(self, client_id: str) -> None:
1732        """Send timeline update to specific controller."""
1733        subscription = self.subscriptions.get(client_id)
1734        if not subscription:
1735            return
1736
1737        timeline_xml = await self._build_timeline_xml()
1738
1739        try:
1740            await self.provider.mass.http_session.post(
1741                f"{subscription['url']}/:/timeline",
1742                data=timeline_xml,
1743                headers={
1744                    "X-Plex-Client-Identifier": self.client_id,
1745                    "Content-Type": "text/xml",
1746                },
1747                timeout=ClientTimeout(total=5),
1748            )
1749            # Update last_update timestamp on successful send
1750            subscription["last_update"] = time.time()
1751        except Exception as e:
1752            LOGGER.debug(f"Failed to send timeline to {client_id}: {e}")
1753
1754    async def _send_timeline_to_server(self) -> None:
1755        """Send timeline update to Plex server for activity tracking."""
1756        if not self._ma_player_id:
1757            return
1758
1759        try:
1760            player = self.provider.mass.players.get_player(self._ma_player_id)
1761            queue = self.provider.mass.player_queues.get(self._ma_player_id)
1762
1763            if (
1764                not player
1765                or not queue
1766                or not queue.current_item
1767                or not queue.current_item.media_item
1768            ):
1769                return
1770
1771            track = queue.current_item.media_item
1772
1773            # Find Plex mapping
1774            plex_key = None
1775            for mapping in track.provider_mappings:
1776                if mapping.provider_instance == self.provider.instance_id:
1777                    plex_key = mapping.item_id
1778                    break
1779
1780            if not plex_key:
1781                return
1782
1783            # Extract rating key from plex_key (e.g., "/library/metadata/12345" -> "12345")
1784            rating_key = plex_key.split("/")[-1]
1785
1786            # Get playback state
1787            state_value = (
1788                player.playback_state.value
1789                if hasattr(player.playback_state, "value")
1790                else str(player.playback_state)
1791            )
1792
1793            # Map to Plex state
1794            if state_value == "playing":
1795                plex_state = "playing"
1796            elif state_value == "paused":
1797                plex_state = "paused"
1798            else:
1799                plex_state = "stopped"
1800
1801            # Get position and duration in milliseconds
1802            position_ms = round(queue.corrected_elapsed_time * 1000)
1803            duration_ms = round(track.duration * 1000) if track.duration else 0
1804
1805            # Get play queue info if available
1806            container_key = ""
1807            play_queue_item_id = ""
1808            if self.play_queue_id:
1809                container_key = f"/playQueues/{self.play_queue_id}"
1810                if queue.current_index is not None:
1811                    play_queue_item_id = str(
1812                        self.play_queue_item_ids.get(queue.current_index, queue.current_index + 1)
1813                    )
1814
1815            # Build timeline params (only Plex timeline data)
1816            params = {
1817                "ratingKey": rating_key,
1818                "key": plex_key,
1819                "state": plex_state,
1820                "time": str(position_ms),
1821                "duration": str(duration_ms),
1822            }
1823
1824            # Add play queue info if available
1825            if container_key:
1826                params["containerKey"] = container_key
1827            if play_queue_item_id:
1828                params["playQueueItemID"] = play_queue_item_id
1829
1830            def send_timeline() -> None:
1831                # Pass session headers to identify this specific player instance
1832                self.plex_server.query("/:/timeline", params=params, headers=self.headers)
1833
1834            await asyncio.to_thread(send_timeline)
1835
1836        except Exception as e:
1837            LOGGER.debug(f"Failed to send timeline to Plex server: {e}")
1838
1839    async def _broadcast_timeline(self) -> None:
1840        """Send timeline to all subscribed controllers."""
1841        current_time = time.time()
1842        stale_clients = []
1843        for client_id, sub in self.subscriptions.items():
1844            try:
1845                last_update = float(sub["last_update"])  # type: ignore[arg-type]
1846                if current_time - last_update > 90:
1847                    stale_clients.append(client_id)
1848            except (ValueError, TypeError):
1849                # If conversion fails, treat client as stale
1850                LOGGER.debug(f"Invalid last_update for client {client_id}, treating as stale")
1851                stale_clients.append(client_id)
1852
1853        for client_id in stale_clients:
1854            del self.subscriptions[client_id]
1855
1856        await asyncio.gather(
1857            *(self._send_timeline(client_id) for client_id in list(self.subscriptions.keys())),
1858            return_exceptions=True,  # Don't fail all if one fails
1859        )
1860
1861    # for debugging purposes only
1862    # async def handle_unknown(self, request: web.Request) -> web.Response:
1863    #     """Catch-all handler for unexpected or unsupported paths."""
1864    #     LOGGER.debug(
1865    #         "Unhandled request: %s %s from %s",
1866    #         request.method,
1867    #         request.path,
1868    #         request.remote,
1869    #     )
1870    #
1871    #     # You can log query/body if needed (be careful not to leak tokens)
1872    #     if request.query:
1873    #         LOGGER.debug("Query params for %s: %s", request.path, dict(request.query))
1874    #     try:
1875    #         data = await request.text()
1876    #         if data:
1877    #             LOGGER.debug("Body for %s: %s", request.path, data)
1878    #     except Exception as e:
1879    #         LOGGER.debug("Could not read request body: %s", e)
1880    #
1881    #     return web.Response(status=404, text=f"Unhandled path: {request.path}")
1882