music-assistant-server

6.2 KBPY
raop.py
6.2 KB147 lines • python
1"""Logic for RAOP audio streaming to AirPlay devices."""
2
3from __future__ import annotations
4
5import asyncio
6import logging
7from typing import TYPE_CHECKING, cast
8
9from music_assistant_models.enums import PlaybackState
10
11from music_assistant.constants import VERBOSE_LOG_LEVEL
12from music_assistant.helpers.process import AsyncProcess
13from music_assistant.providers.airplay.constants import (
14    AIRPLAY_OUTPUT_BUFFER_DURATION_MS,
15    CONF_ALAC_ENCODE,
16    CONF_ENCRYPTION,
17    CONF_PASSWORD,
18    CONF_RAOP_CREDENTIALS,
19)
20from music_assistant.providers.airplay.helpers import get_cli_binary
21
22from ._protocol import AirPlayProtocol
23
24if TYPE_CHECKING:
25    from music_assistant.providers.airplay.provider import AirPlayProvider
26
27
28class RaopStream(AirPlayProtocol):
29    """
30    RAOP (AirPlay 1) Audio Streamer.
31
32    Python is not suitable for realtime audio streaming so we do the actual streaming
33    of (RAOP) audio using a small executable written in C based on libraop to do
34    the actual timestamped playback, which reads pcm audio from stdin
35    and we can send some interactive commands using a named pipe.
36    """
37
38    async def start(self, start_ntp: int) -> None:
39        """Start CLIRaop process."""
40        assert self.player.raop_discovery_info is not None  # for type checker
41        cli_binary = await get_cli_binary(self.player.protocol)
42        extra_args: list[str] = []
43        extra_args += ["-if", self.mass.streams.bind_ip]
44        if self.player.config.get_value(CONF_ENCRYPTION, True):
45            extra_args += ["-encrypt"]
46        if self.player.config.get_value(CONF_ALAC_ENCODE, True):
47            extra_args += ["-alac"]
48        for prop in ("et", "md", "am", "pk", "pw"):
49            if prop_value := self.player.raop_discovery_info.decoded_properties.get(prop):
50                extra_args += [f"-{prop}", prop_value]
51        if device_password := self.player.config.get_value(CONF_PASSWORD):
52            extra_args += ["-password", str(device_password)]
53        # Add RAOP credentials from pairing if available (for Apple devices)
54        if raop_credentials := self.player.config.get_value(CONF_RAOP_CREDENTIALS):
55            # Credentials format is "client_id:auth_secret", cliraop expects just auth_secret
56            creds_str = str(raop_credentials)
57            auth_secret = creds_str.split(":", 1)[1] if ":" in creds_str else creds_str
58            extra_args += ["-secret", auth_secret]
59        if self.prov.logger.isEnabledFor(logging.DEBUG):
60            extra_args += ["-debug", "5"]
61        elif self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
62            extra_args += ["-debug", "10"]
63
64        cliraop_args = [
65            cli_binary,
66            "-ntpstart",
67            str(start_ntp),
68            "-port",
69            str(self.player.raop_discovery_info.port),
70            "-latency",
71            str(AIRPLAY_OUTPUT_BUFFER_DURATION_MS),
72            "-volume",
73            str(self.player.volume_level),
74            *extra_args,
75            "-dacp",
76            cast("AirPlayProvider", self.prov).dacp_id,
77            "-activeremote",
78            self.active_remote_id,
79            "-cmdpipe",
80            self.commands_pipe.path,
81            "-udn",
82            self.player.raop_discovery_info.name,
83            self.player.address,
84            "-",  # Use stdin for audio input
85        ]
86        self.player.logger.debug(
87            "Starting cliraop process for player %s with args: %s",
88            self.player.player_id,
89            cliraop_args,
90        )
91        self._cli_proc = AsyncProcess(cliraop_args, stdin=True, stderr=True, name="cliraop")
92        await self._cli_proc.start()
93        # start reading the stderr of the cliap2 process from another task
94        self._cli_proc.attach_stderr_reader(self.mass.create_task(self._stderr_reader()))
95
96    async def _stderr_reader(self) -> None:
97        """Monitor stderr for the running CLIRaop process."""
98        player = self.player
99        logger = player.logger
100        lost_packets = 0
101        if not self._cli_proc:
102            return
103        async for line in self._cli_proc.iter_stderr():
104            if self._stopped:
105                break
106            if "connected to " in line:
107                self._connected.set()
108                # successfully connected - playback will/can start
109            if "set pause" in line or "Pause at" in line:
110                player.set_state_from_stream(state=PlaybackState.PAUSED, stream=self)
111            elif "Restarted at" in line or "restarting w/ pause" in line:
112                player.set_state_from_stream(state=PlaybackState.PLAYING, stream=self)
113            elif "restarting w/o pause" in line:
114                # streaming has started
115                player.set_state_from_stream(
116                    state=PlaybackState.PLAYING, elapsed_time=0, stream=self
117                )
118            elif "elapsed milliseconds:" in line:
119                # this is received more or less every second while playing
120                millis = int(line.split("elapsed milliseconds: ")[1])
121                # note that this represents the total elapsed time of the streaming session
122                elapsed_time = millis / 1000
123                player.set_state_from_stream(elapsed_time=elapsed_time)
124            elif "Password required, but none supplied." in line:
125                logger.error(
126                    f"Player {self.player.name} requires a password. "
127                    f"Please add one in Player Settings"
128                )
129                break
130            if "lost packet out of backlog" in line:
131                lost_packets += 1
132                if lost_packets == 100:
133                    logger.error("High packet loss detected, restarting playback...")
134                    self.mass.create_task(self.mass.players.cmd_resume(self.player.player_id))
135                else:
136                    logger.warning("Packet loss detected!")
137            if "end of stream reached" in line:
138                logger.debug("End of stream reached")
139                break
140            logger.log(VERBOSE_LOG_LEVEL, line)
141            await asyncio.sleep(0)  # Yield to event loop
142
143        logger.debug("CLIRaop stderr reader ended")
144        if not self._stopped:
145            self._stopped = True
146            self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0, stream=self)
147