SQLAlchemy 2.x requires raw SQL strings to be explicitly wrapped in text(). Fixed 16 instances across 5 route files. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
598 lines
20 KiB
Python
598 lines
20 KiB
Python
"""
FastAPI routes for System Screening (SBOM Generation + Vulnerability Scan).

Endpoints:
- POST /v1/screening/scan: Upload dependency file, generate SBOM, scan for vulnerabilities
- GET /v1/screening/{screening_id}: Get screening result by ID
- GET /v1/screening: List screenings for a tenant
"""
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
from fastapi import APIRouter, File, Form, UploadFile, HTTPException
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import text
|
|
|
|
from database import SessionLocal
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router collecting every endpoint in this file under the /v1/screening prefix.
router = APIRouter(prefix="/v1/screening", tags=["system-screening"])

# OSV.dev endpoint that query_osv() POSTs single-package queries to.
OSV_API_URL = "https://api.osv.dev/v1/query"
|
|
|
|
|
|
# =============================================================================
|
|
# RESPONSE MODELS
|
|
# =============================================================================
|
|
|
|
class SecurityIssueResponse(BaseModel):
    """One vulnerability finding attached to a scanned component."""

    # Identifier of this finding (a fresh UUID at scan time, the stored id on reads).
    id: str
    # Severity label; the scanner emits CRITICAL, HIGH, MEDIUM or LOW.
    severity: str
    # Short summary of the vulnerability (falls back to the OSV id at scan time).
    title: str
    # Longer free-text details, truncated by the scanner.
    description: Optional[str] = None
    # First "CVE-…" alias of the advisory, if it has one.
    cve: Optional[str] = None
    # Score derived from the severity label (not a vector-computed CVSS).
    cvss: Optional[float] = None
    # Name of the dependency the finding applies to.
    affected_component: str
    # Version of that dependency as found in the uploaded file.
    affected_version: Optional[str] = None
    # First "fixed" version reported by OSV for this package, if any.
    fixed_in: Optional[str] = None
    # Human-readable remediation hint.
    remediation: Optional[str] = None
    # Workflow state of the finding; new findings start as "OPEN".
    status: str = "OPEN"
|
|
|
|
|
|
class SBOMComponentResponse(BaseModel):
    """One SBOM component together with the vulnerabilities found for it."""

    # Package name as parsed from the dependency file.
    name: str
    # Package version as parsed from the dependency file.
    version: str
    # Component type; generate_sbom() always emits "library".
    type: str
    # Package URL built by generate_sbom().
    purl: str
    # License identifiers; empty when the license is unknown.
    licenses: list[str]
    # Per-component findings; the routes fill each dict with the keys
    # id, cve, severity, title, cvss and fixedIn.
    vulnerabilities: list[dict]
|
|
|
|
|
|
class ScreeningResponse(BaseModel):
    """Full result of one screening run: summary counters, SBOM components and findings."""

    # Screening identifier (UUID string).
    id: str
    # Run status; the scan endpoint reports "completed".
    status: str
    # SBOM format name, "CycloneDX" for scans produced by this module.
    sbom_format: str
    # SBOM spec version, "1.5" for scans produced by this module.
    sbom_version: str
    # Number of components parsed from the uploaded file.
    total_components: int
    # Number of findings across all severities.
    total_issues: int
    # Per-severity finding counters.
    critical_issues: int
    high_issues: int
    medium_issues: int
    low_issues: int
    # SBOM components, each carrying its own vulnerability list.
    components: list[SBOMComponentResponse]
    # Flat list of all findings.
    issues: list[SecurityIssueResponse]
    # Timestamps of the vulnerability scan, rendered as strings.
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
|
|
|
|
|
|
class ScreeningListResponse(BaseModel):
    """Listing of screening summaries for one tenant."""

    # One summary dict per screening (id, status, counters, timestamps).
    screenings: list[dict]
    # Number of entries in `screenings`.
    total: int
