/
/
/
1"""WebRTC DTLS Certificate Management.
2
3This module provides persistent DTLS certificate management for WebRTC connections.
4The certificate is generated once and stored persistently, enabling client-side
5certificate pinning for authentication.
6"""
7
8from __future__ import annotations
9
10import base64
11import logging
12import stat
13from datetime import UTC, datetime, timedelta
14from pathlib import Path
15
16from aiortc import RTCConfiguration, RTCPeerConnection
17from aiortc.rtcdtlstransport import RTCCertificate
18from cryptography import x509
19from cryptography.hazmat.primitives import hashes, serialization
20from cryptography.hazmat.primitives.asymmetric import ec
21from cryptography.x509.oid import NameOID
22
23LOGGER = logging.getLogger(__name__)
24
25CERT_FILENAME = "webrtc_certificate.pem"
26KEY_FILENAME = "webrtc_private_key.pem"
27
28CERT_VALIDITY_DAYS = 3650 # 10 years
29
30CERT_RENEWAL_THRESHOLD_DAYS = 30
31
32
33def _generate_certificate() -> tuple[ec.EllipticCurvePrivateKey, x509.Certificate]:
34 """Generate a new ECDSA certificate for WebRTC DTLS.
35
36 :return: Tuple of (private_key, certificate).
37 """
38 # Generate ECDSA key (SECP256R1 - same as aiortc default)
39 private_key = ec.generate_private_key(ec.SECP256R1())
40
41 now = datetime.now(UTC)
42 not_before = now - timedelta(days=1)
43 not_after = now + timedelta(days=CERT_VALIDITY_DAYS)
44
45 subject = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "Music Assistant WebRTC")])
46
47 cert = (
48 x509.CertificateBuilder()
49 .subject_name(subject)
50 .issuer_name(subject)
51 .public_key(private_key.public_key())
52 .serial_number(x509.random_serial_number())
53 .not_valid_before(not_before)
54 .not_valid_after(not_after)
55 .sign(private_key, hashes.SHA256())
56 )
57
58 return private_key, cert
59
60
61def _save_certificate(
62 storage_path: str,
63 private_key: ec.EllipticCurvePrivateKey,
64 cert: x509.Certificate,
65) -> None:
66 """Save certificate and private key to disk.
67
68 :param storage_path: Directory to store the files.
69 :param private_key: The EC private key.
70 :param cert: The X.509 certificate.
71 """
72 cert_path = Path(storage_path) / CERT_FILENAME
73 key_path = Path(storage_path) / KEY_FILENAME
74
75 cert_pem = cert.public_bytes(serialization.Encoding.PEM)
76 cert_path.write_bytes(cert_pem)
77
78 key_pem = private_key.private_bytes(
79 encoding=serialization.Encoding.PEM,
80 format=serialization.PrivateFormat.PKCS8,
81 encryption_algorithm=serialization.NoEncryption(),
82 )
83 key_path.write_bytes(key_pem)
84
85 # Set restrictive permissions on private key (owner read/write only)
86 key_path.chmod(stat.S_IRUSR | stat.S_IWUSR)
87
88
89def _load_certificate(
90 storage_path: str,
91) -> tuple[ec.EllipticCurvePrivateKey, x509.Certificate] | None:
92 """Load certificate and private key from disk.
93
94 :param storage_path: Directory containing the files.
95 :return: Tuple of (private_key, certificate) or None if files don't exist.
96 """
97 cert_path = Path(storage_path) / CERT_FILENAME
98 key_path = Path(storage_path) / KEY_FILENAME
99
100 if not cert_path.exists() or not key_path.exists():
101 return None
102
103 try:
104 cert_pem = cert_path.read_bytes()
105 cert = x509.load_pem_x509_certificate(cert_pem)
106
107 key_pem = key_path.read_bytes()
108 private_key = serialization.load_pem_private_key(key_pem, password=None)
109
110 if not isinstance(private_key, ec.EllipticCurvePrivateKey):
111 LOGGER.warning("WebRTC private key is not an EC key, will regenerate")
112 return None
113
114 return private_key, cert
115 except Exception as err:
116 LOGGER.warning("Failed to load WebRTC certificate: %s", err)
117 return None
118
119
120def _is_certificate_valid(cert: x509.Certificate) -> bool:
121 """Check if certificate is still valid with enough time remaining.
122
123 :param cert: The X.509 certificate to check.
124 :return: True if certificate is valid and has sufficient time remaining.
125 """
126 now = datetime.now(UTC)
127 not_after = cert.not_valid_after_utc
128
129 if now >= not_after:
130 return False
131
132 days_remaining = (not_after - now).days
133 return not days_remaining < CERT_RENEWAL_THRESHOLD_DAYS
134
135
136def get_or_create_webrtc_certificate(storage_path: str) -> RTCCertificate:
137 """Get or create a persistent WebRTC DTLS certificate.
138
139 Loads an existing certificate from disk if available and valid.
140 Otherwise, generates a new certificate and saves it.
141
142 :param storage_path: Directory to store/load the certificate files.
143 :return: RTCCertificate instance for use with WebRTC.
144 """
145 loaded = _load_certificate(storage_path)
146
147 if loaded is not None:
148 private_key, cert = loaded
149
150 if _is_certificate_valid(cert):
151 return RTCCertificate(key=private_key, cert=cert)
152
153 LOGGER.debug("Generating new WebRTC DTLS certificate (valid for %d days)", CERT_VALIDITY_DAYS)
154 private_key, cert = _generate_certificate()
155 _save_certificate(storage_path, private_key, cert)
156
157 return RTCCertificate(key=private_key, cert=cert)
158
159
160def _get_certificate_fingerprint(certificate: RTCCertificate) -> str:
161 """Get the SHA-256 fingerprint of a certificate.
162
163 :param certificate: The RTCCertificate to get the fingerprint for.
164 :return: SHA-256 fingerprint as colon-separated hex string (e.g., "A1:B2:C3:...").
165 """
166 fingerprints = certificate.getFingerprints()
167 for fp in fingerprints:
168 if fp.algorithm == "sha-256":
169 return fp.value
170 raise ValueError("SHA-256 fingerprint not found in certificate")
171
172
173def get_remote_id_from_certificate(certificate: RTCCertificate) -> str:
174 """Generate a remote ID from the certificate fingerprint.
175
176 Uses base32-encoded 128-bit truncation of the SHA-256 fingerprint.
177 This creates a deterministic remote ID tied to the certificate.
178
179 :param certificate: The RTCCertificate to derive the remote ID from.
180 :return: Custom base32-encoded (with 9s instead of 2s) remote ID string
181 (26 characters, uppercase, no-padding).
182 """
183 fingerprint = _get_certificate_fingerprint(certificate)
184
185 # Parse the colon-separated hex fingerprint to bytes
186 # Format: "A1:B2:C3:D4:..." -> bytes
187 fingerprint_bytes = bytes.fromhex(fingerprint.replace(":", ""))
188
189 # Take first 128 bits (16 bytes) of SHA-256
190 truncated = fingerprint_bytes[:16]
191
192 # Base32 encode (with 9s instead of 2s) and return (uppercase) without padding
193 return base64.b32encode(truncated).decode("ascii").rstrip("=").replace("2", "9")
194
195
196def create_peer_connection_with_certificate(
197 certificate: RTCCertificate,
198 configuration: RTCConfiguration | None = None,
199) -> RTCPeerConnection:
200 """Create an RTCPeerConnection with a custom persistent certificate.
201
202 :param certificate: The RTCCertificate to use for DTLS.
203 :param configuration: Optional RTCConfiguration with ICE servers.
204 :return: RTCPeerConnection configured with the provided certificate.
205 """
206 pc = RTCPeerConnection(configuration=configuration)
207 # Replace the auto-generated certificate with our persistent one
208 # Uses name-mangled private attribute access
209 pc._RTCPeerConnection__certificates = [certificate] # type: ignore[attr-defined]
210 return pc
211