/
/
/
1#!/usr/bin/env python3
2"""Check PyPI package metadata for security and supply chain concerns.
3
4This script checks new or updated Python dependencies for suspicious indicators
5that might suggest supply chain attacks or unmaintained packages.
6"""
7
8# ruff: noqa: T201, S310, RUF001, PLR0915
import json
import re
import sys
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from typing import Any
15
# OSI-approved and other commonly compatible licenses. Each entry is a name
# fragment that is looked for inside the license string reported by PyPI.
COMPATIBLE_LICENSES = {
    # Permissive
    "MIT",
    "ISC",
    "BSD",
    "BSD-2-Clause",
    "BSD-3-Clause",
    "Apache-2.0",
    "Apache Software License",
    # Python Software Foundation
    "PSF",
    "Python Software Foundation License",
    # Weak copyleft
    "LGPL",
    "MPL-2.0",
    # Public-domain style dedications
    "Unlicense",
    "CC0",
}
32
33# Common packages to check for typosquatting (popular Python packages)
34POPULAR_PACKAGES = {
35 "requests",
36 "urllib3",
37 "setuptools",
38 "certifi",
39 "pip",
40 "numpy",
41 "pandas",
42 "boto3",
43 "botocore",
44 "awscli",
45 "django",
46 "flask",
47 "sqlalchemy",
48 "pytest",
49 "pydantic",
50 "aiohttp",
51 "fastapi",
52}
53
54
55def check_typosquatting(package_name: str) -> str | None:
56 """Check if package name might be typosquatting a popular package.
57
58 :param package_name: The package name to check.
59 """
60 package_lower = package_name.lower().replace("-", "").replace("_", "")
61
62 for popular in POPULAR_PACKAGES:
63 popular_normalized = popular.lower().replace("-", "").replace("_", "")
64
65 # Check for common typosquatting techniques
66 if package_lower == popular_normalized:
67 continue # Exact match is fine
68
69 # Check edit distance (1-2 character changes)
70 if len(package_lower) == len(popular_normalized):
71 differences = sum(
72 c1 != c2 for c1, c2 in zip(package_lower, popular_normalized, strict=True)
73 )
74 if differences == 1:
75 return f"Suspicious: Very similar to popular package '{popular}'"
76
77 # Check for common substitutions
78 substitutions = [
79 ("0", "o"),
80 ("1", "l"),
81 ("1", "i"),
82 ]
83 for old, new in substitutions:
84 if old in package_lower:
85 test_name = package_lower.replace(old, new)
86 if test_name == popular_normalized:
87 return f"Suspicious: Character substitution of popular package '{popular}'"
88
89 return None
90
91
def check_license_compatibility(license_str: str) -> tuple[bool, str]:
    """Check if license is compatible with the project.

    The license string is compared case-insensitively against
    ``COMPATIBLE_LICENSES``. A name only counts when it appears as a whole
    token (optionally followed by a version such as ``v3`` or ``2.1``), so
    "MIT" does not match inside "LIMITED" and "ISC" does not match inside
    "MISC". Strings mentioning GPL/AGPL/SSPL that did not match a compatible
    name are reported as incompatible copyleft.

    :param license_str: The license string from PyPI.
    :returns: ``(is_compatible, human-readable status)``.
    """
    if not license_str or not license_str.strip() or license_str == "Unknown":
        return False, "No license information"

    license_upper = license_str.upper()

    def mentions(name: str) -> bool:
        # Letters/digits directly before or after the name mean it is part of
        # a longer word; an attached version suffix ("LGPLV3") is allowed.
        pattern = rf"(?<![A-Z0-9]){re.escape(name.upper())}(?:V?\d[\d.]*)?(?![A-Z0-9])"
        return re.search(pattern, license_upper) is not None

    # Check against compatible licenses
    if any(mentions(compatible) for compatible in COMPATIBLE_LICENSES):
        return True, f"Compatible ({license_str})"

    # Check for problematic licenses. An LGPL mention that was not accepted
    # above falls through to "unknown" rather than being called incompatible.
    problematic = ["GPL", "AGPL", "SSPL"]
    if "LGPL" not in license_upper and any(
        problem in license_upper for problem in problematic
    ):
        return False, f"Incompatible copyleft license ({license_str})"

    # Unknown license
    return False, f"Unknown/unverified license ({license_str})"
