/
/
/
1"""Logic for RAOP audio streaming to AirPlay devices."""
2
3from __future__ import annotations
4
5import asyncio
6import logging
7from typing import TYPE_CHECKING, cast
8
9from music_assistant_models.enums import PlaybackState
10
11from music_assistant.constants import VERBOSE_LOG_LEVEL
12from music_assistant.helpers.process import AsyncProcess
13from music_assistant.providers.airplay.constants import (
14 AIRPLAY_OUTPUT_BUFFER_DURATION_MS,
15 CONF_ALAC_ENCODE,
16 CONF_ENCRYPTION,
17 CONF_PASSWORD,
18 CONF_RAOP_CREDENTIALS,
19)
20from music_assistant.providers.airplay.helpers import get_cli_binary
21
22from ._protocol import AirPlayProtocol
23
24if TYPE_CHECKING:
25 from music_assistant.providers.airplay.provider import AirPlayProvider
26
27
28class RaopStream(AirPlayProtocol):
29 """
30 RAOP (AirPlay 1) Audio Streamer.
31
32 Python is not suitable for realtime audio streaming so we do the actual streaming
33 of (RAOP) audio using a small executable written in C based on libraop to do
34 the actual timestamped playback, which reads pcm audio from stdin
35 and we can send some interactive commands using a named pipe.
36 """
37
38 async def start(self, start_ntp: int) -> None:
39 """Start CLIRaop process."""
40 assert self.player.raop_discovery_info is not None # for type checker
41 cli_binary = await get_cli_binary(self.player.protocol)
42 extra_args: list[str] = []
43 player_id = self.player.player_id
44 extra_args += ["-if", self.mass.streams.bind_ip]
45 if self.player.config.get_value(CONF_ENCRYPTION, True):
46 extra_args += ["-encrypt"]
47 if self.player.config.get_value(CONF_ALAC_ENCODE, True):
48 extra_args += ["-alac"]
49 for prop in ("et", "md", "am", "pk", "pw"):
50 if prop_value := self.player.raop_discovery_info.decoded_properties.get(prop):
51 extra_args += [f"-{prop}", prop_value]
52 if device_password := self.mass.config.get_raw_player_config_value(
53 player_id, CONF_PASSWORD
54 ):
55 extra_args += ["-password", str(device_password)]
56 # Add RAOP credentials from pairing if available (for Apple devices)
57 if raop_credentials := self.player.config.get_value(CONF_RAOP_CREDENTIALS):
58 # Credentials format is "client_id:auth_secret", cliraop expects just auth_secret
59 creds_str = str(raop_credentials)
60 auth_secret = creds_str.split(":", 1)[1] if ":" in creds_str else creds_str
61 extra_args += ["-secret", auth_secret]
62 if self.prov.logger.isEnabledFor(logging.DEBUG):
63 extra_args += ["-debug", "5"]
64 elif self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
65 extra_args += ["-debug", "10"]
66
67 cliraop_args = [
68 cli_binary,
69 "-ntpstart",
70 str(start_ntp),
71 "-port",
72 str(self.player.raop_discovery_info.port),
73 "-latency",
74 str(AIRPLAY_OUTPUT_BUFFER_DURATION_MS),
75 "-volume",
76 str(self.player.volume_level),
77 *extra_args,
78 "-dacp",
79 cast("AirPlayProvider", self.prov).dacp_id,
80 "-activeremote",
81 self.active_remote_id,
82 "-cmdpipe",
83 self.commands_pipe.path,
84 "-udn",
85 self.player.raop_discovery_info.name,
86 self.player.address,
87 "-", # Use stdin for audio input
88 ]
89 self.player.logger.debug(
90 "Starting cliraop process for player %s with args: %s",
91 self.player.player_id,
92 cliraop_args,
93 )
94 self._cli_proc = AsyncProcess(cliraop_args, stdin=True, stderr=True, name="cliraop")
95 await self._cli_proc.start()
96 # start reading the stderr of the cliap2 process from another task
97 self._cli_proc.attach_stderr_reader(self.mass.create_task(self._stderr_reader()))
98
99 async def _stderr_reader(self) -> None:
100 """Monitor stderr for the running CLIRaop process."""
101 player = self.player
102 logger = player.logger
103 lost_packets = 0
104 if not self._cli_proc:
105 return
106 async for line in self._cli_proc.iter_stderr():
107 if self._stopped:
108 break
109 if "connected to " in line:
110 self._connected.set()
111 # successfully connected - playback will/can start
112 if "set pause" in line or "Pause at" in line:
113 player.set_state_from_stream(state=PlaybackState.PAUSED, stream=self)
114 elif "Restarted at" in line or "restarting w/ pause" in line:
115 player.set_state_from_stream(state=PlaybackState.PLAYING, stream=self)
116 elif "restarting w/o pause" in line:
117 # streaming has started
118 player.set_state_from_stream(
119 state=PlaybackState.PLAYING, elapsed_time=0, stream=self
120 )
121 elif "elapsed milliseconds:" in line:
122 # this is received more or less every second while playing
123 millis = int(line.split("elapsed milliseconds: ")[1])
124 # note that this represents the total elapsed time of the streaming session
125 elapsed_time = millis / 1000
126 player.set_state_from_stream(elapsed_time=elapsed_time)
127 if "lost packet out of backlog" in line:
128 lost_packets += 1
129 if lost_packets == 100:
130 logger.error("High packet loss detected, restarting playback...")
131 self.mass.create_task(self.mass.players.cmd_resume(self.player.player_id))
132 else:
133 logger.warning("Packet loss detected!")
134 if "end of stream reached" in line:
135 logger.debug("End of stream reached")
136 break
137 logger.log(VERBOSE_LOG_LEVEL, line)
138 await asyncio.sleep(0) # Yield to event loop
139
140 logger.debug("CLIRaop stderr reader ended")
141 if not self._stopped:
142 self._stopped = True
143 self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0, stream=self)
144