|
|
|
|
|
|
# =============================================================================
|
|
# DEPENDENCY PARSING
|
|
# =============================================================================
|
|
|
|
def parse_package_lock(content: str) -> list[dict]:
    """Parse package-lock.json and extract dependencies.

    Supports the v2/v3 layout (top-level ``packages`` mapping keyed by install
    path) and falls back to the v1 layout (top-level ``dependencies`` mapping)
    when the former yields nothing.

    Args:
        content: Raw text of the lockfile.

    Returns:
        A list of component dicts with the keys ``name``, ``version``,
        ``type``, ``ecosystem`` and ``license``. Returns an empty list when
        the text is not valid JSON or is not a JSON object.
    """
    try:
        data = json.loads(content)
    except json.JSONDecodeError:
        return []

    # Valid JSON that is not an object (a list, a string, ...) cannot be a
    # lockfile; bail out instead of failing on ``.get``.
    if not isinstance(data, dict):
        return []

    def normalize_license(value) -> str:
        # Legacy manifests may carry {"type": "MIT", "url": ...}; downstream
        # code expects a plain string.
        if isinstance(value, str):
            return value
        if isinstance(value, dict) and isinstance(value.get("type"), str):
            return value["type"]
        return "unknown"

    components = []

    # package-lock.json v2/v3 format (packages field)
    packages = data.get("packages")
    if isinstance(packages, dict):
        for path, info in packages.items():
            # The empty key is the root project itself; non-dict entries are malformed.
            if not path or not isinstance(info, dict):
                continue
            # "node_modules/a/node_modules/@scope/b" -> "@scope/b"
            name = path.split("node_modules/")[-1]
            version = info.get("version", "unknown")
            if name and isinstance(version, str) and version != "unknown":
                components.append({
                    "name": name,
                    "version": version,
                    "type": "library",
                    "ecosystem": "npm",
                    "license": normalize_license(info.get("license", "unknown")),
                })

    # Fallback: v1 format (dependencies field)
    if not components:
        dependencies = data.get("dependencies")
        if isinstance(dependencies, dict):
            for name, info in dependencies.items():
                if isinstance(info, dict):
                    components.append({
                        "name": name,
                        "version": info.get("version", "unknown"),
                        "type": "library",
                        "ecosystem": "npm",
                        "license": "unknown",
                    })

    return components
|
|
|
|
|
|
def parse_requirements_txt(content: str) -> list[dict]:
    """Parse requirements.txt and extract dependencies.

    Recognizes ``name<op>version`` lines (``==``, ``>=``, ``~=``, ...) and bare
    package names, each optionally carrying an extras suffix such as
    ``requests[security]``. Blank lines, comment lines and option lines
    (starting with ``-``) are skipped. Lines in any other form (URLs, direct
    references) are ignored.

    Args:
        content: Raw text of the requirements file.

    Returns:
        A list of component dicts with the keys ``name``, ``version``,
        ``type``, ``ecosystem`` and ``license``. Bare names get the version
        ``"latest"``; for versioned lines the version written after the
        operator is used regardless of which operator it is.
    """
    # Optional "[extra1,extra2]" between the name and the operator.
    versioned = re.compile(
        r'^([a-zA-Z0-9_.-]+)\s*(?:\[[^\]]*\])?\s*([>=<~!]+)\s*([a-zA-Z0-9_.*-]+)'
    )
    bare = re.compile(r'^([a-zA-Z0-9_.-]+)\s*(?:\[[^\]]*\])?$')

    components = []
    for line in content.strip().split("\n"):
        line = line.strip()
        if not line or line.startswith("#") or line.startswith("-"):
            continue

        # Drop a trailing inline comment ("pkg  # why"); a '#' only starts a
        # comment when preceded by whitespace, so URL fragments stay intact.
        line = re.split(r'\s+#', line, maxsplit=1)[0].strip()

        match = versioned.match(line)
        if match:
            name, version = match.group(1), match.group(3)
        else:
            match = bare.match(line)
            if not match:
                continue
            name, version = match.group(1), "latest"

        components.append({
            "name": name,
            "version": version,
            "type": "library",
            "ecosystem": "PyPI",
            "license": "unknown",
        })

    return components
