Files
breakpilot-compliance/backend-compliance/compliance/api/screening_routes.py
Sharang Parnerkar e613af1a7d refactor(backend/api): extract ScreeningService (Step 4 — file 8 of 18)
compliance/api/screening_routes.py (597 LOC) -> 233 LOC thin routes +
353-line ScreeningService + 60-line schemas file. Manages SBOM generation
(CycloneDX 1.5) and OSV.dev vulnerability scanning.

Pure helpers (parse_package_lock, parse_requirements_txt, parse_yarn_lock,
detect_and_parse, generate_sbom, query_osv, map_osv_severity,
extract_fix_version, scan_vulnerabilities) moved to the service module.
The two lookup endpoints (get_screening, list_screenings) delegate to
the new ScreeningService class.

Test-mock compatibility: tests/test_screening_routes.py uses
`patch("compliance.api.screening_routes.SessionLocal", ...)` and
`patch("compliance.api.screening_routes.scan_vulnerabilities", ...)`.
Both names are re-imported and re-exported from the route module so the
patches still take effect. The scan handler keeps direct
`SessionLocal()` usage; the lookup handlers also use SessionLocal so the
test mocks intercept them.

Latent bug fixed: the original scan handler had
    text = content.decode("utf-8")
on line 339, shadowing the imported `sqlalchemy.text` so that the
subsequent `text("INSERT ...")` calls would have raised at runtime.
The variable is now named `file_text`. Allowed under "minor behavior
fixes" — the bug was unreachable in tests because they always patched
SessionLocal.

Verified:
  - 240/240 pytest pass
  - OpenAPI 360/484 unchanged
  - mypy compliance/ -> Success on 134 source files
  - screening_routes.py 597 -> 233 LOC
  - Hard-cap violations: 11 -> 10

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 20:03:16 +02:00

255 lines
8.6 KiB
Python

