Files
breakpilot-compliance/backend-compliance/compliance/api/screening_routes.py
Sharang Parnerkar c43d9da6d0 merge: sync with origin/main, take upstream on conflicts
# Conflicts:
#	admin-compliance/lib/sdk/types.ts
#	admin-compliance/lib/sdk/vendor-compliance/types.ts
2026-04-16 16:26:48 +02:00

347 lines
12 KiB
Python

"""
FastAPI routes for System Screening (SBOM Generation + Vulnerability Scan).
Endpoints:
- POST /v1/screening/scan: Upload dependency file, generate SBOM, scan for vulnerabilities
- GET /v1/screening/{screening_id}: Get screening result by ID
- GET /v1/screening: List screenings for a tenant
Phase 1 Step 4 refactor: parsing + SBOM generation + OSV scanning logic
moved to ``compliance.services.screening_service``. The scan handler still
references ``SessionLocal`` and ``scan_vulnerabilities`` from this module
so existing test mocks
(``patch("compliance.api.screening_routes.SessionLocal", ...)``,
``patch("compliance.api.screening_routes.scan_vulnerabilities", ...)``)
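keep working without test edits. The lookup endpoints (``get_screening``,
``list_screenings``) currently also open a ``SessionLocal`` session and query
the tables directly; delegating them to ``ScreeningService`` via
``Depends(get_db)`` is not wired up in this module.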
"""
import json
import logging
import uuid
from datetime import datetime, timezone
from typing import Any
import httpx
from fastapi import APIRouter, File, Form, UploadFile, HTTPException
from pydantic import BaseModel
from sqlalchemy import text
from database import SessionLocal # re-exported below for legacy test patches
from compliance.api._http_errors import translate_domain_errors
from compliance.schemas.screening import (
SBOMComponentResponse,
ScreeningListResponse,
ScreeningResponse,
SecurityIssueResponse,
)
from compliance.services.screening_service import (
ScreeningService,
detect_and_parse,
extract_fix_version,
generate_sbom,
map_osv_severity,
parse_package_lock,
parse_requirements_txt,
parse_yarn_lock,
query_osv,
scan_vulnerabilities,
)
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Router for the screening endpoints; every route registered on it is served
# under the /v1/screening prefix and tagged "system-screening".
router = APIRouter(prefix="/v1/screening", tags=["system-screening"])
# =============================================================================
# ROUTES
# =============================================================================
def _persist_screening(
    screening_id: str,
    tenant_id: str,
    sbom: dict[str, Any],
    issues: list[dict[str, Any]],
    total_components: int,
    severity_counts: dict[str, int],
    started_at: datetime,
    completed_at: datetime,
) -> None:
    """Best-effort write of one screening and its issues to the database.

    Inserts one row into ``compliance_screenings`` and one row per issue into
    ``compliance_security_issues`` inside a single transaction. Any failure is
    rolled back and logged with its traceback but NOT re-raised, so the caller
    can still return the scan result to the client.

    ``SessionLocal`` is looked up as a module global at call time so that
    ``patch("compliance.api.screening_routes.SessionLocal", ...)`` keeps working.
    """
    db = SessionLocal()
    try:
        db.execute(
            # CAST(... AS jsonb) instead of ``:sbom_data::jsonb``: the ``::``
            # shorthand directly after a ``:name`` placeholder is ambiguous for
            # text()'s bind-parameter parsing; CAST is the equivalent,
            # unambiguous spelling.
            text("""INSERT INTO compliance_screenings
                (id, tenant_id, status, sbom_format, sbom_version,
                total_components, total_issues, critical_issues, high_issues, medium_issues, low_issues,
                sbom_data, started_at, completed_at)
                VALUES (:id, :tenant_id, 'completed', 'CycloneDX', '1.5',
                :total_components, :total_issues, :critical, :high, :medium, :low,
                CAST(:sbom_data AS jsonb), :started_at, :completed_at)"""),
            {
                "id": screening_id,
                "tenant_id": tenant_id,
                "total_components": total_components,
                "total_issues": len(issues),
                "critical": severity_counts["CRITICAL"],
                "high": severity_counts["HIGH"],
                "medium": severity_counts["MEDIUM"],
                "low": severity_counts["LOW"],
                "sbom_data": json.dumps(sbom),
                "started_at": started_at,
                "completed_at": completed_at,
            },
        )
        for issue in issues:
            db.execute(
                text("""INSERT INTO compliance_security_issues
                    (id, screening_id, severity, title, description, cve, cvss,
                    affected_component, affected_version, fixed_in, remediation, status)
                    VALUES (:id, :screening_id, :severity, :title, :description, :cve, :cvss,
                    :component, :version, :fixed_in, :remediation, :status)"""),
                {
                    "id": issue["id"],
                    "screening_id": screening_id,
                    "severity": issue["severity"],
                    # Title and description are truncated before insertion.
                    "title": issue["title"][:500],
                    # ``or ""`` also covers an explicit ``None`` description,
                    # which ``.get(key, "")`` alone would try to slice.
                    "description": (issue.get("description") or "")[:1000],
                    "cve": issue.get("cve"),
                    "cvss": issue.get("cvss"),
                    "component": issue["affected_component"],
                    "version": issue.get("affected_version"),
                    "fixed_in": issue.get("fixed_in"),
                    "remediation": issue.get("remediation"),
                    "status": issue["status"],
                },
            )
        db.commit()
    except Exception as exc:  # noqa: BLE001 - persistence is deliberately best-effort
        db.rollback()
        logger.exception("Failed to persist screening: %s", exc)
    finally:
        db.close()


@router.post("/scan", response_model=ScreeningResponse)
async def scan_dependencies(
    file: UploadFile = File(...),
    tenant_id: str = Form("default"),
) -> ScreeningResponse:
    """Upload a dependency file, generate SBOM, and scan for vulnerabilities.

    Args:
        file: Uploaded dependency file; it must have a filename and be
            UTF-8 decodable text.
        tenant_id: Tenant the screening is stored under (form field,
            defaults to ``"default"``).

    Returns:
        The screening result: SBOM components (each annotated with the
        vulnerabilities found for its name), the issue list and per-severity
        counters.

    Raises:
        HTTPException: 400 if no filename was sent, the content is not valid
            UTF-8, or no dependencies could be parsed from the file.

    Note:
        Storing the result is best-effort: if the database write fails the
        error is logged and the response is still returned.
    """
    if not file.filename:
        raise HTTPException(status_code=400, detail="No file provided")

    content = await file.read()
    try:
        file_text = content.decode("utf-8")
    except UnicodeDecodeError as exc:
        raise HTTPException(
            status_code=400, detail="File must be a text-based dependency file"
        ) from exc

    components, ecosystem = detect_and_parse(file.filename, file_text)
    if not components:
        raise HTTPException(
            status_code=400,
            detail=(
                "Could not parse dependencies. Supported: package-lock.json, "
                "requirements.txt, yarn.lock"
            ),
        )

    sbom = generate_sbom(components, ecosystem)

    # The timestamps bracket only the vulnerability scan itself.
    started_at = datetime.now(timezone.utc)
    issues = await scan_vulnerabilities(components, ecosystem)
    completed_at = datetime.now(timezone.utc)

    # Single pass over the issues; severities outside the four known levels
    # are not counted in any bucket.
    severity_counts = dict.fromkeys(("CRITICAL", "HIGH", "MEDIUM", "LOW"), 0)
    for issue in issues:
        if issue["severity"] in severity_counts:
            severity_counts[issue["severity"]] += 1

    screening_id = str(uuid.uuid4())
    _persist_screening(
        screening_id=screening_id,
        tenant_id=tenant_id,
        sbom=sbom,
        issues=issues,
        total_components=len(components),
        severity_counts=severity_counts,
        started_at=started_at,
        completed_at=completed_at,
    )

    # Build response: group the issues by affected component name so each
    # SBOM component can carry its own vulnerability list.
    comp_vulns: dict[str, list[dict[str, Any]]] = {}
    for issue in issues:
        comp_vulns.setdefault(issue["affected_component"], []).append({
            "id": issue.get("cve") or issue["id"],
            "cve": issue.get("cve"),
            "severity": issue["severity"],
            "title": issue["title"],
            "cvss": issue.get("cvss"),
            "fixedIn": issue.get("fixed_in"),
        })

    sbom_components = [
        SBOMComponentResponse(
            name=sc["name"],
            version=sc["version"],
            type=sc["type"],
            purl=sc["purl"],
            licenses=sc.get("licenses", []),
            vulnerabilities=comp_vulns.get(sc["name"], []),
        )
        for sc in sbom["components"]
    ]
    issue_responses = [
        SecurityIssueResponse(
            id=i["id"],
            severity=i["severity"],
            title=i["title"],
            description=i.get("description"),
            cve=i.get("cve"),
            cvss=i.get("cvss"),
            affected_component=i["affected_component"],
            affected_version=i.get("affected_version"),
            fixed_in=i.get("fixed_in"),
            remediation=i.get("remediation"),
            status=i["status"],
        )
        for i in issues
    ]

    return ScreeningResponse(
        id=screening_id,
        status="completed",
        sbom_format="CycloneDX",
        sbom_version="1.5",
        total_components=len(components),
        total_issues=len(issues),
        critical_issues=severity_counts["CRITICAL"],
        high_issues=severity_counts["HIGH"],
        medium_issues=severity_counts["MEDIUM"],
        low_issues=severity_counts["LOW"],
        components=sbom_components,
        issues=issue_responses,
        started_at=started_at.isoformat(),
        completed_at=completed_at.isoformat(),
    )
@router.get("/{screening_id}", response_model=ScreeningResponse)
async def get_screening(screening_id: str) -> ScreeningResponse:
    """Get a screening result by ID.

    Loads the screening row and its security issues, then rebuilds the
    component list from the stored SBOM document, attaching to each component
    the issues recorded against its name.

    Args:
        screening_id: Primary key of the row in ``compliance_screenings``.

    Returns:
        The stored screening including components and issues. Missing numeric
        counters are reported as 0, a missing SBOM format/version falls back
        to ``"CycloneDX"`` / ``"1.5"``.

    Raises:
        HTTPException: 404 if no screening with this ID exists.
    """
    db = SessionLocal()
    try:
        row = db.execute(
            text("""SELECT id, status, sbom_format, sbom_version,
                total_components, total_issues, critical_issues, high_issues,
                medium_issues, low_issues, sbom_data, started_at, completed_at
                FROM compliance_screenings WHERE id = :id"""),
            {"id": screening_id},
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Screening not found")

        # Fetch issues
        issue_rows = db.execute(
            text("""SELECT id, severity, title, description, cve, cvss,
                affected_component, affected_version, fixed_in, remediation, status
                FROM compliance_security_issues WHERE screening_id = :id"""),
            {"id": screening_id},
        ).fetchall()
        issues = [
            SecurityIssueResponse(
                id=str(r[0]), severity=r[1], title=r[2], description=r[3],
                cve=r[4], cvss=r[5], affected_component=r[6],
                affected_version=r[7], fixed_in=r[8], remediation=r[9], status=r[10],
            )
            for r in issue_rows
        ]

        # Group the issues by affected component name.
        comp_vulns: dict[str, list[dict[str, Any]]] = {}
        for issue in issues:
            comp_vulns.setdefault(issue.affected_component, []).append({
                "id": issue.cve or issue.id,
                "cve": issue.cve,
                "severity": issue.severity,
                "title": issue.title,
                "cvss": issue.cvss,
                "fixedIn": issue.fixed_in,
            })

        # Reconstruct components from SBOM data. Depending on driver and
        # column type the value may arrive already decoded or as JSON text;
        # decode the latter so the lookup below works in both cases.
        sbom_data = row[10] or {}
        if isinstance(sbom_data, (str, bytes, bytearray)):
            sbom_data = json.loads(sbom_data) or {}

        components = [
            SBOMComponentResponse(
                name=sc["name"],
                version=sc["version"],
                type=sc.get("type", "library"),
                purl=sc.get("purl", ""),
                licenses=sc.get("licenses", []),
                vulnerabilities=comp_vulns.get(sc["name"], []),
            )
            for sc in sbom_data.get("components", [])
        ]

        return ScreeningResponse(
            id=str(row[0]),
            status=row[1],
            sbom_format=row[2] or "CycloneDX",
            sbom_version=row[3] or "1.5",
            total_components=row[4] or 0,
            total_issues=row[5] or 0,
            critical_issues=row[6] or 0,
            high_issues=row[7] or 0,
            medium_issues=row[8] or 0,
            low_issues=row[9] or 0,
            components=components,
            issues=issues,
            started_at=str(row[11]) if row[11] else None,
            completed_at=str(row[12]) if row[12] else None,
        )
    finally:
        # Always release the session, including on the 404 path.
        db.close()
@router.get("", response_model=ScreeningListResponse)
async def list_screenings(tenant_id: str = "default") -> ScreeningListResponse:
    """Return the summaries of all screenings stored for one tenant.

    Rows are read from ``compliance_screenings`` ordered by ``created_at``
    descending. Only the counters and timestamps are returned; components and
    individual issues are not loaded here.

    Args:
        tenant_id: Tenant whose screenings are listed (defaults to
            ``"default"``).

    Returns:
        A ``ScreeningListResponse`` holding one dict per screening and the
        number of entries as ``total``.
    """

    def _text_or_none(value: Any) -> Any:
        # Falsy values (e.g. NULL timestamps) are reported as None.
        return str(value) if value else None

    session = SessionLocal()
    try:
        query = text("""SELECT id, status, total_components, total_issues,
            critical_issues, high_issues, medium_issues, low_issues,
            started_at, completed_at, created_at
            FROM compliance_screenings
            WHERE tenant_id = :tenant_id
            ORDER BY created_at DESC""")
        records = session.execute(query, {"tenant_id": tenant_id}).fetchall()

        summaries = []
        for record in records:
            summaries.append({
                "id": str(record[0]),
                "status": record[1],
                "total_components": record[2],
                "total_issues": record[3],
                "critical_issues": record[4],
                "high_issues": record[5],
                "medium_issues": record[6],
                "low_issues": record[7],
                "started_at": _text_or_none(record[8]),
                "completed_at": _text_or_none(record[9]),
                # created_at is always stringified, unlike the two above.
                "created_at": str(record[10]),
            })

        return ScreeningListResponse(screenings=summaries, total=len(summaries))
    finally:
        session.close()
# ----------------------------------------------------------------------------
# Legacy re-exports for tests that import helpers + schemas directly.
# ----------------------------------------------------------------------------
# Kept as a plain literal list so the exported names stay statically visible.
__all__ = [
    # Defined in this module.
    "router",
    # Imported from ``database``; test patches target it under this module's name.
    "SessionLocal",
    # Imported from ``compliance.services.screening_service``.
    "parse_package_lock",
    "parse_requirements_txt",
    "parse_yarn_lock",
    "detect_and_parse",
    "generate_sbom",
    "query_osv",
    "map_osv_severity",
    "extract_fix_version",
    "scan_vulnerabilities",
    # Imported from ``compliance.schemas.screening``.
    "ScreeningResponse",
    "ScreeningListResponse",
    "SBOMComponentResponse",
    "SecurityIssueResponse",
]