115
116
117def parse_requirement(line: str) -> str | None:
118 """Extract package name from a requirement line.
119
120 :param line: A line from requirements.txt (e.g., "package==1.0.0" or "package>=1.0")
121 """
122 line = line.strip()
123 if not line or line.startswith("#"):
124 return None
125
126 # Handle various requirement formats
127 # package==1.0.0, package>=1.0, package[extra]>=1.0, etc.
128 match = re.match(r"^([a-zA-Z0-9_-]+)", line)
129 if match:
130 return match.group(1).lower()
131 return None
132
133
def get_pypi_metadata(package_name: str) -> dict[str, Any] | None:
    """Fetch package metadata from PyPI JSON API.

    Failures are reported on stdout and never raised, so one bad package does
    not abort the whole run.

    :param package_name: The name of the package to check.
    :returns: The decoded JSON document, or ``None`` if the package does not
        exist or the request/decoding failed.
    """
    # Percent-encode the name so characters such as "/", "?" or spaces in a
    # user-supplied value cannot alter the request path or query.
    quoted_name = urllib.parse.quote(package_name, safe="")
    url = f"https://pypi.org/pypi/{quoted_name}/json"

    try:
        with urllib.request.urlopen(url, timeout=10) as response:
            return json.loads(response.read())
    except urllib.error.HTTPError as err:
        if err.code == 404:
            print(f"❌ Package '{package_name}' not found on PyPI")
        else:
            print(f"⚠️ Error fetching metadata for '{package_name}': {err}")
        return None
    except Exception as err:
        # Deliberately broad: network, timeout and JSON errors are all
        # treated as "metadata unavailable" for this best-effort lookup.
        print(f"⚠️ Error fetching metadata for '{package_name}': {err}")
        return None
153
154
def _earliest_upload(releases: dict[str, Any]) -> datetime | None:
    """Return the earliest parseable file upload time in *releases* as aware UTC."""
    upload_times = []
    for release_files in releases.values():
        for file_info in release_files or []:
            raw = file_info.get("upload_time")
            if not isinstance(raw, str):
                continue
            # Handle both formats: with 'Z' suffix or with an explicit offset.
            if raw.endswith("Z"):
                raw = raw[:-1] + "+00:00"
            try:
                uploaded = datetime.fromisoformat(raw)
            except ValueError:
                continue
            if uploaded.tzinfo is None:
                # Offset-less timestamps are taken to be UTC (PyPI's convention).
                # Normalising also keeps min() from comparing naive with aware
                # values, which raises TypeError.
                uploaded = uploaded.replace(tzinfo=timezone.utc)
            upload_times.append(uploaded)
    return min(upload_times, default=None)


def _project_url(project_urls: dict[str, Any], labels: tuple[str, ...]) -> str | None:
    """Return the URL for the first of *labels* present in *project_urls*.

    Labels are compared lower-cased with punctuation and whitespace removed,
    so "Source Code", "source-code" and "SourceCode" are the same label.
    """
    by_label = {
        re.sub(r"[^a-z0-9]", "", str(label).lower()): url
        for label, url in project_urls.items()
        if url
    }
    return next((by_label[label] for label in labels if label in by_label), None)


def _declared_license(info: dict[str, Any]) -> str:
    """Return the license declared in *info*, or ``"Unknown"``.

    Looks at ``license_expression``, then ``license``, then the last segment
    of the first ``License ::`` trove classifier.
    """
    declared = info.get("license_expression") or info.get("license")
    if declared:
        return declared
    for classifier in info.get("classifiers") or []:
        if classifier.startswith("License ::"):
            return classifier.rsplit("::", 1)[-1].strip()
    return "Unknown"


