#!/usr/bin/env python3
"""Check PyPI package metadata for security and supply chain concerns.

This script checks new or updated Python dependencies for suspicious indicators
that might suggest supply chain attacks or unmaintained packages.
"""

# ruff: noqa: T201, S310, RUF001, PLR0915
import json
import re
import sys
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from typing import Any
15
16
17def parse_requirement(line: str) -> str | None:
18 """Extract package name from a requirement line.
19
20 :param line: A line from requirements.txt (e.g., "package==1.0.0" or "package>=1.0")
21 """
22 line = line.strip()
23 if not line or line.startswith("#"):
24 return None
25
26 # Handle various requirement formats
27 # package==1.0.0, package>=1.0, package[extra]>=1.0, etc.
28 match = re.match(r"^([a-zA-Z0-9_-]+)", line)
29 if match:
30 return match.group(1).lower()
31 return None
32
33
def get_pypi_metadata(package_name: str) -> dict[str, Any] | None:
    """Fetch package metadata from the PyPI JSON API.

    Failures are reported on stdout instead of being raised, so a single
    unreachable or unknown package does not abort a multi-package run.

    :param package_name: The name of the package to check.
    :returns: The decoded JSON document, or ``None`` when the package does
        not exist on PyPI or the request/decoding failed.
    """
    # Percent-encode the name: it comes from the command line or a
    # requirements file, and characters such as "/", "?" or "#" must not be
    # able to alter the request path.
    quoted_name = urllib.parse.quote(package_name, safe="")
    url = f"https://pypi.org/pypi/{quoted_name}/json"

    try:
        with urllib.request.urlopen(url, timeout=10) as response:
            return json.loads(response.read())
    except urllib.error.HTTPError as err:
        if err.code == 404:
            print(f"❌ Package '{package_name}' not found on PyPI")
        else:
            print(f"⚠️ Error fetching metadata for '{package_name}': {err}")
    except Exception as err:
        # Deliberately broad: network, timeout and JSON decoding problems are
        # all reported the same way and turned into a ``None`` result.
        print(f"⚠️ Error fetching metadata for '{package_name}': {err}")
    return None
53
54
# Normalized project-URL labels, listed in lookup priority order.
_SOURCE_URL_LABELS = ("source", "repository", "sourcecode", "github")
_HOMEPAGE_URL_LABELS = ("homepage",)


def _normalize_url_label(label: str) -> str:
    """Lower-case a project URL label and drop everything but letters and digits."""
    return "".join(ch for ch in label.lower() if ch.isalnum())


def _find_project_url(project_urls: dict[str, Any], labels: tuple[str, ...]) -> str | None:
    """Return the first non-empty URL whose normalized label is in *labels*, honouring their order."""
    by_label: dict[str, Any] = {}
    for label, url in project_urls.items():
        if url:
            by_label.setdefault(_normalize_url_label(str(label)), url)
    for wanted in labels:
        if wanted in by_label:
            return by_label[wanted]
    return None


def _parse_upload_time(raw: Any) -> datetime | None:
    """Parse an upload timestamp into a timezone-aware UTC datetime, or ``None`` if unparsable."""
    if not isinstance(raw, str):
        return None
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
    if raw.endswith("Z"):
        raw = raw[:-1] + "+00:00"
    try:
        parsed = datetime.fromisoformat(raw)
    except ValueError:
        return None
    if parsed.tzinfo is None:
        # NOTE(review): offset-less timestamps are assumed to be UTC, which
        # matches the "Z"-suffixed value PyPI reports for the same upload.
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


def check_package(package_name: str) -> dict[str, Any]:
    """Check a single package for security concerns.

    The package's PyPI metadata is fetched and scored with simple heuristics:
    age of the first upload, number of releases, presence of a source
    repository / homepage link, and presence of author information.

    :param package_name: The name of the package to check.
    :returns: A result dictionary. On fetch failure it holds ``name``,
        ``error``, ``risk_level`` (``"unknown"``) and an empty ``warnings``
        list. Otherwise it holds the collected metadata, ``warnings``,
        ``info_items`` and a ``risk_level`` of ``"low"``, ``"medium"`` or
        ``"high"``.
    """
    data = get_pypi_metadata(package_name)

    if not data:
        return {
            "name": package_name,
            "error": "Could not fetch package metadata",
            "risk_level": "unknown",
            "warnings": [],
        }

    # ``or {}`` also covers keys that are present but null.
    info = data.get("info") or {}
    releases = data.get("releases") or {}

    # Package age is measured from the earliest uploaded file of any release.
    upload_times = []
    for release_files in releases.values():
        for file_info in release_files or ():
            parsed = _parse_upload_time(file_info.get("upload_time"))
            if parsed is not None:
                upload_times.append(parsed)

    first_upload = min(upload_times, default=None)
    age_known = first_upload is not None
    # Both operands are timezone-aware UTC, so the difference is exact.
    age_days = (datetime.now(timezone.utc) - first_upload).days if age_known else 0

    # Extract metadata
    project_urls = info.get("project_urls") or {}
    homepage = info.get("home_page") or _find_project_url(project_urls, _HOMEPAGE_URL_LABELS)
    source = _find_project_url(project_urls, _SOURCE_URL_LABELS)

    checks = {
        "name": package_name,
        "version": info.get("version") or "unknown",
        "age_days": age_days,
        "total_releases": len(releases),
        "has_homepage": bool(homepage),
        "has_source": bool(source),
        "author": info.get("author") or info.get("maintainer") or "Unknown",
        "license": info.get("license") or "Unknown",
        "summary": info.get("summary") or "No description",
        "warnings": [],
        "info_items": [],
        "risk_level": "low",
    }

    # Check for suspicious indicators
    risk_score = 0

    if not age_known:
        # Scored like a brand-new package: an unknown age must not look safe.
        checks["warnings"].append("Package age could not be determined (no upload timestamps found)")
        risk_score += 3
    elif age_days < 30:
        checks["warnings"].append(f"Very new package (only {age_days} days old)")
        risk_score += 3
    elif age_days < 90:
        checks["warnings"].append(f"Relatively new package ({age_days} days old)")
        risk_score += 1

    if checks["total_releases"] < 3:
        checks["warnings"].append(f"Very few releases (only {checks['total_releases']})")
        risk_score += 2

    if not source:
        checks["warnings"].append("No source repository linked")
        risk_score += 2

    if not homepage and not source:
        checks["warnings"].append("No homepage or source repository")
        risk_score += 1

    if checks["author"] == "Unknown":
        checks["warnings"].append("No author information available")
        risk_score += 1

    # Add informational items
    checks["info_items"].append(f"Age: {age_days} days" if age_known else "Age: unknown")
    checks["info_items"].append(f"Releases: {checks['total_releases']}")
    checks["info_items"].append(f"Author: {checks['author']}")
    checks["info_items"].append(f"License: {checks['license']}")
    if source:
        checks["info_items"].append(f"Source: {source}")

    # Determine risk level: 5+ points is high, 3-4 is medium, anything less is low.
    if risk_score >= 5:
        checks["risk_level"] = "high"
    elif risk_score >= 3:
        checks["risk_level"] = "medium"
    else:
        checks["risk_level"] = "low"

    return checks
