/
/
/
1"""Simple async-friendly named pipe writer using threads."""
2
3from __future__ import annotations
4
5import asyncio
6import errno as errno_module
7import logging
8import os
9import time
10from contextlib import suppress
11from pathlib import Path
12
# Module-wide logger; registered under the fixed name "named_pipe" (not __name__).
_LOGGER = logging.getLogger("named_pipe")
14
15
class AsyncNamedPipeWriter:
    """Async writer for named pipes (FIFOs).

    Creating, opening and writing the pipe are blocking filesystem calls, so
    they are pushed to worker threads with ``asyncio.to_thread``. The write
    end is opened lazily, in non-blocking mode, on the first ``write()`` and
    is cached until the reader goes away or ``remove()`` is called.
    """

    # Retry budget used while waiting for the reader: 20 * 0.05s gives the
    # reader up to ~1s to open the pipe, or to drain it when it is full.
    _RETRY_ATTEMPTS = 20
    _RETRY_DELAY = 0.05

    def __init__(self, pipe_path: str) -> None:
        """Initialize named pipe writer.

        Args:
            pipe_path: Filesystem path of the FIFO. Nothing is created or
                opened until ``create()`` / ``write()`` are called.
        """
        self._pipe_path = pipe_path
        # Cached write-end file descriptor; None while the pipe is not open.
        self._write_fd: int | None = None

    @property
    def path(self) -> str:
        """Return the named pipe path."""
        return self._pipe_path

    async def create(self) -> None:
        """Create the named pipe, replacing whatever currently sits at the path.

        Raises:
            OSError: if the existing entry cannot be removed or the FIFO
                cannot be created.
        """

        def _create() -> None:
            # EAFP instead of exists()+unlink(): no check/use race, and a
            # dangling symlink (for which exists() is False) is removed too
            # instead of making mkfifo fail with FileExistsError.
            with suppress(FileNotFoundError):
                os.unlink(self._pipe_path)
            os.mkfifo(self._pipe_path)

        await asyncio.to_thread(_create)

    def _close_write_fd(self) -> None:
        """Close the cached write fd (if any) and forget it; never raises OSError."""
        fd, self._write_fd = self._write_fd, None
        if fd is not None:
            with suppress(OSError):
                os.close(fd)

    def _ensure_write_fd(self) -> bool:
        """Ensure the write end of the pipe is open.

        Opening a FIFO write-only in non-blocking mode fails with ENXIO while
        no reader has it open, so the open is retried for up to ~1s. The
        method sleeps between attempts and is meant to run in a worker thread.

        Returns:
            True if a write fd is available, False if the pipe does not exist
            or no reader showed up within the retry budget.

        Raises:
            OSError: for any open failure other than ENXIO/ENOENT.
        """
        if self._write_fd is not None:
            return True
        if not Path(self._pipe_path).exists():
            return False
        for _ in range(self._RETRY_ATTEMPTS):
            try:
                self._write_fd = os.open(self._pipe_path, os.O_WRONLY | os.O_NONBLOCK)
            except OSError as err:
                if err.errno not in (errno_module.ENXIO, errno_module.ENOENT):
                    raise
                # No reader yet (or the pipe is being recreated): wait and retry.
                time.sleep(self._RETRY_DELAY)
            else:
                return True
        _LOGGER.warning("Could not open pipe %s: no reader after retries", self._pipe_path)
        return False

    async def write(self, data: bytes) -> None:
        """Write all of ``data`` to the named pipe.

        The data is dropped silently when the pipe cannot be opened (missing
        pipe or no reader) or when the reader closes its end (EPIPE); in the
        latter case the fd is reset so the next call reopens the pipe.

        Args:
            data: Bytes to send to the reader.

        Raises:
            BlockingIOError: if the pipe stays full for the whole retry
                budget (~1s without any progress).
            OSError: for any other write failure except EPIPE.
        """

        def _write() -> None:
            if not self._ensure_write_fd():
                return
            fd = self._write_fd
            if fd is None:
                return
            view = memoryview(data)
            sent = 0
            stalls = 0
            try:
                # The fd is non-blocking, so os.write may accept only part of
                # the buffer; keep going until every byte has been handed over.
                while sent < len(view):
                    try:
                        sent += os.write(fd, view[sent:])
                    except BlockingIOError:
                        # Pipe is full: give the reader a moment to drain it,
                        # but give up after a bounded number of idle retries.
                        stalls += 1
                        if stalls >= self._RETRY_ATTEMPTS:
                            raise
                        time.sleep(self._RETRY_DELAY)
                    else:
                        stalls = 0
            except OSError as err:
                if err.errno != errno_module.EPIPE:
                    raise
                # Reader closed, reset fd for next attempt
                self._close_write_fd()

        await asyncio.to_thread(_write)

    async def remove(self) -> None:
        """Close write fd and remove the pipe.

        Best-effort cleanup: filesystem errors are swallowed, and a pipe that
        is already gone is not an error.
        """
        self._close_write_fd()
        # ValueError covers unusable paths (e.g. embedded NUL) so cleanup
        # stays non-raising for them as well.
        with suppress(OSError, ValueError):
            Path(self._pipe_path).unlink(missing_ok=True)

    def __str__(self) -> str:
        """Return string representation (the pipe path)."""
        return self._pipe_path
94