def check_package(package_name: str) -> dict[str, Any]:
    """Check a single package for security concerns.

    Fetches the PyPI metadata and scores the package on typosquatting,
    license, age, number of releases, source/homepage links and author
    information. A score of 5 or more is "high" risk, 3 or more "medium",
    otherwise "low".

    :param package_name: The name of the package to check.
    :returns: A result dictionary. When the metadata cannot be fetched it only
        holds ``name``, ``error``, ``risk_level`` ("unknown") and ``warnings``.
        Otherwise it holds the extracted metadata, ``warnings``,
        ``info_items``, ``risk_level``, ``automated_checks`` and
        ``check_details``. ``age_days`` is 0 when the age is unknown.
    """
    data = get_pypi_metadata(package_name)

    if not data:
        return {
            "name": package_name,
            "error": "Could not fetch package metadata",
            "risk_level": "unknown",
            "warnings": [],
        }

    # "or {}" also covers keys that are present but null.
    info = data.get("info") or {}
    releases = data.get("releases") or {}

    # Get package age
    first_upload = _earliest_upload(releases)
    age_days = (datetime.now(timezone.utc) - first_upload).days if first_upload else 0

    # Extract metadata
    project_urls = info.get("project_urls") or {}
    homepage = info.get("home_page") or _project_url(project_urls, ("homepage",))
    source = _project_url(project_urls, ("source", "repository", "sourcecode", "github"))
    author = (
        info.get("author")
        or info.get("maintainer")
        # Some packages only fill the e-mail fields ("Name <addr>").
        or info.get("author_email")
        or info.get("maintainer_email")
        or "Unknown"
    )
    license_str = _declared_license(info)

    # Run automated security checks
    typosquat_check = check_typosquatting(package_name)
    license_compatible, license_status = check_license_compatibility(license_str)

    checks = {
        "name": package_name,
        "version": info.get("version") or "unknown",
        "age_days": age_days,
        "total_releases": len(releases),
        "has_homepage": bool(homepage),
        "has_source": bool(source),
        "author": author,
        "license": license_str,
        "summary": info.get("summary") or "No description",
        "warnings": [],
        "info_items": [],
        "risk_level": "low",
        "automated_checks": {
            "trusted_source": bool(source),
            "typosquatting": typosquat_check is None,
            "license_compatible": license_compatible,
        },
        "check_details": {
            "typosquatting": typosquat_check or "✓ No typosquatting detected",
            "license": license_status,
        },
    }

    # Check for suspicious indicators
    risk_score = 0

    # Typosquatting check
    if typosquat_check:
        checks["warnings"].append(typosquat_check)
        risk_score += 5  # High risk

    # License check
    if not license_compatible:
        checks["warnings"].append(f"License issue: {license_status}")
        risk_score += 2

    if first_upload is None:
        # Scored like a brand-new package, but reported honestly instead of
        # claiming the package is "0 days old".
        checks["warnings"].append("Package age could not be determined (no upload timestamps)")
        risk_score += 3
    elif age_days < 30:
        checks["warnings"].append(f"Very new package (only {age_days} days old)")
        risk_score += 3
    elif age_days < 90:
        checks["warnings"].append(f"Relatively new package ({age_days} days old)")
        risk_score += 1

    if checks["total_releases"] < 3:
        checks["warnings"].append(f"Very few releases (only {checks['total_releases']})")
        risk_score += 2

    if not source:
        checks["warnings"].append("No source repository linked")
        risk_score += 2

    if not homepage and not source:
        checks["warnings"].append("No homepage or source repository")
        risk_score += 1

    if checks["author"] == "Unknown":
        checks["warnings"].append("No author information available")
        risk_score += 1

    # Add informational items
    age_text = f"{age_days} days" if first_upload else "unknown"
    checks["info_items"].append(f"Age: {age_text}")
    checks["info_items"].append(f"Releases: {checks['total_releases']}")
    checks["info_items"].append(f"Author: {checks['author']}")
    checks["info_items"].append(f"License: {checks['license']}")
    if source:
        checks["info_items"].append(f"Source: {source}")

    # Determine risk level
    if risk_score >= 5:
        checks["risk_level"] = "high"
    elif risk_score >= 3:
        checks["risk_level"] = "medium"
    else:
        checks["risk_level"] = "low"

    return checks
278
279
def format_check_result(result: dict[str, Any]) -> str:
    """Format a check result for display.

    The first line is a risk marker with the package name and version. An
    error result adds only the error message; otherwise the summary, the
    informational items and the warnings follow, one per line.

    :param result: The check result dictionary.
    :returns: The formatted multi-line text, starting with a blank line.
    """
    risk_emoji = {"high": "🔴", "medium": "🟡", "low": "🟢", "unknown": "⚪"}
    # A missing or unexpected risk level is shown as "unknown" instead of
    # raising KeyError.
    marker = risk_emoji.get(result.get("risk_level", "unknown"), risk_emoji["unknown"])
    version = result.get("version", "unknown")

    lines = [f"\n{marker} **{result['name']}** (v{version})"]

    if result.get("error"):
        lines.append(f" ❌ {result['error']}")
        return "\n".join(lines)

    if result.get("summary"):
        lines.append(f" 📝 {result['summary']}")

    lines.extend(f" ℹ️ {item}" for item in result.get("info_items") or [])
    lines.extend(f" ⚠️ {warning}" for warning in result.get("warnings") or [])

    return "\n".join(lines)