|
|
|
|
|
|
def parse_yarn_lock(content: str) -> list[dict]:
    """Parse yarn.lock and extract dependencies (basic).

    An entry starts with an unindented header line ending in ``:`` that lists
    one or more ``name@range`` specifiers, optionally quoted and
    comma-separated; the resolved version follows on an indented
    ``version "x.y.z"`` line. Scoped packages (``"@scope/pkg@^1.0.0":``) and
    multi-specifier headers (``"a@^1", "a@^2":``) are handled by looking only
    at the first specifier and skipping its leading ``@`` when locating the
    name/range separator.

    Args:
        content: Raw text of the lockfile.

    Returns:
        A list of component dicts with the keys ``name``, ``version``,
        ``type``, ``ecosystem`` and ``license``, one per entry that has both
        a header and a version line.
    """
    components = []
    current_name = None

    for line in content.split("\n"):
        stripped = line.strip()

        # Entry header: unindented, not a comment, ends with ':'.
        is_header = (
            bool(stripped)
            and not line[0].isspace()
            and not stripped.startswith("#")
            and stripped.endswith(":")
        )
        if is_header:
            first_spec = stripped[:-1].split(",")[0].strip().strip('"')
            # Start searching at index 1 so the '@' of a scope is not mistaken
            # for the name/range separator.
            separator = first_spec.find("@", 1)
            if separator > 0:
                current_name = first_spec[:separator].strip()
            continue

        if current_name and stripped.startswith("version "):
            version_match = re.match(r'\s+version\s+"?([^"]+)"?', line)
            if version_match:
                components.append({
                    "name": current_name,
                    "version": version_match.group(1).strip(),
                    "type": "library",
                    "ecosystem": "npm",
                    "license": "unknown",
                })
                # One version per entry; wait for the next header.
                current_name = None

    return components
|
|
|
|
|
|
def detect_and_parse(filename: str, content: str) -> tuple[list[dict], str]:
    """Pick a parser from the file name and run it on the content.

    Returns the parsed components together with the ecosystem label
    ("npm", "PyPI" or "unknown"). Files whose name is not recognized, and
    ``.json`` files that do not parse as a package-lock, are retried as
    requirements.txt before giving up.
    """
    lowered = filename.lower()
    # Last path segment; equals the whole name when there is no '/'.
    base_name = lowered.rsplit("/", 1)[-1]

    if "package-lock" in lowered:
        return parse_package_lock(content), "npm"

    if base_name == "requirements.txt":
        return parse_requirements_txt(content), "PyPI"

    if "yarn.lock" in lowered:
        return parse_yarn_lock(content), "npm"

    if lowered.endswith(".json"):
        # Any other JSON file: see whether it has package-lock shape.
        npm_components = parse_package_lock(content)
        if npm_components:
            return npm_components, "npm"

    # Last resort: treat the content as requirements.txt.
    pypi_components = parse_requirements_txt(content)
    if pypi_components:
        return pypi_components, "PyPI"

    return [], "unknown"
