/
/
/
1"""Logic for RAOP audio streaming to AirPlay devices."""
2
3from __future__ import annotations
4
5import asyncio
6import logging
7from typing import TYPE_CHECKING, cast
8
9from music_assistant_models.enums import PlaybackState
10
11from music_assistant.constants import VERBOSE_LOG_LEVEL
12from music_assistant.helpers.process import AsyncProcess
13from music_assistant.providers.airplay.constants import (
14 AIRPLAY_OUTPUT_BUFFER_DURATION_MS,
15 CONF_ALAC_ENCODE,
16 CONF_ENCRYPTION,
17 CONF_PASSWORD,
18 CONF_RAOP_CREDENTIALS,
19)
20from music_assistant.providers.airplay.helpers import get_cli_binary
21
22from ._protocol import AirPlayProtocol
23
24if TYPE_CHECKING:
25 from music_assistant.providers.airplay.provider import AirPlayProvider
26
27
28class RaopStream(AirPlayProtocol):
29 """
30 RAOP (AirPlay 1) Audio Streamer.
31
32 Python is not suitable for realtime audio streaming so we do the actual streaming
33 of (RAOP) audio using a small executable written in C based on libraop to do
34 the actual timestamped playback, which reads pcm audio from stdin
35 and we can send some interactive commands using a named pipe.
36 """
37
38 async def start(self, start_ntp: int) -> None:
39 """Start CLIRaop process."""
40 assert self.player.raop_discovery_info is not None # for type checker
41 cli_binary = await get_cli_binary(self.player.protocol)
42 extra_args: list[str] = []
43 extra_args += ["-if", self.mass.streams.bind_ip]
44 if self.player.config.get_value(CONF_ENCRYPTION, True):
45 extra_args += ["-encrypt"]
46 if self.player.config.get_value(CONF_ALAC_ENCODE, True):
47 extra_args += ["-alac"]
48 for prop in ("et", "md", "am", "pk", "pw"):
49 if prop_value := self.player.raop_discovery_info.decoded_properties.get(prop):
50 extra_args += [f"-{prop}", prop_value]
51 if device_password := self.player.config.get_value(CONF_PASSWORD):
52 extra_args += ["-password", str(device_password)]
53 # Add RAOP credentials from pairing if available (for Apple devices)
54 if raop_credentials := self.player.config.get_value(CONF_RAOP_CREDENTIALS):
55 # Credentials format is "client_id:auth_secret", cliraop expects just auth_secret
56 creds_str = str(raop_credentials)
57 auth_secret = creds_str.split(":", 1)[1] if ":" in creds_str else creds_str
58 extra_args += ["-secret", auth_secret]
59 if self.prov.logger.isEnabledFor(logging.DEBUG):
60 extra_args += ["-debug", "5"]
61 elif self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
62 extra_args += ["-debug", "10"]
63
64 cliraop_args = [
65 cli_binary,
66 "-ntpstart",
67 str(start_ntp),
68 "-port",
69 str(self.player.raop_discovery_info.port),
70 "-latency",
71 str(AIRPLAY_OUTPUT_BUFFER_DURATION_MS),
72 "-volume",
73 str(self.player.volume_level),
74 *extra_args,
75 "-dacp",
76 cast("AirPlayProvider", self.prov).dacp_id,
77 "-activeremote",
78 self.active_remote_id,
79 "-cmdpipe",
80 self.commands_pipe.path,
81 "-udn",
82 self.player.raop_discovery_info.name,
83 self.player.address,
84 "-", # Use stdin for audio input
85 ]
86 self.player.logger.debug(
87 "Starting cliraop process for player %s with args: %s",
88 self.player.player_id,
89 cliraop_args,
90 )
91 self._cli_proc = AsyncProcess(cliraop_args, stdin=True, stderr=True, name="cliraop")
92 await self._cli_proc.start()
93 # start reading the stderr of the cliap2 process from another task
94 self._cli_proc.attach_stderr_reader(self.mass.create_task(self._stderr_reader()))
95
96 async def _stderr_reader(self) -> None:
97 """Monitor stderr for the running CLIRaop process."""
98 player = self.player
99 logger = player.logger
100 lost_packets = 0
101 if not self._cli_proc:
102 return
103 async for line in self._cli_proc.iter_stderr():
104 if self._stopped:
105 break
106 if "connected to " in line:
107 self._connected.set()
108 # successfully connected - playback will/can start
109 if "set pause" in line or "Pause at" in line:
110 player.set_state_from_stream(state=PlaybackState.PAUSED, stream=self)
111 elif "Restarted at" in line or "restarting w/ pause" in line:
112 player.set_state_from_stream(state=PlaybackState.PLAYING, stream=self)
113 elif "restarting w/o pause" in line:
114 # streaming has started
115 player.set_state_from_stream(
116 state=PlaybackState.PLAYING, elapsed_time=0, stream=self
117 )
118 elif "elapsed milliseconds:" in line:
119 # this is received more or less every second while playing
120 millis = int(line.split("elapsed milliseconds: ")[1])
121 # note that this represents the total elapsed time of the streaming session
122 elapsed_time = millis / 1000
123 player.set_state_from_stream(elapsed_time=elapsed_time)
124 elif "Password required, but none supplied." in line:
125 logger.error(
126 f"Player {self.player.name} requires a password. "
127 f"Please add one in Player Settings"
128 )
129 break
130 if "lost packet out of backlog" in line:
131 lost_packets += 1
132 if lost_packets == 100:
133 logger.error("High packet loss detected, restarting playback...")
134 self.mass.create_task(self.mass.players.cmd_resume(self.player.player_id))
135 else:
136 logger.warning("Packet loss detected!")
137 if "end of stream reached" in line:
138 logger.debug("End of stream reached")
139 break
140 logger.log(VERBOSE_LOG_LEVEL, line)
141 await asyncio.sleep(0) # Yield to event loop
142
143 logger.debug("CLIRaop stderr reader ended")
144 if not self._stopped:
145 self._stopped = True
146 self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0, stream=self)
147