155
156
def format_check_result(result: dict[str, Any]) -> str:
    """Format a check result for display.

    The first line shows a risk marker, the package name and its version.
    An error result adds only the error line; otherwise the summary,
    informational items and warnings follow, one per line.

    :param result: The check result dictionary.
    :returns: The multi-line report text, starting with a blank line.
    """
    risk_emoji = {"high": "🔴", "medium": "🟡", "low": "🟢", "unknown": "⚪"}
    version = result.get("version", "unknown")
    # A missing or unrecognised risk level falls back to the "unknown" marker
    # instead of raising KeyError.
    marker = risk_emoji.get(result.get("risk_level", "unknown"), risk_emoji["unknown"])

    lines = [f"\n{marker} **{result['name']}** (v{version})"]

    if result.get("error"):
        lines.append(f" ❌ {result['error']}")
        return "\n".join(lines)

    if result.get("summary"):
        lines.append(f" 📝 {result['summary']}")

    lines.extend(f" ℹ️ {item}" for item in result.get("info_items") or ())
    lines.extend(f" ⚠️ {warning}" for warning in result.get("warnings") or ())

    return "\n".join(lines)
183
184
def main() -> int:
    """Run the package safety check.

    The command line is either a single ``*.txt`` requirements file or one or
    more package names. Each package is checked and its report printed,
    followed by a summary.

    :returns: Process exit code. ``2`` if any package is high risk. ``1`` if
        any package is medium risk or could not be checked, and also on a
        usage error or unreadable requirements file. ``0`` otherwise,
        including when there is nothing to check.
    """
    if len(sys.argv) < 2:
        print("Usage: check_package_safety.py <requirements_file_or_package_name>")
        print(" Or: check_package_safety.py package1 package2 package3")
        return 1

    packages = []

    # A single argument ending in ".txt" is treated as a requirements file.
    if len(sys.argv) == 2 and sys.argv[1].endswith(".txt"):
        try:
            # "utf-8-sig" reads plain UTF-8 and also strips a leading BOM,
            # which would otherwise hide the first package name.
            with open(sys.argv[1], encoding="utf-8-sig") as f:
                for line in f:
                    package = parse_requirement(line)
                    if package:
                        packages.append(package)
        except FileNotFoundError:
            print(f"Error: File '{sys.argv[1]}' not found")
            return 1
        except (OSError, UnicodeDecodeError) as err:
            print(f"Error: Could not read '{sys.argv[1]}': {err}")
            return 1
    else:
        # Treat arguments as package names
        packages = [arg.lower() for arg in sys.argv[1:]]

    if not packages:
        print("No packages to check")
        return 0

    print(f"Checking {len(packages)} package(s)...\n")
    print("=" * 80)

    results = []
    for package in packages:
        result = check_package(package)
        results.append(result)
        print(format_check_result(result))

    print("\n" + "=" * 80)

    # Summary
    high_risk = sum(1 for r in results if r["risk_level"] == "high")
    medium_risk = sum(1 for r in results if r["risk_level"] == "medium")
    low_risk = sum(1 for r in results if r["risk_level"] == "low")
    # Packages that are missing from PyPI or whose metadata could not be
    # fetched; they must not be reported as having passed.
    unknown_risk = sum(1 for r in results if r["risk_level"] == "unknown")

    print(f"\n📊 Summary: {len(results)} packages checked")
    if high_risk:
        print(f" 🔴 High risk: {high_risk}")
    if medium_risk:
        print(f" 🟡 Medium risk: {medium_risk}")
    if unknown_risk:
        print(f" ⚪ Could not be checked: {unknown_risk}")
    print(f" 🟢 Low risk: {low_risk}")

    if high_risk > 0:
        print("\n⚠️ High-risk packages detected! Manual review strongly recommended.")
        return 2
    if medium_risk > 0:
        print("\n⚠️ Medium-risk packages detected. Please review before merging.")
        return 1
    if unknown_risk > 0:
        print("\n⚠️ Some packages could not be checked. Please verify them manually.")
        return 1

    print("\n✅ All packages passed basic safety checks.")
    return 0
245
246
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
249