|
|
|
|
|
|
# =============================================================================
|
|
# SBOM GENERATION (CycloneDX format)
|
|
# =============================================================================
|
|
|
|
def generate_sbom(components: list[dict], ecosystem: str) -> dict:
    """Generate a CycloneDX 1.5 SBOM from parsed components.

    Args:
        components: Component dicts carrying at least ``name`` and
            ``version``, and optionally ``license``.
        ecosystem: Ecosystem label (e.g. "npm", "PyPI"); lower-cased to form
            the purl type.

    Returns:
        A dict with ``bomFormat``, ``specVersion``, ``version``, ``metadata``
        (UTC timestamp and tool info) and ``components``. Each component has
        ``type``, ``name``, ``version``, ``purl`` and ``licenses`` (a list of
        plain strings, empty when the license is missing or "unknown").
    """
    from urllib.parse import quote

    purl_type = ecosystem.lower()
    sbom_components = []
    for comp in components:
        name = comp["name"]
        version = comp["version"]
        # Percent-encode the name so an npm scope becomes "%40scope/pkg";
        # '/' stays literal as the namespace separator.
        purl = f"pkg:{purl_type}/{quote(name, safe='/')}@{version}"

        # A missing/None/empty license means "unknown" just like the literal
        # string does; never emit ["unknown"] or [None].
        license_id = comp.get("license")
        licenses = [license_id] if license_id and license_id != "unknown" else []

        sbom_components.append({
            "type": "library",
            "name": name,
            "version": version,
            "purl": purl,
            "licenses": licenses,
        })

    return {
        "bomFormat": "CycloneDX",
        "specVersion": "1.5",
        "version": 1,
        "metadata": {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "tools": [{"name": "breakpilot-screening", "version": "1.0.0"}],
        },
        "components": sbom_components,
    }
|
|
|
|
|
|
# =============================================================================
|
|
# VULNERABILITY SCANNING (OSV.dev API)
|
|
# =============================================================================
|
|
|
|
async def query_osv(name: str, version: str, ecosystem: str) -> list[dict]:
    """Ask OSV.dev which vulnerabilities affect one package version.

    Best effort: any exception is logged as a warning, and both failures and
    non-200 responses yield an empty list.
    """
    payload = {
        "package": {"name": name, "ecosystem": ecosystem},
        "version": version,
    }

    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(OSV_API_URL, json=payload)
            if response.status_code == 200:
                body = response.json()
                return body.get("vulns", [])
    except Exception as e:
        logger.warning(f"OSV query failed for {name}@{version}: {e}")

    return []
|
|
|
|
|
|
def map_osv_severity(vuln: dict) -> tuple[str, float]:
    """Extract severity and a representative CVSS score from OSV vulnerability data.

    Only ``database_specific.severity`` is consulted. The score is not
    computed from a CVSS vector; it is a fixed value per severity label.

    Args:
        vuln: One vulnerability object as returned by OSV.

    Returns:
        ``(severity, cvss)`` where severity is one of CRITICAL, HIGH, MEDIUM
        or LOW. Missing, null, non-string or unrecognized severities fall
        back to ``("MEDIUM", 5.0)``.
    """
    cvss_map = {"CRITICAL": 9.5, "HIGH": 7.5, "MEDIUM": 5.0, "LOW": 2.5}
    severity = "MEDIUM"

    # Tolerate an explicit null or a non-object here instead of raising.
    db_specific = vuln.get("database_specific")
    if isinstance(db_specific, dict):
        raw = db_specific.get("severity")
        if isinstance(raw, str):
            label = raw.strip().upper()
            # GitHub advisories spell the middle level "MODERATE".
            if label == "MODERATE":
                label = "MEDIUM"
            if label in cvss_map:
                severity = label

    return severity, cvss_map[severity]
|
|
|
|
|
|
def extract_fix_version(vuln: dict, package_name: str) -> Optional[str]:
    """Return the first "fixed" version OSV lists for the given package.

    Walks ``affected`` -> ``ranges`` -> ``events`` in document order, looking
    only at entries whose package name matches case-insensitively. Returns
    None when no such event exists.
    """
    fixed_versions = (
        event["fixed"]
        for entry in vuln.get("affected", [])
        if entry.get("package", {}).get("name", "").lower() == package_name.lower()
        for version_range in entry.get("ranges", [])
        for event in version_range.get("events", [])
        if "fixed" in event
    )
    return next(fixed_versions, None)
