/
/
/
1"""AirPlay Player provider for Music Assistant."""
2
3from __future__ import annotations
4
5import asyncio
6import socket
7from typing import cast
8
9from music_assistant_models.enums import PlaybackState
10from zeroconf import ServiceStateChange
11from zeroconf.asyncio import AsyncServiceInfo
12
13from music_assistant.helpers.datetime import utc
14from music_assistant.helpers.util import (
15 get_ip_pton,
16 get_primary_ip_address_from_zeroconf,
17 select_free_port,
18)
19from music_assistant.models.player_provider import PlayerProvider
20
21from .constants import (
22 AIRPLAY_DISCOVERY_TYPE,
23 CACHE_CATEGORY_PREV_VOLUME,
24 CONF_IGNORE_VOLUME,
25 DACP_DISCOVERY_TYPE,
26 FALLBACK_VOLUME,
27 RAOP_DISCOVERY_TYPE,
28)
29from .helpers import convert_airplay_volume, get_model_info
30from .player import AirPlayPlayer
31
32# TODO: AirPlay provider
33# - Implement authentication for Apple TV
34# - Implement volume control for Apple devices using pyatv
35# - Implement metadata for Apple Apple devices using pyatv
36# - Use pyatv for communicating with original Apple devices (and use cliraop for actual streaming)
37# - Implement AirPlay 2 support
38# - Implement late joining to existing stream (instead of restarting it)
39
40
41class AirPlayProvider(PlayerProvider):
42 """Player provider for AirPlay based players."""
43
44 _dacp_server: asyncio.Server
45 _dacp_info: AsyncServiceInfo
46
47 async def handle_async_init(self) -> None:
48 """Handle async initialization of the provider."""
49 # register DACP zeroconf service
50 dacp_port = await select_free_port(39831, 49831)
51 # Use first 16 hex chars of server_id as a persistent DACP ID
52 # This ensures the DACP ID remains the same across restarts, which is required
53 # for AirPlay 2 (HAP) pair-verify to work with previously paired devices
54 self.dacp_id = dacp_id = self.mass.server_id[:16].upper()
55 self.logger.debug("Starting DACP ActiveRemote %s on port %s", dacp_id, dacp_port)
56 self._dacp_server = await asyncio.start_server(
57 self._handle_dacp_request, "0.0.0.0", dacp_port
58 )
59 server_id = f"iTunes_Ctrl_{dacp_id}.{DACP_DISCOVERY_TYPE}"
60 self._dacp_info = AsyncServiceInfo(
61 DACP_DISCOVERY_TYPE,
62 name=server_id,
63 addresses=[await get_ip_pton(str(self.mass.streams.publish_ip))],
64 port=dacp_port,
65 properties={
66 "txtvers": "1",
67 "Ver": "63B5E5C0C201542E",
68 "DbId": "63B5E5C0C201542E",
69 "OSsi": "0x1F5",
70 },
71 server=f"{socket.gethostname()}.local",
72 )
73 await self.mass.aiozc.async_register_service(self._dacp_info)
74
75 async def on_mdns_service_state_change(
76 self, name: str, state_change: ServiceStateChange, info: AsyncServiceInfo | None
77 ) -> None:
78 """Handle MDNS service state callback."""
79 if not info:
80 if state_change == ServiceStateChange.Removed and "@" in name:
81 # Service name is enough to mark the player as unavailable on 'Removed' notification
82 raw_id, display_name = name.split(".")[0].split("@", 1)
83 else:
84 # If we are not in a 'Removed' state, we need info to be filled to update the player
85 return
86 elif "@" in info.name:
87 raw_id, display_name = info.name.split(".")[0].split("@", 1)
88 elif deviceid := info.decoded_properties.get("deviceid"):
89 raw_id = deviceid.replace(":", "")
90 display_name = info.name.split(".")[0]
91 else:
92 return
93 player_id = f"ap{raw_id.lower()}"
94 # handle removed player
95 if state_change == ServiceStateChange.Removed:
96 if _player := self.mass.players.get(player_id):
97 # the player has become unavailable
98 self.logger.debug("Player offline: %s", _player.display_name)
99 await self.mass.players.unregister(player_id)
100 return
101 # handle update for existing device
102 assert info is not None # type guard
103 player: AirPlayPlayer | None
104 if player := cast("AirPlayPlayer | None", self.mass.players.get(player_id)):
105 # update the latest discovery info for existing player
106 player.set_discovery_info(info, display_name)
107 return
108 await self._setup_player(player_id, display_name, info)
109
110 async def unload(self, is_removed: bool = False) -> None:
111 """Handle unload/close of the provider."""
112 # shutdown DACP server
113 if self._dacp_server:
114 self._dacp_server.close()
115 # shutdown DACP zeroconf service
116 if self._dacp_info:
117 await self.mass.aiozc.async_unregister_service(self._dacp_info)
118
119 async def _setup_player(
120 self, player_id: str, display_name: str, discovery_info: AsyncServiceInfo
121 ) -> None:
122 """Handle setup of a new player that is discovered using mdns."""
123 raop_discovery_info: AsyncServiceInfo | None = None
124 airplay_discovery_info: AsyncServiceInfo | None = None
125 if discovery_info.type == RAOP_DISCOVERY_TYPE:
126 # RAOP service discovered
127 raop_discovery_info = discovery_info
128 self.logger.debug("Discovered RAOP service for %s", display_name)
129 # always prefer airplay mdns info as it has more details
130 # fallback to raop info if airplay info is not available,
131 # (old device only announcing raop)
132 airplay_discovery_info = AsyncServiceInfo(
133 AIRPLAY_DISCOVERY_TYPE,
134 discovery_info.name.split("@")[-1].replace("_raop", "_airplay"),
135 )
136 await airplay_discovery_info.async_request(self.mass.aiozc.zeroconf, 3000)
137 else:
138 # AirPlay service discovered
139 self.logger.debug("Discovered AirPlay service for %s", display_name)
140 airplay_discovery_info = discovery_info
141
142 if airplay_discovery_info:
143 manufacturer, model = get_model_info(airplay_discovery_info)
144 elif raop_discovery_info:
145 manufacturer, model = get_model_info(raop_discovery_info)
146 else:
147 manufacturer, model = "Unknown", "Unknown"
148
149 address = get_primary_ip_address_from_zeroconf(discovery_info)
150 if not address:
151 return # should not happen, but guard just in case
152
153 # Filter out shairport-sync instances running on THIS Music Assistant server
154 # These are managed by the AirPlay Receiver provider, not the AirPlay provider
155 # We check both model name AND that it's a local address to avoid filtering
156 # shairport-sync instances running on other machines
157 if model == "ShairportSync":
158 # Check if this is a local address (127.x.x.x or matches our server's IP)
159 if address.startswith("127.") or address == self.mass.streams.publish_ip:
160 return
161
162 if not self.mass.config.get_raw_player_config_value(player_id, "enabled", True):
163 self.logger.debug("Ignoring %s in discovery as it is disabled.", display_name)
164 return
165 if not discovery_info:
166 return # should not happen, but guard just in case
167
168 # if we reach this point, all preflights are ok and we can create the player
169 self.logger.debug("Discovered AirPlay device %s on %s", display_name, address)
170
171 # Get volume from cache
172 if not (
173 volume := await self.mass.cache.get(
174 key=player_id, provider=self.instance_id, category=CACHE_CATEGORY_PREV_VOLUME
175 )
176 ):
177 volume = FALLBACK_VOLUME
178
179 # Append airplay to the default name for non-apple devices
180 # to make it easier for users to distinguish
181 is_apple = manufacturer.lower() == "apple"
182 if not is_apple and "airplay" not in display_name.lower():
183 display_name += " (AirPlay)"
184
185 # Final check before registration to handle race conditions
186 # (multiple MDNS events processed in parallel for same device)
187 if self.mass.players.get(player_id):
188 self.logger.debug(
189 "Player %s already registered during setup, skipping registration", player_id
190 )
191 return
192
193 self.logger.debug(
194 "Setting up player %s: manufacturer=%s, model=%s",
195 display_name,
196 manufacturer,
197 model,
198 )
199
200 # Create single AirPlayPlayer for all devices
201 # Pairing config entries will be shown conditionally based on device type
202 player = AirPlayPlayer(
203 provider=self,
204 player_id=player_id,
205 raop_discovery_info=raop_discovery_info,
206 airplay_discovery_info=airplay_discovery_info,
207 address=address,
208 display_name=display_name,
209 manufacturer=manufacturer,
210 model=model,
211 initial_volume=volume,
212 )
213 await self.mass.players.register(player)
214
215 async def _handle_dacp_request( # noqa: PLR0915
216 self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
217 ) -> None:
218 """Handle new connection on the socket."""
219 try:
220 raw_request = b""
221 while recv := await reader.read(1024):
222 raw_request += recv
223 if len(recv) < 1024:
224 break
225 if not raw_request:
226 # Some device (Phorus PS10) seems to send empty request
227 # Maybe as a ack message? we have nothing to do here with empty request
228 # so we return early.
229 return
230
231 request = raw_request.decode("UTF-8")
232 if "\r\n\r\n" in request:
233 headers_raw, body = request.split("\r\n\r\n", 1)
234 else:
235 headers_raw = request
236 body = ""
237 headers_split = headers_raw.split("\r\n")
238 headers = {}
239 for line in headers_split[1:]:
240 if ":" not in line:
241 continue
242 x, y = line.split(":", 1)
243 headers[x.strip()] = y.strip()
244 active_remote = headers.get("Active-Remote")
245 _, path, _ = headers_split[0].split(" ")
246 # lookup airplay player by active remote id
247 player: AirPlayPlayer | None = next(
248 (
249 x
250 for x in self.get_players()
251 if x.stream and x.stream.active_remote_id == active_remote
252 ),
253 None,
254 )
255 self.logger.debug(
256 "DACP request for %s (%s): %s -- %s",
257 player.name if player else "UNKNOWN PLAYER",
258 active_remote,
259 path,
260 body,
261 )
262 if not player:
263 return
264
265 player_id = player.player_id
266 ignore_volume_report = (
267 self.mass.config.get_raw_player_config_value(player_id, CONF_IGNORE_VOLUME, False)
268 or player.device_info.manufacturer.lower() == "apple"
269 )
270 active_queue = self.mass.player_queues.get_active_queue(player_id)
271 if not active_queue:
272 self.logger.warning(
273 "DACP request for %s (%s) but no active queue found, ignoring request",
274 player.display_name,
275 player_id,
276 )
277 return
278 if path == "/ctrl-int/1/nextitem":
279 self.mass.create_task(self.mass.player_queues.next(active_queue.queue_id))
280 elif path == "/ctrl-int/1/previtem":
281 self.mass.create_task(self.mass.player_queues.previous(active_queue.queue_id))
282 elif path == "/ctrl-int/1/play":
283 # sometimes this request is sent by a device as confirmation of a play command
284 # we ignore this if the player is already playing
285 if player.playback_state != PlaybackState.PLAYING:
286 self.mass.create_task(self.mass.player_queues.play(active_queue.queue_id))
287 elif path == "/ctrl-int/1/playpause":
288 self.mass.create_task(self.mass.player_queues.play_pause(active_queue.queue_id))
289 elif path == "/ctrl-int/1/stop":
290 self.mass.create_task(self.mass.player_queues.stop(active_queue.queue_id))
291 elif path == "/ctrl-int/1/volumeup":
292 self.mass.create_task(self.mass.players.cmd_volume_up(player_id))
293 elif path == "/ctrl-int/1/volumedown":
294 self.mass.create_task(self.mass.players.cmd_volume_down(player_id))
295 elif path == "/ctrl-int/1/shuffle_songs":
296 queue = self.mass.player_queues.get(player_id)
297 if not queue:
298 return
299 await self.mass.player_queues.set_shuffle(
300 active_queue.queue_id, not queue.shuffle_enabled
301 )
302 elif path in ("/ctrl-int/1/pause", "/ctrl-int/1/discrete-pause"):
303 # sometimes this request is sent by a device as confirmation of a play command
304 # we ignore this if the player is already playing
305 if player.playback_state == PlaybackState.PLAYING:
306 self.mass.create_task(self.mass.player_queues.pause(active_queue.queue_id))
307 elif "dmcp.device-volume=" in path and not ignore_volume_report:
308 # This is a bit annoying as this can be either the device confirming a new volume
309 # we've sent or the device requesting a new volume itself.
310 # In case of a small rounding difference, we ignore this,
311 # to prevent an endless pingpong of volume changes
312 airplay_volume = float(path.split("dmcp.device-volume=", 1)[-1])
313 volume = convert_airplay_volume(airplay_volume)
314 player.update_volume_from_device(volume)
315 elif "dmcp.volume=" in path:
316 # volume change request from device (e.g. volume buttons)
317 volume = int(path.split("dmcp.volume=", 1)[-1])
318 player.update_volume_from_device(volume)
319 elif "device-prevent-playback=1" in path:
320 # device switched to another source (or is powered off)
321 # Ignore during stream transition (stale message from old CLI process)
322 if player._transitioning:
323 self.logger.debug("Ignoring prevent-playback during stream transition")
324 elif stream := player.stream:
325 stream.prevent_playback = True
326 if stream.session:
327 self.mass.create_task(stream.session.remove_client(player))
328 elif "device-prevent-playback=0" in path:
329 # device reports that its ready for playback again
330 if stream := player.stream:
331 stream.prevent_playback = False
332
333 # send response
334 date_str = utc().strftime("%a, %-d %b %Y %H:%M:%S")
335 response = (
336 f"HTTP/1.0 204 No Content\r\nDate: {date_str} "
337 "GMT\r\nDAAP-Server: iTunes/7.6.2 (Windows; N;)\r\nContent-Type: "
338 "application/x-dmap-tagged\r\nContent-Length: 0\r\n"
339 "Connection: close\r\n\r\n"
340 )
341 writer.write(response.encode())
342 await writer.drain()
343 finally:
344 writer.close()
345
346 def get_players(self) -> list[AirPlayPlayer]:
347 """Return all airplay players belonging to this instance."""
348 return cast("list[AirPlayPlayer]", self.players)
349
350 def get_player(self, player_id: str) -> AirPlayPlayer | None:
351 """Return AirplayPlayer by id."""
352 return cast("AirPlayPlayer | None", self.mass.players.get(player_id))
353