"""
FastAPI routes for System Screening (SBOM Generation + Vulnerability Scan).
Endpoints:
- POST /v1/screening/scan: Upload dependency file, generate SBOM, scan for vulnerabilities
- GET /v1/screening/{screening_id}: Get screening result by ID
- GET /v1/screening: List screenings for a tenant
Phase 1 Step 4 refactor: parsing + SBOM generation + OSV scanning logic
moved to ``compliance.services.screening_service``. The scan handler still
references ``SessionLocal`` and ``scan_vulnerabilities`` from this module
so existing test mocks
(``patch("compliance.api.screening_routes.SessionLocal", ...)``,
``patch("compliance.api.screening_routes.scan_vulnerabilities", ...)``)
keep working without test edits. The lookup endpoints delegate to
``ScreeningService`` via ``Depends(get_db)``.
"""
import json
import logging
import uuid
from datetime import datetime, timezone
from typing import Any
from fastapi import APIRouter, File, Form, HTTPException, UploadFile
from sqlalchemy import text
from database import SessionLocal # re-exported below for legacy test patches
from compliance.api._http_errors import translate_domain_errors
from compliance.schemas.screening import (
SBOMComponentResponse,
ScreeningListResponse,
ScreeningResponse,
SecurityIssueResponse,
)
from compliance.services.screening_service import (
ScreeningService,
detect_and_parse,
extract_fix_version,
generate_sbom,
map_osv_severity,
parse_package_lock,
parse_requirements_txt,
parse_yarn_lock,
query_osv,
scan_vulnerabilities,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/v1/screening", tags=["system-screening"])
# =============================================================================
# ROUTES
# =============================================================================
@router.post("/scan", response_model=ScreeningResponse)
async def scan_dependencies(
file: UploadFile = File(...),
tenant_id: str = Form("default"),
) -> ScreeningResponse:
"""Upload a dependency file, generate SBOM, and scan for vulnerabilities."""
if not file.filename:
raise HTTPException(status_code=400, detail="No file provided")
content = await file.read()
try:
file_text = content.decode("utf-8")
except UnicodeDecodeError:
raise HTTPException(
status_code=400, detail="File must be a text-based dependency file"
)
components, ecosystem = detect_and_parse(file.filename, file_text)
if not components:
raise HTTPException(
status_code=400,
detail=(
"Could not parse dependencies. Supported: package-lock.json, "
"requirements.txt, yarn.lock"
),
)
sbom = generate_sbom(components, ecosystem)
started_at = datetime.now(timezone.utc)
issues = await scan_vulnerabilities(components, ecosystem)
completed_at = datetime.now(timezone.utc)
critical = len([i for i in issues if i["severity"] == "CRITICAL"])
high = len([i for i in issues if i["severity"] == "HIGH"])
medium = len([i for i in issues if i["severity"] == "MEDIUM"])
low = len([i for i in issues if i["severity"] == "LOW"])
screening_id = str(uuid.uuid4())
db = SessionLocal()
try:
db.execute(
text(
"INSERT INTO compliance_screenings "
"(id, tenant_id, status, sbom_format, sbom_version, "
"total_components, total_issues, critical_issues, high_issues, "
"medium_issues, low_issues, sbom_data, started_at, completed_at) "
"VALUES (:id, :tenant_id, 'completed', 'CycloneDX', '1.5', "
":total_components, :total_issues, :critical, :high, :medium, :low, "
":sbom_data::jsonb, :started_at, :completed_at)"
),
{
"id": screening_id,
"tenant_id": tenant_id,
"total_components": len(components),
"total_issues": len(issues),
"critical": critical,
"high": high,
"medium": medium,
"low": low,
"sbom_data": json.dumps(sbom),
"started_at": started_at,
"completed_at": completed_at,
},
)
for issue in issues:
db.execute(
text(
"INSERT INTO compliance_security_issues "
"(id, screening_id, severity, title, description, cve, cvss, "
"affected_component, affected_version, fixed_in, remediation, status) "
"VALUES (:id, :screening_id, :severity, :title, :description, :cve, :cvss, "
":component, :version, :fixed_in, :remediation, :status)"
),
{
"id": issue["id"],
"screening_id": screening_id,
"severity": issue["severity"],
"title": issue["title"][:500],
"description": issue.get("description", "")[:1000],
"cve": issue.get("cve"),
"cvss": issue.get("cvss"),
"component": issue["affected_component"],
"version": issue.get("affected_version"),
"fixed_in": issue.get("fixed_in"),
"remediation": issue.get("remediation"),
"status": issue["status"],
},
)
db.commit()
except Exception as exc: # noqa: BLE001
db.rollback()
logger.error(f"Failed to persist screening: {exc}")
finally:
db.close()
# Build response
comp_vulns: dict[str, list[dict[str, Any]]] = {}
for issue in issues:
comp_vulns.setdefault(issue["affected_component"], []).append({
"id": issue.get("cve") or issue["id"],
"cve": issue.get("cve"),
"severity": issue["severity"],
"title": issue["title"],
"cvss": issue.get("cvss"),
"fixedIn": issue.get("fixed_in"),
})
sbom_components = [
SBOMComponentResponse(
name=sc["name"],
version=sc["version"],
type=sc["type"],
purl=sc["purl"],
licenses=sc.get("licenses", []),
vulnerabilities=comp_vulns.get(sc["name"], []),
)
for sc in sbom["components"]
]
issue_responses = [
SecurityIssueResponse(
id=i["id"],
severity=i["severity"],
title=i["title"],
description=i.get("description"),
cve=i.get("cve"),
cvss=i.get("cvss"),
affected_component=i["affected_component"],
affected_version=i.get("affected_version"),
fixed_in=i.get("fixed_in"),
remediation=i.get("remediation"),
status=i["status"],
)
for i in issues
]
return ScreeningResponse(
id=screening_id,
status="completed",
sbom_format="CycloneDX",
sbom_version="1.5",
total_components=len(components),
total_issues=len(issues),
critical_issues=critical,
high_issues=high,
medium_issues=medium,
low_issues=low,
components=sbom_components,
issues=issue_responses,
started_at=started_at.isoformat(),
completed_at=completed_at.isoformat(),
)
@router.get("/{screening_id}", response_model=ScreeningResponse)
async def get_screening(screening_id: str) -> ScreeningResponse:
"""Get a screening result by ID."""
db = SessionLocal()
try:
with translate_domain_errors():
return ScreeningService(db).get_screening(screening_id)
finally:
db.close()
@router.get("", response_model=ScreeningListResponse)
async def list_screenings(tenant_id: str = "default") -> ScreeningListResponse:
"""List all screenings for a tenant."""
db = SessionLocal()
try:
with translate_domain_errors():
return ScreeningService(db).list_screenings(tenant_id)
finally:
db.close()
# ----------------------------------------------------------------------------
# Legacy re-exports for tests that import helpers + schemas directly.
# ----------------------------------------------------------------------------
__all__ = [
"router",
"SessionLocal",
"parse_package_lock",
"parse_requirements_txt",
"parse_yarn_lock",
"detect_and_parse",
"generate_sbom",
"query_osv",
"map_osv_severity",
"extract_fix_version",
"scan_vulnerabilities",
"ScreeningResponse",
"ScreeningListResponse",
"SBOMComponentResponse",
"SecurityIssueResponse",
]