|
|
|
|
|
|
async def scan_vulnerabilities(components: list[dict], ecosystem: str) -> list[dict]:
    """Look up every component on OSV.dev and flatten the hits into issue dicts.

    Only the first 50 components are considered (to avoid timeouts), and
    components without a concrete version ("latest", "unknown", "*") are
    skipped. Queries run one after another.
    """
    unscannable_versions = ("latest", "unknown", "*")
    findings = []

    for component in components[:50]:
        version = component["version"]
        if version in unscannable_versions:
            continue
        name = component["name"]

        for vuln in await query_osv(name, version, ecosystem):
            vuln_id = vuln.get("id", f"OSV-{uuid.uuid4().hex[:8]}")
            cve = next(
                (alias for alias in vuln.get("aliases", []) if alias.startswith("CVE-")),
                None,
            )
            severity, cvss = map_osv_severity(vuln)
            fixed_in = extract_fix_version(vuln, name)

            if fixed_in:
                remediation = f"Upgrade {name} to {fixed_in}"
            else:
                remediation = f"Check {vuln_id} for remediation steps"

            findings.append({
                "id": str(uuid.uuid4()),
                "severity": severity,
                "title": vuln.get("summary", vuln_id),
                "description": vuln.get("details", "")[:500],
                "cve": cve,
                "cvss": cvss,
                "affected_component": name,
                "affected_version": version,
                "fixed_in": fixed_in,
                "remediation": remediation,
                "status": "OPEN",
            })

    return findings