306
307
def main() -> int:
    """Run the package safety check.

    With a single argument ending in ``.txt`` the argument is read as a
    requirements file; otherwise every argument is taken as a package name.

    :returns: Process exit code: 2 if any package is high risk, 1 if any is
        medium risk, could not be checked, or the arguments/file are unusable,
        and 0 when every package passed.
    """
    if len(sys.argv) < 2:
        print("Usage: check_package_safety.py <requirements_file_or_package_name>")
        print(" Or: check_package_safety.py package1 package2 package3")
        return 1

    # Check if first argument is a file
    if len(sys.argv) == 2 and sys.argv[1].endswith(".txt"):
        try:
            with open(sys.argv[1], encoding="utf-8") as f:
                packages = [name for line in f if (name := parse_requirement(line))]
        except FileNotFoundError:
            print(f"Error: File '{sys.argv[1]}' not found")
            return 1
        except (OSError, UnicodeDecodeError) as err:
            # Unreadable, a directory, or not UTF-8 text.
            print(f"Error: Could not read '{sys.argv[1]}': {err}")
            return 1
    else:
        # Treat arguments as package names
        packages = [arg.lower() for arg in sys.argv[1:]]

    if not packages:
        print("No packages to check")
        return 0

    print(f"Checking {len(packages)} package(s)...\n")
    print("=" * 80)

    results = []
    for package in packages:
        result = check_package(package)
        results.append(result)
        print(format_check_result(result))

    print("\n" + "=" * 80)

    # Automated checks summary. Packages whose metadata could not be fetched
    # carry no "automated_checks"; they are reported as unchecked below rather
    # than being presented as failed typosquatting/license checks.
    automated = [r["automated_checks"] for r in results if "automated_checks" in r]

    print("\n🤖 Automated Security Checks:")
    if automated:
        all_trusted = all(checks.get("trusted_source", False) for checks in automated)
        all_no_typosquat = all(checks.get("typosquatting", False) for checks in automated)
        all_license_ok = all(checks.get("license_compatible", False) for checks in automated)

        trusted_msg = (
            "All packages have source repositories"
            if all_trusted
            else "Some packages missing source info"
        )
        print(f" {'✅' if all_trusted else '❌'} Trusted Sources: {trusted_msg}")

        typosquat_msg = (
            "No suspicious package names detected"
            if all_no_typosquat
            else "Possible typosquatting detected!"
        )
        print(f" {'✅' if all_no_typosquat else '❌'} Typosquatting: {typosquat_msg}")

        license_msg = (
            "All licenses are compatible" if all_license_ok else "Some license issues detected"
        )
        print(f" {'✅' if all_license_ok else '⚠️ '} License Compatibility: {license_msg}")
    else:
        print(" ⚠️ Skipped: no package metadata could be retrieved")

    # Summary
    high_risk = sum(1 for r in results if r["risk_level"] == "high")
    medium_risk = sum(1 for r in results if r["risk_level"] == "medium")
    low_risk = sum(1 for r in results if r["risk_level"] == "low")
    unchecked = sum(1 for r in results if r["risk_level"] == "unknown")

    print(f"\n📊 Summary: {len(results)} packages checked")
    if high_risk:
        print(f" 🔴 High risk: {high_risk}")
    if medium_risk:
        print(f" 🟡 Medium risk: {medium_risk}")
    print(f" 🟢 Low risk: {low_risk}")
    if unchecked:
        print(f" ⚪ Could not be checked: {unchecked}")

    if high_risk > 0:
        print("\n⚠️ High-risk packages detected! Manual review strongly recommended.")
        return 2
    if medium_risk > 0:
        print("\n⚠️ Medium-risk packages detected. Please review before merging.")
        return 1
    if unchecked > 0:
        # A package that cannot be looked up has not passed anything; do not
        # report success for it.
        print("\n⚠️ Some packages could not be checked. Please verify them manually.")
        return 1

    print("\n✅ All packages passed basic safety checks.")
    return 0
397
398
# Script entry point: run the check and use its return value as the exit code.
if __name__ == "__main__":
    raise SystemExit(main())
401