/
/
/
1"""Base protocol class for AirPlay streaming implementations."""
2
3from __future__ import annotations
4
5import asyncio
6import time
7from abc import ABC, abstractmethod
8from typing import TYPE_CHECKING
9
10from music_assistant_models.enums import PlaybackState
11
12from music_assistant.helpers.named_pipe import AsyncNamedPipeWriter
13from music_assistant.providers.airplay.constants import AIRPLAY_PCM_FORMAT
14from music_assistant.providers.airplay.helpers import generate_active_remote_id
15
16if TYPE_CHECKING:
17 from music_assistant_models.player import PlayerMedia
18
19 from music_assistant.helpers.process import AsyncProcess
20 from music_assistant.providers.airplay.player import AirPlayPlayer
21 from music_assistant.providers.airplay.stream_session import AirPlayStreamSession
22
23
24class AirPlayProtocol(ABC):
25 """Base class for AirPlay streaming protocols (RAOP and AirPlay2).
26
27 This class contains common logic shared between protocol implementations,
28 with abstract methods for protocol-specific behavior.
29 """
30
31 _cli_proc: AsyncProcess | None # reference to the (protocol-specific) CLI process
32 session: AirPlayStreamSession | None = None # reference to the active stream session (if any)
33
34 # the pcm audio format used for streaming to this protocol
35 pcm_format = AIRPLAY_PCM_FORMAT
36
37 def __init__(
38 self,
39 player: AirPlayPlayer,
40 ) -> None:
41 """Initialize base AirPlay protocol.
42
43 Args:
44 player: The player to stream to
45 """
46 self.prov = player.provider
47 self.mass = player.provider.mass
48 self.player = player
49 self.logger = player.provider.logger.getChild(f"protocol.{self.__class__.__name__}")
50 mac_address = self.player.device_info.mac_address or self.player.player_id
51 self.active_remote_id: str = generate_active_remote_id(mac_address)
52 self.prevent_playback: bool = False
53 self._cli_proc: AsyncProcess | None = None
54 self.commands_pipe = AsyncNamedPipeWriter(
55 f"/tmp/{self.player.protocol.value}-{self.player.player_id}-{self.active_remote_id}-cmd", # noqa: S108
56 )
57 self._stopped = False
58 self._total_bytes_sent = 0
59 self._stream_bytes_sent = 0
60 self._connected = asyncio.Event()
61
62 @property
63 def running(self) -> bool:
64 """Return boolean if this stream is running."""
65 return not self._stopped and self._cli_proc is not None and not self._cli_proc.closed
66
67 @abstractmethod
68 async def start(self, start_ntp: int) -> None:
69 """Start the CLI process.
70
71 :param start_ntp: NTP timestamp to start streaming.
72 """
73
74 async def wait_for_connection(self) -> None:
75 """Wait for device connection to be established."""
76 if not self._cli_proc:
77 return
78 await asyncio.wait_for(self._connected.wait(), timeout=10)
79 # repeat sending the volume level to the player because some players seem
80 # to ignore it the first time
81 # https://github.com/music-assistant/support/issues/3330
82 self.mass.call_later(2, self.send_cli_command(f"VOLUME={self.player.volume_level}"))
83
84 async def stop(self, force: bool = False) -> None:
85 """
86 Stop playback and cleanup.
87
88 :param force: If True, immediately kill the process without graceful shutdown.
89 """
90 # always send stop command first
91 await self.send_cli_command("ACTION=STOP")
92 if self._cli_proc:
93 await self._cli_proc.write_eof()
94 self._stopped = True
95 await self.commands_pipe.remove()
96 if force:
97 if self._cli_proc and not self._cli_proc.closed:
98 await self._cli_proc.kill()
99 elif self._cli_proc and not self._cli_proc.closed:
100 await self._cli_proc.close()
101 if not force:
102 self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0)
103
104 async def send_cli_command(self, command: str) -> None:
105 """Send an interactive command to the running CLI binary."""
106 if self._stopped or not self._cli_proc or self._cli_proc.closed:
107 return
108 if not self.commands_pipe:
109 return
110 self.player.last_command_sent = time.time()
111 if not command.endswith("\n"):
112 command += "\n"
113 await self.commands_pipe.write(command.encode("utf-8"))
114
115 async def send_metadata(self, progress: int | None, metadata: PlayerMedia | None) -> None:
116 """Send metadata to player."""
117 if self._stopped:
118 return
119 if metadata:
120 duration = min(metadata.duration or 0, 3600)
121 title = metadata.title or ""
122 artist = metadata.artist or ""
123 album = metadata.album or ""
124 cmd = f"TITLE={title}\nARTIST={artist}\nALBUM={album}\n"
125 cmd += f"DURATION={duration}\nPROGRESS=0\nACTION=SENDMETA\n"
126 await self.send_cli_command(cmd)
127 # get image
128 if metadata.image_url:
129 await self.send_cli_command(f"ARTWORK={metadata.image_url}")
130 if progress is not None:
131 await self.send_cli_command(f"PROGRESS={progress}")
132