|
|
|
|
|
|
# =============================================================================
|
|
# ROUTES
|
|
# =============================================================================
|
|
|
|
@router.post("/scan", response_model=ScreeningResponse)
async def scan_dependencies(
    file: UploadFile = File(...),
    tenant_id: str = Form("default"),
):
    """Upload a dependency file, generate SBOM, and scan for vulnerabilities.

    Args:
        file: Uploaded dependency file (package-lock.json, requirements.txt
            or yarn.lock).
        tenant_id: Tenant the screening is stored under.

    Returns:
        A ScreeningResponse with the SBOM components, the findings and the
        per-severity counters.

    Raises:
        HTTPException: 400 when no file name is given, the upload is not
            UTF-8 text, or no dependencies could be parsed from it.

    Persistence is best effort: if storing the result fails, the error is
    logged and the scan result is still returned to the caller.
    """
    if not file.filename:
        raise HTTPException(status_code=400, detail="No file provided")

    content = await file.read()
    try:
        # Deliberately NOT named `text`: that would shadow sqlalchemy.text()
        # and make every text("...") call below fail with TypeError.
        file_text = content.decode("utf-8")
    except UnicodeDecodeError:
        raise HTTPException(status_code=400, detail="File must be a text-based dependency file")

    # Parse dependencies
    components, ecosystem = detect_and_parse(file.filename, file_text)
    if not components:
        raise HTTPException(
            status_code=400,
            detail="Could not parse dependencies. Supported: package-lock.json, requirements.txt, yarn.lock",
        )

    # Generate SBOM
    sbom = generate_sbom(components, ecosystem)

    # Scan for vulnerabilities
    started_at = datetime.now(timezone.utc)
    issues = await scan_vulnerabilities(components, ecosystem)
    completed_at = datetime.now(timezone.utc)

    # Count severities
    critical = sum(1 for i in issues if i["severity"] == "CRITICAL")
    high = sum(1 for i in issues if i["severity"] == "HIGH")
    medium = sum(1 for i in issues if i["severity"] == "MEDIUM")
    low = sum(1 for i in issues if i["severity"] == "LOW")

    # Persist to database
    screening_id = str(uuid.uuid4())
    db = SessionLocal()
    try:
        db.execute(
            # CAST(... AS jsonb) instead of ":sbom_data::jsonb": text() does
            # not treat a name directly followed by ':' as a bind parameter.
            text("""INSERT INTO compliance_screenings
                (id, tenant_id, status, sbom_format, sbom_version,
                 total_components, total_issues, critical_issues, high_issues, medium_issues, low_issues,
                 sbom_data, started_at, completed_at)
                VALUES (:id, :tenant_id, 'completed', 'CycloneDX', '1.5',
                        :total_components, :total_issues, :critical, :high, :medium, :low,
                        CAST(:sbom_data AS jsonb), :started_at, :completed_at)"""),
            {
                "id": screening_id,
                "tenant_id": tenant_id,
                "total_components": len(components),
                "total_issues": len(issues),
                "critical": critical,
                "high": high,
                "medium": medium,
                "low": low,
                "sbom_data": json.dumps(sbom),
                "started_at": started_at,
                "completed_at": completed_at,
            },
        )

        # Persist security issues
        for issue in issues:
            db.execute(
                text("""INSERT INTO compliance_security_issues
                    (id, screening_id, severity, title, description, cve, cvss,
                     affected_component, affected_version, fixed_in, remediation, status)
                    VALUES (:id, :screening_id, :severity, :title, :description, :cve, :cvss,
                            :component, :version, :fixed_in, :remediation, :status)"""),
                {
                    "id": issue["id"],
                    "screening_id": screening_id,
                    "severity": issue["severity"],
                    "title": issue["title"][:500],
                    "description": issue.get("description", "")[:1000],
                    "cve": issue.get("cve"),
                    "cvss": issue.get("cvss"),
                    "component": issue["affected_component"],
                    "version": issue.get("affected_version"),
                    "fixed_in": issue.get("fixed_in"),
                    "remediation": issue.get("remediation"),
                    "status": issue["status"],
                },
            )

        db.commit()
    except Exception as e:
        # Best effort: keep serving the scan result, but leave a traceback.
        db.rollback()
        logger.error(f"Failed to persist screening: {e}", exc_info=True)
    finally:
        db.close()

    # Build response: group findings by the component they affect.
    comp_vulns: dict[str, list[dict]] = {}
    for issue in issues:
        comp_vulns.setdefault(issue["affected_component"], []).append({
            "id": issue.get("cve") or issue["id"],
            "cve": issue.get("cve"),
            "severity": issue["severity"],
            "title": issue["title"],
            "cvss": issue.get("cvss"),
            "fixedIn": issue.get("fixed_in"),
        })

    sbom_components = [
        SBOMComponentResponse(
            name=sc["name"],
            version=sc["version"],
            type=sc["type"],
            purl=sc["purl"],
            licenses=sc.get("licenses", []),
            vulnerabilities=comp_vulns.get(sc["name"], []),
        )
        for sc in sbom["components"]
    ]

    issue_responses = [
        SecurityIssueResponse(
            id=i["id"],
            severity=i["severity"],
            title=i["title"],
            description=i.get("description"),
            cve=i.get("cve"),
            cvss=i.get("cvss"),
            affected_component=i["affected_component"],
            affected_version=i.get("affected_version"),
            fixed_in=i.get("fixed_in"),
            remediation=i.get("remediation"),
            status=i["status"],
        )
        for i in issues
    ]

    return ScreeningResponse(
        id=screening_id,
        status="completed",
        sbom_format="CycloneDX",
        sbom_version="1.5",
        total_components=len(components),
        total_issues=len(issues),
        critical_issues=critical,
        high_issues=high,
        medium_issues=medium,
        low_issues=low,
        components=sbom_components,
        issues=issue_responses,
        started_at=started_at.isoformat(),
        completed_at=completed_at.isoformat(),
    )
|
|
|
|
|
|
@router.get("/{screening_id}", response_model=ScreeningResponse)
async def get_screening(screening_id: str):
    """Get a screening result by ID.

    Loads the screening row and its findings, then rebuilds the component
    list from the stored SBOM, attaching each finding to the component it
    names.

    Args:
        screening_id: Identifier of the screening to load.

    Returns:
        The stored screening as a ScreeningResponse.

    Raises:
        HTTPException: 404 when no screening with this id exists.
    """
    db = SessionLocal()
    try:
        row = db.execute(
            text("""SELECT id, status, sbom_format, sbom_version,
                total_components, total_issues, critical_issues, high_issues,
                medium_issues, low_issues, sbom_data, started_at, completed_at
                FROM compliance_screenings WHERE id = :id"""),
            {"id": screening_id},
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Screening not found")

        # Fetch issues
        issues_rows = db.execute(
            text("""SELECT id, severity, title, description, cve, cvss,
                affected_component, affected_version, fixed_in, remediation, status
                FROM compliance_security_issues WHERE screening_id = :id"""),
            {"id": screening_id},
        ).fetchall()

        issues = [
            SecurityIssueResponse(
                id=str(r[0]), severity=r[1], title=r[2], description=r[3],
                cve=r[4], cvss=r[5], affected_component=r[6],
                affected_version=r[7], fixed_in=r[8], remediation=r[9], status=r[10],
            )
            for r in issues_rows
        ]

        # Reconstruct components from SBOM data.
        sbom_data = row[10] or {}
        # Depending on the driver the JSON column may arrive as text rather
        # than an already-decoded dict; accept both.
        if isinstance(sbom_data, (str, bytes, bytearray)):
            try:
                sbom_data = json.loads(sbom_data)
            except ValueError:
                logger.warning(f"Stored SBOM for screening {screening_id} is not valid JSON")
                sbom_data = {}
        if not isinstance(sbom_data, dict):
            sbom_data = {}

        # Group findings by the component they affect.
        comp_vulns: dict[str, list[dict]] = {}
        for issue in issues:
            comp_vulns.setdefault(issue.affected_component, []).append({
                "id": issue.cve or issue.id,
                "cve": issue.cve,
                "severity": issue.severity,
                "title": issue.title,
                "cvss": issue.cvss,
                "fixedIn": issue.fixed_in,
            })

        components = [
            SBOMComponentResponse(
                name=sc["name"],
                version=sc["version"],
                type=sc.get("type", "library"),
                purl=sc.get("purl", ""),
                licenses=sc.get("licenses", []),
                vulnerabilities=comp_vulns.get(sc["name"], []),
            )
            for sc in sbom_data.get("components", [])
        ]

        return ScreeningResponse(
            id=str(row[0]),
            status=row[1],
            sbom_format=row[2] or "CycloneDX",
            sbom_version=row[3] or "1.5",
            total_components=row[4] or 0,
            total_issues=row[5] or 0,
            critical_issues=row[6] or 0,
            high_issues=row[7] or 0,
            medium_issues=row[8] or 0,
            low_issues=row[9] or 0,
            components=components,
            issues=issues,
            started_at=str(row[11]) if row[11] else None,
            completed_at=str(row[12]) if row[12] else None,
        )
    finally:
        db.close()
|
|
|
|
|
|
@router.get("", response_model=ScreeningListResponse)
async def list_screenings(tenant_id: str = "default"):
    """Return summary rows of every screening stored for a tenant, newest first."""
    db = SessionLocal()
    try:
        query = text("""SELECT id, status, total_components, total_issues,
            critical_issues, high_issues, medium_issues, low_issues,
            started_at, completed_at, created_at
            FROM compliance_screenings
            WHERE tenant_id = :tenant_id
            ORDER BY created_at DESC""")
        rows = db.execute(query, {"tenant_id": tenant_id}).fetchall()

        screenings = []
        for record in rows:
            (
                screening_id, status, n_components, n_issues,
                n_critical, n_high, n_medium, n_low,
                started, completed, created,
            ) = record
            screenings.append({
                "id": str(screening_id),
                "status": status,
                "total_components": n_components,
                "total_issues": n_issues,
                "critical_issues": n_critical,
                "high_issues": n_high,
                "medium_issues": n_medium,
                "low_issues": n_low,
                # Timestamps may be NULL; created_at is always stringified.
                "started_at": str(started) if started else None,
                "completed_at": str(completed) if completed else None,
                "created_at": str(created),
            })

        return ScreeningListResponse(screenings=screenings, total=len(screenings))
    finally:
        db.close()
|